Cache em Python é um conjunto de técnicas para evitar trabalho repetido, reaproveitando resultados já calculados em chamadas futuras. Em serviços sob carga, essa prática reduz uso de CPU, diminui latências de cauda (como p95) e estabiliza o comportamento quando muitas requisições pedem os mesmos dados.
Um cache útil precisa equilibrar desempenho e frescor, isto é, entregar respostas rápidas sem “mentir” sobre dados que mudaram. Por isso, além de memoização em memória com functools, entram padrões com TTL (tempo de vida), cache por requisição, cache assíncrono e estratégias com Redis para compartilhamento entre processos e proteção contra “tempestades” de recomputação.
Fundamentos: o que é cache e por que reduz CPU
Cache é o armazenamento temporário de um resultado para que a próxima consulta semelhante seja respondida com uma busca rápida. Em geral, uma “batida” no cache (cache hit) custa menos do que recalcular, buscar no banco ou chamar uma API. Um “erro” no cache (cache miss) acontece quando o item não está armazenado e o trabalho original precisa ocorrer. O ganho de CPU aparece quando muitas chamadas repetidas passam a virar hits, reduzindo cálculos e serializações.
Dois conceitos costumam aparecer juntos: memoização e cache distribuído. Memoização é cache local, normalmente em memória, baseado nos argumentos de uma função. Cache distribuído é quando o valor fica fora do processo, como no Redis, permitindo que vários workers compartilhem os mesmos resultados. Na prática, cada padrão resolve uma combinação diferente de repetição, frescor e concorrência.
Padrão 1: Memoização de função pura com lru_cache
Memoização funciona melhor em funções determinísticas e sem efeitos colaterais, chamadas de “funções puras”. Determinística significa que a mesma entrada sempre produz a mesma saída, sem depender de estado externo como horário, banco de dados ou variáveis globais. O decorador lru_cache (Least Recently Used) guarda resultados recentes e descarta os menos usados quando o limite é atingido. O custo de um hit vira uma busca em estrutura de dados interna, geralmente muito mais barata que repetir o cálculo.
O trecho a seguir mostra uma função simples de normalização, onde entradas repetidas se beneficiam imediatamente do cache. Ele também exemplifica como limitar memória com maxsize, que controla quantas combinações de argumentos podem ficar armazenadas. Em ambientes reais, esse limite deve considerar cardinalidade de entradas e custo do cálculo. Em casos raros de mudanças na regra de normalização, o cache pode ser limpo explicitamente.
from functools import lru_cache
@lru_cache(maxsize=4096) # ajusta conforme taxa de hit e memória disponível
def normalizar_pais(codigo: str) -> str:
mapa = {"uk": "GB", "gb": "GB", "us": "US"}
return mapa.get(codigo.lower(), codigo.upper())
def limpar_cache_normalizacao() -> None:
# útil em recargas quentes ou ajustes excepcionais do mapa
normalizar_pais.cache_clear()
Padrão 2: Resultados com TTL usando lru_cache (sem bibliotecas extras)
TTL (Time To Live) é um tempo de vida que define por quanto tempo um valor pode ser considerado válido. O lru_cache não expira por tempo nativamente, mas é possível criar um “segmento de tempo” e incluir esse segmento na chave do cache. Assim, todas as chamadas dentro da mesma janela compartilham o resultado, e ao virar a janela o cache passa a guardar outra versão. Esse padrão serve para dados que podem ficar “eventualmente frescos”, como configurações, flags, taxas de câmbio ou metadados.
O exemplo abaixo implementa um decorador ttl_cache com poucos recursos, usando o tempo atual arredondado por janelas. A granularidade é propositalmente grossa, o que reduz recomputações mas não garante invalidação imediata. Para necessidades de frescor mais rigorosas, costuma-se preferir invalidação por evento (quando algo muda) ou cache distribuído com controle central. Ainda assim, para leituras periódicas, esse padrão reduz chamadas e CPU de forma consistente.
import time
from functools import lru_cache
from typing import Callable, TypeVar, Any, Dict
T = TypeVar("T")
def ttl_cache(segundos: int, maxsize: int = 1024) -> Callable[[Callable[..., T]], Callable[..., T]]:
def decorador(funcao: Callable[..., T]) -> Callable[..., T]:
@lru_cache(maxsize=maxsize)
def _cacheado(janela: int, *args: Any, **kwargs: Any) -> T:
return funcao(*args, **kwargs)
def wrapper(*args: Any, **kwargs: Any) -> T:
janela = int(time.time() // segundos)
return _cacheado(janela, *args, **kwargs)
# expõe limpeza manual quando necessário
wrapper.cache_clear = _cacheado.cache_clear # type: ignore[attr-defined]
return wrapper
return decorador
@ttl_cache(300) # 5 minutos
def ler_feature_flags() -> Dict[str, bool]:
# simula trabalho caro: leitura de arquivo, banco ou serviço de configuração
return {"checkout_novo": True, "modo_manutencao": False}
Padrão 3: Cache por requisição (escopo curto, sem risco entre requisições)
Em aplicações web, uma mesma requisição pode chamar repetidamente as mesmas rotinas com os mesmos argumentos. Isso acontece em checagens de permissão, resolução de usuário, normalizações e validações reutilizadas em diferentes camadas. O cache por requisição existe somente durante aquele ciclo e evita que um valor “vaze” para outras requisições, reduzindo riscos de dados desatualizados. O ganho típico é em micro-latência e CPU, por eliminar duplicidade dentro do mesmo handler.
O padrão usa um dicionário associado ao contexto da requisição e usa como chave o nome da função e seus argumentos. O exemplo abaixo mostra uma versão genérica que recebe um get_store, isto é, uma função que devolve o dicionário do contexto atual. Esse contexto pode ser implementado com contextvars (adequado para concorrência assíncrona) ou com mecanismos do framework web em uso. O foco é que o armazenamento seja isolado por requisição.
from functools import wraps
from typing import Any, Callable, Dict, Tuple
import contextvars
_store_requisicao: contextvars.ContextVar[Dict[Tuple[Any, ...], Any]] = contextvars.ContextVar(
"store_requisicao",
default={}
)
def get_store_requisicao() -> Dict[Tuple[Any, ...], Any]:
return _store_requisicao.get()
def request_cache(get_store: Callable[[], Dict[Tuple[Any, ...], Any]]):
def decorador(funcao: Callable[..., Any]):
@wraps(funcao)
def wrapper(*args: Any, **kwargs: Any) -> Any:
chave = (funcao.__name__, args, tuple(sorted(kwargs.items())))
store = get_store()
if chave in store:
return store[chave]
store[chave] = funcao(*args, **kwargs)
return store[chave]
return wrapper
return decorador
@request_cache(get_store_requisicao)
def pode_acessar(usuario_id: str, recurso: str) -> bool:
# exemplo de cálculo repetitivo dentro de uma requisição
return recurso != "admin" or usuario_id == "1"
Padrão 4: Memoização assíncrona com compartilhamento de tarefas em voo
Em Python assíncrono, funções async retornam corrotinas, e o lru_cache não armazena automaticamente o resultado final “aguardado”. Uma abordagem prática é cachear uma Task (tarefa) em voo, criada por asyncio.create_task. Assim, chamadas concorrentes para a mesma chave aguardam a mesma tarefa, evitando N chamadas idênticas competindo entre si. Esse padrão diminui carga em IO e CPU indireta, como serialização e tratamento repetido.
Esse tipo de cache precisa ter cuidado com falhas e crescimento de memória. Uma tarefa que falhou pode ficar armazenada e repetir a falha para todos até ser removida, e tarefas guardam referências que prolongam a vida de objetos. Por isso, é comum remover a entrada ao final, ao menos em cenários de erro ou de uso pouco repetitivo. O exemplo abaixo compartilha o “trabalho em andamento” e limpa a chave quando necessário, mantendo o processo saudável.
import asyncio
from functools import lru_cache
from typing import Dict, Any
async def _buscar_perfil_usuario(usuario_id: str) -> Dict[str, Any]:
# simula chamada de rede
await asyncio.sleep(0.05)
return {"id": usuario_id, "nome": "Usuário " + usuario_id}
@lru_cache(maxsize=512)
def _tarefa_em_voo(usuario_id: str) -> asyncio.Task:
# a Task é compartilhada entre chamadores concorrentes
return asyncio.create_task(_buscar_perfil_usuario(usuario_id))
async def obter_perfil_usuario(usuario_id: str) -> Dict[str, Any]:
tarefa = _tarefa_em_voo(usuario_id)
try:
return await tarefa
finally:
# limpeza simples para evitar crescimento em chaves pouco reutilizadas
# não existe "cache_pop" no lru_cache padrão, então usa limpeza total em caso extremo
if tarefa.done() and tarefa.exception() is not None:
_tarefa_em_voo.cache_clear()
Padrão 5: Cache read-through no Redis com backoff e jitter
Cache read-through significa “ler do cache primeiro e, se faltar, calcular e preencher o cache”. Com Redis, o cache fica fora do processo e é compartilhado por múltiplos workers, evitando recomputações entre instâncias. Esse padrão reduz CPU ao diminuir buscas repetidas no banco e chamadas a serviços externos, além de reduzir latência média. Em sistemas reais, também é importante evitar que muitos processos calculem o mesmo item ao mesmo tempo.
Para reduzir concorrência na falta do cache, uma trava curta baseada em chave pode ser criada com SET NX (set se não existir) e expiração. Quando outra instância já está calculando, um pequeno atraso com jitter (variação aleatória) diminui sincronização e colisões, e então ocorre uma nova tentativa de leitura. O exemplo abaixo serializa em JSON, mas o mesmo padrão vale para outros formatos. Também existe um “último recurso” que computa localmente caso o lock não seja obtido após uma tentativa rápida, evitando espera indefinida.
import json
import random
import time
from typing import Callable, Any, Dict
import redis
redis_cliente = redis.Redis(host="localhost", port=6379, decode_responses=True)
def get_or_set_json(chave: str, ttl: int, compute: Callable[[], Dict[str, Any]]) -> Dict[str, Any]:
valor = redis_cliente.get(chave)
if valor is not None:
return json.loads(valor)
chave_lock = f"lock:{chave}"
# lock curto para controlar recomputação concorrente
if redis_cliente.set(chave_lock, "1", nx=True, ex=10):
try:
dados = compute()
redis_cliente.set(chave, json.dumps(dados), ex=ttl)
return dados
finally:
redis_cliente.delete(chave_lock)
# outro worker está computando: aguarda pouco e tenta ler de novo
time.sleep(0.02 + random.random() * 0.05)
valor = redis_cliente.get(chave)
if valor is not None:
return json.loads(valor)
# último recurso: computa sem cache para evitar espera indefinida
return compute()
def carregar_preco_produto(produto_id: str) -> Dict[str, Any]:
chave = f"produto:preco:{produto_id}"
return get_or_set_json(
chave=chave,
ttl=120,
compute=lambda: {"produto_id": produto_id, "preco": 19.90, "moeda": "BRL"}
)
Padrão 6: Escrita com invalidação precisa (write-through e invalidate)
O maior desafio de cache costuma ser a invalidação, isto é, decidir quando um valor armazenado não serve mais. Quando existe atualização de dados, manter o cache correto evita leituras inconsistentes, retrabalho e “checagens duplas” custosas. Dois caminhos comuns são: invalidar na escrita (apagar a chave) ou escrever e atualizar o cache imediatamente. A decisão depende do custo de recarregar e do quão “quente” é a chave após a escrita.
No modelo de invalidar, a próxima leitura sofre miss e repovoa o cache, simplificando lógica. No modelo write-through, a escrita no banco é seguida de atualização do cache com o valor já fresco, reduzindo miss logo depois da alteração. Para funcionar bem, é importante disciplina de chaves, com prefixos e templates consistentes, como entidade por ID e listas por filtros. O exemplo abaixo mostra as duas opções aplicadas à mesma operação.
import json
from typing import Any, Dict
def atualizar_produto(db: Any, produto_id: str, patch: Dict[str, Any], redis_cli: Any) -> None:
db.atualizar(produto_id, patch)
chave = f"produto:{produto_id}"
# opção 1: invalidação simples na escrita
redis_cli.delete(chave)
# opção 2: write-through para manter quente (descomentar se fizer sentido)
# produto_fresco = db.ler(produto_id)
# redis_cli.set(chave, json.dumps(produto_fresco), ex=600)
Padrão 7: Proteção contra stampede (dogpile lock e renovação probabilística)
Cache stampede é quando muitas requisições tentam recomputar ao mesmo tempo, geralmente porque uma chave muito acessada expirou para todos de uma vez. Isso pode causar picos de CPU e sobrecarga no banco, formando filas e aumentando p95. Uma estratégia é o dogpile lock, em que apenas um processo recalcula enquanto os outros aguardam pouco ou servem um valor anterior. Outra estratégia é evitar expirações sincronizadas, espalhando renovações ao longo do tempo.
No dogpile lock, uma trava com TTL controla quem recalcula, e os demais tentam servir algo aceitável sem bloquear indefinidamente. Uma técnica complementar é a renovação antecipada com probabilidade crescente quando o TTL restante é baixo, chamada aqui de “renovação probabilística”. Isso distribui recomputações e reduz a chance de tempestade no segundo exato da expiração. O conjunto das duas medidas melhora previsibilidade de latência e reduz picos intermitentes de CPU.
import math
import random
import time
from typing import Callable, Optional
import redis
redis_cli = redis.Redis(host="localhost", port=6379, decode_responses=True)
def deve_renovar(ttl_restante_segundos: int, beta: float = 1.5) -> bool:
# quanto menor o TTL restante, maior a probabilidade de renovar cedo
return random.random() < math.exp(-beta * ttl_restante_segundos)
def ler_relatorio_pesado(report_id: str, compute: Callable[[], str]) -> str:
chave = f"relatorio:{report_id}"
valor = redis_cli.get(chave)
if valor:
return valor
chave_lock = f"lock:{chave}"
# apenas um recalcula; lock cobre tempo estimado de computação
if redis_cli.set(chave_lock, "1", nx=True, ex=30):
try:
dados = compute()
redis_cli.set(chave, dados, ex=600)
redis_cli.set(f"stale:{chave}", dados, ex=900) # reserva para servir ligeiramente “antigo”
return dados
finally:
redis_cli.delete(chave_lock)
# outro worker recalcula: tenta servir reserva, evitando bloquear indefinidamente
stale = redis_cli.get(f"stale:{chave}")
if stale:
return stale
# pior caso: computa localmente sem cache
return compute()
def ler_com_renovacao_antecipada(chave: str, ttl_padrao: int, compute: Callable[[], str]) -> str:
valor = redis_cli.get(chave)
if valor is None:
# miss: computa e preenche
dados = compute()
redis_cli.set(chave, dados, ex=ttl_padrao)
return dados
ttl_restante = redis_cli.ttl(chave)
if ttl_restante is not None and ttl_restante > 0 and deve_renovar(ttl_restante):
chave_lock = f"lock:{chave}"
if redis_cli.set(chave_lock, "1", nx=True, ex=10):
try:
dados = compute()
redis_cli.set(chave, dados, ex=ttl_padrao)
return dados
finally:
redis_cli.delete(chave_lock)
return valor
Combinações práticas: quando sobrepor padrões melhora o resultado
Em produção, padrões de cache raramente aparecem isolados, porque cada camada reduz um tipo específico de custo. Cache por requisição reduz duplicidade dentro do mesmo ciclo de processamento, enquanto TTL reduz repetição em janelas de tempo. Já o Redis elimina recomputação entre processos e máquinas, compartilhando ganhos. Quando essas camadas se somam, a redução de CPU vem tanto de menos cálculos quanto de menos chamadas a dependências.
Uma combinação típica ocorre em endpoints que transformam dados repetidos: primeiro um cache por requisição evita duplicidade interna, depois um TTL em memória reduz recomputações em curto prazo, e por fim um read-through no Redis compartilha o resultado entre workers. Esse empilhamento tende a reduzir picos, porque o Redis absorve o calor entre instâncias, enquanto o cache local protege de micro-repetições. O cuidado principal é manter consistência: valores muito voláteis pedem TTL curto ou invalidação por escrita.
Boas práticas operacionais: chaves, limites e comportamento sob falhas
Um cache saudável precisa de limites claros para não virar um vazamento de memória ou um repositório de valores inúteis. Em memória, maxsize controla cardinalidade e evita crescimento infinito. No Redis, tamanho de valor e TTL determinam custo de rede, memória e taxa de expiração, influenciando latência. Em qualquer cenário, medir taxa de hit e contagem de chamadas ao “origin” (banco, API ou cálculo) ajuda a confirmar que o cache está entregando o ganho esperado.
Chaves consistentes evitam colisões e simplificam invalidação, por exemplo com padrões como “entidade:{id}” e “lista:{hash_filtros}”. Namespacing (prefixar chaves por serviço e ambiente) evita mistura de dados entre sistemas e impede que chaves iguais tenham significados diferentes. Em concorrência, locks nunca devem bloquear indefinidamente, e sempre deve existir um caminho de último recurso que compute sem esperar eternamente. Pequenas variações aleatórias (jitter) em retentativas reduzem sincronização e estabilizam o sistema sob picos.
Conclusão
Os sete padrões apresentados cobrem desde ganhos imediatos com lru_cache até estratégias com Redis que estabilizam múltiplos processos sob alta concorrência. Memoização de funções puras reduz CPU com simplicidade, enquanto TTL adiciona frescor controlado para leituras periódicas. Cache por requisição corta repetição dentro do mesmo ciclo, e memoização assíncrona evita múltiplas chamadas concorrentes para a mesma chave. No Redis, read-through, invalidação na escrita e proteção contra stampede formam a base para desempenho previsível e latências de cauda menores.