attach_detach_controller.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package attachdetach implements a controller to manage volume attach and detach
  14. // operations.
  15. package attachdetach
  16. import (
  17. "fmt"
  18. "net"
  19. "time"
  20. "k8s.io/klog"
  21. utilexec "k8s.io/utils/exec"
  22. "k8s.io/utils/mount"
  23. authenticationv1 "k8s.io/api/authentication/v1"
  24. v1 "k8s.io/api/core/v1"
  25. apierrors "k8s.io/apimachinery/pkg/api/errors"
  26. "k8s.io/apimachinery/pkg/labels"
  27. "k8s.io/apimachinery/pkg/types"
  28. "k8s.io/apimachinery/pkg/util/runtime"
  29. "k8s.io/apimachinery/pkg/util/wait"
  30. utilfeature "k8s.io/apiserver/pkg/util/feature"
  31. coreinformers "k8s.io/client-go/informers/core/v1"
  32. storageinformersv1 "k8s.io/client-go/informers/storage/v1"
  33. storageinformers "k8s.io/client-go/informers/storage/v1beta1"
  34. clientset "k8s.io/client-go/kubernetes"
  35. "k8s.io/client-go/kubernetes/scheme"
  36. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  37. corelisters "k8s.io/client-go/listers/core/v1"
  38. storagelistersv1 "k8s.io/client-go/listers/storage/v1"
  39. storagelisters "k8s.io/client-go/listers/storage/v1beta1"
  40. kcache "k8s.io/client-go/tools/cache"
  41. "k8s.io/client-go/tools/record"
  42. "k8s.io/client-go/util/workqueue"
  43. cloudprovider "k8s.io/cloud-provider"
  44. csitrans "k8s.io/csi-translation-lib"
  45. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
  46. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
  47. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator"
  48. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
  49. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
  50. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
  51. "k8s.io/kubernetes/pkg/features"
  52. "k8s.io/kubernetes/pkg/volume"
  53. "k8s.io/kubernetes/pkg/volume/csimigration"
  54. volumeutil "k8s.io/kubernetes/pkg/volume/util"
  55. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  56. "k8s.io/kubernetes/pkg/volume/util/subpath"
  57. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  58. )
  59. // TimerConfig contains configuration of internal attach/detach timers and
  60. // should be used only to speed up tests. DefaultTimerConfig is the suggested
  61. // timer configuration for production.
  62. type TimerConfig struct {
  63. // ReconcilerLoopPeriod is the amount of time the reconciler loop waits
  64. // between successive executions
  65. ReconcilerLoopPeriod time.Duration
  66. // ReconcilerMaxWaitForUnmountDuration is the maximum amount of time the
  67. // attach detach controller will wait for a volume to be safely unmounted
  68. // from its node. Once this time has expired, the controller will assume the
  69. // node or kubelet are unresponsive and will detach the volume anyway.
  70. ReconcilerMaxWaitForUnmountDuration time.Duration
  71. // DesiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
  72. // DesiredStateOfWorldPopulator loop waits between successive executions
  73. DesiredStateOfWorldPopulatorLoopSleepPeriod time.Duration
  74. // DesiredStateOfWorldPopulatorListPodsRetryDuration is the amount of
  75. // time the DesiredStateOfWorldPopulator loop waits between list pods
  76. // calls.
  77. DesiredStateOfWorldPopulatorListPodsRetryDuration time.Duration
  78. }
  79. // DefaultTimerConfig is the default configuration of Attach/Detach controller
  80. // timers.
  81. var DefaultTimerConfig TimerConfig = TimerConfig{
  82. ReconcilerLoopPeriod: 100 * time.Millisecond,
  83. ReconcilerMaxWaitForUnmountDuration: 6 * time.Minute,
  84. DesiredStateOfWorldPopulatorLoopSleepPeriod: 1 * time.Minute,
  85. DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Minute,
  86. }
  87. // AttachDetachController defines the operations supported by this controller.
  88. type AttachDetachController interface {
  89. Run(stopCh <-chan struct{})
  90. GetDesiredStateOfWorld() cache.DesiredStateOfWorld
  91. }
  92. // NewAttachDetachController returns a new instance of AttachDetachController.
  93. func NewAttachDetachController(
  94. kubeClient clientset.Interface,
  95. podInformer coreinformers.PodInformer,
  96. nodeInformer coreinformers.NodeInformer,
  97. pvcInformer coreinformers.PersistentVolumeClaimInformer,
  98. pvInformer coreinformers.PersistentVolumeInformer,
  99. csiNodeInformer storageinformersv1.CSINodeInformer,
  100. csiDriverInformer storageinformers.CSIDriverInformer,
  101. cloud cloudprovider.Interface,
  102. plugins []volume.VolumePlugin,
  103. prober volume.DynamicPluginProber,
  104. disableReconciliationSync bool,
  105. reconcilerSyncDuration time.Duration,
  106. timerConfig TimerConfig) (AttachDetachController, error) {
  107. adc := &attachDetachController{
  108. kubeClient: kubeClient,
  109. pvcLister: pvcInformer.Lister(),
  110. pvcsSynced: pvcInformer.Informer().HasSynced,
  111. pvLister: pvInformer.Lister(),
  112. pvsSynced: pvInformer.Informer().HasSynced,
  113. podLister: podInformer.Lister(),
  114. podsSynced: podInformer.Informer().HasSynced,
  115. podIndexer: podInformer.Informer().GetIndexer(),
  116. nodeLister: nodeInformer.Lister(),
  117. nodesSynced: nodeInformer.Informer().HasSynced,
  118. cloud: cloud,
  119. pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
  120. }
  121. if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
  122. utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  123. adc.csiNodeLister = csiNodeInformer.Lister()
  124. adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced
  125. }
  126. if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  127. adc.csiDriverLister = csiDriverInformer.Lister()
  128. adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
  129. }
  130. if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
  131. return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
  132. }
  133. eventBroadcaster := record.NewBroadcaster()
  134. eventBroadcaster.StartLogging(klog.Infof)
  135. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  136. recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
  137. blkutil := volumepathhandler.NewBlockVolumePathHandler()
  138. adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
  139. adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
  140. adc.attacherDetacher =
  141. operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  142. kubeClient,
  143. &adc.volumePluginMgr,
  144. recorder,
  145. false, // flag for experimental binary check for volume mount
  146. blkutil))
  147. adc.nodeStatusUpdater = statusupdater.NewNodeStatusUpdater(
  148. kubeClient, nodeInformer.Lister(), adc.actualStateOfWorld)
  149. // Default these to values in options
  150. adc.reconciler = reconciler.NewReconciler(
  151. timerConfig.ReconcilerLoopPeriod,
  152. timerConfig.ReconcilerMaxWaitForUnmountDuration,
  153. reconcilerSyncDuration,
  154. disableReconciliationSync,
  155. adc.desiredStateOfWorld,
  156. adc.actualStateOfWorld,
  157. adc.attacherDetacher,
  158. adc.nodeStatusUpdater,
  159. recorder)
  160. csiTranslator := csitrans.New()
  161. adc.intreeToCSITranslator = csiTranslator
  162. adc.csiMigratedPluginManager = csimigration.NewPluginManager(csiTranslator)
  163. adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
  164. timerConfig.DesiredStateOfWorldPopulatorLoopSleepPeriod,
  165. timerConfig.DesiredStateOfWorldPopulatorListPodsRetryDuration,
  166. podInformer.Lister(),
  167. adc.desiredStateOfWorld,
  168. &adc.volumePluginMgr,
  169. pvcInformer.Lister(),
  170. pvInformer.Lister(),
  171. adc.csiMigratedPluginManager,
  172. adc.intreeToCSITranslator)
  173. podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
  174. AddFunc: adc.podAdd,
  175. UpdateFunc: adc.podUpdate,
  176. DeleteFunc: adc.podDelete,
  177. })
  178. // This custom indexer will index pods by its PVC keys. Then we don't need
  179. // to iterate all pods every time to find pods which reference given PVC.
  180. err := adc.podIndexer.AddIndexers(kcache.Indexers{
  181. pvcKeyIndex: indexByPVCKey,
  182. })
  183. if err != nil {
  184. klog.Warningf("adding indexer got %v", err)
  185. }
  186. nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
  187. AddFunc: adc.nodeAdd,
  188. UpdateFunc: adc.nodeUpdate,
  189. DeleteFunc: adc.nodeDelete,
  190. })
  191. pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
  192. AddFunc: func(obj interface{}) {
  193. adc.enqueuePVC(obj)
  194. },
  195. UpdateFunc: func(old, new interface{}) {
  196. adc.enqueuePVC(new)
  197. },
  198. })
  199. return adc, nil
  200. }
  201. const (
  202. pvcKeyIndex string = "pvcKey"
  203. )
  204. // indexByPVCKey returns PVC keys for given pod. Note that the index is only
  205. // used for attaching, so we are only interested in active pods with nodeName
  206. // set.
  207. func indexByPVCKey(obj interface{}) ([]string, error) {
  208. pod, ok := obj.(*v1.Pod)
  209. if !ok {
  210. return []string{}, nil
  211. }
  212. if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
  213. return []string{}, nil
  214. }
  215. keys := []string{}
  216. for _, podVolume := range pod.Spec.Volumes {
  217. if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
  218. keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
  219. }
  220. }
  221. return keys, nil
  222. }
  223. type attachDetachController struct {
  224. // kubeClient is the kube API client used by volumehost to communicate with
  225. // the API server.
  226. kubeClient clientset.Interface
  227. // pvcLister is the shared PVC lister used to fetch and store PVC
  228. // objects from the API server. It is shared with other controllers and
  229. // therefore the PVC objects in its store should be treated as immutable.
  230. pvcLister corelisters.PersistentVolumeClaimLister
  231. pvcsSynced kcache.InformerSynced
  232. // pvLister is the shared PV lister used to fetch and store PV objects
  233. // from the API server. It is shared with other controllers and therefore
  234. // the PV objects in its store should be treated as immutable.
  235. pvLister corelisters.PersistentVolumeLister
  236. pvsSynced kcache.InformerSynced
  237. podLister corelisters.PodLister
  238. podsSynced kcache.InformerSynced
  239. podIndexer kcache.Indexer
  240. nodeLister corelisters.NodeLister
  241. nodesSynced kcache.InformerSynced
  242. csiNodeLister storagelistersv1.CSINodeLister
  243. csiNodeSynced kcache.InformerSynced
  244. // csiDriverLister is the shared CSIDriver lister used to fetch and store
  245. // CSIDriver objects from the API server. It is shared with other controllers
  246. // and therefore the CSIDriver objects in its store should be treated as immutable.
  247. csiDriverLister storagelisters.CSIDriverLister
  248. csiDriversSynced kcache.InformerSynced
  249. // cloud provider used by volume host
  250. cloud cloudprovider.Interface
  251. // volumePluginMgr used to initialize and fetch volume plugins
  252. volumePluginMgr volume.VolumePluginMgr
  253. // desiredStateOfWorld is a data structure containing the desired state of
  254. // the world according to this controller: i.e. what nodes the controller
  255. // is managing, what volumes it wants be attached to these nodes, and which
  256. // pods are scheduled to those nodes referencing the volumes.
  257. // The data structure is populated by the controller using a stream of node
  258. // and pod API server objects fetched by the informers.
  259. desiredStateOfWorld cache.DesiredStateOfWorld
  260. // actualStateOfWorld is a data structure containing the actual state of
  261. // the world according to this controller: i.e. which volumes are attached
  262. // to which nodes.
  263. // The data structure is populated upon successful completion of attach and
  264. // detach actions triggered by the controller and a periodic sync with
  265. // storage providers for the "true" state of the world.
  266. actualStateOfWorld cache.ActualStateOfWorld
  267. // attacherDetacher is used to start asynchronous attach and operations
  268. attacherDetacher operationexecutor.OperationExecutor
  269. // reconciler is used to run an asynchronous periodic loop to reconcile the
  270. // desiredStateOfWorld with the actualStateOfWorld by triggering attach
  271. // detach operations using the attacherDetacher.
  272. reconciler reconciler.Reconciler
  273. // nodeStatusUpdater is used to update node status with the list of attached
  274. // volumes
  275. nodeStatusUpdater statusupdater.NodeStatusUpdater
  276. // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
  277. // populate the current pods using podInformer.
  278. desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
  279. // recorder is used to record events in the API server
  280. recorder record.EventRecorder
  281. // pvcQueue is used to queue pvc objects
  282. pvcQueue workqueue.RateLimitingInterface
  283. // csiMigratedPluginManager detects in-tree plugins that have been migrated to CSI
  284. csiMigratedPluginManager csimigration.PluginManager
  285. // intreeToCSITranslator translates from in-tree volume specs to CSI
  286. intreeToCSITranslator csimigration.InTreeToCSITranslator
  287. }
  288. func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
  289. defer runtime.HandleCrash()
  290. defer adc.pvcQueue.ShutDown()
  291. klog.Infof("Starting attach detach controller")
  292. defer klog.Infof("Shutting down attach detach controller")
  293. synced := []kcache.InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced}
  294. if adc.csiNodeSynced != nil {
  295. synced = append(synced, adc.csiNodeSynced)
  296. }
  297. if adc.csiDriversSynced != nil {
  298. synced = append(synced, adc.csiDriversSynced)
  299. }
  300. if !kcache.WaitForNamedCacheSync("attach detach", stopCh, synced...) {
  301. return
  302. }
  303. err := adc.populateActualStateOfWorld()
  304. if err != nil {
  305. klog.Errorf("Error populating the actual state of world: %v", err)
  306. }
  307. err = adc.populateDesiredStateOfWorld()
  308. if err != nil {
  309. klog.Errorf("Error populating the desired state of world: %v", err)
  310. }
  311. go adc.reconciler.Run(stopCh)
  312. go adc.desiredStateOfWorldPopulator.Run(stopCh)
  313. go wait.Until(adc.pvcWorker, time.Second, stopCh)
  314. metrics.Register(adc.pvcLister,
  315. adc.pvLister,
  316. adc.podLister,
  317. adc.actualStateOfWorld,
  318. adc.desiredStateOfWorld,
  319. &adc.volumePluginMgr,
  320. adc.csiMigratedPluginManager,
  321. adc.intreeToCSITranslator)
  322. <-stopCh
  323. }
  324. func (adc *attachDetachController) populateActualStateOfWorld() error {
  325. klog.V(5).Infof("Populating ActualStateOfworld")
  326. nodes, err := adc.nodeLister.List(labels.Everything())
  327. if err != nil {
  328. return err
  329. }
  330. for _, node := range nodes {
  331. nodeName := types.NodeName(node.Name)
  332. for _, attachedVolume := range node.Status.VolumesAttached {
  333. uniqueName := attachedVolume.Name
  334. // The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
  335. // In such a case it should be detached in the first reconciliation cycle and the
  336. // volume spec is not needed to detach a volume. If the volume is used by a pod, it
  337. // its spec can be: this would happen during in the populateDesiredStateOfWorld which
  338. // scans the pods and updates their volumes in the ActualStateOfWorld too.
  339. err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
  340. if err != nil {
  341. klog.Errorf("Failed to mark the volume as attached: %v", err)
  342. continue
  343. }
  344. adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
  345. adc.addNodeToDswp(node, types.NodeName(node.Name))
  346. }
  347. }
  348. return nil
  349. }
  350. func (adc *attachDetachController) getNodeVolumeDevicePath(
  351. volumeName v1.UniqueVolumeName, nodeName types.NodeName) (string, error) {
  352. var devicePath string
  353. var found bool
  354. node, err := adc.nodeLister.Get(string(nodeName))
  355. if err != nil {
  356. return devicePath, err
  357. }
  358. for _, attachedVolume := range node.Status.VolumesAttached {
  359. if volumeName == attachedVolume.Name {
  360. devicePath = attachedVolume.DevicePath
  361. found = true
  362. break
  363. }
  364. }
  365. if !found {
  366. err = fmt.Errorf("Volume %s not found on node %s", volumeName, nodeName)
  367. }
  368. return devicePath, err
  369. }
  370. func (adc *attachDetachController) populateDesiredStateOfWorld() error {
  371. klog.V(5).Infof("Populating DesiredStateOfworld")
  372. pods, err := adc.podLister.List(labels.Everything())
  373. if err != nil {
  374. return err
  375. }
  376. for _, pod := range pods {
  377. podToAdd := pod
  378. adc.podAdd(podToAdd)
  379. for _, podVolume := range podToAdd.Spec.Volumes {
  380. nodeName := types.NodeName(podToAdd.Spec.NodeName)
  381. // The volume specs present in the ActualStateOfWorld are nil, let's replace those
  382. // with the correct ones found on pods. The present in the ASW with no corresponding
  383. // pod will be detached and the spec is irrelevant.
  384. volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, nodeName, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
  385. if err != nil {
  386. klog.Errorf(
  387. "Error creating spec for volume %q, pod %q/%q: %v",
  388. podVolume.Name,
  389. podToAdd.Namespace,
  390. podToAdd.Name,
  391. err)
  392. continue
  393. }
  394. plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
  395. if err != nil || plugin == nil {
  396. klog.V(10).Infof(
  397. "Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
  398. podVolume.Name,
  399. podToAdd.Namespace,
  400. podToAdd.Name,
  401. err)
  402. continue
  403. }
  404. volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
  405. if err != nil {
  406. klog.Errorf(
  407. "Failed to find unique name for volume %q, pod %q/%q: %v",
  408. podVolume.Name,
  409. podToAdd.Namespace,
  410. podToAdd.Name,
  411. err)
  412. continue
  413. }
  414. if adc.actualStateOfWorld.IsVolumeAttachedToNode(volumeName, nodeName) {
  415. devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
  416. if err != nil {
  417. klog.Errorf("Failed to find device path: %v", err)
  418. continue
  419. }
  420. err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
  421. if err != nil {
  422. klog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
  423. }
  424. }
  425. }
  426. }
  427. return nil
  428. }
  429. func (adc *attachDetachController) podAdd(obj interface{}) {
  430. pod, ok := obj.(*v1.Pod)
  431. if pod == nil || !ok {
  432. return
  433. }
  434. if pod.Spec.NodeName == "" {
  435. // Ignore pods without NodeName, indicating they are not scheduled.
  436. return
  437. }
  438. volumeActionFlag := util.DetermineVolumeAction(
  439. pod,
  440. adc.desiredStateOfWorld,
  441. true /* default volume action */)
  442. util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
  443. adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
  444. }
  445. // GetDesiredStateOfWorld returns desired state of world associated with controller
  446. func (adc *attachDetachController) GetDesiredStateOfWorld() cache.DesiredStateOfWorld {
  447. return adc.desiredStateOfWorld
  448. }
  449. func (adc *attachDetachController) podUpdate(oldObj, newObj interface{}) {
  450. pod, ok := newObj.(*v1.Pod)
  451. if pod == nil || !ok {
  452. return
  453. }
  454. if pod.Spec.NodeName == "" {
  455. // Ignore pods without NodeName, indicating they are not scheduled.
  456. return
  457. }
  458. volumeActionFlag := util.DetermineVolumeAction(
  459. pod,
  460. adc.desiredStateOfWorld,
  461. true /* default volume action */)
  462. util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
  463. adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
  464. }
  465. func (adc *attachDetachController) podDelete(obj interface{}) {
  466. pod, ok := obj.(*v1.Pod)
  467. if pod == nil || !ok {
  468. return
  469. }
  470. util.ProcessPodVolumes(pod, false, /* addVolumes */
  471. adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
  472. }
  473. func (adc *attachDetachController) nodeAdd(obj interface{}) {
  474. node, ok := obj.(*v1.Node)
  475. // TODO: investigate if nodeName is empty then if we can return
  476. // kubernetes/kubernetes/issues/37777
  477. if node == nil || !ok {
  478. return
  479. }
  480. nodeName := types.NodeName(node.Name)
  481. adc.nodeUpdate(nil, obj)
  482. // kubernetes/kubernetes/issues/37586
  483. // This is to workaround the case when a node add causes to wipe out
  484. // the attached volumes field. This function ensures that we sync with
  485. // the actual status.
  486. adc.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
  487. }
  488. func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
  489. node, ok := newObj.(*v1.Node)
  490. // TODO: investigate if nodeName is empty then if we can return
  491. if node == nil || !ok {
  492. return
  493. }
  494. nodeName := types.NodeName(node.Name)
  495. adc.addNodeToDswp(node, nodeName)
  496. adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
  497. }
  498. func (adc *attachDetachController) nodeDelete(obj interface{}) {
  499. node, ok := obj.(*v1.Node)
  500. if node == nil || !ok {
  501. return
  502. }
  503. nodeName := types.NodeName(node.Name)
  504. if err := adc.desiredStateOfWorld.DeleteNode(nodeName); err != nil {
  505. // This might happen during drain, but we still want it to appear in our logs
  506. klog.Infof("error removing node %q from desired-state-of-world: %v", nodeName, err)
  507. }
  508. adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
  509. }
  510. func (adc *attachDetachController) enqueuePVC(obj interface{}) {
  511. key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  512. if err != nil {
  513. runtime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
  514. return
  515. }
  516. adc.pvcQueue.Add(key)
  517. }
  518. // pvcWorker processes items from pvcQueue
  519. func (adc *attachDetachController) pvcWorker() {
  520. for adc.processNextItem() {
  521. }
  522. }
  523. func (adc *attachDetachController) processNextItem() bool {
  524. keyObj, shutdown := adc.pvcQueue.Get()
  525. if shutdown {
  526. return false
  527. }
  528. defer adc.pvcQueue.Done(keyObj)
  529. if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
  530. // Rather than wait for a full resync, re-add the key to the
  531. // queue to be processed.
  532. adc.pvcQueue.AddRateLimited(keyObj)
  533. runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
  534. return true
  535. }
  536. // Finally, if no error occurs we Forget this item so it does not
  537. // get queued again until another change happens.
  538. adc.pvcQueue.Forget(keyObj)
  539. return true
  540. }
  541. func (adc *attachDetachController) syncPVCByKey(key string) error {
  542. klog.V(5).Infof("syncPVCByKey[%s]", key)
  543. namespace, name, err := kcache.SplitMetaNamespaceKey(key)
  544. if err != nil {
  545. klog.V(4).Infof("error getting namespace & name of pvc %q to get pvc from informer: %v", key, err)
  546. return nil
  547. }
  548. pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
  549. if apierrors.IsNotFound(err) {
  550. klog.V(4).Infof("error getting pvc %q from informer: %v", key, err)
  551. return nil
  552. }
  553. if err != nil {
  554. return err
  555. }
  556. if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
  557. // Skip unbound PVCs.
  558. return nil
  559. }
  560. objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
  561. if err != nil {
  562. return err
  563. }
  564. for _, obj := range objs {
  565. pod, ok := obj.(*v1.Pod)
  566. if !ok {
  567. continue
  568. }
  569. volumeActionFlag := util.DetermineVolumeAction(
  570. pod,
  571. adc.desiredStateOfWorld,
  572. true /* default volume action */)
  573. util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
  574. adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
  575. }
  576. return nil
  577. }
  578. // processVolumesInUse processes the list of volumes marked as "in-use"
  579. // according to the specified Node's Status.VolumesInUse and updates the
  580. // corresponding volume in the actual state of the world to indicate that it is
  581. // mounted.
  582. func (adc *attachDetachController) processVolumesInUse(
  583. nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
  584. klog.V(4).Infof("processVolumesInUse for node %q", nodeName)
  585. for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
  586. mounted := false
  587. for _, volumeInUse := range volumesInUse {
  588. if attachedVolume.VolumeName == volumeInUse {
  589. mounted = true
  590. break
  591. }
  592. }
  593. err := adc.actualStateOfWorld.SetVolumeMountedByNode(attachedVolume.VolumeName, nodeName, mounted)
  594. if err != nil {
  595. klog.Warningf(
  596. "SetVolumeMountedByNode(%q, %q, %v) returned an error: %v",
  597. attachedVolume.VolumeName, nodeName, mounted, err)
  598. }
  599. }
  600. }
  601. var _ volume.VolumeHost = &attachDetachController{}
  602. var _ volume.AttachDetachVolumeHost = &attachDetachController{}
  603. func (adc *attachDetachController) CSINodeLister() storagelistersv1.CSINodeLister {
  604. return adc.csiNodeLister
  605. }
  606. func (adc *attachDetachController) CSIDriverLister() storagelisters.CSIDriverLister {
  607. return adc.csiDriverLister
  608. }
  609. func (adc *attachDetachController) IsAttachDetachController() bool {
  610. return true
  611. }
  612. // VolumeHost implementation
  613. // This is an unfortunate requirement of the current factoring of volume plugin
  614. // initializing code. It requires kubelet specific methods used by the mounting
  615. // code to be implemented by all initializers even if the initializer does not
  616. // do mounting (like this attach/detach controller).
  617. // Issue kubernetes/kubernetes/issues/14217 to fix this.
  618. func (adc *attachDetachController) GetPluginDir(podUID string) string {
  619. return ""
  620. }
  621. func (adc *attachDetachController) GetVolumeDevicePluginDir(podUID string) string {
  622. return ""
  623. }
  624. func (adc *attachDetachController) GetPodsDir() string {
  625. return ""
  626. }
  627. func (adc *attachDetachController) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
  628. return ""
  629. }
  630. func (adc *attachDetachController) GetPodPluginDir(podUID types.UID, pluginName string) string {
  631. return ""
  632. }
  633. func (adc *attachDetachController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
  634. return ""
  635. }
  636. func (adc *attachDetachController) GetKubeClient() clientset.Interface {
  637. return adc.kubeClient
  638. }
  639. func (adc *attachDetachController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
  640. return nil, fmt.Errorf("NewWrapperMounter not supported by Attach/Detach controller's VolumeHost implementation")
  641. }
  642. func (adc *attachDetachController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
  643. return nil, fmt.Errorf("NewWrapperUnmounter not supported by Attach/Detach controller's VolumeHost implementation")
  644. }
  645. func (adc *attachDetachController) GetCloudProvider() cloudprovider.Interface {
  646. return adc.cloud
  647. }
  648. func (adc *attachDetachController) GetMounter(pluginName string) mount.Interface {
  649. return nil
  650. }
  651. func (adc *attachDetachController) GetHostName() string {
  652. return ""
  653. }
  654. func (adc *attachDetachController) GetHostIP() (net.IP, error) {
  655. return nil, fmt.Errorf("GetHostIP() not supported by Attach/Detach controller's VolumeHost implementation")
  656. }
  657. func (adc *attachDetachController) GetNodeAllocatable() (v1.ResourceList, error) {
  658. return v1.ResourceList{}, nil
  659. }
  660. func (adc *attachDetachController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
  661. return func(_, _ string) (*v1.Secret, error) {
  662. return nil, fmt.Errorf("GetSecret unsupported in attachDetachController")
  663. }
  664. }
  665. func (adc *attachDetachController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
  666. return func(_, _ string) (*v1.ConfigMap, error) {
  667. return nil, fmt.Errorf("GetConfigMap unsupported in attachDetachController")
  668. }
  669. }
  670. func (adc *attachDetachController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
  671. return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
  672. return nil, fmt.Errorf("GetServiceAccountToken unsupported in attachDetachController")
  673. }
  674. }
  675. func (adc *attachDetachController) DeleteServiceAccountTokenFunc() func(types.UID) {
  676. return func(types.UID) {
  677. klog.Errorf("DeleteServiceAccountToken unsupported in attachDetachController")
  678. }
  679. }
  680. func (adc *attachDetachController) GetExec(pluginName string) utilexec.Interface {
  681. return utilexec.New()
  682. }
  683. func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {
  684. if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists {
  685. keepTerminatedPodVolumes := false
  686. if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok {
  687. keepTerminatedPodVolumes = (t == "true")
  688. }
  689. // Node specifies annotation indicating it should be managed by attach
  690. // detach controller. Add it to desired state of world.
  691. adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes)
  692. }
  693. }
  694. func (adc *attachDetachController) GetNodeLabels() (map[string]string, error) {
  695. return nil, fmt.Errorf("GetNodeLabels() unsupported in Attach/Detach controller")
  696. }
  697. func (adc *attachDetachController) GetNodeName() types.NodeName {
  698. return ""
  699. }
  700. func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
  701. return adc.recorder
  702. }
  703. func (adc *attachDetachController) GetSubpather() subpath.Interface {
  704. // Subpaths not needed in attachdetach controller
  705. return nil
  706. }
  707. func (adc *attachDetachController) GetCSIDriverLister() storagelisters.CSIDriverLister {
  708. return adc.csiDriverLister
  709. }