flocker.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package flocker
  14. import (
  15. "fmt"
  16. "os"
  17. "path/filepath"
  18. "time"
  19. flockerapi "github.com/clusterhq/flocker-go"
  20. "k8s.io/klog"
  21. "k8s.io/utils/mount"
  22. utilstrings "k8s.io/utils/strings"
  23. v1 "k8s.io/api/core/v1"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/kubernetes/pkg/util/env"
  26. "k8s.io/kubernetes/pkg/volume"
  27. )
  28. // ProbeVolumePlugins is the primary entrypoint for volume plugins.
  29. func ProbeVolumePlugins() []volume.VolumePlugin {
  30. return []volume.VolumePlugin{&flockerPlugin{nil}}
  31. }
  32. type flockerPlugin struct {
  33. host volume.VolumeHost
  34. }
  35. type flockerVolume struct {
  36. volName string
  37. podUID types.UID
  38. // dataset metadata name deprecated
  39. datasetName string
  40. // dataset uuid
  41. datasetUUID string
  42. //pod *v1.Pod
  43. flockerClient flockerapi.Clientable
  44. manager volumeManager
  45. plugin *flockerPlugin
  46. mounter mount.Interface
  47. volume.MetricsProvider
  48. }
  49. var _ volume.VolumePlugin = &flockerPlugin{}
  50. var _ volume.PersistentVolumePlugin = &flockerPlugin{}
  51. var _ volume.DeletableVolumePlugin = &flockerPlugin{}
  52. var _ volume.ProvisionableVolumePlugin = &flockerPlugin{}
  53. const (
  54. flockerPluginName = "kubernetes.io/flocker"
  55. defaultHost = "localhost"
  56. defaultPort = 4523
  57. defaultCACertFile = "/etc/flocker/cluster.crt"
  58. defaultClientKeyFile = "/etc/flocker/apiuser.key"
  59. defaultClientCertFile = "/etc/flocker/apiuser.crt"
  60. defaultMountPath = "/flocker"
  61. timeoutWaitingForVolume = 2 * time.Minute
  62. tickerWaitingForVolume = 5 * time.Second
  63. )
  64. func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
  65. return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(flockerPluginName), volName)
  66. }
  67. func makeGlobalFlockerPath(datasetUUID string) string {
  68. return filepath.Join(defaultMountPath, datasetUUID)
  69. }
  70. func (p *flockerPlugin) Init(host volume.VolumeHost) error {
  71. p.host = host
  72. return nil
  73. }
  74. func (p *flockerPlugin) GetPluginName() string {
  75. return flockerPluginName
  76. }
  77. func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
  78. volumeSource, _, err := getVolumeSource(spec)
  79. if err != nil {
  80. return "", err
  81. }
  82. return volumeSource.DatasetName, nil
  83. }
  84. func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool {
  85. return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) ||
  86. (spec.Volume != nil && spec.Volume.Flocker != nil)
  87. }
  88. func (p *flockerPlugin) RequiresRemount() bool {
  89. return false
  90. }
  91. func (p *flockerPlugin) SupportsMountOption() bool {
  92. return false
  93. }
  94. func (p *flockerPlugin) SupportsBulkVolumeVerification() bool {
  95. return false
  96. }
  97. func (p *flockerPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
  98. return []v1.PersistentVolumeAccessMode{
  99. v1.ReadWriteOnce,
  100. }
  101. }
  102. func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool) {
  103. // AFAIK this will always be r/w, but perhaps for the future it will be needed
  104. readOnly := false
  105. if spec.Volume != nil && spec.Volume.Flocker != nil {
  106. return spec.Volume.Flocker, readOnly
  107. }
  108. return spec.PersistentVolume.Spec.Flocker, readOnly
  109. }
  110. func (p *flockerPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
  111. // Inject real implementations here, test through the internal function.
  112. return p.newMounterInternal(spec, pod.UID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
  113. }
  114. func (p *flockerPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Mounter, error) {
  115. volumeSource, readOnly, err := getVolumeSource(spec)
  116. if err != nil {
  117. return nil, err
  118. }
  119. datasetName := volumeSource.DatasetName
  120. datasetUUID := volumeSource.DatasetUUID
  121. return &flockerVolumeMounter{
  122. flockerVolume: &flockerVolume{
  123. podUID: podUID,
  124. volName: spec.Name(),
  125. datasetName: datasetName,
  126. datasetUUID: datasetUUID,
  127. mounter: mounter,
  128. manager: manager,
  129. plugin: p,
  130. MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), p.host)),
  131. },
  132. readOnly: readOnly}, nil
  133. }
  134. func (p *flockerPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
  135. // Inject real implementations here, test through the internal function.
  136. return p.newUnmounterInternal(volName, podUID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
  137. }
  138. func (p *flockerPlugin) newUnmounterInternal(volName string, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Unmounter, error) {
  139. return &flockerVolumeUnmounter{&flockerVolume{
  140. podUID: podUID,
  141. volName: volName,
  142. manager: manager,
  143. mounter: mounter,
  144. plugin: p,
  145. MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, p.host)),
  146. }}, nil
  147. }
  148. func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
  149. flockerVolume := &v1.Volume{
  150. Name: volumeName,
  151. VolumeSource: v1.VolumeSource{
  152. Flocker: &v1.FlockerVolumeSource{
  153. DatasetName: volumeName,
  154. },
  155. },
  156. }
  157. return volume.NewSpecFromVolume(flockerVolume), nil
  158. }
  159. func (b *flockerVolume) GetDatasetUUID() (datasetUUID string, err error) {
  160. // return UUID if set
  161. if len(b.datasetUUID) > 0 {
  162. return b.datasetUUID, nil
  163. }
  164. if b.flockerClient == nil {
  165. return "", fmt.Errorf("Flocker client is not initialized")
  166. }
  167. // lookup in flocker API otherwise
  168. return b.flockerClient.GetDatasetID(b.datasetName)
  169. }
  170. type flockerVolumeMounter struct {
  171. *flockerVolume
  172. readOnly bool
  173. }
  174. func (b *flockerVolumeMounter) GetAttributes() volume.Attributes {
  175. return volume.Attributes{
  176. ReadOnly: b.readOnly,
  177. Managed: false,
  178. SupportsSELinux: false,
  179. }
  180. }
  181. // Checks prior to mount operations to verify that the required components (binaries, etc.)
  182. // to mount the volume are available on the underlying node.
  183. // If not, it returns an error
  184. func (b *flockerVolumeMounter) CanMount() error {
  185. return nil
  186. }
  187. func (b *flockerVolumeMounter) GetPath() string {
  188. return getPath(b.podUID, b.volName, b.plugin.host)
  189. }
  190. // SetUp bind mounts the disk global mount to the volume path.
  191. func (b *flockerVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
  192. return b.SetUpAt(b.GetPath(), mounterArgs)
  193. }
  194. // newFlockerClient uses environment variables and pod attributes to return a
  195. // flocker client capable of talking with the Flocker control service.
  196. func (p *flockerPlugin) newFlockerClient(hostIP string) (*flockerapi.Client, error) {
  197. host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost)
  198. port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort)
  199. if err != nil {
  200. return nil, err
  201. }
  202. caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile)
  203. keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile)
  204. certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile)
  205. c, err := flockerapi.NewClient(host, port, hostIP, caCertPath, keyPath, certPath)
  206. return c, err
  207. }
  208. func (b *flockerVolumeMounter) newFlockerClient() (*flockerapi.Client, error) {
  209. hostIP, err := b.plugin.host.GetHostIP()
  210. if err != nil {
  211. return nil, err
  212. }
  213. return b.plugin.newFlockerClient(hostIP.String())
  214. }
  215. /*
  216. SetUpAt will setup a Flocker volume following this flow of calls to the Flocker
  217. control service:
  218. 1. Get the dataset id for the given volume name/dir
  219. 2. It should already be there, if it's not the user needs to manually create it
  220. 3. Check the current Primary UUID
  221. 4. If it doesn't match with the Primary UUID that we got on 2, then we will
  222. need to update the Primary UUID for this volume.
  223. 5. Wait until the Primary UUID was updated or timeout.
  224. */
  225. func (b *flockerVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
  226. var err error
  227. if b.flockerClient == nil {
  228. b.flockerClient, err = b.newFlockerClient()
  229. if err != nil {
  230. return err
  231. }
  232. }
  233. datasetUUID, err := b.GetDatasetUUID()
  234. if err != nil {
  235. return fmt.Errorf("The datasetUUID for volume with datasetName='%s' can not be found using flocker: %s", b.datasetName, err)
  236. }
  237. datasetState, err := b.flockerClient.GetDatasetState(datasetUUID)
  238. if err != nil {
  239. return fmt.Errorf("The datasetState for volume with datasetUUID='%s' could not determinted uusing flocker: %s", datasetUUID, err)
  240. }
  241. primaryUUID, err := b.flockerClient.GetPrimaryUUID()
  242. if err != nil {
  243. return err
  244. }
  245. if datasetState.Primary != primaryUUID {
  246. if err := b.updateDatasetPrimary(datasetUUID, primaryUUID); err != nil {
  247. return err
  248. }
  249. _, err := b.flockerClient.GetDatasetState(datasetUUID)
  250. if err != nil {
  251. return fmt.Errorf("The volume with datasetUUID='%s' migrated unsuccessfully", datasetUUID)
  252. }
  253. }
  254. // TODO: handle failed mounts here.
  255. notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
  256. klog.V(4).Infof("flockerVolume set up: %s %v %v, datasetUUID %v readOnly %v", dir, !notMnt, err, datasetUUID, b.readOnly)
  257. if err != nil && !os.IsNotExist(err) {
  258. klog.Errorf("cannot validate mount point: %s %v", dir, err)
  259. return err
  260. }
  261. if !notMnt {
  262. return nil
  263. }
  264. if err := os.MkdirAll(dir, 0750); err != nil {
  265. klog.Errorf("mkdir failed on disk %s (%v)", dir, err)
  266. return err
  267. }
  268. // Perform a bind mount to the full path to allow duplicate mounts of the same PD.
  269. options := []string{"bind"}
  270. if b.readOnly {
  271. options = append(options, "ro")
  272. }
  273. globalFlockerPath := makeGlobalFlockerPath(datasetUUID)
  274. klog.V(4).Infof("attempting to mount %s", dir)
  275. err = b.mounter.Mount(globalFlockerPath, dir, "", options)
  276. if err != nil {
  277. notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
  278. if mntErr != nil {
  279. klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
  280. return err
  281. }
  282. if !notMnt {
  283. if mntErr = b.mounter.Unmount(dir); mntErr != nil {
  284. klog.Errorf("failed to unmount: %v", mntErr)
  285. return err
  286. }
  287. notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
  288. if mntErr != nil {
  289. klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
  290. return err
  291. }
  292. if !notMnt {
  293. // This is very odd, we don't expect it. We'll try again next sync loop.
  294. klog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir)
  295. return err
  296. }
  297. }
  298. os.Remove(dir)
  299. klog.Errorf("mount of disk %s failed: %v", dir, err)
  300. return err
  301. }
  302. if !b.readOnly {
  303. volume.SetVolumeOwnership(b, mounterArgs.FsGroup)
  304. }
  305. klog.V(4).Infof("successfully mounted %s", dir)
  306. return nil
  307. }
  308. // updateDatasetPrimary will update the primary in Flocker and wait for it to
  309. // be ready. If it never gets to ready state it will timeout and error.
  310. func (b *flockerVolumeMounter) updateDatasetPrimary(datasetUUID string, primaryUUID string) error {
  311. // We need to update the primary and wait for it to be ready
  312. _, err := b.flockerClient.UpdatePrimaryForDataset(primaryUUID, datasetUUID)
  313. if err != nil {
  314. return err
  315. }
  316. timeoutChan := time.NewTimer(timeoutWaitingForVolume)
  317. defer timeoutChan.Stop()
  318. tickChan := time.NewTicker(tickerWaitingForVolume)
  319. defer tickChan.Stop()
  320. for {
  321. if s, err := b.flockerClient.GetDatasetState(datasetUUID); err == nil && s.Primary == primaryUUID {
  322. return nil
  323. }
  324. select {
  325. case <-timeoutChan.C:
  326. return fmt.Errorf(
  327. "Timed out waiting for the datasetUUID: '%s' to be moved to the primary: '%s'\n%v",
  328. datasetUUID, primaryUUID, err,
  329. )
  330. case <-tickChan.C:
  331. }
  332. }
  333. }
  334. func getVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool, error) {
  335. if spec.Volume != nil && spec.Volume.Flocker != nil {
  336. return spec.Volume.Flocker, spec.ReadOnly, nil
  337. } else if spec.PersistentVolume != nil &&
  338. spec.PersistentVolume.Spec.Flocker != nil {
  339. return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil
  340. }
  341. return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type")
  342. }
  343. type flockerVolumeUnmounter struct {
  344. *flockerVolume
  345. }
  346. var _ volume.Unmounter = &flockerVolumeUnmounter{}
  347. func (c *flockerVolumeUnmounter) GetPath() string {
  348. return getPath(c.podUID, c.volName, c.plugin.host)
  349. }
  350. // Unmounts the bind mount, and detaches the disk only if the PD
  351. // resource was the last reference to that disk on the kubelet.
  352. func (c *flockerVolumeUnmounter) TearDown() error {
  353. return c.TearDownAt(c.GetPath())
  354. }
  355. // TearDownAt unmounts the bind mount
  356. func (c *flockerVolumeUnmounter) TearDownAt(dir string) error {
  357. return mount.CleanupMountPoint(dir, c.mounter, false)
  358. }
  359. func (p *flockerPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
  360. return p.newDeleterInternal(spec, &flockerUtil{})
  361. }
  362. func (p *flockerPlugin) newDeleterInternal(spec *volume.Spec, manager volumeManager) (volume.Deleter, error) {
  363. if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker == nil {
  364. return nil, fmt.Errorf("spec.PersistentVolumeSource.Flocker is nil")
  365. }
  366. return &flockerVolumeDeleter{
  367. flockerVolume: &flockerVolume{
  368. volName: spec.Name(),
  369. datasetName: spec.PersistentVolume.Spec.Flocker.DatasetName,
  370. datasetUUID: spec.PersistentVolume.Spec.Flocker.DatasetUUID,
  371. manager: manager,
  372. }}, nil
  373. }
  374. func (p *flockerPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
  375. return p.newProvisionerInternal(options, &flockerUtil{})
  376. }
  377. func (p *flockerPlugin) newProvisionerInternal(options volume.VolumeOptions, manager volumeManager) (volume.Provisioner, error) {
  378. return &flockerVolumeProvisioner{
  379. flockerVolume: &flockerVolume{
  380. manager: manager,
  381. plugin: p,
  382. },
  383. options: options,
  384. }, nil
  385. }