Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e792fac6c0 | |||
| c31ddd0bb3 | |||
| 884848f15f | |||
| 9be264250a | |||
| 682dedb4b4 |
@@ -20,7 +20,7 @@ jobs:
|
||||
run: pip install -r requirements.txt
|
||||
|
||||
- name: Run tests
|
||||
run: python -m pytest tests/ -v
|
||||
run: python -m pytest -v
|
||||
|
||||
- name: Create issue on failure
|
||||
if: failure()
|
||||
|
||||
+96
-11
@@ -83,7 +83,7 @@ python scripts/agent_poller.py --action get --issue N
|
||||
1. git pull origin main
|
||||
2. git checkout -b dev/issue-N-<slug>
|
||||
3. 修改功能代码 + 更新/补充 UT 和接口集成测试
|
||||
4. python -m pytest tests/ -v # 本地全量测试
|
||||
4. python -m pytest -v # 本地全量测试
|
||||
5. git commit -m "fix: <描述> - Closes #N"
|
||||
6. git push origin dev/issue-N-<slug>
|
||||
```
|
||||
@@ -94,14 +94,68 @@ python scripts/agent_poller.py --action get --issue N
|
||||
- 关注 IR 一致性:对同一输入的多次运行结果应尽量稳定
|
||||
- 关注功能覆盖率:确保 IR 覆盖了输入文档中的功能点
|
||||
|
||||
### 4. 等待 CI
|
||||
### 4. 提交 PR
|
||||
|
||||
Push 后 CI 自动运行。可通过 Gitea Actions 页面或 `agent_poller.py` 查看状态。
|
||||
Push 后立即用 `agent_poller.py` 创建 PR:
|
||||
|
||||
### 5. 处理结果
|
||||
```bash
|
||||
python scripts/agent_poller.py --action create-pr \
|
||||
--issue N --branch dev/issue-N-<slug> \
|
||||
--body "## Summary
|
||||
- <改动摘要>
|
||||
|
||||
- **CI 通过**:创建 PR 合并到 main(或直接 push 到 main),`Closes #N` 自动关闭 Issue
|
||||
- **CI 失败**:CI 自动创建新 Issue,分析失败原因,进入下一轮修复
|
||||
## Test
|
||||
- [x] pytest 全量通过 (XX passed, Y skipped)
|
||||
- [x] UT / 集成测试已更新
|
||||
|
||||
Closes #N"
|
||||
```
|
||||
|
||||
PR 创建后,在 Issue 下评论 PR 链接:
|
||||
|
||||
```bash
|
||||
python scripts/agent_poller.py --action comment --issue N \
|
||||
--body "PR 已创建: <PR_URL>
|
||||
|
||||
变更:
|
||||
- <摘要>
|
||||
|
||||
等待 CI 通过后 merge。"
|
||||
```
|
||||
|
||||
### 5. 等待 CI
|
||||
|
||||
PR 创建后 CI 自动触发。用 agent_poller 监控状态:
|
||||
|
||||
```bash
|
||||
python scripts/agent_poller.py --action pr-status --pr <PR_NUM>
|
||||
```
|
||||
|
||||
### 6. Merge & 关闭
|
||||
|
||||
CI 通过后,执行 merge 并关闭 Issue:
|
||||
|
||||
```bash
|
||||
# Merge PR(会自动检查 CI 状态)
|
||||
python scripts/agent_poller.py --action merge-pr --pr <PR_NUM>
|
||||
|
||||
# 如果 Issue 未被自动关闭,手动关闭
|
||||
python scripts/agent_poller.py --action close-issue --issue N \
|
||||
--body "PR #<NUM> merged. 变更已合入 main."
|
||||
```
|
||||
|
||||
**一键查看完整生命周期:**
|
||||
```bash
|
||||
python scripts/agent_poller.py --action lifecycle --issue N
|
||||
```
|
||||
|
||||
### 7. CI 失败处理
|
||||
|
||||
CI 失败时 Gitea 自动创建 `ci-failure` Issue:
|
||||
1. `agent_poller.py --action get --issue <NEW_NUM>` 分析失败原因
|
||||
2. 在修复分支上修改代码,`git commit --amend` 或新 commit
|
||||
3. `git push origin dev/issue-N-<slug>` 触发 CI 重跑
|
||||
4. 重复步骤 5-6 直到 CI 通过
|
||||
|
||||
## 闭环
|
||||
|
||||
@@ -110,16 +164,47 @@ QE-Agent 开 Issue (qe-feedback)
|
||||
↓
|
||||
Dev-Agent 分析 → 开发/重构 → 更新测试
|
||||
↓
|
||||
git push → CI (lint + pytest + acceptance)
|
||||
git push → create-pr → CI (pytest)
|
||||
↓
|
||||
┌─ 失败 → 自动开 Issue → 回到开头
|
||||
┌─ 失败 → 自动开 Issue → push 修复 → 回到 CI
|
||||
│
|
||||
└─ 成功 → Issue 关闭 → QE-Agent 验证 → 新反馈
|
||||
└─ 成功 → merge-pr → close-issue → QE-Agent 验证 → 新反馈
|
||||
```
|
||||
|
||||
## 提交规范
|
||||
|
||||
- **格式**:`fix: <简短描述> - Closes #N` 或 `feat: <描述> - Closes #N`
|
||||
- **粒度**:一个 commit 对应一个 Issue
|
||||
- **测试**:每次提交必须确保 `pytest tests/ -v` 全量通过
|
||||
- **粒度**:一个 Issue → 一个分支 → 一个 PR → 一个 commit
|
||||
- **测试**:每次提交前必须确保 `python -m pytest -v` 全量通过
|
||||
- **范围**:不混入与当前 Issue 无关的改动
|
||||
- **PR**:Push 后立即创建 PR,CI 通过后 merge,PR 信息写入 Issue 后关闭
|
||||
|
||||
## agent_poller 命令速查
|
||||
|
||||
| 命令 | 用途 | 阶段 |
|
||||
|------|------|------|
|
||||
| `--action list` | 列出所有待处理 Issue | 1. 轮询 |
|
||||
| `--action get --issue N` | 查看 Issue 详情 | 2. 分析 |
|
||||
| `--action create-pr --issue N --branch X --body "..."` | 创建 PR | 4. 提 PR |
|
||||
| `--action comment --issue N --body "..."` | 评论 Issue(记录 PR 链接等) | 4. 提 PR |
|
||||
| `--action pr-status --pr N` | 查看 PR + CI 状态 | 5. 等 CI |
|
||||
| `--action merge-pr --pr N` | Merge PR(自动检查 CI) | 6. Merge |
|
||||
| `--action close-issue --issue N --body "..."` | 手动关闭 Issue | 6. 关闭 |
|
||||
| `--action lifecycle --issue N` | 查看 Issue 完整生命周期 | 随时 |
|
||||
|
||||
## 闭环完成检查清单
|
||||
|
||||
处理每个 Issue 时,确认以下节点全部完成:
|
||||
|
||||
- [ ] **分析**:`agent_poller.py --action get` 理解 Issue 内容
|
||||
- [ ] **分支**:`git checkout -b dev/issue-N-<slug>`
|
||||
- [ ] **开发**:修改功能代码 + 同步更新 UT
|
||||
- [ ] **测试**:`python -m pytest -v` 全量通过
|
||||
- [ ] **提交**:`git commit -m "fix: <描述> - Closes #N"`
|
||||
- [ ] **推送**:`git push origin dev/issue-N-<slug>`
|
||||
- [ ] **PR**:`agent_poller.py --action create-pr` 创建 PR
|
||||
- [ ] **评论**:`agent_poller.py --action comment` 在 Issue 下记录 PR 链接
|
||||
- [ ] **CI**:`agent_poller.py --action pr-status` 确认 CI 通过
|
||||
- [ ] **合并**:`agent_poller.py --action merge-pr` 合并 PR
|
||||
- [ ] **关闭**:确认 Issue 已自动关闭,否则 `--action close-issue`
|
||||
- [ ] **验证**:`agent_poller.py --action lifecycle` 确认全流程完成
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
[pytest]
|
||||
testpaths = tests skills/ir_generation_skill/tests
|
||||
python_files = test_*.py
|
||||
pythonpath = .
|
||||
+142
-9
@@ -1,10 +1,14 @@
|
||||
"""Helper for dev agent to interact with Gitea issues.
|
||||
"""Helper for dev agent to interact with Gitea issues and PRs.
|
||||
|
||||
Usage:
|
||||
python scripts/agent_poller.py --action list
|
||||
python scripts/agent_poller.py --action get --issue 1
|
||||
python scripts/agent_poller.py --action comment --issue 1 --body "Working on this"
|
||||
python scripts/agent_poller.py --action create-pr --issue 1 --branch fix/issue-1
|
||||
python scripts/agent_poller.py --action pr-status --pr 4
|
||||
python scripts/agent_poller.py --action merge-pr --pr 4
|
||||
python scripts/agent_poller.py --action close-issue --issue 2 --body "Done"
|
||||
python scripts/agent_poller.py --action lifecycle --issue 2
|
||||
"""
|
||||
|
||||
import argparse
|
||||
@@ -19,7 +23,6 @@ GITEA_REPO = os.environ.get("GITEA_REPO", "pzhang_zywl/document_analyzer")
|
||||
GITEA_TOKEN = os.environ.get("GITEA_API_TOKEN", "")
|
||||
|
||||
BASE = f"{GITEA_URL}/api/v1/repos/{GITEA_REPO}"
|
||||
TARGET_LABELS = set() # List all issues, Dev-Agent handles all non-test issues
|
||||
|
||||
|
||||
def _req(method, path, data=None):
|
||||
@@ -30,13 +33,18 @@ def _req(method, path, data=None):
|
||||
req.add_header("Content-Type", "application/json")
|
||||
try:
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
return json.loads(resp.read())
|
||||
raw = resp.read()
|
||||
if not raw:
|
||||
return {} # Gitea merge returns 200 with empty body
|
||||
return json.loads(raw)
|
||||
except urllib.error.HTTPError as e:
|
||||
body = e.read().decode()
|
||||
print(f"API Error {e.code}: {body}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# ── Issue operations ─────────────────────────────────────────────────────────
|
||||
|
||||
def list_issues():
|
||||
issues = _req("GET", "/issues?state=open")
|
||||
if not issues:
|
||||
@@ -65,10 +73,22 @@ def comment_issue(num, body):
|
||||
return i
|
||||
|
||||
|
||||
def create_pr(issue_num, branch):
|
||||
# Get issue title for PR description
|
||||
def close_issue(num, body=None):
|
||||
"""Close an issue, optionally with a final comment."""
|
||||
if body:
|
||||
comment_issue(num, body)
|
||||
i = _req("PATCH", f"/issues/{num}", {"state": "closed"})
|
||||
print(f"Issue #{num} closed")
|
||||
return i
|
||||
|
||||
|
||||
# ── PR operations ────────────────────────────────────────────────────────────
|
||||
|
||||
def create_pr(issue_num, branch, body=None):
|
||||
"""Create a PR for the given issue and branch."""
|
||||
issue = _req("GET", f"/issues/{issue_num}")
|
||||
title = f"Fix #{issue_num}: {issue['title'].replace('CI Failure: ', '')}"
|
||||
title = f"fix: {issue['title']} - Closes #{issue_num}"
|
||||
if body is None:
|
||||
body = f"Closes #{issue_num}\n\n{issue.get('body', '')}\n\n🤖 Generated by dev agent"
|
||||
pr = _req("POST", "/pulls", {
|
||||
"title": title,
|
||||
@@ -76,15 +96,108 @@ def create_pr(issue_num, branch):
|
||||
"base": "main",
|
||||
"body": body,
|
||||
})
|
||||
print(f"PR created: {pr.get('html_url', pr.get('url', ''))}")
|
||||
pr_url = pr.get('html_url', pr.get('url', ''))
|
||||
print(f"PR created: {pr_url}")
|
||||
return pr
|
||||
|
||||
|
||||
def pr_status(pr_num):
|
||||
"""Check PR state and CI status."""
|
||||
pr = _req("GET", f"/pulls/{pr_num}")
|
||||
print(f"PR #{pr['number']}: {pr['title']}")
|
||||
print(f"State: {pr['state']}")
|
||||
print(f"Merged: {pr.get('merged', False)}")
|
||||
print(f"Mergeable: {pr.get('mergeable', 'unknown')}")
|
||||
print(f"URL: {pr['html_url']}")
|
||||
|
||||
# CI status
|
||||
sha = pr.get("head", {}).get("sha", "")
|
||||
if sha:
|
||||
try:
|
||||
status = _req("GET", f"/commits/{sha}/status")
|
||||
print(f"CI Status: {status.get('state', 'pending')}")
|
||||
for s in status.get('statuses', []):
|
||||
desc = s.get('description', '')
|
||||
print(f" - {s.get('context')}: {s.get('state')} ({desc})")
|
||||
except SystemExit:
|
||||
print("CI Status: no statuses found")
|
||||
return pr
|
||||
|
||||
|
||||
def merge_pr(pr_num):
|
||||
"""Merge a PR. Fails if CI hasn't passed or PR is not mergeable."""
|
||||
pr = _req("GET", f"/pulls/{pr_num}")
|
||||
|
||||
if pr.get("state") == "closed":
|
||||
if pr.get("merged"):
|
||||
print(f"PR #{pr_num} already merged")
|
||||
return pr
|
||||
else:
|
||||
print(f"PR #{pr_num} is closed (not merged)", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Check CI
|
||||
sha = pr.get("head", {}).get("sha", "")
|
||||
ci_passed = True
|
||||
if sha:
|
||||
try:
|
||||
status = _req("GET", f"/commits/{sha}/status")
|
||||
ci_state = status.get("state", "pending")
|
||||
if ci_state in ("failure", "error"):
|
||||
ci_passed = False
|
||||
print(f"CI status: {ci_state} — cannot merge", file=sys.stderr)
|
||||
for s in status.get('statuses', []):
|
||||
print(f" {s.get('context')}: {s.get('state')}", file=sys.stderr)
|
||||
except SystemExit:
|
||||
pass # No statuses, proceed
|
||||
|
||||
if not ci_passed:
|
||||
print("Merge blocked: CI has not passed", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
result = _req("POST", f"/pulls/{pr_num}/merge", {"Do": "merge"})
|
||||
# Verify merge success by re-checking PR state
|
||||
pr_after = _req("GET", f"/pulls/{pr_num}")
|
||||
if pr_after.get("merged"):
|
||||
print(f"PR #{pr_num} merged successfully")
|
||||
elif result.get("merged"):
|
||||
print(f"PR #{pr_num} merged successfully")
|
||||
else:
|
||||
print(f"Merge result: {result.get('message', 'unknown')}")
|
||||
return result
|
||||
|
||||
|
||||
# ── Lifecycle management ─────────────────────────────────────────────────────
|
||||
|
||||
def lifecycle(issue_num):
|
||||
"""Print the full lifecycle status for an issue: branch, PR, CI, merge."""
|
||||
print(f"=== Issue #{issue_num} Lifecycle ===\n")
|
||||
|
||||
issue = _req("GET", f"/issues/{issue_num}")
|
||||
print(f"Issue: {issue['title']}")
|
||||
print(f"State: {issue['state']}\n")
|
||||
|
||||
# Find associated PRs
|
||||
prs = _req("GET", "/pulls?state=all")
|
||||
related = [p for p in prs if f"Closes #{issue_num}" in p.get("body", "")
|
||||
or f"#{issue_num}" in p.get("title", "")]
|
||||
if related:
|
||||
for pr in related:
|
||||
print(f"PR #{pr['number']}: {pr['state']} (merged={pr.get('merged', False)})")
|
||||
pr_status(pr["number"])
|
||||
else:
|
||||
print("No associated PR found.")
|
||||
|
||||
|
||||
# ── CLI ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Dev agent Gitea helper")
|
||||
parser.add_argument("--action", required=True,
|
||||
choices=["list", "get", "comment", "create-pr"])
|
||||
choices=["list", "get", "comment", "close-issue",
|
||||
"create-pr", "pr-status", "merge-pr", "lifecycle"])
|
||||
parser.add_argument("--issue", type=int)
|
||||
parser.add_argument("--pr", type=int)
|
||||
parser.add_argument("--branch")
|
||||
parser.add_argument("--body")
|
||||
args = parser.parse_args()
|
||||
@@ -106,11 +219,31 @@ def main():
|
||||
print("--issue and --body are required for 'comment' action", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
comment_issue(args.issue, args.body)
|
||||
elif args.action == "close-issue":
|
||||
if not args.issue:
|
||||
print("--issue is required for 'close-issue' action", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
close_issue(args.issue, args.body)
|
||||
elif args.action == "create-pr":
|
||||
if not args.issue or not args.branch:
|
||||
print("--issue and --branch are required for 'create-pr' action", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
create_pr(args.issue, args.branch)
|
||||
create_pr(args.issue, args.branch, args.body)
|
||||
elif args.action == "pr-status":
|
||||
if not args.pr:
|
||||
print("--pr is required for 'pr-status' action", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
pr_status(args.pr)
|
||||
elif args.action == "merge-pr":
|
||||
if not args.pr:
|
||||
print("--pr is required for 'merge-pr' action", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
merge_pr(args.pr)
|
||||
elif args.action == "lifecycle":
|
||||
if not args.issue:
|
||||
print("--issue is required for 'lifecycle' action", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
lifecycle(args.issue)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -26,19 +26,20 @@ case "$MODE" in
|
||||
echo ""
|
||||
echo "正在执行单次检查..."
|
||||
claude -p --agent agents/DEV_AGENT.md \
|
||||
"你是 Dev-Agent,检查 Gitea 所有打开的 Issue,跳过纯测试相关的,其他全部领取分析并修复,记得同步更新测试。"
|
||||
"你是 Dev-Agent。检查 Gitea 所有打开的 Issue(--action list),跳过纯测试相关的。对每个负责的 Issue,走完完整闭环:分析 → 分支 → 开发+UT → pytest → commit → push → create-pr → comment Issue → 等 CI → merge-pr → 关闭。"
|
||||
;;
|
||||
2)
|
||||
echo ""
|
||||
echo "启动持续轮询模式 (每 10 分钟)..."
|
||||
echo "按 Ctrl+C 停止"
|
||||
claude -p --agent agents/DEV_AGENT.md \
|
||||
"你是 Dev-Agent,用 loop 模式每 10 分钟检查一次 Gitea 所有打开的 Issue,跳过纯测试相关的,其他全部领取处理。完成后评论进度,push 触发 CI。"
|
||||
"你是 Dev-Agent。用 loop 模式每 10 分钟检查一次 Gitea Issue(--action list)。跳过纯测试相关的。每个 Issue 走完整闭环:分析→开发→push→create-pr→comment→CI→merge-pr→close。每个步骤用 agent_poller.py 对应命令。"
|
||||
;;
|
||||
3)
|
||||
echo ""
|
||||
echo "启动交互模式..."
|
||||
echo "进入后输入: 检查 Gitea Issues 并处理"
|
||||
echo "可用命令速查: agent_poller.py --help"
|
||||
claude --agent agents/DEV_AGENT.md
|
||||
;;
|
||||
*)
|
||||
|
||||
@@ -10,15 +10,22 @@ import yaml
|
||||
# ---- Paths ----
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
WORKSPACE_DIR = os.path.dirname(BASE_DIR)
|
||||
PROJECT_ROOT = os.path.dirname(WORKSPACE_DIR)
|
||||
PROJECT_OUTPUT = os.path.join(PROJECT_ROOT, "output")
|
||||
|
||||
# Subdirectories under PROJECT_OUTPUT
|
||||
IR_OUTPUT = os.path.join(PROJECT_OUTPUT, "ir")
|
||||
FINAL_OUTPUT = os.path.join(PROJECT_OUTPUT, "final")
|
||||
|
||||
# Legacy paths (maintained for doc_parser integration)
|
||||
DOC_PARSER_OUTPUT = os.path.join(WORKSPACE_DIR, "doc_parser_skill", "output")
|
||||
PROMPTS_DIR = os.path.join(BASE_DIR, "prompts")
|
||||
TESTS_DIR = os.path.join(BASE_DIR, "tests")
|
||||
OUTPUT_DIR = os.path.join(BASE_DIR, "output")
|
||||
OUTPUT_DIR = IR_OUTPUT # backward compatibility alias
|
||||
|
||||
# Input file (the parsed PRD JSON)
|
||||
_DEFAULT_INPUT = os.path.join(
|
||||
DOC_PARSER_OUTPUT,
|
||||
"车机娱乐系统禁止功能文档_脱敏 v0.9_v2_updated.json",
|
||||
PROJECT_OUTPUT, "车机娱乐系统禁止功能文档_脱敏 v0.9_v2_updated.json",
|
||||
)
|
||||
INPUT_JSON = os.environ.get("IR_INPUT_JSON", _DEFAULT_INPUT)
|
||||
|
||||
@@ -35,18 +42,18 @@ SECRETS_YAML = os.path.join(
|
||||
OPENCLAW_HOME, "workspace-document-analyzer", "config", "secrets.yaml",
|
||||
)
|
||||
|
||||
# Intermediate outputs
|
||||
SEMANTIC_INDEX_R1_JSON = os.path.join(OUTPUT_DIR, "semantic_index_r1.json")
|
||||
SEMANTIC_INDEX_R2_JSON = os.path.join(OUTPUT_DIR, "semantic_index_r2.json")
|
||||
SEMANTIC_INDEX_R3_JSON = os.path.join(OUTPUT_DIR, "semantic_index_r3.json")
|
||||
SEMANTIC_INDEX_JSON = os.path.join(OUTPUT_DIR, "semantic_index.json") # merged final
|
||||
IR_FRAGMENTS_JSON = os.path.join(OUTPUT_DIR, "ir_fragments.json")
|
||||
PATH_ENUM_JSON = os.path.join(OUTPUT_DIR, "path_enumeration.json")
|
||||
IR_AUTOCOMPLETE_FRAGMENTS_JSON = os.path.join(OUTPUT_DIR, "ir_autocomplete_fragments.json")
|
||||
# Intermediate outputs (all under PROJECT_OUTPUT/ir/)
|
||||
SEMANTIC_INDEX_R1_JSON = os.path.join(IR_OUTPUT, "semantic_index_r1.json")
|
||||
SEMANTIC_INDEX_R2_JSON = os.path.join(IR_OUTPUT, "semantic_index_r2.json")
|
||||
SEMANTIC_INDEX_R3_JSON = os.path.join(IR_OUTPUT, "semantic_index_r3.json")
|
||||
SEMANTIC_INDEX_JSON = os.path.join(IR_OUTPUT, "semantic_index.json")
|
||||
IR_FRAGMENTS_JSON = os.path.join(IR_OUTPUT, "ir_fragments.json")
|
||||
PATH_ENUM_JSON = os.path.join(IR_OUTPUT, "path_enumeration.json")
|
||||
IR_AUTOCOMPLETE_FRAGMENTS_JSON = os.path.join(IR_OUTPUT, "ir_autocomplete_fragments.json")
|
||||
|
||||
# Final deliverables (placed in doc_parser output per spec)
|
||||
IR_FINAL_JSON = os.path.join(DOC_PARSER_OUTPUT, "ir_final.json")
|
||||
IR_AUDIT_REPORT_MD = os.path.join(DOC_PARSER_OUTPUT, "ir_audit_report.md")
|
||||
# Final deliverables (under PROJECT_OUTPUT/final/)
|
||||
IR_FINAL_JSON = os.path.join(FINAL_OUTPUT, "ir_final.json")
|
||||
IR_AUDIT_REPORT_MD = os.path.join(FINAL_OUTPUT, "ir_audit_report.md")
|
||||
|
||||
# ---- LLM API ----
|
||||
# Choose provider: "deepseek" | "dashscope"
|
||||
|
||||
@@ -365,6 +365,97 @@ def run_all_tests():
|
||||
return total_failures == 0
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# pytest discovery support — skips gracefully when output files are absent
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
import pytest # noqa: E402
|
||||
|
||||
|
||||
def _load_si_and_doc():
|
||||
"""Try to load semantic_index.json and the input document. Returns (si, doc) or (None, None)."""
|
||||
try:
|
||||
si = config.load_json(config.SEMANTIC_INDEX_JSON)
|
||||
doc = config.load_input_document()
|
||||
return si, doc
|
||||
except FileNotFoundError:
|
||||
return None, None
|
||||
|
||||
|
||||
def test_step1_unit_ids():
|
||||
"""pytest: verify all function_units have valid unit_id and name."""
|
||||
si, doc = _load_si_and_doc()
|
||||
if si is None:
|
||||
pytest.skip("semantic_index.json not found — run step1_semantic_index.py first")
|
||||
units = si.get("function_units", [])
|
||||
errors = check_unit_ids(units)
|
||||
assert not errors, f"unit_id/name errors: {errors}"
|
||||
|
||||
|
||||
def test_step1_path_fields():
|
||||
"""pytest: verify all function_units have non-empty path arrays."""
|
||||
si, doc = _load_si_and_doc()
|
||||
if si is None:
|
||||
pytest.skip("semantic_index.json not found")
|
||||
units = si.get("function_units", [])
|
||||
errors = check_unit_paths(units)
|
||||
assert not errors, f"path field errors: {errors}"
|
||||
|
||||
|
||||
def test_step1_concept_parents():
|
||||
"""pytest: verify concept parent references are valid."""
|
||||
si, doc = _load_si_and_doc()
|
||||
if si is None:
|
||||
pytest.skip("semantic_index.json not found")
|
||||
concepts = si.get("concepts", [])
|
||||
errors = check_concept_parents(concepts)
|
||||
assert not errors, f"concept parent errors: {errors}"
|
||||
|
||||
|
||||
def test_step1_sources_exist():
|
||||
"""pytest: verify all source references point to real content."""
|
||||
si, doc = _load_si_and_doc()
|
||||
if si is None:
|
||||
pytest.skip("semantic_index.json not found")
|
||||
units = si.get("function_units", [])
|
||||
image_index = build_image_index(doc)
|
||||
node_index = build_logic_tree_node_index(doc)
|
||||
errors = check_sources_exist(units, image_index, node_index)
|
||||
assert not errors, f"source reference errors: {errors}"
|
||||
|
||||
|
||||
def test_step1_logic_tree_coverage():
|
||||
"""pytest: verify decision/action nodes in logic trees are covered (warnings only)."""
|
||||
si, doc = _load_si_and_doc()
|
||||
if si is None:
|
||||
pytest.skip("semantic_index.json not found")
|
||||
units = si.get("function_units", [])
|
||||
node_index = build_logic_tree_node_index(doc)
|
||||
warnings = check_logic_tree_coverage(units, node_index)
|
||||
# Warnings are informational, not failures — but report them
|
||||
if warnings:
|
||||
print(f"\n[WARN] Logic tree coverage warnings: {warnings}")
|
||||
|
||||
|
||||
def test_step1_ensemble_confidence():
|
||||
"""pytest: verify function_units have confidence/ensemble_support/source_versions."""
|
||||
si, doc = _load_si_and_doc()
|
||||
if si is None:
|
||||
pytest.skip("semantic_index.json not found")
|
||||
units = si.get("function_units", [])
|
||||
errors = check_ensemble_confidence(units)
|
||||
assert not errors, f"ensemble confidence errors: {errors}"
|
||||
|
||||
|
||||
def test_step1_confidence_summary():
|
||||
"""pytest: verify confidence_summary counts match actual unit/concept counts."""
|
||||
si, doc = _load_si_and_doc()
|
||||
if si is None:
|
||||
pytest.skip("semantic_index.json not found")
|
||||
errors = check_confidence_summary(si)
|
||||
assert not errors, f"confidence_summary errors: {errors}"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = run_all_tests()
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
@@ -317,6 +317,93 @@ def run_all_tests():
|
||||
return total_failures == 0
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# pytest discovery support
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
import pytest # noqa: E402
|
||||
|
||||
|
||||
def _load_fragments_or_skip():
|
||||
"""Load ir_fragments.json or return None."""
|
||||
try:
|
||||
return config.load_json(config.IR_FRAGMENTS_JSON)
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
|
||||
|
||||
def test_step2_non_empty_rules():
|
||||
"""pytest: every fragment must have at least one rule."""
|
||||
fragments = _load_fragments_or_skip()
|
||||
if fragments is None:
|
||||
pytest.skip("ir_fragments.json not found — run step2_ir_extraction.py first")
|
||||
errors = check_non_empty_rules(fragments)
|
||||
assert not errors, f"non-empty rule errors: {errors}"
|
||||
|
||||
|
||||
def test_step2_rule_paths():
|
||||
"""pytest: every rule must have a non-empty path array."""
|
||||
fragments = _load_fragments_or_skip()
|
||||
if fragments is None:
|
||||
pytest.skip("ir_fragments.json not found")
|
||||
errors = check_rule_paths(fragments)
|
||||
assert not errors, f"rule path errors: {errors[:5]}"
|
||||
|
||||
|
||||
def test_step2_precondition_fields():
|
||||
"""pytest: every rule must have precondition with geographic_scope and screen_type."""
|
||||
fragments = _load_fragments_or_skip()
|
||||
if fragments is None:
|
||||
pytest.skip("ir_fragments.json not found")
|
||||
errors = check_precondition_fields(fragments)
|
||||
assert not errors, f"precondition errors: {errors[:5]}"
|
||||
|
||||
|
||||
def test_step2_user_interaction_content():
|
||||
"""pytest: user_interaction actions must have non-empty, non-placeholder content."""
|
||||
fragments = _load_fragments_or_skip()
|
||||
if fragments is None:
|
||||
pytest.skip("ir_fragments.json not found")
|
||||
errors = check_user_interaction_content(fragments)
|
||||
assert not errors, f"user_interaction content errors: {errors[:5]}"
|
||||
|
||||
|
||||
def test_step2_sources_have_refs():
|
||||
"""pytest: every rule should reference at least one source."""
|
||||
fragments = _load_fragments_or_skip()
|
||||
if fragments is None:
|
||||
pytest.skip("ir_fragments.json not found")
|
||||
errors = check_sources_have_logic_tree_nodes(fragments)
|
||||
assert not errors, f"source reference errors: {errors[:5]}"
|
||||
|
||||
|
||||
def test_step2_trigger_conditions():
|
||||
"""pytest: every trigger condition must have signal, operator, value."""
|
||||
fragments = _load_fragments_or_skip()
|
||||
if fragments is None:
|
||||
pytest.skip("ir_fragments.json not found")
|
||||
errors = check_trigger_conditions(fragments)
|
||||
assert not errors, f"trigger condition errors: {errors[:5]}"
|
||||
|
||||
|
||||
def test_step2_duplicate_rule_ids():
|
||||
"""pytest: no duplicate rule_ids across all fragments."""
|
||||
fragments = _load_fragments_or_skip()
|
||||
if fragments is None:
|
||||
pytest.skip("ir_fragments.json not found")
|
||||
errors = check_duplicate_rule_ids(fragments)
|
||||
assert not errors, f"duplicate rule_id errors: {errors}"
|
||||
|
||||
|
||||
def test_step2_action_types():
|
||||
"""pytest: all actions must have valid types."""
|
||||
fragments = _load_fragments_or_skip()
|
||||
if fragments is None:
|
||||
pytest.skip("ir_fragments.json not found")
|
||||
errors = check_action_types(fragments)
|
||||
assert not errors, f"action type errors: {errors[:5]}"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = run_all_tests()
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
@@ -147,6 +147,32 @@ def run_all_tests():
|
||||
return total_failures == 0
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# pytest discovery support
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
import pytest # noqa: E402
|
||||
|
||||
|
||||
def test_step2_5_path_enumeration():
|
||||
"""pytest: path_enumeration.json should have valid structure."""
|
||||
try:
|
||||
path_data = config.load_json(config.PATH_ENUM_JSON)
|
||||
except FileNotFoundError:
|
||||
pytest.skip("path_enumeration.json not found — run step2_5_branch_coverage.py first")
|
||||
errors = check_path_enumeration(path_data)
|
||||
assert not errors, f"path enumeration errors: {errors}"
|
||||
|
||||
|
||||
def test_step2_5_autocomplete_fragments():
|
||||
"""pytest: auto-complete fragments must have valid structure if present."""
|
||||
fragments = load_autocomplete_fragments()
|
||||
errors = check_autocomplete_fragments(fragments)
|
||||
# If fragments is None (no autocomplete needed), check_autocomplete_fragments returns a warning
|
||||
actual_errors = [e for e in errors if "未生成" not in e]
|
||||
assert not actual_errors, f"autocomplete fragment errors: {actual_errors}"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = run_all_tests()
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
@@ -227,6 +227,77 @@ def run_all_tests():
|
||||
return total_failures == 0
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# pytest discovery support
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
import pytest # noqa: E402
|
||||
|
||||
|
||||
def _load_ir_final_or_skip():
|
||||
"""Load ir_final.json or return None."""
|
||||
try:
|
||||
return config.load_json(config.IR_FINAL_JSON)
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
|
||||
|
||||
def _load_audit_report_or_skip():
|
||||
"""Load ir_audit_report.md or return None."""
|
||||
try:
|
||||
with open(config.IR_AUDIT_REPORT_MD, "r", encoding="utf-8") as f:
|
||||
return f.read()
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
|
||||
|
||||
def test_step3_top_level_structure():
|
||||
"""pytest: ir_final must have required top-level fields."""
|
||||
ir = _load_ir_final_or_skip()
|
||||
if ir is None:
|
||||
pytest.skip("ir_final.json not found — run step3_merge_and_audit.py first")
|
||||
errors = check_top_level_structure(ir)
|
||||
assert not errors, f"top-level structure errors: {errors}"
|
||||
|
||||
|
||||
def test_step3_rule_ids():
|
||||
"""pytest: rule_ids must be unique and follow naming convention."""
|
||||
ir = _load_ir_final_or_skip()
|
||||
if ir is None:
|
||||
pytest.skip("ir_final.json not found")
|
||||
errors = check_rule_ids(ir)
|
||||
assert not errors, f"rule_id errors: {errors[:5]}"
|
||||
|
||||
|
||||
def test_step3_rule_paths():
|
||||
"""pytest: every rule must have a non-empty path array."""
|
||||
ir = _load_ir_final_or_skip()
|
||||
if ir is None:
|
||||
pytest.skip("ir_final.json not found")
|
||||
rules = ir.get("rules", [])
|
||||
errors = check_rule_paths(rules)
|
||||
assert not errors, f"rule path errors: {errors[:5]}"
|
||||
|
||||
|
||||
def test_step3_rule_completeness():
|
||||
"""pytest: each rule must have all required fields."""
|
||||
ir = _load_ir_final_or_skip()
|
||||
if ir is None:
|
||||
pytest.skip("ir_final.json not found")
|
||||
rules = ir.get("rules", [])
|
||||
errors = check_rule_completeness(rules)
|
||||
assert not errors, f"rule completeness errors: {errors[:5]}"
|
||||
|
||||
|
||||
def test_step3_audit_report():
|
||||
"""pytest: audit report must have all required sections."""
|
||||
report = _load_audit_report_or_skip()
|
||||
if report is None:
|
||||
pytest.skip("ir_audit_report.md not found — run step3_merge_and_audit.py first")
|
||||
errors = check_audit_report(report)
|
||||
assert not errors, f"audit report errors: {errors[:5]}"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = run_all_tests()
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
@@ -115,10 +115,7 @@ def ir_path(request) -> str:
|
||||
path = (
|
||||
request.config.getoption("--ir-path")
|
||||
or os.environ.get("TEST_IR_PATH")
|
||||
or str(
|
||||
Path.home()
|
||||
/ ".openclaw/workspace/skills/doc_parser_skill/output/ir_final.json"
|
||||
)
|
||||
or str(_PROJECT_ROOT / "output" / "final" / "ir_final.json")
|
||||
)
|
||||
if not os.path.exists(path):
|
||||
pytest.skip(f"IR file not found: {path}")
|
||||
@@ -139,8 +136,7 @@ def parsed_path(request) -> str | None:
|
||||
request.config.getoption("--parsed-path")
|
||||
or os.environ.get("TEST_PARSED_PATH")
|
||||
or str(
|
||||
_PROJECT_ROOT
|
||||
/ "skills/ir_generation_skill/车机娱乐系统禁止功能文档_精简_updated.json"
|
||||
_PROJECT_ROOT / "output" / "车机娱乐系统禁止功能文档_精简_updated.json"
|
||||
)
|
||||
)
|
||||
if os.path.exists(path):
|
||||
@@ -258,21 +254,27 @@ def acceptance_runs(request) -> int:
|
||||
def run_ir_pipeline():
|
||||
"""Return a callable that runs the IR generation pipeline on a parsed JSON.
|
||||
|
||||
Returns None if the pipeline script is not available in the current environment.
|
||||
This is common when the acceptance tests run on pre-generated IR output.
|
||||
|
||||
Usage::
|
||||
|
||||
ir_data, ir_path = run_ir_pipeline(parsed_json_path, output_dir)
|
||||
runner = run_ir_pipeline()
|
||||
if runner:
|
||||
ir_data, ir_path = runner(parsed_json_path, output_dir)
|
||||
"""
|
||||
sys.path.insert(0, _skill_path("ir_generation_skill"))
|
||||
ir_gen_path = (
|
||||
_PROJECT_ROOT / "skills" / "ir_generation_skill" / "scripts" / "ir_generator.py"
|
||||
)
|
||||
if not ir_gen_path.exists():
|
||||
return None
|
||||
|
||||
sys.path.insert(0, str(ir_gen_path.parent))
|
||||
from ir_generator import generate_ir
|
||||
|
||||
def _run(parsed_path: str, output_dir: str | None = None) -> tuple[dict, str]:
|
||||
"""Run IR generation and return (ir_data, ir_path)."""
|
||||
def _run(parsed_path: str, output_dir: str | None = None) -> tuple[list, str]:
|
||||
out = output_dir or tempfile.mkdtemp(prefix="qe_acceptance_")
|
||||
result = generate_ir(parsed_path, out, dry_run=False)
|
||||
ir_list = result.get("ir", [])
|
||||
ir_path = result.get("path", "")
|
||||
# ir_generator produces a list; wrap to match rich format expectations
|
||||
# for schema validation we accept both formats
|
||||
return ir_list, ir_path
|
||||
return result.get("ir", []), result.get("path", "")
|
||||
|
||||
return _run
|
||||
|
||||
@@ -57,18 +57,28 @@ def test_layer_a_schema(ir_data: dict, request):
|
||||
NON_FUNCTIONAL_PATTERNS = [
|
||||
re.compile(p) for p in [
|
||||
r"编制.*变更.*日志",
|
||||
r"变更日志",
|
||||
r"文档背景",
|
||||
r"文档范围",
|
||||
r"术语解释",
|
||||
r"参考",
|
||||
r"参考(文献|文档|资料)?",
|
||||
r"附录",
|
||||
r"版本",
|
||||
r"变更记录",
|
||||
r"目录",
|
||||
r"前言",
|
||||
r"概述",
|
||||
r"简介",
|
||||
r"概述.*背景",
|
||||
r"产品简介",
|
||||
r"场景.*(说明|概述)",
|
||||
r".*概要说明$",
|
||||
r"相关文档",
|
||||
r"行业规范",
|
||||
r"政策法规",
|
||||
r"非功能说明",
|
||||
r"背景介绍",
|
||||
r"PRD", # document title like "XX Auto XXX PRD V1.0"
|
||||
r"产品架构", # architecture overview
|
||||
r"系统架构",
|
||||
]
|
||||
]
|
||||
|
||||
@@ -76,15 +86,20 @@ NON_FUNCTIONAL_PATTERNS = [
|
||||
def _is_functional_section(section_name: str) -> bool:
|
||||
"""Heuristic: exclude background, glossary, changelog, scope sections.
|
||||
|
||||
Sections that are purely structural — preface, glossary, changelog — are excluded.
|
||||
Sections with numbering like '3.1.1' are always considered functional.
|
||||
Check non-functional patterns first, then treat numbered sections (like
|
||||
'3.1.1 系统限制') as likely functional.
|
||||
"""
|
||||
# Numbered sections are functional
|
||||
if _section_number(section_name) != section_name:
|
||||
return True
|
||||
# Explicitly non-functional patterns (checked first)
|
||||
for pat in NON_FUNCTIONAL_PATTERNS:
|
||||
if pat.search(section_name):
|
||||
return False
|
||||
# Documents with only a title (no section number) — check for functional keywords
|
||||
sec_num = _section_number(section_name)
|
||||
if "." not in sec_num and not sec_num[0].isdigit():
|
||||
func_keywords = ["策略", "规则", "功能", "限制", "流程", "配置", "场景",
|
||||
"约束", "条件", "方案", "逻辑", "处理", "机制", "禁止"]
|
||||
if not any(kw in section_name for kw in func_keywords):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@@ -263,19 +278,20 @@ def test_layer_b_coverage(
|
||||
stability_values: list[float] = [cov["overall_rate"]]
|
||||
stability_std = 0.0
|
||||
|
||||
if acceptance_runs > 1:
|
||||
if acceptance_runs > 1 and run_ir_pipeline is not None:
|
||||
parsed_path = request.config.getoption("--parsed-path")
|
||||
if parsed_path and os.path.exists(parsed_path):
|
||||
for _ in range(acceptance_runs - 1):
|
||||
try:
|
||||
ir_list, _ = run_ir_pipeline(parsed_path)
|
||||
# Convert list-format IR to dict for coverage measurement
|
||||
run_ir = _wrap_list_ir(ir_list)
|
||||
run_cov = _measure_coverage(run_ir, parsed_data)
|
||||
stability_values.append(run_cov["overall_rate"])
|
||||
time.sleep(0.5) # rate limiting between runs
|
||||
time.sleep(0.5)
|
||||
except Exception as e:
|
||||
pytest.fail(f"Stability run failed: {e}")
|
||||
elif acceptance_runs > 1 and run_ir_pipeline is None:
|
||||
print(" [Layer B] Stability testing skipped: pipeline runner not available")
|
||||
|
||||
if len(stability_values) > 1:
|
||||
stability_std = statistics.stdev(stability_values)
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
"""Unit tests for config.py pure functions."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "skills" / "ir_generation_skill"))
|
||||
import config
|
||||
|
||||
|
||||
def test_set_input_file():
|
||||
original = config.INPUT_JSON
|
||||
try:
|
||||
config.set_input_file("/tmp/test_input.json")
|
||||
assert config.INPUT_JSON == "/tmp/test_input.json"
|
||||
finally:
|
||||
config.set_input_file(original)
|
||||
|
||||
|
||||
def test_config_constants_exist():
|
||||
"""Verify all expected path constants are defined."""
|
||||
assert config.BASE_DIR
|
||||
assert config.PROJECT_ROOT
|
||||
assert config.PROJECT_OUTPUT
|
||||
assert config.IR_OUTPUT
|
||||
assert config.FINAL_OUTPUT
|
||||
assert config.OUTPUT_DIR
|
||||
assert config.PROMPTS_DIR
|
||||
assert config.TESTS_DIR
|
||||
assert config.DOC_PARSER_OUTPUT
|
||||
assert config.SEMANTIC_INDEX_JSON
|
||||
assert config.IR_FRAGMENTS_JSON
|
||||
assert config.PATH_ENUM_JSON
|
||||
assert config.IR_AUTOCOMPLETE_FRAGMENTS_JSON
|
||||
assert config.IR_FINAL_JSON
|
||||
assert config.IR_AUDIT_REPORT_MD
|
||||
|
||||
|
||||
def test_output_dir_is_under_project_root():
|
||||
"""PROJECT_OUTPUT, IR_OUTPUT, FINAL_OUTPUT should all be under PROJECT_ROOT."""
|
||||
assert config.PROJECT_OUTPUT.startswith(config.PROJECT_ROOT)
|
||||
assert config.IR_OUTPUT.startswith(config.PROJECT_OUTPUT)
|
||||
assert config.FINAL_OUTPUT.startswith(config.PROJECT_OUTPUT)
|
||||
|
||||
|
||||
def test_output_dir_structure():
|
||||
"""IR files should go to output/ir/, final deliverable to output/final/."""
|
||||
assert config.IR_OUTPUT.endswith(os.path.join("output", "ir"))
|
||||
assert config.FINAL_OUTPUT.endswith(os.path.join("output", "final"))
|
||||
assert config.SEMANTIC_INDEX_JSON.startswith(config.IR_OUTPUT)
|
||||
assert config.IR_FRAGMENTS_JSON.startswith(config.IR_OUTPUT)
|
||||
assert config.IR_FINAL_JSON.startswith(config.FINAL_OUTPUT)
|
||||
assert config.IR_AUDIT_REPORT_MD.startswith(config.FINAL_OUTPUT)
|
||||
|
||||
|
||||
def test_ensemble_temperatures_count():
|
||||
"""Should have exactly 3 ensemble temperatures."""
|
||||
assert len(config.ENSEMBLE_TEMPERATURES) == 3
|
||||
|
||||
|
||||
def test_max_tokens_is_int():
|
||||
assert isinstance(config.MAX_TOKENS, int)
|
||||
assert config.MAX_TOKENS > 0
|
||||
|
||||
|
||||
def test_temperature_is_float():
|
||||
assert isinstance(config.TEMPERATURE, float)
|
||||
assert 0.0 <= config.TEMPERATURE <= 2.0
|
||||
|
||||
|
||||
def test_provider_models_has_expected_keys():
|
||||
assert "deepseek" in config.PROVIDER_MODELS
|
||||
assert "dashscope" in config.PROVIDER_MODELS
|
||||
|
||||
|
||||
def test_model_name_in_provider_models():
|
||||
assert config.MODEL_NAME in config.PROVIDER_MODELS.values()
|
||||
@@ -0,0 +1,200 @@
|
||||
"""Unit tests for pure functions in detect_conflicts.py — no LLM calls, no file I/O."""
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# Add detect_conflicts script to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "skills" / "conflict_detection_skill" / "scripts"))
|
||||
import detect_conflicts as dc
|
||||
|
||||
|
||||
# ── _is_nested_tree ──────────────────────────────────────────────────────────
|
||||
|
||||
def test_is_nested_tree_with_children_list():
|
||||
assert dc._is_nested_tree({"children": []}) is True
|
||||
|
||||
|
||||
def test_is_nested_tree_without_children_list():
|
||||
assert dc._is_nested_tree({"nodes": []}) is False
|
||||
|
||||
|
||||
def test_is_nested_tree_empty_dict():
|
||||
assert dc._is_nested_tree({}) is False
|
||||
|
||||
|
||||
# ── _nested_tree_to_text ─────────────────────────────────────────────────────
|
||||
|
||||
def test_nested_tree_simple():
|
||||
tree = {
|
||||
"id": "N1",
|
||||
"name": "开始",
|
||||
"type": "start",
|
||||
"children": [
|
||||
{
|
||||
"id": "N2",
|
||||
"name": "判断条件",
|
||||
"type": "decision",
|
||||
"children": [
|
||||
{
|
||||
"condition": "是",
|
||||
"node": {
|
||||
"id": "N3",
|
||||
"name": "执行动作",
|
||||
"type": "action",
|
||||
},
|
||||
},
|
||||
{
|
||||
"condition": "否",
|
||||
"node": {
|
||||
"id": "N4",
|
||||
"name": "结束",
|
||||
"type": "end",
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}
|
||||
result = dc._nested_tree_to_text(tree)
|
||||
assert "[起始] N1: 开始" in result
|
||||
assert "[判断] N2: 判断条件" in result
|
||||
assert '分支 "是":' in result
|
||||
assert "[动作] N3: 执行动作" in result
|
||||
assert '分支 "否":' in result
|
||||
assert "[结束] N4: 结束" in result
|
||||
|
||||
|
||||
def test_nested_tree_empty_children():
|
||||
tree = {"id": "N1", "name": "结束", "type": "end"}
|
||||
result = dc._nested_tree_to_text(tree)
|
||||
assert "[结束] N1: 结束" in result
|
||||
|
||||
|
||||
# ── _flat_tree_to_text ───────────────────────────────────────────────────────
|
||||
|
||||
def test_flat_tree_with_decision_and_action():
|
||||
lt = {
|
||||
"root": "START",
|
||||
"nodes": [
|
||||
{"id": "D1", "type": "decision", "condition": "车速>15",
|
||||
"branches": [{"value": "是", "target": "A1"}, {"value": "否", "target": "END"}]},
|
||||
{"id": "A1", "type": "action", "description": "禁止视频播放"},
|
||||
{"id": "END", "type": "end", "description": "结束"},
|
||||
],
|
||||
}
|
||||
result = dc._flat_tree_to_text(lt)
|
||||
assert "根节点: START" in result
|
||||
assert '判断节点 D1: 条件="车速>15"' in result
|
||||
assert '分支 "是" → A1' in result
|
||||
assert "动作节点 A1: 禁止视频播放" in result
|
||||
|
||||
|
||||
def test_flat_tree_empty():
|
||||
result = dc._flat_tree_to_text({})
|
||||
assert result == ""
|
||||
|
||||
|
||||
# ── _logic_tree_to_text (dispatcher) ─────────────────────────────────────────
|
||||
|
||||
def test_logic_tree_to_text_nested():
|
||||
lt = {"children": [{"id": "N1", "name": "开始", "type": "start"}]}
|
||||
result = dc._logic_tree_to_text(lt)
|
||||
assert "[起始] N1: 开始" in result
|
||||
|
||||
|
||||
def test_logic_tree_to_text_flat():
|
||||
lt = {"nodes": [{"id": "A1", "type": "action", "description": "测试"}]}
|
||||
result = dc._logic_tree_to_text(lt)
|
||||
assert "动作节点 A1: 测试" in result
|
||||
|
||||
|
||||
# ── _parse_conflict_json ─────────────────────────────────────────────────────
|
||||
|
||||
def test_parse_no_conflict():
|
||||
assert dc._parse_conflict_json("[[NO_CONFLICT]]") == []
|
||||
assert dc._parse_conflict_json("some text [[NO_CONFLICT]] more") == []
|
||||
|
||||
|
||||
def test_parse_conflict_valid_json():
|
||||
content = json.dumps([
|
||||
{"conflict_type": "condition_mismatch", "severity": "high",
|
||||
"section": "3.1", "image_snippet": "img", "text_snippet": "txt",
|
||||
"description": "冲突描述"}
|
||||
], ensure_ascii=False)
|
||||
result = dc._parse_conflict_json(content)
|
||||
assert len(result) == 1
|
||||
assert result[0]["conflict_type"] == "condition_mismatch"
|
||||
|
||||
|
||||
def test_parse_conflict_with_markdown_fence():
|
||||
content = '```json\n[{"conflict_type": "contradiction", "severity": "high", "section": "1.0", "image_snippet": "a", "text_snippet": "b", "description": "x"}]\n```'
|
||||
result = dc._parse_conflict_json(content)
|
||||
assert len(result) == 1
|
||||
assert result[0]["conflict_type"] == "contradiction"
|
||||
|
||||
|
||||
def test_parse_conflict_unparseable():
|
||||
assert dc._parse_conflict_json("not valid json at all") == []
|
||||
assert dc._parse_conflict_json("") == []
|
||||
|
||||
|
||||
def test_parse_conflict_mixed_content():
|
||||
content = 'some prefix text\n[\n {"conflict_type": "scope_mismatch", "severity": "low", "section": "4.2", "image_snippet": "img", "text_snippet": "txt", "description": "d"}]\nsuffix'
|
||||
result = dc._parse_conflict_json(content)
|
||||
assert len(result) == 1
|
||||
assert result[0]["conflict_type"] == "scope_mismatch"
|
||||
|
||||
|
||||
# ── _build_text_for_section ──────────────────────────────────────────────────
|
||||
|
||||
def test_build_text_para_blocks():
|
||||
sections = [
|
||||
{"source": "3.1 测试章节",
|
||||
"blocks": [
|
||||
{"type": "para", "text": "第一段文字"},
|
||||
{"type": "para", "text": "第二段文字"},
|
||||
]},
|
||||
]
|
||||
result = dc._build_text_for_section(sections, "3.1 测试章节")
|
||||
assert "第一段文字" in result
|
||||
assert "第二段文字" in result
|
||||
|
||||
|
||||
def test_build_text_table_blocks():
|
||||
sections = [
|
||||
{"source": "4.0 功能列表",
|
||||
"blocks": [
|
||||
{"type": "table", "table": "功能表",
|
||||
"rows": [
|
||||
{"columns": [{"name": "功能", "text": "视频播放"}, {"name": "状态", "text": "禁止"}]},
|
||||
]},
|
||||
]},
|
||||
]
|
||||
result = dc._build_text_for_section(sections, "4.0 功能列表")
|
||||
assert "功能表" in result
|
||||
assert "视频播放" in result
|
||||
assert "禁止" in result
|
||||
|
||||
|
||||
def test_build_text_section_not_found():
|
||||
sections = [{"source": "1.0 概述", "blocks": []}]
|
||||
result = dc._build_text_for_section(sections, "2.0 不存在的章节")
|
||||
assert result == ""
|
||||
|
||||
|
||||
def test_build_text_mixed_blocks():
|
||||
sections = [
|
||||
{"source": "5.1 混合章节",
|
||||
"blocks": [
|
||||
{"type": "para", "text": "介绍文字"},
|
||||
{"type": "table", "table": "表1",
|
||||
"rows": [{"columns": [{"name": "A", "text": "值1"}]}]},
|
||||
]},
|
||||
]
|
||||
result = dc._build_text_for_section(sections, "5.1 混合章节")
|
||||
assert "介绍文字" in result
|
||||
assert "表1" in result
|
||||
assert "值1" in result
|
||||
@@ -83,7 +83,7 @@ def test_sample_ir_json_is_valid():
|
||||
"""The sample IR JSON file should be valid JSON."""
|
||||
sample_path = os.path.join(
|
||||
os.path.dirname(os.path.dirname(__file__)),
|
||||
"skills", "ir_generation_skill",
|
||||
"output",
|
||||
"车机娱乐系统禁止功能文档_精简_updated.json"
|
||||
)
|
||||
if os.path.exists(sample_path):
|
||||
|
||||
Reference in New Issue
Block a user