基础
整体 Pod 创建的核心流程如下,这里只考虑在 kubelet 中的情况。
-
kubelet 收到 Pod 被创建的事件,syncLoop 中按照创建逻辑添加该 Pod。
-
将 Pod 加入 podManager。podManager 子模块负责管理这台机器上的 pod 信息,如果 podManager 中找不到某个 pod,则认为这个 pod 被删除了。
-
执行 Pod 创建需要的准备工作。
- 创建 CGROUp、创建 mirror Pod(如果是 static pod)、创建 Pod 的数据目录,存放 volume 和 plugin 信息。
- 等待 Pod 所需要的所有外挂的 volume 都准备好。
- 启动 goroutine 定期对 Pod 进行探针检查。
-
创建 Pod。
-
计算 sandbox 和 container 是否发生了变化。如果发生了变化,则清理掉发生变化了的 sandbox 和 container。这一步主要是针对重启的情况进行处理,获取当前正在运行的 Pod 情况与新建 Pod 进行对比。对于完全新建的 Pod 来说,这步可以略过。
-
创建 sandbox。
-
创建 ephemeral containers。
-
按顺序创建 init containers。
-
创建 normal containers,也就是业务容器。具体创建容器的过程如下,其中 ephemeral container、init container 也都是该过程。
- 拉取镜像。
- 创建容器。
- 启动容器。
- 执行 post start hook。
-
源码分析
之前已介绍过 Kubelet 侧的整体代码,Kubelet 在 syncLoop 中根据不同的事件源根据事件的类型进行不同的逻辑处理。对于创建 Pod 来说,调用的是 HandlePodAdditions() 方法。
在 HandlePodAdditions() 中,
- 把所有的 Pod 按照创建日期进行排序,保证最先创建的 pod 会最先被处理。
- 把 Pod 加入到 podManager 中,podManager 子模块负责管理这台机器上的 pod 信息,pod 和 mirrorPod 之间的对应关系等。所有被管理的 pod 都要出现在里面,如果 podManager 中找不到某个 pod,则认为这个 pod 被删除了。
- 如果是 mirror pod 则单独进行处理。static pod 完全是由 kubelet 进程来监管,并在它异常时负责重启,APIServer 并不会感知到 static pod 的存在。kubelet 会通过 APIServer 为每一个 static pod 创建一个对应的 mirror pod,如此一来就可以通过 kubectl 命令查看对应的 pod,并且可以通过 kubectl logs 命令直接查看到 static pod 的日志信息。
- 验证 Pod 是否能在该节点上运行,如果不可以则直接拒绝。
- 调用 podWorkers.UpdatePod() 进一步进行处理。
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
kl.podManager.AddPod(pod)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
...
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
continue
}
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodCreate,
StartTime: start,
})
}
}
接下去的调用链如下,其中在 podWorkers.UpdatePod() 方法中,
- 会为每个 Pod 单独创建一个 goroutine,运行 podWorkerLoop() 函数。podWorkerLoop() 函数会被传入一个 channel,podWorkerLoop() 会阻塞等待 channel 中的事件。
- 之后 podWorkers 则会往 channel 中发送事件。
podWorkers.UpdatePod()->podWorkerLoop()->Kubelet.SyncPod()
Kubelet.SyncPod() 方法中,
- 先是完成 Pod 实体(即容器)创建需要的准备工作。
- 通过 kubelet.statusManager 同步 podStatus。
- 创建 containerManagar 对象,并且创建 Pod level cgroup,更新 Qos level cgroup。
- 如果是 static Pod,就创建或者更新对应的 mirrorPod。
- 创建 Pod 的数据目录,存放 volume 和 plugin 信息。
- 调用 kubelet.volumeManager 组件,等待它将 Pod 所需要的所有外挂的 volume 都准备好(volumeManager 会在后台做这些事情)。
- 如果有 image secrets,获取对应的 secrets 数据。
- 在 probeManager 中添加 Pod,如果 Pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测。
- 调用 kubeGenericRuntimeManager.SyncPod 方法,进行真正的容器创建。
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
...
kl.statusManager.SetPodStatus(pod, apiPodStatus)
pcm := kl.containerManager.NewPodContainerManager()
if err := kl.makePodDataDirs(pod); err != nil {
return false, err
}
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
return false, err
}
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
// Ensure the pod is being probed
kl.probeManager.AddPod(pod)
result := kl.containerRuntime.SyncPod(todoCtx, pod, podStatus, pullSecrets, kl.backOff)
...
return false, nil
}
syncPod 主要执行以下几个操作:
- 计算 sandbox 和 container 是否发生了变化。
- 如果发生了变化,则清理掉发生变化了的 sandbox 和 container。
- 创建 sandbox。
- 创建 ephemeral containers。
- 创建 init containers。init containers 可以有多个,多个 init containers 严格按照顺序启动,只有前一个 init container 启动完成之后,才开始启动下一个 container。
- 调整正在运行的容器的资源用量(InPlacePodVerticalScaling 特性:用于支持在不重新调度Pod到不同节点的情况下,动态地调整已运行Pod内部容器资源(如 CPU 和内存)的请求和限制)
- 创建 normal containers,也就是业务容器。
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(ctx, pod, podStatus)
// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
if podContainerChanges.CreateSandbox {
m.purgeInitContainers(ctx, pod, podStatus)
}
} else {
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
if err := m.killContainer(ctx, pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
return
}
}
}
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
...
podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
}
// Step 5: start ephemeral containers
for _, idx := range podContainerChanges.EphemeralContainersToStart {
start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
}
// Step 6: start init containers.
for _, idx := range podContainerChanges.InitContainersToStart {
container := &pod.Spec.InitContainers[idx]
// Start the next init container.
if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
return
}
}
// Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
if isInPlacePodVerticalScalingAllowed(pod) {
if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
}
}
// Step 8: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
}
return
}
对于 ephemeral container、init container、normal container 来说,都是调用 start()->startContainer()
创建并启动容器 。整个顺序如下所示,
- 拉取镜像。
- 创建容器。
- 启动容器。
- 执行 post start hook。
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
container := spec.container
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, pod, container, pullSecrets, podSandboxConfig)
// Step 2: create the container.
...
containerID, err := m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)
// Step 3: start the container.
err = m.runtimeService.StartContainer(ctx, containerID)
// Step 4: execute the post start hook.
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
msg, handlerErr := m.runner.Run(ctx, kubeContainerID, pod, container, container.Lifecycle.PostStart)
...
}
return "", nil
}
相关链接
kubelet 创建 pod 的流程:https://cloud.tencent.com/developer/article/1557556