以观书法
108.85M · 2026-02-05
0x00 概述
0x01 服务
0xFF 参考
本篇结合官方文档进行解读OpenHands的服务器,这是OpenHands系统的立身基础。
因为本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
OpenHands提供了WebSocket服务器。
openhands-server
可以发送或从服务器接收两种类型的消息:
一个action 包含三个部分:
action:要采取的动作args:动作的参数message:可以放在聊天记录中的友好消息有几种action 。它们的参数列在下面。 随着时间的推移,这个列表可能会增长。
initialize - 初始化代理。仅由客户端发送。
model - 要使用的模型名称directory - 工作空间的路径agent_cls - 要使用的代理类start - 开始一个新的开发任务。仅由客户端发送。
task - 要开始的任务read - 读取文件内容。
path - 要读取的文件路径write - 写入内容到文件。
path - 要写入的文件路径content - 写入文件的内容run - 运行命令。
command - 要运行的命令browse - 打开网页。
url - 要打开的URLthink - 允许代理制定计划、设定目标或记录想法
thought - 要记录的想法finish - 代理发出任务完成的信号
一个observation 包含四个部分:
observation:观察类型content:表示观察数据的字符串extras:额外的结构化数据message:可以放在聊天记录中的友好消息有几种observation 。它们的额外信息列在下面。 随着时间的推移,这个列表可能会增长。
read - 文件内容
path - 读取的文件路径browse - URL的HTML内容
url - 打开的URLrun - 命令的输出
command - 运行的命令exit_code - 命令的退出代码ch@t - 用户的消息
以下部分描述了OpenHands项目的服务器端组件。
session.py 文件定义了Session类,它代表与客户端的WebSocket会话。关键特性包括:
agent_session.py 文件包含AgentSession类,它管理会话内代理的生命周期。关键特性包括:
conversation_manager.py 文件定义了ConversationManager类,它负责管理多个客户端会话。关键特性包括:
listen.py 文件是主服务器文件,它设置FastAPI应用程序并定义各种API端点。关键特性包括:
该脚本定义了服务接口,主要分为两个部分:
FastAPI库实现的HTTP接口,其具体实现位于openhands/server/routes目录中;socketio库实现的WebSocket接口,其代码实现在openhands/server/listen_socket.py文件中。用户与代理的交互通过WebSocket进行,连接初始化时会触发connect事件,用户发送消息时会触发oh_user_action事件,连接断开时会触发disconnect事件。因此,梳理代理交互逻辑的核心在于对这三个事件的处理流程进行整理。import socketio
from openhands.server.app import app as base_app
from openhands.server.listen_socket import sio
from openhands.server.middleware import (
CacheControlMiddleware,
InMemoryRateLimiter,
LocalhostCORSMiddleware,
RateLimitMiddleware,
)
from openhands.server.static import SPAStaticFiles
if os.getenv('SERVE_FRONTEND', 'true').lower() == 'true':
base_app.mount(
'/', SPAStaticFiles(directory='./frontend/build', html=True), name='dist'
)
base_app.add_middleware(LocalhostCORSMiddleware)
base_app.add_middleware(CacheControlMiddleware)
base_app.add_middleware(
RateLimitMiddleware,
rate_limiter=InMemoryRateLimiter(requests=10, seconds=1),
)
app = socketio.ASGIApp(sio, other_asgi_app=base_app)
服务的工作流程如下:
服务器初始化:
listen.py中创建和配置。ConversationManager。客户端连接:
Session或重启现有一个。Session初始化AgentSession,设置运行时环境和代理控制器。代理初始化:
事件处理:
Session管理客户端和代理之间的事件流。文件操作:
安全分析:
会话管理:
ConversationManager定期清理非活动会话。API端点:
这种服务器架构允许管理多个客户端会话,每个会话都有自己的代理实例、运行时环境和安全分析器。事件驱动设计促进了客户端和代理之间的实时通信,而模块化结构允许轻松扩展和维护不同组件。
listen_socket.py是 OpenHands 服务器端的 Socket.IO 事件器,负责处理客户端和服务器之间的实时双向通信,包括连接建立、事件回放、用户行动转发和连接断开四大核心场景,是客户端与后端会话、代理系统交互的桥梁。
listen_socket.py的核心特色如下:
latest_event_id 参数实现事件断点续传,客户端重连时仅回放未接收的事件,避免重复数据传输,提升连接效率。oh_action 处理器兼容旧版客户端,同时提供 oh_user_action 新版接口,平滑过渡不中断服务。AgentStateChangedObservation)最后发送,确保客户端先接收历史事件,再同步最新状态,避免状态不一致。async/await)实现事件回放和转发,支持高并发连接,不阻塞主线程,提升系统吞吐量。NullAction),减少不必要的网络传输。listen_socket.py的具体功能如下:
连接管理(connect 事件)
动作处理(oh_user_action 和 oh_action 事件)
断开连接处理(disconnect 事件)
listen_socket.py的核心工作流程为:
连接建立:
事件历史重播:
会话加入:
安全机制
错误处理
listen_socket.py 与其他组件关系
Openhands-服务
此处关键一步为与会话管理器 ConversationManager 建立连接。
conversation_init_data = await setup_init_conversation_settings(
user_id, conversation_id, providers_set
)
agent_loop_info = await conversation_manager.join_conversation(
conversation_id,
connection_id,
conversation_init_data,
user_id,
)
listen_socket.py的代码举例如下:
@sio.event
async def connect(connection_id: str, environ: dict) -> None:
"""
SocketIO连接事件处理器:客户端建立连接时触发,完成会话验证、事件回放、会话加入等初始化流程。
参数:
connection_id: 客户端连接唯一标识(SocketIO分配)
environ: WSGI环境变量字典,包含请求头、查询参数等信息
"""
try:
logger.info(f"SocketIO连接建立:connection_id={connection_id}")
# 解析查询参数(从WSGI环境变量中提取QUERY_STRING)
query_params = parse_qs(environ.get('QUERY_STRING', ''))
# 解析最新事件ID(用于断点续传,默认-1表示从最开始回放)
latest_event_id_str = query_params.get('latest_event_id', [-1])[0]
try:
latest_event_id = int(latest_event_id_str)
except ValueError:
logger.debug(f"无效的latest_event_id值:{latest_event_id_str},默认设为-1")
latest_event_id = -1
# 解析会话ID(必需参数,用于关联特定对话)
conversation_id = query_params.get('conversation_id', [None])[0]
logger.info(f"会话连接请求:conversation_id={conversation_id}, connection_id={connection_id}")
# 解析提供者集合(如支持的LLM提供商列表,用于限制可用资源)
raw_list = query_params.get('providers_set', [])
providers_list = []
for item in raw_list:
# 拆分逗号分隔的提供者名称,过滤空值
providers_list.extend(item.split(',') if isinstance(item, str) else [])
providers_list = [p for p in providers_list if p]
providers_set = [ProviderType(p) for p in providers_list] # 转换为ProviderType枚举类型
# 校验会话ID是否存在
if not conversation_id:
logger.error("查询参数中缺少conversation_id")
raise ConnectionRefusedError("缺少会话ID(conversation_id)")
# 校验会话API密钥是否有效
if _invalid_session_api_key(query_params):
raise ConnectionRefusedError("无效的会话API密钥")
# 提取请求中的Cookie和Authorization头(用于用户身份验证)
cookies_str = environ.get('HTTP_COOKIE', '')
# WSGI环境中,HTTP头会转为"HTTP_前缀+下划线替换短横线"格式
authorization_header = environ.get('HTTP_AUTHORIZATION', None)
# 创建会话验证器,校验用户身份(关联会话ID、Cookie、授权头)
conversation_validator = create_conversation_validator()
user_id = await conversation_validator.validate(
conversation_id, cookies_str, authorization_header
)
# 创建事件存储实例(用于读取会话历史事件)
try:
event_store = EventStore(
conversation_id, conversation_manager.file_store, user_id
)
except FileNotFoundError as e:
logger.error(f"创建会话事件存储失败:conversation_id={conversation_id}, 错误={e}")
raise ConnectionRefusedError(f"无法访问会话事件:{e}")
agent_state_changed = None # 存储代理状态变更事件(最后单独发送)
# 创建异步事件存储包装器,从latest_event_id+1开始回放事件(避免重复)
async_store = AsyncEventStoreWrapper(event_store, latest_event_id + 1)
# 异步回放历史事件(向客户端推送未接收过的事件)
async for event in async_store:
logger.debug(f"回放事件:{event.__class__.__name__}")
# 跳过无效/召回类事件(无需推送给客户端)
if isinstance(
event,
(NullAction, NullObservation, RecallAction),
):
continue
# 暂存代理状态变更事件(最后发送,确保客户端状态同步)
elif isinstance(event, AgentStateChangedObservation):
agent_state_changed = event
# 其他事件直接推送给客户端
else:
await sio.emit('oh_event', event_to_dict(event), to=connection_id)
# 最后发送代理状态变更事件(确保客户端获取最新状态)
if agent_state_changed:
await sio.emit(
'oh_event', event_to_dict(agent_state_changed), to=connection_id
)
logger.info(f"会话事件回放完成:conversation_id={conversation_id}")
# 初始化会话设置(用户偏好、提供者配置等)
conversation_init_data = await setup_init_conversation_settings(
user_id, conversation_id, providers_set
)
# 加入会话:关联connection_id与会话,启动代理循环
agent_loop_info = await conversation_manager.join_conversation(
conversation_id,
connection_id,
conversation_init_data,
user_id,
)
# 校验会话加入结果
if agent_loop_info is None:
raise ConnectionRefusedError("加入会话失败")
logger.info(f"会话加入成功:conversation_id={conversation_id}, connection_id={connection_id}")
except ConnectionRefusedError:
# 发送错误后断开无效连接
asyncio.create_task(sio.disconnect(connection_id))
raise
@sio.event
async def oh_user_action(connection_id: str, data: dict[str, Any]) -> None:
"""
处理客户端发送的用户行动事件(如用户输入、操作指令)。
参数:
connection_id: 客户端连接ID
data: 用户行动数据(字典格式,包含行动类型、内容等)
"""
# 将用户行动转发到事件流,由会话管理器处理
await conversation_manager.send_to_event_stream(connection_id, data)
@sio.event
async def oh_action(connection_id: str, data: dict[str, Any]) -> None:
"""
兼容旧版客户端的行动事件处理器(保留用于向后兼容)。
注意:待所有客户端升级为使用oh_user_action后,可移除该处理器
目前用于支持正在进行中的旧会话,避免中断服务
"""
await conversation_manager.send_to_event_stream(connection_id, data)
@sio.event
async def disconnect(connection_id: str) -> None:
"""
SocketIO断开连接事件处理器:客户端断开连接时触发。
参数:
connection_id: 断开连接的客户端ID
"""
logger.info(f"SocketIO连接断开:connection_id={connection_id}")
# 通知会话管理器,断开该连接与会话的关联
await conversation_manager.disconnect_from_session(connection_id)
docs.all-hands.dev/openhands/u…
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第二篇:Agent 相关核心概念】 克里
当AI Agent从“玩具”走向“工具”,我们该关注什么?Openhands架构解析【第一篇:系列导读】 克里
Coding Agent之Openhands解析(含代码) Arrow
OpenHands 源码解读 一力辉
本文使用 markdown.com.cn 排版