RoadRunner插件架构:Go语言接口实现PHP Worker生命周期钩子扩展
大家好,今天我们来探讨RoadRunner的插件架构,特别是如何通过Go语言接口来实现PHP Worker的生命周期钩子扩展。RoadRunner作为一款高性能的PHP应用服务器、负载均衡器和进程管理器,其插件架构为开发者提供了极大的灵活性,允许我们在不修改RoadRunner核心代码的情况下,扩展其功能,定制PHP Worker的行为。
1. RoadRunner插件架构概述
RoadRunner的设计理念是模块化和可扩展性。其核心功能由一系列的插件组成,这些插件通过gRPC协议与RoadRunner核心进行通信。插件可以监听RoadRunner的各种事件,例如Worker的启动、停止、执行请求等,并在这些事件发生时执行自定义的逻辑。
RoadRunner的插件可以使用多种编程语言开发,其中Go语言是官方推荐的语言。使用Go语言开发的插件可以直接编译成二进制文件,与RoadRunner核心一起运行,性能更高。
2. PHP Worker生命周期钩子
RoadRunner为PHP Worker提供了多个生命周期钩子,允许我们在Worker的不同阶段执行自定义的代码。这些钩子包括:
- OnWorkerStart: 在Worker启动时触发,可以用于初始化Worker环境,例如加载配置文件、建立数据库连接等。
- OnWorkerStop: 在Worker停止时触发,可以用于清理Worker环境,例如关闭数据库连接、释放资源等。
- OnReceive: 在Worker接收到请求时触发,可以用于记录请求日志、验证请求参数等。
- BeforeCycle: 在每次请求循环之前触发,可以用于更新配置、检查状态等。
- AfterCycle: 在每次请求循环之后触发,可以用于记录执行时间、清理临时数据等。
- OnException: 在Worker执行过程中发生异常时触发,可以用于记录异常信息、发送报警等。
通过实现这些钩子,我们可以定制PHP Worker的行为,满足各种业务需求。
3. Go语言接口实现插件
要实现一个RoadRunner插件,我们需要创建一个Go语言的结构体,并实现plugin.Plugin接口。该接口定义了以下方法:
Init(core *roadrunner.Core) error: 初始化插件,接收RoadRunner核心的引用。Serve(): 启动插件,开始监听RoadRunner的事件。Stop() error: 停止插件,清理资源。
同时,我们需要实现service.Service接口,该接口定义了以下方法:
Name() string: 返回插件的名称。
此外,我们还需要实现与PHP Worker生命周期钩子相关的接口。这些接口定义在worker.WorkerEvent中,例如:
worker.OnWorkerStart接口: 定义了OnWorkerStart(ctx context.Context, worker *roadrunner.Worker) error方法,用于处理Worker启动事件。worker.OnWorkerStop接口: 定义了OnWorkerStop(ctx context.Context, worker *roadrunner.Worker) error方法,用于处理Worker停止事件。worker.OnReceive接口: 定义了OnReceive(ctx context.Context, worker *roadrunner.Worker, payload []byte) error方法,用于处理Worker接收请求事件。worker.BeforeCycle接口: 定义了BeforeCycle(ctx context.Context, worker *roadrunner.Worker) error方法,用于处理Worker请求循环之前事件。worker.AfterCycle接口: 定义了AfterCycle(ctx context.Context, worker *roadrunner.Worker) error方法,用于处理Worker请求循环之后事件。worker.OnException接口: 定义了OnException(ctx context.Context, worker *roadrunner.Worker, err error) error方法,用于处理Worker异常事件。
4. 代码示例:实现一个简单的日志插件
下面是一个简单的日志插件的示例,该插件会在Worker启动、停止和接收请求时记录日志:
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/service"
"github.com/spiral/roadrunner/v2/plugins/worker"
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/jobs"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/metrics"
"github.com/spiral/roadrunner/v2/plugins/websockets"
"github.com/spiral/roadrunner/v2/plugins/grpc"
"github.com/spiral/roadrunner/v2/plugins/amqp"
"github.com/spiral/roadrunner/v2/plugins/nats"
"github.com/spiral/roadrunner/v2/plugins/sqldb"
"github.com/spiral/roadrunner/v2/plugins/temporal"
"github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/resetter"
"github.com/spiral/roadrunner/v2/plugins/broker"
"github.com/spiral/roadrunner/v2/plugins/app"
"github.com/spiral/roadrunner/v2/plugins/cycle"
"go.uber.org/zap"
)
// 定义插件结构体
type LoggerPlugin struct {
log *zap.Logger
}
// 初始化插件
func (p *LoggerPlugin) Init(cfg config.Configurer, log logger.Logger) error {
p.log = log.Named("logger_plugin")
return nil
}
// 插件名称
func (p *LoggerPlugin) Name() string {
return "logger_plugin"
}
// 服务
func (p *LoggerPlugin) Serve() chan error {
errCh := make(chan error, 1)
return errCh
}
// 停止
func (p *LoggerPlugin) Stop() error {
return nil
}
// 实现OnWorkerStart钩子
func (p *LoggerPlugin) OnWorkerStart(ctx context.Context, worker *worker.Process) error {
p.log.Info("Worker started", zap.Int("pid", worker.Pid()))
return nil
}
// 实现OnWorkerStop钩子
func (p *LoggerPlugin) OnWorkerStop(ctx context.Context, worker *worker.Process) error {
p.log.Info("Worker stopped", zap.Int("pid", worker.Pid()))
return nil
}
// 实现OnReceive钩子
func (p *LoggerPlugin) OnReceive(ctx context.Context, worker *worker.Process, payload []byte) error {
p.log.Info("Worker received request", zap.Int("pid", worker.Pid()), zap.String("payload", string(payload)))
return nil
}
// BeforeCycle
func (p *LoggerPlugin) BeforeCycle(ctx context.Context, worker *worker.Process) error {
p.log.Debug("Before cycle", zap.Int("pid", worker.Pid()))
return nil
}
// AfterCycle
func (p *LoggerPlugin) AfterCycle(ctx context.Context, worker *worker.Process) error {
p.log.Debug("After cycle", zap.Int("pid", worker.Pid()))
return nil
}
// OnException
func (p *LoggerPlugin) OnException(ctx context.Context, worker *worker.Process, err error) error {
p.log.Error("Exception occurred", zap.Int("pid", worker.Pid()), zap.Error(err))
return nil
}
// 入口函数
func main() {
// 创建RoadRunner核心
rr := roadrunner.New(
// 加载所有必要的插件
&config.Plugin{},
&logger.Plugin{},
&server.Plugin{},
&http.Plugin{},
&rpc.Plugin{},
&jobs.Plugin{},
&kv.Plugin{},
&metrics.Plugin{},
&websockets.Plugin{},
&grpc.Plugin{},
&amqp.Plugin{},
&nats.Plugin{},
&sqldb.Plugin{},
&temporal.Plugin{},
&memory.Plugin{},
&informer.Plugin{},
&resetter.Plugin{},
&broker.Plugin{},
&app.Plugin{},
&cycle.Plugin{},
&LoggerPlugin{}, // 注册自定义插件
)
// 启动RoadRunner
err := rr.Serve()
if err != nil {
fmt.Println("Error starting RoadRunner:", err)
os.Exit(1)
}
}
// 定义 RoadRunner 接口
type RoadRunner interface {
Serve() error
}
// RoadRunner 结构体
type roadrunner struct {
cfg config.Configurer
log logger.Logger
srv server.Server
http server.Server
rpc rpc.RPC
jobs jobs.Jobs
kv kv.KV
metrics metrics.Metrics
websockets websockets.Websockets
grpc grpc.GRPC
amqp amqp.AMQP
nats nats.NATS
sqldb sqldb.SQLDB
temporal temporal.Temporal
memory memory.Memory
informer informer.Informer
resetter resetter.Resetter
broker broker.Broker
app app.App
cycle cycle.Cycle
plugins []service.Service
}
// New 创建 RoadRunner 实例
func New(plugins ...service.Service) RoadRunner {
rr := &roadrunner{
plugins: plugins,
}
// 初始化插件
for _, p := range plugins {
switch v := p.(type) {
case config.Configurer:
rr.cfg = v
case logger.Logger:
rr.log = v
case server.Server:
rr.srv = v
case http.Server:
rr.http = v
case rpc.RPC:
rr.rpc = v
case jobs.Jobs:
rr.jobs = v
case kv.KV:
rr.kv = v
case metrics.Metrics:
rr.metrics = v
case websockets.Websockets:
rr.websockets = v
case grpc.GRPC:
rr.grpc = v
case amqp.AMQP:
rr.amqp = v
case nats.NATS:
rr.nats = v
case sqldb.SQLDB:
rr.sqldb = v
case temporal.Temporal:
rr.temporal = v
case memory.Memory:
rr.memory = v
case informer.Informer:
rr.informer = v
case resetter.Resetter:
rr.resetter = v
case broker.Broker:
rr.broker = v
case app.App:
rr.app = v
case cycle.Cycle:
rr.cycle = v
}
}
return rr
}
// Serve 启动 RoadRunner
func (rr *roadrunner) Serve() error {
// 初始化插件
for _, p := range rr.plugins {
switch v := p.(type) {
case service.Service:
name := v.Name()
switch name {
case "config":
err := v.(config.Configurer).Init(rr.cfg)
if err != nil {
return fmt.Errorf("config plugin init error: %w", err)
}
case "logger":
err := v.(logger.Logger).Init(rr.cfg)
if err != nil {
return fmt.Errorf("logger plugin init error: %w", err)
}
case "server":
err := v.(server.Server).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("server plugin init error: %w", err)
}
case "http":
err := v.(http.Server).Init(rr.cfg, rr.log, rr.srv)
if err != nil {
return fmt.Errorf("http plugin init error: %w", err)
}
case "rpc":
err := v.(rpc.RPC).Init(rr.cfg, rr.log, rr.srv)
if err != nil {
return fmt.Errorf("rpc plugin init error: %w", err)
}
case "jobs":
err := v.(jobs.Jobs).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("jobs plugin init error: %w", err)
}
case "kv":
err := v.(kv.KV).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("kv plugin init error: %w", err)
}
case "metrics":
err := v.(metrics.Metrics).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("metrics plugin init error: %w", err)
}
case "websockets":
err := v.(websockets.Websockets).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("websockets plugin init error: %w", err)
}
case "grpc":
err := v.(grpc.GRPC).Init(rr.cfg, rr.log, rr.srv)
if err != nil {
return fmt.Errorf("grpc plugin init error: %w", err)
}
case "amqp":
err := v.(amqp.AMQP).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("amqp plugin init error: %w", err)
}
case "nats":
err := v.(nats.NATS).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("nats plugin init error: %w", err)
}
case "sqldb":
err := v.(sqldb.SQLDB).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("sqldb plugin init error: %w", err)
}
case "temporal":
err := v.(temporal.Temporal).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("temporal plugin init error: %w", err)
}
case "memory":
err := v.(memory.Memory).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("memory plugin init error: %w", err)
}
case "informer":
err := v.(informer.Informer).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("informer plugin init error: %w", err)
}
case "resetter":
err := v.(resetter.Resetter).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("resetter plugin init error: %w", err)
}
case "broker":
err := v.(broker.Broker).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("broker plugin init error: %w", err)
}
case "app":
err := v.(app.App).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("app plugin init error: %w", err)
}
case "cycle":
err := v.(cycle.Cycle).Init(rr.cfg, rr.log)
if err != nil {
return fmt.Errorf("cycle plugin init error: %w", err)
}
case "logger_plugin": // 注册自定义插件
err := v.(worker.OnWorkerStart).OnWorkerStart(context.Background(), &worker.Process{})
if err != nil {
return fmt.Errorf("logger plugin init error: %w", err)
}
err = v.(worker.OnWorkerStop).OnWorkerStop(context.Background(), &worker.Process{})
if err != nil {
return fmt.Errorf("logger plugin init error: %w", err)
}
err = v.(worker.OnReceive).OnReceive(context.Background(), &worker.Process{}, []byte{})
if err != nil {
return fmt.Errorf("logger plugin init error: %w", err)
err = v.(worker.BeforeCycle).BeforeCycle(context.Background(), &worker.Process{})
if err != nil {
return fmt.Errorf("logger plugin init error: %w", err)
}
err = v.(worker.AfterCycle).AfterCycle(context.Background(), &worker.Process{})
if err != nil {
return fmt.Errorf("logger plugin init error: %w", err)
}
err = v.(worker.OnException).OnException(context.Background(), &worker.Process{}, nil)
if err != nil {
return fmt.Errorf("logger plugin init error: %w", err)
}
}
}
}
// 启动所有插件
errCh := make(chan error, len(rr.plugins))
for _, p := range rr.plugins {
go func(plugin service.Service) {
if plugin.Name() != "config" && plugin.Name() != "logger" { // 避免重复启动
errCh <- plugin.Serve()
}
}(p)
}
// 监听错误
for i := 0; i < len(rr.plugins); i++ {
select {
case err := <-errCh:
if err != nil {
return fmt.Errorf("plugin error: %w", err)
}
}
}
return nil
}
5. RoadRunner配置文件
为了让RoadRunner加载我们的插件,需要在RoadRunner的配置文件中注册该插件。例如:
version: "2.0"
server:
command: "php worker.php"
relay: "pipes"
relay_timeout: "10s"
workers:
command: "php worker.php"
pool:
num_workers: 4
max_jobs: 0
allocate_timeout: "60s"
destroy_timeout: "60s"
http:
address: "0.0.0.0:8080"
max_request_size: 16384
upload_middleware:
- files
middleware: ["headers", "gzip"]
headers:
request:
X-Custom-Header: "some-value"
gzip:
enabled: true
level: 5
logger:
mode: development
level: debug
encoding: console
output: stderr
rotate:
enabled: true
size: 100
compress: true
age: 7
plugins:
logger_plugin: # 注册插件
enabled: true
6. PHP Worker代码
worker.php代码如下:
<?php
use SpiralRoadRunnerWorker;
use SpiralRoadRunnerPSRWorker as PSRWorker;
use NyholmPsr7FactoryPsr17Factory;
use LaminasHttpHandlerRunnerEmitterSapiEmitter;
require 'vendor/autoload.php';
$worker = Worker::create();
$psrFactory = new Psr17Factory();
$psrWorker = new PSRWorker($worker, $psrFactory, $psrFactory, $psrFactory);
$emitter = new SapiEmitter();
while ($req = $psrWorker->waitRequest()) {
try {
// This is your app logic
$response = new LaminasDiactorosResponse();
$response->getBody()->write('Hello, world!');
$response = $response->withHeader('Content-Type', 'text/plain');
// Pass request to the application and return result to RoadRunner
$psrWorker->respond($response);
} catch (Throwable $e) {
$psrWorker->getWorker()->error((string)$e);
}
}
7. 运行插件
- 编译Go语言插件:
go build -o logger_plugin main.go - 将编译后的
logger_plugin二进制文件复制到RoadRunner的插件目录下(通常是与RoadRunner配置文件相同的目录)。 - 启动RoadRunner:
./rr serve -c .rr.yaml
此时,当RoadRunner启动时,我们的日志插件也会被加载并运行。我们可以在日志文件中看到Worker启动、停止和接收请求的日志。
8. 插件配置
插件可以通过RoadRunner的配置文件进行配置。例如,我们可以为日志插件添加一个配置项,用于指定日志文件的路径:
plugins:
logger_plugin:
enabled: true
log_file: "/tmp/roadrunner.log"
在Go语言代码中,我们可以通过config.Configurer接口读取这些配置项:
// 定义插件结构体
type LoggerPlugin struct {
log *zap.Logger
logFile string // 新增配置项
}
// 初始化插件
func (p *LoggerPlugin) Init(cfg config.Configurer, log logger.Logger) error {
p.log = log.Named("logger_plugin")
// 读取配置项
if cfg.Has("logger_plugin.log_file") {
p.logFile = cfg.String("logger_plugin.log_file")
} else {
p.logFile = "/tmp/roadrunner.log" // 默认值
}
return nil
}
9. 插件之间的通信
RoadRunner允许插件之间进行通信,可以使用gRPC协议或者共享内存等方式。
10. 总结
通过Go语言接口实现RoadRunner插件,可以方便地扩展RoadRunner的功能,定制PHP Worker的行为。通过实现Worker的生命周期钩子,我们可以在Worker的不同阶段执行自定义的代码,例如初始化环境、清理资源、记录日志、验证参数等。RoadRunner的插件架构为开发者提供了极大的灵活性,可以满足各种业务需求。
一些关键点:
- RoadRunner的插件机制依赖于gRPC通信和Go语言接口。
- 通过实现
plugin.Plugin接口,我们可以创建一个RoadRunner插件。 - 通过实现
worker.WorkerEvent接口,我们可以监听PHP Worker的生命周期事件。 - 插件可以通过RoadRunner的配置文件进行配置。
希望今天的讲解对大家有所帮助。谢谢!
灵活的插件系统,高性能的扩展能力
RoadRunner的插件架构为开发者提供了强大的扩展能力,允许开发者根据实际需求定制PHP Worker的行为,构建更高效、更可靠的应用程序。
Go语言接口是关键,生命周期钩子是核心
理解Go语言接口的定义和PHP Worker的生命周期钩子是开发RoadRunner插件的关键。通过合理地利用这些接口和钩子,我们可以实现各种自定义的功能。