Como Criar um Sistema RAG Privado com Django, pgvector e Hugging Face para IA Segura e Escalável em 2026

Published on: 2026-01-27
Post image
pt rag-privado sistema-rag-com-django rag-com-pgvector embeddings-hugging-face ia-privada-para-empresas ia-segura-com-dados-sensiveis busca-semantica-com-postgresql django-inteligencia-artificial rag-self-hosted banco-vetorial-postgresql pgvector

Sistemas de RAG (Geração Aumentada por Recuperação) combinam busca em uma base de conhecimento com geração de texto, permitindo respostas mais úteis e fundamentadas em documentos reais. Em cenários sensíveis, como saúde, finanças e ambientes corporativos, a prioridade costuma ser manter dados e processos dentro da própria infraestrutura, reduzindo riscos de vazamento e dependência de serviços externos.

Um RAG privado pode ser construído com Django, PostgreSQL e a extensão pgvector, usando embeddings gerados localmente por modelos da Hugging Face. Essa abordagem cria uma base de conhecimento pesquisável por similaridade semântica, com integração ao ORM do Django, controle de acesso e trilha de auditoria, formando um pipeline completo do texto bruto até a recuperação do contexto.

Por que RAG privado é importante em ambientes sensíveis

Um RAG comum costuma enviar textos para APIs externas para gerar embeddings ou consultar modelos, o que pode ser incompatível com requisitos internos e regulatórios. Em sistemas privados, os dados permanecem “em casa”, e o processamento ocorre em servidores controlados, com regras próprias de acesso e retenção. Isso reduz a superfície de ataque e facilita a aplicação de políticas de segurança, como segmentação por departamento ou por paciente. Também diminui dependências operacionais, como limites de requisição e mudanças de preço de terceiros.

Em termos de privacidade, o ponto crítico é que documentos podem conter identificadores diretos e indiretos, e até “pedaços” de texto podem ser sensíveis. Mesmo que um provedor prometa não treinar com dados, ainda há trânsito de informação e riscos de registro indevido. Em um desenho privado, as camadas de rede, banco, logs e autenticação ficam sob governança interna. Isso ajuda a alinhar o sistema a controles como minimização de dados, rastreabilidade e segregação por finalidade.

Visão geral do pipeline de um RAG com Django

Um pipeline de RAG organiza etapas previsíveis: ingestão de documentos, quebra em partes menores, geração de embeddings, armazenamento vetorial, recuperação por similaridade e montagem de contexto. O termo embedding descreve um vetor numérico que representa o significado do texto, permitindo medir proximidade sem depender apenas de palavras iguais. A extensão pgvector adiciona ao PostgreSQL um tipo de dado vetorial e operadores de distância, viabilizando busca por similaridade. No Django, serviços isolados em módulos tornam o fluxo mais testável e fácil de evoluir.

O ciclo de consulta costuma seguir uma linha: a pergunta vira embedding, o banco retorna trechos mais parecidos e esses trechos formam o “contexto” para gerar uma resposta. Mesmo quando a geração de resposta não é implementada no mesmo serviço, o componente de recuperação já entrega o essencial: texto relevante e metadados. Isso permite controlar filtros, como tipo de documento, tags e status ativo. Também permite auditoria ao registrar o que foi recuperado para cada consulta.

Pré-requisitos e dependências do projeto

Uma base sólida começa com versões compatíveis e um conjunto mínimo de bibliotecas. O Python 3.10+ oferece suporte moderno e estabilidade para dependências de IA. O PostgreSQL 15+ facilita o uso de extensões recentes e recursos de indexação. O Django 5.x e o Django REST Framework expõem APIs com autenticação e padronização de respostas.

A lista a seguir exemplifica um arquivo de dependências fixando versões para reduzir variações de comportamento. Ela inclui bibliotecas de embeddings e utilitários de processamento de texto. O termo Sentence Transformers descreve uma família de modelos e ferramentas voltadas a gerar embeddings de frases e parágrafos. O uso de torch é comum porque muitos modelos rodam sobre PyTorch.

Django==5.0.1
djangorestframework==3.14.0
psycopg2-binary==2.9.9
pgvector==0.2.4
sentence-transformers==2.3.1
torch==2.1.2
transformers==4.36.2
langchain==0.1.0
numpy==1.26.3

PostgreSQL com pgvector: instalação e ativação

O pgvector precisa ser instalado no servidor e ativado no banco desejado. No Linux, costuma exigir headers de desenvolvimento do PostgreSQL para compilar a extensão. A extensão cria o tipo vector e operadores de distância como cosseno, que são usados na comparação entre embeddings. Em ambientes produtivos, a instalação tende a ser feita por pacotes do sistema ou imagem de container padronizada, mantendo reprodutibilidade.

Os comandos abaixo representam um fluxo típico: instalar dependências, compilar a extensão e habilitar no banco. O comando CREATE EXTENSION registra a funcionalidade no banco específico, não globalmente. Isso permite separar bancos que usam vetores de bancos que não usam, reduzindo escopo. Uma vez habilitado, o Django pode usar campos vetoriais via biblioteca pgvector.

sudo apt-get install postgresql postgresql-contrib postgresql-server-dev-15
git clone https://github.com/pgvector/pgvector.git
cd pgvector
make
sudo make install

sudo -u postgres psql
CREATE EXTENSION IF NOT EXISTS vector;

Configuração do Django para PostgreSQL e pgvector

O Django precisa apontar para o banco PostgreSQL e incluir o app que integra o pgvector. A configuração em settings.py define credenciais, host e porta, que variam por ambiente. Em produção, variáveis de ambiente são preferíveis para senhas, reduzindo risco de vazamento em repositório. Além disso, a inclusão de apps garante que campos e utilitários do pgvector fiquem disponíveis.

O exemplo abaixo mostra uma configuração direta e comum para ambientes locais. O termo ENGINE define o backend do banco, e a seção INSTALLED_APPS registra dependências e módulos do projeto. Em sistemas maiores, podem existir múltiplos bancos com roteamento, mas o núcleo do RAG normalmente vive em um banco principal. O ponto importante é garantir que migrações e consultas usem a mesma base habilitada com a extensão.

# settings.py
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "rag_db",
        "USER": "postgres",
        "PASSWORD": "sua_senha_aqui",
        "HOST": "localhost",
        "PORT": "5432",
    }
}

INSTALLED_APPS = [
    # ... outros apps
    "rest_framework",
    "pgvector",
    "rag_app",
]

Modelagem de dados: documentos, chunks e histórico de consultas

A modelagem separa o documento original de seus “pedaços”, chamados chunks, para melhorar a recuperação. Um chunk é um trecho menor do texto, usado porque embeddings têm melhor desempenho com entradas de tamanho moderado. A classe Document guarda metadados e o texto integral, enquanto DocumentChunk guarda o texto do trecho e o vetor embedding. O modelo QueryHistory registra consultas e resultados recuperados, apoiando auditoria e melhoria contínua.

A definição abaixo usa VectorField, que representa o vetor no banco com dimensão fixa, como 384 para “all-MiniLM-L6-v2”. Campos como tags e metadata ajudam filtragens por domínio e controles de acesso em nível de aplicação. Índices tradicionais em colunas comuns aceleram filtros e paginação. Já o índice vetorial é criado via migração SQL por causa do tipo específico de índice.

# rag_app/models.py
from django.db import models
from pgvector.django import VectorField


class Document(models.Model):
    title = models.CharField(max_length=500)
    content = models.TextField()
    source = models.CharField(max_length=255, blank=True)
    document_type = models.CharField(max_length=50)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)

    tags = models.JSONField(default=list, blank=True)
    metadata = models.JSONField(default=dict, blank=True)

    class Meta:
        db_table = "documents"
        indexes = [
            models.Index(fields=["document_type", "is_active"]),
            models.Index(fields=["created_at"]),
        ]

    def __str__(self):
        return f"{self.title} ({self.document_type})"


class DocumentChunk(models.Model):
    document = models.ForeignKey(
        Document,
        on_delete=models.CASCADE,
        related_name="chunks",
    )
    chunk_text = models.TextField()
    chunk_index = models.IntegerField()

    embedding = VectorField(dimensions=384)
    token_count = models.IntegerField(default=0)

    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "document_chunks"
        indexes = [
            models.Index(fields=["document", "chunk_index"]),
        ]

    def __str__(self):
        return f"Chunk {self.chunk_index} de {self.document.title}"


class QueryHistory(models.Model):
    query_text = models.TextField()
    query_embedding = VectorField(dimensions=384)
    response = models.TextField(blank=True)
    retrieved_chunks = models.JSONField(default=list)
    relevance_score = models.FloatField(null=True, blank=True)
    user_feedback = models.IntegerField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "query_history"
        indexes = [
            models.Index(fields=["-created_at"]),
        ]

Indexação vetorial: como era e como fica com pgvector

Antes do pgvector, uma busca semântica em PostgreSQL exigia soluções externas, armazenamento em sistemas especializados ou aproximações por palavras-chave. Isso criava um “vão” arquitetural: o banco guardava texto e metadados, mas a similaridade ficava fora, com integrações complexas. Com pgvector, o PostgreSQL passa a armazenar embeddings e executar a busca por distância diretamente. Isso simplifica o desenho, mantém transações e reduz o número de componentes obrigatórios.

Para performance, a indexação vetorial é decisiva quando há muitos chunks. O índice IVFFlat acelera busca aproximada, trocando um pouco de precisão por velocidade e escalabilidade. O índice HNSW pode fornecer alta qualidade com boa performance, mas tem custos de construção e memória diferentes. A migração abaixo cria um índice IVFFlat com distância do cosseno, que é comum para embeddings normalizados.

# rag_app/migrations/0002_add_vector_index.py
from django.db import migrations


class Migration(migrations.Migration):
    dependencies = [
        ("rag_app", "0001_initial"),
    ]

    operations = [
        migrations.RunSQL(
            sql="""
                CREATE INDEX document_chunks_embedding_idx
                ON document_chunks
                USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 100);
            """,
            reverse_sql="DROP INDEX IF EXISTS document_chunks_embedding_idx;",
        ),
    ]

Embeddings locais com Hugging Face: serviço de geração

Embeddings locais eliminam a necessidade de enviar dados para APIs externas, pois o modelo roda no próprio servidor. O serviço de embeddings encapsula o carregamento do modelo e oferece funções para gerar vetores de textos individuais e em lote. A expressão “carregamento preguiçoso” descreve carregar o modelo apenas quando necessário, economizando memória em processos que podem iniciar sem uso imediato. A dimensão do vetor precisa ser coerente com o campo VectorField no banco.

O exemplo usa SentenceTransformer e retorna listas de floats, formato adequado para salvar no pgvector. A função de similaridade por cosseno pode apoiar depuração e testes, embora a busca real seja feita no banco. O tratamento de erros e logs ajuda a identificar textos problemáticos e falhas do ambiente. O termo batch representa processar vários textos de uma vez, melhorando throughput.

# rag_app/services/embedding_service.py
from typing import List
import logging
import numpy as np
from sentence_transformers import SentenceTransformer

logger = logging.getLogger(__name__)


class EmbeddingService:
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model_name = model_name
        self._model = None
        self.dimension = 384 if "MiniLM-L6" in model_name else 768

    @property
    def model(self) -> SentenceTransformer:
        if self._model is None:
            logger.info("Carregando modelo de embeddings: %s", self.model_name)
            self._model = SentenceTransformer(self.model_name)
            logger.info("Modelo carregado com sucesso")
        return self._model

    def generate_embedding(self, text: str) -> List[float]:
        try:
            embedding = self.model.encode(text, convert_to_numpy=True)
            return embedding.tolist()
        except Exception as exc:
            logger.error("Falha ao gerar embedding: %s", exc)
            raise

    def generate_embeddings_batch(self, texts: List[str], batch_size: int = 32) -> List[List[float]]:
        try:
            embeddings = self.model.encode(
                texts,
                convert_to_numpy=True,
                batch_size=batch_size,
                show_progress_bar=False,
            )
            return embeddings.tolist()
        except Exception as exc:
            logger.error("Falha ao gerar embeddings em lote: %s", exc)
            raise

    def similarity(self, emb1: List[float], emb2: List[float]) -> float:
        v1 = np.array(emb1)
        v2 = np.array(emb2)
        return float(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))


embedding_service = EmbeddingService()

Estratégia de chunking: por que quebrar textos e como fazer

Chunking é a técnica de dividir um documento grande em trechos menores para indexação e recuperação. Sem chunking, embeddings de textos longos podem perder foco semântico e gerar resultados menos precisos. Além disso, modelos têm limites de tamanho de entrada e custos que crescem com o texto. O uso de overlap (sobreposição) preserva contexto entre trechos consecutivos, reduzindo o risco de “cortar” uma ideia no meio.

Uma estratégia simples divide por limites naturais, como parágrafos, e recorre a separadores menores quando necessário. O RecursiveCharacterTextSplitter ajuda a priorizar separações mais “humanas” antes de cortar por tamanho. O serviço abaixo também calcula uma contagem aproximada de tokens por palavras, útil para controlar quanto texto vai para contexto. Em sistemas mais rígidos, o token_count pode ser calculado por tokenizador real, mas a aproximação já ajuda no começo.

# rag_app/services/chunking_service.py
from typing import Dict, List
from langchain.text_splitter import RecursiveCharacterTextSplitter


class ChunkingService:
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""],
            length_function=len,
        )

    def chunk_document(self, text: str, metadata: Dict | None = None) -> List[Dict]:
        chunks = self.text_splitter.split_text(text)
        resultado: List[Dict] = []

        for idx, chunk_text in enumerate(chunks):
            chunk_limpo = chunk_text.strip()
            if not chunk_limpo:
                continue

            resultado.append(
                {
                    "text": chunk_limpo,
                    "index": idx,
                    "token_count": len(chunk_limpo.split()),
                    "metadata": metadata or {},
                }
            )

        return resultado

    def chunk_with_semantic_splitting(self, text: str) -> List[str]:
        paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
        chunks: List[str] = []
        atual = ""

        for para in paragraphs:
            if len(atual) + len(para) > self.chunk_size and atual:
                chunks.append(atual.strip())
                atual = para
            else:
                atual = (atual + "\n\n" + para) if atual else para

        if atual:
            chunks.append(atual.strip())

        return chunks


chunking_service = ChunkingService()

Ingestão de documentos: do texto bruto ao armazenamento vetorial

A ingestão transforma um documento em registros pesquisáveis: cria o documento, gera chunks, cria embeddings e salva tudo de forma consistente. O decorator transaction.atomic garante atomicidade, significando que, se algo falhar no meio, a operação pode ser revertida, evitando dados “pela metade”. A criação em lote com bulk_create reduz tempo e carga no banco quando há muitos chunks. Logs ajudam a acompanhar volume, tempo e falhas por documento.

Além de inserir, a ingestão precisa cobrir atualização, pois alterações no conteúdo exigem regenerar chunks e embeddings. O fluxo de atualização costuma apagar chunks antigos e recriar novos, mantendo coerência. Em bases grandes, pode haver versionamento e expiração por data, mas o núcleo é manter a correspondência entre texto e vetor. O serviço abaixo implementa ingestão unitária, em lote e atualização.

# rag_app/services/ingestion_service.py
from typing import Dict, List
import logging
from django.db import transaction

from ..models import Document, DocumentChunk
from .embedding_service import embedding_service
from .chunking_service import chunking_service

logger = logging.getLogger(__name__)


class IngestionService:
    @transaction.atomic
    def ingest_document(
        self,
        title: str,
        content: str,
        document_type: str,
        metadata: Dict | None = None,
        tags: List[str] | None = None,
        source: str = "",
    ) -> Document:
        document = Document.objects.create(
            title=title,
            content=content,
            document_type=document_type,
            metadata=metadata or {},
            tags=tags or [],
            source=source,
        )

        chunks = chunking_service.chunk_document(content, metadata=document.metadata)
        textos = [c["text"] for c in chunks]
        embeddings = embedding_service.generate_embeddings_batch(textos)

        chunk_objs: List[DocumentChunk] = []
        for chunk_data, emb in zip(chunks, embeddings):
            chunk_objs.append(
                DocumentChunk(
                    document=document,
                    chunk_text=chunk_data["text"],
                    chunk_index=chunk_data["index"],
                    embedding=emb,
                    token_count=chunk_data["token_count"],
                )
            )

        DocumentChunk.objects.bulk_create(chunk_objs)
        logger.info("Documento %s ingerido com %s chunks", document.id, len(chunk_objs))
        return document

    @transaction.atomic
    def ingest_batch(self, documents: List[Dict]) -> List[Document]:
        criados: List[Document] = []
        for doc in documents:
            try:
                criados.append(self.ingest_document(**doc))
            except Exception as exc:
                logger.error("Falha ao ingerir '%s': %s", doc.get("title"), exc)
        return criados

    @transaction.atomic
    def update_document(self, document_id: int, content: str) -> Document:
        document = Document.objects.get(id=document_id)
        document.chunks.all().delete()

        document.content = content
        document.save(update_fields=["content", "updated_at"])

        chunks = chunking_service.chunk_document(content, metadata=document.metadata)
        textos = [c["text"] for c in chunks]
        embeddings = embedding_service.generate_embeddings_batch(textos)

        novos = [
            DocumentChunk(
                document=document,
                chunk_text=c["text"],
                chunk_index=c["index"],
                embedding=e,
                token_count=c["token_count"],
            )
            for c, e in zip(chunks, embeddings)
        ]
        DocumentChunk.objects.bulk_create(novos)
        logger.info("Documento %s atualizado com %s chunks", document.id, len(novos))
        return document


ingestion_service = IngestionService()

Recuperação semântica: busca por similaridade com filtros

A recuperação semântica começa transformando a consulta em embedding e buscando no banco os chunks “mais próximos” por uma métrica de distância. A função CosineDistance calcula distância do cosseno no PostgreSQL, em que valores menores indicam maior similaridade. Um similarity_threshold controla qualidade mínima, evitando retornar trechos pouco relacionados. Filtros por tipo e tags ajudam a restringir contexto ao domínio correto, reduzindo respostas equivocadas.

O serviço abaixo retorna uma estrutura com chunks, similaridade e metadados do documento, suficiente para montar um prompt em um gerador posterior. O histórico de consulta salva o texto da query, o embedding e quais chunks foram recuperados, formando uma trilha de auditoria. Isso também viabiliza análises como consultas recorrentes e lacunas na base. A mesma camada inclui busca híbrida, combinando vetor e palavras-chave para capturar termos exatos.

# rag_app/services/retrieval_service.py
from typing import Dict, List, Optional
import logging

from pgvector.django import CosineDistance
from django.contrib.postgres.search import SearchQuery, SearchRank, SearchVector

from ..models import DocumentChunk, QueryHistory
from .embedding_service import embedding_service

logger = logging.getLogger(__name__)


class RetrievalService:
    def __init__(self, top_k: int = 5, similarity_threshold: float = 0.7):
        self.top_k = top_k
        self.similarity_threshold = similarity_threshold

    def retrieve_context(
        self,
        query: str,
        document_type: Optional[str] = None,
        tags: Optional[List[str]] = None,
        save_history: bool = True,
    ) -> Dict:
        query_embedding = embedding_service.generate_embedding(query)

        qs = (
            DocumentChunk.objects.filter(document__is_active=True)
            .select_related("document")
        )

        if document_type:
            qs = qs.filter(document__document_type=document_type)

        if tags:
            qs = qs.filter(document__tags__overlap=tags)

        similares = (
            qs.annotate(distance=CosineDistance("embedding", query_embedding))
            .order_by("distance")[: self.top_k]
        )

        resultados: List[Dict] = []
        for chunk in similares:
            similarity = 1 - float(chunk.distance)
            if similarity < self.similarity_threshold:
                continue

            resultados.append(
                {
                    "chunk_id": chunk.id,
                    "document_id": chunk.document.id,
                    "document_title": chunk.document.title,
                    "text": chunk.chunk_text,
                    "similarity": similarity,
                    "metadata": chunk.document.metadata,
                    "chunk_index": chunk.chunk_index,
                    "document_type": chunk.document.document_type,
                }
            )

        if save_history:
            QueryHistory.objects.create(
                query_text=query,
                query_embedding=query_embedding,
                retrieved_chunks=[r["chunk_id"] for r in resultados],
            )

        logger.info("Consulta recuperou %s chunks (após threshold)", len(resultados))
        return {"query": query, "query_embedding": query_embedding, "chunks": resultados}

    def hybrid_search(self, query: str, top_k: Optional[int] = None) -> List[Dict]:
        if top_k is None:
            top_k = self.top_k

        query_embedding = embedding_service.generate_embedding(query)

        vector_qs = (
            DocumentChunk.objects.filter(document__is_active=True)
            .select_related("document")
            .annotate(distance=CosineDistance("embedding", query_embedding))
            .order_by("distance")[: top_k * 2]
        )

        search_vector = (
            SearchVector("chunk_text", weight="A") +
            SearchVector("document__title", weight="B")
        )
        search_query = SearchQuery(query)

        keyword_qs = (
            DocumentChunk.objects.filter(document__is_active=True)
            .select_related("document")
            .annotate(rank=SearchRank(search_vector, search_query))
            .filter(rank__gte=0.1)
            .order_by("-rank")[: top_k * 2]
        )

        vistos = set()
        combinados: List[Dict] = []

        for c in vector_qs:
            if c.id in vistos:
                continue
            vistos.add(c.id)
            combinados.append(
                {
                    "chunk_id": c.id,
                    "text": c.chunk_text,
                    "document_title": c.document.title,
                    "score": 1 - float(c.distance),
                    "score_type": "vector",
                }
            )

        for c in keyword_qs:
            if c.id in vistos:
                continue
            vistos.add(c.id)
            combinados.append(
                {
                    "chunk_id": c.id,
                    "text": c.chunk_text,
                    "document_title": c.document.title,
                    "score": float(c.rank),
                    "score_type": "keyword",
                }
            )

        combinados.sort(key=lambda x: x["score"], reverse=True)
        return combinados[:top_k]


retrieval_service = RetrievalService()

API REST no Django: endpoints de ingestão e consulta

Uma API REST expõe operações de ingestão e busca para sistemas internos, interfaces web ou integrações. O Django REST Framework fornece APIView, serialização e respostas padronizadas. A permissão IsAuthenticated restringe acesso a usuários autenticados, base para políticas mais avançadas como perfis e escopos. Na ingestão, validações evitam salvar documentos incompletos e reduzem falhas posteriores.

Os endpoints abaixo cobrem ingestão unitária, ingestão em lote, consulta RAG e busca híbrida, além de listagem e detalhe de documentos. A paginação reduz o peso de listar muitos registros. A exclusão “soft delete” marca is_active como falso, mantendo rastreabilidade e permitindo restauração. Em sistemas sensíveis, esse padrão evita perda irreversível por engano e facilita auditoria.

# rag_app/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from django.core.paginator import Paginator

from .services.ingestion_service import ingestion_service
from .services.retrieval_service import retrieval_service
from .models import Document
from .serializers import DocumentSerializer


class DocumentIngestionView(APIView):
    permission_classes = [IsAuthenticated]

    def post(self, request):
        title = request.data.get("title")
        content = request.data.get("content")
        document_type = request.data.get("document_type", "general")
        tags = request.data.get("tags", [])
        metadata = request.data.get("metadata", {})
        source = request.data.get("source", "")

        if not title or not content:
            return Response(
                {"error": "title e content são obrigatórios"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        document = ingestion_service.ingest_document(
            title=title,
            content=content,
            document_type=document_type,
            tags=tags,
            metadata=metadata,
            source=source,
        )

        return Response(
            {"message": "Documento ingerido com sucesso", "document": DocumentSerializer(document).data},
            status=status.HTTP_201_CREATED,
        )


class BatchIngestionView(APIView):
    permission_classes = [IsAuthenticated]

    def post(self, request):
        documents = request.data.get("documents", [])
        if not documents:
            return Response(
                {"error": "Nenhum documento informado"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        created_docs = ingestion_service.ingest_batch(documents)
        return Response(
            {"message": "Ingestão em lote concluída", "document_count": len(created_docs)},
            status=status.HTTP_201_CREATED,
        )


class RAGQueryView(APIView):
    permission_classes = [IsAuthenticated]

    def post(self, request):
        query = request.data.get("query")
        document_type = request.data.get("document_type")
        tags = request.data.get("tags")
        top_k = int(request.data.get("top_k", 5))

        if not query:
            return Response(
                {"error": "query é obrigatória"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        top_k_original = retrieval_service.top_k
        retrieval_service.top_k = top_k
        try:
            results = retrieval_service.retrieve_context(
                query=query,
                document_type=document_type,
                tags=tags,
            )
        finally:
            retrieval_service.top_k = top_k_original

        return Response(
            {"query": query, "results": results["chunks"], "retrieved_count": len(results["chunks"])},
            status=status.HTTP_200_OK,
        )


class HybridSearchView(APIView):
    permission_classes = [IsAuthenticated]

    def post(self, request):
        query = request.data.get("query")
        top_k = int(request.data.get("top_k", 5))

        if not query:
            return Response(
                {"error": "query é obrigatória"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        results = retrieval_service.hybrid_search(query, top_k=top_k)
        return Response(
            {"query": query, "results": results, "retrieved_count": len(results)},
            status=status.HTTP_200_OK,
        )


class DocumentListView(APIView):
    permission_classes = [IsAuthenticated]

    def get(self, request):
        page_num = int(request.query_params.get("page", 1))
        page_size = int(request.query_params.get("page_size", 20))
        document_type = request.query_params.get("document_type")
        tags = request.query_params.get("tags")

        qs = Document.objects.filter(is_active=True)

        if document_type:
            qs = qs.filter(document_type=document_type)

        if tags:
            tag_list = [t.strip() for t in tags.split(",") if t.strip()]
            qs = qs.filter(tags__overlap=tag_list)

        qs = qs.order_by("-created_at")
        paginator = Paginator(qs, page_size)
        page = paginator.get_page(page_num)

        return Response(
            {
                "count": paginator.count,
                "total_pages": paginator.num_pages,
                "current_page": page_num,
                "results": DocumentSerializer(page, many=True).data,
            },
            status=status.HTTP_200_OK,
        )


class DocumentDetailView(APIView):
    permission_classes = [IsAuthenticated]

    def get(self, request, document_id: int):
        try:
            document = Document.objects.prefetch_related("chunks").get(id=document_id)
        except Document.DoesNotExist:
            return Response({"error": "Documento não encontrado"}, status=status.HTTP_404_NOT_FOUND)

        return Response(DocumentSerializer(document).data, status=status.HTTP_200_OK)

    def delete(self, request, document_id: int):
        try:
            document = Document.objects.get(id=document_id)
        except Document.DoesNotExist:
            return Response({"error": "Documento não encontrado"}, status=status.HTTP_404_NOT_FOUND)

        document.is_active = False
        document.save(update_fields=["is_active", "updated_at"])
        return Response({"message": "Documento desativado com sucesso"}, status=status.HTTP_200_OK)

Serializers: como os dados saem da API com consistência

Serializers definem como modelos viram JSON, reduzindo inconsistências e campos inesperados. Um serializer de chunks evita expor embedding, o que é útil por privacidade e tamanho de resposta. O campo calculado chunk_count fornece uma visão rápida do volume de indexação do documento. A inclusão de chunks no detalhe ajuda inspeção e depuração, mas pode ser pesada em documentos grandes, então costuma ser usada com cuidado.

O exemplo abaixo cria dois serializers: um para chunks e outro para documentos. A lista de campos é explícita, o que evita vazamento de campos adicionados no futuro sem intenção. O método get_chunk_count calcula o total sem exigir coluna redundante. Esse padrão mantém o contrato da API mais estável ao longo do tempo.

# rag_app/serializers.py
from rest_framework import serializers
from .models import Document, DocumentChunk


class ChunkSerializer(serializers.ModelSerializer):
    class Meta:
        model = DocumentChunk
        fields = ["id", "chunk_text", "chunk_index", "token_count"]


class DocumentSerializer(serializers.ModelSerializer):
    chunk_count = serializers.SerializerMethodField()
    chunks = ChunkSerializer(many=True, read_only=True)

    class Meta:
        model = Document
        fields = [
            "id",
            "title",
            "content",
            "source",
            "document_type",
            "tags",
            "metadata",
            "is_active",
            "created_at",
            "updated_at",
            "chunk_count",
            "chunks",
        ]

    def get_chunk_count(self, obj: Document) -> int:
        return obj.chunks.count()

URLs: roteamento dos endpoints

O roteamento define caminhos estáveis para ingestão, busca e gestão de documentos. No Django, o arquivo urls.py conecta rotas às views. Uma separação clara por prefixos, como “documents/” e “rag/”, facilita políticas de proxy, logs e monitoramento. Também ajuda a controlar permissões e rate limiting por grupo de endpoints.

O exemplo abaixo cria rotas diretas e previsíveis. Em ambientes maiores, costuma existir um prefixo global, como “/api/”, no roteador principal do projeto. O importante é manter nomes e padrões consistentes, reduzindo confusão entre versões de API. A estrutura aqui cobre todo o ciclo: inserir, consultar e administrar.

# rag_app/urls.py
from django.urls import path
from .views import (
    DocumentIngestionView,
    BatchIngestionView,
    RAGQueryView,
    HybridSearchView,
    DocumentListView,
    DocumentDetailView,
)

urlpatterns = [
    path("documents/ingest/", DocumentIngestionView.as_view(), name="ingest-document"),
    path("documents/ingest-batch/", BatchIngestionView.as_view(), name="ingest-batch"),
    path("rag/query/", RAGQueryView.as_view(), name="rag-query"),
    path("rag/hybrid-search/", HybridSearchView.as_view(), name="hybrid-search"),
    path("documents/", DocumentListView.as_view(), name="document-list"),
    path("documents//", DocumentDetailView.as_view(), name="document-detail"),
]

O que muda “na prática”: exemplos de ingestão e consulta

Em um fluxo tradicional sem RAG, encontrar informação depende de pesquisa por palavra-chave e leitura manual. Isso funciona quando termos são exatos, mas falha com sinônimos, reformulações e perguntas mais livres. Com RAG, um texto como “pressão alta” pode recuperar trechos que falam “hipertensão” mesmo sem coincidência literal. O ganho vem da proximidade semântica dos embeddings e da capacidade de filtrar por contexto.

Os exemplos abaixo demonstram ingestão de registros e consulta com filtros, mantendo a ideia de separar bases por tipo e tags. Tags podem representar domínio (“cardiology”) e também segmentação (“patient-123”), útil para evitar mistura de contexto. Metadados estruturados guardam campos específicos para auditoria e rastreamento. A consulta recupera chunks relevantes e devolve similaridade para inspeção.

# Exemplo de ingestão (uso interno em serviços/rotinas)
from rag_app.services.ingestion_service import ingestion_service
from rag_app.services.retrieval_service import retrieval_service

ingestion_service.ingest_document(
    title="Prontuário - Paciente 123",
    content="Paciente relata dor no peito há 2 dias. Exames iniciais indicam ...",
    document_type="medical_record",
    tags=["cardiology", "patient-123"],
    metadata={"patient_id": "123", "visit_date": "2026-01-15"},
    source="sistema_interno",
)

resultado = retrieval_service.retrieve_context(
    query="dor no peito com exame alterado",
    document_type="medical_record",
    tags=["cardiology", "patient-123"],
)

# 'resultado["chunks"]' contém trechos e similaridade

Otimização de desempenho: índices, lotes e cache

Em bases pequenas, a busca vetorial pode parecer rápida mesmo sem índice, mas isso muda quando o volume cresce. O índice IVFFlat acelera consultas, mas exige ajuste de parâmetros como lists, que influencia precisão e latência. Outra forma é usar HNSW, que costuma ter excelente qualidade e velocidade, com trade-offs distintos. Em ambos os casos, a escolha depende de volume, padrão de atualização e metas de tempo de resposta.

Além do índice, a ingestão em lote melhora muito o tempo total, pois reduz overhead de rede e de transação. Para consultas repetidas, cache de embeddings pode economizar CPU, principalmente quando consultas idênticas ou muito próximas se repetem. O snippet abaixo ilustra um cache simples com o cache do Django, que pode ser configurado para Redis. Em ambientes sensíveis, chaves e dados de cache devem respeitar políticas de retenção.

# Exemplo de cache de embeddings de consulta
from django.core.cache import cache
from rag_app.services.embedding_service import embedding_service

def get_cached_embedding(texto: str) -> list[float]:
    chave = f"embedding:{hash(texto)}"
    emb = cache.get(chave)

    if emb is None:
        emb = embedding_service.generate_embedding(texto)
        cache.set(chave, emb, timeout=3600)  # 1 hora

    return emb

Testes automatizados: validando ingestão e recuperação

Testes ajudam a garantir que alterações no chunking, modelo de embeddings ou índices não quebrem o comportamento essencial. Um teste básico valida que a ingestão cria chunks e que a recuperação retorna resultados com campos esperados. Isso reduz regressões silenciosas, comuns quando se mexe em thresholds ou na forma de filtrar. Em Django, TestCase cria um ambiente isolado com banco de teste.

O exemplo abaixo cria um documento de diretriz e executa uma consulta para validar retorno. Em um cenário real, testes adicionais verificariam filtros por tipo, tags e desativação de documentos. Também é comum testar o comportamento quando não há resultados acima do threshold. O objetivo é manter previsibilidade do sistema mesmo com evolução de dependências e modelos.

# rag_app/tests/test_rag.py
from django.test import TestCase
from rag_app.services.ingestion_service import ingestion_service
from rag_app.services.retrieval_service import retrieval_service


class RAGSystemTest(TestCase):
    def setUp(self):
        ingestion_service.ingest_document(
            title="Diretriz de Hipertensão",
            content="Hipertensão é frequentemente definida como pressão arterial acima de 140/90 em medições repetidas...",
            document_type="medical_guideline",
            tags=["cardiology"],
            metadata={"versao": "2026.1"},
        )

    def test_retrieval_retorna_chunks(self):
        results = retrieval_service.retrieve_context(query="o que é pressão alta?")
        self.assertGreater(len(results["chunks"]), 0)
        self.assertIn("similarity", results["chunks"][0])

    def test_filter_por_tipo(self):
        results = retrieval_service.retrieve_context(
            query="definição de hipertensão",
            document_type="medical_guideline",
        )
        for c in results["chunks"]:
            self.assertEqual(c["document_type"], "medical_guideline")

Segurança e privacidade: controles essenciais em um RAG privado

Um RAG privado não é apenas “rodar local”, pois ainda existe risco de vazamento por logs, permissões erradas e mistura de contextos. Autenticação e autorização precisam garantir que documentos e chunks acessados respeitem a mesma política do dado original. O registro de histórico de consulta deve ser cuidadosamente definido, pois consultas podem conter dados sensíveis e devem seguir retenção e mascaramento quando necessário. O tráfego deve usar HTTPS e certificados gerenciados, reduzindo interceptação.

Outro ponto é criptografia “em repouso”, que significa proteger dados no disco e backups com criptografia em nível de volume ou serviço. Embeddings também podem carregar sinal semântico do texto, então devem ser tratados como dados sensíveis quando o domínio exigir. Rate limiting reduz abuso e negação de serviço, especialmente em endpoints de consulta. Validação de entrada reduz riscos de payloads gigantes, que podem causar custos excessivos de CPU e memória.

Encerramento: um RAG completo, privado e escalável com Django e pgvector

Um RAG privado com Django, PostgreSQL e pgvector cria uma base de conhecimento segura e pesquisável por significado, mantendo documentos e embeddings sob controle interno. O desenho com modelos para documentos e chunks, serviços para embeddings, chunking, ingestão e recuperação organiza o sistema em camadas claras e testáveis. Índices vetoriais e processamento em lote elevam desempenho e permitem crescer para grandes volumes de conteúdo. Com autenticação, filtros e trilha de histórico, o sistema também ganha governança e auditabilidade.

O resultado é um pipeline completo que começa no texto bruto, passa por indexação semântica e termina em recuperação de contexto consistente, pronto para alimentar uma camada de geração de respostas quando necessário. A combinação de busca vetorial e busca por palavras-chave melhora robustez em perguntas livres e em termos específicos. A estrutura apresentada cobre início, meio e fim do problema: armazenar, organizar, recuperar e controlar informação sensível com qualidade e previsibilidade. Esse conjunto forma uma base sólida para aplicações internas que precisam equilibrar privacidade, desempenho e utilidade.

Recursos citados e úteis para consulta técnica: