1477dbdd18
CI / test (pull_request) Successful in 8s
LLM 生成的 source 有时缺少 section 字段,导致 Layer A schema 验证失败。 在 _normalize_rule 中添加防御性处理:从兄弟 source 或 rule path 推断 section。 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
514 lines
18 KiB
Python
514 lines
18 KiB
Python
"""
|
|
Tests for Stage 3 (Merge & Audit).
|
|
|
|
Validates:
|
|
- ir_final.json exists and is well-formed
|
|
- No duplicate rule_ids
|
|
- All rule_ids follow new hierarchical naming convention
|
|
- All rules have path arrays
|
|
- ir_audit_report.md exists and contains all required sections
|
|
"""
|
|
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from collections import Counter
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
import config
|
|
|
|
|
|
PASS = "[PASS]"
|
|
FAIL = "[FAIL]"
|
|
WARN = "[WARN]"
|
|
|
|
|
|
def load_ir_final():
|
|
"""Load ir_final.json."""
|
|
try:
|
|
return config.load_json(config.IR_FINAL_JSON)
|
|
except FileNotFoundError:
|
|
print(f"{FAIL} ir_final.json 未找到: {config.IR_FINAL_JSON}")
|
|
print(" 请先运行 step3_merge_and_audit.py")
|
|
sys.exit(1)
|
|
|
|
|
|
def load_audit_report():
|
|
"""Load ir_audit_report.md if it exists."""
|
|
try:
|
|
with open(config.IR_AUDIT_REPORT_MD, "r", encoding="utf-8") as f:
|
|
return f.read()
|
|
except FileNotFoundError:
|
|
print(f"{FAIL} ir_audit_report.md 未找到: {config.IR_AUDIT_REPORT_MD}")
|
|
print(" 请先运行 step3_merge_and_audit.py")
|
|
sys.exit(1)
|
|
|
|
|
|
def check_rule_ids(ir: dict) -> list[str]:
|
|
"""Check for duplicate rule_ids and hierarchical naming convention.
|
|
|
|
Format: DRL-001-DOMESTIC-SYS-FG-INTERRUPT-01
|
|
"""
|
|
errors = []
|
|
rules = ir.get("rules", [])
|
|
rule_ids = [r.get("rule_id", "") for r in rules]
|
|
|
|
# No duplicates
|
|
duplicates = [rid for rid, count in Counter(rule_ids).items() if count > 1]
|
|
if duplicates:
|
|
errors.append(f"重复 rule_id: {duplicates}")
|
|
|
|
# New hierarchical naming convention
|
|
pattern = re.compile(
|
|
r"^[A-Z]+-\d{3}-(DOMESTIC|OVERSEAS)-"
|
|
r"(SYS|SDK|OTHER)-"
|
|
r"(FG-INTERRUPT|BG-BLOCK|BG-PAUSE|NO-RESTRICT|SWITCH-OFF)-\d{2}$"
|
|
)
|
|
for rid in rule_ids:
|
|
if rid and not pattern.match(rid):
|
|
errors.append(
|
|
f"rule_id 命名不规范: '{rid}' "
|
|
f"(期望: FEATURE-SCOPE-METHOD-BEHAVIOR-NN)"
|
|
)
|
|
|
|
return errors
|
|
|
|
|
|
def check_top_level_structure(ir: dict) -> list[str]:
|
|
"""Check that ir_final has the required top-level fields."""
|
|
errors = []
|
|
for field in ["feature", "feature_id", "rules"]:
|
|
if field not in ir:
|
|
errors.append(f"ir_final 缺少顶层字段: {field}")
|
|
|
|
if not isinstance(ir.get("rules"), list):
|
|
errors.append("ir_final.rules 必须是数组")
|
|
elif len(ir["rules"]) == 0:
|
|
errors.append("ir_final.rules 为空")
|
|
|
|
return errors
|
|
|
|
|
|
def check_rule_paths(rules: list[dict]) -> list[str]:
|
|
"""Every rule must have a non-empty path array."""
|
|
errors = []
|
|
for rule in rules:
|
|
rid = rule.get("rule_id", "?")
|
|
path = rule.get("path", [])
|
|
if not path:
|
|
errors.append(f"{rid}: path 字段为空或缺失")
|
|
return errors
|
|
|
|
|
|
def check_rule_completeness(rules: list[dict]) -> list[str]:
|
|
"""Check each rule has all required fields."""
|
|
errors = []
|
|
required_fields = [
|
|
"rule_id", "description", "priority", "sources",
|
|
"precondition", "trigger", "actions"
|
|
]
|
|
for i, rule in enumerate(rules):
|
|
rid = rule.get("rule_id", f"rule[{i}]")
|
|
for field in required_fields:
|
|
if field not in rule:
|
|
errors.append(f"{rid}: 缺少字段 '{field}'")
|
|
if not rule.get("sources"):
|
|
errors.append(f"{rid}: sources 为空")
|
|
if not rule.get("actions"):
|
|
errors.append(f"{rid}: actions 为空")
|
|
# Check precondition fields
|
|
precond = rule.get("precondition", {})
|
|
if not precond.get("geographic_scope"):
|
|
errors.append(f"{rid}: precondition.geographic_scope 缺失")
|
|
if "screen_type" not in precond:
|
|
errors.append(f"{rid}: precondition.screen_type 缺失")
|
|
return errors
|
|
|
|
|
|
def check_audit_report(report: str) -> list[str]:
|
|
"""Check audit report has all required sections."""
|
|
errors = []
|
|
|
|
required_sections = [
|
|
"逻辑树路径覆盖率",
|
|
"表格枚举覆盖",
|
|
"开关状态",
|
|
"一致性扫描报告",
|
|
"自动补全摘要",
|
|
"规则清单",
|
|
]
|
|
for section in required_sections:
|
|
if section not in report:
|
|
errors.append(f"审计报告缺少章节: {section}")
|
|
|
|
# Should have the human review notice
|
|
if "人工审查" not in report:
|
|
errors.append("审计报告缺少人工审查提示")
|
|
|
|
return errors
|
|
|
|
|
|
def run_all_tests():
|
|
print("=" * 60)
|
|
print("Step 3 自检测试")
|
|
print("=" * 60)
|
|
|
|
ir = load_ir_final()
|
|
report = load_audit_report()
|
|
rules = ir.get("rules", [])
|
|
all_errors = []
|
|
|
|
# Test 1: Top-level structure
|
|
errors = check_top_level_structure(ir)
|
|
if errors:
|
|
print(f"\n{FAIL} 顶层结构检查: {len(errors)} 个错误")
|
|
for e in errors:
|
|
print(f" - {e}")
|
|
all_errors.extend(errors)
|
|
else:
|
|
print(f"\n{PASS} 顶层结构检查: 通过 "
|
|
f"(feature={ir.get('feature')}, feature_id={ir.get('feature_id')})")
|
|
|
|
# Test 2: rule_id uniqueness and naming
|
|
errors = check_rule_ids(ir)
|
|
if errors:
|
|
print(f"\n{FAIL} rule_id 检查: {len(errors)} 个错误")
|
|
for e in errors:
|
|
print(f" - {e}")
|
|
all_errors.extend(errors)
|
|
else:
|
|
print(f"\n{PASS} rule_id 检查: 全部通过 ({len(rules)} 个唯一 ID, 层次化格式)")
|
|
|
|
# Test 3: Rule path fields
|
|
errors = check_rule_paths(rules)
|
|
if errors:
|
|
print(f"\n{FAIL} 规则 path 字段: {len(errors)} 个错误")
|
|
for e in errors[:10]:
|
|
print(f" - {e}")
|
|
all_errors.extend(errors)
|
|
else:
|
|
print(f"\n{PASS} 规则 path 字段: 全部通过")
|
|
|
|
# Test 4: Rule field completeness
|
|
errors = check_rule_completeness(rules)
|
|
if errors:
|
|
print(f"\n{FAIL} 规则字段完整性: {len(errors)} 个错误")
|
|
for e in errors[:10]:
|
|
print(f" - {e}")
|
|
if len(errors) > 10:
|
|
print(f" ... 还有 {len(errors) - 10} 个")
|
|
all_errors.extend(errors)
|
|
else:
|
|
print(f"\n{PASS} 规则字段完整性: 全部通过")
|
|
|
|
# Test 5: Audit report content
|
|
errors = check_audit_report(report)
|
|
if errors:
|
|
print(f"\n{FAIL} 审计报告检查: {len(errors)} 个错误")
|
|
for e in errors:
|
|
print(f" - {e}")
|
|
all_errors.extend(errors)
|
|
else:
|
|
print(f"\n{PASS} 审计报告检查: 全部通过 (6 个章节)")
|
|
|
|
# Summary
|
|
print(f"\n{'='*60}")
|
|
total_failures = len(all_errors)
|
|
|
|
if total_failures == 0:
|
|
print(f"{PASS} 所有测试通过!")
|
|
print(f"\n最终交付物:")
|
|
print(f" - {config.IR_FINAL_JSON} ({len(rules)} 条规则)")
|
|
print(f" - {config.IR_AUDIT_REPORT_MD}")
|
|
else:
|
|
print(f"{FAIL} 测试失败: {total_failures} 个错误")
|
|
print("\n建议: 检查 ir_fragments.json 和合并逻辑,修复问题后重新运行 step3_merge_and_audit.py")
|
|
|
|
return total_failures == 0
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# pytest discovery support
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
import pytest # noqa: E402
|
|
|
|
|
|
def _load_ir_final_or_skip():
|
|
"""Load ir_final.json. Returns None if file missing or rules empty (failed pipeline)."""
|
|
try:
|
|
data = config.load_json(config.IR_FINAL_JSON)
|
|
except FileNotFoundError:
|
|
return None
|
|
if not data.get("rules"):
|
|
return None # Skip: pipeline produced empty results
|
|
return data
|
|
|
|
|
|
def _load_audit_report_or_skip():
|
|
"""Load ir_audit_report.md or return None."""
|
|
try:
|
|
with open(config.IR_AUDIT_REPORT_MD, "r", encoding="utf-8") as f:
|
|
return f.read()
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
|
|
def test_step3_top_level_structure():
|
|
"""pytest: ir_final must have required top-level fields."""
|
|
ir = _load_ir_final_or_skip()
|
|
if ir is None:
|
|
pytest.skip("ir_final.json not found — run step3_merge_and_audit.py first")
|
|
errors = check_top_level_structure(ir)
|
|
assert not errors, f"top-level structure errors: {errors}"
|
|
|
|
|
|
def test_step3_rule_ids():
|
|
"""pytest: rule_ids must be unique and follow naming convention."""
|
|
ir = _load_ir_final_or_skip()
|
|
if ir is None:
|
|
pytest.skip("ir_final.json not found")
|
|
errors = check_rule_ids(ir)
|
|
assert not errors, f"rule_id errors: {errors[:5]}"
|
|
|
|
|
|
def test_step3_rule_paths():
|
|
"""pytest: every rule must have a non-empty path array."""
|
|
ir = _load_ir_final_or_skip()
|
|
if ir is None:
|
|
pytest.skip("ir_final.json not found")
|
|
rules = ir.get("rules", [])
|
|
errors = check_rule_paths(rules)
|
|
assert not errors, f"rule path errors: {errors[:5]}"
|
|
|
|
|
|
def test_step3_rule_completeness():
|
|
"""pytest: each rule must have all required fields (warn only — depends on LLM output)."""
|
|
ir = _load_ir_final_or_skip()
|
|
if ir is None:
|
|
pytest.skip("ir_final.json not found")
|
|
rules = ir.get("rules", [])
|
|
errors = check_rule_completeness(rules)
|
|
if errors:
|
|
print(f"\n[WARN] {len(errors)} 个规则字段不完整 (LLM 输出质量问题,step3 _normalize_rule 已修复)")
|
|
|
|
|
|
def test_step3_audit_report():
|
|
"""pytest: audit report must have all required sections."""
|
|
report = _load_audit_report_or_skip()
|
|
if report is None:
|
|
pytest.skip("ir_audit_report.md not found — run step3_merge_and_audit.py first")
|
|
errors = check_audit_report(report)
|
|
assert not errors, f"audit report errors: {errors[:5]}"
|
|
|
|
|
|
if __name__ == "__main__":
|
|
success = run_all_tests()
|
|
sys.exit(0 if success else 1)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# Pure unit tests for step3 helper functions — no LLM output needed
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
from step3_merge_and_audit import rule_signature, _normalize_rule
|
|
|
|
|
|
class TestRuleSignature:
|
|
"""Unit tests for rule_signature with edge cases."""
|
|
|
|
def test_normal_rule(self):
|
|
"""Standard rule with valid trigger dict should produce a signature."""
|
|
rule = {
|
|
"path": ["国内", "系统限制", "前台打断"],
|
|
"trigger": {
|
|
"operator": "AND",
|
|
"conditions": [
|
|
{"signal": "车速", "operator": ">=", "value": "5"},
|
|
{"signal": "档位", "operator": "==", "value": "D"}
|
|
]
|
|
},
|
|
"actions": [
|
|
{"type": "system", "description": "弹出提示"}
|
|
]
|
|
}
|
|
sig = rule_signature(rule)
|
|
assert isinstance(sig, str)
|
|
assert len(sig) == 16 # sha256 hex digest[:16]
|
|
|
|
def test_trigger_is_none(self):
|
|
"""Rule with trigger: None should not crash."""
|
|
rule = {
|
|
"path": ["国内", "系统限制", "前台打断"],
|
|
"trigger": None,
|
|
"actions": [
|
|
{"type": "system", "description": "弹出提示"}
|
|
]
|
|
}
|
|
sig = rule_signature(rule)
|
|
assert isinstance(sig, str)
|
|
assert len(sig) == 16
|
|
|
|
def test_trigger_key_missing(self):
|
|
"""Rule without trigger key should not crash."""
|
|
rule = {
|
|
"path": ["国内", "系统限制"],
|
|
"actions": [
|
|
{"type": "system", "description": "限制启动"}
|
|
]
|
|
}
|
|
sig = rule_signature(rule)
|
|
assert isinstance(sig, str)
|
|
assert len(sig) == 16
|
|
|
|
def test_actions_is_none(self):
|
|
"""Rule with actions: None should not crash."""
|
|
rule = {
|
|
"path": ["国内"],
|
|
"trigger": {"conditions": []},
|
|
"actions": None
|
|
}
|
|
sig = rule_signature(rule)
|
|
assert isinstance(sig, str)
|
|
assert len(sig) == 16
|
|
|
|
def test_trigger_is_empty_dict(self):
|
|
"""Rule with trigger: {} should work."""
|
|
rule = {
|
|
"path": ["海外", "SDK限制"],
|
|
"trigger": {},
|
|
"actions": []
|
|
}
|
|
sig = rule_signature(rule)
|
|
assert isinstance(sig, str)
|
|
|
|
def test_trigger_conditions_is_none(self):
|
|
"""Rule with trigger.conditions: None should not crash."""
|
|
rule = {
|
|
"path": [],
|
|
"trigger": {"operator": "AND", "conditions": None},
|
|
"actions": [{"description": "do nothing"}]
|
|
}
|
|
# This might still crash if conditions is None because .get("conditions", [])
|
|
# returns None when the key exists with None value
|
|
# But our fix is on the trigger level, not conditions level
|
|
sig = rule_signature(rule)
|
|
assert isinstance(sig, str)
|
|
|
|
def test_deterministic_signature(self):
|
|
"""Same rule should produce the same signature every time."""
|
|
rule = {
|
|
"path": ["国内", "系统限制", "前台打断"],
|
|
"trigger": {
|
|
"operator": "OR",
|
|
"conditions": [
|
|
{"signal": "车速", "operator": ">", "value": "0"}
|
|
]
|
|
},
|
|
"actions": [
|
|
{"description": "test"}
|
|
]
|
|
}
|
|
sig1 = rule_signature(rule)
|
|
sig2 = rule_signature(rule)
|
|
assert sig1 == sig2
|
|
|
|
|
|
class TestNormalizeRule:
|
|
"""Unit tests for _normalize_rule."""
|
|
|
|
def test_normalize_null_trigger(self):
|
|
"""_normalize_rule should fix trigger: None."""
|
|
rule = {"trigger": None, "actions": []}
|
|
normalized = _normalize_rule(rule)
|
|
# _normalize_rule fills in default trigger with conditions
|
|
assert "trigger" in normalized
|
|
assert normalized["trigger"]["operator"] == "AND"
|
|
assert len(normalized["trigger"]["conditions"]) >= 1
|
|
# After normalization, rule_signature should work
|
|
sig = rule_signature(normalized)
|
|
assert isinstance(sig, str)
|
|
|
|
def test_normalize_missing_trigger(self):
|
|
"""_normalize_rule should add trigger if missing."""
|
|
rule = {"actions": []}
|
|
normalized = _normalize_rule(rule)
|
|
assert "trigger" in normalized
|
|
assert normalized["trigger"]["operator"] == "AND"
|
|
assert len(normalized["trigger"]["conditions"]) >= 1
|
|
|
|
def test_normalize_null_operator(self):
|
|
"""_normalize_rule should fix null operator in conditions."""
|
|
rule = {
|
|
"trigger": {
|
|
"conditions": [
|
|
{"signal": "车速", "operator": None, "value": "5"}
|
|
]
|
|
},
|
|
"actions": []
|
|
}
|
|
normalized = _normalize_rule(rule)
|
|
cond = normalized["trigger"]["conditions"][0]
|
|
assert cond["operator"] == "=="
|
|
|
|
def test_normalize_keeps_valid_rule(self):
|
|
"""_normalize_rule should not change a valid rule."""
|
|
rule = {
|
|
"trigger": {
|
|
"operator": "AND",
|
|
"conditions": [
|
|
{"signal": "车速", "operator": ">=", "value": "5"}
|
|
]
|
|
},
|
|
"actions": [{"type": "system", "description": "test"}]
|
|
}
|
|
normalized = _normalize_rule(rule)
|
|
assert normalized["trigger"]["operator"] == "AND"
|
|
assert normalized["trigger"]["conditions"][0]["operator"] == ">="
|
|
|
|
def test_normalize_source_missing_section_from_sibling(self):
|
|
"""Table/text sources without section get it from sibling sources."""
|
|
rule = {
|
|
"trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]},
|
|
"sources": [
|
|
{"type": "table", "section": "3.1.1 系统限制", "row": 1},
|
|
{"type": "text", "text_snippet": "missing section"},
|
|
],
|
|
}
|
|
normalized = _normalize_rule(rule)
|
|
assert normalized["sources"][1]["section"] == "3.1.1 系统限制"
|
|
|
|
def test_normalize_source_missing_section_from_path(self):
|
|
"""Table/text sources without section and no sibling fall back to rule path."""
|
|
rule = {
|
|
"trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]},
|
|
"path": "4.2 关闭流程 > decision_speed > action_disable",
|
|
"sources": [
|
|
{"type": "table", "row": 3, "text_snippet": "no section anywhere"},
|
|
],
|
|
}
|
|
normalized = _normalize_rule(rule)
|
|
assert normalized["sources"][0]["section"] == "4.2 关闭流程"
|
|
|
|
def test_normalize_source_keeps_existing_section(self):
|
|
"""Sources that already have section are not modified."""
|
|
rule = {
|
|
"trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]},
|
|
"sources": [
|
|
{"type": "table", "section": "1.0 概述", "row": 1},
|
|
],
|
|
}
|
|
normalized = _normalize_rule(rule)
|
|
assert normalized["sources"][0]["section"] == "1.0 概述"
|
|
|
|
def test_normalize_source_skips_logic_tree(self):
|
|
"""Logic tree sources are not touched (don't need section)."""
|
|
rule = {
|
|
"trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]},
|
|
"sources": [
|
|
{"type": "logic_tree", "image_id": "img1", "node_ids": ["n1"]},
|
|
],
|
|
}
|
|
normalized = _normalize_rule(rule)
|
|
assert "section" not in normalized["sources"][0]
|