scheduler_test.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler

import (
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path"
	"reflect"
	"testing"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/diff"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/scheme"
	corelister "k8s.io/client-go/listers/core/v1"
	clientcache "k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
	"k8s.io/kubernetes/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/core"
	"k8s.io/kubernetes/pkg/scheduler/factory"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
	fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
// EmptyFramework is an empty framework used in tests.
// Note: If a test runs in a goroutine, don't use this variable; sharing it can
// cause a race condition.
var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig)

// EmptyPluginConfig is an empty plugin config used in tests.
var EmptyPluginConfig = []kubeschedulerconfig.PluginConfig{}
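// A minimal sketch (illustration only, not used by these tests): a test that
// schedules from multiple goroutines can construct its own framework instance
// instead of sharing EmptyFramework, e.g.
//
//	fw, _ := framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig)

// fakeBinder implements factory.Binder by delegating Bind to the supplied
// function, letting each test capture or fail bindings.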
type fakeBinder struct {
	b func(binding *v1.Binding) error
}

func (fb fakeBinder) Bind(binding *v1.Binding) error { return fb.b(binding) }
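// fakePodConditionUpdater is a no-op pod condition updater for tests.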
type fakePodConditionUpdater struct{}

func (fc fakePodConditionUpdater) Update(pod *v1.Pod, podCondition *v1.PodCondition) error {
	return nil
}
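// fakePodPreemptor is a no-op pod preemptor; GetUpdatedPod returns the pod
// unchanged and all other methods succeed without side effects.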
type fakePodPreemptor struct{}

func (fp fakePodPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
	return pod, nil
}

func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error {
	return nil
}

func (fp fakePodPreemptor) SetNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
	return nil
}

func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
	return nil
}
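// nodeLister adapts a corelister.NodeLister to the parameterless List the
// scheduler expects, listing every node.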
type nodeLister struct {
	corelister.NodeLister
}

func (n *nodeLister) List() ([]*v1.Node, error) {
	return n.NodeLister.List(labels.Everything())
}
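// podWithID returns a minimal pod with the given name/UID, pre-bound to
// desiredHost when it is non-empty.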
func podWithID(id, desiredHost string) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:     id,
			UID:      types.UID(id),
			SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
		},
		Spec: v1.PodSpec{
			NodeName: desiredHost,
		},
	}
}
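// deletingPod returns an unscheduled pod with its deletion timestamp set.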
func deletingPod(id string) *v1.Pod {
	deletionTimestamp := metav1.Now()
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:              id,
			UID:               types.UID(id),
			DeletionTimestamp: &deletionTimestamp,
			SelfLink:          fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
		},
		Spec: v1.PodSpec{
			NodeName: "",
		},
	}
}
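// podWithPort returns a pod exposing the given host port; the tests below use
// colliding host ports to force PodFitsHostPorts failures.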
func podWithPort(id, desiredHost string, port int) *v1.Pod {
	pod := podWithID(id, desiredHost)
	pod.Spec.Containers = []v1.Container{
		{Name: "ctr", Ports: []v1.ContainerPort{{HostPort: int32(port)}}},
	}
	return pod
}
func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v1.ResourceList) *v1.Pod {
	pod := podWithID(id, desiredHost)
	pod.Spec.Containers = []v1.Container{
		{Name: "ctr", Resources: v1.ResourceRequirements{Limits: limits, Requests: requests}},
	}
	return pod
}
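// PredicateOne and PriorityOne are trivial fit-predicate and priority
// implementations, registered in TestSchedulerCreation to form a test
// algorithm provider.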
func PredicateOne(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
	return true, nil, nil
}

func PriorityOne(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
	return []schedulerapi.HostPriority{}, nil
}
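// mockScheduler is a core.ScheduleAlgorithm stub that returns a fixed
// ScheduleResult and error.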
type mockScheduler struct {
	result core.ScheduleResult
	err    error
}

func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister) (core.ScheduleResult, error) {
	return es.result, es.err
}

func (es mockScheduler) Predicates() map[string]predicates.FitPredicate {
	return nil
}

func (es mockScheduler) Prioritizers() []priorities.PriorityConfig {
	return nil
}

func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	return nil, nil, nil, nil
}
func TestSchedulerCreation(t *testing.T) {
	client := clientsetfake.NewSimpleClientset()
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	testSource := "testProvider"
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(t.Logf).Stop()

	defaultBindTimeout := int64(30)
	factory.RegisterFitPredicate("PredicateOne", PredicateOne)
	factory.RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
	factory.RegisterAlgorithmProvider(testSource, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))

	stopCh := make(chan struct{})
	defer close(stopCh)
	_, err := New(client,
		informerFactory.Core().V1().Nodes(),
		factory.NewPodInformer(client, 0),
		informerFactory.Core().V1().PersistentVolumes(),
		informerFactory.Core().V1().PersistentVolumeClaims(),
		informerFactory.Core().V1().ReplicationControllers(),
		informerFactory.Apps().V1().ReplicaSets(),
		informerFactory.Apps().V1().StatefulSets(),
		informerFactory.Core().V1().Services(),
		informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
		informerFactory.Storage().V1().StorageClasses(),
		eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}),
		kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
		stopCh,
		EmptyPluginRegistry,
		nil,
		EmptyPluginConfig,
		WithBindTimeoutSeconds(defaultBindTimeout))

	if err != nil {
		t.Fatalf("Failed to create scheduler: %v", err)
	}
}
func TestScheduler(t *testing.T) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(t.Logf).Stop()
	errS := errors.New("scheduler")
	errB := errors.New("binder")

	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}

	table := []struct {
		name             string
		injectBindError  error
		sendPod          *v1.Pod
		algo             core.ScheduleAlgorithm
		expectErrorPod   *v1.Pod
		expectForgetPod  *v1.Pod
		expectAssumedPod *v1.Pod
		expectError      error
		expectBind       *v1.Binding
		eventReason      string
	}{
		{
			name:             "bind assumed pod scheduled",
			sendPod:          podWithID("foo", ""),
			algo:             mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
			expectAssumedPod: podWithID("foo", testNode.Name),
			eventReason:      "Scheduled",
		},
		{
			name:           "error pod failed scheduling",
			sendPod:        podWithID("foo", ""),
			algo:           mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
			expectError:    errS,
			expectErrorPod: podWithID("foo", ""),
			eventReason:    "FailedScheduling",
		},
		{
			name:             "error bind forget pod failed scheduling",
			sendPod:          podWithID("foo", ""),
			algo:             mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
			expectAssumedPod: podWithID("foo", testNode.Name),
			injectBindError:  errB,
			expectError:      errB,
			expectErrorPod:   podWithID("foo", testNode.Name),
			expectForgetPod:  podWithID("foo", testNode.Name),
			eventReason:      "FailedScheduling",
		},
		{
			name:        "deleting pod",
			sendPod:     deletingPod("foo"),
			algo:        mockScheduler{core.ScheduleResult{}, nil},
			eventReason: "FailedScheduling",
		},
	}

	stop := make(chan struct{})
	defer close(stop)
	client := clientsetfake.NewSimpleClientset(&testNode)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	nl := informerFactory.Core().V1().Nodes().Lister()
	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)

	for _, item := range table {
		t.Run(item.name, func(t *testing.T) {
			var gotError error
			var gotPod *v1.Pod
			var gotForgetPod *v1.Pod
			var gotAssumedPod *v1.Pod
			var gotBinding *v1.Binding

			s := NewFromConfig(&factory.Config{
				SchedulerCache: &fakecache.Cache{
					ForgetFunc: func(pod *v1.Pod) {
						gotForgetPod = pod
					},
					AssumeFunc: func(pod *v1.Pod) {
						gotAssumedPod = pod
					},
				},
				NodeLister: &nodeLister{nl},
				Algorithm:  item.algo,
				GetBinder: func(pod *v1.Pod) factory.Binder {
					return fakeBinder{func(b *v1.Binding) error {
						gotBinding = b
						return item.injectBindError
					}}
				},
				PodConditionUpdater: fakePodConditionUpdater{},
				Error: func(p *v1.Pod, err error) {
					gotPod = p
					gotError = err
				},
				NextPod: func() *v1.Pod {
					return item.sendPod
				},
				Framework:    EmptyFramework,
				Recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}),
				VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
			})
			called := make(chan struct{})
			events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) {
				if e, a := item.eventReason, e.Reason; e != a {
					t.Errorf("expected %v, got %v", e, a)
				}
				close(called)
			})

			s.scheduleOne()
			<-called

			if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
				t.Errorf("assumed pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) {
				t.Errorf("error pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectForgetPod, gotForgetPod; !reflect.DeepEqual(e, a) {
				t.Errorf("forget pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) {
				t.Errorf("error: wanted %v, got %v", e, a)
			}
			if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) {
				t.Errorf("binding: %s", diff.ObjectDiff(e, a))
			}
			events.Stop()
		})
	}
}
func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	scache := internalcache.New(100*time.Millisecond, stop)
	pod := podWithPort("pod.Name", "", 8080)
	node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	scache.AddNode(&node)
	client := clientsetfake.NewSimpleClientset(&node)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	predicateMap := map[string]predicates.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
	scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, pod, &node)

	waitPodExpireChan := make(chan struct{})
	timeout := make(chan struct{})
	go func() {
		for {
			select {
			case <-timeout:
				return
			default:
			}
			pods, err := scache.List(labels.Everything())
			if err != nil {
				// Don't call t.Fatalf from a non-test goroutine; report and bail out instead.
				t.Errorf("cache.List failed: %v", err)
				return
			}
			if len(pods) == 0 {
				close(waitPodExpireChan)
				return
			}
			time.Sleep(100 * time.Millisecond)
		}
	}()
	// Wait for the assumed pod to expire from the cache.
	select {
	case <-waitPodExpireChan:
	case <-time.After(wait.ForeverTestTimeout):
		close(timeout)
		t.Fatalf("timeout waiting for pod to expire after %v", wait.ForeverTestTimeout)
	}

	// We use conflicting host ports to trigger a fit predicate failure if the
	// first pod has not been removed.
	secondPod := podWithPort("bar", "", 8080)
	queuedPodStore.Add(secondPod)
	scheduler.scheduleOne()
	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if !reflect.DeepEqual(expectBinding, b) {
			t.Errorf("binding want=%v, got=%v", expectBinding, b)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
	}
}
func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	scache := internalcache.New(10*time.Minute, stop)
	firstPod := podWithPort("pod.Name", "", 8080)
	node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	scache.AddNode(&node)
	client := clientsetfake.NewSimpleClientset(&node)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	predicateMap := map[string]predicates.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, firstPod, &node)

	// We use conflicting host ports to trigger a fit predicate failure.
	secondPod := podWithPort("bar", "", 8080)
	queuedPodStore.Add(secondPod)
	// queuedPodStore: [bar:8080]
	// cache: [(assumed)foo:8080]
	scheduler.scheduleOne()
	select {
	case err := <-errChan:
		expectErr := &core.FitError{
			Pod:              secondPod,
			NumAllNodes:      1,
			FailedPredicates: core.FailedPredicateMap{node.Name: []predicates.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}},
		}
		if !reflect.DeepEqual(expectErr, err) {
			t.Errorf("err want=%v, got=%v", expectErr, err)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout in fitting after %v", wait.ForeverTestTimeout)
	}

	// We mimic the cache behavior when a pod is removed by the user.
	// Note: if the cache TTL were very short, the first pod would expire and be
	// removed on its own (without any explicit action on the cache). Even in
	// that case, explicitly adding and then removing the pod corrects the
	// behavior.
	firstPod.Spec.NodeName = node.Name
	if err := scache.AddPod(firstPod); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := scache.RemovePod(firstPod); err != nil {
		t.Fatalf("err: %v", err)
	}

	queuedPodStore.Add(secondPod)
	scheduler.scheduleOne()
	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if !reflect.DeepEqual(expectBinding, b) {
			t.Errorf("binding want=%v, got=%v", expectBinding, b)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
	}
}
// The scheduler should preserve the predicate constraint even if binding takes
// longer than the cache TTL.
func TestSchedulerErrorWithLongBinding(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)

	firstPod := podWithPort("foo", "", 8080)
	conflictPod := podWithPort("bar", "", 8080)
	pods := map[string]*v1.Pod{firstPod.Name: firstPod, conflictPod.Name: conflictPod}
	for _, test := range []struct {
		name            string
		Expected        map[string]bool
		CacheTTL        time.Duration
		BindingDuration time.Duration
	}{
		{
			// Binding (300ms) outlives the cache TTL (100ms).
			name:            "short cache ttl",
			Expected:        map[string]bool{firstPod.Name: true},
			CacheTTL:        100 * time.Millisecond,
			BindingDuration: 300 * time.Millisecond,
		},
		{
			// Binding (300ms) finishes well within the cache TTL (10s).
			name:            "long cache ttl",
			Expected:        map[string]bool{firstPod.Name: true},
			CacheTTL:        10 * time.Second,
			BindingDuration: 300 * time.Millisecond,
		},
	} {
		t.Run(test.name, func(t *testing.T) {
			queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
			scache := internalcache.New(test.CacheTTL, stop)

			node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
			scache.AddNode(&node)

			client := clientsetfake.NewSimpleClientset(&node)
			informerFactory := informers.NewSharedInformerFactory(client, 0)
			predicateMap := map[string]predicates.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}

			scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
				queuedPodStore, scache, informerFactory, predicateMap, stop, test.BindingDuration)

			informerFactory.Start(stop)
			informerFactory.WaitForCacheSync(stop)

			scheduler.Run()
			queuedPodStore.Add(firstPod)
			queuedPodStore.Add(conflictPod)

			resultBindings := map[string]bool{}
			waitChan := time.After(5 * time.Second)
			for finished := false; !finished; {
				select {
				case b := <-bindingChan:
					resultBindings[b.Name] = true
					p := pods[b.Name]
					p.Spec.NodeName = b.Target.Name
					scache.AddPod(p)
				case <-waitChan:
					finished = true
				}
			}
			if !reflect.DeepEqual(resultBindings, test.Expected) {
				t.Errorf("Result bindings are not equal to expected: %v != %v", resultBindings, test.Expected)
			}
		})
	}
}
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
	informerFactory informers.SharedInformerFactory, stop chan struct{}, predicateMap map[string]predicates.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {

	scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)

	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)

	queuedPodStore.Add(pod)
	// queuedPodStore: [foo:8080]
	// cache: []
	scheduler.scheduleOne()
	// queuedPodStore: []
	// cache: [(assumed)foo:8080]

	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: pod.Name, UID: types.UID(pod.Name)},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if !reflect.DeepEqual(expectBinding, b) {
			t.Errorf("binding want=%v, got=%v", expectBinding, b)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
	}
	return scheduler, bindingChan, errChan
}
func TestSchedulerFailedSchedulingReasons(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	scache := internalcache.New(10*time.Minute, stop)

	// Design the baseline for the pods, then make nodes that don't fit it.
	var cpu = int64(4)
	var mem = int64(500)
	podWithTooBigResourceRequests := podWithResources("bar", "", v1.ResourceList{
		v1.ResourceCPU:    *(resource.NewQuantity(cpu, resource.DecimalSI)),
		v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
	}, v1.ResourceList{
		v1.ResourceCPU:    *(resource.NewQuantity(cpu, resource.DecimalSI)),
		v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
	})

	// Create several nodes, none of which can schedule the above pod.
	var nodes []*v1.Node
	var objects []runtime.Object
	for i := 0; i < 100; i++ {
		uid := fmt.Sprintf("machine%v", i)
		node := v1.Node{
			ObjectMeta: metav1.ObjectMeta{Name: uid, UID: types.UID(uid)},
			Status: v1.NodeStatus{
				Capacity: v1.ResourceList{
					v1.ResourceCPU:    *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
					v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
					v1.ResourcePods:   *(resource.NewQuantity(10, resource.DecimalSI)),
				},
				Allocatable: v1.ResourceList{
					v1.ResourceCPU:    *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
					v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
					v1.ResourcePods:   *(resource.NewQuantity(10, resource.DecimalSI)),
				}},
		}
		scache.AddNode(&node)
		nodes = append(nodes, &node)
		objects = append(objects, &node)
	}
	client := clientsetfake.NewSimpleClientset(objects...)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	predicateMap := map[string]predicates.FitPredicate{
		"PodFitsResources": predicates.PodFitsResources,
	}

	// Create expected failure reasons for all the nodes. Hopefully they will
	// get rolled up into a non-spammy summary.
	failedPredicatesMap := core.FailedPredicateMap{}
	for _, node := range nodes {
		failedPredicatesMap[node.Name] = []predicates.PredicateFailureReason{
			predicates.NewInsufficientResourceError(v1.ResourceCPU, 4000, 0, 2000),
			predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100),
		}
	}
	scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)

	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)

	queuedPodStore.Add(podWithTooBigResourceRequests)
	scheduler.scheduleOne()
	select {
	case err := <-errChan:
		expectErr := &core.FitError{
			Pod:              podWithTooBigResourceRequests,
			NumAllNodes:      len(nodes),
			FailedPredicates: failedPredicatesMap,
		}
		if len(fmt.Sprint(expectErr)) > 150 {
			t.Errorf("message is too spammy: %v characters", len(fmt.Sprint(expectErr)))
		}
		if !reflect.DeepEqual(expectErr, err) {
			t.Errorf("\n err \nWANT=%+v,\nGOT=%+v", expectErr, err)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
	}
}
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
	algo := core.NewGenericScheduler(
		scache,
		internalqueue.NewSchedulingQueue(nil, nil),
		predicateMap,
		predicates.EmptyPredicateMetadataProducer,
		[]priorities.PriorityConfig{},
		priorities.EmptyPriorityMetadataProducer,
		EmptyFramework,
		[]algorithm.SchedulerExtender{},
		nil,
		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
		false,
		false,
		schedulerapi.DefaultPercentageOfNodesToScore,
		false,
	)
	bindingChan := make(chan *v1.Binding, 1)
	errChan := make(chan error, 1)
	config := &factory.Config{
		SchedulerCache: scache,
		NodeLister:     &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
		Algorithm:      algo,
		GetBinder: func(pod *v1.Pod) factory.Binder {
			return fakeBinder{func(b *v1.Binding) error {
				bindingChan <- b
				return nil
			}}
		},
		NextPod: func() *v1.Pod {
			return clientcache.Pop(queuedPodStore).(*v1.Pod)
		},
		Error: func(p *v1.Pod, err error) {
			errChan <- err
		},
		Recorder:            &record.FakeRecorder{},
		PodConditionUpdater: fakePodConditionUpdater{},
		PodPreemptor:        fakePodPreemptor{},
		Framework:           EmptyFramework,
		VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
	}
	if recorder != nil {
		config.Recorder = recorder
	}
	sched := NewFromConfig(config)
	return sched, bindingChan, errChan
}
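// setupTestSchedulerLongBindingWithRetry is like setupTestScheduler, but its
// binder sleeps for bindingTime before each binding succeeds, and failed pods
// are requeued so scheduling is retried.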
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
	framework, _ := framework.NewFramework(EmptyPluginRegistry, nil, []kubeschedulerconfig.PluginConfig{})
	algo := core.NewGenericScheduler(
		scache,
		internalqueue.NewSchedulingQueue(nil, nil),
		predicateMap,
		predicates.EmptyPredicateMetadataProducer,
		[]priorities.PriorityConfig{},
		priorities.EmptyPriorityMetadataProducer,
		framework,
		[]algorithm.SchedulerExtender{},
		nil,
		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
		false,
		false,
		schedulerapi.DefaultPercentageOfNodesToScore,
		false,
	)
	bindingChan := make(chan *v1.Binding, 2)

	sched := NewFromConfig(&factory.Config{
		SchedulerCache: scache,
		NodeLister:     &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
		Algorithm:      algo,
		GetBinder: func(pod *v1.Pod) factory.Binder {
			return fakeBinder{func(b *v1.Binding) error {
				time.Sleep(bindingTime)
				bindingChan <- b
				return nil
			}}
		},
		WaitForCacheSync: func() bool {
			return true
		},
		NextPod: func() *v1.Pod {
			return clientcache.Pop(queuedPodStore).(*v1.Pod)
		},
		Error: func(p *v1.Pod, err error) {
			queuedPodStore.AddIfNotPresent(p)
		},
		Recorder:            &record.FakeRecorder{},
		PodConditionUpdater: fakePodConditionUpdater{},
		PodPreemptor:        fakePodPreemptor{},
		StopEverything:      stop,
		Framework:           framework,
		VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
	})

	return sched, bindingChan
}
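// setupTestSchedulerWithVolumeBinding sets up a scheduler whose only predicate
// is CheckVolumeBinding, backed by the given fake volume binder, and enqueues
// one pod ("foo") that references PVC "testPVC".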
func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster record.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	pod := podWithID("foo", "")
	pod.Namespace = "foo-ns"
	pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: "testVol",
		VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "testPVC"}}})
	queuedPodStore.Add(pod)
	scache := internalcache.New(10*time.Minute, stop)
	scache.AddNode(&testNode)
	testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
	client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	predicateMap := map[string]predicates.FitPredicate{
		predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
	}

	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"})
	s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder)
	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)
	s.config.VolumeBinder = fakeVolumeBinder
	return s, bindingChan, errChan
}
// makePredicateError is a workaround: golint complains that error strings must
// not end with punctuation, but the real predicate error message does end with
// a period.
func makePredicateError(failReason string) error {
	s := fmt.Sprintf("0/1 nodes are available: %v.", failReason)
	return fmt.Errorf(s)
}
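// For example, makePredicateError("1 node(s) had volume node affinity conflict")
// produces the error "0/1 nodes are available: 1 node(s) had volume node
// affinity conflict.", matching the FitError messages checked in the table
// below.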
func TestSchedulerWithVolumeBinding(t *testing.T) {
	findErr := fmt.Errorf("find err")
	assumeErr := fmt.Errorf("assume err")
	bindErr := fmt.Errorf("bind err")

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(t.Logf).Stop()

	// This can be small because we wait for the pod to finish scheduling first.
	chanTimeout := 2 * time.Second

	// Note: "Satsified" below mirrors the actual (misspelled) field names in
	// volumescheduling.FakeVolumeBinderConfig.
	table := []struct {
		name               string
		expectError        error
		expectPodBind      *v1.Binding
		expectAssumeCalled bool
		expectBindCalled   bool
		eventReason        string
		volumeBinderConfig *volumescheduling.FakeVolumeBinderConfig
	}{
		{
			name: "all bound",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				AllBound:             true,
				FindUnboundSatsified: true,
				FindBoundSatsified:   true,
			},
			expectAssumeCalled: true,
			expectPodBind:      &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
			eventReason:        "Scheduled",
		},
		{
			name: "bound/invalid pv affinity",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				AllBound:             true,
				FindUnboundSatsified: true,
				FindBoundSatsified:   false,
			},
			eventReason: "FailedScheduling",
			expectError: makePredicateError("1 node(s) had volume node affinity conflict"),
		},
		{
			name: "unbound/no matches",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: false,
				FindBoundSatsified:   true,
			},
			eventReason: "FailedScheduling",
			expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind"),
		},
		{
			name: "bound and unbound unsatisfied",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: false,
				FindBoundSatsified:   false,
			},
			eventReason: "FailedScheduling",
			expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"),
		},
		{
			name: "unbound/found matches/bind succeeds",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: true,
				FindBoundSatsified:   true,
			},
			expectAssumeCalled: true,
			expectBindCalled:   true,
			expectPodBind:      &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
			eventReason:        "Scheduled",
		},
		{
			name: "predicate error",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindErr: findErr,
			},
			eventReason: "FailedScheduling",
			expectError: findErr,
		},
		{
			name: "assume error",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: true,
				FindBoundSatsified:   true,
				AssumeErr:            assumeErr,
			},
			expectAssumeCalled: true,
			eventReason:        "FailedScheduling",
			expectError:        assumeErr,
		},
		{
			name: "bind error",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: true,
				FindBoundSatsified:   true,
				BindErr:              bindErr,
			},
			expectAssumeCalled: true,
			expectBindCalled:   true,
			eventReason:        "FailedScheduling",
			expectError:        bindErr,
		},
	}
	for _, item := range table {
		t.Run(item.name, func(t *testing.T) {
			stop := make(chan struct{})
			fakeVolumeBinder := volumebinder.NewFakeVolumeBinder(item.volumeBinderConfig)
			internalBinder, ok := fakeVolumeBinder.Binder.(*volumescheduling.FakeVolumeBinder)
			if !ok {
				t.Fatalf("Failed to get fake volume binder")
			}
			s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster)

			eventChan := make(chan struct{})
			events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) {
				if e, a := item.eventReason, e.Reason; e != a {
					t.Errorf("expected %v, got %v", e, a)
				}
				close(eventChan)
			})

			s.scheduleOne()

			// Wait for the pod to succeed or fail scheduling.
			select {
			case <-eventChan:
			case <-time.After(wait.ForeverTestTimeout):
				t.Fatalf("scheduling timeout after %v", wait.ForeverTestTimeout)
			}
			events.Stop()

			// Wait for scheduling to return an error.
			select {
			case err := <-errChan:
				if item.expectError == nil || !reflect.DeepEqual(item.expectError.Error(), err.Error()) {
					t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectError, err)
				}
			case <-time.After(chanTimeout):
				if item.expectError != nil {
					t.Errorf("did not receive error after %v", chanTimeout)
				}
			}

			// Wait for the pod binding to succeed.
			select {
			case b := <-bindingChan:
				if !reflect.DeepEqual(item.expectPodBind, b) {
					t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectPodBind, b)
				}
			case <-time.After(chanTimeout):
				if item.expectPodBind != nil {
					t.Errorf("did not receive pod binding after %v", chanTimeout)
				}
			}

			if item.expectAssumeCalled != internalBinder.AssumeCalled {
				t.Errorf("expected AssumeCalled=%v, got %v", item.expectAssumeCalled, internalBinder.AssumeCalled)
			}
			if item.expectBindCalled != internalBinder.BindCalled {
				t.Errorf("expected BindCalled=%v, got %v", item.expectBindCalled, internalBinder.BindCalled)
			}
			close(stop)
		})
	}
}
func TestInitPolicyFromFile(t *testing.T) {
	dir, err := ioutil.TempDir(os.TempDir(), "policy")
	if err != nil {
		t.Errorf("unexpected error: %v", err)
	}
	defer os.RemoveAll(dir)
	for i, test := range []struct {
		policy               string
		expectedPredicates   sets.String
		expectedPrioritizers sets.String
	}{
		// Test JSON-format policy file.
		{
			policy: `{
				"kind" : "Policy",
				"apiVersion" : "v1",
				"predicates" : [
					{"name" : "PredicateOne"},
					{"name" : "PredicateTwo"}
				],
				"priorities" : [
					{"name" : "PriorityOne", "weight" : 1},
					{"name" : "PriorityTwo", "weight" : 5}
				]
			}`,
			expectedPredicates: sets.NewString(
				"PredicateOne",
				"PredicateTwo",
			),
			expectedPrioritizers: sets.NewString(
				"PriorityOne",
				"PriorityTwo",
			),
		},
		// Test YAML-format policy file. Note the raw string must not be
		// indented, since YAML is whitespace-sensitive.
		{
			policy: `apiVersion: v1
kind: Policy
predicates:
- name: PredicateOne
- name: PredicateTwo
priorities:
- name: PriorityOne
  weight: 1
- name: PriorityTwo
  weight: 5
`,
			expectedPredicates: sets.NewString(
				"PredicateOne",
				"PredicateTwo",
			),
			expectedPrioritizers: sets.NewString(
				"PriorityOne",
				"PriorityTwo",
			),
		},
	} {
		file := fmt.Sprintf("scheduler-policy-config-file-%d", i)
		fullPath := path.Join(dir, file)

		if err := ioutil.WriteFile(fullPath, []byte(test.policy), 0644); err != nil {
			t.Fatalf("Failed writing a policy config file: %v", err)
		}

		policy := &schedulerapi.Policy{}

		if err := initPolicyFromFile(fullPath, policy); err != nil {
			t.Fatalf("Failed to initialize policy from file: %v", err)
		}

		// Verify that the policy is initialized correctly.
		schedPredicates := sets.NewString()
		for _, p := range policy.Predicates {
			schedPredicates.Insert(p.Name)
		}
		schedPrioritizers := sets.NewString()
		for _, p := range policy.Priorities {
			schedPrioritizers.Insert(p.Name)
		}
		if !schedPredicates.Equal(test.expectedPredicates) {
			t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
		}
		if !schedPrioritizers.Equal(test.expectedPrioritizers) {
			t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers)
		}
	}
}