custom_resource_allocation.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /*
  2. Copyright 2020 Achilleas Tzenetopoulos.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package priorities
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "strings"
  18. _ "github.com/go-sql-driver/mysql"
  19. client "github.com/influxdata/influxdb1-client/v2"
  20. "k8s.io/klog"
  21. )
  22. var (
  23. customResourcePriority = &CustomAllocationPriority{"CustomResourceAllocation", customResourceScorer}
  24. //customResourcePriority = &CustomAllocationPriority{"CustomRequestedPriority", customResourceScorer}
  25. // LeastRequestedPriorityMap is a priority function that favors nodes with fewer requested resources.
  26. // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and
  27. // prioritizes based on the minimum of the average of the fraction of requested to capacity.
  28. //
  29. // Details:
  30. // (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
  31. CustomRequestedPriorityMap = customResourcePriority.PriorityMap
  32. )
  33. func customScoreFn(si scorerInput) float64 {
  34. return si.metrics["ipc"] / si.metrics["mem_read"] * si.metrics["mem_write"]
  35. }
  // onlyIPC scores using nothing but the "ipc" metric: a higher value gives a
  // higher score. A missing key reads as 0.
  func onlyIPC(metrics map[string]float64) float64 {
  	ipc := metrics["ipc"]
  	return ipc
  }
  // onlyL3 scores using the reciprocal of the "l3m" metric, so a smaller "l3m"
  // gives a larger score. A zero or missing "l3m" yields +Inf.
  func onlyL3(metrics map[string]float64) float64 {
  	l3m := metrics["l3m"]
  	return 1 / l3m
  }
  // onlyNrg scores using the reciprocal of the "procnrg" metric, so a smaller
  // "procnrg" gives a larger score. A zero or missing "procnrg" yields +Inf.
  func onlyNrg(metrics map[string]float64) float64 {
  	procnrg := metrics["procnrg"]
  	return 1 / procnrg
  }
  45. func calculateScore(si scorerInput,
  46. logicFn func(scorerInput) float64) float64 {
  47. res := logicFn(si)
  48. //klog.Infof("Has score (in float) %v\n", res)
  49. return res
  50. }
  51. func calculateWeightedAverage(response *client.Response,
  52. numberOfRows, numberOfMetrics int) (map[string]float64, error) {
  53. // initialize the metrics map with a constant size
  54. metrics := make(map[string]float64, numberOfMetrics)
  55. rows := response.Results[0].Series[0]
  56. for i := 1; i < len(rows.Columns); i++ {
  57. for j := 0; j < numberOfRows; j++ {
  58. val, err := rows.Values[j][i].(json.Number).Float64()
  59. if err != nil {
  60. klog.Infof("Error while calculating %v", rows.Columns[i])
  61. return nil, err
  62. }
  63. metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
  64. }
  65. metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
  66. //klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
  67. }
  68. // TODO better handling for the returning errors
  69. return metrics, nil
  70. }
  71. func queryInfluxDB(metrics []string, uuid string, socket,
  72. time int, cfg Config, c client.Client) (map[string]float64, error) {
  73. // calculate the number of rows needed
  74. // i.e. 20sec / 0.5s interval => 40rows
  75. numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  76. // merge all the required columns
  77. columns := strings.Join(metrics, ", ")
  78. // build the coommand
  79. var command strings.Builder
  80. fmt.Fprintf(&command, "SELECT %s from socket_metrics where uuid = '%s' and socket_id='%d' order by time desc limit %d", columns, uuid, socket, numberOfRows)
  81. q := client.NewQuery(command.String(), cfg.Database.Name, "")
  82. response, err := c.Query(q)
  83. if err != nil {
  84. klog.Infof("Error while executing the query: %v", err.Error())
  85. return nil, err
  86. }
  87. // Calculate the average for the metrics provided
  88. return calculateWeightedAverage(response, numberOfRows, len(metrics))
  89. }
  90. func customResourceScorer(nodeName string) (float64, error) {
  91. //return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
  92. //customRequestedScore(requested.Memory, allocable.Memory)) / 2
  93. //read database information
  94. var cfg Config
  95. err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
  96. if err != nil {
  97. return 0, err
  98. }
  99. /*-------------------------------------
  100. //TODO read also nodes to uuid mappings for EVOLVE
  101. -------------------------------------*/
  102. // InfluxDB
  103. c, err := connectToInfluxDB(cfg)
  104. if err != nil {
  105. return 0, err
  106. }
  107. // close the connection in the end of execution
  108. defer c.Close()
  109. //Get the uuid of this node in order to query in the database
  110. curr_uuid, ok := nodes[nodeName]
  111. socket, _ := sockets[nodeName]
  112. if ok {
  113. // Select Socket
  114. results, err := queryInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, 20, cfg, c)
  115. if err != nil {
  116. klog.Infof("Error in querying or calculating average: %v", err.Error())
  117. return 0, nil
  118. }
  119. res := calculateScore(scorerInput{metrics: results}, customScoreFn)
  120. // Select Node
  121. klog.Infof("Node name %s, has score %v\n", nodeName, res)
  122. return res, nil
  123. } else {
  124. klog.Infof("Error finding the uuid: %v", ok)
  125. return 0, nil
  126. }
  127. }