Files
pzhang_zywl 40567a4fb6
CI / test (push) Successful in 30s
Initial commit: document_analyzer with CI/CD pipeline
- 4 skill pipeline (doc_parser, conflict_detection, ir_generation, resolution_application)
- CI workflow on push/PR (.gitea/workflows/ci.yml)
- Auto-issue on CI failure (.gitea/workflows/auto-issue.yml)
- Pytest smoke tests (tests/test_sample.py)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 20:00:26 +08:00

240 lines
9.5 KiB
Python

import logging
import os
from docx import Document
from docx.table import Table
from docx.text.paragraph import Paragraph
logger = logging.getLogger(__name__)
IMAGE_EXT = {
"image/png": ".png",
"image/jpeg": ".jpg",
"image/gif": ".gif",
"image/bmp": ".bmp",
"image/tiff": ".tiff",
"image/webp": ".webp",
"image/x-emf": ".emf",
"image/x-wmf": ".wmf",
"image/svg+xml": ".svg",
}
class WordParser:
"""Parse a .docx file — extract images, split body into sections.
Usage::
parser = WordParser("doc.docx")
parser.extract_images("images/")
sections, image_sources = parser.extract_sections()
"""
HEADER_CELL_MAX_LEN = 20 # max chars per cell to treat first row as header
WML_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
DRAW_NS = "http://schemas.openxmlformats.org/drawingml/2006/main"
REL_NS = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
def __init__(self, docx_path: str):
if not os.path.isfile(docx_path):
raise FileNotFoundError(f"Document not found: {docx_path}")
self._doc = Document(docx_path)
# ---- public API ---------------------------------------------------------
def extract_images(self, images_dir: str) -> list[dict]:
"""Save all images to *images_dir*. Returns ``[{rid, path}, ...]``."""
os.makedirs(images_dir, exist_ok=True)
images: list[dict] = []
for rel in self._doc.part.rels.values():
if "image" not in rel.reltype:
continue
ext = IMAGE_EXT.get(rel.target_part.content_type, ".png")
name = f"image_{rel.rId}{ext}"
path = os.path.join(images_dir, name)
with open(path, "wb") as f:
f.write(rel.target_part.blob)
images.append({"rid": rel.rId, "path": path})
return images
def extract_sections(self) -> tuple[list[dict], dict[str, dict]]:
"""Walk document body and split into sections by heading.
Returns:
*sections* — ``[{source, blocks, images}, ...]``
Each block is ``{type, index, text}`` (paragraph) or
``{type, table, headers, rows}`` (table).
*image_sources* — ``rid → {section, table?, row?, column?, name?}``
"""
sections: list[dict] = []
current_source = ""
blocks: list[dict] = []
section_images: list[str] = []
image_sources: dict[str, dict] = {}
para_idx = 0
tbl_idx = 0
for child in self._doc.element.body:
tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag
if tag == "p":
para = Paragraph(child, self._doc)
if self._heading_level(para) is not None:
heading_text = para.text.strip()
if heading_text: # ignore empty heading-like paragraphs
if blocks or section_images:
sections.append({
"source": current_source,
"blocks": blocks,
"images": list(section_images),
})
blocks = []
section_images = []
para_idx = 0
tbl_idx = 0
current_source = heading_text
continue
text = para.text.strip()
# Scan for images — append [[IMAGE:rid]] markers
for run in para.runs:
for rid in self._images_in(run._element):
text += f" [[IMAGE:{rid}]]"
section_images.append(rid)
image_sources[rid] = {"section": current_source}
if text.strip():
blocks.append({"type": "para", "index": para_idx + 1, "text": text.strip()})
para_idx += 1
elif tag == "tbl":
tbl_idx += 1
table = Table(child, self._doc)
# Collect all rows as [[cell_text, ...], ...]
all_rows: list[list[str]] = []
all_images: list[list[list[str]]] = [] # row → col → [rids]
for row in table.rows:
row_texts: list[str] = []
row_cell_images: list[list[str]] = []
for cell in row.cells:
cell_text = cell.text.strip()
cell_imgs: list[str] = []
for cp in cell.paragraphs:
for run in cp.runs:
for rid in self._images_in(run._element):
cell_imgs.append(rid)
# Replace images with markers in text
for rid in cell_imgs:
cell_text += f" [[IMAGE:{rid}]]"
section_images.append(rid)
row_texts.append(cell_text.strip())
row_cell_images.append(cell_imgs)
if any(row_texts) or any(row_cell_images):
all_rows.append(row_texts)
all_images.append(row_cell_images)
if len(all_rows) >= 2:
# Heuristic: first row is a header if every cell is short
first_row = all_rows[0]
has_header = all(len(c) < self.HEADER_CELL_MAX_LEN for c in first_row)
if has_header:
headers = first_row
data_rows_slice = zip(all_rows[1:], all_images[1:])
else:
headers = [f"{ci + 1}" for ci in range(len(first_row))]
data_rows_slice = zip(all_rows, all_images)
data_rows: list[dict] = []
for ri, (row_data, row_imgs) in enumerate(data_rows_slice):
columns: list[dict] = []
max_cols = max(len(headers), len(row_data))
for ci in range(max_cols):
hdr = headers[ci] if ci < len(headers) else ""
txt = row_data[ci] if ci < len(row_data) else ""
columns.append({
"name": hdr,
"row": ri + 1,
"col": ci + 1,
"text": txt,
})
# Register image sources with structured location
imgs = row_imgs[ci] if ci < len(row_imgs) else []
for rid in imgs:
image_sources[rid] = {
"section": current_source,
"table": tbl_idx,
"row": ri + 1,
"column": ci + 1,
"name": hdr,
}
data_rows.append({"columns": columns})
blocks.append({
"type": "table",
"table": tbl_idx,
"headers": headers,
"rows": data_rows,
})
elif all_rows:
# Degenerate table (only header or single row) — treat as plain rows
for ri, row_data in enumerate(all_rows):
row_text = " | ".join(row_data)
if row_text.strip():
blocks.append({
"type": "para",
"index": para_idx + 1,
"text": row_text,
})
para_idx += 1
if blocks or section_images:
sections.append({
"source": current_source,
"blocks": blocks,
"images": list(section_images),
})
return sections, image_sources
# ---- internals ----------------------------------------------------------
def _heading_level(self, para: Paragraph) -> int | None:
"""Heading level 1-9, or *None* if not a heading."""
if para.style and para.style.name:
name = para.style.name
for prefix in ("Heading", "标题"):
if name.startswith(prefix):
try:
return int(name.split()[-1])
except (ValueError, IndexError):
pass
pPr = para._element.find(f"{{{self.WML_NS}}}pPr")
if pPr is not None:
ol = pPr.find(f"{{{self.WML_NS}}}outlineLvl")
if ol is not None:
val = ol.get(f"{{{self.WML_NS}}}val")
if val is not None:
try:
return int(val) + 1
except ValueError:
pass
return None
def _images_in(self, element) -> list[str]:
"""Return rId values for drawings embedded in *element*."""
rids: list[str] = []
for drawing in element.findall(f".//{{{self.WML_NS}}}drawing"):
blip = drawing.find(f".//{{{self.DRAW_NS}}}blip")
if blip is not None:
rid = blip.get(f"{{{self.REL_NS}}}embed")
if rid:
rids.append(rid)
return rids