程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

Kubernetes 核心流程- Pod 删除过程

发表于 2023-04-21 | 分类于 Kubernetes | 0 | 阅读次数 3122

整体流程

优雅删除 Pod 时,整体流程如下,这里考虑 APIServer 和 kubelet 两个环节:

  • 客户端请求删除 Pod,此次请求删除 GracePeriodSeconds 往往会采用默认值 30s。

  • APIServer 收到此次请求,更新 Pod 的信息,设置 Pod 的 DeletionTimestamp、DeletionGracePeriodSeconds(第一次)。

  • Kubelet 监听到 Pod 的变更,优雅释放 Pod 资源。

  • Kubelet 内部收到 Pod 资源被完全释放,再次请求删除 Pod,此次请求删除 GracePeriodSeconds 会设置为 0。

  • APIServer 再次收到请求,此次会从 Etcd 中删除 Pod 信息(第二次)。

  • Kubelet 再次监听到 Pod 的变更,但是此次跟上次采用的处理逻辑不同,这次它会完成最终 Pod 的清理工作。

源码解析

先阐述下 APIServer 侧的源码。之前已介绍过 APIServer 侧的整体代码了,APIServer 在 registerResourceHandlers() 中会注册资源处理的 handler,因此,从 registerResourceHandlers() 开始跟踪,

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
  gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	...
  for _, action := range actions {
    case "DELETE": // Delete a resource.
			...
			handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
			route := ws.DELETE(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("delete"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Writes(deleteReturnType).
				Returns(http.StatusOK, "OK", deleteReturnType).
				Returns(http.StatusAccepted, "Accepted", deleteReturnType)
			if isGracefulDeleter {
				route.Reads(versionedDeleterObject)
				route.ParameterNamed("body").Required(false)
				if err := AddObjectParams(ws, route, versionedDeleteOptions); err != nil {
					return nil, nil, err
				}
			}
			addParams(route, action.Params)
			routes = append(routes, route)
  }
  ...
}

接下去的调用链路是,

registerResourceHandlers()->restfulDeleteResource()->store.Delete()

store.Delet() 是删除的核心逻辑所在,

  • 首先是调用 BeforeDelete() 判断是否可以被优雅删除,同时设置 DeletionTimestamp、DeletionGracePeriodSeconds。
  • 接着调用 updateForGracefulDeletionAndFinalizers() 判断是否要立即被删除。这里是根据 GracePeriodSeconds 判断,是否要立即删除:如果是 0 则立即删除;如果没有设置的话,默认情况下会被设置为 30s,不立即删除。
  • 之后则根据立即删除的判断做出不同的逻辑,
    • 如果不是立即删除,则直接返回。通常情况下是这种,之后 kubelet 会再次调用删除接口进行删除。
    • 如果是立即删除,则调用底层存储的 Delete() 接口,从 etcd 中删除。
func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
  ...
	graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
	
	if graceful || pendingFinalizers || shouldUpdateFinalizers {
		err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, deleteValidation, obj)
		...
	}

	if !deleteImmediately || err != nil {
		return out, false, err
	}

	if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun), nil); err != nil {
		...
	}
	
	return out, true, err
}

func BeforeDelete(strategy RESTDeleteStrategy, ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) (graceful, gracefulPending bool, err error) {
	...
	now := metav1.NewTime(metav1.Now().Add(time.Second * time.Duration(*options.GracePeriodSeconds)))
	objectMeta.SetDeletionTimestamp(&now)
	objectMeta.SetDeletionGracePeriodSeconds(options.GracePeriodSeconds)

	return true, false, nil
}

func (e *Store) updateForGracefulDeletionAndFinalizers(ctx context.Context, name, key string, options *metav1.DeleteOptions, preconditions storage.Preconditions, deleteValidation rest.ValidateObjectFunc, in runtime.Object) (err error, ignoreNotFound, deleteImmediately bool, out, lastExisting runtime.Object) {
	lastGraceful := int64(0)
  ...
	err = e.Storage.GuaranteedUpdate(
		...
		storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
			...
			lastGraceful = *options.GracePeriodSeconds
		}),
		...
	)
	switch err {
	case nil:
		if lastGraceful > 0 {
			return nil, false, false, out, lastExisting
		}
		return nil, true, true, out, lastExisting
	}
}

接着阐述下 Kubelet 侧的源码。之前已介绍过 Kubelet 侧的整体代码了,Kubelet 在 syncLoop 中根据不同的事件源根据事件的类型进行不同的逻辑处理。

  • 如果 Pod 设置了 DeletionTimestamp、DeletionGracePeriodSeconds。kubelet 会按照 DELETE 事件来进行处理,走的是 kubetypes.DELETE 这边的逻辑,最终是 kill Pod。
  • 如果 Pod 从 Etcd 中删除了,则会按照 REMOVE 事件来进行处理。
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			klog.ErrorS(nil, "Kubelet does not support snapshot update")
		default:
			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
		}
  }
	...
}

// handler.HandlePodUpdates->podWorkers.podWorkerLoop()->kubelet.SyncTerminatingRuntimePod()
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
	...
	for _, pod := range pods {
		kl.podManager.UpdatePod(pod)

		pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)

		kl.podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			MirrorPod:  mirrorPod,
			UpdateType: kubetypes.SyncPodUpdate,
			StartTime:  start,
		})
	}
}

func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error {
	...
	if err := kl.killPod(ctx, pod, *runningPod, &gracePeriod); err != nil {
		...
		return err
	}

	return nil
}

// handler.HandlePodRemoves->kubelet.deletePod
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.RemovePod(pod)

		if err := kl.deletePod(pod); err != nil {
			klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
		}
	}
}

func (kl *Kubelet) deletePod(pod *v1.Pod) error {
	...
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		UpdateType: kubetypes.SyncPodKill,
	})
	...
	return nil
}

同时,kubelet 在启动的时候会启动一个 Status Manager,用于向 APIServer 同步 Pod 的状态。在 Status Manager 中会调用 canBeDeleted() 函数进行判断,是否可以删除。如果可以的话,则会再次调用 APIServer 的 Delete Pod 接口,删除 Pod,此时 GracePeriodSeconds 设置为了 0。APIServer 收到这个请求之后,会删除 etcd 中的 Pod。

func (m *manager) Start() {
	...
	go wait.Forever(func() {
		for {
			select {
			case <-m.podStatusChannel:
				klog.V(4).InfoS("Syncing updated statuses")
				m.syncBatch(false)
			case <-syncTicker:
				klog.V(4).InfoS("Syncing all statuses")
				m.syncBatch(true)
			}
		}
	}, 0)
}

func (m *manager) syncBatch(all bool) int {
	...

	for _, update := range updatedStatuses {
		m.syncPod(update.podUID, update.status)
	}

	return len(updatedStatuses)
}

func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
	pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
	...
  
	if m.canBeDeleted(pod, status.status, status.podIsFinished) {
		deleteOptions := metav1.DeleteOptions{
			GracePeriodSeconds: new(int64),
			Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
		}
		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
		...
	}
}

这里做下联动,

  • 对于第一次删除来说,如果没有指定 GracePeriodSeconds 的话,其实默认就是 graceful 删除。根据 APIServer 侧的行为,可以看到这种行为其实是更新 Pod 的行为,因为是给 Pod 设置 DeletionTimestamp、DeletionGracePeriodSeconds。

    kubelet 会按照 DELETE 事件来进行处理,会 kill Pod,清理 Pod 相关的资源。清理完后 Stauts Manager 会收到通知,接着判断是否可以删除。如果可以的话,则会调用 APIServer 的 DELETE Pod 接口删除。

  • 对于第二次删除来说。kubelet 也会收到变化,但是此次是按照 REMOVE 事件来进行处理,走的是 kubetypes.REMOVE 这边的逻辑,完成 Pod 的最终清理工作。

相关链接

Kubernetes源码分析之Pod的删除:https://juejin.cn/post/6844903842321039368

卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/232
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Kubernetes
Kubernetes 核心流程-Pod 驱逐过程
Kubernetes 核心流程-Watch 流程
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2024 程序锅
0%