nanobot模块设计详解:从Channels到Providers的完整架构
nanobot模块设计详解:从Channels到Providers的完整架构
nanobot模块设计详解:从Channels到Providers的完整架构
引言
在前两篇文章中,我们介绍了nanobot的基本概念和核心架构。本文将深入探讨nanobot的模块化设计,详细分析各个核心组件的实现原理和交互方式。
项目结构概览
nanobot的代码结构非常清晰,体现了其”超轻量级”的设计哲学:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
nanobot/
├── agent/ # 🧠 核心代理逻辑
│ ├── loop.py # 代理循环 (LLM ↔ 工具执行)
│ ├── context.py # 提示词构建器
│ ├── memory.py # 持久化记忆
│ ├── skills.py # 技能加载器
│ ├── subagent.py # 后台任务执行
│ └── tools/ # 内置工具 (包含spawn)
├── skills/ # 🎯 捆绑技能 (github, weather, tmux...)
├── channels/ # 📱 聊天渠道集成
├── bus/ # 🚌 消息路由
├── cron/ # ⏰ 定时任务
├── heartbeat/ # 💓 主动唤醒
├── providers/ # 🤖 LLM提供商 (OpenRouter等)
├── session/ # 💬 对话会话
├── config/ # ⚙️ 配置管理
└── cli/ # 🖥️ 命令行接口
Channels模块深度解析
设计理念
Channels模块是nanobot与外部世界通信的桥梁。每个聊天平台(Telegram、Discord、WhatsApp等)都有对应的channel实现。
核心接口
所有channel都遵循统一的接口设计:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class BaseChannel:
def __init__(self, config: ChannelConfig):
self.config = config
self.is_running = False
async def start(self):
"""启动channel连接"""
raise NotImplementedError
async def stop(self):
"""停止channel连接"""
raise NotImplementedError
async def send_message(self, message: Message):
"""发送消息"""
raise NotImplementedError
async def handle_incoming(self, message: Message):
"""处理入站消息"""
# 将消息转发到bus进行路由
await self.bus.route(message)
具体实现示例:Telegram Channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# channels/telegram.py
import asyncio
from telegram import Bot
from telegram.ext import Application, MessageHandler, filters
class TelegramChannel(BaseChannel):
def __init__(self, config: TelegramConfig):
super().__init__(config)
self.bot_token = config.token
self.allow_from = config.allow_from or []
self.application = None
async def start(self):
"""启动Telegram bot"""
self.application = Application.builder().token(self.bot_token).build()
# 添加消息处理器
self.application.add_handler(
MessageHandler(filters.TEXT & ~filters.COMMAND, self._handle_message)
)
await self.application.initialize()
await self.application.start()
await self.application.updater.start_polling()
self.is_running = True
async def _handle_message(self, update, context):
"""处理入站消息"""
user_id = str(update.effective_user.id)
# 安全检查:只允许配置的用户
if self.allow_from and user_id not in self.allow_from:
return
message = Message(
channel="telegram",
user_id=user_id,
text=update.message.text,
timestamp=update.message.date
)
await self.bus.route(message)
Providers模块分析
Provider Registry设计
nanobot使用Provider Registry作为单一事实源,这是其支持多LLM提供商的关键:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# providers/registry.py
from dataclasses import dataclass
from typing import Tuple, Optional
@dataclass
class ProviderSpec:
name: str # 配置字段名
keywords: Tuple[str, ...] # 模型名关键词,用于自动匹配
env_key: str # LiteLLM的环境变量
display_name: str # 在status中显示的名称
litellm_prefix: str # 自动前缀:model → provider/model
skip_prefixes: Tuple[str, ...] = () # 不要重复前缀
# 所有支持的提供商
PROVIDERS = [
ProviderSpec(
name="openrouter",
keywords=("openrouter", "claude", "gpt", "mistral"),
env_key="OPENROUTER_API_KEY",
display_name="OpenRouter",
litellm_prefix="openrouter"
),
ProviderSpec(
name="anthropic",
keywords=("claude", "anthropic"),
env_key="ANTHROPIC_API_KEY",
display_name="Anthropic",
litellm_prefix="anthropic"
),
# ... 其他提供商
]
动态提供商加载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# providers/loader.py
def get_provider_for_model(model_name: str) -> Optional[ProviderSpec]:
"""根据模型名自动选择提供商"""
for provider in PROVIDERS:
if any(keyword in model_name.lower() for keyword in provider.keywords):
return provider
return None
def configure_provider(provider_spec: ProviderSpec, config: dict):
"""配置提供商"""
# 设置环境变量
if provider_spec.env_key:
os.environ[provider_spec.env_key] = config.get('apiKey', '')
# 处理API基础URL
if 'apiBase' in config:
os.environ[f"{provider_spec.name.upper()}_API_BASE"] = config['apiBase']
Agent模块核心逻辑
代理循环 (loop.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# agent/loop.py
class AgentLoop:
def __init__(self, config: AgentConfig):
self.config = config
self.context_builder = ContextBuilder()
self.tool_executor = ToolExecutor()
self.memory = MemoryManager()
async def run(self, message: Message) -> Response:
"""主代理循环"""
# 1. 构建上下文
context = await self.context_builder.build(message, self.memory)
# 2. 调用LLM
llm_response = await self._call_llm(context)
# 3. 处理工具调用
if llm_response.has_tool_calls():
tool_results = await self.tool_executor.execute(llm_response.tool_calls)
# 递归调用,将工具结果作为新上下文
return await self.run_with_tool_results(message, tool_results)
# 4. 返回最终响应
return llm_response.content
上下文构建 (context.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# agent/context.py
class ContextBuilder:
def __init__(self):
self.prompt_templates = load_prompt_templates()
async def build(self, message: Message, memory: MemoryManager) -> str:
"""构建完整的提示词上下文"""
# 系统提示词
system_prompt = self.prompt_templates['system']
# 用户记忆
user_memory = await memory.get_relevant(message.text)
# 会话历史
session_history = await self._get_session_history(message)
# 构建完整上下文
context = f"""
{system_prompt}
# 用户记忆
{user_memory}
# 会话历史
{session_history}
# 当前消息
用户: {message.text}
助手:"""
return context.strip()
Bus模块:消息路由中枢
Bus模块是nanobot的消息路由中枢,负责将不同channel的消息路由到正确的处理逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# bus/router.py
class MessageBus:
def __init__(self):
self.routes = {}
self.agent_loop = AgentLoop()
async def route(self, message: Message):
"""路由消息到正确的处理器"""
# 根据channel类型路由
if message.channel == "telegram":
await self._handle_telegram_message(message)
elif message.channel == "discord":
await self._handle_discord_message(message)
# ... 其他channel
# 统一处理:发送到代理循环
response = await self.agent_loop.run(message)
# 发送响应回原channel
await self._send_response(message.channel, message.user_id, response)
配置系统设计
nanobot的配置系统采用分层设计,支持灵活的配置覆盖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
"agents": {
"defaults": {
"model": "anthropic/claude-opus-4-5",
"provider": "openrouter",
"workspace": "~/.nanobot/workspace"
}
},
"providers": {
"openrouter": {
"apiKey": "sk-or-v1-xxx"
}
},
"channels": {
"telegram": {
"enabled": true,
"token": "YOUR_BOT_TOKEN",
"allowFrom": ["YOUR_USER_ID"]
}
},
"tools": {
"restrictToWorkspace": true
}
}
安全设计
nanobot在安全方面做了以下考虑:
- 访问控制:每个channel都支持
allowFrom配置,限制可交互的用户 - 工作区限制:通过
tools.restrictToWorkspace选项限制文件操作范围 - 默认安全:空的
allowFrom列表默认拒绝所有访问(v0.1.4.post4+)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# security.py
def check_access(channel: str, user_id: str, config: dict) -> bool:
"""检查用户访问权限"""
allow_list = config.get('channels', {}).get(channel, {}).get('allowFrom', [])
# 空列表默认拒绝所有(安全第一)
if not allow_list:
return False
# 允许所有用户
if "*" in allow_list:
return True
# 检查特定用户
return user_id in allow_list
总结
nanobot的模块化设计体现了”简单但不简陋”的哲学。每个模块都有明确的职责边界,通过清晰的接口进行交互。这种设计使得:
- 易于理解:代码结构清晰,新人可以快速上手
- 易于扩展:添加新的channel或provider只需要实现相应接口
- 易于维护:各模块解耦,修改一个模块不会影响其他部分
- 易于调试:每个模块都可以独立测试和调试
在下一篇文章中,我们将深入探讨nanobot的关键代码实现,包括代理循环、工具执行和内存管理的具体细节。
本文由作者按照 CC BY 4.0 进行授权