123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package flocker
- import (
- "fmt"
- "os"
- "path/filepath"
- "time"
- flockerapi "github.com/clusterhq/flocker-go"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/util/env"
- "k8s.io/kubernetes/pkg/util/mount"
- "k8s.io/kubernetes/pkg/volume"
- utilstrings "k8s.io/utils/strings"
- )
- // ProbeVolumePlugins is the primary entrypoint for volume plugins.
- func ProbeVolumePlugins() []volume.VolumePlugin {
- return []volume.VolumePlugin{&flockerPlugin{nil}}
- }
- type flockerPlugin struct {
- host volume.VolumeHost
- }
- type flockerVolume struct {
- volName string
- podUID types.UID
- // dataset metadata name deprecated
- datasetName string
- // dataset uuid
- datasetUUID string
- //pod *v1.Pod
- flockerClient flockerapi.Clientable
- manager volumeManager
- plugin *flockerPlugin
- mounter mount.Interface
- volume.MetricsProvider
- }
- var _ volume.VolumePlugin = &flockerPlugin{}
- var _ volume.PersistentVolumePlugin = &flockerPlugin{}
- var _ volume.DeletableVolumePlugin = &flockerPlugin{}
- var _ volume.ProvisionableVolumePlugin = &flockerPlugin{}
const (
	// flockerPluginName is the name this plugin reports from GetPluginName.
	flockerPluginName = "kubernetes.io/flocker"

	// Fallbacks used when the FLOCKER_CONTROL_SERVICE_* environment variables
	// are not set.
	defaultHost           = "localhost"
	defaultPort           = 4523
	defaultCACertFile     = "/etc/flocker/cluster.crt"
	defaultClientKeyFile  = "/etc/flocker/apiuser.key"
	defaultClientCertFile = "/etc/flocker/apiuser.crt"

	// defaultMountPath is the directory under which datasets are looked up by
	// UUID when building the global flocker path.
	defaultMountPath = "/flocker"

	// Overall deadline and polling interval used while waiting for a dataset
	// to move to a new primary.
	timeoutWaitingForVolume = 2 * time.Minute
	tickerWaitingForVolume  = 5 * time.Second
)
- func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
- return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(flockerPluginName), volName)
- }
- func makeGlobalFlockerPath(datasetUUID string) string {
- return filepath.Join(defaultMountPath, datasetUUID)
- }
- func (p *flockerPlugin) Init(host volume.VolumeHost) error {
- p.host = host
- return nil
- }
- func (p *flockerPlugin) GetPluginName() string {
- return flockerPluginName
- }
- func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
- volumeSource, _, err := getVolumeSource(spec)
- if err != nil {
- return "", err
- }
- return volumeSource.DatasetName, nil
- }
- func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool {
- return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) ||
- (spec.Volume != nil && spec.Volume.Flocker != nil)
- }
- func (p *flockerPlugin) IsMigratedToCSI() bool {
- return false
- }
- func (p *flockerPlugin) RequiresRemount() bool {
- return false
- }
- func (p *flockerPlugin) SupportsMountOption() bool {
- return false
- }
- func (p *flockerPlugin) SupportsBulkVolumeVerification() bool {
- return false
- }
- func (p *flockerPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
- return []v1.PersistentVolumeAccessMode{
- v1.ReadWriteOnce,
- }
- }
- func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool) {
- // AFAIK this will always be r/w, but perhaps for the future it will be needed
- readOnly := false
- if spec.Volume != nil && spec.Volume.Flocker != nil {
- return spec.Volume.Flocker, readOnly
- }
- return spec.PersistentVolume.Spec.Flocker, readOnly
- }
- func (p *flockerPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
- // Inject real implementations here, test through the internal function.
- return p.newMounterInternal(spec, pod.UID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
- }
- func (p *flockerPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Mounter, error) {
- volumeSource, readOnly, err := getVolumeSource(spec)
- if err != nil {
- return nil, err
- }
- datasetName := volumeSource.DatasetName
- datasetUUID := volumeSource.DatasetUUID
- return &flockerVolumeMounter{
- flockerVolume: &flockerVolume{
- podUID: podUID,
- volName: spec.Name(),
- datasetName: datasetName,
- datasetUUID: datasetUUID,
- mounter: mounter,
- manager: manager,
- plugin: p,
- MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), p.host)),
- },
- readOnly: readOnly}, nil
- }
- func (p *flockerPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
- // Inject real implementations here, test through the internal function.
- return p.newUnmounterInternal(volName, podUID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
- }
- func (p *flockerPlugin) newUnmounterInternal(volName string, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Unmounter, error) {
- return &flockerVolumeUnmounter{&flockerVolume{
- podUID: podUID,
- volName: volName,
- manager: manager,
- mounter: mounter,
- plugin: p,
- MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, p.host)),
- }}, nil
- }
- func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
- flockerVolume := &v1.Volume{
- Name: volumeName,
- VolumeSource: v1.VolumeSource{
- Flocker: &v1.FlockerVolumeSource{
- DatasetName: volumeName,
- },
- },
- }
- return volume.NewSpecFromVolume(flockerVolume), nil
- }
- func (b *flockerVolume) GetDatasetUUID() (datasetUUID string, err error) {
- // return UUID if set
- if len(b.datasetUUID) > 0 {
- return b.datasetUUID, nil
- }
- if b.flockerClient == nil {
- return "", fmt.Errorf("Flocker client is not initialized")
- }
- // lookup in flocker API otherwise
- return b.flockerClient.GetDatasetID(b.datasetName)
- }
- type flockerVolumeMounter struct {
- *flockerVolume
- readOnly bool
- }
- func (b *flockerVolumeMounter) GetAttributes() volume.Attributes {
- return volume.Attributes{
- ReadOnly: b.readOnly,
- Managed: false,
- SupportsSELinux: false,
- }
- }
- // Checks prior to mount operations to verify that the required components (binaries, etc.)
- // to mount the volume are available on the underlying node.
- // If not, it returns an error
- func (b *flockerVolumeMounter) CanMount() error {
- return nil
- }
- func (b *flockerVolumeMounter) GetPath() string {
- return getPath(b.podUID, b.volName, b.plugin.host)
- }
- // SetUp bind mounts the disk global mount to the volume path.
- func (b *flockerVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
- return b.SetUpAt(b.GetPath(), mounterArgs)
- }
- // newFlockerClient uses environment variables and pod attributes to return a
- // flocker client capable of talking with the Flocker control service.
- func (p *flockerPlugin) newFlockerClient(hostIP string) (*flockerapi.Client, error) {
- host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost)
- port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort)
- if err != nil {
- return nil, err
- }
- caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile)
- keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile)
- certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile)
- c, err := flockerapi.NewClient(host, port, hostIP, caCertPath, keyPath, certPath)
- return c, err
- }
- func (b *flockerVolumeMounter) newFlockerClient() (*flockerapi.Client, error) {
- hostIP, err := b.plugin.host.GetHostIP()
- if err != nil {
- return nil, err
- }
- return b.plugin.newFlockerClient(hostIP.String())
- }
- /*
- SetUpAt will setup a Flocker volume following this flow of calls to the Flocker
- control service:
- 1. Get the dataset id for the given volume name/dir
- 2. It should already be there, if it's not the user needs to manually create it
- 3. Check the current Primary UUID
- 4. If it doesn't match with the Primary UUID that we got on 2, then we will
- need to update the Primary UUID for this volume.
- 5. Wait until the Primary UUID was updated or timeout.
- */
- func (b *flockerVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
- var err error
- if b.flockerClient == nil {
- b.flockerClient, err = b.newFlockerClient()
- if err != nil {
- return err
- }
- }
- datasetUUID, err := b.GetDatasetUUID()
- if err != nil {
- return fmt.Errorf("The datasetUUID for volume with datasetName='%s' can not be found using flocker: %s", b.datasetName, err)
- }
- datasetState, err := b.flockerClient.GetDatasetState(datasetUUID)
- if err != nil {
- return fmt.Errorf("The datasetState for volume with datasetUUID='%s' could not determinted uusing flocker: %s", datasetUUID, err)
- }
- primaryUUID, err := b.flockerClient.GetPrimaryUUID()
- if err != nil {
- return err
- }
- if datasetState.Primary != primaryUUID {
- if err := b.updateDatasetPrimary(datasetUUID, primaryUUID); err != nil {
- return err
- }
- _, err := b.flockerClient.GetDatasetState(datasetUUID)
- if err != nil {
- return fmt.Errorf("The volume with datasetUUID='%s' migrated unsuccessfully", datasetUUID)
- }
- }
- // TODO: handle failed mounts here.
- notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
- klog.V(4).Infof("flockerVolume set up: %s %v %v, datasetUUID %v readOnly %v", dir, !notMnt, err, datasetUUID, b.readOnly)
- if err != nil && !os.IsNotExist(err) {
- klog.Errorf("cannot validate mount point: %s %v", dir, err)
- return err
- }
- if !notMnt {
- return nil
- }
- if err := os.MkdirAll(dir, 0750); err != nil {
- klog.Errorf("mkdir failed on disk %s (%v)", dir, err)
- return err
- }
- // Perform a bind mount to the full path to allow duplicate mounts of the same PD.
- options := []string{"bind"}
- if b.readOnly {
- options = append(options, "ro")
- }
- globalFlockerPath := makeGlobalFlockerPath(datasetUUID)
- klog.V(4).Infof("attempting to mount %s", dir)
- err = b.mounter.Mount(globalFlockerPath, dir, "", options)
- if err != nil {
- notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
- if mntErr != nil {
- klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
- return err
- }
- if !notMnt {
- if mntErr = b.mounter.Unmount(dir); mntErr != nil {
- klog.Errorf("failed to unmount: %v", mntErr)
- return err
- }
- notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
- if mntErr != nil {
- klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
- return err
- }
- if !notMnt {
- // This is very odd, we don't expect it. We'll try again next sync loop.
- klog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir)
- return err
- }
- }
- os.Remove(dir)
- klog.Errorf("mount of disk %s failed: %v", dir, err)
- return err
- }
- if !b.readOnly {
- volume.SetVolumeOwnership(b, mounterArgs.FsGroup)
- }
- klog.V(4).Infof("successfully mounted %s", dir)
- return nil
- }
- // updateDatasetPrimary will update the primary in Flocker and wait for it to
- // be ready. If it never gets to ready state it will timeout and error.
- func (b *flockerVolumeMounter) updateDatasetPrimary(datasetUUID string, primaryUUID string) error {
- // We need to update the primary and wait for it to be ready
- _, err := b.flockerClient.UpdatePrimaryForDataset(primaryUUID, datasetUUID)
- if err != nil {
- return err
- }
- timeoutChan := time.NewTimer(timeoutWaitingForVolume)
- defer timeoutChan.Stop()
- tickChan := time.NewTicker(tickerWaitingForVolume)
- defer tickChan.Stop()
- for {
- if s, err := b.flockerClient.GetDatasetState(datasetUUID); err == nil && s.Primary == primaryUUID {
- return nil
- }
- select {
- case <-timeoutChan.C:
- return fmt.Errorf(
- "Timed out waiting for the datasetUUID: '%s' to be moved to the primary: '%s'\n%v",
- datasetUUID, primaryUUID, err,
- )
- case <-tickChan.C:
- break
- }
- }
- }
- func getVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool, error) {
- if spec.Volume != nil && spec.Volume.Flocker != nil {
- return spec.Volume.Flocker, spec.ReadOnly, nil
- } else if spec.PersistentVolume != nil &&
- spec.PersistentVolume.Spec.Flocker != nil {
- return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil
- }
- return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type")
- }
- type flockerVolumeUnmounter struct {
- *flockerVolume
- }
- var _ volume.Unmounter = &flockerVolumeUnmounter{}
- func (c *flockerVolumeUnmounter) GetPath() string {
- return getPath(c.podUID, c.volName, c.plugin.host)
- }
- // Unmounts the bind mount, and detaches the disk only if the PD
- // resource was the last reference to that disk on the kubelet.
- func (c *flockerVolumeUnmounter) TearDown() error {
- return c.TearDownAt(c.GetPath())
- }
- // TearDownAt unmounts the bind mount
- func (c *flockerVolumeUnmounter) TearDownAt(dir string) error {
- return mount.CleanupMountPoint(dir, c.mounter, false)
- }
- func (p *flockerPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
- return p.newDeleterInternal(spec, &flockerUtil{})
- }
- func (p *flockerPlugin) newDeleterInternal(spec *volume.Spec, manager volumeManager) (volume.Deleter, error) {
- if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker == nil {
- return nil, fmt.Errorf("spec.PersistentVolumeSource.Flocker is nil")
- }
- return &flockerVolumeDeleter{
- flockerVolume: &flockerVolume{
- volName: spec.Name(),
- datasetName: spec.PersistentVolume.Spec.Flocker.DatasetName,
- datasetUUID: spec.PersistentVolume.Spec.Flocker.DatasetUUID,
- manager: manager,
- }}, nil
- }
- func (p *flockerPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
- return p.newProvisionerInternal(options, &flockerUtil{})
- }
- func (p *flockerPlugin) newProvisionerInternal(options volume.VolumeOptions, manager volumeManager) (volume.Provisioner, error) {
- return &flockerVolumeProvisioner{
- flockerVolume: &flockerVolume{
- manager: manager,
- plugin: p,
- },
- options: options,
- }, nil
- }
|