csi_mounter.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package csi
  14. import (
  15. "context"
  16. "crypto/sha256"
  17. "errors"
  18. "fmt"
  19. "os"
  20. "path/filepath"
  21. "strconv"
  22. "k8s.io/klog"
  23. api "k8s.io/api/core/v1"
  24. storage "k8s.io/api/storage/v1beta1"
  25. apierrors "k8s.io/apimachinery/pkg/api/errors"
  26. "k8s.io/apimachinery/pkg/types"
  27. utilfeature "k8s.io/apiserver/pkg/util/feature"
  28. "k8s.io/client-go/kubernetes"
  29. "k8s.io/kubernetes/pkg/features"
  30. "k8s.io/kubernetes/pkg/volume"
  31. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  32. utilstrings "k8s.io/utils/strings"
  33. )
  34. //TODO (vladimirvivien) move this in a central loc later
  35. var (
  36. volDataKey = struct {
  37. specVolID,
  38. volHandle,
  39. driverName,
  40. nodeName,
  41. attachmentID,
  42. volumeLifecycleMode string
  43. }{
  44. "specVolID",
  45. "volumeHandle",
  46. "driverName",
  47. "nodeName",
  48. "attachmentID",
  49. "volumeLifecycleMode",
  50. }
  51. )
  52. type csiMountMgr struct {
  53. csiClientGetter
  54. k8s kubernetes.Interface
  55. plugin *csiPlugin
  56. driverName csiDriverName
  57. volumeLifecycleMode storage.VolumeLifecycleMode
  58. volumeID string
  59. specVolumeID string
  60. readOnly bool
  61. supportsSELinux bool
  62. spec *volume.Spec
  63. pod *api.Pod
  64. podUID types.UID
  65. options volume.VolumeOptions
  66. publishContext map[string]string
  67. kubeVolHost volume.KubeletVolumeHost
  68. volume.MetricsProvider
  69. }
  70. // volume.Volume methods
  71. var _ volume.Volume = &csiMountMgr{}
  72. func (c *csiMountMgr) GetPath() string {
  73. dir := filepath.Join(getTargetPath(c.podUID, c.specVolumeID, c.plugin.host), "/mount")
  74. klog.V(4).Info(log("mounter.GetPath generated [%s]", dir))
  75. return dir
  76. }
  77. func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string {
  78. specVolID := utilstrings.EscapeQualifiedName(specVolumeID)
  79. return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(CSIPluginName), specVolID)
  80. }
  81. // volume.Mounter methods
  82. var _ volume.Mounter = &csiMountMgr{}
  83. func (c *csiMountMgr) CanMount() error {
  84. return nil
  85. }
  86. func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) error {
  87. return c.SetUpAt(c.GetPath(), mounterArgs)
  88. }
  89. func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
  90. klog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
  91. mounted, err := isDirMounted(c.plugin, dir)
  92. if err != nil {
  93. return errors.New(log("mounter.SetUpAt failed while checking mount status for dir [%s]: %v", dir, err))
  94. }
  95. if mounted {
  96. klog.V(4).Info(log("mounter.SetUpAt skipping mount, dir already mounted [%s]", dir))
  97. return nil
  98. }
  99. csi, err := c.csiClientGetter.Get()
  100. if err != nil {
  101. return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get CSI client: %v", err))
  102. }
  103. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  104. defer cancel()
  105. volSrc, pvSrc, err := getSourceFromSpec(c.spec)
  106. if err != nil {
  107. return errors.New(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
  108. }
  109. driverName := c.driverName
  110. volumeHandle := c.volumeID
  111. readOnly := c.readOnly
  112. accessMode := api.ReadWriteOnce
  113. var (
  114. fsType string
  115. volAttribs map[string]string
  116. nodePublishSecrets map[string]string
  117. publishContext map[string]string
  118. mountOptions []string
  119. deviceMountPath string
  120. secretRef *api.SecretReference
  121. )
  122. switch {
  123. case volSrc != nil:
  124. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
  125. return fmt.Errorf("CSIInlineVolume feature required")
  126. }
  127. if c.volumeLifecycleMode != storage.VolumeLifecycleEphemeral {
  128. return fmt.Errorf("unexpected volume mode: %s", c.volumeLifecycleMode)
  129. }
  130. if volSrc.FSType != nil {
  131. fsType = *volSrc.FSType
  132. }
  133. volAttribs = volSrc.VolumeAttributes
  134. if volSrc.NodePublishSecretRef != nil {
  135. secretName := volSrc.NodePublishSecretRef.Name
  136. ns := c.pod.Namespace
  137. secretRef = &api.SecretReference{Name: secretName, Namespace: ns}
  138. }
  139. case pvSrc != nil:
  140. if c.volumeLifecycleMode != storage.VolumeLifecyclePersistent {
  141. return fmt.Errorf("unexpected driver mode: %s", c.volumeLifecycleMode)
  142. }
  143. fsType = pvSrc.FSType
  144. volAttribs = pvSrc.VolumeAttributes
  145. if pvSrc.NodePublishSecretRef != nil {
  146. secretRef = pvSrc.NodePublishSecretRef
  147. }
  148. //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
  149. if c.spec.PersistentVolume.Spec.AccessModes != nil {
  150. accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
  151. }
  152. mountOptions = c.spec.PersistentVolume.Spec.MountOptions
  153. // Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so
  154. stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
  155. if err != nil {
  156. return errors.New(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capability: %v", err))
  157. }
  158. if stageUnstageSet {
  159. deviceMountPath, err = makeDeviceMountPath(c.plugin, c.spec)
  160. if err != nil {
  161. return errors.New(log("mounter.SetUpAt failed to make device mount path: %v", err))
  162. }
  163. }
  164. // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
  165. if c.publishContext == nil {
  166. nodeName := string(c.plugin.host.GetNodeName())
  167. c.publishContext, err = c.plugin.getPublishContext(c.k8s, volumeHandle, string(driverName), nodeName)
  168. if err != nil {
  169. // we could have a transient error associated with fetching publish context
  170. return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to fetch publishContext: %v", err))
  171. }
  172. publishContext = c.publishContext
  173. }
  174. default:
  175. return fmt.Errorf("volume source not found in volume.Spec")
  176. }
  177. // create target_dir before call to NodePublish
  178. if err := os.MkdirAll(dir, 0750); err != nil {
  179. return errors.New(log("mounter.SetUpAt failed to create dir %#v: %v", dir, err))
  180. }
  181. klog.V(4).Info(log("created target path successfully [%s]", dir))
  182. nodePublishSecrets = map[string]string{}
  183. if secretRef != nil {
  184. nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, secretRef)
  185. if err != nil {
  186. return volumetypes.NewTransientOperationFailure(fmt.Sprintf("fetching NodePublishSecretRef %s/%s failed: %v",
  187. secretRef.Namespace, secretRef.Name, err))
  188. }
  189. }
  190. // Inject pod information into volume_attributes
  191. podAttrs, err := c.podAttributes()
  192. if err != nil {
  193. return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to assemble volume attributes: %v", err))
  194. }
  195. if podAttrs != nil {
  196. if volAttribs == nil {
  197. volAttribs = podAttrs
  198. } else {
  199. for k, v := range podAttrs {
  200. volAttribs[k] = v
  201. }
  202. }
  203. }
  204. err = csi.NodePublishVolume(
  205. ctx,
  206. volumeHandle,
  207. readOnly,
  208. deviceMountPath,
  209. dir,
  210. accessMode,
  211. publishContext,
  212. volAttribs,
  213. nodePublishSecrets,
  214. fsType,
  215. mountOptions,
  216. )
  217. if err != nil {
  218. // If operation finished with error then we can remove the mount directory.
  219. if volumetypes.IsOperationFinishedError(err) {
  220. if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
  221. klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
  222. }
  223. }
  224. return err
  225. }
  226. c.supportsSELinux, err = c.kubeVolHost.GetHostUtil().GetSELinuxSupport(dir)
  227. if err != nil {
  228. klog.V(2).Info(log("error checking for SELinux support: %s", err))
  229. }
  230. // apply volume ownership
  231. // The following logic is derived from https://github.com/kubernetes/kubernetes/issues/66323
  232. // if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
  233. // if fstype is provided and pv.AccessMode == ReadWriteOnly, then apply fsgroup
  234. err = c.applyFSGroup(fsType, mounterArgs.FsGroup)
  235. if err != nil {
  236. // At this point mount operation is successful:
  237. // 1. Since volume can not be used by the pod because of invalid permissions, we must return error
  238. // 2. Since mount is successful, we must record volume as mounted in uncertain state, so it can be
  239. // cleaned up.
  240. return volumetypes.NewUncertainProgressError(fmt.Sprintf("applyFSGroup failed for vol %s: %v", c.volumeID, err))
  241. }
  242. klog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
  243. return nil
  244. }
  245. func (c *csiMountMgr) podAttributes() (map[string]string, error) {
  246. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  247. return nil, nil
  248. }
  249. kletHost, ok := c.plugin.host.(volume.KubeletVolumeHost)
  250. if ok {
  251. kletHost.WaitForCacheSync()
  252. }
  253. if c.plugin.csiDriverLister == nil {
  254. return nil, fmt.Errorf("CSIDriverLister not found")
  255. }
  256. csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))
  257. if err != nil {
  258. if apierrors.IsNotFound(err) {
  259. klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", c.driverName))
  260. return nil, nil
  261. }
  262. return nil, err
  263. }
  264. // if PodInfoOnMount is not set or false we do not set pod attributes
  265. if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false {
  266. klog.V(4).Infof(log("CSIDriver %q does not require pod information", c.driverName))
  267. return nil, nil
  268. }
  269. attrs := map[string]string{
  270. "csi.storage.k8s.io/pod.name": c.pod.Name,
  271. "csi.storage.k8s.io/pod.namespace": c.pod.Namespace,
  272. "csi.storage.k8s.io/pod.uid": string(c.pod.UID),
  273. "csi.storage.k8s.io/serviceAccount.name": c.pod.Spec.ServiceAccountName,
  274. }
  275. if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
  276. attrs["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(c.volumeLifecycleMode == storage.VolumeLifecycleEphemeral)
  277. }
  278. klog.V(4).Infof(log("CSIDriver %q requires pod information", c.driverName))
  279. return attrs, nil
  280. }
  281. func (c *csiMountMgr) GetAttributes() volume.Attributes {
  282. return volume.Attributes{
  283. ReadOnly: c.readOnly,
  284. Managed: !c.readOnly,
  285. SupportsSELinux: c.supportsSELinux,
  286. }
  287. }
  288. // volume.Unmounter methods
  289. var _ volume.Unmounter = &csiMountMgr{}
  290. func (c *csiMountMgr) TearDown() error {
  291. return c.TearDownAt(c.GetPath())
  292. }
  293. func (c *csiMountMgr) TearDownAt(dir string) error {
  294. klog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))
  295. volID := c.volumeID
  296. csi, err := c.csiClientGetter.Get()
  297. if err != nil {
  298. return errors.New(log("mounter.SetUpAt failed to get CSI client: %v", err))
  299. }
  300. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  301. defer cancel()
  302. if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil {
  303. return errors.New(log("mounter.TearDownAt failed: %v", err))
  304. }
  305. // clean mount point dir
  306. if err := removeMountDir(c.plugin, dir); err != nil {
  307. return errors.New(log("mounter.TearDownAt failed to clean mount dir [%s]: %v", dir, err))
  308. }
  309. klog.V(4).Infof(log("mounter.TearDownAt successfully unmounted dir [%s]", dir))
  310. return nil
  311. }
  312. // applyFSGroup applies the volume ownership it derives its logic
  313. // from https://github.com/kubernetes/kubernetes/issues/66323
  314. // 1) if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
  315. // 2) if fstype is provided and pv.AccessMode == ReadWriteOnly and !c.spec.ReadOnly then apply fsgroup
  316. func (c *csiMountMgr) applyFSGroup(fsType string, fsGroup *int64) error {
  317. if fsGroup != nil {
  318. if fsType == "" {
  319. klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, fsType not provided"))
  320. return nil
  321. }
  322. accessModes := c.spec.PersistentVolume.Spec.AccessModes
  323. if c.spec.PersistentVolume.Spec.AccessModes == nil {
  324. klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, access modes not provided"))
  325. return nil
  326. }
  327. if !hasReadWriteOnce(accessModes) {
  328. klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, only support ReadWriteOnce access mode"))
  329. return nil
  330. }
  331. if c.readOnly {
  332. klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, volume is readOnly"))
  333. return nil
  334. }
  335. err := volume.SetVolumeOwnership(c, fsGroup)
  336. if err != nil {
  337. return err
  338. }
  339. klog.V(4).Info(log("mounter.SetupAt fsGroup [%d] applied successfully to %s", *fsGroup, c.volumeID))
  340. }
  341. return nil
  342. }
  343. // isDirMounted returns the !notMounted result from IsLikelyNotMountPoint check
  344. func isDirMounted(plug *csiPlugin, dir string) (bool, error) {
  345. mounter := plug.host.GetMounter(plug.GetPluginName())
  346. notMnt, err := mounter.IsLikelyNotMountPoint(dir)
  347. if err != nil && !os.IsNotExist(err) {
  348. klog.Error(log("isDirMounted IsLikelyNotMountPoint test failed for dir [%v]", dir))
  349. return false, err
  350. }
  351. return !notMnt, nil
  352. }
  353. // removeMountDir cleans the mount dir when dir is not mounted and removed the volume data file in dir
  354. func removeMountDir(plug *csiPlugin, mountPath string) error {
  355. klog.V(4).Info(log("removing mount path [%s]", mountPath))
  356. mnt, err := isDirMounted(plug, mountPath)
  357. if err != nil {
  358. return err
  359. }
  360. if !mnt {
  361. klog.V(4).Info(log("dir not mounted, deleting it [%s]", mountPath))
  362. if err := os.Remove(mountPath); err != nil && !os.IsNotExist(err) {
  363. return errors.New(log("failed to remove dir [%s]: %v", mountPath, err))
  364. }
  365. // remove volume data file as well
  366. volPath := filepath.Dir(mountPath)
  367. dataFile := filepath.Join(volPath, volDataFileName)
  368. klog.V(4).Info(log("also deleting volume info data file [%s]", dataFile))
  369. if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) {
  370. return errors.New(log("failed to delete volume data file [%s]: %v", dataFile, err))
  371. }
  372. // remove volume path
  373. klog.V(4).Info(log("deleting volume path [%s]", volPath))
  374. if err := os.Remove(volPath); err != nil && !os.IsNotExist(err) {
  375. return errors.New(log("failed to delete volume path [%s]: %v", volPath, err))
  376. }
  377. }
  378. return nil
  379. }
  380. // makeVolumeHandle returns csi-<sha256(podUID,volSourceSpecName)>
  381. func makeVolumeHandle(podUID, volSourceSpecName string) string {
  382. result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", podUID, volSourceSpecName)))
  383. return fmt.Sprintf("csi-%x", result)
  384. }