程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

Kubernetes 核心流程-Watch 流程

发表于 2023-04-25 | 分类于 Kubernetes | 0 | 阅读次数 1810

整体概述

API Server 的 Watch 机制主要依赖 etcd 的 Watch 机制,整体如下:

  • API Server 针对每种资源都会在本地维护一个 cacher,通过 etcd 提供的 watch 机制实时更新该 cacher 的内容。客户端针对资源的访问请求,都会优先访问该 cacher。
  • 每个 watch 请求都会在 API Server 中创建一个 cacheWatcher。API Server 在通过 etcd 的 watch 机制收到变更事件之后,会将这个事件发送给 cacher 的 incoming 通道。
  • cacher.dispatchEvents 从 incoming 通道读取事件,将事件再分发给 cacheWatcher 的 input 通道。
  • cacheWatcher 再通过 processInterval 将 input 通道中的事件,通过 result 通道发送给客户端。

源码

API Server 中的 list&watch

API Server 基于 etcd 的 watch 机制实现了 list&watch,将所有对象的最新状态和最近的事件都存放在 Cacher 中。客户端组件对资源的访问都会先经过 Cacher。Cacher 数据结构如下(仅保留了几个关键的子结构):

// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go 

type Cacher struct {
   // incoming 事件管道,通过该管道将事件发送给 watcher
   incoming chan watchCacheEvent
   
   // storage 的底层实现
   storage storage.Interface

   // 对象类型
   objectType reflect.Type

   // watchCache 滑动窗口,维护了当前 kind 的所有的资源,和一个基于滑动窗口的最近的事件数组
   watchCache *watchCache
   
   // reflector list&&watch etcd,并将事件和资源存到 watchCache 中
   reflector  *cache.Reflector
   
   // watchersBuffer 代表着所有 client-go 客户端跟 apiserver 的连接
   watchersBuffer []*cacheWatcher
   ....
}

创建 Cacher 的代码如下所示,在创建 cacher 的时候,

  • 也创建了 watchCache,将用于保存事件和所有资源。
  • 同时创建了 reflector,将用于执行对 etcd 的 list-watch,然后更新到 watchCache。
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go 

// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from its internal cache and updating its cache in the background based on the given configuration.
func NewCacherFromConfig(config Config) (*Cacher, error) {
	...
	objType := reflect.TypeOf(obj)
	cacher := &Cacher{
		...
		incoming:              make(chan watchCacheEvent, 100),
		...
	}

	watchCache := newWatchCache(
		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
	listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	reflectorName := "storage/cacher.go:" + config.ResourcePrefix

	reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
  
	cacher.watchCache = watchCache
	cacher.reflector = reflector

	go cacher.dispatchEvents()
	
	go func() {
		defer cacher.stopWg.Done()
		defer cacher.terminateAllWatchers()
		wait.Until(
			func() {
				if !cacher.isStopped() {
					cacher.startCaching(stopCh)
				}
			}, time.Second, stopCh,
		)
	}()

	return cacher, nil
}

同时启动了两个 goroutine,

  • 一个 goroutine 主要运行 cacher.startCaching() 函数。

    这个函数的主要作用就是通过 list 方法获取 etcd 里面的资源,然后再通过 watch 方法 watch etcd 发送的事件,根据事件的类型,调用 watchCache 的 add、update 或 delete 方法。

    在 add 方法里面最终会调用 processEvent() 方法将收到的事件发送给 cacher 的 incoming。

    func (c *Cacher) processEvent(event *watchCacheEvent) {
    	...
    	c.incoming <- *event
    }
    
  • 一个 goroutine 负责运行 cacher.dispatchEvents() 函数。

    这个函数的主要作用就是从 cacher 的 incoming 中取出事件,然后遍历 watchersBuffer,将事件发送给每个 cacheWatcher 中的 input。

    func (c *Cacher) dispatchEvents() {
    	lastProcessedResourceVersion := uint64(0)
    	for {
    		select {
    		case event, ok := <-c.incoming:
    			if !ok {
    				return
    			}
    			has changed.
    			if event.Type != watch.Bookmark {
    				c.dispatchEvent(&event)
    			}
       	}
      }
      ...
    }
    
    func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
    	if event.Type == watch.Bookmark {
    		for _, watcher := range c.watchersBuffer {
    			watcher.nonblockingAdd(event)
    		}
      }
    }
    
    func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
    	select {
    	case c.input <- event:
    		c.markBookmarkAfterRvAsReceived(event)
    		return true
    	default:
    		return false
    	}
    }
    

Watch 请求->创建 cacheWatcher

API Server 并没有单独提供一个 watch 接口,而是在 list 接口的基础之上增加了一个 watch 参数。如果客户端在向 API Server 发起的 list 请求中带上了 watch=true 的参数,则表示发起 watch 请求。比如,

https://{host:6443}/apis/apps/v1/namespaces/default/deployments?watch=true

API Server 的 handler 在解析到 watch=true 之后,则会按 watch 请求的方式进行处理,如下所示,

// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go 

func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
   return func(w http.ResponseWriter, req *http.Request) {
      ...
      if opts.Watch || forceWatch {
         if rw == nil {
           ...
           watcher, err := rw.Watch(ctx, &opts)
           ...
           metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
              serveWatch(watcher, scope, outputMediaType, req, w, timeout)
           })
           return
      	}
      ...
   		}
  }
}


对于 rw.Watch() 方法来说,最终会调用 Cacher 的 Watch 方法。

  • 该方法会首先创建一个 cacheWatcher。
  • 创建的 cacheWatcher 会以 goroutine 方式运行其 processInterval() 方法。该方法会不断的把 cacheWatcher.input channel 中的事件发送给 cacheWatcher.result channel。
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
	...
	watcher := newCacheWatcher(
		chanSize,
		filterWithAttrsFunction(key, pred),
		emptyFunc,
		c.versioner,
		deadline,
		pred.AllowWatchBookmarks,
		c.groupResource,
		identifier,
	)
  
	...

	go watcher.processInterval(ctx, cacheInterval, startWatchRV)
	return watcher, nil
}

func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
	...
	c.process(ctx, resourceVersion)
}

func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
	for {
		select {
		case event, ok := <-c.input:
			if event.ResourceVersion > resourceVersion || (event.Type == watch.Bookmark && event.ResourceVersion == resourceVersion && !c.wasBookmarkAfterRvSent()) {
				c.sendWatchCacheEvent(event)
			}
		case <-ctx.Done():
			return
		}
	}
}

func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
	...
  
	select {
	case c.result <- *watchEvent:
		c.markBookmarkAfterRvSent(event)
	case <-c.done:
	}
}

cacheWatcher 发送事件

创建 cacheWatcher 之后,会调用 serveWatch() 方法。

  • serveWatch() 方法会最终调用 ServeHTTP() 方法。
  • 在 ServeHTTP() 方法中,会从 cacheWatcher 的 result channel 中不断取出事件,转换为外部的 WatchEvent 格式,然后发送给客户端。
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
	...
	
	server := &WatchServer{
		Watching: watcher,
		...
	}

	server.ServeHTTP(w, req)
}

func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	...
  flusher, ok := w.(http.Flusher)
  framer := s.Framer.NewFrameWriter(w)
  e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
  
	ch := s.Watching.ResultChan()
	
	for {
		select {
		case <-s.ServerShuttingDownCh:
			return
		case <-done:
			return
		case <-timeoutCh:
			return
		case event, ok := <-ch:
			...
			*internalEvent = metav1.InternalEvent(event)
			err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
			
			if err := e.Encode(outEvent); err != nil {
				return
			}
			if len(ch) == 0 {
				flusher.Flush()
			}

			buf.Reset()
		}
	}
}

相关链接

K8s apiserver watch 机制浅析:https://mp.weixin.qq.com/s/jp9uVNyd8jyz6dwT_niZuA

卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/256
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Kubernetes
Kubernetes 核心流程- Pod 删除过程
Kubernetes 客户端-client go 介绍
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%