desired_state_of_world.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
  14. Package cache implements data structures used by the kubelet volume manager to
  15. keep track of attached volumes and the pods that mounted them.
  16. */
  17. package cache
  18. import (
  19. "fmt"
  20. "sync"
  21. "k8s.io/api/core/v1"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource"
  24. "k8s.io/kubernetes/pkg/volume"
  25. "k8s.io/kubernetes/pkg/volume/util"
  26. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  27. "k8s.io/kubernetes/pkg/volume/util/types"
  28. )
  29. // DesiredStateOfWorld defines a set of thread-safe operations for the kubelet
  30. // volume manager's desired state of the world cache.
  31. // This cache contains volumes->pods i.e. a set of all volumes that should be
  32. // attached to this node and the pods that reference them and should mount the
  33. // volume.
  34. // Note: This is distinct from the DesiredStateOfWorld implemented by the
  35. // attach/detach controller. They both keep track of different objects. This
  36. // contains kubelet volume manager specific state.
  37. type DesiredStateOfWorld interface {
  38. // AddPodToVolume adds the given pod to the given volume in the cache
  39. // indicating the specified pod should mount the specified volume.
  40. // A unique volumeName is generated from the volumeSpec and returned on
  41. // success.
  42. // If no volume plugin can support the given volumeSpec or more than one
  43. // plugin can support it, an error is returned.
  44. // If a volume with the name volumeName does not exist in the list of
  45. // volumes that should be attached to this node, the volume is implicitly
  46. // added.
  47. // If a pod with the same unique name already exists under the specified
  48. // volume, this is a no-op.
  49. AddPodToVolume(podName types.UniquePodName, pod *v1.Pod, volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGidValue string) (v1.UniqueVolumeName, error)
  50. // MarkVolumesReportedInUse sets the ReportedInUse value to true for the
  51. // reportedVolumes. For volumes not in the reportedVolumes list, the
  52. // ReportedInUse value is reset to false. The default ReportedInUse value
  53. // for a newly created volume is false.
  54. // When set to true this value indicates that the volume was successfully
  55. // added to the VolumesInUse field in the node's status. Mount operation needs
  56. // to check this value before issuing the operation.
  57. // If a volume in the reportedVolumes list does not exist in the list of
  58. // volumes that should be attached to this node, it is skipped without error.
  59. MarkVolumesReportedInUse(reportedVolumes []v1.UniqueVolumeName)
  60. // DeletePodFromVolume removes the given pod from the given volume in the
  61. // cache indicating the specified pod no longer requires the specified
  62. // volume.
  63. // If a pod with the same unique name does not exist under the specified
  64. // volume, this is a no-op.
  65. // If a volume with the name volumeName does not exist in the list of
  66. // attached volumes, this is a no-op.
  67. // If after deleting the pod, the specified volume contains no other child
  68. // pods, the volume is also deleted.
  69. DeletePodFromVolume(podName types.UniquePodName, volumeName v1.UniqueVolumeName)
  70. // VolumeExists returns true if the given volume exists in the list of
  71. // volumes that should be attached to this node.
  72. // If a pod with the same unique name does not exist under the specified
  73. // volume, false is returned.
  74. VolumeExists(volumeName v1.UniqueVolumeName) bool
  75. // PodExistsInVolume returns true if the given pod exists in the list of
  76. // podsToMount for the given volume in the cache.
  77. // If a pod with the same unique name does not exist under the specified
  78. // volume, false is returned.
  79. // If a volume with the name volumeName does not exist in the list of
  80. // attached volumes, false is returned.
  81. PodExistsInVolume(podName types.UniquePodName, volumeName v1.UniqueVolumeName) bool
  82. // GetVolumesToMount generates and returns a list of volumes that should be
  83. // attached to this node and the pods they should be mounted to based on the
  84. // current desired state of the world.
  85. GetVolumesToMount() []VolumeToMount
  86. // GetPods generates and returns a map of pods in which map is indexed
  87. // with pod's unique name. This map can be used to determine which pod is currently
  88. // in desired state of world.
  89. GetPods() map[types.UniquePodName]bool
  90. // VolumeExistsWithSpecName returns true if the given volume specified with the
  91. // volume spec name (a.k.a., InnerVolumeSpecName) exists in the list of
  92. // volumes that should be attached to this node.
  93. // If a pod with the same name does not exist under the specified
  94. // volume, false is returned.
  95. VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool
  96. }
  97. // VolumeToMount represents a volume that is attached to this node and needs to
  98. // be mounted to PodName.
  99. type VolumeToMount struct {
  100. operationexecutor.VolumeToMount
  101. }
  102. // NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
  103. func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorld {
  104. return &desiredStateOfWorld{
  105. volumesToMount: make(map[v1.UniqueVolumeName]volumeToMount),
  106. volumePluginMgr: volumePluginMgr,
  107. }
  108. }
  109. type desiredStateOfWorld struct {
  110. // volumesToMount is a map containing the set of volumes that should be
  111. // attached to this node and mounted to the pods referencing it. The key in
  112. // the map is the name of the volume and the value is a volume object
  113. // containing more information about the volume.
  114. volumesToMount map[v1.UniqueVolumeName]volumeToMount
  115. // volumePluginMgr is the volume plugin manager used to create volume
  116. // plugin objects.
  117. volumePluginMgr *volume.VolumePluginMgr
  118. sync.RWMutex
  119. }
  120. // The volume object represents a volume that should be attached to this node,
  121. // and mounted to podsToMount.
  122. type volumeToMount struct {
  123. // volumeName contains the unique identifier for this volume.
  124. volumeName v1.UniqueVolumeName
  125. // podsToMount is a map containing the set of pods that reference this
  126. // volume and should mount it once it is attached. The key in the map is
  127. // the name of the pod and the value is a pod object containing more
  128. // information about the pod.
  129. podsToMount map[types.UniquePodName]podToMount
  130. // pluginIsAttachable indicates that the plugin for this volume implements
  131. // the volume.Attacher interface
  132. pluginIsAttachable bool
  133. // pluginIsDeviceMountable indicates that the plugin for this volume implements
  134. // the volume.DeviceMounter interface
  135. pluginIsDeviceMountable bool
  136. // volumeGidValue contains the value of the GID annotation, if present.
  137. volumeGidValue string
  138. // reportedInUse indicates that the volume was successfully added to the
  139. // VolumesInUse field in the node's status.
  140. reportedInUse bool
  141. // desiredSizeLimit indicates the desired upper bound on the size of the volume
  142. // (if so implemented)
  143. desiredSizeLimit *resource.Quantity
  144. }
  145. // The pod object represents a pod that references the underlying volume and
  146. // should mount it once it is attached.
  147. type podToMount struct {
  148. // podName contains the name of this pod.
  149. podName types.UniquePodName
  150. // Pod to mount the volume to. Used to create NewMounter.
  151. pod *v1.Pod
  152. // volume spec containing the specification for this volume. Used to
  153. // generate the volume plugin object, and passed to plugin methods.
  154. // For non-PVC volumes this is the same as defined in the pod object. For
  155. // PVC volumes it is from the dereferenced PV object.
  156. volumeSpec *volume.Spec
  157. // outerVolumeSpecName is the volume.Spec.Name() of the volume as referenced
  158. // directly in the pod. If the volume was referenced through a persistent
  159. // volume claim, this contains the volume.Spec.Name() of the persistent
  160. // volume claim
  161. outerVolumeSpecName string
  162. }
  163. func (dsw *desiredStateOfWorld) AddPodToVolume(
  164. podName types.UniquePodName,
  165. pod *v1.Pod,
  166. volumeSpec *volume.Spec,
  167. outerVolumeSpecName string,
  168. volumeGidValue string) (v1.UniqueVolumeName, error) {
  169. dsw.Lock()
  170. defer dsw.Unlock()
  171. volumePlugin, err := dsw.volumePluginMgr.FindPluginBySpec(volumeSpec)
  172. if err != nil || volumePlugin == nil {
  173. return "", fmt.Errorf(
  174. "failed to get Plugin from volumeSpec for volume %q err=%v",
  175. volumeSpec.Name(),
  176. err)
  177. }
  178. var volumeName v1.UniqueVolumeName
  179. // The unique volume name used depends on whether the volume is attachable/device-mountable
  180. // or not.
  181. attachable := dsw.isAttachableVolume(volumeSpec)
  182. deviceMountable := dsw.isDeviceMountableVolume(volumeSpec)
  183. if attachable || deviceMountable {
  184. // For attachable/device-mountable volumes, use the unique volume name as reported by
  185. // the plugin.
  186. volumeName, err =
  187. util.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
  188. if err != nil {
  189. return "", fmt.Errorf(
  190. "failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
  191. volumeSpec.Name(),
  192. volumePlugin.GetPluginName(),
  193. err)
  194. }
  195. } else {
  196. // For non-attachable and non-device-mountable volumes, generate a unique name based on the pod
  197. // namespace and name and the name of the volume within the pod.
  198. volumeName = util.GetUniqueVolumeNameFromSpecWithPod(podName, volumePlugin, volumeSpec)
  199. }
  200. if _, volumeExists := dsw.volumesToMount[volumeName]; !volumeExists {
  201. var sizeLimit *resource.Quantity
  202. if volumeSpec.Volume != nil {
  203. if util.IsLocalEphemeralVolume(*volumeSpec.Volume) {
  204. _, podLimits := apiv1resource.PodRequestsAndLimits(pod)
  205. ephemeralStorageLimit := podLimits[v1.ResourceEphemeralStorage]
  206. sizeLimit = resource.NewQuantity(ephemeralStorageLimit.Value(), resource.BinarySI)
  207. if volumeSpec.Volume.EmptyDir != nil &&
  208. volumeSpec.Volume.EmptyDir.SizeLimit != nil &&
  209. volumeSpec.Volume.EmptyDir.SizeLimit.Value() > 0 &&
  210. volumeSpec.Volume.EmptyDir.SizeLimit.Value() < sizeLimit.Value() {
  211. sizeLimit = resource.NewQuantity(volumeSpec.Volume.EmptyDir.SizeLimit.Value(), resource.BinarySI)
  212. }
  213. }
  214. }
  215. dsw.volumesToMount[volumeName] = volumeToMount{
  216. volumeName: volumeName,
  217. podsToMount: make(map[types.UniquePodName]podToMount),
  218. pluginIsAttachable: attachable,
  219. pluginIsDeviceMountable: deviceMountable,
  220. volumeGidValue: volumeGidValue,
  221. reportedInUse: false,
  222. desiredSizeLimit: sizeLimit,
  223. }
  224. }
  225. // Create new podToMount object. If it already exists, it is refreshed with
  226. // updated values (this is required for volumes that require remounting on
  227. // pod update, like Downward API volumes).
  228. dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{
  229. podName: podName,
  230. pod: pod,
  231. volumeSpec: volumeSpec,
  232. outerVolumeSpecName: outerVolumeSpecName,
  233. }
  234. return volumeName, nil
  235. }
  236. func (dsw *desiredStateOfWorld) MarkVolumesReportedInUse(
  237. reportedVolumes []v1.UniqueVolumeName) {
  238. dsw.Lock()
  239. defer dsw.Unlock()
  240. reportedVolumesMap := make(
  241. map[v1.UniqueVolumeName]bool, len(reportedVolumes) /* capacity */)
  242. for _, reportedVolume := range reportedVolumes {
  243. reportedVolumesMap[reportedVolume] = true
  244. }
  245. for volumeName, volumeObj := range dsw.volumesToMount {
  246. _, volumeReported := reportedVolumesMap[volumeName]
  247. volumeObj.reportedInUse = volumeReported
  248. dsw.volumesToMount[volumeName] = volumeObj
  249. }
  250. }
  251. func (dsw *desiredStateOfWorld) DeletePodFromVolume(
  252. podName types.UniquePodName, volumeName v1.UniqueVolumeName) {
  253. dsw.Lock()
  254. defer dsw.Unlock()
  255. volumeObj, volumeExists := dsw.volumesToMount[volumeName]
  256. if !volumeExists {
  257. return
  258. }
  259. if _, podExists := volumeObj.podsToMount[podName]; !podExists {
  260. return
  261. }
  262. // Delete pod if it exists
  263. delete(dsw.volumesToMount[volumeName].podsToMount, podName)
  264. if len(dsw.volumesToMount[volumeName].podsToMount) == 0 {
  265. // Delete volume if no child pods left
  266. delete(dsw.volumesToMount, volumeName)
  267. }
  268. }
  269. func (dsw *desiredStateOfWorld) VolumeExists(
  270. volumeName v1.UniqueVolumeName) bool {
  271. dsw.RLock()
  272. defer dsw.RUnlock()
  273. _, volumeExists := dsw.volumesToMount[volumeName]
  274. return volumeExists
  275. }
  276. func (dsw *desiredStateOfWorld) PodExistsInVolume(
  277. podName types.UniquePodName, volumeName v1.UniqueVolumeName) bool {
  278. dsw.RLock()
  279. defer dsw.RUnlock()
  280. volumeObj, volumeExists := dsw.volumesToMount[volumeName]
  281. if !volumeExists {
  282. return false
  283. }
  284. _, podExists := volumeObj.podsToMount[podName]
  285. return podExists
  286. }
  287. func (dsw *desiredStateOfWorld) VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool {
  288. dsw.RLock()
  289. defer dsw.RUnlock()
  290. for _, volumeObj := range dsw.volumesToMount {
  291. for name, podObj := range volumeObj.podsToMount {
  292. if podName == name && podObj.volumeSpec.Name() == volumeSpecName {
  293. return true
  294. }
  295. }
  296. }
  297. return false
  298. }
  299. func (dsw *desiredStateOfWorld) GetPods() map[types.UniquePodName]bool {
  300. dsw.RLock()
  301. defer dsw.RUnlock()
  302. podList := make(map[types.UniquePodName]bool)
  303. for _, volumeObj := range dsw.volumesToMount {
  304. for podName := range volumeObj.podsToMount {
  305. if !podList[podName] {
  306. podList[podName] = true
  307. }
  308. }
  309. }
  310. return podList
  311. }
  312. func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
  313. dsw.RLock()
  314. defer dsw.RUnlock()
  315. volumesToMount := make([]VolumeToMount, 0 /* len */, len(dsw.volumesToMount) /* cap */)
  316. for volumeName, volumeObj := range dsw.volumesToMount {
  317. for podName, podObj := range volumeObj.podsToMount {
  318. volumesToMount = append(
  319. volumesToMount,
  320. VolumeToMount{
  321. VolumeToMount: operationexecutor.VolumeToMount{
  322. VolumeName: volumeName,
  323. PodName: podName,
  324. Pod: podObj.pod,
  325. VolumeSpec: podObj.volumeSpec,
  326. PluginIsAttachable: volumeObj.pluginIsAttachable,
  327. PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable,
  328. OuterVolumeSpecName: podObj.outerVolumeSpecName,
  329. VolumeGidValue: volumeObj.volumeGidValue,
  330. ReportedInUse: volumeObj.reportedInUse,
  331. DesiredSizeLimit: volumeObj.desiredSizeLimit}})
  332. }
  333. }
  334. return volumesToMount
  335. }
  336. func (dsw *desiredStateOfWorld) isAttachableVolume(volumeSpec *volume.Spec) bool {
  337. attachableVolumePlugin, _ :=
  338. dsw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
  339. if attachableVolumePlugin != nil {
  340. volumeAttacher, err := attachableVolumePlugin.NewAttacher()
  341. if err == nil && volumeAttacher != nil {
  342. return true
  343. }
  344. }
  345. return false
  346. }
  347. func (dsw *desiredStateOfWorld) isDeviceMountableVolume(volumeSpec *volume.Spec) bool {
  348. deviceMountableVolumePlugin, _ := dsw.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
  349. if deviceMountableVolumePlugin != nil {
  350. volumeDeviceMounter, err := deviceMountableVolumePlugin.NewDeviceMounter()
  351. if err == nil && volumeDeviceMounter != nil {
  352. return true
  353. }
  354. }
  355. return false
  356. }