node_selection.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package priorities
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "strings"
  18. _ "github.com/go-sql-driver/mysql"
  19. client "github.com/influxdata/influxdb1-client/v2"
  20. "k8s.io/klog"
  21. )
  22. var (
  23. nodeSelectionPriority = &CustomAllocationPriority{"NodeSelection", nodeSelectionScorer}
  24. //customResourcePriority = &CustomAllocationPriority{"CustomRequestedPriority", customResourceScorer}
  25. // LeastRequestedPriorityMap is a priority function that favors nodes with fewer requested resources.
  26. // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and
  27. // prioritizes based on the minimum of the average of the fraction of requested to capacity.
  28. //
  29. // Details:
  30. // (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
  31. NodeSelectionPriorityMap = nodeSelectionPriority.PriorityMap
  32. )
  33. // type Config struct {
  34. // Server struct {
  35. // Port string `yaml:"port"`
  36. // Host string `yaml:"host"`
  37. // } `yaml:"server"`
  38. // Database struct {
  39. // Type string `yaml:"type"`
  40. // Name string `yaml:"name"`
  41. // Username string `yaml:"username"`
  42. // Password string `yaml:"password"`
  43. // } `yaml:"database"`
  44. // MonitoringSpecs struct {
  45. // TimeInterval float32 `yaml:"interval"`
  46. // } `yaml:"monitoring"`
  47. // }
  48. // type Row struct {
  49. // ipc float32
  50. // l3m float32
  51. // reads float32
  52. // writes float32
  53. // c6res float32
  54. // }
  55. // type System struct {
  56. // ID int `json:"id"`
  57. // Uuid string `json:"uuid"`
  58. // numSockets int `json:"num_sockets"`
  59. // numCores int `json:"num_cores`
  60. // }
  61. // var nodes = map[string]string{
  62. // "kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  63. // "kube-02": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  64. // "kube-03": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  65. // "kube-04": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  66. // "kube-05": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  67. // "kube-06": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  68. // "kube-07": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  69. // "kube-08": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  70. // }
  71. // var sockets = map[string]int{
  72. // "kube-01": 0,
  73. // "kube-02": 0,
  74. // "kube-03": 0,
  75. // "kube-04": 0,
  76. // "kube-05": 0,
  77. // "kube-06": 1,
  78. // "kube-07": 0,
  79. // "kube-08": 1,
  80. // }
  // cores maps each Kubernetes node name to the CPU core IDs whose metrics are
  // queried when scoring that node. A node mapped to an empty list is given a
  // score of 0 without querying the monitoring database.
  var cores = map[string][]int{
  	"kube-01": {},
  	"kube-02": {},
  	"kube-03": {},
  	"kube-04": {},
  	"kube-05": {0, 1, 2, 3},
  	"kube-06": {12, 13, 14, 15, 16, 17, 18, 19},
  	"kube-07": {4, 5, 6, 7, 8, 9, 10, 11, 24, 25, 26, 27, 28, 29, 30, 31},
  	"kube-08": {20, 21, 22, 23, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47},
  }
  91. // func readFile(cfg *Config, file string) error {
  92. // f, err := os.Open(file)
  93. // if err != nil {
  94. // klog.Infof("Config file for scheduler not found. Error: %v", err)
  95. // return err
  96. // }
  97. // defer f.Close()
  98. // decoder := yaml.NewDecoder(f)
  99. // err = decoder.Decode(&cfg)
  100. // if err != nil {
  101. // klog.Infof("Unable to decode the config file. Error: %v", err)
  102. // return err
  103. // }
  104. // return nil
  105. // }
  // scorerInput bundles a set of metric values with the name of the metric a
  // scorer should read from it.
  type scorerInput struct {
  	metricName string
  	metrics    map[string]float64
  }

  // OneScorer returns the value stored under si.metricName in si.metrics, or 0
  // when that metric is absent (including when si.metrics is nil).
  func OneScorer(si scorerInput) float64 {
  	value, found := si.metrics[si.metricName]
  	if !found {
  		return 0
  	}
  	return value
  }
  113. // func customScoreFn(metrics map[string]float64) float64 {
  114. // return metrics["ipc"] / metrics["mem_read"] * metrics["mem_write"]
  115. // }
  116. // func onlyIPC(metrics map[string]float64) float64 {
  117. // return metrics["ipc"]
  118. // }
  119. // func onlyL3(metrics map[string]float64) float64 {
  120. // return 1 / metrics["l3m"]
  121. // }
  122. // func onlyNrg(metrics map[string]float64) float64 {
  123. // return 1 / metrics["procnrg"]
  124. // }
  125. // func calculateScore(results map[string]float64,
  126. // logicFn func(map[string]float64) float64) float64 {
  127. // res := logicFn(results)
  128. // //klog.Infof("Has score (in float) %v\n", res)
  129. // return res
  130. // }
  131. func calculateWeightedAverageCores(response *client.Response,
  132. numberOfRows, numberOfMetrics, numberOfCores int) (map[string]float64, error) {
  133. // initialize the metrics map with a constant size
  134. metrics := make(map[string]float64, numberOfMetrics)
  135. rows := response.Results[0].Series[0]
  136. for i := 1; i < len(rows.Columns); i++ {
  137. klog.Infof("Name of column %v : %v\n range of values: %v", i, rows.Columns[i], len(rows.Values))
  138. for j := 0; j < numberOfRows; j++ {
  139. avg := 0.0
  140. for k := 0; k < numberOfCores; k++ {
  141. val, err := rows.Values[j*numberOfCores+k][i].(json.Number).Float64()
  142. if err != nil {
  143. klog.Infof("Error while calculating %v", rows.Columns[i])
  144. return nil, err
  145. }
  146. //metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
  147. avg += val
  148. }
  149. metrics[rows.Columns[i]] += avg * float64(numberOfRows-j)
  150. }
  151. metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
  152. klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
  153. }
  154. // TODO better handling for the returning errors
  155. return metrics, nil
  156. }
  157. // func connectToInfluxDB(cfg Config) (client.Client, error) {
  158. // c, err := client.NewHTTPClient(client.HTTPConfig{
  159. // Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "",
  160. // })
  161. // if err != nil {
  162. // klog.Infof("Error while connecting to InfluxDB: %v ", err.Error())
  163. // return nil, err
  164. // }
  165. // klog.Infof("Connected Successfully to InfluxDB")
  166. // return c, nil
  167. // }
  168. // This function does the following:
  169. // 1. Queries the DB with the provided metrics and cores
  170. // 2. Calculates and returns the weighted average of each of those metrics
  171. func queryInfluxDbCores(metrics []string, uuid string, socket,
  172. time int, cfg Config, c client.Client, cores []int) (map[string]float64, error) {
  173. // calculate the number of rows needed
  174. // i.e. 20sec / 0.5s interval => 40rows
  175. numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  176. // EDIT
  177. // This time we will fetch data for multiple cores
  178. // so we will need more rows, proportional to the core number
  179. numberOfRows *= len(cores)
  180. // merge all the required columns
  181. columns := strings.Join(metrics, ", ")
  182. // build the cores part of the command
  183. var coresPart strings.Builder
  184. fmt.Fprintf(&coresPart, "core_id='%d'", cores[0])
  185. for i := 1; i < len(cores); i++ {
  186. fmt.Fprintf(&coresPart, " or core_id='%d'", cores[i])
  187. }
  188. // build the coommand
  189. var command strings.Builder
  190. fmt.Fprintf(&command, "SELECT %s from core_metrics where uuid = '%s' and socket_id='%d' and %s order by time desc limit %d", columns, uuid, socket, coresPart.String(), numberOfRows)
  191. klog.Infof("The query is: %v", command.String())
  192. q := client.NewQuery(command.String(), cfg.Database.Name, "")
  193. response, err := c.Query(q)
  194. if err != nil {
  195. klog.Infof("Error while executing the query: %v", err.Error())
  196. return nil, err
  197. }
  198. // Calculate the average for the metrics provided
  199. return calculateWeightedAverageCores(response, numberOfRows, len(metrics), len(cores))
  200. }
  201. func nodeSelectionScorer(nodeName string) (float64, error) {
  202. //return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
  203. //customRequestedScore(requested.Memory, allocable.Memory)) / 2
  204. //read database information
  205. var cfg Config
  206. err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
  207. if err != nil {
  208. return 0, err
  209. }
  210. /*-------------------------------------
  211. //TODO read also nodes to uuid mappings for EVOLVE
  212. -------------------------------------*/
  213. // InfluxDB
  214. c, err := connectToInfluxDB(cfg)
  215. if err != nil {
  216. return 0, err
  217. }
  218. // close the connection in the end of execution
  219. defer c.Close()
  220. //Get the uuid of this node in order to query in the database
  221. curr_uuid, ok := nodes[nodeName]
  222. socket, _ := sockets[nodeName]
  223. cores, _ := cores[nodeName]
  224. if len(cores) == 0 {
  225. return 0.0, nil
  226. }
  227. klog.Infof("Node %v has %v cores", nodeName, len(cores))
  228. if ok {
  229. // Select Socket
  230. results, err := queryInfluxDbCores([]string{"c6res"}, curr_uuid, socket, 20, cfg, c, cores)
  231. if err != nil {
  232. klog.Infof("Error in querying or calculating average: %v", err.Error())
  233. return 0, nil
  234. }
  235. res := calculateScore(results, OneScorer)
  236. // Select Node
  237. klog.Infof("Node name %s, has score %v\n", nodeName, res)
  238. return res, nil
  239. } else {
  240. klog.Infof("Error finding the uuid: %v", ok)
  241. return 0, nil
  242. }
  243. }