added in-memory cache for node metrics

Achilleas Tzenetopoulos, 5 years ago
commit 00784d0d8d

+ 62 - 7
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/custom_resource_allocation.go

@@ -24,6 +24,7 @@ import (
 	_ "github.com/go-sql-driver/mysql"
 	client "github.com/influxdata/influxdb1-client/v2"
 	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/scheduler/customcache"
 )
 
 var (
@@ -106,8 +107,53 @@ func customScoreInfluxDB(metrics []string, uuid string, socket,
 }
 
 func customResourceScorer(nodeName string) (float64, error) {
-	//return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
-	//customRequestedScore(requested.Memory, allocable.Memory)) / 2
+
+	cores, _ := Cores[nodeName]
+
+	var results map[string]float64
+	// Check the cache
+	ipc, ok := customcache.LabCache.Cache[nodeName]["ipc"]
+	if !ok {
+		klog.Infof("No cached IPC value for node %v", nodeName)
+	}
+	reads, ok := customcache.LabCache.Cache[nodeName]["mem_read"]
+	if !ok {
+		klog.Infof("No cached memory-read value for node %v", nodeName)
+	}
+	writes, ok := customcache.LabCache.Cache[nodeName]["mem_write"]
+	if !ok {
+		klog.Infof("No cached memory-write value for node %v", nodeName)
+	}
+	c6res, ok := customcache.LabCache.Cache[nodeName]["c6res"]
+	if !ok {
+		klog.Infof("No cached C6-state value for node %v", nodeName)
+	}
+
+	// If the cache holds fresh values, use them and skip the database query
+	if ipc != -1 && reads != -1 && writes != -1 && c6res != -1 {
+		results = map[string]float64{
+			"ipc":       ipc,
+			"mem_read":  reads,
+			"mem_write": writes,
+		}
+		res := calculateScore(scorerInput{metrics: results}, customScoreFn)
+
+		if sum := c6res * float64(len(cores)); sum < 1 {
+			//klog.Infof("C6 sum is less than 1, so we scale the score by c6res: %v", c6res)
+			res = res * c6res
+		} else {
+			res = res * 1
+		}
+
+		//Apply heterogeneity
+		speed := links[Nodes[nodeName]][0] * links[Nodes[nodeName]][1]
+		res = res * float64(speed)
+
+		// Select Node
+
+		klog.Infof("Node name %s, has score %v\n", nodeName, res)
+		return res, nil
+	}
 
 	//read database information
 	var cfg Config
@@ -129,9 +175,9 @@ func customResourceScorer(nodeName string) (float64, error) {
 	defer c.Close()
 
 	//Get the uuid of this node in order to query in the database
-	curr_uuid, ok := nodes[nodeName]
-	socket, _ := sockets[nodeName]
-	cores, _ := cores[nodeName]
+	curr_uuid, ok := Nodes[nodeName]
+	socket, _ := Sockets[nodeName]
+	// cores, _ := Cores[nodeName]
 
 	if ok {
 
@@ -151,11 +197,12 @@ func customResourceScorer(nodeName string) (float64, error) {
 		}
 
 		// Select Socket
-		results, err := customScoreInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, numberOfRows, cfg, c)
+		results, err = customScoreInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, numberOfRows, cfg, c)
 		if err != nil {
 			klog.Infof("Error in querying or calculating average for the custom score in the first stage: %v", err.Error())
 			return 0, nil
 		}
+
 		res := calculateScore(scorerInput{metrics: results}, customScoreFn)
 
 		//klog.Infof("Node: %v\t res before: %v", nodeName, res)
@@ -167,8 +214,16 @@ func customResourceScorer(nodeName string) (float64, error) {
 			res = res * 1
 		}
 
+		//Update the cache with the new metrics
+		err = customcache.LabCache.UpdateCache(results, average["c6res"], nodeName)
+		if err != nil {
+			klog.Infof(err.Error())
+		} else {
+			klog.Infof("Cache updated successfully for %v", nodeName)
+		}
+
 		//Apply heterogeneity
-		speed := links[nodes[nodeName]][0] * links[nodes[nodeName]][1]
+		speed := links[Nodes[nodeName]][0] * links[Nodes[nodeName]][1]
 		res = res * float64(speed)
 
 		// Select Node

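Note: the fast path added above returns a score straight from the in-memory cache whenever all four metrics are present, but it reads LabCache.Cache without taking LabCache.Mux, while UpdateCache and CleanCache (added later in this commit) write under the lock. A minimal sketch of a lock-protected read, assuming the customcache import added in this hunk; the helper name cachedMetrics is illustrative, not part of the commit:

// cachedMetrics returns a copy of the node's cached metrics while holding the
// cache mutex, reporting false if the node is unknown or any metric is stale.
func cachedMetrics(c *customcache.MlabCache, nodeName string) (map[string]float64, bool) {
	c.Mux.Lock()
	defer c.Mux.Unlock()

	node, ok := c.Cache[nodeName]
	if !ok {
		return nil, false
	}
	out := make(map[string]float64, len(node))
	for k, v := range node {
		if v == -1 { // -1 is the sentinel CleanCache uses for "not measured yet"
			return nil, false
		}
		out[k] = v
	}
	return out, true
}

customResourceScorer could then call this helper once instead of indexing the shared map four times without the lock.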
+ 129 - 3
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/infrastructure.go

@@ -2,6 +2,7 @@ package priorities
 
 import (
 	"os"
+	"time"
 
 	client "github.com/influxdata/influxdb1-client/v2"
 	"gopkg.in/yaml.v2"
@@ -29,7 +30,132 @@ type Config struct {
 	} `yaml:"monitoring"`
 }
 
-var nodes = map[string]string{
+type Application struct {
+	Metrics  map[string]float64
+	Duration time.Duration
+}
+
+var Applications = map[string]Application{
+	"scikit-lasso": Application{
+		Metrics: map[string]float64{
+			"ipc":       1.87,
+			"mem_read":  0.1753,
+			"mem_write": 0.008856,
+			"c6res":     0.003058,
+		},
+		Duration: 69 * time.Second,
+	},
+	"scikit-ada": Application{
+		Metrics: map[string]float64{
+			"ipc":       1.10,
+			"mem_read":  0.09868,
+			"mem_write": 0.00669,
+			"c6res":     0,
+		},
+		Duration: 138 * time.Second,
+	},
+	"scikit-rfr": Application{
+		Metrics: map[string]float64{
+			"ipc":       1.25,
+			"mem_read":  0.0228,
+			"mem_write": 0.00503,
+			"c6res":     0,
+		},
+		Duration: 115 * time.Second,
+	},
+	"scikit-rfc": Application{
+		Metrics: map[string]float64{
+			"ipc":       1.802,
+			"mem_read":  0.02423,
+			"mem_write": 0.010603,
+			"c6res":     0,
+		},
+		Duration: 38 * time.Second,
+	},
+	"scikit-linregr": Application{
+		Metrics: map[string]float64{
+			"ipc":       1.9464,
+			"mem_read":  0.040475,
+			"mem_write": 0.01974,
+			"c6res":     0.00928149,
+		},
+		Duration: 45 * time.Second,
+	},
+	"scikit-lda": Application{
+		Metrics: map[string]float64{
+			"ipc":       1.9162,
+			"mem_read":  0.0541,
+			"mem_write": 0.029381,
+			"c6res":     0.003805,
+		},
+		Duration: 53 * time.Second,
+	},
+	"cloudsuite-data-serving-client": Application{
+		Metrics: map[string]float64{
+			"ipc":       0.6619,
+			"mem_read":  0,
+			"mem_write": 0,
+			"c6res":     44.48,
+		},
+		Duration: 72 * time.Second,
+	},
+	"cloudsuite-in-memory-analytics": Application{
+		Metrics: map[string]float64{
+			"ipc":       1.3399,
+			"mem_read":  0.0052142,
+			"mem_write": 0.61361,
+			"c6res":     3.76196,
+		},
+		Duration: 60 * time.Second,
+	},
+	"cloudsuite-web-serving-client": Application{
+		Metrics: map[string]float64{
+			"ipc":       0.6619,
+			"mem_read":  0,
+			"mem_write": 0,
+			"c6res":     44.48,
+		},
+		Duration: 203 * time.Second,
+	},
+	"spec-sphinx": Application{
+		Metrics: map[string]float64{
+			"ipc":       2.035,
+			"mem_read":  0.0042372,
+			"mem_write": 0.0021131,
+			"c6res":     0.07497,
+		},
+		Duration: 592 * time.Second,
+	},
+	"spec-cactus": Application{
+		Metrics: map[string]float64{
+			"ipc":       1.353,
+			"mem_read":  0.07105,
+			"mem_write": 0.0273161,
+			"c6res":     0.0532267,
+		},
+		Duration: 780 * time.Second,
+	},
+	"spec-astar": Application{
+		Metrics: map[string]float64{
+			"ipc":       0.86314,
+			"mem_read":  0.0063,
+			"mem_write": 0.0032874,
+			"c6res":     0.09115,
+		},
+		Duration: 468 * time.Second,
+	},
+	"spec-leslie": Application{
+		Metrics: map[string]float64{
+			"ipc":       1.5225,
+			"mem_read":  0.3221,
+			"mem_write": 0.1532,
+			"c6res":     0.1215,
+		},
+		Duration: 378 * time.Second,
+	},
+}
+
+var Nodes = map[string]string{
 	"kube-01": "e77467ad-636e-4e7e-8bc9-53e46ae51da1",
 	"kube-02": "e77467ad-636e-4e7e-8bc9-53e46ae51da1",
 	"kube-03": "e77467ad-636e-4e7e-8bc9-53e46ae51da1",
@@ -45,7 +171,7 @@ var links = map[string][]float32{
 	"c4766d29-4dc1-11ea-9d98-0242ac110002": []float32{2, 9.6},
 }
 
-var sockets = map[string]int{
+var Sockets = map[string]int{
 	"kube-01": 1,
 	"kube-02": 0,
 	"kube-03": 0,
@@ -56,7 +182,7 @@ var sockets = map[string]int{
 	"kube-08": 1,
 }
 
-var cores = map[string][]int{
+var Cores = map[string][]int{
 	"kube-01": []int{20, 21, 22, 23},
 	"kube-02": []int{2, 3, 4, 5, 6, 7, 8, 9},
 	"kube-03": []int{40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55},

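Note: generic_scheduler.go (later in this commit) indexes the Applications table above directly with the pod name, and a name that is not in the map yields the zero-value Application, i.e. a nil Metrics map and zero Duration. A small sketch of a guarded lookup, assuming k8s.io/klog is imported as it is in custom_resource_allocation.go; the helper name profileFor is illustrative:

// profileFor returns the offline profile for a pod name and reports whether
// one exists, so callers can skip the cache update for unprofiled workloads.
func profileFor(podName string) (Application, bool) {
	app, ok := Applications[podName]
	if !ok {
		klog.Infof("no offline profile for %v", podName)
	}
	return app, ok
}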
+ 3 - 3
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/node_selection.go

@@ -129,9 +129,9 @@ func nodeSelectionScorer(nodeName string) (float64, error) {
 	defer c.Close()
 
 	//Get the uuid of this node in order to query in the database
-	curr_uuid, ok := nodes[nodeName]
-	socket, _ := sockets[nodeName]
-	cores, _ := cores[nodeName]
+	curr_uuid, ok := Nodes[nodeName]
+	socket, _ := Sockets[nodeName]
+	cores, _ := Cores[nodeName]
 	if len(cores) == 0 {
 		return 0.0, nil
 	}

+ 28 - 2
kubernetes-v1.15.4/pkg/scheduler/core/generic_scheduler.go

@@ -39,6 +39,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
+	"k8s.io/kubernetes/pkg/scheduler/customcache"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@@ -289,13 +290,38 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 	}
 	priorityList, err = PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, nodePrioritizers, winningSocketNodes, g.extenders)
 
+	// Pick the winning host
+	host, err := g.selectHost(priorityList)
+
+	winningSocket := priorities.Sockets[host]
+	winningUuid := priorities.Nodes[host]
+
+	var tmp []string
+	var socketNodes []string
+	for key, val := range priorities.Nodes {
+		if val == winningUuid {
+			tmp = append(tmp, key)
+		}
+	}
+	for _, n := range tmp {
+		if priorities.Sockets[n] == winningSocket {
+			socketNodes = append(socketNodes, n)
+		}
+	}
+
+	// Add the pod's average metrics to the cached metrics of every node on the winning socket
+	podName := pod.ObjectMeta.Name
+
+	for _, n := range socketNodes {
+		numCores := len(priorities.Cores[n])
+		customcache.LabCache.AddAppMetrics(priorities.Applications[podName].Metrics, n, numCores)
+	}
+
 	// -----------------------------------------------------
 	// ------------------END-CUSTOM-----------------------
 	// -----------------------------------------------------
 	//trace.Step("Selecting host")
 
-	host, err := g.selectHost(priorityList)
-
 	return ScheduleResult{
 		SuggestedHost:  host,
 		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),

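Note: in the hunk above the error returned by selectHost is captured, but the socket bookkeeping runs before it is ever checked, so a failed selection (empty priorityList) would propagate metrics for an empty host name. A sketch of checking the error first, assuming Schedule keeps its (ScheduleResult, error) signature:

	host, err := g.selectHost(priorityList)
	if err != nil {
		// Nothing was selected; skip the cache bookkeeping entirely.
		return ScheduleResult{}, err
	}
	winningSocket := priorities.Sockets[host]
	winningUuid := priorities.Nodes[host]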
+ 147 - 0
kubernetes-v1.15.4/pkg/scheduler/customcache/cache.go

@@ -0,0 +1,147 @@
+package customcache
+
+import (
+	"sync"
+	"time"
+)
+
+// Timeout signals when the cached metrics should be invalidated; it is
+// exported so the scheduler package can poll it from scheduleOne.
+var Timeout = time.NewTicker(10 * time.Second)
+
+type MlabCache struct {
+	Cache map[string]map[string]float64
+	Mux   sync.Mutex
+}
+
+var LabCache MlabCache = MlabCache{
+	Cache: map[string]map[string]float64{
+		"kube-01": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-02": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-03": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-04": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-05": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-06": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-07": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-08": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+	},
+}
+
+func (c *MlabCache) CleanCache() {
+	c.Mux.Lock()
+	c.Cache = map[string]map[string]float64{
+		"kube-01": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-02": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-03": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-04": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-05": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-06": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-07": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+		"kube-08": map[string]float64{
+			"ipc":       -1,
+			"mem_read":  -1,
+			"mem_write": -1,
+			"c6res":     -1,
+		},
+	}
+	c.Mux.Unlock()
+}
+
+func (c *MlabCache) UpdateCache(input map[string]float64, c6res float64, nodename string) error {
+	c.Mux.Lock()
+	c.Cache[nodename] = map[string]float64{
+		"ipc":       input["ipc"],
+		"mem_read":  input["mem_read"],
+		"mem_write": input["mem_write"],
+		"c6res":     input["c6res"],
+	}
+	c.Mux.Unlock()
+
+	return nil
+}
+
+func (c *MlabCache) AddAppMetrics(app map[string]float64, nodename string, numCores int) {
+	c.Mux.Lock()
+	c.Cache[nodename]["mem_read"] += app["mem_read"]
+	c.Cache[nodename]["mem_write"] += app["mem_write"]
+	//TODO
+	// handle c6res addition
+	c.Cache[nodename]["c6res"] -= (1 - app["c6res"]) / float64(100*numCores)
+
+	//TODO
+	// handle ipc addition
+	c.Mux.Unlock()
+}

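Note: the LabCache initializer and CleanCache repeat the same eight-node literal, and UpdateCache can only ever return nil, so the error branch at its call site is effectively dead. A possible de-duplication, sketched under the assumption that the node list stays hard-coded; the helper names are not part of the commit:

// freshMetrics builds the "not measured yet" entry used for every node.
func freshMetrics() map[string]float64 {
	return map[string]float64{"ipc": -1, "mem_read": -1, "mem_write": -1, "c6res": -1}
}

// freshCache rebuilds the whole per-node map from scratch.
func freshCache() map[string]map[string]float64 {
	nodes := []string{"kube-01", "kube-02", "kube-03", "kube-04", "kube-05", "kube-06", "kube-07", "kube-08"}
	c := make(map[string]map[string]float64, len(nodes))
	for _, n := range nodes {
		c[n] = freshMetrics()
	}
	return c
}

With these helpers, the LabCache initializer becomes MlabCache{Cache: freshCache()} and CleanCache shrinks to a lock, an assignment of freshCache(), and an unlock.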
+ 11 - 1
kubernetes-v1.15.4/pkg/scheduler/scheduler.go

@@ -24,7 +24,7 @@ import (
 
 	"k8s.io/klog"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -38,6 +38,7 @@ import (
 	latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
 	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/core"
+	"k8s.io/kubernetes/pkg/scheduler/customcache"
 	"k8s.io/kubernetes/pkg/scheduler/factory"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -440,6 +441,15 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
 
 // scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
 func (sched *Scheduler) scheduleOne() {
+
+	// Invalidate the cached metrics if the 10-second ticker has fired (non-blocking check)
+	select {
+	case <-customcache.Timeout.C:
+		customcache.LabCache.CleanCache()
+	default:
+	}
+
 	fwk := sched.config.Framework
 
 	pod := sched.config.NextPod()

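Note: the select with a default case is a non-blocking poll, so the cache is only cleaned when scheduleOne runs for a pod; if no pod arrives for a while, stale entries outlive the 10-second window. If that matters, a goroutine started once during scheduler setup could drain the ticker instead; a sketch assuming the exported Timeout ticker from customcache:

	// Clean the cache on every tick, independent of pod arrivals.
	go func() {
		for range customcache.Timeout.C {
			customcache.LabCache.CleanCache()
		}
	}()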
+ 2 - 0
kubernetes-v1.15.4/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go

@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/klog"
 )
 
 // For any test of the style:
@@ -149,6 +150,7 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b
 
 		func() {
 			defer runtime.HandleCrash()
+			klog.Infof("Scheduler was Called\nTime: %v", time.Now())
 			f()
 		}()