各位开发者、架构师,以及对未来AI Agent充满好奇的朋友们,大家好。
今天,我们将深入探讨一个前沿且充满挑战性的话题:如何构建一个能够“边运行边学习”、动态更新私有知识库的“长青”Agent。我们称之为“Continuous Ingestion”,即持续摄取。
在人工智能领域,我们已经习惯了Agent通过大型语言模型(LLM)与预训练知识进行交互。然而,这些预训练知识是静态的,无法感知瞬息万变的世界。一个真正有用的Agent,尤其是在企业或个人私有领域,必须能够持续地学习新信息、更新旧知识、甚至遗忘过时信息,从而保持其知识库的“新鲜”和“准确”。这正是“长青Agent”的核心要义。
静态知识的局限性与长青Agent的必要性
想象一下,你有一个企业内部的客服Agent。它通过检索增强生成(RAG)技术,从企业文档中获取信息来回答客户问题。如果这些文档是去年上传的,而公司的产品、政策在过去一年中发生了多次迭代,那么这个Agent的回答很快就会变得不准确甚至误导客户。这就是静态知识的局限性。
一个“长青”Agent,其目标是克服这一局限。它不是一次性地吸收所有知识,然后坐等知识过时,而是像一个活的有机体一样,不断地从环境中吸收新的养分,代谢旧的废物,从而保持活力和适应性。这对于以下场景至关重要:
- 企业内部知识管理: 政策更新、产品手册修订、会议纪要、项目进展。
- 个人信息助理: 阅读新文章、邮件、日程安排、学习笔记。
- 实时数据分析: 市场趋势、新闻事件、股票价格。
- 法律与合规: 法规条文修订、判例更新。
实现这一目标的核心挑战在于如何设计一个“持续摄取”的管道,让Agent能够高效、准确地将外部信息融入其内部知识体系。
长青Agent的架构蓝图
要构建一个长青Agent,我们需要一个精心设计的架构,它不仅仅包含一个LLM和检索模块,更需要一个健壮的知识管理和摄取系统。其核心组件包括:
- Agent核心(Agent Core): 负责理解用户意图、规划行动、与LLM交互以及协调其他模块。
- 动态知识库(Dynamic Knowledge Base): 存储Agent的私有知识,并支持高效的检索、更新和删除。通常是向量数据库与传统数据库的混合。
- 持续摄取管道(Continuous Ingestion Pipeline): 负责从各种数据源监控、提取、转换和加载(ETL)新信息到知识库。
- 学习与更新机制(Learning & Update Mechanism): 负责协调知识库的更新策略,包括新知识的融入、旧知识的修订、冲突解决以及遗忘机制。
- 反馈循环(Feedback Loop): 允许用户或Agent自身对知识的准确性、有用性提供反馈,从而驱动知识库的优化。
以下以表格形式呈现这一简化架构的主要模块及其功能:
| 模块名称 | 核心功能 | 典型技术栈 |
|---|---|---|
| Agent核心 | 意图识别、任务规划、LLM调用、工具使用、RAG协调 | LangChain, LlamaIndex, 自定义Agent框架, OpenAI/Anthropic LLMs |
| 动态知识库 | 存储文本块、向量嵌入、元数据;支持语义检索、结构化查询、CRUD操作 | 向量数据库: Chroma, Weaviate, Pinecone, Qdrant; 关系型/文档型数据库: PostgreSQL, MongoDB |
| 数据源 | 提供原始信息:Web页面、API接口、文件系统、数据库、消息队列 | HTTP/REST APIs, S3/Azure Blob, RDBMS, Kafka/RabbitMQ |
| 摄取管道 | 监控数据源、提取新数据、清洗、分块、嵌入、加载至知识库 | Airflow, Prefect, Custom Python scripts, Change Data Capture (CDC) tools, Webhooks |
| 学习与更新机制 | 协调知识库更新策略(增量、全量、版本控制)、冲突解决、遗忘策略、重新嵌入 | 自定义逻辑, 后台任务调度 |
| 反馈循环 | 收集用户对Agent回答的评价(正确/错误)、Agent自我评估、驱动知识库优化 | UI/API接口, 数据库, 消息队列 |
摄取管道:从源头到知识库
持续摄取管道是长青Agent的生命线。它负责将外部世界的动态信息转化为Agent可理解和利用的内部知识。这个管道通常包含以下几个阶段:
1. 数据源与监控
首先,我们需要识别Agent所需的数据来源,并建立有效的监控机制。
常见数据源:
- Web内容: 博客、新闻、文档、API文档。
- API接口: 内部服务、第三方数据提供商。
- 文件系统/云存储: 本地文件、S3、Azure Blob、Google Cloud Storage中的PDF、DOCX、TXT、MD文件。
- 数据库: 关系型数据库、NoSQL数据库中的结构化数据。
- 消息队列/事件流: Kafka、RabbitMQ,用于实时事件通知。
- 内部系统: Jira、Confluence、CRM、ERP系统。
监控机制:
- 轮询(Polling): 定期检查数据源是否有更新。简单但可能不实时,且对源端有压力。
- Webhooks: 数据源发生变化时主动通知Agent。实时性高,但需要数据源支持(本节列表后给出一个最小的接收端示意)。
- 事件流(Event Streams): 通过Kafka、RabbitMQ等消息队列订阅数据源的变更事件。实时性高,可扩展性强,但实现复杂。
- 变更数据捕获(CDC – Change Data Capture): 针对数据库,捕获数据表的所有变更,如Debezium。
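针对上面提到的 Webhooks 机制,下面给出一个最小的接收端示意(基于 Flask;/ingest-webhook 路径与负载中的 file_path 字段均为本文为演示而假设的约定,实际格式取决于数据源的回调规范):
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/ingest-webhook", methods=["POST"])
def ingest_webhook():
    payload = request.get_json(force=True, silent=True) or {}
    file_path = payload.get("file_path")  # Assumed payload field, for demonstration only
    if not file_path:
        return jsonify({"error": "missing file_path"}), 400
    # In production, push the path onto a queue and let a background worker
    # run the ingestion pipeline instead of processing inline here.
    print(f"Webhook received change notification for: {file_path}")
    return jsonify({"status": "accepted"}), 202

if __name__ == "__main__":
    app.run(port=8000)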
代码示例:一个简单的文件系统监控
我们可以使用 watchdog 库来监控文件系统中的文件变化,或者简单地使用一个定时任务来检查文件修改时间。这里我们用一个模拟的定时检查器。
import os
import time
from datetime import datetime
class FileMonitor:
def __init__(self, folder_path, interval_seconds=60):
self.folder_path = folder_path
self.interval = interval_seconds
self.last_checked_times = {} # Stores {filepath: last_modification_time}
def _get_files_to_process(self):
new_or_modified_files = []
current_files = {}
for root, _, files in os.walk(self.folder_path):
for file_name in files:
file_path = os.path.join(root, file_name)
try:
# Get the last modification time
mtime = os.path.getmtime(file_path)
current_files[file_path] = mtime
if file_path not in self.last_checked_times or self.last_checked_times[file_path] < mtime:
new_or_modified_files.append(file_path)
except OSError as e:
print(f"Error accessing file {file_path}: {e}")
continue
# Detect deleted files (optional, but good for cleanup)
deleted_files = [f for f in self.last_checked_times if f not in current_files]
if deleted_files:
print(f"Detected deleted files (need to handle): {deleted_files}")
# In a real system, you might trigger a cleanup in the KB
self.last_checked_times.update(current_files) # Update for next check
# Remove deleted files from tracking
for df in deleted_files:
if df in self.last_checked_times:
del self.last_checked_times[df]
return new_or_modified_files
def start_monitoring(self, process_callback):
print(f"Starting file monitor for {self.folder_path} every {self.interval} seconds.")
while True:
modified_files = self._get_files_to_process()
if modified_files:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Detected changes in: {modified_files}")
for file_path in modified_files:
process_callback(file_path)
time.sleep(self.interval)
# --- Usage Example ---
def process_file_for_ingestion(file_path):
print(f"Processing file: {file_path} for ingestion...")
# Here you would read the file, extract content, chunk, embed, and store.
# For now, just print a placeholder.
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read(100) # Read first 100 chars
print(f" Content snippet: '{content.strip()}'...")
print(f" Placeholder: Ingesting '{file_path}' into knowledge base.")
if __name__ == "__main__":
# Create a dummy folder and file for testing
monitor_dir = "agent_knowledge_source"
os.makedirs(monitor_dir, exist_ok=True)
with open(os.path.join(monitor_dir, "doc_v1.txt"), "w", encoding="utf-8") as f:
f.write("This is the initial content of document version 1.")
with open(os.path.join(monitor_dir, "report_q1.txt"), "w", encoding="utf-8") as f:
f.write("Q1 financial report data...")
monitor = FileMonitor(monitor_dir, interval_seconds=5) # Check every 5 seconds
# This will run indefinitely. In a real application, you'd run this in a separate thread/process.
# For demonstration, we'll just run it for a short period or manually stop.
# monitor.start_monitoring(process_file_for_ingestion)
print("nSimulating file changes (run monitor.start_monitoring in a separate process/thread to see real-time updates):")
print(f"Initial files in {monitor_dir}: {os.listdir(monitor_dir)}")
# Simulate modification
time.sleep(2)
with open(os.path.join(monitor_dir, "doc_v1.txt"), "a", encoding="utf-8") as f:
f.write("nAppended new information for version 1.1.")
print("Modified doc_v1.txt")
# Simulate new file
time.sleep(2)
with open(os.path.join(monitor_dir, "new_policy.txt"), "w", encoding="utf-8") as f:
f.write("New company policy regarding remote work.")
print("Created new_policy.txt")
# To see these changes detected, you'd run monitor.start_monitoring in a real environment.
# For this script, we'll just demonstrate the logic of _get_files_to_process.
print("nManually checking _get_files_to_process after simulated changes:")
temp_monitor = FileMonitor(monitor_dir) # Re-initialize to simulate fresh start for demonstration
temp_monitor.last_checked_times = {} # Clear for demonstration
modified = temp_monitor._get_files_to_process()
print(f"Files detected for initial processing (including new/modified): {modified}")
for f in modified:
process_file_for_ingestion(f)
print("Second check (should show no new changes unless files were modified again):")
modified = temp_monitor._get_files_to_process()
print(f"Files detected for second processing: {modified}")
# Cleanup
# os.remove(os.path.join(monitor_dir, "doc_v1.txt"))
# os.remove(os.path.join(monitor_dir, "report_q1.txt"))
# os.remove(os.path.join(monitor_dir, "new_policy.txt"))
# os.rmdir(monitor_dir)
这个示例展示了如何通过文件修改时间来检测文件变化。在实际生产环境中,watchdog库会更高效,或者使用更专业的CDC工具。
2. 信息提取与转换
当检测到新数据或更新数据后,下一步是提取其内容并进行转换,使其适合Agent消费。
内容提取:
- 文本文件: 直接读取。
- PDF/DOCX: 使用 PyPDF2、python-docx 等库提取文本。
- HTML: 使用 BeautifulSoup 抓取特定元素。
- API响应: 解析JSON/XML。
数据清洗与预处理:
- 去除无关的HTML标签、广告、页眉页脚。
- 标准化文本(大小写、编码)。
- 处理特殊字符和乱码。
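下面给出一个简化的清洗函数示意(假设输入可能是 HTML,用 BeautifulSoup 去除标签与脚本,再做 Unicode 与空白的归一化;具体规则需按实际数据源调整):
import re
import unicodedata
from bs4 import BeautifulSoup

def clean_text(raw: str, is_html: bool = False) -> str:
    """Strip HTML tags/scripts, normalize unicode and whitespace."""
    text = raw
    if is_html:
        soup = BeautifulSoup(raw, "html.parser")
        # Remove non-content elements before extracting visible text
        for tag in soup(["script", "style", "header", "footer", "nav"]):
            tag.decompose()
        text = soup.get_text(separator="\n")
    # Normalize full-width/odd characters, then collapse excess whitespace
    text = unicodedata.normalize("NFKC", text)
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()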
分块(Chunking):
LLM通常有输入长度限制,且RAG要求检索的文本块(chunk)既不过大导致无关信息过多,也不过小导致上下文不足。因此,将长文档分割成有意义的、大小适中的块是关键。
- 固定大小分块: 最简单,但可能在句子或段落中间截断。
- 基于语义的分块: 尝试根据文档结构(标题、段落)或语义相似度来分块,确保每个块的上下文完整。LangChain和LlamaIndex提供了多种分块策略(本列表后给出一个按标题分块的示例)。
- 重叠分块: 块之间有少量重叠,有助于在检索时捕捉跨块的上下文。
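作为按文档结构分块的一个示意,下面用 langchain_text_splitters 中的 MarkdownHeaderTextSplitter 按标题层级切分 Markdown 文本(标题映射与示例文本仅为演示):
from langchain_text_splitters import MarkdownHeaderTextSplitter

headers_to_split_on = [("#", "h1"), ("##", "h2"), ("###", "h3")]
md_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)

md_text = "# 产品手册\n\n## 安装\n先安装依赖……\n\n## 配置\n修改配置文件……"
docs = md_splitter.split_text(md_text)  # Each chunk carries its headers as metadata
for doc in docs:
    print(doc.metadata, doc.page_content[:30])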
元数据附加:
为每个文本块添加元数据(如原始文件名、URL、作者、创建时间、更新时间、主题、权限等)。这些元数据在检索过滤和知识更新时非常有用。
代码示例:内容提取与分块
我们使用 langchain_text_splitters 来进行分块,并为数据附加元数据。
from langchain_text_splitters import RecursiveCharacterTextSplitter
from datetime import datetime
import os
def extract_and_chunk(file_path):
"""
Reads a file, extracts content, and chunks it.
Attaches basic metadata.
"""
file_name = os.path.basename(file_path)
file_extension = os.path.splitext(file_name)[1]
content = ""
# Basic content extraction (can be extended for PDF, DOCX etc.)
try:
if file_extension.lower() == ".txt":
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
elif file_extension.lower() == ".md":
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Add more parsers here for .pdf, .docx, .html etc.
# Example for PDF (requires pypdf, unstructured):
# elif file_extension.lower() == ".pdf":
# from langchain_community.document_loaders import PyPDFLoader
# loader = PyPDFLoader(file_path)
# docs = loader.load()
# content = "n".join([doc.page_content for doc in docs])
else:
print(f"Unsupported file type: {file_extension}. Skipping {file_path}")
return []
except Exception as e:
print(f"Error reading file {file_path}: {e}")
return []
if not content:
return []
# Initialize text splitter
# Using RecursiveCharacterTextSplitter which tries to split by paragraphs, then sentences, then words.
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
is_separator_regex=False,
)
# Create documents (LangChain's representation of a chunk with metadata)
documents = []
chunks = text_splitter.split_text(content)
for i, chunk in enumerate(chunks):
metadata = {
"source": file_name,
"file_path": file_path,
"chunk_id": f"{file_name}_{i}",
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"file_mtime": os.path.getmtime(file_path) # Last modification time of the file
# Add more custom metadata like 'author', 'topic', 'permissions' etc.
}
documents.append({"page_content": chunk, "metadata": metadata})
return documents
# --- Usage Example ---
if __name__ == "__main__":
# Ensure monitor_dir exists and has content
monitor_dir = "agent_knowledge_source"
os.makedirs(monitor_dir, exist_ok=True)
with open(os.path.join(monitor_dir, "long_doc.txt"), "w", encoding="utf-8") as f:
f.write("This is the first paragraph of a very long document. " * 50 + "nn")
f.write("This is the second paragraph with some more details. " * 70 + "nn")
f.write("And finally, the third paragraph concludes the document. " * 60)
print(f"Processing long_doc.txt from {monitor_dir}...")
chunks = extract_and_chunk(os.path.join(monitor_dir, "long_doc.txt"))
if chunks:
print(f"Extracted {len(chunks)} chunks.")
for i, chunk in enumerate(chunks):
print(f"n--- Chunk {i+1} ---")
print(f"Content (first 200 chars): {chunk['page_content'][:200]}...")
print(f"Metadata: {chunk['metadata']}")
else:
print("No chunks extracted.")
3. 嵌入与存储
经过预处理和分块的文本现在需要转化为向量嵌入(embeddings),并存储到向量数据库中。
嵌入模型(Embedding Models):
选择一个合适的嵌入模型至关重要。常见的选择有:
- OpenAI Embeddings: text-embedding-ada-002、text-embedding-3-small、text-embedding-3-large。
- Sentence-Transformers: 开源模型,如 all-MiniLM-L6-v2、BAAI/bge-large-en-v1.5,可在本地运行。
- 自定义微调模型: 针对特定领域,可能需要微调自己的嵌入模型以提高相关性。
向量数据库(Vector Databases):
向量数据库是存储向量嵌入并支持高效相似度搜索(ANN)的核心组件。它们通常也支持存储与向量关联的元数据。
- ChromaDB: 轻量级,易于部署,适合小型项目或本地测试。
- Weaviate: 功能丰富,支持多模态,可扩展性好。
- Pinecone: 云原生,托管服务,高性能。
- Qdrant: 开源,高性能,支持复杂过滤。
- Milvus: 开源,为大规模向量搜索设计。
选择哪个取决于项目规模、性能需求、部署环境和预算。
代码示例:嵌入与存储(使用ChromaDB和OpenAI Embeddings)
首先,确保安装了必要的库:
pip install chromadb openai langchain-openai langchain-community python-dotenv
import os
import time
from datetime import datetime
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from dotenv import load_dotenv
# Load environment variables (e.g., OPENAI_API_KEY)
load_dotenv()
# --- Configuration ---
CHROMA_DB_PATH = "./chroma_db_agent"
EMBEDDING_MODEL_NAME = "text-embedding-ada-002"
# For local models, you might use:
# from langchain_community.embeddings import HuggingFaceEmbeddings
# embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
def get_embedding_function():
"""Returns the configured embedding function."""
return OpenAIEmbeddings(model=EMBEDDING_MODEL_NAME)
def initialize_vector_db():
"""Initializes or loads the ChromaDB client."""
embeddings = get_embedding_function()
# If the database exists, load it; otherwise, initialize an empty one.
db = Chroma(persist_directory=CHROMA_DB_PATH, embedding_function=embeddings)
return db
def ingest_documents_to_chroma(documents, vector_db):
"""
Ingests a list of documents (chunks with metadata) into ChromaDB.
Handles updates if documents with the same 'chunk_id' already exist.
"""
if not documents:
print("No documents to ingest.")
return
texts = [doc["page_content"] for doc in documents]
metadatas = [doc["metadata"] for doc in documents]
ids = [doc["metadata"]["chunk_id"] for doc in documents]
print(f"Attempting to add/update {len(texts)} documents to ChromaDB...")
# For ChromaDB, `add_texts` can be used for both initial addition and "upsert-like" behavior
# if you provide explicit IDs and those IDs already exist.
# However, it's safer to explicitly handle updates if your DB doesn't have a direct upsert.
# Chroma's `add_texts` with explicit IDs will effectively update if IDs conflict.
# For more complex update logic, you might fetch existing docs by ID, compare, and then update/delete/add.
try:
# Check if IDs already exist to decide on update vs. add.
# This is a simplification. A robust system would check metadata (e.g., 'updated_at' or hash of content)
# to determine if an actual update is needed.
existing_ids = vector_db.get(ids=ids, include=[])['ids'] # Only fetch the IDs that already exist (no embeddings/metadata needed)
ids_to_add = []
texts_to_add = []
metadatas_to_add = []
ids_to_update = []
texts_to_update = []
metadatas_to_update = []
for i, doc_id in enumerate(ids):
if doc_id in existing_ids:
# If chunk_id exists, we treat it as an update.
# In a real system, you'd compare content or a content hash to avoid unnecessary re-embedding.
ids_to_update.append(doc_id)
texts_to_update.append(texts[i])
metadatas_to_update.append(metadatas[i])
else:
ids_to_add.append(doc_id)
texts_to_add.append(texts[i])
metadatas_to_add.append(metadatas[i])
if ids_to_add:
print(f"Adding {len(ids_to_add)} new documents.")
vector_db.add_texts(texts=texts_to_add, metadatas=metadatas_to_add, ids=ids_to_add)
if ids_to_update:
print(f"Updating {len(ids_to_update)} existing documents.")
# Chroma's update is a bit implicit with add_texts and existing IDs.
# A more explicit way for Chroma would be to delete and then add.
# vector_db.delete(ids=ids_to_update) # If you want to explicitly delete first
vector_db.add_texts(texts=texts_to_update, metadatas=metadatas_to_update, ids=ids_to_update) # This performs an upsert if IDs exist
vector_db.persist() # Save changes to disk (newer Chroma versions persist automatically, so this may be a no-op)
print("Ingestion complete. ChromaDB persisted.")
except Exception as e:
print(f"Error during ingestion: {e}")
# --- Combined Ingestion Process ---
def run_ingestion_process(file_path, vector_db):
print(f"Starting full ingestion for: {file_path}")
documents = extract_and_chunk(file_path) # Use the previously defined function
if documents:
ingest_documents_to_chroma(documents, vector_db)
else:
print(f"No content extracted or chunked from {file_path}.")
if __name__ == "__main__":
# Initialize ChromaDB
db = initialize_vector_db()
# Create dummy files for demonstration
monitor_dir = "agent_knowledge_source"
os.makedirs(monitor_dir, exist_ok=True)
doc1_path = os.path.join(monitor_dir, "doc1.txt")
doc2_path = os.path.join(monitor_dir, "doc2.txt")
with open(doc1_path, "w", encoding="utf-8") as f:
f.write("Initial content of document 1. This is a very important piece of information.")
with open(doc2_path, "w", encoding="utf-8") as f:
f.write("The second document contains general knowledge about AI agents.")
# First ingestion run
print("n--- Initial Ingestion ---")
run_ingestion_process(doc1_path, db)
run_ingestion_process(doc2_path, db)
# Verify initial content
print(f"nTotal documents in DB after initial ingestion: {db._collection.count()}")
retrieved_docs = db.similarity_search("important information about document 1", k=1)
print(f"Retrieved after initial: {retrieved_docs[0].page_content[:50]}...")
# Simulate an update to doc1.txt
time.sleep(1) # Ensure modification time changes
with open(doc1_path, "w", encoding="utf-8") as f: # Overwrite content
f.write("UPDATED content of document 1. This information has been revised and is now even more critical.")
print("n--- Simulating Update for doc1.txt ---")
run_ingestion_process(doc1_path, db) # Re-ingest the modified file
# Verify update
print(f"nTotal documents in DB after update: {db._collection.count()}") # Should be same count if IDs are handled
retrieved_docs_updated = db.similarity_search("critical revised information about document 1", k=1)
print(f"Retrieved after update: {retrieved_docs_updated[0].page_content[:50]}...")
# Simulate a new file
new_doc_path = os.path.join(monitor_dir, "new_feature_guide.txt")
with open(new_doc_path, "w", encoding="utf-8") as f:
f.write("Guide for the new feature X. It explains how to use it step by step.")
print("n--- Simulating New File Ingestion ---")
run_ingestion_process(new_doc_path, db)
print(f"nTotal documents in DB after new file: {db._collection.count()}")
retrieved_new_feature = db.similarity_search("how to use feature X", k=1)
print(f"Retrieved new feature: {retrieved_new_feature[0].page_content[:50]}...")
# Clean up (optional)
# db.delete_collection()
# import shutil
# if os.path.exists(CHROMA_DB_PATH):
# shutil.rmtree(CHROMA_DB_PATH)
# if os.path.exists(monitor_dir):
# shutil.rmtree(monitor_dir)
这个流程将监控、提取、分块和存储集成在一起,形成了持续摄取管道的核心。
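把这些步骤串起来的方式大致如下(示意代码,假设 FileMonitor、initialize_vector_db、run_ingestion_process 均已按前文定义;生产环境中监控循环应放在独立线程或进程中运行):
if __name__ == "__main__":
    db = initialize_vector_db()
    monitor = FileMonitor("agent_knowledge_source", interval_seconds=60)
    # Every detected new/modified file goes through extract -> chunk -> embed -> store
    monitor.start_monitoring(lambda path: run_ingestion_process(path, db))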
Agent编排与检索增强生成(RAG)
一旦知识库被持续更新,Agent就需要利用这些最新知识来回答问题或执行任务。这正是RAG技术发挥作用的地方。
一个长青Agent的RAG流程与传统RAG类似,但其背后检索的知识库是动态的。
- 用户查询: Agent接收用户输入。
- 意图识别与查询重写: Agent分析查询,可能使用LLM将其重写为更适合检索的关键词或语义查询。
- 知识检索: Agent向动态知识库发送检索请求。它会根据用户查询的语义,从向量数据库中找出最相关的文本块。在这一步,元数据过滤非常有用(例如,只检索特定日期后的文档,或特定作者的文档)。
- 上下文构建: 将检索到的相关文本块作为上下文,与用户原始查询一起,构造一个发送给LLM的提示(prompt)。
- LLM生成: LLM利用这些上下文和其自身的预训练知识,生成回答。
- 结果返回: Agent将LLM的回答返回给用户。
代码示例:Agent与ChromaDB的RAG集成
我们将使用LangChain来构建一个简单的RAG链。
import os
from langchain_openai import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain_core.prompts import PromptTemplate
# Assuming get_embedding_function() and initialize_vector_db() are defined as above
# from your previous ingestion script.
def create_rag_agent(vector_db):
"""
Creates a simple RAG chain using LangChain.
"""
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
# Define a custom prompt template for the LLM
template = """
你是一个知识渊博的助手,能够回答关于公司内部文档的问题。
请根据提供的上下文信息来回答问题。如果上下文没有足够的信息,请说明你不知道。
上下文:
{context}
问题: {question}
有用的回答:
"""
QA_CHAIN_PROMPT = PromptTemplate.from_template(template)
# Create a retriever from the vector store
# The 'as_retriever()' method allows ChromaDB to be used as a source for LangChain's RAG chain.
# We can also specify search_kwargs like 'k' (number of docs to retrieve)
# and 'filter' (metadata filtering).
retriever = vector_db.as_retriever(search_kwargs={"k": 3})
# Create the RAG chain
qa_chain = RetrievalQA.from_chain_type(
llm,
retriever=retriever,
return_source_documents=True,
chain_type_kwargs={"prompt": QA_CHAIN_PROMPT}
)
return qa_chain
if __name__ == "__main__":
# Ensure ChromaDB is initialized and populated from previous steps
db = initialize_vector_db()
qa_agent = create_rag_agent(db)
print("n--- Agent Q&A Session ---")
# Test with initial knowledge
query1 = "doc1文档最初说了什么重要信息?"
print(f"nUser: {query1}")
response1 = qa_agent.invoke({"query": query1})
print(f"Agent: {response1['result']}")
# print(f"Source Documents: {response1['source_documents']}")
# After doc1 was updated in the ingestion step
query2 = "doc1文档现在关于什么?有什么更新?"
print(f"nUser: {query2}")
response2 = qa_agent.invoke({"query": query2})
print(f"Agent: {response2['result']}")
# print(f"Source Documents: {response2['source_documents']}")
# Test with new knowledge
query3 = "新功能X的使用指南是什么?"
print(f"nUser: {query3}")
response3 = qa_agent.invoke({"query": query3})
print(f"Agent: {response3['result']}")
# print(f"Source Documents: {response3['source_documents']}")
query4 = "公司内部的AI Agent文档有什么内容?"
print(f"nUser: {query4}")
response4 = qa_agent.invoke({"query": query4})
print(f"Agent: {response4['result']}")
通过这个集成,Agent能够动态地利用知识库中的最新信息,从而展现出“长青”的特性。
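前文提到检索时可以利用元数据过滤。下面是一个示意:LangChain 的 Chroma 检索器允许在 search_kwargs 中传入 filter(这里按来源文件过滤;db 沿用前文 initialize_vector_db() 得到的实例,文件名仅为演示):
filtered_retriever = db.as_retriever(
    search_kwargs={
        "k": 3,
        "filter": {"source": "new_policy.txt"},  # Only retrieve chunks from this source
    }
)
docs = filtered_retriever.invoke("远程办公政策有哪些要求?")
for doc in docs:
    print(doc.metadata.get("source"), doc.page_content[:50])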
学习循环:适应与演进
一个长青Agent不仅仅是摄取信息,它还需要能够“学习”并优化其知识和行为。这涉及几个关键机制。
1. 反馈机制
反馈是Agent学习的驱动力。它可以是显式的,也可以是隐式的。
- 显式反馈: 用户直接评价Agent的回答(例如,“有用”/“无用”,“正确”/“错误”)。
- 隐式反馈: 用户行为(例如,用户对Agent回答的后续提问、用户是否点击了某个链接、Agent在任务中的成功率)。
- Agent自我评估: Agent可以利用LLM的能力,根据其自身对事实的理解,评估其生成回答的置信度或与检索上下文的一致性。
当收到反馈时,Agent可以:
- 调整检索策略: 如果某个文档经常导致错误回答,可以降低其检索权重。
- 优化提示词: 根据反馈调整发送给LLM的提示词,以获得更好的结果。
- 标记知识块: 将导致错误回答的知识块标记为“可疑”,等待人工审核。
- 触发知识更新: 如果反馈指出知识已过时或不准确,可以直接触发摄取管道对相关数据源进行重新检查或更新。
代码示例:模拟反馈循环
import uuid
from datetime import datetime
# A simple in-memory store for feedback
feedback_store = []
def record_feedback(query, agent_response, is_correct, user_comment=None, source_docs=None):
"""Records user feedback on an agent's response."""
feedback_id = str(uuid.uuid4())
feedback_entry = {
"feedback_id": feedback_id,
"timestamp": datetime.now().isoformat(),
"query": query,
"agent_response": agent_response,
"is_correct": is_correct,
"user_comment": user_comment,
"source_documents_ids": [doc.metadata.get('chunk_id') for doc in source_docs] if source_docs else [],
"source_documents_content": [doc.page_content for doc in source_docs] if source_docs else []
}
feedback_store.append(feedback_entry)
print(f"Feedback recorded (ID: {feedback_id}): Correct={is_correct}")
return feedback_id
def analyze_feedback(threshold=0.3):
"""
Analyzes feedback to identify potentially problematic knowledge chunks or sources.
This is a very simplified example.
"""
    problematic_chunks = {}   # {chunk_id: {'incorrect_count': X, 'total_count': Y}}
    problematic_sources = {}  # {source_file: {'incorrect_count': X, 'total_count': Y}}
    for entry in feedback_store:
        for chunk_id in entry['source_documents_ids']:
            # chunk_id was built as "{file_name}_{index}" during ingestion, so the
            # source file can be recovered by stripping the trailing index.
            source_file = chunk_id.rsplit("_", 1)[0] if chunk_id else "unknown_source"
            chunk_stats = problematic_chunks.setdefault(chunk_id, {'incorrect_count': 0, 'total_count': 0})
            source_stats = problematic_sources.setdefault(source_file, {'incorrect_count': 0, 'total_count': 0})
            chunk_stats['total_count'] += 1
            source_stats['total_count'] += 1
            if not entry['is_correct']:
                chunk_stats['incorrect_count'] += 1
                source_stats['incorrect_count'] += 1
print("n--- Feedback Analysis ---")
print("Potentially problematic chunks (incorrect_rate > threshold):")
for chunk_id, data in problematic_chunks.items():
if data['total_count'] > 0:
incorrect_rate = data['incorrect_count'] / data['total_count']
if incorrect_rate > threshold:
print(f" Chunk ID: {chunk_id}, Incorrect Rate: {incorrect_rate:.2f} ({data['incorrect_count']}/{data['total_count']})")
# Action: Trigger re-evaluation or mark for deletion/update
print("nPotentially problematic sources (incorrect_rate > threshold):")
for source_file, data in problematic_sources.items():
if data['total_count'] > 0:
incorrect_rate = data['incorrect_count'] / data['total_count']
if incorrect_rate > threshold:
print(f" Source File: {source_file}, Incorrect Rate: {incorrect_rate:.2f} ({data['incorrect_count']}/{data['total_count']})")
# Action: Trigger re-ingestion of the entire source file
# In a real system, you'd trigger `run_ingestion_process(source_file_path, db)`
if __name__ == "__main__":
# Assuming db and qa_agent are initialized from previous steps
db = initialize_vector_db()
qa_agent = create_rag_agent(db)
# Example query and feedback
query_feedback_1 = "doc1文档最初说了什么重要信息?"
response_feedback_1 = qa_agent.invoke({"query": query_feedback_1})
print(f"nUser: {query_feedback_1}")
print(f"Agent: {response_feedback_1['result']}")
# User says this is correct
record_feedback(query_feedback_1, response_feedback_1['result'], True, source_docs=response_feedback_1['source_documents'])
# Simulating a query where the agent might give an outdated answer
# if doc1 was updated but feedback was provided on an old query
query_feedback_2 = "doc1文档的关键更新是什么?"
response_feedback_2 = qa_agent.invoke({"query": query_feedback_2})
print(f"nUser: {query_feedback_2}")
print(f"Agent: {response_feedback_2['result']}")
# User says this is incorrect, as it missed some crucial update
record_feedback(query_feedback_2, response_feedback_2['result'], False,
user_comment="Missed the latest critical update about document 1.",
source_docs=response_feedback_2['source_documents'])
# Analyze feedback and potentially trigger actions
analyze_feedback()
这个简化的反馈系统可以作为更复杂学习循环的起点。
2. 知识更新与版本控制
持续摄取意味着知识库中的信息会不断变化。有效的更新机制和版本控制是必不可少的。
- 增量更新: 只更新发生变化的文本块。通过比较新旧内容的哈希值或修改时间戳来判断。
- 全量更新: 对于整个文档发生重大结构性变化的情况,可能需要删除旧文档的所有块,然后重新摄取新版本。
- 版本控制: 存储知识块的不同版本。这允许Agent回溯到旧版本,或在不同版本之间进行比较,尤其是在处理冲突信息时。元数据中的 version 或 updated_at 字段至关重要。
在向量数据库中实现“更新”通常意味着:
- 根据文档ID或块ID查找旧的向量和元数据。
- 删除旧的向量和元数据。
- 对新内容进行嵌入,并作为新记录插入。
(一些向量数据库提供原生的upsert操作,可以简化这一过程。)
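下面是一个基于内容哈希的增量更新示意(content_hash 是本文为演示引入的自定义元数据字段;vector_db 沿用前文的 Chroma 实例,doc 为 extract_and_chunk 产出的块):
import hashlib

def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def upsert_if_changed(vector_db, doc):
    """doc: a chunk dict produced by extract_and_chunk ({"page_content": ..., "metadata": ...})."""
    chunk_id = doc["metadata"]["chunk_id"]
    new_hash = content_hash(doc["page_content"])
    existing = vector_db.get(ids=[chunk_id], include=["metadatas"])
    if existing["ids"]:
        old_hash = existing["metadatas"][0].get("content_hash")
        if old_hash == new_hash:
            return "skipped"              # Content unchanged: skip re-embedding
        vector_db.delete(ids=[chunk_id])  # Remove the old version before re-inserting
    doc["metadata"]["content_hash"] = new_hash
    vector_db.add_texts(
        texts=[doc["page_content"]],
        metadatas=[doc["metadata"]],
        ids=[chunk_id],
    )
    return "upserted"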
3. 遗忘与相关性管理
“长青”不意味着无限增长。有些信息会随着时间流逝而变得不相关或完全过时,甚至有害。 Agent需要具备“遗忘”的能力。
- 基于时间(TTL – Time To Live): 为某些类型的知识块设置过期时间。例如,某项促销活动的信息在活动结束后就应被移除或标记为低优先级。
- 基于访问频率/相关性: 不常被访问或被检索到的知识块,其权重可以降低,甚至被归档或删除。
- 冲突解决: 当新信息与现有信息冲突时(例如,政策变更),Agent需要策略来决定哪个信息是权威的(例如,最新版本优先,或需要人工裁决)。
- 定期清理: 运行后台任务,识别并清理不再相关或过时的知识块。
代码示例:模拟知识遗忘(基于TTL)
我们可以在检索时进行过滤,或者定期清理数据库。这里演示检索时过滤。
import os
from datetime import datetime, timedelta
def retrieve_with_ttl_filter(vector_db, query, max_age_days=30, k=3):
"""
Retrieves documents from ChromaDB, filtering out documents older than max_age_days.
Assumes 'updated_at' metadata is in ISO format.
"""
earliest_valid_date = (datetime.now() - timedelta(days=max_age_days)).isoformat()
# ChromaDB's where clause for metadata filtering
# Docs: https://docs.trychroma.com/usage-guide#query-filtering
# filter = {"updated_at": {"$gte": earliest_valid_date}} # Requires 'updated_at' to be indexed as a string for comparison.
# More robust filtering might require storing numeric timestamps or using a database that supports date comparisons directly.
# For demonstration, we'll retrieve and then filter in Python (less efficient but illustrates concept)
# A better approach for production is to use a vector DB that supports numerical date filtering or proper string comparison.
all_retrieved_docs = vector_db.similarity_search(query, k=k*5) # Retrieve more than needed, then filter
filtered_docs = []
for doc in all_retrieved_docs:
updated_at_str = doc.metadata.get("updated_at")
if updated_at_str:
try:
doc_updated_at = datetime.fromisoformat(updated_at_str)
if doc_updated_at >= datetime.fromisoformat(earliest_valid_date):
filtered_docs.append(doc)
except ValueError:
print(f"Warning: Could not parse updated_at for doc {doc.metadata.get('chunk_id')}: {updated_at_str}")
# If parsing fails, decide whether to include or exclude. Default to exclude for safety.
# else: doc without updated_at, decide if it should be included (e.g., always relevant or too old)
# Return top K from filtered documents
return filtered_docs[:k]
def cleanup_old_chunks(vector_db, max_age_days=90):
"""
Periodically cleans up chunks older than max_age_days from the vector database.
This is a conceptual example, actual implementation depends on vector DB API.
"""
print(f"n--- Running Cleanup for chunks older than {max_age_days} days ---")
earliest_valid_date = (datetime.now() - timedelta(days=max_age_days))
# In Chroma, you might need to iterate or retrieve by metadata to find old IDs.
# This is not a direct API call in Chroma. For other DBs, it might be.
# For demonstration, we'll iterate through all and mark for deletion.
# NOTE: LangChain's Chroma wrapper exposes deletion by explicit IDs, while the
# underlying chromadb collection.delete also accepts a `where` metadata filter.
# For a real system, consider a hybrid approach: store chunk_id and updated_at in a traditional DB,
# then query that DB for old chunk_ids and pass them to vector_db.delete(ids=...).
# Simulating by fetching all documents (not scalable for large DBs)
# This part is highly conceptual for Chroma due to API limitations for bulk deletion by metadata.
# A production system would manage chunk_ids and their lifecycle outside of just Chroma.
# For a proper cleanup, you'd store mapping of `chunk_id` to `updated_at` in a separate DB
# or ensure your vector DB supports efficient deletion by metadata.
print("Cleanup logic would go here. For Chroma, it requires more manual ID management.")
print("e.g., query for IDs of old chunks, then call `vector_db.delete(ids=old_chunk_ids)`")
# Example: If you knew specific old chunk IDs to remove
# old_chunk_ids_to_remove = ["doc1.txt_0", "some_other_old_chunk_id"]
# if old_chunk_ids_to_remove:
# print(f"Simulating deletion of old chunks: {old_chunk_ids_to_remove}")
# vector_db.delete(ids=old_chunk_ids_to_remove)
# vector_db.persist()
# print("Cleanup simulated.")
if __name__ == "__main__":
db = initialize_vector_db()
# Simulate an old document that should be "forgotten"
os.makedirs("agent_knowledge_source", exist_ok=True)
old_doc_path = os.path.join("agent_knowledge_source", "old_policy.txt")
with open(old_doc_path, "w", encoding="utf-8") as f:
f.write("This is a very old company policy from 2 years ago.")
old_chunks = extract_and_chunk(old_doc_path)
if old_chunks:
# Manually set updated_at to be very old for demonstration
for chunk in old_chunks:
chunk['metadata']['updated_at'] = (datetime.now() - timedelta(days=730)).isoformat() # 2 years ago
ingest_documents_to_chroma(old_chunks, db)
print(f"nTotal documents in DB before TTL test: {db._collection.count()}")
query_old = "旧政策是什么?"
print(f"nUser: {query_old} (expecting to find if TTL is not applied)")
retrieved_without_filter = db.similarity_search(query_old, k=1)
if retrieved_without_filter:
print(f"Retrieved without TTL filter: {retrieved_without_filter[0].page_content[:50]}...")
else:
print("Nothing retrieved without TTL filter.")
print(f"nUser: {query_old} (expecting to NOT find if TTL is applied for 30 days)")
retrieved_with_filter = retrieve_with_ttl_filter(db, query_old, max_age_days=30)
if retrieved_with_filter:
print(f"Retrieved with TTL filter (should be empty): {retrieved_with_filter[0].page_content[:50]}...")
else:
print("Nothing retrieved with TTL filter, as expected for old content.")
# Run conceptual cleanup
cleanup_old_chunks(db, max_age_days=90)
遗忘机制确保Agent的知识库不会无限膨胀,并始终保持相关性。
挑战与最佳实践
构建一个长青Agent并非易事,会面临诸多挑战:
- 数据质量与一致性: 持续摄取的数据可能包含错误、不一致或冗余信息。脏数据会导致Agent生成错误或矛盾的回答。
- 最佳实践: 实施严格的数据清洗和验证流程。利用LLM进行事实核查和去重。
- 冲突解决: 随着知识的更新,新旧信息可能存在冲突。如何判断哪个信息是权威的?
- 最佳实践: 引入知识版本控制和优先级规则(例如,最新信息优先,或来自特定来源的信息权重更高)。对于关键冲突,触发人工审核。
- 计算与存储成本: 持续的嵌入、存储和检索操作会消耗大量计算资源和存储空间。
- 最佳实践: 增量嵌入,只重新嵌入修改过的块。优化分块策略以减少块数量。使用高效的向量数据库和云服务。
- 实时性与延迟: 如何平衡知识更新的实时性与系统的稳定性、效率?
- 最佳实践: 区分不同类型信息的更新频率。关键业务数据可能需要近实时更新,而历史文档可以定期批量更新。使用异步处理和消息队列(本节末尾给出一个基于队列的简化示意)。
- 灾难性遗忘(Catastrophic Forgetting): 虽然RAG Agent不像微调LLM那样直接面临灾难性遗忘,但知识库中大量新信息可能“淹没”旧的、仍然重要的信息,导致检索困难。
- 最佳实践: 智能检索策略(例如,结合时间、主题、来源进行多维度检索)。建立知识层级,区分核心知识和易变知识。
- 安全与隐私: 私有知识库通常包含敏感信息。
- 最佳实践: 严格的访问控制、数据加密、权限管理。确保摄取管道和知识库符合数据安全和隐私法规。
- 可扩展性: 随着数据量和用户量的增长,整个系统需要能够弹性伸缩。
- 最佳实践: 使用云原生架构,利用分布式系统和可伸缩的数据库。
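针对上面提到的异步处理,下面给出一个在单进程内用 queue.Queue 模拟消息队列的简化示意(生产环境可替换为 Kafka/RabbitMQ 等;run_ingestion_process 沿用前文定义):
import queue
import threading

ingest_queue = queue.Queue()

def ingestion_worker(vector_db):
    """Background worker: pull file paths off the queue and ingest them."""
    while True:
        file_path = ingest_queue.get()
        try:
            run_ingestion_process(file_path, vector_db)  # Defined earlier in this article
        finally:
            ingest_queue.task_done()

# The monitoring side only enqueues the changed path and returns immediately,
# so slow embedding calls never block change detection:
# threading.Thread(target=ingestion_worker, args=(db,), daemon=True).start()
# monitor.start_monitoring(ingest_queue.put)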
未来方向
长青Agent的潜力远不止于此,以下是一些值得探索的未来方向:
- 主动学习(Active Learning): Agent能够识别其知识边界,主动向用户提问或搜索特定信息来弥补知识空白。
- 多模态知识: 不仅仅是文本,Agent可以摄取和理解图像、视频、音频等多模态信息,并将其整合到知识库中。
- 知识图谱集成: 将非结构化文本转化为结构化的知识图谱,结合向量检索和图谱查询,实现更精准和推理性的回答。
- 个性化知识: 为每个用户或用户组维护一个独立的、个性化的知识库,提供更定制化的服务。
- Agent间协作: 多个长青Agent可以共享和交换知识,形成一个更强大的知识网络。
持续演进的智能体
构建一个“长青”Agent,意味着我们不再满足于静态、孤立的智能体。我们正在迈向一个能够持续学习、适应环境、并与外部世界同步演进的智能系统。它将像有生命的有机体一样,不断地自我更新,永葆活力。这不仅是技术上的挑战,更是未来人工智能发展的重要方向。
感谢大家的聆听。