custom_resource_allocation.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package priorities
import (
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"time"

	_ "github.com/go-sql-driver/mysql"
	client "github.com/influxdata/influxdb1-client/v2"
	"gopkg.in/yaml.v2"
	"k8s.io/klog"
)
  24. var (
  25. customResourcePriority = &CustomAllocationPriority{"CustomResourceAllocation", customResourceScorer}
  26. // LeastRequestedPriorityMap is a priority function that favors nodes with fewer requested resources.
  27. // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and
  28. // prioritizes based on the minimum of the average of the fraction of requested to capacity.
  29. //
  30. // Details:
  31. // (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
  32. CustomRequestedPriorityMap = customResourcePriority.CustomPriorityMap
  33. )
// Config mirrors the YAML file that readFile decodes: the scheduler's
// monitoring-database settings.
type Config struct {
	// Server is the InfluxDB HTTP endpoint; connectToInfluxDB builds
	// "http://<Host>:<Port>" from it.
	Server struct {
		Port string `yaml:"port"`
		Host string `yaml:"host"`
	} `yaml:"server"`
	// Database identifies what to query. Only Name is read by the code in
	// this file (as the InfluxDB database passed to each query); Type,
	// Username and Password are decoded but not used here.
	Database struct {
		Type     string `yaml:"type"`
		Name     string `yaml:"name"`
		Username string `yaml:"username"`
		Password string `yaml:"password"`
	} `yaml:"database"`
	// MonitoringSpecs describes how the stored metrics were sampled.
	MonitoringSpecs struct {
		// TimeInterval is the sampling period of the stored metrics (YAML
		// key "interval"). queryInfluxDB divides its look-back window by it
		// to decide how many rows to fetch; presumably seconds, given the
		// "20sec / 0.5s" example there — confirm.
		TimeInterval float32 `yaml:"interval"`
	} `yaml:"monitoring"`
}
  49. // type Row struct {
  50. // ipc float32
  51. // l3m float32
  52. // reads float32
  53. // writes float32
  54. // c6res float32
  55. // }
  56. // type System struct {
  57. // ID int `json:"id"`
  58. // Uuid string `json:"uuid"`
  59. // numSockets int `json:"num_sockets"`
  60. // numCores int `json:"num_cores`
  61. // }
  62. var nodes = map[string]string{
  63. "kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  64. "kube-02": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  65. "kube-03": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  66. "kube-04": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  67. "kube-05": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  68. "kube-06": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  69. "kube-07": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  70. "kube-08": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  71. }
  72. var sockets = map[string]int{
  73. "kube-01": 0,
  74. "kube-02": 0,
  75. "kube-03": 0,
  76. "kube-04": 0,
  77. "kube-05": 0,
  78. "kube-06": 1,
  79. "kube-07": 0,
  80. "kube-08": 1,
  81. }
  82. func readFile(cfg *Config, file string) {
  83. f, err := os.Open("/etc/kubernetes/scheduler-monitoringDB.yaml")
  84. if err != nil {
  85. panic(err.Error())
  86. }
  87. defer f.Close()
  88. decoder := yaml.NewDecoder(f)
  89. err = decoder.Decode(&cfg)
  90. if err != nil {
  91. panic(err.Error())
  92. }
  93. }
  94. func customScoreFn(metrics map[string]float64) float64 {
  95. return metrics["mem_read"] * metrics["mem_write"] / metrics["ipc"]
  96. }
  97. func calculateScore(results map[string]float64,
  98. logicFn func(map[string]float64) float64) float64 {
  99. res := logicFn(results)
  100. klog.Infof("Has score (in float) %v\n", res)
  101. // TODO
  102. // While the final score should be an integer,
  103. // find a solution about resolving the float prflduced
  104. return res
  105. }
  106. func calculateWeightedAverage(response *client.Response,
  107. numberOfRows int, numberOfMetrics int) (map[string]float64, error) {
  108. // initialize the metrics map with a constant size
  109. metrics := make(map[string]float64, numberOfMetrics)
  110. rows := response.Results[0].Series[0]
  111. for i := 1; i < len(rows.Columns); i++ {
  112. for j := 0; j < numberOfRows; j++ {
  113. val, err := rows.Values[j][i].(json.Number).Float64()
  114. if err != nil {
  115. klog.Infof("Error while calculating %v", rows.Columns[i])
  116. return nil, err
  117. }
  118. metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
  119. }
  120. metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
  121. klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
  122. }
  123. // TODO better handling for the returning errors
  124. return metrics, nil
  125. }
  126. func connectToInfluxDB(cfg Config) (client.Client, error) {
  127. c, err := client.NewHTTPClient(client.HTTPConfig{
  128. Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "",
  129. })
  130. if err != nil {
  131. klog.Infof("Error while connecting to InfluxDB: %v ", err.Error())
  132. return nil, err
  133. }
  134. klog.Infof("Connected Successfully to InfluxDB")
  135. return c, nil
  136. }
  137. func queryInfluxDB(metrics []string, uuid string, socket int,
  138. time int, cfg Config, c client.Client) (map[string]float64, error) {
  139. // calculate the number of rows needed
  140. // i.e. 20sec / 0.5s interval => 40rows
  141. numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
  142. // merge all the required columns
  143. columns := strings.Join(metrics, ", ")
  144. // build the coommand
  145. command := fmt.Sprintf("SELECT %s from socket_metrics where uuid = '%s' and socket_id='%d' order by time desc limit %d", columns, uuid, socket, numberOfRows)
  146. q := client.NewQuery(command, cfg.Database.Name, "")
  147. response, err := c.Query(q)
  148. if err != nil {
  149. klog.Infof("Error while executing the query: %v", err.Error())
  150. return nil, err
  151. }
  152. // Calculate the average for the metrics provided
  153. return calculateWeightedAverage(response, numberOfRows, len(metrics))
  154. }
  155. func customResourceScorer(nodeName string) (float64, error) {
  156. //return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
  157. //customRequestedScore(requested.Memory, allocable.Memory)) / 2
  158. //read database information
  159. var cfg Config
  160. readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
  161. /*-------------------------------------
  162. //TODO read also nodes to uuid mappings
  163. -------------------------------------*/
  164. // InfluxDB
  165. c, err := connectToInfluxDB(cfg)
  166. if err != nil {
  167. return 0, err
  168. }
  169. // close the connection in the end of execution
  170. defer c.Close()
  171. //Get the uuid of this node in order to query in the database
  172. curr_uuid, ok := nodes[nodeName]
  173. socket, _ := sockets[nodeName]
  174. if ok {
  175. results, err := queryInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, 20, cfg, c)
  176. if err != nil {
  177. klog.Infof("Error in querying or calculating average: %v", err.Error())
  178. return 0, nil
  179. }
  180. res := calculateScore(results, customScoreFn)
  181. klog.Infof("Node name %s, has score %v\n", nodeName, res)
  182. return res, nil
  183. } else {
  184. klog.Infof("Error finding the uuid: %v", ok)
  185. return 0, nil
  186. }
  187. // //Close the database connection in the end of the execution
  188. // defer db.Close()
  189. // //Get the uuid of this node in order to query in the database
  190. // curr_uuid, ok := nodes[nodeName]
  191. // //Get the metrics for the current node
  192. // if ok {
  193. // results, err := db.Query("SELECT id, num_sockets, num_cores FROM systems WHERE uuid = ?", curr_uuid)
  194. // if err != nil {
  195. // panic(err.Error()) // proper error handling instead of panic in your app
  196. // }
  197. // sys := System{}
  198. // sys.Uuid = curr_uuid
  199. // for results.Next() {
  200. // // for each row, scan the result into our tag composite object
  201. // err = results.Scan(&sys.ID, &sys.numSockets, &sys.numCores)
  202. // if err != nil {
  203. // panic(err.Error()) // proper error handling instead of panic in your app
  204. // }
  205. // // and then print out the tag's Name attribute
  206. // klog.Infof("This is the system with name: %s, id: %d and number of cores: %d", nodeName, sys.ID, sys.numCores)
  207. // }
  208. // }
  209. // res := customRequestedScore(nodeName)
  210. // klog.Infof("Node name %s, has score %d\n", nodeName, res)
  211. // return res, nil
  212. }
  213. func customRequestedScore(nodeName string) int64 {
  214. if nodeName == "kube-01" {
  215. return 10
  216. }
  217. return 0
  218. }