La mayoría de los equipos comienzan con la generación de vídeo por IA haciendo llamadas únicas a la API: generan un solo vídeo, lo descargan y siguen adelante. Eso funciona para experimentar.
*Última actualización: 28 de febrero de 2026*
Vea estos modelos en acción:
Arquitectura del pipeline
Antes de escribir el código, aquí está la arquitectura de alto nivel de lo que vamos a construir:
plaintext1``` 2+-------------------+ +--------------------+ +------------------+ 3| Prompt Config | | Atlas Cloud API | | Output Storage | 4| (JSON/YAML) | | | | | 5| - prompts +---->+ /generateImage +---->+ /images/ | 6| - models | | /generateVideo | | /videos/ | 7| - parameters | | /prediction/get | | /manifest.json | 8+-------------------+ +--------------------+ +------------------+ 9 | | | 10 v v v 11+-------------------+ +--------------------+ +------------------+ 12| Pipeline Engine | | Polling & Retry | | Cost Tracker | 13| | | | | | 14| - batch_generate | | - exponential | | - per-request | 15| - concurrency | | backoff | | - cumulative | 16| - model routing | | - max retries | | - per-model | 17+-------------------+ +--------------------+ +------------------+ 18```
El pipeline sigue un flujo sencillo:
- Leer configuraciones de prompts desde un archivo de entrada estructurado.
- Enviar cada prompt al modelo y endpoint apropiado (imagen o vídeo).
- Enviar todas las solicitudes a la API de Atlas Cloud con concurrencia controlada.
- Consultar (poll) los resultados con lógica de reintento y retroceso exponencial (exponential backoff).
- Descargar los resultados completados y guardarlos en directorios organizados.
- Realizar un seguimiento de costes y generar un manifiesto de resumen.
Primeros pasos: Acceso a la API
Paso 1: Obtén tu clave de API
Regístrate en Atlas Cloud y crea una clave de API desde el panel de control. El crédito gratuito de USD1 es suficiente para probar el pipeline completo con varias generaciones de imágenes y vídeos.


Paso 2: Instalar dependencias
plaintext1```bash 2pip install requests pyyaml 3```
No se requieren frameworks pesados. El pipeline utiliza únicamente
1requests1pyyamlCódigo completo del pipeline
A continuación se muestra el pipeline funcional completo. Cada sección se explica después del bloque de código.
plaintext1```python 2import requests 3import time 4import json 5import os 6import logging 7from concurrent.futures import ThreadPoolExecutor, as_completed 8from dataclasses import dataclass, field 9from typing import Optional 10from datetime import datetime 11 12# Configurar logging 13logging.basicConfig( 14 level=logging.INFO, 15 format="%(asctime)s [%(levelname)s] %(message)s", 16 datefmt="%Y-%m-%d %H:%M:%S" 17) 18logger = logging.getLogger("atlas_pipeline") 19 20@dataclass 21class GenerationResult: 22 """Almacena el resultado de una única solicitud de generación.""" 23 name: str 24 model: str 25 media_type: str # "image" o "video" 26 status: str # "success", "failed", "error" 27 output_url: Optional[str] = None 28 local_path: Optional[str] = None 29 cost_estimate: float = 0.0 30 duration_seconds: float = 0.0 31 error_message: Optional[str] = None 32 33class AtlasCloudClient: 34 """Cliente para la API de Atlas Cloud.""" 35 36 BASE_URL = "https://api.atlascloud.ai/api/v1" 37 38 # Precios por modelo (aproximados) 39 PRICING = { 40 "black-forest-labs/flux-2-pro/text-to-image": 0.04, # por imagen 41 "google/imagen4-ultra/text-to-image": 0.06, # por imagen 42 "bytedance/seedance-v1.5-pro/text-to-video": 0.022, # por segundo 43 "google/veo3.1/text-to-video": 0.03, # por segundo 44 "openai/sora-v2/text-to-video": 0.15, # por segundo 45 } 46 47 def __init__(self, api_key: str): 48 self.api_key = api_key 49 self.session = requests.Session() 50 self.session.headers.update({ 51 "Authorization": f"Bearer {api_key}", 52 "Content-Type": "application/json" 53 }) 54 55 def generate_image( 56 self, 57 model: str, 58 prompt: str, 59 width: int = 1024, 60 height: int = 1024 61 ) -> dict: 62 """Enviar una solicitud de generación de imagen.""" 63 response = self.session.post( 64 f"{self.BASE_URL}/model/generateImage", 65 json={ 66 "model": model, 67 "prompt": prompt, 68 "width": width, 69 "height": height 70 } 71 ) 72 response.raise_for_status() 73 return response.json() 74 75 def generate_video( 76 self, 77 model: str, 78 prompt: str, 79 duration: int = 5, 80 resolution: str = "1080p" 81 ) -> dict: 82 """Enviar una solicitud de generación de vídeo.""" 83 response = self.session.post( 84 f"{self.BASE_URL}/model/generateVideo", 85 json={ 86 "model": model, 87 "prompt": prompt, 88 "duration": duration, 89 "resolution": resolution 90 } 91 ) 92 response.raise_for_status() 93 return response.json() 94 95 def poll_result( 96 self, 97 request_id: str, 98 max_wait: int = 300, 99 initial_interval: int = 5, 100 max_interval: int = 30 101 ) -> Optional[dict]: 102 """Consultar el resultado con retroceso exponencial.""" 103 start_time = time.time() 104 interval = initial_interval 105 106 while time.time() - start_time < max_wait: 107 try: 108 response = self.session.get( 109 f"{self.BASE_URL}/model/prediction/{request_id}/get" 110 ) 111 data = response.json() 112 113 if data["status"] == "completed": 114 return data 115 elif data["status"] == "failed": 116 logger.error(f"La generación falló: {data.get('error', 'Error desconocido')}") 117 return None 118 119 logger.debug(f"Estado: {data['status']}, esperando {interval}s...") 120 time.sleep(interval) 121 interval = min(interval * 1.5, max_interval) 122 123 except requests.RequestException as e: 124 logger.warning(f"La consulta falló: {e}, reintentando en {interval}s") 125 time.sleep(interval) 126 127 logger.error(f"Tiempo de espera agotado después de {max_wait}s para {request_id}") 128 return None 129 130 def estimate_cost(self, model: str, duration: int = 0) -> float: 131 """Estimar el coste de una solicitud.""" 132 base_price = self.PRICING.get(model, 0.05) 133 if "text-to-video" in model and duration > 0: 134 return base_price * duration 135 return base_price 136 137class VideoPipeline: 138 """Orquesta la generación por lotes de imágenes y vídeos.""" 139 140 def __init__(self, api_key: str, output_dir: str = "pipeline_output"): 141 self.client = AtlasCloudClient(api_key) 142 self.output_dir = output_dir 143 self.results: list[GenerationResult] = [] 144 self.total_cost = 0.0 145 146 os.makedirs(os.path.join(output_dir, "images"), exist_ok=True) 147 os.makedirs(os.path.join(output_dir, "videos"), exist_ok=True) 148 149 def _download_file(self, url: str, filepath: str) -> bool: 150 try: 151 response = requests.get(url, timeout=60) 152 response.raise_for_status() 153 with open(filepath, "wb") as f: 154 f.write(response.content) 155 return True 156 except Exception as e: 157 logger.error(f"Error en descarga para {url}: {e}") 158 return False 159 160 def _safe_filename(self, name: str, extension: str) -> str: 161 safe = name.lower().replace(" ", "_") 162 safe = "".join(c for c in safe if c.isalnum() or c == "_") 163 return f"{safe}.{extension}" 164 165 def _process_image(self, name: str, model: str, prompt: str, 166 width: int = 1024, height: int = 1024, 167 retries: int = 2) -> GenerationResult: 168 start = time.time() 169 cost = self.client.estimate_cost(model) 170 171 for attempt in range(retries + 1): 172 try: 173 logger.info(f"[Imagen] Generando '{name}' (intento {attempt + 1})") 174 result = self.client.generate_image(model, prompt, width, height) 175 request_id = result["request_id"] 176 177 data = self.client.poll_result(request_id) 178 if data and data["status"] == "completed": 179 image_url = data["output"]["image_url"] 180 filename = self._safe_filename(name, "png") 181 filepath = os.path.join(self.output_dir, "images", filename) 182 self._download_file(image_url, filepath) 183 184 return GenerationResult( 185 name=name, model=model, media_type="image", 186 status="success", output_url=image_url, 187 local_path=filepath, cost_estimate=cost, 188 duration_seconds=time.time() - start 189 ) 190 except requests.HTTPError as e: 191 if e.response.status_code == 429: 192 wait = 2 ** (attempt + 2) 193 logger.warning(f"Límite de tasa alcanzado, esperando {wait}s") 194 time.sleep(wait) 195 continue 196 logger.error(f"Error HTTP generando '{name}': {e}") 197 except Exception as e: 198 logger.error(f"Error generando '{name}': {e}") 199 200 if attempt < retries: 201 time.sleep(2 ** attempt) 202 203 return GenerationResult( 204 name=name, model=model, media_type="image", 205 status="failed", cost_estimate=0, 206 duration_seconds=time.time() - start, 207 error_message="Máximo de reintentos excedido" 208 ) 209 210 def _process_video(self, name: str, model: str, prompt: str, 211 duration: int = 5, resolution: str = "1080p", 212 retries: int = 2) -> GenerationResult: 213 start = time.time() 214 cost = self.client.estimate_cost(model, duration) 215 216 for attempt in range(retries + 1): 217 try: 218 logger.info(f"[Vídeo] Generando '{name}' (intento {attempt + 1})") 219 result = self.client.generate_video(model, prompt, duration, resolution) 220 request_id = result["request_id"] 221 222 data = self.client.poll_result(request_id, max_wait=600) 223 if data and data["status"] == "completed": 224 video_url = data["output"]["video_url"] 225 filename = self._safe_filename(name, "mp4") 226 filepath = os.path.join(self.output_dir, "videos", filename) 227 self._download_file(video_url, filepath) 228 229 return GenerationResult( 230 name=name, model=model, media_type="video", 231 status="success", output_url=video_url, 232 local_path=filepath, cost_estimate=cost, 233 duration_seconds=time.time() - start 234 ) 235 except requests.HTTPError as e: 236 if e.response.status_code == 429: 237 wait = 2 ** (attempt + 2) 238 logger.warning(f"Límite de tasa alcanzado, esperando {wait}s") 239 time.sleep(wait) 240 continue 241 logger.error(f"Error HTTP generando '{name}': {e}") 242 except Exception as e: 243 logger.error(f"Error generando '{name}': {e}") 244 245 if attempt < retries: 246 time.sleep(2 ** (attempt + 1)) 247 248 return GenerationResult( 249 name=name, model=model, media_type="video", 250 status="failed", cost_estimate=0, 251 duration_seconds=time.time() - start, 252 error_message="Máximo de reintentos excedido" 253 ) 254 255 def batch_generate(self, jobs: list[dict], max_workers: int = 3): 256 logger.info(f"Iniciando lote de {len(jobs)} trabajos con {max_workers} hilos") 257 start_time = time.time() 258 259 with ThreadPoolExecutor(max_workers=max_workers) as executor: 260 futures = {} 261 for job in jobs: 262 if job["type"] == "image": 263 future = executor.submit( 264 self._process_image, 265 name=job["name"], 266 model=job["model"], 267 prompt=job["prompt"], 268 width=job.get("width", 1024), 269 height=job.get("height", 1024) 270 ) 271 elif job["type"] == "video": 272 future = executor.submit( 273 self._process_video, 274 name=job["name"], 275 model=job["model"], 276 prompt=job["prompt"], 277 duration=job.get("duration", 5), 278 resolution=job.get("resolution", "1080p") 279 ) 280 else: 281 logger.warning(f"Tipo de trabajo desconocido: {job['type']}") 282 continue 283 futures[future] = job["name"] 284 285 for future in as_completed(futures): 286 result = future.result() 287 self.results.append(result) 288 self.total_cost += result.cost_estimate 289 status_icon = "OK" if result.status == "success" else "FAIL" 290 logger.info( 291 f"[{status_icon}] {result.name} -- " 292 f"USD{result.cost_estimate:.3f} -- " 293 f"{result.duration_seconds:.1f}s" 294 ) 295 296 elapsed = time.time() - start_time 297 self._save_manifest() 298 self._print_summary(elapsed) 299 300 def _save_manifest(self): 301 manifest = { 302 "generated_at": datetime.now().isoformat(), 303 "total_cost": round(self.total_cost, 4), 304 "total_jobs": len(self.results), 305 "successful": sum(1 for r in self.results if r.status == "success"), 306 "failed": sum(1 for r in self.results if r.status != "success"), 307 "results": [ 308 { 309 "name": r.name, 310 "model": r.model, 311 "type": r.media_type, 312 "status": r.status, 313 "output_url": r.output_url, 314 "local_path": r.local_path, 315 "cost": round(r.cost_estimate, 4), 316 "generation_time": round(r.duration_seconds, 1), 317 "error": r.error_message 318 } 319 for r in self.results 320 ] 321 } 322 manifest_path = os.path.join(self.output_dir, "manifest.json") 323 with open(manifest_path, "w") as f: 324 json.dump(manifest, f, indent=2) 325 logger.info(f"Manifiesto guardado en {manifest_path}") 326 327 def _print_summary(self, elapsed: float): 328 success = sum(1 for r in self.results if r.status == "success") 329 failed = len(self.results) - success 330 cost_by_model = {} 331 for r in self.results: 332 cost_by_model[r.model] = cost_by_model.get(r.model, 0) + r.cost_estimate 333 334 print("\n" + "=" * 60) 335 print("RESUMEN DEL PIPELINE") 336 print("=" * 60) 337 print(f"Total trabajos: {len(self.results)}") 338 print(f"Exitosos: {success}") 339 print(f"Fallidos: {failed}") 340 print(f"Coste total: USD{self.total_cost:.4f}") 341 print(f"Tiempo total: {elapsed:.1f}s") 342 print(f"\nCoste por modelo:") 343 for model, cost in sorted(cost_by_model.items()): 344 short_name = model.split("/")[1] 345 print(f" {short_name}: USD{cost:.4f}") 346 print("=" * 60) 347```
Uso del pipeline
Con las clases
1AtlasCloudClient1VideoPipelineUso básico: Miniaturas + Vídeos
plaintext1```python 2API_KEY = "tu-clave-api-de-atlas-cloud" 3 4pipeline = VideoPipeline(api_key=API_KEY, output_dir="contenido_semanal") 5 6jobs = [ 7 # Generar miniaturas con Flux 2 Pro 8 { 9 "name": "Miniatura Lanzamiento", 10 "type": "image", 11 "model": "black-forest-labs/flux-2-pro/text-to-image", 12 "prompt": "Miniatura de YouTube llamativa, texto en negrita 'NUEVO LANZAMIENTO', " 13 "foco en producto sobre fondo degradado oscuro, colores acentuados vibrantes, " 14 "diseño profesional, 4K" 15 }, 16 { 17 "name": "Miniatura Tutorial", 18 "type": "image", 19 "model": "black-forest-labs/flux-2-pro/text-to-image", 20 "prompt": "Miniatura de YouTube para tutorial de programación, pantalla dividida " 21 "mostrando editor de código y resultado final, estética tecnológica, " 22 "diseño limpio y moderno, texto legible y grande" 23 }, 24 25 # Generar vídeos con Seedance 2.0 (económico) 26 { 27 "name": "Showcase Producto", 28 "type": "video", 29 "model": "bytedance/seedance-v1.5-pro/text-to-video", 30 "prompt": "Animación elegante de presentación de producto, gadget moderno emergiendo " 31 "de una luz suave, girando lentamente para mostrar todos los ángulos, " 32 "fondo blanco minimalista, iluminación cinematográfica", 33 "duration": 10 34 }, 35 36 # Generar vídeo cinematográfico con Veo 3.1 (con audio) 37 { 38 "name": "Vídeo Hero", 39 "type": "video", 40 "model": "google/veo3.1/text-to-video", 41 "prompt": "Plano aéreo cinematográfico de un skyline moderno durante la hora dorada, " 42 "cámara avanzando lentamente, destello de lente del sol poniente, " 43 "sonidos ambientales de ciudad, grano de película, " 44 "corrección de color profesional", 45 "duration": 8 46 }, 47] 48 49pipeline.batch_generate(jobs, max_workers=3) 50```
Enfoque basado en configuración (YAML)
Para pipelines recurrentes, define los trabajos en un archivo de configuración YAML:
plaintext1```yaml 2# pipeline_config.yaml 3output_dir: contenido_semanal 4max_workers: 3 5 6jobs: 7 - name: Imagen Hero Producto 8 type: image 9 model: google/imagen4-ultra/text-to-image 10 prompt: > 11 Fotografía de producto premium de auriculares inalámbricos en estuche de carga, 12 superficie reflectante oscura, iluminación dramática, estética tecnológica de lujo, 13 resolución 8K, calidad comercial 14 width: 2048 15 height: 2048 16 17 - name: Vídeo Redes Sociales 18 type: video 19 model: bytedance/seedance-v1.5-pro/text-to-video 20 prompt: > 21 Contenido de tendencia para redes sociales, manos abriendo un producto tecnológico premium, 22 momento de revelación satisfactorio, detalles en primer plano, iluminación natural brillante, 23 formato vertical 24 duration: 10 25 resolution: 1080p 26 27 - name: Anuncio Cinematográfico 28 type: video 29 model: google/veo3.1/text-to-video 30 prompt: > 31 Comercial cinematográfico para auriculares premium, persona poniéndose 32 auriculares en una cafetería concurrida, el mundo se silencia, profundidad de campo 33 superficial, paleta de colores cálidos, sonidos ambientales de café desvaneciéndose al silencio 34 duration: 8 35 resolution: 1080p 36```
Cargar y ejecutar:
plaintext1```python 2import yaml 3 4with open("pipeline_config.yaml") as f: 5 config = yaml.safe_load(f) 6 7pipeline = VideoPipeline( 8 api_key=API_KEY, 9 output_dir=config["output_dir"] 10) 11pipeline.batch_generate( 12 config["jobs"], 13 max_workers=config.get("max_workers", 3) 14) 15```
Detalles clave de implementación
Consulta con retroceso exponencial
La generación de vídeo toma desde 30 segundos hasta 5 minutos según el modelo y la duración. El pipeline utiliza retroceso exponencial para consultar eficientemente sin saturar la API:
plaintext1```python 2interval = initial_interval # comienza en 5s 3while time.time() - start_time < max_wait: 4 # ... verificar estado ... 5 time.sleep(interval) 6 interval = min(interval * 1.5, max_interval) # crece hasta un máximo de 30s 7```
Esto significa que las primeras consultas ocurren a intervalos de 5 segundos, luego se van espaciando gradualmente hasta 30 segundos para generaciones más largas, reduciendo las llamadas innecesarias a la API en un 60% aproximadamente.
Gestión de límites de tasa
Cuando la API devuelve un estado 429 (límite de tasa), el pipeline retrocede exponencialmente en lugar de fallar inmediatamente:
plaintext1```python 2except requests.HTTPError as e: 3 if e.response.status_code == 429: 4 wait = 2 ** (attempt + 2) # 4s, 8s, 16s 5 logger.warning(f"Límite de tasa alcanzado, esperando {wait}s") 6 time.sleep(wait) 7 continue 8```
Esto es esencial para operaciones por lotes donde muchas solicitudes concurrentes pueden exceder temporalmente los límites.
Control de concurrencia
El
1ThreadPoolExecutorplaintext1```python 2with ThreadPoolExecutor(max_workers=3) as executor: 3 futures = {executor.submit(process, job): job for job in jobs} 4```
Comienza con
1max_workers=3Seguimiento de costes
Cada solicitud de generación obtiene una estimación de coste basada en la tabla de precios del modelo:
plaintext1```python 2PRICING = { 3 "black-forest-labs/flux-2-pro/text-to-image": 0.04, 4 "bytedance/seedance-v1.5-pro/text-to-video": 0.022, # por segundo 5 "google/veo3.1/text-to-video": 0.03, # por segundo 6} 7```
Para los modelos de vídeo, el coste escala con la duración. El archivo de manifiesto rastrea los costes por solicitud y los acumulados para el control presupuestario.
Estimación de costes para ejecuciones del pipeline
Aquí tienes lo que cuestan las ejecuciones típicas del pipeline:
| Escenario de Pipeline | Trabajos | Modelos usados | Coste est. | Tiempo est. |
|---|---|---|---|---|
| Pack semanal redes sociales | 10 imágenes + 5 vídeos (5s c/u) | Flux 2 Pro + Seedance 2.0 | USD0.95 | ~10 min |
| Campaña lanzamiento | 20 imágenes + 10 vídeos (10s c/u) | Flux 2 Pro + Imagen 4 Ultra + Seedance 2.0 | USD3.80 | ~25 min |
| Biblioteca mensual | 50 imágenes + 20 vídeos (8s c/u) | Mezcla | USD7.50 | ~45 min |
| Catálogo e-commerce (500 SKUs) | 500 imágenes | Flux 2 Pro | USD20.00 | ~30 min |
| Serie de anuncios cinemáticos | 5 imágenes + 5 vídeos (8s c/u) | Imagen 4 Ultra + Veo 3.1 | USD1.50 | ~20 min |
Comparativa de costes Seedance 2.0 frente a Veo 3.1 para el mismo vídeo:
| Modelo | Vídeo 5s | Vídeo 10s | Vídeo 15s |
|---|---|---|---|
| Seedance 2.0 (Rápido) | USD0.11 | USD0.22 | USD0.33 |
| Veo 3.1 | USD0.15 | USD0.30 | N/A (máx 8s) |
| Sora 2 | USD0.75 | USD1.50 | USD2.25 |
Seedance 2.0 es la opción más económica para generación de gran volumen. Veo 3.1 ofrece un buen equilibrio de calidad y coste para clips cinematográficos cortos. Sora 2 es considerablemente más caro, pero ofrece una simulación física inigualable.
Comienza a construir tu pipeline de vídeo -- USD1 de crédito gratis
Consejos de despliegue
Cron Jobs para generación programada
Ejecuta el pipeline mediante un cron:
plaintext1```bash 2# Generar contenido semanal cada lunes a las 6 AM 30 6 * * 1 cd /ruta/al/proyecto && python run_pipeline.py --config semanal.yaml 4```
Arquitectura basada en colas
Para despliegues más grandes, utiliza una cola de tareas como Celery o Redis Queue para desacoplar la creación del trabajo de su procesamiento:
plaintext1```python 2# tasks.py (ejemplo Celery) 3import os 4from celery import Celery 5from pipeline import AtlasCloudClient 6 7app = Celery("video_tasks", broker="redis://localhost:6379") 8client = AtlasCloudClient(os.environ["ATLAS_CLOUD_API_KEY"]) 9 10@app.task(bind=True, max_retries=3) 11def generate_video_task(self, prompt, model, duration): 12 try: 13 result = client.generate_video(model, prompt, duration) 14 data = client.poll_result(result["request_id"]) 15 if data and data["status"] == "completed": 16 return {"url": data["output"]["video_url"], "status": "success"} 17 return {"status": "failed"} 18 except Exception as e: 19 self.retry(countdown=60, exc=e) 20```
Gestión de variables de entorno
Nunca guardes claves de API en el código fuente. Utiliza variables de entorno:
plaintext1```python 2import os 3 4API_KEY = os.environ.get("ATLAS_CLOUD_API_KEY") 5if not API_KEY: 6 raise ValueError("Variable de entorno ATLAS_CLOUD_API_KEY no establecida") 7```
Veredicto
Construir un pipeline de vídeo por IA no consiste en escribir código inteligente, sino en contar con una infraestructura fiable que gestione las realidades de la integración con APIs: límites de tasa, tiempos de espera, errores, seguimiento de costes y ejecución concurrente. El pipeline de esta guía aborda todo esto. Cópialo, personaliza los prompts y modelos para tu caso de uso, y despliégalo según una programación o mediante una cola.
La combinación de Flux 2 Pro para generación rápida de imágenes, Seedance 2.0 para vídeo rentable y Veo 3.1 para clips cinematográficos te ofrece cobertura para todas las necesidades de producción de contenido. Los tres modelos son accesibles a través de una sola clave de API de Atlas Cloud.
Construye tu pipeline de vídeo con IA -- USD1 de crédito gratis en Atlas Cloud






