reconciler.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package reconciler implements interfaces that attempt to reconcile the
  14. // desired state of the world with the actual state of the world by triggering
  15. // relevant actions (attach, detach, mount, unmount).
  16. package reconciler
  17. import (
  18. "fmt"
  19. "io/ioutil"
  20. "os"
  21. "path"
  22. "time"
  23. "k8s.io/api/core/v1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. utilfeature "k8s.io/apiserver/pkg/util/feature"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/klog"
  30. "k8s.io/kubernetes/pkg/features"
  31. "k8s.io/kubernetes/pkg/kubelet/config"
  32. "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
  33. "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
  34. "k8s.io/kubernetes/pkg/util/mount"
  35. volumepkg "k8s.io/kubernetes/pkg/volume"
  36. "k8s.io/kubernetes/pkg/volume/util"
  37. "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
  38. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  39. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  40. utilpath "k8s.io/utils/path"
  41. utilstrings "k8s.io/utils/strings"
  42. )
  43. // Reconciler runs a periodic loop to reconcile the desired state of the world
  44. // with the actual state of the world by triggering attach, detach, mount, and
  45. // unmount operations.
  46. // Note: This is distinct from the Reconciler implemented by the attach/detach
  47. // controller. This reconciles state for the kubelet volume manager. That
  48. // reconciles state for the attach/detach controller.
  49. type Reconciler interface {
  50. // Starts running the reconciliation loop which executes periodically, checks
  51. // if volumes that should be mounted are mounted and volumes that should
  52. // be unmounted are unmounted. If not, it will trigger mount/unmount
  53. // operations to rectify.
  54. // If attach/detach management is enabled, the manager will also check if
  55. // volumes that should be attached are attached and volumes that should
  56. // be detached are detached and trigger attach/detach operations as needed.
  57. Run(stopCh <-chan struct{})
  58. // StatesHasBeenSynced returns true only after syncStates process starts to sync
  59. // states at least once after kubelet starts
  60. StatesHasBeenSynced() bool
  61. }
  62. // NewReconciler returns a new instance of Reconciler.
  63. //
  64. // controllerAttachDetachEnabled - if true, indicates that the attach/detach
  65. // controller is responsible for managing the attach/detach operations for
  66. // this node, and therefore the volume manager should not
  67. // loopSleepDuration - the amount of time the reconciler loop sleeps between
  68. // successive executions
  69. // waitForAttachTimeout - the amount of time the Mount function will wait for
  70. // the volume to be attached
  71. // nodeName - the Name for this node, used by Attach and Detach methods
  72. // desiredStateOfWorld - cache containing the desired state of the world
  73. // actualStateOfWorld - cache containing the actual state of the world
  74. // populatorHasAddedPods - checker for whether the populator has finished
  75. // adding pods to the desiredStateOfWorld cache at least once after sources
  76. // are all ready (before sources are ready, pods are probably missing)
  77. // operationExecutor - used to trigger attach/detach/mount/unmount operations
  78. // safely (prevents more than one operation from being triggered on the same
  79. // volume)
  80. // mounter - mounter passed in from kubelet, passed down unmount path
  81. // volumePluginMgr - volume plugin manager passed from kubelet
  82. func NewReconciler(
  83. kubeClient clientset.Interface,
  84. controllerAttachDetachEnabled bool,
  85. loopSleepDuration time.Duration,
  86. waitForAttachTimeout time.Duration,
  87. nodeName types.NodeName,
  88. desiredStateOfWorld cache.DesiredStateOfWorld,
  89. actualStateOfWorld cache.ActualStateOfWorld,
  90. populatorHasAddedPods func() bool,
  91. operationExecutor operationexecutor.OperationExecutor,
  92. mounter mount.Interface,
  93. volumePluginMgr *volumepkg.VolumePluginMgr,
  94. kubeletPodsDir string) Reconciler {
  95. return &reconciler{
  96. kubeClient: kubeClient,
  97. controllerAttachDetachEnabled: controllerAttachDetachEnabled,
  98. loopSleepDuration: loopSleepDuration,
  99. waitForAttachTimeout: waitForAttachTimeout,
  100. nodeName: nodeName,
  101. desiredStateOfWorld: desiredStateOfWorld,
  102. actualStateOfWorld: actualStateOfWorld,
  103. populatorHasAddedPods: populatorHasAddedPods,
  104. operationExecutor: operationExecutor,
  105. mounter: mounter,
  106. volumePluginMgr: volumePluginMgr,
  107. kubeletPodsDir: kubeletPodsDir,
  108. timeOfLastSync: time.Time{},
  109. }
  110. }
  111. type reconciler struct {
  112. kubeClient clientset.Interface
  113. controllerAttachDetachEnabled bool
  114. loopSleepDuration time.Duration
  115. syncDuration time.Duration
  116. waitForAttachTimeout time.Duration
  117. nodeName types.NodeName
  118. desiredStateOfWorld cache.DesiredStateOfWorld
  119. actualStateOfWorld cache.ActualStateOfWorld
  120. populatorHasAddedPods func() bool
  121. operationExecutor operationexecutor.OperationExecutor
  122. mounter mount.Interface
  123. volumePluginMgr *volumepkg.VolumePluginMgr
  124. kubeletPodsDir string
  125. timeOfLastSync time.Time
  126. }
  127. func (rc *reconciler) Run(stopCh <-chan struct{}) {
  128. wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
  129. }
  130. func (rc *reconciler) reconciliationLoopFunc() func() {
  131. return func() {
  132. rc.reconcile()
  133. // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
  134. // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
  135. // desired state of world does not contain a complete list of pods.
  136. if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
  137. klog.Infof("Reconciler: start to sync state")
  138. rc.sync()
  139. }
  140. }
  141. }
  142. func (rc *reconciler) reconcile() {
  143. // Unmounts are triggered before mounts so that a volume that was
  144. // referenced by a pod that was deleted and is now referenced by another
  145. // pod is unmounted from the first pod before being mounted to the new
  146. // pod.
  147. // Ensure volumes that should be unmounted are unmounted.
  148. for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
  149. if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
  150. // Volume is mounted, unmount it
  151. klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
  152. err := rc.operationExecutor.UnmountVolume(
  153. mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
  154. if err != nil &&
  155. !nestedpendingoperations.IsAlreadyExists(err) &&
  156. !exponentialbackoff.IsExponentialBackoff(err) {
  157. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  158. // Log all other errors.
  159. klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  160. }
  161. if err == nil {
  162. klog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
  163. }
  164. }
  165. }
  166. // Ensure volumes that should be attached/mounted are attached/mounted.
  167. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
  168. volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
  169. volumeToMount.DevicePath = devicePath
  170. if cache.IsVolumeNotAttachedError(err) {
  171. if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
  172. // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
  173. // for controller to finish attaching volume.
  174. klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
  175. err := rc.operationExecutor.VerifyControllerAttachedVolume(
  176. volumeToMount.VolumeToMount,
  177. rc.nodeName,
  178. rc.actualStateOfWorld)
  179. if err != nil &&
  180. !nestedpendingoperations.IsAlreadyExists(err) &&
  181. !exponentialbackoff.IsExponentialBackoff(err) {
  182. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  183. // Log all other errors.
  184. klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  185. }
  186. if err == nil {
  187. klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
  188. }
  189. } else {
  190. // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
  191. // so attach it
  192. volumeToAttach := operationexecutor.VolumeToAttach{
  193. VolumeName: volumeToMount.VolumeName,
  194. VolumeSpec: volumeToMount.VolumeSpec,
  195. NodeName: rc.nodeName,
  196. }
  197. klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
  198. err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
  199. if err != nil &&
  200. !nestedpendingoperations.IsAlreadyExists(err) &&
  201. !exponentialbackoff.IsExponentialBackoff(err) {
  202. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  203. // Log all other errors.
  204. klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  205. }
  206. if err == nil {
  207. klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
  208. }
  209. }
  210. } else if !volMounted || cache.IsRemountRequiredError(err) {
  211. // Volume is not mounted, or is already mounted, but requires remounting
  212. remountingLogStr := ""
  213. isRemount := cache.IsRemountRequiredError(err)
  214. if isRemount {
  215. remountingLogStr = "Volume is already mounted to pod, but remount was requested."
  216. }
  217. klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
  218. err := rc.operationExecutor.MountVolume(
  219. rc.waitForAttachTimeout,
  220. volumeToMount.VolumeToMount,
  221. rc.actualStateOfWorld,
  222. isRemount)
  223. if err != nil &&
  224. !nestedpendingoperations.IsAlreadyExists(err) &&
  225. !exponentialbackoff.IsExponentialBackoff(err) {
  226. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  227. // Log all other errors.
  228. klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  229. }
  230. if err == nil {
  231. if remountingLogStr == "" {
  232. klog.V(1).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
  233. } else {
  234. klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
  235. }
  236. }
  237. } else if cache.IsFSResizeRequiredError(err) &&
  238. utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
  239. klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
  240. err := rc.operationExecutor.ExpandInUseVolume(
  241. volumeToMount.VolumeToMount,
  242. rc.actualStateOfWorld)
  243. if err != nil &&
  244. !nestedpendingoperations.IsAlreadyExists(err) &&
  245. !exponentialbackoff.IsExponentialBackoff(err) {
  246. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  247. // Log all other errors.
  248. klog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
  249. }
  250. if err == nil {
  251. klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
  252. }
  253. }
  254. }
  255. // Ensure devices that should be detached/unmounted are detached/unmounted.
  256. for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
  257. // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
  258. if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
  259. !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
  260. if attachedVolume.GloballyMounted {
  261. // Volume is globally mounted to device, unmount it
  262. klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
  263. err := rc.operationExecutor.UnmountDevice(
  264. attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
  265. if err != nil &&
  266. !nestedpendingoperations.IsAlreadyExists(err) &&
  267. !exponentialbackoff.IsExponentialBackoff(err) {
  268. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  269. // Log all other errors.
  270. klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  271. }
  272. if err == nil {
  273. klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
  274. }
  275. } else {
  276. // Volume is attached to node, detach it
  277. // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
  278. if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
  279. rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
  280. klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
  281. } else {
  282. // Only detach if kubelet detach is enabled
  283. klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
  284. err := rc.operationExecutor.DetachVolume(
  285. attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
  286. if err != nil &&
  287. !nestedpendingoperations.IsAlreadyExists(err) &&
  288. !exponentialbackoff.IsExponentialBackoff(err) {
  289. // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
  290. // Log all other errors.
  291. klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  292. }
  293. if err == nil {
  294. klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
  295. }
  296. }
  297. }
  298. }
  299. }
  300. }
  301. // sync process tries to observe the real world by scanning all pods' volume directories from the disk.
  302. // If the actual and desired state of worlds are not consistent with the observed world, it means that some
  303. // mounted volumes are left out probably during kubelet restart. This process will reconstruct
  304. // the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
  305. // it will try to clean up the mount paths with operation executor.
  306. func (rc *reconciler) sync() {
  307. defer rc.updateLastSyncTime()
  308. rc.syncStates()
  309. }
  310. func (rc *reconciler) updateLastSyncTime() {
  311. rc.timeOfLastSync = time.Now()
  312. }
  313. func (rc *reconciler) StatesHasBeenSynced() bool {
  314. return !rc.timeOfLastSync.IsZero()
  315. }
  316. type podVolume struct {
  317. podName volumetypes.UniquePodName
  318. volumeSpecName string
  319. volumePath string
  320. pluginName string
  321. volumeMode v1.PersistentVolumeMode
  322. }
  323. type reconstructedVolume struct {
  324. volumeName v1.UniqueVolumeName
  325. podName volumetypes.UniquePodName
  326. volumeSpec *volumepkg.Spec
  327. outerVolumeSpecName string
  328. pod *v1.Pod
  329. attachablePlugin volumepkg.AttachableVolumePlugin
  330. volumeGidValue string
  331. devicePath string
  332. reportedInUse bool
  333. mounter volumepkg.Mounter
  334. blockVolumeMapper volumepkg.BlockVolumeMapper
  335. }
  336. // syncStates scans the volume directories under the given pod directory.
  337. // If the volume is not in desired state of world, this function will reconstruct
  338. // the volume related information and put it in both the actual and desired state of worlds.
  339. // For some volume plugins that cannot support reconstruction, it will clean up the existing
  340. // mount points since the volume is no long needed (removed from desired state)
  341. func (rc *reconciler) syncStates() {
  342. // Get volumes information by reading the pod's directory
  343. podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
  344. if err != nil {
  345. klog.Errorf("Cannot get volumes from disk %v", err)
  346. return
  347. }
  348. volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
  349. volumeNeedReport := []v1.UniqueVolumeName{}
  350. for _, volume := range podVolumes {
  351. if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
  352. klog.V(4).Infof("Volume exists in actual state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
  353. // There is nothing to reconstruct
  354. continue
  355. }
  356. volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
  357. reconstructedVolume, err := rc.reconstructVolume(volume)
  358. if err != nil {
  359. if volumeInDSW {
  360. // Some pod needs the volume, don't clean it up and hope that
  361. // reconcile() calls SetUp and reconstructs the volume in ASW.
  362. klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
  363. continue
  364. }
  365. // No pod needs the volume.
  366. klog.Warningf("Could not construct volume information, cleanup the mounts. (pod.UID %s, volume.SpecName %s): %v", volume.podName, volume.volumeSpecName, err)
  367. rc.cleanupMounts(volume)
  368. continue
  369. }
  370. if volumeInDSW {
  371. // Some pod needs the volume. And it exists on disk. Some previous
  372. // kubelet must have created the directory, therefore it must have
  373. // reported the volume as in use. Mark the volume as in use also in
  374. // this new kubelet so reconcile() calls SetUp and re-mounts the
  375. // volume if it's necessary.
  376. volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
  377. klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), marking as InUse", volume.volumeSpecName, volume.podName)
  378. continue
  379. }
  380. // There is no pod that uses the volume.
  381. if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
  382. klog.Warning("Volume is in pending operation, skip cleaning up mounts")
  383. }
  384. klog.V(2).Infof(
  385. "Reconciler sync states: could not find pod information in desired state, update it in actual state: %+v",
  386. reconstructedVolume)
  387. volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
  388. }
  389. if len(volumesNeedUpdate) > 0 {
  390. if err = rc.updateStates(volumesNeedUpdate); err != nil {
  391. klog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
  392. }
  393. }
  394. if len(volumeNeedReport) > 0 {
  395. rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
  396. }
  397. }
  398. func (rc *reconciler) cleanupMounts(volume podVolume) {
  399. klog.V(2).Infof("Reconciler sync states: could not find information (PID: %s) (Volume SpecName: %s) in desired state, clean up the mount points",
  400. volume.podName, volume.volumeSpecName)
  401. mountedVolume := operationexecutor.MountedVolume{
  402. PodName: volume.podName,
  403. VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
  404. InnerVolumeSpecName: volume.volumeSpecName,
  405. PluginName: volume.pluginName,
  406. PodUID: types.UID(volume.podName),
  407. }
  408. // TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
  409. // to unmount both volume and device in the same routine.
  410. err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
  411. if err != nil {
  412. klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("volumeHandler.UnmountVolumeHandler for UnmountVolume failed"), err).Error())
  413. return
  414. }
  415. }
  416. // Reconstruct volume data structure by reading the pod's volume directories
  417. func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
  418. // plugin initializations
  419. plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
  420. if err != nil {
  421. return nil, err
  422. }
  423. attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginByName(volume.pluginName)
  424. if err != nil {
  425. return nil, err
  426. }
  427. deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginByName(volume.pluginName)
  428. if err != nil {
  429. return nil, err
  430. }
  431. // Create pod object
  432. pod := &v1.Pod{
  433. ObjectMeta: metav1.ObjectMeta{
  434. UID: types.UID(volume.podName),
  435. },
  436. }
  437. mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
  438. if err != nil {
  439. return nil, err
  440. }
  441. volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation(
  442. volume.volumeMode,
  443. plugin,
  444. mapperPlugin,
  445. pod.UID,
  446. volume.podName,
  447. volume.volumeSpecName,
  448. volume.volumePath,
  449. volume.pluginName)
  450. if err != nil {
  451. return nil, err
  452. }
  453. var uniqueVolumeName v1.UniqueVolumeName
  454. if attachablePlugin != nil || deviceMountablePlugin != nil {
  455. uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
  456. if err != nil {
  457. return nil, err
  458. }
  459. } else {
  460. uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
  461. }
  462. volumeMounter, newMounterErr := plugin.NewMounter(
  463. volumeSpec,
  464. pod,
  465. volumepkg.VolumeOptions{})
  466. if newMounterErr != nil {
  467. return nil, fmt.Errorf(
  468. "reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
  469. uniqueVolumeName,
  470. volumeSpec.Name(),
  471. volume.podName,
  472. pod.UID,
  473. newMounterErr)
  474. }
  475. // Check existence of mount point for filesystem volume or symbolic link for block volume
  476. isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, volumeMounter.GetPath(), volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
  477. if checkErr != nil {
  478. return nil, checkErr
  479. }
  480. // If mount or symlink doesn't exist, volume reconstruction should be failed
  481. if !isExist {
  482. return nil, fmt.Errorf("Volume: %q is not mounted", uniqueVolumeName)
  483. }
  484. // TODO: remove feature gate check after no longer needed
  485. var volumeMapper volumepkg.BlockVolumeMapper
  486. if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) && volume.volumeMode == v1.PersistentVolumeBlock {
  487. var newMapperErr error
  488. if mapperPlugin != nil {
  489. volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
  490. volumeSpec,
  491. pod,
  492. volumepkg.VolumeOptions{})
  493. if newMapperErr != nil {
  494. return nil, fmt.Errorf(
  495. "reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
  496. uniqueVolumeName,
  497. volumeSpec.Name(),
  498. volume.podName,
  499. pod.UID,
  500. newMapperErr)
  501. }
  502. }
  503. }
  504. reconstructedVolume := &reconstructedVolume{
  505. volumeName: uniqueVolumeName,
  506. podName: volume.podName,
  507. volumeSpec: volumeSpec,
  508. // volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
  509. // for volume cleanup.
  510. // TODO: in case pod is added back before reconciler starts to unmount, we can update this field from desired state information
  511. outerVolumeSpecName: volume.volumeSpecName,
  512. pod: pod,
  513. attachablePlugin: attachablePlugin,
  514. volumeGidValue: "",
  515. // devicePath is updated during updateStates() by checking node status's VolumesAttached data.
  516. // TODO: get device path directly from the volume mount path.
  517. devicePath: "",
  518. mounter: volumeMounter,
  519. blockVolumeMapper: volumeMapper,
  520. }
  521. return reconstructedVolume, nil
  522. }
  523. // updateDevicePath gets the node status to retrieve volume device path information.
  524. func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) {
  525. node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(string(rc.nodeName), metav1.GetOptions{})
  526. if fetchErr != nil {
  527. klog.Errorf("updateStates in reconciler: could not get node status with error %v", fetchErr)
  528. } else {
  529. for _, attachedVolume := range node.Status.VolumesAttached {
  530. if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
  531. volume.devicePath = attachedVolume.DevicePath
  532. volumesNeedUpdate[attachedVolume.Name] = volume
  533. klog.V(4).Infof("Update devicePath from node status for volume (%q): %q", attachedVolume.Name, volume.devicePath)
  534. }
  535. }
  536. }
  537. }
  538. func getDeviceMountPath(volume *reconstructedVolume) (string, error) {
  539. volumeAttacher, err := volume.attachablePlugin.NewAttacher()
  540. if volumeAttacher == nil || err != nil {
  541. return "", err
  542. }
  543. deviceMountPath, err :=
  544. volumeAttacher.GetDeviceMountPath(volume.volumeSpec)
  545. if err != nil {
  546. return "", err
  547. }
  548. if volume.blockVolumeMapper != nil {
  549. deviceMountPath, err =
  550. volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
  551. if err != nil {
  552. return "", err
  553. }
  554. }
  555. return deviceMountPath, nil
  556. }
  557. func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
  558. // Get the node status to retrieve volume device path information.
  559. rc.updateDevicePath(volumesNeedUpdate)
  560. for _, volume := range volumesNeedUpdate {
  561. err := rc.actualStateOfWorld.MarkVolumeAsAttached(
  562. //TODO: the devicePath might not be correct for some volume plugins: see issue #54108
  563. volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)
  564. if err != nil {
  565. klog.Errorf("Could not add volume information to actual state of world: %v", err)
  566. continue
  567. }
  568. err = rc.actualStateOfWorld.MarkVolumeAsMounted(
  569. volume.podName,
  570. types.UID(volume.podName),
  571. volume.volumeName,
  572. volume.mounter,
  573. volume.blockVolumeMapper,
  574. volume.outerVolumeSpecName,
  575. volume.volumeGidValue,
  576. volume.volumeSpec)
  577. if err != nil {
  578. klog.Errorf("Could not add pod to volume information to actual state of world: %v", err)
  579. continue
  580. }
  581. klog.V(4).Infof("Volume: %s (pod UID %s) is marked as mounted and added into the actual state", volume.volumeName, volume.podName)
  582. if volume.attachablePlugin != nil {
  583. deviceMountPath, err := getDeviceMountPath(volume)
  584. if err != nil {
  585. klog.Errorf("Could not find device mount path for volume %s", volume.volumeName)
  586. continue
  587. }
  588. err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath)
  589. if err != nil {
  590. klog.Errorf("Could not mark device is mounted to actual state of world: %v", err)
  591. continue
  592. }
  593. klog.V(4).Infof("Volume: %s (pod UID %s) is marked device as mounted and added into the actual state", volume.volumeName, volume.podName)
  594. }
  595. }
  596. return nil
  597. }
  598. // getVolumesFromPodDir scans through the volumes directories under the given pod directory.
  599. // It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
  600. // and volume spec name.
  601. func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
  602. podsDirInfo, err := ioutil.ReadDir(podDir)
  603. if err != nil {
  604. return nil, err
  605. }
  606. volumes := []podVolume{}
  607. for i := range podsDirInfo {
  608. if !podsDirInfo[i].IsDir() {
  609. continue
  610. }
  611. podName := podsDirInfo[i].Name()
  612. podDir := path.Join(podDir, podName)
  613. // Find filesystem volume information
  614. // ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName}
  615. volumesDirs := map[v1.PersistentVolumeMode]string{
  616. v1.PersistentVolumeFilesystem: path.Join(podDir, config.DefaultKubeletVolumesDirName),
  617. }
  618. // TODO: remove feature gate check after no longer needed
  619. if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
  620. // Find block volume information
  621. // ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
  622. volumesDirs[v1.PersistentVolumeBlock] = path.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)
  623. }
  624. for volumeMode, volumesDir := range volumesDirs {
  625. var volumesDirInfo []os.FileInfo
  626. if volumesDirInfo, err = ioutil.ReadDir(volumesDir); err != nil {
  627. // Just skip the loop because given volumesDir doesn't exist depending on volumeMode
  628. continue
  629. }
  630. for _, volumeDir := range volumesDirInfo {
  631. pluginName := volumeDir.Name()
  632. volumePluginPath := path.Join(volumesDir, pluginName)
  633. volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
  634. if err != nil {
  635. klog.Errorf("Could not read volume plugin directory %q: %v", volumePluginPath, err)
  636. continue
  637. }
  638. unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
  639. for _, volumeName := range volumePluginDirs {
  640. volumePath := path.Join(volumePluginPath, volumeName)
  641. klog.V(5).Infof("podName: %v, volume path from volume plugin directory: %v, ", podName, volumePath)
  642. volumes = append(volumes, podVolume{
  643. podName: volumetypes.UniquePodName(podName),
  644. volumeSpecName: volumeName,
  645. volumePath: volumePath,
  646. pluginName: unescapePluginName,
  647. volumeMode: volumeMode,
  648. })
  649. }
  650. }
  651. }
  652. }
  653. klog.V(4).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
  654. return volumes, nil
  655. }