custom_resource_allocation.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package priorities
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "os"
  18. "strings"
  19. _ "github.com/go-sql-driver/mysql"
  20. client "github.com/influxdata/influxdb1-client/v2"
  21. "gopkg.in/yaml.v2"
  22. "k8s.io/klog"
  23. )
  24. var (
  25. customResourcePriority = &CustomAllocationPriority{"CustomResourceAllocation", customResourceScorer}
  26. //customResourcePriority = &CustomAllocationPriority{"CustomRequestedPriority", customResourceScorer}
  27. // CustomRequestedPriorityMap is the PriorityMap member of customResourcePriority, which is
  28. // built from the name "CustomResourceAllocation" and the customResourceScorer function.
  29. // customResourceScorer scores a node from the ipc, mem_read and mem_write metrics it
  30. // queries from InfluxDB; it does not use requested/capacity CPU or memory fractions.
  31. // NOTE(review): CustomAllocationPriority is declared outside this file — confirm how
  32. // PriorityMap invokes the scorer and what score range the scheduler expects from it.
  33. CustomRequestedPriorityMap = customResourcePriority.PriorityMap
  34. )
  // Config mirrors the YAML settings file decoded by readFile; customResourceScorer
  // loads it from /etc/kubernetes/scheduler-monitoringDB.yaml.
  35. type Config struct {
  // Server holds the host and port that connectToInfluxDB joins into the
  // "http://<host>:<port>" address of the InfluxDB HTTP client.
  36. Server struct {
  37. Port string `yaml:"port"`
  38. Host string `yaml:"host"`
  39. } `yaml:"server"`
  // Database describes the metrics database. Within this file only Name is read
  // (queryInfluxDB passes it to client.NewQuery); Type, Username and Password are
  // decoded but not referenced here.
  40. Database struct {
  41. Type string `yaml:"type"`
  42. Name string `yaml:"name"`
  43. Username string `yaml:"username"`
  44. Password string `yaml:"password"`
  45. } `yaml:"database"`
  // MonitoringSpecs.TimeInterval is the divisor queryInfluxDB applies to its time
  // window to obtain the number of rows to fetch.
  // NOTE(review): presumably the sampling period in seconds (the comment in
  // queryInfluxDB speaks of "20sec / 0.5s interval") — confirm the unit.
  46. MonitoringSpecs struct {
  47. TimeInterval float32 `yaml:"interval"`
  48. } `yaml:"monitoring"`
  49. }
  50. // type Row struct {
  51. // ipc float32
  52. // l3m float32
  53. // reads float32
  54. // writes float32
  55. // c6res float32
  56. // }
  57. // type System struct {
  58. // ID int `json:"id"`
  59. // Uuid string `json:"uuid"`
  60. // numSockets int `json:"num_sockets"`
  61. // numCores int `json:"num_cores"`
  62. // }
  63. // TODO:
  64. // place those maps in another file inside the package
  // nodes maps a node name to the uuid that customResourceScorer hands to
  // queryInfluxDB, where it becomes the `uuid = '...'` filter of the query.
  // A node name missing from this map makes customResourceScorer log the miss
  // and return a zero score.
  // NOTE(review): all eight entries carry the same UUID — confirm this is the
  // intended mapping and not a placeholder.
  65. var nodes = map[string]string{
  66. "kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  67. "kube-02": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  68. "kube-03": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  69. "kube-04": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  70. "kube-05": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  71. "kube-06": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  72. "kube-07": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  73. "kube-08": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  74. }
  // sockets maps a node name to the integer that customResourceScorer hands to
  // queryInfluxDB, where it becomes the `socket_id='...'` filter of the query.
  // customResourceScorer discards the lookup's ok result, so a node name missing
  // from this map is queried with the zero value 0.
  75. var sockets = map[string]int{
  76. "kube-01": 0,
  77. "kube-02": 0,
  78. "kube-03": 0,
  79. "kube-04": 0,
  80. "kube-05": 0,
  81. "kube-06": 1,
  82. "kube-07": 0,
  83. "kube-08": 1,
  84. }
  85. func readFile(cfg *Config, file string) error {
  86. f, err := os.Open(file)
  87. if err != nil {
  88. klog.Infof("Config file for scheduler not found. Error: %v", err)
  89. return err
  90. }
  91. defer f.Close()
  92. decoder := yaml.NewDecoder(f)
  93. err = decoder.Decode(&cfg)
  94. if err != nil {
  95. klog.Infof("Unable to decode the config file. Error: %v", err)
  96. return err
  97. }
  98. return nil
  99. }
  100. func customScoreFn(si scorerInput) float64 {
  101. return si.metrics["ipc"] / si.metrics["mem_read"] * si.metrics["mem_write"]
  102. }
  // onlyIPC scores a node by its "ipc" metric alone. A missing key reads as 0.
  func onlyIPC(metrics map[string]float64) float64 {
  	ipc := metrics["ipc"]
  	return ipc
  }
  // onlyL3 scores a node by the reciprocal of its "l3m" metric, so smaller
  // values score higher. A missing or zero "l3m" divides by zero and yields +Inf.
  func onlyL3(metrics map[string]float64) float64 {
  	l3m := metrics["l3m"]
  	return 1 / l3m
  }
  // onlyNrg scores a node by the reciprocal of its "procnrg" metric, so smaller
  // values score higher. A missing or zero "procnrg" divides by zero and yields
  // +Inf.
  func onlyNrg(metrics map[string]float64) float64 {
  	procnrg := metrics["procnrg"]
  	return 1 / procnrg
  }
  112. func calculateScore(results map[string]float64,
  113. logicFn func(scorerInput) float64) float64 {
  114. res := logicFn(scorerInput{metrics: results})
  115. //klog.Infof("Has score (in float) %v\n", res)
  116. return res
  117. }
  118. func calculateWeightedAverage(response *client.Response,
  119. numberOfRows, numberOfMetrics int) (map[string]float64, error) {
  120. // initialize the metrics map with a constant size
  121. metrics := make(map[string]float64, numberOfMetrics)
  122. rows := response.Results[0].Series[0]
  123. for i := 1; i < len(rows.Columns); i++ {
  124. for j := 0; j < numberOfRows; j++ {
  125. val, err := rows.Values[j][i].(json.Number).Float64()
  126. if err != nil {
  127. klog.Infof("Error while calculating %v", rows.Columns[i])
  128. return nil, err
  129. }
  130. metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
  131. }
  132. metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
  133. klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
  134. }
  135. // TODO better handling for the returning errors
  136. return metrics, nil
  137. }
  138. func connectToInfluxDB(cfg Config) (client.Client, error) {
  139. c, err := client.NewHTTPClient(client.HTTPConfig{
  140. Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "",
  141. })
  142. if err != nil {
  143. klog.Infof("Error while connecting to InfluxDB: %v ", err.Error())
  144. return nil, err
  145. }
  146. klog.Infof("Connected Successfully to InfluxDB")
  147. return c, nil
  148. }
  149. func queryInfluxDB(metrics []string, uuid string, socket,
  150. time int, cfg Config, c client.Client) (map[string]float64, error) {
  151. // calculate the number of rows needed
  152. // i.e. 20sec / 0.5s interval => 40rows
  153. numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  154. // merge all the required columns
  155. columns := strings.Join(metrics, ", ")
  156. // build the coommand
  157. command := fmt.Sprintf("SELECT %s from socket_metrics where uuid = '%s' and socket_id='%d' order by time desc limit %d", columns, uuid, socket, numberOfRows)
  158. q := client.NewQuery(command, cfg.Database.Name, "")
  159. response, err := c.Query(q)
  160. if err != nil {
  161. klog.Infof("Error while executing the query: %v", err.Error())
  162. return nil, err
  163. }
  164. // Calculate the average for the metrics provided
  165. return calculateWeightedAverage(response, numberOfRows, len(metrics))
  166. }
  167. func customResourceScorer(nodeName string) (float64, error) {
  168. //return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
  169. //customRequestedScore(requested.Memory, allocable.Memory)) / 2
  170. //read database information
  171. var cfg Config
  172. err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
  173. if err != nil {
  174. return 0, err
  175. }
  176. /*-------------------------------------
  177. //TODO read also nodes to uuid mappings for EVOLVE
  178. -------------------------------------*/
  179. // InfluxDB
  180. c, err := connectToInfluxDB(cfg)
  181. if err != nil {
  182. return 0, err
  183. }
  184. // close the connection in the end of execution
  185. defer c.Close()
  186. //Get the uuid of this node in order to query in the database
  187. curr_uuid, ok := nodes[nodeName]
  188. socket, _ := sockets[nodeName]
  189. if ok {
  190. // Select Socket
  191. results, err := queryInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, 20, cfg, c)
  192. if err != nil {
  193. klog.Infof("Error in querying or calculating average: %v", err.Error())
  194. return 0, nil
  195. }
  196. res := calculateScore(results, customScoreFn)
  197. // Select Node
  198. klog.Infof("Node name %s, has score %v\n", nodeName, res)
  199. return res, nil
  200. } else {
  201. klog.Infof("Error finding the uuid: %v", ok)
  202. return 0, nil
  203. }
  204. }