123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- package priorities
- import (
- "context"
- "sync"
- "sync/atomic"
- v1 "k8s.io/api/core/v1"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/kubernetes/pkg/scheduler/algorithm"
- "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
- priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
- schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
- schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- "k8s.io/klog"
- )
- type InterPodAffinity struct {
- info predicates.NodeInfo
- nodeLister algorithm.NodeLister
- podLister algorithm.PodLister
- hardPodAffinityWeight int32
- }
- func NewInterPodAffinityPriority(
- info predicates.NodeInfo,
- nodeLister algorithm.NodeLister,
- podLister algorithm.PodLister,
- hardPodAffinityWeight int32) PriorityFunction {
- interPodAffinity := &InterPodAffinity{
- info: info,
- nodeLister: nodeLister,
- podLister: podLister,
- hardPodAffinityWeight: hardPodAffinityWeight,
- }
- return interPodAffinity.CalculateInterPodAffinityPriority
- }
- type podAffinityPriorityMap struct {
- sync.Mutex
-
- nodes []*v1.Node
-
-
- counts map[string]*int64
-
- firstError error
- }
- func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap {
- return &podAffinityPriorityMap{
- nodes: nodes,
- counts: make(map[string]*int64, len(nodes)),
- }
- }
- func (p *podAffinityPriorityMap) setError(err error) {
- p.Lock()
- defer p.Unlock()
- if p.firstError == nil {
- p.firstError = err
- }
- }
- func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) {
- namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term)
- selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
- if err != nil {
- p.setError(err)
- return
- }
- match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector)
- if match {
- for _, node := range p.nodes {
- if priorityutil.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) {
- atomic.AddInt64(p.counts[node.Name], weight)
- }
- }
- }
- }
- func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
- for i := range terms {
- term := &terms[i]
- p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier)))
- }
- }
- func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
- affinity := pod.Spec.Affinity
- hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
- hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
-
-
- pm := newPodAffinityPriorityMap(nodes)
- allNodeNames := make([]string, 0, len(nodeNameToInfo))
- lazyInit := hasAffinityConstraints || hasAntiAffinityConstraints
- for name := range nodeNameToInfo {
- allNodeNames = append(allNodeNames, name)
-
- if lazyInit || len(nodeNameToInfo[name].PodsWithAffinity()) != 0 {
- pm.counts[name] = new(int64)
- }
- }
-
- var maxCount, minCount int64
- processPod := func(existingPod *v1.Pod) error {
- existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName)
- if err != nil {
- if apierrors.IsNotFound(err) {
- klog.Errorf("Node not found, %v", existingPod.Spec.NodeName)
- return nil
- }
- return err
- }
- existingPodAffinity := existingPod.Spec.Affinity
- existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
- existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
- if hasAffinityConstraints {
-
-
-
- terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
- pm.processTerms(terms, pod, existingPod, existingPodNode, 1)
- }
- if hasAntiAffinityConstraints {
-
-
-
- terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
- pm.processTerms(terms, pod, existingPod, existingPodNode, -1)
- }
- if existingHasAffinityConstraints {
-
-
-
- if ipa.hardPodAffinityWeight > 0 {
- terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
-
-
-
-
- for _, term := range terms {
- pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight))
- }
- }
-
-
-
- terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
- pm.processTerms(terms, existingPod, pod, existingPodNode, 1)
- }
- if existingHasAntiAffinityConstraints {
-
-
-
- terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
- pm.processTerms(terms, existingPod, pod, existingPodNode, -1)
- }
- return nil
- }
- processNode := func(i int) {
- nodeInfo := nodeNameToInfo[allNodeNames[i]]
- if nodeInfo.Node() != nil {
- if hasAffinityConstraints || hasAntiAffinityConstraints {
-
- for _, existingPod := range nodeInfo.Pods() {
- if err := processPod(existingPod); err != nil {
- pm.setError(err)
- }
- }
- } else {
-
-
- for _, existingPod := range nodeInfo.PodsWithAffinity() {
- if err := processPod(existingPod); err != nil {
- pm.setError(err)
- }
- }
- }
- }
- }
- workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
- if pm.firstError != nil {
- return nil, pm.firstError
- }
- for _, node := range nodes {
- if pm.counts[node.Name] == nil {
- continue
- }
- if *pm.counts[node.Name] > maxCount {
- maxCount = *pm.counts[node.Name]
- }
- if *pm.counts[node.Name] < minCount {
- minCount = *pm.counts[node.Name]
- }
- }
-
- result := make(schedulerapi.HostPriorityList, 0, len(nodes))
- maxMinDiff := maxCount - minCount
- for _, node := range nodes {
- fScore := float64(0)
- if maxMinDiff > 0 && pm.counts[node.Name] != nil {
- fScore = float64(schedulerapi.MaxPriority) * (float64(*pm.counts[node.Name]-minCount) / float64(maxCount-minCount))
- }
- result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: float64(fScore)})
- if klog.V(10) {
- klog.Infof("%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
- }
- }
- return result, nil
- }
|