"""QE Acceptance Test — Three-layer main branch health check. Layer A (Schema): structural correctness of IR Layer B (Coverage): structural source-traceability coverage + stability Layer C (QE Audit): LLM as QE expert — functional coverage assessment Final verdict: all three layers must pass for main to be releasable. """ from __future__ import annotations import json import math import re import statistics import tempfile import time from pathlib import Path from typing import Any import pytest from .ir_schema import validate_ir, schema_checklist from .report import generate_report, schema_verdict, coverage_verdict, audit_verdict # ═══════════════════════════════════════════════════════════════════════════════ # Layer A: SCHEMA — deterministic structural validation # ═══════════════════════════════════════════════════════════════════════════════ def test_layer_a_schema(ir_data: dict, request): """Validate IR structure: required fields, types, naming conventions, no nulls.""" report = validate_ir(ir_data) checks = schema_checklist(ir_data) # Build Layer A result a_errors = report["errors"] a_stats = report["stats"] a_result = schema_verdict(a_errors, a_stats) a_result["checks"] = checks # Store for downstream layers & report _stash(request, "layer_a", a_result) # Assert assert report["valid"], ( f"Schema validation FAILED ({len(a_errors)} errors)\n" + "\n".join(f" - {e}" for e in a_errors[:20]) ) # ═══════════════════════════════════════════════════════════════════════════════ # Layer B: STRUCTURAL COVERAGE + STABILITY # ═══════════════════════════════════════════════════════════════════════════════ # Section titles that are NOT functional requirements NON_FUNCTIONAL_PATTERNS = [ re.compile(p) for p in [ r"编制.*变更.*日志", r"变更日志", r"文档背景", r"文档范围", r"术语解释", r"参考(文献|文档|资料)?", r"附录", r"版本", r"变更记录", r"目录", r"前言", r"概述.*背景", r"产品简介", r"场景.*(说明|概述)", r".*概要说明$", r"相关文档", r"行业规范", r"政策法规", r"非功能说明", r"背景介绍", r"PRD", # document title like "XX Auto XXX PRD V1.0" r"产品架构", # architecture overview r"系统架构", ] ] def _is_functional_section(section_name: str) -> bool: """Heuristic: exclude background, glossary, changelog, scope sections. Check non-functional patterns first, then treat numbered sections (like '3.1.1 系统限制') as likely functional. """ # Explicitly non-functional patterns (checked first) for pat in NON_FUNCTIONAL_PATTERNS: if pat.search(section_name): return False # Documents with only a title (no section number) — check for functional keywords sec_num = _section_number(section_name) if not sec_num: return False if "." not in sec_num and not sec_num[0].isdigit(): func_keywords = ["策略", "规则", "功能", "限制", "流程", "配置", "场景", "约束", "条件", "方案", "逻辑", "处理", "机制", "禁止"] if not any(kw in section_name for kw in func_keywords): return False return True def _has_section_content(sec: dict) -> bool: """Check if a section has meaningful content (text, table, or image). A section is considered "empty" (no real content) if all its text blocks have fewer than 10 characters and it contains no tables or images. """ for block in sec.get("blocks", []): blk_type = block.get("type", "") if blk_type == "table": return True if blk_type in ("image", "figure", "picture"): return True text = block.get("text", "") if isinstance(text, str) and len(text.strip()) >= 10: return True return False def _extract_content_units(parsed_data: dict) -> dict: """Extract countable content units from parsed JSON. Returns: {"sections": [{"name": ..., "number": ...}, ...], "table_rows": int, "diagram_images": [rid, ...]} """ sections = parsed_data.get("sections", []) functional_sections: list[dict] = [] total_table_rows = 0 for sec in sections: name = sec.get("source", "") is_func = _is_functional_section(name) and _has_section_content(sec) if is_func: functional_sections.append({ "name": name, "number": _section_number(name), }) # Only count table rows from functional sections # (non-functional sections like changelog, glossary, references # cannot be covered by function_units — counting them inflates # the denominator and yields misleadingly low coverage.) if is_func: for block in sec.get("blocks", []): if block.get("type") == "table": rows = block.get("rows", []) total_table_rows += len(rows) # Diagram-type images from image_analysis diagram_rids: list[str] = [] for img in parsed_data.get("image_analysis", []): img_type = img.get("type", "") if img_type in ("flowchart", "logic_tree", "architecture", "state", "sequence", "activity"): diagram_rids.append(img.get("rid", "")) return { "functional_sections": functional_sections, "table_rows": total_table_rows, "diagram_images": diagram_rids, } def _section_number(section_name: str) -> str: """Extract leading section number, e.g. '3.1.1 系统限制' → '3.1.1'.""" import re m = re.match(r"^([\d.]+)", section_name) return m.group(1) if m else section_name def _section_matches(sec_ref: str, func_sections: list[dict]) -> str | None: """Find a functional section matching *sec_ref*. Returns the section name or None. Matching: exact match → starts-with match → number match → substring match. """ # exact for s in func_sections: if s["name"] == sec_ref: return s["name"] # starts with section number for s in func_sections: if s["name"].startswith(sec_ref) or sec_ref.startswith(s["name"]): return s["name"] # number match sec_num = _section_number(sec_ref) if sec_num: for s in func_sections: if s["number"] == sec_num: return s["name"] # substring for s in func_sections: if sec_ref in s["name"] or s["name"] in sec_ref: return s["name"] return None def _measure_coverage(ir_data: dict, parsed_data: dict) -> dict: """Compute structural coverage of IR over parsed document. Returns: { "section_coverage": {total, covered, rate, uncovered}, "table_coverage": {total_rows, covered_rows, rate}, "diagram_coverage": {total, covered, rate}, "overall_rate": float, } """ units = _extract_content_units(parsed_data) rules = ir_data.get("rules", []) # ── section coverage ── func_sections = units["functional_sections"] covered_sections: set[str] = set() for rule in rules: for src in rule.get("sources", []): sec_ref = src.get("section", "") if sec_ref: matched = _section_matches(sec_ref, func_sections) if matched: covered_sections.add(matched) def _safe_rate(covered: int, total: int) -> float: """Return coverage rate. total=0 means nothing to cover → 1.0.""" return round(covered / total, 3) if total > 0 else 1.0 section_coverage = { "total": len(func_sections), "covered": len(covered_sections), "rate": _safe_rate(len(covered_sections), len(func_sections)), "uncovered": [s["name"] for s in func_sections if s["name"] not in covered_sections], } # ── table row coverage ── covered_rows: set[tuple] = set() for rule in rules: for src in rule.get("sources", []): if src.get("type") == "table": sec = src.get("section", "") row = src.get("row") if sec and row is not None: covered_rows.add((sec, row)) total_rows = units["table_rows"] table_coverage = { "total_rows": total_rows, "covered_rows": len(covered_rows), "rate": _safe_rate(len(covered_rows), total_rows), } # ── diagram coverage ── diagram_rids = units["diagram_images"] covered_rids: set[str] = set() for rule in rules: for src in rule.get("sources", []): if src.get("type") == "logic_tree": img_id = src.get("image_id", "") if img_id and img_id in diagram_rids: covered_rids.add(img_id) diagram_coverage = { "total": len(diagram_rids), "covered": len(covered_rids), "rate": _safe_rate(len(covered_rids), len(diagram_rids)), "uncovered": [r for r in diagram_rids if r not in covered_rids], } # ── overall: only include dimensions with actual content ── rates: list[float] = [] if section_coverage["total"] > 0: rates.append(section_coverage["rate"]) if table_coverage["total_rows"] > 0: rates.append(table_coverage["rate"]) if diagram_coverage["total"] > 0: rates.append(diagram_coverage["rate"]) overall = round(sum(rates) / len(rates), 3) if rates else 0.0 return { "section_coverage": section_coverage, "table_coverage": table_coverage, "diagram_coverage": diagram_coverage, "overall_rate": overall, } def test_measure_coverage_excludes_zero_dimensions(): """#36: dimensions with total=0 must not drag down the overall rate. When diagram total=0, the overall should be computed from sections and tables only, not include a 0% diagram entry that makes the goal unreachable. """ parsed_data = { "sections": [ {"source": "3.1.1 功能A", "blocks": [ {"type": "table", "rows": [{"cell": "1"}, {"cell": "2"}]} ]} ], "image_analysis": [], # no diagrams → total=0 } # IR that covers the section but no table rows (table coverage = 0/2) ir_data = { "rules": [ {"sources": [{"section": "3.1.1"}]} # 1 section covered, 0 tables ] } cov = _measure_coverage(ir_data, parsed_data) # Section: 1/1 = 100%, Table: 0/2 = 0%, Diagram: total=0 → excluded assert cov["section_coverage"]["total"] == 1 assert cov["section_coverage"]["rate"] == 1.0 assert cov["table_coverage"]["total_rows"] == 2 assert cov["table_coverage"]["rate"] == 0.0 assert cov["diagram_coverage"]["total"] == 0 assert cov["diagram_coverage"]["rate"] == 1.0 # _safe_rate: 0/0 → 1.0 # Key assertion: diagram (total=0) is excluded from overall # overall = (1.0 + 0.0) / 2 = 0.5 # NOT (1.0 + 0.0 + 1.0) / 3 = 0.667 assert cov["overall_rate"] == 0.5, ( f"Expected overall 0.5 (sections + tables only), got {cov['overall_rate']}. " f"Zero-content dimension may be leaking into the average." ) def test_measure_coverage_all_dimensions_have_content(): """When all dimensions have content, all should be included.""" parsed_data = { "sections": [ {"source": "3.1.1 功能A", "blocks": [ {"type": "table", "rows": [{"cell": "1"}]} ]} ], "image_analysis": [{"type": "flowchart", "rid": "img_001"}], } ir_data = { "rules": [ {"sources": [{"section": "3.1.1"}]}, {"sources": [{"type": "table", "section": "3.1.1", "row": 0}]}, {"sources": [{"type": "logic_tree", "image_id": "img_001"}]}, ] } cov = _measure_coverage(ir_data, parsed_data) # All three dimensions have content → all included assert cov["section_coverage"]["total"] == 1 assert cov["table_coverage"]["total_rows"] == 1 assert cov["diagram_coverage"]["total"] == 1 # overall = (1.0 + 1.0 + 1.0) / 3 = 1.0 assert cov["overall_rate"] == 1.0, ( f"Expected overall 1.0 (all covered), got {cov['overall_rate']}" ) def test_measure_coverage_no_content_returns_zero(): """When no dimensions have content, overall should be 0.0.""" parsed_data = {"sections": [], "image_analysis": []} ir_data = {"rules": []} cov = _measure_coverage(ir_data, parsed_data) assert cov["overall_rate"] == 0.0 def test_layer_b_coverage( ir_data: dict, parsed_data: dict | None, ir_path: str, acceptance_runs: int, run_ir_pipeline, request, ): """Measure structural coverage and (optionally) coverage stability.""" if parsed_data is None: pytest.skip("No parsed JSON available for coverage analysis") # ── B1: single-run coverage ── cov = _measure_coverage(ir_data, parsed_data) # ── B2: stability (multi-run) ── stability_values: list[float] = [cov["overall_rate"]] stability_std = 0.0 if acceptance_runs > 1 and run_ir_pipeline is not None: parsed_path = request.config.getoption("--parsed-path") if parsed_path and os.path.exists(parsed_path): for _ in range(acceptance_runs - 1): try: ir_list, _ = run_ir_pipeline(parsed_path) run_ir = _wrap_list_ir(ir_list) run_cov = _measure_coverage(run_ir, parsed_data) stability_values.append(run_cov["overall_rate"]) time.sleep(0.5) except Exception as e: pytest.fail(f"Stability run failed: {e}") elif acceptance_runs > 1 and run_ir_pipeline is None: print(" [Layer B] Stability testing skipped: pipeline runner not available") if len(stability_values) > 1: stability_std = statistics.stdev(stability_values) # Build Layer B result b_result = coverage_verdict( coverage_rate=cov["overall_rate"], stability_std=stability_std, stability_values=stability_values, section_coverage=cov["section_coverage"], table_coverage=cov["table_coverage"], diagram_coverage=cov["diagram_coverage"], ) _stash(request, "layer_b", b_result) # Assert — both B1 and B2 must pass assert b_result["coverage_pass"], ( f"Coverage {cov['overall_rate']:.1%} < threshold 70%\n" f" Sections: {cov['section_coverage']['covered']}/{cov['section_coverage']['total']} " f"({cov['section_coverage']['rate']:.1%})\n" f" Uncovered: {cov['section_coverage']['uncovered']}\n" f" Table rows: {cov['table_coverage']['covered_rows']}/{cov['table_coverage']['total_rows']} " f"({cov['table_coverage']['rate']:.1%})\n" f" Diagrams: {cov['diagram_coverage']['covered']}/{cov['diagram_coverage']['total']} " f"({cov['diagram_coverage']['rate']:.1%})\n" f" Uncovered diagrams: {cov['diagram_coverage']['uncovered']}" ) if len(stability_values) > 1: assert b_result["stability"]["pass"], ( f"Coverage stability std={stability_std:.4f} > threshold 0.05\n" f" Values across {len(stability_values)} runs: {stability_values}" ) def _wrap_list_ir(ir_list: list) -> dict: """Wrap a list-format IR (from ir_generator.py) into a dict for schema compat.""" # Convert simple format to rich format for coverage measurement rules = [] for i, entry in enumerate(ir_list): if not isinstance(entry, dict): continue rule = { "rule_id": f"GEN-001-RULE-{i:03d}", "description": entry.get("function", ""), "path": [], "priority": "P2", "sources": [], "precondition": {}, "trigger": entry.get("trigger", {"operator": "AND", "conditions": []}), "actions": [], } # Convert source src = entry.get("source", {}) if src.get("section"): rule["sources"].append({ "type": "text", "section": src["section"], "paragraph": 1, "text_snippet": src.get("location", ""), "priority": "primary_source", }) rules.append(rule) return { "feature": "generated", "feature_id": "GEN-001", "rules": rules, } # ═══════════════════════════════════════════════════════════════════════════════ # Layer C: LLM QE EXPERT AUDIT # ═══════════════════════════════════════════════════════════════════════════════ QE_AUDITOR_PROMPT = """你是一个资深 QE 专家,负责审查需求文档的 IR(中间表示层)是否充分覆盖了源文档的所有可测试功能点。 你不是 IR 的生成者,你是独立的质量审计员。你的职责是判断 IR 的功能覆盖率是否充分。 ## 审计输入 ### Layer B 结构化覆盖率数据(参考) {coverage_summary} ### 源文档内容(Parsed JSON) {parsed_content} ### 生成的 IR(待审计) {ir_content} ## 审计要求 对源文档中的每个章节逐一评估其功能需求是否被 IR 充分覆盖。 **判断标准**: - **adequate**(充分覆盖):该章节的所有功能需求在 IR 中都有对应的 rule,包括触发条件、执行动作 - **inadequate**(覆盖不足):该章节存在功能需求未在 IR 中体现,或描述不完整(缺少触发条件或动作) - **not_applicable**(不适用):该章节为背景介绍、术语定义、变更日志等,不包含功能需求 **注意**: - 如果某个章节涉及多个决策路径(如流程图),检查 IR 是否覆盖了每条路径 - 表格中的每个功能行都应被至少一个 IR rule 覆盖 - 图片分析中的流程图/决策树节点应被 IR 引用 ## 输出格式 请严格输出以下 JSON 格式(不要包含代码块标记): {{ "total_functional_sections": , "adequate": , "inadequate": , "not_applicable": , "inadequate_ratio": , "verdict": "ACCEPT 或 REJECT", "rationale": "<一句话说明接受或拒绝的理由>", "section_assessments": [ {{ "section": "<章节名>", "assessment": "adequate | inadequate | not_applicable", "reason": "<评估理由>", "missing": ["<缺失项1>", "<缺失项2>"] // 仅 inadequate 时需要 }} ] }} verdict 判定规则: - inadequate_ratio ≤ 0.30 → "ACCEPT"(风险可控) - inadequate_ratio > 0.30 → "REJECT"(功能点认知差异大,需要补充 IR) """ def test_layer_c_qe_audit( ir_data: dict, parsed_data: dict | None, llm_client, request ): """LLM QE expert audit of functional coverage.""" if parsed_data is None: pytest.skip("No parsed JSON available — cannot run QE audit") # ── get Layer B summary for context ── layer_b = _unstash(request, "layer_b") or {} cov_summary = json.dumps( { "coverage_rate": layer_b.get("coverage_rate", "N/A"), "section_coverage": layer_b.get("section_coverage", {}), "diagram_coverage": layer_b.get("diagram_coverage", {}), }, ensure_ascii=False, indent=2, ) # ── prepare content (trim to avoid token overflow) ── parsed_str = json.dumps(parsed_data, ensure_ascii=False) ir_str = json.dumps(ir_data, ensure_ascii=False) max_parsed = 12000 max_ir = 8000 if len(parsed_str) > max_parsed: parsed_str = parsed_str[:max_parsed] + "\n...[truncated]" if len(ir_str) > max_ir: ir_str = ir_str[:max_ir] + "\n...[truncated]" prompt = QE_AUDITOR_PROMPT.format( coverage_summary=cov_summary, parsed_content=parsed_str, ir_content=ir_str, ) # ── call LLM ── try: raw = llm_client.chat( model=llm_client.TEXT_MODEL, messages=[{"role": "user", "content": prompt}], response_format={"type": "json_object"}, ) except Exception as e: pytest.fail(f"QE audit LLM call failed: {e}") # ── parse response ── audit_data = _parse_json_response(raw) if audit_data is None: pytest.fail(f"QE audit returned unparseable response:\n{raw[:500]}") # Build Layer C result c_result = audit_verdict(audit_data) c_result["raw_assessments"] = audit_data.get("section_assessments", []) _stash(request, "layer_c", c_result) # Assert assert c_result["verdict"] == "ACCEPT", ( f"QE Audit REJECTED — inadequate_ratio={c_result['inadequate_ratio']:.1%} > 30%\n" f" Rationale: {c_result['rationale']}\n" f" Adequate: {c_result['adequate']}, Inadequate: {c_result['inadequate']}" ) # ═══════════════════════════════════════════════════════════════════════════════ # Final report (runs last) # ═══════════════════════════════════════════════════════════════════════════════ def test_final_report(ir_data: dict, ir_path: str, request): """Generate the final three-layer JSON report. This test always passes (report generation). The verdicts from layers A/B/C determine the final releasable status, but the report itself is informational. """ layer_a = _unstash(request, "layer_a") or {"verdict": "SKIPPED"} layer_b = _unstash(request, "layer_b") or {"verdict": "SKIPPED"} layer_c = _unstash(request, "layer_c") or {"verdict": "SKIPPED"} report_path = request.config.getoption("--json-report-file", None) or str( Path.cwd() / "acceptance-report.json" ) report = generate_report( layer_a, layer_b, layer_c, commit=os.environ.get("GITEA_SHA", ""), branch=os.environ.get("GITEA_BRANCH", "main"), output_path=report_path, ) # Print summary print(f"\n{'='*60}") print(f"QE ACCEPTANCE REPORT") print(f"{'='*60}") print(f" Layer A (Schema): {layer_a.get('verdict', '?')}") print(f" Layer B (Coverage): {layer_b.get('verdict', '?')} " f"(rate={layer_b.get('coverage_rate', '?')})") print(f" Layer C (QE Audit): {layer_c.get('verdict', '?')}") print(f" {'─'*40}") print(f" FINAL: {report['final_verdict']} | " f"Releasable: {report['releasable']}") print(f" Report: {report_path}") print(f"{'='*60}\n") # Fail if any layer failed (aggregate assertion) failures = report.get("failure_details", []) if failures: pytest.fail( "Acceptance tests FAILED:\n" + "\n".join(f" - {f}" for f in failures) ) # ═══════════════════════════════════════════════════════════════════════════════ # Helpers # ═══════════════════════════════════════════════════════════════════════════════ import os # noqa: E402 # Module-level stash for sharing results across tests in the same module. # Each test function stores its result here; later tests read earlier results. _module_stash: dict[str, dict] = {} def _stash(request, key: str, value: dict): """Store a result dict for cross-test access within this module.""" _module_stash[key] = value def _unstash(request, key: str) -> dict | None: """Retrieve a stashed result.""" return _module_stash.get(key) def _parse_json_response(raw: str) -> dict | None: """Parse JSON from an LLM response, handling markdown code fences.""" if not raw: return None text = raw.strip() if text.startswith("```"): nl = text.find("\n") text = text[nl + 1:] if nl != -1 else text[3:] if text.endswith("```"): text = text[:-3] try: return json.loads(text) except json.JSONDecodeError: return None