service_affinity.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package serviceaffinity

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/klog"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

const (
	// Name is the name of the plugin used in the plugin registry and configurations.
	Name = "ServiceAffinity"

	// preFilterStateKey is the key in CycleState to ServiceAffinity pre-computed data.
	// Using the name of the plugin will likely help us avoid collisions with other plugins.
	preFilterStateKey = "PreFilter" + Name

	// ErrReason is used for CheckServiceAffinity predicate error.
	ErrReason = "node(s) didn't match service affinity"
)

// Args holds the args that are used to configure the plugin.
type Args struct {
	// AffinityLabels are homogeneous for pods that are scheduled to a node.
	// (i.e. it returns true IFF this pod can be added to this node such that all other pods in
	// the same service are running on nodes with the exact same values for Labels).
	AffinityLabels []string `json:"labels,omitempty"`
	// AntiAffinityLabelsPreference are the labels to consider for service anti affinity scoring.
	AntiAffinityLabelsPreference []string `json:"antiAffinityLabelsPreference,omitempty"`
}

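// Illustrative configuration (added for clarity; the field names follow the JSON tags above,
// the example values are made up): a scheduler pluginConfig entry could carry args such as
//
//	{"labels": ["region"], "antiAffinityLabelsPreference": ["zone"]}
//
// which New below decodes into Args via framework.DecodeInto.
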
// preFilterState computed at PreFilter and used at Filter.
type preFilterState struct {
	matchingPodList     []*v1.Pod
	matchingPodServices []*v1.Service
}

// Clone the prefilter state.
func (s *preFilterState) Clone() framework.StateData {
	if s == nil {
		return nil
	}

	copy := preFilterState{}
	copy.matchingPodServices = append([]*v1.Service(nil), s.matchingPodServices...)
	copy.matchingPodList = append([]*v1.Pod(nil), s.matchingPodList...)

	return &copy
}

// New initializes a new plugin and returns it.
func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
	args := Args{}
	if err := framework.DecodeInto(plArgs, &args); err != nil {
		return nil, err
	}
	informerFactory := handle.SharedInformerFactory()
	serviceLister := informerFactory.Core().V1().Services().Lister()

	return &ServiceAffinity{
		sharedLister:  handle.SnapshotSharedLister(),
		serviceLister: serviceLister,
		args:          args,
	}, nil
}

// ServiceAffinity is a plugin that checks service affinity.
type ServiceAffinity struct {
	args          Args
	sharedLister  schedulerlisters.SharedLister
	serviceLister corelisters.ServiceLister
}

var _ framework.PreFilterPlugin = &ServiceAffinity{}
var _ framework.FilterPlugin = &ServiceAffinity{}
var _ framework.ScorePlugin = &ServiceAffinity{}

// Name returns name of the plugin. It is used in logs, etc.
func (pl *ServiceAffinity) Name() string {
	return Name
}

func (pl *ServiceAffinity) createPreFilterState(pod *v1.Pod) (*preFilterState, error) {
	if pod == nil {
		return nil, fmt.Errorf("a pod is required to calculate service affinity preFilterState")
	}
	// Store services which match the pod.
	matchingPodServices, err := schedulerlisters.GetPodServices(pl.serviceLister, pod)
	if err != nil {
		return nil, fmt.Errorf("listing pod services: %v", err.Error())
	}
	selector := createSelectorFromLabels(pod.Labels)
	allMatches, err := pl.sharedLister.Pods().List(selector)
	if err != nil {
		return nil, fmt.Errorf("listing pods: %v", err.Error())
	}

	// consider only the pods that belong to the same namespace
	matchingPodList := filterPodsByNamespace(allMatches, pod.Namespace)

	return &preFilterState{
		matchingPodList:     matchingPodList,
		matchingPodServices: matchingPodServices,
	}, nil
}

// PreFilter invoked at the prefilter extension point.
func (pl *ServiceAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
	s, err := pl.createPreFilterState(pod)
	if err != nil {
		return framework.NewStatus(framework.Error, fmt.Sprintf("could not create preFilterState: %v", err))
	}
	cycleState.Write(preFilterStateKey, s)
	return nil
}

// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *ServiceAffinity) PreFilterExtensions() framework.PreFilterExtensions {
	return pl
}

// AddPod from pre-computed data in cycleState.
func (pl *ServiceAffinity) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
	s, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}

	// If podToAdd is in the same namespace as the pod being scheduled, update the list
	// of matching pods if applicable.
	if s == nil || podToAdd.Namespace != podToSchedule.Namespace {
		return nil
	}

	selector := createSelectorFromLabels(podToSchedule.Labels)
	if selector.Matches(labels.Set(podToAdd.Labels)) {
		s.matchingPodList = append(s.matchingPodList, podToAdd)
	}

	return nil
}

// RemovePod from pre-computed data in cycleState.
func (pl *ServiceAffinity) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
	s, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}

	if s == nil ||
		len(s.matchingPodList) == 0 ||
		podToRemove.Namespace != s.matchingPodList[0].Namespace {
		return nil
	}

	for i, pod := range s.matchingPodList {
		if pod.Name == podToRemove.Name && pod.Namespace == podToRemove.Namespace {
			s.matchingPodList = append(s.matchingPodList[:i], s.matchingPodList[i+1:]...)
			break
		}
	}

	return nil
}

func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
	c, err := cycleState.Read(preFilterStateKey)
	if err != nil {
		// The state wasn't pre-computed in PreFilter. We ignore the error for now since
		// Filter is able to handle that by computing it again.
		klog.V(5).Infof("reading %q from cycleState: %v", preFilterStateKey, err)
		return nil, nil
	}

	if c == nil {
		return nil, nil
	}

	s, ok := c.(*preFilterState)
	if !ok {
		return nil, fmt.Errorf("%+v convert to serviceaffinity.preFilterState error", c)
	}
	return s, nil
}

// Filter matches nodes in such a way as to force that
// ServiceAffinity.labels are homogeneous for pods that are scheduled to a node.
// (i.e. it returns true IFF this pod can be added to this node such that all other pods in
// the same service are running on nodes with the exact same ServiceAffinity.label values).
//
// For example:
// If the first pod of a service was scheduled to a node with label "region=foo",
// all subsequent pods belonging to the same service will be scheduled onto
// nodes with the same "region=foo" label.
//
// Details:
//
// If the service affinity labels are all present in the pod's node selector:
//	The pod has all the information necessary to check affinity; its node selector is sufficient
//	to calculate the match.
// Otherwise:
//	Create an "implicit selector" which guarantees pods will land on nodes with similar values
//	for the affinity labels.
//
//	To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace.
//	These backfilled labels in the selector "L" are defined like so:
//		- L is a label that the ServiceAffinity object needs as a matching constraint.
//		- L is not defined in the pod itself already.
//		- and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value.
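//
// Illustrative trace (example values are assumptions, not from the original source):
// with AffinityLabels = ["region"] and a pod whose NodeSelector does not set "region",
// Step 1 below looks at an already-scheduled pod of the same service and namespace, reads
// its node's "region=foo" label, and backfills affinityLabels["region"] = "foo";
// Step 2 then admits only nodes whose labels match that implicit selector.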
func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
	if len(pl.args.AffinityLabels) == 0 {
		return nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return framework.NewStatus(framework.Error, "node not found")
	}

	s, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}

	if s == nil {
		// Make the filter resilient in case preFilterState is missing.
		s, err = pl.createPreFilterState(pod)
		if err != nil {
			return framework.NewStatus(framework.Error, fmt.Sprintf("could not create preFilterState: %v", err))
		}
	}

	pods, services := s.matchingPodList, s.matchingPodServices
	filteredPods := nodeInfo.FilterOutPods(pods)
	// check if the pod being scheduled has the affinity labels specified in its NodeSelector
	affinityLabels := findLabelsInSet(pl.args.AffinityLabels, labels.Set(pod.Spec.NodeSelector))
	// Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
	if len(pl.args.AffinityLabels) > len(affinityLabels) {
		if len(services) > 0 {
			if len(filteredPods) > 0 {
				nodeWithAffinityLabels, err := pl.sharedLister.NodeInfos().Get(filteredPods[0].Spec.NodeName)
				if err != nil {
					return framework.NewStatus(framework.Error, "node not found")
				}
				addUnsetLabelsToMap(affinityLabels, pl.args.AffinityLabels, labels.Set(nodeWithAffinityLabels.Node().Labels))
			}
		}
	}

	// Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find.
	if createSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
		return nil
	}

	return framework.NewStatus(framework.Unschedulable, ErrReason)
}

// Score invoked at the Score extension point.
func (pl *ServiceAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
	}

	node := nodeInfo.Node()
	if node == nil {
		return 0, framework.NewStatus(framework.Error, "node not found")
	}

	// Count the pods on this node that are in the same namespace as the pod being scheduled
	// and match the selector of the first service the pod belongs to.
	var selector labels.Selector
	if services, err := schedulerlisters.GetPodServices(pl.serviceLister, pod); err == nil && len(services) > 0 {
		selector = labels.SelectorFromSet(services[0].Spec.Selector)
	} else {
		selector = labels.NewSelector()
	}

	if len(nodeInfo.Pods()) == 0 || selector.Empty() {
		return 0, nil
	}

	var score int64
	for _, existingPod := range nodeInfo.Pods() {
		// Ignore pods being deleted for spreading purposes,
		// similar to how it is done for SelectorSpreadPriority.
		if pod.Namespace == existingPod.Namespace && existingPod.DeletionTimestamp == nil {
			if selector.Matches(labels.Set(existingPod.Labels)) {
				score++
			}
		}
	}

	return score, nil
}

// NormalizeScore invoked after scoring all nodes.
func (pl *ServiceAffinity) NormalizeScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
	reduceResult := make([]float64, len(scores))
	for _, label := range pl.args.AntiAffinityLabelsPreference {
		if err := pl.updateNodeScoresForLabel(pl.sharedLister, scores, reduceResult, label); err != nil {
			return framework.NewStatus(framework.Error, err.Error())
		}
	}

	// Update the result after all labels have been evaluated.
	for i, nodeScore := range reduceResult {
		scores[i].Score = int64(nodeScore)
	}

	return nil
}

// updateNodeScoresForLabel updates the node scores for a single label. Note that it does not update the
// original result from the map phase directly, but instead updates the reduceResult, which is used
// to update the original result at the end. This makes sure that each call to updateNodeScoresForLabel
// receives the same mapResult to work with.
// Why are we doing this? It is a workaround for the migration from priorities to score plugins.
// Historically the priority was designed to handle only one label, and multiple priorities were configured
// to work with multiple labels. Using multiple plugins is not allowed in the new framework. Therefore
// we need to modify the old priority to be able to handle multiple labels so that it can be mapped
// to a single plugin.
// TODO: This will be deprecated soon.
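//
// Worked example (illustrative; the numbers are assumptions and framework.MaxNodeScore is taken to be 100):
// with AntiAffinityLabelsPreference = ["zone"] and Score having counted 3 matching service pods on
// nodes labeled zone=a and 1 on nodes labeled zone=b (numServicePods = 4), nodes in zone=a receive
// 100*(4-3)/4 = 25 while nodes in zone=b receive 100*(4-1)/4 = 75, steering new pods of the service
// away from the crowded zone.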
func (pl *ServiceAffinity) updateNodeScoresForLabel(sharedLister schedulerlisters.SharedLister, mapResult framework.NodeScoreList, reduceResult []float64, label string) error {
	var numServicePods int64
	var labelValue string
	podCounts := map[string]int64{}
	labelNodesStatus := map[string]string{}
	maxPriorityFloat64 := float64(framework.MaxNodeScore)

	for _, nodePriority := range mapResult {
		numServicePods += nodePriority.Score
		nodeInfo, err := sharedLister.NodeInfos().Get(nodePriority.Name)
		if err != nil {
			return err
		}
		if !labels.Set(nodeInfo.Node().Labels).Has(label) {
			continue
		}

		labelValue = labels.Set(nodeInfo.Node().Labels).Get(label)
		labelNodesStatus[nodePriority.Name] = labelValue
		podCounts[labelValue] += nodePriority.Score
	}

	// score int - scale of 0-maxPriority
	// 0 being the lowest priority and maxPriority being the highest
	for i, nodePriority := range mapResult {
		labelValue, ok := labelNodesStatus[nodePriority.Name]
		if !ok {
			continue
		}
		// initializing to the default/max node score of maxPriority
		fScore := maxPriorityFloat64
		if numServicePods > 0 {
			fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[labelValue]) / float64(numServicePods))
		}
		// The score of the current label only accounts for 1/len(s.labels) of the total score.
		// The policy API definition only allows a single label to be configured, associated with a weight.
		// This is compensated by the fact that the total weight is the sum of all weights configured
		// in each policy config.
		reduceResult[i] += fScore / float64(len(pl.args.AntiAffinityLabelsPreference))
	}

	return nil
}

// ScoreExtensions of the Score plugin.
func (pl *ServiceAffinity) ScoreExtensions() framework.ScoreExtensions {
	return pl
}

// addUnsetLabelsToMap backfills missing values with values we find in a map.
func addUnsetLabelsToMap(aL map[string]string, labelsToAdd []string, labelSet labels.Set) {
	for _, l := range labelsToAdd {
		// if the label is already there, don't overwrite it.
		if _, exists := aL[l]; exists {
			continue
		}
		// otherwise, backfill this label.
		if labelSet.Has(l) {
			aL[l] = labelSet.Get(l)
		}
	}
}

// createSelectorFromLabels is used to define a selector that corresponds to the keys in a map.
func createSelectorFromLabels(aL map[string]string) labels.Selector {
	if len(aL) == 0 {
		return labels.Everything()
	}
	return labels.Set(aL).AsSelector()
}

// filterPodsByNamespace filters pods outside a namespace from the given list.
func filterPodsByNamespace(pods []*v1.Pod, ns string) []*v1.Pod {
	filtered := []*v1.Pod{}
	for _, nsPod := range pods {
		if nsPod.Namespace == ns {
			filtered = append(filtered, nsPod)
		}
	}
	return filtered
}

// findLabelsInSet gets as many key/value pairs as possible out of a label set.
func findLabelsInSet(labelsToKeep []string, selector labels.Set) map[string]string {
	aL := make(map[string]string)
	for _, l := range labelsToKeep {
		if selector.Has(l) {
			aL[l] = selector.Get(l)
		}
	}
	return aL
}