深入 ‘Kubernetes Scheduler Plugins’:如何编写 Go 代码干预 K8s 的调度决策以实现拓扑感知(Topology-aware)

深入 Kubernetes Scheduler Plugins:编写 Go 代码实现拓扑感知调度

各位技术同行,大家好。今天我们将深入探讨 Kubernetes 调度器的核心机制,特别是如何利用其强大的插件架构,编写 Go 语言代码,实现高度定制化的拓扑感知调度策略。在当今大规模、分布式云原生环境中,仅仅依靠默认调度器往往不足以满足复杂的业务需求。理解并掌握调度器插件的开发,是提升集群资源利用率、优化应用性能、增强系统健壮性的关键一步。

一、Kubernetes 调度的基石与定制化的必要性

Kubernetes 调度器 (kube-scheduler) 是集群控制平面中的一个关键组件,其核心职责是根据一系列预设规则和优先级,为新创建但尚未分配到节点的 Pod 选择一个最合适的节点。这个过程通常分为两个阶段:

  1. 过滤 (Predicates / Filter):从所有可用节点中筛选出符合 Pod 运行条件(如资源请求、节点选择器、亲和性/反亲和性规则、端口冲突等)的节点。
  2. 打分 (Priorities / Score):对过滤后的节点进行打分,选择分数最高的节点来调度 Pod。

默认的 Kubernetes 调度器已经相当智能和强大,它内置了多种策略来处理常见的调度场景,例如:

  • 资源匹配:确保节点有足够的 CPU、内存等资源。
  • 节点亲和性/反亲和性:根据节点标签将 Pod 吸引或排斥到特定节点。
  • Pod 亲和性/反亲和性:根据其他 Pod 的位置将 Pod 部署在一起或分散开。
  • Taints 和 Tolerations:允许节点驱逐不容忍特定污点的 Pod。
  • PodTopologySpread:确保 Pod 在不同的拓扑域(如区域、可用区、主机)中均匀分布。

然而,在面对更高级、更复杂的业务场景时,默认调度器的能力可能会显得不足。例如:

  • 高度定制的拓扑感知调度:默认的 PodTopologySpread 提供了基本的拓扑分散能力,但如果我们需要更精细的控制,例如,优先将计算调度到离数据源最近的可用区,或者在某个可用区资源不足时,以特定策略进行跨可用区调度,同时兼顾成本和网络延迟,就需要自定义逻辑。
  • 特定硬件/软件加速器调度:将 Pod 调度到具有 GPU、FPGA 等特定硬件的节点,并考虑这些资源的利用率。
  • 批处理作业调度优化:针对短时、大量计算的批处理任务,可能需要与在线服务不同的调度策略。
  • 复杂的资源预留/配额管理:在多租户环境中,实现更灵活、更细粒度的资源隔离和共享策略。
  • 外部系统集成:调度决策需要与外部系统(如容量管理系统、成本优化系统)进行交互。

正是为了应对这些挑战,Kubernetes 调度器引入了插件机制,允许开发者通过实现特定的接口,扩展调度器的行为,甚至替换其核心逻辑。

二、Kubernetes Scheduler 插件架构深度解析

Kubernetes 1.15 版本引入了调度器框架(Scheduler Framework),并在后续版本中不断完善,最终形成了我们今天使用的插件架构。这个架构的核心思想是将调度过程分解为一系列可扩展的阶段(Extension Points),每个阶段都可以注册一个或多个插件来执行自定义逻辑。

2.1 调度器框架的扩展点 (Extension Points)

调度器框架定义了以下主要扩展点。理解这些扩展点的执行顺序和作用至关重要:

扩展点名称 执行时机 作用 典型插件示例
QueueSort 调度队列中的 Pod 排序阶段 决定哪个 Pod 应该优先被调度。 基于优先级、创建时间、资源请求等排序。
PreFilter 过滤阶段之前 对 Pod 进行预处理,提前进行一些检查,或缓存 Pod 相关信息。 检查 Pod 的全局约束,准备后续过滤所需的数据。
Filter 过滤阶段 筛选出无法运行 Pod 的节点。 检查节点资源、端口、标签、污点等是否满足 Pod 要求。
PostFilter 过滤阶段之后,所有节点都被过滤掉时 处理 Pod 无法调度的情况,例如触发事件或尝试其他调度策略。 记录无法调度的原因,尝试回退到其他队列。
PreScore 打分阶段之前 在节点打分前进行预处理,计算共享信息。 为打分阶段准备节点或 Pod 的聚合数据。
Score 打分阶段 对每个节点进行打分,表示其适合度。 资源使用率、节点亲和性、Pod 反亲和性等。
NormalizeScore 打分阶段之后,所有节点分数汇总之前 对所有节点的原始分数进行归一化处理,确保分数在合理范围内。 将分数从不同插件的特定范围映射到统一的 0-100 范围。
Reserve 绑定阶段之前,Pod 已经选定节点,但尚未真正绑定 预留节点资源,防止其他调度器同时选择该节点。 资源预留,更新内部状态。
Permit 允许阶段,Pod 等待绑定 允许或拒绝 Pod 绑定到选定节点,可以延迟绑定。 外部准入控制,等待外部审批。
PreBind 绑定阶段之前 执行 Pod 绑定前的最后操作,例如准备存储卷。 挂载 CSI 卷、初始化网络。
Bind 绑定阶段 将 Pod 真正绑定到节点。 更新 Pod 的 spec.nodeName 字段。默认调度器通常只使用一个 Bind 插件。
PostBind 绑定阶段之后 执行绑定后的清理或通知操作。 清理预留资源,发送通知,更新外部系统状态。

执行顺序简化图示:

                                          +-----------------+
                                          |   Pod 调度请求  |
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   QueueSort     | (排序调度队列中的 Pod)
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   PreFilter     | (预处理 Pod 信息)
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   Filter        | (筛选可用节点)
                                          +--------+--------+
                                                   |
                                      (如果无可用节点)
                                      +----------+----------+
                                      |   PostFilter        | (处理无可用节点情况)
                                      +----------+----------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   PreScore      | (预处理打分信息)
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   Score         | (为节点打分)
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          | NormalizeScore  | (归一化分数)
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   Reserve       | (预留节点资源)
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   Permit        | (允许/拒绝绑定)
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   PreBind       | (绑定前操作)
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   Bind          | (实际绑定 Pod)
                                          +--------+--------+
                                                   |
                                                   v
                                          +--------+--------+
                                          |   PostBind      | (绑定后操作)
                                          +--------+--------+
                                                   |
                                                   v
                                          +-----------------+
                                          |   调度完成      |
                                          +-----------------+

2.2 插件的注册与配置

要启用自定义插件,我们需要创建一个 KubeSchedulerConfiguration 对象,并在其中指定要使用的插件及其配置。这个配置通常通过 ConfigMap 提供给 kube-scheduler 进程。

# plugin.yaml
apiVersion: kubescheduler.config.k8s.io/v1beta3 # 或 v1beta2,根据K8s版本选择
kind: KubeSchedulerConfiguration
clientConnection:
  kubeconfig: /etc/kubernetes/scheduler.conf
leaderElection:
  leaderElect: true
profiles:
  - schedulerName: my-topology-scheduler # 自定义调度器名称
    plugins:
      queueSort:
        enabled:
          - name: PrioritySort # 默认的优先级排序
      preFilter:
        enabled:
          - name: NodeResourcesFit
          - name: NodePorts
          - name: PodTopologySpread
          - name: InterPodAffinity
          - name: TopologyAware # 我们的自定义插件
      filter:
        enabled:
          - name: NodeResourcesFit
          - name: NodePorts
          - name: NodeName
          - name: TaintToleration
          - name: InterPodAffinity
          - name: NodeAffinity
          - name: PodTopologySpread
          - name: TopologyAware # 我们的自定义插件
      postFilter:
        enabled:
          - name: DefaultPreemption
      preScore:
        enabled:
          - name: InterPodAffinity
          - name: PodTopologySpread
          - name: TopologyAware # 我们的自定义插件
      score:
        enabled:
          - name: NodeResourcesBalancedAllocation
          - name: NodeResourcesLeastAllocated
          - name: NodeAffinity
            weight: 1
          - name: InterPodAffinity
            weight: 1
          - name: PodTopologySpread
            weight: 2
          - name: TaintToleration
            weight: 1
          - name: TopologyAware # 我们的自定义插件
            weight: 3 # 给予较高的权重,体现其重要性
      reserve:
        enabled:
          - name: VolumeBinding
      permit:
        enabled: []
      preBind:
        enabled:
          - name: VolumeBinding
      bind:
        enabled:
          - name: DefaultBinder
      postBind:
        enabled: []

在上面的配置中,我们定义了一个名为 my-topology-scheduler 的调度器配置,并在多个扩展点(preFilterfilterpreScorescore)中启用了我们自定义的 TopologyAware 插件。weight 字段在 score 阶段尤为重要,它决定了该插件的打分结果对最终节点选择的影响程度。

三、拓扑感知调度:理念与挑战

拓扑感知调度旨在根据集群的物理或逻辑布局(拓扑结构)来做出调度决策。这些拓扑结构通常由节点标签表示,例如:

  • topology.kubernetes.io/region: 云服务商的区域,如 us-east-1
  • topology.kubernetes.io/zone: 云服务商的可用区,如 us-east-1a
  • kubernetes.io/hostname: 主机名。

为什么需要拓扑感知调度?

  1. 数据本地性 (Data Locality):将计算工作负载调度到离其所需数据最近的节点,可以显著降低网络延迟,提高应用程序性能。例如,将 Kafka 消费者调度到与 Kafka Broker 相同的可用区。
  2. 故障域隔离 (Failure Domain Isolation):将同一服务的 Pod 分散到不同的故障域(如不同的可用区、机架)中,可以防止单点故障。即使一个可用区发生故障,服务仍然可以在其他可用区继续运行。
  3. 网络成本优化 (Network Cost Optimization):跨区域或跨可用区的数据传输通常会产生额外的网络费用。拓扑感知调度有助于将流量限制在同一区域或可用区内,从而降低运营成本。
  4. 性能与带宽:在同一物理网络拓扑中通信的 Pod 通常具有更高的带宽和更低的延迟。

Kubernetes 已经提供了一些内置机制来支持拓扑感知:

  • Node Affinity / Anti-Affinity:可以通过 nodeSelectornodeAffinity 将 Pod 调度到特定区域或可用区的节点。
  • Pod Anti-Affinity:结合 topologyKey 可以将 Pod 分散到不同的拓扑域。例如,topologyKey: topology.kubernetes.io/zone 可以确保来自同一服务的 Pod 不在同一个可用区。
  • PodTopologySpread:这是 Kubernetes 1.16+ 引入的专门用于拓扑分散的 API 对象,允许用户定义 Pod 在指定拓扑域内的分布策略,例如,确保每个可用区都有大致相同数量的 Pod。

尽管这些内置机制非常有用,但在某些场景下仍有局限性:

  • 复杂优先级:当需要同时考虑数据本地性、故障域隔离和资源利用率等多个因素,并赋予它们不同的优先级时,内置机制可能难以表达。
  • 动态调整:内置机制通常是静态配置的。如果需要根据集群的实时状态(如可用区负载、网络拥堵)动态调整调度策略,就需要更灵活的插件。
  • 特定业务逻辑:某些业务可能存在独特的拓扑需求,例如,只在特定区域部署某种类型的服务,或者在主备架构中强制主服务和备用服务位于不同区域,但又不能离得太远。

这就是自定义调度器插件发挥作用的地方。我们将编写一个插件,实现更精细的拓扑感知调度策略。

四、编写自定义拓扑感知调度插件

现在,让我们通过一个具体的 Go 语言示例,来编写一个名为 TopologyAware 的调度器插件。

场景设定:

我们的集群跨越多个 Region 和 Zone。我们希望实现以下调度策略:

  1. 区域/可用区亲和性优先:如果 Pod 通过 topology.kubernetes.io/regiontopology.kubernetes.io/zone 标签明确表达了偏好的拓扑域,则优先将其调度到该域内的节点。
  2. 负载均衡:在满足亲和性要求的前提下,倾向于选择那些资源利用率较低的节点。
  3. 故障域分散:对于带有特定标签(例如 app.kubernetes.io/component=web)的 Pod,我们希望将其分散到不同的可用区,避免过度集中。
  4. 优雅降级:如果所有亲和性节点都不可用,允许 Pod 调度到其他区域或可用区,但得分会较低。

4.1 项目结构

my-topology-scheduler-plugin/
├── cmd/
│   └── main.go
├── go.mod
├── go.sum
├── pkg/
│   └── plugin/
│       └── topologyaware.go
└── scheduler-config.yaml # 我们的调度器配置

4.2 核心 Go 代码实现 (pkg/plugin/topologyaware.go)

首先,我们需要定义插件的结构体,并实现调度器框架定义的接口。

package plugin

import (
    "context"
    "fmt"
    "sort"
    "strings"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/klog/v2"
    framework "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)

const (
    // PluginName 是我们插件的名称
    PluginName = "TopologyAware"

    // 拓扑标签键
    NodeRegionLabelKey = "topology.kubernetes.io/region"
    NodeZoneLabelKey   = "topology.kubernetes.io/zone"

    // Pod 偏好标签/注解
    PodPreferredRegionAnnotationKey = "topology-aware-scheduler.example.com/preferred-region"
    PodPreferredZoneAnnotationKey   = "topology-aware-scheduler.example.com/preferred-zone"
    PodDisperseComponentLabelKey    = "app.kubernetes.io/component" // 用于故障域分散的组件标签

    // CycleStateKey 用于在调度周期中传递信息
    stateKey framework.StateKey = "TopologyAwareState"
)

// TopologyAwareState 是在调度周期内传递的状态信息
type TopologyAwareState struct {
    PreferredRegion string
    PreferredZone   string
    DisperseComponent string // 用于分散的组件值
}

// Clone 实现了 framework.StateData 接口
func (s *TopologyAwareState) Clone() framework.StateData {
    return &TopologyAwareState{
        PreferredRegion: s.PreferredRegion,
        PreferredZone:   s.PreferredZone,
        DisperseComponent: s.DisperseComponent,
    }
}

// TopologyAware 是我们自定义的拓扑感知调度插件
type TopologyAware struct {
    handle framework.Handle
}

// New 函数是插件的构造器,它会在调度器启动时被调用
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
    klog.V(4).Infof("Initializing plugin %s", PluginName)
    return &TopologyAware{handle: h}, nil
}

// Name 返回插件的名称
func (ta *TopologyAware) Name() string {
    return PluginName
}

// PreFilter 检查 Pod 的偏好拓扑信息,并将其存储到 CycleState 中
func (ta *TopologyAware) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
    preferredRegion := p.Annotations[PodPreferredRegionAnnotationKey]
    preferredZone := p.Annotations[PodPreferredZoneAnnotationKey]
    disperseComponent := p.Labels[PodDisperseComponentLabelKey]

    s := &TopologyAwareState{
        PreferredRegion: preferredRegion,
        PreferredZone:   preferredZone,
        DisperseComponent: disperseComponent,
    }
    state.Write(stateKey, s)

    klog.V(5).Infof("PreFilter for Pod %s/%s: preferredRegion=%s, preferredZone=%s, disperseComponent=%s",
        p.Namespace, p.Name, preferredRegion, preferredZone, disperseComponent)
    return nil, nil // 返回 nil, nil 表示 PreFilter 成功
}

// PreFilterExtensions 提供了 PreFilter 阶段的扩展点,这里我们不需要
func (ta *TopologyAware) PreFilterExtensions() framework.PreFilterExtensions {
    return nil
}

// Filter 阶段:根据 Pod 的偏好拓扑信息过滤不符合条件的节点
func (ta *TopologyAware) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    s, err := getState(state)
    if err != nil {
        return framework.As="error", err.Error())
    }

    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }

    nodeRegion := node.Labels[NodeRegionLabelKey]
    nodeZone := node.Labels[NodeZoneLabelKey]

    // 如果 Pod 指定了偏好区域,且节点不在该区域,则过滤掉
    if s.PreferredRegion != "" && s.PreferredRegion != nodeRegion {
        klog.V(5).Infof("Filter Pod %s/%s: Node %s (Region %s) does not match preferred region %s",
            pod.Namespace, pod.Name, node.Name, nodeRegion, s.PreferredRegion)
        return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node region %s does not match preferred region %s", nodeRegion, s.PreferredRegion))
    }

    // 如果 Pod 指定了偏好可用区,且节点不在该可用区,则过滤掉
    if s.PreferredZone != "" && s.PreferredZone != nodeZone {
        klog.V(5).Infof("Filter Pod %s/%s: Node %s (Zone %s) does not match preferred zone %s",
            pod.Namespace, pod.Name, node.Name, nodeZone, s.PreferredZone)
        return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node zone %s does not match preferred zone %s", nodeZone, s.PreferredZone))
    }

    klog.V(5).Infof("Filter Pod %s/%s: Node %s (Region %s, Zone %s) is suitable based on topology preferences",
        pod.Namespace, pod.Name, node.Name, nodeRegion, nodeZone)
    return nil // 返回 nil 表示节点符合条件
}

// PreScore 阶段:为打分阶段准备数据,例如统计每个 Zone 已运行的 Pod 数量
func (ta *TopologyAware) PreScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodes []*v1.Node) *framework.Status {
    s, err := getState(state)
    if err != nil {
        return framework.As="error", err.Error())
    }

    if s.DisperseComponent == "" {
        return nil // 如果没有分散组件要求,则跳过
    }

    // 统计每个 Zone 中已运行的目标组件 Pod 数量
    zonePodCount := make(map[string]int)
    for _, node := range nodes {
        nodeZone := node.Labels[NodeZoneLabelKey]
        if nodeZone == "" {
            continue
        }
        // 获取该节点上所有 Pod
        nodeInfo, err := ta.handle.SnapshotSharedLister().NodeInfos().Get(node.Name)
        if err != nil {
            klog.ErrorS(err, "Failed to get node info for node", "node", node.Name)
            continue
        }
        for _, existingPod := range nodeInfo.Pods {
            if existingPod.Pod.Labels[PodDisperseComponentLabelKey] == s.DisperseComponent {
                zonePodCount[nodeZone]++
            }
        }
    }
    // 将统计结果存储到 CycleState 中
    state.Write(stateKey, &TopologyAwareState{
        PreferredRegion: s.PreferredRegion,
        PreferredZone: s.PreferredZone,
        DisperseComponent: s.DisperseComponent,
        // 存储额外数据,这里简单地用 string 序列化,实际应用中可以定义更复杂的结构
        // 为了简化,我们假设 PreScore 只是更新状态,实际应用可能需要更复杂的共享数据结构
        // 对于本示例,PreScore 主要是为了演示如何获取信息,实际打分逻辑在 Score 中直接计算
    })
    klog.V(5).Infof("PreScore for Pod %s/%s, DisperseComponent: %s, ZonePodCount: %v",
        p.Namespace, p.Name, s.DisperseComponent, zonePodCount)
    return nil
}

// Score 阶段:为每个节点打分
func (ta *TopologyAware) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
    s, err := getState(state)
    if err != nil {
        return 0, framework.As="error", err.Error())
    }

    nodeInfo, err := ta.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from snapshot error: %v", nodeName, err))
    }
    node := nodeInfo.Node()
    if node == nil {
        return 0, framework.NewStatus(framework.Error, "node not found")
    }

    nodeRegion := node.Labels[NodeRegionLabelKey]
    nodeZone := node.Labels[NodeZoneLabelKey]

    var score int64 = 0

    // 1. 区域/可用区亲和性打分 (高优先级)
    // 如果 Pod 指定了偏好区域,且节点在偏好区域,则加分
    if s.PreferredRegion != "" && s.PreferredRegion == nodeRegion {
        score += 50
        klog.V(5).Infof("Score Pod %s/%s on Node %s: +50 for preferred region match (%s)", p.Namespace, p.Name, node.Name, nodeRegion)
    } else if s.PreferredRegion != "" && s.PreferredRegion != nodeRegion {
        // 如果 Pod 指定了偏好区域,但节点不在,则减分 (除非是 Filter 已经过滤掉,这里是降级情况)
        // 实际上,Filter 已经把不符合条件的节点过滤了,这里是为了演示打分逻辑的完整性
        score -= 20 // 降级到其他区域的惩罚
        klog.V(5).Infof("Score Pod %s/%s on Node %s: -20 for non-preferred region (%s)", p.Namespace, p.Name, node.Name, nodeRegion)
    }

    // 如果 Pod 指定了偏好可用区,且节点在偏好可用区,则加分
    if s.PreferredZone != "" && s.PreferredZone == nodeZone {
        score += 30
        klog.V(5).Infof("Score Pod %s/%s on Node %s: +30 for preferred zone match (%s)", p.Namespace, p.Name, node.Name, nodeZone)
    } else if s.PreferredZone != "" && s.PreferredZone != nodeZone {
        score -= 10 // 降级到其他可用区的惩罚
        klog.V(5).Infof("Score Pod %s/%s on Node %s: -10 for non-preferred zone (%s)", p.Namespace, p.Name, node.Name, nodeZone)
    }

    // 2. 负载均衡打分 (中优先级)
    // 简单的负载均衡:根据节点上 Pod 数量打分,Pod 越少分数越高
    // 实际生产中会根据 CPU/Memory 利用率等指标
    nodePodsCount := len(nodeInfo.Pods)
    // 假设最大 Pod 数量是 100,分数范围 0-20
    // Pod 越少,分数越高。100 - nodePodsCount / 5
    loadBalanceScore := int64(framework.MaxNodeScore - nodePodsCount) / 5
    if loadBalanceScore < 0 {
        loadBalanceScore = 0
    }
    score += loadBalanceScore
    klog.V(5).Infof("Score Pod %s/%s on Node %s: +%d for load balance (Pods: %d)", p.Namespace, p.Name, node.Name, loadBalanceScore, nodePodsCount)

    // 3. 故障域分散打分 (中优先级)
    if s.DisperseComponent != "" {
        currentZonePods := 0
        for _, existingPod := range nodeInfo.Pods {
            if existingPod.Pod.Labels[PodDisperseComponentLabelKey] == s.DisperseComponent {
                currentZonePods++
            }
        }

        // 目标是让每个 Zone 的 Pod 数量尽可能少。
        // 在当前 Zone 内,Pod 数量越多,分数越低。
        // 假设每个 Zone 最多允许 10 个相同组件的 Pod
        disperseScore := int64(10 - currentZonePods) * 5
        if disperseScore < -50 { // 最低分数限制
            disperseScore = -50
        }
        score += disperseScore
        klog.V(5).Infof("Score Pod %s/%s on Node %s: +%d for disperse component (Zone %s, current component pods: %d)",
            p.Namespace, p.Name, node.Name, disperseScore, nodeZone, currentZonePods)
    }

    // 确保分数在合法范围内
    if score < framework.MinNodeScore {
        score = framework.MinNodeScore
    }
    if score > framework.MaxNodeScore {
        score = framework.MaxNodeScore
    }

    klog.V(4).Infof("Score Pod %s/%s on Node %s: Final Score = %d", p.Namespace, p.Name, node.Name, score)
    return score, nil
}

// NormalizeScore 阶段:将 Score 插件的打分结果归一化到 0-framework.MaxNodeScore 之间。
// 如果 Score 插件自身已经确保了分数在合法范围内,这个阶段可以不做特殊处理。
// 但通常建议实现,以防万一或与其他插件协调。
func (ta *TopologyAware) NormalizeScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, scores framework.NodeScoreList) *framework.Status {
    // 找到最大和最小分数
    var maxScore int64 = 0
    var minScore int64 = framework.MaxNodeScore
    for _, nodeScore := range scores {
        if nodeScore.Score > maxScore {
            maxScore = nodeScore.Score
        }
        if nodeScore.Score < minScore {
            minScore = nodeScore.Score
        }
    }

    // 如果所有分数都相同,或者范围为零,则无需归一化
    if maxScore == minScore {
        for i := range scores {
            scores[i].Score = framework.MaxNodeScore / 2 // 统一设置为中间值
        }
        return nil
    }

    // 归一化到 [0, MaxNodeScore] 范围
    // newScore = (oldScore - minScore) * MaxNodeScore / (maxScore - minScore)
    for i := range scores {
        scores[i].Score = (scores[i].Score - minScore) * framework.MaxNodeScore / (maxScore - minScore)
        klog.V(5).Infof("Normalized Score for Node %s: %d", scores[i].Name, scores[i].Score)
    }

    return nil
}

// 辅助函数:从 CycleState 中获取状态
func getState(state *framework.CycleState) (*TopologyAwareState, error) {
    s, err := state.Read(stateKey)
    if err != nil {
        return nil, fmt.Errorf("error reading state from cycleState: %w", err)
    }
    ts, ok := s.(*TopologyAwareState)
    if !ok {
        return nil, fmt.Errorf("unexpected state type, expected *TopologyAwareState")
    }
    return ts, nil
}

代码解释:

  1. TopologyAwareState: 这是一个自定义结构体,用于在同一个调度周期内,在不同的插件扩展点之间传递数据。我们在这里存储了 Pod 偏好的区域、可用区以及用于分散的组件标签值。
  2. New: 这是插件的工厂函数,它在调度器启动时被调用,负责创建 TopologyAware 插件实例。
  3. Name(): 返回插件的唯一名称 TopologyAware,这与我们在 scheduler-config.yaml 中配置的名称一致。
  4. PreFilter: 在实际过滤节点之前执行。它从 Pod 的注解中读取用户指定的偏好区域和可用区,以及 Pod 标签中的分散组件信息,并将这些信息存储在 CycleState 中,以便后续的 FilterScore 阶段使用。
  5. Filter: 这个阶段根据 PreFilter 中获取的 Pod 偏好,检查每个节点。如果节点所在的区域或可用区不符合 Pod 的强偏好,则直接将该节点标记为不可调度 (UnschedulableAndUnresolvable)。这确保了 Pod 优先在符合其拓扑偏好的节点上调度。
  6. PreScore: 在 Score 阶段之前执行。这里我们演示了如何遍历所有节点并获取它们的信息,为 Score 阶段的复杂计算做准备。在本例中,我们统计了每个 Zone 中已运行的目标组件 Pod 的数量,以便在 Score 阶段用于分散策略。
  7. Score: 这是核心逻辑所在。它为每个符合 Filter 条件的节点计算一个分数:
    • 区域/可用区亲和性:如果节点与 Pod 的偏好区域/可用区匹配,则给予高分。如果节点不匹配(在 Filter 阶段没有被过滤掉,意味着 Pod 没有强偏好,或者我们允许降级),则给予较低的分数或负分。
    • 负载均衡:简单地根据节点上已运行 Pod 的数量进行打分,Pod 数量越少分数越高。这模拟了将 Pod 调度到负载较低节点的策略。
    • 故障域分散:如果 Pod 指定了 DisperseComponent,则检查该节点所在可用区中已运行的相同组件的 Pod 数量。数量越少,分数越高,鼓励 Pod 分散。
    • 最终分数会根据这些策略的权重累加。
  8. NormalizeScore: 在所有 Score 插件执行完毕后,对所有节点的原始分数进行归一化处理,确保最终分数都在 [0, framework.MaxNodeScore] 范围内。这对于协调不同 Score 插件的打分范围非常重要。
  9. getState: 辅助函数,用于从 CycleState 中安全地读取我们插件的状态。

4.3 启动自定义调度器 (cmd/main.go)

接下来,我们需要一个 main 函数来启动我们的自定义调度器。这个 main 函数将使用 k8s.io/kubernetes/cmd/kube-scheduler 包中的 scheduler.NewSchedulerCommand 函数来构建调度器命令,并注册我们的插件。

package main

import (
    "math/rand"
    "os"
    "time"

    "github.com/myorg/my-topology-scheduler-plugin/pkg/plugin" // 替换为你的模块路径

    "k8s.io/component-base/logs"
    "k8s.io/kubernetes/cmd/kube-scheduler/app"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    logs.InitLogs()
    defer logs.FlushLogs()

    // 创建调度器命令,并注册我们的自定义插件
    command := app.NewSchedulerCommand(
        app.With"=plugin", plugin.PluginName, plugin.New),
    )

    // 如果命令行参数为空,则添加默认配置路径
    // 这通常在容器中运行调度器时很有用
    if len(os.Args) <= 1 {
        os.Args = append(os.Args, "--config=/etc/kubernetes/scheduler-config.yaml")
    }

    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

代码解释:

  • app.NewSchedulerCommand 是构建 kube-scheduler 命令的入口。
  • app.WithPlugin 允许我们将自定义插件注册到调度器框架中。我们传入插件的名称 (plugin.PluginName) 和它的工厂函数 (plugin.New)。

4.4 调度器配置 (scheduler-config.yaml)

将这个配置文件保存为 scheduler-config.yaml

# scheduler-config.yaml
apiVersion: kubescheduler.config.k8s.io/v1beta3 # 根据你的K8s版本选择 v1beta2 或 v1beta3
kind: KubeSchedulerConfiguration
clientConnection:
  kubeconfig: /etc/kubernetes/scheduler.conf # 调度器连接 Kube API Server 的 kubeconfig
leaderElection:
  leaderElect: true # 启用领导者选举,确保只有一个调度器实例活跃
profiles:
  - schedulerName: my-topology-scheduler # 这个名称应该与 Pod.spec.schedulerName 匹配
    plugins:
      # 默认的 QueueSort 插件,按优先级排序 Pod
      queueSort:
        enabled:
          - name: PrioritySort
      # PreFilter 阶段,我们的插件在此处读取 Pod 拓扑偏好
      preFilter:
        enabled:
          - name: NodeResourcesFit
          - name: NodePorts
          - name: PodTopologySpread
          - name: InterPodAffinity
          - name: TopologyAware # 启用我们的插件
      # Filter 阶段,我们的插件在此处过滤不符合拓扑偏好的节点
      filter:
        enabled:
          - name: NodeResourcesFit
          - name: NodePorts
          - name: NodeName
          - name: TaintToleration
          - name: InterPodAffinity
          - name: NodeAffinity
          - name: PodTopologySpread
          - name: TopologyAware # 启用我们的插件
      # PostFilter 阶段,处理没有可用节点的情况
      postFilter:
        enabled:
          - name: DefaultPreemption
      # PreScore 阶段,我们的插件在此处准备打分数据,如统计 Zone 内 Pod 数量
      preScore:
        enabled:
          - name: InterPodAffinity
          - name: PodTopologySpread
          - name: TopologyAware # 启用我们的插件
      # Score 阶段,我们的插件在此处为节点打分
      score:
        enabled:
          - name: NodeResourcesBalancedAllocation
          - name: NodeResourcesLeastAllocated
          - name: NodeAffinity
            weight: 1
          - name: InterPodAffinity
            weight: 1
          - name: PodTopologySpread
            weight: 2
          - name: TaintToleration
            weight: 1
          - name: TopologyAware # 启用我们的插件,并赋予较高权重
            weight: 3 # 权重越高,该插件的打分对最终结果影响越大
      # NormalizeScore 阶段,对分数进行归一化
      normalizeScore:
        enabled:
          - name: TopologyAware # 启用我们的插件
      # Reserve 阶段,用于预留资源
      reserve:
        enabled:
          - name: VolumeBinding
      # Permit 阶段,用于延迟或拒绝绑定
      permit:
        enabled: []
      # PreBind 阶段,绑定前操作
      preBind:
        enabled:
          - name: VolumeBinding
      # Bind 阶段,实际执行绑定
      bind:
        enabled:
          - name: DefaultBinder
      # PostBind 阶段,绑定后操作
      postBind:
        enabled: []

五、编译、部署与测试

5.1 编译自定义调度器

my-topology-scheduler-plugin 目录下执行:

go mod init github.com/myorg/my-topology-scheduler-plugin # 替换为你的实际模块路径
go mod tidy
go build -o kube-scheduler-topology cmd/main.go

这将生成一个名为 kube-scheduler-topology 的二进制文件。

5.2 部署自定义调度器

通常,部署自定义调度器有两种方式:

  1. 替换默认调度器:修改现有 kube-scheduler Deployment 或 Static Pod 的配置,将其镜像替换为包含我们自定义调度器的镜像。这种方式风险较高,不建议在生产环境直接使用。
  2. 作为辅助调度器运行:这是推荐的方式。我们将自己的调度器作为一个独立的 Deployment 运行,并为它指定一个不同的 schedulerName。Pod 可以通过 spec.schedulerName 字段选择使用哪个调度器。

步骤(作为辅助调度器):

  1. 创建 Docker 镜像
    创建一个 Dockerfile

    # Dockerfile
    FROM golang:1.21-alpine AS builder
    
    WORKDIR /app
    COPY go.mod go.sum ./
    RUN go mod download
    
    COPY . .
    RUN CGO_ENABLED=0 go build -o kube-scheduler-topology cmd/main.go
    
    FROM alpine:latest
    RUN apk --no-cache add ca-certificates
    WORKDIR /usr/local/bin
    COPY --from=builder /app/kube-scheduler-topology .
    
    ENTRYPOINT ["kube-scheduler-topology"]
    CMD ["--config=/etc/kubernetes/scheduler-config.yaml", "--v=4"]

    构建镜像:

    docker build -t my-topology-scheduler:latest .
    docker tag my-topology-scheduler:latest your-registry/my-topology-scheduler:latest
    docker push your-registry/my-topology-scheduler:latest
  2. 创建 ServiceAccount 和 RoleBinding
    自定义调度器需要与 API Server 交互,因此需要相应的权限。可以参考默认 kube-schedulerServiceAccountClusterRole

    # rbac.yaml
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: my-topology-scheduler-sa
      namespace: kube-system
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: my-topology-scheduler-binding
    subjects:
      - kind: ServiceAccount
        name: my-topology-scheduler-sa
        namespace: kube-system
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: system:kube-scheduler # 复用默认调度器的权限

    kubectl apply -f rbac.yaml

  3. 创建 ConfigMap
    scheduler-config.yaml 内容放入 ConfigMap。

    kubectl create configmap my-topology-scheduler-config --from-file=scheduler-config.yaml -n kube-system
  4. 创建 Deployment

    # deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: my-topology-scheduler
      namespace: kube-system
      labels:
        app: my-topology-scheduler
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: my-topology-scheduler
      template:
        metadata:
          labels:
            app: my-topology-scheduler
        spec:
          serviceAccountName: my-topology-scheduler-sa
          containers:
            - name: my-topology-scheduler
              image: your-registry/my-topology-scheduler:latest # 替换为你的镜像
              imagePullPolicy: Always
              args:
                - --config=/etc/kubernetes/scheduler-config.yaml
                - --kubeconfig=/etc/kubernetes/scheduler.conf
                - --leader-elect=true
                - --v=4 # 调整日志级别,方便调试
              volumeMounts:
                - name: scheduler-config
                  mountPath: /etc/kubernetes/scheduler-config.yaml
                  subPath: scheduler-config.yaml
                - name: kubeconfig
                  mountPath: /etc/kubernetes/scheduler.conf
                  subPath: scheduler.conf
          volumes:
            - name: scheduler-config
              configMap:
                name: my-topology-scheduler-config
            - name: kubeconfig # 假设 kubeconfig 已经以 Secret 或 ConfigMap 形式存在
              secret: # 或者 configMap
                secretName: my-scheduler-kubeconfig # 替换为你的 kubeconfig Secret 名称

    注意kubeconfig 需要预先创建。最简单的方法是复制默认 kube-schedulerscheduler.conf 文件并创建 Secret。

    kubectl apply -f deployment.yaml

5.3 测试场景

  1. 准备节点:确保你的集群节点带有正确的拓扑标签。

    kubectl label node <node-name-1> topology.kubernetes.io/region=us-east-1 topology.kubernetes.io/zone=us-east-1a
    kubectl label node <node-name-2> topology.kubernetes.io/region=us-east-1 topology.kubernetes.io/zone=us-east-1b
    kubectl label node <node-name-3> topology.kubernetes.io/region=us-west-1 topology.kubernetes.io/zone=us-west-1a
  2. 创建 Pod (带强偏好)

    # pod-preferred-zone.yaml
    apiVersion: v1
    kind: Pod
    metadata:
      name: my-app-zone-a
      annotations:
        topology-aware-scheduler.example.com/preferred-zone: us-east-1a
    spec:
      schedulerName: my-topology-scheduler # 指定我们的调度器
      containers:
        - name: nginx
          image: nginx:latest
          resources:
            requests:
              cpu: "100m"
              memory: "100Mi"

    预期结果:Pod 会被调度到 us-east-1a 的节点。

  3. 创建 Pod (带分散要求)

    # pod-disperse.yaml
    apiVersion: v1
    kind: Pod
    metadata:
      name: my-web-1
      labels:
        app.kubernetes.io/component: web
    spec:
      schedulerName: my-topology-scheduler
      containers:
        - name: nginx
          image: nginx:latest
          resources:
            requests:
              cpu: "100m"
              memory: "100Mi"
    ---
    apiVersion: v1
    kind: Pod
    metadata:
      name: my-web-2
      labels:
        app.kubernetes.io/component: web
    spec:
      schedulerName: my-topology-scheduler
      containers:
        - name: nginx
          image: nginx:latest
          resources:
            requests:
              cpu: "100m"
              memory: "100Mi"
    ---
    apiVersion: v1
    kind: Pod
    metadata:
      name: my-web-3
      labels:
        app.kubernetes.io/component: web
    spec:
      schedulerName: my-topology-scheduler
      containers:
        - name: nginx
          image: nginx:latest
          resources:
            requests:
              cpu: "100m"
              memory: "100Mi"

    预期结果:my-web-1, my-web-2, my-web-3 会尽量分散到不同的可用区,或者优先调度到当前负载较低的可用区。

  4. 创建 Pod (无特定偏好,观察负载均衡)

    # pod-no-preference.yaml
    apiVersion: v1
    kind: Pod
    metadata:
      name: my-app-no-pref
    spec:
      schedulerName: my-topology-scheduler
      containers:
        - name: busybox
          image: busybox:latest
          command: ["sh", "-c", "sleep 3600"]
          resources:
            requests:
              cpu: "100m"
              memory: "100Mi"

    预期结果:Pod 会被调度到当前负载(Pod 数量)最低的节点。

通过 kubectl get pod -o wide 和查看自定义调度器的日志 (kubectl logs -f deployment/my-topology-scheduler -n kube-system),可以验证调度决策是否符合预期。

六、高级考量与最佳实践

  1. 性能优化

    • 缓存利用:调度器框架提供了 framework.Handle 接口,通过 SnapshotSharedLister() 可以访问集群资源的共享缓存(Informer)。尽量利用这些缓存而不是直接查询 API Server,以减少延迟和 API Server 负载。
    • 计算复杂度FilterScore 阶段会被频繁调用。确保你的插件逻辑高效,避免在热路径上执行耗时操作。
    • CycleState:巧妙使用 CycleState 在不同扩展点之间传递预计算结果,避免重复工作。
    • PreFilterExtensions:如果你的 Filter 逻辑可以提前判断某些 Pod 根本不可能调度,可以实现 PreFilterExtensions 中的 AddPod / RemovePod 等方法来优化。
  2. 错误处理与日志

    • 在插件代码中,始终进行充分的错误检查。
    • 使用 klog/v2 进行详细的日志输出,特别是调试信息 (klog.V(level)),以便在生产环境中追踪调度决策。
  3. 可观测性

    • 考虑将你的插件与 Prometheus 集成,暴露自定义指标(例如,因特定拓扑约束而被拒绝的 Pod 数量、不同区域的调度成功率等),以便更好地监控调度器的行为和集群状态。
  4. 插件间交互

    • 如果多个插件在同一扩展点被启用,它们的执行顺序可能很重要。在 scheduler-config.yaml 中,enabled 列表的顺序决定了插件的执行顺序。
    • Score 插件的权重 (weight) 决定了其打分结果对最终总分的影响。
    • 不同插件可能通过 CycleState 共享数据,但要小心命名冲突和数据一致性。
  5. 版本兼容性

    • Kubernetes 调度器框架的 API 可能会在不同版本之间发生变化(例如 v1beta2v1beta3)。在升级集群时,请务必检查插件代码与新框架版本的兼容性。
  6. 更复杂的拓扑

    • 本示例只考虑了 Region 和 Zone。实际中可能需要支持更多层次的拓扑,如机架(Rack)、宿主机组(Host Group)。这可以通过扩展节点标签和插件逻辑来实现。
    • 可以引入外部拓扑服务,通过 API 查询更复杂的拓扑信息,而不是仅依赖节点标签。

七、展望未来,赋能调度

通过本次深入探讨,我们了解了 Kubernetes 调度器插件框架的强大能力。编写自定义 Go 语言插件,可以让我们摆脱默认调度器的限制,实现真正符合业务需求的拓扑感知调度策略。这不仅能优化应用程序的性能和可靠性,还能有效控制云成本,是构建高效、健壮云原生平台不可或缺的一环。

从简单的区域亲和性,到复杂的故障域分散和智能负载均衡,调度器插件为我们打开了一扇通向无限定制可能的大门。鼓励各位在实际项目中大胆探索,利用这一机制解决独特的调度挑战,共同推动 Kubernetes 调度的智能化和自动化水平。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注