整体概述
API Server 的 Watch 机制主要依赖 etcd 的 Watch 机制,整体如下:
- API Server 针对每种资源都会在本地维护一个 cacher,通过 etcd 提供的 watch 机制实时更新该 cacher 的内容。客户端针对资源的访问请求,都会优先访问该 cacher。
- 每个 watch 请求都会在 API Server 中创建一个 cacheWatcher。API Server 在通过 etcd 的 watch 机制收到变更事件之后,会将这个事件发送给 cacher 的 incoming 通道。
- cacher.dispatchEvents 从 incoming 通道读取事件,将事件再分发给 cacheWatcher 的 input 通道。
- cacheWatcher 再通过 processInterval 将 input 通道中的事件,通过 result 通道发送给客户端。
源码
API Server 中的 list&watch
API Server 基于 etcd 的 watch 机制实现了 list&watch,将所有对象的最新状态和最近的事件都存放在 Cacher 中。客户端组件对资源的访问都会先经过 Cacher。Cacher 数据结构如下(仅保留了几个关键的子结构):
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
type Cacher struct {
// incoming 事件管道,通过该管道将事件发送给 watcher
incoming chan watchCacheEvent
// storage 的底层实现
storage storage.Interface
// 对象类型
objectType reflect.Type
// watchCache 滑动窗口,维护了当前 kind 的所有的资源,和一个基于滑动窗口的最近的事件数组
watchCache *watchCache
// reflector list&&watch etcd,并将事件和资源存到 watchCache 中
reflector *cache.Reflector
// watchersBuffer 代表着所有 client-go 客户端跟 apiserver 的连接
watchersBuffer []*cacheWatcher
....
}
创建 Cacher 的代码如下所示,在创建 cacher 的时候,
- 也创建了 watchCache,将用于保存事件和所有资源。
- 同时创建了 reflector,将用于执行对 etcd 的 list-watch,然后更新到 watchCache。
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from its internal cache and updating its cache in the background based on the given configuration.
func NewCacherFromConfig(config Config) (*Cacher, error) {
...
objType := reflect.TypeOf(obj)
cacher := &Cacher{
...
incoming: make(chan watchCacheEvent, 100),
...
}
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
cacher.watchCache = watchCache
cacher.reflector = reflector
go cacher.dispatchEvents()
go func() {
defer cacher.stopWg.Done()
defer cacher.terminateAllWatchers()
wait.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh)
}
}, time.Second, stopCh,
)
}()
return cacher, nil
}
同时启动了两个 goroutine,
-
一个 goroutine 主要运行 cacher.startCaching() 函数。
这个函数的主要作用就是通过 list 方法获取 etcd 里面的资源,然后再通过 watch 方法 watch etcd 发送的事件,根据事件的类型,调用 watchCache 的 add、update 或 delete 方法。
在 add 方法里面最终会调用 processEvent() 方法将收到的事件发送给 cacher 的 incoming。
func (c *Cacher) processEvent(event *watchCacheEvent) { ... c.incoming <- *event }
-
一个 goroutine 负责运行 cacher.dispatchEvents() 函数。
这个函数的主要作用就是从 cacher 的 incoming 中取出事件,然后遍历 watchersBuffer,将事件发送给每个 cacheWatcher 中的 input。
func (c *Cacher) dispatchEvents() { lastProcessedResourceVersion := uint64(0) for { select { case event, ok := <-c.incoming: if !ok { return } has changed. if event.Type != watch.Bookmark { c.dispatchEvent(&event) } } } ... } func (c *Cacher) dispatchEvent(event *watchCacheEvent) { if event.Type == watch.Bookmark { for _, watcher := range c.watchersBuffer { watcher.nonblockingAdd(event) } } } func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { select { case c.input <- event: c.markBookmarkAfterRvAsReceived(event) return true default: return false } }
Watch 请求->创建 cacheWatcher
API Server 并没有单独提供一个 watch 接口,而是在 list 接口的基础之上增加了一个 watch 参数。如果客户端在向 API Server 发起的 list 请求中带上了 watch=true 的参数,则表示发起 watch 请求。比如,
https://{host:6443}/apis/apps/v1/namespaces/default/deployments?watch=true
API Server 的 handler 在解析到 watch=true 之后,则会按 watch 请求的方式进行处理,如下所示,
// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
if opts.Watch || forceWatch {
if rw == nil {
...
watcher, err := rw.Watch(ctx, &opts)
...
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}
...
}
}
}
对于 rw.Watch() 方法来说,最终会调用 Cacher 的 Watch 方法。
- 该方法会首先创建一个 cacheWatcher。
- 创建的 cacheWatcher 会以 goroutine 方式运行其 processInterval() 方法。该方法会不断的把 cacheWatcher.input channel 中的事件发送给 cacheWatcher.result channel。
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
...
watcher := newCacheWatcher(
chanSize,
filterWithAttrsFunction(key, pred),
emptyFunc,
c.versioner,
deadline,
pred.AllowWatchBookmarks,
c.groupResource,
identifier,
)
...
go watcher.processInterval(ctx, cacheInterval, startWatchRV)
return watcher, nil
}
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
...
c.process(ctx, resourceVersion)
}
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
for {
select {
case event, ok := <-c.input:
if event.ResourceVersion > resourceVersion || (event.Type == watch.Bookmark && event.ResourceVersion == resourceVersion && !c.wasBookmarkAfterRvSent()) {
c.sendWatchCacheEvent(event)
}
case <-ctx.Done():
return
}
}
}
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
...
select {
case c.result <- *watchEvent:
c.markBookmarkAfterRvSent(event)
case <-c.done:
}
}
cacheWatcher 发送事件
创建 cacheWatcher 之后,会调用 serveWatch() 方法。
- serveWatch() 方法会最终调用 ServeHTTP() 方法。
- 在 ServeHTTP() 方法中,会从 cacheWatcher 的 result channel 中不断取出事件,转换为外部的 WatchEvent 格式,然后发送给客户端。
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
...
server := &WatchServer{
Watching: watcher,
...
}
server.ServeHTTP(w, req)
}
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
...
flusher, ok := w.(http.Flusher)
framer := s.Framer.NewFrameWriter(w)
e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
ch := s.Watching.ResultChan()
for {
select {
case <-s.ServerShuttingDownCh:
return
case <-done:
return
case <-timeoutCh:
return
case event, ok := <-ch:
...
*internalEvent = metav1.InternalEvent(event)
err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
if err := e.Encode(outEvent); err != nil {
return
}
if len(ch) == 0 {
flusher.Flush()
}
buf.Reset()
}
}
}
相关链接
K8s apiserver watch 机制浅析:https://mp.weixin.qq.com/s/jp9uVNyd8jyz6dwT_niZuA