Files
document_analyzer/skills/ir_generation_skill/scripts/ir_generator.py
T
pzhang_zywl 40567a4fb6
CI / test (push) Successful in 30s
Initial commit: document_analyzer with CI/CD pipeline
- 4 skill pipeline (doc_parser, conflict_detection, ir_generation, resolution_application)
- CI workflow on push/PR (.gitea/workflows/ci.yml)
- Auto-issue on CI failure (.gitea/workflows/auto-issue.yml)
- Pytest smoke tests (tests/test_sample.py)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 20:00:26 +08:00

360 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Generate JSON intermediate representation from ``_parsed.json`` or ``_updated.json``.
Sends the JSON document directly to the LLM for analysis. If the serialized document
reaches ``MAX_ANALYSIS_TOKENS`` (compared against its JSON character count, not a
tokenizer count), sections are batched greedily without splitting any individual
section. Conflict corrections from ``resolved_conflicts`` are included in the prompt
so the output respects user arbitration decisions.
Usage::
python scripts/ir_generator.py output/<basename>_updated.json [output_dir] [--dry-run]
Output: ``<basename>_ir.json``
"""
import argparse
import json
import logging
import os
import sys
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from LLM import LLMClient
# Root logging is configured as a top-level statement, so it takes effect when
# this module is imported as well as when it is run as a script.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Delay in seconds passed to time.sleep() by generate_ir() in its batching branch.
RATE_LIMIT_DELAY = 0.5
# Size budget for the content of one LLM call. Despite the name, generate_ir()
# compares it against len() of the serialized JSON, i.e. a character count,
# not a tokenizer count.
MAX_ANALYSIS_TOKENS = 6000 # max content size per LLM call
# ---------------------------------------------------------------------------
# Prompt
# ---------------------------------------------------------------------------
# Prompt template for the function-point extraction request.
# _analyze_content() fills the {conflict_context} and {content} placeholders
# with str.replace(), so the other literal braces in the JSON example below
# need no escaping.
# NOTE(review): several parenthetical examples in the exclusion list and in the
# output principles look unbalanced (an opening bracket or quote with no
# closing one) -- possibly a copy artifact; confirm against the intended text.
PROMPT = """你是一个需求文档分析助手。请分析以下需求文档的JSON内容,输出结构化JSON。
## 已知修正(来自冲突检测)
以下内容已确认修正,生成JSON时请**使用修正后的值**,不要同时输出两个版本。
{conflict_context}
## 待分析内容(JSON格式)
{content}
## JSON字段说明
- sections: 文档章节列表,每个章节含 source(章节标题)和 blocks(内容块数组)
- blocks: 类型含 para(段落,字段 text)和 table(表格,字段 rows,每行含 columns 数组)
- image_sources: 图片所在章节映射,key 为图片 rid
- image_analysis: 图片分析结果,每个含 rid、type(流程图/架构图/状态图等)、description
- resolved_conflicts: 已知修正列表,每个含 section、conflict_type、correction、source
## 功能点定义
只有满足以下**全部条件**的才视为功能点:
1. 描述了一个**系统或软件要实现的具体行为**(有触发条件、执行动作、状态变化或逻辑规则)
2. 该行为直接由**系统或框架**执行(不是人的操作流程、管理流程)
3. 对用户或系统有**可观察的效果**
**以下内容不是功能点,不要输出:**
- 术语/缩略词定义(
- 文档背景、范围说明(如 "本文档涵盖xxx"
- 变更日志、版本记录、编制人信息
- 文档结构描述(如 "产品简介用户场景说明"
- 纯文本的概述、没有具体行为的介绍
## 决策树/流程图分解规则(重要)
图片分析(image_analysis)中的流程图和决策树描述包含丰富的功能逻辑,**必须完全分解**:
1. **每个叶子路径 = 一个独立 function**:从根节点到每个最终结果的完整路径,都拆成一个 function
2. **每个判断分支 = 一个独立 function**:菱形判断节点的每个分支方向和对应的结果,单独作为一个 function
3. **不同约束条件 = 不同 function**:例如"通过接入SDK限制""通过系统限制"是不同约束机制,必须分别列出
4. **不要合并不同路径**:即使最终结果相同,只要到达路径不同,就是不同的 function
## 输出格式
只输出功能点,每个功能点格式如下:
{
"function": "功能名称",
"source": {
"section": "章节名",
"location": "原文位置(如:正文第1段、表格1第2行、图片rId13)"
},
"trigger": {
"type": "AND或者OR",
"conditions": [
"触发条件1",
"触发条件2"
]
},
"actions": {
"场景/角色": [
"动作1",
"动作2"
]
}
}
## 输出原则
1. **只输出功能点**,没有功能点就输出空数组 []
2. 每个功能点**必须**包含 source.section 和 source.location
3. location 必须是具体的原文位置标签(如 "正文第1段""表格1""图片rId13"
4. **一个 function 只对应一种行为逻辑(一条完整路径)**。决策树中的每个分支路径(从根到叶子)必须拆成独立 function,conditions 中明确写出该路径上的所有判断条件和分支方向。
5. **穷举所有分支**:流程图/决策树中的每一条分支路径都要输出对应的 function,不能遗漏任何子逻辑。
6. 没有 trigger 或 actions 的字段直接**省略**,不要写 null 或空列表/空对象
7. 所有功能点全部列出,**宁多勿漏**
8. **已知修正**中确认的信息,使用修正后的值
9. 输出一个JSON数组,不要用 ```json 代码块包裹,直接输出纯JSON
"""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_llm_response(raw: str) -> list | dict | str | None:
"""Parse JSON from LLM response, handling markdown code fences."""
if raw is None:
return None
stripped = raw.strip()
if stripped.startswith("```"):
nl = stripped.find("\n")
stripped = stripped[nl + 1:] if nl != -1 else stripped[3:]
if stripped.endswith("```"):
stripped = stripped[:-3]
try:
return json.loads(stripped)
except json.JSONDecodeError:
logger.warning(" Failed to parse JSON, returning raw text")
return raw
def _build_conflict_context(
section_name: str | None,
resolved_conflicts: list[dict],
) -> str:
"""Build conflict correction context for a section, or all if section_name is None."""
if section_name is None:
relevant = resolved_conflicts
else:
relevant = [c for c in resolved_conflicts if c.get("section", "") == section_name]
if not relevant:
return "没有"
lines: list[str] = []
for c in relevant:
correction = c.get("correction", "")
conflict_type = c.get("conflict_type", "")
source = c.get("source", "")
lines.append(f"- 冲突类型:{conflict_type},依据:{source}")
lines.append(f" 修正后的值:{correction}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# LLM analysis
# ---------------------------------------------------------------------------
def _analyze_content(
    content: str,
    conflict_context: str,
    llm: LLMClient,
    *,
    dry_run: bool = False,
) -> list[dict]:
    """Send one chunk of document JSON to the LLM and return its IR entries.

    *content* and *conflict_context* are substituted into ``PROMPT``. With
    *dry_run* set, only the estimated prompt size is logged and no request is
    made. Every failure mode (a ``RuntimeError`` from the client, an empty
    reply, unparseable output) is logged and yields an empty list, so one bad
    chunk does not abort the whole run.

    Reply shapes accepted:

    * a JSON array -> returned as is;
    * a JSON object holding a single list value and no ``"function"`` key
      (e.g. ``{"functions": [...]}``) -> that list is returned;
    * an empty object -> ``[]``;
    * any other object -> treated as one function point, ``[obj]``.
    """
    prompt = PROMPT.replace("{conflict_context}", conflict_context).replace("{content}", content)

    if dry_run:
        est = llm.estimate_tokens(prompt)
        logger.info(" [DRY RUN] prompt ~%d tokens", est)
        return []

    try:
        raw = llm.chat(
            model=LLMClient.TEXT_MODEL,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
    except RuntimeError as e:
        logger.error(" Analysis failed: %s", e)
        return []

    # _parse_llm_response() allows for a None reply, so guard before len().
    if raw is None:
        logger.warning(" Empty response from LLM")
        return []
    logger.info(" Response: %d chars", len(raw))

    parsed = _parse_llm_response(raw)
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        if not parsed:
            return []
        # NOTE(review): the request asks for response_format "json_object"
        # while the prompt asks for an array; on OpenAI-compatible APIs that
        # mode presumably forces a top-level object, so the array tends to
        # arrive wrapped under a single key -- confirm against LLMClient.
        if "function" not in parsed and len(parsed) == 1:
            (only_value,) = parsed.values()
            if isinstance(only_value, list):
                return only_value
        return [parsed]

    logger.warning(" Unparseable response, raw length: %d", len(raw))
    return []
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _plan_batches(
    sized_sections: list[tuple[dict, int]],
    limit: int,
) -> list[list[dict]]:
    """Greedily group consecutive sections into batches of at most *limit* chars.

    A section is never split. A section whose own size exceeds *limit* is put
    in a batch by itself rather than being discarded.
    """
    batches: list[list[dict]] = []
    current: list[dict] = []
    current_size = 0
    for section, size in sized_sections:
        # Close the running batch when this section would push it over budget.
        if current and current_size + size > limit:
            batches.append(current)
            current, current_size = [], 0
        current.append(section)
        current_size += size
    if current:
        batches.append(current)
    return batches


def generate_ir(
    parsed_path: str,
    output_dir: str = "output",
    *,
    dry_run: bool = False,
) -> dict:
    """Read parsed/updated JSON and generate JSON IR.

    Produces ``<basename>_ir.json`` in *output_dir*, where ``<basename>`` is
    the input file name without its extension and without a trailing
    ``_parsed`` / ``_updated``.

    If the serialized document (sections + image data) is shorter than
    ``MAX_ANALYSIS_TOKENS`` characters it is analysed in one request with all
    resolved conflicts as context. Otherwise the non-empty sections are
    batched greedily; each batch carries only the image data and conflict
    corrections that belong to its sections. A single section larger than the
    limit is sent alone with a warning instead of being dropped.

    Args:
        parsed_path: Path to the ``_parsed.json`` or ``_updated.json`` input.
        output_dir: Directory for the result; created if missing.
        dry_run: Forwarded to ``_analyze_content``; no rate-limit sleep is
            performed between batches when set.

    Returns:
        ``{"ir": <list of IR entries>, "path": <path of the written file>}``.

    Raises:
        OSError, json.JSONDecodeError: propagated from reading the input or
            writing the output; they are not caught here.
    """
    with open(parsed_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    basename = os.path.splitext(os.path.basename(parsed_path))[0]
    for suffix in ("_parsed", "_updated"):
        if basename.endswith(suffix):
            basename = basename[:-len(suffix)]
            break

    # os.makedirs("") raises, so an empty output_dir means the current directory.
    os.makedirs(output_dir or ".", exist_ok=True)
    llm = LLMClient()
    ir_output: list[dict] = []

    # "or" also covers keys that are present but null in the input JSON.
    sections = data.get("sections") or []
    image_sources = data.get("image_sources") or {}
    image_analysis = data.get("image_analysis") or []
    resolved_conflicts = data.get("resolved_conflicts") or []

    # Build full document JSON to measure size
    full_doc = {
        "sections": sections,
        "image_sources": image_sources,
        "image_analysis": image_analysis,
    }
    full_json = json.dumps(full_doc, ensure_ascii=False)
    total_chars = len(full_json)
    logger.info("Total document JSON chars: %d", total_chars)

    if total_chars < MAX_ANALYSIS_TOKENS:
        logger.info("Document fits in one request (< %d chars)", MAX_ANALYSIS_TOKENS)
        conflict_ctx = _build_conflict_context(None, resolved_conflicts)
        entries = _analyze_content(full_json, conflict_ctx, llm, dry_run=dry_run)
        ir_output.extend(entries)
    else:
        logger.info("Document is large (>= %d chars), batching sections", MAX_ANALYSIS_TOKENS)

        # Measure each non-empty section: its own JSON plus the image_sources /
        # image_analysis entries of the images located in that section.
        sized_sections: list[tuple[dict, int]] = []
        for sec in sections:
            if not sec.get("blocks"):
                continue
            sec_chars = len(json.dumps(sec, ensure_ascii=False))
            sec_name = sec.get("source", "")
            sec_rids = [rid for rid, src in image_sources.items()
                        if src.get("section", "") == sec_name]
            if sec_rids:
                rid_set = set(sec_rids)
                overhead_doc = {
                    "image_sources": {rid: image_sources[rid] for rid in sec_rids},
                    "image_analysis": [img for img in image_analysis
                                       if img.get("rid", "") in rid_set],
                }
                sec_chars += len(json.dumps(overhead_doc, ensure_ascii=False))
            if sec_chars > MAX_ANALYSIS_TOKENS:
                logger.warning(
                    "Section [%s] is %d chars (> %d); sending it as its own batch",
                    sec_name, sec_chars, MAX_ANALYSIS_TOKENS,
                )
            sized_sections.append((sec, sec_chars))

        for batch in _plan_batches(sized_sections, MAX_ANALYSIS_TOKENS):
            # Collect sections and their images for this batch
            batch_names = [s.get("source", "") for s in batch]
            name_set = set(batch_names)
            batch_image_sources = {
                rid: src for rid, src in image_sources.items()
                if src.get("section", "") in name_set
            }
            batch_images = [
                img for img in image_analysis
                if image_sources.get(img.get("rid", ""), {}).get("section", "") in name_set
            ]
            batch_doc = {
                "sections": batch,
                "image_sources": batch_image_sources,
                "image_analysis": batch_images,
            }
            batch_json = json.dumps(batch_doc, ensure_ascii=False)

            # Merge conflict contexts; dict.fromkeys() keeps order while making
            # sure a repeated section name contributes its corrections once.
            ctx_parts = []
            for sn in dict.fromkeys(batch_names):
                ctx = _build_conflict_context(sn, resolved_conflicts)
                if ctx != "没有":
                    ctx_parts.append(ctx)
            conflict_ctx = "\n".join(ctx_parts) if ctx_parts else "没有"

            label = " + ".join(batch_names)
            logger.info("Batch [%s]: %d sections, %d chars", label, len(batch), len(batch_json))
            entries = _analyze_content(batch_json, conflict_ctx, llm, dry_run=dry_run)
            ir_output.extend(entries)
            # No request is made in a dry run, so there is nothing to pace.
            if not dry_run:
                time.sleep(RATE_LIMIT_DELAY)

    # ---- save ----------------------------------------------------------------
    # NOTE(review): a dry run still writes the (empty) result and would
    # overwrite an existing <basename>_ir.json -- confirm this is intended.
    ir_path = os.path.join(output_dir, f"{basename}_ir.json")
    with open(ir_path, "w", encoding="utf-8") as f:
        json.dump(ir_output, f, ensure_ascii=False, indent=2)
    logger.info("Saved: %s (%d entries)", ir_path, len(ir_output))

    # ---- summary -------------------------------------------------------------
    usg = llm.usage
    logger.info("Tokens: %d prompt + %d completion = %d total",
                usg["prompt_tokens"], usg["completion_tokens"], usg["total_tokens"])
    logger.info("Output: %s", ir_path)
    return {"ir": ir_output, "path": ir_path}
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point: a required input path, an optional output
    # directory, and a --dry-run switch, all forwarded to generate_ir().
    cli = argparse.ArgumentParser(
        description="Generate JSON intermediate representation from parsed/updated JSON.",
    )
    cli.add_argument(
        "input",
        metavar="parsed.json",
        help="Path to _parsed.json or _updated.json",
    )
    cli.add_argument(
        "output_dir",
        nargs="?",
        default="output",
        metavar="output_dir",
        help="Directory for output files (default: output/)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Print token estimates without calling the API.",
    )
    opts = cli.parse_args()
    generate_ir(opts.input, opts.output_dir, dry_run=opts.dry_run)