selector_spreading.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package priorities

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/kubernetes/pkg/scheduler/algorithm"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	utilnode "k8s.io/kubernetes/pkg/util/node"

	"k8s.io/klog"
)

// When zone information is present, give 2/3 of the weighting to zone spreading, 1/3 to node spreading
// TODO: Any way to justify this weighting?
const zoneWeighting float64 = 2.0 / 3.0
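
// Illustrative arithmetic (not part of the original source): with this weighting, a
// node-level score of 10 and a zone-level score of 4 blend to 10*(1-2/3) + 4*(2/3) = 6,
// which is the combination applied in CalculateSpreadPriorityReduce below.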

// SelectorSpread contains information to calculate selector spread priority.
type SelectorSpread struct {
	serviceLister     algorithm.ServiceLister
	controllerLister  algorithm.ControllerLister
	replicaSetLister  algorithm.ReplicaSetLister
	statefulSetLister algorithm.StatefulSetLister
}

// NewSelectorSpreadPriority creates a SelectorSpread.
func NewSelectorSpreadPriority(
	serviceLister algorithm.ServiceLister,
	controllerLister algorithm.ControllerLister,
	replicaSetLister algorithm.ReplicaSetLister,
	statefulSetLister algorithm.StatefulSetLister) (PriorityMapFunction, PriorityReduceFunction) {
	selectorSpread := &SelectorSpread{
		serviceLister:     serviceLister,
		controllerLister:  controllerLister,
		replicaSetLister:  replicaSetLister,
		statefulSetLister: statefulSetLister,
	}
	return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce
}

// CalculateSpreadPriorityMap spreads pods across hosts, considering pods
// belonging to the same service, RC, RS or StatefulSet.
// When a pod is scheduled, it looks for services, RCs, RSs and StatefulSets that match the pod,
// then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods,
// i.e. it pushes the scheduler towards a node where there's the smallest number of
// pods which match the same service, RC, RS or StatefulSet selectors as the pod being scheduled.
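// Illustrative example (not part of the original source): if two pods on a node already
// match the incoming pod's Service selector, this map function scores that node 2; the
// reduce function below inverts and normalizes the raw counts, so nodes (and zones) with
// fewer matching pods end up with the higher final score.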
func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
	var selectors []labels.Selector
	node := nodeInfo.Node()
	if node == nil {
		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
	}

	priorityMeta, ok := meta.(*priorityMetadata)
	if ok {
		selectors = priorityMeta.podSelectors
	} else {
		selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
	}

	if len(selectors) == 0 {
		return schedulerapi.HostPriority{
			Host:  node.Name,
			Score: int(0),
		}, nil
	}

	count := countMatchingPods(pod.Namespace, selectors, nodeInfo)

	return schedulerapi.HostPriority{
		Host:  node.Name,
		Score: count,
	}, nil
}

// CalculateSpreadPriorityReduce calculates the score of each node
// based on the number of existing matching pods on the node.
// Where zone information is included on the nodes, it favors nodes
// in zones with fewer existing matching pods.
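// Illustrative arithmetic (not part of the original source): with schedulerapi.MaxPriority
// of 10 and a maximum per-node match count of 5, a node carrying 2 matching pods gets a
// node-level score of 10*(5-2)/5 = 6 before the zone blend described above is applied.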
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result schedulerapi.HostPriorityList) error {
	countsByZone := make(map[string]int, 10)
	maxCountByZone := int(0)
	maxCountByNodeName := int(0)

	for i := range result {
		if result[i].Score > maxCountByNodeName {
			maxCountByNodeName = result[i].Score
		}
		zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
		if zoneID == "" {
			continue
		}
		countsByZone[zoneID] += result[i].Score
	}

	for zoneID := range countsByZone {
		if countsByZone[zoneID] > maxCountByZone {
			maxCountByZone = countsByZone[zoneID]
		}
	}

	haveZones := len(countsByZone) != 0

	maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
	maxCountByZoneFloat64 := float64(maxCountByZone)
	MaxPriorityFloat64 := float64(schedulerapi.MaxPriority)

	for i := range result {
		// initializing to the default/max node score of maxPriority
		fScore := MaxPriorityFloat64
		if maxCountByNodeName > 0 {
			fScore = MaxPriorityFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
		}
		// If there is zone information present, incorporate it
		if haveZones {
			zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
			if zoneID != "" {
				zoneScore := MaxPriorityFloat64
				if maxCountByZone > 0 {
					zoneScore = MaxPriorityFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
				}
				fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
			}
		}
		result[i].Score = int(fScore)
		if klog.V(10) {
			klog.Infof(
				"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore),
			)
		}
	}
	return nil
}

// ServiceAntiAffinity contains information to calculate service anti-affinity priority.
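// The label field names a node label to spread over (a zone or failure-domain label is a
// typical, illustrative choice; this file does not fix it): the reduce function below gives
// the same score to every node sharing a label value, favoring values that host fewer pods
// of the pod's first matching service.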
type ServiceAntiAffinity struct {
	podLister     algorithm.PodLister
	serviceLister algorithm.ServiceLister
	label         string
}

// NewServiceAntiAffinityPriority creates a ServiceAntiAffinity.
func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, label string) (PriorityMapFunction, PriorityReduceFunction) {
	antiAffinity := &ServiceAntiAffinity{
		podLister:     podLister,
		serviceLister: serviceLister,
		label:         label,
	}
	return antiAffinity.CalculateAntiAffinityPriorityMap, antiAffinity.CalculateAntiAffinityPriorityReduce
}

// getNodeClassificationByLabels classifies nodes into ones that have the label and ones that do not.
func (s *ServiceAntiAffinity) getNodeClassificationByLabels(nodes []*v1.Node) (map[string]string, []string) {
	labeledNodes := map[string]string{}
	nonLabeledNodes := []string{}
	for _, node := range nodes {
		if labels.Set(node.Labels).Has(s.label) {
			label := labels.Set(node.Labels).Get(s.label)
			labeledNodes[node.Name] = label
		} else {
			nonLabeledNodes = append(nonLabeledNodes, node.Name)
		}
	}
	return labeledNodes, nonLabeledNodes
}

// countMatchingPods counts pods based on namespace and matching all selectors
func countMatchingPods(namespace string, selectors []labels.Selector, nodeInfo *schedulernodeinfo.NodeInfo) int {
	if nodeInfo.Pods() == nil || len(nodeInfo.Pods()) == 0 || len(selectors) == 0 {
		return 0
	}
	count := 0
	for _, pod := range nodeInfo.Pods() {
		// Ignore pods being deleted for spreading purposes
		// Similar to how it is done for SelectorSpreadPriority
		if namespace == pod.Namespace && pod.DeletionTimestamp == nil {
			matches := true
			for _, selector := range selectors {
				if !selector.Matches(labels.Set(pod.Labels)) {
					matches = false
					break
				}
			}
			if matches {
				count++
			}
		}
	}
	return count
}

// CalculateAntiAffinityPriorityMap spreads pods by minimizing the number of pods belonging to the same service
// on a given machine.
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
	var firstServiceSelector labels.Selector

	node := nodeInfo.Node()
	if node == nil {
		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
	}
	priorityMeta, ok := meta.(*priorityMetadata)
	if ok {
		firstServiceSelector = priorityMeta.podFirstServiceSelector
	} else {
		firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
	}
	// pods matching the namespace and selector on the current node
	var selectors []labels.Selector
	if firstServiceSelector != nil {
		selectors = append(selectors, firstServiceSelector)
	}
	score := countMatchingPods(pod.Namespace, selectors, nodeInfo)

	return schedulerapi.HostPriority{
		Host:  node.Name,
		Score: score,
	}, nil
}

// CalculateAntiAffinityPriorityReduce computes each node's score, giving the same score
// to all nodes that share a particular value of the label.
// The label to be considered is provided to the struct (ServiceAntiAffinity).
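// Illustrative arithmetic (not part of the original source): with 10 matching service pods
// in total, every node whose label value already hosts 4 of them scores 10*(10-4)/10 = 6,
// while nodes missing the label score 0.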
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result schedulerapi.HostPriorityList) error {
	var numServicePods int
	var label string
	podCounts := map[string]int{}
	labelNodesStatus := map[string]string{}
	maxPriorityFloat64 := float64(schedulerapi.MaxPriority)

	for _, hostPriority := range result {
		numServicePods += hostPriority.Score
		if !labels.Set(nodeNameToInfo[hostPriority.Host].Node().Labels).Has(s.label) {
			continue
		}
		label = labels.Set(nodeNameToInfo[hostPriority.Host].Node().Labels).Get(s.label)
		labelNodesStatus[hostPriority.Host] = label
		podCounts[label] += hostPriority.Score
	}

	// score int - scale of 0-maxPriority
	// 0 being the lowest priority and maxPriority being the highest
	for i, hostPriority := range result {
		label, ok := labelNodesStatus[hostPriority.Host]
		if !ok {
			result[i].Host = hostPriority.Host
			result[i].Score = int(0)
			continue
		}
		// initializing to the default/max node score of maxPriority
		fScore := maxPriorityFloat64
		if numServicePods > 0 {
			fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[label]) / float64(numServicePods))
		}
		result[i].Host = hostPriority.Host
		result[i].Score = int(fScore)
	}

	return nil
}