csi_client.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package csi
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "net"
  20. "strings"
  21. "sync"
  22. "time"
  23. csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
  24. "google.golang.org/grpc"
  25. api "k8s.io/api/core/v1"
  26. "k8s.io/apimachinery/pkg/api/resource"
  27. utilversion "k8s.io/apimachinery/pkg/util/version"
  28. "k8s.io/apimachinery/pkg/util/wait"
  29. utilfeature "k8s.io/apiserver/pkg/util/feature"
  30. "k8s.io/klog"
  31. "k8s.io/kubernetes/pkg/features"
  32. "k8s.io/kubernetes/pkg/volume"
  33. csipbv0 "k8s.io/kubernetes/pkg/volume/csi/csiv0"
  34. )
  35. type csiClient interface {
  36. NodeGetInfo(ctx context.Context) (
  37. nodeID string,
  38. maxVolumePerNode int64,
  39. accessibleTopology map[string]string,
  40. err error)
  41. NodePublishVolume(
  42. ctx context.Context,
  43. volumeid string,
  44. readOnly bool,
  45. stagingTargetPath string,
  46. targetPath string,
  47. accessMode api.PersistentVolumeAccessMode,
  48. publishContext map[string]string,
  49. volumeContext map[string]string,
  50. secrets map[string]string,
  51. fsType string,
  52. mountOptions []string,
  53. ) error
  54. NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error)
  55. NodeUnpublishVolume(
  56. ctx context.Context,
  57. volID string,
  58. targetPath string,
  59. ) error
  60. NodeStageVolume(ctx context.Context,
  61. volID string,
  62. publishVolumeInfo map[string]string,
  63. stagingTargetPath string,
  64. fsType string,
  65. accessMode api.PersistentVolumeAccessMode,
  66. secrets map[string]string,
  67. volumeContext map[string]string,
  68. mountOptions []string,
  69. ) error
  70. NodeGetVolumeStats(
  71. ctx context.Context,
  72. volID string,
  73. targetPath string,
  74. ) (*volume.Metrics, error)
  75. NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error
  76. NodeSupportsStageUnstage(ctx context.Context) (bool, error)
  77. NodeSupportsNodeExpand(ctx context.Context) (bool, error)
  78. NodeSupportsVolumeStats(ctx context.Context) (bool, error)
  79. }
  // Distinct string types so that an endpoint address and a driver name cannot
  // be passed in place of one another.
  type (
  	// csiAddr is the address of a CSI driver endpoint.
  	csiAddr string

  	// csiDriverName is the name of a CSI driver.
  	csiDriverName string
  )
  84. // csiClient encapsulates all csi-plugin methods
  85. type csiDriverClient struct {
  86. driverName csiDriverName
  87. addr csiAddr
  88. nodeV1ClientCreator nodeV1ClientCreator
  89. nodeV0ClientCreator nodeV0ClientCreator
  90. }
  91. var _ csiClient = &csiDriverClient{}
  92. type nodeV1ClientCreator func(addr csiAddr) (
  93. nodeClient csipbv1.NodeClient,
  94. closer io.Closer,
  95. err error,
  96. )
  97. type nodeV0ClientCreator func(addr csiAddr) (
  98. nodeClient csipbv0.NodeClient,
  99. closer io.Closer,
  100. err error,
  101. )
  // Parameters of the wait.Backoff used by NodeGetInfo when retrying.
  const (
  	// initialDuration is the Duration of the backoff.
  	initialDuration = time.Second
  	// factor is the Factor of the backoff.
  	factor = 2.0
  	// steps is the Steps value of the backoff.
  	steps = 5
  )
  107. // newV1NodeClient creates a new NodeClient with the internally used gRPC
  108. // connection set up. It also returns a closer which must to be called to close
  109. // the gRPC connection when the NodeClient is not used anymore.
  110. // This is the default implementation for the nodeV1ClientCreator, used in
  111. // newCsiDriverClient.
  112. func newV1NodeClient(addr csiAddr) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
  113. var conn *grpc.ClientConn
  114. conn, err = newGrpcConn(addr)
  115. if err != nil {
  116. return nil, nil, err
  117. }
  118. nodeClient = csipbv1.NewNodeClient(conn)
  119. return nodeClient, conn, nil
  120. }
  121. // newV0NodeClient creates a new NodeClient with the internally used gRPC
  122. // connection set up. It also returns a closer which must to be called to close
  123. // the gRPC connection when the NodeClient is not used anymore.
  124. // This is the default implementation for the nodeV1ClientCreator, used in
  125. // newCsiDriverClient.
  126. func newV0NodeClient(addr csiAddr) (nodeClient csipbv0.NodeClient, closer io.Closer, err error) {
  127. var conn *grpc.ClientConn
  128. conn, err = newGrpcConn(addr)
  129. if err != nil {
  130. return nil, nil, err
  131. }
  132. nodeClient = csipbv0.NewNodeClient(conn)
  133. return nodeClient, conn, nil
  134. }
  135. func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
  136. if driverName == "" {
  137. return nil, fmt.Errorf("driver name is empty")
  138. }
  139. addr := fmt.Sprintf(csiAddrTemplate, driverName)
  140. requiresV0Client := true
  141. if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher) {
  142. existingDriver, driverExists := csiDrivers.Get(string(driverName))
  143. if !driverExists {
  144. return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
  145. }
  146. addr = existingDriver.endpoint
  147. requiresV0Client = versionRequiresV0Client(existingDriver.highestSupportedVersion)
  148. }
  149. nodeV1ClientCreator := newV1NodeClient
  150. nodeV0ClientCreator := newV0NodeClient
  151. if requiresV0Client {
  152. nodeV1ClientCreator = nil
  153. } else {
  154. nodeV0ClientCreator = nil
  155. }
  156. return &csiDriverClient{
  157. driverName: driverName,
  158. addr: csiAddr(addr),
  159. nodeV1ClientCreator: nodeV1ClientCreator,
  160. nodeV0ClientCreator: nodeV0ClientCreator,
  161. }, nil
  162. }
  163. func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
  164. nodeID string,
  165. maxVolumePerNode int64,
  166. accessibleTopology map[string]string,
  167. err error) {
  168. klog.V(4).Info(log("calling NodeGetInfo rpc"))
  169. // TODO retries should happen at a lower layer (issue #73371)
  170. backoff := wait.Backoff{Duration: initialDuration, Factor: factor, Steps: steps}
  171. err = wait.ExponentialBackoff(backoff, func() (bool, error) {
  172. var getNodeInfoError error
  173. if c.nodeV1ClientCreator != nil {
  174. nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV1(ctx)
  175. } else if c.nodeV0ClientCreator != nil {
  176. nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV0(ctx)
  177. }
  178. if nodeID != "" {
  179. return true, nil
  180. }
  181. // kubelet plugin registration service not implemented is a terminal error, no need to retry
  182. if strings.Contains(getNodeInfoError.Error(), "no handler registered for plugin type") {
  183. return false, getNodeInfoError
  184. }
  185. // Continue with exponential backoff
  186. return false, nil
  187. })
  188. return nodeID, maxVolumePerNode, accessibleTopology, err
  189. }
  190. func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) (
  191. nodeID string,
  192. maxVolumePerNode int64,
  193. accessibleTopology map[string]string,
  194. err error) {
  195. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  196. if err != nil {
  197. return "", 0, nil, err
  198. }
  199. defer closer.Close()
  200. res, err := nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
  201. if err != nil {
  202. return "", 0, nil, err
  203. }
  204. topology := res.GetAccessibleTopology()
  205. if topology != nil {
  206. accessibleTopology = topology.Segments
  207. }
  208. return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil
  209. }
  210. func (c *csiDriverClient) nodeGetInfoV0(ctx context.Context) (
  211. nodeID string,
  212. maxVolumePerNode int64,
  213. accessibleTopology map[string]string,
  214. err error) {
  215. nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
  216. if err != nil {
  217. return "", 0, nil, err
  218. }
  219. defer closer.Close()
  220. res, err := nodeClient.NodeGetInfo(ctx, &csipbv0.NodeGetInfoRequest{})
  221. if err != nil {
  222. return "", 0, nil, err
  223. }
  224. topology := res.GetAccessibleTopology()
  225. if topology != nil {
  226. accessibleTopology = topology.Segments
  227. }
  228. return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil
  229. }
  230. func (c *csiDriverClient) NodePublishVolume(
  231. ctx context.Context,
  232. volID string,
  233. readOnly bool,
  234. stagingTargetPath string,
  235. targetPath string,
  236. accessMode api.PersistentVolumeAccessMode,
  237. publishContext map[string]string,
  238. volumeContext map[string]string,
  239. secrets map[string]string,
  240. fsType string,
  241. mountOptions []string,
  242. ) error {
  243. klog.V(4).Info(log("calling NodePublishVolume rpc [volid=%s,target_path=%s]", volID, targetPath))
  244. if volID == "" {
  245. return errors.New("missing volume id")
  246. }
  247. if targetPath == "" {
  248. return errors.New("missing target path")
  249. }
  250. if c.nodeV1ClientCreator != nil {
  251. return c.nodePublishVolumeV1(
  252. ctx,
  253. volID,
  254. readOnly,
  255. stagingTargetPath,
  256. targetPath,
  257. accessMode,
  258. publishContext,
  259. volumeContext,
  260. secrets,
  261. fsType,
  262. mountOptions,
  263. )
  264. } else if c.nodeV0ClientCreator != nil {
  265. return c.nodePublishVolumeV0(
  266. ctx,
  267. volID,
  268. readOnly,
  269. stagingTargetPath,
  270. targetPath,
  271. accessMode,
  272. publishContext,
  273. volumeContext,
  274. secrets,
  275. fsType,
  276. mountOptions,
  277. )
  278. }
  279. return fmt.Errorf("failed to call NodePublishVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
  280. }
  281. func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
  282. if c.nodeV1ClientCreator == nil {
  283. return newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
  284. }
  285. if volumeID == "" {
  286. return newSize, errors.New("missing volume id")
  287. }
  288. if volumePath == "" {
  289. return newSize, errors.New("missing volume path")
  290. }
  291. if newSize.Value() < 0 {
  292. return newSize, errors.New("size can not be less than 0")
  293. }
  294. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  295. if err != nil {
  296. return newSize, err
  297. }
  298. defer closer.Close()
  299. req := &csipbv1.NodeExpandVolumeRequest{
  300. VolumeId: volumeID,
  301. VolumePath: volumePath,
  302. CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
  303. }
  304. resp, err := nodeClient.NodeExpandVolume(ctx, req)
  305. if err != nil {
  306. return newSize, err
  307. }
  308. updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
  309. return *updatedQuantity, nil
  310. }
  311. func (c *csiDriverClient) nodePublishVolumeV1(
  312. ctx context.Context,
  313. volID string,
  314. readOnly bool,
  315. stagingTargetPath string,
  316. targetPath string,
  317. accessMode api.PersistentVolumeAccessMode,
  318. publishContext map[string]string,
  319. volumeContext map[string]string,
  320. secrets map[string]string,
  321. fsType string,
  322. mountOptions []string,
  323. ) error {
  324. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  325. if err != nil {
  326. return err
  327. }
  328. defer closer.Close()
  329. req := &csipbv1.NodePublishVolumeRequest{
  330. VolumeId: volID,
  331. TargetPath: targetPath,
  332. Readonly: readOnly,
  333. PublishContext: publishContext,
  334. VolumeContext: volumeContext,
  335. Secrets: secrets,
  336. VolumeCapability: &csipbv1.VolumeCapability{
  337. AccessMode: &csipbv1.VolumeCapability_AccessMode{
  338. Mode: asCSIAccessModeV1(accessMode),
  339. },
  340. },
  341. }
  342. if stagingTargetPath != "" {
  343. req.StagingTargetPath = stagingTargetPath
  344. }
  345. if fsType == fsTypeBlockName {
  346. req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
  347. Block: &csipbv1.VolumeCapability_BlockVolume{},
  348. }
  349. } else {
  350. req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
  351. Mount: &csipbv1.VolumeCapability_MountVolume{
  352. FsType: fsType,
  353. MountFlags: mountOptions,
  354. },
  355. }
  356. }
  357. _, err = nodeClient.NodePublishVolume(ctx, req)
  358. return err
  359. }
  360. func (c *csiDriverClient) nodePublishVolumeV0(
  361. ctx context.Context,
  362. volID string,
  363. readOnly bool,
  364. stagingTargetPath string,
  365. targetPath string,
  366. accessMode api.PersistentVolumeAccessMode,
  367. publishContext map[string]string,
  368. volumeContext map[string]string,
  369. secrets map[string]string,
  370. fsType string,
  371. mountOptions []string,
  372. ) error {
  373. nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
  374. if err != nil {
  375. return err
  376. }
  377. defer closer.Close()
  378. req := &csipbv0.NodePublishVolumeRequest{
  379. VolumeId: volID,
  380. TargetPath: targetPath,
  381. Readonly: readOnly,
  382. PublishInfo: publishContext,
  383. VolumeAttributes: volumeContext,
  384. NodePublishSecrets: secrets,
  385. VolumeCapability: &csipbv0.VolumeCapability{
  386. AccessMode: &csipbv0.VolumeCapability_AccessMode{
  387. Mode: asCSIAccessModeV0(accessMode),
  388. },
  389. },
  390. }
  391. if stagingTargetPath != "" {
  392. req.StagingTargetPath = stagingTargetPath
  393. }
  394. if fsType == fsTypeBlockName {
  395. req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Block{
  396. Block: &csipbv0.VolumeCapability_BlockVolume{},
  397. }
  398. } else {
  399. req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Mount{
  400. Mount: &csipbv0.VolumeCapability_MountVolume{
  401. FsType: fsType,
  402. MountFlags: mountOptions,
  403. },
  404. }
  405. }
  406. _, err = nodeClient.NodePublishVolume(ctx, req)
  407. return err
  408. }
  409. func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error {
  410. klog.V(4).Info(log("calling NodeUnpublishVolume rpc: [volid=%s, target_path=%s", volID, targetPath))
  411. if volID == "" {
  412. return errors.New("missing volume id")
  413. }
  414. if targetPath == "" {
  415. return errors.New("missing target path")
  416. }
  417. if c.nodeV1ClientCreator != nil {
  418. return c.nodeUnpublishVolumeV1(ctx, volID, targetPath)
  419. } else if c.nodeV0ClientCreator != nil {
  420. return c.nodeUnpublishVolumeV0(ctx, volID, targetPath)
  421. }
  422. return fmt.Errorf("failed to call NodeUnpublishVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
  423. }
  424. func (c *csiDriverClient) nodeUnpublishVolumeV1(ctx context.Context, volID string, targetPath string) error {
  425. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  426. if err != nil {
  427. return err
  428. }
  429. defer closer.Close()
  430. req := &csipbv1.NodeUnpublishVolumeRequest{
  431. VolumeId: volID,
  432. TargetPath: targetPath,
  433. }
  434. _, err = nodeClient.NodeUnpublishVolume(ctx, req)
  435. return err
  436. }
  437. func (c *csiDriverClient) nodeUnpublishVolumeV0(ctx context.Context, volID string, targetPath string) error {
  438. nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
  439. if err != nil {
  440. return err
  441. }
  442. defer closer.Close()
  443. req := &csipbv0.NodeUnpublishVolumeRequest{
  444. VolumeId: volID,
  445. TargetPath: targetPath,
  446. }
  447. _, err = nodeClient.NodeUnpublishVolume(ctx, req)
  448. return err
  449. }
  450. func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
  451. volID string,
  452. publishContext map[string]string,
  453. stagingTargetPath string,
  454. fsType string,
  455. accessMode api.PersistentVolumeAccessMode,
  456. secrets map[string]string,
  457. volumeContext map[string]string,
  458. mountOptions []string,
  459. ) error {
  460. klog.V(4).Info(log("calling NodeStageVolume rpc [volid=%s,staging_target_path=%s]", volID, stagingTargetPath))
  461. if volID == "" {
  462. return errors.New("missing volume id")
  463. }
  464. if stagingTargetPath == "" {
  465. return errors.New("missing staging target path")
  466. }
  467. if c.nodeV1ClientCreator != nil {
  468. return c.nodeStageVolumeV1(ctx, volID, publishContext, stagingTargetPath, fsType, accessMode, secrets, volumeContext, mountOptions)
  469. } else if c.nodeV0ClientCreator != nil {
  470. return c.nodeStageVolumeV0(ctx, volID, publishContext, stagingTargetPath, fsType, accessMode, secrets, volumeContext, mountOptions)
  471. }
  472. return fmt.Errorf("failed to call NodeStageVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
  473. }
  474. func (c *csiDriverClient) nodeStageVolumeV1(
  475. ctx context.Context,
  476. volID string,
  477. publishContext map[string]string,
  478. stagingTargetPath string,
  479. fsType string,
  480. accessMode api.PersistentVolumeAccessMode,
  481. secrets map[string]string,
  482. volumeContext map[string]string,
  483. mountOptions []string,
  484. ) error {
  485. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  486. if err != nil {
  487. return err
  488. }
  489. defer closer.Close()
  490. req := &csipbv1.NodeStageVolumeRequest{
  491. VolumeId: volID,
  492. PublishContext: publishContext,
  493. StagingTargetPath: stagingTargetPath,
  494. VolumeCapability: &csipbv1.VolumeCapability{
  495. AccessMode: &csipbv1.VolumeCapability_AccessMode{
  496. Mode: asCSIAccessModeV1(accessMode),
  497. },
  498. },
  499. Secrets: secrets,
  500. VolumeContext: volumeContext,
  501. }
  502. if fsType == fsTypeBlockName {
  503. req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
  504. Block: &csipbv1.VolumeCapability_BlockVolume{},
  505. }
  506. } else {
  507. req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
  508. Mount: &csipbv1.VolumeCapability_MountVolume{
  509. FsType: fsType,
  510. MountFlags: mountOptions,
  511. },
  512. }
  513. }
  514. _, err = nodeClient.NodeStageVolume(ctx, req)
  515. return err
  516. }
  517. func (c *csiDriverClient) nodeStageVolumeV0(
  518. ctx context.Context,
  519. volID string,
  520. publishContext map[string]string,
  521. stagingTargetPath string,
  522. fsType string,
  523. accessMode api.PersistentVolumeAccessMode,
  524. secrets map[string]string,
  525. volumeContext map[string]string,
  526. mountOptions []string,
  527. ) error {
  528. nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
  529. if err != nil {
  530. return err
  531. }
  532. defer closer.Close()
  533. req := &csipbv0.NodeStageVolumeRequest{
  534. VolumeId: volID,
  535. PublishInfo: publishContext,
  536. StagingTargetPath: stagingTargetPath,
  537. VolumeCapability: &csipbv0.VolumeCapability{
  538. AccessMode: &csipbv0.VolumeCapability_AccessMode{
  539. Mode: asCSIAccessModeV0(accessMode),
  540. },
  541. },
  542. NodeStageSecrets: secrets,
  543. VolumeAttributes: volumeContext,
  544. }
  545. if fsType == fsTypeBlockName {
  546. req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Block{
  547. Block: &csipbv0.VolumeCapability_BlockVolume{},
  548. }
  549. } else {
  550. req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Mount{
  551. Mount: &csipbv0.VolumeCapability_MountVolume{
  552. FsType: fsType,
  553. MountFlags: mountOptions,
  554. },
  555. }
  556. }
  557. _, err = nodeClient.NodeStageVolume(ctx, req)
  558. return err
  559. }
  560. func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error {
  561. klog.V(4).Info(log("calling NodeUnstageVolume rpc [volid=%s,staging_target_path=%s]", volID, stagingTargetPath))
  562. if volID == "" {
  563. return errors.New("missing volume id")
  564. }
  565. if stagingTargetPath == "" {
  566. return errors.New("missing staging target path")
  567. }
  568. if c.nodeV1ClientCreator != nil {
  569. return c.nodeUnstageVolumeV1(ctx, volID, stagingTargetPath)
  570. } else if c.nodeV0ClientCreator != nil {
  571. return c.nodeUnstageVolumeV0(ctx, volID, stagingTargetPath)
  572. }
  573. return fmt.Errorf("failed to call NodeUnstageVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
  574. }
  575. func (c *csiDriverClient) nodeUnstageVolumeV1(ctx context.Context, volID, stagingTargetPath string) error {
  576. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  577. if err != nil {
  578. return err
  579. }
  580. defer closer.Close()
  581. req := &csipbv1.NodeUnstageVolumeRequest{
  582. VolumeId: volID,
  583. StagingTargetPath: stagingTargetPath,
  584. }
  585. _, err = nodeClient.NodeUnstageVolume(ctx, req)
  586. return err
  587. }
  588. func (c *csiDriverClient) nodeUnstageVolumeV0(ctx context.Context, volID, stagingTargetPath string) error {
  589. nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
  590. if err != nil {
  591. return err
  592. }
  593. defer closer.Close()
  594. req := &csipbv0.NodeUnstageVolumeRequest{
  595. VolumeId: volID,
  596. StagingTargetPath: stagingTargetPath,
  597. }
  598. _, err = nodeClient.NodeUnstageVolume(ctx, req)
  599. return err
  600. }
  601. func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
  602. klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if Node has EXPAND_VOLUME capability"))
  603. if c.nodeV1ClientCreator != nil {
  604. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  605. if err != nil {
  606. return false, err
  607. }
  608. defer closer.Close()
  609. req := &csipbv1.NodeGetCapabilitiesRequest{}
  610. resp, err := nodeClient.NodeGetCapabilities(ctx, req)
  611. if err != nil {
  612. return false, err
  613. }
  614. capabilities := resp.GetCapabilities()
  615. if capabilities == nil {
  616. return false, nil
  617. }
  618. for _, capability := range capabilities {
  619. if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
  620. return true, nil
  621. }
  622. }
  623. return false, nil
  624. } else if c.nodeV0ClientCreator != nil {
  625. return false, nil
  626. }
  627. return false, fmt.Errorf("failed to call NodeSupportsNodeExpand. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
  628. }
  629. func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
  630. klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsStageUnstage"))
  631. if c.nodeV1ClientCreator != nil {
  632. return c.nodeSupportsStageUnstageV1(ctx)
  633. } else if c.nodeV0ClientCreator != nil {
  634. return c.nodeSupportsStageUnstageV0(ctx)
  635. }
  636. return false, fmt.Errorf("failed to call NodeSupportsStageUnstage. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
  637. }
  638. func (c *csiDriverClient) nodeSupportsStageUnstageV1(ctx context.Context) (bool, error) {
  639. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  640. if err != nil {
  641. return false, err
  642. }
  643. defer closer.Close()
  644. req := &csipbv1.NodeGetCapabilitiesRequest{}
  645. resp, err := nodeClient.NodeGetCapabilities(ctx, req)
  646. if err != nil {
  647. return false, err
  648. }
  649. capabilities := resp.GetCapabilities()
  650. stageUnstageSet := false
  651. if capabilities == nil {
  652. return false, nil
  653. }
  654. for _, capability := range capabilities {
  655. if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
  656. stageUnstageSet = true
  657. }
  658. }
  659. return stageUnstageSet, nil
  660. }
  661. func (c *csiDriverClient) nodeSupportsStageUnstageV0(ctx context.Context) (bool, error) {
  662. nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
  663. if err != nil {
  664. return false, err
  665. }
  666. defer closer.Close()
  667. req := &csipbv0.NodeGetCapabilitiesRequest{}
  668. resp, err := nodeClient.NodeGetCapabilities(ctx, req)
  669. if err != nil {
  670. return false, err
  671. }
  672. capabilities := resp.GetCapabilities()
  673. stageUnstageSet := false
  674. if capabilities == nil {
  675. return false, nil
  676. }
  677. for _, capability := range capabilities {
  678. if capability.GetRpc().GetType() == csipbv0.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
  679. stageUnstageSet = true
  680. }
  681. }
  682. return stageUnstageSet, nil
  683. }
  684. func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
  685. switch am {
  686. case api.ReadWriteOnce:
  687. return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
  688. case api.ReadOnlyMany:
  689. return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
  690. case api.ReadWriteMany:
  691. return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
  692. }
  693. return csipbv1.VolumeCapability_AccessMode_UNKNOWN
  694. }
  695. func asCSIAccessModeV0(am api.PersistentVolumeAccessMode) csipbv0.VolumeCapability_AccessMode_Mode {
  696. switch am {
  697. case api.ReadWriteOnce:
  698. return csipbv0.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
  699. case api.ReadOnlyMany:
  700. return csipbv0.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
  701. case api.ReadWriteMany:
  702. return csipbv0.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
  703. }
  704. return csipbv0.VolumeCapability_AccessMode_UNKNOWN
  705. }
  706. func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
  707. network := "unix"
  708. klog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))
  709. return grpc.Dial(
  710. string(addr),
  711. grpc.WithInsecure(),
  712. grpc.WithDialer(func(target string, timeout time.Duration) (net.Conn, error) {
  713. return net.Dial(network, target)
  714. }),
  715. )
  716. }
  717. func versionRequiresV0Client(version *utilversion.Version) bool {
  718. if version != nil && version.Major() == 0 {
  719. return true
  720. }
  721. return false
  722. }
  723. // CSI client getter with cache.
  724. // This provides a method to initialize CSI client with driver name and caches
  725. // it for later use. When CSI clients have not been discovered yet (e.g.
  726. // on kubelet restart), client initialization will fail. Users of CSI client (e.g.
  727. // mounter manager and block mapper) can use this to delay CSI client
  728. // initialization until needed.
  729. type csiClientGetter struct {
  730. sync.RWMutex
  731. csiClient csiClient
  732. driverName csiDriverName
  733. }
  734. func (c *csiClientGetter) Get() (csiClient, error) {
  735. c.RLock()
  736. if c.csiClient != nil {
  737. c.RUnlock()
  738. return c.csiClient, nil
  739. }
  740. c.RUnlock()
  741. c.Lock()
  742. defer c.Unlock()
  743. // Double-checking locking criterion.
  744. if c.csiClient != nil {
  745. return c.csiClient, nil
  746. }
  747. csi, err := newCsiDriverClient(c.driverName)
  748. if err != nil {
  749. return nil, err
  750. }
  751. c.csiClient = csi
  752. return c.csiClient, nil
  753. }
  754. func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
  755. klog.V(5).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsVolumeStats"))
  756. if c.nodeV1ClientCreator != nil {
  757. return c.nodeSupportsVolumeStatsV1(ctx)
  758. }
  759. return false, fmt.Errorf("failed to call NodeSupportsVolumeStats. nodeV1ClientCreator is nil")
  760. }
  761. func (c *csiDriverClient) nodeSupportsVolumeStatsV1(ctx context.Context) (bool, error) {
  762. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  763. if err != nil {
  764. return false, err
  765. }
  766. defer closer.Close()
  767. req := &csipbv1.NodeGetCapabilitiesRequest{}
  768. resp, err := nodeClient.NodeGetCapabilities(ctx, req)
  769. if err != nil {
  770. return false, err
  771. }
  772. capabilities := resp.GetCapabilities()
  773. if capabilities == nil {
  774. return false, nil
  775. }
  776. for _, capability := range capabilities {
  777. if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS {
  778. return true, nil
  779. }
  780. }
  781. return false, nil
  782. }
  783. func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
  784. klog.V(4).Info(log("calling NodeGetVolumeStats rpc: [volid=%s, target_path=%s", volID, targetPath))
  785. if volID == "" {
  786. return nil, errors.New("missing volume id")
  787. }
  788. if targetPath == "" {
  789. return nil, errors.New("missing target path")
  790. }
  791. if c.nodeV1ClientCreator != nil {
  792. return c.nodeGetVolumeStatsV1(ctx, volID, targetPath)
  793. }
  794. return nil, fmt.Errorf("failed to call NodeGetVolumeStats. nodeV1ClientCreator is nil")
  795. }
  796. func (c *csiDriverClient) nodeGetVolumeStatsV1(
  797. ctx context.Context,
  798. volID string,
  799. targetPath string,
  800. ) (*volume.Metrics, error) {
  801. nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
  802. if err != nil {
  803. return nil, err
  804. }
  805. defer closer.Close()
  806. req := &csipbv1.NodeGetVolumeStatsRequest{
  807. VolumeId: volID,
  808. VolumePath: targetPath,
  809. }
  810. resp, err := nodeClient.NodeGetVolumeStats(ctx, req)
  811. if err != nil {
  812. return nil, err
  813. }
  814. usages := resp.GetUsage()
  815. if usages == nil {
  816. return nil, fmt.Errorf("failed to get usage from response. usage is nil")
  817. }
  818. metrics := &volume.Metrics{
  819. Used: resource.NewQuantity(int64(0), resource.BinarySI),
  820. Capacity: resource.NewQuantity(int64(0), resource.BinarySI),
  821. Available: resource.NewQuantity(int64(0), resource.BinarySI),
  822. InodesUsed: resource.NewQuantity(int64(0), resource.BinarySI),
  823. Inodes: resource.NewQuantity(int64(0), resource.BinarySI),
  824. InodesFree: resource.NewQuantity(int64(0), resource.BinarySI),
  825. }
  826. for _, usage := range usages {
  827. unit := usage.GetUnit()
  828. switch unit {
  829. case csipbv1.VolumeUsage_BYTES:
  830. metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
  831. metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
  832. metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
  833. case csipbv1.VolumeUsage_INODES:
  834. metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
  835. metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
  836. metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
  837. default:
  838. klog.Errorf("unknown key %s in usage", unit.String())
  839. }
  840. }
  841. return metrics, nil
  842. }