csi_attacher.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package csi
  14. import (
  15. "context"
  16. "crypto/sha256"
  17. "errors"
  18. "fmt"
  19. "os"
  20. "path/filepath"
  21. "strings"
  22. "time"
  23. "k8s.io/klog"
  24. "k8s.io/api/core/v1"
  25. storage "k8s.io/api/storage/v1"
  26. apierrors "k8s.io/apimachinery/pkg/api/errors"
  27. meta "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/types"
  30. "k8s.io/apimachinery/pkg/watch"
  31. "k8s.io/client-go/kubernetes"
  32. "k8s.io/kubernetes/pkg/volume"
  33. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  34. )
  35. const (
  36. persistentVolumeInGlobalPath = "pv"
  37. globalMountInGlobalPath = "globalmount"
  38. )
  39. type csiAttacher struct {
  40. plugin *csiPlugin
  41. k8s kubernetes.Interface
  42. waitSleepTime time.Duration
  43. csiClient csiClient
  44. }
  45. type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error)
  46. // volume.Attacher methods
  47. var _ volume.Attacher = &csiAttacher{}
  48. var _ volume.Detacher = &csiAttacher{}
  49. var _ volume.DeviceMounter = &csiAttacher{}
  50. func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
  51. if spec == nil {
  52. klog.Error(log("attacher.Attach missing volume.Spec"))
  53. return "", errors.New("missing spec")
  54. }
  55. pvSrc, err := getPVSourceFromSpec(spec)
  56. if err != nil {
  57. return "", errors.New(log("attacher.Attach failed to get CSIPersistentVolumeSource: %v", err))
  58. }
  59. node := string(nodeName)
  60. attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node)
  61. var vaSrc storage.VolumeAttachmentSource
  62. if spec.InlineVolumeSpecForCSIMigration {
  63. // inline PV scenario - use PV spec to populate VA source.
  64. // The volume spec will be populated by CSI translation API
  65. // for inline volumes. This allows fields required by the CSI
  66. // attacher such as AccessMode and MountOptions (in addition to
  67. // fields in the CSI persistent volume source) to be populated
  68. // as part of CSI translation for inline volumes.
  69. vaSrc = storage.VolumeAttachmentSource{
  70. InlineVolumeSpec: &spec.PersistentVolume.Spec,
  71. }
  72. } else {
  73. // regular PV scenario - use PV name to populate VA source
  74. pvName := spec.PersistentVolume.GetName()
  75. vaSrc = storage.VolumeAttachmentSource{
  76. PersistentVolumeName: &pvName,
  77. }
  78. }
  79. attachment := &storage.VolumeAttachment{
  80. ObjectMeta: meta.ObjectMeta{
  81. Name: attachID,
  82. },
  83. Spec: storage.VolumeAttachmentSpec{
  84. NodeName: node,
  85. Attacher: pvSrc.Driver,
  86. Source: vaSrc,
  87. },
  88. }
  89. _, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
  90. alreadyExist := false
  91. if err != nil {
  92. if !apierrors.IsAlreadyExists(err) {
  93. return "", errors.New(log("attacher.Attach failed: %v", err))
  94. }
  95. alreadyExist = true
  96. }
  97. if alreadyExist {
  98. klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle))
  99. } else {
  100. klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle))
  101. }
  102. if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, csiTimeout); err != nil {
  103. return "", err
  104. }
  105. klog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment object [%s]", attachID))
  106. // Don't return attachID as a devicePath. We can reconstruct the attachID using getAttachmentName()
  107. return "", nil
  108. }
  109. func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) {
  110. source, err := getPVSourceFromSpec(spec)
  111. if err != nil {
  112. return "", errors.New(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
  113. }
  114. attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(c.plugin.host.GetNodeName()))
  115. return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
  116. }
  117. func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
  118. klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
  119. timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
  120. defer timer.Stop()
  121. return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
  122. }
  123. func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
  124. klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
  125. attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
  126. if err != nil {
  127. klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
  128. return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
  129. }
  130. err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus)
  131. if err != nil {
  132. return "", err
  133. }
  134. return attach.Name, nil
  135. }
  136. func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
  137. klog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs)))
  138. attached := make(map[*volume.Spec]bool)
  139. for _, spec := range specs {
  140. if spec == nil {
  141. klog.Error(log("attacher.VolumesAreAttached missing volume.Spec"))
  142. return nil, errors.New("missing spec")
  143. }
  144. pvSrc, err := getPVSourceFromSpec(spec)
  145. if err != nil {
  146. attached[spec] = false
  147. klog.Error(log("attacher.VolumesAreAttached failed to get CSIPersistentVolumeSource: %v", err))
  148. continue
  149. }
  150. driverName := pvSrc.Driver
  151. volumeHandle := pvSrc.VolumeHandle
  152. skip, err := c.plugin.skipAttach(driverName)
  153. if err != nil {
  154. klog.Error(log("Failed to check CSIDriver for %s: %s", driverName, err))
  155. } else {
  156. if skip {
  157. // This volume is not attachable, pretend it's attached
  158. attached[spec] = true
  159. continue
  160. }
  161. }
  162. attachID := getAttachmentName(volumeHandle, driverName, string(nodeName))
  163. klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
  164. attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
  165. if err != nil {
  166. attached[spec] = false
  167. klog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
  168. continue
  169. }
  170. klog.V(4).Info(log("attacher.VolumesAreAttached attachment [%v] has status.attached=%t", attachID, attach.Status.Attached))
  171. attached[spec] = attach.Status.Attached
  172. }
  173. return attached, nil
  174. }
  175. func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
  176. klog.V(4).Info(log("attacher.GetDeviceMountPath(%v)", spec))
  177. deviceMountPath, err := makeDeviceMountPath(c.plugin, spec)
  178. if err != nil {
  179. return "", errors.New(log("attacher.GetDeviceMountPath failed to make device mount path: %v", err))
  180. }
  181. klog.V(4).Infof("attacher.GetDeviceMountPath succeeded, deviceMountPath: %s", deviceMountPath)
  182. return deviceMountPath, nil
  183. }
  184. func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
  185. klog.V(4).Infof(log("attacher.MountDevice(%s, %s)", devicePath, deviceMountPath))
  186. if deviceMountPath == "" {
  187. return errors.New(log("attacher.MountDevice failed, deviceMountPath is empty"))
  188. }
  189. mounted, err := isDirMounted(c.plugin, deviceMountPath)
  190. if err != nil {
  191. klog.Error(log("attacher.MountDevice failed while checking mount status for dir [%s]", deviceMountPath))
  192. return err
  193. }
  194. if mounted {
  195. klog.V(4).Info(log("attacher.MountDevice skipping mount, dir already mounted [%s]", deviceMountPath))
  196. return nil
  197. }
  198. // Setup
  199. if spec == nil {
  200. return errors.New(log("attacher.MountDevice failed, spec is nil"))
  201. }
  202. csiSource, err := getPVSourceFromSpec(spec)
  203. if err != nil {
  204. return errors.New(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err))
  205. }
  206. // lets check if node/unstage is supported
  207. if c.csiClient == nil {
  208. c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver))
  209. if err != nil {
  210. return errors.New(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
  211. }
  212. }
  213. csi := c.csiClient
  214. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  215. defer cancel()
  216. // Check whether "STAGE_UNSTAGE_VOLUME" is set
  217. stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
  218. if err != nil {
  219. return err
  220. }
  221. // Get secrets and publish context required for mountDevice
  222. nodeName := string(c.plugin.host.GetNodeName())
  223. publishContext, err := c.plugin.getPublishContext(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName)
  224. if err != nil {
  225. return volumetypes.NewTransientOperationFailure(err.Error())
  226. }
  227. nodeStageSecrets := map[string]string{}
  228. // we only require secrets if csiSource has them and volume has NodeStage capability
  229. if csiSource.NodeStageSecretRef != nil && stageUnstageSet {
  230. nodeStageSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodeStageSecretRef)
  231. if err != nil {
  232. err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v",
  233. csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
  234. // if we failed to fetch secret then that could be a transient error
  235. return volumetypes.NewTransientOperationFailure(err.Error())
  236. }
  237. }
  238. // Store volume metadata for UnmountDevice. Keep it around even if the
  239. // driver does not support NodeStage, UnmountDevice still needs it.
  240. if err = os.MkdirAll(deviceMountPath, 0750); err != nil {
  241. return errors.New(log("attacher.MountDevice failed to create dir %#v: %v", deviceMountPath, err))
  242. }
  243. klog.V(4).Info(log("created target path successfully [%s]", deviceMountPath))
  244. dataDir := filepath.Dir(deviceMountPath)
  245. data := map[string]string{
  246. volDataKey.volHandle: csiSource.VolumeHandle,
  247. volDataKey.driverName: csiSource.Driver,
  248. }
  249. if err = saveVolumeData(dataDir, volDataFileName, data); err != nil {
  250. klog.Error(log("failed to save volume info data: %v", err))
  251. if cleanErr := os.RemoveAll(dataDir); cleanErr != nil {
  252. klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanErr))
  253. }
  254. return err
  255. }
  256. defer func() {
  257. // Only if there was an error and volume operation was considered
  258. // finished, we should remove the directory.
  259. if err != nil && volumetypes.IsOperationFinishedError(err) {
  260. // clean up metadata
  261. klog.Errorf(log("attacher.MountDevice failed: %v", err))
  262. if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
  263. klog.Error(log("attacher.MountDevice failed to remove mount dir after error [%s]: %v", deviceMountPath, err))
  264. }
  265. }
  266. }()
  267. if !stageUnstageSet {
  268. klog.Infof(log("attacher.MountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
  269. // defer does *not* remove the metadata file and it's correct - UnmountDevice needs it there.
  270. return nil
  271. }
  272. //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
  273. accessMode := v1.ReadWriteOnce
  274. if spec.PersistentVolume.Spec.AccessModes != nil {
  275. accessMode = spec.PersistentVolume.Spec.AccessModes[0]
  276. }
  277. var mountOptions []string
  278. if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.MountOptions != nil {
  279. mountOptions = spec.PersistentVolume.Spec.MountOptions
  280. }
  281. fsType := csiSource.FSType
  282. err = csi.NodeStageVolume(ctx,
  283. csiSource.VolumeHandle,
  284. publishContext,
  285. deviceMountPath,
  286. fsType,
  287. accessMode,
  288. nodeStageSecrets,
  289. csiSource.VolumeAttributes,
  290. mountOptions)
  291. if err != nil {
  292. return err
  293. }
  294. klog.V(4).Infof(log("attacher.MountDevice successfully requested NodeStageVolume [%s]", deviceMountPath))
  295. return err
  296. }
  297. var _ volume.Detacher = &csiAttacher{}
  298. var _ volume.DeviceUnmounter = &csiAttacher{}
  299. func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
  300. var attachID string
  301. var volID string
  302. if volumeName == "" {
  303. klog.Error(log("detacher.Detach missing value for parameter volumeName"))
  304. return errors.New("missing expected parameter volumeName")
  305. }
  306. if isAttachmentName(volumeName) {
  307. // Detach can also be called with the attach ID as the `volumeName`. This codepath is
  308. // hit only when we have migrated an in-tree volume to CSI and the A/D Controller is shut
  309. // down, the pod with the volume is deleted, and the A/D Controller starts back up in that
  310. // order.
  311. attachID = volumeName
  312. // Vol ID should be the volume handle, except that is not available here.
  313. // It is only used in log messages so in the event that this happens log messages will be
  314. // printing out the attachID instead of the volume handle.
  315. volID = volumeName
  316. } else {
  317. // volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
  318. parts := strings.Split(volumeName, volNameSep)
  319. if len(parts) != 2 {
  320. klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
  321. return errors.New("volumeName missing expected data")
  322. }
  323. driverName := parts[0]
  324. volID = parts[1]
  325. attachID = getAttachmentName(volID, driverName, string(nodeName))
  326. }
  327. if err := c.k8s.StorageV1().VolumeAttachments().Delete(context.TODO(), attachID, nil); err != nil {
  328. if apierrors.IsNotFound(err) {
  329. // object deleted or never existed, done
  330. klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volID))
  331. return nil
  332. }
  333. return errors.New(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
  334. }
  335. klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
  336. err := c.waitForVolumeDetachment(volID, attachID, csiTimeout)
  337. return err
  338. }
  339. func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string, timeout time.Duration) error {
  340. klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
  341. timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
  342. defer timer.Stop()
  343. return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout)
  344. }
  345. func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer,
  346. timeout time.Duration) error {
  347. klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
  348. attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
  349. if err != nil {
  350. if apierrors.IsNotFound(err) {
  351. //object deleted or never existed, done
  352. klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle))
  353. return nil
  354. }
  355. return errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
  356. }
  357. err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyDetachmentStatus)
  358. if err != nil {
  359. return err
  360. }
  361. return err
  362. }
  363. func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,
  364. timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error {
  365. successful, err := verifyStatus(attach, volumeHandle)
  366. if err != nil {
  367. return err
  368. }
  369. if successful {
  370. return nil
  371. }
  372. watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(context.TODO(), meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
  373. if err != nil {
  374. return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
  375. }
  376. ch := watcher.ResultChan()
  377. defer watcher.Stop()
  378. for {
  379. select {
  380. case event, ok := <-ch:
  381. if !ok {
  382. klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
  383. return errors.New("volume attachment watch channel had been closed")
  384. }
  385. switch event.Type {
  386. case watch.Added, watch.Modified:
  387. attach, _ := event.Object.(*storage.VolumeAttachment)
  388. successful, err := verifyStatus(attach, volumeHandle)
  389. if err != nil {
  390. return err
  391. }
  392. if successful {
  393. return nil
  394. }
  395. case watch.Deleted:
  396. // set attach nil to get different results
  397. // for detachment, a deleted event means successful detachment, should return success
  398. // for attachment, should return fail
  399. if successful, err := verifyStatus(nil, volumeHandle); !successful {
  400. return err
  401. }
  402. klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
  403. return nil
  404. case watch.Error:
  405. klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event)
  406. }
  407. case <-timer.C:
  408. klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
  409. return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle)
  410. }
  411. }
  412. }
  413. func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
  414. klog.V(4).Info(log("attacher.UnmountDevice(%s)", deviceMountPath))
  415. // Setup
  416. var driverName, volID string
  417. dataDir := filepath.Dir(deviceMountPath)
  418. data, err := loadVolumeData(dataDir, volDataFileName)
  419. if err == nil {
  420. driverName = data[volDataKey.driverName]
  421. volID = data[volDataKey.volHandle]
  422. } else {
  423. klog.Error(log("UnmountDevice failed to load volume data file [%s]: %v", dataDir, err))
  424. // The volume might have been mounted by old CSI volume plugin. Fall back to the old behavior: read PV from API server
  425. driverName, volID, err = getDriverAndVolNameFromDeviceMountPath(c.k8s, deviceMountPath)
  426. if err != nil {
  427. klog.Errorf(log("attacher.UnmountDevice failed to get driver and volume name from device mount path: %v", err))
  428. return err
  429. }
  430. }
  431. if c.csiClient == nil {
  432. c.csiClient, err = newCsiDriverClient(csiDriverName(driverName))
  433. if err != nil {
  434. return errors.New(log("attacher.UnmountDevice failed to create newCsiDriverClient: %v", err))
  435. }
  436. }
  437. csi := c.csiClient
  438. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  439. defer cancel()
  440. // Check whether "STAGE_UNSTAGE_VOLUME" is set
  441. stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
  442. if err != nil {
  443. return errors.New(log("attacher.UnmountDevice failed to check whether STAGE_UNSTAGE_VOLUME set: %v", err))
  444. }
  445. if !stageUnstageSet {
  446. klog.Infof(log("attacher.UnmountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping UnmountDevice..."))
  447. // Just delete the global directory + json file
  448. if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
  449. return errors.New(log("failed to clean up global mount %s: %s", dataDir, err))
  450. }
  451. return nil
  452. }
  453. // Start UnmountDevice
  454. err = csi.NodeUnstageVolume(ctx,
  455. volID,
  456. deviceMountPath)
  457. if err != nil {
  458. return errors.New(log("attacher.UnmountDevice failed: %v", err))
  459. }
  460. // Delete the global directory + json file
  461. if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
  462. return errors.New(log("failed to clean up global mount %s: %s", dataDir, err))
  463. }
  464. klog.V(4).Infof(log("attacher.UnmountDevice successfully requested NodeUnStageVolume [%s]", deviceMountPath))
  465. return nil
  466. }
  467. // getAttachmentName returns csi-<sha256(volName,csiDriverName,NodeName)>
  468. func getAttachmentName(volName, csiDriverName, nodeName string) string {
  469. result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
  470. return fmt.Sprintf("csi-%x", result)
  471. }
  472. // isAttachmentName returns true if the string given is of the form of an Attach ID
  473. // and false otherwise
  474. func isAttachmentName(unknownString string) bool {
  475. // 68 == "csi-" + len(sha256hash)
  476. if strings.HasPrefix(unknownString, "csi-") && len(unknownString) == 68 {
  477. return true
  478. }
  479. return false
  480. }
  481. func makeDeviceMountPath(plugin *csiPlugin, spec *volume.Spec) (string, error) {
  482. if spec == nil {
  483. return "", errors.New(log("makeDeviceMountPath failed, spec is nil"))
  484. }
  485. pvName := spec.PersistentVolume.Name
  486. if pvName == "" {
  487. return "", errors.New(log("makeDeviceMountPath failed, pv name empty"))
  488. }
  489. return filepath.Join(plugin.host.GetPluginDir(plugin.GetPluginName()), persistentVolumeInGlobalPath, pvName, globalMountInGlobalPath), nil
  490. }
  491. func getDriverAndVolNameFromDeviceMountPath(k8s kubernetes.Interface, deviceMountPath string) (string, string, error) {
  492. // deviceMountPath structure: /var/lib/kubelet/plugins/kubernetes.io/csi/pv/{pvname}/globalmount
  493. dir := filepath.Dir(deviceMountPath)
  494. if file := filepath.Base(deviceMountPath); file != globalMountInGlobalPath {
  495. return "", "", errors.New(log("getDriverAndVolNameFromDeviceMountPath failed, path did not end in %s", globalMountInGlobalPath))
  496. }
  497. // dir is now /var/lib/kubelet/plugins/kubernetes.io/csi/pv/{pvname}
  498. pvName := filepath.Base(dir)
  499. // Get PV and check for errors
  500. pv, err := k8s.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, meta.GetOptions{})
  501. if err != nil {
  502. return "", "", err
  503. }
  504. if pv == nil || pv.Spec.CSI == nil {
  505. return "", "", errors.New(log("getDriverAndVolNameFromDeviceMountPath could not find CSI Persistent Volume Source for pv: %s", pvName))
  506. }
  507. // Get VolumeHandle and PluginName from pv
  508. csiSource := pv.Spec.CSI
  509. if csiSource.Driver == "" {
  510. return "", "", errors.New(log("getDriverAndVolNameFromDeviceMountPath failed, driver name empty"))
  511. }
  512. if csiSource.VolumeHandle == "" {
  513. return "", "", errors.New(log("getDriverAndVolNameFromDeviceMountPath failed, VolumeHandle empty"))
  514. }
  515. return csiSource.Driver, csiSource.VolumeHandle, nil
  516. }
  517. func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
  518. // when we received a deleted event during attachment, fail fast
  519. if attachment == nil {
  520. klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", volumeHandle))
  521. return false, errors.New("volume attachment has been deleted")
  522. }
  523. // if being deleted, fail fast
  524. if attachment.GetDeletionTimestamp() != nil {
  525. klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name))
  526. return false, errors.New("volume attachment is being deleted")
  527. }
  528. // attachment OK
  529. if attachment.Status.Attached {
  530. return true, nil
  531. }
  532. // driver reports attach error
  533. attachErr := attachment.Status.AttachError
  534. if attachErr != nil {
  535. klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
  536. return false, errors.New(attachErr.Message)
  537. }
  538. return false, nil
  539. }
  540. func verifyDetachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
  541. // when we received a deleted event during detachment
  542. // it means we have successfully detached it.
  543. if attachment == nil {
  544. return true, nil
  545. }
  546. // driver reports detach error
  547. detachErr := attachment.Status.DetachError
  548. if detachErr != nil {
  549. klog.Error(log("detachment for VolumeAttachment for volume [%s] failed: %v", volumeHandle, detachErr.Message))
  550. return false, errors.New(detachErr.Message)
  551. }
  552. return false, nil
  553. }