csi_mounter.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package csi
  14. import (
  15. "context"
  16. "crypto/sha256"
  17. "fmt"
  18. "os"
  19. "path"
  20. "path/filepath"
  21. "k8s.io/klog"
  22. api "k8s.io/api/core/v1"
  23. apierrs "k8s.io/apimachinery/pkg/api/errors"
  24. "k8s.io/apimachinery/pkg/types"
  25. utilfeature "k8s.io/apiserver/pkg/util/feature"
  26. "k8s.io/client-go/kubernetes"
  27. "k8s.io/kubernetes/pkg/features"
  28. "k8s.io/kubernetes/pkg/volume"
  29. utilstrings "k8s.io/utils/strings"
  30. )
  31. //TODO (vladimirvivien) move this in a central loc later
  32. var (
  33. volDataKey = struct {
  34. specVolID,
  35. volHandle,
  36. driverName,
  37. nodeName,
  38. attachmentID,
  39. driverMode string
  40. }{
  41. "specVolID",
  42. "volumeHandle",
  43. "driverName",
  44. "nodeName",
  45. "attachmentID",
  46. "driverMode",
  47. }
  48. )
  49. type csiMountMgr struct {
  50. csiClientGetter
  51. k8s kubernetes.Interface
  52. plugin *csiPlugin
  53. driverName csiDriverName
  54. driverMode driverMode
  55. volumeID string
  56. specVolumeID string
  57. readOnly bool
  58. spec *volume.Spec
  59. pod *api.Pod
  60. podUID types.UID
  61. options volume.VolumeOptions
  62. publishContext map[string]string
  63. volume.MetricsProvider
  64. }
  65. // volume.Volume methods
  66. var _ volume.Volume = &csiMountMgr{}
  67. func (c *csiMountMgr) GetPath() string {
  68. dir := filepath.Join(getTargetPath(c.podUID, c.specVolumeID, c.plugin.host), "/mount")
  69. klog.V(4).Info(log("mounter.GetPath generated [%s]", dir))
  70. return dir
  71. }
  72. func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string {
  73. specVolID := utilstrings.EscapeQualifiedName(specVolumeID)
  74. return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(CSIPluginName), specVolID)
  75. }
  76. // volume.Mounter methods
  77. var _ volume.Mounter = &csiMountMgr{}
  78. func (c *csiMountMgr) CanMount() error {
  79. return nil
  80. }
  81. func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) error {
  82. return c.SetUpAt(c.GetPath(), mounterArgs)
  83. }
  84. func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
  85. klog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
  86. mounted, err := isDirMounted(c.plugin, dir)
  87. if err != nil {
  88. klog.Error(log("mounter.SetUpAt failed while checking mount status for dir [%s]", dir))
  89. return err
  90. }
  91. if mounted {
  92. klog.V(4).Info(log("mounter.SetUpAt skipping mount, dir already mounted [%s]", dir))
  93. return nil
  94. }
  95. csi, err := c.csiClientGetter.Get()
  96. if err != nil {
  97. klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
  98. return err
  99. }
  100. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  101. defer cancel()
  102. volSrc, pvSrc, err := getSourceFromSpec(c.spec)
  103. if err != nil {
  104. klog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
  105. return err
  106. }
  107. driverName := c.driverName
  108. volumeHandle := c.volumeID
  109. readOnly := c.readOnly
  110. accessMode := api.ReadWriteOnce
  111. var (
  112. fsType string
  113. volAttribs map[string]string
  114. nodePublishSecrets map[string]string
  115. publishContext map[string]string
  116. mountOptions []string
  117. deviceMountPath string
  118. secretRef *api.SecretReference
  119. )
  120. switch {
  121. case volSrc != nil:
  122. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
  123. return fmt.Errorf("CSIInlineVolume feature required")
  124. }
  125. if c.driverMode != ephemeralDriverMode {
  126. return fmt.Errorf("unexpected driver mode: %s", c.driverMode)
  127. }
  128. if volSrc.FSType != nil {
  129. fsType = *volSrc.FSType
  130. }
  131. volAttribs = volSrc.VolumeAttributes
  132. if volSrc.NodePublishSecretRef != nil {
  133. secretName := volSrc.NodePublishSecretRef.Name
  134. ns := c.pod.Namespace
  135. secretRef = &api.SecretReference{Name: secretName, Namespace: ns}
  136. }
  137. case pvSrc != nil:
  138. if c.driverMode != persistentDriverMode {
  139. return fmt.Errorf("unexpected driver mode: %s", c.driverMode)
  140. }
  141. fsType = pvSrc.FSType
  142. volAttribs = pvSrc.VolumeAttributes
  143. if pvSrc.NodePublishSecretRef != nil {
  144. secretRef = pvSrc.NodePublishSecretRef
  145. }
  146. //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
  147. if c.spec.PersistentVolume.Spec.AccessModes != nil {
  148. accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
  149. }
  150. mountOptions = c.spec.PersistentVolume.Spec.MountOptions
  151. // Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so
  152. stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
  153. if err != nil {
  154. klog.Error(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capabilty: %v", err))
  155. return err
  156. }
  157. if stageUnstageSet {
  158. deviceMountPath, err = makeDeviceMountPath(c.plugin, c.spec)
  159. if err != nil {
  160. klog.Error(log("mounter.SetUpAt failed to make device mount path: %v", err))
  161. return err
  162. }
  163. }
  164. // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
  165. if c.publishContext == nil {
  166. nodeName := string(c.plugin.host.GetNodeName())
  167. c.publishContext, err = c.plugin.getPublishContext(c.k8s, volumeHandle, string(driverName), nodeName)
  168. if err != nil {
  169. return err
  170. }
  171. publishContext = c.publishContext
  172. }
  173. default:
  174. return fmt.Errorf("volume source not found in volume.Spec")
  175. }
  176. // create target_dir before call to NodePublish
  177. if err := os.MkdirAll(dir, 0750); err != nil {
  178. klog.Error(log("mouter.SetUpAt failed to create dir %#v: %v", dir, err))
  179. return err
  180. }
  181. klog.V(4).Info(log("created target path successfully [%s]", dir))
  182. nodePublishSecrets = map[string]string{}
  183. if secretRef != nil {
  184. nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, secretRef)
  185. if err != nil {
  186. return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v",
  187. secretRef.Namespace, secretRef.Name, err)
  188. }
  189. }
  190. // Inject pod information into volume_attributes
  191. podAttrs, err := c.podAttributes()
  192. if err != nil {
  193. klog.Error(log("mouter.SetUpAt failed to assemble volume attributes: %v", err))
  194. return err
  195. }
  196. if podAttrs != nil {
  197. if volAttribs == nil {
  198. volAttribs = podAttrs
  199. } else {
  200. for k, v := range podAttrs {
  201. volAttribs[k] = v
  202. }
  203. }
  204. }
  205. err = csi.NodePublishVolume(
  206. ctx,
  207. volumeHandle,
  208. readOnly,
  209. deviceMountPath,
  210. dir,
  211. accessMode,
  212. publishContext,
  213. volAttribs,
  214. nodePublishSecrets,
  215. fsType,
  216. mountOptions,
  217. )
  218. if err != nil {
  219. klog.Errorf(log("mounter.SetupAt failed: %v", err))
  220. if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
  221. klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
  222. }
  223. return err
  224. }
  225. // apply volume ownership
  226. // The following logic is derived from https://github.com/kubernetes/kubernetes/issues/66323
  227. // if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
  228. // if fstype is provided and pv.AccessMode == ReadWriteOnly, then apply fsgroup
  229. err = c.applyFSGroup(fsType, mounterArgs.FsGroup)
  230. if err != nil {
  231. // attempt to rollback mount.
  232. fsGrpErr := fmt.Errorf("applyFSGroup failed for vol %s: %v", c.volumeID, err)
  233. if unpubErr := csi.NodeUnpublishVolume(ctx, c.volumeID, dir); unpubErr != nil {
  234. klog.Error(log("NodeUnpublishVolume failed for [%s]: %v", c.volumeID, unpubErr))
  235. return fsGrpErr
  236. }
  237. if unmountErr := removeMountDir(c.plugin, dir); unmountErr != nil {
  238. klog.Error(log("removeMountDir failed for [%s]: %v", dir, unmountErr))
  239. return fsGrpErr
  240. }
  241. return fsGrpErr
  242. }
  243. klog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
  244. return nil
  245. }
  246. func (c *csiMountMgr) podAttributes() (map[string]string, error) {
  247. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  248. return nil, nil
  249. }
  250. kletHost, ok := c.plugin.host.(volume.KubeletVolumeHost)
  251. if ok {
  252. kletHost.WaitForCacheSync()
  253. }
  254. if c.plugin.csiDriverLister == nil {
  255. return nil, fmt.Errorf("CSIDriverLister not found")
  256. }
  257. csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))
  258. if err != nil {
  259. if apierrs.IsNotFound(err) {
  260. klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", c.driverName))
  261. return nil, nil
  262. }
  263. return nil, err
  264. }
  265. // if PodInfoOnMount is not set or false we do not set pod attributes
  266. if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false {
  267. klog.V(4).Infof(log("CSIDriver %q does not require pod information", c.driverName))
  268. return nil, nil
  269. }
  270. attrs := map[string]string{
  271. "csi.storage.k8s.io/pod.name": c.pod.Name,
  272. "csi.storage.k8s.io/pod.namespace": c.pod.Namespace,
  273. "csi.storage.k8s.io/pod.uid": string(c.pod.UID),
  274. "csi.storage.k8s.io/serviceAccount.name": c.pod.Spec.ServiceAccountName,
  275. }
  276. klog.V(4).Infof(log("CSIDriver %q requires pod information", c.driverName))
  277. return attrs, nil
  278. }
  279. func (c *csiMountMgr) GetAttributes() volume.Attributes {
  280. mounter := c.plugin.host.GetMounter(c.plugin.GetPluginName())
  281. path := c.GetPath()
  282. supportSelinux, err := mounter.GetSELinuxSupport(path)
  283. if err != nil {
  284. klog.V(2).Info(log("error checking for SELinux support: %s", err))
  285. // Best guess
  286. supportSelinux = false
  287. }
  288. return volume.Attributes{
  289. ReadOnly: c.readOnly,
  290. Managed: !c.readOnly,
  291. SupportsSELinux: supportSelinux,
  292. }
  293. }
  294. // volume.Unmounter methods
  295. var _ volume.Unmounter = &csiMountMgr{}
  296. func (c *csiMountMgr) TearDown() error {
  297. return c.TearDownAt(c.GetPath())
  298. }
  299. func (c *csiMountMgr) TearDownAt(dir string) error {
  300. klog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))
  301. volID := c.volumeID
  302. csi, err := c.csiClientGetter.Get()
  303. if err != nil {
  304. klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
  305. return err
  306. }
  307. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  308. defer cancel()
  309. if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil {
  310. klog.Errorf(log("mounter.TearDownAt failed: %v", err))
  311. return err
  312. }
  313. // clean mount point dir
  314. if err := removeMountDir(c.plugin, dir); err != nil {
  315. klog.Error(log("mounter.TearDownAt failed to clean mount dir [%s]: %v", dir, err))
  316. return err
  317. }
  318. klog.V(4).Infof(log("mounter.TearDownAt successfully unmounted dir [%s]", dir))
  319. return nil
  320. }
  321. // applyFSGroup applies the volume ownership it derives its logic
  322. // from https://github.com/kubernetes/kubernetes/issues/66323
  323. // 1) if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
  324. // 2) if fstype is provided and pv.AccessMode == ReadWriteOnly and !c.spec.ReadOnly then apply fsgroup
  325. func (c *csiMountMgr) applyFSGroup(fsType string, fsGroup *int64) error {
  326. if fsGroup != nil {
  327. if fsType == "" {
  328. klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, fsType not provided"))
  329. return nil
  330. }
  331. accessModes := c.spec.PersistentVolume.Spec.AccessModes
  332. if c.spec.PersistentVolume.Spec.AccessModes == nil {
  333. klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, access modes not provided"))
  334. return nil
  335. }
  336. if !hasReadWriteOnce(accessModes) {
  337. klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, only support ReadWriteOnce access mode"))
  338. return nil
  339. }
  340. if c.readOnly {
  341. klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, volume is readOnly"))
  342. return nil
  343. }
  344. err := volume.SetVolumeOwnership(c, fsGroup)
  345. if err != nil {
  346. return err
  347. }
  348. klog.V(4).Info(log("mounter.SetupAt fsGroup [%d] applied successfully to %s", *fsGroup, c.volumeID))
  349. }
  350. return nil
  351. }
  352. // isDirMounted returns the !notMounted result from IsLikelyNotMountPoint check
  353. func isDirMounted(plug *csiPlugin, dir string) (bool, error) {
  354. mounter := plug.host.GetMounter(plug.GetPluginName())
  355. notMnt, err := mounter.IsLikelyNotMountPoint(dir)
  356. if err != nil && !os.IsNotExist(err) {
  357. klog.Error(log("isDirMounted IsLikelyNotMountPoint test failed for dir [%v]", dir))
  358. return false, err
  359. }
  360. return !notMnt, nil
  361. }
  362. // removeMountDir cleans the mount dir when dir is not mounted and removed the volume data file in dir
  363. func removeMountDir(plug *csiPlugin, mountPath string) error {
  364. klog.V(4).Info(log("removing mount path [%s]", mountPath))
  365. mnt, err := isDirMounted(plug, mountPath)
  366. if err != nil {
  367. return err
  368. }
  369. if !mnt {
  370. klog.V(4).Info(log("dir not mounted, deleting it [%s]", mountPath))
  371. if err := os.Remove(mountPath); err != nil && !os.IsNotExist(err) {
  372. klog.Error(log("failed to remove dir [%s]: %v", mountPath, err))
  373. return err
  374. }
  375. // remove volume data file as well
  376. volPath := path.Dir(mountPath)
  377. dataFile := filepath.Join(volPath, volDataFileName)
  378. klog.V(4).Info(log("also deleting volume info data file [%s]", dataFile))
  379. if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) {
  380. klog.Error(log("failed to delete volume data file [%s]: %v", dataFile, err))
  381. return err
  382. }
  383. // remove volume path
  384. klog.V(4).Info(log("deleting volume path [%s]", volPath))
  385. if err := os.Remove(volPath); err != nil && !os.IsNotExist(err) {
  386. klog.Error(log("failed to delete volume path [%s]: %v", volPath, err))
  387. return err
  388. }
  389. }
  390. return nil
  391. }
  392. // makeVolumeHandle returns csi-<sha256(podUID,volSourceSpecName)>
  393. func makeVolumeHandle(podUID, volSourceSpecName string) string {
  394. result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", podUID, volSourceSpecName)))
  395. return fmt.Sprintf("csi-%x", result)
  396. }