custom_resource_allocation.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. Copyright 2020 Achilleas Tzenetopoulos.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package priorities
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "strings"
  18. _ "github.com/go-sql-driver/mysql"
  19. client "github.com/influxdata/influxdb1-client/v2"
  20. "k8s.io/klog"
  21. "k8s.io/kubernetes/pkg/scheduler/customcache"
  22. )
var (
	// customResourcePriority is the CustomAllocationPriority built from the
	// name "CustomResourceAllocation" and the customResourceScorer function.
	customResourcePriority = &CustomAllocationPriority{"CustomResourceAllocation", customResourceScorer}
	//customResourcePriority = &CustomAllocationPriority{"CustomRequestedPriority", customResourceScorer}

	// CustomRequestedPriorityMap is the PriorityMap of customResourcePriority,
	// i.e. the priority whose per-node score is produced by
	// customResourceScorer.
	//
	// NOTE(review): the comment that used to sit here was copied from
	// LeastRequestedPriorityMap and described the formula
	//   (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
	// The scorer in this file does not compute that; it derives the score from
	// the ipc, mem_read, mem_write and c6res metrics instead.
	CustomRequestedPriorityMap = customResourcePriority.PriorityMap
)
  34. func customScoreFn(si scorerInput) float64 {
  35. return si.metrics["ipc"] / (si.metrics["mem_read"] + si.metrics["mem_write"])
  36. }
  37. func onlyIPC(metrics map[string]float64) float64 {
  38. return metrics["ipc"]
  39. }
  40. func onlyL3(metrics map[string]float64) float64 {
  41. return 1 / metrics["l3m"]
  42. }
  43. func onlyNrg(metrics map[string]float64) float64 {
  44. return 1 / metrics["procnrg"]
  45. }
  46. func calculateScore(si scorerInput,
  47. logicFn func(scorerInput) float64) float64 {
  48. res := logicFn(si)
  49. //klog.Infof("Has score (in float) %v\n", res)
  50. return res
  51. }
  52. func calculateWeightedAverage(response *client.Response,
  53. numberOfRows, numberOfMetrics int) (map[string]float64, error) {
  54. // initialize the metrics map with a constant size
  55. metrics := make(map[string]float64, numberOfMetrics)
  56. rows := response.Results[0].Series[0]
  57. for i := 1; i < len(rows.Columns); i++ {
  58. for j := 0; j < numberOfRows; j++ {
  59. val, err := rows.Values[j][i].(json.Number).Float64()
  60. if err != nil {
  61. klog.Infof("Error while calculating %v", rows.Columns[i])
  62. return nil, err
  63. }
  64. metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
  65. }
  66. metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
  67. //klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
  68. }
  69. // TODO better handling for the returning errors
  70. return metrics, nil
  71. }
  72. func customScoreInfluxDB(metrics []string, uuid string, socket,
  73. numberOfRows int, cfg Config, c client.Client) (map[string]float64, error) {
  74. // calculate the number of rows needed
  75. // i.e. 20sec / 0.5s interval => 40rows
  76. //numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  77. // merge all the required columns
  78. columns := strings.Join(metrics, ", ")
  79. // build the coommand
  80. var command strings.Builder
  81. fmt.Fprintf(&command, "SELECT %s from socket_metrics where uuid = '%s' and socket_id='%d' order by time desc limit %d", columns, uuid, socket, numberOfRows)
  82. q := client.NewQuery(command.String(), cfg.Database.Name, "")
  83. response, err := c.Query(q)
  84. if err != nil {
  85. klog.Infof("Error while executing the query: %v", err.Error())
  86. return nil, err
  87. }
  88. // Calculate the average for the metrics provided
  89. return calculateWeightedAverage(response, numberOfRows, len(metrics))
  90. }
  91. func customResourceScorer(nodeName string) (float64, error) {
  92. cores, _ := Cores[nodeName]
  93. var results map[string]float64
  94. // Check the cache
  95. ipc, ok := customcache.LabCache.Cache[nodeName]["ipc"]
  96. if !ok {
  97. klog.Infof("IPC is nil")
  98. }
  99. reads, ok := customcache.LabCache.Cache[nodeName]["mem_read"]
  100. if !ok {
  101. klog.Infof("Memory Reads is nil")
  102. }
  103. writes, ok := customcache.LabCache.Cache[nodeName]["mem_write"]
  104. if !ok {
  105. klog.Infof("Memory Writes is nil")
  106. }
  107. c6res, ok := customcache.LabCache.Cache[nodeName]["c6res"]
  108. if !ok {
  109. klog.Infof("C6 state is nil")
  110. }
  111. // If the cache has value use it
  112. if ipc != -1 && reads != -1 && writes != -1 && c6res != -1 {
  113. results := map[string]float64{
  114. "ipc": ipc,
  115. "mem_read": reads,
  116. "mem_write": writes,
  117. }
  118. res := calculateScore(scorerInput{metrics: results}, customScoreFn)
  119. if sum := c6res * float64(len(cores)); sum < 1 {
  120. //klog.Infof("Average C6 is less than 1, so we get: %v", average["c6res"])
  121. res = res * c6res
  122. } else {
  123. res = res * 1
  124. }
  125. //Apply heterogeneity
  126. speed := links[Nodes[nodeName]][0] * links[Nodes[nodeName]][1]
  127. res = res * float64(speed)
  128. // Select Node
  129. klog.Infof("Node name %s, has score %v\n", nodeName, res)
  130. return res, nil
  131. }
  132. //read database information
  133. var cfg Config
  134. err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
  135. if err != nil {
  136. return 0, err
  137. }
  138. /*-------------------------------------
  139. //TODO read also nodes to uuid mappings for EVOLVE
  140. -------------------------------------*/
  141. // InfluxDB
  142. c, err := connectToInfluxDB(cfg)
  143. if err != nil {
  144. return 0, err
  145. }
  146. // close the connection in the end of execution
  147. defer c.Close()
  148. //Get the uuid of this node in order to query in the database
  149. curr_uuid, ok := Nodes[nodeName]
  150. socket, _ := Sockets[nodeName]
  151. // cores, _ := Cores[nodeName]
  152. if ok {
  153. metrics := []string{"c6res"}
  154. time := 20
  155. numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  156. // Define Core availability
  157. r, err := queryInfluxDbCores(metrics, curr_uuid, socket, numberOfRows, cfg, c, cores)
  158. if err != nil {
  159. klog.Infof("Error in querying or calculating core availability in the first stage: %v", err.Error())
  160. }
  161. average, err := calculateWeightedAverageCores(r, numberOfRows, len(metrics), len(cores))
  162. if err != nil {
  163. klog.Infof("Error defining core availability")
  164. }
  165. // Select Socket
  166. results, err = customScoreInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, numberOfRows, cfg, c)
  167. if err != nil {
  168. klog.Infof("Error in querying or calculating average for the custom score in the first stage: %v", err.Error())
  169. return 0, nil
  170. }
  171. res := calculateScore(scorerInput{metrics: results}, customScoreFn)
  172. //klog.Infof("Node: %v\t res before: %v", nodeName, res)
  173. if sum := average["c6res"] * float64(len(cores)); sum < 1 {
  174. //klog.Infof("Average C6 is less than 1, so we get: %v", average["c6res"])
  175. res = res * average["c6res"]
  176. } else {
  177. res = res * 1
  178. }
  179. //Update the cache with the new metrics
  180. err = customcache.LabCache.UpdateCache(results, average["c6res"], nodeName)
  181. if err != nil {
  182. klog.Infof(err.Error())
  183. } else {
  184. klog.Infof("Cache updated successfully for %v", nodeName)
  185. }
  186. //Apply heterogeneity
  187. speed := links[Nodes[nodeName]][0] * links[Nodes[nodeName]][1]
  188. res = res * float64(speed)
  189. // Select Node
  190. klog.Infof("Node name %s, has score %v\n", nodeName, res)
  191. return res, nil
  192. } else {
  193. klog.Infof("Error finding the uuid: %v", ok)
  194. return 0, nil
  195. }
  196. }
  197. // WARNING
  198. // c6res is not a dependable metric for inspecting core availability
  199. // Some Systems use higher core states (e.g c7res)
  200. // func findAvailability(response *client.Response, numberOfMetrics, numberOfRows, numberOfCores int, floor float64) (map[string]float64, error) {
  201. // // initialize the metrics map with a constant size
  202. // metrics := make(map[string]float64, numberOfMetrics)
  203. // rows := response.Results[0].Series[0]
  204. // for i := 1; i < len(rows.Columns); i++ {
  205. // //klog.Infof("Name of column %v : %v\nrange of values: %v\nnumber of rows: %v\nnumber of cores %v\n", i, rows.Columns[i], len(rows.Values), numberOfRows, numberOfCores)
  206. // for j := 0; j < numberOfRows; j++ {
  207. // //avg, max := 0.0, 0.0
  208. // for k := 0; k < numberOfCores; k++ {
  209. // val, err := rows.Values[j*numberOfCores+k][i].(json.Number).Float64()
  210. // if err != nil {
  211. // klog.Infof("Error while calculating %v", rows.Columns[i])
  212. // return false, err
  213. // }
  214. // // if val > floor {
  215. // // return true, nil
  216. // // }
  217. // // sum += val
  218. // //avg += val / float64(numberOfCores)
  219. // avg += val
  220. // }
  221. // metrics[rows.Columns[i]] += avg * float64(numberOfRows-j)
  222. // }
  223. // metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
  224. // if metrics[row.Columns[i]] > 1 {
  225. // return true, nil
  226. // }
  227. // //klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
  228. // }
  229. // // TODO better handling for the returning errors
  230. // return false, nil
  231. // }