Kubernetes学习笔记——DesiredStateOfWorldPopulator源码分析

我们知道,kubelet启动时会运行VolumeManager协程来负责Volume变更时的操作。它主要通过ActualStateOfWorld和DesiredStateOfWorld这两个cache信息来让VolumeManager中的两个协程工作。
3599202506.png

DesiredStateOfWorldPopulator 和 Reconciler 两个 Goroutine 会通过图中两个的 StateOfWorld 状态进行通信,DesiredStateOfWorldPopulator 主要负责从 Kubernetes 节点中获取新的 Pod 对象并更新 DesiredStateOfWorld 结构;而后者会根据实际状态和当前状态的区别对当前节点的状态进行迁移,也就是通过 DesiredStateOfWorld 中状态的变更更新 ActualStateOfWorld 中的内容。
Ref: https://draveness.me/kubernetes-volume

下面就来分析一下DesiredStateOfWorldPopulator的源码。

0x02. 结构与接口

DesiredStateOfWorldPopulator的数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type desiredStateOfWorldPopulator struct {
kubeClient clientset.Interface
loopSleepDuration time.Duration
getPodStatusRetryDuration time.Duration
podManager pod.Manager
podStatusProvider status.PodStatusProvider
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
pods processedPods
kubeContainerRuntime kubecontainer.Runtime
timeOfLastGetPodStatus time.Time
keepTerminatedPodVolumes bool
hasAddedPods bool
hasAddedPodsLock sync.RWMutex
}
  • kubeClient:用以从API Server获取PV和PVC对象
  • loopSleepDuration:定义连续执行的间隔
  • podManager:host真实存在的Pod信息获取来源

DesiredStateOfWorldPopulator的接口有三个方法:

1
2
3
4
5
6
7
type DesiredStateOfWorldPopulator interface {
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

ReprocessPod(podName volumetypes.UniquePodName)

HasAddedPods() bool
}

除了核心执行方法Run,ReprocessPod能够将特定Pod强制剔出processedPods列表进行强制重新处理。该方法用于在Pod更新上启用重新挂载卷。而HasAddedPods方法则返回populator是否已经将所有现有Pod处理添加到desired state中。

0x03. 核心流程分析

3.1 populatorLoopFunc

run方法中,每隔loopSleepDuration就会执行一次populatorLoopFunc。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
return func() {
dswp.findAndAddNewPods()


if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
glog.V(5).Infof(
"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
dswp.getPodStatusRetryDuration)

return
}

dswp.findAndRemoveDeletedPods()
}
}

3.2 findAndAddNewPods

findAndAddNewPods遍历所有Pod并且将“应该添加到期望状态但实际上没有添加”的Pod添加到对应状态值中。

分析流程可知该方法先寻找不是终止状态的Pod,再调用processPodVolumes处理这些符合条件的Pod。

1
2
3
4
5
6
7
8
9
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
for _, pod := range dswp.podManager.GetPods() {
if dswp.isPodTerminated(pod) {
// Do not (re)add volumes for terminated pods
continue
}
dswp.processPodVolumes(pod)
}
}

终止状态判定生效条件满足一条即判定为终止状态:

  • Phase处于PodFailed
  • Phase处于Succeeded
  • 删除时间不为空,且所有内部容器状态ContainerStatus都为Terminated或者Waiting,或者Container List为空。

终止状态判定完毕,核心方法processPodVolumes会将给定Pod中的Volumes进行处理并添加到期望状态值中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// processPodVolumes processes the volumes in the given pod and adds them to the desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {

...
uniquePodName := util.GetUniquePodName(pod)
if dswp.podPreviouslyProcessed(uniquePodName) {
return
}

allVolumesAdded := true
mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers)

// Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes {
volumeSpec, volumeGidValue, err :=
dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mountsMap, devicesMap)
....

// Add volume to desired state of world
_, err = dswp.desiredStateOfWorld.AddPodToVolume(
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)

....
}

if allVolumesAdded {
dswp.markPodProcessed(uniquePodName)
// New pod has been synced. Re-mount all volumes that need it
dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
}

}

可以看到,该方法处理流程如下:

  1. 判断该Pod之前是否被处理过,处理过则返回。即从dswp的processedPods列表中查找,需要加读锁。
  2. 通过Container列表创建mountsMap和devicesMap,mountsMap存储VolumeMounts字段的挂载信息,devicesMap存储BlockVolume的信息。Key=VolumeMount.Name,Value=True。
  3. 对Spec中的Volumes列表中每一项PodVolume,根据PodName, Namespace, mountsMap, devicesMap创建VolumeSpec。
  4. 根据Pod,PodName,VolumeSpec, PV Name,GID将Volume添加到dswp缓存的desiredStateOfWorld中。
  5. 如果全部Volume的添加都成功则将Pod标记为“Processed”,同时将Pod标记为RemountRequired状态用以更新Volume的内容。

其中,步骤3的createVolumeSpec首先会判断该podVolume的Source是否为PVC,如果为PVC则需要找到Claim背后的PV Name,再通过PV Name获取真正的PV对象并返回。如果PVC为空,则对PV深拷贝并创建Spec对象返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
namespace string, claimName string) (string, types.UID, error) {
pvc, err :=
dswp.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(claimName, metav1.GetOptions{})
if err != nil || pvc == nil {
return "", "", fmt.Errorf(......)
}

...

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {

return "", "", fmt.Errorf(......)
}

return pvc.Spec.VolumeName, pvc.UID, nil
}

实际上,通过PVC找PV Name是由KubeClient向API Server请求得到的。请求通过Namespace和Claim Name获取到PVC对象,确认PVC对象的Phase为Bound状态且pvc.Spec.VolumeName不为空。上述流程成功后返回PVC的VolumeName(即PV Name)和该PVC的UID。

获取到pvName和pvcUID后,再次通过KubeClient向API Server请求得到PV对象。请求成功后检查ClaimRef是否为空,ClaimRef的UID和传入的PVC UID是否一致。最后返回该PV对象,在返回的同时一并返回的还有PV的GID。

再看看步骤4,其调用的AddPodToVolume方法如果检查到没有可用的Volume插件或者可用插件不止一个,会返回Error。如果Pod Unique Name重复,则不执行任何操作。此外,如果Volume Name如果不在该节点的Volume列表中,则该Volume会被隐式添加( implicitly added)。

If a volume with the name volumeName does not exist in the list of volumes that should be attached to this node, the volume is implicitly added.

3.3 findAndRemoveDeletedPods

先从desiredStateOfWorld中遍历待挂载的Volume,然后从PodManager中根据待挂载Volume的Pod UID查找该对应Pod。跳过正在运行和不需要删除Volumes(keepTerminatedPodVolumes)的Pod,执行删除流程。

当Pod从PodManager中删除Pod时,Pod不会在Volume Manager中立即删除,需要确认kubelet容器运行时所有的Container已经全部终止。此外,同时还要确认actualStateOfWorld缓存中是否存在待挂载Volume信息。

上述确认过程确认完毕后,从desiredStateOfWorld缓存中删除Pod,表明指定的Pod不再需要该Volume。同时,从dswp维护的processedPods列表中删除该Pod。