expand_controller.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package expand
  14. import (
  15. "fmt"
  16. "net"
  17. "time"
  18. "k8s.io/klog"
  19. utilexec "k8s.io/utils/exec"
  20. "k8s.io/utils/mount"
  21. authenticationv1 "k8s.io/api/authentication/v1"
  22. v1 "k8s.io/api/core/v1"
  23. "k8s.io/apimachinery/pkg/api/errors"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/apimachinery/pkg/util/runtime"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. coreinformers "k8s.io/client-go/informers/core/v1"
  28. storageclassinformer "k8s.io/client-go/informers/storage/v1"
  29. clientset "k8s.io/client-go/kubernetes"
  30. "k8s.io/client-go/kubernetes/scheme"
  31. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  32. corelisters "k8s.io/client-go/listers/core/v1"
  33. storagelisters "k8s.io/client-go/listers/storage/v1"
  34. "k8s.io/client-go/tools/cache"
  35. kcache "k8s.io/client-go/tools/cache"
  36. "k8s.io/client-go/tools/record"
  37. "k8s.io/client-go/util/workqueue"
  38. cloudprovider "k8s.io/cloud-provider"
  39. v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  40. "k8s.io/kubernetes/pkg/controller/volume/events"
  41. "k8s.io/kubernetes/pkg/volume"
  42. "k8s.io/kubernetes/pkg/volume/csimigration"
  43. "k8s.io/kubernetes/pkg/volume/util"
  44. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  45. "k8s.io/kubernetes/pkg/volume/util/subpath"
  46. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  47. )
  48. const (
  49. // number of default volume expansion workers
  50. defaultWorkerCount = 10
  51. )
  52. // ExpandController expands the pvs
  53. type ExpandController interface {
  54. Run(stopCh <-chan struct{})
  55. }
  56. // CSINameTranslator can get the CSI Driver name based on the in-tree plugin name
  57. type CSINameTranslator interface {
  58. GetCSINameFromInTreeName(pluginName string) (string, error)
  59. }
  60. type expandController struct {
  61. // kubeClient is the kube API client used by volumehost to communicate with
  62. // the API server.
  63. kubeClient clientset.Interface
  64. // pvcLister is the shared PVC lister used to fetch and store PVC
  65. // objects from the API server. It is shared with other controllers and
  66. // therefore the PVC objects in its store should be treated as immutable.
  67. pvcLister corelisters.PersistentVolumeClaimLister
  68. pvcsSynced kcache.InformerSynced
  69. pvLister corelisters.PersistentVolumeLister
  70. pvSynced kcache.InformerSynced
  71. // storageClass lister for fetching provisioner name
  72. classLister storagelisters.StorageClassLister
  73. classListerSynced cache.InformerSynced
  74. // cloud provider used by volume host
  75. cloud cloudprovider.Interface
  76. // volumePluginMgr used to initialize and fetch volume plugins
  77. volumePluginMgr volume.VolumePluginMgr
  78. // recorder is used to record events in the API server
  79. recorder record.EventRecorder
  80. operationGenerator operationexecutor.OperationGenerator
  81. queue workqueue.RateLimitingInterface
  82. translator CSINameTranslator
  83. csiMigratedPluginManager csimigration.PluginManager
  84. }
  85. // NewExpandController expands the pvs
  86. func NewExpandController(
  87. kubeClient clientset.Interface,
  88. pvcInformer coreinformers.PersistentVolumeClaimInformer,
  89. pvInformer coreinformers.PersistentVolumeInformer,
  90. scInformer storageclassinformer.StorageClassInformer,
  91. cloud cloudprovider.Interface,
  92. plugins []volume.VolumePlugin,
  93. translator CSINameTranslator,
  94. csiMigratedPluginManager csimigration.PluginManager) (ExpandController, error) {
  95. expc := &expandController{
  96. kubeClient: kubeClient,
  97. cloud: cloud,
  98. pvcLister: pvcInformer.Lister(),
  99. pvcsSynced: pvcInformer.Informer().HasSynced,
  100. pvLister: pvInformer.Lister(),
  101. pvSynced: pvInformer.Informer().HasSynced,
  102. classLister: scInformer.Lister(),
  103. classListerSynced: scInformer.Informer().HasSynced,
  104. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
  105. translator: translator,
  106. csiMigratedPluginManager: csiMigratedPluginManager,
  107. }
  108. if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil {
  109. return nil, fmt.Errorf("could not initialize volume plugins for Expand Controller : %+v", err)
  110. }
  111. eventBroadcaster := record.NewBroadcaster()
  112. eventBroadcaster.StartLogging(klog.Infof)
  113. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  114. expc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})
  115. blkutil := volumepathhandler.NewBlockVolumePathHandler()
  116. expc.operationGenerator = operationexecutor.NewOperationGenerator(
  117. kubeClient,
  118. &expc.volumePluginMgr,
  119. expc.recorder,
  120. false,
  121. blkutil)
  122. pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
  123. AddFunc: expc.enqueuePVC,
  124. UpdateFunc: func(old, new interface{}) {
  125. oldPVC, ok := old.(*v1.PersistentVolumeClaim)
  126. if !ok {
  127. return
  128. }
  129. oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
  130. newPVC, ok := new.(*v1.PersistentVolumeClaim)
  131. if !ok {
  132. return
  133. }
  134. newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
  135. if newSize.Cmp(oldSize) > 0 {
  136. expc.enqueuePVC(new)
  137. }
  138. },
  139. DeleteFunc: expc.enqueuePVC,
  140. })
  141. return expc, nil
  142. }
  143. func (expc *expandController) enqueuePVC(obj interface{}) {
  144. pvc, ok := obj.(*v1.PersistentVolumeClaim)
  145. if !ok {
  146. return
  147. }
  148. size := pvc.Spec.Resources.Requests[v1.ResourceStorage]
  149. statusSize := pvc.Status.Capacity[v1.ResourceStorage]
  150. if pvc.Status.Phase == v1.ClaimBound && size.Cmp(statusSize) > 0 {
  151. key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
  152. if err != nil {
  153. runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err))
  154. return
  155. }
  156. expc.queue.Add(key)
  157. }
  158. }
  159. func (expc *expandController) processNextWorkItem() bool {
  160. key, shutdown := expc.queue.Get()
  161. if shutdown {
  162. return false
  163. }
  164. defer expc.queue.Done(key)
  165. err := expc.syncHandler(key.(string))
  166. if err == nil {
  167. expc.queue.Forget(key)
  168. return true
  169. }
  170. runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
  171. expc.queue.AddRateLimited(key)
  172. return true
  173. }
  174. // syncHandler performs actual expansion of volume. If an error is returned
  175. // from this function - PVC will be requeued for resizing.
  176. func (expc *expandController) syncHandler(key string) error {
  177. namespace, name, err := kcache.SplitMetaNamespaceKey(key)
  178. if err != nil {
  179. return err
  180. }
  181. pvc, err := expc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
  182. if errors.IsNotFound(err) {
  183. return nil
  184. }
  185. if err != nil {
  186. klog.V(5).Infof("Error getting PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
  187. return err
  188. }
  189. pv, err := getPersistentVolume(pvc, expc.pvLister)
  190. if err != nil {
  191. klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
  192. return err
  193. }
  194. if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID {
  195. err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc))
  196. klog.V(4).Infof("%v", err)
  197. return err
  198. }
  199. claimClass := v1helper.GetPersistentVolumeClaimClass(pvc)
  200. if claimClass == "" {
  201. klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s", util.ClaimToClaimKey(pvc))
  202. return nil
  203. }
  204. class, err := expc.classLister.Get(claimClass)
  205. if err != nil {
  206. klog.V(4).Infof("failed to expand PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err)
  207. return nil
  208. }
  209. volumeResizerName := class.Provisioner
  210. volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
  211. migratable, err := expc.csiMigratedPluginManager.IsMigratable(volumeSpec)
  212. if err != nil {
  213. klog.V(4).Infof("failed to check CSI migration status for PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err)
  214. return nil
  215. }
  216. // handle CSI migration scenarios before invoking FindExpandablePluginBySpec for in-tree
  217. if migratable {
  218. msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName)
  219. expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg)
  220. csiResizerName, err := expc.translator.GetCSINameFromInTreeName(class.Provisioner)
  221. if err != nil {
  222. errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
  223. expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
  224. return fmt.Errorf(errorMsg)
  225. }
  226. pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient)
  227. if err != nil {
  228. errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
  229. expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
  230. return fmt.Errorf(errorMsg)
  231. }
  232. return nil
  233. }
  234. volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
  235. if err != nil || volumePlugin == nil {
  236. msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
  237. "waiting for an external controller to process this PVC")
  238. eventType := v1.EventTypeNormal
  239. if err != nil {
  240. eventType = v1.EventTypeWarning
  241. }
  242. expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
  243. klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg)
  244. // If we are expecting that an external plugin will handle resizing this volume then
  245. // is no point in requeuing this PVC.
  246. return nil
  247. }
  248. return expc.expand(pvc, pv, volumeResizerName)
  249. }
  250. func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error {
  251. pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
  252. if err != nil {
  253. klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  254. return err
  255. }
  256. generatedOperations, err := expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv)
  257. if err != nil {
  258. klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  259. return err
  260. }
  261. klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  262. _, detailedErr := generatedOperations.Run()
  263. return detailedErr
  264. }
  265. // TODO make concurrency configurable (workers/threadiness argument). previously, nestedpendingoperations spawned unlimited goroutines
  266. func (expc *expandController) Run(stopCh <-chan struct{}) {
  267. defer runtime.HandleCrash()
  268. defer expc.queue.ShutDown()
  269. klog.Infof("Starting expand controller")
  270. defer klog.Infof("Shutting down expand controller")
  271. if !cache.WaitForNamedCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) {
  272. return
  273. }
  274. for i := 0; i < defaultWorkerCount; i++ {
  275. go wait.Until(expc.runWorker, time.Second, stopCh)
  276. }
  277. <-stopCh
  278. }
  279. func (expc *expandController) runWorker() {
  280. for expc.processNextWorkItem() {
  281. }
  282. }
  283. func getPersistentVolume(pvc *v1.PersistentVolumeClaim, pvLister corelisters.PersistentVolumeLister) (*v1.PersistentVolume, error) {
  284. volumeName := pvc.Spec.VolumeName
  285. pv, err := pvLister.Get(volumeName)
  286. if err != nil {
  287. return nil, fmt.Errorf("failed to find PV %q in PV informer cache with error : %v", volumeName, err)
  288. }
  289. return pv.DeepCopy(), nil
  290. }
  291. // Implementing VolumeHost interface
  292. func (expc *expandController) GetPluginDir(pluginName string) string {
  293. return ""
  294. }
  295. func (expc *expandController) GetVolumeDevicePluginDir(pluginName string) string {
  296. return ""
  297. }
  298. func (expc *expandController) GetPodsDir() string {
  299. return ""
  300. }
  301. func (expc *expandController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
  302. return ""
  303. }
  304. func (expc *expandController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
  305. return ""
  306. }
  307. func (expc *expandController) GetPodPluginDir(podUID types.UID, pluginName string) string {
  308. return ""
  309. }
  310. func (expc *expandController) GetKubeClient() clientset.Interface {
  311. return expc.kubeClient
  312. }
  313. func (expc *expandController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
  314. return nil, fmt.Errorf("NewWrapperMounter not supported by expand controller's VolumeHost implementation")
  315. }
  316. func (expc *expandController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
  317. return nil, fmt.Errorf("NewWrapperUnmounter not supported by expand controller's VolumeHost implementation")
  318. }
  319. func (expc *expandController) GetCloudProvider() cloudprovider.Interface {
  320. return expc.cloud
  321. }
  322. func (expc *expandController) GetMounter(pluginName string) mount.Interface {
  323. return nil
  324. }
  325. func (expc *expandController) GetExec(pluginName string) utilexec.Interface {
  326. return utilexec.New()
  327. }
  328. func (expc *expandController) GetHostName() string {
  329. return ""
  330. }
  331. func (expc *expandController) GetHostIP() (net.IP, error) {
  332. return nil, fmt.Errorf("GetHostIP not supported by expand controller's VolumeHost implementation")
  333. }
  334. func (expc *expandController) GetNodeAllocatable() (v1.ResourceList, error) {
  335. return v1.ResourceList{}, nil
  336. }
  337. func (expc *expandController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
  338. return func(_, _ string) (*v1.Secret, error) {
  339. return nil, fmt.Errorf("GetSecret unsupported in expandController")
  340. }
  341. }
  342. func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
  343. return func(_, _ string) (*v1.ConfigMap, error) {
  344. return nil, fmt.Errorf("GetConfigMap unsupported in expandController")
  345. }
  346. }
  347. func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
  348. return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
  349. return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController")
  350. }
  351. }
  352. func (expc *expandController) DeleteServiceAccountTokenFunc() func(types.UID) {
  353. return func(types.UID) {
  354. klog.Errorf("DeleteServiceAccountToken unsupported in expandController")
  355. }
  356. }
  357. func (expc *expandController) GetNodeLabels() (map[string]string, error) {
  358. return nil, fmt.Errorf("GetNodeLabels unsupported in expandController")
  359. }
  360. func (expc *expandController) GetNodeName() types.NodeName {
  361. return ""
  362. }
  363. func (expc *expandController) GetEventRecorder() record.EventRecorder {
  364. return expc.recorder
  365. }
  366. func (expc *expandController) GetSubpather() subpath.Interface {
  367. // not needed for expand controller
  368. return nil
  369. }