Python的`GRPC`:如何使用`grpcio`库构建`GRPC`服务。

Python gRPC 服务构建实战:grpcio 库详解

各位朋友,大家好!今天我们来聊聊如何使用 Python 的 grpcio 库构建 gRPC 服务。gRPC 是一种高性能、开源的通用 RPC 框架,特别适合构建微服务架构。Python 的 grpcio 库是官方提供的 gRPC Python 实现,功能强大且易于使用。

1. gRPC 核心概念回顾

在深入代码之前,我们先快速回顾一下 gRPC 的几个核心概念:

  • Protocol Buffers (protobuf):gRPC 使用 protobuf 作为接口定义语言 (IDL)。我们需要使用 .proto 文件定义服务接口和消息结构。
  • Service Definition: 通过 protobuf 定义的服务接口,包含方法名、请求消息类型和响应消息类型。
  • RPC Methods: 服务接口中定义的方法,客户端可以调用这些方法来请求服务。
  • Message: 在 RPC 方法中传递的数据,由 protobuf 定义的结构化数据。
  • Server: 提供 gRPC 服务的应用程序。它实现了服务接口,并监听客户端的请求。
  • Client: 调用 gRPC 服务的应用程序。它根据 protobuf 定义生成客户端代码,并使用这些代码来发送请求。
  • Stub: 客户端的代码,它提供了调用 gRPC 服务的便捷方法。

2. 环境准备

首先,确保你已经安装了 grpcioprotobuf 库。如果没有,可以使用 pip 安装:

pip install grpcio protobuf

另外,你需要安装 grpcio-tools 用于从 .proto 文件生成 gRPC 代码:

pip install grpcio-tools

3. 定义服务接口:.proto 文件

我们从定义一个简单的服务开始,假设我们要创建一个简单的问候服务,客户端发送姓名,服务端返回问候语。 创建名为 greeter.proto 的文件,内容如下:

syntax = "proto3";

package greeter;

// 定义服务
service Greeter {
  // 定义一个 SayHello 方法
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// 定义请求消息
message HelloRequest {
  string name = 1;
}

// 定义响应消息
message HelloReply {
  string message = 1;
}

这个 .proto 文件定义了以下内容:

  • package greeter:指定包名,用于避免命名冲突。
  • service Greeter:定义了一个名为 Greeter 的服务。
  • rpc SayHello (HelloRequest) returns (HelloReply):定义了一个名为 SayHello 的 RPC 方法,它接受 HelloRequest 类型的请求消息,并返回 HelloReply 类型的响应消息。
  • message HelloRequest:定义了一个名为 HelloRequest 的消息类型,包含一个名为 name 的字符串字段。
  • message HelloReply:定义了一个名为 HelloReply 的消息类型,包含一个名为 message 的字符串字段。

4. 生成 gRPC 代码

接下来,我们需要使用 grpcio-tools.proto 文件生成 Python 代码。在命令行中执行以下命令:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. greeter.proto

这个命令会生成两个 Python 文件:

  • greeter_pb2.py:包含 protobuf 定义的消息类。
  • greeter_pb2_grpc.py:包含 gRPC 服务接口和客户端 stub。

5. 构建 gRPC 服务端

现在,我们可以开始编写 gRPC 服务端代码。创建一个名为 server.py 的文件,内容如下:

import grpc
from concurrent import futures
import greeter_pb2
import greeter_pb2_grpc

class GreeterServicer(greeter_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        message = f"Hello, {request.name}!"
        return greeter_pb2.HelloReply(message=message)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    greeter_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

代码解释:

  • GreeterServicer 类:继承自 greeter_pb2_grpc.GreeterServicer,并实现了 SayHello 方法。 这个类负责处理客户端的请求。
  • SayHello 方法:接收 HelloRequest 类型的请求消息,并返回 HelloReply 类型的响应消息。 它从请求消息中获取姓名,并构建一个包含问候语的响应消息。
  • serve 函数:
    • 创建一个 gRPC 服务器,并使用线程池来处理请求。
    • GreeterServicer 实例添加到服务器。
    • 监听 50051 端口。
    • 启动服务器。
    • 等待服务器终止。

6. 构建 gRPC 客户端

接下来,我们编写 gRPC 客户端代码。创建一个名为 client.py 的文件,内容如下:

import grpc
import greeter_pb2
import greeter_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = greeter_pb2_grpc.GreeterStub(channel)
        response = stub.SayHello(greeter_pb2.HelloRequest(name='World'))
    print("Greeter client received: " + response.message)

if __name__ == '__main__':
    run()

代码解释:

  • grpc.insecure_channel('localhost:50051'): 创建一个到 localhost:50051 的 insecure channel (不加密)。生产环境应使用安全 channel。
  • greeter_pb2_grpc.GreeterStub(channel): 基于 channel 创建一个 Greeter 服务的 stub (客户端代理)。
  • stub.SayHello(greeter_pb2.HelloRequest(name='World')): 使用 stub 调用 SayHello 方法,并传入一个 HelloRequest 消息,其中 name 字段设置为 "World"。
  • response.message: 获取响应消息中的 message 字段,并打印到控制台。

7. 运行服务

首先,启动 gRPC 服务端:

python server.py

然后,在另一个终端中启动 gRPC 客户端:

python client.py

如果一切顺利,你将在客户端终端中看到以下输出:

Greeter client received: Hello, World!

8. 使用 Metadata (元数据)

gRPC 允许在请求和响应中传递元数据。元数据是键值对,可以用于传递认证信息、跟踪 ID 等。

服务端修改:

server.py 中,我们可以添加处理元数据的代码:

import grpc
from concurrent import futures
import greeter_pb2
import greeter_pb2_grpc

class GreeterServicer(greeter_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        # 获取元数据
        metadata = context.invocation_metadata()
        print(f"Received metadata: {metadata}")

        message = f"Hello, {request.name}!"
        # 添加元数据到响应
        context.set_trailing_metadata((('custom-header', 'custom-value'),))
        return greeter_pb2.HelloReply(message=message)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    greeter_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

这里,context.invocation_metadata() 获取客户端发送的元数据,context.set_trailing_metadata() 添加元数据到响应。

客户端修改:

client.py 中,我们可以添加发送元数据和接收元数据的代码:

import grpc
import greeter_pb2
import greeter_pb2_grpc

def run():
    metadata = (('authorization', 'Bearer my-token'),)  # 构造metadata
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = greeter_pb2_grpc.GreeterStub(channel)
        response, call = stub.SayHello.with_call(greeter_pb2.HelloRequest(name='World'), metadata=metadata) # 传入metadata
    print("Greeter client received: " + response.message)
    print(f"Received trailing metadata: {call.trailing_metadata()}") # 获取响应header

if __name__ == '__main__':
    run()

这里,我们在创建 stub 的时候,将元数据作为参数传递给 SayHello 方法。 stub.SayHello.with_call 返回response和 call 对象,我们可以从call对象里读取元数据。

9. 拦截器 (Interceptors)

拦截器允许我们在 gRPC 请求和响应的处理过程中添加自定义逻辑。 这对于实现认证、日志记录、监控等功能非常有用。

服务端拦截器:

创建一个名为 server_interceptor.py 的文件,内容如下:

import grpc
import logging

class LoggingInterceptor(grpc.ServerInterceptor):
    def intercept(self, method, request_or_iterator, context, method_name):
        logging.info(f"Received request for method: {method_name}")
        return method(request_or_iterator, context)

def get_server_interceptor():
  return LoggingInterceptor()

客户端拦截器:

创建一个名为 client_interceptor.py 的文件,内容如下:

import grpc
import logging

class LoggingInterceptor(grpc.UnaryUnaryClientInterceptor):
  def intercept(self, method, request_or_iterator, client_call_details):
    logging.info(f"Sending request to method: {client_call_details.method}")
    return method(request_or_iterator, client_call_details)

def get_client_interceptor():
  return LoggingInterceptor()

修改服务端和客户端:

server.py:

import grpc
from concurrent import futures
import greeter_pb2
import greeter_pb2_grpc
import server_interceptor  # 导入拦截器

class GreeterServicer(greeter_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        message = f"Hello, {request.name}!"
        return greeter_pb2.HelloReply(message=message)

def serve():
    interceptors = [server_interceptor.get_server_interceptor()] # 创建拦截器列表
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=interceptors, # 添加拦截器
    )
    greeter_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO) # 开启日志
    serve()

client.py:

import grpc
import greeter_pb2
import greeter_pb2_grpc
import client_interceptor # 导入拦截器

def run():
    interceptors = [client_interceptor.get_client_interceptor()] # 创建拦截器列表
    with grpc.insecure_channel('localhost:50051') as channel:
        intercepted_channel = grpc.intercept_channel(channel, *interceptors) # 添加拦截器
        stub = greeter_pb2_grpc.GreeterStub(intercepted_channel)
        response = stub.SayHello(greeter_pb2.HelloRequest(name='World'))
    print("Greeter client received: " + response.message)

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO) # 开启日志
    run()

现在,当你运行客户端和服务端时,你将在控制台中看到日志信息,显示请求的接收和发送情况。

10. 流式 gRPC

gRPC 支持三种流式模式:

  • Unary RPC: 客户端发送一个请求,服务端返回一个响应 (我们上面已经演示了)。
  • Server Streaming RPC: 客户端发送一个请求,服务端返回一个流式响应。
  • Client Streaming RPC: 客户端发送一个流式请求,服务端返回一个响应。
  • Bidirectional Streaming RPC: 客户端发送一个流式请求,服务端返回一个流式响应。

我们这里演示 Server Streaming RPC。

.proto 文件修改:

greeter.proto 文件中添加一个 Server Streaming 方法:

syntax = "proto3";

package greeter;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  rpc LotsOfReplies (HelloRequest) returns (stream HelloReply) {} // 添加 Server Streaming 方法
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

重新生成 gRPC 代码:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. greeter.proto

服务端修改:

server.py 中实现 LotsOfReplies 方法:

import grpc
from concurrent import futures
import greeter_pb2
import greeter_pb2_grpc

class GreeterServicer(greeter_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        message = f"Hello, {request.name}!"
        return greeter_pb2.HelloReply(message=message)

    def LotsOfReplies(self, request, context):
        for i in range(5):
            message = f"Hello, {request.name}! Reply number {i+1}"
            yield greeter_pb2.HelloReply(message=message) # 使用 yield 返回多个响应

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    greeter_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

客户端修改:

client.py 中调用 LotsOfReplies 方法:

import grpc
import greeter_pb2
import greeter_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = greeter_pb2_grpc.GreeterStub(channel)
        responses = stub.LotsOfReplies(greeter_pb2.HelloRequest(name='World')) # 调用 Server Streaming 方法
        for response in responses: # 遍历响应
            print("Greeter client received: " + response.message)

if __name__ == '__main__':
    run()

现在,当你运行客户端和服务端时,客户端将收到来自服务端的 5 个响应。

11. 错误处理

gRPC 使用 gRPC 状态码来表示错误。客户端和服务端都可以使用这些状态码来处理错误。

服务端错误处理:

server.py 中,我们可以使用 context.abort() 方法来返回一个带有状态码的错误:

import grpc
from concurrent import futures
import greeter_pb2
import greeter_pb2_grpc

class GreeterServicer(greeter_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        if not request.name:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Name cannot be empty") # 返回错误
        message = f"Hello, {request.name}!"
        return greeter_pb2.HelloReply(message=message)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    greeter_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

客户端错误处理:

client.py 中,我们可以使用 try-except 块来捕获 gRPC 错误:

import grpc
import greeter_pb2
import greeter_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = greeter_pb2_grpc.GreeterStub(channel)
        try:
            response = stub.SayHello(greeter_pb2.HelloRequest(name='')) # 发送一个空 name 的请求
            print("Greeter client received: " + response.message)
        except grpc.RpcError as e:
            print(f"Greeter client received error: {e.code()} - {e.details()}") # 捕获错误

if __name__ == '__main__':
    run()

如果客户端发送一个空 name 的请求,服务端将返回一个 INVALID_ARGUMENT 错误,客户端将捕获这个错误并打印错误信息。

12. gRPC 与异步编程

Python 的 grpcio 库也支持异步编程,这可以提高 gRPC 服务的性能。 使用 asyncio 模块可以构建异步 gRPC 服务和客户端。

异步服务端:

import asyncio
import grpc
from concurrent import futures
import greeter_pb2
import greeter_pb2_grpc

class GreeterServicer(greeter_pb2_grpc.GreeterServicer):
    async def SayHello(self, request, context):
        message = f"Hello, {request.name}!"
        return greeter_pb2.HelloReply(message=message)

async def serve():
    server = grpc.aio.server()
    greeter_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
    server.add_insecure_port('[::]:50051')
    await server.start()
    await server.wait_for_termination()

if __name__ == '__main__':
    asyncio.run(serve())

异步客户端:

import asyncio
import grpc
import greeter_pb2
import greeter_pb2_grpc

async def run():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = greeter_pb2_grpc.GreeterStub(channel)
        response = await stub.SayHello(greeter_pb2.HelloRequest(name='World'))
    print("Greeter client received: " + response.message)

if __name__ == '__main__':
    asyncio.run(run())

注意,在异步 gRPC 中,需要使用 grpc.aio 模块,并且使用 asyncawait 关键字来定义异步方法。

总结

今天我们详细介绍了如何使用 Python 的 grpcio 库构建 gRPC 服务。从定义 .proto 文件,到生成 gRPC 代码,再到构建服务端和客户端,我们一步步地完成了整个过程。同时,我们也学习了如何使用元数据、拦截器、流式 gRPC 和错误处理。希望这些知识能帮助你更好地使用 gRPC 构建高性能的微服务。

gRPC 技术的掌握,助力微服务架构的开发

gRPC 的使用使得开发者能更高效地构建微服务,利用 protobuf 定义接口,使用 grpcio 库实现服务和客户端,可以轻松实现服务间的通信。
掌握 gRPC 的相关技术,可以为微服务架构的开发提供强有力的支持。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注