La plupart des équipes débutent avec la génération vidéo par IA en effectuant des appels API ponctuels : générer une seule vidéo, la télécharger, puis passer à la suite. Cela fonctionne pour l'expérimentation.
Dernière mise à jour : 28 février 2026
Découvrez ces modèles en action :
j-qDCyXubyE
Architecture du pipeline
Avant d'écrire du code, voici l'architecture globale de ce que nous allons construire :
plaintext1``` 2+-------------------+ +--------------------+ +------------------+ 3| Configuration | | API Atlas Cloud | | Stockage Sortie | 4| (JSON/YAML) | | | | | 5| - prompts +---->+ /generateImage +---->+ /images/ | 6| - models | | /generateVideo | | /videos/ | 7| - parameters | | /prediction/get | | /manifest.json | 8+-------------------+ +--------------------+ +------------------+ 9 | | | 10 v v v 11+-------------------+ +--------------------+ +------------------+ 12| Moteur de Pipeline| | Polling & Retry | | Suivi des coûts | 13| | | | | | 14| - batch_generate | | - backoff | | - par requête | 15| - concurrence | | exponentiel | | - cumulé | 16| - routage modèle | | - max retries | | - par modèle | 17+-------------------+ +--------------------+ +------------------+ 18```
Le pipeline suit un flux simple :
- Lecture des configurations des prompts à partir d'un fichier d'entrée structuré.
- Routage de chaque prompt vers le modèle et le point de terminaison appropriés (image ou vidéo).
- Soumission de toutes les requêtes à l'API Atlas Cloud avec une concurrence contrôlée.
- Interrogation (polling) des résultats avec une logique de backoff exponentiel et de tentatives répétées.
- Téléchargement des résultats terminés et sauvegarde dans des répertoires organisés.
- Suivi des coûts et génération d'un manifeste récapitulatif.
Démarrage : Accès à l'API
Étape 1 : Obtenir votre clé API
Inscrivez-vous sur Atlas Cloud et créez une clé API depuis le tableau de bord. Le crédit gratuit de USD1 est suffisant pour tester le pipeline complet avec plusieurs générations d'images et de vidéos.


Étape 2 : Installer les dépendances
plaintext1```bash 2pip install requests pyyaml 3```
Aucun framework lourd n'est requis. Le pipeline utilise uniquement
1requests1pyyamlLe code complet du pipeline
Voici le pipeline opérationnel complet. Chaque section est expliquée après le bloc de code.
plaintext1```python 2import requests 3import time 4import json 5import os 6import logging 7from concurrent.futures import ThreadPoolExecutor, as_completed 8from dataclasses import dataclass, field 9from typing import Optional 10from datetime import datetime 11 12# Configuration de la journalisation 13logging.basicConfig( 14 level=logging.INFO, 15 format="%(asctime)s [%(levelname)s] %(message)s", 16 datefmt="%Y-%m-%d %H:%M:%S" 17) 18logger = logging.getLogger("atlas_pipeline") 19 20@dataclass 21class GenerationResult: 22 """Stocke le résultat d'une seule requête de génération.""" 23 name: str 24 model: str 25 media_type: str # "image" ou "video" 26 status: str # "success", "failed", "error" 27 output_url: Optional[str] = None 28 local_path: Optional[str] = None 29 cost_estimate: float = 0.0 30 duration_seconds: float = 0.0 31 error_message: Optional[str] = None 32 33class AtlasCloudClient: 34 """Client wrapper pour l'API Atlas Cloud.""" 35 36 BASE_URL = "https://api.atlascloud.ai/api/v1" 37 38 # Tarification par modèle (approximative) 39 PRICING = { 40 "black-forest-labs/flux-2-pro/text-to-image": 0.04, # par image 41 "google/imagen4-ultra/text-to-image": 0.06, # par image 42 "bytedance/seedance-v1.5-pro/text-to-video": 0.022, # par seconde 43 "google/veo3.1/text-to-video": 0.03, # par seconde 44 "openai/sora-v2/text-to-video": 0.15, # par seconde 45 } 46 47 def __init__(self, api_key: str): 48 self.api_key = api_key 49 self.session = requests.Session() 50 self.session.headers.update({ 51 "Authorization": f"Bearer {api_key}", 52 "Content-Type": "application/json" 53 }) 54 55 def generate_image( 56 self, 57 model: str, 58 prompt: str, 59 width: int = 1024, 60 height: int = 1024 61 ) -> dict: 62 """Soumet une requête de génération d'image.""" 63 response = self.session.post( 64 f"{self.BASE_URL}/model/generateImage", 65 json={ 66 "model": model, 67 "prompt": prompt, 68 "width": width, 69 "height": height 70 } 71 ) 72 response.raise_for_status() 73 return response.json() 74 75 def generate_video( 76 self, 77 model: str, 78 prompt: str, 79 duration: int = 5, 80 resolution: str = "1080p" 81 ) -> dict: 82 """Soumet une requête de génération de vidéo.""" 83 response = self.session.post( 84 f"{self.BASE_URL}/model/generateVideo", 85 json={ 86 "model": model, 87 "prompt": prompt, 88 "duration": duration, 89 "resolution": resolution 90 } 91 ) 92 response.raise_for_status() 93 return response.json() 94 95 def poll_result( 96 self, 97 request_id: str, 98 max_wait: int = 300, 99 initial_interval: int = 5, 100 max_interval: int = 30 101 ) -> Optional[dict]: 102 """Interroge le résultat avec un backoff exponentiel.""" 103 start_time = time.time() 104 interval = initial_interval 105 106 while time.time() - start_time < max_wait: 107 try: 108 response = self.session.get( 109 f"{self.BASE_URL}/model/prediction/{request_id}/get" 110 ) 111 data = response.json() 112 113 if data["status"] == "completed": 114 return data 115 elif data["status"] == "failed": 116 logger.error(f"Génération échouée : {data.get('error', 'Erreur inconnue')}") 117 return None 118 119 logger.debug(f"Statut : {data['status']}, attente {interval}s...") 120 time.sleep(interval) 121 interval = min(interval * 1.5, max_interval) 122 123 except requests.RequestException as e: 124 logger.warning(f"La requête de polling a échoué : {e}, nouvelle tentative dans {interval}s") 125 time.sleep(interval) 126 127 logger.error(f"Délai d'attente dépassé après {max_wait}s pour {request_id}") 128 return None 129 130 def estimate_cost(self, model: str, duration: int = 0) -> float: 131 """Estime le coût d'une requête.""" 132 base_price = self.PRICING.get(model, 0.05) 133 if "text-to-video" in model and duration > 0: 134 return base_price * duration 135 return base_price 136 137class VideoPipeline: 138 """Orchestre la génération par lots.""" 139 140 def __init__(self, api_key: str, output_dir: str = "pipeline_output"): 141 self.client = AtlasCloudClient(api_key) 142 self.output_dir = output_dir 143 self.results: list[GenerationResult] = [] 144 self.total_cost = 0.0 145 146 os.makedirs(os.path.join(output_dir, "images"), exist_ok=True) 147 os.makedirs(os.path.join(output_dir, "videos"), exist_ok=True) 148 149 def _download_file(self, url: str, filepath: str) -> bool: 150 try: 151 response = requests.get(url, timeout=60) 152 response.raise_for_status() 153 with open(filepath, "wb") as f: 154 f.write(response.content) 155 return True 156 except Exception as e: 157 logger.error(f"Téléchargement échoué pour {url} : {e}") 158 return False 159 160 def _safe_filename(self, name: str, extension: str) -> str: 161 safe = name.lower().replace(" ", "_") 162 safe = "".join(c for c in safe if c.isalnum() or c == "_") 163 return f"{safe}.{extension}" 164 165 def _process_image(self, name: str, model: str, prompt: str, 166 width: int = 1024, height: int = 1024, 167 retries: int = 2) -> GenerationResult: 168 start = time.time() 169 cost = self.client.estimate_cost(model) 170 171 for attempt in range(retries + 1): 172 try: 173 logger.info(f"[Image] Génération de '{name}' (tentative {attempt + 1})") 174 result = self.client.generate_image(model, prompt, width, height) 175 request_id = result["request_id"] 176 177 data = self.client.poll_result(request_id) 178 if data and data["status"] == "completed": 179 image_url = data["output"]["image_url"] 180 filename = self._safe_filename(name, "png") 181 filepath = os.path.join(self.output_dir, "images", filename) 182 self._download_file(image_url, filepath) 183 184 return GenerationResult( 185 name=name, model=model, media_type="image", 186 status="success", output_url=image_url, 187 local_path=filepath, cost_estimate=cost, 188 duration_seconds=time.time() - start 189 ) 190 except requests.HTTPError as e: 191 if e.response.status_code == 429: 192 wait = 2 ** (attempt + 2) 193 logger.warning(f"Limite de taux atteinte, attente {wait}s") 194 time.sleep(wait) 195 continue 196 logger.error(f"Erreur HTTP lors de la génération de '{name}' : {e}") 197 except Exception as e: 198 logger.error(f"Erreur lors de la génération de '{name}' : {e}") 199 200 if attempt < retries: 201 time.sleep(2 ** attempt) 202 203 return GenerationResult( 204 name=name, model=model, media_type="image", 205 status="failed", cost_estimate=0, 206 duration_seconds=time.time() - start, 207 error_message="Nombre maximum de tentatives atteint" 208 ) 209 210 def _process_video(self, name: str, model: str, prompt: str, 211 duration: int = 5, resolution: str = "1080p", 212 retries: int = 2) -> GenerationResult: 213 start = time.time() 214 cost = self.client.estimate_cost(model, duration) 215 216 for attempt in range(retries + 1): 217 try: 218 logger.info(f"[Video] Génération de '{name}' (tentative {attempt + 1})") 219 result = self.client.generate_video(model, prompt, duration, resolution) 220 request_id = result["request_id"] 221 222 data = self.client.poll_result(request_id, max_wait=600) 223 if data and data["status"] == "completed": 224 video_url = data["output"]["video_url"] 225 filename = self._safe_filename(name, "mp4") 226 filepath = os.path.join(self.output_dir, "videos", filename) 227 self._download_file(video_url, filepath) 228 229 return GenerationResult( 230 name=name, model=model, media_type="video", 231 status="success", output_url=video_url, 232 local_path=filepath, cost_estimate=cost, 233 duration_seconds=time.time() - start 234 ) 235 except requests.HTTPError as e: 236 if e.response.status_code == 429: 237 wait = 2 ** (attempt + 2) 238 logger.warning(f"Limite de taux atteinte, attente {wait}s") 239 time.sleep(wait) 240 continue 241 logger.error(f"Erreur HTTP lors de la génération de '{name}' : {e}") 242 except Exception as e: 243 logger.error(f"Erreur lors de la génération de '{name}' : {e}") 244 245 if attempt < retries: 246 time.sleep(2 ** (attempt + 1)) 247 248 return GenerationResult( 249 name=name, model=model, media_type="video", 250 status="failed", cost_estimate=0, 251 duration_seconds=time.time() - start, 252 error_message="Nombre maximum de tentatives atteint" 253 ) 254 255 def batch_generate(self, jobs: list[dict], max_workers: int = 3): 256 """Exécute un lot de tâches en parallèle.""" 257 logger.info(f"Démarrage d'un lot de {len(jobs)} tâches avec {max_workers} workers") 258 start_time = time.time() 259 260 with ThreadPoolExecutor(max_workers=max_workers) as executor: 261 futures = {} 262 for job in jobs: 263 if job["type"] == "image": 264 future = executor.submit( 265 self._process_image, 266 name=job["name"], 267 model=job["model"], 268 prompt=job["prompt"], 269 width=job.get("width", 1024), 270 height=job.get("height", 1024) 271 ) 272 elif job["type"] == "video": 273 future = executor.submit( 274 self._process_video, 275 name=job["name"], 276 model=job["model"], 277 prompt=job["prompt"], 278 duration=job.get("duration", 5), 279 resolution=job.get("resolution", "1080p") 280 ) 281 futures[future] = job["name"] 282 283 for future in as_completed(futures): 284 result = future.result() 285 self.results.append(result) 286 self.total_cost += result.cost_estimate 287 status_icon = "OK" if result.status == "success" else "FAIL" 288 logger.info( 289 f"[{status_icon}] {result.name} -- " 290 f"USD{result.cost_estimate:.3f} -- " 291 f"{result.duration_seconds:.1f}s" 292 ) 293 294 elapsed = time.time() - start_time 295 self._save_manifest() 296 self._print_summary(elapsed) 297 298 def _save_manifest(self): 299 manifest = { 300 "generated_at": datetime.now().isoformat(), 301 "total_cost": round(self.total_cost, 4), 302 "total_jobs": len(self.results), 303 "successful": sum(1 for r in self.results if r.status == "success"), 304 "failed": sum(1 for r in self.results if r.status != "success"), 305 "results": [ 306 { 307 "name": r.name, 308 "model": r.model, 309 "type": r.media_type, 310 "status": r.status, 311 "output_url": r.output_url, 312 "local_path": r.local_path, 313 "cost": round(r.cost_estimate, 4), 314 "generation_time": round(r.duration_seconds, 1), 315 "error": r.error_message 316 } 317 for r in self.results 318 ] 319 } 320 manifest_path = os.path.join(self.output_dir, "manifest.json") 321 with open(manifest_path, "w") as f: 322 json.dump(manifest, f, indent=2) 323 logger.info(f"Manifeste sauvegardé dans {manifest_path}") 324 325 def _print_summary(self, elapsed: float): 326 success = sum(1 for r in self.results if r.status == "success") 327 failed = len(self.results) - success 328 cost_by_model = {} 329 for r in self.results: 330 cost_by_model[r.model] = cost_by_model.get(r.model, 0) + r.cost_estimate 331 332 print("\n" + "=" * 60) 333 print("RÉSUMÉ DU PIPELINE") 334 print("=" * 60) 335 print(f"Total tâches: {len(self.results)}") 336 print(f"Réussies: {success}") 337 print(f"Échouées: {failed}") 338 print(f"Coût total: USD{self.total_cost:.4f}") 339 print(f"Temps total: {elapsed:.1f}s") 340 print(f"\nCoût par modèle:") 341 for model, cost in sorted(cost_by_model.items()): 342 short_name = model.split("/")[1] 343 print(f" {short_name}: USD{cost:.4f}") 344 print("=" * 60) 345```
Utilisation du pipeline
Avec les classes
1AtlasCloudClient1VideoPipelineUsage de base : Miniatures + Vidéos
plaintext1```python 2API_KEY = "votre-clé-api-atlas-cloud" 3 4pipeline = VideoPipeline(api_key=API_KEY, output_dir="contenu_hebdomadaire") 5 6jobs = [ 7 # Générer des miniatures avec Flux 2 Pro 8 { 9 "name": "Miniature Lancement Produit", 10 "type": "image", 11 "model": "black-forest-labs/flux-2-pro/text-to-image", 12 "prompt": "Miniature YouTube accrocheuse, texte en gras 'NOUVEAU LANCEMENT', " 13 "mise en avant produit sur fond dégradé sombre, couleurs " 14 "vives, design professionnel, 4K" 15 }, 16 17 # Générer des vidéos avec Seedance 1.5 Pro 18 { 19 "name": "Présentation Produit Seedance", 20 "type": "video", 21 "model": "bytedance/seedance-v1.5-pro/text-to-video", 22 "prompt": "Animation de révélation de produit élégante, gadget moderne " 23 "émergeant d'une lumière douce, rotation lente, fond blanc " 24 "minimaliste, éclairage cinématographique", 25 "duration": 10 26 }, 27 28 # Générer une vidéo cinématographique avec Veo 3.1 29 { 30 "name": "Vidéo Héros Veo", 31 "type": "video", 32 "model": "google/veo3.1/text-to-video", 33 "prompt": "Prise de vue aérienne cinématographique d'une ville moderne " 34 "à l'heure dorée, travelling avant lent, reflet d'objectif " 35 "du soleil couchant, sons ambiants, grain de film, " 36 "étalonnage professionnel", 37 "duration": 8 38 }, 39] 40 41pipeline.batch_generate(jobs, max_workers=3) 42```
Approche basée sur la configuration
Pour des pipelines récurrents, définissez vos tâches dans un fichier YAML :
plaintext1```yaml 2# pipeline_config.yaml 3output_dir: contenu_hebdomadaire 4max_workers: 3 5 6jobs: 7 - name: Image Héro Produit 8 type: image 9 model: google/imagen4-ultra/text-to-image 10 prompt: > 11 Photographie produit premium d'écouteurs sans fil dans leur boîtier, 12 surface sombre réfléchissante, éclairage dramatique, esthétique tech, 13 résolution 8K 14 width: 2048 15 height: 2048 16 17 - name: Vidéo Réseaux Sociaux 18 type: video 19 model: bytedance/seedance-v1.5-pro/text-to-video 20 prompt: > 21 Contenu réseaux sociaux tendance, mains déballant un produit tech, 22 détails en gros plan, lumière naturelle vive, format vertical 23 duration: 10 24```
Détails techniques clés
Polling avec Backoff Exponentiel
La génération vidéo peut prendre de 30 secondes à 5 minutes. Le pipeline utilise un backoff exponentiel pour interroger l'API efficacement sans la saturer, réduisant les appels inutiles d'environ 60 % par rapport à un intervalle fixe.
Gestion des limites de taux (Rate Limiting)
Lorsqu'une réponse HTTP 429 est reçue, le pipeline patiente selon une progression exponentielle (4s, 8s, 16s) plutôt que d'échouer immédiatement.
Contrôle de la concurrence
Le
1ThreadPoolExecutor1max_workers=3Suivi des coûts
Chaque requête bénéficie d'une estimation de coût basée sur la table de prix intégrée au
1AtlasCloudClientEstimation des coûts
| Scénario Pipeline | Tâches | Modèles | Coût Estimé |
|---|---|---|---|
| Pack réseaux sociaux | 10 images + 5 vidéos (5s) | Flux 2 Pro + Seedance 1.5 | USD0.95 |
| Campagne de lancement | 20 images + 10 vidéos (10s) | Mixte | USD3.80 |
Commencez à construire votre pipeline vidéo -- USD1 de crédit gratuit
Conseils de déploiement
Tâches Cron pour une génération planifiée
Vous pouvez automatiser l'exécution via
1cronplaintext10 6 * * 1 cd /chemin/vers/projet && python run_pipeline.py --config weekly.yaml
Architecture basée sur les files d'attente
Pour des déploiements plus larges, utilisez Celery ou Redis Queue afin de découpler la soumission des tâches du traitement, facilitant ainsi l'intégration avec une application web.
Verdict
Construire un pipeline vidéo par IA ne consiste pas seulement à écrire du code ingénieux, mais à bâtir une infrastructure fiable capable de gérer les réalités des intégrations API : timeouts, échecs, coûts et exécutions concurrentes. Le pipeline présenté ici répond à ces enjeux.
La combinaison de Flux 2 Pro pour la rapidité des images, Seedance 1.5 Pro pour la rentabilité vidéo, et Veo 3.1 pour la qualité cinématographique offre une solution complète, accessible via une seule clé API Atlas Cloud.
Construisez votre pipeline vidéo IA -- USD1 de crédit gratuit sur Atlas Cloud






