util.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package util
  14. import (
  15. "fmt"
  16. "k8s.io/api/core/v1"
  17. "k8s.io/apimachinery/pkg/types"
  18. corelisters "k8s.io/client-go/listers/core/v1"
  19. "k8s.io/klog"
  20. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
  21. "k8s.io/kubernetes/pkg/volume"
  22. "k8s.io/kubernetes/pkg/volume/util"
  23. )
  24. // CreateVolumeSpec creates and returns a mutatable volume.Spec object for the
  25. // specified volume. It dereference any PVC to get PV objects, if needed.
  26. func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) {
  27. if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
  28. klog.V(10).Infof(
  29. "Found PVC, ClaimName: %q/%q",
  30. podNamespace,
  31. pvcSource.ClaimName)
  32. // If podVolume is a PVC, fetch the real PV behind the claim
  33. pvName, pvcUID, err := getPVCFromCacheExtractPV(
  34. podNamespace, pvcSource.ClaimName, pvcLister)
  35. if err != nil {
  36. return nil, fmt.Errorf(
  37. "error processing PVC %q/%q: %v",
  38. podNamespace,
  39. pvcSource.ClaimName,
  40. err)
  41. }
  42. klog.V(10).Infof(
  43. "Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
  44. podNamespace,
  45. pvcSource.ClaimName,
  46. pvcUID,
  47. pvName)
  48. // Fetch actual PV object
  49. volumeSpec, err := getPVSpecFromCache(
  50. pvName, pvcSource.ReadOnly, pvcUID, pvLister)
  51. if err != nil {
  52. return nil, fmt.Errorf(
  53. "error processing PVC %q/%q: %v",
  54. podNamespace,
  55. pvcSource.ClaimName,
  56. err)
  57. }
  58. klog.V(10).Infof(
  59. "Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
  60. volumeSpec.Name(),
  61. pvName,
  62. podNamespace,
  63. pvcSource.ClaimName,
  64. pvcUID)
  65. return volumeSpec, nil
  66. }
  67. // Do not return the original volume object, since it's from the shared
  68. // informer it may be mutated by another consumer.
  69. clonedPodVolume := podVolume.DeepCopy()
  70. return volume.NewSpecFromVolume(clonedPodVolume), nil
  71. }
  72. // getPVCFromCacheExtractPV fetches the PVC object with the given namespace and
  73. // name from the shared internal PVC store extracts the name of the PV it is
  74. // pointing to and returns it.
  75. // This method returns an error if a PVC object does not exist in the cache
  76. // with the given namespace/name.
  77. // This method returns an error if the PVC object's phase is not "Bound".
  78. func getPVCFromCacheExtractPV(namespace string, name string, pvcLister corelisters.PersistentVolumeClaimLister) (string, types.UID, error) {
  79. pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(name)
  80. if err != nil {
  81. return "", "", fmt.Errorf("failed to find PVC %s/%s in PVCInformer cache: %v", namespace, name, err)
  82. }
  83. if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
  84. return "", "", fmt.Errorf(
  85. "PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
  86. namespace,
  87. name,
  88. pvc.Status.Phase,
  89. pvc.Spec.VolumeName)
  90. }
  91. return pvc.Spec.VolumeName, pvc.UID, nil
  92. }
  93. // getPVSpecFromCache fetches the PV object with the given name from the shared
  94. // internal PV store and returns a volume.Spec representing it.
  95. // This method returns an error if a PV object does not exist in the cache with
  96. // the given name.
  97. // This method deep copies the PV object so the caller may use the returned
  98. // volume.Spec object without worrying about it mutating unexpectedly.
  99. func getPVSpecFromCache(name string, pvcReadOnly bool, expectedClaimUID types.UID, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) {
  100. pv, err := pvLister.Get(name)
  101. if err != nil {
  102. return nil, fmt.Errorf("failed to find PV %q in PVInformer cache: %v", name, err)
  103. }
  104. if pv.Spec.ClaimRef == nil {
  105. return nil, fmt.Errorf(
  106. "found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim",
  107. name)
  108. }
  109. if pv.Spec.ClaimRef.UID != expectedClaimUID {
  110. return nil, fmt.Errorf(
  111. "found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)",
  112. name,
  113. pv.Spec.ClaimRef.UID,
  114. expectedClaimUID)
  115. }
  116. // Do not return the object from the informer, since the store is shared it
  117. // may be mutated by another consumer.
  118. clonedPV := pv.DeepCopy()
  119. return volume.NewSpecFromPersistentVolume(clonedPV, pvcReadOnly), nil
  120. }
  121. // DetermineVolumeAction returns true if volume and pod needs to be added to dswp
  122. // and it returns false if volume and pod needs to be removed from dswp
  123. func DetermineVolumeAction(pod *v1.Pod, desiredStateOfWorld cache.DesiredStateOfWorld, defaultAction bool) bool {
  124. if pod == nil || len(pod.Spec.Volumes) <= 0 {
  125. return defaultAction
  126. }
  127. nodeName := types.NodeName(pod.Spec.NodeName)
  128. keepTerminatedPodVolume := desiredStateOfWorld.GetKeepTerminatedPodVolumesForNode(nodeName)
  129. if util.IsPodTerminated(pod, pod.Status) {
  130. // if pod is terminate we let kubelet policy dictate if volume
  131. // should be detached or not
  132. return keepTerminatedPodVolume
  133. }
  134. return defaultAction
  135. }
  136. // ProcessPodVolumes processes the volumes in the given pod and adds them to the
  137. // desired state of the world if addVolumes is true, otherwise it removes them.
  138. func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.DesiredStateOfWorld, volumePluginMgr *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister) {
  139. if pod == nil {
  140. return
  141. }
  142. if len(pod.Spec.Volumes) <= 0 {
  143. klog.V(10).Infof("Skipping processing of pod %q/%q: it has no volumes.",
  144. pod.Namespace,
  145. pod.Name)
  146. return
  147. }
  148. nodeName := types.NodeName(pod.Spec.NodeName)
  149. if nodeName == "" {
  150. klog.V(10).Infof(
  151. "Skipping processing of pod %q/%q: it is not scheduled to a node.",
  152. pod.Namespace,
  153. pod.Name)
  154. return
  155. } else if !desiredStateOfWorld.NodeExists(nodeName) {
  156. // If the node the pod is scheduled to does not exist in the desired
  157. // state of the world data structure, that indicates the node is not
  158. // yet managed by the controller. Therefore, ignore the pod.
  159. klog.V(4).Infof(
  160. "Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.",
  161. pod.Namespace,
  162. pod.Name,
  163. nodeName)
  164. return
  165. }
  166. // Process volume spec for each volume defined in pod
  167. for _, podVolume := range pod.Spec.Volumes {
  168. volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, pvcLister, pvLister)
  169. if err != nil {
  170. klog.V(10).Infof(
  171. "Error processing volume %q for pod %q/%q: %v",
  172. podVolume.Name,
  173. pod.Namespace,
  174. pod.Name,
  175. err)
  176. continue
  177. }
  178. attachableVolumePlugin, err :=
  179. volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
  180. if err != nil || attachableVolumePlugin == nil {
  181. klog.V(10).Infof(
  182. "Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
  183. podVolume.Name,
  184. pod.Namespace,
  185. pod.Name,
  186. err)
  187. continue
  188. }
  189. uniquePodName := util.GetUniquePodName(pod)
  190. if addVolumes {
  191. // Add volume to desired state of world
  192. _, err := desiredStateOfWorld.AddPod(
  193. uniquePodName, pod, volumeSpec, nodeName)
  194. if err != nil {
  195. klog.V(10).Infof(
  196. "Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v",
  197. podVolume.Name,
  198. pod.Namespace,
  199. pod.Name,
  200. err)
  201. }
  202. } else {
  203. // Remove volume from desired state of world
  204. uniqueVolumeName, err := util.GetUniqueVolumeNameFromSpec(
  205. attachableVolumePlugin, volumeSpec)
  206. if err != nil {
  207. klog.V(10).Infof(
  208. "Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v",
  209. podVolume.Name,
  210. pod.Namespace,
  211. pod.Name,
  212. err)
  213. continue
  214. }
  215. desiredStateOfWorld.DeletePod(
  216. uniquePodName, uniqueVolumeName, nodeName)
  217. }
  218. }
  219. return
  220. }