Hello everyone, and welcome! I'm today's speaker, known around these parts as the "code mover." Today we're going to talk about how to elegantly manage the lifecycle of MySQL clusters in the Kubernetes world using the Operator pattern. Ready? Buckle up, here we go!
I. What is an Operator, and why use one to manage MySQL?
First, let's break down what an Operator actually is. Simply put, an Operator is a "senior administrator" for Kubernetes: it understands the operational logic of a specific application (such as MySQL) and, based on the configuration you declare, automatically handles deployment, upgrades, backups, recovery, and so on.
Imagine life without an Operator: you would have to create the Deployment, Service, and PersistentVolumeClaim by hand, and also worry about data backups and primary/replica failover; one slip and things get messy fast. With an Operator, you just tell it "I want a MySQL cluster with 3 nodes" and it takes care of everything for you. Pretty sweet, right?
So why manage MySQL with an Operator? A few reasons:
- Automated operations: say goodbye to manual toil; the Operator handles deployment, upgrades, backups, recovery, and other tedious tasks.
- Declarative configuration: you declare the desired state of your MySQL cluster in a YAML file, and the Operator makes it real.
- High availability: the Operator monitors the health of the MySQL cluster and automatically performs primary/replica failover or node repair when something breaks.
- Scalability: scaling the MySQL cluster is as simple as editing the YAML file; the Operator adds or removes nodes automatically.
- Consistency: every MySQL cluster runs with the same configuration and policies, which prevents configuration drift.
II. The core components of an Operator: CRD, Controller, Custom Resource
The core of an Operator consists of three parts that work together to manage the application:
- Custom Resource Definition (CRD): tells Kubernetes "I am defining a new resource type called MySQLCluster."
- Custom Resource (CR): an instance created from the CRD, for example "a MySQL cluster with 3 nodes."
- Controller: watches CRs for changes and, based on each CR's configuration, performs the corresponding actions, such as creating Deployments and Services.
Here is a simple table to sum it up:
Component | Role | Example |
---|---|---|
Custom Resource Definition (CRD) | Defines a new Kubernetes resource type and describes its schema and behavior. | The mysqlclusters.example.com CRD shown in step 1 of section III |
Custom Resource (CR) | A resource instance created from a CRD, carrying the concrete configuration. | The my-mysql-cluster object shown in step 2 of section III (size: 3, image: mysql:8.0) |
Controller | Watches CRs for changes and performs the corresponding actions, such as creating Deployments and Services. The Controller itself runs as a Pod in the Kubernetes cluster and uses the Kubernetes API to manage resources. | (Go code, see the example in section III) |
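Before diving into the real code in section III, here is a tiny conceptual sketch of the control loop every Controller runs: observe the desired state, compare it with the actual state, and act on the difference. The Cluster type and the getActualSize/scaleTo helpers are placeholders for illustration only, not part of the real implementation that follows.
package main

import (
    "fmt"
    "time"
)

// Cluster is a stand-in for the desired state declared in a MySQLCluster CR.
type Cluster struct {
    Name string
    Size int
}

// getActualSize and scaleTo are placeholders for "read the real state from the
// Kubernetes API" and "create/update Deployments to match the spec".
func getActualSize(name string) int { return 1 }

func scaleTo(name string, size int) error {
    fmt.Printf("scaling %s to %d\n", name, size)
    return nil
}

// reconcile compares desired state with actual state and acts on the difference.
// This observe -> diff -> act loop is the essence of every Operator.
func reconcile(desired Cluster) error {
    actual := getActualSize(desired.Name)
    if actual != desired.Size {
        return scaleTo(desired.Name, desired.Size)
    }
    return nil
}

func main() {
    desired := Cluster{Name: "my-mysql-cluster", Size: 3}
    for i := 0; i < 3; i++ { // a real controller loops forever, driven by watch events
        if err := reconcile(desired); err != nil {
            fmt.Println("reconcile failed:", err)
        }
        time.Sleep(time.Second)
    }
}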
III. Hand-rolling a simple MySQL Operator (Go example)
Next, let's roll up our sleeves and write a simple MySQL Operator in Go.
1. Define the CRD (mysqlclusters.yaml):
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: mysqlclusters.example.com
spec:
  group: example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                size:
                  type: integer
                  description: Number of MySQL instances in the cluster.
                image:
                  type: string
                  description: MySQL image to use.
            status:
              type: object
              properties:
                availableReplicas:
                  type: integer
      subresources:
        status: {}
  scope: Namespaced
  names:
    plural: mysqlclusters
    singular: mysqlcluster
    kind: MySQLCluster
    shortNames:
      - mc
This CRD defines a resource named MySQLCluster with two spec properties: size (the cluster size) and image (the MySQL image). It also enables the status subresource, which the controller uses later to report availableReplicas.
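Once the CRD is applied, the API server serves mysqlclusters just like any built-in resource, and you can talk to it even before generating a typed client. The following is a minimal sketch using client-go's dynamic client to list MySQLCluster objects; the use of the KUBECONFIG environment variable and the default namespace are assumptions of this example.
package main

import (
    "context"
    "fmt"
    "os"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Load a kubeconfig from the KUBECONFIG environment variable (falls back to
    // in-cluster config when it is empty).
    cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    if err != nil {
        panic(err)
    }
    dyn, err := dynamic.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }
    // The GroupVersionResource matches the CRD: group example.com, version v1,
    // plural resource name mysqlclusters.
    gvr := schema.GroupVersionResource{Group: "example.com", Version: "v1", Resource: "mysqlclusters"}
    list, err := dyn.Resource(gvr).Namespace("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, item := range list.Items {
        fmt.Println(item.GetName())
    }
}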
2. Define the Custom Resource (my-mysql-cluster.yaml):
apiVersion: example.com/v1
kind: MySQLCluster
metadata:
name: my-mysql-cluster
spec:
size: 3
image: mysql:8.0
This CR creates a MySQL cluster named my-mysql-cluster with 3 nodes, using the mysql:8.0 image.
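For completeness, the same CR can also be created from Go with the generated clientset instead of kubectl apply. This is a minimal sketch that assumes the code in steps 4 through 6 below has already been generated and that the generated group method is ExampleV1(), the same accessor the controller uses later.
package main

import (
    "context"
    "os"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog/v2"

    examplecomv1 "example.com/mysql-operator/pkg/apis/example.com/v1"
    exampleclientset "example.com/mysql-operator/pkg/client/clientset/versioned"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    if err != nil {
        klog.Fatal(err)
    }
    client, err := exampleclientset.NewForConfig(cfg)
    if err != nil {
        klog.Fatal(err)
    }
    // The same object as my-mysql-cluster.yaml, expressed with the Go types from step 4.
    cluster := &examplecomv1.MySQLCluster{
        ObjectMeta: metav1.ObjectMeta{Name: "my-mysql-cluster"},
        Spec: examplecomv1.MySQLClusterSpec{
            Size:  3,
            Image: "mysql:8.0",
        },
    }
    created, err := client.ExampleV1().MySQLClusters("default").Create(context.TODO(), cluster, metav1.CreateOptions{})
    if err != nil {
        klog.Fatal(err)
    }
    klog.Infof("created MySQLCluster %s", created.Name)
}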
3. Write the Controller (main.go):
package main
import (
    "context"
    "fmt"
    "os"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"

    examplecomv1 "example.com/mysql-operator/pkg/apis/example.com/v1"
    exampleclientset "example.com/mysql-operator/pkg/client/clientset/versioned"
    examplescheme "example.com/mysql-operator/pkg/client/clientset/versioned/scheme"
    exampleexternalversions "example.com/mysql-operator/pkg/client/informers/externalversions"
    exampleinformers "example.com/mysql-operator/pkg/client/informers/externalversions/example.com/v1"
    examplelisters "example.com/mysql-operator/pkg/client/listers/example.com/v1"
)
const controllerAgentName = "mysql-operator"
const (
    // SuccessSynced is used as part of the Event 'reason' when a MySQLCluster is synced.
    SuccessSynced = "Synced"
    // ErrResourceExists is used as part of the Event 'reason' when a MySQLCluster fails
    // to sync because a Deployment with the same name already exists.
    ErrResourceExists = "ErrResourceExists"
    // MessageResourceExists is the message used for Events when a resource
    // fails to sync because a Deployment already exists.
    MessageResourceExists = "Resource %q already exists and is not managed by MySQLCluster"
    // MessageResourceSynced is the message used for an Event when a MySQLCluster is synced.
    MessageResourceSynced = "MySQLCluster synced successfully"
)
// Controller is the controller implementation for MySQLCluster resources.
type Controller struct {
    // kubeclientset is a standard kubernetes clientset.
    kubeclientset kubernetes.Interface
    // sampleclientset is a clientset for our own API group.
    sampleclientset exampleclientset.Interface
    // mysqlclustersLister lists MySQLCluster resources from the informer's cache.
    mysqlclustersLister examplelisters.MySQLClusterLister
    // mysqlclustersSynced reports whether the MySQLCluster informer cache has synced.
    mysqlclustersSynced cache.InformerSynced
    // workqueue is a rate limited work queue. This is used to queue work to be
    // processed instead of performing it as soon as a change happens. This
    // means we can ensure we only process a fixed amount of resources at a
    // time, and makes it easy to ensure we are never overwhelming the system.
    workqueue workqueue.RateLimitingInterface
}
// NewController returns a new MySQLCluster controller.
func NewController(
    kubeclientset kubernetes.Interface,
    sampleclientset exampleclientset.Interface,
    mysqlclusterInformer exampleinformers.MySQLClusterInformer) *Controller {
    // Add our custom resource types to the default scheme so they can be
    // serialized and logged correctly.
    if err := examplescheme.AddToScheme(scheme.Scheme); err != nil {
        klog.Fatalf("Error adding example scheme: %s", err.Error())
    }
controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
mysqlclustersLister: mysqlclusterInformer.Lister(),
mysqlclustersSynced: mysqlclusterInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MySQLClusters"),
}
klog.Info("Setting up event handlers")
// Set up event handlers for when MySQLCluster resources change
mysqlclusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueMySQLCluster,
UpdateFunc: func(old, new interface{}) {
controller.enqueueMySQLCluster(new)
},
DeleteFunc: func(obj interface{}) {
controller.enqueueMySQLCluster(obj)
},
})
return controller
}
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
defer c.workqueue.ShutDown()
//defer utilruntime.HandleCrash()
// Start the informer factories to begin populating the informer caches
klog.Info("Starting MySQLCluster controller")
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.mysqlclustersSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
klog.Info("Starting workers")
// Launch the requested number of workers to process MySQLCluster resources
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")
return nil
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item to be re-enqueued. For example, we do
// not re-enqueue items that are invalid such as missing the resource
// or too old.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here. We handle the error similarly to how we handle
// errors during the syncHandler.
c.workqueue.Forget(obj)
//utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
klog.Errorf("expected string in workqueue but got %#v", obj)
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// MySQLCluster resource to be synced.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurred, we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
//utilruntime.HandleError(err)
klog.Error(err)
return true
}
return true
}
// enqueueMySQLCluster takes a MySQLCluster resource and converts it into a
// namespace/name string which is then put onto the work queue. This method
// should *not* be passed resources of any type other than MySQLCluster.
func (c *Controller) enqueueMySQLCluster(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
//utilruntime.HandleError(err)
klog.Error(err)
return
}
c.workqueue.Add(key)
}
// syncHandler compares the actual state with the desired state and attempts to
// converge the two. It then updates the Status block of the MySQLCluster
// resource to reflect the current state of the resource.
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
//utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
klog.Error(fmt.Errorf("invalid resource key: %s", key))
return nil
}
    // Get the MySQLCluster resource with this namespace/name
    mysqlcluster, err := c.mysqlclustersLister.MySQLClusters(namespace).Get(name)
    if err != nil {
        // The MySQLCluster resource may no longer exist, in which case we stop processing.
        if apierrors.IsNotFound(err) {
            klog.Infof("mysqlcluster '%s/%s' in work queue no longer exists", namespace, name)
            return nil
        }
        return err
    }
    // The Deployment that backs the cluster is named after the MySQLCluster itself,
    // so (unlike the sample-controller's Foo.Spec.DeploymentName) it can never be empty.
    deploymentName := mysqlcluster.Name
    // Get the Deployment with the same name as the MySQLCluster.
    deployment, err := c.kubeclientset.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
    // If the Deployment doesn't exist yet, create it.
    if apierrors.IsNotFound(err) {
        deployment, err = c.constructDeployment(mysqlcluster)
        if err != nil {
            return err
        }
        deployment, err = c.kubeclientset.AppsV1().Deployments(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
    }
    // If an error occurs during Get/Create, we'll requeue the item so we can
    // attempt processing again later. This could have been caused by a
    // transient network problem, or any other external reason.
    if err != nil {
        return err
    }
    // If the Deployment is not controlled by this MySQLCluster resource, log a
    // warning and return an error so the item is requeued.
    if !metav1.IsControlledBy(deployment, mysqlcluster) {
        msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
        klog.Warning(msg)
        return fmt.Errorf("%s", msg)
    }
    // If the desired size in the MySQLCluster spec differs from the Deployment's
    // replica count, update the Deployment so the actual state converges on the
    // desired state (this is what makes "scale by editing the YAML" work).
    if deployment.Spec.Replicas == nil || *deployment.Spec.Replicas != mysqlcluster.Spec.Size {
        klog.Infof("Scaling deployment %s to %d replicas", deployment.Name, mysqlcluster.Spec.Size)
        desired, derr := c.constructDeployment(mysqlcluster)
        if derr != nil {
            return derr
        }
        deployment, err = c.kubeclientset.AppsV1().Deployments(namespace).Update(context.TODO(), desired, metav1.UpdateOptions{})
        if err != nil {
            return err
        }
    }
    // Finally, update the status block of the MySQLCluster resource to reflect
    // the current state of the world.
    err = c.updateMySQLClusterStatus(mysqlcluster, deployment)
    if err != nil {
        return err
    }
//c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}
func (c *Controller) constructDeployment(mysqlcluster *examplecomv1.MySQLCluster) (*appsv1.Deployment, error) {
labels := map[string]string{
"app": "mysql",
"controller": mysqlcluster.Name,
}
replicas := int32(mysqlcluster.Spec.Size)
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: mysqlcluster.Name,
Namespace: mysqlcluster.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(mysqlcluster, examplecomv1.SchemeGroupVersion.WithKind("MySQLCluster")),
},
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "mysql",
Image: mysqlcluster.Spec.Image,
Ports: []corev1.ContainerPort{
{
ContainerPort: 3306,
},
},
},
},
},
},
},
}
return dep, nil
}
func (c *Controller) updateMySQLClusterStatus(mysqlcluster *examplecomv1.MySQLCluster, deployment *appsv1.Deployment) error {
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a brand new object.
mysqlclusterCopy := mysqlcluster.DeepCopy()
mysqlclusterCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
    // Because the CRD enables the status subresource, UpdateStatus only persists
    // changes to the Status block; any changes to the Spec of the copy are ignored,
    // which is exactly what we want here.
_, err := c.sampleclientset.ExampleV1().MySQLClusters(mysqlcluster.Namespace).UpdateStatus(context.TODO(), mysqlclusterCopy, metav1.UpdateOptions{})
return err
}
func main() {
klog.InitFlags(nil)
    // Prefer KUBECONFIG when set; otherwise fall back to in-cluster config.
    cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    if err != nil {
        klog.Fatalf("Error building kubeconfig: %s", err.Error())
    }
    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }
    exampleClient, err := exampleclientset.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building example clientset: %s", err.Error())
    }
    // Build a shared informer factory for our custom API group and get the
    // MySQLCluster informer from it (method names follow the code-generator
    // output for the example.com group).
    informerFactory := exampleexternalversions.NewSharedInformerFactory(exampleClient, time.Second*30)
    mysqlClusterInformer := informerFactory.Example().V1().MySQLClusters()
    controller := NewController(kubeClient, exampleClient, mysqlClusterInformer)
    stopCh := make(chan struct{})
    defer close(stopCh)
    // Start the informers; they populate the caches the controller reads from.
    informerFactory.Start(stopCh)
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
4. Supporting code (pkg/apis/example.com/v1/types.go):
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MySQLCluster is a specification for a MySQLCluster resource
type MySQLCluster struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MySQLClusterSpec `json:"spec"`
Status MySQLClusterStatus `json:"status"`
}
// MySQLClusterSpec is the spec for a MySQLCluster resource
type MySQLClusterSpec struct {
Size int32 `json:"size"`
Image string `json:"image"`
}
// MySQLClusterStatus is the status for a MySQLCluster resource
type MySQLClusterStatus struct {
AvailableReplicas int32 `json:"availableReplicas"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MySQLClusterList is a list of MySQLCluster resources
type MySQLClusterList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []MySQLCluster `json:"items"`
}
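The +genclient and +k8s:deepcopy-gen tags above are consumed by the generators in step 6. In particular, deepcopy-gen produces a zz_generated.deepcopy.go whose content looks roughly like the sketch below (hand-written here for illustration only; in practice you let the generator emit it). This is what allows updateMySQLClusterStatus to call DeepCopy() instead of mutating the informer's cached object.
package v1

import (
    runtime "k8s.io/apimachinery/pkg/runtime"
)

// DeepCopyInto copies the receiver into out. in must be non-nil.
func (in *MySQLCluster) DeepCopyInto(out *MySQLCluster) {
    *out = *in
    out.TypeMeta = in.TypeMeta
    in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
    out.Spec = in.Spec
    out.Status = in.Status
}

// DeepCopy creates a new MySQLCluster with the same contents as the receiver.
func (in *MySQLCluster) DeepCopy() *MySQLCluster {
    if in == nil {
        return nil
    }
    out := new(MySQLCluster)
    in.DeepCopyInto(out)
    return out
}

// DeepCopyObject lets MySQLCluster satisfy the runtime.Object interface.
func (in *MySQLCluster) DeepCopyObject() runtime.Object {
    if c := in.DeepCopy(); c != nil {
        return c
    }
    return nil
}
MySQLClusterList gets the same three methods; its DeepCopyInto additionally deep-copies the Items slice.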
5. Supporting code (pkg/apis/example.com/v1/register.go):
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const GroupName = "example.com"
const GroupVersion = "v1"
var (
SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion}
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&MySQLCluster{},
&MySQLClusterList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
6. Generated client code (pkg/client/clientset/versioned/clientset.go, pkg/client/clientset/versioned/typed/example.com/v1/example.com_client.go, pkg/client/clientset/versioned/typed/example.com/v1/fake/fake_example.com_client.go, pkg/client/clientset/versioned/typed/example.com/v1/fake/fake_mysqlcluster.go, pkg/client/clientset/versioned/typed/example.com/v1/mysqlcluster.go, pkg/client/informers/externalversions/example.com/v1/mysqlcluster.go, pkg/client/listers/example.com/v1/mysqlcluster.go):
These files are generated with the code-generator tool and are used to work with the custom resource. You can generate them with the following commands:
# Install the code generators
go install k8s.io/code-generator/cmd/deepcopy-gen@latest
go install k8s.io/code-generator/cmd/client-gen@latest
go install k8s.io/code-generator/cmd/informer-gen@latest
go install k8s.io/code-generator/cmd/lister-gen@latest
# Create the package directories
mkdir -p pkg/apis/example.com/v1
mkdir -p pkg/client
# Generate the code (requires code-generator vendored into ./vendor)
./vendor/k8s.io/code-generator/generate-groups.sh all example.com/mysql-operator/pkg/client example.com/mysql-operator/pkg/apis example.com:v1 --go-header-file ./hack/custom-boilerplate.go.txt
7. Build and deploy the Operator:
- Build the image: use a Dockerfile to build an image that contains the Controller.
- Deploy the CRD: run kubectl apply -f mysqlclusters.yaml to install the CRD.
- Deploy the Controller: run the Controller in the Kubernetes cluster, for example as a Deployment.
8. Create a MySQL cluster:
Run kubectl apply -f my-mysql-cluster.yaml to create the MySQL cluster.
Code walkthrough:
- main.go is the Controller's entry point: it initializes the Kubernetes clients, registers the custom types with the scheme, creates the informer and workqueue, and starts the worker goroutines.
- enqueueMySQLCluster adds the key of a MySQLCluster resource to the workqueue, which triggers the Controller's reconcile loop.
- syncHandler is the core of the Controller: based on the state declared in the MySQLCluster resource, it creates or updates the Deployment and then reports status back.
- constructDeployment builds the Deployment object from the MySQLCluster's configuration.
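To watch the reconcile loop do its job end to end, you can bump spec.size from Go and wait for the backing Deployment to follow. This is a rough smoke-test sketch that assumes the cluster my-mysql-cluster exists in the default namespace, the controller from step 3 is running, and the generated clientset from step 6 is available.
package main

import (
    "context"
    "fmt"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"

    exampleclientset "example.com/mysql-operator/pkg/client/clientset/versioned"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    if err != nil {
        panic(err)
    }
    kubeClient := kubernetes.NewForConfigOrDie(cfg)
    exampleClient := exampleclientset.NewForConfigOrDie(cfg)

    // Scale the cluster from 3 to 5 nodes by updating the CR's spec.
    cluster, err := exampleClient.ExampleV1().MySQLClusters("default").Get(context.TODO(), "my-mysql-cluster", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }
    cluster.Spec.Size = 5
    if _, err := exampleClient.ExampleV1().MySQLClusters("default").Update(context.TODO(), cluster, metav1.UpdateOptions{}); err != nil {
        panic(err)
    }

    // Wait until the operator has reconciled the Deployment to 5 replicas.
    err = wait.PollImmediate(2*time.Second, 2*time.Minute, func() (bool, error) {
        dep, err := kubeClient.AppsV1().Deployments("default").Get(context.TODO(), "my-mysql-cluster", metav1.GetOptions{})
        if err != nil {
            return false, nil // keep polling through transient errors
        }
        return dep.Spec.Replicas != nil && *dep.Spec.Replicas == 5, nil
    })
    if err != nil {
        panic(err)
    }
    fmt.Println("operator reconciled the deployment to 5 replicas")
}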
IV. Advanced Operator features: backup, restore, upgrade
The code above is only a bare-bones example. A real MySQL Operator also needs more advanced capabilities, such as:
- Backup: back up MySQL data on a schedule and store the backup files in cloud or local storage.
- Restore: restore MySQL data from a backup file.
- Upgrade: upgrade the MySQL version while making sure data is migrated correctly.
These capabilities can be implemented in several ways:
- With Kubernetes Jobs: create a Kubernetes Job to run backup and restore tasks (see the sketch after this list).
- With a StatefulSet: manage the MySQL cluster with a StatefulSet to get stable node identities and persistent storage.
- With MySQL Shell: drive backup, restore, and upgrade operations through the APIs MySQL Shell provides.
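For the Kubernetes Job approach, here is a minimal sketch of a helper the controller could call to launch a one-off mysqldump backup. It is meant to sit next to the controller code from step 3 and needs one extra import, batchv1 "k8s.io/api/batch/v1". The mysql-credentials Secret, the backup-pvc PersistentVolumeClaim, and using the cluster name as the dump host are illustrative assumptions; a production operator would also set ownerReferences, handle retries, and ship the dump to durable storage.
// createBackupJob creates a Kubernetes Job that dumps the MySQL data with
// mysqldump and writes the dump onto a PVC. Names marked below are assumptions.
func (c *Controller) createBackupJob(ctx context.Context, namespace, clusterName string) error {
    job := &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{
            GenerateName: clusterName + "-backup-",
            Namespace:    namespace,
        },
        Spec: batchv1.JobSpec{
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever,
                    Containers: []corev1.Container{{
                        Name:  "backup",
                        Image: "mysql:8.0",
                        Command: []string{"sh", "-c",
                            "mysqldump -h " + clusterName + " -uroot -p\"$MYSQL_ROOT_PASSWORD\" --all-databases > /backup/dump.sql"},
                        Env: []corev1.EnvVar{{
                            Name: "MYSQL_ROOT_PASSWORD",
                            ValueFrom: &corev1.EnvVarSource{
                                SecretKeyRef: &corev1.SecretKeySelector{
                                    LocalObjectReference: corev1.LocalObjectReference{Name: "mysql-credentials"}, // assumed Secret
                                    Key:                  "root-password",
                                },
                            },
                        }},
                        VolumeMounts: []corev1.VolumeMount{{Name: "backup", MountPath: "/backup"}},
                    }},
                    Volumes: []corev1.Volume{{
                        Name: "backup",
                        VolumeSource: corev1.VolumeSource{
                            PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "backup-pvc"}, // assumed PVC
                        },
                    }},
                },
            },
        },
    }
    _, err := c.kubeclientset.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{})
    return err
}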
V. Summary and outlook
Today we took a quick look at how to use the Operator pattern to manage the lifecycle of MySQL clusters in a Kubernetes environment. It is only the tip of the iceberg, but hopefully it gives you some ideas to run with.
Looking ahead, Operators will keep moving toward more intelligence and more automation, for example:
- Auto-scaling: automatically adjust the number of nodes based on the MySQL cluster's load.
- Self-healing: automatically detect and repair failures in the MySQL cluster.
- Intelligent tuning: automatically adjust configuration parameters based on how the MySQL cluster is actually running.
I hope you will keep exploring and help push Operator technology forward!
A friendly reminder: the code above is a simple example for learning purposes only. In a real production environment you also need to think about security, performance, monitoring, and much more.
That's it for today's talk. Thanks, everyone. Class dismissed!