pv_controller_base.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package persistentvolume
  14. import (
  15. "context"
  16. "fmt"
  17. "strconv"
  18. "time"
  19. v1 "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/errors"
  21. "k8s.io/apimachinery/pkg/api/meta"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/labels"
  24. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. coreinformers "k8s.io/client-go/informers/core/v1"
  27. storageinformers "k8s.io/client-go/informers/storage/v1"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/client-go/kubernetes/scheme"
  30. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  31. corelisters "k8s.io/client-go/listers/core/v1"
  32. "k8s.io/client-go/tools/cache"
  33. "k8s.io/client-go/tools/record"
  34. "k8s.io/client-go/util/workqueue"
  35. cloudprovider "k8s.io/cloud-provider"
  36. csitrans "k8s.io/csi-translation-lib"
  37. "k8s.io/kubernetes/pkg/controller"
  38. "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
  39. pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
  40. "k8s.io/kubernetes/pkg/util/goroutinemap"
  41. vol "k8s.io/kubernetes/pkg/volume"
  42. "k8s.io/kubernetes/pkg/volume/csimigration"
  43. "k8s.io/klog"
  44. )
  45. // This file contains the controller base functionality, i.e. framework to
  46. // process PV/PVC added/updated/deleted events. The real binding, provisioning,
  47. // recycling and deleting is done in pv_controller.go
  48. // ControllerParameters contains arguments for creation of a new
  49. // PersistentVolume controller.
  50. type ControllerParameters struct {
  51. KubeClient clientset.Interface
  52. SyncPeriod time.Duration
  53. VolumePlugins []vol.VolumePlugin
  54. Cloud cloudprovider.Interface
  55. ClusterName string
  56. VolumeInformer coreinformers.PersistentVolumeInformer
  57. ClaimInformer coreinformers.PersistentVolumeClaimInformer
  58. ClassInformer storageinformers.StorageClassInformer
  59. PodInformer coreinformers.PodInformer
  60. NodeInformer coreinformers.NodeInformer
  61. EventRecorder record.EventRecorder
  62. EnableDynamicProvisioning bool
  63. }
  64. // NewController creates a new PersistentVolume controller
  65. func NewController(p ControllerParameters) (*PersistentVolumeController, error) {
  66. eventRecorder := p.EventRecorder
  67. if eventRecorder == nil {
  68. broadcaster := record.NewBroadcaster()
  69. broadcaster.StartLogging(klog.Infof)
  70. broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})
  71. eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
  72. }
  73. controller := &PersistentVolumeController{
  74. volumes: newPersistentVolumeOrderedIndex(),
  75. claims: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
  76. kubeClient: p.KubeClient,
  77. eventRecorder: eventRecorder,
  78. runningOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
  79. cloud: p.Cloud,
  80. enableDynamicProvisioning: p.EnableDynamicProvisioning,
  81. clusterName: p.ClusterName,
  82. createProvisionedPVRetryCount: createProvisionedPVRetryCount,
  83. createProvisionedPVInterval: createProvisionedPVInterval,
  84. claimQueue: workqueue.NewNamed("claims"),
  85. volumeQueue: workqueue.NewNamed("volumes"),
  86. resyncPeriod: p.SyncPeriod,
  87. operationTimestamps: metrics.NewOperationStartTimeCache(),
  88. }
  89. // Prober is nil because PV is not aware of Flexvolume.
  90. if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil {
  91. return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
  92. }
  93. p.VolumeInformer.Informer().AddEventHandler(
  94. cache.ResourceEventHandlerFuncs{
  95. AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
  96. UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
  97. DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
  98. },
  99. )
  100. controller.volumeLister = p.VolumeInformer.Lister()
  101. controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced
  102. p.ClaimInformer.Informer().AddEventHandler(
  103. cache.ResourceEventHandlerFuncs{
  104. AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
  105. UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
  106. DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
  107. },
  108. )
  109. controller.claimLister = p.ClaimInformer.Lister()
  110. controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced
  111. controller.classLister = p.ClassInformer.Lister()
  112. controller.classListerSynced = p.ClassInformer.Informer().HasSynced
  113. controller.podLister = p.PodInformer.Lister()
  114. controller.podListerSynced = p.PodInformer.Informer().HasSynced
  115. controller.NodeLister = p.NodeInformer.Lister()
  116. controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced
  117. csiTranslator := csitrans.New()
  118. controller.translator = csiTranslator
  119. controller.csiMigratedPluginManager = csimigration.NewPluginManager(csiTranslator)
  120. return controller, nil
  121. }
  122. // initializeCaches fills all controller caches with initial data from etcd in
  123. // order to have the caches already filled when first addClaim/addVolume to
  124. // perform initial synchronization of the controller.
  125. func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
  126. volumeList, err := volumeLister.List(labels.Everything())
  127. if err != nil {
  128. klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
  129. return
  130. }
  131. for _, volume := range volumeList {
  132. volumeClone := volume.DeepCopy()
  133. if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {
  134. klog.Errorf("error updating volume cache: %v", err)
  135. }
  136. }
  137. claimList, err := claimLister.List(labels.Everything())
  138. if err != nil {
  139. klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
  140. return
  141. }
  142. for _, claim := range claimList {
  143. if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {
  144. klog.Errorf("error updating claim cache: %v", err)
  145. }
  146. }
  147. klog.V(4).Infof("controller initialized")
  148. }
  149. // enqueueWork adds volume or claim to given work queue.
  150. func (ctrl *PersistentVolumeController) enqueueWork(queue workqueue.Interface, obj interface{}) {
  151. // Beware of "xxx deleted" events
  152. if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
  153. obj = unknown.Obj
  154. }
  155. objName, err := controller.KeyFunc(obj)
  156. if err != nil {
  157. klog.Errorf("failed to get key from object: %v", err)
  158. return
  159. }
  160. klog.V(5).Infof("enqueued %q for sync", objName)
  161. queue.Add(objName)
  162. }
  163. func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume interface{}) (bool, error) {
  164. return storeObjectUpdate(ctrl.volumes.store, volume, "volume")
  165. }
  166. func (ctrl *PersistentVolumeController) storeClaimUpdate(claim interface{}) (bool, error) {
  167. return storeObjectUpdate(ctrl.claims, claim, "claim")
  168. }
  169. // updateVolume runs in worker thread and handles "volume added",
  170. // "volume updated" and "periodic sync" events.
  171. func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
  172. // Store the new volume version in the cache and do not process it if this
  173. // is an old version.
  174. new, err := ctrl.storeVolumeUpdate(volume)
  175. if err != nil {
  176. klog.Errorf("%v", err)
  177. }
  178. if !new {
  179. return
  180. }
  181. err = ctrl.syncVolume(volume)
  182. if err != nil {
  183. if errors.IsConflict(err) {
  184. // Version conflict error happens quite often and the controller
  185. // recovers from it easily.
  186. klog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)
  187. } else {
  188. klog.Errorf("could not sync volume %q: %+v", volume.Name, err)
  189. }
  190. }
  191. }
  192. // deleteVolume runs in worker thread and handles "volume deleted" event.
  193. func (ctrl *PersistentVolumeController) deleteVolume(volume *v1.PersistentVolume) {
  194. if err := ctrl.volumes.store.Delete(volume); err != nil {
  195. klog.Errorf("volume %q deletion encountered : %v", volume.Name, err)
  196. } else {
  197. klog.V(4).Infof("volume %q deleted", volume.Name)
  198. }
  199. // record deletion metric if a deletion start timestamp is in the cache
  200. // the following calls will be a no-op if there is nothing for this volume in the cache
  201. // end of timestamp cache entry lifecycle, "RecordMetric" will do the clean
  202. metrics.RecordMetric(volume.Name, &ctrl.operationTimestamps, nil)
  203. if volume.Spec.ClaimRef == nil {
  204. return
  205. }
  206. // sync the claim when its volume is deleted. Explicitly syncing the
  207. // claim here in response to volume deletion prevents the claim from
  208. // waiting until the next sync period for its Lost status.
  209. claimKey := claimrefToClaimKey(volume.Spec.ClaimRef)
  210. klog.V(5).Infof("deleteVolume[%s]: scheduling sync of claim %q", volume.Name, claimKey)
  211. ctrl.claimQueue.Add(claimKey)
  212. }
  213. // updateClaim runs in worker thread and handles "claim added",
  214. // "claim updated" and "periodic sync" events.
  215. func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
  216. // Store the new claim version in the cache and do not process it if this is
  217. // an old version.
  218. new, err := ctrl.storeClaimUpdate(claim)
  219. if err != nil {
  220. klog.Errorf("%v", err)
  221. }
  222. if !new {
  223. return
  224. }
  225. err = ctrl.syncClaim(claim)
  226. if err != nil {
  227. if errors.IsConflict(err) {
  228. // Version conflict error happens quite often and the controller
  229. // recovers from it easily.
  230. klog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
  231. } else {
  232. klog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)
  233. }
  234. }
  235. }
  236. // Unit test [5-5] [5-6] [5-7]
  237. // deleteClaim runs in worker thread and handles "claim deleted" event.
  238. func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeClaim) {
  239. if err := ctrl.claims.Delete(claim); err != nil {
  240. klog.Errorf("claim %q deletion encountered : %v", claim.Name, err)
  241. }
  242. claimKey := claimToClaimKey(claim)
  243. klog.V(4).Infof("claim %q deleted", claimKey)
  244. // clean any possible unfinished provision start timestamp from cache
  245. // Unit test [5-8] [5-9]
  246. ctrl.operationTimestamps.Delete(claimKey)
  247. volumeName := claim.Spec.VolumeName
  248. if volumeName == "" {
  249. klog.V(5).Infof("deleteClaim[%q]: volume not bound", claimKey)
  250. return
  251. }
  252. // sync the volume when its claim is deleted. Explicitly sync'ing the
  253. // volume here in response to claim deletion prevents the volume from
  254. // waiting until the next sync period for its Release.
  255. klog.V(5).Infof("deleteClaim[%q]: scheduling sync of volume %s", claimKey, volumeName)
  256. ctrl.volumeQueue.Add(volumeName)
  257. }
  258. // Run starts all of this controller's control loops
  259. func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
  260. defer utilruntime.HandleCrash()
  261. defer ctrl.claimQueue.ShutDown()
  262. defer ctrl.volumeQueue.ShutDown()
  263. klog.Infof("Starting persistent volume controller")
  264. defer klog.Infof("Shutting down persistent volume controller")
  265. if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
  266. return
  267. }
  268. ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
  269. go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
  270. go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
  271. go wait.Until(ctrl.claimWorker, time.Second, stopCh)
  272. metrics.Register(ctrl.volumes.store, ctrl.claims)
  273. <-stopCh
  274. }
  275. func (ctrl *PersistentVolumeController) updateClaimMigrationAnnotations(claim *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
  276. // TODO: update[Claim|Volume]MigrationAnnotations can be optimized to not
  277. // copy the claim/volume if no modifications are required. Though this
  278. // requires some refactoring as well as an interesting change in the
  279. // semantics of the function which may be undesirable. If no copy is made
  280. // when no modifications are required this function could sometimes return a
  281. // copy of the volume and sometimes return a ref to the original
  282. claimClone := claim.DeepCopy()
  283. modified := updateMigrationAnnotations(ctrl.csiMigratedPluginManager, ctrl.translator, claimClone.Annotations, pvutil.AnnStorageProvisioner)
  284. if !modified {
  285. return claimClone, nil
  286. }
  287. newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(context.TODO(), claimClone, metav1.UpdateOptions{})
  288. if err != nil {
  289. return nil, fmt.Errorf("persistent Volume Controller can't anneal migration annotations: %v", err)
  290. }
  291. _, err = ctrl.storeClaimUpdate(newClaim)
  292. if err != nil {
  293. return nil, fmt.Errorf("persistent Volume Controller can't anneal migration annotations: %v", err)
  294. }
  295. return newClaim, nil
  296. }
  297. func (ctrl *PersistentVolumeController) updateVolumeMigrationAnnotations(volume *v1.PersistentVolume) (*v1.PersistentVolume, error) {
  298. volumeClone := volume.DeepCopy()
  299. modified := updateMigrationAnnotations(ctrl.csiMigratedPluginManager, ctrl.translator, volumeClone.Annotations, pvutil.AnnDynamicallyProvisioned)
  300. if !modified {
  301. return volumeClone, nil
  302. }
  303. newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(context.TODO(), volumeClone, metav1.UpdateOptions{})
  304. if err != nil {
  305. return nil, fmt.Errorf("persistent Volume Controller can't anneal migration annotations: %v", err)
  306. }
  307. _, err = ctrl.storeVolumeUpdate(newVol)
  308. if err != nil {
  309. return nil, fmt.Errorf("persistent Volume Controller can't anneal migration annotations: %v", err)
  310. }
  311. return newVol, nil
  312. }
  313. // updateMigrationAnnotations takes an Annotations map and checks for a
  314. // provisioner name using the provisionerKey. It will then add a
  315. // "volume.beta.kubernetes.io/migrated-to" annotation if migration with the CSI
  316. // driver name for that provisioner is "on" based on feature flags, it will also
  317. // remove the annotation is migration is "off" for that provisioner in rollback
  318. // scenarios. Returns true if the annotations map was modified and false otherwise.
  319. func updateMigrationAnnotations(cmpm CSIMigratedPluginManager, translator CSINameTranslator, ann map[string]string, provisionerKey string) bool {
  320. var csiDriverName string
  321. var err error
  322. if ann == nil {
  323. // No annotations so we can't get the provisioner and don't know whether
  324. // this is migrated - no change
  325. return false
  326. }
  327. provisioner, ok := ann[provisionerKey]
  328. if !ok {
  329. // Volume not dynamically provisioned. Ignore
  330. return false
  331. }
  332. migratedToDriver := ann[pvutil.AnnMigratedTo]
  333. if cmpm.IsMigrationEnabledForPlugin(provisioner) {
  334. csiDriverName, err = translator.GetCSINameFromInTreeName(provisioner)
  335. if err != nil {
  336. klog.Errorf("Could not update volume migration annotations. Migration enabled for plugin %s but could not find corresponding driver name: %v", provisioner, err)
  337. return false
  338. }
  339. if migratedToDriver != csiDriverName {
  340. ann[pvutil.AnnMigratedTo] = csiDriverName
  341. return true
  342. }
  343. } else {
  344. if migratedToDriver != "" {
  345. // Migration annotation exists but the driver isn't migrated currently
  346. delete(ann, pvutil.AnnMigratedTo)
  347. return true
  348. }
  349. }
  350. return false
  351. }
  352. // volumeWorker processes items from volumeQueue. It must run only once,
  353. // syncVolume is not assured to be reentrant.
  354. func (ctrl *PersistentVolumeController) volumeWorker() {
  355. workFunc := func() bool {
  356. keyObj, quit := ctrl.volumeQueue.Get()
  357. if quit {
  358. return true
  359. }
  360. defer ctrl.volumeQueue.Done(keyObj)
  361. key := keyObj.(string)
  362. klog.V(5).Infof("volumeWorker[%s]", key)
  363. _, name, err := cache.SplitMetaNamespaceKey(key)
  364. if err != nil {
  365. klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
  366. return false
  367. }
  368. volume, err := ctrl.volumeLister.Get(name)
  369. if err == nil {
  370. // The volume still exists in informer cache, the event must have
  371. // been add/update/sync
  372. ctrl.updateVolume(volume)
  373. return false
  374. }
  375. if !errors.IsNotFound(err) {
  376. klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
  377. return false
  378. }
  379. // The volume is not in informer cache, the event must have been
  380. // "delete"
  381. volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
  382. if err != nil {
  383. klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
  384. return false
  385. }
  386. if !found {
  387. // The controller has already processed the delete event and
  388. // deleted the volume from its cache
  389. klog.V(2).Infof("deletion of volume %q was already processed", key)
  390. return false
  391. }
  392. volume, ok := volumeObj.(*v1.PersistentVolume)
  393. if !ok {
  394. klog.Errorf("expected volume, got %+v", volumeObj)
  395. return false
  396. }
  397. ctrl.deleteVolume(volume)
  398. return false
  399. }
  400. for {
  401. if quit := workFunc(); quit {
  402. klog.Infof("volume worker queue shutting down")
  403. return
  404. }
  405. }
  406. }
  407. // claimWorker processes items from claimQueue. It must run only once,
  408. // syncClaim is not reentrant.
  409. func (ctrl *PersistentVolumeController) claimWorker() {
  410. workFunc := func() bool {
  411. keyObj, quit := ctrl.claimQueue.Get()
  412. if quit {
  413. return true
  414. }
  415. defer ctrl.claimQueue.Done(keyObj)
  416. key := keyObj.(string)
  417. klog.V(5).Infof("claimWorker[%s]", key)
  418. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  419. if err != nil {
  420. klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
  421. return false
  422. }
  423. claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
  424. if err == nil {
  425. // The claim still exists in informer cache, the event must have
  426. // been add/update/sync
  427. ctrl.updateClaim(claim)
  428. return false
  429. }
  430. if !errors.IsNotFound(err) {
  431. klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
  432. return false
  433. }
  434. // The claim is not in informer cache, the event must have been "delete"
  435. claimObj, found, err := ctrl.claims.GetByKey(key)
  436. if err != nil {
  437. klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
  438. return false
  439. }
  440. if !found {
  441. // The controller has already processed the delete event and
  442. // deleted the claim from its cache
  443. klog.V(2).Infof("deletion of claim %q was already processed", key)
  444. return false
  445. }
  446. claim, ok := claimObj.(*v1.PersistentVolumeClaim)
  447. if !ok {
  448. klog.Errorf("expected claim, got %+v", claimObj)
  449. return false
  450. }
  451. ctrl.deleteClaim(claim)
  452. return false
  453. }
  454. for {
  455. if quit := workFunc(); quit {
  456. klog.Infof("claim worker queue shutting down")
  457. return
  458. }
  459. }
  460. }
  461. // resync supplements short resync period of shared informers - we don't want
  462. // all consumers of PV/PVC shared informer to have a short resync period,
  463. // therefore we do our own.
  464. func (ctrl *PersistentVolumeController) resync() {
  465. klog.V(4).Infof("resyncing PV controller")
  466. pvcs, err := ctrl.claimLister.List(labels.NewSelector())
  467. if err != nil {
  468. klog.Warningf("cannot list claims: %s", err)
  469. return
  470. }
  471. for _, pvc := range pvcs {
  472. ctrl.enqueueWork(ctrl.claimQueue, pvc)
  473. }
  474. pvs, err := ctrl.volumeLister.List(labels.NewSelector())
  475. if err != nil {
  476. klog.Warningf("cannot list persistent volumes: %s", err)
  477. return
  478. }
  479. for _, pv := range pvs {
  480. ctrl.enqueueWork(ctrl.volumeQueue, pv)
  481. }
  482. }
  483. // setClaimProvisioner saves
  484. // claim.Annotations[pvutil.AnnStorageProvisioner] = class.Provisioner
  485. func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.PersistentVolumeClaim, provisionerName string) (*v1.PersistentVolumeClaim, error) {
  486. if val, ok := claim.Annotations[pvutil.AnnStorageProvisioner]; ok && val == provisionerName {
  487. // annotation is already set, nothing to do
  488. return claim, nil
  489. }
  490. // The volume from method args can be pointing to watcher cache. We must not
  491. // modify these, therefore create a copy.
  492. claimClone := claim.DeepCopy()
  493. metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnStorageProvisioner, provisionerName)
  494. updateMigrationAnnotations(ctrl.csiMigratedPluginManager, ctrl.translator, claimClone.Annotations, pvutil.AnnStorageProvisioner)
  495. newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claimClone, metav1.UpdateOptions{})
  496. if err != nil {
  497. return newClaim, err
  498. }
  499. _, err = ctrl.storeClaimUpdate(newClaim)
  500. if err != nil {
  501. return newClaim, err
  502. }
  503. return newClaim, nil
  504. }
  505. // Stateless functions
  506. func getClaimStatusForLogging(claim *v1.PersistentVolumeClaim) string {
  507. bound := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted)
  508. boundByController := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController)
  509. return fmt.Sprintf("phase: %s, bound to: %q, bindCompleted: %v, boundByController: %v", claim.Status.Phase, claim.Spec.VolumeName, bound, boundByController)
  510. }
  511. func getVolumeStatusForLogging(volume *v1.PersistentVolume) string {
  512. boundByController := metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController)
  513. claimName := ""
  514. if volume.Spec.ClaimRef != nil {
  515. claimName = fmt.Sprintf("%s/%s (uid: %s)", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, volume.Spec.ClaimRef.UID)
  516. }
  517. return fmt.Sprintf("phase: %s, bound to: %q, boundByController: %v", volume.Status.Phase, claimName, boundByController)
  518. }
  519. // storeObjectUpdate updates given cache with a new object version from Informer
  520. // callback (i.e. with events from etcd) or with an object modified by the
  521. // controller itself. Returns "true", if the cache was updated, false if the
  522. // object is an old version and should be ignored.
  523. func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
  524. objName, err := controller.KeyFunc(obj)
  525. if err != nil {
  526. return false, fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)
  527. }
  528. oldObj, found, err := store.Get(obj)
  529. if err != nil {
  530. return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
  531. }
  532. objAccessor, err := meta.Accessor(obj)
  533. if err != nil {
  534. return false, err
  535. }
  536. if !found {
  537. // This is a new object
  538. klog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
  539. if err = store.Add(obj); err != nil {
  540. return false, fmt.Errorf("Error adding %s %q to controller cache: %v", className, objName, err)
  541. }
  542. return true, nil
  543. }
  544. oldObjAccessor, err := meta.Accessor(oldObj)
  545. if err != nil {
  546. return false, err
  547. }
  548. objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
  549. if err != nil {
  550. return false, fmt.Errorf("Error parsing ResourceVersion %q of %s %q: %s", objAccessor.GetResourceVersion(), className, objName, err)
  551. }
  552. oldObjResourceVersion, err := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64)
  553. if err != nil {
  554. return false, fmt.Errorf("Error parsing old ResourceVersion %q of %s %q: %s", oldObjAccessor.GetResourceVersion(), className, objName, err)
  555. }
  556. // Throw away only older version, let the same version pass - we do want to
  557. // get periodic sync events.
  558. if oldObjResourceVersion > objResourceVersion {
  559. klog.V(4).Infof("storeObjectUpdate: ignoring %s %q version %s", className, objName, objAccessor.GetResourceVersion())
  560. return false, nil
  561. }
  562. klog.V(4).Infof("storeObjectUpdate updating %s %q with version %s", className, objName, objAccessor.GetResourceVersion())
  563. if err = store.Update(obj); err != nil {
  564. return false, fmt.Errorf("Error updating %s %q in controller cache: %v", className, objName, err)
  565. }
  566. return true, nil
  567. }