前言
针对下面这张图,上半部分是 client-go 中 Informer 相关的实现,下半部分是常见 Controller 相关的实现方式。本篇主要介绍下半部分常见 Controller 的实现,以及常用 Controller 框架的使用。
Controller 实现
在使用 Informer 的时候,需要注册事件回调函数。在 Informer 接收到事件之后会调用注册的回调函数,比如针对 ADD 事件则调用的是 AddFunc() 函数。
// 初始化 client
clientset, _ := kubernetes.NewForConfig(config)
stopper := make(chan struct{})
defer close(stopper)
// 初始化 informer
factory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := factory.Core().V1().Nodes()
informer := nodeInformer.Informer()
// 启动 informer,list & watch
go factory.Start(stopper)
// 从 apiserver 同步资源,必不可少
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 使用自定义 handler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") },
DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
})
如果事件回调函数(AddFunc、UpdateFunc 和 DeleteFunc 等函数)中的逻辑,都是操作 Workqueue,比如将事件入队。同时,有一个循环不断从 Workqueue 中取出事件,然后进一步处理。此时,可以发现这整个流程就对应了图中的下半部分,也就是 Controller 的核心流程。
对于 Controller 的实现例子,可以参考 Kubernetes CRD 自定义控制器,相当于自己手撸实现了一个 Controller。除此之外,现在很多包也都提供了开箱即用的 Controller 框架,基于框架可以快速构建一个 Controller。
controller-runtime 框架介绍
使用 controller-runtime 框架实现 controller,有两种使用方式,如下所示。下述代码,是从 controller-runtime 源代码中 example 中截取的:
func main() {
entryLog := log.Log.WithName("entrypoint")
// Setup a Manager
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
entryLog.Error(err, "unable to set up overall controller manager")
os.Exit(1)
}
// Setup a new controller to reconcile ReplicaSets
c, err := controller.New("foo-controller", mgr, controller.Options{
Reconciler: &reconcileReplicaSet{client: mgr.GetClient()},
})
if err != nil {
entryLog.Error(err, "unable to set up individual controller")
os.Exit(1)
}
// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}
// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
entryLog.Error(err, "unable to run manager")
os.Exit(1)
}
}
var (
setupLog = ctrl.Log.WithName("setup")
)
type reconciler struct {
client.Client
scheme *runtime.Scheme
}
func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
...
return ctrl.Result{}, nil
}
func main() {
ctrl.SetLogger(zap.New())
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
// in a real controller, we'd create a new scheme for this
err = api.AddToScheme(mgr.GetScheme())
if err != nil {
setupLog.Error(err, "unable to add scheme")
os.Exit(1)
}
err = ctrl.NewControllerManagedBy(mgr).
For(&api.ChaosPod{}).
Owns(&corev1.Pod{}).
Complete(&reconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
})
if err != nil {
setupLog.Error(err, "unable to create controller")
os.Exit(1)
}
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
下面介绍 controller-runtime 的实现,主要包含 controller 和 manager 的实现,
controller 的整体实现
Controller 结构体的主要内容如下所示,
// pkg/internal/controller/controller.go
type Controller struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Name string
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
MaxConcurrentReconciles int
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
// ensures that the state of the system matches the state specified in the object.
// Defaults to the DefaultReconcileFunc.
Do reconcile.Reconciler
// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
MakeQueue func() workqueue.RateLimitingInterface
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Queue workqueue.RateLimitingInterface
...
// Started is true if the Controller has been Started
Started bool
...
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription
...
}
Controller 结构体实现了 Controller interface
// pkg/controller/controller.go
type Controller interface {
// Reconciler is called to reconcile an object by Namespace/Name
reconcile.Reconciler
// Watch takes events provided by a Source and uses the EventHandler to
// enqueue reconcile.Requests in response to the events.
//
// Watch may be provided one or more Predicates to filter events before
// they are given to the EventHandler. Events will be passed to the
// EventHandler if all provided Predicates evaluate to true.
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
// Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting.
Start(ctx context.Context) error
// GetLogger returns this controller logger prefilled with basic information.
GetLogger() logr.Logger
}
controller.New() 函数
可以看到 NewUnmanaged
函数会真正地实例化 Controller。Controller 实例化完成后,会通过 mgr.Add(c)
函数将 Controller 添加到 Manager 中进行管理。
// 使用示例
c, err := controller.New("foo-controller", mgr, controller.Options{
Reconciler: &reconcileReplicaSet{client: mgr.GetClient()},
})
// pkg/controller/controller.go
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
c, err := NewUnmanaged(name, mgr, options)
if err != nil {
return nil, err
}
// Add the controller as a Manager components
return c, mgr.Add(c)
}
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
if options.Reconciler == nil {
return nil, fmt.Errorf("must specify Reconciler")
}
if len(name) == 0 {
return nil, fmt.Errorf("must specify Name for Controller")
}
if options.MaxConcurrentReconciles <= 0 {
options.MaxConcurrentReconciles = 1
}
if options.CacheSyncTimeout == 0 {
options.CacheSyncTimeout = 2 * time.Minute
}
if options.RateLimiter == nil {
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
}
if options.RecoverPanic == nil {
options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
}
// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
Name: name,
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
LeaderElected: options.NeedLeaderElection,
}, nil
}
Add 函数传递的是一个 Runnable 参数,Runnable 是一个接口,用来表示可以启动的一个组件,而 Controller 就实现了这个接口的 Start 函数,也就是实现了这个 Runnable 接口。Add 函数会最终调用 runnable 的 Add 函数,根据 Runnable 来判断组件是否支持选举功能,支持则加入到 LeaderElection
group 中,否则加入到 Others
group 中。
// pkg/manager/internal.go
func (cm *controllerManager) Add(r Runnable) error {
cm.Lock()
defer cm.Unlock()
return cm.add(r)
}
func (cm *controllerManager) add(r Runnable) error {
return cm.runnables.Add(r)
}
// pkg/manager/runnable_group.go
func (r *runnables) Add(fn Runnable) error {
switch runnable := fn.(type) {
case hasCache:
return r.Caches.Add(fn, func(ctx context.Context) bool {
return runnable.GetCache().WaitForCacheSync(ctx)
})
case *webhook.Server:
return r.Webhooks.Add(fn, nil)
case LeaderElectionRunnable:
if !runnable.NeedLeaderElection() {
return r.Others.Add(fn, nil)
}
return r.LeaderElection.Add(fn, nil)
default:
return r.LeaderElection.Add(fn, nil)
}
}
Watch() 函数
Watch() 函数的主要流程是:
- 根据要 listen 的资源、event 处理函数、event 过滤函数封装成 watchDescription 结构体,并且添加到 c.startWatches 中。watchDescription 结构体是对 controller 中 informer 的抽象描述。
- 之后调用 source.Source 的 Start() 函数,在下面的例子中,其实是调用 Kind 实现的 Start() 函数。该 Start() 函数的主要流程,就是通过 manager 的 Cache 获取相应的 Informer,然后往 Informer 中添加相应的 event 处理函数。
// Watch 函数的使用,以 controller-runtime 的 example 代码为例
c, err := controller.New("foo-controller", mgr, controller.Options{
Reconciler: &reconcileReplicaSet{client: mgr.GetClient()},
})
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}
// pkg/internal/controller/controller.go
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
// Controller hasn't started yet, store the watches locally and return.
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
// pkg/internal/source/kind.go
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error {
...
go func() {
var (
i cache.Informer
lastErr error
)
// Tries to get an informer until it returns true,
// an error or the specified context is cancelled or expired.
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
...
}
_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
if err != nil {
ks.started <- err
return
}
if !ks.Cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()
return nil
}
那么,相应的 event 处理函数如下所示,EventHandler 会将 handler.EventHandler、predicate.Predicate 等都进行封装,最终返回符合 Informer 要求的 event 处理函数(OnAdd、OnUpdate、OnDelete 等函数)。针对 event 处理函数来说,这些函数的处理流程基本上为:
- 获取 Informer 中获取到的 event 对应的 object 对象。
- 使用 predicate.Predicate 中的过滤函数进行过滤。
- 调用 handler.EventHandler 的 handler 函数处理,生成 reconcile.Request 对象,并放入 controller 的 queue 中。
// pkg/internal/source/event_handler.go
func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler {
return &EventHandler{
ctx: ctx,
handler: handler,
queue: queue,
predicates: predicates,
}
}
// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface.
type EventHandler struct {
ctx context.Context
handler handler.EventHandler
queue workqueue.RateLimitingInterface
predicates []predicate.Predicate
}
func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: e.OnAdd,
UpdateFunc: e.OnUpdate,
DeleteFunc: e.OnDelete,
}
}
// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler) OnAdd(obj interface{}) {
c := event.CreateEvent{}
// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
for _, p := range e.predicates {
if !p.Create(c) {
return
}
}
// Invoke create handler
ctx, cancel := context.WithCancel(e.ctx)
defer cancel()
e.handler.Create(ctx, c, e.queue)
}
// OnUpdate creates UpdateEvent and calls Update on EventHandler.
func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
u := event.UpdateEvent{}
...
for _, p := range e.predicates {
if !p.Update(u) {
return
}
}
...
e.handler.Update(ctx, u, e.queue)
}
// OnDelete creates DeleteEvent and calls Delete on EventHandler.
func (e *EventHandler) OnDelete(obj interface{}) {
d := event.DeleteEvent{}
...
for _, p := range e.predicates {
if !p.Delete(d) {
return
}
}
...
e.handler.Delete(ctx, d, e.queue)
}
handler.EventHandler 的 handler 函数如下所示,主要功能就是往 queue 中添加相应的 reconcile.Request 对象。这个 queue 对象,会在 Controller 的 Start() 函数中被创建。而 queue 中的 reconcile.Request 对象,会被 controller 消费。
// pkg/handler/enqueue.go
type EnqueueRequestForObject struct{}
// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
// Update implements EventHandler.
func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
switch {
case evt.ObjectNew != nil:
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
case evt.ObjectOld != nil:
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
}})
default:
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
}
}
// Delete implements EventHandler.
func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
综上所述,Watch() 函数的主要作用就是:创建 Informer,并往 Informer 中添加相应的 informer event 处理函数。而这些 event 处理函数的主要流程是获取 informer event 对应的 object 对象,然后调用相应的过滤函数进行过滤,之后调用 handler 创建 reconcile.Request 对象,并将该对象放入 controller 的 queue 中。
Start() 函数
Start() 函数的主要流程是:
- 创建 controller 的 queue,这个 queue 将会存放 event 处理函数创建的 reconcile.Request 对象。
- 根据 c.MaxConcurrentReconciles,启动相应数量的 goroutine,每个 goroutine 都会调用 c.processNextWorkItem 函数。该函数的主要作用就是从 queue 中获取 reconcile.Request 对象,然后调用用户编写的 controller 的 Reconcile 函数,比如 reconcileReplicaSet 的 Reconcile 函数。
// reconcileReplicaSet 实现了相应的 Reconcile 函数,也就是 Reconciler 接口
type reconcileReplicaSet struct {
client client.Client
}
func (r *reconcileReplicaSet) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
log := log.FromContext(ctx)
}
c, err := controller.New("foo-controller", mgr, controller.Options{
Reconciler: &reconcileReplicaSet{client: mgr.GetClient()},
})
// pkg/internal/controller/controller.go
func (c *Controller) Start(ctx context.Context) error {
if c.Started {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
}
c.Queue = c.MakeQueue()
wg := &sync.WaitGroup{}
err := func() error {
...
for _, watch := range c.startWatches {
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource)
if !ok {
continue
}
if err := func() error {
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
return err
}
return nil
}(); err != nil {
return err
}
}
c.startWatches = nil
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
defer wg.Done()
for c.processNextWorkItem(ctx) {
}
}()
}
c.Started = true
return nil
}()
...
return nil
}
processNextWorkItem 的处理流程如下所示:
- 从 queue 中获取相应的 obj 对象,然后调用 reconcileHandler 方法。
- 该方法会调用 controller 自己的 Reconcile 函数(不是用户编写的 Reconcile 函数)。同时,根据 Reconcile 函数的返回,做进一步的处理,比如重新加入队列、过一段时间之后加入队列、从队列中移掉。
- controller 自身的 Reconcile 函数会调用 Do 对应的 Reconcile 函数,这个 Reconcile 是用户编写的,包含用户自身的处理逻辑。
// pkg/internal/controller/controller.go
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
}
c.reconcileHandler(ctx, obj)
return true
}
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
req, ok := obj.(reconcile.Request)
if !ok {
c.Queue.Forget(obj)
return
}
result, err := c.Reconcile(ctx, req)
switch {
case err != nil:
c.Queue.AddRateLimited(req)
case result.RequeueAfter > 0:
c.Queue.Forget(obj)
c.Queue.AddAfter(req, result.RequeueAfter)
case result.Requeue:
c.Queue.AddRateLimited(req)
default:
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.Queue.Forget(obj)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
}
}
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
...
return c.Do.Reconcile(ctx, req)
}
综上所述,Start() 函数的主要作用是:
- 创建 controller 的 queue(当产生相应的 event 之后,informer 的 event 处理函数,会生成 reconcile.Request 对象并放入到 queue 中)。
- 启动一定数量的 goroutine,每个 goroutine 都会从 queue 中获取相应的 reconcile.Request 对象,并调用用户自己编写的 Reconcile 函数进行处理。
manager 的整体实现
controller-runtime 还提供 Manager,来管理 Controller。除了管理 Controller 之外,Manager 还可以:
-
管理 Admission Webhook。
-
提供统一的 client,访问 kubernetes 资源对象。使用该 client 读 kubernetes 资源对象的话,会先访问 manager 的缓存;如果是写 kubernetes 资源对象的话,则会直接访问 API Server。
-
提供统一的资源对象的 scheme。
-
提供统一的 cache,这个 cache 会在 controller 需要 informer 某些资源的时候,创建对应的 Informer 监听相应的资源对象,实现相应资源对象的缓存。
Manager 使用
上述提到,我们使用 controller-runtime 实现自己的 controller 有两种方式,但是上述两种方式都会用到 manager,但是更推荐使用后者的方式,也就是下面的使用方式(代码来自于 controller-runtime 仓库的 example 代码):
- 实例化 manager,参数 config
- 向 manager 添加 scheme
- 向 manager 添加 controller,该 controller 包含一个 reconciler 结构体,我们需要在 reconciler 结构体实现逻辑处理
- 向 manager 添加 webhook,同样需要实现逻辑处理
- 启动 manager.start()
package main
import (
...
)
var (
setupLog = ctrl.Log.WithName("setup")
)
type reconciler struct {
client.Client
scheme *runtime.Scheme
}
func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
...
return ctrl.Result{}, nil
}
func main() {
ctrl.SetLogger(zap.New())
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
// in a real controller, we'd create a new scheme for this
err = api.AddToScheme(mgr.GetScheme())
if err != nil {
setupLog.Error(err, "unable to add scheme")
os.Exit(1)
}
err = ctrl.NewControllerManagedBy(mgr).
For(&api.ChaosPod{}).
Owns(&corev1.Pod{}).
Complete(&reconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
})
if err != nil {
setupLog.Error(err, "unable to create controller")
os.Exit(1)
}
err = ctrl.NewWebhookManagedBy(mgr).
For(&api.ChaosPod{}).
Complete()
if err != nil {
setupLog.Error(err, "unable to create webhook")
os.Exit(1)
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
Manager 实例化
Manager 实例化最后返回的是一个 controllerManager 的实例。该结构体是 Manager 接口的一个实现。
- Manager 可以管理 Runnable 的生命周期(添加/启动),Controller 和 Admission Webhook 都实现了 Runable 接口,也就是 Start 方法。
- Manager 还支持领导人选举。Leader 选举的意思是如果这个 manager 启动了两个副本,那么只有一个 manager 中的 controller 才会处理相应的请求。简单点说,如果一个 manager 成为了 leader,那么该 leader 才会启动那些需要 leader 选举的 controller。
// Manager 实例化,最终会调用 manager.New 函数
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
NewManager = manager.New
// pkg/manager/manager.go
func New(config *rest.Config, options Options) (Manager, error) {
options = setOptionsDefaults(options)
cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
...
})
...
errChan := make(chan error)
runnables := newRunnables(options.BaseContext, errChan)
return &controllerManager{
stopProcedureEngaged: pointer.Int64(0),
cluster: cluster,
runnables: runnables,
errChan: errChan,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsExtraHandlers: metricsExtraHandlers,
controllerConfig: options.Controller,
logger: options.Logger,
elected: make(chan struct{}),
webhookServer: options.WebhookServer,
leaderElectionID: options.LeaderElectionID,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
}, nil
}
// pkg/manager/manager.go
type Manager interface {
cluster.Cluster
Add(Runnable) error
Elected() <-chan struct{}
Start(ctx context.Context) error
...
}
// pkg/manager/manager.go
type Runnable interface {
Start(context.Context) error
}
controller 注册到 Manger
// 将 controller 注册到 Manager 中,由相应
err = ctrl.NewControllerManagedBy(mgr).
For(&api.ChaosPod{}).
Owns(&corev1.Pod{}).
Complete(&reconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
})
builder.ControllerManagedBy
函数返回 controller 的 Builder 对象,通过 Builder 对象生成的 controller 将由所提供的管理器 Manager 启动。
// pkg/builder/controller.go
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
name string
}
func ControllerManagedBy(m manager.Manager) *Builder {
return &Builder{mgr: m}
}
Builder 对象的 For() 函数主要是用来定义我们要监听的资源对象类型的,比如 deployment。
// pkg/builder/controller.go
type ForInput struct {
object client.Object
predicates []predicate.Predicate
objectProjection objectProjection
err error
}
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
if blder.forInput.object != nil {
return blder
}
input := ForInput{object: object}
for _, opt := range opts {
opt.ApplyToFor(&input)
}
blder.forInput = input
return blder
}
Owns() 函数是用来定义要监听的资源对象的子资源,比如 deployment 资源对象的子资源是 Pod。
// pkg/builder/controller.go
func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
input := OwnsInput{object: object}
for _, opt := range opts {
opt.ApplyToOwns(&input)
}
blder.ownsInput = append(blder.ownsInput, input)
return blder
}
Complete() 函数通过调用 Build 函数来构建 Controller,其中比较重要的就是 doController 和 doWatch 两个函数。
// pkg/builder/controller.go
func (blder *Builder) Complete(r reconcile.Reconciler) error {
_, err := blder.Build(r)
return err
}
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
if r == nil {
return nil, fmt.Errorf("must provide a non-nil Reconciler")
}
if blder.mgr == nil {
return nil, fmt.Errorf("must provide a non-nil Manager")
}
if blder.forInput.err != nil {
return nil, blder.forInput.err
}
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
return blder.ctrl, nil
}
doController 会真正地创建 Controller 实例,并将创建的 Controller 实例添加到 Manager 中。
// pkg/builder/controller.go
func (blder *Builder) doController(r reconcile.Reconciler) error {
...
// Build the controller and return.
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
return err
}
var newController = controller.New
// pkg/controller/controller.go
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
c, err := NewUnmanaged(name, mgr, options)
if err != nil {
return nil, err
}
// Add the controller as a Manager components
return c, mgr.Add(c)
}
doWatch() 函数主要是调用 controller 的 Watch() 函数获取 informer,并且往 informer 中添加相应的 event 处理函数。这些 event 处理函数会根据 event,并结合过滤条件,生成 reconcile.Request 对象,放入 controller 的 queue 中。需要注意的是,我们可能会监听 For、Owns 添加的两类资源。通过代码可以看到,针对这两类资源,event 的处理函数是不同的。
// pkg/builder/controller.go
func (blder *Builder) doWatch() error {
// Reconcile type
if blder.forInput.object != nil {
obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
for _, own := range blder.ownsInput {
obj, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
opts := []handler.OwnerOption{}
if !own.matchEveryOwner {
opts = append(opts, handler.OnlyControllerOwner())
}
hdler := handler.EnqueueRequestForOwner(
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
blder.forInput.object,
opts...,
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
...
return nil
}
Manager 启动
最后是调用 Manager 的 Start 函数来启动 Manager,由于上面我们已经把 Controller 添加到了 Manager 中,所以这里启动其实是启动关联的 Controller。Manager 的 Start 函数实现如下所示:
// pkg/manager/internal.go
func (cm *controllerManager) Start(ctx context.Context) (err error) {
cm.started = true
...
// Add the cluster runnable.
if err := cm.add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
}
// First start any webhook servers, which includes conversion, validation, and defaulting
// webhooks that are registered.
if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}
// Start and wait for caches.
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}
// Start the non-leaderelection Runnables after the cache has synced.
if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}
// Start the leader election and all required runnables.
{
ctx, cancel := context.WithCancel(context.Background())
cm.leaderElectionCancel = cancel
go func() {
if cm.resourceLock != nil {
if err := cm.startLeaderElection(ctx); err != nil {
cm.errChan <- err
}
} else {
// Treat not having leader election enabled the same as being elected.
if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err
}
close(cm.elected)
}
}()
}
...
}
Start() 函数会分别启动不同类别的 Runnable 实例,不同类别的 Runnable 实例是按组划分的,因此只需要调用 Runnable group 实例的 Start() 函数即可,比如cm.runnables.Others.Start(cm.internalCtx)
。group 的 Start() 函数会遍历该 group 内的所有 Runnable 实例,然后调用该实例的 Start() 函数,比如针对 controller 来说,就会调用 controller 的 Start() 函数。
// pkg/manager/runnable_group.go
func (r *runnableGroup) Start(ctx context.Context) error {
var retErr error
r.startOnce.Do(func() {
// Start the internal reconciler.
go r.reconcile()
...
})
return retErr
}
func (r *runnableGroup) reconcile() {
for runnable := range r.ch {
// Start the runnable.
go func(rn *readyRunnable) {
...
// Start the runnable.
if err := rn.Start(r.ctx); err != nil {
r.errChan <- err
}
}(runnable)
}
}
对于已经启动的 Manager 来说,如果再添加一个 controller 的话,那么这个 controller 也是会被调用 Start() 函数,从而启动的。这是因为,runnable group 会启动一个 goroutine,这个 goroutine 会不断从 channel 中读取内容,如果没有内容的话,这个 for 循环就会阻塞,直到有数据。再来看下 runnable group 的 Add 函数,如果 group 未启动的话,则添加到 startQueue 中,如果已经启动了的话,则直接往 ch 中写入一个 readyRunnable 结构体,而此时启动的 goroutine 会读取到这个 readyRunnable 结构体,然后启动里面包含的 controller。
func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
if ready == nil {
ready = func(_ context.Context) bool { return true }
}
readyRunnable := &readyRunnable{
Runnable: rn,
Check: ready,
}
// Handle start.
// If the overall runnable group isn't started yet
// we want to buffer the runnables and let Start()
// queue them up again later.
{
r.start.Lock()
// Check if we're already started.
if !r.started {
// Store the runnable in the internal if not.
r.startQueue = append(r.startQueue, readyRunnable)
r.start.Unlock()
return nil
}
r.start.Unlock()
}
// Enqueue the runnable.
r.ch <- readyRunnable
return nil
}
相关链接
Kubebuilder
Kubebuilder 提供了一系列的工具,帮助开发者快速生成自定义资源定义(CRDs)、创建新的 Controller(使用了上述的 controller-runtime 库)。它主要提供了两个能力:
- 通过命令即可生成对应的 CRD,当然 CRD 中的具体内容(比如有哪些字段)还需要由开发者自己定义。
- 生成 CRD 的同时,也会生成相应的 controller、controller-manager 脚手架代码,当然 controller 中的处理逻辑需要开发自己编写。
常用方式
# 先 init
kubebuilder init --domain example.com
# 使用命令生成相应的 API 对象,其中 API 的 group 为 elasticweb.example.com,也就是会结合 domain
kubebuilder create api --group elasticweb --version v1 --kind El
在 controller 的 Reconcile 函数的注释中添加 RBAC 声明
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
func (r *ElasticWebReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
...
}
自定义输出列:在 API 对象的结构体处添加自定义输出列,通过 priority 参数实现在 -o wide 的时候才显示
// +kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image",description="The Docker Image of MyAPP"
// +kubebuilder:printcolumn:name="Port",type="string",priority=1,JSONPath=".spec.image",description="The Docker Image of Etcd"
type ElasticWeb struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec ElasticWebSpec `json:"spec,omitempty"`
Status ElasticWebStatus `json:"status,omitempty"`
}