csi_block.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package csi
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "os"
  19. "path/filepath"
  20. "k8s.io/klog"
  21. "k8s.io/api/core/v1"
  22. storage "k8s.io/api/storage/v1"
  23. meta "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/client-go/kubernetes"
  26. "k8s.io/kubernetes/pkg/volume"
  27. ioutil "k8s.io/kubernetes/pkg/volume/util"
  28. utilstrings "k8s.io/utils/strings"
  29. )
  30. type csiBlockMapper struct {
  31. csiClientGetter
  32. k8s kubernetes.Interface
  33. plugin *csiPlugin
  34. driverName csiDriverName
  35. specName string
  36. volumeID string
  37. readOnly bool
  38. spec *volume.Spec
  39. podUID types.UID
  40. volumeInfo map[string]string
  41. }
  42. var _ volume.BlockVolumeMapper = &csiBlockMapper{}
  43. // GetGlobalMapPath returns a global map path (on the node) to a device file which will be symlinked to
  44. // Example: plugins/kubernetes.io/csi/volumeDevices/{pvname}/dev
  45. func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
  46. dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host)
  47. klog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
  48. return dir, nil
  49. }
  50. // getStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
  51. // Example: plugins/kubernetes.io/csi/volumeDevices/staging/{pvname}
  52. func (m *csiBlockMapper) getStagingPath() string {
  53. sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
  54. return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", sanitizedSpecVolID)
  55. }
  56. // getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
  57. // Example: plugins/kubernetes.io/csi/volumeDevices/publish/{pvname}
  58. func (m *csiBlockMapper) getPublishPath() string {
  59. sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
  60. return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", sanitizedSpecVolID)
  61. }
  62. // GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
  63. // returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {pvname}
  64. func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
  65. path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName))
  66. specName := m.specName
  67. klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName))
  68. return path, specName
  69. }
  70. // stageVolumeForBlock stages a block volume to stagingPath
  71. func (m *csiBlockMapper) stageVolumeForBlock(
  72. ctx context.Context,
  73. csi csiClient,
  74. accessMode v1.PersistentVolumeAccessMode,
  75. csiSource *v1.CSIPersistentVolumeSource,
  76. attachment *storage.VolumeAttachment,
  77. ) (string, error) {
  78. klog.V(4).Infof(log("blockMapper.stageVolumeForBlock called"))
  79. stagingPath := m.getStagingPath()
  80. klog.V(4).Infof(log("blockMapper.stageVolumeForBlock stagingPath set [%s]", stagingPath))
  81. // Check whether "STAGE_UNSTAGE_VOLUME" is set
  82. stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
  83. if err != nil {
  84. klog.Error(log("blockMapper.stageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
  85. return "", err
  86. }
  87. if !stageUnstageSet {
  88. klog.Infof(log("blockMapper.stageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
  89. return "", nil
  90. }
  91. publishVolumeInfo := map[string]string{}
  92. if attachment != nil {
  93. publishVolumeInfo = attachment.Status.AttachmentMetadata
  94. }
  95. nodeStageSecrets := map[string]string{}
  96. if csiSource.NodeStageSecretRef != nil {
  97. nodeStageSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodeStageSecretRef)
  98. if err != nil {
  99. return "", fmt.Errorf("failed to get NodeStageSecretRef %s/%s: %v",
  100. csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
  101. }
  102. }
  103. // Creating a stagingPath directory before call to NodeStageVolume
  104. if err := os.MkdirAll(stagingPath, 0750); err != nil {
  105. klog.Error(log("blockMapper.stageVolumeForBlock failed to create dir %s: %v", stagingPath, err))
  106. return "", err
  107. }
  108. klog.V(4).Info(log("blockMapper.stageVolumeForBlock created stagingPath directory successfully [%s]", stagingPath))
  109. // Request to stage a block volume to stagingPath.
  110. // Expected implementation for driver is creating driver specific resource on stagingPath and
  111. // attaching the block volume to the node.
  112. err = csi.NodeStageVolume(ctx,
  113. csiSource.VolumeHandle,
  114. publishVolumeInfo,
  115. stagingPath,
  116. fsTypeBlockName,
  117. accessMode,
  118. nodeStageSecrets,
  119. csiSource.VolumeAttributes,
  120. nil /* MountOptions */)
  121. if err != nil {
  122. klog.Error(log("blockMapper.stageVolumeForBlock failed: %v", err))
  123. return "", err
  124. }
  125. klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath))
  126. return stagingPath, nil
  127. }
  128. // publishVolumeForBlock publishes a block volume to publishPath
  129. func (m *csiBlockMapper) publishVolumeForBlock(
  130. ctx context.Context,
  131. csi csiClient,
  132. accessMode v1.PersistentVolumeAccessMode,
  133. csiSource *v1.CSIPersistentVolumeSource,
  134. attachment *storage.VolumeAttachment,
  135. stagingPath string,
  136. ) (string, error) {
  137. klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called"))
  138. publishVolumeInfo := map[string]string{}
  139. if attachment != nil {
  140. publishVolumeInfo = attachment.Status.AttachmentMetadata
  141. }
  142. nodePublishSecrets := map[string]string{}
  143. var err error
  144. if csiSource.NodePublishSecretRef != nil {
  145. nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef)
  146. if err != nil {
  147. klog.Errorf("blockMapper.publishVolumeForBlock failed to get NodePublishSecretRef %s/%s: %v",
  148. csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err)
  149. return "", err
  150. }
  151. }
  152. publishPath := m.getPublishPath()
  153. // Setup a parent directory for publishPath before call to NodePublishVolume
  154. publishDir := filepath.Dir(publishPath)
  155. if err := os.MkdirAll(publishDir, 0750); err != nil {
  156. klog.Error(log("blockMapper.publishVolumeForBlock failed to create dir %s: %v", publishDir, err))
  157. return "", err
  158. }
  159. klog.V(4).Info(log("blockMapper.publishVolumeForBlock created directory for publishPath successfully [%s]", publishDir))
  160. // Request to publish a block volume to publishPath.
  161. // Expectation for driver is to place a block volume on the publishPath, by bind-mounting the device file on the publishPath or
  162. // creating device file on the publishPath.
  163. // Parent directory for publishPath is created by k8s, but driver is responsible for creating publishPath itself.
  164. // If driver doesn't implement NodeStageVolume, attaching the block volume to the node may be done, here.
  165. err = csi.NodePublishVolume(
  166. ctx,
  167. m.volumeID,
  168. m.readOnly,
  169. stagingPath,
  170. publishPath,
  171. accessMode,
  172. publishVolumeInfo,
  173. csiSource.VolumeAttributes,
  174. nodePublishSecrets,
  175. fsTypeBlockName,
  176. []string{},
  177. )
  178. if err != nil {
  179. klog.Errorf(log("blockMapper.publishVolumeForBlock failed: %v", err))
  180. return "", err
  181. }
  182. return publishPath, nil
  183. }
  184. // SetUpDevice ensures the device is attached returns path where the device is located.
  185. func (m *csiBlockMapper) SetUpDevice() (string, error) {
  186. if !m.plugin.blockEnabled {
  187. return "", errors.New("CSIBlockVolume feature not enabled")
  188. }
  189. klog.V(4).Infof(log("blockMapper.SetUpDevice called"))
  190. // Get csiSource from spec
  191. if m.spec == nil {
  192. klog.Error(log("blockMapper.SetUpDevice spec is nil"))
  193. return "", fmt.Errorf("spec is nil")
  194. }
  195. csiSource, err := getCSISourceFromSpec(m.spec)
  196. if err != nil {
  197. klog.Error(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err))
  198. return "", err
  199. }
  200. driverName := csiSource.Driver
  201. skip, err := m.plugin.skipAttach(driverName)
  202. if err != nil {
  203. klog.Error(log("blockMapper.SetupDevice failed to check CSIDriver for %s: %v", driverName, err))
  204. return "", err
  205. }
  206. var attachment *storage.VolumeAttachment
  207. if !skip {
  208. // Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
  209. nodeName := string(m.plugin.host.GetNodeName())
  210. attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
  211. attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
  212. if err != nil {
  213. klog.Error(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
  214. return "", err
  215. }
  216. }
  217. //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
  218. accessMode := v1.ReadWriteOnce
  219. if m.spec.PersistentVolume.Spec.AccessModes != nil {
  220. accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
  221. }
  222. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  223. defer cancel()
  224. csiClient, err := m.csiClientGetter.Get()
  225. if err != nil {
  226. klog.Error(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
  227. return "", err
  228. }
  229. // Call NodeStageVolume
  230. stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
  231. if err != nil {
  232. return "", err
  233. }
  234. // Call NodePublishVolume
  235. publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath)
  236. if err != nil {
  237. return "", err
  238. }
  239. return publishPath, nil
  240. }
  241. func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, volumeMapName string, podUID types.UID) error {
  242. return ioutil.MapBlockVolume(devicePath, globalMapPath, volumeMapPath, volumeMapName, podUID)
  243. }
  244. var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
  245. // unpublishVolumeForBlock unpublishes a block volume from publishPath
  246. func (m *csiBlockMapper) unpublishVolumeForBlock(ctx context.Context, csi csiClient, publishPath string) error {
  247. // Request to unpublish a block volume from publishPath.
  248. // Expectation for driver is to remove block volume from the publishPath, by unmounting bind-mounted device file
  249. // or deleting device file.
  250. // Driver is responsible for deleting publishPath itself.
  251. // If driver doesn't implement NodeUnstageVolume, detaching the block volume from the node may be done, here.
  252. if err := csi.NodeUnpublishVolume(ctx, m.volumeID, publishPath); err != nil {
  253. klog.Error(log("blockMapper.unpublishVolumeForBlock failed: %v", err))
  254. return err
  255. }
  256. klog.V(4).Infof(log("blockMapper.unpublishVolumeForBlock NodeUnpublished successfully [%s]", publishPath))
  257. return nil
  258. }
  259. // unstageVolumeForBlock unstages a block volume from stagingPath
  260. func (m *csiBlockMapper) unstageVolumeForBlock(ctx context.Context, csi csiClient, stagingPath string) error {
  261. // Check whether "STAGE_UNSTAGE_VOLUME" is set
  262. stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
  263. if err != nil {
  264. klog.Error(log("blockMapper.unstageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
  265. return err
  266. }
  267. if !stageUnstageSet {
  268. klog.Infof(log("blockMapper.unstageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping unstageVolumeForBlock ..."))
  269. return nil
  270. }
  271. // Request to unstage a block volume from stagingPath.
  272. // Expected implementation for driver is removing driver specific resource in stagingPath and
  273. // detaching the block volume from the node.
  274. if err := csi.NodeUnstageVolume(ctx, m.volumeID, stagingPath); err != nil {
  275. klog.Errorf(log("blockMapper.unstageVolumeForBlock failed: %v", err))
  276. return err
  277. }
  278. klog.V(4).Infof(log("blockMapper.unstageVolumeForBlock NodeUnstageVolume successfully [%s]", stagingPath))
  279. // Remove stagingPath directory and its contents
  280. if err := os.RemoveAll(stagingPath); err != nil {
  281. klog.Error(log("blockMapper.unstageVolumeForBlock failed to remove staging path after NodeUnstageVolume() error [%s]: %v", stagingPath, err))
  282. return err
  283. }
  284. return nil
  285. }
  286. // TearDownDevice removes traces of the SetUpDevice.
  287. func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error {
  288. if !m.plugin.blockEnabled {
  289. return errors.New("CSIBlockVolume feature not enabled")
  290. }
  291. klog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath))
  292. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  293. defer cancel()
  294. csiClient, err := m.csiClientGetter.Get()
  295. if err != nil {
  296. klog.Error(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
  297. return err
  298. }
  299. // Call NodeUnpublishVolume
  300. publishPath := m.getPublishPath()
  301. if _, err := os.Stat(publishPath); err != nil {
  302. if os.IsNotExist(err) {
  303. klog.V(4).Infof(log("blockMapper.TearDownDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath))
  304. } else {
  305. return err
  306. }
  307. } else {
  308. err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
  309. if err != nil {
  310. return err
  311. }
  312. }
  313. // Call NodeUnstageVolume
  314. stagingPath := m.getStagingPath()
  315. if _, err := os.Stat(stagingPath); err != nil {
  316. if os.IsNotExist(err) {
  317. klog.V(4).Infof(log("blockMapper.TearDownDevice stagingPath(%s) has already been deleted, skip calling NodeUnstageVolume", stagingPath))
  318. } else {
  319. return err
  320. }
  321. } else {
  322. err := m.unstageVolumeForBlock(ctx, csiClient, stagingPath)
  323. if err != nil {
  324. return err
  325. }
  326. }
  327. return nil
  328. }