"""Parse .docx files: export embedded images and split the body into sections."""

import logging
import os

from docx import Document
from docx.table import Table
from docx.text.paragraph import Paragraph

logger = logging.getLogger(__name__)

# Maps an image part's MIME content type to the file extension used on export.
IMAGE_EXT = {
    "image/png": ".png",
    "image/jpeg": ".jpg",
    "image/gif": ".gif",
    "image/bmp": ".bmp",
    "image/tiff": ".tiff",
    "image/webp": ".webp",
    "image/x-emf": ".emf",
    "image/x-wmf": ".wmf",
    "image/svg+xml": ".svg",
}


class WordParser:
    """Parse a .docx file — extract images, split body into sections.

    Usage::

        parser = WordParser("doc.docx")
        parser.extract_images("images/")
        sections, image_sources = parser.extract_sections()
    """

    # A first row is treated as a header only if every cell is shorter than this.
    HEADER_CELL_MAX_LEN = 20
    # ``w:outlineLvl`` values 0-8 map to heading levels 1-9; 9 means body text.
    MAX_OUTLINE_LVL = 8

    WML_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
    DRAW_NS = "http://schemas.openxmlformats.org/drawingml/2006/main"
    REL_NS = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"

    def __init__(self, docx_path: str):
        """Open the document at *docx_path*.

        Raises:
            FileNotFoundError: if *docx_path* is not an existing file.
        """
        if not os.path.isfile(docx_path):
            raise FileNotFoundError(f"Document not found: {docx_path}")
        self._doc = Document(docx_path)

    # ---- public API ---------------------------------------------------------

    def extract_images(self, images_dir: str) -> list[dict]:
        """Save all embedded images to *images_dir*.

        The directory is created if needed. Each file is named
        ``image_<rId><ext>``, where the extension comes from ``IMAGE_EXT``
        (``.png`` when the content type is unknown).

        Returns:
            ``[{rid, path}, ...]`` — one entry per image written.
        """
        os.makedirs(images_dir, exist_ok=True)
        images: list[dict] = []
        for rel in self._doc.part.rels.values():
            if "image" not in rel.reltype:
                continue
            if rel.is_external:
                # Linked (non-embedded) images have no target part; reading
                # ``target_part`` on them raises, so there is nothing to save.
                logger.debug("Skipping external image relationship %s", rel.rId)
                continue
            part = rel.target_part
            ext = IMAGE_EXT.get(part.content_type, ".png")
            path = os.path.join(images_dir, f"image_{rel.rId}{ext}")
            with open(path, "wb") as f:
                f.write(part.blob)
            images.append({"rid": rel.rId, "path": path})
        return images

    def extract_sections(self) -> tuple[list[dict], dict[str, dict]]:
        """Walk the document body and split it into sections by heading.

        A non-empty heading paragraph closes the current section and starts a
        new one; paragraph and table counters restart in each section.
        Content before the first heading belongs to a section whose source is
        ``""``. Sections with neither blocks nor images are not emitted.

        Returns:
            *sections* — ``[{source, blocks, images}, ...]``
                Each block is ``{type, index, text}`` (paragraph) or
                ``{type, table, headers, rows}`` (table), where each row is
                ``{"columns": [{name, row, col, text}, ...]}``. Images appear
                in text as ``[[IMAGE:rid]]`` markers.
            *image_sources* — ``rid → {section, table?, row?, column?, name?}``
                Images in a table header row carry ``section``, ``table`` and
                ``column``; images in a table that was flattened to plain
                rows carry ``section`` only.
        """
        sections: list[dict] = []
        current_source = ""
        blocks: list[dict] = []
        section_images: list[str] = []
        image_sources: dict[str, dict] = {}
        para_idx = 0
        tbl_idx = 0

        for child in self._doc.element.body:
            if not isinstance(child.tag, str):
                # XML comments / processing instructions have a non-string tag.
                continue
            tag = child.tag.split("}")[-1]

            if tag == "p":
                para = Paragraph(child, self._doc)

                if self._heading_level(para) is not None:
                    heading_text = para.text.strip()
                    # An empty heading-styled paragraph does not start a
                    # section; it falls through and is handled as ordinary
                    # content so any image it holds is still recorded.
                    if heading_text:
                        if blocks or section_images:
                            sections.append(
                                self._make_section(current_source, blocks, section_images)
                            )
                        blocks = []
                        section_images = []
                        para_idx = 0
                        tbl_idx = 0
                        current_source = heading_text
                        continue

                text = para.text.strip()
                # Append an [[IMAGE:rid]] marker for every picture in the runs.
                for run in para.runs:
                    for rid in self._images_in(run._element):
                        text += f" [[IMAGE:{rid}]]"
                        section_images.append(rid)
                        image_sources[rid] = {"section": current_source}

                text = text.strip()
                if text:
                    para_idx += 1
                    blocks.append({"type": "para", "index": para_idx, "text": text})

            elif tag == "tbl":
                # Counted even if the table turns out empty or degenerate.
                tbl_idx += 1
                rows, cell_images = self._read_table(Table(child, self._doc))

                for row_images in cell_images:
                    for rids in row_images:
                        section_images.extend(rids)

                if len(rows) >= 2:
                    blocks.append(
                        self._build_table_block(
                            rows, cell_images, tbl_idx, current_source, image_sources
                        )
                    )
                elif rows:
                    # Degenerate table (a single row) — emit it as plain text.
                    for row_images in cell_images:
                        for rids in row_images:
                            for rid in rids:
                                # setdefault: never replace a location that
                                # was already recorded for this rid.
                                image_sources.setdefault(rid, {"section": current_source})
                    for row_data in rows:
                        row_text = " | ".join(row_data)
                        if row_text.strip():
                            para_idx += 1
                            blocks.append(
                                {"type": "para", "index": para_idx, "text": row_text}
                            )

        if blocks or section_images:
            sections.append(self._make_section(current_source, blocks, section_images))

        return sections, image_sources

    # ---- internals ----------------------------------------------------------

    @staticmethod
    def _make_section(source: str, blocks: list[dict], images: list[str]) -> dict:
        """Build one section record; *images* is copied."""
        return {"source": source, "blocks": blocks, "images": list(images)}

    def _read_table(self, table: Table) -> tuple[list[list[str]], list[list[list[str]]]]:
        """Return ``(rows, images)`` for *table*, skipping fully empty rows.

        ``rows[r][c]`` is the stripped cell text followed by one
        ``[[IMAGE:rid]]`` marker per image; ``images[r][c]`` lists those rids.
        """
        # NOTE(review): python-docx reports a horizontally merged cell once
        # per grid column it spans, so merged cells may repeat text and
        # markers — confirm whether de-duplication is wanted.
        rows: list[list[str]] = []
        images: list[list[list[str]]] = []
        for row in table.rows:
            row_texts: list[str] = []
            row_images: list[list[str]] = []
            for cell in row.cells:
                rids: list[str] = []
                for para in cell.paragraphs:
                    for run in para.runs:
                        rids.extend(self._images_in(run._element))
                markers = "".join(f" [[IMAGE:{rid}]]" for rid in rids)
                row_texts.append((cell.text.strip() + markers).strip())
                row_images.append(rids)
            if any(row_texts) or any(row_images):
                rows.append(row_texts)
                images.append(row_images)
        return rows, images

    def _build_table_block(
        self,
        rows: list[list[str]],
        images: list[list[list[str]]],
        table_no: int,
        source: str,
        image_sources: dict[str, dict],
    ) -> dict:
        """Build a ``table`` block from *rows* and record image locations.

        The first row becomes the header when every cell in it is shorter
        than ``HEADER_CELL_MAX_LEN``; otherwise generated names ``列1``,
        ``列2``, … ("列" = "column") are used and every row counts as data.
        Data rows are numbered from 1. *image_sources* is updated in place.
        """
        first_row = rows[0]
        has_header = all(len(cell) < self.HEADER_CELL_MAX_LEN for cell in first_row)

        if has_header:
            headers = first_row
            # Header-row images are not part of any data row; record them
            # so every rid listed for the section has a source entry.
            for ci, rids in enumerate(images[0]):
                for rid in rids:
                    image_sources.setdefault(
                        rid,
                        {"section": source, "table": table_no, "column": ci + 1},
                    )
            body = zip(rows[1:], images[1:])
        else:
            headers = [f"列{ci + 1}" for ci in range(len(first_row))]
            body = zip(rows, images)

        data_rows: list[dict] = []
        for ri, (row_data, row_imgs) in enumerate(body, start=1):
            columns: list[dict] = []
            # Rows may be ragged: pad missing headers/cells with "".
            for ci in range(max(len(headers), len(row_data))):
                hdr = headers[ci] if ci < len(headers) else ""
                txt = row_data[ci] if ci < len(row_data) else ""
                columns.append({"name": hdr, "row": ri, "col": ci + 1, "text": txt})
                for rid in row_imgs[ci] if ci < len(row_imgs) else []:
                    image_sources[rid] = {
                        "section": source,
                        "table": table_no,
                        "row": ri,
                        "column": ci + 1,
                        "name": hdr,
                    }
            data_rows.append({"columns": columns})

        return {
            "type": "table",
            "table": table_no,
            "headers": headers,
            "rows": data_rows,
        }

    def _heading_level(self, para: Paragraph) -> int | None:
        """Heading level 1-9, or *None* if not a heading.

        The level comes from a style named ``Heading N`` / ``标题 N``, or
        failing that from the paragraph's own ``w:outlineLvl`` (0-8 → 1-9).
        """
        style = para.style
        name = style.name if style is not None else None
        if name and name.startswith(("Heading", "标题")):
            try:
                return int(name.split()[-1])
            except (ValueError, IndexError):
                pass  # e.g. "Heading" with no number; try the outline level

        pPr = para._element.find(f"{{{self.WML_NS}}}pPr")
        if pPr is None:
            return None
        outline = pPr.find(f"{{{self.WML_NS}}}outlineLvl")
        if outline is None:
            return None
        val = outline.get(f"{{{self.WML_NS}}}val")
        if val is None:
            return None
        try:
            level = int(val)
        except ValueError:
            return None
        # Level 9 marks body text, not a heading; reject anything outside 0-8.
        if 0 <= level <= self.MAX_OUTLINE_LVL:
            return level + 1
        return None

    def _images_in(self, element) -> list[str]:
        """Return rId values for drawings embedded in *element*.

        Only ``w:drawing`` descendants whose first ``a:blip`` has an
        ``r:embed`` attribute are reported.
        """
        rids: list[str] = []
        for drawing in element.findall(f".//{{{self.WML_NS}}}drawing"):
            blip = drawing.find(f".//{{{self.DRAW_NS}}}blip")
            if blip is None:
                continue
            rid = blip.get(f"{{{self.REL_NS}}}embed")
            if rid:
                rids.append(rid)
        return rids