各位同仁,下午好!
今天,我们将深入探讨一个当下最热门、最具挑战性的技术领域——如何使用 Go 语言构建一个高性能的 LLM Serving 架构,并重点聚焦于如何高效管理千万级并发请求下的 KV-Cache 内存池。大型语言模型(LLM)的兴起,带来了前所未有的计算和内存挑战,而 Go 语言以其卓越的并发特性和简洁性,正成为解决这些挑战的有力工具。
引言:LLM 服务的高并发挑战
大型语言模型(LLM)在生成文本时,其核心计算是自回归的:模型根据当前已生成的序列预测下一个词元(token)。为了提高推理效率,尤其是当序列长度逐渐增加时,我们不能每次都重新计算整个序列中所有词元的 Key (K) 和 Value (V) 矩阵。这就是 KV-Cache 存在的意义。
KV-Cache 存储了模型在推理过程中,所有已处理词元在 Transformer 解码器层中的 K 和 V 矩阵。这样,在生成下一个词元时,模型只需计算新词元的 K 和 V,然后将其与之前缓存的 K 和 V 拼接起来,送入注意力机制。这显著减少了重复计算,是实现高效 LLM 推理的关键。
然而,KV-Cache 也带来了巨大的内存挑战:
- 内存占用巨大: 每次推理都需要为每个 Transformer 层存储 K 和 V 矩阵。对于一个拥有数百亿参数的模型,即使是单个请求,其 KV-Cache 也可能占用数百 MB 甚至数 GB 的内存。
- 动态增长: 随着生成序列的延长,KV-Cache 的大小会线性增长。
- 并发请求: 当面临千万级并发请求时(这里“千万级并发”指的是系统在一段时间内需要同时处理或维护状态的请求数量,即便不是所有请求在同一微秒内进行计算,它们也都需要占用 KV-Cache 资源),如果为每个请求独立分配 KV-Cache,内存将迅速耗尽,并可能导致严重的内存碎片。
因此,一个高效的 KV-Cache 内存管理系统,是构建高性能 LLM Serving 架构的基石。
Go 在高性能 LLM Serving 中的定位与优势
Go 语言可能不是你首先想到的用于深度学习推理的语言,通常人们会选择 Python 结合 CUDA/C++。然而,对于 LLM Serving 的控制平面和数据平面协调部分,Go 展现出独特的优势:
- 卓越的并发模型: Goroutine 和 Channel 是 Go 的杀手锏。它们使得编写高并发、非阻塞的代码变得异常简单和高效。在 LLM Serving 中,这对于处理大量并发请求、调度推理任务、管理 KV-Cache 内存池以及与底层 GPU 驱动进行通信至关重要。
- 高性能: Go 编译器生成的是原生机器码,运行时性能接近 C/C++。虽然 Go 的垃圾回收(GC)在某些极端低延迟场景下可能引入微小停顿,但通过精心设计内存结构和复用机制,可以有效规避或最小化其影响。
- 内存效率: Go 提供了对内存布局的良好控制,例如通过切片 (
[]byte) 可以直接操作连续内存区域。这对于实现内存池至关重要。 - 开发效率与可维护性: Go 语法简洁,工具链完善,使得开发、部署和维护复杂的分布式系统变得更加容易。
- 与C/C++及GPU的协作: Go 可以通过 CGo 轻松地与现有的 C/C++ 库(如 CUDA、cuBLAS、cuDNN 或 llama.cpp 等)进行交互,将计算密集型任务卸载到 GPU 上,同时利用 Go 来管理上层业务逻辑、请求调度和资源分配。
我们的目标是利用 Go 来构建一个智能的 KV-Cache 内存池,它能有效地调度和复用内存,以支持极高并发下的 LLM 推理。
KV-Cache 深度解析:结构、生命周期与内存足迹
在深入内存池设计之前,我们先回顾 KV-Cache 的基本概念。
LLM 推理的两个阶段
- 预填充(Prefill)阶段:
- 当用户提交一个提示词(prompt)时,模型会一次性处理这个提示词中的所有词元。
- 在这个阶段,模型会计算所有输入词元的 K 和 V 矩阵,并将其存储为 KV-Cache。
- 这是计算密集型阶段,通常涉及大矩阵乘法。
- 解码(Decode)阶段:
- 模型根据当前的 KV-Cache 预测下一个词元。
- 每预测一个词元,模型只计算这个新词元的 K 和 V 矩阵。
- 然后将这个新词元的 K 和 V 矩阵追加到现有的 KV-Cache 中。
- 这个过程重复进行,直到生成结束词元或达到最大长度。
KV-Cache 的数据结构
KV-Cache 的本质是一个或多个张量(Tensors),用于存储每个 Transformer 层中 Key 和 Value 的历史值。
假设一个模型有 num_layers 层,每层有 num_heads 个注意力头,每个头的维度是 head_dim。那么,对于一个长度为 seq_len 的序列,KV-Cache 的内存占用大致如下:
- Key 矩阵:
num_layers * num_heads * seq_len * head_dim * sizeof(dtype) - Value 矩阵:
num_layers * num_heads * seq_len * head_dim * sizeof(dtype)
通常,dtype 是 float16 或 bfloat16,因为它在精度和内存占用之间取得了良好平衡。
内存足迹估算示例:
- 模型层数
num_layers= 32 - 注意力头数
num_heads= 32 - 头维度
head_dim= 128 - 数据类型
dtype=float16(2字节)
对于一个 seq_len = 1024 的 KV-Cache:
内存占用 = 2 * num_layers * num_heads * seq_len * head_dim * sizeof(float16)
= 2 * 32 * 32 * 1024 * 128 * 2 字节
= 536,870,912 字节
= 512 MB
这仅仅是一个请求在序列长度为 1024 时的 KV-Cache 内存占用。当有数百万个这样的请求需要同时维护 KV-Cache 时,总内存需求将是天文数字。因此,有效的内存管理,尤其是通过内存池进行管理,变得刻不容缓。
千万级并发下 KV-Cache 内存池设计的核心原则
面对千万级并发,为每个请求动态分配 KV-Cache 会带来以下问题:
- 内存碎片化: 不同长度的 KV-Cache 频繁分配和释放会导致内存碎片,降低内存利用率,甚至导致分配失败。
- 分配开销: 操作系统级别的内存分配(
malloc/free)开销较大,在高并发下会成为性能瓶颈。 - GC 压力: Go 自身的 GC 对大量短生命周期的对象分配和回收会造成压力。
- 内存利用率低: 如果KV-Cache是连续分配的,当请求完成时,这块内存可能无法被其他请求完全利用。
因此,我们需要一个专门的 KV-Cache 内存池。其核心设计原则包括:
- 预分配大块内存: 在服务启动时或运行时根据需要,预先从操作系统申请一大块连续的内存区域,避免频繁的系统调用。
- 页面(Page)管理: 将预分配的大块内存划分为固定大小的“页面”(Pages)。一个 KV-Cache 请求不再直接占用连续的内存块,而是分配一系列页面。这类似于操作系统的虚拟内存管理,可以有效解决外部碎片问题。
- 高效的页面分配与回收: 内存池需要维护一个空闲页面列表,并能以 O(1) 或 O(logN) 的时间复杂度进行页面的分配和回收。
- 并发安全: 内存池的分配和回收操作必须是并发安全的,以支持多 Goroutine 同时访问。
- 内存复用: 当请求完成时,其占用的页面应立即被回收并重新添加到空闲列表中,供其他请求复用。
- 弹性扩缩容: 内存池应能根据负载动态调整其大小,例如在内存不足时尝试扩容,在长时间低负载时考虑缩容。
- 与调度器协同: 内存池应与请求调度器紧密配合,调度器在决定是否接受新请求或批处理请求时,需要考虑 KV-Cache 的内存可用性。
页面(Page)管理策略
页面管理是 KV-Cache 内存池的核心。我们不再以整个 KV-Cache 作为分配单元,而是以固定大小的页面作为分配单元。
KV-Cache 逻辑结构与物理存储映射:
| 逻辑结构 | 物理存储方式 | 优势 |
|---|---|---|
| KV-Cache Page (K) | 固定大小的 []byte 切片,存储 Key 矩阵的某一部分 |
便于管理、减少碎片、支持非连续分配 |
| KV-Cache Page (V) | 固定大小的 []byte 切片,存储 Value 矩阵的某一部分 |
同上 |
| Request KV-Cache | 由一系列 KV-Cache Page 组成,存储特定请求的 K 和 V 矩阵 | 灵活适应不同长度序列,无需连续大块内存 |
| KV-Cache Memory Pool | 维护所有空闲和已分配的 KV-Cache Page | 集中管理、高效复用、降低系统内存分配开销 |
一个 KV-Cache Page 的大小通常设计为能容纳一个 Transformer 层的一个或几个注意力头的 KV 矩阵的一部分,或者是一个与 GPU 内存对齐的块大小(例如 4KB、16KB、64KB)。
Go 实现高性能 KV-Cache 内存池的架构与细节
现在,让我们通过 Go 语言来具体实现这个 KV-Cache 内存池。
1. 基础内存单元:KVPage
我们定义一个 KVPage 结构体来代表内存池中的一个页面。每个页面都包含一个 []byte 切片,作为其底层内存。
package kvcache
import (
"fmt"
"sync/atomic"
"unsafe"
)
// PageID is a unique identifier for a KVPage
type PageID uint64
// KVPage represents a fixed-size memory block for KV-Cache.
type KVPage struct {
ID PageID // Unique identifier for this page
Data []byte // The actual memory buffer for this page
Capacity int // Total capacity of the Data slice
InUse atomic.Bool // Flag indicating if the page is currently allocated
RequestID string // ID of the request currently using this page (for debugging/tracking)
Timestamp int64 // Timestamp of last allocation/use (for LRU/LFU eviction, if needed)
}
// NewKVPage creates a new KVPage with a given ID and capacity.
// It pre-allocates the underlying byte slice.
func NewKVPage(id PageID, capacity int) *KVPage {
data := make([]byte, capacity) // Pre-allocate the memory
page := &KVPage{
ID: id,
Data: data,
Capacity: capacity,
}
page.InUse.Store(false) // Initially not in use
return page
}
// Reset clears the page's association with a request and marks it as free.
func (p *KVPage) Reset() {
p.InUse.Store(false)
p.RequestID = ""
// Optionally, zero out the memory for security/debugging, but can be slow.
// For performance, we usually rely on the next user to overwrite it.
// for i := range p.Data {
// p.Data[i] = 0
// }
}
// String provides a human-readable representation of the page.
func (p *KVPage) String() string {
return fmt.Sprintf("KVPage{ID: %d, Capacity: %d, InUse: %t, RequestID: %s, DataAddr: %p}",
p.ID, p.Capacity, p.InUse.Load(), p.RequestID, unsafe.Pointer(&p.Data[0]))
}
这里我们使用了 atomic.Bool 来并发安全地标记页面是否在使用中,避免了对 sync.Mutex 的频繁竞争,尤其是在读多写少的场景下。unsafe.Pointer 被用于打印内存地址,以便于调试,但在实际逻辑中应尽量避免直接使用。
2. 内存池管理:KVPagePool
KVPagePool 是内存池的核心。它将维护一个空闲页面的列表和一个已分配页面到请求 ID 的映射。
package kvcache
import (
"fmt"
"sync"
"time"
)
// KVPagePool manages a pool of KVPages.
type KVPagePool struct {
pageSize int // Fixed size of each KVPage in bytes
initialPages int // Initial number of pages to pre-allocate
maxPages int // Maximum number of pages the pool can grow to
currentPageID PageID // Counter for assigning unique page IDs
mu sync.Mutex // Mutex to protect access to freePages and allocatedPages
freePages []*KVPage // List of available pages
allocatedPages map[string][]*KVPage // Map from RequestID to the pages it owns
// Metrics
totalPages atomic.Uint64 // Total number of pages currently managed by the pool
allocatedCount atomic.Uint64 // Number of pages currently allocated to requests
freeCount atomic.Uint64 // Number of pages currently free
}
// NewKVPagePool creates and initializes a new KVPagePool.
func NewKVPagePool(pageSize, initialPages, maxPages int) (*KVPagePool, error) {
if pageSize <= 0 || initialPages <= 0 || maxPages < initialPages {
return nil, fmt.Errorf("invalid pool parameters: pageSize=%d, initialPages=%d, maxPages=%d", pageSize, initialPages, maxPages)
}
pool := &KVPagePool{
pageSize: pageSize,
initialPages: initialPages,
maxPages: maxPages,
currentPageID: 0,
freePages: make([]*KVPage, 0, initialPages),
allocatedPages: make(map[string][]*KVPage),
}
// Pre-allocate initial pages
for i := 0; i < initialPages; i++ {
pool.currentPageID++
page := NewKVPage(pool.currentPageID, pageSize)
pool.freePages = append(pool.freePages, page)
pool.totalPages.Add(1)
pool.freeCount.Add(1)
}
fmt.Printf("KVPagePool initialized: PageSize=%d, InitialPages=%d, MaxPages=%d, TotalMemory=%.2fMBn",
pageSize, initialPages, maxPages, float64(pageSize*initialPages)/(1024*1024))
return pool, nil
}
// AllocatePages attempts to allocate 'numPages' for a given request.
// It returns the allocated pages or an error if not enough memory is available.
func (p *KVPagePool) AllocatePages(requestID string, numPages int) ([]*KVPage, error) {
if numPages <= 0 {
return nil, fmt.Errorf("number of pages to allocate must be positive")
}
p.mu.Lock()
defer p.mu.Unlock()
// Check if enough free pages are available
if len(p.freePages) < numPages {
// Try to expand the pool if maxPages limit is not reached
pagesToGrow := numPages - len(p.freePages)
if p.totalPages.Load()+uint64(pagesToGrow) > uint64(p.maxPages) {
// Cannot allocate enough pages and cannot grow further
return nil, fmt.Errorf("KV-Cache memory exhausted for request %s: requested %d pages, available %d, pool max %d",
requestID, numPages, len(p.freePages), p.maxPages)
}
// Grow the pool
fmt.Printf("KVPagePool growing: current pages %d, adding %d new pages.n", p.totalPages.Load(), pagesToGrow)
for i := 0; i < pagesToGrow; i++ {
p.currentPageID++
page := NewKVPage(p.currentPageID, p.pageSize)
p.freePages = append(p.freePages, page)
p.totalPages.Add(1)
p.freeCount.Add(1)
}
}
// Allocate pages from the free list
allocated := make([]*KVPage, numPages)
for i := 0; i < numPages; i++ {
page := p.freePages[0] // Take from the front of the free list (FIFO-like)
p.freePages = p.freePages[1:]
page.InUse.Store(true)
page.RequestID = requestID
page.Timestamp = time.Now().UnixNano() // Record allocation time
allocated[i] = page
p.allocatedCount.Add(1)
p.freeCount.Add(^-uint64(0)) // Decrement safely
}
p.allocatedPages[requestID] = allocated
fmt.Printf("Allocated %d pages for request %s. Total allocated: %d, Total free: %dn",
numPages, requestID, p.allocatedCount.Load(), p.freeCount.Load())
return allocated, nil
}
// FreePages releases all pages associated with a given request ID.
func (p *KVPagePool) FreePages(requestID string) error {
p.mu.Lock()
defer p.mu.Unlock()
pages, exists := p.allocatedPages[requestID]
if !exists {
return fmt.Errorf("no pages found for request ID %s", requestID)
}
for _, page := range pages {
page.Reset() // Mark as free and clear request association
p.freePages = append(p.freePages, page) // Return to free list
p.allocatedCount.Add(^-uint64(0)) // Decrement safely
p.freeCount.Add(1)
}
delete(p.allocatedPages, requestID)
fmt.Printf("Freed %d pages for request %s. Total allocated: %d, Total free: %dn",
len(pages), requestID, p.allocatedCount.Load(), p.freeCount.Load())
return nil
}
// GetMetrics returns current pool metrics.
func (p *KVPagePool) GetMetrics() (totalPages, allocated, free uint64) {
return p.totalPages.Load(), p.allocatedCount.Load(), p.freeCount.Load()
}
// CheckAvailablePages returns the number of currently free pages.
func (p *KVPagePool) CheckAvailablePages() uint64 {
p.mu.Lock()
defer p.mu.Unlock()
return uint64(len(p.freePages))
}
设计考量:
sync.Mutex:KVPagePool的核心操作(AllocatePages和FreePages)通过sync.Mutex来保证并发安全。在高并发场景下,这可能成为瓶颈,但对于管理页面列表的临界区,这是最直接且安全的做法。更高级的优化可能涉及无锁数据结构,但这会增加复杂性。- 页面增长策略: 当空闲页面不足时,
AllocatePages会尝试根据maxPages限制扩容。实际生产中,扩容逻辑可能更复杂,例如批量扩容、热插拔内存块等。 - FIFO 页面分配:
freePages列表以 FIFO(先进先出)的方式分配页面。这有助于平均分配页面的使用寿命,但不会优化局部性。对于 GPU 内存,局部性可能不那么关键,因为数据会整体传输。 atomic.Uint64: 用于维护统计指标,可以在不加锁的情况下安全读取,提高监控效率。- 内存零化:
KVPage.Reset()中注释掉的内存零化操作,在安全性要求高(避免数据泄露)的场景下可能需要,但会带来性能开销。对于 LLM KV-Cache,通常下一个请求会直接覆盖这块内存,所以可以省略。
3. 请求与KV-Cache的关联:RequestState
在 LLM Serving 架构中,每个活跃的推理请求都需要维护其状态,包括已分配的 KV-Cache 页面。
package kvcache
import (
"fmt"
"sync/atomic"
"time"
)
// RequestState holds the state of an individual LLM inference request.
type RequestState struct {
ID string // Unique ID for this request
Prompt string // User's input prompt
MaxTokens int // Max tokens to generate
AllocatedPages []*KVPage // Pages allocated for this request's KV-Cache
CurrentLength int // Current sequence length (number of tokens generated + prompt tokens)
Status atomic.Pointer[string] // Current status of the request (e.g., "PENDING", "PROCESSING", "COMPLETED")
StartTime time.Time // Request start time
EndTime time.Time // Request end time
ModelName string // Model being used
}
// NewRequestState creates a new RequestState.
func NewRequestState(id, prompt, modelName string, maxTokens int) *RequestState {
status := "PENDING"
rs := &RequestState{
ID: id,
Prompt: prompt,
MaxTokens: maxTokens,
ModelName: modelName,
StartTime: time.Now(),
}
rs.Status.Store(&status)
return rs
}
// GetRequiredPages calculates the number of pages needed for a given sequence length.
// This is a simplified calculation. In reality, it depends on model architecture,
// head_dim, num_heads, num_layers, and page size.
// For demonstration, let's assume one page can hold KV for N tokens.
func (rs *RequestState) GetRequiredPages(seqLen int, tokensPerPage int) int {
if tokensPerPage <= 0 {
tokensPerPage = 1 // Avoid division by zero
}
// Round up to the nearest page
return (seqLen + tokensPerPage - 1) / tokensPerPage
}
// UpdateLength updates the current sequence length and potentially requests more pages.
// This function would typically be called by the inference engine during decode phase.
func (rs *RequestState) UpdateLength(newLength int, tokensPerPage int, pool *KVPagePool) error {
if newLength <= rs.CurrentLength {
return nil // Length did not increase or decreased (should not happen)
}
// Calculate pages needed for the new length
requiredPages := rs.GetRequiredPages(newLength, tokensPerPage)
// If more pages are needed, try to allocate them
if requiredPages > len(rs.AllocatedPages) {
pagesToAllocate := requiredPages - len(rs.AllocatedPages)
fmt.Printf("Request %s needs more pages: current %d, required %d. Allocating %d new pages.n",
rs.ID, len(rs.AllocatedPages), requiredPages, pagesToAllocate)
newlyAllocated, err := pool.AllocatePages(rs.ID, pagesToAllocate)
if err != nil {
status := fmt.Sprintf("FAILED: %s", err.Error())
rs.Status.Store(&status)
return fmt.Errorf("failed to allocate additional pages for request %s: %w", rs.ID, err)
}
rs.AllocatedPages = append(rs.AllocatedPages, newlyAllocated...)
}
rs.CurrentLength = newLength
return nil
}
// MarkCompleted marks the request as completed and frees its pages.
func (rs *RequestState) MarkCompleted(pool *KVPagePool) error {
status := "COMPLETED"
rs.Status.Store(&status)
rs.EndTime = time.Now()
if len(rs.AllocatedPages) > 0 {
err := pool.FreePages(rs.ID)
if err != nil {
fmt.Printf("Error freeing pages for request %s: %vn", rs.ID, err)
return err
}
rs.AllocatedPages = nil // Clear reference
}
return nil
}
RequestState 存储了请求的 KV-Cache 页面列表。当请求的序列长度增加时,UpdateLength 方法会计算所需的页面数量,并向 KVPagePool 申请额外的页面。当请求完成时,MarkCompleted 方法会释放所有页面。
4. 模拟请求流程
现在我们可以将这些组件组合起来,模拟一个高并发的请求处理流程。
package kvcache
import (
"fmt"
"math/rand"
"strconv"
"sync"
"time"
)
// Simulate an LLM serving controller that manages requests and the KV-Cache pool.
type LLMServingController struct {
pool *KVPagePool
activeRequests sync.Map // Map[string]*RequestState
tokensPerPage int // Number of tokens whose KV-Cache can fit into one page
mu sync.Mutex // Protects access to some controller-level state
}
func NewLLMServingController(pool *KVPagePool, tokensPerPage int) *LLMServingController {
return &LLMServingController{
pool: pool,
tokensPerPage: tokensPerPage,
activeRequests: sync.Map{},
}
}
// HandleNewRequest simulates receiving a new LLM inference request.
func (c *LLMServingController) HandleNewRequest(prompt string, maxTokens int) (string, error) {
requestID := "req-" + strconv.FormatInt(time.Now().UnixNano(), 10) + "-" + strconv.Itoa(rand.Intn(1000))
requestState := NewRequestState(requestID, prompt, "llama-7b", maxTokens)
initialSeqLen := len(prompt) / 5 // Very rough estimate of tokens from prompt chars
requiredPages := requestState.GetRequiredPages(initialSeqLen, c.tokensPerPage)
// Try to allocate initial pages for prefill
allocatedPages, err := c.pool.AllocatePages(requestID, requiredPages)
if err != nil {
fmt.Printf("Failed to allocate initial pages for request %s: %vn", requestID, err)
return "", err
}
requestState.AllocatedPages = allocatedPages
requestState.CurrentLength = initialSeqLen
c.activeRequests.Store(requestID, requestState)
status := "PROCESSING"
requestState.Status.Store(&status)
fmt.Printf("Request %s started. Initial pages: %d. Prompt: '%s'n", requestID, len(allocatedPages), prompt)
// Start a goroutine to simulate inference and token generation
go c.simulateInference(requestState)
return requestID, nil
}
// simulateInference simulates the LLM inference process for a single request.
func (c *LLMServingController) simulateInference(rs *RequestState) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Panic in inference goroutine for request %s: %vn", rs.ID, r)
status := fmt.Sprintf("CRASHED: %v", r)
rs.Status.Store(&status)
c.pool.FreePages(rs.ID) // Attempt to free pages even on crash
c.activeRequests.Delete(rs.ID)
}
}()
currentLength := rs.CurrentLength
for i := 0; i < rs.MaxTokens; i++ {
// Simulate token generation and KV-Cache update
time.Sleep(time.Duration(rand.Intn(50)+10) * time.Millisecond) // Simulate decode time
currentLength++
err := rs.UpdateLength(currentLength, c.tokensPerPage, c.pool)
if err != nil {
fmt.Printf("Request %s failed during decode: %vn", rs.ID, err)
break
}
// fmt.Printf("Request %s generated token %d, current length %d, pages: %dn", rs.ID, i+1, currentLength, len(rs.AllocatedPages))
if currentLength >= 2048 { // Simulate reaching max context length for model
fmt.Printf("Request %s reached max context length.n", rs.ID)
break
}
}
// Request completed or failed
err := rs.MarkCompleted(c.pool)
if err != nil {
fmt.Printf("Error marking request %s completed: %vn", rs.ID, err)
}
c.activeRequests.Delete(rs.ID)
fmt.Printf("Request %s completed. Total tokens: %d. Duration: %vn", rs.ID, rs.CurrentLength - (len(rs.Prompt)/5), time.Since(rs.StartTime))
}
// GetActiveRequestCount returns the number of active requests.
func (c *LLMServingController) GetActiveRequestCount() int {
count := 0
c.activeRequests.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}
// RunSimulation demonstrates the KV-Cache pool in action.
func RunSimulation() {
// Define KV-Cache page size (e.g., 64KB) and pool limits
// This page size should be chosen carefully based on model architecture and GPU memory alignment.
pageSize := 64 * 1024 // 64 KB per page
initialPages := 1024 // 1024 * 64KB = 64MB initial pool
maxPages := 16384 // 16384 * 64KB = 1GB max pool size
// Estimate how many tokens' KV-Cache can fit in one page
// Example: 32 layers, 32 heads, 128 head_dim, float16 (2 bytes)
// Mem per token = 2 * 32 * 32 * 128 * 2 = 524288 bytes = 0.5 MB
// So, one 64KB page can hold KV-Cache for 65536 / 524288 ~= 0.125 tokens.
// This means a page is very small relative to a single token's KV-Cache.
// Let's adjust for a more realistic scenario where a page holds multiple tokens' KV.
// For example, if a page is 64KB, and one token's KV for a layer is 2 * num_heads * head_dim * sizeof(float16).
// Let's simplify: assume one page can hold KV for 16 tokens for all layers.
tokensPerPage := 16 // Simplified: 16 tokens' KV-Cache per 64KB page
pool, err := NewKVPagePool(pageSize, initialPages, maxPages)
if err != nil {
fmt.Fatalf("Failed to create KVPagePool: %vn", err)
}
controller := NewLLMServingController(pool, tokensPerPage)
// Simulate high concurrency
numConcurrentRequests := 5000 // Simulate 5000 concurrent active requests
var wg sync.WaitGroup
for i := 0; i < numConcurrentRequests; i++ {
wg.Add(1)
go func(reqNum int) {
defer wg.Done()
prompt := fmt.Sprintf("Tell me a story about a brave knight named Sir %d who fought a dragon.", reqNum)
maxTokens := rand.Intn(500) + 50 // Generate between 50 and 550 tokens
_, err := controller.HandleNewRequest(prompt, maxTokens)
if err != nil {
// Handle request failure (e.g., queue it, return error to client)
// fmt.Printf("Request %d failed to start: %vn", reqNum, err)
}
}(i)
// Introduce some delay to simulate requests arriving over time
if i%100 == 0 {
time.Sleep(10 * time.Millisecond)
}
}
// Monitor pool status periodically
done := make(chan struct{})
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
totalPages, allocated, free := pool.GetMetrics()
activeReqs := controller.GetActiveRequestCount()
fmt.Printf("[Monitor] Active Requests: %d, KV-Cache Pool: Total Pages=%d, Allocated Pages=%d (%.2fMB), Free Pages=%d (%.2fMB)n",
activeReqs, totalPages, allocated, float64(allocated*uint64(pageSize))/(1024*1024), free, float64(free*uint64(pageSize))/(1024*1024))
case <-done:
return
}
}
}()
wg.Wait() // Wait for all simulated requests to complete
close(done)
fmt.Println("All simulated requests completed.")
totalPages, allocated, free := pool.GetMetrics()
fmt.Printf("[Final Monitor] Total Pages=%d, Allocated Pages=%d, Free Pages=%dn", totalPages, allocated, free)
}
运行 RunSimulation() 的 main 函数:
package main
import "kvcache" // Assuming kvcache package is in a 'kvcache' directory
func main() {
kvcache.RunSimulation()
}
这个模拟程序展示了 KV-Cache 内存池如何在高并发下工作:
LLMServingController接收新请求,并为它们分配初始 KV-Cache 页面。- 每个请求在独立的 Goroutine 中模拟推理过程,随着序列长度的增长,可能会向内存池请求更多页面。
- 请求完成后,其占用的页面被释放回内存池。
sync.Map用于并发安全地存储活跃请求的状态。- 监控 Goroutine 实时报告内存池和请求状态。
调度器与KV-Cache内存池的协同
在实际的高性能 LLM Serving 架构中,一个智能的请求调度器是不可或缺的。它与 KV-Cache 内存池紧密协同,共同决定哪些请求可以被处理,以及如何有效地批处理它们。
1. 请求调度与Batching
- 动态批处理(Dynamic Batching): 将多个等待中的请求合并成一个批次进行推理。这样可以提高 GPU 利用率,因为大批量计算通常比小批量更有效率。调度器在创建批次时,需要考虑批次中所有请求的当前 KV-Cache 长度,并预估所需的页面数量。
- 连续批处理(Continuous Batching): 允许在批处理推理过程中,动态添加新请求或移除已完成请求,进一步优化 GPU 吞吐量和延迟。
- 内存感知调度: 调度器在决定是否将新请求加入批次或启动新推理任务时,应查询 KV-Cache 内存池的可用页面数量。如果内存不足,请求可能需要等待,或者被拒绝。
2. 内存预留与弹性扩缩容
- 预留策略: 调度器可以根据预测的请求峰值,向内存池预留一部分页面,以确保关键请求或高优先级请求能够及时获得内存。
- 弹性扩缩容: 内存池的
AllocatePages方法中包含了简单的扩容逻辑。在更复杂的场景下,扩容可以由独立的监控服务触发,根据内存水位线、历史使用模式和预测负载进行。缩容则可以在长时间低负载时进行,将空闲页面归还给操作系统,释放资源。
3. 如何处理内存不足
当 KV-Cache 内存池耗尽时,调度器有几种策略:
- 等待: 将新请求放入等待队列,直到有足够的内存页面被释放。
- 拒绝: 直接拒绝新请求,返回错误给客户端。这通常是最后的手段,但在极端负载下是必要的。
- 驱逐(Eviction): 如果内存紧张,调度器可以根据 LRU (Least Recently Used)、LFU (Least Frequently Used) 或 FIFO (First In, First Out) 等策略,驱逐(Swap Out)不活跃或低优先级的请求的 KV-Cache 页面到主机内存(如果它们当前在 GPU 上),或直接将其从池中移除(如果请求可以重新计算)。
进阶优化与挑战
1. 内存碎片化
- 内部碎片: 如果 KV-Cache Page 的大小固定,但一个请求所需的 KV-Cache 实际大小不足一个页面整数倍时,会产生内部碎片。选择合适的页面大小是关键,需要在减少内部碎片和管理复杂性之间取得平衡。
- 外部碎片: 我们的页面管理策略已经大大缓解了外部碎片问题,因为我们分配的是固定大小的页面,而不是任意大小的连续块。
2. Go GC的考量
Go 的垃圾回收器在处理大量 Go 对象(如 *KVPage 指针)时,可能会引入短暂停顿。我们的设计通过以下方式最小化 GC 影响:
- 预分配:
KVPagePool在启动时预分配了所有KVPage对象及其底层[]byte切片。 - 复用:
KVPage对象本身被复用,而不是频繁创建和销毁。freePages列表存储的是*KVPage指针,这些指针指向的内存是长期存在的。 - 避免大量小对象: 核心数据(KV-Cache本身)存储在大的
[]byte切片中,这些切片由 Go 运行时管理,但由于它们是大的、长生命周期的对象,GC 对其的扫描和处理开销相对较小。
3. 跨设备/多GPU内存管理
对于超大规模的 LLM 服务,单个 GPU 的内存可能不足以容纳所有 KV-Cache。这需要更复杂的跨设备内存管理策略:
- KV-Cache Paging: 类似于操作系统内存分页,将不活跃的 KV-Cache 页面从 GPU 内存交换到主机(CPU)内存,甚至硬盘,并在需要时再交换回来。这增加了延迟,但扩大了总容量。
- 分布式 KV-Cache: 在多台机器、多 GPU 上分布 KV-Cache。这引入了网络通信和数据一致性问题,需要一个分布式 KV-Cache 服务。
4. 缓存驱逐策略
当内存池达到容量上限且无法扩容时,必须驱逐一些 KV-Cache 来为新请求腾出空间。
- LRU (Least Recently Used): 驱逐最长时间未使用的 KV-Cache。可以通过
KVPage.Timestamp字段实现。 - LFU (Least Frequently Used): 驱逐使用频率最低的 KV-Cache。需要额外计数器。
- FIFO (First In, First Out): 驱逐最早进入内存池的 KV-Cache。我们的
freePages列表本质上是 FIFO。
这些策略通常由调度器结合请求优先级、剩余生成长度等因素综合决策。
5. 监控与诊断
强大的监控系统是高性能服务的生命线。我们需要:
- 内存池指标: 总页面数、已分配页面数、空闲页面数、内存使用率。
- 请求指标: 活跃请求数、等待队列长度、请求平均处理时间、KV-Cache 平均大小。
- 性能指标: KV-Cache 分配/回收的 P99 延迟、GC 暂停时间。
这些指标可以帮助我们识别瓶颈、调整内存池参数和优化调度策略。
架构概览:Go LLM Serving 的整体视图
将上述 KV-Cache 内存池集成到一个完整的 Go LLM Serving 架构中,其高层视图如下:
- API 网关/客户端 API 层: 接收用户请求 (HTTP/gRPC),进行鉴权、限流,并将请求路由到后端服务。
- 请求处理与路由: Go 服务负责解析请求,将其转换为内部任务,并根据模型、优先级、负载等因素,将任务路由到合适的推理工作节点。
- 调度器 (Scheduler): Go 核心组件,负责:
- 维护所有活跃请求的状态。
- 与 KV-Cache 管理器交互,获取内存可用性。
- 根据内存、GPU 资源和请求优先级,决定批处理哪些请求。
- 将批处理后的推理任务发送给模型推理引擎。
- KV-Cache 管理器 (KV-Cache Manager): 本文的核心,由 Go 实现。
- 管理
KVPagePool。 - 响应调度器和模型推理引擎的内存分配/回收请求。
- 维护页面到请求的映射。
- 提供内存池的监控指标。
- 管理
- 模型推理引擎 (Model Inference Engine): 通常是与 GPU 紧密结合的 C++/CUDA 后端,例如基于
llama.cpp或 NVIDIA Triton Inference Server。- 接收调度器发送的批处理任务。
- 执行 LLM 推理(预填充和解码)。
- 在推理过程中,向 KV-Cache 管理器请求新的 KV-Cache 页面(通过 CGo 或 IPC)。
- 将生成的词元返回给调度器。
- 结果聚合与流式传输: 调度器接收到生成的词元后,将其与请求关联,并可能通过流式 API (Server-Sent Events, gRPC Stream) 将结果实时返回给客户端。
这个架构中,Go 语言扮演了“指挥官”的角色,它不直接执行繁重的矩阵运算,而是高效地协调整个推理流程,尤其是在高并发下的资源管理,使得底层 GPU 资源能够得到最大化利用。
总结
今天我们详细探讨了在 Go 语言中实现高性能 LLM Serving 架构的关键挑战之一:千万级并发下的 KV-Cache 内存池管理。我们深入分析了 KV-Cache 的重要性、内存足迹,并提出了一种基于页面分配的内存池设计。通过 Go 语言的并发特性和内存管理能力,我们构建了一个能够高效预分配、复用和回收 KV-Cache 页面的系统。
这种内存池设计不仅能够显著降低内存碎片和分配开销,还能有效应对高并发带来的内存压力。结合智能的调度器和底层的 GPU 推理引擎,Go 能够构建出稳定、高效且可扩展的 LLM 服务。未来的工作将集中在更精细的内存调度策略、跨设备 KV-Cache 管理以及与 Go GC 的深度优化上,以应对不断增长的 LLM 规模和用户需求。