14 定时任务
第六层:定时任务
📌 核心文件:
nanobot/cron/service.py
配置文件:~/.nanobot/cron.json
概述
定时任务(Cron)允许 Agent 在指定时间或周期性地执行任务,无需用户手动触发。
CronJob 模型
class CronJob(BaseModel):
"""定时任务模型"""
id: str # 任务唯一 ID
name: str # 任务名称
message: str # 要发送的消息
cron_expr: str | None = None # Cron 表达式
interval_seconds: int | None = None # 间隔秒数
enabled: bool = True # 是否启用
channel: str | None = None # 目标渠道
to: str | None = None # 接收者
两种定时方式
1. Cron 表达式
# 格式:分 时 日 月 周
# * * * * *
# 示例
"0 9 * * *" # 每天 9:00
"*/30 * * * *" # 每 30 分钟
"0 */2 * * *" # 每 2 小时
"0 0 * * 1" # 每周一 00:00
"0 12 1 * *" # 每月 1 号 12:00
2. 固定间隔
interval_seconds=3600 # 每小时
interval_seconds=86400 # 每天
interval_seconds=300 # 每 5 分钟
CronService 实现
from croniter import croniter
from datetime import datetime
class CronService:
"""定时任务服务"""
def __init__(self, cron_file: Path):
self.cron_file = cron_file
self.jobs: list[CronJob] = []
self.callback: Callable | None = None
self._running = False
# 加载任务
self.load_jobs()
def load_jobs(self):
"""从文件加载任务"""
if self.cron_file.exists():
data = json.loads(self.cron_file.read_text())
self.jobs = [CronJob(**job) for job in data]
def save_jobs(self):
"""保存任务到文件"""
data = [job.model_dump() for job in self.jobs]
self.cron_file.write_text(json.dumps(data, indent=2))
def set_callback(self, callback: Callable[[CronJob], Awaitable[None]]):
"""设置任务触发回调"""
self.callback = callback
async def start(self):
"""启动定时任务循环"""
self._running = True
while self._running:
now = datetime.now()
for job in self.jobs:
if not job.enabled:
continue
if await self._should_run(job, now):
if self.callback:
await self.callback(job)
# 更新下次执行时间
self._update_next_run(job, now)
# 每分钟检查一次
await asyncio.sleep(60)
async def _should_run(self, job: CronJob, now: datetime) -> bool:
"""判断任务是否应该执行"""
if job.cron_expr:
# 使用 croniter
cron = croniter(job.cron_expr, now)
next_run = cron.get_next(datetime)
# 如果下次执行时间在过去 1 分钟内,则执行
diff = (next_run - now).total_seconds()
return -60 < diff <= 0
elif job.interval_seconds:
# 检查是否到达间隔时间
last_run = getattr(job, '_last_run', None)
if last_run is None:
return True # 首次运行
elapsed = (now - last_run).total_seconds()
return elapsed >= job.interval_seconds
return False
def _update_next_run(self, job: CronJob, now: datetime):
"""更新任务的最后执行时间"""
job._last_run = now
def add_job(self, job: CronJob):
"""添加任务"""
self.jobs.append(job)
self.save_jobs()
def remove_job(self, job_id: str):
"""删除任务"""
self.jobs = [j for j in self.jobs if j.id != job_id]
self.save_jobs()
def enable_job(self, job_id: str, enabled: bool = True):
"""启用/禁用任务"""
for job in self.jobs:
if job.id == job_id:
job.enabled = enabled
self.save_jobs()
break
在 Gateway 中使用
# 初始化
cron_service = CronService(workspace / "cron.json")
# 设置回调:将定时消息发布到消息总线
async def on_cron_job(job: CronJob):
await bus.publish_inbound(InboundMessage(
channel=job.channel or "cli",
sender_id="cron",
chat_id=job.to or "system",
content=job.message,
session_key=f"cron:{job.id}"
))
cron_service.set_callback(on_cron_job)
# 并发运行
await asyncio.gather(
agent.run(),
bus.dispatch_outbound(),
channel_manager.start(),
cron_service.start(), # 定时任务服务
)
CLI 命令
添加任务
# Cron 表达式
nanobot cron add \
--name "morning" \
--message "早上好!今天的任务..." \
--cron "0 9 * * *"
# 固定间隔
nanobot cron add \
--name "hourly" \
--message "检查系统状态" \
--every 3600
列出任务
nanobot cron list
nanobot cron list --all # 包括禁用的
删除任务
nanobot cron remove <job_id>
启用/禁用
nanobot cron enable <job_id>
nanobot cron enable <job_id> --disable
手动执行
nanobot cron run <job_id>
使用场景
1. 每日提醒
{
"id": "daily-standup",
"name": "日报提醒",
"message": "写今天的日报",
"cron_expr": "0 18 * * 1-5", // 工作日 18:00
"channel": "telegram",
"to": "123456"
}
2. 系统监控
{
"id": "health-check",
"name": "健康检查",
"message": "检查所有服务的运行状态",
"interval_seconds": 600, // 每 10 分钟
"channel": "cli"
}
3. 定期报告
{
"id": "weekly-report",
"name": "周报",
"message": "生成本周的工作总结",
"cron_expr": "0 17 * * 5", // 每周五 17:00
"channel": "telegram",
"to": "123456"
}
与子代理的结合
定时任务可以触发子代理:
{
"message": "spawn a subagent to check website status every hour"
}
Agent 理解后会创建子代理处理。
小结
- ✅ 支持 Cron 表达式和固定间隔
- ✅ 任务持久化到 JSON 文件
- ✅ 通过消息总线触发 Agent
- ✅ 支持多渠道通知
下一步:15-心跳机制.md