好的,各位观众老爷,各位技术大拿,还有屏幕前正在啃指甲盖儿的未来架构师们,大家好!我是你们的老朋友,专门负责把高深莫测的技术概念,用大白话揉碎了喂给你的“技术段子手”——码农张三! 👋
今天咱们要聊的,可不是什么“Hello World”,而是 Kubernetes 里头的“高定西装”——自定义调度器(Custom Scheduler)。 Kubernetes 本身自带的调度器已经很智能了,但就像商场里买的西装,总觉得哪里不太合身。为了让我们的 Pod 穿得更舒服,跑得更欢快,就需要量身定制一套调度方案。
一、 调度器:K8s 的“红娘”
首先,咱们得搞清楚调度器是干嘛的。 想象一下,Kubernetes 是一个大型的婚恋网站,里面住着各种各样的“Pod 小伙子”和“Node 姑娘”。 Pod 小伙子们都想找个条件好、性格合适的 Node 姑娘安家落户,而调度器就是这个“红娘”。
调度器负责把 Pod 分配到最合适的 Node 上运行。 它会考虑 Node 的资源(CPU、内存)、标签、污点等等,然后根据一定的算法,给每个 Pod 找到一个“良配”。
默认的调度器已经很能干了,但它就像一个“大众红娘”,只能照顾到大多数的需求。 如果你的 Pod 有特殊癖好,比如:
- 死活要和数据库待在一起: 不然就闹脾气,数据访问延迟高得让人抓狂。
- 对 GPU 情有独钟: 没有 GPU 就跑不动,简直是“显卡依赖症”。
- 必须住在特定的机房: 不然就水土不服,延迟高到怀疑人生。
这时候,你就需要一个“私人定制”的红娘——自定义调度器!
二、为什么要“私人定制”?
就像选对象一样,适合自己的才是最好的。 自定义调度器可以让你根据自己的业务需求,灵活地控制 Pod 的调度行为。 它可以:
- 优化资源利用率: 把 Pod 放到最合适的 Node 上,避免资源浪费。
- 提高应用性能: 让 Pod 离它依赖的服务更近,减少延迟。
- 实现高级调度策略: 比如亲和性调度、反亲和性调度、拓扑感知调度等等。
- 满足特殊业务需求: 比如机器学习任务需要 GPU,大数据任务需要大内存等等。
总之,自定义调度器就像一把瑞士军刀,可以帮你解决各种各样的调度难题。
三、 “私人定制”的步骤
那么,如何才能打造一个属于自己的“红娘”呢? 别怕,其实也没那么难。 大致可以分为以下几个步骤:
- 明确需求: 首先,你要搞清楚你的 Pod 到底有什么特殊癖好。 比如,你希望把所有属于同一个服务的 Pod 都放到同一个可用区内,以提高可用性。
-
选择方案: 目前有两种主流的自定义调度器方案:
- 独立调度器 (Standalone Scheduler): 完全从零开始,自己实现调度逻辑。 就像自己开一家婚介所,所有流程都自己掌控。 优点是灵活,缺点是工作量大。
- 调度器扩展 (Scheduler Extender): 在默认调度器的基础上,添加一些自定义的过滤和打分规则。 就像在现有的婚介所里,添加一些 VIP 服务,满足特殊客户的需求。 优点是简单,缺点是灵活性有限。
- 编写代码: 根据你选择的方案,编写调度逻辑的代码。 这部分是核心,也是最难的部分。
- 打包部署: 将你的代码打包成镜像,然后部署到 Kubernetes 集群中。
- 配置 Pod: 在 Pod 的 YAML 文件中,指定使用你的自定义调度器。
下面我们详细讲解两种方案的具体实施过程。
四、方案一:独立调度器 (Standalone Scheduler)
这就像自己开一家婚介所,从选址、装修到招聘红娘,所有的事情都要自己操心。
1. 编写调度逻辑
这部分是核心,也是最难的部分。 你需要自己实现调度算法,从 Kubernetes API Server 获取 Pod 和 Node 的信息,然后根据你的业务需求,给每个 Pod 找到一个合适的 Node。
可以使用 Go 语言,因为 Kubernetes 本身就是用 Go 写的,方便集成。
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func main() {
// 1. 创建 Kubernetes 客户端
config, err := rest.InClusterConfig()
if err != nil {
log.Fatalf("Failed to create in-cluster config: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Failed to create clientset: %v", err)
}
// 2. 监听未调度的 Pod
watch, err := clientset.CoreV1().Pods("").Watch(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=", // 只监听未调度的 Pod
})
if err != nil {
log.Fatalf("Failed to watch pods: %v", err)
}
// 3. 调度循环
for event := range watch.ResultChan() {
pod, ok := event.Object.(*corev1.Pod)
if !ok {
log.Printf("Unexpected type: %T", event.Object)
continue
}
if event.Type == "ADDED" {
log.Printf("Found unscheduled pod: %s/%s", pod.Namespace, pod.Name)
// 4. 调度 Pod
nodeName, err := schedulePod(clientset, pod)
if err != nil {
log.Printf("Failed to schedule pod %s/%s: %v", pod.Namespace, pod.Name, err)
continue
}
log.Printf("Scheduled pod %s/%s to node %s", pod.Namespace, pod.Name, nodeName)
}
}
}
// schedulePod 调度 Pod 到合适的 Node
func schedulePod(clientset *kubernetes.Clientset, pod *corev1.Pod) (string, error) {
// 1. 获取所有可用的 Node
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return "", fmt.Errorf("failed to list nodes: %v", err)
}
// 2. 简单的调度算法:随机选择一个 Node
if len(nodes.Items) == 0 {
return "", fmt.Errorf("no available nodes")
}
node := nodes.Items[0] // 简单起见,选择第一个 Node
// 3. 将 Pod 绑定到 Node
err = bindPodToNode(clientset, pod, node.Name)
if err != nil {
return "", fmt.Errorf("failed to bind pod to node %s: %v", node.Name, err)
}
return node.Name, nil
}
// bindPodToNode 将 Pod 绑定到 Node
func bindPodToNode(clientset *kubernetes.Clientset, pod *corev1.Pod, nodeName string) error {
binding := &corev1.Binding{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Binding",
},
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
Target: corev1.ObjectReference{
Kind: "Node",
Name: nodeName,
},
}
err := clientset.CoreV1().Pods(pod.Namespace).Bind(context.TODO(), binding, metav1.CreateOptions{})
return err
}
代码解释:
- 创建 Kubernetes 客户端: 使用
client-go
库,连接到 Kubernetes API Server。 - 监听未调度的 Pod: 使用
Watch
API,监听spec.nodeName
为空的 Pod,也就是未调度的 Pod。 - 调度循环: 不断地从
Watch
API 获取事件,如果发现有新的 Pod,就调用schedulePod
函数进行调度。 - 调度算法:
schedulePod
函数实现了简单的调度算法,这里只是随机选择一个 Node。 你可以根据自己的需求,实现更复杂的算法。 比如,可以考虑 Node 的资源利用率、标签、污点等等。 - 绑定 Pod 到 Node: 使用
Bind
API,将 Pod 绑定到选定的 Node 上。
2. 打包部署
将你的代码打包成 Docker 镜像,然后部署到 Kubernetes 集群中。
FROM golang:1.20-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o scheduler .
FROM alpine:latest
WORKDIR /app
COPY --from=builder /app/scheduler .
ENTRYPOINT ["./scheduler"]
3. 配置 Pod
在 Pod 的 YAML 文件中,指定使用你的自定义调度器。
apiVersion: v1
kind: Pod
metadata:
name: my-pod
spec:
schedulerName: my-scheduler # 指定使用自定义调度器
containers:
- name: my-container
image: nginx:latest
五、方案二:调度器扩展 (Scheduler Extender)
这就像在现有的婚介所里,添加一些 VIP 服务,满足特殊客户的需求。 你不需要从零开始实现调度逻辑,只需要添加一些自定义的过滤和打分规则。
1. 编写扩展逻辑
你需要实现一些 HTTP Handler,用于接收默认调度器的请求,然后根据你的业务需求,进行过滤和打分。
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
corev1 "k8s.io/api/core/v1"
)
// ExtenderArgs 定义了扩展器的请求参数
type ExtenderArgs struct {
Pod corev1.Pod `json:"pod"`
Nodes []corev1.Node `json:"nodes"`
NodeNames *[]string `json:"nodeNames,omitempty"`
Preemptible bool `json:"preemptible"`
SuggestedHost string `json:"suggestedHost"`
Hints map[string]string `json:"hints"`
}
// ExtenderFilterResult 定义了过滤结果
type ExtenderFilterResult struct {
Nodes *[]corev1.Node `json:"nodes,omitempty"`
NodeNames *[]string `json:"nodeNames,omitempty"`
FailedNodes map[string]string `json:"failedNodes,omitempty"`
Error string `json:"error,omitempty"`
}
// ExtenderPrioritizeResult 定义了打分结果
type ExtenderPrioritizeResult struct {
HostPriorityList []HostPriority `json:"HostPriorityList,omitempty"`
Error string `json:"error,omitempty"`
}
// HostPriority 定义了 Node 的优先级
type HostPriority struct {
Host string `json:"Host"`
Score int `json:"Score"`
}
func main() {
http.HandleFunc("/filter", filterHandler)
http.HandleFunc("/prioritize", prioritizeHandler)
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
log.Printf("Scheduler extender listening on port %s", port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
}
// filterHandler 处理过滤请求
func filterHandler(w http.ResponseWriter, r *http.Request) {
var args ExtenderArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// 1. 过滤 Node
filteredNodes := filterNodes(args.Nodes, &args.Pod)
// 2. 构造过滤结果
result := ExtenderFilterResult{
Nodes: &filteredNodes,
}
// 3. 返回结果
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(result); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
// prioritizeHandler 处理打分请求
func prioritizeHandler(w http.ResponseWriter, r *http.Request) {
var args ExtenderArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// 1. 给 Node 打分
priorityList := prioritizeNodes(args.Nodes, &args.Pod)
// 2. 构造打分结果
result := ExtenderPrioritizeResult{
HostPriorityList: priorityList,
}
// 3. 返回结果
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(result); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
// filterNodes 过滤 Node
func filterNodes(nodes []corev1.Node, pod *corev1.Pod) []corev1.Node {
filteredNodes := []corev1.Node{}
for _, node := range nodes {
// 简单的过滤规则:只选择带有 "disktype=ssd" 标签的 Node
if node.Labels["disktype"] == "ssd" {
filteredNodes = append(filteredNodes, node)
}
}
return filteredNodes
}
// prioritizeNodes 给 Node 打分
func prioritizeNodes(nodes []corev1.Node, pod *corev1.Pod) []HostPriority {
priorityList := []HostPriority{}
for _, node := range nodes {
// 简单的打分规则:Node 的 CPU 核心数越多,得分越高
score := int(node.Status.Capacity.Cpu().Value())
priorityList = append(priorityList, HostPriority{
Host: node.Name,
Score: score,
})
}
return priorityList
}
代码解释:
- 定义数据结构: 定义了
ExtenderArgs
、ExtenderFilterResult
和ExtenderPrioritizeResult
等数据结构,用于接收和返回请求参数和结果。 - 实现 HTTP Handler: 实现了
filterHandler
和prioritizeHandler
两个 HTTP Handler,分别用于处理过滤和打分请求。 - 过滤 Node:
filterNodes
函数实现了简单的过滤规则,这里只选择带有"disktype=ssd"
标签的 Node。 - 给 Node 打分:
prioritizeNodes
函数实现了简单的打分规则,这里 Node 的 CPU 核心数越多,得分越高。
2. 打包部署
将你的代码打包成 Docker 镜像,然后部署到 Kubernetes 集群中。
Dockerfile 与独立调度器类似,只是构建命令稍有不同。
3. 配置调度器
你需要修改 Kubernetes 默认调度器的配置,告诉它使用你的调度器扩展。
apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration
algorithmSource:
policy:
configMap:
name: scheduler-policy
namespace: kube-system
创建一个 ConfigMap,指定调度策略:
apiVersion: v1
kind: ConfigMap
metadata:
name: scheduler-policy
namespace: kube-system
data:
policy.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: Policy
extenders:
- urlPrefix: "http://my-scheduler-extender:8080" # 你的调度器扩展的地址
filterVerb: "filter"
prioritizeVerb: "prioritize"
weight: 1
enableHTTPS: false
nodeCacheCapable: false
4. 配置 Pod
在 Pod 的 YAML 文件中,不需要指定 schedulerName
字段。 默认调度器会根据配置,自动调用你的调度器扩展。
六、总结
无论是选择独立调度器还是调度器扩展,都需要根据自己的业务需求进行选择。 独立调度器更灵活,但工作量更大。 调度器扩展更简单,但灵活性有限。
总而言之,自定义调度器就像给你的 Kubernetes 集群穿上了一件量身定制的“战袍”,让你的 Pod 能够更好地适应各种复杂的环境,跑得更快、更稳、更happy! 😁
希望今天的分享对大家有所帮助。 如果你觉得这篇文章还不错,记得点赞、评论、转发三连哦! 咱们下期再见! 🚀