123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
// Package reconciler implements interfaces that attempt to reconcile the
// desired state of the world with the actual state of the world by triggering
// actions.
- package reconciler
- import (
- "fmt"
- "strings"
- "time"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/tools/record"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
- "k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
- "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
- kevents "k8s.io/kubernetes/pkg/kubelet/events"
- "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
- "k8s.io/kubernetes/pkg/volume"
- "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
- )
// Reconciler periodically drives the actual state of the world towards the
// desired state of the world for the attach/detach controller, triggering
// attach and detach operations where the two disagree.
//
// Note: this is not the Reconciler implemented by the kubelet volume manager.
// This one reconciles state for the attach/detach controller; that one
// reconciles state for the kubelet volume manager.
type Reconciler interface {
	// Run starts the reconciliation loop, which executes periodically until
	// stopCh is closed. Each iteration checks that volumes which should be
	// attached are attached and volumes which should be detached are
	// detached, and triggers attach/detach operations to rectify any
	// difference.
	Run(stopCh <-chan struct{})
}
- // NewReconciler returns a new instance of Reconciler that waits loopPeriod
- // between successive executions.
- // loopPeriod is the amount of time the reconciler loop waits between
- // successive executions.
- // maxWaitForUnmountDuration is the max amount of time the reconciler will wait
- // for the volume to be safely unmounted, after this it will detach the volume
- // anyway (to handle crashed/unavailable nodes). If during this time the volume
- // becomes used by a new pod, the detach request will be aborted and the timer
- // cleared.
- func NewReconciler(
- loopPeriod time.Duration,
- maxWaitForUnmountDuration time.Duration,
- syncDuration time.Duration,
- disableReconciliationSync bool,
- desiredStateOfWorld cache.DesiredStateOfWorld,
- actualStateOfWorld cache.ActualStateOfWorld,
- attacherDetacher operationexecutor.OperationExecutor,
- nodeStatusUpdater statusupdater.NodeStatusUpdater,
- recorder record.EventRecorder) Reconciler {
- return &reconciler{
- loopPeriod: loopPeriod,
- maxWaitForUnmountDuration: maxWaitForUnmountDuration,
- syncDuration: syncDuration,
- disableReconciliationSync: disableReconciliationSync,
- desiredStateOfWorld: desiredStateOfWorld,
- actualStateOfWorld: actualStateOfWorld,
- attacherDetacher: attacherDetacher,
- nodeStatusUpdater: nodeStatusUpdater,
- timeOfLastSync: time.Now(),
- recorder: recorder,
- }
- }
- type reconciler struct {
- loopPeriod time.Duration
- maxWaitForUnmountDuration time.Duration
- syncDuration time.Duration
- desiredStateOfWorld cache.DesiredStateOfWorld
- actualStateOfWorld cache.ActualStateOfWorld
- attacherDetacher operationexecutor.OperationExecutor
- nodeStatusUpdater statusupdater.NodeStatusUpdater
- timeOfLastSync time.Time
- disableReconciliationSync bool
- recorder record.EventRecorder
- }
- func (rc *reconciler) Run(stopCh <-chan struct{}) {
- wait.Until(rc.reconciliationLoopFunc(), rc.loopPeriod, stopCh)
- }
- // reconciliationLoopFunc this can be disabled via cli option disableReconciliation.
- // It periodically checks whether the attached volumes from actual state
- // are still attached to the node and update the status if they are not.
- func (rc *reconciler) reconciliationLoopFunc() func() {
- return func() {
- rc.reconcile()
- if rc.disableReconciliationSync {
- klog.V(5).Info("Skipping reconciling attached volumes still attached since it is disabled via the command line.")
- } else if rc.syncDuration < time.Second {
- klog.V(5).Info("Skipping reconciling attached volumes still attached since it is set to less than one second via the command line.")
- } else if time.Since(rc.timeOfLastSync) > rc.syncDuration {
- klog.V(5).Info("Starting reconciling attached volumes still attached")
- rc.sync()
- }
- }
- }
- func (rc *reconciler) sync() {
- defer rc.updateSyncTime()
- rc.syncStates()
- }
- func (rc *reconciler) updateSyncTime() {
- rc.timeOfLastSync = time.Now()
- }
- func (rc *reconciler) syncStates() {
- volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode()
- rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
- }
- // isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible.
- // In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
- // false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
- // attacher to fail fast in such cases.
- // Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
- func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool {
- if volumeSpec.Volume != nil {
- // Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
- if volumeSpec.Volume.AzureDisk != nil ||
- volumeSpec.Volume.Cinder != nil {
- return true
- }
- }
- // Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to
- // multi-attach. We trust in the individual volume implementations to not allow unsupported access modes
- if volumeSpec.PersistentVolume != nil {
- // Check for persistent volume types which do not fail when trying to multi-attach
- if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
- // No access mode specified so we don't know for sure. Let the attacher fail if needed
- return false
- }
- // check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false
- for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
- if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
- return false
- }
- }
- return true
- }
- // we don't know if it's supported or not and let the attacher fail later in cases it's not supported
- return false
- }
- func (rc *reconciler) reconcile() {
- // Detaches are triggered before attaches so that volumes referenced by
- // pods that are rescheduled to a different node are detached first.
- // Ensure volumes that should be detached are detached.
- for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
- if !rc.desiredStateOfWorld.VolumeExists(
- attachedVolume.VolumeName, attachedVolume.NodeName) {
- // Don't even try to start an operation if there is already one running
- // This check must be done before we do any other checks, as otherwise the other checks
- // may pass while at the same time the volume leaves the pending state, resulting in
- // double detach attempts
- if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") {
- klog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
- continue
- }
- // Set the detach request time
- elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
- if err != nil {
- klog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err)
- continue
- }
- // Check whether timeout has reached the maximum waiting time
- timeout := elapsedTime > rc.maxWaitForUnmountDuration
- // Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
- if attachedVolume.MountedByNode && !timeout {
- klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Cannot detach volume because it is still mounted", ""))
- continue
- }
- // Before triggering volume detach, mark volume as detached and update the node status
- // If it fails to update node status, skip detach volume
- err = rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
- if err != nil {
- klog.V(5).Infof("RemoveVolumeFromReportAsAttached failed while removing volume %q from node %q with: %v",
- attachedVolume.VolumeName,
- attachedVolume.NodeName,
- err)
- }
- // Update Node Status to indicate volume is no longer safe to mount.
- err = rc.nodeStatusUpdater.UpdateNodeStatuses()
- if err != nil {
- // Skip detaching this volume if unable to update node status
- klog.Errorf(attachedVolume.GenerateErrorDetailed("UpdateNodeStatuses failed while attempting to report volume as attached", err).Error())
- continue
- }
- // Trigger detach volume which requires verifying safe to detach step
- // If timeout is true, skip verifySafeToDetach check
- klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting attacherDetacher.DetachVolume", ""))
- verifySafeToDetach := !timeout
- err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
- if err == nil {
- if !timeout {
- klog.Infof(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", ""))
- } else {
- metrics.RecordForcedDetachMetric()
- klog.Warningf(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
- }
- }
- if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
- // Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
- // Log all other errors.
- klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
- }
- }
- }
- rc.attachDesiredVolumes()
- // Update Node Status
- err := rc.nodeStatusUpdater.UpdateNodeStatuses()
- if err != nil {
- klog.Warningf("UpdateNodeStatuses failed with: %v", err)
- }
- }
- func (rc *reconciler) attachDesiredVolumes() {
- // Ensure volumes that should be attached are attached.
- for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
- if rc.actualStateOfWorld.IsVolumeAttachedToNode(volumeToAttach.VolumeName, volumeToAttach.NodeName) {
- // Volume/Node exists, touch it to reset detachRequestedTime
- if klog.V(5) {
- klog.Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
- }
- rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
- continue
- }
- // Don't even try to start an operation if there is already one running
- if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
- if klog.V(10) {
- klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
- }
- continue
- }
- if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
- nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
- if len(nodes) > 0 {
- if !volumeToAttach.MultiAttachErrorReported {
- rc.reportMultiAttachError(volumeToAttach, nodes)
- rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
- }
- continue
- }
- }
- // Volume/Node doesn't exist, spawn a goroutine to attach it
- if klog.V(5) {
- klog.Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
- }
- err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
- if err == nil {
- klog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
- }
- if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
- // Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
- // Log all other errors.
- klog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
- }
- }
- }
- // reportMultiAttachError sends events and logs situation that a volume that
- // should be attached to a node is already attached to different node(s).
- func (rc *reconciler) reportMultiAttachError(volumeToAttach cache.VolumeToAttach, nodes []types.NodeName) {
- // Filter out the current node from list of nodes where the volume is
- // attached.
- // Some methods need []string, some other needs []NodeName, collect both.
- // In theory, these arrays should have always only one element - the
- // controller does not allow more than one attachment. But use array just
- // in case...
- otherNodes := []types.NodeName{}
- otherNodesStr := []string{}
- for _, node := range nodes {
- if node != volumeToAttach.NodeName {
- otherNodes = append(otherNodes, node)
- otherNodesStr = append(otherNodesStr, string(node))
- }
- }
- // Get list of pods that use the volume on the other nodes.
- pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName)
- if len(pods) == 0 {
- // We did not find any pods that requests the volume. The pod must have been deleted already.
- simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
- for _, pod := range volumeToAttach.ScheduledPods {
- rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
- }
- // Log detailed message to system admin
- nodeList := strings.Join(otherNodesStr, ", ")
- detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already exclusively attached to node %s and can't be attached to another", nodeList))
- klog.Warningf(detailedMsg)
- return
- }
- // There are pods that require the volume and run on another node. Typically
- // it's user error, e.g. a ReplicaSet uses a PVC and has >1 replicas. Let
- // the user know what pods are blocking the volume.
- for _, scheduledPod := range volumeToAttach.ScheduledPods {
- // Each scheduledPod must get a custom message. They can run in
- // different namespaces and user of a namespace should not see names of
- // pods in other namespaces.
- localPodNames := []string{} // Names of pods in scheduledPods's namespace
- otherPods := 0 // Count of pods in other namespaces
- for _, pod := range pods {
- if pod.Namespace == scheduledPod.Namespace {
- localPodNames = append(localPodNames, pod.Name)
- } else {
- otherPods++
- }
- }
- var msg string
- if len(localPodNames) > 0 {
- msg = fmt.Sprintf("Volume is already used by pod(s) %s", strings.Join(localPodNames, ", "))
- if otherPods > 0 {
- msg = fmt.Sprintf("%s and %d pod(s) in different namespaces", msg, otherPods)
- }
- } else {
- // No local pods, there are pods only in different namespaces.
- msg = fmt.Sprintf("Volume is already used by %d pod(s) in different namespaces", otherPods)
- }
- simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", msg)
- rc.recorder.Eventf(scheduledPod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
- }
- // Log all pods for system admin
- podNames := []string{}
- for _, pod := range pods {
- podNames = append(podNames, pod.Namespace+"/"+pod.Name)
- }
- detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already used by pods %s on node %s", strings.Join(podNames, ", "), strings.Join(otherNodesStr, ", ")))
- klog.Warningf(detailedMsg)
- }
|