operation_executor.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package operationexecutor implements interfaces that enable execution of
  14. // attach, detach, mount, and unmount operations with a
  15. // nestedpendingoperations so that more than one operation is never triggered
  16. // on the same volume for the same pod.
  17. package operationexecutor
  18. import (
  19. "fmt"
  20. "time"
  21. "k8s.io/klog"
  22. "k8s.io/api/core/v1"
  23. "k8s.io/apimachinery/pkg/api/resource"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/kubernetes/pkg/util/mount"
  26. "k8s.io/kubernetes/pkg/volume"
  27. "k8s.io/kubernetes/pkg/volume/util"
  28. "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
  29. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  30. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  31. )
  32. // OperationExecutor defines a set of operations for attaching, detaching,
  33. // mounting, or unmounting a volume that are executed with a NewNestedPendingOperations which
  34. // prevents more than one operation from being triggered on the same volume.
  35. //
  36. // These operations should be idempotent (for example, AttachVolume should
  37. // still succeed if the volume is already attached to the node, etc.). However,
  38. // they depend on the volume plugins to implement this behavior.
  39. //
  40. // Once an operation completes successfully, the actualStateOfWorld is updated
  41. // to indicate the volume is attached/detached/mounted/unmounted.
  42. //
  43. // If the OperationExecutor fails to start the operation because, for example,
  44. // an operation with the same UniqueVolumeName is already pending, a non-nil
  45. // error is returned.
  46. //
  47. // Once the operation is started, since it is executed asynchronously,
  48. // errors are simply logged and the goroutine is terminated without updating
  49. // actualStateOfWorld (callers are responsible for retrying as needed).
  50. //
  51. // Some of these operations may result in calls to the API server; callers are
  52. // responsible for rate limiting on errors.
  53. type OperationExecutor interface {
  54. // AttachVolume attaches the volume to the node specified in volumeToAttach.
  55. // It then updates the actual state of the world to reflect that.
  56. AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
  57. // VerifyVolumesAreAttachedPerNode verifies the given list of volumes to see whether they are still attached to the node.
  58. // If any volume is not attached right now, it will update the actual state of the world to reflect that.
  59. // Note that this operation could be operated concurrently with other attach/detach operations.
  60. // In theory (but very unlikely in practise), race condition among these operations might mark volume as detached
  61. // even if it is attached. But reconciler can correct this in a short period of time.
  62. VerifyVolumesAreAttachedPerNode(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
  63. // VerifyVolumesAreAttached verifies volumes being used in entire cluster and if they are still attached to the node
  64. // If any volume is not attached right now, it will update actual state of world to reflect that.
  65. VerifyVolumesAreAttached(volumesToVerify map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater)
  66. // DetachVolume detaches the volume from the node specified in
  67. // volumeToDetach, and updates the actual state of the world to reflect
  68. // that. If verifySafeToDetach is set, a call is made to the fetch the node
  69. // object and it is used to verify that the volume does not exist in Node's
  70. // Status.VolumesInUse list (operation fails with error if it is).
  71. DetachVolume(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
  72. // If a volume has 'Filesystem' volumeMode, MountVolume mounts the
  73. // volume to the pod specified in volumeToMount.
  74. // Specifically it will:
  75. // * Wait for the device to finish attaching (for attachable volumes only).
  76. // * Mount device to global mount path (for attachable volumes only).
  77. // * Update actual state of world to reflect volume is globally mounted (for
  78. // attachable volumes only).
  79. // * Mount the volume to the pod specific path.
  80. // * Update actual state of world to reflect volume is mounted to the pod
  81. // path.
  82. // The parameter "isRemount" is informational and used to adjust logging
  83. // verbosity. An initial mount is more log-worthy than a remount, for
  84. // example.
  85. //
  86. // For 'Block' volumeMode, this method creates a symbolic link to
  87. // the volume from both the pod specified in volumeToMount and global map path.
  88. // Specifically it will:
  89. // * Wait for the device to finish attaching (for attachable volumes only).
  90. // * Update actual state of world to reflect volume is globally mounted/mapped.
  91. // * Map volume to global map path using symbolic link.
  92. // * Map the volume to the pod device map path using symbolic link.
  93. // * Update actual state of world to reflect volume is mounted/mapped to the pod path.
  94. MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error
  95. // If a volume has 'Filesystem' volumeMode, UnmountVolume unmounts the
  96. // volume from the pod specified in volumeToUnmount and updates the actual
  97. // state of the world to reflect that.
  98. //
  99. // For 'Block' volumeMode, this method unmaps symbolic link to the volume
  100. // from both the pod device map path in volumeToUnmount and global map path.
  101. // And then, updates the actual state of the world to reflect that.
  102. UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) error
  103. // If a volume has 'Filesystem' volumeMode, UnmountDevice unmounts the
  104. // volumes global mount path from the device (for attachable volumes only,
  105. // freeing it for detach. It then updates the actual state of the world to
  106. // reflect that.
  107. //
  108. // For 'Block' volumeMode, this method checks number of symbolic links under
  109. // global map path. If number of reference is zero, remove global map path
  110. // directory and free a volume for detach.
  111. // It then updates the actual state of the world to reflect that.
  112. UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
  113. // VerifyControllerAttachedVolume checks if the specified volume is present
  114. // in the specified nodes AttachedVolumes Status field. It uses kubeClient
  115. // to fetch the node object.
  116. // If the volume is found, the actual state of the world is updated to mark
  117. // the volume as attached.
  118. // If the volume does not implement the attacher interface, it is assumed to
  119. // be attached and the actual state of the world is updated accordingly.
  120. // If the volume is not found or there is an error (fetching the node
  121. // object, for example) then an error is returned which triggers exponential
  122. // back off on retries.
  123. VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
  124. // IsOperationPending returns true if an operation for the given volumeName and podName is pending,
  125. // otherwise it returns false
  126. IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
  127. // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
  128. ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
  129. // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
  130. ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error)
  131. // CheckVolumeExistenceOperation checks volume existence
  132. CheckVolumeExistenceOperation(volumeSpec *volume.Spec, mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error)
  133. }
  134. // NewOperationExecutor returns a new instance of OperationExecutor.
  135. func NewOperationExecutor(
  136. operationGenerator OperationGenerator) OperationExecutor {
  137. return &operationExecutor{
  138. pendingOperations: nestedpendingoperations.NewNestedPendingOperations(
  139. true /* exponentialBackOffOnError */),
  140. operationGenerator: operationGenerator,
  141. }
  142. }
  143. // ActualStateOfWorldMounterUpdater defines a set of operations updating the actual
  144. // state of the world cache after successful mount/unmount.
  145. type ActualStateOfWorldMounterUpdater interface {
  146. // Marks the specified volume as mounted to the specified pod
  147. MarkVolumeAsMounted(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error
  148. // Marks the specified volume as unmounted from the specified pod
  149. MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
  150. // Marks the specified volume as having been globally mounted.
  151. MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error
  152. // Marks the specified volume as having its global mount unmounted.
  153. MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
  154. // Marks the specified volume's file system resize request is finished.
  155. MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
  156. }
  157. // ActualStateOfWorldAttacherUpdater defines a set of operations updating the
  158. // actual state of the world cache after successful attach/detach/mount/unmount.
  159. type ActualStateOfWorldAttacherUpdater interface {
  160. // Marks the specified volume as attached to the specified node. If the
  161. // volume name is supplied, that volume name will be used. If not, the
  162. // volume name is computed using the result from querying the plugin.
  163. //
  164. // TODO: in the future, we should be able to remove the volumeName
  165. // argument to this method -- since it is used only for attachable
  166. // volumes. See issue 29695.
  167. MarkVolumeAsAttached(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error
  168. // Marks the specified volume as *possibly* attached to the specified node.
  169. // If an attach operation fails, the attach/detach controller does not know for certain if the volume is attached or not.
  170. // If the volume name is supplied, that volume name will be used. If not, the
  171. // volume name is computed using the result from querying the plugin.
  172. MarkVolumeAsUncertain(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName) error
  173. // Marks the specified volume as detached from the specified node
  174. MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
  175. // Marks desire to detach the specified volume (remove the volume from the node's
  176. // volumesToReportAsAttached list)
  177. RemoveVolumeFromReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) error
  178. // Unmarks the desire to detach for the specified volume (add the volume back to
  179. // the node's volumesToReportAsAttached list)
  180. AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
  181. }
  182. // VolumeLogger defines a set of operations for generating volume-related logging and error msgs
  183. type VolumeLogger interface {
  184. // Creates a detailed msg that can be used in logs
  185. // The msg format follows the pattern "<prefixMsg> <volume details> <suffixMsg>",
  186. // where each implementation provides the volume details
  187. GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string)
  188. // Creates a detailed error that can be used in logs.
  189. // The msg format follows the pattern "<prefixMsg> <volume details>: <err> ",
  190. GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error)
  191. // Creates a simple msg that is user friendly and a detailed msg that can be used in logs
  192. // The msg format follows the pattern "<prefixMsg> <volume details> <suffixMsg>",
  193. // where each implementation provides the volume details
  194. GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string)
  195. // Creates a simple error that is user friendly and a detailed error that can be used in logs.
  196. // The msg format follows the pattern "<prefixMsg> <volume details>: <err> ",
  197. GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error)
  198. }
  199. // Generates an error string with the format ": <err>" if err exists
  200. func errSuffix(err error) string {
  201. errStr := ""
  202. if err != nil {
  203. errStr = fmt.Sprintf(": %v", err)
  204. }
  205. return errStr
  206. }
  207. // Generate a detailed error msg for logs
  208. func generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details string) (detailedMsg string) {
  209. return fmt.Sprintf("%v for volume %q %v %v", prefixMsg, volumeName, details, suffixMsg)
  210. }
  211. // Generate a simplified error msg for events and a detailed error msg for logs
  212. func generateVolumeMsg(prefixMsg, suffixMsg, volumeName, details string) (simpleMsg, detailedMsg string) {
  213. simpleMsg = fmt.Sprintf("%v for volume %q %v", prefixMsg, volumeName, suffixMsg)
  214. return simpleMsg, generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details)
  215. }
// VolumeToAttach represents a volume that should be attached to a node.
type VolumeToAttach struct {
	// MultiAttachErrorReported indicates whether the multi-attach error has
	// been reported for the given volume. It is used to prevent the error
	// from being reported more than once for a given volume.
	MultiAttachErrorReported bool

	// VolumeName is the unique identifier for the volume that should be
	// attached.
	VolumeName v1.UniqueVolumeName

	// VolumeSpec is a volume spec containing the specification for the volume
	// that should be attached.
	VolumeSpec *volume.Spec

	// NodeName is the identifier for the node that the volume should be
	// attached to.
	NodeName types.NodeName

	// ScheduledPods is the list of pods that reference this volume and are
	// scheduled to the underlying node. Each entry points to the pod object,
	// which carries more information about the pod.
	ScheduledPods []*v1.Pod
}
  236. // GenerateMsgDetailed returns detailed msgs for volumes to attach
  237. func (volume *VolumeToAttach) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
  238. detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
  239. volumeSpecName := "nil"
  240. if volume.VolumeSpec != nil {
  241. volumeSpecName = volume.VolumeSpec.Name()
  242. }
  243. return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
  244. }
  245. // GenerateMsg returns simple and detailed msgs for volumes to attach
  246. func (volume *VolumeToAttach) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
  247. detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
  248. volumeSpecName := "nil"
  249. if volume.VolumeSpec != nil {
  250. volumeSpecName = volume.VolumeSpec.Name()
  251. }
  252. return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
  253. }
  254. // GenerateErrorDetailed returns detailed errors for volumes to attach
  255. func (volume *VolumeToAttach) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
  256. return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
  257. }
  258. // GenerateError returns simple and detailed errors for volumes to attach
  259. func (volume *VolumeToAttach) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
  260. simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
  261. return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
  262. }
// VolumeToMount represents a volume that should be attached to this node and
// mounted to the PodName.
type VolumeToMount struct {
	// VolumeName is the unique identifier for the volume that should be
	// mounted.
	VolumeName v1.UniqueVolumeName

	// PodName is the unique identifier for the pod that the volume should be
	// mounted to after it is attached.
	PodName volumetypes.UniquePodName

	// VolumeSpec is a volume spec containing the specification for the volume
	// that should be mounted. Used to create NewMounter. Used to generate
	// InnerVolumeSpecName.
	VolumeSpec *volume.Spec

	// OuterVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the
	// volume was referenced through a persistent volume claim, this contains
	// the podSpec.Volume[x].Name of the persistent volume claim.
	OuterVolumeSpecName string

	// Pod is the pod to mount the volume to. Used to create NewMounter.
	Pod *v1.Pod

	// PluginIsAttachable indicates that the plugin for this volume implements
	// the volume.Attacher interface.
	PluginIsAttachable bool

	// PluginIsDeviceMountable indicates that the plugin for this volume
	// implements the volume.DeviceMounter interface.
	PluginIsDeviceMountable bool

	// VolumeGidValue contains the value of the GID annotation, if present.
	VolumeGidValue string

	// DevicePath contains the path on the node where the volume is attached.
	// For non-attachable volumes this is empty.
	DevicePath string

	// ReportedInUse indicates that the volume was successfully added to the
	// VolumesInUse field in the node's status.
	ReportedInUse bool

	// DesiredSizeLimit indicates the desired upper bound on the size of the
	// volume (if so implemented). May be nil.
	DesiredSizeLimit *resource.Quantity
}
  300. // GenerateMsgDetailed returns detailed msgs for volumes to mount
  301. func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
  302. detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
  303. volumeSpecName := "nil"
  304. if volume.VolumeSpec != nil {
  305. volumeSpecName = volume.VolumeSpec.Name()
  306. }
  307. return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
  308. }
  309. // GenerateMsg returns simple and detailed msgs for volumes to mount
  310. func (volume *VolumeToMount) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
  311. detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
  312. volumeSpecName := "nil"
  313. if volume.VolumeSpec != nil {
  314. volumeSpecName = volume.VolumeSpec.Name()
  315. }
  316. return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
  317. }
  318. // GenerateErrorDetailed returns detailed errors for volumes to mount
  319. func (volume *VolumeToMount) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
  320. return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
  321. }
  322. // GenerateError returns simple and detailed errors for volumes to mount
  323. func (volume *VolumeToMount) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
  324. simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
  325. return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
  326. }
// AttachedVolume represents a volume that is attached to a node.
type AttachedVolume struct {
	// VolumeName is the unique identifier for the volume that is attached.
	VolumeName v1.UniqueVolumeName

	// VolumeSpec is the volume spec containing the specification for the
	// volume that is attached. May be nil; the message helpers render it as
	// "nil".
	VolumeSpec *volume.Spec

	// NodeName is the identifier for the node that the volume is attached to.
	NodeName types.NodeName

	// PluginIsAttachable indicates that the plugin for this volume implements
	// the volume.Attacher interface.
	PluginIsAttachable bool

	// DevicePath contains the path on the node where the volume is attached.
	// For non-attachable volumes this is empty.
	DevicePath string

	// DeviceMountPath contains the path on the node where the device should
	// be mounted after it is attached.
	DeviceMountPath string

	// PluginName is the unescaped qualified name of the volume plugin used to
	// attach and mount this volume.
	PluginName string
}
  349. // GenerateMsgDetailed returns detailed msgs for attached volumes
  350. func (volume *AttachedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
  351. detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
  352. volumeSpecName := "nil"
  353. if volume.VolumeSpec != nil {
  354. volumeSpecName = volume.VolumeSpec.Name()
  355. }
  356. return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
  357. }
  358. // GenerateMsg returns simple and detailed msgs for attached volumes
  359. func (volume *AttachedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
  360. detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
  361. volumeSpecName := "nil"
  362. if volume.VolumeSpec != nil {
  363. volumeSpecName = volume.VolumeSpec.Name()
  364. }
  365. return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
  366. }
  367. // GenerateErrorDetailed returns detailed errors for attached volumes
  368. func (volume *AttachedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
  369. return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
  370. }
  371. // GenerateError returns simple and detailed errors for attached volumes
  372. func (volume *AttachedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
  373. simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
  374. return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
  375. }
// MountedVolume represents a volume that has successfully been mounted to a pod.
type MountedVolume struct {
	// PodName is the unique identifier of the pod mounted to.
	PodName volumetypes.UniquePodName

	// VolumeName is the unique identifier of the volume mounted to the pod.
	VolumeName v1.UniqueVolumeName

	// InnerVolumeSpecName is the volume.Spec.Name() of the volume. If the
	// volume was referenced through a persistent volume claim, this contains
	// the name of the bound persistent volume object.
	// It is the name that plugins use in their pod mount path, i.e.
	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{innerVolumeSpecName}/
	// PVC example:
	//   apiVersion: v1
	//   kind: PersistentVolume
	//   metadata:
	//     name: pv0003 <- InnerVolumeSpecName
	//   spec:
	//     capacity:
	//       storage: 5Gi
	//     accessModes:
	//       - ReadWriteOnce
	//     persistentVolumeReclaimPolicy: Recycle
	//     nfs:
	//       path: /tmp
	//       server: 172.17.0.2
	// Non-PVC example:
	//   apiVersion: v1
	//   kind: Pod
	//   metadata:
	//     name: test-pd
	//   spec:
	//     containers:
	//     - image: k8s.gcr.io/test-webserver
	//       name: test-container
	//       volumeMounts:
	//       - mountPath: /test-pd
	//         name: test-volume
	//     volumes:
	//     - name: test-volume <- InnerVolumeSpecName
	//       gcePersistentDisk:
	//         pdName: my-data-disk
	//         fsType: ext4
	InnerVolumeSpecName string

	// OuterVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the
	// volume was referenced through a persistent volume claim, this contains
	// the podSpec.Volume[x].Name of the persistent volume claim.
	// PVC example:
	//   kind: Pod
	//   apiVersion: v1
	//   metadata:
	//     name: mypod
	//   spec:
	//     containers:
	//     - name: myfrontend
	//       image: dockerfile/nginx
	//       volumeMounts:
	//       - mountPath: "/var/www/html"
	//         name: mypd
	//     volumes:
	//     - name: mypd <- OuterVolumeSpecName
	//       persistentVolumeClaim:
	//         claimName: myclaim
	// Non-PVC example:
	//   apiVersion: v1
	//   kind: Pod
	//   metadata:
	//     name: test-pd
	//   spec:
	//     containers:
	//     - image: k8s.gcr.io/test-webserver
	//       name: test-container
	//       volumeMounts:
	//       - mountPath: /test-pd
	//         name: test-volume
	//     volumes:
	//     - name: test-volume <- OuterVolumeSpecName
	//       gcePersistentDisk:
	//         pdName: my-data-disk
	//         fsType: ext4
	// OuterVolumeSpecName is also the volume name shown by the
	// MountedVolume message helpers.
	OuterVolumeSpecName string

	// PluginName is the "Unescaped Qualified" name of the volume plugin used to
	// mount and unmount this volume. It can be used to fetch the volume plugin
	// to unmount with, on demand. It is also the name that plugins use, though
	// escaped, in their pod mount path, i.e.
	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{innerVolumeSpecName}/
	PluginName string

	// PodUID is the UID of the pod mounted to. It is also the string used by
	// plugins in their pod mount path, i.e.
	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{innerVolumeSpecName}/
	PodUID types.UID

	// Mounter is the volume mounter used to mount this volume. It is required
	// by kubelet to create container.VolumeMap.
	Mounter volume.Mounter

	// BlockVolumeMapper is the volume mapper used to map this volume. It is
	// required by kubelet to create container.VolumeMap.
	BlockVolumeMapper volume.BlockVolumeMapper

	// VolumeGidValue contains the value of the GID annotation, if present.
	VolumeGidValue string

	// VolumeSpec is a volume spec containing the specification for the volume
	// that should be mounted.
	VolumeSpec *volume.Spec

	// DeviceMountPath contains the path on the node where the device should
	// be mounted after it is attached.
	DeviceMountPath string
}
  481. // GenerateMsgDetailed returns detailed msgs for mounted volumes
  482. func (volume *MountedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
  483. detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
  484. return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
  485. }
  486. // GenerateMsg returns simple and detailed msgs for mounted volumes
  487. func (volume *MountedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
  488. detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
  489. return generateVolumeMsg(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
  490. }
  491. // GenerateErrorDetailed returns simple and detailed errors for mounted volumes
  492. func (volume *MountedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
  493. return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
  494. }
  495. // GenerateError returns simple and detailed errors for mounted volumes
  496. func (volume *MountedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
  497. simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
  498. return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
  499. }
  500. type operationExecutor struct {
  501. // pendingOperations keeps track of pending attach and detach operations so
  502. // multiple operations are not started on the same volume
  503. pendingOperations nestedpendingoperations.NestedPendingOperations
  504. // operationGenerator is an interface that provides implementations for
  505. // generating volume function
  506. operationGenerator OperationGenerator
  507. }
  508. func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
  509. return oe.pendingOperations.IsOperationPending(volumeName, podName)
  510. }
  511. func (oe *operationExecutor) AttachVolume(
  512. volumeToAttach VolumeToAttach,
  513. actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
  514. generatedOperations :=
  515. oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
  516. return oe.pendingOperations.Run(
  517. volumeToAttach.VolumeName, "" /* podName */, generatedOperations)
  518. }
  519. func (oe *operationExecutor) DetachVolume(
  520. volumeToDetach AttachedVolume,
  521. verifySafeToDetach bool,
  522. actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
  523. generatedOperations, err :=
  524. oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
  525. if err != nil {
  526. return err
  527. }
  528. return oe.pendingOperations.Run(
  529. volumeToDetach.VolumeName, "" /* podName */, generatedOperations)
  530. }
  531. func (oe *operationExecutor) VerifyVolumesAreAttached(
  532. attachedVolumes map[types.NodeName][]AttachedVolume,
  533. actualStateOfWorld ActualStateOfWorldAttacherUpdater) {
  534. // A map of plugin names and nodes on which they exist with volumes they manage
  535. bulkVerifyPluginsByNode := make(map[string]map[types.NodeName][]*volume.Spec)
  536. volumeSpecMapByPlugin := make(map[string]map[*volume.Spec]v1.UniqueVolumeName)
  537. for node, nodeAttachedVolumes := range attachedVolumes {
  538. for _, volumeAttached := range nodeAttachedVolumes {
  539. if volumeAttached.VolumeSpec == nil {
  540. klog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName)
  541. continue
  542. }
  543. volumePlugin, err :=
  544. oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
  545. if err != nil || volumePlugin == nil {
  546. klog.Errorf(
  547. "VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
  548. volumeAttached.VolumeName,
  549. volumeAttached.VolumeSpec.Name(),
  550. volumeAttached.NodeName,
  551. err)
  552. continue
  553. }
  554. pluginName := volumePlugin.GetPluginName()
  555. if volumePlugin.SupportsBulkVolumeVerification() {
  556. pluginNodes, pluginNodesExist := bulkVerifyPluginsByNode[pluginName]
  557. if !pluginNodesExist {
  558. pluginNodes = make(map[types.NodeName][]*volume.Spec)
  559. }
  560. volumeSpecList, nodeExists := pluginNodes[node]
  561. if !nodeExists {
  562. volumeSpecList = []*volume.Spec{}
  563. }
  564. volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
  565. pluginNodes[node] = volumeSpecList
  566. bulkVerifyPluginsByNode[pluginName] = pluginNodes
  567. volumeSpecMap, mapExists := volumeSpecMapByPlugin[pluginName]
  568. if !mapExists {
  569. volumeSpecMap = make(map[*volume.Spec]v1.UniqueVolumeName)
  570. }
  571. volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
  572. volumeSpecMapByPlugin[pluginName] = volumeSpecMap
  573. continue
  574. }
  575. // If node doesn't support Bulk volume polling it is best to poll individually
  576. nodeError := oe.VerifyVolumesAreAttachedPerNode(nodeAttachedVolumes, node, actualStateOfWorld)
  577. if nodeError != nil {
  578. klog.Errorf("BulkVerifyVolumes.VerifyVolumesAreAttached verifying volumes on node %q with %v", node, nodeError)
  579. }
  580. break
  581. }
  582. }
  583. for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode {
  584. generatedOperations, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
  585. pluginNodeVolumes,
  586. pluginName,
  587. volumeSpecMapByPlugin[pluginName],
  588. actualStateOfWorld)
  589. if err != nil {
  590. klog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err)
  591. }
  592. // Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
  593. uniquePluginName := v1.UniqueVolumeName(pluginName)
  594. err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations)
  595. if err != nil {
  596. klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err)
  597. }
  598. }
  599. }
  600. func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
  601. attachedVolumes []AttachedVolume,
  602. nodeName types.NodeName,
  603. actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
  604. generatedOperations, err :=
  605. oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
  606. if err != nil {
  607. return err
  608. }
  609. // Give an empty UniqueVolumeName so that this operation could be executed concurrently.
  610. return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations)
  611. }
  612. func (oe *operationExecutor) MountVolume(
  613. waitForAttachTimeout time.Duration,
  614. volumeToMount VolumeToMount,
  615. actualStateOfWorld ActualStateOfWorldMounterUpdater,
  616. isRemount bool) error {
  617. fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
  618. if err != nil {
  619. return err
  620. }
  621. var generatedOperations volumetypes.GeneratedOperations
  622. if fsVolume {
  623. // Filesystem volume case
  624. // Mount/remount a volume when a volume is attached
  625. generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
  626. waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
  627. } else {
  628. // Block volume case
  629. // Creates a map to device if a volume is attached
  630. generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
  631. waitForAttachTimeout, volumeToMount, actualStateOfWorld)
  632. }
  633. if err != nil {
  634. return err
  635. }
  636. // Avoid executing mount/map from multiple pods referencing the
  637. // same volume in parallel
  638. podName := nestedpendingoperations.EmptyUniquePodName
  639. // TODO: remove this -- not necessary
  640. if !volumeToMount.PluginIsAttachable && !volumeToMount.PluginIsDeviceMountable {
  641. // volume plugins which are Non-attachable and Non-deviceMountable can execute mount for multiple pods
  642. // referencing the same volume in parallel
  643. podName = util.GetUniquePodName(volumeToMount.Pod)
  644. }
  645. // TODO mount_device
  646. return oe.pendingOperations.Run(
  647. volumeToMount.VolumeName, podName, generatedOperations)
  648. }
  649. func (oe *operationExecutor) UnmountVolume(
  650. volumeToUnmount MountedVolume,
  651. actualStateOfWorld ActualStateOfWorldMounterUpdater,
  652. podsDir string) error {
  653. fsVolume, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
  654. if err != nil {
  655. return err
  656. }
  657. var generatedOperations volumetypes.GeneratedOperations
  658. if fsVolume {
  659. // Filesystem volume case
  660. // Unmount a volume if a volume is mounted
  661. generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc(
  662. volumeToUnmount, actualStateOfWorld, podsDir)
  663. } else {
  664. // Block volume case
  665. // Unmap a volume if a volume is mapped
  666. generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc(
  667. volumeToUnmount, actualStateOfWorld)
  668. }
  669. if err != nil {
  670. return err
  671. }
  672. // All volume plugins can execute unmount/unmap for multiple pods referencing the
  673. // same volume in parallel
  674. podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
  675. return oe.pendingOperations.Run(
  676. volumeToUnmount.VolumeName, podName, generatedOperations)
  677. }
  678. func (oe *operationExecutor) UnmountDevice(
  679. deviceToDetach AttachedVolume,
  680. actualStateOfWorld ActualStateOfWorldMounterUpdater,
  681. mounter mount.Interface) error {
  682. fsVolume, err := util.CheckVolumeModeFilesystem(deviceToDetach.VolumeSpec)
  683. if err != nil {
  684. return err
  685. }
  686. var generatedOperations volumetypes.GeneratedOperations
  687. if fsVolume {
  688. // Filesystem volume case
  689. // Unmount and detach a device if a volume isn't referenced
  690. generatedOperations, err = oe.operationGenerator.GenerateUnmountDeviceFunc(
  691. deviceToDetach, actualStateOfWorld, mounter)
  692. } else {
  693. // Block volume case
  694. // Detach a device and remove loopback if a volume isn't referenced
  695. generatedOperations, err = oe.operationGenerator.GenerateUnmapDeviceFunc(
  696. deviceToDetach, actualStateOfWorld, mounter)
  697. }
  698. if err != nil {
  699. return err
  700. }
  701. // Avoid executing unmount/unmap device from multiple pods referencing
  702. // the same volume in parallel
  703. podName := nestedpendingoperations.EmptyUniquePodName
  704. return oe.pendingOperations.Run(
  705. deviceToDetach.VolumeName, podName, generatedOperations)
  706. }
  707. func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
  708. generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld)
  709. if err != nil {
  710. return err
  711. }
  712. return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations)
  713. }
  714. func (oe *operationExecutor) VerifyControllerAttachedVolume(
  715. volumeToMount VolumeToMount,
  716. nodeName types.NodeName,
  717. actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
  718. generatedOperations, err :=
  719. oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
  720. if err != nil {
  721. return err
  722. }
  723. return oe.pendingOperations.Run(
  724. volumeToMount.VolumeName, "" /* podName */, generatedOperations)
  725. }
  726. // ReconstructVolumeOperation return a func to create volumeSpec from mount path
  727. func (oe *operationExecutor) ReconstructVolumeOperation(
  728. volumeMode v1.PersistentVolumeMode,
  729. plugin volume.VolumePlugin,
  730. mapperPlugin volume.BlockVolumePlugin,
  731. uid types.UID,
  732. podName volumetypes.UniquePodName,
  733. volumeSpecName string,
  734. volumePath string,
  735. pluginName string) (*volume.Spec, error) {
  736. // Filesystem Volume case
  737. if volumeMode == v1.PersistentVolumeFilesystem {
  738. // Create volumeSpec from mount path
  739. klog.V(5).Infof("Starting operationExecutor.ReconstructVolumepodName")
  740. volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, volumePath)
  741. if err != nil {
  742. return nil, err
  743. }
  744. return volumeSpec, nil
  745. }
  746. // Block Volume case
  747. // Create volumeSpec from mount path
  748. klog.V(5).Infof("Starting operationExecutor.ReconstructVolume")
  749. if mapperPlugin == nil {
  750. return nil, fmt.Errorf("Could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)",
  751. pluginName,
  752. volumeSpecName,
  753. podName,
  754. uid)
  755. }
  756. // volumePath contains volumeName on the path. In the case of block volume, {volumeName} is symbolic link
  757. // corresponding to raw block device.
  758. // ex. volumePath: pods/{podUid}}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/{volumeName}
  759. volumeSpec, err := mapperPlugin.ConstructBlockVolumeSpec(uid, volumeSpecName, volumePath)
  760. if err != nil {
  761. return nil, err
  762. }
  763. return volumeSpec, nil
  764. }
  765. // CheckVolumeExistenceOperation checks mount path directory if volume still exists
  766. func (oe *operationExecutor) CheckVolumeExistenceOperation(
  767. volumeSpec *volume.Spec,
  768. mountPath, volumeName string,
  769. mounter mount.Interface,
  770. uniqueVolumeName v1.UniqueVolumeName,
  771. podName volumetypes.UniquePodName,
  772. podUID types.UID,
  773. attachable volume.AttachableVolumePlugin) (bool, error) {
  774. fsVolume, err := util.CheckVolumeModeFilesystem(volumeSpec)
  775. if err != nil {
  776. return false, err
  777. }
  778. // Filesystem Volume case
  779. // For attachable volume case, check mount path directory if volume is still existing and mounted.
  780. // Return true if volume is mounted.
  781. if fsVolume {
  782. if attachable != nil {
  783. var isNotMount bool
  784. var mountCheckErr error
  785. if isNotMount, mountCheckErr = mounter.IsLikelyNotMountPoint(mountPath); mountCheckErr != nil {
  786. return false, fmt.Errorf("Could not check whether the volume %q (spec.Name: %q) pod %q (UID: %q) is mounted with: %v",
  787. uniqueVolumeName,
  788. volumeName,
  789. podName,
  790. podUID,
  791. mountCheckErr)
  792. }
  793. return !isNotMount, nil
  794. }
  795. // For non-attachable volume case, skip check and return true without mount point check
  796. // since plugins may not have volume mount point.
  797. return true, nil
  798. }
  799. // Block Volume case
  800. // Check mount path directory if volume still exists, then return true if volume
  801. // is there. Either plugin is attachable or non-attachable, the plugin should
  802. // have symbolic link associated to raw block device under pod device map
  803. // if volume exists.
  804. blkutil := volumepathhandler.NewBlockVolumePathHandler()
  805. var islinkExist bool
  806. var checkErr error
  807. if islinkExist, checkErr = blkutil.IsSymlinkExist(mountPath); checkErr != nil {
  808. return false, fmt.Errorf("Could not check whether the block volume %q (spec.Name: %q) pod %q (UID: %q) is mapped to: %v",
  809. uniqueVolumeName,
  810. volumeName,
  811. podName,
  812. podUID,
  813. checkErr)
  814. }
  815. return islinkExist, nil
  816. }