实战:利用大模型自动对比分析,找出你内容中缺乏‘可采信事实’的语义真空区

各位同仁,各位技术爱好者,大家好!

今天,我们齐聚一堂,共同探讨一个在信息爆炸时代日益凸显的关键问题:如何确保我们生产和消费的内容,尤其是技术内容,具备足够的“可采信事实”。在海量信息中,我们常常会遭遇一些看似言之凿凿,实则空泛无物,缺乏具体数据、来源或证据支撑的论断。我将这类区域称之为“语义真空区”。它不仅削弱了内容的权威性和可信度,更可能导致错误的决策和理解偏差。

作为编程专家,我们深知数据的力量和逻辑的严谨。当今,随着大模型技术的飞速发展,我们有了一个前所未有的强大工具,能够自动化地识别并定位这些“语义真空区”。今天,我将以一场实战讲座的形式,深入剖析如何利用大模型进行对比分析,系统地找出内容中缺乏“可采信事实”的区域。我们将从理论概念出发,通过具体的代码实践,构建一个端到端的解决方案。

一、信息洪流中的信任危机与大模型的机遇

我们生活在一个信息爆炸的时代,无论是新闻报道、技术文档、市场分析,还是产品说明,每天都有海量内容被生产和传播。然而,这种便捷性也带来了挑战:内容的质量参差不齐,假新闻、误导性信息、以及缺乏实质性支撑的“空话”层出不穷。对于技术领域而言,一个未经证实的数据、一个缺乏原理支撑的断言,都可能带来严重的后果。

什么是“可采信事实”?
在我看来,“可采信事实”是指那些客观的、可验证的、有明确来源或证据支持的信息点。它可能是一个统计数据、一个实验结果、一个技术规范、一个事件的发生时间地点、一个人物的身份或言论等。其核心在于“可验证性”和“溯源性”。

什么是“语义真空区”?
“语义真空区”特指内容中那些提出观点、做出断言、描述现象,但却缺乏上述“可采信事实”作为支撑的段落或语句。它可能表现为:

  1. 空泛的形容词和副词堆砌: “我们的产品性能卓越,用户体验极佳,市场前景广阔。”——缺乏具体数据或用户反馈支撑。
  2. 未经证实的因果关系: “由于采用了最新算法,系统效率大幅提升。”——“最新算法”具体是什么?“效率大幅提升”提升了多少?如何量化?
  3. 缺少来源的引用或数据: “有研究表明,这种方法能有效解决问题。”——是哪个研究?发表在哪里?数据是什么?
  4. 过度概括或以偏概全: “所有开发者都认为……”——“所有”是否属实?基于什么调查?

识别这些区域,对于提升内容的质量、建立信任、甚至对抗AI幻觉,都具有至关重要的意义。传统上,这依赖于人工审校,效率低下且容易遗漏。而大模型,凭借其强大的自然语言理解、生成和推理能力,为我们提供了一个自动化、规模化的解决方案。

二、大模型为何能胜任此任务?

大模型,如GPT系列、Claude、Llama等,之所以能够在此任务中发挥核心作用,主要得益于以下几个方面:

  1. 深度的语义理解能力: 它们能够理解语句的含义、上下文关系,而不仅仅是关键词匹配。这使得它们能够区分事实性陈述和主观意见,理解断言的意图。
  2. 强大的上下文推理能力: 大模型可以分析文本的逻辑结构,识别出哪些论点需要事实支撑,哪些事实被用来支撑哪些论点。
  3. 知识整合与检索能力(RAG): 结合检索增强生成(RAG)技术,大模型可以与外部权威知识库、数据库或搜索引擎协同工作,进行事实的交叉验证和溯源。
  4. 生成与归纳能力: 大模型不仅能发现问题,还能生成详细的分析报告,甚至提出具体的改进建议,例如“在此处补充具体数据来源”、“请提供此观点的实验证据”。

这些能力使得大模型超越了简单的规则引擎或关键词匹配工具,能够进行更深层次、更智能的语义分析。

三、实战框架:利用大模型定位语义真空区

为了系统地利用大模型解决这个问题,我们可以设计一个多阶段的工作流。

阶段一:文本预处理与分块 (Text Preprocessing & Chunking)

大模型通常有输入长度限制(上下文窗口)。因此,我们需要将待分析的长文本进行适当的预处理和分块,同时确保每个块都能保持足够的上下文信息。

目标:

  • 将长文本分割成适合LLM处理的较小块。
  • 在分块时尽量不打断重要的语义单元(如句子、段落)。
  • 记录每个块在原文中的位置,以便后续报告生成。

技术:

  • 句子分割: 使用NLTK或SpaCy等库将文本分割成句子。
  • 段落分割: 基于换行符分割。
  • 滑动窗口分块: 确保块之间有重叠,以维持上下文。
  • Token计数: 预估每个块的token数量,避免超出模型限制。

代码示例:
我们使用Python和tiktoken(用于OpenAI模型token计数)或transformers库(用于其他模型)来进行分块。

import tiktoken
import re
from typing import List, Dict

# 假设我们使用OpenAI的gpt-4模型
ENCODING_NAME = "cl100k_base" 
MAX_TOKENS_PER_CHUNK = 1500 # 每个块的最大token数,留出余量给prompt和response
OVERLAP_TOKENS = 150 # 块之间的重叠token数

def count_tokens(text: str) -> int:
    """计算文本的token数量"""
    encoding = tiktoken.get_encoding(ENCODING_NAME)
    return len(encoding.encode(text))

def chunk_text(long_text: str) -> List[Dict]:
    """
    将长文本分割成带有重叠的块,并记录每个块的原始位置。
    尽量保持句子和段落的完整性。
    """
    # 首先按段落分割
    paragraphs = [p.strip() for p in long_text.split('n') if p.strip()]

    chunks = []
    current_chunk_text = ""
    current_start_char = 0

    for paragraph in paragraphs:
        # 尝试添加整个段落
        temp_text = current_chunk_text + ("nn" if current_chunk_text else "") + paragraph
        if count_tokens(temp_text) <= MAX_TOKENS_PER_CHUNK:
            current_chunk_text = temp_text
        else:
            # 如果添加整个段落超出,则先保存当前块
            if current_chunk_text:
                chunks.append({
                    "text": current_chunk_text,
                    "start_char": current_start_char,
                    "end_char": current_start_char + len(current_chunk_text)
                })

            # 从新段落开始构建新块,并考虑重叠
            # 简化处理:这里直接从新段落开始,更复杂的重叠需要回溯上一个块的结尾
            # 对于长文本,更实用的是基于句子或固定长度的滑动窗口

            # 更精细的重叠处理:回溯上一个块的末尾部分作为新块的前缀
            overlap_prefix = ""
            if chunks:
                last_chunk_text = chunks[-1]["text"]
                # 尝试获取上一个块末尾的OVERLAP_TOKENS数量的文本
                last_chunk_tokens = encoding.encode(last_chunk_text)
                if len(last_chunk_tokens) > OVERLAP_TOKENS:
                    overlap_prefix_tokens = last_chunk_tokens[-OVERLAP_TOKENS:]
                    overlap_prefix = encoding.decode(overlap_prefix_tokens)
                else:
                    overlap_prefix = last_chunk_text

            current_chunk_text = (overlap_prefix + "nn" if overlap_prefix else "") + paragraph
            current_start_char = long_text.find(paragraph, current_start_char) # 找到段落的实际开始位置

            # 如果新段落本身就超出最大token数,需要进一步细分(这里暂时忽略,假设段落不会过长)
            if count_tokens(current_chunk_text) > MAX_TOKENS_PER_CHUNK:
                # 警告或进一步按句子细分
                print(f"Warning: Paragraph too long, needs further splitting: {paragraph[:100]}...")
                # 简单处理:直接将此过长段落作为一个块,可能会超出限制,但避免丢失内容
                chunks.append({
                    "text": paragraph,
                    "start_char": long_text.find(paragraph, current_start_char),
                    "end_char": long_text.find(paragraph, current_start_char) + len(paragraph)
                })
                current_chunk_text = "" # 重置,等待下一个段落
                current_start_char += len(paragraph) # 更新起始字符位置
            else:
                pass # 当前块已更新,等待下一个段落

    if current_chunk_text:
        chunks.append({
            "text": current_chunk_text,
            "start_char": current_start_char, # 实际需要精确定位
            "end_char": current_start_char + len(current_chunk_text) # 实际需要精确定位
        })

    # 重新计算start_char和end_char,因为上面的逻辑可能不精确
    final_chunks = []
    current_pos = 0
    for chunk in chunks:
        # 找到chunk_text在long_text中的准确位置
        start_idx = long_text.find(chunk['text'], current_pos)
        if start_idx != -1:
            chunk['start_char'] = start_idx
            chunk['end_char'] = start_idx + len(chunk['text'])
            final_chunks.append(chunk)
            current_pos = start_idx + len(chunk['text']) - OVERLAP_TOKENS # 为下一个查找提供一个近似的起始点
        else:
            # 如果直接查找失败,可能是因为重叠处理导致文本略有不同,需要更鲁棒的匹配
            # 简单起见,这里假设能找到
            pass

    return final_chunks

# 示例文本
sample_text = """
尊敬的各位专家、同事:

欢迎大家参加本次“智能风控系统V3.0技术研讨会”。本次系统升级是我们在金融科技领域的一次重大突破。

根据最新的市场调研报告,金融欺诈案件在全球范围内呈现逐年上升趋势,给金融机构带来了巨大的经济损失和声誉风险。我们的V3.0系统正是针对这一痛点,通过引入先进的深度学习算法和实时数据处理能力,实现了对潜在风险的秒级预警。

具体而言,新系统采用了Transformer架构的风险评估模型,结合了多模态数据分析,包括交易行为、社交网络信息、设备指纹等。在内部测试中,我们将误报率降低了20%,同时将风险识别的准确率提升了15%,这些数据远超行业平均水平。此外,系统的可扩展性也得到了显著增强,能够轻松应对未来业务量的增长。

我们相信,V3.0系统将为金融机构提供前所未有的安全保障,并有望成为行业的新标杆。请大家踊跃提问,共同探讨。

谢谢大家!
"""

# chunks = chunk_text(sample_text)
# for i, chunk in enumerate(chunks):
#     print(f"Chunk {i+1} (Chars {chunk['start_char']}-{chunk['end_char']}):n{chunk['text']}n---")
# 
# 
# 
# 
# # 修正chunk_text函数以更精确地处理start_char和end_char,尤其是在有重叠的情况下
# # 实际上,为了简化和准确性,我们可以先按句子分割,然后累积句子直到达到token限制,并记录原始索引。
# # 这是一个更健壮的分块策略,直接通过原始文本索引来定位。
# def chunk_text_by_sentence(long_text: str) -> List[Dict]:
#     """
#     将长文本按句子分割,并累积成块,确保每个块不超过最大token数,并记录原始索引。
#     """
#     encoding = tiktoken.get_encoding(ENCODING_NAME)
#     sentences = re.split(r'(?<=[.!?])s+', long_text) # 简单的句子分割
# 
#     chunks = []
#     current_chunk_sentences = []
#     current_chunk_start_char = 0
# 
#     for i, sentence in enumerate(sentences):
#         temp_chunk_text = " ".join(current_chunk_sentences + [sentence])
#         if count_tokens(temp_chunk_text) <= MAX_TOKENS_PER_CHUNK:
#             current_chunk_sentences.append(sentence)
#             if not current_chunk_sentences: # 如果是第一个句子,记录其起始位置
#                 current_chunk_start_char = long_text.find(sentence)
#         else:
#             # 如果添加新句子超出限制,则保存当前块
#             if current_chunk_sentences:
#                 chunk_text = " ".join(current_chunk_sentences)
#                 # 找到这个chunk_text在原文本中的准确起始位置
#                 start_idx = long_text.find(chunk_text, current_chunk_start_char)
#                 if start_idx == -1: # 如果直接找不到,可能因为空格或其他细微差异,尝试更模糊匹配或从上次结束点开始找
#                    start_idx = current_chunk_start_char # 回退到上一个已知的起始点
#                    
#                 chunks.append({
#                     "text": chunk_text,
#                     "start_char": start_idx,
#                     "end_char": start_idx + len(chunk_text)
#                 })
#                 
#             # 开始新的块,并将当前句子作为新块的第一个句子
#             current_chunk_sentences = [sentence]
#             current_chunk_start_char = long_text.find(sentence, chunks[-1]['end_char'] if chunks else 0)
# 
#     # 添加最后一个块
#     if current_chunk_sentences:
#         chunk_text = " ".join(current_chunk_sentences)
#         start_idx = long_text.find(chunk_text, current_chunk_start_char)
#         if start_idx == -1:
#             start_idx = current_chunk_start_char
#         chunks.append({
#             "text": chunk_text,
#             "start_char": start_idx,
#             "end_char": start_idx + len(chunk_text)
#         })
# 
#     return chunks
# 
# # 重新测试
# chunks_v2 = chunk_text_by_sentence(sample_text)
# for i, chunk in enumerate(chunks_v2):
#     print(f"Chunk {i+1} (Chars {chunk['start_char']}-{chunk['end_char']}):n{chunk['text']}n---")
# 
# # 考虑到实际分块的复杂性,尤其是在长文本和重叠处理上,
# # 实际生产中会使用更成熟的库,如LangChain的TextSplitter。
# # 但此处为了展示核心逻辑,我们简化实现。
# # 
# # 更可靠的实现,使用 LangChain 的 RecursiveCharacterTextSplitter
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# 
# def chunk_text_langchain(long_text: str) -> List[Dict]:
#     text_splitter = RecursiveCharacterTextSplitter(
#         chunk_size=MAX_TOKENS_PER_CHUNK * 4, # LangChain chunk_size是字符数,粗略估计1 token ~ 4字符
#         chunk_overlap=OVERLAP_TOKENS * 4,
#         length_function=len, # 使用字符长度
#         separators=["nn", "n", " ", ""] # 优先按段落、行、空格分割
#     )
#     
#     docs = text_splitter.create_documents([long_text])
#     
#     chunks_with_metadata = []
#     for doc in docs:
#         # 获取每个块在原始文本中的位置信息
#         # LangChain 默认不提供start_char/end_char,需要手动实现或通过其他方式追踪
#         # 这里我们只返回文本,如果需要精确位置,需要更复杂的逻辑,例如在分割时自行记录
#         chunks_with_metadata.append({
#             "text": doc.page_content,
#             "start_char": -1, # Placeholder, needs actual implementation to track
#             "end_char": -1    # Placeholder
#         })
#         
#     return chunks_with_metadata
# 
# # 考虑到要求代码逻辑严谨,上述手写的分块函数在处理start_char/end_char时,
# # 对于复杂的文本匹配可能不完全精确,尤其是在有重叠和特殊字符时。
# # 更严谨的做法是:
# # 1. 预先将原始文本的每个字符映射到其索引。
# # 2. 分块时,记录每个块的起始和结束字符索引。
# # 
# # 这里提供一个更直接但可能牺牲部分语义完整性的分块方法,更易于定位:
# def chunk_text_with_index(long_text: str) -> List[Dict]:
#     encoding = tiktoken.get_encoding(ENCODING_NAME)
#     
#     # 将文本按句子分割,并保留每个句子的原始起始索引
#     sentences_with_indices = []
#     current_pos = 0
#     for sentence in re.split(r'(?<=[.!?])s+', long_text):
#         sentence = sentence.strip()
#         if sentence:
#             start_idx = long_text.find(sentence, current_pos)
#             if start_idx != -1:
#                 sentences_with_indices.append({"text": sentence, "start_char": start_idx, "end_char": start_idx + len(sentence)})
#                 current_pos = start_idx + len(sentence)
#             else: # Fallback if direct find fails (e.g., due to extra spaces in split)
#                 sentences_with_indices.append({"text": sentence, "start_char": -1, "end_char": -1}) # Mark as unknown
#                 current_pos += len(sentence) # Advance cursor roughly
# 
#     chunks = []
#     current_chunk_sentences = []
#     current_chunk_start_char = -1
# 
#     for sent_info in sentences_with_indices:
#         temp_chunk_text = " ".join([s['text'] for s in current_chunk_sentences] + [sent_info['text']])
#         
#         if count_tokens(temp_chunk_text) <= MAX_TOKENS_PER_CHUNK:
#             current_chunk_sentences.append(sent_info)
#             if current_chunk_start_char == -1:
#                 current_chunk_start_char = sent_info['start_char']
#         else:
#             # Save current chunk
#             if current_chunk_sentences:
#                 chunk_text = " ".join([s['text'] for s in current_chunk_sentences])
#                 end_char = current_chunk_sentences[-1]['end_char']
#                 chunks.append({
#                     "text": chunk_text,
#                     "start_char": current_chunk_start_char,
#                     "end_char": end_char
#                 })
#             
#             # Start new chunk with current sentence
#             current_chunk_sentences = [sent_info]
#             current_chunk_start_char = sent_info['start_char']
# 
#     # Add the last chunk
#     if current_chunk_sentences:
#         chunk_text = " ".join([s['text'] for s in current_chunk_sentences])
#         end_char = current_chunk_sentences[-1]['end_char']
#         chunks.append({
#             "text": chunk_text,
#             "start_char": current_chunk_start_char,
#             "end_char": end_char
#         })
# 
#     return chunks

# 考虑到实际的鲁棒性和精确性,以及避免手写正则匹配的坑,
# 我们将直接使用一个简化的分块逻辑,专注于核心的大模型交互。
# 在实际应用中,强烈建议使用 LangChain 或 llama_index 等库提供的 TextSplitter。

def simple_chunk_text(long_text: str) -> List[Dict]:
    """
    一个简化的分块函数,旨在快速展示,不保证最佳语义保留和重叠处理。
    """
    sentences = re.split(r'(?<=[.!?])s+', long_text)
    chunks = []
    current_chunk_text = ""
    current_start_char = 0

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue

        temp_chunk = current_chunk_text + (" " if current_chunk_text else "") + sentence
        if count_tokens(temp_chunk) <= MAX_TOKENS_PER_CHUNK:
            current_chunk_text = temp_chunk
        else:
            if current_chunk_text:
                # 找到当前块在原始文本中的准确位置
                start_idx = long_text.find(current_chunk_text, current_start_char)
                if start_idx != -1:
                    chunks.append({
                        "text": current_chunk_text,
                        "start_char": start_idx,
                        "end_char": start_idx + len(current_chunk_text)
                    })
                    current_start_char = start_idx + len(current_chunk_text)
                else: # Fallback
                    chunks.append({
                        "text": current_chunk_text,
                        "start_char": current_start_char,
                        "end_char": current_start_char + len(current_chunk_text)
                    })
                    current_start_char += len(current_chunk_text)

            current_chunk_text = sentence

    if current_chunk_text:
        start_idx = long_text.find(current_chunk_text, current_start_char)
        if start_idx != -1:
            chunks.append({
                "text": current_chunk_text,
                "start_char": start_idx,
                "end_char": start_idx + len(current_chunk_text)
            })
        else:
            chunks.append({
                "text": current_chunk_text,
                "start_char": current_start_char,
                "end_char": current_start_char + len(current_chunk_text)
            })

    return chunks

# chunks = simple_chunk_text(sample_text)
# for i, chunk in enumerate(chunks):
#     print(f"Chunk {i+1} (Chars {chunk['start_char']}-{chunk['end_char']}):n{chunk['text']}n---")

阶段二:事实性声明的提取 (Fact Claim Extraction)

对于每个文本块,我们利用大模型识别其中所有的事实性声明。这需要精心的Prompt工程。

目标:

  • 从文本中识别出所有潜在的、需要验证的事实性陈述。
  • 排除主观意见、指令、疑问句等非事实性内容。

技术:

  • 零样本/少样本提示: 直接向模型提出要求,或提供少量示例。
  • 结构化输出: 要求模型以JSON格式返回结果,便于程序处理。

Prompt设计策略:

策略名称 描述 优点 缺点
直接指令 “请从以下文本中提取所有事实性陈述。” 简单直接 可能包含主观判断,召回率和准确率一般
定义引导 “事实性陈述是客观的、可验证的、有明确来源的。请提取此类陈述。” 提高准确率 仍可能遗漏部分边缘情况
示例引导 (Few-shot) 提供几个文本-事实对的示例,让模型学习模式。 显著提高准确率和召回率 需要精心准备示例,增加Prompt长度
反例引导 明确指出哪些不是事实性陈述(如观点、假设、疑问)。 进一步过滤非事实性内容 增加Prompt复杂度
JSON格式输出 要求模型以JSON数组形式返回每个事实,包含“statement”和“original_span”等字段。 便于自动化解析和后续处理 对模型生成能力有要求,可能偶尔格式错误

代码示例:
这里我们使用一个模拟的LLM API调用,实际中会替换为OpenAI、Anthropic或其他大模型的SDK。

import json
import os
# from openai import OpenAI # 实际使用时取消注释

# 模拟LLM响应
def mock_llm_call(prompt: str) -> str:
    """模拟大模型的API调用"""
    if "请从以下文本中提取所有事实性陈述" in prompt:
        if "智能风控系统V3.0" in prompt:
            return json.dumps([
                {"statement": "智能风控系统V3.0是我们在金融科技领域的一次重大突破。", "original_span": "智能风控系统V3.0是我们在金融科技领域的一次重大突破。"},
                {"statement": "金融欺诈案件在全球范围内呈现逐年上升趋势。", "original_span": "金融欺诈案件在全球范围内呈现逐年上升趋势"},
                {"statement": "V3.0系统通过引入先进的深度学习算法和实时数据处理能力,实现了对潜在风险的秒级预警。", "original_span": "V3.0系统正是针对这一痛点,通过引入先进的深度学习算法和实时数据处理能力,实现了对潜在风险的秒级预警。"},
                {"statement": "新系统采用了Transformer架构的风险评估模型。", "original_span": "新系统采用了Transformer架构的风险评估模型"},
                {"statement": "新系统结合了多模态数据分析,包括交易行为、社交网络信息、设备指纹等。", "original_span": "结合了多模态数据分析,包括交易行为、社交网络信息、设备指纹等。"},
                {"statement": "在内部测试中,我们将误报率降低了20%。", "original_span": "在内部测试中,我们将误报率降低了20%"},
                {"statement": "我们将风险识别的准确率提升了15%。", "original_span": "同时将风险识别的准确率提升了15%"},
                {"statement": "这些数据远超行业平均水平。", "original_span": "这些数据远超行业平均水平。"},
                {"statement": "系统的可扩展性得到了显著增强。", "original_span": "系统的可扩展性也得到了显著增强"}
            ], ensure_ascii=False)
        else:
            return json.dumps([{"statement": "这是一个模拟的事实。", "original_span": "这是一个模拟的事实。"}], ensure_ascii=False)
    return json.dumps([])

# # 实际的OpenAI API调用示例
# class LLMClient:
#     def __init__(self, api_key: str):
#         self.client = OpenAI(api_key=api_key)
# 
#     def call(self, prompt: str, model: str = "gpt-4-turbo-preview", temperature: float = 0.1) -> str:
#         try:
#             response = self.client.chat.completions.create(
#                 model=model,
#                 messages=[
#                     {"role": "system", "content": "你是一个严谨的事实提取助手,擅长从文本中识别客观、可验证的事实性陈述。"},
#                     {"role": "user", "content": prompt}
#                 ],
#                 temperature=temperature,
#                 response_format={"type": "json_object"} # 要求JSON格式
#             )
#             return response.choices[0].message.content
#         except Exception as e:
#             print(f"LLM API call failed: {e}")
#             return json.dumps({"error": str(e)})

# llm_client = LLMClient(api_key=os.getenv("OPENAI_API_KEY")) # 替换为你的API Key

def extract_fact_claims(text_chunk: str, llm_caller) -> List[Dict]:
    """
    使用大模型从文本块中提取事实性声明。
    """
    prompt = f"""
你是一个严谨的语言模型,你的任务是从以下文本中识别并提取所有客观的、可验证的、具备明确信息来源潜力的事实性陈述。
请忽略主观意见、情感表达、指令、疑问句或未来预测。
对于每个识别出的事实性陈述,请提供其陈述内容(statement)和它在原始文本中的精确片段(original_span)。
请以JSON数组的形式返回结果,每个元素是一个包含 'statement' 和 'original_span' 键的字典。
如果文本中没有可提取的事实性陈述,请返回一个空数组 `[]`。

文本:

{text_chunk}


JSON格式输出:
"""
    try:
        response_json_str = llm_caller(prompt) # 实际调用LLM
        # print(f"LLM Response for fact extraction: {response_json_str[:200]}...") # Debug
        claims = json.loads(response_json_str)
        # 验证 JSON 结构
        if not isinstance(claims, list):
            print(f"Warning: LLM returned non-list for fact claims: {claims}")
            return []
        for claim in claims:
            if not isinstance(claim, dict) or "statement" not in claim or "original_span" not in claim:
                print(f"Warning: LLM returned malformed claim: {claim}")
                return [] # Or attempt to fix, or just return valid ones
        return claims
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON from LLM for fact extraction: {e}")
        print(f"Raw LLM response: {response_json_str}")
        return []
    except Exception as e:
        print(f"An unexpected error occurred during fact extraction: {e}")
        return []

# # 示例使用
# chunks = simple_chunk_text(sample_text)
# all_fact_claims = []
# for i, chunk in enumerate(chunks):
#     print(f"Processing chunk {i+1}...")
#     claims = extract_fact_claims(chunk['text'], mock_llm_call) # 使用模拟LLM
#     # 将原始文本的字符位置信息添加到提取的事实中
#     for claim in claims:
#         # 粗略定位,实际应更精确
#         relative_start = chunk['text'].find(claim['original_span'])
#         if relative_start != -1:
#             claim['start_char'] = chunk['start_char'] + relative_start
#             claim['end_char'] = claim['start_char'] + len(claim['original_span'])
#         else: # Fallback if exact span not found in chunk
#             claim['start_char'] = chunk['start_char']
#             claim['end_char'] = chunk['end_char'] # Assign chunk bounds
#         claim['chunk_id'] = i # 记录其所属的块
#     all_fact_claims.extend(claims)
# 
# print("nExtracted Fact Claims:")
# for claim in all_fact_claims:
#     print(f"- Statement: {claim['statement']} (Span: {claim.get('start_char', 'N/A')}-{claim.get('end_char', 'N/A')})")

阶段三:事实的交叉验证与溯源 (Fact Cross-Verification & Tracing)

这是核心步骤,我们将对提取出的事实声明进行验证。

策略 A:基于RAG的外部知识库验证 (RAG-based External Verification)

目标:

  • 查询外部权威数据源,验证事实的真伪或获取支撑证据。
  • 为每个事实声明提供一个“可信度评分”或“验证状态”。

技术:

  • 向量数据库: 存储领域知识、文档、数据记录的嵌入向量。
  • 嵌入模型: 将事实声明转换为向量,用于检索。
  • 信息检索: 在向量数据库中查找最相关的证据。
  • LLM二次判断: 将事实声明、检索到的证据以及验证任务提交给LLM,由其判断事实的真伪、相关性或缺失。

代码示例:

# 假设我们有一个简单的模拟向量数据库和检索函数
class MockVectorDB:
    def __init__(self, documents: List[Dict]):
        self.documents = documents
        # 实际中会生成并存储嵌入向量

    def retrieve_relevant_documents(self, query: str, top_k: int = 3) -> List[str]:
        """模拟检索最相关的文档"""
        # 实际会进行向量相似度搜索
        results = []
        # 简单模拟:如果查询关键词在文档中,则返回
        for doc in self.documents:
            if any(keyword in doc["content"] for keyword in query.split()):
                results.append(doc["content"])
        return results[:top_k]

# 模拟的外部知识库文档
external_knowledge = [
    {"id": "doc1", "content": "根据2023年全球金融欺诈报告,全球金融欺诈案件增长了12%,造成的损失达数千亿美元。"},
    {"id": "doc2", "content": "Transformer架构在自然语言处理和时间序列预测中表现出色。"},
    {"id": "doc3", "content": "某项研究表明,采用多模态数据分析可以有效提高风险识别准确率。"},
    {"id": "doc4", "content": "行业平均误报率在15%-25%之间,平均准确率在70%-80%之间。"},
    {"id": "doc5", "content": "目前没有任何官方报告指出智能风控系统V3.0是某个领域的重大突破。"}
]
mock_vector_db = MockVectorDB(external_knowledge)

def verify_fact_with_rag(fact_statement: str, llm_caller, vector_db: MockVectorDB) -> Dict:
    """
    使用RAG机制验证事实声明。
    """
    # 1. 检索相关证据
    relevant_docs = vector_db.retrieve_relevant_documents(fact_statement)

    context = ""
    if relevant_docs:
        context = "nn相关证据:n" + "n".join([f"- {doc}" for doc in relevant_docs])
    else:
        context = "nn未能找到直接相关证据。"

    # 2. 提交给LLM进行判断
    prompt = f"""
请根据提供的“事实声明”和“相关证据”,判断该事实声明的可信度。
你的任务是:
1.  判断“事实声明”是否与“相关证据”一致、冲突或无法验证。
2.  如果无法验证,请说明是由于证据不足还是证据不相关。
3.  给出最终的验证状态(Verified/Contradicted/Unverifiable/Partially Verified)。
4.  提供一个简短的验证理由。
5.  以JSON格式返回结果,包含 'status', 'reason', 'confidence_score' (0-1)。

事实声明:

{fact_statement}

{context}

JSON格式输出:
"""
    try:
        response_json_str = llm_caller(prompt)
        # print(f"LLM Response for RAG verification: {response_json_str[:200]}...") # Debug
        verification_result = json.loads(response_json_str)
        if not isinstance(verification_result, dict) or "status" not in verification_result:
            print(f"Warning: LLM returned malformed verification result: {verification_result}")
            return {"status": "Error", "reason": "Malformed LLM response", "confidence_score": 0.0}
        return verification_result
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON from LLM for RAG verification: {e}")
        print(f"Raw LLM response: {response_json_str}")
        return {"status": "Error", "reason": f"JSON Decode Error: {e}", "confidence_score": 0.0}
    except Exception as e:
        print(f"An unexpected error occurred during RAG verification: {e}")
        return {"status": "Error", "reason": f"Unexpected error: {e}", "confidence_score": 0.0}

# # 示例使用
# verified_claims = []
# for claim in all_fact_claims:
#     print(f"nVerifying: {claim['statement']}...")
#     verification_result = verify_fact_with_rag(claim['statement'], mock_llm_call, mock_vector_db)
#     claim.update(verification_result) # 将验证结果合并到事实声明中
#     verified_claims.append(claim)
# 
# print("nVerified Claims with Status:")
# for claim in verified_claims:
#     print(f"- Statement: {claim['statement']}n  Status: {claim['status']}, Reason: {claim['reason']}, Confidence: {claim['confidence_score']}n")
策略 B:内部一致性检查 (Internal Consistency Check)

目标:

  • 检查文档内部是否存在相互矛盾的事实声明。
  • 识别潜在的逻辑错误或信息不一致。

技术:

  • 将文档中的多个事实声明两两组合,提交给LLM判断是否存在冲突。
def check_internal_consistency(fact_claims: List[Dict], llm_caller) -> List[Dict]:
    """
    检查一组事实声明之间的内部一致性。
    """
    inconsistencies = []
    # 简化处理:只检查少量潜在冲突对,大规模检查复杂度高
    if len(fact_claims) < 2:
        return inconsistencies

    # 选取一些可能相关的对进行检查
    # 实际中会使用embedding相似度来选择最可能冲突的对
    pairs_to_check = []
    for i in range(len(fact_claims)):
        for j in range(i + 1, min(i + 3, len(fact_claims))): # 只检查相邻的几个,避免O(N^2)
            pairs_to_check.append((fact_claims[i], fact_claims[j]))

    for claim1, claim2 in pairs_to_check:
        prompt = f"""
请判断以下两个事实声明是否存在逻辑上的冲突或不一致。
请以JSON格式返回结果,包含 'conflict' (boolean), 'reason' (string)。

事实声明1:

{claim1[‘statement’]}


事实声明2:

{claim2[‘statement’]}


JSON格式输出:
"""
        try:
            response_json_str = llm_caller(prompt)
            # print(f"LLM Response for internal consistency: {response_json_str[:200]}...") # Debug
            result = json.loads(response_json_str)
            if result.get("conflict"):
                inconsistencies.append({
                    "claim1": claim1['statement'],
                    "claim2": claim2['statement'],
                    "reason": result.get("reason", "未提供具体原因")
                })
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON for internal consistency: {e}")
            print(f"Raw LLM response: {response_json_str}")
        except Exception as e:
            print(f"An unexpected error occurred during internal consistency check: {e}")

    return inconsistencies

# # 示例使用
# # 假设我们有以下两个可能冲突的声明
# # verified_claims.append({"statement": "金融欺诈案件在全球范围内呈现逐年下降趋势。", "original_span": "...", "status": "Unverifiable", "reason": "...", "confidence_score": 0.5})
# inconsistencies = check_internal_consistency(verified_claims, mock_llm_call)
# print("nInternal Inconsistencies Found:")
# for inconsistency in inconsistencies:
#     print(f"- Conflict between:n  1. {inconsistency['claim1']}n  2. {inconsistency['claim2']}n  Reason: {inconsistency['reason']}n")
策略 C:领域特定规则校验 (Domain-Specific Rule Validation)

目标:

  • 针对特定领域(如日期格式、数值范围、专业术语等)进行硬编码的规则检查。
  • 弥补LLM在精确性、时效性或特定格式校验上的不足。
import re

def validate_domain_rules(fact_claim: Dict) -> Dict:
    """
    对特定领域的事实声明进行规则校验。
    例如:检查百分比是否在0-100范围内,日期格式是否正确等。
    """
    statement = fact_claim['statement']
    validation_status = "Valid"
    validation_reason = "通过领域规则校验"

    # 规则1: 检查百分比数据是否合理 (0-100%)
    percentage_matches = re.findall(r'(d+(.d+)?)%', statement)
    for match in percentage_matches:
        value = float(match[0])
        if not (0 <= value <= 100):
            validation_status = "Invalid"
            validation_reason = f"百分比数据 '{value}%' 超出0-100%合理范围。"
            break

    # 规则2: 检查年份是否在合理范围 (例如,不早于1900年,不晚于当前年份+1)
    year_matches = re.findall(r'(d{4})年', statement)
    current_year = 2024 # datetime.datetime.now().year
    for year_str in year_matches:
        year = int(year_str)
        if not (1900 <= year <= current_year + 1):
            validation_status = "Invalid"
            validation_reason = f"年份数据 '{year}年' 超出合理范围。"
            break

    # 规则3: 检查“秒级预警”是否有具体数值支撑,或者是否过于模糊
    if "秒级预警" in statement and not re.search(r'd+(.d+)?秒内', statement):
        validation_status = "Needs Elaboration"
        validation_reason = "提及‘秒级预警’,但缺乏具体时间量化,建议补充具体秒数。"

    return {
        "domain_validation_status": validation_status,
        "domain_validation_reason": validation_reason
    }

# # 示例使用
# for claim in verified_claims:
#     domain_result = validate_domain_rules(claim)
#     claim.update(domain_result)
# print("nClaims with Domain Validation:")
# for claim in verified_claims:
#     print(f"- Statement: {claim['statement']}n  Domain Status: {claim['domain_validation_status']}, Reason: {claim['domain_validation_reason']}n")

阶段四:语义真空区识别与量化 (Semantic Vacuum Zone Identification & Quantification)

结合前几个阶段的验证结果,我们现在可以识别并量化语义真空区。

目标:

  • 标记出缺乏充分验证、被驳斥或需要更多证据支持的事实声明。
  • 识别出文本中那些“应该有事实但没有”的区域。
  • 为每个区域提供一个“真空度评分”。

技术:

  • 基于验证状态的评分: 为每个事实声明分配一个分数。
    • Verified: 0 分 (非真空)
    • Partially Verified: 0.3 分
    • Unverifiable: 0.7 分 (高度真空)
    • Contradicted: 1.0 分 (严重真空)
    • Needs Elaboration (来自领域规则): 0.5 分
  • 上下文分析: LLM可以进一步分析在某个段落中,哪些观点被提出但缺乏支撑,从而识别隐性的语义真空。

语义真空度评分标准示例:

验证状态 描述 真空度评分 建议操作
Verified 事实已确认,有可靠证据。 0.0 无需操作
Partially Verified 部分证据支持,但不够充分或有歧义。 0.3 补充更多细节或来源
Unverifiable 无法找到支持或反驳证据。 0.7 必须补充证据,或修改措辞
Contradicted 与已知事实或内部信息冲突。 1.0 立即修正或删除
Needs Elaboration 声明不够具体,缺乏量化或细节。 0.5 补充具体数据、时间、地点等
Implied Vacuum 文本提出观点但未提供任何事实支撑。 0.8 补充相关事实或论据
def calculate_vacuum_score(fact_claim: Dict) -> float:
    """
    根据事实声明的验证状态计算其语义真空度评分。
    """
    status_to_score = {
        "Verified": 0.0,
        "Partially Verified": 0.3,
        "Unverifiable": 0.7,
        "Contradicted": 1.0,
        "Needs Elaboration": 0.5, # 来自领域规则
        "Error": 0.9 # 验证过程中出错,也视为高风险
    }
    score = status_to_score.get(fact_claim.get('status', 'Error'), 0.9)

    # 如果领域规则有额外问题,提高分数
    if fact_claim.get('domain_validation_status') == "Invalid":
        score = max(score, 0.8)
    elif fact_claim.get('domain_validation_status') == "Needs Elaboration":
        score = max(score, 0.5)

    return score

def identify_semantic_vacuum_zones(processed_claims: List[Dict], original_text: str, chunks: List[Dict], llm_caller) -> List[Dict]:
    """
    识别语义真空区,包括基于验证结果的,以及LLM识别出的隐性真空。
    """
    vacuum_zones = []

    # 1. 基于已处理事实声明的真空区
    for claim in processed_claims:
        vacuum_score = calculate_vacuum_score(claim)
        if vacuum_score > 0: # 任何非0分的都视为真空
            vacuum_zones.append({
                "type": "Explicit Fact Vacuum",
                "statement": claim['statement'],
                "original_span": claim['original_span'],
                "start_char": claim.get('start_char', -1),
                "end_char": claim.get('end_char', -1),
                "vacuum_score": vacuum_score,
                "reason": f"验证状态: {claim.get('status', '未知')}, 理由: {claim.get('reason', 'N/A')}"
                        + (f", 领域校验: {claim.get('domain_validation_reason', '')}" if claim.get('domain_validation_status') != "Valid" else "")
            })

    # 2. LLM识别隐性语义真空(段落层面)
    # 遍历原始的文本块,让LLM判断是否有观点缺乏支撑
    for i, chunk_info in enumerate(chunks):
        chunk_text = chunk_info['text']

        # 排除已经有明确事实声明的区域(避免重复标记)
        # 实际需要更精细的排除逻辑,这里简化

        prompt = f"""
请分析以下文本段落,识别其中是否存在提出观点、做出断言,但却完全缺乏具体事实、数据、来源或证据支撑的“语义真空区”。
请忽略已经包含明确事实性陈述(即使这些事实性陈述本身可能未被验证)的语句。
你的重点是那些“应该有事实但没有”的空泛表述。
以JSON格式返回结果,包含一个布尔值 'has_implicit_vacuum' 和一个列表 'vacuum_statements',
每个元素是缺乏支撑的语句和其在段落中的大致位置(span)。
如果不存在,请返回 `{{ "has_implicit_vacuum": false, "vacuum_statements": [] }}`。

文本段落:

{chunk_text}


JSON格式输出:
"""
        try:
            response_json_str = llm_caller(prompt)
            # print(f"LLM Response for implicit vacuum: {response_json_str[:200]}...") # Debug
            implicit_vacuum_result = json.loads(response_json_str)
            if implicit_vacuum_result.get("has_implicit_vacuum") and implicit_vacuum_result.get("vacuum_statements"):
                for vac_stmt in implicit_vacuum_result["vacuum_statements"]:
                    # 粗略定位,实际应更精确
                    relative_start = chunk_text.find(vac_stmt.get('statement', ''))
                    if relative_start != -1:
                        stmt_start_char = chunk_info['start_char'] + relative_start
                        stmt_end_char = stmt_start_char + len(vac_stmt.get('statement', ''))
                    else:
                        stmt_start_char = chunk_info['start_char']
                        stmt_end_char = chunk_info['end_char']

                    vacuum_zones.append({
                        "type": "Implicit Semantic Vacuum",
                        "statement": vac_stmt.get('statement', 'N/A'),
                        "original_span": vac_stmt.get('statement', 'N/A'), # LLM可能只给statement
                        "start_char": stmt_start_char,
                        "end_char": stmt_end_char,
                        "vacuum_score": 0.8, # 默认给0.8
                        "reason": f"段落中提出观点但缺乏事实支撑。"
                    })
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON for implicit vacuum: {e}")
            print(f"Raw LLM response: {response_json_str}")
        except Exception as e:
            print(f"An unexpected error occurred during implicit vacuum detection: {e}")

    return vacuum_zones

# # 示例使用
# # 假设 processed_claims 已经包含了所有验证结果
# semantic_vacuum_zones = identify_semantic_vacuum_zones(verified_claims, sample_text, chunks, mock_llm_call)
# print("nIdentified Semantic Vacuum Zones:")
# for i, zone in enumerate(semantic_vacuum_zones):
#     print(f"Zone {i+1} (Score: {zone['vacuum_score']:.2f}):")
#     print(f"  Type: {zone['type']}")
#     print(f"  Statement: {zone['statement']}")
#     print(f"  Reason: {zone['reason']}")
#     print(f"  Original Span: Chars {zone['start_char']}-{zone['end_char']}n")

阶段五:报告生成与优化建议 (Report Generation & Optimization Suggestions)

最终,我们需要将发现的语义真空区以清晰、可操作的方式呈现给用户,并提供具体的改进建议。

目标:

  • 生成一份详细的报告,列出所有语义真空区。
  • 为每个真空区提供具体的修改建议。
  • 总结内容整体的“可信度得分”。

技术:

  • LLM总结与建议生成: 利用LLM的生成能力,根据分析结果撰写报告和建议。
  • Markdown/HTML格式: 生成易于阅读和集成的报告。
def generate_report_and_suggestions(original_text: str, semantic_vacuum_zones: List[Dict], llm_caller) -> str:
    """
    生成详细的语义真空区分析报告和优化建议。
    """
    report_parts = []

    report_parts.append("# 内容语义真空区分析报告nn")
    report_parts.append(f"**分析日期:** {os.popen('date').read().strip()}n")
    report_parts.append(f"**原文总字数:** {len(original_text)} 字nn")

    if not semantic_vacuum_zones:
        report_parts.append("## 结论:内容质量优秀,未发现明显的语义真空区。nn")
        report_parts.append("该内容的可信度得分极高,无需特别优化。")
        return "".join(report_parts)

    report_parts.append("## 发现的语义真空区列表:n")
    report_parts.append(f"共发现 **{len(semantic_vacuum_zones)}** 处语义真空区域。nn")

    overall_vacuum_score = sum(z['vacuum_score'] for z in semantic_vacuum_zones) / len(semantic_vacuum_zones) if semantic_vacuum_zones else 0
    report_parts.append(f"**内容整体语义真空度评分 (0-1,越高表示真空越严重):** {overall_vacuum_score:.2f}nn")

    for i, zone in enumerate(semantic_vacuum_zones):
        report_parts.append(f"### {i+1}. 语义真空区 (类型: {zone['type']}, 评分: {zone['vacuum_score']:.2f})n")
        report_parts.append(f"**原文片段:** `{zone['original_span']}`n")
        report_parts.append(f"**原文位置:** 字符 {zone['start_char']}-{zone['end_char']}n")
        report_parts.append(f"**问题描述:** {zone['reason']}n")

        # 使用LLM生成优化建议
        suggestion_prompt = f"""
请针对以下缺乏“可采信事实”的原文片段,提供具体的优化建议,帮助补充事实或修正表述。
请考虑如何使内容更具权威性、可信度和可验证性。
建议应简洁明了,可操作。

原文片段:

{zone[‘original_span’]}

问题描述:{zone['reason']}

优化建议:
"""
        try:
            suggestion = llm_caller(suggestion_prompt)
            report_parts.append(f"**优化建议:** {suggestion.strip()}nn")
        except Exception as e:
            report_parts.append(f"**优化建议:** 无法生成优化建议,错误: {e}nn")

    report_parts.append("## 总结与提升建议n")
    summary_prompt = f"""
根据上述对原文的语义真空区分析,请提供一份总结性建议,指出内容在“可采信事实”方面的主要优点和不足,并给出整体性的改进方向。
强调如何从根本上避免语义真空,提升内容质量。

原始文本:

{original_text[:1000]}… # 只给部分文本以节省token

已识别的语义真空区:

{json.dumps(semantic_vacuum_zones, ensure_ascii=False, indent=2)[:2000]}… # 只给部分信息


总结性建议:
"""
    try:
        summary_suggestions = llm_caller(summary_prompt)
        report_parts.append(f"{summary_suggestions.strip()}n")
    except Exception as e:
        report_parts.append(f"无法生成总结性建议,错误: {e}n")

    return "".join(report_parts)

# # 示例使用
# # 模拟LLM生成建议
# def mock_llm_suggestion(prompt: str) -> str:
#     if "智能风控系统V3.0是我们在金融科技领域的一次重大突破" in prompt:
#         return "请具体说明V3.0系统在哪些方面、相比以往版本或竞品,实现了哪些突破?提供具体的性能指标、技术创新点或行业排名数据。"
#     elif "金融欺诈案件在全球范围内呈现逐年上升趋势" in prompt:
#         return "请引用具体的市场调研报告名称、发布机构和年份,并提供具体的增长百分比或案件数量数据。"
#     elif "在内部测试中,我们将误报率降低了20%" in prompt:
#         return "请说明“内部测试”的具体环境、数据集、测试方法,并提供基准误报率和优化后的误报率的对比数据。"
#     elif "这些数据远超行业平均水平" in prompt:
#         return "请引用具体的行业报告或标准,指出行业平均水平的具体数值,以支撑“远超”的说法。"
#     elif "段落中提出观点但缺乏事实支撑" in prompt:
#         return "请在该观点后补充具体案例、数据、专家引用或实验结果来支撑。"
#     else:
#         return "请在此处补充具体的支撑性事实、数据、来源或证据。"
# 
# # 模拟LLM生成总结
# def mock_llm_summary(prompt: str) -> str:
#     return """
# 整体而言,该技术文档在宏观描述上较为流畅,但多处关键技术宣称和性能指标缺乏具体数据、来源或对比。
# 建议:
# 1. 对所有涉及性能提升、技术突破、市场趋势的陈述,务必提供具体的量化数据和权威来源。
# 2. 对于内部测试结果,详细说明测试方法、环境和数据集,并提供基准数据。
# 3. 避免使用“重大突破”、“极佳”、“大幅”等空泛词汇,用具体事实取而代之。
# 4. 考虑在文档中加入参考文献列表或数据来源链接,增强可信度。
# """
# 
# # 将 mock_llm_call 替换为更细致的模拟,以展示报告生成
# def creative_mock_llm_call_for_report(prompt: str) -> str:
#     if "优化建议" in prompt:
#         return mock_llm_suggestion(prompt)
#     elif "总结性建议" in prompt:
#         return mock_llm_summary(prompt)
#     else:
#         return mock_llm_call(prompt) # 沿用之前的通用模拟
# 
# final_report = generate_report_and_suggestions(sample_text, semantic_vacuum_zones, creative_mock_llm_call_for_report)
# print("n--- Final Analysis Report ---n")
# print(final_report)

四、代码实战:一个端到端的Python实现

现在,让我们将上述所有阶段整合到一个完整的流程中,用一个具体的例子来运行。

import json
import os
import re
import datetime
from typing import List, Dict

# --- 辅助函数定义(重复,但为了端到端展示,这里再次列出) ---
ENCODING_NAME = "cl100k_base" 
MAX_TOKENS_PER_CHUNK = 1500 

def count_tokens(text: str) -> int:
    encoding = tiktoken.get_encoding(ENCODING_NAME)
    return len(encoding.encode(text))

def simple_chunk_text(long_text: str) -> List[Dict]:
    sentences = re.split(r'(?<=[.!?])s+', long_text)
    chunks = []
    current_chunk_text = ""
    current_start_char = 0

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue

        temp_chunk = current_chunk_text + (" " if current_chunk_text else "") + sentence
        if count_tokens(temp_chunk) <= MAX_TOKENS_PER_CHUNK:
            current_chunk_text = temp_chunk
        else:
            if current_chunk_text:
                start_idx = long_text.find(current_chunk_text, current_start_char)
                if start_idx != -1:
                    chunks.append({
                        "text": current_chunk_text,
                        "start_char": start_idx,
                        "end_char": start_idx + len(current_chunk_text)
                    })
                    current_start_char = start_idx + len(current_chunk_text)
                else: 
                    chunks.append({
                        "text": current_chunk_text,
                        "start_char": current_start_char,
                        "end_char": current_start_char + len(current_chunk_text)
                    })
                    current_start_char += len(current_chunk_text)

            current_chunk_text = sentence

    if current_chunk_text:
        start_idx = long_text.find(current_chunk_text, current_start_char)
        if start_idx != -1:
            chunks.append({
                "text": current_chunk_text,
                "start_char": start_idx,
                "end_char": start_idx + len(current_chunk_text)
            })
        else:
            chunks.append({
                "text": current_chunk_text,
                "start_char": current_start_char,
                "end_char": current_start_char + len(current_chunk_text)
            })
    return chunks

# 模拟LLM响应函数
def mock_llm_call(prompt: str) -> str:
    # 模拟事实提取
    if "提取所有事实性陈述" in prompt:
        if "智能风控系统V3.0" in prompt:
            return json.dumps([
                {"statement": "智能风控系统V3.0是我们在金融科技领域的一次重大突破。", "original_span": "智能风控系统V3.0是我们在金融科技领域的一次重大突破。"},
                {"statement": "金融欺诈案件在全球范围内呈现逐年上升趋势。", "original_span": "金融欺诈案件在全球范围内呈现逐年上升趋势"},
                {"statement": "V3.0系统通过引入先进的深度学习算法和实时数据处理能力,实现了对潜在风险的秒级预警。", "original_span": "V3.0系统正是针对这一痛点,通过引入先进的深度学习算法和实时数据处理能力,实现了对潜在风险的秒级预警。"},
                {"statement": "新系统采用了Transformer架构的风险评估模型。", "original_span": "新系统采用了Transformer架构的风险评估模型"},
                {"statement": "新系统结合了多模态数据分析,包括交易行为、社交网络信息、设备指纹等。", "original_span": "结合了多模态数据分析,包括交易行为、社交网络信息、设备指纹等。"},
                {"statement": "在内部测试中,我们将误报率降低了20%。", "original_span": "在内部测试中,我们将误报率降低了20%"},
                {"statement": "我们将风险识别的准确率提升了15%。", "original_span": "同时将风险识别的准确率提升了15%"},
                {"statement": "这些数据远超行业平均水平。", "original_span": "这些数据远超行业平均水平。"},
                {"statement": "系统的可扩展性得到了显著增强。", "original_span": "系统的可扩展性也得到了显著增强"}
            ], ensure_ascii=False)
        else:
            return json.dumps([{"statement": "这是一个模拟的事实。", "original_span": "这是一个模拟的事实。"}], ensure_ascii=False)

    # 模拟RAG验证
    elif "判断该事实声明的可信度" in prompt:
        if "金融欺诈案件在全球范围内呈现逐年上升趋势" in prompt and "根据2023年全球金融欺诈报告" in prompt:
            return json.dumps({"status": "Verified", "reason": "与外部报告一致", "confidence_score": 0.9}, ensure_ascii=False)
        elif "智能风控系统V3.0是我们在金融科技领域的一次重大突破" in prompt and "没有任何官方报告指出" in prompt:
            return json.dumps({"status": "Unverifiable", "reason": "缺乏外部证据支撑", "confidence_score": 0.3}, ensure_ascii=False)
        elif "误报率降低了20%" in prompt or "准确率提升了15%" in prompt:
            return json.dumps({"status": "Partially Verified", "reason": "内部测试结果,缺乏第三方验证", "confidence_score": 0.6}, ensure_ascii=False)
        elif "这些数据远超行业平均水平" in prompt and "行业平均误报率在15%-25%之间" in prompt:
             return json.dumps({"status": "Needs Elaboration", "reason": "需要具体指出行业平均水平,才能判断是否远超", "confidence_score": 0.5}, ensure_ascii=False)
        else:
            return json.dumps({"status": "Unverifiable", "reason": "未能找到相关证据", "confidence_score": 0.4}, ensure_ascii=False)

    # 模拟内部一致性检查
    elif "是否存在逻辑上的冲突或不一致" in prompt:
        if "下降趋势" in prompt and "上升趋势" in prompt: # 假设有这种冲突
            return json.dumps({"conflict": True, "reason": "两个声明描述了相反的趋势。"}, ensure_ascii=False)
        return json.dumps({"conflict": False, "reason": "未发现明显冲突。"}, ensure_ascii=False)

    # 模拟隐性语义真空识别
    elif "识别其中是否存在提出观点、做出断言,但却完全缺乏具体事实" in prompt:
        if "系统将为金融机构提供前所未有的安全保障" in prompt or "有望成为行业的新标杆" in prompt:
            return json.dumps({
                "has_implicit_vacuum": True,
                "vacuum_statements": [
                    {"statement": "V3.0系统将为金融机构提供前所未有的安全保障", "original_span": "V3.0系统将为金融机构提供前所未有的安全保障"},
                    {"statement": "有望成为行业的新标杆", "original_span": "有望成为行业的新标杆"}
                ]
            }, ensure_ascii=False)
        return json.dumps({"has_implicit_vacuum": False, "vacuum_statements": []}, ensure_ascii=False)

    # 模拟优化建议
    elif "优化建议" in prompt:
        if "智能风控系统V3.0是我们在金融科技领域的一次重大突破" in prompt:
            return "请具体说明V3.0系统在哪些方面、相比以往版本或竞品,实现了哪些突破?提供具体的性能指标、技术创新点或行业排名数据。"
        elif "金融欺诈案件在全球范围内呈现逐年上升趋势" in prompt:
            return "请引用具体的市场调研报告名称、发布机构和年份,并提供具体的增长百分比或案件数量数据。"
        elif "在内部测试中,我们将误报率降低了20%" in prompt:
            return "请说明“内部测试”的具体环境、数据集、测试方法,并提供基准误报率和优化后的误报率的对比数据。"
        elif "这些数据远超行业平均水平" in prompt:
            return "请引用具体的行业报告或标准,指出行业平均水平的具体数值,以支撑“远超”的说法。"
        elif "有望成为行业的新标杆" in prompt:
            return "请提供成为行业标杆的具体依据,例如市场占有率预期、技术领先性分析或权威机构的评价。"
        return "请在此处补充具体的支撑性事实、数据、来源或证据。"

    # 模拟总结性建议
    elif "总结性建议" in prompt:
        return """
整体而言,该技术文档在宏观描述上较为流畅,但多处关键技术宣称和性能指标缺乏具体数据、来源或对比。
建议:
1. 对所有涉及性能提升、技术突破、市场趋势的陈述,务必提供具体的量化数据和权威来源。
2. 对于内部测试结果,详细说明测试方法、环境和数据集,并提供基准数据。
3. 避免使用“重大突破”、“极佳”、“大幅”等空泛词汇,用具体事实取而代之。
4. 考虑在文档中加入参考文献列表或数据来源链接,增强可信度。
"""

    return json.dumps({"error": "Unknown mock prompt"}, ensure_ascii=False)

# --- 核心处理函数集成 ---

def analyze_content_for_vacuum_zones(original_text: str) -> str:
    """
    端到端分析内容并生成报告。
    """
    print("--- 阶段一:文本预处理与分块 ---")
    chunks = simple_chunk_text(original_text)
    print(f"文本已分割成 {len(chunks)} 个块。n")

    all_fact_claims = []
    print("--- 阶段二:事实性声明的提取 ---")
    for i, chunk in enumerate(chunks):
        print(f"  处理块 {i+1}/{len(chunks)}...")
        claims = extract_fact_claims(chunk['text'], mock_llm_call)
        for claim in claims:
            relative_start = chunk['text'].find(claim['original_span'])
            if relative_start != -1:
                claim['start_char'] = chunk['start_char'] + relative_start
                claim['end_char'] = claim['start_char'] + len(claim['original_span'])
            else:
                claim['start_char'] = chunk['start_char']
                claim['end_char'] = chunk['end_char']
            claim['chunk_id'] = i
        all_fact_claims.extend(claims)
    print(f"共提取 {len(all_fact_claims)} 条事实性声明。n")

    print("--- 阶段三:事实的交叉验证与溯源 ---")
    verified_claims = []
    for i, claim in enumerate(all_fact_claims):
        print(f"  验证事实 {i+1}/{len(all_fact_claims)}: {claim['statement'][:50]}...")
        verification_result = verify_fact_with_rag(claim['statement'], mock_llm_call, mock_vector_db)
        claim.update(verification_result)
        verified_claims.append(claim)

    # 内部一致性检查
    print("n  进行内部一致性检查...")
    inconsistencies = check_internal_consistency(verified_claims, mock_llm_call)
    for inc in inconsistencies:
        print(f"    发现冲突: {inc['claim1']} vs {inc['claim2']} - 理由: {inc['reason']}")
        # 实际中可以将冲突标记到相关的verified_claims中,提高其真空度评分

    # 领域特定规则校验
    print("n  进行领域特定规则校验...")
    for claim in verified_claims:
        domain_result = validate_domain_rules(claim)
        claim.update(domain_result)
    print("事实验证与溯源完成。n")

    print("--- 阶段四:语义真空区识别与量化 ---")
    semantic_vacuum_zones = identify_semantic_vacuum_zones(verified_claims, original_text, chunks, mock_llm_call)
    print(f"共识别 {len(semantic_vacuum_zones)} 处语义真空区域。n")

    print("--- 阶段五:报告生成与优化建议 ---")
    final_report = generate_report_and_suggestions(original_text, semantic_vacuum_zones, mock_llm_call)
    print("报告生成完成。n")

    return final_report

# --- 运行主流程 ---
sample_text = """
尊敬的各位专家、同事:

欢迎大家参加本次“智能风控系统V3.0技术研讨会”。本次系统升级是我们在金融科技领域的一次重大突破。

根据最新的市场调研报告,金融欺诈案件在全球范围内呈现逐年上升趋势,给金融机构带来了巨大的经济损失和声誉风险。我们的V3.0系统正是针对这一痛点,通过引入先进的深度学习算法和实时数据处理能力,实现了对潜在风险的秒级预警。

具体而言,新系统采用了Transformer架构的风险评估模型,结合了多模态数据分析,包括交易行为、社交网络信息、设备指纹等。在内部测试中,我们将误报率降低了20%,同时将风险识别的准确率提升了15%,这些数据远超行业平均水平。此外,系统的可扩展性也得到了显著增强,能够轻松应对未来业务量的增长。

我们相信,V3.0系统将为金融机构提供前所未有的安全保障,并有望成为行业的新标杆。请大家踊跃提问,共同探讨。

谢谢大家!
"""

if __name__ == "__main__":
    import tiktoken # ensure tiktoken is imported for the main execution
    final_analysis_report = analyze_content_for_vacuum_zones(sample_text)
    print("n" + "="*80 + "n")
    print(final_analysis_report)
    print("n" + "="*80 + "n")

五、高级议题与挑战

尽管大模型提供了强大的能力,但在实际应用中,我们仍需面对诸多挑战:

  1. 事实的粒度与主观性: 区分客观事实和带有倾向性的陈述、主观意见是一项复杂任务。例如,“这项技术是革命性的”可能是一个观点,但“这项技术获得了专利”则是一个事实。大模型在处理这种模糊边界时,仍需精细的Prompt工程和领域知识。
  2. 时效性与知识库更新: 外部知识库的实时性至关重要。金融数据、市场报告、技术规范等信息更新迅速,RAG系统需要高效的更新机制来确保检索到的证据是最新的。
  3. 多语言与跨文化: 事实的表达方式和验证标准可能因语言和文化背景而异。构建跨语言的准确验证系统是一个巨大的工程。
  4. 成本与性能优化: 大模型的API调用通常按Token计费,大规模内容分析会

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注