predicates.go 67 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package predicates
  14. import (
  15. "errors"
  16. "fmt"
  17. "os"
  18. "regexp"
  19. "strconv"
  20. "k8s.io/klog"
  21. "k8s.io/api/core/v1"
  22. storagev1 "k8s.io/api/storage/v1"
  23. apierrors "k8s.io/apimachinery/pkg/api/errors"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/fields"
  26. "k8s.io/apimachinery/pkg/labels"
  27. "k8s.io/apimachinery/pkg/util/rand"
  28. "k8s.io/apimachinery/pkg/util/sets"
  29. utilfeature "k8s.io/apiserver/pkg/util/feature"
  30. corelisters "k8s.io/client-go/listers/core/v1"
  31. storagelisters "k8s.io/client-go/listers/storage/v1"
  32. volumehelpers "k8s.io/cloud-provider/volume/helpers"
  33. v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  34. v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
  35. "k8s.io/kubernetes/pkg/features"
  36. "k8s.io/kubernetes/pkg/scheduler/algorithm"
  37. priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
  38. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  39. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  40. schedutil "k8s.io/kubernetes/pkg/scheduler/util"
  41. "k8s.io/kubernetes/pkg/scheduler/volumebinder"
  42. volumeutil "k8s.io/kubernetes/pkg/volume/util"
  43. )
// Predicate names under which the predicates of this package are registered,
// followed by default volume limits and the volume filter type names accepted
// by NewMaxPDVolumeCountPredicate.
const (
	// MatchInterPodAffinityPred defines the name of predicate MatchInterPodAffinity.
	MatchInterPodAffinityPred = "MatchInterPodAffinity"
	// CheckVolumeBindingPred defines the name of predicate CheckVolumeBinding.
	CheckVolumeBindingPred = "CheckVolumeBinding"
	// CheckNodeConditionPred defines the name of predicate CheckNodeCondition.
	CheckNodeConditionPred = "CheckNodeCondition"
	// GeneralPred defines the name of predicate GeneralPredicates.
	GeneralPred = "GeneralPredicates"
	// HostNamePred defines the name of predicate HostName.
	HostNamePred = "HostName"
	// PodFitsHostPortsPred defines the name of predicate PodFitsHostPorts.
	PodFitsHostPortsPred = "PodFitsHostPorts"
	// MatchNodeSelectorPred defines the name of predicate MatchNodeSelector.
	MatchNodeSelectorPred = "MatchNodeSelector"
	// PodFitsResourcesPred defines the name of predicate PodFitsResources.
	PodFitsResourcesPred = "PodFitsResources"
	// NoDiskConflictPred defines the name of predicate NoDiskConflict.
	NoDiskConflictPred = "NoDiskConflict"
	// PodToleratesNodeTaintsPred defines the name of predicate PodToleratesNodeTaints.
	PodToleratesNodeTaintsPred = "PodToleratesNodeTaints"
	// CheckNodeUnschedulablePred defines the name of predicate CheckNodeUnschedulable.
	CheckNodeUnschedulablePred = "CheckNodeUnschedulable"
	// PodToleratesNodeNoExecuteTaintsPred defines the name of predicate PodToleratesNodeNoExecuteTaints.
	PodToleratesNodeNoExecuteTaintsPred = "PodToleratesNodeNoExecuteTaints"
	// CheckNodeLabelPresencePred defines the name of predicate CheckNodeLabelPresence.
	CheckNodeLabelPresencePred = "CheckNodeLabelPresence"
	// CheckServiceAffinityPred defines the name of predicate CheckServiceAffinity.
	CheckServiceAffinityPred = "CheckServiceAffinity"
	// MaxEBSVolumeCountPred defines the name of predicate MaxEBSVolumeCount.
	//
	// Deprecated: all cloudprovider specific predicates are deprecated in favour of MaxCSIVolumeCountPred.
	MaxEBSVolumeCountPred = "MaxEBSVolumeCount"
	// MaxGCEPDVolumeCountPred defines the name of predicate MaxGCEPDVolumeCount.
	//
	// Deprecated: all cloudprovider specific predicates are deprecated in favour of MaxCSIVolumeCountPred.
	MaxGCEPDVolumeCountPred = "MaxGCEPDVolumeCount"
	// MaxAzureDiskVolumeCountPred defines the name of predicate MaxAzureDiskVolumeCount.
	//
	// Deprecated: all cloudprovider specific predicates are deprecated in favour of MaxCSIVolumeCountPred.
	MaxAzureDiskVolumeCountPred = "MaxAzureDiskVolumeCount"
	// MaxCinderVolumeCountPred defines the name of predicate MaxCinderVolumeCount.
	//
	// Deprecated: all cloudprovider specific predicates are deprecated in favour of MaxCSIVolumeCountPred.
	MaxCinderVolumeCountPred = "MaxCinderVolumeCount"
	// MaxCSIVolumeCountPred defines the predicate that decides how many CSI volumes should be attached.
	// Note that, unlike the other predicate names, the string value itself carries the "Pred" suffix.
	MaxCSIVolumeCountPred = "MaxCSIVolumeCountPred"
	// NoVolumeZoneConflictPred defines the name of predicate NoVolumeZoneConflict.
	NoVolumeZoneConflictPred = "NoVolumeZoneConflict"
	// CheckNodeMemoryPressurePred defines the name of predicate CheckNodeMemoryPressure.
	CheckNodeMemoryPressurePred = "CheckNodeMemoryPressure"
	// CheckNodeDiskPressurePred defines the name of predicate CheckNodeDiskPressure.
	CheckNodeDiskPressurePred = "CheckNodeDiskPressure"
	// CheckNodePIDPressurePred defines the name of predicate CheckNodePIDPressure.
	CheckNodePIDPressurePred = "CheckNodePIDPressure"

	// DefaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE.
	// GCE instances can have up to 16 PD volumes attached.
	DefaultMaxGCEPDVolumes = 16
	// DefaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure.
	// Larger Azure VMs can actually have many more disks attached.
	// TODO We should determine the max based on VM size
	DefaultMaxAzureDiskVolumes = 16

	// KubeMaxPDVols is the name of the environment variable that, when set to a
	// positive integer, overrides the maximum number of PD Volumes per kubelet
	// (it is read by getMaxVolLimitFromEnv).
	KubeMaxPDVols = "KUBE_MAX_PD_VOLS"

	// EBSVolumeFilterType defines the filter name for EBSVolumeFilter.
	EBSVolumeFilterType = "EBS"
	// GCEPDVolumeFilterType defines the filter name for GCEPDVolumeFilter.
	GCEPDVolumeFilterType = "GCE"
	// AzureDiskVolumeFilterType defines the filter name for AzureDiskVolumeFilter.
	AzureDiskVolumeFilterType = "AzureDisk"
	// CinderVolumeFilterType defines the filter name for CinderVolumeFilter.
	CinderVolumeFilterType = "Cinder"
)
// IMPORTANT NOTE for predicate developers:
// We use cached predicate results for pods that belong to the same equivalence class.
// So when updating an existing predicate, consider whether your change introduces a new
// dependency on attributes of any API object such as Pod, Node, Service, etc.
// If it does, you are expected to invalidate the cached predicate result whenever the
// related API object changes.
// For example:
// https://github.com/kubernetes/kubernetes/blob/36a218e/plugin/pkg/scheduler/factory/factory.go#L422
  124. // IMPORTANT NOTE: this list contains the ordering of the predicates, if you develop a new predicate
  125. // it is mandatory to add its name to this list.
  126. // Otherwise it won't be processed, see generic_scheduler#podFitsOnNode().
  127. // The order is based on the restrictiveness & complexity of predicates.
  128. // Design doc: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/predicates-ordering.md
  129. var (
  130. predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
  131. GeneralPred, HostNamePred, PodFitsHostPortsPred,
  132. MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
  133. PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
  134. CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
  135. MaxAzureDiskVolumeCountPred, MaxCinderVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
  136. CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
  137. )
// FitPredicate is a function that indicates if a pod fits into an existing node.
// The boolean result reports whether the pod fits. When it does not, the
// PredicateFailureReason slice describes why. The error is reserved for
// failures to evaluate the predicate itself.
type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error)
// NodeInfo interface represents anything that can get node object from node ID.
type NodeInfo interface {
	// GetNodeInfo returns the node identified by nodeID, or an error if it
	// cannot be retrieved.
	GetNodeInfo(nodeID string) (*v1.Node, error)
}
// PersistentVolumeInfo interface represents anything that can get persistent volume object by PV ID.
type PersistentVolumeInfo interface {
	// GetPersistentVolumeInfo returns the persistent volume identified by
	// pvID, or an error if it cannot be retrieved.
	GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error)
}
// CachedPersistentVolumeInfo implements PersistentVolumeInfo by delegating to
// the embedded PersistentVolumeLister.
type CachedPersistentVolumeInfo struct {
	corelisters.PersistentVolumeLister
}
// Ordering returns the ordering of predicates.
// The returned slice is the package-level predicatesOrdering itself, not a
// copy, so callers should treat it as read-only.
func Ordering() []string {
	return predicatesOrdering
}
  157. // GetPersistentVolumeInfo returns a persistent volume object by PV ID.
  158. func (c *CachedPersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) {
  159. return c.Get(pvID)
  160. }
// PersistentVolumeClaimInfo interface represents anything that can get a PVC object in
// specified namespace with specified name.
type PersistentVolumeClaimInfo interface {
	// GetPersistentVolumeClaimInfo returns the claim called name in
	// namespace, or an error if it cannot be retrieved.
	GetPersistentVolumeClaimInfo(namespace string, name string) (*v1.PersistentVolumeClaim, error)
}
// CachedPersistentVolumeClaimInfo implements PersistentVolumeClaimInfo by
// delegating to the embedded PersistentVolumeClaimLister.
type CachedPersistentVolumeClaimInfo struct {
	corelisters.PersistentVolumeClaimLister
}
  170. // GetPersistentVolumeClaimInfo fetches the claim in specified namespace with specified name
  171. func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace string, name string) (*v1.PersistentVolumeClaim, error) {
  172. return c.PersistentVolumeClaims(namespace).Get(name)
  173. }
// CachedNodeInfo implements NodeInfo by delegating to the embedded NodeLister.
type CachedNodeInfo struct {
	corelisters.NodeLister
}
  178. // GetNodeInfo returns cached data for the node 'id'.
  179. func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
  180. node, err := c.Get(id)
  181. if apierrors.IsNotFound(err) {
  182. return nil, err
  183. }
  184. if err != nil {
  185. return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
  186. }
  187. return node, nil
  188. }
// StorageClassInfo interface represents anything that can get a storage class object by class name.
type StorageClassInfo interface {
	// GetStorageClassInfo returns the storage class called className, or an
	// error if it cannot be retrieved.
	GetStorageClassInfo(className string) (*storagev1.StorageClass, error)
}
// CachedStorageClassInfo implements StorageClassInfo by delegating to the
// embedded StorageClassLister.
type CachedStorageClassInfo struct {
	storagelisters.StorageClassLister
}
  197. // GetStorageClassInfo get StorageClass by class name.
  198. func (c *CachedStorageClassInfo) GetStorageClassInfo(className string) (*storagev1.StorageClass, error) {
  199. return c.Get(className)
  200. }
  201. func isVolumeConflict(volume v1.Volume, pod *v1.Pod) bool {
  202. // fast path if there is no conflict checking targets.
  203. if volume.GCEPersistentDisk == nil && volume.AWSElasticBlockStore == nil && volume.RBD == nil && volume.ISCSI == nil {
  204. return false
  205. }
  206. for _, existingVolume := range pod.Spec.Volumes {
  207. // Same GCE disk mounted by multiple pods conflicts unless all pods mount it read-only.
  208. if volume.GCEPersistentDisk != nil && existingVolume.GCEPersistentDisk != nil {
  209. disk, existingDisk := volume.GCEPersistentDisk, existingVolume.GCEPersistentDisk
  210. if disk.PDName == existingDisk.PDName && !(disk.ReadOnly && existingDisk.ReadOnly) {
  211. return true
  212. }
  213. }
  214. if volume.AWSElasticBlockStore != nil && existingVolume.AWSElasticBlockStore != nil {
  215. if volume.AWSElasticBlockStore.VolumeID == existingVolume.AWSElasticBlockStore.VolumeID {
  216. return true
  217. }
  218. }
  219. if volume.ISCSI != nil && existingVolume.ISCSI != nil {
  220. iqn := volume.ISCSI.IQN
  221. eiqn := existingVolume.ISCSI.IQN
  222. // two ISCSI volumes are same, if they share the same iqn. As iscsi volumes are of type
  223. // RWO or ROX, we could permit only one RW mount. Same iscsi volume mounted by multiple Pods
  224. // conflict unless all other pods mount as read only.
  225. if iqn == eiqn && !(volume.ISCSI.ReadOnly && existingVolume.ISCSI.ReadOnly) {
  226. return true
  227. }
  228. }
  229. if volume.RBD != nil && existingVolume.RBD != nil {
  230. mon, pool, image := volume.RBD.CephMonitors, volume.RBD.RBDPool, volume.RBD.RBDImage
  231. emon, epool, eimage := existingVolume.RBD.CephMonitors, existingVolume.RBD.RBDPool, existingVolume.RBD.RBDImage
  232. // two RBDs images are the same if they share the same Ceph monitor, are in the same RADOS Pool, and have the same image name
  233. // only one read-write mount is permitted for the same RBD image.
  234. // same RBD image mounted by multiple Pods conflicts unless all Pods mount the image read-only
  235. if haveOverlap(mon, emon) && pool == epool && image == eimage && !(volume.RBD.ReadOnly && existingVolume.RBD.ReadOnly) {
  236. return true
  237. }
  238. }
  239. }
  240. return false
  241. }
  242. // NoDiskConflict evaluates if a pod can fit due to the volumes it requests, and those that
  243. // are already mounted. If there is already a volume mounted on that node, another pod that uses the same volume
  244. // can't be scheduled there.
  245. // This is GCE, Amazon EBS, and Ceph RBD specific for now:
  246. // - GCE PD allows multiple mounts as long as they're all read-only
  247. // - AWS EBS forbids any two pods mounting the same volume ID
  248. // - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image.
  249. // - ISCSI forbids if any two pods share at least same IQN, LUN and Target
  250. // TODO: migrate this into some per-volume specific code?
  251. func NoDiskConflict(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  252. for _, v := range pod.Spec.Volumes {
  253. for _, ev := range nodeInfo.Pods() {
  254. if isVolumeConflict(v, ev) {
  255. return false, []PredicateFailureReason{ErrDiskConflict}, nil
  256. }
  257. }
  258. }
  259. return true, nil, nil
  260. }
// MaxPDVolumeCountChecker contains information to check the max number of volumes for a predicate.
type MaxPDVolumeCountChecker struct {
	// filter decides which volumes and persistent volumes count towards the limit.
	filter VolumeFilter
	// volumeLimitKey is the key looked up in the node's volume limits when the
	// AttachVolumeLimit feature gate is enabled.
	volumeLimitKey v1.ResourceName
	// maxVolumeFunc returns the attach limit used for a node unless a limit
	// found under volumeLimitKey replaces it.
	maxVolumeFunc func(node *v1.Node) int
	// pvInfo and pvcInfo resolve a pod's PersistentVolumeClaim volumes to the
	// persistent volumes backing them.
	pvInfo  PersistentVolumeInfo
	pvcInfo PersistentVolumeClaimInfo

	// The string below is generated randomly during the struct's initialization.
	// It is used to prefix volumeID generated inside the predicate() method to
	// avoid conflicts with any real volume.
	randomVolumeIDPrefix string
}
// VolumeFilter contains information on how to filter PD Volumes when checking PD Volume caps
type VolumeFilter struct {
	// Filter normal volumes
	// FilterVolume returns the ID of a volume declared inline in a pod spec
	// and whether that volume is relevant to the filter.
	FilterVolume func(vol *v1.Volume) (id string, relevant bool)
	// FilterPersistentVolume returns the ID of a persistent volume and
	// whether that volume is relevant to the filter.
	FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool)
}
  279. // NewMaxPDVolumeCountPredicate creates a predicate which evaluates whether a pod can fit based on the
  280. // number of volumes which match a filter that it requests, and those that are already present.
  281. //
  282. // DEPRECATED
  283. // All cloudprovider specific predicates defined here are deprecated in favour of CSI volume limit
  284. // predicate - MaxCSIVolumeCountPred.
  285. //
  286. // The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume
  287. // types, counts the number of unique volumes, and rejects the new pod if it would place the total count over
  288. // the maximum.
  289. func NewMaxPDVolumeCountPredicate(
  290. filterName string, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) FitPredicate {
  291. var filter VolumeFilter
  292. var volumeLimitKey v1.ResourceName
  293. switch filterName {
  294. case EBSVolumeFilterType:
  295. filter = EBSVolumeFilter
  296. volumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)
  297. case GCEPDVolumeFilterType:
  298. filter = GCEPDVolumeFilter
  299. volumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)
  300. case AzureDiskVolumeFilterType:
  301. filter = AzureDiskVolumeFilter
  302. volumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)
  303. case CinderVolumeFilterType:
  304. filter = CinderVolumeFilter
  305. volumeLimitKey = v1.ResourceName(volumeutil.CinderVolumeLimitKey)
  306. default:
  307. klog.Fatalf("Wrong filterName, Only Support %v %v %v ", EBSVolumeFilterType,
  308. GCEPDVolumeFilterType, AzureDiskVolumeFilterType)
  309. return nil
  310. }
  311. c := &MaxPDVolumeCountChecker{
  312. filter: filter,
  313. volumeLimitKey: volumeLimitKey,
  314. maxVolumeFunc: getMaxVolumeFunc(filterName),
  315. pvInfo: pvInfo,
  316. pvcInfo: pvcInfo,
  317. randomVolumeIDPrefix: rand.String(32),
  318. }
  319. return c.predicate
  320. }
  321. func getMaxVolumeFunc(filterName string) func(node *v1.Node) int {
  322. return func(node *v1.Node) int {
  323. maxVolumesFromEnv := getMaxVolLimitFromEnv()
  324. if maxVolumesFromEnv > 0 {
  325. return maxVolumesFromEnv
  326. }
  327. var nodeInstanceType string
  328. for k, v := range node.ObjectMeta.Labels {
  329. if k == v1.LabelInstanceType {
  330. nodeInstanceType = v
  331. }
  332. }
  333. switch filterName {
  334. case EBSVolumeFilterType:
  335. return getMaxEBSVolume(nodeInstanceType)
  336. case GCEPDVolumeFilterType:
  337. return DefaultMaxGCEPDVolumes
  338. case AzureDiskVolumeFilterType:
  339. return DefaultMaxAzureDiskVolumes
  340. case CinderVolumeFilterType:
  341. return volumeutil.DefaultMaxCinderVolumes
  342. default:
  343. return -1
  344. }
  345. }
  346. }
  347. func getMaxEBSVolume(nodeInstanceType string) int {
  348. if ok, _ := regexp.MatchString(volumeutil.EBSNitroLimitRegex, nodeInstanceType); ok {
  349. return volumeutil.DefaultMaxEBSNitroVolumeLimit
  350. }
  351. return volumeutil.DefaultMaxEBSVolumes
  352. }
  353. // getMaxVolLimitFromEnv checks the max PD volumes environment variable, otherwise returning a default value
  354. func getMaxVolLimitFromEnv() int {
  355. if rawMaxVols := os.Getenv(KubeMaxPDVols); rawMaxVols != "" {
  356. if parsedMaxVols, err := strconv.Atoi(rawMaxVols); err != nil {
  357. klog.Errorf("Unable to parse maximum PD volumes value, using default: %v", err)
  358. } else if parsedMaxVols <= 0 {
  359. klog.Errorf("Maximum PD volumes must be a positive value, using default ")
  360. } else {
  361. return parsedMaxVols
  362. }
  363. }
  364. return -1
  365. }
  366. func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []v1.Volume, namespace string, filteredVolumes map[string]bool) error {
  367. for i := range volumes {
  368. vol := &volumes[i]
  369. if id, ok := c.filter.FilterVolume(vol); ok {
  370. filteredVolumes[id] = true
  371. } else if vol.PersistentVolumeClaim != nil {
  372. pvcName := vol.PersistentVolumeClaim.ClaimName
  373. if pvcName == "" {
  374. return fmt.Errorf("PersistentVolumeClaim had no name")
  375. }
  376. // Until we know real ID of the volume use namespace/pvcName as substitute
  377. // with a random prefix (calculated and stored inside 'c' during initialization)
  378. // to avoid conflicts with existing volume IDs.
  379. pvID := fmt.Sprintf("%s-%s/%s", c.randomVolumeIDPrefix, namespace, pvcName)
  380. pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
  381. if err != nil || pvc == nil {
  382. // if the PVC is not found, log the error and count the PV towards the PV limit
  383. klog.V(4).Infof("Unable to look up PVC info for %s/%s, assuming PVC matches predicate when counting limits: %v", namespace, pvcName, err)
  384. filteredVolumes[pvID] = true
  385. continue
  386. }
  387. pvName := pvc.Spec.VolumeName
  388. if pvName == "" {
  389. // PVC is not bound. It was either deleted and created again or
  390. // it was forcefully unbound by admin. The pod can still use the
  391. // original PV where it was bound to -> log the error and count
  392. // the PV towards the PV limit
  393. klog.V(4).Infof("PVC %s/%s is not bound, assuming PVC matches predicate when counting limits", namespace, pvcName)
  394. filteredVolumes[pvID] = true
  395. continue
  396. }
  397. pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
  398. if err != nil || pv == nil {
  399. // if the PV is not found, log the error
  400. // and count the PV towards the PV limit
  401. klog.V(4).Infof("Unable to look up PV info for %s/%s/%s, assuming PV matches predicate when counting limits: %v", namespace, pvcName, pvName, err)
  402. filteredVolumes[pvID] = true
  403. continue
  404. }
  405. if id, ok := c.filter.FilterPersistentVolume(pv); ok {
  406. filteredVolumes[id] = true
  407. }
  408. }
  409. }
  410. return nil
  411. }
  412. func (c *MaxPDVolumeCountChecker) predicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  413. // If a pod doesn't have any volume attached to it, the predicate will always be true.
  414. // Thus we make a fast path for it, to avoid unnecessary computations in this case.
  415. if len(pod.Spec.Volumes) == 0 {
  416. return true, nil, nil
  417. }
  418. newVolumes := make(map[string]bool)
  419. if err := c.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
  420. return false, nil, err
  421. }
  422. // quick return
  423. if len(newVolumes) == 0 {
  424. return true, nil, nil
  425. }
  426. // count unique volumes
  427. existingVolumes := make(map[string]bool)
  428. for _, existingPod := range nodeInfo.Pods() {
  429. if err := c.filterVolumes(existingPod.Spec.Volumes, existingPod.Namespace, existingVolumes); err != nil {
  430. return false, nil, err
  431. }
  432. }
  433. numExistingVolumes := len(existingVolumes)
  434. // filter out already-mounted volumes
  435. for k := range existingVolumes {
  436. if _, ok := newVolumes[k]; ok {
  437. delete(newVolumes, k)
  438. }
  439. }
  440. numNewVolumes := len(newVolumes)
  441. maxAttachLimit := c.maxVolumeFunc(nodeInfo.Node())
  442. if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
  443. volumeLimits := nodeInfo.VolumeLimits()
  444. if maxAttachLimitFromAllocatable, ok := volumeLimits[c.volumeLimitKey]; ok {
  445. maxAttachLimit = int(maxAttachLimitFromAllocatable)
  446. }
  447. }
  448. if numExistingVolumes+numNewVolumes > maxAttachLimit {
  449. // violates MaxEBSVolumeCount or MaxGCEPDVolumeCount
  450. return false, []PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
  451. }
  452. if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
  453. nodeInfo.TransientInfo.TransientLock.Lock()
  454. defer nodeInfo.TransientInfo.TransientLock.Unlock()
  455. nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
  456. nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
  457. }
  458. return true, nil, nil
  459. }
  460. // EBSVolumeFilter is a VolumeFilter for filtering AWS ElasticBlockStore Volumes
  461. var EBSVolumeFilter = VolumeFilter{
  462. FilterVolume: func(vol *v1.Volume) (string, bool) {
  463. if vol.AWSElasticBlockStore != nil {
  464. return vol.AWSElasticBlockStore.VolumeID, true
  465. }
  466. return "", false
  467. },
  468. FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
  469. if pv.Spec.AWSElasticBlockStore != nil {
  470. return pv.Spec.AWSElasticBlockStore.VolumeID, true
  471. }
  472. return "", false
  473. },
  474. }
  475. // GCEPDVolumeFilter is a VolumeFilter for filtering GCE PersistentDisk Volumes
  476. var GCEPDVolumeFilter = VolumeFilter{
  477. FilterVolume: func(vol *v1.Volume) (string, bool) {
  478. if vol.GCEPersistentDisk != nil {
  479. return vol.GCEPersistentDisk.PDName, true
  480. }
  481. return "", false
  482. },
  483. FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
  484. if pv.Spec.GCEPersistentDisk != nil {
  485. return pv.Spec.GCEPersistentDisk.PDName, true
  486. }
  487. return "", false
  488. },
  489. }
  490. // AzureDiskVolumeFilter is a VolumeFilter for filtering Azure Disk Volumes
  491. var AzureDiskVolumeFilter = VolumeFilter{
  492. FilterVolume: func(vol *v1.Volume) (string, bool) {
  493. if vol.AzureDisk != nil {
  494. return vol.AzureDisk.DiskName, true
  495. }
  496. return "", false
  497. },
  498. FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
  499. if pv.Spec.AzureDisk != nil {
  500. return pv.Spec.AzureDisk.DiskName, true
  501. }
  502. return "", false
  503. },
  504. }
  505. // CinderVolumeFilter is a VolumeFilter for filtering Cinder Volumes
  506. // It will be deprecated once Openstack cloudprovider has been removed from in-tree.
  507. var CinderVolumeFilter = VolumeFilter{
  508. FilterVolume: func(vol *v1.Volume) (string, bool) {
  509. if vol.Cinder != nil {
  510. return vol.Cinder.VolumeID, true
  511. }
  512. return "", false
  513. },
  514. FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
  515. if pv.Spec.Cinder != nil {
  516. return pv.Spec.Cinder.VolumeID, true
  517. }
  518. return "", false
  519. },
  520. }
// VolumeZoneChecker contains information to check the volume zone for a predicate.
type VolumeZoneChecker struct {
	// pvInfo looks up persistent volumes by ID.
	pvInfo PersistentVolumeInfo
	// pvcInfo looks up persistent volume claims by namespace and name.
	pvcInfo PersistentVolumeClaimInfo
	// classInfo looks up storage classes by name.
	classInfo StorageClassInfo
}
  527. // NewVolumeZonePredicate evaluates if a pod can fit due to the volumes it requests, given
  528. // that some volumes may have zone scheduling constraints. The requirement is that any
  529. // volume zone-labels must match the equivalent zone-labels on the node. It is OK for
  530. // the node to have more zone-label constraints (for example, a hypothetical replicated
  531. // volume might allow region-wide access)
  532. //
  533. // Currently this is only supported with PersistentVolumeClaims, and looks to the labels
  534. // only on the bound PersistentVolume.
  535. //
  536. // Working with volumes declared inline in the pod specification (i.e. not
  537. // using a PersistentVolume) is likely to be harder, as it would require
  538. // determining the zone of a volume during scheduling, and that is likely to
  539. // require calling out to the cloud provider. It seems that we are moving away
  540. // from inline volume declarations anyway.
  541. func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, classInfo StorageClassInfo) FitPredicate {
  542. c := &VolumeZoneChecker{
  543. pvInfo: pvInfo,
  544. pvcInfo: pvcInfo,
  545. classInfo: classInfo,
  546. }
  547. return c.predicate
  548. }
  549. func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  550. // If a pod doesn't have any volume attached to it, the predicate will always be true.
  551. // Thus we make a fast path for it, to avoid unnecessary computations in this case.
  552. if len(pod.Spec.Volumes) == 0 {
  553. return true, nil, nil
  554. }
  555. node := nodeInfo.Node()
  556. if node == nil {
  557. return false, nil, fmt.Errorf("node not found")
  558. }
  559. nodeConstraints := make(map[string]string)
  560. for k, v := range node.ObjectMeta.Labels {
  561. if k != v1.LabelZoneFailureDomain && k != v1.LabelZoneRegion {
  562. continue
  563. }
  564. nodeConstraints[k] = v
  565. }
  566. if len(nodeConstraints) == 0 {
  567. // The node has no zone constraints, so we're OK to schedule.
  568. // In practice, when using zones, all nodes must be labeled with zone labels.
  569. // We want to fast-path this case though.
  570. return true, nil, nil
  571. }
  572. namespace := pod.Namespace
  573. manifest := &(pod.Spec)
  574. for i := range manifest.Volumes {
  575. volume := &manifest.Volumes[i]
  576. if volume.PersistentVolumeClaim != nil {
  577. pvcName := volume.PersistentVolumeClaim.ClaimName
  578. if pvcName == "" {
  579. return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
  580. }
  581. pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
  582. if err != nil {
  583. return false, nil, err
  584. }
  585. if pvc == nil {
  586. return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
  587. }
  588. pvName := pvc.Spec.VolumeName
  589. if pvName == "" {
  590. scName := v1helper.GetPersistentVolumeClaimClass(pvc)
  591. if len(scName) > 0 {
  592. class, _ := c.classInfo.GetStorageClassInfo(scName)
  593. if class != nil {
  594. if class.VolumeBindingMode == nil {
  595. return false, nil, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", scName)
  596. }
  597. if *class.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
  598. // Skip unbound volumes
  599. continue
  600. }
  601. }
  602. }
  603. return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
  604. }
  605. pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
  606. if err != nil {
  607. return false, nil, err
  608. }
  609. if pv == nil {
  610. return false, nil, fmt.Errorf("PersistentVolume was not found: %q", pvName)
  611. }
  612. for k, v := range pv.ObjectMeta.Labels {
  613. if k != v1.LabelZoneFailureDomain && k != v1.LabelZoneRegion {
  614. continue
  615. }
  616. nodeV, _ := nodeConstraints[k]
  617. volumeVSet, err := volumehelpers.LabelZonesToSet(v)
  618. if err != nil {
  619. klog.Warningf("Failed to parse label for %q: %q. Ignoring the label. err=%v. ", k, v, err)
  620. continue
  621. }
  622. if !volumeVSet.Has(nodeV) {
  623. klog.V(10).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, node.Name, pvName, k)
  624. return false, []PredicateFailureReason{ErrVolumeZoneConflict}, nil
  625. }
  626. }
  627. }
  628. }
  629. return true, nil, nil
  630. }
  631. // GetResourceRequest returns a *schedulernodeinfo.Resource that covers the largest
  632. // width in each resource dimension. Because init-containers run sequentially, we collect
  633. // the max in each dimension iteratively. In contrast, we sum the resource vectors for
  634. // regular containers since they run simultaneously.
  635. //
  636. // Example:
  637. //
  638. // Pod:
  639. // InitContainers
  640. // IC1:
  641. // CPU: 2
  642. // Memory: 1G
  643. // IC2:
  644. // CPU: 2
  645. // Memory: 3G
  646. // Containers
  647. // C1:
  648. // CPU: 2
  649. // Memory: 1G
  650. // C2:
  651. // CPU: 1
  652. // Memory: 1G
  653. //
  654. // Result: CPU: 3, Memory: 3G
  655. func GetResourceRequest(pod *v1.Pod) *schedulernodeinfo.Resource {
  656. result := &schedulernodeinfo.Resource{}
  657. for _, container := range pod.Spec.Containers {
  658. result.Add(container.Resources.Requests)
  659. }
  660. // take max_resource(sum_pod, any_init_container)
  661. for _, container := range pod.Spec.InitContainers {
  662. result.SetMaxResource(container.Resources.Requests)
  663. }
  664. return result
  665. }
  666. func podName(pod *v1.Pod) string {
  667. return pod.Namespace + "/" + pod.Name
  668. }
  669. // PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
  670. // First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
  671. // predicate failure reasons if the node has insufficient resources to run the pod.
  672. func PodFitsResources(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  673. node := nodeInfo.Node()
  674. if node == nil {
  675. return false, nil, fmt.Errorf("node not found")
  676. }
  677. var predicateFails []PredicateFailureReason
  678. allowedPodNumber := nodeInfo.AllowedPodNumber()
  679. if len(nodeInfo.Pods())+1 > allowedPodNumber {
  680. predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
  681. }
  682. // No extended resources should be ignored by default.
  683. ignoredExtendedResources := sets.NewString()
  684. var podRequest *schedulernodeinfo.Resource
  685. if predicateMeta, ok := meta.(*predicateMetadata); ok {
  686. podRequest = predicateMeta.podRequest
  687. if predicateMeta.ignoredExtendedResources != nil {
  688. ignoredExtendedResources = predicateMeta.ignoredExtendedResources
  689. }
  690. } else {
  691. // We couldn't parse metadata - fallback to computing it.
  692. podRequest = GetResourceRequest(pod)
  693. }
  694. if podRequest.MilliCPU == 0 &&
  695. podRequest.Memory == 0 &&
  696. podRequest.EphemeralStorage == 0 &&
  697. len(podRequest.ScalarResources) == 0 {
  698. return len(predicateFails) == 0, predicateFails, nil
  699. }
  700. allocatable := nodeInfo.AllocatableResource()
  701. if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
  702. predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
  703. }
  704. if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
  705. predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
  706. }
  707. if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
  708. predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
  709. }
  710. for rName, rQuant := range podRequest.ScalarResources {
  711. if v1helper.IsExtendedResourceName(rName) {
  712. // If this resource is one of the extended resources that should be
  713. // ignored, we will skip checking it.
  714. if ignoredExtendedResources.Has(string(rName)) {
  715. continue
  716. }
  717. }
  718. if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
  719. predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
  720. }
  721. }
  722. if klog.V(10) {
  723. if len(predicateFails) == 0 {
  724. // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
  725. // not logged. There is visible performance gain from it.
  726. klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
  727. podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
  728. }
  729. }
  730. return len(predicateFails) == 0, predicateFails, nil
  731. }
  732. // nodeMatchesNodeSelectorTerms checks if a node's labels satisfy a list of node selector terms,
  733. // terms are ORed, and an empty list of terms will match nothing.
  734. func nodeMatchesNodeSelectorTerms(node *v1.Node, nodeSelectorTerms []v1.NodeSelectorTerm) bool {
  735. nodeFields := map[string]string{}
  736. for k, f := range algorithm.NodeFieldSelectorKeys {
  737. nodeFields[k] = f(node)
  738. }
  739. return v1helper.MatchNodeSelectorTerms(nodeSelectorTerms, labels.Set(node.Labels), fields.Set(nodeFields))
  740. }
  741. // podMatchesNodeSelectorAndAffinityTerms checks whether the pod is schedulable onto nodes according to
  742. // the requirements in both NodeAffinity and nodeSelector.
  743. func podMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
  744. // Check if node.Labels match pod.Spec.NodeSelector.
  745. if len(pod.Spec.NodeSelector) > 0 {
  746. selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
  747. if !selector.Matches(labels.Set(node.Labels)) {
  748. return false
  749. }
  750. }
  751. // 1. nil NodeSelector matches all nodes (i.e. does not filter out any nodes)
  752. // 2. nil []NodeSelectorTerm (equivalent to non-nil empty NodeSelector) matches no nodes
  753. // 3. zero-length non-nil []NodeSelectorTerm matches no nodes also, just for simplicity
  754. // 4. nil []NodeSelectorRequirement (equivalent to non-nil empty NodeSelectorTerm) matches no nodes
  755. // 5. zero-length non-nil []NodeSelectorRequirement matches no nodes also, just for simplicity
  756. // 6. non-nil empty NodeSelectorRequirement is not allowed
  757. nodeAffinityMatches := true
  758. affinity := pod.Spec.Affinity
  759. if affinity != nil && affinity.NodeAffinity != nil {
  760. nodeAffinity := affinity.NodeAffinity
  761. // if no required NodeAffinity requirements, will do no-op, means select all nodes.
  762. // TODO: Replace next line with subsequent commented-out line when implement RequiredDuringSchedulingRequiredDuringExecution.
  763. if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
  764. // if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution == nil && nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
  765. return true
  766. }
  767. // Match node selector for requiredDuringSchedulingRequiredDuringExecution.
  768. // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
  769. // if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution != nil {
  770. // nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution.NodeSelectorTerms
  771. // klog.V(10).Infof("Match for RequiredDuringSchedulingRequiredDuringExecution node selector terms %+v", nodeSelectorTerms)
  772. // nodeAffinityMatches = nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
  773. // }
  774. // Match node selector for requiredDuringSchedulingIgnoredDuringExecution.
  775. if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
  776. nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
  777. klog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", nodeSelectorTerms)
  778. nodeAffinityMatches = nodeAffinityMatches && nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
  779. }
  780. }
  781. return nodeAffinityMatches
  782. }
  783. // PodMatchNodeSelector checks if a pod node selector matches the node label.
  784. func PodMatchNodeSelector(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  785. node := nodeInfo.Node()
  786. if node == nil {
  787. return false, nil, fmt.Errorf("node not found")
  788. }
  789. if podMatchesNodeSelectorAndAffinityTerms(pod, node) {
  790. return true, nil, nil
  791. }
  792. return false, []PredicateFailureReason{ErrNodeSelectorNotMatch}, nil
  793. }
  794. // PodFitsHost checks if a pod spec node name matches the current node.
  795. func PodFitsHost(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  796. if len(pod.Spec.NodeName) == 0 {
  797. return true, nil, nil
  798. }
  799. node := nodeInfo.Node()
  800. if node == nil {
  801. return false, nil, fmt.Errorf("node not found")
  802. }
  803. if pod.Spec.NodeName == node.Name {
  804. return true, nil, nil
  805. }
  806. return false, []PredicateFailureReason{ErrPodNotMatchHostName}, nil
  807. }
// NodeLabelChecker contains information to check node labels for a predicate.
type NodeLabelChecker struct {
	// labels are the label keys looked up on the node; values are not compared.
	labels []string
	// presence selects the mode: true requires every key in labels to exist on
	// the node, false requires every key to be absent.
	presence bool
}
  813. // NewNodeLabelPredicate creates a predicate which evaluates whether a pod can fit based on the
  814. // node labels which match a filter that it requests.
  815. func NewNodeLabelPredicate(labels []string, presence bool) FitPredicate {
  816. labelChecker := &NodeLabelChecker{
  817. labels: labels,
  818. presence: presence,
  819. }
  820. return labelChecker.CheckNodeLabelPresence
  821. }
  822. // CheckNodeLabelPresence checks whether all of the specified labels exists on a node or not, regardless of their value
  823. // If "presence" is false, then returns false if any of the requested labels matches any of the node's labels,
  824. // otherwise returns true.
  825. // If "presence" is true, then returns false if any of the requested labels does not match any of the node's labels,
  826. // otherwise returns true.
  827. //
  828. // Consider the cases where the nodes are placed in regions/zones/racks and these are identified by labels
  829. // In some cases, it is required that only nodes that are part of ANY of the defined regions/zones/racks be selected
  830. //
  831. // Alternately, eliminating nodes that have a certain label, regardless of value, is also useful
  832. // A node may have a label with "retiring" as key and the date as the value
  833. // and it may be desirable to avoid scheduling new pods on this node
  834. func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  835. node := nodeInfo.Node()
  836. if node == nil {
  837. return false, nil, fmt.Errorf("node not found")
  838. }
  839. var exists bool
  840. nodeLabels := labels.Set(node.Labels)
  841. for _, label := range n.labels {
  842. exists = nodeLabels.Has(label)
  843. if (exists && !n.presence) || (!exists && n.presence) {
  844. return false, []PredicateFailureReason{ErrNodeLabelPresenceViolated}, nil
  845. }
  846. }
  847. return true, nil, nil
  848. }
// ServiceAffinity defines a struct used for creating service affinity predicates.
type ServiceAffinity struct {
	// podLister lists the pods that match a label selector.
	podLister algorithm.PodLister
	// serviceLister returns the services that match a pod.
	serviceLister algorithm.ServiceLister
	// nodeInfo resolves a node name to its Node object.
	nodeInfo NodeInfo
	// labels are the node label keys the service affinity is enforced on.
	labels []string
}
  856. // serviceAffinityMetadataProducer should be run once by the scheduler before looping through the Predicate. It is a helper function that
  857. // only should be referenced by NewServiceAffinityPredicate.
  858. func (s *ServiceAffinity) serviceAffinityMetadataProducer(pm *predicateMetadata) {
  859. if pm.pod == nil {
  860. klog.Errorf("Cannot precompute service affinity, a pod is required to calculate service affinity.")
  861. return
  862. }
  863. pm.serviceAffinityInUse = true
  864. var err error
  865. // Store services which match the pod.
  866. pm.serviceAffinityMatchingPodServices, err = s.serviceLister.GetPodServices(pm.pod)
  867. if err != nil {
  868. klog.Errorf("Error precomputing service affinity: could not list services: %v", err)
  869. }
  870. selector := CreateSelectorFromLabels(pm.pod.Labels)
  871. allMatches, err := s.podLister.List(selector)
  872. if err != nil {
  873. klog.Errorf("Error precomputing service affinity: could not list pods: %v", err)
  874. }
  875. // consider only the pods that belong to the same namespace
  876. pm.serviceAffinityMatchingPodList = FilterPodsByNamespace(allMatches, pm.pod.Namespace)
  877. }
  878. // NewServiceAffinityPredicate creates a ServiceAffinity.
  879. func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) (FitPredicate, predicateMetadataProducer) {
  880. affinity := &ServiceAffinity{
  881. podLister: podLister,
  882. serviceLister: serviceLister,
  883. nodeInfo: nodeInfo,
  884. labels: labels,
  885. }
  886. return affinity.checkServiceAffinity, affinity.serviceAffinityMetadataProducer
  887. }
  888. // checkServiceAffinity is a predicate which matches nodes in such a way to force that
  889. // ServiceAffinity.labels are homogenous for pods that are scheduled to a node.
  890. // (i.e. it returns true IFF this pod can be added to this node such that all other pods in
  891. // the same service are running on nodes with the exact same ServiceAffinity.label values).
  892. //
  893. // For example:
  894. // If the first pod of a service was scheduled to a node with label "region=foo",
  895. // all the other subsequent pods belong to the same service will be schedule on
  896. // nodes with the same "region=foo" label.
  897. //
  898. // Details:
  899. //
  900. // If (the svc affinity labels are not a subset of pod's label selectors )
  901. // The pod has all information necessary to check affinity, the pod's label selector is sufficient to calculate
  902. // the match.
  903. // Otherwise:
  904. // Create an "implicit selector" which guarantees pods will land on nodes with similar values
  905. // for the affinity labels.
  906. //
  907. // To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace.
  908. // These backfilled labels in the selector "L" are defined like so:
  909. // - L is a label that the ServiceAffinity object needs as a matching constraint.
  910. // - L is not defined in the pod itself already.
  911. // - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value.
  912. //
  913. // WARNING: This Predicate is NOT guaranteed to work if some of the predicateMetadata data isn't precomputed...
  914. // For that reason it is not exported, i.e. it is highly coupled to the implementation of the FitPredicate construction.
  915. func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  916. var services []*v1.Service
  917. var pods []*v1.Pod
  918. if pm, ok := meta.(*predicateMetadata); ok && (pm.serviceAffinityMatchingPodList != nil || pm.serviceAffinityMatchingPodServices != nil) {
  919. services = pm.serviceAffinityMatchingPodServices
  920. pods = pm.serviceAffinityMatchingPodList
  921. } else {
  922. // Make the predicate resilient in case metadata is missing.
  923. pm = &predicateMetadata{pod: pod}
  924. s.serviceAffinityMetadataProducer(pm)
  925. pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices
  926. }
  927. filteredPods := nodeInfo.FilterOutPods(pods)
  928. node := nodeInfo.Node()
  929. if node == nil {
  930. return false, nil, fmt.Errorf("node not found")
  931. }
  932. // check if the pod being scheduled has the affinity labels specified in its NodeSelector
  933. affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))
  934. // Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
  935. if len(s.labels) > len(affinityLabels) {
  936. if len(services) > 0 {
  937. if len(filteredPods) > 0 {
  938. nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(filteredPods[0].Spec.NodeName)
  939. if err != nil {
  940. return false, nil, err
  941. }
  942. AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Labels))
  943. }
  944. }
  945. }
  946. // Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find.
  947. if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
  948. return true, nil, nil
  949. }
  950. return false, []PredicateFailureReason{ErrServiceAffinityViolated}, nil
  951. }
  952. // PodFitsHostPorts checks if a node has free ports for the requested pod ports.
  953. func PodFitsHostPorts(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  954. var wantPorts []*v1.ContainerPort
  955. if predicateMeta, ok := meta.(*predicateMetadata); ok {
  956. wantPorts = predicateMeta.podPorts
  957. } else {
  958. // We couldn't parse metadata - fallback to computing it.
  959. wantPorts = schedutil.GetContainerPorts(pod)
  960. }
  961. if len(wantPorts) == 0 {
  962. return true, nil, nil
  963. }
  964. existingPorts := nodeInfo.UsedPorts()
  965. // try to see whether existingPorts and wantPorts will conflict or not
  966. if portsConflict(existingPorts, wantPorts) {
  967. return false, []PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
  968. }
  969. return true, nil, nil
  970. }
  971. // search two arrays and return true if they have at least one common element; return false otherwise
  972. func haveOverlap(a1, a2 []string) bool {
  973. if len(a1) > len(a2) {
  974. a1, a2 = a2, a1
  975. }
  976. m := map[string]bool{}
  977. for _, val := range a1 {
  978. m[val] = true
  979. }
  980. for _, val := range a2 {
  981. if _, ok := m[val]; ok {
  982. return true
  983. }
  984. }
  985. return false
  986. }
  987. // GeneralPredicates checks whether noncriticalPredicates and EssentialPredicates pass. noncriticalPredicates are the predicates
  988. // that only non-critical pods need and EssentialPredicates are the predicates that all pods, including critical pods, need
  989. func GeneralPredicates(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  990. var predicateFails []PredicateFailureReason
  991. fit, reasons, err := noncriticalPredicates(pod, meta, nodeInfo)
  992. if err != nil {
  993. return false, predicateFails, err
  994. }
  995. if !fit {
  996. predicateFails = append(predicateFails, reasons...)
  997. }
  998. fit, reasons, err = EssentialPredicates(pod, meta, nodeInfo)
  999. if err != nil {
  1000. return false, predicateFails, err
  1001. }
  1002. if !fit {
  1003. predicateFails = append(predicateFails, reasons...)
  1004. }
  1005. return len(predicateFails) == 0, predicateFails, nil
  1006. }
  1007. // noncriticalPredicates are the predicates that only non-critical pods need
  1008. func noncriticalPredicates(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1009. var predicateFails []PredicateFailureReason
  1010. fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
  1011. if err != nil {
  1012. return false, predicateFails, err
  1013. }
  1014. if !fit {
  1015. predicateFails = append(predicateFails, reasons...)
  1016. }
  1017. return len(predicateFails) == 0, predicateFails, nil
  1018. }
  1019. // EssentialPredicates are the predicates that all pods, including critical pods, need
  1020. func EssentialPredicates(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1021. var predicateFails []PredicateFailureReason
  1022. fit, reasons, err := PodFitsHost(pod, meta, nodeInfo)
  1023. if err != nil {
  1024. return false, predicateFails, err
  1025. }
  1026. if !fit {
  1027. predicateFails = append(predicateFails, reasons...)
  1028. }
  1029. // TODO: PodFitsHostPorts is essential for now, but kubelet should ideally
  1030. // preempt pods to free up host ports too
  1031. fit, reasons, err = PodFitsHostPorts(pod, meta, nodeInfo)
  1032. if err != nil {
  1033. return false, predicateFails, err
  1034. }
  1035. if !fit {
  1036. predicateFails = append(predicateFails, reasons...)
  1037. }
  1038. fit, reasons, err = PodMatchNodeSelector(pod, meta, nodeInfo)
  1039. if err != nil {
  1040. return false, predicateFails, err
  1041. }
  1042. if !fit {
  1043. predicateFails = append(predicateFails, reasons...)
  1044. }
  1045. return len(predicateFails) == 0, predicateFails, nil
  1046. }
// PodAffinityChecker contains information to check pod affinity.
type PodAffinityChecker struct {
	// info resolves a node name to its Node object.
	info NodeInfo
	// podLister is the pod lister handed to NewPodAffinityPredicate.
	// NOTE(review): its use is not visible in this part of the file — confirm.
	podLister algorithm.PodLister
}
  1052. // NewPodAffinityPredicate creates a PodAffinityChecker.
  1053. func NewPodAffinityPredicate(info NodeInfo, podLister algorithm.PodLister) FitPredicate {
  1054. checker := &PodAffinityChecker{
  1055. info: info,
  1056. podLister: podLister,
  1057. }
  1058. return checker.InterPodAffinityMatches
  1059. }
  1060. // InterPodAffinityMatches checks if a pod can be scheduled on the specified node with pod affinity/anti-affinity configuration.
  1061. // First return value indicates whether a pod can be scheduled on the specified node while the second return value indicates the
  1062. // predicate failure reasons if the pod cannot be scheduled on the specified node.
  1063. func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1064. node := nodeInfo.Node()
  1065. if node == nil {
  1066. return false, nil, fmt.Errorf("node not found")
  1067. }
  1068. if failedPredicates, error := c.satisfiesExistingPodsAntiAffinity(pod, meta, nodeInfo); failedPredicates != nil {
  1069. failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
  1070. return false, failedPredicates, error
  1071. }
  1072. // Now check if <pod> requirements will be satisfied on this node.
  1073. affinity := pod.Spec.Affinity
  1074. if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
  1075. return true, nil, nil
  1076. }
  1077. if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, meta, nodeInfo, affinity); failedPredicates != nil {
  1078. failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
  1079. return false, failedPredicates, error
  1080. }
  1081. if klog.V(10) {
  1082. // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
  1083. // not logged. There is visible performance gain from it.
  1084. klog.Infof("Schedule Pod %+v on Node %+v is allowed, pod (anti)affinity constraints satisfied",
  1085. podName(pod), node.Name)
  1086. }
  1087. return true, nil, nil
  1088. }
  1089. // podMatchesPodAffinityTerms checks if the "targetPod" matches the given "terms"
  1090. // of the "pod" on the given "nodeInfo".Node(). It returns three values: 1) whether
  1091. // targetPod matches all the terms and their topologies, 2) whether targetPod
  1092. // matches all the terms label selector and namespaces (AKA term properties),
  1093. // 3) any error.
  1094. func (c *PodAffinityChecker) podMatchesPodAffinityTerms(pod, targetPod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) (bool, bool, error) {
  1095. if len(terms) == 0 {
  1096. return false, false, fmt.Errorf("terms array is empty")
  1097. }
  1098. props, err := getAffinityTermProperties(pod, terms)
  1099. if err != nil {
  1100. return false, false, err
  1101. }
  1102. if !podMatchesAllAffinityTermProperties(targetPod, props) {
  1103. return false, false, nil
  1104. }
  1105. // Namespace and selector of the terms have matched. Now we check topology of the terms.
  1106. targetPodNode, err := c.info.GetNodeInfo(targetPod.Spec.NodeName)
  1107. if err != nil {
  1108. return false, false, err
  1109. }
  1110. for _, term := range terms {
  1111. if len(term.TopologyKey) == 0 {
  1112. return false, false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
  1113. }
  1114. if !priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), targetPodNode, term.TopologyKey) {
  1115. return false, true, nil
  1116. }
  1117. }
  1118. return true, true, nil
  1119. }
  1120. // GetPodAffinityTerms gets pod affinity terms by a pod affinity object.
  1121. func GetPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTerm) {
  1122. if podAffinity != nil {
  1123. if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
  1124. terms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
  1125. }
  1126. // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
  1127. //if len(podAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
  1128. // terms = append(terms, podAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
  1129. //}
  1130. }
  1131. return terms
  1132. }
  1133. // GetPodAntiAffinityTerms gets pod affinity terms by a pod anti-affinity.
  1134. func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.PodAffinityTerm) {
  1135. if podAntiAffinity != nil {
  1136. if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
  1137. terms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
  1138. }
  1139. // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
  1140. //if len(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
  1141. // terms = append(terms, podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
  1142. //}
  1143. }
  1144. return terms
  1145. }
  1146. // getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node:
  1147. // (1) Whether it has PodAntiAffinity
  1148. // (2) Whether ANY AffinityTerm matches the incoming pod
  1149. func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (*topologyPairsMaps, error) {
  1150. affinity := existingPod.Spec.Affinity
  1151. if affinity == nil || affinity.PodAntiAffinity == nil {
  1152. return nil, nil
  1153. }
  1154. topologyMaps := newTopologyPairsMaps()
  1155. for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
  1156. selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
  1157. if err != nil {
  1158. return nil, err
  1159. }
  1160. namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
  1161. if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
  1162. if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
  1163. pair := topologyPair{key: term.TopologyKey, value: topologyValue}
  1164. topologyMaps.addTopologyPair(pair, existingPod)
  1165. }
  1166. }
  1167. }
  1168. return topologyMaps, nil
  1169. }
  1170. func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairsOfPods(pod *v1.Pod, existingPods []*v1.Pod) (*topologyPairsMaps, error) {
  1171. topologyMaps := newTopologyPairsMaps()
  1172. for _, existingPod := range existingPods {
  1173. existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName)
  1174. if err != nil {
  1175. if apierrors.IsNotFound(err) {
  1176. klog.Errorf("Pod %s has NodeName %q but node is not found",
  1177. podName(existingPod), existingPod.Spec.NodeName)
  1178. continue
  1179. }
  1180. return nil, err
  1181. }
  1182. existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, existingPodNode)
  1183. if err != nil {
  1184. return nil, err
  1185. }
  1186. topologyMaps.appendMaps(existingPodTopologyMaps)
  1187. }
  1188. return topologyMaps, nil
  1189. }
  1190. // Checks if scheduling the pod onto this node would break any anti-affinity
  1191. // terms indicated by the existing pods.
  1192. func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (PredicateFailureReason, error) {
  1193. node := nodeInfo.Node()
  1194. if node == nil {
  1195. return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("Node is nil")
  1196. }
  1197. var topologyMaps *topologyPairsMaps
  1198. if predicateMeta, ok := meta.(*predicateMetadata); ok {
  1199. topologyMaps = predicateMeta.topologyPairsAntiAffinityPodsMap
  1200. } else {
  1201. // Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
  1202. // present in nodeInfo. Pods on other nodes pass the filter.
  1203. filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
  1204. if err != nil {
  1205. errMessage := fmt.Sprintf("Failed to get all pods: %v", err)
  1206. klog.Error(errMessage)
  1207. return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
  1208. }
  1209. if topologyMaps, err = c.getMatchingAntiAffinityTopologyPairsOfPods(pod, filteredPods); err != nil {
  1210. errMessage := fmt.Sprintf("Failed to get all terms that match pod %s: %v", podName(pod), err)
  1211. klog.Error(errMessage)
  1212. return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
  1213. }
  1214. }
  1215. // Iterate over topology pairs to get any of the pods being affected by
  1216. // the scheduled pod anti-affinity terms
  1217. for topologyKey, topologyValue := range node.Labels {
  1218. if topologyMaps.topologyPairToPods[topologyPair{key: topologyKey, value: topologyValue}] != nil {
  1219. klog.V(10).Infof("Cannot schedule pod %+v onto node %v", podName(pod), node.Name)
  1220. return ErrExistingPodsAntiAffinityRulesNotMatch, nil
  1221. }
  1222. }
  1223. if klog.V(10) {
  1224. // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
  1225. // not logged. There is visible performance gain from it.
  1226. klog.Infof("Schedule Pod %+v on Node %+v is allowed, existing pods anti-affinity terms satisfied.",
  1227. podName(pod), node.Name)
  1228. }
  1229. return nil, nil
  1230. }
  1231. // nodeMatchesAllTopologyTerms checks whether "nodeInfo" matches
  1232. // topology of all the "terms" for the given "pod".
  1233. func (c *PodAffinityChecker) nodeMatchesAllTopologyTerms(pod *v1.Pod, topologyPairs *topologyPairsMaps, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) bool {
  1234. node := nodeInfo.Node()
  1235. for _, term := range terms {
  1236. if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
  1237. pair := topologyPair{key: term.TopologyKey, value: topologyValue}
  1238. if _, ok := topologyPairs.topologyPairToPods[pair]; !ok {
  1239. return false
  1240. }
  1241. } else {
  1242. return false
  1243. }
  1244. }
  1245. return true
  1246. }
  1247. // nodeMatchesAnyTopologyTerm checks whether "nodeInfo" matches
  1248. // topology of any "term" for the given "pod".
  1249. func (c *PodAffinityChecker) nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs *topologyPairsMaps, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) bool {
  1250. node := nodeInfo.Node()
  1251. for _, term := range terms {
  1252. if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
  1253. pair := topologyPair{key: term.TopologyKey, value: topologyValue}
  1254. if _, ok := topologyPairs.topologyPairToPods[pair]; ok {
  1255. return true
  1256. }
  1257. }
  1258. }
  1259. return false
  1260. }
  1261. // Checks if scheduling the pod onto this node would break any term of this pod.
  1262. func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
  1263. meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo,
  1264. affinity *v1.Affinity) (PredicateFailureReason, error) {
  1265. node := nodeInfo.Node()
  1266. if node == nil {
  1267. return ErrPodAffinityRulesNotMatch, fmt.Errorf("Node is nil")
  1268. }
  1269. if predicateMeta, ok := meta.(*predicateMetadata); ok {
  1270. // Check all affinity terms.
  1271. topologyPairsPotentialAffinityPods := predicateMeta.topologyPairsPotentialAffinityPods
  1272. if affinityTerms := GetPodAffinityTerms(affinity.PodAffinity); len(affinityTerms) > 0 {
  1273. matchExists := c.nodeMatchesAllTopologyTerms(pod, topologyPairsPotentialAffinityPods, nodeInfo, affinityTerms)
  1274. if !matchExists {
  1275. // This pod may the first pod in a series that have affinity to themselves. In order
  1276. // to not leave such pods in pending state forever, we check that if no other pod
  1277. // in the cluster matches the namespace and selector of this pod and the pod matches
  1278. // its own terms, then we allow the pod to pass the affinity check.
  1279. if !(len(topologyPairsPotentialAffinityPods.topologyPairToPods) == 0 && targetPodMatchesAffinityOfPod(pod, pod)) {
  1280. klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinity",
  1281. podName(pod), node.Name)
  1282. return ErrPodAffinityRulesNotMatch, nil
  1283. }
  1284. }
  1285. }
  1286. // Check all anti-affinity terms.
  1287. topologyPairsPotentialAntiAffinityPods := predicateMeta.topologyPairsPotentialAntiAffinityPods
  1288. if antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity); len(antiAffinityTerms) > 0 {
  1289. matchExists := c.nodeMatchesAnyTopologyTerm(pod, topologyPairsPotentialAntiAffinityPods, nodeInfo, antiAffinityTerms)
  1290. if matchExists {
  1291. klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinity",
  1292. podName(pod), node.Name)
  1293. return ErrPodAntiAffinityRulesNotMatch, nil
  1294. }
  1295. }
  1296. } else { // We don't have precomputed metadata. We have to follow a slow path to check affinity terms.
  1297. filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
  1298. if err != nil {
  1299. return ErrPodAffinityRulesNotMatch, err
  1300. }
  1301. affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
  1302. antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
  1303. matchFound, termsSelectorMatchFound := false, false
  1304. for _, targetPod := range filteredPods {
  1305. // Check all affinity terms.
  1306. if !matchFound && len(affinityTerms) > 0 {
  1307. affTermsMatch, termsSelectorMatch, err := c.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, affinityTerms)
  1308. if err != nil {
  1309. errMessage := fmt.Sprintf("Cannot schedule pod %s onto node %s, because of PodAffinity: %v", podName(pod), node.Name, err)
  1310. klog.Error(errMessage)
  1311. return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
  1312. }
  1313. if termsSelectorMatch {
  1314. termsSelectorMatchFound = true
  1315. }
  1316. if affTermsMatch {
  1317. matchFound = true
  1318. }
  1319. }
  1320. // Check all anti-affinity terms.
  1321. if len(antiAffinityTerms) > 0 {
  1322. antiAffTermsMatch, _, err := c.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, antiAffinityTerms)
  1323. if err != nil || antiAffTermsMatch {
  1324. klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm, err: %v",
  1325. podName(pod), node.Name, err)
  1326. return ErrPodAntiAffinityRulesNotMatch, nil
  1327. }
  1328. }
  1329. }
  1330. if !matchFound && len(affinityTerms) > 0 {
  1331. // We have not been able to find any matches for the pod's affinity terms.
  1332. // This pod may be the first pod in a series that have affinity to themselves. In order
  1333. // to not leave such pods in pending state forever, we check that if no other pod
  1334. // in the cluster matches the namespace and selector of this pod and the pod matches
  1335. // its own terms, then we allow the pod to pass the affinity check.
  1336. if termsSelectorMatchFound {
  1337. klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinity",
  1338. podName(pod), node.Name)
  1339. return ErrPodAffinityRulesNotMatch, nil
  1340. }
  1341. // Check if pod matches its own affinity properties (namespace and label selector).
  1342. if !targetPodMatchesAffinityOfPod(pod, pod) {
  1343. klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinity",
  1344. podName(pod), node.Name)
  1345. return ErrPodAffinityRulesNotMatch, nil
  1346. }
  1347. }
  1348. }
  1349. if klog.V(10) {
  1350. // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
  1351. // not logged. There is visible performance gain from it.
  1352. klog.Infof("Schedule Pod %+v on Node %+v is allowed, pod affinity/anti-affinity constraints satisfied.",
  1353. podName(pod), node.Name)
  1354. }
  1355. return nil, nil
  1356. }
  1357. // CheckNodeUnschedulablePredicate checks if a pod can be scheduled on a node with Unschedulable spec.
  1358. func CheckNodeUnschedulablePredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1359. if nodeInfo == nil || nodeInfo.Node() == nil {
  1360. return false, []PredicateFailureReason{ErrNodeUnknownCondition}, nil
  1361. }
  1362. // If pod tolerate unschedulable taint, it's also tolerate `node.Spec.Unschedulable`.
  1363. podToleratesUnschedulable := v1helper.TolerationsTolerateTaint(pod.Spec.Tolerations, &v1.Taint{
  1364. Key: schedulerapi.TaintNodeUnschedulable,
  1365. Effect: v1.TaintEffectNoSchedule,
  1366. })
  1367. // TODO (k82cn): deprecates `node.Spec.Unschedulable` in 1.13.
  1368. if nodeInfo.Node().Spec.Unschedulable && !podToleratesUnschedulable {
  1369. return false, []PredicateFailureReason{ErrNodeUnschedulable}, nil
  1370. }
  1371. return true, nil, nil
  1372. }
  1373. // PodToleratesNodeTaints checks if a pod tolerations can tolerate the node taints
  1374. func PodToleratesNodeTaints(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1375. if nodeInfo == nil || nodeInfo.Node() == nil {
  1376. return false, []PredicateFailureReason{ErrNodeUnknownCondition}, nil
  1377. }
  1378. return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
  1379. // PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
  1380. return t.Effect == v1.TaintEffectNoSchedule || t.Effect == v1.TaintEffectNoExecute
  1381. })
  1382. }
  1383. // PodToleratesNodeNoExecuteTaints checks if a pod tolerations can tolerate the node's NoExecute taints
  1384. func PodToleratesNodeNoExecuteTaints(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1385. return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
  1386. return t.Effect == v1.TaintEffectNoExecute
  1387. })
  1388. }
  1389. func podToleratesNodeTaints(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, filter func(t *v1.Taint) bool) (bool, []PredicateFailureReason, error) {
  1390. taints, err := nodeInfo.Taints()
  1391. if err != nil {
  1392. return false, nil, err
  1393. }
  1394. if v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, filter) {
  1395. return true, nil, nil
  1396. }
  1397. return false, []PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil
  1398. }
  1399. // isPodBestEffort checks if pod is scheduled with best-effort QoS
  1400. func isPodBestEffort(pod *v1.Pod) bool {
  1401. return v1qos.GetPodQOS(pod) == v1.PodQOSBestEffort
  1402. }
  1403. // CheckNodeMemoryPressurePredicate checks if a pod can be scheduled on a node
  1404. // reporting memory pressure condition.
  1405. func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1406. var podBestEffort bool
  1407. if predicateMeta, ok := meta.(*predicateMetadata); ok {
  1408. podBestEffort = predicateMeta.podBestEffort
  1409. } else {
  1410. // We couldn't parse metadata - fallback to computing it.
  1411. podBestEffort = isPodBestEffort(pod)
  1412. }
  1413. // pod is not BestEffort pod
  1414. if !podBestEffort {
  1415. return true, nil, nil
  1416. }
  1417. // check if node is under memory pressure
  1418. if nodeInfo.MemoryPressureCondition() == v1.ConditionTrue {
  1419. return false, []PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil
  1420. }
  1421. return true, nil, nil
  1422. }
  1423. // CheckNodeDiskPressurePredicate checks if a pod can be scheduled on a node
  1424. // reporting disk pressure condition.
  1425. func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1426. // check if node is under disk pressure
  1427. if nodeInfo.DiskPressureCondition() == v1.ConditionTrue {
  1428. return false, []PredicateFailureReason{ErrNodeUnderDiskPressure}, nil
  1429. }
  1430. return true, nil, nil
  1431. }
  1432. // CheckNodePIDPressurePredicate checks if a pod can be scheduled on a node
  1433. // reporting pid pressure condition.
  1434. func CheckNodePIDPressurePredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1435. // check if node is under pid pressure
  1436. if nodeInfo.PIDPressureCondition() == v1.ConditionTrue {
  1437. return false, []PredicateFailureReason{ErrNodeUnderPIDPressure}, nil
  1438. }
  1439. return true, nil, nil
  1440. }
  1441. // CheckNodeConditionPredicate checks if a pod can be scheduled on a node reporting
  1442. // network unavailable and not ready condition. Only node conditions are accounted in this predicate.
  1443. func CheckNodeConditionPredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1444. reasons := []PredicateFailureReason{}
  1445. if nodeInfo == nil || nodeInfo.Node() == nil {
  1446. return false, []PredicateFailureReason{ErrNodeUnknownCondition}, nil
  1447. }
  1448. node := nodeInfo.Node()
  1449. for _, cond := range node.Status.Conditions {
  1450. // We consider the node for scheduling only when its:
  1451. // - NodeReady condition status is ConditionTrue,
  1452. // - NodeNetworkUnavailable condition status is ConditionFalse.
  1453. if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
  1454. reasons = append(reasons, ErrNodeNotReady)
  1455. } else if cond.Type == v1.NodeNetworkUnavailable && cond.Status != v1.ConditionFalse {
  1456. reasons = append(reasons, ErrNodeNetworkUnavailable)
  1457. }
  1458. }
  1459. if node.Spec.Unschedulable {
  1460. reasons = append(reasons, ErrNodeUnschedulable)
  1461. }
  1462. return len(reasons) == 0, reasons, nil
  1463. }
  1464. // VolumeBindingChecker contains information to check a volume binding.
  1465. type VolumeBindingChecker struct {
  1466. binder *volumebinder.VolumeBinder
  1467. }
  1468. // NewVolumeBindingPredicate evaluates if a pod can fit due to the volumes it requests,
  1469. // for both bound and unbound PVCs.
  1470. //
  1471. // For PVCs that are bound, then it checks that the corresponding PV's node affinity is
  1472. // satisfied by the given node.
  1473. //
  1474. // For PVCs that are unbound, it tries to find available PVs that can satisfy the PVC requirements
  1475. // and that the PV node affinity is satisfied by the given node.
  1476. //
  1477. // The predicate returns true if all bound PVCs have compatible PVs with the node, and if all unbound
  1478. // PVCs can be matched with an available and node-compatible PV.
  1479. func NewVolumeBindingPredicate(binder *volumebinder.VolumeBinder) FitPredicate {
  1480. c := &VolumeBindingChecker{
  1481. binder: binder,
  1482. }
  1483. return c.predicate
  1484. }
  1485. func podHasPVCs(pod *v1.Pod) bool {
  1486. for _, vol := range pod.Spec.Volumes {
  1487. if vol.PersistentVolumeClaim != nil {
  1488. return true
  1489. }
  1490. }
  1491. return false
  1492. }
  1493. func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  1494. // If pod does not request any PVC, we don't need to do anything.
  1495. if !podHasPVCs(pod) {
  1496. return true, nil, nil
  1497. }
  1498. node := nodeInfo.Node()
  1499. if node == nil {
  1500. return false, nil, fmt.Errorf("node not found")
  1501. }
  1502. unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node)
  1503. if err != nil {
  1504. return false, nil, err
  1505. }
  1506. failReasons := []PredicateFailureReason{}
  1507. if !boundSatisfied {
  1508. klog.V(5).Infof("Bound PVs not satisfied for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
  1509. failReasons = append(failReasons, ErrVolumeNodeConflict)
  1510. }
  1511. if !unboundSatisfied {
  1512. klog.V(5).Infof("Couldn't find matching PVs for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
  1513. failReasons = append(failReasons, ErrVolumeBindConflict)
  1514. }
  1515. if len(failReasons) > 0 {
  1516. return false, failReasons, nil
  1517. }
  1518. // All volumes bound or matching PVs found for all unbound PVCs
  1519. klog.V(5).Infof("All PVCs found matches for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
  1520. return true, nil, nil
  1521. }