pv_controller.go 76 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package persistentvolume
  14. import (
  15. "fmt"
  16. "reflect"
  17. "strings"
  18. "time"
  19. v1 "k8s.io/api/core/v1"
  20. storage "k8s.io/api/storage/v1"
  21. apierrs "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/labels"
  24. "k8s.io/apimachinery/pkg/util/sets"
  25. utilfeature "k8s.io/apiserver/pkg/util/feature"
  26. clientset "k8s.io/client-go/kubernetes"
  27. "k8s.io/client-go/kubernetes/scheme"
  28. corelisters "k8s.io/client-go/listers/core/v1"
  29. storagelisters "k8s.io/client-go/listers/storage/v1"
  30. "k8s.io/client-go/tools/cache"
  31. "k8s.io/client-go/tools/record"
  32. ref "k8s.io/client-go/tools/reference"
  33. "k8s.io/client-go/util/workqueue"
  34. cloudprovider "k8s.io/cloud-provider"
  35. volerr "k8s.io/cloud-provider/volume/errors"
  36. csitranslation "k8s.io/csi-translation-lib"
  37. v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  38. "k8s.io/kubernetes/pkg/controller/volume/events"
  39. "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
  40. pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
  41. "k8s.io/kubernetes/pkg/features"
  42. "k8s.io/kubernetes/pkg/util/goroutinemap"
  43. "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
  44. vol "k8s.io/kubernetes/pkg/volume"
  45. "k8s.io/kubernetes/pkg/volume/util"
  46. "k8s.io/kubernetes/pkg/volume/util/recyclerclient"
  47. "k8s.io/klog"
  48. )
  49. // ==================================================================
  50. // PLEASE DO NOT ATTEMPT TO SIMPLIFY THIS CODE.
  51. // KEEP THE SPACE SHUTTLE FLYING.
  52. // ==================================================================
  53. //
  54. // This controller is intentionally written in a very verbose style. You will
  55. // notice:
  56. //
  57. // 1. Every 'if' statement has a matching 'else' (exception: simple error
  58. // checks for a client API call)
  59. // 2. Things that may seem obvious are commented explicitly
  60. //
  61. // We call this style 'space shuttle style'. Space shuttle style is meant to
  62. // ensure that every branch and condition is considered and accounted for -
  63. // the same way code is written at NASA for applications like the space
  64. // shuttle.
  65. //
  66. // Originally, the work of this controller was split amongst three
  67. // controllers. This controller is the result of a large effort to simplify the
  68. // PV subsystem. During that effort, it became clear that we needed to ensure
  69. // that every single condition was handled and accounted for in the code, even
  70. // if it resulted in no-op code branches.
  71. //
  72. // As a result, the controller code may seem overly verbose, commented, and
  73. // 'branchy'. However, a large amount of business knowledge and context is
  74. // recorded here in order to ensure that future maintainers can correctly
  75. // reason through the complexities of the binding behavior. For that reason,
  76. // changes to this file should preserve and add to the space shuttle style.
  77. //
  78. // ==================================================================
  79. // PLEASE DO NOT ATTEMPT TO SIMPLIFY THIS CODE.
  80. // KEEP THE SPACE SHUTTLE FLYING.
  81. // ==================================================================
  82. // Design:
  83. //
  84. // The fundamental key to this design is the bi-directional "pointer" between
  85. // PersistentVolumes (PVs) and PersistentVolumeClaims (PVCs), which is
  86. // represented here as pvc.Spec.VolumeName and pv.Spec.ClaimRef. The bi-
  87. // directionality is complicated to manage in a transactionless system, but
  88. // without it we can't ensure sane behavior in the face of different forms of
  89. // trouble. For example, a rogue HA controller instance could end up racing
  90. // and making multiple bindings that are indistinguishable, resulting in
  91. // potential data loss.
  92. //
  93. // This controller is designed to work in active-passive high availability
  94. // mode. It *could* work also in active-active HA mode, all the object
  95. // transitions are designed to cope with this, however performance could be
  96. // lower as these two active controllers will step on each other's toes
  97. // frequently.
  98. //
  99. // This controller supports pre-bound (by the creator) objects in both
  100. // directions: a PVC that wants a specific PV or a PV that is reserved for a
  101. // specific PVC.
  102. //
  103. // The binding is a two-step process. PV.Spec.ClaimRef is modified first and
  104. // PVC.Spec.VolumeName second. At any point of this transaction, the PV or PVC
  105. // can be modified by user or other controller or completely deleted. Also,
  106. // two (or more) controllers may try to bind different volumes to different
  107. // claims at the same time. The controller must recover from any conflicts
  108. // that may arise from these conditions.
  // Keys of the tags attached to a real volume in cloud (e.g. AWS EBS or GCE PD)
  // that record which Kubernetes objects the volume was created for.
  const (
  	// CloudVolumeCreatedForClaimNamespaceTag is the name of the tag holding the
  	// namespace of the persistent volume claim used to create the volume.
  	CloudVolumeCreatedForClaimNamespaceTag = "kubernetes.io/created-for/pvc/namespace"

  	// CloudVolumeCreatedForClaimNameTag is the name of the tag holding the name
  	// of the persistent volume claim used to create the volume.
  	CloudVolumeCreatedForClaimNameTag = "kubernetes.io/created-for/pvc/name"

  	// CloudVolumeCreatedForVolumeNameTag is the name of the tag holding the name
  	// of the corresponding Kubernetes persistent volume.
  	CloudVolumeCreatedForVolumeNameTag = "kubernetes.io/created-for/pv/name"
  )

  // Retry settings used when a PV object is created for a provisioned volume.
  const (
  	// createProvisionedPVRetryCount is the number of retries.
  	createProvisionedPVRetryCount = 5

  	// createProvisionedPVInterval is the interval between two retries.
  	createProvisionedPVInterval = 10 * time.Second
  )
  // PersistentVolumeController is a controller that synchronizes
  // PersistentVolumeClaims and PersistentVolumes. It starts two
  // cache.Controllers that watch PersistentVolume and PersistentVolumeClaim
  // changes.
  type PersistentVolumeController struct {
  	// Listers, each paired with an InformerSynced function, for volumes,
  	// claims, storage classes, pods and nodes.
  	// NOTE(review): only the node lister pair is exported - presumably so
  	// that code outside this package can set it; confirm against callers.
  	volumeLister       corelisters.PersistentVolumeLister
  	volumeListerSynced cache.InformerSynced
  	claimLister        corelisters.PersistentVolumeClaimLister
  	claimListerSynced  cache.InformerSynced
  	classLister        storagelisters.StorageClassLister
  	classListerSynced  cache.InformerSynced
  	podLister          corelisters.PodLister
  	podListerSynced    cache.InformerSynced
  	NodeLister         corelisters.NodeLister
  	NodeListerSynced   cache.InformerSynced

  	// kubeClient gives direct access to the API server; syncVolume uses it to
  	// re-check a claim that was not found in the local caches.
  	// eventRecorder is used to emit events on claims.
  	// NOTE(review): the use of the remaining fields of this group is not
  	// visible in this part of the file.
  	kubeClient                clientset.Interface
  	eventRecorder             record.EventRecorder
  	cloud                     cloudprovider.Interface
  	volumePluginMgr           vol.VolumePluginMgr
  	enableDynamicProvisioning bool
  	clusterName               string
  	resyncPeriod              time.Duration

  	// Cache of the last known version of volumes and claims. This cache is
  	// thread safe as long as the volumes/claims there are not modified, they
  	// must be cloned before any modification. These caches get updated both by
  	// "xxx added/updated/deleted" events from etcd and by the controller when
  	// it saves newer version to etcd.
  	// Why local cache: binding a volume to a claim generates 4 events, roughly
  	// in this order (depends on goroutine ordering):
  	// - volume.Spec update
  	// - volume.Status update
  	// - claim.Spec update
  	// - claim.Status update
  	// With these caches, the controller can check that it has already saved
  	// volume.Status and claim.Spec+Status and does not need to do anything
  	// when e.g. volume.Spec update event arrives before all the other events.
  	// Without this cache, it would see the old version of volume.Status and
  	// claim in the informers (it has not been updated from API server events
  	// yet) and it would try to fix these objects to be bound together.
  	// Any write to API server would fail with version conflict - these objects
  	// have been already written.
  	volumes persistentVolumeOrderedIndex
  	claims  cache.Store

  	// Work queues of claims and volumes to process. Every queue should have
  	// exactly one worker thread, especially syncClaim() is not reentrant.
  	// Two syncClaims could bind two different claims to the same volume or one
  	// claim to two volumes. The controller would recover from this (due to
  	// version errors in API server and other checks in this controller),
  	// however overall speed of multi-worker controller would be lower than if
  	// it runs single thread only.
  	claimQueue  *workqueue.Type
  	volumeQueue *workqueue.Type

  	// Map of scheduled/running operations.
  	runningOperations goroutinemap.GoRoutineMap

  	// For testing only: hook to call before an asynchronous operation starts.
  	// Not used when set to nil.
  	preOperationHook func(operationName string)

  	// NOTE(review): these share their names with the package-level constants
  	// createProvisionedPVRetryCount / createProvisionedPVInterval; presumably
  	// they hold per-controller copies of those values - confirm against the
  	// constructor.
  	createProvisionedPVRetryCount int
  	createProvisionedPVInterval   time.Duration

  	// For testing only: hook to intercept CSI driver name <=> Intree plugin name mapping
  	// Not used when set to nil
  	csiNameFromIntreeNameHook func(pluginName string) (string, error)

  	// operationTimestamps caches start timestamp of operations
  	// (currently provision + binding/deletion) for metric recording.
  	// Detailed lifecycle/key for each operation
  	// 1. provision + binding
  	//     key:        claimKey
  	//     start time: user has NOT provided any volume ref in the claim AND
  	//                 there is no existing volume found for the claim,
  	//                 "provisionClaim" is called with a valid plugin/external provisioner
  	//                 to provision a volume
  	//     end time:   after a volume has been provisioned and bound to the claim successfully
  	//                 the corresponding timestamp entry will be deleted from cache
  	//     abort:      claim has not been bound to a volume yet but a claim deleted event
  	//                 has been received from API server
  	// 2. deletion
  	//     key:        volumeName
  	//     start time: when "reclaimVolume" process a volume with reclaim policy
  	//                 set to be "PersistentVolumeReclaimDelete"
  	//     end time:   after a volume deleted event has been received from API server
  	//                 the corresponding timestamp entry will be deleted from cache
  	//     abort:      N.A.
  	operationTimestamps metrics.OperationStartTimeCache
  }
  206. // syncClaim is the main controller method to decide what to do with a claim.
  207. // It's invoked by appropriate cache.Controller callbacks when a claim is
  208. // created, updated or periodically synced. We do not differentiate between
  209. // these events.
  210. // For easier readability, it was split into syncUnboundClaim and syncBoundClaim
  211. // methods.
  212. func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
  213. klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))
  214. if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
  215. return ctrl.syncUnboundClaim(claim)
  216. } else {
  217. return ctrl.syncBoundClaim(claim)
  218. }
  219. }
  220. // checkVolumeSatisfyClaim checks if the volume requested by the claim satisfies the requirements of the claim
  221. func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
  222. requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
  223. requestedSize := requestedQty.Value()
  224. // check if PV's DeletionTimeStamp is set, if so, return error.
  225. if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
  226. if volume.ObjectMeta.DeletionTimestamp != nil {
  227. return fmt.Errorf("the volume is marked for deletion")
  228. }
  229. }
  230. volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
  231. volumeSize := volumeQty.Value()
  232. if volumeSize < requestedSize {
  233. return fmt.Errorf("requested PV is too small")
  234. }
  235. requestedClass := v1helper.GetPersistentVolumeClaimClass(claim)
  236. if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
  237. return fmt.Errorf("storageClassName does not match")
  238. }
  239. isMismatch, err := pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec)
  240. if err != nil {
  241. return fmt.Errorf("error checking volumeMode: %v", err)
  242. }
  243. if isMismatch {
  244. return fmt.Errorf("incompatible volumeMode")
  245. }
  246. if !pvutil.CheckAccessModes(claim, volume) {
  247. return fmt.Errorf("incompatible accessMode")
  248. }
  249. return nil
  250. }
  251. func (ctrl *PersistentVolumeController) isDelayBindingProvisioning(claim *v1.PersistentVolumeClaim) bool {
  252. // When feature VolumeScheduling enabled,
  253. // Scheduler signal to the PV controller to start dynamic
  254. // provisioning by setting the "AnnSelectedNode" annotation
  255. // in the PVC
  256. _, ok := claim.Annotations[pvutil.AnnSelectedNode]
  257. return ok
  258. }
  259. // shouldDelayBinding returns true if binding of claim should be delayed, false otherwise.
  260. // If binding of claim should be delayed, only claims pbound by scheduler
  261. func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
  262. // If claim has already been assigned a node by scheduler for dynamic provisioning.
  263. if ctrl.isDelayBindingProvisioning(claim) {
  264. return false, nil
  265. }
  266. // If claim is in delay binding mode.
  267. return pvutil.IsDelayBindingMode(claim, ctrl.classLister)
  268. }
  // syncUnboundClaim is the main controller method to decide what to do with an
  // unbound claim, i.e. a claim for which syncClaim did not find the
  // AnnBindCompleted annotation.
  //
  // Two main cases are handled:
  //   - claim.Spec.VolumeName is empty: a matching volume is looked up in the
  //     controller's volume index. If one is found, it is bound to the claim.
  //     If none is found, the claim either waits for its first consumer, is
  //     handed to provisionClaim, or gets a FailedBinding event, and (except
  //     in the provisioning case) its status is updated to Pending.
  //   - claim.Spec.VolumeName is set: the named volume is looked up in the
  //     volume cache and bound if it is unclaimed and satisfies the claim, or
  //     if it is already claimed by this claim; otherwise the claim is kept
  //     Pending or an error is returned.
  //
  // It returns nil when the claim was processed (even if it stays Pending) and
  // an error when a lookup, a status update or the binding itself failed.
  func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
  	// This is a new PVC that has not completed binding
  	// OBSERVATION: pvc is "Pending"
  	if claim.Spec.VolumeName == "" {
  		// User did not care which PV they get.
  		delayBinding, err := ctrl.shouldDelayBinding(claim)
  		if err != nil {
  			return err
  		}

  		// [Unit test set 1]
  		volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
  		if err != nil {
  			klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
  			return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
  		}
  		if volume == nil {
  			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
  			// No PV could be found
  			// OBSERVATION: pvc is "Pending", will retry
  			switch {
  			case delayBinding:
  				// Binding is delayed: only tell the user what the claim
  				// is waiting for.
  				ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")
  			case v1helper.GetPersistentVolumeClaimClass(claim) != "":
  				// The claim has a storage class: hand it over to
  				// provisionClaim. This branch returns directly, the
  				// claim status is not updated here.
  				if err = ctrl.provisionClaim(claim); err != nil {
  					return err
  				}
  				return nil
  			default:
  				// No volume and no storage class.
  				ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
  			}

  			// Mark the claim as Pending and try to find a match in the next
  			// periodic syncClaim
  			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
  				return err
  			}
  			return nil
  		} else /* volume != nil */ {
  			// Found a PV for this claim
  			// OBSERVATION: pvc is "Pending", pv is "Available"
  			claimKey := claimToClaimKey(claim)
  			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
  			if err = ctrl.bind(volume, claim); err != nil {
  				// On any error saving the volume or the claim, subsequent
  				// syncClaim will finish the binding.
  				// record count error for provision if exists
  				// timestamp entry will remain in cache until a success binding has happened
  				metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
  				return err
  			}
  			// OBSERVATION: claim is "Bound", pv is "Bound"
  			// if exists a timestamp entry in cache, record end to end provision latency and clean up cache
  			// End of the provision + binding operation lifecycle, cache will be cleaned by "RecordMetric"
  			// [Unit test 12-1, 12-2, 12-4]
  			metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
  			return nil
  		}
  	} else /* claim.Spec.VolumeName != "" */ {
  		// [Unit test set 2]
  		// User asked for a specific PV.
  		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
  		obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
  		if err != nil {
  			return err
  		}
  		if !found {
  			// User asked for a PV that does not exist.
  			// OBSERVATION: pvc is "Pending"
  			// Retry later.
  			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
  			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
  				return err
  			}
  			return nil
  		} else {
  			// The store holds untyped objects; anything that is not a
  			// PersistentVolume is reported as an error.
  			volume, ok := obj.(*v1.PersistentVolume)
  			if !ok {
  				return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
  			}
  			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
  			if volume.Spec.ClaimRef == nil {
  				// User asked for a PV that is not claimed
  				// OBSERVATION: pvc is "Pending", pv is "Available"
  				klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
  				if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
  					klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
  					// send an event
  					msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
  					ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
  					// volume does not satisfy the requirements of the claim
  					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
  						return err
  					}
  				} else if err = ctrl.bind(volume, claim); err != nil {
  					// On any error saving the volume or the claim, subsequent
  					// syncClaim will finish the binding.
  					return err
  				}
  				// OBSERVATION: pvc is "Bound", pv is "Bound" when bind()
  				// succeeded; when the volume did not satisfy the claim,
  				// nothing was bound and the pvc was updated to "Pending".
  				return nil
  			} else if pvutil.IsVolumeBoundToClaim(volume, claim) {
  				// User asked for a PV that is claimed by this PVC
  				// OBSERVATION: pvc is "Pending", pv is "Bound"
  				klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))

  				// Finish the volume binding by adding claim UID.
  				if err = ctrl.bind(volume, claim); err != nil {
  					return err
  				}
  				// OBSERVATION: pvc is "Bound", pv is "Bound"
  				return nil
  			} else {
  				// User asked for a PV that is claimed by someone else
  				// OBSERVATION: pvc is "Pending", pv is "Bound"
  				if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
  					// No AnnBoundByController annotation: the volume name
  					// was set by the user, not by the controller.
  					klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
  					// User asked for a specific PV, retry later
  					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
  						return err
  					}
  					return nil
  				} else {
  					// This should never happen because someone had to remove
  					// AnnBindCompleted annotation on the claim.
  					klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
  					return fmt.Errorf("Invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
  				}
  			}
  		}
  	}
  }
  // syncBoundClaim is the main controller method to decide what to do with a
  // bound claim, i.e. a claim on which syncClaim found the AnnBindCompleted
  // annotation.
  //
  // The claim is marked Lost (with a warning event) when it has no volume name,
  // when the named volume is not in the volume cache, or when the volume's
  // ClaimRef carries the UID of a different claim. In the remaining cases
  // (volume has no ClaimRef, or its ClaimRef UID matches the claim) bind() is
  // called for the pair.
  //
  // It returns nil when the claim was processed and an error when a cache
  // lookup, a status update or bind() failed.
  func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
  	// HasAnnotation(pvc, pvutil.AnnBindCompleted)
  	// This PVC has previously been bound
  	// OBSERVATION: pvc is not "Pending"
  	// [Unit test set 3]
  	if claim.Spec.VolumeName == "" {
  		// Claim was bound before but not any more.
  		if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
  			return err
  		}
  		return nil
  	}
  	obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
  	if err != nil {
  		return err
  	}
  	if !found {
  		// Claim is bound to a non-existing volume.
  		if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
  			return err
  		}
  		return nil
  	} else {
  		// The store holds untyped objects; anything that is not a
  		// PersistentVolume is reported as an error.
  		volume, ok := obj.(*v1.PersistentVolume)
  		if !ok {
  			return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
  		}

  		klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
  		if volume.Spec.ClaimRef == nil {
  			// Claim is bound but volume has come unbound.
  			// Or, a claim was bound and the controller has not received updated
  			// volume yet. We can't distinguish these cases.
  			// Bind the volume again and set all states to Bound.
  			klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
  			if err = ctrl.bind(volume, claim); err != nil {
  				// Objects not saved, next syncPV or syncClaim will try again
  				return err
  			}
  			return nil
  		} else if volume.Spec.ClaimRef.UID == claim.UID {
  			// All is well
  			// NOTE: syncPV can handle this so it can be left out.
  			// NOTE: bind() call here will do nothing in most cases as
  			// everything should be already set.
  			klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
  			if err = ctrl.bind(volume, claim); err != nil {
  				// Objects not saved, next syncPV or syncClaim will try again
  				return err
  			}
  			return nil
  		} else {
  			// Claim is bound but volume has a different claimant.
  			// Set the claim phase to 'Lost', which is a terminal
  			// phase.
  			if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
  				return err
  			}
  			return nil
  		}
  	}
  }
  463. // syncVolume is the main controller method to decide what to do with a volume.
  464. // It's invoked by appropriate cache.Controller callbacks when a volume is
  465. // created, updated or periodically synced. We do not differentiate between
  466. // these events.
  467. func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) error {
  468. klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))
  469. // [Unit test set 4]
  470. if volume.Spec.ClaimRef == nil {
  471. // Volume is unused
  472. klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
  473. if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
  474. // Nothing was saved; we will fall back into the same
  475. // condition in the next call to this method
  476. return err
  477. }
  478. return nil
  479. } else /* pv.Spec.ClaimRef != nil */ {
  480. // Volume is bound to a claim.
  481. if volume.Spec.ClaimRef.UID == "" {
  482. // The PV is reserved for a PVC; that PVC has not yet been
  483. // bound to this PV; the PVC sync will handle it.
  484. klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
  485. if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
  486. // Nothing was saved; we will fall back into the same
  487. // condition in the next call to this method
  488. return err
  489. }
  490. return nil
  491. }
  492. klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
  493. // Get the PVC by _name_
  494. var claim *v1.PersistentVolumeClaim
  495. claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
  496. obj, found, err := ctrl.claims.GetByKey(claimName)
  497. if err != nil {
  498. return err
  499. }
  500. if !found && metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
  501. // If PV is bound by external PV binder (e.g. kube-scheduler), it's
  502. // possible on heavy load that corresponding PVC is not synced to
  503. // controller local cache yet. So we need to double-check PVC in
  504. // 1) informer cache
  505. // 2) apiserver if not found in informer cache
  506. // to make sure we will not reclaim a PV wrongly.
  507. // Note that only non-released and non-failed volumes will be
  508. // updated to Released state when PVC does not exist.
  509. if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
  510. obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
  511. if err != nil && !apierrs.IsNotFound(err) {
  512. return err
  513. }
  514. found = !apierrs.IsNotFound(err)
  515. if !found {
  516. obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name, metav1.GetOptions{})
  517. if err != nil && !apierrs.IsNotFound(err) {
  518. return err
  519. }
  520. found = !apierrs.IsNotFound(err)
  521. }
  522. }
  523. }
  524. if !found {
  525. klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
  526. // Fall through with claim = nil
  527. } else {
  528. var ok bool
  529. claim, ok = obj.(*v1.PersistentVolumeClaim)
  530. if !ok {
  531. return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
  532. }
  533. klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
  534. }
  535. if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
  536. // The claim that the PV was pointing to was deleted, and another
  537. // with the same name created.
  538. klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has different UID, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
  539. // Treat the volume as bound to a missing claim.
  540. claim = nil
  541. }
  542. if claim == nil {
  543. // If we get into this block, the claim must have been deleted;
  544. // NOTE: reclaimVolume may either release the PV back into the pool or
  545. // recycle it or do nothing (retain)
  546. // Do not overwrite previous Failed state - let the user see that
  547. // something went wrong, while we still re-try to reclaim the
  548. // volume.
  549. if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
  550. // Also, log this only once:
  551. klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
  552. if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
  553. // Nothing was saved; we will fall back into the same condition
  554. // in the next call to this method
  555. return err
  556. }
  557. }
  558. if err = ctrl.reclaimVolume(volume); err != nil {
  559. // Release failed, we will fall back into the same condition
  560. // in the next call to this method
  561. return err
  562. }
  563. return nil
  564. } else if claim.Spec.VolumeName == "" {
  565. if isMismatch, err := pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec); err != nil || isMismatch {
  566. // Binding for the volume won't be called in syncUnboundClaim,
  567. // because findBestMatchForClaim won't return the volume due to volumeMode mismatch.
  568. volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
  569. ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
  570. claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
  571. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
  572. // Skipping syncClaim
  573. return nil
  574. }
  575. if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
  576. // The binding is not completed; let PVC sync handle it
  577. klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
  578. } else {
  579. // Dangling PV; try to re-establish the link in the PVC sync
  580. klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
  581. }
  582. // In both cases, the volume is Bound and the claim is Pending.
  583. // Next syncClaim will fix it. To speed it up, we enqueue the claim
  584. // into the controller, which results in syncClaim to be called
  585. // shortly (and in the right worker goroutine).
  586. // This speeds up binding of provisioned volumes - provisioner saves
  587. // only the new PV and it expects that next syncClaim will bind the
  588. // claim to it.
  589. ctrl.claimQueue.Add(claimToClaimKey(claim))
  590. return nil
  591. } else if claim.Spec.VolumeName == volume.Name {
  592. // Volume is bound to a claim properly, update status if necessary
  593. klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
  594. if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
  595. // Nothing was saved; we will fall back into the same
  596. // condition in the next call to this method
  597. return err
  598. }
  599. return nil
  600. } else {
  601. // Volume is bound to a claim, but the claim is bound elsewhere
  602. if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
  603. // This volume was dynamically provisioned for this claim. The
  604. // claim got bound elsewhere, and thus this volume is not
  605. // needed. Delete it.
  606. // Mark the volume as Released for external deleters and to let
  607. // the user know. Don't overwrite existing Failed status!
  608. if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
  609. // Also, log this only once:
  610. klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
  611. if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
  612. // Nothing was saved; we will fall back into the same condition
  613. // in the next call to this method
  614. return err
  615. }
  616. }
  617. if err = ctrl.reclaimVolume(volume); err != nil {
  618. // Deletion failed, we will fall back into the same condition
  619. // in the next call to this method
  620. return err
  621. }
  622. return nil
  623. } else {
  624. // Volume is bound to a claim, but the claim is bound elsewhere
  625. // and it's not dynamically provisioned.
  626. if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
  627. // This is part of the normal operation of the controller; the
  628. // controller tried to use this volume for a claim but the claim
  629. // was fulfilled by another volume. We did this; fix it.
  630. klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
  631. if err = ctrl.unbindVolume(volume); err != nil {
  632. return err
  633. }
  634. return nil
  635. } else {
  636. // The PV must have been created with this ptr; leave it alone.
  637. klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
  638. // This just updates the volume phase and clears
  639. // volume.Spec.ClaimRef.UID. It leaves the volume pre-bound
  640. // to the claim.
  641. if err = ctrl.unbindVolume(volume); err != nil {
  642. return err
  643. }
  644. return nil
  645. }
  646. }
  647. }
  648. }
  649. }
  650. // updateClaimStatus saves new claim.Status to API server.
  651. // Parameters:
  652. // claim - claim to update
  653. // phase - phase to set
  654. // volume - volume which Capacity is set into claim.Status.Capacity
  655. func (ctrl *PersistentVolumeController) updateClaimStatus(claim *v1.PersistentVolumeClaim, phase v1.PersistentVolumeClaimPhase, volume *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) {
  656. klog.V(4).Infof("updating PersistentVolumeClaim[%s] status: set phase %s", claimToClaimKey(claim), phase)
  657. dirty := false
  658. claimClone := claim.DeepCopy()
  659. if claim.Status.Phase != phase {
  660. claimClone.Status.Phase = phase
  661. dirty = true
  662. }
  663. if volume == nil {
  664. // Need to reset AccessModes and Capacity
  665. if claim.Status.AccessModes != nil {
  666. claimClone.Status.AccessModes = nil
  667. dirty = true
  668. }
  669. if claim.Status.Capacity != nil {
  670. claimClone.Status.Capacity = nil
  671. dirty = true
  672. }
  673. } else {
  674. // Need to update AccessModes and Capacity
  675. if !reflect.DeepEqual(claim.Status.AccessModes, volume.Spec.AccessModes) {
  676. claimClone.Status.AccessModes = volume.Spec.AccessModes
  677. dirty = true
  678. }
  679. // Update Capacity if the claim is becoming Bound, not if it was already.
  680. // A discrepancy can be intentional to mean that the PVC filesystem size
  681. // doesn't match the PV block device size, so don't clobber it
  682. if claim.Status.Phase != phase {
  683. volumeCap, ok := volume.Spec.Capacity[v1.ResourceStorage]
  684. if !ok {
  685. return nil, fmt.Errorf("PersistentVolume %q is without a storage capacity", volume.Name)
  686. }
  687. claimCap, ok := claim.Status.Capacity[v1.ResourceStorage]
  688. if !ok || volumeCap.Cmp(claimCap) != 0 {
  689. claimClone.Status.Capacity = volume.Spec.Capacity
  690. dirty = true
  691. }
  692. }
  693. }
  694. if !dirty {
  695. // Nothing to do.
  696. klog.V(4).Infof("updating PersistentVolumeClaim[%s] status: phase %s already set", claimToClaimKey(claim), phase)
  697. return claim, nil
  698. }
  699. newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).UpdateStatus(claimClone)
  700. if err != nil {
  701. klog.V(4).Infof("updating PersistentVolumeClaim[%s] status: set phase %s failed: %v", claimToClaimKey(claim), phase, err)
  702. return newClaim, err
  703. }
  704. _, err = ctrl.storeClaimUpdate(newClaim)
  705. if err != nil {
  706. klog.V(4).Infof("updating PersistentVolumeClaim[%s] status: cannot update internal cache: %v", claimToClaimKey(claim), err)
  707. return newClaim, err
  708. }
  709. klog.V(2).Infof("claim %q entered phase %q", claimToClaimKey(claim), phase)
  710. return newClaim, nil
  711. }
  712. // updateClaimStatusWithEvent saves new claim.Status to API server and emits
  713. // given event on the claim. It saves the status and emits the event only when
  714. // the status has actually changed from the version saved in API server.
  715. // Parameters:
  716. // claim - claim to update
  717. // phase - phase to set
  718. // volume - volume which Capacity is set into claim.Status.Capacity
  719. // eventtype, reason, message - event to send, see EventRecorder.Event()
  720. func (ctrl *PersistentVolumeController) updateClaimStatusWithEvent(claim *v1.PersistentVolumeClaim, phase v1.PersistentVolumeClaimPhase, volume *v1.PersistentVolume, eventtype, reason, message string) (*v1.PersistentVolumeClaim, error) {
  721. klog.V(4).Infof("updating updateClaimStatusWithEvent[%s]: set phase %s", claimToClaimKey(claim), phase)
  722. if claim.Status.Phase == phase {
  723. // Nothing to do.
  724. klog.V(4).Infof("updating updateClaimStatusWithEvent[%s]: phase %s already set", claimToClaimKey(claim), phase)
  725. return claim, nil
  726. }
  727. newClaim, err := ctrl.updateClaimStatus(claim, phase, volume)
  728. if err != nil {
  729. return nil, err
  730. }
  731. // Emit the event only when the status change happens, not every time
  732. // syncClaim is called.
  733. klog.V(3).Infof("claim %q changed status to %q: %s", claimToClaimKey(claim), phase, message)
  734. ctrl.eventRecorder.Event(newClaim, eventtype, reason, message)
  735. return newClaim, nil
  736. }
  737. // updateVolumePhase saves new volume phase to API server.
  738. func (ctrl *PersistentVolumeController) updateVolumePhase(volume *v1.PersistentVolume, phase v1.PersistentVolumePhase, message string) (*v1.PersistentVolume, error) {
  739. klog.V(4).Infof("updating PersistentVolume[%s]: set phase %s", volume.Name, phase)
  740. if volume.Status.Phase == phase {
  741. // Nothing to do.
  742. klog.V(4).Infof("updating PersistentVolume[%s]: phase %s already set", volume.Name, phase)
  743. return volume, nil
  744. }
  745. volumeClone := volume.DeepCopy()
  746. volumeClone.Status.Phase = phase
  747. volumeClone.Status.Message = message
  748. newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().UpdateStatus(volumeClone)
  749. if err != nil {
  750. klog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)
  751. return newVol, err
  752. }
  753. _, err = ctrl.storeVolumeUpdate(newVol)
  754. if err != nil {
  755. klog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
  756. return newVol, err
  757. }
  758. klog.V(2).Infof("volume %q entered phase %q", volume.Name, phase)
  759. return newVol, err
  760. }
  761. // updateVolumePhaseWithEvent saves new volume phase to API server and emits
  762. // given event on the volume. It saves the phase and emits the event only when
  763. // the phase has actually changed from the version saved in API server.
  764. func (ctrl *PersistentVolumeController) updateVolumePhaseWithEvent(volume *v1.PersistentVolume, phase v1.PersistentVolumePhase, eventtype, reason, message string) (*v1.PersistentVolume, error) {
  765. klog.V(4).Infof("updating updateVolumePhaseWithEvent[%s]: set phase %s", volume.Name, phase)
  766. if volume.Status.Phase == phase {
  767. // Nothing to do.
  768. klog.V(4).Infof("updating updateVolumePhaseWithEvent[%s]: phase %s already set", volume.Name, phase)
  769. return volume, nil
  770. }
  771. newVol, err := ctrl.updateVolumePhase(volume, phase, message)
  772. if err != nil {
  773. return nil, err
  774. }
  775. // Emit the event only when the status change happens, not every time
  776. // syncClaim is called.
  777. klog.V(3).Infof("volume %q changed status to %q: %s", volume.Name, phase, message)
  778. ctrl.eventRecorder.Event(newVol, eventtype, reason, message)
  779. return newVol, nil
  780. }
  781. // bindVolumeToClaim modifies given volume to be bound to a claim and saves it to
  782. // API server. The claim is not modified in this method!
  783. func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
  784. klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q", volume.Name, claimToClaimKey(claim))
  785. volumeClone, dirty, err := pvutil.GetBindVolumeToClaim(volume, claim)
  786. if err != nil {
  787. return nil, err
  788. }
  789. // Save the volume only if something was changed
  790. if dirty {
  791. return ctrl.updateBindVolumeToClaim(volumeClone, claim, true)
  792. }
  793. klog.V(4).Infof("updating PersistentVolume[%s]: already bound to %q", volume.Name, claimToClaimKey(claim))
  794. return volume, nil
  795. }
  796. // bindVolumeToClaim modifies given volume to be bound to a claim and saves it to
  797. // API server. The claim is not modified in this method!
  798. func (ctrl *PersistentVolumeController) updateBindVolumeToClaim(volumeClone *v1.PersistentVolume, claim *v1.PersistentVolumeClaim, updateCache bool) (*v1.PersistentVolume, error) {
  799. klog.V(2).Infof("claim %q bound to volume %q", claimToClaimKey(claim), volumeClone.Name)
  800. newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(volumeClone)
  801. if err != nil {
  802. klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", volumeClone.Name, claimToClaimKey(claim), err)
  803. return newVol, err
  804. }
  805. if updateCache {
  806. _, err = ctrl.storeVolumeUpdate(newVol)
  807. if err != nil {
  808. klog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volumeClone.Name, err)
  809. return newVol, err
  810. }
  811. }
  812. klog.V(4).Infof("updating PersistentVolume[%s]: bound to %q", newVol.Name, claimToClaimKey(claim))
  813. return newVol, nil
  814. }
  815. // bindClaimToVolume modifies the given claim to be bound to a volume and
  816. // saves it to API server. The volume is not modified in this method!
  817. func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) {
  818. klog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q", claimToClaimKey(claim), volume.Name)
  819. dirty := false
  820. // Check if the claim was already bound (either by controller or by user)
  821. shouldBind := false
  822. if volume.Name != claim.Spec.VolumeName {
  823. shouldBind = true
  824. }
  825. // The claim from method args can be pointing to watcher cache. We must not
  826. // modify these, therefore create a copy.
  827. claimClone := claim.DeepCopy()
  828. if shouldBind {
  829. dirty = true
  830. // Bind the claim to the volume
  831. claimClone.Spec.VolumeName = volume.Name
  832. // Set AnnBoundByController if it is not set yet
  833. if !metav1.HasAnnotation(claimClone.ObjectMeta, pvutil.AnnBoundByController) {
  834. metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnBoundByController, "yes")
  835. }
  836. }
  837. // Set AnnBindCompleted if it is not set yet
  838. if !metav1.HasAnnotation(claimClone.ObjectMeta, pvutil.AnnBindCompleted) {
  839. metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnBindCompleted, "yes")
  840. dirty = true
  841. }
  842. if dirty {
  843. klog.V(2).Infof("volume %q bound to claim %q", volume.Name, claimToClaimKey(claim))
  844. newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claimClone)
  845. if err != nil {
  846. klog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q failed: %v", claimToClaimKey(claim), volume.Name, err)
  847. return newClaim, err
  848. }
  849. _, err = ctrl.storeClaimUpdate(newClaim)
  850. if err != nil {
  851. klog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
  852. return newClaim, err
  853. }
  854. klog.V(4).Infof("updating PersistentVolumeClaim[%s]: bound to %q", claimToClaimKey(claim), volume.Name)
  855. return newClaim, nil
  856. }
  857. klog.V(4).Infof("updating PersistentVolumeClaim[%s]: already bound to %q", claimToClaimKey(claim), volume.Name)
  858. return claim, nil
  859. }
  860. // bind saves binding information both to the volume and the claim and marks
  861. // both objects as Bound. Volume is saved first.
  862. // It returns on first error, it's up to the caller to implement some retry
  863. // mechanism.
  864. func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
  865. var err error
  866. // use updateClaim/updatedVolume to keep the original claim/volume for
  867. // logging in error cases.
  868. var updatedClaim *v1.PersistentVolumeClaim
  869. var updatedVolume *v1.PersistentVolume
  870. klog.V(4).Infof("binding volume %q to claim %q", volume.Name, claimToClaimKey(claim))
  871. if updatedVolume, err = ctrl.bindVolumeToClaim(volume, claim); err != nil {
  872. klog.V(3).Infof("error binding volume %q to claim %q: failed saving the volume: %v", volume.Name, claimToClaimKey(claim), err)
  873. return err
  874. }
  875. volume = updatedVolume
  876. if updatedVolume, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
  877. klog.V(3).Infof("error binding volume %q to claim %q: failed saving the volume status: %v", volume.Name, claimToClaimKey(claim), err)
  878. return err
  879. }
  880. volume = updatedVolume
  881. if updatedClaim, err = ctrl.bindClaimToVolume(claim, volume); err != nil {
  882. klog.V(3).Infof("error binding volume %q to claim %q: failed saving the claim: %v", volume.Name, claimToClaimKey(claim), err)
  883. return err
  884. }
  885. claim = updatedClaim
  886. if updatedClaim, err = ctrl.updateClaimStatus(claim, v1.ClaimBound, volume); err != nil {
  887. klog.V(3).Infof("error binding volume %q to claim %q: failed saving the claim status: %v", volume.Name, claimToClaimKey(claim), err)
  888. return err
  889. }
  890. claim = updatedClaim
  891. klog.V(4).Infof("volume %q bound to claim %q", volume.Name, claimToClaimKey(claim))
  892. klog.V(4).Infof("volume %q status after binding: %s", volume.Name, getVolumeStatusForLogging(volume))
  893. klog.V(4).Infof("claim %q status after binding: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))
  894. return nil
  895. }
  896. // unbindVolume rolls back previous binding of the volume. This may be necessary
  897. // when two controllers bound two volumes to single claim - when we detect this,
  898. // only one binding succeeds and the second one must be rolled back.
  899. // This method updates both Spec and Status.
  900. // It returns on first error, it's up to the caller to implement some retry
  901. // mechanism.
  902. func (ctrl *PersistentVolumeController) unbindVolume(volume *v1.PersistentVolume) error {
  903. klog.V(4).Infof("updating PersistentVolume[%s]: rolling back binding from %q", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
  904. // Save the PV only when any modification is necessary.
  905. volumeClone := volume.DeepCopy()
  906. if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
  907. // The volume was bound by the controller.
  908. volumeClone.Spec.ClaimRef = nil
  909. delete(volumeClone.Annotations, pvutil.AnnBoundByController)
  910. if len(volumeClone.Annotations) == 0 {
  911. // No annotations look better than empty annotation map (and it's easier
  912. // to test).
  913. volumeClone.Annotations = nil
  914. }
  915. } else {
  916. // The volume was pre-bound by user. Clear only the binging UID.
  917. volumeClone.Spec.ClaimRef.UID = ""
  918. }
  919. newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(volumeClone)
  920. if err != nil {
  921. klog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err)
  922. return err
  923. }
  924. _, err = ctrl.storeVolumeUpdate(newVol)
  925. if err != nil {
  926. klog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
  927. return err
  928. }
  929. klog.V(4).Infof("updating PersistentVolume[%s]: rolled back", newVol.Name)
  930. // Update the status
  931. _, err = ctrl.updateVolumePhase(newVol, v1.VolumeAvailable, "")
  932. return err
  933. }
  934. // reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and
  935. // starts appropriate reclaim action.
  936. func (ctrl *PersistentVolumeController) reclaimVolume(volume *v1.PersistentVolume) error {
  937. switch volume.Spec.PersistentVolumeReclaimPolicy {
  938. case v1.PersistentVolumeReclaimRetain:
  939. klog.V(4).Infof("reclaimVolume[%s]: policy is Retain, nothing to do", volume.Name)
  940. case v1.PersistentVolumeReclaimRecycle:
  941. klog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
  942. opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))
  943. ctrl.scheduleOperation(opName, func() error {
  944. ctrl.recycleVolumeOperation(volume)
  945. return nil
  946. })
  947. case v1.PersistentVolumeReclaimDelete:
  948. klog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
  949. opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
  950. // create a start timestamp entry in cache for deletion operation if no one exists with
  951. // key = volume.Name, pluginName = provisionerName, operation = "delete"
  952. ctrl.operationTimestamps.AddIfNotExist(volume.Name, ctrl.getProvisionerNameFromVolume(volume), "delete")
  953. ctrl.scheduleOperation(opName, func() error {
  954. _, err := ctrl.deleteVolumeOperation(volume)
  955. if err != nil {
  956. // only report error count to "volume_operation_total_errors"
  957. // latency reporting will happen when the volume get finally
  958. // deleted and a volume deleted event is captured
  959. metrics.RecordMetric(volume.Name, &ctrl.operationTimestamps, err)
  960. }
  961. return err
  962. })
  963. default:
  964. // Unknown PersistentVolumeReclaimPolicy
  965. if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeUnknownReclaimPolicy", "Volume has unrecognized PersistentVolumeReclaimPolicy"); err != nil {
  966. return err
  967. }
  968. }
  969. return nil
  970. }
  971. // recycleVolumeOperation recycles a volume. This method is running in
  972. // standalone goroutine and already has all necessary locks.
  973. func (ctrl *PersistentVolumeController) recycleVolumeOperation(volume *v1.PersistentVolume) {
  974. klog.V(4).Infof("recycleVolumeOperation [%s] started", volume.Name)
  975. // This method may have been waiting for a volume lock for some time.
  976. // Previous recycleVolumeOperation might just have saved an updated version,
  977. // so read current volume state now.
  978. newVolume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
  979. if err != nil {
  980. klog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)
  981. return
  982. }
  983. needsReclaim, err := ctrl.isVolumeReleased(newVolume)
  984. if err != nil {
  985. klog.V(3).Infof("error reading claim for volume %q: %v", volume.Name, err)
  986. return
  987. }
  988. if !needsReclaim {
  989. klog.V(3).Infof("volume %q no longer needs recycling, skipping", volume.Name)
  990. return
  991. }
  992. pods, used, err := ctrl.isVolumeUsed(newVolume)
  993. if err != nil {
  994. klog.V(3).Infof("can't recycle volume %q: %v", volume.Name, err)
  995. return
  996. }
  997. // Verify the claim is in cache: if so, then it is a different PVC with the same name
  998. // since the volume is known to be released at this moment. Ths new (cached) PVC must use
  999. // a different PV -- we checked that the PV is unused in isVolumeReleased.
  1000. // So the old PV is safe to be recycled.
  1001. claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
  1002. _, claimCached, err := ctrl.claims.GetByKey(claimName)
  1003. if err != nil {
  1004. klog.V(3).Infof("error getting the claim %s from cache", claimName)
  1005. return
  1006. }
  1007. if used && !claimCached {
  1008. msg := fmt.Sprintf("Volume is used by pods: %s", strings.Join(pods, ","))
  1009. klog.V(3).Infof("can't recycle volume %q: %s", volume.Name, msg)
  1010. ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeFailedRecycle, msg)
  1011. return
  1012. }
  1013. // Use the newest volume copy, this will save us from version conflicts on
  1014. // saving.
  1015. volume = newVolume
  1016. // Find a plugin.
  1017. spec := vol.NewSpecFromPersistentVolume(volume, false)
  1018. plugin, err := ctrl.volumePluginMgr.FindRecyclablePluginBySpec(spec)
  1019. if err != nil {
  1020. // No recycler found. Emit an event and mark the volume Failed.
  1021. if _, err = ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, events.VolumeFailedRecycle, "No recycler plugin found for the volume!"); err != nil {
  1022. klog.V(4).Infof("recycleVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
  1023. // Save failed, retry on the next deletion attempt
  1024. return
  1025. }
  1026. // Despite the volume being Failed, the controller will retry recycling
  1027. // the volume in every syncVolume() call.
  1028. return
  1029. }
  1030. // Plugin found
  1031. recorder := ctrl.newRecyclerEventRecorder(volume)
  1032. if err = plugin.Recycle(volume.Name, spec, recorder); err != nil {
  1033. // Recycler failed
  1034. strerr := fmt.Sprintf("Recycle failed: %s", err)
  1035. if _, err = ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, events.VolumeFailedRecycle, strerr); err != nil {
  1036. klog.V(4).Infof("recycleVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
  1037. // Save failed, retry on the next deletion attempt
  1038. return
  1039. }
  1040. // Despite the volume being Failed, the controller will retry recycling
  1041. // the volume in every syncVolume() call.
  1042. return
  1043. }
  1044. klog.V(2).Infof("volume %q recycled", volume.Name)
  1045. // Send an event
  1046. ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeRecycled, "Volume recycled")
  1047. // Make the volume available again
  1048. if err = ctrl.unbindVolume(volume); err != nil {
  1049. // Oops, could not save the volume and therefore the controller will
  1050. // recycle the volume again on next update. We _could_ maintain a cache
  1051. // of "recently recycled volumes" and avoid unnecessary recycling, this
  1052. // is left out as future optimization.
  1053. klog.V(3).Infof("recycleVolumeOperation [%s]: failed to make recycled volume 'Available' (%v), we will recycle the volume again", volume.Name, err)
  1054. return
  1055. }
  1056. return
  1057. }
  1058. // deleteVolumeOperation deletes a volume. This method is running in standalone
  1059. // goroutine and already has all necessary locks.
  1060. func (ctrl *PersistentVolumeController) deleteVolumeOperation(volume *v1.PersistentVolume) (string, error) {
  1061. klog.V(4).Infof("deleteVolumeOperation [%s] started", volume.Name)
  1062. // This method may have been waiting for a volume lock for some time.
  1063. // Previous deleteVolumeOperation might just have saved an updated version, so
  1064. // read current volume state now.
  1065. newVolume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
  1066. if err != nil {
  1067. klog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)
  1068. return "", nil
  1069. }
  1070. needsReclaim, err := ctrl.isVolumeReleased(newVolume)
  1071. if err != nil {
  1072. klog.V(3).Infof("error reading claim for volume %q: %v", volume.Name, err)
  1073. return "", nil
  1074. }
  1075. if !needsReclaim {
  1076. klog.V(3).Infof("volume %q no longer needs deletion, skipping", volume.Name)
  1077. return "", nil
  1078. }
  1079. pluginName, deleted, err := ctrl.doDeleteVolume(volume)
  1080. if err != nil {
  1081. // Delete failed, update the volume and emit an event.
  1082. klog.V(3).Infof("deletion of volume %q failed: %v", volume.Name, err)
  1083. if volerr.IsDeletedVolumeInUse(err) {
  1084. // The plugin needs more time, don't mark the volume as Failed
  1085. // and send Normal event only
  1086. ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeDelete, err.Error())
  1087. } else {
  1088. // The plugin failed, mark the volume as Failed and send Warning
  1089. // event
  1090. if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, events.VolumeFailedDelete, err.Error()); err != nil {
  1091. klog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
  1092. // Save failed, retry on the next deletion attempt
  1093. return pluginName, err
  1094. }
  1095. }
  1096. // Despite the volume being Failed, the controller will retry deleting
  1097. // the volume in every syncVolume() call.
  1098. return pluginName, err
  1099. }
  1100. if !deleted {
  1101. // The volume waits for deletion by an external plugin. Do nothing.
  1102. return pluginName, nil
  1103. }
  1104. klog.V(4).Infof("deleteVolumeOperation [%s]: success", volume.Name)
  1105. // Delete the volume
  1106. if err = ctrl.kubeClient.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
  1107. // Oops, could not delete the volume and therefore the controller will
  1108. // try to delete the volume again on next update. We _could_ maintain a
  1109. // cache of "recently deleted volumes" and avoid unnecessary deletion,
  1110. // this is left out as future optimization.
  1111. klog.V(3).Infof("failed to delete volume %q from database: %v", volume.Name, err)
  1112. return pluginName, nil
  1113. }
  1114. return pluginName, nil
  1115. }
  1116. // isVolumeReleased returns true if given volume is released and can be recycled
  1117. // or deleted, based on its retain policy. I.e. the volume is bound to a claim
  1118. // and the claim does not exist or exists and is bound to different volume.
  1119. func (ctrl *PersistentVolumeController) isVolumeReleased(volume *v1.PersistentVolume) (bool, error) {
  1120. // A volume needs reclaim if it has ClaimRef and appropriate claim does not
  1121. // exist.
  1122. if volume.Spec.ClaimRef == nil {
  1123. klog.V(4).Infof("isVolumeReleased[%s]: ClaimRef is nil", volume.Name)
  1124. return false, nil
  1125. }
  1126. if volume.Spec.ClaimRef.UID == "" {
  1127. // This is a volume bound by user and the controller has not finished
  1128. // binding to the real claim yet.
  1129. klog.V(4).Infof("isVolumeReleased[%s]: ClaimRef is not bound", volume.Name)
  1130. return false, nil
  1131. }
  1132. var claim *v1.PersistentVolumeClaim
  1133. claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
  1134. obj, found, err := ctrl.claims.GetByKey(claimName)
  1135. if err != nil {
  1136. return false, err
  1137. }
  1138. if !found {
  1139. // Fall through with claim = nil
  1140. } else {
  1141. var ok bool
  1142. claim, ok = obj.(*v1.PersistentVolumeClaim)
  1143. if !ok {
  1144. return false, fmt.Errorf("Cannot convert object from claim cache to claim!?: %#v", obj)
  1145. }
  1146. }
  1147. if claim != nil && claim.UID == volume.Spec.ClaimRef.UID {
  1148. // the claim still exists and has the right UID
  1149. if len(claim.Spec.VolumeName) > 0 && claim.Spec.VolumeName != volume.Name {
  1150. // the claim is bound to another PV, this PV *is* released
  1151. return true, nil
  1152. }
  1153. klog.V(4).Infof("isVolumeReleased[%s]: ClaimRef is still valid, volume is not released", volume.Name)
  1154. return false, nil
  1155. }
  1156. klog.V(2).Infof("isVolumeReleased[%s]: volume is released", volume.Name)
  1157. return true, nil
  1158. }
  1159. // isVolumeUsed returns list of pods that use given PV.
  1160. func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) {
  1161. if pv.Spec.ClaimRef == nil {
  1162. return nil, false, nil
  1163. }
  1164. claimName := pv.Spec.ClaimRef.Name
  1165. podNames := sets.NewString()
  1166. pods, err := ctrl.podLister.Pods(pv.Spec.ClaimRef.Namespace).List(labels.Everything())
  1167. if err != nil {
  1168. return nil, false, fmt.Errorf("error listing pods: %s", err)
  1169. }
  1170. for _, pod := range pods {
  1171. if util.IsPodTerminated(pod, pod.Status) {
  1172. continue
  1173. }
  1174. for i := range pod.Spec.Volumes {
  1175. usedPV := &pod.Spec.Volumes[i]
  1176. if usedPV.PersistentVolumeClaim != nil && usedPV.PersistentVolumeClaim.ClaimName == claimName {
  1177. podNames.Insert(pod.Namespace + "/" + pod.Name)
  1178. }
  1179. }
  1180. }
  1181. return podNames.List(), podNames.Len() != 0, nil
  1182. }
  1183. // doDeleteVolume finds appropriate delete plugin and deletes given volume, returning
  1184. // the volume plugin name. Also, it returns 'true', when the volume was deleted and
  1185. // 'false' when the volume cannot be deleted because the deleter is external. No
  1186. // error should be reported in this case.
  1187. func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolume) (string, bool, error) {
  1188. klog.V(4).Infof("doDeleteVolume [%s]", volume.Name)
  1189. var err error
  1190. plugin, err := ctrl.findDeletablePlugin(volume)
  1191. if err != nil {
  1192. return "", false, err
  1193. }
  1194. if plugin == nil {
  1195. // External deleter is requested, do nothing
  1196. klog.V(3).Infof("external deleter for volume %q requested, ignoring", volume.Name)
  1197. return "", false, nil
  1198. }
  1199. // Plugin found
  1200. pluginName := plugin.GetPluginName()
  1201. klog.V(5).Infof("found a deleter plugin %q for volume %q", pluginName, volume.Name)
  1202. spec := vol.NewSpecFromPersistentVolume(volume, false)
  1203. deleter, err := plugin.NewDeleter(spec)
  1204. if err != nil {
  1205. // Cannot create deleter
  1206. return pluginName, false, fmt.Errorf("Failed to create deleter for volume %q: %v", volume.Name, err)
  1207. }
  1208. opComplete := util.OperationCompleteHook(pluginName, "volume_delete")
  1209. err = deleter.Delete()
  1210. opComplete(&err)
  1211. if err != nil {
  1212. // Deleter failed
  1213. return pluginName, false, err
  1214. }
  1215. klog.V(2).Infof("volume %q deleted", volume.Name)
  1216. return pluginName, true, nil
  1217. }
  1218. // provisionClaim starts new asynchronous operation to provision a claim if
  1219. // provisioning is enabled.
  1220. func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolumeClaim) error {
  1221. if !ctrl.enableDynamicProvisioning {
  1222. return nil
  1223. }
  1224. klog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
  1225. opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
  1226. plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
  1227. if err != nil {
  1228. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
  1229. klog.Errorf("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
  1230. // failed to find the requested provisioning plugin, directly return err for now.
  1231. // controller will retry the provisioning in every syncUnboundClaim() call
  1232. // retain the original behavior of returning nil from provisionClaim call
  1233. return nil
  1234. }
  1235. ctrl.scheduleOperation(opName, func() error {
  1236. // create a start timestamp entry in cache for provision operation if no one exists with
  1237. // key = claimKey, pluginName = provisionerName, operation = "provision"
  1238. claimKey := claimToClaimKey(claim)
  1239. ctrl.operationTimestamps.AddIfNotExist(claimKey, ctrl.getProvisionerName(plugin, storageClass), "provision")
  1240. var err error
  1241. if plugin == nil || plugin.IsMigratedToCSI() {
  1242. _, err = ctrl.provisionClaimOperationExternal(claim, plugin, storageClass)
  1243. } else {
  1244. _, err = ctrl.provisionClaimOperation(claim, plugin, storageClass)
  1245. }
  1246. // if error happened, record an error count metric
  1247. // timestamp entry will remain in cache until a success binding has happened
  1248. if err != nil {
  1249. metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
  1250. }
  1251. return err
  1252. })
  1253. return nil
  1254. }
  1255. func (ctrl *PersistentVolumeController) getCSINameFromIntreeName(pluginName string) (string, error) {
  1256. if ctrl.csiNameFromIntreeNameHook != nil {
  1257. return ctrl.csiNameFromIntreeNameHook(pluginName)
  1258. }
  1259. return csitranslation.GetCSINameFromInTreeName(pluginName)
  1260. }
  1261. // provisionClaimOperation provisions a volume. This method is running in
  1262. // standalone goroutine and already has all necessary locks.
  1263. func (ctrl *PersistentVolumeController) provisionClaimOperation(
  1264. claim *v1.PersistentVolumeClaim,
  1265. plugin vol.ProvisionableVolumePlugin,
  1266. storageClass *storage.StorageClass) (string, error) {
  1267. claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
  1268. klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)
  1269. // called from provisionClaim(), in this case, plugin MUST NOT be nil and
  1270. // plugin.IsMigratedToCSI() MUST return FALSE
  1271. // NOTE: checks on plugin/storageClass has been saved
  1272. pluginName := plugin.GetPluginName()
  1273. provisionerName := storageClass.Provisioner
  1274. // Add provisioner annotation to be consistent with external provisioner workflow
  1275. newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName)
  1276. if err != nil {
  1277. // Save failed, the controller will retry in the next sync
  1278. klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
  1279. return pluginName, err
  1280. }
  1281. claim = newClaim
  1282. // internal provisioning
  1283. // A previous provisionClaimOperation may just have finished while we were waiting for
  1284. // the locks. Check that PV (with deterministic name) hasn't been provisioned
  1285. // yet.
  1286. pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
  1287. volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
  1288. if err == nil && volume != nil {
  1289. // Volume has been already provisioned, nothing to do.
  1290. klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
  1291. return pluginName, err
  1292. }
  1293. // Prepare a claimRef to the claim early (to fail before a volume is
  1294. // provisioned)
  1295. claimRef, err := ref.GetReference(scheme.Scheme, claim)
  1296. if err != nil {
  1297. klog.V(3).Infof("unexpected error getting claim reference: %v", err)
  1298. return pluginName, err
  1299. }
  1300. // Gather provisioning options
  1301. tags := make(map[string]string)
  1302. tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
  1303. tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
  1304. tags[CloudVolumeCreatedForVolumeNameTag] = pvName
  1305. options := vol.VolumeOptions{
  1306. PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
  1307. MountOptions: storageClass.MountOptions,
  1308. CloudTags: &tags,
  1309. ClusterName: ctrl.clusterName,
  1310. PVName: pvName,
  1311. PVC: claim,
  1312. Parameters: storageClass.Parameters,
  1313. }
  1314. // Refuse to provision if the plugin doesn't support mount options, creation
  1315. // of PV would be rejected by validation anyway
  1316. if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
  1317. strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
  1318. klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
  1319. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
  1320. return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
  1321. }
  1322. // Provision the volume
  1323. provisioner, err := plugin.NewProvisioner(options)
  1324. if err != nil {
  1325. strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
  1326. klog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
  1327. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
  1328. return pluginName, err
  1329. }
  1330. var selectedNode *v1.Node = nil
  1331. if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
  1332. selectedNode, err = ctrl.NodeLister.Get(nodeName)
  1333. if err != nil {
  1334. strerr := fmt.Sprintf("Failed to get target node: %v", err)
  1335. klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
  1336. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
  1337. return pluginName, err
  1338. }
  1339. }
  1340. allowedTopologies := storageClass.AllowedTopologies
  1341. opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
  1342. volume, err = provisioner.Provision(selectedNode, allowedTopologies)
  1343. opComplete(&err)
  1344. if err != nil {
  1345. // Other places of failure have nothing to do with VolumeScheduling,
  1346. // so just let controller retry in the next sync. We'll only call func
  1347. // rescheduleProvisioning here when the underlying provisioning actually failed.
  1348. ctrl.rescheduleProvisioning(claim)
  1349. strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
  1350. klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
  1351. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
  1352. return pluginName, err
  1353. }
  1354. klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))
  1355. // Create Kubernetes PV object for the volume.
  1356. if volume.Name == "" {
  1357. volume.Name = pvName
  1358. }
  1359. // Bind it to the claim
  1360. volume.Spec.ClaimRef = claimRef
  1361. volume.Status.Phase = v1.VolumeBound
  1362. volume.Spec.StorageClassName = claimClass
  1363. // Add AnnBoundByController (used in deleting the volume)
  1364. metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnBoundByController, "yes")
  1365. metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())
  1366. // Try to create the PV object several times
  1367. for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
  1368. klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
  1369. var newVol *v1.PersistentVolume
  1370. if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
  1371. // Save succeeded.
  1372. if err != nil {
  1373. klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
  1374. err = nil
  1375. } else {
  1376. klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))
  1377. _, updateErr := ctrl.storeVolumeUpdate(newVol)
  1378. if updateErr != nil {
  1379. // We will get an "volume added" event soon, this is not a big error
  1380. klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
  1381. }
  1382. }
  1383. break
  1384. }
  1385. // Save failed, try again after a while.
  1386. klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
  1387. time.Sleep(ctrl.createProvisionedPVInterval)
  1388. }
  1389. if err != nil {
  1390. // Save failed. Now we have a storage asset outside of Kubernetes,
  1391. // but we don't have appropriate PV object for it.
  1392. // Emit some event here and try to delete the storage asset several
  1393. // times.
  1394. strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
  1395. klog.V(3).Info(strerr)
  1396. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
  1397. var deleteErr error
  1398. var deleted bool
  1399. for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
  1400. _, deleted, deleteErr = ctrl.doDeleteVolume(volume)
  1401. if deleteErr == nil && deleted {
  1402. // Delete succeeded
  1403. klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
  1404. break
  1405. }
  1406. if !deleted {
  1407. // This is unreachable code, the volume was provisioned by an
  1408. // internal plugin and therefore there MUST be an internal
  1409. // plugin that deletes it.
  1410. klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
  1411. break
  1412. }
  1413. // Delete failed, try again after a while.
  1414. klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
  1415. time.Sleep(ctrl.createProvisionedPVInterval)
  1416. }
  1417. if deleteErr != nil {
  1418. // Delete failed several times. There is an orphaned volume and there
  1419. // is nothing we can do about it.
  1420. strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
  1421. klog.V(2).Info(strerr)
  1422. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
  1423. }
  1424. } else {
  1425. klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
  1426. msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
  1427. ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
  1428. }
  1429. return pluginName, nil
  1430. }
  1431. // provisionClaimOperationExternal provisions a volume using external provisioner async-ly
  1432. // This method will be running in a standalone go-routine scheduled in "provisionClaim"
  1433. func (ctrl *PersistentVolumeController) provisionClaimOperationExternal(
  1434. claim *v1.PersistentVolumeClaim,
  1435. plugin vol.ProvisionableVolumePlugin,
  1436. storageClass *storage.StorageClass) (string, error) {
  1437. claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
  1438. klog.V(4).Infof("provisionClaimOperationExternal [%s] started, class: %q", claimToClaimKey(claim), claimClass)
  1439. // Set provisionerName to external provisioner name by setClaimProvisioner
  1440. var err error
  1441. provisionerName := storageClass.Provisioner
  1442. if plugin != nil {
  1443. // update the provisioner name to use the CSI in-tree name
  1444. provisionerName, err = ctrl.getCSINameFromIntreeName(storageClass.Provisioner)
  1445. if err != nil {
  1446. strerr := fmt.Sprintf("error getting CSI name for In tree plugin %s: %v", storageClass.Provisioner, err)
  1447. klog.V(2).Infof("%s", strerr)
  1448. ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
  1449. return provisionerName, err
  1450. }
  1451. }
  1452. // Add provisioner annotation so external provisioners know when to start
  1453. newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName)
  1454. if err != nil {
  1455. // Save failed, the controller will retry in the next sync
  1456. klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
  1457. return provisionerName, err
  1458. }
  1459. claim = newClaim
  1460. msg := fmt.Sprintf("waiting for a volume to be created, either by external provisioner %q or manually created by system administrator", provisionerName)
  1461. // External provisioner has been requested for provisioning the volume
  1462. // Report an event and wait for external provisioner to finish
  1463. ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ExternalProvisioning, msg)
  1464. klog.V(3).Infof("provisionClaimOperationExternal provisioning claim %q: %s", claimToClaimKey(claim), msg)
  1465. // return provisioner name here for metric reporting
  1466. return provisionerName, nil
  1467. }
  1468. // rescheduleProvisioning signal back to the scheduler to retry dynamic provisioning
  1469. // by removing the AnnSelectedNode annotation
  1470. func (ctrl *PersistentVolumeController) rescheduleProvisioning(claim *v1.PersistentVolumeClaim) {
  1471. if _, ok := claim.Annotations[pvutil.AnnSelectedNode]; !ok {
  1472. // Provisioning not triggered by the scheduler, skip
  1473. return
  1474. }
  1475. // The claim from method args can be pointing to watcher cache. We must not
  1476. // modify these, therefore create a copy.
  1477. newClaim := claim.DeepCopy()
  1478. delete(newClaim.Annotations, pvutil.AnnSelectedNode)
  1479. // Try to update the PVC object
  1480. if _, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(newClaim.Namespace).Update(newClaim); err != nil {
  1481. klog.V(4).Infof("Failed to delete annotation 'pvutil.AnnSelectedNode' for PersistentVolumeClaim %q: %v", claimToClaimKey(newClaim), err)
  1482. return
  1483. }
  1484. if _, err := ctrl.storeClaimUpdate(newClaim); err != nil {
  1485. // We will get an "claim updated" event soon, this is not a big error
  1486. klog.V(4).Infof("Updating PersistentVolumeClaim %q: cannot update internal cache: %v", claimToClaimKey(newClaim), err)
  1487. }
  1488. }
  1489. // getProvisionedVolumeNameForClaim returns PV.Name for the provisioned volume.
  1490. // The name must be unique.
  1491. func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim *v1.PersistentVolumeClaim) string {
  1492. return "pvc-" + string(claim.UID)
  1493. }
  1494. // scheduleOperation starts given asynchronous operation on given volume. It
  1495. // makes sure the operation is already not running.
  1496. func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func() error) {
  1497. klog.V(4).Infof("scheduleOperation[%s]", operationName)
  1498. // Poke test code that an operation is just about to get started.
  1499. if ctrl.preOperationHook != nil {
  1500. ctrl.preOperationHook(operationName)
  1501. }
  1502. err := ctrl.runningOperations.Run(operationName, operation)
  1503. if err != nil {
  1504. switch {
  1505. case goroutinemap.IsAlreadyExists(err):
  1506. klog.V(4).Infof("operation %q is already running, skipping", operationName)
  1507. case exponentialbackoff.IsExponentialBackoff(err):
  1508. klog.V(4).Infof("operation %q postponed due to exponential backoff", operationName)
  1509. default:
  1510. klog.Errorf("error scheduling operation %q: %v", operationName, err)
  1511. }
  1512. }
  1513. }
  1514. // newRecyclerEventRecorder returns a RecycleEventRecorder that sends all events
  1515. // to given volume.
  1516. func (ctrl *PersistentVolumeController) newRecyclerEventRecorder(volume *v1.PersistentVolume) recyclerclient.RecycleEventRecorder {
  1517. return func(eventtype, message string) {
  1518. ctrl.eventRecorder.Eventf(volume, eventtype, events.RecyclerPod, "Recycler pod: %s", message)
  1519. }
  1520. }
  1521. // findProvisionablePlugin finds a provisioner plugin for a given claim.
  1522. // It returns either the provisioning plugin or nil when an external
  1523. // provisioner is requested.
  1524. func (ctrl *PersistentVolumeController) findProvisionablePlugin(claim *v1.PersistentVolumeClaim) (vol.ProvisionableVolumePlugin, *storage.StorageClass, error) {
  1525. // provisionClaim() which leads here is never called with claimClass=="", we
  1526. // can save some checks.
  1527. claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
  1528. class, err := ctrl.classLister.Get(claimClass)
  1529. if err != nil {
  1530. return nil, nil, err
  1531. }
  1532. // Find a plugin for the class
  1533. plugin, err := ctrl.volumePluginMgr.FindProvisionablePluginByName(class.Provisioner)
  1534. if err != nil {
  1535. if !strings.HasPrefix(class.Provisioner, "kubernetes.io/") {
  1536. // External provisioner is requested, do not report error
  1537. return nil, class, nil
  1538. }
  1539. return nil, class, err
  1540. }
  1541. return plugin, class, nil
  1542. }
  1543. // findDeletablePlugin finds a deleter plugin for a given volume. It returns
  1544. // either the deleter plugin or nil when an external deleter is requested.
  1545. func (ctrl *PersistentVolumeController) findDeletablePlugin(volume *v1.PersistentVolume) (vol.DeletableVolumePlugin, error) {
  1546. // Find a plugin. Try to find the same plugin that provisioned the volume
  1547. var plugin vol.DeletableVolumePlugin
  1548. if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) {
  1549. provisionPluginName := volume.Annotations[pvutil.AnnDynamicallyProvisioned]
  1550. if provisionPluginName != "" {
  1551. plugin, err := ctrl.volumePluginMgr.FindDeletablePluginByName(provisionPluginName)
  1552. if err != nil {
  1553. if !strings.HasPrefix(provisionPluginName, "kubernetes.io/") {
  1554. // External provisioner is requested, do not report error
  1555. return nil, nil
  1556. }
  1557. return nil, err
  1558. }
  1559. return plugin, nil
  1560. }
  1561. }
  1562. // The plugin that provisioned the volume was not found or the volume
  1563. // was not dynamically provisioned. Try to find a plugin by spec.
  1564. spec := vol.NewSpecFromPersistentVolume(volume, false)
  1565. plugin, err := ctrl.volumePluginMgr.FindDeletablePluginBySpec(spec)
  1566. if err != nil {
  1567. // No deleter found. Emit an event and mark the volume Failed.
  1568. return nil, fmt.Errorf("Error getting deleter volume plugin for volume %q: %v", volume.Name, err)
  1569. }
  1570. return plugin, nil
  1571. }
  1572. // obtain provisioner/deleter name for a volume
  1573. func (ctrl *PersistentVolumeController) getProvisionerNameFromVolume(volume *v1.PersistentVolume) string {
  1574. plugin, err := ctrl.findDeletablePlugin(volume)
  1575. if err != nil {
  1576. return "N/A"
  1577. }
  1578. if plugin != nil && !plugin.IsMigratedToCSI() {
  1579. return plugin.GetPluginName()
  1580. }
  1581. // If reached here, Either an external provisioner was used for provisioning
  1582. // or a plugin has been migrated to CSI.
  1583. // If an external provisioner was used, i.e., plugin == nil, instead of using
  1584. // the AnnDynamicallyProvisioned annotation value, use the storageClass's Provisioner
  1585. // field to avoid explosion of the metric in the cases like local storage provisioner
  1586. // tagging a volume with arbitrary provisioner names
  1587. storageClass := v1helper.GetPersistentVolumeClass(volume)
  1588. class, err := ctrl.classLister.Get(storageClass)
  1589. if err != nil {
  1590. return "N/A"
  1591. }
  1592. if plugin != nil {
  1593. provisionerName, err := ctrl.getCSINameFromIntreeName(class.Provisioner)
  1594. if err == nil {
  1595. return provisionerName
  1596. }
  1597. }
  1598. return class.Provisioner
  1599. }
  1600. // obtain plugin/external provisioner name from plugin and storage class
  1601. func (ctrl *PersistentVolumeController) getProvisionerName(plugin vol.ProvisionableVolumePlugin, storageClass *storage.StorageClass) string {
  1602. // intree plugin, returns the plugin's name
  1603. if plugin != nil && !plugin.IsMigratedToCSI() {
  1604. return plugin.GetPluginName()
  1605. } else if plugin != nil {
  1606. // get the CSI in-tree name from storage class provisioner name
  1607. provisionerName, err := ctrl.getCSINameFromIntreeName(storageClass.Provisioner)
  1608. if err != nil {
  1609. return "N/A"
  1610. }
  1611. return provisionerName
  1612. }
  1613. return storageClass.Provisioner
  1614. }