Comment créer un pipeline vidéo IA avec Python et Atlas Cloud

La plupart des équipes débutent avec la génération vidéo par IA en effectuant des appels API ponctuels : générer une seule vidéo, la télécharger, puis passer à la suite. Cela fonctionne pour l'expérimentation.

Dernière mise à jour : 28 février 2026

Découvrez ces modèles en action :

j-qDCyXubyE

Architecture du pipeline

Avant d'écrire du code, voici l'architecture globale de ce que nous allons construire :

plaintext
1```
2+-------------------+     +--------------------+     +------------------+
3|  Configuration    |     |  API Atlas Cloud   |     | Stockage Sortie  |
4|  (JSON/YAML)      |     |                    |     |                  |
5|  - prompts        +---->+  /generateImage    +---->+  /images/        |
6|  - models         |     |  /generateVideo    |     |  /videos/        |
7|  - parameters     |     |  /prediction/get   |     |  /manifest.json  |
8+-------------------+     +--------------------+     +------------------+
9          |                         |                         |
10          v                         v                         v
11+-------------------+     +--------------------+     +------------------+
12| Moteur de Pipeline|     |  Polling & Retry   |     |  Suivi des coûts |
13|                   |     |                    |     |                  |
14|  - batch_generate |     |  - backoff         |     |  - par requête   |
15|  - concurrence    |     |    exponentiel     |     |  - cumulé        |
16|  - routage modèle |     |  - max retries     |     |  - par modèle    |
17+-------------------+     +--------------------+     +------------------+
18```

Le pipeline suit un flux simple :

  1. Lecture des configurations des prompts à partir d'un fichier d'entrée structuré.
  2. Routage de chaque prompt vers le modèle et le point de terminaison appropriés (image ou vidéo).
  3. Soumission de toutes les requêtes à l'API Atlas Cloud avec une concurrence contrôlée.
  4. Interrogation (polling) des résultats avec une logique de backoff exponentiel et de tentatives répétées.
  5. Téléchargement des résultats terminés et sauvegarde dans des répertoires organisés.
  6. Suivi des coûts et génération d'un manifeste récapitulatif.

Démarrage : Accès à l'API

Étape 1 : Obtenir votre clé API

Inscrivez-vous sur Atlas Cloud et créez une clé API depuis le tableau de bord. Le crédit gratuit de USD1 est suffisant pour tester le pipeline complet avec plusieurs générations d'images et de vidéos.

image.png

image.png

Étape 2 : Installer les dépendances

plaintext
1```bash
2pip install requests pyyaml
3```

Aucun framework lourd n'est requis. Le pipeline utilise uniquement

text
1requests
pour les appels HTTP,
text
1pyyaml
pour les fichiers de configuration, et les modules de la bibliothèque standard Python pour la concurrence et la gestion des fichiers.

Le code complet du pipeline

Voici le pipeline opérationnel complet. Chaque section est expliquée après le bloc de code.

plaintext
1```python
2import requests
3import time
4import json
5import os
6import logging
7from concurrent.futures import ThreadPoolExecutor, as_completed
8from dataclasses import dataclass, field
9from typing import Optional
10from datetime import datetime
11
12# Configuration de la journalisation
13logging.basicConfig(
14    level=logging.INFO,
15    format="%(asctime)s [%(levelname)s] %(message)s",
16    datefmt="%Y-%m-%d %H:%M:%S"
17)
18logger = logging.getLogger("atlas_pipeline")
19
20@dataclass
21class GenerationResult:
22    """Stocke le résultat d'une seule requête de génération."""
23    name: str
24    model: str
25    media_type: str  # "image" ou "video"
26    status: str  # "success", "failed", "error"
27    output_url: Optional[str] = None
28    local_path: Optional[str] = None
29    cost_estimate: float = 0.0
30    duration_seconds: float = 0.0
31    error_message: Optional[str] = None
32
33class AtlasCloudClient:
34    """Client wrapper pour l'API Atlas Cloud."""
35
36    BASE_URL = "https://api.atlascloud.ai/api/v1"
37
38    # Tarification par modèle (approximative)
39    PRICING = {
40        "black-forest-labs/flux-2-pro/text-to-image": 0.04,        # par image
41        "google/imagen4-ultra/text-to-image": 0.06,                 # par image
42        "bytedance/seedance-v1.5-pro/text-to-video": 0.022,        # par seconde
43        "google/veo3.1/text-to-video": 0.03,                       # par seconde
44        "openai/sora-v2/text-to-video": 0.15,                      # par seconde
45    }
46
47    def __init__(self, api_key: str):
48        self.api_key = api_key
49        self.session = requests.Session()
50        self.session.headers.update({
51            "Authorization": f"Bearer {api_key}",
52            "Content-Type": "application/json"
53        })
54
55    def generate_image(
56        self,
57        model: str,
58        prompt: str,
59        width: int = 1024,
60        height: int = 1024
61    ) -> dict:
62        """Soumet une requête de génération d'image."""
63        response = self.session.post(
64            f"{self.BASE_URL}/model/generateImage",
65            json={
66                "model": model,
67                "prompt": prompt,
68                "width": width,
69                "height": height
70            }
71        )
72        response.raise_for_status()
73        return response.json()
74
75    def generate_video(
76        self,
77        model: str,
78        prompt: str,
79        duration: int = 5,
80        resolution: str = "1080p"
81    ) -> dict:
82        """Soumet une requête de génération de vidéo."""
83        response = self.session.post(
84            f"{self.BASE_URL}/model/generateVideo",
85            json={
86                "model": model,
87                "prompt": prompt,
88                "duration": duration,
89                "resolution": resolution
90            }
91        )
92        response.raise_for_status()
93        return response.json()
94
95    def poll_result(
96        self,
97        request_id: str,
98        max_wait: int = 300,
99        initial_interval: int = 5,
100        max_interval: int = 30
101    ) -> Optional[dict]:
102        """Interroge le résultat avec un backoff exponentiel."""
103        start_time = time.time()
104        interval = initial_interval
105
106        while time.time() - start_time < max_wait:
107            try:
108                response = self.session.get(
109                    f"{self.BASE_URL}/model/prediction/{request_id}/get"
110                )
111                data = response.json()
112
113                if data["status"] == "completed":
114                    return data
115                elif data["status"] == "failed":
116                    logger.error(f"Génération échouée : {data.get('error', 'Erreur inconnue')}")
117                    return None
118
119                logger.debug(f"Statut : {data['status']}, attente {interval}s...")
120                time.sleep(interval)
121                interval = min(interval * 1.5, max_interval)
122
123            except requests.RequestException as e:
124                logger.warning(f"La requête de polling a échoué : {e}, nouvelle tentative dans {interval}s")
125                time.sleep(interval)
126
127        logger.error(f"Délai d'attente dépassé après {max_wait}s pour {request_id}")
128        return None
129
130    def estimate_cost(self, model: str, duration: int = 0) -> float:
131        """Estime le coût d'une requête."""
132        base_price = self.PRICING.get(model, 0.05)
133        if "text-to-video" in model and duration > 0:
134            return base_price * duration
135        return base_price
136
137class VideoPipeline:
138    """Orchestre la génération par lots."""
139
140    def __init__(self, api_key: str, output_dir: str = "pipeline_output"):
141        self.client = AtlasCloudClient(api_key)
142        self.output_dir = output_dir
143        self.results: list[GenerationResult] = []
144        self.total_cost = 0.0
145
146        os.makedirs(os.path.join(output_dir, "images"), exist_ok=True)
147        os.makedirs(os.path.join(output_dir, "videos"), exist_ok=True)
148
149    def _download_file(self, url: str, filepath: str) -> bool:
150        try:
151            response = requests.get(url, timeout=60)
152            response.raise_for_status()
153            with open(filepath, "wb") as f:
154                f.write(response.content)
155            return True
156        except Exception as e:
157            logger.error(f"Téléchargement échoué pour {url} : {e}")
158            return False
159
160    def _safe_filename(self, name: str, extension: str) -> str:
161        safe = name.lower().replace(" ", "_")
162        safe = "".join(c for c in safe if c.isalnum() or c == "_")
163        return f"{safe}.{extension}"
164
165    def _process_image(self, name: str, model: str, prompt: str,
166                       width: int = 1024, height: int = 1024,
167                       retries: int = 2) -> GenerationResult:
168        start = time.time()
169        cost = self.client.estimate_cost(model)
170
171        for attempt in range(retries + 1):
172            try:
173                logger.info(f"[Image] Génération de '{name}' (tentative {attempt + 1})")
174                result = self.client.generate_image(model, prompt, width, height)
175                request_id = result["request_id"]
176
177                data = self.client.poll_result(request_id)
178                if data and data["status"] == "completed":
179                    image_url = data["output"]["image_url"]
180                    filename = self._safe_filename(name, "png")
181                    filepath = os.path.join(self.output_dir, "images", filename)
182                    self._download_file(image_url, filepath)
183
184                    return GenerationResult(
185                        name=name, model=model, media_type="image",
186                        status="success", output_url=image_url,
187                        local_path=filepath, cost_estimate=cost,
188                        duration_seconds=time.time() - start
189                    )
190            except requests.HTTPError as e:
191                if e.response.status_code == 429:
192                    wait = 2 ** (attempt + 2)
193                    logger.warning(f"Limite de taux atteinte, attente {wait}s")
194                    time.sleep(wait)
195                    continue
196                logger.error(f"Erreur HTTP lors de la génération de '{name}' : {e}")
197            except Exception as e:
198                logger.error(f"Erreur lors de la génération de '{name}' : {e}")
199
200            if attempt < retries:
201                time.sleep(2 ** attempt)
202
203        return GenerationResult(
204            name=name, model=model, media_type="image",
205            status="failed", cost_estimate=0,
206            duration_seconds=time.time() - start,
207            error_message="Nombre maximum de tentatives atteint"
208        )
209
210    def _process_video(self, name: str, model: str, prompt: str,
211                       duration: int = 5, resolution: str = "1080p",
212                       retries: int = 2) -> GenerationResult:
213        start = time.time()
214        cost = self.client.estimate_cost(model, duration)
215
216        for attempt in range(retries + 1):
217            try:
218                logger.info(f"[Video] Génération de '{name}' (tentative {attempt + 1})")
219                result = self.client.generate_video(model, prompt, duration, resolution)
220                request_id = result["request_id"]
221
222                data = self.client.poll_result(request_id, max_wait=600)
223                if data and data["status"] == "completed":
224                    video_url = data["output"]["video_url"]
225                    filename = self._safe_filename(name, "mp4")
226                    filepath = os.path.join(self.output_dir, "videos", filename)
227                    self._download_file(video_url, filepath)
228
229                    return GenerationResult(
230                        name=name, model=model, media_type="video",
231                        status="success", output_url=video_url,
232                        local_path=filepath, cost_estimate=cost,
233                        duration_seconds=time.time() - start
234                    )
235            except requests.HTTPError as e:
236                if e.response.status_code == 429:
237                    wait = 2 ** (attempt + 2)
238                    logger.warning(f"Limite de taux atteinte, attente {wait}s")
239                    time.sleep(wait)
240                    continue
241                logger.error(f"Erreur HTTP lors de la génération de '{name}' : {e}")
242            except Exception as e:
243                logger.error(f"Erreur lors de la génération de '{name}' : {e}")
244
245            if attempt < retries:
246                time.sleep(2 ** (attempt + 1))
247
248        return GenerationResult(
249            name=name, model=model, media_type="video",
250            status="failed", cost_estimate=0,
251            duration_seconds=time.time() - start,
252            error_message="Nombre maximum de tentatives atteint"
253        )
254
255    def batch_generate(self, jobs: list[dict], max_workers: int = 3):
256        """Exécute un lot de tâches en parallèle."""
257        logger.info(f"Démarrage d'un lot de {len(jobs)} tâches avec {max_workers} workers")
258        start_time = time.time()
259
260        with ThreadPoolExecutor(max_workers=max_workers) as executor:
261            futures = {}
262            for job in jobs:
263                if job["type"] == "image":
264                    future = executor.submit(
265                        self._process_image,
266                        name=job["name"],
267                        model=job["model"],
268                        prompt=job["prompt"],
269                        width=job.get("width", 1024),
270                        height=job.get("height", 1024)
271                    )
272                elif job["type"] == "video":
273                    future = executor.submit(
274                        self._process_video,
275                        name=job["name"],
276                        model=job["model"],
277                        prompt=job["prompt"],
278                        duration=job.get("duration", 5),
279                        resolution=job.get("resolution", "1080p")
280                    )
281                futures[future] = job["name"]
282
283            for future in as_completed(futures):
284                result = future.result()
285                self.results.append(result)
286                self.total_cost += result.cost_estimate
287                status_icon = "OK" if result.status == "success" else "FAIL"
288                logger.info(
289                    f"[{status_icon}] {result.name} -- "
290                    f"USD{result.cost_estimate:.3f} -- "
291                    f"{result.duration_seconds:.1f}s"
292                )
293
294        elapsed = time.time() - start_time
295        self._save_manifest()
296        self._print_summary(elapsed)
297
298    def _save_manifest(self):
299        manifest = {
300            "generated_at": datetime.now().isoformat(),
301            "total_cost": round(self.total_cost, 4),
302            "total_jobs": len(self.results),
303            "successful": sum(1 for r in self.results if r.status == "success"),
304            "failed": sum(1 for r in self.results if r.status != "success"),
305            "results": [
306                {
307                    "name": r.name,
308                    "model": r.model,
309                    "type": r.media_type,
310                    "status": r.status,
311                    "output_url": r.output_url,
312                    "local_path": r.local_path,
313                    "cost": round(r.cost_estimate, 4),
314                    "generation_time": round(r.duration_seconds, 1),
315                    "error": r.error_message
316                }
317                for r in self.results
318            ]
319        }
320        manifest_path = os.path.join(self.output_dir, "manifest.json")
321        with open(manifest_path, "w") as f:
322            json.dump(manifest, f, indent=2)
323        logger.info(f"Manifeste sauvegardé dans {manifest_path}")
324
325    def _print_summary(self, elapsed: float):
326        success = sum(1 for r in self.results if r.status == "success")
327        failed = len(self.results) - success
328        cost_by_model = {}
329        for r in self.results:
330            cost_by_model[r.model] = cost_by_model.get(r.model, 0) + r.cost_estimate
331
332        print("\n" + "=" * 60)
333        print("RÉSUMÉ DU PIPELINE")
334        print("=" * 60)
335        print(f"Total tâches:     {len(self.results)}")
336        print(f"Réussies:         {success}")
337        print(f"Échouées:         {failed}")
338        print(f"Coût total:       USD{self.total_cost:.4f}")
339        print(f"Temps total:      {elapsed:.1f}s")
340        print(f"\nCoût par modèle:")
341        for model, cost in sorted(cost_by_model.items()):
342            short_name = model.split("/")[1]
343            print(f"  {short_name}: USD{cost:.4f}")
344        print("=" * 60)
345```

Utilisation du pipeline

Avec les classes

text
1AtlasCloudClient
et
text
1VideoPipeline
définies, voici comment les utiliser pour un flux de production de contenu standard.

Usage de base : Miniatures + Vidéos

plaintext
1```python
2API_KEY = "votre-clé-api-atlas-cloud"
3
4pipeline = VideoPipeline(api_key=API_KEY, output_dir="contenu_hebdomadaire")
5
6jobs = [
7    # Générer des miniatures avec Flux 2 Pro
8    {
9        "name": "Miniature Lancement Produit",
10        "type": "image",
11        "model": "black-forest-labs/flux-2-pro/text-to-image",
12        "prompt": "Miniature YouTube accrocheuse, texte en gras 'NOUVEAU LANCEMENT', "
13                  "mise en avant produit sur fond dégradé sombre, couleurs "
14                  "vives, design professionnel, 4K"
15    },
16    
17    # Générer des vidéos avec Seedance 1.5 Pro
18    {
19        "name": "Présentation Produit Seedance",
20        "type": "video",
21        "model": "bytedance/seedance-v1.5-pro/text-to-video",
22        "prompt": "Animation de révélation de produit élégante, gadget moderne "
23                  "émergeant d'une lumière douce, rotation lente, fond blanc "
24                  "minimaliste, éclairage cinématographique",
25        "duration": 10
26    },
27
28    # Générer une vidéo cinématographique avec Veo 3.1
29    {
30        "name": "Vidéo Héros Veo",
31        "type": "video",
32        "model": "google/veo3.1/text-to-video",
33        "prompt": "Prise de vue aérienne cinématographique d'une ville moderne "
34                  "à l'heure dorée, travelling avant lent, reflet d'objectif "
35                  "du soleil couchant, sons ambiants, grain de film, "
36                  "étalonnage professionnel",
37        "duration": 8
38    },
39]
40
41pipeline.batch_generate(jobs, max_workers=3)
42```

Approche basée sur la configuration

Pour des pipelines récurrents, définissez vos tâches dans un fichier YAML :

plaintext
1```yaml
2# pipeline_config.yaml
3output_dir: contenu_hebdomadaire
4max_workers: 3
5
6jobs:
7  - name: Image Héro Produit
8    type: image
9    model: google/imagen4-ultra/text-to-image
10    prompt: >
11      Photographie produit premium d'écouteurs sans fil dans leur boîtier,
12      surface sombre réfléchissante, éclairage dramatique, esthétique tech,
13      résolution 8K
14    width: 2048
15    height: 2048
16
17  - name: Vidéo Réseaux Sociaux
18    type: video
19    model: bytedance/seedance-v1.5-pro/text-to-video
20    prompt: >
21      Contenu réseaux sociaux tendance, mains déballant un produit tech,
22      détails en gros plan, lumière naturelle vive, format vertical
23    duration: 10
24```

Détails techniques clés

Polling avec Backoff Exponentiel

La génération vidéo peut prendre de 30 secondes à 5 minutes. Le pipeline utilise un backoff exponentiel pour interroger l'API efficacement sans la saturer, réduisant les appels inutiles d'environ 60 % par rapport à un intervalle fixe.

Gestion des limites de taux (Rate Limiting)

Lorsqu'une réponse HTTP 429 est reçue, le pipeline patiente selon une progression exponentielle (4s, 8s, 16s) plutôt que d'échouer immédiatement.

Contrôle de la concurrence

Le

text
1ThreadPoolExecutor
limite les requêtes simultanées. Commencez avec
text
1max_workers=3
et augmentez jusqu'à 5-8 selon les capacités de votre compte. Au-delà de 10 requêtes simultanées, le risque de dépasser les limites de taux augmente.

Suivi des coûts

Chaque requête bénéficie d'une estimation de coût basée sur la table de prix intégrée au

text
1AtlasCloudClient
. Le fichier manifeste généré à la fin permet un suivi budgétaire précis.

Estimation des coûts

Scénario PipelineTâchesModèlesCoût Estimé
Pack réseaux sociaux10 images + 5 vidéos (5s)Flux 2 Pro + Seedance 1.5USD0.95
Campagne de lancement20 images + 10 vidéos (10s)MixteUSD3.80

Commencez à construire votre pipeline vidéo -- USD1 de crédit gratuit

Conseils de déploiement

Tâches Cron pour une génération planifiée

Vous pouvez automatiser l'exécution via

text
1cron
:

plaintext
10 6 * * 1 cd /chemin/vers/projet && python run_pipeline.py --config weekly.yaml

Architecture basée sur les files d'attente

Pour des déploiements plus larges, utilisez Celery ou Redis Queue afin de découpler la soumission des tâches du traitement, facilitant ainsi l'intégration avec une application web.

Verdict

Construire un pipeline vidéo par IA ne consiste pas seulement à écrire du code ingénieux, mais à bâtir une infrastructure fiable capable de gérer les réalités des intégrations API : timeouts, échecs, coûts et exécutions concurrentes. Le pipeline présenté ici répond à ces enjeux.

La combinaison de Flux 2 Pro pour la rapidité des images, Seedance 1.5 Pro pour la rentabilité vidéo, et Veo 3.1 pour la qualité cinématographique offre une solution complète, accessible via une seule clé API Atlas Cloud.

Construisez votre pipeline vidéo IA -- USD1 de crédit gratuit sur Atlas Cloud

Modèles associés

Commencez avec Plus de 300 Modèles,

Explorer tous les modèles

Join our Discord community

Join the Discord community for the latest model updates, prompts, and support.