flocker.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package flocker
  14. import (
  15. "fmt"
  16. "os"
  17. "path/filepath"
  18. "time"
  19. flockerapi "github.com/clusterhq/flocker-go"
  20. "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/types"
  22. "k8s.io/klog"
  23. "k8s.io/kubernetes/pkg/util/env"
  24. "k8s.io/kubernetes/pkg/util/mount"
  25. "k8s.io/kubernetes/pkg/volume"
  26. utilstrings "k8s.io/utils/strings"
  27. )
  28. // ProbeVolumePlugins is the primary entrypoint for volume plugins.
  29. func ProbeVolumePlugins() []volume.VolumePlugin {
  30. return []volume.VolumePlugin{&flockerPlugin{nil}}
  31. }
  32. type flockerPlugin struct {
  33. host volume.VolumeHost
  34. }
  35. type flockerVolume struct {
  36. volName string
  37. podUID types.UID
  38. // dataset metadata name deprecated
  39. datasetName string
  40. // dataset uuid
  41. datasetUUID string
  42. //pod *v1.Pod
  43. flockerClient flockerapi.Clientable
  44. manager volumeManager
  45. plugin *flockerPlugin
  46. mounter mount.Interface
  47. volume.MetricsProvider
  48. }
  49. var _ volume.VolumePlugin = &flockerPlugin{}
  50. var _ volume.PersistentVolumePlugin = &flockerPlugin{}
  51. var _ volume.DeletableVolumePlugin = &flockerPlugin{}
  52. var _ volume.ProvisionableVolumePlugin = &flockerPlugin{}
  53. const (
  54. flockerPluginName = "kubernetes.io/flocker"
  55. defaultHost = "localhost"
  56. defaultPort = 4523
  57. defaultCACertFile = "/etc/flocker/cluster.crt"
  58. defaultClientKeyFile = "/etc/flocker/apiuser.key"
  59. defaultClientCertFile = "/etc/flocker/apiuser.crt"
  60. defaultMountPath = "/flocker"
  61. timeoutWaitingForVolume = 2 * time.Minute
  62. tickerWaitingForVolume = 5 * time.Second
  63. )
  64. func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
  65. return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(flockerPluginName), volName)
  66. }
  67. func makeGlobalFlockerPath(datasetUUID string) string {
  68. return filepath.Join(defaultMountPath, datasetUUID)
  69. }
  70. func (p *flockerPlugin) Init(host volume.VolumeHost) error {
  71. p.host = host
  72. return nil
  73. }
  74. func (p *flockerPlugin) GetPluginName() string {
  75. return flockerPluginName
  76. }
  77. func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
  78. volumeSource, _, err := getVolumeSource(spec)
  79. if err != nil {
  80. return "", err
  81. }
  82. return volumeSource.DatasetName, nil
  83. }
  84. func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool {
  85. return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) ||
  86. (spec.Volume != nil && spec.Volume.Flocker != nil)
  87. }
  88. func (p *flockerPlugin) IsMigratedToCSI() bool {
  89. return false
  90. }
  91. func (p *flockerPlugin) RequiresRemount() bool {
  92. return false
  93. }
  94. func (p *flockerPlugin) SupportsMountOption() bool {
  95. return false
  96. }
  97. func (p *flockerPlugin) SupportsBulkVolumeVerification() bool {
  98. return false
  99. }
  100. func (p *flockerPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
  101. return []v1.PersistentVolumeAccessMode{
  102. v1.ReadWriteOnce,
  103. }
  104. }
  105. func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool) {
  106. // AFAIK this will always be r/w, but perhaps for the future it will be needed
  107. readOnly := false
  108. if spec.Volume != nil && spec.Volume.Flocker != nil {
  109. return spec.Volume.Flocker, readOnly
  110. }
  111. return spec.PersistentVolume.Spec.Flocker, readOnly
  112. }
  113. func (p *flockerPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
  114. // Inject real implementations here, test through the internal function.
  115. return p.newMounterInternal(spec, pod.UID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
  116. }
  117. func (p *flockerPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Mounter, error) {
  118. volumeSource, readOnly, err := getVolumeSource(spec)
  119. if err != nil {
  120. return nil, err
  121. }
  122. datasetName := volumeSource.DatasetName
  123. datasetUUID := volumeSource.DatasetUUID
  124. return &flockerVolumeMounter{
  125. flockerVolume: &flockerVolume{
  126. podUID: podUID,
  127. volName: spec.Name(),
  128. datasetName: datasetName,
  129. datasetUUID: datasetUUID,
  130. mounter: mounter,
  131. manager: manager,
  132. plugin: p,
  133. MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), p.host)),
  134. },
  135. readOnly: readOnly}, nil
  136. }
  137. func (p *flockerPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
  138. // Inject real implementations here, test through the internal function.
  139. return p.newUnmounterInternal(volName, podUID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
  140. }
  141. func (p *flockerPlugin) newUnmounterInternal(volName string, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Unmounter, error) {
  142. return &flockerVolumeUnmounter{&flockerVolume{
  143. podUID: podUID,
  144. volName: volName,
  145. manager: manager,
  146. mounter: mounter,
  147. plugin: p,
  148. MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, p.host)),
  149. }}, nil
  150. }
  151. func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
  152. flockerVolume := &v1.Volume{
  153. Name: volumeName,
  154. VolumeSource: v1.VolumeSource{
  155. Flocker: &v1.FlockerVolumeSource{
  156. DatasetName: volumeName,
  157. },
  158. },
  159. }
  160. return volume.NewSpecFromVolume(flockerVolume), nil
  161. }
  162. func (b *flockerVolume) GetDatasetUUID() (datasetUUID string, err error) {
  163. // return UUID if set
  164. if len(b.datasetUUID) > 0 {
  165. return b.datasetUUID, nil
  166. }
  167. if b.flockerClient == nil {
  168. return "", fmt.Errorf("Flocker client is not initialized")
  169. }
  170. // lookup in flocker API otherwise
  171. return b.flockerClient.GetDatasetID(b.datasetName)
  172. }
  173. type flockerVolumeMounter struct {
  174. *flockerVolume
  175. readOnly bool
  176. }
  177. func (b *flockerVolumeMounter) GetAttributes() volume.Attributes {
  178. return volume.Attributes{
  179. ReadOnly: b.readOnly,
  180. Managed: false,
  181. SupportsSELinux: false,
  182. }
  183. }
  184. // Checks prior to mount operations to verify that the required components (binaries, etc.)
  185. // to mount the volume are available on the underlying node.
  186. // If not, it returns an error
  187. func (b *flockerVolumeMounter) CanMount() error {
  188. return nil
  189. }
  190. func (b *flockerVolumeMounter) GetPath() string {
  191. return getPath(b.podUID, b.volName, b.plugin.host)
  192. }
  193. // SetUp bind mounts the disk global mount to the volume path.
  194. func (b *flockerVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
  195. return b.SetUpAt(b.GetPath(), mounterArgs)
  196. }
  197. // newFlockerClient uses environment variables and pod attributes to return a
  198. // flocker client capable of talking with the Flocker control service.
  199. func (p *flockerPlugin) newFlockerClient(hostIP string) (*flockerapi.Client, error) {
  200. host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost)
  201. port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort)
  202. if err != nil {
  203. return nil, err
  204. }
  205. caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile)
  206. keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile)
  207. certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile)
  208. c, err := flockerapi.NewClient(host, port, hostIP, caCertPath, keyPath, certPath)
  209. return c, err
  210. }
  211. func (b *flockerVolumeMounter) newFlockerClient() (*flockerapi.Client, error) {
  212. hostIP, err := b.plugin.host.GetHostIP()
  213. if err != nil {
  214. return nil, err
  215. }
  216. return b.plugin.newFlockerClient(hostIP.String())
  217. }
  218. /*
  219. SetUpAt will setup a Flocker volume following this flow of calls to the Flocker
  220. control service:
  221. 1. Get the dataset id for the given volume name/dir
  222. 2. It should already be there, if it's not the user needs to manually create it
  223. 3. Check the current Primary UUID
  224. 4. If it doesn't match with the Primary UUID that we got on 2, then we will
  225. need to update the Primary UUID for this volume.
  226. 5. Wait until the Primary UUID was updated or timeout.
  227. */
  228. func (b *flockerVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
  229. var err error
  230. if b.flockerClient == nil {
  231. b.flockerClient, err = b.newFlockerClient()
  232. if err != nil {
  233. return err
  234. }
  235. }
  236. datasetUUID, err := b.GetDatasetUUID()
  237. if err != nil {
  238. return fmt.Errorf("The datasetUUID for volume with datasetName='%s' can not be found using flocker: %s", b.datasetName, err)
  239. }
  240. datasetState, err := b.flockerClient.GetDatasetState(datasetUUID)
  241. if err != nil {
  242. return fmt.Errorf("The datasetState for volume with datasetUUID='%s' could not determinted uusing flocker: %s", datasetUUID, err)
  243. }
  244. primaryUUID, err := b.flockerClient.GetPrimaryUUID()
  245. if err != nil {
  246. return err
  247. }
  248. if datasetState.Primary != primaryUUID {
  249. if err := b.updateDatasetPrimary(datasetUUID, primaryUUID); err != nil {
  250. return err
  251. }
  252. _, err := b.flockerClient.GetDatasetState(datasetUUID)
  253. if err != nil {
  254. return fmt.Errorf("The volume with datasetUUID='%s' migrated unsuccessfully", datasetUUID)
  255. }
  256. }
  257. // TODO: handle failed mounts here.
  258. notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
  259. klog.V(4).Infof("flockerVolume set up: %s %v %v, datasetUUID %v readOnly %v", dir, !notMnt, err, datasetUUID, b.readOnly)
  260. if err != nil && !os.IsNotExist(err) {
  261. klog.Errorf("cannot validate mount point: %s %v", dir, err)
  262. return err
  263. }
  264. if !notMnt {
  265. return nil
  266. }
  267. if err := os.MkdirAll(dir, 0750); err != nil {
  268. klog.Errorf("mkdir failed on disk %s (%v)", dir, err)
  269. return err
  270. }
  271. // Perform a bind mount to the full path to allow duplicate mounts of the same PD.
  272. options := []string{"bind"}
  273. if b.readOnly {
  274. options = append(options, "ro")
  275. }
  276. globalFlockerPath := makeGlobalFlockerPath(datasetUUID)
  277. klog.V(4).Infof("attempting to mount %s", dir)
  278. err = b.mounter.Mount(globalFlockerPath, dir, "", options)
  279. if err != nil {
  280. notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
  281. if mntErr != nil {
  282. klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
  283. return err
  284. }
  285. if !notMnt {
  286. if mntErr = b.mounter.Unmount(dir); mntErr != nil {
  287. klog.Errorf("failed to unmount: %v", mntErr)
  288. return err
  289. }
  290. notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
  291. if mntErr != nil {
  292. klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
  293. return err
  294. }
  295. if !notMnt {
  296. // This is very odd, we don't expect it. We'll try again next sync loop.
  297. klog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir)
  298. return err
  299. }
  300. }
  301. os.Remove(dir)
  302. klog.Errorf("mount of disk %s failed: %v", dir, err)
  303. return err
  304. }
  305. if !b.readOnly {
  306. volume.SetVolumeOwnership(b, mounterArgs.FsGroup)
  307. }
  308. klog.V(4).Infof("successfully mounted %s", dir)
  309. return nil
  310. }
  311. // updateDatasetPrimary will update the primary in Flocker and wait for it to
  312. // be ready. If it never gets to ready state it will timeout and error.
  313. func (b *flockerVolumeMounter) updateDatasetPrimary(datasetUUID string, primaryUUID string) error {
  314. // We need to update the primary and wait for it to be ready
  315. _, err := b.flockerClient.UpdatePrimaryForDataset(primaryUUID, datasetUUID)
  316. if err != nil {
  317. return err
  318. }
  319. timeoutChan := time.NewTimer(timeoutWaitingForVolume)
  320. defer timeoutChan.Stop()
  321. tickChan := time.NewTicker(tickerWaitingForVolume)
  322. defer tickChan.Stop()
  323. for {
  324. if s, err := b.flockerClient.GetDatasetState(datasetUUID); err == nil && s.Primary == primaryUUID {
  325. return nil
  326. }
  327. select {
  328. case <-timeoutChan.C:
  329. return fmt.Errorf(
  330. "Timed out waiting for the datasetUUID: '%s' to be moved to the primary: '%s'\n%v",
  331. datasetUUID, primaryUUID, err,
  332. )
  333. case <-tickChan.C:
  334. break
  335. }
  336. }
  337. }
  338. func getVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool, error) {
  339. if spec.Volume != nil && spec.Volume.Flocker != nil {
  340. return spec.Volume.Flocker, spec.ReadOnly, nil
  341. } else if spec.PersistentVolume != nil &&
  342. spec.PersistentVolume.Spec.Flocker != nil {
  343. return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil
  344. }
  345. return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type")
  346. }
  347. type flockerVolumeUnmounter struct {
  348. *flockerVolume
  349. }
  350. var _ volume.Unmounter = &flockerVolumeUnmounter{}
  351. func (c *flockerVolumeUnmounter) GetPath() string {
  352. return getPath(c.podUID, c.volName, c.plugin.host)
  353. }
  354. // Unmounts the bind mount, and detaches the disk only if the PD
  355. // resource was the last reference to that disk on the kubelet.
  356. func (c *flockerVolumeUnmounter) TearDown() error {
  357. return c.TearDownAt(c.GetPath())
  358. }
  359. // TearDownAt unmounts the bind mount
  360. func (c *flockerVolumeUnmounter) TearDownAt(dir string) error {
  361. return mount.CleanupMountPoint(dir, c.mounter, false)
  362. }
  363. func (p *flockerPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
  364. return p.newDeleterInternal(spec, &flockerUtil{})
  365. }
  366. func (p *flockerPlugin) newDeleterInternal(spec *volume.Spec, manager volumeManager) (volume.Deleter, error) {
  367. if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker == nil {
  368. return nil, fmt.Errorf("spec.PersistentVolumeSource.Flocker is nil")
  369. }
  370. return &flockerVolumeDeleter{
  371. flockerVolume: &flockerVolume{
  372. volName: spec.Name(),
  373. datasetName: spec.PersistentVolume.Spec.Flocker.DatasetName,
  374. datasetUUID: spec.PersistentVolume.Spec.Flocker.DatasetUUID,
  375. manager: manager,
  376. }}, nil
  377. }
  378. func (p *flockerPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
  379. return p.newProvisionerInternal(options, &flockerUtil{})
  380. }
  381. func (p *flockerPlugin) newProvisionerInternal(options volume.VolumeOptions, manager volumeManager) (volume.Provisioner, error) {
  382. return &flockerVolumeProvisioner{
  383. flockerVolume: &flockerVolume{
  384. manager: manager,
  385. plugin: p,
  386. },
  387. options: options,
  388. }, nil
  389. }