fec4c09ee0
CI / test (push) Successful in 8s
doc_parser_skill: - New: verify_flowchart.py (flowchart validation) - Updated: LLM.py (multi-provider: DeepSeek + DashScope) - Updated: image_parser.py (logic tree support, external prompts) - Updated: SKILL.md, prompts/image_prompt.md conflict_detection_skill: - Updated: LLM.py (multi-provider sync) - Updated: detect_conflicts.py (logic tree text conversion) ir_generation_skill: - Replaced old scripts/LLM.py + ir_generator.py with standalone project - New: main.py, config.py, step1-3_*.py, ensemble_merge.py - New: prompts/, tests/ subdirectories tests: - New: acceptance/ test suite with schema validation - Fixed: conftest no longer globally skips non-acceptance tests - Updated: test_sample.py for new ir_generation structure Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
400 lines
12 KiB
Python
400 lines
12 KiB
Python
"""
Stage 2.5: Branch Coverage Auto-Completion.

1. Enumerates all root-to-leaf paths in every logic tree
2. Compares paths against existing IR rules to find uncovered paths
3. Generates synthetic function_units for uncovered paths
4. Calls the LLM (via step2's extract_rules_for_unit) to produce rules for
   the synthetic units
5. Iterates up to MAX_RETRIES_PER_STAGE rounds to reach COVERAGE_TARGET

Outputs:
- output/path_enumeration.json
- output/ir_autocomplete_fragments.json
"""
|
|
|
|
import concurrent.futures
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import config
|
|
|
|
|
|
# ---- Path Enumeration (delegates to step1_semantic_index; imported lazily) ----
|
|
|
|
|
|
def enumerate_all_paths(doc: dict) -> dict[str, list[dict]]:
    """Return the root-to-leaf paths of every logic tree found in ``doc``.

    This is a thin wrapper: the enumeration itself is performed by
    ``step1_semantic_index.enumerate_all_paths``, which is imported inside the
    function body rather than at module load time.
    """
    from step1_semantic_index import enumerate_all_paths as _step1_enumerate

    paths_by_tree = _step1_enumerate(doc)
    return paths_by_tree
|
|
|
|
|
|
# ---- Coverage Analysis ----
|
|
|
|
|
|
def find_referenced_path_ids(rules: list[dict]) -> dict[str, set[str]]:
    """Map each rule to the set of logic tree nodes it references.

    Only sources whose ``type`` is ``"logic_tree"`` contribute. A rule with no
    ``rule_id`` is filed under ``"?"``. Rules that share an id have their
    references merged, so a later rule never silently erases what an earlier
    rule with the same id referenced.

    Args:
        rules: IR rule dicts; each may carry a ``sources`` list.

    Returns:
        {rule_id: set of "image_id:node_id" pairs}. A rule without any
        logic-tree source maps to an empty set.
    """
    result: dict[str, set[str]] = {}
    for rule in rules:
        # setdefault keeps references already collected for this id.
        refs = result.setdefault(rule.get("rule_id", "?"), set())
        # `or []` tolerates an explicit None for sources / node_ids.
        for src in rule.get("sources") or []:
            if src.get("type") != "logic_tree":
                continue
            image_id = src.get("image_id", "")
            refs.update(f"{image_id}:{nid}" for nid in src.get("node_ids") or [])
    return result
|
|
|
|
|
|
def compute_path_coverage(
    all_paths: dict[str, list[dict]], rules: list[dict]
) -> tuple[list[dict], list[dict], dict]:
    """Compute coverage of enumerated paths by existing rules.

    A path is "covered" if at least one single rule's logic-tree ``node_ids``
    for the path's image form a superset of the path's decision+action node
    ids. A path with no decision/action nodes is trivially covered.

    Args:
        all_paths: {image_id: [path dict, ...]}; each path has a ``nodes`` list
            whose items carry ``id`` and ``type``.
        rules: IR rule dicts; only their ``sources`` of type ``"logic_tree"``
            are consulted.

    Returns:
        (covered_paths, uncovered_paths, stats) where ``stats`` holds
        ``total_paths``, ``covered_paths``, ``uncovered_paths`` and
        ``coverage_pct`` (rounded to one decimal; 100.0 when there are no
        paths).
    """
    # One entry per rule, NOT per rule_id: rules that share an id (or have
    # none) must each be evaluated. Keying by id would let a later rule erase
    # an earlier one, and merging them would fake coverage no rule provides.
    rule_node_sets: list[dict[str, set]] = []
    for rule in rules:
        per_image: dict[str, set] = {}
        for src in rule.get("sources") or []:
            if src.get("type") == "logic_tree":
                per_image.setdefault(src.get("image_id", ""), set()).update(
                    src.get("node_ids") or []
                )
        if per_image:
            rule_node_sets.append(per_image)

    covered: list[dict] = []
    uncovered: list[dict] = []

    for image_id, paths in all_paths.items():
        for path in paths:
            # Only decision and action nodes need to be referenced by a rule.
            checkable = {
                n["id"] for n in path["nodes"] if n["type"] in ("decision", "action")
            }
            is_covered = not checkable or any(
                checkable.issubset(img_sets.get(image_id, ()))
                for img_sets in rule_node_sets
            )
            (covered if is_covered else uncovered).append(path)

    total = len(covered) + len(uncovered)
    stats = {
        "total_paths": total,
        "covered_paths": len(covered),
        "uncovered_paths": len(uncovered),
        "coverage_pct": round(len(covered) / total * 100, 1) if total > 0 else 100.0,
    }
    return covered, uncovered, stats
|
|
|
|
|
|
# ---- Synthetic Function Unit Generation ----
|
|
|
|
|
|
def generate_synthetic_unit(path: dict, unit_seq: int) -> dict:
    """Create a synthetic function_unit from an uncovered logic tree path.

    The unit's ``path`` labels are assembled, in order, from the inferred
    scope, switch state, app type, app state and the label of the last entry
    in ``path["action_nodes"]`` (truncated to 20 characters). Labels that
    cannot be inferred are omitted rather than added as empty strings.

    Args:
        path: An enumerated path; must contain ``image_id`` and ``nodes``, and
            may contain ``meaning``, ``action_nodes`` and ``node_ids``.
        unit_seq: Sequence number embedded, zero-padded to three digits, in
            the generated ``unit_id``.

    Returns:
        A function_unit dict flagged with ``auto_generated: True`` whose single
        source points at the path's logic tree nodes.
    """
    switch = _infer_switch_state(path)
    app_type = _infer_app_type(path)
    app_state = _infer_app_state(path)
    scope = _infer_scope(path)

    # Treat a missing or None meaning as empty so the slice below cannot fail.
    meaning = path.get("meaning") or ""

    description = f"自动补全: {meaning}"
    if switch:
        description = f"开关{switch}, {description}"

    candidate_labels = [
        scope,
        f"开关{switch}" if switch else "",
        app_type,
        app_state,
    ]
    path_labels = [label for label in candidate_labels if label]

    # Append the behaviour of the terminal action, if it has a usable label.
    action_nodes = path.get("action_nodes") or []
    if action_nodes:
        last_action = (action_nodes[-1].get("label") or "")[:20]
        if last_action:
            path_labels.append(last_action)

    return {
        "unit_id": f"FU-AUTO-{path['image_id']}-{unit_seq:03d}",
        "name": f"自动补全-{meaning[:60]}",
        "description": description,
        "path": path_labels,
        "auto_generated": True,
        "sources": [
            {
                "section": "",
                "type": "logic_tree",
                "image_id": path["image_id"],
                "logic_tree_nodes": path.get("node_ids", []),
            }
        ],
    }
|
|
|
|
|
|
def _infer_switch_state(path: dict) -> str:
|
|
"""Infer switch state from decision nodes in path."""
|
|
for n in path["nodes"]:
|
|
label = n.get("label", "")
|
|
branch = n.get("branch_taken", "")
|
|
if "开关" in label and n["type"] == "decision":
|
|
if branch == "开启":
|
|
return "开启"
|
|
elif branch == "关闭":
|
|
return "关闭"
|
|
return ""
|
|
|
|
|
|
def _infer_app_type(path: dict) -> str:
|
|
"""Infer app type from state nodes in path."""
|
|
type_map = {
|
|
"其他应用": "其他应用",
|
|
"SDK限制": "SDK限制",
|
|
"通过接入SDK限制的应用": "SDK限制",
|
|
"系统限制": "系统限制",
|
|
"通过系统限制应用": "系统限制",
|
|
}
|
|
for n in path["nodes"]:
|
|
if n["type"] == "state":
|
|
for key, val in type_map.items():
|
|
if key in n.get("label", ""):
|
|
return val
|
|
return ""
|
|
|
|
|
|
def _infer_app_state(path: dict) -> str:
|
|
"""Infer app state (前台/后台) from decision nodes."""
|
|
for n in path["nodes"]:
|
|
label = n.get("label", "")
|
|
branch = n.get("branch_taken", "")
|
|
if "前台" in label:
|
|
if branch == "是":
|
|
return "前台"
|
|
elif branch == "否":
|
|
return "后台"
|
|
return ""
|
|
|
|
|
|
def _infer_scope(path: dict) -> str:
|
|
"""Infer geographic scope. Defaults to 国内."""
|
|
return "国内"
|
|
|
|
|
|
# ---- LLM Extraction for Synthetic Units ----
|
|
|
|
|
|
def extract_rules_for_synthetic_units(
    synthetic_units: list[dict], doc: dict, max_retries: int | None = None
) -> list[dict]:
    """Extract IR rules for synthetic function_units using step2's LLM logic.

    Units are processed one at a time. A unit whose extraction raises is
    reported on stdout and recorded with an empty rule list, so one failing
    unit does not abort the rest of the batch.

    Args:
        synthetic_units: Units as produced by ``generate_synthetic_unit``.
        doc: The parsed input document, passed through to step2's helpers.
        max_retries: Retry budget forwarded to ``extract_rules_for_unit``;
            defaults to ``config.MAX_RETRIES_PER_STAGE`` when ``None``.

    Returns:
        One fragment dict per unit with ``unit_id``, ``unit_name``, ``rules``
        and ``auto_generated: True``.
    """
    from step2_ir_extraction import (
        build_document_lookup,
        extract_context_package,
        extract_rules_for_unit,
    )

    if max_retries is None:
        max_retries = config.MAX_RETRIES_PER_STAGE

    sections_by_source, image_by_rid, conflicts_by_section = build_document_lookup(doc)

    fragments = []
    for unit in synthetic_units:
        pkg = extract_context_package(
            unit, doc, sections_by_source, image_by_rid, conflicts_by_section
        )
        # Enrich pkg with the unit's own path and description. The description
        # is overridden only when the unit has one, so a package without a
        # "unit_description" key is never read needlessly.
        pkg["unit_path"] = unit.get("path", [])
        if "description" in unit:
            pkg["unit_description"] = unit["description"]

        try:
            rules = extract_rules_for_unit(pkg, max_retries)
        except Exception as exc:
            # Best effort: keep going, but make the failure visible instead of
            # letting it look like "the LLM found no rules".
            print(
                f" WARN: {unit['unit_id']} 规则提取失败 "
                f"({type(exc).__name__}: {exc})"
            )
            rules = []

        fragments.append({
            "unit_id": unit["unit_id"],
            "unit_name": unit.get("name", ""),
            "rules": rules,
            "auto_generated": True,
        })
        print(f" {unit['unit_id']}: {len(rules)} 条规则")

    return fragments
|
|
|
|
|
|
# ---- Iterative Auto-Completion ----
|
|
|
|
|
|
def run_autocomplete(
    all_paths: dict[str, list[dict]],
    existing_rules: list[dict],
    doc: dict,
) -> tuple[list[dict], dict]:
    """Run iterative auto-completion. Returns (autocomplete_fragments, final_stats).

    Each round generates one synthetic unit per still-uncovered path, asks the
    LLM for rules, and re-computes coverage against the existing rules plus
    every rule generated so far. Rounds stop when nothing is uncovered, when
    ``config.COVERAGE_TARGET`` is reached, when no uncovered path has any
    decision nodes, or after ``config.MAX_RETRIES_PER_STAGE`` rounds.

    Args:
        all_paths: {image_id: [path, ...]} as returned by ``enumerate_all_paths``.
        existing_rules: Rules already extracted by the previous stage.
        doc: The parsed input document.

    Returns:
        The fragments produced across all rounds, and the best coverage stats
        observed (the initial stats if no round improved on them).
    """
    print("\n 初始路径覆盖率分析...")
    _covered, uncovered, stats = compute_path_coverage(all_paths, existing_rules)
    print(f" 覆盖: {stats['covered_paths']}/{stats['total_paths']} "
          f"({stats['coverage_pct']}%)")

    if not uncovered:
        print(" 所有路径已覆盖,无需自动补全")
        return [], stats

    print(f" 未覆盖路径: {len(uncovered)} 条")

    all_fragments: list[dict] = []
    generated_rules: list[dict] = []  # rules from every round so far
    best_stats = stats
    # Running counter: deriving the start from len(uncovered) would reuse
    # sequence numbers once the number of uncovered paths shrinks.
    next_seq = 1

    for round_n in range(1, config.MAX_RETRIES_PER_STAGE + 1):
        if not uncovered:
            break

        print(f"\n--- 自动补全 第 {round_n} 轮 ---")
        print(f" 为 {len(uncovered)} 条未覆盖路径生成合成单元...")

        # Generate synthetic units
        synthetic_units = [
            generate_synthetic_unit(path, next_seq + offset)
            for offset, path in enumerate(uncovered)
        ]
        next_seq += len(synthetic_units)

        # Extract rules via LLM (sequentially, to avoid flooding the API)
        fragments = extract_rules_for_synthetic_units(synthetic_units, doc)
        all_fragments.extend(fragments)
        generated_rules.extend(
            rule for fragment in fragments for rule in fragment.get("rules", [])
        )

        # Re-compute coverage with the rules of ALL rounds; using only this
        # round's rules would forget paths covered in earlier rounds.
        _covered, uncovered, stats = compute_path_coverage(
            all_paths, existing_rules + generated_rules
        )
        print(f" 第 {round_n} 轮后覆盖: {stats['covered_paths']}/{stats['total_paths']} "
              f"({stats['coverage_pct']}%)")

        if stats["coverage_pct"] > best_stats["coverage_pct"]:
            best_stats = stats

        if stats["coverage_pct"] >= config.COVERAGE_TARGET * 100:
            print(f" 达到目标覆盖率 {config.COVERAGE_TARGET:.0%},停止")
            break

        # Stop when none of the remaining uncovered paths has decision nodes.
        if not any(p.get("decision_nodes") for p in uncovered):
            print(" 无更多可补全路径,停止")
            break

    return all_fragments, best_stats
|
|
|
|
|
|
# ---- Main ----
|
|
|
|
|
|
def main():
    """Run stage 2.5 end to end.

    Loads the input document and the existing IR fragments, enumerates the
    logic tree paths, writes the path enumeration (without per-path ``nodes``)
    for downstream audit, runs auto-completion, saves the resulting fragments
    and prints a coverage summary.
    """
    banner = "=" * 60
    print(banner)
    print("阶段 2.5:分支覆盖自动补全")
    print(banner)

    # 1. Load inputs
    print("\n[1/5] 加载输入...")
    doc = config.load_input_document()
    fragments = config.load_json(config.IR_FRAGMENTS_JSON)
    all_rules = [rule for fragment in fragments for rule in fragment.get("rules", [])]
    print(f" 已有规则: {len(all_rules)} 条")

    # 2. Enumerate paths
    print("\n[2/5] 枚举逻辑树路径...")
    all_paths = enumerate_all_paths(doc)
    total_paths = sum(len(paths) for paths in all_paths.values())
    print(f" 共 {total_paths} 条路径")

    # Save path enumeration for downstream audit; the bulky "nodes" entry of
    # each path is left out of the saved file.
    slim_paths = {}
    for tree_key, paths in all_paths.items():
        slim_paths[tree_key] = [
            {field: value for field, value in path.items() if field != "nodes"}
            for path in paths
        ]
    path_enum_data = {
        "logic_tree_paths": slim_paths,
        "total_paths": total_paths,
    }
    config.save_json(path_enum_data, config.PATH_ENUM_JSON)

    # 3. Run auto-completion
    print("\n[3/5] 运行自动补全...")
    autocomplete_fragments, final_stats = run_autocomplete(all_paths, all_rules, doc)

    # 4. Save
    print("\n[4/5] 保存自动补全片段...")
    config.save_json(autocomplete_fragments, config.IR_AUTOCOMPLETE_FRAGMENTS_JSON)
    print(f" 输出: {config.IR_AUTOCOMPLETE_FRAGMENTS_JSON}")
    print(f" 生成 {len(autocomplete_fragments)} 个补全片段")

    # 5. Summary
    print("\n[5/5] 完成!")
    print(f" 最终路径覆盖: {final_stats['covered_paths']}/{final_stats['total_paths']} "
          f"({final_stats['coverage_pct']}%)")

    if final_stats["coverage_pct"] < config.COVERAGE_TARGET * 100:
        remaining = final_stats["total_paths"] - final_stats["covered_paths"]
        print(f" WARN: {remaining} 条路径仍未覆盖,将在审计报告中列出")
|
|
|
|
|
|
# Script entry point: run the stage only when this file is executed directly,
# not when it is imported by another module.
if __name__ == "__main__":
    main()
|