Files
pzhang_zywl 076fb25eda
CI / test (pull_request) Successful in 8s
test: _measure_coverage overall 排除零内容维度 - Closes #36
添加 3 个回归测试验证 total=0 的维度不参与 overall 计算:
- 零内容维度被正确排除
- 所有维度有内容则全部参与
- 无内容时返回 0.0
fix 已在 1a867b0 合入,本次补充 UT 覆盖。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-02 14:20:38 +08:00

686 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""QE Acceptance Test — Three-layer main branch health check.
Layer A (Schema): structural correctness of IR
Layer B (Coverage): structural source-traceability coverage + stability
Layer C (QE Audit): LLM as QE expert — functional coverage assessment
Final verdict: all three layers must pass for main to be releasable.
"""
from __future__ import annotations
import json
import math
import re
import statistics
import tempfile
import time
from pathlib import Path
from typing import Any
import pytest
from .ir_schema import validate_ir, schema_checklist
from .report import generate_report, schema_verdict, coverage_verdict, audit_verdict
# ═══════════════════════════════════════════════════════════════════════════════
# Layer A: SCHEMA — deterministic structural validation
# ═══════════════════════════════════════════════════════════════════════════════
def test_layer_a_schema(ir_data: dict, request):
    """Layer A gate: the IR must pass deterministic schema validation.

    Runs ``validate_ir`` and ``schema_checklist`` on the IR, stashes the
    resulting Layer A verdict (with the checklist attached) for later tests,
    then asserts that the validation report is marked valid. On failure the
    message lists at most the first 20 reported errors.
    """
    validation = validate_ir(ir_data)
    checklist = schema_checklist(ir_data)

    errors = validation["errors"]
    layer_result = schema_verdict(errors, validation["stats"])
    layer_result["checks"] = checklist

    # Stash before asserting so the result is recorded even if this test fails.
    _stash(request, "layer_a", layer_result)

    assert validation["valid"], (
        f"Schema validation FAILED ({len(errors)} errors)\n"
        + "\n".join(f" - {err}" for err in errors[:20])
    )
# ═══════════════════════════════════════════════════════════════════════════════
# Layer B: STRUCTURAL COVERAGE + STABILITY
# ═══════════════════════════════════════════════════════════════════════════════
# Title fragments that mark a section as NOT carrying functional requirements
# (changelogs, glossaries, references, overviews, ...). The patterns are
# pre-compiled once at import time.
NON_FUNCTIONAL_PATTERNS = list(map(re.compile, (
    r"编制.*变更.*日志",
    r"变更日志",
    r"文档背景",
    r"文档范围",
    r"术语解释",
    r"参考(文献|文档|资料)?",
    r"附录",
    r"版本",
    r"变更记录",
    r"目录",
    r"前言",
    r"概述.*背景",
    r"产品简介",
    r"场景.*(说明|概述)",
    r".*概要说明$",
    r"相关文档",
    r"行业规范",
    r"政策法规",
    r"非功能说明",
    r"背景介绍",
    r"PRD",  # document title like "XX Auto XXX PRD V1.0"
    r"产品架构",  # architecture overview
    r"系统架构",
)))
def _is_functional_section(section_name: str) -> bool:
    """Heuristically decide whether a section title denotes functional content.

    A title is rejected when any entry of ``NON_FUNCTIONAL_PATTERNS`` is found
    in it, or when it is empty. A title that does not look numbered (its
    extracted "number" contains no dot and does not start with a digit) is
    accepted only if it contains one of the functional keywords below. Every
    other title (e.g. '3.1.1 系统限制') is accepted.
    """
    # Background / glossary / changelog style titles are excluded first.
    if any(pattern.search(section_name) for pattern in NON_FUNCTIONAL_PATTERNS):
        return False

    number = _section_number(section_name)
    if not number:
        return False

    looks_unnumbered = "." not in number and not number[0].isdigit()
    if not looks_unnumbered:
        return True

    # Title-only sections must mention something that sounds like a requirement.
    functional_keywords = (
        "策略", "规则", "功能", "限制", "流程", "配置", "场景",
        "约束", "条件", "方案", "逻辑", "处理", "机制", "禁止",
    )
    return any(keyword in section_name for keyword in functional_keywords)
def _has_section_content(sec: dict) -> bool:
"""Check if a section has meaningful content (text, table, or image).
A section is considered "empty" (no real content) if all its text blocks
have fewer than 10 characters and it contains no tables or images.
"""
for block in sec.get("blocks", []):
blk_type = block.get("type", "")
if blk_type == "table":
return True
if blk_type in ("image", "figure", "picture"):
return True
text = block.get("text", "")
if isinstance(text, str) and len(text.strip()) >= 10:
return True
return False
def _extract_content_units(parsed_data: dict) -> dict:
"""Extract countable content units from parsed JSON.
Returns:
{"sections": [{"name": ..., "number": ...}, ...],
"table_rows": int, "diagram_images": [rid, ...]}
"""
sections = parsed_data.get("sections", [])
functional_sections: list[dict] = []
total_table_rows = 0
for sec in sections:
name = sec.get("source", "")
is_func = _is_functional_section(name) and _has_section_content(sec)
if is_func:
functional_sections.append({
"name": name,
"number": _section_number(name),
})
# Only count table rows from functional sections
# (non-functional sections like changelog, glossary, references
# cannot be covered by function_units — counting them inflates
# the denominator and yields misleadingly low coverage.)
if is_func:
for block in sec.get("blocks", []):
if block.get("type") == "table":
rows = block.get("rows", [])
total_table_rows += len(rows)
# Diagram-type images from image_analysis
diagram_rids: list[str] = []
for img in parsed_data.get("image_analysis", []):
img_type = img.get("type", "")
if img_type in ("flowchart", "logic_tree", "architecture",
"state", "sequence", "activity"):
diagram_rids.append(img.get("rid", ""))
return {
"functional_sections": functional_sections,
"table_rows": total_table_rows,
"diagram_images": diagram_rids,
}
def _section_number(section_name: str) -> str:
"""Extract leading section number, e.g. '3.1.1 系统限制''3.1.1'."""
import re
m = re.match(r"^([\d.]+)", section_name)
return m.group(1) if m else section_name
def _section_matches(sec_ref: str, func_sections: list[dict]) -> str | None:
"""Find a functional section matching *sec_ref*. Returns the section name or None.
Matching: exact match → starts-with match → number match → substring match.
"""
# exact
for s in func_sections:
if s["name"] == sec_ref:
return s["name"]
# starts with section number
for s in func_sections:
if s["name"].startswith(sec_ref) or sec_ref.startswith(s["name"]):
return s["name"]
# number match
sec_num = _section_number(sec_ref)
if sec_num:
for s in func_sections:
if s["number"] == sec_num:
return s["name"]
# substring
for s in func_sections:
if sec_ref in s["name"] or s["name"] in sec_ref:
return s["name"]
return None
def _measure_coverage(ir_data: dict, parsed_data: dict) -> dict:
    """Compute structural coverage of the IR over the parsed document.

    Three dimensions are measured from the ``sources`` of every IR rule:

    * sections: functional sections referenced by any source;
    * table rows: ``(section, row)`` pairs of ``type == "table"`` sources,
      counted only when the section resolves to a functional section (the
      denominator only contains rows of functional sections);
    * diagrams: ``image_id`` of ``type == "logic_tree"`` sources that is one
      of the document's diagram ids.

    Returns:
        {
            "section_coverage": {total, covered, rate, uncovered},
            "table_coverage": {total_rows, covered_rows, rate},
            "diagram_coverage": {total, covered, rate, uncovered},
            "overall_rate": float,
        }

        Each per-dimension ``rate`` is rounded to 3 decimals and is 1.0 when
        the dimension has nothing to cover. ``overall_rate`` is the mean over
        the dimensions that have content, or 0.0 when none has.
    """
    units = _extract_content_units(parsed_data)
    rules = ir_data.get("rules", [])
    func_sections = units["functional_sections"]
    total_rows = units["table_rows"]
    diagram_rids = units["diagram_images"]

    def _safe_rate(covered: int, total: int) -> float:
        """Return coverage rate. total=0 means nothing to cover → 1.0."""
        return round(covered / total, 3) if total > 0 else 1.0

    covered_sections: set[str] = set()
    covered_rows: set[tuple] = set()
    covered_rids: set[str] = set()
    for rule in rules:
        for src in rule.get("sources", []):
            sec_ref = src.get("section", "")
            matched = _section_matches(sec_ref, func_sections) if sec_ref else None
            if matched:
                covered_sections.add(matched)

            src_type = src.get("type")
            if src_type == "table":
                row = src.get("row")
                # Key rows by the resolved section name so that different
                # spellings of one section do not count the same row twice,
                # and rows of non-functional sections are not counted at all.
                if matched and row is not None:
                    covered_rows.add((matched, row))
            elif src_type == "logic_tree":
                img_id = src.get("image_id", "")
                if img_id and img_id in diagram_rids:
                    covered_rids.add(img_id)

    # ── section coverage ──
    section_coverage = {
        "total": len(func_sections),
        "covered": len(covered_sections),
        "rate": _safe_rate(len(covered_sections), len(func_sections)),
        "uncovered": [s["name"] for s in func_sections
                      if s["name"] not in covered_sections],
    }

    # ── table row coverage ──
    # Row indices come from the IR and are not validated against the tables,
    # so cap the count to keep the rate within [0, 1].
    covered_row_count = min(len(covered_rows), total_rows)
    table_coverage = {
        "total_rows": total_rows,
        "covered_rows": covered_row_count,
        "rate": _safe_rate(covered_row_count, total_rows),
    }

    # ── diagram coverage ──
    diagram_coverage = {
        "total": len(diagram_rids),
        "covered": len(covered_rids),
        "rate": _safe_rate(len(covered_rids), len(diagram_rids)),
        "uncovered": [r for r in diagram_rids if r not in covered_rids],
    }

    # ── overall: only include dimensions with actual content ──
    rates: list[float] = []
    if section_coverage["total"] > 0:
        rates.append(section_coverage["rate"])
    if table_coverage["total_rows"] > 0:
        rates.append(table_coverage["rate"])
    if diagram_coverage["total"] > 0:
        rates.append(diagram_coverage["rate"])
    overall = round(sum(rates) / len(rates), 3) if rates else 0.0

    return {
        "section_coverage": section_coverage,
        "table_coverage": table_coverage,
        "diagram_coverage": diagram_coverage,
        "overall_rate": overall,
    }
def test_measure_coverage_excludes_zero_dimensions():
    """Regression for #36: a dimension with total=0 stays out of the overall rate.

    The document has one functional section with a two-row table and no
    diagrams. The IR references the section but none of the table rows, so
    the overall rate must be the mean of sections and tables only.
    """
    two_row_table = {"type": "table", "rows": [{"cell": "1"}, {"cell": "2"}]}
    parsed_data = {
        "sections": [{"source": "3.1.1 功能A", "blocks": [two_row_table]}],
        "image_analysis": [],  # no diagrams → total=0
    }
    # One section covered, no table rows covered.
    ir_data = {"rules": [{"sources": [{"section": "3.1.1"}]}]}

    cov = _measure_coverage(ir_data, parsed_data)
    sections = cov["section_coverage"]
    tables = cov["table_coverage"]
    diagrams = cov["diagram_coverage"]

    # Section: 1/1 = 100%, Table: 0/2 = 0%, Diagram: total=0 → excluded
    assert sections["total"] == 1
    assert sections["rate"] == 1.0
    assert tables["total_rows"] == 2
    assert tables["rate"] == 0.0
    assert diagrams["total"] == 0
    assert diagrams["rate"] == 1.0  # _safe_rate: 0/0 → 1.0

    # overall = (1.0 + 0.0) / 2 = 0.5, NOT (1.0 + 0.0 + 1.0) / 3 = 0.667
    assert cov["overall_rate"] == 0.5, (
        f"Expected overall 0.5 (sections + tables only), got {cov['overall_rate']}. "
        f"Zero-content dimension may be leaking into the average."
    )
def test_measure_coverage_all_dimensions_have_content():
    """With content in every dimension, all three feed the overall rate."""
    section_ref = "3.1.1"
    diagram_id = "img_001"
    parsed_data = {
        "sections": [
            {
                "source": "3.1.1 功能A",
                "blocks": [{"type": "table", "rows": [{"cell": "1"}]}],
            }
        ],
        "image_analysis": [{"type": "flowchart", "rid": diagram_id}],
    }
    # One rule per dimension: section text, table row, diagram.
    source_refs = [
        {"section": section_ref},
        {"type": "table", "section": section_ref, "row": 0},
        {"type": "logic_tree", "image_id": diagram_id},
    ]
    ir_data = {"rules": [{"sources": [ref]} for ref in source_refs]}

    cov = _measure_coverage(ir_data, parsed_data)

    # All three dimensions have content → all included
    assert cov["section_coverage"]["total"] == 1
    assert cov["table_coverage"]["total_rows"] == 1
    assert cov["diagram_coverage"]["total"] == 1
    # overall = (1.0 + 1.0 + 1.0) / 3 = 1.0
    assert cov["overall_rate"] == 1.0, (
        f"Expected overall 1.0 (all covered), got {cov['overall_rate']}"
    )
def test_measure_coverage_no_content_returns_zero():
    """An empty document with no rules yields an overall rate of 0.0."""
    empty_document = {"sections": [], "image_analysis": []}
    no_rules = {"rules": []}
    overall = _measure_coverage(no_rules, empty_document)["overall_rate"]
    assert overall == 0.0
def test_layer_b_coverage(
    ir_data: dict,
    parsed_data: dict | None,
    ir_path: str,
    acceptance_runs: int,
    run_ir_pipeline,
    request,
):
    """Layer B gate: structural coverage (B1) and, optionally, its stability (B2).

    B1 measures coverage of the given IR over the parsed document. B2 runs
    only when more than one acceptance run is requested, a pipeline runner is
    available and ``--parsed-path`` points to an existing file: the pipeline
    is re-run ``acceptance_runs - 1`` times and the standard deviation of the
    overall rates is computed. The verdict is stashed for later tests before
    the assertions are evaluated. The test is skipped without parsed JSON.
    """
    if parsed_data is None:
        pytest.skip("No parsed JSON available for coverage analysis")

    # ── B1: single-run coverage ──
    cov = _measure_coverage(ir_data, parsed_data)

    # ── B2: stability (multi-run) ──
    stability_values: list[float] = [cov["overall_rate"]]
    multi_run = acceptance_runs > 1
    if multi_run and run_ir_pipeline is not None:
        parsed_path = request.config.getoption("--parsed-path")
        if parsed_path and os.path.exists(parsed_path):
            for _attempt in range(1, acceptance_runs):
                try:
                    ir_list, _unused = run_ir_pipeline(parsed_path)
                    rerun_cov = _measure_coverage(_wrap_list_ir(ir_list), parsed_data)
                    stability_values.append(rerun_cov["overall_rate"])
                    time.sleep(0.5)
                except Exception as e:
                    pytest.fail(f"Stability run failed: {e}")
    elif multi_run:
        print(" [Layer B] Stability testing skipped: pipeline runner not available")

    stability_std = (
        statistics.stdev(stability_values) if len(stability_values) > 1 else 0.0
    )

    # Build Layer B result
    b_result = coverage_verdict(
        coverage_rate=cov["overall_rate"],
        stability_std=stability_std,
        stability_values=stability_values,
        section_coverage=cov["section_coverage"],
        table_coverage=cov["table_coverage"],
        diagram_coverage=cov["diagram_coverage"],
    )
    _stash(request, "layer_b", b_result)

    # Assert — both B1 and B2 must pass
    assert b_result["coverage_pass"], (
        f"Coverage {cov['overall_rate']:.1%} < threshold 70%\n"
        f" Sections: {cov['section_coverage']['covered']}/{cov['section_coverage']['total']} "
        f"({cov['section_coverage']['rate']:.1%})\n"
        f" Uncovered: {cov['section_coverage']['uncovered']}\n"
        f" Table rows: {cov['table_coverage']['covered_rows']}/{cov['table_coverage']['total_rows']} "
        f"({cov['table_coverage']['rate']:.1%})\n"
        f" Diagrams: {cov['diagram_coverage']['covered']}/{cov['diagram_coverage']['total']} "
        f"({cov['diagram_coverage']['rate']:.1%})\n"
        f" Uncovered diagrams: {cov['diagram_coverage']['uncovered']}"
    )
    if len(stability_values) > 1:
        assert b_result["stability"]["pass"], (
            f"Coverage stability std={stability_std:.4f} > threshold 0.05\n"
            f" Values across {len(stability_values)} runs: {stability_values}"
        )
def _wrap_list_ir(ir_list: list) -> dict:
"""Wrap a list-format IR (from ir_generator.py) into a dict for schema compat."""
# Convert simple format to rich format for coverage measurement
rules = []
for i, entry in enumerate(ir_list):
if not isinstance(entry, dict):
continue
rule = {
"rule_id": f"GEN-001-RULE-{i:03d}",
"description": entry.get("function", ""),
"path": [],
"priority": "P2",
"sources": [],
"precondition": {},
"trigger": entry.get("trigger", {"operator": "AND", "conditions": []}),
"actions": [],
}
# Convert source
src = entry.get("source", {})
if src.get("section"):
rule["sources"].append({
"type": "text",
"section": src["section"],
"paragraph": 1,
"text_snippet": src.get("location", ""),
"priority": "primary_source",
})
rules.append(rule)
return {
"feature": "generated",
"feature_id": "GEN-001",
"rules": rules,
}
# ═══════════════════════════════════════════════════════════════════════════════
# Layer C: LLM QE EXPERT AUDIT
# ═══════════════════════════════════════════════════════════════════════════════
# Prompt template for the Layer C audit. It is filled with ``str.format`` and
# takes three placeholders: ``coverage_summary``, ``parsed_content`` and
# ``ir_content``. Literal braces of the JSON sample are doubled for that reason.
QE_AUDITOR_PROMPT = """你是一个资深 QE 专家,负责审查需求文档的 IR(中间表示层)是否充分覆盖了源文档的所有可测试功能点。
你不是 IR 的生成者,你是独立的质量审计员。你的职责是判断 IR 的功能覆盖率是否充分。
## 审计输入
### Layer B 结构化覆盖率数据(参考)
{coverage_summary}
### 源文档内容(Parsed JSON)
{parsed_content}
### 生成的 IR(待审计)
{ir_content}
## 审计要求
对源文档中的每个章节逐一评估其功能需求是否被 IR 充分覆盖。
**判断标准**
- **adequate**(充分覆盖):该章节的所有功能需求在 IR 中都有对应的 rule,包括触发条件、执行动作
- **inadequate**(覆盖不足):该章节存在功能需求未在 IR 中体现,或描述不完整(缺少触发条件或动作)
- **not_applicable**(不适用):该章节为背景介绍、术语定义、变更日志等,不包含功能需求
**注意**
- 如果某个章节涉及多个决策路径(如流程图),检查 IR 是否覆盖了每条路径
- 表格中的每个功能行都应被至少一个 IR rule 覆盖
- 图片分析中的流程图/决策树节点应被 IR 引用
## 输出格式
请严格输出以下 JSON 格式(不要包含代码块标记):
{{
"total_functional_sections": <number>,
"adequate": <number>,
"inadequate": <number>,
"not_applicable": <number>,
"inadequate_ratio": <float>,
"verdict": "ACCEPT 或 REJECT",
"rationale": "<一句话说明接受或拒绝的理由>",
"section_assessments": [
{{
"section": "<章节名>",
"assessment": "adequate | inadequate | not_applicable",
"reason": "<评估理由>",
"missing": ["<缺失项1>", "<缺失项2>"] // 仅 inadequate 时需要
}}
]
}}
verdict 判定规则:
- inadequate_ratio ≤ 0.30 → "ACCEPT"(风险可控)
- inadequate_ratio > 0.30 → "REJECT"(功能点认知差异大,需要补充 IR)
"""
def test_layer_c_qe_audit(
    ir_data: dict, parsed_data: dict | None, llm_client, request
):
    """Layer C gate: an LLM acting as QE expert audits functional coverage.

    Builds the audit prompt from the stashed Layer B summary (if any), the
    parsed document and the IR, sends it to ``llm_client.chat`` and turns the
    JSON answer into the Layer C verdict, which is stashed for the final
    report. The test is skipped without parsed JSON, fails when the LLM call
    raises or the answer is not a JSON object, and asserts that the verdict
    is ``ACCEPT``.
    """
    if parsed_data is None:
        pytest.skip("No parsed JSON available — cannot run QE audit")

    # ── get Layer B summary for context ──
    layer_b = _unstash(request, "layer_b") or {}
    cov_summary = json.dumps(
        {
            "coverage_rate": layer_b.get("coverage_rate", "N/A"),
            "section_coverage": layer_b.get("section_coverage", {}),
            "diagram_coverage": layer_b.get("diagram_coverage", {}),
        },
        ensure_ascii=False,
        indent=2,
    )

    # ── prepare content (trim to avoid token overflow) ──
    # Truncation is by character count, so the JSON text may be cut mid-value.
    parsed_str = json.dumps(parsed_data, ensure_ascii=False)
    ir_str = json.dumps(ir_data, ensure_ascii=False)
    max_parsed = 12000
    max_ir = 8000
    if len(parsed_str) > max_parsed:
        parsed_str = parsed_str[:max_parsed] + "\n...[truncated]"
    if len(ir_str) > max_ir:
        ir_str = ir_str[:max_ir] + "\n...[truncated]"
    prompt = QE_AUDITOR_PROMPT.format(
        coverage_summary=cov_summary,
        parsed_content=parsed_str,
        ir_content=ir_str,
    )

    # ── call LLM ──
    try:
        raw = llm_client.chat(
            model=llm_client.TEXT_MODEL,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
    except Exception as e:
        pytest.fail(f"QE audit LLM call failed: {e}")

    # ── parse response ──
    audit_data = _parse_json_response(raw)
    if not isinstance(audit_data, dict):
        # ``raw`` may be None or a non-string here; stringify it so building
        # the failure message cannot itself raise and hide the real problem.
        pytest.fail(f"QE audit returned unparseable response:\n{str(raw)[:500]}")

    # Build Layer C result
    c_result = audit_verdict(audit_data)
    c_result["raw_assessments"] = audit_data.get("section_assessments", [])
    _stash(request, "layer_c", c_result)

    # Assert
    assert c_result["verdict"] == "ACCEPT", (
        f"QE Audit REJECTED — inadequate_ratio={c_result['inadequate_ratio']:.1%} > 30%\n"
        f" Rationale: {c_result['rationale']}\n"
        f" Adequate: {c_result['adequate']}, Inadequate: {c_result['inadequate']}"
    )
# ═══════════════════════════════════════════════════════════════════════════════
# Final report (runs last)
# ═══════════════════════════════════════════════════════════════════════════════
def test_final_report(ir_data: dict, ir_path: str, request):
    """Generate the final three-layer report and enforce the aggregate verdict.

    Layers that stashed no result are reported with verdict ``SKIPPED``. The
    report is generated and summarised on stdout first; afterwards the test
    fails if the report lists any ``failure_details``.

    The report path comes from ``--json-report-file`` and falls back to
    ``acceptance-report.json`` in the current working directory. Commit and
    branch are read from the ``GITEA_SHA`` / ``GITEA_BRANCH`` environment
    variables.
    """
    layer_a = _unstash(request, "layer_a") or {"verdict": "SKIPPED"}
    layer_b = _unstash(request, "layer_b") or {"verdict": "SKIPPED"}
    layer_c = _unstash(request, "layer_c") or {"verdict": "SKIPPED"}
    report_path = request.config.getoption("--json-report-file", None) or str(
        Path.cwd() / "acceptance-report.json"
    )
    report = generate_report(
        layer_a,
        layer_b,
        layer_c,
        commit=os.environ.get("GITEA_SHA", ""),
        branch=os.environ.get("GITEA_BRANCH", "main"),
        output_path=report_path,
    )

    # Print summary
    print(f"\n{'='*60}")
    print("QE ACCEPTANCE REPORT")
    print(f"{'='*60}")
    print(f" Layer A (Schema): {layer_a.get('verdict', '?')}")
    print(f" Layer B (Coverage): {layer_b.get('verdict', '?')} "
          f"(rate={layer_b.get('coverage_rate', '?')})")
    print(f" Layer C (QE Audit): {layer_c.get('verdict', '?')}")
    # Separator between the per-layer lines and the final verdict.
    print(f" {'-'*40}")
    print(f" FINAL: {report['final_verdict']} | "
          f"Releasable: {report['releasable']}")
    print(f" Report: {report_path}")
    print(f"{'='*60}\n")

    # Fail if any layer failed (aggregate assertion)
    failures = report.get("failure_details", [])
    if failures:
        pytest.fail(
            "Acceptance tests FAILED:\n" + "\n".join(f" - {f}" for f in failures)
        )
# ═══════════════════════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════════════════════
import os # noqa: E402
# Module-level stash for sharing results across tests in the same module.
# Each layer test stores its result dict here under a key such as "layer_a";
# later tests read earlier results through _unstash, which returns None for a
# key that was never stored, so readers must handle a missing result.
_module_stash: dict[str, dict] = {}
def _stash(request, key: str, value: dict):
    """Record *value* under *key* in the module-level stash.

    Any value previously stored under the same key is replaced. ``request``
    is accepted but not used.
    """
    _module_stash.update({key: value})
def _unstash(request, key: str) -> dict | None:
    """Return the result stashed under *key*, or None if nothing was stored.

    ``request`` is accepted but not used.
    """
    if key in _module_stash:
        return _module_stash[key]
    return None
def _parse_json_response(raw: str) -> dict | None:
"""Parse JSON from an LLM response, handling markdown code fences."""
if not raw:
return None
text = raw.strip()
if text.startswith("```"):
nl = text.find("\n")
text = text[nl + 1:] if nl != -1 else text[3:]
if text.endswith("```"):
text = text[:-3]
try:
return json.loads(text)
except json.JSONDecodeError:
return None