深入 Kubernetes Scheduler Plugins:编写 Go 代码实现拓扑感知调度
各位技术同行,大家好。今天我们将深入探讨 Kubernetes 调度器的核心机制,特别是如何利用其强大的插件架构,编写 Go 语言代码,实现高度定制化的拓扑感知调度策略。在当今大规模、分布式云原生环境中,仅仅依靠默认调度器往往不足以满足复杂的业务需求。理解并掌握调度器插件的开发,是提升集群资源利用率、优化应用性能、增强系统健壮性的关键一步。
一、Kubernetes 调度的基石与定制化的必要性
Kubernetes 调度器 (kube-scheduler) 是集群控制平面中的一个关键组件,其核心职责是根据一系列预设规则和优先级,为新创建但尚未分配到节点的 Pod 选择一个最合适的节点。这个过程通常分为两个阶段:
- 过滤 (Predicates / Filter):从所有可用节点中筛选出符合 Pod 运行条件(如资源请求、节点选择器、亲和性/反亲和性规则、端口冲突等)的节点。
- 打分 (Priorities / Score):对过滤后的节点进行打分,选择分数最高的节点来调度 Pod。
默认的 Kubernetes 调度器已经相当智能和强大,它内置了多种策略来处理常见的调度场景,例如:
- 资源匹配:确保节点有足够的 CPU、内存等资源。
- 节点亲和性/反亲和性:根据节点标签将 Pod 吸引或排斥到特定节点。
- Pod 亲和性/反亲和性:根据其他 Pod 的位置将 Pod 部署在一起或分散开。
- Taints 和 Tolerations:允许节点驱逐不容忍特定污点的 Pod。
- PodTopologySpread:确保 Pod 在不同的拓扑域(如区域、可用区、主机)中均匀分布。
然而,在面对更高级、更复杂的业务场景时,默认调度器的能力可能会显得不足。例如:
- 高度定制的拓扑感知调度:默认的 PodTopologySpread 提供了基本的拓扑分散能力,但如果我们需要更精细的控制,例如,优先将计算调度到离数据源最近的可用区,或者在某个可用区资源不足时,以特定策略进行跨可用区调度,同时兼顾成本和网络延迟,就需要自定义逻辑。
- 特定硬件/软件加速器调度:将 Pod 调度到具有 GPU、FPGA 等特定硬件的节点,并考虑这些资源的利用率。
- 批处理作业调度优化:针对短时、大量计算的批处理任务,可能需要与在线服务不同的调度策略。
- 复杂的资源预留/配额管理:在多租户环境中,实现更灵活、更细粒度的资源隔离和共享策略。
- 外部系统集成:调度决策需要与外部系统(如容量管理系统、成本优化系统)进行交互。
正是为了应对这些挑战,Kubernetes 调度器引入了插件机制,允许开发者通过实现特定的接口,扩展调度器的行为,甚至替换其核心逻辑。
二、Kubernetes Scheduler 插件架构深度解析
Kubernetes 1.15 版本引入了调度器框架(Scheduler Framework),并在后续版本中不断完善,最终形成了我们今天使用的插件架构。这个架构的核心思想是将调度过程分解为一系列可扩展的阶段(Extension Points),每个阶段都可以注册一个或多个插件来执行自定义逻辑。
2.1 调度器框架的扩展点 (Extension Points)
调度器框架定义了以下主要扩展点。理解这些扩展点的执行顺序和作用至关重要:
| 扩展点名称 | 执行时机 | 作用 | 典型插件示例 |
|---|---|---|---|
QueueSort |
调度队列中的 Pod 排序阶段 | 决定哪个 Pod 应该优先被调度。 | 基于优先级、创建时间、资源请求等排序。 |
PreFilter |
过滤阶段之前 | 对 Pod 进行预处理,提前进行一些检查,或缓存 Pod 相关信息。 | 检查 Pod 的全局约束,准备后续过滤所需的数据。 |
Filter |
过滤阶段 | 筛选出无法运行 Pod 的节点。 | 检查节点资源、端口、标签、污点等是否满足 Pod 要求。 |
PostFilter |
过滤阶段之后,所有节点都被过滤掉时 | 处理 Pod 无法调度的情况,例如触发事件或尝试其他调度策略。 | 记录无法调度的原因,尝试回退到其他队列。 |
PreScore |
打分阶段之前 | 在节点打分前进行预处理,计算共享信息。 | 为打分阶段准备节点或 Pod 的聚合数据。 |
Score |
打分阶段 | 对每个节点进行打分,表示其适合度。 | 资源使用率、节点亲和性、Pod 反亲和性等。 |
NormalizeScore |
打分阶段之后,所有节点分数汇总之前 | 对所有节点的原始分数进行归一化处理,确保分数在合理范围内。 | 将分数从不同插件的特定范围映射到统一的 0-100 范围。 |
Reserve |
绑定阶段之前,Pod 已经选定节点,但尚未真正绑定 | 预留节点资源,防止其他调度器同时选择该节点。 | 资源预留,更新内部状态。 |
Permit |
允许阶段,Pod 等待绑定 | 允许或拒绝 Pod 绑定到选定节点,可以延迟绑定。 | 外部准入控制,等待外部审批。 |
PreBind |
绑定阶段之前 | 执行 Pod 绑定前的最后操作,例如准备存储卷。 | 挂载 CSI 卷、初始化网络。 |
Bind |
绑定阶段 | 将 Pod 真正绑定到节点。 | 更新 Pod 的 spec.nodeName 字段。默认调度器通常只使用一个 Bind 插件。 |
PostBind |
绑定阶段之后 | 执行绑定后的清理或通知操作。 | 清理预留资源,发送通知,更新外部系统状态。 |
执行顺序简化图示:
+-----------------+
| Pod 调度请求 |
+--------+--------+
|
v
+--------+--------+
| QueueSort | (排序调度队列中的 Pod)
+--------+--------+
|
v
+--------+--------+
| PreFilter | (预处理 Pod 信息)
+--------+--------+
|
v
+--------+--------+
| Filter | (筛选可用节点)
+--------+--------+
|
(如果无可用节点)
+----------+----------+
| PostFilter | (处理无可用节点情况)
+----------+----------+
|
v
+--------+--------+
| PreScore | (预处理打分信息)
+--------+--------+
|
v
+--------+--------+
| Score | (为节点打分)
+--------+--------+
|
v
+--------+--------+
| NormalizeScore | (归一化分数)
+--------+--------+
|
v
+--------+--------+
| Reserve | (预留节点资源)
+--------+--------+
|
v
+--------+--------+
| Permit | (允许/拒绝绑定)
+--------+--------+
|
v
+--------+--------+
| PreBind | (绑定前操作)
+--------+--------+
|
v
+--------+--------+
| Bind | (实际绑定 Pod)
+--------+--------+
|
v
+--------+--------+
| PostBind | (绑定后操作)
+--------+--------+
|
v
+-----------------+
| 调度完成 |
+-----------------+
2.2 插件的注册与配置
要启用自定义插件,我们需要创建一个 KubeSchedulerConfiguration 对象,并在其中指定要使用的插件及其配置。这个配置通常通过 ConfigMap 提供给 kube-scheduler 进程。
# plugin.yaml
apiVersion: kubescheduler.config.k8s.io/v1beta3 # 或 v1beta2,根据K8s版本选择
kind: KubeSchedulerConfiguration
clientConnection:
kubeconfig: /etc/kubernetes/scheduler.conf
leaderElection:
leaderElect: true
profiles:
- schedulerName: my-topology-scheduler # 自定义调度器名称
plugins:
queueSort:
enabled:
- name: PrioritySort # 默认的优先级排序
preFilter:
enabled:
- name: NodeResourcesFit
- name: NodePorts
- name: PodTopologySpread
- name: InterPodAffinity
- name: TopologyAware # 我们的自定义插件
filter:
enabled:
- name: NodeResourcesFit
- name: NodePorts
- name: NodeName
- name: TaintToleration
- name: InterPodAffinity
- name: NodeAffinity
- name: PodTopologySpread
- name: TopologyAware # 我们的自定义插件
postFilter:
enabled:
- name: DefaultPreemption
preScore:
enabled:
- name: InterPodAffinity
- name: PodTopologySpread
- name: TopologyAware # 我们的自定义插件
score:
enabled:
- name: NodeResourcesBalancedAllocation
- name: NodeResourcesLeastAllocated
- name: NodeAffinity
weight: 1
- name: InterPodAffinity
weight: 1
- name: PodTopologySpread
weight: 2
- name: TaintToleration
weight: 1
- name: TopologyAware # 我们的自定义插件
weight: 3 # 给予较高的权重,体现其重要性
reserve:
enabled:
- name: VolumeBinding
permit:
enabled: []
preBind:
enabled:
- name: VolumeBinding
bind:
enabled:
- name: DefaultBinder
postBind:
enabled: []
在上面的配置中,我们定义了一个名为 my-topology-scheduler 的调度器配置,并在多个扩展点(preFilter、filter、preScore、score)中启用了我们自定义的 TopologyAware 插件。weight 字段在 score 阶段尤为重要,它决定了该插件的打分结果对最终节点选择的影响程度。
三、拓扑感知调度:理念与挑战
拓扑感知调度旨在根据集群的物理或逻辑布局(拓扑结构)来做出调度决策。这些拓扑结构通常由节点标签表示,例如:
topology.kubernetes.io/region: 云服务商的区域,如us-east-1。topology.kubernetes.io/zone: 云服务商的可用区,如us-east-1a。kubernetes.io/hostname: 主机名。
为什么需要拓扑感知调度?
- 数据本地性 (Data Locality):将计算工作负载调度到离其所需数据最近的节点,可以显著降低网络延迟,提高应用程序性能。例如,将 Kafka 消费者调度到与 Kafka Broker 相同的可用区。
- 故障域隔离 (Failure Domain Isolation):将同一服务的 Pod 分散到不同的故障域(如不同的可用区、机架)中,可以防止单点故障。即使一个可用区发生故障,服务仍然可以在其他可用区继续运行。
- 网络成本优化 (Network Cost Optimization):跨区域或跨可用区的数据传输通常会产生额外的网络费用。拓扑感知调度有助于将流量限制在同一区域或可用区内,从而降低运营成本。
- 性能与带宽:在同一物理网络拓扑中通信的 Pod 通常具有更高的带宽和更低的延迟。
Kubernetes 已经提供了一些内置机制来支持拓扑感知:
- Node Affinity / Anti-Affinity:可以通过
nodeSelector或nodeAffinity将 Pod 调度到特定区域或可用区的节点。 - Pod Anti-Affinity:结合
topologyKey可以将 Pod 分散到不同的拓扑域。例如,topologyKey: topology.kubernetes.io/zone可以确保来自同一服务的 Pod 不在同一个可用区。 - PodTopologySpread:这是 Kubernetes 1.16+ 引入的专门用于拓扑分散的 API 对象,允许用户定义 Pod 在指定拓扑域内的分布策略,例如,确保每个可用区都有大致相同数量的 Pod。
尽管这些内置机制非常有用,但在某些场景下仍有局限性:
- 复杂优先级:当需要同时考虑数据本地性、故障域隔离和资源利用率等多个因素,并赋予它们不同的优先级时,内置机制可能难以表达。
- 动态调整:内置机制通常是静态配置的。如果需要根据集群的实时状态(如可用区负载、网络拥堵)动态调整调度策略,就需要更灵活的插件。
- 特定业务逻辑:某些业务可能存在独特的拓扑需求,例如,只在特定区域部署某种类型的服务,或者在主备架构中强制主服务和备用服务位于不同区域,但又不能离得太远。
这就是自定义调度器插件发挥作用的地方。我们将编写一个插件,实现更精细的拓扑感知调度策略。
四、编写自定义拓扑感知调度插件
现在,让我们通过一个具体的 Go 语言示例,来编写一个名为 TopologyAware 的调度器插件。
场景设定:
我们的集群跨越多个 Region 和 Zone。我们希望实现以下调度策略:
- 区域/可用区亲和性优先:如果 Pod 通过
topology.kubernetes.io/region或topology.kubernetes.io/zone标签明确表达了偏好的拓扑域,则优先将其调度到该域内的节点。 - 负载均衡:在满足亲和性要求的前提下,倾向于选择那些资源利用率较低的节点。
- 故障域分散:对于带有特定标签(例如
app.kubernetes.io/component=web)的 Pod,我们希望将其分散到不同的可用区,避免过度集中。 - 优雅降级:如果所有亲和性节点都不可用,允许 Pod 调度到其他区域或可用区,但得分会较低。
4.1 项目结构
my-topology-scheduler-plugin/
├── cmd/
│ └── main.go
├── go.mod
├── go.sum
├── pkg/
│ └── plugin/
│ └── topologyaware.go
└── scheduler-config.yaml # 我们的调度器配置
4.2 核心 Go 代码实现 (pkg/plugin/topologyaware.go)
首先,我们需要定义插件的结构体,并实现调度器框架定义的接口。
package plugin
import (
"context"
"fmt"
"sort"
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
framework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)
const (
// PluginName 是我们插件的名称
PluginName = "TopologyAware"
// 拓扑标签键
NodeRegionLabelKey = "topology.kubernetes.io/region"
NodeZoneLabelKey = "topology.kubernetes.io/zone"
// Pod 偏好标签/注解
PodPreferredRegionAnnotationKey = "topology-aware-scheduler.example.com/preferred-region"
PodPreferredZoneAnnotationKey = "topology-aware-scheduler.example.com/preferred-zone"
PodDisperseComponentLabelKey = "app.kubernetes.io/component" // 用于故障域分散的组件标签
// CycleStateKey 用于在调度周期中传递信息
stateKey framework.StateKey = "TopologyAwareState"
)
// TopologyAwareState 是在调度周期内传递的状态信息
type TopologyAwareState struct {
PreferredRegion string
PreferredZone string
DisperseComponent string // 用于分散的组件值
}
// Clone 实现了 framework.StateData 接口
func (s *TopologyAwareState) Clone() framework.StateData {
return &TopologyAwareState{
PreferredRegion: s.PreferredRegion,
PreferredZone: s.PreferredZone,
DisperseComponent: s.DisperseComponent,
}
}
// TopologyAware 是我们自定义的拓扑感知调度插件
type TopologyAware struct {
handle framework.Handle
}
// New 函数是插件的构造器,它会在调度器启动时被调用
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
klog.V(4).Infof("Initializing plugin %s", PluginName)
return &TopologyAware{handle: h}, nil
}
// Name 返回插件的名称
func (ta *TopologyAware) Name() string {
return PluginName
}
// PreFilter 检查 Pod 的偏好拓扑信息,并将其存储到 CycleState 中
func (ta *TopologyAware) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
preferredRegion := p.Annotations[PodPreferredRegionAnnotationKey]
preferredZone := p.Annotations[PodPreferredZoneAnnotationKey]
disperseComponent := p.Labels[PodDisperseComponentLabelKey]
s := &TopologyAwareState{
PreferredRegion: preferredRegion,
PreferredZone: preferredZone,
DisperseComponent: disperseComponent,
}
state.Write(stateKey, s)
klog.V(5).Infof("PreFilter for Pod %s/%s: preferredRegion=%s, preferredZone=%s, disperseComponent=%s",
p.Namespace, p.Name, preferredRegion, preferredZone, disperseComponent)
return nil, nil // 返回 nil, nil 表示 PreFilter 成功
}
// PreFilterExtensions 提供了 PreFilter 阶段的扩展点,这里我们不需要
func (ta *TopologyAware) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
// Filter 阶段:根据 Pod 的偏好拓扑信息过滤不符合条件的节点
func (ta *TopologyAware) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
s, err := getState(state)
if err != nil {
return framework.As="error", err.Error())
}
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
nodeRegion := node.Labels[NodeRegionLabelKey]
nodeZone := node.Labels[NodeZoneLabelKey]
// 如果 Pod 指定了偏好区域,且节点不在该区域,则过滤掉
if s.PreferredRegion != "" && s.PreferredRegion != nodeRegion {
klog.V(5).Infof("Filter Pod %s/%s: Node %s (Region %s) does not match preferred region %s",
pod.Namespace, pod.Name, node.Name, nodeRegion, s.PreferredRegion)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node region %s does not match preferred region %s", nodeRegion, s.PreferredRegion))
}
// 如果 Pod 指定了偏好可用区,且节点不在该可用区,则过滤掉
if s.PreferredZone != "" && s.PreferredZone != nodeZone {
klog.V(5).Infof("Filter Pod %s/%s: Node %s (Zone %s) does not match preferred zone %s",
pod.Namespace, pod.Name, node.Name, nodeZone, s.PreferredZone)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node zone %s does not match preferred zone %s", nodeZone, s.PreferredZone))
}
klog.V(5).Infof("Filter Pod %s/%s: Node %s (Region %s, Zone %s) is suitable based on topology preferences",
pod.Namespace, pod.Name, node.Name, nodeRegion, nodeZone)
return nil // 返回 nil 表示节点符合条件
}
// PreScore 阶段:为打分阶段准备数据,例如统计每个 Zone 已运行的 Pod 数量
func (ta *TopologyAware) PreScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodes []*v1.Node) *framework.Status {
s, err := getState(state)
if err != nil {
return framework.As="error", err.Error())
}
if s.DisperseComponent == "" {
return nil // 如果没有分散组件要求,则跳过
}
// 统计每个 Zone 中已运行的目标组件 Pod 数量
zonePodCount := make(map[string]int)
for _, node := range nodes {
nodeZone := node.Labels[NodeZoneLabelKey]
if nodeZone == "" {
continue
}
// 获取该节点上所有 Pod
nodeInfo, err := ta.handle.SnapshotSharedLister().NodeInfos().Get(node.Name)
if err != nil {
klog.ErrorS(err, "Failed to get node info for node", "node", node.Name)
continue
}
for _, existingPod := range nodeInfo.Pods {
if existingPod.Pod.Labels[PodDisperseComponentLabelKey] == s.DisperseComponent {
zonePodCount[nodeZone]++
}
}
}
// 将统计结果存储到 CycleState 中
state.Write(stateKey, &TopologyAwareState{
PreferredRegion: s.PreferredRegion,
PreferredZone: s.PreferredZone,
DisperseComponent: s.DisperseComponent,
// 存储额外数据,这里简单地用 string 序列化,实际应用中可以定义更复杂的结构
// 为了简化,我们假设 PreScore 只是更新状态,实际应用可能需要更复杂的共享数据结构
// 对于本示例,PreScore 主要是为了演示如何获取信息,实际打分逻辑在 Score 中直接计算
})
klog.V(5).Infof("PreScore for Pod %s/%s, DisperseComponent: %s, ZonePodCount: %v",
p.Namespace, p.Name, s.DisperseComponent, zonePodCount)
return nil
}
// Score 阶段:为每个节点打分
func (ta *TopologyAware) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
s, err := getState(state)
if err != nil {
return 0, framework.As="error", err.Error())
}
nodeInfo, err := ta.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from snapshot error: %v", nodeName, err))
}
node := nodeInfo.Node()
if node == nil {
return 0, framework.NewStatus(framework.Error, "node not found")
}
nodeRegion := node.Labels[NodeRegionLabelKey]
nodeZone := node.Labels[NodeZoneLabelKey]
var score int64 = 0
// 1. 区域/可用区亲和性打分 (高优先级)
// 如果 Pod 指定了偏好区域,且节点在偏好区域,则加分
if s.PreferredRegion != "" && s.PreferredRegion == nodeRegion {
score += 50
klog.V(5).Infof("Score Pod %s/%s on Node %s: +50 for preferred region match (%s)", p.Namespace, p.Name, node.Name, nodeRegion)
} else if s.PreferredRegion != "" && s.PreferredRegion != nodeRegion {
// 如果 Pod 指定了偏好区域,但节点不在,则减分 (除非是 Filter 已经过滤掉,这里是降级情况)
// 实际上,Filter 已经把不符合条件的节点过滤了,这里是为了演示打分逻辑的完整性
score -= 20 // 降级到其他区域的惩罚
klog.V(5).Infof("Score Pod %s/%s on Node %s: -20 for non-preferred region (%s)", p.Namespace, p.Name, node.Name, nodeRegion)
}
// 如果 Pod 指定了偏好可用区,且节点在偏好可用区,则加分
if s.PreferredZone != "" && s.PreferredZone == nodeZone {
score += 30
klog.V(5).Infof("Score Pod %s/%s on Node %s: +30 for preferred zone match (%s)", p.Namespace, p.Name, node.Name, nodeZone)
} else if s.PreferredZone != "" && s.PreferredZone != nodeZone {
score -= 10 // 降级到其他可用区的惩罚
klog.V(5).Infof("Score Pod %s/%s on Node %s: -10 for non-preferred zone (%s)", p.Namespace, p.Name, node.Name, nodeZone)
}
// 2. 负载均衡打分 (中优先级)
// 简单的负载均衡:根据节点上 Pod 数量打分,Pod 越少分数越高
// 实际生产中会根据 CPU/Memory 利用率等指标
nodePodsCount := len(nodeInfo.Pods)
// 假设最大 Pod 数量是 100,分数范围 0-20
// Pod 越少,分数越高。100 - nodePodsCount / 5
loadBalanceScore := int64(framework.MaxNodeScore - nodePodsCount) / 5
if loadBalanceScore < 0 {
loadBalanceScore = 0
}
score += loadBalanceScore
klog.V(5).Infof("Score Pod %s/%s on Node %s: +%d for load balance (Pods: %d)", p.Namespace, p.Name, node.Name, loadBalanceScore, nodePodsCount)
// 3. 故障域分散打分 (中优先级)
if s.DisperseComponent != "" {
currentZonePods := 0
for _, existingPod := range nodeInfo.Pods {
if existingPod.Pod.Labels[PodDisperseComponentLabelKey] == s.DisperseComponent {
currentZonePods++
}
}
// 目标是让每个 Zone 的 Pod 数量尽可能少。
// 在当前 Zone 内,Pod 数量越多,分数越低。
// 假设每个 Zone 最多允许 10 个相同组件的 Pod
disperseScore := int64(10 - currentZonePods) * 5
if disperseScore < -50 { // 最低分数限制
disperseScore = -50
}
score += disperseScore
klog.V(5).Infof("Score Pod %s/%s on Node %s: +%d for disperse component (Zone %s, current component pods: %d)",
p.Namespace, p.Name, node.Name, disperseScore, nodeZone, currentZonePods)
}
// 确保分数在合法范围内
if score < framework.MinNodeScore {
score = framework.MinNodeScore
}
if score > framework.MaxNodeScore {
score = framework.MaxNodeScore
}
klog.V(4).Infof("Score Pod %s/%s on Node %s: Final Score = %d", p.Namespace, p.Name, node.Name, score)
return score, nil
}
// NormalizeScore 阶段:将 Score 插件的打分结果归一化到 0-framework.MaxNodeScore 之间。
// 如果 Score 插件自身已经确保了分数在合法范围内,这个阶段可以不做特殊处理。
// 但通常建议实现,以防万一或与其他插件协调。
func (ta *TopologyAware) NormalizeScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, scores framework.NodeScoreList) *framework.Status {
// 找到最大和最小分数
var maxScore int64 = 0
var minScore int64 = framework.MaxNodeScore
for _, nodeScore := range scores {
if nodeScore.Score > maxScore {
maxScore = nodeScore.Score
}
if nodeScore.Score < minScore {
minScore = nodeScore.Score
}
}
// 如果所有分数都相同,或者范围为零,则无需归一化
if maxScore == minScore {
for i := range scores {
scores[i].Score = framework.MaxNodeScore / 2 // 统一设置为中间值
}
return nil
}
// 归一化到 [0, MaxNodeScore] 范围
// newScore = (oldScore - minScore) * MaxNodeScore / (maxScore - minScore)
for i := range scores {
scores[i].Score = (scores[i].Score - minScore) * framework.MaxNodeScore / (maxScore - minScore)
klog.V(5).Infof("Normalized Score for Node %s: %d", scores[i].Name, scores[i].Score)
}
return nil
}
// 辅助函数:从 CycleState 中获取状态
func getState(state *framework.CycleState) (*TopologyAwareState, error) {
s, err := state.Read(stateKey)
if err != nil {
return nil, fmt.Errorf("error reading state from cycleState: %w", err)
}
ts, ok := s.(*TopologyAwareState)
if !ok {
return nil, fmt.Errorf("unexpected state type, expected *TopologyAwareState")
}
return ts, nil
}
代码解释:
TopologyAwareState: 这是一个自定义结构体,用于在同一个调度周期内,在不同的插件扩展点之间传递数据。我们在这里存储了 Pod 偏好的区域、可用区以及用于分散的组件标签值。New: 这是插件的工厂函数,它在调度器启动时被调用,负责创建TopologyAware插件实例。Name(): 返回插件的唯一名称TopologyAware,这与我们在scheduler-config.yaml中配置的名称一致。PreFilter: 在实际过滤节点之前执行。它从 Pod 的注解中读取用户指定的偏好区域和可用区,以及 Pod 标签中的分散组件信息,并将这些信息存储在CycleState中,以便后续的Filter和Score阶段使用。Filter: 这个阶段根据PreFilter中获取的 Pod 偏好,检查每个节点。如果节点所在的区域或可用区不符合 Pod 的强偏好,则直接将该节点标记为不可调度 (UnschedulableAndUnresolvable)。这确保了 Pod 优先在符合其拓扑偏好的节点上调度。PreScore: 在Score阶段之前执行。这里我们演示了如何遍历所有节点并获取它们的信息,为Score阶段的复杂计算做准备。在本例中,我们统计了每个 Zone 中已运行的目标组件 Pod 的数量,以便在Score阶段用于分散策略。Score: 这是核心逻辑所在。它为每个符合Filter条件的节点计算一个分数:- 区域/可用区亲和性:如果节点与 Pod 的偏好区域/可用区匹配,则给予高分。如果节点不匹配(在 Filter 阶段没有被过滤掉,意味着 Pod 没有强偏好,或者我们允许降级),则给予较低的分数或负分。
- 负载均衡:简单地根据节点上已运行 Pod 的数量进行打分,Pod 数量越少分数越高。这模拟了将 Pod 调度到负载较低节点的策略。
- 故障域分散:如果 Pod 指定了
DisperseComponent,则检查该节点所在可用区中已运行的相同组件的 Pod 数量。数量越少,分数越高,鼓励 Pod 分散。 - 最终分数会根据这些策略的权重累加。
NormalizeScore: 在所有Score插件执行完毕后,对所有节点的原始分数进行归一化处理,确保最终分数都在[0, framework.MaxNodeScore]范围内。这对于协调不同Score插件的打分范围非常重要。getState: 辅助函数,用于从CycleState中安全地读取我们插件的状态。
4.3 启动自定义调度器 (cmd/main.go)
接下来,我们需要一个 main 函数来启动我们的自定义调度器。这个 main 函数将使用 k8s.io/kubernetes/cmd/kube-scheduler 包中的 scheduler.NewSchedulerCommand 函数来构建调度器命令,并注册我们的插件。
package main
import (
"math/rand"
"os"
"time"
"github.com/myorg/my-topology-scheduler-plugin/pkg/plugin" // 替换为你的模块路径
"k8s.io/component-base/logs"
"k8s.io/kubernetes/cmd/kube-scheduler/app"
)
func main() {
rand.Seed(time.Now().UnixNano())
logs.InitLogs()
defer logs.FlushLogs()
// 创建调度器命令,并注册我们的自定义插件
command := app.NewSchedulerCommand(
app.With"=plugin", plugin.PluginName, plugin.New),
)
// 如果命令行参数为空,则添加默认配置路径
// 这通常在容器中运行调度器时很有用
if len(os.Args) <= 1 {
os.Args = append(os.Args, "--config=/etc/kubernetes/scheduler-config.yaml")
}
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
代码解释:
app.NewSchedulerCommand是构建kube-scheduler命令的入口。app.WithPlugin允许我们将自定义插件注册到调度器框架中。我们传入插件的名称 (plugin.PluginName) 和它的工厂函数 (plugin.New)。
4.4 调度器配置 (scheduler-config.yaml)
将这个配置文件保存为 scheduler-config.yaml。
# scheduler-config.yaml
apiVersion: kubescheduler.config.k8s.io/v1beta3 # 根据你的K8s版本选择 v1beta2 或 v1beta3
kind: KubeSchedulerConfiguration
clientConnection:
kubeconfig: /etc/kubernetes/scheduler.conf # 调度器连接 Kube API Server 的 kubeconfig
leaderElection:
leaderElect: true # 启用领导者选举,确保只有一个调度器实例活跃
profiles:
- schedulerName: my-topology-scheduler # 这个名称应该与 Pod.spec.schedulerName 匹配
plugins:
# 默认的 QueueSort 插件,按优先级排序 Pod
queueSort:
enabled:
- name: PrioritySort
# PreFilter 阶段,我们的插件在此处读取 Pod 拓扑偏好
preFilter:
enabled:
- name: NodeResourcesFit
- name: NodePorts
- name: PodTopologySpread
- name: InterPodAffinity
- name: TopologyAware # 启用我们的插件
# Filter 阶段,我们的插件在此处过滤不符合拓扑偏好的节点
filter:
enabled:
- name: NodeResourcesFit
- name: NodePorts
- name: NodeName
- name: TaintToleration
- name: InterPodAffinity
- name: NodeAffinity
- name: PodTopologySpread
- name: TopologyAware # 启用我们的插件
# PostFilter 阶段,处理没有可用节点的情况
postFilter:
enabled:
- name: DefaultPreemption
# PreScore 阶段,我们的插件在此处准备打分数据,如统计 Zone 内 Pod 数量
preScore:
enabled:
- name: InterPodAffinity
- name: PodTopologySpread
- name: TopologyAware # 启用我们的插件
# Score 阶段,我们的插件在此处为节点打分
score:
enabled:
- name: NodeResourcesBalancedAllocation
- name: NodeResourcesLeastAllocated
- name: NodeAffinity
weight: 1
- name: InterPodAffinity
weight: 1
- name: PodTopologySpread
weight: 2
- name: TaintToleration
weight: 1
- name: TopologyAware # 启用我们的插件,并赋予较高权重
weight: 3 # 权重越高,该插件的打分对最终结果影响越大
# NormalizeScore 阶段,对分数进行归一化
normalizeScore:
enabled:
- name: TopologyAware # 启用我们的插件
# Reserve 阶段,用于预留资源
reserve:
enabled:
- name: VolumeBinding
# Permit 阶段,用于延迟或拒绝绑定
permit:
enabled: []
# PreBind 阶段,绑定前操作
preBind:
enabled:
- name: VolumeBinding
# Bind 阶段,实际执行绑定
bind:
enabled:
- name: DefaultBinder
# PostBind 阶段,绑定后操作
postBind:
enabled: []
五、编译、部署与测试
5.1 编译自定义调度器
在 my-topology-scheduler-plugin 目录下执行:
go mod init github.com/myorg/my-topology-scheduler-plugin # 替换为你的实际模块路径
go mod tidy
go build -o kube-scheduler-topology cmd/main.go
这将生成一个名为 kube-scheduler-topology 的二进制文件。
5.2 部署自定义调度器
通常,部署自定义调度器有两种方式:
- 替换默认调度器:修改现有
kube-schedulerDeployment 或 Static Pod 的配置,将其镜像替换为包含我们自定义调度器的镜像。这种方式风险较高,不建议在生产环境直接使用。 - 作为辅助调度器运行:这是推荐的方式。我们将自己的调度器作为一个独立的 Deployment 运行,并为它指定一个不同的
schedulerName。Pod 可以通过spec.schedulerName字段选择使用哪个调度器。
步骤(作为辅助调度器):
-
创建 Docker 镜像:
创建一个Dockerfile:# Dockerfile FROM golang:1.21-alpine AS builder WORKDIR /app COPY go.mod go.sum ./ RUN go mod download COPY . . RUN CGO_ENABLED=0 go build -o kube-scheduler-topology cmd/main.go FROM alpine:latest RUN apk --no-cache add ca-certificates WORKDIR /usr/local/bin COPY --from=builder /app/kube-scheduler-topology . ENTRYPOINT ["kube-scheduler-topology"] CMD ["--config=/etc/kubernetes/scheduler-config.yaml", "--v=4"]构建镜像:
docker build -t my-topology-scheduler:latest . docker tag my-topology-scheduler:latest your-registry/my-topology-scheduler:latest docker push your-registry/my-topology-scheduler:latest -
创建 ServiceAccount 和 RoleBinding:
自定义调度器需要与 API Server 交互,因此需要相应的权限。可以参考默认kube-scheduler的ServiceAccount和ClusterRole。# rbac.yaml apiVersion: v1 kind: ServiceAccount metadata: name: my-topology-scheduler-sa namespace: kube-system --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: my-topology-scheduler-binding subjects: - kind: ServiceAccount name: my-topology-scheduler-sa namespace: kube-system roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: system:kube-scheduler # 复用默认调度器的权限kubectl apply -f rbac.yaml -
创建 ConfigMap:
将scheduler-config.yaml内容放入 ConfigMap。kubectl create configmap my-topology-scheduler-config --from-file=scheduler-config.yaml -n kube-system -
创建 Deployment:
# deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: my-topology-scheduler namespace: kube-system labels: app: my-topology-scheduler spec: replicas: 1 selector: matchLabels: app: my-topology-scheduler template: metadata: labels: app: my-topology-scheduler spec: serviceAccountName: my-topology-scheduler-sa containers: - name: my-topology-scheduler image: your-registry/my-topology-scheduler:latest # 替换为你的镜像 imagePullPolicy: Always args: - --config=/etc/kubernetes/scheduler-config.yaml - --kubeconfig=/etc/kubernetes/scheduler.conf - --leader-elect=true - --v=4 # 调整日志级别,方便调试 volumeMounts: - name: scheduler-config mountPath: /etc/kubernetes/scheduler-config.yaml subPath: scheduler-config.yaml - name: kubeconfig mountPath: /etc/kubernetes/scheduler.conf subPath: scheduler.conf volumes: - name: scheduler-config configMap: name: my-topology-scheduler-config - name: kubeconfig # 假设 kubeconfig 已经以 Secret 或 ConfigMap 形式存在 secret: # 或者 configMap secretName: my-scheduler-kubeconfig # 替换为你的 kubeconfig Secret 名称注意:
kubeconfig需要预先创建。最简单的方法是复制默认kube-scheduler的scheduler.conf文件并创建 Secret。kubectl apply -f deployment.yaml
5.3 测试场景
-
准备节点:确保你的集群节点带有正确的拓扑标签。
kubectl label node <node-name-1> topology.kubernetes.io/region=us-east-1 topology.kubernetes.io/zone=us-east-1a kubectl label node <node-name-2> topology.kubernetes.io/region=us-east-1 topology.kubernetes.io/zone=us-east-1b kubectl label node <node-name-3> topology.kubernetes.io/region=us-west-1 topology.kubernetes.io/zone=us-west-1a -
创建 Pod (带强偏好):
# pod-preferred-zone.yaml apiVersion: v1 kind: Pod metadata: name: my-app-zone-a annotations: topology-aware-scheduler.example.com/preferred-zone: us-east-1a spec: schedulerName: my-topology-scheduler # 指定我们的调度器 containers: - name: nginx image: nginx:latest resources: requests: cpu: "100m" memory: "100Mi"预期结果:Pod 会被调度到
us-east-1a的节点。 -
创建 Pod (带分散要求):
# pod-disperse.yaml apiVersion: v1 kind: Pod metadata: name: my-web-1 labels: app.kubernetes.io/component: web spec: schedulerName: my-topology-scheduler containers: - name: nginx image: nginx:latest resources: requests: cpu: "100m" memory: "100Mi" --- apiVersion: v1 kind: Pod metadata: name: my-web-2 labels: app.kubernetes.io/component: web spec: schedulerName: my-topology-scheduler containers: - name: nginx image: nginx:latest resources: requests: cpu: "100m" memory: "100Mi" --- apiVersion: v1 kind: Pod metadata: name: my-web-3 labels: app.kubernetes.io/component: web spec: schedulerName: my-topology-scheduler containers: - name: nginx image: nginx:latest resources: requests: cpu: "100m" memory: "100Mi"预期结果:
my-web-1,my-web-2,my-web-3会尽量分散到不同的可用区,或者优先调度到当前负载较低的可用区。 -
创建 Pod (无特定偏好,观察负载均衡):
# pod-no-preference.yaml apiVersion: v1 kind: Pod metadata: name: my-app-no-pref spec: schedulerName: my-topology-scheduler containers: - name: busybox image: busybox:latest command: ["sh", "-c", "sleep 3600"] resources: requests: cpu: "100m" memory: "100Mi"预期结果:Pod 会被调度到当前负载(Pod 数量)最低的节点。
通过 kubectl get pod -o wide 和查看自定义调度器的日志 (kubectl logs -f deployment/my-topology-scheduler -n kube-system),可以验证调度决策是否符合预期。
六、高级考量与最佳实践
-
性能优化:
- 缓存利用:调度器框架提供了
framework.Handle接口,通过SnapshotSharedLister()可以访问集群资源的共享缓存(Informer)。尽量利用这些缓存而不是直接查询 API Server,以减少延迟和 API Server 负载。 - 计算复杂度:
Filter和Score阶段会被频繁调用。确保你的插件逻辑高效,避免在热路径上执行耗时操作。 CycleState:巧妙使用CycleState在不同扩展点之间传递预计算结果,避免重复工作。PreFilterExtensions:如果你的Filter逻辑可以提前判断某些 Pod 根本不可能调度,可以实现PreFilterExtensions中的AddPod/RemovePod等方法来优化。
- 缓存利用:调度器框架提供了
-
错误处理与日志:
- 在插件代码中,始终进行充分的错误检查。
- 使用
klog/v2进行详细的日志输出,特别是调试信息 (klog.V(level)),以便在生产环境中追踪调度决策。
-
可观测性:
- 考虑将你的插件与 Prometheus 集成,暴露自定义指标(例如,因特定拓扑约束而被拒绝的 Pod 数量、不同区域的调度成功率等),以便更好地监控调度器的行为和集群状态。
-
插件间交互:
- 如果多个插件在同一扩展点被启用,它们的执行顺序可能很重要。在
scheduler-config.yaml中,enabled列表的顺序决定了插件的执行顺序。 Score插件的权重 (weight) 决定了其打分结果对最终总分的影响。- 不同插件可能通过
CycleState共享数据,但要小心命名冲突和数据一致性。
- 如果多个插件在同一扩展点被启用,它们的执行顺序可能很重要。在
-
版本兼容性:
- Kubernetes 调度器框架的 API 可能会在不同版本之间发生变化(例如
v1beta2到v1beta3)。在升级集群时,请务必检查插件代码与新框架版本的兼容性。
- Kubernetes 调度器框架的 API 可能会在不同版本之间发生变化(例如
-
更复杂的拓扑:
- 本示例只考虑了 Region 和 Zone。实际中可能需要支持更多层次的拓扑,如机架(Rack)、宿主机组(Host Group)。这可以通过扩展节点标签和插件逻辑来实现。
- 可以引入外部拓扑服务,通过 API 查询更复杂的拓扑信息,而不是仅依赖节点标签。
七、展望未来,赋能调度
通过本次深入探讨,我们了解了 Kubernetes 调度器插件框架的强大能力。编写自定义 Go 语言插件,可以让我们摆脱默认调度器的限制,实现真正符合业务需求的拓扑感知调度策略。这不仅能优化应用程序的性能和可靠性,还能有效控制云成本,是构建高效、健壮云原生平台不可或缺的一环。
从简单的区域亲和性,到复杂的故障域分散和智能负载均衡,调度器插件为我们打开了一扇通向无限定制可能的大门。鼓励各位在实际项目中大胆探索,利用这一机制解决独特的调度挑战,共同推动 Kubernetes 调度的智能化和自动化水平。