Achilleas Tzenetopoulos 5 years ago
parent
commit
d644de03af

+ 4 - 4
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/custom_resource_allocation.go

@@ -112,15 +112,15 @@ func customScoreFn(metrics map[string]float64) float64 {
 }
 
 func calculateScore(results map[string]float64,
-	logicFn func(map[string]float64) float64) int64 {
+	logicFn func(map[string]float64) float64) float64 {
 
 	res := logicFn(results)
 	klog.Infof("Has score (in float) %v\n", res)
 
 	// TODO
 	// While the final score should be an integer,
-	// find a solution about resolving the float produced
-	return int64(res)
+	// find a solution about resolving the float produced
+	return res
 }
 
 func calculateWeightedAverage(response *client.Response,
@@ -177,7 +177,7 @@ func queryInfluxDB(metrics []string, uuid string, socket int,
 	return calculateWeightedAverage(response, numberOfRows, len(metrics))
 }
 
-func customResourceScorer(nodeName string) (int64, error) {
+func customResourceScorer(nodeName string) (float64, error) {
 	//return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
 	//customRequestedScore(requested.Memory, allocable.Memory)) / 2
 

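Note on the first hunk: calculateScore now returns the raw float64 from logicFn instead of truncating with int64(), deferring integer conversion to the caller. Below is a minimal standalone sketch (not part of the commit) of that flow; the metric names and scoring rule are hypothetical stand-ins for the InfluxDB-backed values.

package main

import "fmt"

// calculateScore mirrors the changed signature above: the float score
// from logicFn is returned as-is instead of being truncated to int64.
func calculateScore(results map[string]float64,
	logicFn func(map[string]float64) float64) float64 {
	return logicFn(results)
}

func main() {
	// Hypothetical metrics; the commit's values come from queryInfluxDB.
	metrics := map[string]float64{"ipc": 1.7, "mem_reads": 0.42}
	score := calculateScore(metrics, func(m map[string]float64) float64 {
		return m["ipc"] / (1 + m["mem_reads"]) // hypothetical scoring rule
	})
	fmt.Printf("score kept as float: %v\n", score)
}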
+ 13 - 7
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/resource_allocation.go

@@ -36,7 +36,7 @@ type ResourceAllocationPriority struct {
 
 type CustomAllocationPriority struct {
 	Name   string
-	scorer func(nodeName string) (int64, error)
+	scorer func(nodeName string) (float64, error)
 }
 
 // PriorityMap priorities nodes according to the resource allocations on the node.
@@ -96,13 +96,13 @@ func (r *ResourceAllocationPriority) PriorityMap(
 	}, nil
 }
 
-func (r *CustomAllocationPriority) PriorityMap(
+func (r *CustomAllocationPriority) CustomPriorityMap(
 	pod *v1.Pod,
 	meta interface{},
-	nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
+	nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.CustomHostPriority, error) {
 	node := nodeInfo.Node()
 	if node == nil {
-		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+		return schedulerapi.CustomHostPriority{}, fmt.Errorf("node not found")
 	}
 	//allocatable := nodeInfo.AllocatableResource()
 
@@ -116,7 +116,7 @@ func (r *CustomAllocationPriority) PriorityMap(
 
 	//requested.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
 	//requested.Memory += nodeInfo.NonZeroRequest().Memory
-	var score int64
+	var score float64
 	// Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
 	// if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
 	// 	score = r.scorer(&requested, &allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
@@ -147,10 +147,16 @@ func (r *CustomAllocationPriority) PriorityMap(
 	// 	}
 	// }
 
-	return schedulerapi.HostPriority{
+	// TODO create a custom HostPriority
+	return schedulerapi.CustomHostPriority{
 		Host:  node.Name,
-		Score: int(score),
+		Score: score,
 	}, nil
+
+	// return schedulerapi.HostPriority{
+	// 	Host:  node.Name,
+	// 	Score: int(score),
+	// }, nil
 }
 
 func getNonZeroRequests(pod *v1.Pod) *schedulernodeinfo.Resource {

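The hunk above reroutes CustomAllocationPriority through a float64 scorer and a CustomHostPriority result. A simplified, self-contained sketch of that map step follows; the lowercase types are stand-ins for the scheduler's real ones, and the fixed score is hypothetical.

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for schedulerapi.CustomHostPriority and CustomAllocationPriority.
type customHostPriority struct {
	Host  string
	Score float64
}

type customAllocationPriority struct {
	Name   string
	scorer func(nodeName string) (float64, error) // the new float64 signature
}

// customPriorityMap mirrors CustomPriorityMap: run the per-node scorer
// and wrap the float score without truncation.
func (r *customAllocationPriority) customPriorityMap(nodeName string) (customHostPriority, error) {
	if nodeName == "" {
		return customHostPriority{}, errors.New("node not found")
	}
	score, err := r.scorer(nodeName)
	if err != nil {
		return customHostPriority{}, err
	}
	return customHostPriority{Host: nodeName, Score: score}, nil
}

func main() {
	p := customAllocationPriority{
		Name:   "CustomResourceAllocation",
		scorer: func(string) (float64, error) { return 0.73, nil }, // hypothetical score
	}
	hp, _ := p.customPriorityMap("node-1")
	fmt.Printf("%s scored %.2f\n", hp.Host, hp.Score)
}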
+ 13 - 6
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/types.go

@@ -17,7 +17,7 @@ limitations under the License.
 package priorities
 
 import (
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 )
@@ -26,12 +26,14 @@ import (
 // TODO: Figure out the exact API of this method.
 // TODO: Change interface{} to a specific type.
 type PriorityMapFunction func(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error)
+type CustomPriorityMapFunction func(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.CustomHostPriority, error)
 
 // PriorityReduceFunction is a function that aggregated per-node results and computes
 // final scores for all nodes.
 // TODO: Figure out the exact API of this method.
 // TODO: Change interface{} to a specific type.
 type PriorityReduceFunction func(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result schedulerapi.HostPriorityList) error
+type CustomPriorityReduceFunction func(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result schedulerapi.CustomHostPriorityList) error
 
 // PriorityMetadataProducer is a function that computes metadata for a given pod. This
 // is now used for only for priority functions. For predicates please use PredicateMetadataProducer.
@@ -42,15 +44,20 @@ type PriorityMetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*sched
 // Use Map-Reduce pattern for priority functions.
 type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error)
 
+type CustomPriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.CustomHostPriorityList, error)
+
 // PriorityConfig is a config used for a priority function.
 type PriorityConfig struct {
-	Name   string
-	Map    PriorityMapFunction
-	Reduce PriorityReduceFunction
+	Name         string
+	Map          PriorityMapFunction
+	CustomMap    CustomPriorityMapFunction
+	Reduce       PriorityReduceFunction
+	CustomReduce CustomPriorityReduceFunction
 	// TODO: Remove it after migrating all functions to
 	// Map-Reduce pattern.
-	Function PriorityFunction
-	Weight   int
+	Function       PriorityFunction
+	CustomFunction CustomPriorityFunction
+	Weight         int
 }
 
 // EmptyPriorityMetadataProducer returns a no-op PriorityMetadataProducer type.

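PriorityConfig now carries the legacy int-based slots and the new float64-based Custom* slots side by side, so call sites can dispatch on whichever is populated. A toy sketch of that dispatch, using simplified stand-in types and a hypothetical priority name:

package main

import "fmt"

type hostPriority struct {
	Host  string
	Score int
}
type customHostPriority struct {
	Host  string
	Score float64
}

// priorityConfig mirrors the widened PriorityConfig shape.
type priorityConfig struct {
	Name      string
	Map       func(node string) (hostPriority, error)       // legacy path
	CustomMap func(node string) (customHostPriority, error) // new float path
	Weight    int
}

func main() {
	cfg := priorityConfig{
		Name: "CustomRequestedPriority", // hypothetical
		CustomMap: func(node string) (customHostPriority, error) {
			return customHostPriority{Host: node, Score: 0.5}, nil
		},
		Weight: 1,
	}
	// Prefer the custom path when it is populated.
	if cfg.CustomMap != nil {
		hp, _ := cfg.CustomMap("node-1")
		fmt.Printf("%s: %+v (weight %d)\n", cfg.Name, hp, cfg.Weight)
	}
}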
+ 2 - 2
kubernetes-v1.15.4/pkg/scheduler/algorithm/scheduler_interface.go

@@ -17,7 +17,7 @@ limitations under the License.
 package algorithm
 
 import (
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 )
@@ -40,7 +40,7 @@ type SchedulerExtender interface {
 	// are used to compute the weighted score for an extender. The weighted scores are added to
 	// the scores computed  by Kubernetes scheduler. The total scores are used to do the host selection.
 	Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error)
-
+	CustomPrioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.CustomHostPriorityList, weight int, err error)
 	// Bind delegates the action of binding a pod to a node to the extender.
 	Bind(binding *v1.Binding) error
 

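The SchedulerExtender interface gains CustomPrioritize, reporting float scores alongside the legacy int-based Prioritize. A stand-in sketch of what an implementation could look like; the types and the flat 0.5 score are placeholders, not the real HTTP extender client:

package main

import "fmt"

type pod struct{ Name string }
type node struct{ Name string }

type customHostPriority struct {
	Host  string
	Score float64
}
type customHostPriorityList []customHostPriority

// schedulerExtender mirrors only the new method of the interface.
type schedulerExtender interface {
	CustomPrioritize(p *pod, nodes []*node) (*customHostPriorityList, int, error)
}

// fakeExtender scores every node 0.5; purely illustrative.
type fakeExtender struct{ weight int }

func (f *fakeExtender) CustomPrioritize(p *pod, nodes []*node) (*customHostPriorityList, int, error) {
	list := make(customHostPriorityList, 0, len(nodes))
	for _, n := range nodes {
		list = append(list, customHostPriority{Host: n.Name, Score: 0.5})
	}
	return &list, f.weight, nil
}

func main() {
	var ext schedulerExtender = &fakeExtender{weight: 2}
	list, weight, _ := ext.CustomPrioritize(&pod{Name: "demo"}, []*node{{Name: "node-1"}})
	fmt.Printf("weight %d, scores %v\n", weight, *list)
}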
+ 26 - 1
kubernetes-v1.15.4/pkg/scheduler/api/types.go

@@ -19,7 +19,7 @@ package api
 import (
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 )
@@ -335,6 +335,13 @@ type HostPriority struct {
 	Score int
 }
 
+type CustomHostPriority struct {
+	// Name of the host
+	Host string
+	// Score associated with the host (float)
+	Score float64
+}
+
 // HostPriorityList declares a []HostPriority type.
 type HostPriorityList []HostPriority
 
@@ -352,3 +359,21 @@ func (h HostPriorityList) Less(i, j int) bool {
 func (h HostPriorityList) Swap(i, j int) {
 	h[i], h[j] = h[j], h[i]
 }
+
+// CustomHostPriorityList declares a []CustomHostPriority type.
+type CustomHostPriorityList []CustomHostPriority
+
+func (h CustomHostPriorityList) Len() int {
+	return len(h)
+}
+
+func (h CustomHostPriorityList) Less(i, j int) bool {
+	if h[i].Score == h[j].Score {
+		return h[i].Host < h[j].Host
+	}
+	return h[i].Score < h[j].Score
+}
+
+func (h CustomHostPriorityList) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+}

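CustomHostPriorityList implements sort.Interface the same way HostPriorityList does: ascending by Score, with ties broken by Host. A quick standalone check with stand-in types:

package main

import (
	"fmt"
	"sort"
)

type customHostPriority struct {
	Host  string
	Score float64
}
type customHostPriorityList []customHostPriority

func (h customHostPriorityList) Len() int      { return len(h) }
func (h customHostPriorityList) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h customHostPriorityList) Less(i, j int) bool {
	if h[i].Score == h[j].Score {
		return h[i].Host < h[j].Host // tie-break on host name
	}
	return h[i].Score < h[j].Score
}

func main() {
	list := customHostPriorityList{
		{Host: "node-b", Score: 1.5},
		{Host: "node-a", Score: 1.5}, // ties with node-b, sorts first by name
		{Host: "node-c", Score: 0.9},
	}
	sort.Sort(list)
	fmt.Println(list) // [{node-c 0.9} {node-a 1.5} {node-b 1.5}]
}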
+ 1 - 1
kubernetes-v1.15.4/pkg/scheduler/core/extender.go

@@ -24,7 +24,7 @@ import (
 	"strings"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/apimachinery/pkg/util/sets"
 	restclient "k8s.io/client-go/rest"

+ 152 - 1
kubernetes-v1.15.4/pkg/scheduler/core/generic_scheduler.go

@@ -28,7 +28,7 @@ import (
 
 	"k8s.io/klog"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -235,6 +235,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 
 	metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
 	priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
+	//customPriorityList, err := CustomPrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
 	if err != nil {
 		return result, err
 	}
@@ -806,6 +807,156 @@ func PrioritizeNodes(
 	return result, nil
 }
 
+//------------------------------------------------------------------
+//-------------------START-CUSTOM-BY-IWITA---------------------------------------------------------
+//------------------------------------------------------------------
+func CustomPrioritizeNodes(
+	pod *v1.Pod,
+	nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
+	meta interface{},
+	priorityConfigs []priorities.PriorityConfig,
+	nodes []*v1.Node,
+	extenders []algorithm.SchedulerExtender,
+) (schedulerapi.CustomHostPriorityList, error) {
+	// If no priority configs are provided, then the EqualPriority function is applied
+	// This is required to generate the priority list in the required format
+	// if len(priorityConfigs) == 0 && len(extenders) == 0 {
+	// 	result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
+	// 	for i := range nodes {
+	// 		// initializes nodes with Score = 1
+	// 		hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
+	// 		if err != nil {
+	// 			return nil, err
+	// 		}
+	// 		result = append(result, hostPriority)
+	// 	}
+	// 	return result, nil
+	// }
+
+	var (
+		mu   = sync.Mutex{}
+		wg   = sync.WaitGroup{}
+		errs []error
+	)
+	appendError := func(err error) {
+		mu.Lock()
+		defer mu.Unlock()
+		errs = append(errs, err)
+	}
+
+	results := make([]schedulerapi.CustomHostPriorityList, len(priorityConfigs), len(priorityConfigs))
+
+	// DEPRECATED: we can remove this when all priorityConfigs implement the
+	// Map-Reduce pattern.
+	for i := range priorityConfigs {
+		if priorityConfigs[i].CustomFunction != nil {
+			wg.Add(1)
+			go func(index int) {
+				defer wg.Done()
+				var err error
+				results[index], err = priorityConfigs[index].CustomFunction(pod, nodeNameToInfo, nodes)
+				if err != nil {
+					appendError(err)
+				}
+			}(i)
+		} else {
+			results[i] = make(schedulerapi.CustomHostPriorityList, len(nodes))
+		}
+	}
+
+	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
+		nodeInfo := nodeNameToInfo[nodes[index].Name]
+		for i := range priorityConfigs {
+			if priorityConfigs[i].CustomFunction != nil {
+				continue
+			}
+
+			var err error
+			results[i][index], err = priorityConfigs[i].CustomMap(pod, meta, nodeInfo)
+			if err != nil {
+				appendError(err)
+				results[i][index].Host = nodes[index].Name
+			}
+		}
+	})
+
+	for i := range priorityConfigs {
+		if priorityConfigs[i].CustomReduce == nil {
+			continue
+		}
+		wg.Add(1)
+		go func(index int) {
+			defer wg.Done()
+			if err := priorityConfigs[index].CustomReduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
+				appendError(err)
+			}
+			if klog.V(10) {
+				for _, hostPriority := range results[index] {
+					klog.Infof("%v -> %v: %v, Score: (%v)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
+				}
+			}
+		}(i)
+	}
+	// Wait for all computations to be finished.
+	wg.Wait()
+	if len(errs) != 0 {
+		return schedulerapi.CustomHostPriorityList{}, errors.NewAggregate(errs)
+	}
+
+	// Summarize all scores.
+	result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
+
+	for i := range nodes {
+		result = append(result, schedulerapi.CustomHostPriority{Host: nodes[i].Name, Score: 0})
+		for j := range priorityConfigs {
+			result[i].Score += results[j][i].Score * float64(priorityConfigs[j].Weight)
+		}
+	}
+
+	if len(extenders) != 0 && nodes != nil {
+		combinedScores := make(map[string]float64, len(nodeNameToInfo))
+		for i := range extenders {
+			if !extenders[i].IsInterested(pod) {
+				continue
+			}
+			wg.Add(1)
+			go func(extIndex int) {
+				defer wg.Done()
+				prioritizedList, weight, err := extenders[extIndex].CustomPrioritize(pod, nodes)
+				if err != nil {
+					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
+					return
+				}
+				mu.Lock()
+				for i := range *prioritizedList {
+					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
+					if klog.V(10) {
+						klog.Infof("%v -> %v: %v, Score: (%v)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
+					}
+					combinedScores[host] += score * float64(weight)
+				}
+				mu.Unlock()
+			}(i)
+		}
+		// wait for all go routines to finish
+		wg.Wait()
+		for i := range result {
+			result[i].Score += combinedScores[result[i].Host]
+		}
+	}
+
+	if klog.V(10) {
+		for i := range result {
+			klog.Infof("Host %s => Score %v", result[i].Host, result[i].Score)
+		}
+	}
+	return result, nil
+}
+
+//------------------------------------------------------------------
+// --------------END-CUSTOM-BY-IWITA--------------------------------
+//------------------------------------------------------------------
+
 // EqualPriorityMap is a prioritizer function that gives an equal weight of one to all nodes
 func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
 	node := nodeInfo.Node()
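For reference, a reduced sketch of the summation step inside CustomPrioritizeNodes above: per-priority float scores are weighted and summed per node, then pre-weighted extender scores are folded in. All values here are hypothetical.

package main

import "fmt"

func main() {
	nodes := []string{"node-1", "node-2"}
	weights := []float64{1, 2} // one weight per priority config
	// results[priority][node], as filled by the Custom map/reduce steps.
	results := [][]float64{
		{0.8, 0.3},
		{0.2, 0.9},
	}
	// Combined extender scores, already multiplied by extender weight.
	combinedScores := map[string]float64{"node-2": 0.5}

	total := make(map[string]float64, len(nodes))
	for j, perPriority := range results {
		for i, name := range nodes {
			total[name] += perPriority[i] * weights[j]
		}
	}
	for _, name := range nodes {
		total[name] += combinedScores[name]
		fmt.Printf("%s => %.2f\n", name, total[name]) // node-1 => 1.20, node-2 => 2.60
	}
}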