nodeinfomanager.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package nodeinfomanager includes internal functions used to add/delete labels to
  14. // kubernetes nodes for corresponding CSI drivers
  15. package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
  16. import (
  17. "context"
  18. "encoding/json"
  19. goerrors "errors"
  20. "fmt"
  21. "math"
  22. "strings"
  23. "time"
  24. v1 "k8s.io/api/core/v1"
  25. storagev1 "k8s.io/api/storage/v1"
  26. "k8s.io/apimachinery/pkg/api/errors"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/types"
  29. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  30. "k8s.io/apimachinery/pkg/util/sets"
  31. "k8s.io/apimachinery/pkg/util/wait"
  32. utilfeature "k8s.io/apiserver/pkg/util/feature"
  33. clientset "k8s.io/client-go/kubernetes"
  34. "k8s.io/klog"
  35. "k8s.io/kubernetes/pkg/features"
  36. nodeutil "k8s.io/kubernetes/pkg/util/node"
  37. "k8s.io/kubernetes/pkg/volume"
  38. "k8s.io/kubernetes/pkg/volume/util"
  39. )
  // annotationKeyNodeID is the key of the Node annotation whose value is a
  // JSON-encoded map from CSI driver name to the node ID reported by that driver.
  const annotationKeyNodeID = "csi.volume.kubernetes.io/nodeid"
  44. var (
  45. nodeKind = v1.SchemeGroupVersion.WithKind("Node")
  46. updateBackoff = wait.Backoff{
  47. Steps: 4,
  48. Duration: 10 * time.Millisecond,
  49. Factor: 5.0,
  50. Jitter: 0.1,
  51. }
  52. )
  53. // nodeInfoManager contains necessary common dependencies to update node info on both
  54. // the Node and CSINode objects.
  55. type nodeInfoManager struct {
  56. nodeName types.NodeName
  57. volumeHost volume.VolumeHost
  58. migratedPlugins map[string](func() bool)
  59. }
  60. // If no updates is needed, the function must return the same Node object as the input.
  61. type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
  62. // Interface implements an interface for managing labels of a node
  63. type Interface interface {
  64. CreateCSINode() (*storagev1.CSINode, error)
  65. // Updates or Creates the CSINode object with annotations for CSI Migration
  66. InitializeCSINodeWithAnnotation() error
  67. // Record in the cluster the given node information from the CSI driver with the given name.
  68. // Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls
  69. // to other methods in this interface.
  70. InstallCSIDriver(driverName string, driverNodeID string, maxVolumeLimit int64, topology map[string]string) error
  71. // Remove in the cluster node information from the CSI driver with the given name.
  72. // Concurrent calls to UninstallCSIDriver() is allowed, but they should not be intertwined with calls
  73. // to other methods in this interface.
  74. UninstallCSIDriver(driverName string) error
  75. }
  76. // NewNodeInfoManager initializes nodeInfoManager
  77. func NewNodeInfoManager(
  78. nodeName types.NodeName,
  79. volumeHost volume.VolumeHost,
  80. migratedPlugins map[string](func() bool)) Interface {
  81. return &nodeInfoManager{
  82. nodeName: nodeName,
  83. volumeHost: volumeHost,
  84. migratedPlugins: migratedPlugins,
  85. }
  86. }
  87. // InstallCSIDriver updates the node ID annotation in the Node object and CSIDrivers field in the
  88. // CSINode object. If the CSINode object doesn't yet exist, it will be created.
  89. // If multiple calls to InstallCSIDriver() are made in parallel, some calls might receive Node or
  90. // CSINode update conflicts, which causes the function to retry the corresponding update.
  91. func (nim *nodeInfoManager) InstallCSIDriver(driverName string, driverNodeID string, maxAttachLimit int64, topology map[string]string) error {
  92. if driverNodeID == "" {
  93. return fmt.Errorf("error adding CSI driver node info: driverNodeID must not be empty")
  94. }
  95. nodeUpdateFuncs := []nodeUpdateFunc{
  96. updateNodeIDInNode(driverName, driverNodeID),
  97. }
  98. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  99. nodeUpdateFuncs = append(nodeUpdateFuncs, updateTopologyLabels(topology))
  100. }
  101. err := nim.updateNode(nodeUpdateFuncs...)
  102. if err != nil {
  103. return fmt.Errorf("error updating Node object with CSI driver node info: %v", err)
  104. }
  105. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  106. err = nim.updateCSINode(driverName, driverNodeID, maxAttachLimit, topology)
  107. if err != nil {
  108. return fmt.Errorf("error updating CSINode object with CSI driver node info: %v", err)
  109. }
  110. }
  111. return nil
  112. }
  113. // UninstallCSIDriver removes the node ID annotation from the Node object and CSIDrivers field from the
  114. // CSINode object. If the CSINOdeInfo object contains no CSIDrivers, it will be deleted.
  115. // If multiple calls to UninstallCSIDriver() are made in parallel, some calls might receive Node or
  116. // CSINode update conflicts, which causes the function to retry the corresponding update.
  117. func (nim *nodeInfoManager) UninstallCSIDriver(driverName string) error {
  118. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  119. err := nim.uninstallDriverFromCSINode(driverName)
  120. if err != nil {
  121. return fmt.Errorf("error uninstalling CSI driver from CSINode object %v", err)
  122. }
  123. }
  124. err := nim.updateNode(
  125. removeMaxAttachLimit(driverName),
  126. removeNodeIDFromNode(driverName),
  127. )
  128. if err != nil {
  129. return fmt.Errorf("error removing CSI driver node info from Node object %v", err)
  130. }
  131. return nil
  132. }
  133. func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error {
  134. var updateErrs []error
  135. err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
  136. if err := nim.tryUpdateNode(updateFuncs...); err != nil {
  137. updateErrs = append(updateErrs, err)
  138. return false, nil
  139. }
  140. return true, nil
  141. })
  142. if err != nil {
  143. return fmt.Errorf("error updating node: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
  144. }
  145. return nil
  146. }
  147. // updateNode repeatedly attempts to update the corresponding node object
  148. // which is modified by applying the given update functions sequentially.
  149. // Because updateFuncs are applied sequentially, later updateFuncs should take into account
  150. // the effects of previous updateFuncs to avoid potential conflicts. For example, if multiple
  151. // functions update the same field, updates in the last function are persisted.
  152. func (nim *nodeInfoManager) tryUpdateNode(updateFuncs ...nodeUpdateFunc) error {
  153. // Retrieve the latest version of Node before attempting update, so that
  154. // existing changes are not overwritten.
  155. kubeClient := nim.volumeHost.GetKubeClient()
  156. if kubeClient == nil {
  157. return fmt.Errorf("error getting kube client")
  158. }
  159. nodeClient := kubeClient.CoreV1().Nodes()
  160. originalNode, err := nodeClient.Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
  161. if err != nil {
  162. return err
  163. }
  164. node := originalNode.DeepCopy()
  165. needUpdate := false
  166. for _, update := range updateFuncs {
  167. newNode, updated, err := update(node)
  168. if err != nil {
  169. return err
  170. }
  171. node = newNode
  172. needUpdate = needUpdate || updated
  173. }
  174. if needUpdate {
  175. // PatchNodeStatus can update both node's status and labels or annotations
  176. // Updating status by directly updating node does not work
  177. _, _, updateErr := nodeutil.PatchNodeStatus(kubeClient.CoreV1(), types.NodeName(node.Name), originalNode, node)
  178. return updateErr
  179. }
  180. return nil
  181. }
  182. // Guarantees the map is non-nil if no error is returned.
  183. func buildNodeIDMapFromAnnotation(node *v1.Node) (map[string]string, error) {
  184. var previousAnnotationValue string
  185. if node.ObjectMeta.Annotations != nil {
  186. previousAnnotationValue =
  187. node.ObjectMeta.Annotations[annotationKeyNodeID]
  188. }
  189. var existingDriverMap map[string]string
  190. if previousAnnotationValue != "" {
  191. // Parse previousAnnotationValue as JSON
  192. if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
  193. return nil, fmt.Errorf(
  194. "failed to parse node's %q annotation value (%q) err=%v",
  195. annotationKeyNodeID,
  196. previousAnnotationValue,
  197. err)
  198. }
  199. }
  200. if existingDriverMap == nil {
  201. return make(map[string]string), nil
  202. }
  203. return existingDriverMap, nil
  204. }
  205. // updateNodeIDInNode returns a function that updates a Node object with the given
  206. // Node ID information.
  207. func updateNodeIDInNode(
  208. csiDriverName string,
  209. csiDriverNodeID string) nodeUpdateFunc {
  210. return func(node *v1.Node) (*v1.Node, bool, error) {
  211. existingDriverMap, err := buildNodeIDMapFromAnnotation(node)
  212. if err != nil {
  213. return nil, false, err
  214. }
  215. if val, ok := existingDriverMap[csiDriverName]; ok {
  216. if val == csiDriverNodeID {
  217. // Value already exists in node annotation, nothing more to do
  218. return node, false, nil
  219. }
  220. }
  221. // Add/update annotation value
  222. existingDriverMap[csiDriverName] = csiDriverNodeID
  223. jsonObj, err := json.Marshal(existingDriverMap)
  224. if err != nil {
  225. return nil, false, fmt.Errorf(
  226. "error while marshalling node ID map updated with driverName=%q, nodeID=%q: %v",
  227. csiDriverName,
  228. csiDriverNodeID,
  229. err)
  230. }
  231. if node.ObjectMeta.Annotations == nil {
  232. node.ObjectMeta.Annotations = make(map[string]string)
  233. }
  234. node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
  235. return node, true, nil
  236. }
  237. }
  238. // removeNodeIDFromNode returns a function that removes node ID information matching the given
  239. // driver name from a Node object.
  240. func removeNodeIDFromNode(csiDriverName string) nodeUpdateFunc {
  241. return func(node *v1.Node) (*v1.Node, bool, error) {
  242. var previousAnnotationValue string
  243. if node.ObjectMeta.Annotations != nil {
  244. previousAnnotationValue =
  245. node.ObjectMeta.Annotations[annotationKeyNodeID]
  246. }
  247. if previousAnnotationValue == "" {
  248. return node, false, nil
  249. }
  250. // Parse previousAnnotationValue as JSON
  251. existingDriverMap := map[string]string{}
  252. if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
  253. return nil, false, fmt.Errorf(
  254. "failed to parse node's %q annotation value (%q) err=%v",
  255. annotationKeyNodeID,
  256. previousAnnotationValue,
  257. err)
  258. }
  259. if _, ok := existingDriverMap[csiDriverName]; !ok {
  260. // Value is already missing in node annotation, nothing more to do
  261. return node, false, nil
  262. }
  263. // Delete annotation value
  264. delete(existingDriverMap, csiDriverName)
  265. if len(existingDriverMap) == 0 {
  266. delete(node.ObjectMeta.Annotations, annotationKeyNodeID)
  267. } else {
  268. jsonObj, err := json.Marshal(existingDriverMap)
  269. if err != nil {
  270. return nil, false, fmt.Errorf(
  271. "failed while trying to remove key %q from node %q annotation. Existing data: %v",
  272. csiDriverName,
  273. annotationKeyNodeID,
  274. previousAnnotationValue)
  275. }
  276. node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
  277. }
  278. return node, true, nil
  279. }
  280. }
  281. // updateTopologyLabels returns a function that updates labels of a Node object with the given
  282. // topology information.
  283. func updateTopologyLabels(topology map[string]string) nodeUpdateFunc {
  284. return func(node *v1.Node) (*v1.Node, bool, error) {
  285. if topology == nil || len(topology) == 0 {
  286. return node, false, nil
  287. }
  288. for k, v := range topology {
  289. if curVal, exists := node.Labels[k]; exists && curVal != v {
  290. return nil, false, fmt.Errorf("detected topology value collision: driver reported %q:%q but existing label is %q:%q", k, v, k, curVal)
  291. }
  292. }
  293. if node.Labels == nil {
  294. node.Labels = make(map[string]string)
  295. }
  296. for k, v := range topology {
  297. node.Labels[k] = v
  298. }
  299. return node, true, nil
  300. }
  301. }
  302. func (nim *nodeInfoManager) updateCSINode(
  303. driverName string,
  304. driverNodeID string,
  305. maxAttachLimit int64,
  306. topology map[string]string) error {
  307. csiKubeClient := nim.volumeHost.GetKubeClient()
  308. if csiKubeClient == nil {
  309. return fmt.Errorf("error getting CSI client")
  310. }
  311. var updateErrs []error
  312. err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
  313. if err := nim.tryUpdateCSINode(csiKubeClient, driverName, driverNodeID, maxAttachLimit, topology); err != nil {
  314. updateErrs = append(updateErrs, err)
  315. return false, nil
  316. }
  317. return true, nil
  318. })
  319. if err != nil {
  320. return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
  321. }
  322. return nil
  323. }
  324. func (nim *nodeInfoManager) tryUpdateCSINode(
  325. csiKubeClient clientset.Interface,
  326. driverName string,
  327. driverNodeID string,
  328. maxAttachLimit int64,
  329. topology map[string]string) error {
  330. nodeInfo, err := csiKubeClient.StorageV1().CSINodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
  331. if nodeInfo == nil || errors.IsNotFound(err) {
  332. nodeInfo, err = nim.CreateCSINode()
  333. }
  334. if err != nil {
  335. return err
  336. }
  337. return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, maxAttachLimit, topology)
  338. }
  339. func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
  340. csiKubeClient := nim.volumeHost.GetKubeClient()
  341. if csiKubeClient == nil {
  342. return goerrors.New("error getting CSI client")
  343. }
  344. var updateErrs []error
  345. err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
  346. if err := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil {
  347. updateErrs = append(updateErrs, err)
  348. return false, nil
  349. }
  350. return true, nil
  351. })
  352. if err != nil {
  353. return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
  354. }
  355. return nil
  356. }
  357. func (nim *nodeInfoManager) tryInitializeCSINodeWithAnnotation(csiKubeClient clientset.Interface) error {
  358. nodeInfo, err := csiKubeClient.StorageV1().CSINodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
  359. if nodeInfo == nil || errors.IsNotFound(err) {
  360. // CreateCSINode will set the annotation
  361. _, err = nim.CreateCSINode()
  362. return err
  363. } else if err != nil {
  364. return err
  365. }
  366. annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
  367. if annotationModified {
  368. _, err := csiKubeClient.StorageV1().CSINodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
  369. return err
  370. }
  371. return nil
  372. }
  373. func (nim *nodeInfoManager) CreateCSINode() (*storagev1.CSINode, error) {
  374. kubeClient := nim.volumeHost.GetKubeClient()
  375. if kubeClient == nil {
  376. return nil, fmt.Errorf("error getting kube client")
  377. }
  378. csiKubeClient := nim.volumeHost.GetKubeClient()
  379. if csiKubeClient == nil {
  380. return nil, fmt.Errorf("error getting CSI client")
  381. }
  382. node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
  383. if err != nil {
  384. return nil, err
  385. }
  386. nodeInfo := &storagev1.CSINode{
  387. ObjectMeta: metav1.ObjectMeta{
  388. Name: string(nim.nodeName),
  389. OwnerReferences: []metav1.OwnerReference{
  390. {
  391. APIVersion: nodeKind.Version,
  392. Kind: nodeKind.Kind,
  393. Name: node.Name,
  394. UID: node.UID,
  395. },
  396. },
  397. },
  398. Spec: storagev1.CSINodeSpec{
  399. Drivers: []storagev1.CSINodeDriver{},
  400. },
  401. }
  402. setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
  403. return csiKubeClient.StorageV1().CSINodes().Create(context.TODO(), nodeInfo, metav1.CreateOptions{})
  404. }
  405. func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storagev1.CSINode) (modified bool) {
  406. if migratedPlugins == nil {
  407. return false
  408. }
  409. nodeInfoAnnotations := nodeInfo.GetAnnotations()
  410. if nodeInfoAnnotations == nil {
  411. nodeInfoAnnotations = map[string]string{}
  412. }
  413. var oldAnnotationSet sets.String
  414. mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey]
  415. tok := strings.Split(mpa, ",")
  416. if len(mpa) == 0 {
  417. oldAnnotationSet = sets.NewString()
  418. } else {
  419. oldAnnotationSet = sets.NewString(tok...)
  420. }
  421. newAnnotationSet := sets.NewString()
  422. for pluginName, migratedFunc := range migratedPlugins {
  423. if migratedFunc() {
  424. newAnnotationSet.Insert(pluginName)
  425. }
  426. }
  427. if oldAnnotationSet.Equal(newAnnotationSet) {
  428. return false
  429. }
  430. nas := strings.Join(newAnnotationSet.List(), ",")
  431. if len(nas) != 0 {
  432. nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas
  433. } else {
  434. delete(nodeInfoAnnotations, v1.MigratedPluginsAnnotationKey)
  435. }
  436. nodeInfo.Annotations = nodeInfoAnnotations
  437. return true
  438. }
  439. func (nim *nodeInfoManager) installDriverToCSINode(
  440. nodeInfo *storagev1.CSINode,
  441. driverName string,
  442. driverNodeID string,
  443. maxAttachLimit int64,
  444. topology map[string]string) error {
  445. csiKubeClient := nim.volumeHost.GetKubeClient()
  446. if csiKubeClient == nil {
  447. return fmt.Errorf("error getting CSI client")
  448. }
  449. topologyKeys := make(sets.String)
  450. for k := range topology {
  451. topologyKeys.Insert(k)
  452. }
  453. specModified := true
  454. // Clone driver list, omitting the driver that matches the given driverName
  455. newDriverSpecs := []storagev1.CSINodeDriver{}
  456. for _, driverInfoSpec := range nodeInfo.Spec.Drivers {
  457. if driverInfoSpec.Name == driverName {
  458. if driverInfoSpec.NodeID == driverNodeID &&
  459. sets.NewString(driverInfoSpec.TopologyKeys...).Equal(topologyKeys) {
  460. specModified = false
  461. }
  462. } else {
  463. // Omit driverInfoSpec matching given driverName
  464. newDriverSpecs = append(newDriverSpecs, driverInfoSpec)
  465. }
  466. }
  467. annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
  468. if !specModified && !annotationModified {
  469. return nil
  470. }
  471. // Append new driver
  472. driverSpec := storagev1.CSINodeDriver{
  473. Name: driverName,
  474. NodeID: driverNodeID,
  475. TopologyKeys: topologyKeys.List(),
  476. }
  477. if maxAttachLimit > 0 {
  478. if maxAttachLimit > math.MaxInt32 {
  479. klog.Warningf("Exceeded max supported attach limit value, truncating it to %d", math.MaxInt32)
  480. maxAttachLimit = math.MaxInt32
  481. }
  482. m := int32(maxAttachLimit)
  483. driverSpec.Allocatable = &storagev1.VolumeNodeResources{Count: &m}
  484. } else {
  485. klog.Errorf("Invalid attach limit value %d cannot be added to CSINode object for %q", maxAttachLimit, driverName)
  486. }
  487. newDriverSpecs = append(newDriverSpecs, driverSpec)
  488. nodeInfo.Spec.Drivers = newDriverSpecs
  489. _, err := csiKubeClient.StorageV1().CSINodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
  490. return err
  491. }
  492. func (nim *nodeInfoManager) uninstallDriverFromCSINode(
  493. csiDriverName string) error {
  494. csiKubeClient := nim.volumeHost.GetKubeClient()
  495. if csiKubeClient == nil {
  496. return fmt.Errorf("error getting CSI client")
  497. }
  498. var updateErrs []error
  499. err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
  500. if err := nim.tryUninstallDriverFromCSINode(csiKubeClient, csiDriverName); err != nil {
  501. updateErrs = append(updateErrs, err)
  502. return false, nil
  503. }
  504. return true, nil
  505. })
  506. if err != nil {
  507. return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
  508. }
  509. return nil
  510. }
  511. func (nim *nodeInfoManager) tryUninstallDriverFromCSINode(
  512. csiKubeClient clientset.Interface,
  513. csiDriverName string) error {
  514. nodeInfoClient := csiKubeClient.StorageV1().CSINodes()
  515. nodeInfo, err := nodeInfoClient.Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
  516. if err != nil && errors.IsNotFound(err) {
  517. return nil
  518. } else if err != nil {
  519. return err
  520. }
  521. hasModified := false
  522. // Uninstall CSINodeDriver with name csiDriverName
  523. drivers := nodeInfo.Spec.Drivers[:0]
  524. for _, driver := range nodeInfo.Spec.Drivers {
  525. if driver.Name != csiDriverName {
  526. drivers = append(drivers, driver)
  527. } else {
  528. // Found a driver with name csiDriverName
  529. // Set hasModified to true because it will be removed
  530. hasModified = true
  531. }
  532. }
  533. if !hasModified {
  534. // No changes, don't update
  535. return nil
  536. }
  537. nodeInfo.Spec.Drivers = drivers
  538. _, err = nodeInfoClient.Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
  539. return err // do not wrap error
  540. }
  541. func removeMaxAttachLimit(driverName string) nodeUpdateFunc {
  542. return func(node *v1.Node) (*v1.Node, bool, error) {
  543. limitKey := v1.ResourceName(util.GetCSIAttachLimitKey(driverName))
  544. capacityExists := false
  545. if node.Status.Capacity != nil {
  546. _, capacityExists = node.Status.Capacity[limitKey]
  547. }
  548. allocatableExists := false
  549. if node.Status.Allocatable != nil {
  550. _, allocatableExists = node.Status.Allocatable[limitKey]
  551. }
  552. if !capacityExists && !allocatableExists {
  553. return node, false, nil
  554. }
  555. delete(node.Status.Capacity, limitKey)
  556. if len(node.Status.Capacity) == 0 {
  557. node.Status.Capacity = nil
  558. }
  559. delete(node.Status.Allocatable, limitKey)
  560. if len(node.Status.Allocatable) == 0 {
  561. node.Status.Allocatable = nil
  562. }
  563. return node, true, nil
  564. }
  565. }