123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package volumemanager
- import (
- "errors"
- "fmt"
- "sort"
- "strconv"
- "strings"
- "time"
- "k8s.io/klog"
- "k8s.io/utils/mount"
- v1 "k8s.io/api/core/v1"
- k8stypes "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/record"
- csitrans "k8s.io/csi-translation-lib"
- "k8s.io/kubernetes/pkg/kubelet/config"
- "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/pod"
- "k8s.io/kubernetes/pkg/kubelet/status"
- "k8s.io/kubernetes/pkg/kubelet/util/format"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler"
- "k8s.io/kubernetes/pkg/volume"
- "k8s.io/kubernetes/pkg/volume/csimigration"
- "k8s.io/kubernetes/pkg/volume/util"
- "k8s.io/kubernetes/pkg/volume/util/hostutil"
- "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
- "k8s.io/kubernetes/pkg/volume/util/types"
- "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
- )
const (
	// Loop cadence.

	// reconcilerLoopSleepPeriod is how long the reconciler loop pauses
	// between two consecutive passes.
	reconcilerLoopSleepPeriod = 100 * time.Millisecond

	// desiredStateOfWorldPopulatorLoopSleepPeriod is how long the
	// DesiredStateOfWorldPopulator loop pauses between two consecutive passes.
	desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond

	// desiredStateOfWorldPopulatorGetPodStatusRetryDuration spaces out the
	// populator's pod cleanup calls so that containerruntime.GetPodStatus is
	// not invoked too frequently.
	desiredStateOfWorldPopulatorGetPodStatusRetryDuration = 2 * time.Second

	// Per-pod waiting.

	// podAttachAndMountRetryInterval is the delay between two checks of a
	// pod's volumes while waiting for them to be attached and mounted.
	podAttachAndMountRetryInterval = 300 * time.Millisecond

	// podAttachAndMountTimeout bounds how long WaitForAttachAndMount waits for
	// every volume of a pod to be attached and mounted. Cloud operations may
	// need several minutes, but the kubelet retries on its next sync
	// iteration, so roughly two minutes is enough and releases the pod's
	// goroutine to handle newer updates (for example a delete request).
	// The extra three seconds make timeouts caused by this constant easy to
	// recognize.
	podAttachAndMountTimeout = 2*time.Minute + 3*time.Second

	// Attach waiting.

	// waitForAttachTimeout bounds how long an operationexecutor.Mount call
	// waits for a volume to become attached. Ten minutes was chosen because
	// attach has been observed to take several minutes for some volume
	// plugins. While waiting, only operations on the same device are blocked;
	// other devices are unaffected.
	waitForAttachTimeout = 10 * time.Minute
)
- // VolumeManager runs a set of asynchronous loops that figure out which volumes
- // need to be attached/mounted/unmounted/detached based on the pods scheduled on
- // this node and makes it so.
- type VolumeManager interface {
- // Starts the volume manager and all the asynchronous loops that it controls
- Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
- // WaitForAttachAndMount processes the volumes referenced in the specified
- // pod and blocks until they are all attached and mounted (reflected in
- // actual state of the world).
- // An error is returned if all volumes are not attached and mounted within
- // the duration defined in podAttachAndMountTimeout.
- WaitForAttachAndMount(pod *v1.Pod) error
- // GetMountedVolumesForPod returns a VolumeMap containing the volumes
- // referenced by the specified pod that are successfully attached and
- // mounted. The key in the map is the OuterVolumeSpecName (i.e.
- // pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
- // volumes.
- GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
- // GetExtraSupplementalGroupsForPod returns a list of the extra
- // supplemental groups for the Pod. These extra supplemental groups come
- // from annotations on persistent volumes that the pod depends on.
- GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
- // GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
- // interface and are currently in use according to the actual and desired
- // state of the world caches. A volume is considered "in use" as soon as it
- // is added to the desired state of world, indicating it *should* be
- // attached to this node and remains "in use" until it is removed from both
- // the desired state of the world and the actual state of the world, or it
- // has been unmounted (as indicated in actual state of world).
- GetVolumesInUse() []v1.UniqueVolumeName
- // ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler
- // has been synced at least once after kubelet starts so that it is safe to update mounted
- // volume list retrieved from actual state.
- ReconcilerStatesHasBeenSynced() bool
- // VolumeIsAttached returns true if the given volume is attached to this
- // node.
- VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
- // Marks the specified volume as having successfully been reported as "in
- // use" in the nodes's volume status.
- MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
- }
- // NewVolumeManager returns a new concrete instance implementing the
- // VolumeManager interface.
- //
- // kubeClient - kubeClient is the kube API client used by DesiredStateOfWorldPopulator
- // to communicate with the API server to fetch PV and PVC objects
- // volumePluginMgr - the volume plugin manager used to access volume plugins.
- // Must be pre-initialized.
- func NewVolumeManager(
- controllerAttachDetachEnabled bool,
- nodeName k8stypes.NodeName,
- podManager pod.Manager,
- podStatusProvider status.PodStatusProvider,
- kubeClient clientset.Interface,
- volumePluginMgr *volume.VolumePluginMgr,
- kubeContainerRuntime container.Runtime,
- mounter mount.Interface,
- hostutil hostutil.HostUtils,
- kubeletPodsDir string,
- recorder record.EventRecorder,
- checkNodeCapabilitiesBeforeMount bool,
- keepTerminatedPodVolumes bool,
- blockVolumePathHandler volumepathhandler.BlockVolumePathHandler) VolumeManager {
- vm := &volumeManager{
- kubeClient: kubeClient,
- volumePluginMgr: volumePluginMgr,
- desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr),
- actualStateOfWorld: cache.NewActualStateOfWorld(nodeName, volumePluginMgr),
- operationExecutor: operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- kubeClient,
- volumePluginMgr,
- recorder,
- checkNodeCapabilitiesBeforeMount,
- blockVolumePathHandler)),
- }
- intreeToCSITranslator := csitrans.New()
- csiMigratedPluginManager := csimigration.NewPluginManager(intreeToCSITranslator)
- vm.intreeToCSITranslator = intreeToCSITranslator
- vm.csiMigratedPluginManager = csiMigratedPluginManager
- vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
- kubeClient,
- desiredStateOfWorldPopulatorLoopSleepPeriod,
- desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
- podManager,
- podStatusProvider,
- vm.desiredStateOfWorld,
- vm.actualStateOfWorld,
- kubeContainerRuntime,
- keepTerminatedPodVolumes,
- csiMigratedPluginManager,
- intreeToCSITranslator)
- vm.reconciler = reconciler.NewReconciler(
- kubeClient,
- controllerAttachDetachEnabled,
- reconcilerLoopSleepPeriod,
- waitForAttachTimeout,
- nodeName,
- vm.desiredStateOfWorld,
- vm.actualStateOfWorld,
- vm.desiredStateOfWorldPopulator.HasAddedPods,
- vm.operationExecutor,
- mounter,
- hostutil,
- volumePluginMgr,
- kubeletPodsDir)
- return vm
- }
- // volumeManager implements the VolumeManager interface
- type volumeManager struct {
- // kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
- // communicate with the API server to fetch PV and PVC objects
- kubeClient clientset.Interface
- // volumePluginMgr is the volume plugin manager used to access volume
- // plugins. It must be pre-initialized.
- volumePluginMgr *volume.VolumePluginMgr
- // desiredStateOfWorld is a data structure containing the desired state of
- // the world according to the volume manager: i.e. what volumes should be
- // attached and which pods are referencing the volumes).
- // The data structure is populated by the desired state of the world
- // populator using the kubelet pod manager.
- desiredStateOfWorld cache.DesiredStateOfWorld
- // actualStateOfWorld is a data structure containing the actual state of
- // the world according to the manager: i.e. which volumes are attached to
- // this node and what pods the volumes are mounted to.
- // The data structure is populated upon successful completion of attach,
- // detach, mount, and unmount actions triggered by the reconciler.
- actualStateOfWorld cache.ActualStateOfWorld
- // operationExecutor is used to start asynchronous attach, detach, mount,
- // and unmount operations.
- operationExecutor operationexecutor.OperationExecutor
- // reconciler runs an asynchronous periodic loop to reconcile the
- // desiredStateOfWorld with the actualStateOfWorld by triggering attach,
- // detach, mount, and unmount operations using the operationExecutor.
- reconciler reconciler.Reconciler
- // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
- // populate the desiredStateOfWorld using the kubelet PodManager.
- desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
- // csiMigratedPluginManager keeps track of CSI migration status of plugins
- csiMigratedPluginManager csimigration.PluginManager
- // intreeToCSITranslator translates in-tree volume specs to CSI
- intreeToCSITranslator csimigration.InTreeToCSITranslator
- }
- func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
- defer runtime.HandleCrash()
- go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
- klog.V(2).Infof("The desired_state_of_world populator starts")
- klog.Infof("Starting Kubelet Volume Manager")
- go vm.reconciler.Run(stopCh)
- metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
- if vm.kubeClient != nil {
- // start informer for CSIDriver
- vm.volumePluginMgr.Run(stopCh)
- }
- <-stopCh
- klog.Infof("Shutting down Kubelet Volume Manager")
- }
- func (vm *volumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
- podVolumes := make(container.VolumeMap)
- for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
- podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{
- Mounter: mountedVolume.Mounter,
- BlockVolumeMapper: mountedVolume.BlockVolumeMapper,
- ReadOnly: mountedVolume.VolumeSpec.ReadOnly,
- InnerVolumeSpecName: mountedVolume.InnerVolumeSpecName,
- }
- }
- return podVolumes
- }
- func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 {
- podName := util.GetUniquePodName(pod)
- supplementalGroups := sets.NewString()
- for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
- if mountedVolume.VolumeGidValue != "" {
- supplementalGroups.Insert(mountedVolume.VolumeGidValue)
- }
- }
- result := make([]int64, 0, supplementalGroups.Len())
- for _, group := range supplementalGroups.List() {
- iGroup, extra := getExtraSupplementalGid(group, pod)
- if !extra {
- continue
- }
- result = append(result, int64(iGroup))
- }
- return result
- }
- func (vm *volumeManager) GetVolumesInUse() []v1.UniqueVolumeName {
- // Report volumes in desired state of world and actual state of world so
- // that volumes are marked in use as soon as the decision is made that the
- // volume *should* be attached to this node until it is safely unmounted.
- desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
- allAttachedVolumes := vm.actualStateOfWorld.GetAttachedVolumes()
- volumesToReportInUse := make([]v1.UniqueVolumeName, 0, len(desiredVolumes)+len(allAttachedVolumes))
- desiredVolumesMap := make(map[v1.UniqueVolumeName]bool, len(desiredVolumes)+len(allAttachedVolumes))
- for _, volume := range desiredVolumes {
- if volume.PluginIsAttachable {
- if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
- desiredVolumesMap[volume.VolumeName] = true
- volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
- }
- }
- }
- for _, volume := range allAttachedVolumes {
- if volume.PluginIsAttachable {
- if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
- volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
- }
- }
- }
- sort.Slice(volumesToReportInUse, func(i, j int) bool {
- return string(volumesToReportInUse[i]) < string(volumesToReportInUse[j])
- })
- return volumesToReportInUse
- }
- func (vm *volumeManager) ReconcilerStatesHasBeenSynced() bool {
- return vm.reconciler.StatesHasBeenSynced()
- }
- func (vm *volumeManager) VolumeIsAttached(
- volumeName v1.UniqueVolumeName) bool {
- return vm.actualStateOfWorld.VolumeExists(volumeName)
- }
- func (vm *volumeManager) MarkVolumesAsReportedInUse(
- volumesReportedAsInUse []v1.UniqueVolumeName) {
- vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
- }
- func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
- if pod == nil {
- return nil
- }
- expectedVolumes := getExpectedVolumes(pod)
- if len(expectedVolumes) == 0 {
- // No volumes to verify
- return nil
- }
- klog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
- uniquePodName := util.GetUniquePodName(pod)
- // Some pods expect to have Setup called over and over again to update.
- // Remount plugins for which this is true. (Atomically updating volumes,
- // like Downward API, depend on this to update the contents of the volume).
- vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
- err := wait.PollImmediate(
- podAttachAndMountRetryInterval,
- podAttachAndMountTimeout,
- vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
- if err != nil {
- unmountedVolumes :=
- vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
- // Also get unattached volumes for error message
- unattachedVolumes :=
- vm.getUnattachedVolumes(expectedVolumes)
- if len(unmountedVolumes) == 0 {
- return nil
- }
- return fmt.Errorf(
- "unmounted volumes=%v, unattached volumes=%v: %s",
- unmountedVolumes,
- unattachedVolumes,
- err)
- }
- klog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
- return nil
- }
- // getUnattachedVolumes returns a list of the volumes that are expected to be attached but
- // are not currently attached to the node
- func (vm *volumeManager) getUnattachedVolumes(expectedVolumes []string) []string {
- unattachedVolumes := []string{}
- for _, volume := range expectedVolumes {
- if !vm.actualStateOfWorld.VolumeExists(v1.UniqueVolumeName(volume)) {
- unattachedVolumes = append(unattachedVolumes, volume)
- }
- }
- return unattachedVolumes
- }
- // verifyVolumesMountedFunc returns a method that returns true when all expected
- // volumes are mounted.
- func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
- return func() (done bool, err error) {
- if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
- return true, errors.New(strings.Join(errs, "; "))
- }
- return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
- }
- }
- // getUnmountedVolumes fetches the current list of mounted volumes from
- // the actual state of the world, and uses it to process the list of
- // expectedVolumes. It returns a list of unmounted volumes.
- // The list also includes volume that may be mounted in uncertain state.
- func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
- mountedVolumes := sets.NewString()
- for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
- mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
- }
- return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
- }
- // filterUnmountedVolumes adds each element of expectedVolumes that is not in
- // mountedVolumes to a list of unmountedVolumes and returns it.
- func filterUnmountedVolumes(mountedVolumes sets.String, expectedVolumes []string) []string {
- unmountedVolumes := []string{}
- for _, expectedVolume := range expectedVolumes {
- if !mountedVolumes.Has(expectedVolume) {
- unmountedVolumes = append(unmountedVolumes, expectedVolume)
- }
- }
- return unmountedVolumes
- }
- // getExpectedVolumes returns a list of volumes that must be mounted in order to
- // consider the volume setup step for this pod satisfied.
- func getExpectedVolumes(pod *v1.Pod) []string {
- mounts, devices := util.GetPodVolumeNames(pod)
- return mounts.Union(devices).UnsortedList()
- }
- // getExtraSupplementalGid returns the value of an extra supplemental GID as
- // defined by an annotation on a volume and a boolean indicating whether the
- // volume defined a GID that the pod doesn't already request.
- func getExtraSupplementalGid(volumeGidValue string, pod *v1.Pod) (int64, bool) {
- if volumeGidValue == "" {
- return 0, false
- }
- gid, err := strconv.ParseInt(volumeGidValue, 10, 64)
- if err != nil {
- return 0, false
- }
- if pod.Spec.SecurityContext != nil {
- for _, existingGid := range pod.Spec.SecurityContext.SupplementalGroups {
- if gid == int64(existingGid) {
- return 0, false
- }
- }
- }
- return gid, true
- }
|