cephfs.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cephfs
  14. import (
  15. "fmt"
  16. "os"
  17. "os/exec"
  18. "path/filepath"
  19. "runtime"
  20. "strings"
  21. "k8s.io/api/core/v1"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/types"
  24. "k8s.io/klog"
  25. "k8s.io/kubernetes/pkg/util/mount"
  26. "k8s.io/kubernetes/pkg/volume"
  27. "k8s.io/kubernetes/pkg/volume/util"
  28. utilstrings "k8s.io/utils/strings"
  29. )
  30. // ProbeVolumePlugins is the primary entrypoint for volume plugins.
  31. func ProbeVolumePlugins() []volume.VolumePlugin {
  32. return []volume.VolumePlugin{&cephfsPlugin{nil}}
  33. }
  34. type cephfsPlugin struct {
  35. host volume.VolumeHost
  36. }
  37. var _ volume.VolumePlugin = &cephfsPlugin{}
  38. const (
  39. cephfsPluginName = "kubernetes.io/cephfs"
  40. )
  41. func (plugin *cephfsPlugin) Init(host volume.VolumeHost) error {
  42. plugin.host = host
  43. return nil
  44. }
  45. func (plugin *cephfsPlugin) GetPluginName() string {
  46. return cephfsPluginName
  47. }
  48. func (plugin *cephfsPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
  49. mon, _, _, _, _, err := getVolumeSource(spec)
  50. if err != nil {
  51. return "", err
  52. }
  53. return fmt.Sprintf("%v", mon), nil
  54. }
  55. func (plugin *cephfsPlugin) CanSupport(spec *volume.Spec) bool {
  56. return (spec.Volume != nil && spec.Volume.CephFS != nil) || (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CephFS != nil)
  57. }
  58. func (plugin *cephfsPlugin) IsMigratedToCSI() bool {
  59. return false
  60. }
  61. func (plugin *cephfsPlugin) RequiresRemount() bool {
  62. return false
  63. }
  64. func (plugin *cephfsPlugin) SupportsMountOption() bool {
  65. return true
  66. }
  67. func (plugin *cephfsPlugin) SupportsBulkVolumeVerification() bool {
  68. return false
  69. }
  70. func (plugin *cephfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
  71. return []v1.PersistentVolumeAccessMode{
  72. v1.ReadWriteOnce,
  73. v1.ReadOnlyMany,
  74. v1.ReadWriteMany,
  75. }
  76. }
  77. func (plugin *cephfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
  78. secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
  79. if err != nil {
  80. return nil, err
  81. }
  82. secret := ""
  83. if len(secretName) > 0 && len(secretNs) > 0 {
  84. // if secret is provideded, retrieve it
  85. kubeClient := plugin.host.GetKubeClient()
  86. if kubeClient == nil {
  87. return nil, fmt.Errorf("Cannot get kube client")
  88. }
  89. secrets, err := kubeClient.CoreV1().Secrets(secretNs).Get(secretName, metav1.GetOptions{})
  90. if err != nil {
  91. err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
  92. return nil, err
  93. }
  94. for name, data := range secrets.Data {
  95. secret = string(data)
  96. klog.V(4).Infof("found ceph secret info: %s", name)
  97. }
  98. }
  99. return plugin.newMounterInternal(spec, pod.UID, plugin.host.GetMounter(plugin.GetPluginName()), secret)
  100. }
  101. func (plugin *cephfsPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, mounter mount.Interface, secret string) (volume.Mounter, error) {
  102. mon, path, id, secretFile, readOnly, err := getVolumeSource(spec)
  103. if err != nil {
  104. return nil, err
  105. }
  106. if id == "" {
  107. id = "admin"
  108. }
  109. if path == "" {
  110. path = "/"
  111. }
  112. if !strings.HasPrefix(path, "/") {
  113. path = "/" + path
  114. }
  115. if secretFile == "" {
  116. secretFile = "/etc/ceph/" + id + ".secret"
  117. }
  118. return &cephfsMounter{
  119. cephfs: &cephfs{
  120. podUID: podUID,
  121. volName: spec.Name(),
  122. mon: mon,
  123. path: path,
  124. secret: secret,
  125. id: id,
  126. secretFile: secretFile,
  127. readonly: readOnly,
  128. mounter: mounter,
  129. plugin: plugin,
  130. mountOptions: util.MountOptionFromSpec(spec),
  131. },
  132. }, nil
  133. }
  134. func (plugin *cephfsPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
  135. return plugin.newUnmounterInternal(volName, podUID, plugin.host.GetMounter(plugin.GetPluginName()))
  136. }
  137. func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UID, mounter mount.Interface) (volume.Unmounter, error) {
  138. return &cephfsUnmounter{
  139. cephfs: &cephfs{
  140. podUID: podUID,
  141. volName: volName,
  142. mounter: mounter,
  143. plugin: plugin},
  144. }, nil
  145. }
  146. func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
  147. cephfsVolume := &v1.Volume{
  148. Name: volumeName,
  149. VolumeSource: v1.VolumeSource{
  150. CephFS: &v1.CephFSVolumeSource{
  151. Monitors: []string{},
  152. Path: mountPath,
  153. },
  154. },
  155. }
  156. return volume.NewSpecFromVolume(cephfsVolume), nil
  157. }
  158. // CephFS volumes represent a bare host file or directory mount of an CephFS export.
  159. type cephfs struct {
  160. volName string
  161. podUID types.UID
  162. mon []string
  163. path string
  164. id string
  165. secret string
  166. secretFile string
  167. readonly bool
  168. mounter mount.Interface
  169. plugin *cephfsPlugin
  170. volume.MetricsNil
  171. mountOptions []string
  172. }
  173. type cephfsMounter struct {
  174. *cephfs
  175. }
  176. var _ volume.Mounter = &cephfsMounter{}
  177. func (cephfsVolume *cephfsMounter) GetAttributes() volume.Attributes {
  178. return volume.Attributes{
  179. ReadOnly: cephfsVolume.readonly,
  180. Managed: false,
  181. SupportsSELinux: false,
  182. }
  183. }
  184. // Checks prior to mount operations to verify that the required components (binaries, etc.)
  185. // to mount the volume are available on the underlying node.
  186. // If not, it returns an error
  187. func (cephfsVolume *cephfsMounter) CanMount() error {
  188. return nil
  189. }
  190. // SetUp attaches the disk and bind mounts to the volume path.
  191. func (cephfsVolume *cephfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
  192. return cephfsVolume.SetUpAt(cephfsVolume.GetPath(), mounterArgs)
  193. }
  194. // SetUpAt attaches the disk and bind mounts to the volume path.
  195. func (cephfsVolume *cephfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
  196. notMnt, err := cephfsVolume.mounter.IsLikelyNotMountPoint(dir)
  197. klog.V(4).Infof("CephFS mount set up: %s %v %v", dir, !notMnt, err)
  198. if err != nil && !os.IsNotExist(err) {
  199. return err
  200. }
  201. if !notMnt {
  202. return nil
  203. }
  204. if err := os.MkdirAll(dir, 0750); err != nil {
  205. return err
  206. }
  207. // check whether it belongs to fuse, if not, default to use kernel mount.
  208. if cephfsVolume.checkFuseMount() {
  209. klog.V(4).Info("CephFS fuse mount.")
  210. err = cephfsVolume.execFuseMount(dir)
  211. // cleanup no matter if fuse mount fail.
  212. keyringPath := cephfsVolume.GetKeyringPath()
  213. _, StatErr := os.Stat(keyringPath)
  214. if !os.IsNotExist(StatErr) {
  215. os.RemoveAll(keyringPath)
  216. }
  217. if err == nil {
  218. // cephfs fuse mount succeeded.
  219. return nil
  220. }
  221. // if cephfs fuse mount failed, fallback to kernel mount.
  222. klog.V(2).Infof("CephFS fuse mount failed: %v, fallback to kernel mount.", err)
  223. }
  224. klog.V(4).Info("CephFS kernel mount.")
  225. err = cephfsVolume.execMount(dir)
  226. if err != nil {
  227. // cleanup upon failure.
  228. mount.CleanupMountPoint(dir, cephfsVolume.mounter, false)
  229. return err
  230. }
  231. return nil
  232. }
  233. type cephfsUnmounter struct {
  234. *cephfs
  235. }
  236. var _ volume.Unmounter = &cephfsUnmounter{}
  237. // TearDown unmounts the bind mount
  238. func (cephfsVolume *cephfsUnmounter) TearDown() error {
  239. return cephfsVolume.TearDownAt(cephfsVolume.GetPath())
  240. }
  241. // TearDownAt unmounts the bind mount
  242. func (cephfsVolume *cephfsUnmounter) TearDownAt(dir string) error {
  243. return mount.CleanupMountPoint(dir, cephfsVolume.mounter, false)
  244. }
  245. // GetPath creates global mount path
  246. func (cephfsVolume *cephfs) GetPath() string {
  247. name := cephfsPluginName
  248. return cephfsVolume.plugin.host.GetPodVolumeDir(cephfsVolume.podUID, utilstrings.EscapeQualifiedName(name), cephfsVolume.volName)
  249. }
  250. // GetKeyringPath creates cephfuse keyring path
  251. func (cephfsVolume *cephfs) GetKeyringPath() string {
  252. name := cephfsPluginName
  253. volumeDir := cephfsVolume.plugin.host.GetPodVolumeDir(cephfsVolume.podUID, utilstrings.EscapeQualifiedName(name), cephfsVolume.volName)
  254. volumeKeyringDir := volumeDir + "~keyring"
  255. return volumeKeyringDir
  256. }
  257. func (cephfsVolume *cephfs) execMount(mountpoint string) error {
  258. // cephfs mount option
  259. cephOpt := ""
  260. // override secretfile if secret is provided
  261. if cephfsVolume.secret != "" {
  262. cephOpt = "name=" + cephfsVolume.id + ",secret=" + cephfsVolume.secret
  263. } else {
  264. cephOpt = "name=" + cephfsVolume.id + ",secretfile=" + cephfsVolume.secretFile
  265. }
  266. // build option array
  267. opt := []string{}
  268. if cephfsVolume.readonly {
  269. opt = append(opt, "ro")
  270. }
  271. opt = append(opt, cephOpt)
  272. // build src like mon1:6789,mon2:6789,mon3:6789:/
  273. hosts := cephfsVolume.mon
  274. l := len(hosts)
  275. // pass all monitors and let ceph randomize and fail over
  276. i := 0
  277. src := ""
  278. for i = 0; i < l-1; i++ {
  279. src += hosts[i] + ","
  280. }
  281. src += hosts[i] + ":" + cephfsVolume.path
  282. opt = util.JoinMountOptions(cephfsVolume.mountOptions, opt)
  283. if err := cephfsVolume.mounter.Mount(src, mountpoint, "ceph", opt); err != nil {
  284. return fmt.Errorf("CephFS: mount failed: %v", err)
  285. }
  286. return nil
  287. }
  288. func (cephfsVolume *cephfsMounter) checkFuseMount() bool {
  289. execute := cephfsVolume.plugin.host.GetExec(cephfsVolume.plugin.GetPluginName())
  290. switch runtime.GOOS {
  291. case "linux":
  292. if _, err := execute.Run("/usr/bin/test", "-x", "/sbin/mount.fuse.ceph"); err == nil {
  293. klog.V(4).Info("/sbin/mount.fuse.ceph exists, it should be fuse mount.")
  294. return true
  295. }
  296. return false
  297. }
  298. return false
  299. }
  300. func (cephfsVolume *cephfs) execFuseMount(mountpoint string) error {
  301. // cephfs keyring file
  302. keyringFile := ""
  303. // override secretfile if secret is provided
  304. if cephfsVolume.secret != "" {
  305. // TODO: cephfs fuse currently doesn't support secret option,
  306. // remove keyring file create once secret option is supported.
  307. klog.V(4).Info("cephfs mount begin using fuse.")
  308. keyringPath := cephfsVolume.GetKeyringPath()
  309. os.MkdirAll(keyringPath, 0750)
  310. payload := make(map[string]util.FileProjection, 1)
  311. var fileProjection util.FileProjection
  312. keyring := fmt.Sprintf("[client.%s]\nkey = %s\n", cephfsVolume.id, cephfsVolume.secret)
  313. fileProjection.Data = []byte(keyring)
  314. fileProjection.Mode = int32(0644)
  315. fileName := cephfsVolume.id + ".keyring"
  316. payload[fileName] = fileProjection
  317. writerContext := fmt.Sprintf("cephfuse:%v.keyring", cephfsVolume.id)
  318. writer, err := util.NewAtomicWriter(keyringPath, writerContext)
  319. if err != nil {
  320. klog.Errorf("failed to create atomic writer: %v", err)
  321. return err
  322. }
  323. err = writer.Write(payload)
  324. if err != nil {
  325. klog.Errorf("failed to write payload to dir: %v", err)
  326. return err
  327. }
  328. keyringFile = filepath.Join(keyringPath, fileName)
  329. } else {
  330. keyringFile = cephfsVolume.secretFile
  331. }
  332. // build src like mon1:6789,mon2:6789,mon3:6789:/
  333. hosts := cephfsVolume.mon
  334. l := len(hosts)
  335. // pass all monitors and let ceph randomize and fail over
  336. i := 0
  337. src := ""
  338. for i = 0; i < l-1; i++ {
  339. src += hosts[i] + ","
  340. }
  341. src += hosts[i]
  342. mountArgs := []string{}
  343. mountArgs = append(mountArgs, "-k")
  344. mountArgs = append(mountArgs, keyringFile)
  345. mountArgs = append(mountArgs, "-m")
  346. mountArgs = append(mountArgs, src)
  347. mountArgs = append(mountArgs, mountpoint)
  348. mountArgs = append(mountArgs, "-r")
  349. mountArgs = append(mountArgs, cephfsVolume.path)
  350. mountArgs = append(mountArgs, "--id")
  351. mountArgs = append(mountArgs, cephfsVolume.id)
  352. // build option array
  353. opt := []string{}
  354. if cephfsVolume.readonly {
  355. opt = append(opt, "ro")
  356. }
  357. opt = util.JoinMountOptions(cephfsVolume.mountOptions, opt)
  358. if len(opt) > 0 {
  359. mountArgs = append(mountArgs, "-o")
  360. mountArgs = append(mountArgs, strings.Join(opt, ","))
  361. }
  362. klog.V(4).Infof("Mounting cmd ceph-fuse with arguments (%s)", mountArgs)
  363. command := exec.Command("ceph-fuse", mountArgs...)
  364. output, err := command.CombinedOutput()
  365. if err != nil || !(strings.Contains(string(output), "starting fuse")) {
  366. return fmt.Errorf("Ceph-fuse failed: %v\narguments: %s\nOutput: %s", err, mountArgs, string(output))
  367. }
  368. return nil
  369. }
  370. func getVolumeSource(spec *volume.Spec) ([]string, string, string, string, bool, error) {
  371. if spec.Volume != nil && spec.Volume.CephFS != nil {
  372. mon := spec.Volume.CephFS.Monitors
  373. path := spec.Volume.CephFS.Path
  374. user := spec.Volume.CephFS.User
  375. secretFile := spec.Volume.CephFS.SecretFile
  376. readOnly := spec.Volume.CephFS.ReadOnly
  377. return mon, path, user, secretFile, readOnly, nil
  378. } else if spec.PersistentVolume != nil &&
  379. spec.PersistentVolume.Spec.CephFS != nil {
  380. mon := spec.PersistentVolume.Spec.CephFS.Monitors
  381. path := spec.PersistentVolume.Spec.CephFS.Path
  382. user := spec.PersistentVolume.Spec.CephFS.User
  383. secretFile := spec.PersistentVolume.Spec.CephFS.SecretFile
  384. readOnly := spec.PersistentVolume.Spec.CephFS.ReadOnly
  385. return mon, path, user, secretFile, readOnly, nil
  386. }
  387. return nil, "", "", "", false, fmt.Errorf("Spec does not reference a CephFS volume type")
  388. }
  389. func getSecretNameAndNamespace(spec *volume.Spec, defaultNamespace string) (string, string, error) {
  390. if spec.Volume != nil && spec.Volume.CephFS != nil {
  391. localSecretRef := spec.Volume.CephFS.SecretRef
  392. if localSecretRef != nil {
  393. return localSecretRef.Name, defaultNamespace, nil
  394. }
  395. return "", "", nil
  396. } else if spec.PersistentVolume != nil &&
  397. spec.PersistentVolume.Spec.CephFS != nil {
  398. secretRef := spec.PersistentVolume.Spec.CephFS.SecretRef
  399. secretNs := defaultNamespace
  400. if secretRef != nil {
  401. if len(secretRef.Namespace) != 0 {
  402. secretNs = secretRef.Namespace
  403. }
  404. return secretRef.Name, secretNs, nil
  405. }
  406. return "", "", nil
  407. }
  408. return "", "", fmt.Errorf("Spec does not reference an CephFS volume type")
  409. }