expand_controller.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package expand
  14. import (
  15. "fmt"
  16. "net"
  17. "time"
  18. "k8s.io/klog"
  19. authenticationv1 "k8s.io/api/authentication/v1"
  20. "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. "k8s.io/apimachinery/pkg/types"
  23. "k8s.io/apimachinery/pkg/util/runtime"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. coreinformers "k8s.io/client-go/informers/core/v1"
  26. storageclassinformer "k8s.io/client-go/informers/storage/v1"
  27. clientset "k8s.io/client-go/kubernetes"
  28. "k8s.io/client-go/kubernetes/scheme"
  29. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  30. corelisters "k8s.io/client-go/listers/core/v1"
  31. storagelisters "k8s.io/client-go/listers/storage/v1"
  32. "k8s.io/client-go/tools/cache"
  33. kcache "k8s.io/client-go/tools/cache"
  34. "k8s.io/client-go/tools/record"
  35. "k8s.io/client-go/util/workqueue"
  36. cloudprovider "k8s.io/cloud-provider"
  37. csitranslation "k8s.io/csi-translation-lib"
  38. v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  39. "k8s.io/kubernetes/pkg/controller"
  40. "k8s.io/kubernetes/pkg/controller/volume/events"
  41. "k8s.io/kubernetes/pkg/util/mount"
  42. "k8s.io/kubernetes/pkg/volume"
  43. "k8s.io/kubernetes/pkg/volume/util"
  44. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  45. "k8s.io/kubernetes/pkg/volume/util/subpath"
  46. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  47. )
  // defaultWorkerCount is the number of worker goroutines started by Run to
  // process queued volume expansion requests.
  const defaultWorkerCount = 10
  // ExpandController expands the persistent volumes backing PVCs whose
  // requested size has grown.
  type ExpandController interface {
  	// Run drives the controller until stopCh is closed.
  	Run(stopCh <-chan struct{})
  }
  56. type expandController struct {
  57. // kubeClient is the kube API client used by volumehost to communicate with
  58. // the API server.
  59. kubeClient clientset.Interface
  60. // pvcLister is the shared PVC lister used to fetch and store PVC
  61. // objects from the API server. It is shared with other controllers and
  62. // therefore the PVC objects in its store should be treated as immutable.
  63. pvcLister corelisters.PersistentVolumeClaimLister
  64. pvcsSynced kcache.InformerSynced
  65. pvLister corelisters.PersistentVolumeLister
  66. pvSynced kcache.InformerSynced
  67. // storageClass lister for fetching provisioner name
  68. classLister storagelisters.StorageClassLister
  69. classListerSynced cache.InformerSynced
  70. // cloud provider used by volume host
  71. cloud cloudprovider.Interface
  72. // volumePluginMgr used to initialize and fetch volume plugins
  73. volumePluginMgr volume.VolumePluginMgr
  74. // recorder is used to record events in the API server
  75. recorder record.EventRecorder
  76. operationGenerator operationexecutor.OperationGenerator
  77. queue workqueue.RateLimitingInterface
  78. }
  79. func NewExpandController(
  80. kubeClient clientset.Interface,
  81. pvcInformer coreinformers.PersistentVolumeClaimInformer,
  82. pvInformer coreinformers.PersistentVolumeInformer,
  83. scInformer storageclassinformer.StorageClassInformer,
  84. cloud cloudprovider.Interface,
  85. plugins []volume.VolumePlugin) (ExpandController, error) {
  86. expc := &expandController{
  87. kubeClient: kubeClient,
  88. cloud: cloud,
  89. pvcLister: pvcInformer.Lister(),
  90. pvcsSynced: pvcInformer.Informer().HasSynced,
  91. pvLister: pvInformer.Lister(),
  92. pvSynced: pvInformer.Informer().HasSynced,
  93. classLister: scInformer.Lister(),
  94. classListerSynced: scInformer.Informer().HasSynced,
  95. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
  96. }
  97. if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil {
  98. return nil, fmt.Errorf("could not initialize volume plugins for Expand Controller : %+v", err)
  99. }
  100. eventBroadcaster := record.NewBroadcaster()
  101. eventBroadcaster.StartLogging(klog.Infof)
  102. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  103. expc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})
  104. blkutil := volumepathhandler.NewBlockVolumePathHandler()
  105. expc.operationGenerator = operationexecutor.NewOperationGenerator(
  106. kubeClient,
  107. &expc.volumePluginMgr,
  108. expc.recorder,
  109. false,
  110. blkutil)
  111. pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
  112. AddFunc: expc.enqueuePVC,
  113. UpdateFunc: func(old, new interface{}) {
  114. oldPVC, ok := old.(*v1.PersistentVolumeClaim)
  115. if !ok {
  116. return
  117. }
  118. oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
  119. newPVC, ok := new.(*v1.PersistentVolumeClaim)
  120. if !ok {
  121. return
  122. }
  123. newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
  124. if newSize.Cmp(oldSize) > 0 {
  125. expc.enqueuePVC(new)
  126. }
  127. },
  128. DeleteFunc: expc.enqueuePVC,
  129. })
  130. return expc, nil
  131. }
  132. func (expc *expandController) enqueuePVC(obj interface{}) {
  133. pvc, ok := obj.(*v1.PersistentVolumeClaim)
  134. if !ok {
  135. return
  136. }
  137. size := pvc.Spec.Resources.Requests[v1.ResourceStorage]
  138. statusSize := pvc.Status.Capacity[v1.ResourceStorage]
  139. if pvc.Status.Phase == v1.ClaimBound && size.Cmp(statusSize) > 0 {
  140. key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
  141. if err != nil {
  142. runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err))
  143. return
  144. }
  145. expc.queue.Add(key)
  146. }
  147. }
  148. func (expc *expandController) processNextWorkItem() bool {
  149. key, shutdown := expc.queue.Get()
  150. if shutdown {
  151. return false
  152. }
  153. defer expc.queue.Done(key)
  154. err := expc.syncHandler(key.(string))
  155. if err == nil {
  156. expc.queue.Forget(key)
  157. return true
  158. }
  159. runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
  160. expc.queue.AddRateLimited(key)
  161. return true
  162. }
  163. // syncHandler performs actual expansion of volume. If an error is returned
  164. // from this function - PVC will be requeued for resizing.
  165. func (expc *expandController) syncHandler(key string) error {
  166. namespace, name, err := kcache.SplitMetaNamespaceKey(key)
  167. if err != nil {
  168. return err
  169. }
  170. pvc, err := expc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
  171. if errors.IsNotFound(err) {
  172. return nil
  173. }
  174. if err != nil {
  175. klog.V(5).Infof("Error getting PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
  176. return err
  177. }
  178. pv, err := getPersistentVolume(pvc, expc.pvLister)
  179. if err != nil {
  180. klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
  181. return err
  182. }
  183. if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID {
  184. err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc))
  185. klog.V(4).Infof("%v", err)
  186. return err
  187. }
  188. claimClass := v1helper.GetPersistentVolumeClaimClass(pvc)
  189. if claimClass == "" {
  190. klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s", util.ClaimToClaimKey(pvc))
  191. return nil
  192. }
  193. class, err := expc.classLister.Get(claimClass)
  194. if err != nil {
  195. klog.V(4).Infof("failed to expand PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err)
  196. return nil
  197. }
  198. volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
  199. volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
  200. volumeResizerName := class.Provisioner
  201. if err != nil || volumePlugin == nil {
  202. msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
  203. "waiting for an external controller to process this PVC")
  204. eventType := v1.EventTypeNormal
  205. if err != nil {
  206. eventType = v1.EventTypeWarning
  207. }
  208. expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
  209. klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg)
  210. // If we are expecting that an external plugin will handle resizing this volume then
  211. // is no point in requeuing this PVC.
  212. return nil
  213. }
  214. if volumePlugin.IsMigratedToCSI() {
  215. msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName)
  216. expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg)
  217. csiResizerName, err := csitranslation.GetCSINameFromInTreeName(class.Provisioner)
  218. if err != nil {
  219. errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
  220. expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
  221. return fmt.Errorf(errorMsg)
  222. }
  223. pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient)
  224. if err != nil {
  225. errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
  226. expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
  227. return fmt.Errorf(errorMsg)
  228. }
  229. return nil
  230. }
  231. return expc.expand(pvc, pv, volumeResizerName)
  232. }
  233. func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error {
  234. pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
  235. if err != nil {
  236. klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  237. return err
  238. }
  239. generatedOperations, err := expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv)
  240. if err != nil {
  241. klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  242. return err
  243. }
  244. klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  245. _, detailedErr := generatedOperations.Run()
  246. return detailedErr
  247. }
  248. // TODO make concurrency configurable (workers/threadiness argument). previously, nestedpendingoperations spawned unlimited goroutines
  249. func (expc *expandController) Run(stopCh <-chan struct{}) {
  250. defer runtime.HandleCrash()
  251. defer expc.queue.ShutDown()
  252. klog.Infof("Starting expand controller")
  253. defer klog.Infof("Shutting down expand controller")
  254. if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) {
  255. return
  256. }
  257. for i := 0; i < defaultWorkerCount; i++ {
  258. go wait.Until(expc.runWorker, time.Second, stopCh)
  259. }
  260. <-stopCh
  261. }
  262. func (expc *expandController) runWorker() {
  263. for expc.processNextWorkItem() {
  264. }
  265. }
  266. func getPersistentVolume(pvc *v1.PersistentVolumeClaim, pvLister corelisters.PersistentVolumeLister) (*v1.PersistentVolume, error) {
  267. volumeName := pvc.Spec.VolumeName
  268. pv, err := pvLister.Get(volumeName)
  269. if err != nil {
  270. return nil, fmt.Errorf("failed to find PV %q in PV informer cache with error : %v", volumeName, err)
  271. }
  272. return pv.DeepCopy(), nil
  273. }
  274. // Implementing VolumeHost interface
  275. func (expc *expandController) GetPluginDir(pluginName string) string {
  276. return ""
  277. }
  278. func (expc *expandController) GetVolumeDevicePluginDir(pluginName string) string {
  279. return ""
  280. }
  281. func (expc *expandController) GetPodsDir() string {
  282. return ""
  283. }
  284. func (expc *expandController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
  285. return ""
  286. }
  287. func (expc *expandController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
  288. return ""
  289. }
  290. func (expc *expandController) GetPodPluginDir(podUID types.UID, pluginName string) string {
  291. return ""
  292. }
  293. func (expc *expandController) GetKubeClient() clientset.Interface {
  294. return expc.kubeClient
  295. }
  296. func (expc *expandController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
  297. return nil, fmt.Errorf("NewWrapperMounter not supported by expand controller's VolumeHost implementation")
  298. }
  299. func (expc *expandController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
  300. return nil, fmt.Errorf("NewWrapperUnmounter not supported by expand controller's VolumeHost implementation")
  301. }
  302. func (expc *expandController) GetCloudProvider() cloudprovider.Interface {
  303. return expc.cloud
  304. }
  305. func (expc *expandController) GetMounter(pluginName string) mount.Interface {
  306. return nil
  307. }
  308. func (expc *expandController) GetExec(pluginName string) mount.Exec {
  309. return mount.NewOsExec()
  310. }
  311. func (expc *expandController) GetHostName() string {
  312. return ""
  313. }
  314. func (expc *expandController) GetHostIP() (net.IP, error) {
  315. return nil, fmt.Errorf("GetHostIP not supported by expand controller's VolumeHost implementation")
  316. }
  317. func (expc *expandController) GetNodeAllocatable() (v1.ResourceList, error) {
  318. return v1.ResourceList{}, nil
  319. }
  320. func (expc *expandController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
  321. return func(_, _ string) (*v1.Secret, error) {
  322. return nil, fmt.Errorf("GetSecret unsupported in expandController")
  323. }
  324. }
  325. func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
  326. return func(_, _ string) (*v1.ConfigMap, error) {
  327. return nil, fmt.Errorf("GetConfigMap unsupported in expandController")
  328. }
  329. }
  330. func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
  331. return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
  332. return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController")
  333. }
  334. }
  335. func (expc *expandController) DeleteServiceAccountTokenFunc() func(types.UID) {
  336. return func(types.UID) {
  337. klog.Errorf("DeleteServiceAccountToken unsupported in expandController")
  338. }
  339. }
  340. func (expc *expandController) GetNodeLabels() (map[string]string, error) {
  341. return nil, fmt.Errorf("GetNodeLabels unsupported in expandController")
  342. }
  343. func (expc *expandController) GetNodeName() types.NodeName {
  344. return ""
  345. }
  346. func (expc *expandController) GetEventRecorder() record.EventRecorder {
  347. return expc.recorder
  348. }
  349. func (expc *expandController) GetSubpather() subpath.Interface {
  350. // not needed for expand controller
  351. return nil
  352. }