metadata.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package predicates
  14. import (
  15. "context"
  16. "fmt"
  17. "sync"
  18. "k8s.io/klog"
  19. "k8s.io/api/core/v1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/labels"
  22. "k8s.io/apimachinery/pkg/util/sets"
  23. "k8s.io/client-go/util/workqueue"
  24. "k8s.io/kubernetes/pkg/scheduler/algorithm"
  25. priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
  26. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  27. schedutil "k8s.io/kubernetes/pkg/scheduler/util"
  28. )
  29. // PredicateMetadata interface represents anything that can access a predicate metadata.
  30. type PredicateMetadata interface {
  31. ShallowCopy() PredicateMetadata
  32. AddPod(addedPod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) error
  33. RemovePod(deletedPod *v1.Pod) error
  34. }
  35. // PredicateMetadataProducer is a function that computes predicate metadata for a given pod.
  36. type PredicateMetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata
  37. // PredicateMetadataFactory defines a factory of predicate metadata.
  38. type PredicateMetadataFactory struct {
  39. podLister algorithm.PodLister
  40. }
  // topologyPair is a (topology key, topology value) tuple. In this file the
  // key is an affinity term's TopologyKey and the value is the node label value
  // found under that key. It is used as a map key in the predicate metadata.
  type topologyPair struct {
  	key   string
  	value string
  }
  46. type podSet map[*v1.Pod]struct{}
  47. type topologyPairSet map[topologyPair]struct{}
  48. // topologyPairsMaps keeps topologyPairToAntiAffinityPods and antiAffinityPodToTopologyPairs in sync
  49. // as they are the inverse of each others.
  50. type topologyPairsMaps struct {
  51. topologyPairToPods map[topologyPair]podSet
  52. podToTopologyPairs map[string]topologyPairSet
  53. }
  54. // NOTE: When new fields are added/removed or logic is changed, please make sure that
  55. // RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
  56. type predicateMetadata struct {
  57. pod *v1.Pod
  58. podBestEffort bool
  59. podRequest *schedulernodeinfo.Resource
  60. podPorts []*v1.ContainerPort
  61. topologyPairsAntiAffinityPodsMap *topologyPairsMaps
  62. // A map of topology pairs to a list of Pods that can potentially match
  63. // the affinity terms of the "pod" and its inverse.
  64. topologyPairsPotentialAffinityPods *topologyPairsMaps
  65. // A map of topology pairs to a list of Pods that can potentially match
  66. // the anti-affinity terms of the "pod" and its inverse.
  67. topologyPairsPotentialAntiAffinityPods *topologyPairsMaps
  68. serviceAffinityInUse bool
  69. serviceAffinityMatchingPodList []*v1.Pod
  70. serviceAffinityMatchingPodServices []*v1.Service
  71. // ignoredExtendedResources is a set of extended resource names that will
  72. // be ignored in the PodFitsResources predicate.
  73. //
  74. // They can be scheduler extender managed resources, the consumption of
  75. // which should be accounted only by the extenders. This set is synthesized
  76. // from scheduler extender configuration and does not change per pod.
  77. ignoredExtendedResources sets.String
  78. }
  79. // Ensure that predicateMetadata implements algorithm.PredicateMetadata.
  80. var _ PredicateMetadata = &predicateMetadata{}
  81. // predicateMetadataProducer function produces predicate metadata. It is stored in a global variable below
  82. // and used to modify the return values of PredicateMetadataProducer
  83. type predicateMetadataProducer func(pm *predicateMetadata)
  84. var predicateMetadataProducers = make(map[string]predicateMetadataProducer)
  85. // RegisterPredicateMetadataProducer registers a PredicateMetadataProducer.
  86. func RegisterPredicateMetadataProducer(predicateName string, precomp predicateMetadataProducer) {
  87. predicateMetadataProducers[predicateName] = precomp
  88. }
  89. // EmptyPredicateMetadataProducer returns a no-op MetadataProducer type.
  90. func EmptyPredicateMetadataProducer(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata {
  91. return nil
  92. }
  93. // RegisterPredicateMetadataProducerWithExtendedResourceOptions registers a
  94. // PredicateMetadataProducer that creates predicate metadata with the provided
  95. // options for extended resources.
  96. //
  97. // See the comments in "predicateMetadata" for the explanation of the options.
  98. func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources sets.String) {
  99. RegisterPredicateMetadataProducer("PredicateWithExtendedResourceOptions", func(pm *predicateMetadata) {
  100. pm.ignoredExtendedResources = ignoredExtendedResources
  101. })
  102. }
  103. // NewPredicateMetadataFactory creates a PredicateMetadataFactory.
  104. func NewPredicateMetadataFactory(podLister algorithm.PodLister) PredicateMetadataProducer {
  105. factory := &PredicateMetadataFactory{
  106. podLister,
  107. }
  108. return factory.GetMetadata
  109. }
  110. // GetMetadata returns the predicateMetadata used which will be used by various predicates.
  111. func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInfoMap map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata {
  112. // If we cannot compute metadata, just return nil
  113. if pod == nil {
  114. return nil
  115. }
  116. // existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
  117. existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, nodeNameToInfoMap)
  118. if err != nil {
  119. return nil
  120. }
  121. // incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
  122. // incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity
  123. incomingPodAffinityMap, incomingPodAntiAffinityMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, nodeNameToInfoMap)
  124. if err != nil {
  125. klog.Errorf("[predicate meta data generation] error finding pods that match affinity terms: %v", err)
  126. return nil
  127. }
  128. predicateMetadata := &predicateMetadata{
  129. pod: pod,
  130. podBestEffort: isPodBestEffort(pod),
  131. podRequest: GetResourceRequest(pod),
  132. podPorts: schedutil.GetContainerPorts(pod),
  133. topologyPairsPotentialAffinityPods: incomingPodAffinityMap,
  134. topologyPairsPotentialAntiAffinityPods: incomingPodAntiAffinityMap,
  135. topologyPairsAntiAffinityPodsMap: existingPodAntiAffinityMap,
  136. }
  137. for predicateName, precomputeFunc := range predicateMetadataProducers {
  138. klog.V(10).Infof("Precompute: %v", predicateName)
  139. precomputeFunc(predicateMetadata)
  140. }
  141. return predicateMetadata
  142. }
  143. // returns a pointer to a new topologyPairsMaps
  144. func newTopologyPairsMaps() *topologyPairsMaps {
  145. return &topologyPairsMaps{topologyPairToPods: make(map[topologyPair]podSet),
  146. podToTopologyPairs: make(map[string]topologyPairSet)}
  147. }
  148. func (topologyPairsMaps *topologyPairsMaps) addTopologyPair(pair topologyPair, pod *v1.Pod) {
  149. podFullName := schedutil.GetPodFullName(pod)
  150. if topologyPairsMaps.topologyPairToPods[pair] == nil {
  151. topologyPairsMaps.topologyPairToPods[pair] = make(map[*v1.Pod]struct{})
  152. }
  153. topologyPairsMaps.topologyPairToPods[pair][pod] = struct{}{}
  154. if topologyPairsMaps.podToTopologyPairs[podFullName] == nil {
  155. topologyPairsMaps.podToTopologyPairs[podFullName] = make(map[topologyPair]struct{})
  156. }
  157. topologyPairsMaps.podToTopologyPairs[podFullName][pair] = struct{}{}
  158. }
  159. func (topologyPairsMaps *topologyPairsMaps) removePod(deletedPod *v1.Pod) {
  160. deletedPodFullName := schedutil.GetPodFullName(deletedPod)
  161. for pair := range topologyPairsMaps.podToTopologyPairs[deletedPodFullName] {
  162. delete(topologyPairsMaps.topologyPairToPods[pair], deletedPod)
  163. if len(topologyPairsMaps.topologyPairToPods[pair]) == 0 {
  164. delete(topologyPairsMaps.topologyPairToPods, pair)
  165. }
  166. }
  167. delete(topologyPairsMaps.podToTopologyPairs, deletedPodFullName)
  168. }
  169. func (topologyPairsMaps *topologyPairsMaps) appendMaps(toAppend *topologyPairsMaps) {
  170. if toAppend == nil {
  171. return
  172. }
  173. for pair := range toAppend.topologyPairToPods {
  174. for pod := range toAppend.topologyPairToPods[pair] {
  175. topologyPairsMaps.addTopologyPair(pair, pod)
  176. }
  177. }
  178. }
  179. // RemovePod changes predicateMetadata assuming that the given `deletedPod` is
  180. // deleted from the system.
  181. func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
  182. deletedPodFullName := schedutil.GetPodFullName(deletedPod)
  183. if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
  184. return fmt.Errorf("deletedPod and meta.pod must not be the same")
  185. }
  186. meta.topologyPairsAntiAffinityPodsMap.removePod(deletedPod)
  187. // Delete pod from the matching affinity or anti-affinity topology pairs maps.
  188. meta.topologyPairsPotentialAffinityPods.removePod(deletedPod)
  189. meta.topologyPairsPotentialAntiAffinityPods.removePod(deletedPod)
  190. // All pods in the serviceAffinityMatchingPodList are in the same namespace.
  191. // So, if the namespace of the first one is not the same as the namespace of the
  192. // deletedPod, we don't need to check the list, as deletedPod isn't in the list.
  193. if meta.serviceAffinityInUse &&
  194. len(meta.serviceAffinityMatchingPodList) > 0 &&
  195. deletedPod.Namespace == meta.serviceAffinityMatchingPodList[0].Namespace {
  196. for i, pod := range meta.serviceAffinityMatchingPodList {
  197. if schedutil.GetPodFullName(pod) == deletedPodFullName {
  198. meta.serviceAffinityMatchingPodList = append(
  199. meta.serviceAffinityMatchingPodList[:i],
  200. meta.serviceAffinityMatchingPodList[i+1:]...)
  201. break
  202. }
  203. }
  204. }
  205. return nil
  206. }
  207. // AddPod changes predicateMetadata assuming that `newPod` is added to the
  208. // system.
  209. func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) error {
  210. addedPodFullName := schedutil.GetPodFullName(addedPod)
  211. if addedPodFullName == schedutil.GetPodFullName(meta.pod) {
  212. return fmt.Errorf("addedPod and meta.pod must not be the same")
  213. }
  214. if nodeInfo.Node() == nil {
  215. return fmt.Errorf("invalid node in nodeInfo")
  216. }
  217. // Add matching anti-affinity terms of the addedPod to the map.
  218. topologyPairsMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(meta.pod, addedPod, nodeInfo.Node())
  219. if err != nil {
  220. return err
  221. }
  222. meta.topologyPairsAntiAffinityPodsMap.appendMaps(topologyPairsMaps)
  223. // Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed.
  224. affinity := meta.pod.Spec.Affinity
  225. podNodeName := addedPod.Spec.NodeName
  226. if affinity != nil && len(podNodeName) > 0 {
  227. podNode := nodeInfo.Node()
  228. // It is assumed that when the added pod matches affinity of the meta.pod, all the terms must match,
  229. // this should be changed when the implementation of targetPodMatchesAffinityOfPod/podMatchesAffinityTermProperties
  230. // is changed
  231. if targetPodMatchesAffinityOfPod(meta.pod, addedPod) {
  232. affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
  233. for _, term := range affinityTerms {
  234. if topologyValue, ok := podNode.Labels[term.TopologyKey]; ok {
  235. pair := topologyPair{key: term.TopologyKey, value: topologyValue}
  236. meta.topologyPairsPotentialAffinityPods.addTopologyPair(pair, addedPod)
  237. }
  238. }
  239. }
  240. if targetPodMatchesAntiAffinityOfPod(meta.pod, addedPod) {
  241. antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
  242. for _, term := range antiAffinityTerms {
  243. if topologyValue, ok := podNode.Labels[term.TopologyKey]; ok {
  244. pair := topologyPair{key: term.TopologyKey, value: topologyValue}
  245. meta.topologyPairsPotentialAntiAffinityPods.addTopologyPair(pair, addedPod)
  246. }
  247. }
  248. }
  249. }
  250. // If addedPod is in the same namespace as the meta.pod, update the list
  251. // of matching pods if applicable.
  252. if meta.serviceAffinityInUse && addedPod.Namespace == meta.pod.Namespace {
  253. selector := CreateSelectorFromLabels(meta.pod.Labels)
  254. if selector.Matches(labels.Set(addedPod.Labels)) {
  255. meta.serviceAffinityMatchingPodList = append(meta.serviceAffinityMatchingPodList,
  256. addedPod)
  257. }
  258. }
  259. return nil
  260. }
  261. // ShallowCopy copies a metadata struct into a new struct and creates a copy of
  262. // its maps and slices, but it does not copy the contents of pointer values.
  263. func (meta *predicateMetadata) ShallowCopy() PredicateMetadata {
  264. newPredMeta := &predicateMetadata{
  265. pod: meta.pod,
  266. podBestEffort: meta.podBestEffort,
  267. podRequest: meta.podRequest,
  268. serviceAffinityInUse: meta.serviceAffinityInUse,
  269. ignoredExtendedResources: meta.ignoredExtendedResources,
  270. }
  271. newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
  272. newPredMeta.topologyPairsPotentialAffinityPods = newTopologyPairsMaps()
  273. newPredMeta.topologyPairsPotentialAffinityPods.appendMaps(meta.topologyPairsPotentialAffinityPods)
  274. newPredMeta.topologyPairsPotentialAntiAffinityPods = newTopologyPairsMaps()
  275. newPredMeta.topologyPairsPotentialAntiAffinityPods.appendMaps(meta.topologyPairsPotentialAntiAffinityPods)
  276. newPredMeta.topologyPairsAntiAffinityPodsMap = newTopologyPairsMaps()
  277. newPredMeta.topologyPairsAntiAffinityPodsMap.appendMaps(meta.topologyPairsAntiAffinityPodsMap)
  278. newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
  279. meta.serviceAffinityMatchingPodServices...)
  280. newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
  281. meta.serviceAffinityMatchingPodList...)
  282. return (PredicateMetadata)(newPredMeta)
  283. }
  284. type affinityTermProperties struct {
  285. namespaces sets.String
  286. selector labels.Selector
  287. }
  288. // getAffinityTermProperties receives a Pod and affinity terms and returns the namespaces and
  289. // selectors of the terms.
  290. func getAffinityTermProperties(pod *v1.Pod, terms []v1.PodAffinityTerm) (properties []*affinityTermProperties, err error) {
  291. if terms == nil {
  292. return properties, nil
  293. }
  294. for _, term := range terms {
  295. namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
  296. selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
  297. if err != nil {
  298. return nil, err
  299. }
  300. properties = append(properties, &affinityTermProperties{namespaces: namespaces, selector: selector})
  301. }
  302. return properties, nil
  303. }
  304. // podMatchesAllAffinityTermProperties returns true IFF the given pod matches all the given properties.
  305. func podMatchesAllAffinityTermProperties(pod *v1.Pod, properties []*affinityTermProperties) bool {
  306. if len(properties) == 0 {
  307. return false
  308. }
  309. for _, property := range properties {
  310. if !priorityutil.PodMatchesTermsNamespaceAndSelector(pod, property.namespaces, property.selector) {
  311. return false
  312. }
  313. }
  314. return true
  315. }
  316. // podMatchesAnyAffinityTermProperties returns true if the given pod matches any given property.
  317. func podMatchesAnyAffinityTermProperties(pod *v1.Pod, properties []*affinityTermProperties) bool {
  318. if len(properties) == 0 {
  319. return false
  320. }
  321. for _, property := range properties {
  322. if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, property.namespaces, property.selector) {
  323. return true
  324. }
  325. }
  326. return false
  327. }
  328. // getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
  329. // (1) Whether it has PodAntiAffinity
  330. // (2) Whether any AffinityTerm matches the incoming pod
  331. func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (*topologyPairsMaps, error) {
  332. allNodeNames := make([]string, 0, len(nodeInfoMap))
  333. for name := range nodeInfoMap {
  334. allNodeNames = append(allNodeNames, name)
  335. }
  336. var lock sync.Mutex
  337. var firstError error
  338. topologyMaps := newTopologyPairsMaps()
  339. appendTopologyPairsMaps := func(toAppend *topologyPairsMaps) {
  340. lock.Lock()
  341. defer lock.Unlock()
  342. topologyMaps.appendMaps(toAppend)
  343. }
  344. catchError := func(err error) {
  345. lock.Lock()
  346. defer lock.Unlock()
  347. if firstError == nil {
  348. firstError = err
  349. }
  350. }
  351. ctx, cancel := context.WithCancel(context.Background())
  352. processNode := func(i int) {
  353. nodeInfo := nodeInfoMap[allNodeNames[i]]
  354. node := nodeInfo.Node()
  355. if node == nil {
  356. catchError(fmt.Errorf("node not found"))
  357. return
  358. }
  359. for _, existingPod := range nodeInfo.PodsWithAffinity() {
  360. existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node)
  361. if err != nil {
  362. catchError(err)
  363. cancel()
  364. return
  365. }
  366. appendTopologyPairsMaps(existingPodTopologyMaps)
  367. }
  368. }
  369. workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
  370. return topologyMaps, firstError
  371. }
  372. // getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
  373. // It returns a topologyPairsMaps that are checked later by the affinity
  374. // predicate. With this topologyPairsMaps available, the affinity predicate does not
  375. // need to check all the pods in the cluster.
  376. func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (topologyPairsAffinityPodsMaps *topologyPairsMaps, topologyPairsAntiAffinityPodsMaps *topologyPairsMaps, err error) {
  377. affinity := pod.Spec.Affinity
  378. if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
  379. return newTopologyPairsMaps(), newTopologyPairsMaps(), nil
  380. }
  381. allNodeNames := make([]string, 0, len(nodeInfoMap))
  382. for name := range nodeInfoMap {
  383. allNodeNames = append(allNodeNames, name)
  384. }
  385. var lock sync.Mutex
  386. var firstError error
  387. topologyPairsAffinityPodsMaps = newTopologyPairsMaps()
  388. topologyPairsAntiAffinityPodsMaps = newTopologyPairsMaps()
  389. appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps *topologyPairsMaps) {
  390. lock.Lock()
  391. defer lock.Unlock()
  392. if len(nodeTopologyPairsAffinityPodsMaps.topologyPairToPods) > 0 {
  393. topologyPairsAffinityPodsMaps.appendMaps(nodeTopologyPairsAffinityPodsMaps)
  394. }
  395. if len(nodeTopologyPairsAntiAffinityPodsMaps.topologyPairToPods) > 0 {
  396. topologyPairsAntiAffinityPodsMaps.appendMaps(nodeTopologyPairsAntiAffinityPodsMaps)
  397. }
  398. }
  399. catchError := func(err error) {
  400. lock.Lock()
  401. defer lock.Unlock()
  402. if firstError == nil {
  403. firstError = err
  404. }
  405. }
  406. affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
  407. affinityProperties, err := getAffinityTermProperties(pod, affinityTerms)
  408. if err != nil {
  409. return nil, nil, err
  410. }
  411. antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
  412. ctx, cancel := context.WithCancel(context.Background())
  413. processNode := func(i int) {
  414. nodeInfo := nodeInfoMap[allNodeNames[i]]
  415. node := nodeInfo.Node()
  416. if node == nil {
  417. catchError(fmt.Errorf("nodeInfo.Node is nil"))
  418. return
  419. }
  420. nodeTopologyPairsAffinityPodsMaps := newTopologyPairsMaps()
  421. nodeTopologyPairsAntiAffinityPodsMaps := newTopologyPairsMaps()
  422. for _, existingPod := range nodeInfo.Pods() {
  423. // Check affinity properties.
  424. if podMatchesAllAffinityTermProperties(existingPod, affinityProperties) {
  425. for _, term := range affinityTerms {
  426. if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
  427. pair := topologyPair{key: term.TopologyKey, value: topologyValue}
  428. nodeTopologyPairsAffinityPodsMaps.addTopologyPair(pair, existingPod)
  429. }
  430. }
  431. }
  432. // Check anti-affinity properties.
  433. for _, term := range antiAffinityTerms {
  434. namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
  435. selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
  436. if err != nil {
  437. catchError(err)
  438. cancel()
  439. return
  440. }
  441. if priorityutil.PodMatchesTermsNamespaceAndSelector(existingPod, namespaces, selector) {
  442. if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
  443. pair := topologyPair{key: term.TopologyKey, value: topologyValue}
  444. nodeTopologyPairsAntiAffinityPodsMaps.addTopologyPair(pair, existingPod)
  445. }
  446. }
  447. }
  448. }
  449. if len(nodeTopologyPairsAffinityPodsMaps.topologyPairToPods) > 0 || len(nodeTopologyPairsAntiAffinityPodsMaps.topologyPairToPods) > 0 {
  450. appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps)
  451. }
  452. }
  453. workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
  454. return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, firstError
  455. }
  456. // targetPodMatchesAffinityOfPod returns true if "targetPod" matches ALL affinity terms of
  457. // "pod". This function does not check topology.
  458. // So, whether the targetPod actually matches or not needs further checks for a specific
  459. // node.
  460. func targetPodMatchesAffinityOfPod(pod, targetPod *v1.Pod) bool {
  461. affinity := pod.Spec.Affinity
  462. if affinity == nil || affinity.PodAffinity == nil {
  463. return false
  464. }
  465. affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity))
  466. if err != nil {
  467. klog.Errorf("error in getting affinity properties of Pod %v", pod.Name)
  468. return false
  469. }
  470. return podMatchesAllAffinityTermProperties(targetPod, affinityProperties)
  471. }
  472. // targetPodMatchesAntiAffinityOfPod returns true if "targetPod" matches ANY anti-affinity
  473. // term of "pod". This function does not check topology.
  474. // So, whether the targetPod actually matches or not needs further checks for a specific
  475. // node.
  476. func targetPodMatchesAntiAffinityOfPod(pod, targetPod *v1.Pod) bool {
  477. affinity := pod.Spec.Affinity
  478. if affinity == nil || affinity.PodAntiAffinity == nil {
  479. return false
  480. }
  481. properties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
  482. if err != nil {
  483. klog.Errorf("error in getting anti-affinity properties of Pod %v", pod.Name)
  484. return false
  485. }
  486. return podMatchesAnyAffinityTermProperties(targetPod, properties)
  487. }