Files
document_analyzer/scripts/run_pipeline.py
2026-06-03 14:39:55 +08:00

188 lines
7.1 KiB
Python

#!/usr/bin/env python3
"""End-to-end pipeline runner for QE acceptance testing.
Runs the complete document_analyzer pipeline:
1. doc_parser (docx → _parsed.json, if .docx provided)
2. ir_generation steps (parsed JSON → ir_final.json + audit report)
3. QE acceptance tests (optional, if --test flag)
Usage:
python scripts/run_pipeline.py --input <path.docx> # full pipeline
python scripts/run_pipeline.py --parsed <_updated.json> # skip doc_parser
python scripts/run_pipeline.py --parsed <_updated.json> --test # pipeline + acceptance tests
Outputs are placed in output/ matching the project config.py structure:
output/final/ir_final.json
output/final/ir_audit_report.md
acceptance-report.json (if --test)
"""
from __future__ import annotations
import argparse
import os
import subprocess
import sys
import json
from pathlib import Path
# Two levels up from this file: the directory that contains this script's
# own directory.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Prepend the skill directories to sys.path before the project-local imports.
# NOTE(review): presumably `config` (imported below) lives in
# ir_generation_skill and `doc_parser` (imported in run_doc_parser) lives in
# doc_parser_skill/scripts -- confirm against the repository layout.
sys.path.insert(0, str(PROJECT_ROOT / "skills" / "ir_generation_skill"))
sys.path.insert(0, str(PROJECT_ROOT / "skills" / "doc_parser_skill" / "scripts"))
# Must stay after the sys.path changes above.
import config
# ── Stage 1: Document Parsing ────────────────────────────────────────────────
def run_doc_parser(docx_path: str, output_dir: str) -> str | None:
    """Parse a .docx with doc_parser and locate the JSON file it wrote.

    Args:
        docx_path: Path to the .docx document to parse.
        output_dir: Directory handed to ``parse_document`` as its output
            location.

    Returns:
        The path ``<output_dir>/<docx basename>_parsed.json`` when that file
        exists after parsing, otherwise ``None`` (a failure line is written
        to stderr).
    """
    # Imported inside the function, so doc_parser is only loaded when this
    # stage actually runs.
    from doc_parser import parse_document

    print(f"[1/3] Parsing document: {docx_path}")
    # parse_document returns {source, sections, image_sources, image_analysis};
    # the return value is not needed here because success is judged by the
    # <basename>_parsed.json file it saves in output_dir.
    parse_document(docx_path, output_dir, dry_run=False)

    stem, _extension = os.path.splitext(os.path.basename(docx_path))
    expected_json = os.path.join(output_dir, f"{stem}_parsed.json")

    if not os.path.isfile(expected_json):
        print(f" [FAIL] doc_parser output not found: {expected_json}", file=sys.stderr)
        return None

    print(f"{expected_json}")
    return expected_json
# ── Stage 2: IR Generation ───────────────────────────────────────────────────
def run_ir_pipeline(parsed_path: str) -> str | None:
    """Run the ir_generation step scripts in order.

    Each step is launched as a separate Python subprocess with the working
    directory set to PROJECT_ROOT and ``IR_INPUT_JSON`` set to *parsed_path*.
    The run stops at the first missing or failing step. Without that, a
    ``ir_final.json`` left behind by an earlier run would be reported as the
    result of a run whose steps printed ``[FAIL]``.

    Args:
        parsed_path: Path to the parsed-document JSON passed to the steps
            through the ``IR_INPUT_JSON`` environment variable.

    Returns:
        ``config.IR_FINAL_JSON`` when every step exited with status 0 and the
        file exists afterwards, otherwise ``None`` (a failure line is written
        to stderr).
    """
    for directory in (config.PROJECT_OUTPUT, config.IR_OUTPUT, config.FINAL_OUTPUT):
        os.makedirs(directory, exist_ok=True)

    env = os.environ.copy()
    env["IR_INPUT_JSON"] = parsed_path
    # The children's output is decoded as UTF-8 below, so ask them to emit
    # UTF-8 (same default that run_acceptance_tests applies).
    env.setdefault("PYTHONIOENCODING", "utf-8")

    steps = [
        ("step1_semantic_index.py", "Semantic Index"),
        ("step2_ir_extraction.py", "IR Extraction"),
        ("step2_5_branch_coverage.py", "Branch Coverage"),
        ("step3_merge_and_audit.py", "Merge & Audit"),
    ]
    skill_dir = PROJECT_ROOT / "skills" / "ir_generation_skill"

    print(f"[2/3] Generating IR from: {parsed_path}")
    for script, label in steps:
        script_path = skill_dir / script
        if not script_path.exists():
            print(f" [FAIL] Missing: {script}", file=sys.stderr)
            return None

        print(f" Running {script} ({label})...")
        result = subprocess.run(
            [sys.executable, str(script_path)],
            cwd=str(PROJECT_ROOT),
            capture_output=True,
            text=True,
            encoding="utf-8",
            # A stray non-UTF-8 byte in a child's output must not crash the
            # runner with UnicodeDecodeError.
            errors="replace",
            env=env,
        )
        if result.returncode != 0:
            print(f" [FAIL] {script} failed (exit {result.returncode})", file=sys.stderr)
            # Only the tail of stderr, to keep the report short.
            print(result.stderr[-500:], file=sys.stderr)
            return None

        # Print last line of stdout for brief progress. splitlines() yields
        # an empty list for empty output, so the "done" fallback is reachable.
        lines = result.stdout.strip().splitlines()
        last = lines[-1] if lines else "done"
        print(f" [OK] {label}: {last[:120]}")

    if os.path.isfile(config.IR_FINAL_JSON):
        print(f"{config.IR_FINAL_JSON}")
        return config.IR_FINAL_JSON

    print(" [FAIL] IR generation did not produce ir_final.json", file=sys.stderr)
    return None
# ── Stage 3: Acceptance Tests ────────────────────────────────────────────────
def run_acceptance_tests(parsed_json_path: str) -> int:
    """Invoke pytest on ``tests/acceptance`` and hand back its exit status.

    pytest is started with the current interpreter from PROJECT_ROOT. Its
    output is not captured, so it goes straight to this process's
    stdout/stderr.

    Args:
        parsed_json_path: Value passed to pytest as ``--parsed-path``.

    Returns:
        The return code of the pytest process.
    """
    print("[3/3] Running QE acceptance tests...")

    # Existing environment wins; PYTHONIOENCODING is only a default.
    child_env = {"PYTHONIOENCODING": "utf-8", **os.environ}

    # NOTE(review): --run-acceptance, --ir-path and --parsed-path are
    # presumably options registered by the project's pytest configuration.
    pytest_cmd = [sys.executable, "-m", "pytest", str(PROJECT_ROOT / "tests" / "acceptance")]
    pytest_cmd += ["-v", "--run-acceptance"]
    pytest_cmd += ["--ir-path", config.IR_FINAL_JSON]
    pytest_cmd += ["--parsed-path", parsed_json_path]
    pytest_cmd.append("--tb=short")

    completed = subprocess.run(
        pytest_cmd,
        cwd=str(PROJECT_ROOT),
        encoding="utf-8",
        env=child_env,
    )
    return completed.returncode
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point for the pipeline runner.

    Parses the arguments, optionally runs doc_parser on ``--input``, runs the
    IR generation stage on the resulting (or ``--parsed``) JSON, and with
    ``--test`` finishes by running the acceptance tests.

    Exits with status 1 on a missing input, a missing parsed JSON, or a
    blocked stage. With ``--test`` the process exits with the pytest return
    code.
    """
    cli = argparse.ArgumentParser(description="Run the full document_analyzer pipeline")
    cli.add_argument("--input", help="Path to .docx PRD file")
    cli.add_argument("--parsed", help="Path to pre-parsed _updated.json (skip doc_parser)")
    cli.add_argument("--test", action="store_true", help="Run acceptance tests after pipeline")
    cli.add_argument("--output-dir", default=None, help="Output directory (default: output/)")
    opts = cli.parse_args()

    def abort(message):
        # Report a usage/validation error on stderr and stop with status 1.
        print(message, file=sys.stderr)
        sys.exit(1)

    # --parsed is the starting point; a successful Stage 1 replaces it.
    parsed_json = opts.parsed

    # Stage 1: doc_parser (only when a .docx was given)
    if opts.input:
        source_docx = opts.input
        if not os.path.isfile(source_docx):
            abort(f"Error: Input file not found: {source_docx}")
        target_dir = opts.output_dir if opts.output_dir else str(PROJECT_ROOT / "output")
        parsed_json = run_doc_parser(source_docx, target_dir)
        if not parsed_json:
            print("\n[FAIL] Pipeline blocked at Stage 1 (doc_parser)", file=sys.stderr)
            # Emit the blockage notice before exiting
            _maybe_create_blocking_issue("doc_parser", f"Input: {source_docx}")
            sys.exit(1)

    if not parsed_json:
        abort("Error: Either --input or --parsed is required")
    if not os.path.isfile(parsed_json):
        abort(f"Error: Parsed JSON not found: {parsed_json}")

    # Stage 2: IR generation
    final_ir = run_ir_pipeline(parsed_json)
    if not final_ir:
        print("\n[FAIL] Pipeline blocked at Stage 2 (ir_generation)", file=sys.stderr)
        _maybe_create_blocking_issue("ir_generation", f"Parsed: {parsed_json}")
        sys.exit(1)
    print(f"\n[OK] Pipeline complete: {final_ir}")

    # Stage 3: Acceptance tests (the pytest status becomes the exit status)
    if opts.test:
        sys.exit(run_acceptance_tests(parsed_json))
def _maybe_create_blocking_issue(stage: str, detail: str):
    """Write a pipeline-blockage notice to stderr.

    No issue is created here; the notice states that the acceptance CI will
    create it.

    Args:
        stage: Name of the stage that failed (callers pass ``"doc_parser"``
            or ``"ir_generation"``).
        detail: Context for the failure (callers pass the input or parsed
            file path). Printed on its own line so the log shows which input
            blocked the pipeline; skipped when empty.
    """
    print(f"\n⚠ Stage '{stage}' failed. CI will create an acceptance-failure issue.", file=sys.stderr)
    if detail:
        print(f"  {detail}", file=sys.stderr)
# Run the pipeline only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    main()