Files
document_analyzer/skills/doc_parser_skill/scripts/image_parser.py
pzhang_zywl fec4c09ee0
CI / test (push) Successful in 8s
sync: update all skills from latest workspace code
doc_parser_skill:
- New: verify_flowchart.py (flowchart validation)
- Updated: LLM.py (multi-provider: DeepSeek + DashScope)
- Updated: image_parser.py (logic tree support, external prompts)
- Updated: SKILL.md, prompts/image_prompt.md

conflict_detection_skill:
- Updated: LLM.py (multi-provider sync)
- Updated: detect_conflicts.py (logic tree text conversion)

ir_generation_skill:
- Replaced old scripts/LLM.py + ir_generator.py with standalone project
- New: main.py, config.py, step1-3_*.py, ensemble_merge.py
- New: prompts/, tests/ subdirectories

tests:
- New: acceptance/ test suite with schema validation
- Fixed: conftest no longer globally skips non-acceptance tests
- Updated: test_sample.py for new ir_generation structure

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-30 22:45:08 +08:00

411 lines
15 KiB
Python

import base64
import json
import logging
import os
import re
from typing import Optional
from LLM import LLMClient
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Prompt loading
# ---------------------------------------------------------------------------
def _load_prompt(prompt_path: Optional[str] = None) -> str:
    """Return the image-analysis prompt, preferring the external prompt file.

    Args:
        prompt_path: Path of the prompt file to read. Defaults to
            ``../prompts/image_prompt.md`` relative to this module.

    Returns:
        The file content when the file can be read and is not blank;
        otherwise the built-in prompt (nested tree format).

    This function runs at import time (see ``PROMPT_IMAGE`` below), so it
    never raises for a missing, unreadable, undecodable or empty prompt
    file -- it falls back to the built-in prompt instead.
    """
    if prompt_path is None:
        prompt_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "prompts")
        prompt_path = os.path.join(prompt_dir, "image_prompt.md")

    # EAFP: open directly instead of isfile()+open(), which is race-prone and
    # lets read/decoding errors escape at import time.
    try:
        with open(prompt_path, "r", encoding="utf-8") as f:
            external = f.read()
    except FileNotFoundError:
        # The external file is optional; its absence is the expected case.
        external = ""
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning(
            "Could not read prompt file %s (%s); using built-in prompt", prompt_path, exc
        )
        external = ""

    # A blank file would otherwise send an empty instruction to the model.
    if external.strip():
        return external

    # Fallback inline prompt (nested tree format)
    return """请分析这张图片,判断类型并输出文字描述和(如适用)结构化逻辑树。
## 判断图片类型
如果是 **流程图 / 架构图 / 状态图 / 时序图 / 活动图**,你需要输出三项内容:
1. 类型标签
2. **嵌套逻辑树 JSON**(见下方格式)
3. 文字描述
如果是 **其他类型**(UI原型图 / 界面截图 / 设计稿 / 手机屏幕截图 / 网页截图等),只输出类型标签和简要文字描述。
## 嵌套逻辑树 JSON 格式(仅流程图/架构图/状态图/时序图/活动图需要)
**核心原则:用嵌套的 `children` 数组表达流程的层级关系,而不是用 id 引用。**
节点类型:`start`(起始), `end`(结束), `process`(处理/状态), `decision`(判断), `action`(动作)
非判断节点的 `children` 是子节点数组。`end` 节点无 `children`。
判断节点的 `children` 格式:
```json
{"condition": "", "node": {"id": "n6", "name": "...", "type": "action", "children": [...]}}
```
每条从根到 `end` 的路径必须是完整逻辑链。decision 必须穷举所有分支。
节点 id 使用 "n1", "n2", "n3"... 格式。
## 输出格式
type: <flowchart|architecture|state|sequence|activity|other>
logic_tree:
{...}
description:
该图片的详细文字描述。"""


# Loaded once at import time; ImageParser.parse_image sends this text with every image.
PROMPT_IMAGE = _load_prompt()
# ---------------------------------------------------------------------------
# ImageParser
# ---------------------------------------------------------------------------
class ImageParser:
    """Vision LLM wrapper for parsing images (type + description + logic_tree).

    The model is asked (via ``PROMPT_IMAGE``) to answer with a ``type:`` line,
    an optional ``logic_tree:`` JSON block and a ``description:`` section.
    The nested-tree ``logic_tree`` is stored alongside a backward-compatible
    flat representation so downstream consumers are not broken.

    Nested tree shape (as described by the prompt): every node is a dict with
    ``id``, ``name``, ``type`` and, except for ``end`` nodes, ``children``.
    Children of a ``decision`` node are ``{"condition": ..., "node": {...}}``
    wrappers; children of any other node are plain node dicts.

    Usage::

        parser = ImageParser()
        result = parser.parse_image("images/img1.png")
    """

    _VALID_TYPES = {"flowchart", "architecture", "state", "sequence", "activity", "other"}

    def __init__(self, llm: Optional["LLMClient"] = None):
        """Create a parser.

        Args:
            llm: Client used for the vision request. A new ``LLMClient`` is
                created when omitted.
        """
        # Compare against None explicitly so a caller-supplied client is never
        # replaced just because it happens to evaluate as falsy.
        self._llm = llm if llm is not None else LLMClient()

    @property
    def usage(self) -> dict:
        """Return the ``usage`` attribute of the underlying LLM client."""
        return self._llm.usage

    def parse_image(self, image_path: str) -> Optional[dict]:
        """Parse an image and return its type, description, and optional logic_tree.

        Args:
            image_path: Path of the image file to send to the vision model.

        Returns:
            ``{type, description, [logic_tree], [logic_tree_nested]}``;
            ``{type: "other", description: "", error}`` when the LLM call
            raises ``RuntimeError``; ``None`` when the response cannot be
            parsed at all.

        Raises:
            OSError: If ``image_path`` cannot be opened or read.
        """
        logger.info("Parsing image: %s", image_path)
        with open(image_path, "rb") as f:
            img_b64 = base64.b64encode(f.read()).decode()
        mime = self._mime_type(image_path)
        try:
            content = self._llm.chat(
                model=LLMClient.IMAGE_MODEL,
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{img_b64}"}},
                        {"type": "text", "text": PROMPT_IMAGE},
                    ],
                }],
            )
        except RuntimeError as e:
            logger.error(str(e))
            return {"type": "other", "description": "", "error": str(e)}

        parsed = self._parse_response(content)
        if parsed is None:
            return None
        ptype, description, logic_tree_nested = parsed
        result: dict = {"type": ptype, "description": description}
        if logic_tree_nested is not None:
            result["logic_tree_nested"] = logic_tree_nested
            result["logic_tree"] = self._flatten_tree(logic_tree_nested)
        return result

    # ---- internals ----------------------------------------------------------

    def _parse_response(self, content: str) -> Optional[tuple[str, str, Optional[dict]]]:
        """Extract ``(type, description, logic_tree_nested)`` from LLM response.

        Parses the nested-tree format. Returns *None* for unparseable content
        (anything that is not a string). A string without recognisable
        markers yields type ``"other"`` with the remaining text as the
        description.
        """
        if not isinstance(content, str):
            return None
        content = content.strip()
        parsed_type = "other"
        logic_tree = None
        json_end = None  # index in ``content`` just past the decoded JSON object

        # --- type ---
        # Accept an ASCII or full-width colon and skip markdown decoration
        # (``**type:** flowchart``, ``type: `flowchart```) around the value.
        type_match = re.search(r'(?:type|类型)[::][\s*`]*(\S+)', content)
        if type_match:
            type_val = type_match.group(1).strip("<>`*\"'.,;").lower()
            if type_val in self._VALID_TYPES:
                parsed_type = type_val

        # --- logic_tree (anchored at line start) ---
        lt_match = re.search(r'(?m)^logic_tree:\s*', content)
        desc_match = re.search(r'(?m)^description:\s*', content)
        if lt_match:
            lt_start = lt_match.end()
            if desc_match and desc_match.start() > lt_start:
                lt_end = desc_match.start()
            else:
                lt_end = len(content)
            lt_block = content[lt_start:lt_end]
            decoded = self._decode_object(lt_block)
            if decoded is not None:
                logic_tree, rel_end = decoded
                json_end = lt_start + rel_end
                # Validation is advisory: an invalid tree is logged but kept.
                is_valid, err_msg = self._validate_flowchart(logic_tree)
                if not is_valid:
                    logger.warning("Flowchart validation warning: %s", err_msg)
            else:
                lt_raw = lt_block.strip()
                logger.info("Failed to extract logic_tree JSON. Raw block length=%d", len(lt_raw))
                logger.debug("Raw logic_tree block: %s", lt_raw[:500])
        elif parsed_type != "other":
            logger.info("Diagram type=%s but no logic_tree: in response. Response length=%d",
                        parsed_type, len(content))
            logger.debug("Raw response (first 500): %s", content[:500])

        # --- description ---
        if desc_match:
            desc_start = desc_match.end()
            # If the model put the description before the logic tree, stop at
            # the ``logic_tree:`` label so the JSON is not part of the text.
            if lt_match and lt_match.start() >= desc_start:
                desc_end = lt_match.start()
            else:
                desc_end = len(content)
            description = content[desc_start:desc_end].strip()
        else:
            body_start = type_match.end() if type_match else 0
            if lt_match and json_end is not None and lt_match.start() >= body_start:
                # Cut out exactly the label plus the decoded JSON object (a
                # non-greedy ``\{.*?\}`` would stop at the first ``}`` of a
                # nested tree), then drop a closing code fence if present.
                tail = re.sub(r'\A\s*```', '', content[json_end:])
                description = (content[body_start:lt_match.start()] + tail).strip()
            else:
                description = content[body_start:].strip()
        return parsed_type, description, logic_tree

    @staticmethod
    def _child_links(node: dict) -> list[tuple[str, dict]]:
        """Return ``(condition, child_node)`` pairs for the usable children of *node*.

        For a ``decision`` node each child must be a dict wrapping a dict under
        ``"node"``; the pair carries its ``"condition"`` (``""`` if absent).
        For any other node each dict child is returned with condition ``""``.
        Malformed entries (non-list ``children``, non-dict child, missing or
        non-dict ``"node"``) are skipped rather than raising.
        """
        children = node.get("children")
        if not isinstance(children, list):
            return []
        is_decision = node.get("type") == "decision"
        links: list[tuple[str, dict]] = []
        for child in children:
            if not isinstance(child, dict):
                continue
            if is_decision:
                target = child.get("node")
                if isinstance(target, dict):
                    links.append((child.get("condition", ""), target))
            else:
                links.append(("", child))
        return links

    @staticmethod
    def _validate_flowchart(tree: dict) -> tuple[bool, str]:
        """Validate a nested flowchart tree structure.

        Returns ``(is_valid, error_message)``; the message is ``""`` when the
        tree is valid and describes the first problem found otherwise. The
        check never raises for malformed input. It is non-fatal for the
        caller: ``_parse_response`` logs the message and keeps the tree.
        """
        if not isinstance(tree, dict):
            return False, "logic_tree is not a dict"
        seen_ids: set[str] = set()

        def _walk(node: dict, depth: int = 0) -> tuple[bool, str]:
            if depth > 20:
                return False, f"Tree too deep (>20) at node {node.get('id', '?')}"
            nid = node.get("id", "")
            if not nid:
                return False, "Node missing 'id' field"
            if not isinstance(nid, str):
                return False, f"Node id must be string, got {type(nid).__name__}"
            if nid in seen_ids:
                return False, f"Duplicate node id: {nid}"
            seen_ids.add(nid)

            ntype = node.get("type", "")
            if ntype not in ("start", "end", "process", "decision", "action"):
                return False, f"Unknown node type '{ntype}' at {nid}"
            if ntype == "end":
                if "children" in node:
                    return False, f"End node {nid} should not have children"
                return True, ""

            # From here on the node is not an ``end`` node and must branch on.
            children = node.get("children")
            if not children:
                return False, f"Non-end node {nid} ({ntype}) has no children"
            if not isinstance(children, list):
                return False, f"children of {nid} is not a list"

            is_decision = ntype == "decision"
            for child in children:
                if not isinstance(child, dict):
                    if is_decision:
                        return False, f"decision child of {nid} is not a dict"
                    return False, f"child of {nid} is not a dict"
                if is_decision:
                    if "condition" not in child:
                        return False, f"decision child of {nid} missing 'condition'"
                    if "node" not in child:
                        return False, f"decision child of {nid} missing 'node'"
                    target = child["node"]
                    # Guard before recursing: _walk calls .get() on the node.
                    if not isinstance(target, dict):
                        return False, f"decision child of {nid} has a non-dict 'node'"
                else:
                    target = child
                ok, err = _walk(target, depth + 1)
                if not ok:
                    return False, err
            return True, ""

        return _walk(tree)

    @staticmethod
    def _flatten_tree(tree: dict) -> dict:
        """Convert a nested flowchart tree into the legacy flat-nodes format.

        This preserves backward compatibility with downstream consumers
        (conflict_detection_skill, ir_generator) that expect the flat format.

        Returns ``{"root": <name>, "nodes": [...]}`` where ``root`` is the
        name of the first visited node that has a ``children`` key. Ordering:
        ``start``/``action``/``process``/``state`` nodes appear in pre-order,
        a ``decision`` node appears after the subtrees of its branches, and
        all ``end`` nodes come last. Nodes of any other type are omitted
        together with their non-end descendants.

        Because validation is only advisory, the tree may be malformed;
        unusable children are skipped instead of raising.
        """
        if not isinstance(tree, dict):
            return {"root": "", "nodes": []}

        nodes: list[dict] = []
        root_name = ""

        def _collect(node: dict) -> None:
            nonlocal root_name
            nid = node.get("id", "")
            ntype = node.get("type", "")
            name = node.get("name", "")
            if root_name == "" and "children" in node:
                root_name = name
            if ntype == "decision":
                branches = []
                for condition, target in ImageParser._child_links(node):
                    branches.append({"value": condition, "target": target.get("id", "")})
                    _collect(target)
                nodes.append({
                    "id": nid,
                    "type": ntype,
                    "condition": name,
                    "branches": branches,
                })
            elif ntype in ("action", "process", "state", "start"):
                nodes.append({
                    "id": nid,
                    "type": ntype,
                    "description": name,
                })
                for _, child in ImageParser._child_links(node):
                    _collect(child)
            # end nodes are gathered by the second pass below

        _collect(tree)

        # Add end nodes from the nested tree
        ends: list[dict] = []

        def _collect_ends(node: dict) -> None:
            if node.get("type") == "end":
                ends.append({
                    "id": node.get("id", ""),
                    "type": "end",
                    "description": node.get("name", ""),
                })
                return
            children = node.get("children")
            if not isinstance(children, list):
                return
            for child in children:
                if not isinstance(child, dict):
                    continue
                # Unwrap {"condition", "node"} branch wrappers wherever they occur.
                target = child["node"] if "node" in child else child
                if isinstance(target, dict):
                    _collect_ends(target)

        _collect_ends(tree)
        nodes.extend(ends)
        return {"root": root_name, "nodes": nodes}

    @staticmethod
    def extract_paths(tree: dict) -> list[list[dict]]:
        """Extract all root-to-leaf paths from a nested flowchart tree.

        Each path is a list of node dicts (each with id, name, type).
        A path ends at an ``end`` node or at any node without usable
        children; malformed children are skipped instead of raising.
        Returns a list of paths useful for human review and LLM verification.
        """
        if not isinstance(tree, dict):
            return []

        paths: list[list[dict]] = []

        def _walk(node: dict, current_path: list[dict]) -> None:
            entry = {
                "id": node.get("id", ""),
                "name": node.get("name", ""),
                "type": node.get("type", ""),
            }
            new_path = [*current_path, entry]
            links = ImageParser._child_links(node)
            if node.get("type") == "end" or not links:
                paths.append(new_path)
                return
            for _, child in links:
                _walk(child, new_path)

        _walk(tree, [])
        return paths

    @staticmethod
    def paths_to_text(paths: list[list[dict]]) -> str:
        """Render extracted paths as human-readable text for review.

        Produces one line per path, ``路径 <i>: [label] name -> ...``, where
        the label is ``判断`` for decision nodes, ``结束`` for end nodes and
        the raw node type otherwise.
        """
        lines: list[str] = []
        for i, path in enumerate(paths, 1):
            steps = []
            for node in path:
                ntype = node.get("type", "")
                if ntype == "decision":
                    label = "判断"
                elif ntype == "end":
                    label = "结束"
                else:
                    label = ntype
                steps.append(f"[{label}] {node.get('name', '')}")
            lines.append(f"路径 {i}: {' -> '.join(steps)}")
        return "\n".join(lines)

    @staticmethod
    def _decode_object(text: str, start: int = 0) -> Optional[tuple[dict, int]]:
        """Decode the JSON object that begins at the first ``{`` at or after *start*.

        Returns ``(obj, end)`` where ``end`` is the index in *text* just past
        the object, or ``None`` when there is no ``{`` or the text from that
        brace on does not begin with valid JSON. Text after the object is
        ignored, and braces inside JSON strings are handled by the decoder.
        """
        brace = text.find("{", start)
        if brace < 0:
            return None
        try:
            obj, length = json.JSONDecoder().raw_decode(text[brace:])
        except json.JSONDecodeError:
            return None
        return obj, brace + length

    @staticmethod
    def _extract_json(text: str) -> Optional[dict]:
        """Extract the first JSON object embedded in *text*.

        Surrounding noise such as code fences or trailing prose is ignored.
        Returns the parsed dict or None.
        """
        if not isinstance(text, str):
            return None
        decoded = ImageParser._decode_object(text)
        return decoded[0] if decoded is not None else None

    @staticmethod
    def _mime_type(image_path: str) -> str:
        """Map the file extension of *image_path* to a MIME type.

        The lookup is case-insensitive; unknown or missing extensions fall
        back to ``image/png``.
        """
        ext = os.path.splitext(image_path)[1].lstrip(".").lower()
        return {
            "png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg",
            "gif": "image/gif", "bmp": "image/bmp",
            "webp": "image/webp", "svg": "image/svg+xml",
            "tiff": "image/tiff", "tif": "image/tiff",
        }.get(ext, "image/png")