cephfs.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cephfs
  14. import (
  15. "context"
  16. "fmt"
  17. "os"
  18. "os/exec"
  19. "path/filepath"
  20. "runtime"
  21. "strings"
  22. "k8s.io/klog"
  23. "k8s.io/utils/mount"
  24. utilstrings "k8s.io/utils/strings"
  25. v1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/types"
  28. "k8s.io/kubernetes/pkg/volume"
  29. "k8s.io/kubernetes/pkg/volume/util"
  30. )
  31. // ProbeVolumePlugins is the primary entrypoint for volume plugins.
  32. func ProbeVolumePlugins() []volume.VolumePlugin {
  33. return []volume.VolumePlugin{&cephfsPlugin{nil}}
  34. }
  35. type cephfsPlugin struct {
  36. host volume.VolumeHost
  37. }
  38. var _ volume.VolumePlugin = &cephfsPlugin{}
  39. const (
  40. cephfsPluginName = "kubernetes.io/cephfs"
  41. )
  42. func (plugin *cephfsPlugin) Init(host volume.VolumeHost) error {
  43. plugin.host = host
  44. return nil
  45. }
  46. func (plugin *cephfsPlugin) GetPluginName() string {
  47. return cephfsPluginName
  48. }
  49. func (plugin *cephfsPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
  50. mon, _, _, _, _, err := getVolumeSource(spec)
  51. if err != nil {
  52. return "", err
  53. }
  54. return fmt.Sprintf("%v", mon), nil
  55. }
  56. func (plugin *cephfsPlugin) CanSupport(spec *volume.Spec) bool {
  57. return (spec.Volume != nil && spec.Volume.CephFS != nil) || (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CephFS != nil)
  58. }
  59. func (plugin *cephfsPlugin) RequiresRemount() bool {
  60. return false
  61. }
  62. func (plugin *cephfsPlugin) SupportsMountOption() bool {
  63. return true
  64. }
  65. func (plugin *cephfsPlugin) SupportsBulkVolumeVerification() bool {
  66. return false
  67. }
  68. func (plugin *cephfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
  69. return []v1.PersistentVolumeAccessMode{
  70. v1.ReadWriteOnce,
  71. v1.ReadOnlyMany,
  72. v1.ReadWriteMany,
  73. }
  74. }
  75. func (plugin *cephfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
  76. secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
  77. if err != nil {
  78. return nil, err
  79. }
  80. secret := ""
  81. if len(secretName) > 0 && len(secretNs) > 0 {
  82. // if secret is provideded, retrieve it
  83. kubeClient := plugin.host.GetKubeClient()
  84. if kubeClient == nil {
  85. return nil, fmt.Errorf("Cannot get kube client")
  86. }
  87. secrets, err := kubeClient.CoreV1().Secrets(secretNs).Get(context.TODO(), secretName, metav1.GetOptions{})
  88. if err != nil {
  89. err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
  90. return nil, err
  91. }
  92. for name, data := range secrets.Data {
  93. secret = string(data)
  94. klog.V(4).Infof("found ceph secret info: %s", name)
  95. }
  96. }
  97. return plugin.newMounterInternal(spec, pod.UID, plugin.host.GetMounter(plugin.GetPluginName()), secret)
  98. }
  99. func (plugin *cephfsPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, mounter mount.Interface, secret string) (volume.Mounter, error) {
  100. mon, path, id, secretFile, readOnly, err := getVolumeSource(spec)
  101. if err != nil {
  102. return nil, err
  103. }
  104. if id == "" {
  105. id = "admin"
  106. }
  107. if path == "" {
  108. path = "/"
  109. }
  110. if !strings.HasPrefix(path, "/") {
  111. path = "/" + path
  112. }
  113. if secretFile == "" {
  114. secretFile = "/etc/ceph/" + id + ".secret"
  115. }
  116. return &cephfsMounter{
  117. cephfs: &cephfs{
  118. podUID: podUID,
  119. volName: spec.Name(),
  120. mon: mon,
  121. path: path,
  122. secret: secret,
  123. id: id,
  124. secretFile: secretFile,
  125. readonly: readOnly,
  126. mounter: mounter,
  127. plugin: plugin,
  128. mountOptions: util.MountOptionFromSpec(spec),
  129. },
  130. }, nil
  131. }
  132. func (plugin *cephfsPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
  133. return plugin.newUnmounterInternal(volName, podUID, plugin.host.GetMounter(plugin.GetPluginName()))
  134. }
  135. func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UID, mounter mount.Interface) (volume.Unmounter, error) {
  136. return &cephfsUnmounter{
  137. cephfs: &cephfs{
  138. podUID: podUID,
  139. volName: volName,
  140. mounter: mounter,
  141. plugin: plugin},
  142. }, nil
  143. }
  144. func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
  145. cephfsVolume := &v1.Volume{
  146. Name: volumeName,
  147. VolumeSource: v1.VolumeSource{
  148. CephFS: &v1.CephFSVolumeSource{
  149. Monitors: []string{},
  150. Path: mountPath,
  151. },
  152. },
  153. }
  154. return volume.NewSpecFromVolume(cephfsVolume), nil
  155. }
  156. // CephFS volumes represent a bare host file or directory mount of an CephFS export.
  157. type cephfs struct {
  158. volName string
  159. podUID types.UID
  160. mon []string
  161. path string
  162. id string
  163. secret string
  164. secretFile string
  165. readonly bool
  166. mounter mount.Interface
  167. plugin *cephfsPlugin
  168. volume.MetricsNil
  169. mountOptions []string
  170. }
  171. type cephfsMounter struct {
  172. *cephfs
  173. }
  174. var _ volume.Mounter = &cephfsMounter{}
  175. func (cephfsVolume *cephfsMounter) GetAttributes() volume.Attributes {
  176. return volume.Attributes{
  177. ReadOnly: cephfsVolume.readonly,
  178. Managed: false,
  179. SupportsSELinux: false,
  180. }
  181. }
  182. // Checks prior to mount operations to verify that the required components (binaries, etc.)
  183. // to mount the volume are available on the underlying node.
  184. // If not, it returns an error
  185. func (cephfsVolume *cephfsMounter) CanMount() error {
  186. return nil
  187. }
  188. // SetUp attaches the disk and bind mounts to the volume path.
  189. func (cephfsVolume *cephfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
  190. return cephfsVolume.SetUpAt(cephfsVolume.GetPath(), mounterArgs)
  191. }
  192. // SetUpAt attaches the disk and bind mounts to the volume path.
  193. func (cephfsVolume *cephfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
  194. notMnt, err := cephfsVolume.mounter.IsLikelyNotMountPoint(dir)
  195. klog.V(4).Infof("CephFS mount set up: %s %v %v", dir, !notMnt, err)
  196. if err != nil && !os.IsNotExist(err) {
  197. return err
  198. }
  199. if !notMnt {
  200. return nil
  201. }
  202. if err := os.MkdirAll(dir, 0750); err != nil {
  203. return err
  204. }
  205. // check whether it belongs to fuse, if not, default to use kernel mount.
  206. if cephfsVolume.checkFuseMount() {
  207. klog.V(4).Info("CephFS fuse mount.")
  208. err = cephfsVolume.execFuseMount(dir)
  209. // cleanup no matter if fuse mount fail.
  210. keyringPath := cephfsVolume.GetKeyringPath()
  211. _, StatErr := os.Stat(keyringPath)
  212. if !os.IsNotExist(StatErr) {
  213. os.RemoveAll(keyringPath)
  214. }
  215. if err == nil {
  216. // cephfs fuse mount succeeded.
  217. return nil
  218. }
  219. // if cephfs fuse mount failed, fallback to kernel mount.
  220. klog.V(2).Infof("CephFS fuse mount failed: %v, fallback to kernel mount.", err)
  221. }
  222. klog.V(4).Info("CephFS kernel mount.")
  223. err = cephfsVolume.execMount(dir)
  224. if err != nil {
  225. // cleanup upon failure.
  226. mount.CleanupMountPoint(dir, cephfsVolume.mounter, false)
  227. return err
  228. }
  229. return nil
  230. }
  231. type cephfsUnmounter struct {
  232. *cephfs
  233. }
  234. var _ volume.Unmounter = &cephfsUnmounter{}
  235. // TearDown unmounts the bind mount
  236. func (cephfsVolume *cephfsUnmounter) TearDown() error {
  237. return cephfsVolume.TearDownAt(cephfsVolume.GetPath())
  238. }
  239. // TearDownAt unmounts the bind mount
  240. func (cephfsVolume *cephfsUnmounter) TearDownAt(dir string) error {
  241. return mount.CleanupMountPoint(dir, cephfsVolume.mounter, false)
  242. }
  243. // GetPath creates global mount path
  244. func (cephfsVolume *cephfs) GetPath() string {
  245. name := cephfsPluginName
  246. return cephfsVolume.plugin.host.GetPodVolumeDir(cephfsVolume.podUID, utilstrings.EscapeQualifiedName(name), cephfsVolume.volName)
  247. }
  248. // GetKeyringPath creates cephfuse keyring path
  249. func (cephfsVolume *cephfs) GetKeyringPath() string {
  250. name := cephfsPluginName
  251. volumeDir := cephfsVolume.plugin.host.GetPodVolumeDir(cephfsVolume.podUID, utilstrings.EscapeQualifiedName(name), cephfsVolume.volName)
  252. volumeKeyringDir := volumeDir + "~keyring"
  253. return volumeKeyringDir
  254. }
  255. func (cephfsVolume *cephfs) execMount(mountpoint string) error {
  256. // cephfs mount option
  257. cephOpt := ""
  258. // override secretfile if secret is provided
  259. if cephfsVolume.secret != "" {
  260. cephOpt = "name=" + cephfsVolume.id + ",secret=" + cephfsVolume.secret
  261. } else {
  262. cephOpt = "name=" + cephfsVolume.id + ",secretfile=" + cephfsVolume.secretFile
  263. }
  264. // build option array
  265. opt := []string{}
  266. if cephfsVolume.readonly {
  267. opt = append(opt, "ro")
  268. }
  269. opt = append(opt, cephOpt)
  270. // build src like mon1:6789,mon2:6789,mon3:6789:/
  271. src := strings.Join(cephfsVolume.mon, ",") + ":" + cephfsVolume.path
  272. opt = util.JoinMountOptions(cephfsVolume.mountOptions, opt)
  273. if err := cephfsVolume.mounter.Mount(src, mountpoint, "ceph", opt); err != nil {
  274. return fmt.Errorf("CephFS: mount failed: %v", err)
  275. }
  276. return nil
  277. }
  278. func (cephfsVolume *cephfsMounter) checkFuseMount() bool {
  279. execute := cephfsVolume.plugin.host.GetExec(cephfsVolume.plugin.GetPluginName())
  280. switch runtime.GOOS {
  281. case "linux":
  282. if _, err := execute.Command("/usr/bin/test", "-x", "/sbin/mount.fuse.ceph").CombinedOutput(); err == nil {
  283. klog.V(4).Info("/sbin/mount.fuse.ceph exists, it should be fuse mount.")
  284. return true
  285. }
  286. return false
  287. }
  288. return false
  289. }
  290. func (cephfsVolume *cephfs) execFuseMount(mountpoint string) error {
  291. // cephfs keyring file
  292. keyringFile := ""
  293. // override secretfile if secret is provided
  294. if cephfsVolume.secret != "" {
  295. // TODO: cephfs fuse currently doesn't support secret option,
  296. // remove keyring file create once secret option is supported.
  297. klog.V(4).Info("cephfs mount begin using fuse.")
  298. keyringPath := cephfsVolume.GetKeyringPath()
  299. os.MkdirAll(keyringPath, 0750)
  300. payload := make(map[string]util.FileProjection, 1)
  301. var fileProjection util.FileProjection
  302. keyring := fmt.Sprintf("[client.%s]\nkey = %s\n", cephfsVolume.id, cephfsVolume.secret)
  303. fileProjection.Data = []byte(keyring)
  304. fileProjection.Mode = int32(0644)
  305. fileName := cephfsVolume.id + ".keyring"
  306. payload[fileName] = fileProjection
  307. writerContext := fmt.Sprintf("cephfuse:%v.keyring", cephfsVolume.id)
  308. writer, err := util.NewAtomicWriter(keyringPath, writerContext)
  309. if err != nil {
  310. klog.Errorf("failed to create atomic writer: %v", err)
  311. return err
  312. }
  313. err = writer.Write(payload)
  314. if err != nil {
  315. klog.Errorf("failed to write payload to dir: %v", err)
  316. return err
  317. }
  318. keyringFile = filepath.Join(keyringPath, fileName)
  319. } else {
  320. keyringFile = cephfsVolume.secretFile
  321. }
  322. // build src like mon1:6789,mon2:6789,mon3:6789:/
  323. src := strings.Join(cephfsVolume.mon, ",")
  324. mountArgs := []string{}
  325. mountArgs = append(mountArgs, "-k")
  326. mountArgs = append(mountArgs, keyringFile)
  327. mountArgs = append(mountArgs, "-m")
  328. mountArgs = append(mountArgs, src)
  329. mountArgs = append(mountArgs, mountpoint)
  330. mountArgs = append(mountArgs, "-r")
  331. mountArgs = append(mountArgs, cephfsVolume.path)
  332. mountArgs = append(mountArgs, "--id")
  333. mountArgs = append(mountArgs, cephfsVolume.id)
  334. // build option array
  335. opt := []string{}
  336. if cephfsVolume.readonly {
  337. opt = append(opt, "ro")
  338. }
  339. opt = util.JoinMountOptions(cephfsVolume.mountOptions, opt)
  340. if len(opt) > 0 {
  341. mountArgs = append(mountArgs, "-o")
  342. mountArgs = append(mountArgs, strings.Join(opt, ","))
  343. }
  344. klog.V(4).Infof("Mounting cmd ceph-fuse with arguments (%s)", mountArgs)
  345. command := exec.Command("ceph-fuse", mountArgs...)
  346. output, err := command.CombinedOutput()
  347. if err != nil || !(strings.Contains(string(output), "starting fuse")) {
  348. return fmt.Errorf("Ceph-fuse failed: %v\narguments: %s\nOutput: %s", err, mountArgs, string(output))
  349. }
  350. return nil
  351. }
  352. func getVolumeSource(spec *volume.Spec) ([]string, string, string, string, bool, error) {
  353. if spec.Volume != nil && spec.Volume.CephFS != nil {
  354. mon := spec.Volume.CephFS.Monitors
  355. path := spec.Volume.CephFS.Path
  356. user := spec.Volume.CephFS.User
  357. secretFile := spec.Volume.CephFS.SecretFile
  358. readOnly := spec.Volume.CephFS.ReadOnly
  359. return mon, path, user, secretFile, readOnly, nil
  360. } else if spec.PersistentVolume != nil &&
  361. spec.PersistentVolume.Spec.CephFS != nil {
  362. mon := spec.PersistentVolume.Spec.CephFS.Monitors
  363. path := spec.PersistentVolume.Spec.CephFS.Path
  364. user := spec.PersistentVolume.Spec.CephFS.User
  365. secretFile := spec.PersistentVolume.Spec.CephFS.SecretFile
  366. readOnly := spec.PersistentVolume.Spec.CephFS.ReadOnly
  367. return mon, path, user, secretFile, readOnly, nil
  368. }
  369. return nil, "", "", "", false, fmt.Errorf("Spec does not reference a CephFS volume type")
  370. }
  371. func getSecretNameAndNamespace(spec *volume.Spec, defaultNamespace string) (string, string, error) {
  372. if spec.Volume != nil && spec.Volume.CephFS != nil {
  373. localSecretRef := spec.Volume.CephFS.SecretRef
  374. if localSecretRef != nil {
  375. return localSecretRef.Name, defaultNamespace, nil
  376. }
  377. return "", "", nil
  378. } else if spec.PersistentVolume != nil &&
  379. spec.PersistentVolume.Spec.CephFS != nil {
  380. secretRef := spec.PersistentVolume.Spec.CephFS.SecretRef
  381. secretNs := defaultNamespace
  382. if secretRef != nil {
  383. if len(secretRef.Namespace) != 0 {
  384. secretNs = secretRef.Namespace
  385. }
  386. return secretRef.Name, secretNs, nil
  387. }
  388. return "", "", nil
  389. }
  390. return "", "", fmt.Errorf("Spec does not reference an CephFS volume type")
  391. }