service_affinity.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package serviceaffinity
  14. import (
  15. "context"
  16. "fmt"
  17. v1 "k8s.io/api/core/v1"
  18. "k8s.io/apimachinery/pkg/labels"
  19. "k8s.io/apimachinery/pkg/runtime"
  20. corelisters "k8s.io/client-go/listers/core/v1"
  21. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  22. schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
  23. "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  24. )
const (
	// Name is the name of the plugin used in the plugin registry and configurations.
	// It is also the value returned by (*ServiceAffinity).Name.
	Name = "ServiceAffinity"
	// preFilterStateKey is the key in CycleState to ServiceAffinity pre-computed data.
	// Using the name of the plugin will likely help us avoid collisions with other plugins.
	preFilterStateKey = "PreFilter" + Name
	// ErrReason is the message attached to the Unschedulable status that Filter
	// returns when a node does not satisfy the service affinity labels.
	ErrReason = "node(s) didn't match service affinity"
)
// Args holds the args that are used to configure the plugin.
type Args struct {
	// AffinityLabels are the node label keys that Filter keeps homogeneous for pods of a service
	// (i.e. Filter passes IFF this pod can be added to a node such that all other pods in
	// the same service are running on nodes with the exact same values for these labels).
	// The field is serialized under the JSON key "labels".
	AffinityLabels []string `json:"labels,omitempty"`
	// AntiAffinityLabelsPreference are the labels to consider for service anti affinity scoring.
	AntiAffinityLabelsPreference []string `json:"antiAffinityLabelsPreference,omitempty"`
}
// preFilterState computed at PreFilter and used at Filter.
type preFilterState struct {
	// matchingPodList holds the pods from the incoming pod's namespace whose labels
	// match a selector built from the incoming pod's labels. AddPod and RemovePod
	// update it in place.
	matchingPodList []*v1.Pod
	// matchingPodServices holds the services returned by GetPodServices for the incoming pod.
	matchingPodServices []*v1.Service
}
  48. // Clone the prefilter state.
  49. func (s *preFilterState) Clone() framework.StateData {
  50. if s == nil {
  51. return nil
  52. }
  53. copy := preFilterState{}
  54. copy.matchingPodServices = append([]*v1.Service(nil),
  55. s.matchingPodServices...)
  56. copy.matchingPodList = append([]*v1.Pod(nil),
  57. s.matchingPodList...)
  58. return &copy
  59. }
  60. // New initializes a new plugin and returns it.
  61. func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
  62. args := Args{}
  63. if err := framework.DecodeInto(plArgs, &args); err != nil {
  64. return nil, err
  65. }
  66. informerFactory := handle.SharedInformerFactory()
  67. serviceLister := informerFactory.Core().V1().Services().Lister()
  68. return &ServiceAffinity{
  69. sharedLister: handle.SnapshotSharedLister(),
  70. serviceLister: serviceLister,
  71. args: args,
  72. }, nil
  73. }
// ServiceAffinity is a plugin that checks service affinity.
type ServiceAffinity struct {
	// args is the decoded plugin configuration (affinity and anti-affinity label keys).
	args Args
	// sharedLister is the snapshot lister used to list pods and to look up node infos.
	sharedLister schedulerlisters.SharedLister
	// serviceLister is used to find the services that match a pod.
	serviceLister corelisters.ServiceLister
}
// Compile-time assertions that *ServiceAffinity satisfies these framework plugin interfaces.
var _ framework.PreFilterPlugin = &ServiceAffinity{}
var _ framework.FilterPlugin = &ServiceAffinity{}
var _ framework.ScorePlugin = &ServiceAffinity{}
// Name returns name of the plugin. It is used in logs, etc.
// The value is the package-level Name constant.
func (pl *ServiceAffinity) Name() string {
	return Name
}
  87. func (pl *ServiceAffinity) createPreFilterState(pod *v1.Pod) (*preFilterState, error) {
  88. if pod == nil {
  89. return nil, fmt.Errorf("a pod is required to calculate service affinity preFilterState")
  90. }
  91. // Store services which match the pod.
  92. matchingPodServices, err := schedulerlisters.GetPodServices(pl.serviceLister, pod)
  93. if err != nil {
  94. return nil, fmt.Errorf("listing pod services: %v", err.Error())
  95. }
  96. selector := createSelectorFromLabels(pod.Labels)
  97. allMatches, err := pl.sharedLister.Pods().List(selector)
  98. if err != nil {
  99. return nil, fmt.Errorf("listing pods: %v", err.Error())
  100. }
  101. // consider only the pods that belong to the same namespace
  102. matchingPodList := filterPodsByNamespace(allMatches, pod.Namespace)
  103. return &preFilterState{
  104. matchingPodList: matchingPodList,
  105. matchingPodServices: matchingPodServices,
  106. }, nil
  107. }
  108. // PreFilter invoked at the prefilter extension point.
  109. func (pl *ServiceAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
  110. s, err := pl.createPreFilterState(pod)
  111. if err != nil {
  112. return framework.NewStatus(framework.Error, fmt.Sprintf("could not create preFilterState: %v", err))
  113. }
  114. cycleState.Write(preFilterStateKey, s)
  115. return nil
  116. }
// PreFilterExtensions returns prefilter extensions, pod add and remove.
// The plugin itself provides them through its AddPod and RemovePod methods.
func (pl *ServiceAffinity) PreFilterExtensions() framework.PreFilterExtensions {
	return pl
}
  121. // AddPod from pre-computed data in cycleState.
  122. func (pl *ServiceAffinity) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
  123. s, err := getPreFilterState(cycleState)
  124. if err != nil {
  125. return framework.NewStatus(framework.Error, err.Error())
  126. }
  127. // If addedPod is in the same namespace as the pod, update the list
  128. // of matching pods if applicable.
  129. if podToAdd.Namespace != podToSchedule.Namespace {
  130. return nil
  131. }
  132. selector := createSelectorFromLabels(podToSchedule.Labels)
  133. if selector.Matches(labels.Set(podToAdd.Labels)) {
  134. s.matchingPodList = append(s.matchingPodList, podToAdd)
  135. }
  136. return nil
  137. }
  138. // RemovePod from pre-computed data in cycleState.
  139. func (pl *ServiceAffinity) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
  140. s, err := getPreFilterState(cycleState)
  141. if err != nil {
  142. return framework.NewStatus(framework.Error, err.Error())
  143. }
  144. if len(s.matchingPodList) == 0 ||
  145. podToRemove.Namespace != s.matchingPodList[0].Namespace {
  146. return nil
  147. }
  148. for i, pod := range s.matchingPodList {
  149. if pod.Name == podToRemove.Name && pod.Namespace == podToRemove.Namespace {
  150. s.matchingPodList = append(s.matchingPodList[:i], s.matchingPodList[i+1:]...)
  151. break
  152. }
  153. }
  154. return nil
  155. }
  156. func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
  157. c, err := cycleState.Read(preFilterStateKey)
  158. if err != nil {
  159. // preFilterState doesn't exist, likely PreFilter wasn't invoked.
  160. return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
  161. }
  162. if c == nil {
  163. return nil, nil
  164. }
  165. s, ok := c.(*preFilterState)
  166. if !ok {
  167. return nil, fmt.Errorf("%+v convert to interpodaffinity.state error", c)
  168. }
  169. return s, nil
  170. }
  171. // Filter matches nodes in such a way to force that
  172. // ServiceAffinity.labels are homogeneous for pods that are scheduled to a node.
  173. // (i.e. it returns true IFF this pod can be added to this node such that all other pods in
  174. // the same service are running on nodes with the exact same ServiceAffinity.label values).
  175. //
  176. // For example:
  177. // If the first pod of a service was scheduled to a node with label "region=foo",
  178. // all the other subsequent pods belong to the same service will be schedule on
  179. // nodes with the same "region=foo" label.
  180. //
  181. // Details:
  182. //
  183. // If (the svc affinity labels are not a subset of pod's label selectors )
  184. // The pod has all information necessary to check affinity, the pod's label selector is sufficient to calculate
  185. // the match.
  186. // Otherwise:
  187. // Create an "implicit selector" which guarantees pods will land on nodes with similar values
  188. // for the affinity labels.
  189. //
  190. // To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace.
  191. // These backfilled labels in the selector "L" are defined like so:
  192. // - L is a label that the ServiceAffinity object needs as a matching constraint.
  193. // - L is not defined in the pod itself already.
  194. // - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value.
  195. func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
  196. if len(pl.args.AffinityLabels) == 0 {
  197. return nil
  198. }
  199. node := nodeInfo.Node()
  200. if node == nil {
  201. return framework.NewStatus(framework.Error, "node not found")
  202. }
  203. s, err := getPreFilterState(cycleState)
  204. if err != nil {
  205. return framework.NewStatus(framework.Error, err.Error())
  206. }
  207. pods, services := s.matchingPodList, s.matchingPodServices
  208. filteredPods := nodeInfo.FilterOutPods(pods)
  209. // check if the pod being scheduled has the affinity labels specified in its NodeSelector
  210. affinityLabels := findLabelsInSet(pl.args.AffinityLabels, labels.Set(pod.Spec.NodeSelector))
  211. // Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
  212. if len(pl.args.AffinityLabels) > len(affinityLabels) {
  213. if len(services) > 0 {
  214. if len(filteredPods) > 0 {
  215. nodeWithAffinityLabels, err := pl.sharedLister.NodeInfos().Get(filteredPods[0].Spec.NodeName)
  216. if err != nil {
  217. return framework.NewStatus(framework.Error, "node not found")
  218. }
  219. addUnsetLabelsToMap(affinityLabels, pl.args.AffinityLabels, labels.Set(nodeWithAffinityLabels.Node().Labels))
  220. }
  221. }
  222. }
  223. // Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find.
  224. if createSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
  225. return nil
  226. }
  227. return framework.NewStatus(framework.Unschedulable, ErrReason)
  228. }
  229. // Score invoked at the Score extension point.
  230. func (pl *ServiceAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
  231. nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
  232. if err != nil {
  233. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
  234. }
  235. node := nodeInfo.Node()
  236. if node == nil {
  237. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node not found"))
  238. }
  239. // Pods matched namespace,selector on current node.
  240. var selector labels.Selector
  241. if services, err := schedulerlisters.GetPodServices(pl.serviceLister, pod); err == nil && len(services) > 0 {
  242. selector = labels.SelectorFromSet(services[0].Spec.Selector)
  243. } else {
  244. selector = labels.NewSelector()
  245. }
  246. if len(nodeInfo.Pods()) == 0 || selector.Empty() {
  247. return 0, nil
  248. }
  249. var score int64
  250. for _, existingPod := range nodeInfo.Pods() {
  251. // Ignore pods being deleted for spreading purposes
  252. // Similar to how it is done for SelectorSpreadPriority
  253. if pod.Namespace == existingPod.Namespace && existingPod.DeletionTimestamp == nil {
  254. if selector.Matches(labels.Set(existingPod.Labels)) {
  255. score++
  256. }
  257. }
  258. }
  259. return score, nil
  260. }
  261. // NormalizeScore invoked after scoring all nodes.
  262. func (pl *ServiceAffinity) NormalizeScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
  263. reduceResult := make([]float64, len(scores))
  264. for _, label := range pl.args.AntiAffinityLabelsPreference {
  265. if err := pl.updateNodeScoresForLabel(pl.sharedLister, scores, reduceResult, label); err != nil {
  266. return framework.NewStatus(framework.Error, err.Error())
  267. }
  268. }
  269. // Update the result after all labels have been evaluated.
  270. for i, nodeScore := range reduceResult {
  271. scores[i].Score = int64(nodeScore)
  272. }
  273. return nil
  274. }
  275. // updateNodeScoresForLabel updates the node scores for a single label. Note it does not update the
  276. // original result from the map phase directly, but instead updates the reduceResult, which is used
  277. // to update the original result finally. This makes sure that each call to updateNodeScoresForLabel
  278. // receives the same mapResult to work with.
  279. // Why are doing this? This is a workaround for the migration from priorities to score plugins.
  280. // Historically the priority is designed to handle only one label, and multiple priorities are configured
  281. // to work with multiple labels. Using multiple plugins is not allowed in the new framework. Therefore
  282. // we need to modify the old priority to be able to handle multiple labels so that it can be mapped
  283. // to a single plugin.
  284. // TODO: This will be deprecated soon.
  285. func (pl *ServiceAffinity) updateNodeScoresForLabel(sharedLister schedulerlisters.SharedLister, mapResult framework.NodeScoreList, reduceResult []float64, label string) error {
  286. var numServicePods int64
  287. var labelValue string
  288. podCounts := map[string]int64{}
  289. labelNodesStatus := map[string]string{}
  290. maxPriorityFloat64 := float64(framework.MaxNodeScore)
  291. for _, nodePriority := range mapResult {
  292. numServicePods += nodePriority.Score
  293. nodeInfo, err := sharedLister.NodeInfos().Get(nodePriority.Name)
  294. if err != nil {
  295. return err
  296. }
  297. if !labels.Set(nodeInfo.Node().Labels).Has(label) {
  298. continue
  299. }
  300. labelValue = labels.Set(nodeInfo.Node().Labels).Get(label)
  301. labelNodesStatus[nodePriority.Name] = labelValue
  302. podCounts[labelValue] += nodePriority.Score
  303. }
  304. //score int - scale of 0-maxPriority
  305. // 0 being the lowest priority and maxPriority being the highest
  306. for i, nodePriority := range mapResult {
  307. labelValue, ok := labelNodesStatus[nodePriority.Name]
  308. if !ok {
  309. continue
  310. }
  311. // initializing to the default/max node score of maxPriority
  312. fScore := maxPriorityFloat64
  313. if numServicePods > 0 {
  314. fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[labelValue]) / float64(numServicePods))
  315. }
  316. // The score of current label only accounts for 1/len(s.labels) of the total score.
  317. // The policy API definition only allows a single label to be configured, associated with a weight.
  318. // This is compensated by the fact that the total weight is the sum of all weights configured
  319. // in each policy config.
  320. reduceResult[i] += fScore / float64(len(pl.args.AntiAffinityLabelsPreference))
  321. }
  322. return nil
  323. }
// ScoreExtensions of the Score plugin.
// The plugin itself is returned; it provides NormalizeScore.
func (pl *ServiceAffinity) ScoreExtensions() framework.ScoreExtensions {
	return pl
}
  328. // addUnsetLabelsToMap backfills missing values with values we find in a map.
  329. func addUnsetLabelsToMap(aL map[string]string, labelsToAdd []string, labelSet labels.Set) {
  330. for _, l := range labelsToAdd {
  331. // if the label is already there, dont overwrite it.
  332. if _, exists := aL[l]; exists {
  333. continue
  334. }
  335. // otherwise, backfill this label.
  336. if labelSet.Has(l) {
  337. aL[l] = labelSet.Get(l)
  338. }
  339. }
  340. }
  341. // createSelectorFromLabels is used to define a selector that corresponds to the keys in a map.
  342. func createSelectorFromLabels(aL map[string]string) labels.Selector {
  343. if len(aL) == 0 {
  344. return labels.Everything()
  345. }
  346. return labels.Set(aL).AsSelector()
  347. }
  348. // filterPodsByNamespace filters pods outside a namespace from the given list.
  349. func filterPodsByNamespace(pods []*v1.Pod, ns string) []*v1.Pod {
  350. filtered := []*v1.Pod{}
  351. for _, nsPod := range pods {
  352. if nsPod.Namespace == ns {
  353. filtered = append(filtered, nsPod)
  354. }
  355. }
  356. return filtered
  357. }
  358. // findLabelsInSet gets as many key/value pairs as possible out of a label set.
  359. func findLabelsInSet(labelsToKeep []string, selector labels.Set) map[string]string {
  360. aL := make(map[string]string)
  361. for _, l := range labelsToKeep {
  362. if selector.Has(l) {
  363. aL[l] = selector.Get(l)
  364. }
  365. }
  366. return aL
  367. }