Dominando o Celery: Gerenciamento de Tarefas, Conexões com Banco de Dados e Escalabilidade em Django

Published on: 2025-12-25
Post image
pt celery django tarefas-assincronas processamento-em-background filas-de-tarefas escalabilidade-backend workers-celery concorrencia-python redis-rabbitmq arquitetura-backend

O Celery é uma biblioteca de filas de tarefas para Python que permite executar trabalho em segundo plano, fora do fluxo principal de uma aplicação. Esse modelo é conhecido como processamento assíncrono, isto é, operações que não precisam terminar imediatamente podem ser delegadas a processos separados, liberando a aplicação para responder mais rápido.

Em projetos com Django, o Celery costuma ser usado para enviar e-mails, processar arquivos, integrar serviços externos e executar rotinas pesadas sem bloquear requisições. Um uso profissional envolve compreender como tarefas são criadas e executadas, como conexões com banco de dados se comportam em processos longos e como escalar trabalhadores (workers) com segurança.

Arquitetura básica do Celery: broker, worker e backend

O Celery funciona como um sistema distribuído de execução de tarefas. O broker é o intermediário que recebe mensagens de tarefas e as mantém em filas, sendo comum usar Redis ou RabbitMQ. O worker é o processo que busca tarefas no broker e as executa, podendo existir vários workers em paralelo. O result backend é o local onde os resultados e estados são armazenados, o que ajuda no monitoramento e na recuperação de status. Essa separação permite que a aplicação publique tarefas rapidamente, enquanto a execução ocorre de forma independente.

Como tarefas são criadas: registro, serialização e metadados

Uma tarefa no Celery é uma função Python transformada em unidade de trabalho distribuível. Ao aplicar um decorador de tarefa, a função passa por registro, entrando em um catálogo interno com nome único, normalmente baseado no caminho completo do módulo. Em seguida, ocorre a serialização, que é a conversão dos argumentos para um formato transportável, como JSON. O Celery também adiciona metadados de execução, como identificador da tarefa, contagem de tentativas e timestamps. Esse conjunto permite que a execução seja reconstituída em outro processo, em outro servidor, com rastreabilidade.

O trecho a seguir mostra uma configuração mínima de aplicação Celery e uma tarefa simples registrada no app. Ele demonstra a estrutura típica usada em projetos Python e é suficiente para entender o papel do decorador e a publicação da tarefa. A conexão com broker é apontada para Redis em localhost, mas o mesmo padrão vale para outros brokers. A função de tarefa retorna um texto, que pode ser armazenado no backend de resultados se configurado. Esse modelo é a base para evoluir com filas, roteamento e políticas de retry.

from celery import Celery

app = Celery("meuapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task(name="meuapp.tasks.processar_pedido")
def processar_pedido(id_pedido: int) -> str:
    # Lógica principal da tarefa
    return f"Pedido {id_pedido} processado com sucesso"

Fluxo completo de execução: do delay ao resultado

O fluxo começa quando a aplicação invoca a tarefa com delay ou apply_async, que publicam uma mensagem no broker. A mensagem contém nome da tarefa, argumentos e opções, tudo serializado. Workers ficam em loop, buscando novas mensagens e, ao receber uma, realizam a desserialização para reconstruir a chamada. O worker então monta um contexto com logs, tratamento de erros e integração com o backend de resultados. Por fim, o retorno é serializado e armazenado, ou o erro é registrado com estado apropriado.

Existem formas diferentes de publicar tarefas, e o exemplo abaixo mostra três padrões frequentes. O primeiro publica imediatamente, o segundo agenda para o futuro usando countdown (atraso em segundos) e o terceiro tenta incorporar uma intenção de retry. Na prática, retry controlado costuma ser mais robusto quando implementado dentro da própria tarefa, especialmente com tarefas vinculadas (bound). Mesmo assim, entender opções de agendamento e publicação é essencial para controlar carga e cadência de execução. Esse conjunto compõe a interface mais comum do Celery em aplicações.

# Execução assíncrona simples
resultado1 = processar_pedido.delay(123)

# Execução com atraso (countdown em segundos)
resultado2 = processar_pedido.apply_async(args=[123], countdown=60)

# Publicação com parâmetros de execução (retry real normalmente é feito na tarefa bound)
resultado3 = processar_pedido.apply_async(args=[123])

Estados e ciclo de vida de uma tarefa

Uma tarefa do Celery transita por estados que facilitam observabilidade e controle. PENDING significa que a tarefa foi aceita mas ainda não começou, ou que o estado ainda não foi consultado. STARTED indica que um worker já iniciou a execução, quando essa emissão de estado está habilitada. SUCCESS representa conclusão com retorno, enquanto FAILURE indica término com exceção. Há ainda RETRY, quando a tarefa foi reprogramada após falha, e REVOKED, quando foi cancelada antes ou durante a execução.

A transição de estados depende do broker, do backend de resultados e das opções do worker, como confirmação tardia. Em sistemas que exigem alta confiabilidade, estados são usados para auditoria e para evitar duplicidade lógica. Mesmo quando tarefas podem ser reexecutadas, é comum garantir idempotência, isto é, executar a mesma tarefa mais de uma vez sem efeitos colaterais indesejados. Em Celery, isso costuma ser implementado com chaves únicas no banco, checagens de status e operações atômicas. O ciclo de vida bem compreendido reduz falhas silenciosas e facilita depuração.

Tarefas normais e tarefas vinculadas (bound): diferenças reais

Tarefas normais são funções sem acesso direto ao contexto de execução do Celery. Elas recebem argumentos e retornam valores, sendo ideais para rotinas simples e sem necessidade de introspecção. Já as tarefas vinculadas, chamadas de bound tasks, recebem a instância da tarefa como primeiro argumento (frequentemente nomeada self). Isso permite acessar informações como id da tarefa, número de tentativas e outros dados presentes em request. Essa capacidade habilita controle fino de retry, atualização de progresso e respostas diferentes por cenário.

O exemplo abaixo apresenta uma tarefa normal adequada para operações diretas. Ela ilustra a clareza de assinatura e o baixo acoplamento com infraestrutura. Esse padrão é comum quando erros são raros ou podem ser tratados fora do Celery. Também é frequente em tarefas de transformação simples ou rotinas curtas. Em ambientes de produção, essas tarefas ainda se beneficiam de timeouts e roteamento para filas específicas quando necessário.

from celery import Celery

app = Celery("meuapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task(name="meuapp.tasks.enviar_email")
def enviar_email(destinatario: str, assunto: str, corpo: str) -> str:
    # Exemplo simplificado; envio real dependeria do serviço de e-mail escolhido
    return f"E-mail enfileirado para {destinatario} com assunto '{assunto}'"

Retry avançado e contexto com tarefas bound

Tarefas bound são úteis quando falhas são esperadas, como integrações com APIs externas ou operações com dependências instáveis. O método self.retry reprograma a tarefa, podendo incluir atraso e a exceção original. Além disso, a contagem de tentativas fica disponível em self.request.retries, o que permite limitar tentativas e ajustar backoff. Esse padrão evita loops infinitos e cria comportamento previsível em falhas transitórias. Em produção, retries controlados reduzem incidentes e estabilizam a fila.

O código a seguir mostra uma tarefa bound com retry, mantendo a lógica de falha e reexecução no próprio lugar. O parâmetro countdown representa atraso antes da próxima tentativa, e o controle de máximo é implementado com checagem explícita. Esse modelo é útil quando o motivo da falha pode desaparecer com o tempo, como latência, rate limit ou indisponibilidade. Ao final, a exceção é propagada caso o limite seja alcançado, garantindo que o erro apareça como FAILURE. Esse comportamento preserva rastreabilidade e facilita alarmes operacionais.

from celery import Celery

app = Celery("meuapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task(bind=True, name="meuapp.tasks.processamento_complexo", max_retries=3)
def processamento_complexo(self, dados: dict) -> dict:
    try:
        # Exemplo: chamada a serviço externo ou lógica sujeita a falhas transitórias
        resultado = {"status": "ok", "eco": dados}
        return resultado
    except Exception as exc:
        if self.request.retries < 3:
            # Nova tentativa após 60 segundos, preservando a exceção
            raise self.retry(countdown=60, exc=exc)
        raise

Progresso e estados personalizados durante a execução

Algumas tarefas são longas e se beneficiam de indicação de andamento. O Celery permite atualizar estado com update_state, que registra um estado e um dicionário de metadados. Isso costuma ser combinado com um backend de resultados para consulta posterior. Um estado como PROGRESS não é padrão, mas é aceito como estado customizado para monitoramento. Esse padrão melhora observabilidade em importações, uploads, processamento de lotes e geração de relatórios.

O exemplo abaixo simula envio por partes, calculando porcentagem e gravando metadados. O estado é atualizado a cada bloco processado, informando total e atual. Mesmo sendo um exemplo simplificado, ele mostra a estrutura típica usada em tarefas reais. Em produção, a frequência de atualização costuma ser limitada para não gerar excesso de escrita no backend. Ao final, o retorno descreve conclusão, podendo ser persistido conforme configuração. Essa abordagem padroniza acompanhamento sem bloquear o fluxo principal da aplicação.

import os
from celery import Celery

app = Celery("meuapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task(bind=True, name="meuapp.tasks.enviar_arquivo_com_progresso")
def enviar_arquivo_com_progresso(self, caminho_arquivo: str, destino: str) -> dict:
    tamanho = os.path.getsize(caminho_arquivo)
    enviado = 0

    with open(caminho_arquivo, "rb") as arquivo:
        while True:
            bloco = arquivo.read(8192)
            if not bloco:
                break

            # Simulação de envio do bloco
            enviado += len(bloco)

            progresso = int((enviado / tamanho) * 100)
            self.update_state(
                state="PROGRESS",
                meta={"atual": enviado, "total": tamanho, "progresso": progresso},
            )

    return {"status": "concluido", "destino": destino}

Conexões com banco de dados no Django dentro de workers Celery

O Django foi desenhado com foco em ciclo de requisição e resposta, onde conexões podem ser abertas e fechadas ao fim de cada request. Workers Celery, por outro lado, são processos de longa duração, executando milhares de tarefas sem reiniciar. Isso pode levar a conexões reaproveitadas por tempo demais, gerando erros como “conexão encerrada” ou timeouts no servidor de banco. Também pode ocorrer acúmulo de conexões quando a concorrência é alta e não existe controle de limites. Por esse motivo, gestão explícita e políticas de reciclagem são importantes.

O cenário mais comum envolve tarefas que usam ORM do Django para consultas e gravações. Quando o banco encerra conexões inativas, um worker pode tentar reutilizar uma conexão quebrada. Outra fonte de problemas é alta concorrência criando mais conexões do que o banco permite, especialmente em filas de I/O. Assim, boas práticas incluem fechar conexões após tarefas críticas, validar saúde da conexão antes de operações e ajustar parâmetros de persistência. Uma configuração alinhada evita instabilidade intermitente e falhas difíceis de reproduzir.

Fechamento explícito e transações atômicas

Uma prática importante é garantir que operações críticas sejam consistentes e que conexões não fiquem em estado indefinido. O Django fornece transaction.atomic, que cria uma transação atômica, ou seja, ou tudo é gravado, ou nada é gravado. Ao final da tarefa, pode ser necessário fechar conexões com connections.close_all, principalmente em tarefas intensivas ou com grande volume. Isso reduz reutilização de conexões envelhecidas e ajuda o banco a gerenciar recursos. Em ambientes com muitos workers, esse padrão também diminui vazamentos de conexões em cenários de erro.

O exemplo a seguir mostra uma tarefa que cria registros dentro de transação e garante fechamento de conexões no bloco finally. Esse formato evita que exceções interrompam o fechamento, mesmo quando a tarefa falha. A transação protege integridade quando múltiplas escritas precisam ser coerentes. Em produção, esse padrão também costuma ser combinado com restrições de unicidade e validações de negócio para garantir idempotência. A soma dessas medidas torna o processamento assíncrono mais previsível.

from celery import Celery
from django.db import connections, transaction
from minha_app.models import MeuModelo

app = Celery("meuapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task(name="meuapp.tasks.tarefa_intensiva_banco")
def tarefa_intensiva_banco() -> str:
    try:
        with transaction.atomic():
            MeuModelo.objects.create(nome="exemplo")
        return "Operação concluída"
    finally:
        # Fecha conexões para evitar reaproveitamento de conexões quebradas
        connections.close_all()

Checagem de saúde da conexão e recuperação de falhas operacionais

Além de fechar conexões, é comum validar se a conexão está ativa antes de iniciar consultas. O Django expõe ensure_connection, que tenta garantir uma conexão utilizável. Em falhas do tipo OperationalError, pode ser necessário fechar e reabrir a conexão para retomar operações. Esse padrão é útil em bancos que encerram conexões por ociosidade, em redes instáveis ou em janelas de manutenção. Com isso, tarefas passam a se recuperar de erros transitórios sem falhar permanentemente.

O código abaixo demonstra uma abordagem defensiva com tentativa de reconectar. Ele separa o momento de verificar a conexão e o momento de executar o trabalho. Em caso de erro operacional, a conexão é fechada e reestabelecida, e a operação é tentada novamente. Em tarefas críticas, esse padrão pode ser combinado com retry do Celery, criando duas camadas de resiliência. Essa combinação reduz falhas por instabilidade breve de infraestrutura.

from celery import Celery
from django.db import connection
from django.db.utils import OperationalError

app = Celery("meuapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

def executar_trabalho_banco() -> None:
    # Função exemplo: aqui entrariam consultas e atualizações no ORM
    connection.cursor().execute("SELECT 1")

@app.task(name="meuapp.tasks.tarefa_banco_segura")
def tarefa_banco_segura() -> str:
    try:
        connection.ensure_connection()
        executar_trabalho_banco()
        return "Trabalho executado com conexão saudável"
    except OperationalError:
        connection.close()
        connection.ensure_connection()
        executar_trabalho_banco()
        return "Trabalho executado após reconexão"

Uso de bancos dedicados e roteamento de operações

Em alguns sistemas, separar carga de leitura e escrita ajuda a estabilidade. O Django permite múltiplas conexões com o dicionário DATABASES e também permite escolher o banco com using. Uma estratégia é direcionar tarefas de alto volume para um banco dedicado, reduzindo impacto no banco principal da aplicação web. Outra estratégia é usar réplicas de leitura para tarefas analíticas. A escolha depende de arquitetura, requisitos de consistência e limites do servidor de banco.

O exemplo abaixo ilustra dois bancos e uma tarefa que escreve especificamente no banco das tarefas. A opção CONN_MAX_AGE em zero indica que conexões não são persistidas, reduzindo risco de conexões envelhecidas em workers. Esse tipo de separação facilita dimensionamento independente e manutenção. Também permite aplicar políticas diferentes de pool e de limites de conexão por tipo de carga. Em ambientes grandes, esse desenho ajuda a reduzir contenção e gargalos.

# settings.py (exemplo)
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "principal",
    },
    "celery_tasks": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "tarefas",
        "CONN_MAX_AGE": 0,
    },
}
from celery import Celery
from minha_app.models import LogTarefa

app = Celery("meuapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task(name="meuapp.tasks.registrar_log_tarefa")
def registrar_log_tarefa(status: str) -> int:
    registro = LogTarefa.objects.using("celery_tasks").create(status=status)
    return registro.id

Concorrência de workers: prefork, gevent e eventlet

Concorrência define quantas tarefas podem ser processadas ao mesmo tempo por um worker. O modelo prefork usa múltiplos processos, oferecendo paralelismo real e isolamento, sendo indicado para tarefas CPU-bound, ou seja, limitadas por processamento. O modelo gevent usa cooperação com greenlets, indicado para tarefas I/O-bound, como rede e espera de disco, permitindo alta concorrência com baixo consumo de memória. O eventlet é alternativa semelhante ao gevent, com o mesmo objetivo de I/O cooperativo. A escolha errada pode causar travamentos, baixa vazão ou excesso de conexões.

Em prefork, cada processo tem seu próprio interpretador e memória, o que aumenta custo, mas reduz interferência entre tarefas. Em gevent/eventlet, bibliotecas precisam ser compatíveis com monkey patching, e bloqueios podem comprometer a concorrência. Em cenários mistos, é comum separar filas e criar pools diferentes por tipo de carga. Esse desenho permite que tarefas de CPU não bloqueiem tarefas de I/O, e vice-versa. Uma arquitetura com roteamento por fila tende a ser mais estável sob picos.

Monitoramento interno: inspeção de tarefas e estatísticas

O Celery fornece um mecanismo de controle que consulta workers e obtém informações em tempo real. A função inspect permite listar tarefas ativas, estatísticas e tarefas registradas. Esse tipo de visibilidade ajuda a diagnosticar filas travadas, tarefas demoradas e saturação de concorrência. Em produção, esses dados também são usados para alertas, dashboards e auditoria operacional. A inspeção não substitui métricas completas, mas oferece um ponto de partida confiável.

O exemplo abaixo retorna um dicionário com tarefas ativas, estatísticas e lista de tarefas conhecidas pelos workers. Em ambientes distribuídos, o resultado vem por worker, o que permite identificar qual nó está sobrecarregado. Essas informações também ajudam a confirmar se um deploy registrou tarefas corretamente. Em situações de falha, a comparação entre registered e expected pode indicar imports quebrados ou autodiscovery incompleto. Esse monitoramento reduz tempo de análise em incidentes.

from celery import current_app

def obter_dados_monitoramento() -> dict:
    inspetor = current_app.control.inspect()
    return {
        "ativas": inspetor.active(),
        "estatisticas": inspetor.stats(),
        "registradas": inspetor.registered(),
    }

Filas e roteamento de tarefas para separar tipos de carga

Roteamento de tarefas permite enviar diferentes tarefas para filas diferentes. Isso é essencial quando existem perfis distintos, como e-mails, processamento de imagem e integração externa. Ao separar filas, cada fila pode ter workers com o pool e a concorrência adequados, reduzindo competição por recursos. Esse desenho também facilita priorização, pois filas críticas podem ter mais capacidade. Em sistemas grandes, roteamento é uma das ferramentas mais importantes para previsibilidade.

O exemplo abaixo mostra um mapeamento simples de rotas e a inicialização de workers especializados por fila. Em prática, os nomes de tarefas devem refletir módulos reais e a organização do projeto. Essa separação reduz interferência entre tarefas pesadas e tarefas rápidas. Também ajuda a limitar conexões com banco em tarefas de I/O por meio de pools específicos. O resultado é um sistema mais controlável e escalável.

# settings.py (exemplo)
CELERY_TASK_ROUTES = {
    "meuapp.tasks.tarefa_cpu_pesada": {"queue": "cpu_pesado"},
    "meuapp.tasks.tarefa_io_pesada": {"queue": "io_pesado"},
    "meuapp.tasks.enviar_email": {"queue": "emails"},
}

Configuração de escala: prefetch, acks tardios e reciclagem de processos

Escalar Celery não é apenas aumentar quantidade de workers, mas também ajustar comportamento de consumo e confirmação. O worker_prefetch_multiplier define quantas mensagens cada processo reserva antecipadamente; valores altos podem causar desigualdade de carga e amplificar problemas com tarefas longas. A opção task_acks_late confirma a tarefa apenas após concluir, reduzindo risco de perda quando um worker cai no meio da execução. A opção worker_max_tasks_per_child recicla processos após certo número de tarefas, reduzindo impacto de vazamentos de memória. Essas opções formam um conjunto comum para ambientes de produção.

O bloco a seguir mostra uma configuração típica de Celery com serialização JSON e ajustes de worker para previsibilidade. JSON facilita interoperabilidade e reduz riscos de serialização perigosa quando comparado a formatos que executam código. A reciclagem de processos é uma prática defensiva em tarefas que usam bibliotecas nativas ou manipulam arquivos grandes. A expiração de resultados evita crescimento infinito do backend. Esse conjunto representa uma base sólida para evoluir com monitoramento e roteamento.

# celeryconfig.py (exemplo)
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/0"

task_serializer = "json"
accept_content = ["json"]
result_serializer = "json"
timezone = "UTC"
enable_utc = True

worker_prefetch_multiplier = 1
task_acks_late = True
worker_max_tasks_per_child = 1000

result_expires = 3600

worker_send_task_events = True
task_send_sent_event = True

Autoescala e dimensionamento orientado por fila

O Celery oferece autoscaling interno, ajustando o número de processos do worker conforme demanda. Esse recurso é útil quando há variação forte de carga ao longo do tempo. Mesmo com autoscale, é importante manter limites mínimos e máximos alinhados à capacidade do banco e do broker. Em muitos casos, é preferível escalar por fila, garantindo que cada tipo de tarefa tenha recursos proporcionais. O dimensionamento deve considerar CPU, memória, conexões de banco e limites de serviços externos.

O exemplo abaixo mostra configuração e comando de worker com autoscale. O par max,min define o teto e o piso de processos dentro do mesmo worker. Em tarefas I/O-bound, autoscale pode elevar concorrência rapidamente e aumentar conexões, exigindo atenção a limites do banco e de APIs externas. Já em tarefas CPU-bound, o teto costuma ser próximo ao número de núcleos disponíveis. Um autoscale bem calibrado reduz custos e melhora tempos de fila sem instabilidade.

# celeryconfig.py (exemplo)
worker_autoscale_max = 10
worker_autoscale_min = 2

Coleta de métricas com sinais: duração e falhas

O Celery expõe signals (sinais) para capturar eventos do ciclo de vida das tarefas. Entre os mais usados estão task_prerun, task_postrun e task_failure. Esses sinais permitem medir duração, contar erros e gerar logs estruturados. Mesmo sem um sistema externo de métricas, esse padrão já melhora muito a observabilidade. Em produção, esses dados podem alimentar alertas e análises de performance.

O exemplo a seguir registra tempo inicial no prerun, calcula duração no postrun e registra falha no failure. Ele usa um dicionário em memória para simplificar, o que funciona por processo, mas ilustra a ideia central. Em sistemas maiores, a gravação pode ser enviada para um coletor central ou persistida em armazenamento apropriado. O log de duração ajuda a identificar tarefas que cresceram com o tempo ou inputs que causam degradação. O log de falhas ajuda a diferenciar erros transitórios de erros lógicos.

import logging
import time
from celery.signals import task_prerun, task_postrun, task_failure

metricas_tarefas = {}

@task_prerun.connect
def ao_iniciar_tarefa(sender=None, task_id=None, task=None, args=None, kwargs=None, **extras):
    metricas_tarefas[task_id] = {"inicio": time.time(), "nome": getattr(task, "name", "desconhecida")}

@task_postrun.connect
def ao_finalizar_tarefa(sender=None, task_id=None, task=None, retval=None, state=None, **extras):
    dados = metricas_tarefas.pop(task_id, None)
    if dados:
        duracao = time.time() - dados["inicio"]
        logging.info(f"Tarefa {dados['nome']} finalizada em {duracao:.2f}s com estado {state}")

@task_failure.connect
def ao_falhar_tarefa(sender=None, task_id=None, exception=None, einfo=None, **extras):
    dados = metricas_tarefas.pop(task_id, None)
    nome = getattr(sender, "name", "desconhecida")
    logging.error(f"Tarefa {nome} falhou (task_id={task_id}): {exception}")

Otimização de desempenho: granularidade, grupos e lotes

Um erro comum é criar tarefas grandes demais, que seguram recursos por muito tempo e aumentam chance de falhas longas. Melhorar granularidade significa dividir trabalho em unidades menores, que podem ser distribuídas por vários workers. O Celery oferece primitivas como group, que dispara várias tarefas em paralelo e permite aguardar o conjunto. Essa abordagem melhora vazão e reduz impacto de uma única falha. Também facilita retentativas, pois apenas partes falhas precisam ser reexecutadas.

O exemplo abaixo apresenta uma evolução: evitar uma tarefa monolítica e preferir batches e, depois, paralelismo com group. A divisão por lote reduz overhead quando há milhões de itens, pois cria um número controlado de mensagens. O paralelismo com group é útil quando cada item é independente. Em ambientes com banco, essa divisão também reduz transações gigantes e melhora controle de locks. A escolha ideal equilibra overhead de fila e tempo de processamento por unidade.

from celery import Celery, group

app = Celery("meuapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task(name="meuapp.tasks.processar_usuario")
def processar_usuario(id_usuario: int) -> str:
    return f"Usuário {id_usuario} processado"

def processar_todos_usuarios(ids_usuarios: list[int]):
    # Dispara tarefas em paralelo
    job = group(processar_usuario.s(uid) for uid in ids_usuarios)
    return job.apply_async()

Resiliência com circuit breaker: evitando colapso por falhas em cascata

Quando uma dependência externa falha, retries em massa podem piorar a situação e saturar filas. O padrão circuit breaker (disjuntor) reduz danos ao bloquear temporariamente execuções após um número de falhas. Ele tem estados típicos: fechado (permite), aberto (bloqueia) e meia-aberto (testa novamente após um tempo). Isso protege a aplicação de sobrecarga e evita explosão de tentativas. Em tarefas Celery, pode ser combinado com retries e atrasos para recuperação gradual.

O exemplo abaixo apresenta uma implementação simples em memória, adequada para explicar a lógica. Em sistemas distribuídos, um circuito global costuma ser armazenado em Redis ou banco para consistência entre workers, mas o mecanismo de estado é o mesmo. Ao detectar que o circuito está aberto, a tarefa falha rapidamente, economizando chamadas externas. Ao retornar para meia-aberto, uma execução é permitida para testar se o serviço voltou. Esse padrão reduz filas inchadas e melhora estabilidade sob incidentes externos.

from datetime import datetime, timedelta
from celery import Celery

app = Celery("meuapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

class CircuitBreaker:
    def __init__(self, limite_falhas: int = 5, timeout_segundos: int = 60):
        self.limite_falhas = limite_falhas
        self.timeout = timedelta(seconds=timeout_segundos)
        self.falhas = 0
        self.ultima_falha = None
        self.estado = "FECHADO"  # FECHADO, ABERTO, MEIO_ABERTO

    def pode_executar(self) -> bool:
        if self.estado == "ABERTO":
            if self.ultima_falha and (datetime.now() - self.ultima_falha) > self.timeout:
                self.estado = "MEIO_ABERTO"
                return True
            return False
        return True

    def sucesso(self) -> None:
        self.falhas = 0
        self.estado = "FECHADO"

    def falha(self) -> None:
        self.falhas += 1
        self.ultima_falha = datetime.now()
        if self.falhas >= self.limite_falhas:
            self.estado = "ABERTO"

circuito = CircuitBreaker()

def chamar_api_externa(dados: dict) -> dict:
    # Simulação; em produção seria uma chamada HTTP real
    return {"ok": True, "dados": dados}

@app.task(bind=True, name="meuapp.tasks.tarefa_api_externa", max_retries=3)
def tarefa_api_externa(self, dados: dict) -> dict:
    if not circuito.pode_executar():
        raise Exception("Circuit breaker ABERTO: chamadas bloqueadas temporariamente")

    try:
        resposta = chamar_api_externa(dados)
        circuito.sucesso()
        return resposta
    except Exception as exc:
        circuito.falha()
        raise self.retry(countdown=60, exc=exc)

Conclusão: domínio do Celery com previsibilidade e escala

O uso sólido de Celery depende de compreender a jornada da tarefa, desde o registro e a serialização até execução e armazenamento de resultados. A distinção entre tarefas normais e bound define o nível de controle possível, especialmente para retries e progresso. Em Django, a gestão de conexões de banco é um ponto crítico, porque workers são processos longos e podem reaproveitar conexões quebradas ou exceder limites. Ao combinar fechamento explícito, checagem de saúde e estratégias de bancos dedicados, o processamento assíncrono se torna mais estável.

Escala e desempenho surgem de decisões coerentes: modelo de concorrência adequado ao tipo de carga, roteamento por filas, prefetch controlado e confirmações tardias quando necessário. Monitoramento por inspeção e sinais, somado a métricas de duração e falhas, dá visibilidade operacional e reduz tempo de diagnóstico. Otimizações de granularidade e padrões de resiliência como circuit breaker evitam colapsos em cascata e melhoram a recuperação. Com esses componentes, o Celery deixa de ser apenas uma fila e passa a ser uma base consistente para processamento distribuído em produção.