Processar um CSV de 100GB com apenas 512MB de RAM não é um teste de bravura, é um exercício de engenharia. A memória disponível é insuficiente para carregar o arquivo inteiro, portanto a única saída é transformar a tarefa em um fluxo: ler pouco, processar pouco, escrever pouco — e repetir até o fim. O desafio não está apenas em “ler linha a linha”, mas em garantir desempenho aceitável, tolerância a falhas e saída consistente, mesmo sob restrições rígidas.
Essa classe de problema é frequente em ETL (extração, transformação e carregamento) e em pipelines de dados administrativos. O arquivo precisa ser limpo e transformado, e o resultado precisa ser salvo em formatos mais eficientes para análise, como JSON de linha única (NDJSON) ou Parquet. Em ambientes corporativos, a resposta pode incluir Kafka, Flink e HDFS; em um servidor simples, a resposta precisa ser minimalista e confiável. Em ambos os cenários, os mesmos princípios se aplicam: processamento em fluxo, “chunking” (lotes pequenos), checkpointing (pontos de retomada) e idempotência (reprocessar sem duplicar ou corromper a saída).
O conteúdo a seguir apresenta uma estratégia progressiva e prática em Python. Começa pelo essencial — streaming local — e evolui para soluções distribuídas com Kafka e Apache Flink/HDFS, incluindo exemplos de código e decisões de projeto. O objetivo é tornar o processamento viável, controlado e reproduzível, sem recorrer a atalhos frágis.
Os exemplos assumem Python 3.x, uso de bibliotecas amplamente adotadas (csv, json, pandas, pyarrow) e linha de comando Unix quando útil. Quando envolver Kafka/Flink/HDFS, o foco é a arquitetura e as peças mínimas de código necessárias para montar um pipeline realista.
O problema e as restrições reais
CSV de 100GB significa bilhões de caracteres, potencialmente dezenas ou centenas de milhões de linhas. Cada linha pode conter campos com aspas, vírgulas escapadas e quebras de linha internas. O parser deve ser resiliente a esses detalhes e não pode carregar tudo em memória. A leitura em fluxo é obrigatória, e o uso do módulo csv do Python é preferível a “split por vírgula”.
Com 512MB de RAM, qualquer lote precisa ser pequeno, sob o risco de o coletor de lixo e a paginação do sistema operacional degradarem o desempenho. A CPU única limita ganhos com paralelismo, mas há benefício em sobrepor I/O e CPU com duas threads (produtor-consumidor) e uma fila pequena. A escrita precisa ser incremental e robusta: usar arquivos parciais, flush periódico e um mecanismo de checkpoint para retomar em caso de falha.
Outro ponto prático é a saída. NDJSON (um JSON por linha) permite escrita incremental simples e é amigável a ferramentas de streaming. Parquet é colunar e mais eficiente para leitura analítica, mas exige consolidação por grupos de linhas (row groups) e schema estável; isso pede um escritor dedicado (como pyarrow.parquet.ParquetWriter) e lotes coerentes.
Estratégia-base: streaming, pequenos lotes e checkpoint
Uma abordagem sólida para o cenário local segue quatro pilares: ler em fluxo (sem carregar tudo), processar em pequenos lotes para amortizar custo fixo do parser, fazer checkpoint periódico do progresso (idealmente por deslocamento em bytes no arquivo) e escrever de forma idempotente (arquivos de saída por “partes” ou por lotes concluídos).
O checkpoint por offset de bytes permite retomar com precisão sem recomeçar. Textos são mais traiçoeiros que binários nesse ponto por causa de encodings e normalização de novas linhas. Abrir o arquivo original em binário e envolver com um TextIOWrapper dá acesso ao deslocamento real em bytes e mantém parsing correto. O offset salvo deve apontar para o início da próxima linha completa, para evitar duplicação ou perda.
Para sair de CSV para NDJSON com robustez, o fluxo clássico é: ler cabeçalho, iterar linha a linha gerando dicionários, validar/transformar campos, serializar com json.dumps e escrever uma linha por vez. Para Parquet, a variação é agregar N linhas em um lote consistente, converter para uma tabela Arrow e anexar como um novo “row group”.
Implementação mínima em Python: CSV para NDJSON com checkpoint
O exemplo a seguir implementa leitura em fluxo com csv.DictReader, transformação simples, escrita NDJSON e checkpoint por deslocamento de bytes. O checkpoint inclui: offset, número de linhas processadas, parte atual do arquivo de saída e cabeçalho (para retomar sem reler o início).
import csv
import io
import json
import os
import signal
from typing import Optional, Dict
CHECKPOINT_PATH = "checkpoint.json"
INPUT_PATH = "dados.csv" # 100GB
OUTPUT_DIR = "saida_ndjson" # cria arquivos part-00000.ndjson, part-00001.ndjson, ...
ENCODING = "utf-8"
BATCH_LINES = 50000 # ajuste conforme teste; mantenha baixo com pouca RAM
FLUSH_EVERY = 5000 # frequência de flush para reduzir perdas em falha
def carregar_checkpoint() -> Optional[Dict]:
if not os.path.exists(CHECKPOINT_PATH):
return None
with open(CHECKPOINT_PATH, "r", encoding="utf-8") as f:
return json.load(f)
def salvar_checkpoint(cp: Dict):
tmp = CHECKPOINT_PATH + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump(cp, f, ensure_ascii=False, indent=2)
os.replace(tmp, CHECKPOINT_PATH)
def transformar(linha: Dict[str, str]) -> Dict:
# Exemplo: normaliza espaços e converte um campo numérico
out = {k: (v.strip() if isinstance(v, str) else v) for k, v in linha.items()}
if "preco" in out and out["preco"] not in (None, ""):
try:
out["preco"] = float(out["preco"].replace(",", "."))
except Exception:
out["preco"] = None # marca inválido
return out
def processar():
os.makedirs(OUTPUT_DIR, exist_ok=True)
cp = carregar_checkpoint() or {
"offset": 0,
"linhas_processadas": 0,
"parte_atual": 0,
"header": None
}
parar = {"flag": False}
def handler(sig, frame):
parar["flag"] = True
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
# Abre em binário para checkpoint por bytes; envolve com TextIO para CSV
with open(INPUT_PATH, "rb") as raw:
# Posiciona no offset salvo (0 se iniciar)
if cp["offset"] > 0:
raw.seek(cp["offset"])
text = io.TextIOWrapper(raw, encoding=ENCODING, newline="")
# Lê/define header
if cp["header"] is None:
# Estamos no início do arquivo: leia a linha de cabeçalho
header_line = text.readline()
if not header_line:
return # arquivo vazio
header = next(csv.reader([header_line]))
cp["header"] = header
cp["offset"] = raw.tell() # salva offset logo após o cabeçalho
salvar_checkpoint(cp)
else:
header = cp["header"]
# Se offset apontar para o meio da linha, avance até a próxima quebra
# (caso de interrupção não alinhada). Garante início limpo de registro.
if cp["offset"] > 0:
# Garante que text esteja sincronizado com raw após seek
text.seek(0, io.SEEK_CUR)
reader = csv.DictReader(text, fieldnames=header)
# Se não estamos no início, pular a primeira "linha" que seria o header re-lido
if cp["offset"] == 0:
# Já consumimos o header manualmente, então o DictReader começará na primeira linha de dados
pass
else:
# Estamos no meio do arquivo: a próxima leitura já é dado
pass
linhas_no_lote = 0
parte = cp["parte_atual"]
saida_path = os.path.join(OUTPUT_DIR, f"part-{parte:05d}.ndjson")
out = open(saida_path, "a", encoding="utf-8")
try:
for linha in reader:
registro = transformar(linha)
out.write(json.dumps(registro, ensure_ascii=False) + "\n")
cp["linhas_processadas"] += 1
linhas_no_lote += 1
if (cp["linhas_processadas"] % FLUSH_EVERY) == 0:
out.flush()
os.fsync(out.fileno())
# Atualiza offset em bytes para ponto de retomada
cp["offset"] = raw.tell()
salvar_checkpoint(cp)
if linhas_no_lote >= BATCH_LINES:
# Rotaciona a parte para facilitar retomada e compactação posterior
out.flush()
out.close()
parte += 1
cp["parte_atual"] = parte
cp["offset"] = raw.tell()
salvar_checkpoint(cp)
saida_path = os.path.join(OUTPUT_DIR, f"part-{parte:05d}.ndjson")
out = open(saida_path, "a", encoding="utf-8")
linhas_no_lote = 0
if parar["flag"]:
# Interrompe com checkpoint consistente
out.flush()
cp["offset"] = raw.tell()
salvar_checkpoint(cp)
return
# Fim do arquivo
out.flush()
out.close()
cp["offset"] = raw.tell()
salvar_checkpoint(cp)
finally:
try:
out.close()
except Exception:
pass
if __name__ == "__main__":
processar()
O código mantém o uso de memória baixo: processa uma linha por vez, grava em NDJSON, gira arquivos de saída por lote e salva checkpoint periódico. O deslocamento salvo vem de raw.tell(), que mede bytes efetivos do arquivo. Em caso de interrupção, a retomada abre o arquivo original no offset salvo e continua a partir daí. Como os arquivos de saída são gravados por partes, evitar perda ou duplicação torna-se simples: cada “parte” representa um bloco finalizado.
Escrita incremental em Parquet com PyArrow (sem estourar a memória)
Para escrever em Parquet de forma incremental, é necessário agrupar linhas em lotes moderados e usar um ParquetWriter com schema estável. O fluxo é: acumular N registros (balanceando CPU/memória), converter para uma tabela Arrow, gravar como um novo “row group” e descartar o lote da memória. Em cenários com 512MB, manter lotes pequenos é prudente.
import csv
import io
import os
import pyarrow as pa
import pyarrow.parquet as pq
INPUT_PATH = "dados.csv"
ENCODING = "utf-8"
BATCH_ROWS = 20000 # ajuste a partir de testes
OUTPUT_PARQUET = "saida.parquet"
def inferir_schema_amostral(reader, header, amostras=2000):
linhas = []
for i, row in enumerate(reader):
if i >= amostras:
break
linhas.append({k: (v.strip() if isinstance(v, str) else v) for k, v in row.items()})
# Converte amostra para tabela e infere schema
table = pa.Table.from_pylist(linhas)
return table.schema, linhas
def to_table_pylist(registros, schema):
# Converte uma lista de dicionários em Tabela Arrow conforme schema
return pa.Table.from_pylist(registros, schema=schema)
def processar_para_parquet():
with open(INPUT_PATH, "rb") as raw:
text = io.TextIOWrapper(raw, encoding=ENCODING, newline="")
header = next(csv.reader([text.readline()]))
reader = csv.DictReader(text, fieldnames=header)
# Infere schema em pequena amostra e cria writer
schema, amostra = inferir_schema_amostral(reader, header)
writer = pq.ParquetWriter(OUTPUT_PARQUET, schema=schema, compression="snappy", use_dictionary=True)
try:
# Escreve a amostra primeiro
if amostra:
t = to_table_pylist(amostra, schema)
writer.write_table(t)
buffer = []
for row in reader:
# Exemplo mínimo de limpeza
out = {k: (v.strip() if isinstance(v, str) else v) for k, v in row.items()}
# Converta tipos conforme o schema esperado se necessário
buffer.append(out)
if len(buffer) >= BATCH_ROWS:
t = to_table_pylist(buffer, schema)
writer.write_table(t)
buffer.clear()
if buffer:
t = to_table_pylist(buffer, schema)
writer.write_table(t)
finally:
writer.close()
if __name__ == "__main__":
processar_para_parquet()
O exemplo infere um schema a partir de uma amostra pequena, garantindo consistência dos tipos nos lotes seguintes. Em produção, schemas explícitos costumam ser preferíveis a inferência, para evitar surpresas com valores atípicos. O parâmetro BATCH_ROWS deve ser ajustado por teste, buscando equilíbrio entre overhead de escrita e memória ocupada pelos objetos Python antes de virarem colunas Arrow.
Controle de memória: definindo o tamanho do lote
O tamanho do lote ideal depende da largura do CSV (número de colunas), do tamanho médio das strings e do custo de transformações. Um limite seguro pode ser definido por número de linhas, mas convém observar o consumo real. Uma heurística prática é medir o crescimento da memória do processo e reduzir o lote se o pico se aproximar de uma fração do limite (por exemplo, 200–300MB num ambiente com 512MB), deixando margem para buffers do SO e bibliotecas nativas.
Uma alternativa é adaptar dinamicamente o lote a partir do volume de bytes lidos. Outra é usar monitoramento leve (por exemplo, tracemalloc ou psutil) apenas para aferir empiricamente durante testes e, com base nisso, fixar um tamanho de lote estável para produção. Evitar estruturas aninhadas ou cópias desnecessárias de strings também ajuda a conter o uso de memória.
Independentemente da estratégia, os princípios permanecem: não reter mais do que o necessário entre uma escrita e outra e liberar buffers assim que possível (listas com clear(), objetos locais fora de escopo e flush em arquivos).
Produtor–consumidor: sobreposição de I/O e CPU
Com um único núcleo, múltiplos processos não ajudam. No entanto, duas threads podem melhorar a vazão: uma lê e faz parsing leve, outra aplica transformações e escreve. A fila entre elas deve ter tamanho pequeno para evitar empilhar objetos em memória. Essa arquitetura oculta parte da latência de disco durante a CPU e vice-versa.
import csv
import io
import json
import threading
from queue import Queue
INPUT_PATH = "dados.csv"
ENCODING = "utf-8"
QUEUE_MAX = 2000
SENTINELA = object()
def produtor(q: Queue):
with open(INPUT_PATH, "rb") as raw:
text = io.TextIOWrapper(raw, encoding=ENCODING, newline="")
header = next(csv.reader([text.readline()]))
reader = csv.DictReader(text, fieldnames=header)
for row in reader:
q.put(row)
q.put(SENTINELA)
def consumidor(q: Queue):
with open("saida.ndjson", "w", encoding="utf-8") as out:
while True:
item = q.get()
if item is SENTINELA:
break
# transformação básica
out.write(json.dumps(item, ensure_ascii=False) + "\n")
if __name__ == "__main__":
fila = Queue(maxsize=QUEUE_MAX)
t1 = threading.Thread(target=produtor, args=(fila,))
t2 = threading.Thread(target=consumidor, args=(fila,))
t1.start(); t2.start()
t1.join(); t2.join()
Esse padrão traz ganhos quando o disco é relativamente lento e a transformação, moderada. Se a transformação for muito pesada, a fila crescerá e a memória aumentará; nesse caso, reduza QUEUE_MAX ou volte ao loop síncrono com lotes pequenos.
Validação, limpeza e tratamento de erros no CSV
CSV é um formato simples, porém traiçoeiro. Aspas não fechadas, vírgulas em textos, quebras de linha internas e caracteres inválidos são frequentes. O módulo csv lida com muitos desses casos quando aberto com newline="" e respeitando o dialecto padrão. Para entradas mais difíceis, convém capturar csv.Error e registrar linha problemática para auditoria.
Falhas pontuais não devem derrubar o processo inteiro. Uma prática saudável é enviar linhas inválidas para um arquivo de “quarentena” (dead-letter), com o erro associado, para posterior inspeção. Linhas inconsistentes podem ser normalizadas (trim), mapeadas para tipos esperados (inteiros, floats, datas) e padronizadas (formato ISO para datas).
Como a transformação acontece fora do contexto de um banco de dados, a validação precisa ser determinística: a mesma entrada deve produzir a mesma saída, inclusive para valores faltantes e erros de parsing. Isso facilita auditoria e depuração.
Quando usar Pandas e quando evitar
Pandas oferece um caminho prático para processamento em lotes com o parâmetro chunksize de read_csv. Cada chunk é um DataFrame em memória, processado de forma vetorizada, e em seguida liberado. Para 512MB, os chunks devem ser modestos, e transformações devem evitar criarem cópias desnecessárias de colunas.
import pandas as pd
from pathlib import Path
INPUT = "dados.csv"
OUT_DIR = Path("parquet_parts")
OUT_DIR.mkdir(exist_ok=True)
# chunksize deve ser ajustado por teste; começa-se baixo com pouca RAM
for i, chunk in enumerate(pd.read_csv(INPUT, chunksize=100_000)): # 100k linhas por exemplo
# transformações vetorizadas
if "preco" in chunk.columns:
chunk["preco"] = (
chunk["preco"]
.astype(str)
.str.replace(",", ".", regex=False)
.astype("float64", errors="ignore")
)
# grava cada chunk em um arquivo parquet separado
chunk.to_parquet(OUT_DIR / f"part-{i:05d}.parquet", index=False)
A escrita por partes facilita retomada e posterior consolidação. Evitar concatenações de DataFrames em loop é crucial, pois isso cria picos de memória. Quando a necessidade é controle total de memória e tolerância fina a falhas, o caminho com csv + pyarrow tende a ser mais previsível; Pandas é valioso quando as transformações vetorizadas trazem economia de CPU suficiente para compensar seu overhead.
Arquivos comprimidos e impacto no checkpoint
Se o CSV estiver comprimido (ex.: .gz), o deslocamento em bytes no arquivo comprimido não corresponde ao progresso no fluxo descomprimido; retomar por offset torna-se impreciso. Nesses casos, o checkpoint pode ser feito por número de linhas processadas e reprocessamento parcial (aceitável com NDJSON idempotente) ou por divisão prévia do arquivo em partes descomprimidas.
Processar diretamente gz com gzip.open(…, “rt”) funciona, mas a retomada precisa optar por métricas lógicas (ex.: contador de linhas) e aceitar reprocessar algumas linhas para garantir consistência. Alternativamente, descompactar previamente em disco e aplicar a estratégia de offset por bytes.
Independentemente da escolha, a idempotência da escrita é o pilar que evita duplicações na saída: escrever por partes com nomes determinísticos e somente “promover” uma parte concluída ao catálogo de saída após flush e fsync.
Dividir o arquivo fisicamente e processar em série
Separar o CSV em pedaços menores e processar cada pedaço sequencialmente simplifica a retomada e o controle de memória. É importante replicar o cabeçalho em cada parte, para manter o parsing simples no consumidor.
# Divide o arquivo em partes de ~500MB sem quebrar linhas
split -C 500m -d --additional-suffix=.csv dados.csv parte-
# Adiciona cabeçalho a cada parte (assume cabeçalho na primeira linha)
header=$(head -n 1 dados.csv)
for f in parte-*.csv; do
(echo "$header"; tail -n +2 "$f") > "${f%.csv}.with_header.csv" && mv "${f%.csv}.with_header.csv" "$f"
done
Com as partes menores, cada execução trata um arquivo de cada vez e produz um resultado de saída correspondente. A estratégia também facilita paralelizar em nós diferentes quando houver possibilidade de distribuir o processamento.
Arquiteturas distribuídas: HDFS, Kafka e Apache Flink
Quando o arquivo reside em HDFS e há um cluster disponível, a resposta muda de escopo: em vez de espremer tudo em uma máquina, distribui-se o trabalho. Kafka, Flink e o próprio HDFS formam um ecossistema robusto para ingestão, processamento com tolerância a falhas (checkpointing) e saída exatamente uma vez (exactly-once) — conceitos cruciais em pipelines de dados confiáveis.
Uma arquitetura típica: um produtor lê o CSV bruto e publica registros em um tópico Kafka (cada linha vira uma mensagem JSON). O Flink lê esse tópico, aplica transformações em paralelo e escreve Parquet particionado em HDFS. O Kafka gerencia offsets e retenção; o Flink gerencia checkpoints, backpressure e semântica de entrega.
Produtor em Python para Kafka (publica NDJSON). Usa um lote pequeno de envio para não inchar a memória do produtor:
from confluent_kafka import Producer
import csv, io, json
KAFKA_BOOTSTRAP = "localhost:9092"
TOPIC = "csv-registros"
INPUT = "dados.csv"
p = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP, "linger.ms": 50, "batch.num.messages": 1000})
def delivery(err, msg):
if err:
print(f"Falha ao enviar: {err}")
with open(INPUT, "rb") as raw:
text = io.TextIOWrapper(raw, encoding="utf-8", newline="")
header = next(csv.reader([text.readline()]))
reader = csv.DictReader(text, fieldnames=header)
for row in reader:
payload = json.dumps(row, ensure_ascii=False)
p.produce(TOPIC, value=payload.encode("utf-8"), callback=delivery)
p.poll(0) # drena eventos de entrega
p.flush()
No lado do Flink, é possível usar PyFlink Table API para declarar fontes e destinos. O exemplo a seguir mostra DDLs para ler JSON de um tópico Kafka e escrever Parquet em um sistema de arquivos (pode ser file:// ou hdfs://), com uma transformação simples em SQL:
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
# Observação: é necessário disponibilizar os JARs dos conectores (kafka, parquet/filesystem) no classpath do Flink.
t_env.execute_sql("""
CREATE TABLE kafka_src (
id STRING,
nome STRING,
preco STRING
) WITH (
'connector' = 'kafka',
'topic' = 'csv-registros',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-grupo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
)
""")
t_env.execute_sql("""
CREATE TABLE parquet_sink (
id STRING,
nome STRING,
preco DOUBLE
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///data/saida_parquet', -- ou 'file:///tmp/saida_parquet'
'format' = 'parquet'
)
""")
t_env.execute_sql("""
INSERT INTO parquet_sink
SELECT
id,
nome,
CAST(REPLACE(preco, ',', '.') AS DOUBLE) AS preco
FROM kafka_src
""").wait()
Nesse arranjo, a tolerância a falhas deriva dos checkpoints do Flink e do commit de offsets em Kafka. A escalabilidade vem do particionamento do tópico (mais partições, mais paralelismo) e do processamento distribuído. O armazenamento final em HDFS permite consultas analíticas eficientes sobre dados colunares.
Outra variante: ler diretamente arquivos em HDFS sem Kafka, com Flink File Source e escrever Parquet no mesmo HDFS, aplicando a limpeza via Table API/SQL. A decisão entre “arquivo direto” e “via Kafka” depende da necessidade de desacoplar ingestão do processamento e de transformar um batch único em uma pipeline contínua.
Escrita direto no HDFS a partir de Python
Quando só há HDFS e um processo Python simples, a biblioteca hdfs permite escrita incremental sem carregar tudo em memória. A combinação leitura local em fluxo + escrita remota por partes funciona bem para NDJSON e para arquivos Parquet individuais gerados por lote.
from hdfs import InsecureClient
import json
client = InsecureClient("http://namenode:9870", user="hdfs")
# Exemplo: envia NDJSON incremental para HDFS
registros = [{"id": "1", "nome": "A"}, {"id": "2", "nome": "B"}] # substitua pelo seu fluxo real
with client.write("/data/saida/part-00000.ndjson", encoding="utf-8", overwrite=True) as writer:
for r in registros:
writer.write(json.dumps(r, ensure_ascii=False) + "\n")
Para Parquet, a geração do arquivo continua local (pyarrow), e o upload do arquivo finalizado vai para o HDFS. Essa abordagem reduz o risco de arquivos Parquet parcialmente válidos em caso de falha durante a escrita remota.
Diferença entre abordagens e quando escolher cada uma
Várias soluções resolvem o problema, mas cada uma atende a contextos diferentes. Abaixo, um resumo prático para orientar a escolha em função dos requisitos, sem exaurir o assunto.
- Streaming local com csv/json: máxima simplicidade e controle, memória mínima, ótima para uma máquina única e processamento determinístico. Escreve NDJSON facilmente e permite checkpoint por bytes.
- Streaming local com pyarrow (Parquet): melhor saída para analytics, exige lotes e schema estável. Excelente para consumo posterior por engines colunares. Pouco uso de RAM quando bem configurado.
- Pandas com chunksize: produtividade alta e transformações vetorizadas, bom se os chunks couberem na RAM restante. Escrever por partes para evitar reabrir/append em Parquet.
- Produtor–consumidor com fila: ganho moderado ao sobrepor I/O e CPU, útil quando o disco é gargalo. Controlar tamanho da fila para não “vazar” memória.
- Dividir arquivo fisicamente (split): simplifica checkpoint e retomada, e prepara cenário para processamento distribuído manual (vários nós).
- Kafka + Flink + HDFS: solução robusta e distribuída, ótima para pipelines contínuas, alto throughput e tolerância a falhas. Overhead maior, adequado quando há cluster e volume/latência justificam.
Embora as tecnologias variem, o traço comum é a disciplina do streaming: evitar materializar mais do que o necessário e projetar a escrita para ser retomável e idempotente. A partir disso, o desenho arquitetural escala de um laptop a um cluster.
Limitações e cuidados
CSV não tem tipos nativos e carrega ambiguidades (campos vazios, números com vírgula, datas em formatos múltiplos). Definir regras de normalização antes de iniciar o processamento evita “tipos flutuantes” no resultado. Em Parquet, schema inconsistente quebra a leitura futura; prefira schema explícito ou inferência conservadora com coerção controlada.
Para 512MB de RAM, bibliotecas que constroem objetos pesados em memória exigem cautela. Evitar listas gigantes, reduzir cópias e serializações desnecessárias e selecionar campos essenciais para a saída reduz o footprint. Em cenários com compressão, avaliar se vale descomprimir primeiro ou aceitar retomada por contagem de linhas com possível reprocessamento parcial.
Na escrita Parquet, não tentar “append” ingênuo em um único arquivo sem controle do writer. A alternativa segura é gerar um arquivo final por lote ou usar ParquetWriter para consolidar vários row groups com schema fixo. Em NDJSON, não esquecer de flush e fsync periódicos para minimizar perdas em falhas abruptas.
Exemplo completo: fluxo local CSV → Parquet com checkpoint e partes
Este exemplo combina as ideias: processa em lotes, transforma, grava Parquet por partes (um arquivo Parquet por lote finalizado) e mantém um checkpoint para retomar no ponto certo em caso de falha. É uma variação pragmática quando se quer saída colunar sem manipular um único arquivo monolítico.
import csv
import io
import json
import os
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq
INPUT = "dados.csv"
ENCODING = "utf-8"
CHECKPOINT = "ckpt_parquet.json"
OUT_DIR = Path("parquet_parts")
BATCH = 25000
def load_ckpt():
if os.path.exists(CHECKPOINT):
return json.load(open(CHECKPOINT, "r", encoding="utf-8"))
return {"offset": 0, "part": 0, "header": None}
def save_ckpt(ckpt):
tmp = CHECKPOINT + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump(ckpt, f, ensure_ascii=False, indent=2)
os.replace(tmp, CHECKPOINT)
def normalize(row):
r = {k: (v.strip() if isinstance(v, str) else v) for k, v in row.items()}
if "preco" in r and r["preco"]:
try:
r["preco"] = float(r["preco"].replace(",", "."))
except Exception:
r["preco"] = None
return r
def main():
OUT_DIR.mkdir(exist_ok=True)
ckpt = load_ckpt()
with open(INPUT, "rb") as raw:
if ckpt["offset"] > 0:
raw.seek(ckpt["offset"])
text = io.TextIOWrapper(raw, encoding=ENCODING, newline="")
if ckpt["header"] is None:
header = next(csv.reader([text.readline()]))
ckpt["header"] = header
ckpt["offset"] = raw.tell()
save_ckpt(ckpt)
else:
header = ckpt["header"]
text.seek(0, io.SEEK_CUR)
reader = csv.DictReader(text, fieldnames=header)
part = ckpt["part"]
buffer = []
for row in reader:
buffer.append(normalize(row))
if len(buffer) >= BATCH:
# infere schema por amostra do próprio buffer (ou defina fixo)
schema = pa.Table.from_pylist(buffer[:1000]).schema
table = pa.Table.from_pylist(buffer, schema=schema)
pq.write_table(table, OUT_DIR / f"part-{part:05d}.parquet", compression="snappy")
part += 1
ckpt["part"] = part
ckpt["offset"] = raw.tell()
save_ckpt(ckpt)
buffer.clear()
if buffer:
schema = pa.Table.from_pylist(buffer[:1000]).schema
table = pa.Table.from_pylist(buffer, schema=schema)
pq.write_table(table, OUT_DIR / f"part-{part:05d}.parquet", compression="snappy")
ckpt["part"] = part + 1
ckpt["offset"] = raw.tell()
save_ckpt(ckpt)
if __name__ == "__main__":
main()
Essa abordagem mantém a memória sob controle, evita corrupção de um único arquivo Parquet grande e torna a retomada trivial. A consolidação posterior (se desejada) pode ser feita por ferramentas que leem diretórios de partes, como motores SQL, Spark, DuckDB ou o próprio PyArrow Dataset.
Erros comuns nesse tipo de tarefa
Alguns deslizes aparecem repetidamente e custam caro quando o volume de dados é grande. A lista a seguir destaca problemas práticos e como evitá-los desde o início do projeto.
- Tentar carregar tudo em um DataFrame “só para ver” e derrubar a máquina. A solução é pensar em fluxo desde o primeiro teste, inclusive em notebooks.
- Usar split por vírgula em vez do parser CSV. Isso falha em casos triviais com aspas e vírgulas em campos textuais.
- Escrever Parquet “em append” sem ParquetWriter e sem schema estável. Isso resulta em arquivos corrompidos ou incompatíveis.
- Fazer checkpoint por linha em arquivos comprimidos e assumir retomada exata por byte. Em gzip, o offset de bytes da origem não corresponde ao fluxo decompresso.
- Ignorar idempotência e criar duplicatas de registros ao retomar. Escrever por partes fechadas e nomeadas determinísticamente mitiga o problema.
- Dimensionar lotes pelo “otimismo” em vez de medir. Ajuste o BATCH empiricamente e mantenha folga.
A correção desses pontos quase sempre elimina instabilidades e resultados “misteriosos”. Em seguida, os ganhos de desempenho vêm de detalhes como compressão adequada, ajuste do tamanho dos lotes e sobreposição leve de I/O e CPU.
Aplicação em projetos reais
Em ingestões pontuais, a estratégia de streaming local com checkpoint e escrita por partes atende bem. Logs operacionais, cadastros legados e dumps de sistemas transacionais geralmente podem ser tratados com loops simples, csv, json e pyarrow. O cuidado extra com schema e tipos em Parquet paga dividendos quando as consultas analíticas surgirem.
Em integrações contínuas, a opção Kafka + Flink entra em cena: o CSV de 100GB vira uma fila de eventos e, a partir daí, o pipeline é generalizado para outras fontes. O custo de infraestrutura aumenta, mas os benefícios de elasticidade e tolerância a falhas também. Em setores com compliance, a auditabilidade com checkpoints, offsets e commits de transação é tão importante quanto a latência.
Na borda do sistema, soluções mistas são comuns: dividir o arquivo, processar localmente em lotes que geram Parquet, enviar para HDFS/objeto e só então acionar processamento distribuído de alto nível. Essa composição preserva simplicidade em cada etapa e permite escalar quando necessário.
Fechamento
Ler um CSV de 100GB com 512MB de RAM não pede milagre, pede método. O eixo é o processamento em fluxo: ler pouco, transformar pouco e escrever pouco, com checkpoints que permitam retomar sem refazer tudo. Em ambiente local, csv.DictReader + NDJSON ou pyarrow.parquet com lotes pequenos resolvem com previsibilidade. Com necessidades analíticas, Parquet em partes consolida a saída em formato eficiente. Com demandas de continuidade e escala, Kafka e Flink introduzem particionamento, checkpointing e exactly-once de forma robusta sobre HDFS.
A resposta “certa” não é o nome de uma ferramenta, mas o conjunto de princípios aplicados com disciplina: streaming, chunking, checkpointing, idempotência e schema consistente. A execução cuidadosa desses pontos transforma um arquivo enorme e um hardware modesto em um pipeline confiável e reproduzível, pronto para ser auditado e evoluído.