解析 Go 实现的高性能 LLM Serving 架构:如何管理千万级并发的 KV-Cache 内存池?

各位同仁,下午好!

今天,我们将深入探讨一个当下最热门、最具挑战性的技术领域——如何使用 Go 语言构建一个高性能的 LLM Serving 架构,并重点聚焦于如何高效管理千万级并发请求下的 KV-Cache 内存池。大型语言模型(LLM)的兴起,带来了前所未有的计算和内存挑战,而 Go 语言以其卓越的并发特性和简洁性,正成为解决这些挑战的有力工具。

引言:LLM 服务的高并发挑战

大型语言模型(LLM)在生成文本时,其核心计算是自回归的:模型根据当前已生成的序列预测下一个词元(token)。为了提高推理效率,尤其是当序列长度逐渐增加时,我们不能每次都重新计算整个序列中所有词元的 Key (K) 和 Value (V) 矩阵。这就是 KV-Cache 存在的意义。

KV-Cache 存储了模型在推理过程中,所有已处理词元在 Transformer 解码器层中的 K 和 V 矩阵。这样,在生成下一个词元时,模型只需计算新词元的 K 和 V,然后将其与之前缓存的 K 和 V 拼接起来,送入注意力机制。这显著减少了重复计算,是实现高效 LLM 推理的关键。

然而,KV-Cache 也带来了巨大的内存挑战:

  1. 内存占用巨大: 每次推理都需要为每个 Transformer 层存储 K 和 V 矩阵。对于一个拥有数百亿参数的模型,即使是单个请求,其 KV-Cache 也可能占用数百 MB 甚至数 GB 的内存。
  2. 动态增长: 随着生成序列的延长,KV-Cache 的大小会线性增长。
  3. 并发请求: 当面临千万级并发请求时(这里“千万级并发”指的是系统在一段时间内需要同时处理或维护状态的请求数量,即便不是所有请求在同一微秒内进行计算,它们也都需要占用 KV-Cache 资源),如果为每个请求独立分配 KV-Cache,内存将迅速耗尽,并可能导致严重的内存碎片。

因此,一个高效的 KV-Cache 内存管理系统,是构建高性能 LLM Serving 架构的基石。

Go 在高性能 LLM Serving 中的定位与优势

Go 语言可能不是你首先想到的用于深度学习推理的语言,通常人们会选择 Python 结合 CUDA/C++。然而,对于 LLM Serving 的控制平面数据平面协调部分,Go 展现出独特的优势:

  1. 卓越的并发模型: Goroutine 和 Channel 是 Go 的杀手锏。它们使得编写高并发、非阻塞的代码变得异常简单和高效。在 LLM Serving 中,这对于处理大量并发请求、调度推理任务、管理 KV-Cache 内存池以及与底层 GPU 驱动进行通信至关重要。
  2. 高性能: Go 编译器生成的是原生机器码,运行时性能接近 C/C++。虽然 Go 的垃圾回收(GC)在某些极端低延迟场景下可能引入微小停顿,但通过精心设计内存结构和复用机制,可以有效规避或最小化其影响。
  3. 内存效率: Go 提供了对内存布局的良好控制,例如通过切片 ([]byte) 可以直接操作连续内存区域。这对于实现内存池至关重要。
  4. 开发效率与可维护性: Go 语法简洁,工具链完善,使得开发、部署和维护复杂的分布式系统变得更加容易。
  5. 与C/C++及GPU的协作: Go 可以通过 CGo 轻松地与现有的 C/C++ 库(如 CUDA、cuBLAS、cuDNN 或 llama.cpp 等)进行交互,将计算密集型任务卸载到 GPU 上,同时利用 Go 来管理上层业务逻辑、请求调度和资源分配。

我们的目标是利用 Go 来构建一个智能的 KV-Cache 内存池,它能有效地调度和复用内存,以支持极高并发下的 LLM 推理。

KV-Cache 深度解析:结构、生命周期与内存足迹

在深入内存池设计之前,我们先回顾 KV-Cache 的基本概念。

LLM 推理的两个阶段

  1. 预填充(Prefill)阶段:
    • 当用户提交一个提示词(prompt)时,模型会一次性处理这个提示词中的所有词元。
    • 在这个阶段,模型会计算所有输入词元的 K 和 V 矩阵,并将其存储为 KV-Cache。
    • 这是计算密集型阶段,通常涉及大矩阵乘法。
  2. 解码(Decode)阶段:
    • 模型根据当前的 KV-Cache 预测下一个词元。
    • 每预测一个词元,模型只计算这个新词元的 K 和 V 矩阵。
    • 然后将这个新词元的 K 和 V 矩阵追加到现有的 KV-Cache 中。
    • 这个过程重复进行,直到生成结束词元或达到最大长度。

KV-Cache 的数据结构

KV-Cache 的本质是一个或多个张量(Tensors),用于存储每个 Transformer 层中 Key 和 Value 的历史值。

假设一个模型有 num_layers 层,每层有 num_heads 个注意力头,每个头的维度是 head_dim。那么,对于一个长度为 seq_len 的序列,KV-Cache 的内存占用大致如下:

  • Key 矩阵: num_layers * num_heads * seq_len * head_dim * sizeof(dtype)
  • Value 矩阵: num_layers * num_heads * seq_len * head_dim * sizeof(dtype)

通常,dtypefloat16bfloat16,因为它在精度和内存占用之间取得了良好平衡。

内存足迹估算示例:

  • 模型层数 num_layers = 32
  • 注意力头数 num_heads = 32
  • 头维度 head_dim = 128
  • 数据类型 dtype = float16 (2字节)

对于一个 seq_len = 1024 的 KV-Cache:

内存占用 = 2 * num_layers * num_heads * seq_len * head_dim * sizeof(float16)
= 2 * 32 * 32 * 1024 * 128 * 2 字节
= 536,870,912 字节
= 512 MB

这仅仅是一个请求在序列长度为 1024 时的 KV-Cache 内存占用。当有数百万个这样的请求需要同时维护 KV-Cache 时,总内存需求将是天文数字。因此,有效的内存管理,尤其是通过内存池进行管理,变得刻不容缓。

千万级并发下 KV-Cache 内存池设计的核心原则

面对千万级并发,为每个请求动态分配 KV-Cache 会带来以下问题:

  1. 内存碎片化: 不同长度的 KV-Cache 频繁分配和释放会导致内存碎片,降低内存利用率,甚至导致分配失败。
  2. 分配开销: 操作系统级别的内存分配(malloc/free)开销较大,在高并发下会成为性能瓶颈。
  3. GC 压力: Go 自身的 GC 对大量短生命周期的对象分配和回收会造成压力。
  4. 内存利用率低: 如果KV-Cache是连续分配的,当请求完成时,这块内存可能无法被其他请求完全利用。

因此,我们需要一个专门的 KV-Cache 内存池。其核心设计原则包括:

  1. 预分配大块内存: 在服务启动时或运行时根据需要,预先从操作系统申请一大块连续的内存区域,避免频繁的系统调用。
  2. 页面(Page)管理: 将预分配的大块内存划分为固定大小的“页面”(Pages)。一个 KV-Cache 请求不再直接占用连续的内存块,而是分配一系列页面。这类似于操作系统的虚拟内存管理,可以有效解决外部碎片问题。
  3. 高效的页面分配与回收: 内存池需要维护一个空闲页面列表,并能以 O(1) 或 O(logN) 的时间复杂度进行页面的分配和回收。
  4. 并发安全: 内存池的分配和回收操作必须是并发安全的,以支持多 Goroutine 同时访问。
  5. 内存复用: 当请求完成时,其占用的页面应立即被回收并重新添加到空闲列表中,供其他请求复用。
  6. 弹性扩缩容: 内存池应能根据负载动态调整其大小,例如在内存不足时尝试扩容,在长时间低负载时考虑缩容。
  7. 与调度器协同: 内存池应与请求调度器紧密配合,调度器在决定是否接受新请求或批处理请求时,需要考虑 KV-Cache 的内存可用性。

页面(Page)管理策略

页面管理是 KV-Cache 内存池的核心。我们不再以整个 KV-Cache 作为分配单元,而是以固定大小的页面作为分配单元。

KV-Cache 逻辑结构与物理存储映射:

逻辑结构 物理存储方式 优势
KV-Cache Page (K) 固定大小的 []byte 切片,存储 Key 矩阵的某一部分 便于管理、减少碎片、支持非连续分配
KV-Cache Page (V) 固定大小的 []byte 切片,存储 Value 矩阵的某一部分 同上
Request KV-Cache 由一系列 KV-Cache Page 组成,存储特定请求的 K 和 V 矩阵 灵活适应不同长度序列,无需连续大块内存
KV-Cache Memory Pool 维护所有空闲和已分配的 KV-Cache Page 集中管理、高效复用、降低系统内存分配开销

一个 KV-Cache Page 的大小通常设计为能容纳一个 Transformer 层的一个或几个注意力头的 KV 矩阵的一部分,或者是一个与 GPU 内存对齐的块大小(例如 4KB、16KB、64KB)。

Go 实现高性能 KV-Cache 内存池的架构与细节

现在,让我们通过 Go 语言来具体实现这个 KV-Cache 内存池。

1. 基础内存单元:KVPage

我们定义一个 KVPage 结构体来代表内存池中的一个页面。每个页面都包含一个 []byte 切片,作为其底层内存。

package kvcache

import (
    "fmt"
    "sync/atomic"
    "unsafe"
)

// PageID is a unique identifier for a KVPage
type PageID uint64

// KVPage represents a fixed-size memory block for KV-Cache.
type KVPage struct {
    ID        PageID    // Unique identifier for this page
    Data      []byte    // The actual memory buffer for this page
    Capacity  int       // Total capacity of the Data slice
    InUse     atomic.Bool // Flag indicating if the page is currently allocated
    RequestID string    // ID of the request currently using this page (for debugging/tracking)
    Timestamp int64     // Timestamp of last allocation/use (for LRU/LFU eviction, if needed)
}

// NewKVPage creates a new KVPage with a given ID and capacity.
// It pre-allocates the underlying byte slice.
func NewKVPage(id PageID, capacity int) *KVPage {
    data := make([]byte, capacity) // Pre-allocate the memory
    page := &KVPage{
        ID:       id,
        Data:     data,
        Capacity: capacity,
    }
    page.InUse.Store(false) // Initially not in use
    return page
}

// Reset clears the page's association with a request and marks it as free.
func (p *KVPage) Reset() {
    p.InUse.Store(false)
    p.RequestID = ""
    // Optionally, zero out the memory for security/debugging, but can be slow.
    // For performance, we usually rely on the next user to overwrite it.
    // for i := range p.Data {
    //  p.Data[i] = 0
    // }
}

// String provides a human-readable representation of the page.
func (p *KVPage) String() string {
    return fmt.Sprintf("KVPage{ID: %d, Capacity: %d, InUse: %t, RequestID: %s, DataAddr: %p}",
        p.ID, p.Capacity, p.InUse.Load(), p.RequestID, unsafe.Pointer(&p.Data[0]))
}

这里我们使用了 atomic.Bool 来并发安全地标记页面是否在使用中,避免了对 sync.Mutex 的频繁竞争,尤其是在读多写少的场景下。unsafe.Pointer 被用于打印内存地址,以便于调试,但在实际逻辑中应尽量避免直接使用。

2. 内存池管理:KVPagePool

KVPagePool 是内存池的核心。它将维护一个空闲页面的列表和一个已分配页面到请求 ID 的映射。

package kvcache

import (
    "fmt"
    "sync"
    "time"
)

// KVPagePool manages a pool of KVPages.
type KVPagePool struct {
    pageSize      int // Fixed size of each KVPage in bytes
    initialPages  int // Initial number of pages to pre-allocate
    maxPages      int // Maximum number of pages the pool can grow to
    currentPageID PageID // Counter for assigning unique page IDs

    mu         sync.Mutex           // Mutex to protect access to freePages and allocatedPages
    freePages  []*KVPage            // List of available pages
    allocatedPages map[string][]*KVPage // Map from RequestID to the pages it owns

    // Metrics
    totalPages    atomic.Uint64 // Total number of pages currently managed by the pool
    allocatedCount atomic.Uint64 // Number of pages currently allocated to requests
    freeCount     atomic.Uint64 // Number of pages currently free
}

// NewKVPagePool creates and initializes a new KVPagePool.
func NewKVPagePool(pageSize, initialPages, maxPages int) (*KVPagePool, error) {
    if pageSize <= 0 || initialPages <= 0 || maxPages < initialPages {
        return nil, fmt.Errorf("invalid pool parameters: pageSize=%d, initialPages=%d, maxPages=%d", pageSize, initialPages, maxPages)
    }

    pool := &KVPagePool{
        pageSize:      pageSize,
        initialPages:  initialPages,
        maxPages:      maxPages,
        currentPageID: 0,
        freePages:     make([]*KVPage, 0, initialPages),
        allocatedPages: make(map[string][]*KVPage),
    }

    // Pre-allocate initial pages
    for i := 0; i < initialPages; i++ {
        pool.currentPageID++
        page := NewKVPage(pool.currentPageID, pageSize)
        pool.freePages = append(pool.freePages, page)
        pool.totalPages.Add(1)
        pool.freeCount.Add(1)
    }

    fmt.Printf("KVPagePool initialized: PageSize=%d, InitialPages=%d, MaxPages=%d, TotalMemory=%.2fMBn",
        pageSize, initialPages, maxPages, float64(pageSize*initialPages)/(1024*1024))

    return pool, nil
}

// AllocatePages attempts to allocate 'numPages' for a given request.
// It returns the allocated pages or an error if not enough memory is available.
func (p *KVPagePool) AllocatePages(requestID string, numPages int) ([]*KVPage, error) {
    if numPages <= 0 {
        return nil, fmt.Errorf("number of pages to allocate must be positive")
    }

    p.mu.Lock()
    defer p.mu.Unlock()

    // Check if enough free pages are available
    if len(p.freePages) < numPages {
        // Try to expand the pool if maxPages limit is not reached
        pagesToGrow := numPages - len(p.freePages)
        if p.totalPages.Load()+uint64(pagesToGrow) > uint64(p.maxPages) {
            // Cannot allocate enough pages and cannot grow further
            return nil, fmt.Errorf("KV-Cache memory exhausted for request %s: requested %d pages, available %d, pool max %d",
                requestID, numPages, len(p.freePages), p.maxPages)
        }

        // Grow the pool
        fmt.Printf("KVPagePool growing: current pages %d, adding %d new pages.n", p.totalPages.Load(), pagesToGrow)
        for i := 0; i < pagesToGrow; i++ {
            p.currentPageID++
            page := NewKVPage(p.currentPageID, p.pageSize)
            p.freePages = append(p.freePages, page)
            p.totalPages.Add(1)
            p.freeCount.Add(1)
        }
    }

    // Allocate pages from the free list
    allocated := make([]*KVPage, numPages)
    for i := 0; i < numPages; i++ {
        page := p.freePages[0] // Take from the front of the free list (FIFO-like)
        p.freePages = p.freePages[1:]

        page.InUse.Store(true)
        page.RequestID = requestID
        page.Timestamp = time.Now().UnixNano() // Record allocation time
        allocated[i] = page

        p.allocatedCount.Add(1)
        p.freeCount.Add(^-uint64(0)) // Decrement safely
    }

    p.allocatedPages[requestID] = allocated
    fmt.Printf("Allocated %d pages for request %s. Total allocated: %d, Total free: %dn",
        numPages, requestID, p.allocatedCount.Load(), p.freeCount.Load())

    return allocated, nil
}

// FreePages releases all pages associated with a given request ID.
func (p *KVPagePool) FreePages(requestID string) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    pages, exists := p.allocatedPages[requestID]
    if !exists {
        return fmt.Errorf("no pages found for request ID %s", requestID)
    }

    for _, page := range pages {
        page.Reset() // Mark as free and clear request association
        p.freePages = append(p.freePages, page) // Return to free list
        p.allocatedCount.Add(^-uint64(0)) // Decrement safely
        p.freeCount.Add(1)
    }
    delete(p.allocatedPages, requestID)

    fmt.Printf("Freed %d pages for request %s. Total allocated: %d, Total free: %dn",
        len(pages), requestID, p.allocatedCount.Load(), p.freeCount.Load())

    return nil
}

// GetMetrics returns current pool metrics.
func (p *KVPagePool) GetMetrics() (totalPages, allocated, free uint64) {
    return p.totalPages.Load(), p.allocatedCount.Load(), p.freeCount.Load()
}

// CheckAvailablePages returns the number of currently free pages.
func (p *KVPagePool) CheckAvailablePages() uint64 {
    p.mu.Lock()
    defer p.mu.Unlock()
    return uint64(len(p.freePages))
}

设计考量:

  • sync.Mutex KVPagePool 的核心操作(AllocatePagesFreePages)通过 sync.Mutex 来保证并发安全。在高并发场景下,这可能成为瓶颈,但对于管理页面列表的临界区,这是最直接且安全的做法。更高级的优化可能涉及无锁数据结构,但这会增加复杂性。
  • 页面增长策略: 当空闲页面不足时,AllocatePages 会尝试根据 maxPages 限制扩容。实际生产中,扩容逻辑可能更复杂,例如批量扩容、热插拔内存块等。
  • FIFO 页面分配: freePages 列表以 FIFO(先进先出)的方式分配页面。这有助于平均分配页面的使用寿命,但不会优化局部性。对于 GPU 内存,局部性可能不那么关键,因为数据会整体传输。
  • atomic.Uint64 用于维护统计指标,可以在不加锁的情况下安全读取,提高监控效率。
  • 内存零化: KVPage.Reset() 中注释掉的内存零化操作,在安全性要求高(避免数据泄露)的场景下可能需要,但会带来性能开销。对于 LLM KV-Cache,通常下一个请求会直接覆盖这块内存,所以可以省略。

3. 请求与KV-Cache的关联:RequestState

在 LLM Serving 架构中,每个活跃的推理请求都需要维护其状态,包括已分配的 KV-Cache 页面。

package kvcache

import (
    "fmt"
    "sync/atomic"
    "time"
)

// RequestState holds the state of an individual LLM inference request.
type RequestState struct {
    ID           string           // Unique ID for this request
    Prompt       string           // User's input prompt
    MaxTokens    int              // Max tokens to generate
    AllocatedPages []*KVPage        // Pages allocated for this request's KV-Cache
    CurrentLength  int              // Current sequence length (number of tokens generated + prompt tokens)
    Status       atomic.Pointer[string] // Current status of the request (e.g., "PENDING", "PROCESSING", "COMPLETED")
    StartTime    time.Time        // Request start time
    EndTime      time.Time        // Request end time
    ModelName    string           // Model being used
}

// NewRequestState creates a new RequestState.
func NewRequestState(id, prompt, modelName string, maxTokens int) *RequestState {
    status := "PENDING"
    rs := &RequestState{
        ID:        id,
        Prompt:    prompt,
        MaxTokens: maxTokens,
        ModelName: modelName,
        StartTime: time.Now(),
    }
    rs.Status.Store(&status)
    return rs
}

// GetRequiredPages calculates the number of pages needed for a given sequence length.
// This is a simplified calculation. In reality, it depends on model architecture,
// head_dim, num_heads, num_layers, and page size.
// For demonstration, let's assume one page can hold KV for N tokens.
func (rs *RequestState) GetRequiredPages(seqLen int, tokensPerPage int) int {
    if tokensPerPage <= 0 {
        tokensPerPage = 1 // Avoid division by zero
    }
    // Round up to the nearest page
    return (seqLen + tokensPerPage - 1) / tokensPerPage
}

// UpdateLength updates the current sequence length and potentially requests more pages.
// This function would typically be called by the inference engine during decode phase.
func (rs *RequestState) UpdateLength(newLength int, tokensPerPage int, pool *KVPagePool) error {
    if newLength <= rs.CurrentLength {
        return nil // Length did not increase or decreased (should not happen)
    }

    // Calculate pages needed for the new length
    requiredPages := rs.GetRequiredPages(newLength, tokensPerPage)

    // If more pages are needed, try to allocate them
    if requiredPages > len(rs.AllocatedPages) {
        pagesToAllocate := requiredPages - len(rs.AllocatedPages)
        fmt.Printf("Request %s needs more pages: current %d, required %d. Allocating %d new pages.n",
            rs.ID, len(rs.AllocatedPages), requiredPages, pagesToAllocate)

        newlyAllocated, err := pool.AllocatePages(rs.ID, pagesToAllocate)
        if err != nil {
            status := fmt.Sprintf("FAILED: %s", err.Error())
            rs.Status.Store(&status)
            return fmt.Errorf("failed to allocate additional pages for request %s: %w", rs.ID, err)
        }
        rs.AllocatedPages = append(rs.AllocatedPages, newlyAllocated...)
    }

    rs.CurrentLength = newLength
    return nil
}

// MarkCompleted marks the request as completed and frees its pages.
func (rs *RequestState) MarkCompleted(pool *KVPagePool) error {
    status := "COMPLETED"
    rs.Status.Store(&status)
    rs.EndTime = time.Now()

    if len(rs.AllocatedPages) > 0 {
        err := pool.FreePages(rs.ID)
        if err != nil {
            fmt.Printf("Error freeing pages for request %s: %vn", rs.ID, err)
            return err
        }
        rs.AllocatedPages = nil // Clear reference
    }
    return nil
}

RequestState 存储了请求的 KV-Cache 页面列表。当请求的序列长度增加时,UpdateLength 方法会计算所需的页面数量,并向 KVPagePool 申请额外的页面。当请求完成时,MarkCompleted 方法会释放所有页面。

4. 模拟请求流程

现在我们可以将这些组件组合起来,模拟一个高并发的请求处理流程。

package kvcache

import (
    "fmt"
    "math/rand"
    "strconv"
    "sync"
    "time"
)

// Simulate an LLM serving controller that manages requests and the KV-Cache pool.
type LLMServingController struct {
    pool        *KVPagePool
    activeRequests sync.Map // Map[string]*RequestState
    tokensPerPage int // Number of tokens whose KV-Cache can fit into one page
    mu          sync.Mutex // Protects access to some controller-level state
}

func NewLLMServingController(pool *KVPagePool, tokensPerPage int) *LLMServingController {
    return &LLMServingController{
        pool:        pool,
        tokensPerPage: tokensPerPage,
        activeRequests: sync.Map{},
    }
}

// HandleNewRequest simulates receiving a new LLM inference request.
func (c *LLMServingController) HandleNewRequest(prompt string, maxTokens int) (string, error) {
    requestID := "req-" + strconv.FormatInt(time.Now().UnixNano(), 10) + "-" + strconv.Itoa(rand.Intn(1000))
    requestState := NewRequestState(requestID, prompt, "llama-7b", maxTokens)

    initialSeqLen := len(prompt) / 5 // Very rough estimate of tokens from prompt chars
    requiredPages := requestState.GetRequiredPages(initialSeqLen, c.tokensPerPage)

    // Try to allocate initial pages for prefill
    allocatedPages, err := c.pool.AllocatePages(requestID, requiredPages)
    if err != nil {
        fmt.Printf("Failed to allocate initial pages for request %s: %vn", requestID, err)
        return "", err
    }
    requestState.AllocatedPages = allocatedPages
    requestState.CurrentLength = initialSeqLen
    c.activeRequests.Store(requestID, requestState)

    status := "PROCESSING"
    requestState.Status.Store(&status)
    fmt.Printf("Request %s started. Initial pages: %d. Prompt: '%s'n", requestID, len(allocatedPages), prompt)

    // Start a goroutine to simulate inference and token generation
    go c.simulateInference(requestState)

    return requestID, nil
}

// simulateInference simulates the LLM inference process for a single request.
func (c *LLMServingController) simulateInference(rs *RequestState) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Printf("Panic in inference goroutine for request %s: %vn", rs.ID, r)
            status := fmt.Sprintf("CRASHED: %v", r)
            rs.Status.Store(&status)
            c.pool.FreePages(rs.ID) // Attempt to free pages even on crash
            c.activeRequests.Delete(rs.ID)
        }
    }()

    currentLength := rs.CurrentLength
    for i := 0; i < rs.MaxTokens; i++ {
        // Simulate token generation and KV-Cache update
        time.Sleep(time.Duration(rand.Intn(50)+10) * time.Millisecond) // Simulate decode time

        currentLength++
        err := rs.UpdateLength(currentLength, c.tokensPerPage, c.pool)
        if err != nil {
            fmt.Printf("Request %s failed during decode: %vn", rs.ID, err)
            break
        }

        // fmt.Printf("Request %s generated token %d, current length %d, pages: %dn", rs.ID, i+1, currentLength, len(rs.AllocatedPages))

        if currentLength >= 2048 { // Simulate reaching max context length for model
            fmt.Printf("Request %s reached max context length.n", rs.ID)
            break
        }
    }

    // Request completed or failed
    err := rs.MarkCompleted(c.pool)
    if err != nil {
        fmt.Printf("Error marking request %s completed: %vn", rs.ID, err)
    }
    c.activeRequests.Delete(rs.ID)
    fmt.Printf("Request %s completed. Total tokens: %d. Duration: %vn", rs.ID, rs.CurrentLength - (len(rs.Prompt)/5), time.Since(rs.StartTime))
}

// GetActiveRequestCount returns the number of active requests.
func (c *LLMServingController) GetActiveRequestCount() int {
    count := 0
    c.activeRequests.Range(func(key, value interface{}) bool {
        count++
        return true
    })
    return count
}

// RunSimulation demonstrates the KV-Cache pool in action.
func RunSimulation() {
    // Define KV-Cache page size (e.g., 64KB) and pool limits
    // This page size should be chosen carefully based on model architecture and GPU memory alignment.
    pageSize := 64 * 1024 // 64 KB per page
    initialPages := 1024  // 1024 * 64KB = 64MB initial pool
    maxPages := 16384     // 16384 * 64KB = 1GB max pool size

    // Estimate how many tokens' KV-Cache can fit in one page
    // Example: 32 layers, 32 heads, 128 head_dim, float16 (2 bytes)
    // Mem per token = 2 * 32 * 32 * 128 * 2 = 524288 bytes = 0.5 MB
    // So, one 64KB page can hold KV-Cache for 65536 / 524288 ~= 0.125 tokens.
    // This means a page is very small relative to a single token's KV-Cache.
    // Let's adjust for a more realistic scenario where a page holds multiple tokens' KV.
    // For example, if a page is 64KB, and one token's KV for a layer is 2 * num_heads * head_dim * sizeof(float16).
    // Let's simplify: assume one page can hold KV for 16 tokens for all layers.
    tokensPerPage := 16 // Simplified: 16 tokens' KV-Cache per 64KB page

    pool, err := NewKVPagePool(pageSize, initialPages, maxPages)
    if err != nil {
        fmt.Fatalf("Failed to create KVPagePool: %vn", err)
    }

    controller := NewLLMServingController(pool, tokensPerPage)

    // Simulate high concurrency
    numConcurrentRequests := 5000 // Simulate 5000 concurrent active requests
    var wg sync.WaitGroup

    for i := 0; i < numConcurrentRequests; i++ {
        wg.Add(1)
        go func(reqNum int) {
            defer wg.Done()
            prompt := fmt.Sprintf("Tell me a story about a brave knight named Sir %d who fought a dragon.", reqNum)
            maxTokens := rand.Intn(500) + 50 // Generate between 50 and 550 tokens

            _, err := controller.HandleNewRequest(prompt, maxTokens)
            if err != nil {
                // Handle request failure (e.g., queue it, return error to client)
                // fmt.Printf("Request %d failed to start: %vn", reqNum, err)
            }
        }(i)
        // Introduce some delay to simulate requests arriving over time
        if i%100 == 0 {
            time.Sleep(10 * time.Millisecond)
        }
    }

    // Monitor pool status periodically
    done := make(chan struct{})
    go func() {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                totalPages, allocated, free := pool.GetMetrics()
                activeReqs := controller.GetActiveRequestCount()
                fmt.Printf("[Monitor] Active Requests: %d, KV-Cache Pool: Total Pages=%d, Allocated Pages=%d (%.2fMB), Free Pages=%d (%.2fMB)n",
                    activeReqs, totalPages, allocated, float64(allocated*uint64(pageSize))/(1024*1024), free, float64(free*uint64(pageSize))/(1024*1024))
            case <-done:
                return
            }
        }
    }()

    wg.Wait() // Wait for all simulated requests to complete

    close(done)
    fmt.Println("All simulated requests completed.")
    totalPages, allocated, free := pool.GetMetrics()
    fmt.Printf("[Final Monitor] Total Pages=%d, Allocated Pages=%d, Free Pages=%dn", totalPages, allocated, free)
}

运行 RunSimulation()main 函数:

package main

import "kvcache" // Assuming kvcache package is in a 'kvcache' directory

func main() {
    kvcache.RunSimulation()
}

这个模拟程序展示了 KV-Cache 内存池如何在高并发下工作:

  • LLMServingController 接收新请求,并为它们分配初始 KV-Cache 页面。
  • 每个请求在独立的 Goroutine 中模拟推理过程,随着序列长度的增长,可能会向内存池请求更多页面。
  • 请求完成后,其占用的页面被释放回内存池。
  • sync.Map 用于并发安全地存储活跃请求的状态。
  • 监控 Goroutine 实时报告内存池和请求状态。

调度器与KV-Cache内存池的协同

在实际的高性能 LLM Serving 架构中,一个智能的请求调度器是不可或缺的。它与 KV-Cache 内存池紧密协同,共同决定哪些请求可以被处理,以及如何有效地批处理它们。

1. 请求调度与Batching

  • 动态批处理(Dynamic Batching): 将多个等待中的请求合并成一个批次进行推理。这样可以提高 GPU 利用率,因为大批量计算通常比小批量更有效率。调度器在创建批次时,需要考虑批次中所有请求的当前 KV-Cache 长度,并预估所需的页面数量。
  • 连续批处理(Continuous Batching): 允许在批处理推理过程中,动态添加新请求或移除已完成请求,进一步优化 GPU 吞吐量和延迟。
  • 内存感知调度: 调度器在决定是否将新请求加入批次或启动新推理任务时,应查询 KV-Cache 内存池的可用页面数量。如果内存不足,请求可能需要等待,或者被拒绝。

2. 内存预留与弹性扩缩容

  • 预留策略: 调度器可以根据预测的请求峰值,向内存池预留一部分页面,以确保关键请求或高优先级请求能够及时获得内存。
  • 弹性扩缩容: 内存池的 AllocatePages 方法中包含了简单的扩容逻辑。在更复杂的场景下,扩容可以由独立的监控服务触发,根据内存水位线、历史使用模式和预测负载进行。缩容则可以在长时间低负载时进行,将空闲页面归还给操作系统,释放资源。

3. 如何处理内存不足

当 KV-Cache 内存池耗尽时,调度器有几种策略:

  1. 等待: 将新请求放入等待队列,直到有足够的内存页面被释放。
  2. 拒绝: 直接拒绝新请求,返回错误给客户端。这通常是最后的手段,但在极端负载下是必要的。
  3. 驱逐(Eviction): 如果内存紧张,调度器可以根据 LRU (Least Recently Used)、LFU (Least Frequently Used) 或 FIFO (First In, First Out) 等策略,驱逐(Swap Out)不活跃或低优先级的请求的 KV-Cache 页面到主机内存(如果它们当前在 GPU 上),或直接将其从池中移除(如果请求可以重新计算)。

进阶优化与挑战

1. 内存碎片化

  • 内部碎片: 如果 KV-Cache Page 的大小固定,但一个请求所需的 KV-Cache 实际大小不足一个页面整数倍时,会产生内部碎片。选择合适的页面大小是关键,需要在减少内部碎片和管理复杂性之间取得平衡。
  • 外部碎片: 我们的页面管理策略已经大大缓解了外部碎片问题,因为我们分配的是固定大小的页面,而不是任意大小的连续块。

2. Go GC的考量

Go 的垃圾回收器在处理大量 Go 对象(如 *KVPage 指针)时,可能会引入短暂停顿。我们的设计通过以下方式最小化 GC 影响:

  • 预分配: KVPagePool 在启动时预分配了所有 KVPage 对象及其底层 []byte 切片。
  • 复用: KVPage 对象本身被复用,而不是频繁创建和销毁。freePages 列表存储的是 *KVPage 指针,这些指针指向的内存是长期存在的。
  • 避免大量小对象: 核心数据(KV-Cache本身)存储在大的 []byte 切片中,这些切片由 Go 运行时管理,但由于它们是大的、长生命周期的对象,GC 对其的扫描和处理开销相对较小。

3. 跨设备/多GPU内存管理

对于超大规模的 LLM 服务,单个 GPU 的内存可能不足以容纳所有 KV-Cache。这需要更复杂的跨设备内存管理策略:

  • KV-Cache Paging: 类似于操作系统内存分页,将不活跃的 KV-Cache 页面从 GPU 内存交换到主机(CPU)内存,甚至硬盘,并在需要时再交换回来。这增加了延迟,但扩大了总容量。
  • 分布式 KV-Cache: 在多台机器、多 GPU 上分布 KV-Cache。这引入了网络通信和数据一致性问题,需要一个分布式 KV-Cache 服务。

4. 缓存驱逐策略

当内存池达到容量上限且无法扩容时,必须驱逐一些 KV-Cache 来为新请求腾出空间。

  • LRU (Least Recently Used): 驱逐最长时间未使用的 KV-Cache。可以通过 KVPage.Timestamp 字段实现。
  • LFU (Least Frequently Used): 驱逐使用频率最低的 KV-Cache。需要额外计数器。
  • FIFO (First In, First Out): 驱逐最早进入内存池的 KV-Cache。我们的 freePages 列表本质上是 FIFO。

这些策略通常由调度器结合请求优先级、剩余生成长度等因素综合决策。

5. 监控与诊断

强大的监控系统是高性能服务的生命线。我们需要:

  • 内存池指标: 总页面数、已分配页面数、空闲页面数、内存使用率。
  • 请求指标: 活跃请求数、等待队列长度、请求平均处理时间、KV-Cache 平均大小。
  • 性能指标: KV-Cache 分配/回收的 P99 延迟、GC 暂停时间。

这些指标可以帮助我们识别瓶颈、调整内存池参数和优化调度策略。

架构概览:Go LLM Serving 的整体视图

将上述 KV-Cache 内存池集成到一个完整的 Go LLM Serving 架构中,其高层视图如下:

  1. API 网关/客户端 API 层: 接收用户请求 (HTTP/gRPC),进行鉴权、限流,并将请求路由到后端服务。
  2. 请求处理与路由: Go 服务负责解析请求,将其转换为内部任务,并根据模型、优先级、负载等因素,将任务路由到合适的推理工作节点。
  3. 调度器 (Scheduler): Go 核心组件,负责:
    • 维护所有活跃请求的状态。
    • 与 KV-Cache 管理器交互,获取内存可用性。
    • 根据内存、GPU 资源和请求优先级,决定批处理哪些请求。
    • 将批处理后的推理任务发送给模型推理引擎。
  4. KV-Cache 管理器 (KV-Cache Manager): 本文的核心,由 Go 实现。
    • 管理 KVPagePool
    • 响应调度器和模型推理引擎的内存分配/回收请求。
    • 维护页面到请求的映射。
    • 提供内存池的监控指标。
  5. 模型推理引擎 (Model Inference Engine): 通常是与 GPU 紧密结合的 C++/CUDA 后端,例如基于 llama.cpp 或 NVIDIA Triton Inference Server。
    • 接收调度器发送的批处理任务。
    • 执行 LLM 推理(预填充和解码)。
    • 在推理过程中,向 KV-Cache 管理器请求新的 KV-Cache 页面(通过 CGo 或 IPC)。
    • 将生成的词元返回给调度器。
  6. 结果聚合与流式传输: 调度器接收到生成的词元后,将其与请求关联,并可能通过流式 API (Server-Sent Events, gRPC Stream) 将结果实时返回给客户端。

这个架构中,Go 语言扮演了“指挥官”的角色,它不直接执行繁重的矩阵运算,而是高效地协调整个推理流程,尤其是在高并发下的资源管理,使得底层 GPU 资源能够得到最大化利用。

总结

今天我们详细探讨了在 Go 语言中实现高性能 LLM Serving 架构的关键挑战之一:千万级并发下的 KV-Cache 内存池管理。我们深入分析了 KV-Cache 的重要性、内存足迹,并提出了一种基于页面分配的内存池设计。通过 Go 语言的并发特性和内存管理能力,我们构建了一个能够高效预分配、复用和回收 KV-Cache 页面的系统。

这种内存池设计不仅能够显著降低内存碎片和分配开销,还能有效应对高并发带来的内存压力。结合智能的调度器和底层的 GPU 推理引擎,Go 能够构建出稳定、高效且可扩展的 LLM 服务。未来的工作将集中在更精细的内存调度策略、跨设备 KV-Cache 管理以及与 Go GC 的深度优化上,以应对不断增长的 LLM 规模和用户需求。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注