csi_volume_predicate.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package predicates
  14. import (
  15. "fmt"
  16. "k8s.io/api/core/v1"
  17. "k8s.io/apimachinery/pkg/util/rand"
  18. utilfeature "k8s.io/apiserver/pkg/util/feature"
  19. "k8s.io/klog"
  20. "k8s.io/kubernetes/pkg/features"
  21. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  22. volumeutil "k8s.io/kubernetes/pkg/volume/util"
  23. )
  24. // CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes
  25. type CSIMaxVolumeLimitChecker struct {
  26. pvInfo PersistentVolumeInfo
  27. pvcInfo PersistentVolumeClaimInfo
  28. scInfo StorageClassInfo
  29. randomVolumeIDPrefix string
  30. }
  31. // NewCSIMaxVolumeLimitPredicate returns a predicate for counting CSI volumes
  32. func NewCSIMaxVolumeLimitPredicate(
  33. pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, scInfo StorageClassInfo) FitPredicate {
  34. c := &CSIMaxVolumeLimitChecker{
  35. pvInfo: pvInfo,
  36. pvcInfo: pvcInfo,
  37. scInfo: scInfo,
  38. randomVolumeIDPrefix: rand.String(32),
  39. }
  40. return c.attachableLimitPredicate
  41. }
  42. func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
  43. pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  44. // if feature gate is disable we return
  45. if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
  46. return true, nil, nil
  47. }
  48. // If a pod doesn't have any volume attached to it, the predicate will always be true.
  49. // Thus we make a fast path for it, to avoid unnecessary computations in this case.
  50. if len(pod.Spec.Volumes) == 0 {
  51. return true, nil, nil
  52. }
  53. nodeVolumeLimits := nodeInfo.VolumeLimits()
  54. // if node does not have volume limits this predicate should exit
  55. if len(nodeVolumeLimits) == 0 {
  56. return true, nil, nil
  57. }
  58. // a map of unique volume name/csi volume handle and volume limit key
  59. newVolumes := make(map[string]string)
  60. if err := c.filterAttachableVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
  61. return false, nil, err
  62. }
  63. if len(newVolumes) == 0 {
  64. return true, nil, nil
  65. }
  66. // a map of unique volume name/csi volume handle and volume limit key
  67. attachedVolumes := make(map[string]string)
  68. for _, existingPod := range nodeInfo.Pods() {
  69. if err := c.filterAttachableVolumes(existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
  70. return false, nil, err
  71. }
  72. }
  73. newVolumeCount := map[string]int{}
  74. attachedVolumeCount := map[string]int{}
  75. for volumeName, volumeLimitKey := range attachedVolumes {
  76. if _, ok := newVolumes[volumeName]; ok {
  77. delete(newVolumes, volumeName)
  78. }
  79. attachedVolumeCount[volumeLimitKey]++
  80. }
  81. for _, volumeLimitKey := range newVolumes {
  82. newVolumeCount[volumeLimitKey]++
  83. }
  84. for volumeLimitKey, count := range newVolumeCount {
  85. maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
  86. if ok {
  87. currentVolumeCount := attachedVolumeCount[volumeLimitKey]
  88. if currentVolumeCount+count > int(maxVolumeLimit) {
  89. return false, []PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
  90. }
  91. }
  92. }
  93. return true, nil, nil
  94. }
  95. func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes(
  96. volumes []v1.Volume, namespace string, result map[string]string) error {
  97. for _, vol := range volumes {
  98. // CSI volumes can only be used as persistent volumes
  99. if vol.PersistentVolumeClaim == nil {
  100. continue
  101. }
  102. pvcName := vol.PersistentVolumeClaim.ClaimName
  103. if pvcName == "" {
  104. return fmt.Errorf("PersistentVolumeClaim had no name")
  105. }
  106. pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
  107. if err != nil {
  108. klog.V(4).Infof("Unable to look up PVC info for %s/%s", namespace, pvcName)
  109. continue
  110. }
  111. driverName, volumeHandle := c.getCSIDriver(pvc)
  112. // if we can't find driver name or volume handle - we don't count this volume.
  113. if driverName == "" || volumeHandle == "" {
  114. continue
  115. }
  116. volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
  117. result[volumeHandle] = volumeLimitKey
  118. }
  119. return nil
  120. }
  121. func (c *CSIMaxVolumeLimitChecker) getCSIDriver(pvc *v1.PersistentVolumeClaim) (string, string) {
  122. pvName := pvc.Spec.VolumeName
  123. namespace := pvc.Namespace
  124. pvcName := pvc.Name
  125. placeHolderCSIDriver := ""
  126. placeHolderHandle := ""
  127. if pvName == "" {
  128. klog.V(5).Infof("Persistent volume had no name for claim %s/%s", namespace, pvcName)
  129. return c.getDriverNameFromSC(pvc)
  130. }
  131. pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
  132. if err != nil {
  133. klog.V(4).Infof("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName)
  134. // If we can't fetch PV associated with PVC, may be it got deleted
  135. // or PVC was prebound to a PVC that hasn't been created yet.
  136. // fallback to using StorageClass for volume counting
  137. return c.getDriverNameFromSC(pvc)
  138. }
  139. csiSource := pv.Spec.PersistentVolumeSource.CSI
  140. if csiSource == nil {
  141. klog.V(5).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName)
  142. return placeHolderCSIDriver, placeHolderHandle
  143. }
  144. return csiSource.Driver, csiSource.VolumeHandle
  145. }
  146. func (c *CSIMaxVolumeLimitChecker) getDriverNameFromSC(pvc *v1.PersistentVolumeClaim) (string, string) {
  147. namespace := pvc.Namespace
  148. pvcName := pvc.Name
  149. scName := pvc.Spec.StorageClassName
  150. placeHolderCSIDriver := ""
  151. placeHolderHandle := ""
  152. if scName == nil {
  153. // if StorageClass is not set or found, then PVC must be using immediate binding mode
  154. // and hence it must be bound before scheduling. So it is safe to not count it.
  155. klog.V(5).Infof("pvc %s/%s has no storageClass", namespace, pvcName)
  156. return placeHolderCSIDriver, placeHolderHandle
  157. }
  158. storageClass, err := c.scInfo.GetStorageClassInfo(*scName)
  159. if err != nil {
  160. klog.V(5).Infof("no storage %s found for pvc %s/%s", *scName, namespace, pvcName)
  161. return placeHolderCSIDriver, placeHolderHandle
  162. }
  163. // We use random prefix to avoid conflict with volume-ids. If PVC is bound in the middle
  164. // predicate and there is another pod(on same node) that uses same volume then we will overcount
  165. // the volume and consider both volumes as different.
  166. volumeHandle := fmt.Sprintf("%s-%s/%s", c.randomVolumeIDPrefix, namespace, pvcName)
  167. return storageClass.Provisioner, volumeHandle
  168. }