123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package reconciler implements interfaces that attempt to reconcile the
- // desired state of the world with the actual state of the world by triggering
- // relevant actions (attach, detach, mount, unmount).
- package reconciler
- import (
- "fmt"
- "io/ioutil"
- "os"
- "path"
- "time"
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/features"
- "k8s.io/kubernetes/pkg/kubelet/config"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
- "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
- "k8s.io/kubernetes/pkg/util/mount"
- volumepkg "k8s.io/kubernetes/pkg/volume"
- "k8s.io/kubernetes/pkg/volume/util"
- "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
- "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
- volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
- utilpath "k8s.io/utils/path"
- utilstrings "k8s.io/utils/strings"
- )
- // Reconciler runs a periodic loop to reconcile the desired state of the world
- // with the actual state of the world by triggering attach, detach, mount, and
- // unmount operations.
- // Note: This is distinct from the Reconciler implemented by the attach/detach
- // controller. This reconciles state for the kubelet volume manager. That
- // reconciles state for the attach/detach controller.
- type Reconciler interface {
- // Starts running the reconciliation loop which executes periodically, checks
- // if volumes that should be mounted are mounted and volumes that should
- // be unmounted are unmounted. If not, it will trigger mount/unmount
- // operations to rectify.
- // If attach/detach management is enabled, the manager will also check if
- // volumes that should be attached are attached and volumes that should
- // be detached are detached and trigger attach/detach operations as needed.
- Run(stopCh <-chan struct{})
- // StatesHasBeenSynced returns true only after syncStates process starts to sync
- // states at least once after kubelet starts
- StatesHasBeenSynced() bool
- }
- // NewReconciler returns a new instance of Reconciler.
- //
- // controllerAttachDetachEnabled - if true, indicates that the attach/detach
- // controller is responsible for managing the attach/detach operations for
- // this node, and therefore the volume manager should not
- // loopSleepDuration - the amount of time the reconciler loop sleeps between
- // successive executions
- // waitForAttachTimeout - the amount of time the Mount function will wait for
- // the volume to be attached
- // nodeName - the Name for this node, used by Attach and Detach methods
- // desiredStateOfWorld - cache containing the desired state of the world
- // actualStateOfWorld - cache containing the actual state of the world
- // populatorHasAddedPods - checker for whether the populator has finished
- // adding pods to the desiredStateOfWorld cache at least once after sources
- // are all ready (before sources are ready, pods are probably missing)
- // operationExecutor - used to trigger attach/detach/mount/unmount operations
- // safely (prevents more than one operation from being triggered on the same
- // volume)
- // mounter - mounter passed in from kubelet, passed down unmount path
- // volumePluginMgr - volume plugin manager passed from kubelet
- func NewReconciler(
- kubeClient clientset.Interface,
- controllerAttachDetachEnabled bool,
- loopSleepDuration time.Duration,
- waitForAttachTimeout time.Duration,
- nodeName types.NodeName,
- desiredStateOfWorld cache.DesiredStateOfWorld,
- actualStateOfWorld cache.ActualStateOfWorld,
- populatorHasAddedPods func() bool,
- operationExecutor operationexecutor.OperationExecutor,
- mounter mount.Interface,
- volumePluginMgr *volumepkg.VolumePluginMgr,
- kubeletPodsDir string) Reconciler {
- return &reconciler{
- kubeClient: kubeClient,
- controllerAttachDetachEnabled: controllerAttachDetachEnabled,
- loopSleepDuration: loopSleepDuration,
- waitForAttachTimeout: waitForAttachTimeout,
- nodeName: nodeName,
- desiredStateOfWorld: desiredStateOfWorld,
- actualStateOfWorld: actualStateOfWorld,
- populatorHasAddedPods: populatorHasAddedPods,
- operationExecutor: operationExecutor,
- mounter: mounter,
- volumePluginMgr: volumePluginMgr,
- kubeletPodsDir: kubeletPodsDir,
- timeOfLastSync: time.Time{},
- }
- }
- type reconciler struct {
- kubeClient clientset.Interface
- controllerAttachDetachEnabled bool
- loopSleepDuration time.Duration
- syncDuration time.Duration
- waitForAttachTimeout time.Duration
- nodeName types.NodeName
- desiredStateOfWorld cache.DesiredStateOfWorld
- actualStateOfWorld cache.ActualStateOfWorld
- populatorHasAddedPods func() bool
- operationExecutor operationexecutor.OperationExecutor
- mounter mount.Interface
- volumePluginMgr *volumepkg.VolumePluginMgr
- kubeletPodsDir string
- timeOfLastSync time.Time
- }
- func (rc *reconciler) Run(stopCh <-chan struct{}) {
- wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
- }
- func (rc *reconciler) reconciliationLoopFunc() func() {
- return func() {
- rc.reconcile()
- // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
- // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
- // desired state of world does not contain a complete list of pods.
- if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
- klog.Infof("Reconciler: start to sync state")
- rc.sync()
- }
- }
- }
- func (rc *reconciler) reconcile() {
- // Unmounts are triggered before mounts so that a volume that was
- // referenced by a pod that was deleted and is now referenced by another
- // pod is unmounted from the first pod before being mounted to the new
- // pod.
- // Ensure volumes that should be unmounted are unmounted.
- for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
- if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
- // Volume is mounted, unmount it
- klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
- err := rc.operationExecutor.UnmountVolume(
- mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
- if err != nil &&
- !nestedpendingoperations.IsAlreadyExists(err) &&
- !exponentialbackoff.IsExponentialBackoff(err) {
- // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
- // Log all other errors.
- klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
- }
- if err == nil {
- klog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
- }
- }
- }
- // Ensure volumes that should be attached/mounted are attached/mounted.
- for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
- volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
- volumeToMount.DevicePath = devicePath
- if cache.IsVolumeNotAttachedError(err) {
- if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
- // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
- // for controller to finish attaching volume.
- klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
- err := rc.operationExecutor.VerifyControllerAttachedVolume(
- volumeToMount.VolumeToMount,
- rc.nodeName,
- rc.actualStateOfWorld)
- if err != nil &&
- !nestedpendingoperations.IsAlreadyExists(err) &&
- !exponentialbackoff.IsExponentialBackoff(err) {
- // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
- // Log all other errors.
- klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
- }
- if err == nil {
- klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
- }
- } else {
- // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
- // so attach it
- volumeToAttach := operationexecutor.VolumeToAttach{
- VolumeName: volumeToMount.VolumeName,
- VolumeSpec: volumeToMount.VolumeSpec,
- NodeName: rc.nodeName,
- }
- klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
- err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
- if err != nil &&
- !nestedpendingoperations.IsAlreadyExists(err) &&
- !exponentialbackoff.IsExponentialBackoff(err) {
- // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
- // Log all other errors.
- klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
- }
- if err == nil {
- klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
- }
- }
- } else if !volMounted || cache.IsRemountRequiredError(err) {
- // Volume is not mounted, or is already mounted, but requires remounting
- remountingLogStr := ""
- isRemount := cache.IsRemountRequiredError(err)
- if isRemount {
- remountingLogStr = "Volume is already mounted to pod, but remount was requested."
- }
- klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
- err := rc.operationExecutor.MountVolume(
- rc.waitForAttachTimeout,
- volumeToMount.VolumeToMount,
- rc.actualStateOfWorld,
- isRemount)
- if err != nil &&
- !nestedpendingoperations.IsAlreadyExists(err) &&
- !exponentialbackoff.IsExponentialBackoff(err) {
- // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
- // Log all other errors.
- klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
- }
- if err == nil {
- if remountingLogStr == "" {
- klog.V(1).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
- } else {
- klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
- }
- }
- } else if cache.IsFSResizeRequiredError(err) &&
- utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
- klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
- err := rc.operationExecutor.ExpandInUseVolume(
- volumeToMount.VolumeToMount,
- rc.actualStateOfWorld)
- if err != nil &&
- !nestedpendingoperations.IsAlreadyExists(err) &&
- !exponentialbackoff.IsExponentialBackoff(err) {
- // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
- // Log all other errors.
- klog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
- }
- if err == nil {
- klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
- }
- }
- }
- // Ensure devices that should be detached/unmounted are detached/unmounted.
- for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
- // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
- if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
- !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
- if attachedVolume.GloballyMounted {
- // Volume is globally mounted to device, unmount it
- klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
- err := rc.operationExecutor.UnmountDevice(
- attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
- if err != nil &&
- !nestedpendingoperations.IsAlreadyExists(err) &&
- !exponentialbackoff.IsExponentialBackoff(err) {
- // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
- // Log all other errors.
- klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
- }
- if err == nil {
- klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
- }
- } else {
- // Volume is attached to node, detach it
- // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
- if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
- rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
- klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
- } else {
- // Only detach if kubelet detach is enabled
- klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
- err := rc.operationExecutor.DetachVolume(
- attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
- if err != nil &&
- !nestedpendingoperations.IsAlreadyExists(err) &&
- !exponentialbackoff.IsExponentialBackoff(err) {
- // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
- // Log all other errors.
- klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
- }
- if err == nil {
- klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
- }
- }
- }
- }
- }
- }
- // sync process tries to observe the real world by scanning all pods' volume directories from the disk.
- // If the actual and desired state of worlds are not consistent with the observed world, it means that some
- // mounted volumes are left out probably during kubelet restart. This process will reconstruct
- // the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
- // it will try to clean up the mount paths with operation executor.
- func (rc *reconciler) sync() {
- defer rc.updateLastSyncTime()
- rc.syncStates()
- }
- func (rc *reconciler) updateLastSyncTime() {
- rc.timeOfLastSync = time.Now()
- }
- func (rc *reconciler) StatesHasBeenSynced() bool {
- return !rc.timeOfLastSync.IsZero()
- }
- type podVolume struct {
- podName volumetypes.UniquePodName
- volumeSpecName string
- volumePath string
- pluginName string
- volumeMode v1.PersistentVolumeMode
- }
- type reconstructedVolume struct {
- volumeName v1.UniqueVolumeName
- podName volumetypes.UniquePodName
- volumeSpec *volumepkg.Spec
- outerVolumeSpecName string
- pod *v1.Pod
- attachablePlugin volumepkg.AttachableVolumePlugin
- volumeGidValue string
- devicePath string
- reportedInUse bool
- mounter volumepkg.Mounter
- blockVolumeMapper volumepkg.BlockVolumeMapper
- }
- // syncStates scans the volume directories under the given pod directory.
- // If the volume is not in desired state of world, this function will reconstruct
- // the volume related information and put it in both the actual and desired state of worlds.
- // For some volume plugins that cannot support reconstruction, it will clean up the existing
- // mount points since the volume is no long needed (removed from desired state)
- func (rc *reconciler) syncStates() {
- // Get volumes information by reading the pod's directory
- podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
- if err != nil {
- klog.Errorf("Cannot get volumes from disk %v", err)
- return
- }
- volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
- volumeNeedReport := []v1.UniqueVolumeName{}
- for _, volume := range podVolumes {
- if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
- klog.V(4).Infof("Volume exists in actual state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
- // There is nothing to reconstruct
- continue
- }
- volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
- reconstructedVolume, err := rc.reconstructVolume(volume)
- if err != nil {
- if volumeInDSW {
- // Some pod needs the volume, don't clean it up and hope that
- // reconcile() calls SetUp and reconstructs the volume in ASW.
- klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
- continue
- }
- // No pod needs the volume.
- klog.Warningf("Could not construct volume information, cleanup the mounts. (pod.UID %s, volume.SpecName %s): %v", volume.podName, volume.volumeSpecName, err)
- rc.cleanupMounts(volume)
- continue
- }
- if volumeInDSW {
- // Some pod needs the volume. And it exists on disk. Some previous
- // kubelet must have created the directory, therefore it must have
- // reported the volume as in use. Mark the volume as in use also in
- // this new kubelet so reconcile() calls SetUp and re-mounts the
- // volume if it's necessary.
- volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
- klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), marking as InUse", volume.volumeSpecName, volume.podName)
- continue
- }
- // There is no pod that uses the volume.
- if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
- klog.Warning("Volume is in pending operation, skip cleaning up mounts")
- }
- klog.V(2).Infof(
- "Reconciler sync states: could not find pod information in desired state, update it in actual state: %+v",
- reconstructedVolume)
- volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
- }
- if len(volumesNeedUpdate) > 0 {
- if err = rc.updateStates(volumesNeedUpdate); err != nil {
- klog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
- }
- }
- if len(volumeNeedReport) > 0 {
- rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
- }
- }
- func (rc *reconciler) cleanupMounts(volume podVolume) {
- klog.V(2).Infof("Reconciler sync states: could not find information (PID: %s) (Volume SpecName: %s) in desired state, clean up the mount points",
- volume.podName, volume.volumeSpecName)
- mountedVolume := operationexecutor.MountedVolume{
- PodName: volume.podName,
- VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
- InnerVolumeSpecName: volume.volumeSpecName,
- PluginName: volume.pluginName,
- PodUID: types.UID(volume.podName),
- }
- // TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
- // to unmount both volume and device in the same routine.
- err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
- if err != nil {
- klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("volumeHandler.UnmountVolumeHandler for UnmountVolume failed"), err).Error())
- return
- }
- }
- // Reconstruct volume data structure by reading the pod's volume directories
- func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
- // plugin initializations
- plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
- if err != nil {
- return nil, err
- }
- attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginByName(volume.pluginName)
- if err != nil {
- return nil, err
- }
- deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginByName(volume.pluginName)
- if err != nil {
- return nil, err
- }
- // Create pod object
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- UID: types.UID(volume.podName),
- },
- }
- mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
- if err != nil {
- return nil, err
- }
- volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation(
- volume.volumeMode,
- plugin,
- mapperPlugin,
- pod.UID,
- volume.podName,
- volume.volumeSpecName,
- volume.volumePath,
- volume.pluginName)
- if err != nil {
- return nil, err
- }
- var uniqueVolumeName v1.UniqueVolumeName
- if attachablePlugin != nil || deviceMountablePlugin != nil {
- uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
- if err != nil {
- return nil, err
- }
- } else {
- uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
- }
- volumeMounter, newMounterErr := plugin.NewMounter(
- volumeSpec,
- pod,
- volumepkg.VolumeOptions{})
- if newMounterErr != nil {
- return nil, fmt.Errorf(
- "reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
- uniqueVolumeName,
- volumeSpec.Name(),
- volume.podName,
- pod.UID,
- newMounterErr)
- }
- // Check existence of mount point for filesystem volume or symbolic link for block volume
- isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, volumeMounter.GetPath(), volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
- if checkErr != nil {
- return nil, checkErr
- }
- // If mount or symlink doesn't exist, volume reconstruction should be failed
- if !isExist {
- return nil, fmt.Errorf("Volume: %q is not mounted", uniqueVolumeName)
- }
- // TODO: remove feature gate check after no longer needed
- var volumeMapper volumepkg.BlockVolumeMapper
- if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) && volume.volumeMode == v1.PersistentVolumeBlock {
- var newMapperErr error
- if mapperPlugin != nil {
- volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
- volumeSpec,
- pod,
- volumepkg.VolumeOptions{})
- if newMapperErr != nil {
- return nil, fmt.Errorf(
- "reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
- uniqueVolumeName,
- volumeSpec.Name(),
- volume.podName,
- pod.UID,
- newMapperErr)
- }
- }
- }
- reconstructedVolume := &reconstructedVolume{
- volumeName: uniqueVolumeName,
- podName: volume.podName,
- volumeSpec: volumeSpec,
- // volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
- // for volume cleanup.
- // TODO: in case pod is added back before reconciler starts to unmount, we can update this field from desired state information
- outerVolumeSpecName: volume.volumeSpecName,
- pod: pod,
- attachablePlugin: attachablePlugin,
- volumeGidValue: "",
- // devicePath is updated during updateStates() by checking node status's VolumesAttached data.
- // TODO: get device path directly from the volume mount path.
- devicePath: "",
- mounter: volumeMounter,
- blockVolumeMapper: volumeMapper,
- }
- return reconstructedVolume, nil
- }
- // updateDevicePath gets the node status to retrieve volume device path information.
- func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) {
- node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(string(rc.nodeName), metav1.GetOptions{})
- if fetchErr != nil {
- klog.Errorf("updateStates in reconciler: could not get node status with error %v", fetchErr)
- } else {
- for _, attachedVolume := range node.Status.VolumesAttached {
- if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
- volume.devicePath = attachedVolume.DevicePath
- volumesNeedUpdate[attachedVolume.Name] = volume
- klog.V(4).Infof("Update devicePath from node status for volume (%q): %q", attachedVolume.Name, volume.devicePath)
- }
- }
- }
- }
- func getDeviceMountPath(volume *reconstructedVolume) (string, error) {
- volumeAttacher, err := volume.attachablePlugin.NewAttacher()
- if volumeAttacher == nil || err != nil {
- return "", err
- }
- deviceMountPath, err :=
- volumeAttacher.GetDeviceMountPath(volume.volumeSpec)
- if err != nil {
- return "", err
- }
- if volume.blockVolumeMapper != nil {
- deviceMountPath, err =
- volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
- if err != nil {
- return "", err
- }
- }
- return deviceMountPath, nil
- }
- func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
- // Get the node status to retrieve volume device path information.
- rc.updateDevicePath(volumesNeedUpdate)
- for _, volume := range volumesNeedUpdate {
- err := rc.actualStateOfWorld.MarkVolumeAsAttached(
- //TODO: the devicePath might not be correct for some volume plugins: see issue #54108
- volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)
- if err != nil {
- klog.Errorf("Could not add volume information to actual state of world: %v", err)
- continue
- }
- err = rc.actualStateOfWorld.MarkVolumeAsMounted(
- volume.podName,
- types.UID(volume.podName),
- volume.volumeName,
- volume.mounter,
- volume.blockVolumeMapper,
- volume.outerVolumeSpecName,
- volume.volumeGidValue,
- volume.volumeSpec)
- if err != nil {
- klog.Errorf("Could not add pod to volume information to actual state of world: %v", err)
- continue
- }
- klog.V(4).Infof("Volume: %s (pod UID %s) is marked as mounted and added into the actual state", volume.volumeName, volume.podName)
- if volume.attachablePlugin != nil {
- deviceMountPath, err := getDeviceMountPath(volume)
- if err != nil {
- klog.Errorf("Could not find device mount path for volume %s", volume.volumeName)
- continue
- }
- err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath)
- if err != nil {
- klog.Errorf("Could not mark device is mounted to actual state of world: %v", err)
- continue
- }
- klog.V(4).Infof("Volume: %s (pod UID %s) is marked device as mounted and added into the actual state", volume.volumeName, volume.podName)
- }
- }
- return nil
- }
- // getVolumesFromPodDir scans through the volumes directories under the given pod directory.
- // It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
- // and volume spec name.
- func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
- podsDirInfo, err := ioutil.ReadDir(podDir)
- if err != nil {
- return nil, err
- }
- volumes := []podVolume{}
- for i := range podsDirInfo {
- if !podsDirInfo[i].IsDir() {
- continue
- }
- podName := podsDirInfo[i].Name()
- podDir := path.Join(podDir, podName)
- // Find filesystem volume information
- // ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName}
- volumesDirs := map[v1.PersistentVolumeMode]string{
- v1.PersistentVolumeFilesystem: path.Join(podDir, config.DefaultKubeletVolumesDirName),
- }
- // TODO: remove feature gate check after no longer needed
- if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
- // Find block volume information
- // ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
- volumesDirs[v1.PersistentVolumeBlock] = path.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)
- }
- for volumeMode, volumesDir := range volumesDirs {
- var volumesDirInfo []os.FileInfo
- if volumesDirInfo, err = ioutil.ReadDir(volumesDir); err != nil {
- // Just skip the loop because given volumesDir doesn't exist depending on volumeMode
- continue
- }
- for _, volumeDir := range volumesDirInfo {
- pluginName := volumeDir.Name()
- volumePluginPath := path.Join(volumesDir, pluginName)
- volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
- if err != nil {
- klog.Errorf("Could not read volume plugin directory %q: %v", volumePluginPath, err)
- continue
- }
- unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
- for _, volumeName := range volumePluginDirs {
- volumePath := path.Join(volumePluginPath, volumeName)
- klog.V(5).Infof("podName: %v, volume path from volume plugin directory: %v, ", podName, volumePath)
- volumes = append(volumes, podVolume{
- podName: volumetypes.UniquePodName(podName),
- volumeSpecName: volumeName,
- volumePath: volumePath,
- pluginName: unescapePluginName,
- volumeMode: volumeMode,
- })
- }
- }
- }
- }
- klog.V(4).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
- return volumes, nil
- }
|