attach_detach_controller.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package attachdetach implements a controller to manage volume attach and detach
  14. // operations.
  15. package attachdetach
  16. import (
  17. "fmt"
  18. "net"
  19. "time"
  20. authenticationv1 "k8s.io/api/authentication/v1"
  21. "k8s.io/api/core/v1"
  22. apierrors "k8s.io/apimachinery/pkg/api/errors"
  23. "k8s.io/apimachinery/pkg/labels"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/apimachinery/pkg/util/runtime"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. utilfeature "k8s.io/apiserver/pkg/util/feature"
  28. coreinformers "k8s.io/client-go/informers/core/v1"
  29. storageinformers "k8s.io/client-go/informers/storage/v1beta1"
  30. clientset "k8s.io/client-go/kubernetes"
  31. "k8s.io/client-go/kubernetes/scheme"
  32. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  33. corelisters "k8s.io/client-go/listers/core/v1"
  34. storagelisters "k8s.io/client-go/listers/storage/v1beta1"
  35. kcache "k8s.io/client-go/tools/cache"
  36. "k8s.io/client-go/tools/record"
  37. "k8s.io/client-go/util/workqueue"
  38. cloudprovider "k8s.io/cloud-provider"
  39. "k8s.io/klog"
  40. "k8s.io/kubernetes/pkg/controller"
  41. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
  42. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
  43. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator"
  44. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
  45. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
  46. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
  47. "k8s.io/kubernetes/pkg/features"
  48. "k8s.io/kubernetes/pkg/util/mount"
  49. "k8s.io/kubernetes/pkg/volume"
  50. volumeutil "k8s.io/kubernetes/pkg/volume/util"
  51. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  52. "k8s.io/kubernetes/pkg/volume/util/subpath"
  53. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  54. )
// TimerConfig contains configuration of internal attach/detach timers and
// should be used only to speed up tests. DefaultTimerConfig is the suggested
// timer configuration for production.
type TimerConfig struct {
	// ReconcilerLoopPeriod is the amount of time the reconciler loop waits
	// between successive executions.
	ReconcilerLoopPeriod time.Duration

	// ReconcilerMaxWaitForUnmountDuration is the maximum amount of time the
	// attach detach controller will wait for a volume to be safely unmounted
	// from its node. Once this time has expired, the controller will assume the
	// node or kubelet are unresponsive and will detach the volume anyway.
	ReconcilerMaxWaitForUnmountDuration time.Duration

	// DesiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
	// DesiredStateOfWorldPopulator loop waits between successive executions.
	DesiredStateOfWorldPopulatorLoopSleepPeriod time.Duration

	// DesiredStateOfWorldPopulatorListPodsRetryDuration is the amount of
	// time the DesiredStateOfWorldPopulator loop waits between list pods
	// calls.
	DesiredStateOfWorldPopulatorListPodsRetryDuration time.Duration
}
  75. // DefaultTimerConfig is the default configuration of Attach/Detach controller
  76. // timers.
  77. var DefaultTimerConfig TimerConfig = TimerConfig{
  78. ReconcilerLoopPeriod: 100 * time.Millisecond,
  79. ReconcilerMaxWaitForUnmountDuration: 6 * time.Minute,
  80. DesiredStateOfWorldPopulatorLoopSleepPeriod: 1 * time.Minute,
  81. DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Minute,
  82. }
// AttachDetachController defines the operations supported by this controller.
type AttachDetachController interface {
	// Run starts the controller. The implementation in this file blocks until
	// stopCh is closed, or returns early when the informer caches fail to sync.
	Run(stopCh <-chan struct{})
	// GetDesiredStateOfWorld returns the desired state of world associated
	// with the controller.
	GetDesiredStateOfWorld() cache.DesiredStateOfWorld
}
  88. // NewAttachDetachController returns a new instance of AttachDetachController.
  89. func NewAttachDetachController(
  90. kubeClient clientset.Interface,
  91. podInformer coreinformers.PodInformer,
  92. nodeInformer coreinformers.NodeInformer,
  93. pvcInformer coreinformers.PersistentVolumeClaimInformer,
  94. pvInformer coreinformers.PersistentVolumeInformer,
  95. csiNodeInformer storageinformers.CSINodeInformer,
  96. csiDriverInformer storageinformers.CSIDriverInformer,
  97. cloud cloudprovider.Interface,
  98. plugins []volume.VolumePlugin,
  99. prober volume.DynamicPluginProber,
  100. disableReconciliationSync bool,
  101. reconcilerSyncDuration time.Duration,
  102. timerConfig TimerConfig) (AttachDetachController, error) {
  103. // TODO: The default resyncPeriod for shared informers is 12 hours, this is
  104. // unacceptable for the attach/detach controller. For example, if a pod is
  105. // skipped because the node it is scheduled to didn't set its annotation in
  106. // time, we don't want to have to wait 12hrs before processing the pod
  107. // again.
  108. // Luckily https://github.com/kubernetes/kubernetes/issues/23394 is being
  109. // worked on and will split resync in to resync and relist. Once that
  110. // happens the resync period can be set to something much faster (30
  111. // seconds).
  112. // If that issue is not resolved in time, then this controller will have to
  113. // consider some unappealing alternate options: use a non-shared informer
  114. // and set a faster resync period even if it causes relist, or requeue
  115. // dropped pods so they are continuously processed until it is accepted or
  116. // deleted (probably can't do this with sharedInformer), etc.
  117. adc := &attachDetachController{
  118. kubeClient: kubeClient,
  119. pvcLister: pvcInformer.Lister(),
  120. pvcsSynced: pvcInformer.Informer().HasSynced,
  121. pvLister: pvInformer.Lister(),
  122. pvsSynced: pvInformer.Informer().HasSynced,
  123. podLister: podInformer.Lister(),
  124. podsSynced: podInformer.Informer().HasSynced,
  125. podIndexer: podInformer.Informer().GetIndexer(),
  126. nodeLister: nodeInformer.Lister(),
  127. nodesSynced: nodeInformer.Informer().HasSynced,
  128. cloud: cloud,
  129. pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
  130. }
  131. if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
  132. utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  133. adc.csiNodeLister = csiNodeInformer.Lister()
  134. adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced
  135. }
  136. if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  137. adc.csiDriverLister = csiDriverInformer.Lister()
  138. adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
  139. }
  140. if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
  141. return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
  142. }
  143. eventBroadcaster := record.NewBroadcaster()
  144. eventBroadcaster.StartLogging(klog.Infof)
  145. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  146. recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
  147. blkutil := volumepathhandler.NewBlockVolumePathHandler()
  148. adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
  149. adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
  150. adc.attacherDetacher =
  151. operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  152. kubeClient,
  153. &adc.volumePluginMgr,
  154. recorder,
  155. false, // flag for experimental binary check for volume mount
  156. blkutil))
  157. adc.nodeStatusUpdater = statusupdater.NewNodeStatusUpdater(
  158. kubeClient, nodeInformer.Lister(), adc.actualStateOfWorld)
  159. // Default these to values in options
  160. adc.reconciler = reconciler.NewReconciler(
  161. timerConfig.ReconcilerLoopPeriod,
  162. timerConfig.ReconcilerMaxWaitForUnmountDuration,
  163. reconcilerSyncDuration,
  164. disableReconciliationSync,
  165. adc.desiredStateOfWorld,
  166. adc.actualStateOfWorld,
  167. adc.attacherDetacher,
  168. adc.nodeStatusUpdater,
  169. recorder)
  170. adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
  171. timerConfig.DesiredStateOfWorldPopulatorLoopSleepPeriod,
  172. timerConfig.DesiredStateOfWorldPopulatorListPodsRetryDuration,
  173. podInformer.Lister(),
  174. adc.desiredStateOfWorld,
  175. &adc.volumePluginMgr,
  176. pvcInformer.Lister(),
  177. pvInformer.Lister())
  178. podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
  179. AddFunc: adc.podAdd,
  180. UpdateFunc: adc.podUpdate,
  181. DeleteFunc: adc.podDelete,
  182. })
  183. // This custom indexer will index pods by its PVC keys. Then we don't need
  184. // to iterate all pods every time to find pods which reference given PVC.
  185. adc.podIndexer.AddIndexers(kcache.Indexers{
  186. pvcKeyIndex: indexByPVCKey,
  187. })
  188. nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
  189. AddFunc: adc.nodeAdd,
  190. UpdateFunc: adc.nodeUpdate,
  191. DeleteFunc: adc.nodeDelete,
  192. })
  193. pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
  194. AddFunc: func(obj interface{}) {
  195. adc.enqueuePVC(obj)
  196. },
  197. UpdateFunc: func(old, new interface{}) {
  198. adc.enqueuePVC(new)
  199. },
  200. })
  201. return adc, nil
  202. }
  203. const (
  204. pvcKeyIndex string = "pvcKey"
  205. )
  206. // indexByPVCKey returns PVC keys for given pod. Note that the index is only
  207. // used for attaching, so we are only interested in active pods with nodeName
  208. // set.
  209. func indexByPVCKey(obj interface{}) ([]string, error) {
  210. pod, ok := obj.(*v1.Pod)
  211. if !ok {
  212. return []string{}, nil
  213. }
  214. if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
  215. return []string{}, nil
  216. }
  217. keys := []string{}
  218. for _, podVolume := range pod.Spec.Volumes {
  219. if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
  220. keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
  221. }
  222. }
  223. return keys, nil
  224. }
// attachDetachController is the AttachDetachController implementation in this
// file. It also acts as the volume host handed to the volume plugins.
type attachDetachController struct {
	// kubeClient is the kube API client used by volumehost to communicate with
	// the API server.
	kubeClient clientset.Interface

	// pvcLister is the shared PVC lister used to fetch and store PVC
	// objects from the API server. It is shared with other controllers and
	// therefore the PVC objects in its store should be treated as immutable.
	pvcLister corelisters.PersistentVolumeClaimLister
	// pvcsSynced reports whether the PVC informer cache has synced.
	pvcsSynced kcache.InformerSynced

	// pvLister is the shared PV lister used to fetch and store PV objects
	// from the API server. It is shared with other controllers and therefore
	// the PV objects in its store should be treated as immutable.
	pvLister corelisters.PersistentVolumeLister
	// pvsSynced reports whether the PV informer cache has synced.
	pvsSynced kcache.InformerSynced

	// podLister lists pods from the pod informer; podsSynced reports whether
	// that informer cache has synced.
	podLister  corelisters.PodLister
	podsSynced kcache.InformerSynced
	// podIndexer is the pod informer's indexer. It carries the pvcKeyIndex
	// index so pods referencing a given PVC can be looked up by PVC key.
	podIndexer kcache.Indexer

	// nodeLister lists nodes from the node informer; nodesSynced reports
	// whether that informer cache has synced.
	nodeLister  corelisters.NodeLister
	nodesSynced kcache.InformerSynced

	// csiNodeLister and csiNodeSynced are only set when both the CSIMigration
	// and CSINodeInfo feature gates are enabled; otherwise they stay nil.
	csiNodeLister storagelisters.CSINodeLister
	csiNodeSynced kcache.InformerSynced

	// csiDriverLister is the shared CSIDriver lister used to fetch and store
	// CSIDriver objects from the API server. It is shared with other controllers
	// and therefore the CSIDriver objects in its store should be treated as immutable.
	// It is only set when the CSIDriverRegistry feature gate is enabled.
	csiDriverLister  storagelisters.CSIDriverLister
	csiDriversSynced kcache.InformerSynced

	// cloud provider used by volume host
	cloud cloudprovider.Interface

	// volumePluginMgr used to initialize and fetch volume plugins
	volumePluginMgr volume.VolumePluginMgr

	// desiredStateOfWorld is a data structure containing the desired state of
	// the world according to this controller: i.e. what nodes the controller
	// is managing, what volumes it wants be attached to these nodes, and which
	// pods are scheduled to those nodes referencing the volumes.
	// The data structure is populated by the controller using a stream of node
	// and pod API server objects fetched by the informers.
	desiredStateOfWorld cache.DesiredStateOfWorld

	// actualStateOfWorld is a data structure containing the actual state of
	// the world according to this controller: i.e. which volumes are attached
	// to which nodes.
	// The data structure is populated upon successful completion of attach and
	// detach actions triggered by the controller and a periodic sync with
	// storage providers for the "true" state of the world.
	actualStateOfWorld cache.ActualStateOfWorld

	// attacherDetacher is used to start asynchronous attach and detach
	// operations
	attacherDetacher operationexecutor.OperationExecutor

	// reconciler is used to run an asynchronous periodic loop to reconcile the
	// desiredStateOfWorld with the actualStateOfWorld by triggering attach
	// detach operations using the attacherDetacher.
	reconciler reconciler.Reconciler

	// nodeStatusUpdater is used to update node status with the list of attached
	// volumes
	nodeStatusUpdater statusupdater.NodeStatusUpdater

	// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
	// populate the current pods using podInformer.
	desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

	// recorder is the event recorder exposed to volume plugins through
	// GetEventRecorder.
	recorder record.EventRecorder

	// pvcQueue is used to queue pvc objects
	pvcQueue workqueue.RateLimitingInterface
}
  286. func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
  287. defer runtime.HandleCrash()
  288. defer adc.pvcQueue.ShutDown()
  289. klog.Infof("Starting attach detach controller")
  290. defer klog.Infof("Shutting down attach detach controller")
  291. synced := []kcache.InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced}
  292. if adc.csiNodeSynced != nil {
  293. synced = append(synced, adc.csiNodeSynced)
  294. }
  295. if adc.csiDriversSynced != nil {
  296. synced = append(synced, adc.csiDriversSynced)
  297. }
  298. if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
  299. return
  300. }
  301. err := adc.populateActualStateOfWorld()
  302. if err != nil {
  303. klog.Errorf("Error populating the actual state of world: %v", err)
  304. }
  305. err = adc.populateDesiredStateOfWorld()
  306. if err != nil {
  307. klog.Errorf("Error populating the desired state of world: %v", err)
  308. }
  309. go adc.reconciler.Run(stopCh)
  310. go adc.desiredStateOfWorldPopulator.Run(stopCh)
  311. go wait.Until(adc.pvcWorker, time.Second, stopCh)
  312. metrics.Register(adc.pvcLister,
  313. adc.pvLister,
  314. adc.podLister,
  315. adc.actualStateOfWorld,
  316. adc.desiredStateOfWorld,
  317. &adc.volumePluginMgr)
  318. <-stopCh
  319. }
  320. func (adc *attachDetachController) populateActualStateOfWorld() error {
  321. klog.V(5).Infof("Populating ActualStateOfworld")
  322. nodes, err := adc.nodeLister.List(labels.Everything())
  323. if err != nil {
  324. return err
  325. }
  326. for _, node := range nodes {
  327. nodeName := types.NodeName(node.Name)
  328. for _, attachedVolume := range node.Status.VolumesAttached {
  329. uniqueName := attachedVolume.Name
  330. // The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
  331. // In such a case it should be detached in the first reconciliation cycle and the
  332. // volume spec is not needed to detach a volume. If the volume is used by a pod, it
  333. // its spec can be: this would happen during in the populateDesiredStateOfWorld which
  334. // scans the pods and updates their volumes in the ActualStateOfWorld too.
  335. err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
  336. if err != nil {
  337. klog.Errorf("Failed to mark the volume as attached: %v", err)
  338. continue
  339. }
  340. adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
  341. adc.addNodeToDswp(node, types.NodeName(node.Name))
  342. }
  343. }
  344. return nil
  345. }
  346. func (adc *attachDetachController) getNodeVolumeDevicePath(
  347. volumeName v1.UniqueVolumeName, nodeName types.NodeName) (string, error) {
  348. var devicePath string
  349. var found bool
  350. node, err := adc.nodeLister.Get(string(nodeName))
  351. if err != nil {
  352. return devicePath, err
  353. }
  354. for _, attachedVolume := range node.Status.VolumesAttached {
  355. if volumeName == attachedVolume.Name {
  356. devicePath = attachedVolume.DevicePath
  357. found = true
  358. break
  359. }
  360. }
  361. if !found {
  362. err = fmt.Errorf("Volume %s not found on node %s", volumeName, nodeName)
  363. }
  364. return devicePath, err
  365. }
  366. func (adc *attachDetachController) populateDesiredStateOfWorld() error {
  367. klog.V(5).Infof("Populating DesiredStateOfworld")
  368. pods, err := adc.podLister.List(labels.Everything())
  369. if err != nil {
  370. return err
  371. }
  372. for _, pod := range pods {
  373. podToAdd := pod
  374. adc.podAdd(podToAdd)
  375. for _, podVolume := range podToAdd.Spec.Volumes {
  376. // The volume specs present in the ActualStateOfWorld are nil, let's replace those
  377. // with the correct ones found on pods. The present in the ASW with no corresponding
  378. // pod will be detached and the spec is irrelevant.
  379. volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister)
  380. if err != nil {
  381. klog.Errorf(
  382. "Error creating spec for volume %q, pod %q/%q: %v",
  383. podVolume.Name,
  384. podToAdd.Namespace,
  385. podToAdd.Name,
  386. err)
  387. continue
  388. }
  389. nodeName := types.NodeName(podToAdd.Spec.NodeName)
  390. plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
  391. if err != nil || plugin == nil {
  392. klog.V(10).Infof(
  393. "Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
  394. podVolume.Name,
  395. podToAdd.Namespace,
  396. podToAdd.Name,
  397. err)
  398. continue
  399. }
  400. volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
  401. if err != nil {
  402. klog.Errorf(
  403. "Failed to find unique name for volume %q, pod %q/%q: %v",
  404. podVolume.Name,
  405. podToAdd.Namespace,
  406. podToAdd.Name,
  407. err)
  408. continue
  409. }
  410. if adc.actualStateOfWorld.IsVolumeAttachedToNode(volumeName, nodeName) {
  411. devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
  412. if err != nil {
  413. klog.Errorf("Failed to find device path: %v", err)
  414. continue
  415. }
  416. err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
  417. if err != nil {
  418. klog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
  419. }
  420. }
  421. }
  422. }
  423. return nil
  424. }
  425. func (adc *attachDetachController) podAdd(obj interface{}) {
  426. pod, ok := obj.(*v1.Pod)
  427. if pod == nil || !ok {
  428. return
  429. }
  430. if pod.Spec.NodeName == "" {
  431. // Ignore pods without NodeName, indicating they are not scheduled.
  432. return
  433. }
  434. volumeActionFlag := util.DetermineVolumeAction(
  435. pod,
  436. adc.desiredStateOfWorld,
  437. true /* default volume action */)
  438. util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
  439. adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
  440. }
// GetDesiredStateOfWorld returns the desired state of world associated with
// the controller. The controller's own cache is returned, not a copy.
func (adc *attachDetachController) GetDesiredStateOfWorld() cache.DesiredStateOfWorld {
	return adc.desiredStateOfWorld
}
  445. func (adc *attachDetachController) podUpdate(oldObj, newObj interface{}) {
  446. pod, ok := newObj.(*v1.Pod)
  447. if pod == nil || !ok {
  448. return
  449. }
  450. if pod.Spec.NodeName == "" {
  451. // Ignore pods without NodeName, indicating they are not scheduled.
  452. return
  453. }
  454. volumeActionFlag := util.DetermineVolumeAction(
  455. pod,
  456. adc.desiredStateOfWorld,
  457. true /* default volume action */)
  458. util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
  459. adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
  460. }
  461. func (adc *attachDetachController) podDelete(obj interface{}) {
  462. pod, ok := obj.(*v1.Pod)
  463. if pod == nil || !ok {
  464. return
  465. }
  466. util.ProcessPodVolumes(pod, false, /* addVolumes */
  467. adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
  468. }
  469. func (adc *attachDetachController) nodeAdd(obj interface{}) {
  470. node, ok := obj.(*v1.Node)
  471. // TODO: investigate if nodeName is empty then if we can return
  472. // kubernetes/kubernetes/issues/37777
  473. if node == nil || !ok {
  474. return
  475. }
  476. nodeName := types.NodeName(node.Name)
  477. adc.nodeUpdate(nil, obj)
  478. // kubernetes/kubernetes/issues/37586
  479. // This is to workaround the case when a node add causes to wipe out
  480. // the attached volumes field. This function ensures that we sync with
  481. // the actual status.
  482. adc.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
  483. }
  484. func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
  485. node, ok := newObj.(*v1.Node)
  486. // TODO: investigate if nodeName is empty then if we can return
  487. if node == nil || !ok {
  488. return
  489. }
  490. nodeName := types.NodeName(node.Name)
  491. adc.addNodeToDswp(node, nodeName)
  492. adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
  493. }
  494. func (adc *attachDetachController) nodeDelete(obj interface{}) {
  495. node, ok := obj.(*v1.Node)
  496. if node == nil || !ok {
  497. return
  498. }
  499. nodeName := types.NodeName(node.Name)
  500. if err := adc.desiredStateOfWorld.DeleteNode(nodeName); err != nil {
  501. // This might happen during drain, but we still want it to appear in our logs
  502. klog.Infof("error removing node %q from desired-state-of-world: %v", nodeName, err)
  503. }
  504. adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
  505. }
  506. func (adc *attachDetachController) enqueuePVC(obj interface{}) {
  507. key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  508. if err != nil {
  509. runtime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
  510. return
  511. }
  512. adc.pvcQueue.Add(key)
  513. }
  514. // pvcWorker processes items from pvcQueue
  515. func (adc *attachDetachController) pvcWorker() {
  516. for adc.processNextItem() {
  517. }
  518. }
  519. func (adc *attachDetachController) processNextItem() bool {
  520. keyObj, shutdown := adc.pvcQueue.Get()
  521. if shutdown {
  522. return false
  523. }
  524. defer adc.pvcQueue.Done(keyObj)
  525. if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
  526. // Rather than wait for a full resync, re-add the key to the
  527. // queue to be processed.
  528. adc.pvcQueue.AddRateLimited(keyObj)
  529. runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
  530. return true
  531. }
  532. // Finally, if no error occurs we Forget this item so it does not
  533. // get queued again until another change happens.
  534. adc.pvcQueue.Forget(keyObj)
  535. return true
  536. }
  537. func (adc *attachDetachController) syncPVCByKey(key string) error {
  538. klog.V(5).Infof("syncPVCByKey[%s]", key)
  539. namespace, name, err := kcache.SplitMetaNamespaceKey(key)
  540. if err != nil {
  541. klog.V(4).Infof("error getting namespace & name of pvc %q to get pvc from informer: %v", key, err)
  542. return nil
  543. }
  544. pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
  545. if apierrors.IsNotFound(err) {
  546. klog.V(4).Infof("error getting pvc %q from informer: %v", key, err)
  547. return nil
  548. }
  549. if err != nil {
  550. return err
  551. }
  552. if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
  553. // Skip unbound PVCs.
  554. return nil
  555. }
  556. objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
  557. if err != nil {
  558. return err
  559. }
  560. for _, obj := range objs {
  561. pod, ok := obj.(*v1.Pod)
  562. if !ok {
  563. continue
  564. }
  565. volumeActionFlag := util.DetermineVolumeAction(
  566. pod,
  567. adc.desiredStateOfWorld,
  568. true /* default volume action */)
  569. util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
  570. adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
  571. }
  572. return nil
  573. }
  574. // processVolumesInUse processes the list of volumes marked as "in-use"
  575. // according to the specified Node's Status.VolumesInUse and updates the
  576. // corresponding volume in the actual state of the world to indicate that it is
  577. // mounted.
  578. func (adc *attachDetachController) processVolumesInUse(
  579. nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
  580. klog.V(4).Infof("processVolumesInUse for node %q", nodeName)
  581. for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
  582. mounted := false
  583. for _, volumeInUse := range volumesInUse {
  584. if attachedVolume.VolumeName == volumeInUse {
  585. mounted = true
  586. break
  587. }
  588. }
  589. err := adc.actualStateOfWorld.SetVolumeMountedByNode(attachedVolume.VolumeName, nodeName, mounted)
  590. if err != nil {
  591. klog.Warningf(
  592. "SetVolumeMountedByNode(%q, %q, %v) returned an error: %v",
  593. attachedVolume.VolumeName, nodeName, mounted, err)
  594. }
  595. }
  596. }
  597. var _ volume.VolumeHost = &attachDetachController{}
  598. var _ volume.AttachDetachVolumeHost = &attachDetachController{}
// CSINodeLister returns the controller's CSINode lister. The value is nil
// unless the lister was set at construction time.
func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister {
	return adc.csiNodeLister
}
// CSIDriverLister returns the controller's CSIDriver lister. The value is nil
// unless the lister was set at construction time.
func (adc *attachDetachController) CSIDriverLister() storagelisters.CSIDriverLister {
	return adc.csiDriverLister
}
// IsAttachDetachController always returns true, identifying this volume host
// as the attach/detach controller.
func (adc *attachDetachController) IsAttachDetachController() bool {
	return true
}
  608. // VolumeHost implementation
  609. // This is an unfortunate requirement of the current factoring of volume plugin
  610. // initializing code. It requires kubelet specific methods used by the mounting
  611. // code to be implemented by all initializers even if the initializer does not
  612. // do mounting (like this attach/detach controller).
  613. // Issue kubernetes/kubernetes/issues/14217 to fix this.
// GetPluginDir always returns an empty string; podUID is ignored. It exists
// only to satisfy the volume host interface.
func (adc *attachDetachController) GetPluginDir(podUID string) string {
	return ""
}
// GetVolumeDevicePluginDir always returns an empty string; podUID is ignored.
func (adc *attachDetachController) GetVolumeDevicePluginDir(podUID string) string {
	return ""
}
// GetPodsDir always returns an empty string.
func (adc *attachDetachController) GetPodsDir() string {
	return ""
}
// GetPodVolumeDir always returns an empty string; all arguments are ignored.
func (adc *attachDetachController) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
	return ""
}
// GetPodPluginDir always returns an empty string; all arguments are ignored.
func (adc *attachDetachController) GetPodPluginDir(podUID types.UID, pluginName string) string {
	return ""
}
// GetPodVolumeDeviceDir always returns an empty string; all arguments are
// ignored.
func (adc *attachDetachController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
	return ""
}
// GetKubeClient returns the kube API client the controller was created with.
func (adc *attachDetachController) GetKubeClient() clientset.Interface {
	return adc.kubeClient
}
// NewWrapperMounter is not supported by this volume host: it always returns a
// nil mounter and an error, ignoring its arguments.
func (adc *attachDetachController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
	return nil, fmt.Errorf("NewWrapperMounter not supported by Attach/Detach controller's VolumeHost implementation")
}
// NewWrapperUnmounter is not supported by this volume host: it always returns
// a nil unmounter and an error, ignoring its arguments.
func (adc *attachDetachController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
	return nil, fmt.Errorf("NewWrapperUnmounter not supported by Attach/Detach controller's VolumeHost implementation")
}
// GetCloudProvider returns the cloud provider the controller was created
// with.
func (adc *attachDetachController) GetCloudProvider() cloudprovider.Interface {
	return adc.cloud
}
// GetMounter always returns nil; pluginName is ignored.
func (adc *attachDetachController) GetMounter(pluginName string) mount.Interface {
	return nil
}
// GetHostName always returns an empty string.
func (adc *attachDetachController) GetHostName() string {
	return ""
}
// GetHostIP is not supported by this volume host: it always returns a nil IP
// and an error.
func (adc *attachDetachController) GetHostIP() (net.IP, error) {
	return nil, fmt.Errorf("GetHostIP() not supported by Attach/Detach controller's VolumeHost implementation")
}
// GetNodeAllocatable always returns an empty resource list and a nil error.
func (adc *attachDetachController) GetNodeAllocatable() (v1.ResourceList, error) {
	return v1.ResourceList{}, nil
}
// GetSecretFunc returns a function that ignores its arguments and always
// fails: secrets cannot be fetched through this volume host.
func (adc *attachDetachController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
	return func(_, _ string) (*v1.Secret, error) {
		return nil, fmt.Errorf("GetSecret unsupported in attachDetachController")
	}
}
// GetConfigMapFunc returns a function that ignores its arguments and always
// fails: config maps cannot be fetched through this volume host.
func (adc *attachDetachController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
	return func(_, _ string) (*v1.ConfigMap, error) {
		return nil, fmt.Errorf("GetConfigMap unsupported in attachDetachController")
	}
}
// GetServiceAccountTokenFunc returns a function that ignores its arguments
// and always fails: service account tokens cannot be requested through this
// volume host.
func (adc *attachDetachController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
	return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
		return nil, fmt.Errorf("GetServiceAccountToken unsupported in attachDetachController")
	}
}
// DeleteServiceAccountTokenFunc returns a function that ignores its argument
// and only logs an error: deleting service account tokens is unsupported by
// this volume host.
func (adc *attachDetachController) DeleteServiceAccountTokenFunc() func(types.UID) {
	return func(types.UID) {
		klog.Errorf("DeleteServiceAccountToken unsupported in attachDetachController")
	}
}
// GetExec returns the mount.Exec created by mount.NewOsExec; pluginName is
// ignored.
func (adc *attachDetachController) GetExec(pluginName string) mount.Exec {
	return mount.NewOsExec()
}
  679. func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {
  680. if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists {
  681. keepTerminatedPodVolumes := false
  682. if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok {
  683. keepTerminatedPodVolumes = (t == "true")
  684. }
  685. // Node specifies annotation indicating it should be managed by attach
  686. // detach controller. Add it to desired state of world.
  687. adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes)
  688. }
  689. }
// GetNodeLabels is not supported by this volume host: it always returns a nil
// map and an error.
func (adc *attachDetachController) GetNodeLabels() (map[string]string, error) {
	return nil, fmt.Errorf("GetNodeLabels() unsupported in Attach/Detach controller")
}
// GetNodeName always returns an empty node name.
func (adc *attachDetachController) GetNodeName() types.NodeName {
	return ""
}
// GetEventRecorder returns the controller's recorder field as is; callers
// get nil if that field was never assigned.
func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
	return adc.recorder
}
// GetSubpather always returns nil.
func (adc *attachDetachController) GetSubpather() subpath.Interface {
	// Subpaths not needed in attachdetach controller
	return nil
}
// GetCSIDriverLister returns the controller's CSIDriver lister, the same
// value CSIDriverLister returns.
func (adc *attachDetachController) GetCSIDriverLister() storagelisters.CSIDriverLister {
	return adc.csiDriverLister
}