Trocar o Redis pelo PostgreSQL em projetos Django significa consolidar funções que antes exigiam dois serviços em uma única base de dados. Em vez de manter um banco relacional para os dados principais e um armazenamento em memória para tarefas auxiliares, a aplicação passa a usar recursos modernos do PostgreSQL para cache, pub/sub, filas de tarefas, controle de acesso por frequência e sessões.
Essa substituição se apoia em funcionalidades nativas do PostgreSQL, como JSONB (armazenamento JSON eficiente), LISTEN/NOTIFY (mensageria leve), bloqueios com SKIP LOCKED (concorrência segura em filas) e índices avançados. O resultado típico é menor complexidade operacional, uma estratégia única de backup e consistência transacional para rotinas que, antes, ficavam separadas.
Por que considerar PostgreSQL no lugar do Redis
Redis costuma ser escolhido por oferecer operações muito rápidas em memória, mas isso adiciona uma peça extra para hospedar, monitorar e proteger. PostgreSQL, por outro lado, já está presente em muitos projetos Django e pode assumir várias dessas funções com boa performance e persistência. A principal diferença prática é que o PostgreSQL trabalha com durabilidade e consistência por padrão, reduzindo decisões sobre persistência e recuperação. Em cenários comuns, a consolidação diminui custos e reduz pontos de falha.
Também existe uma diferença de “como era e como será” na arquitetura: antes, eventos rápidos e dados temporários ficavam no Redis, e o banco guardava apenas o essencial. Com PostgreSQL assumindo parte do temporário, o sistema passa a ter observabilidade e auditoria melhores, pois tudo fica consultável em SQL. Em compensação, é importante controlar volume e manutenção de tabelas “quentes”, como logs e rate limit. A troca tende a funcionar melhor quando a taxa de operações é alta, mas não extrema ao ponto de exigir milhões de ações por segundo em memória.
Cache no Django usando PostgreSQL
Um cache guarda respostas ou resultados caros de calcular para reduzir consultas repetidas. Em vez de usar Redis como backend de cache, o Django permite usar o próprio banco com o backend DatabaseCache. Essa abordagem é simples e centraliza a persistência, mas exige índices para não degradar com volume. O cache no banco funciona bem para dados pequenos e médios e para tempos de expiração bem definidos.
O funcionamento é baseado em uma tabela que guarda chave, valor serializado e expiração. Quando a aplicação busca uma chave, o Django lê a tabela e valida se ainda está dentro do prazo. Quando o limite de entradas é atingido, ocorre descarte de parte dos itens, conforme a configuração. Em sistemas com muitas leituras, índices adequados evitam varreduras desnecessárias.
As configurações abaixo mostram como ativar o cache no banco e controlar o tamanho e o descarte de entradas.
# settings.py
CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.db.DatabaseCache",
"LOCATION": "cache_table",
"OPTIONS": {
"MAX_ENTRIES": 10000,
"CULL_FREQUENCY": 3, # remove 1/3 quando atingir MAX_ENTRIES
},
"TIMEOUT": 300, # tempo padrão do cache (segundos)
}
}
A tabela do cache precisa existir para que o backend funcione corretamente.
python manage.py createcachetable
Para melhorar a performance, índices aceleram as buscas pela chave e as limpezas por expiração.
CREATE INDEX IF NOT EXISTS idx_cache_key ON cache_table(cache_key);
CREATE INDEX IF NOT EXISTS idx_cache_expires ON cache_table(expires);
O uso do cache no código segue o padrão do Django, com operações de set, get e delete, além de cache por página.
from django.core.cache import cache
from django.views.decorators.cache import cache_page
from django.shortcuts import render
def salvar_no_cache():
dados_usuario = {"id": 123, "nome": "Ana", "plano": "pro"}
cache.set("perfil_usuario_123", dados_usuario, timeout=3600)
def ler_do_cache():
return cache.get("perfil_usuario_123")
def remover_do_cache():
cache.delete("perfil_usuario_123")
@cache_page(60 * 15) # 15 minutos
def minha_view(request):
return render(request, "template.html")
Pub/Sub no PostgreSQL com LISTEN e NOTIFY
Pub/Sub é um modelo de comunicação onde um componente publica mensagens em um canal e outros componentes inscritos recebem essas mensagens. No Redis, isso é comum com comandos de publish e subscribe. No PostgreSQL, isso é possível com LISTEN/NOTIFY, que envia notificações curtas associadas a um canal. Esse mecanismo é leve e útil para sinais de evento, invalidação de cache e integrações internas.
Uma diferença importante é que NOTIFY não é uma fila persistente: se o consumidor estiver desconectado, a mensagem pode não ser entregue. Por isso, ele funciona melhor como “campainha” para avisar que algo ocorreu, e não como mecanismo garantido de entrega. Quando a necessidade é confiabilidade, costuma-se combinar NOTIFY com uma tabela de eventos ou usar uma fila baseada em tabela. Ainda assim, para muitos sistemas, a simplicidade de LISTEN/NOTIFY resolve o caso.
O código abaixo implementa um pub/sub básico usando psycopg2, com publicação em canal e loop de escuta.
import json
import select
import psycopg2
import psycopg2.extensions
class PostgresPubSub:
def __init__(self, dsn):
self.conn = psycopg2.connect(dsn)
self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
self.cursor = self.conn.cursor()
def publicar(self, canal, mensagem):
payload = json.dumps(mensagem, ensure_ascii=False)
self.cursor.execute("SELECT pg_notify(%s, %s);", (canal, payload))
def assinar(self, canais, callback):
for canal in canais:
# LISTEN não aceita parâmetro via %s, por isso o formato direto deve ser controlado
self.cursor.execute(f"LISTEN {canal};")
while True:
pronto, _, _ = select.select([self.conn], [], [], 5)
if not pronto:
continue
self.conn.poll()
while self.conn.notifies:
notify = self.conn.notifies.pop(0)
mensagem = json.loads(notify.payload)
callback(notify.channel, mensagem)
def fechar(self):
self.cursor.close()
self.conn.close()
Em Django, uma forma comum de manter um assinante rodando é criar um comando de gerenciamento dedicado.
# management/commands/ouvir_canais.py
from django.conf import settings
from django.core.management.base import BaseCommand
from meuapp.pubsub import PostgresPubSub
class Command(BaseCommand):
help = "Ouve canais LISTEN/NOTIFY no PostgreSQL"
def handle(self, *args, **options):
dsn = settings.DATABASE_URL
pubsub = PostgresPubSub(dsn)
def tratar_mensagem(canal, mensagem):
if canal == "notificacoes":
# lógica de notificação interna
pass
elif canal == "eventos":
# lógica de processamento de evento
pass
pubsub.assinar(["notificacoes", "eventos"], tratar_mensagem)
A publicação pode ocorrer em uma view, em um serviço interno ou após concluir uma transação importante.
# views.py
from django.conf import settings
from django.http import JsonResponse
from django.utils import timezone
from meuapp.pubsub import PostgresPubSub
def enviar_notificacao(request):
pubsub = PostgresPubSub(settings.DATABASE_URL)
pubsub.publicar(
"notificacoes",
{
"user_id": request.user.id,
"mensagem": "Nova notificação",
"timestamp": timezone.now().isoformat(),
},
)
pubsub.fechar()
return JsonResponse({"status": "enviado"})
Streaming de logs em tempo real com triggers e NOTIFY
Um uso prático de LISTEN/NOTIFY é transmitir eventos de log para consumidores internos em tempo real. Em vez de enviar logs somente para arquivo, o PostgreSQL pode armazenar o log em tabela e notificar automaticamente quando uma linha nova for inserida. Isso combina persistência (histórico consultável) com “tempo real” para painéis e diagnósticos. Esse padrão evita dependência de serviços extras quando a necessidade é interna.
A peça central é uma trigger, que é uma rotina do banco executada automaticamente em eventos como INSERT. Ela chama uma função em PL/pgSQL para disparar o NOTIFY com um payload em JSON. Essa mensagem pode ser consumida por um processo assinante, enquanto a tabela mantém o histórico completo. O cuidado principal é evitar payloads grandes e manter índices para consultas frequentes.
O SQL abaixo cria a tabela de logs, a função de notificação e a trigger associada.
CREATE TABLE IF NOT EXISTS application_logs (
id SERIAL PRIMARY KEY,
level VARCHAR(20) NOT NULL,
message TEXT NOT NULL,
metadata JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION notify_log_insert()
RETURNS trigger AS $$
BEGIN
PERFORM pg_notify(
'log_stream',
json_build_object(
'id', NEW.id,
'level', NEW.level,
'message', NEW.message,
'metadata', NEW.metadata,
'created_at', NEW.created_at
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS log_insert_trigger ON application_logs;
CREATE TRIGGER log_insert_trigger
AFTER INSERT ON application_logs
FOR EACH ROW
EXECUTE FUNCTION notify_log_insert();
O handler abaixo integra com o módulo logging do Python e insere o registro no PostgreSQL.
# logging_handler.py
import logging
from django.db import connection
class PostgresLogHandler(logging.Handler):
def emit(self, record):
mensagem = self.format(record)
metadata = {
"pathname": record.pathname,
"lineno": record.lineno,
"funcName": record.funcName,
}
with connection.cursor() as cursor:
cursor.execute(
"""
INSERT INTO application_logs (level, message, metadata)
VALUES (%s, %s, %s)
""",
[record.levelname, mensagem, metadata],
)
A configuração do Django registra o handler e direciona logs do app para o PostgreSQL.
# settings.py
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"handlers": {
"postgres": {
"class": "meuapp.logging_handler.PostgresLogHandler",
"level": "INFO",
}
},
"loggers": {
"meuapp": {
"handlers": ["postgres"],
"level": "INFO",
"propagate": False,
}
},
}
Fila de tarefas em background com SKIP LOCKED
Uma fila de tarefas em background organiza trabalhos para serem processados fora do fluxo principal, como enviar e-mails, gerar relatórios ou processar imagens. No Redis, isso costuma ser feito com listas, streams ou bibliotecas de fila. No PostgreSQL, o padrão mais comum usa uma tabela e uma consulta que “pega e bloqueia” trabalhos com FOR UPDATE SKIP LOCKED. Isso permite múltiplos workers sem colisões, evitando que dois processos peguem o mesmo job.
O “como era e como será” muda a forma de garantir concorrência: Redis normalmente resolve com atomicidade de comandos e estruturas de dados próprias. PostgreSQL resolve com transações, bloqueios de linha e atualização com retorno (UPDATE ... RETURNING). O desenho também pode incluir prioridade, agendamento e tentativas com backoff, tudo em colunas explícitas e auditáveis. Esse modelo tende a ser muito estável e fácil de depurar, pois cada job é uma linha consultável.
O SQL abaixo cria a tabela da fila e um índice parcial para acelerar a busca por jobs pendentes.
CREATE TABLE IF NOT EXISTS job_queue (
id SERIAL PRIMARY KEY,
queue_name VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
priority INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 3,
retry_count INTEGER NOT NULL DEFAULT 0,
scheduled_at TIMESTAMP NOT NULL DEFAULT NOW(),
started_at TIMESTAMP,
completed_at TIMESTAMP,
error_message TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_queue_processing
ON job_queue(queue_name, status, priority DESC, scheduled_at)
WHERE status = 'pending';
O código abaixo enfileira, desenfileira com bloqueio seguro e finaliza jobs com sucesso ou falha com tentativa.
# job_queue.py
import json
from datetime import datetime
from django.db import connection, transaction
class JobQueue:
@staticmethod
def enfileirar(queue_name, payload, priority=0, scheduled_at=None):
with connection.cursor() as cursor:
cursor.execute(
"""
INSERT INTO job_queue (queue_name, payload, priority, scheduled_at)
VALUES (%s, %s, %s, %s)
RETURNING id
""",
[queue_name, json.dumps(payload, ensure_ascii=False), priority, scheduled_at or datetime.now()],
)
return cursor.fetchone()[0]
@staticmethod
def desenfileirar(queue_name, batch_size=1):
with transaction.atomic():
with connection.cursor() as cursor:
cursor.execute(
"""
UPDATE job_queue
SET status = 'processing',
started_at = NOW(),
updated_at = NOW()
WHERE id IN (
SELECT id
FROM job_queue
WHERE queue_name = %s
AND status = 'pending'
AND scheduled_at <= NOW()
ORDER BY priority DESC, scheduled_at ASC
LIMIT %s
FOR UPDATE SKIP LOCKED
)
RETURNING id, payload
""",
[queue_name, batch_size],
)
return cursor.fetchall()
@staticmethod
def finalizar(job_id, success=True, error_message=None):
with connection.cursor() as cursor:
if success:
cursor.execute(
"""
UPDATE job_queue
SET status = 'completed',
completed_at = NOW(),
updated_at = NOW(),
error_message = NULL
WHERE id = %s
""",
[job_id],
)
return
cursor.execute(
"""
UPDATE job_queue
SET status = CASE
WHEN retry_count + 1 >= max_retries THEN 'failed'
ELSE 'pending'
END,
retry_count = retry_count + 1,
error_message = %s,
scheduled_at = CASE
WHEN retry_count + 1 < max_retries
THEN NOW() + (INTERVAL '5 minutes' * POWER(2, retry_count))
ELSE scheduled_at
END,
updated_at = NOW()
WHERE id = %s
""",
[error_message, job_id],
)
Um worker pode ser um comando Django que faz polling leve e processa lotes.
# management/commands/worker_fila.py
import json
import time
from django.core.management.base import BaseCommand
from meuapp.job_queue import JobQueue
class Command(BaseCommand):
help = "Processa jobs em background usando PostgreSQL"
def add_arguments(self, parser):
parser.add_argument("queue_name", type=str)
parser.add_argument("--batch-size", type=int, default=5)
def handle(self, *args, **options):
queue_name = options["queue_name"]
batch_size = options["batch_size"]
while True:
jobs = JobQueue.desenfileirar(queue_name, batch_size=batch_size)
if not jobs:
time.sleep(1)
continue
for job_id, payload in jobs:
try:
dados = json.loads(payload)
self.processar_job(dados)
JobQueue.finalizar(job_id, success=True)
except Exception as exc:
JobQueue.finalizar(job_id, success=False, error_message=str(exc))
def processar_job(self, dados):
tipo = dados.get("task_type")
if tipo == "send_email":
# envio de e-mail (exemplo de roteamento)
return
if tipo == "process_image":
# processamento de imagem (exemplo de roteamento)
return
O exemplo abaixo cria um job típico de envio assíncrono, registrando os dados no payload.
# views.py
from django.http import JsonResponse
from meuapp.job_queue import JobQueue
def criar_tarefa(request):
job_id = JobQueue.enfileirar(
"default",
{
"task_type": "send_email",
"to": "user@example.com",
"subject": "Boas-vindas",
"body": "Mensagem de boas-vindas",
},
priority=1,
)
return JsonResponse({"job_id": job_id})
Rate limiting com PostgreSQL
Rate limiting é o controle de quantas requisições uma identidade pode fazer em um intervalo de tempo. Em Redis, isso normalmente é resolvido com contadores com expiração. Em PostgreSQL, uma forma direta é registrar eventos em uma tabela e contar quantos ocorreram dentro de uma janela, garantindo consistência em uma única fonte. Esse método é simples de entender e auditar, mas precisa de limpeza periódica para não crescer sem controle.
Uma alternativa mais “transacional” é usar uma tabela agregada por janela e fazer UPSERT, mas o modelo por eventos é mais didático e flexível. A consistência vem do fato de que o registro e a contagem ocorrem no mesmo banco, sob as mesmas regras. Em volume muito alto, a tabela pode crescer rapidamente, então índices e expurgo são parte do desenho. Para cenários comuns de API com limites moderados, a abordagem atende bem.
O SQL abaixo cria a tabela de eventos de rate limit e um índice para consultas por janela de tempo.
CREATE TABLE IF NOT EXISTS rate_limits (
id SERIAL PRIMARY KEY,
identifier VARCHAR(255) NOT NULL,
action VARCHAR(100) NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_rate_limits_lookup
ON rate_limits(identifier, action, timestamp DESC);
O código abaixo checa o limite, calcula “restante” e registra a requisição quando permitido.
# rate_limiter.py
from datetime import datetime, timedelta
from django.db import connection
class RateLimiter:
@staticmethod
def checar(identifier, action, max_requests, window_seconds):
window_start = datetime.now() - timedelta(seconds=window_seconds)
with connection.cursor() as cursor:
cursor.execute(
"""
SELECT COUNT(*)
FROM rate_limits
WHERE identifier = %s
AND action = %s
AND timestamp > %s
""",
[identifier, action, window_start],
)
count = cursor.fetchone()[0]
if count >= max_requests:
cursor.execute(
"""
SELECT timestamp
FROM rate_limits
WHERE identifier = %s
AND action = %s
AND timestamp > %s
ORDER BY timestamp ASC
LIMIT 1
""",
[identifier, action, window_start],
)
oldest = cursor.fetchone()[0]
reset_at = oldest + timedelta(seconds=window_seconds)
return False, 0, reset_at
cursor.execute(
"""
INSERT INTO rate_limits (identifier, action)
VALUES (%s, %s)
""",
[identifier, action],
)
remaining = max_requests - count - 1
reset_at = datetime.now() + timedelta(seconds=window_seconds)
return True, remaining, reset_at
@staticmethod
def limpar_antigos(days=7):
cutoff = datetime.now() - timedelta(days=days)
with connection.cursor() as cursor:
cursor.execute("DELETE FROM rate_limits WHERE timestamp < %s", [cutoff])
O middleware abaixo aplica rate limit por IP e inclui cabeçalhos úteis na resposta.
# middleware.py
from django.http import JsonResponse
from meuapp.rate_limiter import RateLimiter
class RateLimitMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
identifier = self._ip_cliente(request)
action = f"{request.method}:{request.path}"
allowed, remaining, reset_at = RateLimiter.checar(
identifier, action, max_requests=100, window_seconds=60
)
if not allowed:
response = JsonResponse({"error": "Rate limit excedido"}, status=429)
else:
response = self.get_response(request)
response["X-RateLimit-Limit"] = "100"
response["X-RateLimit-Remaining"] = str(remaining)
response["X-RateLimit-Reset"] = reset_at.isoformat()
return response
def _ip_cliente(self, request):
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
return x_forwarded_for.split(",")[0].strip()
return request.META.get("REMOTE_ADDR")
Em cenários em que apenas algumas views precisam de limites específicos, um decorador centraliza a regra.
# decorators.py
from functools import wraps
from django.http import JsonResponse
from meuapp.rate_limiter import RateLimiter
def rate_limit(max_requests=60, window_seconds=60):
def decorator(view_func):
@wraps(view_func)
def wrapped(request, *args, **kwargs):
if getattr(request, "user", None) and request.user.is_authenticated:
identifier = str(request.user.id)
else:
identifier = request.META.get("REMOTE_ADDR")
action = view_func.__name__
allowed, remaining, reset_at = RateLimiter.checar(
identifier, action, max_requests, window_seconds
)
if not allowed:
return JsonResponse(
{"error": "Rate limit excedido", "reset_at": reset_at.isoformat()},
status=429,
)
response = view_func(request, *args, **kwargs)
response["X-RateLimit-Limit"] = str(max_requests)
response["X-RateLimit-Remaining"] = str(remaining)
response["X-RateLimit-Reset"] = reset_at.isoformat()
return response
return wrapped
return decorator
Sessões no Django com PostgreSQL e JSONB
Sessão é um mecanismo para guardar estado entre requisições, como autenticação e preferências temporárias. Django suporta sessões no banco por padrão, o que substitui a necessidade de usar Redis para esse fim em muitos projetos. No PostgreSQL, o armazenamento pode ser eficiente e consultável, com índice por data de expiração. Para maior controle, uma modelagem própria com JSONB permite guardar metadados como IP, user agent e conteúdo estruturado.
O “antes e depois” costuma aparecer quando sessões estavam em memória e passavam a expirar sem histórico ou auditoria. Com banco, sessões podem ser analisadas e limpas com consultas e índices, mantendo previsibilidade. A atenção principal é garantir limpeza das sessões expiradas e evitar crescimento contínuo. Em aplicações com volume muito grande de sessões, o desenho precisa considerar índices e rotinas de manutenção.
As configurações abaixo ativam o backend de sessão no banco e definem PostgreSQL como banco principal.
# settings.py
SESSION_ENGINE = "django.contrib.sessions.backends.db"
DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
"NAME": "sua_base",
"USER": "seu_usuario",
"PASSWORD": "sua_senha",
"HOST": "localhost",
"PORT": "5432",
"CONN_MAX_AGE": 600, # reaproveita conexões por até 10 minutos
"OPTIONS": {"connect_timeout": 10},
}
}
A criação das tabelas padrão de sessão ocorre nas migrações normais do Django.
python manage.py migrate
Quando a necessidade é guardar mais informações e consultar por campos, um modelo próprio com JSONField usa JSONB no PostgreSQL.
# models.py
from django.db import models
class CustomSession(models.Model):
session_key = models.CharField(max_length=40, primary_key=True)
session_data = models.JSONField() # usa JSONB no PostgreSQL
expire_date = models.DateTimeField(db_index=True)
user_id = models.IntegerField(null=True, db_index=True)
ip_address = models.GenericIPAddressField(null=True)
user_agent = models.TextField(null=True)
class Meta:
db_table = "custom_sessions"
indexes = [
models.Index(fields=["user_id", "expire_date"]),
]
Consultas típicas incluem listar sessões ativas e filtrar por conteúdo no JSONB.
from django.utils import timezone
from meuapp.models import CustomSession
sessoes_ativas = CustomSession.objects.filter(
user_id=123,
expire_date__gt=timezone.now(),
)
sessoes_com_carrinho = CustomSession.objects.filter(
session_data__cart__isnull=False
)
Otimizações essenciais no PostgreSQL para substituir Redis
Ao centralizar cache, fila e rate limit no PostgreSQL, o banco passa a receber padrões de carga diferentes, com mais escrita e mais operações de curta duração. Por isso, índices e estratégia de limpeza deixam de ser “opcionais” e passam a ser parte do desenho. Também é importante reduzir overhead de conexão, pois workloads de fila e pub/sub podem abrir muitas conexões. O objetivo é manter latência previsível mesmo com tabelas crescendo.
Outra diferença é a manutenção: tabelas com muitas inserções e deleções precisam de VACUUM para recuperar espaço e atualizar estatísticas. Em volumes grandes, particionamento ajuda a manter consultas e limpezas rápidas, isolando dados por período. O uso de índices parciais reduz custo, pois indexa apenas o subconjunto realmente consultado (por exemplo, apenas jobs pendentes). Essas técnicas fazem o PostgreSQL sustentar casos que antes eram "naturais" no Redis.
Os exemplos abaixo mostram índices práticos para tabelas “quentes” e consultas recorrentes.
-- Cache
CREATE INDEX IF NOT EXISTS idx_cache_expires_notnull
ON cache_table(expires)
WHERE expires IS NOT NULL;
-- Fila
CREATE INDEX IF NOT EXISTS idx_job_pending_by_time
ON job_queue(queue_name, scheduled_at)
WHERE status = 'pending';
-- Rate limiting
CREATE INDEX IF NOT EXISTS idx_rate_limits_window
ON rate_limits(identifier, action, timestamp DESC);
-- Sessões padrão do Django
CREATE INDEX IF NOT EXISTS idx_session_expire
ON django_session(expire_date);
A manutenção manual pode ser usada pontualmente para tabelas muito atualizadas.
VACUUM ANALYZE job_queue;
VACUUM ANALYZE rate_limits;
VACUUM ANALYZE cache_table;
O particionamento por data é útil quando a tabela cresce por eventos, como rate limit e logs.
CREATE TABLE IF NOT EXISTS rate_limits_partitioned (
id SERIAL,
identifier VARCHAR(255) NOT NULL,
action VARCHAR(100) NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
created_at TIMESTAMP NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (timestamp);
CREATE TABLE IF NOT EXISTS rate_limits_2026_01
PARTITION OF rate_limits_partitioned
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
Quando Redis ainda faz mais sentido
Mesmo com PostgreSQL cobrindo muitos casos, Redis continua superior em operações extremamente frequentes e com requisitos de latência muito baixos. Estruturas especializadas, como conjuntos ordenados e estruturas probabilísticas, também podem ser determinantes em alguns domínios. Além disso, cenários de cache distribuído multi-região podem exigir soluções específicas para sincronização. Nesses casos, a escolha por Redis geralmente se baseia em exigências de escala e tempo de resposta.
A troca para PostgreSQL costuma ser mais vantajosa quando a prioridade é simplificar infraestrutura e manter consistência transacional. Quando a carga cresce, a arquitetura pode permanecer estável desde que tabelas sejam bem indexadas e mantidas. Em sistemas com picos severos, filas e rate limit no banco podem pressionar I/O e locks, o que pede ajustes finos e particionamento. A decisão final costuma refletir o equilíbrio entre simplicidade e limites de performance.
Monitoramento e diagnóstico no PostgreSQL dentro do Django
Monitoramento é essencial quando o PostgreSQL passa a assumir funções além do armazenamento principal. Métricas simples como profundidade de fila, contagem de pendências e queries longas ajudam a detectar gargalos cedo. O PostgreSQL expõe visões internas como pg_stat_activity, que mostram consultas ativas e duração. No Django, um comando de gerenciamento permite coletar essas informações de forma padronizada.
O objetivo é enxergar sinais de saturação antes de virar erro: filas que não baixam, consultas travadas e workers ociosos. Uma prática útil é observar separadamente tabelas com alta taxa de escrita, pois elas tendem a exigir mais manutenção. Também ajuda manter métricas por fila e por status, o que orienta ajustes de batch size e prioridade. Esse monitoramento complementa índices e manutenção, formando a base de estabilidade do desenho.
O comando abaixo mostra profundidade da fila por nome e lista consultas ativas com mais de um minuto.
# management/commands/monitorar_postgres.py
from django.core.management.base import BaseCommand
from django.db import connection
class Command(BaseCommand):
help = "Mostra métricas básicas para filas e queries longas no PostgreSQL"
def handle(self, *args, **options):
with connection.cursor() as cursor:
cursor.execute(
"""
SELECT queue_name, COUNT(*)
FROM job_queue
WHERE status = 'pending'
GROUP BY queue_name
ORDER BY COUNT(*) DESC
"""
)
self.stdout.write("Pendências por fila:")
for queue_name, total in cursor.fetchall():
self.stdout.write(f" {queue_name}: {total}")
cursor.execute(
"""
SELECT pid,
now() - query_start AS duration,
query
FROM pg_stat_activity
WHERE state = 'active'
AND now() - query_start > interval '1 minute'
ORDER BY duration DESC
"""
)
self.stdout.write("\nQueries ativas acima de 1 minuto:")
for pid, duration, query in cursor.fetchall():
trecho = (query or "").replace("\n", " ")[:120]
self.stdout.write(f" PID {pid}: {duration} - {trecho}")
Conclusão
Substituir Redis por PostgreSQL em projetos Django é uma forma prática de reduzir complexidade e consolidar responsabilidades em uma base robusta e transacional. Recursos como DatabaseCache, LISTEN/NOTIFY, SKIP LOCKED e armazenamento em JSONB permitem cobrir cache, mensageria leve, filas, rate limiting e sessões com uma infraestrutura mais simples. Esse desenho também fortalece auditoria e depuração, pois os estados intermediários passam a ser consultáveis via SQL. A estabilidade do resultado depende de índices, limpeza e manutenção alinhados ao volume de escrita.
Quando bem modelado, o PostgreSQL suporta a maior parte dos cenários típicos que motivam Redis, mantendo consistência e persistência como padrão. Em cargas extremas ou exigências de latência sub-milissegundo, Redis ainda pode ser a melhor escolha, mas isso representa um subconjunto mais específico de aplicações. Para muitos sistemas, a consolidação no PostgreSQL oferece uma solução completa, coerente e previsível, com menos componentes para operar. O conjunto de padrões apresentados forma um caminho sólido para substituir Redis com segurança dentro do ecossistema Django.