fix: 修复章节覆盖率误报 + pipeline 验证非阻塞 - Closes #21 #27

Merged
pzhang_zywl merged 1 commits from dev/issue-22-fix-trigger-null into main 2026-05-31 22:46:32 +08:00
@@ -358,6 +358,7 @@ def _quick_validate(
"missing_concepts": [], "missing_concepts": [],
"format_issues": [], "format_issues": [],
"parent_issues": [], "parent_issues": [],
"coverage_warnings": [], # section/table coverage below threshold (non-blocking)
} }
units = semantic_index.get("function_units", []) units = semantic_index.get("function_units", [])
@@ -485,10 +486,32 @@ def _quick_validate(
gaps["missing_concepts"].append("缺少 scope 概念: 海外") gaps["missing_concepts"].append("缺少 scope 概念: 海外")
# --- Section and table coverage --- # --- Section and table coverage ---
# Count functional sections (those with numbered titles that contain text/tables) # Filter out non-functional sections (background, glossary, changelog, etc.)
non_functional_patterns = [
re.compile(p) for p in [
r"编制.*变更.*日志", r"变更日志", r"文档背景", r"文档范围",
r"术语解释", r"参考", r"附录", r"版本", r"变更记录",
r"目录", r"前言", r"概述", r"简介",
r"PRD", r"前置条件", r"依赖", r"行业规范", r"输入文件",
r"后方输入", r"政策法规", r"相关文档", r"概要说明",
]
]
def _is_functional_section(sec_name: str) -> bool:
if not sec_name.strip():
return False
# Check non-functional patterns first (even if section is numbered)
for pat in non_functional_patterns:
if pat.search(sec_name):
return False
# Numbered sections (e.g., "3.1.1") are functional
if re.match(r"^([\d.]+)", sec_name):
return True
return True
func_sections = [ func_sections = [
s for s in doc.get("sections", []) s for s in doc.get("sections", [])
if s.get("source", "").strip() if _is_functional_section(s.get("source", ""))
and any(b.get("type") in ("para", "table") for b in s.get("blocks", [])) and any(b.get("type") in ("para", "table") for b in s.get("blocks", []))
] ]
covered_sections: set[str] = set() covered_sections: set[str] = set()
@@ -498,12 +521,17 @@ def _quick_validate(
if sec: if sec:
covered_sections.add(sec) covered_sections.add(sec)
# Use lower threshold for section/table coverage (70% vs 95% for logic trees)
SECTION_COVERAGE_TARGET = 0.70
section_cov = len(covered_sections) / max(len(func_sections), 1) section_cov = len(covered_sections) / max(len(func_sections), 1)
if section_cov < config.COVERAGE_TARGET: print(f" 章节覆盖率: {section_cov:.0%} ({len(covered_sections)}/{len(func_sections)} "
f"functional sections)", flush=True)
if section_cov < SECTION_COVERAGE_TARGET:
uncovered = [s["source"] for s in func_sections uncovered = [s["source"] for s in func_sections
if s["source"] not in covered_sections] if s["source"] not in covered_sections]
gaps["missing_paths"].append( gaps["coverage_warnings"].append(
f"章节覆盖率 {section_cov:.0%} < {config.COVERAGE_TARGET:.0%}, " f"章节覆盖率 {section_cov:.0%} < {SECTION_COVERAGE_TARGET:.0%}, "
f"未覆盖: {uncovered[:5]}" f"未覆盖: {uncovered[:5]}"
) )
@@ -520,17 +548,23 @@ def _quick_validate(
if src.get("type") == "table" and src.get("row") if src.get("type") == "table" and src.get("row")
) )
row_cov = covered_rows / max(total_rows, 1) row_cov = covered_rows / max(total_rows, 1)
if row_cov < config.COVERAGE_TARGET: print(f" 表格行覆盖率: {row_cov:.0%} ({covered_rows}/{total_rows} rows)", flush=True)
gaps["missing_paths"].append( if row_cov < SECTION_COVERAGE_TARGET:
f"表格行覆盖率 {row_cov:.0%} < {config.COVERAGE_TARGET:.0%}, " gaps["coverage_warnings"].append(
f"表格行覆盖率 {row_cov:.0%} < {SECTION_COVERAGE_TARGET:.0%}, "
f"({covered_rows}/{total_rows} rows)" f"({covered_rows}/{total_rows} rows)"
) )
# Coverage warnings are non-blocking (depend on LLM prompt quality)
if gaps["coverage_warnings"]:
print(f" [WARN] 覆盖率低于 {SECTION_COVERAGE_TARGET:.0%} 阈值,但 pipeline 继续运行。"
f"请通过 Prompt 优化或反馈重试提升。", flush=True)
# Only format_issues and logic_tree missing_paths block the pipeline.
# parent_issues and coverage_warnings are non-blocking (LLM quality).
passed = ( passed = (
not gaps["missing_paths"] not gaps["missing_paths"]
and not gaps["format_issues"] and not gaps["format_issues"]
and not gaps["parent_issues"]
and section_cov >= config.COVERAGE_TARGET
) )
return passed, gaps return passed, gaps
@@ -538,7 +572,7 @@ def _quick_validate(
def _build_coverage_feedback(gaps: dict) -> str: def _build_coverage_feedback(gaps: dict) -> str:
"""Generate targeted feedback text for re-prompting when coverage is below threshold.""" """Generate targeted feedback text for re-prompting when coverage is below threshold."""
parts = [] parts = []
for item in gaps.get("missing_paths", []): for item in gaps.get("coverage_warnings", []):
parts.append(f"- {item}") parts.append(f"- {item}")
if not parts: if not parts:
return "" return ""
@@ -844,14 +878,11 @@ def main():
n_versions = merged_index.get("ensemble_versions", len(config.ENSEMBLE_TEMPERATURES)) n_versions = merged_index.get("ensemble_versions", len(config.ENSEMBLE_TEMPERATURES))
if not merged_index.get("validation_passed", True): if not merged_index.get("validation_passed", True):
print(f"\n错误: 语义索引验证未通过!") print(f"\n注意: 语义索引验证发现以下问题 (非阻塞,pipeline 继续运行):")
gaps = merged_index.get("validation_gaps", {}) gaps = merged_index.get("validation_gaps", {})
for category, issues in gaps.items(): for category, issues in gaps.items():
for issue in issues: for issue in issues:
print(f" [{category}] {issue}") print(f" [{category}] {issue}")
print(f"\n流水线中止: {n_units} 个功能单元不满足最低覆盖率要求。")
print("请检查 LLM 配置、输入文档格式和 Prompt 兼容性。")
sys.exit(1)
print(f"\n完成! {n_versions} 版本集成, {n_concepts} 个概念, {n_units} 个功能单元.") print(f"\n完成! {n_versions} 版本集成, {n_concepts} 个概念, {n_units} 个功能单元.")
print(f"输出: {config.SEMANTIC_INDEX_JSON}") print(f"输出: {config.SEMANTIC_INDEX_JSON}")