factory_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "context"
  16. "encoding/json"
  17. "errors"
  18. "reflect"
  19. "testing"
  20. "time"
  21. "github.com/google/go-cmp/cmp"
  22. v1 "k8s.io/api/core/v1"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. "k8s.io/apimachinery/pkg/util/clock"
  26. utilfeature "k8s.io/apiserver/pkg/util/feature"
  27. "k8s.io/client-go/informers"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/client-go/kubernetes/fake"
  30. clienttesting "k8s.io/client-go/testing"
  31. "k8s.io/client-go/tools/cache"
  32. apitesting "k8s.io/kubernetes/pkg/api/testing"
  33. kubefeatures "k8s.io/kubernetes/pkg/features"
  34. schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
  35. "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
  36. extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
  37. frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
  38. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
  39. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
  40. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
  41. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
  42. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
  43. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
  44. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  45. internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
  46. internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
  47. "k8s.io/kubernetes/pkg/scheduler/listers"
  48. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  49. )
  // Fixed settings that the Configurator helpers in this file copy into every
  // Configurator they build.
  const (
  	// bindTimeoutSeconds is assigned to Configurator.bindTimeoutSeconds.
  	bindTimeoutSeconds = 600
  	// podInitialBackoffDurationSeconds is assigned to
  	// Configurator.podInitialBackoffSeconds.
  	podInitialBackoffDurationSeconds = 1
  	// podMaxBackoffDurationSeconds is assigned to
  	// Configurator.podMaxBackoffSeconds.
  	podMaxBackoffDurationSeconds = 10
  	// disablePodPreemption is assigned to Configurator.disablePreemption.
  	disablePodPreemption = false
  )
  56. func TestCreate(t *testing.T) {
  57. client := fake.NewSimpleClientset()
  58. stopCh := make(chan struct{})
  59. defer close(stopCh)
  60. factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
  61. factory.createFromProvider(schedulerapi.SchedulerDefaultProviderName)
  62. }
  63. // Test configures a scheduler from a policies defined in a file
  64. // It combines some configurable predicate/priorities with some pre-defined ones
  65. func TestCreateFromConfig(t *testing.T) {
  66. var configData []byte
  67. var policy schedulerapi.Policy
  68. client := fake.NewSimpleClientset()
  69. stopCh := make(chan struct{})
  70. defer close(stopCh)
  71. factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
  72. configData = []byte(`{
  73. "kind" : "Policy",
  74. "apiVersion" : "v1",
  75. "predicates" : [
  76. {"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["zone"]}}},
  77. {"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["foo"]}}},
  78. {"name" : "TestRequireZone", "argument" : {"labelsPresence" : {"labels" : ["zone"], "presence" : true}}},
  79. {"name" : "TestNoFooLabel", "argument" : {"labelsPresence" : {"labels" : ["foo"], "presence" : false}}},
  80. {"name" : "PodFitsResources"},
  81. {"name" : "PodFitsHostPorts"}
  82. ],
  83. "priorities" : [
  84. {"name" : "RackSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "rack"}}},
  85. {"name" : "ZoneSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "zone"}}},
  86. {"name" : "LabelPreference1", "weight" : 3, "argument" : {"labelPreference" : {"label" : "l1", "presence": true}}},
  87. {"name" : "LabelPreference2", "weight" : 3, "argument" : {"labelPreference" : {"label" : "l2", "presence": false}}},
  88. {"name" : "NodeAffinityPriority", "weight" : 2},
  89. {"name" : "ImageLocalityPriority", "weight" : 1} ]
  90. }`)
  91. if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
  92. t.Errorf("Invalid configuration: %v", err)
  93. }
  94. sched, err := factory.createFromConfig(policy)
  95. if err != nil {
  96. t.Fatalf("createFromConfig failed: %v", err)
  97. }
  98. hpa := factory.hardPodAffinitySymmetricWeight
  99. if hpa != v1.DefaultHardPodAffinitySymmetricWeight {
  100. t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", v1.DefaultHardPodAffinitySymmetricWeight, hpa)
  101. }
  102. queueSortPls := sched.Framework.ListPlugins()["QueueSortPlugin"]
  103. wantQueuePls := []schedulerapi.Plugin{{Name: queuesort.Name}}
  104. if diff := cmp.Diff(wantQueuePls, queueSortPls); diff != "" {
  105. t.Errorf("Unexpected QueueSort plugins (-want, +got): %s", diff)
  106. }
  107. bindPls := sched.Framework.ListPlugins()["BindPlugin"]
  108. wantBindPls := []schedulerapi.Plugin{{Name: defaultbinder.Name}}
  109. if diff := cmp.Diff(wantBindPls, bindPls); diff != "" {
  110. t.Errorf("Unexpected Bind plugins (-want, +got): %s", diff)
  111. }
  112. // Verify that node label predicate/priority are converted to framework plugins.
  113. wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}`
  114. verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
  115. // Verify that service affinity custom predicate/priority is converted to framework plugin.
  116. wantArgs = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}}`
  117. verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
  118. }
  119. func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, sched *Scheduler, configurator *Configurator, wantWeight int32, wantArgs string) {
  120. for _, extensionPoint := range extentionPoints {
  121. plugin, ok := findPlugin(name, extensionPoint, sched)
  122. if !ok {
  123. t.Fatalf("%q plugin does not exist in framework.", name)
  124. }
  125. if extensionPoint == "ScorePlugin" {
  126. if plugin.Weight != wantWeight {
  127. t.Errorf("Wrong weight. Got: %v, want: %v", plugin.Weight, wantWeight)
  128. }
  129. }
  130. // Verify that the policy config is converted to plugin config.
  131. pluginConfig := findPluginConfig(name, configurator)
  132. encoding, err := json.Marshal(pluginConfig)
  133. if err != nil {
  134. t.Errorf("Failed to marshal %+v: %v", pluginConfig, err)
  135. }
  136. if string(encoding) != wantArgs {
  137. t.Errorf("Config for %v plugin mismatch. got: %v, want: %v", name, string(encoding), wantArgs)
  138. }
  139. }
  140. }
  141. func findPlugin(name, extensionPoint string, sched *Scheduler) (schedulerapi.Plugin, bool) {
  142. for _, pl := range sched.Framework.ListPlugins()[extensionPoint] {
  143. if pl.Name == name {
  144. return pl, true
  145. }
  146. }
  147. return schedulerapi.Plugin{}, false
  148. }
  149. func findPluginConfig(name string, configurator *Configurator) schedulerapi.PluginConfig {
  150. for _, c := range configurator.pluginConfig {
  151. if c.Name == name {
  152. return c
  153. }
  154. }
  155. return schedulerapi.PluginConfig{}
  156. }
  157. func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
  158. var configData []byte
  159. var policy schedulerapi.Policy
  160. client := fake.NewSimpleClientset()
  161. stopCh := make(chan struct{})
  162. defer close(stopCh)
  163. factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
  164. configData = []byte(`{
  165. "kind" : "Policy",
  166. "apiVersion" : "v1",
  167. "predicates" : [
  168. {"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["zone"]}}},
  169. {"name" : "TestRequireZone", "argument" : {"labelsPresence" : {"labels" : ["zone"], "presence" : true}}},
  170. {"name" : "PodFitsResources"},
  171. {"name" : "PodFitsHostPorts"}
  172. ],
  173. "priorities" : [
  174. {"name" : "RackSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "rack"}}},
  175. {"name" : "NodeAffinityPriority", "weight" : 2},
  176. {"name" : "ImageLocalityPriority", "weight" : 1},
  177. {"name" : "InterPodAffinityPriority", "weight" : 1}
  178. ],
  179. "hardPodAffinitySymmetricWeight" : 10
  180. }`)
  181. if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
  182. t.Fatalf("Invalid configuration: %v", err)
  183. }
  184. factory.createFromConfig(policy)
  185. // TODO(#87703): Verify that the entire pluginConfig is correct.
  186. foundAffinityCfg := false
  187. for _, cfg := range factory.pluginConfig {
  188. if cfg.Name == interpodaffinity.Name {
  189. foundAffinityCfg = true
  190. wantArgs := runtime.Unknown{Raw: []byte(`{"hardPodAffinityWeight":10}`)}
  191. if diff := cmp.Diff(wantArgs, cfg.Args); diff != "" {
  192. t.Errorf("wrong InterPodAffinity args (-want, +got): %s", diff)
  193. }
  194. }
  195. }
  196. if !foundAffinityCfg {
  197. t.Errorf("args for InterPodAffinity were not found")
  198. }
  199. }
  200. func TestCreateFromEmptyConfig(t *testing.T) {
  201. var configData []byte
  202. var policy schedulerapi.Policy
  203. client := fake.NewSimpleClientset()
  204. stopCh := make(chan struct{})
  205. defer close(stopCh)
  206. factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
  207. configData = []byte(`{}`)
  208. if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
  209. t.Errorf("Invalid configuration: %v", err)
  210. }
  211. factory.createFromConfig(policy)
  212. wantConfig := []schedulerapi.PluginConfig{
  213. {
  214. Name: noderesources.FitName,
  215. Args: runtime.Unknown{Raw: []byte(`null`)},
  216. },
  217. {
  218. Name: interpodaffinity.Name,
  219. Args: runtime.Unknown{Raw: []byte(`{"hardPodAffinityWeight":1}`)},
  220. },
  221. }
  222. if diff := cmp.Diff(wantConfig, factory.pluginConfig); diff != "" {
  223. t.Errorf("wrong plugin config (-want, +got): %s", diff)
  224. }
  225. }
  226. // Test configures a scheduler from a policy that does not specify any
  227. // predicate/priority.
  228. // The predicate/priority from DefaultProvider will be used.
  229. func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
  230. client := fake.NewSimpleClientset()
  231. stopCh := make(chan struct{})
  232. defer close(stopCh)
  233. factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
  234. configData := []byte(`{
  235. "kind" : "Policy",
  236. "apiVersion" : "v1"
  237. }`)
  238. var policy schedulerapi.Policy
  239. if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
  240. t.Fatalf("Invalid configuration: %v", err)
  241. }
  242. sched, err := factory.createFromConfig(policy)
  243. if err != nil {
  244. t.Fatalf("Failed to create scheduler from configuration: %v", err)
  245. }
  246. if _, exist := findPlugin("NodeResourcesFit", "FilterPlugin", sched); !exist {
  247. t.Errorf("Expected plugin NodeResourcesFit")
  248. }
  249. }
  250. func TestDefaultErrorFunc(t *testing.T) {
  251. testPod := &v1.Pod{
  252. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
  253. Spec: apitesting.V1DeepEqualSafePodSpec(),
  254. }
  255. testPodInfo := &framework.PodInfo{Pod: testPod}
  256. client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
  257. stopCh := make(chan struct{})
  258. defer close(stopCh)
  259. timestamp := time.Now()
  260. queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(timestamp)))
  261. schedulerCache := internalcache.New(30*time.Second, stopCh)
  262. errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache)
  263. // Trigger error handling again to put the pod in unschedulable queue
  264. errFunc(testPodInfo, nil)
  265. // Try up to a minute to retrieve the error pod from priority queue
  266. foundPodFlag := false
  267. maxIterations := 10 * 60
  268. for i := 0; i < maxIterations; i++ {
  269. time.Sleep(100 * time.Millisecond)
  270. got := getPodfromPriorityQueue(queue, testPod)
  271. if got == nil {
  272. continue
  273. }
  274. testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
  275. if e, a := testPod, got; !reflect.DeepEqual(e, a) {
  276. t.Errorf("Expected %v, got %v", e, a)
  277. }
  278. foundPodFlag = true
  279. break
  280. }
  281. if !foundPodFlag {
  282. t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod)
  283. }
  284. // Remove the pod from priority queue to test putting error
  285. // pod in backoff queue.
  286. queue.Delete(testPod)
  287. // Trigger a move request
  288. queue.MoveAllToActiveOrBackoffQueue("test")
  289. // Trigger error handling again to put the pod in backoff queue
  290. errFunc(testPodInfo, nil)
  291. foundPodFlag = false
  292. for i := 0; i < maxIterations; i++ {
  293. time.Sleep(100 * time.Millisecond)
  294. // The pod should be found from backoff queue at this time
  295. got := getPodfromPriorityQueue(queue, testPod)
  296. if got == nil {
  297. continue
  298. }
  299. testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
  300. if e, a := testPod, got; !reflect.DeepEqual(e, a) {
  301. t.Errorf("Expected %v, got %v", e, a)
  302. }
  303. foundPodFlag = true
  304. break
  305. }
  306. if !foundPodFlag {
  307. t.Errorf("Failed to get pod from the backoff queue after waiting for a minute: %v", testPod)
  308. }
  309. }
  310. // getPodfromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
  311. // the specific pod from the given priority queue. It returns the found pod in the priority queue.
  312. func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
  313. podList := queue.PendingPods()
  314. if len(podList) == 0 {
  315. return nil
  316. }
  317. queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
  318. if err != nil {
  319. return nil
  320. }
  321. for _, foundPod := range podList {
  322. foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
  323. if err != nil {
  324. return nil
  325. }
  326. if foundPodKey == queryPodKey {
  327. return foundPod
  328. }
  329. }
  330. return nil
  331. }
  332. // testClientGetPodRequest function provides a routine used by TestDefaultErrorFunc test.
  333. // It tests whether the fake client can receive request and correctly "get" the namespace
  334. // and name of the error pod.
  335. func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string, podName string) {
  336. requestReceived := false
  337. actions := client.Actions()
  338. for _, a := range actions {
  339. if a.GetVerb() == "get" {
  340. getAction, ok := a.(clienttesting.GetAction)
  341. if !ok {
  342. t.Errorf("Can't cast action object to GetAction interface")
  343. break
  344. }
  345. name := getAction.GetName()
  346. ns := a.GetNamespace()
  347. if name != podName || ns != podNs {
  348. t.Errorf("Expected name %s namespace %s, got %s %s",
  349. podName, podNs, name, ns)
  350. }
  351. requestReceived = true
  352. }
  353. }
  354. if !requestReceived {
  355. t.Errorf("Get pod request not received")
  356. }
  357. }
  358. func newConfigFactoryWithFrameworkRegistry(
  359. client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{},
  360. registry framework.Registry) *Configurator {
  361. informerFactory := informers.NewSharedInformerFactory(client, 0)
  362. snapshot := internalcache.NewEmptySnapshot()
  363. return &Configurator{
  364. client: client,
  365. informerFactory: informerFactory,
  366. podInformer: informerFactory.Core().V1().Pods(),
  367. hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
  368. disablePreemption: disablePodPreemption,
  369. percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
  370. bindTimeoutSeconds: bindTimeoutSeconds,
  371. podInitialBackoffSeconds: podInitialBackoffDurationSeconds,
  372. podMaxBackoffSeconds: podMaxBackoffDurationSeconds,
  373. StopEverything: stopCh,
  374. enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority),
  375. registry: registry,
  376. plugins: nil,
  377. pluginConfig: []schedulerapi.PluginConfig{},
  378. nodeInfoSnapshot: snapshot,
  379. }
  380. }
  381. func newConfigFactory(
  382. client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) *Configurator {
  383. return newConfigFactoryWithFrameworkRegistry(client, hardPodAffinitySymmetricWeight, stopCh,
  384. frameworkplugins.NewInTreeRegistry())
  385. }
  386. type fakeExtender struct {
  387. isBinder bool
  388. interestedPodName string
  389. ignorable bool
  390. gotBind bool
  391. }
  392. func (f *fakeExtender) Name() string {
  393. return "fakeExtender"
  394. }
  395. func (f *fakeExtender) IsIgnorable() bool {
  396. return f.ignorable
  397. }
  398. func (f *fakeExtender) ProcessPreemption(
  399. pod *v1.Pod,
  400. nodeToVictims map[*v1.Node]*extenderv1.Victims,
  401. nodeInfos listers.NodeInfoLister,
  402. ) (map[*v1.Node]*extenderv1.Victims, error) {
  403. return nil, nil
  404. }
  405. func (f *fakeExtender) SupportsPreemption() bool {
  406. return false
  407. }
  408. func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error) {
  409. return nil, nil, nil
  410. }
  411. func (f *fakeExtender) Prioritize(
  412. pod *v1.Pod,
  413. nodes []*v1.Node,
  414. ) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) {
  415. return nil, 0, nil
  416. }
  417. func (f *fakeExtender) Bind(binding *v1.Binding) error {
  418. if f.isBinder {
  419. f.gotBind = true
  420. return nil
  421. }
  422. return errors.New("not a binder")
  423. }
  424. func (f *fakeExtender) IsBinder() bool {
  425. return f.isBinder
  426. }
  427. func (f *fakeExtender) IsInterested(pod *v1.Pod) bool {
  428. return pod != nil && pod.Name == f.interestedPodName
  429. }
  430. type TestPlugin struct {
  431. name string
  432. }
  433. var _ framework.ScorePlugin = &TestPlugin{}
  434. var _ framework.FilterPlugin = &TestPlugin{}
  435. func (t *TestPlugin) Name() string {
  436. return t.name
  437. }
  438. func (t *TestPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
  439. return 1, nil
  440. }
  441. func (t *TestPlugin) ScoreExtensions() framework.ScoreExtensions {
  442. return nil
  443. }
  444. func (t *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
  445. return nil
  446. }