volume_manager.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package volumemanager
  14. import (
  15. "fmt"
  16. "sort"
  17. "strconv"
  18. "time"
  19. "k8s.io/api/core/v1"
  20. k8stypes "k8s.io/apimachinery/pkg/types"
  21. "k8s.io/apimachinery/pkg/util/runtime"
  22. "k8s.io/apimachinery/pkg/util/sets"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. clientset "k8s.io/client-go/kubernetes"
  25. "k8s.io/client-go/tools/record"
  26. "k8s.io/klog"
  27. "k8s.io/kubernetes/pkg/kubelet/config"
  28. "k8s.io/kubernetes/pkg/kubelet/container"
  29. "k8s.io/kubernetes/pkg/kubelet/pod"
  30. "k8s.io/kubernetes/pkg/kubelet/status"
  31. "k8s.io/kubernetes/pkg/kubelet/util/format"
  32. "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
  33. "k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
  34. "k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
  35. "k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler"
  36. "k8s.io/kubernetes/pkg/util/mount"
  37. "k8s.io/kubernetes/pkg/volume"
  38. "k8s.io/kubernetes/pkg/volume/util"
  39. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  40. "k8s.io/kubernetes/pkg/volume/util/types"
  41. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  42. )
const (
	// reconcilerLoopSleepPeriod is the amount of time the reconciler loop waits
	// between successive executions.
	reconcilerLoopSleepPeriod = 100 * time.Millisecond

	// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
	// DesiredStateOfWorldPopulator loop waits between successive executions.
	desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond

	// desiredStateOfWorldPopulatorGetPodStatusRetryDuration is the amount of
	// time the DesiredStateOfWorldPopulator loop waits between successive pod
	// cleanup calls (to prevent calling containerruntime.GetPodStatus too
	// frequently).
	desiredStateOfWorldPopulatorGetPodStatusRetryDuration = 2 * time.Second

	// podAttachAndMountTimeout is the maximum amount of time the
	// WaitForAttachAndMount call will wait for all volumes in the specified pod
	// to be attached and mounted. Even though cloud operations can take several
	// minutes to complete, we set the timeout to 2 minutes because kubelet
	// will retry in the next sync iteration. This frees the associated
	// goroutine of the pod to process newer updates if needed (e.g., a delete
	// request to the pod).
	// Value is slightly offset from 2 minutes to make timeouts due to this
	// constant recognizable.
	podAttachAndMountTimeout = 2*time.Minute + 3*time.Second

	// podAttachAndMountRetryInterval is the polling interval used by
	// WaitForAttachAndMount: it is the amount of time that call waits between
	// successive checks of whether all expected volumes of the pod are mounted.
	podAttachAndMountRetryInterval = 300 * time.Millisecond

	// waitForAttachTimeout is the maximum amount of time a
	// operationexecutor.Mount call will wait for a volume to be attached.
	// Set to 10 minutes because we've seen attach operations take several
	// minutes to complete for some volume plugins in some cases. While this
	// operation is waiting it only blocks other operations on the same device,
	// other devices are not affected.
	waitForAttachTimeout = 10 * time.Minute
)
// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
	// Run starts the volume manager and all the asynchronous loops that it
	// controls.
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

	// WaitForAttachAndMount processes the volumes referenced in the specified
	// pod and blocks until they are all attached and mounted (reflected in
	// actual state of the world).
	// An error is returned if all volumes are not attached and mounted within
	// the duration defined in podAttachAndMountTimeout.
	WaitForAttachAndMount(pod *v1.Pod) error

	// GetMountedVolumesForPod returns a VolumeMap containing the volumes
	// referenced by the specified pod that are successfully attached and
	// mounted. The key in the map is the OuterVolumeSpecName (i.e.
	// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
	// volumes.
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

	// GetExtraSupplementalGroupsForPod returns a list of the extra
	// supplemental groups for the Pod. These extra supplemental groups come
	// from annotations on persistent volumes that the pod depends on.
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

	// GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
	// interface and are currently in use according to the actual and desired
	// state of the world caches. A volume is considered "in use" as soon as it
	// is added to the desired state of world, indicating it *should* be
	// attached to this node and remains "in use" until it is removed from both
	// the desired state of the world and the actual state of the world, or it
	// has been unmounted (as indicated in actual state of world).
	GetVolumesInUse() []v1.UniqueVolumeName

	// ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler
	// have been synced at least once after kubelet starts, so that it is safe to update the
	// mounted volume list retrieved from actual state.
	ReconcilerStatesHasBeenSynced() bool

	// VolumeIsAttached returns true if the given volume is attached to this
	// node.
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

	// MarkVolumesAsReportedInUse marks the specified volumes as having
	// successfully been reported as "in use" in the node's volume status.
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
  117. // NewVolumeManager returns a new concrete instance implementing the
  118. // VolumeManager interface.
  119. //
  120. // kubeClient - kubeClient is the kube API client used by DesiredStateOfWorldPopulator
  121. // to communicate with the API server to fetch PV and PVC objects
  122. // volumePluginMgr - the volume plugin manager used to access volume plugins.
  123. // Must be pre-initialized.
  124. func NewVolumeManager(
  125. controllerAttachDetachEnabled bool,
  126. nodeName k8stypes.NodeName,
  127. podManager pod.Manager,
  128. podStatusProvider status.PodStatusProvider,
  129. kubeClient clientset.Interface,
  130. volumePluginMgr *volume.VolumePluginMgr,
  131. kubeContainerRuntime container.Runtime,
  132. mounter mount.Interface,
  133. kubeletPodsDir string,
  134. recorder record.EventRecorder,
  135. checkNodeCapabilitiesBeforeMount bool,
  136. keepTerminatedPodVolumes bool) VolumeManager {
  137. vm := &volumeManager{
  138. kubeClient: kubeClient,
  139. volumePluginMgr: volumePluginMgr,
  140. desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr),
  141. actualStateOfWorld: cache.NewActualStateOfWorld(nodeName, volumePluginMgr),
  142. operationExecutor: operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  143. kubeClient,
  144. volumePluginMgr,
  145. recorder,
  146. checkNodeCapabilitiesBeforeMount,
  147. volumepathhandler.NewBlockVolumePathHandler())),
  148. }
  149. vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
  150. kubeClient,
  151. desiredStateOfWorldPopulatorLoopSleepPeriod,
  152. desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
  153. podManager,
  154. podStatusProvider,
  155. vm.desiredStateOfWorld,
  156. vm.actualStateOfWorld,
  157. kubeContainerRuntime,
  158. keepTerminatedPodVolumes)
  159. vm.reconciler = reconciler.NewReconciler(
  160. kubeClient,
  161. controllerAttachDetachEnabled,
  162. reconcilerLoopSleepPeriod,
  163. waitForAttachTimeout,
  164. nodeName,
  165. vm.desiredStateOfWorld,
  166. vm.actualStateOfWorld,
  167. vm.desiredStateOfWorldPopulator.HasAddedPods,
  168. vm.operationExecutor,
  169. mounter,
  170. volumePluginMgr,
  171. kubeletPodsDir)
  172. return vm
  173. }
// volumeManager implements the VolumeManager interface.
type volumeManager struct {
	// kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
	// communicate with the API server to fetch PV and PVC objects.
	kubeClient clientset.Interface

	// volumePluginMgr is the volume plugin manager used to access volume
	// plugins. It must be pre-initialized.
	volumePluginMgr *volume.VolumePluginMgr

	// desiredStateOfWorld is a data structure containing the desired state of
	// the world according to the volume manager (i.e. what volumes should be
	// attached and which pods are referencing the volumes).
	// The data structure is populated by the desired state of the world
	// populator using the kubelet pod manager.
	desiredStateOfWorld cache.DesiredStateOfWorld

	// actualStateOfWorld is a data structure containing the actual state of
	// the world according to the manager: i.e. which volumes are attached to
	// this node and what pods the volumes are mounted to.
	// The data structure is populated upon successful completion of attach,
	// detach, mount, and unmount actions triggered by the reconciler.
	actualStateOfWorld cache.ActualStateOfWorld

	// operationExecutor is used to start asynchronous attach, detach, mount,
	// and unmount operations.
	operationExecutor operationexecutor.OperationExecutor

	// reconciler runs an asynchronous periodic loop to reconcile the
	// desiredStateOfWorld with the actualStateOfWorld by triggering attach,
	// detach, mount, and unmount operations using the operationExecutor.
	reconciler reconciler.Reconciler

	// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
	// populate the desiredStateOfWorld using the kubelet PodManager.
	desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
}
// Run starts the desired state of world populator and the reconciler, each in
// its own goroutine, registers the volume manager metrics and, when a kube
// client is configured, runs the volume plugin manager. It then blocks until
// a receive on stopCh completes.
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	defer runtime.HandleCrash()

	// Both loops receive stopCh so they can observe the same stop signal.
	go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
	klog.V(2).Infof("The desired_state_of_world populator starts")

	klog.Infof("Starting Kubelet Volume Manager")
	go vm.reconciler.Run(stopCh)

	metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

	if vm.kubeClient != nil {
		// start informer for CSIDriver
		vm.volumePluginMgr.Run(stopCh)
	}

	// Park this goroutine until the stop signal arrives.
	<-stopCh
	klog.Infof("Shutting down Kubelet Volume Manager")
}
  219. func (vm *volumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
  220. podVolumes := make(container.VolumeMap)
  221. for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
  222. podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{
  223. Mounter: mountedVolume.Mounter,
  224. BlockVolumeMapper: mountedVolume.BlockVolumeMapper,
  225. ReadOnly: mountedVolume.VolumeSpec.ReadOnly,
  226. InnerVolumeSpecName: mountedVolume.InnerVolumeSpecName,
  227. }
  228. }
  229. return podVolumes
  230. }
  231. func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 {
  232. podName := util.GetUniquePodName(pod)
  233. supplementalGroups := sets.NewString()
  234. for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
  235. if mountedVolume.VolumeGidValue != "" {
  236. supplementalGroups.Insert(mountedVolume.VolumeGidValue)
  237. }
  238. }
  239. result := make([]int64, 0, supplementalGroups.Len())
  240. for _, group := range supplementalGroups.List() {
  241. iGroup, extra := getExtraSupplementalGid(group, pod)
  242. if !extra {
  243. continue
  244. }
  245. result = append(result, int64(iGroup))
  246. }
  247. return result
  248. }
  249. func (vm *volumeManager) GetVolumesInUse() []v1.UniqueVolumeName {
  250. // Report volumes in desired state of world and actual state of world so
  251. // that volumes are marked in use as soon as the decision is made that the
  252. // volume *should* be attached to this node until it is safely unmounted.
  253. desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
  254. allAttachedVolumes := vm.actualStateOfWorld.GetAttachedVolumes()
  255. volumesToReportInUse := make([]v1.UniqueVolumeName, 0, len(desiredVolumes)+len(allAttachedVolumes))
  256. desiredVolumesMap := make(map[v1.UniqueVolumeName]bool, len(desiredVolumes)+len(allAttachedVolumes))
  257. for _, volume := range desiredVolumes {
  258. if volume.PluginIsAttachable {
  259. if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
  260. desiredVolumesMap[volume.VolumeName] = true
  261. volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
  262. }
  263. }
  264. }
  265. for _, volume := range allAttachedVolumes {
  266. if volume.PluginIsAttachable {
  267. if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
  268. volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
  269. }
  270. }
  271. }
  272. sort.Slice(volumesToReportInUse, func(i, j int) bool {
  273. return string(volumesToReportInUse[i]) < string(volumesToReportInUse[j])
  274. })
  275. return volumesToReportInUse
  276. }
// ReconcilerStatesHasBeenSynced reports the value of the reconciler's
// StatesHasBeenSynced method.
func (vm *volumeManager) ReconcilerStatesHasBeenSynced() bool {
	return vm.reconciler.StatesHasBeenSynced()
}
// VolumeIsAttached reports whether volumeName exists in the actual state of
// the world cache.
func (vm *volumeManager) VolumeIsAttached(
	volumeName v1.UniqueVolumeName) bool {
	return vm.actualStateOfWorld.VolumeExists(volumeName)
}
// MarkVolumesAsReportedInUse forwards volumesReportedAsInUse to the desired
// state of the world cache's MarkVolumesReportedInUse method.
func (vm *volumeManager) MarkVolumesAsReportedInUse(
	volumesReportedAsInUse []v1.UniqueVolumeName) {
	vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
}
  288. func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
  289. if pod == nil {
  290. return nil
  291. }
  292. expectedVolumes := getExpectedVolumes(pod)
  293. if len(expectedVolumes) == 0 {
  294. // No volumes to verify
  295. return nil
  296. }
  297. klog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
  298. uniquePodName := util.GetUniquePodName(pod)
  299. // Some pods expect to have Setup called over and over again to update.
  300. // Remount plugins for which this is true. (Atomically updating volumes,
  301. // like Downward API, depend on this to update the contents of the volume).
  302. vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
  303. err := wait.PollImmediate(
  304. podAttachAndMountRetryInterval,
  305. podAttachAndMountTimeout,
  306. vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
  307. if err != nil {
  308. // Timeout expired
  309. unmountedVolumes :=
  310. vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
  311. // Also get unattached volumes for error message
  312. unattachedVolumes :=
  313. vm.getUnattachedVolumes(expectedVolumes)
  314. if len(unmountedVolumes) == 0 {
  315. return nil
  316. }
  317. return fmt.Errorf(
  318. "timeout expired waiting for volumes to attach or mount for pod %q/%q. list of unmounted volumes=%v. list of unattached volumes=%v",
  319. pod.Namespace,
  320. pod.Name,
  321. unmountedVolumes,
  322. unattachedVolumes)
  323. }
  324. klog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
  325. return nil
  326. }
  327. // getUnattachedVolumes returns a list of the volumes that are expected to be attached but
  328. // are not currently attached to the node
  329. func (vm *volumeManager) getUnattachedVolumes(expectedVolumes []string) []string {
  330. unattachedVolumes := []string{}
  331. for _, volume := range expectedVolumes {
  332. if !vm.actualStateOfWorld.VolumeExists(v1.UniqueVolumeName(volume)) {
  333. unattachedVolumes = append(unattachedVolumes, volume)
  334. }
  335. }
  336. return unattachedVolumes
  337. }
  338. // verifyVolumesMountedFunc returns a method that returns true when all expected
  339. // volumes are mounted.
  340. func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
  341. return func() (done bool, err error) {
  342. return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
  343. }
  344. }
  345. // getUnmountedVolumes fetches the current list of mounted volumes from
  346. // the actual state of the world, and uses it to process the list of
  347. // expectedVolumes. It returns a list of unmounted volumes.
  348. func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
  349. mountedVolumes := sets.NewString()
  350. for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
  351. mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
  352. }
  353. return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
  354. }
  355. // filterUnmountedVolumes adds each element of expectedVolumes that is not in
  356. // mountedVolumes to a list of unmountedVolumes and returns it.
  357. func filterUnmountedVolumes(mountedVolumes sets.String, expectedVolumes []string) []string {
  358. unmountedVolumes := []string{}
  359. for _, expectedVolume := range expectedVolumes {
  360. if !mountedVolumes.Has(expectedVolume) {
  361. unmountedVolumes = append(unmountedVolumes, expectedVolume)
  362. }
  363. }
  364. return unmountedVolumes
  365. }
  366. // getExpectedVolumes returns a list of volumes that must be mounted in order to
  367. // consider the volume setup step for this pod satisfied.
  368. func getExpectedVolumes(pod *v1.Pod) []string {
  369. expectedVolumes := []string{}
  370. for _, podVolume := range pod.Spec.Volumes {
  371. expectedVolumes = append(expectedVolumes, podVolume.Name)
  372. }
  373. return expectedVolumes
  374. }
  375. // getExtraSupplementalGid returns the value of an extra supplemental GID as
  376. // defined by an annotation on a volume and a boolean indicating whether the
  377. // volume defined a GID that the pod doesn't already request.
  378. func getExtraSupplementalGid(volumeGidValue string, pod *v1.Pod) (int64, bool) {
  379. if volumeGidValue == "" {
  380. return 0, false
  381. }
  382. gid, err := strconv.ParseInt(volumeGidValue, 10, 64)
  383. if err != nil {
  384. return 0, false
  385. }
  386. if pod.Spec.SecurityContext != nil {
  387. for _, existingGid := range pod.Spec.SecurityContext.SupplementalGroups {
  388. if gid == int64(existingGid) {
  389. return 0, false
  390. }
  391. }
  392. }
  393. return gid, true
  394. }