程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

Kubernetes Kubelet-整体流程

发表于 2024-01-03 | 分类于 Kubernetes | 0 | 阅读次数 2757

整体流程

这篇文章将介绍 Kubelet 的整体流程,然后再对源码进行简单梳理。希望通过这篇文章,可以对 Kubelet 先有个整体认识。

kubelet 整体的启动过程如下,

  • 首先设置 listers,注册 kubelet 所关心的资源的 Informer,这里 kubelet 只监听了与自己相关 Pod 对象的变化,也就是 Watch 的过滤条件是该 Pod 的 nodeName 字段与自己相同的。Informer 所产生的事件用于驱动后续的 SyncLoop。

  • 启动 PLEG,PLEG 会定期通过容器运行时获取节点上 Pod 的状态,与其缓存中的 Pod 信息进行比较,封装成事件。这些事件也用于驱动后续的 SyncLoop。

  • 接着启动各个组件,比如 Volume Manager、Node Status Manager、CPU Manager 等。这些组件都以控制器模式运行,负责完成某项具体职责。比如,

    • Volume Manager 负责处理 Volume 卸载和挂载。
    • Node Status Manager 负责将 Node 的状态收集起来,并通过 Heartbeat 的方式上报给 APIServer。
    • CPU Manager,负责维护该 Node 的 CPU 核的信息,以便在 Pod 通过 cpuset 的方式请求 CPU 核的时候,能够正确地管理 CPU 核的使用量和可用量。
  • 最后也是最重要的是启动一个循环:SyncLoop。SyncLoop 是整个 Kubelet 的核心,驱动这个循环运行的事件,有 Pod 更新事件(来自 APIServer 或本地文件变化)、Pod 生命周期变化(来自 PLEG)、kubelet 本身设置的执行周期、定时的清理事件、Pod 中容器探针检测的结果。

    需要注意的是,SyncLoop 要求是绝对不可以被阻塞,因此凡是在 kubelet 里有可能会耗费大量时间的操作,比如准备 Pod 的 Volume、拉取镜像等,SyncLoop 都会开启单独的 Goroutine 来进行操作。

因此,它整体的工作原理就是启动多个组件,监听 API Server/本地文件、监控容器运行时中容器的情况(PLEG)。这些都会产生事件,然后在 SyncLoop 中根据这些事件来进行不同的处理逻辑,因此可以说 kubelet 本身也是按照“控制器”模型进行工作的。以创建了一个 Pod 为例,介绍下过程,

  • 当创建了一个 Pod 并完成了调度之后,kubelet 会收到 Pod 的更新事件。

  • Syncloop 根据这个时间调用 HandlePods 进行处理。

    • 如果是一个新调度的 Pod,就会执行 HandlePods 中的 ADD 事件对应的处理逻辑。HandlePods 会为这个新的 Pod 生成对应的 Pod Status,检查 Pod 所声明使用的 Volume 是不是已经准备好。然后调用 CRI 接口创建 Pod 及其所定义的容器。

    • 如果是 UPDATE 事件,kubelet 就会根据 Pod 对象具体的变更情况,调用 CRI 接口重建容器。

  • 接着以 gRPC 服务方式运行的 CRI shim 会收到相应的 CRI 请求。CRI shim 会把 CRI 请求里面的内容取出来,然后调用容器运行时提供的 API,比如 containerd 提供的 API。

  • 容器运行时会根据自己的实现创建容器。以 containerd 为例,它会创建 runC 容器。在 runC 中,它会启动容器要执行的程序,并设置容器的 Namespace、Cgroups 和 chroot 等。

总结

  • kubelet 中关键的两个点是:SyncLoop 和 CRI。前者是 kubelet 自身逻辑运行的核心,后者是与底层容器运行时交互的核心。

  • kubelet 是 kubernetes 中第二个不可被替代的组件,第一个不可被替代的组件当然是 kube-apiserver。也就是说,无论如何,都不太建议对 kubelet 的代码进行大量的改动。保持 kubelet 跟上游基本一致的重要性,就跟保持 kube-apiserver 跟上游一致是一个道理。

  • kubelet 中的 cAdvisor 组件是通过读取 sys cgroup 或者 /proc 下的状态信息来得到 container cpu/mem/net/io metrics 的。

整体源码

kubelet 的入口处代码,如下所示,cli.Run 最终运行的是 command 中的 RunE。因此,从 NewKubeletCommand 中关于 RunE 部分的开始看即可。

// cmd/kubelet/kubelet.go
func main() {
	command := app.NewKubeletCommand()
	code := cli.Run(command)
	os.Exit(code)
}

从 NewKubeletCommand() 到创建和启动 kubelet 环节,调用链路大概如下,

app.NewKubeletCommand()->app.Run()->app.run()->app.RunKubelet()

在 app.RunKubelet() 中,

  • 首先是调用 createAndInitKubelet() 创建并初始化 kubelet,包括创建 kubelet 的各种组件,如下

  • 接着调用 startKubelet() 方法启动各个组件。同时在 kubelet 内部启动一个 http server,提供 metrics、exec 等 http api。

// RunKubelet is responsible for setting up and running a kubelet.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	hostnameOverridden := len(kubeServer.HostnameOverride) > 0
	nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider, utilfeature.DefaultFeatureGate.Enabled(features.CloudDualStackNodeIPs))
  
	k, err := createAndInitKubelet(kubeServer,
		kubeDeps,
		hostname,
		hostnameOverridden,
		nodeName,
		nodeIPs)

	...
  
  startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
  klog.InfoS("Started kubelet")
	
  return nil
}

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
	}
	...
}

Kubelet.Run()

kubelet.Run() 方法中,

  • 启动 Volume Manager,负责处理 Volume 卸载和挂载。
  • 启动 syncNodeStatus,负责更新节点自身的状态。
  • 启动 Node Lease Controller,负责更新节点租约。
  • 启动 Status Manager,负责向 APIServer 更新 Pod 状态。
  • 启动 PLEG,持续从底层容器运行时中获取 Pod/容器的状态,并于 kubelet 本地 cache 进行比较,生成对应的 Event。
  • 启动 SyncLoop,持续监控并处理来自文件、APIServer、PLEG 的变更事件,包括 Pod 的增加、更新、删除等。
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	...
	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start two go-routines to update the status.
		go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)

		// start syncing lease
		go kl.nodeLeaseController.Run(context.Background())
	}

	// Start component sync loops.
	kl.statusManager.Start()

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
  
	kl.syncLoop(ctx, updates, kl)
}

接下去来看下 syncLoop() 的实现:整体的流程是从多个 channel 中读取事件,然后处理这个事件。

  • 支持的几个 channel 如下,也就是事件的来源,
    • configChannel:Pod 的变更事件,从 APIServer 或者本地文件(静态 Pod)获取而来。
    • plegChannel:PLEG 产生的事件。
    • syncChannel:定期的更新事件。
    • housekeepingChannel:定期的清理事件,清理不再需要的资源,例如停止运行的容器实例以及它们占用的磁盘空间、无效的镜像和临时文件等。
    • healthChannel:容器的健康检查失败。
  • SyncHandler:具体的对 Pod 进行处理的 handler。
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	syncTicker := time.NewTicker(time.Second)
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	plegCh := kl.pleg.Watch()
  
	for {
		if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		...
	}
}

func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			handler.HandlePodUpdates(u.Pods)
		}

	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				handler.HandlePodSyncs([]*v1.Pod{pod})
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
  
	case <-syncCh:
		podsToSync := kl.getPodsToSync()
		handler.HandlePodSyncs(podsToSync)
    ...
  case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
		}
    
	case update := <-kl.readinessManager.Updates():
		ready := update.Result == proberesults.Success
		handleProbeSync(kl, update, handler, "readiness", status)
    
	case update := <-kl.startupManager.Updates():
		started := update.Result == proberesults.Success
		handleProbeSync(kl, update, handler, "startup", status)
    
	case <-housekeepingCh:
			if err := handler.HandlePodCleanups(ctx); err != nil {
				klog.ErrorS(err, "Failed cleaning pods")
			}
			...
	}
	return true
}

Kubelet.ListenAndServe()

接下去相关的调用链路是,

Kubelet.ListenAndServe()->server.ListenAndServeKubeletServer()->server.NewServer()->ListenAndServe()

在这个调用链路中,核心的是 server.NewServer(),它会注册各种 handler

  • InstallDefaultHandlers() 注册默认的 handler,比如 health 探测、metrics、stats 等。
  • InstallDebuggingHandlers() 注册 debug 相关的 handler,比如 exec、attach 等。
func NewServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, auth AuthInterface, tp oteltrace.TracerProvider, kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server {
	...
  server.InstallDefaultHandlers()
	if kubeCfg != nil && kubeCfg.EnableDebuggingHandlers {
		server.InstallDebuggingHandlers()
		server.InstallSystemLogHandler(kubeCfg.EnableSystemLogHandler, kubeCfg.EnableSystemLogQuery)
		server.InstallProfilingHandler(kubeCfg.EnableProfilingHandler, kubeCfg.EnableContentionProfiling)
		server.InstallDebugFlagsHandler(kubeCfg.EnableDebugFlagsHandler)
	} else {
		server.InstallDebuggingDisabledHandlers()
	}
	return server
}

func (s *Server) InstallDefaultHandlers() {
	healthz.InstallHandler(s.restfulCont,
		healthz.PingHealthz,
		healthz.LogHealthz,
		healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
	)

	ws := new(restful.WebService)
	ws.Path("/pods").Produces(restful.MIME_JSON)
	s.restfulCont.Add(ws)

	s.restfulCont.Add(stats.CreateHandlers(statsPath, s.host, s.resourceAnalyzer))
	s.restfulCont.Handle(metricsPath, legacyregistry.Handler())

	includedMetrics := cadvisormetrics.MetricSet{
		cadvisormetrics.CpuUsageMetrics:     struct{}{},
		cadvisormetrics.MemoryUsageMetrics:  struct{}{},
		cadvisormetrics.CpuLoadMetrics:      struct{}{},
		cadvisormetrics.DiskIOMetrics:       struct{}{},
		cadvisormetrics.DiskUsageMetrics:    struct{}{},
		cadvisormetrics.NetworkUsageMetrics: struct{}{},
		cadvisormetrics.AppMetrics:          struct{}{},
		cadvisormetrics.ProcessMetrics:      struct{}{},
		cadvisormetrics.OOMMetrics:          struct{}{},
	}
	// cAdvisor metrics are exposed under the secured handler as well
	r := compbasemetrics.NewKubeRegistry()
	r.RawMustRegister(metrics.NewPrometheusMachineCollector(prometheusHostAdapter{s.host}, includedMetrics))
	...
}

func (s *Server) InstallDebuggingHandlers() {
	ws := new(restful.WebService)
	ws.Path("/run")

	ws = new(restful.WebService)
	ws.Path("/exec")

	ws = new(restful.WebService)
	ws.Path("/attach")

	ws = new(restful.WebService)
	ws.Path("/portForward")
  
	ws = new(restful.WebService)
	ws.Path("/containerLogs")

	ws = new(restful.WebService)
	ws.Path("/runningpods/").Produces(restful.MIME_JSON)
  ...
}

相关链接

kubelet 创建 pod 的流程:https://cloud.tencent.com/developer/article/1557556

卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/254
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Kubernetes
Kubernetes Kubelet-PLEG 模块
Kubernetes APIServer-整体源码
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%