什么是 ‘Unstructured’ 数据预处理?解析如何从扫描版 PDF、PPT 和 Word 中提取干净的 Markdown

各位同学,大家好。

在当今数据爆炸的时代,我们面临着一个核心挑战:绝大多数有价值的信息并非以结构化数据库的形式存在,而是散落在各种非结构化文档中,例如合同、报告、演示文稿,甚至是扫描件。这些文档承载着企业的智慧、历史的记录和决策的关键。然而,它们的“自由格式”特性,使得直接利用它们进行分析、搜索或自动化变得异常困难。

今天,我们将深入探讨“非结构化数据预处理”这一主题,特别是如何将最常见的非结构化文档——扫描版 PDF、PPT 和 Word——转化为干净、易于处理的 Markdown 格式。这不仅仅是一个技术操作,更是一项艺术,它要求我们理解文档的内在结构,并利用编程的魔力将其重塑。

1. 非结构化数据:挑战与机遇

1.1 什么是非结构化数据?

非结构化数据是指那些不遵循预定义数据模型或模式的数据。它们通常是文本密集型,包含日期、数字和事实,但这些信息没有以易于机器读取的方式组织。

常见形式包括:

  • 文本文件: 电子邮件、聊天记录、社交媒体帖子、文章、报告。
  • 文档: PDF、Word、PPT、Excel(部分内容,如单元格注释)。
  • 图像和视频: 包含文本、元数据。
  • 音频: 语音转文本后的数据。

1.2 为什么需要预处理?

想象一下,你需要从成千上万份法律合同中提取所有当事人名称和日期。如果这些合同是扫描版的 PDF,你无法直接搜索或使用传统数据库查询。非结构化数据预处理的目标就是:

  1. 可访问性: 将数据从难以处理的格式(如图片中的文字)转换为可编程访问的格式(如纯文本)。
  2. 标准化: 消除格式、编码、拼写等不一致性,确保数据的一致性。
  3. 结构化: 从自由格式的文本中识别并提取出有意义的结构,如标题、段落、列表、表格。
  4. 去噪: 移除无关信息,如页眉、页脚、广告、水印等。
  5. 为后续处理铺路: 清理后的数据可以用于信息抽取、文本挖掘、机器学习、搜索引擎索引等更高级的应用。

1.3 为什么选择 Markdown 作为目标格式?

Markdown 是一种轻量级标记语言,它允许人们使用易读易写的纯文本格式编写文档,然后可以将其转换为结构化的 HTML 或其他格式。选择 Markdown 有以下几个关键优势:

  • 简洁性与可读性: 语法简单直观,即使是原始 Markdown 文件也具有良好的可读性。
  • 通用性与移植性: 几乎所有文本编辑器和文档系统都支持 Markdown,方便跨平台使用和共享。
  • 结构化潜力: 虽然是纯文本,但其标记(如 # 代表标题,- 代表列表,| 代表表格)本身就蕴含了结构信息,便于程序进一步解析。
  • 利于版本控制: 纯文本格式非常适合 Git 等版本控制系统,易于比较差异。
  • 聚焦内容: Markdown 鼓励作者关注内容本身,而不是复杂的格式。

我们的目标,就是将各种复杂的文档格式,通过一系列技术手段,提炼成这种既包含结构又保持简洁的 Markdown 文本。

2. 非结构化数据预处理的核心流程与工具栈

无论源数据格式如何,非结构化数据预处理通常遵循一个通用流程:

  1. 数据采集 (Acquisition): 获取原始文件。
  2. 初始解析与提取 (Parsing & Extraction): 从文件容器中提取出原始文本、图像或其他内容。
  3. 内容清理与规范化 (Cleaning & Normalization): 处理提取出的内容,去除噪音,统一格式。
  4. 结构推理 (Structure Inference): 识别文本中的逻辑结构,如标题、段落、列表、表格、代码块等。
  5. 格式转换 (Transformation): 将推理出的结构映射到目标 Markdown 语法。

2.1 Python 工具生态概览

Python 因其丰富的库和强大的文本处理能力,成为非结构化数据预处理的首选语言。

核心库分类:

类别 功能 推荐库
PDF 处理 文本提取、页面操作、OCR PyPDF2 (文本提取, 合并分割), pypdf (PyPDF2 现代替代), pdfminer.six (高级文本及布局提取), tabula-py (表格提取), pytesseract (Tesseract OCR 封装), ocrmypdf (增强型 OCR)
Word 处理 .docx 文件内容提取 python-docx (解析 .docx)
PPT 处理 .pptx 文件内容提取 python-pptx (解析 .pptx)
图像处理 OCR 预处理、图像操作 Pillow (PIL fork), OpenCV (高级图像处理)
文本处理 字符串操作、正则表达式、自然语言处理 内置 str 方法, re 模块, NLTK, spaCy (高级 NLP, 分词、命名实体识别等)
Markdown 生成 辅助 Markdown 语法生成 自定义函数, markdown-it-py (Markdown 解析器,也可用于生成), commonmark (CommonMark 标准实现)

在接下来的部分,我们将针对不同类型的源文档,详细阐述这些工具的具体应用。

3. 从扫描版 PDF 中提取干净的 Markdown

扫描版 PDF 是非结构化数据处理中最具挑战性的一种。它本质上是文本的图像,而不是可直接选择、复制的文本。这意味着我们必须借助光学字符识别(OCR)技术将其转换为文本。

3.1 扫描版 PDF 的挑战

  • 文本不可选: 无法直接复制粘贴。
  • 图像质量问题: 低分辨率、歪斜、模糊、噪声、手写体等都会严重影响 OCR 准确性。
  • 布局复杂性: 多栏、表格、图文混排、页眉页脚等使得文本提取和结构识别更加困难。
  • 语言多样性: 不同语言需要不同的 OCR 模型。

3.2 OCR 预处理与执行

在执行 OCR 之前,对图像进行预处理可以显著提高识别准确率。

常见预处理步骤:

  1. 图像加载: 使用 PillowOpenCV
  2. 去倾斜 (Deskewing): 纠正倾斜的页面。
  3. 去噪 (Denoising): 移除图像中的噪点。
  4. 二值化 (Binarization): 将彩色或灰度图像转换为纯黑白,增强文本与背景对比度。
  5. 边缘增强 (Sharpening): 锐化文本边缘。

OCR 引擎:Tesseract

Tesseract 是一个强大的开源 OCR 引擎,由 Google 维护。pytesseract 是其 Python 封装。

首先,你需要安装 Tesseract OCR 引擎本身(例如在 Ubuntu 上 sudo apt install tesseract-ocr,在 Windows 上下载安装包),并安装 pytesseractPillow
pip install pytesseract Pillow

import pytesseract
from PIL import Image
import os

# 配置 Tesseract 路径 (如果 Tesseract 不在系统 PATH 中,Windows 用户可能需要)
# pytesseract.pytesseract.tesseract_cmd = r'C:Program FilesTesseract-OCRtesseract.exe'

def ocr_image_to_text(image_path: str, lang: str = 'chi_sim+eng') -> str:
    """
    使用 Tesseract 对图像进行 OCR,提取文本。
    :param image_path: 图像文件路径。
    :param lang: OCR 识别语言。默认为简体中文和英文。
    :return: 提取出的文本。
    """
    try:
        img = Image.open(image_path)
        # 简单预处理:转换为灰度图,有助于 Tesseract
        img = img.convert('L')
        text = pytesseract.image_to_string(img, lang=lang)
        return text
    except Exception as e:
        print(f"OCR failed for {image_path}: {e}")
        return ""

def ocr_pdf_to_text(pdf_path: str, output_dir: str = 'ocr_temp', lang: str = 'chi_sim+eng') -> str:
    """
    将扫描版 PDF 转换为文本,通过先将 PDF 页面转换为图像再进行 OCR。
    注意:此方法效率较低,且无法保留布局信息。更推荐使用 ocrmypdf。
    :param pdf_path: PDF 文件路径。
    :param output_dir: 临时图像输出目录。
    :param lang: OCR 识别语言。
    :return: 提取出的所有文本。
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    full_text = []
    try:
        from pdf2image import convert_from_path
        # 需要安装 poppler:conda install -c conda-forge poppler 或下载安装包
        # pip install pdf2image

        print(f"Converting PDF '{pdf_path}' to images...")
        images = convert_from_path(pdf_path) # 默认DPI 200,可调整 dpi=300 等

        for i, image in enumerate(images):
            image_filename = os.path.join(output_dir, f'page_{i+1}.png')
            image.save(image_filename, 'PNG')
            print(f"OCR-ing page {i+1}...")
            page_text = ocr_image_to_text(image_filename, lang=lang)
            full_text.append(page_text)
            # os.remove(image_filename) # 可以选择删除临时图片

    except ImportError:
        print("Error: 'pdf2image' library not found. Please install it: pip install pdf2image")
        print("       Also, ensure 'poppler' is installed (e.g., via conda or system package manager).")
        return ""
    except Exception as e:
        print(f"Error processing PDF '{pdf_path}': {e}")
        return ""
    finally:
        # 清理临时目录,如果需要
        # import shutil
        # shutil.rmtree(output_dir)
        pass

    return "n".join(full_text)

# 示例使用 (假设存在一个扫描版 PDF: scanned_document.pdf)
# if __name__ == "__main__":
#     # 创建一个假的扫描版PDF文件用于测试,实际中你需要一个真实的扫描版PDF
#     # 例如,用画图工具写一些字,然后保存为图片,再将图片转换为PDF
#     # 或者直接使用一个真实的扫描版PDF
#     # with open('scanned_document.pdf', 'w') as f:
#     #     f.write("This is a dummy scanned PDF content.nHello World!")

#     # text_from_scanned_pdf = ocr_pdf_to_text('scanned_document.pdf')
#     # print("--- Extracted Text from Scanned PDF ---")
#     # print(text_from_scanned_pdf)
#     pass

3.3 增强型 OCR:ocrmypdf

直接使用 pytesseract 处理 PDF 需要手动将 PDF 转换为图片,再逐页 OCR,效率低且无法直接生成可搜索的 PDF。ocrmypdf 是一个更专业的工具,它在 Tesseract 的基础上提供了更多功能:

  • 直接处理 PDF: 无需手动转换图片。
  • 生成可搜索 PDF: 在原始 PDF 图像层下方添加文本层,使 PDF 可搜索。
  • 优化图像: 自动进行去倾斜、去噪等预处理。
  • 并行处理: 提高大型文档的处理速度。
  • 保留原始布局: 尽可能保持文本在页面上的相对位置。

ocrmypdf 是一个命令行工具,但可以在 Python 中通过 subprocess 调用。
首先,你需要安装 ocrmypdf(例如 pip install ocrmypdf,它会自动安装 Tesseract 的 Python 依赖)。

import subprocess
import os
import PyPDF2 # 用于读取 ocrmypdf 生成的 PDF 中的文本

def ocr_pdf_with_ocrmypdf(input_pdf_path: str, output_pdf_path: str, lang: str = 'chi_sim+eng') -> str:
    """
    使用 ocrmypdf 对 PDF 进行 OCR,生成可搜索 PDF,并从中提取文本。
    :param input_pdf_path: 输入 PDF 文件路径。
    :param output_pdf_path: 输出可搜索 PDF 文件路径。
    :param lang: OCR 识别语言。
    :return: 提取出的文本。
    """
    try:
        print(f"Running ocrmypdf on '{input_pdf_path}'...")
        # ocrmypdf 的命令行参数:
        # -l <lang>: 指定语言
        # --skip-text: 如果 PDF 已经有文本层,跳过 OCR
        # --force-ocr: 强制进行 OCR,即使 PDF 已经有文本层
        # --output-type pdf: 输出为 PDF
        # --deskew: 自动去倾斜
        # --clean: 清理页面
        command = [
            'ocrmypdf',
            '-l', lang,
            '--deskew',
            '--clean',
            input_pdf_path,
            output_pdf_path
        ]

        # 执行命令行
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        print("ocrmypdf stdout:", result.stdout)
        if result.stderr:
            print("ocrmypdf stderr:", result.stderr)

        print(f"OCR successful. Output saved to '{output_pdf_path}'. Now extracting text...")

        # 从生成的 PDF 中提取文本
        with open(output_pdf_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            full_text = []
            for page in reader.pages:
                full_text.append(page.extract_text())
            return "n".join(full_text)

    except subprocess.CalledProcessError as e:
        print(f"ocrmypdf failed with error code {e.returncode}:")
        print("stdout:", e.stdout)
        print("stderr:", e.stderr)
        return ""
    except FileNotFoundError:
        print("Error: ocrmypdf command not found. Please ensure it is installed and in your system PATH.")
        return ""
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return ""

# 示例使用 (需要一个真实的扫描版 PDF)
# if __name__ == "__main__":
#     input_pdf = 'scanned_report.pdf' # 替换为你的扫描版PDF路径
#     output_pdf = 'scanned_report_ocr.pdf'
#     if os.path.exists(input_pdf):
#         extracted_text = ocr_pdf_with_ocrmypdf(input_pdf, output_pdf)
#         print("n--- Extracted Text from ocrmypdf processed PDF ---")
#         print(extracted_text[:1000]) # 打印前1000字符
#     else:
#         print(f"Error: Input PDF '{input_pdf}' not found.")

3.4 提取后的文本清理与结构化(针对 OCR 文本)

OCR 提取的文本通常包含大量噪声,需要进一步清理和结构化。

常见的清理任务:

  • 去除冗余换行和空格: OCR 结果可能有多余的换行符或空格。
  • 合并断字: 识别并合并因行尾断开的单词(如 pre-nprocessing -> preprocessing)。
  • 去除页眉页脚: 这些内容在每页重复,需要通过模式匹配或位置信息去除。
  • 处理乱码/低置信度字符: OCR 引擎可能会输出一些无法识别的字符。
  • 识别标题、段落、列表: 这需要基于字体大小、加粗、缩进等启发式规则。

Markdown 转换策略:

  1. 分段: 连续的非空行块通常形成一个段落。
  2. 标题识别: 较大的、居中的、加粗的文本可能是标题。由于 OCR 无法提供字体信息,这通常需要依赖于文本的视觉特征(如行内空白,是否全大写,是否位于页面顶部)。
  3. 列表识别: 识别常见的列表符号(-, *, 1., a.)和缩进。
  4. 表格识别: OCR 很难直接识别表格结构。如果 PDF 中有表格,可能需要专门的表格 OCR 工具(如 camelottabula-py,尽管它们对扫描版表格支持有限)。对于纯文本 OCR 结果,可能需要复杂的正则表达式或机器学习来推断表格。
import re

def clean_ocr_text(text: str) -> str:
    """
    清理 OCR 结果中常见的噪声。
    :param text: 原始 OCR 文本。
    :return: 清理后的文本。
    """
    # 1. 移除常见低置信度字符或乱码 (根据实际情况调整)
    text = re.sub(r'[^ws.,;!?'"(){}[]-+/\<>=_`~@#$%^&*]', '', text) # 移除大部分非ASCII字符和一些特殊字符

    # 2. 合并断字 (例如 "pre-nprocessing" -> "preprocessing")
    text = re.sub(r'(w+)-s*n(w+)', r'12', text) # 查找单词-换行-单词,合并

    # 3. 替换多个连续的换行符为最多两个 (表示段落分隔)
    text = re.sub(r'n{3,}', 'nn', text)

    # 4. 移除行首尾的空白字符
    text = 'n'.join([line.strip() for line in text.split('n')])

    # 5. 移除页眉页脚 (这需要更复杂的逻辑,例如根据行号、固定文本模式等)
    # 假设页眉是每页第一行,页脚是每页最后一行,且内容重复
    # 这是一个非常简化的例子,实际情况需要更智能的检测
    # lines = text.split('n')
    # if len(lines) > 10: # 假设文档至少有10行
    #     header_candidate = lines[0]
    #     footer_candidate = lines[-1]
    #     # 简单的重复模式检测
    #     if lines[10].startswith(header_candidate[:5]): # 假设页眉至少5个字符重复
    #         text = "n".join([line for i, line in enumerate(lines) if not line.startswith(header_candidate)])
    #     # 重新分割处理页脚
    #     lines = text.split('n')
    #     if lines[-10].endswith(footer_candidate[-5:]):
    #         text = "n".join([line for i, line in enumerate(lines) if not line.endswith(footer_candidate)])

    return text.strip()

def ocr_text_to_markdown(ocr_text: str) -> str:
    """
    将清理后的 OCR 文本转换为 Markdown 格式。
    这部分对 OCR 文本的结构化推断是启发式的,因为缺乏原始格式信息。
    :param ocr_text: 清理后的文本。
    :return: Markdown 格式文本。
    """
    markdown_lines = []
    lines = ocr_text.split('n')

    # 简单的标题和列表识别启发式
    for i, line in enumerate(lines):
        stripped_line = line.strip()

        if not stripped_line:
            continue

        # 尝试识别标题:简单启发式,如全大写、较短、行之间有空白
        # 这非常粗糙,实际应用需更复杂的规则
        if len(stripped_line) < 80 and stripped_line.isupper() and 
           (i == 0 or not lines[i-1].strip()) and 
           (i == len(lines) - 1 or not lines[i+1].strip()):
            markdown_lines.append(f"## {stripped_line}n") # 假定为二级标题
        # 尝试识别列表:以特定符号开头的行
        elif stripped_line.startswith(('-', '*', '+', '1.', 'a.')):
            # 进一步处理缩进列表,但这里简化
            markdown_lines.append(f"- {stripped_line.lstrip('-*+1.a. ').strip()}")
        else:
            # 普通段落
            markdown_lines.append(stripped_line)

    # 合并相邻的段落文本,用两个换行符分隔
    final_markdown = []
    current_paragraph = []
    for line in markdown_lines:
        if line.startswith('#') or line.startswith('-'): # 标题或列表,独立成行
            if current_paragraph:
                final_markdown.append(" ".join(current_paragraph))
                current_paragraph = []
            final_markdown.append(line)
        elif not line.strip(): # 空行表示段落结束
            if current_paragraph:
                final_markdown.append(" ".join(current_paragraph))
                current_paragraph = []
            final_markdown.append('') # 保留空行作为段落分隔
        else:
            current_paragraph.append(line.strip())

    if current_paragraph: # 处理最后一个段落
        final_markdown.append(" ".join(current_paragraph))

    return "nn".join(filter(None, final_markdown)) # 移除连续空行

# 结合使用
# if __name__ == "__main__":
#     # 假设 ocr_text 是从 scanned_report_ocr.pdf 提取的文本
#     sample_ocr_text = """
#     REPORT TITLE
#     A Summary of Findings

#     This is the first paragraph of the report. It contains some important
#     information about the project.

#     SECTION ONE: INTRODUCTION
#     1. Initial Analysis
#     - Data Collection Phase
#     - Data Cleaning Process
#     2. Key Observations
#     * Observation A
#     * Observation B

#     Further details are provided below.

#     FINAL REMARKS
#     Thank you for your attention.
#     """
#     cleaned_text = clean_ocr_text(sample_ocr_text)
#     markdown_output = ocr_text_to_markdown(cleaned_text)
#     print("n--- Converted Markdown from OCR Text ---")
#     print(markdown_output)

注意: OCR 文本的结构化推理非常困难且容易出错,因为它丢失了原始文档的视觉和逻辑结构信息(如字体、字号、颜色、精确位置)。上述 ocr_text_to_markdown 函数是一个非常简化的启发式示例,对于复杂文档可能需要结合更高级的布局分析(例如使用 pdfminer.six 提取坐标信息来推断结构,即使是扫描版 PDF,如果经过 ocrmypdf 处理,也可以尝试通过 PyPDF2pypdf 提取其文本层中的坐标信息,但这超出本节的范围)。

4. 从数字版 PDF 中提取干净的 Markdown

数字版 PDF 与扫描版 PDF 的根本区别在于它包含了可直接提取的文本和字体、位置等元数据。这使得结构化提取变得更加可行。

4.1 数字版 PDF 的挑战

  • 布局复杂: 文本可能不是逻辑上连续的,而是由许多独立定位的文本块组成。
  • 格式丢失: PDF 本身是显示格式,不直接存储“标题”或“段落”的概念。需要从字体大小、加粗、位置等推断。
  • 表格提取: 表格结构需要专门的算法来识别和提取。
  • 多语言支持: 确保正确处理不同编码的文本。

4.2 文本和布局信息提取:pdfminer.six

pdfminer.six 是一个强大的 PDF 解析库,它不仅能提取文本,还能提供文本的字体、大小、位置(坐标)等详细信息,这对于推断文档结构至关重要。

pip install pdfminer.six

from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer, LTChar, LTFigure, LTImage, LTTextBox, LTRect, LTCurve
import re

def extract_layout_info_from_pdf(pdf_path: str) -> list:
    """
    从数字版 PDF 中提取文本及其布局信息。
    :param pdf_path: PDF 文件路径。
    :return: 包含每页文本元素的列表,每个元素是一个字典,包含文本、字体大小、是否加粗等。
    """
    all_elements = []

    for page_layout in extract_pages(pdf_path):
        page_elements = []
        for element in page_layout:
            if isinstance(element, LTTextContainer):
                # LTTextContainer 可以是 LTTextBoxHorizontal, LTTextBoxVertical 等
                # 我们需要遍历其内部的行和字符来获取更细粒度的信息

                # 尝试获取字体大小和是否加粗 (这通常通过检查第一个字符的属性来近似)
                font_size = None
                is_bold = False

                # 获取文本内容
                text_content = element.get_text().strip()

                if text_content:
                    # 尝试从第一个字符获取字体信息
                    for text_line in element:
                        for char in text_line:
                            if isinstance(char, LTChar):
                                font_size = char.size
                                # 简单判断是否加粗:通常字体名称包含 "Bold" 或 "Bd"
                                if 'bold' in char.fontname.lower() or 'bd' in char.fontname.lower():
                                    is_bold = True
                                break # 只取第一个字符的属性作为代表
                        if font_size is not None:
                            break

                    page_elements.append({
                        'text': text_content,
                        'x0': element.x0, 'y0': element.y0, 'x1': element.x1, 'y1': element.y1,
                        'font_size': font_size,
                        'is_bold': is_bold,
                        'type': 'text'
                    })
            # 可以添加处理图像、线条等其他元素,但我们主要关注文本
            # elif isinstance(element, LTFigure):
            #     page_elements.append({'type': 'figure', 'x0': element.x0, 'y0': element.y0, 'x1': element.x1, 'y1': element.y1})
            # elif isinstance(element, LTImage):
            #     page_elements.append({'type': 'image', 'x0': element.x0, 'y0': element.y0, 'x1': element.x1, 'y1': element.y1})

        # 将页面元素按其在页面上的垂直位置排序,以便按阅读顺序处理
        page_elements.sort(key=lambda x: -x['y0']) # 从上到下
        all_elements.append(page_elements)

    return all_elements

def pdf_layout_to_markdown(layout_info: list) -> str:
    """
    将提取的 PDF 布局信息转换为 Markdown 格式。
    :param layout_info: extract_layout_info_from_pdf 的输出。
    :return: Markdown 格式文本。
    """
    markdown_output = []

    # 定义一些启发式规则
    # 假设最大的字体是标题1,次大的字体是标题2,等等
    # 实际应用中需要根据文档的具体样式来调整这些阈值
    font_size_thresholds = {
        'h1': 24,
        'h2': 18,
        'h3': 14,
        'normal': 10
    }

    # 动态检测字体大小范围 (更健壮的方式)
    all_font_sizes = []
    for page in layout_info:
        for element in page:
            if element['type'] == 'text' and element['font_size']:
                all_font_sizes.append(element['font_size'])

    if all_font_sizes:
        unique_font_sizes = sorted(list(set(all_font_sizes)), reverse=True)
        if len(unique_font_sizes) >= 1: font_size_thresholds['h1'] = unique_font_sizes[0] * 0.9
        if len(unique_font_sizes) >= 2: font_size_thresholds['h2'] = unique_font_sizes[1] * 0.9
        if len(unique_font_sizes) >= 3: font_size_thresholds['h3'] = unique_font_sizes[2] * 0.9
        if len(unique_font_sizes) >= 4: font_size_thresholds['normal'] = unique_font_sizes[-1] * 0.9

    last_was_list = False
    last_indent = 0 # 跟踪列表缩进

    for page_num, page_elements in enumerate(layout_info):
        if page_num > 0:
            markdown_output.append("---n") # 分页符,可选

        for i, element in enumerate(page_elements):
            if element['type'] == 'text':
                text = element['text']
                font_size = element['font_size']
                is_bold = element['is_bold']

                # 清理文本,去除多余空白
                text = re.sub(r's+', ' ', text).strip()
                if not text:
                    continue

                # 尝试识别标题
                if font_size >= font_size_thresholds['h1'] and is_bold:
                    markdown_output.append(f"# {text}n")
                    last_was_list = False
                elif font_size >= font_size_thresholds['h2'] and is_bold:
                    markdown_output.append(f"## {text}n")
                    last_was_list = False
                elif font_size >= font_size_thresholds['h3'] and is_bold:
                    markdown_output.append(f"### {text}n")
                    last_was_list = False
                # 尝试识别列表
                elif re.match(r'^s*[-*+]?s*d+.s+.*', text) or re.match(r'^s*[-*+]s+.*', text):
                    # 简单的列表检测,可以根据 x0 坐标判断缩进
                    current_indent = element['x0']
                    list_prefix = ''
                    if current_indent > last_indent + 5: # 假设缩进超过5个单位是子列表
                        list_prefix = '  ' * int((current_indent - last_indent) / 5)

                    # 移除列表前缀,例如 "1. " 或 "- "
                    cleaned_text = re.sub(r'^s*[-*+]?s*d+.s+', '', text)
                    cleaned_text = re.sub(r'^s*[-*+]s+', '', cleaned_text)

                    markdown_output.append(f"{list_prefix}- {cleaned_text}n")
                    last_was_list = True
                    last_indent = current_indent
                else:
                    # 普通段落
                    if last_was_list: # 如果上一个是列表,新段落前加空行
                        markdown_output.append("n")
                    markdown_output.append(f"{text}n")
                    last_was_list = False
                    last_indent = 0 # 重置缩进

        markdown_output.append("n") # 页面结束加空行

    # 最后清理并合并段落
    final_output = []
    current_paragraph_lines = []
    for line in markdown_output:
        if line.strip().startswith('#') or line.strip().startswith('-') or line.strip() == '---':
            if current_paragraph_lines:
                final_output.append(" ".join(current_paragraph_lines).strip())
                current_paragraph_lines = []
            final_output.append(line.strip())
        elif not line.strip(): # 空行
            if current_paragraph_lines:
                final_output.append(" ".join(current_paragraph_lines).strip())
                current_paragraph_lines = []
            final_output.append('') # 保留一个空行
        else:
            current_paragraph_lines.append(line.strip())

    if current_paragraph_lines:
        final_output.append(" ".join(current_paragraph_lines).strip())

    # 移除连续多个空行
    cleaned_final_output = []
    last_was_empty = False
    for line in final_output:
        if not line.strip():
            if not last_was_empty:
                cleaned_final_output.append('')
            last_was_empty = True
        else:
            cleaned_final_output.append(line)
            last_was_empty = False

    return "n".join(cleaned_final_output).strip()

# 示例使用 (假设有一个数字版 PDF: digital_document.pdf)
# if __name__ == "__main__":
#     # 创建一个假的数字版PDF用于测试
#     # 实际中你需要一个真实的PDF文件
#     # 例如,用 LibreOffice Writer 或 Word 创建一个包含标题、段落、列表的PDF
#     # 注意:pdfminer.six 不支持直接从字符串创建PDF
#     # input_pdf = 'digital_document.pdf'
#     # if os.path.exists(input_pdf):
#     #     layout_data = extract_layout_info_from_pdf(input_pdf)
#     #     markdown_output = pdf_layout_to_markdown(layout_data)
#     #     print("n--- Converted Markdown from Digital PDF ---")
#     #     print(markdown_output)
#     # else:
#     #     print(f"Error: Input PDF '{input_pdf}' not found. Please create one for testing.")
#     pass

4.3 表格提取:tabula-py

pdfminer.six 可以提供文本块的坐标,但要从这些块中重构表格非常复杂。tabula-py 是一个基于 Java tabula-java 的 Python 封装,专门用于从 PDF 中提取表格。它对于数字版 PDF 中的表格表现优秀。

pip install tabula-py

import tabula
import pandas as pd

def extract_tables_from_pdf(pdf_path: str, pages: str = 'all') -> list[pd.DataFrame]:
    """
    从 PDF 中提取表格。
    :param pdf_path: PDF 文件路径。
    :param pages: 要提取的页面范围,可以是 'all' 或 '1-3,5'。
    :return: 包含 Pandas DataFrame 的列表,每个 DataFrame 代表一个表格。
    """
    try:
        # area='all' 尝试在整个页面寻找表格
        # lattice=True 倾向于识别有明确线条分隔的表格
        # stream=True 倾向于识别没有明确线条,但有规律排列的表格
        # guess=True 尝试自动检测表格区域
        tables = tabula.read_pdf(pdf_path, pages=pages, multiple_tables=True, guess=True, stream=True)
        return tables
    except Exception as e:
        print(f"Error extracting tables from '{pdf_path}': {e}")
        return []

def dataframe_to_markdown_table(df: pd.DataFrame) -> str:
    """
    将 Pandas DataFrame 转换为 Markdown 表格格式。
    :param df: 输入的 DataFrame。
    :return: Markdown 格式的表格字符串。
    """
    if df.empty:
        return ""

    # 获取列名
    headers = [str(col) for col in df.columns]

    # 构建表头行
    header_line = "| " + " | ".join(headers) + " |n"

    # 构建分隔行
    separator_line = "| " + " | ".join(["---" for _ in headers]) + " |n"

    # 构建数据行
    data_lines = []
    for index, row in df.iterrows():
        row_values = [str(cell).replace('n', '<br>') for cell in row.values] # 表格内容换行处理
        data_lines.append("| " + " | ".join(row_values) + " |")

    return header_line + separator_line + "n".join(data_lines) + "n"

# 示例使用
# if __name__ == "__main__":
#     # 假设 digital_document_with_table.pdf 包含表格
#     # input_pdf_with_table = 'digital_document_with_table.pdf'
#     # if os.path.exists(input_pdf_with_table):
#     #     tables = extract_tables_from_pdf(input_pdf_with_table)
#     #     if tables:
#     #         print("n--- Extracted Tables from PDF ---")
#     #         for i, df in enumerate(tables):
#     #             print(f"Table {i+1}:n")
#     #             print(dataframe_to_markdown_table(df))
#     #     else:
#     #         print("No tables found.")
#     # else:
#     #     print(f"Error: Input PDF '{input_pdf_with_table}' not found. Please create one with tables.")
#     pass

结合 pdfminer.sixtabula-py,我们可以更全面地从数字版 PDF 中提取文本和表格。实际工作中,需要先用 pdfminer.six 获取文本流,再用 tabula-py 识别并去除文本流中的表格区域,避免重复提取。

5. 从 Word 文档 (.docx) 中提取干净的 Markdown

Word 文档(.docx 格式)是 XML 文件的压缩包,其内部结构比 PDF 更接近语义。python-docx 库提供了一个方便的 API 来访问 .docx 文件的内容和样式信息。

5.1 Word 文档的挑战

  • 样式多样性: 用户可以自定义各种样式(标题、列表、正文),需要正确映射到 Markdown。
  • 复杂内容: 嵌入的图片、图表、批注、修订等。
  • 格式嵌套: 文本可能同时具有粗体、斜体等多种格式。

5.2 使用 python-docx 提取内容

pip install python-docx

from docx import Document
from docx.opc.exceptions import PackageNotFoundError

def extract_docx_content(docx_path: str) -> list:
    """
    从 .docx 文件中提取内容,包括段落、列表、表格及其样式信息。
    :param docx_path: .docx 文件路径。
    :return: 包含文档元素的列表,每个元素是一个字典。
    """
    document_elements = []
    try:
        document = Document(docx_path)

        for element in document.element.body:
            if element.tag.endswith('p'):  # 段落
                para = document.paragraphs[document.element.body.index(element)]
                text = ""
                # 处理段落中的 Run (文本块,可能包含样式)
                runs_info = []
                for run in para.runs:
                    run_text = run.text
                    if run_text:
                        # 检查run的样式
                        is_bold = run.bold or (run.font and run.font.bold)
                        is_italic = run.italic or (run.font and run.font.italic)
                        # 其他样式如underline, strikethrough等也可以检查
                        runs_info.append({
                            'text': run_text,
                            'bold': is_bold,
                            'italic': is_italic
                        })
                        text += run_text

                # 获取段落样式
                style_name = para.style.name if para.style else 'Normal'

                document_elements.append({
                    'type': 'paragraph',
                    'text': text.strip(),
                    'style': style_name,
                    'runs_info': runs_info, # 包含更详细的run信息
                    'level': para.paragraph_format.left_indent # 尝试获取缩进,有助于列表
                })
            elif element.tag.endswith('tbl'):  # 表格
                table = document.tables[document.element.body.index(element) - document.element.body.count('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}p', 0, document.element.body.index(element))]
                table_data = []
                for row in table.rows:
                    row_data = []
                    for cell in row.cells:
                        cell_text = ""
                        for para in cell.paragraphs:
                            cell_text += para.text + "n"
                        row_data.append(cell_text.strip())
                    table_data.append(row_data)
                document_elements.append({
                    'type': 'table',
                    'data': table_data
                })
            # 可以添加处理图片、形状等的逻辑,但通常只提取文本和表格

    except PackageNotFoundError:
        print(f"Error: Document '{docx_path}' not found or is not a valid .docx file.")
        return []
    except Exception as e:
        print(f"Error processing .docx file '{docx_path}': {e}")
        return []

    return document_elements

def docx_elements_to_markdown(elements: list) -> str:
    """
    将提取的 .docx 元素转换为 Markdown 格式。
    :param elements: extract_docx_content 的输出。
    :return: Markdown 格式文本。
    """
    markdown_output = []

    # 定义 Word 样式到 Markdown 的映射
    style_to_markdown_heading = {
        'Title': '#',
        'Heading 1': '#',
        'Heading 2': '##',
        'Heading 3': '###',
        'Heading 4': '####',
        'Heading 5': '#####',
        'Heading 6': '######',
        'List Paragraph': '-', # 简单映射,可能需要更复杂的逻辑处理多级列表
        'Normal': ''
    }

    last_was_list = False

    for element in elements:
        if element['type'] == 'paragraph':
            text = element['text']
            style = element['style']

            # 处理粗体和斜体
            formatted_text = ""
            for run_info in element['runs_info']:
                run_text = run_info['text']
                if run_info['bold'] and run_info['italic']:
                    formatted_text += f"***{run_text}***"
                elif run_info['bold']:
                    formatted_text += f"**{run_text}**"
                elif run_info['italic']:
                    formatted_text += f"*{run_text}*"
                else:
                    formatted_text += run_text

            # 移除多余的空白并处理换行
            formatted_text = re.sub(r's+', ' ', formatted_text).strip()
            if not formatted_text:
                continue

            # 映射到 Markdown 标题或列表
            if style.startswith('Heading') or style == 'Title':
                heading_prefix = style_to_markdown_heading.get(style, '##') # 默认二级标题
                markdown_output.append(f"{heading_prefix} {formatted_text}n")
                last_was_list = False
            elif 'List Paragraph' in style or style.startswith('List'): # 简单的列表检测
                # 需要根据缩进或列表级别进一步细化,这里简化为一级列表
                list_prefix = '-'
                # 根据element['level']来判断缩进,但docx的indent是Length对象,需要转换
                # if element['level'] and element['level'].pt > 0: # 假设缩进大于0
                #    indent_level = int(element['level'].pt / 12) # 粗略计算缩进级别
                #    list_prefix = '  ' * indent_level + '-'
                markdown_output.append(f"{list_prefix} {formatted_text}n")
                last_was_list = True
            elif style == 'Normal' or not style:
                if last_was_list:
                    markdown_output.append("n") # 列表后接普通段落,加空行
                markdown_output.append(f"{formatted_text}n")
                last_was_list = False
            else: # 对于未知样式,也作为普通段落处理
                if last_was_list:
                    markdown_output.append("n")
                markdown_output.append(f"{formatted_text}n")
                last_was_list = False

        elif element['type'] == 'table':
            markdown_output.append("n")
            table_data = element['data']
            if not table_data:
                continue

            # 构建 Markdown 表格
            headers = table_data[0]
            header_line = "| " + " | ".join(headers) + " |n"
            separator_line = "| " + " | ".join(["---" for _ in headers]) + " |n"

            data_lines = []
            for row in table_data[1:]: # 跳过标题行
                row_values = [str(cell).replace('n', '<br>') for cell in row]
                data_lines.append("| " + " | ".join(row_values) + " |")

            markdown_output.append(header_line + separator_line + "n".join(data_lines) + "n")
            last_was_list = False

    # 清理并合并段落
    final_output = []
    current_paragraph_lines = []
    for line in markdown_output:
        if line.strip().startswith('#') or line.strip().startswith('-') or line.strip().startswith('|'): # 标题、列表、表格
            if current_paragraph_lines:
                final_output.append(" ".join(current_paragraph_lines).strip())
                current_paragraph_lines = []
            final_output.append(line.strip())
        elif not line.strip(): # 空行
            if current_paragraph_lines:
                final_output.append(" ".join(current_paragraph_lines).strip())
                current_paragraph_lines = []
            final_output.append('') # 保留一个空行
        else:
            current_paragraph_lines.append(line.strip())

    if current_paragraph_lines:
        final_output.append(" ".join(current_paragraph_lines).strip())

    # 移除连续多个空行
    cleaned_final_output = []
    last_was_empty = False
    for line in final_output:
        if not line.strip():
            if not last_was_empty:
                cleaned_final_output.append('')
            last_was_empty = True
        else:
            cleaned_final_output.append(line)
            last_was_empty = False

    return "n".join(cleaned_final_output).strip()

# 示例使用 (假设存在一个 .docx 文件: sample_document.docx)
# if __name__ == "__main__":
#     # 假设有一个Word文档 sample_document.docx
#     # 包含:
#     # 标题1: Document Title
#     # 标题2: Section One
#     # 正文段落,包含**粗体**和*斜体*。
#     # - 列表项1
#     # - 列表项2
#     # 表格:两行两列
#     # input_docx = 'sample_document.docx'
#     # if os.path.exists(input_docx):
#     #     docx_elements = extract_docx_content(input_docx)
#     #     markdown_output = docx_elements_to_markdown(docx_elements)
#     #     print("n--- Converted Markdown from DOCX ---")
#     #     print(markdown_output)
#     # else:
#     #     print(f"Error: Input DOCX '{input_docx}' not found. Please create one for testing.")
#     pass

表格:Word 样式到 Markdown 映射示例

Word 样式名称 对应的 Markdown 语法 备注
Title # 文档主标题
Heading 1 # 一级标题
Heading 2 ## 二级标题
Normal (普通段落) 默认文本,转换后通常为一行或多行文本合并的段落
List Paragraph -1. Word 中列表的通用样式,需要进一步判断是无序还是有序,并处理缩进
Strong / Bold **text** 文本加粗
Emphasis / Italic *text* 文本斜体

6. 从 PowerPoint 演示文稿 (.pptx) 中提取干净的 Markdown

PowerPoint 演示文稿(.pptx 格式)同样是 XML 文件的压缩包,主要由幻灯片组成。python-pptx 库可以访问幻灯片、文本框、形状、表格等内容。

6.1 PowerPoint 的挑战

  • 视觉为主: PPT 强调视觉呈现,文本通常分散在不同的文本框中,布局相对自由。
  • 语义缺失: 文本框之间没有明确的逻辑关系,需要根据位置和占位符类型推断。
  • 内容多样: 文本、图片、图表、 SmartArt、视频等。
  • 占位符: 文本通常位于特定占位符(如标题、正文)中,这有助于结构化。

6.2 使用 python-pptx 提取内容

pip install python-pptx

from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE

def extract_pptx_content(pptx_path: str) -> list:
    """
    从 .pptx 文件中提取内容,包括幻灯片标题、文本框内容、表格和备注。
    :param pptx_path: .pptx 文件路径。
    :return: 包含幻灯片元素的列表,每个元素是一个字典。
    """
    presentation_elements = []
    try:
        prs = Presentation(pptx_path)

        for i, slide in enumerate(prs.slides):
            slide_content = {
                'slide_number': i + 1,
                'title': '',
                'content_blocks': [],
                'notes': ''
            }

            # 提取幻灯片标题
            if slide.shapes.title:
                slide_content['title'] = slide.shapes.title.text

            # 提取幻灯片中的所有文本框和表格
            for shape in slide.shapes:
                if shape.has_text_frame:
                    text_frame = shape.text_frame
                    text = ""
                    for paragraph in text_frame.paragraphs:
                        # 处理段落中的 runs
                        para_text = ""
                        for run in paragraph.runs:
                            run_text = run.text
                            if run.font.bold:
                                run_text = f"**{run_text}**"
                            if run.font.italic:
                                run_text = f"*{run_text}*"
                            para_text += run_text

                        # 处理列表缩进
                        indent_level = paragraph.level # 0-indexed
                        list_prefix = '  ' * indent_level + '- ' if indent_level > 0 else ''
                        if para_text.strip():
                            text += f"{list_prefix}{para_text.strip()}n"

                    if text.strip():
                        slide_content['content_blocks'].append({
                            'type': 'text_box',
                            'text': text.strip()
                        })
                elif shape.has_table:
                    table_data = []
                    table = shape.table
                    for row_idx in range(table.rows):
                        row_cells = []
                        for col_idx in range(table.columns):
                            cell = table.cell(row_idx, col_idx)
                            cell_text = ""
                            for paragraph in cell.text_frame.paragraphs:
                                cell_text += paragraph.text.strip() + "n"
                            row_cells.append(cell_text.strip())
                        table_data.append(row_cells)

                    if table_data:
                        slide_content['content_blocks'].append({
                            'type': 'table',
                            'data': table_data
                        })
                # 可以添加处理图片、媒体等
                # elif shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
                #     slide_content['content_blocks'].append({'type': 'image', 'name': shape.name})

            # 提取讲师备注
            if slide.has_notes_slide:
                notes_slide = slide.notes_slide
                text_frame = notes_slide.notes_text_frame
                if text_frame and text_frame.text:
                    slide_content['notes'] = text_frame.text.strip()

            presentation_elements.append(slide_content)

    except PackageNotFoundError:
        print(f"Error: Presentation '{pptx_path}' not found or is not a valid .pptx file.")
        return []
    except Exception as e:
        print(f"Error processing .pptx file '{pptx_path}': {e}")
        return []

    return presentation_elements

def pptx_elements_to_markdown(elements: list) -> str:
    """
    将提取的 .pptx 元素转换为 Markdown 格式。
    :param elements: extract_pptx_content 的输出。
    :return: Markdown 格式文本。
    """
    markdown_output = []

    for slide_data in elements:
        markdown_output.append(f"# Slide {slide_data['slide_number']}n") # 每个幻灯片作为一个顶级标题

        if slide_data['title']:
            markdown_output.append(f"## {slide_data['title']}n") # 幻灯片标题作为二级标题

        for block in slide_data['content_blocks']:
            if block['type'] == 'text_box':
                # 文本框内容可能包含多行和列表
                markdown_output.append(block['text'])
                markdown_output.append("n")
            elif block['type'] == 'table':
                table_data = block['data']
                if not table_data:
                    continue

                # 构建 Markdown 表格
                headers = table_data[0]
                header_line = "| " + " | ".join(headers) + " |n"
                separator_line = "| " + " | ".join(["---" for _ in headers]) + " |n"

                data_lines = []
                for row in table_data[1:]:
                    row_values = [str(cell).replace('n', '<br>') for cell in row]
                    data_lines.append("| " + " | ".join(row_values) + " |")

                markdown_output.append(header_line + separator_line + "n".join(data_lines) + "n")

        if slide_data['notes']:
            markdown_output.append(f"n**Speaker Notes:**n> {slide_data['notes'].replace('n', 'n> ')}n")

        markdown_output.append("---nn") # 幻灯片之间用分隔线分隔

    # 移除连续多个空行
    cleaned_final_output = []
    last_was_empty = False
    for line in markdown_output:
        if not line.strip():
            if not last_was_empty:
                cleaned_final_output.append('')
            last_was_empty = True
        else:
            cleaned_final_output.append(line)
            last_was_empty = False

    return "n".join(cleaned_final_output).strip()

# 示例使用 (假设存在一个 .pptx 文件: sample_presentation.pptx)
# if __name__ == "__main__":
#     # 假设有一个PPT文件 sample_presentation.pptx
#     # 包含:
#     # 第一页:标题 'Introduction',正文框包含列表 '- Item 1n  - Sub-item 1'
#     # 第二页:标题 'Data Analysis',包含一个表格
#     # 第三页:标题 'Conclusion',包含讲师备注
#     # input_pptx = 'sample_presentation.pptx'
#     # if os.path.exists(input_pptx):
#     #     pptx_elements = extract_pptx_content(input_pptx)
#     #     markdown_output = pptx_elements_to_markdown(pptx_elements)
#     #     print("n--- Converted Markdown from PPTX ---")
#     #     print(markdown_output)
#     # else:
#     #     print(f"Error: Input PPTX '{input_pptx}' not found. Please create one for testing.")
#     pass

7. 通用策略与进阶考量

7.1 鲁棒性与定制化

  • 启发式规则: 针对标题、列表、段落的识别,通常依赖于字体大小、加粗、缩进、行间距等启发式规则。这些规则需要根据具体的文档类型和样式进行调整和优化。
  • 正则表达式: 在清理和识别特定模式(如日期、邮箱、URL、代码块)时非常有用。
  • 状态机: 对于复杂的嵌套结构(如多级列表、代码块内的注释),可以采用状态机来跟踪当前的解析上下文。
  • 配置化: 提供外部配置文件,允许用户自定义 Word 样式到 Markdown 标记的映射,或调整 OCR 预处理参数。

7.2 错误处理与日志

  • 异常捕获: 任何文件操作和解析都可能遇到异常,如文件不存在、格式损坏、权限问题等。
  • 健壮性: 对于无法解析或识别的内容,应有回退机制,例如将其作为纯文本输出,或标记为“未知内容”。
  • 日志记录: 记录处理过程中的警告和错误,便于调试和问题追踪。

7.3 文本后处理

  • Markdown 格式化工具: mdformat 等工具可以自动规范化 Markdown 语法,确保输出的一致性。
  • 拼写和语法检查: 对于 OCR 结果,可以集成语言工具进行初步的错误纠正。
  • 语义增强: 结合 NLP 技术(如命名实体识别、关键词提取)为 Markdown 添加额外的元数据或结构。

7.4 性能优化

  • 并行处理: 对于大量文件,可以利用多核 CPU 进行并行处理(例如 multiprocessing 模块)。
  • 内存管理: 处理大型 PDF 或 PPTX 文件时,注意内存使用,避免一次性加载所有内容。
  • 缓存: 对于重复处理的文件或中间结果,可以使用缓存。

8. 总结与展望

非结构化数据预处理是将原始、复杂的文档转化为结构化、可分析数据的基础。我们已经看到了如何利用 Python 及其丰富的库,针对扫描版 PDF、数字版 PDF、Word 和 PowerPoint 文档,一步步地提取内容并转换为干净的 Markdown 格式。这个过程充满了挑战,但也充满了技术乐趣,需要我们综合运用 OCR、布局分析、文本处理和结构映射等多种技术。

未来的发展将继续朝着更智能、更自动化的方向迈进,例如利用深度学习模型(如 LayoutLM、Don’t Be BLIP)进行更精准的文档布局理解和语义信息提取,甚至实现多模态信息的融合处理。掌握这些预处理技术,无疑是您在数据科学和自动化领域取得成功的关键能力之一。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注