Files
document_analyzer/tests/acceptance/ir_schema.py
T
pzhang_zywl fec4c09ee0
CI / test (push) Successful in 8s
sync: update all skills from latest workspace code
doc_parser_skill:
- New: verify_flowchart.py (flowchart validation)
- Updated: LLM.py (multi-provider: DeepSeek + DashScope)
- Updated: image_parser.py (logic tree support, external prompts)
- Updated: SKILL.md, prompts/image_prompt.md

conflict_detection_skill:
- Updated: LLM.py (multi-provider sync)
- Updated: detect_conflicts.py (logic tree text conversion)

ir_generation_skill:
- Replaced old scripts/LLM.py + ir_generator.py with standalone project
- New: main.py, config.py, step1-3_*.py, ensemble_merge.py
- New: prompts/, tests/ subdirectories

tests:
- New: acceptance/ test suite with schema validation
- Fixed: conftest no longer globally skips non-acceptance tests
- Updated: test_sample.py for new ir_generation structure

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-30 22:45:08 +08:00

326 lines
12 KiB
Python

"""Rich IR schema definition and validators for the document_analyzer QE framework.
Target format is the production IR (``ir_final.json``):
{feature, feature_id, config_defaults?, rules: [{rule_id, path, description,
priority, sources, precondition, trigger, actions}]}
"""
from __future__ import annotations
import re
from typing import Any
# ── Constants ────────────────────────────────────────────────────────────────
# Closed vocabularies accepted by the validators below.
VALID_SOURCE_TYPES = {"table", "logic_tree", "text"}
VALID_ACTION_TYPES = {"system", "user_interaction"}
VALID_PRIORITIES = {"P0", "P1", "P2"}
VALID_TRIGGER_OPERATORS = {"AND", "OR"}

# rule_id pattern: FEAT-NNN-SCOPE-TYPE-...-PATH-NN (variable middle segments):
# an upper-case prefix, a number, one or more upper-case segments, a number.
# Anchored with \Z rather than $ because $ also matches just before a trailing
# newline, which would let "FEAT-001-A-01\n" pass as a valid id.
RULE_ID_RE = re.compile(
    r"^[A-Z]+-\d+(-[A-Z]+)+-\d+\Z"
)
# ── Validation helpers ──────────────────────────────────────────────────────
def _check(condition: bool, message: str) -> list[str]:
"""Return a list with an error message if *condition* is False, else empty list."""
return [] if condition else [message]
def validate_rule(rule: dict, index: int = 0) -> list[str]:
    """Validate a single rule dict.

    Args:
        rule: The candidate rule (expected to be a dict).
        index: Position of the rule in the ``rules`` array; only used to
            build the ``rules[<index>]`` prefix of every error message.

    Returns:
        A (possibly empty) list of error strings. ``rule_id`` and
        ``description`` are required non-empty strings and ``sources`` must be
        a non-empty list; ``priority``, ``path``, ``precondition``, ``trigger``
        and ``actions`` are only checked when present. Any ``None`` value at
        any depth is reported as well.
    """
    label = f"rules[{index}]"
    if not isinstance(rule, dict):
        return [f"{label}: not a dict"]

    errors: list[str] = []

    # ── required top-level fields ──
    for field in ("rule_id", "description"):
        errors.extend(_check(
            isinstance(rule.get(field), str) and bool(rule[field].strip()),
            f'{label}.{field}: required non-empty string',
        ))
    # sources is a list, not a string — validated separately below

    # ── rule_id naming ──
    rid = rule.get("rule_id", "")
    if rid and isinstance(rid, str):
        errors.extend(_check(
            bool(RULE_ID_RE.match(rid)),
            f'{label}.rule_id: "{rid}" does not match pattern FEAT-NNN-SCOPE-TYPE-PATH-NN',
        ))

    # ── priority ──
    priority = rule.get("priority")
    if priority is not None:
        # The isinstance guard keeps unhashable values (list/dict decoded from
        # JSON) from raising TypeError in the set membership test.
        errors.extend(_check(
            isinstance(priority, str) and priority in VALID_PRIORITIES,
            f'{label}.priority: "{priority}" not in {VALID_PRIORITIES}',
        ))

    # ── path ──
    path = rule.get("path")
    if path is not None:
        if not isinstance(path, list):
            errors.append(f"{label}.path: must be a list")
        elif not path:
            errors.append(f"{label}.path: must not be empty")
        elif not all(isinstance(p, str) and p.strip() for p in path):
            errors.append(f"{label}.path: all segments must be non-empty strings")

    # ── sources[] ── (a missing key is treated like an empty list)
    sources = rule.get("sources", [])
    if not isinstance(sources, list):
        errors.append(f"{label}.sources: must be a list")
    elif not sources:
        errors.append(f"{label}.sources: must have at least one source")
    else:
        for si, src in enumerate(sources):
            errors.extend(_validate_source(src, f"{label}.sources[{si}]"))

    # ── precondition ──
    precondition = rule.get("precondition")
    if precondition is not None:
        if not isinstance(precondition, dict):
            errors.append(f"{label}.precondition: must be a dict")
        elif not precondition:
            errors.append(f"{label}.precondition: must not be empty")

    # ── trigger ──
    trigger = rule.get("trigger")
    if trigger is not None:
        if not isinstance(trigger, dict):
            errors.append(f"{label}.trigger: must be a dict")
        else:
            errors.extend(_validate_trigger(trigger, f"{label}.trigger"))

    # ── actions ──
    actions = rule.get("actions")
    if actions is not None:
        if not isinstance(actions, list):
            errors.append(f"{label}.actions: must be a list")
        else:
            for ai, act in enumerate(actions):
                errors.extend(_validate_action(act, f"{label}.actions[{ai}]"))

    # ── no null values at any depth ──
    errors.extend(_find_nulls(rule, label))
    return errors
def _validate_source(src: dict, label: str) -> list[str]:
errors: list[str] = []
if not isinstance(src, dict):
return [f"{label}: not a dict"]
stype = src.get("type", "")
errors.extend(_check(
stype in VALID_SOURCE_TYPES,
f'{label}.type: "{stype}" not in {VALID_SOURCE_TYPES}',
))
priority = src.get("priority", "")
if priority:
errors.extend(_check(
priority in ("primary_source", "supplementary"),
f'{label}.priority: "{priority}" must be primary_source or supplementary',
))
# type-specific fields
if stype == "table":
errors.extend(_check(
isinstance(src.get("section"), str) and bool(src["section"].strip()),
f"{label}.section: required non-empty string for table source",
))
errors.extend(_check(
isinstance(src.get("row"), int),
f"{label}.row: required int for table source",
))
elif stype == "logic_tree":
errors.extend(_check(
isinstance(src.get("image_id"), str) and bool(src["image_id"].strip()),
f"{label}.image_id: required non-empty string for logic_tree source",
))
node_ids = src.get("node_ids", [])
errors.extend(_check(
isinstance(node_ids, list) and len(node_ids) > 0,
f"{label}.node_ids: required non-empty list for logic_tree source",
))
elif stype == "text":
errors.extend(_check(
isinstance(src.get("section"), str) and bool(src["section"].strip()),
f"{label}.section: required non-empty string for text source",
))
return errors
def _validate_trigger(trigger: dict, label: str) -> list[str]:
errors: list[str] = []
operator = trigger.get("operator", "")
errors.extend(_check(
operator in VALID_TRIGGER_OPERATORS,
f'{label}.operator: "{operator}" not in {VALID_TRIGGER_OPERATORS}',
))
conditions = trigger.get("conditions")
if conditions is not None:
if not isinstance(conditions, list):
errors.append(f"{label}.conditions: must be a list")
else:
for ci, cond in enumerate(conditions):
if not isinstance(cond, dict):
errors.append(f"{label}.conditions[{ci}]: not a dict")
else:
errors.extend(_check(
isinstance(cond.get("signal"), str) and bool(cond["signal"].strip()),
f"{label}.conditions[{ci}].signal: required non-empty string",
))
errors.extend(_check(
"operator" in cond,
f"{label}.conditions[{ci}].operator: required",
))
# empty conditions is valid (e.g. "switch always off, no conditions")
return errors
def _validate_action(action: dict, label: str) -> list[str]:
errors: list[str] = []
if not isinstance(action, dict):
return [f"{label}: not a dict"]
atype = action.get("type", "")
errors.extend(_check(
atype in VALID_ACTION_TYPES,
f'{label}.type: "{atype}" not in {VALID_ACTION_TYPES}',
))
errors.extend(_check(
isinstance(action.get("description"), str) and bool(action["description"].strip()),
f"{label}.description: required non-empty string",
))
return errors
def _find_nulls(obj: Any, label: str) -> list[str]:
"""Find any None values at any depth in *obj*."""
errors: list[str] = []
if obj is None:
return [f"{label}: null value"]
elif isinstance(obj, dict):
for k, v in obj.items():
errors.extend(_find_nulls(v, f"{label}.{k}"))
elif isinstance(obj, list):
for i, v in enumerate(obj):
errors.extend(_find_nulls(v, f"{label}[{i}]"))
return errors
# ── Top-level validation ────────────────────────────────────────────────────
def validate_ir(ir_data: dict) -> dict:
    """Validate a whole IR document and summarise the outcome.

    Returns:
        A dict with three keys:
        ``valid`` (True when no error was collected),
        ``errors`` (list of error strings, root-level first, then per rule),
        ``stats`` (``total_rules``, ``valid_rules``, ``has_config_defaults``,
        ``features``).
    """
    stats = {"total_rules": 0, "valid_rules": 0, "has_config_defaults": False, "features": 0}
    if not isinstance(ir_data, dict):
        return {"valid": False, "errors": ["IR root is not a dict"], "stats": stats}

    problems: list[str] = []

    # Required root keys; only the two string fields get a content check here,
    # "rules" is inspected in detail further down.
    for key in ("feature", "feature_id", "rules"):
        if key not in ir_data:
            problems.append(f"root.{key}: missing required field")
            continue
        if key == "rules":
            continue
        value = ir_data[key]
        if not isinstance(value, str) or not value.strip():
            problems.append(f"root.{key}: must be non-empty string")

    # config_defaults is optional, but must be a dict when supplied
    if "config_defaults" in ir_data:
        stats["has_config_defaults"] = True
        if not isinstance(ir_data["config_defaults"], dict):
            problems.append("root.config_defaults: must be a dict")

    # rules array (an absent key is treated like an empty list)
    rules = ir_data.get("rules", [])
    if not isinstance(rules, list):
        problems.append("root.rules: must be a list")
    elif not rules:
        problems.append("root.rules: must have at least one rule")
    else:
        stats["total_rules"] = len(rules)
        for pos, rule in enumerate(rules):
            found = validate_rule(rule, pos)
            problems.extend(found)
            if not found:
                stats["valid_rules"] += 1

    # a document describes exactly one feature when feature_id is a string
    if isinstance(ir_data.get("feature_id"), str):
        stats["features"] = 1

    return {"valid": not problems, "errors": problems, "stats": stats}
# ── Summary helpers ─────────────────────────────────────────────────────────
def schema_checklist(ir_data: dict) -> list[dict]:
    """Run individual checks and return a checklist for reporting.

    Args:
        ir_data: The IR document. Non-dict input is tolerated: the
            "root is dict" item fails and the other root items fail with it,
            instead of the function raising.

    Returns:
        A list of items shaped ``{"check": str, "passed": bool, "detail": str}``:
        four root-level items, one item per dict-typed rule, then the
        aggregate "no duplicate rule_ids" and "all rules valid" items.
    """
    from collections import Counter  # local import keeps this block self-contained

    report = validate_ir(ir_data)
    checks: list[dict] = []

    def _add(name: str, passed: bool, detail: str = "") -> None:
        checks.append({"check": name, "passed": passed, "detail": detail})

    # Work on a dict view of the root so the lookups below cannot raise
    # AttributeError when the document is a list, string, None, ...
    root = ir_data if isinstance(ir_data, dict) else {}

    def _non_empty_str(key: str) -> bool:
        value = root.get(key)
        return isinstance(value, str) and bool(value.strip())

    # Top-level
    _add("root is dict", isinstance(ir_data, dict))
    _add("root.feature present", _non_empty_str("feature"))
    _add("root.feature_id present", _non_empty_str("feature_id"))
    raw_rules = root.get("rules")
    # Only a real list is iterated; any other type is reported by the item
    # below (and by validate_ir) rather than crashing enumerate().
    rules = raw_rules if isinstance(raw_rules, list) else []
    _add("root.rules is non-empty list", bool(rules))

    # Per-rule checks (non-dict entries get no item of their own here; they
    # are reported through validate_ir in the final aggregate item)
    rule_ids = []
    for i, rule in enumerate(rules):
        if not isinstance(rule, dict):
            continue
        rid = rule.get("rule_id", f"rules[{i}]")
        rule_ids.append(rid)
        errs = validate_rule(rule, i)
        _add(f"{rid}: valid", not errs, "; ".join(errs))

    # Aggregate checks
    try:
        counts = Counter(rule_ids)
        duplicates = [rid for rid in rule_ids if counts[rid] > 1]
    except TypeError:
        # An unhashable rule_id (e.g. a list) cannot be counted by hash;
        # fall back to equality-based counting.
        duplicates = [rid for rid in rule_ids if rule_ids.count(rid) > 1]
    _add("no duplicate rule_ids", not duplicates,
         f"duplicates: {duplicates}" if duplicates else "")

    # report["valid"] comes from validate_ir, so this item also fails on
    # root-level errors, not only on per-rule ones.
    _add("all rules valid", report["valid"],
         f"{report['stats']['valid_rules']}/{report['stats']['total_rules']} valid")
    return checks