Como criar um pipeline de vídeo com IA usando Python e Atlas Cloud

A maioria das equipes começa com a geração de vídeo por IA fazendo chamadas de API pontuais — gera um único vídeo, faz o download e segue em frente. Isso funciona para experimentação.

*Última atualização: 28 de fevereiro de 2026*

Veja estes modelos em ação:

j-qDCyXubyE

Arquitetura do Pipeline

Antes de escrever o código, aqui está a arquitetura de alto nível do que estamos construindo:

plaintext
1```
2+-------------------+     +--------------------+     +------------------+
3|  Config. de Prompt |     |  API Atlas Cloud   |     |  Armazenamento   |
4|  (JSON/YAML)      |     |                    |     |  de Saída        |
5|  - prompts        +---->+  /generateImage    +---->+  /images/        |
6|  - models         |     |  /generateVideo    |     |  /videos/        |
7|  - parameters     |     |  /prediction/get   |     |  /manifest.json  |
8+-------------------+     +--------------------+     +------------------+
9         |                         |                         |
10         v                         v                         v
11+-------------------+     +--------------------+     +------------------+
12|  Motor de Pipeline|     |  Polling & Retry   |     |  Rastreio de     |
13|                   |     |                    |     |  Custos          |
14|  - batch_generate |     |  - exponential     |     |                  |
15|  - concurrency    |     |    backoff         |     |  - por requisição|
16|  - roteamento     |     |  - max retries     |     |  - cumulativo    |
17+-------------------+     +--------------------+     +------------------+
18```

O pipeline segue um fluxo simples:

  1. Ler configurações de prompt de um arquivo de entrada estruturado.
  2. Roteia cada prompt para o modelo e endpoint apropriado (imagem ou vídeo).
  3. Envia todas as requisições para a API do Atlas Cloud com concorrência controlada.
  4. Verifica resultados (polling) com lógica de espera exponencial (backoff) e novas tentativas.
  5. Faz o download das saídas concluídas e salva em diretórios organizados.
  6. Rastreia custos e gera um manifesto de resumo.

Primeiros Passos: Acesso à API

Passo 1: Obtenha sua chave de API

Cadastre-se no Atlas Cloud e crie uma chave de API no dashboard. O crédito gratuito de USD1 é suficiente para testar o pipeline completo com várias gerações de imagem e vídeo.

image.png

image.png

Passo 2: Instale as dependências

plaintext
1```bash
2pip install requests pyyaml
3```

Não são necessários frameworks pesados. O pipeline utiliza apenas

text
1requests
para chamadas HTTP,
text
1pyyaml
para arquivos de configuração e os módulos da biblioteca padrão do Python para concorrência e manipulação de arquivos.

O Código Completo do Pipeline

Abaixo está o pipeline funcional completo. Cada seção é explicada após o bloco de código.

plaintext
1```python
2import requests
3import time
4import json
5import os
6import logging
7from concurrent.futures import ThreadPoolExecutor, as_completed
8from dataclasses import dataclass, field
9from typing import Optional
10from datetime import datetime
11
12# Configurar logs
13logging.basicConfig(
14    level=logging.INFO,
15    format="%(asctime)s [%(levelname)s] %(message)s",
16    datefmt="%Y-%m-%d %H:%M:%S"
17)
18logger = logging.getLogger("atlas_pipeline")
19
20@dataclass
21class GenerationResult:
22    """Armazena o resultado de uma única requisição de geração."""
23    name: str
24    model: str
25    media_type: str  # "image" ou "video"
26    status: str  # "success", "failed", "error"
27    output_url: Optional[str] = None
28    local_path: Optional[str] = None
29    cost_estimate: float = 0.0
30    duration_seconds: float = 0.0
31    error_message: Optional[str] = None
32
33class AtlasCloudClient:
34    """Wrapper de cliente para a API do Atlas Cloud."""
35
36    BASE_URL = "https://api.atlascloud.ai/api/v1"
37
38    # Preços por modelo (aproximados)
39    PRICING = {
40        "black-forest-labs/flux-2-pro/text-to-image": 0.04,        # por imagem
41        "google/imagen4-ultra/text-to-image": 0.06,                # por imagem
42        "bytedance/seedance-v1.5-pro/text-to-video": 0.022,       # por segundo
43        "google/veo3.1/text-to-video": 0.03,                       # por segundo
44        "openai/sora-v2/text-to-video": 0.15,                     # por segundo
45    }
46
47    def __init__(self, api_key: str):
48        self.api_key = api_key
49        self.session = requests.Session()
50        self.session.headers.update({
51            "Authorization": f"Bearer {api_key}",
52            "Content-Type": "application/json"
53        })
54
55    def generate_image(
56        self,
57        model: str,
58        prompt: str,
59        width: int = 1024,
60        height: int = 1024
61    ) -> dict:
62        """Envia uma requisição de geração de imagem."""
63        response = self.session.post(
64            f"{self.BASE_URL}/model/generateImage",
65            json={
66                "model": model,
67                "prompt": prompt,
68                "width": width,
69                "height": height
70            }
71        )
72        response.raise_for_status()
73        return response.json()
74
75    def generate_video(
76        self,
77        model: str,
78        prompt: str,
79        duration: int = 5,
80        resolution: str = "1080p"
81    ) -> dict:
82        """Envia uma requisição de geração de vídeo."""
83        response = self.session.post(
84            f"{self.BASE_URL}/model/generateVideo",
85            json={
86                "model": model,
87                "prompt": prompt,
88                "duration": duration,
89                "resolution": resolution
90            }
91        )
92        response.raise_for_status()
93        return response.json()
94
95    def poll_result(
96        self,
97        request_id: str,
98        max_wait: int = 300,
99        initial_interval: int = 5,
100        max_interval: int = 30
101    ) -> Optional[dict]:
102        """Verifica o resultado da geração com exponential backoff."""
103        start_time = time.time()
104        interval = initial_interval
105
106        while time.time() - start_time < max_wait:
107            try:
108                response = self.session.get(
109                    f"{self.BASE_URL}/model/prediction/{request_id}/get"
110                )
111                data = response.json()
112
113                if data["status"] == "completed":
114                    return data
115                elif data["status"] == "failed":
116                    logger.error(f"Geração falhou: {data.get('error', 'Erro desconhecido')}")
117                    return None
118
119                logger.debug(f"Status: {data['status']}, aguardando {interval}s...")
120                time.sleep(interval)
121                interval = min(interval * 1.5, max_interval)
122
123            except requests.RequestException as e:
124                logger.warning(f"Requisição de poll falhou: {e}, tentando novamente em {interval}s")
125                time.sleep(interval)
126
127        logger.error(f"Tempo limite excedido após {max_wait}s aguardando {request_id}")
128        return None
129
130    def estimate_cost(self, model: str, duration: int = 0) -> float:
131        """Estima o custo de uma requisição de geração."""
132        base_price = self.PRICING.get(model, 0.05)
133        if "text-to-video" in model and duration > 0:
134            return base_price * duration
135        return base_price
136
137class VideoPipeline:
138    """Orquestra a geração em lote de imagens e vídeos."""
139
140    def __init__(self, api_key: str, output_dir: str = "pipeline_output"):
141        self.client = AtlasCloudClient(api_key)
142        self.output_dir = output_dir
143        self.results: list[GenerationResult] = []
144        self.total_cost = 0.0
145
146        os.makedirs(os.path.join(output_dir, "images"), exist_ok=True)
147        os.makedirs(os.path.join(output_dir, "videos"), exist_ok=True)
148
149    def _download_file(self, url: str, filepath: str) -> bool:
150        try:
151            response = requests.get(url, timeout=60)
152            response.raise_for_status()
153            with open(filepath, "wb") as f:
154                f.write(response.content)
155            return True
156        except Exception as e:
157            logger.error(f"Download falhou para {url}: {e}")
158            return False
159
160    def _safe_filename(self, name: str, extension: str) -> str:
161        safe = name.lower().replace(" ", "_")
162        safe = "".join(c for c in safe if c.isalnum() or c == "_")
163        return f"{safe}.{extension}"
164
165    def _process_image(self, name: str, model: str, prompt: str,
166                       width: int = 1024, height: int = 1024,
167                       retries: int = 2) -> GenerationResult:
168        start = time.time()
169        cost = self.client.estimate_cost(model)
170
171        for attempt in range(retries + 1):
172            try:
173                logger.info(f"[Imagem] Gerando '{name}' (tentativa {attempt + 1})")
174                result = self.client.generate_image(model, prompt, width, height)
175                request_id = result["request_id"]
176
177                data = self.client.poll_result(request_id)
178                if data and data["status"] == "completed":
179                    image_url = data["output"]["image_url"]
180                    filename = self._safe_filename(name, "png")
181                    filepath = os.path.join(self.output_dir, "images", filename)
182                    self._download_file(image_url, filepath)
183
184                    return GenerationResult(
185                        name=name, model=model, media_type="image",
186                        status="success", output_url=image_url,
187                        local_path=filepath, cost_estimate=cost,
188                        duration_seconds=time.time() - start
189                    )
190            except requests.HTTPError as e:
191                if e.response.status_code == 429:
192                    wait = 2 ** (attempt + 2)
193                    logger.warning(f"Limite de taxa excedido, aguardando {wait}s")
194                    time.sleep(wait)
195                    continue
196                logger.error(f"Erro HTTP gerando '{name}': {e}")
197            except Exception as e:
198                logger.error(f"Erro gerando '{name}': {e}")
199
200            if attempt < retries:
201                time.sleep(2 ** attempt)
202
203        return GenerationResult(
204            name=name, model=model, media_type="image",
205            status="failed", cost_estimate=0,
206            duration_seconds=time.time() - start,
207            error_message="Máximo de retentativas excedido"
208        )
209
210    def _process_video(self, name: str, model: str, prompt: str,
211                       duration: int = 5, resolution: str = "1080p",
212                       retries: int = 2) -> GenerationResult:
213        start = time.time()
214        cost = self.client.estimate_cost(model, duration)
215
216        for attempt in range(retries + 1):
217            try:
218                logger.info(f"[Vídeo] Gerando '{name}' (tentativa {attempt + 1})")
219                result = self.client.generate_video(model, prompt, duration, resolution)
220                request_id = result["request_id"]
221
222                data = self.client.poll_result(request_id, max_wait=600)
223                if data and data["status"] == "completed":
224                    video_url = data["output"]["video_url"]
225                    filename = self._safe_filename(name, "mp4")
226                    filepath = os.path.join(self.output_dir, "videos", filename)
227                    self._download_file(video_url, filepath)
228
229                    return GenerationResult(
230                        name=name, model=model, media_type="video",
231                        status="success", output_url=video_url,
232                        local_path=filepath, cost_estimate=cost,
233                        duration_seconds=time.time() - start
234                    )
235            except requests.HTTPError as e:
236                if e.response.status_code == 429:
237                    wait = 2 ** (attempt + 2)
238                    logger.warning(f"Limite de taxa excedido, aguardando {wait}s")
239                    time.sleep(wait)
240                    continue
241                logger.error(f"Erro HTTP gerando '{name}': {e}")
242            except Exception as e:
243                logger.error(f"Erro gerando '{name}': {e}")
244
245            if attempt < retries:
246                time.sleep(2 ** (attempt + 1))
247
248        return GenerationResult(
249            name=name, model=model, media_type="video",
250            status="failed", cost_estimate=0,
251            duration_seconds=time.time() - start,
252            error_message="Máximo de retentativas excedido"
253        )
254
255    def batch_generate(self, jobs: list[dict], max_workers: int = 3):
256        logger.info(f"Iniciando lote de {len(jobs)} trabalhos com {max_workers} workers")
257        start_time = time.time()
258
259        with ThreadPoolExecutor(max_workers=max_workers) as executor:
260            futures = {}
261            for job in jobs:
262                if job["type"] == "image":
263                    future = executor.submit(
264                        self._process_image,
265                        name=job["name"],
266                        model=job["model"],
267                        prompt=job["prompt"],
268                        width=job.get("width", 1024),
269                        height=job.get("height", 1024)
270                    )
271                elif job["type"] == "video":
272                    future = executor.submit(
273                        self._process_video,
274                        name=job["name"],
275                        model=job["model"],
276                        prompt=job["prompt"],
277                        duration=job.get("duration", 5),
278                        resolution=job.get("resolution", "1080p")
279                    )
280                else:
281                    logger.warning(f"Tipo de trabalho desconhecido: {job['type']}")
282                    continue
283                futures[future] = job["name"]
284
285            for future in as_completed(futures):
286                result = future.result()
287                self.results.append(result)
288                self.total_cost += result.cost_estimate
289                status_icon = "OK" if result.status == "success" else "FAIL"
290                logger.info(
291                    f"[{status_icon}] {result.name} -- "
292                    f"USD{result.cost_estimate:.3f} -- "
293                    f"{result.duration_seconds:.1f}s"
294                )
295
296        elapsed = time.time() - start_time
297        self._save_manifest()
298        self._print_summary(elapsed)
299
300    def _save_manifest(self):
301        manifest = {
302            "generated_at": datetime.now().isoformat(),
303            "total_cost": round(self.total_cost, 4),
304            "total_jobs": len(self.results),
305            "successful": sum(1 for r in self.results if r.status == "success"),
306            "failed": sum(1 for r in self.results if r.status != "success"),
307            "results": [
308                {
309                    "name": r.name,
310                    "model": r.model,
311                    "type": r.media_type,
312                    "status": r.status,
313                    "output_url": r.output_url,
314                    "local_path": r.local_path,
315                    "cost": round(r.cost_estimate, 4),
316                    "generation_time": round(r.duration_seconds, 1),
317                    "error": r.error_message
318                }
319                for r in self.results
320            ]
321        }
322        manifest_path = os.path.join(self.output_dir, "manifest.json")
323        with open(manifest_path, "w") as f:
324            json.dump(manifest, f, indent=2)
325        logger.info(f"Manifesto salvo em {manifest_path}")
326
327    def _print_summary(self, elapsed: float):
328        success = sum(1 for r in self.results if r.status == "success")
329        failed = len(self.results) - success
330        cost_by_model = {}
331        for r in self.results:
332            cost_by_model[r.model] = cost_by_model.get(r.model, 0) + r.cost_estimate
333
334        print("\n" + "=" * 60)
335        print("RESUMO DO PIPELINE")
336        print("=" * 60)
337        print(f"Total de trabalhos: {len(self.results)}")
338        print(f"Bem-sucedidos:      {success}")
339        print(f"Falhas:             {failed}")
340        print(f"Custo total:        USD{self.total_cost:.4f}")
341        print(f"Tempo total:        {elapsed:.1f}s")
342        print(f"\nCusto por modelo:")
343        for model, cost in sorted(cost_by_model.items()):
344            short_name = model.split("/")[1]
345            print(f"  {short_name}: USD{cost:.4f}")
346        print("=" * 60)
347```

Usando o Pipeline

Com as classes

text
1AtlasCloudClient
e
text
1VideoPipeline
definidas, veja como utilizá-las para um fluxo de produção de conteúdo típico.

Uso Básico: Miniaturas + Vídeos

plaintext
1```python
2API_KEY = "sua-chave-api-atlas-cloud"
3
4pipeline = VideoPipeline(api_key=API_KEY, output_dir="conteudo_semanal")
5
6jobs = [
7    # Gerar miniaturas com Flux 2 Pro
8    {
9        "name": "Miniatura de Lançamento",
10        "type": "image",
11        "model": "black-forest-labs/flux-2-pro/text-to-image",
12        "prompt": "Miniatura chamativa para YouTube, texto em negrito 'NOVO LANÇAMENTO', "
13                  "foco no produto sobre fundo degradê escuro, cores de destaque vibrantes, "
14                  "design profissional, 4K"
15    },
16    {
17        "name": "Miniatura de Tutorial",
18        "type": "image",
19        "model": "black-forest-labs/flux-2-pro/text-to-image",
20        "prompt": "Miniatura de YouTube para tutorial de programação, tela dividida "
21                  "mostrando editor de código e resultado final, estética tech, "
22                  "design limpo e moderno, texto grande e legível"
23    },
24
25    # Gerar vídeos com Seedance 2.0 (econômico)
26    {
27        "name": "Apresentação de Produto Seedance",
28        "type": "video",
29        "model": "bytedance/seedance-v1.5-pro/text-to-video",
30        "prompt": "Animação elegante de revelação de produto, gadget moderno emergindo "
31                  "de luz suave, girando lentamente para mostrar todos os ângulos, "
32                  "fundo branco minimalista, iluminação cinematográfica",
33        "duration": 10
34    },
35
36    # Gerar vídeo cinematográfico com Veo 3.1
37    {
38        "name": "Vídeo Hero Veo",
39        "type": "video",
40        "model": "google/veo3.1/text-to-video",
41        "prompt": "Take aéreo cinematográfico de um horizonte urbano moderno na golden "
42                  "hour, câmera avançando lentamente, lens flare do "
43                  "sol poente, sons ambientes da cidade, granulação de filme, "
44                  "color grading profissional",
45        "duration": 8
46    },
47]
48
49pipeline.batch_generate(jobs, max_workers=3)
50```

Abordagem Orientada a Configuração

Para pipelines recorrentes, defina trabalhos em um arquivo de configuração YAML:

plaintext
1```yaml
2# pipeline_config.yaml
3output_dir: conteudo_semanal
4max_workers: 3
5
6jobs:
7  - name: Imagem de Produto
8    type: image
9    model: google/imagen4-ultra/text-to-image
10    prompt: >
11      Fotografia de produto premium de fones de ouvido sem fio em estojo de carregamento,
12      superfície reflexiva escura, iluminação dramática, estética tecnológica de luxo,
13      resolução 8K, qualidade comercial
14    width: 2048
15    height: 2048
16
17  - name: Vídeo Social Media
18    type: video
19    model: bytedance/seedance-v1.5-pro/text-to-video
20    prompt: >
21      Conteúdo de mídia social moderno, mãos fazendo unboxing de produto tech premium,
22      momento de revelação satisfatório, detalhes em close-up, iluminação natural brilhante,
23      formato vertical
24    duration: 10
25    resolution: 1080p
26```

Carregue e execute:

plaintext
1```python
2import yaml
3
4with open("pipeline_config.yaml") as f:
5    config = yaml.safe_load(f)
6
7pipeline = VideoPipeline(
8    api_key=API_KEY,
9    output_dir=config["output_dir"]
10)
11pipeline.batch_generate(
12    config["jobs"],
13    max_workers=config.get("max_workers", 3)
14)
15```

Detalhes Importantes da Implementação

Polling com Exponential Backoff

A geração de vídeo leva de 30 segundos a 5 minutos, dependendo do modelo e da duração. O pipeline usa exponential backoff para verificar resultados de forma eficiente:

plaintext
1```python
2interval = initial_interval  # começa em 5s
3while time.time() - start_time < max_wait:
4    # ... verificar status ...
5    time.sleep(interval)
6    interval = min(interval * 1.5, max_interval)  # aumenta até 30s
7```

Gerenciamento de Limites de Taxa (Rate Limits)

Quando a API retorna status 429, o pipeline faz uma pausa exponencial em vez de falhar imediatamente. Isso é essencial para operações em lote onde muitas requisições simultâneas podem exceder os limites.

Controle de Concorrência

O

text
1ThreadPoolExecutor
limita as requisições simultâneas para evitar sobrecarregar a API ou sua conexão de rede. Comece com
text
1max_workers=3
e aumente gradualmente se sua conta suportar maior concorrência.

Rastreamento de Custos

Cada requisição de geração recebe uma estimativa de custo baseada na tabela de preços do modelo. O arquivo de manifesto acompanha os custos por requisição e os custos cumulativos para controle de orçamento.

Estimativa de Custos para Execuções do Pipeline

Aqui está o custo típico de execuções do pipeline:

Cenário de PipelineTrabalhosModelos UsadosCusto Est.Tempo Est.
Pacote social semanal10 img + 5 vídeos (5s)Flux 2 Pro + Seedance 2.0USD0.95~10 min
Campanha de lançamento20 img + 10 vídeos (10s)Flux 2 Pro + Imagen 4 Ultra + Seedance 2.0USD3.80~25 min
Biblioteca mensal50 img + 20 vídeos (8s)MistoUSD7.50~45 min
Catálogo E-commerce500 imagensFlux 2 ProUSD20.00~30 min

Seedance 2.0 é a opção mais econômica para geração de vídeo em alto volume. Veo 3.1 oferece um excelente equilíbrio entre qualidade e custo para clipes cinematográficos.

Comece a construir seu pipeline de vídeo — USD1 de crédito grátis

Dicas de Implantação

Cron Jobs para Geração Agendada

Execute o pipeline seguindo um agendamento usando cron:

plaintext
1# Gera conteúdo semanal toda segunda-feira às 06:00
20 6 * * 1 cd /caminho/do/projeto && python run_pipeline.py --config weekly.yaml

Arquitetura Baseada em Fila

Para implantações maiores, use uma fila de tarefas como Celery ou Redis Queue para desacoplar a submissão do trabalho do processamento, permitindo que a geração ocorra de forma assíncrona.

Gerenciamento de Variáveis de Ambiente

Nunca coloque chaves de API diretamente no código. Use variáveis de ambiente e arquivos

text
1.env
para segurança.

Veredito

Construir um pipeline de vídeo com IA não se trata apenas de escrever um código inteligente, mas de ter uma infraestrutura robusta que lide com as realidades da integração via API: limites, timeouts, falhas e custos. O pipeline deste guia resolve esses desafios. Copie, customize os prompts e modelos, e implante-o conforme sua necessidade.

Construa seu pipeline de vídeo com IA — USD1 de crédito grátis no Atlas Cloud

Modelos relacionados

Mais de 300 Modelos, Comece Agora,

Explorar Todos os Modelos

Join our Discord community

Join the Discord community for the latest model updates, prompts, and support.