fec4c09ee0
CI / test (push) Successful in 8s
doc_parser_skill: - New: verify_flowchart.py (flowchart validation) - Updated: LLM.py (multi-provider: DeepSeek + DashScope) - Updated: image_parser.py (logic tree support, external prompts) - Updated: SKILL.md, prompts/image_prompt.md conflict_detection_skill: - Updated: LLM.py (multi-provider sync) - Updated: detect_conflicts.py (logic tree text conversion) ir_generation_skill: - Replaced old scripts/LLM.py + ir_generator.py with standalone project - New: main.py, config.py, step1-3_*.py, ensemble_merge.py - New: prompts/, tests/ subdirectories tests: - New: acceptance/ test suite with schema validation - Fixed: conftest no longer globally skips non-acceptance tests - Updated: test_sample.py for new ir_generation structure Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
362 lines
12 KiB
Python
362 lines
12 KiB
Python
#!/usr/bin/env python3
"""Detect logical conflicts between image analysis and text in ``_parsed.json``.

Usage::

    python scripts/detect_conflicts.py D:/projects/jike/output/车机娱乐系统禁止功能文档_精简_parsed.json [--output-dir DIR] [--dry-run]

For each diagram-type image (flowchart, architecture, state, sequence, activity),
the script locates its section via *image_sources*, grabs the corresponding text
blocks, and calls an LLM to find contradictions, condition mismatches, and scope
mismatches between the image description and the text.

Output: ``<basename>_conflicts.json``
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from LLM import LLMClient
|
|
|
|
# Configure the root logger at import time: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)

# Seconds handed to time.sleep() to space out consecutive LLM requests.
RATE_LIMIT_DELAY = 0.5
# Image types that are conflict-checked; images of any other type are skipped.
DIAGRAM_TYPES = {"flowchart", "architecture", "state", "sequence", "activity"}
# A section whose stripped text is shorter than this many characters is skipped.
MIN_TEXT_CHARS = 20
|
|
|
|
PROMPT_DETECT_CONFLICT = """你是一个文档一致性检查专家。以下内容来自同一份需求文档的同一个章节,包含两部分:
|
|
|
|
## 部分1:图片(流程图/架构图/状态图)的描述
|
|
```
|
|
{image_description}
|
|
```
|
|
|
|
## 部分2:同章节的文字描述
|
|
```
|
|
{text_description}
|
|
```
|
|
|
|
## 你的任务
|
|
检查这两部分之间是否存在**逻辑矛盾或条件不一致**。
|
|
|
|
你需要关注的冲突类型:
|
|
|
|
1. **condition_mismatch**(条件不一致):两者描述了同一规则,但触发条件、阈值、时序不同。
|
|
例如:图片说"车速≥15km/h且持续5秒",文字说"车速≥10km/h且持续3秒"。
|
|
例如:图片说"非P档限制",文字说"车速>0限制"。
|
|
|
|
2. **contradiction**(直接矛盾):两者对同一事物的描述完全相反。
|
|
例如:图片说"功能X被禁止",文字说"功能X可用"。
|
|
例如:图片说"开关默认关闭",文字说"开关默认开启"。
|
|
|
|
3. **scope_mismatch**(范围不一致):两者描述的场景/地域/设备范围不同。
|
|
例如:图片说"国内方案",文字说"海外方案"。
|
|
例如:图片说"CSD中控屏",文字描述包含"PSD副驾屏"。
|
|
|
|
## 输出格式
|
|
|
|
如果**没有冲突**,只输出:
|
|
```
|
|
[[NO_CONFLICT]]
|
|
```
|
|
|
|
如果**有冲突**,输出以下JSON数组(不要任何其他文字):
|
|
|
|
```json
|
|
[
|
|
{{
|
|
"conflict_type": "condition_mismatch",
|
|
"severity": "high",
|
|
"section": "{section_name}",
|
|
"image_snippet": "图片中描述的关键内容(摘录)",
|
|
"text_snippet": "文字中描述的关键内容(摘录)",
|
|
"description": "用中文说明冲突的具体差异"
|
|
}}
|
|
]
|
|
```
|
|
|
|
注意:
|
|
- 每个冲突一个条目,不要合并
|
|
- severity: high(功能正确性受影响)| medium(边界条件模糊)| low(表达方式差异)
|
|
- 输出必须是**严格合法的JSON数组**,不要有尾随逗号
|
|
- 如果没有严格冲突,输出 [[NO_CONFLICT]]
|
|
"""
|
|
|
|
|
|
def _is_nested_tree(lt: dict) -> bool:
|
|
"""Return True if logic_tree uses the nested children format."""
|
|
return isinstance(lt.get("children"), list)
|
|
|
|
|
|
def _logic_tree_to_text(lt: dict) -> str:
    """Render a ``logic_tree`` JSON object as readable text for the LLM prompt.

    Dispatches on the tree layout: trees recognised by ``_is_nested_tree`` go
    to ``_nested_tree_to_text``; everything else is treated as the legacy
    flat-nodes layout and goes to ``_flat_tree_to_text``.
    """
    render = _nested_tree_to_text if _is_nested_tree(lt) else _flat_tree_to_text
    return render(lt)
|
|
|
|
|
|
def _nested_tree_to_text(tree: dict) -> str:
|
|
"""Convert a nested flowchart tree to readable text."""
|
|
lines: list[str] = []
|
|
|
|
def _walk(node: dict, indent: int = 0):
|
|
prefix = " " * indent
|
|
nid = node.get("id", "")
|
|
name = node.get("name", "")
|
|
ntype = node.get("type", "")
|
|
|
|
type_label = {
|
|
"start": "起始", "end": "结束", "process": "处理",
|
|
"decision": "判断", "action": "动作",
|
|
}.get(ntype, ntype)
|
|
|
|
lines.append(f"{prefix}[{type_label}] {nid}: {name}")
|
|
|
|
if ntype == "decision":
|
|
for child in node.get("children", []):
|
|
cond = child.get("condition", "")
|
|
lines.append(f"{prefix} 分支 \"{cond}\":")
|
|
_walk(child["node"], indent + 2)
|
|
elif "children" in node:
|
|
for child in node.get("children", []):
|
|
_walk(child, indent + 1)
|
|
|
|
_walk(tree)
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _flat_tree_to_text(lt: dict) -> str:
|
|
"""Convert legacy flat-nodes logic_tree to readable text."""
|
|
lines: list[str] = []
|
|
root = lt.get("root", "")
|
|
if root:
|
|
lines.append(f"根节点: {root}")
|
|
for node in lt.get("nodes", []):
|
|
nid = node.get("id", "")
|
|
ntype = node.get("type", "")
|
|
if ntype == "decision":
|
|
cond = node.get("condition", "")
|
|
branches = node.get("branches", [])
|
|
lines.append(f"判断节点 {nid}: 条件=\"{cond}\"")
|
|
for b in branches:
|
|
lines.append(f" - 分支 \"{b.get('value', '')}\" → {b.get('target', '')}")
|
|
elif ntype == "action":
|
|
lines.append(f"动作节点 {nid}: {node.get('description', '')}")
|
|
elif ntype == "state":
|
|
lines.append(f"状态节点 {nid}: {node.get('description', '')}")
|
|
elif ntype == "start":
|
|
lines.append(f"起始节点 {nid}: {node.get('description', '')}")
|
|
elif ntype == "end":
|
|
lines.append(f"结束节点 {nid}: {node.get('description', '')}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_text_for_section(sections: list[dict], section_name: str) -> str:
|
|
"""Build a single text block for the given section name."""
|
|
texts: list[str] = []
|
|
for sec in sections:
|
|
if sec.get("source", "") == section_name:
|
|
for blk in sec.get("blocks", []):
|
|
if blk["type"] == "para":
|
|
texts.append(blk["text"])
|
|
elif blk["type"] == "table":
|
|
table_lines = [f"表格 {blk['table']}:"]
|
|
for ri, row in enumerate(blk.get("rows", [])):
|
|
cols = row.get("columns", [])
|
|
parts = [f"{c['name']}: {c['text']}" for c in cols]
|
|
table_lines.append(f" 行{ri + 1}: {' | '.join(parts)}")
|
|
texts.append("\n".join(table_lines))
|
|
return "\n\n".join(texts)
|
|
|
|
|
|
def _parse_conflict_json(content: str) -> list[dict]:
|
|
"""Extract JSON array from LLM response, handling markdown fences."""
|
|
stripped = content.strip()
|
|
|
|
if "[[NO_CONFLICT]]" in stripped:
|
|
return []
|
|
|
|
# Remove markdown code fences
|
|
if "```json" in stripped:
|
|
stripped = stripped.split("```json", 1)[1]
|
|
if "```" in stripped:
|
|
stripped = stripped.split("```", 1)[0]
|
|
elif "```" in stripped:
|
|
stripped = stripped.split("```", 1)[1]
|
|
if "```" in stripped:
|
|
stripped = stripped.split("```", 1)[0]
|
|
|
|
stripped = stripped.strip()
|
|
if not stripped:
|
|
return []
|
|
|
|
# Try to find a JSON array
|
|
match = re.search(r"\[\s*\{.*\}\s*\]", stripped, re.DOTALL)
|
|
if match:
|
|
stripped = match.group()
|
|
|
|
try:
|
|
conflicts = json.loads(stripped)
|
|
if isinstance(conflicts, list):
|
|
return conflicts
|
|
return []
|
|
except json.JSONDecodeError as e:
|
|
logger.warning("Failed to parse conflict JSON: %s", e)
|
|
logger.debug("Raw content: %s", stripped)
|
|
return []
|
|
|
|
|
|
def detect_conflicts(
    parsed_path: str,
    output_dir: str | None = None,
    *,
    dry_run: bool = False,
) -> list[dict]:
    """Load ``_parsed.json`` and detect image-vs-text conflicts.

    For every ``image_analysis`` entry whose type is in ``DIAGRAM_TYPES`` and
    that has a description and/or a logic tree, the section named in
    ``image_sources[rid]["section"]`` is looked up, its text is assembled, and
    an LLM is asked to list conflicts between the two.  Images are skipped
    when no section is recorded or when the section text is shorter than
    ``MIN_TEXT_CHARS``.  Each returned conflict is tagged with ``rid``,
    ``image_path``, ``section`` and, when available, a ``source_location``.

    Args:
        parsed_path: Path to the ``*_parsed.json`` file.
        output_dir: Directory for the result file; defaults to the directory
            of *parsed_path*.  It is created if missing.
        dry_run: When true, log which images would be checked but make no LLM
            call; no LLM client is constructed in this mode.

    Returns:
        A flat list of conflict dicts.  The same list is written to
        ``<basename>_conflicts.json`` in *output_dir*.

    Raises:
        OSError: If the input cannot be read or the output cannot be written.
        json.JSONDecodeError: If the input file is not valid JSON.
    """
    with open(parsed_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    basename = os.path.splitext(os.path.basename(parsed_path))[0].removesuffix("_parsed")

    if output_dir is None:
        output_dir = os.path.dirname(os.path.abspath(parsed_path))
    os.makedirs(output_dir, exist_ok=True)

    sections = data.get("sections", [])
    image_sources = data.get("image_sources", {})
    image_analysis = data.get("image_analysis", [])

    # A dry run never talks to the API, so it should not need a client at all.
    llm = None if dry_run else LLMClient()
    all_conflicts: list[dict] = []
    llm_calls = 0

    # ---- For each diagram image, compare with its section text -------------
    for img in image_analysis:
        img_type = img.get("type", "other")
        rid = img.get("rid", "")
        # ``description`` may be null in the JSON; treat that like an empty string.
        description = (img.get("description") or "").strip()
        logic_tree = img.get("logic_tree_nested") or img.get("logic_tree")
        if not isinstance(logic_tree, dict):
            # The tree converters call ``.get`` on it; ignore any other shape.
            logic_tree = None

        if img_type not in DIAGRAM_TYPES or (not description and not logic_tree):
            logger.info("Skip conflict check: rid=%s type=%s", rid, img_type)
            continue

        # Find source section
        src = image_sources.get(rid) or {}
        section_name = src.get("section", "")
        if not section_name:
            logger.warning("No section found for rid=%s, skipping", rid)
            continue

        # Build text from the same section
        text_content = _build_text_for_section(sections, section_name)
        text_len = len(text_content.strip())
        if text_len < MIN_TEXT_CHARS:
            logger.info("Section text too short (%d chars) for rid=%s, skip", text_len, rid)
            continue

        logger.info("Checking conflicts: rid=%s section=%s (desc=%d chars, text=%d chars)",
                    rid, section_name, len(description), text_len)

        if dry_run:
            logger.info(" [DRY RUN] would call LLM to detect conflicts")
            continue

        # Enrich description with logic_tree if available
        combined_desc = description
        if logic_tree:
            lt_text = _logic_tree_to_text(logic_tree)
            if combined_desc:
                combined_desc = f"[结构化逻辑树]\n{lt_text}\n\n[文字描述]\n{combined_desc}"
            else:
                combined_desc = f"[结构化逻辑树]\n{lt_text}"

        prompt = PROMPT_DETECT_CONFLICT.format(
            image_description=combined_desc,
            text_description=text_content,
            section_name=section_name,
        )

        # Pause only *between* requests: never before the first one and never
        # after the last one.
        if llm_calls:
            time.sleep(RATE_LIMIT_DELAY)
        llm_calls += 1

        try:
            raw = llm.chat(
                model=LLMClient.TEXT_MODEL,
                messages=[{"role": "user", "content": prompt}],
            )
        except RuntimeError as e:
            # NOTE(review): presumably LLMClient wraps API failures in
            # RuntimeError - confirm against LLM.py.
            logger.error("Conflict check failed: %s", e)
            continue
        logger.info("Conflict check response: %d chars", len(raw))

        # Guard against non-object items so the enrichment below cannot fail.
        conflicts = [c for c in _parse_conflict_json(raw) if isinstance(c, dict)]

        # Enrich with location info
        for c in conflicts:
            c["rid"] = rid
            c["image_path"] = img.get("path", "")
            c.setdefault("section", section_name)
            # NOTE(review): falsy values (e.g. a row index of 0) are not
            # recorded - confirm whether table/row identifiers can be 0.
            if src.get("table"):
                c.setdefault("source_location", {})["table"] = src["table"]
            if src.get("row"):
                c.setdefault("source_location", {})["image_row"] = src["row"]

        all_conflicts.extend(conflicts)
        logger.info(" Found %d conflicts for rid=%s", len(conflicts), rid)

    # ---- Save ---------------------------------------------------------------
    # NOTE(review): a dry run still writes an empty array here and would
    # overwrite earlier results - confirm that this is intended.
    conflicts_path = os.path.join(output_dir, f"{basename}_conflicts.json")
    with open(conflicts_path, "w", encoding="utf-8") as f:
        json.dump(all_conflicts, f, ensure_ascii=False, indent=2)
    logger.info("Saved: %s (%d conflicts)", conflicts_path, len(all_conflicts))

    # ---- Summary ------------------------------------------------------------
    if llm is not None:
        usg = llm.usage
        logger.info("Tokens: %d prompt + %d completion = %d total",
                    usg["prompt_tokens"], usg["completion_tokens"], usg["total_tokens"])

    return all_conflicts
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main(argv: list[str] | None = None) -> None:
    """Parse command-line arguments and run ``detect_conflicts``.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read ``sys.argv``.
    """
    parser = argparse.ArgumentParser(
        description="Detect image-vs-text conflicts in parsed document.",
    )
    parser.add_argument("input", metavar="parsed.json", help="Path to _parsed.json from doc_parser")
    parser.add_argument("--output-dir", metavar="DIR", default=None,
                        help="Output directory (default: same as input)")
    # The dry-run path only logs which images would be checked; it never
    # builds or prints a prompt, so the help text says exactly that.
    parser.add_argument("--dry-run", action="store_true",
                        help="Log which images would be checked without calling the LLM API.")

    args = parser.parse_args(argv)
    detect_conflicts(args.input, output_dir=args.output_dir, dry_run=args.dry_run)


if __name__ == "__main__":
    main()
|