Nanobot por Dentro: A Arquitetura Completa de um Assistente de IA Minimalista em Python com Sessões Persistentes, Ferramentas, Subagentes, Cron, Heartbeat e Loop de Agente

Published on: 2026-02-06
Post image
pt nanobot nanobot-ai nanobot-github arquitetura-do-nanobot como-funciona-o-nanobot nanobot-por-dentro assistente-de-ia-minimalista agente-de-ia-em-python arquitetura-de-agentes-em-python loop-de-agente-llm bot-com-memoria-persistente sessoes

O Nanobot representa uma abordagem minimalista e elegante para construir assistentes de inteligência artificial pessoais. Inspirado pelo projeto Clawdbot (agora OpenClaw), o Nanobot entrega funcionalidades essenciais de agentes de IA em aproximadamente 4.000 linhas de código, representando uma redução de 99% em comparação com os mais de 430.000 linhas do projeto original. Esta filosofia de design prioriza clareza, manutenibilidade e facilidade de extensão, tornando o projeto ideal tanto para uso pessoal quanto para pesquisa acadêmica.

O repositório oficial está disponível em https://github.com/HKUDS/nanobot e contém toda a implementação necessária para criar um assistente de IA funcional com persistência de memória, execução de ferramentas, integração com canais de comunicação e capacidade de executar tarefas em segundo plano através de subagentes. Este artigo explora cada componente interno do sistema, desde a arquitetura fundamental até os detalhes de implementação que permitem criar assistentes inteligentes e autônomos.

Filosofia e Arquitetura do Nanobot

O Nanobot foi projetado seguindo o princípio de que menos código significa mais clareza e menos bugs. A arquitetura é dividida em módulos bem definidos que se comunicam através de interfaces simples e previsíveis. O coração do sistema é o AgentLoop, um loop de processamento que recebe mensagens, constrói contexto, chama o modelo de linguagem, executa ferramentas quando necessário e envia respostas de volta aos canais de comunicação.

Todo o código é escrito em Python, utilizando tipagem estática através de type hints para maior segurança e legibilidade. A arquitetura assíncrona baseada em asyncio permite que o sistema lide eficientemente com múltiplas conexões simultâneas de diferentes canais de comunicação. O uso de dataclasses e Pydantic para definição de schemas garante validação robusta de configurações e dados em tempo de execução.

A estrutura de diretórios segue uma organização lógica e intuitiva. O diretório agent/ contém a lógica central do agente incluindo o loop principal, contexto, memória, skills e ferramentas. O diretório channels/ implementa integrações com plataformas de comunicação. O diretório bus/ gerencia o roteamento de mensagens. O diretório cron/ controla tarefas agendadas. O diretório heartbeat/ implementa verificações periódicas.

nanobot/
├── agent/              # Lógica central do agente
│   ├── loop.py         # Loop principal (LLM ↔ execução de ferramentas)
│   ├── context.py      # Construtor de prompts
│   ├── memory.py       # Sistema de memória persistente
│   ├── skills.py       # Carregador de habilidades
│   ├── subagent.py     # Execução de tarefas em background
│   └── tools/          # Ferramentas built-in
├── skills/             # Habilidades empacotadas (github, weather, tmux...)
├── channels/           # Integrações com canais de comunicação
├── bus/                # Roteamento de mensagens
├── cron/               # Tarefas agendadas
├── heartbeat/          # Verificações periódicas
├── providers/          # Provedores de LLM
├── session/            # Gerenciamento de sessões
├── config/             # Configuração
└── cli/                # Interface de linha de comando

O AgentLoop: O Coração do Sistema

O AgentLoop é a classe central que coordena todo o processamento de mensagens no Nanobot. Implementada em /nanobot/agent/loop.py, esta classe é responsável por receber mensagens do barramento, construir o contexto apropriado com histórico e memória, chamar o modelo de linguagem grande (LLM), executar chamadas de ferramentas quando solicitadas pelo modelo, e enviar respostas de volta aos canais de origem.

O loop opera de forma contínua, aguardando mensagens na fila de entrada com um timeout de 1 segundo. Quando uma mensagem chega, ela é processada através de um ciclo iterativo que continua até que o modelo produza uma resposta final ou o número máximo de iterações seja atingido. O padrão de máximo de iterações é 20, configurável para permitir conversas mais ou menos complexas com uso de ferramentas.

O construtor do AgentLoop recebe várias dependências através de injeção. O MessageBus para comunicação assíncrona. O LLMProvider para chamadas ao modelo de linguagem. O caminho do workspace onde arquivos de memória e configuração residem. O modelo específico a ser usado. A chave de API do Brave Search para buscas na web. Configurações de execução de comandos shell. E opcionalmente um serviço de cron para tarefas agendadas.

class AgentLoop:
    """
    O agent loop é o motor central de processamento.

    Ele:
    1. Recebe mensagens do barramento
    2. Constrói contexto com histórico, memória, skills
    3. Chama o LLM
    4. Executa chamadas de ferramentas
    5. Envia respostas de volta
    """

    def __init__(
        self,
        bus: MessageBus,
        provider: LLMProvider,
        workspace: Path,
        model: str | None = None,
        max_iterations: int = 20,
        brave_api_key: str | None = None,
        exec_config: ExecToolConfig | None = None,
        cron_service: CronService | None = None,
    ):
        self.bus = bus
        self.provider = provider
        self.workspace = workspace
        self.model = model or provider.get_default_model()
        self.max_iterations = max_iterations

        # Inicializa componentes internos
        self.context = ContextBuilder(workspace)
        self.sessions = SessionManager(workspace)
        self.tools = ToolRegistry()
        self.subagents = SubagentManager(
            provider=provider,
            workspace=workspace,
            bus=bus,
            model=self.model,
        )

        # Registra ferramentas padrão
        self._register_default_tools()

Fluxo de Processamento de Mensagens

Quando uma mensagem chega ao AgentLoop, ela passa por um fluxo bem definido de processamento. Primeiro, o sistema verifica se é uma mensagem de sistema (como anúncios de subagentes) ou uma mensagem regular de usuário. Para mensagens regulares, a sessão correspondente é recuperada ou criada baseada na chave de sessão que combina canal e identificador de chat.

O contexto das ferramentas é então atualizado para refletir o canal e chat atuais. Isto é importante para ferramentas como message que precisam saber para onde enviar respostas, e para spawn que precisa saber onde anunciar resultados de subagentes. Em seguida, as mensagens são construídas incluindo o prompt de sistema, histórico da sessão e a mensagem atual do usuário.

O loop principal então entra em ação. A cada iteração, o LLM é chamado com as mensagens atuais e definições de ferramentas. Se a resposta contém chamadas de ferramentas, elas são executadas e os resultados adicionados ao contexto. O loop continua até que o modelo produza uma resposta final sem chamadas de ferramentas ou o limite de iterações seja atingido.

async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
    """Processa uma única mensagem de entrada."""

    # Obtém ou cria sessão
    session = self.sessions.get_or_create(msg.session_key)

    # Atualiza contexto das ferramentas
    message_tool = self.tools.get("message")
    if isinstance(message_tool, MessageTool):
        message_tool.set_context(msg.channel, msg.chat_id)

    # Constrói mensagens iniciais (system + histórico + mensagem atual)
    messages = self.context.build_messages(
        history=session.get_history(),
        current_message=msg.content,
        channel=msg.channel,
        chat_id=msg.chat_id,
    )

    # Loop do agente
    iteration = 0
    final_content = None

    while iteration < self.max_iterations:
        iteration += 1

        # Chama LLM
        response = await self.provider.chat(
            messages=messages,
            tools=self.tools.get_definitions(),
            model=self.model
        )

        # Processa chamadas de ferramentas
        if response.has_tool_calls:
            # Executa ferramentas e adiciona resultados ao contexto
            for tool_call in response.tool_calls:
                result = await self.tools.execute(tool_call.name, tool_call.arguments)
                messages = self.context.add_tool_result(
                    messages, tool_call.id, tool_call.name, result
                )
        else:
            # Sem chamadas de ferramentas, terminamos
            final_content = response.content
            break

    # Salva na sessão
    session.add_message("user", msg.content)
    session.add_message("assistant", final_content)
    self.sessions.save(session)

    return OutboundMessage(
        channel=msg.channel,
        chat_id=msg.chat_id,
        content=final_content
    )

O Barramento de Mensagens (MessageBus)

O MessageBus é o componente que desacopla os canais de comunicação do núcleo do agente. Implementado em /nanobot/bus/queue.py, ele utiliza filas assíncronas do asyncio para gerenciar o fluxo de mensagens de entrada (inbound) e saída (outbound). Esta arquitetura permite que canais e agente operem de forma independente e em paralelo.

As mensagens de entrada são representadas pela classe InboundMessage que contém o canal de origem, identificador do remetente, identificador do chat, conteúdo textual, lista de arquivos de mídia e metadados específicos do canal. A propriedade session_key combina canal e chat_id para identificar unicamente uma conversa.

As mensagens de saída são representadas pela classe OutboundMessage que contém canal de destino, identificador do chat, conteúdo da resposta, referência opcional para reply e lista de arquivos de mídia. O barramento suporta subscrição de callbacks para canais específicos, permitindo que cada canal receba apenas as mensagens destinadas a ele.

# Definição dos tipos de eventos
@dataclass
class InboundMessage:
    """Mensagem recebida de um canal de chat."""
    channel: str        # telegram, discord, slack, whatsapp
    sender_id: str      # Identificador do usuário
    chat_id: str        # Identificador do chat/canal
    content: str        # Texto da mensagem
    timestamp: datetime = field(default_factory=datetime.now)
    media: list[str] = field(default_factory=list)  # URLs de mídia
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def session_key(self) -> str:
        """Chave única para identificação de sessão."""
        return f"{self.channel}:{self.chat_id}"


@dataclass
class OutboundMessage:
    """Mensagem para enviar a um canal de chat."""
    channel: str
    chat_id: str
    content: str
    reply_to: str | None = None
    media: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

Funcionamento do MessageBus

O MessageBus gerencia duas filas principais: inbound para mensagens de entrada vindas dos canais de comunicação, e outbound para respostas do agente que devem ser enviadas de volta aos usuários. O método publish_inbound permite que canais publiquem novas mensagens, enquanto consume_inbound é usado pelo AgentLoop para receber a próxima mensagem disponível.

Para mensagens de saída, o barramento suporta um padrão de publicação-subscrição. Canais podem se inscrever para receber mensagens através do método subscribe_outbound, especificando um callback assíncrono que será chamado quando mensagens chegarem para aquele canal. O método dispatch_outbound roda como uma tarefa em background, consumindo mensagens da fila de saída e despachando-as para os subscribers apropriados.

Esta arquitetura proporciona baixo acoplamento entre componentes. O agente não precisa conhecer os detalhes de cada canal de comunicação, e novos canais podem ser adicionados simplesmente registrando-se no barramento. Erros em um canal não afetam outros canais ou o processamento do agente.

class MessageBus:
    """
    Barramento de mensagens assíncrono que desacopla
    canais de chat do núcleo do agente.
    """

    def __init__(self):
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
        self._outbound_subscribers: dict[str, list[Callable]] = {}
        self._running = False

    async def publish_inbound(self, msg: InboundMessage) -> None:
        """Publica mensagem de um canal para o agente."""
        await self.inbound.put(msg)

    async def consume_inbound(self) -> InboundMessage:
        """Consome a próxima mensagem de entrada (bloqueia até disponível)."""
        return await self.inbound.get()

    async def publish_outbound(self, msg: OutboundMessage) -> None:
        """Publica resposta do agente para os canais."""
        await self.outbound.put(msg)

    def subscribe_outbound(
        self,
        channel: str,
        callback: Callable[[OutboundMessage], Awaitable[None]]
    ) -> None:
        """Inscreve-se para mensagens de saída de um canal específico."""
        if channel not in self._outbound_subscribers:
            self._outbound_subscribers[channel] = []
        self._outbound_subscribers[channel].append(callback)

    async def dispatch_outbound(self) -> None:
        """Despacha mensagens de saída para canais inscritos."""
        self._running = True
        while self._running:
            try:
                msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0)
                subscribers = self._outbound_subscribers.get(msg.channel, [])
                for callback in subscribers:
                    await callback(msg)
            except asyncio.TimeoutError:
                continue

Sistema de Sessões

O SessionManager é responsável por manter o histórico de conversas persistente entre reinicializações do sistema. Implementado em /nanobot/session/manager.py, ele armazena sessões como arquivos JSONL (JSON Lines) no diretório ~/.nanobot/sessions/. Cada sessão é identificada por uma chave única que combina canal e identificador de chat.

Uma sessão contém a lista de mensagens trocadas, timestamps de criação e atualização, e metadados opcionais. O método get_history retorna as mensagens no formato esperado pelo LLM, opcionalmente limitando a quantidade para evitar exceder limites de contexto. Por padrão, as últimas 50 mensagens são retornadas.

O formato JSONL foi escolhido por sua simplicidade e eficiência. Cada linha do arquivo representa uma mensagem ou metadado em JSON independente, permitindo append eficiente e leitura incremental. A primeira linha sempre contém metadados da sessão incluindo timestamps e dados adicionais. Linhas subsequentes contêm as mensagens da conversa.

@dataclass
class Session:
    """
    Uma sessão de conversa.
    Armazena mensagens em formato JSONL para fácil leitura e persistência.
    """
    key: str  # channel:chat_id
    messages: list[dict[str, Any]] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    metadata: dict[str, Any] = field(default_factory=dict)

    def add_message(self, role: str, content: str, **kwargs: Any) -> None:
        """Adiciona uma mensagem à sessão."""
        msg = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            **kwargs
        }
        self.messages.append(msg)
        self.updated_at = datetime.now()

    def get_history(self, max_messages: int = 50) -> list[dict[str, Any]]:
        """
        Obtém histórico de mensagens para contexto do LLM.
        Retorna apenas role e content para formato compatível.
        """
        recent = self.messages[-max_messages:] if len(self.messages) > max_messages else self.messages
        return [{"role": m["role"], "content": m["content"]} for m in recent]

    def clear(self) -> None:
        """Limpa todas as mensagens da sessão."""
        self.messages = []
        self.updated_at = datetime.now()

Gerenciamento de Sessões

O SessionManager gerencia o ciclo de vida completo das sessões. O método get_or_create verifica primeiro se a sessão existe em cache em memória, depois tenta carregar do disco, e finalmente cria uma nova sessão se necessário. Este padrão de cache reduz operações de disco durante conversas ativas.

A persistência em disco é tratada pelo método save que escreve a sessão completa em formato JSONL. O arquivo começa com uma linha de metadados contendo tipo, timestamps e dados adicionais, seguida pelas mensagens da conversa. A operação de escrita sobrescreve o arquivo inteiro para garantir consistência.

O método list_sessions permite listar todas as sessões armazenadas, retornando informações básicas como chave, timestamps e caminho do arquivo. Isto é útil para interfaces de administração ou debugging. O método delete remove uma sessão tanto do cache quanto do disco.

class SessionManager:
    """Gerencia sessões de conversa persistentes."""

    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.sessions_dir = ensure_dir(Path.home() / ".nanobot" / "sessions")
        self._cache: dict[str, Session] = {}

    def get_or_create(self, key: str) -> Session:
        """Obtém sessão existente ou cria nova."""
        # Verifica cache
        if key in self._cache:
            return self._cache[key]

        # Tenta carregar do disco
        session = self._load(key)
        if session is None:
            session = Session(key=key)

        self._cache[key] = session
        return session

    def save(self, session: Session) -> None:
        """Salva sessão em disco no formato JSONL."""
        path = self._get_session_path(session.key)

        with open(path, "w") as f:
            # Escreve metadados primeiro
            metadata_line = {
                "_type": "metadata",
                "created_at": session.created_at.isoformat(),
                "updated_at": session.updated_at.isoformat(),
                "metadata": session.metadata
            }
            f.write(json.dumps(metadata_line) + "\n")

            # Escreve mensagens
            for msg in session.messages:
                f.write(json.dumps(msg) + "\n")

        self._cache[session.key] = session

    def list_sessions(self) -> list[dict[str, Any]]:
        """Lista todas as sessões armazenadas."""
        sessions = []
        for path in self.sessions_dir.glob("*.jsonl"):
            # Lê apenas a linha de metadados
            with open(path) as f:
                first_line = f.readline().strip()
                if first_line:
                    data = json.loads(first_line)
                    if data.get("_type") == "metadata":
                        sessions.append({
                            "key": path.stem.replace("_", ":"),
                            "created_at": data.get("created_at"),
                            "updated_at": data.get("updated_at"),
                        })
        return sorted(sessions, key=lambda x: x.get("updated_at", ""), reverse=True)

O Construtor de Contexto (ContextBuilder)

O ContextBuilder é responsável por montar o prompt completo que será enviado ao modelo de linguagem. Implementado em /nanobot/agent/context.py, ele combina múltiplas fontes de informação: arquivos de bootstrap do workspace, memória persistente, habilidades disponíveis e histórico da conversa atual.

Os arquivos de bootstrap são carregados do workspace e incluem AGENTS.md com instruções operacionais, SOUL.md com personalidade do agente, USER.md com informações sobre o usuário, TOOLS.md com documentação de ferramentas, e IDENTITY.md opcional com identidade adicional. Estes arquivos permitem customizar completamente o comportamento do agente.

O método build_system_prompt monta o prompt de sistema combinando seção de identidade com data/hora atual e caminho do workspace, conteúdo dos arquivos de bootstrap, contexto de memória incluindo memória de longo prazo e notas do dia, e resumo das habilidades disponíveis. O resultado é um prompt estruturado que orienta o modelo sobre sua identidade, capacidades e contexto atual.

class ContextBuilder:
    """
    Constrói o contexto (system prompt + mensagens) para o agente.
    Combina arquivos de bootstrap, memória, skills e histórico.
    """

    BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"]

    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.memory = MemoryStore(workspace)
        self.skills = SkillsLoader(workspace)

    def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
        """Constrói o prompt de sistema completo."""
        parts = []

        # Identidade central
        parts.append(self._get_identity())

        # Arquivos de bootstrap
        bootstrap = self._load_bootstrap_files()
        if bootstrap:
            parts.append(bootstrap)

        # Contexto de memória
        memory = self.memory.get_memory_context()
        if memory:
            parts.append(f"# Memory\n\n{memory}")

        # Skills sempre carregadas
        always_skills = self.skills.get_always_skills()
        if always_skills:
            always_content = self.skills.load_skills_for_context(always_skills)
            if always_content:
                parts.append(f"# Active Skills\n\n{always_content}")

        # Resumo de skills disponíveis
        skills_summary = self.skills.build_skills_summary()
        if skills_summary:
            parts.append(f"# Skills\n\n{skills_summary}")

        return "\n\n---\n\n".join(parts)

    def _get_identity(self) -> str:
        """Obtém a seção de identidade central."""
        from datetime import datetime
        now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
        workspace_path = str(self.workspace.expanduser().resolve())

        return f"""# nanobot 🐈

You are nanobot, a helpful AI assistant.

## Current Time
{now}

## Workspace
Your workspace is at: {workspace_path}
- Memory files: {workspace_path}/memory/MEMORY.md
- Daily notes: {workspace_path}/memory/YYYY-MM-DD.md"""

Sistema de Memória

O MemoryStore gerencia a memória persistente do agente, permitindo que informações importantes sobrevivam entre sessões. Implementado em /nanobot/agent/memory.py, ele suporta dois tipos principais de memória: notas diárias organizadas por data e memória de longo prazo para informações permanentes.

As notas diárias são armazenadas em arquivos com formato YYYY-MM-DD.md no diretório memory/ do workspace. O agente pode ler e escrever nestas notas durante conversas, registrando eventos importantes, descobertas ou tarefas do dia. O método get_recent_memories permite recuperar notas dos últimos N dias para contexto adicional.

A memória de longo prazo reside no arquivo MEMORY.md e contém informações curadas que devem persistir indefinidamente. Isto inclui preferências do usuário, decisões importantes, fatos estáveis e lições aprendidas. O agente é instruído a atualizar este arquivo quando aprende algo que considera importante lembrar no futuro.

class MemoryStore:
    """
    Sistema de memória para o agente.
    Suporta notas diárias (memory/YYYY-MM-DD.md) e memória de longo prazo (MEMORY.md).
    """

    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.memory_dir = ensure_dir(workspace / "memory")
        self.memory_file = self.memory_dir / "MEMORY.md"

    def get_today_file(self) -> Path:
        """Obtém caminho do arquivo de memória de hoje."""
        return self.memory_dir / f"{today_date()}.md"

    def read_today(self) -> str:
        """Lê notas de memória de hoje."""
        today_file = self.get_today_file()
        if today_file.exists():
            return today_file.read_text(encoding="utf-8")
        return ""

    def append_today(self, content: str) -> None:
        """Adiciona conteúdo às notas de hoje."""
        today_file = self.get_today_file()

        if today_file.exists():
            existing = today_file.read_text(encoding="utf-8")
            content = existing + "\n" + content
        else:
            # Adiciona cabeçalho para novo dia
            header = f"# {today_date()}\n\n"
            content = header + content

        today_file.write_text(content, encoding="utf-8")

    def read_long_term(self) -> str:
        """Lê memória de longo prazo (MEMORY.md)."""
        if self.memory_file.exists():
            return self.memory_file.read_text(encoding="utf-8")
        return ""

    def get_memory_context(self) -> str:
        """Obtém contexto de memória para o agente."""
        parts = []

        # Memória de longo prazo
        long_term = self.read_long_term()
        if long_term:
            parts.append("## Long-term Memory\n" + long_term)

        # Notas de hoje
        today = self.read_today()
        if today:
            parts.append("## Today's Notes\n" + today)

        return "\n\n".join(parts) if parts else ""

Sistema de Ferramentas (Tools)

As ferramentas são o mecanismo que permite ao agente interagir com o mundo exterior. O Nanobot implementa um sistema de ferramentas baseado em classes abstratas que define uma interface consistente para todas as ferramentas. A classe base Tool em /nanobot/agent/tools/base.py define propriedades obrigatórias de nome, descrição e parâmetros, além do método execute para execução.

Cada ferramenta define seu schema de parâmetros no formato JSON Schema, compatível com o formato esperado pela API de function calling dos LLMs. O método to_schema converte a definição da ferramenta para o formato OpenAI/Anthropic. O método validate_params valida parâmetros recebidos contra o schema definido, retornando lista de erros se houver problemas.

A validação implementada suporta tipos básicos (string, integer, number, boolean, array, object), enumerações, ranges numéricos (minimum, maximum), constraints de string (minLength, maxLength) e validação recursiva para objetos e arrays aninhados. Esta validação robusta previne erros de execução e fornece feedback claro ao modelo quando parâmetros são inválidos.

class Tool(ABC):
    """
    Classe base abstrata para ferramentas do agente.
    Ferramentas são capacidades que o agente pode usar para interagir
    com o ambiente, como ler arquivos, executar comandos, etc.
    """

    _TYPE_MAP = {
        "string": str,
        "integer": int,
        "number": (int, float),
        "boolean": bool,
        "array": list,
        "object": dict,
    }

    @property
    @abstractmethod
    def name(self) -> str:
        """Nome da ferramenta usado em chamadas de função."""
        pass

    @property
    @abstractmethod
    def description(self) -> str:
        """Descrição do que a ferramenta faz."""
        pass

    @property
    @abstractmethod
    def parameters(self) -> dict[str, Any]:
        """JSON Schema para parâmetros da ferramenta."""
        pass

    @abstractmethod
    async def execute(self, **kwargs: Any) -> str:
        """Executa a ferramenta com parâmetros dados."""
        pass

    def validate_params(self, params: dict[str, Any]) -> list[str]:
        """Valida parâmetros contra JSON schema. Retorna lista de erros."""
        schema = self.parameters or {}
        return self._validate(params, {**schema, "type": "object"}, "")

    def to_schema(self) -> dict[str, Any]:
        """Converte ferramenta para formato de function schema OpenAI."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            }
        }

O Registro de Ferramentas (ToolRegistry)

O ToolRegistry gerencia o conjunto de ferramentas disponíveis para o agente. Implementado em /nanobot/agent/tools/registry.py, ele permite registro dinâmico de ferramentas, obtenção de definições para envio ao LLM, e execução de ferramentas por nome. Esta arquitetura permite adicionar ou remover ferramentas em tempo de execução.

O método register adiciona uma ferramenta ao registro usando seu nome como chave. O método get_definitions retorna todas as definições de ferramentas no formato JSON Schema esperado pelos LLMs. O método execute localiza a ferramenta pelo nome, valida os parâmetros e executa, retornando o resultado como string ou mensagem de erro se algo falhar.

A lista de ferramentas padrão registradas pelo AgentLoop inclui ferramentas de arquivo (read_file, write_file, edit_file, list_dir), ferramenta de shell (exec), ferramentas web (web_search, web_fetch), ferramenta de mensagens (message), ferramenta de spawn (spawn) e ferramenta de cron (cron). Cada uma destas ferramentas estende a classe base Tool com implementação específica.

class ToolRegistry:
    """
    Registro para ferramentas do agente.
    Permite registro dinâmico e execução de ferramentas.
    """

    def __init__(self):
        self._tools: dict[str, Tool] = {}

    def register(self, tool: Tool) -> None:
        """Registra uma ferramenta."""
        self._tools[tool.name] = tool

    def unregister(self, name: str) -> None:
        """Remove registro de uma ferramenta por nome."""
        self._tools.pop(name, None)

    def get(self, name: str) -> Tool | None:
        """Obtém ferramenta por nome."""
        return self._tools.get(name)

    def get_definitions(self) -> list[dict[str, Any]]:
        """Obtém todas as definições de ferramentas em formato OpenAI."""
        return [tool.to_schema() for tool in self._tools.values()]

    async def execute(self, name: str, params: dict[str, Any]) -> str:
        """Executa ferramenta por nome com parâmetros dados."""
        tool = self._tools.get(name)
        if not tool:
            return f"Error: Tool '{name}' not found"

        try:
            errors = tool.validate_params(params)
            if errors:
                return f"Error: Invalid parameters: " + "; ".join(errors)
            return await tool.execute(**params)
        except Exception as e:
            return f"Error executing {name}: {str(e)}"

    @property
    def tool_names(self) -> list[str]:
        """Lista de nomes de ferramentas registradas."""
        return list(self._tools.keys())

Ferramentas de Sistema de Arquivos

O Nanobot inclui quatro ferramentas para manipulação do sistema de arquivos. A ferramenta read_file lê o conteúdo de um arquivo dado seu caminho, expandindo caminhos com til (~) e tratando erros de permissão ou arquivo não encontrado. A ferramenta write_file escreve conteúdo em um arquivo, criando diretórios pai automaticamente se necessário.

A ferramenta edit_file permite edição precisa de arquivos através de substituição de texto. O agente especifica o texto antigo a ser encontrado e o texto novo para substituição. A ferramenta verifica se o texto existe, alerta se aparecer múltiplas vezes (para evitar edições ambíguas), e realiza a substituição única. Isto permite edições cirúrgicas sem reescrever arquivos inteiros.

A ferramenta list_dir lista o conteúdo de um diretório, mostrando ícones diferentes para arquivos e diretórios. A saída formatada facilita a navegação do agente pelo sistema de arquivos para entender a estrutura de projetos e localizar arquivos relevantes.

class ReadFileTool(Tool):
    """Ferramenta para ler conteúdo de arquivos."""

    name = "read_file"
    description = "Read the contents of a file at the given path."
    parameters = {
        "type": "object",
        "properties": {
            "path": {"type": "string", "description": "The file path to read"}
        },
        "required": ["path"]
    }

    async def execute(self, path: str, **kwargs: Any) -> str:
        try:
            file_path = Path(path).expanduser()
            if not file_path.exists():
                return f"Error: File not found: {path}"
            if not file_path.is_file():
                return f"Error: Not a file: {path}"
            return file_path.read_text(encoding="utf-8")
        except PermissionError:
            return f"Error: Permission denied: {path}"


class EditFileTool(Tool):
    """Ferramenta para editar arquivo substituindo texto."""

    name = "edit_file"
    description = "Edit a file by replacing old_text with new_text."
    parameters = {
        "type": "object",
        "properties": {
            "path": {"type": "string"},
            "old_text": {"type": "string", "description": "Exact text to find"},
            "new_text": {"type": "string", "description": "Text to replace with"}
        },
        "required": ["path", "old_text", "new_text"]
    }

    async def execute(self, path: str, old_text: str, new_text: str, **kwargs) -> str:
        file_path = Path(path).expanduser()
        if not file_path.exists():
            return f"Error: File not found: {path}"

        content = file_path.read_text(encoding="utf-8")

        if old_text not in content:
            return "Error: old_text not found in file."

        count = content.count(old_text)
        if count > 1:
            return f"Warning: old_text appears {count} times. Provide more context."

        new_content = content.replace(old_text, new_text, 1)
        file_path.write_text(new_content, encoding="utf-8")
        return f"Successfully edited {path}"

Ferramenta de Execução de Shell

A ferramenta exec permite ao agente executar comandos shell no sistema. Implementada em /nanobot/agent/tools/shell.py, ela inclui várias camadas de proteção para prevenir comandos potencialmente destrutivos. Um conjunto de padrões regex bloqueia comandos como rm -rf, dd, format, shutdown e fork bombs.

A execução usa asyncio.create_subprocess_shell para rodar comandos de forma assíncrona com timeout configurável (padrão 60 segundos). Stdout e stderr são capturados e retornados ao agente. Saídas muito longas são truncadas para evitar sobrecarregar o contexto do LLM. O código de saída é incluído se diferente de zero.

A opção restrict_to_workspace quando ativada bloqueia comandos que acessam caminhos fora do diretório de trabalho. Isto adiciona uma camada extra de segurança em ambientes onde o agente deve operar apenas dentro de um escopo limitado. A ferramenta também detecta e bloqueia tentativas de path traversal usando ../.

class ExecTool(Tool):
    """Ferramenta para executar comandos shell."""

    def __init__(
        self,
        timeout: int = 60,
        working_dir: str | None = None,
        deny_patterns: list[str] | None = None,
        restrict_to_workspace: bool = False,
    ):
        self.timeout = timeout
        self.working_dir = working_dir
        self.deny_patterns = deny_patterns or [
            r"\brm\s+-[rf]{1,2}\b",          # rm -r, rm -rf
            r"\b(format|mkfs|diskpart)\b",   # operações de disco
            r"\bdd\s+if=",                   # dd
            r"\b(shutdown|reboot|poweroff)\b",
            r":\(\)\s*\{.*\};\s*:",          # fork bomb
        ]
        self.restrict_to_workspace = restrict_to_workspace

    name = "exec"
    description = "Execute a shell command and return its output."
    parameters = {
        "type": "object",
        "properties": {
            "command": {"type": "string", "description": "Shell command to execute"},
            "working_dir": {"type": "string", "description": "Optional working directory"}
        },
        "required": ["command"]
    }

    async def execute(self, command: str, working_dir: str | None = None, **kwargs) -> str:
        cwd = working_dir or self.working_dir or os.getcwd()

        # Verifica padrões bloqueados
        guard_error = self._guard_command(command, cwd)
        if guard_error:
            return guard_error

        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout
            )
        except asyncio.TimeoutError:
            process.kill()
            return f"Error: Command timed out after {self.timeout} seconds"

        output_parts = []
        if stdout:
            output_parts.append(stdout.decode("utf-8", errors="replace"))
        if stderr:
            output_parts.append(f"STDERR:\n{stderr.decode()}")
        if process.returncode != 0:
            output_parts.append(f"\nExit code: {process.returncode}")

        result = "\n".join(output_parts) if output_parts else "(no output)"

        # Trunca saída muito longa
        if len(result) > 10000:
            result = result[:10000] + f"\n... (truncated)"

        return result

Ferramentas Web

O Nanobot inclui duas ferramentas para acesso à web. A ferramenta web_search realiza buscas usando a API do Brave Search, retornando títulos, URLs e snippets dos resultados. Requer uma chave de API configurada. O número de resultados pode ser especificado (1-10), com padrão de 5 resultados.

A ferramenta web_fetch busca conteúdo de uma URL e extrai o texto legível usando a biblioteca Readability. Ela suporta diferentes modos de extração: markdown converte HTML para markdown preservando links e formatação básica, enquanto text retorna apenas texto puro. URLs são validadas para aceitar apenas http/https e domínios válidos.

A extração de conteúdo trata diferentes tipos de resposta automaticamente. JSON é formatado com indentação. HTML passa pelo Readability para extrair o conteúdo principal removendo navegação, publicidade e outros elementos não essenciais. Outros tipos de conteúdo são retornados como texto bruto. Conteúdo muito longo é truncado para não sobrecarregar o contexto.

class WebSearchTool(Tool):
    """Busca na web usando Brave Search API."""

    name = "web_search"
    description = "Search the web. Returns titles, URLs, and snippets."
    parameters = {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query"},
            "count": {"type": "integer", "description": "Results (1-10)", "minimum": 1, "maximum": 10}
        },
        "required": ["query"]
    }

    def __init__(self, api_key: str | None = None, max_results: int = 5):
        self.api_key = api_key or os.environ.get("BRAVE_API_KEY", "")
        self.max_results = max_results

    async def execute(self, query: str, count: int | None = None, **kwargs) -> str:
        if not self.api_key:
            return "Error: BRAVE_API_KEY not configured"

        n = min(max(count or self.max_results, 1), 10)
        async with httpx.AsyncClient() as client:
            r = await client.get(
                "https://api.search.brave.com/res/v1/web/search",
                params={"q": query, "count": n},
                headers={"X-Subscription-Token": self.api_key},
                timeout=10.0
            )
            r.raise_for_status()

        results = r.json().get("web", {}).get("results", [])
        if not results:
            return f"No results for: {query}"

        lines = [f"Results for: {query}\n"]
        for i, item in enumerate(results[:n], 1):
            lines.append(f"{i}. {item.get('title', '')}\n   {item.get('url', '')}")
            if desc := item.get("description"):
                lines.append(f"   {desc}")
        return "\n".join(lines)

Sistema de Subagentes

O SubagentManager é uma das características mais poderosas do Nanobot, permitindo a execução de tarefas complexas em segundo plano. Implementado em /nanobot/agent/subagent.py, ele gerencia instâncias leves de agente que rodam de forma assíncrona para completar tarefas específicas sem bloquear o agente principal.

Subagentes são criados através do método spawn que recebe a descrição da tarefa, um label opcional para identificação, e informações de origem para saber onde anunciar o resultado. Cada subagente recebe um ID único e é executado como uma tarefa asyncio independente. O agente principal pode continuar processando outras mensagens enquanto subagentes trabalham.

A grande diferença entre o agente principal e subagentes está no conjunto de ferramentas disponíveis. Subagentes não têm acesso à ferramenta message (não podem enviar mensagens diretamente aos usuários) nem à ferramenta spawn (não podem criar outros subagentes). Isto mantém subagentes focados em suas tarefas sem efeitos colaterais inesperados.

class SubagentManager:
    """
    Gerencia execução de subagentes em background.

    Subagentes são instâncias leves de agente que rodam em background
    para lidar com tarefas específicas. Compartilham o mesmo provedor
    LLM mas têm contexto isolado e prompt de sistema focado.
    """

    def __init__(
        self,
        provider: LLMProvider,
        workspace: Path,
        bus: MessageBus,
        model: str | None = None,
        brave_api_key: str | None = None,
        exec_config: ExecToolConfig | None = None,
    ):
        self.provider = provider
        self.workspace = workspace
        self.bus = bus
        self.model = model or provider.get_default_model()
        self.brave_api_key = brave_api_key
        self.exec_config = exec_config or ExecToolConfig()
        self._running_tasks: dict[str, asyncio.Task[None]] = {}

    async def spawn(
        self,
        task: str,
        label: str | None = None,
        origin_channel: str = "cli",
        origin_chat_id: str = "direct",
    ) -> str:
        """Cria subagente para executar tarefa em background."""
        task_id = str(uuid.uuid4())[:8]
        display_label = label or task[:30] + ("..." if len(task) > 30 else "")

        origin = {"channel": origin_channel, "chat_id": origin_chat_id}

        # Cria tarefa em background
        bg_task = asyncio.create_task(
            self._run_subagent(task_id, task, display_label, origin)
        )
        self._running_tasks[task_id] = bg_task

        # Limpa quando terminar
        bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None))

        return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."

    def get_running_count(self) -> int:
        """Retorna número de subagentes em execução."""
        return len(self._running_tasks)

Execução e Anúncio de Subagentes

Quando um subagente é executado, ele opera de forma similar ao agente principal mas com contexto isolado. O método _run_subagent cria um registro de ferramentas próprio sem message e spawn, constrói um prompt de sistema específico que instrui o subagente sobre sua tarefa e limitações, e executa o loop de agente com máximo de 15 iterações.

O prompt do subagente é focado e direto. Ele informa a tarefa específica a ser completada, lista o que o subagente pode fazer (ler/escrever arquivos, executar comandos, buscar na web), explica o que não pode fazer (enviar mensagens diretamente, criar outros subagentes), e instrui a fornecer um resumo claro dos achados ao final.

Ao completar (com sucesso ou erro), o subagente anuncia seu resultado de volta ao agente principal através do barramento de mensagens. A mensagem de anúncio é enviada como uma InboundMessage de sistema que o agente principal processa, resumindo o resultado de forma natural para o usuário. O formato do anúncio instrui o agente a ser breve e não mencionar detalhes técnicos como "subagente".

async def _run_subagent(
    self,
    task_id: str,
    task: str,
    label: str,
    origin: dict[str, str],
) -> None:
    """Executa tarefa do subagente e anuncia resultado."""

    try:
        # Constrói ferramentas do subagente (sem message, sem spawn)
        tools = ToolRegistry()
        tools.register(ReadFileTool())
        tools.register(WriteFileTool())
        tools.register(ListDirTool())
        tools.register(ExecTool(working_dir=str(self.workspace)))
        tools.register(WebSearchTool(api_key=self.brave_api_key))
        tools.register(WebFetchTool())

        # Constrói mensagens com prompt específico de subagente
        system_prompt = self._build_subagent_prompt(task)
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": task},
        ]

        # Loop do agente (iterações limitadas)
        max_iterations = 15
        iteration = 0
        final_result = None

        while iteration < max_iterations:
            iteration += 1
            response = await self.provider.chat(
                messages=messages,
                tools=tools.get_definitions(),
                model=self.model,
            )

            if response.has_tool_calls:
                # Executa ferramentas e adiciona resultados
                for tool_call in response.tool_calls:
                    result = await tools.execute(tool_call.name, tool_call.arguments)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": result,
                    })
            else:
                final_result = response.content
                break

        # Anuncia resultado
        await self._announce_result(task_id, label, task, final_result, origin, "ok")

    except Exception as e:
        await self._announce_result(task_id, label, task, str(e), origin, "error")

A Ferramenta Spawn

A ferramenta spawn é a interface que permite ao agente criar subagentes. Implementada em /nanobot/agent/tools/spawn.py, ela encapsula a chamada ao SubagentManager mantendo o contexto de origem atualizado para que anúncios sejam enviados ao canal e chat corretos.

O método set_context é chamado pelo AgentLoop antes de processar cada mensagem, atualizando o canal e chat_id de origem. Quando o agente decide usar a ferramenta spawn, ela passa estes valores ao SubagentManager junto com a descrição da tarefa e label opcional. A resposta imediata informa que o subagente foi iniciado e o usuário será notificado ao completar.

Este padrão de spawn-and-announce permite ao agente delegar tarefas demoradas ou complexas sem bloquear a conversa. Por exemplo, o agente pode receber um pedido de pesquisa extensa, criar um subagente para realizar a pesquisa, continuar respondendo outras perguntas, e eventualmente receber e comunicar os resultados da pesquisa quando o subagente terminar.

class SpawnTool(Tool):
    """
    Ferramenta para criar subagente para execução de tarefas em background.
    O subagente roda de forma assíncrona e anuncia resultado ao completar.
    """

    def __init__(self, manager: SubagentManager):
        self._manager = manager
        self._origin_channel = "cli"
        self._origin_chat_id = "direct"

    def set_context(self, channel: str, chat_id: str) -> None:
        """Define contexto de origem para anúncios de subagente."""
        self._origin_channel = channel
        self._origin_chat_id = chat_id

    name = "spawn"
    description = (
        "Spawn a subagent to handle a task in the background. "
        "Use this for complex or time-consuming tasks that can run independently. "
        "The subagent will complete the task and report back when done."
    )
    parameters = {
        "type": "object",
        "properties": {
            "task": {"type": "string", "description": "Task for the subagent"},
            "label": {"type": "string", "description": "Optional short label (for display)"},
        },
        "required": ["task"],
    }

    async def execute(self, task: str, label: str | None = None, **kwargs) -> str:
        """Cria subagente para executar a tarefa dada."""
        return await self._manager.spawn(
            task=task,
            label=label,
            origin_channel=self._origin_channel,
            origin_chat_id=self._origin_chat_id,
        )

Sistema de Cron Jobs

O CronService permite agendar tarefas que executam automaticamente em horários ou intervalos específicos. Implementado em /nanobot/cron/service.py, ele suporta três tipos de agendamento: execução única em momento específico (at), execução em intervalos regulares (every), e expressões cron tradicionais (cron).

Cada job é representado pela classe CronJob que contém identificador único, nome, flag de habilitado, definição de schedule, payload com mensagem e configuração de entrega, estado atual (próxima execução, última execução, status), timestamps de criação/atualização, e flag para deletar após execução (útil para lembretes únicos).

O serviço utiliza um sistema de timers assíncronos para acordar no momento exato da próxima execução. Quando um job está pronto para executar, o callback on_job é chamado (configurado durante inicialização do Gateway). Se o job tem deliver=True, a resposta é enviada ao canal e destinatário especificados. Jobs do tipo at são automaticamente desabilitados ou deletados após execução.

# Tipos de dados para o sistema de cron
@dataclass
class CronSchedule:
    """Definição de agendamento para um cron job."""
    kind: Literal["at", "every", "cron"]
    at_ms: int | None = None        # Para "at": timestamp em ms
    every_ms: int | None = None     # Para "every": intervalo em ms
    expr: str | None = None         # Para "cron": expressão (ex: "0 9 * * *")
    tz: str | None = None           # Timezone para expressões cron


@dataclass
class CronPayload:
    """O que fazer quando o job executa."""
    kind: Literal["system_event", "agent_turn"] = "agent_turn"
    message: str = ""
    deliver: bool = False           # Entregar resposta ao canal
    channel: str | None = None      # Ex: "telegram"
    to: str | None = None           # Ex: número de telefone ou chat_id


@dataclass
class CronJobState:
    """Estado de execução de um job."""
    next_run_at_ms: int | None = None
    last_run_at_ms: int | None = None
    last_status: Literal["ok", "error", "skipped"] | None = None
    last_error: str | None = None


@dataclass
class CronJob:
    """Um job agendado."""
    id: str
    name: str
    enabled: bool = True
    schedule: CronSchedule = field(default_factory=lambda: CronSchedule(kind="every"))
    payload: CronPayload = field(default_factory=CronPayload)
    state: CronJobState = field(default_factory=CronJobState)
    created_at_ms: int = 0
    updated_at_ms: int = 0
    delete_after_run: bool = False

Execução de Cron Jobs

O CronService opera através de um loop de timers. O método start carrega jobs do arquivo de persistência, recalcula próximos horários de execução e arma o primeiro timer. Quando o timer dispara, o método _on_timer identifica todos os jobs que estão prontos (next_run_at_ms <= agora), executa cada um e rearma o timer para o próximo.

A execução de um job é tratada pelo método _execute_job. Ele chama o callback on_job com o job como parâmetro, que no contexto do Gateway executa a mensagem do job através do agente. O estado do job é atualizado com timestamp da última execução, status (ok/error) e mensagem de erro se houver. Para jobs recorrentes, o próximo horário de execução é recalculado.

A persistência é feita em um arquivo JSON no diretório de dados (~/.nanobot/cron/jobs.json). A cada modificação (adicionar, remover, executar job), o arquivo é reescrito. A API pública inclui métodos para listar jobs, adicionar novo job, remover job, habilitar/desabilitar job e executar job manualmente.

class CronService:
    """Serviço para gerenciar e executar jobs agendados."""

    def __init__(
        self,
        store_path: Path,
        on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None
    ):
        self.store_path = store_path
        self.on_job = on_job  # Callback para executar job
        self._store: CronStore | None = None
        self._timer_task: asyncio.Task | None = None
        self._running = False

    async def start(self) -> None:
        """Inicia o serviço de cron."""
        self._running = True
        self._load_store()
        self._recompute_next_runs()
        self._save_store()
        self._arm_timer()

    async def _on_timer(self) -> None:
        """Trata tick do timer - executa jobs prontos."""
        if not self._store:
            return

        now = _now_ms()
        due_jobs = [
            j for j in self._store.jobs
            if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
        ]

        for job in due_jobs:
            await self._execute_job(job)

        self._save_store()
        self._arm_timer()

    def add_job(
        self,
        name: str,
        schedule: CronSchedule,
        message: str,
        deliver: bool = False,
        channel: str | None = None,
        to: str | None = None,
    ) -> CronJob:
        """Adiciona novo job."""
        store = self._load_store()
        now = _now_ms()

        job = CronJob(
            id=str(uuid.uuid4())[:8],
            name=name,
            enabled=True,
            schedule=schedule,
            payload=CronPayload(
                kind="agent_turn",
                message=message,
                deliver=deliver,
                channel=channel,
                to=to,
            ),
            state=CronJobState(next_run_at_ms=_compute_next_run(schedule, now)),
            created_at_ms=now,
        )

        store.jobs.append(job)
        self._save_store()
        self._arm_timer()
        return job

Sistema de Heartbeat

O HeartbeatService implementa verificações periódicas que permitem ao agente acordar automaticamente para checar tarefas pendentes. Implementado em /nanobot/heartbeat/service.py, ele lê o arquivo HEARTBEAT.md do workspace a cada intervalo (padrão 30 minutos) e executa as instruções encontradas através do agente.

O arquivo HEARTBEAT.md serve como lista de tarefas periódicas que o agente deve verificar. Se o arquivo estiver vazio ou contiver apenas headers e comentários, o heartbeat é pulado para economizar recursos. Se houver conteúdo acionável, o agente é acordado com um prompt que instrui a ler o arquivo e seguir as instruções.

Quando o agente processa o heartbeat, ele pode determinar que não há nada a fazer e responder com HEARTBEAT_OK, que é detectado pelo serviço. Alternativamente, pode executar ações e reportar o que foi feito. O callback on_heartbeat é configurado durante inicialização do Gateway para chamar agent.process_direct.

class HeartbeatService:
    """
    Serviço de heartbeat periódico que acorda o agente para checar tarefas.

    O agente lê HEARTBEAT.md do workspace e executa tarefas listadas lá.
    Se nada precisa atenção, responde HEARTBEAT_OK.
    """

    def __init__(
        self,
        workspace: Path,
        on_heartbeat: Callable[[str], Coroutine[Any, Any, str]] | None = None,
        interval_s: int = 30 * 60,  # 30 minutos
        enabled: bool = True,
    ):
        self.workspace = workspace
        self.on_heartbeat = on_heartbeat
        self.interval_s = interval_s
        self.enabled = enabled
        self._running = False
        self._task: asyncio.Task | None = None

    @property
    def heartbeat_file(self) -> Path:
        return self.workspace / "HEARTBEAT.md"

    async def start(self) -> None:
        """Inicia o serviço de heartbeat."""
        if not self.enabled:
            return
        self._running = True
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self) -> None:
        """Loop principal de heartbeat."""
        while self._running:
            await asyncio.sleep(self.interval_s)
            if self._running:
                await self._tick()

    async def _tick(self) -> None:
        """Executa um tick de heartbeat."""
        content = self._read_heartbeat_file()

        # Pula se HEARTBEAT.md está vazio
        if _is_heartbeat_empty(content):
            return

        if self.on_heartbeat:
            response = await self.on_heartbeat(HEARTBEAT_PROMPT)

            # Verifica se agente disse "nada a fazer"
            if "HEARTBEATOК" in response.upper().replace("_", ""):
                pass  # OK, nada a fazer
            else:
                pass  # Tarefa completada

Sistema de Canais de Comunicação

O Nanobot suporta múltiplos canais de comunicação através de uma arquitetura extensível baseada na classe abstrata BaseChannel. Cada canal implementa métodos para iniciar, parar e enviar mensagens, além de herdar a lógica de verificação de permissões e publicação no barramento.

O método is_allowed verifica se um remetente tem permissão para usar o bot baseado na lista allow_from da configuração. Se a lista estiver vazia, todos são permitidos. O método _handle_message é chamado pelos canais após receber uma mensagem, verificando permissão e publicando no barramento se autorizado.

Os canais suportados atualmente incluem Telegram (recomendado, usa long polling, requer apenas token do bot), WhatsApp (requer Node.js, scan de QR code para vincular dispositivo) e Feishu (app empresarial chinês, usa WebSocket, não requer IP público). Cada canal tem configurações específicas mas segue a mesma interface básica.

class BaseChannel(ABC):
    """
    Classe base abstrata para implementações de canais de chat.
    Cada canal (Telegram, Discord, etc.) deve implementar esta interface.
    """

    name: str = "base"

    def __init__(self, config: Any, bus: MessageBus):
        self.config = config
        self.bus = bus
        self._running = False

    @abstractmethod
    async def start(self) -> None:
        """Inicia o canal e começa a escutar mensagens."""
        pass

    @abstractmethod
    async def stop(self) -> None:
        """Para o canal e limpa recursos."""
        pass

    @abstractmethod
    async def send(self, msg: OutboundMessage) -> None:
        """Envia mensagem através deste canal."""
        pass

    def is_allowed(self, sender_id: str) -> bool:
        """Verifica se remetente tem permissão para usar o bot."""
        allow_list = getattr(self.config, "allow_from", [])
        if not allow_list:
            return True  # Lista vazia = todos permitidos
        return str(sender_id) in allow_list

    async def _handle_message(
        self,
        sender_id: str,
        chat_id: str,
        content: str,
        media: list[str] | None = None,
        metadata: dict[str, Any] | None = None
    ) -> None:
        """Processa mensagem de entrada verificando permissões."""
        if not self.is_allowed(sender_id):
            return

        msg = InboundMessage(
            channel=self.name,
            sender_id=str(sender_id),
            chat_id=str(chat_id),
            content=content,
            media=media or [],
            metadata=metadata or {}
        )

        await self.bus.publish_inbound(msg)

Canal Telegram

O canal Telegram é implementado em /nanobot/channels/telegram.py usando a biblioteca python-telegram-bot. Ele opera em modo de long polling, eliminando a necessidade de webhook ou IP público. O bot escuta mensagens de texto, fotos, áudio, voz e documentos.

A conversão de markdown para HTML do Telegram é tratada pela função _markdown_to_telegram_html que preserva blocos de código, converte formatação básica (negrito, itálico, links) e escapa caracteres especiais. Se a conversão falhar, a mensagem é enviada como texto puro como fallback.

Para mensagens de voz, o canal suporta transcrição automática usando o serviço Whisper do Groq se configurado. O áudio é baixado, enviado para transcrição, e o texto resultante é incluído na mensagem. Fotos e outros arquivos são baixados para o diretório ~/.nanobot/media/ e seus caminhos são passados ao agente.

class TelegramChannel(BaseChannel):
    """
    Canal Telegram usando long polling.
    Simples e confiável - não requer webhook/IP público.
    """

    name = "telegram"

    def __init__(self, config: TelegramConfig, bus: MessageBus, groq_api_key: str = ""):
        super().__init__(config, bus)
        self.groq_api_key = groq_api_key
        self._app: Application | None = None

    async def start(self) -> None:
        """Inicia bot Telegram com long polling."""
        if not self.config.token:
            return

        self._running = True
        self._app = Application.builder().token(self.config.token).build()

        # Handler para mensagens (texto, fotos, voz, documentos)
        self._app.add_handler(
            MessageHandler(
                (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO)
                & ~filters.COMMAND,
                self._on_message
            )
        )

        await self._app.initialize()
        await self._app.start()

        bot_info = await self._app.bot.get_me()
        logger.info(f"Telegram bot @{bot_info.username} connected")

        await self._app.updater.start_polling(
            allowed_updates=["message"],
            drop_pending_updates=True
        )

        while self._running:
            await asyncio.sleep(1)

    async def send(self, msg: OutboundMessage) -> None:
        """Envia mensagem via Telegram."""
        if not self._app:
            return

        chat_id = int(msg.chat_id)
        html_content = _markdown_to_telegram_html(msg.content)

        try:
            await self._app.bot.send_message(
                chat_id=chat_id,
                text=html_content,
                parse_mode="HTML"
            )
        except Exception:
            # Fallback para texto puro
            await self._app.bot.send_message(chat_id=chat_id, text=msg.content)

Gerenciador de Canais

O ChannelManager coordena todos os canais de comunicação. Implementado em /nanobot/channels/manager.py, ele inicializa os canais habilitados na configuração, inicia/para todos os canais, e despacha mensagens de saída para o canal apropriado.

Durante inicialização, o manager verifica quais canais estão habilitados na configuração e instancia as classes correspondentes. Se uma dependência de canal não estiver instalada (por exemplo, python-telegram-bot para Telegram), um aviso é registrado mas a inicialização continua com os canais disponíveis.

O método start_all inicia uma tarefa para despachar mensagens de saída e depois inicia todos os canais configurados em paralelo usando asyncio.gather. O método _dispatch_outbound consome mensagens da fila de saída do barramento e as envia para o canal correspondente.

class ChannelManager:
    """
    Gerencia canais de chat e coordena roteamento de mensagens.

    Responsabilidades:
    - Inicializar canais habilitados (Telegram, WhatsApp, etc.)
    - Iniciar/parar canais
    - Rotear mensagens de saída
    """

    def __init__(self, config: Config, bus: MessageBus):
        self.config = config
        self.bus = bus
        self.channels: dict[str, BaseChannel] = {}
        self._dispatch_task: asyncio.Task | None = None
        self._init_channels()

    def _init_channels(self) -> None:
        """Inicializa canais baseado na configuração."""
        if self.config.channels.telegram.enabled:
            try:
                from nanobot.channels.telegram import TelegramChannel
                self.channels["telegram"] = TelegramChannel(
                    self.config.channels.telegram,
                    self.bus,
                    groq_api_key=self.config.providers.groq.api_key,
                )
            except ImportError as e:
                logger.warning(f"Telegram channel not available: {e}")

        if self.config.channels.whatsapp.enabled:
            try:
                from nanobot.channels.whatsapp import WhatsAppChannel
                self.channels["whatsapp"] = WhatsAppChannel(
                    self.config.channels.whatsapp, self.bus
                )
            except ImportError as e:
                logger.warning(f"WhatsApp channel not available: {e}")

    async def start_all(self) -> None:
        """Inicia todos os canais e o despachante de saída."""
        if not self.channels:
            return

        self._dispatch_task = asyncio.create_task(self._dispatch_outbound())

        tasks = [asyncio.create_task(ch.start()) for ch in self.channels.values()]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _dispatch_outbound(self) -> None:
        """Despacha mensagens de saída para o canal apropriado."""
        while True:
            msg = await self.bus.consume_outbound()
            channel = self.channels.get(msg.channel)
            if channel:
                await channel.send(msg)

Sistema de Skills

O SkillsLoader gerencia habilidades que estendem as capacidades do agente. Implementado em /nanobot/agent/skills.py, ele carrega skills de dois locais: skills personalizadas no workspace do usuário (workspace/skills/) e skills empacotadas com o Nanobot (nanobot/skills/). Skills do workspace têm precedência.

Cada skill é um diretório contendo um arquivo SKILL.md com frontmatter YAML e instruções em markdown. O frontmatter define nome, descrição e metadados incluindo requisitos (binários necessários, variáveis de ambiente). O conteúdo markdown instrui o agente sobre como usar a skill.

Skills que têm requisitos não atendidos (por exemplo, binário gh não instalado para skill GitHub) são marcadas como indisponíveis mas ainda aparecem no resumo com indicação do que precisa ser instalado. O agente pode tentar instalar dependências ou informar o usuário sobre o que está faltando.

class SkillsLoader:
    """
    Carregador de skills do agente.
    Skills são arquivos markdown (SKILL.md) que ensinam o agente
    a usar ferramentas específicas ou realizar certas tarefas.
    """

    def __init__(self, workspace: Path, builtin_skills_dir: Path | None = None):
        self.workspace = workspace
        self.workspace_skills = workspace / "skills"
        self.builtin_skills = builtin_skills_dir or BUILTIN_SKILLS_DIR

    def list_skills(self, filter_unavailable: bool = True) -> list[dict[str, str]]:
        """Lista todas as skills disponíveis."""
        skills = []

        # Skills do workspace (maior prioridade)
        if self.workspace_skills.exists():
            for skill_dir in self.workspace_skills.iterdir():
                if skill_dir.is_dir():
                    skill_file = skill_dir / "SKILL.md"
                    if skill_file.exists():
                        skills.append({
                            "name": skill_dir.name,
                            "path": str(skill_file),
                            "source": "workspace"
                        })

        # Skills built-in
        if self.builtin_skills and self.builtin_skills.exists():
            for skill_dir in self.builtin_skills.iterdir():
                if skill_dir.is_dir():
                    skill_file = skill_dir / "SKILL.md"
                    if skill_file.exists():
                        # Não duplica se já existe no workspace
                        if not any(s["name"] == skill_dir.name for s in skills):
                            skills.append({
                                "name": skill_dir.name,
                                "path": str(skill_file),
                                "source": "builtin"
                            })

        if filter_unavailable:
            return [s for s in skills if self._check_requirements(self._get_skill_meta(s["name"]))]
        return skills

    def build_skills_summary(self) -> str:
        """Constrói resumo de skills em formato XML para o prompt."""
        all_skills = self.list_skills(filter_unavailable=False)
        if not all_skills:
            return ""

        lines = ["<skills>"]
        for s in all_skills:
            name = s["name"]
            desc = self._get_skill_description(name)
            available = self._check_requirements(self._get_skill_meta(name))
            lines.append(f'  <skill available="{str(available).lower()}">')
            lines.append(f"    <name>{name}</name>")
            lines.append(f"    <description>{desc}</description>")
            lines.append(f"    <location>{s['path']}</location>")
            lines.append(f"  </skill>")
        lines.append("</skills>")
        return "\n".join(lines)

Skills Empacotadas

O Nanobot inclui várias skills empacotadas que demonstram o sistema e fornecem funcionalidades úteis imediatamente. A skill github instrui o agente a usar o CLI gh para interagir com GitHub, incluindo exemplos de como verificar status de CI, listar PRs e usar a API para queries avançadas.

A skill tmux permite ao agente controlar sessões tmux remotamente, útil para rodar processos interativos de longa duração. Ela inclui scripts auxiliares para encontrar sessões e aguardar padrões específicos na saída. A skill documenta como usar sockets isolados para evitar conflitos e como orquestrar múltiplos agentes de código em paralelo.

A skill weather fornece instruções para consultar previsão do tempo. A skill cron documenta como usar a ferramenta de agendamento. A skill summarize instrui técnicas de sumarização. A skill skill-creator ensina o agente a criar novas skills para si mesmo.

# Exemplo: Skill GitHub (SKILL.md)
---
name: github
description: "Interact with GitHub using the gh CLI."
metadata: {"nanobot":{"requires":{"bins":["gh"]}}}
---

# GitHub Skill

Use the `gh` CLI to interact with GitHub.

## Pull Requests

Check CI status on a PR:
```bash
gh pr checks 55 --repo owner/repo
```

List recent workflow runs:
```bash
gh run list --repo owner/repo --limit 10
```

## API for Advanced Queries

Get PR with specific fields:
```bash
gh api repos/owner/repo/pulls/55 --jq '.title, .state'
```

Provedores de LLM

O Nanobot suporta múltiplos provedores de modelos de linguagem através do LiteLLM, uma biblioteca que fornece interface unificada para diversos serviços. O LiteLLMProvider em /nanobot/providers/litellm_provider.py implementa a interface abstrata LLMProvider e configura automaticamente o ambiente baseado no provedor detectado.

Os provedores suportados incluem OpenRouter (recomendado, acesso a múltiplos modelos com uma chave), Anthropic (Claude diretamente), OpenAI (GPT diretamente), DeepSeek, Gemini (Google), Groq (também usado para transcrição de voz), Zhipu/GLM (modelos chineses) e vLLM (modelos locais).

A detecção do provedor é feita automaticamente baseada no prefixo da chave de API ou URL base. Chaves começando com sk-or- indicam OpenRouter. Uma URL base customizada indica vLLM ou endpoint compatível com OpenAI. O nome do modelo também é usado para detectar provedor (por exemplo, modelos contendo "anthropic" configuram a chave da Anthropic).

class LiteLLMProvider(LLMProvider):
    """
    Provedor LLM usando LiteLLM para suporte multi-provedor.
    Suporta OpenRouter, Anthropic, OpenAI, Gemini e outros.
    """

    def __init__(
        self,
        api_key: str | None = None,
        api_base: str | None = None,
        default_model: str = "anthropic/claude-opus-4-5"
    ):
        super().__init__(api_key, api_base)
        self.default_model = default_model

        # Detecta OpenRouter pelo prefixo da chave ou api_base
        self.is_openrouter = (
            (api_key and api_key.startswith("sk-or-")) or
            (api_base and "openrouter" in api_base)
        )

        # Configura LiteLLM baseado no provedor
        if api_key:
            if self.is_openrouter:
                os.environ["OPENROUTER_API_KEY"] = api_key
            elif "anthropic" in default_model:
                os.environ.setdefault("ANTHROPIC_API_KEY", api_key)
            elif "openai" in default_model or "gpt" in default_model:
                os.environ.setdefault("OPENAI_API_KEY", api_key)

    async def chat(
        self,
        messages: list[dict[str, Any]],
        tools: list[dict[str, Any]] | None = None,
        model: str | None = None,
        max_tokens: int = 4096,
        temperature: float = 0.7,
    ) -> LLMResponse:
        """Envia requisição de chat completion via LiteLLM."""
        model = model or self.default_model

        # Para OpenRouter, prefixa nome do modelo
        if self.is_openrouter and not model.startswith("openrouter/"):
            model = f"openrouter/{model}"

        kwargs = {
            "model": model,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
        }

        if tools:
            kwargs["tools"] = tools
            kwargs["tool_choice"] = "auto"

        response = await acompletion(**kwargs)
        return self._parse_response(response)

Configuração do Sistema

A configuração do Nanobot é centralizada em um arquivo JSON localizado em ~/.nanobot/config.json. O schema é definido usando Pydantic em /nanobot/config/schema.py, proporcionando validação automática e valores padrão. A classe Config é o ponto de entrada que agrupa todas as seções de configuração.

As seções principais incluem providers para credenciais dos provedores de LLM, channels para configuração de canais de comunicação, agents para configurações padrão do agente, gateway para host e porta do servidor, e tools para configuração de ferramentas específicas como busca web e execução de shell.

O método get_api_key retorna a primeira chave de API configurada em ordem de prioridade (OpenRouter > DeepSeek > Anthropic > OpenAI > Gemini > Groq). O método get_api_base retorna a URL base apropriada para provedores que a requerem (OpenRouter, vLLM). Variáveis de ambiente com prefixo NANOBOT_ podem sobrescrever valores do arquivo.

# Schema de configuração
class Config(BaseSettings):
    """Configuração raiz do nanobot."""
    agents: AgentsConfig = Field(default_factory=AgentsConfig)
    channels: ChannelsConfig = Field(default_factory=ChannelsConfig)
    providers: ProvidersConfig = Field(default_factory=ProvidersConfig)
    gateway: GatewayConfig = Field(default_factory=GatewayConfig)
    tools: ToolsConfig = Field(default_factory=ToolsConfig)

    @property
    def workspace_path(self) -> Path:
        """Obtém caminho expandido do workspace."""
        return Path(self.agents.defaults.workspace).expanduser()

    def get_api_key(self) -> str | None:
        """Obtém chave de API em ordem de prioridade."""
        return (
            self.providers.openrouter.api_key or
            self.providers.deepseek.api_key or
            self.providers.anthropic.api_key or
            self.providers.openai.api_key or
            self.providers.gemini.api_key or
            self.providers.groq.api_key or
            self.providers.vllm.api_key or
            None
        )

    class Config:
        env_prefix = "NANOBOT_"


class AgentDefaults(BaseModel):
    """Configuração padrão do agente."""
    workspace: str = "~/.nanobot/workspace"
    model: str = "anthropic/claude-opus-4-5"
    max_tokens: int = 8192
    temperature: float = 0.7
    max_tool_iterations: int = 20

Exemplo de Configuração

A configuração típica do Nanobot é simples e direta. A seção providers precisa apenas da chave de API do provedor escolhido. A seção agents permite customizar o modelo padrão. A seção channels habilita e configura os canais de comunicação desejados. A seção tools configura ferramentas específicas como busca web.

{
  "providers": {
    "openrouter": {
      "apiKey": "sk-or-v1-sua-chave-aqui"
    },
    "groq": {
      "apiKey": "gsk_sua-chave-groq"
    }
  },
  "agents": {
    "defaults": {
      "model": "anthropic/claude-opus-4-5",
      "workspace": "~/.nanobot/workspace",
      "maxTokens": 8192,
      "temperature": 0.7
    }
  },
  "channels": {
    "telegram": {
      "enabled": true,
      "token": "123456789:ABCdefGHIjklMNOpqrsTUVwxyz",
      "allowFrom": ["seu_user_id"]
    },
    "whatsapp": {
      "enabled": false
    },
    "feishu": {
      "enabled": false
    }
  },
  "tools": {
    "web": {
      "search": {
        "apiKey": "BSA-sua-chave-brave-search"
      }
    },
    "exec": {
      "timeout": 60,
      "restrictToWorkspace": false
    }
  }
}

Interface de Linha de Comando

O Nanobot fornece uma CLI completa implementada com Typer em /nanobot/cli/commands.py. Os comandos principais incluem onboard para inicialização, agent para interação direta, gateway para iniciar o servidor com canais, status para verificar configuração, e subcomandos para channels e cron.

O comando nanobot onboard cria a estrutura inicial de diretórios e arquivos de configuração. Ele gera o arquivo config.json com valores padrão, cria o diretório workspace com arquivos de bootstrap (AGENTS.md, SOUL.md, USER.md), e cria o diretório de memória com MEMORY.md inicial.

O comando nanobot agent permite interação direta com o agente. Com a flag -m "mensagem" executa uma única mensagem e retorna. Sem a flag, entra em modo interativo onde o usuário pode conversar continuamente até pressionar Ctrl+C. Este modo é útil para testes e uso local sem configurar canais de comunicação.

# Comandos principais da CLI

# Inicialização - cria config e workspace
nanobot onboard

# Chat com agente (mensagem única)
nanobot agent -m "Qual é a capital do Brasil?"

# Chat interativo
nanobot agent

# Iniciar gateway (servidor com canais)
nanobot gateway

# Verificar status
nanobot status

# Gerenciar canais
nanobot channels status
nanobot channels login  # Para WhatsApp

# Gerenciar cron jobs
nanobot cron list
nanobot cron add --name "diario" --message "Bom dia!" --cron "0 9 * * *"
nanobot cron remove <job_id>

O Comando Gateway

O comando nanobot gateway é o ponto de entrada principal para execução em produção. Ele inicializa todos os componentes do sistema: carrega configuração, cria o MessageBus, instancia o provedor LLM, cria o CronService, instancia o AgentLoop com todas as dependências, configura o HeartbeatService, e inicializa o ChannelManager com os canais habilitados.

O callback do cron é configurado para executar jobs através do método agent.process_direct, opcionalmente entregando respostas aos canais configurados. O callback do heartbeat similarmente usa process_direct para executar verificações periódicas. Todos os componentes são então iniciados em paralelo usando asyncio.gather.

O gateway continua rodando até receber um sinal de interrupção (Ctrl+C), momento em que para ordenadamente todos os serviços: heartbeat, cron, agent loop e canais. Esta inicialização cuidadosa garante que todos os componentes estejam configurados corretamente antes de começar a processar mensagens.

@app.command()
def gateway(
    port: int = typer.Option(18790, "--port", "-p"),
    verbose: bool = typer.Option(False, "--verbose", "-v"),
):
    """Inicia o gateway do nanobot."""
    config = load_config()

    # Cria componentes
    bus = MessageBus()

    api_key = config.get_api_key()
    api_base = config.get_api_base()

    provider = LiteLLMProvider(
        api_key=api_key,
        api_base=api_base,
        default_model=config.agents.defaults.model
    )

    # Cria serviço de cron
    cron_store_path = get_data_dir() / "cron" / "jobs.json"
    cron = CronService(cron_store_path)

    # Cria agente com serviço de cron
    agent = AgentLoop(
        bus=bus,
        provider=provider,
        workspace=config.workspace_path,
        model=config.agents.defaults.model,
        brave_api_key=config.tools.web.search.api_key or None,
        cron_service=cron,
    )

    # Configura callback do cron
    async def on_cron_job(job: CronJob) -> str | None:
        response = await agent.process_direct(job.payload.message)
        if job.payload.deliver and job.payload.to:
            await bus.publish_outbound(OutboundMessage(
                channel=job.payload.channel or "cli",
                chat_id=job.payload.to,
                content=response or ""
            ))
        return response
    cron.on_job = on_cron_job

    # Cria serviço de heartbeat
    heartbeat = HeartbeatService(
        workspace=config.workspace_path,
        on_heartbeat=lambda p: agent.process_direct(p),
        interval_s=30 * 60,
    )

    # Cria gerenciador de canais
    channels = ChannelManager(config, bus)

    # Executa
    async def run():
        await cron.start()
        await heartbeat.start()
        await asyncio.gather(agent.run(), channels.start_all())

    asyncio.run(run())

Arquivos de Bootstrap do Workspace

O workspace do Nanobot contém arquivos de bootstrap que definem o comportamento do agente. O arquivo AGENTS.md contém instruções operacionais: diretrizes de comportamento, lista de ferramentas disponíveis, como usar memória, como agendar lembretes e como gerenciar tarefas periódicas via HEARTBEAT.md.

O arquivo SOUL.md define a personalidade do agente incluindo traços de personalidade (útil, conciso, curioso), valores (precisão, privacidade, transparência) e estilo de comunicação. Este arquivo permite customizar completamente como o agente se apresenta e interage com usuários.

O arquivo USER.md armazena informações sobre o usuário que o agente deve lembrar: preferências de comunicação, fuso horário, idioma preferido e outros detalhes pessoais. O agente pode atualizar este arquivo conforme aprende mais sobre o usuário ao longo das conversas.

# AGENTS.md - Instruções Operacionais

You are a helpful AI assistant. Be concise, accurate, and friendly.

## Guidelines
- Always explain what you're doing before taking actions
- Ask for clarification when the request is ambiguous
- Use tools to help accomplish tasks
- Remember important information in your memory files

## Tools Available
- File operations (read, write, edit, list)
- Shell commands (exec)
- Web access (search, fetch)
- Messaging (message)
- Background tasks (spawn)

## Memory
- Use `memory/` directory for daily notes
- Use `MEMORY.md` for long-term information

## Scheduled Reminders
When user asks for a reminder at a specific time, use the cron tool.
Get USER_ID and CHANNEL from the current session.

## Heartbeat Tasks
`HEARTBEAT.md` is checked every 30 minutes.
Add periodic tasks by editing this file.

Instalação e Primeiros Passos

A instalação do Nanobot pode ser feita de três formas. Para desenvolvimento, clone o repositório e instale em modo editável com pip install -e .. Para uso rápido, instale via uv com uv tool install nanobot-ai. Para instalação estável, use PyPI com pip install nanobot-ai. O Python 3.11 ou superior é necessário.

Após instalação, execute nanobot onboard para criar a estrutura inicial. Em seguida, edite ~/.nanobot/config.json para adicionar sua chave de API (recomendado OpenRouter). Para testar, execute nanobot agent -m "Olá!" e verifique se recebe uma resposta.

Para uso com canais de comunicação, configure o canal desejado no arquivo de configuração e execute nanobot gateway. Para Telegram, basta adicionar o token do bot e opcionalmente a lista de IDs permitidos. Para WhatsApp, primeiro execute nanobot channels login para escanear o QR code e vincular o dispositivo.

# Instalação do Nanobot

# Opção 1: Do código-fonte (desenvolvimento)
git clone https://github.com/HKUDS/nanobot.git
cd nanobot
pip install -e .

# Opção 2: Via uv (rápido)
uv tool install nanobot-ai

# Opção 3: Via PyPI (estável)
pip install nanobot-ai

# Inicialização
nanobot onboard

# Editar configuração para adicionar API key
# vim ~/.nanobot/config.json

# Testar
nanobot agent -m "Olá, como você está?"

# Executar com canais de comunicação
nanobot gateway

Execução com Docker

O Nanobot pode ser executado em container Docker para facilitar deployment. O projeto inclui um Dockerfile que cria uma imagem com todas as dependências. O diretório de configuração é montado como volume para persistir dados entre execuções.

Para construir a imagem, execute docker build -t nanobot . no diretório do projeto. Para inicializar a configuração, execute docker run -v ~/.nanobot:/root/.nanobot --rm nanobot onboard. Após editar o arquivo de configuração no host, execute docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway para iniciar o gateway.

O volume montado em ~/.nanobot garante que configurações, sessões, memória e jobs de cron persistam entre reinicializações do container. A porta 18790 é exposta para eventual comunicação de rede, embora os canais principais (Telegram, WhatsApp, Feishu) usem conexões de saída que não requerem exposição de portas.

# Executar Nanobot com Docker

# Construir imagem
docker build -t nanobot .

# Inicializar configuração (primeira vez)
docker run -v ~/.nanobot:/root/.nanobot --rm nanobot onboard

# Editar config no host para adicionar API keys
vim ~/.nanobot/config.json

# Executar gateway (conecta a Telegram/WhatsApp)
docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway

# Executar comando único
docker run -v ~/.nanobot:/root/.nanobot --rm nanobot agent -m "Hello!"

# Verificar status
docker run -v ~/.nanobot:/root/.nanobot --rm nanobot status

Comparação com OpenClaw

O Nanobot foi inspirado pelo OpenClaw (anteriormente Clawdbot) mas adota uma filosofia fundamentalmente diferente. Enquanto o OpenClaw é um sistema empresarial com mais de 430.000 linhas de código suportando múltiplos agentes, protocolo RPC complexo e centenas de configurações, o Nanobot foca em entregar funcionalidade essencial em aproximadamente 4.000 linhas.

Esta diferença de abordagem resulta em trade-offs claros. O Nanobot é mais fácil de entender, modificar e debugar. A inicialização é mais rápida e o consumo de recursos é menor. Por outro lado, funcionalidades avançadas como múltiplos agentes coordenados, sistema de notificações elaborado e gateway WebSocket RPC não estão presentes.

Para pesquisadores e desenvolvedores que querem entender como agentes de IA funcionam internamente, o Nanobot oferece uma base de código acessível. Para uso pessoal com um único agente, oferece todas as funcionalidades necessárias. Para equipes ou uso empresarial com múltiplos agentes coordenados, o OpenClaw pode ser mais apropriado.

Limitações e Considerações

O Nanobot, como qualquer sistema de IA, tem limitações importantes. O modelo de linguagem pode gerar informações incorretas ou executar ações não intencionadas. A ferramenta de execução de shell, apesar das proteções, pode potencialmente causar danos se mal utilizada. Supervisão humana continua sendo necessária especialmente para ações com consequências significativas.

O sistema de memória é baseado em arquivos simples sem indexação vetorial ou busca semântica. Para grandes volumes de informação, a recuperação pode não ser ideal. O histórico de sessão é limitado às últimas 50 mensagens por padrão, o que pode perder contexto em conversas muito longas.

Subagentes compartilham o mesmo modelo e credenciais do agente principal, o que significa que custos de API podem crescer rapidamente com uso intensivo de spawn. Não há mecanismo de rate limiting ou orçamento embutido - isto deve ser gerenciado externamente se necessário.

Conclusão

O Nanobot demonstra que é possível construir um assistente de IA pessoal funcional e capaz com código enxuto e arquitetura clara. Através de aproximadamente 4.000 linhas de Python bem estruturado, o sistema oferece loop de agente com uso de ferramentas, persistência de sessões e memória, integração com múltiplos canais de comunicação, execução de tarefas em background via subagentes, agendamento de tarefas com cron, e verificações periódicas via heartbeat.

A arquitetura modular baseada em componentes desacoplados facilita tanto o entendimento do código quanto sua extensão. O barramento de mensagens separa canais do núcleo do agente. O registro de ferramentas permite adicionar novas capacidades facilmente. O sistema de skills oferece uma forma documentada de ensinar novas habilidades ao agente.

Para desenvolvedores interessados em criar seus próprios assistentes de IA ou pesquisadores estudando arquiteturas de agentes, o Nanobot oferece um ponto de partida acessível e completo. O código está disponível em https://github.com/HKUDS/nanobot sob licença MIT, permitindo uso, modificação e distribuição livre. Com as ferramentas e conceitos apresentados neste artigo, é possível construir assistentes personalizados que atendam necessidades específicas enquanto mantêm a simplicidade e clareza que caracterizam o projeto.