reconciler.go 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package reconciler implements interfaces that attempt to reconcile the
  14. // desired state of the world with the actual state of the world by triggering
  15. // relevant actions (attach, detach, mount, unmount).
  16. package reconciler
  17. import (
  18. "context"
  19. "fmt"
  20. "io/ioutil"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "time"
  25. "k8s.io/klog"
  26. "k8s.io/utils/mount"
  27. utilpath "k8s.io/utils/path"
  28. utilstrings "k8s.io/utils/strings"
  29. v1 "k8s.io/api/core/v1"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/types"
  32. "k8s.io/apimachinery/pkg/util/wait"
  33. utilfeature "k8s.io/apiserver/pkg/util/feature"
  34. clientset "k8s.io/client-go/kubernetes"
  35. "k8s.io/kubernetes/pkg/features"
  36. "k8s.io/kubernetes/pkg/kubelet/config"
  37. "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
  38. "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
  39. volumepkg "k8s.io/kubernetes/pkg/volume"
  40. "k8s.io/kubernetes/pkg/volume/util"
  41. "k8s.io/kubernetes/pkg/volume/util/hostutil"
  42. "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
  43. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  44. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  45. )
  46. // Reconciler runs a periodic loop to reconcile the desired state of the world
  47. // with the actual state of the world by triggering attach, detach, mount, and
  48. // unmount operations.
  49. // Note: This is distinct from the Reconciler implemented by the attach/detach
  50. // controller. This reconciles state for the kubelet volume manager. That
  51. // reconciles state for the attach/detach controller.
  52. type Reconciler interface {
  53. // Starts running the reconciliation loop which executes periodically, checks
  54. // if volumes that should be mounted are mounted and volumes that should
  55. // be unmounted are unmounted. If not, it will trigger mount/unmount
  56. // operations to rectify.
  57. // If attach/detach management is enabled, the manager will also check if
  58. // volumes that should be attached are attached and volumes that should
  59. // be detached are detached and trigger attach/detach operations as needed.
  60. Run(stopCh <-chan struct{})
  61. // StatesHasBeenSynced returns true only after syncStates process starts to sync
  62. // states at least once after kubelet starts
  63. StatesHasBeenSynced() bool
  64. }
  65. // NewReconciler returns a new instance of Reconciler.
  66. //
  67. // controllerAttachDetachEnabled - if true, indicates that the attach/detach
  68. // controller is responsible for managing the attach/detach operations for
  69. // this node, and therefore the volume manager should not
  70. // loopSleepDuration - the amount of time the reconciler loop sleeps between
  71. // successive executions
  72. // waitForAttachTimeout - the amount of time the Mount function will wait for
  73. // the volume to be attached
  74. // nodeName - the Name for this node, used by Attach and Detach methods
  75. // desiredStateOfWorld - cache containing the desired state of the world
  76. // actualStateOfWorld - cache containing the actual state of the world
  77. // populatorHasAddedPods - checker for whether the populator has finished
  78. // adding pods to the desiredStateOfWorld cache at least once after sources
  79. // are all ready (before sources are ready, pods are probably missing)
  80. // operationExecutor - used to trigger attach/detach/mount/unmount operations
  81. // safely (prevents more than one operation from being triggered on the same
  82. // volume)
  83. // mounter - mounter passed in from kubelet, passed down unmount path
  84. // hostutil - hostutil passed in from kubelet
  85. // volumePluginMgr - volume plugin manager passed from kubelet
  86. func NewReconciler(
  87. kubeClient clientset.Interface,
  88. controllerAttachDetachEnabled bool,
  89. loopSleepDuration time.Duration,
  90. waitForAttachTimeout time.Duration,
  91. nodeName types.NodeName,
  92. desiredStateOfWorld cache.DesiredStateOfWorld,
  93. actualStateOfWorld cache.ActualStateOfWorld,
  94. populatorHasAddedPods func() bool,
  95. operationExecutor operationexecutor.OperationExecutor,
  96. mounter mount.Interface,
  97. hostutil hostutil.HostUtils,
  98. volumePluginMgr *volumepkg.VolumePluginMgr,
  99. kubeletPodsDir string) Reconciler {
  100. return &reconciler{
  101. kubeClient: kubeClient,
  102. controllerAttachDetachEnabled: controllerAttachDetachEnabled,
  103. loopSleepDuration: loopSleepDuration,
  104. waitForAttachTimeout: waitForAttachTimeout,
  105. nodeName: nodeName,
  106. desiredStateOfWorld: desiredStateOfWorld,
  107. actualStateOfWorld: actualStateOfWorld,
  108. populatorHasAddedPods: populatorHasAddedPods,
  109. operationExecutor: operationExecutor,
  110. mounter: mounter,
  111. hostutil: hostutil,
  112. volumePluginMgr: volumePluginMgr,
  113. kubeletPodsDir: kubeletPodsDir,
  114. timeOfLastSync: time.Time{},
  115. }
  116. }
  117. type reconciler struct {
  118. kubeClient clientset.Interface
  119. controllerAttachDetachEnabled bool
  120. loopSleepDuration time.Duration
  121. waitForAttachTimeout time.Duration
  122. nodeName types.NodeName
  123. desiredStateOfWorld cache.DesiredStateOfWorld
  124. actualStateOfWorld cache.ActualStateOfWorld
  125. populatorHasAddedPods func() bool
  126. operationExecutor operationexecutor.OperationExecutor
  127. mounter mount.Interface
  128. hostutil hostutil.HostUtils
  129. volumePluginMgr *volumepkg.VolumePluginMgr
  130. kubeletPodsDir string
  131. timeOfLastSync time.Time
  132. }
  133. func (rc *reconciler) Run(stopCh <-chan struct{}) {
  134. wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
  135. }
  136. func (rc *reconciler) reconciliationLoopFunc() func() {
  137. return func() {
  138. rc.reconcile()
  139. // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
  140. // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
  141. // desired state of world does not contain a complete list of pods.
  142. if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
  143. klog.Infof("Reconciler: start to sync state")
  144. rc.sync()
  145. }
  146. }
  147. }
  148. func (rc *reconciler) reconcile() {
  149. // Unmounts are triggered before mounts so that a volume that was
  150. // referenced by a pod that was deleted and is now referenced by another
  151. // pod is unmounted from the first pod before being mounted to the new
  152. // pod.
  153. rc.unmountVolumes()
  154. // Next we mount required volumes. This function could also trigger
  155. // attach if kubelet is responsible for attaching volumes.
  156. // If underlying PVC was resized while in-use then this function also handles volume
  157. // resizing.
  158. rc.mountAttachVolumes()
  159. // Ensure devices that should be detached/unmounted are detached/unmounted.
  160. rc.unmountDetachDevices()
  161. }
  162. func (rc *reconciler) unmountVolumes() {
  163. // Ensure volumes that should be unmounted are unmounted.
  164. for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
  165. if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
  166. // Volume is mounted, unmount it
  167. klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
  168. err := rc.operationExecutor.UnmountVolume(
  169. mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
  170. if err != nil &&
  171. !nestedpendingoperations.IsAlreadyExists(err) &&
  172. !exponentialbackoff.IsExponentialBackoff(err) {
  173. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  174. // Log all other errors.
  175. klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  176. }
  177. if err == nil {
  178. klog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
  179. }
  180. }
  181. }
  182. }
  183. func (rc *reconciler) mountAttachVolumes() {
  184. // Ensure volumes that should be attached/mounted are attached/mounted.
  185. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
  186. volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
  187. volumeToMount.DevicePath = devicePath
  188. if cache.IsVolumeNotAttachedError(err) {
  189. if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
  190. // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
  191. // for controller to finish attaching volume.
  192. klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
  193. err := rc.operationExecutor.VerifyControllerAttachedVolume(
  194. volumeToMount.VolumeToMount,
  195. rc.nodeName,
  196. rc.actualStateOfWorld)
  197. if err != nil &&
  198. !nestedpendingoperations.IsAlreadyExists(err) &&
  199. !exponentialbackoff.IsExponentialBackoff(err) {
  200. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  201. // Log all other errors.
  202. klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  203. }
  204. if err == nil {
  205. klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
  206. }
  207. } else {
  208. // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
  209. // so attach it
  210. volumeToAttach := operationexecutor.VolumeToAttach{
  211. VolumeName: volumeToMount.VolumeName,
  212. VolumeSpec: volumeToMount.VolumeSpec,
  213. NodeName: rc.nodeName,
  214. }
  215. klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
  216. err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
  217. if err != nil &&
  218. !nestedpendingoperations.IsAlreadyExists(err) &&
  219. !exponentialbackoff.IsExponentialBackoff(err) {
  220. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  221. // Log all other errors.
  222. klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  223. }
  224. if err == nil {
  225. klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
  226. }
  227. }
  228. } else if !volMounted || cache.IsRemountRequiredError(err) {
  229. // Volume is not mounted, or is already mounted, but requires remounting
  230. remountingLogStr := ""
  231. isRemount := cache.IsRemountRequiredError(err)
  232. if isRemount {
  233. remountingLogStr = "Volume is already mounted to pod, but remount was requested."
  234. }
  235. klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
  236. err := rc.operationExecutor.MountVolume(
  237. rc.waitForAttachTimeout,
  238. volumeToMount.VolumeToMount,
  239. rc.actualStateOfWorld,
  240. isRemount)
  241. if err != nil &&
  242. !nestedpendingoperations.IsAlreadyExists(err) &&
  243. !exponentialbackoff.IsExponentialBackoff(err) {
  244. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  245. // Log all other errors.
  246. klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  247. }
  248. if err == nil {
  249. if remountingLogStr == "" {
  250. klog.V(1).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
  251. } else {
  252. klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
  253. }
  254. }
  255. } else if cache.IsFSResizeRequiredError(err) &&
  256. utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
  257. klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
  258. err := rc.operationExecutor.ExpandInUseVolume(
  259. volumeToMount.VolumeToMount,
  260. rc.actualStateOfWorld)
  261. if err != nil &&
  262. !nestedpendingoperations.IsAlreadyExists(err) &&
  263. !exponentialbackoff.IsExponentialBackoff(err) {
  264. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  265. // Log all other errors.
  266. klog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
  267. }
  268. if err == nil {
  269. klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
  270. }
  271. }
  272. }
  273. }
  274. func (rc *reconciler) unmountDetachDevices() {
  275. for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
  276. // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
  277. if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
  278. !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
  279. if attachedVolume.DeviceMayBeMounted() {
  280. // Volume is globally mounted to device, unmount it
  281. klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
  282. err := rc.operationExecutor.UnmountDevice(
  283. attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
  284. if err != nil &&
  285. !nestedpendingoperations.IsAlreadyExists(err) &&
  286. !exponentialbackoff.IsExponentialBackoff(err) {
  287. // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
  288. // Log all other errors.
  289. klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  290. }
  291. if err == nil {
  292. klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
  293. }
  294. } else {
  295. // Volume is attached to node, detach it
  296. // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
  297. if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
  298. rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
  299. klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
  300. } else {
  301. // Only detach if kubelet detach is enabled
  302. klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
  303. err := rc.operationExecutor.DetachVolume(
  304. attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
  305. if err != nil &&
  306. !nestedpendingoperations.IsAlreadyExists(err) &&
  307. !exponentialbackoff.IsExponentialBackoff(err) {
  308. // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
  309. // Log all other errors.
  310. klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  311. }
  312. if err == nil {
  313. klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
  314. }
  315. }
  316. }
  317. }
  318. }
  319. }
  320. // sync process tries to observe the real world by scanning all pods' volume directories from the disk.
  321. // If the actual and desired state of worlds are not consistent with the observed world, it means that some
  322. // mounted volumes are left out probably during kubelet restart. This process will reconstruct
  323. // the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
  324. // it will try to clean up the mount paths with operation executor.
  325. func (rc *reconciler) sync() {
  326. defer rc.updateLastSyncTime()
  327. rc.syncStates()
  328. }
  329. func (rc *reconciler) updateLastSyncTime() {
  330. rc.timeOfLastSync = time.Now()
  331. }
  332. func (rc *reconciler) StatesHasBeenSynced() bool {
  333. return !rc.timeOfLastSync.IsZero()
  334. }
  335. type podVolume struct {
  336. podName volumetypes.UniquePodName
  337. volumeSpecName string
  338. volumePath string
  339. pluginName string
  340. volumeMode v1.PersistentVolumeMode
  341. }
  342. type reconstructedVolume struct {
  343. volumeName v1.UniqueVolumeName
  344. podName volumetypes.UniquePodName
  345. volumeSpec *volumepkg.Spec
  346. outerVolumeSpecName string
  347. pod *v1.Pod
  348. volumeGidValue string
  349. devicePath string
  350. mounter volumepkg.Mounter
  351. deviceMounter volumepkg.DeviceMounter
  352. blockVolumeMapper volumepkg.BlockVolumeMapper
  353. }
  354. // syncStates scans the volume directories under the given pod directory.
  355. // If the volume is not in desired state of world, this function will reconstruct
  356. // the volume related information and put it in both the actual and desired state of worlds.
  357. // For some volume plugins that cannot support reconstruction, it will clean up the existing
  358. // mount points since the volume is no long needed (removed from desired state)
  359. func (rc *reconciler) syncStates() {
  360. // Get volumes information by reading the pod's directory
  361. podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
  362. if err != nil {
  363. klog.Errorf("Cannot get volumes from disk %v", err)
  364. return
  365. }
  366. volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
  367. volumeNeedReport := []v1.UniqueVolumeName{}
  368. for _, volume := range podVolumes {
  369. if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
  370. klog.V(4).Infof("Volume exists in actual state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
  371. // There is nothing to reconstruct
  372. continue
  373. }
  374. volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
  375. reconstructedVolume, err := rc.reconstructVolume(volume)
  376. if err != nil {
  377. if volumeInDSW {
  378. // Some pod needs the volume, don't clean it up and hope that
  379. // reconcile() calls SetUp and reconstructs the volume in ASW.
  380. klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
  381. continue
  382. }
  383. // No pod needs the volume.
  384. klog.Warningf("Could not construct volume information, cleanup the mounts. (pod.UID %s, volume.SpecName %s): %v", volume.podName, volume.volumeSpecName, err)
  385. rc.cleanupMounts(volume)
  386. continue
  387. }
  388. if volumeInDSW {
  389. // Some pod needs the volume. And it exists on disk. Some previous
  390. // kubelet must have created the directory, therefore it must have
  391. // reported the volume as in use. Mark the volume as in use also in
  392. // this new kubelet so reconcile() calls SetUp and re-mounts the
  393. // volume if it's necessary.
  394. volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
  395. klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), marking as InUse", volume.volumeSpecName, volume.podName)
  396. continue
  397. }
  398. // There is no pod that uses the volume.
  399. if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
  400. klog.Warning("Volume is in pending operation, skip cleaning up mounts")
  401. }
  402. klog.V(2).Infof(
  403. "Reconciler sync states: could not find pod information in desired state, update it in actual state: %+v",
  404. reconstructedVolume)
  405. volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
  406. }
  407. if len(volumesNeedUpdate) > 0 {
  408. if err = rc.updateStates(volumesNeedUpdate); err != nil {
  409. klog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
  410. }
  411. }
  412. if len(volumeNeedReport) > 0 {
  413. rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
  414. }
  415. }
  416. func (rc *reconciler) cleanupMounts(volume podVolume) {
  417. klog.V(2).Infof("Reconciler sync states: could not find information (PID: %s) (Volume SpecName: %s) in desired state, clean up the mount points",
  418. volume.podName, volume.volumeSpecName)
  419. mountedVolume := operationexecutor.MountedVolume{
  420. PodName: volume.podName,
  421. VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
  422. InnerVolumeSpecName: volume.volumeSpecName,
  423. PluginName: volume.pluginName,
  424. PodUID: types.UID(volume.podName),
  425. }
  426. // TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
  427. // to unmount both volume and device in the same routine.
  428. err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
  429. if err != nil {
  430. klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("volumeHandler.UnmountVolumeHandler for UnmountVolume failed"), err).Error())
  431. return
  432. }
  433. }
  434. // Reconstruct volume data structure by reading the pod's volume directories
  435. func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
  436. // plugin initializations
  437. plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
  438. if err != nil {
  439. return nil, err
  440. }
  441. attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginByName(volume.pluginName)
  442. if err != nil {
  443. return nil, err
  444. }
  445. deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginByName(volume.pluginName)
  446. if err != nil {
  447. return nil, err
  448. }
  449. // Create pod object
  450. pod := &v1.Pod{
  451. ObjectMeta: metav1.ObjectMeta{
  452. UID: types.UID(volume.podName),
  453. },
  454. }
  455. mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
  456. if err != nil {
  457. return nil, err
  458. }
  459. // TODO: remove feature gate check after no longer needed
  460. if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) && volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil {
  461. return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
  462. }
  463. volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation(
  464. volume.volumeMode,
  465. plugin,
  466. mapperPlugin,
  467. pod.UID,
  468. volume.podName,
  469. volume.volumeSpecName,
  470. volume.volumePath,
  471. volume.pluginName)
  472. if err != nil {
  473. return nil, err
  474. }
  475. var uniqueVolumeName v1.UniqueVolumeName
  476. if attachablePlugin != nil || deviceMountablePlugin != nil {
  477. uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
  478. if err != nil {
  479. return nil, err
  480. }
  481. } else {
  482. uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
  483. }
  484. var volumeMapper volumepkg.BlockVolumeMapper
  485. var volumeMounter volumepkg.Mounter
  486. var deviceMounter volumepkg.DeviceMounter
  487. // Path to the mount or block device to check
  488. var checkPath string
  489. // TODO: remove feature gate check after no longer needed
  490. if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) && volume.volumeMode == v1.PersistentVolumeBlock {
  491. var newMapperErr error
  492. volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
  493. volumeSpec,
  494. pod,
  495. volumepkg.VolumeOptions{})
  496. if newMapperErr != nil {
  497. return nil, fmt.Errorf(
  498. "reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
  499. uniqueVolumeName,
  500. volumeSpec.Name(),
  501. volume.podName,
  502. pod.UID,
  503. newMapperErr)
  504. }
  505. mapDir, linkName := volumeMapper.GetPodDeviceMapPath()
  506. checkPath = filepath.Join(mapDir, linkName)
  507. } else {
  508. var err error
  509. volumeMounter, err = plugin.NewMounter(
  510. volumeSpec,
  511. pod,
  512. volumepkg.VolumeOptions{})
  513. if err != nil {
  514. return nil, fmt.Errorf(
  515. "reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
  516. uniqueVolumeName,
  517. volumeSpec.Name(),
  518. volume.podName,
  519. pod.UID,
  520. err)
  521. }
  522. checkPath = volumeMounter.GetPath()
  523. if deviceMountablePlugin != nil {
  524. deviceMounter, err = deviceMountablePlugin.NewDeviceMounter()
  525. if err != nil {
  526. return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
  527. uniqueVolumeName,
  528. volumeSpec.Name(),
  529. volume.podName,
  530. pod.UID,
  531. err)
  532. }
  533. }
  534. }
  535. // Check existence of mount point for filesystem volume or symbolic link for block volume
  536. isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, checkPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
  537. if checkErr != nil {
  538. return nil, checkErr
  539. }
  540. // If mount or symlink doesn't exist, volume reconstruction should be failed
  541. if !isExist {
  542. return nil, fmt.Errorf("volume: %q is not mounted", uniqueVolumeName)
  543. }
  544. reconstructedVolume := &reconstructedVolume{
  545. volumeName: uniqueVolumeName,
  546. podName: volume.podName,
  547. volumeSpec: volumeSpec,
  548. // volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
  549. // for volume cleanup.
  550. // TODO: in case pod is added back before reconciler starts to unmount, we can update this field from desired state information
  551. outerVolumeSpecName: volume.volumeSpecName,
  552. pod: pod,
  553. deviceMounter: deviceMounter,
  554. volumeGidValue: "",
  555. // devicePath is updated during updateStates() by checking node status's VolumesAttached data.
  556. // TODO: get device path directly from the volume mount path.
  557. devicePath: "",
  558. mounter: volumeMounter,
  559. blockVolumeMapper: volumeMapper,
  560. }
  561. return reconstructedVolume, nil
  562. }
  563. // updateDevicePath gets the node status to retrieve volume device path information.
  564. func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) {
  565. node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
  566. if fetchErr != nil {
  567. klog.Errorf("updateStates in reconciler: could not get node status with error %v", fetchErr)
  568. } else {
  569. for _, attachedVolume := range node.Status.VolumesAttached {
  570. if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
  571. volume.devicePath = attachedVolume.DevicePath
  572. volumesNeedUpdate[attachedVolume.Name] = volume
  573. klog.V(4).Infof("Update devicePath from node status for volume (%q): %q", attachedVolume.Name, volume.devicePath)
  574. }
  575. }
  576. }
  577. }
  578. // getDeviceMountPath returns device mount path for block volume which
  579. // implements BlockVolumeMapper or filesystem volume which implements
  580. // DeviceMounter
  581. func getDeviceMountPath(volume *reconstructedVolume) (string, error) {
  582. if volume.blockVolumeMapper != nil {
  583. // for block volume, we return its global map path
  584. return volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
  585. } else if volume.deviceMounter != nil {
  586. // for filesystem volume, we return its device mount path if the plugin implements DeviceMounter
  587. return volume.deviceMounter.GetDeviceMountPath(volume.volumeSpec)
  588. } else {
  589. return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
  590. }
  591. }
  592. func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
  593. // Get the node status to retrieve volume device path information.
  594. rc.updateDevicePath(volumesNeedUpdate)
  595. for _, volume := range volumesNeedUpdate {
  596. err := rc.actualStateOfWorld.MarkVolumeAsAttached(
  597. //TODO: the devicePath might not be correct for some volume plugins: see issue #54108
  598. volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)
  599. if err != nil {
  600. klog.Errorf("Could not add volume information to actual state of world: %v", err)
  601. continue
  602. }
  603. markVolumeOpts := operationexecutor.MarkVolumeOpts{
  604. PodName: volume.podName,
  605. PodUID: types.UID(volume.podName),
  606. VolumeName: volume.volumeName,
  607. Mounter: volume.mounter,
  608. BlockVolumeMapper: volume.blockVolumeMapper,
  609. OuterVolumeSpecName: volume.outerVolumeSpecName,
  610. VolumeGidVolume: volume.volumeGidValue,
  611. VolumeSpec: volume.volumeSpec,
  612. VolumeMountState: operationexecutor.VolumeMounted,
  613. }
  614. err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
  615. if err != nil {
  616. klog.Errorf("Could not add pod to volume information to actual state of world: %v", err)
  617. continue
  618. }
  619. klog.V(4).Infof("Volume: %s (pod UID %s) is marked as mounted and added into the actual state", volume.volumeName, volume.podName)
  620. // If the volume has device to mount, we mark its device as mounted.
  621. if volume.deviceMounter != nil || volume.blockVolumeMapper != nil {
  622. deviceMountPath, err := getDeviceMountPath(volume)
  623. if err != nil {
  624. klog.Errorf("Could not find device mount path for volume %s", volume.volumeName)
  625. continue
  626. }
  627. err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath)
  628. if err != nil {
  629. klog.Errorf("Could not mark device is mounted to actual state of world: %v", err)
  630. continue
  631. }
  632. klog.V(4).Infof("Volume: %s (pod UID %s) is marked device as mounted and added into the actual state", volume.volumeName, volume.podName)
  633. }
  634. }
  635. return nil
  636. }
  637. // getVolumesFromPodDir scans through the volumes directories under the given pod directory.
  638. // It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
  639. // and volume spec name.
  640. func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
  641. podsDirInfo, err := ioutil.ReadDir(podDir)
  642. if err != nil {
  643. return nil, err
  644. }
  645. volumes := []podVolume{}
  646. for i := range podsDirInfo {
  647. if !podsDirInfo[i].IsDir() {
  648. continue
  649. }
  650. podName := podsDirInfo[i].Name()
  651. podDir := path.Join(podDir, podName)
  652. // Find filesystem volume information
  653. // ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName}
  654. volumesDirs := map[v1.PersistentVolumeMode]string{
  655. v1.PersistentVolumeFilesystem: path.Join(podDir, config.DefaultKubeletVolumesDirName),
  656. }
  657. // TODO: remove feature gate check after no longer needed
  658. if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
  659. // Find block volume information
  660. // ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
  661. volumesDirs[v1.PersistentVolumeBlock] = path.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)
  662. }
  663. for volumeMode, volumesDir := range volumesDirs {
  664. var volumesDirInfo []os.FileInfo
  665. if volumesDirInfo, err = ioutil.ReadDir(volumesDir); err != nil {
  666. // Just skip the loop because given volumesDir doesn't exist depending on volumeMode
  667. continue
  668. }
  669. for _, volumeDir := range volumesDirInfo {
  670. pluginName := volumeDir.Name()
  671. volumePluginPath := path.Join(volumesDir, pluginName)
  672. volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
  673. if err != nil {
  674. klog.Errorf("Could not read volume plugin directory %q: %v", volumePluginPath, err)
  675. continue
  676. }
  677. unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
  678. for _, volumeName := range volumePluginDirs {
  679. volumePath := path.Join(volumePluginPath, volumeName)
  680. klog.V(5).Infof("podName: %v, volume path from volume plugin directory: %v, ", podName, volumePath)
  681. volumes = append(volumes, podVolume{
  682. podName: volumetypes.UniquePodName(podName),
  683. volumeSpecName: volumeName,
  684. volumePath: volumePath,
  685. pluginName: unescapePluginName,
  686. volumeMode: volumeMode,
  687. })
  688. }
  689. }
  690. }
  691. }
  692. klog.V(4).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
  693. return volumes, nil
  694. }