前言
在网上搜 Informer 的时候,会经常能刷到下面这张图。这张图分为上下两部分,上半部分是 client-go 中 Informer 相关的实现,下半部分是常见 Controller 相关的实现方式。接下去,我们会用两篇的篇幅来分别进行介绍。
基础概念
API Server 在 Kubernetes 中是 Etcd 的统一入口,任何对数据的读写操作都会经过 API Server。
为了缓解 API Server 的压力,提供了 List & Watch 机制,让客户端实时获取 API 对象。List & Watch 的基本思想是:
-
先通过 List API 获取到 Etcd 中某类对象的信息,比如全部 Pod 的对象。然后,将这些对象全都缓存在客户端的内存中。List API 是基于 HTTP 短链接。
-
之后,再通过 Watch API 监听 API Server,获取相应对象的改变事件,比如 create、update 或 delete 事件。之后客户端根据事件对内存中的缓存进行更新,从而保持本地缓存中的内容跟 Etcd 中的情况是一致的。Watch 避免了一直 List 带来的通信开销和对 API Server 的压力。Watch API 是基于 HTTP 长链接。
如下所示,通过
GET /api/v1/watch/pods
的方式可以持续监听 Pod 相关事件,如果有事件产生都会返回一个事件。$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/pods?watch=yes HTTP/1.1 200 OK Content-Type: application/json Transfer-Encoding: chunked Date: Thu, 02 Jan 2019 20:22:59 GMT Transfer-Encoding: chunked {"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}} {"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}} {"type":"MODIFIED", "object":{"kind":"Pod","apiVersion":"v1",...}}
Informer 是 client-go 在 List & Watch 之上构建的更高级的封装,它提供了缓存及事件分发机制。Informer 广泛用于 Kubernetes 中,比如 Scheduler、Controller Manager、Kubelet 等组件都使用了 Informer。
整体流程
本地在使用 Informer 的时候,整个流程如图所示,
- Informer 初始化的时候,Reflector 会先通过 List API 获得所有的 Pod,然后将这些 Pod 都存放到 Local Store(Thread safe store)中。
- Informer 初始化完成之后,Reflector 通过 http 协议中的 chunk 机制来监听 kubernetes 中资源的变更事件,事件主要分为三个动作 ADD、UPDATE、DELETE。
- 如果此时集群中新增了一个 Pod 资源,也就是 ADD 变更事件。此时,Reflector 会监听到这个事件,并把这个事件存储到 DeltaFIFO 中。
- 之后 Infromer 中的后台 goroutine 会将事件 Pop 出来,通过 Indexer(索引器,该索引器内就包含了 Local Store 对象)将资源分类存储。默认的 Indexer 是以资源的 namespace 和 name 作为资源对应的索引名。
- 与此同时,这个事件也会回调注册的事件处理函数(Res Event Handlers)进行处理。事件处理函数分为 OnAdd、OnUpdate 和 OnDelete 三个方法。
如果此时调用 Lister 的 List/Get 方法获取 Pod, 那么 Lister 是从 Local Store 中获取数据的。
List & Watch
client-go 中提供了 ListWatch 客户端,实现了 List & Watch 的功能。下面通过代码来演示这一步,

package main
import (
"fmt"
"k8s-clientset/deep_client_go/reflector/helper"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
"log"
)
// create pods list & watch
func main() {
// helper 只是一个类似上文演示的 config, 只要用于初始化各种客户端
cliset := helper.InitK8SClient()
lwc := cache.NewListWatchFromClient(cliset.CoreV1().RESTClient(), "pods", "kube-system", fields.Everything())
watcher, err := lwc.Watch(metav1.ListOptions{})
if err != nil {
log.Fatalln(err)
}
for {
select {
case v, ok := <-watcher.ResultChan():
if ok {
fmt.Println(v.Type, ":", v.Object.(*v1.Pod).Name, "-", v.Object.(*v1.Pod).Status.Phase)
}
}
}
}
// 输出结果
// ADDED : kube-apiserver-k8s-01 - Running
// ADDED : kube-scheduler-k8s-01 - Running
// ADDED : coredns-65c54cc984-26zx9 - Running
// ADDED : metrics-server-7fd564dc66-sm29c - Running
// ADDED : kube-proxy-6jl96 - Running
// ADDED : coredns-65c54cc984-bgmpm - Running
// ADDED : etcd-k8s-01 - Running
// ADDED : kube-controller-manager-k8s-01 - Running
当你做 Pod 资源变更时便可以接收到变更事件,
// 执行 kubectl apply -f deploy.yaml
//ADDED : mygott-7565765f4d-2t4z8 - Pending
//MODIFIED : mygott-7565765f4d-2t4z8 - Pending
//MODIFIED : mygott-7565765f4d-2t4z8 - Pending
//MODIFIED : mygott-7565765f4d-2t4z8 - Running
// 执行 kubectl delete deploy mygott
//MODIFIED : mygott-7565765f4d-2t4z8 - Running
//MODIFIED : mygott-7565765f4d-2t4z8 - Running
//MODIFIED : mygott-7565765f4d-2t4z8 - Running
//DELETED : mygott-7565765f4d-2t4z8 - Running
DeltaFIFO 队列
client-go 也提供了 DelatFIFO 队列。下面通过代码来体验 DeltaFIFO 队列的使用,将事件放入 DeltaFIFO 队列和从队列中获取内容,类似于这一步,

package main
import (
"fmt"
"k8s.io/client-go/tools/cache"
)
type Pod struct {
Name string
Value int
}
func NewPod(name string, v int) Pod {
return Pod{Name: name, Value: v}
}
// 需要提供一个资源的唯一标识的字符串给到 DeltaFifo, 这样它就能追踪某个资源的变化
func PodKeyFunc(obj interface{}) (string, error) {
return obj.(Pod).Name, nil
}
func main() {
df := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{KeyFunction: PodKeyFunc})
// ADD3个object 进入 fifo
pod1 := NewPod("pod-1", 1)
pod2 := NewPod("pod-2", 2)
pod3 := NewPod("pod-3", 3)
df.Add(pod1)
df.Add(pod2)
df.Add(pod3)
// Update pod-1
pod1.Value = 11
df.Update(pod1)
df.Delete(pod1)
// 当前df 的列表
fmt.Println(df.List())
// 循环抛出事件
for {
df.Pop(func(i interface{}) error {
for _, delta := range i.(cache.Deltas) {
switch delta.Type {
case cache.Added:
fmt.Printf("Add Event: %v \n", delta.Object)
break
case cache.Updated:
fmt.Printf("Update Event: %v \n", delta.Object)
break
case cache.Deleted:
fmt.Printf("Delete Event: %v \n", delta.Object)
break
case cache.Sync:
fmt.Printf("Sync Event: %v \n", delta.Object)
break
case cache.Replaced:
fmt.Printf("Replaced Event: %v \n", delta.Object)
break
}
}
return nil
})
}
}
// 输出结果, 可以看到先入列的资源事件会被先抛出
// 这是由于底层是是用 map 来记录资源的唯一标识起到快速索引和去重复的作用;
//[{pod-1 11} {pod-2 2} {pod-3 3}]
//Add Event: {pod-1 1}
//Update Event: {pod-1 11}
//Delete Event: {pod-1 11}
//Add Event: {pod-2 2}
//Add Event: {pod-3 3}
在实际过程中,往 DeltaFIFO 队列中写入的是 Event,如下
type Event struct {
// 事件类型
Type EventType
// 资源对象
Object runtime.Object
}
// 事件类型如下:
// 资源添加事件
Added EventType = "ADDED"
// 资源修改事件
Modified EventType = "MODIFIED"
// 资源删除事件
Deleted EventType = "DELETED"
// 标记资源版本号事件, 这个就是用于可重新watch的版本号
Bookmark EventType = "BOOKMARK"
// 错误事件
Error EventType = "ERROR"
Reflector
client-go 中的 Reflector 整合了 ListWatch 和 DeltaFifo 两个。它的作用相当于上述两块代码的结合:它通过 ListWatch 客户端 List 并 Watch API Server,然后将 Watch 到的事件写到 DeltaFIFO 队列中,类似于下图。

使用 Reflector 的代码如下所示,由于 Reflector 已经将事件写入 Delta FIFO 队列中了,因此可以通过从该队列中获取事件,
package main
import (
"fmt"
"k8s-clientset/deep_client_go/reflector/helper"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
"time"
)
// simulate k8s simple reflector creation process
func main() {
cliset := helper.InitK8SClient()
// 使用 store 进行存储,这样本地才有一份数据;
// 如果本地没有存储到被删除的资源, 则不需要 Pop 该资源的 Delete 事件;
// 所以我们为了准确接收到delete时接收到 Delete 事件, 所以预先创建一下 store
// cache.MetaNamespaceKeyFunc 是用于返回资源的唯一标识, {namespace}/{name} 或 {name}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
// create list & watch Client
lwc := cache.NewListWatchFromClient(cliset.CoreV1().RESTClient(),
helper.Resource,
helper.Namespace,
fields.Everything(),
)
// create deltafifo
df := cache.NewDeltaFIFOWithOptions(
cache.DeltaFIFOOptions{
KeyFunction: cache.MetaNamespaceKeyFunc,
KnownObjects: store,
})
// crete reflector
rf := cache.NewReflector(lwc, &v1.Pod{}, df, time.Second*0)
rsCH := make(chan struct{})
go func() {
rf.Run(rsCH)
}()
// fetch delta event
for {
df.Pop(func(i interface{}) error {
// deltas
for _, d := range i.(cache.Deltas) {
fmt.Println(d.Type, ":", d.Object.(*v1.Pod).Name,
"-", d.Object.(*v1.Pod).Status.Phase)
switch d.Type {
case cache.Sync, cache.Added:
// 向store中添加对象
store.Add(d.Object)
case cache.Updated:
store.Update(d.Object)
case cache.Deleted:
store.Delete(d.Object)
}
}
return nil
})
}
}
// 输出结果
//Sync : pod-1 - Running
//Sync : web-sts-1 - Running
//Sync : web-sts-0 - Running
//Sync : ngx-8669b5c9d-xwljg - Running
// 执行 kubectl apply -f deploy.yaml
//Added : mygott-7565765f4d-x6znf - Pending
//Updated : mygott-7565765f4d-x6znf - Pending
//Updated : mygott-7565765f4d-x6znf - Pending
//Updated : mygott-7565765f4d-x6znf - Running
// 执行 kubectl delete deploy mygott
//Updated : mygott-7565765f4d-x6znf - Running
//Updated : mygott-7565765f4d-x6znf - Running
//Updated : mygott-7565765f4d-x6znf - Running
//Deleted : mygott-7565765f4d-wcml6 - Running
Store/Indexer
在 Reflector 中,已经用过了 Store 相关的代码:store := cache.NewStore(cache.MetaNamespaceKeyFunc)
。下面则从 NewStore
函数开始,讲解 Store 和 Indexer。
Store
Store 是一个接口类型,它提供了以下这些核心方法,用于增删改查存储其中的对象。cache 结构体实现了该接口,
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
cache 使用 ThreadSafeStore 作为其核心存储对象,而 ThreadSafeStore 也是一接口,threadSafeMap 是其具体实现。threadSafeMap 的核心数据结构是一个 map,只是配合了读写锁保证了并发安全。
// NewThreadSafeStore creates a new instance of ThreadSafeStore.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
ThreadSafeStore 也提供了一系列方法,Store 相当于是在这些方法之上的封装:
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error
}
Indexer
在 Informer 中,除了提供 Store 的方式之外,还提供了 Indexer。Indexer 的底层其实跟 Store 一样都是使用一个 ThreadSafeStore 的对象来存储资源,它和 Store 的主要区别是 Indexer 是可以做索引的,而 Store 是无法做索引的。在实际上,NewIndexer 给编程人员提供了传入 Indexer 相关配置的参数,并且也在 Store 基础之上提供了针对 Indexer 的相关方法。
可以看到针对 threadSafeMap 的 updateIndices 方法来说,它会遍历 indexers。由于 NewStore 的时候,传入的为空,所以实际上这个过程不会执行。
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}
// NewThreadSafeStore creates a new instance of ThreadSafeStore.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
var oldIndexValues, indexValues []string
var err error
for name, indexFunc := range c.indexers {
...
}
}
下面来解释下跟索引相关的几个对象:
-
Indexers 存储了索引器的一个映射关系,map key 为 indexer 的名字,map value 为 IndexFunc。IndexFunc 表示 indexer 使用的索引函数,该函数会返回索引值,默认提供以 namespace 为索引值的函数。
-
Indices 存储了 indexer name 和使用该 indexer 时存储的索引值和相应资源的映射关系(叫 Index)。
// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
// Indices maps a name to an Index
type Indices map[string]Index
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String
下述的代码很好地演示了这几个之间的串联使用:
func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.String, error) {
indexFunc := i.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
indexedValues, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := i.indices[indexName]
var storeKeySet sets.String
if len(indexedValues) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
storeKeySet = index[indexedValues[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
storeKeySet = sets.String{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}
return storeKeySet, nil
}
我们也可以提供自定义的 IndexFunc,如下所示,
package cache
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
)
// LabelsIndexFunc 用作给出可检索所有的索引值
func LabelsIndexFunc(obj interface{}) ([]string, error) {
metaD, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{metaD.GetLabels()["app"]}, nil
}
func TestIndexer(t *testing.T) {
// 建立一个名为 app 的 Indexer, 并使用我们自己编写的 索引方法
idxs := Indexers{"app": LabelsIndexFunc}
// 伪造2个pod资源
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "ns1",
Labels: map[string]string{
"app": "l1",
}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "ns2",
Labels: map[string]string{
"app": "l2",
}}}
// 初始化 Indexer
myIdx := NewIndexer(MetaNamespaceKeyFunc, idxs)
// 添加pod
myIdx.Add(pod1)
myIdx.Add(pod2)
// 打印通过索引检索的资源
fmt.Println(myIdx.IndexKeys("app", "l1"))
}
// Output
// 结果只返回 app=l1 的 pod
// [ns1/pod1] <nil>
EventHandler
客户端在收到 Add、Update、Delete 等事件之后,除了会根据事件将本地存储和远程存储保持一致外,还会根据事件调用注册的回调函数,也就是下图中的这个部分,
编程人员可以使用类似下面的方法,注册事件回调函数,
package main
import (
"fmt"
"k8s-clientset/config"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
)
type CmdHandler struct {
}
// 当接收到添加事件便会执行该回调, 后面的方法以此类推
func (this *CmdHandler) OnAdd(obj interface{}) {
fmt.Println("Add: ", obj.(*v1.ConfigMap).Name)
}
func (this *CmdHandler) OnUpdate(obj interface{}, newObj interface{}) {
fmt.Println("Update: ", newObj.(*v1.ConfigMap).Name)
}
func (this *CmdHandler) OnDelete(obj interface{}) {
fmt.Println("Delete: ", obj.(*v1.ConfigMap).Name)
}
func main() {
cliset := config.NewK8sConfig().InitClient()
// 通过 clientset 返回一个 listwatcher, 仅支持 default/configmaps 资源
listWatcher := cache.NewListWatchFromClient(
cliset.CoreV1().RESTClient(),
"configmaps",
"default",
fields.Everything(),
)
// 初始化一个informer, 传入了监听器, 资源名, 间隔同步时间
// 最后一个是我们定义的 Handler 用于接收我们监听的资源变更事件;
_, c := cache.NewInformer(listWatcher, &v1.ConfigMap{}, 0, &CmdHandler{})
// 启动循环监听
c.Run(wait.NeverStop)
}
那么这一步骤是怎么实现的呢?见下文关于 Informer 的讲解。
Informer 的使用
上述过程我们对 Informer 中涉及的步骤进行了逐步拆解,在最后的 EventHandler 中,我们也使用了 Informer。在实际编程的时候,我们往往直接使用的就是 Informer。Informer 相当于 Reflector、DeltaFIFO、Indexer/Store 的进一步封装。在 Informer 中,它会创建 Reflector、DeltaFIFO 队列和 Indexer/Store,自动将 watch 到的事件存到 DeltaFIFO 队列中,并且从 DeltaFIFO 队列中不断取出事件,然后调用注册的事件回调函数,同时根据事件将对象存储到 Indexer/Store 中。
client-go 提供了多种类型的 Informer,
- Informer/IndexerInformer,该类型的 Informer 是一个独立的 Informer。同一程序中要想有监听同一个资源类型的多个 Informer,对应的就需要创建多个 Informer 实例监听的是同一资源,而每个 Informer 实例都独立地与 API Server 建立一个连接,维护一个 DeltaFIFO 队列,一个 LocalStore(比如针对同一资源类型会有多个事件回调处理函数,则需要创建多个 Informer 实例)。其中 Informer 使用缓存的是 Store,IndexerInformer 使用缓存的是 Indexer。
- SharedInformer/SharedIndexInformer,与 Informer/IndexerInformer 相比,该类型的 Informer 是一个共享的 Informer,这种类型的 Informer 使得同一类资源的 Informer 可以共享一个 Informer 实例(比如针对),也就共享一个 Refactor,从而降低对 APIServer 的压力(比如针对同一资源类型会有多个事件回调处理函数,则只需要创建一个 Informer 实例)。这种类型的 Informer 通常与 SharedInformerFactory 一起使用。SharedInformerFactory 是一个工厂模式的实现,可以管理和协调多个 SharedIndexInformer 实例的生命周期(SharedInformerFactory 创建的是 SharedIndexInformer 类型的实例)。其中 SharedInformer 使用的缓存是 Store,SharedIndexInformer 使用的缓存是 Indexer。
Informer/IndexerInformer
使用示例
func main () {
cliset := config.NewK8sConfig().InitClient()
// 获取configmap
listWatcher := cache.NewListWatchFromClient(
cliset.CoreV1().RESTClient(),
"configmaps",
"default",
fields.Everything(),
)
// CmdHandler 和上述的 EventHandler (参考 3.3.5)
store, controller := cache.NewInformer(listWatcher, &v1.ConfigMap{}, 0, &CmdHandler{})
// 开启一个goroutine 避免主线程堵塞
go controller.Run(wait.NeverStop)
// 等待3秒 同步缓存
time.Sleep(3 * time.Second)
// 从缓存中获取监听到的 configmap 资源
fmt.Println(store.List())
}
// Output:
// Add: kube-root-ca.crt
// Add: istio-ca-root-cert
// [... configmap 对象]
import (
"fmt"
"k8s-clientset/config"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"time"
)
...
// LabelsIndexFunc 用作给出可检索的索引值
func LabelsIndexFunc(obj interface{}) ([]string, error) {
metaD, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{metaD.GetLabels()["app"]}, nil
}
func main () {
cliset := config.NewK8sConfig().InitClient()
// 获取configmap
listWatcher := cache.NewListWatchFromClient(
cliset.CoreV1().RESTClient(),
"configmaps",
"default",
fields.Everything(),
)
// 创建索引其并指定名字
myIndexer := cache.Indexers{"app": LabelsIndexFunc}
// CmdHandler 和上述的 EventHandler (参考 3.3.5)
i, c := cache.NewIndexerInformer(listWatcher, &v1.Pod{}, 0, &CmdHandler{}, myIndexer)
// 开启一个goroutine 避免主线程堵塞
go controller.Run(wait.NeverStop)
// 等待3秒 同步缓存
time.Sleep(3 * time.Second)
// 通过 IndexStore 指定索引器获取我们需要的索引值
// busy-box 索引值是由于 我在某个 pod 上打了一个 label 为 app: busy-box
objList, err := i.ByIndex("app", "busy-box")
if err != nil {
panic(err)
}
fmt.Println(objList[0].(*v1.Pod).Name)
}
// Output:
// Add: cloud-enterprise-7f84df95bc-7vwxb
// Add: busy-box-6698d6dff6-jmwfs
// busy-box-6698d6dff6-jmwfs
//
源码
Informer/IndexerInformer 类型的 Informer,在实现上分为两部分:
- 一部分是 Controller 对象封装了 Refactor(将接收到的事件存到 DeltaFIFO 队列中),并且会不断从 DeltaFIFO 队列中获取事件,调用相应的 Process 函数。这个过程的核心是 processLoop() 函数(需要调用 Run () 方法)。
- 另一部分是 Informer 实现的 Process 函数。Informer 的 Process 函数核心是 processDeltas() 函数,该函数会调用 Store/Indexer 的 Add、Update、Delete 等函数保持本地缓存与 API Server 一样,同时也会调用传入的事件回调函数。
func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, Controller) {
// This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}
func NewIndexerInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
indexers Indexers,
) (Indexer, Controller) {
// This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
transformer TransformFunc,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
}
return errors.New("object given as Process argument is not Deltas")
},
}
return New(cfg)
}
func processDeltas(
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
// from oldest to newest
for _, d := range deltas {
...
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}
// New makes a new Controller from the given Config.
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
func (c *controller) Run(stopCh <-chan struct{}) {
...
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
...
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
...
}
}
SharedInformer/SharedIndexInformer
使用示例
使用 SharedInformer 类型的话,可以针对同一资源添加多个回调函数,而在 Informer/IndexInformer 中则需要创建两个 Informer 实例。
func main() {
cliset := config.NewK8sConfig().InitClient()
listWarcher := cache.NewListWatchFromClient(
cliset.CoreV1().RESTClient(),
"configmaps",
"default",
fields.Everything(),
)
// 全量同步时间
shareInformer := cache.NewSharedInformer(listWarcher, &v1.ConfigMap{}, 0)
// 可以增加多个Event handler
shareInformer.AddEventHandler(&handlers.CmdHandler{})
shareInformer.AddEventHandler(&handlers.CmdHandler2{})
shareInformer.Run(wait.NeverStop)
}
该类型的 Informer 在实际使用时往往使用的是 SharedInformerFactory,使用步骤如下所示,
-
首先创建一个 SharedInformerFactory。有 NewSharedInformerFactory/NewFilteredSharedInformerFactory/NewSharedInformerFactoryWithOptions 等方法,前两个方法中最终调用的其实都是 NewSharedInformerFactoryWithOptions。
-
之后调用 SharedInformerFactory 提供的 ForResource 方法,这个方法最终会调用对应资源的 Informer 方法,该方法会将相应的 Informer 实例注册到 factory 中。
-
最后调用 SharedInformerFactory 的 Start 方法,该方法会启动注册到 factory 的所有 Informer 实例。
package main
import (
"fmt"
"k8s-clientset/config"
"k8s-clientset/dc/handlers"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
)
func main() {
cliset := config.NewK8sConfig().InitClient()
informerFactory := informers.NewSharedInformerFactoryWithOptions(
cliset,
0,
// 指定的namespace 空间,如果需要所有空间,则不指定该参数
informers.WithNamespace("default"),
)
// 添加 ConfigMap 资源
cmGVR := schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "configmaps",
}
cmInformer, _ := informerFactory.ForResource(cmGVR)
// 增加对 ConfigMap 事件的处理
cmInformer.Informer().AddEventHandler(&handlers.CmdHandler{})
// 添加 Pod 资源
podGVR := schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}
_, _ = informerFactory.ForResource(podGVR)
// 启动 informerFactory
informerFactory.Start(wait.NeverStop)
// 等待所有资源完成本地同步
informerFactory.WaitForCacheSync(wait.NeverStop)
// 打印资源信息
listConfigMap, _ := informerFactory.Core().V1().ConfigMaps().Lister().List(labels.Everything())
fmt.Println("Configmap:")
for _, obj := range listConfigMap {
fmt.Printf("%s/%s \n", obj.Namespace, obj.Name)
}
fmt.Println("Pod:")
listPod, _ := informerFactory.Core().V1().Pods().Lister().List(labels.Everything())
for _, obj := range listPod {
fmt.Printf("%s/%s \n", obj.Namespace, obj.Name)
}
select {}
}
// Ouput:
// Configmap:
// default/istio-ca-root-cert
// default/kube-root-ca.crt
// default/my-config
// Pod:
// default/cloud-enterprise-7f84df95bc-csdqp
// default/busy-box-6698d6dff6-42trb
如果想监听所有可操作的内部资源, 可以使用
DiscoveryClient
先获取当前集群的资源版本再调用InformerFactory
进行资源缓存。
源码
SharedInformer/SharedIndexInformer 是两种类型的接口,实现这两个接口的结构体是 sharedIndexInformer。
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
type SharedInformer interface {
AddEventHandler(handler ResourceEventHandler)
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
GetStore() Store
GetController() Controller
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
SetWatchErrorHandler(handler WatchErrorHandler) error
SetTransform(handler TransformFunc) error
}
type SharedIndexInformer interface {
SharedInformer
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector MutationDetector
listerWatcher ListerWatcher
objectType runtime.Object
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
watchErrorHandler WatchErrorHandler
transform TransformFunc
}
sharedIndexInformer 的实现主要分为三部分,
- 使用了 Controller,Controller 上文已提过,其最主要的作用是创建 Refactor,然后不断循环调用 Process 函数。
- 实现了 Process 函数,也就是 HandleDelta 函数,而这个函数的核心还是 processDeltas 函数。
- 自定义了事件回调函数,这个事件回调函数在 processDeltas 函数中被调用。而自定义的事件回调函数会调用编程人员注册的多个事件回调函数。
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
...
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
...
s.controller.Run(stopCh)
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
}
return errors.New("object given as Process argument is not Deltas")
}
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj}, false)
}
关于 SharedInformerFactory 则按照示例中提到的流程,讲解相应的源码:
-
首先来看创建 SharedInformerFactory 的三种方法,NewSharedInformerFactory 和 NewFilteredSharedInformerFactory 最终调用的都是 NewSharedInformerFactoryWithOptions 方法。
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync) } func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions)) } func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // Apply all options for _, opt := range options { factory = opt(factory) } return factory }
-
ForResource 方法则根据传入的 GVR 信息,返回相应的 genericInformer 对象,而在此期间会调用相应资源的 Informer 方法。
func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { switch resource { case v1.SchemeGroupVersion.WithResource("mutatingwebhookconfigurations"): return &genericInformer{resource: resource.GroupResource(), informer: f.Admissionregistration().V1().MutatingWebhookConfigurations().Informer()}, nil case v1.SchemeGroupVersion.WithResource("validatingwebhookconfigurations"): return &genericInformer{resource: resource.GroupResource(), informer: f.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer()}, nil ... case corev1.SchemeGroupVersion.WithResource("pods"): return &genericInformer{resource: resource.GroupResource(), informer: f.Core().V1().Pods().Informer()}, nil } }
client-go 针对每种内置资源都实现了 Informer 机制,如下所示是针对 Pod 实现的 Informer 机制,
type PodInformer interface { Informer() cache.SharedIndexInformer Lister() v1.PodLister } type podInformer struct { factory internalinterfaces.SharedInformerFactory tweakListOptions internalinterfaces.TweakListOptionsFunc namespace string } func NewPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { return NewFilteredPodInformer(client, namespace, resyncPeriod, indexers, nil) } func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).Watch(context.TODO(), options) }, }, &corev1.Pod{}, resyncPeriod, indexers, ) } func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } func (f *podInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) } func (f *podInformer) Lister() v1.PodLister { return v1.NewPodLister(f.Informer().GetIndexer()) }
因此上述调用相应资源的 Informer() 方法,其实是调用 SharedInformerFactory 的 InformerFor() 方法,而该方法调用 newFunc() 函数生成一个 SharedIndexInformer 类型的 Informer 实例,并将其添加到 SharedInformerFactory 的 informers 结构体中。
如果是同类型资源的话,则会返回已注册在 factory 中的 Informer 实例。
SharedInformerFactory 中生成的 Informer 都是 SharedIndexInformer 类型的实例。
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informerType := reflect.TypeOf(obj) informer, exists := f.informers[informerType] if exists { return informer } resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer }
-
再来看下 SharedInformerFactory 的 Start() 方法。上述通过 ForResource() 方法已经往 factory 注册了 Informer 实例,用 informers 结构进行保存。而在 Start() 方法中就是遍历 informers,调用 Informer 实例的 Run() 方法,将注册到 factory 中的 Informer 实例进行启动。
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } } }
相关链接
- (推荐)client-go 的正确打开方式:https://juejin.cn/post/7203690731276517432#heading-12
- Client-go 源码分析之 SharedInformer:https://mp.weixin.qq.com/s/13enj17ifaD-mSrjVzAQKw