csi_client.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package csi
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "net"
  20. "sync"
  21. "time"
  22. csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
  23. "google.golang.org/grpc"
  24. "google.golang.org/grpc/codes"
  25. "google.golang.org/grpc/status"
  26. api "k8s.io/api/core/v1"
  27. "k8s.io/apimachinery/pkg/api/resource"
  28. "k8s.io/apimachinery/pkg/util/wait"
  29. "k8s.io/klog"
  30. "k8s.io/kubernetes/pkg/volume"
  31. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  32. )
  33. type csiClient interface {
  34. NodeGetInfo(ctx context.Context) (
  35. nodeID string,
  36. maxVolumePerNode int64,
  37. accessibleTopology map[string]string,
  38. err error)
  39. NodePublishVolume(
  40. ctx context.Context,
  41. volumeid string,
  42. readOnly bool,
  43. stagingTargetPath string,
  44. targetPath string,
  45. accessMode api.PersistentVolumeAccessMode,
  46. publishContext map[string]string,
  47. volumeContext map[string]string,
  48. secrets map[string]string,
  49. fsType string,
  50. mountOptions []string,
  51. ) error
  52. NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error)
  53. NodeUnpublishVolume(
  54. ctx context.Context,
  55. volID string,
  56. targetPath string,
  57. ) error
  58. NodeStageVolume(ctx context.Context,
  59. volID string,
  60. publishVolumeInfo map[string]string,
  61. stagingTargetPath string,
  62. fsType string,
  63. accessMode api.PersistentVolumeAccessMode,
  64. secrets map[string]string,
  65. volumeContext map[string]string,
  66. mountOptions []string,
  67. ) error
  68. NodeGetVolumeStats(
  69. ctx context.Context,
  70. volID string,
  71. targetPath string,
  72. ) (*volume.Metrics, error)
  73. NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error
  74. NodeSupportsStageUnstage(ctx context.Context) (bool, error)
  75. NodeSupportsNodeExpand(ctx context.Context) (bool, error)
  76. NodeSupportsVolumeStats(ctx context.Context) (bool, error)
  77. }
  78. // Strongly typed address
  79. type csiAddr string
  80. // Strongly typed driver name
  81. type csiDriverName string
  82. // csiClient encapsulates all csi-plugin methods
  83. type csiDriverClient struct {
  84. driverName csiDriverName
  85. addr csiAddr
  86. nodeV1ClientCreator nodeV1ClientCreator
  87. }
  88. var _ csiClient = &csiDriverClient{}
  89. type nodeV1ClientCreator func(addr csiAddr) (
  90. nodeClient csipbv1.NodeClient,
  91. closer io.Closer,
  92. err error,
  93. )
  94. const (
  95. initialDuration = 1 * time.Second
  96. factor = 2.0
  97. steps = 5
  98. )
  99. // newV1NodeClient creates a new NodeClient with the internally used gRPC
  100. // connection set up. It also returns a closer which must to be called to close
  101. // the gRPC connection when the NodeClient is not used anymore.
  102. // This is the default implementation for the nodeV1ClientCreator, used in
  103. // newCsiDriverClient.
  104. func newV1NodeClient(addr csiAddr) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
  105. var conn *grpc.ClientConn
  106. conn, err = newGrpcConn(addr)
  107. if err != nil {
  108. return nil, nil, err
  109. }
  110. nodeClient = csipbv1.NewNodeClient(conn)
  111. return nodeClient, conn, nil
  112. }
  113. func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
  114. if driverName == "" {
  115. return nil, fmt.Errorf("driver name is empty")
  116. }
  117. existingDriver, driverExists := csiDrivers.Get(string(driverName))
  118. if !driverExists {
  119. return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
  120. }
  121. nodeV1ClientCreator := newV1NodeClient
  122. return &csiDriverClient{
  123. driverName: driverName,
  124. addr: csiAddr(existingDriver.endpoint),
  125. nodeV1ClientCreator: nodeV1ClientCreator,
  126. }, nil
  127. }
  128. func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
  129. nodeID string,
  130. maxVolumePerNode int64,
  131. accessibleTopology map[string]string,
  132. err error) {
  133. klog.V(4).Info(log("calling NodeGetInfo rpc"))
  134. // TODO retries should happen at a lower layer (issue #73371)
  135. backoff := wait.Backoff{Duration: initialDuration, Factor: factor, Steps: steps}
  136. err = wait.ExponentialBackoff(backoff, func() (bool, error) {
  137. var getNodeInfoError error
  138. nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV1(ctx)
  139. if nodeID != "" {
  140. return true, nil
  141. }
  142. if getNodeInfoError != nil {
  143. klog.Warningf("Error calling CSI NodeGetInfo(): %v", getNodeInfoError.Error())
  144. }
  145. // Continue with exponential backoff
  146. return false, nil
  147. })
  148. return nodeID, maxVolumePerNode, accessibleTopology, err
  149. }
  150. func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) (
  151. nodeID string,
  152. maxVolumePerNode int64,
  153. accessibleTopology map[string]string,
  154. err error) {
  155. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  156. if err != nil {
  157. return "", 0, nil, err
  158. }
  159. defer closer.Close()
  160. res, err := nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
  161. if err != nil {
  162. return "", 0, nil, err
  163. }
  164. topology := res.GetAccessibleTopology()
  165. if topology != nil {
  166. accessibleTopology = topology.Segments
  167. }
  168. return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil
  169. }
  170. func (c *csiDriverClient) NodePublishVolume(
  171. ctx context.Context,
  172. volID string,
  173. readOnly bool,
  174. stagingTargetPath string,
  175. targetPath string,
  176. accessMode api.PersistentVolumeAccessMode,
  177. publishContext map[string]string,
  178. volumeContext map[string]string,
  179. secrets map[string]string,
  180. fsType string,
  181. mountOptions []string,
  182. ) error {
  183. klog.V(4).Info(log("calling NodePublishVolume rpc [volid=%s,target_path=%s]", volID, targetPath))
  184. if volID == "" {
  185. return errors.New("missing volume id")
  186. }
  187. if targetPath == "" {
  188. return errors.New("missing target path")
  189. }
  190. if c.nodeV1ClientCreator == nil {
  191. return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil")
  192. }
  193. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  194. if err != nil {
  195. return err
  196. }
  197. defer closer.Close()
  198. req := &csipbv1.NodePublishVolumeRequest{
  199. VolumeId: volID,
  200. TargetPath: targetPath,
  201. Readonly: readOnly,
  202. PublishContext: publishContext,
  203. VolumeContext: volumeContext,
  204. Secrets: secrets,
  205. VolumeCapability: &csipbv1.VolumeCapability{
  206. AccessMode: &csipbv1.VolumeCapability_AccessMode{
  207. Mode: asCSIAccessModeV1(accessMode),
  208. },
  209. },
  210. }
  211. if stagingTargetPath != "" {
  212. req.StagingTargetPath = stagingTargetPath
  213. }
  214. if fsType == fsTypeBlockName {
  215. req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
  216. Block: &csipbv1.VolumeCapability_BlockVolume{},
  217. }
  218. } else {
  219. req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
  220. Mount: &csipbv1.VolumeCapability_MountVolume{
  221. FsType: fsType,
  222. MountFlags: mountOptions,
  223. },
  224. }
  225. }
  226. _, err = nodeClient.NodePublishVolume(ctx, req)
  227. if err != nil && !isFinalError(err) {
  228. return volumetypes.NewUncertainProgressError(err.Error())
  229. }
  230. return err
  231. }
  232. func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
  233. if c.nodeV1ClientCreator == nil {
  234. return newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
  235. }
  236. if volumeID == "" {
  237. return newSize, errors.New("missing volume id")
  238. }
  239. if volumePath == "" {
  240. return newSize, errors.New("missing volume path")
  241. }
  242. if newSize.Value() < 0 {
  243. return newSize, errors.New("size can not be less than 0")
  244. }
  245. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  246. if err != nil {
  247. return newSize, err
  248. }
  249. defer closer.Close()
  250. req := &csipbv1.NodeExpandVolumeRequest{
  251. VolumeId: volumeID,
  252. VolumePath: volumePath,
  253. CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
  254. }
  255. resp, err := nodeClient.NodeExpandVolume(ctx, req)
  256. if err != nil {
  257. return newSize, err
  258. }
  259. updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
  260. return *updatedQuantity, nil
  261. }
  262. func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error {
  263. klog.V(4).Info(log("calling NodeUnpublishVolume rpc: [volid=%s, target_path=%s", volID, targetPath))
  264. if volID == "" {
  265. return errors.New("missing volume id")
  266. }
  267. if targetPath == "" {
  268. return errors.New("missing target path")
  269. }
  270. if c.nodeV1ClientCreator == nil {
  271. return errors.New("nodeV1ClientCreate is nil")
  272. }
  273. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  274. if err != nil {
  275. return err
  276. }
  277. defer closer.Close()
  278. req := &csipbv1.NodeUnpublishVolumeRequest{
  279. VolumeId: volID,
  280. TargetPath: targetPath,
  281. }
  282. _, err = nodeClient.NodeUnpublishVolume(ctx, req)
  283. return err
  284. }
  285. func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
  286. volID string,
  287. publishContext map[string]string,
  288. stagingTargetPath string,
  289. fsType string,
  290. accessMode api.PersistentVolumeAccessMode,
  291. secrets map[string]string,
  292. volumeContext map[string]string,
  293. mountOptions []string,
  294. ) error {
  295. klog.V(4).Info(log("calling NodeStageVolume rpc [volid=%s,staging_target_path=%s]", volID, stagingTargetPath))
  296. if volID == "" {
  297. return errors.New("missing volume id")
  298. }
  299. if stagingTargetPath == "" {
  300. return errors.New("missing staging target path")
  301. }
  302. if c.nodeV1ClientCreator == nil {
  303. return errors.New("nodeV1ClientCreate is nil")
  304. }
  305. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  306. if err != nil {
  307. return err
  308. }
  309. defer closer.Close()
  310. req := &csipbv1.NodeStageVolumeRequest{
  311. VolumeId: volID,
  312. PublishContext: publishContext,
  313. StagingTargetPath: stagingTargetPath,
  314. VolumeCapability: &csipbv1.VolumeCapability{
  315. AccessMode: &csipbv1.VolumeCapability_AccessMode{
  316. Mode: asCSIAccessModeV1(accessMode),
  317. },
  318. },
  319. Secrets: secrets,
  320. VolumeContext: volumeContext,
  321. }
  322. if fsType == fsTypeBlockName {
  323. req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
  324. Block: &csipbv1.VolumeCapability_BlockVolume{},
  325. }
  326. } else {
  327. req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
  328. Mount: &csipbv1.VolumeCapability_MountVolume{
  329. FsType: fsType,
  330. MountFlags: mountOptions,
  331. },
  332. }
  333. }
  334. _, err = nodeClient.NodeStageVolume(ctx, req)
  335. if err != nil && !isFinalError(err) {
  336. return volumetypes.NewUncertainProgressError(err.Error())
  337. }
  338. return err
  339. }
  340. func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error {
  341. klog.V(4).Info(log("calling NodeUnstageVolume rpc [volid=%s,staging_target_path=%s]", volID, stagingTargetPath))
  342. if volID == "" {
  343. return errors.New("missing volume id")
  344. }
  345. if stagingTargetPath == "" {
  346. return errors.New("missing staging target path")
  347. }
  348. if c.nodeV1ClientCreator == nil {
  349. return errors.New("nodeV1ClientCreate is nil")
  350. }
  351. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  352. if err != nil {
  353. return err
  354. }
  355. defer closer.Close()
  356. req := &csipbv1.NodeUnstageVolumeRequest{
  357. VolumeId: volID,
  358. StagingTargetPath: stagingTargetPath,
  359. }
  360. _, err = nodeClient.NodeUnstageVolume(ctx, req)
  361. return err
  362. }
  363. func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
  364. klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if Node has EXPAND_VOLUME capability"))
  365. if c.nodeV1ClientCreator == nil {
  366. return false, errors.New("nodeV1ClientCreate is nil")
  367. }
  368. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  369. if err != nil {
  370. return false, err
  371. }
  372. defer closer.Close()
  373. req := &csipbv1.NodeGetCapabilitiesRequest{}
  374. resp, err := nodeClient.NodeGetCapabilities(ctx, req)
  375. if err != nil {
  376. return false, err
  377. }
  378. capabilities := resp.GetCapabilities()
  379. if capabilities == nil {
  380. return false, nil
  381. }
  382. for _, capability := range capabilities {
  383. if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
  384. return true, nil
  385. }
  386. }
  387. return false, nil
  388. }
  389. func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
  390. klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsStageUnstage"))
  391. if c.nodeV1ClientCreator == nil {
  392. return false, errors.New("nodeV1ClientCreate is nil")
  393. }
  394. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  395. if err != nil {
  396. return false, err
  397. }
  398. defer closer.Close()
  399. req := &csipbv1.NodeGetCapabilitiesRequest{}
  400. resp, err := nodeClient.NodeGetCapabilities(ctx, req)
  401. if err != nil {
  402. return false, err
  403. }
  404. capabilities := resp.GetCapabilities()
  405. stageUnstageSet := false
  406. if capabilities == nil {
  407. return false, nil
  408. }
  409. for _, capability := range capabilities {
  410. if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
  411. stageUnstageSet = true
  412. break
  413. }
  414. }
  415. return stageUnstageSet, nil
  416. }
  417. func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
  418. switch am {
  419. case api.ReadWriteOnce:
  420. return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
  421. case api.ReadOnlyMany:
  422. return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
  423. case api.ReadWriteMany:
  424. return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
  425. }
  426. return csipbv1.VolumeCapability_AccessMode_UNKNOWN
  427. }
  428. func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
  429. network := "unix"
  430. klog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))
  431. return grpc.Dial(
  432. string(addr),
  433. grpc.WithInsecure(),
  434. grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
  435. return (&net.Dialer{}).DialContext(ctx, network, target)
  436. }),
  437. )
  438. }
  439. // CSI client getter with cache.
  440. // This provides a method to initialize CSI client with driver name and caches
  441. // it for later use. When CSI clients have not been discovered yet (e.g.
  442. // on kubelet restart), client initialization will fail. Users of CSI client (e.g.
  443. // mounter manager and block mapper) can use this to delay CSI client
  444. // initialization until needed.
  445. type csiClientGetter struct {
  446. sync.RWMutex
  447. csiClient csiClient
  448. driverName csiDriverName
  449. }
  450. func (c *csiClientGetter) Get() (csiClient, error) {
  451. c.RLock()
  452. if c.csiClient != nil {
  453. c.RUnlock()
  454. return c.csiClient, nil
  455. }
  456. c.RUnlock()
  457. c.Lock()
  458. defer c.Unlock()
  459. // Double-checking locking criterion.
  460. if c.csiClient != nil {
  461. return c.csiClient, nil
  462. }
  463. csi, err := newCsiDriverClient(c.driverName)
  464. if err != nil {
  465. return nil, err
  466. }
  467. c.csiClient = csi
  468. return c.csiClient, nil
  469. }
  470. func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
  471. klog.V(5).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsVolumeStats"))
  472. if c.nodeV1ClientCreator == nil {
  473. return false, errors.New("nodeV1ClientCreate is nil")
  474. }
  475. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  476. if err != nil {
  477. return false, err
  478. }
  479. defer closer.Close()
  480. req := &csipbv1.NodeGetCapabilitiesRequest{}
  481. resp, err := nodeClient.NodeGetCapabilities(ctx, req)
  482. if err != nil {
  483. return false, err
  484. }
  485. capabilities := resp.GetCapabilities()
  486. if capabilities == nil {
  487. return false, nil
  488. }
  489. for _, capability := range capabilities {
  490. if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS {
  491. return true, nil
  492. }
  493. }
  494. return false, nil
  495. }
  496. func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
  497. klog.V(4).Info(log("calling NodeGetVolumeStats rpc: [volid=%s, target_path=%s", volID, targetPath))
  498. if volID == "" {
  499. return nil, errors.New("missing volume id")
  500. }
  501. if targetPath == "" {
  502. return nil, errors.New("missing target path")
  503. }
  504. if c.nodeV1ClientCreator == nil {
  505. return nil, errors.New("nodeV1ClientCreate is nil")
  506. }
  507. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  508. if err != nil {
  509. return nil, err
  510. }
  511. defer closer.Close()
  512. req := &csipbv1.NodeGetVolumeStatsRequest{
  513. VolumeId: volID,
  514. VolumePath: targetPath,
  515. }
  516. resp, err := nodeClient.NodeGetVolumeStats(ctx, req)
  517. if err != nil {
  518. return nil, err
  519. }
  520. usages := resp.GetUsage()
  521. if usages == nil {
  522. return nil, fmt.Errorf("failed to get usage from response. usage is nil")
  523. }
  524. metrics := &volume.Metrics{
  525. Used: resource.NewQuantity(int64(0), resource.BinarySI),
  526. Capacity: resource.NewQuantity(int64(0), resource.BinarySI),
  527. Available: resource.NewQuantity(int64(0), resource.BinarySI),
  528. InodesUsed: resource.NewQuantity(int64(0), resource.BinarySI),
  529. Inodes: resource.NewQuantity(int64(0), resource.BinarySI),
  530. InodesFree: resource.NewQuantity(int64(0), resource.BinarySI),
  531. }
  532. for _, usage := range usages {
  533. if usage == nil {
  534. continue
  535. }
  536. unit := usage.GetUnit()
  537. switch unit {
  538. case csipbv1.VolumeUsage_BYTES:
  539. metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
  540. metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
  541. metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
  542. case csipbv1.VolumeUsage_INODES:
  543. metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
  544. metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
  545. metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
  546. default:
  547. klog.Errorf("unknown key %s in usage", unit.String())
  548. }
  549. }
  550. return metrics, nil
  551. }
  552. func isFinalError(err error) bool {
  553. // Sources:
  554. // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
  555. // https://github.com/container-storage-interface/spec/blob/master/spec.md
  556. st, ok := status.FromError(err)
  557. if !ok {
  558. // This is not gRPC error. The operation must have failed before gRPC
  559. // method was called, otherwise we would get gRPC error.
  560. // We don't know if any previous volume operation is in progress, be on the safe side.
  561. return false
  562. }
  563. switch st.Code() {
  564. case codes.Canceled, // gRPC: Client Application cancelled the request
  565. codes.DeadlineExceeded, // gRPC: Timeout
  566. codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous volume operation may be still in progress.
  567. codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous volume operation may be still in progress.
  568. codes.Aborted: // CSI: Operation pending for volume
  569. return false
  570. }
  571. // All other errors mean that operation either did not
  572. // even start or failed. It is for sure not in progress.
  573. return true
  574. }