custom_resource_allocation.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. /*
  2. Copyright 2020 Achilleas Tzenetopoulos
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
  14. Extension of the scheduler developed during my diploma thesis
  15. Make it pluggable with Kubernetes logic in scheduler extension
  16. Date started: 30/1/2020
  17. Using the scheduling framework and plugins logic
  18. */
  19. package noderesources
  20. import (
  21. "context"
  22. "database/sql"
  23. "fmt"
  24. _ "github.com/go-sql-driver/mysql"
  25. v1 "k8s.io/api/core/v1"
  26. "k8s.io/apimachinery/pkg/runtime"
  27. "k8s.io/klog"
  28. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  29. "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  30. )
// CustomAllocated is a score plugin that favors nodes according to a custom
// scoring function which takes custom resources into account.
type CustomAllocated struct {
	// handle is the framework handle supplied to NewCustomAllocated; Score uses
	// it to reach the shared snapshot lister.
	handle framework.FrameworkHandle
	// resourceAllocationScorer is embedded and initialized by NewCustomAllocated.
	resourceAllocationScorer
	//customResourceAllocationScorer
}
  37. type nodeMetrics struct {
  38. L3cacheMisses int64
  39. memoryReads int64
  40. memoryWrites int64
  41. averageIPC float32
  42. }
  43. type System struct {
  44. ID int `json:"id"`
  45. Uuid string `json:"uuid"`
  46. numSockets int `json:"num_sockets"`
  47. numCores int `json:"num_cores`
  48. }
// wrongValueError is an empty type that nothing in the visible code references.
// NOTE(review): presumably intended as the error for invalid metric values —
// confirm it is still needed.
type wrongValueError struct{}

// Compile-time assertion that *CustomAllocated satisfies framework.ScorePlugin.
var _ = framework.ScorePlugin(&CustomAllocated{})
// nodes maps a node name to the UUID used to look that node up in the
// `systems` database table.
// NOTE(review): the "2" values look like placeholders rather than real
// UUIDs — confirm before relying on lookups for those nodes.
//var nodes = make(map[string]string)
var nodes = map[string]string{
	"kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
	"kube-02": "2",
	"kube-03": "2",
	"kube-04": "2",
}
// CustomAllocatedName is the name of the plugin used in the plugin registry
// and configurations. It is also the value returned by (*CustomAllocated).Name.
const CustomAllocatedName = "NodeResourcesCustomAllocated"
// Name returns the name of the plugin, CustomAllocatedName. It is used in
// logs, etc.
func (ca *CustomAllocated) Name() string {
	return CustomAllocatedName
}
  65. func getCustomMetrics(name string) *nodeMetrics {
  66. //Open the database connection
  67. db, err := sql.Open("mysql", "client:password@tcp(147.102.37.161:3306)/evolve")
  68. if err != nil {
  69. panic(err.Error())
  70. }
  71. defer db.Close()
  72. curr_uuid, ok := nodes[name]
  73. nd := nodeMetrics{}
  74. if ok {
  75. results, err := db.Query("SELECT id, num_sockets, num_cores FROM systems WHERE uuid = ?", curr_uuid)
  76. if err != nil {
  77. panic(err.Error()) // proper error handling instead of panic in your app
  78. }
  79. sys := System{}
  80. sys.Uuid = curr_uuid
  81. for results.Next() {
  82. // for each row, scan the result into our tag composite object
  83. err = results.Scan(&sys.ID, &sys.numSockets, &sys.numCores)
  84. if err != nil {
  85. panic(err.Error()) // proper error handling instead of panic in your app
  86. }
  87. // and then print out the tag's Name attribute
  88. klog.Infof("This is the system with name: %s, id: %d and number of cores: %d", name, sys.ID, sys.numCores)
  89. }
  90. }
  91. //nd := nodeMetrics{}
  92. if name == "kube-01" {
  93. nd.L3cacheMisses = 1
  94. nd.memoryReads = 1
  95. nd.memoryWrites = 1
  96. nd.averageIPC = 2.0
  97. } else {
  98. //some sample values for testing purposes
  99. nd.L3cacheMisses = 200
  100. nd.memoryReads = 50
  101. nd.memoryWrites = 60
  102. nd.averageIPC = 1.2
  103. }
  104. //query metrics for this node provided in the input
  105. if nd.L3cacheMisses < 0 || nd.memoryReads < 0 || nd.memoryWrites < 0 || nd.averageIPC < 0 {
  106. //err := errors.New("Failed to get the values")
  107. return nil
  108. }
  109. return &nd
  110. }
  111. func (ca *CustomAllocated) scoringFunction(pod *v1.Pod, node *nodeMetrics, nodeInfo *nodeinfo.NodeInfo) int64 {
  112. //rawScore := float32(node.memoryReads+node.memoryWrites) / node.averageIPC
  113. intScore := int64(float32(node.memoryReads+node.memoryWrites) / node.averageIPC)
  114. //Sample implementation in order to check functionality
  115. klog.Infof("Score: %d", intScore)
  116. return intScore
  117. }
  118. // Score invoked at the score extension point.
  119. func (ca *CustomAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
  120. nodeInfo, err := ca.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
  121. if err != nil {
  122. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
  123. }
  124. //Search into the monitored metrics for the specified node
  125. nodeMetrics := getCustomMetrics(nodeName)
  126. // ca.score favors nodes with 'better' monitored resources score.
  127. // It calculates the percentage of micro-architecture metrics like L3 cache misses and/or IPC and memory bandwidth, and
  128. // prioritizes based on the lowest score provided by a custom scoring function.
  129. //
  130. // Details:
  131. // TODO
  132. klog.Infof("Node: %s", nodeName)
  133. return ca.scoringFunction(pod, nodeMetrics, nodeInfo), nil
  134. //return ca.score(pod, nodeInfo)
  135. }
// ScoreExtensions of the Score plugin. It returns nil: this plugin supplies
// no score extensions.
func (ca *CustomAllocated) ScoreExtensions() framework.ScoreExtensions {
	return nil
}
  140. // NewCustomAllocated initializes a new plugin and returns it.
  141. func NewCustomAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
  142. return &CustomAllocated{
  143. handle: h,
  144. resourceAllocationScorer: resourceAllocationScorer{
  145. CustomAllocatedName,
  146. customResourceScorer,
  147. defaultRequestedRatioResources,
  148. },
  149. }, nil
  150. }
  151. func customResourceScorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
  152. //There I should call a function which will get my custom metrics
  153. var nodeScore, weightSum int64
  154. for resource, weight := range defaultRequestedRatioResources {
  155. resourceScore := customRequestedScore(requested[resource], allocable[resource])
  156. nodeScore += resourceScore * weight
  157. weightSum += weight
  158. }
  159. return nodeScore / weightSum
  160. }
  161. // The unused capacity is calculated on a scale of 0-10
  162. // 0 being the lowest priority and 10 being the highest.
  163. // The more unused resources the higher the score is.
  164. func customRequestedScore(requested, capacity int64) int64 {
  165. if capacity == 0 {
  166. return 0
  167. }
  168. if requested > capacity {
  169. return 0
  170. }
  171. return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
  172. }