"""Rich IR schema definition and validators for the document_analyzer QE framework.

Target format is the production IR (``ir_final.json``):
  {feature, feature_id, config_defaults?, rules: [{rule_id, path, description,
   priority, sources, precondition, trigger, actions}]}
"""

from __future__ import annotations

import re
from typing import Any

# ── Constants ────────────────────────────────────────────────────────────────

VALID_SOURCE_TYPES = {"table", "logic_tree", "text"}
VALID_ACTION_TYPES = {"system", "user_interaction"}
VALID_PRIORITIES = {"P0", "P1", "P2"}
VALID_TRIGGER_OPERATORS = {"AND", "OR"}

# rule_id pattern: FEAT-NNN-SCOPE-TYPE-...-PATH-NN (variable middle segments)
RULE_ID_RE = re.compile(
    r"^[A-Z]+-\d+(-[A-Z]+)+-\d+$"
)


# ── Validation helpers ──────────────────────────────────────────────────────

def _check(condition: bool, message: str) -> list[str]:
    """Return a list with an error message if *condition* is False, else empty list."""
    return [] if condition else [message]


def _is_nonempty_str(value: Any) -> bool:
    """Return True when *value* is a string with at least one non-blank character."""
    return isinstance(value, str) and bool(value.strip())


def _is_allowed(value: Any, allowed: set[str]) -> bool:
    """Return True when *value* is a string contained in *allowed*.

    The ``isinstance`` guard matters: IR data comes from JSON, so *value* may
    be a list or dict, and testing an unhashable object for membership in a
    set raises ``TypeError`` instead of simply answering "no".
    """
    return isinstance(value, str) and value in allowed


def validate_rule(rule: dict, index: int = 0) -> list[str]:
    """Validate a single rule dict.

    Args:
        rule: The rule object to validate (normally one element of ``rules``).
        index: Position of the rule in the ``rules`` array; used only to build
            the ``rules[<index>]`` prefix of every error message.

    Returns:
        A (possibly empty) list of error strings. The function never raises
        for malformed input; every problem is reported as a message.
    """
    errors: list[str] = []
    label = f"rules[{index}]"

    if not isinstance(rule, dict):
        return [f"{label}: not a dict"]

    # ── required top-level fields ──
    for field in ("rule_id", "description"):
        errors.extend(_check(
            _is_nonempty_str(rule.get(field)),
            f'{label}.{field}: required non-empty string',
        ))
    # sources is a list, not a string — validated separately below

    # ── rule_id naming ──
    rid = rule.get("rule_id", "")
    if rid and isinstance(rid, str):
        errors.extend(_check(
            bool(RULE_ID_RE.match(rid)),
            f'{label}.rule_id: "{rid}" does not match pattern FEAT-NNN-SCOPE-TYPE-PATH-NN',
        ))

    # ── priority ──
    priority = rule.get("priority")
    if priority is not None:
        errors.extend(_check(
            _is_allowed(priority, VALID_PRIORITIES),
            f'{label}.priority: "{priority}" not in {VALID_PRIORITIES}',
        ))

    # ── path ──
    path = rule.get("path")
    if path is not None:
        if not isinstance(path, list):
            errors.append(f"{label}.path: must be a list")
        elif len(path) == 0:
            errors.append(f"{label}.path: must not be empty")
        elif not all(_is_nonempty_str(p) for p in path):
            errors.append(f"{label}.path: all segments must be non-empty strings")

    # ── sources[] ──
    # A missing key defaults to [] and is therefore reported as "at least one source".
    sources = rule.get("sources", [])
    if not isinstance(sources, list):
        errors.append(f"{label}.sources: must be a list")
    elif len(sources) == 0:
        errors.append(f"{label}.sources: must have at least one source")
    else:
        for si, src in enumerate(sources):
            errors.extend(_validate_source(src, f"{label}.sources[{si}]"))

    # ── precondition ──
    precondition = rule.get("precondition")
    if precondition is not None:
        if not isinstance(precondition, dict):
            errors.append(f"{label}.precondition: must be a dict")
        elif len(precondition) == 0:
            errors.append(f"{label}.precondition: must not be empty")

    # ── trigger ──
    trigger = rule.get("trigger")
    if trigger is not None:
        if not isinstance(trigger, dict):
            errors.append(f"{label}.trigger: must be a dict")
        else:
            errors.extend(_validate_trigger(trigger, f"{label}.trigger"))

    # ── actions ──
    actions = rule.get("actions")
    if actions is not None:
        if not isinstance(actions, list):
            errors.append(f"{label}.actions: must be a list")
        else:
            for ai, act in enumerate(actions):
                errors.extend(_validate_action(act, f"{label}.actions[{ai}]"))

    # ── no null values at any depth ──
    # Optional fields set to an explicit null skip the checks above and are
    # reported here instead.
    errors.extend(_find_nulls(rule, label))

    return errors


def _validate_source(src: dict, label: str) -> list[str]:
    """Validate one entry of a rule's ``sources`` list.

    Checks the source ``type``, the optional ``priority`` and the fields that
    are required for the given type. *label* is the message prefix.
    """
    errors: list[str] = []
    if not isinstance(src, dict):
        return [f"{label}: not a dict"]

    stype = src.get("type", "")
    errors.extend(_check(
        _is_allowed(stype, VALID_SOURCE_TYPES),
        f'{label}.type: "{stype}" not in {VALID_SOURCE_TYPES}',
    ))

    priority = src.get("priority", "")
    if priority:
        errors.extend(_check(
            priority in ("primary_source", "supplementary"),
            f'{label}.priority: "{priority}" must be primary_source or supplementary',
        ))

    # type-specific fields
    if stype == "table":
        errors.extend(_check(
            _is_nonempty_str(src.get("section")),
            f"{label}.section: required non-empty string for table source",
        ))
        row = src.get("row")
        errors.extend(_check(
            # bool is a subclass of int; a JSON true/false is not a row number.
            isinstance(row, int) and not isinstance(row, bool),
            f"{label}.row: required int for table source",
        ))
    elif stype == "logic_tree":
        errors.extend(_check(
            _is_nonempty_str(src.get("image_id")),
            f"{label}.image_id: required non-empty string for logic_tree source",
        ))
        node_ids = src.get("node_ids", [])
        errors.extend(_check(
            isinstance(node_ids, list) and len(node_ids) > 0,
            f"{label}.node_ids: required non-empty list for logic_tree source",
        ))
    elif stype == "text":
        errors.extend(_check(
            _is_nonempty_str(src.get("section")),
            f"{label}.section: required non-empty string for text source",
        ))

    return errors


def _validate_trigger(trigger: dict, label: str) -> list[str]:
    """Validate a rule's ``trigger`` object (operator plus optional conditions).

    *label* is the message prefix. Each condition must be a dict with a
    non-empty string ``signal`` and an ``operator`` key.
    """
    errors: list[str] = []
    # Same guard as the sibling helpers, so a direct call cannot crash.
    if not isinstance(trigger, dict):
        return [f"{label}: not a dict"]

    operator = trigger.get("operator", "")
    errors.extend(_check(
        _is_allowed(operator, VALID_TRIGGER_OPERATORS),
        f'{label}.operator: "{operator}" not in {VALID_TRIGGER_OPERATORS}',
    ))

    conditions = trigger.get("conditions")
    if conditions is not None:
        if not isinstance(conditions, list):
            errors.append(f"{label}.conditions: must be a list")
        else:
            for ci, cond in enumerate(conditions):
                if not isinstance(cond, dict):
                    errors.append(f"{label}.conditions[{ci}]: not a dict")
                else:
                    errors.extend(_check(
                        _is_nonempty_str(cond.get("signal")),
                        f"{label}.conditions[{ci}].signal: required non-empty string",
                    ))
                    errors.extend(_check(
                        "operator" in cond,
                        f"{label}.conditions[{ci}].operator: required",
                    ))
    # empty conditions is valid (e.g. "switch always off, no conditions")

    return errors


def _validate_action(action: dict, label: str) -> list[str]:
    """Validate one entry of a rule's ``actions`` list (type and description)."""
    errors: list[str] = []
    if not isinstance(action, dict):
        return [f"{label}: not a dict"]

    atype = action.get("type", "")
    errors.extend(_check(
        _is_allowed(atype, VALID_ACTION_TYPES),
        f'{label}.type: "{atype}" not in {VALID_ACTION_TYPES}',
    ))
    errors.extend(_check(
        _is_nonempty_str(action.get("description")),
        f"{label}.description: required non-empty string",
    ))
    return errors


def _find_nulls(obj: Any, label: str) -> list[str]:
    """Find any None values at any depth in *obj*.

    Recurses through dicts and lists; *label* is extended with ``.key`` or
    ``[index]`` so each message names the exact location of the null.
    """
    errors: list[str] = []
    if obj is None:
        return [f"{label}: null value"]
    elif isinstance(obj, dict):
        for k, v in obj.items():
            errors.extend(_find_nulls(v, f"{label}.{k}"))
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            errors.extend(_find_nulls(v, f"{label}[{i}]"))
    return errors


# ── Top-level validation ────────────────────────────────────────────────────

def validate_ir(ir_data: dict) -> dict:
    """Validate the entire IR document.

    Returns:
        {
            "valid": bool,
            "errors": [str, ...],
            "stats": {total_rules, valid_rules, has_config_defaults, features}
        }
    """
    errors: list[str] = []
    stats = {"total_rules": 0, "valid_rules": 0, "has_config_defaults": False, "features": 0}

    if not isinstance(ir_data, dict):
        return {"valid": False, "errors": ["IR root is not a dict"], "stats": stats}

    # top-level required fields
    for field in ("feature", "feature_id", "rules"):
        if field not in ir_data:
            errors.append(f"root.{field}: missing required field")
        elif field in ("feature", "feature_id") and not _is_nonempty_str(ir_data[field]):
            errors.append(f"root.{field}: must be non-empty string")

    # config_defaults (optional)
    if "config_defaults" in ir_data:
        stats["has_config_defaults"] = True
        if not isinstance(ir_data["config_defaults"], dict):
            errors.append("root.config_defaults: must be a dict")

    # rules array
    rules = ir_data.get("rules", [])
    if not isinstance(rules, list):
        errors.append("root.rules: must be a list")
    else:
        stats["total_rules"] = len(rules)
        if len(rules) == 0:
            errors.append("root.rules: must have at least one rule")
        else:
            for i, rule in enumerate(rules):
                rule_errors = validate_rule(rule, i)
                if rule_errors:
                    errors.extend(rule_errors)
                else:
                    stats["valid_rules"] += 1

    # feature count
    if isinstance(ir_data.get("feature_id"), str):
        stats["features"] = 1

    return {
        "valid": len(errors) == 0,
        "errors": errors,
        "stats": stats,
    }


# ── Summary helpers ─────────────────────────────────────────────────────────

def schema_checklist(ir_data: dict) -> list[dict]:
    """Run individual checks and return a checklist for reporting.

    Each item: {"check": str, "passed": bool, "detail": str}

    Malformed input never raises: a non-dict root or a non-list ``rules``
    value simply produces failing checks and no per-rule entries. Rules that
    are not dicts get no per-rule entry of their own; they are still counted
    by the final "all rules valid" check.
    """
    report = validate_ir(ir_data)
    checks: list[dict] = []

    def _add(name: str, passed: bool, detail: str = "") -> None:
        checks.append({"check": name, "passed": passed, "detail": detail})

    # Top-level
    root_is_dict = isinstance(ir_data, dict)
    # Fall back to an empty mapping so the lookups below cannot raise.
    root = ir_data if root_is_dict else {}
    raw_rules = root.get("rules")

    _add("root is dict", root_is_dict)
    _add("root.feature present", _is_nonempty_str(root.get("feature")))
    _add("root.feature_id present", _is_nonempty_str(root.get("feature_id")))
    _add("root.rules is non-empty list",
         isinstance(raw_rules, list) and len(raw_rules) > 0)

    # Per-rule checks
    rules = raw_rules if isinstance(raw_rules, list) else []
    rule_ids: list[str] = []
    for i, rule in enumerate(rules):
        if not isinstance(rule, dict):
            continue
        fallback = f"rules[{i}]"
        rid = rule.get("rule_id", fallback)
        if not isinstance(rid, str):
            # A non-string id (possibly unhashable) cannot be used as a label
            # or for duplicate detection; identify the rule by position.
            rid = fallback
        rule_ids.append(rid)
        errs = validate_rule(rule, i)
        _add(f"{rid}: valid", len(errs) == 0, "; ".join(errs) if errs else "")

    # Aggregate checks
    # Count occurrences once instead of calling list.count() per element.
    occurrences: dict[str, int] = {}
    for rid in rule_ids:
        occurrences[rid] = occurrences.get(rid, 0) + 1
    duplicates = [rid for rid in rule_ids if occurrences[rid] > 1]
    _add("no duplicate rule_ids",
         not duplicates,
         f"duplicates: {duplicates}" if duplicates else "")

    _add("all rules valid",
         report["valid"],
         f"{report['stats']['valid_rules']}/{report['stats']['total_rules']} valid")

    return checks