程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

Kubernetes 核心流程-Pod 创建过程

发表于 2023-04-11 | 分类于 Kubernetes | 0 | 阅读次数 2466

基础

整体 Pod 创建的核心流程如下,这里只考虑在 kubelet 中的情况。

  1. kubelet 收到 Pod 被创建的事件,syncLoop 中按照创建逻辑添加该 Pod。

  2. 将 Pod 加入 podManager。podManager 子模块负责管理这台机器上的 pod 信息,如果 podManager 中找不到某个 pod,则认为这个 pod 被删除了。

  3. 执行 Pod 创建需要的准备工作。

    • 创建 CGROUp、创建 mirror Pod(如果是 static pod)、创建 Pod 的数据目录,存放 volume 和 plugin 信息。
    • 等待 Pod 所需要的所有外挂的 volume 都准备好。
    • 启动 goroutine 定期对 Pod 进行探针检查。
  4. 创建 Pod。

    • 计算 sandbox 和 container 是否发生了变化。如果发生了变化,则清理掉发生变化了的 sandbox 和 container。这一步主要是针对重启的情况进行处理,获取当前正在运行的 Pod 情况与新建 Pod 进行对比。对于完全新建的 Pod 来说,这步可以略过。

    • 创建 sandbox。

    • 创建 ephemeral containers。

    • 按顺序创建 init containers。

    • 创建 normal containers,也就是业务容器。具体创建容器的过程如下,其中 ephemeral container、init container 也都是该过程。

      • 拉取镜像。
      • 创建容器。
      • 启动容器。
      • 执行 post start hook。

源码分析

之前已介绍过 Kubelet 侧的整体代码,Kubelet 在 syncLoop 中根据不同的事件源根据事件的类型进行不同的逻辑处理。对于创建 Pod 来说,调用的是 HandlePodAdditions() 方法。

在 HandlePodAdditions() 中,

  1. 把所有的 Pod 按照创建日期进行排序,保证最先创建的 pod 会最先被处理。
  2. 把 Pod 加入到 podManager 中,podManager 子模块负责管理这台机器上的 pod 信息,pod 和 mirrorPod 之间的对应关系等。所有被管理的 pod 都要出现在里面,如果 podManager 中找不到某个 pod,则认为这个 pod 被删除了。
  3. 如果是 mirror pod 则单独进行处理。static pod 完全是由 kubelet 进程来监管,并在它异常时负责重启,APIServer 并不会感知到 static pod 的存在。kubelet 会通过 APIServer 为每一个 static pod 创建一个对应的 mirror pod,如此一来就可以通过 kubectl 命令查看对应的 pod,并且可以通过 kubectl logs 命令直接查看到 static pod 的日志信息。
  4. 验证 Pod 是否能在该节点上运行,如果不可以则直接拒绝。
  5. 调用 podWorkers.UpdatePod() 进一步进行处理。
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()
		kl.podManager.AddPod(pod)

		pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
		if wasMirror {
      ...
			kl.podWorkers.UpdatePod(UpdatePodOptions{
				Pod:        pod,
				MirrorPod:  mirrorPod,
				UpdateType: kubetypes.SyncPodUpdate,
				StartTime:  start,
			})
			continue
		}

		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
      // Check if we can admit the pod; if not, reject it.
      if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
        kl.rejectPod(pod, reason, message)
        continue
      }
		}
    
		kl.podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			MirrorPod:  mirrorPod,
			UpdateType: kubetypes.SyncPodCreate,
			StartTime:  start,
		})
	}
}

接下去的调用链如下,其中在 podWorkers.UpdatePod() 方法中,

  • 会为每个 Pod 单独创建一个 goroutine,运行 podWorkerLoop() 函数。podWorkerLoop() 函数会被传入一个 channel,podWorkerLoop() 会阻塞等待 channel 中的事件。
  • 之后 podWorkers 则会往 channel 中发送事件。
podWorkers.UpdatePod()->podWorkerLoop()->Kubelet.SyncPod()

Kubelet.SyncPod() 方法中,

  • 先是完成 Pod 实体(即容器)创建需要的准备工作。
    • 通过 kubelet.statusManager 同步 podStatus。
    • 创建 containerManagar 对象,并且创建 Pod level cgroup,更新 Qos level cgroup。
    • 如果是 static Pod,就创建或者更新对应的 mirrorPod。
    • 创建 Pod 的数据目录,存放 volume 和 plugin 信息。
    • 调用 kubelet.volumeManager 组件,等待它将 Pod 所需要的所有外挂的 volume 都准备好(volumeManager 会在后台做这些事情)。
    • 如果有 image secrets,获取对应的 secrets 数据。
    • 在 probeManager 中添加 Pod,如果 Pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测。
  • 调用 kubeGenericRuntimeManager.SyncPod 方法,进行真正的容器创建。
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
	...

	kl.statusManager.SetPodStatus(pod, apiPodStatus)
	pcm := kl.containerManager.NewPodContainerManager()
	if err := kl.makePodDataDirs(pod); err != nil {
		return false, err
	}

	// Wait for volumes to attach/mount
	if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
		return false, err
	}

	// Fetch the pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)

	// Ensure the pod is being probed
	kl.probeManager.AddPod(pod)

	result := kl.containerRuntime.SyncPod(todoCtx, pod, podStatus, pullSecrets, kl.backOff)
  ...

	return false, nil
}

syncPod 主要执行以下几个操作:

  1. 计算 sandbox 和 container 是否发生了变化。
  2. 如果发生了变化,则清理掉发生变化了的 sandbox 和 container。
  3. 创建 sandbox。
  4. 创建 ephemeral containers。
  5. 创建 init containers。init containers 可以有多个,多个 init containers 严格按照顺序启动,只有前一个 init container 启动完成之后,才开始启动下一个 container。
  6. 调整正在运行的容器的资源用量(InPlacePodVerticalScaling 特性:用于支持在不重新调度Pod到不同节点的情况下,动态地调整已运行Pod内部容器资源(如 CPU 和内存)的请求和限制)
  7. 创建 normal containers,也就是业务容器。
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(ctx, pod, podStatus)

	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.KillPod {
		if podContainerChanges.CreateSandbox {
			m.purgeInitContainers(ctx, pod, podStatus)
		}
	} else {
		// Step 3: kill any running containers in this pod which are not to keep.
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			if err := m.killContainer(ctx, pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
				return
			}
		}
	}

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		...
		podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
	}

	// Step 5: start ephemeral containers
	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
	}

  // Step 6: start init containers.
  for _, idx := range podContainerChanges.InitContainersToStart {
    container := &pod.Spec.InitContainers[idx]
    // Start the next init container.
    if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
      return
    }
  }	

	// Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
	if isInPlacePodVerticalScalingAllowed(pod) {
		if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
			m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
		}
	}

	// Step 8: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
	}

	return
}

对于 ephemeral container、init container、normal container 来说,都是调用 start()->startContainer() 创建并启动容器 。整个顺序如下所示,

  1. 拉取镜像。
  2. 创建容器。
  3. 启动容器。
  4. 执行 post start hook。
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
	container := spec.container

	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, pod, container, pullSecrets, podSandboxConfig)

	// Step 2: create the container.
	...
	containerID, err := m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)
  
	// Step 3: start the container.
	err = m.runtimeService.StartContainer(ctx, containerID)

	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		msg, handlerErr := m.runner.Run(ctx, kubeContainerID, pod, container, container.Lifecycle.PostStart)
		...
	}

	return "", nil
}

相关链接

kubelet 创建 pod 的流程:https://cloud.tencent.com/developer/article/1557556

卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/229
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Kubernetes
Kubernetes 核心流程-Exec 流程
Kubernetes 核心流程-Pod 驱逐过程
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%