从生成CRD到编写自定义控制器教程示例
介绍
我们可以使用code-generator 以及controller-tools来进行代码自动生成,通过代码自动生成可以帮我们自动生成 CRD 资源对象,以及客户端访问的 ClientSet、InfORMer、Lister 等工具包,接下来我们就来了解下如何编写一个自定义的控制器。
CRD定义
首先初始化项目:
$ mkdir operator-crd && cd operator-crd
$ Go mod init operator-crd
$ mkdir -p pkg/apis/example.com/v1
在该文件夹下新建doc.go
文件,内容如下所示:
// +k8s:deepcopy-gen=package
// +groupName=example.com
package v1
根据 CRD 的规范定义,这里我们定义的 group 为example.com
,版本为v1
,在顶部添加了一个代码自动生成的deepcopy-gen
的 tag,为整个包中的类型生成深拷贝方法。
然后就是非常重要的资源对象的结构体定义,新建types.go
文件,types.go内容可以使用type-scaffpld
自动生成,具体文件内容如下:
package v1
import metav1 "k8s.io/apiMachinery/pkg/apis/meta/v1"
// BarSpec defines the desired state of Bar
type BarSpec struct {
// INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
DeploymentName string `JSON:"deploymentName"`
Image string `json:"image"`
Replicas *int32 `json:"replicas"`
}
// BarStatus defines the observed state of Bar.
// It should always be reconstructable from the state of the cluster and/or outside world.
type BarStatus struct {
// INSERT ADDITIONAL STATUS FIELDS -- observed state of cluster
}
// 下面这个一定不能少,少了的话不能生成 lister 和 informer
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Bar is the Schema for the bars API
// +k8s:openapi-gen=true
type Bar struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec BarSpec `json:"spec,omitempty"`
Status BarStatus `json:"status,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// BarList contains a list of Bar
type BarList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Bar `json:"items"`
}
然后可以参考系统内置的资源对象,还需要提供 AddToScheme 与 Resource 两个变量供 client 注册,新建 reGISter.go 文件,内容如下所示:
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// SchemeGroupVersion 注册自己的自定义资源
var SchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
// SchemeBuilder initializes a scheme builder
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme is a global function that registers this API group & version to a scheme
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
// 添加 Bar 与 BarList这两个资源到 scheme
scheme.AddKnownTypes(SchemeGroupVersion,
&Bar{},
&BarList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
使用controller-gen
生成crd:
$ controller-gen crd paths=./... output:crd:dir=crd
生成example.com_bars.yaml文件如下所示:
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: (devel)
creationTimestamp: null
name: bars.example.com
spec:
group: example.com
names:
kind: Bar
listKind: BarList
plural: bars
singular: bar
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
description: Bar is the Schema for the bars API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/commUnity/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: Https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: BarSpec defines the desired state of Bar
properties:
deploymentName:
description: INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
type: string
image:
type: string
replicas:
format: int32
type: integer
required:
- deploymentName
- image
- replicas
type: object
status:
description: BarStatus defines the observed state of Bar. It should always
be reconstructable from the state of the cluster and/or outside world.
type: object
type: object
served: true
storage: true
最终项目结构如下所示:
$ tree
.
├── crd
│ └── example.com_bars.yaml
├── go.mod
├── go.sum
└── pkg
└── apis
└── example.com
└── v1
├── doc.go
├── register.go
└── types.go
5 directories, 6 files
生成客户端相关代码
上面我们准备好资源的 API 资源类型后,就可以使用开始生成 CRD 资源的客户端使用的相关代码了。
首先创建生成代码的脚本,下面这些脚本均来源于sample-controller提供的示例:
$ mkdir hack && cd hack
在该目录下面新建 tools.go 文件,添加 code-generator 依赖,因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包。文件内容如下所示:
// +build tools
// 建立 tools.go 来依赖 code-generator
// 因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包.
package tools
import _ "k8s.io/code-generator"
然后新建 update-codegen.sh 脚本,用来配置代码生成的脚本:
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
operator-crd/pkg/client operator-crd/pkg/apis example.com:v1 \
--output-base "${SCRIPT_ROOT}"/../ \
--go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
# To use your own boilerplate text append:
# --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt
同样还有 verify-codegen.sh 脚本,用来校验生成的代码是否是最新的:
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
DIFFROOT="${SCRIPT_ROOT}/pkg"
TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg"
_tmp="${SCRIPT_ROOT}/_tmp"
cleanup() {
rm -rf "${_tmp}"
}
trap "cleanup" EXIT SIGINT
cleanup
mkdir -p "${TMP_DIFFROOT}"
cp -a "${DIFFROOT}"
接下来我们就可以来执行代码生成的脚本了,首先将依赖包放置到 vendor 目录中去:
$ go mod vendor
然后执行脚本生成代码:
$ chmod +x ./hack/update-codegen.sh
$./hack/update-codegen.sh
Generating deepcopy funcs
Generating clientset for example.com:v1 at operator-crd/pkg/client/clientset
Generating listers for example.com:v1 at operator-crd/pkg/client/listers
Generating informers for example.com:v1 at operator-crd/pkg/client/informers
代码生成后,整个项目的 pkg 包变成了下面的样子:
$ tree pkg
pkg
├── apis
│ └── example.com
│ └── v1
│ ├── doc.go
│ ├── register.go
│ ├── types.go
│ └── zz_generated.deepcopy.go
└── client
├── clientset
│ └── versioned
│ ├── clientset.go
│ ├── doc.go
│ ├── fake
│ │ ├── clientset_generated.go
│ │ ├── doc.go
│ │ └── register.go
│ ├── scheme
│ │ ├── doc.go
│ │ └── register.go
│ └── typed
│ └── example.com
│ └── v1
│ ├── bar.go
│ ├── doc.go
│ ├── example.com_client.go
│ ├── fake
│ │ ├── doc.go
│ │ ├── fake_bar.go
│ │ └── fake_example.com_client.go
│ └── generated_expansion.go
├── informers
│ └── externalversions
│ ├── example.com
│ │ ├── interface.go
│ │ └── v1
│ │ ├── bar.go
│ │ └── interface.go
│ ├── factory.go
│ ├── generic.go
│ └── internalinterfaces
│ └── factory_interfaces.go
└── listers
└── example.com
└── v1
├── bar.go
└── expansion_generated.go
20 directories, 26 files
仔细观察可以发现pkg/apis/example.com/v1目录下面多了一个zz_generated.deepcopy.go文件,在pkg/client文件夹下生成了 clientset和 informers 和 listers 三个目录,有了这几个自动生成的客户端相关操作包,我们就可以去访问 CRD 资源了,可以和使用内置的资源对象一样去对 Bar 进行 List 和 Watch 操作了。
编写控制器
首先要先获取访问资源对象的 ClientSet,在项目根目录下面新建 main.go 文件。
package main
import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
clientset "operator-crd/pkg/client/clientset/versioned"
"operator-crd/pkg/client/informers/externalversions"
"time"
"os"
"os/signal"
"syscall"
)
var (
onlyOneSignalHandler = make(chan struct{})
shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
)
// 注册 SIGTERM 和 SIGINT 信号
// 返回一个 stop channel, 该通道在捕获到第一个信号时被关闭
// 如果捕获到第二个信号,程序直接退出
func setupSignalHandler() (stopCh <-chan struct{}) {
// 当调用两次的时候 panics
close(onlyOneSignalHandler)
stop := make(chan struct{})
c := make(chan os.Signal, 2)
// Notify 函数让 signal 包将输入信号转发到c
// 如果没有列出要传递的信号,会将所有输入信号传递到 c; 否则只会传递列出的输入信号
signal.Notify(c, shutdownSignals...)
go func() {
<-c
close(stop)
<-c
os.Exit(1) // 第二个信号直接退出
}()
return stop
}
func main() {
stopCh := setupSignalHandler()
// 获取config
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
klog.Fatalln(err)
}
// 通过config构建clientSet
// 这里的clientSet 是 Bar 的
clientSet, err := clientset.NewForConfig(config)
if err != nil {
klog.Fatalln(err)
}
// informerFactory 工厂类, 这里注入我们通过代码生成的 client
// client 主要用于和 API Server 进行通信,实现 ListAndWatch
factory := externalversions.NewSharedInformerFactory(clientSet, time.Second*30)
// 实例化自定义控制器
controller := NewController(factory.Example().V1().Bars())
// 启动 informer,开始list 和 watch
go factory.Start(stopCh)
// 启动控制器
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
首先初始化一个用于访问 Bar 资源的 ClientSet 对象,然后同样新建一个 Bar 的 InformerFactory 实例,通过这个工厂实例可以去启动 Informer 开始对 Bar 的 List 和 Watch 操作,然后同样我们要自己去封装一个自定义的控制器,在这个控制器里面去实现一个控制循环,不断对 Bar 的状态进行调谐。
在项目根目录下新建controller.go文件,内容如下所示:
package main
import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
v1 "operator-crd/pkg/apis/example.com/v1"
"time"
informers "operator-crd/pkg/client/informers/externalversions/example.com/v1"
)
type Controller struct {
informer informers.BarInformer
workqueue workqueue.RateLimitingInterface
}
func NewController(informer informers.BarInformer) *Controller {
controller := &Controller{
informer: informer,
// WorkQueue 的实现,负责同步 Informer 和控制循环之间的数据
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bar"),
}
klog.Info("Setting up Bar event handlers")
// informer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc)
// 分别对应 API 对象的“添加”“更新”和“删除”事件。
// 而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addBar,
UpdateFunc: controller.updateBar,
DeleteFunc: controller.deleteBar,
})
return controller
}
func (c *Controller) Run(thread int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShuttingDown()
// 记录开始日志
klog.Info("Starting Bar control loop")
klog.Info("Waiting for informer caches to sync")
// 等待缓存同步数据
if ok := cache.WaitForCacheSync(stopCh, c.informer.Informer().HasSynced); !ok {
return fmt.Errorf("failed to wati for caches to sync")
}
klog.Info("Starting workers")
for i := 0; i < thread; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")
return nil
}
// runWorker 是一个不断运行的方法,并且一直会调用 c.processNextWorkItem 从 workqueue读取消息
func (c *Controller) runWorker() {
for c.processNExtWorkItem() {
}
}
// 从workqueue读取和读取消息
func (c *Controller) processNExtWorkItem() bool {
// 获取 item
item, shutdown := c.workqueue.Get()
if shutdown {
return false
}
if err := func(item interface{}) error {
// 标记以及处理
defer c.workqueue.Done(item)
var key string
var ok bool
if key, ok = item.(string); !ok {
// 判读key的类型不是字符串,则直接丢弃
c.workqueue.Forget(item)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
return nil
}
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s':%s", item, err.Error())
}
c.workqueue.Forget(item)
return nil
}(item); err != nil {
runtime.HandleError(err)
return false
}
return true
}
// 尝试从 Informer 维护的缓存中拿到了它所对应的 Bar 对象
func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid respirce key:%s", key))
return err
}
bar, err := c.informer.Lister().Bars(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
// 说明是在删除事件中添加进来的
return nil
}
runtime.HandleError(fmt.Errorf("failed to get bar by: %s/%s", namespace, name))
return err
}
fmt.Printf("[BarCRD] try to process bar:%#v ...", bar)
// 可以根据bar来做其他的事。
// todo
return nil
}
func (c *Controller) addBar(item interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(item); err != nil {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
}
func (c *Controller) deleteBar(item interface{}) {
var key string
var err error
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(item); err != nil {
runtime.HandleError(err)
return
}
fmt.Println("delete crd")
c.workqueue.AddRateLimited(key)
}
func (c *Controller) updateBar(old, new interface{}) {
oldItem := old.(*v1.Bar)
newItem := new.(*v1.Bar)
// 比较两个资源版本,如果相同,则不处理
if oldItem.ResourceVersion == newItem.ResourceVersion {
return
}
c.workqueue.AddRateLimited(new)
}
我们这里自定义的控制器只封装了一个 Informer 和一个限速队列,我们当然也可以在里面添加一个用于访问本地缓存的 Indexer,但实际上 Informer 中已经包含了 Lister,对于 List 和 Get 操作都会去通过 Indexer 从本地缓存中获取数据,所以只用一个 Informer 也是完全可行的。
同样在 Informer 中注册了3个事件处理器,将监听的事件获取到后送入 workqueue 队列,然后通过控制器的控制循环不断从队列中消费数据,根据获取的 key 来获取数据判断对象是需要删除还是需要进行其他业务处理,这里我们同样也只是打印出了对应的操作日志,对于实际的项目则进行相应的业务逻辑处理即可。
到这里一个完整的自定义 API 对象和它所对应的自定义控制器就编写完毕了。
测试
接下来我们直接运行我们的main函数:
I0512 16:51:33.922138 39032 controller.go:29] Setting up Bar event handlers
I0512 16:51:33.922255 39032 controller.go:47] Starting Bar control loop
I0512 16:51:33.922258 39032 controller.go:48] Waiting for informer caches to sync
I0512 16:51:34.023108 39032 controller.go:55] Starting workers
I0512 16:51:34.023153 39032 controller.go:60] Started workers
现在我们创建一个Bar资源对象:
# bar.yaml
apiVersion: example.com/v1
kind: Bar
metadata:
name: bar-demo
namespace: default
spec:
image: "Nginx:1.17.1"
deploymentName: example-bar
replicas: 2
直接创建上面的对象,注意观察控制器的日志:
I0512 16:51:33.922138 39032 controller.go:29] Setting up Bar event handlers
I0512 16:51:33.922255 39032 controller.go:47] Starting Bar control loop
I0512 16:51:33.922258 39032 controller.go:48] Waiting for informer caches to sync
I0512 16:51:34.023108 39032 controller.go:55] Starting workers
I0512 16:51:34.023153 39032 controller.go:60] Started workers
[BarCRD] try to process bar:"bar-demo" ...
可以看到,我们上面创建 bar.yaml 的操作,触发了 EventHandler 的添加事件,从而被放进了工作队列。然后控制器的控制循环从队列里拿到这个对象,并且打印出了正在处理这个 bar 对象的日志信息。
同样我们删除这个资源的时候,也会有对应的提示。
这就是开发自定义 CRD 控制器的基本流程,当然我们还可以在事件处理的业务逻辑中去记录一些 Events 信息,这样我们就可以通过 Event 去了解我们资源的状态了,更多关于CRD生成自定义控制器的资料请关注其它相关文章!
相关文章