#!/usr/bin/env python3 """Apply user resolutions to ``_parsed.json`` using ``_conflicts.json``. Usage:: python scripts/apply_resolutions.py --resolutions [--output-dir DIR] The *resolutions.json* file is created by the agent after user arbitration. Each resolution maps a conflict_id to a decision. Resolution format (``resolutions.json``):: [ { "conflict_id": 0, // 0-based index into conflicts array "resolution": "以文字为准", "custom_text": null } ] Outputs ``_updated.json`` — identical to *parsed.json* plus a ``resolved_conflicts`` top-level array with correction instructions for the IR generator. """ import argparse import json import logging import os logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) def apply_resolutions( parsed_path: str, resolutions_path: str, output_dir: str | None = None, ) -> dict: """Load *parsed.json*, apply resolutions, write *updated.json*.""" with open(parsed_path, "r", encoding="utf-8") as f: data = json.load(f) with open(resolutions_path, "r", encoding="utf-8") as f: resolutions = json.load(f) base_dir = os.path.dirname(os.path.abspath(parsed_path)) # Try to find _conflicts.json alongside parsed.json basename = os.path.splitext(os.path.basename(parsed_path))[0] stem = basename[:-7] if basename.endswith("_parsed") else basename candidate = os.path.join(base_dir, f"{stem}_conflicts.json") conflicts = data.get("_conflicts", []) if not conflicts and os.path.isfile(candidate): with open(candidate, "r", encoding="utf-8") as f: conflicts = json.load(f) if output_dir is None: output_dir = base_dir os.makedirs(output_dir, exist_ok=True) # Build resolved_conflicts with correction instructions for ir_generator resolved = [] for res in resolutions: cid = res.get("conflict_id") if cid is None or cid < 0 or cid >= len(conflicts): logger.warning("Invalid conflict_id: %s", cid) continue conflict = conflicts[cid] choice = res.get("resolution", "") custom = res.get("custom_text") entry = { "conflict_id": cid, "conflict_type": conflict.get("conflict_type"), "section": conflict.get("section", ""), "resolution": choice, } # Build a correction instruction string image_val = conflict.get("image_snippet", "") text_val = conflict.get("text_snippet", "") if choice == "以图片为准": entry["correction"] = image_val entry["source"] = "图片" elif choice == "以文字为准": entry["correction"] = text_val entry["source"] = "文字" elif choice == "两处都保留": entry["correction"] = f"{text_val}(另外的观点:{image_val})" entry["source"] = "两者兼容" elif custom: entry["correction"] = custom entry["source"] = "自定义" logger.info("Conflict %d: custom: %s", cid, custom[:60]) else: entry["correction"] = text_val entry["source"] = "文字(默认)" logger.warning("Conflict %d: unknown resolution '%s', defaulting to text", cid, choice) logger.info("Conflict %d: %s → %s", cid, choice, entry["source"]) resolved.append(entry) data["resolved_conflicts"] = resolved logger.info("Applied %d resolutions", len(resolved)) # Write output if basename.endswith("_parsed"): out_name = f"{stem}_updated.json" else: out_name = f"{basename}_updated.json" output_path = os.path.join(output_dir, out_name) with open(output_path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.info("Saved: %s", output_path) return data # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": parser = argparse.ArgumentParser( description="Apply user resolutions to parsed.json.", ) parser.add_argument("input", metavar="parsed.json", help="Path to _parsed.json") parser.add_argument("--resolutions", "-r", required=True, help="Path to resolutions JSON file") parser.add_argument("--output-dir", default=None, help="Output directory (default: same as input)") args = parser.parse_args() apply_resolutions(args.input, args.resolutions, args.output_dir)