123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package operationexecutor implements interfaces that enable execution of
- // attach, detach, mount, and unmount operations with a
- // nestedpendingoperations so that more than one operation is never triggered
- // on the same volume for the same pod.
- package operationexecutor
- import (
- "fmt"
- "time"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/kubernetes/pkg/util/mount"
- "k8s.io/kubernetes/pkg/volume"
- "k8s.io/kubernetes/pkg/volume/util"
- "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
- volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
- "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
- )
- // OperationExecutor defines a set of operations for attaching, detaching,
- // mounting, or unmounting a volume that are executed with a NewNestedPendingOperations which
- // prevents more than one operation from being triggered on the same volume.
- //
- // These operations should be idempotent (for example, AttachVolume should
- // still succeed if the volume is already attached to the node, etc.). However,
- // they depend on the volume plugins to implement this behavior.
- //
- // Once an operation completes successfully, the actualStateOfWorld is updated
- // to indicate the volume is attached/detached/mounted/unmounted.
- //
- // If the OperationExecutor fails to start the operation because, for example,
- // an operation with the same UniqueVolumeName is already pending, a non-nil
- // error is returned.
- //
- // Once the operation is started, since it is executed asynchronously,
- // errors are simply logged and the goroutine is terminated without updating
- // actualStateOfWorld (callers are responsible for retrying as needed).
- //
- // Some of these operations may result in calls to the API server; callers are
- // responsible for rate limiting on errors.
- type OperationExecutor interface {
- // AttachVolume attaches the volume to the node specified in volumeToAttach.
- // It then updates the actual state of the world to reflect that.
- AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
- // VerifyVolumesAreAttachedPerNode verifies the given list of volumes to see whether they are still attached to the node.
- // If any volume is not attached right now, it will update the actual state of the world to reflect that.
- // Note that this operation could be operated concurrently with other attach/detach operations.
- // In theory (but very unlikely in practise), race condition among these operations might mark volume as detached
- // even if it is attached. But reconciler can correct this in a short period of time.
- VerifyVolumesAreAttachedPerNode(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
- // VerifyVolumesAreAttached verifies volumes being used in entire cluster and if they are still attached to the node
- // If any volume is not attached right now, it will update actual state of world to reflect that.
- VerifyVolumesAreAttached(volumesToVerify map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater)
- // DetachVolume detaches the volume from the node specified in
- // volumeToDetach, and updates the actual state of the world to reflect
- // that. If verifySafeToDetach is set, a call is made to the fetch the node
- // object and it is used to verify that the volume does not exist in Node's
- // Status.VolumesInUse list (operation fails with error if it is).
- DetachVolume(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
- // If a volume has 'Filesystem' volumeMode, MountVolume mounts the
- // volume to the pod specified in volumeToMount.
- // Specifically it will:
- // * Wait for the device to finish attaching (for attachable volumes only).
- // * Mount device to global mount path (for attachable volumes only).
- // * Update actual state of world to reflect volume is globally mounted (for
- // attachable volumes only).
- // * Mount the volume to the pod specific path.
- // * Update actual state of world to reflect volume is mounted to the pod
- // path.
- // The parameter "isRemount" is informational and used to adjust logging
- // verbosity. An initial mount is more log-worthy than a remount, for
- // example.
- //
- // For 'Block' volumeMode, this method creates a symbolic link to
- // the volume from both the pod specified in volumeToMount and global map path.
- // Specifically it will:
- // * Wait for the device to finish attaching (for attachable volumes only).
- // * Update actual state of world to reflect volume is globally mounted/mapped.
- // * Map volume to global map path using symbolic link.
- // * Map the volume to the pod device map path using symbolic link.
- // * Update actual state of world to reflect volume is mounted/mapped to the pod path.
- MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error
- // If a volume has 'Filesystem' volumeMode, UnmountVolume unmounts the
- // volume from the pod specified in volumeToUnmount and updates the actual
- // state of the world to reflect that.
- //
- // For 'Block' volumeMode, this method unmaps symbolic link to the volume
- // from both the pod device map path in volumeToUnmount and global map path.
- // And then, updates the actual state of the world to reflect that.
- UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) error
- // If a volume has 'Filesystem' volumeMode, UnmountDevice unmounts the
- // volumes global mount path from the device (for attachable volumes only,
- // freeing it for detach. It then updates the actual state of the world to
- // reflect that.
- //
- // For 'Block' volumeMode, this method checks number of symbolic links under
- // global map path. If number of reference is zero, remove global map path
- // directory and free a volume for detach.
- // It then updates the actual state of the world to reflect that.
- UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
- // VerifyControllerAttachedVolume checks if the specified volume is present
- // in the specified nodes AttachedVolumes Status field. It uses kubeClient
- // to fetch the node object.
- // If the volume is found, the actual state of the world is updated to mark
- // the volume as attached.
- // If the volume does not implement the attacher interface, it is assumed to
- // be attached and the actual state of the world is updated accordingly.
- // If the volume is not found or there is an error (fetching the node
- // object, for example) then an error is returned which triggers exponential
- // back off on retries.
- VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
- // IsOperationPending returns true if an operation for the given volumeName and podName is pending,
- // otherwise it returns false
- IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
- // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
- ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
- // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
- ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error)
- // CheckVolumeExistenceOperation checks volume existence
- CheckVolumeExistenceOperation(volumeSpec *volume.Spec, mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error)
- }
- // NewOperationExecutor returns a new instance of OperationExecutor.
- func NewOperationExecutor(
- operationGenerator OperationGenerator) OperationExecutor {
- return &operationExecutor{
- pendingOperations: nestedpendingoperations.NewNestedPendingOperations(
- true /* exponentialBackOffOnError */),
- operationGenerator: operationGenerator,
- }
- }
- // ActualStateOfWorldMounterUpdater defines a set of operations updating the actual
- // state of the world cache after successful mount/unmount.
- type ActualStateOfWorldMounterUpdater interface {
- // Marks the specified volume as mounted to the specified pod
- MarkVolumeAsMounted(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error
- // Marks the specified volume as unmounted from the specified pod
- MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
- // Marks the specified volume as having been globally mounted.
- MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error
- // Marks the specified volume as having its global mount unmounted.
- MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
- // Marks the specified volume's file system resize request is finished.
- MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
- }
- // ActualStateOfWorldAttacherUpdater defines a set of operations updating the
- // actual state of the world cache after successful attach/detach/mount/unmount.
- type ActualStateOfWorldAttacherUpdater interface {
- // Marks the specified volume as attached to the specified node. If the
- // volume name is supplied, that volume name will be used. If not, the
- // volume name is computed using the result from querying the plugin.
- //
- // TODO: in the future, we should be able to remove the volumeName
- // argument to this method -- since it is used only for attachable
- // volumes. See issue 29695.
- MarkVolumeAsAttached(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error
- // Marks the specified volume as *possibly* attached to the specified node.
- // If an attach operation fails, the attach/detach controller does not know for certain if the volume is attached or not.
- // If the volume name is supplied, that volume name will be used. If not, the
- // volume name is computed using the result from querying the plugin.
- MarkVolumeAsUncertain(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName) error
- // Marks the specified volume as detached from the specified node
- MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
- // Marks desire to detach the specified volume (remove the volume from the node's
- // volumesToReportAsAttached list)
- RemoveVolumeFromReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) error
- // Unmarks the desire to detach for the specified volume (add the volume back to
- // the node's volumesToReportAsAttached list)
- AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
- }
- // VolumeLogger defines a set of operations for generating volume-related logging and error msgs
- type VolumeLogger interface {
- // Creates a detailed msg that can be used in logs
- // The msg format follows the pattern "<prefixMsg> <volume details> <suffixMsg>",
- // where each implementation provides the volume details
- GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string)
- // Creates a detailed error that can be used in logs.
- // The msg format follows the pattern "<prefixMsg> <volume details>: <err> ",
- GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error)
- // Creates a simple msg that is user friendly and a detailed msg that can be used in logs
- // The msg format follows the pattern "<prefixMsg> <volume details> <suffixMsg>",
- // where each implementation provides the volume details
- GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string)
- // Creates a simple error that is user friendly and a detailed error that can be used in logs.
- // The msg format follows the pattern "<prefixMsg> <volume details>: <err> ",
- GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error)
- }
- // Generates an error string with the format ": <err>" if err exists
- func errSuffix(err error) string {
- errStr := ""
- if err != nil {
- errStr = fmt.Sprintf(": %v", err)
- }
- return errStr
- }
- // Generate a detailed error msg for logs
- func generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details string) (detailedMsg string) {
- return fmt.Sprintf("%v for volume %q %v %v", prefixMsg, volumeName, details, suffixMsg)
- }
- // Generate a simplified error msg for events and a detailed error msg for logs
- func generateVolumeMsg(prefixMsg, suffixMsg, volumeName, details string) (simpleMsg, detailedMsg string) {
- simpleMsg = fmt.Sprintf("%v for volume %q %v", prefixMsg, volumeName, suffixMsg)
- return simpleMsg, generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details)
- }
- // VolumeToAttach represents a volume that should be attached to a node.
- type VolumeToAttach struct {
- // MultiAttachErrorReported indicates whether the multi-attach error has been reported for the given volume.
- // It is used to prevent reporting the error from being reported more than once for a given volume.
- MultiAttachErrorReported bool
- // VolumeName is the unique identifier for the volume that should be
- // attached.
- VolumeName v1.UniqueVolumeName
- // VolumeSpec is a volume spec containing the specification for the volume
- // that should be attached.
- VolumeSpec *volume.Spec
- // NodeName is the identifier for the node that the volume should be
- // attached to.
- NodeName types.NodeName
- // scheduledPods is a map containing the set of pods that reference this
- // volume and are scheduled to the underlying node. The key in the map is
- // the name of the pod and the value is a pod object containing more
- // information about the pod.
- ScheduledPods []*v1.Pod
- }
- // GenerateMsgDetailed returns detailed msgs for volumes to attach
- func (volume *VolumeToAttach) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
- detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
- volumeSpecName := "nil"
- if volume.VolumeSpec != nil {
- volumeSpecName = volume.VolumeSpec.Name()
- }
- return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
- }
- // GenerateMsg returns simple and detailed msgs for volumes to attach
- func (volume *VolumeToAttach) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
- detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
- volumeSpecName := "nil"
- if volume.VolumeSpec != nil {
- volumeSpecName = volume.VolumeSpec.Name()
- }
- return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
- }
- // GenerateErrorDetailed returns detailed errors for volumes to attach
- func (volume *VolumeToAttach) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
- return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
- }
- // GenerateError returns simple and detailed errors for volumes to attach
- func (volume *VolumeToAttach) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
- simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
- return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
- }
- // VolumeToMount represents a volume that should be attached to this node and
- // mounted to the PodName.
- type VolumeToMount struct {
- // VolumeName is the unique identifier for the volume that should be
- // mounted.
- VolumeName v1.UniqueVolumeName
- // PodName is the unique identifier for the pod that the volume should be
- // mounted to after it is attached.
- PodName volumetypes.UniquePodName
- // VolumeSpec is a volume spec containing the specification for the volume
- // that should be mounted. Used to create NewMounter. Used to generate
- // InnerVolumeSpecName.
- VolumeSpec *volume.Spec
- // outerVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the
- // volume was referenced through a persistent volume claim, this contains
- // the podSpec.Volume[x].Name of the persistent volume claim.
- OuterVolumeSpecName string
- // Pod to mount the volume to. Used to create NewMounter.
- Pod *v1.Pod
- // PluginIsAttachable indicates that the plugin for this volume implements
- // the volume.Attacher interface
- PluginIsAttachable bool
- // PluginIsDeviceMountable indicates that the plugin for this volume implements
- // the volume.DeviceMounter interface
- PluginIsDeviceMountable bool
- // VolumeGidValue contains the value of the GID annotation, if present.
- VolumeGidValue string
- // DevicePath contains the path on the node where the volume is attached.
- // For non-attachable volumes this is empty.
- DevicePath string
- // ReportedInUse indicates that the volume was successfully added to the
- // VolumesInUse field in the node's status.
- ReportedInUse bool
- // DesiredSizeLimit indicates the desired upper bound on the size of the volume
- // (if so implemented)
- DesiredSizeLimit *resource.Quantity
- }
- // GenerateMsgDetailed returns detailed msgs for volumes to mount
- func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
- detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
- volumeSpecName := "nil"
- if volume.VolumeSpec != nil {
- volumeSpecName = volume.VolumeSpec.Name()
- }
- return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
- }
- // GenerateMsg returns simple and detailed msgs for volumes to mount
- func (volume *VolumeToMount) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
- detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
- volumeSpecName := "nil"
- if volume.VolumeSpec != nil {
- volumeSpecName = volume.VolumeSpec.Name()
- }
- return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
- }
- // GenerateErrorDetailed returns detailed errors for volumes to mount
- func (volume *VolumeToMount) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
- return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
- }
- // GenerateError returns simple and detailed errors for volumes to mount
- func (volume *VolumeToMount) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
- simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
- return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
- }
- // AttachedVolume represents a volume that is attached to a node.
- type AttachedVolume struct {
- // VolumeName is the unique identifier for the volume that is attached.
- VolumeName v1.UniqueVolumeName
- // VolumeSpec is the volume spec containing the specification for the
- // volume that is attached.
- VolumeSpec *volume.Spec
- // NodeName is the identifier for the node that the volume is attached to.
- NodeName types.NodeName
- // PluginIsAttachable indicates that the plugin for this volume implements
- // the volume.Attacher interface
- PluginIsAttachable bool
- // DevicePath contains the path on the node where the volume is attached.
- // For non-attachable volumes this is empty.
- DevicePath string
- // DeviceMountPath contains the path on the node where the device should
- // be mounted after it is attached.
- DeviceMountPath string
- // PluginName is the Unescaped Qualified name of the volume plugin used to
- // attach and mount this volume.
- PluginName string
- }
- // GenerateMsgDetailed returns detailed msgs for attached volumes
- func (volume *AttachedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
- detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
- volumeSpecName := "nil"
- if volume.VolumeSpec != nil {
- volumeSpecName = volume.VolumeSpec.Name()
- }
- return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
- }
- // GenerateMsg returns simple and detailed msgs for attached volumes
- func (volume *AttachedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
- detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
- volumeSpecName := "nil"
- if volume.VolumeSpec != nil {
- volumeSpecName = volume.VolumeSpec.Name()
- }
- return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
- }
- // GenerateErrorDetailed returns detailed errors for attached volumes
- func (volume *AttachedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
- return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
- }
- // GenerateError returns simple and detailed errors for attached volumes
- func (volume *AttachedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
- simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
- return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
- }
- // MountedVolume represents a volume that has successfully been mounted to a pod.
- type MountedVolume struct {
- // PodName is the unique identifier of the pod mounted to.
- PodName volumetypes.UniquePodName
- // VolumeName is the unique identifier of the volume mounted to the pod.
- VolumeName v1.UniqueVolumeName
- // InnerVolumeSpecName is the volume.Spec.Name() of the volume. If the
- // volume was referenced through a persistent volume claims, this contains
- // the name of the bound persistent volume object.
- // It is the name that plugins use in their pod mount path, i.e.
- // /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{innerVolumeSpecName}/
- // PVC example,
- // apiVersion: v1
- // kind: PersistentVolume
- // metadata:
- // name: pv0003 <- InnerVolumeSpecName
- // spec:
- // capacity:
- // storage: 5Gi
- // accessModes:
- // - ReadWriteOnce
- // persistentVolumeReclaimPolicy: Recycle
- // nfs:
- // path: /tmp
- // server: 172.17.0.2
- // Non-PVC example:
- // apiVersion: v1
- // kind: Pod
- // metadata:
- // name: test-pd
- // spec:
- // containers:
- // - image: k8s.gcr.io/test-webserver
- // name: test-container
- // volumeMounts:
- // - mountPath: /test-pd
- // name: test-volume
- // volumes:
- // - name: test-volume <- InnerVolumeSpecName
- // gcePersistentDisk:
- // pdName: my-data-disk
- // fsType: ext4
- InnerVolumeSpecName string
- // outerVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the
- // volume was referenced through a persistent volume claim, this contains
- // the podSpec.Volume[x].Name of the persistent volume claim.
- // PVC example:
- // kind: Pod
- // apiVersion: v1
- // metadata:
- // name: mypod
- // spec:
- // containers:
- // - name: myfrontend
- // image: dockerfile/nginx
- // volumeMounts:
- // - mountPath: "/var/www/html"
- // name: mypd
- // volumes:
- // - name: mypd <- OuterVolumeSpecName
- // persistentVolumeClaim:
- // claimName: myclaim
- // Non-PVC example:
- // apiVersion: v1
- // kind: Pod
- // metadata:
- // name: test-pd
- // spec:
- // containers:
- // - image: k8s.gcr.io/test-webserver
- // name: test-container
- // volumeMounts:
- // - mountPath: /test-pd
- // name: test-volume
- // volumes:
- // - name: test-volume <- OuterVolumeSpecName
- // gcePersistentDisk:
- // pdName: my-data-disk
- // fsType: ext4
- OuterVolumeSpecName string
- // PluginName is the "Unescaped Qualified" name of the volume plugin used to
- // mount and unmount this volume. It can be used to fetch the volume plugin
- // to unmount with, on demand. It is also the name that plugins use, though
- // escaped, in their pod mount path, i.e.
- // /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{outerVolumeSpecName}/
- PluginName string
- // PodUID is the UID of the pod mounted to. It is also the string used by
- // plugins in their pod mount path, i.e.
- // /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{outerVolumeSpecName}/
- PodUID types.UID
- // Mounter is the volume mounter used to mount this volume. It is required
- // by kubelet to create container.VolumeMap.
- Mounter volume.Mounter
- // BlockVolumeMapper is the volume mapper used to map this volume. It is required
- // by kubelet to create container.VolumeMap.
- BlockVolumeMapper volume.BlockVolumeMapper
- // VolumeGidValue contains the value of the GID annotation, if present.
- VolumeGidValue string
- // VolumeSpec is a volume spec containing the specification for the volume
- // that should be mounted.
- VolumeSpec *volume.Spec
- // DeviceMountPath contains the path on the node where the device should
- // be mounted after it is attached.
- DeviceMountPath string
- }
- // GenerateMsgDetailed returns detailed msgs for mounted volumes
- func (volume *MountedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
- detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
- return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
- }
- // GenerateMsg returns simple and detailed msgs for mounted volumes
- func (volume *MountedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
- detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
- return generateVolumeMsg(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
- }
- // GenerateErrorDetailed returns simple and detailed errors for mounted volumes
- func (volume *MountedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
- return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
- }
- // GenerateError returns simple and detailed errors for mounted volumes
- func (volume *MountedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
- simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
- return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
- }
- type operationExecutor struct {
- // pendingOperations keeps track of pending attach and detach operations so
- // multiple operations are not started on the same volume
- pendingOperations nestedpendingoperations.NestedPendingOperations
- // operationGenerator is an interface that provides implementations for
- // generating volume function
- operationGenerator OperationGenerator
- }
- func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
- return oe.pendingOperations.IsOperationPending(volumeName, podName)
- }
- func (oe *operationExecutor) AttachVolume(
- volumeToAttach VolumeToAttach,
- actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
- generatedOperations :=
- oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
- return oe.pendingOperations.Run(
- volumeToAttach.VolumeName, "" /* podName */, generatedOperations)
- }
- func (oe *operationExecutor) DetachVolume(
- volumeToDetach AttachedVolume,
- verifySafeToDetach bool,
- actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
- generatedOperations, err :=
- oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
- if err != nil {
- return err
- }
- return oe.pendingOperations.Run(
- volumeToDetach.VolumeName, "" /* podName */, generatedOperations)
- }
- func (oe *operationExecutor) VerifyVolumesAreAttached(
- attachedVolumes map[types.NodeName][]AttachedVolume,
- actualStateOfWorld ActualStateOfWorldAttacherUpdater) {
- // A map of plugin names and nodes on which they exist with volumes they manage
- bulkVerifyPluginsByNode := make(map[string]map[types.NodeName][]*volume.Spec)
- volumeSpecMapByPlugin := make(map[string]map[*volume.Spec]v1.UniqueVolumeName)
- for node, nodeAttachedVolumes := range attachedVolumes {
- for _, volumeAttached := range nodeAttachedVolumes {
- if volumeAttached.VolumeSpec == nil {
- klog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName)
- continue
- }
- volumePlugin, err :=
- oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
- if err != nil || volumePlugin == nil {
- klog.Errorf(
- "VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
- volumeAttached.VolumeName,
- volumeAttached.VolumeSpec.Name(),
- volumeAttached.NodeName,
- err)
- continue
- }
- pluginName := volumePlugin.GetPluginName()
- if volumePlugin.SupportsBulkVolumeVerification() {
- pluginNodes, pluginNodesExist := bulkVerifyPluginsByNode[pluginName]
- if !pluginNodesExist {
- pluginNodes = make(map[types.NodeName][]*volume.Spec)
- }
- volumeSpecList, nodeExists := pluginNodes[node]
- if !nodeExists {
- volumeSpecList = []*volume.Spec{}
- }
- volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
- pluginNodes[node] = volumeSpecList
- bulkVerifyPluginsByNode[pluginName] = pluginNodes
- volumeSpecMap, mapExists := volumeSpecMapByPlugin[pluginName]
- if !mapExists {
- volumeSpecMap = make(map[*volume.Spec]v1.UniqueVolumeName)
- }
- volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
- volumeSpecMapByPlugin[pluginName] = volumeSpecMap
- continue
- }
- // If node doesn't support Bulk volume polling it is best to poll individually
- nodeError := oe.VerifyVolumesAreAttachedPerNode(nodeAttachedVolumes, node, actualStateOfWorld)
- if nodeError != nil {
- klog.Errorf("BulkVerifyVolumes.VerifyVolumesAreAttached verifying volumes on node %q with %v", node, nodeError)
- }
- break
- }
- }
- for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode {
- generatedOperations, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
- pluginNodeVolumes,
- pluginName,
- volumeSpecMapByPlugin[pluginName],
- actualStateOfWorld)
- if err != nil {
- klog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err)
- }
- // Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
- uniquePluginName := v1.UniqueVolumeName(pluginName)
- err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations)
- if err != nil {
- klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err)
- }
- }
- }
- func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
- attachedVolumes []AttachedVolume,
- nodeName types.NodeName,
- actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
- generatedOperations, err :=
- oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
- if err != nil {
- return err
- }
- // Give an empty UniqueVolumeName so that this operation could be executed concurrently.
- return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations)
- }
- func (oe *operationExecutor) MountVolume(
- waitForAttachTimeout time.Duration,
- volumeToMount VolumeToMount,
- actualStateOfWorld ActualStateOfWorldMounterUpdater,
- isRemount bool) error {
- fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
- if err != nil {
- return err
- }
- var generatedOperations volumetypes.GeneratedOperations
- if fsVolume {
- // Filesystem volume case
- // Mount/remount a volume when a volume is attached
- generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
- waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
- } else {
- // Block volume case
- // Creates a map to device if a volume is attached
- generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
- waitForAttachTimeout, volumeToMount, actualStateOfWorld)
- }
- if err != nil {
- return err
- }
- // Avoid executing mount/map from multiple pods referencing the
- // same volume in parallel
- podName := nestedpendingoperations.EmptyUniquePodName
- // TODO: remove this -- not necessary
- if !volumeToMount.PluginIsAttachable && !volumeToMount.PluginIsDeviceMountable {
- // volume plugins which are Non-attachable and Non-deviceMountable can execute mount for multiple pods
- // referencing the same volume in parallel
- podName = util.GetUniquePodName(volumeToMount.Pod)
- }
- // TODO mount_device
- return oe.pendingOperations.Run(
- volumeToMount.VolumeName, podName, generatedOperations)
- }
- func (oe *operationExecutor) UnmountVolume(
- volumeToUnmount MountedVolume,
- actualStateOfWorld ActualStateOfWorldMounterUpdater,
- podsDir string) error {
- fsVolume, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
- if err != nil {
- return err
- }
- var generatedOperations volumetypes.GeneratedOperations
- if fsVolume {
- // Filesystem volume case
- // Unmount a volume if a volume is mounted
- generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc(
- volumeToUnmount, actualStateOfWorld, podsDir)
- } else {
- // Block volume case
- // Unmap a volume if a volume is mapped
- generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc(
- volumeToUnmount, actualStateOfWorld)
- }
- if err != nil {
- return err
- }
- // All volume plugins can execute unmount/unmap for multiple pods referencing the
- // same volume in parallel
- podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
- return oe.pendingOperations.Run(
- volumeToUnmount.VolumeName, podName, generatedOperations)
- }
- func (oe *operationExecutor) UnmountDevice(
- deviceToDetach AttachedVolume,
- actualStateOfWorld ActualStateOfWorldMounterUpdater,
- mounter mount.Interface) error {
- fsVolume, err := util.CheckVolumeModeFilesystem(deviceToDetach.VolumeSpec)
- if err != nil {
- return err
- }
- var generatedOperations volumetypes.GeneratedOperations
- if fsVolume {
- // Filesystem volume case
- // Unmount and detach a device if a volume isn't referenced
- generatedOperations, err = oe.operationGenerator.GenerateUnmountDeviceFunc(
- deviceToDetach, actualStateOfWorld, mounter)
- } else {
- // Block volume case
- // Detach a device and remove loopback if a volume isn't referenced
- generatedOperations, err = oe.operationGenerator.GenerateUnmapDeviceFunc(
- deviceToDetach, actualStateOfWorld, mounter)
- }
- if err != nil {
- return err
- }
- // Avoid executing unmount/unmap device from multiple pods referencing
- // the same volume in parallel
- podName := nestedpendingoperations.EmptyUniquePodName
- return oe.pendingOperations.Run(
- deviceToDetach.VolumeName, podName, generatedOperations)
- }
- func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
- generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld)
- if err != nil {
- return err
- }
- return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations)
- }
- func (oe *operationExecutor) VerifyControllerAttachedVolume(
- volumeToMount VolumeToMount,
- nodeName types.NodeName,
- actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
- generatedOperations, err :=
- oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
- if err != nil {
- return err
- }
- return oe.pendingOperations.Run(
- volumeToMount.VolumeName, "" /* podName */, generatedOperations)
- }
- // ReconstructVolumeOperation return a func to create volumeSpec from mount path
- func (oe *operationExecutor) ReconstructVolumeOperation(
- volumeMode v1.PersistentVolumeMode,
- plugin volume.VolumePlugin,
- mapperPlugin volume.BlockVolumePlugin,
- uid types.UID,
- podName volumetypes.UniquePodName,
- volumeSpecName string,
- volumePath string,
- pluginName string) (*volume.Spec, error) {
- // Filesystem Volume case
- if volumeMode == v1.PersistentVolumeFilesystem {
- // Create volumeSpec from mount path
- klog.V(5).Infof("Starting operationExecutor.ReconstructVolumepodName")
- volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, volumePath)
- if err != nil {
- return nil, err
- }
- return volumeSpec, nil
- }
- // Block Volume case
- // Create volumeSpec from mount path
- klog.V(5).Infof("Starting operationExecutor.ReconstructVolume")
- if mapperPlugin == nil {
- return nil, fmt.Errorf("Could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)",
- pluginName,
- volumeSpecName,
- podName,
- uid)
- }
- // volumePath contains volumeName on the path. In the case of block volume, {volumeName} is symbolic link
- // corresponding to raw block device.
- // ex. volumePath: pods/{podUid}}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/{volumeName}
- volumeSpec, err := mapperPlugin.ConstructBlockVolumeSpec(uid, volumeSpecName, volumePath)
- if err != nil {
- return nil, err
- }
- return volumeSpec, nil
- }
- // CheckVolumeExistenceOperation checks mount path directory if volume still exists
- func (oe *operationExecutor) CheckVolumeExistenceOperation(
- volumeSpec *volume.Spec,
- mountPath, volumeName string,
- mounter mount.Interface,
- uniqueVolumeName v1.UniqueVolumeName,
- podName volumetypes.UniquePodName,
- podUID types.UID,
- attachable volume.AttachableVolumePlugin) (bool, error) {
- fsVolume, err := util.CheckVolumeModeFilesystem(volumeSpec)
- if err != nil {
- return false, err
- }
- // Filesystem Volume case
- // For attachable volume case, check mount path directory if volume is still existing and mounted.
- // Return true if volume is mounted.
- if fsVolume {
- if attachable != nil {
- var isNotMount bool
- var mountCheckErr error
- if isNotMount, mountCheckErr = mounter.IsLikelyNotMountPoint(mountPath); mountCheckErr != nil {
- return false, fmt.Errorf("Could not check whether the volume %q (spec.Name: %q) pod %q (UID: %q) is mounted with: %v",
- uniqueVolumeName,
- volumeName,
- podName,
- podUID,
- mountCheckErr)
- }
- return !isNotMount, nil
- }
- // For non-attachable volume case, skip check and return true without mount point check
- // since plugins may not have volume mount point.
- return true, nil
- }
- // Block Volume case
- // Check mount path directory if volume still exists, then return true if volume
- // is there. Either plugin is attachable or non-attachable, the plugin should
- // have symbolic link associated to raw block device under pod device map
- // if volume exists.
- blkutil := volumepathhandler.NewBlockVolumePathHandler()
- var islinkExist bool
- var checkErr error
- if islinkExist, checkErr = blkutil.IsSymlinkExist(mountPath); checkErr != nil {
- return false, fmt.Errorf("Could not check whether the block volume %q (spec.Name: %q) pod %q (UID: %q) is mapped to: %v",
- uniqueVolumeName,
- volumeName,
- podName,
- podUID,
- checkErr)
- }
- return islinkExist, nil
- }
|