rbd.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rbd
  14. import (
  15. "context"
  16. "fmt"
  17. "os"
  18. "path/filepath"
  19. "regexp"
  20. dstrings "strings"
  21. "k8s.io/klog"
  22. utilexec "k8s.io/utils/exec"
  23. "k8s.io/utils/mount"
  24. utilstrings "k8s.io/utils/strings"
  25. v1 "k8s.io/api/core/v1"
  26. "k8s.io/apimachinery/pkg/api/resource"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/types"
  29. "k8s.io/apimachinery/pkg/util/sets"
  30. "k8s.io/apimachinery/pkg/util/uuid"
  31. utilfeature "k8s.io/apiserver/pkg/util/feature"
  32. clientset "k8s.io/client-go/kubernetes"
  33. "k8s.io/kubernetes/pkg/features"
  34. "k8s.io/kubernetes/pkg/volume"
  35. "k8s.io/kubernetes/pkg/volume/util"
  36. volutil "k8s.io/kubernetes/pkg/volume/util"
  37. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  38. )
  39. var (
  40. supportedFeatures = sets.NewString("layering")
  41. )
  42. // This is the primary entrypoint for volume plugins.
  43. func ProbeVolumePlugins() []volume.VolumePlugin {
  44. return []volume.VolumePlugin{&rbdPlugin{}}
  45. }
  46. // rbdPlugin implements Volume.VolumePlugin.
  47. type rbdPlugin struct {
  48. host volume.VolumeHost
  49. }
  50. var _ volume.VolumePlugin = &rbdPlugin{}
  51. var _ volume.PersistentVolumePlugin = &rbdPlugin{}
  52. var _ volume.DeletableVolumePlugin = &rbdPlugin{}
  53. var _ volume.ProvisionableVolumePlugin = &rbdPlugin{}
  54. var _ volume.AttachableVolumePlugin = &rbdPlugin{}
  55. var _ volume.ExpandableVolumePlugin = &rbdPlugin{}
  56. var _ volume.BlockVolumePlugin = &rbdPlugin{}
  57. var _ volume.DeviceMountableVolumePlugin = &rbdPlugin{}
  58. const (
  59. rbdPluginName = "kubernetes.io/rbd"
  60. secretKeyName = "key" // key name used in secret
  61. rbdImageFormat1 = "1"
  62. rbdImageFormat2 = "2"
  63. rbdDefaultAdminId = "admin"
  64. rbdDefaultAdminSecretNamespace = "default"
  65. rbdDefaultPool = "rbd"
  66. )
  67. func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
  68. return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(rbdPluginName), volName)
  69. }
  70. func (plugin *rbdPlugin) Init(host volume.VolumeHost) error {
  71. plugin.host = host
  72. return nil
  73. }
  74. func (plugin *rbdPlugin) GetPluginName() string {
  75. return rbdPluginName
  76. }
  77. func (plugin *rbdPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
  78. pool, err := getVolumeSourcePool(spec)
  79. if err != nil {
  80. return "", err
  81. }
  82. img, err := getVolumeSourceImage(spec)
  83. if err != nil {
  84. return "", err
  85. }
  86. return fmt.Sprintf(
  87. "%v:%v",
  88. pool,
  89. img), nil
  90. }
  91. func (plugin *rbdPlugin) CanSupport(spec *volume.Spec) bool {
  92. return (spec.Volume != nil && spec.Volume.RBD != nil) || (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.RBD != nil)
  93. }
  94. func (plugin *rbdPlugin) RequiresRemount() bool {
  95. return false
  96. }
  97. func (plugin *rbdPlugin) SupportsMountOption() bool {
  98. return true
  99. }
  100. func (plugin *rbdPlugin) SupportsBulkVolumeVerification() bool {
  101. return false
  102. }
  103. func (plugin *rbdPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
  104. return []v1.PersistentVolumeAccessMode{
  105. v1.ReadWriteOnce,
  106. v1.ReadOnlyMany,
  107. }
  108. }
  109. type rbdVolumeExpander struct {
  110. *rbdMounter
  111. }
  112. func (plugin *rbdPlugin) getAdminAndSecret(spec *volume.Spec) (string, string, error) {
  113. class, err := volutil.GetClassForVolume(plugin.host.GetKubeClient(), spec.PersistentVolume)
  114. if err != nil {
  115. return "", "", err
  116. }
  117. adminSecretName := ""
  118. adminSecretNamespace := rbdDefaultAdminSecretNamespace
  119. admin := ""
  120. for k, v := range class.Parameters {
  121. switch dstrings.ToLower(k) {
  122. case "adminid":
  123. admin = v
  124. case "adminsecretname":
  125. adminSecretName = v
  126. case "adminsecretnamespace":
  127. adminSecretNamespace = v
  128. }
  129. }
  130. if admin == "" {
  131. admin = rbdDefaultAdminId
  132. }
  133. secret, err := parsePVSecret(adminSecretNamespace, adminSecretName, plugin.host.GetKubeClient())
  134. if err != nil {
  135. return admin, "", fmt.Errorf("failed to get admin secret from [%q/%q]: %v", adminSecretNamespace, adminSecretName, err)
  136. }
  137. return admin, secret, nil
  138. }
  139. func (plugin *rbdPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) {
  140. if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.RBD == nil {
  141. return oldSize, fmt.Errorf("spec.PersistentVolume.Spec.RBD is nil")
  142. }
  143. // get admin and secret
  144. admin, secret, err := plugin.getAdminAndSecret(spec)
  145. if err != nil {
  146. return oldSize, err
  147. }
  148. expander := &rbdVolumeExpander{
  149. rbdMounter: &rbdMounter{
  150. rbd: &rbd{
  151. volName: spec.Name(),
  152. Image: spec.PersistentVolume.Spec.RBD.RBDImage,
  153. Pool: spec.PersistentVolume.Spec.RBD.RBDPool,
  154. plugin: plugin,
  155. manager: &RBDUtil{},
  156. mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
  157. exec: plugin.host.GetExec(plugin.GetPluginName()),
  158. },
  159. Mon: spec.PersistentVolume.Spec.RBD.CephMonitors,
  160. adminId: admin,
  161. adminSecret: secret,
  162. },
  163. }
  164. expandedSize, err := expander.ResizeImage(oldSize, newSize)
  165. if err != nil {
  166. return oldSize, err
  167. } else {
  168. return expandedSize, nil
  169. }
  170. }
  171. func (plugin *rbdPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
  172. fsVolume, err := util.CheckVolumeModeFilesystem(resizeOptions.VolumeSpec)
  173. if err != nil {
  174. return false, fmt.Errorf("error checking VolumeMode: %v", err)
  175. }
  176. // if volume is not a fs file system, there is nothing for us to do here.
  177. if !fsVolume {
  178. return true, nil
  179. }
  180. _, err = volutil.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
  181. if err != nil {
  182. return false, err
  183. }
  184. return true, nil
  185. }
  186. var _ volume.NodeExpandableVolumePlugin = &rbdPlugin{}
  187. func (expander *rbdVolumeExpander) ResizeImage(oldSize resource.Quantity, newSize resource.Quantity) (resource.Quantity, error) {
  188. return expander.manager.ExpandImage(expander, oldSize, newSize)
  189. }
  190. func (plugin *rbdPlugin) RequiresFSResize() bool {
  191. return true
  192. }
  193. func (plugin *rbdPlugin) createMounterFromVolumeSpecAndPod(spec *volume.Spec, pod *v1.Pod) (*rbdMounter, error) {
  194. var err error
  195. mon, err := getVolumeSourceMonitors(spec)
  196. if err != nil {
  197. return nil, err
  198. }
  199. img, err := getVolumeSourceImage(spec)
  200. if err != nil {
  201. return nil, err
  202. }
  203. fstype, err := getVolumeSourceFSType(spec)
  204. if err != nil {
  205. return nil, err
  206. }
  207. pool, err := getVolumeSourcePool(spec)
  208. if err != nil {
  209. return nil, err
  210. }
  211. id, err := getVolumeSourceUser(spec)
  212. if err != nil {
  213. return nil, err
  214. }
  215. keyring, err := getVolumeSourceKeyRing(spec)
  216. if err != nil {
  217. return nil, err
  218. }
  219. ro, err := getVolumeSourceReadOnly(spec)
  220. if err != nil {
  221. return nil, err
  222. }
  223. ams, err := getVolumeAccessModes(spec)
  224. if err != nil {
  225. return nil, err
  226. }
  227. secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
  228. if err != nil {
  229. return nil, err
  230. }
  231. secret := ""
  232. if len(secretName) > 0 && len(secretNs) > 0 {
  233. // if secret is provideded, retrieve it
  234. kubeClient := plugin.host.GetKubeClient()
  235. if kubeClient == nil {
  236. return nil, fmt.Errorf("Cannot get kube client")
  237. }
  238. secrets, err := kubeClient.CoreV1().Secrets(secretNs).Get(context.TODO(), secretName, metav1.GetOptions{})
  239. if err != nil {
  240. err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
  241. return nil, err
  242. }
  243. for _, data := range secrets.Data {
  244. secret = string(data)
  245. }
  246. }
  247. return &rbdMounter{
  248. rbd: newRBD("", spec.Name(), img, pool, ro, plugin, &RBDUtil{}),
  249. Mon: mon,
  250. Id: id,
  251. Keyring: keyring,
  252. Secret: secret,
  253. fsType: fstype,
  254. accessModes: ams,
  255. }, nil
  256. }
  257. func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
  258. secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
  259. if err != nil {
  260. return nil, err
  261. }
  262. secret := ""
  263. if len(secretName) > 0 && len(secretNs) > 0 {
  264. // if secret is provideded, retrieve it
  265. kubeClient := plugin.host.GetKubeClient()
  266. if kubeClient == nil {
  267. return nil, fmt.Errorf("Cannot get kube client")
  268. }
  269. secrets, err := kubeClient.CoreV1().Secrets(secretNs).Get(context.TODO(), secretName, metav1.GetOptions{})
  270. if err != nil {
  271. err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
  272. return nil, err
  273. }
  274. for _, data := range secrets.Data {
  275. secret = string(data)
  276. }
  277. }
  278. // Inject real implementations here, test through the internal function.
  279. return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, secret)
  280. }
  281. func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, secret string) (volume.Mounter, error) {
  282. mon, err := getVolumeSourceMonitors(spec)
  283. if err != nil {
  284. return nil, err
  285. }
  286. img, err := getVolumeSourceImage(spec)
  287. if err != nil {
  288. return nil, err
  289. }
  290. fstype, err := getVolumeSourceFSType(spec)
  291. if err != nil {
  292. return nil, err
  293. }
  294. pool, err := getVolumeSourcePool(spec)
  295. if err != nil {
  296. return nil, err
  297. }
  298. id, err := getVolumeSourceUser(spec)
  299. if err != nil {
  300. return nil, err
  301. }
  302. keyring, err := getVolumeSourceKeyRing(spec)
  303. if err != nil {
  304. return nil, err
  305. }
  306. ro, err := getVolumeSourceReadOnly(spec)
  307. if err != nil {
  308. return nil, err
  309. }
  310. ams, err := getVolumeAccessModes(spec)
  311. if err != nil {
  312. return nil, err
  313. }
  314. return &rbdMounter{
  315. rbd: newRBD(podUID, spec.Name(), img, pool, ro, plugin, manager),
  316. Mon: mon,
  317. Id: id,
  318. Keyring: keyring,
  319. Secret: secret,
  320. fsType: fstype,
  321. mountOptions: volutil.MountOptionFromSpec(spec),
  322. accessModes: ams,
  323. }, nil
  324. }
  325. func (plugin *rbdPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
  326. // Inject real implementations here, test through the internal function.
  327. return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{})
  328. }
  329. func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager) (volume.Unmounter, error) {
  330. return &rbdUnmounter{
  331. rbdMounter: &rbdMounter{
  332. rbd: newRBD(podUID, volName, "", "", false, plugin, manager),
  333. Mon: make([]string, 0),
  334. },
  335. }, nil
  336. }
  337. func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
  338. mounter := plugin.host.GetMounter(plugin.GetPluginName())
  339. kvh, ok := plugin.host.(volume.KubeletVolumeHost)
  340. if !ok {
  341. return nil, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
  342. }
  343. hu := kvh.GetHostUtil()
  344. pluginMntDir := volutil.GetPluginMountDir(plugin.host, plugin.GetPluginName())
  345. sourceName, err := hu.GetDeviceNameFromMount(mounter, mountPath, pluginMntDir)
  346. if err != nil {
  347. return nil, err
  348. }
  349. s := dstrings.Split(sourceName, "-image-")
  350. if len(s) != 2 {
  351. // The mountPath parameter is the volume mount path for a specific pod, its format
  352. // is /var/lib/kubelet/pods/{podUID}/volumes/{volumePluginName}/{volumeName}.
  353. // mounter.GetDeviceNameFromMount will find the device path(such as /dev/rbd0) by
  354. // mountPath first, and then try to find the global device mount path from the mounted
  355. // path list of this device. sourceName is extracted from this global device mount path.
  356. // mounter.GetDeviceNameFromMount expects the global device mount path conforms to canonical
  357. // format: /var/lib/kubelet/plugins/kubernetes.io/rbd/mounts/{pool}-image-{image}.
  358. // If this assertion failed, it means that the global device mount path is created by
  359. // the deprecated format: /var/lib/kubelet/plugins/kubernetes.io/rbd/rbd/{pool}-image-{image}.
  360. // So we will try to check whether this old style global device mount path exist or not.
  361. // If existed, extract the sourceName from this old style path, otherwise return an error.
  362. klog.V(3).Infof("SourceName %s wrong, fallback to old format", sourceName)
  363. sourceName, err = plugin.getDeviceNameFromOldMountPath(mounter, mountPath)
  364. if err != nil {
  365. return nil, err
  366. }
  367. s = dstrings.Split(sourceName, "-image-")
  368. if len(s) != 2 {
  369. return nil, fmt.Errorf("sourceName %s wrong, should be pool+\"-image-\"+imageName", sourceName)
  370. }
  371. }
  372. rbdVolume := &v1.Volume{
  373. Name: volumeName,
  374. VolumeSource: v1.VolumeSource{
  375. RBD: &v1.RBDVolumeSource{
  376. RBDPool: s[0],
  377. RBDImage: s[1],
  378. },
  379. },
  380. }
  381. return volume.NewSpecFromVolume(rbdVolume), nil
  382. }
  383. func (plugin *rbdPlugin) ConstructBlockVolumeSpec(podUID types.UID, volumeName, mapPath string) (*volume.Spec, error) {
  384. pluginDir := plugin.host.GetVolumeDevicePluginDir(rbdPluginName)
  385. blkutil := volumepathhandler.NewBlockVolumePathHandler()
  386. globalMapPathUUID, err := blkutil.FindGlobalMapPathUUIDFromPod(pluginDir, mapPath, podUID)
  387. if err != nil {
  388. return nil, err
  389. }
  390. klog.V(5).Infof("globalMapPathUUID: %v, err: %v", globalMapPathUUID, err)
  391. globalMapPath := filepath.Dir(globalMapPathUUID)
  392. if len(globalMapPath) == 1 {
  393. return nil, fmt.Errorf("failed to retrieve volume plugin information from globalMapPathUUID: %v", globalMapPathUUID)
  394. }
  395. return getVolumeSpecFromGlobalMapPath(globalMapPath, volumeName)
  396. }
  397. func getVolumeSpecFromGlobalMapPath(globalMapPath, volumeName string) (*volume.Spec, error) {
  398. // Retrieve volume spec information from globalMapPath
  399. // globalMapPath example:
  400. // plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumePluginDependentPath}
  401. pool, image, err := getPoolAndImageFromMapPath(globalMapPath)
  402. if err != nil {
  403. return nil, err
  404. }
  405. block := v1.PersistentVolumeBlock
  406. rbdVolume := &v1.PersistentVolume{
  407. ObjectMeta: metav1.ObjectMeta{
  408. Name: volumeName,
  409. },
  410. Spec: v1.PersistentVolumeSpec{
  411. PersistentVolumeSource: v1.PersistentVolumeSource{
  412. RBD: &v1.RBDPersistentVolumeSource{
  413. RBDImage: image,
  414. RBDPool: pool,
  415. },
  416. },
  417. VolumeMode: &block,
  418. },
  419. }
  420. return volume.NewSpecFromPersistentVolume(rbdVolume, true), nil
  421. }
  422. func (plugin *rbdPlugin) NewBlockVolumeMapper(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
  423. var uid types.UID
  424. if pod != nil {
  425. uid = pod.UID
  426. }
  427. secret := ""
  428. if pod != nil {
  429. secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
  430. if err != nil {
  431. return nil, err
  432. }
  433. if len(secretName) > 0 && len(secretNs) > 0 {
  434. // if secret is provideded, retrieve it
  435. kubeClient := plugin.host.GetKubeClient()
  436. if kubeClient == nil {
  437. return nil, fmt.Errorf("Cannot get kube client")
  438. }
  439. secrets, err := kubeClient.CoreV1().Secrets(secretNs).Get(context.TODO(), secretName, metav1.GetOptions{})
  440. if err != nil {
  441. err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
  442. return nil, err
  443. }
  444. for _, data := range secrets.Data {
  445. secret = string(data)
  446. }
  447. }
  448. }
  449. return plugin.newBlockVolumeMapperInternal(spec, uid, &RBDUtil{}, secret, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()))
  450. }
  451. func (plugin *rbdPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podUID types.UID, manager diskManager, secret string, mounter mount.Interface, exec utilexec.Interface) (volume.BlockVolumeMapper, error) {
  452. mon, err := getVolumeSourceMonitors(spec)
  453. if err != nil {
  454. return nil, err
  455. }
  456. img, err := getVolumeSourceImage(spec)
  457. if err != nil {
  458. return nil, err
  459. }
  460. pool, err := getVolumeSourcePool(spec)
  461. if err != nil {
  462. return nil, err
  463. }
  464. id, err := getVolumeSourceUser(spec)
  465. if err != nil {
  466. return nil, err
  467. }
  468. keyring, err := getVolumeSourceKeyRing(spec)
  469. if err != nil {
  470. return nil, err
  471. }
  472. ro, err := getVolumeSourceReadOnly(spec)
  473. if err != nil {
  474. return nil, err
  475. }
  476. return &rbdDiskMapper{
  477. rbd: newRBD(podUID, spec.Name(), img, pool, ro, plugin, manager),
  478. mon: mon,
  479. id: id,
  480. keyring: keyring,
  481. secret: secret,
  482. }, nil
  483. }
  484. func (plugin *rbdPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
  485. return plugin.newUnmapperInternal(volName, podUID, &RBDUtil{})
  486. }
  487. func (plugin *rbdPlugin) newUnmapperInternal(volName string, podUID types.UID, manager diskManager) (volume.BlockVolumeUnmapper, error) {
  488. return &rbdDiskUnmapper{
  489. rbdDiskMapper: &rbdDiskMapper{
  490. rbd: newRBD(podUID, volName, "", "", false, plugin, manager),
  491. mon: make([]string, 0),
  492. },
  493. }, nil
  494. }
  495. func (plugin *rbdPlugin) getDeviceNameFromOldMountPath(mounter mount.Interface, mountPath string) (string, error) {
  496. refs, err := mounter.GetMountRefs(mountPath)
  497. if err != nil {
  498. return "", err
  499. }
  500. // baseMountPath is the prefix of deprecated device global mounted path,
  501. // such as: /var/lib/kubelet/plugins/kubernetes.io/rbd/rbd
  502. baseMountPath := filepath.Join(plugin.host.GetPluginDir(rbdPluginName), "rbd")
  503. for _, ref := range refs {
  504. if dstrings.HasPrefix(ref, baseMountPath) {
  505. return filepath.Rel(baseMountPath, ref)
  506. }
  507. }
  508. return "", fmt.Errorf("can't find source name from mounted path: %s", mountPath)
  509. }
  510. func (plugin *rbdPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
  511. if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.RBD == nil {
  512. return nil, fmt.Errorf("spec.PersistentVolume.Spec.RBD is nil")
  513. }
  514. admin, secret, err := plugin.getAdminAndSecret(spec)
  515. if err != nil {
  516. return nil, err
  517. }
  518. return plugin.newDeleterInternal(spec, admin, secret, &RBDUtil{})
  519. }
  520. func (plugin *rbdPlugin) newDeleterInternal(spec *volume.Spec, admin, secret string, manager diskManager) (volume.Deleter, error) {
  521. return &rbdVolumeDeleter{
  522. rbdMounter: &rbdMounter{
  523. rbd: newRBD("", spec.Name(), spec.PersistentVolume.Spec.RBD.RBDImage, spec.PersistentVolume.Spec.RBD.RBDPool, false, plugin, manager),
  524. Mon: spec.PersistentVolume.Spec.RBD.CephMonitors,
  525. adminId: admin,
  526. adminSecret: secret,
  527. }}, nil
  528. }
  529. func (plugin *rbdPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
  530. return plugin.newProvisionerInternal(options, &RBDUtil{})
  531. }
  532. func (plugin *rbdPlugin) newProvisionerInternal(options volume.VolumeOptions, manager diskManager) (volume.Provisioner, error) {
  533. return &rbdVolumeProvisioner{
  534. rbdMounter: &rbdMounter{
  535. rbd: newRBD("", "", "", "", false, plugin, manager),
  536. },
  537. options: options,
  538. }, nil
  539. }
  540. // rbdVolumeProvisioner implements volume.Provisioner interface.
  541. type rbdVolumeProvisioner struct {
  542. *rbdMounter
  543. options volume.VolumeOptions
  544. }
  545. var _ volume.Provisioner = &rbdVolumeProvisioner{}
  546. func (r *rbdVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
  547. if !volutil.AccessModesContainedInAll(r.plugin.GetAccessModes(), r.options.PVC.Spec.AccessModes) {
  548. return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", r.options.PVC.Spec.AccessModes, r.plugin.GetAccessModes())
  549. }
  550. if r.options.PVC.Spec.Selector != nil {
  551. return nil, fmt.Errorf("claim Selector is not supported")
  552. }
  553. var err error
  554. adminSecretName := ""
  555. adminSecretNamespace := rbdDefaultAdminSecretNamespace
  556. secret := ""
  557. secretName := ""
  558. secretNamespace := ""
  559. keyring := ""
  560. imageFormat := rbdImageFormat2
  561. fstype := ""
  562. for k, v := range r.options.Parameters {
  563. switch dstrings.ToLower(k) {
  564. case "monitors":
  565. arr := dstrings.Split(v, ",")
  566. r.Mon = append(r.Mon, arr...)
  567. case "adminid":
  568. r.adminId = v
  569. case "adminsecretname":
  570. adminSecretName = v
  571. case "adminsecretnamespace":
  572. adminSecretNamespace = v
  573. case "userid":
  574. r.Id = v
  575. case "pool":
  576. r.Pool = v
  577. case "usersecretname":
  578. secretName = v
  579. case "usersecretnamespace":
  580. secretNamespace = v
  581. case "keyring":
  582. keyring = v
  583. case "imageformat":
  584. imageFormat = v
  585. case "imagefeatures":
  586. arr := dstrings.Split(v, ",")
  587. for _, f := range arr {
  588. if !supportedFeatures.Has(f) {
  589. return nil, fmt.Errorf("invalid feature %q for volume plugin %s, supported features are: %v", f, r.plugin.GetPluginName(), supportedFeatures)
  590. } else {
  591. r.imageFeatures = append(r.imageFeatures, f)
  592. }
  593. }
  594. case volume.VolumeParameterFSType:
  595. fstype = v
  596. default:
  597. return nil, fmt.Errorf("invalid option %q for volume plugin %s", k, r.plugin.GetPluginName())
  598. }
  599. }
  600. // sanity check
  601. if imageFormat != rbdImageFormat1 && imageFormat != rbdImageFormat2 {
  602. return nil, fmt.Errorf("invalid ceph imageformat %s, expecting %s or %s",
  603. imageFormat, rbdImageFormat1, rbdImageFormat2)
  604. }
  605. r.imageFormat = imageFormat
  606. if adminSecretName == "" {
  607. return nil, fmt.Errorf("missing Ceph admin secret name")
  608. }
  609. if secret, err = parsePVSecret(adminSecretNamespace, adminSecretName, r.plugin.host.GetKubeClient()); err != nil {
  610. return nil, fmt.Errorf("failed to get admin secret from [%q/%q]: %v", adminSecretNamespace, adminSecretName, err)
  611. }
  612. r.adminSecret = secret
  613. if len(r.Mon) < 1 {
  614. return nil, fmt.Errorf("missing Ceph monitors")
  615. }
  616. if secretName == "" && keyring == "" {
  617. return nil, fmt.Errorf("must specify either keyring or user secret name")
  618. }
  619. if r.adminId == "" {
  620. r.adminId = rbdDefaultAdminId
  621. }
  622. if r.Pool == "" {
  623. r.Pool = rbdDefaultPool
  624. }
  625. if r.Id == "" {
  626. r.Id = r.adminId
  627. }
  628. // create random image name
  629. image := fmt.Sprintf("kubernetes-dynamic-pvc-%s", uuid.NewUUID())
  630. r.rbdMounter.Image = image
  631. rbd, sizeMB, err := r.manager.CreateImage(r)
  632. if err != nil {
  633. klog.Errorf("rbd: create volume failed, err: %v", err)
  634. return nil, err
  635. }
  636. klog.Infof("successfully created rbd image %q", image)
  637. pv := new(v1.PersistentVolume)
  638. metav1.SetMetaDataAnnotation(&pv.ObjectMeta, volutil.VolumeDynamicallyCreatedByKey, "rbd-dynamic-provisioner")
  639. if secretName != "" {
  640. rbd.SecretRef = new(v1.SecretReference)
  641. rbd.SecretRef.Name = secretName
  642. rbd.SecretRef.Namespace = secretNamespace
  643. } else {
  644. var filePathRegex = regexp.MustCompile(`^(?:/[^/!;` + "`" + ` ]+)+$`)
  645. if keyring != "" && !filePathRegex.MatchString(keyring) {
  646. return nil, fmt.Errorf("keyring field must contain a path to a file")
  647. }
  648. rbd.Keyring = keyring
  649. }
  650. var volumeMode *v1.PersistentVolumeMode
  651. if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
  652. volumeMode = r.options.PVC.Spec.VolumeMode
  653. if volumeMode != nil && *volumeMode == v1.PersistentVolumeBlock {
  654. // Block volumes should not have any FSType
  655. fstype = ""
  656. }
  657. }
  658. rbd.RadosUser = r.Id
  659. rbd.FSType = fstype
  660. pv.Spec.PersistentVolumeSource.RBD = rbd
  661. pv.Spec.PersistentVolumeReclaimPolicy = r.options.PersistentVolumeReclaimPolicy
  662. pv.Spec.AccessModes = r.options.PVC.Spec.AccessModes
  663. if len(pv.Spec.AccessModes) == 0 {
  664. pv.Spec.AccessModes = r.plugin.GetAccessModes()
  665. }
  666. pv.Spec.Capacity = v1.ResourceList{
  667. v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dMi", sizeMB)),
  668. }
  669. pv.Spec.MountOptions = r.options.MountOptions
  670. pv.Spec.VolumeMode = volumeMode
  671. return pv, nil
  672. }
  673. // rbdVolumeDeleter implements volume.Deleter interface.
  674. type rbdVolumeDeleter struct {
  675. *rbdMounter
  676. }
  677. var _ volume.Deleter = &rbdVolumeDeleter{}
  678. func (r *rbdVolumeDeleter) GetPath() string {
  679. return getPath(r.podUID, r.volName, r.plugin.host)
  680. }
  681. func (r *rbdVolumeDeleter) Delete() error {
  682. return r.manager.DeleteImage(r)
  683. }
  684. // rbd implmenets volume.Volume interface.
  685. // It's embedded in Mounter/Unmounter/Deleter.
  686. type rbd struct {
  687. volName string
  688. podUID types.UID
  689. Pool string
  690. Image string
  691. ReadOnly bool
  692. plugin *rbdPlugin
  693. mounter *mount.SafeFormatAndMount
  694. exec utilexec.Interface
  695. // Utility interface that provides API calls to the provider to attach/detach disks.
  696. manager diskManager
  697. volume.MetricsProvider `json:"-"`
  698. }
  699. var _ volume.Volume = &rbd{}
  700. func (rbd *rbd) GetPath() string {
  701. // safe to use PodVolumeDir now: volume teardown occurs before pod is cleaned up
  702. return getPath(rbd.podUID, rbd.volName, rbd.plugin.host)
  703. }
  704. // newRBD creates a new rbd.
  705. func newRBD(podUID types.UID, volName string, image string, pool string, readOnly bool, plugin *rbdPlugin, manager diskManager) *rbd {
  706. return &rbd{
  707. podUID: podUID,
  708. volName: volName,
  709. Image: image,
  710. Pool: pool,
  711. ReadOnly: readOnly,
  712. plugin: plugin,
  713. mounter: volutil.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host),
  714. exec: plugin.host.GetExec(plugin.GetPluginName()),
  715. manager: manager,
  716. MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
  717. }
  718. }
  719. // rbdMounter implements volume.Mounter interface.
  720. // It contains information which need to be persisted in whole life cycle of PV
  721. // on the node. It is persisted at the very beginning in the pod mount point
  722. // directory.
  723. // Note: Capitalized field names of this struct determines the information
  724. // persisted on the disk, DO NOT change them. (TODO: refactoring to use a dedicated struct?)
  725. type rbdMounter struct {
  726. *rbd
  727. // capitalized so they can be exported in persistRBD()
  728. Mon []string
  729. Id string
  730. Keyring string
  731. Secret string
  732. fsType string
  733. adminSecret string
  734. adminId string
  735. mountOptions []string
  736. imageFormat string
  737. imageFeatures []string
  738. accessModes []v1.PersistentVolumeAccessMode
  739. }
  740. var _ volume.Mounter = &rbdMounter{}
  741. func (b *rbd) GetAttributes() volume.Attributes {
  742. return volume.Attributes{
  743. ReadOnly: b.ReadOnly,
  744. Managed: !b.ReadOnly,
  745. SupportsSELinux: true,
  746. }
  747. }
  748. // Checks prior to mount operations to verify that the required components (binaries, etc.)
  749. // to mount the volume are available on the underlying node.
  750. // If not, it returns an error
  751. func (b *rbdMounter) CanMount() error {
  752. return nil
  753. }
  754. func (b *rbdMounter) SetUp(mounterArgs volume.MounterArgs) error {
  755. return b.SetUpAt(b.GetPath(), mounterArgs)
  756. }
  757. func (b *rbdMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
  758. // diskSetUp checks mountpoints and prevent repeated calls
  759. klog.V(4).Infof("rbd: attempting to setup at %s", dir)
  760. err := diskSetUp(b.manager, *b, dir, b.mounter, mounterArgs.FsGroup)
  761. if err != nil {
  762. klog.Errorf("rbd: failed to setup at %s %v", dir, err)
  763. }
  764. klog.V(3).Infof("rbd: successfully setup at %s", dir)
  765. return err
  766. }
  767. // rbdUnmounter implements volume.Unmounter interface.
  768. type rbdUnmounter struct {
  769. *rbdMounter
  770. }
  771. var _ volume.Unmounter = &rbdUnmounter{}
  772. // Unmounts the bind mount, and detaches the disk only if the disk
  773. // resource was the last reference to that disk on the kubelet.
  774. func (c *rbdUnmounter) TearDown() error {
  775. return c.TearDownAt(c.GetPath())
  776. }
  777. func (c *rbdUnmounter) TearDownAt(dir string) error {
  778. klog.V(4).Infof("rbd: attempting to teardown at %s", dir)
  779. if pathExists, pathErr := mount.PathExists(dir); pathErr != nil {
  780. return fmt.Errorf("Error checking if path exists: %v", pathErr)
  781. } else if !pathExists {
  782. klog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir)
  783. return nil
  784. }
  785. err := diskTearDown(c.manager, *c, dir, c.mounter)
  786. if err != nil {
  787. return err
  788. }
  789. klog.V(3).Infof("rbd: successfully teardown at %s", dir)
  790. return nil
  791. }
  792. var _ volume.BlockVolumeMapper = &rbdDiskMapper{}
  793. type rbdDiskMapper struct {
  794. *rbd
  795. mon []string
  796. id string
  797. keyring string
  798. secret string
  799. adminSecret string
  800. adminId string
  801. imageFormat string
  802. imageFeatures []string
  803. }
  804. var _ volume.BlockVolumeUnmapper = &rbdDiskUnmapper{}
  805. var _ volume.CustomBlockVolumeUnmapper = &rbdDiskUnmapper{}
  806. // GetGlobalMapPath returns global map path and error
  807. // path: plugins/kubernetes.io/{PluginName}/volumeDevices/{rbd pool}-image-{rbd image-name}/{podUid}
  808. func (rbd *rbd) GetGlobalMapPath(spec *volume.Spec) (string, error) {
  809. return rbd.rbdGlobalMapPath(spec)
  810. }
  811. // GetPodDeviceMapPath returns pod device map path and volume name
  812. // path: pods/{podUid}/volumeDevices/kubernetes.io~rbd
  813. // volumeName: pv0001
  814. func (rbd *rbd) GetPodDeviceMapPath() (string, string) {
  815. return rbd.rbdPodDeviceMapPath()
  816. }
  817. func (rbd *rbd) rbdGlobalMapPath(spec *volume.Spec) (string, error) {
  818. mon, err := getVolumeSourceMonitors(spec)
  819. if err != nil {
  820. return "", err
  821. }
  822. img, err := getVolumeSourceImage(spec)
  823. if err != nil {
  824. return "", err
  825. }
  826. pool, err := getVolumeSourcePool(spec)
  827. if err != nil {
  828. return "", err
  829. }
  830. ro, err := getVolumeSourceReadOnly(spec)
  831. if err != nil {
  832. return "", err
  833. }
  834. mounter := &rbdMounter{
  835. rbd: newRBD("", spec.Name(), img, pool, ro, rbd.plugin, &RBDUtil{}),
  836. Mon: mon,
  837. }
  838. return rbd.manager.MakeGlobalVDPDName(*mounter.rbd), nil
  839. }
  840. func (rbd *rbd) rbdPodDeviceMapPath() (string, string) {
  841. name := rbdPluginName
  842. return rbd.plugin.host.GetPodVolumeDeviceDir(rbd.podUID, utilstrings.EscapeQualifiedName(name)), rbd.volName
  843. }
  844. type rbdDiskUnmapper struct {
  845. *rbdDiskMapper
  846. }
  847. func getPoolAndImageFromMapPath(mapPath string) (string, string, error) {
  848. pathParts := dstrings.Split(mapPath, "/")
  849. if len(pathParts) < 2 {
  850. return "", "", fmt.Errorf("corrupted mapPath")
  851. }
  852. rbdParts := dstrings.Split(pathParts[len(pathParts)-1], "-image-")
  853. if len(rbdParts) < 2 {
  854. return "", "", fmt.Errorf("corrupted mapPath")
  855. }
  856. return string(rbdParts[0]), string(rbdParts[1]), nil
  857. }
  858. func getBlockVolumeDevice(mapPath string) (string, error) {
  859. pool, image, err := getPoolAndImageFromMapPath(mapPath)
  860. if err != nil {
  861. return "", err
  862. }
  863. // Getting full device path
  864. device, found := getDevFromImageAndPool(pool, image)
  865. if !found {
  866. return "", err
  867. }
  868. return device, nil
  869. }
  870. func (rbd *rbdDiskUnmapper) TearDownDevice(mapPath, _ string) error {
  871. device, err := getBlockVolumeDevice(mapPath)
  872. if err != nil {
  873. return fmt.Errorf("rbd: failed to get loopback for device: %v, err: %v", device, err)
  874. }
  875. err = rbd.manager.DetachBlockDisk(*rbd, mapPath)
  876. if err != nil {
  877. return fmt.Errorf("rbd: failed to detach disk: %s\nError: %v", mapPath, err)
  878. }
  879. klog.V(4).Infof("rbd: %q is unmapped, deleting the directory", mapPath)
  880. err = os.RemoveAll(mapPath)
  881. if err != nil {
  882. return fmt.Errorf("rbd: failed to delete the directory: %s\nError: %v", mapPath, err)
  883. }
  884. klog.V(4).Infof("rbd: successfully detached disk: %s", mapPath)
  885. return nil
  886. }
  887. func (rbd *rbdDiskUnmapper) UnmapPodDevice() error {
  888. return nil
  889. }
  890. func getVolumeSourceMonitors(spec *volume.Spec) ([]string, error) {
  891. if spec.Volume != nil && spec.Volume.RBD != nil {
  892. return spec.Volume.RBD.CephMonitors, nil
  893. } else if spec.PersistentVolume != nil &&
  894. spec.PersistentVolume.Spec.RBD != nil {
  895. return spec.PersistentVolume.Spec.RBD.CephMonitors, nil
  896. }
  897. return nil, fmt.Errorf("Spec does not reference a RBD volume type")
  898. }
  899. func getVolumeSourceImage(spec *volume.Spec) (string, error) {
  900. if spec.Volume != nil && spec.Volume.RBD != nil {
  901. return spec.Volume.RBD.RBDImage, nil
  902. } else if spec.PersistentVolume != nil &&
  903. spec.PersistentVolume.Spec.RBD != nil {
  904. return spec.PersistentVolume.Spec.RBD.RBDImage, nil
  905. }
  906. return "", fmt.Errorf("Spec does not reference a RBD volume type")
  907. }
  908. func getVolumeSourceFSType(spec *volume.Spec) (string, error) {
  909. if spec.Volume != nil && spec.Volume.RBD != nil {
  910. return spec.Volume.RBD.FSType, nil
  911. } else if spec.PersistentVolume != nil &&
  912. spec.PersistentVolume.Spec.RBD != nil {
  913. return spec.PersistentVolume.Spec.RBD.FSType, nil
  914. }
  915. return "", fmt.Errorf("Spec does not reference a RBD volume type")
  916. }
  917. func getVolumeSourcePool(spec *volume.Spec) (string, error) {
  918. if spec.Volume != nil && spec.Volume.RBD != nil {
  919. return spec.Volume.RBD.RBDPool, nil
  920. } else if spec.PersistentVolume != nil &&
  921. spec.PersistentVolume.Spec.RBD != nil {
  922. return spec.PersistentVolume.Spec.RBD.RBDPool, nil
  923. }
  924. return "", fmt.Errorf("Spec does not reference a RBD volume type")
  925. }
  926. func getVolumeSourceUser(spec *volume.Spec) (string, error) {
  927. if spec.Volume != nil && spec.Volume.RBD != nil {
  928. return spec.Volume.RBD.RadosUser, nil
  929. } else if spec.PersistentVolume != nil &&
  930. spec.PersistentVolume.Spec.RBD != nil {
  931. return spec.PersistentVolume.Spec.RBD.RadosUser, nil
  932. }
  933. return "", fmt.Errorf("Spec does not reference a RBD volume type")
  934. }
  935. func getVolumeSourceKeyRing(spec *volume.Spec) (string, error) {
  936. if spec.Volume != nil && spec.Volume.RBD != nil {
  937. return spec.Volume.RBD.Keyring, nil
  938. } else if spec.PersistentVolume != nil &&
  939. spec.PersistentVolume.Spec.RBD != nil {
  940. return spec.PersistentVolume.Spec.RBD.Keyring, nil
  941. }
  942. return "", fmt.Errorf("Spec does not reference a RBD volume type")
  943. }
  944. func getVolumeSourceReadOnly(spec *volume.Spec) (bool, error) {
  945. if spec.Volume != nil && spec.Volume.RBD != nil {
  946. return spec.Volume.RBD.ReadOnly, nil
  947. } else if spec.PersistentVolume != nil &&
  948. spec.PersistentVolume.Spec.RBD != nil {
  949. // rbd volumes used as a PersistentVolume gets the ReadOnly flag indirectly through
  950. // the persistent-claim volume used to mount the PV
  951. return spec.ReadOnly, nil
  952. }
  953. return false, fmt.Errorf("Spec does not reference a RBD volume type")
  954. }
  955. func getVolumeAccessModes(spec *volume.Spec) ([]v1.PersistentVolumeAccessMode, error) {
  956. // Only PersistentVolumeSpec has AccessModes
  957. if spec.PersistentVolume != nil {
  958. if spec.PersistentVolume.Spec.RBD != nil {
  959. return spec.PersistentVolume.Spec.AccessModes, nil
  960. } else {
  961. return nil, fmt.Errorf("Spec does not reference a RBD volume type")
  962. }
  963. }
  964. return nil, nil
  965. }
  966. func parsePVSecret(namespace, secretName string, kubeClient clientset.Interface) (string, error) {
  967. secret, err := volutil.GetSecretForPV(namespace, secretName, rbdPluginName, kubeClient)
  968. if err != nil {
  969. klog.Errorf("failed to get secret from [%q/%q]: %+v", namespace, secretName, err)
  970. return "", fmt.Errorf("failed to get secret from [%q/%q]: %+v", namespace, secretName, err)
  971. }
  972. return parseSecretMap(secret)
  973. }
  974. // parseSecretMap locates the secret by key name.
  975. func parseSecretMap(secretMap map[string]string) (string, error) {
  976. if len(secretMap) == 0 {
  977. return "", fmt.Errorf("empty secret map")
  978. }
  979. secret := ""
  980. for k, v := range secretMap {
  981. if k == secretKeyName {
  982. return v, nil
  983. }
  984. secret = v
  985. }
  986. // If not found, the last secret in the map wins as done before
  987. return secret, nil
  988. }
  989. func getSecretNameAndNamespace(spec *volume.Spec, defaultNamespace string) (string, string, error) {
  990. if spec.Volume != nil && spec.Volume.RBD != nil {
  991. localSecretRef := spec.Volume.RBD.SecretRef
  992. if localSecretRef != nil {
  993. return localSecretRef.Name, defaultNamespace, nil
  994. }
  995. return "", "", nil
  996. } else if spec.PersistentVolume != nil &&
  997. spec.PersistentVolume.Spec.RBD != nil {
  998. secretRef := spec.PersistentVolume.Spec.RBD.SecretRef
  999. secretNs := defaultNamespace
  1000. if secretRef != nil {
  1001. if len(secretRef.Namespace) != 0 {
  1002. secretNs = secretRef.Namespace
  1003. }
  1004. return secretRef.Name, secretNs, nil
  1005. }
  1006. return "", "", nil
  1007. }
  1008. return "", "", fmt.Errorf("Spec does not reference an RBD volume type")
  1009. }