Files
document_analyzer/skills/ir_generation_skill/step2_5_branch_coverage.py
T
pzhang_zywl fec4c09ee0
CI / test (push) Successful in 8s
sync: update all skills from latest workspace code
doc_parser_skill:
- New: verify_flowchart.py (flowchart validation)
- Updated: LLM.py (multi-provider: DeepSeek + DashScope)
- Updated: image_parser.py (logic tree support, external prompts)
- Updated: SKILL.md, prompts/image_prompt.md

conflict_detection_skill:
- Updated: LLM.py (multi-provider sync)
- Updated: detect_conflicts.py (logic tree text conversion)

ir_generation_skill:
- Replaced old scripts/LLM.py + ir_generator.py with standalone project
- New: main.py, config.py, step1-3_*.py, ensemble_merge.py
- New: prompts/, tests/ subdirectories

tests:
- New: acceptance/ test suite with schema validation
- Fixed: conftest no longer globally skips non-acceptance tests
- Updated: test_sample.py for new ir_generation structure

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-30 22:45:08 +08:00

400 lines
12 KiB
Python

"""
Stage 2.5: Branch Coverage Auto-Completion.
1. Enumerates all root-to-leaf paths in every logic tree
2. Compares paths against existing IR rules to find uncovered paths
3. Generates synthetic function_units for uncovered paths
4. Calls LLM (same extract_rules_for_unit) to produce rules for synthetic units
5. Iterates up to MAX_RETRIES_PER_STAGE rounds to reach COVERAGE_TARGET
Outputs:
- output/path_enumeration.json
- output/ir_autocomplete_fragments.json
"""
import concurrent.futures
import json
import time
from pathlib import Path
import config
# ---- Path Enumeration (delegates to step1_semantic_index, imported at call time) ----
def enumerate_all_paths(doc: dict) -> dict[str, list[dict]]:
    """Return the root-to-leaf paths of every logic tree in ``doc``.

    Thin wrapper: the work is done by
    ``step1_semantic_index.enumerate_all_paths``, which is imported lazily
    when this function is called rather than at module import time.
    """
    import step1_semantic_index

    return step1_semantic_index.enumerate_all_paths(doc)
# ---- Coverage Analysis ----
def find_referenced_path_ids(rules: list[dict]) -> dict[str, set[str]]:
    """Map each rule id to the logic-tree nodes that rule references.

    Only sources whose ``type`` is ``"logic_tree"`` contribute. Each node is
    recorded as the string ``"<image_id>:<node_id>"``.

    Args:
        rules: IR rule dicts. ``rule_id`` falls back to ``"?"`` when absent;
            ``sources`` / ``node_ids`` may be missing or ``None``.

    Returns:
        ``{rule_id: set of "image_id:node_id"}``. Rules that share an id
        (including every rule that falls back to ``"?"``) have their
        references merged instead of the last rule overwriting the others.
    """
    result: dict[str, set[str]] = {}
    for rule in rules:
        # setdefault keeps references already collected under the same id.
        refs = result.setdefault(rule.get("rule_id", "?"), set())
        # "or []" tolerates an explicit null in the rule JSON.
        for src in rule.get("sources") or []:
            if src.get("type") != "logic_tree":
                continue
            image_id = src.get("image_id", "")
            refs.update(f"{image_id}:{nid}" for nid in src.get("node_ids") or [])
    return result
def compute_path_coverage(
    all_paths: dict[str, list[dict]], rules: list[dict]
) -> tuple[list[dict], list[dict], dict]:
    """Split the enumerated paths into those covered by a rule and those not.

    A path is "covered" when at least one single rule references, for the
    path's image, every decision and action node on the path (the rule's
    node ids form a superset). A path without any decision/action node is
    counted as trivially covered.

    Args:
        all_paths: ``{image_id: [path, ...]}``. Each path must carry a
            ``"nodes"`` list whose items have ``"id"`` and ``"type"``.
        rules: IR rule dicts. Only ``sources`` entries with
            ``type == "logic_tree"`` are considered; ``sources`` and
            ``node_ids`` may be missing or ``None``.

    Returns:
        ``(covered_paths, uncovered_paths, stats)`` where ``stats`` holds
        ``total_paths``, ``covered_paths``, ``uncovered_paths`` and
        ``coverage_pct`` (rounded to one decimal; ``100.0`` when there are no
        paths at all).
    """
    # One entry per rule, kept by position instead of by rule_id: rules with a
    # duplicated or missing id must not overwrite each other, and node sets of
    # different rules must not be merged (coverage is a per-rule property).
    rule_node_sets: list[dict[str, set[str]]] = []
    for rule in rules:
        per_image: dict[str, set[str]] = {}
        for src in rule.get("sources") or []:
            if src.get("type") == "logic_tree":
                per_image.setdefault(src.get("image_id", ""), set()).update(
                    src.get("node_ids") or []
                )
        if per_image:
            rule_node_sets.append(per_image)

    covered = []
    uncovered = []
    for image_id, paths in all_paths.items():
        # Only rules that reference this image can cover its paths.
        image_rule_sets = [
            per_image[image_id]
            for per_image in rule_node_sets
            if image_id in per_image
        ]
        for path in paths:
            # Nodes a rule has to mention for the path to count as covered.
            checkable = {
                n["id"] for n in path["nodes"]
                if n["type"] in ("decision", "action")
            }
            if not checkable or any(
                checkable <= rule_nodes for rule_nodes in image_rule_sets
            ):
                covered.append(path)
            else:
                uncovered.append(path)

    total = len(covered) + len(uncovered)
    stats = {
        "total_paths": total,
        "covered_paths": len(covered),
        "uncovered_paths": len(uncovered),
        "coverage_pct": round(len(covered) / total * 100, 1) if total > 0 else 100.0,
    }
    return covered, uncovered, stats
# ---- Synthetic Function Unit Generation ----
def generate_synthetic_unit(path: dict, unit_seq: int) -> dict:
    """Create a synthetic function_unit from an uncovered logic tree path.

    The unit's description and ``path`` labels are derived from the switch
    state, app type, app state and scope inferred from the path's nodes, plus
    the label of the last action node (truncated to 20 characters).

    Args:
        path: Enumerated path dict. ``image_id`` and ``nodes`` are required;
            ``meaning``, ``action_nodes`` and ``node_ids`` are optional.
        unit_seq: Sequence number embedded (zero-padded to 3 digits) in the
            generated ``unit_id``.

    Returns:
        A function_unit dict flagged with ``auto_generated: True`` whose only
        source is the logic tree the path belongs to.
    """
    # "or" guards against an explicit None as well as a missing key.
    meaning = path.get("meaning") or ""

    switch = _infer_switch_state(path)
    app_type = _infer_app_type(path)
    app_state = _infer_app_state(path)
    scope = _infer_scope(path)

    description = f"自动补全: {meaning}"
    if switch:
        description = f"开关{switch}, {description}"

    # Label order: scope, switch state, app type, app state, terminal action.
    path_labels = []
    if scope:
        path_labels.append(scope)
    if switch:
        path_labels.append(f"开关{switch}")
    if app_type:
        path_labels.append(app_type)
    if app_state:
        path_labels.append(app_state)

    action_nodes = path.get("action_nodes") or []
    if action_nodes:
        last_action = action_nodes[-1].get("label", "")
        path_labels.append(last_action[:20])

    return {
        "unit_id": f"FU-AUTO-{path['image_id']}-{unit_seq:03d}",
        "name": f"自动补全-{meaning[:60]}",
        "description": description,
        "path": path_labels,
        "auto_generated": True,
        "sources": [
            {
                "section": "",
                "type": "logic_tree",
                "image_id": path["image_id"],
                "logic_tree_nodes": path.get("node_ids", []),
            }
        ],
    }
def _infer_switch_state(path: dict) -> str:
"""Infer switch state from decision nodes in path."""
for n in path["nodes"]:
label = n.get("label", "")
branch = n.get("branch_taken", "")
if "开关" in label and n["type"] == "decision":
if branch == "开启":
return "开启"
elif branch == "关闭":
return "关闭"
return ""
def _infer_app_type(path: dict) -> str:
"""Infer app type from state nodes in path."""
type_map = {
"其他应用": "其他应用",
"SDK限制": "SDK限制",
"通过接入SDK限制的应用": "SDK限制",
"系统限制": "系统限制",
"通过系统限制应用": "系统限制",
}
for n in path["nodes"]:
if n["type"] == "state":
for key, val in type_map.items():
if key in n.get("label", ""):
return val
return ""
def _infer_app_state(path: dict) -> str:
"""Infer app state (前台/后台) from decision nodes."""
for n in path["nodes"]:
label = n.get("label", "")
branch = n.get("branch_taken", "")
if "前台" in label:
if branch == "":
return "前台"
elif branch == "":
return "后台"
return ""
def _infer_scope(path: dict) -> str:
"""Infer geographic scope. Defaults to 国内."""
return "国内"
# ---- LLM Extraction for Synthetic Units ----
def extract_rules_for_synthetic_units(
    synthetic_units: list[dict], doc: dict, max_retries: int | None = None
) -> list[dict]:
    """Extract IR rules for synthetic function_units using step2's LLM logic.

    Units are processed one after another. For each unit a context package is
    built with step2's helpers, enriched with the unit's own path and
    description, and handed to ``extract_rules_for_unit``.

    Args:
        synthetic_units: Units as produced by ``generate_synthetic_unit``;
            each must carry ``unit_id``.
        doc: The parsed input document.
        max_retries: Retry budget passed to ``extract_rules_for_unit``;
            defaults to ``config.MAX_RETRIES_PER_STAGE``.

    Returns:
        One fragment dict per unit (``unit_id``, ``unit_name``, ``rules``,
        ``auto_generated``). A unit whose extraction raises gets an empty
        ``rules`` list so the remaining units are still processed.
    """
    from step2_ir_extraction import (
        build_document_lookup,
        extract_context_package,
        extract_rules_for_unit,
    )

    if max_retries is None:
        max_retries = config.MAX_RETRIES_PER_STAGE

    sections_by_source, image_by_rid, conflicts_by_section = build_document_lookup(doc)
    fragments = []
    for unit in synthetic_units:
        pkg = extract_context_package(
            unit, doc, sections_by_source, image_by_rid, conflicts_by_section
        )
        # Enrich pkg with the unit's own path and description. The package's
        # description is only replaced when the unit provides one, so a
        # package without that key no longer raises KeyError.
        pkg["unit_path"] = unit.get("path", [])
        if "description" in unit:
            pkg["unit_description"] = unit["description"]

        try:
            rules = extract_rules_for_unit(pkg, max_retries)
        except Exception as e:
            # Best effort: one failing unit must not abort the batch, but the
            # failure has to be visible instead of looking like "0 rules".
            print(f"  WARN: {unit['unit_id']} 规则提取失败: {type(e).__name__}: {e}")
            rules = []

        fragments.append({
            "unit_id": unit["unit_id"],
            "unit_name": unit.get("name", ""),
            "rules": rules,
            "auto_generated": True,
        })
        print(f" {unit['unit_id']}: {len(rules)} 条规则")
    return fragments
# ---- Iterative Auto-Completion ----
def run_autocomplete(
    all_paths: dict[str, list[dict]],
    existing_rules: list[dict],
    doc: dict,
) -> tuple[list[dict], dict]:
    """Run iterative auto-completion of uncovered logic-tree paths.

    Each round turns every still-uncovered path into a synthetic function
    unit, extracts rules for those units, and recomputes coverage over the
    existing rules plus all rules generated so far. The loop ends after
    ``config.MAX_RETRIES_PER_STAGE`` rounds, when nothing is left uncovered,
    when coverage reaches ``config.COVERAGE_TARGET`` (a fraction, compared
    against the percentage), or when no remaining path has decision nodes.

    Args:
        all_paths: ``{image_id: [path, ...]}`` from path enumeration.
        existing_rules: Rules already extracted before this stage.
        doc: The parsed input document, forwarded to rule extraction.

    Returns:
        ``(autocomplete_fragments, stats)`` where ``stats`` is the coverage
        statistics dict with the highest ``coverage_pct`` seen.
    """
    print("\n 初始路径覆盖率分析...")
    _, uncovered, stats = compute_path_coverage(all_paths, existing_rules)
    print(f" 覆盖: {stats['covered_paths']}/{stats['total_paths']} "
          f"({stats['coverage_pct']}%)")
    if not uncovered:
        print(" 所有路径已覆盖,无需自动补全")
        return [], stats
    print(f" 未覆盖路径: {len(uncovered)}")

    all_fragments = []
    best_stats = stats
    # Running counter so unit ids stay unique across rounds even though the
    # number of uncovered paths shrinks from one round to the next.
    next_seq = 1
    for round_n in range(1, config.MAX_RETRIES_PER_STAGE + 1):
        if not uncovered:
            break
        print(f"\n--- 自动补全 第 {round_n} 轮 ---")
        print(f"{len(uncovered)} 条未覆盖路径生成合成单元...")

        # Generate synthetic units
        synthetic_units = [
            generate_synthetic_unit(path, next_seq + i)
            for i, path in enumerate(uncovered)
        ]
        next_seq += len(synthetic_units)

        # Extract rules via LLM — sequential to avoid flooding the API.
        fragments = extract_rules_for_synthetic_units(synthetic_units, doc)
        all_fragments.extend(fragments)

        # Re-compute coverage over the rules of ALL rounds so far; using only
        # this round's fragments would forget paths covered in earlier rounds.
        all_rules = existing_rules + [
            rule for f in all_fragments for rule in f.get("rules", [])
        ]
        _, uncovered, stats = compute_path_coverage(all_paths, all_rules)
        print(f"{round_n} 轮后覆盖: {stats['covered_paths']}/{stats['total_paths']} "
              f"({stats['coverage_pct']}%)")
        if stats["coverage_pct"] > best_stats["coverage_pct"]:
            best_stats = stats
        if stats["coverage_pct"] >= config.COVERAGE_TARGET * 100:
            print(f" 达到目标覆盖率 {config.COVERAGE_TARGET:.0%},停止")
            break

        # Stop early when none of the remaining uncovered paths carries any
        # decision node; otherwise the same paths are retried next round.
        uncovered_decision_nodes = {
            n.get("label", "")
            for p in uncovered
            for n in p.get("decision_nodes", [])
        }
        if not uncovered_decision_nodes:
            print(" 无更多可补全路径,停止")
            break
    return all_fragments, best_stats
# ---- Main ----
def main():
    """Run stage 2.5 end to end.

    Loads the input document and the existing IR fragments, enumerates the
    logic-tree paths and saves them (without their ``nodes`` entries) to
    ``config.PATH_ENUM_JSON``, runs auto-completion, saves the resulting
    fragments to ``config.IR_AUTOCOMPLETE_FRAGMENTS_JSON`` and prints a
    coverage summary.
    """
    banner = "=" * 60
    print(banner)
    print("阶段 2.5:分支覆盖自动补全")
    print(banner)

    # 1. Load inputs
    print("\n[1/5] 加载输入...")
    doc = config.load_input_document()
    existing_fragments = config.load_json(config.IR_FRAGMENTS_JSON)
    all_rules = [
        rule
        for fragment in existing_fragments
        for rule in fragment.get("rules", [])
    ]
    print(f" 已有规则: {len(all_rules)}")

    # 2. Enumerate paths
    print("\n[2/5] 枚举逻辑树路径...")
    all_paths = enumerate_all_paths(doc)
    total_paths = sum(len(paths) for paths in all_paths.values())
    print(f"{total_paths} 条路径")

    # Save path enumeration for downstream audit; the per-path "nodes"
    # entry is left out of the saved file.
    paths_without_nodes = {}
    for tree_key, paths in all_paths.items():
        paths_without_nodes[tree_key] = [
            {field: value for field, value in p.items() if field != "nodes"}
            for p in paths
        ]
    config.save_json(
        {"logic_tree_paths": paths_without_nodes, "total_paths": total_paths},
        config.PATH_ENUM_JSON,
    )

    # 3. Run auto-completion
    print("\n[3/5] 运行自动补全...")
    autocomplete_fragments, final_stats = run_autocomplete(all_paths, all_rules, doc)

    # 4. Save
    print("\n[4/5] 保存自动补全片段...")
    config.save_json(autocomplete_fragments, config.IR_AUTOCOMPLETE_FRAGMENTS_JSON)
    print(f" 输出: {config.IR_AUTOCOMPLETE_FRAGMENTS_JSON}")
    print(f" 生成 {len(autocomplete_fragments)} 个补全片段")

    # 5. Summary
    print("\n[5/5] 完成!")
    covered_count = final_stats["covered_paths"]
    path_count = final_stats["total_paths"]
    print(f" 最终路径覆盖: {covered_count}/{path_count} "
          f"({final_stats['coverage_pct']}%)")
    if final_stats["coverage_pct"] < config.COVERAGE_TARGET * 100:
        remaining = path_count - covered_count
        print(f" WARN: {remaining} 条路径仍未覆盖,将在审计报告中列出")
# Run the stage only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()