Files
document_analyzer/tests/acceptance/test_main_health.py
T
pzhang_zywl 119c08faca
CI / test (pull_request) Successful in 9s
test: _extract_content_units 仅统计功能章节表格行 - Closes #33
非功能章节(变更日志、术语解释等)的表格行不可能被
function_units 覆盖,计入分母会导致覆盖率虚低。

修复: table_rows 统计仅在 _is_functional_section
且 _has_section_content 的章节中进行。

Table 覆盖率: 54.2% → 72.2% (24行→18行分母)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 14:06:16 +08:00

601 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""QE Acceptance Test — Three-layer main branch health check.
Layer A (Schema): structural correctness of IR
Layer B (Coverage): structural source-traceability coverage + stability
Layer C (QE Audit): LLM as QE expert — functional coverage assessment
Final verdict: all three layers must pass for main to be releasable.
"""
from __future__ import annotations
import json
import math
import re
import statistics
import tempfile
import time
from pathlib import Path
from typing import Any
import pytest
from .ir_schema import validate_ir, schema_checklist
from .report import generate_report, schema_verdict, coverage_verdict, audit_verdict
# ═══════════════════════════════════════════════════════════════════════════════
# Layer A: SCHEMA — deterministic structural validation
# ═══════════════════════════════════════════════════════════════════════════════
def test_layer_a_schema(ir_data: dict, request):
    """Layer A: validate the IR against the structural schema.

    Runs ``validate_ir`` and ``schema_checklist`` on *ir_data*, combines the
    outcome into a verdict via ``schema_verdict``, stashes that verdict under
    ``"layer_a"`` for later tests, and fails when the validation report is
    not flagged as valid.
    """
    validation = validate_ir(ir_data)
    checklist = schema_checklist(ir_data)

    errors = validation["errors"]
    verdict = schema_verdict(errors, validation["stats"])
    verdict["checks"] = checklist

    # Stash before asserting so the final report still sees a failed layer.
    _stash(request, "layer_a", verdict)

    # Only the first 20 errors are listed to keep the failure message short.
    assert validation["valid"], (
        f"Schema validation FAILED ({len(errors)} errors)\n"
        + "\n".join(f" - {err}" for err in errors[:20])
    )
# ═══════════════════════════════════════════════════════════════════════════════
# Layer B: STRUCTURAL COVERAGE + STABILITY
# ═══════════════════════════════════════════════════════════════════════════════
# Compiled regexes for section titles that do NOT describe functional
# requirements (changelog, glossary, references, background, ...).
# They are applied with ``search``, so a hit anywhere in the title counts.
NON_FUNCTIONAL_PATTERNS = list(map(re.compile, (
    r"编制.*变更.*日志",
    r"变更日志",
    r"文档背景",
    r"文档范围",
    r"术语解释",
    r"参考(文献|文档|资料)?",
    r"附录",
    r"版本",
    r"变更记录",
    r"目录",
    r"前言",
    r"概述.*背景",
    r"产品简介",
    r"场景.*(说明|概述)",
    r".*概要说明$",
    r"相关文档",
    r"行业规范",
    r"政策法规",
    r"非功能说明",
    r"背景介绍",
    r"PRD",  # document title like "XX Auto XXX PRD V1.0"
    r"产品架构",  # architecture overview
    r"系统架构",
)))
def _is_functional_section(section_name: str) -> bool:
    """Heuristically decide whether a section title names functional content.

    The decision is made in this order:

    1. A title matching any entry of ``NON_FUNCTIONAL_PATTERNS`` is rejected.
    2. A title for which ``_section_number`` yields an empty value is rejected.
    3. If that value contains a dot or starts with a digit (e.g.
       '3.1.1 系统限制'), the section is accepted.
    4. Otherwise the title is accepted only when it contains one of the
       functional keywords listed below.
    """
    if any(pattern.search(section_name) for pattern in NON_FUNCTIONAL_PATTERNS):
        return False

    number = _section_number(section_name)
    if not number:
        return False

    # Numbered headings are assumed to be functional.
    if "." in number or number[0].isdigit():
        return True

    # Title-only headings must mention something that sounds like a requirement.
    functional_keywords = (
        "策略", "规则", "功能", "限制", "流程", "配置", "场景",
        "约束", "条件", "方案", "逻辑", "处理", "机制", "禁止",
    )
    return any(keyword in section_name for keyword in functional_keywords)
def _has_section_content(sec: dict) -> bool:
"""Check if a section has meaningful content (text, table, or image).
A section is considered "empty" (no real content) if all its text blocks
have fewer than 10 characters and it contains no tables or images.
"""
for block in sec.get("blocks", []):
blk_type = block.get("type", "")
if blk_type == "table":
return True
if blk_type in ("image", "figure", "picture"):
return True
text = block.get("text", "")
if isinstance(text, str) and len(text.strip()) >= 10:
return True
return False
def _extract_content_units(parsed_data: dict) -> dict:
"""Extract countable content units from parsed JSON.
Returns:
{"sections": [{"name": ..., "number": ...}, ...],
"table_rows": int, "diagram_images": [rid, ...]}
"""
sections = parsed_data.get("sections", [])
functional_sections: list[dict] = []
total_table_rows = 0
for sec in sections:
name = sec.get("source", "")
is_func = _is_functional_section(name) and _has_section_content(sec)
if is_func:
functional_sections.append({
"name": name,
"number": _section_number(name),
})
# Only count table rows from functional sections
# (non-functional sections like changelog, glossary, references
# cannot be covered by function_units — counting them inflates
# the denominator and yields misleadingly low coverage.)
if is_func:
for block in sec.get("blocks", []):
if block.get("type") == "table":
rows = block.get("rows", [])
total_table_rows += len(rows)
# Diagram-type images from image_analysis
diagram_rids: list[str] = []
for img in parsed_data.get("image_analysis", []):
img_type = img.get("type", "")
if img_type in ("flowchart", "logic_tree", "architecture",
"state", "sequence", "activity"):
diagram_rids.append(img.get("rid", ""))
return {
"functional_sections": functional_sections,
"table_rows": total_table_rows,
"diagram_images": diagram_rids,
}
def _section_number(section_name: str) -> str:
"""Extract leading section number, e.g. '3.1.1 系统限制''3.1.1'."""
import re
m = re.match(r"^([\d.]+)", section_name)
return m.group(1) if m else section_name
def _section_matches(sec_ref: str, func_sections: list[dict]) -> str | None:
"""Find a functional section matching *sec_ref*. Returns the section name or None.
Matching: exact match → starts-with match → number match → substring match.
"""
# exact
for s in func_sections:
if s["name"] == sec_ref:
return s["name"]
# starts with section number
for s in func_sections:
if s["name"].startswith(sec_ref) or sec_ref.startswith(s["name"]):
return s["name"]
# number match
sec_num = _section_number(sec_ref)
if sec_num:
for s in func_sections:
if s["number"] == sec_num:
return s["name"]
# substring
for s in func_sections:
if sec_ref in s["name"] or s["name"] in sec_ref:
return s["name"]
return None
def _coverage_rate(covered: int, total: int) -> float:
    """Return ``covered / total`` rounded to 3 places, clamped to [0, 1]; 0.0 when *total* is 0."""
    if total <= 0:
        return 0.0
    return round(min(covered / total, 1.0), 3)


def _measure_coverage(ir_data: dict, parsed_data: dict) -> dict:
    """Compute structural coverage of the IR over the parsed document.

    Every ``sources`` entry of every rule in ``ir_data["rules"]`` is inspected:

    * its ``section`` (if any) is resolved against the functional sections of
      the document; a hit marks that section as covered;
    * a ``table`` source with a ``row`` marks that row as covered, but only
      when its section resolves to a functional section. The denominator
      counts functional-section rows only, so the numerator must too;
    * a ``logic_tree`` source marks its ``image_id`` as covered when it is one
      of the document's diagram images.

    Returns:
        {
            "section_coverage": {total, covered, rate, uncovered},
            "table_coverage": {total_rows, covered_rows, rate},
            "diagram_coverage": {total, covered, rate, uncovered},
            "overall_rate": float,
        }

    Each ``rate`` lies in [0, 1] and is 0.0 when the dimension has no units.
    ``overall_rate`` is the mean of the rates of the dimensions that have at
    least one unit. A document without tables or diagrams is therefore not
    penalised for content it does not contain. It is 0.0 when no dimension
    has any unit.
    """
    units = _extract_content_units(parsed_data)
    rules = ir_data.get("rules", [])

    func_sections = units["functional_sections"]
    total_rows = units["table_rows"]
    diagram_rids = units["diagram_images"]
    known_rids = set(diagram_rids)  # O(1) membership tests in the loop below

    covered_sections: set[str] = set()
    covered_rows: set[tuple] = set()
    covered_rids: set[str] = set()

    for rule in rules:
        for src in rule.get("sources", []):
            sec_ref = src.get("section", "")
            matched = _section_matches(sec_ref, func_sections) if sec_ref else None
            if matched:
                covered_sections.add(matched)

            src_type = src.get("type")
            if src_type == "table":
                row = src.get("row")
                # Key on the resolved section name so that different spellings
                # of the same section do not count one row twice.
                if matched and row is not None:
                    covered_rows.add((matched, row))
            elif src_type == "logic_tree":
                img_id = src.get("image_id", "")
                if img_id and img_id in known_rids:
                    covered_rids.add(img_id)

    section_coverage = {
        "total": len(func_sections),
        "covered": len(covered_sections),
        "rate": _coverage_rate(len(covered_sections), len(func_sections)),
        "uncovered": [s["name"] for s in func_sections
                      if s["name"] not in covered_sections],
    }
    table_coverage = {
        "total_rows": total_rows,
        "covered_rows": len(covered_rows),
        "rate": _coverage_rate(len(covered_rows), total_rows),
    }
    diagram_coverage = {
        "total": len(diagram_rids),
        "covered": len(covered_rids),
        "rate": _coverage_rate(len(covered_rids), len(diagram_rids)),
        "uncovered": [r for r in diagram_rids if r not in covered_rids],
    }

    # Average only over dimensions that actually exist in the document.
    applicable_rates = [
        rate
        for rate, total in (
            (section_coverage["rate"], section_coverage["total"]),
            (table_coverage["rate"], table_coverage["total_rows"]),
            (diagram_coverage["rate"], diagram_coverage["total"]),
        )
        if total > 0
    ]
    overall = (
        round(sum(applicable_rates) / len(applicable_rates), 3)
        if applicable_rates else 0.0
    )

    return {
        "section_coverage": section_coverage,
        "table_coverage": table_coverage,
        "diagram_coverage": diagram_coverage,
        "overall_rate": overall,
    }
def test_layer_b_coverage(
    ir_data: dict,
    parsed_data: dict | None,
    ir_path: str,
    acceptance_runs: int,
    run_ir_pipeline,
    request,
):
    """Layer B: structural coverage (B1) and, optionally, its stability (B2).

    B1 measures coverage of *ir_data* over *parsed_data*. B2 runs only when
    ``acceptance_runs > 1``, a pipeline runner is available and the
    ``--parsed-path`` option points at an existing file: the pipeline is
    re-run ``acceptance_runs - 1`` times and the standard deviation of the
    overall rates is computed. The verdict is stashed under ``"layer_b"``.
    The test is skipped when no parsed JSON is available.
    """
    if parsed_data is None:
        pytest.skip("No parsed JSON available for coverage analysis")

    # ── B1: single-run coverage ──
    cov = _measure_coverage(ir_data, parsed_data)

    # ── B2: stability (multi-run) ──
    stability_values: list[float] = [cov["overall_rate"]]
    wants_stability = acceptance_runs > 1

    if wants_stability and run_ir_pipeline is not None:
        parsed_path = request.config.getoption("--parsed-path")
        if parsed_path and os.path.exists(parsed_path):
            for _run in range(acceptance_runs - 1):
                try:
                    ir_list, _extra = run_ir_pipeline(parsed_path)
                    rerun_cov = _measure_coverage(_wrap_list_ir(ir_list), parsed_data)
                    stability_values.append(rerun_cov["overall_rate"])
                    time.sleep(0.5)
                except Exception as e:
                    pytest.fail(f"Stability run failed: {e}")
    elif wants_stability:
        # Reached only when the pipeline runner is None.
        print(" [Layer B] Stability testing skipped: pipeline runner not available")

    multi_run = len(stability_values) > 1
    stability_std = statistics.stdev(stability_values) if multi_run else 0.0

    sec_cov = cov["section_coverage"]
    tbl_cov = cov["table_coverage"]
    dia_cov = cov["diagram_coverage"]

    b_result = coverage_verdict(
        coverage_rate=cov["overall_rate"],
        stability_std=stability_std,
        stability_values=stability_values,
        section_coverage=sec_cov,
        table_coverage=tbl_cov,
        diagram_coverage=dia_cov,
    )
    _stash(request, "layer_b", b_result)

    # Both B1 and (when measured) B2 must pass.
    assert b_result["coverage_pass"], (
        f"Coverage {cov['overall_rate']:.1%} < threshold 70%\n"
        f" Sections: {sec_cov['covered']}/{sec_cov['total']} "
        f"({sec_cov['rate']:.1%})\n"
        f" Uncovered: {sec_cov['uncovered']}\n"
        f" Table rows: {tbl_cov['covered_rows']}/{tbl_cov['total_rows']} "
        f"({tbl_cov['rate']:.1%})\n"
        f" Diagrams: {dia_cov['covered']}/{dia_cov['total']} "
        f"({dia_cov['rate']:.1%})\n"
        f" Uncovered diagrams: {dia_cov['uncovered']}"
    )
    if multi_run:
        assert b_result["stability"]["pass"], (
            f"Coverage stability std={stability_std:.4f} > threshold 0.05\n"
            f" Values across {len(stability_values)} runs: {stability_values}"
        )
def _wrap_list_ir(ir_list: list) -> dict:
"""Wrap a list-format IR (from ir_generator.py) into a dict for schema compat."""
# Convert simple format to rich format for coverage measurement
rules = []
for i, entry in enumerate(ir_list):
if not isinstance(entry, dict):
continue
rule = {
"rule_id": f"GEN-001-RULE-{i:03d}",
"description": entry.get("function", ""),
"path": [],
"priority": "P2",
"sources": [],
"precondition": {},
"trigger": entry.get("trigger", {"operator": "AND", "conditions": []}),
"actions": [],
}
# Convert source
src = entry.get("source", {})
if src.get("section"):
rule["sources"].append({
"type": "text",
"section": src["section"],
"paragraph": 1,
"text_snippet": src.get("location", ""),
"priority": "primary_source",
})
rules.append(rule)
return {
"feature": "generated",
"feature_id": "GEN-001",
"rules": rules,
}
# ═══════════════════════════════════════════════════════════════════════════════
# Layer C: LLM QE EXPERT AUDIT
# ═══════════════════════════════════════════════════════════════════════════════
# Prompt template for the Layer C audit. It is filled in with ``str.format``
# using the fields ``coverage_summary``, ``parsed_content`` and ``ir_content``;
# the braces of the JSON example are doubled so that ``format`` emits them
# literally. The text instructs the model to answer with a single JSON object
# carrying per-section assessments and an ACCEPT/REJECT verdict.
QE_AUDITOR_PROMPT = """你是一个资深 QE 专家,负责审查需求文档的 IR(中间表示层)是否充分覆盖了源文档的所有可测试功能点。
你不是 IR 的生成者,你是独立的质量审计员。你的职责是判断 IR 的功能覆盖率是否充分。
## 审计输入
### Layer B 结构化覆盖率数据(参考)
{coverage_summary}
### 源文档内容(Parsed JSON
{parsed_content}
### 生成的 IR(待审计)
{ir_content}
## 审计要求
对源文档中的每个章节逐一评估其功能需求是否被 IR 充分覆盖。
**判断标准**
- **adequate**(充分覆盖):该章节的所有功能需求在 IR 中都有对应的 rule,包括触发条件、执行动作
- **inadequate**(覆盖不足):该章节存在功能需求未在 IR 中体现,或描述不完整(缺少触发条件或动作)
- **not_applicable**(不适用):该章节为背景介绍、术语定义、变更日志等,不包含功能需求
**注意**
- 如果某个章节涉及多个决策路径(如流程图),检查 IR 是否覆盖了每条路径
- 表格中的每个功能行都应被至少一个 IR rule 覆盖
- 图片分析中的流程图/决策树节点应被 IR 引用
## 输出格式
请严格输出以下 JSON 格式(不要包含代码块标记):
{{
"total_functional_sections": <number>,
"adequate": <number>,
"inadequate": <number>,
"not_applicable": <number>,
"inadequate_ratio": <float>,
"verdict": "ACCEPT 或 REJECT",
"rationale": "<一句话说明接受或拒绝的理由>",
"section_assessments": [
{{
"section": "<章节名>",
"assessment": "adequate | inadequate | not_applicable",
"reason": "<评估理由>",
"missing": ["<缺失项1>", "<缺失项2>"] // 仅 inadequate 时需要
}}
]
}}
verdict 判定规则:
- inadequate_ratio ≤ 0.30 → "ACCEPT"(风险可控)
- inadequate_ratio > 0.30 → "REJECT"(功能点认知差异大,需要补充 IR
"""
def test_layer_c_qe_audit(
    ir_data: dict, parsed_data: dict | None, llm_client, request
):
    """Layer C: have an LLM, acting as QE expert, audit functional coverage.

    Builds the audit prompt from the stashed Layer B summary, the parsed
    document and the IR (both truncated to bound the prompt size), sends it
    to ``llm_client.chat`` and parses the JSON reply. The verdict derived by
    ``audit_verdict`` is stashed under ``"layer_c"`` and must be ``"ACCEPT"``.

    The test is skipped when no parsed JSON is available. It fails, rather
    than erroring, when the LLM call raises or when the reply is empty,
    unparseable or not a JSON object.
    """
    if parsed_data is None:
        pytest.skip("No parsed JSON available — cannot run QE audit")

    # ── get Layer B summary for context ──
    layer_b = _unstash(request, "layer_b") or {}
    cov_summary = json.dumps(
        {
            "coverage_rate": layer_b.get("coverage_rate", "N/A"),
            "section_coverage": layer_b.get("section_coverage", {}),
            "diagram_coverage": layer_b.get("diagram_coverage", {}),
        },
        ensure_ascii=False,
        indent=2,
    )

    # ── prepare content (trim to avoid token overflow) ──
    parsed_str = json.dumps(parsed_data, ensure_ascii=False)
    ir_str = json.dumps(ir_data, ensure_ascii=False)
    max_parsed = 12000
    max_ir = 8000
    if len(parsed_str) > max_parsed:
        parsed_str = parsed_str[:max_parsed] + "\n...[truncated]"
    if len(ir_str) > max_ir:
        ir_str = ir_str[:max_ir] + "\n...[truncated]"

    prompt = QE_AUDITOR_PROMPT.format(
        coverage_summary=cov_summary,
        parsed_content=parsed_str,
        ir_content=ir_str,
    )

    # ── call LLM ──
    try:
        raw = llm_client.chat(
            model=llm_client.TEXT_MODEL,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
    except Exception as e:
        pytest.fail(f"QE audit LLM call failed: {e}")

    # ── parse response ──
    audit_data = _parse_json_response(raw)
    if not isinstance(audit_data, dict):
        # str() keeps the message buildable when the client returned None;
        # the isinstance check also rejects JSON that is not an object.
        pytest.fail(f"QE audit returned unparseable response:\n{str(raw)[:500]}")

    # Build Layer C result
    c_result = audit_verdict(audit_data)
    c_result["raw_assessments"] = audit_data.get("section_assessments", [])
    _stash(request, "layer_c", c_result)

    # Assert
    assert c_result["verdict"] == "ACCEPT", (
        f"QE Audit REJECTED — inadequate_ratio={c_result['inadequate_ratio']:.1%} > 30%\n"
        f" Rationale: {c_result['rationale']}\n"
        f" Adequate: {c_result['adequate']}, Inadequate: {c_result['inadequate']}"
    )
# ═══════════════════════════════════════════════════════════════════════════════
# Final report (runs last)
# ═══════════════════════════════════════════════════════════════════════════════
def test_final_report(ir_data: dict, ir_path: str, request):
    """Generate the final three-layer JSON report and enforce the verdict.

    Reads the stashed results of layers A/B/C (a layer that stashed nothing
    is reported as ``SKIPPED``), writes the report to the path given by
    ``--json-report-file`` or, failing that, ``acceptance-report.json`` in
    the current directory, and prints a summary. The test fails when the
    report lists any ``failure_details``.
    """
    layer_a = _unstash(request, "layer_a") or {"verdict": "SKIPPED"}
    layer_b = _unstash(request, "layer_b") or {"verdict": "SKIPPED"}
    layer_c = _unstash(request, "layer_c") or {"verdict": "SKIPPED"}

    report_path = request.config.getoption("--json-report-file", None) or str(
        Path.cwd() / "acceptance-report.json"
    )
    report = generate_report(
        layer_a,
        layer_b,
        layer_c,
        commit=os.environ.get("GITEA_SHA", ""),
        branch=os.environ.get("GITEA_BRANCH", "main"),
        output_path=report_path,
    )

    # Print summary
    banner = "=" * 60
    print(f"\n{banner}")
    print("QE ACCEPTANCE REPORT")
    print(banner)
    print(f" Layer A (Schema): {layer_a.get('verdict', '?')}")
    print(f" Layer B (Coverage): {layer_b.get('verdict', '?')} "
          f"(rate={layer_b.get('coverage_rate', '?')})")
    print(f" Layer C (QE Audit): {layer_c.get('verdict', '?')}")
    # Visible rule between the per-layer lines and the final verdict.
    print(f" {'─' * 40}")
    print(f" FINAL: {report['final_verdict']} | "
          f"Releasable: {report['releasable']}")
    print(f" Report: {report_path}")
    print(f"{banner}\n")

    # Fail if any layer failed (aggregate assertion)
    failures = report.get("failure_details", [])
    if failures:
        pytest.fail(
            "Acceptance tests FAILED:\n" + "\n".join(f" - {f}" for f in failures)
        )
# ═══════════════════════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════════════════════
import os # noqa: E402
# Module-level stash for sharing results across tests in the same module.
# Each layer test stores its verdict dict here under a key such as "layer_a";
# later tests (e.g. the final report) read the earlier results back.
# NOTE(review): this relies on the tests running in one process and in
# definition order — confirm against the runner configuration.
_module_stash: dict[str, dict] = {}
def _stash(request, key: str, value: dict):
    """Record *value* under *key* in the module stash, replacing any previous entry.

    *request* is accepted for call-site symmetry but is not used.
    """
    _module_stash.update({key: value})
def _unstash(request, key: str) -> dict | None:
    """Return the result stashed under *key*, or ``None`` when nothing was stored.

    *request* is accepted for call-site symmetry but is not used.
    """
    try:
        return _module_stash[key]
    except KeyError:
        return None
def _parse_json_response(raw: str) -> dict | None:
"""Parse JSON from an LLM response, handling markdown code fences."""
if not raw:
return None
text = raw.strip()
if text.startswith("```"):
nl = text.find("\n")
text = text[nl + 1:] if nl != -1 else text[3:]
if text.endswith("```"):
text = text[:-3]
try:
return json.loads(text)
except json.JSONDecodeError:
return None