The Ultimate Django AI Stack 2026: Async ORM, LangGraph, HTMX e Celery para Aplicações Full-Stack Inteligentes em Produção

Published on: 2026-01-25
Post image
pt django-ai-stack django-async-orm django-5-async django-ai-applications langgraph-django langgraph-workflows htmx-django django-htmx-async django-celery-redis django-ai-full-stack python-ai-backend ai-document-analysis-django async-django-archi

Em 2026, aplicações com inteligência artificial deixaram de ser “um chatbot colado na interface” e passaram a funcionar como sistemas completos, capazes de interpretar dados, manter estado e responder em tempo real. Nesse cenário, uma pilha moderna combina backend assíncrono, orquestração de tarefas, armazenamento confiável e uma camada de IA com fluxo controlado para entregar respostas rápidas e consistentes.

Uma arquitetura forte para esse tipo de produto une ORM assíncrona no Django, uma camada de agentes com LangGraph (fluxos com estado e ciclos), interfaces com HTMX (interatividade dirigida pelo servidor) e infraestrutura com PostgreSQL, Redis e Celery. O resultado é um aplicativo full-stack com experiência “viva” sem depender de frameworks pesados no navegador.

Visão geral do stack e por que ele funciona bem junto

Esse stack funciona porque cada parte resolve um tipo de gargalo comum em aplicações inteligentes. O Django com suporte assíncrono reduz bloqueios em operações de I/O, como acesso ao banco e streaming de resposta. O LangGraph organiza a IA em etapas, com memória e possibilidade de reprocessamento controlado. O HTMX atualiza trechos da página sem um SPA completo, mantendo HTML como unidade de composição. PostgreSQL sustenta dados persistentes e Redis dá suporte a cache, fila e canal em tempo real.

O que está sendo construído: análise inteligente de documentos ponta a ponta

Um sistema de análise de documentos representa bem os desafios reais: upload, extração de texto, processamento multi-etapas, progresso em tempo real e resultados consultáveis. A análise é executada em segundo plano para não travar requisições web. A interface precisa refletir estados como “pendente”, “processando”, “concluído” e “falhou” rapidamente. Também existe um fluxo conversacional ligado ao documento, preservando contexto e histórico. Esse conjunto cobre a maior parte dos cenários de aplicações inteligentes modernas.

Arquitetura lógica em camadas

A arquitetura pode ser entendida como uma sequência de camadas, do HTML dinâmico até a infraestrutura. No topo, o HTMX dispara requisições e substitui fragmentos de HTML no DOM, criando sensação de aplicação reativa. No meio, o Django entrega views assíncronas e endpoints para streaming, além de Channels quando WebSocket for necessário. A camada de IA é um grafo de execução com LangGraph, mantendo estado e permitindo ciclos de validação. Na base, PostgreSQL guarda documentos e resultados, Redis apoia cache e mensageria e o Celery roda tarefas longas fora do request.

Preparação do projeto e dependências principais

O projeto precisa de dependências de web, infraestrutura e IA, todas instaladas em um ambiente isolado. Um ambiente virtual evita conflitos de versões e facilita replicação em produção. O Django opera o servidor e o ORM, enquanto psycopg conecta ao PostgreSQL. O Redis dá suporte ao broker do Celery, cache e, com Channels, ao layer de comunicação. A IA é conduzida por LangGraph e LangChain, com provedores como OpenAI ou Anthropic para o modelo.

# criar e ativar ambiente virtual
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# dependências web e infraestrutura
pip install Django==5.0.1
pip install psycopg[binary]==3.1.16
pip install redis==5.0.1
pip install celery[redis]==5.3.6
pip install django-celery-results==2.5.1
pip install channels[daphne]==4.0.0
pip install channels-redis==4.1.0

# IA
pip install langgraph==0.0.20
pip install langchain==0.1.0
pip install langchain-openai==0.0.5
pip install langchain-anthropic==0.1.1
pip install openai==1.10.0

# utilitários
pip install python-dotenv==1.0.0
pip install httpx==0.26.0

Estrutura de diretórios: separação clara de responsabilidades

Uma estrutura organizada reduz acoplamento e melhora manutenção. A pasta config concentra settings, ASGI e Celery. A pasta apps separa o domínio principal de documentos e a camada de comunicação em tempo real. A pasta templates guarda HTML e componentes reutilizáveis, ideais para HTMX. A pasta static contém arquivos estáticos locais quando necessário. Essa organização facilita crescimento do sistema com novas features sem virar um monólito confuso.

Configuração do Django para assíncrono, banco e filas

A configuração assíncrona depende do ASGI, que é a interface de servidor para aplicações Python assíncronas. O banco PostgreSQL é configurado no Django, mas a ORM assíncrona aparece principalmente nos métodos async como acount e aiterator. Para cache e Channels, o Redis é configurado em CACHES e em CHANNEL_LAYERS. O Celery usa Redis como broker e grava resultados via django-celery-results. Variáveis em arquivo .env centralizam segredos e URLs.

# config/settings.py
import os
from pathlib import Path
from dotenv import load_dotenv

load_dotenv()

BASE_DIR = Path(__file__).resolve().parent.parent

SECRET_KEY = os.getenv("SECRET_KEY", "trocar-esta-chave-em-producao")
DEBUG = os.getenv("DEBUG", "False") == "True"
ALLOWED_HOSTS = ["localhost", "127.0.0.1"]

INSTALLED_APPS = [
    "daphne",  # importante para suporte ASGI
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "channels",
    "django_celery_results",
    "apps.core",
    "apps.api",
]

MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    "django.contrib.sessions.middleware.SessionMiddleware",
    "django.middleware.common.CommonMiddleware",
    "django.middleware.csrf.CsrfViewMiddleware",
    "django.contrib.auth.middleware.AuthenticationMiddleware",
    "django.contrib.messages.middleware.MessageMiddleware",
    "django.middleware.clickjacking.XFrameOptionsMiddleware",
]

ROOT_URLCONF = "config.urls"

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": os.getenv("DB_NAME", "django_ai_db"),
        "USER": os.getenv("DB_USER", "postgres"),
        "PASSWORD": os.getenv("DB_PASSWORD", ""),
        "HOST": os.getenv("DB_HOST", "localhost"),
        "PORT": os.getenv("DB_PORT", "5432"),
        "ATOMIC_REQUESTS": False,  # evita transações automáticas que atrapalham async
        "CONN_MAX_AGE": 600,
    }
}

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = os.getenv("REDIS_PORT", "6379")

CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.redis.RedisCache",
        "LOCATION": f"redis://{REDIS_HOST}:{REDIS_PORT}/0",
    }
}

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [(REDIS_HOST, int(REDIS_PORT))], "capacity": 1500, "expiry": 10},
    }
}

CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/1"
CELERY_RESULT_BACKEND = "django-db"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "UTC"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")

ASGI_APPLICATION = "config.asgi.application"

TEMPLATES = [
    {
        "BACKEND": "django.template.backends.django.DjangoTemplates",
        "DIRS": [BASE_DIR / "templates"],
        "APP_DIRS": True,
        "OPTIONS": {
            "context_processors": [
                "django.template.context_processors.debug",
                "django.template.context_processors.request",
                "django.contrib.auth.context_processors.auth",
                "django.contrib.messages.context_processors.messages",
            ],
        },
    }
]

STATIC_URL = "/static/"
STATICFILES_DIRS = [BASE_DIR / "static"]
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"

Celery no Django: execução fora do request e rastreio de resultados

O Celery é um sistema de filas de tarefas para processamentos demorados, como análise de documentos por IA. O Django envia uma tarefa para o broker (Redis) e devolve resposta rápida ao navegador. Um worker Celery executa a tarefa em segundo plano e grava resultados no banco. O pacote django-celery-results permite rastrear estado e persistir resultados. Essa separação evita timeouts e permite escalar processamento de forma independente do servidor web.

# config/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

app = Celery("django_ai_stack")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

@app.task(bind=True, ignore_result=True)
def tarefa_debug(self):
    print(f"Requisição: {self.request!r}")
# config/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)

ASGI e Channels: suporte a SSE e WebSocket

O ASGI substitui o antigo WSGI quando se precisa de async, streaming e WebSocket. No Django, o arquivo asgi.py cria a aplicação principal e adiciona roteamento para WebSocket via Channels. O ProtocolTypeRouter direciona tráfego HTTP comum e conexões websocket. O AuthMiddlewareStack permite que a sessão e o usuário autenticado existam também no WebSocket. Essa configuração habilita recursos em tempo real sem trocar o framework principal.

# config/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

django_asgi_app = get_asgi_application()

from apps.api import routing

application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
    }
)

Modelos de dados: documentos, conversas e rastreio de execução

Os modelos persistem o estado que torna o sistema “inteligente” ao longo do tempo. O modelo Document guarda o arquivo, o texto extraído e os resultados (resumo, entidades, pontos-chave e sentimento). O modelo Conversation mantém um thread_id e mensagens para preservar contexto de interações. O modelo AgentExecution registra etapas e tempos para auditoria e depuração. Índices no banco reduzem custo de listagens e consultas frequentes por usuário, status e data.

# apps/core/models.py
import uuid
from django.db import models
from django.contrib.auth.models import User

class Document(models.Model):
    STATUS_CHOICES = [
        ("pending", "Pendente"),
        ("processing", "Processando"),
        ("completed", "Concluído"),
        ("failed", "Falhou"),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="documents")
    title = models.CharField(max_length=255)
    file = models.FileField(upload_to="documents/%Y/%m/%d/")
    content = models.TextField(blank=True)

    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default="pending")

    summary = models.TextField(blank=True)
    key_points = models.JSONField(default=list, blank=True)
    sentiment = models.CharField(max_length=50, blank=True)
    entities = models.JSONField(default=list, blank=True)
    metadata = models.JSONField(default=dict, blank=True)

    task_id = models.CharField(max_length=255, blank=True)
    progress = models.IntegerField(default=0)
    error_message = models.TextField(blank=True)

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        db_table = "documents"
        ordering = ["-created_at"]
        indexes = [
            models.Index(fields=["user", "status"]),
            models.Index(fields=["created_at"]),
        ]

    def __str__(self):
        return f"{self.title} - {self.status}"

class Conversation(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="conversations")
    document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="conversations", null=True, blank=True)

    thread_id = models.CharField(max_length=255, unique=True, db_index=True)
    state = models.JSONField(default=dict)
    messages = models.JSONField(default=list)

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "conversations"
        ordering = ["-updated_at"]

    def __str__(self):
        return f"Conversa {self.thread_id}"

class AgentExecution(models.Model):
    conversation = models.ForeignKey(Conversation, on_delete=models.CASCADE, related_name="executions")
    step_name = models.CharField(max_length=100)
    step_type = models.CharField(max_length=50)
    input_data = models.JSONField(default=dict)
    output_data = models.JSONField(default=dict)
    duration_ms = models.IntegerField(default=0)
    success = models.BooleanField(default=True)
    error = models.TextField(blank=True)

    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "agent_executions"
        ordering = ["created_at"]
        indexes = [models.Index(fields=["conversation", "created_at"])]

LangGraph: agente com estado, etapas e validação

O LangGraph organiza a IA como um grafo de nós, onde cada nó é uma etapa do processamento. Diferente de uma cadeia linear, o grafo permite ciclos e decisões condicionais, o que ajuda em validação e reprocessamento. O estado é modelado como um dicionário tipado, mantendo mensagens, resultados parciais e flags de controle. O MemorySaver guarda checkpoints de estado por thread_id, permitindo retomada e consistência. Esse padrão dá previsibilidade para aplicações que exigem auditoria e repetibilidade.

# apps/core/services/langraph_agent.py
from typing import TypedDict, Annotated, Sequence
import operator
import json
import logging

from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

logger = logging.getLogger(__name__)

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    document_content: str
    analysis_steps: list
    current_step: str
    summary: str
    key_points: list
    sentiment: str
    entities: list
    needs_human_input: bool

class DocumentAnalysisAgent:
    def __init__(self, model_name: str = "gpt-4-turbo-preview"):
        self.llm = ChatOpenAI(model=model_name, temperature=0)
        self.memory = MemorySaver()
        self.app = self._build_graph()

    def _build_graph(self):
        workflow = StateGraph(AgentState)

        workflow.add_node("extract_content", self.extract_content)
        workflow.add_node("analyze_sentiment", self.analyze_sentiment)
        workflow.add_node("extract_entities", self.extract_entities)
        workflow.add_node("generate_summary", self.generate_summary)
        workflow.add_node("extract_key_points", self.extract_key_points)
        workflow.add_node("validate_results", self.validate_results)

        workflow.set_entry_point("extract_content")
        workflow.add_edge("extract_content", "analyze_sentiment")
        workflow.add_edge("analyze_sentiment", "extract_entities")
        workflow.add_edge("extract_entities", "generate_summary")
        workflow.add_edge("generate_summary", "extract_key_points")
        workflow.add_edge("extract_key_points", "validate_results")

        workflow.add_conditional_edges(
            "validate_results",
            self.should_continue,
            {"continue": "analyze_sentiment", "end": END},
        )

        return workflow.compile(checkpointer=self.memory)

    async def extract_content(self, state: AgentState) -> AgentState:
        logger.info("Etapa: extração e limpeza")
        content = state.get("document_content", "")

        prompt = (
            "Extraia e limpe o conteúdo a seguir. Remova artefatos de formatação, "
            "corrija erros óbvios e reorganize em texto claro.\n\n"
            f"Documento:\n{content[:2000]}\n\n"
            "Retorne apenas o conteúdo limpo e estruturado."
        )

        response = await self.llm.ainvoke([HumanMessage(content=prompt)])

        state["messages"].append(response)
        state["analysis_steps"].append("content_extracted")
        state["current_step"] = "extract_content"
        return state

    async def analyze_sentiment(self, state: AgentState) -> AgentState:
        logger.info("Etapa: sentimento")
        content = state.get("document_content", "")

        prompt = (
            "Analise o sentimento do documento e classifique como: positivo, negativo, neutro ou misto. "
            "Inclua uma explicação curta.\n\n"
            f"Documento:\n{content[:1500]}\n\n"
            "Formato:\nSentimento: ...\nExplicação: ..."
        )

        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        texto = response.content.lower()

        if "positivo" in texto or "positive" in texto:
            state["sentiment"] = "positivo"
        elif "negativo" in texto or "negative" in texto:
            state["sentiment"] = "negativo"
        elif "misto" in texto or "mixed" in texto:
            state["sentiment"] = "misto"
        else:
            state["sentiment"] = "neutro"

        state["messages"].append(response)
        state["analysis_steps"].append("sentiment_analyzed")
        state["current_step"] = "analyze_sentiment"
        return state

    async def extract_entities(self, state: AgentState) -> AgentState:
        logger.info("Etapa: entidades")
        content = state.get("document_content", "")

        prompt = (
            "Extraia entidades do documento (pessoas, organizações, locais, datas e termos importantes). "
            "Retorne como JSON em lista de objetos com campos 'type' e 'value'.\n\n"
            f"Documento:\n{content[:1500]}"
        )

        response = await self.llm.ainvoke([HumanMessage(content=prompt)])

        try:
            txt = response.content
            inicio = txt.find("[")
            fim = txt.rfind("]") + 1
            state["entities"] = json.loads(txt[inicio:fim]) if inicio != -1 and fim > 0 else []
        except Exception as erro:
            logger.error(f"Falha ao interpretar entidades: {erro}")
            state["entities"] = []

        state["messages"].append(response)
        state["analysis_steps"].append("entities_extracted")
        state["current_step"] = "extract_entities"
        return state

    async def generate_summary(self, state: AgentState) -> AgentState:
        logger.info("Etapa: resumo")
        content = state.get("document_content", "")
        sentimento = state.get("sentiment", "neutro")

        prompt = (
            "Crie um resumo conciso do documento em 2 a 3 parágrafos. "
            f"Sentimento detectado: {sentimento}.\n\n"
            f"Documento:\n{content}"
        )

        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        state["summary"] = response.content

        state["messages"].append(response)
        state["analysis_steps"].append("summary_generated")
        state["current_step"] = "generate_summary"
        return state

    async def extract_key_points(self, state: AgentState) -> AgentState:
        logger.info("Etapa: pontos-chave")
        summary = state.get("summary", "")

        prompt = (
            "A partir do resumo, extraia de 3 a 5 pontos-chave. "
            "Retorne como JSON em lista de strings.\n\n"
            f"Resumo:\n{summary}"
        )

        response = await self.llm.ainvoke([HumanMessage(content=prompt)])

        try:
            txt = response.content
            inicio = txt.find("[")
            fim = txt.rfind("]") + 1
            state["key_points"] = json.loads(txt[inicio:fim]) if inicio != -1 and fim > 0 else []
        except Exception as erro:
            logger.error(f"Falha ao interpretar pontos-chave: {erro}")
            state["key_points"] = []

        state["messages"].append(response)
        state["analysis_steps"].append("key_points_extracted")
        state["current_step"] = "extract_key_points"
        return state

    async def validate_results(self, state: AgentState) -> AgentState:
        logger.info("Etapa: validação")

        tem_resumo = bool(state.get("summary"))
        tem_pontos = len(state.get("key_points", [])) > 0
        tem_sentimento = bool(state.get("sentiment"))

        state["needs_human_input"] = not (tem_resumo and tem_pontos and tem_sentimento)

        state["analysis_steps"].append("validation_complete")
        state["current_step"] = "validate_results"
        return state

    def should_continue(self, state: AgentState) -> str:
        if state.get("needs_human_input"):
            return "end"
        return "end"

    async def analyze_document(self, content: str, thread_id: str | None = None) -> dict:
        initial_state: AgentState = {
            "messages": [],
            "document_content": content,
            "analysis_steps": [],
            "current_step": "start",
            "summary": "",
            "key_points": [],
            "sentiment": "",
            "entities": [],
            "needs_human_input": False,
        }

        config = {"configurable": {"thread_id": thread_id or "default"}}
        final_state = await self.app.ainvoke(initial_state, config=config)

        return {
            "summary": final_state.get("summary", ""),
            "key_points": final_state.get("key_points", []),
            "sentiment": final_state.get("sentiment", ""),
            "entities": final_state.get("entities", []),
            "steps_completed": final_state.get("analysis_steps", []),
        }

document_agent = DocumentAnalysisAgent()

Tarefas do Celery: execução de análise e tratamento de falhas

Uma tarefa Celery de análise precisa marcar status, atualizar progresso e persistir resultados no final. Como o agente é assíncrono, a tarefa (sincrônica) cria um loop de evento e executa o método async. Em falhas, o status do documento vai para “failed” e a mensagem de erro é registrada. A política de retry com backoff reduz erros temporários, como instabilidade de rede ou limite de API. Também é comum uma tarefa periódica para limpeza de registros antigos ou manutenção.

# apps/core/tasks.py
import asyncio
import logging
from celery import shared_task
from django.utils import timezone

from .models import Document
from .services.langraph_agent import document_agent

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3)
def analisar_documento_task(self, document_id: str):
    try:
        document = Document.objects.get(id=document_id)
        document.status = "processing"
        document.task_id = self.request.id
        document.progress = 10
        document.save(update_fields=["status", "task_id", "progress"])

        self.update_state(state="PROGRESS", meta={"progress": 10})

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            resultado = loop.run_until_complete(
                document_agent.analyze_document(
                    content=document.content,
                    thread_id=str(document_id),
                )
            )
        finally:
            loop.close()

        document.summary = resultado.get("summary", "")
        document.key_points = resultado.get("key_points", [])
        document.sentiment = resultado.get("sentiment", "")
        document.entities = resultado.get("entities", [])
        document.metadata = {
            "steps_completed": resultado.get("steps_completed", []),
            "analyzed_at": timezone.now().isoformat(),
        }
        document.status = "completed"
        document.progress = 100
        document.completed_at = timezone.now()
        document.save()

        return {"document_id": str(document_id), "status": "completed"}

    except Document.DoesNotExist:
        logger.error(f"Documento não encontrado: {document_id}")
        raise

    except Exception as erro:
        logger.error(f"Erro ao analisar documento {document_id}: {erro}")

        try:
            document = Document.objects.get(id=document_id)
            document.status = "failed"
            document.error_message = str(erro)
            document.save(update_fields=["status", "error_message"])
        except Exception:
            pass

        raise self.retry(exc=erro, countdown=60 * (2 ** self.request.retries))

@shared_task
def limpar_documentos_antigos():
    from datetime import timedelta

    corte = timezone.now() - timedelta(days=30)
    antigos = Document.objects.filter(created_at__lt=corte, status="completed")
    qtd = antigos.count()
    antigos.delete()

    logger.info(f"Documentos removidos: {qtd}")
    return f"Removidos {qtd} documentos"

Views assíncronas: upload, dashboard e detalhes do documento

As views assíncronas permitem que o servidor responda melhor quando há várias operações de I/O, como consultas e streaming. Em alguns casos, chamadas ao ORM ainda exigem ponte com sync_to_async para operações que não são totalmente async. O upload cria o documento no banco e dispara a tarefa Celery imediatamente. O dashboard lista documentos e calcula estatísticas, usando métodos como acount quando possível. O detalhe do documento retorna um fragmento HTML apropriado para substituição via HTMX.

# apps/core/views.py
import asyncio
import json
import time

from asgiref.sync import sync_to_async
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse, StreamingHttpResponse
from django.shortcuts import get_object_or_404, render
from django.views.decorators.http import require_http_methods

from .models import Conversation, Document
from .tasks import analisar_documento_task

@login_required
async def dashboard_view(request):
    documentos = await sync_to_async(list)(
        Document.objects.filter(user=request.user).select_related("user")[:10]
    )

    total_docs = await Document.objects.filter(user=request.user).acount()
    concluidos = await Document.objects.filter(user=request.user, status="completed").acount()

    return render(
        request,
        "dashboard.html",
        {"documents": documentos, "total_docs": total_docs, "completed_docs": concluidos},
    )

@login_required
@require_http_methods(["POST"])
async def upload_document_view(request):
    try:
        title = request.POST.get("title")
        arquivo = request.FILES.get("file")
        if not arquivo:
            return JsonResponse({"error": "Arquivo não enviado"}, status=400)

        conteudo_bytes = await sync_to_async(arquivo.read)()
        conteudo_texto = conteudo_bytes.decode("utf-8", errors="replace")

        document = await sync_to_async(Document.objects.create)(
            user=request.user,
            title=title or arquivo.name,
            file=arquivo,
            content=conteudo_texto,
            status="pending",
        )

        tarefa = analisar_documento_task.delay(str(document.id))
        document.task_id = tarefa.id
        await sync_to_async(document.save)(update_fields=["task_id"])

        return render(request, "components/document_card.html", {"document": document})
    except Exception as erro:
        return JsonResponse({"error": str(erro)}, status=500)

@login_required
async def document_detail_view(request, document_id):
    document = await sync_to_async(get_object_or_404)(Document, id=document_id, user=request.user)
    return render(request, "components/analysis_result.html", {"document": document})

@login_required
async def document_status_sse(request, document_id):
    async def event_stream():
        document = await sync_to_async(get_object_or_404)(Document, id=document_id, user=request.user)

        while True:
            await sync_to_async(document.refresh_from_db)()
            payload = {
                "status": document.status,
                "progress": document.progress,
                "summary": (document.summary[:200] if document.summary else ""),
            }
            yield f"data: {json.dumps(payload)}\n\n"

            if document.status in ["completed", "failed"]:
                break

            await asyncio.sleep(2)

    resp = StreamingHttpResponse(event_stream(), content_type="text/event-stream")
    resp["Cache-Control"] = "no-cache"
    resp["X-Accel-Buffering"] = "no"
    return resp

@login_required
@require_http_methods(["POST"])
async def chat_with_document(request, document_id):
    try:
        document = await sync_to_async(get_object_or_404)(Document, id=document_id, user=request.user)
        mensagem = request.POST.get("message")
        if not mensagem:
            return JsonResponse({"error": "Mensagem não enviada"}, status=400)

        conversation, _ = await sync_to_async(Conversation.objects.get_or_create)(
            user=request.user,
            document=document,
            defaults={"thread_id": f"doc-{document_id}-{int(time.time())}"},
        )

        from .services.langraph_agent import document_agent

        prompt = (
            "Responda com base no conteúdo analisado do documento.\n\n"
            f"Resumo do documento:\n{document.summary}\n\n"
            f"Pergunta:\n{mensagem}"
        )

        resultado = await document_agent.analyze_document(content=prompt, thread_id=conversation.thread_id)

        conversation.messages.append({"role": "user", "content": mensagem, "timestamp": time.time()})
        conversation.messages.append({"role": "assistant", "content": resultado.get("summary", ""), "timestamp": time.time()})
        await sync_to_async(conversation.save)()

        return JsonResponse({"response": resultado.get("summary", ""), "conversation_id": str(conversation.id)})
    except Exception as erro:
        return JsonResponse({"error": str(erro)}, status=500)

SSE e WebSocket: quando usar cada um e como conviverem

SSE (Server-Sent Events) é ideal para atualizações unidirecionais do servidor para o navegador, como progresso de uma tarefa. Ele é simples, usa HTTP normal e funciona bem com HTMX via extensão SSE. WebSocket é bidirecional, sendo melhor quando o cliente precisa enviar eventos frequentes ou quando há interações complexas em tempo real. Ambos podem coexistir: SSE para status e WebSocket para recursos avançados. Em muitos sistemas, SSE resolve grande parte do “tempo real” com menos complexidade.

# apps/api/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from apps.core.models import Document

class DocumentStatusConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.document_id = self.scope["url_route"]["kwargs"]["document_id"]
        self.room_group_name = f"document_{self.document_id}"

        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

        document = await self.get_document()
        await self.send(text_data=json.dumps({
            "type": "status_update",
            "status": document.status,
            "progress": document.progress,
        }))

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    async def receive(self, text_data):
        data = json.loads(text_data)
        if data.get("type") == "request_status":
            document = await self.get_document()
            await self.send(text_data=json.dumps({
                "type": "status_update",
                "status": document.status,
                "progress": document.progress,
                "summary": (document.summary[:200] if document.summary else ""),
            }))

    async def status_update(self, event):
        await self.send(text_data=json.dumps({
            "type": "status_update",
            "status": event["status"],
            "progress": event["progress"],
            "message": event.get("message", ""),
        }))

    @database_sync_to_async
    def get_document(self):
        return Document.objects.get(id=self.document_id)
# apps/api/routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r"ws/document/(?P<document_id>[^/]+)/$", consumers.DocumentStatusConsumer.as_asgi()),
]

HTMX: interatividade com HTML retornado pelo servidor

O HTMX envia requisições por atributos HTML, como hx-post e hx-get, e injeta a resposta diretamente no DOM. Isso reduz necessidade de JavaScript customizado e mantém o servidor como fonte de verdade da renderização. Componentes em templates se tornam “blocos” reusáveis, como um card de documento ou um painel de resultado. Para tempo real, a extensão SSE do HTMX pode escutar um endpoint e atualizar o componente automaticamente. Essa abordagem mantém simplicidade e torna a UI previsível.

Os principais comportamentos usados nos templates abaixo são os seguintes.

  • hx-post envia formulário por AJAX e substitui um alvo definido.
  • hx-target define onde a resposta HTML será aplicada.
  • hx-swap define como a substituição ocorre (antes, depois, etc.).
  • hx-ext="sse" ativa a extensão de SSE para atualizar o componente em tempo real.
<!-- templates/base.html -->
<!DOCTYPE html>
<html lang="pt-br">
<head>
  <meta charset="UTF-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Django AI Stack</title>

  <!-- HTMX -->
  <script src="https://unpkg.com/htmx.org@1.9.10" defer></script>
  <script src="https://unpkg.com/htmx.org/dist/ext/sse.js" defer></script>
</head>
<body>
  {% block content %}{% endblock %}
</body>
</html>

Templates de dashboard e componentes: cards e resultados

O dashboard precisa mostrar estatísticas e uma lista recente, além de um formulário de upload que retorna um card pronto. O componente de card é responsável por exibir status e iniciar a assinatura SSE quando o documento está em processamento. Quando finaliza, o card pode mostrar resumo e um botão que carrega detalhes por HTMX. O componente de resultado mostra resumo, pontos-chave e entidades extraídas. O chat pode existir como endpoint separado, retornando JSON, ou como fragmentos HTML para inserir no histórico.

<!-- templates/dashboard.html -->
{% extends 'base.html' %}
{% block content %}
  <div>
    <form
      hx-post="{% url 'upload_document' %}"
      hx-target="#document-list"
      hx-swap="afterbegin"
      hx-encoding="multipart/form-data"
    >
      {% csrf_token %}
      <input type="text" name="title" placeholder="Título" />
      <input type="file" name="file" required />
      <button type="submit">Enviar</button>
    </form>

    <div id="document-list">
      {% for document in documents %}
        {% include 'components/document_card.html' %}
      {% endfor %}
    </div>
  </div>
{% endblock %}
<!-- templates/components/document_card.html -->
<div
  hx-ext="sse"
  sse-connect="{% url 'document_status_sse' document.id %}"
  hx-target="this"
  hx-swap="outerHTML"
>
  <h4>{{ document.title }}</h4>
  <p>Status: {{ document.status }} ({{ document.progress }}%)</p>

  {% if document.status == 'completed' %}
    <p><strong>Sentimento</strong>: {{ document.sentiment }}</p>
    <p>{{ document.summary }}</p>
    <button
      hx-get="{% url 'document_detail' document.id %}"
      hx-target="#detalhe"
      hx-swap="innerHTML"
    >Abrir detalhes</button>
  {% endif %}
</div>
<!-- templates/components/analysis_result.html -->
<div>
  <h4>{{ document.title }}</h4>

  <p><strong>Resumo</strong>: {{ document.summary }}</p>

  <p>Os pontos-chave abaixo resumem ideias centrais do texto.</p>
  <ul>
    {% for point in document.key_points %}
      <li>{{ point }}</li>
    {% endfor %}
  </ul>

  {% if document.entities %}
    <p>As entidades a seguir representam nomes e termos relevantes extraídos.</p>
    <ul>
      {% for entity in document.entities %}
        <li>{{ entity.value }} ({{ entity.type }})</li>
      {% endfor %}
    </ul>
  {% endif %}
</div>

Rotas: organização de URLs no Django

As rotas organizam o acesso ao dashboard, upload, SSE, detalhes e chat. Separar apps.core e apps.api deixa claro o que é domínio e o que é comunicação. Em Django, as rotas do projeto agregam rotas de apps por include. O endpoint SSE é importante por ser longo (streaming) e precisa de headers adequados. O endpoint de chat é POST para preservar semântica de envio de mensagem.

# config/urls.py
from django.contrib import admin
from django.urls import path, include
from django.contrib.auth import views as auth_views

urlpatterns = [
    path("admin/", admin.site.urls),
    path("", include("apps.core.urls")),
    path("api/", include("apps.api.urls")),
    path("login/", auth_views.LoginView.as_view(), name="login"),
    path("logout/", auth_views.LogoutView.as_view(), name="logout"),
]
# apps/core/urls.py
from django.urls import path
from . import views

urlpatterns = [
    path("", views.dashboard_view, name="dashboard"),
    path("upload/", views.upload_document_view, name="upload_document"),
    path("document/<uuid:document_id>/status/", views.document_status_sse, name="document_status_sse"),
    path("document/<uuid:document_id>/", views.document_detail_view, name="document_detail"),
    path("document/<uuid:document_id>/chat/", views.chat_with_document, name="chat_with_document"),
]

Execução local: banco, redis, migrações e serviços

O ambiente local precisa do PostgreSQL e do Redis disponíveis, além do worker Celery e do servidor ASGI. Migrações criam as tabelas e o superusuário permite acessar o admin para inspeção de dados. O servidor ASGI pode ser o daphne ou o runserver com suporte a ASGI. Em paralelo, um worker Celery processa as tarefas de análise. Em sistemas desse tipo, rodar múltiplos processos é parte do desenho e não um detalhe.

# redis via docker
docker run -d -p 6379:6379 redis:latest

# migrações e usuário admin
python manage.py makemigrations
python manage.py migrate
python manage.py createsuperuser

# celery worker
celery -A config worker --loglevel=info

# celery beat (tarefas periódicas)
celery -A config beat --loglevel=info

# servidor ASGI
daphne -b 0.0.0.0 -p 8000 config.asgi:application

Streaming de resposta de IA: entrega gradual de tokens

Streaming é útil quando a resposta pode demorar e é melhor mostrar progresso incremental. No mundo web, isso pode ser feito com SSE enviando tokens conforme o modelo gera. A configuração do provedor de LLM precisa habilitar streaming e fornecer um iterador de tokens. O endpoint retorna um StreamingHttpResponse com content-type adequado. Esse padrão reduz ansiedade de espera e dá sensação de resposta imediata, principalmente em respostas longas.

# apps/core/views_stream.py (exemplo separado por clareza)
import json
from django.http import StreamingHttpResponse
from django.contrib.auth.decorators import login_required

@login_required
async def stream_ai_response(request):
    async def token_stream():
        from langchain_openai import ChatOpenAI

        llm = ChatOpenAI(model="gpt-4-turbo-preview", streaming=True, temperature=0)

        async for token in llm.astream("Gere um texto curto e informativo sobre análise de documentos."):
            yield f"data: {json.dumps({'token': token})}\n\n"

    return StreamingHttpResponse(token_stream(), content_type="text/event-stream")

Otimizações comuns: consultas, índices, cache e carga do banco

Em aplicações de documentos, a listagem e o carregamento de detalhes podem virar gargalo sem otimização. select_related reduz queries ao trazer relacionamentos de chave estrangeira de uma vez. prefetch_related ajuda em relacionamentos muitos-para-muitos ou reverse. Índices nos campos de filtro frequente diminuem custo de busca, principalmente por usuário e status. Cache em Redis para contadores e painéis reduz pressão no PostgreSQL em páginas acessadas com frequência.

# exemplo de consulta otimizada em contexto async
from asgiref.sync import sync_to_async
from apps.core.models import Document

async def listar_concluidos():
    documentos = await sync_to_async(list)(
        Document.objects
        .select_related("user")
        .prefetch_related("conversations")
        .filter(status="completed")[:50]
    )
    return documentos

Docker Compose: ambiente reproduzível com PostgreSQL, Redis, web e Celery

Um docker-compose organiza serviços e facilita execução idêntica em máquinas diferentes. O serviço do banco precisa de volume para persistência. O Redis pode ser leve e sem volume em desenvolvimento, dependendo do caso. O serviço web roda o ASGI e depende de banco e Redis. O serviço celery roda o worker e também depende dos mesmos serviços. Variáveis de ambiente passam senhas e URLs sem hardcode no repositório.

version: "3.8"

services:
  db:
    image: postgres:15
    environment:
      POSTGRES_DB: django_ai_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  web:
    build: .
    command: daphne -b 0.0.0.0 -p 8000 config.asgi:application
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
    environment:
      - DB_NAME=django_ai_db
      - DB_USER=postgres
      - DB_PASSWORD=${DB_PASSWORD}
      - DB_HOST=db
      - DB_PORT=5432
      - REDIS_HOST=redis
      - REDIS_PORT=6379

  celery:
    build: .
    command: celery -A config worker --loglevel=info
    volumes:
      - .:/app
    depends_on:
      - db
      - redis
    environment:
      - DB_NAME=django_ai_db
      - DB_USER=postgres
      - DB_PASSWORD=${DB_PASSWORD}
      - DB_HOST=db
      - DB_PORT=5432
      - REDIS_HOST=redis
      - REDIS_PORT=6379

volumes:
  postgres_data:

Cenários importantes: falhas, concorrência, tamanho de arquivo e consistência

Falhas de API de modelo, arquivos inválidos e timeouts são esperados e precisam de tratamento. O status “failed” com mensagem de erro evita que a interface fique “presa” em processamento. Arquivos grandes exigem limites e estratégias de extração; em muitos casos, a extração vira uma etapa própria antes do LangGraph. Concorrência aparece quando múltiplas análises rodam ao mesmo tempo, exigindo workers Celery suficientes e limites por usuário. Consistência é mantida ao persistir resultados no banco e usar IDs de thread para memória do agente.

Como esse stack evolui: do “como era” ao “como fica”

Em abordagens antigas, era comum manter tudo síncrono, rodar análise no request e atualizar a UI com polling agressivo. Isso gerava travamentos, filas invisíveis e escalabilidade limitada. Com esse stack, a análise migra para tarefas, o backend usa async para reduzir bloqueios e a UI se atualiza com SSE ou WebSocket. A IA deixa de ser uma chamada isolada e vira um fluxo com etapas claras e validação. O produto fica mais previsível, observável e pronto para crescer sem reescrever a base.

Conclusão

A pilha com Django assíncrono, LangGraph e HTMX forma um caminho consistente para aplicações inteligentes full-stack em 2026. Ela combina desempenho em I/O, workflows de IA com estado e uma interface dinâmica baseada em HTML, sem complexidade desnecessária no navegador. Com PostgreSQL, Redis e Celery, o sistema ganha persistência, processamento em segundo plano e mecanismos de tempo real confiáveis. O resultado é uma arquitetura que mantém simplicidade de desenvolvimento, mas entrega recursos modernos como streaming, progresso ao vivo e memória conversacional.

Links do stack

Os links a seguir representam as tecnologias citadas e ajudam a identificar os componentes do stack.