第二层:消息总线

📌 核心文件

  • nanobot/bus/queue.py - 消息队列实现
  • nanobot/bus/events.py - 消息事件定义

概述

消息总线(Message Bus)是 nanobot 的核心通信层,采用发布-订阅模式解耦渠道(Channel)和智能体(Agent)。

设计理念

问题:紧耦合设计

如果没有消息总线,系统会是这样:

# ❌ 紧耦合
class TelegramBot:
    def __init__(self, agent: AgentLoop):
        self.agent = agent  # 直接依赖 Agent
    
    async def on_message(self, msg):
        response = await self.agent.process(msg)
        await self.send(response)

# Agent 也需要知道所有渠道
class AgentLoop:
    def __init__(self, telegram: TelegramBot, whatsapp: WhatsApp):
        self.channels = [telegram, whatsapp]

问题

  • 添加新渠道需要修改 Agent 代码
  • 渠道之间相互依赖
  • 难以测试

解决方案:消息总线

# ✅ 解耦设计
class TelegramBot:
    def __init__(self, bus: MessageBus):
        self.bus = bus  # 只依赖消息总线
    
    async def on_message(self, msg):
        # 发布到总线
        await self.bus.publish_inbound(msg)

class AgentLoop:
    def __init__(self, bus: MessageBus):
        self.bus = bus  # 只依赖消息总线
    
    async def run(self):
        # 从总线消费消息
        while True:
            msg = await self.bus.consume_inbound()
            response = await self.process(msg)
            await self.bus.publish_outbound(response)

优势

  • Agent 和 Channel 互不知道对方
  • 添加新渠道只需订阅总线
  • 易于测试(Mock MessageBus)

消息事件定义

InboundMessage - 入站消息

from pydantic import BaseModel

class InboundMessage(BaseModel):
    """从用户到 Agent 的消息"""
    
    channel: str           # 来源渠道:cli, telegram, whatsapp
    sender_id: str         # 发送者 ID
    chat_id: str           # 对话 ID
    content: str           # 消息内容
    session_key: str = ""  # 会话标识(默认为 channel:chat_id)
    media: list[str] = []  # 可选的媒体文件路径
    
    def __init__(self, **data):
        super().__init__(**data)
        # 自动生成 session_key
        if not self.session_key:
            self.session_key = f"{self.channel}:{self.chat_id}"

示例

msg = InboundMessage(
    channel="telegram",
    sender_id="123456789",
    chat_id="123456789",
    content="你好,帮我读取文件",
    media=["/tmp/image.jpg"]
)

OutboundMessage - 出站消息

class OutboundMessage(BaseModel):
    """从 Agent 到用户的响应"""
    
    channel: str      # 目标渠道
    chat_id: str      # 目标对话
    content: str      # 响应内容
    media: list[str] = []  # 可选的媒体文件

示例

response = OutboundMessage(
    channel="telegram",
    chat_id="123456789",
    content="文件内容:..."
)

MessageBus 实现

核心结构

import asyncio
from typing import Callable, Awaitable

class MessageBus:
    """异步消息总线"""
    
    def __init__(self):
        # 两个异步队列
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
        
        # 出站消息订阅者:{channel: [callback1, callback2, ...]}
        self._outbound_subscribers: dict[str, list[Callable]] = {}
        
        self._running = False

入站消息处理

async def publish_inbound(self, msg: InboundMessage) -> None:
    """渠道发布消息到总线"""
    await self.inbound.put(msg)

async def consume_inbound(self) -> InboundMessage:
    """Agent 从总线消费消息(阻塞直到有消息)"""
    return await self.inbound.get()

出站消息处理

async def publish_outbound(self, msg: OutboundMessage) -> None:
    """Agent 发布响应到总线"""
    await self.outbound.put(msg)

def subscribe_outbound(
    self, 
    channel: str, 
    callback: Callable[[OutboundMessage], Awaitable[None]]
) -> None:
    """渠道订阅出站消息"""
    if channel not in self._outbound_subscribers:
        self._outbound_subscribers[channel] = []
    self._outbound_subscribers[channel].append(callback)

async def dispatch_outbound(self) -> None:
    """后台任务:分发出站消息到订阅者"""
    self._running = True
    while self._running:
        try:
            # 等待下一条消息
            msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0)
            
            # 找到订阅者
            subscribers = self._outbound_subscribers.get(msg.channel, [])
            
            # 调用所有订阅者的回调
            for callback in subscribers:
                try:
                    await callback(msg)
                except Exception as e:
                    logger.error(f"Error dispatching to {msg.channel}: {e}")
        
        except asyncio.TimeoutError:
            continue

def stop(self):
    """停止分发循环"""
    self._running = False

完整的消息流程

sequenceDiagram
    participant User
    participant Channel
    participant Bus
    participant Agent

    User->>Channel: 发送消息 "你好"
    Channel->>Bus: publish_inbound(msg)
    Bus->>Agent: consume_inbound() → msg
    Agent->>Agent: 处理消息
    Agent->>Bus: publish_outbound(response)
    Bus->>Channel: dispatch_outbound() → callback
    Channel->>User: 发送响应 "你好!"

使用示例

1. 在 Gateway 中使用

async def main():
    # 创建总线
    bus = MessageBus()
    
    # 创建 Agent
    agent = AgentLoop(bus, provider, workspace)
    
    # 创建渠道
    telegram = TelegramChannel(config, bus)
    whatsapp = WhatsAppChannel(config, bus)
    
    # 并发运行
    await asyncio.gather(
        agent.run(),               # Agent 消费 inbound,发布 outbound
        bus.dispatch_outbound(),   # 分发 outbound 到渠道
        telegram.start(),          # Telegram 监听,发布 inbound
        whatsapp.start(),          # WhatsApp 监听,发布 inbound
    )

2. 渠道实现示例

class TelegramChannel:
    def __init__(self, config, bus: MessageBus):
        self.config = config
        self.bus = bus
        self.bot = TelegramBot(config.token)
        
        # 订阅出站消息
        bus.subscribe_outbound("telegram", self.send_message)
    
    async def start(self):
        """监听 Telegram 消息"""
        @self.bot.on_message
        async def on_message(update):
            # 发布入站消息
            await self.bus.publish_inbound(InboundMessage(
                channel="telegram",
                sender_id=str(update.user.id),
                chat_id=str(update.chat.id),
                content=update.text
            ))
        
        await self.bot.run()
    
    async def send_message(self, msg: OutboundMessage):
        """发送消息回用户(订阅回调)"""
        if msg.channel == "telegram":
            await self.bot.send_message(
                chat_id=msg.chat_id,
                text=msg.content
            )

优势总结

1. 解耦

# Agent 不需要知道渠道
class AgentLoop:
    def __init__(self, bus):
        self.bus = bus  # 只依赖抽象的总线

# 渠道不需要知道 Agent
class TelegramChannel:
    def __init__(self, bus):
        self.bus = bus  # 只依赖抽象的总线

2. 可测试

async def test_agent():
    # Mock 消息总线
    bus = MessageBus()
    agent = AgentLoop(bus, mock_provider, tmp_path)
    
    # 发布测试消息
    await bus.publish_inbound(InboundMessage(
        channel="test",
        sender_id="test",
        chat_id="test",
        content="测试消息"
    ))
    
    # 验证响应
    response = await bus.consume_outbound()
    assert "期望的响应" in response.content

3. 可扩展

添加新渠道只需三步:

# 1. 实现 Channel 类
class DiscordChannel:
    def __init__(self, config, bus):
        self.bus = bus
        bus.subscribe_outbound("discord", self.send_message)
    
    async def start(self):
        # 监听 Discord,发布 inbound
        pass
    
    async def send_message(self, msg):
        # 发送到 Discord
        pass

# 2. 在 gateway 中启动
discord = DiscordChannel(config, bus)

# 3. 运行
await discord.start()

性能考虑

队列大小

# asyncio.Queue 默认无限大小
# 如果需要限制:
self.inbound = asyncio.Queue(maxsize=100)

# 当队列满时,publish_inbound 会阻塞

消息积压

@property
def inbound_size(self) -> int:
    """检查积压的入站消息数"""
    return self.inbound.qsize()

@property
def outbound_size(self) -> int:
    """检查积压的出站消息数"""
    return self.outbound.qsize()

# 使用
if bus.inbound_size > 100:
    logger.warning("Too many pending messages!")

小结

  • ✅ 发布-订阅模式解耦渠道和 Agent
  • ✅ 异步队列支持高并发
  • ✅ 简洁的实现(仅 ~80 行代码)
  • ✅ 易于测试和扩展

下一步06-上下文构建.md