基础
PLEG 全称叫 Pod Lifecycle Event Generator,是 kubelet 中的一个模块。主要功能是通过 CRI 接口从容器运行时获取当前的 Pod 状态。然后将当前 Pod 状态和之前的 Pod 状态进行比较,生成 Pod 生命周期事件,并发送到 channel 中。kubelet 的主循环 syncLoop 会从 channel 中读取事件,结合事件以及 Pod 的期望状态对 Pod 做出调整。
目前 PLGE 有两种,
- 一种是 Generic PLEG。1.2 引入,主要原理如下,
- 核心是 relist() 函数,这个函数每隔 1 秒执行一次,每次执行的默认超时时间为 3 分钟。
- relist() 函数的主要流程是:获取当前容器运行时中正在运行的容器对应的 Pod 状态 -> 跟上一次保留的 Pod 状态进行比较,生成相应的 Pod 生命周期事件 -> 发送给 channel。
- 如果 Pod 数量较多,容易导致执行时间超过 3 分钟,导致 PLEG is not healthy,进而导致 node not ready 现象。
- 另一种是 Evented PLEG。1.26 引入,但是它的引入不是为了取代 Generic PLEG,而是和 Generic PLEG 配合,降低 Generic PLEG 的频次,提高性能。主要原理如下,
- 不再依赖获取->比较->生成事件这种模式,而是直接调用 CRI 提供的 GetContainerEvents() 接口来直接获取事件,然后生成 Pod 的生命周期事件,再发送给 channel。
- 但是为了保证只通过 GetContainerEvents() 接口获取事件可能会导致一些状态变更丢失,为此还会继续运行 Generic PLEG,由 Generic PLEG 来兜底,但是 Generic PLEG 的运行周期会有变化,
- 调用间隔时间从 1s 变成 300s。
- 执行的默认超时时间从 3min 变成 10min。
源码分析
PLEG 的启动
整个调用链路如下,
NewKubeletCommand()->Run()->run()->RunKubelet()->startKubelet()->Kubelet.Run()
Kubelet.Run() 中的代码如下所示,
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
// Start the pod lifecycle event generator.
kl.pleg.Start()
// Start eventedPLEG only if EventedPLEG feature gate is enabled.
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
kl.eventedPleg.Start()
}
kl.syncLoop(ctx, updates, kl)
}
Generic PLEG 源码
GenericPLEG.Start() 中通过 go wait.Until()
方式,以间隔 RelistPeriod 的方式运行 g.Relist() 方法。其中跟踪 RelistPeriod 赋值的代码,可以看到它默认被赋值为 1s。
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
if !g.isRunning {
g.isRunning = true
g.stopCh = make(chan struct{})
go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
}
}
GenericPLEG.Relist() 方法如下,
- 先调用 GetPods() 获取所有的 Pod。跟踪下去可以看到,是调用 CRI 的 ListPodSandbox 和 ListContainers 接口获取 sandbox 和 containers,然后组合成对应的 Pod。
- 更新了 RelistTime。
- 比较当前获取到的 Pod 和 podRecords 中已有的 Pod 情况,生成 PodLifecycleEvent。
- 再遍历 PodLifecycleEvent,更新 podRecords 中的情况,同时将事件发送给 channel。
func (g *GenericPLEG) Relist() {
timestamp := g.clock.Now()
// Get all the pods.
podList, err := g.runtime.GetPods(ctx, true)
g.updateRelistTime(timestamp)
pods := kubecontainer.Pods(podList)
g.podRecords.setCurrent(pods)
// Compare the old and the current pods, and generate events.
eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
for pid := range g.podRecords {
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// Get all containers in the old and the new pod.
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
}
}
for pid, events := range eventsByPodID {
pod := g.podRecords.getCurrent(pid)
// Update the internal storage and send out the events.
g.podRecords.update(pid)
for i := range events {
// Filter out events that are not reliable and no other components use yet.
if events[i].Type == ContainerChanged {
continue
}
select {
case g.eventChannel <- events[i]:
default:
...
}
}
}
...
}
是怎么判断 PLEG is not healthy 的?
PLEG 提供了一个健康检查函数 GenericPLEG.Healthy()。这个函数的主要作用就是当前时间距离上次 relist 时间是否超过了 RelistThreshold(默认 3 min),如果超过了则返回 error。
func (g *GenericPLEG) Healthy() (bool, error) {
relistTime := g.getRelistTime()
...
elapsed := g.clock.Since(relistTime)
if elapsed > g.relistDuration.RelistThreshold {
return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, g.relistDuration.RelistThreshold)
}
return true, nil
}
那么 PLEG is not healty 是怎么导致 node 出现 not ready 的呢?Kubelet.Run() 还会启动一个 goroutine,定期执行 syncNodeStatus() 方法,在这个方法中会调用 PLEG 的健康检查函数。
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.kubeClient != nil {
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
...
}
...
}
那么 PLEG is not healty 是怎么影响 Pod 状态同步的呢?在 syncLoop() 中,它会首先执行 kl.runtimeState.runtimeErrors() 方法,如果这个方法出错则不会执行 syncLoopIteration() 方法。syncLoopIteration() 方法则是针对各个事件,包括了针对上述 PLEG 生成的事件的处理,同时也会产生事件通知 Status Manager 进行 Pod 状态同步。其中 runtimeErrors() 方法的核心是调用各个健康检查的函数,其中就有 PLEG 的健康检查函数。
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
...
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
...
continue
}
// reset backoff if we have a success
duration = base
...
if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
}
}
func (s *runtimeState) runtimeErrors() error {
for _, hc := range s.healthChecks {
if ok, err := hc.fn(); !ok {
errs = append(errs, fmt.Errorf("%s is not healthy: %v", hc.name, err))
}
}
...
return utilerrors.NewAggregate(errs)
}
总结下,如果出现 pleg not healthy,一般有以下几种可能:
- 容器运行时无响应或响应超时,如 docker进程响应超时(比较常见)。
- 该节点上容器数量过多,导致 relist 的过程无法在 3 分钟内完成。
Evented PLEG 源码
EventedPLEG.Start() 中通过 go wait.Until()
方式,以无间隔的方式运行 watchEventsChannel() 方法。
func (e *EventedPLEG) Start() {
e.runningMu.Lock()
defer e.runningMu.Unlock()
if isEventedPLEGInUse() {
return
}
setEventedPLEGUsage(true)
e.stopCh = make(chan struct{})
e.stopCacheUpdateCh = make(chan struct{})
go wait.Until(e.watchEventsChannel, 0, e.stopCh)
go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh)
}
在 watchEventsChannel() 方法中,
- 会起一个 goroutine 循环调用 runtimeService.GetContainerEvents() 方法直接从 CRI 中获取 container 变更事件。
- 同时 watchEventsChannel() 中会调用 processCRIEvents() 方法进行处理。
func (e *EventedPLEG) watchEventsChannel() {
containerEventsResponseCh := make(chan *runtimeapi.ContainerEventResponse, cap(e.eventChannel))
// Get the container events from the runtime.
go func() {
numAttempts := 0
for {
if numAttempts >= e.eventedPlegMaxStreamRetries {
if isEventedPLEGInUse() {
e.Stop()
e.genericPleg.Stop() // Stop the existing Generic PLEG which runs with longer relisting period when Evented PLEG is in use.
e.Update(e.relistDuration) // Update the relisting period to the default value for the Generic PLEG.
e.genericPleg.Start()
break
}
}
err := e.runtimeService.GetContainerEvents(containerEventsResponseCh)
if err != nil {
metrics.EventedPLEGConnErr.Inc()
numAttempts++
e.Relist() // Force a relist to get the latest container and pods running metric.
}
}
}()
if isEventedPLEGInUse() {
e.processCRIEvents(containerEventsResponseCh)
}
}
相关链接
kubelet PLEG 的实现与优化:https://www.myway5.com/index.php/2023/02/27/kubelet-pleg-implementation/
Kubelet PLEG 源码分析:https://cloud.tencent.com/developer/article/1359236