Kubernetes 自定义调度器(Custom Scheduler)开发与部署

好的,各位观众老爷,各位技术大拿,还有屏幕前正在啃指甲盖儿的未来架构师们,大家好!我是你们的老朋友,专门负责把高深莫测的技术概念,用大白话揉碎了喂给你的“技术段子手”——码农张三! 👋

今天咱们要聊的,可不是什么“Hello World”,而是 Kubernetes 里头的“高定西装”——自定义调度器(Custom Scheduler)。 Kubernetes 本身自带的调度器已经很智能了,但就像商场里买的西装,总觉得哪里不太合身。为了让我们的 Pod 穿得更舒服,跑得更欢快,就需要量身定制一套调度方案。

一、 调度器:K8s 的“红娘”

首先,咱们得搞清楚调度器是干嘛的。 想象一下,Kubernetes 是一个大型的婚恋网站,里面住着各种各样的“Pod 小伙子”和“Node 姑娘”。 Pod 小伙子们都想找个条件好、性格合适的 Node 姑娘安家落户,而调度器就是这个“红娘”。

调度器负责把 Pod 分配到最合适的 Node 上运行。 它会考虑 Node 的资源(CPU、内存)、标签、污点等等,然后根据一定的算法,给每个 Pod 找到一个“良配”。

默认的调度器已经很能干了,但它就像一个“大众红娘”,只能照顾到大多数的需求。 如果你的 Pod 有特殊癖好,比如:

  • 死活要和数据库待在一起: 不然就闹脾气,数据访问延迟高得让人抓狂。
  • 对 GPU 情有独钟: 没有 GPU 就跑不动,简直是“显卡依赖症”。
  • 必须住在特定的机房: 不然就水土不服,延迟高到怀疑人生。

这时候,你就需要一个“私人定制”的红娘——自定义调度器!

二、为什么要“私人定制”?

就像选对象一样,适合自己的才是最好的。 自定义调度器可以让你根据自己的业务需求,灵活地控制 Pod 的调度行为。 它可以:

  • 优化资源利用率: 把 Pod 放到最合适的 Node 上,避免资源浪费。
  • 提高应用性能: 让 Pod 离它依赖的服务更近,减少延迟。
  • 实现高级调度策略: 比如亲和性调度、反亲和性调度、拓扑感知调度等等。
  • 满足特殊业务需求: 比如机器学习任务需要 GPU,大数据任务需要大内存等等。

总之,自定义调度器就像一把瑞士军刀,可以帮你解决各种各样的调度难题。

三、 “私人定制”的步骤

那么,如何才能打造一个属于自己的“红娘”呢? 别怕,其实也没那么难。 大致可以分为以下几个步骤:

  1. 明确需求: 首先,你要搞清楚你的 Pod 到底有什么特殊癖好。 比如,你希望把所有属于同一个服务的 Pod 都放到同一个可用区内,以提高可用性。
  2. 选择方案: 目前有两种主流的自定义调度器方案:

    • 独立调度器 (Standalone Scheduler): 完全从零开始,自己实现调度逻辑。 就像自己开一家婚介所,所有流程都自己掌控。 优点是灵活,缺点是工作量大。
    • 调度器扩展 (Scheduler Extender): 在默认调度器的基础上,添加一些自定义的过滤和打分规则。 就像在现有的婚介所里,添加一些 VIP 服务,满足特殊客户的需求。 优点是简单,缺点是灵活性有限。
  3. 编写代码: 根据你选择的方案,编写调度逻辑的代码。 这部分是核心,也是最难的部分。
  4. 打包部署: 将你的代码打包成镜像,然后部署到 Kubernetes 集群中。
  5. 配置 Pod: 在 Pod 的 YAML 文件中,指定使用你的自定义调度器。

下面我们详细讲解两种方案的具体实施过程。

四、方案一:独立调度器 (Standalone Scheduler)

这就像自己开一家婚介所,从选址、装修到招聘红娘,所有的事情都要自己操心。

1. 编写调度逻辑

这部分是核心,也是最难的部分。 你需要自己实现调度算法,从 Kubernetes API Server 获取 Pod 和 Node 的信息,然后根据你的业务需求,给每个 Pod 找到一个合适的 Node。

可以使用 Go 语言,因为 Kubernetes 本身就是用 Go 写的,方便集成。

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    // 1. 创建 Kubernetes 客户端
    config, err := rest.InClusterConfig()
    if err != nil {
        log.Fatalf("Failed to create in-cluster config: %v", err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatalf("Failed to create clientset: %v", err)
    }

    // 2. 监听未调度的 Pod
    watch, err := clientset.CoreV1().Pods("").Watch(context.TODO(), metav1.ListOptions{
        FieldSelector: "spec.nodeName=", // 只监听未调度的 Pod
    })
    if err != nil {
        log.Fatalf("Failed to watch pods: %v", err)
    }

    // 3. 调度循环
    for event := range watch.ResultChan() {
        pod, ok := event.Object.(*corev1.Pod)
        if !ok {
            log.Printf("Unexpected type: %T", event.Object)
            continue
        }

        if event.Type == "ADDED" {
            log.Printf("Found unscheduled pod: %s/%s", pod.Namespace, pod.Name)
            // 4. 调度 Pod
            nodeName, err := schedulePod(clientset, pod)
            if err != nil {
                log.Printf("Failed to schedule pod %s/%s: %v", pod.Namespace, pod.Name, err)
                continue
            }
            log.Printf("Scheduled pod %s/%s to node %s", pod.Namespace, pod.Name, nodeName)
        }
    }
}

// schedulePod 调度 Pod 到合适的 Node
func schedulePod(clientset *kubernetes.Clientset, pod *corev1.Pod) (string, error) {
    // 1. 获取所有可用的 Node
    nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        return "", fmt.Errorf("failed to list nodes: %v", err)
    }

    // 2. 简单的调度算法:随机选择一个 Node
    if len(nodes.Items) == 0 {
        return "", fmt.Errorf("no available nodes")
    }
    node := nodes.Items[0] // 简单起见,选择第一个 Node

    // 3. 将 Pod 绑定到 Node
    err = bindPodToNode(clientset, pod, node.Name)
    if err != nil {
        return "", fmt.Errorf("failed to bind pod to node %s: %v", node.Name, err)
    }

    return node.Name, nil
}

// bindPodToNode 将 Pod 绑定到 Node
func bindPodToNode(clientset *kubernetes.Clientset, pod *corev1.Pod, nodeName string) error {
    binding := &corev1.Binding{
        TypeMeta: metav1.TypeMeta{
            APIVersion: "v1",
            Kind:       "Binding",
        },
        ObjectMeta: metav1.ObjectMeta{
            Name:      pod.Name,
            Namespace: pod.Namespace,
        },
        Target: corev1.ObjectReference{
            Kind: "Node",
            Name: nodeName,
        },
    }

    err := clientset.CoreV1().Pods(pod.Namespace).Bind(context.TODO(), binding, metav1.CreateOptions{})
    return err
}

代码解释:

  • 创建 Kubernetes 客户端: 使用 client-go 库,连接到 Kubernetes API Server。
  • 监听未调度的 Pod: 使用 Watch API,监听 spec.nodeName 为空的 Pod,也就是未调度的 Pod。
  • 调度循环: 不断地从 Watch API 获取事件,如果发现有新的 Pod,就调用 schedulePod 函数进行调度。
  • 调度算法: schedulePod 函数实现了简单的调度算法,这里只是随机选择一个 Node。 你可以根据自己的需求,实现更复杂的算法。 比如,可以考虑 Node 的资源利用率、标签、污点等等。
  • 绑定 Pod 到 Node: 使用 Bind API,将 Pod 绑定到选定的 Node 上。

2. 打包部署

将你的代码打包成 Docker 镜像,然后部署到 Kubernetes 集群中。

FROM golang:1.20-alpine AS builder

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .

RUN go build -o scheduler .

FROM alpine:latest

WORKDIR /app

COPY --from=builder /app/scheduler .

ENTRYPOINT ["./scheduler"]

3. 配置 Pod

在 Pod 的 YAML 文件中,指定使用你的自定义调度器。

apiVersion: v1
kind: Pod
metadata:
  name: my-pod
spec:
  schedulerName: my-scheduler # 指定使用自定义调度器
  containers:
  - name: my-container
    image: nginx:latest

五、方案二:调度器扩展 (Scheduler Extender)

这就像在现有的婚介所里,添加一些 VIP 服务,满足特殊客户的需求。 你不需要从零开始实现调度逻辑,只需要添加一些自定义的过滤和打分规则。

1. 编写扩展逻辑

你需要实现一些 HTTP Handler,用于接收默认调度器的请求,然后根据你的业务需求,进行过滤和打分。

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"

    corev1 "k8s.io/api/core/v1"
)

// ExtenderArgs 定义了扩展器的请求参数
type ExtenderArgs struct {
    Pod           corev1.Pod            `json:"pod"`
    Nodes         []corev1.Node         `json:"nodes"`
    NodeNames     *[]string             `json:"nodeNames,omitempty"`
    Preemptible   bool                  `json:"preemptible"`
    SuggestedHost string                `json:"suggestedHost"`
    Hints         map[string]string     `json:"hints"`
}

// ExtenderFilterResult 定义了过滤结果
type ExtenderFilterResult struct {
    Nodes     *[]corev1.Node `json:"nodes,omitempty"`
    NodeNames *[]string      `json:"nodeNames,omitempty"`
    FailedNodes map[string]string `json:"failedNodes,omitempty"`
    Error     string       `json:"error,omitempty"`
}

// ExtenderPrioritizeResult 定义了打分结果
type ExtenderPrioritizeResult struct {
    HostPriorityList []HostPriority `json:"HostPriorityList,omitempty"`
    Error     string       `json:"error,omitempty"`
}

// HostPriority 定义了 Node 的优先级
type HostPriority struct {
    Host  string `json:"Host"`
    Score int    `json:"Score"`
}

func main() {
    http.HandleFunc("/filter", filterHandler)
    http.HandleFunc("/prioritize", prioritizeHandler)

    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
    }

    log.Printf("Scheduler extender listening on port %s", port)
    log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
}

// filterHandler 处理过滤请求
func filterHandler(w http.ResponseWriter, r *http.Request) {
    var args ExtenderArgs
    if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // 1. 过滤 Node
    filteredNodes := filterNodes(args.Nodes, &args.Pod)

    // 2. 构造过滤结果
    result := ExtenderFilterResult{
        Nodes: &filteredNodes,
    }

    // 3. 返回结果
    w.Header().Set("Content-Type", "application/json")
    if err := json.NewEncoder(w).Encode(result); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
}

// prioritizeHandler 处理打分请求
func prioritizeHandler(w http.ResponseWriter, r *http.Request) {
    var args ExtenderArgs
    if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // 1. 给 Node 打分
    priorityList := prioritizeNodes(args.Nodes, &args.Pod)

    // 2. 构造打分结果
    result := ExtenderPrioritizeResult{
        HostPriorityList: priorityList,
    }

    // 3. 返回结果
    w.Header().Set("Content-Type", "application/json")
    if err := json.NewEncoder(w).Encode(result); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
}

// filterNodes 过滤 Node
func filterNodes(nodes []corev1.Node, pod *corev1.Pod) []corev1.Node {
    filteredNodes := []corev1.Node{}
    for _, node := range nodes {
        // 简单的过滤规则:只选择带有 "disktype=ssd" 标签的 Node
        if node.Labels["disktype"] == "ssd" {
            filteredNodes = append(filteredNodes, node)
        }
    }
    return filteredNodes
}

// prioritizeNodes 给 Node 打分
func prioritizeNodes(nodes []corev1.Node, pod *corev1.Pod) []HostPriority {
    priorityList := []HostPriority{}
    for _, node := range nodes {
        // 简单的打分规则:Node 的 CPU 核心数越多,得分越高
        score := int(node.Status.Capacity.Cpu().Value())
        priorityList = append(priorityList, HostPriority{
            Host:  node.Name,
            Score: score,
        })
    }
    return priorityList
}

代码解释:

  • 定义数据结构: 定义了 ExtenderArgsExtenderFilterResultExtenderPrioritizeResult 等数据结构,用于接收和返回请求参数和结果。
  • 实现 HTTP Handler: 实现了 filterHandlerprioritizeHandler 两个 HTTP Handler,分别用于处理过滤和打分请求。
  • 过滤 Node: filterNodes 函数实现了简单的过滤规则,这里只选择带有 "disktype=ssd" 标签的 Node。
  • 给 Node 打分: prioritizeNodes 函数实现了简单的打分规则,这里 Node 的 CPU 核心数越多,得分越高。

2. 打包部署

将你的代码打包成 Docker 镜像,然后部署到 Kubernetes 集群中。

Dockerfile 与独立调度器类似,只是构建命令稍有不同。

3. 配置调度器

你需要修改 Kubernetes 默认调度器的配置,告诉它使用你的调度器扩展。

apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration
algorithmSource:
  policy:
    configMap:
      name: scheduler-policy
      namespace: kube-system

创建一个 ConfigMap,指定调度策略:

apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-policy
  namespace: kube-system
data:
  policy.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1beta3
    kind: Policy
    extenders:
    - urlPrefix: "http://my-scheduler-extender:8080" # 你的调度器扩展的地址
      filterVerb: "filter"
      prioritizeVerb: "prioritize"
      weight: 1
      enableHTTPS: false
      nodeCacheCapable: false

4. 配置 Pod

在 Pod 的 YAML 文件中,不需要指定 schedulerName 字段。 默认调度器会根据配置,自动调用你的调度器扩展。

六、总结

无论是选择独立调度器还是调度器扩展,都需要根据自己的业务需求进行选择。 独立调度器更灵活,但工作量更大。 调度器扩展更简单,但灵活性有限。

总而言之,自定义调度器就像给你的 Kubernetes 集群穿上了一件量身定制的“战袍”,让你的 Pod 能够更好地适应各种复杂的环境,跑得更快、更稳、更happy! 😁

希望今天的分享对大家有所帮助。 如果你觉得这篇文章还不错,记得点赞、评论、转发三连哦! 咱们下期再见! 🚀

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注