diff --git a/README.md b/README.md index 35687da..a7e4e80 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ > 连接 200+ 应用系统,实现企业数据自动化流转。Auto Task Runner 正是我们在 > AI 辅助研发实践中沉淀出的工程工具。 +--- + ## 特性 - 📁 **项目化架构** — 以项目为中心,支持多任务集、运行历史、模板管理 @@ -29,113 +31,31 @@ - 🛡️ **健壮可靠** — PTY 色彩保留、原子写入、优雅信号处理、git 安全标签 - ⏱️ **防误标** — AI CLI 执行低于 10s 自动标记失败(防止空跑) - 🕐 **防封号** — 任务间随机延时(默认 60-120s),降低被检测为机器人的风险 +- 📢 **企业微信通知** — 批次完成、任务失败、中断时自动推送(可选) --- -## 快速上手(5 分钟) - -### 第 1 步:安装 +## 快速上手 ```bash -git clone https://github.com/qeasy-cloud/auto-run-task.git -cd auto-run-task -pip install rich # 唯一依赖 -``` +# 1. 安装 +pip install rich -### 第 2 步:创建项目 +# 2. 创建项目 +python run.py project create MY_PROJECT --workspace /path/to/repo -```bash -python run.py project create MY_PROJECT \ - --workspace /path/to/your/repo \ - --description "我的批量修复项目" -``` - -这会在 `projects/MY_PROJECT/` 下生成项目骨架: - -``` -projects/MY_PROJECT/ -├── __init__.json # 项目配置 -├── templates/ -│ └── __init__.md # 默认 Prompt 模板(可编辑) -└── runtime/ # 运行时输出(自动生成) -``` - -### 第 3 步:编写 Prompt 模板 - -编辑 `projects/MY_PROJECT/templates/__init__.md`: - -```markdown -## Task: {{task_name}} - -### Description -{{description}} - -### Task Data -\`\`\`json -#item -\`\`\` - -### Instructions -1. Read the task description and understand the requirement -2. Implement the changes following project conventions -3. Verify your changes -``` +# 3. 
编写 templates/__init__.md 和 *.tasks.json(见下方结构说明) -- `{{key}}` — 替换为任务字段值(如 `{{task_name}}`, `{{description}}`) -- `#item` — 替换为整个任务对象的 JSON - -### 第 4 步:创建任务集 - -在项目目录下创建 `projects/MY_PROJECT/fix-bugs.tasks.json`: - -```json -{ - "template": "templates/__init__.md", - "tasks": [ - { - "task_no": "F-1", - "task_name": "修复用户登录验证", - "batch": 1, - "description": "用户登录时未校验密码强度", - "priority": 10, - "status": "not-started" - }, - { - "task_no": "F-2", - "task_name": "修复订单金额计算", - "batch": 1, - "description": "订单金额小数精度丢失", - "priority": 20, - "status": "not-started" - }, - { - "task_no": "F-3", - "task_name": "添加接口鉴权", - "batch": 2, - "description": "REST API 缺少 JWT 鉴权中间件", - "priority": 10, - "status": "not-started", - "depends_on": "F-1" - } - ] -} +# 4. 执行 +python run.py dry-run MY_PROJECT my-tasks # 预览 +python run.py run MY_PROJECT my-tasks # 执行 ``` -### 第 5 步:执行! - -```bash -# 先预览(不真正执行) -python run.py dry-run MY_PROJECT fix-bugs - -# 确认无误后执行 -python run.py run MY_PROJECT fix-bugs -``` +**完整操作指引、命令示例与典型工作流** → [用户操作指南](docs/USER_GUIDE.md) --- -## 命令速查表 - -### 总览 +## 命令速查 | 命令 | 说明 | | --- | --- | @@ -150,202 +70,6 @@ python run.py run MY_PROJECT fix-bugs | `list` | 列出任务集/任务 | | `status` | 项目状态仪表板 | -### 项目管理 - -```bash -# 创建项目 -python run.py project create FIX_CODE --workspace /path/to/repo --description "修复代码" - -# 列出所有项目 -python run.py project list - -# 查看项目详情(任务集、运行历史等) -python run.py project info FIX_CODE - -# 验证项目结构是否正确 -python run.py project validate FIX_CODE - -# 归档项目(标记为 archived) -python run.py project archive FIX_CODE -``` - -### 执行任务 - -```bash -# 基本执行(使用项目默认 tool/model) -python run.py run FIX_CODE code-quality-fix - -# 指定工具和模型 -python run.py run FIX_CODE code-quality-fix --tool agent --model opus-4.6 -python run.py run FIX_CODE code-quality-fix --tool kimi -python run.py run FIX_CODE code-quality-fix --tool copilot --model claude-opus-4.6 - -# 只运行指定批次 -python run.py run FIX_CODE code-quality-fix --batch 1 - -# 从指定任务开始(跳过前面的任务) -python run.py run FIX_CODE 
code-quality-fix --start F-3 - -# 只重跑失败的任务 -python run.py run FIX_CODE code-quality-fix --retry-failed - -# 代理控制 -python run.py run FIX_CODE code-quality-fix --proxy # 强制启用代理 -python run.py run FIX_CODE code-quality-fix --no-proxy # 强制关闭代理 - -# 自定义模板 -python run.py run FIX_CODE code-quality-fix --template templates/custom.md - -# 指定工作目录(覆盖项目配置) -python run.py run FIX_CODE code-quality-fix --work-dir /other/repo - -# Git 安全模式(执行前自动创建 git tag 作为回退点) -python run.py run FIX_CODE code-quality-fix --git-safety - -# 任务间延时控制(防止被检测为机器人) -python run.py run FIX_CODE code-quality-fix --delay 60-120 # 随机 60~120s(默认) -python run.py run FIX_CODE code-quality-fix --delay 30 # 固定 30s -python run.py run FIX_CODE code-quality-fix --delay 0 # 不延时 - -# 输出控制 -python run.py run FIX_CODE code-quality-fix --verbose # 详细模式 -python run.py run FIX_CODE code-quality-fix --quiet # 安静模式 -python run.py run FIX_CODE code-quality-fix --no-color # 无颜色(CI 环境) - -# 心跳间隔 -python run.py run FIX_CODE code-quality-fix --heartbeat 30 # 每 30s 打印一次状态 -``` - -### 重置任务状态 - -当你需要重新执行任务时,先重置状态再运行: - -```bash -# 重置所有失败的任务 -python run.py reset FIX_CODE code-quality-fix --status failed - -# 重置所有被中断的任务 -python run.py reset FIX_CODE code-quality-fix --status interrupted - -# 从 F-3 开始的所有任务重置 -python run.py reset FIX_CODE code-quality-fix --from F-3 - -# 重置全部任务(完全重跑) -python run.py reset FIX_CODE code-quality-fix --all - -# 只重置第 2 批中失败的任务 -python run.py reset FIX_CODE code-quality-fix --status failed --batch 2 - -# 重置后执行 -python run.py reset FIX_CODE code-quality-fix --status failed -python run.py run FIX_CODE code-quality-fix --retry-failed - -# 或者重置后从某个任务开始执行 -python run.py reset FIX_CODE code-quality-fix --from F-3 -python run.py run FIX_CODE code-quality-fix --start F-3 -``` - -### Dry-run 预览 - -```bash -# 生成 prompt 但不执行(检查渲染结果) -python run.py dry-run FIX_CODE code-quality-fix - -# 预览指定批次 -python run.py dry-run FIX_CODE code-quality-fix --batch 1 -``` - -### 列出任务 - -```bash -# 列出项目内所有任务集 -python run.py list 
FIX_CODE - -# 列出特定任务集的任务 -python run.py list FIX_CODE code-quality-fix - -# 按状态过滤 -python run.py list FIX_CODE code-quality-fix --status failed -python run.py list FIX_CODE code-quality-fix --status completed -python run.py list FIX_CODE code-quality-fix --status not-started -``` - -### 状态仪表板 - -```bash -# 全局仪表板(所有项目概览) -python run.py status - -# 单项目详情 -python run.py status FIX_CODE -``` - ---- - -## 典型工作流 - -### 场景 1:批量修复 → 检查 → 重跑失败 - -```bash -# 1. 创建项目 -python run.py project create BUG_FIX --workspace /home/user/my-app - -# 2. 编写任务集 + 模板(见上方说明) - -# 3. 预览确认 -python run.py dry-run BUG_FIX fix-bugs - -# 4. 执行全部任务 -python run.py run BUG_FIX fix-bugs - -# 5. 查看结果 -python run.py list BUG_FIX fix-bugs --status failed -python run.py status BUG_FIX - -# 6. 重跑失败的任务 -python run.py run BUG_FIX fix-bugs --retry-failed - -# 7. 如果需要完全重跑某些任务 -python run.py reset BUG_FIX fix-bugs --from F-5 -python run.py run BUG_FIX fix-bugs --start F-5 -``` - -### 场景 2:分批执行大量任务 - -```bash -# 先跑第 1 批(基础任务) -python run.py run MY_PROJECT migration --batch 1 - -# 手动检查结果后,再跑第 2 批 -python run.py run MY_PROJECT migration --batch 2 - -# 最后跑第 3 批 -python run.py run MY_PROJECT migration --batch 3 -``` - -### 场景 3:不同任务用不同 AI 工具 - -在 `.tasks.json` 中为不同任务指定不同的 tool/model: - -```json -{ - "tasks": [ - { "task_no": "T-1", "cli": { "tool": "kimi" }, "..." : "..." }, - { "task_no": "T-2", "cli": { "tool": "agent", "model": "opus-4.6" }, "..." : "..." }, - { "task_no": "T-3", "cli": { "tool": "copilot", "model": "claude-opus-4.6" }, "..." : "..." 
} - ] -} -``` - -### 场景 4:中断后继续 - -```bash -# 执行过程中按 CTRL+C 优雅中断 -# 已完成的任务状态已保存,再次运行会自动跳过已完成的任务 -python run.py run MY_PROJECT my-tasks -# → 自动从上次中断的位置继续 -``` - --- ## 支持的工具 @@ -357,175 +81,35 @@ python run.py run MY_PROJECT my-tasks | `copilot` | `claude-opus-4.6` | ✓ | GitHub Copilot CLI | | `claude` | 固定 | ✓ | Claude CLI(单模型) | +--- + ## 项目结构 ``` auto-run-task/ -├── run.py # 入口 (子命令分发) -├── task_runner/ -│ ├── __init__.py # v3.0.0 -│ ├── cli.py # 子命令架构 + Legacy 兼容 -│ ├── config.py # 工具/模型配置 -│ ├── display/ # Rich 终端显示(模块化) -│ │ ├── __init__.py # 统一导出 -│ │ ├── core.py # Console 单例 & 常量 -│ │ ├── banners.py # 启动横幅 -│ │ ├── tasks.py # 任务列表 & 执行展示 -│ │ ├── tracker.py # Rich Live 实时面板 -│ │ ├── summary.py # 执行摘要 & 进度条 -│ │ ├── projects.py # 项目仪表板 -│ │ └── messages.py # 错误/警告/提示消息 -│ ├── executor.py # PTY 任务执行引擎 -│ ├── renderer.py # 模板渲染 -│ ├── state.py # Legacy 状态管理 -│ ├── project.py # 项目 CRUD + 验证 -│ ├── task_set.py # 任务集加载/验证/保存 -│ ├── runtime.py # 运行时目录管理 -│ ├── scheduler.py # 调度器(排序/过滤/依赖) -│ ├── validators.py # 验证框架 -│ └── commands/ # 命令处理器 -│ ├── project_cmd.py # project 子命令 -│ ├── run_cmd.py # run 子命令 -│ ├── dryrun_cmd.py # dry-run 子命令 -│ ├── reset_cmd.py # reset 子命令 -│ ├── list_cmd.py # list 子命令 -│ └── status_cmd.py # status 子命令 -├── projects/ # 项目目录 (gitignored) -│ └── EXAMPLE/ # 示例项目 -└── example/ # 旧版示例(供参考) -``` - -### 项目目录结构 - +├── run.py # 入口 +├── task_runner/ # 核心模块 +├── projects/ # 项目目录(gitignored) +│ └── MY_PROJECT/ +│ ├── __init__.json # 项目配置 +│ ├── *.tasks.json # 任务集 +│ ├── templates/ # Prompt 模板 +│ └── runtime/ # 运行输出 +└── docs/ + └── USER_GUIDE.md # 用户操作指南 ``` -projects/FIX_CODE/ -├── __init__.json # 项目元数据(必须) -├── code-quality-fix.tasks.json # 任务集文件(可多个) -├── feature-dev.tasks.json -├── templates/ # 提示词模板目录 -│ ├── __init__.md # 默认模板(必须) -│ └── custom-fix.md # 自定义模板 -└── runtime/ # 运行时输出 - ├── runs/ # 按运行记录存储 - │ └── 2024-06-01_10-00-00__code-quality-fix/ - │ ├── run.json # 运行元数据 - │ ├── prompts/ # 渲染后的 prompt - │ ├── logs/ # 执行日志(.log 原始 + 
.clean.log 净化版) - │ └── summary.json # 运行摘要 - ├── latest -> runs/... # 最新运行的软链接 - └── backups/ # 任务集备份 -``` - -## 数据结构 - -### `__init__.json` — 项目配置 - -```json -{ - "project": "FIX_CODE", - "description": "A project to fix code issues", - "workspace": "/home/user/workspace/my-repo", - "status": "planned", - "created_at": "2024-06-01_10-00-00", - "default_tool": "copilot", - "default_model": "claude-opus-4.6", - "tags": ["code-quality"], - "run_record": [ - { - "run_at": "2024-06-01_10-00-00", - "stop_at": "2024-06-01_12-00-00", - "cumulated_minutes": 120, - "status": "completed", - "task_set_name": "code-quality-fix", - "tasks_attempted": 6, - "tasks_succeeded": 5, - "tasks_failed": 1 - } - ] -} -``` - -### `.tasks.json` — 任务集 - -```json -{ - "template": "templates/__init__.md", - "tasks": [ - { - "task_no": "F-1", - "task_name": "创建 Product 模型", - "batch": 1, - "description": "创建 Product 模型,包含 name, code 等字段", - "priority": 10, - "status": "not-started", - "depends_on": null, - "cli": { "tool": "copilot", "model": "claude-opus-4.6" } - } - ] -} -``` - -**任务字段说明:** - -| 字段 | 必填 | 说明 | -| --- | --- | --- | -| `task_no` | ✓ | 任务编号(如 `F-1`, `RT-001`),全局唯一 | -| `task_name` | ✓ | 任务名称 | -| `batch` | | 批次号(默认 1),同批次内按 priority 排序 | -| `description` | | 任务描述,渲染到 prompt 模板 | -| `priority` | | 优先级(越小越先执行,默认 50) | -| `status` | | 状态:`not-started` / `in-progress` / `completed` / `failed` / `interrupted` | -| `prompt` | | 任务级模板覆盖(相对路径) | -| `cli.tool` | | 任务级工具覆盖 | -| `cli.model` | | 任务级模型覆盖 | -| `depends_on` | | 依赖的任务编号 | - -### 默认值解析链 - -任务的 `tool` 和 `model` 按以下优先级解析: - -1. **任务级** — `task.cli.tool` / `task.cli.model` -2. **命令行级** — `--tool` / `--model` -3. **项目级** — `__init__.json` 中的 `default_tool` / `default_model` -4. 
**全局默认** — `copilot` / `claude-opus-4.6` - -## Prompt 模板格式 - -模板使用两种占位符: -| 占位符 | 替换为 | 示例 | -| --------- | ---------------------------- | -------------------------- | -| `{{key}}` | `task[key]` 的值 | `{{task_name}}` → 任务名称 | -| `#item` | 整个 task 对象的 JSON 字符串 | 完整任务上下文 | - -如果值是 dict/list 类型,会自动序列化为 JSON 字符串。 +--- -## 代理控制逻辑 +## 核心概念 -| 工具 | `--proxy` | `--no-proxy` | 默认行为 | -| ------- | --------- | ------------ | ------------ | -| kimi | 启用代理 | 关闭代理 | **关闭代理** | -| agent | 启用代理 | 关闭代理 | **启用代理** | -| copilot | 启用代理 | 关闭代理 | **启用代理** | -| claude | 启用代理 | 关闭代理 | **启用代理** | +- **`{{key}}`** — 模板占位符,替换为 `task[key]` 的值 +- **`#item`** — 替换为整个任务对象的 JSON +- **任务字段** — `task_no`, `task_name`, `batch`, `priority`, `status`, `depends_on`, `cli.tool`, `cli.model` 等 -## 执行安全机制 +详见 [用户操作指南 - 数据结构](docs/USER_GUIDE.md#数据结构详解)。 -| 机制 | 说明 | -| --- | --- | -| **最短执行时间** | AI CLI 执行不足 10 秒自动标记为失败(防止空跑误标成功) | -| **任务间延时** | 默认随机等待 60-120 秒,降低触发反爬/封号风险,`--delay 0` 可关闭 | -| **PTY 色彩保留** | 使用伪终端执行,AI CLI 的彩色输出原样呈现 | -| **自动降级** | PTY 不可用时自动切换 PIPE 模式 | -| **日志全量捕获** | 终端实时输出的同时写入日志文件,同时生成去噪净化版 `.clean.log` | -| **心跳 & 标题** | 长时间运行时定期打印状态,终端标题显示任务进度 | -| **优雅中断** | 第一次 CTRL+C 优雅终止当前任务并保存状态,第二次强制退出 | -| **状态持久化** | 每个任务完成后立即更新 JSON,崩溃后可从断点续跑 | -| **原子写入** | JSON 保存使用 tmp + rename,防止写入中途断电损坏 | -| **自动备份** | 执行前自动备份 .tasks.json 文件 | -| **运行历史** | 每次运行自动记录到 __init__.json | -| **latest 软链接** | runtime/latest 始终指向最新运行目录 | -| **Git 安全** | --git-safety 执行前检查 git 状态并创建安全 tag | +--- ## 环境要求 @@ -534,44 +118,24 @@ projects/FIX_CODE/ - 对应的 AI CLI 工具已安装并在 PATH 中 - 需要代理的工具,确保系统已配置 `HTTP_PROXY` / `HTTPS_PROXY` 环境变量 -## 调试 - -```bash -DEBUG=1 python run.py run MY_PROJECT my-tasks -``` - -## Legacy 兼容(已弃用) - -旧版 `--plan` 模式仍可使用,但会显示弃用警告,将在 v4.0 移除: - -```bash -python run.py --plan plan.json --project my-fix --template prompt.md -``` +--- ## 开源信息 ### 许可证 -本项目基于 [MIT License](LICENSE) 开源。您可以自由使用、修改和分发本软件。 +本项目基于 [MIT License](LICENSE) 开源。 ### 作者 **广东轻亿云软件科技有限公司(QeasyCloud)** 研发团队 -- 🏢 公司:广东轻亿云软件科技有限公司 - 🌐 
官网:[https://www.qeasy.cloud](https://www.qeasy.cloud) -- 🚀 核心产品:[轻易云数据集成平台](https://www.qeasy.cloud) — 连接 200+ 应用,一站式企业数据集成 - 📦 GitHub:[https://github.com/qeasy-cloud](https://github.com/qeasy-cloud) -### 相关开源项目 - -| 项目 | 说明 | -| --- | --- | -| [auto-run-task](https://github.com/qeasy-cloud/auto-run-task) | AI Agent CLI 批量任务执行引擎(本项目) | - ### 贡献 -欢迎提交 Issue 和 Pull Request!请参阅项目的 GitHub 仓库参与贡献。 +欢迎提交 Issue 和 Pull Request! --- diff --git a/docs/USER_GUIDE.md b/docs/USER_GUIDE.md new file mode 100644 index 0000000..e07601f --- /dev/null +++ b/docs/USER_GUIDE.md @@ -0,0 +1,466 @@ +# Auto Task Runner — 用户操作指南 + +> 详细的操作指引、命令示例与典型工作流。项目概览请参阅 [README](../README.md)。 + +--- + +## 快速上手(5 分钟) + +### 第 1 步:安装 + +```bash +git clone https://github.com/qeasy-cloud/auto-run-task.git +cd auto-run-task +pip install rich # 唯一依赖 +``` + +### 第 2 步:创建项目 + +```bash +python run.py project create MY_PROJECT \ + --workspace /path/to/your/repo \ + --description "我的批量修复项目" +``` + +这会在 `projects/MY_PROJECT/` 下生成项目骨架: + +``` +projects/MY_PROJECT/ +├── __init__.json # 项目配置 +├── templates/ +│ └── __init__.md # 默认 Prompt 模板(可编辑) +└── runtime/ # 运行时输出(自动生成) +``` + +### 第 3 步:编写 Prompt 模板 + +编辑 `projects/MY_PROJECT/templates/__init__.md`: + +```markdown +## Task: {{task_name}} + +### Description +{{description}} + +### Task Data +\`\`\`json +#item +\`\`\` + +### Instructions +1. Read the task description and understand the requirement +2. Implement the changes following project conventions +3. 
Verify your changes +``` + +- `{{key}}` — 替换为任务字段值(如 `{{task_name}}`, `{{description}}`) +- `#item` — 替换为整个任务对象的 JSON + +### 第 4 步:创建任务集 + +在项目目录下创建 `projects/MY_PROJECT/fix-bugs.tasks.json`: + +```json +{ + "template": "templates/__init__.md", + "tasks": [ + { + "task_no": "F-1", + "task_name": "修复用户登录验证", + "batch": 1, + "description": "用户登录时未校验密码强度", + "priority": 10, + "status": "not-started" + }, + { + "task_no": "F-2", + "task_name": "修复订单金额计算", + "batch": 1, + "description": "订单金额小数精度丢失", + "priority": 20, + "status": "not-started" + }, + { + "task_no": "F-3", + "task_name": "添加接口鉴权", + "batch": 2, + "description": "REST API 缺少 JWT 鉴权中间件", + "priority": 10, + "status": "not-started", + "depends_on": "F-1" + } + ] +} +``` + +### 第 5 步:执行! + +```bash +# 先预览(不真正执行) +python run.py dry-run MY_PROJECT fix-bugs + +# 确认无误后执行 +python run.py run MY_PROJECT fix-bugs +``` + +--- + +## 命令速查与示例 + +### 项目管理 + +```bash +# 创建项目 +python run.py project create FIX_CODE --workspace /path/to/repo --description "修复代码" + +# 列出所有项目 +python run.py project list + +# 查看项目详情(任务集、运行历史等) +python run.py project info FIX_CODE + +# 验证项目结构是否正确 +python run.py project validate FIX_CODE + +# 归档项目(标记为 archived) +python run.py project archive FIX_CODE +``` + +### 执行任务 + +```bash +# 基本执行(使用项目默认 tool/model) +python run.py run FIX_CODE code-quality-fix + +# 指定工具和模型 +python run.py run FIX_CODE code-quality-fix --tool agent --model opus-4.6 +python run.py run FIX_CODE code-quality-fix --tool kimi +python run.py run FIX_CODE code-quality-fix --tool copilot --model claude-opus-4.6 + +# 只运行指定批次 +python run.py run FIX_CODE code-quality-fix --batch 1 + +# 从指定任务开始(跳过前面的任务) +python run.py run FIX_CODE code-quality-fix --start F-3 + +# 只重跑失败的任务 +python run.py run FIX_CODE code-quality-fix --retry-failed + +# 代理控制 +python run.py run FIX_CODE code-quality-fix --proxy # 强制启用代理 +python run.py run FIX_CODE code-quality-fix --no-proxy # 强制关闭代理 + +# 自定义模板 +python run.py run FIX_CODE code-quality-fix --template 
templates/custom.md + +# 指定工作目录(覆盖项目配置) +python run.py run FIX_CODE code-quality-fix --work-dir /other/repo + +# Git 安全模式(执行前自动创建 git tag 作为回退点) +python run.py run FIX_CODE code-quality-fix --git-safety + +# 任务间延时控制(防止被检测为机器人) +python run.py run FIX_CODE code-quality-fix --delay 60-120 # 随机 60~120s(默认) +python run.py run FIX_CODE code-quality-fix --delay 30 # 固定 30s +python run.py run FIX_CODE code-quality-fix --delay 0 # 不延时 + +# 企业微信通知(需配置 TASK_RUNNER_WECOM_WEBHOOK 环境变量) +python run.py run FIX_CODE code-quality-fix # 默认启用 +python run.py run FIX_CODE code-quality-fix --no-notify # 关闭通知 +python run.py run FIX_CODE code-quality-fix --notify-each # 每个任务完成都通知 +python run.py run FIX_CODE code-quality-fix --wecom-webhook "https://..." # 命令行指定 webhook + +# 输出控制 +python run.py run FIX_CODE code-quality-fix --verbose # 详细模式 +python run.py run FIX_CODE code-quality-fix --quiet # 安静模式 +python run.py run FIX_CODE code-quality-fix --no-color # 无颜色(CI 环境) + +# 心跳间隔 +python run.py run FIX_CODE code-quality-fix --heartbeat 30 # 每 30s 打印一次状态 +``` + +### 重置任务状态 + +当你需要重新执行任务时,先重置状态再运行: + +```bash +# 重置所有失败的任务 +python run.py reset FIX_CODE code-quality-fix --status failed + +# 重置所有被中断的任务 +python run.py reset FIX_CODE code-quality-fix --status interrupted + +# 从 F-3 开始的所有任务重置 +python run.py reset FIX_CODE code-quality-fix --from F-3 + +# 重置全部任务(完全重跑) +python run.py reset FIX_CODE code-quality-fix --all + +# 只重置第 2 批中失败的任务 +python run.py reset FIX_CODE code-quality-fix --status failed --batch 2 + +# 重置后执行 +python run.py reset FIX_CODE code-quality-fix --status failed +python run.py run FIX_CODE code-quality-fix --retry-failed + +# 或者重置后从某个任务开始执行 +python run.py reset FIX_CODE code-quality-fix --from F-3 +python run.py run FIX_CODE code-quality-fix --start F-3 +``` + +### Dry-run 预览 + +```bash +# 生成 prompt 但不执行(检查渲染结果) +python run.py dry-run FIX_CODE code-quality-fix + +# 预览指定批次 +python run.py dry-run FIX_CODE code-quality-fix --batch 1 +``` + +### 列出任务 + +```bash +# 列出项目内所有任务集 +python 
run.py list FIX_CODE + +# 列出特定任务集的任务 +python run.py list FIX_CODE code-quality-fix + +# 按状态过滤 +python run.py list FIX_CODE code-quality-fix --status failed +python run.py list FIX_CODE code-quality-fix --status completed +python run.py list FIX_CODE code-quality-fix --status not-started +``` + +### 状态仪表板 + +```bash +# 全局仪表板(所有项目概览) +python run.py status + +# 单项目详情 +python run.py status FIX_CODE +``` + +--- + +## 典型工作流 + +### 场景 1:批量修复 → 检查 → 重跑失败 + +```bash +# 1. 创建项目 +python run.py project create BUG_FIX --workspace /home/user/my-app + +# 2. 编写任务集 + 模板(见上方说明) + +# 3. 预览确认 +python run.py dry-run BUG_FIX fix-bugs + +# 4. 执行全部任务 +python run.py run BUG_FIX fix-bugs + +# 5. 查看结果 +python run.py list BUG_FIX fix-bugs --status failed +python run.py status BUG_FIX + +# 6. 重跑失败的任务 +python run.py run BUG_FIX fix-bugs --retry-failed + +# 7. 如果需要完全重跑某些任务 +python run.py reset BUG_FIX fix-bugs --from F-5 +python run.py run BUG_FIX fix-bugs --start F-5 +``` + +### 场景 2:分批执行大量任务 + +```bash +# 先跑第 1 批(基础任务) +python run.py run MY_PROJECT migration --batch 1 + +# 手动检查结果后,再跑第 2 批 +python run.py run MY_PROJECT migration --batch 2 + +# 最后跑第 3 批 +python run.py run MY_PROJECT migration --batch 3 +``` + +### 场景 3:不同任务用不同 AI 工具 + +在 `.tasks.json` 中为不同任务指定不同的 tool/model: + +```json +{ + "tasks": [ + { "task_no": "T-1", "cli": { "tool": "kimi" }, "..." : "..." }, + { "task_no": "T-2", "cli": { "tool": "agent", "model": "opus-4.6" }, "..." : "..." }, + { "task_no": "T-3", "cli": { "tool": "copilot", "model": "claude-opus-4.6" }, "..." : "..." 
} + ] +} +``` + +### 场景 4:中断后继续 + +```bash +# 执行过程中按 CTRL+C 优雅中断 +# 已完成的任务状态已保存,再次运行会自动跳过已完成的任务 +python run.py run MY_PROJECT my-tasks +# → 自动从上次中断的位置继续 +``` + +--- + +## 数据结构详解 + +### `__init__.json` — 项目配置 + +```json +{ + "project": "FIX_CODE", + "description": "A project to fix code issues", + "workspace": "/home/user/workspace/my-repo", + "status": "planned", + "created_at": "2024-06-01_10-00-00", + "default_tool": "kimi", + "default_model": "", + "tags": ["code-quality"], + "run_record": [ + { + "run_at": "2024-06-01_10-00-00", + "stop_at": "2024-06-01_12-00-00", + "cumulated_minutes": 120, + "status": "completed", + "task_set_name": "code-quality-fix", + "tasks_attempted": 6, + "tasks_succeeded": 5, + "tasks_failed": 1 + } + ] +} +``` + +### `.tasks.json` — 任务集 + +```json +{ + "template": "templates/__init__.md", + "tasks": [ + { + "task_no": "F-1", + "task_name": "创建 Product 模型", + "batch": 1, + "description": "创建 Product 模型,包含 name, code 等字段", + "priority": 10, + "status": "not-started", + "depends_on": null, + "cli": { "tool": "copilot", "model": "claude-opus-4.6" } + } + ] +} +``` + +**任务字段说明:** + +| 字段 | 必填 | 说明 | +| --- | --- | --- | +| `task_no` | ✓ | 任务编号(如 `F-1`, `RT-001`),全局唯一 | +| `task_name` | ✓ | 任务名称 | +| `batch` | | 批次号(默认 1),同批次内按 priority 排序 | +| `description` | | 任务描述,渲染到 prompt 模板 | +| `priority` | | 优先级(越小越先执行,默认 50) | +| `status` | | 状态:`not-started` / `in-progress` / `completed` / `failed` / `interrupted` | +| `prompt` | | 任务级模板覆盖(相对路径) | +| `cli.tool` | | 任务级工具覆盖 | +| `cli.model` | | 任务级模型覆盖 | +| `depends_on` | | 依赖的任务编号 | + +### 默认值解析链 + +任务的 `tool` 和 `model` 按以下优先级解析: + +1. **任务级** — `task.cli.tool` / `task.cli.model` +2. **命令行级** — `--tool` / `--model` +3. **项目级** — `__init__.json` 中的 `default_tool` / `default_model` +4. 
**全局默认** — `kimi` / 空(kimi 不支持 model 选择) + +--- + +## Prompt 模板格式 + +模板使用两种占位符: + +| 占位符 | 替换为 | 示例 | +| --------- | ---------------------------- | -------------------------- | +| `{{key}}` | `task[key]` 的值 | `{{task_name}}` → 任务名称 | +| `#item` | 整个 task 对象的 JSON 字符串 | 完整任务上下文 | + +如果值是 dict/list 类型,会自动序列化为 JSON 字符串。 + +--- + +## 代理控制逻辑 + +| 工具 | `--proxy` | `--no-proxy` | 默认行为 | +| ------- | --------- | ------------ | ------------ | +| kimi | 启用代理 | 关闭代理 | **关闭代理** | +| agent | 启用代理 | 关闭代理 | **启用代理** | +| copilot | 启用代理 | 关闭代理 | **启用代理** | +| claude | 启用代理 | 关闭代理 | **启用代理** | + +--- + +## 企业微信通知 + +配置环境变量 `TASK_RUNNER_WECOM_WEBHOOK` 后,任务执行会在以下时机自动推送通知: + +| 事件 | 默认 | +|------|------| +| 批次/全部完成 | ✅ | +| 任务失败 | ✅ | +| Ctrl+C 中断 | ✅ | +| 单任务成功 | ❌(需 `--notify-each`) | + +```bash +export TASK_RUNNER_WECOM_WEBHOOK="https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx" +python run.py run MY_PROJECT tasks +``` + +--- + +## 执行安全机制 + +| 机制 | 说明 | +| --- | --- | +| **最短执行时间** | AI CLI 执行不足 10 秒自动标记为失败(防止空跑误标成功) | +| **单任务超时** | 默认 40 分钟,超时自动终止并标记失败,`--timeout` 可调整 | +| **任务间延时** | 默认随机等待 60-120 秒,降低触发反爬/封号风险,`--delay 0` 可关闭 | +| **PTY 色彩保留** | 使用伪终端执行,AI CLI 的彩色输出原样呈现 | +| **自动降级** | PTY 不可用时自动切换 PIPE 模式 | +| **日志全量捕获** | 终端实时输出的同时写入日志文件,同时生成去噪净化版 `.clean.log` | +| **心跳 & 标题** | 长时间运行时定期打印状态,终端标题显示任务进度 | +| **优雅中断** | 第一次 CTRL+C 优雅终止当前任务并保存状态,第二次强制退出 | +| **状态持久化** | 每个任务完成后立即更新 JSON,崩溃后可从断点续跑 | +| **原子写入** | JSON 保存使用 tmp + rename,防止写入中途断电损坏 | +| **自动备份** | 执行前自动备份 .tasks.json 文件 | +| **运行历史** | 每次运行自动记录到 __init__.json | +| **latest 软链接** | runtime/latest 始终指向最新运行目录 | +| **Git 安全** | --git-safety 执行前检查 git 状态并创建安全 tag | + +--- + +## 调试 + +```bash +DEBUG=1 python run.py run MY_PROJECT my-tasks +``` + +--- + +## Legacy 兼容(已弃用) + +旧版 `--plan` 模式仍可使用,但会显示弃用警告,将在 v4.0 移除: + +```bash +python run.py --plan plan.json --project my-fix --template prompt.md +``` diff --git a/requirements-dev.txt b/requirements-dev.txt index dc7503c..bc25521 100644 --- a/requirements-dev.txt +++ 
b/requirements-dev.txt @@ -1,5 +1,6 @@ # Runtime rich>=13.0.0 +certifi>=2024.8.30 # Dev / Quality ruff>=0.9.0 diff --git a/requirements.txt b/requirements.txt index da06809..9314928 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ rich>=13.0.0 +certifi>=2024.8.30 diff --git a/task_runner/cli.py b/task_runner/cli.py index 88581c1..d67224f 100644 --- a/task_runner/cli.py +++ b/task_runner/cli.py @@ -216,12 +216,49 @@ def _add_execution_options(parser): help="Random delay between tasks in seconds, e.g. '60-120' (default). " "Use '0' to disable, or a single number for fixed delay.", ) + ctrl_group.add_argument( + "--timeout", + type=int, + default=None, + metavar="SEC", + help="Max execution time per task in seconds (default: 2400 = 40min). " + "The task is killed and marked failed if it exceeds this limit.", + ) ctrl_group.add_argument( "--git-safety", action="store_true", help="Check workspace git status and create safety tag before execution", ) + # Notification + notify_group = parser.add_argument_group("notification") + notify_toggle = notify_group.add_mutually_exclusive_group() + notify_toggle.add_argument( + "--notify", + dest="notify_enabled", + action="store_true", + default=None, + help="Enable webhook notifications (default: auto-detect from env/config)", + ) + notify_toggle.add_argument( + "--no-notify", + dest="notify_enabled", + action="store_false", + help="Disable all webhook notifications", + ) + notify_group.add_argument( + "--notify-each", + action="store_true", + default=False, + help="Send a notification for every completed task (not just failures/summary)", + ) + notify_group.add_argument( + "--wecom-webhook", + default=None, + metavar="URL", + help="WeCom bot webhook URL (overrides TASK_RUNNER_WECOM_WEBHOOK env var)", + ) + # Output control output_group = parser.add_argument_group("output control") verbosity = output_group.add_mutually_exclusive_group() diff --git a/task_runner/commands/run_cmd.py 
b/task_runner/commands/run_cmd.py index 95163da..ec9ee26 100644 --- a/task_runner/commands/run_cmd.py +++ b/task_runner/commands/run_cmd.py @@ -182,6 +182,17 @@ def _execute(args, dry_run: bool = False) -> int: delay_range = parse_delay_range(getattr(args, "delay", None)) + # Resolve per-task timeout + from ..config import MAX_EXECUTION_SECONDS + + cli_timeout = getattr(args, "timeout", None) + max_execution_seconds = cli_timeout if cli_timeout is not None else MAX_EXECUTION_SECONDS + + # ── Notification settings ── + notify_enabled = getattr(args, "notify_enabled", None) + notify_each = getattr(args, "notify_each", False) + wecom_webhook = getattr(args, "wecom_webhook", None) + executor = TaskExecutor( project_config=config, task_set=task_set, @@ -201,6 +212,10 @@ def _execute(args, dry_run: bool = False) -> int: verbose=verbose, quiet=quiet, delay_range=delay_range, + max_execution_seconds=max_execution_seconds, + notify_enabled=notify_enabled, + notify_each=notify_each, + wecom_webhook=wecom_webhook, ) result_code = executor.run() diff --git a/task_runner/config.py b/task_runner/config.py index 62d5d97..c447573 100644 --- a/task_runner/config.py +++ b/task_runner/config.py @@ -94,6 +94,14 @@ class ToolConfig: } +# ─── Execution Limits ──────────────────────────────────────────── + +# Maximum time (in seconds) a single task is allowed to run before being +# killed and marked as failed. 40 minutes by default; override via +# ``--timeout`` CLI flag or project-level configuration. 
+MAX_EXECUTION_SECONDS: int = 2400
+
+
 # ─── Proxy Environment Variables ─────────────────────────────────
 
 PROXY_ENV_KEYS = [
diff --git a/task_runner/executor.py b/task_runner/executor.py
index e06f3b7..ed96470 100644
--- a/task_runner/executor.py
+++ b/task_runner/executor.py
@@ -11,10 +11,12 @@
 - Per-task tool/model configuration
 """
 
+import argparse
 import atexit
 import contextlib
 import errno
 import json
+import logging
 import os
 import random
 import re
@@ -28,7 +30,47 @@
 from datetime import datetime
 from pathlib import Path
 
-from .config import PROXY_ENV_KEYS, ToolConfig, get_tool_config
+from .config import MAX_EXECUTION_SECONDS, PROXY_ENV_KEYS, ToolConfig, get_tool_config
+from .display import (
+    SPINNER_FRAMES,
+    ExecutionTracker,
+    console,
+    reset_terminal_title,
+    set_terminal_title,
+    show_all_done,
+    show_available_models,
+    show_banner,
+    show_delay,
+    show_dry_run_skip,
+    show_error,
+    show_force_exit,
+    show_heartbeat,
+    show_info,
+    show_interrupt,
+    show_progress_bar,
+    show_summary,
+    show_task_cmd,
+    show_task_list,
+    show_task_prompt_info,
+    show_task_result,
+    show_task_running,
+    show_task_skip,
+    show_task_start,
+    show_tool_not_found,
+    show_warning,
+)
+from .notify import (
+    build_batch_complete_message,
+    build_interrupt_message,
+    build_task_complete_message,
+    build_task_failure_message,
+    create_notifier,
+    send_notification_safe,
+)
+from .renderer import render_prompt
+from .state import find_start_index, get_task_stats, load_plan, save_plan
+
+logger = logging.getLogger(__name__)
 
 # Any AI CLI execution completing in under this threshold is treated as a
 # failure (the tool almost certainly did not actually process the task).
@@ -61,7 +103,7 @@ def parse_delay_range(value: str | None) -> tuple[int, int]:
     except ValueError:
         raise argparse.ArgumentTypeError(
             f"Invalid delay range '{value}'. Use '60-120', '30', or '0'."
- ) + ) from None if lo < 0 or hi < 0: raise argparse.ArgumentTypeError("Delay values must be non-negative.") return (min(lo, hi), max(lo, hi)) @@ -71,7 +112,7 @@ def parse_delay_range(value: str | None) -> tuple[int, int]: except ValueError: raise argparse.ArgumentTypeError( f"Invalid delay value '{value}'. Use '60-120', '30', or '0'." - ) + ) from None if n < 0: raise argparse.ArgumentTypeError("Delay value must be non-negative.") return (n, n) @@ -166,41 +207,22 @@ def _extract_output_tail(clean_text: str, max_lines: int = 30) -> str: This gives users a quick view of what the AI CLI actually produced. """ - lines = [l for l in clean_text.splitlines() if l.strip()] + lines = [line for line in clean_text.splitlines() if line.strip()] tail = lines[-max_lines:] if len(lines) > max_lines else lines return "\n".join(tail) -from .display import ( - SPINNER_FRAMES, - ExecutionTracker, - console, - reset_terminal_title, - set_terminal_title, - show_all_done, - show_available_models, - show_banner, - show_delay, - show_dry_run_skip, - show_error, - show_force_exit, - show_heartbeat, - show_info, - show_interrupt, - show_progress_bar, - show_summary, - show_task_cmd, - show_task_list, - show_task_prompt_info, - show_task_result, - show_task_running, - show_task_skip, - show_task_start, - show_tool_not_found, - show_warning, -) -from .renderer import render_prompt -from .state import find_start_index, get_task_stats, load_plan, save_plan +def _fmt_elapsed_short(elapsed: float) -> str: + """Format elapsed seconds into a compact human-readable string.""" + total_secs = int(elapsed) + hours, remainder = divmod(total_secs, 3600) + mins, secs = divmod(remainder, 60) + if hours > 0: + return f"{hours}h {mins:02d}m {secs:02d}s" + elif mins > 0: + return f"{mins}m {secs:02d}s" + else: + return f"{secs}s" class TaskExecutor: @@ -230,6 +252,7 @@ def __init__(self, **kwargs): # Runtime state (shared) self.current_process: subprocess.Popen | None = None self.interrupted: bool = False + 
self._timed_out: bool = False
         self._ctrl_c_count: int = 0
         self._task_needs_proxy: bool | None = None  # Per-task proxy override
@@ -272,6 +295,10 @@ def _init_v3(self, **kwargs):
         self.verbose: bool = kwargs.get("verbose", False)
         self.quiet: bool = kwargs.get("quiet", False)
         self.delay_range: tuple[int, int] = kwargs.get("delay_range", (60, 120))
+        self.max_execution_seconds: int = kwargs.get("max_execution_seconds", MAX_EXECUTION_SECONDS)
+        self.notify_enabled: bool | None = kwargs.get("notify_enabled")
+        self.notify_each: bool = kwargs.get("notify_each", False)
+        self.wecom_webhook: str | None = kwargs.get("wecom_webhook")
 
         self.work_dir = Path(self.workspace) if self.workspace else None
@@ -290,6 +317,9 @@ def _init_legacy(self, args):
         self.work_dir: Path | None = args.work_dir_path  # type: ignore[no-redef]
         self.delay_range: tuple[int, int] = getattr(args, "delay_range", (60, 120))  # type: ignore[no-redef]
+        self.max_execution_seconds: int = getattr(  # type: ignore[no-redef]
+            args, "max_execution_seconds", MAX_EXECUTION_SECONDS
+        )
         self.project_dir: Path | None = None
         self.tasks_dir: Path | None = None
         self.logs_dir: Path | None = None
@@ -394,6 +424,11 @@ def _setup_signals(self):
         signal.signal(signal.SIGTERM, self._signal_handler)
 
     def _signal_handler(self, sig, frame):
+        # NOTE: CPython delivers signals to the main thread and runs the
+        # handler between bytecode instructions, so these flag updates
+        # cannot be interleaved with the main loop's reads of the same
+        # attributes. Keep the handler minimal (set flags, print a short
+        # status line) to avoid re-entrancy problems.
         self._ctrl_c_count += 1
         self.interrupted = True
@@ -434,6 +469,40 @@ def _force_kill(self):
         with contextlib.suppress(Exception):
             os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
 
+    def _timeout_kill(self) -> None:
+        """Kill the current child process due to timeout.
+
+        Sends SIGTERM first, waits up to 5 seconds, then escalates to SIGKILL.
+ """ + proc = self.current_process + if not proc or proc.poll() is not None: + return + + try: + pgid = os.getpgid(proc.pid) + except (ProcessLookupError, OSError): + return + + show_warning( + f"Task exceeded timeout ({self.max_execution_seconds}s / " + f"{self.max_execution_seconds // 60}min) — sending SIGTERM …" + ) + + try: + os.killpg(pgid, signal.SIGTERM) + except (ProcessLookupError, OSError): + return + + # Wait up to 5 seconds for graceful exit + for _ in range(50): + if proc.poll() is not None: + return + time.sleep(0.1) + + show_warning("Process did not exit after SIGTERM, sending SIGKILL …") + with contextlib.suppress(ProcessLookupError, OSError): + os.killpg(pgid, signal.SIGKILL) + def _make_env(self) -> dict: env = os.environ.copy() @@ -441,9 +510,9 @@ def _make_env(self) -> dict: # explicit --proxy/--no-proxy always wins, # otherwise per-task tool's needs_proxy, # fallback to global self.use_proxy. - if getattr(self, 'proxy_mode', None) == "on": + if getattr(self, "proxy_mode", None) == "on": needs_proxy = True - elif getattr(self, 'proxy_mode', None) == "off": + elif getattr(self, "proxy_mode", None) == "off": needs_proxy = False elif self._task_needs_proxy is not None: needs_proxy = self._task_needs_proxy @@ -495,7 +564,7 @@ def _inter_task_delay(self, current_idx: int, remaining_tasks, *, last_success: has_more = False if isinstance(remaining_tasks, list) and len(remaining_tasks) > 0: has_more = True - elif hasattr(remaining_tasks, '__len__'): + elif hasattr(remaining_tasks, "__len__"): has_more = current_idx + 1 < len(remaining_tasks) if not has_more: return @@ -507,7 +576,7 @@ def _inter_task_delay(self, current_idx: int, remaining_tasks, *, last_success: try: if isinstance(remaining_tasks, list) and len(remaining_tasks) > 0: nxt = remaining_tasks[0] - next_label = nxt.task_no if hasattr(nxt, 'task_no') else nxt.get('task_no', '') + next_label = nxt.task_no if hasattr(nxt, "task_no") else nxt.get("task_no", "") except (IndexError, TypeError, 
AttributeError): pass @@ -604,12 +673,19 @@ def _execute_with_pty(self, cmd: str, log_path: Path) -> tuple[int, float]: os.close(slave_fd) start_time = time.time() + deadline = start_time + self.max_execution_seconds with open(log_path, "wb") as log_file: while True: if self.interrupted: break + # ── Timeout check ── + if time.time() >= deadline: + self._timed_out = True + self._timeout_kill() + break + try: ready, _, _ = select.select([master_fd], [], [], 0.5) except (ValueError, OSError): @@ -663,6 +739,7 @@ def _execute_with_pipe(self, cmd: str, log_path: Path) -> tuple[int, float]: ) start_time = time.time() + deadline = start_time + self.max_execution_seconds with open(log_path, "wb") as log_file: stdout = self.current_process.stdout @@ -670,6 +747,11 @@ def _execute_with_pipe(self, cmd: str, log_path: Path) -> tuple[int, float]: for line in iter(stdout.readline, b""): if self.interrupted: break + # ── Timeout check ── + if time.time() >= deadline: + self._timed_out = True + self._timeout_kill() + break os.write(sys.stdout.fileno(), line) log_file.write(line) log_file.flush() @@ -690,11 +772,20 @@ def _execute_with_pipe(self, cmd: str, log_path: Path) -> tuple[int, float]: return return_code, elapsed def execute_task(self, cmd: str, log_path: Path) -> tuple[int, float]: + self._timed_out = False try: return self._execute_with_pty(cmd, log_path) except Exception as e: - show_warning(f"PTY mode failed ({e}), falling back to PIPE mode") + logger.info("PTY mode failed (%s), falling back to PIPE mode", e) + show_warning( + f"PTY mode unavailable ({type(e).__name__}: {e}), " + f"falling back to PIPE mode. " + f"Output may lose colours / formatting." 
+ ) return self._execute_with_pipe(cmd, log_path) + finally: + # Issue #5b: unified subprocess cleanup — ensure no zombie remains + self._ensure_child_cleaned_up() @staticmethod def _drain_fd(fd: int, log_file): @@ -712,6 +803,38 @@ def _drain_fd(fd: int, log_file): except OSError: break + def _ensure_child_cleaned_up(self) -> None: + """Final safety net: make sure the child process is dead and reaped. + + Called from the ``finally`` block of ``execute_task()`` to handle edge + cases such as PTY EOF arriving before the process exits, an exception + during PIPE reading, or any other unexpected early return. + """ + proc = self.current_process + if proc is None: + return + + if proc.poll() is None: + # Still running — send SIGTERM → wait 5s → SIGKILL + try: + pgid = os.getpgid(proc.pid) + os.killpg(pgid, signal.SIGTERM) + except (ProcessLookupError, OSError): + pass + else: + for _ in range(50): + if proc.poll() is not None: + break + time.sleep(0.1) + if proc.poll() is None: + with contextlib.suppress(ProcessLookupError, OSError): + os.killpg(pgid, signal.SIGKILL) + # Final reap + with contextlib.suppress(Exception): + proc.wait(timeout=5) + + self.current_process = None + # ─── Log Sanitization ──────────────────────────────────────── @staticmethod @@ -791,6 +914,15 @@ def _run_v3(self) -> int: self._setup_signals() + # ── Initialize notifier ── + notify_enabled = self.notify_enabled + if notify_enabled is None: + notify_enabled = True # default on when webhook is configured + notifier = create_notifier( + webhook_url=self.wecom_webhook, + enabled=notify_enabled if notify_enabled is not False else False, + ) + project_dir = get_project_dir(self.project_config.project) tasks = self.scheduled_tasks total = len(tasks) @@ -940,8 +1072,81 @@ def _run_v3(self) -> int: if self.interrupted: task.status = "interrupted" save_task_set(self.task_set, project_dir) + + # Send interrupt notification + completed_count = sum(1 for t in all_tasks if t.status == "completed") + 
send_notification_safe( + notifier, + build_interrupt_message( + project=self.project_config.project, + task_set=self.task_set.name, + current_task_no=task_no, + current_task_name=task.task_name, + completed=completed_count, + total=len(all_tasks), + ), + ) break + # ── Timeout handling ── + if self._timed_out: + show_warning( + f"Task {task_no} timed out after " + f"{self.max_execution_seconds}s " + f"({self.max_execution_seconds // 60}min) — marking as FAILED." + ) + task.status = "failed" + task.elapsed_seconds = round(elapsed, 1) + task.last_run_at = datetime.now().isoformat() + save_task_set(self.task_set, project_dir) + failed += 1 + + # Sanitize log even for timed-out tasks + clean_log, clean_text = self._sanitize_log(log_path) + output_tail = _extract_output_tail(clean_text) + rel_log = str(log_path.relative_to(project_dir)) + if clean_log: + rel_log = str(clean_log.relative_to(project_dir)) + show_task_result(task_no, False, elapsed, rel_log, output_tail) + + if self._tracker: + self._tracker.record_result(task_no, task.task_name, False, elapsed) + + self._task_results.append( + { + "task_no": task_no, + "status": "failed", + "duration_seconds": round(elapsed, 1), + "return_code": return_code, + "failure_reason": "timeout", + "log_file": f"logs/{task_no.replace('/', '_').replace(chr(92), '_')}.log", + } + ) + save_live_status( + self.run_context, None, dict(self._results), list(self._task_results) + ) + + # Send failure notification + send_notification_safe( + notifier, + build_task_failure_message( + project=self.project_config.project, + task_set=self.task_set.name, + task_no=task_no, + task_name=task.task_name, + failure_reason=f"超时 ({self.max_execution_seconds // 60}min)", + elapsed=_fmt_elapsed_short(elapsed), + tool=task_tool_config.name, + model=task_model, + return_code=return_code, + output_tail=output_tail, + log_file=rel_log, + ), + ) + + # Continue to next task (don't delay — no point after a timeout) + continue + # Record result success = 
return_code == 0 @@ -977,11 +1182,69 @@ def _run_v3(self) -> int: show_task_result(task_no, success, elapsed, rel_clean, output_tail) else: show_task_result(task_no, success, elapsed, rel_log, output_tail) + notify_log_file = rel_clean if clean_log else rel_log # Record to tracker if self._tracker: self._tracker.record_result(task_no, task.task_name, success, elapsed) + # Send notification on failure or (opt-in) on each task + if not success: + failure_reason = ( + f"exit code {return_code}" + if elapsed >= MIN_EXECUTION_SECONDS + else f"执行过快 ({elapsed:.1f}s < {MIN_EXECUTION_SECONDS}s)" + ) + send_notification_safe( + notifier, + build_task_failure_message( + project=self.project_config.project, + task_set=self.task_set.name, + task_no=task_no, + task_name=task.task_name, + failure_reason=failure_reason, + elapsed=_fmt_elapsed_short(elapsed), + tool=task_tool_config.name, + model=task_model, + return_code=return_code, + output_tail=output_tail, + log_file=notify_log_file, + ), + ) + elif success and self.notify_each: + next_task = next((t for t in tasks[idx + 1 :] if t.status != "completed"), None) + next_tool_name: str | None = None + next_model_name: str | None = None + if next_task: + next_tool_name = self.tool_config.name + next_model_name = self.model + if next_task.cli.tool and not self.cli_tool_override: + next_tool_name = next_task.cli.tool + if next_task.cli.model and not self.cli_model_override: + next_model_name = next_task.cli.model + + send_notification_safe( + notifier, + build_task_complete_message( + project=self.project_config.project, + task_set=self.task_set.name, + task_no=task_no, + task_name=task.task_name, + elapsed=_fmt_elapsed_short(elapsed), + tool=task_tool_config.name, + model=task_model, + return_code=return_code, + progress_done=succeeded + failed, + progress_total=total, + output_tail=output_tail, + log_file=notify_log_file, + next_task_no=next_task.task_no if next_task else None, + next_task_name=next_task.task_name if next_task else 
None, + next_tool=next_tool_name, + next_model=next_model_name, + ), + ) + # Track result self._task_results.append( { @@ -997,7 +1260,7 @@ def _run_v3(self) -> int: save_live_status(self.run_context, None, dict(self._results), list(self._task_results)) # Random delay between tasks (anti-rate-limit) - self._inter_task_delay(idx, tasks[idx + 1:], last_success=success) + self._inter_task_delay(idx, tasks[idx + 1 :], last_success=success) # Cleanup reset_terminal_title() @@ -1023,6 +1286,29 @@ def _run_v3(self) -> int: task_results=self._task_results, ) + # ── Batch completion notification ── + if notifier and (succeeded + failed) > 0: + run_start_str = datetime.fromtimestamp(run_start).strftime("%H:%M:%S") + run_end_str = datetime.now().strftime("%H:%M:%S") + failed_task_details = [r for r in self._task_results if r.get("status") == "failed"] + send_notification_safe( + notifier, + build_batch_complete_message( + project=self.project_config.project, + task_set=self.task_set.name, + start_time=run_start_str, + end_time=run_end_str, + duration=_fmt_elapsed_short(total_elapsed), + succeeded=succeeded, + failed=failed, + skipped=skipped, + total=len(all_tasks), + total_done=total_done, + interrupted=self.interrupted, + failed_tasks=failed_task_details, + ), + ) + return 0 if failed == 0 and not self.interrupted else 1 # ─── Legacy Mode ───────────────────────────────────────────── @@ -1156,6 +1442,29 @@ def _run_legacy(self) -> int: save_plan(self.plan_path, plan) break + # ── Timeout handling (legacy) ── + if self._timed_out: + show_warning( + f"Task {task_no} timed out after " + f"{self.max_execution_seconds}s " + f"({self.max_execution_seconds // 60}min) — marking as FAILED." 
+ ) + task["status"] = "failed" + failed += 1 + save_plan(self.plan_path, plan) + + clean_log, clean_text = self._sanitize_log(log_path) + output_tail = _extract_output_tail(clean_text) + if clean_log: + rel_log = str(clean_log.relative_to(self.plan_path.parent)) + else: + rel_log = str(log_path.relative_to(self.plan_path.parent)) + show_task_result(task_no, False, elapsed, rel_log, output_tail) + + current_done = sum(1 for t in tasks if t.get("status") == "completed") + show_progress_bar(current_done, total) + continue + success = return_code == 0 # Guard: AI CLI finishing in < MIN_EXECUTION_SECONDS is bogus @@ -1190,7 +1499,7 @@ def _run_legacy(self) -> int: show_progress_bar(current_done, total) # Random delay between tasks (anti-rate-limit) - remaining_tasks = [t for t in tasks[idx + 1:] if t.get("status") != "completed"] + remaining_tasks = [t for t in tasks[idx + 1 :] if t.get("status") != "completed"] self._inter_task_delay(idx, remaining_tasks, last_success=success) reset_terminal_title() diff --git a/task_runner/notify.py b/task_runner/notify.py new file mode 100644 index 0000000..149e0f4 --- /dev/null +++ b/task_runner/notify.py @@ -0,0 +1,478 @@ +""" +Notification system for Auto Task Runner v3.0. + +Supports sending notifications on batch completion, task failure, and +interruption events via configurable webhook integrations. 
+ +Currently implemented: + - WeCom (企业微信) — markdown_v2 format via group bot webhook + +Future extensions (abstract interface ready): + - DingTalk (钉钉) + - Feishu / Lark (飞书) + +Design principles: + - Zero external dependencies — uses only ``urllib.request`` + - Notification failures NEVER block task execution + - All network I/O runs with a 10-second timeout + - Message content is truncated to stay within API limits +""" + +from __future__ import annotations + +import json +import logging +import os +import ssl +import urllib.error +import urllib.request +from abc import ABC, abstractmethod +from datetime import datetime + +logger = logging.getLogger(__name__) + +try: + import certifi +except Exception: # pragma: no cover - optional dependency + certifi = None + +# ─── Constants ──────────────────────────────────────────────────── + +ENV_WECOM_WEBHOOK = "TASK_RUNNER_WECOM_WEBHOOK" +ENV_NOTIFY_ENABLED = "TASK_RUNNER_NOTIFY_ENABLED" + +WECOM_MAX_CONTENT_BYTES = 4096 +WECOM_SEND_TIMEOUT = 10 + + +# ─── Abstract Base ─────────────────────────────────────────────── + + +class Notifier(ABC): + """Abstract notification channel. + + Subclass this to add DingTalk, Feishu, or any other webhook-based + notification backend. + """ + + @abstractmethod + def send_markdown(self, content: str) -> bool: + """Send a markdown-formatted message. + + Args: + content: Markdown text to send. + + Returns: + True if the message was delivered successfully. + """ + ... + + @abstractmethod + def name(self) -> str: + """Human-readable name of this notifier (e.g. 'WeCom').""" + ... + + +# ─── WeCom (企业微信) ──────────────────────────────────────────── + + +class WeComNotifier(Notifier): + """企业微信群机器人 webhook 通知. + + Uses the ``markdown_v2`` message type which supports richer formatting + (tables, code blocks, ordered lists) compared to the legacy ``markdown`` + type. Content is capped at 4096 UTF-8 bytes per API requirement. 
+ """ + + def __init__(self, webhook_url: str): + if not webhook_url: + raise ValueError("WeCom webhook URL must not be empty") + self._webhook_url = webhook_url + + def name(self) -> str: + return "WeCom" + + def send_markdown(self, content: str) -> bool: + safe_content = truncate_utf8(content, WECOM_MAX_CONTENT_BYTES) + payload = { + "msgtype": "markdown_v2", + "markdown_v2": {"content": safe_content}, + } + return self._post(payload) + + def _post(self, payload: dict) -> bool: + data = json.dumps(payload, ensure_ascii=False).encode("utf-8") + req = urllib.request.Request( + self._webhook_url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + ssl_context = _build_ssl_context() + opener = urllib.request.build_opener( + urllib.request.ProxyHandler({}), + urllib.request.HTTPSHandler(context=ssl_context), + ) + with opener.open(req, timeout=WECOM_SEND_TIMEOUT) as resp: + body = json.loads(resp.read()) + if body.get("errcode") != 0: + logger.warning( + "WeCom API returned error: errcode=%s errmsg=%s", + body.get("errcode"), + body.get("errmsg"), + ) + return False + return True + except (urllib.error.URLError, OSError, json.JSONDecodeError, ValueError) as exc: + logger.warning("WeCom notification failed: %s", exc) + return False + + +# ─── Notifier Factory ──────────────────────────────────────────── + + +def create_notifier( + webhook_url: str | None = None, + enabled: bool | None = None, +) -> Notifier | None: + """Create a notifier from explicit args or environment variables. + + Resolution order for webhook URL: + 1. Explicit ``webhook_url`` argument + 2. 
``TASK_RUNNER_WECOM_WEBHOOK`` environment variable + + The notifier is disabled (returns None) when: + - ``enabled`` is explicitly False + - ``TASK_RUNNER_NOTIFY_ENABLED`` is ``"false"`` / ``"0"`` + - No webhook URL is available + """ + if enabled is False: + return None + + env_enabled = os.environ.get(ENV_NOTIFY_ENABLED, "").strip().lower() + if env_enabled in ("false", "0", "no", "off"): + return None + + url = webhook_url or os.environ.get(ENV_WECOM_WEBHOOK, "").strip() + if not url: + return None + + return WeComNotifier(url) + + +# ─── Message Builders ──────────────────────────────────────────── + + +def build_batch_complete_message( + *, + project: str, + task_set: str, + start_time: str, + end_time: str, + duration: str, + succeeded: int, + failed: int, + skipped: int, + total: int, + total_done: int, + interrupted: bool, + failed_tasks: list[dict] | None = None, +) -> str: + """Build a markdown_v2 message for batch / full completion.""" + if interrupted: + title = "🤖 轻易云自动机器人|执行中断" + elif failed > 0: + title = "🤖 轻易云自动机器人|执行完成(有失败)" + else: + title = "🤖 轻易云自动机器人|全部完成" + + lines: list[str] = [] + lines.append(f"## {title}") + lines.append("") + overall_status = "⚠️ 已中断" if interrupted else ("❌ 存在失败" if failed > 0 else "✅ 全部成功") + lines.append(f"**总体状态:** {overall_status}") + lines.append(f"**项目:** {project}") + lines.append(f"**任务集:** {task_set}") + lines.append(f"**执行时间:** {start_time} ~ {end_time} ({duration})") + lines.append("") + + lines.append("### 执行结果") + lines.append("") + lines.append("| 状态 | 数量 |") + lines.append("| :--- | ---: |") + lines.append(f"| 成功 | {succeeded} |") + lines.append(f"| 失败 | {failed} |") + lines.append(f"| 跳过 | {skipped} |") + lines.append(f"| 总进度 | {total_done}/{total} |") + + if failed_tasks: + lines.append("") + lines.append("### 失败任务") + lines.append("") + for ft in failed_tasks: + task_no = ft.get("task_no", "?") + reason = ft.get("failure_reason", "exit code != 0") + dur = ft.get("duration_seconds", 0) + dur_str = 
_format_duration(dur) + lines.append(f"- **{task_no}** — {reason} ({dur_str})") + + if interrupted: + lines.append("") + lines.append("> 执行被用户中断 (Ctrl+C)") + + lines.append("") + lines.append("> 播报来源:轻易云自动机器人") + + return "\n".join(lines) + + +def build_task_failure_message( + *, + project: str, + task_set: str, + task_no: str, + task_name: str, + failure_reason: str, + elapsed: str, + tool: str | None = None, + model: str | None = None, + return_code: int | None = None, + output_tail: str | None = None, + log_file: str | None = None, +) -> str: + """Build a markdown_v2 message for a single task failure.""" + lines: list[str] = [] + lines.append("## 🤖 轻易云自动机器人|任务失败") + lines.append("") + lines.append("**状态:** ❌ 失败") + lines.append(f"**项目:** {project} / {task_set}") + lines.append(f"**任务:** {task_no} {task_name}") + if tool: + lines.append(f"**执行工具:** {tool}") + if model: + lines.append(f"**模型:** {model}") + if return_code is not None: + lines.append(f"**退出码:** {return_code}") + lines.append(f"**失败原因:** {failure_reason}") + lines.append(f"**耗时:** {elapsed}") + if log_file: + lines.append(f"**日志文件:** {log_file}") + + suffix_lines = ["", "> 播报来源:轻易云自动机器人"] + compact_output = _fit_output_by_budget(lines, output_tail, suffix_lines) + if compact_output: + lines.append("") + lines.append("### 最终结果输出") + lines.append(compact_output) + + lines.extend(suffix_lines) + return "\n".join(lines) + + +def build_interrupt_message( + *, + project: str, + task_set: str, + current_task_no: str, + current_task_name: str, + completed: int, + total: int, +) -> str: + """Build a markdown_v2 message for execution interruption (Ctrl+C).""" + now_str = datetime.now().strftime("%H:%M:%S") + lines: list[str] = [] + lines.append("## 🤖 轻易云自动机器人|执行中断") + lines.append("") + lines.append("**状态:** ⚠️ 中断") + lines.append(f"**项目:** {project} / {task_set}") + lines.append(f"**中断时间:** {now_str}") + lines.append(f"**当前任务:** {current_task_no} {current_task_name}") + lines.append(f"**已完成:** 
{completed}/{total}") + lines.append("") + lines.append("> 播报来源:轻易云自动机器人") + return "\n".join(lines) + + +def build_task_complete_message( + *, + project: str, + task_set: str, + task_no: str, + task_name: str, + elapsed: str, + tool: str | None = None, + model: str | None = None, + return_code: int | None = None, + progress_done: int | None = None, + progress_total: int | None = None, + output_tail: str | None = None, + log_file: str | None = None, + next_task_no: str | None = None, + next_task_name: str | None = None, + next_tool: str | None = None, + next_model: str | None = None, +) -> str: + """Build a markdown_v2 message for a single task success (opt-in via --notify-each).""" + lines: list[str] = [] + lines.append("## 🤖 轻易云自动机器人|任务完成") + lines.append("") + lines.append("**状态:** ✅ 成功") + lines.append(f"**项目:** {project} / {task_set}") + lines.append(f"**任务:** {task_no} {task_name}") + if tool: + lines.append(f"**执行工具:** {tool}") + if model: + lines.append(f"**模型:** {model}") + if return_code is not None: + lines.append(f"**退出码:** {return_code}") + lines.append(f"**耗时:** {elapsed}") + if progress_done is not None and progress_total: + pct = (progress_done / progress_total) * 100 + lines.append(f"**当前进度:** {progress_done}/{progress_total} ({pct:.1f}%)") + if log_file: + lines.append(f"**日志文件:** {log_file}") + + suffix_lines: list[str] = [""] + if next_task_no and next_task_name: + suffix_lines.append("### 下一任务预告") + suffix_lines.append(f"- {next_task_no} {next_task_name}") + if next_tool: + suffix_lines.append(f"- 工具: {next_tool}") + if next_model: + suffix_lines.append(f"- 模型: {next_model}") + else: + suffix_lines.append("### 下一任务预告") + suffix_lines.append("- 当前任务集已无待执行任务") + + suffix_lines.append("") + suffix_lines.append("> 播报来源:轻易云自动机器人") + + compact_output = _fit_output_by_budget(lines, output_tail, suffix_lines) + if compact_output: + lines.append("") + lines.append("### 最终结果输出") + lines.append(compact_output) + + lines.extend(suffix_lines) + return 
"\n".join(lines) + + +# ─── Helpers ───────────────────────────────────────────────────── + + +def truncate_utf8(text: str, max_bytes: int) -> str: + """Truncate *text* so its UTF-8 encoding is at most *max_bytes* bytes. + + If truncation is necessary, a trailing ``\\n\\n> (内容已截断)`` note is + appended, and the main body is shortened to make room. + """ + encoded = text.encode("utf-8") + if len(encoded) <= max_bytes: + return text + + suffix = "\n\n> (内容已截断)" + suffix_bytes = len(suffix.encode("utf-8")) + target = max_bytes - suffix_bytes + + if target <= 0: + return suffix.strip() + + # Truncate at a valid UTF-8 char boundary + truncated = encoded[:target].decode("utf-8", errors="ignore") + return truncated + suffix + + +def _format_duration(seconds: float) -> str: + total_secs = int(seconds) + hours, remainder = divmod(total_secs, 3600) + mins, secs = divmod(remainder, 60) + if hours > 0: + return f"{hours}h {mins:02d}m {secs:02d}s" + elif mins > 0: + return f"{mins}m {secs:02d}s" + else: + return f"{secs}s" + + +def _compact_result_text(text: str | None, *, max_lines: int = 10, max_chars: int = 700) -> str: + if not text: + return "" + normalized = text.strip() + if not normalized: + return "" + lines = normalized.splitlines() + if len(lines) > max_lines: + lines = lines[-max_lines:] + compact = "\n".join(lines) + if len(compact) > max_chars: + compact = compact[-max_chars:] + compact = f"...(截断)\n{compact}" + return compact + + +def _fit_output_by_budget( + prefix_lines: list[str], + output_tail: str | None, + suffix_lines: list[str], + *, + target_bytes: int = WECOM_MAX_CONTENT_BYTES, +) -> str: + """Fit output snippet within byte budget while keeping key fields visible. + + Strategy: + 1) Reserve space for prefix + suffix + wrapper markdown. + 2) Iteratively shrink output (lines/chars/bytes) until total fits. + 3) Fall back to empty output when budget is insufficient. 
+ """ + if not output_tail or not output_tail.strip(): + return "" + + wrapper = ["\n### 最终结果输出", ""] + base_bytes = len("\n".join(prefix_lines + wrapper + suffix_lines).encode("utf-8")) + available = target_bytes - base_bytes - 96 + if available <= 80: + return "" + + max_lines = 18 + max_chars = min(2200, max(300, available * 2)) + + for _ in range(8): + compact = _compact_result_text(output_tail, max_lines=max_lines, max_chars=max_chars) + compact_bytes = len(compact.encode("utf-8")) + if compact_bytes <= available: + return compact + max_lines = max(3, max_lines - 2) + max_chars = max(120, int(max_chars * 0.7)) + + return _compact_result_text(output_tail, max_lines=3, max_chars=120) + + +def _build_ssl_context() -> ssl.SSLContext: + """Build SSL context for webhook calls. + + Preference order: + 1) certifi CA bundle (if installed) + 2) system default trust store + """ + if certifi is not None: + return ssl.create_default_context(cafile=certifi.where()) + return ssl.create_default_context() + + +def send_notification_safe(notifier: Notifier | None, content: str) -> None: + """Fire-and-forget: send a notification, swallowing all exceptions. + + This is the entry-point that the executor should call — it guarantees + that notification failures never propagate into the task execution flow. 
+ """ + if notifier is None: + return + try: + ok = notifier.send_markdown(content) + if ok: + logger.info("Notification sent via %s", notifier.name()) + else: + logger.warning("Notification via %s returned failure", notifier.name()) + except Exception as exc: + logger.warning("Notification via %s raised exception: %s", notifier.name(), exc) diff --git a/task_runner/runtime.py b/task_runner/runtime.py index 1482117..f4cab8d 100644 --- a/task_runner/runtime.py +++ b/task_runner/runtime.py @@ -94,9 +94,11 @@ def save_run_metadata(ctx: RunContext): "tasks_to_execute": ctx.tasks_to_execute, } run_json = ctx.run_dir / "run.json" - with open(run_json, "w", encoding="utf-8") as f: + tmp_path = run_json.with_suffix(".json.tmp") + with open(tmp_path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) f.write("\n") + tmp_path.replace(run_json) def save_run_summary(ctx: RunContext, results: dict, task_results: list[dict]): @@ -115,9 +117,11 @@ def save_run_summary(ctx: RunContext, results: dict, task_results: list[dict]): } summary_json = ctx.run_dir / "summary.json" - with open(summary_json, "w", encoding="utf-8") as f: + tmp_path = summary_json.with_suffix(".json.tmp") + with open(tmp_path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) f.write("\n") + tmp_path.replace(summary_json) def save_live_status( diff --git a/tests/test_notify.py b/tests/test_notify.py new file mode 100644 index 0000000..f21602e --- /dev/null +++ b/tests/test_notify.py @@ -0,0 +1,545 @@ +"""Tests for task_runner.notify module.""" + +import json +import os +import urllib.error +from unittest.mock import MagicMock, patch + +import pytest + +from task_runner.notify import ( + ENV_NOTIFY_ENABLED, + ENV_WECOM_WEBHOOK, + WECOM_MAX_CONTENT_BYTES, + Notifier, + WeComNotifier, + build_batch_complete_message, + build_interrupt_message, + build_task_complete_message, + build_task_failure_message, + create_notifier, + send_notification_safe, + 
truncate_utf8, +) + +FAKE_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=test-key" + + +# ─── Helper ────────────────────────────────────────────────────── + + +def _make_urlopen_response(body: dict, status: int = 200) -> MagicMock: + """Build a mock that behaves like ``urllib.request.urlopen()`` return value.""" + data = json.dumps(body).encode("utf-8") + resp = MagicMock() + resp.read.return_value = data + resp.status = status + resp.__enter__ = MagicMock(return_value=resp) + resp.__exit__ = MagicMock(return_value=False) + return resp + + +# ─── truncate_utf8 ─────────────────────────────────────────────── + + +class TestTruncateUtf8: + def test_no_truncation_needed(self): + text = "hello world" + assert truncate_utf8(text, 100) == text + + def test_exact_boundary(self): + text = "abc" + assert truncate_utf8(text, 3) == text + + def test_truncates_long_text(self): + text = "a" * 5000 + result = truncate_utf8(text, 4096) + assert len(result.encode("utf-8")) <= 4096 + assert result.endswith("(内容已截断)") + + def test_preserves_utf8_char_boundary(self): + text = "中" * 2000 # each '中' is 3 bytes → 6000 bytes total + result = truncate_utf8(text, 4096) + encoded = result.encode("utf-8") + assert len(encoded) <= 4096 + # Should not produce invalid UTF-8 + encoded.decode("utf-8") + + def test_very_small_limit(self): + text = "hello world" + result = truncate_utf8(text, 10) + # When limit is too small for body + suffix, we get just the stripped suffix + suffix_full = "\n\n> (内容已截断)" + assert len(result.encode("utf-8")) <= max(10, len(suffix_full.encode("utf-8"))) + + def test_empty_text(self): + assert truncate_utf8("", 100) == "" + + def test_mixed_ascii_and_unicode(self): + text = "Status: 成功 " * 500 + result = truncate_utf8(text, 4096) + assert len(result.encode("utf-8")) <= 4096 + + +# ─── WeComNotifier ─────────────────────────────────────────────── + + +class TestWeComNotifier: + def test_init_empty_url_raises(self): + with pytest.raises(ValueError, 
match="must not be empty"): + WeComNotifier("") + + def test_name(self): + n = WeComNotifier(FAKE_WEBHOOK) + assert n.name() == "WeCom" + + @patch("task_runner.notify.urllib.request.build_opener") + def test_send_markdown_success(self, mock_build_opener): + mock_opener = MagicMock() + mock_opener.open.return_value = _make_urlopen_response({"errcode": 0, "errmsg": "ok"}) + mock_build_opener.return_value = mock_opener + n = WeComNotifier(FAKE_WEBHOOK) + assert n.send_markdown("## test") is True + + # Verify the request was built correctly + call_args = mock_opener.open.call_args + req = call_args[0][0] + assert req.get_method() == "POST" + assert req.get_header("Content-type") == "application/json" + + payload = json.loads(req.data.decode("utf-8")) + assert payload["msgtype"] == "markdown_v2" + assert payload["markdown_v2"]["content"] == "## test" + + @patch("task_runner.notify.urllib.request.build_opener") + def test_send_markdown_api_error(self, mock_build_opener): + mock_opener = MagicMock() + mock_opener.open.return_value = _make_urlopen_response( + {"errcode": 93000, "errmsg": "invalid webhook url"} + ) + mock_build_opener.return_value = mock_opener + n = WeComNotifier(FAKE_WEBHOOK) + assert n.send_markdown("test") is False + + @patch("task_runner.notify.urllib.request.build_opener") + def test_send_markdown_network_error(self, mock_build_opener): + mock_opener = MagicMock() + mock_opener.open.side_effect = urllib.error.URLError("timeout") + mock_build_opener.return_value = mock_opener + n = WeComNotifier(FAKE_WEBHOOK) + assert n.send_markdown("test") is False + + @patch("task_runner.notify.urllib.request.build_opener") + def test_send_markdown_truncates_long_content(self, mock_build_opener): + mock_opener = MagicMock() + mock_opener.open.return_value = _make_urlopen_response({"errcode": 0, "errmsg": "ok"}) + mock_build_opener.return_value = mock_opener + n = WeComNotifier(FAKE_WEBHOOK) + long_content = "x" * 10000 + n.send_markdown(long_content) + + call_args = 
mock_opener.open.call_args + req = call_args[0][0] + payload = json.loads(req.data.decode("utf-8")) + content = payload["markdown_v2"]["content"] + assert len(content.encode("utf-8")) <= WECOM_MAX_CONTENT_BYTES + + @patch("task_runner.notify.urllib.request.build_opener") + def test_send_markdown_json_decode_error(self, mock_build_opener): + resp = MagicMock() + resp.read.return_value = b"not json" + resp.__enter__ = MagicMock(return_value=resp) + resp.__exit__ = MagicMock(return_value=False) + mock_opener = MagicMock() + mock_opener.open.return_value = resp + mock_build_opener.return_value = mock_opener + n = WeComNotifier(FAKE_WEBHOOK) + assert n.send_markdown("test") is False + + @patch("task_runner.notify.urllib.request.HTTPSHandler") + @patch("task_runner.notify.urllib.request.build_opener") + @patch("task_runner.notify.urllib.request.ProxyHandler") + def test_send_markdown_always_no_proxy( + self, + mock_proxy_handler, + mock_build_opener, + mock_https_handler, + ): + proxy_handler_obj = object() + mock_proxy_handler.return_value = proxy_handler_obj + https_handler_obj = object() + mock_https_handler.return_value = https_handler_obj + + mock_opener = MagicMock() + mock_opener.open.return_value = _make_urlopen_response({"errcode": 0, "errmsg": "ok"}) + mock_build_opener.return_value = mock_opener + + n = WeComNotifier(FAKE_WEBHOOK) + assert n.send_markdown("test") is True + + mock_proxy_handler.assert_called_once_with({}) + mock_https_handler.assert_called_once() + mock_build_opener.assert_called_once_with(proxy_handler_obj, https_handler_obj) + + +# ─── create_notifier ───────────────────────────────────────────── + + +class TestCreateNotifier: + def test_explicit_url(self): + n = create_notifier(webhook_url=FAKE_WEBHOOK) + assert isinstance(n, WeComNotifier) + + def test_env_url(self): + with patch.dict(os.environ, {ENV_WECOM_WEBHOOK: FAKE_WEBHOOK}): + n = create_notifier() + assert isinstance(n, WeComNotifier) + + def test_no_url_returns_none(self): + with 
patch.dict(os.environ, {}, clear=True): + n = create_notifier() + assert n is None + + def test_explicit_disabled(self): + n = create_notifier(webhook_url=FAKE_WEBHOOK, enabled=False) + assert n is None + + def test_env_disabled_false(self): + with patch.dict(os.environ, {ENV_WECOM_WEBHOOK: FAKE_WEBHOOK, ENV_NOTIFY_ENABLED: "false"}): + n = create_notifier() + assert n is None + + def test_env_disabled_zero(self): + with patch.dict(os.environ, {ENV_WECOM_WEBHOOK: FAKE_WEBHOOK, ENV_NOTIFY_ENABLED: "0"}): + n = create_notifier() + assert n is None + + def test_env_disabled_off(self): + with patch.dict(os.environ, {ENV_WECOM_WEBHOOK: FAKE_WEBHOOK, ENV_NOTIFY_ENABLED: "off"}): + n = create_notifier() + assert n is None + + def test_env_enabled_true(self): + with patch.dict(os.environ, {ENV_WECOM_WEBHOOK: FAKE_WEBHOOK, ENV_NOTIFY_ENABLED: "true"}): + n = create_notifier() + assert isinstance(n, WeComNotifier) + + def test_explicit_url_overrides_env(self): + other_url = "https://example.com/other" + with patch.dict(os.environ, {ENV_WECOM_WEBHOOK: FAKE_WEBHOOK}): + n = create_notifier(webhook_url=other_url) + assert isinstance(n, WeComNotifier) + assert n._webhook_url == other_url + + +# ─── send_notification_safe ────────────────────────────────────── + + +class TestSendNotificationSafe: + def test_none_notifier(self): + # Should not raise + send_notification_safe(None, "test") + + def test_success(self): + mock = MagicMock(spec=Notifier) + mock.send_markdown.return_value = True + mock.name.return_value = "TestBot" + send_notification_safe(mock, "hello") + mock.send_markdown.assert_called_once_with("hello") + + def test_failure_does_not_raise(self): + mock = MagicMock(spec=Notifier) + mock.send_markdown.return_value = False + mock.name.return_value = "TestBot" + send_notification_safe(mock, "hello") + + def test_exception_does_not_raise(self): + mock = MagicMock(spec=Notifier) + mock.send_markdown.side_effect = RuntimeError("boom") + mock.name.return_value = "TestBot" + 
# Must NOT propagate exception
+        send_notification_safe(mock, "hello")
+
+
+# ─── Message Builders ───────────────────────────────────────────
+
+
+class TestBuildBatchCompleteMessage:
+    def test_all_success(self):
+        msg = build_batch_complete_message(
+            project="MY_PROJECT",
+            task_set="my-tasks",
+            start_time="14:30:05",
+            end_time="16:45:30",
+            duration="2h 15m 25s",
+            succeeded=12,
+            failed=0,
+            skipped=1,
+            total=15,
+            total_done=13,
+            interrupted=False,
+        )
+        assert "全部完成" in msg
+        assert "MY_PROJECT" in msg
+        assert "my-tasks" in msg
+        assert "14:30:05" in msg
+        assert "16:45:30" in msg
+        assert "12" in msg
+
+    def test_with_failures(self):
+        msg = build_batch_complete_message(
+            project="P",
+            task_set="ts",
+            start_time="10:00:00",
+            end_time="11:00:00",
+            duration="1h 00m 00s",
+            succeeded=8,
+            failed=2,
+            skipped=0,
+            total=10,
+            total_done=8,
+            interrupted=False,
+            failed_tasks=[
+                {"task_no": "M18", "failure_reason": "timeout", "duration_seconds": 2400},
+                {"task_no": "M22", "failure_reason": "exit code 1", "duration_seconds": 120},
+            ],
+        )
+        assert "有失败" in msg
+        assert "M18" in msg
+        assert "timeout" in msg
+        assert "M22" in msg
+
+    def test_interrupted(self):
+        msg = build_batch_complete_message(
+            project="P",
+            task_set="ts",
+            start_time="10:00:00",
+            end_time="10:30:00",
+            duration="30m 00s",
+            succeeded=5,
+            failed=0,
+            skipped=0,
+            total=10,
+            total_done=5,
+            interrupted=True,
+        )
+        assert "中断" in msg
+        assert "Ctrl+C" in msg
+
+    def test_many_failures_still_build_valid_markdown(self):
+        many_failures = [
+            {"task_no": f"T-{i}", "failure_reason": "timeout", "duration_seconds": 2400}
+            for i in range(200)
+        ]
+        msg = build_batch_complete_message(
+            project="P",
+            task_set="ts",
+            start_time="10:00:00",
+            end_time="11:00:00",
+            duration="1h",
+            succeeded=0,
+            failed=200,
+            skipped=0,
+            total=200,
+            total_done=0,
+            interrupted=False,
+            failed_tasks=many_failures,
+        )
+        # The message builder itself doesn't truncate; that's WeComNotifier's job.
+ # But it should still produce valid markdown. + assert "失败任务" in msg + + +class TestBuildTaskFailureMessage: + def test_basic(self): + msg = build_task_failure_message( + project="P", + task_set="ts", + task_no="M18", + task_name="物料建模", + failure_reason="timeout", + elapsed="40m 00s", + ) + assert "任务失败" in msg + assert "M18" in msg + assert "物料建模" in msg + assert "timeout" in msg + assert "40m 00s" in msg + + def test_rich_details(self): + msg = build_task_failure_message( + project="P", + task_set="ts", + task_no="M18", + task_name="物料建模", + failure_reason="exit code 1", + elapsed="3m 20s", + tool="copilot", + model="claude-opus-4.6", + return_code=1, + output_tail="Traceback...", + log_file="runtime/20260228/logs/M18.log", + ) + assert "❌" in msg + assert "copilot" in msg + assert "claude-opus-4.6" in msg + assert "退出码" in msg + assert "最终结果输出" in msg + assert "M18.log" in msg + + +class TestBuildInterruptMessage: + def test_basic(self): + msg = build_interrupt_message( + project="P", + task_set="ts", + current_task_no="M18", + current_task_name="物料建模", + completed=5, + total=10, + ) + assert "中断" in msg + assert "M18" in msg + assert "5/10" in msg + + +class TestBuildTaskCompleteMessage: + def test_basic(self): + msg = build_task_complete_message( + project="P", + task_set="ts", + task_no="M10", + task_name="会计要素建模", + elapsed="4m 18s", + ) + assert "任务完成" in msg + assert "M10" in msg + assert "4m 18s" in msg + + def test_rich_details_and_next_task_preview(self): + msg = build_task_complete_message( + project="P", + task_set="ts", + task_no="M10", + task_name="会计要素建模", + elapsed="4m 18s", + tool="copilot", + model="claude-opus-4.6", + return_code=0, + progress_done=3, + progress_total=10, + output_tail="All checks passed", + log_file="runtime/20260228/logs/M10.log", + next_task_no="M11", + next_task_name="科目映射重构", + next_tool="agent", + next_model="opus-4.6", + ) + assert "✅" in msg + assert "copilot" in msg + assert "claude-opus-4.6" in msg + assert "3/10" 
in msg + assert "最终结果输出" in msg + assert "下一任务预告" in msg + assert "M11" in msg + assert "opus-4.6" in msg + + def test_long_output_keeps_preview_and_under_wecom_limit(self): + long_output = "\n".join(f"line-{i}: " + ("x" * 120) for i in range(500)) + msg = build_task_complete_message( + project="P", + task_set="ts", + task_no="M99", + task_name="超长输出测试", + elapsed="9m 12s", + tool="copilot", + model="claude-opus-4.6", + return_code=0, + progress_done=9, + progress_total=10, + output_tail=long_output, + log_file="runtime/20260228/logs/M99.log", + next_task_no="M100", + next_task_name="收尾任务", + next_tool="agent", + next_model="opus-4.6", + ) + assert "下一任务预告" in msg + assert "M100" in msg + assert "最终结果输出" in msg + assert len(msg.encode("utf-8")) <= WECOM_MAX_CONTENT_BYTES + + +# ─── Integration test with real webhook ────────────────────────── + + +class TestWeComIntegration: + """Integration test using the real webhook. + + Only runs when TASK_RUNNER_WECOM_WEBHOOK is set. + Sends a real test message to the configured WeCom group. 
+ """ + + @pytest.fixture + def webhook_url(self): + url = os.environ.get(ENV_WECOM_WEBHOOK, "").strip() + if not url: + pytest.skip("TASK_RUNNER_WECOM_WEBHOOK not set") + return url + + def test_real_send_markdown_v2(self, webhook_url): + """Send a real test notification to verify the webhook works.""" + n = WeComNotifier(webhook_url) + content = build_batch_complete_message( + project="AUTO_TEST", + task_set="unit-test", + start_time="10:00:00", + end_time="10:05:00", + duration="5m 00s", + succeeded=3, + failed=1, + skipped=1, + total=5, + total_done=3, + interrupted=False, + failed_tasks=[ + {"task_no": "T-2", "failure_reason": "exit code 1", "duration_seconds": 65}, + ], + ) + result = n.send_markdown(content) + assert result is True, "Real WeCom webhook send failed" + + def test_real_send_truncated(self, webhook_url): + """Verify truncation works with the real API.""" + n = WeComNotifier(webhook_url) + # Build a message that exceeds 4096 bytes + lines = ["## Truncation Test\n"] + lines.append("| No | Task | Status |") + lines.append("| :--- | :--- | :--- |") + for i in range(300): + lines.append(f"| T-{i:03d} | 测试任务名称很长很长 {i} | 成功 |") + content = "\n".join(lines) + assert len(content.encode("utf-8")) > WECOM_MAX_CONTENT_BYTES + result = n.send_markdown(content) + assert result is True, "Truncated message send failed" + + +# ─── Notifier abstract interface ───────────────────────────────── + + +class TestNotifierInterface: + def test_cannot_instantiate_abstract(self): + with pytest.raises(TypeError): + Notifier() # type: ignore[abstract] + + def test_concrete_subclass(self): + class DummyNotifier(Notifier): + def send_markdown(self, content: str) -> bool: + return True + + def name(self) -> str: + return "Dummy" + + n = DummyNotifier() + assert n.send_markdown("test") is True + assert n.name() == "Dummy"
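+
+
+# ─── Safe-wrapper contract (illustrative) ──────────────────────
+
+
+class TestSafeWrapperContract:
+    """Illustrative sketch tying the abstract interface to the safe wrapper.
+
+    Assumes only the contract already exercised above: ``send_markdown``
+    returns a bool, ``name`` returns a str, and ``send_notification_safe``
+    swallows all errors. Uses a hand-written subclass instead of a mock.
+    """
+
+    def test_concrete_subclass_through_safe_wrapper(self):
+        sent = []
+
+        class RecordingNotifier(Notifier):
+            def send_markdown(self, content: str) -> bool:
+                # Record the content so the test can observe the call.
+                sent.append(content)
+                return True
+
+            def name(self) -> str:
+                return "Recording"
+
+        send_notification_safe(RecordingNotifier(), "hello")
+        assert sent == ["hello"]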