#!/usr/bin/env python3
"""Detect logical conflicts between image analysis and text in ``_parsed.json``.

Usage::

    python scripts/detect_conflicts.py D:/projects/jike/output/车机娱乐系统禁止功能文档_精简_parsed.json [--output-dir DIR] [--dry-run]

For each diagram-type image (flowchart, architecture, state, sequence,
activity), the script locates its section via *image_sources*, grabs the
corresponding text blocks, and calls an LLM to find
contradictions/condition-mismatches between the image description and the
text.

Output: ``_conflicts.json``
"""

import argparse
import json
import logging
import os
import re
import sys
import time

# Make the sibling ``LLM`` module importable when this file is run as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from LLM import LLMClient

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Seconds to wait between two consecutive LLM requests.
RATE_LIMIT_DELAY = 0.5
# Image types that are compared against the section text.
DIAGRAM_TYPES = {"flowchart", "architecture", "state", "sequence", "activity"}
# Sections whose text is shorter than this are not worth a comparison.
MIN_TEXT_CHARS = 20

# Prompt template; filled with ``str.format`` (literal braces are doubled).
PROMPT_DETECT_CONFLICT = """你是一个文档一致性检查专家。以下内容来自同一份需求文档的同一个章节,包含两部分:

## 部分1:图片(流程图/架构图/状态图)的描述
```
{image_description}
```

## 部分2:同章节的文字描述
```
{text_description}
```

## 你的任务
检查这两部分之间是否存在**逻辑矛盾或条件不一致**。

你需要关注的冲突类型:

1. **condition_mismatch**(条件不一致):两者描述了同一规则,但触发条件、阈值、时序不同。
   例如:图片说"车速≥15km/h且持续5秒",文字说"车速≥10km/h且持续3秒"。
   例如:图片说"非P档限制",文字说"车速>0限制"。

2. **contradiction**(直接矛盾):两者对同一事物的描述完全相反。
   例如:图片说"功能X被禁止",文字说"功能X可用"。
   例如:图片说"开关默认关闭",文字说"开关默认开启"。

3. **scope_mismatch**(范围不一致):两者描述的场景/地域/设备范围不同。
   例如:图片说"国内方案",文字说"海外方案"。
   例如:图片说"CSD中控屏",文字描述包含"PSD副驾屏"。

## 输出格式

如果**没有冲突**,只输出:
```
[[NO_CONFLICT]]
```

如果**有冲突**,输出以下JSON数组(不要任何其他文字):
```json
[
  {{
    "conflict_type": "condition_mismatch",
    "severity": "high",
    "section": "{section_name}",
    "image_snippet": "图片中描述的关键内容(摘录)",
    "text_snippet": "文字中描述的关键内容(摘录)",
    "description": "用中文说明冲突的具体差异"
  }}
]
```

注意:
- 每个冲突一个条目,不要合并
- severity: high(功能正确性受影响)| medium(边界条件模糊)| low(表达方式差异)
- 输出必须是**严格合法的JSON数组**,不要有尾随逗号
- 如果没有严格冲突,输出 [[NO_CONFLICT]]
"""

# Display labels for node types in the nested-tree format.
_NESTED_TYPE_LABELS = {
    "start": "起始",
    "end": "结束",
    "process": "处理",
    "decision": "判断",
    "action": "动作",
}

# Line prefixes for the non-decision node types of the legacy flat format.
_FLAT_NODE_LABELS = {
    "action": "动作节点",
    "state": "状态节点",
    "start": "起始节点",
    "end": "结束节点",
}

# Greedy match from the first ``[ {`` to the last ``} ]`` in the reply.
_JSON_ARRAY_RE = re.compile(r"\[\s*\{.*\}\s*\]", re.DOTALL)


def _is_nested_tree(lt: dict) -> bool:
    """Return True if *lt* is a dict whose ``children`` entry is a list."""
    return isinstance(lt, dict) and isinstance(lt.get("children"), list)


def _logic_tree_to_text(lt: dict) -> str:
    """Convert logic_tree JSON to readable text for conflict detection.

    Supports both the nested-tree format (a ``children`` list) and the legacy
    flat-nodes format.  Anything that is not a dict renders as an empty
    string instead of raising.
    """
    if not isinstance(lt, dict):
        return ""
    if _is_nested_tree(lt):
        return _nested_tree_to_text(lt)
    return _flat_tree_to_text(lt)


def _nested_tree_to_text(tree: dict) -> str:
    """Convert a nested flowchart tree to indented, line-oriented text.

    Each node becomes ``[<label>] <id>: <name>``.  For ``decision`` nodes every
    child is expected to be ``{"condition": ..., "node": {...}}``; a branch
    line is emitted and the target node is rendered two levels deeper.  For
    all other node types the children are rendered one level deeper.
    Malformed children (non-dicts, branches without a dict ``node``) are
    skipped rather than aborting the whole rendering.
    """
    lines: list[str] = []

    def _walk(node: dict, indent: int = 0) -> None:
        prefix = " " * indent
        ntype = node.get("type", "")
        label = _NESTED_TYPE_LABELS.get(ntype, ntype)
        lines.append(f"{prefix}[{label}] {node.get('id', '')}: {node.get('name', '')}")

        # ``or []`` also covers an explicit ``"children": null``.
        children = node.get("children") or []
        if ntype == "decision":
            for branch in children:
                if not isinstance(branch, dict):
                    continue
                cond = branch.get("condition", "")
                lines.append(f"{prefix} 分支 \"{cond}\":")
                target = branch.get("node")
                if isinstance(target, dict):
                    _walk(target, indent + 2)
        else:
            for child in children:
                if isinstance(child, dict):
                    _walk(child, indent + 1)

    _walk(tree)
    return "\n".join(lines)


def _flat_tree_to_text(lt: dict) -> str:
    """Convert a legacy flat-nodes logic_tree to readable text.

    Emits an optional root line, then one line per node in ``nodes``.
    ``decision`` nodes additionally list their ``branches``.  Node types other
    than decision/action/state/start/end produce no output.
    """
    lines: list[str] = []
    root = lt.get("root", "")
    if root:
        lines.append(f"根节点: {root}")

    for node in lt.get("nodes") or []:
        if not isinstance(node, dict):
            continue
        nid = node.get("id", "")
        ntype = node.get("type", "")
        if ntype == "decision":
            cond = node.get("condition", "")
            lines.append(f"判断节点 {nid}: 条件=\"{cond}\"")
            for b in node.get("branches") or []:
                if isinstance(b, dict):
                    lines.append(f" - 分支 \"{b.get('value', '')}\" → {b.get('target', '')}")
        elif ntype in _FLAT_NODE_LABELS:
            label = _FLAT_NODE_LABELS[ntype]
            lines.append(f"{label} {nid}: {node.get('description', '')}")
    return "\n".join(lines)


def _build_text_for_section(sections: list[dict], section_name: str) -> str:
    """Build a single text block for the given section name.

    Collects every ``para`` and ``table`` block of all sections whose
    ``source`` equals *section_name*.  Tables are rendered one row per line as
    ``name: text`` pairs joined with ``" | "``.  Blocks are separated by a
    blank line.  Missing keys are treated as empty values.
    """
    texts: list[str] = []
    for sec in sections:
        if sec.get("source", "") != section_name:
            continue
        for blk in sec.get("blocks") or []:
            kind = blk.get("type")
            if kind == "para":
                texts.append(blk.get("text") or "")
            elif kind == "table":
                table_lines = [f"表格 {blk.get('table', '')}:"]
                for row_no, row in enumerate(blk.get("rows") or [], start=1):
                    cols = row.get("columns") or []
                    parts = [f"{c.get('name', '')}: {c.get('text', '')}" for c in cols]
                    table_lines.append(f" 行{row_no}: {' | '.join(parts)}")
                texts.append("\n".join(table_lines))
    return "\n\n".join(texts)


def _parse_conflict_json(content: str) -> list[dict]:
    """Extract the conflict list from an LLM response.

    Returns an empty list when the response contains the ``[[NO_CONFLICT]]``
    marker, is empty, is not valid JSON, or is valid JSON but not an array.
    Markdown code fences around the payload are removed first.  Array items
    that are not JSON objects are dropped, because callers add keys to every
    returned item.
    """
    stripped = (content or "").strip()
    if "[[NO_CONFLICT]]" in stripped:
        return []

    # Keep only what sits between the first opening fence and the next fence.
    # A ```json fence takes precedence over a bare ``` fence.
    for opener in ("```json", "```"):
        if opener in stripped:
            stripped = stripped.split(opener, 1)[1].split("```", 1)[0]
            break
    stripped = stripped.strip()
    if not stripped:
        return []

    # Tolerate prose around the array by cutting out the array itself.
    match = _JSON_ARRAY_RE.search(stripped)
    if match:
        stripped = match.group()

    try:
        parsed = json.loads(stripped)
    except json.JSONDecodeError as e:
        logger.warning("Failed to parse conflict JSON: %s", e)
        logger.debug("Raw content: %s", stripped)
        return []

    if not isinstance(parsed, list):
        return []
    conflicts = [item for item in parsed if isinstance(item, dict)]
    if len(conflicts) != len(parsed):
        logger.warning("Dropped %d non-object conflict entries",
                       len(parsed) - len(conflicts))
    return conflicts


def detect_conflicts(
    parsed_path: str,
    output_dir: str | None = None,
    *,
    dry_run: bool = False,
) -> list[dict]:
    """Load ``_parsed.json`` and detect image-vs-text conflicts.

    Args:
        parsed_path: Path to the ``*_parsed.json`` file to analyse.
        output_dir: Directory for ``<basename>_conflicts.json``; defaults to
            the directory containing *parsed_path*.
        dry_run: When true, log the prompt that would be sent for every
            eligible image, make no LLM request and write no output file.

    Returns:
        A flat list of conflict dicts (always empty in a dry run).  Unless
        *dry_run* is set, the same list is written to
        ``<basename>_conflicts.json``.

    Raises:
        OSError: If *parsed_path* cannot be read or the output cannot be
            written.
        json.JSONDecodeError: If *parsed_path* is not valid JSON.
    """
    with open(parsed_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    basename = os.path.splitext(os.path.basename(parsed_path))[0].removesuffix("_parsed")
    if output_dir is None:
        output_dir = os.path.dirname(os.path.abspath(parsed_path))
    conflicts_path = os.path.join(output_dir, f"{basename}_conflicts.json")
    if not dry_run:
        # Create the directory up front so a bad path fails before any
        # LLM request is paid for.
        os.makedirs(output_dir, exist_ok=True)

    sections = data.get("sections", [])
    image_sources = data.get("image_sources", {})
    image_analysis = data.get("image_analysis", [])

    llm = LLMClient()
    all_conflicts: list[dict] = []
    requests_sent = 0

    # ---- For each diagram image, compare with its section text -------------
    for img in image_analysis:
        img_type = img.get("type", "other")
        rid = img.get("rid", "")
        description = (img.get("description") or "").strip()
        logic_tree = img.get("logic_tree_nested") or img.get("logic_tree")

        if img_type not in DIAGRAM_TYPES or (not description and not logic_tree):
            logger.info("Skip conflict check: rid=%s type=%s", rid, img_type)
            continue

        # Find source section
        src = image_sources.get(rid) or {}
        section_name = src.get("section", "")
        if not section_name:
            logger.warning("No section found for rid=%s, skipping", rid)
            continue

        # Build text from the same section
        text_content = _build_text_for_section(sections, section_name)
        text_len = len(text_content.strip())
        if text_len < MIN_TEXT_CHARS:
            logger.info("Section text too short (%d chars) for rid=%s, skip",
                        text_len, rid)
            continue

        # Enrich description with logic_tree if it renders to anything
        lt_text = _logic_tree_to_text(logic_tree) if logic_tree else ""
        if lt_text and description:
            combined_desc = f"[结构化逻辑树]\n{lt_text}\n\n[文字描述]\n{description}"
        elif lt_text:
            combined_desc = f"[结构化逻辑树]\n{lt_text}"
        else:
            combined_desc = description
        if not combined_desc:
            logger.info("Skip conflict check: rid=%s type=%s", rid, img_type)
            continue

        logger.info("Checking conflicts: rid=%s section=%s (desc=%d chars, text=%d chars)",
                    rid, section_name, len(description), text_len)

        prompt = PROMPT_DETECT_CONFLICT.format(
            image_description=combined_desc,
            text_description=text_content,
            section_name=section_name,
        )

        if dry_run:
            logger.info(" [DRY RUN] would call LLM to detect conflicts")
            logger.info(" [DRY RUN] prompt:\n%s", prompt)
            continue

        # Space out requests: wait before every request except the first, so
        # the delay also applies after a failed request and never trails the
        # last one.
        if requests_sent:
            time.sleep(RATE_LIMIT_DELAY)
        requests_sent += 1

        try:
            raw = llm.chat(
                model=LLMClient.TEXT_MODEL,
                messages=[{"role": "user", "content": prompt}],
            )
        except RuntimeError as e:
            logger.error("Conflict check failed: %s", e)
            continue
        logger.info("Conflict check response: %d chars", len(raw or ""))

        conflicts = _parse_conflict_json(raw)

        # Enrich with location info
        for c in conflicts:
            c["rid"] = rid
            c["image_path"] = img.get("path", "")
            c.setdefault("section", section_name)
            if src.get("table"):
                c.setdefault("source_location", {})["table"] = src["table"]
            if src.get("row"):
                c.setdefault("source_location", {})["image_row"] = src["row"]

        all_conflicts.extend(conflicts)
        logger.info(" Found %d conflicts for rid=%s", len(conflicts), rid)

    # ---- Save ---------------------------------------------------------------
    if dry_run:
        # Never replace results of an earlier real run with an empty list.
        logger.info("[DRY RUN] not writing %s", conflicts_path)
    else:
        with open(conflicts_path, "w", encoding="utf-8") as f:
            json.dump(all_conflicts, f, ensure_ascii=False, indent=2)
        logger.info("Saved: %s (%d conflicts)", conflicts_path, len(all_conflicts))

    # ---- Summary ------------------------------------------------------------
    usg = llm.usage
    logger.info("Tokens: %d prompt + %d completion = %d total",
                usg["prompt_tokens"], usg["completion_tokens"], usg["total_tokens"])

    return all_conflicts


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main(argv: list[str] | None = None) -> None:
    """Parse command-line arguments and run :func:`detect_conflicts`.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Detect image-vs-text conflicts in parsed document.",
    )
    parser.add_argument("input", metavar="parsed.json",
                        help="Path to _parsed.json from doc_parser")
    parser.add_argument("--output-dir", metavar="DIR", default=None,
                        help="Output directory (default: same as input)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print LLM prompts without calling the API.")
    args = parser.parse_args(argv)

    detect_conflicts(args.input, output_dir=args.output_dir, dry_run=args.dry_run)


if __name__ == "__main__":
    main()