Em 2026, aplicações com inteligência artificial deixaram de ser “um chatbot colado na interface” e passaram a funcionar como sistemas completos, capazes de interpretar dados, manter estado e responder em tempo real. Nesse cenário, uma pilha moderna combina backend assíncrono, orquestração de tarefas, armazenamento confiável e uma camada de IA com fluxo controlado para entregar respostas rápidas e consistentes.
Uma arquitetura forte para esse tipo de produto une ORM assíncrona no Django, uma camada de agentes com LangGraph (fluxos com estado e ciclos), interfaces com HTMX (interatividade dirigida pelo servidor) e infraestrutura com PostgreSQL, Redis e Celery. O resultado é um aplicativo full-stack com experiência “viva” sem depender de frameworks pesados no navegador.
Visão geral do stack e por que ele funciona bem junto
Esse stack funciona porque cada parte resolve um tipo de gargalo comum em aplicações inteligentes. O Django com suporte assíncrono reduz bloqueios em operações de I/O, como acesso ao banco e streaming de resposta. O LangGraph organiza a IA em etapas, com memória e possibilidade de reprocessamento controlado. O HTMX atualiza trechos da página sem um SPA completo, mantendo HTML como unidade de composição. PostgreSQL sustenta dados persistentes e Redis dá suporte a cache, fila e canal em tempo real.
O que está sendo construído: análise inteligente de documentos ponta a ponta
Um sistema de análise de documentos representa bem os desafios reais: upload, extração de texto, processamento multi-etapas, progresso em tempo real e resultados consultáveis. A análise é executada em segundo plano para não travar requisições web. A interface precisa refletir estados como “pendente”, “processando”, “concluído” e “falhou” rapidamente. Também existe um fluxo conversacional ligado ao documento, preservando contexto e histórico. Esse conjunto cobre a maior parte dos cenários de aplicações inteligentes modernas.
Arquitetura lógica em camadas
A arquitetura pode ser entendida como uma sequência de camadas, do HTML dinâmico até a infraestrutura. No topo, o HTMX dispara requisições e substitui fragmentos de HTML no DOM, criando sensação de aplicação reativa. No meio, o Django entrega views assíncronas e endpoints para streaming, além de Channels quando WebSocket for necessário. A camada de IA é um grafo de execução com LangGraph, mantendo estado e permitindo ciclos de validação. Na base, PostgreSQL guarda documentos e resultados, Redis apoia cache e mensageria e o Celery roda tarefas longas fora do request.
Preparação do projeto e dependências principais
O projeto precisa de dependências de web, infraestrutura e IA, todas instaladas em um ambiente isolado. Um ambiente virtual evita conflitos de versões e facilita replicação em produção. O Django opera o servidor e o ORM, enquanto psycopg conecta ao PostgreSQL. O Redis dá suporte ao broker do Celery, cache e, com Channels, ao layer de comunicação. A IA é conduzida por LangGraph e LangChain, com provedores como OpenAI ou Anthropic para o modelo.
# criar e ativar ambiente virtual
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# dependências web e infraestrutura
pip install Django==5.0.1
pip install psycopg[binary]==3.1.16
pip install redis==5.0.1
pip install celery[redis]==5.3.6
pip install django-celery-results==2.5.1
pip install channels[daphne]==4.0.0
pip install channels-redis==4.1.0
# IA
pip install langgraph==0.0.20
pip install langchain==0.1.0
pip install langchain-openai==0.0.5
pip install langchain-anthropic==0.1.1
pip install openai==1.10.0
# utilitários
pip install python-dotenv==1.0.0
pip install httpx==0.26.0
Estrutura de diretórios: separação clara de responsabilidades
Uma estrutura organizada reduz acoplamento e melhora manutenção. A pasta config concentra settings, ASGI e Celery. A pasta apps separa o domínio principal de documentos e a camada de comunicação em tempo real. A pasta templates guarda HTML e componentes reutilizáveis, ideais para HTMX. A pasta static contém arquivos estáticos locais quando necessário. Essa organização facilita crescimento do sistema com novas features sem virar um monólito confuso.
Configuração do Django para assíncrono, banco e filas
A configuração assíncrona depende do ASGI, que é a interface de servidor para aplicações Python assíncronas. O banco PostgreSQL é configurado no Django, mas a ORM assíncrona aparece principalmente nos métodos async como acount e aiterator. Para cache e Channels, o Redis é configurado em CACHES e em CHANNEL_LAYERS. O Celery usa Redis como broker e grava resultados via django-celery-results. Variáveis em arquivo .env centralizam segredos e URLs.
# config/settings.py
import os
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = Path(__file__).resolve().parent.parent
SECRET_KEY = os.getenv("SECRET_KEY", "trocar-esta-chave-em-producao")
DEBUG = os.getenv("DEBUG", "False") == "True"
ALLOWED_HOSTS = ["localhost", "127.0.0.1"]
INSTALLED_APPS = [
"daphne", # importante para suporte ASGI
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"channels",
"django_celery_results",
"apps.core",
"apps.api",
]
MIDDLEWARE = [
"django.middleware.security.SecurityMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
]
ROOT_URLCONF = "config.urls"
DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
"NAME": os.getenv("DB_NAME", "django_ai_db"),
"USER": os.getenv("DB_USER", "postgres"),
"PASSWORD": os.getenv("DB_PASSWORD", ""),
"HOST": os.getenv("DB_HOST", "localhost"),
"PORT": os.getenv("DB_PORT", "5432"),
"ATOMIC_REQUESTS": False, # evita transações automáticas que atrapalham async
"CONN_MAX_AGE": 600,
}
}
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = os.getenv("REDIS_PORT", "6379")
CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.redis.RedisCache",
"LOCATION": f"redis://{REDIS_HOST}:{REDIS_PORT}/0",
}
}
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {"hosts": [(REDIS_HOST, int(REDIS_PORT))], "capacity": 1500, "expiry": 10},
}
}
CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/1"
CELERY_RESULT_BACKEND = "django-db"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "UTC"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ASGI_APPLICATION = "config.asgi.application"
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [BASE_DIR / "templates"],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
],
},
}
]
STATIC_URL = "/static/"
STATICFILES_DIRS = [BASE_DIR / "static"]
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
Celery no Django: execução fora do request e rastreio de resultados
O Celery é um sistema de filas de tarefas para processamentos demorados, como análise de documentos por IA. O Django envia uma tarefa para o broker (Redis) e devolve resposta rápida ao navegador. Um worker Celery executa a tarefa em segundo plano e grava resultados no banco. O pacote django-celery-results permite rastrear estado e persistir resultados. Essa separação evita timeouts e permite escalar processamento de forma independente do servidor web.
# config/celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("django_ai_stack")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def tarefa_debug(self):
print(f"Requisição: {self.request!r}")
# config/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)
ASGI e Channels: suporte a SSE e WebSocket
O ASGI substitui o antigo WSGI quando se precisa de async, streaming e WebSocket. No Django, o arquivo asgi.py cria a aplicação principal e adiciona roteamento para WebSocket via Channels. O ProtocolTypeRouter direciona tráfego HTTP comum e conexões websocket. O AuthMiddlewareStack permite que a sessão e o usuário autenticado existam também no WebSocket. Essa configuração habilita recursos em tempo real sem trocar o framework principal.
# config/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
django_asgi_app = get_asgi_application()
from apps.api import routing
application = ProtocolTypeRouter(
{
"http": django_asgi_app,
"websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
}
)
Modelos de dados: documentos, conversas e rastreio de execução
Os modelos persistem o estado que torna o sistema “inteligente” ao longo do tempo. O modelo Document guarda o arquivo, o texto extraído e os resultados (resumo, entidades, pontos-chave e sentimento). O modelo Conversation mantém um thread_id e mensagens para preservar contexto de interações. O modelo AgentExecution registra etapas e tempos para auditoria e depuração. Índices no banco reduzem custo de listagens e consultas frequentes por usuário, status e data.
# apps/core/models.py
import uuid
from django.db import models
from django.contrib.auth.models import User
class Document(models.Model):
STATUS_CHOICES = [
("pending", "Pendente"),
("processing", "Processando"),
("completed", "Concluído"),
("failed", "Falhou"),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="documents")
title = models.CharField(max_length=255)
file = models.FileField(upload_to="documents/%Y/%m/%d/")
content = models.TextField(blank=True)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default="pending")
summary = models.TextField(blank=True)
key_points = models.JSONField(default=list, blank=True)
sentiment = models.CharField(max_length=50, blank=True)
entities = models.JSONField(default=list, blank=True)
metadata = models.JSONField(default=dict, blank=True)
task_id = models.CharField(max_length=255, blank=True)
progress = models.IntegerField(default=0)
error_message = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
completed_at = models.DateTimeField(null=True, blank=True)
class Meta:
db_table = "documents"
ordering = ["-created_at"]
indexes = [
models.Index(fields=["user", "status"]),
models.Index(fields=["created_at"]),
]
def __str__(self):
return f"{self.title} - {self.status}"
class Conversation(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="conversations")
document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="conversations", null=True, blank=True)
thread_id = models.CharField(max_length=255, unique=True, db_index=True)
state = models.JSONField(default=dict)
messages = models.JSONField(default=list)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = "conversations"
ordering = ["-updated_at"]
def __str__(self):
return f"Conversa {self.thread_id}"
class AgentExecution(models.Model):
conversation = models.ForeignKey(Conversation, on_delete=models.CASCADE, related_name="executions")
step_name = models.CharField(max_length=100)
step_type = models.CharField(max_length=50)
input_data = models.JSONField(default=dict)
output_data = models.JSONField(default=dict)
duration_ms = models.IntegerField(default=0)
success = models.BooleanField(default=True)
error = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = "agent_executions"
ordering = ["created_at"]
indexes = [models.Index(fields=["conversation", "created_at"])]
LangGraph: agente com estado, etapas e validação
O LangGraph organiza a IA como um grafo de nós, onde cada nó é uma etapa do processamento. Diferente de uma cadeia linear, o grafo permite ciclos e decisões condicionais, o que ajuda em validação e reprocessamento. O estado é modelado como um dicionário tipado, mantendo mensagens, resultados parciais e flags de controle. O MemorySaver guarda checkpoints de estado por thread_id, permitindo retomada e consistência. Esse padrão dá previsibilidade para aplicações que exigem auditoria e repetibilidade.
# apps/core/services/langraph_agent.py
from typing import TypedDict, Annotated, Sequence
import operator
import json
import logging
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
logger = logging.getLogger(__name__)
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
document_content: str
analysis_steps: list
current_step: str
summary: str
key_points: list
sentiment: str
entities: list
needs_human_input: bool
class DocumentAnalysisAgent:
def __init__(self, model_name: str = "gpt-4-turbo-preview"):
self.llm = ChatOpenAI(model=model_name, temperature=0)
self.memory = MemorySaver()
self.app = self._build_graph()
def _build_graph(self):
workflow = StateGraph(AgentState)
workflow.add_node("extract_content", self.extract_content)
workflow.add_node("analyze_sentiment", self.analyze_sentiment)
workflow.add_node("extract_entities", self.extract_entities)
workflow.add_node("generate_summary", self.generate_summary)
workflow.add_node("extract_key_points", self.extract_key_points)
workflow.add_node("validate_results", self.validate_results)
workflow.set_entry_point("extract_content")
workflow.add_edge("extract_content", "analyze_sentiment")
workflow.add_edge("analyze_sentiment", "extract_entities")
workflow.add_edge("extract_entities", "generate_summary")
workflow.add_edge("generate_summary", "extract_key_points")
workflow.add_edge("extract_key_points", "validate_results")
workflow.add_conditional_edges(
"validate_results",
self.should_continue,
{"continue": "analyze_sentiment", "end": END},
)
return workflow.compile(checkpointer=self.memory)
async def extract_content(self, state: AgentState) -> AgentState:
logger.info("Etapa: extração e limpeza")
content = state.get("document_content", "")
prompt = (
"Extraia e limpe o conteúdo a seguir. Remova artefatos de formatação, "
"corrija erros óbvios e reorganize em texto claro.\n\n"
f"Documento:\n{content[:2000]}\n\n"
"Retorne apenas o conteúdo limpo e estruturado."
)
response = await self.llm.ainvoke([HumanMessage(content=prompt)])
state["messages"].append(response)
state["analysis_steps"].append("content_extracted")
state["current_step"] = "extract_content"
return state
async def analyze_sentiment(self, state: AgentState) -> AgentState:
logger.info("Etapa: sentimento")
content = state.get("document_content", "")
prompt = (
"Analise o sentimento do documento e classifique como: positivo, negativo, neutro ou misto. "
"Inclua uma explicação curta.\n\n"
f"Documento:\n{content[:1500]}\n\n"
"Formato:\nSentimento: ...\nExplicação: ..."
)
response = await self.llm.ainvoke([HumanMessage(content=prompt)])
texto = response.content.lower()
if "positivo" in texto or "positive" in texto:
state["sentiment"] = "positivo"
elif "negativo" in texto or "negative" in texto:
state["sentiment"] = "negativo"
elif "misto" in texto or "mixed" in texto:
state["sentiment"] = "misto"
else:
state["sentiment"] = "neutro"
state["messages"].append(response)
state["analysis_steps"].append("sentiment_analyzed")
state["current_step"] = "analyze_sentiment"
return state
async def extract_entities(self, state: AgentState) -> AgentState:
logger.info("Etapa: entidades")
content = state.get("document_content", "")
prompt = (
"Extraia entidades do documento (pessoas, organizações, locais, datas e termos importantes). "
"Retorne como JSON em lista de objetos com campos 'type' e 'value'.\n\n"
f"Documento:\n{content[:1500]}"
)
response = await self.llm.ainvoke([HumanMessage(content=prompt)])
try:
txt = response.content
inicio = txt.find("[")
fim = txt.rfind("]") + 1
state["entities"] = json.loads(txt[inicio:fim]) if inicio != -1 and fim > 0 else []
except Exception as erro:
logger.error(f"Falha ao interpretar entidades: {erro}")
state["entities"] = []
state["messages"].append(response)
state["analysis_steps"].append("entities_extracted")
state["current_step"] = "extract_entities"
return state
async def generate_summary(self, state: AgentState) -> AgentState:
logger.info("Etapa: resumo")
content = state.get("document_content", "")
sentimento = state.get("sentiment", "neutro")
prompt = (
"Crie um resumo conciso do documento em 2 a 3 parágrafos. "
f"Sentimento detectado: {sentimento}.\n\n"
f"Documento:\n{content}"
)
response = await self.llm.ainvoke([HumanMessage(content=prompt)])
state["summary"] = response.content
state["messages"].append(response)
state["analysis_steps"].append("summary_generated")
state["current_step"] = "generate_summary"
return state
async def extract_key_points(self, state: AgentState) -> AgentState:
logger.info("Etapa: pontos-chave")
summary = state.get("summary", "")
prompt = (
"A partir do resumo, extraia de 3 a 5 pontos-chave. "
"Retorne como JSON em lista de strings.\n\n"
f"Resumo:\n{summary}"
)
response = await self.llm.ainvoke([HumanMessage(content=prompt)])
try:
txt = response.content
inicio = txt.find("[")
fim = txt.rfind("]") + 1
state["key_points"] = json.loads(txt[inicio:fim]) if inicio != -1 and fim > 0 else []
except Exception as erro:
logger.error(f"Falha ao interpretar pontos-chave: {erro}")
state["key_points"] = []
state["messages"].append(response)
state["analysis_steps"].append("key_points_extracted")
state["current_step"] = "extract_key_points"
return state
async def validate_results(self, state: AgentState) -> AgentState:
logger.info("Etapa: validação")
tem_resumo = bool(state.get("summary"))
tem_pontos = len(state.get("key_points", [])) > 0
tem_sentimento = bool(state.get("sentiment"))
state["needs_human_input"] = not (tem_resumo and tem_pontos and tem_sentimento)
state["analysis_steps"].append("validation_complete")
state["current_step"] = "validate_results"
return state
def should_continue(self, state: AgentState) -> str:
if state.get("needs_human_input"):
return "end"
return "end"
async def analyze_document(self, content: str, thread_id: str | None = None) -> dict:
initial_state: AgentState = {
"messages": [],
"document_content": content,
"analysis_steps": [],
"current_step": "start",
"summary": "",
"key_points": [],
"sentiment": "",
"entities": [],
"needs_human_input": False,
}
config = {"configurable": {"thread_id": thread_id or "default"}}
final_state = await self.app.ainvoke(initial_state, config=config)
return {
"summary": final_state.get("summary", ""),
"key_points": final_state.get("key_points", []),
"sentiment": final_state.get("sentiment", ""),
"entities": final_state.get("entities", []),
"steps_completed": final_state.get("analysis_steps", []),
}
document_agent = DocumentAnalysisAgent()
Tarefas do Celery: execução de análise e tratamento de falhas
Uma tarefa Celery de análise precisa marcar status, atualizar progresso e persistir resultados no final. Como o agente é assíncrono, a tarefa (sincrônica) cria um loop de evento e executa o método async. Em falhas, o status do documento vai para “failed” e a mensagem de erro é registrada. A política de retry com backoff reduz erros temporários, como instabilidade de rede ou limite de API. Também é comum uma tarefa periódica para limpeza de registros antigos ou manutenção.
# apps/core/tasks.py
import asyncio
import logging
from celery import shared_task
from django.utils import timezone
from .models import Document
from .services.langraph_agent import document_agent
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3)
def analisar_documento_task(self, document_id: str):
try:
document = Document.objects.get(id=document_id)
document.status = "processing"
document.task_id = self.request.id
document.progress = 10
document.save(update_fields=["status", "task_id", "progress"])
self.update_state(state="PROGRESS", meta={"progress": 10})
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
resultado = loop.run_until_complete(
document_agent.analyze_document(
content=document.content,
thread_id=str(document_id),
)
)
finally:
loop.close()
document.summary = resultado.get("summary", "")
document.key_points = resultado.get("key_points", [])
document.sentiment = resultado.get("sentiment", "")
document.entities = resultado.get("entities", [])
document.metadata = {
"steps_completed": resultado.get("steps_completed", []),
"analyzed_at": timezone.now().isoformat(),
}
document.status = "completed"
document.progress = 100
document.completed_at = timezone.now()
document.save()
return {"document_id": str(document_id), "status": "completed"}
except Document.DoesNotExist:
logger.error(f"Documento não encontrado: {document_id}")
raise
except Exception as erro:
logger.error(f"Erro ao analisar documento {document_id}: {erro}")
try:
document = Document.objects.get(id=document_id)
document.status = "failed"
document.error_message = str(erro)
document.save(update_fields=["status", "error_message"])
except Exception:
pass
raise self.retry(exc=erro, countdown=60 * (2 ** self.request.retries))
@shared_task
def limpar_documentos_antigos():
from datetime import timedelta
corte = timezone.now() - timedelta(days=30)
antigos = Document.objects.filter(created_at__lt=corte, status="completed")
qtd = antigos.count()
antigos.delete()
logger.info(f"Documentos removidos: {qtd}")
return f"Removidos {qtd} documentos"
Views assíncronas: upload, dashboard e detalhes do documento
As views assíncronas permitem que o servidor responda melhor quando há várias operações de I/O, como consultas e streaming. Em alguns casos, chamadas ao ORM ainda exigem ponte com sync_to_async para operações que não são totalmente async. O upload cria o documento no banco e dispara a tarefa Celery imediatamente. O dashboard lista documentos e calcula estatísticas, usando métodos como acount quando possível. O detalhe do documento retorna um fragmento HTML apropriado para substituição via HTMX.
# apps/core/views.py
import asyncio
import json
import time
from asgiref.sync import sync_to_async
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse, StreamingHttpResponse
from django.shortcuts import get_object_or_404, render
from django.views.decorators.http import require_http_methods
from .models import Conversation, Document
from .tasks import analisar_documento_task
@login_required
async def dashboard_view(request):
documentos = await sync_to_async(list)(
Document.objects.filter(user=request.user).select_related("user")[:10]
)
total_docs = await Document.objects.filter(user=request.user).acount()
concluidos = await Document.objects.filter(user=request.user, status="completed").acount()
return render(
request,
"dashboard.html",
{"documents": documentos, "total_docs": total_docs, "completed_docs": concluidos},
)
@login_required
@require_http_methods(["POST"])
async def upload_document_view(request):
try:
title = request.POST.get("title")
arquivo = request.FILES.get("file")
if not arquivo:
return JsonResponse({"error": "Arquivo não enviado"}, status=400)
conteudo_bytes = await sync_to_async(arquivo.read)()
conteudo_texto = conteudo_bytes.decode("utf-8", errors="replace")
document = await sync_to_async(Document.objects.create)(
user=request.user,
title=title or arquivo.name,
file=arquivo,
content=conteudo_texto,
status="pending",
)
tarefa = analisar_documento_task.delay(str(document.id))
document.task_id = tarefa.id
await sync_to_async(document.save)(update_fields=["task_id"])
return render(request, "components/document_card.html", {"document": document})
except Exception as erro:
return JsonResponse({"error": str(erro)}, status=500)
@login_required
async def document_detail_view(request, document_id):
document = await sync_to_async(get_object_or_404)(Document, id=document_id, user=request.user)
return render(request, "components/analysis_result.html", {"document": document})
@login_required
async def document_status_sse(request, document_id):
async def event_stream():
document = await sync_to_async(get_object_or_404)(Document, id=document_id, user=request.user)
while True:
await sync_to_async(document.refresh_from_db)()
payload = {
"status": document.status,
"progress": document.progress,
"summary": (document.summary[:200] if document.summary else ""),
}
yield f"data: {json.dumps(payload)}\n\n"
if document.status in ["completed", "failed"]:
break
await asyncio.sleep(2)
resp = StreamingHttpResponse(event_stream(), content_type="text/event-stream")
resp["Cache-Control"] = "no-cache"
resp["X-Accel-Buffering"] = "no"
return resp
@login_required
@require_http_methods(["POST"])
async def chat_with_document(request, document_id):
try:
document = await sync_to_async(get_object_or_404)(Document, id=document_id, user=request.user)
mensagem = request.POST.get("message")
if not mensagem:
return JsonResponse({"error": "Mensagem não enviada"}, status=400)
conversation, _ = await sync_to_async(Conversation.objects.get_or_create)(
user=request.user,
document=document,
defaults={"thread_id": f"doc-{document_id}-{int(time.time())}"},
)
from .services.langraph_agent import document_agent
prompt = (
"Responda com base no conteúdo analisado do documento.\n\n"
f"Resumo do documento:\n{document.summary}\n\n"
f"Pergunta:\n{mensagem}"
)
resultado = await document_agent.analyze_document(content=prompt, thread_id=conversation.thread_id)
conversation.messages.append({"role": "user", "content": mensagem, "timestamp": time.time()})
conversation.messages.append({"role": "assistant", "content": resultado.get("summary", ""), "timestamp": time.time()})
await sync_to_async(conversation.save)()
return JsonResponse({"response": resultado.get("summary", ""), "conversation_id": str(conversation.id)})
except Exception as erro:
return JsonResponse({"error": str(erro)}, status=500)
SSE e WebSocket: quando usar cada um e como conviverem
SSE (Server-Sent Events) é ideal para atualizações unidirecionais do servidor para o navegador, como progresso de uma tarefa. Ele é simples, usa HTTP normal e funciona bem com HTMX via extensão SSE. WebSocket é bidirecional, sendo melhor quando o cliente precisa enviar eventos frequentes ou quando há interações complexas em tempo real. Ambos podem coexistir: SSE para status e WebSocket para recursos avançados. Em muitos sistemas, SSE resolve grande parte do “tempo real” com menos complexidade.
# apps/api/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from apps.core.models import Document
class DocumentStatusConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.document_id = self.scope["url_route"]["kwargs"]["document_id"]
self.room_group_name = f"document_{self.document_id}"
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
await self.accept()
document = await self.get_document()
await self.send(text_data=json.dumps({
"type": "status_update",
"status": document.status,
"progress": document.progress,
}))
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
async def receive(self, text_data):
data = json.loads(text_data)
if data.get("type") == "request_status":
document = await self.get_document()
await self.send(text_data=json.dumps({
"type": "status_update",
"status": document.status,
"progress": document.progress,
"summary": (document.summary[:200] if document.summary else ""),
}))
async def status_update(self, event):
await self.send(text_data=json.dumps({
"type": "status_update",
"status": event["status"],
"progress": event["progress"],
"message": event.get("message", ""),
}))
@database_sync_to_async
def get_document(self):
return Document.objects.get(id=self.document_id)
# apps/api/routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r"ws/document/(?P<document_id>[^/]+)/$", consumers.DocumentStatusConsumer.as_asgi()),
]
HTMX: interatividade com HTML retornado pelo servidor
O HTMX envia requisições por atributos HTML, como hx-post e hx-get, e injeta a resposta diretamente no DOM. Isso reduz necessidade de JavaScript customizado e mantém o servidor como fonte de verdade da renderização. Componentes em templates se tornam “blocos” reusáveis, como um card de documento ou um painel de resultado. Para tempo real, a extensão SSE do HTMX pode escutar um endpoint e atualizar o componente automaticamente. Essa abordagem mantém simplicidade e torna a UI previsível.
Os principais comportamentos usados nos templates abaixo são os seguintes.
- hx-post envia formulário por AJAX e substitui um alvo definido.
- hx-target define onde a resposta HTML será aplicada.
- hx-swap define como a substituição ocorre (antes, depois, etc.).
- hx-ext="sse" ativa a extensão de SSE para atualizar o componente em tempo real.
<!-- templates/base.html -->
<!DOCTYPE html>
<html lang="pt-br">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Django AI Stack</title>
<!-- HTMX -->
<script src="https://unpkg.com/htmx.org@1.9.10" defer></script>
<script src="https://unpkg.com/htmx.org/dist/ext/sse.js" defer></script>
</head>
<body>
{% block content %}{% endblock %}
</body>
</html>
Templates de dashboard e componentes: cards e resultados
O dashboard precisa mostrar estatísticas e uma lista recente, além de um formulário de upload que retorna um card pronto. O componente de card é responsável por exibir status e iniciar a assinatura SSE quando o documento está em processamento. Quando finaliza, o card pode mostrar resumo e um botão que carrega detalhes por HTMX. O componente de resultado mostra resumo, pontos-chave e entidades extraídas. O chat pode existir como endpoint separado, retornando JSON, ou como fragmentos HTML para inserir no histórico.
<!-- templates/dashboard.html -->
{% extends 'base.html' %}
{% block content %}
<div>
<form
hx-post="{% url 'upload_document' %}"
hx-target="#document-list"
hx-swap="afterbegin"
hx-encoding="multipart/form-data"
>
{% csrf_token %}
<input type="text" name="title" placeholder="Título" />
<input type="file" name="file" required />
<button type="submit">Enviar</button>
</form>
<div id="document-list">
{% for document in documents %}
{% include 'components/document_card.html' %}
{% endfor %}
</div>
</div>
{% endblock %}
<!-- templates/components/document_card.html -->
<div
hx-ext="sse"
sse-connect="{% url 'document_status_sse' document.id %}"
hx-target="this"
hx-swap="outerHTML"
>
<h4>{{ document.title }}</h4>
<p>Status: {{ document.status }} ({{ document.progress }}%)</p>
{% if document.status == 'completed' %}
<p><strong>Sentimento</strong>: {{ document.sentiment }}</p>
<p>{{ document.summary }}</p>
<button
hx-get="{% url 'document_detail' document.id %}"
hx-target="#detalhe"
hx-swap="innerHTML"
>Abrir detalhes</button>
{% endif %}
</div>
<!-- templates/components/analysis_result.html -->
<div>
<h4>{{ document.title }}</h4>
<p><strong>Resumo</strong>: {{ document.summary }}</p>
<p>Os pontos-chave abaixo resumem ideias centrais do texto.</p>
<ul>
{% for point in document.key_points %}
<li>{{ point }}</li>
{% endfor %}
</ul>
{% if document.entities %}
<p>As entidades a seguir representam nomes e termos relevantes extraídos.</p>
<ul>
{% for entity in document.entities %}
<li>{{ entity.value }} ({{ entity.type }})</li>
{% endfor %}
</ul>
{% endif %}
</div>
Rotas: organização de URLs no Django
As rotas organizam o acesso ao dashboard, upload, SSE, detalhes e chat. Separar apps.core e apps.api deixa claro o que é domínio e o que é comunicação. Em Django, as rotas do projeto agregam rotas de apps por include. O endpoint SSE é importante por ser longo (streaming) e precisa de headers adequados. O endpoint de chat é POST para preservar semântica de envio de mensagem.
# config/urls.py
from django.contrib import admin
from django.urls import path, include
from django.contrib.auth import views as auth_views
urlpatterns = [
path("admin/", admin.site.urls),
path("", include("apps.core.urls")),
path("api/", include("apps.api.urls")),
path("login/", auth_views.LoginView.as_view(), name="login"),
path("logout/", auth_views.LogoutView.as_view(), name="logout"),
]
# apps/core/urls.py
from django.urls import path
from . import views
urlpatterns = [
path("", views.dashboard_view, name="dashboard"),
path("upload/", views.upload_document_view, name="upload_document"),
path("document/<uuid:document_id>/status/", views.document_status_sse, name="document_status_sse"),
path("document/<uuid:document_id>/", views.document_detail_view, name="document_detail"),
path("document/<uuid:document_id>/chat/", views.chat_with_document, name="chat_with_document"),
]
Execução local: banco, redis, migrações e serviços
O ambiente local precisa do PostgreSQL e do Redis disponíveis, além do worker Celery e do servidor ASGI. Migrações criam as tabelas e o superusuário permite acessar o admin para inspeção de dados. O servidor ASGI pode ser o daphne ou o runserver com suporte a ASGI. Em paralelo, um worker Celery processa as tarefas de análise. Em sistemas desse tipo, rodar múltiplos processos é parte do desenho e não um detalhe.
# redis via docker
docker run -d -p 6379:6379 redis:latest
# migrações e usuário admin
python manage.py makemigrations
python manage.py migrate
python manage.py createsuperuser
# celery worker
celery -A config worker --loglevel=info
# celery beat (tarefas periódicas)
celery -A config beat --loglevel=info
# servidor ASGI
daphne -b 0.0.0.0 -p 8000 config.asgi:application
Streaming de resposta de IA: entrega gradual de tokens
Streaming é útil quando a resposta pode demorar e é melhor mostrar progresso incremental. No mundo web, isso pode ser feito com SSE enviando tokens conforme o modelo gera. A configuração do provedor de LLM precisa habilitar streaming e fornecer um iterador de tokens. O endpoint retorna um StreamingHttpResponse com content-type adequado. Esse padrão reduz ansiedade de espera e dá sensação de resposta imediata, principalmente em respostas longas.
# apps/core/views_stream.py (exemplo separado por clareza)
import json
from django.http import StreamingHttpResponse
from django.contrib.auth.decorators import login_required
@login_required
async def stream_ai_response(request):
async def token_stream():
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4-turbo-preview", streaming=True, temperature=0)
async for token in llm.astream("Gere um texto curto e informativo sobre análise de documentos."):
yield f"data: {json.dumps({'token': token})}\n\n"
return StreamingHttpResponse(token_stream(), content_type="text/event-stream")
Otimizações comuns: consultas, índices, cache e carga do banco
Em aplicações de documentos, a listagem e o carregamento de detalhes podem virar gargalo sem otimização. select_related reduz queries ao trazer relacionamentos de chave estrangeira de uma vez. prefetch_related ajuda em relacionamentos muitos-para-muitos ou reverse. Índices nos campos de filtro frequente diminuem custo de busca, principalmente por usuário e status. Cache em Redis para contadores e painéis reduz pressão no PostgreSQL em páginas acessadas com frequência.
# exemplo de consulta otimizada em contexto async
from asgiref.sync import sync_to_async
from apps.core.models import Document
async def listar_concluidos():
documentos = await sync_to_async(list)(
Document.objects
.select_related("user")
.prefetch_related("conversations")
.filter(status="completed")[:50]
)
return documentos
Docker Compose: ambiente reproduzível com PostgreSQL, Redis, web e Celery
Um docker-compose organiza serviços e facilita execução idêntica em máquinas diferentes. O serviço do banco precisa de volume para persistência. O Redis pode ser leve e sem volume em desenvolvimento, dependendo do caso. O serviço web roda o ASGI e depende de banco e Redis. O serviço celery roda o worker e também depende dos mesmos serviços. Variáveis de ambiente passam senhas e URLs sem hardcode no repositório.
version: "3.8"
services:
db:
image: postgres:15
environment:
POSTGRES_DB: django_ai_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
redis:
image: redis:7-alpine
ports:
- "6379:6379"
web:
build: .
command: daphne -b 0.0.0.0 -p 8000 config.asgi:application
volumes:
- .:/app
ports:
- "8000:8000"
depends_on:
- db
- redis
environment:
- DB_NAME=django_ai_db
- DB_USER=postgres
- DB_PASSWORD=${DB_PASSWORD}
- DB_HOST=db
- DB_PORT=5432
- REDIS_HOST=redis
- REDIS_PORT=6379
celery:
build: .
command: celery -A config worker --loglevel=info
volumes:
- .:/app
depends_on:
- db
- redis
environment:
- DB_NAME=django_ai_db
- DB_USER=postgres
- DB_PASSWORD=${DB_PASSWORD}
- DB_HOST=db
- DB_PORT=5432
- REDIS_HOST=redis
- REDIS_PORT=6379
volumes:
postgres_data:
Cenários importantes: falhas, concorrência, tamanho de arquivo e consistência
Falhas de API de modelo, arquivos inválidos e timeouts são esperados e precisam de tratamento. O status “failed” com mensagem de erro evita que a interface fique “presa” em processamento. Arquivos grandes exigem limites e estratégias de extração; em muitos casos, a extração vira uma etapa própria antes do LangGraph. Concorrência aparece quando múltiplas análises rodam ao mesmo tempo, exigindo workers Celery suficientes e limites por usuário. Consistência é mantida ao persistir resultados no banco e usar IDs de thread para memória do agente.
Como esse stack evolui: do “como era” ao “como fica”
Em abordagens antigas, era comum manter tudo síncrono, rodar análise no request e atualizar a UI com polling agressivo. Isso gerava travamentos, filas invisíveis e escalabilidade limitada. Com esse stack, a análise migra para tarefas, o backend usa async para reduzir bloqueios e a UI se atualiza com SSE ou WebSocket. A IA deixa de ser uma chamada isolada e vira um fluxo com etapas claras e validação. O produto fica mais previsível, observável e pronto para crescer sem reescrever a base.
Conclusão
A pilha com Django assíncrono, LangGraph e HTMX forma um caminho consistente para aplicações inteligentes full-stack em 2026. Ela combina desempenho em I/O, workflows de IA com estado e uma interface dinâmica baseada em HTML, sem complexidade desnecessária no navegador. Com PostgreSQL, Redis e Celery, o sistema ganha persistência, processamento em segundo plano e mecanismos de tempo real confiáveis. O resultado é uma arquitetura que mantém simplicidade de desenvolvimento, mas entrega recursos modernos como streaming, progresso ao vivo e memória conversacional.
Links do stack
Os links a seguir representam as tecnologias citadas e ajudam a identificar os componentes do stack.