|
|
|
@@ -358,6 +358,7 @@ def _quick_validate(
|
|
|
|
|
"missing_concepts": [],
|
|
|
|
|
"format_issues": [],
|
|
|
|
|
"parent_issues": [],
|
|
|
|
|
"coverage_warnings": [], # section/table coverage below threshold (non-blocking)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
units = semantic_index.get("function_units", [])
|
|
|
|
@@ -485,11 +486,51 @@ def _quick_validate(
|
|
|
|
|
gaps["missing_concepts"].append("缺少 scope 概念: 海外")
|
|
|
|
|
|
|
|
|
|
# --- Section and table coverage ---
|
|
|
|
|
# Count functional sections (those with numbered titles that contain text/tables)
|
|
|
|
|
# Filter out non-functional sections (background, glossary, changelog, etc.)
|
|
|
|
|
non_functional_patterns = [
|
|
|
|
|
re.compile(p) for p in [
|
|
|
|
|
r"编制.*变更.*日志", r"变更日志", r"文档背景", r"文档范围",
|
|
|
|
|
r"术语解释", r"参考", r"附录", r"版本", r"变更记录",
|
|
|
|
|
r"目录", r"前言", r"概述", r"简介",
|
|
|
|
|
r"PRD", r"前置条件", r"依赖", r"行业规范", r"输入文件",
|
|
|
|
|
r"后方输入", r"政策法规", r"相关文档", r"概要说明",
|
|
|
|
|
]
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
def _is_functional_section(sec_name: str) -> bool:
|
|
|
|
|
if not sec_name.strip():
|
|
|
|
|
return False
|
|
|
|
|
# Check non-functional patterns first (even if section is numbered)
|
|
|
|
|
for pat in non_functional_patterns:
|
|
|
|
|
if pat.search(sec_name):
|
|
|
|
|
return False
|
|
|
|
|
# Numbered sections (e.g., "3.1.1") are functional
|
|
|
|
|
if re.match(r"^([\d.]+)", sec_name):
|
|
|
|
|
return True
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def _has_section_content(sec: dict) -> bool:
|
|
|
|
|
"""Check if a section has meaningful content (text >= 10 chars, table, or image).
|
|
|
|
|
|
|
|
|
|
A section is considered "empty" if all its text blocks have fewer than
|
|
|
|
|
10 characters and it contains no tables or images. These typically come
|
|
|
|
|
from image-only Word sections that doc_parser cannot extract text from.
|
|
|
|
|
"""
|
|
|
|
|
for block in sec.get("blocks", []):
|
|
|
|
|
blk_type = block.get("type", "")
|
|
|
|
|
if blk_type == "table":
|
|
|
|
|
return True
|
|
|
|
|
if blk_type in ("image", "figure", "picture"):
|
|
|
|
|
return True
|
|
|
|
|
text = block.get("text", "")
|
|
|
|
|
if isinstance(text, str) and len(text.strip()) >= 10:
|
|
|
|
|
return True
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
func_sections = [
|
|
|
|
|
s for s in doc.get("sections", [])
|
|
|
|
|
if s.get("source", "").strip()
|
|
|
|
|
and any(b.get("type") in ("para", "table") for b in s.get("blocks", []))
|
|
|
|
|
if _is_functional_section(s.get("source", ""))
|
|
|
|
|
and _has_section_content(s)
|
|
|
|
|
]
|
|
|
|
|
covered_sections: set[str] = set()
|
|
|
|
|
for fu in units:
|
|
|
|
@@ -498,12 +539,17 @@ def _quick_validate(
|
|
|
|
|
if sec:
|
|
|
|
|
covered_sections.add(sec)
|
|
|
|
|
|
|
|
|
|
# Use lower threshold for section/table coverage (70% vs 95% for logic trees)
|
|
|
|
|
SECTION_COVERAGE_TARGET = 0.70
|
|
|
|
|
|
|
|
|
|
section_cov = len(covered_sections) / max(len(func_sections), 1)
|
|
|
|
|
if section_cov < config.COVERAGE_TARGET:
|
|
|
|
|
print(f" 章节覆盖率: {section_cov:.0%} ({len(covered_sections)}/{len(func_sections)} "
|
|
|
|
|
f"functional sections)", flush=True)
|
|
|
|
|
if section_cov < SECTION_COVERAGE_TARGET:
|
|
|
|
|
uncovered = [s["source"] for s in func_sections
|
|
|
|
|
if s["source"] not in covered_sections]
|
|
|
|
|
gaps["missing_paths"].append(
|
|
|
|
|
f"章节覆盖率 {section_cov:.0%} < {config.COVERAGE_TARGET:.0%}, "
|
|
|
|
|
gaps["coverage_warnings"].append(
|
|
|
|
|
f"章节覆盖率 {section_cov:.0%} < {SECTION_COVERAGE_TARGET:.0%}, "
|
|
|
|
|
f"未覆盖: {uncovered[:5]}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@@ -520,34 +566,47 @@ def _quick_validate(
|
|
|
|
|
if src.get("type") == "table" and src.get("row")
|
|
|
|
|
)
|
|
|
|
|
row_cov = covered_rows / max(total_rows, 1)
|
|
|
|
|
if row_cov < config.COVERAGE_TARGET:
|
|
|
|
|
gaps["missing_paths"].append(
|
|
|
|
|
f"表格行覆盖率 {row_cov:.0%} < {config.COVERAGE_TARGET:.0%}, "
|
|
|
|
|
print(f" 表格行覆盖率: {row_cov:.0%} ({covered_rows}/{total_rows} rows)", flush=True)
|
|
|
|
|
if row_cov < SECTION_COVERAGE_TARGET:
|
|
|
|
|
gaps["coverage_warnings"].append(
|
|
|
|
|
f"表格行覆盖率 {row_cov:.0%} < {SECTION_COVERAGE_TARGET:.0%}, "
|
|
|
|
|
f"({covered_rows}/{total_rows} rows)"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Coverage warnings are non-blocking (depend on LLM prompt quality)
|
|
|
|
|
if gaps["coverage_warnings"]:
|
|
|
|
|
print(f" [WARN] 覆盖率低于 {SECTION_COVERAGE_TARGET:.0%} 阈值,但 pipeline 继续运行。"
|
|
|
|
|
f"请通过 Prompt 优化或反馈重试提升。", flush=True)
|
|
|
|
|
|
|
|
|
|
# Only format_issues and logic_tree missing_paths block the pipeline.
|
|
|
|
|
# parent_issues and coverage_warnings are non-blocking (LLM quality).
|
|
|
|
|
passed = (
|
|
|
|
|
not gaps["missing_paths"]
|
|
|
|
|
and not gaps["format_issues"]
|
|
|
|
|
and not gaps["parent_issues"]
|
|
|
|
|
and section_cov >= config.COVERAGE_TARGET
|
|
|
|
|
)
|
|
|
|
|
return passed, gaps
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_coverage_feedback(gaps: dict) -> str:
|
|
|
|
|
"""Generate feedback text for re-prompting when coverage is below threshold."""
|
|
|
|
|
lines = []
|
|
|
|
|
for item in gaps.get("missing_paths", []):
|
|
|
|
|
lines.append(f"- {item}")
|
|
|
|
|
if lines:
|
|
|
|
|
return (
|
|
|
|
|
"\n## 覆盖反馈(上一次运行未满足覆盖要求,请重新生成并修复以下缺口)\n\n"
|
|
|
|
|
+ "\n".join(lines)
|
|
|
|
|
+ "\n\n请重新审视文档,为缺失的章节和表格行创建对应的 function_unit。"
|
|
|
|
|
)
|
|
|
|
|
"""Generate targeted feedback text for re-prompting when coverage is below threshold."""
|
|
|
|
|
parts = []
|
|
|
|
|
for item in gaps.get("coverage_warnings", []):
|
|
|
|
|
parts.append(f"- {item}")
|
|
|
|
|
if not parts:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
return (
|
|
|
|
|
"\n## 关键覆盖反馈(上一轮 LLM 输出了以下缺口,请重新处理)\n\n"
|
|
|
|
|
+ "\n".join(parts)
|
|
|
|
|
+ "\n\n"
|
|
|
|
|
"### 修复动作(必须执行)\n\n"
|
|
|
|
|
"1. **重新扫描上述每个缺失章节**,从文字和表格中提取所有可被测试的功能行为\n"
|
|
|
|
|
"2. **为每个缺失的表格行创建独立的 function_unit**,不得合并不同行的规则\n"
|
|
|
|
|
"3. **每个 function_unit 必须引用具体的 section 号和 row 号**作为 source\n"
|
|
|
|
|
"4. **非功能章节可以跳过**(如背景、术语、变更日志),但行为规则章节必须覆盖\n"
|
|
|
|
|
"5. 输出中必须包含针对上述缺口的新 function_unit\n"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _collect_logic_tree_nodes(doc: dict) -> dict[str, dict[str, str]]:
|
|
|
|
|
"""Return {image_id: {node_id: node_type}} for all logic trees."""
|
|
|
|
@@ -767,13 +826,22 @@ def run_ensemble_semantic_index(doc: dict) -> dict:
|
|
|
|
|
# Feedback retry: re-run with coverage feedback (one retry)
|
|
|
|
|
feedback = _build_coverage_feedback(gaps)
|
|
|
|
|
if feedback:
|
|
|
|
|
print(f"\n 覆盖反馈重试...")
|
|
|
|
|
print(f"\n 覆盖反馈重试 (feedback长度={len(feedback)}字符)...", flush=True)
|
|
|
|
|
try:
|
|
|
|
|
retry_prompt = build_prompt(doc, feedback, all_paths)
|
|
|
|
|
retry_result = call_llm(retry_prompt, max_retries=1, temperature=0.0)
|
|
|
|
|
print(f" 重试 prompt 长度: {len(retry_prompt)} 字符", flush=True)
|
|
|
|
|
retry_result = call_llm(retry_prompt, max_retries=1, temperature=0.3)
|
|
|
|
|
n_retry_units = len(retry_result.get("function_units", []))
|
|
|
|
|
print(f" 重试返回: {n_retry_units} 功能单元")
|
|
|
|
|
n_retry_concepts = len(retry_result.get("concepts", []))
|
|
|
|
|
print(f" 重试返回: {n_retry_concepts} 概念, {n_retry_units} 功能单元", flush=True)
|
|
|
|
|
if n_retry_units > 0:
|
|
|
|
|
# Check which new sections were covered
|
|
|
|
|
retry_sections = set()
|
|
|
|
|
for fu in retry_result.get("function_units", []):
|
|
|
|
|
for src in fu.get("sources", []):
|
|
|
|
|
if src.get("section"):
|
|
|
|
|
retry_sections.add(src["section"])
|
|
|
|
|
print(f" 重试新增 sections: {sorted(retry_sections)}", flush=True)
|
|
|
|
|
# Merge retry into results and re-validate
|
|
|
|
|
semantic_indices.append(retry_result)
|
|
|
|
|
merged = ensemble_merge(semantic_indices)
|
|
|
|
@@ -783,9 +851,11 @@ def run_ensemble_semantic_index(doc: dict) -> dict:
|
|
|
|
|
merged["validation_gaps"] = {
|
|
|
|
|
k: v for k, v in gaps.items() if v
|
|
|
|
|
}
|
|
|
|
|
print(f" 重试后验证: {'PASS' if passed else 'GAPS FOUND'}")
|
|
|
|
|
print(f" 重试后验证: {'PASS' if passed else 'GAPS FOUND'}", flush=True)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f" 覆盖反馈重试失败: {e}")
|
|
|
|
|
print(f" 覆盖反馈重试失败: {e}", flush=True)
|
|
|
|
|
import traceback
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
|
|
|
|
|
return merged
|
|
|
|
|
|
|
|
|
@@ -826,14 +896,11 @@ def main():
|
|
|
|
|
n_versions = merged_index.get("ensemble_versions", len(config.ENSEMBLE_TEMPERATURES))
|
|
|
|
|
|
|
|
|
|
if not merged_index.get("validation_passed", True):
|
|
|
|
|
print(f"\n错误: 语义索引验证未通过!")
|
|
|
|
|
print(f"\n注意: 语义索引验证发现以下问题 (非阻塞,pipeline 继续运行):")
|
|
|
|
|
gaps = merged_index.get("validation_gaps", {})
|
|
|
|
|
for category, issues in gaps.items():
|
|
|
|
|
for issue in issues:
|
|
|
|
|
print(f" [{category}] {issue}")
|
|
|
|
|
print(f"\n流水线中止: {n_units} 个功能单元不满足最低覆盖率要求。")
|
|
|
|
|
print("请检查 LLM 配置、输入文档格式和 Prompt 兼容性。")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
print(f"\n完成! {n_versions} 版本集成, {n_concepts} 个概念, {n_units} 个功能单元.")
|
|
|
|
|
print(f"输出: {config.SEMANTIC_INDEX_JSON}")
|
|
|
|
|