123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- package pod
- import (
- "sync"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/kubernetes/pkg/kubelet/checkpoint"
- "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
- "k8s.io/kubernetes/pkg/kubelet/configmap"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/secret"
- kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
- )
- type Manager interface {
-
- GetPods() []*v1.Pod
-
-
- GetPodByFullName(podFullName string) (*v1.Pod, bool)
-
-
- GetPodByName(namespace, name string) (*v1.Pod, bool)
-
-
- GetPodByUID(types.UID) (*v1.Pod, bool)
-
-
- GetPodByMirrorPod(*v1.Pod) (*v1.Pod, bool)
-
-
- GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
-
- GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod)
-
-
- SetPods(pods []*v1.Pod)
-
- AddPod(pod *v1.Pod)
-
- UpdatePod(pod *v1.Pod)
-
-
-
- DeletePod(pod *v1.Pod)
-
-
-
- DeleteOrphanedMirrorPods()
-
-
-
-
-
-
-
- TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
-
-
- GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
-
-
- IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool
- MirrorClient
- }
- type basicManager struct {
-
- lock sync.RWMutex
-
- podByUID map[kubetypes.ResolvedPodUID]*v1.Pod
-
- mirrorPodByUID map[kubetypes.MirrorPodUID]*v1.Pod
-
- podByFullName map[string]*v1.Pod
- mirrorPodByFullName map[string]*v1.Pod
-
- translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID
-
- secretManager secret.Manager
- configMapManager configmap.Manager
- checkpointManager checkpointmanager.CheckpointManager
-
- MirrorClient
- }
- func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager, cpm checkpointmanager.CheckpointManager) Manager {
- pm := &basicManager{}
- pm.secretManager = secretManager
- pm.configMapManager = configMapManager
- pm.checkpointManager = cpm
- pm.MirrorClient = client
- pm.SetPods(nil)
- return pm
- }
- func (pm *basicManager) SetPods(newPods []*v1.Pod) {
- pm.lock.Lock()
- defer pm.lock.Unlock()
- pm.podByUID = make(map[kubetypes.ResolvedPodUID]*v1.Pod)
- pm.podByFullName = make(map[string]*v1.Pod)
- pm.mirrorPodByUID = make(map[kubetypes.MirrorPodUID]*v1.Pod)
- pm.mirrorPodByFullName = make(map[string]*v1.Pod)
- pm.translationByUID = make(map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
- pm.updatePodsInternal(newPods...)
- }
- func (pm *basicManager) AddPod(pod *v1.Pod) {
- pm.UpdatePod(pod)
- }
- func (pm *basicManager) UpdatePod(pod *v1.Pod) {
- pm.lock.Lock()
- defer pm.lock.Unlock()
- pm.updatePodsInternal(pod)
- if pm.checkpointManager != nil {
- if err := checkpoint.WritePod(pm.checkpointManager, pod); err != nil {
- klog.Errorf("Error writing checkpoint for pod: %v", pod.GetName())
- }
- }
- }
- func isPodInTerminatedState(pod *v1.Pod) bool {
- return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded
- }
- func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
- for _, pod := range pods {
- if pm.secretManager != nil {
- if isPodInTerminatedState(pod) {
-
-
-
-
- pm.secretManager.UnregisterPod(pod)
- } else {
-
-
- pm.secretManager.RegisterPod(pod)
- }
- }
- if pm.configMapManager != nil {
- if isPodInTerminatedState(pod) {
-
-
-
-
- pm.configMapManager.UnregisterPod(pod)
- } else {
-
-
- pm.configMapManager.RegisterPod(pod)
- }
- }
- podFullName := kubecontainer.GetPodFullName(pod)
-
-
- if IsMirrorPod(pod) {
- mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
- pm.mirrorPodByUID[mirrorPodUID] = pod
- pm.mirrorPodByFullName[podFullName] = pod
- if p, ok := pm.podByFullName[podFullName]; ok {
- pm.translationByUID[mirrorPodUID] = kubetypes.ResolvedPodUID(p.UID)
- }
- } else {
- resolvedPodUID := kubetypes.ResolvedPodUID(pod.UID)
- pm.podByUID[resolvedPodUID] = pod
- pm.podByFullName[podFullName] = pod
- if mirror, ok := pm.mirrorPodByFullName[podFullName]; ok {
- pm.translationByUID[kubetypes.MirrorPodUID(mirror.UID)] = resolvedPodUID
- }
- }
- }
- }
- func (pm *basicManager) DeletePod(pod *v1.Pod) {
- pm.lock.Lock()
- defer pm.lock.Unlock()
- if pm.secretManager != nil {
- pm.secretManager.UnregisterPod(pod)
- }
- if pm.configMapManager != nil {
- pm.configMapManager.UnregisterPod(pod)
- }
- podFullName := kubecontainer.GetPodFullName(pod)
-
- if IsMirrorPod(pod) {
- mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
- delete(pm.mirrorPodByUID, mirrorPodUID)
- delete(pm.mirrorPodByFullName, podFullName)
- delete(pm.translationByUID, mirrorPodUID)
- } else {
- delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID))
- delete(pm.podByFullName, podFullName)
- }
- if pm.checkpointManager != nil {
- if err := checkpoint.DeletePod(pm.checkpointManager, pod); err != nil {
- klog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName())
- }
- }
- }
- func (pm *basicManager) GetPods() []*v1.Pod {
- pm.lock.RLock()
- defer pm.lock.RUnlock()
- return podsMapToPods(pm.podByUID)
- }
- func (pm *basicManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) {
- pm.lock.RLock()
- defer pm.lock.RUnlock()
- pods := podsMapToPods(pm.podByUID)
- mirrorPods := mirrorPodsMapToMirrorPods(pm.mirrorPodByUID)
- return pods, mirrorPods
- }
- func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
- pm.lock.RLock()
- defer pm.lock.RUnlock()
- pod, ok := pm.podByUID[kubetypes.ResolvedPodUID(uid)]
- return pod, ok
- }
- func (pm *basicManager) GetPodByName(namespace, name string) (*v1.Pod, bool) {
- podFullName := kubecontainer.BuildPodFullName(name, namespace)
- return pm.GetPodByFullName(podFullName)
- }
- func (pm *basicManager) GetPodByFullName(podFullName string) (*v1.Pod, bool) {
- pm.lock.RLock()
- defer pm.lock.RUnlock()
- pod, ok := pm.podByFullName[podFullName]
- return pod, ok
- }
- func (pm *basicManager) TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID {
-
- if uid == "" {
- return kubetypes.ResolvedPodUID(uid)
- }
- pm.lock.RLock()
- defer pm.lock.RUnlock()
- if translated, ok := pm.translationByUID[kubetypes.MirrorPodUID(uid)]; ok {
- return translated
- }
- return kubetypes.ResolvedPodUID(uid)
- }
- func (pm *basicManager) GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID,
- mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) {
- pm.lock.RLock()
- defer pm.lock.RUnlock()
- podToMirror = make(map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, len(pm.translationByUID))
- mirrorToPod = make(map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID, len(pm.translationByUID))
-
- for uid, pod := range pm.podByUID {
- if !IsStaticPod(pod) {
- continue
- }
- podToMirror[uid] = ""
- }
-
-
-
-
- for k, v := range pm.translationByUID {
- mirrorToPod[k] = v
- podToMirror[v] = k
- }
- return podToMirror, mirrorToPod
- }
- func (pm *basicManager) getOrphanedMirrorPodNames() []string {
- pm.lock.RLock()
- defer pm.lock.RUnlock()
- var podFullNames []string
- for podFullName := range pm.mirrorPodByFullName {
- if _, ok := pm.podByFullName[podFullName]; !ok {
- podFullNames = append(podFullNames, podFullName)
- }
- }
- return podFullNames
- }
- func (pm *basicManager) DeleteOrphanedMirrorPods() {
- podFullNames := pm.getOrphanedMirrorPodNames()
- for _, podFullName := range podFullNames {
- pm.MirrorClient.DeleteMirrorPod(podFullName, nil)
- }
- }
- func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool {
-
- if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace {
- return false
- }
- hash, ok := getHashFromMirrorPod(mirrorPod)
- if !ok {
- return false
- }
- return hash == getPodHash(pod)
- }
- func podsMapToPods(UIDMap map[kubetypes.ResolvedPodUID]*v1.Pod) []*v1.Pod {
- pods := make([]*v1.Pod, 0, len(UIDMap))
- for _, pod := range UIDMap {
- pods = append(pods, pod)
- }
- return pods
- }
- func mirrorPodsMapToMirrorPods(UIDMap map[kubetypes.MirrorPodUID]*v1.Pod) []*v1.Pod {
- pods := make([]*v1.Pod, 0, len(UIDMap))
- for _, pod := range UIDMap {
- pods = append(pods, pod)
- }
- return pods
- }
- func (pm *basicManager) GetMirrorPodByPod(pod *v1.Pod) (*v1.Pod, bool) {
- pm.lock.RLock()
- defer pm.lock.RUnlock()
- mirrorPod, ok := pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)]
- return mirrorPod, ok
- }
- func (pm *basicManager) GetPodByMirrorPod(mirrorPod *v1.Pod) (*v1.Pod, bool) {
- pm.lock.RLock()
- defer pm.lock.RUnlock()
- pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)]
- return pod, ok
- }
|