7 Padrões de Middleware em FastAPI para Reduzir p95/p99 e Acelerar APIs em Produção

Published on: 2025-12-29
Post image
pt fastapi-middleware performance-fastapi latencia-p95-p99 otimizacao-de-api fastapi-em-producao arquitetura-de-api python-async-api starlette-middleware api-de-alta-performance engenharia-de-software-backend

Middleware em FastAPI é uma camada executada antes e depois do código das rotas, funcionando como “encanamento” comum a toda a aplicação. Essa camada permite aplicar otimizações globais sem reescrever a lógica de cada endpoint, reduzindo trabalho repetido e diminuindo o tempo total de resposta.

Em APIs, a métrica mais sensível para experiência e estabilidade costuma ser a latência de cauda, como p95 e p99, que representam os 5% e 1% mais lentos. Sete padrões de middleware ajudam a atacar exatamente essas caudas: reuso de conexões HTTP, serialização JSON rápida, cache condicional com ETag, compressão seletiva, streaming de resposta, coalescência de requisições e circuit breaker com fallback. Em conjunto, esses padrões reduzem CPU, bytes transferidos e esperas por dependências externas.

Base técnica: FastAPI, Starlette e o que significa “latência de cauda”

FastAPI é um framework web em Python construído sobre o Starlette, que fornece as primitivas ASGI para requisições assíncronas. ASGI é uma especificação para servidores assíncronos, permitindo lidar com muitas conexões simultâneas sem bloquear a execução inteira. Latência de cauda (p95/p99) é mais importante do que média em cenários reais, porque picos de lentidão costumam vir de contenção, GC, rede, serialização e serviços upstream instáveis. Middleware é o ponto ideal para centralizar decisões de desempenho e consistência, evitando divergências entre endpoints. Essas otimizações são “drop-in”, pois entram no app sem exigir mudanças profundas na regra de negócio.

1) Pool global de cliente HTTP: reuso de conexões para cortar handshake

Chamadas a outros serviços costumam ser um grande custo invisível, principalmente quando cada requisição recria conexões. O reuso de conexões via pooling evita repetir DNS, TCP e TLS, diminuindo dezenas de milissegundos por salto de rede. Um único cliente HTTP assíncrono e global também centraliza timeouts e limites, aumentando previsibilidade. Em FastAPI, a forma recomendada é criar o cliente no ciclo de vida (lifespan) e fechá-lo ao final. A seguir está um exemplo completo com httpx usando HTTP/2 e limites de keep-alive.

from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI, Request


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Cliente HTTP global com pool de conexões (reduz handshakes repetidos)
    app.state.cliente_http = httpx.AsyncClient(
        http2=True,
        timeout=httpx.Timeout(5.0),
        limits=httpx.Limits(
            max_connections=200,
            max_keepalive_connections=100,
            keepalive_expiry=30.0,
        ),
        headers={"User-Agent": "api-fastapi/1.0"},
    )
    try:
        yield
    finally:
        await app.state.cliente_http.aclose()


app = FastAPI(lifespan=lifespan)


@app.get("/saude-upstream")
async def saude_upstream(request: Request):
    # Uso do cliente compartilhado sem recriar por requisição
    resp = await request.app.state.cliente_http.get("https://example.com/health")
    return {"status_upstream": resp.status_code}

2) ORJSON como resposta padrão: serialização JSON com menor custo de CPU

Em APIs que retornam muito JSON, serializar objetos pode consumir uma fração relevante do tempo total. ORJSON é uma biblioteca rápida de JSON, com implementação de alto desempenho, reduzindo custo de CPU em respostas grandes e frequentes. Definir uma classe de resposta padrão evita que cada rota escolha um caminho diferente e elimina “drift” de performance entre endpoints. Em FastAPI, isso é feito com default_response_class, que passa a ser aplicada por padrão a retornos serializáveis. O exemplo abaixo mostra a configuração global e um middleware de “segurança” para padronização, usado apenas quando se deseja reforçar consistência.

from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from starlette.responses import Response


app = FastAPI(default_response_class=ORJSONResponse)


@app.middleware("http")
async def garantir_orjson(request, call_next):
    response = await call_next(request)

    # Válvula de segurança: evita respostas fora do padrão caso algo retorne indevidamente
    # Observação: em Starlette/FastAPI, normalmente a rota já produz um Response.
    if isinstance(response, dict):
        return ORJSONResponse(response)

    if isinstance(response, Response):
        return response

    return ORJSONResponse(response)

3) Cache condicional com ETag: respostas 304 para reduzir payload e tempo

ETag é um identificador do conteúdo retornado, permitindo cache condicional com o cabeçalho If-None-Match. Quando o cliente envia a ETag e o conteúdo não mudou, o servidor responde 304 Not Modified, evitando reconstruir e retransmitir o corpo. Isso reduz CPU, banda e tempo de transferência, especialmente em endpoints de leitura com alta repetição. Como padrão de middleware, a ETag pode ser calculada após a resposta, adicionada no header e comparada com a enviada pelo cliente. O exemplo abaixo calcula a ETag a partir do corpo para respostas GET bem-sucedidas, com cuidado para reconstruir o body após consumir o iterador.

import hashlib
from fastapi import FastAPI, Request
from starlette.responses import Response

app = FastAPI()


async def _ler_corpo_resposta(response: Response) -> bytes | None:
    # Em streaming, body_iterator pode existir; em respostas comuns, pode haver response.body
    if getattr(response, "body", None) is not None:
        return response.body

    iterator = getattr(response, "body_iterator", None)
    if iterator is None:
        return None

    partes = []
    async for chunk in iterator:
        partes.append(chunk)
    return b"".join(partes)


@app.middleware("http")
async def etag_em_get(request: Request, call_next):
    response: Response = await call_next(request)

    if request.method != "GET":
        return response
    if not (200 <= response.status_code < 300):
        return response

    corpo = await _ler_corpo_resposta(response)
    if corpo is None:
        return response

    # Hash curto e estável para ETag (ideal para payloads não gigantes)
    etag = hashlib.blake2b(corpo, digest_size=8).hexdigest()
    etag_cliente = request.headers.get("if-none-match")

    if etag_cliente == etag:
        # 304 não inclui corpo; mantém ETag para validação
        return Response(status_code=304, headers={"ETag": etag})

    response.headers["ETag"] = etag
    response.body = corpo
    return response

4) Compressão seletiva: economizar rede sem pagar CPU em respostas pequenas

Compressão pode reduzir bastante o tempo de transmissão em respostas textuais grandes, como JSON, CSV e NDJSON. O custo é CPU, então comprimir tudo indiscriminadamente tende a piorar respostas pequenas e aumentar consumo de servidor. A abordagem eficiente usa um limiar mínimo e ignora tipos que já são compactados, como imagens e PDFs. No ecossistema Starlette, o GZipMiddleware é simples e amplamente compatível com clientes, sendo uma escolha prática para o padrão “seletivo”. O trecho abaixo adiciona compressão com tamanho mínimo e remove Content-Encoding quando o tipo for binário.

from fastapi import FastAPI
from starlette.middleware.gzip import GZipMiddleware

app = FastAPI()

# Comprime apenas a partir de 2 KB, evitando custo em respostas pequenas
app.add_middleware(GZipMiddleware, minimum_size=2048)


@app.middleware("http")
async def evitar_compressao_binarios(request, call_next):
    response = await call_next(request)

    content_type = (response.headers.get("content-type") or "").lower()
    tipos_binarios = ("image/", "application/pdf", "application/zip")

    if any(t in content_type for t in tipos_binarios):
        # Garante que não haja compressão acidental para conteúdo já compactado
        response.headers.pop("Content-Encoding", None)

    return response

5) Streaming de resposta: enviar dados enquanto o processamento continua

Streaming significa transmitir a resposta em partes, sem esperar o corpo inteiro ficar pronto. Isso reduz a latência percebida, pois os primeiros bytes chegam rápido, e também reduz pressão de memória em exportações grandes. Em FastAPI, StreamingResponse usa um iterador (ou gerador assíncrono) que vai produzindo “chunks” em bytes. Esse padrão é especialmente útil para relatórios, exportações e logs, desde que o formato permita ser emitido incrementalmente. O exemplo abaixo constrói um JSON grande em partes, assumindo que cada item já esteja em bytes válidos de JSON.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def iterador_caro_assincrono():
    # Exemplo didático: em produção, isso costuma iterar linhas do banco/serviço
    for i in range(1, 6):
        yield f'{{"id": {i}, "valor": "item-{i}"}}'.encode("utf-8")


async def gerador_json():
    yield b'{"items":['
    primeiro = True

    async for item in iterador_caro_assincrono():
        if not primeiro:
            yield b","
        else:
            primeiro = False
        yield item

    yield b"]}"


@app.get("/exportacao")
async def exportacao():
    return StreamingResponse(gerador_json(), media_type="application/json")

6) Coalescência de requisições (single-flight): evitar “stampede” em picos

Em picos, múltiplos clientes podem pedir o mesmo recurso simultaneamente, causando trabalho duplicado e derrubando p95/p99. Coalescência, também chamada single-flight, garante que apenas uma execução “carregue” o resultado enquanto as demais esperam. Isso é útil quando há cache com “miss”, reprocessamento caro, ou chamadas pesadas a banco e serviços externos. Uma implementação simples mantém um mapa em memória de chaves “em voo” para GETs idênticos. O exemplo abaixo é intencionalmente minimalista e funciona por processo, sendo suficiente para reduzir picos em um único worker.

import asyncio
from typing import Dict

from fastapi import FastAPI, Request
from starlette.responses import Response

app = FastAPI()

_em_voo: Dict[str, asyncio.Future] = {}


@app.middleware("http")
async def single_flight_get(request: Request, call_next):
    if request.method != "GET":
        return await call_next(request)

    chave = f"{request.url.path}?{request.url.query}"

    if chave in _em_voo:
        # Espera a primeira requisição concluir o trabalho pesado
        await _em_voo[chave]
        return await call_next(request)

    loop = asyncio.get_running_loop()
    futuro = loop.create_future()
    _em_voo[chave] = futuro

    try:
        response: Response = await call_next(request)
        if not futuro.done():
            futuro.set_result(True)
        return response
    finally:
        _em_voo.pop(chave, None)

7) Timeouts agressivos e circuit breaker com fallback em cache

Dependências lentas contaminam a cauda de latência, fazendo o serviço inteiro parecer instável. Um timeout curto limita o tempo máximo de espera, enquanto um circuit breaker “abre” por um período após falhas, evitando repetir chamadas que já estão causando atrasos. O fallback usa um cache recente, mesmo que levemente desatualizado, para manter respostas rápidas durante incidentes. Em middleware, isso pode ser aplicado sem alterar rotas, mas exige cuidado ao ler o corpo da resposta para armazenar no cache. O exemplo abaixo usa estruturas simples em memória por processo, com janela de abertura e cache de curta duração para respostas JSON.

import asyncio
import time
from typing import Any, Dict, Tuple

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.responses import Response

app = FastAPI()

# Estado simples por processo (didático)
_disjuntor = {"aberto_ate": 0.0}
_cache: Dict[str, Tuple[float, Any]] = {}  # chave -> (timestamp, payload)


async def _extrair_json_resposta(response: Response) -> Any | None:
    # Tenta obter o corpo em bytes e decodificar; simplificado para aplicações JSON
    if getattr(response, "body", None) is not None:
        corpo = response.body
    else:
        iterator = getattr(response, "body_iterator", None)
        if iterator is None:
            return None
        partes = []
        async for chunk in iterator:
            partes.append(chunk)
        corpo = b"".join(partes)

    if not corpo:
        return None

    # JSONResponse aceita dict/list; aqui mantém string como fallback simples
    try:
        import orjson
        return orjson.loads(corpo)
    except Exception:
        try:
            return corpo.decode("utf-8")
        except Exception:
            return None


@app.middleware("http")
async def breaker_com_fallback(request: Request, call_next):
    chave = str(request.url)
    agora = time.time()

    # Se o disjuntor estiver aberto, tenta responder do cache imediatamente
    if agora < _disjuntor["aberto_ate"] and chave in _cache:
        _, payload = _cache[chave]
        return JSONResponse(payload, headers={"X-Fallback": "circuito-aberto"})

    try:
        # Timeout global do middleware (pode ser refinado por rotas em apps maiores)
        response: Response = await asyncio.wait_for(call_next(request), timeout=1.5)

        content_type = (response.headers.get("content-type") or "").lower()
        if response.status_code == 200 and content_type.startswith("application/json"):
            payload = await _extrair_json_resposta(response)
            if payload is not None:
                _cache[chave] = (agora, payload)
                # Reconstrói resposta JSON a partir do payload para não perder o corpo consumido
                return JSONResponse(payload, headers=dict(response.headers))

        return response

    except asyncio.TimeoutError:
        # Abre o circuito por um curto período para proteger p95/p99
        _disjuntor["aberto_ate"] = agora + 10.0
        if chave in _cache:
            _, payload = _cache[chave]
            return JSONResponse(payload, headers={"X-Fallback": "timeout"})
        raise

Como os padrões se combinam para reduzir p95/p99 sem reescrever rotas

Os ganhos mais consistentes vêm de remover custos fixos repetidos e de proteger o sistema contra picos e lentidão externa. Pool global de HTTP reduz latência de rede “por salto”, enquanto ORJSON corta CPU em praticamente toda resposta JSON. ETag e compressão seletiva diminuem bytes transmitidos e evitam trabalho quando nada mudou. Streaming melhora tempo até o primeiro byte e reduz memória em exportações. Coalescência e circuit breaker estabilizam a cauda sob carga, eliminando duplicação e evitando filas causadas por upstream.

Os cenários mais comuns cobertos por esse conjunto incluem endpoints de leitura com alto volume, exportações grandes, e serviços que agregam dados de outros sistemas. Também há proteção para “thundering herd”, quando muitos clientes provocam recomputação simultânea após expiração de cache. Em operações com múltiplos processos ou múltiplas instâncias, estruturas em memória continuam úteis localmente, mas exigem coordenação externa para coalescência e cache compartilhado. Mesmo com limitações, os padrões melhoram a previsibilidade e reduzem picos sem alterar a modelagem do domínio.

Conclusão

Middleware oferece alavancagem porque aplica decisões de desempenho de forma centralizada, repetível e consistente. Pooling de conexões, serialização eficiente, cache condicional, compressão seletiva e streaming atacam custos frequentes de CPU e rede. Coalescência e circuit breaker com fallback protegem a latência de cauda quando a carga aumenta ou quando dependências ficam instáveis. Em conjunto, esses padrões reduzem p95/p99 de maneira prática, mantendo a lógica das rotas intacta. O resultado é uma API mais rápida, mais estável e com comportamento mais previsível em produção.