node_selection.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. /*
  2. Copyright 2020 Achilleas Tzenetopoulos.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package priorities
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "strings"
  18. _ "github.com/go-sql-driver/mysql"
  19. client "github.com/influxdata/influxdb1-client/v2"
  20. "k8s.io/klog"
  21. )
  22. var (
  23. nodeSelectionPriority = &CustomAllocationPriority{"NodeSelection", nodeSelectionScorer}
  24. //customResourcePriority = &CustomAllocationPriority{"CustomRequestedPriority", customResourceScorer}
  25. // LeastRequestedPriorityMap is a priority function that favors nodes with fewer requested resources.
  26. // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and
  27. // prioritizes based on the minimum of the average of the fraction of requested to capacity.
  28. //
  29. // Details:
  30. // (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
  31. NodeSelectionPriorityMap = nodeSelectionPriority.PriorityMap
  32. )
  33. func OneScorer(si scorerInput) float64 {
  34. return 100 - si.metrics[si.metricName]
  35. }
  36. func calculateWeightedAverageCores(response *client.Response,
  37. numberOfRows, numberOfMetrics, numberOfCores int) (map[string]float64, error) {
  38. // initialize the metrics map with a constant size
  39. metrics := make(map[string]float64, numberOfMetrics)
  40. rows := response.Results[0].Series[0]
  41. for i := 1; i < len(rows.Columns); i++ {
  42. //klog.Infof("Name of column %v : %v\nrange of values: %v\nnumber of rows: %v\nnumber of cores %v\n", i, rows.Columns[i], len(rows.Values), numberOfRows, numberOfCores)
  43. for j := 0; j < numberOfRows; j++ {
  44. avg := 0.0
  45. for k := 0; k < numberOfCores; k++ {
  46. val, err := rows.Values[j*numberOfCores+k][i].(json.Number).Float64()
  47. if err != nil {
  48. klog.Infof("Error while calculating %v", rows.Columns[i])
  49. return nil, err
  50. }
  51. //metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
  52. avg += val / float64(numberOfCores)
  53. }
  54. metrics[rows.Columns[i]] += avg * float64(numberOfRows-j)
  55. }
  56. metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
  57. //klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
  58. }
  59. // TODO better handling for the returning errors
  60. return metrics, nil
  61. }
  62. // This function does the following:
  63. // 1. Queries the DB with the provided metrics and cores
  64. // 2. Calculates and returns the weighted average of each of those metrics
  65. func queryInfluxDbCores(metrics []string, uuid string, socket,
  66. numberOfRows int, cfg Config, c client.Client, cores []int) (*client.Response, error) {
  67. // calculate the number of rows needed
  68. // i.e. 20sec / 0.2s interval => 100rows
  69. //numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  70. // EDIT
  71. // This time we will fetch data for multiple cores
  72. // so we will need more rows, proportional to the core number
  73. // merge all the required columns
  74. columns := strings.Join(metrics, ", ")
  75. // build the cores part of the command
  76. var coresPart strings.Builder
  77. fmt.Fprintf(&coresPart, "core_id='%d'", cores[0])
  78. for i := 1; i < len(cores); i++ {
  79. fmt.Fprintf(&coresPart, " or core_id='%d'", cores[i])
  80. }
  81. // build the coommand
  82. var command strings.Builder
  83. fmt.Fprintf(&command, "SELECT %s from core_metrics where uuid = '%s' and socket_id='%d' and %s order by time desc limit %d", columns, uuid, socket, coresPart.String(), numberOfRows*len(cores))
  84. //klog.Infof("The query is: %v", command.String())
  85. q := client.NewQuery(command.String(), cfg.Database.Name, "")
  86. response, err := c.Query(q)
  87. if err != nil {
  88. klog.Infof("Error while executing the query: %v", err.Error())
  89. return nil, err
  90. }
  91. // Calculate the average for the metrics provided
  92. return response, nil
  93. }
  94. func nodeSelectionScorer(nodeName string) (float64, error) {
  95. //return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
  96. //customRequestedScore(requested.Memory, allocable.Memory)) / 2
  97. //read database information
  98. var cfg Config
  99. err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
  100. if err != nil {
  101. return 0, err
  102. }
  103. /*-------------------------------------
  104. //TODO read also nodes to uuid mappings for EVOLVE
  105. -------------------------------------*/
  106. // InfluxDB
  107. c, err := connectToInfluxDB(cfg)
  108. if err != nil {
  109. return 0, err
  110. }
  111. // close the connection in the end of execution
  112. defer c.Close()
  113. //Get the uuid of this node in order to query in the database
  114. curr_uuid, ok := nodes[nodeName]
  115. socket, _ := sockets[nodeName]
  116. cores, _ := cores[nodeName]
  117. if len(cores) == 0 {
  118. return 0.0, nil
  119. }
  120. //klog.Infof("Node %v has %v cores", nodeName, len(cores))
  121. if ok {
  122. metrics := []string{"c6res"}
  123. time := 20
  124. numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  125. // Select Socket
  126. r, err := queryInfluxDbCores(metrics, curr_uuid, socket, numberOfRows, cfg, c, cores)
  127. if err != nil {
  128. klog.Infof("Error in querying or calculating average: %v", err.Error())
  129. return 0, nil
  130. }
  131. results, err := calculateWeightedAverageCores(r, numberOfRows, len(metrics), len(cores))
  132. res := calculateScore(scorerInput{metricName: "c6res", metrics: results}, OneScorer) * float64(len(cores))
  133. // Select Node
  134. klog.Infof("Node name %s, Score %v\n", nodeName, res)
  135. return res, nil
  136. } else {
  137. klog.Infof("Error finding the uuid: %v", ok)
  138. return 0, nil
  139. }
  140. }