Como Escalar AI Agents em Django SaaS para 1 Milhão de Usuários com LangGraph e Celery (Arquitetura, Custos e Alta Performance em Produção)

Published on: 2026-01-27
Post image
pt ai-agents django-saas langgraph celery django-celery scalable-ai-agents autonomous-ai-workflows ai-orchestration async-tasks-django distributed-task-queue llm-agents-production ai-agents-architecture scaling-ai-systems high-scale-ai django

Escalar agentes de inteligência artificial em um produto do tipo SaaS (software como serviço) envolve mais do que “chamar um modelo e retornar texto”. Em produção, agentes precisam executar fluxos longos, lidar com falhas externas, manter estado entre etapas e entregar resultados sem travar a aplicação principal. Nesse cenário, uma arquitetura bem definida reduz custos, aumenta a previsibilidade e impede que a infraestrutura entre em colapso sob carga.

Uma abordagem sólida combina Django como camada de orquestração e persistência, Celery como fila de execução assíncrona, e LangGraph como motor de workflow em grafo com estado. Esse conjunto separa responsabilidades, permite reprocessamento seguro, melhora observabilidade e suporta crescimento para volumes muito altos de tarefas diárias com estabilidade.

Visão geral da arquitetura em camadas

Uma arquitetura escalável costuma ser descrita como um fluxo que sai do pedido do usuário e chega ao resultado persistido. A aplicação web recebe requisições, valida entrada e cria um registro de execução, enquanto o processamento pesado ocorre fora do ciclo da requisição. O resultado é armazenado, e o usuário é notificado quando a execução termina. Essa separação preserva desempenho e evita que o servidor web fique preso em chamadas longas.

Uma forma simples de visualizar o fluxo é: Requisição → View do Django → Tarefa do Celery → Agente do LangGraph → Ferramentas externas → Persistência em banco → Notificação. Nessa cadeia, o Django não “roda o agente”, apenas inicia e acompanha o ciclo. O Celery garante processamento assíncrono e retentativas controladas. O LangGraph organiza etapas e decisões do agente como um grafo com checkpoints.

O que muda do protótipo para a produção

Em protótipos, é comum executar o agente diretamente dentro de uma view, porque funciona com poucos usuários. Em produção, esse padrão cria bloqueio de workers web, aumenta tempo de resposta e provoca filas internas de requisições. Além disso, falhas de provedores, rate limit (limite de taxa) e timeouts são frequentes e precisam de tratamento. A produção exige rastreabilidade, custo previsível e capacidade de retomar execuções interrompidas.

Uma diferença importante é que o agente passa a ser tratado como um “job” com ciclo de vida. Esse job tem estado, logs, métricas, custo estimado e resultado persistido. O sistema passa a registrar cada execução, incluindo entradas, saídas, erros e tempos. A infraestrutura também passa a ser desenhada para degradar de forma controlada, mantendo o serviço vivo mesmo quando parte do ecossistema falha.

Camada 1: Django como orquestração e gestão de estado

O Django funciona como o ponto central de controle: autenticação, permissões, validação, criação de registros e consulta de resultados. Ele também é o local ideal para regras de negócio, como plano do usuário, limites e auditoria. Em vez de “rodar o agente”, o Django cria uma execução e envia um identificador para processamento. Essa estratégia torna o tempo de resposta do endpoint previsível e baixo.

Para isso, um modelo de banco representa a execução do agente com status, timestamps e metadados. Esse registro facilita observabilidade e permite reprocessar ou investigar problemas. Também é um bom local para armazenar consumo de tokens e custo calculado por execução. A seguir, um exemplo completo de modelagem com índices pensados para escala.

from django.conf import settings
from django.db import models

class ExecucaoAgente(models.Model):
    """Registra cada execução de agente para rastreabilidade e suporte a escala."""

    STATUS_CHOICES = [
        ("pendente", "Pendente"),
        ("executando", "Executando"),
        ("concluida", "Concluída"),
        ("falhou", "Falhou"),
        ("reprocessando", "Reprocessando"),
        ("cancelada", "Cancelada"),
    ]

    usuario = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    tipo_agente = models.CharField(max_length=50)  # "pesquisa", "analise_dados", etc.

    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default="pendente")

    entrada = models.JSONField()
    saida = models.JSONField(null=True, blank=True)
    mensagem_erro = models.TextField(null=True, blank=True)

    iniciada_em = models.DateTimeField(null=True, blank=True)
    finalizada_em = models.DateTimeField(null=True, blank=True)

    tokens_total = models.IntegerField(default=0)
    custo_usd = models.DecimalField(max_digits=10, decimal_places=6, default=0)

    checkpoint_id = models.CharField(max_length=255, null=True, blank=True)
    etapa_atual = models.CharField(max_length=100, null=True, blank=True)

    criado_em = models.DateTimeField(auto_now_add=True)
    atualizado_em = models.DateTimeField(auto_now=True)

    class Meta:
        indexes = [
            models.Index(fields=["usuario", "-criado_em"]),
            models.Index(fields=["status", "criado_em"]),
            models.Index(fields=["checkpoint_id"]),
            models.Index(fields=["tipo_agente", "-criado_em"]),
        ]

Views do Django: o que era e o que passa a ser

O padrão que “era” comum é bloquear a view esperando o agente finalizar. Esse comportamento prende o worker web, aumenta latência e derruba a capacidade de atender múltiplos usuários simultaneamente. Mesmo com poucos segundos, esse tipo de bloqueio escala muito mal. A falha típica aparece como timeouts e erros 502 em momentos de pico.

O padrão “que será” é retornar rápido com um identificador de execução e processar em segundo plano. A view valida entrada, aplica limites e cria a execução no banco. Em seguida, enfileira a tarefa no Celery e devolve status 202 com um caminho de consulta de status. Esse desenho cria uma fronteira clara entre requisição e processamento pesado.

from django.http import JsonResponse
from django.views.decorators.http import require_POST
from django.utils import timezone

from .models import ExecucaoAgente
from .tasks import executar_agente_langgraph
from .utils import checar_limite_taxa

@require_POST
def criar_relatorio(request):
    query = request.POST.get("query")
    if not query:
        return JsonResponse({"erro": "Campo 'query' é obrigatório."}, status=400)

    if not checar_limite_taxa(request.user):
        return JsonResponse({"erro": "Limite de taxa excedido."}, status=429)

    execucao = ExecucaoAgente.objects.create(
        usuario=request.user,
        tipo_agente="pesquisa",
        entrada={"query": query},
        status="pendente",
        iniciada_em=None,
        finalizada_em=None,
    )

    executar_agente_langgraph.delay(execucao_id=execucao.id, usuario_id=request.user.id)

    return JsonResponse(
        {
            "execucao_id": execucao.id,
            "status": execucao.status,
            "tempo_estimado_segundos": 30,
            "status_url": f"/api/execucoes/{execucao.id}/",
        },
        status=202,
    )

Camada 2: Celery como execução assíncrona, filas e retentativas

O Celery é uma fila de tarefas assíncronas que permite executar trabalhos fora do ciclo da requisição. Ele é útil para tarefas longas, chamadas a APIs externas e processamentos que não devem travar o servidor web. Em cenários de grande volume, o Celery também oferece roteamento por filas e controle de concorrência. Outro ponto essencial é o suporte a retentativas com backoff, reduzindo falhas transitórias.

Uma tarefa bem configurada considera ack tardio (confirmar apenas após concluir), limites de tempo e tratamento distinto para falhas esperadas. Também é comum separar filas por prioridade para evitar que tarefas grandes “travem” tarefas pequenas. A seguir está um exemplo completo com classe base de tarefa, retentativa exponencial e marcação de status no banco. Nesse exemplo, funções auxiliares são chamadas para notificação e telemetria.

import logging
from celery import shared_task, Task
from celery.exceptions import MaxRetriesExceededError
from django.utils import timezone

logger = logging.getLogger(__name__)

class TarefaAgente(Task):
    """Tarefa base com retentativas e padrões seguros para execução longa."""
    autoretry_for = (TimeoutError,)
    retry_kwargs = {"max_retries": 3, "countdown": 5}
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True

@shared_task(
    base=TarefaAgente,
    name="agentes.executar_agente_langgraph",
    bind=True,
    acks_late=True,
    reject_on_worker_lost=True,
    time_limit=300,
    soft_time_limit=270,
)
def executar_agente_langgraph(self, execucao_id: int, usuario_id: int):
    from .models import ExecucaoAgente
    from .agentes import criar_agente_pesquisa
    from .utils import calcular_custo_usd, enviar_progresso_ws, notificar_conclusao, notificar_erro

    execucao = ExecucaoAgente.objects.get(id=execucao_id)

    try:
        execucao.status = "executando"
        execucao.iniciada_em = timezone.now()
        execucao.save(update_fields=["status", "iniciada_em"])

        agente = criar_agente_pesquisa(usuario_id=usuario_id)

        resultado_final = None
        for evento in agente.stream(execucao.entrada):
            enviar_progresso_ws(execucao_id=execucao_id, evento=evento)

            if evento.get("final_output"):
                resultado_final = evento["final_output"]

            etapa = evento.get("step")
            if etapa:
                execucao.etapa_atual = str(etapa)
                execucao.save(update_fields=["etapa_atual"])

        if not resultado_final:
            raise RuntimeError("Execução finalizou sem resultado final.")

        tokens_total = int(resultado_final.get("tokens_total", 0))
        custo = calcular_custo_usd(tokens_total=tokens_total, modelo="gpt-4")

        execucao.status = "concluida"
        execucao.saida = resultado_final
        execucao.finalizada_em = timezone.now()
        execucao.tokens_total = tokens_total
        execucao.custo_usd = custo
        execucao.save()

        logger.info("Execução %s concluída.", execucao_id)
        notificar_conclusao.delay(usuario_id=usuario_id, execucao_id=execucao_id)
        return resultado_final

    except TimeoutError as exc:
        execucao.status = "reprocessando"
        execucao.save(update_fields=["status"])
        logger.warning("Timeout na execução %s. Tentando novamente.", execucao_id)
        raise self.retry(exc=exc)

    except MaxRetriesExceededError:
        execucao.status = "falhou"
        execucao.mensagem_erro = "Número máximo de retentativas excedido."
        execucao.finalizada_em = timezone.now()
        execucao.save(update_fields=["status", "mensagem_erro", "finalizada_em"])
        notificar_erro.delay(usuario_id=usuario_id, execucao_id=execucao_id)

    except Exception as exc:
        execucao.status = "falhou"
        execucao.mensagem_erro = str(exc)
        execucao.finalizada_em = timezone.now()
        execucao.save(update_fields=["status", "mensagem_erro", "finalizada_em"])
        logger.exception("Falha inesperada na execução %s.", execucao_id)
        notificar_erro.delay(usuario_id=usuario_id, execucao_id=execucao_id)
        raise

Filas do Celery: prioridades e isolamento de carga

Separar filas evita que tarefas de baixa prioridade ou muito longas atrasem tarefas importantes. Esse desenho também permite escalonar workers diferentes para necessidades diferentes, como tarefas pesadas, notificações rápidas e reprocessamento. A separação cria um sistema mais previsível sob picos, reduzindo “starvation” (fome), quando uma classe de tarefa nunca recebe CPU. Com filas distintas, fica mais simples ajustar concorrência e custo.

Uma configuração comum inclui filas como: alta prioridade, baixa prioridade, batch, retentativas e notificações. Cada rota aponta um tipo de tarefa para uma fila e permite iniciar workers dedicados com parâmetros adequados. O parâmetro prefetch baixo é importante em tarefas longas para evitar que um worker “reserve” várias tarefas e gere atraso artificial. A seguir, um exemplo de roteamento e parâmetros críticos.

from celery import Celery

app = Celery("agentes")

app.conf.task_routes = {
    "agentes.executar_agente_langgraph": {"queue": "agentes_alta", "routing_key": "agentes.alta"},
    "agentes.gerar_insights": {"queue": "agentes_baixa", "routing_key": "agentes.baixa"},
    "agentes.processar_lote": {"queue": "agentes_lote", "routing_key": "agentes.lote"},
    "agentes.reprocessar_falhas": {"queue": "agentes_retry", "routing_key": "agentes.retry"},
    "agentes.notificar": {"queue": "notificacoes", "routing_key": "notify"},
}

app.conf.worker_pool = "prefork"
app.conf.worker_prefetch_multiplier = 1
app.conf.worker_max_tasks_per_child = 50

Camada 3: LangGraph como runtime de workflows com estado

O LangGraph organiza um agente como um grafo de estados, onde cada nó é uma etapa e cada aresta define o fluxo. Essa abordagem é útil para workflows com múltiplas fases, validações e caminhos condicionais. O estado é compartilhado entre etapas e pode ser persistido com checkpoint, permitindo retomada após falha. Essa persistência reduz custo e evita repetir chamadas caras a modelos e APIs.

Um workflow típico inclui: busca, análise, geração de relatório e validação. A validação pode decidir se termina com sucesso, se tenta novamente com feedback, ou se falha de forma controlada. A seguir está um exemplo completo de definição de estado, grafo e roteamento condicional, incluindo checkpoint em PostgreSQL. O exemplo usa tipos simples e campos essenciais para iniciantes.

import json
import operator
from typing import TypedDict, Annotated, List, Dict, Any

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI

class EstadoAgente(TypedDict):
    """Estado compartilhado entre etapas do grafo."""
    mensagens: Annotated[list, operator.add]
    query: str
    resultados_busca: list
    analise: str
    relatorio_final: str
    validacao: dict
    erros: int

def criar_agente_pesquisa(usuario_id: int):
    llm_analise = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0.1, timeout=30, max_retries=3)
    llm_geracao = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0.3, timeout=30, max_retries=3)
    llm_validacao = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0, timeout=30, max_retries=3)

    checkpointer = PostgresSaver.from_conn_string(
        "postgresql://usuario:senha@localhost/banco",
        table_name="langgraph_checkpoints",
    )

    workflow = StateGraph(EstadoAgente)

    workflow.add_node("buscar", lambda s: no_busca(s))
    workflow.add_node("analisar", lambda s: no_analise(s, llm_analise))
    workflow.add_node("gerar_relatorio", lambda s: no_gerar_relatorio(s, llm_geracao))
    workflow.add_node("validar", lambda s: no_validar(s, llm_validacao))
    workflow.add_node("revisar", lambda s: no_revisar(s))

    workflow.set_entry_point("buscar")
    workflow.add_edge("buscar", "analisar")
    workflow.add_edge("analisar", "gerar_relatorio")
    workflow.add_edge("gerar_relatorio", "validar")

    workflow.add_conditional_edges(
        "validar",
        lambda s: rota_pos_validacao(s),
        {
            "sucesso": END,
            "retry": "revisar",
            "falha": END,
        },
    )

    workflow.add_edge("revisar", "gerar_relatorio")

    app = workflow.compile(
        checkpointer=checkpointer,
        interrupt_before=["validar"],
    )
    return app

def no_busca(state: EstadoAgente) -> EstadoAgente:
    """Simula busca externa e registra resultados no estado."""
    query = state.get("query") or state.get("entrada", {}).get("query")  # tolerância a formatos
    if not query:
        raise ValueError("Query ausente no estado.")

    state["query"] = query
    state.setdefault("mensagens", [])
    state.setdefault("erros", 0)

    resultados = [
        {"titulo": "Fonte A", "conteudo": "Resumo relevante sobre o tema."},
        {"titulo": "Fonte B", "conteudo": "Outro ponto de vista com detalhes adicionais."},
    ]
    state["resultados_busca"] = resultados

    state["mensagens"].append({"papel": "sistema", "conteudo": f"Busca concluída com {len(resultados)} resultados."})
    return state

def no_analise(state: EstadoAgente, llm: ChatOpenAI) -> EstadoAgente:
    """Gera uma análise estruturada baseada nos resultados."""
    contexto = "\n\n".join(
        [f"Fonte {i+1}: {r['titulo']}\n{r['conteudo']}" for i, r in enumerate(state.get("resultados_busca", []))]
    )

    prompt = (
        f"Analise os resultados para a consulta: \"{state['query']}\"\n\n"
        f"Resultados:\n{contexto}\n\n"
        "Produza uma análise estruturada com:\n"
        "1) Achados principais\n2) Temas recorrentes\n3) Lacunas\n4) Avaliação de qualidade\n"
    )

    resp = llm.invoke(prompt)
    state["analise"] = resp.content
    state["mensagens"].append({"papel": "assistente", "conteudo": resp.content})
    return state

def no_gerar_relatorio(state: EstadoAgente, llm: ChatOpenAI) -> EstadoAgente:
    """Gera um relatório final com base na análise."""
    prompt = (
        "Gere um relatório completo e bem organizado.\n\n"
        f"Consulta: {state['query']}\n\n"
        f"Análise:\n{state.get('analise','')}\n\n"
        "Inclua: resumo executivo, achados detalhados, evidências e conclusão.\n"
    )

    resp = llm.invoke(prompt)
    state["relatorio_final"] = resp.content
    state["mensagens"].append({"papel": "assistente", "conteudo": resp.content})
    return state

def no_validar(state: EstadoAgente, llm: ChatOpenAI) -> EstadoAgente:
    """Valida qualidade do relatório e retorna JSON interpretável."""
    prompt = (
        "Valide a qualidade do relatório abaixo.\n\n"
        f"Relatório:\n{state.get('relatorio_final','')}\n\n"
        "Critérios: completude, clareza, consistência e ausência de afirmações sem suporte.\n"
        "Responda estritamente em JSON no formato:\n"
        '{ "is_valid": true/false, "issues": ["..."], "confidence": 0.0 }\n'
    )

    resp = llm.invoke(prompt)
    validacao = json.loads(resp.content)

    state["validacao"] = validacao
    state["mensagens"].append({"papel": "sistema", "conteudo": f"Validação: {validacao}"})
    return state

def rota_pos_validacao(state: EstadoAgente) -> str:
    """Decide o próximo passo com base na validação e contagem de erros."""
    validacao = state.get("validacao") or {}
    if validacao.get("is_valid") and float(validacao.get("confidence", 0)) > 0.8:
        return "sucesso"
    if state.get("erros", 0) < 2:
        return "retry"
    return "falha"

def no_revisar(state: EstadoAgente) -> EstadoAgente:
    """Acrescenta feedback de revisão antes de tentar gerar novamente."""
    validacao = state.get("validacao") or {}
    issues = validacao.get("issues") or []
    feedback = "Problemas detectados: " + (", ".join(issues) if issues else "qualidade insuficiente") + ". Revisar o texto."

    state["mensagens"].append({"papel": "sistema", "conteudo": feedback})
    state["erros"] = int(state.get("erros", 0)) + 1
    return state

Checkpointing: por que é indispensável e como persistir

Checkpointing é a persistência do estado intermediário do agente, permitindo retomar uma execução sem reprocessar tudo. Sem isso, uma falha após dezenas de segundos e múltiplas chamadas externas obriga reiniciar do zero, elevando custo e latência. Com checkpoint, o workflow pode voltar do último ponto consistente. Essa estratégia também ajuda em reinícios de servidor e em execuções longas que precisam ser pausadas.

O checkpoint costuma ser armazenado no PostgreSQL como JSONB, com índices por thread e tempo. Um esquema simples inclui chaves como thread_id e checkpoint_id, além do payload do estado. Esse armazenamento facilita auditoria e limpeza controlada por retenção. A seguir, um exemplo de SQL para a tabela de checkpoints.

CREATE TABLE langgraph_checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_id TEXT NOT NULL,
    parent_id TEXT,
    checkpoint JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (thread_id, checkpoint_id)
);

CREATE INDEX idx_checkpoints_thread ON langgraph_checkpoints(thread_id);
CREATE INDEX idx_checkpoints_created ON langgraph_checkpoints(created_at);

Rate limiting em múltiplas camadas

Rate limiting é a limitação de quantidade de requisições por intervalo de tempo para proteger recursos. Em agentes, é comum aplicar limites em mais de um ponto, pois cada camada protege um tipo de gargalo diferente. No nível do Django, impede abuso e protege a aplicação web. No nível do Celery, evita explosão de backlog e uso descontrolado de workers.

No nível do provedor de LLM, respeita quotas e reduz falhas por excesso de chamadas. No nível de ferramenta, evita esgotar APIs específicas como busca, e-mail ou dados pagos. O conjunto dessas camadas cria previsibilidade sob picos e permite justiça entre usuários. A seguir, um exemplo simples de limitação de ferramenta usando Redis.

from functools import wraps
import redis

redis_client = redis.Redis(host="localhost", port=6379, db=0)

class ErroLimiteTaxa(Exception):
    pass

def limitar_ferramenta(max_chamadas: int, periodo_segundos: int):
    """Decorator para limitar chamadas de uma ferramenta por janela de tempo."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            chave = f"limite_ferramenta:{func.__name__}"
            atual = redis_client.get(chave)

            if atual and int(atual) >= max_chamadas:
                raise ErroLimiteTaxa(f"Limite da ferramenta {func.__name__} excedido.")

            pipe = redis_client.pipeline()
            pipe.incr(chave)
            pipe.expire(chave, periodo_segundos)
            pipe.execute()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@limitar_ferramenta(max_chamadas=100, periodo_segundos=3600)
def buscar_web(query: str) -> str:
    """Exemplo de ferramenta cara, protegida por limite de taxa."""
    return f"Resultados simulados para: {query}"

Observabilidade: métricas, logs estruturados e rastreio

Observabilidade é a capacidade de entender o que acontece no sistema por métricas, logs e eventos. Em agentes, isso é fundamental porque falhas podem ser intermitentes e dependem de sistemas externos. Métricas mostram volume, latência, taxa de erro e custo por execução. Logs estruturados facilitam buscar uma execução específica e entender a sequência de passos.

Também é importante medir fila, tempo em fila e tempo de execução, pois problemas de capacidade aparecem primeiro como backlog. Custos precisam ser tratados como métrica de produto, porque variam conforme tokens e ferramentas usadas. A seguir, um exemplo de instrumentação com Prometheus e logging simples, com contadores e histogramas para tempo e custo. O exemplo assume que o registro de execução já contém tokens e custo ao final.

import time
import logging
from functools import wraps
from prometheus_client import Counter, Histogram, Gauge

logger = logging.getLogger(__name__)

execucoes_total = Counter(
    "execucoes_agente_total",
    "Total de execuções de agente",
    ["tipo_agente", "status"]
)

duracao_execucao_segundos = Histogram(
    "execucao_agente_duracao_segundos",
    "Duração de execução do agente",
    ["tipo_agente"],
    buckets=[1, 5, 10, 30, 60, 120, 300]
)

custo_execucao_usd = Histogram(
    "execucao_agente_custo_usd",
    "Custo por execução em USD",
    ["tipo_agente"],
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)

agentes_ativos = Gauge(
    "agentes_ativos",
    "Quantidade de agentes em execução",
    ["tipo_agente"]
)

def instrumentar_execucao(func):
    """Decorator para medir tempo, sucesso/falha e custo."""
    @wraps(func)
    def wrapper(execucao_id: int, *args, **kwargs):
        from .models import ExecucaoAgente

        execucao = ExecucaoAgente.objects.get(id=execucao_id)
        tipo = execucao.tipo_agente

        inicio = time.time()
        agentes_ativos.labels(tipo_agente=tipo).inc()

        logger.info("execucao_iniciada", extra={"execucao_id": execucao_id, "tipo_agente": tipo})

        try:
            resultado = func(execucao_id, *args, **kwargs)

            duracao = time.time() - inicio
            execucoes_total.labels(tipo_agente=tipo, status="sucesso").inc()
            duracao_execucao_segundos.labels(tipo_agente=tipo).observe(duracao)
            custo_execucao_usd.labels(tipo_agente=tipo).observe(float(execucao.custo_usd))

            logger.info(
                "execucao_concluida",
                extra={"execucao_id": execucao_id, "duracao": duracao, "custo_usd": float(execucao.custo_usd)}
            )
            return resultado

        except Exception as exc:
            execucoes_total.labels(tipo_agente=tipo, status="falha").inc()
            logger.error("execucao_falhou", extra={"execucao_id": execucao_id, "erro": str(exc)})
            raise

        finally:
            agentes_ativos.labels(tipo_agente=tipo).dec()

    return wrapper

Resiliência com fallback de provedores e circuit breaker

Dependência de um único provedor de LLM cria um ponto único de falha. Um mecanismo de fallback permite alternar para outro provedor quando há indisponibilidade, erro recorrente ou degradação de latência. Para evitar alternância caótica, usa-se circuit breaker, que “abre o circuito” quando muitas falhas ocorrem em pouco tempo. Com o circuito aberto, o provedor é temporariamente evitado.

Além disso, a escolha pode considerar custo e complexidade da tarefa. Tarefas simples podem usar modelos mais baratos, enquanto tarefas críticas usam modelos mais robustos. Essa seleção controlada reduz custo sem comprometer o funcionamento. A seguir, um exemplo de roteador simples com registro de falhas e escolha ponderada, mantendo o código objetivo.

import time
import random
import logging
from typing import List, Dict, Tuple, Any

logger = logging.getLogger(__name__)

class RoteadorLLM:
    """Seleciona provedores de LLM saudáveis com fallback e circuit breaker simples."""

    def __init__(self):
        self.provedores: List[Dict[str, Any]] = [
            {"nome": "openai", "modelo": "gpt-4-turbo-preview", "prioridade": 1, "multiplicador_custo": 1.0},
            {"nome": "anthropic", "modelo": "claude-3-sonnet", "prioridade": 2, "multiplicador_custo": 0.8},
            {"nome": "google", "modelo": "gemini-1.5-pro", "prioridade": 3, "multiplicador_custo": 0.5},
        ]
        self.circuitos: Dict[str, Dict[str, Any]] = {}

    def circuito_aberto(self, nome: str) -> bool:
        estado = self.circuitos.get(nome)
        if not estado:
            return False

        falhas_recentes = [t for t in estado["falhas"] if time.time() - t < 300]
        if len(falhas_recentes) > 5 and (time.time() - estado["ultima_falha"] < 60):
            return True
        return False

    def registrar_falha(self, nome: str):
        if nome not in self.circuitos:
            self.circuitos[nome] = {"falhas": [], "ultima_falha": 0.0}
        self.circuitos[nome]["falhas"].append(time.time())
        self.circuitos[nome]["ultima_falha"] = time.time()
        logger.warning("circuito_aberto", extra={"provedor": nome})

    def escolher(self) -> Dict[str, Any]:
        saudaveis = [p for p in self.provedores if not self.circuito_aberto(p["nome"])]
        if not saudaveis:
            raise RuntimeError("Nenhum provedor de LLM está saudável no momento.")

        pesos = [1.0 / (p["prioridade"] * p["multiplicador_custo"]) for p in saudaveis]
        return random.choices(saudaveis, weights=pesos)[0]

Otimização de custo: cache, seleção de modelo e lote

O custo em agentes costuma ser dominado por tokens de LLM e chamadas externas. Por isso, custo precisa ser tratado como parte do design, com medição e mecanismos para reduzir repetição. O primeiro ganho geralmente vem de cache de prompts e resultados quando o contexto é o mesmo. Outro ganho importante é selecionar modelos mais baratos para tarefas simples.

Também existe economia por processamento em lote, quando várias entradas podem ser agrupadas em uma chamada. Em tarefas como classificação, isso reduz overhead e aumenta throughput. Um cuidado importante é manter limites de tamanho de prompt para não estourar contexto. A seguir, um exemplo de seleção de modelo por tipo de tarefa e tamanho aproximado de entrada.

def selecionar_modelo(tipo_tarefa: str, tamanho_entrada: int) -> str:
    """Seleciona modelo por complexidade para equilibrar custo e qualidade."""
    if tipo_tarefa in ("classificacao", "extracao") and tamanho_entrada < 500:
        return "gpt-3.5-turbo"
    if tipo_tarefa in ("geracao", "resumo") and tamanho_entrada < 2000:
        return "gpt-4-turbo-preview"
    if tipo_tarefa in ("raciocinio", "planejamento") or tamanho_entrada > 5000:
        return "gpt-4"
    return "gpt-3.5-turbo-16k"

Benchmarks e metas operacionais para escala

Metas operacionais traduzem “escala” em números verificáveis, como latência P95, taxa de erro e custo por execução. Em ambientes com fila, também é essencial acompanhar profundidade de fila e tempo em fila. Isso indica se a capacidade está acompanhando a demanda. Um sistema saudável mantém taxa de erro baixa mesmo sob picos, com retentativas e degradação controlada.

Além disso, limites como conexões ao banco e memória de workers precisam ser monitorados. Em longas execuções, vazamentos de memória podem surgir, e reciclar processos após X tarefas reduz risco. O ajuste de concorrência deve respeitar gargalos de CPU, rede e limites do provedor de LLM. Um desenho coerente mede tudo isso e permite correção antes de virar incidente.

Pontos de segurança e consistência em agentes autônomos

Agentes ampliam superfície de risco porque recebem entrada aberta e interagem com ferramentas. Um problema comum é prompt injection, quando uma entrada tenta manipular regras internas do agente. Outra preocupação é vazamento de dados sensíveis em logs ou em respostas geradas. Para minimizar riscos, validação de entrada, filtragem de saída e separação de dados por usuário são indispensáveis.

Chaves de API devem ser tratadas como segredo e nunca persistidas em locais inadequados. Auditoria de execução ajuda a identificar padrões de abuso, além de sustentar investigações. Limites por usuário e por ferramenta impedem que uma conta degrade o serviço para as demais. Um desenho seguro também evita que o agente acesse dados de outro usuário por engano, mantendo isolamento estrito.

Encerramento: um sistema que cresce sem travar

Um agente que funciona em demonstração pode falhar rapidamente quando entra tráfego real, porque produção exige previsibilidade, retentativas, controle de carga e rastreabilidade. A combinação de Django para orquestração, Celery para execução assíncrona e LangGraph para workflows com estado cria uma base estável para crescimento. Checkpoints reduzem custo e evitam retrabalho, enquanto filas e prioridades impedem travamentos sob pico. Observabilidade transforma incidentes em dados acionáveis, e otimização de custo mantém sustentabilidade.

Com essas camadas bem separadas, o sistema deixa de depender de “execuções perfeitas” e passa a operar de forma resiliente. Falhas externas viram eventos tratados, não catástrofes que derrubam o serviço. A escalabilidade passa a ser resultado de engenharia de execução e controle, e não apenas de escolher um modelo mais forte. O resultado final é um fluxo autônomo que mantém desempenho, controla custos e preserva estabilidade mesmo em volumes muito altos.