util.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package util
  14. import (
  15. "errors"
  16. "fmt"
  17. "strings"
  18. "k8s.io/api/core/v1"
  19. "k8s.io/apimachinery/pkg/types"
  20. "k8s.io/apimachinery/pkg/util/sets"
  21. utilfeature "k8s.io/apiserver/pkg/util/feature"
  22. corelisters "k8s.io/client-go/listers/core/v1"
  23. "k8s.io/klog"
  24. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
  25. "k8s.io/kubernetes/pkg/features"
  26. "k8s.io/kubernetes/pkg/volume"
  27. "k8s.io/kubernetes/pkg/volume/csimigration"
  28. "k8s.io/kubernetes/pkg/volume/util"
  29. )
  30. // CreateVolumeSpec creates and returns a mutatable volume.Spec object for the
  31. // specified volume. It dereference any PVC to get PV objects, if needed.
  32. // A volume.Spec that refers to an in-tree plugin spec is translated to refer
  33. // to a migrated CSI plugin spec if all conditions for CSI migration on a node
  34. // for the in-tree plugin is satisfied.
  35. func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, nodeName types.NodeName, vpm *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) (*volume.Spec, error) {
  36. if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
  37. klog.V(10).Infof(
  38. "Found PVC, ClaimName: %q/%q",
  39. podNamespace,
  40. pvcSource.ClaimName)
  41. // If podVolume is a PVC, fetch the real PV behind the claim
  42. pvName, pvcUID, err := getPVCFromCacheExtractPV(
  43. podNamespace, pvcSource.ClaimName, pvcLister)
  44. if err != nil {
  45. return nil, fmt.Errorf(
  46. "error processing PVC %q/%q: %v",
  47. podNamespace,
  48. pvcSource.ClaimName,
  49. err)
  50. }
  51. klog.V(10).Infof(
  52. "Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
  53. podNamespace,
  54. pvcSource.ClaimName,
  55. pvcUID,
  56. pvName)
  57. // Fetch actual PV object
  58. volumeSpec, err := getPVSpecFromCache(
  59. pvName, pvcSource.ReadOnly, pvcUID, pvLister)
  60. if err != nil {
  61. return nil, fmt.Errorf(
  62. "error processing PVC %q/%q: %v",
  63. podNamespace,
  64. pvcSource.ClaimName,
  65. err)
  66. }
  67. volumeSpec, err = translateInTreeSpecToCSIIfNeeded(volumeSpec, nodeName, vpm, csiMigratedPluginManager, csiTranslator)
  68. if err != nil {
  69. return nil, fmt.Errorf(
  70. "error performing CSI migration checks and translation for PVC %q/%q: %v",
  71. podNamespace,
  72. pvcSource.ClaimName,
  73. err)
  74. }
  75. klog.V(10).Infof(
  76. "Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
  77. volumeSpec.Name(),
  78. pvName,
  79. podNamespace,
  80. pvcSource.ClaimName,
  81. pvcUID)
  82. return volumeSpec, nil
  83. }
  84. // Do not return the original volume object, since it's from the shared
  85. // informer it may be mutated by another consumer.
  86. clonedPodVolume := podVolume.DeepCopy()
  87. origspec := volume.NewSpecFromVolume(clonedPodVolume)
  88. spec, err := translateInTreeSpecToCSIIfNeeded(origspec, nodeName, vpm, csiMigratedPluginManager, csiTranslator)
  89. if err != nil {
  90. return nil, fmt.Errorf(
  91. "error performing CSI migration checks and translation for inline volume %q: %v",
  92. podVolume.Name,
  93. err)
  94. }
  95. return spec, nil
  96. }
  97. // getPVCFromCacheExtractPV fetches the PVC object with the given namespace and
  98. // name from the shared internal PVC store extracts the name of the PV it is
  99. // pointing to and returns it.
  100. // This method returns an error if a PVC object does not exist in the cache
  101. // with the given namespace/name.
  102. // This method returns an error if the PVC object's phase is not "Bound".
  103. func getPVCFromCacheExtractPV(namespace string, name string, pvcLister corelisters.PersistentVolumeClaimLister) (string, types.UID, error) {
  104. pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(name)
  105. if err != nil {
  106. return "", "", fmt.Errorf("failed to find PVC %s/%s in PVCInformer cache: %v", namespace, name, err)
  107. }
  108. if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
  109. return "", "", fmt.Errorf(
  110. "PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
  111. namespace,
  112. name,
  113. pvc.Status.Phase,
  114. pvc.Spec.VolumeName)
  115. }
  116. return pvc.Spec.VolumeName, pvc.UID, nil
  117. }
  118. // getPVSpecFromCache fetches the PV object with the given name from the shared
  119. // internal PV store and returns a volume.Spec representing it.
  120. // This method returns an error if a PV object does not exist in the cache with
  121. // the given name.
  122. // This method deep copies the PV object so the caller may use the returned
  123. // volume.Spec object without worrying about it mutating unexpectedly.
  124. func getPVSpecFromCache(name string, pvcReadOnly bool, expectedClaimUID types.UID, pvLister corelisters.PersistentVolumeLister) (*volume.Spec, error) {
  125. pv, err := pvLister.Get(name)
  126. if err != nil {
  127. return nil, fmt.Errorf("failed to find PV %q in PVInformer cache: %v", name, err)
  128. }
  129. if pv.Spec.ClaimRef == nil {
  130. return nil, fmt.Errorf(
  131. "found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim",
  132. name)
  133. }
  134. if pv.Spec.ClaimRef.UID != expectedClaimUID {
  135. return nil, fmt.Errorf(
  136. "found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)",
  137. name,
  138. pv.Spec.ClaimRef.UID,
  139. expectedClaimUID)
  140. }
  141. // Do not return the object from the informer, since the store is shared it
  142. // may be mutated by another consumer.
  143. clonedPV := pv.DeepCopy()
  144. return volume.NewSpecFromPersistentVolume(clonedPV, pvcReadOnly), nil
  145. }
  146. // DetermineVolumeAction returns true if volume and pod needs to be added to dswp
  147. // and it returns false if volume and pod needs to be removed from dswp
  148. func DetermineVolumeAction(pod *v1.Pod, desiredStateOfWorld cache.DesiredStateOfWorld, defaultAction bool) bool {
  149. if pod == nil || len(pod.Spec.Volumes) <= 0 {
  150. return defaultAction
  151. }
  152. nodeName := types.NodeName(pod.Spec.NodeName)
  153. keepTerminatedPodVolume := desiredStateOfWorld.GetKeepTerminatedPodVolumesForNode(nodeName)
  154. if util.IsPodTerminated(pod, pod.Status) {
  155. // if pod is terminate we let kubelet policy dictate if volume
  156. // should be detached or not
  157. return keepTerminatedPodVolume
  158. }
  159. return defaultAction
  160. }
  161. // ProcessPodVolumes processes the volumes in the given pod and adds them to the
  162. // desired state of the world if addVolumes is true, otherwise it removes them.
  163. func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.DesiredStateOfWorld, volumePluginMgr *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) {
  164. if pod == nil {
  165. return
  166. }
  167. if len(pod.Spec.Volumes) <= 0 {
  168. klog.V(10).Infof("Skipping processing of pod %q/%q: it has no volumes.",
  169. pod.Namespace,
  170. pod.Name)
  171. return
  172. }
  173. nodeName := types.NodeName(pod.Spec.NodeName)
  174. if nodeName == "" {
  175. klog.V(10).Infof(
  176. "Skipping processing of pod %q/%q: it is not scheduled to a node.",
  177. pod.Namespace,
  178. pod.Name)
  179. return
  180. } else if !desiredStateOfWorld.NodeExists(nodeName) {
  181. // If the node the pod is scheduled to does not exist in the desired
  182. // state of the world data structure, that indicates the node is not
  183. // yet managed by the controller. Therefore, ignore the pod.
  184. klog.V(4).Infof(
  185. "Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.",
  186. pod.Namespace,
  187. pod.Name,
  188. nodeName)
  189. return
  190. }
  191. // Process volume spec for each volume defined in pod
  192. for _, podVolume := range pod.Spec.Volumes {
  193. volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, nodeName, volumePluginMgr, pvcLister, pvLister, csiMigratedPluginManager, csiTranslator)
  194. if err != nil {
  195. klog.V(10).Infof(
  196. "Error processing volume %q for pod %q/%q: %v",
  197. podVolume.Name,
  198. pod.Namespace,
  199. pod.Name,
  200. err)
  201. continue
  202. }
  203. attachableVolumePlugin, err :=
  204. volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
  205. if err != nil || attachableVolumePlugin == nil {
  206. klog.V(10).Infof(
  207. "Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
  208. podVolume.Name,
  209. pod.Namespace,
  210. pod.Name,
  211. err)
  212. continue
  213. }
  214. uniquePodName := util.GetUniquePodName(pod)
  215. if addVolumes {
  216. // Add volume to desired state of world
  217. _, err := desiredStateOfWorld.AddPod(
  218. uniquePodName, pod, volumeSpec, nodeName)
  219. if err != nil {
  220. klog.V(10).Infof(
  221. "Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v",
  222. podVolume.Name,
  223. pod.Namespace,
  224. pod.Name,
  225. err)
  226. }
  227. } else {
  228. // Remove volume from desired state of world
  229. uniqueVolumeName, err := util.GetUniqueVolumeNameFromSpec(
  230. attachableVolumePlugin, volumeSpec)
  231. if err != nil {
  232. klog.V(10).Infof(
  233. "Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v",
  234. podVolume.Name,
  235. pod.Namespace,
  236. pod.Name,
  237. err)
  238. continue
  239. }
  240. desiredStateOfWorld.DeletePod(
  241. uniquePodName, uniqueVolumeName, nodeName)
  242. }
  243. }
  244. return
  245. }
  246. func translateInTreeSpecToCSIIfNeeded(spec *volume.Spec, nodeName types.NodeName, vpm *volume.VolumePluginMgr, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) (*volume.Spec, error) {
  247. translatedSpec := spec
  248. migratable, err := csiMigratedPluginManager.IsMigratable(spec)
  249. if err != nil {
  250. return nil, err
  251. }
  252. if !migratable {
  253. // Jump out of translation fast so we don't check the node if the spec itself is not migratable
  254. return spec, nil
  255. }
  256. migrationSupportedOnNode, err := isCSIMigrationSupportedOnNode(nodeName, spec, vpm, csiMigratedPluginManager)
  257. if err != nil {
  258. return nil, err
  259. }
  260. if migratable && migrationSupportedOnNode {
  261. translatedSpec, err = csimigration.TranslateInTreeSpecToCSI(spec, csiTranslator)
  262. if err != nil {
  263. return nil, err
  264. }
  265. }
  266. return translatedSpec, nil
  267. }
  268. func isCSIMigrationSupportedOnNode(nodeName types.NodeName, spec *volume.Spec, vpm *volume.VolumePluginMgr, csiMigratedPluginManager csimigration.PluginManager) (bool, error) {
  269. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) ||
  270. !utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  271. // If CSIMigration is disabled, CSI migration paths will not be taken for
  272. // the node. If CSINodeInfo is disabled, checking of installation status
  273. // of a migrated CSI plugin cannot be performed. Therefore stick to
  274. // in-tree plugins.
  275. return false, nil
  276. }
  277. pluginName, err := csiMigratedPluginManager.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume)
  278. if err != nil {
  279. return false, err
  280. }
  281. if len(pluginName) == 0 {
  282. // Could not find a plugin name from translation directory, assume not translated
  283. return false, nil
  284. }
  285. if csiMigratedPluginManager.IsMigrationCompleteForPlugin(pluginName) {
  286. // All nodes are expected to have migrated CSI plugin installed and
  287. // configured when CSI Migration Complete flag is enabled for a plugin.
  288. // CSI migration is supported even if there is version skew between
  289. // managers and node.
  290. return true, nil
  291. }
  292. if len(nodeName) == 0 {
  293. return false, errors.New("nodeName is empty")
  294. }
  295. kubeClient := vpm.Host.GetKubeClient()
  296. if kubeClient == nil {
  297. // Don't handle the controller/kubelet version skew check and fallback
  298. // to just checking the feature gates. This can happen if
  299. // we are in a standalone (headless) Kubelet
  300. return true, nil
  301. }
  302. adcHost, ok := vpm.Host.(volume.AttachDetachVolumeHost)
  303. if !ok {
  304. // Don't handle the controller/kubelet version skew check and fallback
  305. // to just checking the feature gates. This can happen if
  306. // "enableControllerAttachDetach" is set to true on kubelet
  307. return true, nil
  308. }
  309. if adcHost.CSINodeLister() == nil {
  310. return false, errors.New("could not find CSINodeLister in attachDetachController")
  311. }
  312. csiNode, err := adcHost.CSINodeLister().Get(string(nodeName))
  313. if err != nil {
  314. return false, err
  315. }
  316. ann := csiNode.GetAnnotations()
  317. if ann == nil {
  318. return false, nil
  319. }
  320. mpa := ann[v1.MigratedPluginsAnnotationKey]
  321. tok := strings.Split(mpa, ",")
  322. mpaSet := sets.NewString(tok...)
  323. isMigratedOnNode := mpaSet.Has(pluginName)
  324. if isMigratedOnNode {
  325. installed := false
  326. driverName, err := csiMigratedPluginManager.GetCSINameFromInTreeName(pluginName)
  327. if err != nil {
  328. return isMigratedOnNode, err
  329. }
  330. for _, driver := range csiNode.Spec.Drivers {
  331. if driver.Name == driverName {
  332. installed = true
  333. break
  334. }
  335. }
  336. if !installed {
  337. return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName)
  338. }
  339. }
  340. return isMigratedOnNode, nil
  341. }