各位技术同仁,大家好!
今天,我们将深入探讨一个在云原生时代至关重要的组件:高性能云原生网关。尤其,我将带领大家一起研究如何利用 Go 语言的强大能力,实现一个支持插件化扩展的动态路由引擎。这不仅仅是一个理论探讨,更是一次实践的分享,我们将通过大量代码示例,揭示其背后的设计哲学和实现细节。
1. 云原生时代的网关:为什么如此重要?
在微服务架构盛行的今天,API 网关(API Gateway)已成为服务间通信的门面。它不再仅仅是一个简单的反向代理,而是承担着流量管理、安全认证、限流熔断、日志监控等一系列核心职责。特别是在云原生环境中,服务数量庞大且动态变化,传统静态配置的网关已无法满足需求。我们需要一个能够:
- 动态路由: 实时感知后端服务变化,自动调整路由规则。
- 高并发、低延迟: 作为所有外部流量的入口,必须具备卓越的性能。
- 可扩展性: 能够方便地集成各种业务逻辑和横切关注点。
- 弹性与韧性: 面对后端服务故障时,能提供熔断、重试等机制。
- 可观测性: 提供丰富的日志、指标和追踪数据,便于问题排查。
选择 Go 语言来实现这样的网关,优势显而易见:
- 并发模型: Go 的 Goroutines 和 Channels 使得高并发编程变得简洁高效。
- 性能卓越: 接近 C/C++ 的执行效率,同时拥有现代编程语言的开发体验。
- 内存管理: 垃圾回收机制减少了手动内存管理的负担,同时GC暂停时间短。
- 静态链接: 部署简单,生成单一可执行文件,便于容器化。
- 丰富的生态系统: 拥有大量高质量的网络库和工具。
因此,Go 语言是构建高性能云原生网关的理想选择。
2. 动态路由引擎的核心概念
动态路由引擎是网关的“大脑”,它负责解析传入请求,根据预设规则匹配到目标后端服务,并将请求转发过去。其“动态”体现在能够实时更新路由规则,无需重启网关。
一个动态路由引擎通常包含以下核心组件:
- 路由规则定义 (Route Definition): 描述如何将一个请求映射到后端服务。
- 服务发现 (Service Discovery): 实时获取后端服务的地址列表。
- 配置管理 (Configuration Management): 存储和更新路由规则及服务信息。
- 请求匹配 (Request Matching): 根据请求的各个属性(路径、主机、方法、头部等)查找匹配的路由。
- 负载均衡 (Load Balancing): 在多个后端服务实例之间分发请求。
- 插件机制 (Plugin Mechanism): 提供可扩展的拦截和处理能力。
我们将围绕这些组件,逐步构建我们的网关。
3. 网关的整体架构概览
在深入代码之前,我们先勾勒出网关的宏观架构。它通常分为数据平面 (Data Plane) 和控制平面 (Control Plane)。
- 数据平面: 负责处理所有实际的业务流量,包括请求接收、路由匹配、插件执行、负载均衡和请求转发。这是我们今天主要关注的部分,也是对性能要求最高的部分。
- 控制平面: 负责管理和更新数据平面的配置,例如路由规则、服务列表、插件配置等。它通常通过 API、Webhooks 或与服务注册中心(如 Kubernetes API Server, Etcd, Consul)交互来实现配置的动态推送。
我们的目标是构建一个高性能的数据平面,并为其预留与控制平面集成的接口。
概念架构图:
+------------------+ +--------------------------+
| 外部客户端请求 |---->| API Gateway (数据平面) |
+------------------+ +--------------------------+
|
| 1. HTTP Server (监听请求)
|
v
+----------------------+
| 请求处理器 |
| (Request Handler) |
+----------------------+
|
| 2. 请求上下文构建
|
v
+----------------------+
| 路由引擎 |
| (Routing Engine) |
| - 路由匹配 |
+----------------------+
|
| 3. 匹配路由
|
v
+----------------------+
| 插件链 |
| (Plugin Chain) |
| - 认证、限流、熔断 |
| - 日志、指标 |
+----------------------+
|
| 4. 插件处理
|
v
+----------------------+
| 负载均衡器 |
| (Load Balancer) |
+----------------------+
|
| 5. 选择后端实例
|
v
+----------------------+
| 反向代理 |
| (Reverse Proxy) |
+----------------------+
|
| 6. 请求转发
|
v
+----------------------+
| 后端服务集群 |
+----------------------+
^
|
+-------------------------------------------------+
| |
| 控制平面 (配置管理, 服务发现) |
| |
+-------------------------------------------------+
^ ^
| |
| 更新路由规则 | 监听服务变化
| |
+---------------------+ +---------------------+
| 配置源 (ConfigMap, | | 服务注册中心 (K8s API,|
| Etcd, Consul) | | Etcd, Consul) |
+---------------------+ +---------------------+
4. Go 实现核心路由引擎
4.1 路由规则与服务定义
首先,我们需要定义路由规则和服务的数据结构。
// route.go
package router
import (
"net/http"
"time"
)
// PluginConfig 定义单个插件的配置
type PluginConfig struct {
Name string `json:"name"`
Config map[string]interface{} `json:"config"`
}
// Route 定义一个路由规则
type Route struct {
ID string `json:"id"` // 唯一ID
Path string `json:"path"` // 请求路径,支持通配符
Method string `json:"method"` // HTTP方法,ALL表示所有方法
Host string `json:"host"` // 请求主机名
Headers map[string]string `json:"headers"` // 请求头部匹配
Target string `json:"target"` // 目标服务名称或URL前缀
StripPath bool `json:"strip_path"` // 是否剥离路径前缀
Plugins []PluginConfig `json:"plugins"` // 绑定的插件列表
Description string `json:"description"` `json:"description"` // 路由描述
// 内部优化字段
pathMatcher *pathMatcher // 编译后的路径匹配器
}
// UpstreamEndpoint 定义一个后端服务实例
type UpstreamEndpoint struct {
ID string `json:"id"`
URL string `json:"url"` // 完整的后端服务URL (http://ip:port)
Weight int `json:"weight"` // 权重,用于加权轮询
Status string `json:"status"` // "UP", "DOWN"
LastHeartbeat time.Time `json:"last_heartbeat"` // 最近心跳时间
}
// Upstream 定义一个后端服务集群
type Upstream struct {
Name string `json:"name"` // 服务名称,与Route.Target对应
Strategy string `json:"strategy"` // 负载均衡策略 (e.g., "round-robin", "weighted-round-robin")
Endpoints []UpstreamEndpoint `json:"endpoints"` // 后端服务实例列表
HealthCheck *HealthCheckConfig `json:"health_check"` // 健康检查配置
UpdatedAt time.Time `json:"updated_at"` // 更新时间
// 内部优化字段
activeEndpoints []*UpstreamEndpoint // 活跃的后端服务实例
lb LoadBalancer // 负载均衡器实例
}
// HealthCheckConfig 健康检查配置
type HealthCheckConfig struct {
Enable bool `json:"enable"`
Interval time.Duration `json:"interval"`
Timeout time.Duration `json:"timeout"`
Path string `json:"path"`
Port int `json:"port"` // 如果不指定,使用Endpoint URL的端口
}
// RouterManager 路由管理器接口
type RouterManager interface {
GetRoute(req *http.Request) (*Route, error)
GetUpstream(name string) (*Upstream, error)
UpdateRoutes(routes []Route) error
UpdateUpstreams(upstreams []Upstream) error
}
解释:
Route结构体定义了路由规则,包括路径、方法、主机、头部等匹配条件,以及目标服务 (Target) 和要执行的插件列表。pathMatcher是一个内部字段,用于编译路径规则,提高匹配效率。UpstreamEndpoint定义了单个后端服务实例的地址和权重。Upstream定义了一个后端服务集群,包含多个实例和负载均衡策略。activeEndpoints和lb是内部字段,用于优化性能和管理负载均衡。PluginConfig定义了插件的名称和配置参数。RouterManager接口定义了路由管理器的核心功能,方便后续实现不同的配置源。
4.2 路由匹配算法:Radix Tree (前缀树)
对于路径匹配,高效的数据结构至关重要。Radix Tree(基数树,也称压缩前缀树)是实现高性能路由匹配的理想选择。它能有效地存储和检索字符串路径,尤其适合带有通配符的路径匹配。
我们将实现一个简化的 Radix Tree 来处理路径匹配。
// path_matcher.go
package router
import (
"fmt"
"strings"
"sync"
)
// pathMatcher 接口定义了路径匹配器的行为
type pathMatcher interface {
Match(path string) (map[string]string, bool) // 返回路径参数和是否匹配
Insert(path string, data interface{}) error
Lookup(path string) (interface{}, bool)
Remove(path string) error
}
// RadixTreeNode 结构表示 Radix Tree 中的一个节点
type RadixTreePathNode struct {
segment string // 当前节点的路径段 (e.g., "users", ":id", "*")
children map[string]*RadixTreePathNode // 子节点,key为路径段的第一个字符或特殊字符
handlers map[string]interface{} // 存储匹配到此节点的完整路径及其对应的数据 (e.g., route ID)
isWildcard bool // 标记此节点是否是通配符节点 (e.g., :id, *)
isCatchAll bool // 标记此节点是否是捕获所有节点 (e.g., *)
paramName string // 如果是通配符节点,存储参数名 (e.g., "id")
}
// NewRadixTreePathNode 创建一个新的 RadixTreePathNode
func NewRadixTreePathNode(segment string) *RadixTreePathNode {
return &RadixTreePathNode{
segment: segment,
children: make(map[string]*RadixTreePathNode),
handlers: make(map[string]interface{}),
}
}
// RadixTreePathMatcher 实现 pathMatcher 接口
type RadixTreePathMatcher struct {
root *RadixTreePathNode
mu sync.RWMutex
}
// NewRadixTreePathMatcher 创建一个新的 RadixTreePathMatcher
func NewRadixTreePathMatcher() *RadixTreePathMatcher {
return &RadixTreePathMatcher{
root: NewRadixTreePathNode("/"), // 根节点
}
}
// Insert 将路径和数据插入 Radix Tree
func (r *RadixTreePathMatcher) Insert(path string, data interface{}) error {
r.mu.Lock()
defer r.mu.Unlock()
if !strings.HasPrefix(path, "/") {
path = "/" + path // 确保路径以斜杠开头
}
segments := strings.Split(path, "/")
currentNode := r.root
for i, seg := range segments {
if seg == "" && i != 0 { // 忽略空段,除非是根路径的第一个空段
continue
}
// 处理通配符和捕获所有
isWildcard := false
isCatchAll := false
paramName := ""
if strings.HasPrefix(seg, ":") {
isWildcard = true
paramName = seg[1:]
} else if strings.HasPrefix(seg, "*") {
isCatchAll = true
paramName = seg[1:]
}
found := false
for _, child := range currentNode.children {
if child.segment == seg {
currentNode = child
found = true
break
}
// 如果当前段是通配符,且子节点也是通配符,或者当前段是普通段,子节点是通配符,则需要特殊的处理
// 简化处理:对于相同的路径段,只允许一种类型(普通/通配符/捕获所有)
if isWildcard && child.isWildcard {
if child.paramName != paramName {
return fmt.Errorf("conflicting wildcard parameter names for segment '%s': '%s' vs '%s'", seg, child.paramName, paramName)
}
currentNode = child
found = true
break
}
if isCatchAll && child.isCatchAll {
if child.paramName != paramName {
return fmt.Errorf("conflicting catchall parameter names for segment '%s': '%s' vs '%s'", seg, child.paramName, paramName)
}
currentNode = child
found = true
break
}
}
if !found {
newNode := NewRadixTreePathNode(seg)
newNode.isWildcard = isWildcard
newNode.isCatchAll = isCatchAll
newNode.paramName = paramName
currentNode.children[seg] = newNode // 使用段作为key
currentNode = newNode
}
if isCatchAll {
// Catch-all segments must be the last segment in a path
if i < len(segments)-1 {
return fmt.Errorf("catch-all segment '*' must be the last segment in path '%s'", path)
}
break // 捕获所有后不再继续向下插入
}
}
currentNode.handlers[path] = data // 存储原始路径和数据
return nil
}
// Match 匹配路径,返回路径参数和匹配到的数据
func (r *RadixTreePathMatcher) Match(path string) (map[string]string, interface{}, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
if !strings.HasPrefix(path, "/") {
path = "/" + path
}
segments := strings.Split(path, "/")
currentNode := r.root
pathParams := make(map[string]string)
var matchedData interface{}
matched := false
// 用于存储最佳匹配(最长匹配)
var bestMatchPath string
var bestMatchData interface{}
var bestMatchParams map[string]string
var bestMatchLength int
r.matchRecursive(r.root, segments, 0, pathParams, func(nodePath string, data interface{}, params map[string]string) {
if len(nodePath) > bestMatchLength {
bestMatchLength = len(nodePath)
bestMatchPath = nodePath
bestMatchData = data
bestMatchParams = make(map[string]string)
for k, v := range params {
bestMatchParams[k] = v
}
}
})
if bestMatchData != nil {
return bestMatchParams, bestMatchData, true
}
return nil, nil, false
}
// matchRecursive 递归匹配路径,找到所有可能的匹配
func (r *RadixTreePathMatcher) matchRecursive(node *RadixTreePathNode, segments []string, segmentIndex int, currentParams map[string]string, callback func(nodePath string, data interface{}, params map[string]string)) {
if node == nil {
return
}
// 如果当前节点有handler,并且路径已经匹配完毕,则这是一个完整的匹配
if segmentIndex == len(segments) {
for p, d := range node.handlers {
callback(p, d, currentParams)
}
return
}
currentSegment := segments[segmentIndex]
// 尝试匹配普通子节点
if child, ok := node.children[currentSegment]; ok {
r.matchRecursive(child, segments, segmentIndex+1, currentParams, callback)
}
// 尝试匹配通配符子节点
for _, child := range node.children {
if child.isWildcard {
newParams := make(map[string]string)
for k, v := range currentParams {
newParams[k] = v
}
newParams[child.paramName] = currentSegment
r.matchRecursive(child, segments, segmentIndex+1, newParams, callback)
} else if child.isCatchAll {
// 捕获所有匹配
newParams := make(map[string]string)
for k, v := range currentParams {
newParams[k] = v
}
newParams[child.paramName] = strings.Join(segments[segmentIndex:], "/")
for p, d := range child.handlers {
callback(p, d, newParams)
}
// 捕获所有后,不再继续向下匹配
return
}
}
}
// Lookup 从 Radix Tree 中查找数据 (此处简化,直接调用Match)
func (r *RadixTreePathMatcher) Lookup(path string) (interface{}, bool) {
_, data, found := r.Match(path)
return data, found
}
// Remove 从 Radix Tree 中移除路径和数据 (此处省略复杂实现,实际生产需要实现)
func (r *RadixTreePathMatcher) Remove(path string) error {
r.mu.Lock()
defer r.mu.Unlock()
// 实际生产中需要实现删除逻辑,涉及节点清理、合并等复杂操作
// 简单的实现可以从handlers中移除,但不清理树结构
// for _, h := range r.root.handlers {
// if h == data {
// delete(r.root.handlers, path)
// return nil
// }
// }
return fmt.Errorf("remove not implemented for RadixTreePathMatcher")
}
// RouteMatcher 接口,用于匹配请求并返回匹配到的Route
type RouteMatcher interface {
Match(req *http.Request) (*Route, map[string]string, error)
AddRoute(route Route) error
RemoveRoute(routeID string) error
UpdateRoute(route Route) error
}
// DefaultRouteMatcher 默认的路由匹配器实现
type DefaultRouteMatcher struct {
routes sync.Map // 存储所有的路由规则,key为Route.ID
pathTree *RadixTreePathMatcher // Radix Tree 用于路径匹配
hostMap sync.Map // 存储 host -> []Route
methodMap sync.Map // 存储 method -> []Route
defaultRoute *Route // 默认路由
}
func NewDefaultRouteMatcher() *DefaultRouteMatcher {
return &DefaultRouteMatcher{
pathTree: NewRadixTreePathMatcher(),
}
}
// AddRoute 添加路由
func (rm *DefaultRouteMatcher) AddRoute(route Route) error {
// 编译路径匹配器
if err := rm.pathTree.Insert(route.Path, route.ID); err != nil {
return fmt.Errorf("failed to insert route path %s: %w", route.Path, err)
}
rm.routes.Store(route.ID, route)
// 刷新hostMap和methodMap (更复杂的实现可能需要更精细的索引)
rm.buildIndexes()
return nil
}
// RemoveRoute 移除路由
func (rm *DefaultRouteMatcher) RemoveRoute(routeID string) error {
rm.routes.Delete(routeID)
// 刷新hostMap和methodMap
rm.buildIndexes()
// TODO: 还需要从pathTree中删除对应的路径
return nil
}
// UpdateRoute 更新路由
func (rm *DefaultRouteMatcher) UpdateRoute(route Route) error {
// 简单实现为删除再添加,实际生产中可以优化
rm.RemoveRoute(route.ID)
return rm.AddRoute(route)
}
// buildIndexes 重新构建host和method的索引
func (rm *DefaultRouteMatcher) buildIndexes() {
newHostMap := make(map[string][]Route)
newMethodMap := make(map[string][]Route)
rm.routes.Range(func(key, value interface{}) bool {
r := value.(Route)
if r.Host != "" {
newHostMap[r.Host] = append(newHostMap[r.Host], r)
}
if r.Method != "" {
newMethodMap[r.Method] = append(newMethodMap[r.Method], r)
}
return true
})
// 原子替换
rm.hostMap = sync.Map{}
for k, v := range newHostMap {
rm.hostMap.Store(k, v)
}
rm.methodMap = sync.Map{}
for k, v := range newMethodMap {
rm.methodMap.Store(k, v)
}
}
// Match 匹配请求,返回匹配到的Route和路径参数
func (rm *DefaultRouteMatcher) Match(req *http.Request) (*Route, map[string]string, error) {
// 1. 路径匹配
pathParams, routeID, pathMatch := rm.pathTree.Match(req.URL.Path)
if !pathMatch {
return nil, nil, fmt.Errorf("no route found for path: %s", req.URL.Path)
}
matchedRoute, ok := rm.routes.Load(routeID.(string))
if !ok {
return nil, nil, fmt.Errorf("route ID '%s' found in path tree but not in routes map", routeID)
}
route := matchedRoute.(Route)
// 2. Host 匹配 (如果Route指定了Host)
if route.Host != "" && route.Host != req.Host {
return nil, nil, fmt.Errorf("host mismatch for route %s: expected %s, got %s", route.ID, route.Host, req.Host)
}
// 3. Method 匹配 (如果Route指定了Method)
if route.Method != "" && route.Method != "ALL" && route.Method != req.Method {
return nil, nil, fmt.Errorf("method mismatch for route %s: expected %s, got %s", route.ID, route.Method, req.Method)
}
// 4. Header 匹配 (如果Route指定了Header)
for k, v := range route.Headers {
if req.Header.Get(k) != v {
return nil, nil, fmt.Errorf("header mismatch for route %s: expected %s=%s", route.ID, k, v)
}
}
return &route, pathParams, nil
}
解释:
RadixTreePathNode和RadixTreePathMatcher实现了 Radix Tree 的基本功能,支持/users/:id和/files/*这样的通配符和捕获所有路径。Match方法会返回路径参数。为了简化,这里的Match方法会找到最长匹配的路径。DefaultRouteMatcher实现了RouteMatcher接口,内部使用sync.Map存储路由,并利用RadixTreePathMatcher进行路径匹配。它还维护了hostMap和methodMap索引,以便在路由规则数量庞大时快速过滤。AddRoute会将路由添加到 Radix Tree 和sync.Map中,并重建索引。Match方法会依次进行路径、主机、方法和头部匹配。
注意: 实际生产级 Radix Tree 实现会更复杂,需要处理节点合并、删除优化、不同通配符的优先级等问题。这里提供的是一个简化版,用于演示核心思想。对于性能极致的场景,可以考虑集成像 fasthttp/router 或 gorilla/mux 这样的成熟路由库。
4.3 服务发现与配置管理
网关需要动态获取后端服务实例列表,并实时更新路由规则。这通常通过与服务注册中心(如 Kubernetes API Server、Etcd、Consul)集成来实现。
// service_discovery.go
package router
import (
"context"
"fmt"
"log"
"sync"
"time"
// 假设我们使用 Kubernetes 作为服务发现
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// ServiceDiscovery 接口定义了服务发现的功能
type ServiceDiscovery interface {
GetUpstream(name string) (*Upstream, bool)
Start(ctx context.Context) error
Stop()
Subscribe(callback func(upstreams []Upstream)) // 订阅更新通知
}
// KubernetesServiceDiscovery 实现基于 Kubernetes 的服务发现
type KubernetesServiceDiscovery struct {
clientset *kubernetes.Clientset
informerFactory informers.SharedInformerFactory
upstreamStore sync.Map // map[string]*Upstream
stopCh chan struct{}
subscribers []func(upstreams []Upstream)
mu sync.RWMutex
}
// NewKubernetesServiceDiscovery 创建 K8s 服务发现实例
func NewKubernetesServiceDiscovery(namespace string) (*KubernetesServiceDiscovery, error) {
config, err := rest.InClusterConfig() // 尝试集群内配置
if err != nil {
// 如果不在集群内,尝试加载kubeconfig
// kubeconfig := os.Getenv("KUBECONFIG")
// if kubeconfig == "" {
// kubeconfig = filepath.Join(homedir.HomeDir(), ".kube", "config")
// }
// config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
// if err != nil {
// return nil, fmt.Errorf("failed to get k8s config: %w", err)
// }
return nil, fmt.Errorf("failed to get in-cluster k8s config: %w (run outside cluster requires specific kubeconfig setup)", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create k8s clientset: %w", err)
}
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
time.Minute, // 重同步周期
informers.WithNamespace(namespace),
)
return &KubernetesServiceDiscovery{
clientset: clientset,
informerFactory: factory,
stopCh: make(chan struct{}),
}, nil
}
// Start 启动服务发现
func (k *KubernetesServiceDiscovery) Start(ctx context.Context) error {
serviceInformer := k.informerFactory.Core().V1().Services().Informer()
endpointsInformer := k.informerFactory.Core().V1().Endpoints().Informer()
serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: k.onServiceAdd,
UpdateFunc: k.onServiceUpdate,
DeleteFunc: k.onServiceDelete,
})
endpointsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: k.onEndpointsAdd,
UpdateFunc: k.onEndpointsUpdate,
DeleteFunc: k.onEndpointsDelete,
})
k.informerFactory.Start(k.stopCh)
k.informerFactory.WaitForCacheSync(k.stopCh) // 等待缓存同步完成
log.Println("Kubernetes Service Discovery started and cache synced.")
return nil
}
// Stop 停止服务发现
func (k *KubernetesServiceDiscovery) Stop() {
close(k.stopCh)
log.Println("Kubernetes Service Discovery stopped.")
}
// onServiceAdd/Update/Delete 处理 Service 资源变化
func (k *KubernetesServiceDiscovery) onServiceAdd(obj interface{}) {
k.updateUpstreamFromService(obj.(*v1.Service))
}
func (k *KubernetesServiceDiscovery) onServiceUpdate(oldObj, newObj interface{}) {
k.updateUpstreamFromService(newObj.(*v1.Service))
}
func (k *KubernetesServiceDiscovery) onServiceDelete(obj interface{}) {
svc := obj.(*v1.Service)
k.upstreamStore.Delete(svc.Name)
k.notifySubscribers()
log.Printf("Service deleted: %s", svc.Name)
}
// onEndpointsAdd/Update/Delete 处理 Endpoints 资源变化
func (k *KubernetesServiceDiscovery) onEndpointsAdd(obj interface{}) {
k.updateUpstreamFromEndpoints(obj.(*v1.Endpoints))
}
func (k *KubernetesServiceDiscovery) onEndpointsUpdate(oldObj, newObj interface{}) {
k.updateUpstreamFromEndpoints(newObj.(*v1.Endpoints))
}
func (k *KubernetesServiceDiscovery) onEndpointsDelete(obj interface{}) {
// 当Endpoints被删除时,可能需要检查对应的Service是否还存在
// 简单的处理是,如果Service仍然存在,则将其endpoints清空
ep := obj.(*v1.Endpoints)
if upstream, ok := k.upstreamStore.Load(ep.Name); ok {
up := upstream.(*Upstream)
up.Endpoints = []*UpstreamEndpoint{}
up.activeEndpoints = []*UpstreamEndpoint{} // 清空活跃实例
up.UpdatedAt = time.Now()
k.upstreamStore.Store(ep.Name, up)
k.notifySubscribers()
log.Printf("Endpoints for service %s deleted. Upstream updated.", ep.Name)
}
}
// updateUpstreamFromService 从 Service 资源更新 Upstream
func (k *KubernetesServiceDiscovery) updateUpstreamFromService(svc *v1.Service) {
// 这里只处理Service名称到Upstream名称的映射
// 实际的Endpoint信息来自Endpoints资源
// 对于每个Service,我们创建一个或更新一个Upstream
// 负载均衡策略可以从Service的Annotation中获取
upstreamName := svc.Name
lbStrategy := "round-robin" // 默认负载均衡策略
if strategy, ok := svc.Annotations["gateway.example.com/load-balancer"]; ok {
lbStrategy = strategy
}
upstream, _ := k.upstreamStore.LoadOrStore(upstreamName, &Upstream{
Name: upstreamName,
Strategy: lbStrategy,
Endpoints: []*UpstreamEndpoint{}, // Endpoints稍后从Endpoints资源补充
UpdatedAt: time.Now(),
})
up := upstream.(*Upstream)
up.Strategy = lbStrategy // 更新策略
up.UpdatedAt = time.Now()
k.upstreamStore.Store(upstreamName, up)
k.notifySubscribers()
log.Printf("Service %s updated. Upstream %s created/updated.", svc.Name, upstreamName)
}
// updateUpstreamFromEndpoints 从 Endpoints 资源更新 Upstream
func (k *KubernetesServiceDiscovery) updateUpstreamFromEndpoints(ep *v1.Endpoints) {
upstreamName := ep.Name
upstream, ok := k.upstreamStore.Load(upstreamName)
if !ok {
// 如果Service还没被处理,先创建一个空的Upstream
upstream = &Upstream{
Name: upstreamName,
Strategy: "round-robin",
Endpoints: []*UpstreamEndpoint{},
UpdatedAt: time.Now(),
}
k.upstreamStore.Store(upstreamName, upstream)
}
up := upstream.(*Upstream)
newEndpoints := make([]*UpstreamEndpoint, 0)
for _, subset := range ep.Subsets {
for _, address := range subset.Addresses {
for _, port := range subset.Ports {
// 假设我们只关心HTTP端口
if port.Protocol == v1.ProtocolTCP && (port.Name == "http" || port.Name == "web" || port.Name == "gateway") {
newEndpoints = append(newEndpoints, &UpstreamEndpoint{
ID: fmt.Sprintf("%s-%s-%d", address.IP, port.Name, port.Port),
URL: fmt.Sprintf("http://%s:%d", address.IP, port.Port),
Weight: 1, // 默认权重
Status: "UP",
LastHeartbeat: time.Now(),
})
}
}
}
}
up.Endpoints = newEndpoints
up.activeEndpoints = newEndpoints // 暂时所有都是活跃的,健康检查会更新状态
up.UpdatedAt = time.Now()
// 更新负载均衡器
up.lb = GetLoadBalancer(up.Strategy, up.activeEndpoints)
k.upstreamStore.Store(upstreamName, up)
k.notifySubscribers()
log.Printf("Endpoints for service %s updated. Upstream %s endpoints refreshed.", ep.Name, upstreamName)
}
// GetUpstream 获取指定名称的 Upstream
func (k *KubernetesServiceDiscovery) GetUpstream(name string) (*Upstream, bool) {
if val, ok := k.upstreamStore.Load(name); ok {
return val.(*Upstream), true
}
return nil, false
}
// Subscribe 允许其他组件订阅 Upstream 更新通知
func (k *KubernetesServiceDiscovery) Subscribe(callback func(upstreams []Upstream)) {
k.mu.Lock()
defer k.mu.Unlock()
k.subscribers = append(k.subscribers, callback)
}
// notifySubscribers 通知所有订阅者 Upstream 列表已更新
func (k *KubernetesServiceDiscovery) notifySubscribers() {
k.mu.RLock()
defer k.mu.RUnlock()
var allUpstreams []Upstream
k.upstreamStore.Range(func(key, value interface{}) bool {
allUpstreams = append(allUpstreams, *value.(*Upstream))
return true
})
for _, sub := range k.subscribers {
sub(allUpstreams)
}
}
// ConfigManager 接口定义了配置管理器的功能
type ConfigManager interface {
GetRoutes() ([]Route, error)
GetUpstreams() ([]Upstream, error)
Start(ctx context.Context) error
Stop()
SubscribeRoutes(callback func(routes []Route))
SubscribeUpstreams(callback func(upstreams []Upstream))
}
// InMemoryConfigManager 简单的内存配置管理器 (用于演示,生产环境应从文件/K8s ConfigMap/Etcd加载)
type InMemoryConfigManager struct {
routes []Route
upstreams []Upstream
routeSubscribers []func(routes []Route)
upstreamSubscribers []func(upstreams []Upstream)
mu sync.RWMutex
stopCh chan struct{}
}
func NewInMemoryConfigManager() *InMemoryConfigManager {
return &InMemoryConfigManager{
stopCh: make(chan struct{}),
}
}
func (m *InMemoryConfigManager) Start(ctx context.Context) error {
log.Println("In-Memory Config Manager started.")
// 模拟初始加载和定期更新
go func() {
ticker := time.NewTicker(30 * time.Second) // 每30秒模拟更新
defer ticker.Stop()
for {
select {
case <-ticker.C:
m.mu.Lock()
// 模拟更新路由和上游
m.routes = []Route{
{
ID: "user-service",
Path: "/api/users/:id",
Method: "GET",
Target: "user-service",
StripPath: true,
Plugins: []PluginConfig{
{Name: "auth"},
{Name: "rate-limit", Config: map[string]interface{}{"rate": 10, "burst": 20}},
},
},
{
ID: "product-service",
Path: "/api/products/*",
Method: "ALL",
Target: "product-service",
StripPath: true,
Plugins: []PluginConfig{
{Name: "logger"},
},
},
}
m.upstreams = []Upstream{
{
Name: "user-service",
Strategy: "round-robin",
Endpoints: []*UpstreamEndpoint{
{ID: "user-1", URL: "http://127.0.0.1:8081", Weight: 1, Status: "UP"},
{ID: "user-2", URL: "http://127.0.0.1:8082", Weight: 1, Status: "UP"},
},
UpdatedAt: time.Now(),
},
{
Name: "product-service",
Strategy: "weighted-round-robin",
Endpoints: []*UpstreamEndpoint{
{ID: "product-1", URL: "http://127.0.0.1:8083", Weight: 2, Status: "UP"},
{ID: "product-2", URL: "http://127.0.0.1:8084", Weight: 1, Status: "UP"},
},
UpdatedAt: time.Now(),
},
}
m.mu.Unlock()
m.notifyRouteSubscribers()
m.notifyUpstreamSubscribers()
log.Println("In-Memory Config Manager: Routes and Upstreams updated.")
case <-m.stopCh:
return
}
}
}()
return nil
}
func (m *InMemoryConfigManager) Stop() {
close(m.stopCh)
log.Println("In-Memory Config Manager stopped.")
}
func (m *InMemoryConfigManager) GetRoutes() ([]Route, error) {
m.mu.RLock()
defer m.mu.RUnlock()
// 返回副本以防止外部修改
routesCopy := make([]Route, len(m.routes))
copy(routesCopy, m.routes)
return routesCopy, nil
}
func (m *InMemoryConfigManager) GetUpstreams() ([]Upstream, error) {
m.mu.RLock()
defer m.mu.RUnlock()
upstreamsCopy := make([]Upstream, len(m.upstreams))
copy(upstreamsCopy, m.upstreams)
return upstreamsCopy, nil
}
func (m *InMemoryConfigManager) SubscribeRoutes(callback func(routes []Route)) {
m.mu.Lock()
defer m.mu.Unlock()
m.routeSubscribers = append(m.routeSubscribers, callback)
}
func (m *InMemoryConfigManager) SubscribeUpstreams(callback func(upstreams []Upstream)) {
m.mu.Lock()
defer m.mu.Unlock()
m.upstreamSubscribers = append(m.upstreamSubscribers, callback)
}
func (m *InMemoryConfigManager) notifyRouteSubscribers() {
m.mu.RLock()
defer m.mu.RUnlock()
routesCopy := make([]Route, len(m.routes))
copy(routesCopy, m.routes)
for _, sub := range m.routeSubscribers {
sub(routesCopy)
}
}
func (m *InMemoryConfigManager) notifyUpstreamSubscribers() {
m.mu.RLock()
defer m.mu.RUnlock()
upstreamsCopy := make([]Upstream, len(m.upstreams))
copy(upstreamsCopy, m.upstreams)
for _, sub := range m.upstreamSubscribers {
sub(upstreamsCopy)
}
}
解释:
ServiceDiscovery接口定义了服务发现的行为。KubernetesServiceDiscovery是一个基于 Kubernetesclient-goInformer 的实现。它监听Service和Endpoints资源的变化,并将其转换为内部的Upstream结构。client-goInformer 机制是 K8s 客户端库中用于高效监听资源变化并维护本地缓存的组件。它通过 Watch API 接收事件,并异步处理。onServiceAdd/Update/Delete和onEndpointsAdd/Update/Delete方法是事件处理回调,它们负责更新upstreamStore。notifySubscribers方法会在Upstream列表发生变化时,通知所有订阅者。
ConfigManager接口定义了配置管理器的行为,用于获取和订阅路由和上游的配置。InMemoryConfigManager是一个简单的内存实现,用于演示。在生产环境中,这部分会替换为从 Kubernetes ConfigMap、Etcd、Consul 或自定义 CRD 中加载配置的实现。它也会定期模拟更新并通知订阅者,展示“热加载”的机制。
4.4 负载均衡器
负载均衡器负责在多个后端服务实例之间分发请求。
// load_balancer.go
package router
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// LoadBalancer 接口定义了负载均衡器的行为
type LoadBalancer interface {
Next() (*UpstreamEndpoint, error)
UpdateEndpoints(endpoints []*UpstreamEndpoint)
}
// RoundRobinLoadBalancer 轮询负载均衡器
type RoundRobinLoadBalancer struct {
endpoints []*UpstreamEndpoint
next uint64 // 原子计数器
mu sync.RWMutex
}
func NewRoundRobinLoadBalancer(endpoints []*UpstreamEndpoint) *RoundRobinLoadBalancer {
return &RoundRobinLoadBalancer{
endpoints: endpoints,
next: 0,
}
}
func (lb *RoundRobinLoadBalancer) Next() (*UpstreamEndpoint, error) {
lb.mu.RLock()
defer lb.mu.RUnlock()
if len(lb.endpoints) == 0 {
return nil, fmt.Errorf("no active endpoints available")
}
// 原子递增并取模
idx := atomic.AddUint64(&lb.next, 1) % uint64(len(lb.endpoints))
return lb.endpoints[idx], nil
}
func (lb *RoundRobinLoadBalancer) UpdateEndpoints(endpoints []*UpstreamEndpoint) {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.endpoints = endpoints
// 重置计数器,确保新的Endpoints能从头开始轮询
lb.next = 0
}
// WeightedRoundRobinLoadBalancer 加权轮询负载均衡器
type WeightedRoundRobinLoadBalancer struct {
endpoints []*UpstreamEndpoint
weights []int // 存储每个endpoint的权重
totalWeight int
currentWeight int // 当前权重
currentIdx int // 当前索引
mu sync.RWMutex
}
func NewWeightedRoundRobinLoadBalancer(endpoints []*UpstreamEndpoint) *WeightedRoundRobinLoadBalancer {
lb := &WeightedRoundRobinLoadBalancer{}
lb.UpdateEndpoints(endpoints) // 调用UpdateEndpoints来初始化
return lb
}
func (lb *WeightedRoundRobinLoadBalancer) UpdateEndpoints(endpoints []*UpstreamEndpoint) {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.endpoints = endpoints
lb.weights = make([]int, len(endpoints))
lb.totalWeight = 0
for i, ep := range endpoints {
lb.weights[i] = ep.Weight
lb.totalWeight += ep.Weight
}
lb.currentWeight = 0
lb.currentIdx = -1 // 初始为-1,第一次Next时会自增到0
}
// Next 采用平滑加权轮询算法
func (lb *WeightedRoundRobinLoadBalancer) Next() (*UpstreamEndpoint, error) {
lb.mu.Lock() // 需要写锁来更新currentWeight和currentIdx
defer lb.mu.Unlock()
if len(lb.endpoints) == 0 {
return nil, fmt.Errorf("no active endpoints available")
}
// Nginx平滑加权轮询算法
// 每次选择当前权重最高的,并减去总权重,然后给所有权重加上自己的权重
for {
lb.currentIdx = (lb.currentIdx + 1) % len(lb.endpoints)
if lb.currentIdx == 0 {
lb.currentWeight -= lb.totalWeight // 一轮结束后,减去总权重
}
lb.currentWeight += lb.weights[lb.currentIdx] // 加上当前endpoint的权重
if lb.currentWeight >= 0 {
return lb.endpoints[lb.currentIdx], nil
}
}
}
// GetLoadBalancer 根据策略获取负载均衡器实例
func GetLoadBalancer(strategy string, endpoints []*UpstreamEndpoint) LoadBalancer {
switch strategy {
case "weighted-round-robin":
return NewWeightedRoundRobinLoadBalancer(endpoints)
case "round-robin":
return NewRoundRobinLoadBalancer(endpoints)
case "random":
return NewRandomLoadBalancer(endpoints)
default:
// 默认使用轮询
return NewRoundRobinLoadBalancer(endpoints)
}
}
// RandomLoadBalancer 随机负载均衡器
type RandomLoadBalancer struct {
endpoints []*UpstreamEndpoint
rand *rand.Rand // 随机数生成器
mu sync.RWMutex
}
func NewRandomLoadBalancer(endpoints []*UpstreamEndpoint) *RandomLoadBalancer {
return &RandomLoadBalancer{
endpoints: endpoints,
rand: rand.New(rand.NewSource(time.Now().UnixNano())), // 使用时间戳作为种子
}
}
func (lb *RandomLoadBalancer) Next() (*UpstreamEndpoint, error) {
lb.mu.RLock()
defer lb.mu.RUnlock()
if len(lb.endpoints) == 0 {
return nil, fmt.Errorf("no active endpoints available")
}
idx := lb.rand.Intn(len(lb.endpoints))
return lb.endpoints[idx], nil
}
func (lb *RandomLoadBalancer) UpdateEndpoints(endpoints []*UpstreamEndpoint) {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.endpoints = endpoints
}
解释:
LoadBalancer接口定义了负载均衡器的通用行为:Next()选择下一个后端服务,UpdateEndpoints()更新后端服务列表。RoundRobinLoadBalancer实现了简单的轮询算法,使用atomic.AddUint64保证计数器的并发安全。WeightedRoundRobinLoadBalancer实现了加权轮询,这里采用了 Nginx 经典的平滑加权轮询算法,在权重差异大时也能保证请求分发的平滑性。RandomLoadBalancer实现了随机负载均衡。GetLoadBalancer是一个工厂函数,根据传入的策略字符串返回对应的负载均衡器实例。
5. 实现插件化扩展
插件化是网关灵活性的关键。它允许我们在不修改核心代码的情况下,动态地添加、修改或移除功能,如认证、限流、日志、监控、请求转换、熔断等。
5.1 插件接口与上下文
我们需要定义一个统一的插件接口,以及一个在插件之间传递数据的上下文对象。
// plugin.go
package plugin
import (
"context"
"net/http"
)
// Context 在插件链中传递的上下文
type Context struct {
// 请求和响应对象
Request *http.Request
Response http.ResponseWriter
// 路由信息
RouteID string
PathParams map[string]string // 路径参数
TargetURL string // 目标后端服务的完整URL
// 插件之间共享的数据
Data map[string]interface{}
// 用于控制插件链的执行
Abort bool // 如果设置为true,后续插件和请求转发将被中止
// Go Context,用于超时和取消
GoContext context.Context
}
// Plugin 接口定义了所有插件必须实现的方法
type Plugin interface {
Name() string // 插件名称
Execute(ctx *Context) error // 执行插件逻辑
Init(config map[string]interface{}) error // 初始化插件
}
// PluginManager 负责管理和执行插件
type PluginManager struct {
plugins map[string]Plugin // 注册的插件实例
}
func NewPluginManager() *PluginManager {
return &PluginManager{
plugins: make(map[string]Plugin),
}
}
// RegisterPlugin 注册插件
func (pm *PluginManager) RegisterPlugin(p Plugin) {
pm.plugins[p.Name()] = p
}
// GetPlugin 获取指定名称的插件
func (pm *PluginManager) GetPlugin(name string) (Plugin, bool) {
p, ok := pm.plugins[name]
return p, ok
}
// ExecutePlugins 按照路由配置的顺序执行插件
func (pm *PluginManager) ExecutePlugins(ctx *Context, pluginConfigs []PluginConfig) error {
for _, pc := range pluginConfigs {
plugin, ok := pm.GetPlugin(pc.Name)
if !ok {
return fmt.Errorf("plugin '%s' not found", pc.Name)
}
// 每次执行前重新初始化插件配置,确保隔离性
// 更好的做法是插件是无状态的,或者在Init阶段就处理配置并生成一个配置了的插件实例
// 这里为了简化,假设插件在Execute内部会使用其Init时的配置
// 实际上,Init应该在插件注册时调用一次,然后通过PluginConfig中的Config字段在Execute时传递
// 为了满足动态配置,我们可以在每次执行前重新Init,或者让Execute直接接收配置
// 采取后者:让Execute直接接收PluginConfig
if err := plugin.Execute(ctx); err != nil {
return fmt.Errorf("plugin '%s' failed: %w", plugin.Name(), err)
}
if ctx.Abort {
return nil // 插件链被中止
}
}
return nil
}
为了更灵活地传递插件配置,我们修改 Plugin 接口:
// plugin.go (修正版)
package plugin
import (
"context"
"net/http"
)
// Context 在插件链中传递的上下文
type Context struct {
Request *http.Request
Response http.ResponseWriter
RouteID string
PathParams map[string]string
TargetURL string
Data map[string]interface{}
Abort bool
GoContext context.Context
ResponseWriterStatus int // 记录响应状态码
}
// PluginConfig 定义单个插件的配置 (与 router 包中的定义一致)
type PluginConfig struct {
Name string `json:"name"`
Config map[string]interface{} `json:"config"`
}
// Plugin 接口定义了所有插件必须实现的方法
type Plugin interface {
Name() string // 插件名称
Init(config map[string]interface{}) error // 初始化插件,通常在网关启动时调用一次
Execute(ctx *Context, config map[string]interface{}) error // 执行插件逻辑,传入路由级别的配置
}
// PluginManager 负责管理和执行插件
type PluginManager struct {
plugins map[string]Plugin // 注册的插件实例
}
func NewPluginManager() *PluginManager {
return &PluginManager{
plugins: make(map[string]Plugin),
}
}
// RegisterPlugin 注册插件
func (pm *PluginManager) RegisterPlugin(p Plugin) error {
if _, exists := pm.plugins[p.Name()]; exists {
return fmt.Errorf("plugin '%s' already registered", p.Name())
}
pm.plugins[p.Name()] = p
// 插件通常在注册时进行一次全局初始化,不依赖路由特定配置
// 如果插件需要全局配置,可以在这里传递
return p.Init(nil) // 示例:全局配置为nil
}
// ExecutePlugins 按照路由配置的顺序执行插件
func (pm *PluginManager) ExecutePlugins(ctx *Context, pluginConfigs []PluginConfig) error {
for _, pc := range pluginConfigs {
plugin, ok := pm.plugins[pc.Name]
if !ok {
return fmt.Errorf("plugin '%s' not found", pc.Name)
}
if err := plugin.Execute(ctx, pc.Config); err != nil {
return fmt.Errorf("plugin '%s' failed: %w", plugin.Name(), err)
}
if ctx.Abort {
return nil // 插件链被中止
}
}
return nil
}
解释:
Context封装了请求、响应、路由信息、路径参数、共享数据和 Gocontext.Context。Abort字段用于提前终止插件链和请求处理。Plugin接口定义了Name()、Init()和Execute()方法。Init用于插件的全局初始化,Execute则在每次请求时执行,并接收路由级别的配置。PluginManager负责存储和按顺序执行插件。RegisterPlugin注册插件实例,ExecutePlugins遍历路由配置中指定的插件列表,依次调用它们的Execute方法。
5.2 常见插件示例
5.2.1 认证插件 (JWT)
// auth_plugin.go
package plugin
import (
"fmt"
"log"
"net/http"
"strings"
"github.com/dgrijalva/jwt-go" // 假设使用这个库
)
// AuthPlugin 实现认证功能
type AuthPlugin struct {
secretKey string // JWT签名密钥
}
func NewAuthPlugin() *AuthPlugin {
return &AuthPlugin{}
}
func (p *AuthPlugin) Name() string {
return "auth"
}
// Init AuthPlugin 的初始化,加载全局密钥等
func (p *AuthPlugin) Init(config map[string]interface{}) error {
if secret, ok := config["secret_key"].(string); ok {
p.secretKey = secret
} else {
// 默认密钥或从环境变量加载
p.secretKey = "your-super-secret-key"
}
log.Printf("AuthPlugin initialized with secret key: %s", p.secretKey)
return nil
}
// Execute 验证 JWT Token
func (p *AuthPlugin) Execute(ctx *Context, config map[string]interface{}) error {
authHeader := ctx.Request.Header.Get("Authorization")
if authHeader == "" {
ctx.Response.WriteHeader(http.StatusUnauthorized)
ctx.Response.Write([]byte("Unauthorized: Missing Authorization header"))
ctx.Abort = true
ctx.ResponseWriterStatus = http.StatusUnauthorized
return nil
}
parts := strings.Split(authHeader, " ")
if len(parts) != 2 || parts[0] != "Bearer" {
ctx.Response.WriteHeader(http.StatusUnauthorized)
ctx.Response.Write([]byte("Unauthorized: Invalid Authorization header format"))
ctx.Abort = true
ctx.ResponseWriterStatus = http.StatusUnauthorized
return nil
}
tokenString := parts[1]
token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
return []byte(p.secretKey), nil
})
if err != nil || !token.Valid {
ctx.Response.WriteHeader(http.StatusUnauthorized)
ctx.Response.Write([]byte("Unauthorized: Invalid Token"))
ctx.Abort = true
ctx.ResponseWriterStatus = http.StatusUnauthorized
log.Printf("JWT validation failed: %v", err)
return nil
}
// 将用户信息存储到 Context.Data,供后续插件或服务使用
if claims, ok := token.Claims.(jwt.MapClaims); ok {
ctx.Data["user_id"] = claims["user_id"]
ctx.Data["roles"] = claims["roles"]
log.Printf("Authenticated user_id: %v", claims["user_id"])
}
return nil
}
5.2.2 限流插件
// rate_limit_plugin.go
package plugin
import (
"fmt"
"log"
"net/http"
"strconv"
"sync"
"time"
"golang.org/x/time/rate" // 令牌桶算法
)
// RateLimitPlugin 实现限流功能
type RateLimitPlugin struct {
// 全局限流器,或者根据不同的配置创建不同的限流器
// 实际应用中,限流器可能按IP、用户ID、API路径等维度进行管理
limiters sync.Map // map[string]*rate.Limiter
}
func NewRateLimitPlugin() *RateLimitPlugin {
return &RateLimitPlugin{}
}
func (p *RateLimitPlugin) Name() string {
return "rate-limit"
}
func (p *RateLimitPlugin) Init(config map[string]interface{}) error {
log.Printf("RateLimitPlugin initialized.")
return nil
}
// Execute 执行限流逻辑
func (p *RateLimitPlugin) Execute(ctx *Context, config map[string]interface{}) error {
// 从插件配置中获取限流参数
ratePerSec := 10.0 // 默认值
burst := 20 // 默认值
if r, ok := config["rate"].(float64); ok {
ratePerSec = r
} else if r, ok := config["rate"].(int); ok {
ratePerSec = float64(r)
}
if b, ok := config["burst"].(float64); ok {
burst = int(b)
} else if b, ok := config["burst"].(int); ok {
burst = b
}
// 针对每个路由创建一个独立的限流器 (生产环境应按更细粒度维度)
limiterKey := ctx.RouteID // 或者 ctx.Request.RemoteAddr, ctx.Data["user_id"]
limiter, _ := p.limiters.LoadOrStore(limiterKey, rate.NewLimiter(rate.Limit(ratePerSec), burst))
l := limiter.(*rate.Limiter)
if !l.Allow() {
ctx.Response.Header().Set("Retry-After", strconv.Itoa(int(l.Reserve().Delay()/time.Second)))
ctx.Response.WriteHeader(http.StatusTooManyRequests)
ctx.Response.Write([]byte("Too Many Requests"))
ctx.Abort = true
ctx.ResponseWriterStatus = http.StatusTooManyRequests
log.Printf("Rate limit exceeded for route %s", ctx.RouteID)
return nil
}
return nil
}
5.2.3 日志插件
// logger_plugin.go
package plugin
import (
"fmt"
"log"
"net/http"
"time"
)
// LoggerPlugin 实现请求日志功能
type LoggerPlugin struct{}
func NewLoggerPlugin() *LoggerPlugin {
return &LoggerPlugin{}
}
func (p *LoggerPlugin) Name() string {
return "logger"
}
func (p *LoggerPlugin) Init(config map[string]interface{}) error {
log.Printf("LoggerPlugin initialized.")
return nil
}
// Execute 记录请求日志
func (p *LoggerPlugin) Execute(ctx *Context, config map[string]interface{}) error {
start := time.Now()
// 在请求处理完成后记录日志 (需要包装http.ResponseWriter)
// 这是一个前置插件,但为了记录响应状态和时间,需要特殊的处理
// 或者通过在 Context 中暴露一个后处理钩子来实现
// 这里简化为只记录请求信息
log.Printf("Request received: Method=%s, Path=%s, RouteID=%s, UserID=%v",
ctx.Request.Method, ctx.Request.URL.Path, ctx.RouteID, ctx.Data["user_id"])
// 假设我们有一个后处理机制来记录完整的请求-响应周期
// 实际可以在请求处理链的末端注入一个Hook
// 这里通过Context.GoContext来传递请求开始时间,在代理完成后计算
ctx.GoContext = context.WithValue(ctx.GoContext, "requestStartTime", start)
return nil
}
// ResponseLogger 包装 http.ResponseWriter 以捕获状态码和大小
type ResponseLogger struct {
http.ResponseWriter
statusCode int
bytesWritten int
}
func NewResponseLogger(w http.ResponseWriter) *ResponseLogger {
return &ResponseLogger{
ResponseWriter: w,
statusCode: http.StatusOK, // 默认200
}
}
func (rl *ResponseLogger) WriteHeader(statusCode int) {
rl.statusCode = statusCode
rl.ResponseWriter.WriteHeader(statusCode)
}
func (rl *ResponseLogger) Write(b []byte) (int, error) {
n, err := rl.ResponseWriter.Write(b)
rl.bytesWritten += n
return n, err
}
解释:
- 每个插件都实现了
Plugin接口。 AuthPlugin演示了 JWT 认证,从Authorization头部获取 Token 并验证,如果失败则中止请求。RateLimitPlugin使用golang.org/x/time/rate实现了令牌桶限流算法,可以根据路由配置不同的限流速率。LoggerPlugin记录请求信息。为了完整记录响应状态码和耗时,通常需要包装http.ResponseWriter并在请求处理链的末尾执行日志记录。这里为了演示插件本身,只记录了请求前的信息,并用ResponseLogger示意如何捕获响应信息。
6. 高性能考虑
构建高性能网关,除了高效的算法和数据结构,Go 语言本身的特性也需要充分利用。
- Goroutines 与 Concurrency: Go 的轻量级协程 Goroutine 使得处理大量并发连接变得简单。HTTP 服务器为每个请求启动一个 Goroutine,确保请求处理是非阻塞的。
- 非阻塞 I/O: Go 的网络库底层利用操作系统提供的 Epoll (Linux), Kqueue (macOS/FreeBSD) 等机制实现非阻塞 I/O,确保在等待网络事件时不会阻塞整个进程。
- 内存管理与 GC 优化:
- 减少内存分配: 避免在热路径上频繁创建临时对象。例如,可以复用
http.Request和http.Response的一些字段。 - 对象池 (
sync.Pool): 对于频繁创建和销毁的短生命周期对象(如plugin.Context),可以使用sync.Pool进行复用,减少 GC 压力。 - 零拷贝: 在某些场景下,避免数据在不同缓冲区之间复制,例如在代理请求时直接将上游响应流式传输给客户端。
- 减少内存分配: 避免在热路径上频繁创建临时对象。例如,可以复用
- 高效数据结构: Radix Tree (用于路由匹配) 和
sync.Map(用于并发安全的配置存储) 是很好的例子。 - HTTP Server 选择: Go 标准库
net/http通常已经足够高性能。对于一些对极限性能有要求的场景,可以考虑fasthttp,它提供了更高的吞吐量和更低的延迟,但代价是与标准库不完全兼容。对于网关,net/http的灵活性和中间件生态往往更具优势。 - Upstream 连接池: 对于后端服务的 HTTP 客户端,应使用连接池(
http.Client默认使用连接池)来复用 TCP 连接,减少连接建立和关闭的开销。 - 超时与上下文取消: 使用
context.Context管理请求的生命周期,设置合理的超时时间,并在客户端断开连接时及时取消上游请求,避免资源浪费。
plugin.Context 使用 sync.Pool 优化:
// main.go (或者一个公共的 context_pool.go 文件)
package main
import (
"net/http"
"sync"
"context"
"gateway/plugin"
)
var contextPool = sync.Pool{
New: func() interface{} {
return &plugin.Context{
Data: make(map[string]interface{}),
}
},
}
func acquireContext(w http.ResponseWriter, r *http.Request) *plugin.Context {
ctx := contextPool.Get().(*plugin.Context)
ctx.Request = r
ctx.Response = w
ctx.RouteID = ""
ctx.PathParams = nil
ctx.TargetURL = ""
for k := range ctx.Data { // 清理旧数据
delete(ctx.Data, k)
}
ctx.Abort = false
ctx.GoContext = r.Context()
ctx.ResponseWriterStatus = http.StatusOK
return ctx
}
func releaseContext(ctx *plugin.Context) {
contextPool.Put(ctx)
}
7. 构建网关服务器
现在,我们将所有组件整合起来,构建一个完整的网关服务器。
// main.go
package main
import (
"context"
"fmt"
"log"
"net/http"
"net/http/httputil" // Go标准库的反向代理
"net/url"
"os"
"os/signal"
"syscall"
"time"
"gateway/plugin"
"gateway/router"
)
// GatewayServer 网关服务器
type GatewayServer struct {
routerMatcher router.RouteMatcher
configManager router.ConfigManager
serviceDiscovery router.ServiceDiscovery
pluginManager *plugin.PluginManager
proxy *httputil.ReverseProxy
server *http.Server
upstreams sync.Map // map[string]*router.Upstream
}
func NewGatewayServer() (*GatewayServer, error) {
// 初始化插件管理器并注册插件
pm := plugin.NewPluginManager()
if err := pm.RegisterPlugin(plugin.NewAuthPlugin()); err != nil {
return nil, fmt.Errorf("failed to register auth plugin: %w", err)
}
if err := pm.RegisterPlugin(plugin.NewRateLimitPlugin()); err != nil {
return nil, fmt.Errorf("failed to register rate limit plugin: %w", err)
}
if err := pm.RegisterPlugin(plugin.NewLoggerPlugin()); err != nil {
return nil, fmt.Errorf("failed to register logger plugin: %w", err)
}
// 初始化路由匹配器
rm := router.NewDefaultRouteMatcher()
// 初始化配置管理器和服务发现
// 这里使用InMemoryConfigManager和KubernetesServiceDiscovery作为示例
// 生产环境可以根据需要选择
cm := router.NewInMemoryConfigManager() // 假设从内存加载路由
// k8sNamespace := os.Getenv("KUBERNETES_NAMESPACE")
// if k8sNamespace == "" {
// k8sNamespace = "default"
// }
// sd, err := router.NewKubernetesServiceDiscovery(k8sNamespace) // 假设从K8s发现服务
// if err != nil {
// log.Fatalf("Failed to create K8s service discovery: %v", err)
// }
// 为了本地测试方便,这里直接使用ConfigManager提供的模拟Upstream
sd := &dummyServiceDiscovery{upstreams: make(map[string]*router.Upstream)}
gw := &GatewayServer{
routerMatcher: rm,
configManager: cm,
serviceDiscovery: sd,
pluginManager: pm,
upstreams: sync.Map{},
}
// 初始化反向代理,并设置Director函数
gw.proxy = &httputil.ReverseProxy{
Director: gw.proxyDirector,
ErrorHandler: gw.proxyErrorHandler,
}
// 订阅配置更新
cm.SubscribeRoutes(gw.onRoutesUpdate)
cm.SubscribeUpstreams(gw.onUpstreamsUpdate) // InMemoryConfigManager 会模拟更新Upstream
// sd.Subscribe(gw.onUpstreamsUpdate) // KubernetesServiceDiscovery 会通过这个回调更新Upstream
return gw, nil
}
// onRoutesUpdate 处理路由配置更新
func (gw *GatewayServer) onRoutesUpdate(routes []router.Route) {
log.Println("Received route update. Rebuilding router matcher...")
// 清空旧路由并重新添加 (简单粗暴,生产环境可以优化为增量更新)
// 这里需要重新创建 RouteMatcher 或者提供批量更新接口
// 为了简化,我们直接替换整个路由匹配器
newRouterMatcher := router.NewDefaultRouteMatcher()
for _, r := range routes {
if err := newRouterMatcher.AddRoute(r); err != nil {
log.Printf("Error adding route %s: %v", r.ID, err)
}
}
gw.routerMatcher = newRouterMatcher
log.Printf("%d routes loaded.", len(routes))
}
// onUpstreamsUpdate 处理上游服务更新
func (gw *GatewayServer) onUpstreamsUpdate(upstreams []router.Upstream) {
log.Println("Received upstream update. Refreshing upstream cache...")
newUpstreams := sync.Map{}
for _, up := range upstreams {
// 确保每个Upstream都有一个负载均衡器实例
up.lb = router.GetLoadBalancer(up.Strategy, up.Endpoints)
up.activeEndpoints = up.Endpoints // 暂时所有都是活跃的,健康检查会更新状态
newUpstreams.Store(up.Name, &up)
}
gw.upstreams = newUpstreams
log.Printf("%d upstreams loaded.", len(upstreams))
}
// proxyDirector 是反向代理的核心逻辑,负责修改请求
func (gw *GatewayServer) proxyDirector(req *http.Request) {
// 这是一个占位符,实际的请求修改发生在 HandleRequest 中
// httputil.ReverseProxy 会在调用 Director 之前复制请求
// 所以这里不需要对 req 进行深拷贝
}
// proxyErrorHandler 处理反向代理的错误
func (gw *GatewayServer) proxyErrorHandler(rw http.ResponseWriter, req *http.Request, err error) {
log.Printf("Proxy error for %s %s: %v", req.Method, req.URL.Path, err)
rw.WriteHeader(http.StatusBadGateway)
rw.Write([]byte(fmt.Sprintf("Bad Gateway: %v", err)))
}
// HandleRequest 是所有传入请求的入口点
func (gw *GatewayServer) HandleRequest(w http.ResponseWriter, r *http.Request) {
// 从对象池获取Context
ctx := acquireContext(w, r)
defer releaseContext(ctx) // 确保Context被放回池中
// 设置GoContext
ctx.GoContext = r.Context()
// 1. 路由匹配
route, pathParams, err := gw.routerMatcher.Match(r)
if err != nil {
log.Printf("Route matching failed for %s %s: %v", r.Method, r.URL.Path, err)
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(fmt.Sprintf("Not Found: %v", err)))
return
}
ctx.RouteID = route.ID
ctx.PathParams = pathParams
// 2. 执行插件链
if err := gw.pluginManager.ExecutePlugins(ctx, route.Plugins); err != nil {
log.Printf("Plugin execution failed for route %s: %v", route.ID, err)
// 插件错误,但可能没有中止请求,这里需要根据ctx.Abort判断
if !ctx.Abort { // 如果插件报错但未中止,则返回500
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("Internal Server Error: %v", err)))
}
// 如果插件中止了,响应已经在插件中发送
return
}
if ctx.Abort { // 如果插件中止了请求,直接返回
log.Printf("Request aborted by plugin for route %s. Status: %d", route.ID, ctx.ResponseWriterStatus)
// 响应已经由插件写入,直接返回
return
}
// 3. 获取上游服务
upstreamVal, ok := gw.upstreams.Load(route.Target)
if !ok {
log.Printf("Upstream '%s' not found for route %s", route.Target, route.ID)
w.WriteHeader(http.StatusBadGateway)
w.Write([]byte(fmt.Sprintf("Bad Gateway: Upstream '%s' not found", route.Target)))
return
}
upstream := upstreamVal.(*router.Upstream)
// 4. 负载均衡选择一个后端实例
endpoint, err := upstream.lb.Next()
if err != nil {
log.Printf("Load balancing failed for upstream '%s': %v", upstream.Name, err)
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(fmt.Sprintf("Service Unavailable: %v", err)))
return
}
// 5. 修改请求,准备转发
targetURL, err := url.Parse(endpoint.URL)
if err != nil {
log.Printf("Invalid target URL '%s': %v", endpoint.URL, err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Internal Server Error: Invalid upstream URL"))
return
}
// 修改请求的URL
originalPath := r.URL.Path
r.URL.Scheme = targetURL.Scheme
r.URL.Host = targetURL.Host
r.URL.Path = targetURL.Path + r.URL.Path // 默认拼接
if route.StripPath {
// 移除路由匹配的前缀
// 例如:Route.Path=/api/users/:id, Req.Path=/api/users/123
// targetURL.Path=/backend, 转发到 /backend/123
strippedPath := strings.TrimPrefix(originalPath, route.Path)
r.URL.Path = targetURL.Path + strippedPath
}
// 设置Host头部为后端服务的主机
r.Host = targetURL.Host // 或者根据配置选择是否保留原始Host
ctx.TargetURL = r.URL.String()
log.Printf("Proxying request %s %s to %s", r.Method, originalPath, ctx.TargetURL)
// 6. 转发请求
gw.proxy.ServeHTTP(w, r)
// 记录请求耗时 (如果 LoggerPlugin 记录了开始时间)
if startTime, ok := ctx.GoContext.Value("requestStartTime").(time.Time); ok {
log.Printf("Request %s %s processed in %s", originalPath, r.Method, time.Since(startTime))
}
}
// Start 启动网关服务器
func (gw *GatewayServer) Start(addr string) error {
// 启动配置管理器和服务发现
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := gw.configManager.Start(ctx); err != nil {
return fmt.Errorf("failed to start config manager: %w", err)
}
if err := gw.serviceDiscovery.Start(ctx); err != nil {
return fmt.Errorf("failed to start service discovery: %w", err)
}
gw.server = &http.Server{
Addr: addr,
Handler: http.HandlerFunc(gw.HandleRequest),
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}
log.Printf("Gateway server starting on %s...", addr)
return gw.server.ListenAndServe()
}
// Shutdown 优雅关闭网关服务器
func (gw *GatewayServer) Shutdown(ctx context.Context) error {
log.Println("Shutting down gateway server gracefully...")
gw.configManager.Stop()
gw.serviceDiscovery.Stop()
return gw.server.Shutdown(ctx)
}
// dummyServiceDiscovery for local testing without K8s
type dummyServiceDiscovery struct {
upstreams map[string]*router.Upstream
subscribers []func(upstreams []router.Upstream)
mu sync.RWMutex
}
func (d *dummyServiceDiscovery) GetUpstream(name string) (*router.Upstream, bool) {
d.mu.RLock()
defer d.mu.RUnlock()
up, ok := d.upstreams[name]
return up,