nodeinfomanager.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package nodeinfomanager includes internal functions used to add/delete labels to
  14. // kubernetes nodes for corresponding CSI drivers
  15. package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
  16. import (
  17. "encoding/json"
  18. goerrors "errors"
  19. "fmt"
  20. "strings"
  21. "time"
  22. "k8s.io/api/core/v1"
  23. storagev1beta1 "k8s.io/api/storage/v1beta1"
  24. "k8s.io/apimachinery/pkg/api/errors"
  25. "k8s.io/apimachinery/pkg/api/resource"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/types"
  28. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  29. "k8s.io/apimachinery/pkg/util/sets"
  30. "k8s.io/apimachinery/pkg/util/wait"
  31. utilfeature "k8s.io/apiserver/pkg/util/feature"
  32. clientset "k8s.io/client-go/kubernetes"
  33. "k8s.io/klog"
  34. "k8s.io/kubernetes/pkg/features"
  35. nodeutil "k8s.io/kubernetes/pkg/util/node"
  36. "k8s.io/kubernetes/pkg/volume"
  37. "k8s.io/kubernetes/pkg/volume/util"
  38. )
  const (
  	// annotationKeyNodeID is the name of the Node annotation whose value is a
  	// JSON-encoded map from CSI driver name to the node ID registered for that
  	// driver.
  	annotationKeyNodeID = "csi.volume.kubernetes.io/nodeid"
  )
  var (
  	// nodeKind is the core v1 GroupVersionKind for "Node"; its Version and Kind
  	// are copied into the owner reference of CSINode objects created here.
  	nodeKind = v1.SchemeGroupVersion.WithKind("Node")

  	// updateBackoff is the retry policy passed to wait.ExponentialBackoff for
  	// every Node and CSINode update in this file: 4 steps, an initial duration
  	// of 10ms, a growth factor of 5 and a jitter factor of 0.1.
  	updateBackoff = wait.Backoff{
  		Steps:    4,
  		Duration: 10 * time.Millisecond,
  		Factor:   5.0,
  		Jitter:   0.1,
  	}
  )
  // nodeInfoManager contains necessary common dependencies to update node info on both
  // the Node and CSINode objects.
  type nodeInfoManager struct {
  	// nodeName is the name used to look up both the Node and the CSINode object.
  	nodeName types.NodeName
  	// volumeHost supplies the Kubernetes client used for every API call.
  	volumeHost volume.VolumeHost
  	// migratedPlugins maps a plugin name to a predicate; plugins whose predicate
  	// returns true are listed in the CSINode migrated-plugins annotation.
  	migratedPlugins map[string](func() bool)
  }
  // nodeUpdateFunc applies a single modification to a Node object. It returns
  // the resulting Node, whether anything was changed, and an error when the
  // modification cannot be applied.
  // If no update is needed, the function must return the same Node object as the input.
  type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
  // Interface is the set of operations for recording CSI driver information on
  // the Node and CSINode objects of a node.
  type Interface interface {
  	// CreateCSINode creates the CSINode object for the node and returns the
  	// created object.
  	CreateCSINode() (*storagev1beta1.CSINode, error)

  	// InitializeCSINodeWithAnnotation updates or creates the CSINode object with
  	// annotations for CSI Migration.
  	InitializeCSINodeWithAnnotation() error

  	// InstallCSIDriver records in the cluster the given node information from the
  	// CSI driver with the given name.
  	// Concurrent calls to InstallCSIDriver() are allowed, but they should not be
  	// intertwined with calls to other methods in this interface.
  	InstallCSIDriver(driverName string, driverNodeID string, maxVolumeLimit int64, topology map[string]string) error

  	// UninstallCSIDriver removes from the cluster the node information of the CSI
  	// driver with the given name.
  	// Concurrent calls to UninstallCSIDriver() are allowed, but they should not be
  	// intertwined with calls to other methods in this interface.
  	UninstallCSIDriver(driverName string) error
  }
  75. // NewNodeInfoManager initializes nodeInfoManager
  76. func NewNodeInfoManager(
  77. nodeName types.NodeName,
  78. volumeHost volume.VolumeHost,
  79. migratedPlugins map[string](func() bool)) Interface {
  80. return &nodeInfoManager{
  81. nodeName: nodeName,
  82. volumeHost: volumeHost,
  83. migratedPlugins: migratedPlugins,
  84. }
  85. }
  86. // InstallCSIDriver updates the node ID annotation in the Node object and CSIDrivers field in the
  87. // CSINode object. If the CSINode object doesn't yet exist, it will be created.
  88. // If multiple calls to InstallCSIDriver() are made in parallel, some calls might receive Node or
  89. // CSINode update conflicts, which causes the function to retry the corresponding update.
  90. func (nim *nodeInfoManager) InstallCSIDriver(driverName string, driverNodeID string, maxAttachLimit int64, topology map[string]string) error {
  91. if driverNodeID == "" {
  92. return fmt.Errorf("error adding CSI driver node info: driverNodeID must not be empty")
  93. }
  94. nodeUpdateFuncs := []nodeUpdateFunc{
  95. updateNodeIDInNode(driverName, driverNodeID),
  96. }
  97. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  98. nodeUpdateFuncs = append(nodeUpdateFuncs, updateTopologyLabels(topology))
  99. }
  100. if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
  101. nodeUpdateFuncs = append(nodeUpdateFuncs, updateMaxAttachLimit(driverName, maxAttachLimit))
  102. }
  103. err := nim.updateNode(nodeUpdateFuncs...)
  104. if err != nil {
  105. return fmt.Errorf("error updating Node object with CSI driver node info: %v", err)
  106. }
  107. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  108. err = nim.updateCSINode(driverName, driverNodeID, topology)
  109. if err != nil {
  110. return fmt.Errorf("error updating CSINode object with CSI driver node info: %v", err)
  111. }
  112. }
  113. return nil
  114. }
  115. // UninstallCSIDriver removes the node ID annotation from the Node object and CSIDrivers field from the
  116. // CSINode object. If the CSINOdeInfo object contains no CSIDrivers, it will be deleted.
  117. // If multiple calls to UninstallCSIDriver() are made in parallel, some calls might receive Node or
  118. // CSINode update conflicts, which causes the function to retry the corresponding update.
  119. func (nim *nodeInfoManager) UninstallCSIDriver(driverName string) error {
  120. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  121. err := nim.uninstallDriverFromCSINode(driverName)
  122. if err != nil {
  123. return fmt.Errorf("error uninstalling CSI driver from CSINode object %v", err)
  124. }
  125. }
  126. err := nim.updateNode(
  127. removeMaxAttachLimit(driverName),
  128. removeNodeIDFromNode(driverName),
  129. )
  130. if err != nil {
  131. return fmt.Errorf("error removing CSI driver node info from Node object %v", err)
  132. }
  133. return nil
  134. }
  135. func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error {
  136. var updateErrs []error
  137. err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
  138. if err := nim.tryUpdateNode(updateFuncs...); err != nil {
  139. updateErrs = append(updateErrs, err)
  140. return false, nil
  141. }
  142. return true, nil
  143. })
  144. if err != nil {
  145. return fmt.Errorf("error updating node: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
  146. }
  147. return nil
  148. }
  149. // updateNode repeatedly attempts to update the corresponding node object
  150. // which is modified by applying the given update functions sequentially.
  151. // Because updateFuncs are applied sequentially, later updateFuncs should take into account
  152. // the effects of previous updateFuncs to avoid potential conflicts. For example, if multiple
  153. // functions update the same field, updates in the last function are persisted.
  154. func (nim *nodeInfoManager) tryUpdateNode(updateFuncs ...nodeUpdateFunc) error {
  155. // Retrieve the latest version of Node before attempting update, so that
  156. // existing changes are not overwritten.
  157. kubeClient := nim.volumeHost.GetKubeClient()
  158. if kubeClient == nil {
  159. return fmt.Errorf("error getting kube client")
  160. }
  161. nodeClient := kubeClient.CoreV1().Nodes()
  162. originalNode, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{})
  163. if err != nil {
  164. return err
  165. }
  166. node := originalNode.DeepCopy()
  167. needUpdate := false
  168. for _, update := range updateFuncs {
  169. newNode, updated, err := update(node)
  170. if err != nil {
  171. return err
  172. }
  173. node = newNode
  174. needUpdate = needUpdate || updated
  175. }
  176. if needUpdate {
  177. // PatchNodeStatus can update both node's status and labels or annotations
  178. // Updating status by directly updating node does not work
  179. _, _, updateErr := nodeutil.PatchNodeStatus(kubeClient.CoreV1(), types.NodeName(node.Name), originalNode, node)
  180. return updateErr
  181. }
  182. return nil
  183. }
  184. // Guarantees the map is non-nil if no error is returned.
  185. func buildNodeIDMapFromAnnotation(node *v1.Node) (map[string]string, error) {
  186. var previousAnnotationValue string
  187. if node.ObjectMeta.Annotations != nil {
  188. previousAnnotationValue =
  189. node.ObjectMeta.Annotations[annotationKeyNodeID]
  190. }
  191. var existingDriverMap map[string]string
  192. if previousAnnotationValue != "" {
  193. // Parse previousAnnotationValue as JSON
  194. if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
  195. return nil, fmt.Errorf(
  196. "failed to parse node's %q annotation value (%q) err=%v",
  197. annotationKeyNodeID,
  198. previousAnnotationValue,
  199. err)
  200. }
  201. }
  202. if existingDriverMap == nil {
  203. return make(map[string]string), nil
  204. }
  205. return existingDriverMap, nil
  206. }
  207. // updateNodeIDInNode returns a function that updates a Node object with the given
  208. // Node ID information.
  209. func updateNodeIDInNode(
  210. csiDriverName string,
  211. csiDriverNodeID string) nodeUpdateFunc {
  212. return func(node *v1.Node) (*v1.Node, bool, error) {
  213. existingDriverMap, err := buildNodeIDMapFromAnnotation(node)
  214. if err != nil {
  215. return nil, false, err
  216. }
  217. if val, ok := existingDriverMap[csiDriverName]; ok {
  218. if val == csiDriverNodeID {
  219. // Value already exists in node annotation, nothing more to do
  220. return node, false, nil
  221. }
  222. }
  223. // Add/update annotation value
  224. existingDriverMap[csiDriverName] = csiDriverNodeID
  225. jsonObj, err := json.Marshal(existingDriverMap)
  226. if err != nil {
  227. return nil, false, fmt.Errorf(
  228. "error while marshalling node ID map updated with driverName=%q, nodeID=%q: %v",
  229. csiDriverName,
  230. csiDriverNodeID,
  231. err)
  232. }
  233. if node.ObjectMeta.Annotations == nil {
  234. node.ObjectMeta.Annotations = make(map[string]string)
  235. }
  236. node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
  237. return node, true, nil
  238. }
  239. }
  240. // removeNodeIDFromNode returns a function that removes node ID information matching the given
  241. // driver name from a Node object.
  242. func removeNodeIDFromNode(csiDriverName string) nodeUpdateFunc {
  243. return func(node *v1.Node) (*v1.Node, bool, error) {
  244. var previousAnnotationValue string
  245. if node.ObjectMeta.Annotations != nil {
  246. previousAnnotationValue =
  247. node.ObjectMeta.Annotations[annotationKeyNodeID]
  248. }
  249. if previousAnnotationValue == "" {
  250. return node, false, nil
  251. }
  252. // Parse previousAnnotationValue as JSON
  253. existingDriverMap := map[string]string{}
  254. if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
  255. return nil, false, fmt.Errorf(
  256. "failed to parse node's %q annotation value (%q) err=%v",
  257. annotationKeyNodeID,
  258. previousAnnotationValue,
  259. err)
  260. }
  261. if _, ok := existingDriverMap[csiDriverName]; !ok {
  262. // Value is already missing in node annotation, nothing more to do
  263. return node, false, nil
  264. }
  265. // Delete annotation value
  266. delete(existingDriverMap, csiDriverName)
  267. if len(existingDriverMap) == 0 {
  268. delete(node.ObjectMeta.Annotations, annotationKeyNodeID)
  269. } else {
  270. jsonObj, err := json.Marshal(existingDriverMap)
  271. if err != nil {
  272. return nil, false, fmt.Errorf(
  273. "failed while trying to remove key %q from node %q annotation. Existing data: %v",
  274. csiDriverName,
  275. annotationKeyNodeID,
  276. previousAnnotationValue)
  277. }
  278. node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
  279. }
  280. return node, true, nil
  281. }
  282. }
  283. // updateTopologyLabels returns a function that updates labels of a Node object with the given
  284. // topology information.
  285. func updateTopologyLabels(topology map[string]string) nodeUpdateFunc {
  286. return func(node *v1.Node) (*v1.Node, bool, error) {
  287. if topology == nil || len(topology) == 0 {
  288. return node, false, nil
  289. }
  290. for k, v := range topology {
  291. if curVal, exists := node.Labels[k]; exists && curVal != v {
  292. return nil, false, fmt.Errorf("detected topology value collision: driver reported %q:%q but existing label is %q:%q", k, v, k, curVal)
  293. }
  294. }
  295. if node.Labels == nil {
  296. node.Labels = make(map[string]string)
  297. }
  298. for k, v := range topology {
  299. node.Labels[k] = v
  300. }
  301. return node, true, nil
  302. }
  303. }
  304. func (nim *nodeInfoManager) updateCSINode(
  305. driverName string,
  306. driverNodeID string,
  307. topology map[string]string) error {
  308. csiKubeClient := nim.volumeHost.GetKubeClient()
  309. if csiKubeClient == nil {
  310. return fmt.Errorf("error getting CSI client")
  311. }
  312. var updateErrs []error
  313. err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
  314. if err := nim.tryUpdateCSINode(csiKubeClient, driverName, driverNodeID, topology); err != nil {
  315. updateErrs = append(updateErrs, err)
  316. return false, nil
  317. }
  318. return true, nil
  319. })
  320. if err != nil {
  321. return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
  322. }
  323. return nil
  324. }
  325. func (nim *nodeInfoManager) tryUpdateCSINode(
  326. csiKubeClient clientset.Interface,
  327. driverName string,
  328. driverNodeID string,
  329. topology map[string]string) error {
  330. nodeInfo, err := csiKubeClient.StorageV1beta1().CSINodes().Get(string(nim.nodeName), metav1.GetOptions{})
  331. if nodeInfo == nil || errors.IsNotFound(err) {
  332. nodeInfo, err = nim.CreateCSINode()
  333. }
  334. if err != nil {
  335. return err
  336. }
  337. return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology)
  338. }
  339. func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
  340. csiKubeClient := nim.volumeHost.GetKubeClient()
  341. if csiKubeClient == nil {
  342. return goerrors.New("error getting CSI client")
  343. }
  344. var updateErrs []error
  345. err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
  346. if err := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil {
  347. updateErrs = append(updateErrs, err)
  348. return false, nil
  349. }
  350. return true, nil
  351. })
  352. if err != nil {
  353. return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
  354. }
  355. return nil
  356. }
  357. func (nim *nodeInfoManager) tryInitializeCSINodeWithAnnotation(csiKubeClient clientset.Interface) error {
  358. nodeInfo, err := csiKubeClient.StorageV1beta1().CSINodes().Get(string(nim.nodeName), metav1.GetOptions{})
  359. if nodeInfo == nil || errors.IsNotFound(err) {
  360. // CreateCSINode will set the annotation
  361. _, err = nim.CreateCSINode()
  362. return err
  363. }
  364. annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
  365. if annotationModified {
  366. _, err := csiKubeClient.StorageV1beta1().CSINodes().Update(nodeInfo)
  367. return err
  368. }
  369. return nil
  370. }
  371. func (nim *nodeInfoManager) CreateCSINode() (*storagev1beta1.CSINode, error) {
  372. kubeClient := nim.volumeHost.GetKubeClient()
  373. if kubeClient == nil {
  374. return nil, fmt.Errorf("error getting kube client")
  375. }
  376. csiKubeClient := nim.volumeHost.GetKubeClient()
  377. if csiKubeClient == nil {
  378. return nil, fmt.Errorf("error getting CSI client")
  379. }
  380. node, err := kubeClient.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{})
  381. if err != nil {
  382. return nil, err
  383. }
  384. nodeInfo := &storagev1beta1.CSINode{
  385. ObjectMeta: metav1.ObjectMeta{
  386. Name: string(nim.nodeName),
  387. OwnerReferences: []metav1.OwnerReference{
  388. {
  389. APIVersion: nodeKind.Version,
  390. Kind: nodeKind.Kind,
  391. Name: node.Name,
  392. UID: node.UID,
  393. },
  394. },
  395. },
  396. Spec: storagev1beta1.CSINodeSpec{
  397. Drivers: []storagev1beta1.CSINodeDriver{},
  398. },
  399. }
  400. setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
  401. return csiKubeClient.StorageV1beta1().CSINodes().Create(nodeInfo)
  402. }
  403. func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storagev1beta1.CSINode) (modified bool) {
  404. if migratedPlugins == nil {
  405. return false
  406. }
  407. nodeInfoAnnotations := nodeInfo.GetAnnotations()
  408. if nodeInfoAnnotations == nil {
  409. nodeInfoAnnotations = map[string]string{}
  410. }
  411. var oldAnnotationSet sets.String
  412. mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey]
  413. tok := strings.Split(mpa, ",")
  414. if len(mpa) == 0 {
  415. oldAnnotationSet = sets.NewString()
  416. } else {
  417. oldAnnotationSet = sets.NewString(tok...)
  418. }
  419. newAnnotationSet := sets.NewString()
  420. for pluginName, migratedFunc := range migratedPlugins {
  421. if migratedFunc() {
  422. newAnnotationSet.Insert(pluginName)
  423. }
  424. }
  425. if oldAnnotationSet.Equal(newAnnotationSet) {
  426. return false
  427. }
  428. nas := strings.Join(newAnnotationSet.List(), ",")
  429. if len(nas) != 0 {
  430. nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas
  431. } else {
  432. delete(nodeInfoAnnotations, v1.MigratedPluginsAnnotationKey)
  433. }
  434. nodeInfo.Annotations = nodeInfoAnnotations
  435. return true
  436. }
  437. func (nim *nodeInfoManager) installDriverToCSINode(
  438. nodeInfo *storagev1beta1.CSINode,
  439. driverName string,
  440. driverNodeID string,
  441. topology map[string]string) error {
  442. csiKubeClient := nim.volumeHost.GetKubeClient()
  443. if csiKubeClient == nil {
  444. return fmt.Errorf("error getting CSI client")
  445. }
  446. topologyKeys := make(sets.String)
  447. for k := range topology {
  448. topologyKeys.Insert(k)
  449. }
  450. specModified := true
  451. // Clone driver list, omitting the driver that matches the given driverName
  452. newDriverSpecs := []storagev1beta1.CSINodeDriver{}
  453. for _, driverInfoSpec := range nodeInfo.Spec.Drivers {
  454. if driverInfoSpec.Name == driverName {
  455. if driverInfoSpec.NodeID == driverNodeID &&
  456. sets.NewString(driverInfoSpec.TopologyKeys...).Equal(topologyKeys) {
  457. specModified = false
  458. }
  459. } else {
  460. // Omit driverInfoSpec matching given driverName
  461. newDriverSpecs = append(newDriverSpecs, driverInfoSpec)
  462. }
  463. }
  464. annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
  465. if !specModified && !annotationModified {
  466. return nil
  467. }
  468. // Append new driver
  469. driverSpec := storagev1beta1.CSINodeDriver{
  470. Name: driverName,
  471. NodeID: driverNodeID,
  472. TopologyKeys: topologyKeys.List(),
  473. }
  474. newDriverSpecs = append(newDriverSpecs, driverSpec)
  475. nodeInfo.Spec.Drivers = newDriverSpecs
  476. _, err := csiKubeClient.StorageV1beta1().CSINodes().Update(nodeInfo)
  477. return err
  478. }
  479. func (nim *nodeInfoManager) uninstallDriverFromCSINode(
  480. csiDriverName string) error {
  481. csiKubeClient := nim.volumeHost.GetKubeClient()
  482. if csiKubeClient == nil {
  483. return fmt.Errorf("error getting CSI client")
  484. }
  485. var updateErrs []error
  486. err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
  487. if err := nim.tryUninstallDriverFromCSINode(csiKubeClient, csiDriverName); err != nil {
  488. updateErrs = append(updateErrs, err)
  489. return false, nil
  490. }
  491. return true, nil
  492. })
  493. if err != nil {
  494. return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
  495. }
  496. return nil
  497. }
  498. func (nim *nodeInfoManager) tryUninstallDriverFromCSINode(
  499. csiKubeClient clientset.Interface,
  500. csiDriverName string) error {
  501. nodeInfoClient := csiKubeClient.StorageV1beta1().CSINodes()
  502. nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{})
  503. if err != nil && errors.IsNotFound(err) {
  504. return nil
  505. } else if err != nil {
  506. return err
  507. }
  508. hasModified := false
  509. // Uninstall CSINodeDriver with name csiDriverName
  510. drivers := nodeInfo.Spec.Drivers[:0]
  511. for _, driver := range nodeInfo.Spec.Drivers {
  512. if driver.Name != csiDriverName {
  513. drivers = append(drivers, driver)
  514. } else {
  515. // Found a driver with name csiDriverName
  516. // Set hasModified to true because it will be removed
  517. hasModified = true
  518. }
  519. }
  520. if !hasModified {
  521. // No changes, don't update
  522. return nil
  523. }
  524. nodeInfo.Spec.Drivers = drivers
  525. _, err = nodeInfoClient.Update(nodeInfo)
  526. return err // do not wrap error
  527. }
  528. func updateMaxAttachLimit(driverName string, maxLimit int64) nodeUpdateFunc {
  529. return func(node *v1.Node) (*v1.Node, bool, error) {
  530. if maxLimit <= 0 {
  531. klog.V(4).Infof("skipping adding attach limit for %s", driverName)
  532. return node, false, nil
  533. }
  534. if node.Status.Capacity == nil {
  535. node.Status.Capacity = v1.ResourceList{}
  536. }
  537. if node.Status.Allocatable == nil {
  538. node.Status.Allocatable = v1.ResourceList{}
  539. }
  540. limitKeyName := util.GetCSIAttachLimitKey(driverName)
  541. node.Status.Capacity[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
  542. node.Status.Allocatable[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
  543. return node, true, nil
  544. }
  545. }
  546. func removeMaxAttachLimit(driverName string) nodeUpdateFunc {
  547. return func(node *v1.Node) (*v1.Node, bool, error) {
  548. limitKey := v1.ResourceName(util.GetCSIAttachLimitKey(driverName))
  549. capacityExists := false
  550. if node.Status.Capacity != nil {
  551. _, capacityExists = node.Status.Capacity[limitKey]
  552. }
  553. allocatableExists := false
  554. if node.Status.Allocatable != nil {
  555. _, allocatableExists = node.Status.Allocatable[limitKey]
  556. }
  557. if !capacityExists && !allocatableExists {
  558. return node, false, nil
  559. }
  560. delete(node.Status.Capacity, limitKey)
  561. if len(node.Status.Capacity) == 0 {
  562. node.Status.Capacity = nil
  563. }
  564. delete(node.Status.Allocatable, limitKey)
  565. if len(node.Status.Allocatable) == 0 {
  566. node.Status.Allocatable = nil
  567. }
  568. return node, true, nil
  569. }
  570. }