Layer 4: Tool System Architecture

📌 Core Files

  • nanobot/agent/tools/base.py - tool base class
  • nanobot/agent/tools/registry.py - tool registry
  • nanobot/agent/tools/*.py - built-in tools

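Overview

Tools are the only way the Agent interacts with the outside world. Through tools, the LLM can:

  • 📁 Read and write files
  • 🐚 Execute shell commands
  • 🌐 Search and fetch web pages
  • 💬 Send messages to chat channels
  • 🤖 Spawn subagents to handle complex tasks

Abstract Design of Tools

1. Base class: Tool (ABC)

All tools inherit from this abstract base class: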

from abc import ABC, abstractmethod
from typing import Any

class Tool(ABC):
    """
    Abstract base class for agent tools.
    
    Tools are capabilities that the agent can use to interact with
    the environment, such as reading files, executing commands, etc.
    """
    
    @property
    @abstractmethod
    def name(self) -> str:
        """Tool name used in function calls."""
        pass
    
    @property
    @abstractmethod
    def description(self) -> str:
        """Description of what the tool does."""
        pass
    
    @property
    @abstractmethod
    def parameters(self) -> dict[str, Any]:
        """JSON Schema for tool parameters."""
        pass
    
    @abstractmethod
    async def execute(self, **kwargs: Any) -> str:
        """
        Execute the tool with given parameters.
        
        Args:
            **kwargs: Tool-specific parameters.
        
        Returns:
            String result of the tool execution.
        """
        pass
    
    def to_schema(self) -> dict[str, Any]:
        """Convert tool to OpenAI function schema format."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            }
        }

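Design Highlights

  • Uses ABC (abstract base class) to force subclasses to implement the required members
  • to_schema() automatically generates the OpenAI function-calling format
  • execute() returns a string, which keeps results easy for the LLM to interpret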
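
To make the interface concrete, here is a minimal sketch of a subclass. EchoTool is a hypothetical example, not one of nanobot's built-in tools, and the Tool base class is repeated in condensed form only so that the snippet runs on its own.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class Tool(ABC):
    """Condensed copy of the base class shown above."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def description(self) -> str: ...

    @property
    @abstractmethod
    def parameters(self) -> dict[str, Any]: ...

    @abstractmethod
    async def execute(self, **kwargs: Any) -> str: ...

    def to_schema(self) -> dict[str, Any]:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            },
        }


class EchoTool(Tool):
    """Hypothetical tool that returns its input unchanged."""

    @property
    def name(self) -> str:
        return "echo"

    @property
    def description(self) -> str:
        return "Return the given text unchanged"

    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {"text": {"type": "string", "description": "Text to echo"}},
            "required": ["text"],
        }

    async def execute(self, text: str) -> str:
        return text


schema = EchoTool().to_schema()                         # OpenAI-style definition
echoed = asyncio.run(EchoTool().execute(text="hello"))  # run the tool once
```

Instantiating Tool directly, or a subclass that leaves any of the four members unimplemented, raises TypeError, which is how the ABC enforces the interface.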

2. Tool registry: ToolRegistry

Manages the registration, lookup, and execution of all tools:

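import logging

# Assumption: a standard-library logger; the project may configure a different one
logger = logging.getLogger(__name__)

class ToolRegistry: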
    """Registry for managing agent tools."""
    
    def __init__(self):
        self._tools: dict[str, Tool] = {}
    
    def register(self, tool: Tool) -> None:
        """Register a new tool."""
        self._tools[tool.name] = tool
        logger.debug(f"Registered tool: {tool.name}")
    
    def get(self, name: str) -> Tool | None:
        """Get a tool by name."""
        return self._tools.get(name)
    
    def get_definitions(self) -> list[dict[str, Any]]:
        """Get all tool definitions for LLM function calling."""
        return [tool.to_schema() for tool in self._tools.values()]
    
    async def execute(self, name: str, arguments: dict[str, Any]) -> str:
        """Execute a tool by name with given arguments."""
        tool = self.get(name)
        if not tool:
            return f"Error: Tool '{name}' not found"
        
        try:
            result = await tool.execute(**arguments)
            return result
        except Exception as e:
            logger.error(f"Tool {name} execution error: {e}")
            return f"Error executing {name}: {str(e)}"

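Core Functions

  1. Register: register(tool) - adds a new tool
  2. Look up: get(name) - fetches a tool by name
  3. Define: get_definitions() - builds the tool list in a form the LLM understands
  4. Execute: execute(name, args) - calls the tool and catches exceptions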
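
The four operations can be exercised end to end as follows. This is a sketch under stated assumptions: the registry is a condensed copy of the class above with logging omitted, and UpperTool is a hypothetical stand-in that only mimics the Tool interface.

```python
import asyncio
from typing import Any


class UpperTool:
    """Hypothetical stand-in exposing the same interface as a Tool subclass."""

    name = "upper"

    def to_schema(self) -> dict[str, Any]:
        return {"type": "function", "function": {"name": self.name}}

    async def execute(self, text: str) -> str:
        return text.upper()


class ToolRegistry:
    """Condensed copy of the registry shown above (logging omitted)."""

    def __init__(self) -> None:
        self._tools: dict[str, Any] = {}

    def register(self, tool: Any) -> None:
        self._tools[tool.name] = tool

    def get(self, name: str) -> Any:
        return self._tools.get(name)

    def get_definitions(self) -> list[dict[str, Any]]:
        return [tool.to_schema() for tool in self._tools.values()]

    async def execute(self, name: str, arguments: dict[str, Any]) -> str:
        tool = self.get(name)
        if not tool:
            return f"Error: Tool '{name}' not found"
        try:
            return await tool.execute(**arguments)
        except Exception as e:
            return f"Error executing {name}: {e}"


async def demo() -> list[str]:
    registry = ToolRegistry()
    registry.register(UpperTool())
    return [
        await registry.execute("upper", {"text": "hi"}),        # normal call
        await registry.execute("missing", {}),                  # unknown tool
        await registry.execute("upper", {"wrong_arg": "hi"}),   # bad arguments
    ]


results = asyncio.run(demo())
```

Note that all three outcomes come back as plain strings. The caller never has to handle an exception, so a failed call can be handed to the LLM like any other tool result.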

Built-in Tools in Detail

1. File system tools

ReadFileTool - read a file

from pathlib import Path

class ReadFileTool(Tool):
    @property
    def name(self) -> str:
        return "read_file"
    
    @property
    def description(self) -> str:
        return "Read the contents of a file"
    
    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file to read"
                }
            },
            "required": ["path"]
        }
    
    async def execute(self, path: str) -> str:
        try:
            file_path = Path(path).expanduser()
            content = file_path.read_text(encoding="utf-8")
            return f"File content of {path}:\n\n{content}"
        except Exception as e:
            return f"Error reading file: {str(e)}"

The tool definition as seen by the LLM

{
  "type": "function",
  "function": {
    "name": "read_file",
    "description": "Read the contents of a file",
    "parameters": {
      "type": "object",
      "properties": {
        "path": {
          "type": "string",
          "description": "Path to the file to read"
        }
      },
      "required": ["path"]
    }
  }
}

WriteFileTool - write a file

async def execute(self, path: str, content: str) -> str:
    try:
        file_path = Path(path).expanduser()
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text(content, encoding="utf-8")
        return f"Successfully wrote to {path}"
    except Exception as e:
        return f"Error writing file: {str(e)}"

Features

  • Creates parent directories automatically
  • UTF-8 encoding
  • Error handling

EditFileTool - edit a file

Supports editing by line range (1-based, inclusive):

async def execute(
    self, 
    path: str, 
    start_line: int, 
    end_line: int, 
    new_content: str
) -> str:
    try:
        file_path = Path(path).expanduser()
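        text = file_path.read_text(encoding="utf-8")
        lines = text.splitlines()
        
        # Line numbers are 1-based and inclusive; reject invalid ranges
        if start_line < 1 or end_line < start_line or end_line > len(lines):
            return f"Error: invalid line range {start_line}-{end_line} (file has {len(lines)} lines)"
        
        # Replace the specified lines
        new_lines = (
            lines[:start_line-1] + 
            new_content.splitlines() + 
            lines[end_line:]
        )
        
        # Preserve the trailing newline if the original file had one
        result = "\n".join(new_lines)
        if text.endswith("\n"):
            result += "\n"
        file_path.write_text(result, encoding="utf-8")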
        return f"Successfully edited {path} (lines {start_line}-{end_line})"
    except Exception as e:
        return f"Error editing file: {str(e)}"
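
The slicing in this tool treats start_line and end_line as 1-based and inclusive. A small pure-function sketch makes that easy to check in isolation; replace_lines is a hypothetical helper introduced for illustration and is not part of nanobot.

```python
def replace_lines(
    lines: list[str], start_line: int, end_line: int, new_content: str
) -> list[str]:
    """Replace lines start_line..end_line (1-based, inclusive) with new_content."""
    return lines[:start_line - 1] + new_content.splitlines() + lines[end_line:]


original = ["a", "b", "c", "d"]

# Replace lines 2-3 with three new lines; the file grows by one line
edited = replace_lines(original, 2, 3, "X\nY\nZ")

# An empty replacement deletes the range
deleted = replace_lines(original, 2, 3, "")
```

The replacement does not need to have the same number of lines as the range it replaces.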

ListDirTool - list a directory

async def execute(self, path: str = ".") -> str:
    try:
        dir_path = Path(path).expanduser()
        items = []
        for item in sorted(dir_path.iterdir()):
            prefix = "📁" if item.is_dir() else "📄"
            items.append(f"{prefix} {item.name}")
        
        return f"Contents of {path}:\n" + "\n".join(items)
    except Exception as e:
        return f"Error listing directory: {str(e)}"

2. Shell tool

ExecTool - execute a command

import asyncio

class ExecTool(Tool):
    def __init__(self, working_dir: str = "."):
        self.working_dir = working_dir
    
    @property
    def name(self) -> str:
        return "exec"
    
    @property
    def description(self) -> str:
        return "Execute a shell command and return its output"
    
    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The shell command to execute"
                }
            },
            "required": ["command"]
        }
    
    async def execute(self, command: str) -> str:
        try:
            # Run the command through the shell with asyncio.create_subprocess_shell
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=self.working_dir
            )
            
            stdout, stderr = await process.communicate()
            
            result_parts = []
            if stdout:
                result_parts.append(f"stdout:\n{stdout.decode()}")
            if stderr:
                result_parts.append(f"stderr:\n{stderr.decode()}")
            if process.returncode != 0:
                result_parts.append(f"exit code: {process.returncode}")
            
            return "\n".join(result_parts) if result_parts else "Command executed successfully (no output)"
        except Exception as e:
            return f"Error executing command: {str(e)}"

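Security Considerations

  • ⚠️ Executing shell commands directly is a security risk
  • Run this tool only in a controlled environment
  • Consider adding a command allowlist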
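
One possible shape for such an allowlist is sketched below. The names ALLOWED_PROGRAMS, FORBIDDEN_CHARS, and is_command_allowed are assumptions made for this example and do not exist in nanobot. The check is deliberately conservative and is not a substitute for a real sandbox.

```python
import shlex

# Hypothetical allowlist; choose the programs your deployment actually needs
ALLOWED_PROGRAMS = {"ls", "cat", "grep", "wc"}

# Shell metacharacters that could chain, redirect, or substitute commands
FORBIDDEN_CHARS = set(";&|<>`$\n")


def is_command_allowed(command: str) -> bool:
    """Return True only for a single allowlisted program with no shell operators."""
    if any(ch in FORBIDDEN_CHARS for ch in command):
        return False
    try:
        parts = shlex.split(command)
    except ValueError:  # e.g. unbalanced quotes
        return False
    return bool(parts) and parts[0] in ALLOWED_PROGRAMS


checks = {
    "ls -la": is_command_allowed("ls -la"),
    "grep -r TODO .": is_command_allowed("grep -r TODO ."),
    "rm -rf /": is_command_allowed("rm -rf /"),
    "ls; rm -rf /": is_command_allowed("ls; rm -rf /"),
    "ls | wc -l": is_command_allowed("ls | wc -l"),
}
```

ExecTool.execute() could call a check like this first and return an error string when it fails. Rejecting pipes and redirection is a trade-off: it blocks command chaining, but it also rules out legitimate pipelines.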

3. Web tools

WebSearchTool - search the web

Uses the Brave Search API:

import httpx

class WebSearchTool(Tool):
    def __init__(self, api_key: str | None = None):
        self.api_key = api_key
    
    async def execute(self, query: str, count: int = 5) -> str:
        if not self.api_key:
            return "Web search not configured (missing API key)"
        
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    "https://api.search.brave.com/res/v1/web/search",
                    headers={"X-Subscription-Token": self.api_key},
                    params={"q": query, "count": count}
                )
                response.raise_for_status()
                
                data = response.json()
                results = data.get("web", {}).get("results", [])
                
                formatted = []
                for i, result in enumerate(results, 1):
                    formatted.append(
                        f"{i}. {result['title']}\n"
                        f"   {result['url']}\n"
                        f"   {result['description']}"
                    )
                
                return "Search results:\n\n" + "\n\n".join(formatted)
        except Exception as e:
            return f"Error searching web: {str(e)}"

WebFetchTool - fetch a web page

async def execute(self, url: str) -> str:
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=10.0)
            response.raise_for_status()
            
            # Extract the main content with readability-lxml
            from readability import Document
            doc = Document(response.text)
            
            title = doc.title()
            content = doc.summary()  # HTML
            
            # Naive HTML-to-text conversion
            import re
            text = re.sub('<[^<]+?>', '', content)
            text = re.sub(r'\n\s*\n', '\n\n', text)
            
            return f"Page: {title}\n\n{text[:2000]}"  # truncate to 2000 characters
    except Exception as e:
        return f"Error fetching URL: {str(e)}"

4. Message tool

MessageTool - send a message

Sends a message to a specific channel:

class MessageTool(Tool):
    def __init__(self, send_callback):
        self.send_callback = send_callback
        self._channel = None
        self._chat_id = None
    
    def set_context(self, channel: str, chat_id: str):
        """Set the current conversation context."""
        self._channel = channel
        self._chat_id = chat_id
    
    async def execute(self, content: str, to: str | None = None) -> str:
        """
        Send a message.
        
        Args:
            content: Message content
            to: Optional override for chat_id
        """
        target_chat_id = to or self._chat_id
        
        if not target_chat_id:
            return "Error: No recipient specified"
        
        await self.send_callback(OutboundMessage(
            channel=self._channel,
            chat_id=target_chat_id,
            content=content
        ))
        
        return f"Message sent to {target_chat_id}"

Use Cases

  • Sending notifications proactively
  • Delivering the output of scheduled tasks
  • Reporting subagent results

5. Subagent tool

SpawnTool - spawn a subagent

class SpawnTool(Tool):
    def __init__(self, manager: SubagentManager):
        self.manager = manager
        self._channel = None
        self._chat_id = None
    
    def set_context(self, channel: str, chat_id: str):
        self._channel = channel
        self._chat_id = chat_id
    
    async def execute(
        self, 
        task: str, 
        announce: bool = True
    ) -> str:
        """
        Spawn a subagent to handle a task in the background.
        
        Args:
            task: Task description for the subagent
            announce: Whether to announce completion
        """
        origin = f"{self._channel}:{self._chat_id}"
        
        subagent_id = await self.manager.spawn(
            task=task,
            origin=origin,
            announce=announce
        )
        
        return f"Spawned subagent {subagent_id} to handle: {task}"

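Application Scenarios

  • Long-running tasks (such as compiling code)
  • Periodic monitoring (such as checking a website for updates)
  • Complex multi-step operations

Tool Execution Flow

Let's trace one complete tool call:

1. The LLM decides to call a tool

User: "Read the config.json file"

The LLM returns: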

{
  "role": "assistant",
  "content": null,
  "tool_calls": [
    {
      "id": "call_abc123",
      "type": "function",
      "function": {
        "name": "read_file",
        "arguments": "{\"path\": \"config.json\"}"
      }
    }
  ]
}

2. The Agent executes the tool

for tool_call in response.tool_calls:
    # Parse the JSON-encoded arguments
    args = json.loads(tool_call.function.arguments)
    # {"path": "config.json"}
    
    # Execute the tool
    result = await self.tools.execute(
        name=tool_call.function.name,  # "read_file"
        arguments=args
    )
    # result = "File content of config.json:\n\n{...}"

3. The result is fed back to the LLM

messages.append({
    "role": "tool",
    "tool_call_id": "call_abc123",
    "name": "read_file",
    "content": result
})

4. The LLM sees the result and continues reasoning

{
  "role": "assistant",
  "content": "This configuration file contains the following settings: ..."
}
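
The four steps above can be put together into one small loop. The sketch below is illustrative only: fake_llm is a scripted stand-in for a real model, read_file returns canned text instead of touching the disk, and the function names are assumptions rather than nanobot's actual agent loop.

```python
import asyncio
import json
from typing import Any


async def read_file(path: str) -> str:
    """Stand-in tool; returns canned text instead of reading from disk."""
    return f"File content of {path}:\n\n(contents omitted)"


TOOLS = {"read_file": read_file}


async def fake_llm(messages: list[dict[str, Any]]) -> dict[str, Any]:
    """Scripted model: request a tool first, then answer once a result is present."""
    if messages[-1]["role"] == "tool":
        return {"role": "assistant", "content": "Done reading config.json."}
    return {
        "role": "assistant",
        "content": None,
        "tool_calls": [{
            "id": "call_abc123",
            "type": "function",
            "function": {
                "name": "read_file",
                "arguments": json.dumps({"path": "config.json"}),
            },
        }],
    }


async def run(user_text: str, max_iterations: int = 5) -> list[dict[str, Any]]:
    messages: list[dict[str, Any]] = [{"role": "user", "content": user_text}]
    for _ in range(max_iterations):
        reply = await fake_llm(messages)          # step 1: the model decides
        messages.append(reply)
        tool_calls = reply.get("tool_calls")
        if not tool_calls:
            break                                 # step 4: final answer
        for call in tool_calls:
            fn = call["function"]
            args = json.loads(fn["arguments"])    # step 2: parse and execute
            result = await TOOLS[fn["name"]](**args)
            messages.append({                     # step 3: feed the result back
                "role": "tool",
                "tool_call_id": call["id"],
                "name": fn["name"],
                "content": result,
            })
    return messages


history = asyncio.run(run("Read the config.json file"))
```

The max_iterations cap is a design choice of this sketch: it keeps a model that requests tools indefinitely from looping forever.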

Parameter Validation

Tools define their parameters with JSON Schema:

@property
def parameters(self) -> dict[str, Any]:
    return {
        "type": "object",
        "properties": {
            "path": {
                "type": "string",
                "description": "File path"
            },
            "content": {
                "type": "string",
                "description": "Content to write"
            }
        },
        "required": ["path", "content"]
    }

The LLM usually generates arguments that match the schema, but this is not guaranteed, so it is best to validate again inside execute():

async def execute(self, path: str, content: str) -> str:
    if not path:
        return "Error: path cannot be empty"
    
    # Measure bytes rather than characters so the limit matches the message
    if len(content.encode("utf-8")) > 1_000_000:
        return "Error: content too large (max 1MB)"
    
    # ... actual logic
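
Checks like these can also be driven by the schema itself. The following is a minimal sketch that covers only required keys and top-level types; validate_args and WRITE_FILE_SCHEMA are hypothetical names for this example. A full validator such as the jsonschema package handles far more of the specification.

```python
from typing import Any

# Maps JSON Schema type names to Python types (a subset, for illustration)
_TYPES: dict[str, tuple[type, ...]] = {
    "string": (str,),
    "integer": (int,),
    "number": (int, float),
    "boolean": (bool,),
    "object": (dict,),
    "array": (list,),
}


def validate_args(schema: dict[str, Any], args: dict[str, Any]) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []
    props = schema.get("properties", {})
    for key in schema.get("required", []):
        if key not in args:
            problems.append(f"missing required parameter '{key}'")
    for key, value in args.items():
        if key not in props:
            problems.append(f"unknown parameter '{key}'")
            continue
        expected = props[key].get("type")
        allowed = _TYPES.get(expected)
        # bool is a subclass of int in Python, so reject it for numeric types
        is_bool_as_number = isinstance(value, bool) and expected in ("integer", "number")
        if allowed and (not isinstance(value, allowed) or is_bool_as_number):
            problems.append(f"parameter '{key}' should be of type {expected}")
    return problems


WRITE_FILE_SCHEMA = {
    "type": "object",
    "properties": {
        "path": {"type": "string", "description": "File path"},
        "content": {"type": "string", "description": "Content to write"},
    },
    "required": ["path", "content"],
}

ok = validate_args(WRITE_FILE_SCHEMA, {"path": "a.txt", "content": "hi"})
bad = validate_args(WRITE_FILE_SCHEMA, {"path": 1})
```

Returning the problems as text fits the rest of the design: the tool can join them into an error string, and the LLM can correct its arguments on the next attempt.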

Error Handling Best Practices

1. Catch all exceptions

async def execute(self, **kwargs) -> str:
    try:
        # Tool logic
        return "Success"
    except FileNotFoundError as e:
        return f"File not found: {e.filename}"
    except PermissionError:
        return "Permission denied"
    except Exception as e:
        return f"Unexpected error: {str(e)}"

2. Return detailed error messages

Give the LLM enough information to understand the problem and act on it:

# ❌ Bad
return "Error"

# ✅ Good
return "Error: File 'config.json' not found in /workspace. Available files: [...]"
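
A message of this kind can be assembled from the file system. The helper below is a sketch; describe_missing_file and its max_listed parameter are assumptions introduced for this example.

```python
import tempfile
from pathlib import Path


def describe_missing_file(path: str, max_listed: int = 10) -> str:
    """Build an error message naming the file, its directory, and nearby files."""
    target = Path(path).expanduser()
    parent = target.parent
    if not parent.is_dir():
        return f"Error: Directory '{parent}' does not exist"
    names = sorted(p.name for p in parent.iterdir())
    listed = ", ".join(names[:max_listed]) or "(empty directory)"
    if len(names) > max_listed:
        listed += f", ... ({len(names) - max_listed} more)"
    return f"Error: File '{target.name}' not found in {parent}. Available files: {listed}"


# Usage: a temporary directory with two files, then a lookup that misses
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "a.txt").write_text("x", encoding="utf-8")
    (Path(tmp) / "b.txt").write_text("y", encoding="utf-8")
    message = describe_missing_file(str(Path(tmp) / "config.json"))
```

Capping the listing keeps the message short for large directories, which matters because every tool result consumes context in the next LLM call.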

3. Log failures and calls

logger.error(f"Tool {self.name} failed: {e}")
logger.debug(f"Tool {self.name} called with args: {kwargs}")

Creating a Custom Tool

Example: a weather lookup tool

from typing import Any

import httpx
from nanobot.agent.tools.base import Tool

class WeatherTool(Tool):
    def __init__(self, api_key: str):
        self.api_key = api_key
    
    @property
    def name(self) -> str:
        return "get_weather"
    
    @property
    def description(self) -> str:
        return "Get current weather for a city"
    
    @property
    def parameters(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "City name (e.g., 'London', 'Tokyo')"
                },
                "units": {
                    "type": "string",
                    "enum": ["metric", "imperial"],
                    "description": "Temperature units",
                    "default": "metric"
                }
            },
            "required": ["city"]
        }
    
    async def execute(self, city: str, units: str = "metric") -> str:
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    "https://api.openweathermap.org/data/2.5/weather",
                    params={
                        "q": city,
                        "appid": self.api_key,
                        "units": units
                    }
                )
                response.raise_for_status()
                data = response.json()
                
                temp = data["main"]["temp"]
                desc = data["weather"][0]["description"]
                
                unit_symbol = "°C" if units == "metric" else "°F"
                return f"Weather in {city}: {desc}, {temp}{unit_symbol}"
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return f"City '{city}' not found"
            return f"Error fetching weather: {e}"
        except Exception as e:
            return f"Unexpected error: {str(e)}"

# Register the tool
agent.tools.register(WeatherTool(api_key="YOUR_API_KEY"))

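Combining Tools

The LLM can chain several tools to complete one request:

User: "Find all the Python files in the project and count their lines"

Iteration 1:
  LLM → exec(command="find . -name '*.py'")
  Tool → returns the file list
  
Iteration 2:
  LLM → exec(command="wc -l file1.py file2.py ...")
  Tool → returns the line counts
  
Iteration 3:
  LLM → summarizes the result: "Found 15 Python files with 2,345 lines of code in total"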

Performance Optimization

1. Cache results

class CachedWebFetchTool(WebFetchTool):
    def __init__(self):
        super().__init__()
        self._cache: dict[str, str] = {}
    
    async def execute(self, url: str) -> str:
        if url in self._cache:
            return self._cache[url]
        
        result = await super().execute(url)
        self._cache[url] = result
        return result
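
The dictionary cache above never expires entries, grows without bound, and also stores error strings. One way to address all three is sketched below. TTLCache and its parameters are hypothetical names, and the rule of skipping results that start with "Error" relies on the error-string convention used by the tools in this chapter.

```python
import asyncio
import time
from typing import Awaitable, Callable


class TTLCache:
    """Tiny cache for async fetches: entries expire after ttl seconds."""

    def __init__(self, ttl: float = 300.0, max_entries: int = 128,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl
        self._max = max_entries
        self._clock = clock  # injectable so tests can control time
        self._data: dict[str, tuple[float, str]] = {}

    async def get_or_fetch(
        self, key: str, fetch: Callable[[str], Awaitable[str]]
    ) -> str:
        entry = self._data.get(key)
        if entry and self._clock() - entry[0] < self._ttl:
            return entry[1]  # fresh hit
        result = await fetch(key)
        if not result.startswith("Error"):  # do not cache failures
            self._data.pop(key, None)       # drop any expired entry for this key
            if len(self._data) >= self._max:
                self._data.pop(next(iter(self._data)))  # evict the oldest insertion
            self._data[key] = (self._clock(), result)
        return result


fetch_log: list[str] = []
now = [0.0]  # fake clock value, advanced manually below


async def fake_fetch(url: str) -> str:
    fetch_log.append(url)
    return f"Page: {url}"


async def demo() -> None:
    cache = TTLCache(ttl=10.0, clock=lambda: now[0])
    await cache.get_or_fetch("https://example.com", fake_fetch)  # miss
    await cache.get_or_fetch("https://example.com", fake_fetch)  # hit
    now[0] = 11.0
    await cache.get_or_fetch("https://example.com", fake_fetch)  # expired, refetch


asyncio.run(demo())
```

A subclass like CachedWebFetchTool could hold one such cache and pass super().execute as the fetch function.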

2. Timeout control

async def execute(self, command: str) -> str:
    try:
        result = await asyncio.wait_for(
            self._run_command(command),
            timeout=30.0  # 30-second timeout
        )
        return result
    except asyncio.TimeoutError:
        return "Error: Command timed out after 30 seconds"
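
One detail worth knowing: asyncio.wait_for() cancels the awaited coroutine on timeout, but it does not terminate a child process that the coroutine started. The sketch below shows one way to clean up explicitly. The run_with_timeout helper is an assumption for this example, and it uses create_subprocess_exec with an argument list instead of a shell string.

```python
import asyncio
import sys


async def run_with_timeout(argv: list[str], timeout: float = 30.0) -> str:
    """Run a program and kill it if it exceeds the timeout."""
    process = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, _ = await asyncio.wait_for(process.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        process.kill()        # the timeout alone does not stop the child
        await process.wait()  # reap the process so it does not linger
        return f"Error: Command timed out after {timeout:g} seconds"
    return stdout.decode(errors="replace")


# Usage: one command that outlives its timeout and one that finishes in time
slow = [sys.executable, "-c", "import time; time.sleep(5)"]
fast = [sys.executable, "-c", "print('ok')"]
timed_out = asyncio.run(run_with_timeout(slow, timeout=0.5))
finished = asyncio.run(run_with_timeout(fast, timeout=10.0))
```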

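3. Concurrent execution

If the LLM calls several independent tools in one turn, they can be executed concurrently:

# Sequential execution (current implementation)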
for tool_call in response.tool_calls:
    result = await self.tools.execute(tool_call.name, tool_call.arguments)

# Concurrent execution (optimized)
tasks = [
    self.tools.execute(tc.name, tc.arguments)
    for tc in response.tool_calls
]
results = await asyncio.gather(*tasks)
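
When running calls concurrently, each result still has to be matched to the tool call that produced it. A minimal sketch follows, with assumptions stated: execute_tool stands in for the registry's execute(), tool calls are plain dicts, and "boom" is a made-up tool name that simulates an unexpected failure.

```python
import asyncio
from typing import Any


async def execute_tool(name: str, arguments: dict[str, Any]) -> str:
    """Stand-in for registry.execute(); 'boom' simulates an unexpected failure."""
    await asyncio.sleep(0.01)
    if name == "boom":
        raise RuntimeError("simulated failure")
    return f"{name} ok"


async def run_concurrently(tool_calls: list[dict[str, Any]]) -> list[dict[str, str]]:
    results = await asyncio.gather(
        *(execute_tool(tc["name"], tc["arguments"]) for tc in tool_calls),
        return_exceptions=True,  # collect failures instead of raising the first one
    )
    # gather() preserves input order, so results line up with tool_calls
    return [
        {
            "role": "tool",
            "tool_call_id": tc["id"],
            "content": f"Error: {r}" if isinstance(r, Exception) else r,
        }
        for tc, r in zip(tool_calls, results)
    ]


calls = [
    {"id": "call_1", "name": "read_file", "arguments": {"path": "a.txt"}},
    {"id": "call_2", "name": "boom", "arguments": {}},
]
tool_messages = asyncio.run(run_concurrently(calls))
```

Since the registry's execute() already converts exceptions into error strings, return_exceptions=True is only a second safety net here. Concurrency is safe only when the calls really are independent; two writes to the same file, for example, should stay sequential.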

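Summary

After this chapter, you should understand:

  • ✅ The abstract design of tools (the Tool base class)
  • ✅ How the tool registry is implemented
  • ✅ What each built-in tool does and how to use it
  • ✅ The complete tool execution flow
  • ✅ How to create a custom tool

Key Takeaways

  • Tools are the only way the Agent interacts with the outside world
  • All tools follow one interface (name, description, parameters, execute)
  • JSON Schema defines the parameter format
  • Error handling is critical (return error messages the LLM can understand)

Next: 10-技能系统.md (Skill System) - learn how to extend the Agent's capabilities.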