csi_attacher.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package csi
  14. import (
  15. "context"
  16. "crypto/sha256"
  17. "errors"
  18. "fmt"
  19. "os"
  20. "path/filepath"
  21. "strings"
  22. "time"
  23. "k8s.io/klog"
  24. v1 "k8s.io/api/core/v1"
  25. storage "k8s.io/api/storage/v1"
  26. apierrs "k8s.io/apimachinery/pkg/api/errors"
  27. meta "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/types"
  29. "k8s.io/apimachinery/pkg/watch"
  30. "k8s.io/client-go/kubernetes"
  31. "k8s.io/kubernetes/pkg/volume"
  32. )
  // persistentVolumeInGlobalPath is the directory name, below the plugin
  // directory, under which per-PV directories are placed when the device
  // mount path is built.
  const persistentVolumeInGlobalPath = "pv"

  // globalMountInGlobalPath is the final path element of a device mount path;
  // it sits directly inside the per-PV directory.
  const globalMountInGlobalPath = "globalmount"
  // csiAttacher carries the state used by the CSI attach, detach and device
  // mount/unmount methods defined in this file.
  type csiAttacher struct {
  	// plugin gives access to the owning CSI plugin (host information,
  	// skipAttach and getPublishContext helpers).
  	plugin *csiPlugin
  	// k8s is the API client used to create, read, watch and delete
  	// VolumeAttachment objects and to read PersistentVolumes.
  	k8s kubernetes.Interface
  	// waitSleepTime is multiplied by 10 to form the detach wait timeout.
  	waitSleepTime time.Duration
  	// csiClient talks to the CSI driver; MountDevice and UnmountDevice
  	// create it on first use when it is nil.
  	csiClient csiClient
  }
  43. // volume.Attacher methods
  44. var _ volume.Attacher = &csiAttacher{}
  45. var _ volume.Detacher = &csiAttacher{}
  46. var _ volume.DeviceMounter = &csiAttacher{}
  47. func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
  48. if spec == nil {
  49. klog.Error(log("attacher.Attach missing volume.Spec"))
  50. return "", errors.New("missing spec")
  51. }
  52. pvSrc, err := getPVSourceFromSpec(spec)
  53. if err != nil {
  54. klog.Error(log("attacher.Attach failed to get CSIPersistentVolumeSource: %v", err))
  55. return "", err
  56. }
  57. node := string(nodeName)
  58. attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node)
  59. var vaSrc storage.VolumeAttachmentSource
  60. if spec.InlineVolumeSpecForCSIMigration {
  61. // inline PV scenario - use PV spec to populate VA source.
  62. // The volume spec will be populated by CSI translation API
  63. // for inline volumes. This allows fields required by the CSI
  64. // attacher such as AccessMode and MountOptions (in addition to
  65. // fields in the CSI persistent volume source) to be populated
  66. // as part of CSI translation for inline volumes.
  67. vaSrc = storage.VolumeAttachmentSource{
  68. InlineVolumeSpec: &spec.PersistentVolume.Spec,
  69. }
  70. } else {
  71. // regular PV scenario - use PV name to populate VA source
  72. pvName := spec.PersistentVolume.GetName()
  73. vaSrc = storage.VolumeAttachmentSource{
  74. PersistentVolumeName: &pvName,
  75. }
  76. }
  77. attachment := &storage.VolumeAttachment{
  78. ObjectMeta: meta.ObjectMeta{
  79. Name: attachID,
  80. },
  81. Spec: storage.VolumeAttachmentSpec{
  82. NodeName: node,
  83. Attacher: pvSrc.Driver,
  84. Source: vaSrc,
  85. },
  86. }
  87. _, err = c.k8s.StorageV1().VolumeAttachments().Create(attachment)
  88. alreadyExist := false
  89. if err != nil {
  90. if !apierrs.IsAlreadyExists(err) {
  91. klog.Error(log("attacher.Attach failed: %v", err))
  92. return "", err
  93. }
  94. alreadyExist = true
  95. }
  96. if alreadyExist {
  97. klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle))
  98. } else {
  99. klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle))
  100. }
  101. if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, csiTimeout); err != nil {
  102. return "", err
  103. }
  104. klog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment object [%s]", attachID))
  105. // Don't return attachID as a devicePath. We can reconstruct the attachID using getAttachmentName()
  106. return "", nil
  107. }
  108. func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) {
  109. source, err := getPVSourceFromSpec(spec)
  110. if err != nil {
  111. klog.Error(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
  112. return "", err
  113. }
  114. attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(c.plugin.host.GetNodeName()))
  115. return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
  116. }
  117. func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
  118. klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
  119. timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
  120. defer timer.Stop()
  121. return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
  122. }
  123. func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
  124. klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
  125. attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
  126. if err != nil {
  127. klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
  128. return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
  129. }
  130. successful, err := verifyAttachmentStatus(attach, volumeHandle)
  131. if err != nil {
  132. return "", err
  133. }
  134. if successful {
  135. return attachID, nil
  136. }
  137. watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
  138. if err != nil {
  139. return "", fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
  140. }
  141. ch := watcher.ResultChan()
  142. defer watcher.Stop()
  143. for {
  144. select {
  145. case event, ok := <-ch:
  146. if !ok {
  147. klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
  148. return "", errors.New("volume attachment watch channel had been closed")
  149. }
  150. switch event.Type {
  151. case watch.Added, watch.Modified:
  152. attach, _ := event.Object.(*storage.VolumeAttachment)
  153. successful, err := verifyAttachmentStatus(attach, volumeHandle)
  154. if err != nil {
  155. return "", err
  156. }
  157. if successful {
  158. return attachID, nil
  159. }
  160. case watch.Deleted:
  161. // if deleted, fail fast
  162. klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", attachID))
  163. return "", errors.New("volume attachment has been deleted")
  164. case watch.Error:
  165. // start another cycle
  166. c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
  167. }
  168. case <-timer.C:
  169. klog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
  170. return "", fmt.Errorf("attachment timeout for volume %v", volumeHandle)
  171. }
  172. }
  173. }
  174. func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
  175. // if being deleted, fail fast
  176. if attachment.GetDeletionTimestamp() != nil {
  177. klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name))
  178. return false, errors.New("volume attachment is being deleted")
  179. }
  180. // attachment OK
  181. if attachment.Status.Attached {
  182. return true, nil
  183. }
  184. // driver reports attach error
  185. attachErr := attachment.Status.AttachError
  186. if attachErr != nil {
  187. klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
  188. return false, errors.New(attachErr.Message)
  189. }
  190. return false, nil
  191. }
  192. func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
  193. klog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs)))
  194. attached := make(map[*volume.Spec]bool)
  195. for _, spec := range specs {
  196. if spec == nil {
  197. klog.Error(log("attacher.VolumesAreAttached missing volume.Spec"))
  198. return nil, errors.New("missing spec")
  199. }
  200. pvSrc, err := getPVSourceFromSpec(spec)
  201. if err != nil {
  202. attached[spec] = false
  203. klog.Error(log("attacher.VolumesAreAttached failed to get CSIPersistentVolumeSource: %v", err))
  204. continue
  205. }
  206. driverName := pvSrc.Driver
  207. volumeHandle := pvSrc.VolumeHandle
  208. skip, err := c.plugin.skipAttach(driverName)
  209. if err != nil {
  210. klog.Error(log("Failed to check CSIDriver for %s: %s", driverName, err))
  211. } else {
  212. if skip {
  213. // This volume is not attachable, pretend it's attached
  214. attached[spec] = true
  215. continue
  216. }
  217. }
  218. attachID := getAttachmentName(volumeHandle, driverName, string(nodeName))
  219. klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
  220. attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
  221. if err != nil {
  222. attached[spec] = false
  223. klog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
  224. continue
  225. }
  226. klog.V(4).Info(log("attacher.VolumesAreAttached attachment [%v] has status.attached=%t", attachID, attach.Status.Attached))
  227. attached[spec] = attach.Status.Attached
  228. }
  229. return attached, nil
  230. }
  231. func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
  232. klog.V(4).Info(log("attacher.GetDeviceMountPath(%v)", spec))
  233. deviceMountPath, err := makeDeviceMountPath(c.plugin, spec)
  234. if err != nil {
  235. klog.Error(log("attacher.GetDeviceMountPath failed to make device mount path: %v", err))
  236. return "", err
  237. }
  238. klog.V(4).Infof("attacher.GetDeviceMountPath succeeded, deviceMountPath: %s", deviceMountPath)
  239. return deviceMountPath, nil
  240. }
  241. func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) (err error) {
  242. klog.V(4).Infof(log("attacher.MountDevice(%s, %s)", devicePath, deviceMountPath))
  243. if deviceMountPath == "" {
  244. err = fmt.Errorf("attacher.MountDevice failed, deviceMountPath is empty")
  245. return err
  246. }
  247. mounted, err := isDirMounted(c.plugin, deviceMountPath)
  248. if err != nil {
  249. klog.Error(log("attacher.MountDevice failed while checking mount status for dir [%s]", deviceMountPath))
  250. return err
  251. }
  252. if mounted {
  253. klog.V(4).Info(log("attacher.MountDevice skipping mount, dir already mounted [%s]", deviceMountPath))
  254. return nil
  255. }
  256. // Setup
  257. if spec == nil {
  258. return fmt.Errorf("attacher.MountDevice failed, spec is nil")
  259. }
  260. csiSource, err := getPVSourceFromSpec(spec)
  261. if err != nil {
  262. klog.Error(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err))
  263. return err
  264. }
  265. // Store volume metadata for UnmountDevice. Keep it around even if the
  266. // driver does not support NodeStage, UnmountDevice still needs it.
  267. if err = os.MkdirAll(deviceMountPath, 0750); err != nil {
  268. klog.Error(log("attacher.MountDevice failed to create dir %#v: %v", deviceMountPath, err))
  269. return err
  270. }
  271. klog.V(4).Info(log("created target path successfully [%s]", deviceMountPath))
  272. dataDir := filepath.Dir(deviceMountPath)
  273. data := map[string]string{
  274. volDataKey.volHandle: csiSource.VolumeHandle,
  275. volDataKey.driverName: csiSource.Driver,
  276. }
  277. if err = saveVolumeData(dataDir, volDataFileName, data); err != nil {
  278. klog.Error(log("failed to save volume info data: %v", err))
  279. if cleanerr := os.RemoveAll(dataDir); err != nil {
  280. klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanerr))
  281. }
  282. return err
  283. }
  284. defer func() {
  285. if err != nil {
  286. // clean up metadata
  287. klog.Errorf(log("attacher.MountDevice failed: %v", err))
  288. if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
  289. klog.Error(log("attacher.MountDevice failed to remove mount dir after errir [%s]: %v", deviceMountPath, err))
  290. }
  291. }
  292. }()
  293. if c.csiClient == nil {
  294. c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver))
  295. if err != nil {
  296. klog.Errorf(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
  297. return err
  298. }
  299. }
  300. csi := c.csiClient
  301. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  302. defer cancel()
  303. // Check whether "STAGE_UNSTAGE_VOLUME" is set
  304. stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
  305. if err != nil {
  306. return err
  307. }
  308. if !stageUnstageSet {
  309. klog.Infof(log("attacher.MountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
  310. // defer does *not* remove the metadata file and it's correct - UnmountDevice needs it there.
  311. return nil
  312. }
  313. // Start MountDevice
  314. nodeName := string(c.plugin.host.GetNodeName())
  315. publishContext, err := c.plugin.getPublishContext(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName)
  316. nodeStageSecrets := map[string]string{}
  317. if csiSource.NodeStageSecretRef != nil {
  318. nodeStageSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodeStageSecretRef)
  319. if err != nil {
  320. err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v",
  321. csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
  322. return err
  323. }
  324. }
  325. //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
  326. accessMode := v1.ReadWriteOnce
  327. if spec.PersistentVolume.Spec.AccessModes != nil {
  328. accessMode = spec.PersistentVolume.Spec.AccessModes[0]
  329. }
  330. var mountOptions []string
  331. if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.MountOptions != nil {
  332. mountOptions = spec.PersistentVolume.Spec.MountOptions
  333. }
  334. fsType := csiSource.FSType
  335. err = csi.NodeStageVolume(ctx,
  336. csiSource.VolumeHandle,
  337. publishContext,
  338. deviceMountPath,
  339. fsType,
  340. accessMode,
  341. nodeStageSecrets,
  342. csiSource.VolumeAttributes,
  343. mountOptions)
  344. if err != nil {
  345. return err
  346. }
  347. klog.V(4).Infof(log("attacher.MountDevice successfully requested NodeStageVolume [%s]", deviceMountPath))
  348. return nil
  349. }
  350. var _ volume.Detacher = &csiAttacher{}
  351. var _ volume.DeviceUnmounter = &csiAttacher{}
  352. func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
  353. var attachID string
  354. var volID string
  355. if volumeName == "" {
  356. klog.Error(log("detacher.Detach missing value for parameter volumeName"))
  357. return errors.New("missing expected parameter volumeName")
  358. }
  359. if isAttachmentName(volumeName) {
  360. // Detach can also be called with the attach ID as the `volumeName`. This codepath is
  361. // hit only when we have migrated an in-tree volume to CSI and the A/D Controller is shut
  362. // down, the pod with the volume is deleted, and the A/D Controller starts back up in that
  363. // order.
  364. attachID = volumeName
  365. // Vol ID should be the volume handle, except that is not available here.
  366. // It is only used in log messages so in the event that this happens log messages will be
  367. // printing out the attachID instead of the volume handle.
  368. volID = volumeName
  369. } else {
  370. // volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
  371. parts := strings.Split(volumeName, volNameSep)
  372. if len(parts) != 2 {
  373. klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
  374. return errors.New("volumeName missing expected data")
  375. }
  376. driverName := parts[0]
  377. volID = parts[1]
  378. attachID = getAttachmentName(volID, driverName, string(nodeName))
  379. }
  380. if err := c.k8s.StorageV1().VolumeAttachments().Delete(attachID, nil); err != nil {
  381. if apierrs.IsNotFound(err) {
  382. // object deleted or never existed, done
  383. klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volID))
  384. return nil
  385. }
  386. klog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
  387. return err
  388. }
  389. klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
  390. return c.waitForVolumeDetachment(volID, attachID)
  391. }
  392. func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) error {
  393. klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
  394. timeout := c.waitSleepTime * 10
  395. timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
  396. defer timer.Stop()
  397. return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout)
  398. }
  399. func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) error {
  400. klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
  401. attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
  402. if err != nil {
  403. if apierrs.IsNotFound(err) {
  404. //object deleted or never existed, done
  405. klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle))
  406. return nil
  407. }
  408. klog.Error(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
  409. return err
  410. }
  411. // driver reports attach error
  412. detachErr := attach.Status.DetachError
  413. if detachErr != nil {
  414. klog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
  415. return errors.New(detachErr.Message)
  416. }
  417. watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
  418. if err != nil {
  419. return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
  420. }
  421. ch := watcher.ResultChan()
  422. defer watcher.Stop()
  423. for {
  424. select {
  425. case event, ok := <-ch:
  426. if !ok {
  427. klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
  428. return errors.New("volume attachment watch channel had been closed")
  429. }
  430. switch event.Type {
  431. case watch.Added, watch.Modified:
  432. attach, _ := event.Object.(*storage.VolumeAttachment)
  433. // driver reports attach error
  434. detachErr := attach.Status.DetachError
  435. if detachErr != nil {
  436. klog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
  437. return errors.New(detachErr.Message)
  438. }
  439. case watch.Deleted:
  440. //object deleted
  441. klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
  442. return nil
  443. case watch.Error:
  444. // start another cycle
  445. c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout)
  446. }
  447. case <-timer.C:
  448. klog.Error(log("detacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
  449. return fmt.Errorf("detachment timeout for volume %v", volumeHandle)
  450. }
  451. }
  452. }
  453. func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
  454. klog.V(4).Info(log("attacher.UnmountDevice(%s)", deviceMountPath))
  455. // Setup
  456. var driverName, volID string
  457. dataDir := filepath.Dir(deviceMountPath)
  458. data, err := loadVolumeData(dataDir, volDataFileName)
  459. if err == nil {
  460. driverName = data[volDataKey.driverName]
  461. volID = data[volDataKey.volHandle]
  462. } else {
  463. klog.Error(log("UnmountDevice failed to load volume data file [%s]: %v", dataDir, err))
  464. // The volume might have been mounted by old CSI volume plugin. Fall back to the old behavior: read PV from API server
  465. driverName, volID, err = getDriverAndVolNameFromDeviceMountPath(c.k8s, deviceMountPath)
  466. if err != nil {
  467. klog.Errorf(log("attacher.UnmountDevice failed to get driver and volume name from device mount path: %v", err))
  468. return err
  469. }
  470. }
  471. if c.csiClient == nil {
  472. c.csiClient, err = newCsiDriverClient(csiDriverName(driverName))
  473. if err != nil {
  474. klog.Errorf(log("attacher.UnmountDevice failed to create newCsiDriverClient: %v", err))
  475. return err
  476. }
  477. }
  478. csi := c.csiClient
  479. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  480. defer cancel()
  481. // Check whether "STAGE_UNSTAGE_VOLUME" is set
  482. stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
  483. if err != nil {
  484. klog.Errorf(log("attacher.UnmountDevice failed to check whether STAGE_UNSTAGE_VOLUME set: %v", err))
  485. return err
  486. }
  487. if !stageUnstageSet {
  488. klog.Infof(log("attacher.UnmountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping UnmountDevice..."))
  489. // Just delete the global directory + json file
  490. if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
  491. return fmt.Errorf("failed to clean up gloubal mount %s: %s", dataDir, err)
  492. }
  493. return nil
  494. }
  495. // Start UnmountDevice
  496. err = csi.NodeUnstageVolume(ctx,
  497. volID,
  498. deviceMountPath)
  499. if err != nil {
  500. klog.Errorf(log("attacher.UnmountDevice failed: %v", err))
  501. return err
  502. }
  503. // Delete the global directory + json file
  504. if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
  505. return fmt.Errorf("failed to clean up gloubal mount %s: %s", dataDir, err)
  506. }
  507. klog.V(4).Infof(log("attacher.UnmountDevice successfully requested NodeStageVolume [%s]", deviceMountPath))
  508. return nil
  509. }
  // getAttachmentName builds the VolumeAttachment name for a volume/driver/node
  // triple: "csi-" followed by the lowercase hex SHA-256 digest of the three
  // strings concatenated in that order with no separator.
  func getAttachmentName(volName, csiDriverName, nodeName string) string {
  	key := volName + csiDriverName + nodeName
  	digest := sha256.Sum256([]byte(key))
  	return fmt.Sprintf("csi-%x", digest[:])
  }
  // isAttachmentName reports whether unknownString has the shape of an
  // attachment name: it starts with "csi-" and is exactly 68 bytes long
  // (the 4-byte prefix plus a 64-character hex SHA-256 digest).
  func isAttachmentName(unknownString string) bool {
  	const attachmentNameLen = 68 // len("csi-") + len(hex sha256)
  	return len(unknownString) == attachmentNameLen && strings.HasPrefix(unknownString, "csi-")
  }
  524. func makeDeviceMountPath(plugin *csiPlugin, spec *volume.Spec) (string, error) {
  525. if spec == nil {
  526. return "", fmt.Errorf("makeDeviceMountPath failed, spec is nil")
  527. }
  528. pvName := spec.PersistentVolume.Name
  529. if pvName == "" {
  530. return "", fmt.Errorf("makeDeviceMountPath failed, pv name empty")
  531. }
  532. return filepath.Join(plugin.host.GetPluginDir(plugin.GetPluginName()), persistentVolumeInGlobalPath, pvName, globalMountInGlobalPath), nil
  533. }
  534. func getDriverAndVolNameFromDeviceMountPath(k8s kubernetes.Interface, deviceMountPath string) (string, string, error) {
  535. // deviceMountPath structure: /var/lib/kubelet/plugins/kubernetes.io/csi/pv/{pvname}/globalmount
  536. dir := filepath.Dir(deviceMountPath)
  537. if file := filepath.Base(deviceMountPath); file != globalMountInGlobalPath {
  538. return "", "", fmt.Errorf("getDriverAndVolNameFromDeviceMountPath failed, path did not end in %s", globalMountInGlobalPath)
  539. }
  540. // dir is now /var/lib/kubelet/plugins/kubernetes.io/csi/pv/{pvname}
  541. pvName := filepath.Base(dir)
  542. // Get PV and check for errors
  543. pv, err := k8s.CoreV1().PersistentVolumes().Get(pvName, meta.GetOptions{})
  544. if err != nil {
  545. return "", "", err
  546. }
  547. if pv == nil || pv.Spec.CSI == nil {
  548. return "", "", fmt.Errorf("getDriverAndVolNameFromDeviceMountPath could not find CSI Persistent Volume Source for pv: %s", pvName)
  549. }
  550. // Get VolumeHandle and PluginName from pv
  551. csiSource := pv.Spec.CSI
  552. if csiSource.Driver == "" {
  553. return "", "", fmt.Errorf("getDriverAndVolNameFromDeviceMountPath failed, driver name empty")
  554. }
  555. if csiSource.VolumeHandle == "" {
  556. return "", "", fmt.Errorf("getDriverAndVolNameFromDeviceMountPath failed, VolumeHandle empty")
  557. }
  558. return csiSource.Driver, csiSource.VolumeHandle, nil
  559. }