什么是 ‘Semantic Slicing’:将 10 万字文档拆解为具备‘逻辑锚点’的切片,在图中实现高保真召回

各位编程领域的专家、学者,以及对智能文档处理和知识图谱技术充满热情的同仁们:

大家好!

今天,我将与大家深入探讨一项前沿而实用的技术——“语义切片”(Semantic Slicing)。在信息爆炸的时代,我们每天都面临着海量的非结构化文本数据,尤其是长篇文档,例如技术规范、法律合同、研究报告,甚至是一本十万字的电子书。如何高效地理解、导航和检索这些文档中的知识,是一个长期存在的挑战。传统的文档处理方法,如固定大小的分块(fixed-size chunking)或简单的句子分割,往往会割裂上下文,破坏逻辑完整性,导致在后续的知识检索和表示中出现“失真”。

今天,我们的目标是超越这些局限,探讨如何将一份长达十万字的文档,拆解为一系列具备“逻辑锚点”的切片,并在一个高保真的知识图谱中实现精准、上下文丰富的召回。这不仅仅是技术细节的堆砌,更是一种对知识组织和检索范式的深刻变革。


1. 挑战:传统文档处理的局限

想象一下,你有一份长达100,000字的巨型技术文档,其中包含了多个章节、子章节、图表说明、代码示例和详细的解释。如果你只是简单地将这份文档按照固定字数(例如200字)或固定段落数进行切分,会发生什么?

  • 逻辑割裂: 一个完整的概念、一个复杂的论证过程,很可能被硬生生地切断,导致每个切片都缺乏足够的上下文,难以独立理解。
  • 信息冗余与缺失: 切片之间可能存在大量重复信息,或者关键的连接点被忽略。
  • 低效检索: 当用户搜索某个概念时,系统可能返回一堆零散的、不完整的切片,用户需要花费大量时间去拼凑上下文,严重影响了信息检索的效率和用户体验。
  • 难以构建知识图谱: 缺乏逻辑边界的切片难以作为图谱的节点,它们之间的关系也难以准确建立。

这些问题在传统的RAG(Retrieval Augmented Generation)系统中尤为突出。一个高质量的检索结果,其基础是高质量的切片。如果切片本身缺乏语义完整性,那么即使有再强大的大语言模型,也难以生成真正准确和有用的回答。


2. 什么是语义切片?

“语义切片”的核心思想是:以文档的内在逻辑结构和语义内容为依据,智能地识别并划分出具有独立意义和完整上下文的“切片”。这些切片并非任意分割,而是围绕着“逻辑锚点”进行组织,确保每个切片都代表一个相对完整的知识单元或论述段落。

逻辑锚点 (Logical Anchors) 是指文档中那些具有明确语义边界、表达核心概念、或引出重要论点的关键点。它们可以是:

  • 结构性锚点: 章节标题、子标题、段落分隔、列表项等。
  • 主题性锚点: 话题的开始和结束、新概念的引入、论点的转变。
  • 实体性锚点: 关键人物、组织、产品、技术术语首次出现或被详细解释的地方。
  • 论述性锚点: 问题提出、解决方案、论证过程、结论等。

通过识别这些锚点,我们将文档从线性的文本流转化为离散的、语义丰富的知识块。这些知识块将成为构建高保真知识图谱的基本单元。

让我们通过一个表格来对比传统分块与语义切片的区别:

特征 传统分块 (Fixed-size Chunking) 语义切片 (Semantic Slicing)
切分依据 固定字符数、固定行数、固定段落数 文档内在逻辑、语义完整性、主题边界、结构标记
切片大小 相对固定 动态可变,取决于逻辑单元的长度
上下文完整性 易被破坏,可能割裂逻辑 保持良好,每个切片是相对独立的逻辑单元
召回质量 低,可能返回不完整或冗余信息 高,返回与查询意图高度相关的完整逻辑单元
知识图谱构建 难以作为有意义的节点,关系模糊 易于作为图谱节点,便于建立清晰的语义关系
处理复杂性 高,需要NLP、机器学习等技术支持
适用场景 简单文本处理,粗略预览 精准知识检索、RAG增强、智能问答、知识图谱构建、深度文档理解

3. 高保真召回与知识图谱

“高保真召回”意味着当用户提出一个查询时,系统不仅能返回相关的切片,更能提供围绕该切片的完整上下文,包括其前因后果、相关实体、所属主题以及与文档中其他部分的联系。这就像是你在查看一个复杂的机器零件时,不仅看到了零件本身,还能看到它的装配图、使用手册以及与其他零件的连接方式。

知识图谱 (Knowledge Graph) 是实现高保真召回的关键。它通过节点 (Nodes)边 (Edges) 来表示知识。

  • 节点: 在语义切片场景中,每个语义切片本身可以是一个节点。此外,从切片中提取出的关键实体(人、组织、地点、概念、技术术语)、核心主题也可以作为独立的节点。
  • 边: 边则表示节点之间的关系。例如:
    • 顺序关系: 切片 A precedes 切片 B。
    • 语义关系: 切片 A discusses 主题 X,切片 B elaborates_on 切片 A 的某个观点。
    • 实体关系: 切片 A mentions 实体 Y,实体 Y is_a 概念 Z。
    • 引用关系: 切片 A refers_to 切片 C。

通过将语义切片转化为图谱中的节点和边,我们构建了一个多维度的知识网络。在这个网络中,查询不再是简单的关键词匹配,而是可以在图谱中进行路径遍历、模式匹配和语义推理,从而实现对原始文档内容的高保真、上下文丰富的检索。


4. 语义切片技术管线:从文本到图谱

现在,我们进入技术核心。语义切片是一个多阶段的复杂过程,它融合了自然语言处理(NLP)、机器学习、图论等多个领域的知识。以下是一个典型的语义切片技术管线:

4.1. 预处理 (Preprocessing)

这是所有NLP任务的起点,旨在将原始文档转化为机器可读的、结构化的形式。

import re
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
import unicodedata

# 下载NLTK资源 (如果尚未下载)
try:
    nltk.data.find('tokenizers/punkt')
except nltk.downloader.DownloadError:
    nltk.download('punkt')
try:
    nltk.data.find('corpora/stopwords')
except nltk.downloader.DownloadError:
    nltk.download('stopwords')

def clean_text(text):
    """
    清理文本:去除多余空白、特殊字符、标准化Unicode等。
    """
    # 标准化Unicode字符(例如全角转半角)
    text = unicodedata.normalize('NFKC', text)
    # 移除或替换特殊字符(根据具体需求调整)
    text = re.sub(r'[ufeffu200bu200cu200du200eu200f]', '', text) # 移除零宽字符
    text = re.sub(r'[^ws.,;!?:"'()<>/[]{}@#$%^&*+-=`~_]', ' ', text) # 保留常见标点,移除其他特殊字符
    # 将多个空格替换为单个空格
    text = re.sub(r's+', ' ', text).strip()
    return text

def preprocess_document(document_path):
    """
    加载文档并进行初步预处理:清洗、句子分割、段落分割。
    """
    with open(document_path, 'r', encoding='utf-8') as f:
        raw_text = f.read()

    cleaned_text = clean_text(raw_text)

    # 1. 段落分割 (基于双换行符)
    paragraphs = [p.strip() for p in cleaned_text.split('nn') if p.strip()]

    # 2. 句子分割 (使用NLTK)
    # 注意:对于中文,通常使用其他库如spaCy或自定义规则效果更好
    # 这里为了通用性,先用NLTK,后续会介绍spaCy
    sentences = []
    for para in paragraphs:
        sentences.extend(sent_tokenize(para, language='english')) # 假设英文,中文需'chinese'或其它库

    # 对于中文,推荐使用spaCy
    # import spacy
    # nlp_zh = spacy.load("zh_core_web_sm")
    # doc = nlp_zh(cleaned_text)
    # sentences_zh = [sent.text.strip() for sent in doc.sents]
    # paragraphs_zh = [p.text.strip() for p in doc.paragraphs] # spaCy 3.0+ 支持 paragraph segmentation

    return cleaned_text, paragraphs, sentences

# 示例使用
# with open("example_document.txt", "w", encoding="utf-8") as f:
#     f.write("这是一个示例文档。nn它包含多个段落。nn每个段落又包含多个句子。比如这一句。")

# cleaned_doc, doc_paragraphs, doc_sentences = preprocess_document("example_document.txt")
# print("--- 清理后的文本 ---")
# print(cleaned_doc[:200]) # 打印前200字
# print("n--- 段落示例 ---")
# print(doc_paragraphs[:2])
# print("n--- 句子示例 ---")
# print(doc_sentences[:5])

在实际应用中,对于中文文本,强烈推荐使用spaCyTHULAC等专门针对中文的NLP工具包进行分词和句子分割,它们能更好地处理中文的无空格特性和复杂的标点符号。

4.2. 语义表示 (Semantic Representation)

将文本转换为数值向量(嵌入),以便计算相似度、聚类和进行其他机器学习操作。高质量的嵌入是识别语义边界的基础。

from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

# 加载预训练的 Sentence-BERT 模型
# 'paraphrase-multilingual-MiniLM-L12-v2' 支持多语言,包括中文和英文
# 对于纯中文任务,也可以选择如 'shibing624/text2vec-base-chinese'
try:
    sbert_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
except Exception as e:
    print(f"加载Sentence-BERT模型失败: {e}. 请检查网络连接或模型名称。尝试下载到本地。")
    # 如果失败,可以尝试手动下载模型到本地路径,然后加载
    # sbert_model = SentenceTransformer('/path/to/your/downloaded/model')

def get_sentence_embeddings(sentences, model):
    """
    获取句子嵌入向量。
    """
    if not sentences:
        return np.array([])
    embeddings = model.encode(sentences, show_progress_bar=True)
    return embeddings

def get_chunk_embeddings(chunks, model):
    """
    获取更大文本块(例如段落或初步切片)的嵌入向量。
    """
    if not chunks:
        return np.array([])
    embeddings = model.encode(chunks, show_progress_bar=True)
    return embeddings

# 示例使用
# sample_sentences = [
#     "自然语言处理是人工智能的一个分支。",
#     "它涉及计算机与人类语言之间的交互。",
#     "机器学习是人工智能的另一个重要领域。",
#     "深度学习是机器学习的一个子集,在NLP中取得了巨大成功。"
# ]
# sentence_embeddings = get_sentence_embeddings(sample_sentences, sbert_model)
# print("n--- 句子嵌入向量形状 ---")
# print(sentence_embeddings.shape) # (4, 384) for MiniLM-L12-v2

# 计算句子之间的相似度
# similarity_matrix = cosine_similarity(sentence_embeddings)
# print("n--- 相似度矩阵 ---")
# print(similarity_matrix)

4.3. 识别逻辑锚点 (Identifying Logical Anchors)

这是语义切片的核心步骤,需要结合多种策略来识别文档中的语义边界。

4.3.1. 结构分析 (Structural Analysis)

文档的结构(标题、子标题、列表、段落分隔)是识别逻辑锚点最直接、最可靠的信号。

def extract_structural_elements(text):
    """
    通过正则表达式提取标题和段落。
    这里假设简单的Markdown风格或类似结构。
    实际文档可能需要更复杂的解析器(如lxml, Beautiful Soup for HTML/XML)。
    """
    headings = []
    # 匹配Markdown风格的H1-H4标题
    heading_pattern = re.compile(r'^(#+s.*)$', re.MULTILINE)
    # 对于纯文本,可能需要识别大写、居中、加粗等格式
    # 假设标题通常在行首且独立成行
    # 例如:以数字和点开头的章节号,如 "1.0 引言", "1.1 背景"
    numbered_heading_pattern = re.compile(r'^s*(d+(.d+)*)s+([^n]+)$', re.MULTILINE)

    lines = text.split('n')
    structural_boundaries = [] # 存储 (行号, 类型, 内容)

    current_char_offset = 0
    for i, line in enumerate(lines):
        match_h = heading_pattern.match(line)
        match_num_h = numbered_heading_pattern.match(line)

        if match_h:
            headings.append((match_h.group(1).strip(), current_char_offset, "Markdown Heading"))
            structural_boundaries.append({'type': 'heading', 'level': match_h.group(1).count('#'), 'text': match_h.group(1).strip(), 'char_offset': current_char_offset})
        elif match_num_h:
            headings.append((match_num_h.group(0).strip(), current_char_offset, "Numbered Heading"))
            structural_boundaries.append({'type': 'heading', 'level': len(match_num_h.group(1).split('.')), 'text': match_num_h.group(0).strip(), 'char_offset': current_char_offset})

        current_char_offset += len(line) + 1 # +1 for newline character

    return headings, structural_boundaries

# 示例使用
# sample_doc_with_headings = """
# # 1. 引言
# 这是一个关于语义切片技术的介绍。
# ## 1.1 背景
# 传统的分块方法存在诸多问题。
# ### 1.1.1 挑战
# 逻辑割裂是主要问题之一。
# 2. 核心概念
# 语义切片旨在解决这些问题。
# """
# headings_found, boundaries_found = extract_structural_elements(sample_doc_with_headings)
# print("n--- 提取的标题 ---")
# for h in headings_found:
#     print(h)
# print("n--- 结构边界 ---")
# for b in boundaries_found:
#     print(b)
4.3.2. 基于语义相似度的边界检测 (Cohesion-based Slicing)

当一个文档从一个主题转向另一个主题时,连续句子或段落之间的语义相似度通常会下降。我们可以利用这个“相似度下降点”来识别潜在的切片边界。

def find_semantic_boundaries_by_similarity(embeddings, sentences, window_size=3, threshold_ratio=0.7):
    """
    根据句子嵌入的相似度下降来识别语义边界。
    使用一个滑动窗口来计算局部相似度,并检测相似度的显著下降。
    """
    if len(embeddings) < window_size * 2:
        return []

    similarity_scores = []
    # 计算相邻句子对的余弦相似度
    for i in range(len(embeddings) - 1):
        similarity_scores.append(cosine_similarity(embeddings[i].reshape(1, -1), embeddings[i+1].reshape(1, -1))[0][0])

    boundaries = []
    # 检测相似度下降点
    for i in range(len(similarity_scores) - window_size):
        current_similarity = similarity_scores[i]
        next_window_avg_similarity = np.mean(similarity_scores[i+1 : i+1+window_size])

        # 如果当前相似度显著低于后续窗口的平均相似度,则可能是一个边界
        # 这里的阈值需要根据实际数据进行调整
        if current_similarity < next_window_avg_similarity * threshold_ratio:
            boundaries.append(i + 1) # 边界在第 i 和 i+1 个句子之间

    # 进一步优化:合并过于接近的边界,或结合结构信息
    return sorted(list(set(boundaries)))

# 示例使用 (假设之前已经生成了 sentence_embeddings)
# if 'sentence_embeddings' in locals() and len(sentence_embeddings) > 0:
#     semantic_boundaries = find_semantic_boundaries_by_similarity(sentence_embeddings, sample_sentences)
#     print("n--- 语义相似度检测到的边界 (句子索引) ---")
#     print(semantic_boundaries)
#     # 打印边界处的句子,帮助理解
#     # for idx in semantic_boundaries:
#     #     print(f"----- Boundary at index {idx} -----")
#     #     if idx > 0:
#     #         print(f"Sentence {idx-1}: {sample_sentences[idx-1]}")
#     #     print(f"Sentence {idx}: {sample_sentences[idx]}")
4.3.3. 主题建模 (Topic Modeling)

主题建模可以帮助我们发现文档中潜在的抽象主题,并在主题发生显著变化的地方设置逻辑锚点。BERTopic是一个非常强大的工具,它结合了BERT嵌入和HDBSCAN聚类来识别主题。

from bertopic import BERTopic

def extract_topics_and_boundaries(sentences, embeddings):
    """
    使用BERTopic进行主题建模,并识别主题切换点作为潜在边界。
    """
    if not sentences or not embeddings.any():
        return [], [], []

    # language='multilingual' 允许处理多种语言
    # nr_topics='auto' 或指定一个整数来控制主题数量
    topic_model = BERTopic(language="multilingual", calculate_probabilities=True, verbose=True, nr_topics="auto")
    topics, probs = topic_model.fit_transform(sentences, embeddings)

    # 获取每个句子所属的主题ID
    sentence_topics = topics

    # 识别主题切换点
    topic_boundaries = []
    for i in range(1, len(sentence_topics)):
        if sentence_topics[i] != sentence_topics[i-1]:
            topic_boundaries.append(i) # 边界在第 i-1 和 i 个句子之间

    return topics, probs, topic_boundaries, topic_model

# 示例使用 (假设之前已经生成了 sample_sentences 和 sentence_embeddings)
# if 'sentence_embeddings' in locals() and len(sentence_embeddings) > 0:
#     doc_topics, doc_probs, topic_boundaries_indices, bertopic_model = extract_topics_and_boundaries(sample_sentences, sentence_embeddings)
#     print("n--- 句子主题分配 (部分) ---")
#     print(doc_topics[:10])
#     print("n--- 主题切换边界 (句子索引) ---")
#     print(topic_boundaries_indices)
#     # 打印主题关键词
#     # for topic_id in bertopic_model.get_topics():
#     #     if topic_id != -1: # -1 是噪声主题
#     #         print(f"Topic {topic_id}: {bertopic_model.get_topic(topic_id)}")
4.3.4. 命名实体识别 (Named Entity Recognition – NER)

关键实体(人名、组织名、地名、产品名、技术术语)的首次提及、详细解释或密集出现,也可能是逻辑锚点。例如,在技术文档中,一个新技术的首次引入和详细描述,通常标志着一个新的语义切片。

import spacy

# 加载spaCy模型 (中文或英文)
# 对于中文,确保已安装 `python -m spacy download zh_core_web_sm`
# 对于英文,确保已安装 `python -m spacy download en_core_web_sm`
try:
    nlp_zh = spacy.load("zh_core_web_sm")
except OSError:
    print("下载 'zh_core_web_sm' 模型...")
    spacy.cli.download("zh_core_web_sm")
    nlp_zh = spacy.load("zh_core_web_sm")

def extract_named_entities(text):
    """
    使用spaCy提取命名实体。
    返回实体列表及其在文本中的字符偏移。
    """
    doc = nlp_zh(text)
    entities = []
    for ent in doc.ents:
        entities.append({
            'text': ent.text,
            'label': ent.label_,
            'start_char': ent.start_char,
            'end_char': ent.end_char
        })
    return entities

# 示例使用
# sample_text_ner = "苹果公司由史蒂夫·乔布斯于1976年创立,总部位于加利福尼亚州的库比蒂诺。其产品包括iPhone和MacBook。"
# extracted_entities = extract_named_entities(sample_text_ner)
# print("n--- 提取的命名实体 ---")
# for ent in extracted_entities:
#     print(ent)

4.4. 切片算法 (Slicing Algorithm)

这是将所有识别出的锚点信号进行整合,最终确定切片边界的阶段。这通常是一个启发式或基于规则的算法,也可以是更复杂的机器学习模型。

核心思想: 将文档视为一系列潜在的切片点(每个句子或段落之间),然后为每个切片点分配一个“得分”,表示其作为逻辑锚点的强度。得分越高,越可能是有效的切片边界。

得分构成:

  • 结构性得分: 标题 > 子标题 > 段落分隔。
  • 语义相似度得分: 相似度下降幅度越大,得分越高。
  • 主题切换得分: 发生主题切换,得分增加。
  • 实体密度得分: 密集出现关键实体,或新实体首次出现,得分增加。
def combine_and_score_boundaries(
    sentences, 
    structural_boundaries, 
    semantic_boundaries_indices, 
    topic_boundaries_indices, 
    entities, 
    min_slice_length=5, # 最小切片长度(句子数)
    max_slice_length=50 # 最大切片长度(句子数)
):
    """
    结合多种信号,计算每个句子作为切片边界的得分,并确定最终切片。
    """
    potential_boundaries = set()

    # 将结构性边界转换为句子索引 (近似值)
    # 这是一个简化,实际需要将字符偏移映射到句子索引
    sentence_char_offsets = []
    current_offset = 0
    for s in sentences:
        sentence_char_offsets.append(current_offset)
        current_offset += len(s) + 1 # +1 for space/newline

    for sb in structural_boundaries:
        # 找到最接近结构边界字符偏移的句子索引
        closest_sentence_idx = np.argmin(np.abs(np.array(sentence_char_offsets) - sb['char_offset']))
        potential_boundaries.add(closest_sentence_idx)
        # 结构性边界通常非常强,可以给一个高初始权重

    potential_boundaries.update(semantic_boundaries_indices)
    potential_boundaries.update(topic_boundaries_indices)

    # 实体信号可以用来增强现有边界或创建新边界
    # 例如:如果一个新实体在某个位置首次出现,且该位置附近已经有其他弱边界信号,则强化该边界
    # 这一步比较复杂,这里简化处理,只将所有潜在边界点合并

    # 确保文档的开始和结束也是边界
    if 0 not in potential_boundaries:
        potential_boundaries.add(0)
    if len(sentences) not in potential_boundaries:
        potential_boundaries.add(len(sentences))

    sorted_boundaries = sorted(list(potential_boundaries))

    final_slices = []
    current_start_idx = 0

    for i in range(1, len(sorted_boundaries)):
        boundary_idx = sorted_boundaries[i]

        # 检查切片长度是否合理
        slice_length = boundary_idx - current_start_idx

        # 如果切片太短,尝试合并到前一个切片(如果存在)或下一个切片
        if slice_length < min_slice_length and current_start_idx != 0:
            # 合并到上一个切片,或者推迟当前边界
            continue # 暂时跳过这个边界,让它与下一个边界合并

        # 如果切片太长,可能需要进一步细分(这个逻辑更复杂,这里暂时不实现)
        # 可以考虑在长切片内部再次运行相似度检测等

        # 确定切片内容
        slice_sentences = sentences[current_start_idx:boundary_idx]
        if slice_sentences:
            final_slices.append(" ".join(slice_sentences))
            current_start_idx = boundary_idx

    # 处理最后一个切片
    if current_start_idx < len(sentences):
        final_slices.append(" ".join(sentences[current_start_idx:len(sentences)]))

    # 再次检查并合并过短的切片,或分裂过长的切片(可选,迭代优化)
    # ... 复杂的合并/分裂逻辑 ...

    return final_slices

# 假设我们有了所有的原始数据
# example_document_path = "example_document_long.txt" # 假设这是一个长文档
# with open(example_document_path, "w", encoding="utf-8") as f:
#      f.write(sample_doc_with_headings + "nn" + "这是一个关于自然语言处理的段落。它深入探讨了其历史和发展。NLP在许多实际应用中都发挥着关键作用,例如机器翻译和情感分析。nn机器学习是构建NLP系统的核心技术。最近,深度学习,特别是Transformer模型,彻底改变了NLP领域。nn未来,我们将看到更多创新的NLP技术出现。nn这是另一个不相关的主题,关于量子计算。量子计算是一种全新的计算范式,与传统计算截然不同。")

# _, doc_paragraphs_full, doc_sentences_full = preprocess_document(example_document_path)
# doc_embeddings_full = get_sentence_embeddings(doc_sentences_full, sbert_model)
# doc_headings, doc_structural_boundaries = extract_structural_elements(" ".join(doc_paragraphs_full))
# doc_semantic_boundaries = find_semantic_boundaries_by_similarity(doc_embeddings_full, doc_sentences_full)
# doc_topics, _, doc_topic_boundaries, _ = extract_topics_and_boundaries(doc_sentences_full, doc_embeddings_full)
# doc_entities = extract_named_entities(" ".join(doc_sentences_full)) # 对整个文档提取实体

# final_semantic_slices = combine_and_score_boundaries(
#     doc_sentences_full,
#     doc_structural_boundaries,
#     doc_semantic_boundaries,
#     doc_topic_boundaries,
#     doc_entities
# )
# print(f"n--- 最终切片数量: {len(final_semantic_slices)} ---")
# for i, s in enumerate(final_semantic_slices[:5]): # 打印前5个切片
#     print(f"Slice {i+1} ({len(s)} chars):n{s[:200]}...n---") # 打印前200字符

这个combine_and_score_boundaries函数是一个简化的示例。在实际应用中,切片算法会更加复杂,可能包括:

  • 动态规划: 寻找最优的切片方案,最小化切片内部的语义不连贯性,并最大化切片间的语义差异。
  • 图割算法: 将文档表示为图,节点是句子,边是语义相似度。通过切割图来找到最佳边界。
  • 强化学习: 训练一个模型来学习如何根据各种信号决定切片边界。
  • 迭代优化: 初步切片后,对过短或过长的切片进行进一步的合并或细分。

4.5. 图谱构建 (Graph Construction)

有了语义切片,我们就可以开始构建知识图谱了。

import networkx as nx

def build_knowledge_graph(slices, slice_embeddings, entities_by_slice, topics_by_slice, sbert_model, similarity_threshold=0.6):
    """
    根据语义切片、实体、主题等信息构建知识图谱。
    """
    G = nx.Graph()

    # 1. 添加切片节点 (Slice Nodes)
    slice_nodes = []
    for i, s_content in enumerate(slices):
        slice_id = f"Slice_{i}"
        G.add_node(slice_id, type="slice", content=s_content, embedding=slice_embeddings[i])
        slice_nodes.append(slice_id)

    # 2. 添加实体节点 (Entity Nodes) 和它们与切片的连接
    entity_nodes = set()
    for slice_idx, entities in enumerate(entities_by_slice):
        slice_id = f"Slice_{slice_idx}"
        for ent in entities:
            entity_text = ent['text']
            entity_label = ent['label']
            if entity_text not in entity_nodes:
                G.add_node(entity_text, type="entity", label=entity_label)
                entity_nodes.add(entity_text)
            G.add_edge(slice_id, entity_text, relation="mentions")

    # 3. 添加主题节点 (Topic Nodes) 和它们与切片的连接
    topic_nodes = set()
    for slice_idx, topic_info in enumerate(topics_by_slice): # topics_by_slice 应该是 (主题ID, 关键词列表)
        slice_id = f"Slice_{slice_idx}"
        topic_id = topic_info[0]
        topic_keywords = topic_info[1] # 假设第二个元素是关键词列表
        if topic_id != -1: # 排除噪声主题
            topic_node_name = f"Topic_{topic_id}"
            if topic_node_name not in topic_nodes:
                G.add_node(topic_node_name, type="topic", keywords=topic_keywords)
                topic_nodes.add(topic_node_name)
            G.add_edge(slice_id, topic_node_name, relation="discusses")

    # 4. 添加切片之间的顺序关系 (Sequential Edges)
    for i in range(len(slice_nodes) - 1):
        G.add_edge(slice_nodes[i], slice_nodes[i+1], relation="precedes")

    # 5. 添加切片之间的语义相似度关系 (Semantic Similarity Edges)
    # 可以在切片之间计算相似度,如果高于某个阈值则添加边
    slice_embeddings_list = [G.nodes[node]['embedding'] for node in slice_nodes]
    if len(slice_embeddings_list) > 1:
        similarity_matrix = cosine_similarity(np.array(slice_embeddings_list))
        for i in range(len(slice_nodes)):
            for j in range(i + 1, len(slice_nodes)):
                if similarity_matrix[i, j] > similarity_threshold:
                    G.add_edge(slice_nodes[i], slice_nodes[j], relation="semantically_similar", weight=similarity_matrix[i, j])

    return G

# 假设我们已经有了 final_semantic_slices
# 需要为每个切片生成嵌入
# slice_embeddings_final = get_chunk_embeddings(final_semantic_slices, sbert_model)

# 还需要为每个切片提取实体和主题
# 这需要一个循环,对每个切片调用 extract_named_entities 和 topic_model.transform
# entities_per_slice = []
# topics_per_slice = []
# for s in final_semantic_slices:
#     entities_per_slice.append(extract_named_entities(s))
#     # 对于BERTopic,需要先transform再获取主题
#     # s_topics, s_probs = bertopic_model.transform([s], sbert_model.encode([s]))
#     # if s_topics[0] != -1:
#     #     topics_per_slice.append((s_topics[0], bertopic_model.get_topic(s_topics[0])))
#     # else:
#     #     topics_per_slice.append((-1, [])) # 噪声主题

# kg = build_knowledge_graph(
#     final_semantic_slices,
#     slice_embeddings_final,
#     entities_per_slice,
#     topics_per_slice,
#     sbert_model
# )

# print(f"n--- 知识图谱节点数: {kg.number_of_nodes()} ---")
# print(f"--- 知识图谱边数: {kg.number_of_edges()} ---")
# print("部分节点示例:", list(kg.nodes(data=True))[:5])
# print("部分边示例:", list(kg.edges(data=True))[:5])

4.6. 高保真召回 (High-Fidelity Recall)

在构建了知识图谱之后,我们可以利用图谱的结构和语义信息进行高级检索。

def query_knowledge_graph(graph, query_text, sbert_model, top_k=3, traversal_depth=2):
    """
    在知识图谱中执行语义查询,返回高保真的相关信息。
    """
    query_embedding = sbert_model.encode([query_text])[0]

    # 1. 查找最相似的切片节点
    slice_nodes = [n for n, data in graph.nodes(data=True) if data.get('type') == 'slice']
    if not slice_nodes:
        return "图谱中没有切片节点。", []

    slice_embeddings = np.array([graph.nodes[n]['embedding'] for n in slice_nodes])
    similarities = cosine_similarity(query_embedding.reshape(1, -1), slice_embeddings)[0]

    # 获取相似度最高的切片索引
    top_k_indices = np.argsort(similarities)[::-1][:top_k]
    initial_relevant_slices = [slice_nodes[i] for i in top_k_indices if similarities[i] > 0.5] # 设置相似度阈值

    if not initial_relevant_slices:
        return "未找到足够相关的切片。", []

    retrieved_info = []
    visited_nodes = set()

    # 2. 从最相似的切片开始,进行图谱遍历,收集相关上下文
    for start_slice_node in initial_relevant_slices:
        if start_slice_node in visited_nodes:
            continue

        current_context = {
            'slice_id': start_slice_node,
            'slice_content': graph.nodes[start_slice_node]['content'],
            'related_entities': [],
            'related_topics': [],
            'neighboring_slices': []
        }

        # BFS或DFS遍历
        queue = [(start_slice_node, 0)]
        visited_nodes.add(start_slice_node)

        while queue:
            current_node, depth = queue.pop(0)

            if depth > traversal_depth:
                continue

            for neighbor in graph.neighbors(current_node):
                if neighbor not in visited_nodes:
                    neighbor_data = graph.nodes[neighbor]
                    if neighbor_data.get('type') == 'entity':
                        current_context['related_entities'].append(neighbor_data['text'])
                    elif neighbor_data.get('type') == 'topic':
                        current_context['related_topics'].append(neighbor_data['keywords'])
                    elif neighbor_data.get('type') == 'slice' and neighbor != start_slice_node:
                        # 避免重复添加,并区分主切片
                        current_context['neighboring_slices'].append({
                            'id': neighbor,
                            'content': neighbor_data['content'][:100] + '...' # 只显示部分内容
                        })

                    visited_nodes.add(neighbor)
                    if neighbor_data.get('type') != 'entity' and neighbor_data.get('type') != 'topic': # 不再从实体/主题节点继续遍历
                         queue.append((neighbor, depth + 1))

        retrieved_info.append(current_context)

    return "召回成功", retrieved_info

# 示例查询
# query = "语义切片的核心思想是什么?"
# status, results = query_knowledge_graph(kg, query, sbert_model, top_k=2, traversal_depth=1)
# print(f"n--- 查询结果: {status} ---")
# for res in results:
#     print(f"** 主切片: {res['slice_id']} **")
#     print(f"内容: {res['slice_content'][:300]}...")
#     print(f"相关实体: {list(set(res['related_entities']))}")
#     print(f"相关主题: {list(set([' '.join(t) for t in res['related_topics']]))}")
#     print(f"相邻切片: {res['neighboring_slices']}")
#     print("-" * 50)

这个查询函数展示了如何利用图谱进行上下文丰富的召回。它不仅返回了与查询最直接相关的切片,还沿着图谱的边,通过有限的深度遍历,查找并返回了与该切片相关的实体、主题以及邻近的逻辑切片。这就是“高保真召回”的体现:提供了一个完整的知识视图,而不仅仅是一个孤立的答案。


5. 挑战与考量

尽管语义切片前景广阔,但在实际实施中仍面临诸多挑战:

  • 领域特异性: 不同领域的文档(如法律、医学、IT)对“逻辑锚点”的定义可能大相径庭。通用模型可能表现不佳,需要领域适应性训练。
  • 计算资源: 对于10万字甚至更长的文档,生成嵌入、进行主题建模、构建和遍历图谱都需要显著的计算资源和时间。
  • 切片粒度: 如何确定“最佳”切片粒度是一个难题。太细会导致碎片化,太粗则失去精度。这通常需要在效率和召回质量之间进行权衡。
  • 歧义与上下文: 语言的固有歧义性使得精确识别语义边界充满挑战。例如,一个词在不同上下文中可能代表不同的概念。
  • 动态文档: 文档内容可能会更新,如何有效地更新和维护已构建的知识图谱是一个持续的挑战。
  • 评估标准: 如何客观、量化地评估语义切片的质量和高保真召回的效果,仍需进一步研究和行业标准的建立。

6. 实际应用

语义切片和基于图谱的高保真召回技术在多个领域具有巨大的应用潜力:

  • RAG系统增强: 为大型语言模型提供更精准、上下文更丰富的检索结果,显著提升生成内容的准确性和相关性。
  • 智能文档导航与搜索: 用户可以通过概念、实体、主题等方式,而非简单的关键词,在复杂文档中进行深度探索。
  • 知识库构建: 自动化地从非结构化文档中提取结构化知识,填充企业知识图谱。
  • 法律与合规: 分析法律条文、合同和判例,快速定位关键条款、相关方和法律责任。
  • 科学研究与专利分析: 辅助研究人员快速理解复杂论文,发现研究趋势,分析专利引用关系。
  • 教育与培训: 为学习者提供个性化的学习路径,根据知识点之间的逻辑关系推荐相关材料。

7. 未来展望

语义切片技术仍在不断演进。未来的发展方向可能包括:

  • 结合生成式AI: 利用大型语言模型更智能地识别和定义逻辑锚点,甚至生成切片的摘要或相互关系描述。
  • 多模态语义切片: 不仅限于文本,还将图像、视频、音频等非文本内容纳入切片和图谱构建中,实现更全面的知识表示。
  • 用户意图驱动的自适应切片: 根据用户的实时查询意图和偏好,动态调整切片的粒度和组合方式。
  • 跨文档知识融合: 将多个相关文档进行语义切片,并整合到一个更大的知识图谱中,实现跨文档的知识关联和检索。

通过今天的探讨,我们看到了语义切片如何从根本上改变我们处理和理解大型文档的方式。它将文档从线性的信息流转化为互联互通的知识网络,从而实现前所未有的高保真知识召回。这不仅是技术上的飞跃,更是向构建真正智能、可解释的知识系统迈出的重要一步。虽然挑战依然存在,但其潜力无疑是巨大的,值得我们每一位技术工作者深入探索和实践。

谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注