""" Tests for Stage 3 (Merge & Audit). Validates: - ir_final.json exists and is well-formed - No duplicate rule_ids - All rule_ids follow new hierarchical naming convention - All rules have path arrays - ir_audit_report.md exists and contains all required sections """ import re import sys from pathlib import Path from collections import Counter sys.path.insert(0, str(Path(__file__).parent.parent)) import config PASS = "[PASS]" FAIL = "[FAIL]" WARN = "[WARN]" def load_ir_final(): """Load ir_final.json.""" try: return config.load_json(config.IR_FINAL_JSON) except FileNotFoundError: print(f"{FAIL} ir_final.json 未找到: {config.IR_FINAL_JSON}") print(" 请先运行 step3_merge_and_audit.py") sys.exit(1) def load_audit_report(): """Load ir_audit_report.md if it exists.""" try: with open(config.IR_AUDIT_REPORT_MD, "r", encoding="utf-8") as f: return f.read() except FileNotFoundError: print(f"{FAIL} ir_audit_report.md 未找到: {config.IR_AUDIT_REPORT_MD}") print(" 请先运行 step3_merge_and_audit.py") sys.exit(1) def check_rule_ids(ir: dict) -> list[str]: """Check for duplicate rule_ids and hierarchical naming convention. Format: DRL-001-DOMESTIC-SYS-FG-INTERRUPT-01 """ errors = [] rules = ir.get("rules", []) rule_ids = [r.get("rule_id", "") for r in rules] # No duplicates duplicates = [rid for rid, count in Counter(rule_ids).items() if count > 1] if duplicates: errors.append(f"重复 rule_id: {duplicates}") # New hierarchical naming convention pattern = re.compile( r"^[A-Z]+-\d{3}-(DOMESTIC|OVERSEAS)-" r"(SYS|SDK|OTHER)-" r"(FG-INTERRUPT|BG-BLOCK|BG-PAUSE|NO-RESTRICT|SWITCH-OFF)-\d{2}$" ) for rid in rule_ids: if rid and not pattern.match(rid): errors.append( f"rule_id 命名不规范: '{rid}' " f"(期望: FEATURE-SCOPE-METHOD-BEHAVIOR-NN)" ) return errors def check_top_level_structure(ir: dict) -> list[str]: """Check that ir_final has the required top-level fields.""" errors = [] for field in ["feature", "feature_id", "rules"]: if field not in ir: errors.append(f"ir_final 缺少顶层字段: {field}") if not isinstance(ir.get("rules"), list): errors.append("ir_final.rules 必须是数组") elif len(ir["rules"]) == 0: errors.append("ir_final.rules 为空") return errors def check_rule_paths(rules: list[dict]) -> list[str]: """Every rule must have a non-empty path array.""" errors = [] for rule in rules: rid = rule.get("rule_id", "?") path = rule.get("path", []) if not path: errors.append(f"{rid}: path 字段为空或缺失") return errors def check_rule_completeness(rules: list[dict]) -> list[str]: """Check each rule has all required fields.""" errors = [] required_fields = [ "rule_id", "description", "priority", "sources", "precondition", "trigger", "actions" ] for i, rule in enumerate(rules): rid = rule.get("rule_id", f"rule[{i}]") for field in required_fields: if field not in rule: errors.append(f"{rid}: 缺少字段 '{field}'") if not rule.get("sources"): errors.append(f"{rid}: sources 为空") if not rule.get("actions"): errors.append(f"{rid}: actions 为空") # Check precondition fields precond = rule.get("precondition", {}) if not precond.get("geographic_scope"): errors.append(f"{rid}: precondition.geographic_scope 缺失") if "screen_type" not in precond: errors.append(f"{rid}: precondition.screen_type 缺失") return errors def check_audit_report(report: str) -> list[str]: """Check audit report has all required sections.""" errors = [] required_sections = [ "逻辑树路径覆盖率", "表格枚举覆盖", "开关状态", "一致性扫描报告", "自动补全摘要", "规则清单", ] for section in required_sections: if section not in report: errors.append(f"审计报告缺少章节: {section}") # Should have the human review notice if "人工审查" not in report: errors.append("审计报告缺少人工审查提示") return errors def run_all_tests(): print("=" * 60) print("Step 3 自检测试") print("=" * 60) ir = load_ir_final() report = load_audit_report() rules = ir.get("rules", []) all_errors = [] # Test 1: Top-level structure errors = check_top_level_structure(ir) if errors: print(f"\n{FAIL} 顶层结构检查: {len(errors)} 个错误") for e in errors: print(f" - {e}") all_errors.extend(errors) else: print(f"\n{PASS} 顶层结构检查: 通过 " f"(feature={ir.get('feature')}, feature_id={ir.get('feature_id')})") # Test 2: rule_id uniqueness and naming errors = check_rule_ids(ir) if errors: print(f"\n{FAIL} rule_id 检查: {len(errors)} 个错误") for e in errors: print(f" - {e}") all_errors.extend(errors) else: print(f"\n{PASS} rule_id 检查: 全部通过 ({len(rules)} 个唯一 ID, 层次化格式)") # Test 3: Rule path fields errors = check_rule_paths(rules) if errors: print(f"\n{FAIL} 规则 path 字段: {len(errors)} 个错误") for e in errors[:10]: print(f" - {e}") all_errors.extend(errors) else: print(f"\n{PASS} 规则 path 字段: 全部通过") # Test 4: Rule field completeness errors = check_rule_completeness(rules) if errors: print(f"\n{FAIL} 规则字段完整性: {len(errors)} 个错误") for e in errors[:10]: print(f" - {e}") if len(errors) > 10: print(f" ... 还有 {len(errors) - 10} 个") all_errors.extend(errors) else: print(f"\n{PASS} 规则字段完整性: 全部通过") # Test 5: Audit report content errors = check_audit_report(report) if errors: print(f"\n{FAIL} 审计报告检查: {len(errors)} 个错误") for e in errors: print(f" - {e}") all_errors.extend(errors) else: print(f"\n{PASS} 审计报告检查: 全部通过 (6 个章节)") # Summary print(f"\n{'='*60}") total_failures = len(all_errors) if total_failures == 0: print(f"{PASS} 所有测试通过!") print(f"\n最终交付物:") print(f" - {config.IR_FINAL_JSON} ({len(rules)} 条规则)") print(f" - {config.IR_AUDIT_REPORT_MD}") else: print(f"{FAIL} 测试失败: {total_failures} 个错误") print("\n建议: 检查 ir_fragments.json 和合并逻辑,修复问题后重新运行 step3_merge_and_audit.py") return total_failures == 0 # ═══════════════════════════════════════════════════════════════════════════════ # pytest discovery support # ═══════════════════════════════════════════════════════════════════════════════ import pytest # noqa: E402 def _load_ir_final_or_skip(): """Load ir_final.json. Returns None if file missing or rules empty (failed pipeline).""" try: data = config.load_json(config.IR_FINAL_JSON) except FileNotFoundError: return None if not data.get("rules"): return None # Skip: pipeline produced empty results return data def _load_audit_report_or_skip(): """Load ir_audit_report.md or return None.""" try: with open(config.IR_AUDIT_REPORT_MD, "r", encoding="utf-8") as f: return f.read() except FileNotFoundError: return None def test_step3_top_level_structure(): """pytest: ir_final must have required top-level fields.""" ir = _load_ir_final_or_skip() if ir is None: pytest.skip("ir_final.json not found — run step3_merge_and_audit.py first") errors = check_top_level_structure(ir) assert not errors, f"top-level structure errors: {errors}" def test_step3_rule_ids(): """pytest: rule_ids must be unique and follow naming convention.""" ir = _load_ir_final_or_skip() if ir is None: pytest.skip("ir_final.json not found") errors = check_rule_ids(ir) assert not errors, f"rule_id errors: {errors[:5]}" def test_step3_rule_paths(): """pytest: every rule must have a non-empty path array.""" ir = _load_ir_final_or_skip() if ir is None: pytest.skip("ir_final.json not found") rules = ir.get("rules", []) errors = check_rule_paths(rules) assert not errors, f"rule path errors: {errors[:5]}" def test_step3_rule_completeness(): """pytest: each rule must have all required fields (warn only — depends on LLM output).""" ir = _load_ir_final_or_skip() if ir is None: pytest.skip("ir_final.json not found") rules = ir.get("rules", []) errors = check_rule_completeness(rules) if errors: print(f"\n[WARN] {len(errors)} 个规则字段不完整 (LLM 输出质量问题,step3 _normalize_rule 已修复)") def test_step3_audit_report(): """pytest: audit report must have all required sections.""" report = _load_audit_report_or_skip() if report is None: pytest.skip("ir_audit_report.md not found — run step3_merge_and_audit.py first") errors = check_audit_report(report) assert not errors, f"audit report errors: {errors[:5]}" if __name__ == "__main__": success = run_all_tests() sys.exit(0 if success else 1) # ═══════════════════════════════════════════════════════════════════════════════ # Pure unit tests for step3 helper functions — no LLM output needed # ═══════════════════════════════════════════════════════════════════════════════ from step3_merge_and_audit import rule_signature, _normalize_rule class TestRuleSignature: """Unit tests for rule_signature with edge cases.""" def test_normal_rule(self): """Standard rule with valid trigger dict should produce a signature.""" rule = { "path": ["国内", "系统限制", "前台打断"], "trigger": { "operator": "AND", "conditions": [ {"signal": "车速", "operator": ">=", "value": "5"}, {"signal": "档位", "operator": "==", "value": "D"} ] }, "actions": [ {"type": "system", "description": "弹出提示"} ] } sig = rule_signature(rule) assert isinstance(sig, str) assert len(sig) == 16 # sha256 hex digest[:16] def test_trigger_is_none(self): """Rule with trigger: None should not crash.""" rule = { "path": ["国内", "系统限制", "前台打断"], "trigger": None, "actions": [ {"type": "system", "description": "弹出提示"} ] } sig = rule_signature(rule) assert isinstance(sig, str) assert len(sig) == 16 def test_trigger_key_missing(self): """Rule without trigger key should not crash.""" rule = { "path": ["国内", "系统限制"], "actions": [ {"type": "system", "description": "限制启动"} ] } sig = rule_signature(rule) assert isinstance(sig, str) assert len(sig) == 16 def test_actions_is_none(self): """Rule with actions: None should not crash.""" rule = { "path": ["国内"], "trigger": {"conditions": []}, "actions": None } sig = rule_signature(rule) assert isinstance(sig, str) assert len(sig) == 16 def test_trigger_is_empty_dict(self): """Rule with trigger: {} should work.""" rule = { "path": ["海外", "SDK限制"], "trigger": {}, "actions": [] } sig = rule_signature(rule) assert isinstance(sig, str) def test_trigger_conditions_is_none(self): """Rule with trigger.conditions: None should not crash.""" rule = { "path": [], "trigger": {"operator": "AND", "conditions": None}, "actions": [{"description": "do nothing"}] } # This might still crash if conditions is None because .get("conditions", []) # returns None when the key exists with None value # But our fix is on the trigger level, not conditions level sig = rule_signature(rule) assert isinstance(sig, str) def test_deterministic_signature(self): """Same rule should produce the same signature every time.""" rule = { "path": ["国内", "系统限制", "前台打断"], "trigger": { "operator": "OR", "conditions": [ {"signal": "车速", "operator": ">", "value": "0"} ] }, "actions": [ {"description": "test"} ] } sig1 = rule_signature(rule) sig2 = rule_signature(rule) assert sig1 == sig2 class TestNormalizeRule: """Unit tests for _normalize_rule.""" def test_normalize_null_trigger(self): """_normalize_rule should fix trigger: None.""" rule = {"trigger": None, "actions": []} normalized = _normalize_rule(rule) # _normalize_rule fills in default trigger with conditions assert "trigger" in normalized assert normalized["trigger"]["operator"] == "AND" assert len(normalized["trigger"]["conditions"]) >= 1 # After normalization, rule_signature should work sig = rule_signature(normalized) assert isinstance(sig, str) def test_normalize_missing_trigger(self): """_normalize_rule should add trigger if missing.""" rule = {"actions": []} normalized = _normalize_rule(rule) assert "trigger" in normalized assert normalized["trigger"]["operator"] == "AND" assert len(normalized["trigger"]["conditions"]) >= 1 def test_normalize_null_operator(self): """_normalize_rule should fix null operator in conditions.""" rule = { "trigger": { "conditions": [ {"signal": "车速", "operator": None, "value": "5"} ] }, "actions": [] } normalized = _normalize_rule(rule) cond = normalized["trigger"]["conditions"][0] assert cond["operator"] == "==" def test_normalize_keeps_valid_rule(self): """_normalize_rule should not change a valid rule.""" rule = { "trigger": { "operator": "AND", "conditions": [ {"signal": "车速", "operator": ">=", "value": "5"} ] }, "actions": [{"type": "system", "description": "test"}] } normalized = _normalize_rule(rule) assert normalized["trigger"]["operator"] == "AND" assert normalized["trigger"]["conditions"][0]["operator"] == ">=" def test_normalize_source_missing_section_from_sibling(self): """Table/text sources without section get it from sibling sources.""" rule = { "trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]}, "sources": [ {"type": "table", "section": "3.1.1 系统限制", "row": 1}, {"type": "text", "text_snippet": "missing section"}, ], } normalized = _normalize_rule(rule) assert normalized["sources"][1]["section"] == "3.1.1 系统限制" def test_normalize_source_missing_section_from_path(self): """Table/text sources without section and no sibling fall back to rule path.""" rule = { "trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]}, "path": "4.2 关闭流程 > decision_speed > action_disable", "sources": [ {"type": "table", "row": 3, "text_snippet": "no section anywhere"}, ], } normalized = _normalize_rule(rule) assert normalized["sources"][0]["section"] == "4.2 关闭流程" def test_normalize_source_keeps_existing_section(self): """Sources that already have section are not modified.""" rule = { "trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]}, "sources": [ {"type": "table", "section": "1.0 概述", "row": 1}, ], } normalized = _normalize_rule(rule) assert normalized["sources"][0]["section"] == "1.0 概述" def test_normalize_source_skips_logic_tree(self): """Logic tree sources are not touched (don't need section).""" rule = { "trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]}, "sources": [ {"type": "logic_tree", "image_id": "img1", "node_ids": ["n1"]}, ], } normalized = _normalize_rule(rule) assert "section" not in normalized["sources"][0] def test_normalize_source_invalid_type(self): """Invalid source types (LLM hallucinations) are normalized to text.""" rule = { "trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]}, "sources": [ {"type": "function_unit_description", "text_snippet": "desc", "section": "3.1 功能"}, {"type": "unknown_type", "text_snippet": "also invalid"}, ], } normalized = _normalize_rule(rule) assert normalized["sources"][0]["type"] == "text" assert normalized["sources"][1]["type"] == "text" assert normalized["sources"][0]["section"] == "3.1 功能" def test_normalize_empty_sources(self): """Rules with empty sources get a minimal text source (defensive).""" rule = { "trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]}, "path": "3.1 策略 > decision_speed", "sources": [], } normalized = _normalize_rule(rule) assert len(normalized["sources"]) == 1 assert normalized["sources"][0]["type"] == "text" assert normalized["sources"][0]["section"] == "3.1 策略" def test_normalize_section_is_list(self): """Section field that is a list (LLM format bug) is normalized to string.""" rule = { "trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]}, "sources": [ {"type": "table", "section": ["状态", "系统设置"], "row": 1}, {"type": "text", "section": ["后台限制"], "text_snippet": "x"}, ], } normalized = _normalize_rule(rule) assert normalized["sources"][0]["section"] == "状态" assert normalized["sources"][1]["section"] == "后台限制" def test_normalize_section_is_empty_list(self): """Empty list section falls back to rule path.""" rule = { "trigger": {"conditions": [{"signal": "x", "operator": "==", "value": "1"}]}, "path": "4.2 关闭流程 > decision", "sources": [ {"type": "table", "section": [], "row": 1}, ], } normalized = _normalize_rule(rule) assert normalized["sources"][0]["section"] == "4.2 关闭流程"