第六层:定时任务

📌 核心文件nanobot/cron/service.py
配置文件~/.nanobot/cron.json

概述

定时任务(Cron)允许 Agent 在指定时间或周期性地执行任务,无需用户手动触发。

CronJob 模型

class CronJob(BaseModel):
    """定时任务模型"""
    id: str                         # 任务唯一 ID
    name: str                       # 任务名称
    message: str                    # 要发送的消息
    cron_expr: str | None = None    # Cron 表达式
    interval_seconds: int | None = None  # 间隔秒数
    enabled: bool = True            # 是否启用
    channel: str | None = None      # 目标渠道
    to: str | None = None           # 接收者

两种定时方式

1. Cron 表达式

# 格式:分 时 日 月 周
# *    *  *  *  *

# 示例
"0 9 * * *"      # 每天 9:00
"*/30 * * * *"   # 每 30 分钟
"0 */2 * * *"    # 每 2 小时
"0 0 * * 1"      # 每周一 00:00
"0 12 1 * *"     # 每月 1 号 12:00

2. 固定间隔

interval_seconds=3600    # 每小时
interval_seconds=86400   # 每天
interval_seconds=300     # 每 5 分钟

CronService 实现

from croniter import croniter
from datetime import datetime

class CronService:
    """定时任务服务"""
    
    def __init__(self, cron_file: Path):
        self.cron_file = cron_file
        self.jobs: list[CronJob] = []
        self.callback: Callable | None = None
        self._running = False
        
        # 加载任务
        self.load_jobs()
    
    def load_jobs(self):
        """从文件加载任务"""
        if self.cron_file.exists():
            data = json.loads(self.cron_file.read_text())
            self.jobs = [CronJob(**job) for job in data]
    
    def save_jobs(self):
        """保存任务到文件"""
        data = [job.model_dump() for job in self.jobs]
        self.cron_file.write_text(json.dumps(data, indent=2))
    
    def set_callback(self, callback: Callable[[CronJob], Awaitable[None]]):
        """设置任务触发回调"""
        self.callback = callback
    
    async def start(self):
        """启动定时任务循环"""
        self._running = True
        
        while self._running:
            now = datetime.now()
            
            for job in self.jobs:
                if not job.enabled:
                    continue
                
                if await self._should_run(job, now):
                    if self.callback:
                        await self.callback(job)
                    
                    # 更新下次执行时间
                    self._update_next_run(job, now)
            
            # 每分钟检查一次
            await asyncio.sleep(60)
    
    async def _should_run(self, job: CronJob, now: datetime) -> bool:
        """判断任务是否应该执行"""
        if job.cron_expr:
            # 使用 croniter
            cron = croniter(job.cron_expr, now)
            next_run = cron.get_next(datetime)
            
            # 如果下次执行时间在过去 1 分钟内,则执行
            diff = (next_run - now).total_seconds()
            return -60 < diff <= 0
        
        elif job.interval_seconds:
            # 检查是否到达间隔时间
            last_run = getattr(job, '_last_run', None)
            if last_run is None:
                return True  # 首次运行
            
            elapsed = (now - last_run).total_seconds()
            return elapsed >= job.interval_seconds
        
        return False
    
    def _update_next_run(self, job: CronJob, now: datetime):
        """更新任务的最后执行时间"""
        job._last_run = now
    
    def add_job(self, job: CronJob):
        """添加任务"""
        self.jobs.append(job)
        self.save_jobs()
    
    def remove_job(self, job_id: str):
        """删除任务"""
        self.jobs = [j for j in self.jobs if j.id != job_id]
        self.save_jobs()
    
    def enable_job(self, job_id: str, enabled: bool = True):
        """启用/禁用任务"""
        for job in self.jobs:
            if job.id == job_id:
                job.enabled = enabled
                self.save_jobs()
                break

在 Gateway 中使用

# 初始化
cron_service = CronService(workspace / "cron.json")

# 设置回调:将定时消息发布到消息总线
async def on_cron_job(job: CronJob):
    await bus.publish_inbound(InboundMessage(
        channel=job.channel or "cli",
        sender_id="cron",
        chat_id=job.to or "system",
        content=job.message,
        session_key=f"cron:{job.id}"
    ))

cron_service.set_callback(on_cron_job)

# 并发运行
await asyncio.gather(
    agent.run(),
    bus.dispatch_outbound(),
    channel_manager.start(),
    cron_service.start(),  # 定时任务服务
)

CLI 命令

添加任务

# Cron 表达式
nanobot cron add \
  --name "morning" \
  --message "早上好!今天的任务..." \
  --cron "0 9 * * *"

# 固定间隔
nanobot cron add \
  --name "hourly" \
  --message "检查系统状态" \
  --every 3600

列出任务

nanobot cron list
nanobot cron list --all  # 包括禁用的

删除任务

nanobot cron remove <job_id>

启用/禁用

nanobot cron enable <job_id>
nanobot cron enable <job_id> --disable

手动执行

nanobot cron run <job_id>

使用场景

1. 每日提醒

{
  "id": "daily-standup",
  "name": "日报提醒",
  "message": "写今天的日报",
  "cron_expr": "0 18 * * 1-5",  // 工作日 18:00
  "channel": "telegram",
  "to": "123456"
}

2. 系统监控

{
  "id": "health-check",
  "name": "健康检查",
  "message": "检查所有服务的运行状态",
  "interval_seconds": 600,  //  10 分钟
  "channel": "cli"
}

3. 定期报告

{
  "id": "weekly-report",
  "name": "周报",
  "message": "生成本周的工作总结",
  "cron_expr": "0 17 * * 5",  // 每周五 17:00
  "channel": "telegram",
  "to": "123456"
}

与子代理的结合

定时任务可以触发子代理:

{
  "message": "spawn a subagent to check website status every hour"
}

Agent 理解后会创建子代理处理。

小结

  • ✅ 支持 Cron 表达式和固定间隔
  • ✅ 任务持久化到 JSON 文件
  • ✅ 通过消息总线触发 Agent
  • ✅ 支持多渠道通知

下一步15-心跳机制.md