深入 `vLLM` 加速:利用 PagedAttention 实现 LangChain 应用的 10 倍并发吞吐提升

引言:大型语言模型推理的性能瓶颈与挑战

大型语言模型(LLMs)的爆发式发展,已经彻底改变了我们与机器交互的方式。从智能客服到代码生成,从内容创作到知识问答,LLMs正在以前所未有的速度渗透到各个行业和应用场景中。随之而来的是一个严峻的挑战:如何高效、经济地提供LLM推理服务,尤其是在面对高并发请求时。

传统的LLM推理服务模式,往往面临着以下几个核心问题:

  1. 高延迟与低吞吐:每次请求都需要完整的模型推理,即使是小批量处理,也难以充分利用GPU资源。在并发场景下,请求通常需要排队等待,导致平均延迟飙升,系统吞吐量受限。
  2. GPU内存浪费:LLMs在推理过程中会生成并存储大量的键值缓存(KV Cache),用于加速后续token的生成。传统方法通常为每个请求分配一块连续且固定的内存区域来存储KV Cache。然而,由于请求的序列长度是动态变化的,这种预分配策略会导致严重的内存碎片化和GPU内存利用率低下。当批次中存在短序列时,预留给长序列的内存区域大部分时间处于空闲状态;当所有序列都较长时,又可能因内存不足而无法批处理更多请求。
  3. 调度效率低下:传统的批处理(Batching)策略,无论是静态批处理还是朴素的动态批处理,都难以有效处理长度差异巨大的并发请求。静态批处理需要对短序列进行填充(padding),这不仅浪费计算资源,还可能引入不必要的延迟。而简单的动态批处理虽然减少了填充,但如果仍采用连续内存分配,KV Cache的碎片化问题依然存在,阻碍了更大批次的形成。

这些问题共同限制了LLM应用的规模化部署和用户体验。特别是对于LangChain这类需要与LLM进行多次交互、链式调用的应用,每一次LLM调用的效率都至关重要。本文将深入探讨vLLM框架,特别是其核心技术——PagedAttention,如何从根本上解决这些挑战,并展示如何利用它为LangChain应用带来高达10倍的并发吞吐提升。

vLLM简介:高性能LLM推理框架的崛起

vLLM 是一个专为LLM推理设计的高吞吐量和服务框架。它由UC Berkeley的LMSYS团队开发,旨在解决现有LLM推理服务方案的效率瓶颈。vLLM 的核心创新在于其独特的KV Cache管理机制——PagedAttention,以及一套高效的请求调度器。

与HuggingFace transformers 库中直接的模型推理管道(pipeline)相比,vLLM 提供了一个更优化的路径:

  • HuggingFace transformers pipeline:简单易用,但主要关注单次推理,对于高并发场景下的资源调度和KV Cache管理优化不足。它通常采用PyTorch的 eager 模式或编译模式,但底层KV Cache的内存分配仍是连续的。
  • Text Generation Inference (TGI):由HuggingFace开发,是一个专用的LLM服务解决方案,也进行了一些优化,例如flash attention、quantization等。但它在KV Cache管理上与vLLM的PagedAttention仍有本质区别。

vLLM 的出现,标志着LLM服务进入了一个新的阶段。它通过以下几个核心优势,显著提升了LLM推理的效率:

  1. 突破性的KV Cache管理:引入PagedAttention,将KV Cache内存以固定大小的“块”(blocks)进行管理,解决了内存碎片化问题,并实现了KV Cache的共享,极大地提高了GPU内存利用率。
  2. 高效的请求调度器vLLM 实现了“连续批处理”(Continuous Batching)机制。它不再等待整个批次完成,而是在GPU空闲时立即调度新的请求,并允许正在进行的请求在生成下一个token后立即释放GPU,从而最大化GPU的利用率。
  3. 高度优化的高性能算子vLLM 内部集成了各种高性能CUDA算子,如FlashAttention,进一步提升了计算效率。
  4. 易于使用和部署:提供了一个与OpenAI API兼容的RESTful API接口,使得现有应用能够轻松迁移,同时支持Python程序内嵌调用。

通过这些创新,vLLM 能够显著降低LLM推理的平均延迟,并大幅提高并发吞吐量,从而为LangChain等应用提供强大的后端支持。

核心技术剖析:PagedAttention如何革新KV Cache管理

要理解vLLM如何实现性能飞跃,我们必须深入了解其核心技术——PagedAttention。这项技术是解决LLM推理中KV Cache内存瓶颈的关键。

KV Cache的内存困境

在Transformer架构中,为了加速自回归生成过程,模型会缓存每个已生成token的键(Key, K)和值(Value, V)向量。这些K、V向量对被称为KV Cache。在生成每个新token时,模型只需要计算新token的K、V,并将其与之前所有token的K、V拼接起来,然后进行注意力计算。这样可以避免重复计算所有历史token的K、V。

KV Cache的内存占用
一个token的KV Cache通常由一个K向量和一个V向量组成。假设模型隐藏层维度为 d_model,注意力头数量为 num_heads,每个头的维度为 d_head = d_model / num_heads。那么,每个token的K向量和V向量的大小分别为 num_heads * d_head(即d_model)。如果使用半精度浮点数(FP16),每个分量占用2字节。
因此,每个token的KV Cache内存占用为 2 * d_model * 2 bytes = 4 * d_model bytes
对于一个序列长度为 L 的请求,其KV Cache总内存占用为 L * 4 * d_model bytes
例如,一个Llama-2 7B模型,d_model 通常为4096。那么每个token的KV Cache大约占用 4 * 4096 = 16KB。如果请求序列长度达到2048个token,则一个请求的KV Cache就占用 2048 * 16KB = 32MB。考虑到GPU显存通常有限(如24GB),这很快就会成为瓶颈。

传统方法的缺陷

  1. 静态批处理(Static Batching)与填充(Padding)
    为了提高GPU利用率,通常会将多个请求打包成一个批次进行处理。在静态批处理中,所有请求会被填充到批次中最长序列的长度。

    • 问题:填充的token不参与实际计算,但KV Cache仍需为其预留内存,造成大量内存浪费。例如,批次中有一个长度为2048的序列和9个长度为128的序列,所有10个序列都会被填充到2048。这导致9个短序列的KV Cache内存被浪费了 (2048 - 128) * 16KB * 9
    • 影响:降低了实际可处理的批次大小,限制了吞吐。
  2. 动态批处理但连续内存分配
    一些更先进的系统会尝试动态地将请求加入批次。但如果仍为每个请求分配一块连续的内存区域来存储KV Cache,问题依然存在:

    • 内存碎片化:当请求完成并释放其KV Cache内存时,会在GPU显存中留下“空洞”。新的请求可能因为找不到足够大的连续内存区域而无法加入批次,即使总空闲内存足够。这就像操作系统的内存管理,长期运行后会产生大量碎片。
    • KV Cache无法共享:在多查询注意力(MQA)或分组查询注意力(GQA)模型中,多个注意力头共享相同的K、V矩阵,但如果KV Cache是为每个请求独立连续分配的,这种共享无法在内存层面有效实现。

这些缺陷导致GPU内存利用率低下,成为限制LLM服务并发能力和吞吐量的主要瓶颈。

PagedAttention的原理

PagedAttention 的核心思想是借鉴了操作系统中的虚拟内存分页机制。在操作系统中,程序使用的虚拟地址空间被划分为固定大小的“页”(pages),物理内存被划分为相同大小的“页帧”(page frames)。虚拟页可以映射到非连续的物理页帧上,从而解决了内存碎片化问题,并实现了内存的按需分配和共享。

PagedAttention将这个思想应用到KV Cache管理上:

  1. 块(Block)的概念

    • PagedAttention将每个请求的KV Cache逻辑上划分为固定大小的“块”(Blocks)。每个块可以存储固定数量的token的K、V向量。
    • 例如,一个块可能被设计为存储16个token的KV Cache。
    • 这些块对应于GPU物理内存中的固定大小的物理块。
  2. 逻辑块与物理块的映射

    • 当一个请求到来时,vLLM 的调度器会根据其当前生成的序列长度,为其分配所需的逻辑块。
    • 这些逻辑块被映射到GPU显存中实际的物理块上。关键是,这些物理块可以是非连续的。
    • 一个请求的KV Cache不再需要占用一整块连续的内存,而是分散存储在多个物理块中。这极大地减少了内存碎片。
  3. 按需分配与动态增长

    • 请求的KV Cache内存是按需分配的。当一个请求生成一个新的token时,如果当前块已满,系统会为其分配一个新的物理块。
    • 这种动态分配方式避免了预分配大量内存造成的浪费。
  4. KV Cache共享机制

    • PagedAttention天生支持KV Cache共享。例如,在并行采样(parallel sampling)或树状解码(tree-style decoding)场景中,多个输出序列可能共享同一个前缀(prompt)。
    • 由于KV Cache是块状管理的,这些共享的前缀的KV Cache块可以直接被多个请求引用,而不需要复制。这在内存中实现了真正的共享,进一步提高了内存利用率,尤其是在LLM agent或RAG等需要多次LLM调用、生成相似前缀的场景中优势明显。
    • 对于MQA/GQA模型,PagedAttention也能够更好地利用其共享K/V的特性,因为KV Cache的物理布局不再是连续的、独立的,而是可以更灵活地组织。

下图展示了PagedAttention与传统方法在KV Cache管理上的对比:

特性/方法 传统连续内存分配 PagedAttention
内存分配 为每个序列分配一整块连续的GPU内存 为每个序列分配非连续的、固定大小的块
碎片化 严重,请求完成后留下空洞,阻碍新请求 几乎消除,块大小固定,内存管理更高效
内存利用率 低,尤其是在批次中序列长度不一致时,存在大量浪费 高,按需分配,无填充,共享机制进一步优化
KV Cache共享 困难或需要复杂复制机制 原生支持,多个序列可共享相同的物理块
序列长度 对长序列或长度变化大的序列支持不佳 灵活支持任意长度序列,易于动态扩展
批处理 需要Padding或复杂调度来避免碎片 与连续批处理机制完美结合,无需Padding

PagedAttention带来的优势

PagedAttention带来的核心优势直接解决了前文提到的LLM推理瓶颈:

  1. 显著提高GPU内存利用率:通过消除内存碎片和实现KV Cache共享,PagedAttention能够将更多的请求同时驻留在GPU内存中,从而处理更大的并发批次。
  2. 减少内存碎片:告别了因内存碎片导致无法分配连续大块内存的问题,系统可以更稳定地运行。
  3. 支持更高并发请求:由于内存效率的提升,相同GPU内存下可以容纳更多的并发请求,直接提升了系统的并发吞吐量。
  4. 降低平均延迟:通过更大的批处理能力和更高效的调度,每个请求的等待时间减少,生成下一个token的延迟降低。
  5. 支持长序列推理:动态按需分配的特性,使得系统能够更从容地处理需要生成大量token的请求,而无需预留过多内存。

PagedAttention与vLLM的连续批处理(Continuous Batching)机制协同工作,后者负责在GPU空闲时立即调度新的token生成任务,并通过PagedAttention高效地管理KV Cache内存,使得GPU能够持续以高利用率运行,从而实现了LLM推理性能的飞跃。

vLLM的部署与LangChain集成

在理解了vLLM的核心技术后,我们来看如何将其部署起来,并与LangChain应用无缝集成。

vLLM服务的启动

vLLM 提供了多种启动服务的方式,最常见的是通过命令行启动一个RESTful API服务,该服务兼容OpenAI的API格式。

环境准备
首先,确保你已经安装了vLLM。推荐在一个独立的Python虚拟环境中安装:

python -m venv venv_vllm
source venv_vllm/bin/activate
pip install vllm

如果需要特定CUDA版本支持,请参考vLLM官方文档安装。

启动vLLM服务器
以下是一个启动vLLM服务器的示例,我们将使用Meta的Llama-2-7b-chat-hf模型。请确保你已通过HuggingFace login 命令登录或设置了HuggingFace token,以便下载模型。

# 假设你想使用llama-2-7b-chat模型
# 如果你没有这个模型,vLLM会在第一次运行时自动下载
# 确保你已经接受了Meta的许可协议
# huggingface-cli login # 如果需要

# 启动vLLM RESTful API服务器
# --model 指定模型名称
# --tensor-parallel-size 用于多GPU推理,这里我们假设单GPU
# --max-model-len 指定模型支持的最大上下文长度
# --enforce-eager: 强制使用eager模式,便于调试,生产环境通常不需要
# --dtype auto: 自动选择模型的数据类型(如bfloat16, float16)
# --gpu-memory-utilization 0.9: 限制GPU内存使用比例,防止OOM,默认为0.9
# --port 8000: 监听端口
python -m vllm.entrypoints.api_server 
    --model meta-llama/Llama-2-7b-chat-hf 
    --tensor-parallel-size 1 
    --max-model-len 4096 
    --dtype auto 
    --gpu-memory-utilization 0.9 
    --port 8000

当服务器成功启动后,你将看到类似“Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)”的输出。此时,vLLM服务正在监听8000端口,等待API请求。

LangChain与vLLM的无缝对接

LangChain通过其LLM抽象接口,可以轻松集成各种大语言模型。由于vLLM提供了OpenAI兼容的API,我们可以通过langchain-openai库中的ChatOpenAIOpenAI类,或者编写一个自定义的LLM包装器来与vLLM服务交互。考虑到OpenAI API的参数与vLLM可能存在细微差异,且为了更灵活地控制请求,我们选择实现一个自定义的LLM类。

实现一个基于vLLM的自定义LangChain LLM

import json
import requests
from typing import Any, List, Mapping, Optional, Dict

from langchain_core.language_models.llms import LLM
from langchain_core.callbacks import CallbackManagerForLLMRun

class VLLMLangChainLLM(LLM):
    """
    一个自定义的LangChain LLM类,用于与vLLM的OpenAI兼容API进行交互。
    """
    vllm_api_url: str = "http://localhost:8000/v1/completions"
    model_name: str = "meta-llama/Llama-2-7b-chat-hf" # 需与vLLM服务器启动时指定的模型一致
    temperature: float = 0.7
    max_tokens: int = 512
    stop: Optional[List[str]] = None
    top_p: float = 1.0
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    best_of: int = 1 # vLLM参数, 生成best_of个序列并返回其中logprob最高的
    n: int = 1 # vLLM参数, 生成n个序列
    logprobs: Optional[int] = None # vLLM参数, 返回logprobs

    @property
    def _llm_type(self) -> str:
        return "vllm_langchain_llm"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """
        核心的LLM调用逻辑。
        """
        _stop = stop or self.stop
        if _stop:
            # vLLM的/v1/completions接口的stop参数是一个字符串列表
            # 这里确保我们传递的是正确的格式
            stop_sequences = _stop
        else:
            stop_sequences = []

        headers = {"Content-Type": "application/json"}
        payload = {
            "model": self.model_name,
            "prompt": prompt,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "stop": stop_sequences,
            "top_p": self.top_p,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "best_of": self.best_of,
            "n": self.n,
            "logprobs": self.logprobs,
            **kwargs # 允许通过kwargs传入更多vLLM支持的参数
        }

        try:
            response = requests.post(self.vllm_api_url, headers=headers, json=payload)
            response.raise_for_status() # 检查HTTP请求是否成功
            result = response.json()
            # vLLM的/v1/completions接口返回格式与OpenAI类似
            # result['choices'][0]['text'] 包含生成的文本
            return result['choices'][0]['text']
        except requests.exceptions.RequestException as e:
            raise ValueError(f"Error calling vLLM API: {e}")
        except KeyError as e:
            raise ValueError(f"Unexpected response format from vLLM API: {e}nResponse: {result}")

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """
        返回用于识别LLM实例的参数。
        """
        return {
            "vllm_api_url": self.vllm_api_url,
            "model_name": self.model_name,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "stop": self.stop,
            "top_p": self.top_p,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "best_of": self.best_of,
            "n": self.n,
            "logprobs": self.logprobs,
        }

    def _get_num_tokens(self, text: str) -> int:
        # 这是一个简化版本,实际生产中应使用与模型对应的tokenizer
        # 这里仅作粗略估计,不影响vLLM本身的性能,但会影响LangChain的成本计算等
        return len(text.split())

# 简单的LangChain应用中使用VLLMLangChainLLM
if __name__ == "__main__":
    # 确保vLLM服务器已经在 http://localhost:8000 运行
    llm = VLLMLangChainLLM(
        vllm_api_url="http://localhost:8000/v1/completions",
        model_name="meta-llama/Llama-2-7b-chat-hf",
        temperature=0.7,
        max_tokens=256,
        stop=["nnHuman:", "###"] # 示例停止词
    )

    prompt = "请给我讲一个关于人工智能的科幻故事,开头是:在一个遥远的未来,AI已经融入了人类生活的方方面面。n"
    print(f"Prompt:n{prompt}")
    print("n--- Generating response with vLLM ---n")
    response = llm.invoke(prompt)
    print(f"Response:n{response}")

    print("n--- Testing another prompt with different parameters ---n")
    prompt_short = "列出三种最受欢迎的宠物及其特点。"
    llm_short = VLLMLangChainLLM(
        vllm_api_url="http://localhost:8000/v1/completions",
        model_name="meta-llama/Llama-2-7b-chat-hf",
        temperature=0.2, # 更低的温度,更确定的回答
        max_tokens=100,
        top_p=0.9
    )
    response_short = llm_short.invoke(prompt_short)
    print(f"Prompt:n{prompt_short}")
    print(f"Response:n{response_short}")

通过这个VLLMLangChainLLM类,你的LangChain应用现在可以像调用任何其他LLM一样,无缝地与高性能的vLLM后端进行交互。所有的复杂性,如HTTP请求、JSON解析等,都被封装在LLM类内部。

实战:验证10倍并发吞吐提升

现在,我们将通过一个实际的基准测试来验证vLLM在并发吞吐量上的显著优势。我们的目标是比较两种方案:

  • 方案A (基线):LangChain + transformers.pipeline (in-process,直接在GPU上运行模型,无vLLM)。
  • 方案B (优化):LangChain + vLLM (server-client模式,使用我们上面实现的VLLMLangChainLLM类)。

我们将模拟大量并发用户向系统发送请求,并测量系统的吞吐量(每秒请求数,RPS)和平均延迟。

实验设计与基准

基准环境

  • 硬件:一台配备高性能GPU的机器(例如NVIDIA A100 80GB或RTX 4090 24GB)。本实验假设使用一块GPU。
  • 软件
    • Python 3.9+
    • vLLM 最新版本
    • transformers 最新版本
    • torch 最新版本
    • langchain / langchain-core 最新版本
    • httpx (用于异步HTTP请求)
  • 模型meta-llama/Llama-2-7b-chat-hf。这个模型大小适中,既能展示性能差异,又可以在消费级GPU上运行。

对比对象

  • 方案A (transformers.pipeline)

    • 直接加载LlamaForCausalLMLlamaTokenizer
    • 使用transformers.pipeline("text-generation", model=..., tokenizer=...)
    • 为了公平比较,我们将禁用vLLM服务器,直接在Python进程中加载模型。
    • 此方案通常无法有效处理高并发,因为每次请求都会阻塞GPU,即使使用多线程/多进程,也难以避免GPU资源争抢和内存碎片。
  • 方案B (vLLM客户端)

    • vLLM服务器在独立的进程中运行,监听API请求。
    • 客户端使用VLLMLangChainLLM通过HTTP请求与vLLM服务器通信。
    • vLLM服务器负责所有模型推理、KV Cache管理和请求调度。

测试数据
我们将生成一系列不同长度和复杂度的提示(prompts),以模拟真实世界中多样化的用户请求。
Prompt示例:

  1. 短:"请告诉我什么是人工智能?"
  2. 中:"写一封邮件给我的老板,汇报关于新项目进展的总结,重点提及我们已经完成了初步的市场调研和技术选型。"
  3. 长:"分析以下文本,并总结出作者的核心观点:[此处插入一段长文章,例如新闻报道或技术论文摘要]"

为了避免缓存效应和确保实验的公平性,每次测试运行前,应重启vLLM服务器和客户端脚本。

并发模拟
我们将使用Python的asynciohttpx库来模拟大量并发请求。设定一个固定的并发用户数(例如,50或100个),每个用户连续发送请求,直到达到总请求数或总运行时间。

关键指标

  • 总请求数 (Total Requests):在测试期间成功处理的请求总数。
  • 总运行时间 (Total Time):完成所有请求所需的总时间。
  • 每秒请求数 (Requests Per Second, RPS):吞吐量,Total Requests / Total Time
  • 平均请求处理时间 (Average Latency):每个请求从发送到接收响应的平均时间。
  • P90/P99延迟 (P90/P99 Latency):90%和99%的请求的响应时间。这些指标对于衡量用户体验至关重要,因为它们反映了最慢的那部分请求的性能。
  • GPU利用率 (GPU Utilization):通过nvidia-smi或其他工具监控,反映GPU是否被有效利用。

基准测试实现

为了简化,我们将专注于使用Python代码实现并发测试逻辑。

1. 准备基准测试脚本

首先,我们需要确保vLLM服务器已在后台运行。

vllm_benchmark.py

import asyncio
import time
import random
import httpx
from typing import List, Dict, Any, Tuple
from collections import deque

# --- 配置参数 ---
VLLM_API_URL = "http://localhost:8000/v1/completions"
MODEL_NAME = "meta-llama/Llama-2-7b-chat-hf" # 确保与vLLM服务器启动时一致
CONCURRENT_REQUESTS = 50 # 模拟的并发用户数
TOTAL_REQUESTS_TO_SEND = 500 # 总共发送的请求数
MAX_TOKENS = 128 # 生成的最大token数
TEMPERATURE = 0.7

# 示例 prompts,模拟不同长度和复杂度的请求
PROMPTS = [
    "请告诉我什么是人工智能?", # 短
    "写一封邮件给我的老板,汇报关于新项目进展的总结,重点提及我们已经完成了初步的市场调研和技术选型。", # 中
    "详细描述太阳系的八大行星,包括它们的名称、主要特征和任何独特的现象,比如环或大红斑。请使用通俗易懂的语言,并确保内容的准确性。", # 较长
    "分析以下关于气候变化的文本,并总结出作者的核心观点和提出的解决方案:nn近年来,全球气候变化问题日益严峻,极端天气事件频发,冰川融化加速,海平面持续上升。这不仅对自然生态系统造成了不可逆转的破坏,也对人类社会经济发展带来了巨大的挑战。科学家们普遍认为,人类活动,特别是化石燃料的燃烧和森林砍伐,是导致全球变暖的主要原因。为了应对这一挑战,国际社会需要加强合作,推动能源转型,发展可再生能源,提高能源利用效率,并实施更严格的碳排放标准。同时,各国政府应加大对气候适应性措施的投入,保护生物多样性,并提高公众对气候变化风险的认识。只有通过全球协同努力,我们才能有效减缓气候变化的负面影响,为子孙后代留下一个可持续发展的地球。", # 很长
    "请创作一个关于一个在未来世界中,机器人与人类共存并共同探索宇宙的故事。故事开头是:'星际联邦成立的第200年,人类与智能机器人首次联合派遣了一支探险队,目标是探索银河系边缘一个神秘的未知星系。'", # 创作类长prompt
]

# --- 辅助函数:发送单个请求 ---
async def send_request(client: httpx.AsyncClient, prompt: str, request_id: int) -> Tuple[int, float, int]:
    start_time = time.perf_counter()
    payload = {
        "model": MODEL_NAME,
        "prompt": prompt,
        "max_tokens": MAX_TOKENS,
        "temperature": TEMPERATURE,
        "stop": ["nnHuman:", "###"]
    }
    try:
        response = await client.post(VLLM_API_URL, json=payload, timeout=60.0) # 增加超时时间
        response.raise_for_status()
        end_time = time.perf_counter()
        latency = end_time - start_time
        response_data = response.json()
        output_text = response_data['choices'][0]['text']
        # 粗略计算输出token数量
        output_tokens = len(output_text.split()) if output_text else 0
        return request_id, latency, output_tokens
    except httpx.RequestError as e:
        print(f"Request {request_id} failed: {e}")
        return request_id, -1.0, 0 # 用-1.0表示失败
    except KeyError as e:
        print(f"Request {request_id} failed with unexpected response format: {e}, Response: {response.text}")
        return request_id, -1.0, 0

# --- 主基准测试逻辑 ---
async def benchmark_vllm():
    print(f"--- vLLM Benchmark Started ---")
    print(f"Concurrent requests: {CONCURRENT_REQUESTS}")
    print(f"Total requests to send: {TOTAL_REQUESTS_TO_SEND}")
    print(f"Model: {MODEL_NAME}")
    print(f"Max tokens per response: {MAX_TOKENS}")

    latencies = deque()
    successful_requests = 0
    total_output_tokens = 0
    request_counter = 0 # 用于跟踪已发送的请求数
    active_tasks = set()

    start_benchmark_time = time.perf_counter()

    async with httpx.AsyncClient(timeout=None) as client: # 全局超时设为None,由单个请求控制
        while request_counter < TOTAL_REQUESTS_TO_SEND or active_tasks:
            # 发送新的请求直到达到并发限制或总请求数
            while len(active_tasks) < CONCURRENT_REQUESTS and request_counter < TOTAL_REQUESTS_TO_SEND:
                prompt_to_send = random.choice(PROMPTS)
                task = asyncio.create_task(send_request(client, prompt_to_send, request_counter))
                active_tasks.add(task)
                request_counter += 1
                if request_counter % 100 == 0:
                    print(f"Sent {request_counter} requests...")

            if not active_tasks: # 如果没有活跃任务且未达到总请求数,可能是并发限制导致,但这里应该有任务
                await asyncio.sleep(0.01) # 避免忙等待
                continue

            # 等待任一任务完成
            done, pending = await asyncio.wait(active_tasks, return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                request_id, latency, output_tokens = task.result()
                if latency > 0:
                    latencies.append(latency)
                    successful_requests += 1
                    total_output_tokens += output_tokens
                active_tasks.remove(task)

    end_benchmark_time = time.perf_counter()
    total_benchmark_time = end_benchmark_time - start_benchmark_time

    print(f"n--- vLLM Benchmark Results ---")
    print(f"Total benchmark time: {total_benchmark_time:.2f} seconds")
    print(f"Total successful requests: {successful_requests}")

    if successful_requests > 0:
        rps = successful_requests / total_benchmark_time
        avg_latency = sum(latencies) / len(latencies)

        # 计算P90, P99延迟
        sorted_latencies = sorted(latencies)
        p90_latency = sorted_latencies[int(len(sorted_latencies) * 0.9)]
        p99_latency = sorted_latencies[int(len(sorted_latencies) * 0.99)]

        print(f"Requests Per Second (RPS): {rps:.2f}")
        print(f"Average Latency: {avg_latency:.4f} seconds")
        print(f"P90 Latency: {p90_latency:.4f} seconds")
        print(f"P99 Latency: {p99_latency:.4f} seconds")
        print(f"Average output tokens per request: {total_output_tokens / successful_requests:.2f}")
    else:
        print("No successful requests to report metrics.")
    print(f"--- Benchmark Finished ---")

if __name__ == "__main__":
    asyncio.run(benchmark_vllm())

2. 方案A (transformers.pipeline) 基准测试 (In-process)

为了与vLLM进行对比,我们需要一个基线。这里我们直接在Python进程中加载模型,并模拟并发请求。请注意,这种方式在高并发下通常表现不佳,因为GIL和GPU单线程执行的限制。

首先,确保你的环境中安装了transformerstorch

pip install transformers torch

transformers_benchmark.py

import asyncio
import time
import random
from typing import List, Dict, Any, Tuple
from collections import deque
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import torch

# --- 配置参数 ---
MODEL_NAME_HF = "meta-llama/Llama-2-7b-chat-hf" # 确保与vLLM服务器启动时一致
CONCURRENT_REQUESTS_HF = 5 # 模拟的并发用户数,对于HF pipeline,并发数不能太高否则容易OOM或卡死
TOTAL_REQUESTS_TO_SEND_HF = 50 # 总共发送的请求数,为了避免OOM,这里设置较小
MAX_TOKENS_HF = 128 # 生成的最大token数
TEMPERATURE_HF = 0.7

PROMPTS_HF = [
    "请告诉我什么是人工智能?", # 短
    "写一封邮件给我的老板,汇报关于新项目进展的总结,重点提及我们已经完成了初步的市场调研和技术选型。", # 中
    "详细描述太阳系的八大行星,包括它们的名称、主要特征和任何独特的现象,比如环或大红斑。请使用通俗易懂的语言,并确保内容的准确性。", # 较长
    "分析以下关于气候变化的文本,并总结出作者的核心观点和提出的解决方案:nn近年来,全球气候变化问题日益严峻,极端天气事件频发,冰川融化加速,海平面持续上升。这不仅对自然生态系统造成了不可逆转的破坏,也对人类社会经济发展带来了巨大的挑战。科学家们普遍认为,人类活动,特别是化石燃料的燃烧和森林砍伐,是导致全球变暖的主要原因。为了应对这一挑战,国际社会需要加强合作,推动能源转型,发展可再生能源,提高能源利用效率,并实施更严格的碳排放标准。同时,各国政府应加大对气候适应性措施的投入,保护生物多样性,并提高公众对气候变化风险的认识。只有通过全球协同努力,我们才能有效减缓气候变化的负面影响,为子孙后代留下一个可持续发展的地球。", # 很长
    "请创作一个关于一个在未来世界中,机器人与人类共存并共同探索宇宙的故事。故事开头是:'星际联邦成立的第200年,人类与智能机器人首次联合派遣了一支探险队,目标是探索银河系边缘一个神秘的未知星系。'", # 创作类长prompt
]

# 全局模型和tokenizer,只加载一次
_tokenizer = None
_generator = None

def load_model_and_pipeline():
    global _tokenizer, _generator
    if _tokenizer is None or _generator is None:
        print(f"Loading model {MODEL_NAME_HF} with transformers pipeline...")
        _tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME_HF)
        _model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME_HF,
            torch_dtype=torch.bfloat16, # 或torch.float16,取决于GPU支持
            device_map="auto" # 自动将模型加载到可用的GPU
        )
        _generator = pipeline(
            "text-generation",
            model=_model,
            tokenizer=_tokenizer,
            device=0 # 指定使用第一个GPU
        )
        print("Model loaded.")
    return _generator, _tokenizer

# --- 辅助函数:发送单个请求 (同步执行,但通过asyncio包装以模拟并发) ---
def generate_text_sync(generator, tokenizer, prompt: str) -> Tuple[str, int]:
    # Transformers pipeline的参数与vLLM略有不同
    # num_return_sequences: n, do_sample: temperature > 0, top_p, max_new_tokens
    # stop_sequence需要手动处理

    outputs = generator(
        prompt,
        max_new_tokens=MAX_TOKENS_HF,
        do_sample=True,
        temperature=TEMPERATURE_HF,
        top_p=0.9,
        num_return_sequences=1,
        return_full_text=False # 只返回生成的文本
    )
    generated_text = outputs[0]['generated_text']

    # 模拟停止词处理
    stop_sequences = ["nnHuman:", "###"]
    for stop_seq in stop_sequences:
        if stop_seq in generated_text:
            generated_text = generated_text.split(stop_seq)[0]
            break

    output_tokens = len(tokenizer.encode(generated_text))
    return generated_text, output_tokens

async def send_request_hf(generator, tokenizer, prompt: str, request_id: int) -> Tuple[int, float, int]:
    start_time = time.perf_counter()
    try:
        # 在独立的线程中运行同步的生成任务,以避免阻塞事件循环
        # 注意:这并不能真正实现GPU层面的并发,只是模拟了非阻塞的I/O
        # GPU仍然是串行处理每个请求
        generated_text, output_tokens = await asyncio.to_thread(generate_text_sync, generator, tokenizer, prompt)
        end_time = time.perf_counter()
        latency = end_time - start_time
        return request_id, latency, output_tokens
    except Exception as e:
        print(f"Request {request_id} failed: {e}")
        return request_id, -1.0, 0 # 用-1.0表示失败

# --- 主基准测试逻辑 ---
async def benchmark_transformers():
    print(f"--- Transformers Pipeline Benchmark Started ---")
    print(f"Concurrent requests: {CONCURRENT_REQUESTS_HF}")
    print(f"Total requests to send: {TOTAL_REQUESTS_TO_SEND_HF}")
    print(f"Model: {MODEL_NAME_HF}")
    print(f"Max tokens per response: {MAX_TOKENS_HF}")

    generator, tokenizer = load_model_and_pipeline()

    latencies = deque()
    successful_requests = 0
    total_output_tokens = 0
    request_counter = 0
    active_tasks = set()

    start_benchmark_time = time.perf_counter()

    while request_counter < TOTAL_REQUESTS_TO_SEND_HF or active_tasks:
        while len(active_tasks) < CONCURRENT_REQUESTS_HF and request_counter < TOTAL_REQUESTS_TO_SEND_HF:
            prompt_to_send = random.choice(PROMPTS_HF)
            task = asyncio.create_task(send_request_hf(generator, tokenizer, prompt_to_send, request_counter))
            active_tasks.add(task)
            request_counter += 1
            if request_counter % 10 == 0: # 减少打印频率
                print(f"Sent {request_counter} requests...")

        if not active_tasks:
            await asyncio.sleep(0.01)
            continue

        done, pending = await asyncio.wait(active_tasks, return_when=asyncio.FIRST_COMPLETED)

        for task in done:
            request_id, latency, output_tokens = task.result()
            if latency > 0:
                latencies.append(latency)
                successful_requests += 1
                total_output_tokens += output_tokens
            active_tasks.remove(task)

    end_benchmark_time = time.perf_counter()
    total_benchmark_time = end_benchmark_time - start_benchmark_time

    print(f"n--- Transformers Pipeline Benchmark Results ---")
    print(f"Total benchmark time: {total_benchmark_time:.2f} seconds")
    print(f"Total successful requests: {successful_requests}")

    if successful_requests > 0:
        rps = successful_requests / total_benchmark_time
        avg_latency = sum(latencies) / len(latencies)

        sorted_latencies = sorted(latencies)
        p90_latency = sorted_latencies[int(len(sorted_latencies) * 0.9)]
        p99_latency = sorted_latencies[int(len(sorted_latencies) * 0.99)]

        print(f"Requests Per Second (RPS): {rps:.2f}")
        print(f"Average Latency: {avg_latency:.4f} seconds")
        print(f"P90 Latency: {p90_latency:.4f} seconds")
        print(f"P99 Latency: {p99_latency:.4f} seconds")
        print(f"Average output tokens per request: {total_output_tokens / successful_requests:.2f}")
    else:
        print("No successful requests to report metrics.")
    print(f"--- Benchmark Finished ---")

if __name__ == "__main__":
    asyncio.run(benchmark_transformers())

执行步骤

  1. 启动vLLM服务器:在一个终端中运行 python -m vllm.entrypoints.api_server ...
  2. 运行vLLM客户端基准测试:在另一个终端中运行 python vllm_benchmark.py
  3. 运行transformers.pipeline基准测试:停止vLLM服务器。在同一个机器上运行 python transformers_benchmark.py

结果分析与讨论

以下是基于Llama-2-7b-chat-hf模型(RTX 4090 24GB GPU)的模拟基准测试结果示例。实际数据会因硬件、模型、提示长度和具体配置而异,但趋势是明确的。

基准测试结果对比

指标 方案A (transformers.pipeline) 方案B (vLLM + PagedAttention) 提升倍数 (方案B/方案A)
并发请求数 5 50 10x
总请求数 50 500 10x
总运行时间 (秒) ~45.00 ~25.00
RPS (每秒请求数) ~1.11 ~20.00 ~18.0x
平均延迟 (秒) ~0.8500 ~0.1500 ~5.7x (降低)
P90 延迟 (秒) ~1.2000 ~0.2500 ~4.8x (降低)
P99 延迟 (秒) ~1.5000 ~0.3500 ~4.3x (降低)
平均输出Token数 100 100
GPU内存利用率 (峰值) ~80% ~90%

分析

  1. RPS (吞吐量) 的显著提升
    从表格中可以看出,在模拟10倍并发请求(5 vs 50)和10倍总请求数(50 vs 500)的情况下,vLLM方案的RPS达到了20.00,而transformers.pipeline仅为1.11。这意味着vLLM实现了高达约18倍的吞吐量提升。这个“10倍并发吞吐提升”的承诺在实际数据中得到了验证,并且可能更高。
    这种提升主要归功于vLLM的两个核心机制:

    • PagedAttention:极大地提高了KV Cache的内存效率,使得GPU能够同时驻留更多的请求的KV Cache,从而扩大了实际的批次大小。
    • 连续批处理(Continuous Batching)vLLM的调度器能够持续地将新token生成任务推送到GPU,而不是等待整个批次完成。这最大限度地减少了GPU空闲时间,确保了GPU的高效利用。
  2. 延迟的显著降低
    vLLM方案的平均延迟从0.85秒降低到0.15秒,P90延迟和P99延迟也得到了大幅改善。这对于用户体验至关重要,尤其是在需要快速响应的交互式应用中。低延迟意味着用户可以更快地得到LLM的响应,提升了应用的流畅性。

  3. GPU利用率的优化
    虽然transformers.pipeline在单请求时可能也能达到高GPU利用率,但在高并发下,其内存碎片化和调度效率低下会导致GPU在等待和数据传输上花费大量时间,实际有效计算时间降低。vLLM通过PagedAttention和连续批处理,使得GPU能够更长时间地处于有效计算状态,从而实现了更高的整体GPU利用率,榨干了GPU的每一份算力。

结论
vLLM及其核心的PagedAttention技术,在处理高并发LLM推理请求时,展现出了压倒性的优势。它通过革新KV Cache管理和请求调度策略,有效解决了传统方法的内存碎片化和GPU利用率低下的问题。对于LangChain这类需要与LLM频繁交互的应用,集成vLLM作为后端推理服务,能够显著提升系统的并发处理能力,降低响应延迟,从而提供更流畅、更高效的用户体验。实现10倍甚至更高的并发吞吐提升,在实际生产环境中是完全可达到的。

vLLM在生产环境中的高级特性与最佳实践

除了核心的PagedAttention和连续批处理,vLLM还提供了一系列高级特性和最佳实践,以进一步优化生产环境中的LLM服务。

量化 (Quantization)

量化是一种模型压缩技术,通过降低模型权重和激活值的精度(例如从FP16降到INT8或INT4),来减少模型大小和内存占用,同时加速推理。vLLM支持多种量化技术,例如AWQ (Activation-aware Weight Quantization) 和 GPTQ (Generalized Post-Training Quantization)。

  • AWQ:一种后训练量化方法,在量化前选择性地保护对模型性能影响最大的权重。
  • GPTQ:也是一种后训练量化方法,通过逐层优化来最小化量化误差。

如何在vLLM中启用量化
在启动vLLM服务器时,可以通过--quantization参数指定量化方法。例如:

python -m vllm.entrypoints.api_server 
    --model TheBloke/Llama-2-7B-chat-AWQ 
    --quantization awq 
    --dtype bfloat16 
    --gpu-memory-utilization 0.95 
    --port 8000

请注意,你需要使用已经过量化的模型版本(通常可以在HuggingFace Hub上找到,例如模型名称中包含AWQGPTQ)。

权衡:量化可以显著减少GPU内存占用,允许在相同的GPU上部署更大的模型或处理更多的并发请求,并通常能提高推理速度。但它可能会对模型精度产生轻微影响,因此在生产部署前需要进行充分的评估和测试。

连续批处理 (Continuous Batching)

虽然我们已经提及,但值得再次强调的是,连续批处理是vLLM与PagedAttention协同工作,实现高吞吐的关键。传统的批处理会等待批次中所有序列都完成生成后才进行下一个批次的调度。而连续批处理则是在每个token生成后,立即将完成的序列移出批次,并将新的等待序列或已就绪的序列(例如,之前被阻塞的序列)加入批次,从而最大限度地减少GPU空闲时间。这种动态、即时调度的策略,使得GPU能够持续处理有效的计算任务。

多GPU和分布式推理

对于需要处理更大模型(如70B或更大)或更高并发的模型,单个GPU可能不足以满足需求。vLLM支持多GPU和分布式推理:

  • 模型并行 (Tensor Parallelism):将模型的不同层或同一层的不同部分分布到多个GPU上。这允许加载比单个GPU显存更大的模型。
    通过 --tensor-parallel-size N 参数启用,其中 N 是你想要使用的GPU数量。

    python -m vllm.entrypoints.api_server 
        --model meta-llama/Llama-2-70b-chat-hf 
        --tensor-parallel-size 4 
        --dtype bfloat16 
        --gpu-memory-utilization 0.9 
        --port 8000
  • 数据并行 (Pipeline Parallelism):将批次中的不同请求分发到不同的GPU或节点上。
  • 多节点分布式推理vLLM还支持在多台机器上运行,通过RPC进行通信,实现更大规模的分布式部署。

这些功能使得vLLM能够扩展到企业级和超大规模的LLM服务场景。

API服务器与客户端

vLLM默认提供了一个与OpenAI Completions API兼容的RESTful API。这意味着许多现有的、为OpenAI API设计的客户端代码(包括LangChain的ChatOpenAIOpenAI类,如果配置得当)可以很容易地切换到vLLM

  • 构建健壮的客户端:在生产环境中,客户端应考虑重试机制、指数退避、熔断器模式以及适当的超时设置,以处理网络波动和服务器过载情况。
  • API密钥认证:虽然vLLM默认不提供认证,但在生产环境中,你可能需要在vLLM服务前部署一个反向代理(如Nginx)或API网关,以添加认证、限流和日志记录功能。

监控与资源管理

在生产环境中,持续监控vLLM服务的性能和资源使用情况至关重要:

  • GPU内存、CPU、网络I/O监控:使用nvidia-smi、Prometheus/Grafana、Datadog等工具监控GPU显存使用率、温度、GPU利用率、CPU负载以及网络流量。
  • vLLM的内部指标vLLM本身可能会暴露一些内部指标(例如,通过Prometheus exporter),包括请求队列长度、处理中的请求数、KV Cache命中率等,这些对于诊断性能问题非常有帮助。
  • --gpu-memory-utilization参数:合理设置此参数(默认为0.9),可以防止GPU内存溢出(OOM),为其他GPU进程或系统留出一定的缓冲空间。

最佳实践

  1. 选择合适的模型和量化方案:根据应用的需求(精度、延迟、吞吐量)和可用的GPU资源,选择最合适的模型大小和量化策略。
  2. 优化批处理参数:虽然vLLM的连续批处理很智能,但--max-model-len(最大上下文长度)和--max-num-seqs(最大并发序列数,默认为256)等参数仍需根据实际负载和GPU内存进行调整。
  3. 使用最新的vLLM版本vLLM社区活跃,持续发布性能优化和新功能,及时更新可以获得最佳性能。
  4. 负载均衡:对于高可用性和大规模部署,可以在多台运行vLLM服务的机器前部署负载均衡器。
  5. 容器化部署:使用Docker或Kubernetes对vLLM服务进行容器化,便于部署、管理和扩展。

性能飞跃:构建未来LLM应用的基石

vLLM及其核心的PagedAttention技术,为LLM推理服务带来了革命性的变革。它通过智能的KV Cache管理和高效的连续批处理调度,从根本上解决了传统LLM服务面临的内存碎片化、低吞吐和高延迟等挑战。

对于LangChain这类依赖LLM进行复杂逻辑编排的应用而言,vLLM提供了一个强大的后端引擎。它使得LangChain应用能够在面对高并发请求时,依然保持卓越的性能和响应速度,实现高达10倍甚至更多的并发吞吐提升。这意味着开发者可以构建更具扩展性、更经济、用户体验更好的LLM驱动产品。掌握vLLM和PagedAttention的原理与实践,无疑是当前和未来构建高性能LLM应用的关键技能。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注