光宇
24.33M · 2026-03-31
0x00 概要
0x01 OpenClaw 基础
0x02 Nanobot 基础
0x03 Nanobot 总体架构
0x04 Nanobot 消息分发机制详解
0xFF 参考
OpenClaw 应该有几十万行代码,阅读理解起来难度过大,因此,本系列通过Nanobot来学习 OpenClaw 的特色。
Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架,定位为"Ultra-Lightweight OpenClaw"。其核心定位如下,非常适合学习Agent架构:
注:本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
我们首先来看看 OpenClaw 的基础概念,能让我们在后续利用 Nanobot 学习更加顺利。
OpenClaw 是 Harness,是面向个人与本地场景的、开箱即用的 Agent Harness 框架。它不生产模型,而是把模型 “套上马具”,让模型能稳定、安全、自主地在本地执行真实任务。或者说,OpenClaw 是 Agent 中"不是 AI 的部分",而 Agent 的实际"聪明程度"完全取决于背后接入的语言模型。
智能体 = 模型 + 控制壳(Harness) 。
Harness 是包裹在 LLM 之外、负责让 Agent 稳定、可控、可落地执行任务的全套基础设施层,模型提供智能,控制壳让智能变得可用。或者说,Harness 是 Agent 在特定领域工作所需要的一切:Harness = (推理·上下文·记忆·状态) + (工具·编排·闭环) + Knowledge + Observation + Action Interfaces + Permissions,即:智能管理层 + 执行调度层 + 领域知识层 + 反馈观测层 + 安全权限层:
OpenClaw 的架构与能力完全符合 Harness 的定义,是 Harness 在个人场景的落地:
| Harness 核心能力 | OpenClaw 对应实现 |
|---|---|
| 模型无关的执行层 | 不绑定模型,统一接口对接各类 LLM |
| 任务编排与执行闭环 | 自然语言→拆解→工具调用→反馈→持久化 |
| 工具 / 系统调用管控 | 本地 Shell、文件、浏览器、API 调用网关 |
| 上下文与记忆管理 | 会话记忆、长期偏好、跨会话状态持久化 |
| 安全与护栏 | 权限控制、操作审计、本地数据隔离 |
| 可扩展生态 | ClawHub 技能市场、插件化扩展 |
OpenClaw 的架构可以概括为一个以Gateway(网关)为核心的控制平面的分布式系统,OpenClaw 的核心不是模型,而是网关(Gateway)。
OpenClaw 本质上是一个围绕集中式控制平面构建的、事件驱动的、会话隔离的单写入状态机,其整体架构是以网关为中心的星型拓扑:
下面两个图可以展示其架构。
OpenClaw 精简架构-1
OpenClaw 精简架构-2
OpenClaw 的关键组件如下:
Channels(频道 / 用户接入层):
ChannelPlugin 适配器合同。核心系统不需要知道“某个平台的私有细节”,只要按统一接口调用即可。账号生命周期由 ChannelManager 统一管理。Gateway(控制平面 / 信息调度中心):
智能体运行时(agent runtime / 思考核心):
run.ts) 负责错误处理、重试、profile轮换;尝试层 (attempt.ts) 负责单次LLM调用的完整生命周期;事件订阅 (subscribe.ts) 处理流式响应和工具调用。Nodes & Apps:
Nanobot遵循两条核心原则:
因此,Nanobot的核心特色如下:
以下是 Nanobot 的技术栈。
| 组件 | 技术/库 |
|---|---|
| 核心语言 | Python ≥ 3.11 |
| CLI 工具 | Typer |
| LLM 接入 | LiteLLM (支持多个提供商) |
| 配置管理 | Pydantic 2.x |
| 日志 | Loguru |
| WebSocket | websockets, websocket-client |
| HTTP 客户端 | httpx |
| OAuth | oauth-cli-kit |
| T@elegrimm | python-telegram-bot |
| Discord | Discord.py (通过 Slack SDK) |
| 飞书 | lark-oapi |
| 钉钉 | dingtalk-stream |
| Slack | slack-sdk |
| QQ 机器人 | qq-botpy |
| 定时任务 | croniter |
| WhatsApp Bridge | Node.js + TypeScript + Baileys |
| MCP 支持 | mcp (Model Context Protocol) |
| Markdown 渲染 | rich |
| 代码规范 | Ruff |
以下是 Nanobot 的主要目录结构。
nanobot-main/
├── nanobot/ # 核心包目录
│ ├── agent/ # 核心代理逻辑
│ │ ├── loop.py # 代理循环 (LLM ↔ 工具执行)
│ │ ├── context.py # 提示构建器
│ │ ├── memory.py # 持久化记忆
│ │ ├── skills.py # 技能加载器
│ │ ├── subagent.py # 后台任务执行
│ │ └── tools/ # 内置工具
│ ├── skills/ # 捆绑技能 (github, weather, tmux...)
│ │ ├── clawhub/
│ │ ├── cron/
│ │ ├── github/
│ │ ├── memory/
│ │ ├── skill-creator/
│ │ ├── summarize/
│ │ ├── tmux/
│ │ └── weather/
│ ├── channels/ # 聊天渠道集成
│ ├── bus/ # 消息路由
│ ├── cron/ # ⏰ 定时任务
│ ├── heartbeat/ # 主动唤醒
│ ├── providers/ # LLM 提供商配置
│ ├── session/ # 会话管理
│ ├── config/ # ️ 配置处理
│ └── cli/ # ️ 命令行界面
├── bridge/ # Node.js WhatsApp 桥接器
│ ├── package.json
│ └── tsconfig.json
├── tests/ # 测试目录
├── case/ # 案例展示 (GIF)
├── pyproject.toml # Python 项目配置
├── Dockerfile # Docker 镜像配置
├── docker-compose.yml # Docker Compose 配置
├── README.md # 项目文档
└── SECURITY.md # 安全文档
以下是 Nanobot 支持的平台。
聊天渠道:T@elegrimm、Discord、WhatsApp、飞书、Moch@t、钉钉、Slack、Email、QQ、Matrix
LLM 提供商:OpenRouter、Anthropic (Claude)、OpenAI (GPT)、DeepSeek、Groq、Gemini、MiniMax、AiHubMix、SiliconFlow、VolcEngine、通义千问 (Dashscope)、Moonshot (Kimi)、智谱 (Zhipu)、vLLM、OpenAI Codex、GitHub Copilot
Nanobot 的架构特点如下:
消息驱动架构:通过 MessageBus 实现渠道与 Agent 的解耦
核心引擎:AgentLoop是核心处理引擎,负责 LLM 与工具执行的循环
多渠道支持:通过 ChannelManager 统一管理 10+ 种聊天平台
可扩展性:
持久化:SessionManager和 MemoryStore 负责会话和记忆的持久化
后台任务:CronService 和 Heartbeat提供定时任务和主动唤醒功能
Nanobot 的架构图如下:
nanobot_arch
对应各组件职责为:
Gateway:
Channel(如QQchannel):
AgentLoop:
MessageBus:
对应具体代码逻辑为
总体架构-1
我们梳理下Nanobot 消息分发机制,后续文章会针对各个环节进行解析学习。
nanobot 采用 异步消息总线 架构,实现消息的解耦分发:
nanobot 的消息处理流程图如下:
Gateway启动所有服务:启动 AgentLoop、所有 Channels、CronService 和 HeartbeatService
Channels 与具体平台(QQ、T@elegrimm等)对接,将消息标准化后发送到MessageBus
MessageBus 解耦Channels和Agent,实现消息传递
AgentLoop 统一处理来自所有渠道的消息,执行核心逻辑
整体架构-消息流程
以 QQ 用户发送消息为例的完整流程如下:
当 QQ 用户向 nanobot 发送消息(如"帮我分析这段代码")时,消息首先被 QQ 平台的服务器接收,然后通过 WebSocket 连接传递给 nanobot 的 QQ 机器人实例。
QQChannel 类通过继承 botpy.Client 并实现事件处理方法来接收消息:
on_c2c_message_create() - 处理 C2C(用户对机器人)消息on_direct_message_create() - 处理直接消息当这两个事件被触发时,QQChannel 会调用内部方法 _on_message()。这个方法:
_processed_ids 来记录已处理的消息 ID,避免重复处理相同消息。然后提取用户信息,包括发送者 ID(author.id 或 author.user_openid)和消息内容。如果内容为空则直接返回。_handle_message() 方法进行权限检查和。这个方法首先调用 is_allowed(sender_id) 检查用户是否在白名单中。白名单通过配置文件的 allowFrom 字段设置,如果未配置白名单则允许所有用户访问。如果用户在白名单外,会记录警告日志并返回,拒绝处理此消息。通过权限检查后,BaseChannel 会构建一个 InboundMessage 数据类对象:
channel: 渠道名称,如 "qq"sender_id: 发送者 ID,如 "123456789"ch@t_id: 聊天 ID,QQ 私聊时等于 sender_idcontent: 消息文本内容timestamp: 消息时间戳(自动生成)media: 媒体文件列表(如图片 URL),默认为空metadata: 渠道特定元数据,如 QQ 的 message_idsession_key_override: 会话键覆盖,用于线程作用域会话InboundMessage 有一个 session_key 属性,自动生成会话键:如果设置了 session_key_override 则使用它,否则使用 f"{channel}:{ch@t_id}" 格式。这样 QQ 用户的会话键就是 "qq:123456789"。
BaseChannel 调用 await self.bus.publish_inbound(msg) 将入站消息发布到消息总线。
MessageBus 维护两个异步队列:
inbound: asyncio.Queue[InboundMessage] - 入站消息队列(渠道 → Agent)outbound: asyncio.Queue[OutboundMessage] - 出站消息队列(Agent → 渠道)publish_inbound() 方法使用 await self.inbound.put(msg) 将消息放入入站队列。这是一个非阻塞操作,如果队列满了会自动等待。
AgentLoop 的 run() 方法是主循环,持续从消息总线消费入站消息:
while self._running:
try:
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
except asyncio.TimeoutError:
continue
consume_inbound() 使用 await self.inbound.get() 从队列获取消息,这是一个阻塞操作,会等待直到有消息可用。这里设置了 1 秒超时,用于定期检查 _running 状态以便优雅停止。
获取到消息后,AgentLoop 会检查是否是特殊命令 /stop,如果是则调用 _handle_stop(msg) 取消该会话的所有活跃任务和子代理。否则,创建一个异步任务来处理这条消息:
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task)
AgentLoop 将任务添加到 _active_tasks 字典中,键是 session_key,值是该会话的任务列表。这样 /stop 命令可以取消特定会话的所有任务。
_dispatch(msg) 方法是消息分发的核心,它使用全局处理锁 _processing_lock 确保消息串行化处理,避免并发问题:
async with self._processing_lock:
try:
response = await self._process_message(msg)
if response is not None:
await self.bus.publish_outbound(response)
_process_message(msg) 是完整的 Agent 处理流程,包括获取或创建会话、构建上下文、运行 Agent 迭代循环(LLM 与工具交互)、保存会话等。最终返回一个 OutboundMessage 对象。
_process_message() 返回的 OutboundMessage 包含:
channel: 目标渠道名,如 "qq"ch@t_id: 目标聊天 ID,如 "123456789"content: Agent 的响应文本reply_to: 可选的回复消息 IDmedia: 可选的媒体文件列表metadata: 可选的元数据,如进度标记AgentLoop 调用 await self.bus.publish_outbound(response) 将响应发布到出站队列。publish_outbound() 使用 await self.outbound.put(msg) 将消息放入出站队列。
ChannelManager 运行一个独立的协程 _dispatch_outbound() 来分发出站消息:
while True:
try:
msg = await asyncio.wait_for(self.bus.consume_outbound(), timeout=1.0)
# 过滤进度消息(根据配置)
if msg.metadata.get("_progress"):
if msg.metadata.get("_tool_hint") and not self.config.channels.send_tool_hints:
continue
if not msg.metadata.get("_tool_hint") and not self.config.channels.send_progress:
continue
# 获取目标渠道
channel = self.channels.get(msg.channel)
if channel:
await channel.send(msg)
这个循环持续从出站队列消费消息,支持根据配置过滤工具提示和进度消息。然后通过 self.channels.get(msg.channel) 获取目标渠道实例。channels 是一个字典,存储了所有启用的渠道,如 {"qq": QQChannel实例, "telegram": T@elegrimmChannel实例}。
获取到目标渠道实例后,调用其 send(msg) 方法。对于 QQChannel 具体如下:
async def send(self, msg: OutboundMessage) -> None:
if not self._client:
return
try:
await self._client.api.post_c2c_message(
openid=msg.ch@t_id,
msg_type=0,
content=msg.content,
)
except Exception as e:
logger.error("Error sending QQ message: {}", e)
QQChannel 使用 botpy SDK 的 API post_c2c_message() 发送 C2C 私聊消息,msg_type=0 表示文本消息。
最终,QQ 用户在客户端收到 nanobot 的响应消息,完成了完整的消息流入和流出流程。
Session 用以区分不同用户/会话的机制:
nanobot 通过 session_key 来区分不同的会话。每个 InboundMessage 都有会话键属性:
@property
def session_key(self) -> str:
return self.session_key_override or f"{self.channel}:{self.ch@t_id}"
默认情况下,会话键使用 {渠道名}:{聊天ID} 格式生成:
"telegram:123456789""qq:987654321""discord:456789123""cli:direct"如果设置了 session_key_override,则使用覆盖值。这用于特殊场景,如线程作用域会话、系统任务专用会话等。
SessionManager 负责会话的持久化管理:
def __init__(self, workspace: Path):
self.workspace = workspace
self.sessions_dir = ensure_dir(self.workspace / "sessions")
self._cache: dict[str, Session] = {}
_cache 是内存缓存字典,键是 session_key,值是 Session 对象。这避免频繁读取磁盘。
会话文件存储在 workspace/sessions/ 目录,每个会话一个 JSONL 文件,文件名是安全的会话键(将 : 替换为 _)。例如:telegram_123456789.jsonl。
Session 数据类存储会话的所有消息:
@dataclass
class Session:
key: str # 会话键
messages: list[dict[str, Any]] # 消息列表
created_at: datetime # 创建时间
updated_at: datetime # 更新时间
metadata: dict[str, Any] # 元数据
last_consolidated: int # 已归档的消息数
messages 是 append-only 的消息列表,存储完整的对话历史。last_consolidated 字段记录已经归档到 MEMORY.md 的消息数量,get_history() 方法只返回 messages[last_consolidated:] 的未归档消息,这样 LLM 不会看到重复的历史。
当 AgentLoop 处理消息时,调用 session = sessions.get_or_create(msg.session_key):
def get_or_create(self, key: str) -> Session:
if key in self._cache:
return self._cache[key]
session = self._load(key)
if session is None:
session = Session(key=key)
self._cache[key] = session
return session
首先检查内存缓存,如果存在直接返回。否则尝试从磁盘加载会话文件,如果文件不存在或加载失败则创建新会话。最后将会话加入缓存并返回。
不同用户/会话的消息通过 session_key 完全隔离:
{safe_key}.jsonl 文件这种设计确保用户 A 的对话历史不会影响用户 B 的会话,T@elegrimm 群组的消息不会泄露给 Discord 私聊。
/stop 命令只取消特定 session_key 的任务:
async def _handle_stop(self, msg: InboundMessage) -> None:
tasks = self._active_tasks.pop(msg.session_key, [])
cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
这确保停止一个会话不会影响其他会话的运行任务。
CronService 是定时任务服务,在 Gateway 启动时通过回调与消息系统连接:
async def on_cron_job(job: CronJob) -> str | None:
channel, ch@t_id = _pick_cron_target(job)
if job.payload.deliver:
await bus.publish_inbound(InboundMessage(
channel="system", # 使用 system 标识,不是具体渠道
sender_id="cron",
ch@t_id=f"{channel}:{ch@t_id}", # 目标渠道和 ID(特殊格式)
content=job.payload.message,
session_key_override=f"cron{job.id}", # 专用会话键
))
return "Job delivered"
else:
# 不发送到渠道,直接调用 Agent 处理
return await agent.process_direct(
content=job.payload.message,
session_key=f"cron{job.id}",
)
当定时任务到期时,CronService 调用 Gateway 设置的 on_job 回调。如果任务配置了 deliver=true,回调通过 MessageBus 发布一个 system 消息,这个消息的特点是:
channel="system":标识这是系统消息,不是直接来自聊天平台sender_id="cron":标识消息来源是 cron 定时任务ch@t_id=f"{channel}:{ch@t_id}":特殊格式,包含实际的目标渠道和聊天 IDsession_key_override=f"cron{job.id}":使用专用会话键这个消息进入 inbound 队列后被 AgentLoop 接收。
AgentLoop 的 _process_message() 方法会识别并处理 system 消息:
if msg.channel == "system":
if ":" in msg.ch@t_id:
target_channel, target_ch@t_id = msg.ch@t_id.split(":", 1)
else:
target_channel = msg.channel
target_ch@t_id = msg.ch@t_id
对于 system 消息,从 ch@t_id 解析出实际的目标渠道和聊天 ID。例如,ch@t_id="telegram:123456789" 会被解析为 target_channel="telegram" 和 target_ch@t_id="123456789"。
AgentLoop 使用解析的目标渠道和聊天 ID 构建出站消息,确保定时任务的响应发送到正确的用户。
HeartbeatService 是心跳服务,用于定期检查和执行 HEARTBEAT.md 中的任务。它有两个回调:
async def on_heartbeat_execute(tasks: str) -> str:
channel, ch@t_id = _pick_heartbeat_target()
return await agent.process_direct(
content=tasks,
session_key="heartbeat",
channel=channel,
ch@t_id=ch@t_id,
on_progress=_silent,
)
这个回调直接调用 agent.process_direct(),不经过 MessageBus。process_direct() 内部处理消息但不自动发送结果到渠道。
async def on_heartbeat_notify(response: str) -> None:
channel, ch@t_id = _pick_heartbeat_target()
if channel == "cli":
return # CLI 模式无法发送
await bus.publish_outbound(OutboundMessage(
channel=channel,
ch@t_id=ch@t_id,
content=response,
))
这个回调在任务完成后通过 MessageBus 发布出站消息,ChannelManager 会将其分发给目标渠道。
MessageTool 允许 LLM 主动向任意启用的渠道发送消息:
async def execute(self, content, channel=None, ch@t_id=None, **kwargs) -> str:
channel = channel or self._default_channel
ch@t_id = ch@t_id or self._default_ch@t_id
msg = OutboundMessage(
channel=channel,
ch@t_id=ch@t_id,
content=content,
)
if self._send_callback:
await self._send_callback(msg) # -> bus.publish_outbound(msg)
AgentLoop 在处理每条消息时设置 MessageTool 的上下文:
self.tools.message_tool.set_context(msg.channel, msg.ch@t_id)
这样 LLM 可以在响应 T@elegrimm 用户时主动发送消息到 Discord 群组,实现跨渠道通知。
on_job 回调 → MessageBus.publish_inbound() → AgentLoop 处理 → MessageBus.publish_outbound() → ChannelManager 分发on_execute 回调 → AgentLoop.process_direct()(内部处理),然后通过 on_notify 回调 → MessageBus.publish_outbound() → ChannelManager 分发_send_callback(msg) → MessageBus.publish_outbound() → ChannelManager 分发到指定渠道所有模块间的通信都通过 MessageBus 的异步队列实现,确保了系统的解耦和可扩展性。
3500 行代码打造轻量级AI Agent:Nanobot 架构深度解析
Kimi Agent产品很厉害,然后呢?
OpenClaw真完整解说:架构与智能体内核
github.com/shareAI-lab…
深入理解OpenClaw技术架构与实现原理(上)
深度解析:一张图拆解OpenClaw的Agent核心设计
OpenClaw小龙虾架构全面解析
OpenClaw架构-Agent Runtime 运行时深度拆解
OpenClaw 架构详解 · 第一部分:控制平面、会话管理与事件循环
从回答问题到替你做事,AI Agent 为什么突然火了?
本文使用 markdown.com.cn 排版