A maioria das equipes começa com a geração de vídeo por IA fazendo chamadas de API pontuais — gera um único vídeo, faz o download e segue em frente. Isso funciona para experimentação.
*Última atualização: 28 de fevereiro de 2026*
Veja estes modelos em ação:
j-qDCyXubyE
Arquitetura do Pipeline
Antes de escrever o código, aqui está a arquitetura de alto nível do que estamos construindo:
plaintext1``` 2+-------------------+ +--------------------+ +------------------+ 3| Config. de Prompt | | API Atlas Cloud | | Armazenamento | 4| (JSON/YAML) | | | | de Saída | 5| - prompts +---->+ /generateImage +---->+ /images/ | 6| - models | | /generateVideo | | /videos/ | 7| - parameters | | /prediction/get | | /manifest.json | 8+-------------------+ +--------------------+ +------------------+ 9 | | | 10 v v v 11+-------------------+ +--------------------+ +------------------+ 12| Motor de Pipeline| | Polling & Retry | | Rastreio de | 13| | | | | Custos | 14| - batch_generate | | - exponential | | | 15| - concurrency | | backoff | | - por requisição| 16| - roteamento | | - max retries | | - cumulativo | 17+-------------------+ +--------------------+ +------------------+ 18```
O pipeline segue um fluxo simples:
- Ler configurações de prompt de um arquivo de entrada estruturado.
- Roteia cada prompt para o modelo e endpoint apropriado (imagem ou vídeo).
- Envia todas as requisições para a API do Atlas Cloud com concorrência controlada.
- Verifica resultados (polling) com lógica de espera exponencial (backoff) e novas tentativas.
- Faz o download das saídas concluídas e salva em diretórios organizados.
- Rastreia custos e gera um manifesto de resumo.
Primeiros Passos: Acesso à API
Passo 1: Obtenha sua chave de API
Cadastre-se no Atlas Cloud e crie uma chave de API no dashboard. O crédito gratuito de USD1 é suficiente para testar o pipeline completo com várias gerações de imagem e vídeo.


Passo 2: Instale as dependências
plaintext1```bash 2pip install requests pyyaml 3```
Não são necessários frameworks pesados. O pipeline utiliza apenas
1requests1pyyamlO Código Completo do Pipeline
Abaixo está o pipeline funcional completo. Cada seção é explicada após o bloco de código.
plaintext1```python 2import requests 3import time 4import json 5import os 6import logging 7from concurrent.futures import ThreadPoolExecutor, as_completed 8from dataclasses import dataclass, field 9from typing import Optional 10from datetime import datetime 11 12# Configurar logs 13logging.basicConfig( 14 level=logging.INFO, 15 format="%(asctime)s [%(levelname)s] %(message)s", 16 datefmt="%Y-%m-%d %H:%M:%S" 17) 18logger = logging.getLogger("atlas_pipeline") 19 20@dataclass 21class GenerationResult: 22 """Armazena o resultado de uma única requisição de geração.""" 23 name: str 24 model: str 25 media_type: str # "image" ou "video" 26 status: str # "success", "failed", "error" 27 output_url: Optional[str] = None 28 local_path: Optional[str] = None 29 cost_estimate: float = 0.0 30 duration_seconds: float = 0.0 31 error_message: Optional[str] = None 32 33class AtlasCloudClient: 34 """Wrapper de cliente para a API do Atlas Cloud.""" 35 36 BASE_URL = "https://api.atlascloud.ai/api/v1" 37 38 # Preços por modelo (aproximados) 39 PRICING = { 40 "black-forest-labs/flux-2-pro/text-to-image": 0.04, # por imagem 41 "google/imagen4-ultra/text-to-image": 0.06, # por imagem 42 "bytedance/seedance-v1.5-pro/text-to-video": 0.022, # por segundo 43 "google/veo3.1/text-to-video": 0.03, # por segundo 44 "openai/sora-v2/text-to-video": 0.15, # por segundo 45 } 46 47 def __init__(self, api_key: str): 48 self.api_key = api_key 49 self.session = requests.Session() 50 self.session.headers.update({ 51 "Authorization": f"Bearer {api_key}", 52 "Content-Type": "application/json" 53 }) 54 55 def generate_image( 56 self, 57 model: str, 58 prompt: str, 59 width: int = 1024, 60 height: int = 1024 61 ) -> dict: 62 """Envia uma requisição de geração de imagem.""" 63 response = self.session.post( 64 f"{self.BASE_URL}/model/generateImage", 65 json={ 66 "model": model, 67 "prompt": prompt, 68 "width": width, 69 "height": height 70 } 71 ) 72 response.raise_for_status() 73 return response.json() 74 75 def generate_video( 76 self, 77 model: str, 78 prompt: str, 79 duration: int = 5, 80 resolution: str = "1080p" 81 ) -> dict: 82 """Envia uma requisição de geração de vídeo.""" 83 response = self.session.post( 84 f"{self.BASE_URL}/model/generateVideo", 85 json={ 86 "model": model, 87 "prompt": prompt, 88 "duration": duration, 89 "resolution": resolution 90 } 91 ) 92 response.raise_for_status() 93 return response.json() 94 95 def poll_result( 96 self, 97 request_id: str, 98 max_wait: int = 300, 99 initial_interval: int = 5, 100 max_interval: int = 30 101 ) -> Optional[dict]: 102 """Verifica o resultado da geração com exponential backoff.""" 103 start_time = time.time() 104 interval = initial_interval 105 106 while time.time() - start_time < max_wait: 107 try: 108 response = self.session.get( 109 f"{self.BASE_URL}/model/prediction/{request_id}/get" 110 ) 111 data = response.json() 112 113 if data["status"] == "completed": 114 return data 115 elif data["status"] == "failed": 116 logger.error(f"Geração falhou: {data.get('error', 'Erro desconhecido')}") 117 return None 118 119 logger.debug(f"Status: {data['status']}, aguardando {interval}s...") 120 time.sleep(interval) 121 interval = min(interval * 1.5, max_interval) 122 123 except requests.RequestException as e: 124 logger.warning(f"Requisição de poll falhou: {e}, tentando novamente em {interval}s") 125 time.sleep(interval) 126 127 logger.error(f"Tempo limite excedido após {max_wait}s aguardando {request_id}") 128 return None 129 130 def estimate_cost(self, model: str, duration: int = 0) -> float: 131 """Estima o custo de uma requisição de geração.""" 132 base_price = self.PRICING.get(model, 0.05) 133 if "text-to-video" in model and duration > 0: 134 return base_price * duration 135 return base_price 136 137class VideoPipeline: 138 """Orquestra a geração em lote de imagens e vídeos.""" 139 140 def __init__(self, api_key: str, output_dir: str = "pipeline_output"): 141 self.client = AtlasCloudClient(api_key) 142 self.output_dir = output_dir 143 self.results: list[GenerationResult] = [] 144 self.total_cost = 0.0 145 146 os.makedirs(os.path.join(output_dir, "images"), exist_ok=True) 147 os.makedirs(os.path.join(output_dir, "videos"), exist_ok=True) 148 149 def _download_file(self, url: str, filepath: str) -> bool: 150 try: 151 response = requests.get(url, timeout=60) 152 response.raise_for_status() 153 with open(filepath, "wb") as f: 154 f.write(response.content) 155 return True 156 except Exception as e: 157 logger.error(f"Download falhou para {url}: {e}") 158 return False 159 160 def _safe_filename(self, name: str, extension: str) -> str: 161 safe = name.lower().replace(" ", "_") 162 safe = "".join(c for c in safe if c.isalnum() or c == "_") 163 return f"{safe}.{extension}" 164 165 def _process_image(self, name: str, model: str, prompt: str, 166 width: int = 1024, height: int = 1024, 167 retries: int = 2) -> GenerationResult: 168 start = time.time() 169 cost = self.client.estimate_cost(model) 170 171 for attempt in range(retries + 1): 172 try: 173 logger.info(f"[Imagem] Gerando '{name}' (tentativa {attempt + 1})") 174 result = self.client.generate_image(model, prompt, width, height) 175 request_id = result["request_id"] 176 177 data = self.client.poll_result(request_id) 178 if data and data["status"] == "completed": 179 image_url = data["output"]["image_url"] 180 filename = self._safe_filename(name, "png") 181 filepath = os.path.join(self.output_dir, "images", filename) 182 self._download_file(image_url, filepath) 183 184 return GenerationResult( 185 name=name, model=model, media_type="image", 186 status="success", output_url=image_url, 187 local_path=filepath, cost_estimate=cost, 188 duration_seconds=time.time() - start 189 ) 190 except requests.HTTPError as e: 191 if e.response.status_code == 429: 192 wait = 2 ** (attempt + 2) 193 logger.warning(f"Limite de taxa excedido, aguardando {wait}s") 194 time.sleep(wait) 195 continue 196 logger.error(f"Erro HTTP gerando '{name}': {e}") 197 except Exception as e: 198 logger.error(f"Erro gerando '{name}': {e}") 199 200 if attempt < retries: 201 time.sleep(2 ** attempt) 202 203 return GenerationResult( 204 name=name, model=model, media_type="image", 205 status="failed", cost_estimate=0, 206 duration_seconds=time.time() - start, 207 error_message="Máximo de retentativas excedido" 208 ) 209 210 def _process_video(self, name: str, model: str, prompt: str, 211 duration: int = 5, resolution: str = "1080p", 212 retries: int = 2) -> GenerationResult: 213 start = time.time() 214 cost = self.client.estimate_cost(model, duration) 215 216 for attempt in range(retries + 1): 217 try: 218 logger.info(f"[Vídeo] Gerando '{name}' (tentativa {attempt + 1})") 219 result = self.client.generate_video(model, prompt, duration, resolution) 220 request_id = result["request_id"] 221 222 data = self.client.poll_result(request_id, max_wait=600) 223 if data and data["status"] == "completed": 224 video_url = data["output"]["video_url"] 225 filename = self._safe_filename(name, "mp4") 226 filepath = os.path.join(self.output_dir, "videos", filename) 227 self._download_file(video_url, filepath) 228 229 return GenerationResult( 230 name=name, model=model, media_type="video", 231 status="success", output_url=video_url, 232 local_path=filepath, cost_estimate=cost, 233 duration_seconds=time.time() - start 234 ) 235 except requests.HTTPError as e: 236 if e.response.status_code == 429: 237 wait = 2 ** (attempt + 2) 238 logger.warning(f"Limite de taxa excedido, aguardando {wait}s") 239 time.sleep(wait) 240 continue 241 logger.error(f"Erro HTTP gerando '{name}': {e}") 242 except Exception as e: 243 logger.error(f"Erro gerando '{name}': {e}") 244 245 if attempt < retries: 246 time.sleep(2 ** (attempt + 1)) 247 248 return GenerationResult( 249 name=name, model=model, media_type="video", 250 status="failed", cost_estimate=0, 251 duration_seconds=time.time() - start, 252 error_message="Máximo de retentativas excedido" 253 ) 254 255 def batch_generate(self, jobs: list[dict], max_workers: int = 3): 256 logger.info(f"Iniciando lote de {len(jobs)} trabalhos com {max_workers} workers") 257 start_time = time.time() 258 259 with ThreadPoolExecutor(max_workers=max_workers) as executor: 260 futures = {} 261 for job in jobs: 262 if job["type"] == "image": 263 future = executor.submit( 264 self._process_image, 265 name=job["name"], 266 model=job["model"], 267 prompt=job["prompt"], 268 width=job.get("width", 1024), 269 height=job.get("height", 1024) 270 ) 271 elif job["type"] == "video": 272 future = executor.submit( 273 self._process_video, 274 name=job["name"], 275 model=job["model"], 276 prompt=job["prompt"], 277 duration=job.get("duration", 5), 278 resolution=job.get("resolution", "1080p") 279 ) 280 else: 281 logger.warning(f"Tipo de trabalho desconhecido: {job['type']}") 282 continue 283 futures[future] = job["name"] 284 285 for future in as_completed(futures): 286 result = future.result() 287 self.results.append(result) 288 self.total_cost += result.cost_estimate 289 status_icon = "OK" if result.status == "success" else "FAIL" 290 logger.info( 291 f"[{status_icon}] {result.name} -- " 292 f"USD{result.cost_estimate:.3f} -- " 293 f"{result.duration_seconds:.1f}s" 294 ) 295 296 elapsed = time.time() - start_time 297 self._save_manifest() 298 self._print_summary(elapsed) 299 300 def _save_manifest(self): 301 manifest = { 302 "generated_at": datetime.now().isoformat(), 303 "total_cost": round(self.total_cost, 4), 304 "total_jobs": len(self.results), 305 "successful": sum(1 for r in self.results if r.status == "success"), 306 "failed": sum(1 for r in self.results if r.status != "success"), 307 "results": [ 308 { 309 "name": r.name, 310 "model": r.model, 311 "type": r.media_type, 312 "status": r.status, 313 "output_url": r.output_url, 314 "local_path": r.local_path, 315 "cost": round(r.cost_estimate, 4), 316 "generation_time": round(r.duration_seconds, 1), 317 "error": r.error_message 318 } 319 for r in self.results 320 ] 321 } 322 manifest_path = os.path.join(self.output_dir, "manifest.json") 323 with open(manifest_path, "w") as f: 324 json.dump(manifest, f, indent=2) 325 logger.info(f"Manifesto salvo em {manifest_path}") 326 327 def _print_summary(self, elapsed: float): 328 success = sum(1 for r in self.results if r.status == "success") 329 failed = len(self.results) - success 330 cost_by_model = {} 331 for r in self.results: 332 cost_by_model[r.model] = cost_by_model.get(r.model, 0) + r.cost_estimate 333 334 print("\n" + "=" * 60) 335 print("RESUMO DO PIPELINE") 336 print("=" * 60) 337 print(f"Total de trabalhos: {len(self.results)}") 338 print(f"Bem-sucedidos: {success}") 339 print(f"Falhas: {failed}") 340 print(f"Custo total: USD{self.total_cost:.4f}") 341 print(f"Tempo total: {elapsed:.1f}s") 342 print(f"\nCusto por modelo:") 343 for model, cost in sorted(cost_by_model.items()): 344 short_name = model.split("/")[1] 345 print(f" {short_name}: USD{cost:.4f}") 346 print("=" * 60) 347```
Usando o Pipeline
Com as classes
1AtlasCloudClient1VideoPipelineUso Básico: Miniaturas + Vídeos
plaintext1```python 2API_KEY = "sua-chave-api-atlas-cloud" 3 4pipeline = VideoPipeline(api_key=API_KEY, output_dir="conteudo_semanal") 5 6jobs = [ 7 # Gerar miniaturas com Flux 2 Pro 8 { 9 "name": "Miniatura de Lançamento", 10 "type": "image", 11 "model": "black-forest-labs/flux-2-pro/text-to-image", 12 "prompt": "Miniatura chamativa para YouTube, texto em negrito 'NOVO LANÇAMENTO', " 13 "foco no produto sobre fundo degradê escuro, cores de destaque vibrantes, " 14 "design profissional, 4K" 15 }, 16 { 17 "name": "Miniatura de Tutorial", 18 "type": "image", 19 "model": "black-forest-labs/flux-2-pro/text-to-image", 20 "prompt": "Miniatura de YouTube para tutorial de programação, tela dividida " 21 "mostrando editor de código e resultado final, estética tech, " 22 "design limpo e moderno, texto grande e legível" 23 }, 24 25 # Gerar vídeos com Seedance 2.0 (econômico) 26 { 27 "name": "Apresentação de Produto Seedance", 28 "type": "video", 29 "model": "bytedance/seedance-v1.5-pro/text-to-video", 30 "prompt": "Animação elegante de revelação de produto, gadget moderno emergindo " 31 "de luz suave, girando lentamente para mostrar todos os ângulos, " 32 "fundo branco minimalista, iluminação cinematográfica", 33 "duration": 10 34 }, 35 36 # Gerar vídeo cinematográfico com Veo 3.1 37 { 38 "name": "Vídeo Hero Veo", 39 "type": "video", 40 "model": "google/veo3.1/text-to-video", 41 "prompt": "Take aéreo cinematográfico de um horizonte urbano moderno na golden " 42 "hour, câmera avançando lentamente, lens flare do " 43 "sol poente, sons ambientes da cidade, granulação de filme, " 44 "color grading profissional", 45 "duration": 8 46 }, 47] 48 49pipeline.batch_generate(jobs, max_workers=3) 50```
Abordagem Orientada a Configuração
Para pipelines recorrentes, defina trabalhos em um arquivo de configuração YAML:
plaintext1```yaml 2# pipeline_config.yaml 3output_dir: conteudo_semanal 4max_workers: 3 5 6jobs: 7 - name: Imagem de Produto 8 type: image 9 model: google/imagen4-ultra/text-to-image 10 prompt: > 11 Fotografia de produto premium de fones de ouvido sem fio em estojo de carregamento, 12 superfície reflexiva escura, iluminação dramática, estética tecnológica de luxo, 13 resolução 8K, qualidade comercial 14 width: 2048 15 height: 2048 16 17 - name: Vídeo Social Media 18 type: video 19 model: bytedance/seedance-v1.5-pro/text-to-video 20 prompt: > 21 Conteúdo de mídia social moderno, mãos fazendo unboxing de produto tech premium, 22 momento de revelação satisfatório, detalhes em close-up, iluminação natural brilhante, 23 formato vertical 24 duration: 10 25 resolution: 1080p 26```
Carregue e execute:
plaintext1```python 2import yaml 3 4with open("pipeline_config.yaml") as f: 5 config = yaml.safe_load(f) 6 7pipeline = VideoPipeline( 8 api_key=API_KEY, 9 output_dir=config["output_dir"] 10) 11pipeline.batch_generate( 12 config["jobs"], 13 max_workers=config.get("max_workers", 3) 14) 15```
Detalhes Importantes da Implementação
Polling com Exponential Backoff
A geração de vídeo leva de 30 segundos a 5 minutos, dependendo do modelo e da duração. O pipeline usa exponential backoff para verificar resultados de forma eficiente:
plaintext1```python 2interval = initial_interval # começa em 5s 3while time.time() - start_time < max_wait: 4 # ... verificar status ... 5 time.sleep(interval) 6 interval = min(interval * 1.5, max_interval) # aumenta até 30s 7```
Gerenciamento de Limites de Taxa (Rate Limits)
Quando a API retorna status 429, o pipeline faz uma pausa exponencial em vez de falhar imediatamente. Isso é essencial para operações em lote onde muitas requisições simultâneas podem exceder os limites.
Controle de Concorrência
O
1ThreadPoolExecutor1max_workers=3Rastreamento de Custos
Cada requisição de geração recebe uma estimativa de custo baseada na tabela de preços do modelo. O arquivo de manifesto acompanha os custos por requisição e os custos cumulativos para controle de orçamento.
Estimativa de Custos para Execuções do Pipeline
Aqui está o custo típico de execuções do pipeline:
| Cenário de Pipeline | Trabalhos | Modelos Usados | Custo Est. | Tempo Est. |
|---|---|---|---|---|
| Pacote social semanal | 10 img + 5 vídeos (5s) | Flux 2 Pro + Seedance 2.0 | USD0.95 | ~10 min |
| Campanha de lançamento | 20 img + 10 vídeos (10s) | Flux 2 Pro + Imagen 4 Ultra + Seedance 2.0 | USD3.80 | ~25 min |
| Biblioteca mensal | 50 img + 20 vídeos (8s) | Misto | USD7.50 | ~45 min |
| Catálogo E-commerce | 500 imagens | Flux 2 Pro | USD20.00 | ~30 min |
Seedance 2.0 é a opção mais econômica para geração de vídeo em alto volume. Veo 3.1 oferece um excelente equilíbrio entre qualidade e custo para clipes cinematográficos.
Comece a construir seu pipeline de vídeo — USD1 de crédito grátis
Dicas de Implantação
Cron Jobs para Geração Agendada
Execute o pipeline seguindo um agendamento usando cron:
plaintext1# Gera conteúdo semanal toda segunda-feira às 06:00 20 6 * * 1 cd /caminho/do/projeto && python run_pipeline.py --config weekly.yaml
Arquitetura Baseada em Fila
Para implantações maiores, use uma fila de tarefas como Celery ou Redis Queue para desacoplar a submissão do trabalho do processamento, permitindo que a geração ocorra de forma assíncrona.
Gerenciamento de Variáveis de Ambiente
Nunca coloque chaves de API diretamente no código. Use variáveis de ambiente e arquivos
1.envVeredito
Construir um pipeline de vídeo com IA não se trata apenas de escrever um código inteligente, mas de ter uma infraestrutura robusta que lide com as realidades da integração via API: limites, timeouts, falhas e custos. O pipeline deste guia resolve esses desafios. Copie, customize os prompts e modelos, e implante-o conforme sua necessidade.
Construa seu pipeline de vídeo com IA — USD1 de crédito grátis no Atlas Cloud






