custom_resource_allocation.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. /*
  2. Copyright 2020 Achilleas Tzenetopoulos.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package priorities
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "strings"
  18. _ "github.com/go-sql-driver/mysql"
  19. client "github.com/influxdata/influxdb1-client/v2"
  20. "github.com/iwita/kube-scheduler/customcache"
  21. "k8s.io/klog"
  22. )
  23. var (
  24. customResourcePriority = &CustomAllocationPriority{"CustomResourceAllocation", customResourceScorer}
  25. //customResourcePriority = &CustomAllocationPriority{"CustomRequestedPriority", customResourceScorer}
  26. // LeastRequestedPriorityMap is a priority function that favors nodes with fewer requested resources.
  27. // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and
  28. // prioritizes based on the minimum of the average of the fraction of requested to capacity.
  29. //
  30. // Details:
  31. // (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
  32. CustomRequestedPriorityMap = customResourcePriority.PriorityMap
  33. )
  34. func customScoreFn(si scorerInput) float64 {
  35. rAndW := si.metrics["mem_read"] + si.metrics["mem_write"]
  36. if rAndW > 7 {
  37. return si.metrics["ipc"] / rAndW
  38. } else {
  39. return 2 / rAndW
  40. }
  41. }
  42. func onlyIPC(metrics map[string]float64) float64 {
  43. return metrics["ipc"]
  44. }
  45. func onlyL3(metrics map[string]float64) float64 {
  46. return 1 / metrics["l3m"]
  47. }
  48. func onlyNrg(metrics map[string]float64) float64 {
  49. return 1 / metrics["procnrg"]
  50. }
  51. func calculateScore(si scorerInput,
  52. logicFn func(scorerInput) float64) float64 {
  53. res := logicFn(si)
  54. //klog.Infof("Has score (in float) %v\n", res)
  55. return res
  56. }
  57. func calculateWeightedAverage(response *client.Response,
  58. numberOfRows, numberOfMetrics int) (map[string]float64, error) {
  59. // initialize the metrics map with a constant size
  60. metrics := make(map[string]float64, numberOfMetrics)
  61. rows := response.Results[0].Series[0]
  62. for i := 1; i < len(rows.Columns); i++ {
  63. for j := 0; j < numberOfRows; j++ {
  64. val, err := rows.Values[j][i].(json.Number).Float64()
  65. if err != nil {
  66. klog.Infof("Error while calculating %v", rows.Columns[i])
  67. return nil, err
  68. }
  69. metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
  70. }
  71. metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
  72. //klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
  73. }
  74. // TODO better handling for the returning errors
  75. return metrics, nil
  76. }
  77. func customScoreInfluxDB(metrics []string, uuid string, socket,
  78. numberOfRows int, cfg Config, c client.Client) (map[string]float64, error) {
  79. // calculate the number of rows needed
  80. // i.e. 20sec / 0.5s interval => 40rows
  81. //numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  82. // merge all the required columns
  83. columns := strings.Join(metrics, ", ")
  84. // build the coommand
  85. var command strings.Builder
  86. fmt.Fprintf(&command, "SELECT %s from socket_metrics where uuid = '%s' and socket_id='%d' order by time desc limit %d", columns, uuid, socket, numberOfRows)
  87. //klog.Infof("%s", command.String())
  88. //q := client.NewQuery("select ipc from system_metrics", "evolve", "")
  89. q := client.NewQuery(command.String(), cfg.Database.Name, "")
  90. response, err := c.Query(q)
  91. if err != nil {
  92. klog.Infof("Error while executing the query: %v", err.Error())
  93. return nil, err
  94. }
  95. // Calculate the average for the metrics provided
  96. return calculateWeightedAverage(response, numberOfRows, len(metrics))
  97. }
  98. func InvalidateCache() {
  99. // Check if the cache needs update
  100. select {
  101. // clean the cache if 10 seconds are passed
  102. case <-customcache.LabCache.Timeout.C:
  103. klog.Infof("Time to erase")
  104. klog.Infof("Cache: %v", customcache.LabCache.Cache)
  105. //customcache.LabCache.Timeout.Stop()
  106. customcache.LabCache.CleanCache()
  107. default:
  108. }
  109. }
  110. func customResourceScorer(nodeName string) (float64, error) {
  111. //InvalidateCache()
  112. //klog.Infof("The value of the Ticker: %v", customcache.LabCache.Timeout.C)
  113. cores, _ := Cores[nodeName]
  114. var results map[string]float64
  115. // Check the cache
  116. customcache.LabCache.Mux.Lock()
  117. ipc, ok := customcache.LabCache.Cache[nodeName]["ipc"]
  118. if !ok {
  119. klog.Infof("IPC is nil")
  120. }
  121. reads, ok := customcache.LabCache.Cache[nodeName]["mem_read"]
  122. if !ok {
  123. klog.Infof("Memory Reads is nil")
  124. }
  125. writes, ok := customcache.LabCache.Cache[nodeName]["mem_write"]
  126. if !ok {
  127. klog.Infof("Memory Writes is nil")
  128. }
  129. c6res, ok := customcache.LabCache.Cache[nodeName]["c6res"]
  130. if !ok {
  131. klog.Infof("C6 state is nil")
  132. }
  133. customcache.LabCache.Mux.Unlock()
  134. // If the cache has value use it
  135. if ipc != -1 && reads != -1 && writes != -1 && c6res != -1 {
  136. results := map[string]float64{
  137. "ipc": ipc,
  138. "mem_read": reads,
  139. "mem_write": writes,
  140. "c6res": c6res,
  141. }
  142. klog.Infof("Found in the cache: ipc: %v, reads: %v, writes: %v", ipc, reads, writes)
  143. res := calculateScore(scorerInput{metrics: results}, customScoreFn)
  144. if sum := c6res * float64(len(cores)); sum < 1 {
  145. //klog.Infof("Average C6 is less than 1, so we get: %v", average["c6res"])
  146. res = res * c6res
  147. } else {
  148. res = res * 1
  149. }
  150. //Apply heterogeneity
  151. speed := links[Nodes[nodeName]][0] * links[Nodes[nodeName]][1]
  152. maxFreq := maxSpeed[Nodes[nodeName]]
  153. res = res * float64(speed) * float64(maxFreq)
  154. // Select Node
  155. klog.Infof("Using the cached values, Node name %s, has score %v\n", nodeName, res)
  156. return res, nil
  157. }
  158. //read database information
  159. var cfg Config
  160. err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
  161. if err != nil {
  162. return 0, err
  163. }
  164. /*-------------------------------------
  165. //TODO read also nodes to uuid mappings for EVOLVE
  166. -------------------------------------*/
  167. // InfluxDB
  168. c, err := connectToInfluxDB(cfg)
  169. if err != nil {
  170. return 0, err
  171. }
  172. // close the connection in the end of execution
  173. defer c.Close()
  174. //Get the uuid of this node in order to query in the database
  175. curr_uuid, ok := Nodes[nodeName]
  176. socket, _ := Sockets[nodeName]
  177. // cores, _ := Cores[nodeName]
  178. var socketNodes []string
  179. if ok {
  180. metrics := []string{"c6res"}
  181. time := 20
  182. numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  183. // Define Core availability
  184. // Select all the nodes belonging to the current socket
  185. for kubenode, s := range Sockets {
  186. if s == socket && Nodes[kubenode] == curr_uuid {
  187. socketNodes = append(socketNodes, kubenode)
  188. }
  189. }
  190. sum := 0
  191. socketSum := 0.0
  192. socketCores := 0.0
  193. for _, snode := range socketNodes {
  194. currCores, _ := Cores[snode]
  195. r, err := queryInfluxDbCores(metrics, curr_uuid, socket, numberOfRows, cfg, c, currCores)
  196. //r, err := queryInfluxDbSocket(metrics, curr_uuid, socket, numberOfRows, cfg, c)
  197. if err != nil {
  198. klog.Infof("Error in querying or calculating core availability in the first stage: %v", err.Error())
  199. }
  200. average, err := calculateWeightedAverageCores(r, numberOfRows, len(metrics), len(currCores))
  201. if err != nil {
  202. klog.Infof("Error defining core availability")
  203. }
  204. if average["c6res"]*float64(len(currCores)) >= 1 {
  205. klog.Infof("Node %v has C6 sum: %v", snode, average["c6res"]*float64(len(currCores)))
  206. sum++
  207. }
  208. socketSum += average["c6res"] * float64(len(currCores))
  209. socketCores += float64(len(currCores))
  210. }
  211. // Select Socket
  212. results, err = customScoreInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, numberOfRows, cfg, c)
  213. if err != nil {
  214. klog.Infof("Error in querying or calculating average for the custom score in the first stage: %v", err.Error())
  215. return 0, nil
  216. }
  217. res := calculateScore(scorerInput{metrics: results}, customScoreFn)
  218. //klog.Infof("Node: %v\t res before: %v", nodeName, res)
  219. if sum < 1 {
  220. klog.Infof("Less than 1 node is available\nC6contribution: %v", socketSum/socketCores)
  221. res = res * socketSum / socketCores
  222. } else {
  223. res = res * 1
  224. }
  225. //Update the cache with the new metrics
  226. err = customcache.LabCache.UpdateCache(results, socketSum/socketCores, nodeName)
  227. if err != nil {
  228. klog.Infof(err.Error())
  229. } else {
  230. klog.Infof("Cache updated successfully for %v", nodeName)
  231. }
  232. //Apply heterogeneity
  233. speed := links[Nodes[nodeName]][0] * links[Nodes[nodeName]][1]
  234. maxFreq := maxSpeed[Nodes[nodeName]]
  235. res = res * float64(speed) * float64(maxFreq)
  236. // Select Node
  237. klog.Infof("Node name %s, has score %v\n", nodeName, res)
  238. return res, nil
  239. } else {
  240. klog.Infof("Error finding the uuid: %v", ok)
  241. return 0, nil
  242. }
  243. }
// WARNING
// c6res is not a dependable metric for inspecting core availability:
// some systems use deeper core C-states (e.g. c7res).
  247. // func findAvailability(response *client.Response, numberOfMetrics, numberOfRows, numberOfCores int, floor float64) (map[string]float64, error) {
  248. // // initialize the metrics map with a constant size
  249. // metrics := make(map[string]float64, numberOfMetrics)
  250. // rows := response.Results[0].Series[0]
  251. // for i := 1; i < len(rows.Columns); i++ {
  252. // //klog.Infof("Name of column %v : %v\nrange of values: %v\nnumber of rows: %v\nnumber of cores %v\n", i, rows.Columns[i], len(rows.Values), numberOfRows, numberOfCores)
  253. // for j := 0; j < numberOfRows; j++ {
  254. // //avg, max := 0.0, 0.0
  255. // for k := 0; k < numberOfCores; k++ {
  256. // val, err := rows.Values[j*numberOfCores+k][i].(json.Number).Float64()
  257. // if err != nil {
  258. // klog.Infof("Error while calculating %v", rows.Columns[i])
  259. // return false, err
  260. // }
  261. // // if val > floor {
  262. // // return true, nil
  263. // // }
  264. // // sum += val
  265. // //avg += val / float64(numberOfCores)
  266. // avg += val
  267. // }
  268. // metrics[rows.Columns[i]] += avg * float64(numberOfRows-j)
  269. // }
  270. // metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
  271. // if metrics[row.Columns[i]] > 1 {
  272. // return true, nil
  273. // }
  274. // //klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
  275. // }
  276. // // TODO better handling for the returning errors
  277. // return false, nil
  278. // }