fec4c09ee0
CI / test (push) Successful in 8s
doc_parser_skill: - New: verify_flowchart.py (flowchart validation) - Updated: LLM.py (multi-provider: DeepSeek + DashScope) - Updated: image_parser.py (logic tree support, external prompts) - Updated: SKILL.md, prompts/image_prompt.md conflict_detection_skill: - Updated: LLM.py (multi-provider sync) - Updated: detect_conflicts.py (logic tree text conversion) ir_generation_skill: - Replaced old scripts/LLM.py + ir_generator.py with standalone project - New: main.py, config.py, step1-3_*.py, ensemble_merge.py - New: prompts/, tests/ subdirectories tests: - New: acceptance/ test suite with schema validation - Fixed: conftest no longer globally skips non-acceptance tests - Updated: test_sample.py for new ir_generation structure Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
326 lines
12 KiB
Python
326 lines
12 KiB
Python
"""Rich IR schema definition and validators for the document_analyzer QE framework.
|
|
|
|
Target format is the production IR (``ir_final.json``):
|
|
{feature, feature_id, config_defaults?, rules: [{rule_id, path, description,
|
|
priority, sources, precondition, trigger, actions}]}
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from typing import Any
|
|
|
|
# ── Constants ────────────────────────────────────────────────────────────────
|
|
|
|
# Accepted values for ``sources[].type``.
VALID_SOURCE_TYPES = {"table", "logic_tree", "text"}
# Accepted values for ``actions[].type``.
VALID_ACTION_TYPES = {"system", "user_interaction"}
# Accepted values for a rule's ``priority``.
VALID_PRIORITIES = {"P0", "P1", "P2"}
# Accepted values for ``trigger.operator``.
VALID_TRIGGER_OPERATORS = {"AND", "OR"}

# rule_id pattern: FEAT-NNN-SCOPE-TYPE-...-PATH-NN (variable middle segments)
RULE_ID_RE = re.compile(r"^[A-Z]+-\d+(-[A-Z]+)+-\d+$")
|
|
|
|
|
|
# ── Validation helpers ──────────────────────────────────────────────────────
|
|
|
|
def _check(condition: bool, message: str) -> list[str]:
    """Turn a boolean check into an error list.

    Returns ``[message]`` when *condition* is falsy and an empty list
    otherwise, so callers can ``extend`` their error list unconditionally.
    """
    if condition:
        return []
    return [message]
|
|
|
|
|
|
def validate_rule(rule: dict, index: int = 0) -> list[str]:
    """Validate a single rule dict.

    Checks, in order: the required strings ``rule_id`` and ``description``,
    the ``rule_id`` naming pattern, then ``priority``, ``path``, ``sources``,
    ``precondition``, ``trigger`` and ``actions``, and finally reports every
    null value found at any depth. ``sources`` must be a non-empty list; the
    other structured fields are only checked when present and not None.

    Args:
        rule: The rule object to validate.
        index: Position of the rule in the ``rules`` array; used to build the
            ``rules[<index>]`` prefix of every message.

    Returns:
        A (possibly empty) list of error strings.
    """
    label = f"rules[{index}]"
    if not isinstance(rule, dict):
        return [f"{label}: not a dict"]

    errors: list[str] = []

    # ── required top-level fields ──
    for field in ("rule_id", "description"):
        value = rule.get(field)
        errors.extend(_check(
            isinstance(value, str) and bool(value.strip()),
            f'{label}.{field}: required non-empty string',
        ))

    # sources is a list, not a string — validated separately below

    # ── rule_id naming ──
    rid = rule.get("rule_id", "")
    if rid and isinstance(rid, str):
        # fullmatch, not match: ``$`` alone also matches just before a
        # trailing newline, which would let "…-01\n" through.
        errors.extend(_check(
            RULE_ID_RE.fullmatch(rid) is not None,
            f'{label}.rule_id: "{rid}" does not match pattern FEAT-NNN-SCOPE-TYPE-PATH-NN',
        ))

    # ── priority ──
    priority = rule.get("priority")
    if priority is not None:
        # The isinstance guard keeps unhashable values (list/dict) from
        # raising TypeError inside the set membership test.
        errors.extend(_check(
            isinstance(priority, str) and priority in VALID_PRIORITIES,
            f'{label}.priority: "{priority}" not in {VALID_PRIORITIES}',
        ))

    # ── path ──
    path = rule.get("path")
    if path is not None:
        if not isinstance(path, list):
            errors.append(f"{label}.path: must be a list")
        elif not path:
            errors.append(f"{label}.path: must not be empty")
        elif not all(isinstance(seg, str) and seg.strip() for seg in path):
            errors.append(f"{label}.path: all segments must be non-empty strings")

    # ── sources[] ──
    sources = rule.get("sources", [])
    if not isinstance(sources, list):
        errors.append(f"{label}.sources: must be a list")
    elif not sources:
        errors.append(f"{label}.sources: must have at least one source")
    else:
        for si, src in enumerate(sources):
            errors.extend(_validate_source(src, f"{label}.sources[{si}]"))

    # ── precondition ──
    precondition = rule.get("precondition")
    if precondition is not None:
        if not isinstance(precondition, dict):
            errors.append(f"{label}.precondition: must be a dict")
        elif not precondition:
            errors.append(f"{label}.precondition: must not be empty")

    # ── trigger ──
    trigger = rule.get("trigger")
    if trigger is not None:
        if not isinstance(trigger, dict):
            errors.append(f"{label}.trigger: must be a dict")
        else:
            errors.extend(_validate_trigger(trigger, f"{label}.trigger"))

    # ── actions ──
    actions = rule.get("actions")
    if actions is not None:
        if not isinstance(actions, list):
            errors.append(f"{label}.actions: must be a list")
        else:
            for ai, act in enumerate(actions):
                errors.extend(_validate_action(act, f"{label}.actions[{ai}]"))

    # ── no null values at any depth ──
    errors.extend(_find_nulls(rule, label))

    return errors
|
|
|
|
|
|
def _validate_source(src: dict, label: str) -> list[str]:
    """Validate one entry of a rule's ``sources`` list.

    Checks that ``type`` is one of ``VALID_SOURCE_TYPES``, that ``priority``
    (when truthy) is ``primary_source`` or ``supplementary``, and the fields
    each type requires:

    * ``table``: non-empty string ``section`` and integer ``row``
    * ``logic_tree``: non-empty string ``image_id`` and non-empty list
      ``node_ids``
    * ``text``: non-empty string ``section``

    Args:
        src: The source object to validate.
        label: Path prefix used in every error message.

    Returns:
        A (possibly empty) list of error strings.
    """
    if not isinstance(src, dict):
        return [f"{label}: not a dict"]

    errors: list[str] = []

    def _require_text(key: str, kind: str) -> None:
        # Record an error unless src[key] is a non-blank string.
        value = src.get(key)
        errors.extend(_check(
            isinstance(value, str) and bool(value.strip()),
            f"{label}.{key}: required non-empty string for {kind} source",
        ))

    stype = src.get("type", "")
    # The isinstance guard keeps unhashable values (list/dict) from raising
    # TypeError inside the set membership test.
    errors.extend(_check(
        isinstance(stype, str) and stype in VALID_SOURCE_TYPES,
        f'{label}.type: "{stype}" not in {VALID_SOURCE_TYPES}',
    ))

    priority = src.get("priority", "")
    if priority:
        errors.extend(_check(
            priority in ("primary_source", "supplementary"),
            f'{label}.priority: "{priority}" must be primary_source or supplementary',
        ))

    # type-specific fields
    if stype == "table":
        _require_text("section", "table")
        row = src.get("row")
        # bool is a subclass of int; JSON true/false is not a row number.
        errors.extend(_check(
            isinstance(row, int) and not isinstance(row, bool),
            f"{label}.row: required int for table source",
        ))
    elif stype == "logic_tree":
        _require_text("image_id", "logic_tree")
        node_ids = src.get("node_ids", [])
        errors.extend(_check(
            isinstance(node_ids, list) and len(node_ids) > 0,
            f"{label}.node_ids: required non-empty list for logic_tree source",
        ))
    elif stype == "text":
        _require_text("section", "text")

    return errors
|
|
|
|
|
|
def _validate_trigger(trigger: dict, label: str) -> list[str]:
    """Validate a rule's ``trigger`` object.

    ``operator`` must be one of ``VALID_TRIGGER_OPERATORS``. ``conditions``
    is optional; when present it must be a list whose items are dicts with a
    non-empty string ``signal`` and an ``operator`` key. An empty
    ``conditions`` list is valid (e.g. "switch always off, no conditions").

    Args:
        trigger: The trigger dict (the caller has already checked the type).
        label: Path prefix used in every error message.

    Returns:
        A (possibly empty) list of error strings.
    """
    errors: list[str] = []

    operator = trigger.get("operator", "")
    # The isinstance guard keeps unhashable values (list/dict) from raising
    # TypeError inside the set membership test.
    errors.extend(_check(
        isinstance(operator, str) and operator in VALID_TRIGGER_OPERATORS,
        f'{label}.operator: "{operator}" not in {VALID_TRIGGER_OPERATORS}',
    ))

    conditions = trigger.get("conditions")
    if conditions is None:
        return errors
    if not isinstance(conditions, list):
        errors.append(f"{label}.conditions: must be a list")
        return errors

    for ci, cond in enumerate(conditions):
        where = f"{label}.conditions[{ci}]"
        if not isinstance(cond, dict):
            errors.append(f"{where}: not a dict")
            continue
        signal = cond.get("signal")
        errors.extend(_check(
            isinstance(signal, str) and bool(signal.strip()),
            f"{where}.signal: required non-empty string",
        ))
        errors.extend(_check(
            "operator" in cond,
            f"{where}.operator: required",
        ))

    return errors
|
|
|
|
|
|
def _validate_action(action: dict, label: str) -> list[str]:
    """Validate one entry of a rule's ``actions`` list.

    The entry must be a dict whose ``type`` is one of ``VALID_ACTION_TYPES``
    and whose ``description`` is a non-empty string.

    Args:
        action: The action object to validate.
        label: Path prefix used in every error message.

    Returns:
        A (possibly empty) list of error strings.
    """
    if not isinstance(action, dict):
        return [f"{label}: not a dict"]

    errors: list[str] = []

    atype = action.get("type", "")
    # The isinstance guard keeps unhashable values (list/dict) from raising
    # TypeError inside the set membership test.
    errors.extend(_check(
        isinstance(atype, str) and atype in VALID_ACTION_TYPES,
        f'{label}.type: "{atype}" not in {VALID_ACTION_TYPES}',
    ))

    description = action.get("description")
    errors.extend(_check(
        isinstance(description, str) and bool(description.strip()),
        f"{label}.description: required non-empty string",
    ))

    return errors
|
|
|
|
|
|
def _find_nulls(obj: Any, label: str) -> list[str]:
    """Collect a ``"<path>: null value"`` message for every None inside *obj*.

    Dicts and lists are walked depth-first in iteration order; dict entries
    extend the path with ``.key`` and list items with ``[index]``. Any other
    value contributes nothing.
    """
    if obj is None:
        return [f"{label}: null value"]

    if isinstance(obj, dict):
        children = ((f"{label}.{key}", value) for key, value in obj.items())
    elif isinstance(obj, list):
        children = ((f"{label}[{pos}]", value) for pos, value in enumerate(obj))
    else:
        return []

    found: list[str] = []
    for child_label, child in children:
        found += _find_nulls(child, child_label)
    return found
|
|
|
|
|
|
# ── Top-level validation ────────────────────────────────────────────────────
|
|
|
|
def validate_ir(ir_data: dict) -> dict:
    """Validate the entire IR document.

    Checks the required root fields (``feature``, ``feature_id``, ``rules``),
    the optional ``config_defaults`` dict, and every entry of ``rules``.

    Returns:
        {
            "valid": bool,
            "errors": [str, ...],
            "stats": {total_rules, valid_rules, has_config_defaults, features}
        }
    """
    stats = {"total_rules": 0, "valid_rules": 0, "has_config_defaults": False, "features": 0}
    if not isinstance(ir_data, dict):
        return {"valid": False, "errors": ["IR root is not a dict"], "stats": stats}

    problems: list[str] = []

    # top-level required fields
    for name in ("feature", "feature_id", "rules"):
        if name not in ir_data:
            problems.append(f"root.{name}: missing required field")
            continue
        if name == "rules":
            # the rules array gets its own checks further down
            continue
        value = ir_data[name]
        if not isinstance(value, str) or not value.strip():
            problems.append(f"root.{name}: must be non-empty string")

    # config_defaults (optional)
    has_defaults = "config_defaults" in ir_data
    stats["has_config_defaults"] = has_defaults
    if has_defaults and not isinstance(ir_data["config_defaults"], dict):
        problems.append("root.config_defaults: must be a dict")

    # rules array
    rules = ir_data.get("rules", [])
    if not isinstance(rules, list):
        problems.append("root.rules: must be a list")
    else:
        stats["total_rules"] = len(rules)
        if not rules:
            problems.append("root.rules: must have at least one rule")
        for position, rule in enumerate(rules):
            found = validate_rule(rule, position)
            problems.extend(found)
            if not found:
                stats["valid_rules"] += 1

    # feature count
    if isinstance(ir_data.get("feature_id"), str):
        stats["features"] = 1

    return {"valid": not problems, "errors": problems, "stats": stats}
|
|
|
|
|
|
# ── Summary helpers ─────────────────────────────────────────────────────────
|
|
|
|
def schema_checklist(ir_data: dict) -> list[dict]:
    """Run individual checks and return a checklist for reporting.

    The list holds the root-level checks, then one ``"<rule_id>: valid"``
    entry per dict-typed rule, then the aggregate ``no duplicate rule_ids``
    and ``all rules valid`` entries. Malformed input (non-dict root,
    non-list ``rules``, unhashable ``rule_id``) yields failed checks rather
    than an exception.

    Each item: {"check": str, "passed": bool, "detail": str}
    """
    from collections import Counter

    report = validate_ir(ir_data)
    checks: list[dict] = []

    def _add(name: str, passed: bool, detail: str = "") -> None:
        checks.append({"check": name, "passed": passed, "detail": detail})

    def _nonempty_str(value: object) -> bool:
        return isinstance(value, str) and bool(value.strip())

    # Top-level. A non-dict root is treated as empty so the remaining checks
    # fail instead of raising AttributeError on ``.get``.
    is_dict = isinstance(ir_data, dict)
    root = ir_data if is_dict else {}
    _add("root is dict", is_dict)
    _add("root.feature present", _nonempty_str(root.get("feature")))
    _add("root.feature_id present", _nonempty_str(root.get("feature_id")))
    rules = root.get("rules")
    _add("root.rules is non-empty list", isinstance(rules, list) and len(rules) > 0)
    if not isinstance(rules, list):
        # null, numbers, strings, dicts: nothing to check per rule
        rules = []

    # Per-rule checks
    rule_ids = []
    id_keys = []  # hashable stand-ins for rule_ids, used for duplicate counting
    for i, rule in enumerate(rules):
        if not isinstance(rule, dict):
            continue
        rid = rule.get("rule_id", f"rules[{i}]")
        rule_ids.append(rid)
        try:
            hash(rid)
            id_keys.append(rid)
        except TypeError:
            # list/dict rule_id: count it by its repr, tagged so it cannot
            # collide with a genuine string id
            id_keys.append(("unhashable", repr(rid)))

        errs = validate_rule(rule, i)
        _add(f"{rid}: valid", not errs, "; ".join(errs))

    # Aggregate checks
    counts = Counter(id_keys)
    duplicates = [rid for rid, key in zip(rule_ids, id_keys) if counts[key] > 1]
    _add("no duplicate rule_ids", not duplicates,
         f"duplicates: {duplicates}" if duplicates else "")

    _add("all rules valid", report["valid"],
         f"{report['stats']['valid_rules']}/{report['stats']['total_rules']} valid")

    return checks
|