Async SQLAlchemy 2.0 com PostgreSQL: 12 Padrões Práticos para Código Limpo e Escalável

Published on: 2025-12-26
Post image
pt async-sqlalchemy sqlalchemy-20 async-sqlalchemy-postgres sqlalchemy-asyncpg python-async-database postgresql-async-python sqlalchemy-patterns sqlalchemy-best-practices async-session-sqlalchemy postgres-async-performance sqlalchemy-upsert-postgr

O uso de SQLAlchemy 2.0 em modo assíncrono com PostgreSQL permite alta concorrência e bom aproveitamento de I/O, especialmente quando combinado com o driver asyncpg. Ao mesmo tempo, a camada assíncrona adiciona detalhes importantes sobre ciclo de vida de sessão, transações, concorrência e observabilidade, que podem causar comportamentos imprevisíveis quando ignorados.

Um conjunto de padrões práticos ajuda a manter o código limpo, previsível e eficiente. A organização a seguir reúne doze práticas essenciais para sessões curtas, transações explícitas, consultas modernas, upserts, paginação eficiente, streaming de grandes resultados, uso correto de JSONB, controle otimista de concorrência, inserções em lote e medições de desempenho.

Engine assíncrona e ciclo de vida de sessão

Em SQLAlchemy assíncrono, o engine representa a configuração de conexão e o pool (conjunto de conexões reutilizáveis) do processo. A AsyncSession representa uma unidade curta de trabalho, normalmente limitada a uma requisição, job ou operação. Um padrão robusto cria um engine único por processo e abre sessões curtas sob demanda. Transações explícitas reduzem ambiguidades e evitam estados intermediários difíceis de depurar.

O bloco session.begin() define a fronteira transacional e garante commit ao final, ou rollback em caso de erro. O parâmetro expire_on_commit=False evita expiração automática de atributos após commit, reduzindo consultas inesperadas depois de salvar. Ajustes de pool como pool_size, max_overflow e pool_timeout controlam concorrência e tempo de espera por conexão. O parâmetro pool_recycle ajuda a renovar conexões antigas e reduzir problemas com timeouts de infraestrutura.

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

URL_BANCO = "postgresql+asyncpg://app:pwd@db/app"

engine = create_async_engine(
    URL_BANCO,
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,
)

SessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

async def operacao_exemplo():
    async with SessionLocal() as session:
        async with session.begin():
            # Operações de leitura/escrita dentro da mesma transação
            pass

Unit of Work (Unidade de Trabalho) com gerenciador de contexto

O padrão de Unit of Work concentra abertura e fechamento de sessão, além do controle transacional, em um único ponto. Isso reduz repetição e padroniza o comportamento quando exceções ocorrem. Em aplicações web, esse padrão costuma ser usado como dependência ou wrapper de serviço. Em testes, o mesmo contrato permite injetar uma sessão de teste, mantendo o código de domínio estável.

Um asynccontextmanager cria um gerenciador de contexto assíncrono que garante limpeza correta de recursos. O yield expõe a sessão já dentro da transação, simplificando a função chamadora. Se ocorrer erro, o contexto encerra a transação com rollback automaticamente. A sessão é sempre fechada ao final, evitando vazamento de conexões no pool.

from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession

@asynccontextmanager
async def uow(session_factory):
    async with session_factory() as session:  # type: AsyncSession
        async with session.begin():
            yield session

Modelos ORM tipados e consultas com select() no estilo 2.0

O estilo 2.0 do SQLAlchemy favorece modelos com anotações de tipo e consultas com select(). O termo ORM significa “mapeamento objeto-relacional”, isto é, classes Python representando tabelas. O tipo Mapped indica que o atributo é persistido no banco. O mapped_column define coluna, tipo, restrições e índices de forma declarativa.

A API de resultados também muda a forma de consumir dados. Métodos como scalar_one_or_none() retornam uma instância ou None de forma previsível. Isso reduz ambiguidade entre “linha” e “objeto”, e evita erros comuns ao lidar com tuplas de colunas. O padrão a seguir busca por e-mail, limitando a um resultado.

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import String, select

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(320), unique=True, index=True)
    name: Mapped[str]

async def buscar_por_email(session: AsyncSession, email: str) -> User | None:
    stmt = select(User).where(User.email == email).limit(1)
    result = await session.execute(stmt)
    return result.scalar_one_or_none()

Defesa contra N+1 com selectinload e load_only

O problema N+1 ocorre quando uma consulta principal carrega N registros e, para cada registro, dispara mais uma consulta adicional para relacionamentos. Em cenários assíncronos, isso aumenta latência e pressiona o pool de conexões. A opção selectinload carrega relacionamentos em poucas consultas adicionais, geralmente uma por relacionamento, em vez de uma por linha. A opção load_only limita colunas, reduzindo payload e custo de decodificação.

Essa estratégia permite buscar apenas o necessário, com previsibilidade. O relacionamento é populado com uma consulta adicional baseada em IN, evitando explosão de round-trips. Ao mesmo tempo, limitar colunas evita carregar textos grandes ou colunas raramente usadas. Esse padrão é valioso ao montar telas, APIs ou relatórios de listagem.

from sqlalchemy import select
from sqlalchemy.orm import selectinload, load_only
from sqlalchemy.ext.asyncio import AsyncSession

# Supõe-se que User tenha relacionamento "posts" com Post
async def buscar_usuarios_com_posts(session: AsyncSession, ids: list[int]):
    stmt = (
        select(User)
        .options(
            load_only(User.id, User.name),
            selectinload(User.posts).load_only(Post.id, Post.title),
        )
        .where(User.id.in_(ids))
    )
    result = await session.execute(stmt)
    return result.scalars().all()

Transações e retry otimista para isolamento SERIALIZABLE

No PostgreSQL, o isolamento SERIALIZABLE oferece forte consistência, mas pode gerar falhas de serialização sob alta concorrência. Em termos simples, o banco rejeita uma transação quando não consegue garantir que o resultado seja equivalente a uma execução em série. Nesses casos, a forma correta de lidar é fazer retry (tentativa novamente) da transação inteira, pois o erro é transitório. Em aplicações assíncronas, o retry precisa ser cuidadoso para não repetir efeitos colaterais externos.

O padrão a seguir reexecuta apenas quando a exceção indica problema de serialização. Um backoff exponencial (espera crescente) reduz contenção, e um pequeno atraso impede loops agressivos. O erro genérico OperationalError é uma categoria que inclui falhas do driver e do banco; por isso, o filtro por mensagem ou código é importante. Essa função é útil para trechos “quentes”, com muitas atualizações concorrentes.

import asyncio
from sqlalchemy.exc import OperationalError

async def executar_com_retry_serializavel(session_factory, funcao, tentativas: int = 3):
    for i in range(tentativas):
        async with session_factory() as session:
            try:
                async with session.begin():
                    return await funcao(session)
            except OperationalError as e:
                msg = str(e).lower()
                erro_serializacao = "could not serialize" in msg or "serialization" in msg
                if erro_serializacao and i < tentativas - 1:
                    await asyncio.sleep(0.05 * (2 ** i))  # backoff simples
                    continue
                raise

Upsert com ON CONFLICT para merge previsível

Upsert é a junção de “update” e “insert”: insere se não existir, atualiza se já existir. No PostgreSQL, isso é feito com ON CONFLICT, que define o que ocorre quando uma restrição única é violada. O SQLAlchemy oferece helpers do dialeto PostgreSQL para expressar isso com clareza. Esse padrão é especialmente útil para operações idempotentes, onde repetir a chamada não deve mudar o resultado final indevidamente.

O comando pode retornar colunas usando RETURNING, evitando uma consulta adicional. O atributo index_elements define quais colunas compõem o conflito, geralmente uma coluna com UNIQUE. O dicionário set_ define quais campos serão atualizados no conflito. O método mappings() retorna resultados como dicionários, o que facilita integração com camadas de serialização.

from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

async def upsert_usuario(session: AsyncSession, email: str, name: str) -> dict:
    stmt = (
        insert(User)
        .values(email=email, name=name)
        .on_conflict_do_update(
            index_elements=[User.email],
            set_={"name": name},
        )
        .returning(User.id, User.email, User.name)
    )
    result = await session.execute(stmt)
    return result.mappings().one()

Paginação por keyset (seek) em vez de OFFSET

A paginação com OFFSET perde desempenho em páginas altas porque o banco precisa descartar muitas linhas antes de entregar o resultado. A paginação por keyset (também chamada “seek”) usa um marcador, como o último id retornado, para buscar a próxima página. Isso mantém o custo estável com o crescimento da tabela. Em feeds e listagens ordenadas por id ou data, esse padrão é mais previsível.

O marcador costuma ser o menor id (ou a última data) da página anterior, dependendo da ordenação. Quando ordenado de forma decrescente, a próxima página filtra por valores menores. Esse formato também é mais estável quando novos registros são inseridos, pois não “desloca” páginas antigas como o OFFSET. O retorno de next_after funciona como o próximo ponto de continuação.

from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession

async def feed_posts(session: AsyncSession, after_id: int | None, limite: int = 50):
    stmt = select(Post).order_by(desc(Post.id)).limit(limite)
    if after_id is not None:
        stmt = stmt.where(Post.id < after_id)

    result = await session.execute(stmt)
    rows = result.scalars().all()

    proximo_after = rows[-1].id if rows else None
    return rows, proximo_after

Streaming de resultados grandes com cursores do servidor

Consultas muito grandes podem consumir memória excessiva ao carregar todas as linhas de uma vez. O streaming permite iterar pelos resultados em partes, reduzindo uso de RAM e melhorando resiliência em exportações e ETL. No SQLAlchemy assíncrono, o método stream() produz um AsyncResult que pode ser consumido incrementalmente. A opção stream_results pede ao driver um comportamento mais “cursor-like”.

O parâmetro yield_per sugere o tamanho do lote buscado por vez. O método partitions() agrupa linhas em blocos, o que reduz overhead de loop e permite processamento por lote. Esse padrão é apropriado para tarefas longas, desde que a transação e a conexão permaneçam abertas durante o consumo. Por isso, costuma ser usado em jobs dedicados e não em caminhos de requisições curtas.

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def processar_eventos_grandes(session: AsyncSession):
    stmt = select(Event).execution_options(stream_results=True, yield_per=1000)
    result = await session.stream(stmt)

    async for lote in result.partitions(1000):
        for evento in lote:
            # Processamento essencial por linha
            tratar_evento(evento)

JSONB com MutableDict e índice GIN

O tipo JSONB do PostgreSQL armazena JSON em formato binário, permitindo indexação e buscas eficientes. Em SQLAlchemy, atualizações em campos JSON podem não ser detectadas quando ocorre mutação interna de dicionários. O MutableDict resolve isso ao rastrear mudanças internas e marcar a coluna como alterada. Assim, mudanças parciais como adicionar uma chave geram um UPDATE correto.

Para consultas rápidas, um índice GIN (Generalized Inverted Index) acelera filtros sobre chaves e valores em JSONB. O índice é criado no nível do banco e expresso no modelo com Index. Esse padrão mantém flexibilidade de esquema em metadados sem perder desempenho em filtros seletivos. Ele também reduz a necessidade de normalizar cada atributo dinâmico em colunas separadas.

from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy import Index
from sqlalchemy.orm import Mapped, mapped_column

class Profile(Base):
    __tablename__ = "profiles"

    id: Mapped[int] = mapped_column(primary_key=True)
    meta: Mapped[dict] = mapped_column(MutableDict.as_mutable(JSONB), default=dict)

Index("ix_profiles_meta_gin", Profile.meta, postgresql_using="gin")

async def atualizar_tema(session: AsyncSession, profile_id: int):
    profile = await session.get(Profile, profile_id)
    if profile is None:
        return None

    profile.meta["theme"] = "dark"  # alteração rastreada
    return profile

Concorrência otimista com versionamento (version_id_col)

Em sistemas com edição concorrente, o problema “última gravação vence” pode apagar mudanças de outra transação. A concorrência otimista assume que conflitos são raros, mas detecta quando ocorrem. O SQLAlchemy suporta isso por meio de uma coluna de versão, incrementada a cada update. Se um update tentar gravar sobre uma versão antiga, ocorre uma exceção indicando conflito.

O campo de versão costuma ser um inteiro inicializado no banco com server_default. O mapeamento define __mapper_args__ para ativar o controle. Quando duas transações carregam o mesmo registro e tentam atualizar, a segunda encontra versão divergente e falha. Esse mecanismo preserva intenção e evita sobrescritas silenciosas.

from sqlalchemy import Integer
from sqlalchemy.orm import Mapped, mapped_column

class Doc(Base):
    __tablename__ = "docs"

    id: Mapped[int] = mapped_column(primary_key=True)
    body: Mapped[str]
    version_id: Mapped[int] = mapped_column(Integer, nullable=False, server_default="1")

    __mapper_args__ = {"version_id_col": version_id}

Inserções em lote com retorno de IDs usando RETURNING

Inserções em lote aumentam throughput ao reduzir overhead de múltiplos round-trips. Em SQLAlchemy, é possível enviar uma lista de dicionários e deixar o driver executar em modo “executemany”. O uso de RETURNING permite obter os IDs gerados sem uma consulta adicional. Esse padrão evita a complexidade de soluções de cópia em massa quando o volume é moderado.

Em lotes grandes, a estratégia comum é quebrar em blocos de alguns milhares para equilibrar tempo de transação e memória. Envolver tudo em uma única transação mantém consistência e reduz custo de commit repetido. O retorno pode ser convertido em lista simples para uso em etapas seguintes. Esse formato também facilita rastrear quantos registros foram efetivamente criados.

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession

async def inserir_usuarios_em_lote(session: AsyncSession, linhas: list[dict]) -> list[int]:
    stmt = insert(User).returning(User.id)
    result = await session.execute(stmt, linhas)
    return [r[0] for r in result.all()]

Observabilidade e timeouts para tornar o sistema mensurável

Sem métricas e limites, problemas de performance se transformam em “travamentos misteriosos”. O statement_timeout do PostgreSQL limita o tempo máximo de execução de uma consulta, evitando que uma query ruim degrade o sistema inteiro. A configuração pode ser aplicada por conexão, normalmente no início de uma transação ou no setup do pool. Comentários em SQL ajudam a rastrear chamadas específicas em logs do banco e ferramentas internas.

O SQLAlchemy permite adicionar um prefixo de comentário com prefix_with para um dialeto específico. A opção execution_options pode ajustar comportamento de execução, como desabilitar cache compilado em investigações pontuais. Métricas úteis incluem latência por operação, contagem de linhas retornadas e classe de erro. Uma camada pequena de instrumentação ao redor de execute() fornece consistência de coleta sem poluir regras de negócio.

from sqlalchemy import text, select
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession

async def configurar_timeout(engine: AsyncEngine):
    async with engine.begin() as conn:
        await conn.execute(text("SET statement_timeout = 3000"))

async def buscar_usuario_com_comentario(session: AsyncSession, email: str):
    stmt = (
        select(User)
        .where(User.email == email)
        .prefix_with("/* get_user_by_email v1 */", dialect="postgresql")
    )
    result = await session.execute(stmt)
    return result.scalar_one_or_none()

Exemplo integrado de “caminho limpo” com serviço e DTOs

Uma organização comum separa o acesso a dados (funções de repositório), a lógica de serviço (casos de uso) e os esquemas de entrada/saída. Em Python, Pydantic é usado para validar e estruturar dados, funcionando como DTO (objeto de transferência de dados). O serviço recebe uma sessão já gerenciada pelo ciclo de vida padrão e executa operações como upsert. O retorno é convertido para um modelo de saída, mantendo consistência de formato.

Esse arranjo reduz acoplamento entre camada HTTP e persistência. O mesmo serviço pode ser reutilizado em CLI, worker ou testes, bastando fornecer uma sessão. A validação de entrada impede dados inválidos antes do acesso ao banco. A validação de saída padroniza o que é exposto, evitando “vazamento” de campos internos do ORM.

from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

class CreateUserIn(BaseModel):
    email: str
    name: str

class UserOut(BaseModel):
    id: int
    email: str
    name: str

async def criar_ou_atualizar_usuario(session: AsyncSession, data: CreateUserIn) -> UserOut:
    row = await upsert_usuario(session, email=data.email, name=data.name)
    return UserOut(**row)

Conclusão

O SQLAlchemy 2.0 em modo assíncrono favorece explicitidade: engine único por processo, sessões curtas e transações bem definidas. Consultas no estilo select() e consumo correto de resultados tornam o fluxo previsível, enquanto padrões como selectinload e load_only evitam explosões de consultas e dados. Em PostgreSQL, recursos nativos como ON CONFLICT, JSONB com GIN e RETURNING trazem desempenho e clareza sem soluções improvisadas.

Em cenários concorrentes, retries controlados para falhas de serialização e controle otimista por versionamento reduzem inconsistências e sobrescritas silenciosas. Para grandes volumes, streaming e inserções em lote equilibram memória, latência e throughput. Com timeouts e observabilidade mínima, o comportamento passa a ser mensurável e menos sujeito a “surpresas”, mantendo a base assíncrona organizada e sustentável.