123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- package predicates
- import (
- "fmt"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/util/rand"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/features"
- schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- volumeutil "k8s.io/kubernetes/pkg/volume/util"
- )
- type CSIMaxVolumeLimitChecker struct {
- pvInfo PersistentVolumeInfo
- pvcInfo PersistentVolumeClaimInfo
- scInfo StorageClassInfo
- randomVolumeIDPrefix string
- }
- func NewCSIMaxVolumeLimitPredicate(
- pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, scInfo StorageClassInfo) FitPredicate {
- c := &CSIMaxVolumeLimitChecker{
- pvInfo: pvInfo,
- pvcInfo: pvcInfo,
- scInfo: scInfo,
- randomVolumeIDPrefix: rand.String(32),
- }
- return c.attachableLimitPredicate
- }
- func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
- pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
-
- if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
- return true, nil, nil
- }
-
-
- if len(pod.Spec.Volumes) == 0 {
- return true, nil, nil
- }
- nodeVolumeLimits := nodeInfo.VolumeLimits()
-
- if len(nodeVolumeLimits) == 0 {
- return true, nil, nil
- }
-
- newVolumes := make(map[string]string)
- if err := c.filterAttachableVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
- return false, nil, err
- }
- if len(newVolumes) == 0 {
- return true, nil, nil
- }
-
- attachedVolumes := make(map[string]string)
- for _, existingPod := range nodeInfo.Pods() {
- if err := c.filterAttachableVolumes(existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
- return false, nil, err
- }
- }
- newVolumeCount := map[string]int{}
- attachedVolumeCount := map[string]int{}
- for volumeName, volumeLimitKey := range attachedVolumes {
- if _, ok := newVolumes[volumeName]; ok {
- delete(newVolumes, volumeName)
- }
- attachedVolumeCount[volumeLimitKey]++
- }
- for _, volumeLimitKey := range newVolumes {
- newVolumeCount[volumeLimitKey]++
- }
- for volumeLimitKey, count := range newVolumeCount {
- maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
- if ok {
- currentVolumeCount := attachedVolumeCount[volumeLimitKey]
- if currentVolumeCount+count > int(maxVolumeLimit) {
- return false, []PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
- }
- }
- }
- return true, nil, nil
- }
- func (c *CSIMaxVolumeLimitChecker) filterAttachableVolumes(
- volumes []v1.Volume, namespace string, result map[string]string) error {
- for _, vol := range volumes {
-
- if vol.PersistentVolumeClaim == nil {
- continue
- }
- pvcName := vol.PersistentVolumeClaim.ClaimName
- if pvcName == "" {
- return fmt.Errorf("PersistentVolumeClaim had no name")
- }
- pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
- if err != nil {
- klog.V(4).Infof("Unable to look up PVC info for %s/%s", namespace, pvcName)
- continue
- }
- driverName, volumeHandle := c.getCSIDriver(pvc)
-
- if driverName == "" || volumeHandle == "" {
- continue
- }
- volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
- result[volumeHandle] = volumeLimitKey
- }
- return nil
- }
- func (c *CSIMaxVolumeLimitChecker) getCSIDriver(pvc *v1.PersistentVolumeClaim) (string, string) {
- pvName := pvc.Spec.VolumeName
- namespace := pvc.Namespace
- pvcName := pvc.Name
- placeHolderCSIDriver := ""
- placeHolderHandle := ""
- if pvName == "" {
- klog.V(5).Infof("Persistent volume had no name for claim %s/%s", namespace, pvcName)
- return c.getDriverNameFromSC(pvc)
- }
- pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
- if err != nil {
- klog.V(4).Infof("Unable to look up PV info for PVC %s/%s and PV %s", namespace, pvcName, pvName)
-
-
-
- return c.getDriverNameFromSC(pvc)
- }
- csiSource := pv.Spec.PersistentVolumeSource.CSI
- if csiSource == nil {
- klog.V(5).Infof("Not considering non-CSI volume %s/%s", namespace, pvcName)
- return placeHolderCSIDriver, placeHolderHandle
- }
- return csiSource.Driver, csiSource.VolumeHandle
- }
- func (c *CSIMaxVolumeLimitChecker) getDriverNameFromSC(pvc *v1.PersistentVolumeClaim) (string, string) {
- namespace := pvc.Namespace
- pvcName := pvc.Name
- scName := pvc.Spec.StorageClassName
- placeHolderCSIDriver := ""
- placeHolderHandle := ""
- if scName == nil {
-
-
- klog.V(5).Infof("pvc %s/%s has no storageClass", namespace, pvcName)
- return placeHolderCSIDriver, placeHolderHandle
- }
- storageClass, err := c.scInfo.GetStorageClassInfo(*scName)
- if err != nil {
- klog.V(5).Infof("no storage %s found for pvc %s/%s", *scName, namespace, pvcName)
- return placeHolderCSIDriver, placeHolderHandle
- }
-
-
-
- volumeHandle := fmt.Sprintf("%s-%s/%s", c.randomVolumeIDPrefix, namespace, pvcName)
- return storageClass.Provisioner, volumeHandle
- }
|