#!/usr/bin/env python3
"""Verify flowchart logic trees for structural correctness and consistency.

Usage::

    python verify_flowchart.py FILE [--llm] [--output-report REPORT.md]

Performs three levels of checks:

1. **Structural validation** — tree integrity, node uniqueness, leaf types
2. **Path extraction** — renders all root-to-leaf paths as readable text
3. **LLM consistency check** (opt-in with ``--llm``) — compares extracted
   paths against the original text description for logical inconsistencies

Outputs PASS/FAIL and a detailed report.
"""

import argparse
import json
import logging
import os
import sys

# Make the sibling project modules importable when run as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from image_parser import ImageParser
from LLM import LLMClient

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Prompt for LLM path-vs-description consistency check
# ---------------------------------------------------------------------------

# NOTE: this template is passed through ``str.format``; literal JSON braces
# are therefore doubled (``{{`` / ``}}``).
PROMPT_VERIFY_PATHS = """你是一个流程图审核专家。以下内容来自同一张流程图的解析结果:

## 流程图路径(从嵌套逻辑树提取的所有根到叶路径)
```
{paths_text}
```

## 原始文字描述
```
{description}
```

## 你的任务
逐条检查每条路径是否与文字描述一致。重点关注:

1. **分支方向错误**:路径中的判断分支走向是否与文字描述矛盾?
   例如:文字说"满足条件后退出",但路径中"是"分支走向了"不受限"。
2. **缺失步骤**:路径中是否缺少文字描述中提到的关键步骤?
3. **冗余步骤**:路径中是否包含文字描述未提及的多余步骤?
4. **条件颠倒**:判断条件的"是/否"分支是否与文字描述相反?

## 输出格式
如果**所有路径一致**,只输出:
```
[[PATHS_CONSISTENT]]
```

如果**发现不一致**,输出 JSON 数组:
```json
[
  {{
    "path_index": 1,
    "issue_type": "branch_error|missing_step|redundant_step|condition_reversed",
    "severity": "high|medium|low",
    "description": "用中文说明具体问题"
  }}
]
```

注意:输出必须是严格合法的 JSON 数组,不要有尾随逗号,不要包含代码块包裹符号。
"""


# ---------------------------------------------------------------------------
# Core verification logic
# ---------------------------------------------------------------------------

def verify_parsed_json(parsed_path: str, *, use_llm: bool = False) -> dict:
    """Load a ``_parsed.json`` file and verify all flowchart logic trees.

    Only entries of ``image_analysis`` whose ``type`` is ``"flowchart"`` are
    checked.

    Args:
        parsed_path: Path to the JSON file to load.
        use_llm: When true, an ``LLMClient`` is created and each flowchart
            is additionally checked against its text description.

    Returns:
        A report dict with keys:

        - ``total_flowcharts``: int
        - ``passed``: int
        - ``failed``: int
        - ``results``: list of per-flowchart results
    """
    with open(parsed_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # ``or []`` tolerates an explicit ``"image_analysis": null``.
    image_analysis = data.get("image_analysis") or []
    flowcharts = [
        img for img in image_analysis
        if isinstance(img, dict) and img.get("type") == "flowchart"
    ]

    report = {
        "total_flowcharts": len(flowcharts),
        "passed": 0,
        "failed": 0,
        "results": [],
    }

    llm = LLMClient() if use_llm else None

    for img in flowcharts:
        rid = img.get("rid", "unknown")
        logger.info("Verifying flowchart: rid=%s", rid)
        result = _verify_single(img, llm)
        report["results"].append(result)
        # The LLM verdict only counts when the LLM check was requested.
        if result["structural_ok"] and (not use_llm or result.get("llm_ok", True)):
            report["passed"] += 1
        else:
            report["failed"] += 1

    return report


def verify_flowchart_file(filepath: str, *, use_llm: bool = False) -> dict:
    """Load a standalone flowchart JSON file and verify it.

    The file content is treated as a nested logic tree. A standalone file
    carries no text description, so the LLM consistency check has nothing
    to compare against and is skipped even when ``use_llm`` is true.

    Args:
        filepath: Path to the flowchart JSON file.
        use_llm: Accepted for symmetry with :func:`verify_parsed_json`.

    Returns:
        A report dict with the same shape as :func:`verify_parsed_json`.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        tree = json.load(f)

    img = {
        "logic_tree_nested": tree,
        "description": "",
        "rid": os.path.basename(filepath),
    }
    llm = LLMClient() if use_llm else None
    if use_llm:
        logger.warning(
            "LLM check skipped for %s: standalone files have no description",
            filepath,
        )
    result = _verify_single(img, llm)

    # Same pass criterion as verify_parsed_json, so both entry points agree.
    passed = bool(
        result["structural_ok"] and (not use_llm or result.get("llm_ok", True))
    )
    return {
        "total_flowcharts": 1,
        "passed": 1 if passed else 0,
        "failed": 0 if passed else 1,
        "results": [result],
    }


def _verify_single(img: dict, llm: LLMClient | None) -> dict:
    """Verify a single flowchart image analysis entry.

    Args:
        img: Entry with optional keys ``rid``, ``description`` and
            ``logic_tree_nested`` / ``logic_tree``.
        llm: Client used for the consistency check, or ``None`` to skip it.

    Returns:
        A result dict with keys ``rid``, ``structural_ok``, ``errors``,
        ``paths_text``, ``llm_ok`` and ``llm_issues``.
    """
    rid = img.get("rid", "unknown")
    # ``or ""`` tolerates an explicit ``"description": null``.
    description = (img.get("description") or "").strip()

    # Try nested format first, fall back to flat format
    tree = img.get("logic_tree_nested") or img.get("logic_tree")
    if not isinstance(tree, dict):
        reason = (
            "No logic_tree found" if tree is None
            else "logic_tree is not a JSON object"
        )
        return {
            "rid": rid,
            "structural_ok": False,
            "errors": [reason],
            "paths_text": "",
            "llm_ok": True,
            "llm_issues": [],
        }

    # Check if it's the new nested format or old flat format
    is_nested = isinstance(tree.get("children"), list)

    # --- Level 1: Structural validation ---
    structural_ok = True
    errors: list[str] = []
    paths: list = []

    if is_nested:
        ok, err = ImageParser._validate_flowchart(tree)
        if not ok:
            structural_ok = False
            errors.append(f"Structure: {err}")

        # Extract paths
        paths = ImageParser.extract_paths(tree)
        paths_text = ImageParser.paths_to_text(paths)
        # Informational entry; it is reported alongside real errors.
        errors.append(f"Path count: {len(paths)}")
    else:
        # Old flat format — basic check
        nodes = tree.get("nodes", [])
        ids = [n.get("id", "") for n in nodes]
        if len(ids) != len(set(ids)):
            structural_ok = False
            errors.append("Structure: duplicate node ids in flat format")

        # Build simple path-like text for flat format
        paths_text = _flat_to_text(tree)

    # --- Level 2: Path count sanity check ---
    if is_nested and len(paths) == 0:
        structural_ok = False
        errors.append("No paths extracted from tree")

    # --- Level 3: LLM consistency check ---
    llm_issues: list[dict] = []
    llm_ok = True

    if llm and description and paths_text:
        prompt = PROMPT_VERIFY_PATHS.format(
            paths_text=paths_text,
            description=description,
        )
        try:
            raw = llm.chat(
                model=LLMClient.TEXT_MODEL,
                messages=[{"role": "user", "content": prompt}],
            )
            llm_issues = _parse_llm_issues(raw)
            if llm_issues:
                llm_ok = False
                errors.append(f"LLM found {len(llm_issues)} issue(s)")
        except RuntimeError as e:
            # Best effort: a failed LLM call is reported but does not flip
            # llm_ok, so it never fails the flowchart by itself.
            errors.append(f"LLM check failed: {e}")

    return {
        "rid": rid,
        "structural_ok": structural_ok,
        "errors": errors,
        "paths_text": paths_text,
        "llm_ok": llm_ok,
        "llm_issues": llm_issues,
    }


def _flat_to_text(tree: dict) -> str:
    """Build path-like text from an old flat-format ``logic_tree``.

    Tracing starts at every node of type ``"start"`` and follows the
    ``branches`` of ``"decision"`` nodes. Each traced path becomes one
    output line, including paths that end in a cycle (``[循环]``) or in a
    reference to an unknown node id (``[缺失]``).

    Args:
        tree: Flat logic tree with optional ``nodes`` and ``root`` keys.

    Returns:
        Newline-joined text whose first line names the root.
    """
    nodes = tree.get("nodes", [])
    root = tree.get("root", "")
    lines = [f"Root: {root}"]
    # Nodes without an id cannot be referenced; skip them instead of raising.
    node_map = {n["id"]: n for n in nodes if isinstance(n, dict) and "id" in n}

    def _emit(path: list[str]) -> None:
        lines.append(" -> ".join(path))

    def _trace(node_id: str, visited: frozenset, path: list[str]) -> None:
        if node_id in visited:
            # Emit the path so the cycle is visible in the report.
            _emit(path + [f"[循环] {node_id}"])
            return
        node = node_map.get(node_id)
        if node is None:
            # Dangling reference: emit the path so it is not silently lost.
            _emit(path + [f"[缺失] {node_id}"])
            return

        ntype = node.get("type", "")
        if ntype == "decision":
            cond = node.get("condition", "")
            branches = node.get("branches") or []
            if not branches:
                # A decision without branches is a dead end; still show it.
                _emit(path + [f"[判断] {cond}"])
                return
            # Each branch gets its own visited set so sibling branches may
            # legitimately converge on the same node.
            seen = visited | {node_id}
            for b in branches:
                val = b.get("value", "")
                tgt = b.get("target", "")
                _trace(tgt, seen, path + [f"[判断] {cond} → {val}"])
        elif ntype == "end":
            _emit(path + [f"[结束] {node.get('description', '')}"])
        else:
            # Flat format doesn't have explicit children for non-decision
            # nodes so we can't trace further
            _emit(path + [f"[{ntype}] {node.get('description', '')}"])

    # Try to find start nodes
    starts = [n for n in nodes if isinstance(n, dict) and n.get("type") == "start"]
    if starts:
        for s in starts:
            _trace(s.get("id", ""), frozenset(), [])
    else:
        lines.append("(Cannot trace: no start node in flat format)")

    return "\n".join(lines)


def _parse_llm_issues(content: str) -> list[dict]:
    """Parse an LLM response for path consistency issues.

    Args:
        content: Raw response text.

    Returns:
        The list of issue dicts. Empty when the response contains the
        ``[[PATHS_CONSISTENT]]`` marker, is blank, or cannot be parsed as a
        JSON array. Array items that are not JSON objects are dropped.
    """
    stripped = (content or "").strip()
    if "[[PATHS_CONSISTENT]]" in stripped:
        return []

    # Remove markdown code fences
    if "```json" in stripped:
        stripped = stripped.split("```json", 1)[1].split("```", 1)[0]
    elif "```" in stripped:
        stripped = stripped.split("```", 1)[1].split("```", 1)[0]
    stripped = stripped.strip()
    if not stripped:
        return []

    candidates = [stripped]
    # Fallback: the array may be surrounded by explanatory prose.
    start, end = stripped.find("["), stripped.rfind("]")
    if 0 <= start < end:
        candidates.append(stripped[start:end + 1])

    for candidate in candidates:
        try:
            issues = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(issues, list):
            # Consumers call ``.get`` on every issue; keep only objects.
            return [issue for issue in issues if isinstance(issue, dict)]
        return []

    # An unparseable answer is treated as "no issues"; make that visible.
    logger.warning("Failed to parse LLM issues: %s", stripped[:200])
    return []


# ---------------------------------------------------------------------------
# Report rendering
# ---------------------------------------------------------------------------


def print_report(report: dict) -> str:
    """Print a human-readable verification report and return it as a string.

    Args:
        report: Dict with ``total_flowcharts``, ``passed``, ``failed`` and
            ``results``. Each result needs ``rid`` and ``structural_ok``;
            ``llm_ok``, ``errors``, ``paths_text`` and ``llm_issues`` are
            optional.

    Returns:
        The full report text that was printed to stdout.
    """
    lines: list[str] = []
    lines.append("=" * 60)
    lines.append("流程图校验报告")
    lines.append("=" * 60)
    lines.append(f"流程图总数: {report['total_flowcharts']}")
    lines.append(f"通过: {report['passed']}")
    lines.append(f"失败: {report['failed']}")
    overall = "PASS" if report["failed"] == 0 else "FAIL"
    lines.append(f"总体结果: {overall}")
    lines.append("")

    for i, r in enumerate(report["results"], 1):
        rid = r["rid"]
        # A flowchart with LLM-detected issues is counted as failed in the
        # summary, so its own status line must say FAIL as well.
        passed = r["structural_ok"] and r.get("llm_ok", True)
        status = "[PASS]" if passed else "[FAIL]"
        lines.append(f"[{i}] rid={rid} {status}")

        for err in r.get("errors", []):
            lines.append(f" - {err}")

        if r.get("paths_text"):
            lines.append(" 路径:")
            for path_line in r["paths_text"].split("\n"):
                lines.append(f" {path_line}")

        llm_issues = r.get("llm_issues", [])
        if llm_issues:
            lines.append(" LLM发现的问题:")
            for issue in llm_issues:
                if not isinstance(issue, dict):
                    # Malformed item from the LLM: show it verbatim rather
                    # than crash on ``.get``.
                    lines.append(f" [?] {issue}")
                    continue
                lines.append(
                    f" [{issue.get('severity', '?')}] {issue.get('description', '')}"
                )

        lines.append("")

    report_text = "\n".join(lines)
    print(report_text)
    return report_text


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main() -> None:
    """Parse CLI arguments, run the verification and print the report.

    Exits with status 1 when at least one flowchart failed.
    """
    parser = argparse.ArgumentParser(
        description="Verify flowchart logic trees for correctness.",
    )
    parser.add_argument(
        "input",
        metavar="FILE",
        help="Path to _parsed.json or standalone flowchart JSON",
    )
    parser.add_argument(
        "--llm",
        action="store_true",
        help="Run LLM consistency check (compares paths against text description)",
    )
    parser.add_argument(
        "--output-report",
        metavar="PATH",
        help="Save verification report to a file",
    )
    args = parser.parse_args()

    # Determine input type
    with open(args.input, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Only a JSON object can carry the "image_analysis" key; anything else
    # is handed to the standalone-flowchart path.
    if isinstance(data, dict) and "image_analysis" in data:
        report = verify_parsed_json(args.input, use_llm=args.llm)
    else:
        report = verify_flowchart_file(args.input, use_llm=args.llm)

    report_text = print_report(report)

    if args.output_report:
        with open(args.output_report, "w", encoding="utf-8") as f:
            f.write(report_text)
        logger.info("Report saved: %s", args.output_report)

    # Exit code: 0 for PASS, 1 for FAIL
    if report["failed"] > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()