desired_state_of_world.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
  14. Package cache implements data structures used by the kubelet volume manager to
  15. keep track of attached volumes and the pods that mounted them.
  16. */
  17. package cache
  18. import (
  19. "fmt"
  20. "sync"
  21. v1 "k8s.io/api/core/v1"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. "k8s.io/apimachinery/pkg/util/sets"
  24. apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource"
  25. "k8s.io/kubernetes/pkg/volume"
  26. "k8s.io/kubernetes/pkg/volume/util"
  27. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  28. "k8s.io/kubernetes/pkg/volume/util/types"
  29. )
// DesiredStateOfWorld defines a set of thread-safe operations for the kubelet
// volume manager's desired state of the world cache.
// This cache contains volumes->pods i.e. a set of all volumes that should be
// attached to this node and the pods that reference them and should mount the
// volume.
// Note: This is distinct from the DesiredStateOfWorld implemented by the
// attach/detach controller. They both keep track of different objects. This
// contains kubelet volume manager specific state.
type DesiredStateOfWorld interface {
	// AddPodToVolume adds the given pod to the given volume in the cache
	// indicating the specified pod should mount the specified volume.
	// A unique volumeName is generated from the volumeSpec and returned on
	// success.
	// If no volume plugin can support the given volumeSpec or more than one
	// plugin can support it, an error is returned.
	// If a volume with the name volumeName does not exist in the list of
	// volumes that should be attached to this node, the volume is implicitly
	// added.
	// If a pod with the same unique name already exists under the specified
	// volume, its entry is refreshed with the supplied pod, volumeSpec and
	// outerVolumeSpecName; the volume-level entry itself is left as it was.
	AddPodToVolume(podName types.UniquePodName, pod *v1.Pod, volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGidValue string) (v1.UniqueVolumeName, error)

	// MarkVolumesReportedInUse sets the ReportedInUse value to true for the
	// reportedVolumes. For volumes not in the reportedVolumes list, the
	// ReportedInUse value is reset to false. The default ReportedInUse value
	// for a newly created volume is false.
	// When set to true this value indicates that the volume was successfully
	// added to the VolumesInUse field in the node's status. Mount operation needs
	// to check this value before issuing the operation.
	// If a volume in the reportedVolumes list does not exist in the list of
	// volumes that should be attached to this node, it is skipped without error.
	MarkVolumesReportedInUse(reportedVolumes []v1.UniqueVolumeName)

	// DeletePodFromVolume removes the given pod from the given volume in the
	// cache indicating the specified pod no longer requires the specified
	// volume.
	// Any errors stored for the pod are discarded first, whether or not the
	// pod or the volume is found.
	// If a pod with the same unique name does not exist under the specified
	// volume, nothing else is changed.
	// If a volume with the name volumeName does not exist in the list of
	// attached volumes, nothing else is changed.
	// If after deleting the pod, the specified volume contains no other child
	// pods, the volume is also deleted.
	DeletePodFromVolume(podName types.UniquePodName, volumeName v1.UniqueVolumeName)

	// VolumeExists returns true if the given volume exists in the list of
	// volumes that should be attached to this node, and false otherwise.
	// Only the volume name is checked; which pods reference the volume is
	// not considered.
	VolumeExists(volumeName v1.UniqueVolumeName) bool

	// PodExistsInVolume returns true if the given pod exists in the list of
	// podsToMount for the given volume in the cache.
	// If a pod with the same unique name does not exist under the specified
	// volume, false is returned.
	// If a volume with the name volumeName does not exist in the list of
	// attached volumes, false is returned.
	PodExistsInVolume(podName types.UniquePodName, volumeName v1.UniqueVolumeName) bool

	// GetVolumesToMount generates and returns a list of volumes that should be
	// attached to this node and the pods they should be mounted to based on the
	// current desired state of the world. The list holds one entry per
	// (volume, pod) pair.
	GetVolumesToMount() []VolumeToMount

	// GetPods generates and returns a map of pods in which map is indexed
	// with pod's unique name. This map can be used to determine which pod is currently
	// in desired state of world.
	GetPods() map[types.UniquePodName]bool

	// VolumeExistsWithSpecName returns true if the given volume specified with the
	// volume spec name (a.k.a., InnerVolumeSpecName) exists in the list of
	// volumes that should be attached to this node and is referenced by the
	// given pod.
	// If the pod does not reference any volume whose spec has that name,
	// false is returned.
	VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool

	// AddErrorToPod adds the given error to the given pod in the cache.
	// It will be returned by a subsequent PopPodErrors().
	// Each error string is stored only once, and the number of distinct
	// errors kept per pod is bounded (see maxPodErrors).
	AddErrorToPod(podName types.UniquePodName, err string)

	// PopPodErrors returns accumulated errors on a given pod and clears
	// them. An empty slice is returned when the pod has no stored errors.
	PopPodErrors(podName types.UniquePodName) []string

	// GetPodsWithErrors returns names of pods that have stored errors.
	GetPodsWithErrors() []types.UniquePodName
}
// VolumeToMount represents a volume that is attached to this node and needs to
// be mounted to PodName.
type VolumeToMount struct {
	// Embedded so that every field of operationexecutor.VolumeToMount is
	// promoted onto this type.
	operationexecutor.VolumeToMount
}
  112. // NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
  113. func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorld {
  114. return &desiredStateOfWorld{
  115. volumesToMount: make(map[v1.UniqueVolumeName]volumeToMount),
  116. volumePluginMgr: volumePluginMgr,
  117. podErrors: make(map[types.UniquePodName]sets.String),
  118. }
  119. }
// desiredStateOfWorld is the DesiredStateOfWorld implementation returned by
// NewDesiredStateOfWorld.
type desiredStateOfWorld struct {
	// volumesToMount is a map containing the set of volumes that should be
	// attached to this node and mounted to the pods referencing it. The key in
	// the map is the name of the volume and the value is a volume object
	// containing more information about the volume.
	volumesToMount map[v1.UniqueVolumeName]volumeToMount
	// volumePluginMgr is the volume plugin manager used to create volume
	// plugin objects.
	volumePluginMgr *volume.VolumePluginMgr
	// podErrors are errors caught by desiredStateOfWorldPopulator about volumes for a given pod.
	// Each pod maps to a set of distinct error strings.
	podErrors map[types.UniquePodName]sets.String

	// The embedded RWMutex is taken by every exported method before it
	// touches volumesToMount or podErrors.
	sync.RWMutex
}
// The volume object represents a volume that should be attached to this node,
// and mounted to podsToMount.
type volumeToMount struct {
	// volumeName contains the unique identifier for this volume.
	volumeName v1.UniqueVolumeName

	// podsToMount is a map containing the set of pods that reference this
	// volume and should mount it once it is attached. The key in the map is
	// the name of the pod and the value is a pod object containing more
	// information about the pod.
	podsToMount map[types.UniquePodName]podToMount

	// pluginIsAttachable indicates that the plugin for this volume implements
	// the volume.Attacher interface
	pluginIsAttachable bool

	// pluginIsDeviceMountable indicates that the plugin for this volume implements
	// the volume.DeviceMounter interface
	pluginIsDeviceMountable bool

	// volumeGidValue contains the value of the GID annotation, if present.
	// It is recorded when the volume entry is first created.
	volumeGidValue string

	// reportedInUse indicates that the volume was successfully added to the
	// VolumesInUse field in the node's status.
	reportedInUse bool

	// desiredSizeLimit indicates the desired upper bound on the size of the volume
	// (if so implemented). It is only computed for local ephemeral volumes
	// when the volume entry is first created, and is nil otherwise.
	desiredSizeLimit *resource.Quantity
}
// The pod object represents a pod that references the underlying volume and
// should mount it once it is attached.
type podToMount struct {
	// podName contains the name of this pod.
	podName types.UniquePodName

	// Pod to mount the volume to. Used to create NewMounter.
	pod *v1.Pod

	// volume spec containing the specification for this volume. Used to
	// generate the volume plugin object, and passed to plugin methods.
	// For non-PVC volumes this is the same as defined in the pod object. For
	// PVC volumes it is from the dereferenced PV object.
	volumeSpec *volume.Spec

	// outerVolumeSpecName is the volume.Spec.Name() of the volume as referenced
	// directly in the pod. If the volume was referenced through a persistent
	// volume claim, this contains the volume.Spec.Name() of the persistent
	// volume claim
	outerVolumeSpecName string
}
  176. const (
  177. // Maximum errors to be stored per pod in desiredStateOfWorld.podErrors to
  178. // prevent unbound growth.
  179. maxPodErrors = 10
  180. )
  181. func (dsw *desiredStateOfWorld) AddPodToVolume(
  182. podName types.UniquePodName,
  183. pod *v1.Pod,
  184. volumeSpec *volume.Spec,
  185. outerVolumeSpecName string,
  186. volumeGidValue string) (v1.UniqueVolumeName, error) {
  187. dsw.Lock()
  188. defer dsw.Unlock()
  189. volumePlugin, err := dsw.volumePluginMgr.FindPluginBySpec(volumeSpec)
  190. if err != nil || volumePlugin == nil {
  191. return "", fmt.Errorf(
  192. "failed to get Plugin from volumeSpec for volume %q err=%v",
  193. volumeSpec.Name(),
  194. err)
  195. }
  196. var volumeName v1.UniqueVolumeName
  197. // The unique volume name used depends on whether the volume is attachable/device-mountable
  198. // or not.
  199. attachable := dsw.isAttachableVolume(volumeSpec)
  200. deviceMountable := dsw.isDeviceMountableVolume(volumeSpec)
  201. if attachable || deviceMountable {
  202. // For attachable/device-mountable volumes, use the unique volume name as reported by
  203. // the plugin.
  204. volumeName, err =
  205. util.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
  206. if err != nil {
  207. return "", fmt.Errorf(
  208. "failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
  209. volumeSpec.Name(),
  210. volumePlugin.GetPluginName(),
  211. err)
  212. }
  213. } else {
  214. // For non-attachable and non-device-mountable volumes, generate a unique name based on the pod
  215. // namespace and name and the name of the volume within the pod.
  216. volumeName = util.GetUniqueVolumeNameFromSpecWithPod(podName, volumePlugin, volumeSpec)
  217. }
  218. if _, volumeExists := dsw.volumesToMount[volumeName]; !volumeExists {
  219. var sizeLimit *resource.Quantity
  220. if volumeSpec.Volume != nil {
  221. if util.IsLocalEphemeralVolume(*volumeSpec.Volume) {
  222. _, podLimits := apiv1resource.PodRequestsAndLimits(pod)
  223. ephemeralStorageLimit := podLimits[v1.ResourceEphemeralStorage]
  224. sizeLimit = resource.NewQuantity(ephemeralStorageLimit.Value(), resource.BinarySI)
  225. if volumeSpec.Volume.EmptyDir != nil &&
  226. volumeSpec.Volume.EmptyDir.SizeLimit != nil &&
  227. volumeSpec.Volume.EmptyDir.SizeLimit.Value() > 0 &&
  228. volumeSpec.Volume.EmptyDir.SizeLimit.Value() < sizeLimit.Value() {
  229. sizeLimit = resource.NewQuantity(volumeSpec.Volume.EmptyDir.SizeLimit.Value(), resource.BinarySI)
  230. }
  231. }
  232. }
  233. dsw.volumesToMount[volumeName] = volumeToMount{
  234. volumeName: volumeName,
  235. podsToMount: make(map[types.UniquePodName]podToMount),
  236. pluginIsAttachable: attachable,
  237. pluginIsDeviceMountable: deviceMountable,
  238. volumeGidValue: volumeGidValue,
  239. reportedInUse: false,
  240. desiredSizeLimit: sizeLimit,
  241. }
  242. }
  243. // Create new podToMount object. If it already exists, it is refreshed with
  244. // updated values (this is required for volumes that require remounting on
  245. // pod update, like Downward API volumes).
  246. dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{
  247. podName: podName,
  248. pod: pod,
  249. volumeSpec: volumeSpec,
  250. outerVolumeSpecName: outerVolumeSpecName,
  251. }
  252. return volumeName, nil
  253. }
  254. func (dsw *desiredStateOfWorld) MarkVolumesReportedInUse(
  255. reportedVolumes []v1.UniqueVolumeName) {
  256. dsw.Lock()
  257. defer dsw.Unlock()
  258. reportedVolumesMap := make(
  259. map[v1.UniqueVolumeName]bool, len(reportedVolumes) /* capacity */)
  260. for _, reportedVolume := range reportedVolumes {
  261. reportedVolumesMap[reportedVolume] = true
  262. }
  263. for volumeName, volumeObj := range dsw.volumesToMount {
  264. _, volumeReported := reportedVolumesMap[volumeName]
  265. volumeObj.reportedInUse = volumeReported
  266. dsw.volumesToMount[volumeName] = volumeObj
  267. }
  268. }
  269. func (dsw *desiredStateOfWorld) DeletePodFromVolume(
  270. podName types.UniquePodName, volumeName v1.UniqueVolumeName) {
  271. dsw.Lock()
  272. defer dsw.Unlock()
  273. delete(dsw.podErrors, podName)
  274. volumeObj, volumeExists := dsw.volumesToMount[volumeName]
  275. if !volumeExists {
  276. return
  277. }
  278. if _, podExists := volumeObj.podsToMount[podName]; !podExists {
  279. return
  280. }
  281. // Delete pod if it exists
  282. delete(dsw.volumesToMount[volumeName].podsToMount, podName)
  283. if len(dsw.volumesToMount[volumeName].podsToMount) == 0 {
  284. // Delete volume if no child pods left
  285. delete(dsw.volumesToMount, volumeName)
  286. }
  287. }
  288. func (dsw *desiredStateOfWorld) VolumeExists(
  289. volumeName v1.UniqueVolumeName) bool {
  290. dsw.RLock()
  291. defer dsw.RUnlock()
  292. _, volumeExists := dsw.volumesToMount[volumeName]
  293. return volumeExists
  294. }
  295. func (dsw *desiredStateOfWorld) PodExistsInVolume(
  296. podName types.UniquePodName, volumeName v1.UniqueVolumeName) bool {
  297. dsw.RLock()
  298. defer dsw.RUnlock()
  299. volumeObj, volumeExists := dsw.volumesToMount[volumeName]
  300. if !volumeExists {
  301. return false
  302. }
  303. _, podExists := volumeObj.podsToMount[podName]
  304. return podExists
  305. }
  306. func (dsw *desiredStateOfWorld) VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool {
  307. dsw.RLock()
  308. defer dsw.RUnlock()
  309. for _, volumeObj := range dsw.volumesToMount {
  310. if podObj, podExists := volumeObj.podsToMount[podName]; podExists {
  311. if podObj.volumeSpec.Name() == volumeSpecName {
  312. return true
  313. }
  314. }
  315. }
  316. return false
  317. }
  318. func (dsw *desiredStateOfWorld) GetPods() map[types.UniquePodName]bool {
  319. dsw.RLock()
  320. defer dsw.RUnlock()
  321. podList := make(map[types.UniquePodName]bool)
  322. for _, volumeObj := range dsw.volumesToMount {
  323. for podName := range volumeObj.podsToMount {
  324. podList[podName] = true
  325. }
  326. }
  327. return podList
  328. }
  329. func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
  330. dsw.RLock()
  331. defer dsw.RUnlock()
  332. volumesToMount := make([]VolumeToMount, 0 /* len */, len(dsw.volumesToMount) /* cap */)
  333. for volumeName, volumeObj := range dsw.volumesToMount {
  334. for podName, podObj := range volumeObj.podsToMount {
  335. volumesToMount = append(
  336. volumesToMount,
  337. VolumeToMount{
  338. VolumeToMount: operationexecutor.VolumeToMount{
  339. VolumeName: volumeName,
  340. PodName: podName,
  341. Pod: podObj.pod,
  342. VolumeSpec: podObj.volumeSpec,
  343. PluginIsAttachable: volumeObj.pluginIsAttachable,
  344. PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable,
  345. OuterVolumeSpecName: podObj.outerVolumeSpecName,
  346. VolumeGidValue: volumeObj.volumeGidValue,
  347. ReportedInUse: volumeObj.reportedInUse,
  348. DesiredSizeLimit: volumeObj.desiredSizeLimit}})
  349. }
  350. }
  351. return volumesToMount
  352. }
  353. func (dsw *desiredStateOfWorld) isAttachableVolume(volumeSpec *volume.Spec) bool {
  354. attachableVolumePlugin, _ :=
  355. dsw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
  356. if attachableVolumePlugin != nil {
  357. volumeAttacher, err := attachableVolumePlugin.NewAttacher()
  358. if err == nil && volumeAttacher != nil {
  359. return true
  360. }
  361. }
  362. return false
  363. }
  364. func (dsw *desiredStateOfWorld) isDeviceMountableVolume(volumeSpec *volume.Spec) bool {
  365. deviceMountableVolumePlugin, _ := dsw.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
  366. if deviceMountableVolumePlugin != nil {
  367. volumeDeviceMounter, err := deviceMountableVolumePlugin.NewDeviceMounter()
  368. if err == nil && volumeDeviceMounter != nil {
  369. return true
  370. }
  371. }
  372. return false
  373. }
  374. func (dsw *desiredStateOfWorld) AddErrorToPod(podName types.UniquePodName, err string) {
  375. dsw.Lock()
  376. defer dsw.Unlock()
  377. if errs, found := dsw.podErrors[podName]; found {
  378. if errs.Len() <= maxPodErrors {
  379. errs.Insert(err)
  380. }
  381. return
  382. }
  383. dsw.podErrors[podName] = sets.NewString(err)
  384. }
  385. func (dsw *desiredStateOfWorld) PopPodErrors(podName types.UniquePodName) []string {
  386. dsw.Lock()
  387. defer dsw.Unlock()
  388. if errs, found := dsw.podErrors[podName]; found {
  389. delete(dsw.podErrors, podName)
  390. return errs.List()
  391. }
  392. return []string{}
  393. }
  394. func (dsw *desiredStateOfWorld) GetPodsWithErrors() []types.UniquePodName {
  395. dsw.RLock()
  396. defer dsw.RUnlock()
  397. pods := make([]types.UniquePodName, 0, len(dsw.podErrors))
  398. for podName := range dsw.podErrors {
  399. pods = append(pods, podName)
  400. }
  401. return pods
  402. }