Initial commit: document_analyzer with CI/CD pipeline
CI / test (push) Successful in 30s

- 4 skill pipeline (doc_parser, conflict_detection, ir_generation, resolution_application)
- CI workflow on push/PR (.gitea/workflows/ci.yml)
- Auto-issue on CI failure (.gitea/workflows/auto-issue.yml)
- Pytest smoke tests (tests/test_sample.py)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-29 20:00:26 +08:00
commit 40567a4fb6
22 changed files with 2898 additions and 0 deletions
+37
View File
@@ -0,0 +1,37 @@
---
name: 冲突检测技能
description: 分析解析后的文档,检测图表类图像与其相应文本描述之间的矛盾和条件不匹配。
---
# 冲突检测技能
## 概述
此技能识别解析文档中文本内容与视觉内容之间的潜在冲突。它特别针对图表类图像(流程图、架构图、状态图、序列图和活动图)并交叉检查其描述与同文档部分的文本内容。
## 功能
该技能:
- 从解析的文档结构中识别图表类图像
- 将图像描述与同一文档部分中的相应文本内容进行交叉引用
- 检测视觉表示和文本表示之间的矛盾和条件不匹配
- 生成包含其位置的已识别冲突的结构化列表
- 专门针对流程图、架构图、状态图、序列图和活动图
## 输入要求
- 解析文档JSON文件的路径(由文档解析技能生成)
- 可选输出目录规范
- 可选试运行标志,在不调用API的情况下预览大语言模型提示
## 输出
该技能生成一个结构化JSON文件,文件名为输入文档的基本名称后跟'_conflicts.json',包含:
- 带有关于差异详情的冲突对象列表
- 标识每个冲突发生位置的节标识符
- 冲突图像和文本内容的片段
- 每个冲突的类型分类(例如,矛盾、条件不匹配)
## 集成点
此技能消耗文档解析技能的输出并为解决方案应用技能提供输入。冲突解决过程通常需要人工输入才能进入下一阶段。
@@ -0,0 +1,105 @@
import logging
import os
import time
from typing import Optional
from openai import OpenAI
logger = logging.getLogger(__name__)
class LLMClient:
"""Low-level OpenAI-compatible LLM client with retry and token tracking.
Usage::
llm = LLMClient()
content = llm.chat("qwen3.5-flash", [{"role": "user", "content": "Hello"}])
print(llm.usage)
"""
IMAGE_MODEL = "qwen3-vl-plus"
TEXT_MODEL = "qwen3.5-flash-2026-02-23"
TIMEOUT = 120
MAX_RETRIES = 3
def __init__(
self,
*,
base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
timeout: int | None = None,
):
key = os.environ.get("DASHSCOPE_API_KEY", "")
if not key:
raise ValueError("DASHSCOPE_API_KEY environment variable is not set.")
self._client = OpenAI(api_key=key, base_url=base_url)
self._timeout = timeout or self.TIMEOUT
self._prompt_tokens = 0
self._completion_tokens = 0
@property
def usage(self) -> dict:
"""Return accumulated token counts as ``{prompt, completion, total}``."""
return {
"prompt_tokens": self._prompt_tokens,
"completion_tokens": self._completion_tokens,
"total_tokens": self._prompt_tokens + self._completion_tokens,
}
@staticmethod
def estimate_tokens(text: str) -> int:
"""Quick token estimate. CJK ≈1.7/token, others ≈3.0/token."""
cjk = sum(1 for c in text if '' <= c <= '鿿' or ' ' <= c <= '')
other = len(text) - cjk
return max(1, int(cjk / 1.7 + other / 3.0))
@staticmethod
def estimate_image_tokens() -> int:
"""Fixed estimate for one vision-model image (~500 tokens)."""
return 500
def chat(
self, model: str, messages: list[dict], *, timeout: int | None = None,
response_format: dict | None = None,
) -> str:
"""Send a chat completion request and return the response content.
Automatically retries on failure and accumulates token usage.
"""
label = f"chat({model})"
def _call():
t0 = time.time()
kwargs = dict(model=model, messages=messages, timeout=timeout or self._timeout)
if response_format is not None:
kwargs["response_format"] = response_format
kwargs["temperature"] = 0
resp = self._client.chat.completions.create(**kwargs)
content = resp.choices[0].message.content
usg = resp.usage
if usg:
self._prompt_tokens += usg.prompt_tokens
self._completion_tokens += usg.completion_tokens
elapsed = time.time() - t0
logger.info("%s: %d chars in %.1fs", label, len(content) if content else 0, elapsed)
if not content:
raise RuntimeError("Empty response from LLM")
return content
return self._retry(_call, label)
def _retry(self, fn, label: str) -> str:
"""Call *fn()* with exponential-backoff retry."""
last_error: Optional[Exception] = None
for attempt in range(self.MAX_RETRIES):
try:
return fn()
except Exception as e:
last_error = e
logger.warning(
"%s error (attempt %d/%d): %s",
label, attempt + 1, self.MAX_RETRIES, e,
)
if attempt < self.MAX_RETRIES - 1:
time.sleep(2 ** attempt)
raise RuntimeError(f"{label}: all retries exhausted") from last_error
@@ -0,0 +1,280 @@
#!/usr/bin/env python3
"""Detect logical conflicts between image analysis and text in ``_parsed.json``.
Usage::
python scripts/detect_conflicts.py D:/projects/jike/output/车机娱乐系统禁止功能文档_精简_parsed.json [--output-dir DIR]
For each diagram-type image (flowchart, architecture, state, sequence, activity),
the script locates its section via *image_sources*, grabs the corresponding text
blocks, and calls an LLM to find contradictions/condition-mismatches between the
image description and the text.
Output: ``<basename>_conflicts.json``
"""
import argparse
import json
import logging
import os
import re
import sys
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from LLM import LLMClient
# Configure root logging once at import time: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# Value handed to time.sleep() to space out successive conflict checks.
RATE_LIMIT_DELAY = 0.5
# Only images whose analysed ``type`` is in this set are compared against text.
DIAGRAM_TYPES = {"flowchart", "architecture", "state", "sequence", "activity"}
# Sections whose stripped text is shorter than this many characters are skipped.
MIN_TEXT_CHARS = 20
# Prompt template filled with str.format(): placeholders are image_description,
# text_description and section_name. Literal JSON braces are doubled ({{ }}) so
# format() leaves them intact. The reply is either the [[NO_CONFLICT]] marker or
# a JSON array, which _parse_conflict_json() decodes.
PROMPT_DETECT_CONFLICT = """你是一个文档一致性检查专家。以下内容来自同一份需求文档的同一个章节,包含两部分:
## 部分1:图片(流程图/架构图/状态图)的描述
```
{image_description}
```
## 部分2:同章节的文字描述
```
{text_description}
```
## 你的任务
检查这两部分之间是否存在**逻辑矛盾或条件不一致**。
你需要关注的冲突类型:
1. **condition_mismatch**(条件不一致):两者描述了同一规则,但触发条件、阈值、时序不同。
例如:图片说"车速≥15km/h且持续5秒",文字说"车速≥10km/h且持续3秒"
例如:图片说"非P档限制",文字说"车速>0限制"
2. **contradiction**(直接矛盾):两者对同一事物的描述完全相反。
例如:图片说"功能X被禁止",文字说"功能X可用"
例如:图片说"开关默认关闭",文字说"开关默认开启"
3. **scope_mismatch**(范围不一致):两者描述的场景/地域/设备范围不同。
例如:图片说"国内方案",文字说"海外方案"
例如:图片说"CSD中控屏",文字描述包含"PSD副驾屏"
## 输出格式
如果**没有冲突**,只输出:
```
[[NO_CONFLICT]]
```
如果**有冲突**,输出以下JSON数组(不要任何其他文字):
```json
[
{{
"conflict_type": "condition_mismatch",
"severity": "high",
"section": "{section_name}",
"image_snippet": "图片中描述的关键内容(摘录)",
"text_snippet": "文字中描述的关键内容(摘录)",
"description": "用中文说明冲突的具体差异"
}}
]
```
注意:
- 每个冲突一个条目,不要合并
- severity: high(功能正确性受影响)| medium(边界条件模糊)| low(表达方式差异)
- 输出必须是**严格合法的JSON数组**,不要有尾随逗号
- 如果没有严格冲突,输出 [[NO_CONFLICT]]
"""
def _build_text_for_section(sections: list[dict], section_name: str) -> str:
    """Build a single text block for the given section name.

    Every section whose ``source`` equals *section_name* contributes its
    blocks in order: ``para`` blocks contribute their text, ``table`` blocks
    are rendered as a ``表格 N:`` heading followed by one numbered line per
    row in ``name: text | name: text`` form. Blocks are joined by a blank line.

    The input comes from a JSON file on disk, so missing keys are tolerated
    rather than raising ``KeyError``; blocks of unknown type are ignored.

    Args:
        sections: The ``sections`` list from ``_parsed.json``.
        section_name: Heading text identifying the wanted section(s).

    Returns:
        The combined text, or ``""`` when nothing matches.
    """
    texts: list[str] = []
    for sec in sections:
        if sec.get("source", "") != section_name:
            continue
        for blk in sec.get("blocks", []):
            kind = blk.get("type")
            if kind == "para":
                text = blk.get("text", "")
                if text:
                    texts.append(text)
            elif kind == "table":
                table_lines = [f"表格 {blk.get('table', '')}:"]
                for ri, row in enumerate(blk.get("rows", [])):
                    parts = [
                        f"{c.get('name', '')}: {c.get('text', '')}"
                        for c in row.get("columns", [])
                    ]
                    table_lines.append(f"{ri + 1}: {' | '.join(parts)}")
                texts.append("\n".join(table_lines))
    return "\n\n".join(texts)
def _parse_conflict_json(content: str) -> list[dict]:
    """Extract the conflict list from an LLM response.

    Handles the ``[[NO_CONFLICT]]`` marker, optional markdown code fences and
    surrounding chatter. Only JSON objects are returned: array items of any
    other type are dropped, because callers add keys to each conflict.

    Args:
        content: Raw text returned by the model.

    Returns:
        A list of conflict dicts; empty when there is no conflict, when the
        payload is not a JSON array, or when it cannot be decoded.
    """
    stripped = content.strip()
    if "[[NO_CONFLICT]]" in stripped:
        return []

    # Keep only what sits inside the first code fence, if there is one.
    if "```json" in stripped:
        stripped = stripped.split("```json", 1)[1].split("```", 1)[0]
    elif "```" in stripped:
        stripped = stripped.split("```", 1)[1].split("```", 1)[0]
    stripped = stripped.strip()
    if not stripped:
        return []

    # Narrow to the outermost "[ {...} ]" span to shed leading/trailing prose.
    match = re.search(r"\[\s*\{.*\}\s*\]", stripped, re.DOTALL)
    if match:
        stripped = match.group()

    try:
        payload = json.loads(stripped)
    except json.JSONDecodeError as e:
        logger.warning("Failed to parse conflict JSON: %s", e)
        logger.debug("Raw content: %s", stripped)
        return []

    if not isinstance(payload, list):
        return []
    return [item for item in payload if isinstance(item, dict)]
def detect_conflicts(
    parsed_path: str,
    output_dir: str | None = None,
    *,
    dry_run: bool = False,
) -> list[dict]:
    """Load ``_parsed.json`` and detect image-vs-text conflicts.

    For every analysed image whose type is in ``DIAGRAM_TYPES`` the text of
    its source section is collected and sent, together with the image
    description, to the text model. Parsed conflicts are enriched with the
    image ``rid``/``path`` and, when known, the table/row location.

    Args:
        parsed_path: Path to the ``_parsed.json`` file.
        output_dir: Where to write the result; defaults to the directory of
            *parsed_path*. Created if missing.
        dry_run: When true, no LLM client is created and no request is sent;
            the output file is still written (with an empty list).

    Returns:
        A flat list of conflict dicts, also written to
        ``<basename>_conflicts.json``.

    Raises:
        ValueError: If a real run is requested without ``DASHSCOPE_API_KEY``.
    """
    with open(parsed_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    basename = os.path.splitext(os.path.basename(parsed_path))[0]
    if basename.endswith("_parsed"):
        basename = basename[:-7]
    if output_dir is None:
        output_dir = os.path.dirname(os.path.abspath(parsed_path))
    os.makedirs(output_dir, exist_ok=True)

    sections = data.get("sections", [])
    image_sources = data.get("image_sources", {})
    image_analysis = data.get("image_analysis", [])

    # Built only for real runs so that --dry-run works without an API key.
    llm = None if dry_run else LLMClient()
    all_conflicts: list[dict] = []
    calls_made = 0

    # ---- For each diagram image, compare with its section text -------------
    for img in image_analysis:
        img_type = img.get("type", "other")
        rid = img.get("rid", "")
        description = (img.get("description") or "").strip()
        if img_type not in DIAGRAM_TYPES or not description:
            logger.info("Skip conflict check: rid=%s type=%s", rid, img_type)
            continue

        # Find source section
        src = image_sources.get(rid, {})
        section_name = src.get("section", "")
        if not section_name:
            logger.warning("No section found for rid=%s, skipping", rid)
            continue

        # Build text from the same section
        text_content = _build_text_for_section(sections, section_name)
        text_len = len(text_content.strip())
        if text_len < MIN_TEXT_CHARS:
            logger.info("Section text too short (%d chars) for rid=%s, skip", text_len, rid)
            continue

        logger.info("Checking conflicts: rid=%s section=%s (desc=%d chars, text=%d chars)",
                    rid, section_name, len(description), text_len)
        if dry_run:
            logger.info("  [DRY RUN] would call LLM to detect conflicts")
            continue

        prompt = PROMPT_DETECT_CONFLICT.format(
            image_description=description,
            text_description=text_content,
            section_name=section_name,
        )

        # Pause *between* requests only: nothing to wait for before the first
        # call or after the last one, and failed calls are spaced out too.
        if calls_made:
            time.sleep(RATE_LIMIT_DELAY)
        calls_made += 1

        try:
            raw = llm.chat(
                model=LLMClient.TEXT_MODEL,
                messages=[{"role": "user", "content": prompt}],
            )
            logger.info("Conflict check response: %d chars", len(raw))
        except RuntimeError as e:
            logger.error("Conflict check failed: %s", e)
            continue

        # Non-dict entries cannot be enriched below, so they are discarded.
        conflicts = [c for c in _parse_conflict_json(raw) if isinstance(c, dict)]

        # Enrich with location info
        for c in conflicts:
            c["rid"] = rid
            c["image_path"] = img.get("path", "")
            if "section" not in c:
                c["section"] = section_name
            if src.get("table"):
                c.setdefault("source_location", {})["table"] = src["table"]
            if src.get("row"):
                c.setdefault("source_location", {})["image_row"] = src["row"]
        all_conflicts.extend(conflicts)
        logger.info("  Found %d conflicts for rid=%s", len(conflicts), rid)

    # ---- Save ---------------------------------------------------------------
    conflicts_path = os.path.join(output_dir, f"{basename}_conflicts.json")
    with open(conflicts_path, "w", encoding="utf-8") as f:
        json.dump(all_conflicts, f, ensure_ascii=False, indent=2)
    logger.info("Saved: %s (%d conflicts)", conflicts_path, len(all_conflicts))

    # ---- Summary ------------------------------------------------------------
    if llm is not None:
        usg = llm.usage
    else:
        usg = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
    logger.info("Tokens: %d prompt + %d completion = %d total",
                usg["prompt_tokens"], usg["completion_tokens"], usg["total_tokens"])
    return all_conflicts
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point: one positional input plus two optional flags.
    cli = argparse.ArgumentParser(
        description="Detect image-vs-text conflicts in parsed document.",
    )
    cli.add_argument(
        "input",
        metavar="parsed.json",
        help="Path to _parsed.json from doc_parser",
    )
    cli.add_argument(
        "--output-dir",
        metavar="DIR",
        default=None,
        help="Output directory (default: same as input)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Print LLM prompts without calling the API.",
    )
    opts = cli.parse_args()
    detect_conflicts(opts.input, output_dir=opts.output_dir, dry_run=opts.dry_run)
+36
View File
@@ -0,0 +1,36 @@
---
name: 文档解析技能
description: 解析文档(.docx, .pdf)以提取图像和文本结构,并使用视觉大语言模型分析每个图像的类型和描述。
---
# 文档解析技能
## 概述
此技能从文档(.docx, .pdf)中提取内容并准备进行进一步分析。它提取文本内容和嵌入图像,并对图像执行初始分析以了解其类型和内容。
## 功能
该技能:
- 从文档中提取文本结构(段落、表格、标题)
- 识别并提取嵌入的图像
- 使用视觉大语言模型分析每个图像并确定其类型和内容描述
- 生成结构化输出,将图像映射到其在文档中的位置
- 创建文档的初始解析表示,供后续处理阶段使用
## 输入要求
- 文档文件路径(必需,支持.docx和.pdf格式)
- 可选输出目录(默认为'output/')
- 可选试运行标志,在不调用API的情况下预览大语言模型提示
## 输出
该技能生成一个结构化JSON文件,文件名为输入文档的基本名称后跟'_parsed.json',包含:
- `sections`:按标题分组的文档文本结构
- `image_sources`:从图像标识符到其在文档中位置的映射
- `image_analysis`:由视觉大语言模型确定的每个图像的类型和内容描述
## 集成点
此技能作为文档分析管道中的初始处理步骤。其输出被冲突检测技能消费以识别文本和视觉内容之间的差异。
+105
View File
@@ -0,0 +1,105 @@
import logging
import os
import time
from typing import Optional
from openai import OpenAI
logger = logging.getLogger(__name__)
class LLMClient:
"""Low-level OpenAI-compatible LLM client with retry and token tracking.
Usage::
llm = LLMClient()
content = llm.chat("qwen3.5-flash", [{"role": "user", "content": "Hello"}])
print(llm.usage)
"""
IMAGE_MODEL = "qwen3-vl-plus"
TEXT_MODEL = "qwen3.5-flash-2026-02-23"
TIMEOUT = 120
MAX_RETRIES = 3
def __init__(
self,
*,
base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
timeout: int | None = None,
):
key = os.environ.get("DASHSCOPE_API_KEY", "")
if not key:
raise ValueError("DASHSCOPE_API_KEY environment variable is not set.")
self._client = OpenAI(api_key=key, base_url=base_url)
self._timeout = timeout or self.TIMEOUT
self._prompt_tokens = 0
self._completion_tokens = 0
@property
def usage(self) -> dict:
"""Return accumulated token counts as ``{prompt, completion, total}``."""
return {
"prompt_tokens": self._prompt_tokens,
"completion_tokens": self._completion_tokens,
"total_tokens": self._prompt_tokens + self._completion_tokens,
}
@staticmethod
def estimate_tokens(text: str) -> int:
"""Quick token estimate. CJK ≈1.7/token, others ≈3.0/token."""
cjk = sum(1 for c in text if '' <= c <= '鿿' or ' ' <= c <= '')
other = len(text) - cjk
return max(1, int(cjk / 1.7 + other / 3.0))
@staticmethod
def estimate_image_tokens() -> int:
"""Fixed estimate for one vision-model image (~500 tokens)."""
return 500
def chat(
self, model: str, messages: list[dict], *, timeout: int | None = None,
response_format: dict | None = None,
) -> str:
"""Send a chat completion request and return the response content.
Automatically retries on failure and accumulates token usage.
"""
label = f"chat({model})"
def _call():
t0 = time.time()
kwargs = dict(model=model, messages=messages, timeout=timeout or self._timeout)
if response_format is not None:
kwargs["response_format"] = response_format
kwargs["temperature"] = 0
resp = self._client.chat.completions.create(**kwargs)
content = resp.choices[0].message.content
usg = resp.usage
if usg:
self._prompt_tokens += usg.prompt_tokens
self._completion_tokens += usg.completion_tokens
elapsed = time.time() - t0
logger.info("%s: %d chars in %.1fs", label, len(content) if content else 0, elapsed)
if not content:
raise RuntimeError("Empty response from LLM")
return content
return self._retry(_call, label)
def _retry(self, fn, label: str) -> str:
"""Call *fn()* with exponential-backoff retry."""
last_error: Optional[Exception] = None
for attempt in range(self.MAX_RETRIES):
try:
return fn()
except Exception as e:
last_error = e
logger.warning(
"%s error (attempt %d/%d): %s",
label, attempt + 1, self.MAX_RETRIES, e,
)
if attempt < self.MAX_RETRIES - 1:
time.sleep(2 ** attempt)
raise RuntimeError(f"{label}: all retries exhausted") from last_error
@@ -0,0 +1,106 @@
import argparse
import json
import logging
import os
import sys
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from image_parser import ImageParser
from LLM import LLMClient
from word_parser import WordParser
# Configure root logging once at import time: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# Value handed to time.sleep() between consecutive images in parse_document().
RATE_LIMIT_DELAY = 0.5
def parse_document(
    docx_path: str,
    output_dir: str = "output",
    *,
    dry_run: bool = False,
) -> dict:
    """Parse a .docx file: extract text structure and parse embedded images.

    Images are written to ``<output_dir>/images`` and each one is analysed by
    the vision model, unless *dry_run* is set, in which case a placeholder
    result is recorded and no client is created (so no API key is needed).

    Args:
        docx_path: Path to the Word document.
        output_dir: Directory for the JSON output and extracted images.
        dry_run: Skip all LLM calls and record ``[DRY RUN]`` descriptions.

    Returns:
        The dict written to ``<basename>_parsed.json``, with keys ``source``,
        ``sections``, ``image_sources`` and ``image_analysis``.
    """
    word = WordParser(docx_path)
    basename = os.path.splitext(os.path.basename(docx_path))[0]
    os.makedirs(output_dir, exist_ok=True)
    images_dir = os.path.join(output_dir, "images")

    # ---- extract sections and images -----------------------------------------
    sections, image_sources = word.extract_sections()
    logger.info("Document has %d sections, %d image sources", len(sections), len(image_sources))

    # ---- parse images ----------------------------------------------------------
    images = word.extract_images(images_dir)
    logger.info("Found %d images in document", len(images))

    image_analysis: list[dict] = []
    if images:
        # The vision client validates the API key on construction, so it is
        # only created when requests will actually be sent.
        vision = None if dry_run else ImageParser()
        for i, img in enumerate(images):
            logger.info("[image %d/%d] rid=%s", i + 1, len(images), img["rid"])
            if dry_run:
                est = LLMClient.estimate_image_tokens()
                logger.info("  [DRY RUN] would call vision LLM (~%d tokens)", est)
                result = {"type": "other", "description": "[DRY RUN]"}
            else:
                result = vision.parse_image(img["path"])
                if result is None:
                    result = {"type": "other", "description": ""}
            result["rid"] = img["rid"]
            result["path"] = img["path"]
            image_analysis.append(result)
            # Rate limiting only matters when requests are really sent.
            if not dry_run and i < len(images) - 1:
                time.sleep(RATE_LIMIT_DELAY)

        if vision is not None:
            usg = vision.usage
        else:
            usg = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
        logger.info("Tokens: %d prompt + %d completion = %d total",
                    usg["prompt_tokens"], usg["completion_tokens"], usg["total_tokens"])
    else:
        logger.info("No images found in document")

    # ---- build output --------------------------------------------------------
    output = {
        "source": os.path.abspath(docx_path),
        "sections": sections,
        "image_sources": image_sources,
        "image_analysis": image_analysis,
    }
    parsed_path = os.path.join(output_dir, f"{basename}_parsed.json")
    with open(parsed_path, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    logger.info("Saved: %s", parsed_path)
    return output
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point: input document, optional output directory, dry-run flag.
    cli = argparse.ArgumentParser(
        description="Parse a .docx file: extract text structure and parse images.",
    )
    cli.add_argument(
        "input",
        metavar="input.docx",
        help="Path to the Word document",
    )
    cli.add_argument(
        "output_dir",
        nargs="?",
        default="output",
        metavar="output_dir",
        help="Directory for output files (default: output/)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Print LLM prompts without calling the API.",
    )
    opts = cli.parse_args()
    parse_document(opts.input, opts.output_dir, dry_run=opts.dry_run)
@@ -0,0 +1,123 @@
import base64
import logging
import os
from typing import Optional
from LLM import LLMClient
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Prompts
# ---------------------------------------------------------------------------
# Text part of the vision request sent by ImageParser.parse_image(). It asks the
# model for a single "type: <label>" line followed by a free-text description;
# ImageParser._parse_type_and_description() splits the reply on that line.
PROMPT_IMAGE = """请分析这张图片,判断类型并输出文字描述。
## 判断图片类型
如果是 **流程图 / 架构图 / 状态图 / 时序图 / 活动图**,详细描述:
- 图中所有节点/步骤/状态/组件的名称
- 所有连线/箭头/转换关系及其方向
- 所有分支条件、判断逻辑和判断结果
- 所有文字标注、注释、标签
- 图的整体结构和逻辑流程
- 如果图片包含多个子图,拆解描述
如果是 **其他类型**(UI原型图 / 界面截图 / 设计稿 / 手机屏幕截图 / 网页截图等),简要描述图片内容。
## 输出格式
**1. 类型标签(单独一行):**
type: <flowchart|architecture|state|sequence|activity|other>
**2. 文字描述:**
该图片的详细文字描述。
不要输出 ---YAML--- 分隔符或 YAML 内容,不要添加任何额外的解释或问候语。"""
# ---------------------------------------------------------------------------
# ImageParser
# ---------------------------------------------------------------------------
class ImageParser:
    """Vision LLM wrapper for parsing images (type + description).

    Usage::

        parser = ImageParser()
        result = parser.parse_image("images/img1.png")
    """

    # Labels accepted from the model; anything else leaves the type as "other".
    _VALID_TYPES = {"flowchart", "architecture", "state", "sequence", "activity", "text"}

    def __init__(self, llm: "LLMClient | None" = None):
        """Wrap *llm*, or create a default ``LLMClient`` when none is given."""
        self._llm = llm or LLMClient()

    @property
    def usage(self) -> dict:
        """Token usage accumulated by the wrapped client."""
        return self._llm.usage

    def parse_image(self, image_path: str) -> Optional[dict]:
        """Parse an image and return its type and description (no YAML IR).

        Args:
            image_path: Path of the image file to send to the vision model.

        Returns:
            ``{type, description}`` on success; ``{type, description, error}``
            with an empty description when the LLM call failed; *None* when
            the model answered with the ``[[UI]]`` marker.
        """
        logger.info("Parsing image: %s", image_path)
        with open(image_path, "rb") as f:
            img_b64 = base64.b64encode(f.read()).decode()
        mime = self._mime_type(image_path)
        try:
            content = self._llm.chat(
                model=LLMClient.IMAGE_MODEL,
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{img_b64}"}},
                        {"type": "text", "text": PROMPT_IMAGE},
                    ],
                }],
            )
        except RuntimeError as e:
            logger.error(str(e))
            return {"type": "other", "description": "", "error": str(e)}

        parsed = self._parse_type_and_description(content)
        if parsed is None:
            return None
        return {"type": parsed[0], "description": parsed[1]}

    # ---- internals ----------------------------------------------------------

    def _parse_type_and_description(self, content: str) -> Optional[tuple[str, str]]:
        """Extract ``(type, description)`` from LLM response.

        A line of the form ``type: <label>`` (or ``类型: <label>``) sets the
        type and is removed from the description. The label match ignores
        case, surrounding markdown emphasis/backticks, angle brackets, and
        accepts a full-width colon. Scanning for the label stops at the first
        recognised value, including an explicit ``other``; label lines with an
        unrecognised value are dropped and scanning continues.

        Returns *None* for ``[[UI]]`` (skip).
        """
        content = content.strip()
        if content.startswith("[[UI]]"):
            return None

        parsed_type = "other"
        type_resolved = False
        desc_lines: list[str] = []
        for line in content.splitlines():
            if not type_resolved:
                # Normalise "**Type:** X", "`type: x`" and "类型:x" alike.
                label = line.strip().strip("*`").strip().replace("\uff1a", ":", 1)
                key, sep, value = label.partition(":")
                if sep and key.strip().lower() in ("type", "类型"):
                    type_val = value.strip().strip("*`<>").strip().lower()
                    if type_val == "other" or type_val in self._VALID_TYPES:
                        parsed_type = type_val
                        type_resolved = True
                    continue
            desc_lines.append(line)
        return parsed_type, "\n".join(desc_lines).strip()

    @staticmethod
    def _mime_type(image_path: str) -> str:
        """Map a file extension to a MIME type; unknown extensions yield ``image/png``."""
        ext = os.path.splitext(image_path)[1].lstrip(".").lower()
        return {
            "png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg",
            "gif": "image/gif", "bmp": "image/bmp",
            "webp": "image/webp", "svg": "image/svg+xml",
            "tiff": "image/tiff", "tif": "image/tiff",
        }.get(ext, "image/png")
@@ -0,0 +1,239 @@
import logging
import os
from docx import Document
from docx.table import Table
from docx.text.paragraph import Paragraph
logger = logging.getLogger(__name__)
# Maps an embedded image part's content type to the file extension used when
# WordParser.extract_images() writes it to disk; types not listed here fall
# back to ".png" there.
IMAGE_EXT = {
    "image/png": ".png",
    "image/jpeg": ".jpg",
    "image/gif": ".gif",
    "image/bmp": ".bmp",
    "image/tiff": ".tiff",
    "image/webp": ".webp",
    "image/x-emf": ".emf",
    "image/x-wmf": ".wmf",
    "image/svg+xml": ".svg",
}
class WordParser:
"""Parse a .docx file — extract images, split body into sections.
Usage::
parser = WordParser("doc.docx")
parser.extract_images("images/")
sections, image_sources = parser.extract_sections()
"""
HEADER_CELL_MAX_LEN = 20 # max chars per cell to treat first row as header
WML_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
DRAW_NS = "http://schemas.openxmlformats.org/drawingml/2006/main"
REL_NS = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
def __init__(self, docx_path: str):
if not os.path.isfile(docx_path):
raise FileNotFoundError(f"Document not found: {docx_path}")
self._doc = Document(docx_path)
# ---- public API ---------------------------------------------------------
def extract_images(self, images_dir: str) -> list[dict]:
"""Save all images to *images_dir*. Returns ``[{rid, path}, ...]``."""
os.makedirs(images_dir, exist_ok=True)
images: list[dict] = []
for rel in self._doc.part.rels.values():
if "image" not in rel.reltype:
continue
ext = IMAGE_EXT.get(rel.target_part.content_type, ".png")
name = f"image_{rel.rId}{ext}"
path = os.path.join(images_dir, name)
with open(path, "wb") as f:
f.write(rel.target_part.blob)
images.append({"rid": rel.rId, "path": path})
return images
def extract_sections(self) -> tuple[list[dict], dict[str, dict]]:
"""Walk document body and split into sections by heading.
Returns:
*sections* — ``[{source, blocks, images}, ...]``
Each block is ``{type, index, text}`` (paragraph) or
``{type, table, headers, rows}`` (table).
*image_sources* — ``rid → {section, table?, row?, column?, name?}``
"""
sections: list[dict] = []
current_source = ""
blocks: list[dict] = []
section_images: list[str] = []
image_sources: dict[str, dict] = {}
para_idx = 0
tbl_idx = 0
for child in self._doc.element.body:
tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag
if tag == "p":
para = Paragraph(child, self._doc)
if self._heading_level(para) is not None:
heading_text = para.text.strip()
if heading_text: # ignore empty heading-like paragraphs
if blocks or section_images:
sections.append({
"source": current_source,
"blocks": blocks,
"images": list(section_images),
})
blocks = []
section_images = []
para_idx = 0
tbl_idx = 0
current_source = heading_text
continue
text = para.text.strip()
# Scan for images — append [[IMAGE:rid]] markers
for run in para.runs:
for rid in self._images_in(run._element):
text += f" [[IMAGE:{rid}]]"
section_images.append(rid)
image_sources[rid] = {"section": current_source}
if text.strip():
blocks.append({"type": "para", "index": para_idx + 1, "text": text.strip()})
para_idx += 1
elif tag == "tbl":
tbl_idx += 1
table = Table(child, self._doc)
# Collect all rows as [[cell_text, ...], ...]
all_rows: list[list[str]] = []
all_images: list[list[list[str]]] = [] # row → col → [rids]
for row in table.rows:
row_texts: list[str] = []
row_cell_images: list[list[str]] = []
for cell in row.cells:
cell_text = cell.text.strip()
cell_imgs: list[str] = []
for cp in cell.paragraphs:
for run in cp.runs:
for rid in self._images_in(run._element):
cell_imgs.append(rid)
# Replace images with markers in text
for rid in cell_imgs:
cell_text += f" [[IMAGE:{rid}]]"
section_images.append(rid)
row_texts.append(cell_text.strip())
row_cell_images.append(cell_imgs)
if any(row_texts) or any(row_cell_images):
all_rows.append(row_texts)
all_images.append(row_cell_images)
if len(all_rows) >= 2:
# Heuristic: first row is a header if every cell is short
first_row = all_rows[0]
has_header = all(len(c) < self.HEADER_CELL_MAX_LEN for c in first_row)
if has_header:
headers = first_row
data_rows_slice = zip(all_rows[1:], all_images[1:])
else:
headers = [f"{ci + 1}" for ci in range(len(first_row))]
data_rows_slice = zip(all_rows, all_images)
data_rows: list[dict] = []
for ri, (row_data, row_imgs) in enumerate(data_rows_slice):
columns: list[dict] = []
max_cols = max(len(headers), len(row_data))
for ci in range(max_cols):
hdr = headers[ci] if ci < len(headers) else ""
txt = row_data[ci] if ci < len(row_data) else ""
columns.append({
"name": hdr,
"row": ri + 1,
"col": ci + 1,
"text": txt,
})
# Register image sources with structured location
imgs = row_imgs[ci] if ci < len(row_imgs) else []
for rid in imgs:
image_sources[rid] = {
"section": current_source,
"table": tbl_idx,
"row": ri + 1,
"column": ci + 1,
"name": hdr,
}
data_rows.append({"columns": columns})
blocks.append({
"type": "table",
"table": tbl_idx,
"headers": headers,
"rows": data_rows,
})
elif all_rows:
# Degenerate table (only header or single row) — treat as plain rows
for ri, row_data in enumerate(all_rows):
row_text = " | ".join(row_data)
if row_text.strip():
blocks.append({
"type": "para",
"index": para_idx + 1,
"text": row_text,
})
para_idx += 1
if blocks or section_images:
sections.append({
"source": current_source,
"blocks": blocks,
"images": list(section_images),
})
return sections, image_sources
# ---- internals ----------------------------------------------------------
def _heading_level(self, para: Paragraph) -> int | None:
"""Heading level 1-9, or *None* if not a heading."""
if para.style and para.style.name:
name = para.style.name
for prefix in ("Heading", "标题"):
if name.startswith(prefix):
try:
return int(name.split()[-1])
except (ValueError, IndexError):
pass
pPr = para._element.find(f"{{{self.WML_NS}}}pPr")
if pPr is not None:
ol = pPr.find(f"{{{self.WML_NS}}}outlineLvl")
if ol is not None:
val = ol.get(f"{{{self.WML_NS}}}val")
if val is not None:
try:
return int(val) + 1
except ValueError:
pass
return None
def _images_in(self, element) -> list[str]:
"""Return rId values for drawings embedded in *element*."""
rids: list[str] = []
for drawing in element.findall(f".//{{{self.WML_NS}}}drawing"):
blip = drawing.find(f".//{{{self.DRAW_NS}}}blip")
if blip is not None:
rid = blip.get(f"{{{self.REL_NS}}}embed")
if rid:
rids.append(rid)
return rids
+46
View File
@@ -0,0 +1,46 @@
---
name: IR生成技能
description: 从处理后的文档生成结构化的JSON中间表示,合并冲突解决方案并维护源可追溯性。
---
# IR生成技能
## 概述
此技能从处理后的文档创建结构化的JSON中间表示(IR)。它在章节级别工作以提供全局上下文,合并冲突解决方案,并维护所有生成内容的源可追溯性。
## 功能
该技能:
- 在章节级别而非小块级别处理文档
- 在生成过程中合并冲突解决方案
- 生成文档内容的结构化JSON表示
- 维护包含章节和位置信息的源可追溯性
- 通过智能拆分大型章节同时保留冲突上下文来处理大型章节
- 确保处理过程中保持全局上下文
## 输入要求
- 更新文档JSON文件的路径(包含应用的解决方案)
- 可选输出目录规范
- 可选试运行标志,在不调用API的情况下预览大语言模型提示
## 输出
该技能生成一个结构化JSON文件,文件名为输入文档的基本名称后跟'_ir.json',包含:
- 文档内容的结构化表示
- 源跟踪信息(章节和位置)
- 转换字段中的清理和解析的JSON数据
- 适合下游处理的格式正确的中间表示
## 处理详情
- 每个完整章节(文本+图像)一起发送给大语言模型以获得全局上下文
- 解决的冲突更正注入到提示中,因此大语言模型使用校正后的值
- 超过约3000个标记的章节会智能拆分,同时在每个块中保留冲突上下文
- 每个IR条目包括source.section + source.location以实现可追溯性
- 处理后的内容以JSON格式结构化,便于机器读取
## 集成点
此技能消耗解决方案应用技能的输出并为文档分析管道生成最终结构化输出。
+105
View File
@@ -0,0 +1,105 @@
import logging
import os
import time
from typing import Optional
from openai import OpenAI
logger = logging.getLogger(__name__)
class LLMClient:
"""Low-level OpenAI-compatible LLM client with retry and token tracking.
Usage::
llm = LLMClient()
content = llm.chat("qwen3.5-flash", [{"role": "user", "content": "Hello"}])
print(llm.usage)
"""
IMAGE_MODEL = "qwen3-vl-plus"
TEXT_MODEL = "qwen3.5-flash-2026-02-23"
TIMEOUT = 120
MAX_RETRIES = 3
def __init__(
self,
*,
base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
timeout: int | None = None,
):
key = os.environ.get("DASHSCOPE_API_KEY", "")
if not key:
raise ValueError("DASHSCOPE_API_KEY environment variable is not set.")
self._client = OpenAI(api_key=key, base_url=base_url)
self._timeout = timeout or self.TIMEOUT
self._prompt_tokens = 0
self._completion_tokens = 0
@property
def usage(self) -> dict:
"""Return accumulated token counts as ``{prompt, completion, total}``."""
return {
"prompt_tokens": self._prompt_tokens,
"completion_tokens": self._completion_tokens,
"total_tokens": self._prompt_tokens + self._completion_tokens,
}
@staticmethod
def estimate_tokens(text: str) -> int:
"""Quick token estimate. CJK ≈1.7/token, others ≈3.0/token."""
cjk = sum(1 for c in text if '' <= c <= '鿿' or ' ' <= c <= '')
other = len(text) - cjk
return max(1, int(cjk / 1.7 + other / 3.0))
@staticmethod
def estimate_image_tokens() -> int:
"""Fixed estimate for one vision-model image (~500 tokens)."""
return 500
def chat(
self, model: str, messages: list[dict], *, timeout: int | None = None,
response_format: dict | None = None,
) -> str:
"""Send a chat completion request and return the response content.
Automatically retries on failure and accumulates token usage.
"""
label = f"chat({model})"
def _call():
t0 = time.time()
kwargs = dict(model=model, messages=messages, timeout=timeout or self._timeout)
if response_format is not None:
kwargs["response_format"] = response_format
kwargs["temperature"] = 0
resp = self._client.chat.completions.create(**kwargs)
content = resp.choices[0].message.content
usg = resp.usage
if usg:
self._prompt_tokens += usg.prompt_tokens
self._completion_tokens += usg.completion_tokens
elapsed = time.time() - t0
logger.info("%s: %d chars in %.1fs", label, len(content) if content else 0, elapsed)
if not content:
raise RuntimeError("Empty response from LLM")
return content
return self._retry(_call, label)
def _retry(self, fn, label: str) -> str:
"""Call *fn()* with exponential-backoff retry."""
last_error: Optional[Exception] = None
for attempt in range(self.MAX_RETRIES):
try:
return fn()
except Exception as e:
last_error = e
logger.warning(
"%s error (attempt %d/%d): %s",
label, attempt + 1, self.MAX_RETRIES, e,
)
if attempt < self.MAX_RETRIES - 1:
time.sleep(2 ** attempt)
raise RuntimeError(f"{label}: all retries exhausted") from last_error
@@ -0,0 +1,359 @@
#!/usr/bin/env python3
"""Generate JSON intermediate representation from ``_parsed.json`` or ``_updated.json``.
Sends the JSON document directly to the LLM for analysis. If the document exceeds
``MAX_ANALYSIS_TOKENS``, sections are batched greedily without splitting any
individual section. Conflict corrections from ``resolved_conflicts`` are included
so the output respects user arbitration decisions.
Usage::
python scripts/ir_generator.py output/<basename>_updated.json [output_dir] [--dry-run]
Output: ``<basename>_ir.json``
"""
import argparse
import json
import logging
import os
import sys
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from LLM import LLMClient
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Seconds to sleep between consecutive batch requests in generate_ir().
RATE_LIMIT_DELAY = 0.5
# NOTE: despite the name, generate_ir() compares this limit against len() of
# the serialized JSON text, i.e. a character count rather than a token count.
MAX_ANALYSIS_TOKENS = 6000 # max content size per LLM call
# ---------------------------------------------------------------------------
# Prompt
# ---------------------------------------------------------------------------
# Prompt template sent to the text model. The ``{conflict_context}`` and
# ``{content}`` placeholders are filled with str.replace() in
# _analyze_content(), not str.format(), so the literal braces of the JSON
# example below need no escaping.
PROMPT = """你是一个需求文档分析助手。请分析以下需求文档的JSON内容,输出结构化JSON。
## 已知修正(来自冲突检测)
以下内容已确认修正,生成JSON时请**使用修正后的值**,不要同时输出两个版本。
{conflict_context}
## 待分析内容(JSON格式)
{content}
## JSON字段说明
- sections: 文档章节列表,每个章节含 source(章节标题)和 blocks(内容块数组)
- blocks: 类型含 para(段落,字段 text)和 table(表格,字段 rows,每行含 columns 数组)
- image_sources: 图片所在章节映射,key 为图片 rid
- image_analysis: 图片分析结果,每个含 rid、type(流程图/架构图/状态图等)、description
- resolved_conflicts: 已知修正列表,每个含 section、conflict_type、correction、source
## 功能点定义
只有满足以下**全部条件**的才视为功能点:
1. 描述了一个**系统或软件要实现的具体行为**(有触发条件、执行动作、状态变化或逻辑规则)
2. 该行为直接由**系统或框架**执行(不是人的操作流程、管理流程)
3. 对用户或系统有**可观察的效果**
**以下内容不是功能点,不要输出:**
- 术语/缩略词定义(
- 文档背景、范围说明(如 "本文档涵盖xxx"
- 变更日志、版本记录、编制人信息
- 文档结构描述(如 "产品简介用户场景说明"
- 纯文本的概述、没有具体行为的介绍
## 决策树/流程图分解规则(重要)
图片分析(image_analysis)中的流程图和决策树描述包含丰富的功能逻辑,**必须完全分解**:
1. **每个叶子路径 = 一个独立 function**:从根节点到每个最终结果的完整路径,都拆成一个 function
2. **每个判断分支 = 一个独立 function**:菱形判断节点的每个分支方向和对应的结果,单独作为一个 function
3. **不同约束条件 = 不同 function**:例如"通过接入SDK限制""通过系统限制"是不同约束机制,必须分别列出
4. **不要合并不同路径**:即使最终结果相同,只要到达路径不同,就是不同的 function
## 输出格式
只输出功能点,每个功能点格式如下:
{
"function": "功能名称",
"source": {
"section": "章节名",
"location": "原文位置(如:正文第1段、表格1第2行、图片rId13)"
},
"trigger": {
"type": "AND或者OR",
"conditions": [
"触发条件1",
"触发条件2"
]
},
"actions": {
"场景/角色": [
"动作1",
"动作2"
]
}
}
## 输出原则
1. **只输出功能点**,没有功能点就输出空数组 []
2. 每个功能点**必须**包含 source.section 和 source.location
3. location 必须是具体的原文位置标签(如 "正文第1段""表格1""图片rId13"
4. **一个 function 只对应一种行为逻辑(一条完整路径)**。决策树中的每个分支路径(从根到叶子)必须拆成独立 function,conditions 中明确写出该路径上的所有判断条件和分支方向。
5. **穷举所有分支**:流程图/决策树中的每一条分支路径都要输出对应的 function,不能遗漏任何子逻辑。
6. 没有 trigger 或 actions 的字段直接**省略**,不要写 null 或空列表/空对象
7. 所有功能点全部列出,**宁多勿漏**
8. **已知修正**中确认的信息,使用修正后的值
9. 输出一个JSON数组,不要用 ```json 代码块包裹,直接输出纯JSON
"""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_llm_response(raw: str) -> list | dict | str | None:
"""Parse JSON from LLM response, handling markdown code fences."""
if raw is None:
return None
stripped = raw.strip()
if stripped.startswith("```"):
nl = stripped.find("\n")
stripped = stripped[nl + 1:] if nl != -1 else stripped[3:]
if stripped.endswith("```"):
stripped = stripped[:-3]
try:
return json.loads(stripped)
except json.JSONDecodeError:
logger.warning(" Failed to parse JSON, returning raw text")
return raw
def _build_conflict_context(
section_name: str | None,
resolved_conflicts: list[dict],
) -> str:
"""Build conflict correction context for a section, or all if section_name is None."""
if section_name is None:
relevant = resolved_conflicts
else:
relevant = [c for c in resolved_conflicts if c.get("section", "") == section_name]
if not relevant:
return "没有"
lines: list[str] = []
for c in relevant:
correction = c.get("correction", "")
conflict_type = c.get("conflict_type", "")
source = c.get("source", "")
lines.append(f"- 冲突类型:{conflict_type},依据:{source}")
lines.append(f" 修正后的值:{correction}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# LLM analysis
# ---------------------------------------------------------------------------
def _entries_from_object(parsed: dict) -> list[dict]:
    """Extract IR entries from a reply whose top-level JSON value is an object."""
    if not parsed:
        # An empty object carries no function points.
        return []
    if "function" in parsed:
        # The object is itself a single function-point entry.
        return [parsed]
    list_values = [value for value in parsed.values() if isinstance(value, list)]
    if len(list_values) == 1 and all(isinstance(item, dict) for item in list_values[0]):
        # Wrapper such as {"functions": [...]}: return the wrapped entries.
        return list_values[0]
    # Unknown shape: keep the object as one entry, as before.
    return [parsed]


def _analyze_content(
    content: str,
    conflict_context: str,
    llm: LLMClient,
    *,
    dry_run: bool = False,
) -> list[dict]:
    """Send content to the LLM and return IR entries.

    Args:
        content: Serialized document (or batch) JSON inserted into the prompt.
        conflict_context: Correction text inserted into the prompt.
        llm: Client used for the chat request and for token estimation.
        dry_run: When true, only log the estimated prompt size and return
            an empty list without calling the API.

    Returns:
        The function-point entries from the reply. A failed request or an
        unparseable reply is logged and yields an empty list.
    """
    prompt = PROMPT.replace("{conflict_context}", conflict_context).replace("{content}", content)
    if dry_run:
        est = llm.estimate_tokens(prompt)
        logger.info(" [DRY RUN] prompt ~%d tokens", est)
        return []

    try:
        raw = llm.chat(
            model=LLMClient.TEXT_MODEL,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        logger.info(" Response: %d chars", len(raw))
    except RuntimeError as e:
        logger.error(" Analysis failed: %s", e)
        return []

    parsed = _parse_llm_response(raw)
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        # JSON-object mode is requested above, so the array the prompt asks
        # for may come back wrapped in an object; unwrap it instead of
        # storing the wrapper as if it were a function point.
        return _entries_from_object(parsed)
    logger.warning(" Unparseable response, raw length: %d", len(raw))
    return []
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _section_chars(sec: dict, image_sources: dict, image_analysis: list[dict]) -> int:
    """Return the serialized size of *sec* plus the image data located in it."""
    chars = len(json.dumps(sec, ensure_ascii=False))
    sec_name = sec.get("source", "")
    rids = [rid for rid, src in image_sources.items() if src.get("section", "") == sec_name]
    if rids:
        overhead = {
            "image_sources": {rid: image_sources[rid] for rid in rids},
            "image_analysis": [img for img in image_analysis if img.get("rid", "") in rids],
        }
        chars += len(json.dumps(overhead, ensure_ascii=False))
    return chars


def _group_sections(sized_sections: list[tuple[dict, int]], limit: int) -> list[list[dict]]:
    """Greedily group consecutive sections so each batch stays within *limit*.

    A section is never split; one that exceeds *limit* on its own becomes a
    single-section batch instead of being discarded.
    """
    batches: list[list[dict]] = []
    current: list[dict] = []
    current_chars = 0
    for sec, chars in sized_sections:
        if current and current_chars + chars > limit:
            batches.append(current)
            current, current_chars = [], 0
        current.append(sec)
        current_chars += chars
    if current:
        batches.append(current)
    return batches


def generate_ir(
    parsed_path: str,
    output_dir: str = "output",
    *,
    dry_run: bool = False,
) -> dict:
    """Read parsed/updated JSON and generate JSON IR.

    Produces ``<basename>_ir.json`` in *output_dir*. A document whose
    serialized size is below ``MAX_ANALYSIS_TOKENS`` characters is analysed
    in one request; otherwise its non-empty sections are batched.

    Args:
        parsed_path: Path to a ``_parsed.json`` or ``_updated.json`` file.
        output_dir: Directory that receives the IR file (created if missing).
        dry_run: Forwarded to the analysis step so no API request is made.

    Returns:
        ``{"ir": <list of entries>, "path": <path of the written file>}``.
    """
    with open(parsed_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    basename = os.path.splitext(os.path.basename(parsed_path))[0]
    for suffix in ("_parsed", "_updated"):
        if basename.endswith(suffix):
            basename = basename[:-len(suffix)]
            break

    # An empty output_dir means the current directory; os.makedirs("") fails.
    os.makedirs(output_dir or ".", exist_ok=True)
    llm = LLMClient()
    ir_output: list[dict] = []

    sections = data.get("sections", [])
    image_sources = data.get("image_sources", {})
    image_analysis = data.get("image_analysis", [])
    resolved_conflicts = data.get("resolved_conflicts", [])

    # Build full document JSON to measure size
    full_doc = {
        "sections": sections,
        "image_sources": image_sources,
        "image_analysis": image_analysis,
    }
    full_json = json.dumps(full_doc, ensure_ascii=False)
    total_chars = len(full_json)
    logger.info("Total document JSON chars: %d", total_chars)

    if total_chars < MAX_ANALYSIS_TOKENS:
        logger.info("Document fits in one request (< %d chars)", MAX_ANALYSIS_TOKENS)
        conflict_ctx = _build_conflict_context(None, resolved_conflicts)
        entries = _analyze_content(full_json, conflict_ctx, llm, dry_run=dry_run)
        ir_output.extend(entries)
    else:
        logger.info("Document is large (>= %d chars), batching sections", MAX_ANALYSIS_TOKENS)
        # Only sections with content are analysed; each is measured together
        # with the image data that belongs to it.
        sized = [
            (sec, _section_chars(sec, image_sources, image_analysis))
            for sec in sections
            if sec.get("blocks")
        ]
        for sec, chars in sized:
            if chars > MAX_ANALYSIS_TOKENS:
                logger.warning(
                    "Section [%s] is %d chars (> %d); sending it as its own batch",
                    sec.get("source", ""), chars, MAX_ANALYSIS_TOKENS,
                )

        # Conflicts whose section label matches no batched section would
        # otherwise never reach the LLM, so they accompany every batch.
        batched_names = {sec.get("source", "") for sec, _ in sized}
        batches = _group_sections(sized, MAX_ANALYSIS_TOKENS)
        for idx, batch in enumerate(batches):
            batch_names = [s.get("source", "") for s in batch]
            batch_doc = {
                "sections": batch,
                "image_sources": {
                    rid: src for rid, src in image_sources.items()
                    if src.get("section", "") in batch_names
                },
                "image_analysis": [
                    img for img in image_analysis
                    if image_sources.get(img.get("rid", ""), {}).get("section", "") in batch_names
                ],
            }
            batch_json = json.dumps(batch_doc, ensure_ascii=False)

            batch_conflicts = [
                c for c in resolved_conflicts
                if c.get("section", "") in batch_names
                or c.get("section", "") not in batched_names
            ]
            conflict_ctx = _build_conflict_context(None, batch_conflicts)

            label = " + ".join(batch_names)
            logger.info("Batch [%s]: %d sections, %d chars", label, len(batch), len(batch_json))
            entries = _analyze_content(batch_json, conflict_ctx, llm, dry_run=dry_run)
            ir_output.extend(entries)
            # Throttle only between real API requests.
            if not dry_run and idx < len(batches) - 1:
                time.sleep(RATE_LIMIT_DELAY)

    # ---- save ----------------------------------------------------------------
    ir_path = os.path.join(output_dir, f"{basename}_ir.json")
    with open(ir_path, "w", encoding="utf-8") as f:
        json.dump(ir_output, f, ensure_ascii=False, indent=2)
    logger.info("Saved: %s (%d entries)", ir_path, len(ir_output))

    # ---- summary -------------------------------------------------------------
    usg = llm.usage
    logger.info("Tokens: %d prompt + %d completion = %d total",
                usg["prompt_tokens"], usg["completion_tokens"], usg["total_tokens"])
    logger.info("Output: %s", ir_path)
    return {"ir": ir_output, "path": ir_path}
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and run the generator once.
    cli = argparse.ArgumentParser(
        description="Generate JSON intermediate representation from parsed/updated JSON.",
    )
    cli.add_argument(
        "input",
        metavar="parsed.json",
        help="Path to _parsed.json or _updated.json",
    )
    cli.add_argument(
        "output_dir",
        nargs="?",
        default="output",
        metavar="output_dir",
        help="Directory for output files (default: output/)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Print token estimates without calling the API.",
    )
    options = cli.parse_args()
    generate_ir(options.input, options.output_dir, dry_run=options.dry_run)
@@ -0,0 +1,49 @@
import argparse
import sys
import yaml
def print_ir(yaml_path: str) -> None:
    """Pretty-print the ``converted`` value of each entry in a YAML list file.

    The file must hold a top-level list; otherwise a message is printed and
    the process exits with status 1. Entries without a ``converted`` value
    are skipped. A line of 60 ``=`` characters follows every printed entry.

    Raises:
        yaml.YAMLError: When a ``converted`` string is not valid YAML; the
            original string is printed before the error propagates.
    """
    with open(yaml_path, "r", encoding="utf-8") as handle:
        entries = yaml.safe_load(handle)

    if not isinstance(entries, list):
        print(f"Expected a YAML list, got {type(entries).__name__}")
        sys.exit(1)

    separator = "=" * 60
    for entry in entries:
        converted = entry.get("converted")
        if converted is None:
            continue

        document = converted
        if isinstance(converted, str):
            # The value is YAML kept as text, possibly inside a markdown
            # code fence: remove the fence, then parse it for re-dumping.
            body = converted.strip()
            if body.startswith("```"):
                newline_at = body.find("\n")
                body = body[3:] if newline_at == -1 else body[newline_at + 1:]
                if body.endswith("```"):
                    body = body[:-3]
            try:
                document = yaml.safe_load(body)
            except yaml.YAMLError:
                print(converted)
                raise

        if isinstance(document, (dict, list)):
            rendered = yaml.dump(
                document,
                allow_unicode=True,
                default_flow_style=False,
                sort_keys=False,
            )
            print(rendered.rstrip())
        else:
            print(str(document))
        print(separator)
if __name__ == "__main__":
    # Command-line entry point: one positional argument naming the YAML file.
    cli = argparse.ArgumentParser(
        description="Pretty-print yaml_content fields from an analysis YAML file.",
    )
    cli.add_argument(
        "input",
        metavar="file.yaml",
        help="Path to a _文字.yaml or _图片.yaml file",
    )
    options = cli.parse_args()
    print_ir(options.input)
@@ -0,0 +1,907 @@
{
"source": "D:\\projects\\jike\\车机娱乐系统禁止功能文档_精简.docx",
"sections": [
{
"source": "XX Auto行车娱乐限制功能PRD _ V1.0",
"blocks": [
{
"type": "para",
"index": 1,
"text": "功能号:SW-55-33-22"
}
],
"images": []
},
{
"source": "编制/变更日志",
"blocks": [
{
"type": "para",
"index": 1,
"text": "包含变更时间、变更内容简述、编制人"
},
{
"type": "table",
"table": 1,
"headers": [
"时间",
"版本号",
"作者",
"主要变更内容"
],
"rows": [
{
"columns": [
{
"name": "时间",
"row": 1,
"col": 1,
"text": "2025-11-11"
},
{
"name": "版本号",
"row": 1,
"col": 2,
"text": "1.0"
},
{
"name": "作者",
"row": 1,
"col": 3,
"text": "A"
},
{
"name": "主要变更内容",
"row": 1,
"col": 4,
"text": "初版撰写"
}
]
}
]
}
],
"images": []
},
{
"source": "1.1 文档背景",
"blocks": [
{
"type": "para",
"index": 1,
"text": "本文应用于XX集团的xx Auto系统行车娱乐限制功能的需求文档,详细描述系统在行车状态下对于视频、游戏等限制的场景分析,应用流程,详细策略设计等。项目实现过程中,请以详细设计为基础进行需求实现,如遇到歧义或变更,请及时沟通。"
}
],
"images": []
},
{
"source": "1.2 文档范围",
"blocks": [
{
"type": "para",
"index": 1,
"text": "当前文档包括了国内xx Auto系统行车娱乐限制功能的产品定义,以及海外车型的平台化系统行车娱乐限制功能的产品定义(仅针对视频、游戏等开车完全不能使用APP所有功能的场景)"
},
{
"type": "para",
"index": 2,
"text": "不包括部分在应用内开发的行车娱乐限制功能的定义,例如图库APP内的视频播放功能、多屏同看、DVR行车记录仪等功能(这些业务属于部分功能需要进行行车娱乐限制,相关定义在各模块的PRD内进行描述)"
}
],
"images": []
},
{
"source": "1.3 术语解释",
"blocks": [
{
"type": "table",
"table": 1,
"headers": [
"术语 / 缩略词",
"说明"
],
"rows": [
{
"columns": [
{
"name": "术语 / 缩略词",
"row": 1,
"col": 1,
"text": "CSD"
},
{
"name": "说明",
"row": 1,
"col": 2,
"text": "中控屏"
}
]
},
{
"columns": [
{
"name": "术语 / 缩略词",
"row": 2,
"col": 1,
"text": "PSD"
},
{
"name": "说明",
"row": 2,
"col": 2,
"text": "副驾屏"
}
]
},
{
"columns": [
{
"name": "术语 / 缩略词",
"row": 3,
"col": 1,
"text": "RFD(左/右)"
},
{
"name": "说明",
"row": 3,
"col": 2,
"text": "后排吸顶屏"
}
]
}
]
}
],
"images": []
},
{
"source": "1.4.1 当前功能相关文档",
"blocks": [
{
"type": "para",
"index": 1,
"text": "无"
}
],
"images": []
},
{
"source": "1.4.2 政策法规文件",
"blocks": [
{
"type": "para",
"index": 1,
"text": "方案总结:"
},
{
"type": "table",
"table": 1,
"headers": [
"列1",
"列2",
"列3"
],
"rows": [
{
"columns": [
{
"name": "列1",
"row": 1,
"col": 1,
"text": "方案总结"
},
{
"name": "列2",
"row": 1,
"col": 2,
"text": "CSD"
},
{
"name": "列3",
"row": 1,
"col": 3,
"text": "中国大陆:上“行车娱乐限制”功能开关,允许用户打开/关闭此功能\n其余国家及地区(全球范围内):上“行车娱乐限制”功能,并且强制打开"
}
]
},
{
"columns": [
{
"name": "列1",
"row": 2,
"col": 1,
"text": "方案总结"
},
{
"name": "列2",
"row": 2,
"col": 2,
"text": "PSD"
},
{
"name": "列3",
"row": 2,
"col": 3,
"text": "巴西、加纳、新加坡:上“行车娱乐限制”功能,并强制打开,无需配置“防窥膜”;\n韩国、南非、美国、新西兰:1. 若考虑到用户体验,则配置“防窥膜”,无需“行车娱乐限制”;2. 若考虑到成本(一个防窥膜400RMB),则只上“行车娱乐限制”,无需“防窥膜”\n其余国家及地区(全球范围内):可视同中国大陆方案\n香港、台湾、毛里求斯市场:不能预装任何视频软件(如Youtube)、游戏、浏览器、短信通讯软件(如Wechat、SMS/Message)、相册、相机。同时若具备RADIO、MUSIC,则不能显示歌词。CPAA映射不在考虑范围,属于客户行为,不做管控。"
}
]
}
]
},
{
"type": "para",
"index": 2,
"text": "相关法规可参考法规指导书:座舱域法规指导书"
}
],
"images": []
},
{
"source": "1.4.3 行业规范文件",
"blocks": [
{
"type": "para",
"index": 1,
"text": "无"
}
],
"images": []
},
{
"source": "2.1 产品场景及概要说明",
"blocks": [
{
"type": "para",
"index": 1,
"text": "产品简介用户场景说明,简单表述,让人快速看懂对用户来说这个功能是干嘛的"
},
{
"type": "para",
"index": 2,
"text": "● 产品场景说明:为了用户行车时的安全考虑,用户不可以在开车时播放视频、玩游戏"
},
{
"type": "para",
"index": 3,
"text": "● 产品可用地点: 搭载xx AUTO系统的车型座舱,海外平台化的车型"
},
{
"type": "para",
"index": 4,
"text": "● 产品使用方式:车机交互"
},
{
"type": "para",
"index": 5,
"text": "● 产品关键参数:无"
}
],
"images": []
},
{
"source": "2.2 产品架构/系统架构",
"blocks": [
{
"type": "para",
"index": 1,
"text": "系统架构"
}
],
"images": []
},
{
"source": "2.3 产品梯度配置说明",
"blocks": [
{
"type": "para",
"index": 1,
"text": "无"
}
],
"images": []
},
{
"source": "2.4 功能流程图",
"blocks": [
{
"type": "para",
"index": 1,
"text": "无"
}
],
"images": []
},
{
"source": "2.5 关键状态流转",
"blocks": [
{
"type": "para",
"index": 1,
"text": "无"
}
],
"images": []
},
{
"source": "2.6.1 硬件配置",
"blocks": [
{
"type": "para",
"index": 1,
"text": "无"
}
],
"images": []
},
{
"source": "2.7 功能列表",
"blocks": [
{
"type": "table",
"table": 1,
"headers": [
"一级功能",
"二级功能",
"三级功能",
"功能描述"
],
"rows": [
{
"columns": [
{
"name": "一级功能",
"row": 1,
"col": 1,
"text": "国内行车娱乐限制"
},
{
"name": "二级功能",
"row": 1,
"col": 2,
"text": "系统行车娱乐限制"
},
{
"name": "三级功能",
"row": 1,
"col": 3,
"text": "行车娱乐限制"
},
{
"name": "功能描述",
"row": 1,
"col": 4,
"text": "行车娱乐限制开关功能开启后,受限应用在满足条件后会退出/暂停应用,同时 toast提示用户"
}
]
},
{
"columns": [
{
"name": "一级功能",
"row": 2,
"col": 1,
"text": "国内行车娱乐限制"
},
{
"name": "二级功能",
"row": 2,
"col": 2,
"text": "系统行车娱乐限制"
},
{
"name": "三级功能",
"row": 2,
"col": 3,
"text": "行车娱乐禁止"
},
{
"name": "功能描述",
"row": 2,
"col": 4,
"text": "行车娱乐限制开关功能开启后,非 P 挡时框架禁止受限应用的启动,同时 toast提示用户"
}
]
},
{
"columns": [
{
"name": "一级功能",
"row": 3,
"col": 1,
"text": "国内行车娱乐限制"
},
{
"name": "二级功能",
"row": 3,
"col": 2,
"text": "系统行车娱乐限制"
},
{
"name": "三级功能",
"row": 3,
"col": 3,
"text": "行车娱乐限制SDK"
},
{
"name": "功能描述",
"row": 3,
"col": 4,
"text": "提供行车娱乐限制SDK,通过SDK的能力,业务可定义限制目标功能(视频,游戏等)"
}
]
},
{
"columns": [
{
"name": "一级功能",
"row": 4,
"col": 1,
"text": "国内行车娱乐限制"
},
{
"name": "二级功能",
"row": 4,
"col": 2,
"text": "行车娱乐限制开关"
},
{
"name": "三级功能",
"row": 4,
"col": 3,
"text": "默认开启"
},
{
"name": "功能描述",
"row": 4,
"col": 4,
"text": "开关开启,开启娱乐限制逻辑,用户启动受限应用时,框架拦截应用的启动,同时 toast提示用户"
}
]
},
{
"columns": [
{
"name": "一级功能",
"row": 5,
"col": 1,
"text": "国内行车娱乐限制"
},
{
"name": "二级功能",
"row": 5,
"col": 2,
"text": "行车娱乐限制开关"
},
{
"name": "三级功能",
"row": 5,
"col": 3,
"text": "关闭开关"
},
{
"name": "功能描述",
"row": 5,
"col": 4,
"text": "开关关闭,关闭娱乐限制逻辑,点击关闭按钮时弹出风险确认弹窗,用户阅读风险提示后选择关闭或取消;\n选择关闭后,娱乐限制功能关闭受限应用在非P挡启动时,框架不再限制和提示用户选择取消后,弹窗消失,开关保持开启,功能保持开启"
}
]
},
{
"columns": [
{
"name": "一级功能",
"row": 6,
"col": 1,
"text": "国内行车娱乐限制"
},
{
"name": "二级功能",
"row": 6,
"col": 2,
"text": "行车娱乐限制开关"
},
{
"name": "三级功能",
"row": 6,
"col": 3,
"text": "非 P 挡时开关置灰逻辑"
},
{
"name": "功能描述",
"row": 6,
"col": 4,
"text": "非 P 挡时行车娱乐限制开关置灰,须在P挡时才可以操作开关状态"
}
]
},
{
"columns": [
{
"name": "一级功能",
"row": 7,
"col": 1,
"text": "国内行车娱乐限制"
},
{
"name": "二级功能",
"row": 7,
"col": 2,
"text": "多屏场景"
},
{
"name": "三级功能",
"row": 7,
"col": 3,
"text": "多屏场景"
},
{
"name": "功能描述",
"row": 7,
"col": 4,
"text": "针对车内有多个屏幕的场景进行详细描述"
}
]
},
{
"columns": [
{
"name": "一级功能",
"row": 8,
"col": 1,
"text": "国外行车娱乐限制"
},
{
"name": "二级功能",
"row": 8,
"col": 2,
"text": "系统行车娱乐限制"
},
{
"name": "三级功能",
"row": 8,
"col": 3,
"text": "行车娱乐限制"
},
{
"name": "功能描述",
"row": 8,
"col": 4,
"text": "受限应用在满足条件后会退出/暂停应用,同时 toast提示用户"
}
]
},
{
"columns": [
{
"name": "一级功能",
"row": 9,
"col": 1,
"text": "国外行车娱乐限制"
},
{
"name": "二级功能",
"row": 9,
"col": 2,
"text": "系统行车娱乐限制"
},
{
"name": "三级功能",
"row": 9,
"col": 3,
"text": "行车娱乐禁止"
},
{
"name": "功能描述",
"row": 9,
"col": 4,
"text": "非 P 挡或者车速>0时框架禁止受限应用的启动,同时 toast提示用户"
}
]
},
{
"columns": [
{
"name": "一级功能",
"row": 10,
"col": 1,
"text": "国外行车娱乐限制"
},
{
"name": "二级功能",
"row": 10,
"col": 2,
"text": "多屏场景"
},
{
"name": "三级功能",
"row": 10,
"col": 3,
"text": "多屏场景"
},
{
"name": "功能描述",
"row": 10,
"col": 4,
"text": "针对车内有多个屏幕的场景进行详细描述"
}
]
}
]
}
],
"images": []
},
{
"source": "3.1 国内行车娱乐限制",
"blocks": [
{
"type": "para",
"index": 1,
"text": "下面的任务管理器图显示了行车娱乐限制清单:W899应用矩阵表(多屏互动)(手机互联应用由应用接入SDK自行实现行车娱乐限制功能)"
},
{
"type": "table",
"table": 1,
"headers": [
"功能",
"功能详细说明"
],
"rows": [
{
"columns": [
{
"name": "功能",
"row": 1,
"col": 1,
"text": "方案"
},
{
"name": "功能详细说明",
"row": 1,
"col": 2,
"text": "行车娱乐限制的实现有两种限制方法,分别是系统限制和SDK限制"
}
]
},
{
"columns": [
{
"name": "功能",
"row": 2,
"col": 1,
"text": "系统限制"
},
{
"name": "功能详细说明",
"row": 2,
"col": 2,
"text": "● 实现方法:本地和云端配置受行车限制应用的包名,系统针对已配置的包名,按照规则(4.1.2)限制其进程\n● 适用范围:视频类,游戏类,视频投屏类,KTV应用,浏览器等\n● 配置名单及方法桌面&系统相关配置 \n● 行车娱乐限制的应用在应用列表需显示P档标志"
}
]
},
{
"columns": [
{
"name": "功能",
"row": 3,
"col": 1,
"text": "SDK限制"
},
{
"name": "功能详细说明",
"row": 3,
"col": 2,
"text": "● 实现方法:应用接入对应的SDK,通过SDK的能力,业务可定义限制目标功能(视频,游戏等)\n● 适用范围:车辆设置,图库,语音,桌面,用户手册,Carplay、HiCar等"
}
]
}
]
},
{
"type": "para",
"index": 2,
"text": "** 国内的行车娱乐限制应用清单可以在魅族的后台进行配置,新增应用可以走云端修改,可以不用修改本地代码进行发版。(本地也有一份清单作为兜底的)"
},
{
"type": "para",
"index": 3,
"text": "下面的任务管理器图显示了行车娱乐限制清单:W899应用矩阵表(多屏互动)(手机互联应用由应用接入SDK自行实现行车娱乐限制功能)"
},
{
"type": "para",
"index": 4,
"text": "应用禁用:"
},
{
"type": "para",
"index": 5,
"text": "[[IMAGE:rId12]]"
}
],
"images": [
"rId12"
]
},
{
"source": "3.1.1 系统行车娱乐限制",
"blocks": [
{
"type": "table",
"table": 1,
"headers": [
"功能",
"功能详细说明"
],
"rows": [
{
"columns": [
{
"name": "功能",
"row": 1,
"col": 1,
"text": "开关"
},
{
"name": "功能详细说明",
"row": 1,
"col": 2,
"text": "● 该开关处于开启状态时行车娱乐限制生效,关闭时行车娱乐限制功能不生效\n● 该开关具体功能细节见4.1.3"
}
]
},
{
"columns": [
{
"name": "功能",
"row": 2,
"col": 1,
"text": "两种限制规则"
},
{
"name": "功能详细说明",
"row": 2,
"col": 2,
"text": "1.  系统限制方案:\n行车娱乐限制的应用在应用列表需显示P档标志;\n● 行车娱乐限制:\n 目标应用/功能处于前台时\n○ 打断:车速≥15km/h且持续5秒后,将目标应用/功能退至后台或暂停对应功能(Toast文案参考UE)\n■ 并发起toast提示用户在行车状态下无法使用\n● 行车娱乐禁止:\n 目标应用/功能处于后台时\n○ 限制:非P挡时,限制目标应用/功能启用\n■ toast提示用户请在P挡时使用该功能/应用(Toast文案参考UE)\n2.  应用自行限制方案:\n应用可根据业务需求,接入SDK自行实现限制的业务,可自定义Toast文案,并可以只对APP内部分功能进行行车娱乐限制"
}
]
},
{
"columns": [
{
"name": "功能",
"row": 3,
"col": 1,
"text": "相关流程图"
},
{
"name": "功能详细说明",
"row": 3,
"col": 2,
"text": "[[IMAGE:rId13]]"
}
]
},
{
"columns": [
{
"name": "功能",
"row": 4,
"col": 1,
"text": "交互图"
},
{
"name": "功能详细说明",
"row": 4,
"col": 2,
"text": "[[IMAGE:rId14]] [[IMAGE:rId15]]"
}
]
}
]
}
],
"images": [
"rId13",
"rId14",
"rId15"
]
},
{
"source": "四、非功能说明",
"blocks": [
{
"type": "para",
"index": 1,
"text": "不涉及"
}
],
"images": []
}
],
"image_sources": {
"rId12": {
"section": "3.1 国内行车娱乐限制"
},
"rId13": {
"section": "3.1.1 系统行车娱乐限制",
"table": 1,
"row": 3,
"column": 2,
"name": "功能详细说明"
},
"rId14": {
"section": "3.1.1 系统行车娱乐限制",
"table": 1,
"row": 4,
"column": 2,
"name": "功能详细说明"
},
"rId15": {
"section": "3.1.1 系统行车娱乐限制",
"table": 1,
"row": 4,
"column": 2,
"name": "功能详细说明"
}
},
"image_analysis": [
{
"type": "flowchart",
"description": "**1. 类型标签(单独一行):**\n\n**2. 文字描述:**\n该图片是一个关于“行车娱乐开关状态”下应用使用权限控制的决策流程图,采用标准流程图符号(圆角矩形表示起始/终止/处理步骤,菱形表示判断条件,平行四边形表示输入/操作动作,箭头表示流向),整体结构为自上而下的树状分支逻辑。\n\n顶层节点为蓝色圆角矩形“行车娱乐开关状态”,其向下分出两条主路径:\n- 左侧路径指向绿色圆角矩形“开启”\n- 右侧路径指向绿色圆角矩形“关闭”\n\n“关闭”路径直接连接至蓝色圆角矩形“不受限”,表示当行车娱乐功能关闭时,所有应用均不受限制。\n\n“开启”路径进一步分为三条并行子路径:\n1. **“其他应用”路径**(左侧):\n - 绿色圆角矩形“其他应用” → 蓝色圆角矩形“不受限”\n\n2. **“通过接入SDK限制的应用”路径**(中间):\n - 绿色圆角矩形“通过接入SDK限制的应用” → 形判断“是否在目标场景”\n - 若“否” → 黄色平行四边形“点击进入使用目标功能或进入对应页面” → 菱形判断“车辆是否达到限制条件:非P档”\n - 若“否” → 蓝色圆角矩形“不受限”\n - 若“是” → 绿色圆角矩形“应用启动被限制” → 蓝色圆角矩形“发起toast,文案由业务定义”\n - 若“是” → 菱形判断“车辆是否达到限制条件:车速>=15km/h,且超过5秒”\n - 若“否” → 蓝色圆角矩形“不受限”\n - 若“是” → 绿色圆角矩形“暂停功能,或退出对应页面” → 蓝色圆角矩形“发起toast,文案由业务定义”\n\n3. **“通过系统限制应用”路径**(右侧):\n - 绿色圆角矩形“通过系统限制应用” → 菱形判断“目标应用是否在前台”\n - 若“否” → 黄色平行四边形“点击应用,将其调用至前台” → 菱形判断“车辆是否达到限制条件:非P档”\n - 若“否” → 蓝色圆角矩形“不受限”\n - 若“是” → 绿色圆角矩形“应用启动被限制” → 蓝色圆角矩形“Toast提示‘请在P档是使用该功能’”\n - 若“是” → 菱形判断“车辆是否达到限制条件:车速>=15km/h,且超过5秒”\n - 若“否” → 蓝色圆角矩形“不受限”\n - 若“是” → 绿色圆角矩形“应用被打断,并调至后台” → 蓝色圆角矩形“Toast提示‘在行车状态下无法使用该应用’”\n\n所有判断节点均标注“是”或“否”分支标签;所有最终输出节点均为蓝色圆角矩形,包含具体行为(如“不受限”、“发起toast”等)及部分Toast文案说明。流程图清晰表达了不同应用类型、不同触发方式(点击进入 vs 点击调用)、不同车辆状态(车速、档位、是否前台)组合下的权限控制策略与用户反馈机制。",
"rid": "rId13",
"path": "D:\\projects\\jike\\output\\images\\image_rId13.png"
},
{
"type": "other",
"description": "该图片是一张 Windows 系统“任务管理器”界面的截图,当前选中的是左侧导航栏中的“启动应用”选项卡。界面顶部有搜索框,提示文字为“键入要搜索的名称、发布者或 PID”。主区域标题为“启动应用”,下方是一个表格,包含四列:名称、发布者、状态、启动影响。表格中列出了多个开机自启程序及其信息,包括:\n- XboxMicrosoft Corporation,已禁用,无影响)\n- 终端(Microsoft Corporation,已禁用,无影响)\n- Microsoft 365 CopilotMicrosoft Corporation,已禁用,未计量)\n- IntelGraphicsSoftwareIntel,已启用,未计量)\n- WXWork(腾讯,已启用,未计量)\n- msegde(Microsoft,已启用,未计量)\n- Feishu(字节跳动,已启用,未计量)\n- OneDriveMicrosoft,已启用,未计量)\n- LogiLDA.DLLLogitech,已启用,未计量)\n- RAVCpl64Realtek,已启用,未计量)\n- RtkAudUService64Realtek,已启用,未计量)\n- SecurityHealthSystrayMicrosoft,已启用,未计量)\n\n左侧导航栏从上至下依次为:任务管理器(标题)、进程、性能、应用历史记录、启动应用(高亮选中)、用户、详细信息、服务。整体为浅色主题的 Windows 11 风格 UI。",
"rid": "rId12",
"path": "D:\\projects\\jike\\output\\images\\image_rId12.png"
},
{
"type": "other",
"description": "该图片是一张车载中控屏幕的界面截图,显示的是车辆的360度环视影像(鸟瞰视角),中央为一辆白色汽车的俯视模型,车头朝上,车身下方及两侧有蓝色半透明区域表示传感器探测范围或辅助驾驶视野。屏幕顶部左侧有一个用户头像图标,右侧有信号、Wi-Fi、时间(11:11)等状态图标。屏幕底部为功能控制栏,从左到右依次为:车辆设置、主页、空调、电话、音量调节(当前75%)、应用菜单(九宫格图标)、媒体播放控制(播放/暂停、上一曲、下一曲)、麦克风(语音输入)、扬声器(音量调节)等图标;右下角有“百度AI生成”水印。图片上方标注“C. 语音”,下方有一行文字说明:“1,如果行车中使用语音打开受限应用,语音播报回复请在P档使用该功能”。整体为现代智能电动汽车中控UI界面,风格简洁、扁平化,以浅灰白为主色调。",
"rid": "rId15",
"path": "D:\\projects\\jike\\output\\images\\image_rId15.png"
},
{
"type": "flowchart",
"description": "2. 文字描述: \n该图片为一张包含两个独立场景的车载系统功能逻辑流程图,用于说明“行车娱乐限制”与“行车娱乐禁止”两种模式下受限应用的自动关闭或启用规则。整体采用左右分栏+上下分区结构,每部分均含“A. 应用列表”界面截图、“判断逻辑流程图”和“B. Toast提示”界面截图,并配有编号步骤说明。\n\n**第一部分:01,行车娱乐限制——已打开受限应用,在满足条件后自动关闭** \n- A. 应用列表:显示一个简化车载中控屏界面,顶部有用户头像,中央大区域显示“应用”字样,底部为状态栏(含时间、信号、音量等图标)。 \n- 判断逻辑流程图(居中): \n - 起始节点为菱形判断框:“应用是否属于被限制应用” \n - 若“否” → 指向矩形操作框:“保持不变” \n - 若“是” → 进入下一菱形判断框:“速度超过15km/h并持续5s” \n - 若“否” → 指向矩形操作框:“保持不变” \n - 若“是” → 指向右侧B图(Toast提示) \n- B. Toast提示:显示车载中控屏界面,中央为车辆俯视图及雷达扫描区域,上方弹出Toast提示框,文字为:“在行驶状态下无法使用该功能”。下方标注步骤“1,自动关闭应用,并Toast提示”。\n\n**第二部分:02,行车娱乐禁止——打开受限的应用** \n- A. 应用列表:显示完整车载应用网格界面,含“首页”“媒体中心”“蓝牙电话”“设置”“应用市场”等12个图标化应用入口;其中“应用市场”图标被红色箭头指向。 \n- B. Toast提示:同样为应用列表界面,但顶部叠加灰色半透明Toast提示框,文字为:“请在P档下使用该功能”。左侧用户头像旁有红色箭头从A图“应用市场”指向此处Toast提示位置。 \n- 下方标注步骤:“1,如果此时挂P档,点击则Toast提示,反之则正常打开”,表明该Toast仅在非P档时触发,P档时可正常使用。\n\n整图通过流程图+UI截图组合方式,清晰表达两种行车安全策略下的交互逻辑与反馈机制,属于典型的车载HMI功能逻辑流程图。",
"rid": "rId14",
"path": "D:\\projects\\jike\\output\\images\\image_rId14.png"
}
],
"resolved_conflicts": [
{
"conflict_id": 0,
"conflict_type": "condition_mismatch",
"section": "3.1.1 系统行车娱乐限制",
"resolution": "以文字为准",
"correction": "文字中描述的系统限制应用(前台)触发条件为:'车速≥15km/h 且持续 5 秒'",
"source": "文字"
},
{
"conflict_id": 1,
"conflict_type": "contradiction",
"section": "行车娱乐禁止规则",
"resolution": "以文字为准",
"correction": "非 P 挡时,限制目标应用/功能启用",
"source": "文字"
},
{
"conflict_id": 2,
"conflict_type": "contradiction",
"section": "行车娱乐限制规则",
"resolution": "以图片为准",
"correction": "自动关闭应用",
"source": "图片"
},
{
"conflict_id": 3,
"conflict_type": "condition_mismatch",
"section": "行车娱乐限制规则",
"resolution": "以文字为准",
"correction": "车速≥15km/h",
"source": "文字"
}
]
}
@@ -0,0 +1,42 @@
---
name: 解决方案应用技能
description: 应用用户提供的冲突解决方案以创建合并了更正的更新文档表示。
---
# 解决方案应用技能
## 概述
此技能采用用户提供的冲突解决方案并将它们应用到解析的文档中,创建一个协调文本和视觉内容差异的校正版本。它生成包含应用更正的更新文档表示。
## 功能
该技能:
- 接受用户提供的冲突解决方案决策
- 支持多种解决方案类型:"以图片为准"、"以文字为准"、"两处都保留"或自定义文本
- 更新解析的文档结构以合并解决方案决策
- 创建文档表示的校正版本,包含应用的更改
- 维护所有应用更正的源可追溯性
- 向输出添加包含更正指令的resolved_conflicts数组
## 输入要求
- 解析文档JSON文件的路径(带有已识别的冲突)
- 包含用户决策的解决方案JSON文件的路径
- 可选输出目录规范
- 解决方案JSON应包含具有以下内容的对象:
- `conflict_id`:冲突数组中的冲突索引
- `resolution`:决策类型("以图片为准"、"以文字为准"、"两处都保留")或自定义文本
- `custom_text`:解决方案的可选自定义文本
## 输出
该技能生成一个结构化JSON文件,文件名为输入文档的基本名称后跟'_updated.json',包含:
- 包含应用更正的原始文档结构
- 详细说明应用更改的`resolved_conflicts`数组
- 关于每个冲突类型和应用更正的信息
- 用于可追溯性的源跟踪信息
## 集成点
此技能消耗冲突检测技能的输出(带冲突的文档)和用户提供的解决方案。其输出被IR生成技能使用以创建最终结构化表示。
@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""Apply user resolutions to ``_parsed.json`` using ``_conflicts.json``.
Usage::
python scripts/apply_resolutions.py <parsed.json> --resolutions <resolutions.json> [--output-dir DIR]
The *resolutions.json* file is created by the agent after user arbitration.
Each resolution maps a conflict_id to a decision.
Resolution format (``resolutions.json``)::
[
{
"conflict_id": 0, // 0-based index into conflicts array
"resolution": "以文字为准",
"custom_text": null
}
]
Outputs ``<basename>_updated.json`` — identical to *parsed.json* plus a
``resolved_conflicts`` top-level array with correction instructions for the IR generator.
"""
import argparse
import json
import logging
import os
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
def apply_resolutions(
    parsed_path: str,
    resolutions_path: str,
    output_dir: str | None = None,
) -> dict:
    """Load *parsed.json*, apply resolutions, write *updated.json*.

    Conflicts are taken from the ``_conflicts`` key of the parsed document
    or, when that is empty, from ``<stem>_conflicts.json`` next to it. Each
    valid resolution becomes one ``resolved_conflicts`` entry carrying the
    correction text and its source.

    Args:
        parsed_path: Path to the parsed document JSON.
        resolutions_path: Path to the JSON list of user decisions.
        output_dir: Directory for the output; defaults to the directory of
            *parsed_path*.

    Returns:
        The parsed document with the ``resolved_conflicts`` list added; the
        same content is written to ``<stem>_updated.json``.
    """
    with open(parsed_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    with open(resolutions_path, "r", encoding="utf-8") as f:
        resolutions = json.load(f)

    base_dir = os.path.dirname(os.path.abspath(parsed_path))

    # Try to find _conflicts.json alongside parsed.json
    basename = os.path.splitext(os.path.basename(parsed_path))[0]
    stem = basename[:-len("_parsed")] if basename.endswith("_parsed") else basename
    candidate = os.path.join(base_dir, f"{stem}_conflicts.json")
    conflicts = data.get("_conflicts", [])
    if not conflicts and os.path.isfile(candidate):
        with open(candidate, "r", encoding="utf-8") as f:
            conflicts = json.load(f)

    if output_dir is None:
        output_dir = base_dir
    os.makedirs(output_dir, exist_ok=True)

    # Build resolved_conflicts with correction instructions for ir_generator
    resolved = []
    for res in resolutions:
        if not isinstance(res, dict):
            logger.warning("Skipping malformed resolution entry: %r", res)
            continue

        cid = res.get("conflict_id")
        # Only a real integer index is accepted: strings and floats would
        # raise TypeError below, and bools would silently index 0 or 1.
        is_index = isinstance(cid, int) and not isinstance(cid, bool)
        if not is_index or not 0 <= cid < len(conflicts):
            logger.warning("Invalid conflict_id: %s", cid)
            continue

        conflict = conflicts[cid]
        choice = res.get("resolution", "")
        custom = res.get("custom_text")
        entry = {
            "conflict_id": cid,
            "conflict_type": conflict.get("conflict_type"),
            "section": conflict.get("section", ""),
            "resolution": choice,
        }

        # Build a correction instruction string
        image_val = conflict.get("image_snippet", "")
        text_val = conflict.get("text_snippet", "")
        if choice == "以图片为准":
            entry["correction"] = image_val
            entry["source"] = "图片"
        elif choice == "以文字为准":
            entry["correction"] = text_val
            entry["source"] = "文字"
        elif choice == "两处都保留":
            entry["correction"] = f"{text_val}(另外的观点:{image_val})"
            entry["source"] = "两者兼容"
        elif custom:
            entry["correction"] = custom
            entry["source"] = "自定义"
            logger.info("Conflict %d: custom: %s", cid, str(custom)[:60])
        else:
            entry["correction"] = text_val
            entry["source"] = "文字(默认)"
            logger.warning("Conflict %d: unknown resolution '%s', defaulting to text", cid, choice)

        logger.info("Conflict %d: %s -> %s", cid, choice, entry["source"])
        resolved.append(entry)

    data["resolved_conflicts"] = resolved
    logger.info("Applied %d resolutions", len(resolved))

    # Write output; stem equals basename when there is no "_parsed" suffix.
    output_path = os.path.join(output_dir, f"{stem}_updated.json")
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    logger.info("Saved: %s", output_path)
    return data
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and apply the resolutions.
    cli = argparse.ArgumentParser(
        description="Apply user resolutions to parsed.json.",
    )
    cli.add_argument(
        "input",
        metavar="parsed.json",
        help="Path to _parsed.json",
    )
    cli.add_argument(
        "--resolutions", "-r",
        required=True,
        help="Path to resolutions JSON file",
    )
    cli.add_argument(
        "--output-dir",
        default=None,
        help="Output directory (default: same as input)",
    )
    options = cli.parse_args()
    apply_resolutions(options.input, options.resolutions, options.output_dir)