custom_resource_allocation.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. /*
  2. Copyright 2020 Achilleas Tzenetopoulos
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
Extension of the scheduler developed during my diploma thesis.
Goal: make it pluggable into Kubernetes as a scheduler extension.
  16. Date started: 30/1/2020
  17. Using the scheduling framework and plugins logic
  18. */
  19. package noderesources
  20. import (
  21. "context"
  22. "database/sql"
  23. "fmt"
  24. "os"
  25. _ "github.com/go-sql-driver/mysql"
  26. "gopkg.in/yaml.v2"
  27. v1 "k8s.io/api/core/v1"
  28. "k8s.io/apimachinery/pkg/runtime"
  29. "k8s.io/klog"
  30. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  31. "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  32. )
  33. type Config struct {
  34. Server struct {
  35. Port string `yaml:"port"`
  36. Host string `yaml:"host"`
  37. } `yaml:"server"`
  38. Database struct {
  39. Type string `yaml: "type"`
  40. Name string `yaml:"name"`
  41. Username string `yaml:"username"`
  42. Password string `yaml:"password"`
  43. } `yaml:"database"`
  44. }
// CustomAllocated is a score plugin that favors nodes with a custom scoring
// function taking into account custom resources and monitored metrics.
type CustomAllocated struct {
	// handle is the framework handle; Score uses it to look nodes up in the
	// scheduler snapshot.
	handle framework.FrameworkHandle
	// Embedded scorer; NewCustomAllocated fills it with CustomAllocatedName,
	// customResourceScorer and defaultRequestedRatioResources.
	resourceAllocationScorer
	// Commented-out alternative embedding (not compiled):
	//customResourceAllocationScorer
}
  51. type nodeMetrics struct {
  52. L3cacheMisses int64
  53. memoryReads int64
  54. memoryWrites int64
  55. averageIPC float32
  56. }
  57. type System struct {
  58. ID int `json:"id"`
  59. Uuid string `json:"uuid"`
  60. numSockets int `json:"num_sockets"`
  61. numCores int `json:"num_cores`
  62. }
// wrongValueError is an empty struct type. It has no Error method, so it does
// not implement the error interface, and nothing in the visible source refers
// to it.
// NOTE(review): possibly meant for getCustomMetrics' invalid-value path (see
// its commented-out errors.New) — confirm, implement Error(), or remove.
type wrongValueError struct{}
  64. var _ = framework.ScorePlugin(&CustomAllocated{})
// nodes maps a Kubernetes node name to the uuid of its row in the monitoring
// database's systems table. getCustomMetrics only queries the database for
// node names present in this map.
//
// NOTE(review): kube-02, kube-03 and kube-04 all map to the placeholder "2",
// which presumably matches no real systems.uuid — fill in the real UUIDs.
// Earlier commented-out declaration:
//var nodes = make(map[string]string)
var nodes = map[string]string{
	"kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
	"kube-02": "2",
	"kube-03": "2",
	"kube-04": "2",
}
// CustomAllocatedName is the name of the plugin used in the plugin registry and configurations.
const CustomAllocatedName = "NodeResourcesCustomAllocated"

// Name returns the name of the plugin, CustomAllocatedName. It is used in
// logs, etc.
func (ca *CustomAllocated) Name() string {
	return CustomAllocatedName
}
  79. func readFile(cfg *Config) {
  80. f, err := os.Open("/etc/kubernetes/scheduler-monitoringDB.yaml")
  81. if err != nil {
  82. panic(err.Error())
  83. }
  84. defer f.Close()
  85. decoder := yaml.NewDecoder(f)
  86. err = decoder.Decode(&cfg)
  87. if err != nil {
  88. panic(err.Error())
  89. }
  90. }
  91. func getCustomMetrics(name string) *nodeMetrics {
  92. var cfg Config
  93. readFile(&cfg)
  94. //Open the database connection
  95. DBstring := cfg.Database.Username + ":" + cfg.Database.Password + "@tcp(" + cfg.Server.Host + ":" + cfg.Server.Port + ")/" + cfg.Database.Name
  96. db, err := sql.Open(cfg.Database.Type, DBstring)
  97. if err != nil {
  98. panic(err.Error())
  99. }
  100. defer db.Close()
  101. curr_uuid, ok := nodes[name]
  102. nd := nodeMetrics{}
  103. if ok {
  104. results, err := db.Query("SELECT id, num_sockets, num_cores FROM systems WHERE uuid = ?", curr_uuid)
  105. if err != nil {
  106. panic(err.Error()) // proper error handling instead of panic in your app
  107. }
  108. sys := System{}
  109. sys.Uuid = curr_uuid
  110. for results.Next() {
  111. // for each row, scan the result into our tag composite object
  112. err = results.Scan(&sys.ID, &sys.numSockets, &sys.numCores)
  113. if err != nil {
  114. panic(err.Error()) // proper error handling instead of panic in your app
  115. }
  116. // and then print out the tag's Name attribute
  117. klog.Infof("This is the system with name: %s, id: %d and number of cores: %d", name, sys.ID, sys.numCores)
  118. }
  119. }
  120. //nd := nodeMetrics{}
  121. if name == "kube-01" {
  122. nd.L3cacheMisses = 1
  123. nd.memoryReads = 1
  124. nd.memoryWrites = 1
  125. nd.averageIPC = 2.0
  126. } else {
  127. //some sample values for testing purposes
  128. nd.L3cacheMisses = 200
  129. nd.memoryReads = 50
  130. nd.memoryWrites = 60
  131. nd.averageIPC = 1.2
  132. }
  133. //query metrics for this node provided in the input
  134. if nd.L3cacheMisses < 0 || nd.memoryReads < 0 || nd.memoryWrites < 0 || nd.averageIPC < 0 {
  135. //err := errors.New("Failed to get the values")
  136. return nil
  137. }
  138. return &nd
  139. }
  140. func (ca *CustomAllocated) scoringFunction(pod *v1.Pod, node *nodeMetrics, nodeInfo *nodeinfo.NodeInfo) int64 {
  141. //rawScore := float32(node.memoryReads+node.memoryWrites) / node.averageIPC
  142. intScore := int64(float32(node.memoryReads+node.memoryWrites) / node.averageIPC)
  143. //Sample implementation in order to check functionality
  144. klog.Infof("Score: %d", intScore)
  145. return intScore
  146. }
  147. // Score invoked at the score extension point.
  148. func (ca *CustomAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
  149. nodeInfo, err := ca.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
  150. if err != nil {
  151. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
  152. }
  153. //Search into the monitored metrics for the specified node
  154. nodeMetrics := getCustomMetrics(nodeName)
  155. // ca.score favors nodes with 'better' monitored resources score.
  156. // It calculates the percentage of micro-architecture metrics like L3 cache misses and/or IPC and memory bandwidth, and
  157. // prioritizes based on the lowest score provided by a custom scoring function.
  158. //
  159. // Details:
  160. // TODO
  161. klog.Infof("Node: %s", nodeName)
  162. return ca.scoringFunction(pod, nodeMetrics, nodeInfo), nil
  163. //return ca.score(pod, nodeInfo)
  164. }
// ScoreExtensions of the Score plugin. It returns nil, so this plugin provides
// no NormalizeScore step.
// NOTE(review): without normalization, the framework presumably requires each
// value returned by Score to already lie within the framework's node-score
// range [0, framework.MaxNodeScore] — confirm for the framework version in use.
func (ca *CustomAllocated) ScoreExtensions() framework.ScoreExtensions {
	return nil
}
  169. // NewCustomAllocated initializes a new plugin and returns it.
  170. func NewCustomAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
  171. return &CustomAllocated{
  172. handle: h,
  173. resourceAllocationScorer: resourceAllocationScorer{
  174. CustomAllocatedName,
  175. customResourceScorer,
  176. defaultRequestedRatioResources,
  177. },
  178. }, nil
  179. }
  180. func customResourceScorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
  181. //There I should call a function which will get my custom metrics
  182. var nodeScore, weightSum int64
  183. for resource, weight := range defaultRequestedRatioResources {
  184. resourceScore := customRequestedScore(requested[resource], allocable[resource])
  185. nodeScore += resourceScore * weight
  186. weightSum += weight
  187. }
  188. return nodeScore / weightSum
  189. }
  190. // The unused capacity is calculated on a scale of 0-10
  191. // 0 being the lowest priority and 10 being the highest.
  192. // The more unused resources the higher the score is.
  193. func customRequestedScore(requested, capacity int64) int64 {
  194. if capacity == 0 {
  195. return 0
  196. }
  197. if requested > capacity {
  198. return 0
  199. }
  200. return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
  201. }