@@ -234,8 +234,8 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 	}
 
 	metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
-	priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
-	//customPriotrityList, err := CustomPrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
+	//priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
+	priorityList, err := CustomPrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
 	if err != nil {
 		return result, err
 	}
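Note (not part of the patch): after this hunk, priorityList holds a schedulerapi.CustomHostPriorityList rather than a HostPriorityList. Judging from the weighting arithmetic in CustomPrioritizeNodes further down in this diff, the custom type keeps scores as float64. A minimal sketch of the shape that implies; the actual definitions live elsewhere in the patch and may differ:

// Sketch only: assumed shape of the custom priority types this call site relies on.
// CustomHostPriority mirrors schedulerapi.HostPriority but stores the score as a
// float64, matching the float64 math in CustomPrioritizeNodes below.
type CustomHostPriority struct {
	Host  string  // node name
	Score float64 // weighted priority score
}

// CustomHostPriorityList is the list type returned by CustomPrioritizeNodes.
type CustomHostPriorityList []CustomHostPriority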
@@ -296,6 +296,29 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
 	return priorityList[maxScores[ix]].Host, nil
 }
 
+//------------------------------------------------------------------------------------------------
+//------------------------------------------------------------------------------------------------
+// ---------START OF CUSTOMIZATION----------------------------------------------------------------
+//------------------------------------------------------------------------------------------------
+//------------------------------------------------------------------------------------------------
+// func (g *genericScheduler) customSelectHost(priorityList schedulerapi.CustomHostPriorityList) (string, error) {
+// if len(priorityList) == 0 {
+// return "", fmt.Errorf("empty priorityList")
+// }
+
+// maxScores := findMaxScores(priorityList)
+// ix := int(g.lastNodeIndex % uint64(len(maxScores)))
+// g.lastNodeIndex++
+
+// return priorityList[maxScores[ix]].Host, nil
+// }
+
+//------------------------------------------------------------------------------------------------
+//------------------------------------------------------------------------------------------------
+// ---------END OF CUSTOMIZATION----------------------------------------------------------------
+//------------------------------------------------------------------------------------------------
+//------------------------------------------------------------------------------------------------
+
 // preempt finds nodes with pods that can be preempted to make room for "pod" to
 // schedule. It chooses one of the nodes and preempts the pods on the node and
 // returns 1) the node, 2) the list of preempted pods if such a node is found,
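For reference (not part of the patch): the commented-out customSelectHost keeps the upstream selection logic, collecting the indexes of all hosts that share the top score and round-robining among them via lastNodeIndex. The upstream findMaxScores works on HostPriorityList with integer scores, so a float64-aware variant would be needed for CustomHostPriorityList. A sketch, with the helper name assumed and relying on the types already imported in generic_scheduler.go:

// Sketch only: float64 counterpart to findMaxScores. Returns the indexes of all
// entries sharing the highest score; assumes a non-empty list (customSelectHost
// checks for that before calling).
func findMaxScoresCustom(priorityList schedulerapi.CustomHostPriorityList) []int {
	maxScoreIndexes := make([]int, 0, len(priorityList)/2)
	maxScore := priorityList[0].Score
	for i, hp := range priorityList {
		if hp.Score > maxScore {
			maxScore = hp.Score
			maxScoreIndexes = maxScoreIndexes[:0]
			maxScoreIndexes = append(maxScoreIndexes, i)
		} else if hp.Score == maxScore {
			maxScoreIndexes = append(maxScoreIndexes, i)
		}
	}
	return maxScoreIndexes
}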
@@ -810,148 +833,148 @@ func PrioritizeNodes(
 //------------------------------------------------------------------
 //-------------------START-CUSTOM-BY-IWITA---------------------------------------------------------
 //------------------------------------------------------------------
-func CustomPrioritizeNodes(
-	pod *v1.Pod,
-	nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
-	meta interface{},
-	priorityConfigs []priorities.PriorityConfig,
-	nodes []*v1.Node,
-	extenders []algorithm.SchedulerExtender,
-) (schedulerapi.CustomHostPriorityList, error) {
-	// If no priority configs are provided, then the EqualPriority function is applied
-	// This is required to generate the priority list in the required format
-	// if len(priorityConfigs) == 0 && len(extenders) == 0 {
-	// result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
-	// for i := range nodes {
-	// // initializes nodes with Score = 1
-	// hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
-	// if err != nil {
-	// return nil, err
-	// }
-	// result = append(result, hostPriority)
-	// }
-	// return result, nil
-	// }
-
-	var (
-		mu   = sync.Mutex{}
-		wg   = sync.WaitGroup{}
-		errs []error
-	)
-	appendError := func(err error) {
-		mu.Lock()
-		defer mu.Unlock()
-		errs = append(errs, err)
-	}
-
-	results := make([]schedulerapi.CustomHostPriorityList, len(priorityConfigs), len(priorityConfigs))
-
-	// DEPRECATED: we can remove this when all priorityConfigs implement the
-	// Map-Reduce pattern.
-	for i := range priorityConfigs {
-		if priorityConfigs[i].CustomFunction != nil {
-			wg.Add(1)
-			go func(index int) {
-				defer wg.Done()
-				var err error
-				results[index], err = priorityConfigs[index].CustomFunction(pod, nodeNameToInfo, nodes)
-				if err != nil {
-					appendError(err)
-				}
-			}(i)
-		} else {
-			results[i] = make(schedulerapi.CustomHostPriorityList, len(nodes))
-		}
-	}
-
-	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
-		nodeInfo := nodeNameToInfo[nodes[index].Name]
-		for i := range priorityConfigs {
-			if priorityConfigs[i].Function != nil {
-				continue
-			}
-
-			var err error
-			results[i][index], err = priorityConfigs[i].CustomMap(pod, meta, nodeInfo)
-			if err != nil {
-				appendError(err)
-				results[i][index].Host = nodes[index].Name
-			}
-		}
-	})
-
-	for i := range priorityConfigs {
-		if priorityConfigs[i].Reduce == nil {
-			continue
-		}
-		wg.Add(1)
-		go func(index int) {
-			defer wg.Done()
-			if err := priorityConfigs[index].CustomReduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
-				appendError(err)
-			}
-			if klog.V(10) {
-				for _, hostPriority := range results[index] {
-					klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
-				}
-			}
-		}(i)
-	}
-	// Wait for all computations to be finished.
-	wg.Wait()
-	if len(errs) != 0 {
-		return schedulerapi.CustomHostPriorityList{}, errors.NewAggregate(errs)
-	}
-
-	// Summarize all scores.
-	result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
-
-	for i := range nodes {
-		result = append(result, schedulerapi.CustomHostPriority{Host: nodes[i].Name, Score: 0})
-		for j := range priorityConfigs {
-			result[i].Score += results[j][i].Score * float64(priorityConfigs[j].Weight)
-		}
-	}
-
-	if len(extenders) != 0 && nodes != nil {
-		combinedScores := make(map[string]float64, len(nodeNameToInfo))
-		for i := range extenders {
-			if !extenders[i].IsInterested(pod) {
-				continue
-			}
-			wg.Add(1)
-			go func(extIndex int) {
-				defer wg.Done()
-				prioritizedList, weight, err := extenders[extIndex].CustomPrioritize(pod, nodes)
-				if err != nil {
-					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
-					return
-				}
-				mu.Lock()
-				for i := range *prioritizedList {
-					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
-					if klog.V(10) {
-						klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
-					}
-					combinedScores[host] += score * float64(weight)
-				}
-				mu.Unlock()
-			}(i)
-		}
-		// wait for all go routines to finish
-		wg.Wait()
-		for i := range result {
-			result[i].Score += combinedScores[result[i].Host]
-		}
-	}
-
-	if klog.V(10) {
-		for i := range result {
-			klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
-		}
-	}
-	return result, nil
-}
+// func CustomPrioritizeNodes(
+// pod *v1.Pod,
+// nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
+// meta interface{},
+// priorityConfigs []priorities.PriorityConfig,
+// nodes []*v1.Node,
+// extenders []algorithm.SchedulerExtender,
+// ) (schedulerapi.CustomHostPriorityList, error) {
+// // If no priority configs are provided, then the EqualPriority function is applied
+// // This is required to generate the priority list in the required format
+// // if len(priorityConfigs) == 0 && len(extenders) == 0 {
+// // result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
+// // for i := range nodes {
+// // // initializes nodes with Score = 1
+// // hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
+// // if err != nil {
+// // return nil, err
+// // }
+// // result = append(result, hostPriority)
+// // }
+// // return result, nil
+// // }
+
+// var (
+// mu = sync.Mutex{}
+// wg = sync.WaitGroup{}
+// errs []error
+// )
+// appendError := func(err error) {
+// mu.Lock()
+// defer mu.Unlock()
+// errs = append(errs, err)
+// }
+
+// results := make([]schedulerapi.CustomHostPriorityList, len(priorityConfigs), len(priorityConfigs))
+
+// // DEPRECATED: we can remove this when all priorityConfigs implement the
+// // Map-Reduce pattern.
+// for i := range priorityConfigs {
+// if priorityConfigs[i].CustomFunction != nil {
+// wg.Add(1)
+// go func(index int) {
+// defer wg.Done()
+// var err error
+// results[index], err = priorityConfigs[index].CustomFunction(pod, nodeNameToInfo, nodes)
+// if err != nil {
+// appendError(err)
+// }
+// }(i)
+// } else {
+// results[i] = make(schedulerapi.CustomHostPriorityList, len(nodes))
+// }
+// }
+
+// workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
+// nodeInfo := nodeNameToInfo[nodes[index].Name]
+// for i := range priorityConfigs {
+// if priorityConfigs[i].Function != nil {
+// continue
+// }
+
+// var err error
+// results[i][index], err = priorityConfigs[i].CustomMap(pod, meta, nodeInfo)
+// if err != nil {
+// appendError(err)
+// results[i][index].Host = nodes[index].Name
+// }
+// }
+// })
+
+// for i := range priorityConfigs {
+// if priorityConfigs[i].Reduce == nil {
+// continue
+// }
+// wg.Add(1)
+// go func(index int) {
+// defer wg.Done()
+// if err := priorityConfigs[index].CustomReduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
+// appendError(err)
+// }
+// if klog.V(10) {
+// for _, hostPriority := range results[index] {
+// klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
+// }
+// }
+// }(i)
+// }
+// // Wait for all computations to be finished.
+// wg.Wait()
+// if len(errs) != 0 {
+// return schedulerapi.CustomHostPriorityList{}, errors.NewAggregate(errs)
+// }
+
+// // Summarize all scores.
+// result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
+
+// for i := range nodes {
+// result = append(result, schedulerapi.CustomHostPriority{Host: nodes[i].Name, Score: 0})
+// for j := range priorityConfigs {
+// result[i].Score += results[j][i].Score * float64(priorityConfigs[j].Weight)
+// }
+// }
+
+// if len(extenders) != 0 && nodes != nil {
+// combinedScores := make(map[string]float64, len(nodeNameToInfo))
+// for i := range extenders {
+// if !extenders[i].IsInterested(pod) {
+// continue
+// }
+// wg.Add(1)
+// go func(extIndex int) {
+// defer wg.Done()
+// prioritizedList, weight, err := extenders[extIndex].CustomPrioritize(pod, nodes)
+// if err != nil {
+// // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
+// return
+// }
+// mu.Lock()
+// for i := range *prioritizedList {
+// host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
+// if klog.V(10) {
+// klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
+// }
+// combinedScores[host] += score * float64(weight)
+// }
+// mu.Unlock()
+// }(i)
+// }
+// // wait for all go routines to finish
+// wg.Wait()
+// for i := range result {
+// result[i].Score += combinedScores[result[i].Host]
+// }
+// }
+
+// if klog.V(10) {
+// for i := range result {
+// klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
+// }
+// }
+// return result, nil
+// }
 //------------------------------------------------------------------
 // --------------END-CUSTOM-BY-IWITA--------------------------------
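For illustration (not part of the patch): CustomPrioritizeNodes drives per-node scoring through the CustomMap/CustomReduce fields that this patch adds to priorities.PriorityConfig, calling CustomMap once per node and summing the weighted float64 scores. A hypothetical map-style scorer in that shape; the function name and the scoring rule are invented for the example, and it relies on the imports already present in generic_scheduler.go (fmt, v1, schedulerapi, schedulernodeinfo):

// Hypothetical example of a CustomMap-style scorer. It is invoked once per node
// by the ParallelizeUntil loop above and returns a schedulerapi.CustomHostPriority
// carrying a float64 score.
func exampleCPUHeadroomMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.CustomHostPriority, error) {
	node := nodeInfo.Node()
	if node == nil {
		return schedulerapi.CustomHostPriority{}, fmt.Errorf("node not found")
	}
	// Example rule: score by the fraction of allocatable CPU that is still unrequested.
	allocatable := nodeInfo.AllocatableResource().MilliCPU
	requested := nodeInfo.RequestedResource().MilliCPU
	score := 0.0
	if allocatable > 0 && requested < allocatable {
		score = float64(allocatable-requested) / float64(allocatable)
	}
	return schedulerapi.CustomHostPriority{Host: node.Name, Score: score}, nil
}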