各位学员,大家好!欢迎来到今天的技术讲座。今天我们将深入探讨一个在构建现代AI应用,特别是大型语言模型(LLM)驱动的系统时至关重要的话题:如何高效、可靠地处理并摄取百万量级的非结构化数据,例如来自Notion或Slack的文档,并将其转化为LLM可用的知识表示。
在当今的信息爆炸时代,企业内部知识库和沟通记录承载了巨大的价值。Notion作为项目管理和知识沉淀的利器,Slack作为团队协作和即时沟通的枢纽,它们内部的数据量往往是惊人的。对于LLM而言,这些数据是其理解企业运营、回答复杂问题、提供智能辅助的基石。然而,将这些海量的、格式各异的数据有效地“喂给”LLM,并非易事。这其中涉及到的挑战包括但不限于:
- 数据规模庞大: 百万量级的文档意味着巨大的存储和处理开销。
- 数据结构多样: Notion有页面、数据库、块;Slack有频道、消息、线程,各自结构复杂。
- API限制与配额: 外部服务API往往有严格的请求速率和分页机制。
- 增量更新与实时性: 数据是动态变化的,如何高效地同步最新内容而非每次全量拉取?
- 数据质量与一致性: 如何确保摄取的数据是干净、有效且格式统一的?
- 内存与性能: 如何在处理海量数据时避免内存溢出,保证处理效率?
今天,我们将聚焦于LangChain框架中的原生DocumentLoader机制,探讨如何利用它来构建一个健壮、可扩展的数据摄取管道,以应对上述挑战。我们将通过具体的代码示例,深入解析Notion和Slack的API交互策略,以及如何结合LangChain的抽象,实现百万量级文档的高效加载、处理与准备。
1. LangChain DocumentLoader 的核心概念与作用
在LangChain生态系统中,DocumentLoader 是数据摄取的核心抽象。它的主要职责是从各种数据源(如文件系统、数据库、Web API等)加载数据,并将其转化为LangChain Document 对象列表。每个 Document 对象通常包含两个关键属性:
page_content: 存储文档的实际文本内容。metadata: 一个字典,包含与文档相关的元数据,如源文件路径、创建时间、作者、URL等。这些元数据对于后续的检索增强生成(RAG)等应用至关重要。
1.1 load() 与 lazy_load() 方法
DocumentLoader 接口定义了两个主要的加载方法:
load(): 这个方法会一次性加载所有文档并返回一个list[Document]。对于小规模数据集来说很方便,但如果数据量巨大,它可能会导致内存溢出(Out Of Memory, OOM)。lazy_load(): 这个方法是一个生成器,它按需生成Document对象。这意味着它不会一次性将所有文档加载到内存中,而是逐个生成并返回。这对于处理百万量级的数据至关重要,因为它大大降低了内存压力,允许我们以流式方式处理数据。
from langchain.docstore.document import Document
from typing import Iterator, List
class CustomLargeScaleLoader:
def lazy_load(self) -> Iterator[Document]:
# 模拟从一个巨大数据源逐个加载文档
for i in range(1_000_000): # 模拟一百万个文档
doc_content = f"This is the content of document {i}."
doc_metadata = {"source": "custom_data_source", "doc_id": i}
yield Document(page_content=doc_content, metadata=doc_metadata)
def load(self) -> List[Document]:
# 对于百万量级数据,这个方法通常不推荐直接使用,
# 除非有足够的内存或数据源本身支持一次性高效加载。
# 这里仅为演示接口,实际会耗尽内存。
return list(self.lazy_load())
# 示例使用 lazy_load
# loader = CustomLargeScaleLoader()
# for i, doc in enumerate(loader.lazy_load()):
# if i < 5:
# print(f"Loaded document {doc.metadata['doc_id']}: {doc.page_content[:50]}...")
# if i == 5:
# print("...")
# if i > 999995:
# print(f"Loaded document {doc.metadata['doc_id']}: {doc.page_content[:50]}...")
# print("Finished loading all documents lazily.")
在接下来的讨论中,我们将重点利用 lazy_load() 的优势,结合Notion和Slack的API特性,构建真正可扩展的数据摄取方案。
2. Notion文档摄取:企业知识库的规模化挑战与解决方案
Notion作为现代企业的知识管理工具,其内部沉淀了大量的文档、笔记、数据库记录。将其内容有效地摄取到LLM的知识库中,能够极大地提升LLM在企业特定领域的问答能力。
2.1 Notion API 概览与认证
要从Notion获取数据,我们需要使用Notion API。
- 认证: 主要通过内部集成令牌(Internal Integration Token)。
- 访问 Notion Integrations。
- 点击 "New integration",选择工作区,并提交。
- 复制生成的 "Internal Integration Token"。
- 权限: 集成创建后,需要将Notion页面或数据库分享给该集成。在Notion页面或数据库的右上角点击 "Share",然后添加你的集成。
- 核心概念:
- Page (页面): Notion中的基本单位,可以包含各种内容。
- Database (数据库): 结构化的页面集合,每个页面是数据库中的一个条目(Page Item)。
- Block (块): 页面或数据库条目中的最小内容单元,如段落、标题、列表、代码块等。
- API 限制与分页: Notion API有速率限制(通常是每秒若干请求)和分页机制(
start_cursor和page_size)。处理百万量级数据时,必须严格遵守这些规则。
2.2 LangChain Notion Loader 的基本使用
LangChain提供了 NotionAPILoader,它可以连接到Notion API并加载指定数据库或页面的内容。
import os
from langchain_community.document_loaders import NotionAPILoader
from dotenv import load_dotenv
load_dotenv() # 加载 .env 文件中的环境变量
# 确保在 .env 文件中设置了 NOTION_INTEGRATION_TOKEN
# NOTION_INTEGRATION_TOKEN="secret_YOUR_NOTION_INTEGRATION_TOKEN"
# NOTION_DATABASE_ID="YOUR_NOTION_DATABASE_ID" # 可选,如果只加载特定数据库
# NOTION_PAGE_IDS=["PAGE_ID_1", "PAGE_ID_2"] # 可选,如果只加载特定页面
# 获取Notion集成令牌
notion_token = os.getenv("NOTION_INTEGRATION_TOKEN")
if not notion_token:
raise ValueError("NOTION_INTEGRATION_TOKEN not found in environment variables.")
# 示例1: 加载指定数据库的所有页面内容
# database_id = os.getenv("NOTION_DATABASE_ID") # 例如 'a1b2c3d4e5f6...'
# if database_id:
# print(f"Loading documents from Notion database: {database_id}")
# loader = NotionAPILoader(
# integration_token=notion_token,
# database_id=database_id,
# # page_ids=None # 如果指定了database_id,则page_ids通常不指定
# )
# docs = loader.load()
# print(f"Loaded {len(docs)} documents from database.")
# if docs:
# print(f"First document content (truncated): {docs[0].page_content[:200]}...")
# print(f"First document metadata: {docs[0].metadata}")
# 示例2: 加载指定页面内容 (可以加载多个)
# page_ids = os.getenv("NOTION_PAGE_IDS")
# if page_ids:
# # 通常从环境变量读取的是字符串,需要解析成列表
# page_id_list = page_ids.split(',') if ',' in page_ids else [page_ids]
# print(f"Loading documents from Notion pages: {page_id_list}")
# loader_pages = NotionAPILoader(
# integration_token=notion_token,
# page_ids=page_id_list
# )
# docs_pages = loader_pages.load()
# print(f"Loaded {len(docs_pages)} documents from specified pages.")
# if docs_pages:
# print(f"First page document content (truncated): {docs_pages[0].page_content[:200]}...")
# print(f"First page document metadata: {docs_pages[0].metadata}")
注意: 上述 NotionAPILoader 的 load() 方法在底层会处理API分页。但对于百万量级的数据,即使是 load() 内部的聚合,也可能因为一次性处理太多API请求或构建过大的内存对象而遇到问题。因此,我们需要更精细的控制。
2.3 策略1:基于 lazy_load() 的迭代加载与API分页管理
NotionAPILoader 已经内置了 lazy_load() 方法,它在内部处理了Notion API的分页逻辑,从而避免一次性加载所有数据到内存。这是处理大规模Notion数据的首选方式。
import os
import time
from langchain_community.document_loaders import NotionAPILoader
from dotenv import load_dotenv
load_dotenv()
notion_token = os.getenv("NOTION_INTEGRATION_TOKEN")
database_id_to_load = os.getenv("NOTION_DATABASE_ID") # 假设我们从环境变量获取一个数据库ID
if not notion_token or not database_id_to_load:
raise ValueError("NOTION_INTEGRATION_TOKEN and NOTION_DATABASE_ID must be set.")
print(f"Initiating lazy loading for Notion database: {database_id_to_load}")
loader = NotionAPILoader(
integration_token=notion_token,
database_id=database_id_to_load,
# block_ids=None, # 如果想加载特定块,可以指定
# page_ids=None, # 如果指定了database_id,通常不指定page_ids
)
processed_count = 0
start_time = time.time()
try:
for doc in loader.lazy_load():
processed_count += 1
# 在这里可以对每个doc进行实时处理,例如:
# - 分块 (chunking)
# - 向量化 (embedding)
# - 存储到向量数据库 (vector store)
if processed_count % 1000 == 0:
elapsed_time = time.time() - start_time
print(f"Processed {processed_count} documents. Current document: {doc.metadata.get('title', 'N/A')}. "
f"Time elapsed: {elapsed_time:.2f}s")
# 实际生产中,这里是你的文档处理管道的核心逻辑
# 例如:
# chunks = text_splitter.split_documents([doc])
# vector_store.add_documents(chunks)
except Exception as e:
print(f"An error occurred during lazy loading: {e}")
# 实现错误重试机制或记录失败的文档ID
finally:
end_time = time.time()
print(f"nFinished lazy loading. Total documents processed: {processed_count}.")
print(f"Total time taken: {(end_time - start_time):.2f} seconds.")
这个方法的核心优势在于:
- 内存效率:
lazy_load()逐个生成Document,避免了巨大的内存峰值。 - API流量控制: LangChain的Notion Loader内部会处理API的
start_cursor和page_size参数,并可能内置一些基本的速率限制。 - 实时处理: 每个文档生成后即可立即进行下游处理(如分块、向量化),无需等待所有文档加载完毕。
2.4 策略2:增量更新与变更检测
对于百万量级的数据,每次全量拉取不仅耗时,而且浪费API配额。更优的方案是实现增量更新,只拉取自上次同步以来发生变化的数据。Notion API提供了 last_edited_time 属性,可以用来实现这一目标。
实现增量更新需要以下步骤:
- 存储状态: 维护一个记录上次成功同步时间的持久化存储(例如数据库、文件)。
- 查询变更: 使用Notion API的过滤功能,只查询
last_edited_time大于上次同步时间的数据。 - 更新状态: 成功同步后,更新上次同步时间。
下面是一个自定义的 NotionIncrementalLoader 示例,它利用Notion API的查询功能实现增量加载:
import os
import time
from datetime import datetime, timezone, timedelta
import requests
from typing import Iterator, List, Dict, Any, Optional
from langchain.docstore.document import Document
from dotenv import load_dotenv
load_dotenv()
class NotionIncrementalLoader:
def __init__(self, integration_token: str, database_id: str,
last_sync_timestamp: Optional[datetime] = None,
page_size: int = 100, # Notion API max page size
retry_attempts: int = 5,
initial_backoff: float = 1.0):
self.integration_token = integration_token
self.database_id = database_id
self.last_sync_timestamp = last_sync_timestamp
self.page_size = page_size
self.retry_attempts = retry_attempts
self.initial_backoff = initial_backoff
self.headers = {
"Authorization": f"Bearer {self.integration_token}",
"Notion-Version": "2022-06-28", # Use a stable Notion API version
"Content-Type": "application/json"
}
self._base_url = "https://api.notion.com/v1"
self._notion_client = None # Could integrate a dedicated Notion client library here
def _get_notion_client(self):
# Placeholder for a more robust Notion client if needed
pass
def _make_api_request(self, method: str, url: str, json_data: Optional[Dict] = None) -> Dict:
for attempt in range(self.retry_attempts):
try:
response = requests.request(method, url, headers=self.headers, json=json_data, timeout=30)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()
except requests.exceptions.RequestException as e:
print(f"API request failed (attempt {attempt+1}/{self.retry_attempts}): {e}")
if response is not None:
print(f"Response status: {response.status_code}, content: {response.text}")
if attempt < self.retry_attempts - 1:
backoff_time = self.initial_backoff * (2 ** attempt) + (time.random() * 0.1) # Exponential backoff with jitter
print(f"Retrying in {backoff_time:.2f} seconds...")
time.sleep(backoff_time)
else:
raise
def _get_page_content(self, page_id: str) -> str:
url = f"{self._base_url}/blocks/{page_id}/children"
content_blocks = []
next_cursor = None
while True:
params = {"page_size": self.page_size}
if next_cursor:
params["start_cursor"] = next_cursor
try:
response = self._make_api_request("GET", url, json_data=params)
for block in response.get("results", []):
block_type = block.get("type")
if block_type == "paragraph":
text = "".join([rich_text.get("plain_text", "") for rich_text in block["paragraph"]["rich_text"]])
content_blocks.append(text)
elif block_type == "heading_1":
text = "".join([rich_text.get("plain_text", "") for rich_text in block["heading_1"]["rich_text"]])
content_blocks.append(f"# {text}")
elif block_type == "heading_2":
text = "".join([rich_text.get("plain_text", "") for rich_text in block["heading_2"]["rich_text"]])
content_blocks.append(f"## {text}")
elif block_type == "heading_3":
text = "".join([rich_text.get("plain_text", "") for rich_text in block["heading_3"]["rich_text"]])
content_blocks.append(f"### {text}")
# Add more block types as needed (bulleted_list_item, numbered_list_item, code, etc.)
# For a full list, refer to Notion API documentation for block objects.
next_cursor = response.get("next_cursor")
if not next_cursor:
break
except Exception as e:
print(f"Error fetching blocks for page {page_id}: {e}")
break # Or re-raise, depending on error handling strategy
return "nn".join(content_blocks)
def lazy_load(self) -> Iterator[Document]:
url = f"{self._base_url}/databases/{self.database_id}/query"
next_cursor = None
while True:
query_filter = {
"property": "Last edited time",
"date": {
"on_or_after": self.last_sync_timestamp.isoformat() if self.last_sync_timestamp else "1970-01-01T00:00:00Z"
}
} if self.last_sync_timestamp else {} # No filter if first run
query_body = {
"filter": query_filter,
"sorts": [
{
"property": "Last edited time",
"direction": "ascending"
}
],
"page_size": self.page_size
}
if next_cursor:
query_body["start_cursor"] = next_cursor
try:
response = self._make_api_request("POST", url, json_data=query_body)
for page in response.get("results", []):
page_id = page["id"]
properties = page["properties"]
# Extract title
title_prop = properties.get("Name", properties.get("title")) # 'Name' for databases, 'title' for pages
title = ""
if title_prop and title_prop.get("title"): # For database items (pages)
title = "".join([t.get("plain_text", "") for t in title_prop["title"]])
elif title_prop and title_prop.get("rich_text"): # For simple page titles
title = "".join([t.get("plain_text", "") for t in title_prop["rich_text"]])
last_edited_time_str = page["last_edited_time"]
created_time_str = page["created_time"]
# Fetch actual page content (blocks)
page_content = self._get_page_content(page_id)
metadata = {
"source": f"notion_database_{self.database_id}",
"page_id": page_id,
"title": title if title else f"Page {page_id}",
"last_edited_time": last_edited_time_str,
"created_time": created_time_str,
"url": page["url"],
# Add other database properties to metadata as needed
**{k: self._extract_property_value(v) for k, v in properties.items() if k not in ["Name", "title"]}
}
yield Document(page_content=page_content, metadata=metadata)
next_cursor = response.get("next_cursor")
if not next_cursor:
break
except Exception as e:
print(f"Error querying Notion database: {e}")
break
def _extract_property_value(self, prop_value: Dict[str, Any]) -> Any:
prop_type = prop_value.get("type")
if prop_type == "title":
return "".join([t.get("plain_text", "") for t in prop_value["title"]])
elif prop_type == "rich_text":
return "".join([t.get("plain_text", "") for t in prop_value["rich_text"]])
elif prop_type == "number":
return prop_value["number"]
elif prop_type == "select":
return prop_value["select"]["name"] if prop_value["select"] else None
elif prop_type == "multi_select":
return [s["name"] for s in prop_value["multi_select"]]
elif prop_type == "date":
return prop_value["date"]["start"] if prop_value["date"] else None
elif prop_type == "checkbox":
return prop_value["checkbox"]
elif prop_type == "url":
return prop_value["url"]
elif prop_type == "email":
return prop_value["email"]
elif prop_type == "phone_number":
return prop_value["phone_number"]
elif prop_type == "files":
return [f["name"] for f in prop_value["files"]]
elif prop_type == "people":
return [p["name"] for p in prop_value["people"]]
elif prop_type == "status":
return prop_value["status"]["name"] if prop_value["status"] else None
# Add more property types as needed
return None
# --- 运行示例 ---
notion_token = os.getenv("NOTION_INTEGRATION_TOKEN")
database_id_to_load = os.getenv("NOTION_DATABASE_ID")
if not notion_token or not database_id_to_load:
raise ValueError("NOTION_INTEGRATION_TOKEN and NOTION_DATABASE_ID must be set.")
# 假设我们有一个持久化存储来记录上次同步时间
# 首次运行或没有历史记录时,设置为None或一个非常早的时间
# 在实际应用中,这会从数据库或文件加载
last_successful_sync_time_str = "2023-01-01T00:00:00.000Z" # 示例:从某个持久化存储读取
last_sync_dt = datetime.fromisoformat(last_successful_sync_time_str.replace('Z', '+00:00')) if last_successful_sync_time_str else None
print(f"Starting incremental load for database {database_id_to_load} since {last_sync_dt or 'beginning of time'}")
incremental_loader = NotionIncrementalLoader(
integration_token=notion_token,
database_id=database_id_to_load,
last_sync_timestamp=last_sync_dt # Pass the timestamp
)
processed_count = 0
new_last_sync_time = last_sync_dt
start_time = time.time()
try:
for doc in incremental_loader.lazy_load():
processed_count += 1
current_doc_edited_time = datetime.fromisoformat(doc.metadata["last_edited_time"].replace('Z', '+00:00'))
# 更新新的同步时间,确保它是所有已处理文档中最晚的
if new_last_sync_time is None or current_doc_edited_time > new_last_sync_time:
new_last_sync_time = current_doc_edited_time
if processed_count % 100 == 0:
elapsed_time = time.time() - start_time
print(f"Processed {processed_count} incremental documents. Last doc title: {doc.metadata.get('title')}. "
f"Edited: {doc.metadata['last_edited_time']}. Time elapsed: {elapsed_time:.2f}s")
# 实际的文档处理逻辑(分块、向量化等)
# ...
except Exception as e:
print(f"An error occurred during incremental loading: {e}")
finally:
end_time = time.time()
print(f"nFinished incremental loading. Total documents processed: {processed_count}.")
print(f"Time taken: {(end_time - start_time):.2f} seconds.")
# 成功完成后,更新持久化存储中的 last_successful_sync_time
if new_last_sync_time:
# 将其转换为UTC并格式化为ISO 8601字符串
final_sync_time_str = new_last_sync_time.astimezone(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z')
print(f"Next sync should start from: {final_sync_time_str}")
# 在生产环境中,你会将 final_sync_time_str 写入数据库或配置服务
关键点:
_make_api_request: 封装了API请求,包含重试和指数退避机制,以应对网络波动和API限流。_get_page_content: 递归获取页面的所有块内容,并将其拼接成一个文本。Notion的页面内容是分块存储的,因此需要额外的API调用来获取。lazy_load: 核心逻辑。它构建一个Notion数据库查询,筛选last_edited_time大于上次同步时间的页面。- 状态管理:
last_sync_timestamp需要从持久化存储中读取和写入。在实际生产中,这通常是一个数据库表。 - 并发考虑: 如果多个进程或线程同时尝试更新
last_sync_timestamp,需要加锁或使用事务。
2.5 策略3:并行化处理多个数据库或页面
如果你的Notion知识库分散在多个数据库或大量独立的页面中,可以考虑并行化加载过程,以提高整体吞吐量。
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from langchain_community.document_loaders import NotionAPILoader
from dotenv import load_dotenv
from typing import List, Iterator
load_dotenv()
# 从环境变量获取Notion集成令牌
notion_token = os.getenv("NOTION_INTEGRATION_TOKEN")
if not notion_token:
raise ValueError("NOTION_INTEGRATION_TOKEN not found in environment variables.")
# 假设有多个数据库ID需要加载
# 在实际场景中,这些ID可能来自一个配置列表、数据库查询或Notion API本身
multiple_database_ids_str = os.getenv("NOTION_MULTI_DATABASE_IDS", "")
if not multiple_database_ids_str:
print("Warning: NOTION_MULTI_DATABASE_IDS not set. Using a dummy ID for demonstration.")
# 请替换为你的实际数据库ID列表
database_ids_to_process = ["dummy_db_id_1", "dummy_db_id_2", "dummy_db_id_3"]
else:
database_ids_to_process = [db_id.strip() for db_id in multiple_database_ids_str.split(',')]
def load_notion_database_lazily(db_id: str, token: str) -> Iterator[Document]:
"""
一个函数,用于包装NotionAPILoader的lazy_load,并处理单个数据库。
"""
print(f"Starting lazy load for database: {db_id}")
loader = NotionAPILoader(integration_token=token, database_id=db_id)
try:
for doc in loader.lazy_load():
yield doc
except Exception as e:
print(f"Error loading database {db_id}: {e}")
# 可以选择在这里 yield 一个错误文档或者记录错误
finally:
print(f"Finished lazy load for database: {db_id}")
def parallel_notion_ingestion(db_ids: List[str], token: str, max_workers: int = 5) -> Iterator[Document]:
"""
并行化处理多个Notion数据库的文档摄取。
"""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交每个数据库的加载任务
future_to_db_id = {executor.submit(load_notion_database_lazily, db_id, token): db_id for db_id in db_ids}
for future in as_completed(future_to_db_id):
db_id = future_to_db_id[future]
try:
# 获取生成器结果,并逐个yield文档
for doc in future.result():
yield doc
except Exception as exc:
print(f"Database {db_id} generated an exception: {exc}")
# --- 运行示例 ---
print(f"Starting parallel Notion ingestion for {len(database_ids_to_process)} databases.")
total_processed_docs = 0
start_time = time.time()
for doc in parallel_notion_ingestion(database_ids_to_process, notion_token, max_workers=3):
total_processed_docs += 1
if total_processed_docs % 100 == 0:
elapsed = time.time() - start_time
print(f"Total processed documents: {total_processed_docs}. Current doc from {doc.metadata.get('source', 'N/A')}. Time: {elapsed:.2f}s")
# 实际的文档处理逻辑
# 例如:text_splitter.split_documents([doc]), vector_store.add_documents(chunks)
end_time = time.time()
print(f"nFinished parallel Notion ingestion. Total documents processed: {total_processed_docs}.")
print(f"Total time taken: {(end_time - start_time):.2f} seconds.")
并行化注意事项:
ThreadPoolExecutorvs.ProcessPoolExecutor: 对于I/O密集型任务(如API调用),ThreadPoolExecutor通常更合适,因为它避免了进程间通信的开销。对于CPU密集型任务(如复杂的文本处理),ProcessPoolExecutor可以利用多核CPU。- API限流: 并行化会增加对Notion API的请求速率。务必注意Notion的API速率限制,可能需要增加请求之间的延迟或更复杂的限流策略。
- 错误处理: 确保每个并行任务中的错误能够被捕获并妥善处理,不影响整个管道的运行。
2.6 Notion文档的后处理:分块与元数据增强
无论采用何种加载策略,从Notion加载的原始 Document 对象通常需要进一步处理,以适应LLM的需求。
2.6.1 文本分块 (Chunking)
LLM通常有输入长度限制(上下文窗口),因此需要将大型文档分割成更小的、有意义的块。RecursiveCharacterTextSplitter 是一个非常常用的分块器。
from langchain.text_splitter import RecursiveCharacterTextSplitter
def process_notion_document_for_llm(doc: Document) -> List[Document]:
"""
对单个Notion文档进行分块和元数据处理。
"""
# 针对Notion文档的特点调整分块策略
# 例如,可以根据标题、段落等结构信息进行更智能的分块
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["nn", "n", " ", ""] # 优先按双换行符分割,其次单换行,再空格,最后字符
)
chunks = text_splitter.split_documents([doc])
# 对每个块添加或更新元数据
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_id"] = f"{doc.metadata['page_id']}-{i}"
chunk.metadata["chunk_index"] = i
chunk.metadata["total_chunks"] = len(chunks)
# 确保每个块都包含原始文档的关键元数据,例如标题和URL
chunk.metadata["page_title"] = doc.metadata.get("title", "Unknown Notion Page")
chunk.metadata["page_url"] = doc.metadata.get("url", "N/A")
chunk.metadata["last_edited_time"] = doc.metadata.get("last_edited_time", "N/A")
# 可以添加一个指向原始Notion页面的链接
chunk.metadata["source_url"] = doc.metadata.get("url", "N/A")
return chunks
# 示例使用 (假设我们已经从Notion加载了一个文档)
# dummy_notion_doc = Document(
# page_content="""
# # 项目总结报告 - Q3 2023
# ## 概述
# 本报告总结了2023年第三季度各项工作的进展和成果。
# 主要成就包括:
# - 完成了A功能开发
# - 优化了B模块性能
# ### 关键指标
# - 用户增长:15%
# - 活跃度:20%
# ## 挑战与展望
# 面临的主要挑战是市场竞争加剧。
# """,
# metadata={
# "page_id": "notion_page_123",
# "title": "项目总结报告",
# "url": "https://www.notion.so/...",
# "last_edited_time": "2023-10-26T10:00:00Z"
# }
# )
# processed_chunks = process_notion_document_for_llm(dummy_notion_doc)
# print(f"Original document has {len(dummy_notion_doc.page_content)} characters.")
# print(f"Split into {len(processed_chunks)} chunks.")
# for i, chunk in enumerate(processed_chunks):
# print(f"nChunk {i+1} (ID: {chunk.metadata['chunk_id']}):")
# print(f"Content: {chunk.page_content[:200]}...")
# print(f"Metadata: {chunk.metadata}")
2.6.2 元数据增强
从Notion页面或数据库属性中提取的关键信息(如作者、标签、状态、项目名称等)可以作为元数据附加到Document对象中。这些元数据在检索阶段可以用来过滤或排序结果,提高RAG的准确性。
表格:Notion文档元数据示例
| 字段名 | 来源 | 描述 | 用途 |
|---|---|---|---|
page_id |
Notion API | Notion页面的唯一ID | 关联回原始文档,排重 |
title |
Notion API | 页面标题 | 搜索结果展示,上下文 |
url |
Notion API | 页面URL | 方便用户跳转到原文 |
last_edited_time |
Notion API | 最后编辑时间 | 增量更新,按时间排序 |
created_time |
Notion API | 创建时间 | |
source |
自定义 | 指示数据来源(如notion_database_X) |
区分不同来源的数据 |
tags |
Notion 属性 | 页面标签(多选) | 过滤搜索结果,分类 |
status |
Notion 属性 | 页面状态(如Draft, Published) |
过滤草稿,只处理已发布内容 |
author |
Notion 属性 | 作者(人员属性) | 按作者查询 |
chunk_id |
分块后生成 | 文本块的唯一ID | 跟踪每个文本块,排重 |
chunk_index |
分块后生成 | 文本块在原始文档中的序号 | 保持文本块顺序,重构完整文档 |
3. Slack文档摄取:团队沟通记录的规模化挑战与解决方案
Slack作为团队沟通的中心,包含了大量的讨论、决策、技术交流等宝贵信息。将这些信息整合到LLM的知识库中,可以帮助LLM理解团队历史、回答常见问题、总结会议讨论等。
3.1 Slack API 概览与认证
要从Slack获取数据,我们需要使用Slack API。
- 认证: 主要通过OAuth 2.0,通常使用Bot Token。
- 访问 Slack API。
- 创建一个新的Slack应用。
- 在 "OAuth & Permissions" 页面,添加所需的Bot Token Scopes。对于读取消息,通常需要
channels:read,groups:read,im:read,mpim:read(用于识别频道类型),channels:history,groups:history,im:history,mpim:history(用于读取消息)。 - 安装应用到工作区,复制生成的 "Bot User OAuth Token" (以
xoxb-开头)。
- 核心概念:
- Channel (频道): 公开频道、私有频道。
- Direct Message (DM): 私聊。
- Multi-Person Direct Message (MPDM): 多人私聊。
- Message (消息): 频道或DM中的一条信息。
- Thread (线程): 消息下的回复链。
ts(timestamp): 消息的唯一标识符,也是其时间戳。
- API 限制与分页: Slack API有严格的速率限制(按层级划分)和分页机制(
cursor)。尤其是在拉取历史消息时,需要非常小心地管理请求。
3.2 LangChain Slack Loader 的基本使用
LangChain提供了 SlackDataLoader,它可以连接到Slack API并加载指定频道或用户的消息。
import os
from langchain_community.document_loaders import SlackDataLoader
from dotenv import load_dotenv
load_dotenv() # 加载 .env 文件中的环境变量
# 确保在 .env 文件中设置了 SLACK_BOT_TOKEN
# SLACK_BOT_TOKEN="xoxb-YOUR_SLACK_BOT_TOKEN"
# SLACK_CHANNEL_IDS="C12345,C67890" # 逗号分隔的频道ID列表
slack_token = os.getenv("SLACK_BOT_TOKEN")
if not slack_token:
raise ValueError("SLACK_BOT_TOKEN not found in environment variables.")
# 假设要加载的频道ID
channel_ids_str = os.getenv("SLACK_CHANNEL_IDS", "")
if not channel_ids_str:
print("Warning: SLACK_CHANNEL_IDS not set. Using dummy channel IDs for demonstration.")
# 请替换为你的实际Slack频道ID
target_channel_ids = ["C0123ABCD", "C0987EFGH"]
else:
target_channel_ids = [cid.strip() for cid in channel_ids_str.split(',')]
# 示例: 加载指定频道的历史消息
print(f"Loading messages from Slack channels: {target_channel_ids}")
loader = SlackDataLoader(
token=slack_token,
channel_ids=target_channel_ids,
# user_ids=None # 如果要加载DM,可以指定用户ID
)
# 使用 lazy_load 避免内存问题
processed_count = 0
start_time = time.time()
try:
for doc in loader.lazy_load():
processed_count += 1
if processed_count % 100 == 0:
elapsed_time = time.time() - start_time
print(f"Processed {processed_count} Slack messages. "
f"Sender: {doc.metadata.get('sender', 'N/A')}, "
f"Channel: {doc.metadata.get('channel_name', 'N/A')}. "
f"Time elapsed: {elapsed_time:.2f}s")
# 实际的文档处理逻辑
# ...
except Exception as e:
print(f"An error occurred during Slack lazy loading: {e}")
finally:
end_time = time.time()
print(f"nFinished Slack lazy loading. Total messages processed: {processed_count}.")
print(f"Total time taken: {(end_time - start_time):.2f} seconds.")
与NotionAPILoader类似,SlackDataLoader 的 lazy_load() 方法在内部处理了Slack API的分页(cursor)和每个频道的消息拉取。
3.3 策略1:基于 lazy_load() 的迭代加载与消息分页管理
SlackDataLoader 已经内置了 lazy_load() 方法,它会:
- 列出所有目标频道。
- 对每个频道,使用
conversations.historyAPI 分页拉取消息。 - 对每个消息,包括其可能的回复线程,将其转化为
Document对象。
这是处理大规模Slack数据的推荐方式,因为它高效且内存友好。
3.4 策略2:增量更新与Timestamp-based 过滤
Slack消息是流式的,实现增量更新是关键。Slack API的 conversations.history 方法支持 oldest 和 latest 参数,可以用来指定时间范围。
实现增量更新的步骤:
- 存储状态: 为每个频道维护一个上次成功同步的
latest_timestamp(Slack的ts字段)。 - 查询增量: 调用
conversations.history时,设置oldest参数为上次同步的latest_timestamp。 - 更新状态: 成功同步后,将本次拉取到的最晚消息的
ts更新为新的latest_timestamp。
import os
import time
from datetime import datetime, timezone
from typing import Iterator, List, Dict, Any, Optional
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from langchain.docstore.document import Document
from dotenv import load_dotenv
load_dotenv()
class SlackIncrementalLoader:
def __init__(self, token: str, channel_ids: List[str],
last_sync_timestamps: Optional[Dict[str, float]] = None, # {channel_id: latest_ts_float}
page_limit: int = 100, # Max messages per API call, Slack default is 1000
retry_attempts: int = 5,
initial_backoff: float = 1.0):
self.client = WebClient(token=token)
self.channel_ids = channel_ids
self.last_sync_timestamps = last_sync_timestamps if last_sync_timestamps is not None else {}
self.page_limit = page_limit
self.retry_attempts = retry_attempts
self.initial_backoff = initial_backoff
def _get_channel_info(self, channel_id: str) -> Dict[str, Any]:
try:
response = self.client.conversations_info(channel=channel_id)
return response["channel"]
except SlackApiError as e:
print(f"Error fetching channel info for {channel_id}: {e.response['error']}")
return {"name": "Unknown Channel"}
def _fetch_messages_with_retry(self, channel_id: str, oldest_ts: Optional[float] = None,
cursor: Optional[str] = None) -> Dict[str, Any]:
for attempt in range(self.retry_attempts):
try:
params = {
"channel": channel_id,
"limit": self.page_limit,
"inclusive": True # Include message at oldest_ts
}
if oldest_ts:
params["oldest"] = str(oldest_ts) # Slack API expects string
if cursor:
params["cursor"] = cursor
response = self.client.conversations_history(**params)
return response
except SlackApiError as e:
error_code = e.response["error"]
print(f"Slack API error (attempt {attempt+1}/{self.retry_attempts}) for channel {channel_id}: {error_code}")
if error_code == "ratelimited":
backoff_time = int(e.response.headers.get("Retry-After", self.initial_backoff * (2 ** attempt))) + (time.random() * 0.1)
print(f"Rate limited. Retrying in {backoff_time:.2f} seconds...")
time.sleep(backoff_time)
elif error_code in ["not_in_channel", "channel_not_found", "is_archived"]:
print(f"Skipping channel {channel_id} due to permanent error: {error_code}")
return {"messages": [], "has_more": False} # Treat as empty, no further retry
else:
if attempt < self.retry_attempts - 1:
backoff_time = self.initial_backoff * (2 ** attempt) + (time.random() * 0.1)
print(f"Retrying in {backoff_time:.2f} seconds...")
time.sleep(backoff_time)
else:
raise # Re-raise after max retries
raise Exception(f"Failed to fetch messages for channel {channel_id} after {self.retry_attempts} attempts.")
def _fetch_replies_with_retry(self, channel_id: str, thread_ts: str) -> List[Dict[str, Any]]:
for attempt in range(self.retry_attempts):
try:
response = self.client.conversations_replies(
channel=channel_id,
ts=thread_ts,
limit=self.page_limit
)
return response["messages"]
except SlackApiError as e:
error_code = e.response["error"]
print(f"Slack API error (attempt {attempt+1}/{self.retry_attempts}) for replies in {channel_id} (thread {thread_ts}): {error_code}")
if error_code == "ratelimited":
backoff_time = int(e.response.headers.get("Retry-After", self.initial_backoff * (2 ** attempt))) + (time.random() * 0.1)
print(f"Rate limited. Retrying in {backoff_time:.2f} seconds...")
time.sleep(backoff_time)
else:
if attempt < self.retry_attempts - 1:
backoff_time = self.initial_backoff * (2 ** attempt) + (time.random() * 0.1)
print(f"Retrying in {backoff_time:.2f} seconds...")
time.sleep(backoff_time)
else:
return [] # Return empty list if replies cannot be fetched after retries
return []
def lazy_load(self) -> Iterator[Document]:
for channel_id in self.channel_ids:
channel_info = self._get_channel_info(channel_id)
channel_name = channel_info.get("name", channel_id)
# Start from the last sync timestamp for this channel
oldest_ts = self.last_sync_timestamps.get(channel_id, 0.0)
print(f"Loading incremental messages for channel '{channel_name}' ({channel_id}) since {oldest_ts}")
cursor = None
has_more = True
while has_more:
try:
response = self._fetch_messages_with_retry(channel_id, oldest_ts=oldest_ts, cursor=cursor)
messages = response.get("messages", [])
has_more = response.get("has_more", False)
cursor = response.get("response_metadata", {}).get("next_cursor")
for msg in messages:
if msg.get("subtype") == "bot_message": # Skip bot messages by default
continue
if msg.get("type") != "message": # Only process actual messages
continue
# Fetch replies if it's a thread parent
replies_content = []
if msg.get("reply_count", 0) > 0 and msg.get("thread_ts") == msg.get("ts"):
replies = self._fetch_replies_with_retry(channel_id, msg["ts"])
# Skip the parent message itself if it's included in replies
replies_content = [
r.get("text", "") for r in replies if r.get("ts") != msg["ts"] and r.get("subtype") != "bot_message"
]
full_message_text = msg.get("text", "")
if replies_content:
full_message_text += "nnThread Replies:n" + "n".join(replies_content)
# Extract sender info
user_id = msg.get("user")
sender_name = self._get_user_name(user_id) if user_id else "Bot/App"
metadata = {
"source": f"slack_channel_{channel_id}",
"channel_id": channel_id,
"channel_name": channel_name,
"message_ts": msg["ts"], # Unique identifier for message
"sender": sender_name,
"user_id": user_id,
"thread_ts": msg.get("thread_ts"),
"permalink": self.client.chat_getPermalink(channel=channel_id, message_ts=msg["ts"]).get("permalink")
if self.client.chat_getPermalink else None,
"message_type": msg.get("type"),
"subtype": msg.get("subtype"),
"is_thread_parent": msg.get("thread_ts") == msg.get("ts"),
"reply_count": msg.get("reply_count", 0),
"latest_reply_ts": msg.get("latest_reply"),
}
yield Document(page_content=full_message_text, metadata=metadata)
# Respect Slack API rate limits between pages
if has_more:
time.sleep(0.5) # Small delay to avoid hitting limits too hard
except Exception as e:
print(f"Error fetching messages for channel {channel_id}: {e}")
break # Move to next channel or stop if critical
def _get_user_name(self, user_id: str) -> str:
# Simple cache for user names to reduce API calls
if not hasattr(self, '_user_cache'):
self._user_cache = {}
if user_id not in self._user_cache:
try:
response = self.client.users_info(user=user_id)
self._user_cache[user_id] = response["user"]["profile"]["display_name"] or response["user"]["real_name"]
except SlackApiError:
self._user_cache[user_id] = f"Unknown User ({user_id})"
return self._user_cache[user_id]
# --- 运行示例 ---
slack_token = os.getenv("SLACK_BOT_TOKEN")
channel_ids_str = os.getenv("SLACK_CHANNEL_IDS", "")
if not slack_token:
raise ValueError("SLACK_BOT_TOKEN not found in environment variables.")
if not channel_ids_str:
print("Please set SLACK_CHANNEL_IDS in your .env file. Using dummy IDs for now.")
target_channel_ids = ["C0123ABCD", "C0987EFGH"] # Replace with actual channel IDs
else:
target_channel_ids = [cid.strip() for cid in channel_ids_str.split(',')]
# 假设我们有一个持久化存储来记录每个频道的上次同步时间
# {channel_id: latest_ts_float}
# 首次运行或没有历史记录时,可以设置为一个非常早的timestamp (e.g., 0.0)
# 在实际应用中,这会从数据库或文件加载
last_successful_sync_timestamps = {
"C0123ABCD": 1672531200.0, # Example: Jan 1, 2023 00:00:00 UTC
"C0987EFGH": 1672531200.0,
}
print(f"Starting incremental load for Slack channels. Last sync state: {last_successful_sync_timestamps}")
incremental_slack_loader = SlackIncrementalLoader(
token=slack_token,
channel_ids=target_channel_ids,
last_sync_timestamps=last_successful_sync_timestamps
)
processed_count = 0
new_channel_sync_states = last_successful_sync_timestamps.copy()
start_time = time.time()
try:
for doc in incremental_slack_loader.lazy_load():
processed_count += 1
channel_id = doc.metadata["channel_id"]
message_ts = float(doc.metadata["message_ts"])
# 更新该频道的最晚时间戳
if message_ts > new_channel_sync_states.get(channel_id, 0.0):
new_channel_sync_states[channel_id] = message_ts
if processed_count % 50 == 0:
elapsed_time = time.time() - start_time
print(f"Processed {processed_count} incremental Slack messages. "
f"Sender: {doc.metadata.get('sender')}, Channel: {doc.metadata.get('channel_name')}. "
f"TS: {doc.metadata['message_ts']}. Time: {elapsed_time:.2f}s")
# 实际的文档处理逻辑
# ...
except Exception as e:
print(f"An error occurred during incremental Slack loading: {e}")
finally:
end_time = time.time()
print(f"nFinished incremental Slack loading. Total messages processed: {processed_count}.")
print(f"Time taken: {(end_time - start_time):.2f} seconds.")
# 成功完成后,更新持久化存储中的 last_successful_sync_timestamps
print(f"Next sync should start with states: {new_channel_sync_states}")
# 在生产环境中,你会将 new_channel_sync_states 写入数据库或配置服务
关键点:
WebClient和错误处理: 使用slack_sdk.WebClient进行API交互,并内置了对SlackApiError的处理,特别是ratelimited错误,实现了指数退避和Retry-After头部的解析。_fetch_messages_with_retry/_fetch_replies_with_retry: 封装了消息和回复的获取逻辑,包含重试机制。oldest参数: 利用conversations.history的oldest参数实现增量拉取。- 处理线程: Slack消息的线程回复需要单独调用
conversations.repliesAPI来获取,并将其合并到主消息的page_content中。 - 排除机器人消息: 默认跳过
bot_message子类型,因为它们通常不包含有价值的用户生成内容。 - 用户名称解析:
_get_user_name辅助函数用于将用户ID转换为可读的用户名,并进行了缓存以减少API调用。 - 状态管理:
last_sync_timestamps需要为每个频道单独管理,并从持久化存储中读取和写入。
3.5 策略3:并行化处理多个Slack频道
类似于Notion,如果需要处理大量Slack频道,并行化可以显著提高摄取速度。每个频道可以独立地进行消息拉取。
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Iterator, Dict
from langchain.docstore.document import Document
from slack_sdk import WebClient
from dotenv import load_dotenv
load_dotenv()
slack_token = os.getenv("SLACK_BOT_TOKEN")
if not slack_token:
raise ValueError("SLACK_BOT_TOKEN not found in environment variables.")
channel_ids_str = os.getenv("SLACK_CHANNEL_IDS", "")
if not channel_ids_str:
print("Warning: SLACK_CHANNEL_IDS not set. Using dummy IDs for demonstration.")
target_channel_ids = ["C0123ABCD", "C0987EFGH", "C0XYZW"] # Replace with actual channel IDs
else:
target_channel_ids = [cid.strip() for cid in channel_ids_str.split(',')]
# 示例:假设我们有上次同步的状态 (每个频道一个时间戳)
last_sync_state_for_channels: Dict[str, float] = {
"C0123ABCD": 1672531200.0,
"C0987EFGH": 1672531200.0,
"C0XYZW": 1672531200.0,
}
def load_slack_channel_lazily(channel_id: str, token: str,
last_sync_ts: Optional[float] = None) -> Iterator[Document]:
"""
一个函数,用于包装SlackIncrementalLoader的lazy_load,并处理单个频道。
"""
print(f"Starting lazy load for Slack channel: {channel_id} since {last_sync_ts or 'beginning'}")
# 针对单个频道,创建一个临时的SlackIncrementalLoader实例
# 这样每个线程/进程都有自己的客户端和状态管理
loader = SlackIncrementalLoader(
token=token,
channel_ids=[channel_id],
last_sync_timestamps={channel_id: last_sync_ts} if last_sync_ts else {}
)
current_channel_latest_ts = last_sync_ts
try:
for doc in loader.lazy_load():
yield doc
# 记录该频道内处理到的最新时间戳
msg_ts = float(doc.metadata["message_ts"])
if current_channel_latest_ts is None or msg_ts > current_channel_latest_ts:
current_channel_latest_ts = msg_ts
except Exception as e:
print(f"Error loading channel {channel_id}: {e}")
finally:
print(f"Finished lazy load for channel: {channel_id}. Latest TS: {current_channel_latest_ts}")
# 在这里可以返回或记录该频道最新的同步时间戳,以便主程序更新全局状态
yield (channel_id, current_channel_latest_ts) # Special yield for final state
def parallel_slack_ingestion(channel_ids: List[str], token: str,
last_sync_states: Dict[str, float],
max_workers: int = 5) -> Iterator[Document]:
"""
并行化处理多个Slack频道的文档摄取。
"""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_channel_id = {
executor.submit(load_slack_channel_lazily, cid, token, last_sync_states.get(cid)): cid
for cid in channel_ids
}
for future in as_completed(future_to_channel_id):
channel_id = future_to_channel_id[future]
try:
# 获取生成器结果
for item in future.result():
if isinstance(item, Document):
yield item
elif isinstance(item, tuple) and len(item) == 2:
# 这是一个特殊返回的元组,包含频道ID和该频道最新的同步时间戳
# 主程序可以利用这个来更新全局的 last_sync_state_for_channels
pass # We will handle state update in the main loop for simplicity
except Exception as exc:
print(f"Channel {channel_id} generated an exception: {exc}")
# --- 运行示例 ---
print(f"Starting parallel Slack ingestion for {len(target_channel_ids)} channels.")
total_processed_docs = 0
start_time = time.time()
updated_sync_states = last_sync_state_for_channels.copy()
for doc in parallel_slack_ingestion(target_channel_ids, slack_token, last_sync_state_for_channels, max_workers=3):
total_processed_docs += 1
# 更新全局的同步状态 (这里只是一个简单示例,实际可能需要更复杂的并发安全更新)
channel_id = doc.metadata["channel_id"]
message_ts = float(doc.metadata["message_ts"])
if message_ts > updated_sync_states.get(channel_id, 0.0):
updated_sync_states[channel_id] = message_ts
if total_processed_docs % 50 == 0:
elapsed = time.time() - start_time
print(f"Total processed documents: {total_processed_docs}. Current doc from {doc.metadata.get('channel_name', 'N/A')}. Time: {elapsed:.2f}s")
# 实际的文档处理逻辑
# ...
end_time = time.time()
print(f"nFinished parallel Slack ingestion. Total documents processed: {total_processed_docs}.")
print(f"Total time taken: {(end_time - start_time):.2f} seconds.")
print(f"Final updated sync states: {updated_sync_states}")
# 在生产环境中,将 updated_sync_states 写入持久化存储
并行化注意事项:
- API限流: Slack API有更严格的速率限制(按层级和方法)。并行化时更容易触及这些限制。
slack_sdk.WebClient内部有一定的限流处理,但大规模并行仍需谨慎。可能需要额外的全局限流器或更长的等待时间。 - 状态管理:
last_sync_state_for_channels在并行环境中更新时,需要考虑并发安全。简单的字典更新可能不够,需要使用锁或将更新操作推送到一个中央服务。 ThreadPoolExecutor: 同样,I/O密集型任务适合线程池。
3.6 Slack文档的后处理:分块与元数据增强
Slack消息通常比Notion页面短,但线程回复可能使单个“逻辑消息”变长。
3.6.1 文本分块 (Chunking)
对于Slack,通常一个消息本身就是一个很好的块。但如果消息很长或包含大量线程回复,仍然需要分块。
from langchain.text_splitter import RecursiveCharacterTextSplitter
def process_slack_document_for_llm(doc: Document) -> List[Document]:
"""
对单个Slack文档(消息+线程)进行分块和元数据处理。
"""
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # Slack消息通常较短,可以设置更小的块大小
chunk_overlap=100,
length_function=len,
separators=["nn", "n", " ", ""]
)
chunks = text_splitter.split_documents([doc])
# 对每个块添加或更新元数据
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_id"] = f"{doc.metadata['message_ts']}-{i}"
chunk.metadata["chunk_index"] = i
chunk.metadata["total_chunks"] = len(chunks)
# 确保每个块都包含原始消息的关键元数据
chunk.metadata["channel_name"] = doc.metadata.get("channel_name", "Unknown Channel")
chunk.metadata["sender"] = doc.metadata.get("sender", "Unknown Sender")
chunk.metadata["message_ts"] = doc.metadata.get("message_ts", "N/A")
chunk.metadata["permalink"] = doc.metadata.get("permalink", "N/A")
return chunks
# 示例使用 (假设我们已经从Slack加载了一个文档)
# dummy_slack_doc = Document(
# page_content="""
# @Alice 大家好,关于我们下周的产品发布计划,我有一些初步的想法想和大家讨论一下。
# 主要包括:
# 1. 宣传材料的最终定稿
# 2. 媒体沟通的策略
# 3. 内部培训的安排
# Thread Replies:
# @Bob 好的,我这边宣传材料初稿已经准备好了,可以分享给大家看看。
# @Charlie 媒体沟通方面,我建议我们可以考虑几家科技媒体和行业KOL。
# @David 内部培训的话,需要确定最终的产品特性和演示DEMO。
# """,
# metadata={
# "channel_id": "C12345",
# "channel_name": "#general",
# "message_ts": "1678886400.000000",
# "sender": "Alice",
# "user_id": "U0XYZ",
# "permalink": "https://slack.com/archives/C12345/p1678886400000000"
# }
# )
# processed_chunks = process_slack_document_for_llm(dummy_slack_doc)
# print(f"Original document has {len(dummy_slack_doc.page_content)} characters.")
# print(f"Split into {len(processed_chunks)} chunks.")
# for i, chunk in enumerate(processed_chunks):
# print(f"nChunk {i+1} (ID: {chunk.metadata['chunk_id']}):")
# print(f"Content: {chunk.page_content[:200]}...")
# print(f"Metadata: {chunk.metadata}")
3.6.2 元数据增强
Slack消息的元数据对于RAG非常有用,例如:发送者、频道、消息时间戳、是否是线程消息、线程ID等。
表格:Slack文档元数据示例
| 字段名 | 来源 | 描述 | 用途 |
|---|---|---|---|
message_ts |
Slack API | 消息的唯一时间戳ID | 关联回原始消息,排重,按时间排序 |
channel_id |
Slack API | 消息所在频道的ID | 按频道过滤 |
channel_name |
Slack API | 消息所在频道的名称 | 搜索结果展示,上下文 |
sender |
Slack API (user info) | 消息发送者名称 | 按发送者查询 |
user_id |
Slack API | 消息发送者ID | |
permalink |
Slack API | 消息的永久链接 | 方便用户跳转到原文 |
thread_ts |
Slack API | 如果是线程回复,指向父消息的ts |
重构对话上下文,过滤非线程消息 |
is_thread_parent |
自定义 | 是否为线程的起始消息 | 区分主消息和回复 |
source |
自定义 | 指示数据来源(如slack_channel_X) |
区分不同来源的数据 |
chunk_id |
分块后生成 | 文本块的唯一ID | 跟踪每个文本块,排重 |
chunk_index |
分块后生成 | 文本块在原始文档中的序号 | 保持文本块顺序,重构完整文档 |
4. 通用最佳实践与架构考虑
处理百万量级数据摄取,不仅仅是编写正确的Loader代码,还需要系统性的考量。
4.1 错误处理与重试机制
- 指数退避: 在API请求失败时,使用指数退避(Exponential Backoff)策略进行重试。这意味着每次失败后等待的时间会越来越长,避免对API造成持续压力。
- Jitter: 在指数退避的基础上加入随机抖动(Jitter),防止所有重试请求在同一时间爆发,进一步缓解API压力。
- 熔断机制: 当某个API或数据源持续失败时,暂时停止对其的请求,避免资源浪费和级联失败。
4.2 状态管理与持久化
- 数据库: 使用关系型数据库(如PostgreSQL、MySQL)或NoSQL数据库(如MongoDB)来存储每个数据源(Notion数据库/页面、Slack频道)的上次同步时间、已处理的文档ID、错误日志等状态信息。
- 检查点: 定期保存处理进度,即使管道中断也能从上次的检查点恢复。
- 幂等性: 设计摄取过程为幂等操作。这意味着即使重复执行,也不会产生副作用(如重复数据)。通过使用文档ID(Notion
page_id,Slackmessage_ts)进行检查和更新,而不是简单的插入。
4.3 观测性与监控
- 日志: 详细记录摄取过程中的关键事件,包括成功加载的文档数量、失败的请求、API限流警告、处理时间等。
- 指标: 收集和监控关键性能指标(KPIs),如每秒处理文档数、API请求延迟、错误率等。使用Prometheus、Grafana等工具进行可视化。
- 告警: 设置告警机制,当出现大量错误、处理停滞或API限流时及时通知运维人员。
4.4 资源管理
- 内存: 始终优先使用
lazy_load()。对于每个Document的处理,尽量保持内存占用低。 - CPU: 对于CPU密集型任务(如复杂的文本清理或模型推理),考虑使用
ProcessPoolExecutor或分布式计算框架。 - 网络: 优化API请求,减少不必要的数据传输。合理利用缓存。
4.5 架构考量
- 任务调度器 (Orchestration): 使用Apache Airflow、Prefect、Dagster等工具来调度和管理数据摄取管道。它们提供了任务依赖、重试、监控和可视化功能。
- 消息队列 (Message Queues): 将数据摄取和后续处理(如分块、向量化、存储)解耦。使用Kafka、RabbitMQ等消息队列,将原始
Document对象发送到队列,由下游消费者异步处理。这有助于缓冲流量、处理背压。 - 分布式存储: 对于原始文档或中间处理结果,可以存储在S3、Azure Blob Storage等对象存储服务中,提供高可用性和可伸缩性。
- 向量数据库 (Vector Databases): 将最终生成的文本块及其嵌入存储在Pinecone、Weaviate、Qdrant、Chroma等向量数据库中,以便进行高效的语义搜索和RAG。
- 容器化: 使用Docker将整个摄取服务容器化,确保在不同环境中运行的一致性。Kubernetes可以用于管理和扩展这些容器。
5. 总结与展望
通过本讲座,我们深入探讨了如何利用LangChain的DocumentLoader机制,结合Notion和Slack的API特性,构建能够处理百万量级文档的数据摄取管道。我们强调了lazy_load()在内存效率上的关键作用,并提供了增量更新和并行化处理的策略与代码示例。
数据摄取是构建强大LLM应用的第一步,也是至关重要的一步。一个健壮、高效、可扩展的数据摄取管道,能够将企业内部散落的非结构化数据转化为LLM可理解、可利用的知识宝藏。这不仅提升了LLM的智能水平,也为企业带来了前所未有的洞察力和自动化能力。随着LangChain生态的不断发展和API的完善,我们有理由相信,构建这样的智能数据管道将变得越来越高效和便捷。