第五层:子代理系统

📌 核心文件nanobot/agent/subagent.py

概述

子代理(Subagent)系统允许主 Agent 派生独立的后台 Agent 来处理长时间运行的任务,实现真正的并发处理和异步通知。

使用场景

适合子代理的任务

长时间运行

  • 定期监控服务状态
  • 等待某个事件发生
  • 批量处理大量数据

异步通知

  • 完成后主动通知用户
  • 发现异常时及时警告

独立任务

  • 不影响主对话流程
  • 可以并行执行多个

不适合子代理的任务

❌ 需要立即结果的操作
❌ 简单的同步任务
❌ 需要频繁交互的任务

核心架构

graph TB
    User[用户] -->|1. 请求任务| Main[主 Agent]
    Main -->|2. spawn 工具| SM[SubagentManager]
    SM -->|3. 创建| Sub1[子代理 1]
    SM -->|3. 创建| Sub2[子代理 2]
    
    Sub1 -.->|4. 后台运行| Loop1[独立 AgentLoop]
    Sub2 -.->|4. 后台运行| Loop2[独立 AgentLoop]
    
    Loop1 -->|5. 完成/通知| Bus[MessageBus]
    Loop2 -->|5. 完成/通知| Bus
    
    Bus -->|6. 分发| Channel[Telegram/WhatsApp]
    Channel -->|7. 通知| User
    
    Main -->|同时| User

SubagentManager 实现

核心类

class SubagentManager:
    """管理所有子代理"""
    
    def __init__(self, provider, workspace, bus, model, brave_api_key=None):
        self.provider = provider
        self.workspace = workspace
        self.bus = bus
        self.model = model
        self.brave_api_key = brave_api_key
        self._subagents: dict[str, asyncio.Task] = {}
    
    async def spawn(
        self,
        task: str,
        origin: str,  # 原始对话标识 "channel:chat_id"
        announce: bool = True
    ) -> str:
        """
        生成新的子代理
        
        Args:
            task: 任务描述
            origin: 完成后通知的目标
            announce: 是否在完成后通知
        
        Returns:
            子代理 ID
        """
        subagent_id = str(uuid.uuid4())[:8]
        
        # 创建独立的 AgentLoop
        subagent_loop = AgentLoop(
            bus=self.bus,
            provider=self.provider,
            workspace=self.workspace,
            model=self.model,
            brave_api_key=self.brave_api_key
        )
        
        # 在后台运行
        task_coro = self._run_subagent(
            subagent_loop,
            task,
            origin,
            announce,
            subagent_id
        )
        
        self._subagents[subagent_id] = asyncio.create_task(task_coro)
        
        logger.info(f"Spawned subagent {subagent_id} for task: {task}")
        return subagent_id
    
    async def _run_subagent(
        self,
        loop: AgentLoop,
        task: str,
        origin: str,
        announce: bool,
        subagent_id: str
    ):
        """运行子代理直到完成"""
        try:
            # 处理任务
            session_key = f"subagent:{subagent_id}"
            result = await loop.process_direct(task, session_key)
            
            # 如果需要通知
            if announce and origin:
                channel, chat_id = origin.split(":", 1)
                await self.bus.publish_outbound(OutboundMessage(
                    channel=channel,
                    chat_id=chat_id,
                    content=f"🤖 子代理 {subagent_id} 完成任务:\n\n{result}"
                ))
        
        except Exception as e:
            logger.error(f"Subagent {subagent_id} failed: {e}")
        
        finally:
            # 清理
            self._subagents.pop(subagent_id, None)

SpawnTool 实现

class SpawnTool(Tool):
    """生成子代理的工具"""
    
    def __init__(self, manager: SubagentManager):
        self.manager = manager
        self._channel = None
        self._chat_id = None
    
    def set_context(self, channel: str, chat_id: str):
        """设置当前对话上下文(从主 Agent 传入)"""
        self._channel = channel
        self._chat_id = chat_id
    
    @property
    def name(self) -> str:
        return "spawn"
    
    @property
    def description(self) -> str:
        return "Spawn a subagent to handle a task in the background."
    
    @property
    def parameters(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "task": {
                    "type": "string",
                    "description": "任务描述"
                },
                "announce": {
                    "type": "boolean",
                    "description": "完成后是否通知用户",
                    "default": True
                }
            },
            "required": ["task"]
        }
    
    async def execute(self, task: str, announce: bool = True) -> str:
        origin = f"{self._channel}:{self._chat_id}"
        
        subagent_id = await self.manager.spawn(
            task=task,
            origin=origin,
            announce=announce
        )
        
        return f"已派生子代理 {subagent_id} 处理任务:{task}"

完整使用示例

场景 1:定期监控

用户“每小时检查一次 example.com 是否在线”

# 主 Agent 理解后调用
{
  "name": "spawn",
  "arguments": {
    "task": "每小时访问 https://example.com,如果返回非 200 状态码则通知我",
    "announce": true
  }
}

# 返回
"已派生子代理 a1b2c3d4 处理任务:每小时访问..."

# 子代理在后台运行
while True:
    status = await check_website("https://example.com")
    if status != 200:
        # 通过消息总线通知用户
        await bus.publish_outbound(OutboundMessage(
            channel="telegram",
            chat_id="123456",
            content=f"⚠️ 网站离线!状态码:{status}"
        ))
    await asyncio.sleep(3600)  # 等待 1 小时

场景 2:大文件处理

用户“下载并处理这个 10GB 的数据集”

# 主 Agent
{
  "name": "spawn",
  "arguments": {
    "task": "下载 https://data.example.com/dataset.zip 并解压、清洗,完成后通知我",
    "announce": true
  }
}

# 子代理独立运行
await exec("wget https://data.example.com/dataset.zip")  # 30 分钟
await exec("unzip dataset.zip")                          # 10 分钟
await exec("python clean_data.py")                       # 1 小时

# 完成后自动通知:
"🤖 子代理 x7y8z9 完成任务:
数据集已下载并清洗完成,共 1,234,567 条记录。"

场景 3:多任务并行

# 用户同时发起 3 个任务

任务 1spawn(task="监控服务器 A")
任务 2spawn(task="监控服务器 B")
任务 3spawn(task="每天生成报告")

# 3 个子代理同时运行,互不干扰
SubagentManager._subagents = {
    "aaa111": <Task for 任务 1>,
    "bbb222": <Task for 任务 2>,
    "ccc333": <Task for 任务 3>
}

与定时任务的对比

维度 子代理(Subagent) 定时任务(Cron)
触发方式 对话中动态创建 预先配置
灵活性 高(LLM理解任务) 低(固定消息)
生命周期 临时(可能一次性) 持久(周期性)
复杂度 可以很复杂 通常简单
适用场景 临时的复杂后台任务 定期的简单提醒

实现细节

1. 独立的会话

每个子代理有独立的会话:

session_key = f"subagent:{subagent_id}"

对话历史保存在:

~/.nanobot/sessions/subagent:a1b2c3d4.json

2. 共享工具和技能

子代理使用相同的:

  • LLM Provider
  • 工具注册表
  • 技能加载器
  • 工作区

3. 后台运行

# 使用 asyncio.create_task 在后台运行
task = asyncio.create_task(self._run_subagent(...))

# 主 Agent 立即返回,不等待子代理完成

4. 通知机制

# 通过消息总线发送通知
await self.bus.publish_outbound(OutboundMessage(
    channel=original_channel,
    chat_id=original_chat_id,
    content=f"子代理完成:{result}"
))

# 消息总线分发到对应渠道
# 用户收到通知

最佳实践

1. 明确任务目标

# ✅ 好的任务描述
"每 30 分钟检查一次 GitHub Actions 的状态,
如果有失败的 workflow 就通知我,
并附上失败的日志链接。"

# ❌ 不好的任务描述
"监控 CI"

2. 设置终止条件

# ✅ 有明确终止条件
"监控服务器,如果 CPU 超过 80% 连续 5 分钟则通知我,
然后停止监控。"

# ❌ 无限循环
"一直监控服务器"  # 可能永不终止

3. 合理使用 announce

# 需要通知
spawn(task="下载文件", announce=True)   # 完成后通知

# 不需要通知
spawn(task="静默清理日志", announce=False)  # 后台执行即可

调试和监控

查看活动的子代理

def list_subagents(self) -> list[str]:
    """列出所有活动的子代理 ID"""
    return list(self._subagents.keys())

# 使用
active = manager.list_subagents()
print(f"活动子代理:{active}")

停止子代理

async def stop_subagent(self, subagent_id: str):
    """停止指定的子代理"""
    task = self._subagents.get(subagent_id)
    if task:
        task.cancel()
        self._subagents.pop(subagent_id)

日志记录

logger.info(f"Spawned subagent {subagent_id}")
logger.info(f"Subagent {subagent_id} completed")
logger.error(f"Subagent {subagent_id} failed: {e}")

未来优化

1. 持久化子代理状态

当前实现在进程重启后子代理会丢失,可以改进:

# 保存子代理状态到文件
{
  "id": "a1b2c3d4",
  "task": "监控服务器",
  "origin": "telegram:123456",
  "created_at": "2026-02-03T10:00:00Z",
  "status": "running"
}

# 重启后恢复
async def restore_subagents(self):
    for state in load_subagent_states():
        await self.spawn(state["task"], state["origin"])

2. 资源限制

MAX_SUBAGENTS = 10

async def spawn(self, task, origin, announce=True):
    if len(self._subagents) >= MAX_SUBAGENTS:
        return "Error: 已达到最大子代理数量限制"

3. 超时控制

async def _run_subagent(self, ..., timeout=3600):
    try:
        await asyncio.wait_for(
            loop.process_direct(task, session_key),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        logger.warning(f"Subagent {subagent_id} timeout")

小结

  • ✅ 子代理实现真正的并发后台任务
  • ✅ 独立的 AgentLoop,共享资源
  • ✅ 自动通知机制
  • ✅ 适合长时间运行、异步通知的场景
  • ✅ 与定时任务互补

下一步12-记忆系统.md