整体流程
优雅删除 Pod 时,整体流程如下,这里考虑 APIServer 和 kubelet 两个环节:
-
客户端请求删除 Pod,此次请求删除 GracePeriodSeconds 往往会采用默认值 30s。
-
APIServer 收到此次请求,更新 Pod 的信息,设置 Pod 的 DeletionTimestamp、DeletionGracePeriodSeconds(第一次)。
-
Kubelet 监听到 Pod 的变更,优雅释放 Pod 资源。
-
Kubelet 内部收到 Pod 资源被完全释放,再次请求删除 Pod,此次请求删除 GracePeriodSeconds 会设置为 0。
-
APIServer 再次收到请求,此次会从 Etcd 中删除 Pod 信息(第二次)。
-
Kubelet 再次监听到 Pod 的变更,但是此次跟上次采用的处理逻辑不同,这次它会完成最终 Pod 的清理工作。
源码解析
先阐述下 APIServer 侧的源码。之前已介绍过 APIServer 侧的整体代码了,APIServer 在 registerResourceHandlers()
中会注册资源处理的 handler,因此,从 registerResourceHandlers()
开始跟踪,
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
...
for _, action := range actions {
case "DELETE": // Delete a resource.
...
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
route := ws.DELETE(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("delete"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Writes(deleteReturnType).
Returns(http.StatusOK, "OK", deleteReturnType).
Returns(http.StatusAccepted, "Accepted", deleteReturnType)
if isGracefulDeleter {
route.Reads(versionedDeleterObject)
route.ParameterNamed("body").Required(false)
if err := AddObjectParams(ws, route, versionedDeleteOptions); err != nil {
return nil, nil, err
}
}
addParams(route, action.Params)
routes = append(routes, route)
}
...
}
接下去的调用链路是,
registerResourceHandlers()->restfulDeleteResource()->store.Delete()
store.Delet() 是删除的核心逻辑所在,
- 首先是调用 BeforeDelete() 判断是否可以被优雅删除,同时设置 DeletionTimestamp、DeletionGracePeriodSeconds。
- 接着调用 updateForGracefulDeletionAndFinalizers() 判断是否要立即被删除。这里是根据 GracePeriodSeconds 判断,是否要立即删除:如果是 0 则立即删除;如果没有设置的话,默认情况下会被设置为 30s,不立即删除。
- 之后则根据立即删除的判断做出不同的逻辑,
- 如果不是立即删除,则直接返回。通常情况下是这种,之后 kubelet 会再次调用删除接口进行删除。
- 如果是立即删除,则调用底层存储的 Delete() 接口,从 etcd 中删除。
func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
...
graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
if graceful || pendingFinalizers || shouldUpdateFinalizers {
err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, deleteValidation, obj)
...
}
if !deleteImmediately || err != nil {
return out, false, err
}
if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun), nil); err != nil {
...
}
return out, true, err
}
func BeforeDelete(strategy RESTDeleteStrategy, ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) (graceful, gracefulPending bool, err error) {
...
now := metav1.NewTime(metav1.Now().Add(time.Second * time.Duration(*options.GracePeriodSeconds)))
objectMeta.SetDeletionTimestamp(&now)
objectMeta.SetDeletionGracePeriodSeconds(options.GracePeriodSeconds)
return true, false, nil
}
func (e *Store) updateForGracefulDeletionAndFinalizers(ctx context.Context, name, key string, options *metav1.DeleteOptions, preconditions storage.Preconditions, deleteValidation rest.ValidateObjectFunc, in runtime.Object) (err error, ignoreNotFound, deleteImmediately bool, out, lastExisting runtime.Object) {
lastGraceful := int64(0)
...
err = e.Storage.GuaranteedUpdate(
...
storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
...
lastGraceful = *options.GracePeriodSeconds
}),
...
)
switch err {
case nil:
if lastGraceful > 0 {
return nil, false, false, out, lastExisting
}
return nil, true, true, out, lastExisting
}
}
接着阐述下 Kubelet 侧的源码。之前已介绍过 Kubelet 侧的整体代码了,Kubelet 在 syncLoop 中根据不同的事件源根据事件的类型进行不同的逻辑处理。
- 如果 Pod 设置了 DeletionTimestamp、DeletionGracePeriodSeconds。kubelet 会按照 DELETE 事件来进行处理,走的是 kubetypes.DELETE 这边的逻辑,最终是 kill Pod。
- 如果 Pod 从 Etcd 中删除了,则会按照 REMOVE 事件来进行处理。
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
}
...
}
// handler.HandlePodUpdates->podWorkers.podWorkerLoop()->kubelet.SyncTerminatingRuntimePod()
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
...
for _, pod := range pods {
kl.podManager.UpdatePod(pod)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
}
}
func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error {
...
if err := kl.killPod(ctx, pod, *runningPod, &gracePeriod); err != nil {
...
return err
}
return nil
}
// handler.HandlePodRemoves->kubelet.deletePod
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.RemovePod(pod)
if err := kl.deletePod(pod); err != nil {
klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
}
}
}
func (kl *Kubelet) deletePod(pod *v1.Pod) error {
...
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
...
return nil
}
同时,kubelet 在启动的时候会启动一个 Status Manager,用于向 APIServer 同步 Pod 的状态。在 Status Manager 中会调用 canBeDeleted() 函数进行判断,是否可以删除。如果可以的话,则会再次调用 APIServer 的 Delete Pod 接口,删除 Pod,此时 GracePeriodSeconds 设置为了 0。APIServer 收到这个请求之后,会删除 etcd 中的 Pod。
func (m *manager) Start() {
...
go wait.Forever(func() {
for {
select {
case <-m.podStatusChannel:
klog.V(4).InfoS("Syncing updated statuses")
m.syncBatch(false)
case <-syncTicker:
klog.V(4).InfoS("Syncing all statuses")
m.syncBatch(true)
}
}
}, 0)
}
func (m *manager) syncBatch(all bool) int {
...
for _, update := range updatedStatuses {
m.syncPod(update.podUID, update.status)
}
return len(updatedStatuses)
}
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
...
if m.canBeDeleted(pod, status.status, status.podIsFinished) {
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: new(int64),
Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
}
err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
...
}
}
这里做下联动,
-
对于第一次删除来说,如果没有指定 GracePeriodSeconds 的话,其实默认就是 graceful 删除。根据 APIServer 侧的行为,可以看到这种行为其实是更新 Pod 的行为,因为是给 Pod 设置 DeletionTimestamp、DeletionGracePeriodSeconds。
kubelet 会按照 DELETE 事件来进行处理,会 kill Pod,清理 Pod 相关的资源。清理完后 Stauts Manager 会收到通知,接着判断是否可以删除。如果可以的话,则会调用 APIServer 的 DELETE Pod 接口删除。
-
对于第二次删除来说。kubelet 也会收到变化,但是此次是按照 REMOVE 事件来进行处理,走的是 kubetypes.REMOVE 这边的逻辑,完成 Pod 的最终清理工作。
相关链接
Kubernetes源码分析之Pod的删除:https://juejin.cn/post/6844903842321039368