各位开发者,下午好!
今天,我们将深入探讨一个在LangChain生态系统中日益重要的工具——LangServe。在AI应用开发的浪潮中,我们经常面临一个核心挑战:如何将我们精心构建的复杂AI逻辑,特别是基于LangChain表达式语言(LCEL)构建的链,高效、稳定、便捷地转化为可供其他服务或前端调用的标准RESTful API。LangServe正是为了解决这一痛点而生。
想象一下,你已经用LCEL搭建了一个功能强大的问答系统、一个智能客服代理或一个文档摘要工具。现在,你希望你的前端应用能够调用它,或者你的其他微服务能够集成它。传统的做法可能涉及手动编写FastAPI或Flask路由,将LCEL链的输入输出适配到HTTP请求和响应,处理流式传输、错误、并发等等。这个过程不仅繁琐,而且容易出错,尤其当你的LCEL链变得复杂时。
LangServe应运而生,它旨在将这一繁琐的过程“一键”简化。它不仅仅是一个简单的HTTP包装器,更是一个智能的框架,能够自动推断你的LCEL链的输入和输出Schema,支持流式传输、批量处理、LangSmith追踪集成,并提供了一个符合RESTful原则的API接口。本质上,LangServe就是将你的LCEL链从一个Python对象,直接升级为一个生产级的、可扩展的、可观测的微服务API。
今天的讲座,我将带领大家从零开始,逐步掌握LangServe的核心概念、使用方法、高级特性以及生产部署的考量。我们将通过大量的代码示例,确保大家能够真正理解并运用LangServe来部署自己的AI应用。
I. 引言:LangServe的诞生与核心价值
A. 传统AI应用部署的挑战
在LangChain兴起之前,或者说在LangServe出现之前,将一个复杂的AI模型或逻辑部署为服务,通常会面临以下挑战:
- 接口标准化与适配:AI模型通常接受特定的数据结构作为输入,并产生特定格式的输出。将其暴露为Web API时,需要将HTTP请求的JSON或表单数据解析为模型期望的格式,并将模型输出序列化为HTTP响应。这需要大量手动的数据转换和验证代码。
- 异步与流式传输:许多生成式AI任务(如大型语言模型)的响应是流式的,即结果是逐步生成的。为了提供更好的用户体验,Web API也需要支持流式响应(如Server-Sent Events, SSE)。手动实现SSE需要深入理解HTTP协议和异步编程。
- 并发与扩展性:Web服务需要处理多个并发请求。如何高效地管理模型实例、连接池,并确保服务在负载增加时能够水平扩展,是一个复杂的系统设计问题。
- 可观测性:在生产环境中,我们需要监控API的性能、错误率,以及每个请求的内部执行路径(例如,LangChain链中的每一步)。手动集成监控和追踪工具(如LangSmith、Prometheus、Grafana)需要额外的开发工作。
- 开发效率:每一次LCEL链的修改或新功能的添加,都可能需要同步更新API接口代码,导致开发周期变长。
- 安全性:API需要认证、授权、速率限制等安全机制,这些都需要额外开发和配置。
B. LangServe的应运而生
LangServe正是为了解决上述挑战而诞生的。它充当了LangChain应用与Web服务之间的智能桥梁。通过LangServe,开发者可以将任何LCEL Runnable 对象(包括LLM调用、Prompt模板、输出解析器、工具、Agent等)直接暴露为RESTful API,而无需编写大量的样板代码。它将LangChain的强大表达能力与Web服务的易用性、可扩展性相结合。
C. LangServe的核心优势
LangServe的核心价值体现在以下几个方面:
- LCEL原生支持:LangServe深度集成LCEL,能够直接处理任何
Runnable对象,无需对链进行特殊修改。 - 自动化API生成:它能够智能地从LCEL链中推断输入和输出的
PydanticSchema,并自动生成符合OpenAPI规范的API文档(Swagger UI)。 - 全面的API功能:开箱即用支持同步(invoke)、异步(ainvoke)、流式(stream)、异步流式(astream)、批量(batch)、异步批量(abatch)等多种调用模式。
- LangSmith集成:与LangSmith无缝集成,提供端到端的请求追踪和调试能力,极大地增强了可观测性。
- 基于FastAPI:LangServe底层基于高性能的FastAPI框架构建,继承了FastAPI的诸多优点,如异步支持、数据验证、自动文档生成等。开发者也可以轻松地将LangServe路由挂载到现有的FastAPI应用中。
- 开发效率提升:将部署LCEL链所需的工作量降至最低,让开发者能更专注于AI逻辑本身。
- 生产级特性:支持认证、配置管理等,为生产环境部署提供了坚实的基础。
简而言之,LangServe将“开发”与“部署”这两个环节之间的鸿沟填平,让你的LangChain应用能够迅速投入实际使用。
II. LangChain表达式语言 (LCEL) 快速回顾
在深入LangServe之前,我们有必要快速回顾一下LangChain表达式语言(LCEL)。LCEL是LangChain的核心,它提供了一种声明式的方式来构建复杂、可组合、可流式传输的链。LangServe能够将任何LCEL Runnable 对象转化为API,因此理解LCEL是使用LangServe的基础。
A. LCEL设计哲学
LCEL的设计哲学包括:
- 可组合性 (Composability):小组件可以像乐高积木一样组合成更复杂的链。
- 流式传输 (Streaming):默认支持流式响应,提高用户体验。
- 异步支持 (Async Support):原生支持异步操作,提高并发性能。
- 可并行化 (Parallelization):链中的某些步骤可以并行执行,提高效率。
- 回退机制 (Fallback):可以定义备用路径以处理失败情况。
- 可观测性 (Observability):与LangSmith深度集成,提供详细的追踪信息。
- 输入/输出Schema:每个
Runnable都有明确的输入和输出类型,便于类型检查和API生成。
B. 构建一个简单的LCEL链
我们以一个简单的LLM调用链为例,来回顾LCEL的基本构成。
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.runnables import Runnable
# 1. 定义Prompt Template
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个乐于助人的AI助手。"),
("user", "{input}")
])
# 2. 初始化LLM
# 确保你已经设置了OPENAI_API_KEY环境变量
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
# 3. 定义Output Parser
output_parser = StrOutputParser()
# 4. 组合链
# 使用 | 操作符将不同的Runnable组合起来
# input (str) -> prompt (ChatPromptTemplate) -> llm (ChatOpenAI) -> output_parser (StrOutputParser) -> output (str)
simple_chain: Runnable[str, str] = prompt | llm | output_parser
# 测试链
# print(simple_chain.invoke({"input": "你好,请介绍一下你自己。"}))
在这个例子中:
ChatPromptTemplate负责将用户输入格式化为LLM能理解的Prompt。ChatOpenAI是实际调用OpenAI模型的组件。StrOutputParser将LLM的输出(通常是AIMessage对象)解析为纯字符串。|操作符是LCEL的核心,它将这些独立的Runnable对象连接成一个有序的执行序列。
这个simple_chain对象就是一个标准的LCEL Runnable,它接受一个字典作为输入(其中包含input键),并返回一个字符串作为输出。LangServe将能够自动识别并将其转化为API接口。
III. 准备工作:环境搭建
在开始使用LangServe之前,我们需要安装必要的Python库,并配置好API密钥。
A. 依赖安装
# 创建并激活虚拟环境 (推荐)
python -m venv .venv
source .venv/bin/activate # macOS/Linux
# .venvScriptsactivate # Windows
# 安装LangServe及其相关依赖
pip install "langserve[all]" langchain-openai
# "langserve[all]" 会安装LangServe自身、FastAPI、Uvicorn等所有必要组件
# langchain-openai 是为了我们使用OpenAI模型
B. API Key配置
LangServe应用通常会与LLM提供商(如OpenAI)进行交互。为了安全起见,我们通常通过环境变量来配置API密钥。
# 在你的shell中设置环境变量
export OPENAI_API_KEY="your_openai_api_key_here"
export LANGCHAIN_TRACING_V2="true" # 启用LangSmith追踪
export LANGCHAIN_API_KEY="your_langsmith_api_key_here" # LangSmith API Key
export LANGCHAIN_PROJECT="LangServe Lecture" # LangSmith项目名称
重要提示:在生产环境中,切勿将API密钥直接硬编码到代码中。使用环境变量、Vault或其他安全存储机制是最佳实践。
IV. LCEL链到REST API:LangServe的核心机制
现在,我们正式进入LangServe的核心部分:如何将一个LCEL链转化为REST API。
A. LangServe与FastAPI的结合
LangServe并不是一个独立的Web服务器框架,它是在FastAPI的基础上构建的。这意味着你将获得FastAPI的所有优点:高性能、异步支持、Pydantic驱动的数据验证、自动生成OpenAPI文档(Swagger UI和ReDoc)等。LangServe通过提供add_routes函数,使得将LCEL Runnable 挂载到FastAPI应用上变得异常简单。
B. add_routes函数详解
add_routes是LangServe提供的核心函数,用于将一个或多个Runnable对象注册到FastAPI应用上,从而生成对应的API路由。
from langserve import add_routes
from fastapi import FastAPI
# app = FastAPI()
# add_routes(
# app,
# runnable,
# path="/my_runnable_path",
# input_type=None, # 可选,用于显式定义输入Schema
# output_type=None, # 可选,用于显式定义输出Schema
# name="MyRunnable", # 可选,API名称,用于文档和LangSmith追踪
# enable_feedback=True, # 可选,是否启用反馈API
# enable_public_trace_link=False, # 可选,是否在响应中包含公共LangSmith追踪链接
# )
参数解析:
app: 一个FastAPI实例。LangServe会将路由添加到这个应用上。runnable: 你要暴露的LCELRunnable对象。path: API的基础路径,例如/my_chain。LangServe会在这个路径下生成/invoke,/stream,/batch等子路径。input_type(可选): 一个Pydantic模型或类型。如果提供了,它将用于验证API请求体,并覆盖LangServe自动推断的输入Schema。output_type(可选): 一个Pydantic模型或类型。如果提供了,它将用于定义API响应体,并覆盖LangServe自动推断的输出Schema。name(可选): 该Runnable在API文档和LangSmith追踪中显示的名称。enable_feedback(可选, 默认为True): 如果设置为True,LangServe会为该链生成一个/feedback端点,允许用户对特定的LangSmith追踪进行反馈。enable_public_trace_link(可选, 默认为False): 如果设置为True,当LangSmith追踪启用时,API响应中会包含一个指向LangSmith追踪页面的公共链接。
C. 第一个LangServe应用:一个简单的Echo链
让我们以一个简单的“Echo”链为例,来展示如何使用LangServe将其转化为API。这个链只是简单地返回它接收到的输入。
1. LCEL链定义
# echo_chain.py
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.config import RunnableConfig
from typing import Dict, Any
def echo_function(input_data: Dict[str, Any], config: RunnableConfig | None = None) -> Dict[str, Any]:
"""一个简单的Echo函数,返回接收到的输入。"""
# config参数是可选的,用于获取运行时配置,如回调、tags等
print(f"Received input: {input_data} with config: {config}")
return {"output": f"You said: {input_data.get('input', 'nothing')}"}
# 创建一个RunnableLambda,将Python函数包装成LCEL Runnable
echo_chain = RunnableLambda(echo_function)
这里我们创建了一个RunnableLambda,它将一个普通的Python函数封装成LCEL Runnable。这样,我们就可以像处理其他LCEL组件一样处理它。
2. LangServe服务器代码
现在,我们将这个echo_chain暴露为API。
# server.py
from fastapi import FastAPI
from langserve import add_routes
from echo_chain import echo_chain # 导入我们定义的链
app = FastAPI(
title="LangServe Echo Server",
version="1.0",
description="A simple LangServe server exposing an echo chain."
)
# 添加健康检查路由
@app.get("/health")
async def health_check():
return {"status": "ok"}
# 使用add_routes将echo_chain注册到FastAPI应用
# 路径为 "/echo"
add_routes(
app,
echo_chain,
path="/echo",
name="EchoChain",
enable_feedback=True,
enable_public_trace_link=True # 启用公共追踪链接
)
# 注意:这个文件本身不会直接运行服务器。我们将使用Uvicorn来运行。
3. 启动服务器
要运行这个FastAPI应用,我们需要使用Uvicorn。在终端中导航到server.py所在的目录,然后执行:
uvicorn server:app --reload --port 8000
server:app: 指定Uvicorn加载server.py文件中的app对象。--reload: 启用热重载,方便开发调试。--port 8000: 在8000端口启动服务器。
服务器启动后,你将看到类似以下的输出:
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [PID]
INFO: Started server process [PID]
INFO: Waiting for application startup.
INFO: Application startup complete.
现在,你的LangServe API已经在 http://127.0.0.1:8000 上运行了!
你可以访问 http://127.0.0.1:8000/docs 来查看自动生成的OpenAPI文档(Swagger UI)。你会在其中看到 /echo 相关的API端点。
4. API测试 (cURL, Python requests)
LangServe为每个Runnable自动生成了几个API端点:
POST /path/invoke: 同步调用,等待完整结果。POST /path/stream: 流式调用,通过SSE返回数据块。POST /path/batch: 批量调用,一次处理多个请求。
a. 使用cURL测试 invoke 端点
curl -X POST "http://127.0.0.1:8000/echo/invoke"
-H "Content-Type: application/json"
-d '{"input": {"input": "Hello LangServe!"}}'
预期输出:
{
"output": {
"output": "You said: Hello LangServe!"
},
"metadata": {
"run_id": "...",
"run_type": "chain",
"langsmith_url": "..." # 如果enable_public_trace_link为True
}
}
b. 使用Python requests 测试 invoke 端点
import requests
url = "http://127.0.0.1:8000/echo/invoke"
headers = {"Content-Type": "application/json"}
payload = {"input": {"input": "Hello from Python!"}}
response = requests.post(url, json=payload, headers=headers)
print(response.json())
预期输出:
{'output': {'output': 'You said: Hello from Python!'}, 'metadata': {'langsmith_url': '...', 'run_id': '...', 'run_type': 'chain'}}
通过这个简单的例子,我们已经成功地将一个LCEL Runnable 转化为一个可调用的REST API。LangServe处理了HTTP请求解析、JSON序列化、路由创建等所有细节。
V. 深入探究:高级功能与定制
LangServe不仅仅是简单的包装器,它提供了一系列高级功能来满足生产环境的需求。
A. 输入/输出Schema的自动推断与显式定义
LangServe的一大亮点是它能够自动从你的LCEL Runnable 中推断出预期的输入和输出Schema。这极大地简化了开发,因为你无需手动编写Swagger文档或Pydantic模型。
1. 自动推断的便利性
当你不为add_routes的input_type和output_type参数提供值时,LangServe会尝试分析runnable的input_schema和output_schema属性(这是LCEL Runnables的内置特性),并将其转化为OpenAPI规范。
例如,对于我们之前的simple_chain (prompt | llm | output_parser),它的input_schema可能被推断为接受一个包含input键的字典,而output_schema则是一个字符串。
2. 使用Pydantic显式定义Schema
尽管自动推断很方便,但在某些情况下,你可能希望显式地定义输入和输出Schema:
- 更强的类型安全和验证:Pydantic模型提供了强大的数据验证功能,可以确保传入的数据符合你的预期格式、类型和约束(例如,字符串长度、数值范围)。
- 清晰的API文档:显式定义Schema可以使API文档更加清晰、准确,便于其他开发者理解和使用。
- 复杂数据结构:当输入或输出包含嵌套对象、列表或自定义枚举类型时,Pydantic模型能够更好地表达这些复杂结构。
- 兼容性:确保客户端和服务端对数据结构有统一的理解,避免潜在的解析错误。
a. 为什么需要显式定义?
考虑一个场景,你的链接受一个用户请求,其中包含用户的姓名和年龄。自动推断可能只会将其识别为一个任意字典。但如果你用Pydantic定义,你可以强制要求姓名是字符串,年龄是整数且大于0。
b. 定义输入Schema
from pydantic import BaseModel, Field
class GreetingInput(BaseModel):
name: str = Field(description="The name of the person to greet.")
age: int = Field(ge=0, description="The age of the person to greet.") # ge=0表示大于等于0
c. 定义输出Schema
class GreetingOutput(BaseModel):
message: str = Field(description="The personalized greeting message.")
timestamp: str = Field(description="The time the greeting was generated.")
d. 结合LangServe
在add_routes中,你可以直接将这些Pydantic模型作为input_type和output_type参数传递。
3. 实践:一个带有自定义Schema的问候链
让我们创建一个问候链,它接受GreetingInput并返回GreetingOutput。
# greeting_chain.py
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.config import RunnableConfig
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Dict, Any
# 定义输入/输出Schema (与上述相同)
class GreetingInput(BaseModel):
name: str = Field(description="The name of the person to greet.")
age: int = Field(ge=0, description="The age of the person to greet.")
class GreetingOutput(BaseModel):
message: str = Field(description="The personalized greeting message.")
timestamp: str = Field(description="The time the greeting was generated.")
# 问候函数
def generate_greeting(input_data: GreetingInput, config: RunnableConfig | None = None) -> GreetingOutput:
"""生成个性化问候语"""
current_time = datetime.now().isoformat()
message = f"Hello, {input_data.name}! You are {input_data.age} years old. Nice to meet you."
print(f"Generating greeting for {input_data.name} (age: {input_data.age})")
return GreetingOutput(message=message, timestamp=current_time)
# 包装成RunnableLambda
greeting_chain = RunnableLambda(generate_greeting).with_types(
input_type=GreetingInput,
output_type=GreetingOutput
)
# .with_types() 是LCEL的一种方式,用于显式地为Runnable指定其输入和输出类型,
# 这有助于LangServe进行更准确的Schema推断,或者在没有明确指定input_type/output_type时提供Fallback。
更新 server.py:
# server.py (新增部分)
from greeting_chain import greeting_chain, GreetingInput, GreetingOutput # 导入
# ... (FastAPI app 和 health_check 保持不变) ...
add_routes(
app,
greeting_chain,
path="/greet",
name="GreetingChain",
input_type=GreetingInput, # 显式指定输入Schema
output_type=GreetingOutput # 显式指定输出Schema
)
# ... (其他 add_routes 保持不变) ...
重启Uvicorn服务器,访问 http://127.0.0.1:8000/docs。你会发现 /greet 接口的输入和输出Schema现在非常清晰,与我们定义的GreetingInput和GreetingOutput模型完全一致。
测试 /greet/invoke 端点
curl -X POST "http://127.0.0.1:8000/greet/invoke"
-H "Content-Type: application/json"
-d '{"input": {"name": "Alice", "age": 30}}'
预期输出:
{
"output": {
"message": "Hello, Alice! You are 30 years old. Nice to meet you.",
"timestamp": "2023-10-27T10:30:00.123456" # 实际时间
},
"metadata": {
"run_id": "...",
"run_type": "chain",
"langsmith_url": "..."
}
}
如果你尝试发送无效数据(例如,"age": -5),FastAPI会自动返回一个422 Unprocessable Entity错误,因为Pydantic模型会进行验证。
B. 实时数据流 (Streaming) 的支持
对于生成式AI任务,流式传输响应(Server-Sent Events, SSE)是提升用户体验的关键。用户可以立即看到部分结果,而不是等待整个响应生成完毕。LangServe对LCEL的流式能力提供了原生支持。
1. 流式传输的必要性
- 更好的用户体验:用户无需长时间等待,可以实时看到AI的输出,尤其在LLM生成长文本时。
- 感知性能提升:即使总响应时间不变,用户也会觉得应用更快。
- 资源效率:可以减少客户端和服务器之间的长连接占用。
2. LangServe如何处理Streaming
当你的LCEL链支持流式传输(几乎所有LangChain的核心Runnable都支持),LangServe会自动为 /path/stream 端点提供SSE流式响应。这意味着,你只需构建一个可流式传输的LCEL链,LangServe就会自动处理HTTP层面的SSE协议。
3. 客户端如何消费SSE流
客户端(无论是浏览器中的JavaScript还是后端Python服务)需要使用支持SSE的库来消费这些流。对于Python,requests-sse或httpx等库可以处理。
4. 实践:一个流式响应的LLM链
我们使用之前定义的simple_chain,它包含一个ChatOpenAI LLM,该LLM天然支持流式输出。
# server.py (新增部分)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.runnables import Runnable
# ... (FastAPI app, health_check, echo_chain, greeting_chain 保持不变) ...
# 重新定义simple_chain,确保llm是流式的
stream_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7, streaming=True) # 明确开启streaming
stream_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个富有创意的AI助手。请用详细的语言描述一个奇幻森林的场景,至少200字。"),
("user", "{topic}")
])
stream_parser = StrOutputParser()
streaming_llm_chain: Runnable[str, str] = stream_prompt | stream_llm | stream_parser
add_routes(
app,
streaming_llm_chain,
path="/stream_llm",
name="StreamingLLMChain",
input_type=type({"topic": ""}), # 简单字典输入
output_type=str # 字符串输出
)
重启Uvicorn服务器。
测试 /stream_llm/stream 端点
使用cURL测试流式响应:
curl -N -X POST "http://127.0.0.1:8000/stream_llm/stream"
-H "Content-Type: application/json"
-d '{"input": {"topic": "神秘的森林"}}'
-N 选项告诉cURL不要缓冲输出,直接显示流式内容。你将看到数据以 data: {...} 的SSE格式逐步输出,直到整个响应完成。
使用Python requests 模拟流式消费:
由于requests库本身不支持SSE,我们可以使用httpx或requests-sse,或者手动解析。这里以httpx为例:
import httpx
import json
url = "http://127.0.0.1:8000/stream_llm/stream"
headers = {"Content-Type": "application/json"}
payload = {"input": {"topic": "神秘的森林"}}
with httpx.stream("POST", url, json=payload, headers=headers, timeout=None) as response:
response.raise_for_status()
for chunk in response.iter_bytes():
try:
# SSE通常是 `data: <json_string>nn` 格式
chunk_str = chunk.decode("utf-8")
if chunk_str.startswith("data:"):
json_data = chunk_str[len("data:"):].strip()
if json_data: # 确保不是空行
try:
data = json.loads(json_data)
# 打印流式输出的每个块
# 不同的Runnable会以不同的事件类型发送
if data.get("event") == "data" and "chunk" in data.get("data", {}):
print(data["data"]["chunk"], end="", flush=True)
elif data.get("event") == "end":
print("n--- Stream Ended ---")
break
elif data.get("event") == "error":
print(f"n--- Stream Error: {data.get('data')} ---")
break
except json.JSONDecodeError:
# 可能是非JSON的SSE注释或空行
pass
except UnicodeDecodeError:
pass # 忽略无法解码的字节
你将看到文本逐字或逐句地打印出来,模拟了实时生成的效果。
C. 批量处理 (Batching) 的优化
当你有多个独立的输入需要通过同一个链处理时,批量处理可以显著提高效率,尤其是在LLM调用中,可以减少网络往返时间。
1. 批量处理的场景与优势
- 数据预处理:对大量文本进行摘要、分类或实体提取。
- 报告生成:为多个实体生成报告片段。
- 效率提升:减少网络延迟,特别是当模型推理时间远小于网络往返时间时。
- 资源利用:某些模型在批量处理时能更有效地利用GPU等硬件资源。
2. LangServe的批量接口
LangServe为每个Runnable自动生成了 /path/batch 端点。这个端点接受一个包含多个输入对象的JSON数组,并返回一个包含对应输出的JSON数组。
3. 实践:批量处理多个请求
我们可以使用之前的greeting_chain来演示批量处理。
测试 /greet/batch 端点
curl -X POST "http://127.0.0.1:8000/greet/batch"
-H "Content-Type: application/json"
-d '[{"input": {"name": "Charlie", "age": 25}}, {"input": {"name": "Diana", "age": 35}}]'
预期输出:
[
{
"output": {
"message": "Hello, Charlie! You are 25 years old. Nice to meet you.",
"timestamp": "2023-10-27T10:35:00.123456"
},
"metadata": {
"run_id": "...",
"run_type": "chain",
"langsmith_url": "..."
}
},
{
"output": {
"message": "Hello, Diana! You are 35 years old. Nice to meet you.",
"timestamp": "2023-10-27T10:35:00.789012"
},
"metadata": {
"run_id": "...",
"run_type": "chain",
"langsmith_url": "..."
}
}
]
可以看到,一个请求发送了两个输入,并收到了两个对应的输出。
D. 运行时配置 (Configuration) 与可观测性
LangServe与LangChain的可观测性工具LangSmith深度集成,提供了强大的追踪和调试能力。
1. config参数的作用
在LangChain中,Runnable的invoke、stream、batch方法都接受一个可选的config参数。这个参数是一个RunnableConfig对象,可以用来传递运行时配置,例如:
callbacks: 用于自定义事件处理(如打印日志、集成自定义监控)。tags: 为追踪添加标签,方便在LangSmith中过滤。metadata: 附加的元数据。configurable: 用于动态配置Runnable内部组件(高级用法)。
LangServe允许客户端通过API请求体中的config字段来传递这些配置。
例如,一个请求可以这样发送:
{
"input": {"name": "Eve", "age": 28},
"config": {
"tags": ["prod_test", "greeting_api"],
"metadata": {"user_id": "user123"}
}
}
2. LangSmith集成与Tracing
LangSmith是LangChain的官方可观测性平台,用于调试、测试、评估和监控LLM应用。LangServe与LangSmith的集成是开箱即用的。
a. 设置LangSmith环境变量
确保你的环境变量已设置:
export LANGCHAIN_TRACING_V2="true"
export LANGCHAIN_API_KEY="your_langsmith_api_key_here"
export LANGCHAIN_PROJECT="LangServe Lecture"
b. 跟踪请求
一旦这些环境变量设置好,并且你的LangServe应用正在运行,每一个通过LangServe API发起的请求都会自动在LangSmith中创建一个新的追踪(trace)。你可以在LangSmith UI中看到请求的输入、输出、链的内部执行步骤、每个步骤的耗时,以及任何错误信息。
c. 公开追踪链接
通过在add_routes中设置enable_public_trace_link=True,LangServe会在每个API响应的metadata字段中包含一个langsmith_url。这个URL可以直接跳转到LangSmith中对应的追踪页面,非常方便调试和分享。
3. 实践:带LangSmith跟踪的链
我们之前已经为echo_chain和greeting_chain设置了enable_public_trace_link=True。
当你调用 POST /echo/invoke 或 POST /greet/invoke 时,响应中会包含 langsmith_url。点击这个URL,你就可以在LangSmith中查看这次调用的详细追踪信息。
在LangSmith UI中,你可以看到:
- 整个链的执行流程图。
- 每个
Runnable的输入和输出。 - 每个步骤的持续时间。
- 任何错误或警告。
- 通过
config传递的tags和metadata。
这种深度集成使得调试和优化复杂的LangChain应用变得前所未有的简单。
E. 自定义FastAPI应用
由于LangServe是构建在FastAPI之上的,因此你可以完全利用FastAPI的强大功能来定制你的服务器。
1. 挂载LangServe路由到现有FastAPI应用
如果你已经有一个现有的FastAPI应用,你可以很轻松地将LangServe的路由挂载到它上面,而不是创建一个全新的LangServe应用。
# existing_app_server.py
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from langserve import add_routes
from echo_chain import echo_chain # 导入你的链
# 创建一个现有的FastAPI应用
app = FastAPI(
title="My Existing FastAPI App",
version="1.0",
description="An app with existing routes and a LangServe endpoint."
)
@app.get("/")
async def read_root():
return {"message": "Welcome to my existing app!"}
@app.get("/items/{item_id}")
async def read_item(item_id: int):
return {"item_id": item_id, "data": f"Item {item_id} details"}
# 将LangServe路由挂载到 /langchain_api 路径下
add_routes(
app,
echo_chain,
path="/langchain_api/echo", # 注意路径前缀
name="ExistingAppEchoChain"
)
# 运行:uvicorn existing_app_server:app --reload
现在,你可以同时访问 /、/items/{item_id} 和 /langchain_api/echo/invoke 等端点。
2. 添加中间件 (Middleware)
FastAPI中间件允许你在请求被路由处理之前或之后执行代码,常用于日志记录、认证、CORS处理等。
# server.py (新增中间件示例)
# ... (所有导入和 add_routes 保持不变) ...
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 自定义日志中间件
class LogMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
logger.info(f"Request: {request.method} {request.url.path} | Status: {response.status_code} | Time: {process_time:.4f}s")
return response
app.add_middleware(LogMiddleware)
# ... (运行Uvicorn) ...
重启服务器并发送请求,你会在控制台看到每次请求的日志信息。
3. 添加依赖注入 (Dependency Injection)
FastAPI的依赖注入系统允许你声明请求处理函数所需的依赖项,FastAPI会自动解析并注入它们。这对于数据库连接、认证、获取用户会话等非常有用。
# server.py (新增依赖注入示例)
# ... (所有导入和 add_routes 保持不变) ...
from fastapi import Depends, HTTPException, Header
# 简单的认证依赖
async def get_current_user(x_api_key: str = Header(...)):
if x_api_key != "super-secret-key":
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API Key")
return {"username": "authenticated_user"}
@app.get("/secure_info", dependencies=[Depends(get_current_user)])
async def get_secure_info():
return {"data": "This is highly classified information."}
# 如果你想让LangServe的路由也受此保护,你需要将认证逻辑集成到LangServe的配置中,
# 或在FastAPI层面使用`app.include_router`并应用依赖。
# 对于LangServe的路由,更常见的是在add_routes之后,通过FastAPI的Router或Middleware进行保护。
# 例如,可以创建一个FastAPI Router,然后将LangServe的链添加到该Router,再将Router添加到主app。
# 示例:将LangServe路由包含在需要认证的子路由中
# from fastapi import APIRouter
# secure_router = APIRouter(dependencies=[Depends(get_current_user)])
# add_routes(
# secure_router,
# echo_chain,
# path="/echo",
# name="SecuredEchoChain"
# )
# app.include_router(secure_router, prefix="/api/v1/secure")
F. 错误处理
当LCEL链内部发生错误时,LangServe会将其捕获并以标准的HTTP错误响应返回给客户端。
- HTTP 500 Internal Server Error:如果链执行过程中发生未捕获的异常(例如,LLM API调用失败,或自定义Runnable中存在Bug),LangServe会返回500错误。响应体通常会包含一个
detail字段,提供错误的简要描述。 - HTTP 422 Unprocessable Entity:如果客户端发送的请求数据不符合预期的
input_typeSchema(例如,缺少必填字段,或字段类型不匹配),Pydantic会自动捕获并返回422错误。响应体通常会包含详细的验证错误信息。
示例:模拟一个链内错误
# error_chain.py
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.config import RunnableConfig
from typing import Dict, Any
def buggy_function(input_data: Dict[str, Any], config: RunnableConfig | None = None) -> Dict[str, Any]:
"""一个会故意抛出错误的函数。"""
if input_data.get("trigger_error"):
raise ValueError("This is a simulated internal chain error!")
return {"status": "success", "input": input_data}
buggy_chain = RunnableLambda(buggy_function)
更新 server.py:
# server.py (新增部分)
from error_chain import buggy_chain # 导入
# ... (所有导入和 add_routes 保持不变) ...
add_routes(
app,
buggy_chain,
path="/buggy",
name="BuggyChain"
)
重启服务器。
测试 /buggy/invoke 端点
成功调用:
curl -X POST "http://127.0.0.1:8000/buggy/invoke"
-H "Content-Type: application/json"
-d '{"input": {"message": "Hello"}}'
预期输出:
{
"output": {
"status": "success",
"input": {
"message": "Hello"
}
},
"metadata": {
"run_id": "...",
"run_type": "chain",
"langsmith_url": "..."
}
}
触发错误:
curl -X POST "http://127.0.0.1:8000/buggy/invoke"
-H "Content-Type: application/json"
-d '{"input": {"trigger_error": true}}'
预期输出 (HTTP 500):
{
"detail": "Internal Server Error"
}
同时,服务器控制台会打印详细的Python追踪信息。如果你启用了LangSmith追踪,LangSmith中也会记录这次错误,并显示完整的堆栈信息。
VI. LangServe客户端:无缝交互
LangServe不仅提供了服务端框架,还提供了一个方便的Python客户端工具——RemoteRunnable,用于从另一个Python应用中无缝地与LangServe API进行交互。
A. RemoteRunnable的诞生
RemoteRunnable是LangServe客户端库的核心。它是一个特殊的LCEL Runnable,其行为就像一个本地的LCEL链,但实际上它会将所有调用(invoke, stream, batch等)转化为HTTP请求发送到远程的LangServe API。
这使得客户端代码可以像调用本地LCEL链一样调用远程API,而无需手动编写HTTP请求、解析JSON响应、处理流式数据等。
B. 基本用法
实例化RemoteRunnable时,你需要提供LangServe API的基URL。
from langserve import RemoteRunnable
# 假设你的EchoChain部署在 /echo 路径下
remote_echo_chain = RemoteRunnable("http://127.0.0.1:8000/echo/")
# 像调用本地LCEL链一样调用
result = remote_echo_chain.invoke({"input": "Hello from RemoteRunnable!"})
print(result)
RemoteRunnable会自动处理 /invoke、/stream、/batch 等子路径的拼接。
C. 远程调用链的Streaming与Batching
RemoteRunnable同样支持stream和batch方法,它会将这些调用转换为对应的LangServe API端点请求。
流式调用:
# 假设你的StreamingLLMChain部署在 /stream_llm 路径下
remote_streaming_llm_chain = RemoteRunnable("http://127.0.0.1:8000/stream_llm/")
print("Streaming from remote LLM chain:")
for chunk in remote_streaming_llm_chain.stream({"topic": "未来城市"}):
print(chunk, end="", flush=True)
print("n--- Stream Finished ---")
批量调用:
# 假设你的GreetingChain部署在 /greet 路径下
remote_greeting_chain = RemoteRunnable("http://127.0.0.1:8000/greet/")
batch_inputs = [
{"name": "Frank", "age": 40},
{"name": "Grace", "age": 22}
]
batch_results = remote_greeting_chain.batch(batch_inputs)
for res in batch_results:
print(res)
D. 实践:Python客户端调用LangServe API
让我们创建一个独立的Python脚本来演示RemoteRunnable的用法。
# client.py
from langserve import RemoteRunnable
import json
import os
# 确保LangSmith环境变量在客户端也设置,以便追踪客户端调用
# export LANGCHAIN_TRACING_V2="true"
# export LANGCHAIN_API_KEY="your_langsmith_api_key_here"
# export LANGCHAIN_PROJECT="LangServe Client Demo" # 不同的项目,便于区分
base_url = "http://127.0.0.1:8000"
print("--- Testing Remote Echo Chain (invoke) ---")
remote_echo_chain = RemoteRunnable(f"{base_url}/echo/")
echo_result = remote_echo_chain.invoke({"input": "Hello Remote Echo!"})
print(f"Echo Result: {echo_result}")
print("-" * 40)
print("--- Testing Remote Greeting Chain (invoke with explicit input) ---")
# 对于带有显式Pydantic Input Type的链,RemoteRunnable会尝试验证输入
remote_greeting_chain = RemoteRunnable(f"{base_url}/greet/")
greeting_input = {"name": "Henry", "age": 50}
greeting_result = remote_greeting_chain.invoke(greeting_input) # 直接传递Pydantic模型或字典
print(f"Greeting Result: {greeting_result}")
print("-" * 40)
print("--- Testing Remote Streaming LLM Chain (stream) ---")
remote_streaming_llm_chain = RemoteRunnable(f"{base_url}/stream_llm/")
print("Streaming response:")
full_stream_output = ""
for s_chunk in remote_streaming_llm_chain.stream({"topic": "一座古老城堡的传说"}):
# stream() 返回的是LangChain的BaseMessageChunk,需要进一步处理
# 对于StrOutputParser,它通常是 str 类型
print(s_chunk, end="", flush=True)
full_stream_output += str(s_chunk)
print(f"nFull Stream Output Length: {len(full_stream_output)}")
print("-" * 40)
print("--- Testing Remote Greeting Chain (batch) ---")
batch_inputs_greet = [
{"name": "Ivy", "age": 28},
{"name": "Jack", "age": 45},
{"name": "Karen", "age": 33}
]
batch_results_greet = remote_greeting_chain.batch(batch_inputs_greet)
print("Batch Greeting Results:")
for b_res in batch_results_greet:
print(f"- {b_res.get('message', '')}") # 直接访问output字典中的键
print("-" * 40)
# 演示如何传递config参数
print("--- Testing Remote Echo Chain with Config (invoke) ---")
echo_with_config_result = remote_echo_chain.invoke(
{"input": "Config Test"},
config={"tags": ["client_test", "custom_tag"], "metadata": {"client_id": "cli001"}}
)
print(f"Echo with Config Result: {echo_with_config_result}")
if 'langsmith_url' in echo_with_config_result.get('metadata', {}):
print(f"LangSmith Trace: {echo_with_config_result['metadata']['langsmith_url']}")
print("-" * 40)
运行 python client.py,你将看到客户端与服务器的交互结果,并且如果LangSmith已配置,LangSmith UI中也会出现对应的追踪记录。
VII. 生产部署考量
将LangServe应用从开发环境推向生产,需要考虑性能、可扩展性、可靠性和安全性。
A. Docker化
Docker是部署Web应用的标准方式,它提供了一个一致的运行环境,解决了“在我机器上能跑”的问题。
1. Dockerfile编写
# Dockerfile
# 使用官方Python基础镜像
FROM python:3.11-slim-buster
# 设置工作目录
WORKDIR /app
# 复制依赖文件并安装
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口 (LangServe默认运行在8000)
EXPOSE 8000
# 启动命令:使用Gunicorn和Uvicorn Worker
# Gunicorn管理worker进程,Uvicorn提供ASGI服务器
# --bind 0.0.0.0:8000 绑定到所有网络接口
# --workers 4 根据CPU核心数调整,通常 2 * cores + 1
# --worker-class uvicorn.workers.UvicornWorker 使用Uvicorn作为Gunicorn的worker
# server:app 指定FastAPI应用的入口
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "server:app"]
requirements.txt 示例:
langserve[all]
langchain-openai
pydantic
python-dotenv # 如果你在本地使用.env文件加载环境变量
httpx # 如果你的客户端代码也在此容器中
2. 构建与运行容器
在 Dockerfile 所在的目录执行:
# 构建镜像
docker build -t langserve-app .
# 运行容器 (将OpenAI API Key作为环境变量传递)
docker run -d -p 8000:8000
-e OPENAI_API_KEY="your_openai_api_key"
-e LANGCHAIN_TRACING_V2="true"
-e LANGCHAIN_API_KEY="your_langsmith_api_key"
-e LANGCHAIN_PROJECT="LangServe Production"
--name my-langserve-instance
langserve-app
现在你的LangServe应用就在Docker容器中运行了,并且可以通过宿主机的8000端口访问。
B. 部署策略
1. Gunicorn + Uvicorn
这是部署FastAPI(因此也包括LangServe)应用最推荐的方式。
- Gunicorn: 作为一个HTTP服务器,管理多个Uvicorn worker进程。它处理请求分发、进程管理、日志记录等。
- Uvicorn: 作为一个ASGI服务器,负责处理实际的HTTP请求和响应。
这种组合提供了进程级别的并发,提高了应用的稳定性和吞吐量。
2. Kubernetes/云服务平台
对于更复杂的部署,你可以将Docker镜像部署到Kubernetes集群或各种云服务平台(如AWS ECS/EKS, Google Cloud Run/GKE, Azure App Service/AKS)。
- Kubernetes: 提供自动扩缩容、负载均衡、服务发现、滚动更新等高级功能。
- Cloud Run (GCP): 无服务器容器平台,非常适合LangServe这种API服务,可以根据请求量自动扩缩容到零。
- AWS ECS/EKS: 容器编排服务,提供高度可控的部署环境。
C. 安全性
在生产环境中,API的安全性至关重要。
1. API Key认证
你可以使用FastAPI的依赖注入系统来实现API Key认证。在get_current_user这样的依赖函数中,验证请求头中的X-API-Key或自定义认证逻辑。
2. HTTPS
所有生产API都应该通过HTTPS提供服务,以加密客户端和服务器之间的数据传输,防止窃听和篡改。这通常通过负载均衡器(如Nginx、HAProxy、云服务商的LB)或API网关来配置。
3. 速率限制
为了防止滥用和DDoS攻击,你应该为API实施速率限制。FastAPI生态系统中有一些库(如fastapi-limiter)可以帮助你实现这一功能,或者你可以在API网关层面进行配置。
4. 敏感信息管理
API密钥、数据库凭证等敏感信息绝不能硬编码。使用环境变量、Kubernetes Secrets、AWS Secrets Manager、Azure Key Vault等安全存储机制。
VIII. 综合案例:一个简单的RAG链的LangServe化
为了更好地展示LangServe处理复杂LCEL链的能力,我们来构建一个简单的检索增强生成(RAG)链,并将其暴露为LangServe API。
这个RAG链将包含以下组件:
- 嵌入模型:将文档和查询转化为向量。
- 向量存储:存储文档向量,并根据查询进行检索。
- 检索器:从向量存储中获取相关文档。
- Prompt模板:结合查询和检索到的文档构建LLM的Prompt。
- LLM:生成最终答案。
为了简化,我们将使用内存中的Chroma作为向量存储,并预加载一些文本。
# rag_chain_server.py
from fastapi import FastAPI
from langserve import add_routes
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.documents import Document
from typing import List, Dict, Any
import os
# 确保环境变量已设置
# OPENAI_API_KEY
# LANGCHAIN_TRACING_V2, LANGCHAIN_API_KEY, LANGCHAIN_PROJECT
# 1. 初始化嵌入模型
embeddings = OpenAIEmbeddings()
# 2. 准备一些示例文档
docs = [
Document(page_content="地球是太阳系中的第三颗行星。", metadata={"source": "wiki"}),
Document(page_content="月球是地球唯一的天然卫星。", metadata={"source": "wiki"}),
Document(page_content="太阳是太阳系的中心恒星。", metadata={"source": "wiki"}),
Document(page_content="水星是太阳系中最小的行星。", metadata={"source": "wiki"}),
Document(page_content="火星常被称为红色星球。", metadata={"source": "wiki"}),
Document(page_content="人类已经登陆过月球。", metadata={"source": "space_history"}),
Document(page_content="木星是太阳系中最大的行星。", metadata={"source": "wiki"}),
Document(page_content="土星以其壮丽的光环而闻名。", metadata={"source": "wiki"}),
]
# 3. 创建内存向量存储并添加文档
vectorstore = Chroma.from_documents(docs, embeddings)
# 4. 创建检索器
retriever = vectorstore.as_retriever()
# 5. 定义Prompt模板
rag_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个问答助手。请根据提供的上下文回答问题。如果不知道答案,请说明。"),
("user", "上下文: {context}n问题: {question}")
])
# 6. 初始化LLM
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.0, streaming=True) # 启用流式传输
# 7. 定义输出解析器
output_parser = StrOutputParser()
# 8. 组合RAG链
# 定义一个函数,将检索到的文档格式化为字符串
def format_docs(docs: List[Document]) -> str:
return "nn".join(doc.page_content for doc in docs)
rag_chain = (
RunnablePassthrough.assign(context=(lambda x: x["question"]) | retriever | format_docs)
| rag_prompt
| llm
| output_parser
)
# -------------------------------------------------------------------------
# LangServe 应用设置
app = FastAPI(
title="LangServe RAG Server",
version="1.0",
description="A LangServe server exposing a simple RAG chain."
)
@app.get("/health")
async def health_check():
return {"status": "ok"}
# 将RAG链暴露为LangServe API
add_routes(
app,
rag_chain,
path="/rag",
name="RAGChain",
input_type=type({"question": ""}), # 简单字典输入
output_type=str, # 字符串输出
enable_public_trace_link=True,
enable_feedback=True
)
# 运行:uvicorn rag_chain_server:app --reload --port 8000
客户端调用与测试
启动服务器 (uvicorn rag_chain_server:app --reload --port 8000) 后,我们可以使用 RemoteRunnable 进行测试。
# rag_client.py
from langserve import RemoteRunnable
import os
# 确保LangSmith环境变量设置,以便追踪
# export LANGCHAIN_TRACING_V2="true"
# export LANGCHAIN_API_KEY="your_langsmith_api_key_here"
# export LANGCHAIN_PROJECT="LangServe RAG Client"
base_url = "http://127.0.0.1:8000"
remote_rag_chain = RemoteRunnable(f"{base_url}/rag/")
print("--- Testing RAG Chain (invoke) ---")
question_invoke = "地球最大的行星是什么?"
rag_result_invoke = remote_rag_chain.invoke({"question": question_invoke})
print(f"Question: {question_invoke}")
print(f"Answer: {rag_result_invoke}")
if 'langsmith_url' in rag_result_invoke.get('metadata', {}):
print(f"LangSmith Trace: {rag_result_invoke['metadata']['langsmith_url']}")
print("-" * 40)
print("--- Testing RAG Chain (stream) ---")
question_stream = "土星有什么特点?"
print(f"Question: {question_stream}")
print("Streaming Answer:")
full_stream_output = ""
for s_chunk in remote_rag_chain.stream({"question": question_stream}):
print(s_chunk, end="", flush=True)
full_stream_output += str(s_chunk)
print(f"nFull Stream Output: {full_stream_output}")
print("-" * 40)
print("--- Testing RAG Chain (batch) ---")
batch_questions = [
{"question": "月球是地球的什么?"},
{"question": "火星为什么是红色的?"},
{"question": "谁登陆过月球?"}
]
print("Batch Questions:")
for q in batch_questions:
print(f"- {q['question']}")
batch_rag_results = remote_rag_chain.batch(batch_questions)
print("Batch Answers:")
for i, res in enumerate(batch_rag_results):
print(f"{i+1}. {res}")
print("-" * 40)
运行python rag_client.py,你将看到RAG链通过LangServe API提供的各种调用模式(invoke, stream, batch)的强大功能。LangServe在后台无缝地将LCEL链的复杂性抽象为标准的Web API,极大地简化了部署和客户端集成。
IX. 展望与总结
LangServe作为LangChain生态系统中的重要一环,极大地简化了将LangChain表达式语言(LCEL)构建的复杂AI链转化为符合RESTful标准的API接口的过程。它通过智能的Schema推断、对流式传输和批量处理的原生支持、与FastAPI的深度整合以及LangSmith的无缝集成,为开发者提供了一个高效、可扩展且可观测的部署解决方案。
无论是简单的LLM调用,还是复杂的RAG系统或Agent,LangServe都能够将它们“一键”转化为生产级的微服务。这不仅加速了AI应用的开发周期,更重要的是,它降低了AI系统投入实际使用的门槛,使得开发者能够将更多精力集中在AI逻辑本身,而非繁琐的部署细节。随着LangChain和LangServe的不断演进,我们可以预见,构建和部署智能应用将变得更加快速和便捷。