整体流程
这篇文章将介绍 Kubelet 的整体流程,然后再对源码进行简单梳理。希望通过这篇文章,可以对 Kubelet 先有个整体认识。
kubelet 整体的启动过程如下,
-
首先设置 listers,注册 kubelet 所关心的资源的 Informer,这里 kubelet 只监听了与自己相关 Pod 对象的变化,也就是 Watch 的过滤条件是该 Pod 的 nodeName 字段与自己相同的。Informer 所产生的事件用于驱动后续的 SyncLoop。
-
启动 PLEG,PLEG 会定期通过容器运行时获取节点上 Pod 的状态,与其缓存中的 Pod 信息进行比较,封装成事件。这些事件也用于驱动后续的 SyncLoop。
-
接着启动各个组件,比如 Volume Manager、Node Status Manager、CPU Manager 等。这些组件都以控制器模式运行,负责完成某项具体职责。比如,
- Volume Manager 负责处理 Volume 卸载和挂载。
- Node Status Manager 负责将 Node 的状态收集起来,并通过 Heartbeat 的方式上报给 APIServer。
- CPU Manager,负责维护该 Node 的 CPU 核的信息,以便在 Pod 通过 cpuset 的方式请求 CPU 核的时候,能够正确地管理 CPU 核的使用量和可用量。
-
最后也是最重要的是启动一个循环:SyncLoop。SyncLoop 是整个 Kubelet 的核心,驱动这个循环运行的事件,有 Pod 更新事件(来自 APIServer 或本地文件变化)、Pod 生命周期变化(来自 PLEG)、kubelet 本身设置的执行周期、定时的清理事件、Pod 中容器探针检测的结果。
需要注意的是,SyncLoop 要求是绝对不可以被阻塞,因此凡是在 kubelet 里有可能会耗费大量时间的操作,比如准备 Pod 的 Volume、拉取镜像等,SyncLoop 都会开启单独的 Goroutine 来进行操作。
因此,它整体的工作原理就是启动多个组件,监听 API Server/本地文件、监控容器运行时中容器的情况(PLEG)。这些都会产生事件,然后在 SyncLoop 中根据这些事件来进行不同的处理逻辑,因此可以说 kubelet 本身也是按照“控制器”模型进行工作的。以创建了一个 Pod 为例,介绍下过程,
-
当创建了一个 Pod 并完成了调度之后,kubelet 会收到 Pod 的更新事件。
-
Syncloop 根据这个时间调用 HandlePods 进行处理。
-
如果是一个新调度的 Pod,就会执行 HandlePods 中的 ADD 事件对应的处理逻辑。HandlePods 会为这个新的 Pod 生成对应的 Pod Status,检查 Pod 所声明使用的 Volume 是不是已经准备好。然后调用 CRI 接口创建 Pod 及其所定义的容器。
-
如果是 UPDATE 事件,kubelet 就会根据 Pod 对象具体的变更情况,调用 CRI 接口重建容器。
-
-
接着以 gRPC 服务方式运行的 CRI shim 会收到相应的 CRI 请求。CRI shim 会把 CRI 请求里面的内容取出来,然后调用容器运行时提供的 API,比如 containerd 提供的 API。
-
容器运行时会根据自己的实现创建容器。以 containerd 为例,它会创建 runC 容器。在 runC 中,它会启动容器要执行的程序,并设置容器的 Namespace、Cgroups 和 chroot 等。
总结
-
kubelet 中关键的两个点是:SyncLoop 和 CRI。前者是 kubelet 自身逻辑运行的核心,后者是与底层容器运行时交互的核心。
-
kubelet 是 kubernetes 中第二个不可被替代的组件,第一个不可被替代的组件当然是 kube-apiserver。也就是说,无论如何,都不太建议对 kubelet 的代码进行大量的改动。保持 kubelet 跟上游基本一致的重要性,就跟保持 kube-apiserver 跟上游一致是一个道理。
-
kubelet 中的 cAdvisor 组件是通过读取 sys cgroup 或者 /proc 下的状态信息来得到 container cpu/mem/net/io metrics 的。
整体源码
kubelet 的入口处代码,如下所示,cli.Run 最终运行的是 command 中的 RunE。因此,从 NewKubeletCommand 中关于 RunE 部分的开始看即可。
// cmd/kubelet/kubelet.go
func main() {
command := app.NewKubeletCommand()
code := cli.Run(command)
os.Exit(code)
}
从 NewKubeletCommand() 到创建和启动 kubelet 环节,调用链路大概如下,
app.NewKubeletCommand()->app.Run()->app.run()->app.RunKubelet()
在 app.RunKubelet() 中,
-
首先是调用 createAndInitKubelet() 创建并初始化 kubelet,包括创建 kubelet 的各种组件,如下
-
接着调用 startKubelet() 方法启动各个组件。同时在 kubelet 内部启动一个 http server,提供 metrics、exec 等 http api。
// RunKubelet is responsible for setting up and running a kubelet.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
hostnameOverridden := len(kubeServer.HostnameOverride) > 0
nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider, utilfeature.DefaultFeatureGate.Enabled(features.CloudDualStackNodeIPs))
k, err := createAndInitKubelet(kubeServer,
kubeDeps,
hostname,
hostnameOverridden,
nodeName,
nodeIPs)
...
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.InfoS("Started kubelet")
return nil
}
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go k.Run(podCfg.Updates())
// start the kubelet server
if enableServer {
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
}
...
}
Kubelet.Run()
kubelet.Run() 方法中,
- 启动 Volume Manager,负责处理 Volume 卸载和挂载。
- 启动 syncNodeStatus,负责更新节点自身的状态。
- 启动 Node Lease Controller,负责更新节点租约。
- 启动 Status Manager,负责向 APIServer 更新 Pod 状态。
- 启动 PLEG,持续从底层容器运行时中获取 Pod/容器的状态,并于 kubelet 本地 cache 进行比较,生成对应的 Event。
- 启动 SyncLoop,持续监控并处理来自文件、APIServer、PLEG 的变更事件,包括 Pod 的增加、更新、删除等。
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start two go-routines to update the status.
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
// start syncing lease
go kl.nodeLeaseController.Run(context.Background())
}
// Start component sync loops.
kl.statusManager.Start()
// Start the pod lifecycle event generator.
kl.pleg.Start()
kl.syncLoop(ctx, updates, kl)
}
接下去来看下 syncLoop() 的实现:整体的流程是从多个 channel 中读取事件,然后处理这个事件。
- 支持的几个 channel 如下,也就是事件的来源,
- configChannel:Pod 的变更事件,从 APIServer 或者本地文件(静态 Pod)获取而来。
- plegChannel:PLEG 产生的事件。
- syncChannel:定期的更新事件。
- housekeepingChannel:定期的清理事件,清理不再需要的资源,例如停止运行的容器实例以及它们占用的磁盘空间、无效的镜像和临时文件等。
- healthChannel:容器的健康检查失败。
- SyncHandler:具体的对 Pod 进行处理的 handler。
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
syncTicker := time.NewTicker(time.Second)
housekeepingTicker := time.NewTicker(housekeepingPeriod)
plegCh := kl.pleg.Watch()
for {
if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
...
}
}
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
switch u.Op {
case kubetypes.ADD:
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
handler.HandlePodUpdates(u.Pods)
}
case e := <-plegCh:
if isSyncPodWorthy(e) {
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
handler.HandlePodSyncs([]*v1.Pod{pod})
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
podsToSync := kl.getPodsToSync()
handler.HandlePodSyncs(podsToSync)
...
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
handleProbeSync(kl, update, handler, "startup", status)
case <-housekeepingCh:
if err := handler.HandlePodCleanups(ctx); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
...
}
return true
}
Kubelet.ListenAndServe()
接下去相关的调用链路是,
Kubelet.ListenAndServe()->server.ListenAndServeKubeletServer()->server.NewServer()->ListenAndServe()
在这个调用链路中,核心的是 server.NewServer(),它会注册各种 handler
- InstallDefaultHandlers() 注册默认的 handler,比如 health 探测、metrics、stats 等。
- InstallDebuggingHandlers() 注册 debug 相关的 handler,比如 exec、attach 等。
func NewServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, auth AuthInterface, tp oteltrace.TracerProvider, kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server {
...
server.InstallDefaultHandlers()
if kubeCfg != nil && kubeCfg.EnableDebuggingHandlers {
server.InstallDebuggingHandlers()
server.InstallSystemLogHandler(kubeCfg.EnableSystemLogHandler, kubeCfg.EnableSystemLogQuery)
server.InstallProfilingHandler(kubeCfg.EnableProfilingHandler, kubeCfg.EnableContentionProfiling)
server.InstallDebugFlagsHandler(kubeCfg.EnableDebugFlagsHandler)
} else {
server.InstallDebuggingDisabledHandlers()
}
return server
}
func (s *Server) InstallDefaultHandlers() {
healthz.InstallHandler(s.restfulCont,
healthz.PingHealthz,
healthz.LogHealthz,
healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
)
ws := new(restful.WebService)
ws.Path("/pods").Produces(restful.MIME_JSON)
s.restfulCont.Add(ws)
s.restfulCont.Add(stats.CreateHandlers(statsPath, s.host, s.resourceAnalyzer))
s.restfulCont.Handle(metricsPath, legacyregistry.Handler())
includedMetrics := cadvisormetrics.MetricSet{
cadvisormetrics.CpuUsageMetrics: struct{}{},
cadvisormetrics.MemoryUsageMetrics: struct{}{},
cadvisormetrics.CpuLoadMetrics: struct{}{},
cadvisormetrics.DiskIOMetrics: struct{}{},
cadvisormetrics.DiskUsageMetrics: struct{}{},
cadvisormetrics.NetworkUsageMetrics: struct{}{},
cadvisormetrics.AppMetrics: struct{}{},
cadvisormetrics.ProcessMetrics: struct{}{},
cadvisormetrics.OOMMetrics: struct{}{},
}
// cAdvisor metrics are exposed under the secured handler as well
r := compbasemetrics.NewKubeRegistry()
r.RawMustRegister(metrics.NewPrometheusMachineCollector(prometheusHostAdapter{s.host}, includedMetrics))
...
}
func (s *Server) InstallDebuggingHandlers() {
ws := new(restful.WebService)
ws.Path("/run")
ws = new(restful.WebService)
ws.Path("/exec")
ws = new(restful.WebService)
ws.Path("/attach")
ws = new(restful.WebService)
ws.Path("/portForward")
ws = new(restful.WebService)
ws.Path("/containerLogs")
ws = new(restful.WebService)
ws.Path("/runningpods/").Produces(restful.MIME_JSON)
...
}
相关链接
kubelet 创建 pod 的流程:https://cloud.tencent.com/developer/article/1557556