Files
2026-06-03 14:44:11 +08:00

1215 lines
42 KiB
Python

"""
Stage 3: Deterministic Merge, Consistency Check & Completeness Audit.
- Merges IR rule fragments (including autocomplete), deduplicating by trigger+actions.
- Reassigns stable hierarchical rule_ids.
- Runs consistency checks: naming uniformity, rule contradictions.
- Generates an audit report covering:
1. Path coverage (vs enumerated logic tree paths)
2. Table enumeration coverage
3. Global switch state coverage
4. Consistency scan report
5. Auto-complete summary
6. Final rule manifest
Outputs:
- ir_final.json
- ir_audit_report.md
"""
import hashlib
import json
import sys
from collections import defaultdict
from datetime import datetime
from pathlib import Path
import config
from step2_5_branch_coverage import compute_path_coverage, enumerate_all_paths
# Status tags stored in the "status" field of audit/consistency results and
# printed verbatim in the console output and the Markdown report.
PASS = "[PASS]"
WARN = "[WARN]"
FAIL = "[FAIL]"
# ---- Rule ID Generation ----
LABEL_MAP = {
"国内": "DOMESTIC",
"海外": "OVERSEAS",
"系统限制": "SYS",
"SDK限制": "SDK",
"SDK自定义限制": "SDK",
"其他应用": "OTHER",
"行车娱乐限制": "SYS",
"行车娱乐禁止": "SYS",
"前台打断": "FG-INTERRUPT",
"后台限制启动": "BG-BLOCK",
"后台禁止": "BG-BLOCK",
"后台暂停功能": "BG-PAUSE",
"后台允许": "BG-ALLOW",
"无限制": "NO-RESTRICT",
"开关关闭": "SWITCH-OFF",
"开关置灰": "SWITCH-GRAY",
"确认弹窗": "CONFIRM-DLG",
"风险确认弹窗": "CONFIRM-DLG",
}
def _path_to_rule_id_components(path: list[str]) -> tuple[str, str, str]:
"""Extract (scope, method, behavior) from a path array.
Falls back to "UNKNOWN" for unrecognized components.
"""
scope = "UNKNOWN"
method = "UNKNOWN"
behavior = "UNKNOWN"
for segment in path:
mapped = LABEL_MAP.get(segment)
if mapped in ("DOMESTIC", "OVERSEAS"):
scope = mapped
elif mapped in ("SYS", "SDK", "OTHER"):
method = mapped
elif mapped in ("FG-INTERRUPT", "BG-BLOCK", "BG-PAUSE",
"NO-RESTRICT", "SWITCH-OFF"):
behavior = mapped
return scope, method, behavior
# ---- Loading ----
def load_fragments() -> list[dict]:
    """Return the Stage 2 IR fragments stored at ``config.IR_FRAGMENTS_JSON``."""
    fragments_file = config.IR_FRAGMENTS_JSON
    return config.load_json(fragments_file)
def load_autocomplete_fragments() -> list[dict]:
    """Return the Stage 2.5 auto-complete fragments.

    An empty list is returned when the file named by
    ``config.IR_AUTOCOMPLETE_FRAGMENTS_JSON`` does not exist.
    """
    fragments_file = config.IR_AUTOCOMPLETE_FRAGMENTS_JSON
    if Path(fragments_file).exists():
        return config.load_json(fragments_file)
    return []
def load_semantic_index() -> dict:
    """Return the merged Stage 1 semantic index from ``config.SEMANTIC_INDEX_JSON``."""
    index_file = config.SEMANTIC_INDEX_JSON
    return config.load_json(index_file)
def load_path_enumeration() -> dict:
    """Return the ``logic_tree_paths`` mapping of the path enumeration file.

    Yields ``{}`` when ``config.PATH_ENUM_JSON`` does not exist or when the
    loaded document has no ``logic_tree_paths`` key.
    """
    enum_file = config.PATH_ENUM_JSON
    if Path(enum_file).exists():
        payload = config.load_json(enum_file)
        return payload.get("logic_tree_paths", {})
    return {}
# ---- Rule Merge ----
def rule_signature(rule: dict) -> str:
    """Return a dedup signature derived from path + trigger + actions.

    The signature is the first 16 hex digits of the SHA-256 of a canonical
    JSON rendering of the rule's path, trigger conditions, trigger event
    (when present) and actions.

    Conditions and actions are ordered by their own canonical JSON form, so
    the signature does not depend on the order in which they were emitted.
    The trigger event is part of the signature: event-based rules carry no
    conditions, and without it two rules reacting to different events on the
    same path with the same actions would be treated as duplicates.

    Args:
        rule: A rule dict; missing or ``None`` fields are treated as empty.

    Returns:
        A 16-character lowercase hex string.
    """
    trigger = rule.get("trigger") or {}

    def _canonical(item) -> str:
        # A total, type-safe ordering key (plain field values may be None).
        return json.dumps(item, ensure_ascii=False, sort_keys=True)

    sig_data = {
        "path": rule.get("path") or [],
        "conditions": sorted(trigger.get("conditions") or [], key=_canonical),
        "actions": sorted(rule.get("actions") or [], key=_canonical),
    }
    event = trigger.get("event")
    if event is not None:
        sig_data["event"] = event

    sig_json = json.dumps(sig_data, ensure_ascii=False, sort_keys=True)
    return hashlib.sha256(sig_json.encode()).hexdigest()[:16]
def _normalize_rule(rule: dict) -> dict:
"""Ensure a rule has all required fields with valid defaults.
Fixes common LLM output issues: missing trigger, null operator, etc.
"""
# Ensure precondition has required fields (defensive against LLM omission)
if "precondition" not in rule:
rule["precondition"] = {}
precond = rule["precondition"]
if precond is None:
rule["precondition"] = {}
precond = rule["precondition"]
if "geographic_scope" not in precond or not precond["geographic_scope"]:
precond["geographic_scope"] = "global"
if "screen_type" not in precond:
precond["screen_type"] = "any"
# Ensure trigger exists
if not rule.get("trigger"):
rule["trigger"] = {}
trigger = rule["trigger"]
# Ensure trigger-level combining operator (AND/OR) for multi-condition triggers
if not trigger.get("operator"):
trigger["operator"] = "AND"
# If trigger has an event, it's event-based (no conditions needed)
if trigger.get("event") is not None:
return rule
# Ensure conditions list exists
if "conditions" not in trigger:
trigger["conditions"] = []
# Fix null operators in individual conditions
for cond in trigger["conditions"]:
if not cond.get("operator"):
cond["operator"] = "=="
if not cond.get("signal"):
cond["signal"] = "unknown"
if "value" not in cond:
cond["value"] = "N/A"
# If still no conditions, add a default one
if not trigger["conditions"]:
trigger["conditions"] = [{
"signal": "system_state",
"operator": "==",
"value": "active"
}]
# Ensure table/text sources have a section field (defensive against LLM omission)
# Also normalize invalid source types (LLM hallucinations like function_unit_description)
sources = rule.get("sources", [])
valid_types = {"table", "text", "logic_tree"}
def _clean_section(val):
"""Normalize section value: list→first element, ensure string."""
if isinstance(val, list):
return str(val[0]).strip() if val else ""
if isinstance(val, str):
return val.strip()
return str(val).strip() if val else ""
# Normalize section fields that might be lists (LLM format instability)
for s in sources:
sec = s.get("section")
if sec is not None:
s["section"] = _clean_section(sec)
# try to infer a default section from the rule path
default_section = ""
for s in sources:
sec = s.get("section", "")
if sec and isinstance(sec, str) and sec.strip():
default_section = sec.strip()
break
if not default_section:
path = rule.get("path", "")
if path:
default_section = path.split(" > ")[0] if " > " in path else path
if sources:
for src in sources:
stype = src.get("type", "")
if stype and stype not in valid_types:
src["type"] = "text"
stype = "text"
if stype == "table":
if not src.get("section"):
src["section"] = default_section
if src.get("row") is None:
src["row"] = 0
elif stype == "text":
if not src.get("section"):
src["section"] = default_section
else:
# Empty sources list — add a minimal text source (defensive against schema failure)
src = {"type": "text", "text_snippet": "inferred from rule context"}
if default_section:
src["section"] = default_section
sources.append(src)
rule["sources"] = sources
return rule
def merge_rules(fragments: list[dict],
                autocomplete_fragments: list[dict] | None = None) -> list[dict]:
    """Merge the rules of all fragments, dropping duplicates.

    Two rules are duplicates when ``rule_signature`` returns the same value
    for both. The first occurrence is kept (order of first appearance is
    preserved) and later duplicates contribute:

    - any source not already listed,
    - their description, if it is longer,
    - their path, if the kept rule has none.

    The input fragments are not modified: each kept rule is a copy with its
    own ``sources`` list, so merged-in sources never leak back into the
    caller's data.

    Args:
        fragments: Main IR fragments, each with an optional ``rules`` list.
        autocomplete_fragments: Optional extra fragments appended after the
            main ones.

    Returns:
        The merged rule list. A summary is printed as a side effect.
    """
    all_fragments = list(fragments)
    if autocomplete_fragments:
        all_fragments.extend(autocomplete_fragments)

    # dicts keep insertion order, which gives "order of first appearance".
    merged_by_sig: dict[str, dict] = {}
    for fragment in all_fragments:
        for rule in fragment.get("rules") or []:
            sig = rule_signature(rule)
            existing = merged_by_sig.get(sig)
            if existing is None:
                kept = dict(rule)
                if isinstance(rule.get("sources"), list):
                    # Detach from the input so later appends don't mutate it.
                    kept["sources"] = list(rule["sources"])
                merged_by_sig[sig] = kept
                continue

            existing_sources = existing.get("sources")
            if existing_sources is None:
                existing_sources = existing["sources"] = []
            for src in rule.get("sources") or []:
                if src not in existing_sources:
                    existing_sources.append(src)

            if len(rule.get("description") or "") > len(
                existing.get("description") or ""
            ):
                existing["description"] = rule["description"]

            # Adopt the duplicate's path only when the kept rule has none.
            if rule.get("path") and not existing.get("path"):
                existing["path"] = rule["path"]

    merged = list(merged_by_sig.values())
    total_before = sum(len(f.get("rules") or []) for f in all_fragments)
    auto_before = sum(
        len(f.get("rules") or []) for f in (autocomplete_fragments or [])
    )
    print(f" 主片段规则: {total_before - auto_before}")
    if auto_before:
        print(f" 自动补全规则: {auto_before}")
    print(f" 合并后: {len(merged)} 条 (去重 {total_before - len(merged)} 条)")
    return merged
# ---- Rule ID Assignment ----
def assign_rule_ids(rules: list[dict], feature_id: str = "DRL-001") -> list[dict]:
    """Give every rule a hierarchical ``rule_id`` and return the same list.

    Format: {feature_id}-SCOPE-METHOD-BEHAVIOR-NN
    Example: DRL-001-DOMESTIC-SYS-FG-INTERRUPT-01

    SCOPE/METHOD/BEHAVIOR come from the rule path; whatever the path does
    not provide is inferred from the precondition (and, for the behavior,
    from keywords in the action descriptions). NN counts rules that share
    the same (scope, method, behavior) triple, starting at 01.
    """
    sequence: dict[tuple[str, str, str], int] = defaultdict(int)

    for rule in rules:
        scope, method, behavior = _path_to_rule_id_components(
            rule.get("path", [])
        )
        precond = rule.get("precondition", {})

        if scope == "UNKNOWN":
            scope = LABEL_MAP.get(precond.get("geographic_scope", ""), "DOMESTIC")

        if method == "UNKNOWN":
            method = LABEL_MAP.get(precond.get("app_type", ""), "SYS")

        if behavior == "UNKNOWN":
            if precond.get("switch", "") == "关闭":
                behavior = "SWITCH-OFF"
            else:
                # Fall back to keyword matching on the action descriptions.
                text = " ".join(
                    action.get("description", "")
                    for action in rule.get("actions", [])
                )
                if "打断" in text or "前台" in text:
                    behavior = "FG-INTERRUPT"
                elif "限制" in text and "启动" in text:
                    behavior = "BG-BLOCK"
                elif "暂停" in text:
                    behavior = "BG-PAUSE"
                else:
                    behavior = "NO-RESTRICT"

        triple = (scope, method, behavior)
        sequence[triple] += 1
        rule["rule_id"] = (
            f"{feature_id}-{scope}-{method}-{behavior}-{sequence[triple]:02d}"
        )

    return rules
# ---- Consistency Checks ----
def _check_naming_consistency(rules: list[dict]) -> list[dict]:
    """Check that precondition fields use the canonical terminology.

    For each of ``app_type``, ``app_state``, ``switch``,
    ``geographic_scope`` and ``screen_type`` the non-empty values seen
    across all rules are compared with the canonical set; any extra value
    yields one WARN item for that field. ``app_type`` values where one is a
    substring of another (e.g. "系统限制类" vs "系统限制") are additionally
    reported as suspected synonyms.

    Values are sorted before being reported, so the result is identical
    from run to run.

    Args:
        rules: Rules whose ``precondition`` dicts are inspected; a missing
            or ``None`` precondition is treated as empty.

    Returns:
        A list of result dicts (``field``, ``issue``, ``status`` and, for
        non-canonical values, ``expected``). When nothing is wrong, a single
        PASS item for field "all" is returned.
    """
    canonical_values = (
        ("app_type", {"系统限制", "SDK限制", "其他应用"}),
        ("app_state", {"前台", "后台"}),
        ("switch", {"开启", "关闭"}),
        ("geographic_scope", {"国内", "海外"}),
        ("screen_type", {"CSD", "PSD", "RFD", "any"}),
    )

    # Collect every non-empty value per field.
    seen: dict[str, set] = {field: set() for field, _ in canonical_values}
    for rule in rules:
        precond = rule.get("precondition") or {}
        for field, values in seen.items():
            value = precond.get(field)
            if value:
                values.add(value)

    results = []
    for field, allowed in canonical_values:
        unknown = seen[field] - allowed
        if unknown:
            results.append({
                "field": field,
                "issue": f"非标准值: {sorted(unknown)}",
                "expected": sorted(allowed),
                "status": WARN,
            })

    # Near-duplicates: one app_type contained in another. Iterating a sorted
    # list visits each unordered pair once, in a stable order.
    ordered_types = sorted(seen["app_type"])
    similar_pairs = [
        f"'{v1}' vs '{v2}'"
        for i, v1 in enumerate(ordered_types)
        for v2 in ordered_types[i + 1:]
        if v1 in v2 or v2 in v1
    ]
    if similar_pairs:
        results.append({
            "field": "app_type",
            "issue": f"疑似同义异名: {', '.join(similar_pairs)}",
            "status": WARN,
        })

    if not results:
        results.append({
            "field": "all",
            "issue": "所有字段术语统一",
            "status": PASS,
        })
    return results
def _trigger_overlaps(t1: dict, t2: dict) -> bool:
"""Check if two triggers have any overlapping signal conditions."""
conds1 = t1.get("conditions", [])
conds2 = t2.get("conditions", [])
signals1 = {c.get("signal") for c in conds1 if isinstance(c, dict)}
signals2 = {c.get("signal") for c in conds2 if isinstance(c, dict)}
return bool(signals1 & signals2)
def _actions_conflict(a1: list[dict], a2: list[dict]) -> bool:
"""Check if two action lists appear contradictory.
"Contradictory" means: both have user_interaction with different content,
or one does system interrupt while the other does nothing.
"""
descs1 = {a.get("description", "") for a in a1}
descs2 = {a.get("description", "") for a in a2}
# If one set is a subset of the other, no conflict — just less detail
if descs1.issubset(descs2) or descs2.issubset(descs1):
return False
# Check for contradictory user_interaction content
contents1 = {
a.get("content", "") for a in a1
if a.get("type") == "user_interaction"
}
contents2 = {
a.get("content", "") for a in a2
if a.get("type") == "user_interaction"
}
if contents1 and contents2 and contents1 != contents2:
return True
# Check if one has system actions and the other doesn't
has_sys1 = any(a.get("type") == "system" for a in a1)
has_sys2 = any(a.get("type") == "system" for a in a2)
if has_sys1 != has_sys2 and (contents1 or contents2):
return True
return False
def _precondition_overlaps(p1: dict, p2: dict) -> bool:
"""Check if two preconditions overlap significantly.
Two preconditions overlap if they share the same scope, switch state,
and either share app_type or one is unspecified.
"""
if p1.get("geographic_scope") != p2.get("geographic_scope"):
return False
if p1.get("switch") != p2.get("switch"):
return False
# App type overlap (empty = any)
at1 = p1.get("app_type", "")
at2 = p2.get("app_type", "")
if at1 and at2 and at1 != at2:
return False
# App state overlap
as1 = p1.get("app_state", "")
as2 = p2.get("app_state", "")
if as1 and as2 and as1 != as2:
return False
return True
def _detect_contradictions(rules: list[dict]) -> list[dict]:
    """Report rule pairs whose preconditions and triggers overlap but whose
    actions conflict.

    Every unordered pair of rules is examined once. A pair is reported when
    ``_precondition_overlaps``, ``_trigger_overlaps`` and
    ``_actions_conflict`` all hold for it.

    Each item contains ``rule_a``, ``rule_b``, ``conflict_point``,
    ``rule_a_path``, ``rule_b_path``, ``resolvable`` and ``recommendation``.
    ``resolvable`` is True only when both paths share exactly their first
    two segments; a longer shared prefix is left for manual review.
    """
    found = []
    for i, first in enumerate(rules):
        for j in range(i + 1, len(rules)):
            second = rules[j]
            first_id = first.get("rule_id", f"rule[{i}]")
            second_id = second.get("rule_id", f"rule[{j}]")

            p1 = first.get("precondition", {})
            p2 = second.get("precondition", {})
            if not _precondition_overlaps(p1, p2):
                continue
            if not _trigger_overlaps(
                first.get("trigger", {}), second.get("trigger", {})
            ):
                continue
            if not _actions_conflict(
                first.get("actions", []), second.get("actions", [])
            ):
                continue

            path1 = first.get("path", [])
            path2 = second.get("path", [])
            conflict_point = (
                f"相同前置状态 (scope={p1.get('geographic_scope')}, "
                f"app={p1.get('app_type', 'any')}, "
                f"state={p1.get('app_state', 'any')}) "
                f"但行为路径不同: {path1} vs {path2}"
            )

            # Length of the common leading run of both paths.
            shared = 0
            for left, right in zip(path1, path2):
                if left != right:
                    break
                shared += 1
            # Exactly two shared segments: the paths split right after the
            # first two levels, which is treated as a different scenario.
            # Three or more shared segments stay unresolved.
            resolvable = shared == 2

            if resolvable:
                recommendation = "路径前缀不同,可能为不同场景的正常分支"
            else:
                recommendation = "请人工确认是否为真正的矛盾,或合并规则"

            found.append({
                "rule_a": first_id,
                "rule_b": second_id,
                "conflict_point": conflict_point,
                "rule_a_path": path1,
                "rule_b_path": path2,
                "resolvable": resolvable,
                "recommendation": recommendation,
            })
    return found
def _auto_resolve_contradictions(
contradictions: list[dict], doc: dict
) -> tuple[list[dict], list[dict]]:
"""Attempt to auto-resolve contradictions using resolved_conflicts.
Returns (resolved, unresolved).
"""
if not contradictions:
return [], []
resolved_conflicts = doc.get("resolved_conflicts", [])
resolved = []
unresolved = []
for c in contradictions:
# Check if any resolved_conflict covers this
auto_fixed = False
for rc in resolved_conflicts:
rc_text = rc.get("correction", "") + rc.get("conflict_type", "")
# Simple heuristic: check if the conflict involves the sections mentioned
if any(
seg in rc_text
for seg in c.get("rule_a_path", []) + c.get("rule_b_path", [])
):
c["auto_resolved_by"] = rc.get("correction", "")
c["resolvable"] = True
resolved.append(c)
auto_fixed = True
break
if not auto_fixed:
unresolved.append(c)
return resolved, unresolved
# ---- Path Coverage Audit ----
def audit_path_coverage(
    doc: dict, rules: list[dict]
) -> tuple[list[dict], dict]:
    """Audit how many enumerated logic-tree paths are covered by rules.

    Paths come from ``enumerate_all_paths(doc)`` and are classified by
    ``compute_path_coverage``. One result item is produced per image id,
    graded PASS at >= 95 % coverage, WARN at >= 70 %, FAIL below.

    Returns:
        ``(results, stats)``. With no enumerated paths the results are empty
        and the stats report 100 % coverage of zero paths.
    """
    all_paths = enumerate_all_paths(doc)
    if not all_paths:
        empty_stats = {
            "total_paths": 0,
            "covered_paths": 0,
            "uncovered_paths": 0,
            "coverage_pct": 100.0,
        }
        return [], empty_stats

    covered, uncovered, stats = compute_path_coverage(all_paths, rules)

    def _of_image(paths: list[dict], wanted: str) -> list[dict]:
        return [p for p in paths if p.get("image_id") == wanted]

    results = []
    for image_id in all_paths:
        hit = _of_image(covered, image_id)
        missed = _of_image(uncovered, image_id)
        total = len(hit) + len(missed)
        pct = round(len(hit) / total * 100, 1) if total > 0 else 100.0

        if pct >= 95:
            status = PASS
        elif pct >= 70:
            status = WARN
        else:
            status = FAIL

        detail = f"{len(hit)}/{total} 路径被覆盖 ({pct}%)"
        if missed:
            # Show at most five uncovered paths inline.
            sample = [p.get("meaning", "?") for p in missed[:5]]
            detail += f"; 未覆盖路径示例: {sample}"
            if len(missed) > 5:
                detail += f" ... 还有 {len(missed) - 5}"

        results.append({
            "check": f"逻辑树 {image_id} 路径覆盖率",
            "status": status,
            "coverage_pct": pct,
            "detail": detail,
            "image_id": image_id,
            "uncovered_paths": missed,
        })

    return results, stats
# ---- Table Enumeration Audit ----
def find_table_enums(doc: dict) -> list[dict]:
    """Collect enumerated values from the tables of a document.

    Tables whose headers include both "功能" and "功能详细说明" yield one
    key/value item per row that has both columns. Any other table with
    headers yields a single item listing the texts of its first column
    (skipped when that column has no cells). Tables without headers and
    non-table blocks are ignored.
    """
    def _column(cells: list[dict], wanted: str):
        # First cell carrying the requested column name, or None.
        for cell in cells:
            if cell.get("name") == wanted:
                return cell
        return None

    found = []
    for section in doc.get("sections", []):
        for block in section.get("blocks", []):
            if block["type"] != "table":
                continue
            headers = block.get("headers", [])
            if not headers:
                continue

            rows = block.get("rows", [])
            if "功能" in headers and "功能详细说明" in headers:
                for row in rows:
                    cells = row.get("columns", [])
                    key_cell = _column(cells, "功能")
                    value_cell = _column(cells, "功能详细说明")
                    if not (key_cell and value_cell):
                        continue
                    found.append({
                        "section": section.get("source", ""),
                        "row": key_cell.get("row"),
                        "key": key_cell.get("text", ""),
                        "value": value_cell.get("text", ""),
                    })
                continue

            leading = headers[0]
            texts = [
                cell.get("text", "")
                for row in rows
                for cell in row.get("columns", [])
                if cell.get("name") == leading
            ]
            if texts:
                found.append({
                    "section": section.get("source", ""),
                    "column": leading,
                    "values": texts,
                })
    return found
def audit_table_enums(rules: list[dict], doc: dict) -> list[dict]:
    """Check that key enumerated values appear in the rules.

    Three checks are produced, each PASS when nothing is missing and WARN
    otherwise:

    1. every app type ("系统限制", "SDK限制", "其他应用") occurs as a
       precondition ``app_type``;
    2. both app states ("前台", "后台") occur as a precondition
       ``app_state``;
    3. the key trigger signals occur as a condition ``signal``.

    Sets are rendered with their members sorted, so the detail text is
    identical from run to run.

    Args:
        rules: Final rules to inspect.
        doc: Accepted for interface compatibility; not used by the checks.

    Returns:
        A list of three result dicts (``check``, ``status``, ``detail``).
    """
    def _fmt_set(values: set) -> str:
        # Same look as a set repr, but with a stable member order.
        if not values:
            return ""
        return "{" + ", ".join(sorted(repr(v) for v in values)) + "}"

    def _precondition_values(field: str) -> set:
        collected = set()
        for rule in rules:
            value = (rule.get("precondition") or {}).get(field, "")
            if value:
                collected.add(value)
        return collected

    def _enum_check(title: str, expected: set, found: set) -> dict:
        missing = expected - found
        detail = f"已覆盖: {_fmt_set(found)}"
        if missing:
            detail += f"; 未覆盖: {_fmt_set(missing)}"
        return {
            "check": title,
            "status": PASS if not missing else WARN,
            "detail": detail,
        }

    results = [
        _enum_check(
            "应用类型枚举覆盖",
            {"系统限制", "SDK限制", "其他应用"},
            _precondition_values("app_type"),
        ),
        _enum_check(
            "应用前后台状态覆盖",
            {"前台", "后台"},
            _precondition_values("app_state"),
        ),
    ]

    # Trigger signal coverage
    trigger_signals = set()
    for rule in rules:
        conditions = (rule.get("trigger") or {}).get("conditions") or []
        for cond in conditions:
            signal = cond.get("signal", "")
            if signal:
                trigger_signals.add(signal)
    key_signals = {"车速", "档位", "车速_持续时间", "应用请求启动"}
    missing_signals = key_signals - trigger_signals
    detail = f"已覆盖信号: {sorted(trigger_signals)}"
    if missing_signals:
        detail += f"; 未覆盖: {_fmt_set(missing_signals)}"
    results.append({
        "check": "触发信号覆盖(车速/档位/持续时间/启动请求)",
        "status": PASS if not missing_signals else WARN,
        "detail": detail,
    })
    return results
# ---- Switch Coverage Audit ----
def audit_switch_coverage(rules: list[dict]) -> list[dict]:
    """Verify that some rule exists for switch "开启" and for switch "关闭".

    Returns a single-item list; the item is FAIL as soon as one of the two
    switch states has no rule, PASS otherwise.
    """
    switch_values = [
        rule.get("precondition", {}).get("switch", "") for rule in rules
    ]

    # (state value, text when covered, text when missing)
    expectations = (
        ("开启", "开关=开启: 有规则覆盖", "开关=开启: 未找到规则"),
        ("关闭", "开关=关闭: 有规则覆盖", "开关=关闭: 未找到规则"),
    )

    status = PASS
    detail_parts = []
    for state, covered_text, missing_text in expectations:
        if any(value == state for value in switch_values):
            detail_parts.append(covered_text)
        else:
            detail_parts.append(missing_text)
            status = FAIL

    summary = {
        "check": "开关状态完整性(开启/关闭)",
        "status": status,
        "detail": "; ".join(detail_parts),
    }
    return [summary]
# ---- Audit Report Generation ----
def _md_cell(value) -> str:
    """Render *value* so that it stays inside one Markdown table cell.

    Line breaks become spaces and ``|`` is backslash-escaped; either one
    would otherwise end the cell or the row.
    """
    return " ".join(str(value).splitlines()).replace("|", "\\|")


def generate_audit_report(
    rules: list[dict],
    doc: dict,
    feature_name: str,
    path_results: list[dict],
    path_stats: dict,
    enum_results: list[dict],
    switch_results: list[dict],
    consistency_results: list[dict],
    contradictions: list[dict],
    unresolved_contradictions: list[dict],
    autocomplete_count: int,
    path_conflicts: list[dict] | None = None,
) -> str:
    """Build the Markdown text of ir_audit_report.md.

    Sections: (1) logic-tree path coverage, (2) table enumeration coverage,
    (3) global switch coverage, (4) consistency scan (terminology,
    contradictions, path conflicts), (5) auto-complete summary and
    (6) the rule manifest.

    Args:
        rules: Final rules, listed in the manifest.
        doc: Input document (accepted for interface compatibility; unused).
        feature_name: Shown in the report header.
        path_results: Per-image path coverage items.
        path_stats: Overall path coverage numbers.
        enum_results: Table enumeration check items.
        switch_results: Switch coverage check items.
        consistency_results: Terminology check items.
        contradictions: All detected contradictions; those carrying an
            ``auto_resolved_by`` key are reported as auto-resolved.
        unresolved_contradictions: Contradictions still needing review
            (only their number is used, for the header count).
        autocomplete_count: Number of auto-complete fragments merged.
        path_conflicts: Optional same-path/different-behavior items.

    Returns:
        The report as one newline-joined string.
    """
    lines = []
    lines.append("# IR 完整性审计报告")
    lines.append("")
    lines.append(f"**功能**: {feature_name}")
    lines.append(f"**规则总数**: {len(rules)}")
    lines.append(f"**生成时间**: {datetime.now().isoformat()}")
    lines.append("")

    # Human review notice
    flagged = sum(
        1
        for r in path_results + enum_results + switch_results + consistency_results
        if r["status"] in (WARN, FAIL)
    )
    issue_count = flagged + len(unresolved_contradictions) + len(path_conflicts or [])
    lines.append(
        f"> **重要**: 请人工审查以下标记项。"
        f"{issue_count} 项需要关注。"
    )
    lines.append('> 如无需修改,在对应项后标注 **"已确认"**。')
    lines.append("")

    # ---- Section 1: Path Coverage ----
    lines.append("## 1. 逻辑树路径覆盖率")
    lines.append("")
    lines.append(
        f"**总体**: {path_stats.get('covered_paths', 0)}/"
        f"{path_stats.get('total_paths', 0)} 路径已覆盖 "
        f"({path_stats.get('coverage_pct', 0)}%)"
    )
    lines.append("")
    lines.append("| 图片 ID | 覆盖率 | 状态 | 详情 |")
    lines.append("|---------|--------|------|------|")
    for r in path_results:
        lines.append(
            f"| {_md_cell(r['image_id'])} | {r['coverage_pct']}% "
            f"| {r['status']} | {_md_cell(r['detail'])} |"
        )
    lines.append("")

    # Uncovered path details (at most ten per image)
    for r in path_results:
        uncovered = r.get("uncovered_paths", [])
        if not uncovered:
            continue
        lines.append(f"### {r['image_id']} 未覆盖路径详情")
        lines.append("")
        for p in uncovered[:10]:
            meaning = p.get("meaning", "?")
            node_ids = p.get("node_ids", [])
            lines.append(
                f"- **路径**: {meaning} "
                f"(节点: {''.join(node_ids)})"
            )
        if len(uncovered) > 10:
            lines.append(f"- ... 还有 {len(uncovered) - 10} 条未覆盖路径")
        lines.append("")

    # ---- Section 2: Table Enumeration ----
    lines.append("## 2. 表格枚举覆盖")
    lines.append("")
    lines.append("| 检查项 | 状态 | 详情 |")
    lines.append("|--------|------|------|")
    for r in enum_results:
        lines.append(
            f"| {_md_cell(r['check'])} | {r['status']} | {_md_cell(r['detail'])} |"
        )
    lines.append("")

    # ---- Section 3: Switch Coverage ----
    lines.append("## 3. 全局开关状态覆盖")
    lines.append("")
    lines.append("| 检查项 | 状态 | 详情 |")
    lines.append("|--------|------|------|")
    for r in switch_results:
        lines.append(
            f"| {_md_cell(r['check'])} | {r['status']} | {_md_cell(r['detail'])} |"
        )
    lines.append("")

    # ---- Section 4: Consistency Scan ----
    lines.append("## 4. 一致性扫描报告")
    lines.append("")
    lines.append("### 4.1 术语统一性")
    lines.append("")
    lines.append("| 字段 | 状态 | 详情 |")
    lines.append("|------|------|------|")
    for r in consistency_results:
        expected = r.get("expected", [])
        expected_str = f" (期望: {expected})" if expected else ""
        lines.append(
            f"| {_md_cell(r['field'])} | {r['status']} "
            f"| {_md_cell(r['issue'] + expected_str)} |"
        )
    lines.append("")

    lines.append("### 4.2 规则矛盾检测")
    lines.append("")
    if contradictions:
        # Key presence, not truthiness: a contradiction resolved by an
        # arbitration entry with empty correction text is still resolved.
        auto_resolved = [c for c in contradictions if "auto_resolved_by" in c]
        remaining = [c for c in contradictions if "auto_resolved_by" not in c]
        if auto_resolved:
            lines.append(f"**自动解决**: {len(auto_resolved)} 项 (通过图文冲突仲裁)")
            lines.append("")
            for c in auto_resolved:
                lines.append(
                    f"- {c['rule_a']} vs {c['rule_b']}: "
                    f"已按仲裁 '**{c['auto_resolved_by']}**' 处理"
                )
            lines.append("")
        if remaining:
            lines.append(f"**需人工确认**: {len(remaining)}")
            lines.append("")
            for c in remaining:
                lines.append(f"### 矛盾: {c['rule_a']} vs {c['rule_b']}")
                lines.append(f"- **冲突点**: {c['conflict_point']}")
                lines.append(f"- **路径A**: {c.get('rule_a_path', [])}")
                lines.append(f"- **路径B**: {c.get('rule_b_path', [])}")
                lines.append(f"- **建议**: {c.get('recommendation', '请人工判断')}")
                lines.append('- [ ] 已确认 (标注 **"已确认"**)')
                lines.append("")
    else:
        lines.append("未检测到规则矛盾。")
        lines.append("")

    # ---- Section 4.3: Path Conflicts ----
    lines.append("### 4.3 路径冲突(同path不同行为)")
    lines.append("")
    if path_conflicts:
        for pc in path_conflicts:
            lines.append(f"- **Path**: {' > '.join(pc['path'])}")
            lines.append(f" - 规则: {', '.join(pc['rule_ids'])}")
            lines.append(f" - 不同行为数: {pc['distinct_behaviors']}")
            lines.append(f" - 建议: {pc['suggestion']}")
        lines.append("")
    else:
        lines.append("未检测到路径冲突。")
        lines.append("")

    # ---- Section 5: Auto-Complete Summary ----
    lines.append("## 5. 自动补全摘要")
    lines.append("")
    if autocomplete_count > 0:
        lines.append(f"- 自动补全片段数: {autocomplete_count}")
        lines.append(
            f"- 补全后路径覆盖率: "
            f"{path_stats.get('coverage_pct', 0)}%"
        )
        lines.append("- 自动生成的规则已合并到最终规则集中")
    else:
        lines.append("- 未执行自动补全(所有路径已被手动覆盖,或未运行 step2.5)")
    lines.append("")

    # ---- Section 6: Rule Manifest ----
    lines.append("## 6. 规则清单")
    lines.append("")
    lines.append("| rule_id | Priority | Path | 简述 |")
    lines.append("|---------|----------|------|------|")
    for rule in rules:
        rid = rule.get("rule_id", "?")
        pri = rule.get("priority", "?")
        path = rule.get("path") or []
        # A string path is shown as is; joining it would split it into
        # single characters.
        path_str = path if isinstance(path, str) else " > ".join(map(str, path))
        desc = (rule.get("description") or "")[:60]
        lines.append(
            f"| {_md_cell(rid)} | {_md_cell(pri)} "
            f"| {_md_cell(path_str)} | {_md_cell(desc)} |"
        )
    lines.append("")

    return "\n".join(lines)
def _extract_config_defaults(doc: dict, semantic_index: dict) -> dict:
"""Extract configuration defaults (e.g. switch default states) from document.
Scans table text for patterns like "默认开启"/"默认关闭" and checks
semantic_index concepts for "默认" keywords.
"""
defaults = {}
# Scan document tables for default config values
for section in doc.get("sections", []):
for block in section.get("blocks", []):
if block["type"] != "table":
continue
for row in block.get("rows", []):
row_texts = []
for col in row.get("columns", []):
row_texts.append(f"{col.get('name','')}: {col.get('text','')}")
combined = " ".join(row_texts)
if "行车娱乐限制开关" in combined or "开关" in combined:
if "默认开启" in combined or "默认状态:开启" in combined:
defaults["行车娱乐限制开关"] = {
"default": "开启",
"section": section.get("source", "").split()[0]
if section.get("source") else "",
}
elif "默认关闭" in combined or "默认状态:关闭" in combined:
defaults["行车娱乐限制开关"] = {
"default": "关闭",
"section": section.get("source", "").split()[0]
if section.get("source") else "",
}
# Supplement from semantic_index concepts
for concept in semantic_index.get("concepts", []):
name = concept.get("name", "")
if "默认" in name:
if "开启" in name:
defaults.setdefault("行车娱乐限制开关", {})["default"] = "开启"
elif "关闭" in name:
defaults.setdefault("行车娱乐限制开关", {})["default"] = "关闭"
return defaults
def _detect_path_conflicts(rules: list[dict]) -> list[dict]:
"""Detect rules that share the same path triplet but have different behaviors.
Returns list of conflict items for the audit report.
"""
from collections import defaultdict
path_groups = defaultdict(list)
for rule in rules:
path_key = tuple(rule.get("path", []))
path_groups[path_key].append(rule)
conflicts = []
for path_key, group in path_groups.items():
if len(group) <= 1:
continue
# Check if rules in the same path have different trigger/action signatures
signatures = set()
for r in group:
trigger = r.get("trigger", {})
actions = tuple(
a.get("description", "") for a in r.get("actions", [])
)
sig = (
tuple(sorted(
(c.get("signal",""), c.get("operator",""), str(c.get("value","")))
for c in trigger.get("conditions", [])
)),
actions,
)
signatures.add(sig)
if len(signatures) > 1:
# Same path, different behaviors → potential organization issue
conflicts.append({
"status": "WARN",
"type": "path_collision",
"path": list(path_key),
"rule_ids": [r["rule_id"] for r in group],
"count": len(group),
"distinct_behaviors": len(signatures),
"suggestion": "多条规则共享相同path但行为不同,考虑拆分path或使用更细粒度的叶子路径",
})
return conflicts
# ---- Main ----
def main():
    """Run stage 3 end to end.

    Steps: load inputs, merge and normalize rules, assign rule ids, run the
    consistency checks, build the audit report, extract config defaults and
    write ``config.IR_FINAL_JSON`` and ``config.IR_AUDIT_REPORT_MD``.
    Exits with status 1 when there is no fragment to merge.
    """
    print("=" * 60)
    print("阶段三:确定性合并、一致性校验与完整性审计")
    print("=" * 60)

    # 1. Load inputs
    print("\n[1/7] 加载输入...")
    fragments = load_fragments()
    autocomplete_fragments = load_autocomplete_fragments()
    doc = config.load_input_document()
    semantic_index = load_semantic_index()
    # NOTE(review): loaded but not used by any later step in this function.
    path_enum = load_path_enumeration()
    total_fragments = len(fragments)
    if total_fragments == 0 and not autocomplete_fragments:
        print("错误: 无 IR 片段可合并 (fragments 和 autocomplete_fragments 均为空)。")
        print(" 请检查 step2_ir_extraction 是否正确运行。")
        print(" 可能原因: step1 未生成 function_units,或 step2 提取失败。")
        sys.exit(1)
    feature_name = semantic_index.get("feature_name", "行车娱乐限制")
    feature_id = "DRL-001"
    print(f" 功能: {feature_name} ({feature_id})")
    print(f" 主片段: {total_fragments}")
    if autocomplete_fragments:
        print(f" 自动补全片段: {len(autocomplete_fragments)}")

    # 2. Merge rules
    print("\n[2/7] 合并去重...")
    merged_rules = merge_rules(fragments, autocomplete_fragments)
    # 2.5 Normalize rules (fix missing triggers, null operators)
    merged_rules = [_normalize_rule(r) for r in merged_rules]
    print(f" 标准化: {len(merged_rules)} 条规则")

    # 3. Reassign rule IDs
    print("\n[3/7] 重分配 rule_id (层次化格式)...")
    final_rules = assign_rule_ids(merged_rules, feature_id)
    print(f" 已分配 {len(final_rules)} 个稳定 ID")
    # Show ID examples
    if final_rules:
        sample_ids = [r["rule_id"] for r in final_rules[:3]]
        print(f" 示例: {sample_ids}")

    # 4. Consistency checks
    print("\n[4/7] 一致性扫描...")
    consistency_results = _check_naming_consistency(final_rules)
    n_warns = sum(1 for r in consistency_results if r["status"] == WARN)
    if n_warns:
        print(f" {WARN} {n_warns} 个术语不一致问题")
    else:
        print(f" {PASS} 术语统一")
    contradictions = _detect_contradictions(final_rules)
    resolved, unresolved = _auto_resolve_contradictions(contradictions, doc)
    if resolved:
        print(f" {PASS} 自动解决 {len(resolved)} 个矛盾")
    if unresolved:
        print(f" {WARN} {len(unresolved)} 个矛盾需要人工确认")
        for c in unresolved:
            print(f" - {c['rule_a']} vs {c['rule_b']}: {c['conflict_point'][:80]}")
    if not contradictions:
        print(f" {PASS} 未检测到规则矛盾")
    path_conflicts = _detect_path_conflicts(final_rules)
    if path_conflicts:
        print(f" {WARN} {len(path_conflicts)} 个 path 冲突(同path不同行为)")
    else:
        print(f" {PASS} 无 path 冲突")

    # 5. Generate audit report
    print("\n[5/7] 生成审计报告...")
    path_results, path_stats = audit_path_coverage(doc, final_rules)
    enum_results = audit_table_enums(final_rules, doc)
    switch_results = audit_switch_coverage(final_rules)
    report = generate_audit_report(
        final_rules, doc, feature_name,
        path_results, path_stats,
        enum_results, switch_results,
        consistency_results,
        contradictions, unresolved,
        len(autocomplete_fragments),
        path_conflicts,
    )

    # 6. Extract config defaults from document
    # The progress output previously jumped from [5/7] to [7/7].
    print("\n[6/7] 提取配置默认值...")
    config_defaults = _extract_config_defaults(doc, semantic_index)
    if config_defaults:
        print(f" 配置默认值: {list(config_defaults.keys())}")

    # 7. Save outputs
    print("\n[7/7] 保存输出...")
    ir_final = {
        "feature": feature_name,
        "feature_id": feature_id,
        "rules": final_rules,
    }
    if config_defaults:
        ir_final["config_defaults"] = config_defaults
    config.save_json(ir_final, config.IR_FINAL_JSON)
    print(f" IR: {config.IR_FINAL_JSON}")
    with open(config.IR_AUDIT_REPORT_MD, "w", encoding="utf-8") as f:
        f.write(report)
    print(f" 审计报告: {config.IR_AUDIT_REPORT_MD}")

    # Summary
    print("\n完成!")
    issue_count = (
        sum(1 for r in path_results + enum_results + switch_results
            if r["status"] in (WARN, FAIL))
        + n_warns
        + len(unresolved)
        + len(path_conflicts)
    )
    print(f" 规则: {len(final_rules)}")
    print(f" 路径覆盖: {path_stats.get('coverage_pct', 0)}%")
    print(f" 审计问题: {issue_count} 个需要关注")
    if issue_count > 0:
        print(f"\n 请查看 {config.IR_AUDIT_REPORT_MD} 并审查标记项。")


if __name__ == "__main__":
    main()