factory_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "context"
  16. "encoding/json"
  17. "errors"
  18. "reflect"
  19. "testing"
  20. "time"
  21. "github.com/google/go-cmp/cmp"
  22. v1 "k8s.io/api/core/v1"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. "k8s.io/apimachinery/pkg/util/clock"
  26. utilfeature "k8s.io/apiserver/pkg/util/feature"
  27. "k8s.io/client-go/informers"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/client-go/kubernetes/fake"
  30. clienttesting "k8s.io/client-go/testing"
  31. "k8s.io/client-go/tools/cache"
  32. apitesting "k8s.io/kubernetes/pkg/api/testing"
  33. kubefeatures "k8s.io/kubernetes/pkg/features"
  34. schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
  35. "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
  36. extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
  37. frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
  38. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
  39. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
  40. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
  41. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
  42. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
  43. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  44. internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
  45. internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
  46. "k8s.io/kubernetes/pkg/scheduler/listers"
  47. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  48. )
  49. const (
  50. disablePodPreemption = false
  51. bindTimeoutSeconds = 600
  52. podInitialBackoffDurationSeconds = 1
  53. podMaxBackoffDurationSeconds = 10
  54. )
  55. func TestCreate(t *testing.T) {
  56. client := fake.NewSimpleClientset()
  57. stopCh := make(chan struct{})
  58. defer close(stopCh)
  59. factory := newConfigFactory(client, stopCh)
  60. factory.createFromProvider(schedulerapi.SchedulerDefaultProviderName)
  61. }
  62. // Test configures a scheduler from a policies defined in a file
  63. // It combines some configurable predicate/priorities with some pre-defined ones
  64. func TestCreateFromConfig(t *testing.T) {
  65. var configData []byte
  66. var policy schedulerapi.Policy
  67. client := fake.NewSimpleClientset()
  68. stopCh := make(chan struct{})
  69. defer close(stopCh)
  70. factory := newConfigFactory(client, stopCh)
  71. configData = []byte(`{
  72. "kind" : "Policy",
  73. "apiVersion" : "v1",
  74. "predicates" : [
  75. {"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["zone"]}}},
  76. {"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["foo"]}}},
  77. {"name" : "TestRequireZone", "argument" : {"labelsPresence" : {"labels" : ["zone"], "presence" : true}}},
  78. {"name" : "TestNoFooLabel", "argument" : {"labelsPresence" : {"labels" : ["foo"], "presence" : false}}},
  79. {"name" : "PodFitsResources"},
  80. {"name" : "PodFitsHostPorts"}
  81. ],
  82. "priorities" : [
  83. {"name" : "RackSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "rack"}}},
  84. {"name" : "ZoneSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "zone"}}},
  85. {"name" : "LabelPreference1", "weight" : 3, "argument" : {"labelPreference" : {"label" : "l1", "presence": true}}},
  86. {"name" : "LabelPreference2", "weight" : 3, "argument" : {"labelPreference" : {"label" : "l2", "presence": false}}},
  87. {"name" : "NodeAffinityPriority", "weight" : 2},
  88. {"name" : "ImageLocalityPriority", "weight" : 1} ]
  89. }`)
  90. if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
  91. t.Errorf("Invalid configuration: %v", err)
  92. }
  93. sched, err := factory.createFromConfig(policy)
  94. if err != nil {
  95. t.Fatalf("createFromConfig failed: %v", err)
  96. }
  97. queueSortPls := sched.Framework.ListPlugins()["QueueSortPlugin"]
  98. wantQueuePls := []schedulerapi.Plugin{{Name: queuesort.Name}}
  99. if diff := cmp.Diff(wantQueuePls, queueSortPls); diff != "" {
  100. t.Errorf("Unexpected QueueSort plugins (-want, +got): %s", diff)
  101. }
  102. bindPls := sched.Framework.ListPlugins()["BindPlugin"]
  103. wantBindPls := []schedulerapi.Plugin{{Name: defaultbinder.Name}}
  104. if diff := cmp.Diff(wantBindPls, bindPls); diff != "" {
  105. t.Errorf("Unexpected Bind plugins (-want, +got): %s", diff)
  106. }
  107. // Verify that node label predicate/priority are converted to framework plugins.
  108. wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}`
  109. verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
  110. // Verify that service affinity custom predicate/priority is converted to framework plugin.
  111. wantArgs = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}}`
  112. verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
  113. // TODO(#87703): Verify all plugin configs.
  114. }
  115. func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, sched *Scheduler, configurator *Configurator, wantWeight int32, wantArgs string) {
  116. for _, extensionPoint := range extentionPoints {
  117. plugin, ok := findPlugin(name, extensionPoint, sched)
  118. if !ok {
  119. t.Fatalf("%q plugin does not exist in framework.", name)
  120. }
  121. if extensionPoint == "ScorePlugin" {
  122. if plugin.Weight != wantWeight {
  123. t.Errorf("Wrong weight. Got: %v, want: %v", plugin.Weight, wantWeight)
  124. }
  125. }
  126. // Verify that the policy config is converted to plugin config.
  127. pluginConfig := findPluginConfig(name, configurator)
  128. encoding, err := json.Marshal(pluginConfig)
  129. if err != nil {
  130. t.Errorf("Failed to marshal %+v: %v", pluginConfig, err)
  131. }
  132. if string(encoding) != wantArgs {
  133. t.Errorf("Config for %v plugin mismatch. got: %v, want: %v", name, string(encoding), wantArgs)
  134. }
  135. }
  136. }
  137. func findPlugin(name, extensionPoint string, sched *Scheduler) (schedulerapi.Plugin, bool) {
  138. for _, pl := range sched.Framework.ListPlugins()[extensionPoint] {
  139. if pl.Name == name {
  140. return pl, true
  141. }
  142. }
  143. return schedulerapi.Plugin{}, false
  144. }
  145. func findPluginConfig(name string, configurator *Configurator) schedulerapi.PluginConfig {
  146. for _, c := range configurator.pluginConfig {
  147. if c.Name == name {
  148. return c
  149. }
  150. }
  151. return schedulerapi.PluginConfig{}
  152. }
  153. func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
  154. var configData []byte
  155. var policy schedulerapi.Policy
  156. client := fake.NewSimpleClientset()
  157. stopCh := make(chan struct{})
  158. defer close(stopCh)
  159. factory := newConfigFactory(client, stopCh)
  160. configData = []byte(`{
  161. "kind" : "Policy",
  162. "apiVersion" : "v1",
  163. "predicates" : [
  164. {"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["zone"]}}},
  165. {"name" : "TestRequireZone", "argument" : {"labelsPresence" : {"labels" : ["zone"], "presence" : true}}},
  166. {"name" : "PodFitsResources"},
  167. {"name" : "PodFitsHostPorts"}
  168. ],
  169. "priorities" : [
  170. {"name" : "RackSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "rack"}}},
  171. {"name" : "NodeAffinityPriority", "weight" : 2},
  172. {"name" : "ImageLocalityPriority", "weight" : 1},
  173. {"name" : "InterPodAffinityPriority", "weight" : 1}
  174. ],
  175. "hardPodAffinitySymmetricWeight" : 10
  176. }`)
  177. if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
  178. t.Fatalf("Invalid configuration: %v", err)
  179. }
  180. factory.createFromConfig(policy)
  181. // TODO(#87703): Verify that the entire pluginConfig is correct.
  182. foundAffinityCfg := false
  183. for _, cfg := range factory.pluginConfig {
  184. if cfg.Name == interpodaffinity.Name {
  185. foundAffinityCfg = true
  186. wantArgs := runtime.Unknown{Raw: []byte(`{"hardPodAffinityWeight":10}`)}
  187. if diff := cmp.Diff(wantArgs, cfg.Args); diff != "" {
  188. t.Errorf("wrong InterPodAffinity args (-want, +got): %s", diff)
  189. }
  190. }
  191. }
  192. if !foundAffinityCfg {
  193. t.Errorf("args for InterPodAffinity were not found")
  194. }
  195. }
  196. func TestCreateFromEmptyConfig(t *testing.T) {
  197. var configData []byte
  198. var policy schedulerapi.Policy
  199. client := fake.NewSimpleClientset()
  200. stopCh := make(chan struct{})
  201. defer close(stopCh)
  202. factory := newConfigFactory(client, stopCh)
  203. configData = []byte(`{}`)
  204. if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
  205. t.Errorf("Invalid configuration: %v", err)
  206. }
  207. factory.createFromConfig(policy)
  208. if len(factory.pluginConfig) != 0 {
  209. t.Errorf("got plugin config %s, want none", factory.pluginConfig)
  210. }
  211. }
  212. // Test configures a scheduler from a policy that does not specify any
  213. // predicate/priority.
  214. // The predicate/priority from DefaultProvider will be used.
  215. func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
  216. client := fake.NewSimpleClientset()
  217. stopCh := make(chan struct{})
  218. defer close(stopCh)
  219. factory := newConfigFactory(client, stopCh)
  220. configData := []byte(`{
  221. "kind" : "Policy",
  222. "apiVersion" : "v1"
  223. }`)
  224. var policy schedulerapi.Policy
  225. if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
  226. t.Fatalf("Invalid configuration: %v", err)
  227. }
  228. sched, err := factory.createFromConfig(policy)
  229. if err != nil {
  230. t.Fatalf("Failed to create scheduler from configuration: %v", err)
  231. }
  232. if _, exist := findPlugin("NodeResourcesFit", "FilterPlugin", sched); !exist {
  233. t.Errorf("Expected plugin NodeResourcesFit")
  234. }
  235. }
  236. func TestDefaultErrorFunc(t *testing.T) {
  237. testPod := &v1.Pod{
  238. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
  239. Spec: apitesting.V1DeepEqualSafePodSpec(),
  240. }
  241. testPodInfo := &framework.PodInfo{Pod: testPod}
  242. client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
  243. stopCh := make(chan struct{})
  244. defer close(stopCh)
  245. timestamp := time.Now()
  246. queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(timestamp)))
  247. schedulerCache := internalcache.New(30*time.Second, stopCh)
  248. errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache)
  249. // Trigger error handling again to put the pod in unschedulable queue
  250. errFunc(testPodInfo, nil)
  251. // Try up to a minute to retrieve the error pod from priority queue
  252. foundPodFlag := false
  253. maxIterations := 10 * 60
  254. for i := 0; i < maxIterations; i++ {
  255. time.Sleep(100 * time.Millisecond)
  256. got := getPodfromPriorityQueue(queue, testPod)
  257. if got == nil {
  258. continue
  259. }
  260. testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
  261. if e, a := testPod, got; !reflect.DeepEqual(e, a) {
  262. t.Errorf("Expected %v, got %v", e, a)
  263. }
  264. foundPodFlag = true
  265. break
  266. }
  267. if !foundPodFlag {
  268. t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod)
  269. }
  270. // Remove the pod from priority queue to test putting error
  271. // pod in backoff queue.
  272. queue.Delete(testPod)
  273. // Trigger a move request
  274. queue.MoveAllToActiveOrBackoffQueue("test")
  275. // Trigger error handling again to put the pod in backoff queue
  276. errFunc(testPodInfo, nil)
  277. foundPodFlag = false
  278. for i := 0; i < maxIterations; i++ {
  279. time.Sleep(100 * time.Millisecond)
  280. // The pod should be found from backoff queue at this time
  281. got := getPodfromPriorityQueue(queue, testPod)
  282. if got == nil {
  283. continue
  284. }
  285. testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
  286. if e, a := testPod, got; !reflect.DeepEqual(e, a) {
  287. t.Errorf("Expected %v, got %v", e, a)
  288. }
  289. foundPodFlag = true
  290. break
  291. }
  292. if !foundPodFlag {
  293. t.Errorf("Failed to get pod from the backoff queue after waiting for a minute: %v", testPod)
  294. }
  295. }
  296. // getPodfromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
  297. // the specific pod from the given priority queue. It returns the found pod in the priority queue.
  298. func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
  299. podList := queue.PendingPods()
  300. if len(podList) == 0 {
  301. return nil
  302. }
  303. queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
  304. if err != nil {
  305. return nil
  306. }
  307. for _, foundPod := range podList {
  308. foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
  309. if err != nil {
  310. return nil
  311. }
  312. if foundPodKey == queryPodKey {
  313. return foundPod
  314. }
  315. }
  316. return nil
  317. }
  318. // testClientGetPodRequest function provides a routine used by TestDefaultErrorFunc test.
  319. // It tests whether the fake client can receive request and correctly "get" the namespace
  320. // and name of the error pod.
  321. func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string, podName string) {
  322. requestReceived := false
  323. actions := client.Actions()
  324. for _, a := range actions {
  325. if a.GetVerb() == "get" {
  326. getAction, ok := a.(clienttesting.GetAction)
  327. if !ok {
  328. t.Errorf("Can't cast action object to GetAction interface")
  329. break
  330. }
  331. name := getAction.GetName()
  332. ns := a.GetNamespace()
  333. if name != podName || ns != podNs {
  334. t.Errorf("Expected name %s namespace %s, got %s %s",
  335. podName, podNs, name, ns)
  336. }
  337. requestReceived = true
  338. }
  339. }
  340. if !requestReceived {
  341. t.Errorf("Get pod request not received")
  342. }
  343. }
  344. func newConfigFactoryWithFrameworkRegistry(
  345. client clientset.Interface, stopCh <-chan struct{},
  346. registry framework.Registry) *Configurator {
  347. informerFactory := informers.NewSharedInformerFactory(client, 0)
  348. snapshot := internalcache.NewEmptySnapshot()
  349. return &Configurator{
  350. client: client,
  351. informerFactory: informerFactory,
  352. podInformer: informerFactory.Core().V1().Pods(),
  353. disablePreemption: disablePodPreemption,
  354. percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
  355. bindTimeoutSeconds: bindTimeoutSeconds,
  356. podInitialBackoffSeconds: podInitialBackoffDurationSeconds,
  357. podMaxBackoffSeconds: podMaxBackoffDurationSeconds,
  358. StopEverything: stopCh,
  359. enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority),
  360. registry: registry,
  361. plugins: nil,
  362. pluginConfig: []schedulerapi.PluginConfig{},
  363. nodeInfoSnapshot: snapshot,
  364. }
  365. }
  366. func newConfigFactory(client clientset.Interface, stopCh <-chan struct{}) *Configurator {
  367. return newConfigFactoryWithFrameworkRegistry(client, stopCh,
  368. frameworkplugins.NewInTreeRegistry())
  369. }
  370. type fakeExtender struct {
  371. isBinder bool
  372. interestedPodName string
  373. ignorable bool
  374. gotBind bool
  375. }
  376. func (f *fakeExtender) Name() string {
  377. return "fakeExtender"
  378. }
  379. func (f *fakeExtender) IsIgnorable() bool {
  380. return f.ignorable
  381. }
  382. func (f *fakeExtender) ProcessPreemption(
  383. pod *v1.Pod,
  384. nodeToVictims map[*v1.Node]*extenderv1.Victims,
  385. nodeInfos listers.NodeInfoLister,
  386. ) (map[*v1.Node]*extenderv1.Victims, error) {
  387. return nil, nil
  388. }
  389. func (f *fakeExtender) SupportsPreemption() bool {
  390. return false
  391. }
  392. func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error) {
  393. return nil, nil, nil
  394. }
  395. func (f *fakeExtender) Prioritize(
  396. pod *v1.Pod,
  397. nodes []*v1.Node,
  398. ) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) {
  399. return nil, 0, nil
  400. }
  401. func (f *fakeExtender) Bind(binding *v1.Binding) error {
  402. if f.isBinder {
  403. f.gotBind = true
  404. return nil
  405. }
  406. return errors.New("not a binder")
  407. }
  408. func (f *fakeExtender) IsBinder() bool {
  409. return f.isBinder
  410. }
  411. func (f *fakeExtender) IsInterested(pod *v1.Pod) bool {
  412. return pod != nil && pod.Name == f.interestedPodName
  413. }
  414. type TestPlugin struct {
  415. name string
  416. }
  417. var _ framework.ScorePlugin = &TestPlugin{}
  418. var _ framework.FilterPlugin = &TestPlugin{}
  419. func (t *TestPlugin) Name() string {
  420. return t.name
  421. }
  422. func (t *TestPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
  423. return 1, nil
  424. }
  425. func (t *TestPlugin) ScoreExtensions() framework.ScoreExtensions {
  426. return nil
  427. }
  428. func (t *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
  429. return nil
  430. }