各位来宾,各位技术同仁,下午好!
今天,我们将深入探讨一个引人入胜的话题:Virtual Kubelet。这是一个利用 Go 语言将非 Kubernetes 原生资源,例如无服务器(Serverless)函数,巧妙地映射为标准 Kubernetes 节点的物理实现。它不仅仅是一个技术概念,更是一种将 Kubernetes 的强大控制平面扩展到其传统边界之外的哲学。
在当今的云原生时代,Kubernetes 已成为容器编排的事实标准。它为我们提供了一个统一的平台来部署、管理和扩展容器化应用。然而,世界并非只有容器。我们有大量的计算资源存在于 Kubernetes 之外,例如:
- 无服务器函数 (Serverless Functions):如 AWS Lambda、Azure Functions、Google Cloud Functions。它们是事件驱动的、按需付费的,但其生命周期和管理模型与 K8s Pod 大相径庭。
- 外部容器服务 (External Container Services):如 Azure Container Instances (ACI)、AWS Fargate。它们提供按需的容器运行环境,无需管理底层虚拟机,但通常有自己的 API。
- 物联网 (IoT) 边缘设备:资源受限的设备,需要运行轻量级工作负载,但又希望利用 Kubernetes 的管理能力。
- 特殊硬件加速器:如 GPU、FPGA,这些资源可能不是以传统节点或容器的形式存在。
面对这些多样化的计算资源,我们面临一个核心问题:如何统一管理这些异构资源,以便我们可以使用熟悉的 Kubernetes 工具链和工作流来操作它们?
答案,便是 Virtual Kubelet。
一、Kubernetes 核心概念回顾:Kubelet 的职责
在深入 Virtual Kubelet 之前,让我们快速回顾一下 Kubernetes 的几个核心组件,特别是 Kubelet 的作用。
Kubernetes 集群由控制平面(Control Plane)和数据平面(Data Plane)组成。
控制平面组件:
- API Server:Kubernetes 的前端接口,暴露 RESTful API,是所有组件和用户交互的中心。
- Scheduler (调度器):负责将新创建的 Pod 分配到合适的节点上。它根据资源需求、节点亲和性、污点与容忍度等策略进行决策。
- Controller Manager (控制器管理器):运行各种控制器,如 ReplicaSet 控制器、Deployment 控制器等,确保集群的实际状态与期望状态一致。
- etcd:高可用的键值存储,用于保存集群的所有状态数据。
数据平面组件(节点上运行):
- Kubelet:这是我们今天讨论的重点。Kubelet 是运行在每个节点上的代理,它负责:
- 注册节点:向 API Server 报告节点自身的信息(如 CPU、内存、OS、内核版本等),从而使节点成为集群的一部分。
- 接收 PodSpec:从 API Server 接收调度到该节点的 Pod 定义 (PodSpec)。
- Pod 生命周期管理:根据 PodSpec 在节点上创建、启动、停止、删除容器。这通常通过与容器运行时(如 containerd 或 CRI-O)交互来完成。
- 报告 Pod 状态:持续监控其上运行的 Pod 和容器的健康状况,并将状态更新报告给 API Server。
- 日志和 Exec:提供容器日志查询和在容器内执行命令的能力。
- Kube-proxy:维护节点上的网络规则,实现 Pod 之间的网络通信和外部访问。
- 容器运行时 (Container Runtime):如 Docker、containerd、CRI-O,负责真正运行容器。
Kubelet 的核心作用在于,它是一个忠实的代理,将 Kubernetes 控制平面下达的指令,转化为在物理节点上实际操作容器的具体动作,并反馈容器的实时状态。正是这种“代理”的特性,为 Virtual Kubelet 提供了可乘之机。
二、Virtual Kubelet 概念:弥合鸿沟的桥梁
Virtual Kubelet 的核心思想非常直接:如果一个代理能够模拟 Kubelet 的行为,向 Kubernetes API Server 注册一个节点,并对调度到该“虚拟节点”的 Pod 做出响应,那么 Kubernetes 就能管理任何这种代理能够控制的后端资源。
简单来说,Virtual Kubelet 就是一个“假”Kubelet。它不是运行在真实的物理机或虚拟机上,而是作为 Kubernetes 集群中的一个普通 Pod 运行(或者在集群外作为独立进程运行),并向 API Server 注册一个或多个虚拟节点 (Virtual Node)。当 Kubernetes 调度器将 Pod 调度到这些虚拟节点时,Virtual Kubelet 不会像真正的 Kubelet 那样在本地启动容器,而是将 Pod 的定义“翻译”成后端特定资源提供商(如 Serverless 平台)的 API 调用,去创建或管理对应的外部资源。
其工作原理的高层视图如下:
- 启动与节点注册:Virtual Kubelet 启动,并使用
client-go库向 Kubernetes API Server 注册一个Node对象。这个Node对象是“虚拟”的,它不对应任何物理机器,但会报告一些虚构的资源容量(例如,可以承载的 Pod 数量、内存、CPU 等)。它还会添加一些特定的标签(node.kubernetes.io/exclude-from-external-load-balancers=true)和污点(Taints),以区分自己是虚拟节点,并指导调度器行为。 - Pod 监听:Virtual Kubelet 持续监听 API Server 中调度到其所注册的虚拟节点上的
Pod对象。 - Pod 翻译与执行:一旦发现有 Pod 被调度过来,Virtual Kubelet 会:
- 解析 PodSpec:读取 Pod 的详细定义,包括容器镜像、环境变量、资源限制、端口等。
- 后端 API 转换:将这些 K8s 原生的 Pod 属性,翻译成特定后端服务(例如 Serverless 平台)所能理解的参数和配置。
- 调用后端 API:使用翻译后的参数调用后端服务提供的 API,例如,创建一个 AWS Lambda 函数,一个 Azure Container Instance 组,或触发一个 Serverless 平台上的部署流程。
- 状态同步:Virtual Kubelet 持续监控后端服务的实际状态(例如,Serverless 函数是否已部署完成、是否运行正常、是否失败)。
- K8s Pod 状态更新:根据后端服务的状态,Virtual Kubelet 更新 Kubernetes 中对应 Pod 的
Status字段,确保 Kubernetes 控制平面始终能够反映外部资源的真实情况。
Virtual Kubelet 带来的核心优势:
- 统一控制平面:开发者和运维人员可以使用
kubectl等熟悉的 K8s 工具来管理包括 Serverless 函数在内的所有工作负载,无需学习新的特定于云平台的 CLI 或 API。 - 抽象底层基础设施:Kubernetes PodSpec 成为了一种通用的抽象层,屏蔽了不同后端服务之间的 API 差异。
- 利用 K8s 生态:可以利用 Kubernetes 的 RBAC(角色访问控制)、网络策略、服务发现、配置管理(ConfigMaps/Secrets)、监控(Prometheus)等现有生态系统。
- 实现混合云/多云战略:轻松地将工作负载部署到最适合的后端,无论是集群内容器、外部容器服务还是 Serverless 函数。
三、深入架构:Virtual Kubelet 的内部运作
为了更好地理解 Virtual Kubelet,我们需要剖析其内部架构和关键交互流程。Go 语言在其中扮演了至关重要的角色,因为它拥有强大的并发模型、类型安全以及 Kubernetes 官方提供的 client-go 库。
一个典型的 Virtual Kubelet 实现主要包含以下核心组件:
-
Kubernetes Client (client-go):这是 Virtual Kubelet 与 Kubernetes API Server 交互的基石。它用于:
- Node Client:注册、更新和报告虚拟节点的健康状态。
- Pod Informer/Lister:高效地监听和缓存调度到虚拟节点上的 Pods。
- Pod Client:更新 Pod 的状态(Phase, Conditions, ContainerStatuses 等)。
- Event Broadcaster:发送 Kubernetes 事件。
-
Node Controller:
- 负责虚拟节点的生命周期管理。
- 在启动时向 API Server 注册虚拟节点。
- 定期发送心跳,更新节点的
LastHeartbeatTime。 - 报告节点的容量(Capacity)和可分配资源(Allocatable),这是调度器决定是否将 Pod 调度到该节点的重要依据。
- 设置节点的地址、操作系统、架构等基本信息。
- 应用污点 (Taints) 和标签 (Labels),以实现更精细的调度控制。
-
Pod Controller:
- 核心逻辑所在。它通过
client-go的Informer机制,订阅并监听 API Server 中 Pod 对象的变化。 - 当发现有新的 Pod 被调度到它所管理的虚拟节点时(通过
nodeName匹配),它会将这个 Pod 交给底层的 Provider 进行处理。 - 当 Pod 发生更新或删除时,也同样通知 Provider。
- 负责周期性地从 Provider 获取 Pod 的最新状态,并更新到 Kubernetes API Server。
- 核心逻辑所在。它通过
-
Provider Interface (提供者接口):
- 这是 Virtual Kubelet 的灵魂和可扩展性所在。它定义了一组 Go 接口,封装了与特定后端服务交互的逻辑。
- 任何想要集成非 K8s 资源的 Virtual Kubelet 实现,都必须实现这个接口。
- 这个接口是
github.com/virtual-kubelet/virtual-kubelet/node.PodLifecycleHandler,它定义了以下核心方法:CreatePod(ctx context.Context, pod *corev1.Pod) error:根据 K8s PodSpec 在后端创建资源。UpdatePod(ctx context.Context, pod *corev1.Pod) error:更新后端资源。DeletePod(ctx context.Context, pod *corev1.Pod) error:删除后端资源。GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error):获取 Pod 的完整定义(通常从缓存)。GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error):获取 Pod 的实时状态。GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts *corev1.PodLogOptions) (io.ReadCloser, error):从后端获取容器日志。RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser) error:在容器内执行命令(对于无服务器通常不适用)。ConfigureNode(ctx context.Context, node *corev1.Node):在节点注册时配置节点属性。
交互流程图示 (文本描述):
| 步骤 | 参与者 | 动作 | 描述 |
|---|---|---|---|
| 1 | Admin/Dev | kubectl apply -f my-serverless-pod.yaml |
用户提交一个 Pod 定义,其中包含 nodeSelector 或 tolerations 指向 Virtual Kubelet 节点。 |
| 2 | K8s API Server | 接收 Pod 对象并存储 | API Server 验证并持久化 Pod 对象。 |
| 3 | K8s Scheduler | 调度 Pod 到 Virtual Kubelet 节点 | 调度器根据 Pod 的调度约束(如 nodeSelector: type: virtual-kubelet)和虚拟节点报告的容量,将 Pod 绑定到 Virtual Kubelet 注册的虚拟节点。 |
| 4 | Virtual Kubelet | 通过 Pod Informer 监听 Pod 事件 |
Virtual Kubelet 的 Pod Controller 收到关于新 Pod 的通知。 |
| 5 | Virtual Kubelet | 调用 Provider.CreatePod |
Pod Controller 将 PodSpec 传递给底层实现的 Provider。 |
| 6 | Provider | 将 PodSpec 转换为后端 API 请求 | Provider 解析 PodSpec,提取容器镜像、环境变量、资源限制等信息,并将其映射为特定 Serverless 平台(如 AWS Lambda)的函数创建或更新参数。 |
| 7 | Provider | 调用后端 Serverless API | Provider 使用其内部的 Serverless 客户端(例如 AWS SDK)调用 Serverless 平台的 API,部署或更新函数。 |
| 8 | Serverless Platform | 部署/更新函数 | Serverless 平台执行函数部署操作,返回一个函数 ID 或其他标识符。 |
| 9 | Provider | 存储 Pod UID 与 Serverless Function ID 的映射,返回成功 | Provider 记录 K8s Pod 的唯一标识符(UID)与 Serverless 函数的对应关系,并返回成功。 |
| 10 | Virtual Kubelet | 更新 K8s Pod 状态为 Pending |
Virtual Kubelet 更新 API Server 中 Pod 的状态为 Pending,表示正在创建中。 |
| 11 | Provider | 持续监控 Serverless 函数状态 | Provider 定期查询 Serverless 平台,获取函数的实际运行状态(例如,Creating -> Active -> Failed)。 |
| 12 | Virtual Kubelet | 调用 Provider.GetPodStatus 并更新 K8s Pod 状态 |
Virtual Kubelet 周期性地调用 Provider.GetPodStatus,获取最新状态,并将其翻译为 K8s PodStatus(如 Running、Succeeded、Failed),然后通过 client-go 更新到 API Server。 |
| 13 | User | kubectl get pod my-hello-function / kubectl logs my-hello-function |
用户通过 K8s 工具查看 Pod 状态或日志,这些操作最终都会通过 Virtual Kubelet 代理到后端的 Serverless 平台。 |
四、Go 语言在 Virtual Kubelet 中的实践与关键库
Go 语言是实现 Virtual Kubelet 的理想选择。其原因如下:
- 强大的并发原语:Goroutines 和 Channels 使得处理大量的 Pods 和与多个后端服务进行异步通信变得简单高效。
- 出色的 Kubernetes 客户端库
client-go:Kubernetes 自身也是用 Go 编写的,client-go提供了与 Kubernetes API Server 交互的官方且功能完备的客户端,包括 Informers、Listers、Clientsets 等,极大地简化了开发。 - 高性能和静态编译:Go 编译后的二进制文件是静态链接的,易于部署,且运行时性能优异。
- 强类型和内存安全:减少了运行时错误,提高了代码质量。
核心 Go 库:
-
k8s.io/client-go:这是与 Kubernetes API 交互的核心库。kubernetes.NewForConfig(config): 创建 Clientset,用于执行 CRUD 操作。informers.NewSharedInformerFactory(client, resyncPeriod): 创建 Informer Factory,用于监听资源变化并构建本地缓存。cache.NewSharedIndexInformer(...)/cache.Indexer: 构建 Informer 和 Lister,用于高效地获取和监听特定资源(如 Pods)。
-
k8s.io/api和k8s.io/apimachinery:这些库定义了 Kubernetes 资源的 Go 结构体(如corev1.Pod、corev1.Node)和通用的 API 类型。 -
github.com/virtual-kubelet/virtual-kubelet:这是一个官方(或半官方)的 Virtual Kubelet 框架。它提供了一个通用的 Virtual Kubelet 服务器实现,以及上面提到的node.PodLifecycleHandler接口。开发者只需要实现这个接口,即可构建自己的 Provider。
接下来,我们将通过代码示例来具体展示如何利用 Go 语言构建 Virtual Kubelet 的核心部分。我们将专注于一个 Serverless Provider 的实现。
4.1 main.go – 设置 Virtual Kubelet 运行时
main.go 是 Virtual Kubelet 应用程序的入口点。它负责解析命令行参数、初始化 Kubernetes 客户端、创建 Pod Manager(负责监听 Pods),以及实例化并运行我们的自定义 Provider。
package main
import (
"context"
"flag"
"os"
"time"
"github.com/sirupsen/logrus" // 日志库
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/vkubelet"
// 导入我们自定义的 Serverless Provider
"your-org/your-repo/serverless-provider" // 假设你的 Provider 放在这个路径下
)
var (
nodeName string
kubeConfigPath string
podSyncWorkers int
providerName string
providerConfig string // 提供者特定的配置文件路径
operatingSystem string = "Linux" // 虚拟节点报告的操作系统
)
func main() {
// 定义命令行参数
flag.StringVar(&nodeName, "node-name", "virtual-kubelet-serverless", "虚拟 kubelet 节点的名称")
flag.StringVar(&kubeConfigPath, "kubeconfig", "", "kubeconfig 文件路径 (如果为空,则使用 In-Cluster 配置)")
flag.IntVar(&podSyncWorkers, "pod-sync-workers", 10, "并发同步 Pod 状态的 worker 数量")
flag.StringVar(&providerName, "provider", "serverless", "要使用的提供者名称")
flag.StringVar(&providerConfig, "provider-config", "", "提供者特定配置文件的路径 (例如,AWS 凭证、区域等)")
flag.Parse()
// 初始化日志
ctx := context.Background()
logger := logrus.StandardLogger()
log.L = log.NewLoggerWithField(logger, "node", nodeName) // 为日志添加节点名称字段
ctx = log.WithLogger(ctx, log.L)
log.G(ctx).Infof("Virtual Kubelet 启动,节点名称: %s", nodeName)
// 1. 创建 Kubernetes 客户端
client, err := manager.NewKubernetesClient(kubeConfigPath)
if err != nil {
log.G(ctx).Fatalf("无法创建 Kubernetes 客户端: %v", err)
}
// 2. 创建 Pod Manager (用于监听调度到本虚拟节点的 Pods)
podManager, err := manager.NewPodManager(client, nodeName)
if err != nil {
log.G(ctx).Fatalf("无法创建 Pod Manager: %v", err)
}
// 3. 实例化我们自定义的 Serverless Provider
provider, err := serverless_provider.NewServerlessProvider(ctx, nodeName, operatingSystem, providerConfig)
if err != nil {
log.G(ctx).Fatalf("无法创建 Serverless Provider: %v", err)
}
// 4. 设置 Virtual Kubelet 选项和配置
opts := vkubelet.VirtualKubeletOpts{
Provider: provider,
NodeName: nodeName,
Manager: podManager,
// 其他选项如 taint, operating system, metrics address 等可以在这里设置
}
cfg := vkubelet.Config{
Client: client,
PodSyncWorkers: podSyncWorkers,
InformerResync: 30 * time.Second, // Informer 缓存的同步周期
ListenPort: 10250, // Kubelet 标准的 REST API 端口 (用于健康检查、指标等)
KubeNamespace: os.Getenv("KUBERNETES_NAMESPACE"), // Virtual Kubelet 自身运行的命名空间
PodStatusWorkers: 1, // 用于更新 Pod 状态的 worker 数量
}
// 5. 创建 Virtual Kubelet 服务器实例
vk, err := vkubelet.New(ctx, cfg, opts)
if err != nil {
log.G(ctx).Fatalf("无法创建 Virtual Kubelet 实例: %v", err)
}
// 6. 启动 Virtual Kubelet
log.G(ctx).Infof("Virtual Kubelet 开始运行...")
if err := vk.Run(ctx); err != nil && err != context.Canceled {
log.G(ctx).Fatalf("Virtual Kubelet 运行失败: %v", err)
}
log.G(ctx).Info("Virtual Kubelet 已停止.")
}
4.2 serverless-provider/provider.go – 实现 PodLifecycleHandler 接口
这个文件将包含我们自定义的 Serverless Provider 的实现。它会实现 node.PodLifecycleHandler 接口中的所有方法,将 Kubernetes Pod 的生命周期事件映射到 Serverless 平台的 API 调用。
我们还需要一个模拟的 serverless-client 来与假想的 Serverless 平台交互。
package serverless_provider
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" // 用于 Pod UID
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/node"
// 导入我们模拟的 Serverless 客户端
"your-org/your-repo/serverless-client" // 假设你的模拟客户端在这个路径下
)
const (
providerName = "serverless"
nodeCapacityPods = "100" // 虚拟节点可以承载的最大 Pod 数量 (即 Serverless 函数数量)
nodeCapacityCPU = "20" // 虚拟节点报告的 CPU 容量 (可以是任意值,或根据 API 吞吐量估算)
nodeCapacityMemory = "20Gi" // 虚拟节点报告的内存容量 (可以是任意值)
defaultContainerName = "serverless-container" // 默认的容器名称,因为 Serverless Pod 通常只有一个逻辑容器
)
// ServerlessProvider 实现了 virtual-kubelet/node.PodLifecycleHandler 接口
type ServerlessProvider struct {
nodeName string
operatingSystem string
serverlessClient serverless_client.ServerlessAPIClient // 用于与实际 Serverless 平台交互的客户端
pods sync.Map // 存储 K8s Pod UID 到 Serverless Function ID 的映射
config *ProviderConfig // 提供者配置
}
// ProviderConfig 包含提供者特定的配置
type ProviderConfig struct {
AWSRegion string `json:"awsRegion"`
// 可以添加更多配置字段,例如 Azure 订阅 ID、GCP 项目 ID 等
}
// NewServerlessProvider 创建一个新的 ServerlessProvider 实例
func NewServerlessProvider(ctx context.Context, nodeName, operatingSystem, configPath string) (*ServerlessProvider, error) {
logger := log.G(ctx).WithField("provider", providerName)
cfg := &ProviderConfig{}
if configPath != "" {
data, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("读取提供者配置文件失败: %w", err)
}
if err := json.Unmarshal(data, cfg); err != nil {
return nil, fmt.Errorf("解析提供者配置文件失败: %w", err)
}
} else {
logger.Warn("未提供提供者配置文件,将使用默认/环境变量配置 Serverless 客户端。")
}
// 在这里初始化你的实际 Serverless 客户端。
// 例如,对于 AWS Lambda,这将涉及 aws-sdk-go-v2。
// 为了演示,我们使用一个模拟客户端。
mockClient, err := serverless_client.NewMockServerlessClient(cfg.AWSRegion)
if err != nil {
return nil, fmt.Errorf("创建 Serverless 客户端失败: %w", err)
}
p := &ServerlessProvider{
nodeName: nodeName,
operatingSystem: operatingSystem,
serverlessClient: mockClient, // 使用你的真实客户端
config: cfg,
}
logger.Info("Serverless 提供者已初始化。")
return p, nil
}
// CreatePod 将 K8s Pod 映射到 Serverless 函数部署
func (p *ServerlessProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
logger := log.G(ctx).WithFields(logrus.Fields{
"pod": pod.Name,
"namespace": pod.Namespace,
"uid": pod.UID,
})
logger.Info("调用 CreatePod 方法")
// 将 K8s Pod spec 转换为 Serverless 函数参数。
// 这高度依赖于你的目标 Serverless 平台。
// 为简化起见,假设一个 Pod 对应一个 Serverless 函数,且只有一个容器。
if len(pod.Spec.Containers) == 0 {
return fmt.Errorf("Pod %s/%s 没有定义容器", pod.Namespace, pod.Name)
}
container := pod.Spec.Containers[0]
// 为 Serverless 函数生成一个唯一名称
functionName := fmt.Sprintf("%s-%s-%s", pod.Namespace, pod.Name, pod.UID[:8])
functionConfig := serverless_client.FunctionConfig{ // 模拟配置
Name: functionName,
Runtime: "nodejs16.x", // 可以从镜像名、注解或默认值推断
Handler: "index.handler", // 可以从 Command/Args 或注解推断
MemoryMB: 128, // 默认内存,或从 resources.limits.memory 获取
Environment: make(map[string]string),
// ... 其他 Serverless 平台特定的参数
}
// 将 K8s 容器属性映射到 Serverless 函数属性
if container.Image != "" {
// 在实际场景中,这可能映射到特定的运行时、一个容器镜像 URL (如 Lambda 的容器镜像支持),
// 或者用于从镜像仓库获取源代码。
logger.Debugf("容器镜像: %s", container.Image)
// 例如:如果镜像是 'myregistry.com/myfunc:v1',则可以使用 'myfunc' 作为函数名的一部分。
// 或者如果它是已知的运行时,如 'python:3.9',则映射到 serverless_client.RuntimePython。
}
for _, env := range container.Env {
functionConfig.Environment[env.Name] = env.Value
}
if container.Resources.Limits != nil {
if mem, ok := container.Resources.Limits[corev1.ResourceMemory]; ok {
if memMB := mem.Value() / (1024 * 1024); memMB > 0 {
functionConfig.MemoryMB = int(memMB)
}
}
// 对于 FaaS,CPU 映射通常比较复杂,通常被忽略或用于内部扩展决策。
}
// 将 K8s 标签/注解作为 Serverless 函数的标签或元数据
for k, v := range pod.Labels {
// functionConfig.Tags[k] = v // 假设 Serverless 客户端支持标签
}
for k, v := range pod.Annotations {
// 特殊注解可以驱动特定的 Serverless 功能
if k == "serverless.example.com/handler" {
functionConfig.Handler = v
}
// ...
}
// 调用 Serverless 平台 API 创建/部署函数
serverlessFunctionID, err := p.serverlessClient.CreateFunction(ctx, functionConfig)
if err != nil {
logger.Errorf("为 Pod %s/%s 创建 Serverless 函数失败: %v", pod.Namespace, pod.Name, err)
return fmt.Errorf("创建 Serverless 函数失败: %w", err)
}
p.pods.Store(pod.UID, serverlessFunctionID) // 存储 K8s Pod UID 到 Serverless Function ID 的映射
logger.Infof("成功为 Pod %s/%s 创建 Serverless 函数 %s (ID: %s)", pod.Namespace, pod.Name, functionName, serverlessFunctionID)
// 更新 Pod 状态为 Pending (初始状态)
pod.Status.Phase = corev1.PodPending
pod.Status.Conditions = []corev1.PodCondition{
{
Type: corev1.PodInitialized,
Status: corev1.ConditionFalse, // 初始时可能未完全初始化
Reason: "CreatingFunction",
},
{
Type: corev1.PodScheduled,
Status: corev1.ConditionTrue,
},
}
return nil
}
// UpdatePod 处理对现有 Pod 的更新 (例如,环境变量更改)
func (p *ServerlessProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
logger := log.G(ctx).WithFields(logrus.Fields{
"pod": pod.Name,
"namespace": pod.Namespace,
"uid": pod.UID,
})
logger.Info("调用 UpdatePod 方法")
val, ok := p.pods.Load(pod.UID)
if !ok {
return fmt.Errorf("在提供者状态中未找到 Pod %s/%s", pod.Namespace, pod.Name)
}
serverlessFunctionID := val.(string)
// 在这里重新翻译 Pod spec 到函数配置
// 并调用 p.serverlessClient.UpdateFunction(ctx, serverlessFunctionID, newFunctionConfig)
// 这可能涉及重新部署 Serverless 函数
logger.Infof("正在更新 Pod %s/%s 对应的 Serverless 函数 %s", pod.Namespace, pod.Name, serverlessFunctionID)
// 实际更新逻辑... (此处省略,与 CreatePod 类似,但调用 Update API)
return nil
}
// DeletePod 移除与 K8s Pod 对应的 Serverless 函数
func (p *ServerlessProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
logger := log.G(ctx).WithFields(logrus.Fields{
"pod": pod.Name,
"namespace": pod.Namespace,
"uid": pod.UID,
})
logger.Info("调用 DeletePod 方法")
val, ok := p.pods.Load(pod.UID)
if !ok {
logger.Warnf("在提供者状态中未找到 Pod %s/%s,假定已删除。", pod.Namespace, pod.Name)
return nil // 幂等性:如果未找到,则视为已删除
}
serverlessFunctionID := val.(string)
// 调用 Serverless 平台 API 删除函数
err := p.serverlessClient.DeleteFunction(ctx, serverlessFunctionID)
if err != nil {
logger.Errorf("为 Pod %s/%s 删除 Serverless 函数 %s 失败: %v", pod.Namespace, pod.Name, serverlessFunctionID, err)
return fmt.Errorf("删除 Serverless 函数失败: %w", err)
}
p.pods.Delete(pod.UID) // 从映射中移除
logger.Infof("成功为 Pod %s/%s 删除 Serverless 函数 %s", pod.Namespace, pod.Name, serverlessFunctionID)
return nil
}
// GetPod 获取提供者视角下 K8s Pod 的最新状态
func (p *ServerlessProvider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) {
// 在实际场景中,你可能需要查询你的提供者状态或缓存。
// 对于此示例,我们将返回一个模拟 Pod 或依赖 VK 框架的缓存来获取现有 Pod。
// 此方法通常在 VK 启动时或调试时被调用以填充初始 Pod 对象。
// 框架通常会频繁调用 GetPodStatus。
log.G(ctx).WithFields(logrus.Fields{
"pod": name,
"namespace": namespace,
}).Debug("调用 GetPod 方法")
// 这是一个简化。通常,VK 框架会维护 Pod 对象的缓存。
// 如果 Pod 不在缓存中但在底层提供者中存在,则需要重建它。
// 现在,我们假设 VK 框架维护了基础 Pod 对象。
return nil, fmt.Errorf("GetPod 在模拟提供者中未完全实现,请依赖框架缓存获取基础 Pod 对象。使用 GetPodStatus 获取状态更新。")
}
// GetPodStatus 获取 Serverless 函数的状态并将其转换为 K8s PodStatus
func (p *ServerlessProvider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) {
logger := log.G(ctx).WithFields(logrus.Fields{
"pod": name,
"namespace": namespace,
})
logger.Debug("调用 GetPodStatus 方法")
// 需要找到 Pod 的 UID。如果没有完整的 Pod 对象,这很棘手。
// 在真实的 VK 设置中,框架可能会传递 UID 或有办法查找。
// 目前,我们假设 `name` 对此模拟是唯一的。
// 在实际场景中,传递给 CreatePod 的 Pod 对象会带有 UID。
// VK 框架通常会向 GetPodStatus 内部传递 *corev1.Pod 对象,
// 或者期望提供者有效地管理映射。
// 为了示例,我们先模拟一个查找过程。更完善的实现会有更好的内部映射。
var serverlessFunctionID string
var podUID types.UID
found := false
p.pods.Range(func(key, value interface{}) bool {
// 这是一个简化。更好的方法是获取 VK 的 PodManager 中的实际 Pod 对象,
// 然后提取其 UID 或唯一标识符。
// 这里我们假定,如果通过 pod.Namespace 和 pod.Name 能唯一识别,则找到。
// 实际场景中,通常通过 Pod.UID 进行查找更为可靠。
// 由于 GetPodStatus 仅接收 namespace 和 name,我们需要一种从这些信息获取 UID 的方式,
// 或者在 p.pods 中存储 (namespace, name) -> FunctionID 的映射。
// 为简化,我们在这里假设找到一个。
serverlessFunctionID = value.(string)
podUID = key.(types.UID)
found = true
return false // 找到即停止遍历
})
if !found {
return nil, fmt.Errorf("未找到 Pod %s/%s 对应的 Serverless 函数", namespace, name)
}
// 从 Serverless 平台获取状态
status, err := p.serverlessClient.GetFunctionStatus(ctx, serverlessFunctionID)
if err != nil {
logger.Errorf("获取 Serverless 函数 %s 的状态失败: %v", serverlessFunctionID, err)
return nil, fmt.Errorf("获取 Serverless 函数状态失败: %w", err)
}
podStatus := corev1.PodStatus{}
switch status.State {
case serverless_client.FunctionStateCreating:
podStatus.Phase = corev1.PodPending
podStatus.Conditions = []corev1.PodCondition{
{Type: corev1.PodInitialized, Status: corev1.ConditionFalse, Reason: "CreatingFunction", Message: "Serverless function is being created."},
{Type: corev1.PodReady, Status: corev1.ConditionFalse, Reason: "CreatingFunction", Message: "Serverless function is not yet ready."},
}
case serverless_client.FunctionStateActive:
podStatus.Phase = corev1.PodRunning
podStatus.Conditions = []corev1.PodCondition{
{Type: corev1.PodInitialized, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
}
// 对于 Serverless,'Running' Pod 意味着函数已部署并可供调用。
// 它不意味着容器正在主动运行应用程序循环。
podStatus.ContainerStatuses = []corev1.ContainerStatus{
{
Name: defaultContainerName, // 默认容器名称
Ready: true,
State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{StartedAt: metav1.Now()}},
Image: "serverless-runtime-image", // 模拟镜像
ImageID: "serverless-image-id",
ContainerID: fmt.Sprintf("serverless://%s", serverlessFunctionID),
RestartCount: 0,
},
}
case serverless_client.FunctionStateFailed:
podStatus.Phase = corev1.PodFailed
podStatus.Reason = status.Reason // 例如 "DeploymentFailed"
podStatus.Message = status.Message
podStatus.Conditions = []corev1.PodCondition{
{Type: corev1.PodInitialized, Status: corev1.