Kubernetes学习笔记——DesiredStateOfWorldPopulator源码分析

我们知道,kubelet 启动时会运行 VolumeManager 协程来负责 Volume 变更时的操作。它主要通过 ActualStateOfWorld 和 DesiredStateOfWorld 这两个 cache 信息来让 VolumeManager 中的两个协程工作。
3599202506.png

DesiredStateOfWorldPopulator 和 Reconciler 两个 Goroutine 会通过图中两个的 StateOfWorld 状态进行通信,DesiredStateOfWorldPopulator 主要负责从 Kubernetes 节点中获取新的 Pod 对象并更新 DesiredStateOfWorld 结构;而后者会根据实际状态和当前状态的区别对当前节点的状态进行迁移,也就是通过 DesiredStateOfWorld 中状态的变更更新 ActualStateOfWorld 中的内容。
Ref: https://draveness.me/kubernetes-volume

下面就来分析一下 DesiredStateOfWorldPopulator 的源码。

0x02. 结构与接口

DesiredStateOfWorldPopulator 的数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type desiredStateOfWorldPopulator struct {
kubeClient clientset.Interface
loopSleepDuration time.Duration
getPodStatusRetryDuration time.Duration
podManager pod.Manager
podStatusProvider status.PodStatusProvider
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
pods processedPods
kubeContainerRuntime kubecontainer.Runtime
timeOfLastGetPodStatus time.Time
keepTerminatedPodVolumes bool
hasAddedPods bool
hasAddedPodsLock sync.RWMutex
}
  • kubeClient:用以从 API Server 获取 PV 和 PVC 对象
  • loopSleepDuration:定义连续执行的间隔
  • podManager:host 真实存在的 Pod 信息获取来源

DesiredStateOfWorldPopulator 的接口有三个方法:

1
2
3
4
5
6
7
type DesiredStateOfWorldPopulator interface {
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

ReprocessPod(podName volumetypes.UniquePodName)

HasAddedPods() bool
}

除了核心执行方法 Run,ReprocessPod 能够将特定 Pod 强制剔出 processedPods 列表进行强制重新处理。该方法用于在 Pod 更新上启用重新挂载卷。而 HasAddedPods 方法则返回 populator 是否已经将所有现有 Pod 处理添加到 desired state 中。

0x03. 核心流程分析

3.1 populatorLoopFunc

run 方法中,每隔 loopSleepDuration 就会执行一次 populatorLoopFunc。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
return func() {
dswp.findAndAddNewPods()


if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
glog.V(5).Infof(
"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
dswp.getPodStatusRetryDuration)

return
}

dswp.findAndRemoveDeletedPods()
}
}

3.2 findAndAddNewPods

findAndAddNewPods 遍历所有 Pod 并且将“应该添加到期望状态但实际上没有添加”的 Pod 添加到对应状态值中。

分析流程可知该方法先寻找不是终止状态的 Pod,再调用 processPodVolumes 处理这些符合条件的 Pod。

1
2
3
4
5
6
7
8
9
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
for _, pod := range dswp.podManager.GetPods() {
if dswp.isPodTerminated(pod) {
// Do not (re)add volumes for terminated pods
continue
}
dswp.processPodVolumes(pod)
}
}

终止状态判定生效条件满足一条即判定为终止状态:

  • Phase 处于 PodFailed
  • Phase 处于 Succeeded
  • 删除时间不为空,且所有内部容器状态 ContainerStatus 都为 Terminated 或者 Waiting,或者 Container List 为空。

终止状态判定完毕,核心方法 processPodVolumes 会将给定 Pod 中的 Volumes 进行处理并添加到期望状态值中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// processPodVolumes processes the volumes in the given pod and adds them to the desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {

...
uniquePodName := util.GetUniquePodName(pod)
if dswp.podPreviouslyProcessed(uniquePodName) {
return
}

allVolumesAdded := true
mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers)

// Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes {
volumeSpec, volumeGidValue, err :=
dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mountsMap, devicesMap)
....

// Add volume to desired state of world
_, err = dswp.desiredStateOfWorld.AddPodToVolume(
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)

....
}

if allVolumesAdded {
dswp.markPodProcessed(uniquePodName)
// New pod has been synced. Re-mount all volumes that need it
dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
}

}

可以看到,该方法处理流程如下:

  1. 判断该 Pod 之前是否被处理过,处理过则返回。即从 dswp 的 processedPods 列表中查找,需要加读锁。
  2. 通过 Container 列表创建 mountsMap 和 devicesMap,mountsMap 存储 VolumeMounts 字段的挂载信息,devicesMap 存储 BlockVolume 的信息。Key=VolumeMount.Name,Value=True。
  3. 对 Spec 中的 Volumes 列表中每一项 PodVolume,根据 PodName, Namespace, mountsMap, devicesMap 创建 VolumeSpec。
  4. 根据 Pod,PodName,VolumeSpec, PV Name,GID 将 Volume 添加到 dswp 缓存的 desiredStateOfWorld 中。
  5. 如果全部 Volume 的添加都成功则将 Pod 标记为“Processed”,同时将 Pod 标记为 RemountRequired 状态用以更新 Volume 的内容。

其中,步骤 3 的 createVolumeSpec 首先会判断该 podVolume 的 Source 是否为 PVC,如果为 PVC 则需要找到 Claim 背后的 PV Name,再通过 PV Name 获取真正的 PV 对象并返回。如果 PVC 为空,则对 PV 深拷贝并创建 Spec 对象返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
namespace string, claimName string) (string, types.UID, error) {
pvc, err :=
dswp.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(claimName, metav1.GetOptions{})
if err != nil || pvc == nil {
return "", "", fmt.Errorf(......)
}

...

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {

return "", "", fmt.Errorf(......)
}

return pvc.Spec.VolumeName, pvc.UID, nil
}

实际上,通过 PVC 找 PV Name 是由 KubeClient 向 API Server 请求得到的。请求通过 Namespace 和 Claim Name 获取到 PVC 对象,确认 PVC 对象的 Phase 为 Bound 状态且 pvc.Spec.VolumeName 不为空。上述流程成功后返回 PVC 的 VolumeName(即 PV Name)和该 PVC 的 UID。

获取到 pvName 和 pvcUID 后,再次通过 KubeClient 向 API Server 请求得到 PV 对象。请求成功后检查 ClaimRef 是否为空,ClaimRef 的 UID 和传入的 PVC UID 是否一致。最后返回该 PV 对象,在返回的同时一并返回的还有 PV 的 GID。

再看看步骤 4,其调用的 AddPodToVolume 方法如果检查到没有可用的 Volume 插件或者可用插件不止一个,会返回 Error。如果 Pod Unique Name 重复,则不执行任何操作。此外,如果 Volume Name 如果不在该节点的 Volume 列表中,则该 Volume 会被隐式添加( implicitly added)。

If a volume with the name volumeName does not exist in the list of volumes that should be attached to this node, the volume is implicitly added.

3.3 findAndRemoveDeletedPods

先从 desiredStateOfWorld 中遍历待挂载的 Volume,然后从 PodManager 中根据待挂载 Volume 的 Pod UID 查找该对应 Pod。跳过正在运行和不需要删除 Volumes(keepTerminatedPodVolumes)的 Pod,执行删除流程。

当 Pod 从 PodManager 中删除 Pod 时,Pod 不会在 Volume Manager 中立即删除,需要确认 kubelet 容器运行时所有的 Container 已经全部终止。此外,同时还要确认 actualStateOfWorld 缓存中是否存在待挂载 Volume 信息。

上述确认过程确认完毕后,从 desiredStateOfWorld 缓存中删除 Pod,表明指定的 Pod 不再需要该 Volume。同时,从 dswp 维护的 processedPods 列表中删除该 Pod。