volume_manager.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package volumemanager
  14. import (
  15. "errors"
  16. "fmt"
  17. "sort"
  18. "strconv"
  19. "strings"
  20. "time"
  21. "k8s.io/klog"
  22. "k8s.io/utils/mount"
  23. v1 "k8s.io/api/core/v1"
  24. k8stypes "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/apimachinery/pkg/util/runtime"
  26. "k8s.io/apimachinery/pkg/util/sets"
  27. "k8s.io/apimachinery/pkg/util/wait"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/client-go/tools/record"
  30. csitrans "k8s.io/csi-translation-lib"
  31. "k8s.io/kubernetes/pkg/kubelet/config"
  32. "k8s.io/kubernetes/pkg/kubelet/container"
  33. "k8s.io/kubernetes/pkg/kubelet/pod"
  34. "k8s.io/kubernetes/pkg/kubelet/status"
  35. "k8s.io/kubernetes/pkg/kubelet/util/format"
  36. "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
  37. "k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
  38. "k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
  39. "k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler"
  40. "k8s.io/kubernetes/pkg/volume"
  41. "k8s.io/kubernetes/pkg/volume/csimigration"
  42. "k8s.io/kubernetes/pkg/volume/util"
  43. "k8s.io/kubernetes/pkg/volume/util/hostutil"
  44. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  45. "k8s.io/kubernetes/pkg/volume/util/types"
  46. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  47. )
  48. const (
  49. // reconcilerLoopSleepPeriod is the amount of time the reconciler loop waits
  50. // between successive executions
  51. reconcilerLoopSleepPeriod = 100 * time.Millisecond
  52. // desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
  53. // DesiredStateOfWorldPopulator loop waits between successive executions
  54. desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond
  55. // desiredStateOfWorldPopulatorGetPodStatusRetryDuration is the amount of
  56. // time the DesiredStateOfWorldPopulator loop waits between successive pod
  57. // cleanup calls (to prevent calling containerruntime.GetPodStatus too
  58. // frequently).
  59. desiredStateOfWorldPopulatorGetPodStatusRetryDuration = 2 * time.Second
  60. // podAttachAndMountTimeout is the maximum amount of time the
  61. // WaitForAttachAndMount call will wait for all volumes in the specified pod
  62. // to be attached and mounted. Even though cloud operations can take several
  63. // minutes to complete, we set the timeout to 2 minutes because kubelet
  64. // will retry in the next sync iteration. This frees the associated
  65. // goroutine of the pod to process newer updates if needed (e.g., a delete
  66. // request to the pod).
  67. // Value is slightly offset from 2 minutes to make timeouts due to this
  68. // constant recognizable.
  69. podAttachAndMountTimeout = 2*time.Minute + 3*time.Second
  70. // podAttachAndMountRetryInterval is the amount of time the GetVolumesForPod
  71. // call waits before retrying
  72. podAttachAndMountRetryInterval = 300 * time.Millisecond
  73. // waitForAttachTimeout is the maximum amount of time a
  74. // operationexecutor.Mount call will wait for a volume to be attached.
  75. // Set to 10 minutes because we've seen attach operations take several
  76. // minutes to complete for some volume plugins in some cases. While this
  77. // operation is waiting it only blocks other operations on the same device,
  78. // other devices are not affected.
  79. waitForAttachTimeout = 10 * time.Minute
  80. )
  81. // VolumeManager runs a set of asynchronous loops that figure out which volumes
  82. // need to be attached/mounted/unmounted/detached based on the pods scheduled on
  83. // this node and makes it so.
  84. type VolumeManager interface {
  85. // Starts the volume manager and all the asynchronous loops that it controls
  86. Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
  87. // WaitForAttachAndMount processes the volumes referenced in the specified
  88. // pod and blocks until they are all attached and mounted (reflected in
  89. // actual state of the world).
  90. // An error is returned if all volumes are not attached and mounted within
  91. // the duration defined in podAttachAndMountTimeout.
  92. WaitForAttachAndMount(pod *v1.Pod) error
  93. // GetMountedVolumesForPod returns a VolumeMap containing the volumes
  94. // referenced by the specified pod that are successfully attached and
  95. // mounted. The key in the map is the OuterVolumeSpecName (i.e.
  96. // pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
  97. // volumes.
  98. GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
  99. // GetExtraSupplementalGroupsForPod returns a list of the extra
  100. // supplemental groups for the Pod. These extra supplemental groups come
  101. // from annotations on persistent volumes that the pod depends on.
  102. GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
  103. // GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
  104. // interface and are currently in use according to the actual and desired
  105. // state of the world caches. A volume is considered "in use" as soon as it
  106. // is added to the desired state of world, indicating it *should* be
  107. // attached to this node and remains "in use" until it is removed from both
  108. // the desired state of the world and the actual state of the world, or it
  109. // has been unmounted (as indicated in actual state of world).
  110. GetVolumesInUse() []v1.UniqueVolumeName
  111. // ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler
  112. // has been synced at least once after kubelet starts so that it is safe to update mounted
  113. // volume list retrieved from actual state.
  114. ReconcilerStatesHasBeenSynced() bool
  115. // VolumeIsAttached returns true if the given volume is attached to this
  116. // node.
  117. VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
  118. // Marks the specified volume as having successfully been reported as "in
  119. // use" in the nodes's volume status.
  120. MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
  121. }
  122. // NewVolumeManager returns a new concrete instance implementing the
  123. // VolumeManager interface.
  124. //
  125. // kubeClient - kubeClient is the kube API client used by DesiredStateOfWorldPopulator
  126. // to communicate with the API server to fetch PV and PVC objects
  127. // volumePluginMgr - the volume plugin manager used to access volume plugins.
  128. // Must be pre-initialized.
  129. func NewVolumeManager(
  130. controllerAttachDetachEnabled bool,
  131. nodeName k8stypes.NodeName,
  132. podManager pod.Manager,
  133. podStatusProvider status.PodStatusProvider,
  134. kubeClient clientset.Interface,
  135. volumePluginMgr *volume.VolumePluginMgr,
  136. kubeContainerRuntime container.Runtime,
  137. mounter mount.Interface,
  138. hostutil hostutil.HostUtils,
  139. kubeletPodsDir string,
  140. recorder record.EventRecorder,
  141. checkNodeCapabilitiesBeforeMount bool,
  142. keepTerminatedPodVolumes bool,
  143. blockVolumePathHandler volumepathhandler.BlockVolumePathHandler) VolumeManager {
  144. vm := &volumeManager{
  145. kubeClient: kubeClient,
  146. volumePluginMgr: volumePluginMgr,
  147. desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr),
  148. actualStateOfWorld: cache.NewActualStateOfWorld(nodeName, volumePluginMgr),
  149. operationExecutor: operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  150. kubeClient,
  151. volumePluginMgr,
  152. recorder,
  153. checkNodeCapabilitiesBeforeMount,
  154. blockVolumePathHandler)),
  155. }
  156. intreeToCSITranslator := csitrans.New()
  157. csiMigratedPluginManager := csimigration.NewPluginManager(intreeToCSITranslator)
  158. vm.intreeToCSITranslator = intreeToCSITranslator
  159. vm.csiMigratedPluginManager = csiMigratedPluginManager
  160. vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
  161. kubeClient,
  162. desiredStateOfWorldPopulatorLoopSleepPeriod,
  163. desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
  164. podManager,
  165. podStatusProvider,
  166. vm.desiredStateOfWorld,
  167. vm.actualStateOfWorld,
  168. kubeContainerRuntime,
  169. keepTerminatedPodVolumes,
  170. csiMigratedPluginManager,
  171. intreeToCSITranslator)
  172. vm.reconciler = reconciler.NewReconciler(
  173. kubeClient,
  174. controllerAttachDetachEnabled,
  175. reconcilerLoopSleepPeriod,
  176. waitForAttachTimeout,
  177. nodeName,
  178. vm.desiredStateOfWorld,
  179. vm.actualStateOfWorld,
  180. vm.desiredStateOfWorldPopulator.HasAddedPods,
  181. vm.operationExecutor,
  182. mounter,
  183. hostutil,
  184. volumePluginMgr,
  185. kubeletPodsDir)
  186. return vm
  187. }
  188. // volumeManager implements the VolumeManager interface
  189. type volumeManager struct {
  190. // kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
  191. // communicate with the API server to fetch PV and PVC objects
  192. kubeClient clientset.Interface
  193. // volumePluginMgr is the volume plugin manager used to access volume
  194. // plugins. It must be pre-initialized.
  195. volumePluginMgr *volume.VolumePluginMgr
  196. // desiredStateOfWorld is a data structure containing the desired state of
  197. // the world according to the volume manager: i.e. what volumes should be
  198. // attached and which pods are referencing the volumes).
  199. // The data structure is populated by the desired state of the world
  200. // populator using the kubelet pod manager.
  201. desiredStateOfWorld cache.DesiredStateOfWorld
  202. // actualStateOfWorld is a data structure containing the actual state of
  203. // the world according to the manager: i.e. which volumes are attached to
  204. // this node and what pods the volumes are mounted to.
  205. // The data structure is populated upon successful completion of attach,
  206. // detach, mount, and unmount actions triggered by the reconciler.
  207. actualStateOfWorld cache.ActualStateOfWorld
  208. // operationExecutor is used to start asynchronous attach, detach, mount,
  209. // and unmount operations.
  210. operationExecutor operationexecutor.OperationExecutor
  211. // reconciler runs an asynchronous periodic loop to reconcile the
  212. // desiredStateOfWorld with the actualStateOfWorld by triggering attach,
  213. // detach, mount, and unmount operations using the operationExecutor.
  214. reconciler reconciler.Reconciler
  215. // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
  216. // populate the desiredStateOfWorld using the kubelet PodManager.
  217. desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
  218. // csiMigratedPluginManager keeps track of CSI migration status of plugins
  219. csiMigratedPluginManager csimigration.PluginManager
  220. // intreeToCSITranslator translates in-tree volume specs to CSI
  221. intreeToCSITranslator csimigration.InTreeToCSITranslator
  222. }
  223. func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
  224. defer runtime.HandleCrash()
  225. go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
  226. klog.V(2).Infof("The desired_state_of_world populator starts")
  227. klog.Infof("Starting Kubelet Volume Manager")
  228. go vm.reconciler.Run(stopCh)
  229. metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
  230. if vm.kubeClient != nil {
  231. // start informer for CSIDriver
  232. vm.volumePluginMgr.Run(stopCh)
  233. }
  234. <-stopCh
  235. klog.Infof("Shutting down Kubelet Volume Manager")
  236. }
  237. func (vm *volumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
  238. podVolumes := make(container.VolumeMap)
  239. for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
  240. podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{
  241. Mounter: mountedVolume.Mounter,
  242. BlockVolumeMapper: mountedVolume.BlockVolumeMapper,
  243. ReadOnly: mountedVolume.VolumeSpec.ReadOnly,
  244. InnerVolumeSpecName: mountedVolume.InnerVolumeSpecName,
  245. }
  246. }
  247. return podVolumes
  248. }
  249. func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 {
  250. podName := util.GetUniquePodName(pod)
  251. supplementalGroups := sets.NewString()
  252. for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
  253. if mountedVolume.VolumeGidValue != "" {
  254. supplementalGroups.Insert(mountedVolume.VolumeGidValue)
  255. }
  256. }
  257. result := make([]int64, 0, supplementalGroups.Len())
  258. for _, group := range supplementalGroups.List() {
  259. iGroup, extra := getExtraSupplementalGid(group, pod)
  260. if !extra {
  261. continue
  262. }
  263. result = append(result, int64(iGroup))
  264. }
  265. return result
  266. }
  267. func (vm *volumeManager) GetVolumesInUse() []v1.UniqueVolumeName {
  268. // Report volumes in desired state of world and actual state of world so
  269. // that volumes are marked in use as soon as the decision is made that the
  270. // volume *should* be attached to this node until it is safely unmounted.
  271. desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
  272. allAttachedVolumes := vm.actualStateOfWorld.GetAttachedVolumes()
  273. volumesToReportInUse := make([]v1.UniqueVolumeName, 0, len(desiredVolumes)+len(allAttachedVolumes))
  274. desiredVolumesMap := make(map[v1.UniqueVolumeName]bool, len(desiredVolumes)+len(allAttachedVolumes))
  275. for _, volume := range desiredVolumes {
  276. if volume.PluginIsAttachable {
  277. if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
  278. desiredVolumesMap[volume.VolumeName] = true
  279. volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
  280. }
  281. }
  282. }
  283. for _, volume := range allAttachedVolumes {
  284. if volume.PluginIsAttachable {
  285. if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
  286. volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
  287. }
  288. }
  289. }
  290. sort.Slice(volumesToReportInUse, func(i, j int) bool {
  291. return string(volumesToReportInUse[i]) < string(volumesToReportInUse[j])
  292. })
  293. return volumesToReportInUse
  294. }
  295. func (vm *volumeManager) ReconcilerStatesHasBeenSynced() bool {
  296. return vm.reconciler.StatesHasBeenSynced()
  297. }
  298. func (vm *volumeManager) VolumeIsAttached(
  299. volumeName v1.UniqueVolumeName) bool {
  300. return vm.actualStateOfWorld.VolumeExists(volumeName)
  301. }
  302. func (vm *volumeManager) MarkVolumesAsReportedInUse(
  303. volumesReportedAsInUse []v1.UniqueVolumeName) {
  304. vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
  305. }
  306. func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
  307. if pod == nil {
  308. return nil
  309. }
  310. expectedVolumes := getExpectedVolumes(pod)
  311. if len(expectedVolumes) == 0 {
  312. // No volumes to verify
  313. return nil
  314. }
  315. klog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
  316. uniquePodName := util.GetUniquePodName(pod)
  317. // Some pods expect to have Setup called over and over again to update.
  318. // Remount plugins for which this is true. (Atomically updating volumes,
  319. // like Downward API, depend on this to update the contents of the volume).
  320. vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
  321. err := wait.PollImmediate(
  322. podAttachAndMountRetryInterval,
  323. podAttachAndMountTimeout,
  324. vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
  325. if err != nil {
  326. unmountedVolumes :=
  327. vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
  328. // Also get unattached volumes for error message
  329. unattachedVolumes :=
  330. vm.getUnattachedVolumes(expectedVolumes)
  331. if len(unmountedVolumes) == 0 {
  332. return nil
  333. }
  334. return fmt.Errorf(
  335. "unmounted volumes=%v, unattached volumes=%v: %s",
  336. unmountedVolumes,
  337. unattachedVolumes,
  338. err)
  339. }
  340. klog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
  341. return nil
  342. }
  343. // getUnattachedVolumes returns a list of the volumes that are expected to be attached but
  344. // are not currently attached to the node
  345. func (vm *volumeManager) getUnattachedVolumes(expectedVolumes []string) []string {
  346. unattachedVolumes := []string{}
  347. for _, volume := range expectedVolumes {
  348. if !vm.actualStateOfWorld.VolumeExists(v1.UniqueVolumeName(volume)) {
  349. unattachedVolumes = append(unattachedVolumes, volume)
  350. }
  351. }
  352. return unattachedVolumes
  353. }
  354. // verifyVolumesMountedFunc returns a method that returns true when all expected
  355. // volumes are mounted.
  356. func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
  357. return func() (done bool, err error) {
  358. if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
  359. return true, errors.New(strings.Join(errs, "; "))
  360. }
  361. return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
  362. }
  363. }
  364. // getUnmountedVolumes fetches the current list of mounted volumes from
  365. // the actual state of the world, and uses it to process the list of
  366. // expectedVolumes. It returns a list of unmounted volumes.
  367. // The list also includes volume that may be mounted in uncertain state.
  368. func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
  369. mountedVolumes := sets.NewString()
  370. for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
  371. mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
  372. }
  373. return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
  374. }
  375. // filterUnmountedVolumes adds each element of expectedVolumes that is not in
  376. // mountedVolumes to a list of unmountedVolumes and returns it.
  377. func filterUnmountedVolumes(mountedVolumes sets.String, expectedVolumes []string) []string {
  378. unmountedVolumes := []string{}
  379. for _, expectedVolume := range expectedVolumes {
  380. if !mountedVolumes.Has(expectedVolume) {
  381. unmountedVolumes = append(unmountedVolumes, expectedVolume)
  382. }
  383. }
  384. return unmountedVolumes
  385. }
  386. // getExpectedVolumes returns a list of volumes that must be mounted in order to
  387. // consider the volume setup step for this pod satisfied.
  388. func getExpectedVolumes(pod *v1.Pod) []string {
  389. mounts, devices := util.GetPodVolumeNames(pod)
  390. return mounts.Union(devices).UnsortedList()
  391. }
  392. // getExtraSupplementalGid returns the value of an extra supplemental GID as
  393. // defined by an annotation on a volume and a boolean indicating whether the
  394. // volume defined a GID that the pod doesn't already request.
  395. func getExtraSupplementalGid(volumeGidValue string, pod *v1.Pod) (int64, bool) {
  396. if volumeGidValue == "" {
  397. return 0, false
  398. }
  399. gid, err := strconv.ParseInt(volumeGidValue, 10, 64)
  400. if err != nil {
  401. return 0, false
  402. }
  403. if pod.Spec.SecurityContext != nil {
  404. for _, existingGid := range pod.Spec.SecurityContext.SupplementalGroups {
  405. if gid == int64(existingGid) {
  406. return 0, false
  407. }
  408. }
  409. }
  410. return gid, true
  411. }