123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package persistentvolume
- import (
- "fmt"
- "strconv"
- "time"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/api/meta"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- coreinformers "k8s.io/client-go/informers/core/v1"
- storageinformers "k8s.io/client-go/informers/storage/v1"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/client-go/util/workqueue"
- cloudprovider "k8s.io/cloud-provider"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
- pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
- "k8s.io/kubernetes/pkg/util/goroutinemap"
- vol "k8s.io/kubernetes/pkg/volume"
- "k8s.io/klog"
- )
- // This file contains the controller base functionality, i.e. framework to
- // process PV/PVC added/updated/deleted events. The real binding, provisioning,
- // recycling and deleting is done in pv_controller.go
- // ControllerParameters contains arguments for creation of a new
- // PersistentVolume controller.
- type ControllerParameters struct {
- KubeClient clientset.Interface
- SyncPeriod time.Duration
- VolumePlugins []vol.VolumePlugin
- Cloud cloudprovider.Interface
- ClusterName string
- VolumeInformer coreinformers.PersistentVolumeInformer
- ClaimInformer coreinformers.PersistentVolumeClaimInformer
- ClassInformer storageinformers.StorageClassInformer
- PodInformer coreinformers.PodInformer
- NodeInformer coreinformers.NodeInformer
- EventRecorder record.EventRecorder
- EnableDynamicProvisioning bool
- }
- // NewController creates a new PersistentVolume controller
- func NewController(p ControllerParameters) (*PersistentVolumeController, error) {
- eventRecorder := p.EventRecorder
- if eventRecorder == nil {
- broadcaster := record.NewBroadcaster()
- broadcaster.StartLogging(klog.Infof)
- broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})
- eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
- }
- controller := &PersistentVolumeController{
- volumes: newPersistentVolumeOrderedIndex(),
- claims: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
- kubeClient: p.KubeClient,
- eventRecorder: eventRecorder,
- runningOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
- cloud: p.Cloud,
- enableDynamicProvisioning: p.EnableDynamicProvisioning,
- clusterName: p.ClusterName,
- createProvisionedPVRetryCount: createProvisionedPVRetryCount,
- createProvisionedPVInterval: createProvisionedPVInterval,
- claimQueue: workqueue.NewNamed("claims"),
- volumeQueue: workqueue.NewNamed("volumes"),
- resyncPeriod: p.SyncPeriod,
- operationTimestamps: metrics.NewOperationStartTimeCache(),
- }
- // Prober is nil because PV is not aware of Flexvolume.
- if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil {
- return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
- }
- p.VolumeInformer.Informer().AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
- UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
- DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
- },
- )
- controller.volumeLister = p.VolumeInformer.Lister()
- controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced
- p.ClaimInformer.Informer().AddEventHandler(
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
- UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
- DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
- },
- )
- controller.claimLister = p.ClaimInformer.Lister()
- controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced
- controller.classLister = p.ClassInformer.Lister()
- controller.classListerSynced = p.ClassInformer.Informer().HasSynced
- controller.podLister = p.PodInformer.Lister()
- controller.podListerSynced = p.PodInformer.Informer().HasSynced
- controller.NodeLister = p.NodeInformer.Lister()
- controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced
- return controller, nil
- }
- // initializeCaches fills all controller caches with initial data from etcd in
- // order to have the caches already filled when first addClaim/addVolume to
- // perform initial synchronization of the controller.
- func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
- volumeList, err := volumeLister.List(labels.Everything())
- if err != nil {
- klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
- return
- }
- for _, volume := range volumeList {
- volumeClone := volume.DeepCopy()
- if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {
- klog.Errorf("error updating volume cache: %v", err)
- }
- }
- claimList, err := claimLister.List(labels.Everything())
- if err != nil {
- klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
- return
- }
- for _, claim := range claimList {
- if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {
- klog.Errorf("error updating claim cache: %v", err)
- }
- }
- klog.V(4).Infof("controller initialized")
- }
- // enqueueWork adds volume or claim to given work queue.
- func (ctrl *PersistentVolumeController) enqueueWork(queue workqueue.Interface, obj interface{}) {
- // Beware of "xxx deleted" events
- if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
- obj = unknown.Obj
- }
- objName, err := controller.KeyFunc(obj)
- if err != nil {
- klog.Errorf("failed to get key from object: %v", err)
- return
- }
- klog.V(5).Infof("enqueued %q for sync", objName)
- queue.Add(objName)
- }
- func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume interface{}) (bool, error) {
- return storeObjectUpdate(ctrl.volumes.store, volume, "volume")
- }
- func (ctrl *PersistentVolumeController) storeClaimUpdate(claim interface{}) (bool, error) {
- return storeObjectUpdate(ctrl.claims, claim, "claim")
- }
- // updateVolume runs in worker thread and handles "volume added",
- // "volume updated" and "periodic sync" events.
- func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
- // Store the new volume version in the cache and do not process it if this
- // is an old version.
- new, err := ctrl.storeVolumeUpdate(volume)
- if err != nil {
- klog.Errorf("%v", err)
- }
- if !new {
- return
- }
- err = ctrl.syncVolume(volume)
- if err != nil {
- if errors.IsConflict(err) {
- // Version conflict error happens quite often and the controller
- // recovers from it easily.
- klog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)
- } else {
- klog.Errorf("could not sync volume %q: %+v", volume.Name, err)
- }
- }
- }
- // deleteVolume runs in worker thread and handles "volume deleted" event.
- func (ctrl *PersistentVolumeController) deleteVolume(volume *v1.PersistentVolume) {
- _ = ctrl.volumes.store.Delete(volume)
- klog.V(4).Infof("volume %q deleted", volume.Name)
- // record deletion metric if a deletion start timestamp is in the cache
- // the following calls will be a no-op if there is nothing for this volume in the cache
- // end of timestamp cache entry lifecycle, "RecordMetric" will do the clean
- metrics.RecordMetric(volume.Name, &ctrl.operationTimestamps, nil)
- if volume.Spec.ClaimRef == nil {
- return
- }
- // sync the claim when its volume is deleted. Explicitly syncing the
- // claim here in response to volume deletion prevents the claim from
- // waiting until the next sync period for its Lost status.
- claimKey := claimrefToClaimKey(volume.Spec.ClaimRef)
- klog.V(5).Infof("deleteVolume[%s]: scheduling sync of claim %q", volume.Name, claimKey)
- ctrl.claimQueue.Add(claimKey)
- }
- // updateClaim runs in worker thread and handles "claim added",
- // "claim updated" and "periodic sync" events.
- func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
- // Store the new claim version in the cache and do not process it if this is
- // an old version.
- new, err := ctrl.storeClaimUpdate(claim)
- if err != nil {
- klog.Errorf("%v", err)
- }
- if !new {
- return
- }
- err = ctrl.syncClaim(claim)
- if err != nil {
- if errors.IsConflict(err) {
- // Version conflict error happens quite often and the controller
- // recovers from it easily.
- klog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
- } else {
- klog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)
- }
- }
- }
- // Unit test [5-5] [5-6] [5-7]
- // deleteClaim runs in worker thread and handles "claim deleted" event.
- func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeClaim) {
- _ = ctrl.claims.Delete(claim)
- claimKey := claimToClaimKey(claim)
- klog.V(4).Infof("claim %q deleted", claimKey)
- // clean any possible unfinished provision start timestamp from cache
- // Unit test [5-8] [5-9]
- ctrl.operationTimestamps.Delete(claimKey)
- volumeName := claim.Spec.VolumeName
- if volumeName == "" {
- klog.V(5).Infof("deleteClaim[%q]: volume not bound", claimKey)
- return
- }
- // sync the volume when its claim is deleted. Explicitly sync'ing the
- // volume here in response to claim deletion prevents the volume from
- // waiting until the next sync period for its Release.
- klog.V(5).Infof("deleteClaim[%q]: scheduling sync of volume %s", claimKey, volumeName)
- ctrl.volumeQueue.Add(volumeName)
- }
- // Run starts all of this controller's control loops
- func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- defer ctrl.claimQueue.ShutDown()
- defer ctrl.volumeQueue.ShutDown()
- klog.Infof("Starting persistent volume controller")
- defer klog.Infof("Shutting down persistent volume controller")
- if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
- return
- }
- ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
- go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
- go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
- go wait.Until(ctrl.claimWorker, time.Second, stopCh)
- metrics.Register(ctrl.volumes.store, ctrl.claims)
- <-stopCh
- }
- // volumeWorker processes items from volumeQueue. It must run only once,
- // syncVolume is not assured to be reentrant.
- func (ctrl *PersistentVolumeController) volumeWorker() {
- workFunc := func() bool {
- keyObj, quit := ctrl.volumeQueue.Get()
- if quit {
- return true
- }
- defer ctrl.volumeQueue.Done(keyObj)
- key := keyObj.(string)
- klog.V(5).Infof("volumeWorker[%s]", key)
- _, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
- return false
- }
- volume, err := ctrl.volumeLister.Get(name)
- if err == nil {
- // The volume still exists in informer cache, the event must have
- // been add/update/sync
- ctrl.updateVolume(volume)
- return false
- }
- if !errors.IsNotFound(err) {
- klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
- return false
- }
- // The volume is not in informer cache, the event must have been
- // "delete"
- volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
- if err != nil {
- klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
- return false
- }
- if !found {
- // The controller has already processed the delete event and
- // deleted the volume from its cache
- klog.V(2).Infof("deletion of volume %q was already processed", key)
- return false
- }
- volume, ok := volumeObj.(*v1.PersistentVolume)
- if !ok {
- klog.Errorf("expected volume, got %+v", volumeObj)
- return false
- }
- ctrl.deleteVolume(volume)
- return false
- }
- for {
- if quit := workFunc(); quit {
- klog.Infof("volume worker queue shutting down")
- return
- }
- }
- }
- // claimWorker processes items from claimQueue. It must run only once,
- // syncClaim is not reentrant.
- func (ctrl *PersistentVolumeController) claimWorker() {
- workFunc := func() bool {
- keyObj, quit := ctrl.claimQueue.Get()
- if quit {
- return true
- }
- defer ctrl.claimQueue.Done(keyObj)
- key := keyObj.(string)
- klog.V(5).Infof("claimWorker[%s]", key)
- namespace, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
- return false
- }
- claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
- if err == nil {
- // The claim still exists in informer cache, the event must have
- // been add/update/sync
- ctrl.updateClaim(claim)
- return false
- }
- if !errors.IsNotFound(err) {
- klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
- return false
- }
- // The claim is not in informer cache, the event must have been "delete"
- claimObj, found, err := ctrl.claims.GetByKey(key)
- if err != nil {
- klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
- return false
- }
- if !found {
- // The controller has already processed the delete event and
- // deleted the claim from its cache
- klog.V(2).Infof("deletion of claim %q was already processed", key)
- return false
- }
- claim, ok := claimObj.(*v1.PersistentVolumeClaim)
- if !ok {
- klog.Errorf("expected claim, got %+v", claimObj)
- return false
- }
- ctrl.deleteClaim(claim)
- return false
- }
- for {
- if quit := workFunc(); quit {
- klog.Infof("claim worker queue shutting down")
- return
- }
- }
- }
- // resync supplements short resync period of shared informers - we don't want
- // all consumers of PV/PVC shared informer to have a short resync period,
- // therefore we do our own.
- func (ctrl *PersistentVolumeController) resync() {
- klog.V(4).Infof("resyncing PV controller")
- pvcs, err := ctrl.claimLister.List(labels.NewSelector())
- if err != nil {
- klog.Warningf("cannot list claims: %s", err)
- return
- }
- for _, pvc := range pvcs {
- ctrl.enqueueWork(ctrl.claimQueue, pvc)
- }
- pvs, err := ctrl.volumeLister.List(labels.NewSelector())
- if err != nil {
- klog.Warningf("cannot list persistent volumes: %s", err)
- return
- }
- for _, pv := range pvs {
- ctrl.enqueueWork(ctrl.volumeQueue, pv)
- }
- }
- // setClaimProvisioner saves
- // claim.Annotations[pvutil.AnnStorageProvisioner] = class.Provisioner
- func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.PersistentVolumeClaim, provisionerName string) (*v1.PersistentVolumeClaim, error) {
- if val, ok := claim.Annotations[pvutil.AnnStorageProvisioner]; ok && val == provisionerName {
- // annotation is already set, nothing to do
- return claim, nil
- }
- // The volume from method args can be pointing to watcher cache. We must not
- // modify these, therefore create a copy.
- claimClone := claim.DeepCopy()
- metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnStorageProvisioner, provisionerName)
- newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claimClone)
- if err != nil {
- return newClaim, err
- }
- _, err = ctrl.storeClaimUpdate(newClaim)
- if err != nil {
- return newClaim, err
- }
- return newClaim, nil
- }
- // Stateless functions
- func getClaimStatusForLogging(claim *v1.PersistentVolumeClaim) string {
- bound := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted)
- boundByController := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController)
- return fmt.Sprintf("phase: %s, bound to: %q, bindCompleted: %v, boundByController: %v", claim.Status.Phase, claim.Spec.VolumeName, bound, boundByController)
- }
- func getVolumeStatusForLogging(volume *v1.PersistentVolume) string {
- boundByController := metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController)
- claimName := ""
- if volume.Spec.ClaimRef != nil {
- claimName = fmt.Sprintf("%s/%s (uid: %s)", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, volume.Spec.ClaimRef.UID)
- }
- return fmt.Sprintf("phase: %s, bound to: %q, boundByController: %v", volume.Status.Phase, claimName, boundByController)
- }
- // storeObjectUpdate updates given cache with a new object version from Informer
- // callback (i.e. with events from etcd) or with an object modified by the
- // controller itself. Returns "true", if the cache was updated, false if the
- // object is an old version and should be ignored.
- func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
- objName, err := controller.KeyFunc(obj)
- if err != nil {
- return false, fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)
- }
- oldObj, found, err := store.Get(obj)
- if err != nil {
- return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
- }
- objAccessor, err := meta.Accessor(obj)
- if err != nil {
- return false, err
- }
- if !found {
- // This is a new object
- klog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
- if err = store.Add(obj); err != nil {
- return false, fmt.Errorf("Error adding %s %q to controller cache: %v", className, objName, err)
- }
- return true, nil
- }
- oldObjAccessor, err := meta.Accessor(oldObj)
- if err != nil {
- return false, err
- }
- objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
- if err != nil {
- return false, fmt.Errorf("Error parsing ResourceVersion %q of %s %q: %s", objAccessor.GetResourceVersion(), className, objName, err)
- }
- oldObjResourceVersion, err := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64)
- if err != nil {
- return false, fmt.Errorf("Error parsing old ResourceVersion %q of %s %q: %s", oldObjAccessor.GetResourceVersion(), className, objName, err)
- }
- // Throw away only older version, let the same version pass - we do want to
- // get periodic sync events.
- if oldObjResourceVersion > objResourceVersion {
- klog.V(4).Infof("storeObjectUpdate: ignoring %s %q version %s", className, objName, objAccessor.GetResourceVersion())
- return false, nil
- }
- klog.V(4).Infof("storeObjectUpdate updating %s %q with version %s", className, objName, objAccessor.GetResourceVersion())
- if err = store.Update(obj); err != nil {
- return false, fmt.Errorf("Error updating %s %q in controller cache: %v", className, objName, err)
- }
- return true, nil
- }
|