123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package expand
- import (
- "fmt"
- "net"
- "time"
- "k8s.io/klog"
- authenticationv1 "k8s.io/api/authentication/v1"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- coreinformers "k8s.io/client-go/informers/core/v1"
- storageclassinformer "k8s.io/client-go/informers/storage/v1"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- corelisters "k8s.io/client-go/listers/core/v1"
- storagelisters "k8s.io/client-go/listers/storage/v1"
- "k8s.io/client-go/tools/cache"
- kcache "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/client-go/util/workqueue"
- cloudprovider "k8s.io/cloud-provider"
- csitranslation "k8s.io/csi-translation-lib"
- v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/volume/events"
- "k8s.io/kubernetes/pkg/util/mount"
- "k8s.io/kubernetes/pkg/volume"
- "k8s.io/kubernetes/pkg/volume/util"
- "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
- "k8s.io/kubernetes/pkg/volume/util/subpath"
- "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
- )
// defaultWorkerCount is the default number of volume expansion workers.
const defaultWorkerCount = 10
// ExpandController is the controller that expands persistent volumes.
type ExpandController interface {
	// Run starts the controller. stopCh is the receive-only channel the
	// caller uses to tell the controller to stop.
	Run(stopCh <-chan struct{})
}
- type expandController struct {
- // kubeClient is the kube API client used by volumehost to communicate with
- // the API server.
- kubeClient clientset.Interface
- // pvcLister is the shared PVC lister used to fetch and store PVC
- // objects from the API server. It is shared with other controllers and
- // therefore the PVC objects in its store should be treated as immutable.
- pvcLister corelisters.PersistentVolumeClaimLister
- pvcsSynced kcache.InformerSynced
- pvLister corelisters.PersistentVolumeLister
- pvSynced kcache.InformerSynced
- // storageClass lister for fetching provisioner name
- classLister storagelisters.StorageClassLister
- classListerSynced cache.InformerSynced
- // cloud provider used by volume host
- cloud cloudprovider.Interface
- // volumePluginMgr used to initialize and fetch volume plugins
- volumePluginMgr volume.VolumePluginMgr
- // recorder is used to record events in the API server
- recorder record.EventRecorder
- operationGenerator operationexecutor.OperationGenerator
- queue workqueue.RateLimitingInterface
- }
- func NewExpandController(
- kubeClient clientset.Interface,
- pvcInformer coreinformers.PersistentVolumeClaimInformer,
- pvInformer coreinformers.PersistentVolumeInformer,
- scInformer storageclassinformer.StorageClassInformer,
- cloud cloudprovider.Interface,
- plugins []volume.VolumePlugin) (ExpandController, error) {
- expc := &expandController{
- kubeClient: kubeClient,
- cloud: cloud,
- pvcLister: pvcInformer.Lister(),
- pvcsSynced: pvcInformer.Informer().HasSynced,
- pvLister: pvInformer.Lister(),
- pvSynced: pvInformer.Informer().HasSynced,
- classLister: scInformer.Lister(),
- classListerSynced: scInformer.Informer().HasSynced,
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
- }
- if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil {
- return nil, fmt.Errorf("could not initialize volume plugins for Expand Controller : %+v", err)
- }
- eventBroadcaster := record.NewBroadcaster()
- eventBroadcaster.StartLogging(klog.Infof)
- eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
- expc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})
- blkutil := volumepathhandler.NewBlockVolumePathHandler()
- expc.operationGenerator = operationexecutor.NewOperationGenerator(
- kubeClient,
- &expc.volumePluginMgr,
- expc.recorder,
- false,
- blkutil)
- pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
- AddFunc: expc.enqueuePVC,
- UpdateFunc: func(old, new interface{}) {
- oldPVC, ok := old.(*v1.PersistentVolumeClaim)
- if !ok {
- return
- }
- oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
- newPVC, ok := new.(*v1.PersistentVolumeClaim)
- if !ok {
- return
- }
- newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
- if newSize.Cmp(oldSize) > 0 {
- expc.enqueuePVC(new)
- }
- },
- DeleteFunc: expc.enqueuePVC,
- })
- return expc, nil
- }
- func (expc *expandController) enqueuePVC(obj interface{}) {
- pvc, ok := obj.(*v1.PersistentVolumeClaim)
- if !ok {
- return
- }
- size := pvc.Spec.Resources.Requests[v1.ResourceStorage]
- statusSize := pvc.Status.Capacity[v1.ResourceStorage]
- if pvc.Status.Phase == v1.ClaimBound && size.Cmp(statusSize) > 0 {
- key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
- if err != nil {
- runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err))
- return
- }
- expc.queue.Add(key)
- }
- }
- func (expc *expandController) processNextWorkItem() bool {
- key, shutdown := expc.queue.Get()
- if shutdown {
- return false
- }
- defer expc.queue.Done(key)
- err := expc.syncHandler(key.(string))
- if err == nil {
- expc.queue.Forget(key)
- return true
- }
- runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
- expc.queue.AddRateLimited(key)
- return true
- }
- // syncHandler performs actual expansion of volume. If an error is returned
- // from this function - PVC will be requeued for resizing.
- func (expc *expandController) syncHandler(key string) error {
- namespace, name, err := kcache.SplitMetaNamespaceKey(key)
- if err != nil {
- return err
- }
- pvc, err := expc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
- if errors.IsNotFound(err) {
- return nil
- }
- if err != nil {
- klog.V(5).Infof("Error getting PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
- return err
- }
- pv, err := getPersistentVolume(pvc, expc.pvLister)
- if err != nil {
- klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
- return err
- }
- if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID {
- err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc))
- klog.V(4).Infof("%v", err)
- return err
- }
- claimClass := v1helper.GetPersistentVolumeClaimClass(pvc)
- if claimClass == "" {
- klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s", util.ClaimToClaimKey(pvc))
- return nil
- }
- class, err := expc.classLister.Get(claimClass)
- if err != nil {
- klog.V(4).Infof("failed to expand PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err)
- return nil
- }
- volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
- volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
- volumeResizerName := class.Provisioner
- if err != nil || volumePlugin == nil {
- msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
- "waiting for an external controller to process this PVC")
- eventType := v1.EventTypeNormal
- if err != nil {
- eventType = v1.EventTypeWarning
- }
- expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
- klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg)
- // If we are expecting that an external plugin will handle resizing this volume then
- // is no point in requeuing this PVC.
- return nil
- }
- if volumePlugin.IsMigratedToCSI() {
- msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName)
- expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg)
- csiResizerName, err := csitranslation.GetCSINameFromInTreeName(class.Provisioner)
- if err != nil {
- errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
- expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
- return fmt.Errorf(errorMsg)
- }
- pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient)
- if err != nil {
- errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
- expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
- return fmt.Errorf(errorMsg)
- }
- return nil
- }
- return expc.expand(pvc, pv, volumeResizerName)
- }
- func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error {
- pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
- if err != nil {
- klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
- return err
- }
- generatedOperations, err := expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv)
- if err != nil {
- klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
- return err
- }
- klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
- _, detailedErr := generatedOperations.Run()
- return detailedErr
- }
- // TODO make concurrency configurable (workers/threadiness argument). previously, nestedpendingoperations spawned unlimited goroutines
- func (expc *expandController) Run(stopCh <-chan struct{}) {
- defer runtime.HandleCrash()
- defer expc.queue.ShutDown()
- klog.Infof("Starting expand controller")
- defer klog.Infof("Shutting down expand controller")
- if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) {
- return
- }
- for i := 0; i < defaultWorkerCount; i++ {
- go wait.Until(expc.runWorker, time.Second, stopCh)
- }
- <-stopCh
- }
- func (expc *expandController) runWorker() {
- for expc.processNextWorkItem() {
- }
- }
- func getPersistentVolume(pvc *v1.PersistentVolumeClaim, pvLister corelisters.PersistentVolumeLister) (*v1.PersistentVolume, error) {
- volumeName := pvc.Spec.VolumeName
- pv, err := pvLister.Get(volumeName)
- if err != nil {
- return nil, fmt.Errorf("failed to find PV %q in PV informer cache with error : %v", volumeName, err)
- }
- return pv.DeepCopy(), nil
- }
- // Implementing VolumeHost interface
- func (expc *expandController) GetPluginDir(pluginName string) string {
- return ""
- }
- func (expc *expandController) GetVolumeDevicePluginDir(pluginName string) string {
- return ""
- }
- func (expc *expandController) GetPodsDir() string {
- return ""
- }
- func (expc *expandController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
- return ""
- }
- func (expc *expandController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
- return ""
- }
- func (expc *expandController) GetPodPluginDir(podUID types.UID, pluginName string) string {
- return ""
- }
- func (expc *expandController) GetKubeClient() clientset.Interface {
- return expc.kubeClient
- }
- func (expc *expandController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
- return nil, fmt.Errorf("NewWrapperMounter not supported by expand controller's VolumeHost implementation")
- }
- func (expc *expandController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
- return nil, fmt.Errorf("NewWrapperUnmounter not supported by expand controller's VolumeHost implementation")
- }
- func (expc *expandController) GetCloudProvider() cloudprovider.Interface {
- return expc.cloud
- }
- func (expc *expandController) GetMounter(pluginName string) mount.Interface {
- return nil
- }
- func (expc *expandController) GetExec(pluginName string) mount.Exec {
- return mount.NewOsExec()
- }
- func (expc *expandController) GetHostName() string {
- return ""
- }
- func (expc *expandController) GetHostIP() (net.IP, error) {
- return nil, fmt.Errorf("GetHostIP not supported by expand controller's VolumeHost implementation")
- }
- func (expc *expandController) GetNodeAllocatable() (v1.ResourceList, error) {
- return v1.ResourceList{}, nil
- }
- func (expc *expandController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
- return func(_, _ string) (*v1.Secret, error) {
- return nil, fmt.Errorf("GetSecret unsupported in expandController")
- }
- }
- func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
- return func(_, _ string) (*v1.ConfigMap, error) {
- return nil, fmt.Errorf("GetConfigMap unsupported in expandController")
- }
- }
- func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
- return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
- return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController")
- }
- }
- func (expc *expandController) DeleteServiceAccountTokenFunc() func(types.UID) {
- return func(types.UID) {
- klog.Errorf("DeleteServiceAccountToken unsupported in expandController")
- }
- }
- func (expc *expandController) GetNodeLabels() (map[string]string, error) {
- return nil, fmt.Errorf("GetNodeLabels unsupported in expandController")
- }
- func (expc *expandController) GetNodeName() types.NodeName {
- return ""
- }
- func (expc *expandController) GetEventRecorder() record.EventRecorder {
- return expc.recorder
- }
- func (expc *expandController) GetSubpather() subpath.Interface {
- // not needed for expand controller
- return nil
- }
|