csi.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodevolumelimits

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/rand"
	corelisters "k8s.io/client-go/listers/core/v1"
	storagelisters "k8s.io/client-go/listers/storage/v1"
	csitrans "k8s.io/csi-translation-lib"
	"k8s.io/klog"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
// InTreeToCSITranslator contains methods required to check migratable status
// and perform translations from in-tree PVs to CSI.
type InTreeToCSITranslator interface {
	IsPVMigratable(pv *v1.PersistentVolume) bool
	IsMigratableIntreePluginByName(inTreePluginName string) bool
	GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
	GetCSINameFromInTreeName(pluginName string) (string, error)
	TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
}
// CSILimits is a plugin that checks node volume limits.
type CSILimits struct {
	csiNodeLister storagelisters.CSINodeLister
	pvLister      corelisters.PersistentVolumeLister
	pvcLister     corelisters.PersistentVolumeClaimLister
	scLister      storagelisters.StorageClassLister

	randomVolumeIDPrefix string

	translator InTreeToCSITranslator
}
var _ framework.FilterPlugin = &CSILimits{}

// CSIName is the name of the plugin used in the plugin registry and configurations.
const CSIName = "NodeVolumeLimits"

// Name returns name of the plugin. It is used in logs, etc.
func (pl *CSILimits) Name() string {
	return CSIName
}
// Filter invoked at the filter extension point.
func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
	// If the new pod doesn't have any volume attached to it, the predicate will always be true
	if len(pod.Spec.Volumes) == 0 {
		return nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return framework.NewStatus(framework.Error, "node not found")
	}
	// If CSINode doesn't exist, the predicate may read the limits from Node object
	csiNode, err := pl.csiNodeLister.Get(node.Name)
	if err != nil {
		// TODO: return the error once CSINode is created by default (2 releases)
		klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
	}

	newVolumes := make(map[string]string)
	if err := pl.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}

	// If the pod doesn't have any new CSI volumes, the predicate will always be true
	if len(newVolumes) == 0 {
		return nil
	}

	// If the node doesn't have volume limits, the predicate will always be true
	nodeVolumeLimits := getVolumeLimits(nodeInfo, csiNode)
	if len(nodeVolumeLimits) == 0 {
		return nil
	}

	attachedVolumes := make(map[string]string)
	for _, existingPod := range nodeInfo.Pods() {
		if err := pl.filterAttachableVolumes(csiNode, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
			return framework.NewStatus(framework.Error, err.Error())
		}
	}

	attachedVolumeCount := map[string]int{}
	for volumeUniqueName, volumeLimitKey := range attachedVolumes {
		if _, ok := newVolumes[volumeUniqueName]; ok {
			// Don't count single volume used in multiple pods more than once
			delete(newVolumes, volumeUniqueName)
		}
		attachedVolumeCount[volumeLimitKey]++
	}
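
	// Count the volumes the new pod would add, grouped by their per-driver limit key.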
	newVolumeCount := map[string]int{}
	for _, volumeLimitKey := range newVolumes {
		newVolumeCount[volumeLimitKey]++
	}
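
	// For each limit key, reject the pod if the volumes already attached to the node
	// plus the pod's new volumes would exceed the node's reported limit.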
	for volumeLimitKey, count := range newVolumeCount {
		maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
		if ok {
			currentVolumeCount := attachedVolumeCount[volumeLimitKey]
			if currentVolumeCount+count > int(maxVolumeLimit) {
				return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
			}
		}
	}

	return nil
}
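
// filterAttachableVolumes resolves every PVC-backed volume in the given list to a CSI driver
// name and volume handle, and records it in result keyed by the volume's unique name, with the
// driver's attach-limit key as the value. Volumes that cannot be resolved to a CSI driver are
// skipped and not counted.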
func (pl *CSILimits) filterAttachableVolumes(
	csiNode *storagev1.CSINode, volumes []v1.Volume, namespace string, result map[string]string) error {
	for _, vol := range volumes {
		// CSI volumes can only be used as persistent volumes
		if vol.PersistentVolumeClaim == nil {
			continue
		}
		pvcName := vol.PersistentVolumeClaim.ClaimName

		if pvcName == "" {
			return fmt.Errorf("PersistentVolumeClaim had no name")
		}

		pvc, err := pl.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
		if err != nil {
			klog.V(5).Infof("Unable to look up PVC info for %s/%s", namespace, pvcName)
			continue
		}

		driverName, volumeHandle := pl.getCSIDriverInfo(csiNode, pvc)
		if driverName == "" || volumeHandle == "" {
			klog.V(5).Infof("Could not find a CSI driver name or volume handle, not counting volume")
			continue
		}

		volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
		volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
		result[volumeUniqueName] = volumeLimitKey
	}
	return nil
}
// getCSIDriverInfo returns the CSI driver name and volume ID of a given PVC.
// If the PVC is from a migrated in-tree plugin, this function will return
// the information of the CSI driver that the plugin has been migrated to.
func (pl *CSILimits) getCSIDriverInfo(csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) {
	pvName := pvc.Spec.VolumeName
	namespace := pvc.Namespace
	pvcName := pvc.Name

	if pvName == "" {
		klog.V(5).Infof("Persistent volume had no name for claim %s/%s", namespace, pvcName)
		return pl.getCSIDriverInfoFromSC(csiNode, pvc)
	}

	pv, err := pl.pvLister.Get(pvName)
	if err != nil {
		klog.V(5).Infof("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName)
		// If we can't fetch the PV associated with the PVC, maybe it got deleted
		// or the PVC was prebound to a PV that hasn't been created yet.
		// Fall back to using the StorageClass for volume counting.
		return pl.getCSIDriverInfoFromSC(csiNode, pvc)
	}
	csiSource := pv.Spec.PersistentVolumeSource.CSI
	if csiSource == nil {
		// We make a fast path for non-CSI volumes that aren't migratable
		if !pl.translator.IsPVMigratable(pv) {
			return "", ""
		}

		pluginName, err := pl.translator.GetInTreePluginNameFromSpec(pv, nil)
		if err != nil {
			klog.V(5).Infof("Unable to look up plugin name from PV spec: %v", err)
			return "", ""
		}

		if !isCSIMigrationOn(csiNode, pluginName) {
			klog.V(5).Infof("CSI Migration of plugin %s is not enabled", pluginName)
			return "", ""
		}
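
		// Translate the in-tree PV to its CSI counterpart so the volume is counted
		// against the corresponding CSI driver's limit.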
		csiPV, err := pl.translator.TranslateInTreePVToCSI(pv)
		if err != nil {
			klog.V(5).Infof("Unable to translate in-tree volume to CSI: %v", err)
			return "", ""
		}

		if csiPV.Spec.PersistentVolumeSource.CSI == nil {
			klog.V(5).Infof("Unable to get a valid volume source for translated PV %s", pvName)
			return "", ""
		}

		csiSource = csiPV.Spec.PersistentVolumeSource.CSI
	}

	return csiSource.Driver, csiSource.VolumeHandle
}
// getCSIDriverInfoFromSC returns the CSI driver name and a random volume ID of a given PVC's StorageClass.
func (pl *CSILimits) getCSIDriverInfoFromSC(csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) {
	namespace := pvc.Namespace
	pvcName := pvc.Name
	scName := v1helper.GetPersistentVolumeClaimClass(pvc)

	// If StorageClass is not set or not found, then the PVC must be using immediate binding mode
	// and hence it must be bound before scheduling. So it is safe to not count it.
	if scName == "" {
		klog.V(5).Infof("PVC %s/%s has no StorageClass", namespace, pvcName)
		return "", ""
	}

	storageClass, err := pl.scLister.Get(scName)
	if err != nil {
		klog.V(5).Infof("Could not get StorageClass for PVC %s/%s: %v", namespace, pvcName, err)
		return "", ""
	}

	// We use a random prefix to avoid conflict with volume IDs. If the PVC is bound during the
	// execution of the predicate and there is another pod on the same node that uses the same
	// volume, then we will overcount the volume and consider both volumes as different.
	volumeHandle := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, namespace, pvcName)

	provisioner := storageClass.Provisioner
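
	// If the provisioner maps to a migratable in-tree plugin, count the volume against the
	// corresponding CSI driver, but only when migration is enabled for that plugin on this node.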
	if pl.translator.IsMigratableIntreePluginByName(provisioner) {
		if !isCSIMigrationOn(csiNode, provisioner) {
			klog.V(5).Infof("CSI Migration of plugin %s is not enabled", provisioner)
			return "", ""
		}

		driverName, err := pl.translator.GetCSINameFromInTreeName(provisioner)
		if err != nil {
			klog.V(5).Infof("Unable to look up driver name from plugin name: %v", err)
			return "", ""
		}
		return driverName, volumeHandle
	}

	return provisioner, volumeHandle
}
// NewCSI initializes a new plugin and returns it.
func NewCSI(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
	informerFactory := handle.SharedInformerFactory()
	pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
	pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
	scLister := informerFactory.Storage().V1().StorageClasses().Lister()

	return &CSILimits{
		csiNodeLister:        getCSINodeListerIfEnabled(informerFactory),
		pvLister:             pvLister,
		pvcLister:            pvcLister,
		scLister:             scLister,
		randomVolumeIDPrefix: rand.String(32),
		translator:           csitrans.New(),
	}, nil
}
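
// getVolumeLimits returns the node's attachable volume limits keyed by the per-driver
// attach-limit resource name. Limits reported on the Node object are used as the base and
// are overridden by the allocatable counts from the CSINode object, when present.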
func getVolumeLimits(nodeInfo *schedulernodeinfo.NodeInfo, csiNode *storagev1.CSINode) map[v1.ResourceName]int64 {
	// TODO: stop getting values from Node object in v1.18
	nodeVolumeLimits := nodeInfo.VolumeLimits()
	if csiNode != nil {
		for i := range csiNode.Spec.Drivers {
			d := csiNode.Spec.Drivers[i]
			if d.Allocatable != nil && d.Allocatable.Count != nil {
				// TODO: drop GetCSIAttachLimitKey once we don't get values from Node object (v1.18)
				k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
				nodeVolumeLimits[k] = int64(*d.Allocatable.Count)
			}
		}
	}
	return nodeVolumeLimits
}