# Source snapshot: commit da17b3b3b2 -- CI / test (pull_request) successful in 9s.
# Commit message:
#   - step3 rule_signature: defend against trigger.conditions=None with `or []`
#   - step1 _quick_validate: when total_rows=0 the row coverage is set to 100% instead of 0%
#   - test_step1: added TestHasSectionContent (10 tests) + TestQuickValidateEmptySections (2 tests)
#   - test_step3: added TestRuleSignature (7 tests) + TestNormalizeRule (4 tests)
#   Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
# File viewer metadata: 1150 lines, 39 KiB, Python.
"""
|
|
Stage 3: Deterministic Merge, Consistency Check & Completeness Audit.
|
|
|
|
- Merges IR rule fragments (including autocomplete), deduplicating by trigger+actions.
|
|
- Reassigns stable hierarchical rule_ids.
|
|
- Runs consistency checks: naming uniformity, rule contradictions.
|
|
- Generates an audit report covering:
|
|
1. Path coverage (vs enumerated logic tree paths)
|
|
2. Table enumeration coverage
|
|
3. Global switch state coverage
|
|
4. Consistency scan report
|
|
5. Auto-complete summary
|
|
6. Final rule manifest
|
|
|
|
Outputs:
|
|
- ir_final.json
|
|
- ir_audit_report.md
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import config
|
|
from step2_5_branch_coverage import compute_path_coverage, enumerate_all_paths
|
|
|
|
# Status tags attached to every audit result and echoed in console output.
PASS, WARN, FAIL = "[PASS]", "[WARN]", "[FAIL]"
|
|
|
|
# ---- Rule ID Generation ----
|
|
|
|
# Document labels grouped by the rule-id code they translate to.  The groups
# are listed in the order the labels should appear in LABEL_MAP ("SYS" shows
# up twice on purpose so the insertion order of the mapping is preserved).
_LABEL_GROUPS = (
    ("DOMESTIC", ("国内",)),
    ("OVERSEAS", ("海外",)),
    ("SYS", ("系统限制",)),
    ("SDK", ("SDK限制", "SDK自定义限制")),
    ("OTHER", ("其他应用",)),
    ("SYS", ("行车娱乐限制", "行车娱乐禁止")),
    ("FG-INTERRUPT", ("前台打断",)),
    ("BG-BLOCK", ("后台限制启动", "后台禁止")),
    ("BG-PAUSE", ("后台暂停功能",)),
    ("BG-ALLOW", ("后台允许",)),
    ("NO-RESTRICT", ("无限制",)),
    ("SWITCH-OFF", ("开关关闭",)),
    ("SWITCH-GRAY", ("开关置灰",)),
    ("CONFIRM-DLG", ("确认弹窗", "风险确认弹窗")),
)

# Flat lookup: document label -> rule-id component code.
LABEL_MAP = {
    label: code
    for code, labels in _LABEL_GROUPS
    for label in labels
}
|
|
|
|
|
|
def _path_to_rule_id_components(path: list[str]) -> tuple[str, str, str]:
    """Derive the (scope, method, behavior) rule-id parts from a path.

    Each path segment is translated through ``LABEL_MAP``; the resulting code
    is stored in whichever slot it belongs to.  When several segments map to
    the same slot the last one wins.  Slots that no segment fills stay
    ``"UNKNOWN"``.
    """
    scope_codes = ("DOMESTIC", "OVERSEAS")
    method_codes = ("SYS", "SDK", "OTHER")
    # NOTE(review): LABEL_MAP also produces BG-ALLOW, SWITCH-GRAY and
    # CONFIRM-DLG, none of which is recognised as a behavior here -- confirm
    # that leaving those paths at "UNKNOWN" is intended.
    behavior_codes = (
        "FG-INTERRUPT", "BG-BLOCK", "BG-PAUSE", "NO-RESTRICT", "SWITCH-OFF",
    )

    scope = method = behavior = "UNKNOWN"
    for code in map(LABEL_MAP.get, path):
        if code in scope_codes:
            scope = code
        elif code in method_codes:
            method = code
        elif code in behavior_codes:
            behavior = code
    return scope, method, behavior
|
|
|
|
|
|
# ---- Loading ----
|
|
|
|
def load_fragments() -> list[dict]:
    """Return the Stage 2 IR fragments read from ``config.IR_FRAGMENTS_JSON``."""
    fragments_file = config.IR_FRAGMENTS_JSON
    return config.load_json(fragments_file)
|
|
|
|
|
|
def load_autocomplete_fragments() -> list[dict]:
    """Return the Stage 2.5 auto-complete fragments.

    An empty list is returned when the file named by
    ``config.IR_AUTOCOMPLETE_FRAGMENTS_JSON`` does not exist.
    """
    fragments_file = config.IR_AUTOCOMPLETE_FRAGMENTS_JSON
    if Path(fragments_file).exists():
        return config.load_json(fragments_file)
    return []
|
|
|
|
|
|
def load_semantic_index() -> dict:
    """Return the merged Stage 1 semantic index (``config.SEMANTIC_INDEX_JSON``)."""
    index_file = config.SEMANTIC_INDEX_JSON
    return config.load_json(index_file)
|
|
|
|
|
|
def load_path_enumeration() -> dict:
    """Return the ``logic_tree_paths`` entry of the path enumeration file.

    Yields ``{}`` when ``config.PATH_ENUM_JSON`` does not exist or when the
    loaded document has no ``logic_tree_paths`` key.
    """
    enum_file = config.PATH_ENUM_JSON
    if Path(enum_file).exists():
        return config.load_json(enum_file).get("logic_tree_paths", {})
    return {}
|
|
|
|
|
|
# ---- Rule Merge ----
|
|
|
|
def rule_signature(rule: dict) -> str:
    """Return a 16-hex-digit dedup signature for *rule*.

    The signature is the truncated SHA-256 of a canonical JSON document built
    from the rule's ``path``, its trigger ``conditions`` and its ``actions``.
    Conditions and actions are put into a canonical order first, so two rules
    that list the same conditions/actions in a different order hash equally.

    Missing or ``None`` values for ``path``, ``trigger``, ``conditions`` and
    ``actions`` are all treated as empty.

    Args:
        rule: IR rule dictionary.

    Returns:
        The first 16 hex characters of the SHA-256 digest.

    Raises:
        TypeError: if the rule contains values that ``json.dumps`` cannot
            serialize.
    """

    def _canonical_order(items: list, field: str) -> list:
        # Primary key is the named field (signal / description), matching the
        # historical ordering.  The full canonical JSON breaks ties so that
        # entries sharing that field no longer keep their input order, and it
        # also gives None / non-dict entries a well-defined position instead
        # of raising during comparison.
        def sort_key(item):
            primary = item.get(field) if isinstance(item, dict) else None
            canonical = json.dumps(item, ensure_ascii=False, sort_keys=True)
            return ("" if primary is None else str(primary), canonical)

        return sorted(items, key=sort_key)

    trigger = rule.get("trigger") or {}
    sig_data = {
        "path": rule.get("path") or [],
        "conditions": _canonical_order(trigger.get("conditions") or [], "signal"),
        "actions": _canonical_order(rule.get("actions") or [], "description"),
    }
    sig_json = json.dumps(sig_data, ensure_ascii=False, sort_keys=True)
    return hashlib.sha256(sig_json.encode()).hexdigest()[:16]
|
|
|
|
|
|
def _normalize_rule(rule: dict) -> dict:
|
|
"""Ensure a rule has all required fields with valid defaults.
|
|
|
|
Fixes common LLM output issues: missing trigger, null operator, etc.
|
|
"""
|
|
# Ensure trigger exists
|
|
if not rule.get("trigger"):
|
|
rule["trigger"] = {}
|
|
|
|
trigger = rule["trigger"]
|
|
|
|
# Ensure trigger-level combining operator (AND/OR) for multi-condition triggers
|
|
if not trigger.get("operator"):
|
|
trigger["operator"] = "AND"
|
|
|
|
# If trigger has an event, it's event-based (no conditions needed)
|
|
if trigger.get("event") is not None:
|
|
return rule
|
|
|
|
# Ensure conditions list exists
|
|
if "conditions" not in trigger:
|
|
trigger["conditions"] = []
|
|
|
|
# Fix null operators in individual conditions
|
|
for cond in trigger["conditions"]:
|
|
if not cond.get("operator"):
|
|
cond["operator"] = "=="
|
|
if not cond.get("signal"):
|
|
cond["signal"] = "unknown"
|
|
if "value" not in cond:
|
|
cond["value"] = "N/A"
|
|
|
|
# If still no conditions, add a default one
|
|
if not trigger["conditions"]:
|
|
trigger["conditions"] = [{
|
|
"signal": "system_state",
|
|
"operator": "==",
|
|
"value": "active"
|
|
}]
|
|
|
|
return rule
|
|
|
|
|
|
def merge_rules(fragments: list[dict],
                autocomplete_fragments: list[dict] | None = None) -> list[dict]:
    """Merge the rules of all fragments, dropping duplicates.

    Two rules are duplicates when ``rule_signature`` returns the same value
    for both (path + trigger conditions + actions).  The first occurrence is
    kept (as a shallow copy); later duplicates only contribute

    * any ``sources`` entries not yet recorded, and
    * their ``description`` when it is longer than the one kept so far.

    The input fragments are not modified.  A short summary is printed.

    Args:
        fragments: main IR fragments, each with an optional ``rules`` list.
        autocomplete_fragments: optional auto-complete fragments merged after
            the main ones.

    Returns:
        The de-duplicated rules in first-seen order.
    """
    all_fragments = list(fragments)
    if autocomplete_fragments:
        all_fragments.extend(autocomplete_fragments)

    # dict preserves insertion order, so this doubles as the output order.
    by_signature: dict[str, dict] = {}

    for fragment in all_fragments:
        for rule in fragment.get("rules") or []:
            sig = rule_signature(rule)
            kept = by_signature.get(sig)

            if kept is None:
                kept = dict(rule)
                # The copy above is shallow: give the merged rule its own
                # sources list so later merges do not mutate the input rule.
                if isinstance(rule.get("sources"), list):
                    kept["sources"] = list(rule["sources"])
                by_signature[sig] = kept
                continue

            if not isinstance(kept.get("sources"), list):
                kept["sources"] = []
            kept_sources = kept["sources"]
            for src in rule.get("sources") or []:
                if src not in kept_sources:
                    kept_sources.append(src)

            candidate = rule.get("description") or ""
            if len(candidate) > len(kept.get("description") or ""):
                kept["description"] = candidate
            # No path merge is needed: the path is part of the signature, so
            # duplicates always carry the same path as the kept rule.

    merged = list(by_signature.values())

    total_before = sum(len(f.get("rules") or []) for f in all_fragments)
    auto_before = sum(
        len(f.get("rules") or []) for f in (autocomplete_fragments or [])
    )
    print(f"  主片段规则: {total_before - auto_before} 条")
    if auto_before:
        print(f"  自动补全规则: {auto_before} 条")
    print(f"  合并后: {len(merged)} 条 (去重 {total_before - len(merged)} 条)")
    return merged
|
|
|
|
|
|
# ---- Rule ID Assignment ----
|
|
|
|
def assign_rule_ids(rules: list[dict], feature_id: str = "DRL-001") -> list[dict]:
    """Assign hierarchical ``rule_id`` values to *rules* in place.

    Format: ``{feature_id}-SCOPE-METHOD-BEHAVIOR-NN``
    Example: ``DRL-001-DOMESTIC-SYS-FG-INTERRUPT-01``

    The three components come from the rule's ``path``.  Components the path
    does not provide are inferred from the ``precondition`` (and, for the
    behavior, from the action descriptions), with these defaults:
    scope ``DOMESTIC``, method ``SYS``, behavior ``NO-RESTRICT``.
    ``NN`` is a 1-based counter per (scope, method, behavior) triple, in the
    order the rules are given.

    Args:
        rules: rules to label; each dict gets a ``rule_id`` key.
        feature_id: prefix used for every generated id.

    Returns:
        The same *rules* list.
    """
    counters: dict[tuple[str, str, str], int] = defaultdict(int)

    for rule in rules:
        scope, method, behavior = _path_to_rule_id_components(
            rule.get("path") or []
        )
        precond = rule.get("precondition") or {}

        if scope == "UNKNOWN":
            # Only accept a scope code here; any other label (or none) falls
            # back to the default instead of leaking into the scope slot.
            mapped = LABEL_MAP.get(precond.get("geographic_scope", ""))
            scope = mapped if mapped in ("DOMESTIC", "OVERSEAS") else "DOMESTIC"

        if method == "UNKNOWN":
            mapped = LABEL_MAP.get(precond.get("app_type", ""))
            method = mapped if mapped in ("SYS", "SDK", "OTHER") else "SYS"

        if behavior == "UNKNOWN":
            if precond.get("switch", "") == "关闭":
                behavior = "SWITCH-OFF"
            else:
                # Keyword heuristics over the concatenated action descriptions.
                action_descs = " ".join(
                    str(action.get("description") or "")
                    for action in rule.get("actions") or []
                    if isinstance(action, dict)
                )
                if "打断" in action_descs or "前台" in action_descs:
                    behavior = "FG-INTERRUPT"
                elif "限制" in action_descs and "启动" in action_descs:
                    behavior = "BG-BLOCK"
                elif "暂停" in action_descs:
                    behavior = "BG-PAUSE"
                else:
                    behavior = "NO-RESTRICT"

        key = (scope, method, behavior)
        counters[key] += 1
        rule["rule_id"] = (
            f"{feature_id}-{scope}-{method}-{behavior}-{counters[key]:02d}"
        )

    return rules
|
|
|
|
|
|
# ---- Consistency Checks ----
|
|
|
|
def _check_naming_consistency(rules: list[dict]) -> list[dict]:
    """Report precondition values that deviate from the canonical vocabulary.

    For each checked precondition field, every non-empty value used by any
    rule is compared with the field's canonical value set; unknown values
    yield one WARN item per field.  In addition, ``app_type`` values where one
    is a substring of another are reported as suspected synonyms.  When
    nothing is found a single PASS item is returned.

    Returns:
        List of result dicts with ``field``, ``issue``, ``status`` and, for
        unknown-value items, ``expected``.
    """
    # (field name, canonical values) in reporting order.
    vocabulary = (
        ("app_type", {"系统限制", "SDK限制", "其他应用"}),
        ("app_state", {"前台", "后台"}),
        ("switch", {"开启", "关闭"}),
        ("geographic_scope", {"国内", "海外"}),
        ("screen_type", {"CSD", "PSD", "RFD", "any"}),
    )

    # Gather the distinct non-empty values seen per field.
    observed = {field: set() for field, _ in vocabulary}
    for rule in rules:
        precond = rule.get("precondition", {})
        for field, seen in observed.items():
            if precond.get(field):
                seen.add(precond[field])

    results = []
    for field, canonical in vocabulary:
        unknown = observed[field] - canonical
        if unknown:
            results.append({
                "field": field,
                "issue": f"非标准值: {sorted(unknown)}",
                "expected": sorted(canonical),
                "status": WARN,
            })

    # Near-duplicates such as "系统限制类" vs "系统限制": one value contains
    # the other.  `v1 < v2` reports each unordered pair only once.
    app_types = observed["app_type"]
    similar_pairs = [
        f"'{v1}' vs '{v2}'"
        for v1 in app_types
        for v2 in app_types
        if v1 < v2 and (v1 in v2 or v2 in v1)
    ]
    if similar_pairs:
        results.append({
            "field": "app_type",
            "issue": f"疑似同义异名: {', '.join(similar_pairs)}",
            "status": WARN,
        })

    if results:
        return results
    return [{
        "field": "all",
        "issue": "所有字段术语统一",
        "status": PASS,
    }]
|
|
|
|
|
|
def _trigger_overlaps(t1: dict, t2: dict) -> bool:
|
|
"""Check if two triggers have any overlapping signal conditions."""
|
|
conds1 = t1.get("conditions", [])
|
|
conds2 = t2.get("conditions", [])
|
|
signals1 = {c.get("signal") for c in conds1 if isinstance(c, dict)}
|
|
signals2 = {c.get("signal") for c in conds2 if isinstance(c, dict)}
|
|
return bool(signals1 & signals2)
|
|
|
|
|
|
def _actions_conflict(a1: list[dict], a2: list[dict]) -> bool:
|
|
"""Check if two action lists appear contradictory.
|
|
|
|
"Contradictory" means: both have user_interaction with different content,
|
|
or one does system interrupt while the other does nothing.
|
|
"""
|
|
descs1 = {a.get("description", "") for a in a1}
|
|
descs2 = {a.get("description", "") for a in a2}
|
|
|
|
# If one set is a subset of the other, no conflict — just less detail
|
|
if descs1.issubset(descs2) or descs2.issubset(descs1):
|
|
return False
|
|
|
|
# Check for contradictory user_interaction content
|
|
contents1 = {
|
|
a.get("content", "") for a in a1
|
|
if a.get("type") == "user_interaction"
|
|
}
|
|
contents2 = {
|
|
a.get("content", "") for a in a2
|
|
if a.get("type") == "user_interaction"
|
|
}
|
|
if contents1 and contents2 and contents1 != contents2:
|
|
return True
|
|
|
|
# Check if one has system actions and the other doesn't
|
|
has_sys1 = any(a.get("type") == "system" for a in a1)
|
|
has_sys2 = any(a.get("type") == "system" for a in a2)
|
|
if has_sys1 != has_sys2 and (contents1 or contents2):
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def _precondition_overlaps(p1: dict, p2: dict) -> bool:
|
|
"""Check if two preconditions overlap significantly.
|
|
|
|
Two preconditions overlap if they share the same scope, switch state,
|
|
and either share app_type or one is unspecified.
|
|
"""
|
|
if p1.get("geographic_scope") != p2.get("geographic_scope"):
|
|
return False
|
|
if p1.get("switch") != p2.get("switch"):
|
|
return False
|
|
# App type overlap (empty = any)
|
|
at1 = p1.get("app_type", "")
|
|
at2 = p2.get("app_type", "")
|
|
if at1 and at2 and at1 != at2:
|
|
return False
|
|
# App state overlap
|
|
as1 = p1.get("app_state", "")
|
|
as2 = p2.get("app_state", "")
|
|
if as1 and as2 and as1 != as2:
|
|
return False
|
|
return True
|
|
|
|
|
|
def _detect_contradictions(rules: list[dict]) -> list[dict]:
    """Collect rule pairs that apply to the same situation but act differently.

    A pair is reported when its preconditions overlap, its triggers share a
    signal and its actions conflict (see ``_precondition_overlaps``,
    ``_trigger_overlaps`` and ``_actions_conflict``).

    Returns:
        One dict per pair with ``rule_a``, ``rule_b``, ``conflict_point``,
        ``rule_a_path``, ``rule_b_path``, ``resolvable`` and
        ``recommendation``.  Rules without a ``rule_id`` are referred to as
        ``rule[<index>]``.
    """
    found = []
    rule_count = len(rules)

    for i, first in enumerate(rules):
        for j in range(i + 1, rule_count):
            second = rules[j]
            pre_a = first.get("precondition", {})
            pre_b = second.get("precondition", {})

            # All three gates must hold, checked cheapest-first as before.
            is_contradiction = (
                _precondition_overlaps(pre_a, pre_b)
                and _trigger_overlaps(
                    first.get("trigger", {}), second.get("trigger", {})
                )
                and _actions_conflict(
                    first.get("actions", []), second.get("actions", [])
                )
            )
            if not is_contradiction:
                continue

            path_a = first.get("path", [])
            path_b = second.get("path", [])
            conflict_point = (
                f"相同前置状态 (scope={pre_a.get('geographic_scope')}, "
                f"app={pre_a.get('app_type', 'any')}, "
                f"state={pre_a.get('app_state', 'any')}) "
                f"但行为路径不同: {path_a} vs {path_b}"
            )

            # A pair counts as resolvable only when both paths exist and agree
            # on exactly their first two segments.  A longer common prefix is
            # left for human review because it may be a real conflict.
            resolvable = False
            if path_a and path_b:
                shared = 0
                for seg_a, seg_b in zip(path_a, path_b):
                    if seg_a != seg_b:
                        break
                    shared += 1
                resolvable = shared == 2

            if resolvable:
                recommendation = "路径前缀不同,可能为不同场景的正常分支"
            else:
                recommendation = "请人工确认是否为真正的矛盾,或合并规则"

            found.append({
                "rule_a": first.get("rule_id", f"rule[{i}]"),
                "rule_b": second.get("rule_id", f"rule[{j}]"),
                "conflict_point": conflict_point,
                "rule_a_path": path_a,
                "rule_b_path": path_b,
                "resolvable": resolvable,
                "recommendation": recommendation,
            })

    return found
|
|
|
|
|
|
def _auto_resolve_contradictions(
|
|
contradictions: list[dict], doc: dict
|
|
) -> tuple[list[dict], list[dict]]:
|
|
"""Attempt to auto-resolve contradictions using resolved_conflicts.
|
|
|
|
Returns (resolved, unresolved).
|
|
"""
|
|
if not contradictions:
|
|
return [], []
|
|
|
|
resolved_conflicts = doc.get("resolved_conflicts", [])
|
|
resolved = []
|
|
unresolved = []
|
|
|
|
for c in contradictions:
|
|
# Check if any resolved_conflict covers this
|
|
auto_fixed = False
|
|
for rc in resolved_conflicts:
|
|
rc_text = rc.get("correction", "") + rc.get("conflict_type", "")
|
|
# Simple heuristic: check if the conflict involves the sections mentioned
|
|
if any(
|
|
seg in rc_text
|
|
for seg in c.get("rule_a_path", []) + c.get("rule_b_path", [])
|
|
):
|
|
c["auto_resolved_by"] = rc.get("correction", "")
|
|
c["resolvable"] = True
|
|
resolved.append(c)
|
|
auto_fixed = True
|
|
break
|
|
|
|
if not auto_fixed:
|
|
unresolved.append(c)
|
|
|
|
return resolved, unresolved
|
|
|
|
|
|
# ---- Path Coverage Audit ----
|
|
|
|
def audit_path_coverage(
    doc: dict, rules: list[dict]
) -> tuple[list[dict], dict]:
    """Audit how many enumerated logic-tree paths are covered by *rules*.

    Paths come from ``enumerate_all_paths(doc)`` and coverage from
    ``compute_path_coverage`` (both imported from step 2.5).  One result item
    is produced per image id: PASS at >= 95 % coverage, WARN at >= 70 %,
    otherwise FAIL.  Up to five uncovered path meanings are quoted in the
    detail text.

    Returns:
        ``(results, stats)`` where ``stats`` is whatever
        ``compute_path_coverage`` reports, or an all-zero / 100 % stats dict
        when no paths were enumerated.
    """
    all_paths = enumerate_all_paths(doc)
    if not all_paths:
        empty_stats = {
            "total_paths": 0,
            "covered_paths": 0,
            "uncovered_paths": 0,
            "coverage_pct": 100.0,
        }
        return [], empty_stats

    covered, uncovered, stats = compute_path_coverage(all_paths, rules)

    def _of_image(paths, wanted_id):
        return [p for p in paths if p.get("image_id") == wanted_id]

    results = []
    for image_id, _paths in all_paths.items():
        hit = _of_image(covered, image_id)
        missed = _of_image(uncovered, image_id)
        total = len(hit) + len(missed)

        # An image without any classified path counts as fully covered.
        pct = round(len(hit) / total * 100, 1) if total > 0 else 100.0

        if pct >= 95:
            status = PASS
        elif pct >= 70:
            status = WARN
        else:
            status = FAIL

        detail = f"{len(hit)}/{total} 路径被覆盖 ({pct}%)"
        if missed:
            sample = [p.get("meaning", "?") for p in missed[:5]]
            detail += f"; 未覆盖路径示例: {sample}"
            overflow = len(missed) - 5
            if overflow > 0:
                detail += f" ... 还有 {overflow} 条"

        results.append({
            "check": f"逻辑树 {image_id} 路径覆盖率",
            "status": status,
            "coverage_pct": pct,
            "detail": detail,
            "image_id": image_id,
            "uncovered_paths": missed,
        })

    return results, stats
|
|
|
|
|
|
# ---- Table Enumeration Audit ----
|
|
|
|
def find_table_enums(doc: dict) -> list[dict]:
    """Collect enumerated values from the tables of *doc*.

    Tables without headers are skipped.  Two table shapes are handled:

    * tables with both a "功能" and a "功能详细说明" header yield one
      ``{section, row, key, value}`` item per row that has both columns;
    * any other table yields a single ``{section, column, values}`` item
      holding the texts of its first header's column (nothing when that
      column has no cells).
    """

    def _cell(columns, wanted_name):
        return next((c for c in columns if c.get("name") == wanted_name), None)

    found = []
    for section in doc.get("sections", []):
        section_label = section.get("source", "")

        for block in section.get("blocks", []):
            if block["type"] != "table":
                continue
            headers = block.get("headers", [])
            if not headers:
                continue
            rows = block.get("rows", [])

            if "功能" in headers and "功能详细说明" in headers:
                for row in rows:
                    columns = row.get("columns", [])
                    key_cell = _cell(columns, "功能")
                    value_cell = _cell(columns, "功能详细说明")
                    if key_cell and value_cell:
                        found.append({
                            "section": section_label,
                            "row": key_cell.get("row"),
                            "key": key_cell.get("text", ""),
                            "value": value_cell.get("text", ""),
                        })
                continue

            leading_header = headers[0]
            texts = [
                cell.get("text", "")
                for row in rows
                for cell in row.get("columns", [])
                if cell.get("name") == leading_header
            ]
            if texts:
                found.append({
                    "section": section_label,
                    "column": leading_header,
                    "values": texts,
                })

    return found
|
|
|
|
|
|
def audit_table_enums(rules: list[dict], doc: dict) -> list[dict]:
    """Check that key enumerated values are used by at least one rule.

    Three checks are produced, each PASS when nothing is missing and WARN
    otherwise:

    1. app types ("系统限制", "SDK限制", "其他应用") in ``precondition.app_type``;
    2. app states ("前台", "后台") in ``precondition.app_state``;
    3. key trigger signals in ``trigger.conditions[*].signal``.

    Covered and missing values are listed in sorted order so the report text
    is identical from run to run.

    Args:
        rules: final rule list.
        doc: input document; accepted for interface compatibility, not read.

    Returns:
        Three result dicts with ``check``, ``status`` and ``detail``.
    """

    def _ordered(values) -> list:
        # key=str keeps sorting well-defined even for unexpected value types.
        return sorted(values, key=str)

    def _result(check: str, expected: set, found: set,
                prefix: str, when_empty) -> dict:
        missing = expected - found
        detail = f"{prefix}: {_ordered(found) or when_empty}"
        if missing:
            detail += f"; 未覆盖: {_ordered(missing)}"
        return {
            "check": check,
            "status": PASS if not missing else WARN,
            "detail": detail,
        }

    preconditions = [rule.get("precondition") or {} for rule in rules]
    found_app_types = {p["app_type"] for p in preconditions if p.get("app_type")}
    found_states = {p["app_state"] for p in preconditions if p.get("app_state")}
    trigger_signals = {
        cond["signal"]
        for rule in rules
        for cond in (rule.get("trigger") or {}).get("conditions") or []
        if isinstance(cond, dict) and cond.get("signal")
    }

    return [
        _result(
            "应用类型枚举覆盖",
            {"系统限制", "SDK限制", "其他应用"},
            found_app_types, "已覆盖", "无",
        ),
        _result(
            "应用前后台状态覆盖",
            {"前台", "后台"},
            found_states, "已覆盖", "无",
        ),
        _result(
            "触发信号覆盖(车速/档位/持续时间/启动请求)",
            {"车速", "档位", "车速_持续时间", "应用请求启动"},
            trigger_signals, "已覆盖信号", [],
        ),
    ]
|
|
|
|
|
|
# ---- Switch Coverage Audit ----
|
|
|
|
def audit_switch_coverage(rules: list[dict]) -> list[dict]:
    """Verify that both switch states ("开启" and "关闭") have rules.

    Returns a single result item whose status is PASS only when at least one
    rule has ``precondition.switch == "开启"`` and at least one has
    ``"关闭"``; otherwise FAIL.  The detail text lists both states.
    """
    switch_values = [
        rule.get("precondition", {}).get("switch", "") for rule in rules
    ]

    status = PASS
    detail_parts = []
    for state in ("开启", "关闭"):
        if any(value == state for value in switch_values):
            detail_parts.append(f"开关={state}: 有规则覆盖")
        else:
            detail_parts.append(f"开关={state}: 未找到规则")
            status = FAIL

    return [{
        "check": "开关状态完整性(开启/关闭)",
        "status": status,
        "detail": "; ".join(detail_parts),
    }]
|
|
|
|
|
|
# ---- Audit Report Generation ----
|
|
|
|
def generate_audit_report(
|
|
rules: list[dict],
|
|
doc: dict,
|
|
feature_name: str,
|
|
path_results: list[dict],
|
|
path_stats: dict,
|
|
enum_results: list[dict],
|
|
switch_results: list[dict],
|
|
consistency_results: list[dict],
|
|
contradictions: list[dict],
|
|
unresolved_contradictions: list[dict],
|
|
autocomplete_count: int,
|
|
path_conflicts: list[dict] | None = None,
|
|
) -> str:
|
|
"""Generate ir_audit_report.md with all audit sections."""
|
|
lines = []
|
|
lines.append("# IR 完整性审计报告")
|
|
lines.append("")
|
|
lines.append(f"**功能**: {feature_name}")
|
|
lines.append(f"**规则总数**: {len(rules)}")
|
|
lines.append(f"**生成时间**: {datetime.now().isoformat()}")
|
|
lines.append("")
|
|
|
|
# Human review notice
|
|
issue_count = sum(
|
|
1 for r in path_results + enum_results + switch_results + consistency_results
|
|
if r["status"] in (WARN, FAIL)
|
|
) + len(unresolved_contradictions) + len(path_conflicts or [])
|
|
|
|
lines.append(
|
|
f"> **重要**: 请人工审查以下标记项。"
|
|
f"共 {issue_count} 项需要关注。"
|
|
)
|
|
lines.append(
|
|
f'> 如无需修改,在对应项后标注 **"已确认"**。'
|
|
)
|
|
lines.append("")
|
|
|
|
# ---- Section 1: Path Coverage ----
|
|
lines.append("## 1. 逻辑树路径覆盖率")
|
|
lines.append("")
|
|
lines.append(
|
|
f"**总体**: {path_stats.get('covered_paths', 0)}/"
|
|
f"{path_stats.get('total_paths', 0)} 路径已覆盖 "
|
|
f"({path_stats.get('coverage_pct', 0)}%)"
|
|
)
|
|
lines.append("")
|
|
lines.append("| 图片 ID | 覆盖率 | 状态 | 详情 |")
|
|
lines.append("|---------|--------|------|------|")
|
|
for r in path_results:
|
|
lines.append(
|
|
f"| {r['image_id']} | {r['coverage_pct']}% "
|
|
f"| {r['status']} | {r['detail']} |"
|
|
)
|
|
lines.append("")
|
|
|
|
# Uncovered path details
|
|
for r in path_results:
|
|
uncovered = r.get("uncovered_paths", [])
|
|
if uncovered:
|
|
lines.append(f"### {r['image_id']} 未覆盖路径详情")
|
|
lines.append("")
|
|
for p in uncovered[:10]:
|
|
meaning = p.get("meaning", "?")
|
|
node_ids = p.get("node_ids", [])
|
|
lines.append(
|
|
f"- **路径**: {meaning} "
|
|
f"(节点: {' → '.join(node_ids)})"
|
|
)
|
|
if len(uncovered) > 10:
|
|
lines.append(f"- ... 还有 {len(uncovered) - 10} 条未覆盖路径")
|
|
lines.append("")
|
|
|
|
# ---- Section 2: Table Enumeration ----
|
|
lines.append("## 2. 表格枚举覆盖")
|
|
lines.append("")
|
|
lines.append("| 检查项 | 状态 | 详情 |")
|
|
lines.append("|--------|------|------|")
|
|
for r in enum_results:
|
|
lines.append(f"| {r['check']} | {r['status']} | {r['detail']} |")
|
|
lines.append("")
|
|
|
|
# ---- Section 3: Switch Coverage ----
|
|
lines.append("## 3. 全局开关状态覆盖")
|
|
lines.append("")
|
|
lines.append("| 检查项 | 状态 | 详情 |")
|
|
lines.append("|--------|------|------|")
|
|
for r in switch_results:
|
|
lines.append(f"| {r['check']} | {r['status']} | {r['detail']} |")
|
|
lines.append("")
|
|
|
|
# ---- Section 4: Consistency Scan ----
|
|
lines.append("## 4. 一致性扫描报告")
|
|
lines.append("")
|
|
|
|
lines.append("### 4.1 术语统一性")
|
|
lines.append("")
|
|
lines.append("| 字段 | 状态 | 详情 |")
|
|
lines.append("|------|------|------|")
|
|
for r in consistency_results:
|
|
expected = r.get("expected", [])
|
|
expected_str = f" (期望: {expected})" if expected else ""
|
|
lines.append(
|
|
f"| {r['field']} | {r['status']} | {r['issue']}{expected_str} |"
|
|
)
|
|
lines.append("")
|
|
|
|
lines.append("### 4.2 规则矛盾检测")
|
|
lines.append("")
|
|
if contradictions:
|
|
auto_resolved = [c for c in contradictions if c.get("auto_resolved_by")]
|
|
remaining = [c for c in contradictions if not c.get("auto_resolved_by")]
|
|
|
|
if auto_resolved:
|
|
lines.append(f"**自动解决**: {len(auto_resolved)} 项 (通过图文冲突仲裁)")
|
|
lines.append("")
|
|
for c in auto_resolved:
|
|
lines.append(
|
|
f"- {c['rule_a']} vs {c['rule_b']}: "
|
|
f"已按仲裁 '**{c['auto_resolved_by']}**' 处理"
|
|
)
|
|
lines.append("")
|
|
|
|
if remaining:
|
|
lines.append(f"**需人工确认**: {len(remaining)} 项")
|
|
lines.append("")
|
|
for c in remaining:
|
|
lines.append(f"### 矛盾: {c['rule_a']} vs {c['rule_b']}")
|
|
lines.append(f"- **冲突点**: {c['conflict_point']}")
|
|
lines.append(f"- **路径A**: {c.get('rule_a_path', [])}")
|
|
lines.append(f"- **路径B**: {c.get('rule_b_path', [])}")
|
|
lines.append(f"- **建议**: {c.get('recommendation', '请人工判断')}")
|
|
lines.append(f'- [ ] 已确认 (标注 **"已确认"**)')
|
|
lines.append("")
|
|
else:
|
|
lines.append("未检测到规则矛盾。")
|
|
lines.append("")
|
|
|
|
# ---- Section 4.3: Path Conflicts ----
|
|
lines.append("### 4.3 路径冲突(同path不同行为)")
|
|
lines.append("")
|
|
if path_conflicts:
|
|
for pc in path_conflicts:
|
|
lines.append(f"- **Path**: {' > '.join(pc['path'])}")
|
|
lines.append(f" - 规则: {', '.join(pc['rule_ids'])}")
|
|
lines.append(f" - 不同行为数: {pc['distinct_behaviors']}")
|
|
lines.append(f" - 建议: {pc['suggestion']}")
|
|
lines.append("")
|
|
else:
|
|
lines.append("未检测到路径冲突。")
|
|
lines.append("")
|
|
|
|
# ---- Section 5: Auto-Complete Summary ----
|
|
lines.append("## 5. 自动补全摘要")
|
|
lines.append("")
|
|
if autocomplete_count > 0:
|
|
lines.append(f"- 自动补全片段数: {autocomplete_count}")
|
|
lines.append(
|
|
f"- 补全后路径覆盖率: "
|
|
f"{path_stats.get('coverage_pct', 0)}%"
|
|
)
|
|
lines.append(f"- 自动生成的规则已合并到最终规则集中")
|
|
else:
|
|
lines.append("- 未执行自动补全(所有路径已被手动覆盖,或未运行 step2.5)")
|
|
lines.append("")
|
|
|
|
# ---- Section 6: Rule Manifest ----
|
|
lines.append("## 6. 规则清单")
|
|
lines.append("")
|
|
lines.append("| rule_id | Priority | Path | 简述 |")
|
|
lines.append("|---------|----------|------|------|")
|
|
for rule in rules:
|
|
rid = rule.get("rule_id", "?")
|
|
pri = rule.get("priority", "?")
|
|
path_str = " > ".join(rule.get("path", []))
|
|
desc = rule.get("description", "")[:60]
|
|
lines.append(f"| {rid} | {pri} | {path_str} | {desc} |")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _extract_config_defaults(doc: dict, semantic_index: dict) -> dict:
|
|
"""Extract configuration defaults (e.g. switch default states) from document.
|
|
|
|
Scans table text for patterns like "默认开启"/"默认关闭" and checks
|
|
semantic_index concepts for "默认" keywords.
|
|
"""
|
|
defaults = {}
|
|
|
|
# Scan document tables for default config values
|
|
for section in doc.get("sections", []):
|
|
for block in section.get("blocks", []):
|
|
if block["type"] != "table":
|
|
continue
|
|
for row in block.get("rows", []):
|
|
row_texts = []
|
|
for col in row.get("columns", []):
|
|
row_texts.append(f"{col.get('name','')}: {col.get('text','')}")
|
|
combined = " ".join(row_texts)
|
|
|
|
if "行车娱乐限制开关" in combined or "开关" in combined:
|
|
if "默认开启" in combined or "默认状态:开启" in combined:
|
|
defaults["行车娱乐限制开关"] = {
|
|
"default": "开启",
|
|
"section": section.get("source", "").split()[0]
|
|
if section.get("source") else "",
|
|
}
|
|
elif "默认关闭" in combined or "默认状态:关闭" in combined:
|
|
defaults["行车娱乐限制开关"] = {
|
|
"default": "关闭",
|
|
"section": section.get("source", "").split()[0]
|
|
if section.get("source") else "",
|
|
}
|
|
|
|
# Supplement from semantic_index concepts
|
|
for concept in semantic_index.get("concepts", []):
|
|
name = concept.get("name", "")
|
|
if "默认" in name:
|
|
if "开启" in name:
|
|
defaults.setdefault("行车娱乐限制开关", {})["default"] = "开启"
|
|
elif "关闭" in name:
|
|
defaults.setdefault("行车娱乐限制开关", {})["default"] = "关闭"
|
|
|
|
return defaults
|
|
|
|
|
|
def _detect_path_conflicts(rules: list[dict]) -> list[dict]:
|
|
"""Detect rules that share the same path triplet but have different behaviors.
|
|
|
|
Returns list of conflict items for the audit report.
|
|
"""
|
|
from collections import defaultdict
|
|
|
|
path_groups = defaultdict(list)
|
|
for rule in rules:
|
|
path_key = tuple(rule.get("path", []))
|
|
path_groups[path_key].append(rule)
|
|
|
|
conflicts = []
|
|
for path_key, group in path_groups.items():
|
|
if len(group) <= 1:
|
|
continue
|
|
# Check if rules in the same path have different trigger/action signatures
|
|
signatures = set()
|
|
for r in group:
|
|
trigger = r.get("trigger", {})
|
|
actions = tuple(
|
|
a.get("description", "") for a in r.get("actions", [])
|
|
)
|
|
sig = (
|
|
tuple(sorted(
|
|
(c.get("signal",""), c.get("operator",""), str(c.get("value","")))
|
|
for c in trigger.get("conditions", [])
|
|
)),
|
|
actions,
|
|
)
|
|
signatures.add(sig)
|
|
|
|
if len(signatures) > 1:
|
|
# Same path, different behaviors → potential organization issue
|
|
conflicts.append({
|
|
"status": "WARN",
|
|
"type": "path_collision",
|
|
"path": list(path_key),
|
|
"rule_ids": [r["rule_id"] for r in group],
|
|
"count": len(group),
|
|
"distinct_behaviors": len(signatures),
|
|
"suggestion": "多条规则共享相同path但行为不同,考虑拆分path或使用更细粒度的叶子路径",
|
|
})
|
|
|
|
return conflicts
|
|
|
|
|
|
# ---- Main ----
|
|
|
|
def main():
    """Run stage 3 end to end.

    Loads the stage 1/2/2.5 artifacts, merges and normalizes the rules,
    assigns rule ids, runs the consistency checks and audits, then writes the
    final IR (``config.IR_FINAL_JSON``) and the audit report
    (``config.IR_AUDIT_REPORT_MD``).  Exits with status 1 when there are no
    fragments at all.
    """
    banner = "=" * 60
    print(banner)
    print("阶段三:确定性合并、一致性校验与完整性审计")
    print(banner)

    # 1. Load inputs
    print("\n[1/7] 加载输入...")
    fragments = load_fragments()
    autocomplete_fragments = load_autocomplete_fragments()
    doc = config.load_input_document()
    semantic_index = load_semantic_index()
    # NOTE(review): the loaded path enumeration is never read below; the call
    # is kept so the file is still loaded exactly as before.
    path_enum = load_path_enumeration()

    total_fragments = len(fragments)
    if total_fragments == 0 and not autocomplete_fragments:
        print("错误: 无 IR 片段可合并 (fragments 和 autocomplete_fragments 均为空)。")
        print("  请检查 step2_ir_extraction 是否正确运行。")
        print("  可能原因: step1 未生成 function_units,或 step2 提取失败。")
        sys.exit(1)

    feature_name = semantic_index.get("feature_name", "行车娱乐限制")
    feature_id = "DRL-001"
    print(f"  功能: {feature_name} ({feature_id})")
    print(f"  主片段: {total_fragments}")
    if autocomplete_fragments:
        print(f"  自动补全片段: {len(autocomplete_fragments)}")

    # 2. Merge rules, then repair missing triggers / null operators
    print("\n[2/7] 合并去重...")
    merged_rules = [
        _normalize_rule(rule)
        for rule in merge_rules(fragments, autocomplete_fragments)
    ]
    print(f"  标准化: {len(merged_rules)} 条规则")

    # 3. Reassign rule IDs
    print("\n[3/7] 重分配 rule_id (层次化格式)...")
    final_rules = assign_rule_ids(merged_rules, feature_id)
    print(f"  已分配 {len(final_rules)} 个稳定 ID")
    if final_rules:
        sample_ids = [rule["rule_id"] for rule in final_rules[:3]]
        print(f"  示例: {sample_ids}")

    # 4. Consistency checks
    print("\n[4/7] 一致性扫描...")
    consistency_results = _check_naming_consistency(final_rules)
    n_warns = [r["status"] for r in consistency_results].count(WARN)
    if n_warns:
        print(f"  {WARN} {n_warns} 个术语不一致问题")
    else:
        print(f"  {PASS} 术语统一")

    contradictions = _detect_contradictions(final_rules)
    resolved, unresolved = _auto_resolve_contradictions(contradictions, doc)
    if resolved:
        print(f"  {PASS} 自动解决 {len(resolved)} 个矛盾")
    if unresolved:
        print(f"  {WARN} {len(unresolved)} 个矛盾需要人工确认")
        for item in unresolved:
            print(
                f"    - {item['rule_a']} vs {item['rule_b']}: "
                f"{item['conflict_point'][:80]}"
            )
    if not contradictions:
        print(f"  {PASS} 未检测到规则矛盾")

    path_conflicts = _detect_path_conflicts(final_rules)
    if path_conflicts:
        print(f"  {WARN} {len(path_conflicts)} 个 path 冲突(同path不同行为)")
    else:
        print(f"  {PASS} 无 path 冲突")

    # 5. Generate audit report
    print("\n[5/7] 生成审计报告...")
    path_results, path_stats = audit_path_coverage(doc, final_rules)
    enum_results = audit_table_enums(final_rules, doc)
    switch_results = audit_switch_coverage(final_rules)
    report = generate_audit_report(
        final_rules, doc, feature_name,
        path_results, path_stats,
        enum_results, switch_results,
        consistency_results,
        contradictions, unresolved,
        len(autocomplete_fragments),
        path_conflicts,
    )

    # 6. Extract config defaults from document
    # NOTE(review): this step prints no "[6/7]" progress line, so the console
    # output jumps from [5/7] to [7/7].
    config_defaults = _extract_config_defaults(doc, semantic_index)
    if config_defaults:
        print(f"  配置默认值: {list(config_defaults.keys())}")

    # 7. Save outputs
    print("\n[7/7] 保存输出...")
    ir_final = {
        "feature": feature_name,
        "feature_id": feature_id,
        "rules": final_rules,
    }
    if config_defaults:
        ir_final["config_defaults"] = config_defaults
    config.save_json(ir_final, config.IR_FINAL_JSON)
    print(f"  IR: {config.IR_FINAL_JSON}")

    with open(config.IR_AUDIT_REPORT_MD, "w", encoding="utf-8") as report_file:
        report_file.write(report)
    print(f"  审计报告: {config.IR_AUDIT_REPORT_MD}")

    # Summary
    print("\n完成!")
    flagged_checks = sum(
        1
        for r in path_results + enum_results + switch_results
        if r["status"] in (WARN, FAIL)
    )
    issue_count = flagged_checks + n_warns + len(unresolved) + len(path_conflicts)
    print(f"  规则: {len(final_rules)} 条")
    print(f"  路径覆盖: {path_stats.get('coverage_pct', 0)}%")
    print(f"  审计问题: {issue_count} 个需要关注")

    if issue_count > 0:
        print(f"\n  请查看 {config.IR_AUDIT_REPORT_MD} 并审查标记项。")


if __name__ == "__main__":
    main()
|