40567a4fb6
CI / test (push) Successful in 30s
- 4 skill pipeline (doc_parser, conflict_detection, ir_generation, resolution_application) - CI workflow on push/PR (.gitea/workflows/ci.yml) - Auto-issue on CI failure (.gitea/workflows/auto-issue.yml) - Pytest smoke tests (tests/test_sample.py) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
240 lines
9.5 KiB
Python
240 lines
9.5 KiB
Python
import logging
|
|
import os
|
|
|
|
from docx import Document
|
|
from docx.table import Table
|
|
from docx.text.paragraph import Paragraph
|
|
|
|
logger = logging.getLogger(__name__)

# Maps an image part's MIME content type to the file extension used when the
# image is written to disk. Content types missing from this table are handled
# by the caller's fallback.
IMAGE_EXT = {
    "image/png": ".png",
    "image/jpeg": ".jpg",
    "image/gif": ".gif",
    "image/bmp": ".bmp",
    "image/tiff": ".tiff",
    "image/webp": ".webp",
    "image/x-emf": ".emf",
    "image/x-wmf": ".wmf",
    "image/svg+xml": ".svg",
}
|
|
|
|
|
|
class WordParser:
|
|
"""Parse a .docx file — extract images, split body into sections.
|
|
|
|
Usage::
|
|
|
|
parser = WordParser("doc.docx")
|
|
parser.extract_images("images/")
|
|
sections, image_sources = parser.extract_sections()
|
|
"""
|
|
|
|
HEADER_CELL_MAX_LEN = 20 # max chars per cell to treat first row as header
|
|
|
|
WML_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
|
|
DRAW_NS = "http://schemas.openxmlformats.org/drawingml/2006/main"
|
|
REL_NS = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
|
|
|
|
def __init__(self, docx_path: str):
|
|
if not os.path.isfile(docx_path):
|
|
raise FileNotFoundError(f"Document not found: {docx_path}")
|
|
self._doc = Document(docx_path)
|
|
|
|
# ---- public API ---------------------------------------------------------
|
|
|
|
def extract_images(self, images_dir: str) -> list[dict]:
|
|
"""Save all images to *images_dir*. Returns ``[{rid, path}, ...]``."""
|
|
os.makedirs(images_dir, exist_ok=True)
|
|
images: list[dict] = []
|
|
for rel in self._doc.part.rels.values():
|
|
if "image" not in rel.reltype:
|
|
continue
|
|
ext = IMAGE_EXT.get(rel.target_part.content_type, ".png")
|
|
name = f"image_{rel.rId}{ext}"
|
|
path = os.path.join(images_dir, name)
|
|
with open(path, "wb") as f:
|
|
f.write(rel.target_part.blob)
|
|
images.append({"rid": rel.rId, "path": path})
|
|
return images
|
|
|
|
def extract_sections(self) -> tuple[list[dict], dict[str, dict]]:
|
|
"""Walk document body and split into sections by heading.
|
|
|
|
Returns:
|
|
*sections* — ``[{source, blocks, images}, ...]``
|
|
Each block is ``{type, index, text}`` (paragraph) or
|
|
``{type, table, headers, rows}`` (table).
|
|
*image_sources* — ``rid → {section, table?, row?, column?, name?}``
|
|
"""
|
|
sections: list[dict] = []
|
|
current_source = ""
|
|
blocks: list[dict] = []
|
|
section_images: list[str] = []
|
|
image_sources: dict[str, dict] = {}
|
|
para_idx = 0
|
|
tbl_idx = 0
|
|
|
|
for child in self._doc.element.body:
|
|
tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag
|
|
|
|
if tag == "p":
|
|
para = Paragraph(child, self._doc)
|
|
|
|
if self._heading_level(para) is not None:
|
|
heading_text = para.text.strip()
|
|
if heading_text: # ignore empty heading-like paragraphs
|
|
if blocks or section_images:
|
|
sections.append({
|
|
"source": current_source,
|
|
"blocks": blocks,
|
|
"images": list(section_images),
|
|
})
|
|
blocks = []
|
|
section_images = []
|
|
para_idx = 0
|
|
tbl_idx = 0
|
|
current_source = heading_text
|
|
continue
|
|
|
|
text = para.text.strip()
|
|
|
|
# Scan for images — append [[IMAGE:rid]] markers
|
|
for run in para.runs:
|
|
for rid in self._images_in(run._element):
|
|
text += f" [[IMAGE:{rid}]]"
|
|
section_images.append(rid)
|
|
image_sources[rid] = {"section": current_source}
|
|
|
|
if text.strip():
|
|
blocks.append({"type": "para", "index": para_idx + 1, "text": text.strip()})
|
|
para_idx += 1
|
|
|
|
elif tag == "tbl":
|
|
tbl_idx += 1
|
|
table = Table(child, self._doc)
|
|
|
|
# Collect all rows as [[cell_text, ...], ...]
|
|
all_rows: list[list[str]] = []
|
|
all_images: list[list[list[str]]] = [] # row → col → [rids]
|
|
for row in table.rows:
|
|
row_texts: list[str] = []
|
|
row_cell_images: list[list[str]] = []
|
|
for cell in row.cells:
|
|
cell_text = cell.text.strip()
|
|
cell_imgs: list[str] = []
|
|
for cp in cell.paragraphs:
|
|
for run in cp.runs:
|
|
for rid in self._images_in(run._element):
|
|
cell_imgs.append(rid)
|
|
# Replace images with markers in text
|
|
for rid in cell_imgs:
|
|
cell_text += f" [[IMAGE:{rid}]]"
|
|
section_images.append(rid)
|
|
row_texts.append(cell_text.strip())
|
|
row_cell_images.append(cell_imgs)
|
|
if any(row_texts) or any(row_cell_images):
|
|
all_rows.append(row_texts)
|
|
all_images.append(row_cell_images)
|
|
|
|
if len(all_rows) >= 2:
|
|
# Heuristic: first row is a header if every cell is short
|
|
first_row = all_rows[0]
|
|
has_header = all(len(c) < self.HEADER_CELL_MAX_LEN for c in first_row)
|
|
if has_header:
|
|
headers = first_row
|
|
data_rows_slice = zip(all_rows[1:], all_images[1:])
|
|
else:
|
|
headers = [f"列{ci + 1}" for ci in range(len(first_row))]
|
|
data_rows_slice = zip(all_rows, all_images)
|
|
|
|
data_rows: list[dict] = []
|
|
|
|
for ri, (row_data, row_imgs) in enumerate(data_rows_slice):
|
|
columns: list[dict] = []
|
|
max_cols = max(len(headers), len(row_data))
|
|
for ci in range(max_cols):
|
|
hdr = headers[ci] if ci < len(headers) else ""
|
|
txt = row_data[ci] if ci < len(row_data) else ""
|
|
columns.append({
|
|
"name": hdr,
|
|
"row": ri + 1,
|
|
"col": ci + 1,
|
|
"text": txt,
|
|
})
|
|
|
|
# Register image sources with structured location
|
|
imgs = row_imgs[ci] if ci < len(row_imgs) else []
|
|
for rid in imgs:
|
|
image_sources[rid] = {
|
|
"section": current_source,
|
|
"table": tbl_idx,
|
|
"row": ri + 1,
|
|
"column": ci + 1,
|
|
"name": hdr,
|
|
}
|
|
|
|
data_rows.append({"columns": columns})
|
|
|
|
blocks.append({
|
|
"type": "table",
|
|
"table": tbl_idx,
|
|
"headers": headers,
|
|
"rows": data_rows,
|
|
})
|
|
elif all_rows:
|
|
# Degenerate table (only header or single row) — treat as plain rows
|
|
for ri, row_data in enumerate(all_rows):
|
|
row_text = " | ".join(row_data)
|
|
if row_text.strip():
|
|
blocks.append({
|
|
"type": "para",
|
|
"index": para_idx + 1,
|
|
"text": row_text,
|
|
})
|
|
para_idx += 1
|
|
|
|
if blocks or section_images:
|
|
sections.append({
|
|
"source": current_source,
|
|
"blocks": blocks,
|
|
"images": list(section_images),
|
|
})
|
|
|
|
return sections, image_sources
|
|
|
|
# ---- internals ----------------------------------------------------------
|
|
|
|
def _heading_level(self, para: Paragraph) -> int | None:
|
|
"""Heading level 1-9, or *None* if not a heading."""
|
|
if para.style and para.style.name:
|
|
name = para.style.name
|
|
for prefix in ("Heading", "标题"):
|
|
if name.startswith(prefix):
|
|
try:
|
|
return int(name.split()[-1])
|
|
except (ValueError, IndexError):
|
|
pass
|
|
pPr = para._element.find(f"{{{self.WML_NS}}}pPr")
|
|
if pPr is not None:
|
|
ol = pPr.find(f"{{{self.WML_NS}}}outlineLvl")
|
|
if ol is not None:
|
|
val = ol.get(f"{{{self.WML_NS}}}val")
|
|
if val is not None:
|
|
try:
|
|
return int(val) + 1
|
|
except ValueError:
|
|
pass
|
|
return None
|
|
|
|
def _images_in(self, element) -> list[str]:
|
|
"""Return rId values for drawings embedded in *element*."""
|
|
rids: list[str] = []
|
|
for drawing in element.findall(f".//{{{self.WML_NS}}}drawing"):
|
|
blip = drawing.find(f".//{{{self.DRAW_NS}}}blip")
|
|
if blip is not None:
|
|
rid = blip.get(f"{{{self.REL_NS}}}embed")
|
|
if rid:
|
|
rids.append(rid)
|
|
return rids
|