从生成CRD到编写自定义控制器教程示例

2022-11-13 08:11:08 示例 自定义 控制器

介绍

我们可以使用code-generator 以及controller-tools来进行代码自动生成,通过代码自动生成可以帮我们自动生成 CRD 资源对象,以及客户端访问的 ClientSet、InfORMer、Lister 等工具包,接下来我们就来了解下如何编写一个自定义的控制器。

CRD定义

首先初始化项目

$ mkdir operator-crd && cd operator-crd
$ Go mod init operator-crd
$ mkdir -p pkg/apis/example.com/v1

在该文件夹下新建doc.go文件,内容如下所示:

// +k8s:deepcopy-gen=package
// +groupName=example.com
package v1

根据 CRD 的规范定义,这里我们定义的 group 为example.com,版本为v1,在顶部添加了一个代码自动生成的deepcopy-gen的 tag,为整个包中的类型生成深拷贝方法。

然后就是非常重要的资源对象的结构体定义,新建types.go文件,types.go内容可以使用type-scaffpld自动生成,具体文件内容如下:

package v1
import metav1 "k8s.io/apiMachinery/pkg/apis/meta/v1"
// BarSpec defines the desired state of Bar
type BarSpec struct {
    // INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
    DeploymentName string `JSON:"deploymentName"`
    Image          string `json:"image"`
    Replicas       *int32 `json:"replicas"`
}
// BarStatus defines the observed state of Bar.
// It should always be reconstructable from the state of the cluster and/or outside world.
type BarStatus struct {
    // INSERT ADDITIONAL STATUS FIELDS -- observed state of cluster
}
// 下面这个一定不能少,少了的话不能生成 lister 和 informer
// +genclient  
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Bar is the Schema for the bars API
// +k8s:openapi-gen=true
type Bar struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    Spec   BarSpec   `json:"spec,omitempty"`
    Status BarStatus `json:"status,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// BarList contains a list of Bar
type BarList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []Bar `json:"items"`
}

然后可以参考系统内置的资源对象,还需要提供 AddToScheme 与 Resource 两个变量供 client 注册,新建 reGISter.go 文件,内容如下所示:

package v1
import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
)
// SchemeGroupVersion 注册自己的自定义资源
var SchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
    return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
    return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
    // SchemeBuilder initializes a scheme builder
    SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    // AddToScheme is a global function that registers this API group & version to a scheme
    AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
    // 添加 Bar 与 BarList这两个资源到 scheme
    scheme.AddKnownTypes(SchemeGroupVersion,
        &Bar{},
        &BarList{},
    )
    metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
    return nil
}

使用controller-gen生成crd:

$ controller-gen crd paths=./... output:crd:dir=crd

生成example.com_bars.yaml文件如下所示:

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: (devel)
  creationTimestamp: null
  name: bars.example.com
spec:
  group: example.com
  names:
    kind: Bar
    listKind: BarList
    plural: bars
    singular: bar
  scope: Namespaced
  versions:
  - name: v1
    schema:
      openAPIV3Schema:
        description: Bar is the Schema for the bars API
        properties:
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation
              of an object. Servers should convert recognized schemas to the latest
              internal value, and may reject unrecognized values. More info: https://git.k8s.io/commUnity/contributors/devel/sig-architecture/api-conventions.md#resources'
            type: string
          kind:
            description: 'Kind is a string value representing the REST resource this
              object represents. Servers may infer this from the endpoint the client
              submits requests to. Cannot be updated. In CamelCase. More info: Https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
            type: string
          metadata:
            type: object
          spec:
            description: BarSpec defines the desired state of Bar
            properties:
              deploymentName:
                description: INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
                type: string
              image:
                type: string
              replicas:
                format: int32
                type: integer
            required:
            - deploymentName
            - image
            - replicas
            type: object
          status:
            description: BarStatus defines the observed state of Bar. It should always
              be reconstructable from the state of the cluster and/or outside world.
            type: object
        type: object
    served: true
    storage: true

最终项目结构如下所示:

$ tree
.
├── crd
│   └── example.com_bars.yaml
├── go.mod
├── go.sum
└── pkg
    └── apis
        └── example.com
            └── v1
                ├── doc.go
                ├── register.go
                └── types.go
5 directories, 6 files

生成客户端相关代码

上面我们准备好资源的 API 资源类型后,就可以使用开始生成 CRD 资源的客户端使用的相关代码了。

首先创建生成代码的脚本,下面这些脚本均来源于sample-controller提供的示例:

$ mkdir hack && cd hack

在该目录下面新建 tools.go 文件,添加 code-generator 依赖,因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包。文件内容如下所示:

// +build tools
// 建立 tools.go 来依赖 code-generator
// 因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包.
package tools
import _ "k8s.io/code-generator"

然后新建 update-codegen.sh 脚本,用来配置代码生成的脚本:

#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
  operator-crd/pkg/client operator-crd/pkg/apis example.com:v1 \
  --output-base "${SCRIPT_ROOT}"/../ \
  --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
# To use your own boilerplate text append:
#   --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt

同样还有 verify-codegen.sh 脚本,用来校验生成的代码是否是最新的:

#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
DIFFROOT="${SCRIPT_ROOT}/pkg"
TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg"
_tmp="${SCRIPT_ROOT}/_tmp"
cleanup() {
  rm -rf "${_tmp}"
}
trap "cleanup" EXIT SIGINT
cleanup
mkdir -p "${TMP_DIFFROOT}"
cp -a "${DIFFROOT}"

接下来我们就可以来执行代码生成的脚本了,首先将依赖包放置到 vendor 目录中去:

$ go mod vendor

然后执行脚本生成代码:

$ chmod +x ./hack/update-codegen.sh
$./hack/update-codegen.sh 
Generating deepcopy funcs
Generating clientset for example.com:v1 at operator-crd/pkg/client/clientset
Generating listers for example.com:v1 at operator-crd/pkg/client/listers
Generating informers for example.com:v1 at operator-crd/pkg/client/informers

代码生成后,整个项目的 pkg 包变成了下面的样子:

$  tree pkg                
pkg
├── apis
│   └── example.com
│       └── v1
│           ├── doc.go
│           ├── register.go
│           ├── types.go
│           └── zz_generated.deepcopy.go
└── client
    ├── clientset
    │   └── versioned
    │       ├── clientset.go
    │       ├── doc.go
    │       ├── fake
    │       │   ├── clientset_generated.go
    │       │   ├── doc.go
    │       │   └── register.go
    │       ├── scheme
    │       │   ├── doc.go
    │       │   └── register.go
    │       └── typed
    │           └── example.com
    │               └── v1
    │                   ├── bar.go
    │                   ├── doc.go
    │                   ├── example.com_client.go
    │                   ├── fake
    │                   │   ├── doc.go
    │                   │   ├── fake_bar.go
    │                   │   └── fake_example.com_client.go
    │                   └── generated_expansion.go
    ├── informers
    │   └── externalversions
    │       ├── example.com
    │       │   ├── interface.go
    │       │   └── v1
    │       │       ├── bar.go
    │       │       └── interface.go
    │       ├── factory.go
    │       ├── generic.go
    │       └── internalinterfaces
    │           └── factory_interfaces.go
    └── listers
        └── example.com
            └── v1
                ├── bar.go
                └── expansion_generated.go
20 directories, 26 files

仔细观察可以发现pkg/apis/example.com/v1目录下面多了一个zz_generated.deepcopy.go文件,在pkg/client文件夹下生成了 clientset和 informers 和 listers 三个目录,有了这几个自动生成的客户端相关操作包,我们就可以去访问 CRD 资源了,可以和使用内置的资源对象一样去对 Bar 进行 List 和 Watch 操作了。

编写控制器

首先要先获取访问资源对象的 ClientSet,在项目根目录下面新建 main.go 文件。

package main
import (
    "k8s.io/client-go/tools/clientcmd"
   	"k8s.io/klog/v2"
    clientset "operator-crd/pkg/client/clientset/versioned"
    "operator-crd/pkg/client/informers/externalversions"
    "time"
    "os"
    "os/signal"
    "syscall"
)
var (
    onlyOneSignalHandler = make(chan struct{})
    shutdownSignals      = []os.Signal{os.Interrupt, syscall.SIGTERM}
)
// 注册 SIGTERM 和 SIGINT 信号
// 返回一个 stop channel, 该通道在捕获到第一个信号时被关闭
// 如果捕获到第二个信号,程序直接退出
func setupSignalHandler() (stopCh <-chan struct{}) {
    // 当调用两次的时候 panics
    close(onlyOneSignalHandler)
    stop := make(chan struct{})
    c := make(chan os.Signal, 2)
    // Notify 函数让 signal 包将输入信号转发到c
    // 如果没有列出要传递的信号,会将所有输入信号传递到 c; 否则只会传递列出的输入信号
    signal.Notify(c, shutdownSignals...)
    go func() {
        <-c
        close(stop)
        <-c
        os.Exit(1) // 第二个信号直接退出
    }()
    return stop
}
func main() {
    stopCh := setupSignalHandler()
    // 获取config
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        klog.Fatalln(err)
    }
    // 通过config构建clientSet
    // 这里的clientSet 是 Bar 的
    clientSet, err := clientset.NewForConfig(config)
    if err != nil {
        klog.Fatalln(err)
    }
    // informerFactory 工厂类, 这里注入我们通过代码生成的 client
    // client 主要用于和 API Server 进行通信,实现 ListAndWatch
    factory := externalversions.NewSharedInformerFactory(clientSet, time.Second*30)
    // 实例化自定义控制器
    controller := NewController(factory.Example().V1().Bars())
    // 启动 informer,开始list 和 watch
    go factory.Start(stopCh)
    // 启动控制器
    if err = controller.Run(2, stopCh); err != nil {
        klog.Fatalf("Error running controller: %s", err.Error())
    }
}

首先初始化一个用于访问 Bar 资源的 ClientSet 对象,然后同样新建一个 Bar 的 InformerFactory 实例,通过这个工厂实例可以去启动 Informer 开始对 Bar 的 List 和 Watch 操作,然后同样我们要自己去封装一个自定义的控制器,在这个控制器里面去实现一个控制循环,不断对 Bar 的状态进行调谐。

在项目根目录下新建controller.go文件,内容如下所示:

package main
import (
    "fmt"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"
    v1 "operator-crd/pkg/apis/example.com/v1"
    "time"
    informers "operator-crd/pkg/client/informers/externalversions/example.com/v1"
)
type Controller struct {
    informer  informers.BarInformer
    workqueue workqueue.RateLimitingInterface
}
func NewController(informer informers.BarInformer) *Controller {
    controller := &Controller{
        informer: informer,
        // WorkQueue 的实现,负责同步 Informer 和控制循环之间的数据
        workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bar"),
    }
    klog.Info("Setting up Bar event handlers")
    // informer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc)
    // 分别对应 API 对象的“添加”“更新”和“删除”事件。
    // 而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中
    informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    controller.addBar,
        UpdateFunc: controller.updateBar,
        DeleteFunc: controller.deleteBar,
    })
    return controller
}
func (c *Controller) Run(thread int, stopCh <-chan struct{}) error {
    defer runtime.HandleCrash()
    defer c.workqueue.ShuttingDown()
    // 记录开始日志
    klog.Info("Starting Bar control loop")
    klog.Info("Waiting for informer caches to sync")
    // 等待缓存同步数据
    if ok := cache.WaitForCacheSync(stopCh, c.informer.Informer().HasSynced); !ok {
        return fmt.Errorf("failed to wati for caches to sync")
    }
    klog.Info("Starting workers")
    for i := 0; i < thread; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
    klog.Info("Started workers")
    <-stopCh
    klog.Info("Shutting down workers")
    return nil
}
// runWorker 是一个不断运行的方法,并且一直会调用 c.processNextWorkItem 从 workqueue读取消息
func (c *Controller) runWorker() {
    for c.processNExtWorkItem() {
    }
}
// 从workqueue读取和读取消息
func (c *Controller) processNExtWorkItem() bool {
    // 获取 item
    item, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    if err := func(item interface{}) error {
        // 标记以及处理
        defer c.workqueue.Done(item)
        var key string
        var ok bool
        if key, ok = item.(string); !ok {
            // 判读key的类型不是字符串,则直接丢弃
            c.workqueue.Forget(item)
            runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
            return nil
        }
        if err := c.syncHandler(key); err != nil {
            return fmt.Errorf("error syncing '%s':%s", item, err.Error())
        }
        c.workqueue.Forget(item)
        return nil
    }(item); err != nil {
        runtime.HandleError(err)
        return false
    }
    return true
}
// 尝试从 Informer 维护的缓存中拿到了它所对应的 Bar 对象
func (c *Controller) syncHandler(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        runtime.HandleError(fmt.Errorf("invalid respirce key:%s", key))
        return err
    }
    bar, err := c.informer.Lister().Bars(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            // 说明是在删除事件中添加进来的
            return nil
        }
        runtime.HandleError(fmt.Errorf("failed to get bar by: %s/%s", namespace, name))
        return err
    }
    fmt.Printf("[BarCRD] try to process bar:%#v ...", bar)
    // 可以根据bar来做其他的事。
    // todo
    return nil
}
func (c *Controller) addBar(item interface{}) {
    var key string
    var err error
    if key, err = cache.MetaNamespaceKeyFunc(item); err != nil {
        runtime.HandleError(err)
        return
    }
    c.workqueue.AddRateLimited(key)
}
func (c *Controller) deleteBar(item interface{}) {
    var key string
    var err error
    if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(item); err != nil {
        runtime.HandleError(err)
        return
    }
    fmt.Println("delete crd")
    c.workqueue.AddRateLimited(key)
}
func (c *Controller) updateBar(old, new interface{}) {
    oldItem := old.(*v1.Bar)
    newItem := new.(*v1.Bar)
    // 比较两个资源版本,如果相同,则不处理
    if oldItem.ResourceVersion == newItem.ResourceVersion {
        return
    }
    c.workqueue.AddRateLimited(new)
}

我们这里自定义的控制器只封装了一个 Informer 和一个限速队列,我们当然也可以在里面添加一个用于访问本地缓存的 Indexer,但实际上 Informer 中已经包含了 Lister,对于 List 和 Get 操作都会去通过 Indexer 从本地缓存中获取数据,所以只用一个 Informer 也是完全可行的。

同样在 Informer 中注册了3个事件处理器,将监听的事件获取到后送入 workqueue 队列,然后通过控制器的控制循环不断从队列中消费数据,根据获取的 key 来获取数据判断对象是需要删除还是需要进行其他业务处理,这里我们同样也只是打印出了对应的操作日志,对于实际的项目则进行相应的业务逻辑处理即可。

到这里一个完整的自定义 API 对象和它所对应的自定义控制器就编写完毕了。

测试

接下来我们直接运行我们的main函数:

I0512 16:51:33.922138   39032 controller.go:29] Setting up Bar event handlers
I0512 16:51:33.922255   39032 controller.go:47] Starting Bar control loop
I0512 16:51:33.922258   39032 controller.go:48] Waiting for informer caches to sync
I0512 16:51:34.023108   39032 controller.go:55] Starting workers
I0512 16:51:34.023153   39032 controller.go:60] Started workers

现在我们创建一个Bar资源对象:

# bar.yaml
apiVersion: example.com/v1
kind: Bar
metadata:
  name: bar-demo
  namespace: default
spec:
  image: "Nginx:1.17.1"
  deploymentName: example-bar
  replicas: 2

直接创建上面的对象,注意观察控制器的日志:

I0512 16:51:33.922138   39032 controller.go:29] Setting up Bar event handlers
I0512 16:51:33.922255   39032 controller.go:47] Starting Bar control loop
I0512 16:51:33.922258   39032 controller.go:48] Waiting for informer caches to sync
I0512 16:51:34.023108   39032 controller.go:55] Starting workers
I0512 16:51:34.023153   39032 controller.go:60] Started workers
[BarCRD] try to process bar:"bar-demo" ...

可以看到,我们上面创建 bar.yaml 的操作,触发了 EventHandler 的添加事件,从而被放进了工作队列。然后控制器的控制循环从队列里拿到这个对象,并且打印出了正在处理这个 bar 对象的日志信息。

同样我们删除这个资源的时候,也会有对应的提示。

这就是开发自定义 CRD 控制器的基本流程,当然我们还可以在事件处理的业务逻辑中去记录一些 Events 信息,这样我们就可以通过 Event 去了解我们资源的状态了,更多关于CRD生成自定义控制器的资料请关注其它相关文章!

相关文章