程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

Kubernetes 客户端-controller client 介绍

发表于 2023-05-12 | 分类于 Kubernetes | 0 | 阅读次数 2373

前言

针对下面这张图,上半部分是 client-go 中 Informer 相关的实现,下半部分是常见 Controller 相关的实现方式。本篇主要介绍下半部分常见 Controller 的实现,以及常用 Controller 框架的使用。

Controller 实现

在使用 Informer 的时候,需要注册事件回调函数。在 Informer 接收到事件之后会调用注册的回调函数,比如针对 ADD 事件则调用的是 AddFunc() 函数。

// 初始化 client
clientset, _ := kubernetes.NewForConfig(config)

stopper := make(chan struct{})
defer close(stopper)

// 初始化 informer
factory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := factory.Core().V1().Nodes()
informer := nodeInformer.Informer()

// 启动 informer,list & watch
go factory.Start(stopper)

// 从 apiserver 同步资源,必不可少
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
  runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
  return
}

// 使用自定义 handler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  AddFunc:    onAdd,
  UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") },
  DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
})

如果事件回调函数(AddFunc、UpdateFunc 和 DeleteFunc 等函数)中的逻辑,都是操作 Workqueue,比如将事件入队。同时,有一个循环不断从 Workqueue 中取出事件,然后进一步处理。此时,可以发现这整个流程就对应了图中的下半部分,也就是 Controller 的核心流程。

对于 Controller 的实现例子,可以参考 Kubernetes CRD 自定义控制器,相当于自己手撸实现了一个 Controller。除此之外,现在很多包也都提供了开箱即用的 Controller 框架,基于框架可以快速构建一个 Controller。

controller-runtime 框架介绍

使用 controller-runtime 框架实现 controller,有两种使用方式,如下所示。下述代码,是从 controller-runtime 源代码中 example 中截取的:

func main() {
	entryLog := log.Log.WithName("entrypoint")

	// Setup a Manager
	mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
	if err != nil {
		entryLog.Error(err, "unable to set up overall controller manager")
		os.Exit(1)
	}

	// Setup a new controller to reconcile ReplicaSets
	c, err := controller.New("foo-controller", mgr, controller.Options{
		Reconciler: &reconcileReplicaSet{client: mgr.GetClient()},
	})
	if err != nil {
		entryLog.Error(err, "unable to set up individual controller")
		os.Exit(1)
	}

	// Watch ReplicaSets and enqueue ReplicaSet object key
	if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
		entryLog.Error(err, "unable to watch ReplicaSets")
		os.Exit(1)
	}

	// Watch Pods and enqueue owning ReplicaSet key
	if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
		handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
		entryLog.Error(err, "unable to watch Pods")
		os.Exit(1)
	}

	if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
		entryLog.Error(err, "unable to run manager")
		os.Exit(1)
	}
}
var (
	setupLog = ctrl.Log.WithName("setup")
)

type reconciler struct {
	client.Client
	scheme *runtime.Scheme
}

func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	...
	return ctrl.Result{}, nil
}

func main() {
	ctrl.SetLogger(zap.New())

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// in a real controller, we'd create a new scheme for this
	err = api.AddToScheme(mgr.GetScheme())
	if err != nil {
		setupLog.Error(err, "unable to add scheme")
		os.Exit(1)
	}

	err = ctrl.NewControllerManagedBy(mgr).
		For(&api.ChaosPod{}).
		Owns(&corev1.Pod{}).
		Complete(&reconciler{
			Client: mgr.GetClient(),
			scheme: mgr.GetScheme(),
		})
	if err != nil {
		setupLog.Error(err, "unable to create controller")
		os.Exit(1)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

下面介绍 controller-runtime 的实现,主要包含 controller 和 manager 的实现,

controller 的整体实现

Controller 结构体的主要内容如下所示,

// pkg/internal/controller/controller.go

type Controller struct {
	// Name is used to uniquely identify a Controller in tracing, logging and monitoring.  Name is required.
	Name string

	// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
	MaxConcurrentReconciles int

	// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
	// ensures that the state of the system matches the state specified in the object.
	// Defaults to the DefaultReconcileFunc.
	Do reconcile.Reconciler

	// MakeQueue constructs the queue for this controller once the controller is ready to start.
	// This exists because the standard Kubernetes workqueues start themselves immediately, which
	// leads to goroutine leaks if something calls controller.New repeatedly.
	MakeQueue func() workqueue.RateLimitingInterface

	// Queue is an listeningQueue that listens for events from Informers and adds object keys to
	// the Queue for processing
	Queue workqueue.RateLimitingInterface
  
  ...
  
	// Started is true if the Controller has been Started
	Started bool
	
  ...
  
	// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
	startWatches []watchDescription
	
  ...
}

Controller 结构体实现了 Controller interface

// pkg/controller/controller.go
type Controller interface {
	// Reconciler is called to reconcile an object by Namespace/Name
	reconcile.Reconciler

	// Watch takes events provided by a Source and uses the EventHandler to
	// enqueue reconcile.Requests in response to the events.
	//
	// Watch may be provided one or more Predicates to filter events before
	// they are given to the EventHandler.  Events will be passed to the
	// EventHandler if all provided Predicates evaluate to true.
	Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error

	// Start starts the controller.  Start blocks until the context is closed or a
	// controller has an error starting.
	Start(ctx context.Context) error

	// GetLogger returns this controller logger prefilled with basic information.
	GetLogger() logr.Logger
}

controller.New() 函数

可以看到 NewUnmanaged 函数会真正地实例化 Controller。Controller 实例化完成后,会通过 mgr.Add(c) 函数将 Controller 添加到 Manager 中进行管理。

// 使用示例
c, err := controller.New("foo-controller", mgr, controller.Options{
  Reconciler: &reconcileReplicaSet{client: mgr.GetClient()},
})

// pkg/controller/controller.go
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	c, err := NewUnmanaged(name, mgr, options)
	if err != nil {
		return nil, err
	}

	// Add the controller as a Manager components
	return c, mgr.Add(c)
}

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}

	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}


	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}

	if options.CacheSyncTimeout == 0 {
		options.CacheSyncTimeout = 2 * time.Minute
	}

	if options.RateLimiter == nil {
		options.RateLimiter = workqueue.DefaultControllerRateLimiter()
	}

	if options.RecoverPanic == nil {
		options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
	}

	// Create controller with dependencies set
	return &controller.Controller{
		Do: options.Reconciler,
		MakeQueue: func() workqueue.RateLimitingInterface {
			return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
		},
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		CacheSyncTimeout:        options.CacheSyncTimeout,
		Name:                    name,
		LogConstructor:          options.LogConstructor,
		RecoverPanic:            options.RecoverPanic,
		LeaderElected:           options.NeedLeaderElection,
	}, nil
}

Add 函数传递的是一个 Runnable 参数,Runnable 是一个接口,用来表示可以启动的一个组件,而 Controller 就实现了这个接口的 Start 函数,也就是实现了这个 Runnable 接口。Add 函数会最终调用 runnable 的 Add 函数,根据 Runnable 来判断组件是否支持选举功能,支持则加入到 LeaderElection group 中,否则加入到 Others group 中。

// pkg/manager/internal.go
func (cm *controllerManager) Add(r Runnable) error {
	cm.Lock()
	defer cm.Unlock()
	return cm.add(r)
}

func (cm *controllerManager) add(r Runnable) error {
	return cm.runnables.Add(r)
}

// pkg/manager/runnable_group.go
func (r *runnables) Add(fn Runnable) error {
	switch runnable := fn.(type) {
	case hasCache:
		return r.Caches.Add(fn, func(ctx context.Context) bool {
			return runnable.GetCache().WaitForCacheSync(ctx)
		})
	case *webhook.Server:
		return r.Webhooks.Add(fn, nil)
	case LeaderElectionRunnable:
		if !runnable.NeedLeaderElection() {
			return r.Others.Add(fn, nil)
		}
		return r.LeaderElection.Add(fn, nil)
	default:
		return r.LeaderElection.Add(fn, nil)
	}
}

Watch() 函数

Watch() 函数的主要流程是:

  • 根据要 listen 的资源、event 处理函数、event 过滤函数封装成 watchDescription 结构体,并且添加到 c.startWatches 中。watchDescription 结构体是对 controller 中 informer 的抽象描述。
  • 之后调用 source.Source 的 Start() 函数,在下面的例子中,其实是调用 Kind 实现的 Start() 函数。该 Start() 函数的主要流程,就是通过 manager 的 Cache 获取相应的 Informer,然后往 Informer 中添加相应的 event 处理函数。
// Watch 函数的使用,以 controller-runtime 的 example 代码为例
c, err := controller.New("foo-controller", mgr, controller.Options{
  Reconciler: &reconcileReplicaSet{client: mgr.GetClient()},
})
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
  entryLog.Error(err, "unable to watch ReplicaSets")
  os.Exit(1)
}


// pkg/internal/controller/controller.go
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	// Controller hasn't started yet, store the watches locally and return.
	// These watches are going to be held on the controller struct until the manager or user calls Start(...).
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}

	return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

// pkg/internal/source/kind.go
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error {
  ...
	go func() {
		var (
			i       cache.Informer
			lastErr error
		)

		// Tries to get an informer until it returns true,
		// an error or the specified context is cancelled or expired.
		if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
			// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
			i, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
			...
		}

		_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
		if err != nil {
			ks.started <- err
			return
		}
		if !ks.Cache.WaitForCacheSync(ctx) {
			// Would be great to return something more informative here
			ks.started <- errors.New("cache did not sync")
		}
		close(ks.started)
	}()

	return nil
}

那么,相应的 event 处理函数如下所示,EventHandler 会将 handler.EventHandler、predicate.Predicate 等都进行封装,最终返回符合 Informer 要求的 event 处理函数(OnAdd、OnUpdate、OnDelete 等函数)。针对 event 处理函数来说,这些函数的处理流程基本上为:

  • 获取 Informer 中获取到的 event 对应的 object 对象。
  • 使用 predicate.Predicate 中的过滤函数进行过滤。
  • 调用 handler.EventHandler 的 handler 函数处理,生成 reconcile.Request 对象,并放入 controller 的 queue 中。

// pkg/internal/source/event_handler.go
func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler {
	return &EventHandler{
		ctx:        ctx,
		handler:    handler,
		queue:      queue,
		predicates: predicates,
	}
}

// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface.
type EventHandler struct {
	ctx context.Context

	handler    handler.EventHandler
	queue      workqueue.RateLimitingInterface
	predicates []predicate.Predicate
}

func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {
	return cache.ResourceEventHandlerFuncs{
		AddFunc:    e.OnAdd,
		UpdateFunc: e.OnUpdate,
		DeleteFunc: e.OnDelete,
	}
}

// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler) OnAdd(obj interface{}) {
	c := event.CreateEvent{}

	// Pull Object out of the object
	if o, ok := obj.(client.Object); ok {
		c.Object = o
	} else {
		log.Error(nil, "OnAdd missing Object",
			"object", obj, "type", fmt.Sprintf("%T", obj))
		return
	}

	for _, p := range e.predicates {
		if !p.Create(c) {
			return
		}
	}

	// Invoke create handler
	ctx, cancel := context.WithCancel(e.ctx)
	defer cancel()
	e.handler.Create(ctx, c, e.queue)
}

// OnUpdate creates UpdateEvent and calls Update on EventHandler.
func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
	u := event.UpdateEvent{}

	...
	for _, p := range e.predicates {
		if !p.Update(u) {
			return
		}
	}

  ...
	e.handler.Update(ctx, u, e.queue)
}

// OnDelete creates DeleteEvent and calls Delete on EventHandler.
func (e *EventHandler) OnDelete(obj interface{}) {
	d := event.DeleteEvent{}
	
  ...
	for _, p := range e.predicates {
		if !p.Delete(d) {
			return
		}
	}

	...
	e.handler.Delete(ctx, d, e.queue)
}

handler.EventHandler 的 handler 函数如下所示,主要功能就是往 queue 中添加相应的 reconcile.Request 对象。这个 queue 对象,会在 Controller 的 Start() 函数中被创建。而 queue 中的 reconcile.Request 对象,会被 controller 消费。

// pkg/handler/enqueue.go
type EnqueueRequestForObject struct{}

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
	if evt.Object == nil {
		enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
		return
	}
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Object.GetName(),
		Namespace: evt.Object.GetNamespace(),
	}})
}

// Update implements EventHandler.
func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
	switch {
	case evt.ObjectNew != nil:
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Name:      evt.ObjectNew.GetName(),
			Namespace: evt.ObjectNew.GetNamespace(),
		}})
	case evt.ObjectOld != nil:
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Name:      evt.ObjectOld.GetName(),
			Namespace: evt.ObjectOld.GetNamespace(),
		}})
	default:
		enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
	}
}

// Delete implements EventHandler.
func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
	if evt.Object == nil {
		enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
		return
	}
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Object.GetName(),
		Namespace: evt.Object.GetNamespace(),
	}})
}

综上所述,Watch() 函数的主要作用就是:创建 Informer,并往 Informer 中添加相应的 informer event 处理函数。而这些 event 处理函数的主要流程是获取 informer event 对应的 object 对象,然后调用相应的过滤函数进行过滤,之后调用 handler 创建 reconcile.Request 对象,并将该对象放入 controller 的 queue 中。

Start() 函数

Start() 函数的主要流程是:

  • 创建 controller 的 queue,这个 queue 将会存放 event 处理函数创建的 reconcile.Request 对象。
  • 根据 c.MaxConcurrentReconciles,启动相应数量的 goroutine,每个 goroutine 都会调用 c.processNextWorkItem 函数。该函数的主要作用就是从 queue 中获取 reconcile.Request 对象,然后调用用户编写的 controller 的 Reconcile 函数,比如 reconcileReplicaSet 的 Reconcile 函数。
// reconcileReplicaSet 实现了相应的 Reconcile 函数,也就是 Reconciler 接口
type reconcileReplicaSet struct {
	client client.Client
}

func (r *reconcileReplicaSet) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	log := log.FromContext(ctx)
}

c, err := controller.New("foo-controller", mgr, controller.Options{
  Reconciler: &reconcileReplicaSet{client: mgr.GetClient()},
})



// pkg/internal/controller/controller.go
func (c *Controller) Start(ctx context.Context) error {
	if c.Started {
		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
	}

	c.Queue = c.MakeQueue()

	wg := &sync.WaitGroup{}
	err := func() error {
		...
		for _, watch := range c.startWatches {
			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
				return err
			}
		}

		for _, watch := range c.startWatches {
			syncingSource, ok := watch.src.(source.SyncingSource)
			if !ok {
				continue
			}

			if err := func() error {
				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
					return err
				}

				return nil
			}(); err != nil {
				return err
			}
		}

		c.startWatches = nil

		wg.Add(c.MaxConcurrentReconciles)
		for i := 0; i < c.MaxConcurrentReconciles; i++ {
			go func() {
				defer wg.Done()
				for c.processNextWorkItem(ctx) {
				}
			}()
		}

		c.Started = true
		return nil
	}()
  
	...
	return nil
}

processNextWorkItem 的处理流程如下所示:

  • 从 queue 中获取相应的 obj 对象,然后调用 reconcileHandler 方法。
  • 该方法会调用 controller 自己的 Reconcile 函数(不是用户编写的 Reconcile 函数)。同时,根据 Reconcile 函数的返回,做进一步的处理,比如重新加入队列、过一段时间之后加入队列、从队列中移掉。
  • controller 自身的 Reconcile 函数会调用 Do 对应的 Reconcile 函数,这个 Reconcile 是用户编写的,包含用户自身的处理逻辑。
// pkg/internal/controller/controller.go
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	obj, shutdown := c.Queue.Get()
	if shutdown {
		// Stop working
		return false
	}

	c.reconcileHandler(ctx, obj)
	return true
}

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
	req, ok := obj.(reconcile.Request)
	if !ok {
		c.Queue.Forget(obj)
		return
	}

	result, err := c.Reconcile(ctx, req)
	switch {
	case err != nil:
		c.Queue.AddRateLimited(req)
	case result.RequeueAfter > 0:
		c.Queue.Forget(obj)
		c.Queue.AddAfter(req, result.RequeueAfter)
	case result.Requeue:
		c.Queue.AddRateLimited(req)
	default:
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.Queue.Forget(obj)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
	}
}

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
	...
	return c.Do.Reconcile(ctx, req)
}

综上所述,Start() 函数的主要作用是:

  • 创建 controller 的 queue(当产生相应的 event 之后,informer 的 event 处理函数,会生成 reconcile.Request 对象并放入到 queue 中)。
  • 启动一定数量的 goroutine,每个 goroutine 都会从 queue 中获取相应的 reconcile.Request 对象,并调用用户自己编写的 Reconcile 函数进行处理。

manager 的整体实现

controller-runtime 还提供 Manager,来管理 Controller。除了管理 Controller 之外,Manager 还可以:

  • 管理 Admission Webhook。

  • 提供统一的 client,访问 kubernetes 资源对象。使用该 client 读 kubernetes 资源对象的话,会先访问 manager 的缓存;如果是写 kubernetes 资源对象的话,则会直接访问 API Server。

  • 提供统一的资源对象的 scheme。

  • 提供统一的 cache,这个 cache 会在 controller 需要 informer 某些资源的时候,创建对应的 Informer 监听相应的资源对象,实现相应资源对象的缓存。

Manager 使用

上述提到,我们使用 controller-runtime 实现自己的 controller 有两种方式,但是上述两种方式都会用到 manager,但是更推荐使用后者的方式,也就是下面的使用方式(代码来自于 controller-runtime 仓库的 example 代码):

  • 实例化 manager,参数 config
  • 向 manager 添加 scheme
  • 向 manager 添加 controller,该 controller 包含一个 reconciler 结构体,我们需要在 reconciler 结构体实现逻辑处理
  • 向 manager 添加 webhook,同样需要实现逻辑处理
  • 启动 manager.start()
package main

import (
	...
)

var (
	setupLog = ctrl.Log.WithName("setup")
)

type reconciler struct {
	client.Client
	scheme *runtime.Scheme
}

func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	...
	return ctrl.Result{}, nil
}

func main() {
	ctrl.SetLogger(zap.New())

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// in a real controller, we'd create a new scheme for this
	err = api.AddToScheme(mgr.GetScheme())
	if err != nil {
		setupLog.Error(err, "unable to add scheme")
		os.Exit(1)
	}

	err = ctrl.NewControllerManagedBy(mgr).
		For(&api.ChaosPod{}).
		Owns(&corev1.Pod{}).
		Complete(&reconciler{
			Client: mgr.GetClient(),
			scheme: mgr.GetScheme(),
		})
	if err != nil {
		setupLog.Error(err, "unable to create controller")
		os.Exit(1)
	}

	err = ctrl.NewWebhookManagedBy(mgr).
		For(&api.ChaosPod{}).
		Complete()
	if err != nil {
		setupLog.Error(err, "unable to create webhook")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

Manager 实例化

Manager 实例化最后返回的是一个 controllerManager 的实例。该结构体是 Manager 接口的一个实现。

  • Manager 可以管理 Runnable 的生命周期(添加/启动),Controller 和 Admission Webhook 都实现了 Runable 接口,也就是 Start 方法。
  • Manager 还支持领导人选举。Leader 选举的意思是如果这个 manager 启动了两个副本,那么只有一个 manager 中的 controller 才会处理相应的请求。简单点说,如果一个 manager 成为了 leader,那么该 leader 才会启动那些需要 leader 选举的 controller。
// Manager 实例化,最终会调用 manager.New 函数
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
NewManager = manager.New
// pkg/manager/manager.go
func New(config *rest.Config, options Options) (Manager, error) {
	options = setOptionsDefaults(options)

	cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
		...
	})
  
  ...

	errChan := make(chan error)
	runnables := newRunnables(options.BaseContext, errChan)

	return &controllerManager{
		stopProcedureEngaged:          pointer.Int64(0),
		cluster:                       cluster,
		runnables:                     runnables,
		errChan:                       errChan,
		recorderProvider:              recorderProvider,
		resourceLock:                  resourceLock,
		metricsListener:               metricsListener,
		metricsExtraHandlers:          metricsExtraHandlers,
		controllerConfig:              options.Controller,
		logger:                        options.Logger,
		elected:                       make(chan struct{}),
		webhookServer:                 options.WebhookServer,
		leaderElectionID:              options.LeaderElectionID,
		leaseDuration:                 *options.LeaseDuration,
		renewDeadline:                 *options.RenewDeadline,
		retryPeriod:                   *options.RetryPeriod,
		healthProbeListener:           healthProbeListener,
		readinessEndpointName:         options.ReadinessEndpointName,
		livenessEndpointName:          options.LivenessEndpointName,
		gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,
		internalProceduresStop:        make(chan struct{}),
		leaderElectionStopped:         make(chan struct{}),
		leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
	}, nil
}

// pkg/manager/manager.go
type Manager interface {
	cluster.Cluster

	Add(Runnable) error

	Elected() <-chan struct{}

	Start(ctx context.Context) error
  
	...
}

// pkg/manager/manager.go
type Runnable interface {
	Start(context.Context) error
}

controller 注册到 Manger

// 将 controller 注册到 Manager 中,由相应
err = ctrl.NewControllerManagedBy(mgr).
		For(&api.ChaosPod{}).
		Owns(&corev1.Pod{}).
		Complete(&reconciler{
			Client: mgr.GetClient(),
			scheme: mgr.GetScheme(),
		})

builder.ControllerManagedBy 函数返回 controller 的 Builder 对象,通过 Builder 对象生成的 controller 将由所提供的管理器 Manager 启动。

// pkg/builder/controller.go

type Builder struct {
	forInput         ForInput
	ownsInput        []OwnsInput
	watchesInput     []WatchesInput
	mgr              manager.Manager
	globalPredicates []predicate.Predicate
	ctrl             controller.Controller
	ctrlOptions      controller.Options
	name             string
}

func ControllerManagedBy(m manager.Manager) *Builder {
	return &Builder{mgr: m}
}

Builder 对象的 For() 函数主要是用来定义我们要监听的资源对象类型的,比如 deployment。

// pkg/builder/controller.go

type ForInput struct {
	object           client.Object
	predicates       []predicate.Predicate
	objectProjection objectProjection
	err              error
}

func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
	if blder.forInput.object != nil {
		return blder
	}
  
	input := ForInput{object: object}
	for _, opt := range opts {
		opt.ApplyToFor(&input)
	}

	blder.forInput = input
	return blder
}

Owns() 函数是用来定义要监听的资源对象的子资源,比如 deployment 资源对象的子资源是 Pod。

// pkg/builder/controller.go

func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
	input := OwnsInput{object: object}
	for _, opt := range opts {
		opt.ApplyToOwns(&input)
	}

	blder.ownsInput = append(blder.ownsInput, input)
	return blder
}

Complete() 函数通过调用 Build 函数来构建 Controller,其中比较重要的就是 doController 和 doWatch 两个函数。

// pkg/builder/controller.go

func (blder *Builder) Complete(r reconcile.Reconciler) error {
  _, err := blder.Build(r)
  return err
}

func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
  if r == nil {
    return nil, fmt.Errorf("must provide a non-nil Reconciler")
  }
  if blder.mgr == nil {
    return nil, fmt.Errorf("must provide a non-nil Manager")
  }
  if blder.forInput.err != nil {
    return nil, blder.forInput.err
  }

  // Set the ControllerManagedBy
  if err := blder.doController(r); err != nil {
    return nil, err
  }

  // Set the Watch
  if err := blder.doWatch(); err != nil {
    return nil, err
  }

  return blder.ctrl, nil
}

doController 会真正地创建 Controller 实例,并将创建的 Controller 实例添加到 Manager 中。

// pkg/builder/controller.go

func (blder *Builder) doController(r reconcile.Reconciler) error {
	...

	// Build the controller and return.
	blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
	return err
}

var newController = controller.New

// pkg/controller/controller.go
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	c, err := NewUnmanaged(name, mgr, options)
	if err != nil {
		return nil, err
	}

	// Add the controller as a Manager components
	return c, mgr.Add(c)
}

doWatch() 函数主要是调用 controller 的 Watch() 函数获取 informer,并且往 informer 中添加相应的 event 处理函数。这些 event 处理函数会根据 event,并结合过滤条件,生成 reconcile.Request 对象,放入 controller 的 queue 中。需要注意的是,我们可能会监听 For、Owns 添加的两类资源。通过代码可以看到,针对这两类资源,event 的处理函数是不同的。

// pkg/builder/controller.go

func (blder *Builder) doWatch() error {
	// Reconcile type
	if blder.forInput.object != nil {
		obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
		if err != nil {
			return err
		}
		src := source.Kind(blder.mgr.GetCache(), obj)
		hdler := &handler.EnqueueRequestForObject{}
		allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
			return err
		}
	}

	for _, own := range blder.ownsInput {
		obj, err := blder.project(own.object, own.objectProjection)
		if err != nil {
			return err
		}
		src := source.Kind(blder.mgr.GetCache(), obj)
		opts := []handler.OwnerOption{}
		if !own.matchEveryOwner {
			opts = append(opts, handler.OnlyControllerOwner())
		}
		hdler := handler.EnqueueRequestForOwner(
			blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
			blder.forInput.object,
			opts...,
		)
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, own.predicates...)
		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
			return err
		}
	}

	...
	return nil
}

Manager 启动

最后是调用 Manager 的 Start 函数来启动 Manager,由于上面我们已经把 Controller 添加到了 Manager 中,所以这里启动其实是启动关联的 Controller。Manager 的 Start 函数实现如下所示:

// pkg/manager/internal.go

func (cm *controllerManager) Start(ctx context.Context) (err error) {
	cm.started = true
  
  ...
  
	// Add the cluster runnable.
	if err := cm.add(cm.cluster); err != nil {
		return fmt.Errorf("failed to add cluster to runnables: %w", err)
	}

	// First start any webhook servers, which includes conversion, validation, and defaulting
	// webhooks that are registered.
	if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
		if !errors.Is(err, wait.ErrWaitTimeout) {
			return err
		}
	}

	// Start and wait for caches.
	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
		if !errors.Is(err, wait.ErrWaitTimeout) {
			return err
		}
	}

	// Start the non-leaderelection Runnables after the cache has synced.
	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
		if !errors.Is(err, wait.ErrWaitTimeout) {
			return err
		}
	}

	// Start the leader election and all required runnables.
	{
		ctx, cancel := context.WithCancel(context.Background())
		cm.leaderElectionCancel = cancel
		go func() {
			if cm.resourceLock != nil {
				if err := cm.startLeaderElection(ctx); err != nil {
					cm.errChan <- err
				}
			} else {
				// Treat not having leader election enabled the same as being elected.
				if err := cm.startLeaderElectionRunnables(); err != nil {
					cm.errChan <- err
				}
				close(cm.elected)
			}
		}()
	}

	...
}

Start() 函数会分别启动不同类别的 Runnable 实例,不同类别的 Runnable 实例是按组划分的,因此只需要调用 Runnable group 实例的 Start() 函数即可,比如cm.runnables.Others.Start(cm.internalCtx)。group 的 Start() 函数会遍历该 group 内的所有 Runnable 实例,然后调用该实例的 Start() 函数,比如针对 controller 来说,就会调用 controller 的 Start() 函数。

// pkg/manager/runnable_group.go

func (r *runnableGroup) Start(ctx context.Context) error {
	var retErr error

	r.startOnce.Do(func() {
		// Start the internal reconciler.
		go r.reconcile()
		...
	})

	return retErr
}

func (r *runnableGroup) reconcile() {
	for runnable := range r.ch {
		// Start the runnable.
		go func(rn *readyRunnable) {
			...
			// Start the runnable.
			if err := rn.Start(r.ctx); err != nil {
				r.errChan <- err
			}
		}(runnable)
	}
}

对于已经启动的 Manager 来说,如果再添加一个 controller 的话,那么这个 controller 也是会被调用 Start() 函数,从而启动的。这是因为,runnable group 会启动一个 goroutine,这个 goroutine 会不断从 channel 中读取内容,如果没有内容的话,这个 for 循环就会阻塞,直到有数据。再来看下 runnable group 的 Add 函数,如果 group 未启动的话,则添加到 startQueue 中,如果已经启动了的话,则直接往 ch 中写入一个 readyRunnable 结构体,而此时启动的 goroutine 会读取到这个 readyRunnable 结构体,然后启动里面包含的 controller。

func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
	if ready == nil {
		ready = func(_ context.Context) bool { return true }
	}

	readyRunnable := &readyRunnable{
		Runnable: rn,
		Check:    ready,
	}

	// Handle start.
	// If the overall runnable group isn't started yet
	// we want to buffer the runnables and let Start()
	// queue them up again later.
	{
		r.start.Lock()

		// Check if we're already started.
		if !r.started {
			// Store the runnable in the internal if not.
			r.startQueue = append(r.startQueue, readyRunnable)
			r.start.Unlock()
			return nil
		}
		r.start.Unlock()
	}

	// Enqueue the runnable.
	r.ch <- readyRunnable
	return nil
}

相关链接

  1. controller-runtime 之 manager 实现

  2. controller-runtime 之控制器实现

  3. controller-runtime源码分析

  4. Github controller-runtime 源码

Kubebuilder

Kubebuilder 提供了一系列的工具,帮助开发者快速生成自定义资源定义(CRDs)、创建新的 Controller(使用了上述的 controller-runtime 库)。它主要提供了两个能力:

  • 通过命令即可生成对应的 CRD,当然 CRD 中的具体内容(比如有哪些字段)还需要由开发者自己定义。
  • 生成 CRD 的同时,也会生成相应的 controller、controller-manager 脚手架代码,当然 controller 中的处理逻辑需要开发自己编写。

常用方式

# 先 init
kubebuilder init --domain example.com

# 使用命令生成相应的 API 对象,其中 API 的 group 为 elasticweb.example.com,也就是会结合 domain
kubebuilder create api --group elasticweb --version v1 --kind El

在 controller 的 Reconcile 函数的注释中添加 RBAC 声明

//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
func (r *ElasticWebReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
		...
}

自定义输出列:在 API 对象的结构体处添加自定义输出列,通过 priority 参数实现在 -o wide 的时候才显示

// +kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image",description="The Docker Image of MyAPP"
// +kubebuilder:printcolumn:name="Port",type="string",priority=1,JSONPath=".spec.image",description="The Docker Image of Etcd"

type ElasticWeb struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   ElasticWebSpec   `json:"spec,omitempty"`
	Status ElasticWebStatus `json:"status,omitempty"`
}

相关链接

  1. 手摸手教你使用kubebuilder开发operator
卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/234
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Kubernetes
Kubernetes 客户端-client go 介绍
Kubernetes 客户端-Informer 机制
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%