""" Stage 2.5: Branch Coverage Auto-Completion. 1. Enumerates all root-to-leaf paths in every logic tree 2. Compares paths against existing IR rules to find uncovered paths 3. Generates synthetic function_units for uncovered paths 4. Calls LLM (same extract_rules_for_unit) to produce rules for synthetic units 5. Iterates up to MAX_RETRIES_PER_STAGE rounds to reach COVERAGE_TARGET Outputs: - output/path_enumeration.json - output/ir_autocomplete_fragments.json """ import concurrent.futures import json import time from pathlib import Path import config # ---- Path Enumeration (shared with step1, duplicated for module independence) ---- def enumerate_all_paths(doc: dict) -> dict[str, list[dict]]: """Enumerate all root-to-leaf paths for every logic tree.""" from step1_semantic_index import enumerate_all_paths as _enum return _enum(doc) # ---- Coverage Analysis ---- def find_referenced_path_ids(rules: list[dict]) -> dict[str, set[str]]: """Map each rule to the set of logic tree nodes it references. Returns {rule_id: set of "image_id:node_id" pairs} """ result = {} for rule in rules: rid = rule.get("rule_id", "?") refs = set() for src in rule.get("sources", []): if src.get("type") == "logic_tree": image_id = src.get("image_id", "") for nid in src.get("node_ids", []): refs.add(f"{image_id}:{nid}") result[rid] = refs return result def compute_path_coverage( all_paths: dict[str, list[dict]], rules: list[dict] ) -> tuple[list[dict], list[dict], dict]: """Compute coverage of enumerated paths by existing rules. Returns (covered_paths, uncovered_paths, stats). A path is "covered" if at least one rule's node_ids form a superset of the path's decision+action nodes for that image. """ # Build per-rule node sets keyed by image_id rule_node_sets = {} # {rule_id: {image_id: set(node_ids)}} for rule in rules: rid = rule.get("rule_id", "?") rule_node_sets[rid] = {} for src in rule.get("sources", []): if src.get("type") == "logic_tree": image_id = src.get("image_id", "") rule_node_sets[rid].setdefault(image_id, set()).update( src.get("node_ids", []) ) covered = [] uncovered = [] for image_id, paths in all_paths.items(): for path in paths: # Get checkable nodes for this path (decision + action) checkable = set( n["id"] for n in path["nodes"] if n["type"] in ("decision", "action") ) if not checkable: # Path with no decision/action nodes — trivially covered covered.append(path) continue path_covered = False for rid, img_sets in rule_node_sets.items(): rule_nodes = img_sets.get(image_id, set()) if checkable.issubset(rule_nodes): path_covered = True break if path_covered: covered.append(path) else: uncovered.append(path) total = len(covered) + len(uncovered) stats = { "total_paths": total, "covered_paths": len(covered), "uncovered_paths": len(uncovered), "coverage_pct": round(len(covered) / total * 100, 1) if total > 0 else 100.0, } return covered, uncovered, stats # ---- Synthetic Function Unit Generation ---- def generate_synthetic_unit(path: dict, unit_seq: int) -> dict: """Create a synthetic function_unit from an uncovered logic tree path. Infers preconditions and trigger from the decision nodes along the path. """ node_map = {n["id"]: n for n in path["nodes"]} # Infer switch state from path switch = _infer_switch_state(path) # Infer app_type from path app_type = _infer_app_type(path) # Infer app_state from path app_state = _infer_app_state(path) # Infer geographic_scope from section context scope = _infer_scope(path) # Build description from path meaning description = f"自动补全: {path.get('meaning', '')}" if switch: description = f"开关{switch}, {description}" # Build path list path_labels = [] if scope: path_labels.append(scope) if switch: path_labels.append(f"开关{switch}") if app_type: path_labels.append(app_type) if app_state: path_labels.append(app_state) # Add behavior from terminal action action_nodes = path.get("action_nodes", []) if action_nodes: last_action = action_nodes[-1].get("label", "") path_labels.append(last_action[:20]) unit_id = f"FU-AUTO-{path['image_id']}-{unit_seq:03d}" seq = f"{unit_seq:03d}" return { "unit_id": unit_id, "name": f"自动补全-{path.get('meaning', '')[:60]}", "description": description, "path": path_labels, "auto_generated": True, "sources": [ { "section": "", "type": "logic_tree", "image_id": path["image_id"], "logic_tree_nodes": path.get("node_ids", []), } ], } def _infer_switch_state(path: dict) -> str: """Infer switch state from decision nodes in path.""" for n in path["nodes"]: label = n.get("label", "") branch = n.get("branch_taken", "") if "开关" in label and n["type"] == "decision": if branch == "开启": return "开启" elif branch == "关闭": return "关闭" return "" def _infer_app_type(path: dict) -> str: """Infer app type from state nodes in path.""" type_map = { "其他应用": "其他应用", "SDK限制": "SDK限制", "通过接入SDK限制的应用": "SDK限制", "系统限制": "系统限制", "通过系统限制应用": "系统限制", } for n in path["nodes"]: if n["type"] == "state": for key, val in type_map.items(): if key in n.get("label", ""): return val return "" def _infer_app_state(path: dict) -> str: """Infer app state (前台/后台) from decision nodes.""" for n in path["nodes"]: label = n.get("label", "") branch = n.get("branch_taken", "") if "前台" in label: if branch == "是": return "前台" elif branch == "否": return "后台" return "" def _infer_scope(path: dict) -> str: """Infer geographic scope. Defaults to 国内.""" return "国内" # ---- LLM Extraction for Synthetic Units ---- def extract_rules_for_synthetic_units( synthetic_units: list[dict], doc: dict, max_retries: int | None = None ) -> list[dict]: """Extract IR rules for synthetic function_units using step2's LLM logic.""" from step2_ir_extraction import ( build_document_lookup, extract_context_package, extract_rules_for_unit, ) if max_retries is None: max_retries = config.MAX_RETRIES_PER_STAGE sections_by_source, image_by_rid, conflicts_by_section = build_document_lookup(doc) fragments = [] for unit in synthetic_units: pkg = extract_context_package( unit, doc, sections_by_source, image_by_rid, conflicts_by_section ) # Enrich pkg with unit's own path and description pkg["unit_path"] = unit.get("path", []) pkg["unit_description"] = unit.get("description", pkg["unit_description"]) try: rules = extract_rules_for_unit(pkg, max_retries) except Exception as e: rules = [] fragments.append({ "unit_id": unit["unit_id"], "unit_name": unit.get("name", ""), "rules": rules, "auto_generated": True, }) print(f" {unit['unit_id']}: {len(rules)} 条规则") return fragments # ---- Iterative Auto-Completion ---- def run_autocomplete( all_paths: dict[str, list[dict]], existing_rules: list[dict], doc: dict, ) -> tuple[list[dict], dict]: """Run iterative auto-completion. Returns (autocomplete_fragments, final_stats).""" print(f"\n 初始路径覆盖率分析...") covered, uncovered, stats = compute_path_coverage(all_paths, existing_rules) print(f" 覆盖: {stats['covered_paths']}/{stats['total_paths']} " f"({stats['coverage_pct']}%)") if not uncovered: print(f" 所有路径已覆盖,无需自动补全") return [], stats print(f" 未覆盖路径: {len(uncovered)} 条") all_fragments = [] best_stats = stats for round_n in range(1, config.MAX_RETRIES_PER_STAGE + 1): if not uncovered: break print(f"\n--- 自动补全 第 {round_n} 轮 ---") print(f" 为 {len(uncovered)} 条未覆盖路径生成合成单元...") # Generate synthetic units start_seq = (round_n - 1) * len(uncovered) + 1 synthetic_units = [ generate_synthetic_unit(path, start_seq + i) for i, path in enumerate(uncovered) ] # Extract rules via LLM max_llm_workers = min(2, len(synthetic_units)) if len(synthetic_units) <= 1: fragments = extract_rules_for_synthetic_units(synthetic_units, doc) else: # Sequential to avoid flooding the API fragments = extract_rules_for_synthetic_units(synthetic_units, doc) all_fragments.extend(fragments) # Re-compute coverage all_rules = existing_rules + [ rule for f in fragments for rule in f.get("rules", []) ] covered, uncovered, stats = compute_path_coverage(all_paths, all_rules) print(f" 第 {round_n} 轮后覆盖: {stats['covered_paths']}/{stats['total_paths']} " f"({stats['coverage_pct']}%)") if stats["coverage_pct"] > best_stats["coverage_pct"]: best_stats = stats if stats["coverage_pct"] >= config.COVERAGE_TARGET * 100: print(f" 达到目标覆盖率 {config.COVERAGE_TARGET:.0%},停止") break # If coverage didn't improve, try a different approach next round uncovered_decision_nodes = set() for p in uncovered: for n in p.get("decision_nodes", []): uncovered_decision_nodes.add(n.get("label", "")) if not uncovered_decision_nodes: print(f" 无更多可补全路径,停止") break return all_fragments, best_stats # ---- Main ---- def main(): print("=" * 60) print("阶段 2.5:分支覆盖自动补全") print("=" * 60) # 1. Load inputs print(f"\n[1/5] 加载输入...") doc = config.load_input_document() fragments = config.load_json(config.IR_FRAGMENTS_JSON) all_rules = [] for f in fragments: all_rules.extend(f.get("rules", [])) print(f" 已有规则: {len(all_rules)} 条") # 2. Enumerate paths print(f"\n[2/5] 枚举逻辑树路径...") all_paths = enumerate_all_paths(doc) total_paths = sum(len(v) for v in all_paths.values()) print(f" 共 {total_paths} 条路径") # Save path enumeration for downstream audit path_enum_data = { "logic_tree_paths": { k: [{kk: vv for kk, vv in p.items() if kk != "nodes"} for p in v] for k, v in all_paths.items() }, "total_paths": total_paths, } config.save_json(path_enum_data, config.PATH_ENUM_JSON) # 3. Run auto-completion print(f"\n[3/5] 运行自动补全...") autocomplete_fragments, final_stats = run_autocomplete( all_paths, all_rules, doc ) # 4. Save print(f"\n[4/5] 保存自动补全片段...") config.save_json( autocomplete_fragments, config.IR_AUTOCOMPLETE_FRAGMENTS_JSON ) print(f" 输出: {config.IR_AUTOCOMPLETE_FRAGMENTS_JSON}") print(f" 生成 {len(autocomplete_fragments)} 个补全片段") # 5. Summary print(f"\n[5/5] 完成!") print(f" 最终路径覆盖: {final_stats['covered_paths']}/{final_stats['total_paths']} " f"({final_stats['coverage_pct']}%)") if final_stats["coverage_pct"] < config.COVERAGE_TARGET * 100: remaining = final_stats["total_paths"] - final_stats["covered_paths"] print(f" WARN: {remaining} 条路径仍未覆盖,将在审计报告中列出") if __name__ == "__main__": main()