各位同仁,下午好!
今天,我们来探讨一个在构建高性能分布式系统时,常常被忽视却又极为致命的问题:gRPC的流控陷阱。你或许曾经历过这样的场景:明明你的服务设计精巧,测试时表现优异,但在高负载下却突然变得“卡死”,请求处理延迟飙升,甚至服务崩溃,而CPU和内存指标看起来却并非完全饱和。这并非偶然,很可能你已经一脚踏入了gRPC流控的深坑。
gRPC,作为Google开源的高性能RPC框架,以其基于HTTP/2协议、使用Protocol Buffers作为接口定义语言、支持多种语言和流式通信的特性,迅速成为微服务架构中的明星。它承诺了低延迟、高吞吐和高效的二进制传输。然而,HTTP/2和gRPC的流控机制,虽然旨在保护服务免受过载,但如果理解和使用不当,反而会成为系统性能瓶颈乃至故障的元凶。
我将从gRPC和HTTP/2流控的底层原理讲起,深入剖析常见的流控陷阱,并分享诊断、调试和规避这些问题的实战经验和最佳实践。
gRPC与HTTP/2流控的基石
要理解gRPC的流控陷阱,我们首先要回到其赖以生存的传输层:HTTP/2协议。gRPC并非凭空创造了一套新的网络传输机制,而是巧妙地利用了HTTP/2的强大功能。
HTTP/2的核心特性与流控必要性
HTTP/2相对于HTTP/1.1最大的改进之一就是多路复用(Multiplexing)。它允许在单个TCP连接上同时发送多个请求和响应,通过“流(Stream)”的概念将不同的请求-响应对逻辑上隔离。每个流都有一个唯一的ID,并且由一系列更小的单位——“帧(Frame)”组成。常见的帧类型包括HEADERS帧(携带HTTP请求/响应头)、DATA帧(携带实际的应用数据)、WINDOW_UPDATE帧(流控的关键)等。
在多路复用环境下,流控变得至关重要。想象一下,一个快速的发送方(Producer)在单个TCP连接上同时向多个慢速接收方(Consumer)发送数据。如果没有流控,发送方可能会:
- 压垮接收方: 接收方的内存缓冲区可能被快速涌入的数据填满,导致其无法及时处理,甚至触发内存溢出(OOM)。
- 独占连接资源: 某个流上的大量数据可能会阻塞其他流的数据传输,导致队头阻塞(Head-of-Line Blocking),尽管HTTP/2在逻辑上解决了TCP层面的队头阻塞,但在应用数据层面,如果发送方不加限制,仍然可能出现。
- 加剧网络拥塞: 向已经饱和的网络链路灌输更多数据,只会导致更大的延迟和丢包。
因此,HTTP/2设计了一套基于“窗口(Window)”的流量控制机制,确保发送方不会发送超出接收方处理能力的数据量。
HTTP/2的窗口流控机制
HTTP/2的流控是逐跳(hop-by-hop)的,这意味着它只在直接相连的两个对等方(例如,客户端和服务器,或服务器和代理)之间生效。它通过两种独立的窗口进行管理:
- 连接级别窗口(Connection-level Window): 适用于整个TCP连接上的所有数据。每个DATA帧都会消耗连接窗口。
- 流级别窗口(Stream-level Window): 适用于单个HTTP/2流上的数据。每个DATA帧也会消耗其所在流的窗口。
发送方在发送DATA帧时,必须确保其数据量不会超过连接窗口和当前流窗口中较小的一个。
工作原理:
- 初始窗口大小(Initial Window Size): 当一个新的HTTP/2连接或流建立时,双方会协商一个初始的窗口大小。默认情况下,HTTP/2协议规定
SETTINGS_INITIAL_WINDOW_SIZE的默认值为65,535字节(64KB)。这意味着在接收方发送任何WINDOW_UPDATE帧之前,发送方最多只能发送64KB的数据。 - 消耗窗口: 每当发送方发送一个DATA帧,其携带的数据量就会从对应的连接窗口和流窗口中扣除。
- 通告窗口(Advertised Window): 接收方处理完一部分数据后,会通过发送
WINDOW_UPDATE帧来增加其窗口大小。这个帧告诉发送方:“我已经处理了X字节的数据,现在你可以再发送X字节给我了。” - 窗口为零: 如果一个窗口(无论是连接级别还是流级别)减小到零,发送方就必须停止在该窗口对应的范围(连接或特定流)内发送DATA帧,直到接收方发送
WINDOW_UPDATE帧来增加窗口。
用Go语言的伪代码理解窗口:
假设我们在一个简化版的gRPC Send 函数中:
// 这是一个高度简化的gRPC发送逻辑示意,实际实现复杂得多
func (s *grpcStream) Send(msg []byte) error {
dataLen := len(msg)
// 等待连接窗口和流窗口都允许发送
for {
// 尝试获取发送许可,可能是阻塞的
if s.connectionWindow.TryAcquire(dataLen) && s.streamWindow.TryAcquire(dataLen) {
break // 获得许可,可以发送
}
// 否则,等待 WINDOW_UPDATE 帧到来,或者超时
// 实际的gRPC库会在这里挂起goroutine,直到窗口更新
time.Sleep(10 * time.Millisecond) // 模拟等待
}
// 将消息编码为DATA帧并发送
// ... 发送 DATA 帧 ...
return nil
}
// 接收方处理数据并发送 WINDOW_UPDATE 帧的示意
func (r *grpcStream) OnReceiveData(data []byte) {
// 处理收到的数据
r.applicationBuffer.Write(data)
// 当处理了一定阈值的数据后,增加窗口
if r.applicationBuffer.BytesProcessedSinceLastUpdate() >= r.windowUpdateThreshold {
incrementAmount := r.applicationBuffer.ClearProcessedBytes()
r.streamWindow.Increment(incrementAmount)
r.connectionWindow.Increment(incrementAmount)
// 发送 WINDOW_UPDATE 帧给发送方
// ... 发送 WINDOW_UPDATE 帧 ...
}
}
gRPC作为HTTP/2的“上层应用”,自然地继承了这套流控机制。当你在gRPC中调用 stream.Send() 或 stream.Recv() 时,底层就是在与HTTP/2的DATA帧和WINDOW_UPDATE帧打交道。Send() 操作可能会因为发送窗口为零而阻塞,而 Recv() 操作则在内部处理DATA帧并根据应用程序处理数据的速度来发送 WINDOW_UPDATE 帧。
流控陷阱:高性能服务“卡死”的根源
现在,我们有了流控的基础知识,就可以深入探讨那些让高性能服务在高负载下“卡死”的流控陷阱了。这些陷阱往往是由于对流控机制的误解、配置不当或应用程序设计缺陷与流控机制的冲突造成的。
陷阱一:慢速消费者导致发送方资源耗尽
这是最经典也是最常见的流控问题。
场景描述:
假设你有一个gRPC服务器流式RPC (Server-Side Streaming),例如:
service ItemService {
rpc GetLargeItemList(ItemListRequest) returns (stream Item);
}
服务器端从数据库或其他数据源获取大量数据,并尝试通过 stream.Send() 持续发送给客户端。如果客户端处理这些 Item 消息的速度非常慢,或者因为网络延迟导致 WINDOW_UPDATE 帧回传缓慢,会发生什么?
发生机制:
- 服务器不断调用
stream.Send(item)。 - 客户端接收到
Item消息后,将其放入内部缓冲区等待应用层处理。 - 随着客户端接收的数据越来越多,其内部用于接收的流窗口和连接窗口会逐渐减小。
- 如果客户端应用层处理数据速度跟不上,客户端发送
WINDOW_UPDATE帧的速度也会减慢。 - 最终,客户端的流窗口和连接窗口会达到零。
- 此时,服务器端的
stream.Send(item)调用将开始阻塞,因为它没有可用的发送窗口。
陷阱表现:
服务器看起来“卡死”了。尽管它可能已经从数据库中读取了所有数据,但却无法将其发送出去。如果服务器端在 stream.Send() 阻塞期间,仍然持有关键资源(如数据库连接、内存缓冲区、正在运行的goroutine或线程),这些资源就会被长时间占用而无法释放。在高并发下,这会导致:
- 数据库连接池耗尽: 所有连接都在等待流控解除,新的请求无法获取数据库连接。
- 内存溢出(OOM): 如果服务器端在发送前将所有数据一次性加载到内存中(例如,一个
[]Item切片),而stream.Send()阻塞,那么这部分内存会长期占用,在高并发下可能迅速耗尽服务器内存。 - CPU使用率低但服务无响应: CPU可能没有完全饱和,因为大部分时间都花在了等待网络I/O(即等待发送窗口打开)上,但服务却无法处理新请求。
- 请求超时: 客户端请求长时间得不到响应,最终超时。
Go语言代码示例(服务器端陷阱):
package main
import (
"context"
"fmt"
"log"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "your_project/proto" // 假设你的proto文件定义了 ItemService 和 Item
)
// ItemServiceServer 实现
type itemServiceServer struct {
pb.UnimplementedItemServiceServer
}
func (s *itemServiceServer) GetLargeItemList(req *pb.ItemListRequest, stream pb.ItemService_GetLargeItemListServer) error {
log.Printf("Received GetLargeItemList request for batch size: %d", req.BatchSize)
// 模拟从数据库加载大量数据,假设每次加载1000条
dataBatchSize := int(req.BatchSize)
if dataBatchSize == 0 {
dataBatchSize = 1000 // 默认发送1000条
}
// 这是一个陷阱:如果直接生成大量数据并尝试发送,慢客户端会阻塞发送
// 并且在发送阻塞期间,这些数据可能一直占用内存
for i := 0; i < dataBatchSize; i++ {
item := &pb.Item{
Id: fmt.Sprintf("item-%d", i),
Name: fmt.Sprintf("Item Name %d", i),
Description: fmt.Sprintf("Description for item %d, this is a somewhat long string to simulate real data payload.", i),
Price: float32(i) * 1.5,
}
select {
case <-stream.Context().Done():
log.Printf("Client disconnected during streaming: %v", stream.Context().Err())
return stream.Context().Err()
default:
// !!! 核心陷阱点 !!!
// stream.Send() 会阻塞,直到客户端发送 WINDOW_UPDATE 帧
// 如果客户端处理慢,这里会长时间阻塞,导致当前goroutine占用资源
// 但因为没有显式的应用层反压,上游的生产数据过程(这里是循环生成)不会受影响
// 如果数据是从数据库一次性读入内存,这里阻塞意味着内存被长期占用
log.Printf("Server attempting to send item %s...", item.Id)
if err := stream.Send(item); err != nil {
log.Printf("Failed to send item %s: %v", item.Id, err)
return status.Errorf(codes.Internal, "failed to send item: %v", err)
}
log.Printf("Server sent item %s.", item.Id)
// 为了更好地模拟慢客户端效应,服务器端也可以稍微放慢发送速度
// time.Sleep(10 * time.Millisecond)
}
}
log.Println("Finished sending all items.")
return nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterItemServiceServer(s, &itemServiceServer{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
客户端示例(模拟慢速消费者):
package main
import (
"context"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "your_project/proto"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewItemServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
req := &pb.ItemListRequest{BatchSize: 500} // 请求500条数据
stream, err := c.GetLargeItemList(ctx, req)
if err != nil {
log.Fatalf("could not get item list: %v", err)
}
log.Println("Client started receiving items.")
receivedCount := 0
for {
item, err := stream.Recv()
if err == io.EOF {
log.Println("Server finished streaming.")
break
}
if err != nil {
log.Fatalf("failed to receive an item: %v", err)
}
receivedCount++
log.Printf("Client received: Item ID: %s, Name: %s", item.Id, item.Name)
// !!! 模拟慢速处理 !!!
// 每接收100条数据,就停顿1秒,或者每条数据都停顿
if receivedCount%10 == 0 {
log.Printf("Client processing slow down for 500ms after %d items...", receivedCount)
time.Sleep(500 * time.Millisecond) // 模拟处理复杂或外部I/O
}
// 更慢的模拟:每条都停顿
// time.Sleep(50 * time.Millisecond)
}
log.Printf("Client finished, received %d items.", receivedCount)
}
运行这两个程序,你会看到客户端接收数据时会有明显停顿,而服务器端在发送数据时,其日志输出也会明显变慢,表明 stream.Send() 操作正在阻塞。如果并发量足够大,数据库连接池或服务器内存很快就会出问题。
陷阱二:不合理的初始窗口大小配置
HTTP/2的默认 SETTINGS_INITIAL_WINDOW_SIZE 是64KB。对于大多数简单的请求-响应模式,这可能足够。然而,对于高吞吐量的流式RPC,尤其是当网络延迟较高时,64KB的初始窗口可能过于保守。
发生机制:
发送方只能发送64KB的数据,然后必须等待接收方发送 WINDOW_UPDATE 帧。如果网络延迟高,WINDOW_UPDATE 帧到达发送方的时间就会增加。这意味着发送方会频繁地停止发送并等待窗口更新,导致:
- 管道空闲: 网络带宽没有得到充分利用,因为发送方在等待。
- 吞吐量下降: 频繁的阻塞和恢复降低了整体数据传输效率。
- 额外开销: 频繁发送
WINDOW_UPDATE帧也增加了协议开销。
陷阱表现:
服务在高负载下虽然不至于完全“卡死”,但吞吐量远低于预期,延迟波动大,网络利用率不高。
解决方案: 增加 SETTINGS_INITIAL_WINDOW_SIZE。这可以通过gRPC选项进行配置。例如,在Go语言中:
// 服务器端
s := grpc.NewServer(
grpc.InitialWindowSize(1 * 1024 * 1024), // 1MB 流级别窗口
grpc.InitialConnWindowSize(4 * 1024 * 1024), // 4MB 连接级别窗口
)
// 客户端
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithInitialWindowSize(1 * 1024 * 1024),
grpc.WithInitialConnWindowSize(4 * 1024 * 1024),
)
将窗口大小提升到1MB或4MB,甚至更大(如16MB),是常见的优化手段。但是,过大的窗口也会增加接收方的内存消耗,需要根据实际情况权衡。
陷阱三:双向流中的死锁与循环等待
双向流(Bidirectional Streaming)是最灵活但也最容易引入复杂流控问题的模式。当客户端和服务器都在发送和接收数据时,如果双方都陷入等待对方发送 WINDOW_UPDATE 帧或处理数据的僵局,就可能发生死锁。
场景描述:
假设有一个双向流RPC:
service ChatService {
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
客户端发送消息A,期望服务器返回消息B。服务器收到A后,开始处理并准备发送B。同时,客户端也在准备发送下一条消息C。
发生机制:
- 客户端发送消息A。
- 服务器接收并处理消息A。
- 服务器准备发送消息B,但此时客户端的接收窗口(用于接收B)可能已满或很小。服务器的
stream.Send(B)阻塞。 - 与此同时,客户端在发送消息C之前,可能需要先接收并处理消息B,以腾出接收缓冲区并发送
WINDOW_UPDATE帧。 - 如果服务器的
stream.Send(B)阻塞,客户端就无法接收到B,也就无法处理B并发送WINDOW_UPDATE帧。 - 这就形成了一个循环等待:服务器等待客户端的窗口更新,客户端等待服务器发送数据。双方都无法向前推进,导致死锁。
陷阱表现:
整个双向流通信会突然停止,客户端和服务器都表现为长时间阻塞在 Send() 或 Recv() 调用上,服务看起来完全“卡死”。这是最难以诊断的问题之一,因为CPU和内存可能看起来正常,但系统没有任何进展。
Go语言代码示例(双向流死锁陷阱):
// 这是一个模拟,实际死锁发生需要更复杂的条件和并发交互,
// 尤其是在应用程序层面的处理逻辑与gRPC流控交互不当。
// 这里的代码旨在示意可能的阻塞点。
// 假设客户端发送消息后,必须等待服务器返回响应,然后才能发送下一条。
// 如果服务器处理慢,或者其发送阻塞,客户端就会卡住。
func (c *chatClient) Chat() {
stream, err := c.client.Chat(context.Background())
if err != nil {
log.Fatalf("failed to open chat stream: %v", err)
}
defer stream.CloseSend()
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("Failed to receive a chat message: %v", err)
return
}
log.Printf("Client received: %s", in.Content)
// 模拟客户端处理接收到的消息需要时间,这会影响 WINDOW_UPDATE 的发送
// time.Sleep(100 * time.Millisecond)
}
}()
for i := 0; i < 10; i++ {
msg := &pb.ChatMessage{Content: fmt.Sprintf("Hello from client %d", i)}
log.Printf("Client sending: %s", msg.Content)
// !!! 核心陷阱点 !!!
// 如果服务器端发送消息阻塞,这里也会阻塞。
// 如果客户端的 Recv 协程处理消息慢,导致服务器的发送窗口关闭,
// 那么服务器的 Send 就会阻塞,进而导致客户端的 Send 也阻塞。
if err := stream.Send(msg); err != nil {
log.Fatalf("failed to send chat message: %v", err)
}
time.Sleep(500 * time.Millisecond) // 客户端也模拟慢速发送
}
}
// 服务器端处理逻辑,如果其处理和发送速度不协调,也可能导致死锁
func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("Server received: %s", in.Content)
// 模拟服务器处理消息需要时间
time.Sleep(200 * time.Millisecond)
resp := &pb.ChatMessage{Content: fmt.Sprintf("Echo: %s", in.Content)}
log.Printf("Server sending: %s", resp.Content)
// !!! 核心陷阱点 !!!
// 如果客户端的 Recv 协程处理慢,导致其接收窗口关闭,
// 那么这里的 stream.Send() 就会阻塞。
// 如果客户端同时也在 Send,并且它依赖于 Recv 来解锁其 Send,就会死锁。
if err := stream.Send(resp); err != nil {
log.Printf("Failed to send chat message: %v", err)
return err
}
}
}
陷阱四:应用程序层未实现反压(Backpressure)
gRPC提供的流控是传输层的,它防止的是网络缓冲区溢出。但如果你的应用程序在将数据交给gRPC传输层之前,就因为生产速度过快而耗尽了内存,那么传输层的流控就无能为力了。
场景描述:
一个服务器端流式RPC,数据源是一个高产的生成器(例如,一个CPU密集型计算任务,或者一个从Kafka消费消息的goroutine)。它以极快的速度将数据生产出来,并放入一个无界(unbounded)或过大(oversized)的内存队列中,然后另一个goroutine从队列中取出数据并调用 stream.Send()。
发生机制:
- 生产数据的goroutine不受限制地生成数据。
- 这些数据被推入一个内存队列。
- 如果消费者(调用
stream.Send()的goroutine)因为gRPC流控而阻塞,导致数据发送变慢。 - 生产者仍然以高速向队列中添加数据,导致队列无限增长。
陷阱表现:
服务器内存迅速增长,直到触发OOM,服务崩溃。在崩溃前,垃圾回收(GC)可能会变得非常频繁,导致CPU使用率飙升,但有效工作量下降,服务响应缓慢。
Go语言代码示例(内存溢出陷阱):
// 生产者goroutine,注意这里没有反压机制
func produceUnboundedData(ctx context.Context, dataStream chan *pb.Item) {
defer close(dataStream)
i := 0
for {
select {
case <-ctx.Done():
log.Println("Producer context cancelled.")
return
default:
item := &pb.Item{
Id: fmt.Sprintf("generated-item-%d", i),
Name: fmt.Sprintf("Generated Item %d", i),
Description: fmt.Sprintf("This is a very long description for item %d, simulating large data payloads that consume memory quickly. We need to fill up memory as fast as possible to demonstrate the unbounded buffering problem.", i),
Price: float32(i) * 0.1,
}
// !!! 核心陷阱点 !!!
// 这是一个无缓冲的 channel,或者一个容量非常大的 channel
// 但更常见的陷阱是,生产者在将数据放入 channel 之前,
// 先将大量数据缓存在一个 slice 或 map 中。
// 如果 channel 仍然是阻塞的,生产者会在这里阻塞,从而提供反压。
// 但如果 channel 容量过大,或者数据根本不是通过 channel 传递而是直接放入一个共享的、无界增长的 slice,
// 那么 OOM 风险就很大。
// 为了模拟无界缓冲,我们假设 channel 在实际场景中被替换为某种无界队列或直接Append到一个slice。
// 这里为了简化,我们暂时用一个大的有界channel来模拟,
// 但请注意,真正的OOM往往发生在channel之外的某个数据结构中。
// For a true OOM, imagine `dataStream` is a `[]*pb.Item` that `append`s to indefinitely.
// Let's modify the server to simulate this.
// dataStream <- item // 如果是无缓冲或有界但容量过小,这里会提供反压
// 为了模拟陷阱,我们假设有一个后台goroutine不断生成,并试图将其放入一个容量足够大的channel
// 或者直接在一个slice中累积。
// 这里我们将直接在主goroutine中累积,并模拟无界的内存增长。
// 这是一个错误的示范,仅用于演示OOM。
// log.Printf("Producer generated item %s", item.Id)
// time.Sleep(1 * time.Millisecond) // 稍微放慢,但仍然很快
i++
if i > 1000000 { // 限制一下,避免真的OOM太快
break
}
}
}
log.Println("Producer finished generating data.")
}
// 服务器端流式RPC,演示如何因缺乏应用层反压导致OOM风险
func (s *itemServiceServer) GetLargeItemListWithOOMRisk(req *pb.ItemListRequest, stream pb.ItemService_GetLargeItemListServer) error {
log.Printf("Received GetLargeItemListWithOOMRisk request.")
// !!! 核心陷阱点 !!!
// 这是一个模拟 OOM 的方式:一次性从“数据源”读取所有数据到内存中
// 假设数据源是无限的或非常大的,而我们没有流式读取的能力
var largeDataBuffer []*pb.Item
log.Println("Simulating loading enormous data into memory...")
for i := 0; i < 5000000; i++ { // 模拟加载500万条数据,每条数据可能几十到几百字节
item := &pb.Item{
Id: fmt.Sprintf("item-%d", i),
Name: fmt.Sprintf("Item Name %d", i),
Description: fmt.Sprintf("Description for item %d, this is a somewhat long string to simulate real data payload that takes up significant memory. We are artificially making it long to quickly hit memory limits. Repeated text makes it longer: %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d", i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, faked) {
log.Printf("Client disconnected or stream closed.")
break
}
}
}
}
func (s *itemServiceServer) GetLargeItemList(req *pb.ItemListRequest, stream pb.ItemService_GetLargeItemListServer) error {
log.Printf("Received GetLargeItemList request.")
dataChan := make(chan *pb.Item, 1000) // 这是一个有界通道,可以提供反压
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
// 生产者 goroutine
go func() {
defer cancel() // 生产者完成或出错时取消上下文
for i := 0; i < 500000; i++ { // 模拟生成50万条数据
select {
case <-ctx.Done():
log.Println("Producer received context done.")
return
default:
item := &pb.Item{
Id: fmt.Sprintf("item-%d", i),
Name: fmt.Sprintf("Item Name %d", i),
Description: fmt.Sprintf("Description for item %d, this is a somewhat long string to simulate real data payload. Repeated text to make it longer: %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d