""" Stage 3: Deterministic Merge, Consistency Check & Completeness Audit. - Merges IR rule fragments (including autocomplete), deduplicating by trigger+actions. - Reassigns stable hierarchical rule_ids. - Runs consistency checks: naming uniformity, rule contradictions. - Generates an audit report covering: 1. Path coverage (vs enumerated logic tree paths) 2. Table enumeration coverage 3. Global switch state coverage 4. Consistency scan report 5. Auto-complete summary 6. Final rule manifest Outputs: - ir_final.json - ir_audit_report.md """ import hashlib import json import sys from collections import defaultdict from datetime import datetime from pathlib import Path import config from step2_5_branch_coverage import compute_path_coverage, enumerate_all_paths PASS = "[PASS]" WARN = "[WARN]" FAIL = "[FAIL]" # ---- Rule ID Generation ---- LABEL_MAP = { "国内": "DOMESTIC", "海外": "OVERSEAS", "系统限制": "SYS", "SDK限制": "SDK", "SDK自定义限制": "SDK", "其他应用": "OTHER", "行车娱乐限制": "SYS", "行车娱乐禁止": "SYS", "前台打断": "FG-INTERRUPT", "后台限制启动": "BG-BLOCK", "后台禁止": "BG-BLOCK", "后台暂停功能": "BG-PAUSE", "后台允许": "BG-ALLOW", "无限制": "NO-RESTRICT", "开关关闭": "SWITCH-OFF", "开关置灰": "SWITCH-GRAY", "确认弹窗": "CONFIRM-DLG", "风险确认弹窗": "CONFIRM-DLG", } def _path_to_rule_id_components(path: list[str]) -> tuple[str, str, str]: """Extract (scope, method, behavior) from a path array. Falls back to "UNKNOWN" for unrecognized components. """ scope = "UNKNOWN" method = "UNKNOWN" behavior = "UNKNOWN" for segment in path: mapped = LABEL_MAP.get(segment) if mapped in ("DOMESTIC", "OVERSEAS"): scope = mapped elif mapped in ("SYS", "SDK", "OTHER"): method = mapped elif mapped in ("FG-INTERRUPT", "BG-BLOCK", "BG-PAUSE", "NO-RESTRICT", "SWITCH-OFF"): behavior = mapped return scope, method, behavior # ---- Loading ---- def load_fragments() -> list[dict]: """Load IR fragments from Stage 2.""" return config.load_json(config.IR_FRAGMENTS_JSON) def load_autocomplete_fragments() -> list[dict]: """Load auto-complete fragments from Stage 2.5, or return [] if absent.""" path = config.IR_AUTOCOMPLETE_FRAGMENTS_JSON if not Path(path).exists(): return [] return config.load_json(path) def load_semantic_index() -> dict: """Load merged semantic index from Stage 1.""" return config.load_json(config.SEMANTIC_INDEX_JSON) def load_path_enumeration() -> dict: """Load logic tree path enumeration, or return {} if absent.""" path = config.PATH_ENUM_JSON if not Path(path).exists(): return {} data = config.load_json(path) return data.get("logic_tree_paths", {}) # ---- Rule Merge ---- def rule_signature(rule: dict) -> str: """Generate a dedup signature from path + trigger + actions.""" path = rule.get("path", []) trigger = rule.get("trigger") or {} actions = rule.get("actions") or [] raw_conditions = trigger.get("conditions") or [] conditions = sorted( raw_conditions, key=lambda c: (c or {}).get("signal", "") ) sorted_actions = sorted(actions, key=lambda a: a.get("description", "")) sig_data = { "path": path, "conditions": conditions, "actions": sorted_actions, } sig_json = json.dumps(sig_data, ensure_ascii=False, sort_keys=True) return hashlib.sha256(sig_json.encode()).hexdigest()[:16] def _normalize_rule(rule: dict) -> dict: """Ensure a rule has all required fields with valid defaults. Fixes common LLM output issues: missing trigger, null operator, etc. """ # Ensure trigger exists if not rule.get("trigger"): rule["trigger"] = {} trigger = rule["trigger"] # Ensure trigger-level combining operator (AND/OR) for multi-condition triggers if not trigger.get("operator"): trigger["operator"] = "AND" # If trigger has an event, it's event-based (no conditions needed) if trigger.get("event") is not None: return rule # Ensure conditions list exists if "conditions" not in trigger: trigger["conditions"] = [] # Fix null operators in individual conditions for cond in trigger["conditions"]: if not cond.get("operator"): cond["operator"] = "==" if not cond.get("signal"): cond["signal"] = "unknown" if "value" not in cond: cond["value"] = "N/A" # If still no conditions, add a default one if not trigger["conditions"]: trigger["conditions"] = [{ "signal": "system_state", "operator": "==", "value": "active" }] # Ensure table/text sources have a section field (defensive against LLM omission) # Also normalize invalid source types (LLM hallucinations like function_unit_description) sources = rule.get("sources", []) valid_types = {"table", "text", "logic_tree"} # try to infer a default section from the rule path default_section = "" for s in sources: sec = s.get("section", "") if sec and sec.strip(): default_section = sec.strip() break if not default_section: path = rule.get("path", "") if path: default_section = path.split(" > ")[0] if " > " in path else path if sources: for src in sources: stype = src.get("type", "") if stype and stype not in valid_types: src["type"] = "text" stype = "text" if stype in ("table", "text"): if not src.get("section"): src["section"] = default_section else: # Empty sources list — add a minimal text source (defensive against schema failure) src = {"type": "text", "text_snippet": "inferred from rule context"} if default_section: src["section"] = default_section sources.append(src) rule["sources"] = sources return rule def merge_rules(fragments: list[dict], autocomplete_fragments: list[dict] | None = None) -> list[dict]: """Merge rules across all fragments, deduplicating by trigger+actions. Includes autocomplete fragments if provided. """ all_fragments = list(fragments) if autocomplete_fragments: all_fragments.extend(autocomplete_fragments) signature_map: dict[str, dict] = {} order = [] for fragment in all_fragments: for rule in fragment.get("rules", []): sig = rule_signature(rule) if sig in signature_map: existing = signature_map[sig] existing_sources = existing.setdefault("sources", []) for src in rule.get("sources", []): if src not in existing_sources: existing_sources.append(src) if len(rule.get("description", "")) > len( existing.get("description", "") ): existing["description"] = rule["description"] # Merge path if present and different rpath = rule.get("path", []) epath = existing.get("path", []) if rpath and not epath: existing["path"] = rpath else: signature_map[sig] = dict(rule) order.append(sig) merged = [signature_map[sig] for sig in order] total_before = sum(len(f.get("rules", [])) for f in all_fragments) auto_before = sum( len(f.get("rules", [])) for f in (autocomplete_fragments or []) ) print(f" 主片段规则: {total_before - auto_before} 条") if auto_before: print(f" 自动补全规则: {auto_before} 条") print(f" 合并后: {len(merged)} 条 (去重 {total_before - len(merged)} 条)") return merged # ---- Rule ID Assignment ---- def assign_rule_ids(rules: list[dict], feature_id: str = "DRL-001") -> list[dict]: """Reassign stable hierarchical rule_ids. Format: {feature_id}-SCOPE-METHOD-BEHAVIOR-NN Example: DRL-001-DOMESTIC-SYS-FG-INTERRUPT-01 """ # Counter per (scope, method, behavior) key counters: dict[tuple[str, str, str], int] = defaultdict(int) for rule in rules: path = rule.get("path", []) scope, method, behavior = _path_to_rule_id_components(path) # If path is missing, try to infer from precondition if scope == "UNKNOWN": precond = rule.get("precondition", {}) geo = precond.get("geographic_scope", "") scope = LABEL_MAP.get(geo, "DOMESTIC") if method == "UNKNOWN": precond = rule.get("precondition", {}) at = precond.get("app_type", "") method = LABEL_MAP.get(at, "SYS") if behavior == "UNKNOWN": precond = rule.get("precondition", {}) sw = precond.get("switch", "") if sw == "关闭": behavior = "SWITCH-OFF" else: # Infer from actions actions = rule.get("actions", []) action_descs = " ".join( a.get("description", "") for a in actions ) if "打断" in action_descs or "前台" in action_descs: behavior = "FG-INTERRUPT" elif "限制" in action_descs and "启动" in action_descs: behavior = "BG-BLOCK" elif "暂停" in action_descs: behavior = "BG-PAUSE" else: behavior = "NO-RESTRICT" key = (scope, method, behavior) counters[key] += 1 seq = counters[key] rule["rule_id"] = f"{feature_id}-{scope}-{method}-{behavior}-{seq:02d}" return rules # ---- Consistency Checks ---- def _check_naming_consistency(rules: list[dict]) -> list[dict]: """Check that app_type, app_state, switch values use unified terminology. Returns a list of inconsistency items. """ results = [] # Collect all values for each field app_types = set() app_states = set() switches = set() geo_scopes = set() screen_types = set() for rule in rules: precond = rule.get("precondition", {}) if precond.get("app_type"): app_types.add(precond["app_type"]) if precond.get("app_state"): app_states.add(precond["app_state"]) if precond.get("switch"): switches.add(precond["switch"]) if precond.get("geographic_scope"): geo_scopes.add(precond["geographic_scope"]) if precond.get("screen_type"): screen_types.add(precond["screen_type"]) # Known canonical values canonical_app_types = {"系统限制", "SDK限制", "其他应用"} canonical_app_states = {"前台", "后台"} canonical_switches = {"开启", "关闭"} canonical_geo = {"国内", "海外"} canonical_screens = {"CSD", "PSD", "RFD", "any"} unknown_app_types = app_types - canonical_app_types unknown_app_states = app_states - canonical_app_states unknown_switches = switches - canonical_switches unknown_geo = geo_scopes - canonical_geo unknown_screens = screen_types - canonical_screens if unknown_app_types: results.append({ "field": "app_type", "issue": f"非标准值: {sorted(unknown_app_types)}", "expected": sorted(canonical_app_types), "status": WARN, }) if unknown_app_states: results.append({ "field": "app_state", "issue": f"非标准值: {sorted(unknown_app_states)}", "expected": sorted(canonical_app_states), "status": WARN, }) if unknown_switches: results.append({ "field": "switch", "issue": f"非标准值: {sorted(unknown_switches)}", "expected": sorted(canonical_switches), "status": WARN, }) if unknown_geo: results.append({ "field": "geographic_scope", "issue": f"非标准值: {sorted(unknown_geo)}", "expected": sorted(canonical_geo), "status": WARN, }) if unknown_screens: results.append({ "field": "screen_type", "issue": f"非标准值: {sorted(unknown_screens)}", "expected": sorted(canonical_screens), "status": WARN, }) # Also check for near-duplicates (e.g., "系统限制类" vs "系统限制") similar_pairs = [] for v1 in app_types: for v2 in app_types: if v1 < v2 and (v1 in v2 or v2 in v1): similar_pairs.append(f"'{v1}' vs '{v2}'") if similar_pairs: results.append({ "field": "app_type", "issue": f"疑似同义异名: {', '.join(similar_pairs)}", "status": WARN, }) if not results: results.append({ "field": "all", "issue": "所有字段术语统一", "status": PASS, }) return results def _trigger_overlaps(t1: dict, t2: dict) -> bool: """Check if two triggers have any overlapping signal conditions.""" conds1 = t1.get("conditions", []) conds2 = t2.get("conditions", []) signals1 = {c.get("signal") for c in conds1 if isinstance(c, dict)} signals2 = {c.get("signal") for c in conds2 if isinstance(c, dict)} return bool(signals1 & signals2) def _actions_conflict(a1: list[dict], a2: list[dict]) -> bool: """Check if two action lists appear contradictory. "Contradictory" means: both have user_interaction with different content, or one does system interrupt while the other does nothing. """ descs1 = {a.get("description", "") for a in a1} descs2 = {a.get("description", "") for a in a2} # If one set is a subset of the other, no conflict — just less detail if descs1.issubset(descs2) or descs2.issubset(descs1): return False # Check for contradictory user_interaction content contents1 = { a.get("content", "") for a in a1 if a.get("type") == "user_interaction" } contents2 = { a.get("content", "") for a in a2 if a.get("type") == "user_interaction" } if contents1 and contents2 and contents1 != contents2: return True # Check if one has system actions and the other doesn't has_sys1 = any(a.get("type") == "system" for a in a1) has_sys2 = any(a.get("type") == "system" for a in a2) if has_sys1 != has_sys2 and (contents1 or contents2): return True return False def _precondition_overlaps(p1: dict, p2: dict) -> bool: """Check if two preconditions overlap significantly. Two preconditions overlap if they share the same scope, switch state, and either share app_type or one is unspecified. """ if p1.get("geographic_scope") != p2.get("geographic_scope"): return False if p1.get("switch") != p2.get("switch"): return False # App type overlap (empty = any) at1 = p1.get("app_type", "") at2 = p2.get("app_type", "") if at1 and at2 and at1 != at2: return False # App state overlap as1 = p1.get("app_state", "") as2 = p2.get("app_state", "") if as1 and as2 and as1 != as2: return False return True def _detect_contradictions(rules: list[dict]) -> list[dict]: """Find pairs of rules with overlapping preconditions but contradictory actions. Returns a list of contradiction items with: {rule_a, rule_b, conflict_point, resolvable, recommendation} """ contradictions = [] for i in range(len(rules)): for j in range(i + 1, len(rules)): r1 = rules[i] r2 = rules[j] rid1 = r1.get("rule_id", f"rule[{i}]") rid2 = r2.get("rule_id", f"rule[{j}]") p1 = r1.get("precondition", {}) p2 = r2.get("precondition", {}) if not _precondition_overlaps(p1, p2): continue t1 = r1.get("trigger", {}) t2 = r2.get("trigger", {}) if not _trigger_overlaps(t1, t2): continue a1 = r1.get("actions", []) a2 = r2.get("actions", []) if not _actions_conflict(a1, a2): continue # Determine the conflict point path1 = r1.get("path", []) path2 = r2.get("path", []) conflict_point = ( f"相同前置状态 (scope={p1.get('geographic_scope')}, " f"app={p1.get('app_type', 'any')}, " f"state={p1.get('app_state', 'any')}) " f"但行为路径不同: {path1} vs {path2}" ) # Check if resolvable: if paths differ only at behavior level # and the behaviors are non-overlapping (e.g., FG vs BG) resolvable = False if path1 and path2: shared_prefix = [] for a, b in zip(path1, path2): if a == b: shared_prefix.append(a) else: break # If they share scope/method but differ on app_state → not a conflict # they are different scenarios if len(shared_prefix) >= 3: # scope + method + (app_state) # Actually, same app_state with different behaviors could be a real conflict pass elif len(shared_prefix) >= 2: # Same scope+method, different app_state → not a real conflict resolvable = True contradictions.append({ "rule_a": rid1, "rule_b": rid2, "conflict_point": conflict_point, "rule_a_path": path1, "rule_b_path": path2, "resolvable": resolvable, "recommendation": ( "路径前缀不同,可能为不同场景的正常分支" if resolvable else "请人工确认是否为真正的矛盾,或合并规则" ), }) return contradictions def _auto_resolve_contradictions( contradictions: list[dict], doc: dict ) -> tuple[list[dict], list[dict]]: """Attempt to auto-resolve contradictions using resolved_conflicts. Returns (resolved, unresolved). """ if not contradictions: return [], [] resolved_conflicts = doc.get("resolved_conflicts", []) resolved = [] unresolved = [] for c in contradictions: # Check if any resolved_conflict covers this auto_fixed = False for rc in resolved_conflicts: rc_text = rc.get("correction", "") + rc.get("conflict_type", "") # Simple heuristic: check if the conflict involves the sections mentioned if any( seg in rc_text for seg in c.get("rule_a_path", []) + c.get("rule_b_path", []) ): c["auto_resolved_by"] = rc.get("correction", "") c["resolvable"] = True resolved.append(c) auto_fixed = True break if not auto_fixed: unresolved.append(c) return resolved, unresolved # ---- Path Coverage Audit ---- def audit_path_coverage( doc: dict, rules: list[dict] ) -> tuple[list[dict], dict]: """Audit logic tree path coverage (vs node coverage). Uses the same path enumeration and coverage computation as step 2.5. Returns (results_list, stats_dict). """ all_paths = enumerate_all_paths(doc) if not all_paths: return [], {"total_paths": 0, "covered_paths": 0, "uncovered_paths": 0, "coverage_pct": 100.0} covered, uncovered, stats = compute_path_coverage(all_paths, rules) results = [] for image_id, paths in all_paths.items(): # Compute per-image coverage img_covered = [p for p in covered if p.get("image_id") == image_id] img_uncovered = [p for p in uncovered if p.get("image_id") == image_id] img_total = len(img_covered) + len(img_uncovered) img_cov = ( round(len(img_covered) / img_total * 100, 1) if img_total > 0 else 100.0 ) status = PASS if img_cov >= 95 else (WARN if img_cov >= 70 else FAIL) detail = f"{len(img_covered)}/{img_total} 路径被覆盖 ({img_cov}%)" if img_uncovered: uncovered_meanings = [p.get("meaning", "?") for p in img_uncovered[:5]] detail += f"; 未覆盖路径示例: {uncovered_meanings}" if len(img_uncovered) > 5: detail += f" ... 还有 {len(img_uncovered) - 5} 条" results.append({ "check": f"逻辑树 {image_id} 路径覆盖率", "status": status, "coverage_pct": img_cov, "detail": detail, "image_id": image_id, "uncovered_paths": img_uncovered, }) return results, stats # ---- Table Enumeration Audit ---- def find_table_enums(doc: dict) -> list[dict]: """Find enumerated values in tables.""" enums = [] for section in doc.get("sections", []): for block in section.get("blocks", []): if block["type"] != "table": continue headers = block.get("headers", []) if not headers: continue if "功能" in headers and "功能详细说明" in headers: for row in block.get("rows", []): cols = row.get("columns", []) key_col = next( (c for c in cols if c.get("name") == "功能"), None ) val_col = next( (c for c in cols if c.get("name") == "功能详细说明"), None ) if key_col and val_col: enums.append({ "section": section.get("source", ""), "row": key_col.get("row"), "key": key_col.get("text", ""), "value": val_col.get("text", ""), }) else: first_col_name = headers[0] if headers else "" values = [] for row in block.get("rows", []): for col in row.get("columns", []): if col.get("name") == first_col_name: values.append(col.get("text", "")) if values: enums.append({ "section": section.get("source", ""), "column": first_col_name, "values": values, }) return enums def audit_table_enums(rules: list[dict], doc: dict) -> list[dict]: """Check if key enumerated values appear in rule preconditions.""" results = [] rule_preconditions = [rule.get("precondition", {}) for rule in rules] # App type coverage app_types = {"系统限制", "SDK限制", "其他应用"} found_app_types = set() for precond in rule_preconditions: at = precond.get("app_type", "") if at: found_app_types.add(at) missing_types = app_types - found_app_types results.append({ "check": "应用类型枚举覆盖", "status": PASS if not missing_types else WARN, "detail": f"已覆盖: {found_app_types or '无'}" + (f"; 未覆盖: {missing_types}" if missing_types else ""), }) # App state coverage app_states = {"前台", "后台"} found_states = set() for precond in rule_preconditions: st = precond.get("app_state", "") if st: found_states.add(st) missing_states = app_states - found_states results.append({ "check": "应用前后台状态覆盖", "status": PASS if not missing_states else WARN, "detail": f"已覆盖: {found_states or '无'}" + (f"; 未覆盖: {missing_states}" if missing_states else ""), }) # Trigger signal coverage trigger_signals = set() for rule in rules: for cond in rule.get("trigger", {}).get("conditions", []): signal = cond.get("signal", "") if signal: trigger_signals.add(signal) key_signals = {"车速", "档位", "车速_持续时间", "应用请求启动"} missing_signals = key_signals - trigger_signals results.append({ "check": "触发信号覆盖(车速/档位/持续时间/启动请求)", "status": PASS if not missing_signals else WARN, "detail": f"已覆盖信号: {sorted(trigger_signals)}" + (f"; 未覆盖: {missing_signals}" if missing_signals else ""), }) return results # ---- Switch Coverage Audit ---- def audit_switch_coverage(rules: list[dict]) -> list[dict]: """Check that rules cover both switch ON and OFF states.""" switch_on = False switch_off = False for rule in rules: sw = rule.get("precondition", {}).get("switch", "") if sw == "开启": switch_on = True elif sw == "关闭": switch_off = True status = PASS detail_parts = [] if switch_on: detail_parts.append("开关=开启: 有规则覆盖") else: detail_parts.append("开关=开启: 未找到规则") status = FAIL if switch_off: detail_parts.append("开关=关闭: 有规则覆盖") else: detail_parts.append("开关=关闭: 未找到规则") status = FAIL return [{ "check": "开关状态完整性(开启/关闭)", "status": status, "detail": "; ".join(detail_parts), }] # ---- Audit Report Generation ---- def generate_audit_report( rules: list[dict], doc: dict, feature_name: str, path_results: list[dict], path_stats: dict, enum_results: list[dict], switch_results: list[dict], consistency_results: list[dict], contradictions: list[dict], unresolved_contradictions: list[dict], autocomplete_count: int, path_conflicts: list[dict] | None = None, ) -> str: """Generate ir_audit_report.md with all audit sections.""" lines = [] lines.append("# IR 完整性审计报告") lines.append("") lines.append(f"**功能**: {feature_name}") lines.append(f"**规则总数**: {len(rules)}") lines.append(f"**生成时间**: {datetime.now().isoformat()}") lines.append("") # Human review notice issue_count = sum( 1 for r in path_results + enum_results + switch_results + consistency_results if r["status"] in (WARN, FAIL) ) + len(unresolved_contradictions) + len(path_conflicts or []) lines.append( f"> **重要**: 请人工审查以下标记项。" f"共 {issue_count} 项需要关注。" ) lines.append( f'> 如无需修改,在对应项后标注 **"已确认"**。' ) lines.append("") # ---- Section 1: Path Coverage ---- lines.append("## 1. 逻辑树路径覆盖率") lines.append("") lines.append( f"**总体**: {path_stats.get('covered_paths', 0)}/" f"{path_stats.get('total_paths', 0)} 路径已覆盖 " f"({path_stats.get('coverage_pct', 0)}%)" ) lines.append("") lines.append("| 图片 ID | 覆盖率 | 状态 | 详情 |") lines.append("|---------|--------|------|------|") for r in path_results: lines.append( f"| {r['image_id']} | {r['coverage_pct']}% " f"| {r['status']} | {r['detail']} |" ) lines.append("") # Uncovered path details for r in path_results: uncovered = r.get("uncovered_paths", []) if uncovered: lines.append(f"### {r['image_id']} 未覆盖路径详情") lines.append("") for p in uncovered[:10]: meaning = p.get("meaning", "?") node_ids = p.get("node_ids", []) lines.append( f"- **路径**: {meaning} " f"(节点: {' → '.join(node_ids)})" ) if len(uncovered) > 10: lines.append(f"- ... 还有 {len(uncovered) - 10} 条未覆盖路径") lines.append("") # ---- Section 2: Table Enumeration ---- lines.append("## 2. 表格枚举覆盖") lines.append("") lines.append("| 检查项 | 状态 | 详情 |") lines.append("|--------|------|------|") for r in enum_results: lines.append(f"| {r['check']} | {r['status']} | {r['detail']} |") lines.append("") # ---- Section 3: Switch Coverage ---- lines.append("## 3. 全局开关状态覆盖") lines.append("") lines.append("| 检查项 | 状态 | 详情 |") lines.append("|--------|------|------|") for r in switch_results: lines.append(f"| {r['check']} | {r['status']} | {r['detail']} |") lines.append("") # ---- Section 4: Consistency Scan ---- lines.append("## 4. 一致性扫描报告") lines.append("") lines.append("### 4.1 术语统一性") lines.append("") lines.append("| 字段 | 状态 | 详情 |") lines.append("|------|------|------|") for r in consistency_results: expected = r.get("expected", []) expected_str = f" (期望: {expected})" if expected else "" lines.append( f"| {r['field']} | {r['status']} | {r['issue']}{expected_str} |" ) lines.append("") lines.append("### 4.2 规则矛盾检测") lines.append("") if contradictions: auto_resolved = [c for c in contradictions if c.get("auto_resolved_by")] remaining = [c for c in contradictions if not c.get("auto_resolved_by")] if auto_resolved: lines.append(f"**自动解决**: {len(auto_resolved)} 项 (通过图文冲突仲裁)") lines.append("") for c in auto_resolved: lines.append( f"- {c['rule_a']} vs {c['rule_b']}: " f"已按仲裁 '**{c['auto_resolved_by']}**' 处理" ) lines.append("") if remaining: lines.append(f"**需人工确认**: {len(remaining)} 项") lines.append("") for c in remaining: lines.append(f"### 矛盾: {c['rule_a']} vs {c['rule_b']}") lines.append(f"- **冲突点**: {c['conflict_point']}") lines.append(f"- **路径A**: {c.get('rule_a_path', [])}") lines.append(f"- **路径B**: {c.get('rule_b_path', [])}") lines.append(f"- **建议**: {c.get('recommendation', '请人工判断')}") lines.append(f'- [ ] 已确认 (标注 **"已确认"**)') lines.append("") else: lines.append("未检测到规则矛盾。") lines.append("") # ---- Section 4.3: Path Conflicts ---- lines.append("### 4.3 路径冲突(同path不同行为)") lines.append("") if path_conflicts: for pc in path_conflicts: lines.append(f"- **Path**: {' > '.join(pc['path'])}") lines.append(f" - 规则: {', '.join(pc['rule_ids'])}") lines.append(f" - 不同行为数: {pc['distinct_behaviors']}") lines.append(f" - 建议: {pc['suggestion']}") lines.append("") else: lines.append("未检测到路径冲突。") lines.append("") # ---- Section 5: Auto-Complete Summary ---- lines.append("## 5. 自动补全摘要") lines.append("") if autocomplete_count > 0: lines.append(f"- 自动补全片段数: {autocomplete_count}") lines.append( f"- 补全后路径覆盖率: " f"{path_stats.get('coverage_pct', 0)}%" ) lines.append(f"- 自动生成的规则已合并到最终规则集中") else: lines.append("- 未执行自动补全(所有路径已被手动覆盖,或未运行 step2.5)") lines.append("") # ---- Section 6: Rule Manifest ---- lines.append("## 6. 规则清单") lines.append("") lines.append("| rule_id | Priority | Path | 简述 |") lines.append("|---------|----------|------|------|") for rule in rules: rid = rule.get("rule_id", "?") pri = rule.get("priority", "?") path_str = " > ".join(rule.get("path", [])) desc = rule.get("description", "")[:60] lines.append(f"| {rid} | {pri} | {path_str} | {desc} |") lines.append("") return "\n".join(lines) def _extract_config_defaults(doc: dict, semantic_index: dict) -> dict: """Extract configuration defaults (e.g. switch default states) from document. Scans table text for patterns like "默认开启"/"默认关闭" and checks semantic_index concepts for "默认" keywords. """ defaults = {} # Scan document tables for default config values for section in doc.get("sections", []): for block in section.get("blocks", []): if block["type"] != "table": continue for row in block.get("rows", []): row_texts = [] for col in row.get("columns", []): row_texts.append(f"{col.get('name','')}: {col.get('text','')}") combined = " ".join(row_texts) if "行车娱乐限制开关" in combined or "开关" in combined: if "默认开启" in combined or "默认状态:开启" in combined: defaults["行车娱乐限制开关"] = { "default": "开启", "section": section.get("source", "").split()[0] if section.get("source") else "", } elif "默认关闭" in combined or "默认状态:关闭" in combined: defaults["行车娱乐限制开关"] = { "default": "关闭", "section": section.get("source", "").split()[0] if section.get("source") else "", } # Supplement from semantic_index concepts for concept in semantic_index.get("concepts", []): name = concept.get("name", "") if "默认" in name: if "开启" in name: defaults.setdefault("行车娱乐限制开关", {})["default"] = "开启" elif "关闭" in name: defaults.setdefault("行车娱乐限制开关", {})["default"] = "关闭" return defaults def _detect_path_conflicts(rules: list[dict]) -> list[dict]: """Detect rules that share the same path triplet but have different behaviors. Returns list of conflict items for the audit report. """ from collections import defaultdict path_groups = defaultdict(list) for rule in rules: path_key = tuple(rule.get("path", [])) path_groups[path_key].append(rule) conflicts = [] for path_key, group in path_groups.items(): if len(group) <= 1: continue # Check if rules in the same path have different trigger/action signatures signatures = set() for r in group: trigger = r.get("trigger", {}) actions = tuple( a.get("description", "") for a in r.get("actions", []) ) sig = ( tuple(sorted( (c.get("signal",""), c.get("operator",""), str(c.get("value",""))) for c in trigger.get("conditions", []) )), actions, ) signatures.add(sig) if len(signatures) > 1: # Same path, different behaviors → potential organization issue conflicts.append({ "status": "WARN", "type": "path_collision", "path": list(path_key), "rule_ids": [r["rule_id"] for r in group], "count": len(group), "distinct_behaviors": len(signatures), "suggestion": "多条规则共享相同path但行为不同,考虑拆分path或使用更细粒度的叶子路径", }) return conflicts # ---- Main ---- def main(): print("=" * 60) print("阶段三:确定性合并、一致性校验与完整性审计") print("=" * 60) # 1. Load inputs print(f"\n[1/7] 加载输入...") fragments = load_fragments() autocomplete_fragments = load_autocomplete_fragments() doc = config.load_input_document() semantic_index = load_semantic_index() path_enum = load_path_enumeration() total_fragments = len(fragments) if total_fragments == 0 and not autocomplete_fragments: print("错误: 无 IR 片段可合并 (fragments 和 autocomplete_fragments 均为空)。") print(" 请检查 step2_ir_extraction 是否正确运行。") print(" 可能原因: step1 未生成 function_units,或 step2 提取失败。") sys.exit(1) feature_name = semantic_index.get("feature_name", "行车娱乐限制") feature_id = "DRL-001" print(f" 功能: {feature_name} ({feature_id})") print(f" 主片段: {total_fragments}") if autocomplete_fragments: print(f" 自动补全片段: {len(autocomplete_fragments)}") # 2. Merge rules print(f"\n[2/7] 合并去重...") merged_rules = merge_rules(fragments, autocomplete_fragments) # 2.5 Normalize rules (fix missing triggers, null operators) merged_rules = [_normalize_rule(r) for r in merged_rules] print(f" 标准化: {len(merged_rules)} 条规则") # 3. Reassign rule IDs print(f"\n[3/7] 重分配 rule_id (层次化格式)...") final_rules = assign_rule_ids(merged_rules, feature_id) print(f" 已分配 {len(final_rules)} 个稳定 ID") # Show ID examples if final_rules: sample_ids = [r["rule_id"] for r in final_rules[:3]] print(f" 示例: {sample_ids}") # 4. Consistency checks print(f"\n[4/7] 一致性扫描...") consistency_results = _check_naming_consistency(final_rules) n_warns = sum(1 for r in consistency_results if r["status"] == WARN) if n_warns: print(f" {WARN} {n_warns} 个术语不一致问题") else: print(f" {PASS} 术语统一") contradictions = _detect_contradictions(final_rules) resolved, unresolved = _auto_resolve_contradictions(contradictions, doc) if resolved: print(f" {PASS} 自动解决 {len(resolved)} 个矛盾") if unresolved: print(f" {WARN} {len(unresolved)} 个矛盾需要人工确认") for c in unresolved: print(f" - {c['rule_a']} vs {c['rule_b']}: {c['conflict_point'][:80]}") if not contradictions: print(f" {PASS} 未检测到规则矛盾") path_conflicts = _detect_path_conflicts(final_rules) if path_conflicts: print(f" {WARN} {len(path_conflicts)} 个 path 冲突(同path不同行为)") else: print(f" {PASS} 无 path 冲突") # 5. Generate audit report print(f"\n[5/7] 生成审计报告...") path_results, path_stats = audit_path_coverage(doc, final_rules) enum_results = audit_table_enums(final_rules, doc) switch_results = audit_switch_coverage(final_rules) report = generate_audit_report( final_rules, doc, feature_name, path_results, path_stats, enum_results, switch_results, consistency_results, contradictions, unresolved, len(autocomplete_fragments), path_conflicts, ) # 6. Extract config defaults from document config_defaults = _extract_config_defaults(doc, semantic_index) if config_defaults: print(f" 配置默认值: {list(config_defaults.keys())}") # 7. Save outputs print(f"\n[7/7] 保存输出...") ir_final = { "feature": feature_name, "feature_id": feature_id, "rules": final_rules, } if config_defaults: ir_final["config_defaults"] = config_defaults config.save_json(ir_final, config.IR_FINAL_JSON) print(f" IR: {config.IR_FINAL_JSON}") with open(config.IR_AUDIT_REPORT_MD, "w", encoding="utf-8") as f: f.write(report) print(f" 审计报告: {config.IR_AUDIT_REPORT_MD}") # Summary print(f"\n完成!") issue_count = ( sum(1 for r in path_results + enum_results + switch_results if r["status"] in (WARN, FAIL)) + n_warns + len(unresolved) + len(path_conflicts) ) print(f" 规则: {len(final_rules)} 条") print(f" 路径覆盖: {path_stats.get('coverage_pct', 0)}%") print(f" 审计问题: {issue_count} 个需要关注") if issue_count > 0: print(f"\n 请查看 {config.IR_AUDIT_REPORT_MD} 并审查标记项。") if __name__ == "__main__": main()