custom_resource_allocation.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package priorities
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "os"
  18. _ "github.com/go-sql-driver/mysql"
  19. client "github.com/influxdata/influxdb1-client/v2"
  20. "gopkg.in/yaml.v2"
  21. "k8s.io/klog"
  22. )
  23. var (
  24. customResourcePriority = &CustomAllocationPriority{"CustomResourceAllocation", customResourceScorer}
  25. // LeastRequestedPriorityMap is a priority function that favors nodes with fewer requested resources.
  26. // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and
  27. // prioritizes based on the minimum of the average of the fraction of requested to capacity.
  28. //
  29. // Details:
  30. // (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
  31. CustomRequestedPriorityMap = customResourcePriority.PriorityMap
  32. )
  33. type Config struct {
  34. Server struct {
  35. Port string `yaml:"port"`
  36. Host string `yaml:"host"`
  37. } `yaml:"server"`
  38. Database struct {
  39. Type string `yaml: "type"`
  40. Name string `yaml:"name"`
  41. Username string `yaml:"username"`
  42. Password string `yaml:"password"`
  43. } `yaml:"database"`
  44. }
  45. type System struct {
  46. ID int `json:"id"`
  47. Uuid string `json:"uuid"`
  48. numSockets int `json:"num_sockets"`
  49. numCores int `json:"num_cores`
  50. }
  51. var nodes = map[string]string{
  52. "kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  53. "kube-02": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  54. "kube-03": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  55. "kube-04": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  56. "kube-05": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  57. "kube-06": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  58. "kube-07": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  59. "kube-08": "c4766d29-4dc1-11ea-9d98-0242ac110002",
  60. }
  61. func readFile(cfg *Config, file string) {
  62. f, err := os.Open("/etc/kubernetes/scheduler-monitoringDB.yaml")
  63. if err != nil {
  64. panic(err.Error())
  65. }
  66. defer f.Close()
  67. decoder := yaml.NewDecoder(f)
  68. err = decoder.Decode(&cfg)
  69. if err != nil {
  70. panic(err.Error())
  71. }
  72. }
  73. func customResourceScorer(nodeName string) int64 {
  74. //return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
  75. //customRequestedScore(requested.Memory, allocable.Memory)) / 2
  76. //read database information
  77. var cfg Config
  78. readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
  79. /*-------------------------------------
  80. //TODO read also nodes to uuid mappings
  81. -------------------------------------*/
  82. //Access the Database
  83. // DBstring := cfg.Database.Username + ":" + cfg.Database.Password + "@tcp(" + cfg.Server.Host + ":" + cfg.Server.Port + ")/" + cfg.Database.Name
  84. // _, err := sql.Open(cfg.Database.Type, DBstring)
  85. // if err != nil {
  86. // panic(err.Error())
  87. // }
  88. // InfluxDB
  89. c, err := client.NewHTTPClient(client.HTTPConfig{
  90. Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "",
  91. })
  92. if err != nil {
  93. klog.Infof("Error while creating InfluxDB client: %v ", err.Error())
  94. } else {
  95. klog.Infof("Connected Successfully to InfluxDB")
  96. }
  97. // close the connection in the end of execution
  98. defer c.Close()
  99. //Get the uuid of this node in order to query in the database
  100. curr_uuid, ok := nodes[nodeName]
  101. if ok {
  102. command := fmt.Sprintf("SELECT ipc from system_metrics where uuid = '%s' order by time desc limit 1", curr_uuid)
  103. q := client.NewQuery(command, "evolve", "")
  104. if response, err := c.Query(q); err == nil && response.Error() == nil {
  105. klog.Infof("%v", response.Results[0].Series[0].Values[0][1].(json.Number))
  106. } else if err != nil {
  107. klog.Infof("Error: %v", err)
  108. }
  109. } else {
  110. klog.Infof("Error finding the uuid: %v", ok)
  111. }
  112. // //Close the database connection in the end of the execution
  113. // defer db.Close()
  114. // //Get the uuid of this node in order to query in the database
  115. // curr_uuid, ok := nodes[nodeName]
  116. // //Get the metrics for the current node
  117. // if ok {
  118. // results, err := db.Query("SELECT id, num_sockets, num_cores FROM systems WHERE uuid = ?", curr_uuid)
  119. // if err != nil {
  120. // panic(err.Error()) // proper error handling instead of panic in your app
  121. // }
  122. // sys := System{}
  123. // sys.Uuid = curr_uuid
  124. // for results.Next() {
  125. // // for each row, scan the result into our tag composite object
  126. // err = results.Scan(&sys.ID, &sys.numSockets, &sys.numCores)
  127. // if err != nil {
  128. // panic(err.Error()) // proper error handling instead of panic in your app
  129. // }
  130. // // and then print out the tag's Name attribute
  131. // klog.Infof("This is the system with name: %s, id: %d and number of cores: %d", nodeName, sys.ID, sys.numCores)
  132. // }
  133. // }
  134. res := customRequestedScore(nodeName)
  135. klog.Infof("Node name %s, has score %d\n", nodeName, res)
  136. return res
  137. }
  138. func customRequestedScore(nodeName string) int64 {
  139. if nodeName == "kube-01" {
  140. return 10
  141. }
  142. return 0
  143. }