荣耀文档
80.56M · 2026-04-14
当你的 Agent 需要执行 npm install、pytest、docker build 这样的慢命令时,会发生什么?
如果你的答案是肯定的,那么你需要一个后台任务系统。
到了这一阶段,你的 Agent 已经具备了多种能力:
但当面对慢命令时,同步执行的方式会遇到明显问题:
所以到了这个阶段,我们需要一个后台任务系统:
把「慢执行」移到后台,让主循环继续推进别的事情。
用一个图来表示后台任务系统的工作流程:
主循环
|
+-- background_run("pytest")
| -> 立刻返回 task_id
|
+-- 继续别的工作
|
+-- 下一轮模型调用前
-> drain_notifications()
-> 把摘要注入 messages
后台执行线
|
+-- 真正执行 pytest
+-- 完成后写入通知队列
关键点只有两个:
前台指的是:
后台不是神秘系统。
后台只是说:
通知队列就是一条「稍后再告诉主循环」的收件箱。
后台任务完成以后,不是直接把全文硬塞回模型, 而是先写一条摘要通知,等下一轮再统一带回去。
运行时任务指的是:
import os
import threading
import subprocess
import json
from pathlib import Path
import time
import uuid
class BackgroundManager:
"""后台任务管理器"""
def __init__(self, runtime_dir=".runtime-tasks"):
self.runtime_dir = Path(runtime_dir)
self.runtime_dir.mkdir(exist_ok=True)
self.tasks = {}
self.notifications = []
self.lock = threading.Lock()
# 加载已存在的任务
self._load_existing_tasks()
def _load_existing_tasks(self):
"""加载已存在的任务"""
for task_file in self.runtime_dir.glob("*.json"):
try:
task = json.loads(task_file.read_text(encoding="utf-8"))
self.tasks[task["id"]] = task
except Exception as e:
print(f"加载任务失败 {task_file}: {e}")
def _generate_task_id(self):
"""生成任务 ID"""
return str(uuid.uuid4())
def run(self, command):
"""运行后台任务"""
task_id = self._generate_task_id()
# 创建任务记录
task = {
"id": task_id,
"command": command,
"status": "running",
"started_at": time.time(),
"result_preview": "",
"output_file": f"{task_id}.log",
}
# 保存任务记录
task_file = self.runtime_dir / f"{task_id}.json"
task_file.write_text(json.dumps(task, indent=2, ensure_ascii=False), encoding="utf-8")
with self.lock:
self.tasks[task_id] = task
# 启动后台线程
thread = threading.Thread(
target=self._execute,
args=(task_id, command),
daemon=True,
)
thread.start()
return task_id
def _execute(self, task_id, command):
"""在后台执行命令"""
output_file = self.runtime_dir / f"{task_id}.log"
try:
# 执行命令
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=300
)
# 读取输出
output = result.stdout + result.stderr
output_file.write_text(output, encoding="utf-8")
# 生成摘要
status = "completed" if result.returncode == 0 else "failed"
preview = output[:500] + ("..." if len(output) > 500 else "")
except subprocess.TimeoutExpired:
status = "timeout"
preview = "Command timed out after 300 seconds"
output_file.write_text(preview, encoding="utf-8")
except Exception as e:
status = "error"
preview = f"Error executing command: {str(e)}"
output_file.write_text(preview, encoding="utf-8")
# 更新任务状态
with self.lock:
if task_id in self.tasks:
self.tasks[task_id].update({
"status": status,
"result_preview": preview,
"completed_at": time.time(),
})
# 保存任务记录
task_file = self.runtime_dir / f"{task_id}.json"
task_file.write_text(
json.dumps(self.tasks[task_id], indent=2, ensure_ascii=False),
encoding="utf-8"
)
# 添加通知
self.notifications.append({
"type": "background_completed",
"task_id": task_id,
"status": status,
"preview": preview,
})
def check(self, task_id):
"""检查任务状态"""
with self.lock:
task = self.tasks.get(task_id)
if not task:
return "Task not found"
return f"Task #{task_id}: {task['status']}nCommand: {task['command']}nPreview: {task.get('result_preview', '')}"
def list(self):
"""列出所有后台任务"""
with self.lock:
if not self.tasks:
return "No background tasks"
lines = ["# Background Tasksn"]
for task_id, task in self.tasks.items():
lines.append(f"- **#{task_id}** {task['command']} [{task['status']}]")
return "n".join(lines)
def drain_notifications(self):
"""排空通知队列"""
with self.lock:
notifications = self.notifications.copy()
self.notifications.clear()
return notifications
def read_output(self, task_id):
"""读取任务完整输出"""
output_file = self.runtime_dir / f"{task_id}.log"
if not output_file.exists():
return "Output file not found"
try:
return output_file.read_text(encoding="utf-8")
except Exception as e:
return f"Error reading output: {str(e)}"
def create_background_tools(background_manager):
"""创建后台任务相关的工具"""
def background_run(command):
"""运行后台任务"""
task_id = background_manager.run(command)
return f"后台任务已启动: #{task_id}n命令: {command}n请使用 background_check 查看状态"
def background_check(task_id):
"""检查后台任务状态"""
return background_manager.check(task_id)
def background_list():
"""列出所有后台任务"""
return background_manager.list()
def background_read_output(task_id):
"""读取后台任务完整输出"""
output = background_manager.read_output(task_id)
return f"任务 #{task_id} 完整输出:n{output}"
return {
"background_run": background_run,
"background_check": background_check,
"background_list": background_list,
"background_read_output": background_read_output,
}
def agent_loop_with_background(state):
"""带后台任务系统的 Agent Loop"""
# 初始化后台任务管理器
background_manager = BackgroundManager()
# 创建后台任务工具
background_tools = create_background_tools(background_manager)
state["tools"] = state.get("tools", []) + [
{
"name": "background_run",
"description": "在后台运行命令",
"parameters": {
"command": {"type": "string", "description": "要执行的命令"}
}
},
{
"name": "background_check",
"description": "检查后台任务状态",
"parameters": {
"task_id": {"type": "string", "description": "任务 ID"}
}
},
{
"name": "background_list",
"description": "列出所有后台任务",
"parameters": {}
},
{
"name": "background_read_output",
"description": "读取后台任务完整输出",
"parameters": {
"task_id": {"type": "string", "description": "任务 ID"}
}
}
]
# 主循环
while True:
# 1. 排空通知队列
notifications = background_manager.drain_notifications()
if notifications:
text = "n".join(
f"[后台任务 #{n['task_id']}] {n['status']} - {n['preview']}"
for n in notifications
)
state["messages"].append({"role": "user", "content": text})
# 2. 调用模型
response = call_model(state["messages"])
if response.stop_reason != "tool_use":
return response.content
results = []
for block in response.content:
if hasattr(block, "type") and block.type == "tool_use":
tool_name = block.name
tool_input = block.input
# 执行后台任务工具
if tool_name in background_tools:
output = background_tools[tool_name](**tool_input)
else:
# 执行其他工具
output = run_tool(tool_name, tool_input)
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output
})
if results:
state["messages"].append({"role": "user", "content": results})
启动后台任务:
task_id = background_manager.run("pytest")
# 立刻返回 task_id,不阻塞主循环
检查任务状态:
status = background_manager.check(task_id)
# 查看任务是否完成,以及结果摘要
读取完整输出:
output = background_manager.read_output(task_id)
# 当模型需要查看详细输出时使用
通知格式:
notification = {
"type": "background_completed",
"task_id": "a1b2c3d4",
"status": "completed",
"preview": "tests passed",
}
通知处理:
输出处理:
.runtime-tasks/{task_id}.log)background_read_output 工具查看完整输出这样做的好处是:
| 特性 | 后台任务 | 任务系统 |
|---|---|---|
| 关注点 | 正在运行的执行单元 | 工作目标和依赖关系 |
| 生命周期 | 短期(执行完成后可清理) | 长期(跨会话持久) |
| 状态 | 运行中、完成、失败、超时 | pending、in_progress、completed、deleted |
| 作用 | 处理慢命令,不阻塞主循环 | 管理复杂任务的依赖和进度 |
| 存储 | 运行时临时存储 | 持久化存储 |
使用建议:
npm install、pytest):使用后台任务# 错误
# 为每个后台任务创建一个主循环
def background_main_loop():
while True:
# 处理后台任务...
# 正确
# 主循环仍然只有一条,后台任务只是执行线
thread = threading.Thread(target=self._execute, args=(task_id, command))
thread.start()
# 错误
# 只启动线程,没有任务状态管理
def run_in_background(command):
thread = threading.Thread(target=lambda: subprocess.run(command, shell=True))
thread.start()
# 正确
# 登记任务状态,便于管理和查询
def run(self, command):
task_id = self._generate_task_id()
self.tasks[task_id] = {"id": task_id, "command": command, "status": "running"}
thread = threading.Thread(target=self._execute, args=(task_id, command))
thread.start()
return task_id
# 错误
# 直接把完整输出塞进上下文
def _execute(self, task_id, command):
result = subprocess.run(command, shell=True, capture_output=True, text=True)
output = result.stdout + result.stderr
self.notifications.append({"task_id": task_id, "output": output}) # 危险!
# 正确
# 只放摘要,完整输出放文件
def _execute(self, task_id, command):
result = subprocess.run(command, shell=True, capture_output=True, text=True)
output = result.stdout + result.stderr
output_file.write_text(output, encoding="utf-8")
preview = output[:500] + ("..." if len(output) > 500 else "")
self.notifications.append({"task_id": task_id, "preview": preview})
# 错误
# 把任务系统的工作目标和后台任务混在一起
def run_task(task_id):
# 直接执行任务系统中的任务
task = task_manager.get(task_id)
background_manager.run(task["subject"]) # 错误!subject 不是命令
# 正确
# 明确区分工作目标和执行命令
def run_task(task_id):
# 标记任务开始
task_manager.update(task_id, status="in_progress")
# 执行具体命令
bg_task_id = background_manager.run("pytest")
# 记录后台任务 ID
task_manager.update(task_id, bg_task_id=bg_task_id)
因为一个真正高效的 Agent,不应该被慢命令阻塞。
后台任务系统让你能够:
所以后台任务系统是构建高效 Agent 系统的重要组件。
有了后台任务系统,你的 Agent 已经具备了处理慢命令的能力。下一章我们将探讨定时任务系统,让 Agent 能够按时间自动触发任务,实现更智能的工作调度。
一句话总结:主循环只有一条,并行的是等待,不是主循环本身。