scheduler_test.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "reflect"
  22. "testing"
  23. "time"
  24. "github.com/google/go-cmp/cmp"
  25. v1 "k8s.io/api/core/v1"
  26. "k8s.io/api/events/v1beta1"
  27. "k8s.io/apimachinery/pkg/api/resource"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/labels"
  30. "k8s.io/apimachinery/pkg/runtime"
  31. "k8s.io/apimachinery/pkg/types"
  32. "k8s.io/apimachinery/pkg/util/sets"
  33. "k8s.io/apimachinery/pkg/util/wait"
  34. "k8s.io/client-go/informers"
  35. clientsetfake "k8s.io/client-go/kubernetes/fake"
  36. "k8s.io/client-go/kubernetes/scheme"
  37. clienttesting "k8s.io/client-go/testing"
  38. clientcache "k8s.io/client-go/tools/cache"
  39. "k8s.io/client-go/tools/events"
  40. volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
  41. schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
  42. "k8s.io/kubernetes/pkg/scheduler/core"
  43. frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
  44. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
  45. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
  46. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
  47. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
  48. "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
  49. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  50. internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
  51. fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
  52. internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
  53. st "k8s.io/kubernetes/pkg/scheduler/testing"
  54. "k8s.io/kubernetes/pkg/scheduler/volumebinder"
  55. )
  56. type fakePodConditionUpdater struct{}
  57. func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondition) error {
  58. return nil
  59. }
  60. type fakePodPreemptor struct{}
  61. func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
  62. return pod, nil
  63. }
  64. func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error {
  65. return nil
  66. }
  67. func (fp fakePodPreemptor) setNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
  68. return nil
  69. }
  70. func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error {
  71. return nil
  72. }
  73. func podWithID(id, desiredHost string) *v1.Pod {
  74. return &v1.Pod{
  75. ObjectMeta: metav1.ObjectMeta{
  76. Name: id,
  77. UID: types.UID(id),
  78. SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
  79. },
  80. Spec: v1.PodSpec{
  81. NodeName: desiredHost,
  82. },
  83. }
  84. }
  85. func deletingPod(id string) *v1.Pod {
  86. deletionTimestamp := metav1.Now()
  87. return &v1.Pod{
  88. ObjectMeta: metav1.ObjectMeta{
  89. Name: id,
  90. UID: types.UID(id),
  91. DeletionTimestamp: &deletionTimestamp,
  92. SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
  93. },
  94. Spec: v1.PodSpec{
  95. NodeName: "",
  96. },
  97. }
  98. }
  99. func podWithPort(id, desiredHost string, port int) *v1.Pod {
  100. pod := podWithID(id, desiredHost)
  101. pod.Spec.Containers = []v1.Container{
  102. {Name: "ctr", Ports: []v1.ContainerPort{{HostPort: int32(port)}}},
  103. }
  104. return pod
  105. }
  106. func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v1.ResourceList) *v1.Pod {
  107. pod := podWithID(id, desiredHost)
  108. pod.Spec.Containers = []v1.Container{
  109. {Name: "ctr", Resources: v1.ResourceRequirements{Limits: limits, Requests: requests}},
  110. }
  111. return pod
  112. }
  113. type mockScheduler struct {
  114. result core.ScheduleResult
  115. err error
  116. }
  117. func (es mockScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
  118. return es.result, es.err
  119. }
  120. func (es mockScheduler) Extenders() []core.SchedulerExtender {
  121. return nil
  122. }
  123. func (es mockScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
  124. return nil, nil, nil, nil
  125. }
  126. func (es mockScheduler) Snapshot() error {
  127. return nil
  128. }
  129. func (es mockScheduler) Framework() framework.Framework {
  130. return nil
  131. }
  132. func TestSchedulerCreation(t *testing.T) {
  133. client := clientsetfake.NewSimpleClientset()
  134. informerFactory := informers.NewSharedInformerFactory(client, 0)
  135. eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
  136. stopCh := make(chan struct{})
  137. defer close(stopCh)
  138. _, err := New(client,
  139. informerFactory,
  140. NewPodInformer(client, 0),
  141. eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
  142. stopCh,
  143. WithPodInitialBackoffSeconds(1),
  144. WithPodMaxBackoffSeconds(10),
  145. )
  146. if err != nil {
  147. t.Fatalf("Failed to create scheduler: %v", err)
  148. }
  149. // Test case for when a plugin name in frameworkOutOfTreeRegistry already exist in defaultRegistry.
  150. fakeFrameworkPluginName := ""
  151. for name := range frameworkplugins.NewInTreeRegistry() {
  152. fakeFrameworkPluginName = name
  153. break
  154. }
  155. registryFake := map[string]framework.PluginFactory{
  156. fakeFrameworkPluginName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
  157. return nil, nil
  158. },
  159. }
  160. _, err = New(client,
  161. informerFactory,
  162. NewPodInformer(client, 0),
  163. eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
  164. stopCh,
  165. WithPodInitialBackoffSeconds(1),
  166. WithPodMaxBackoffSeconds(10),
  167. WithFrameworkOutOfTreeRegistry(registryFake),
  168. )
  169. if err == nil {
  170. t.Fatalf("Create scheduler should fail")
  171. }
  172. }
// TestSchedulerScheduleOne drives a single scheduleOne() pass against a mock
// scheduling algorithm and a fake cache, and verifies the resulting
// assume/bind/forget/error interactions and the emitted event reason for four
// cases: successful bind, scheduling failure, bind failure, and a pod that is
// already being deleted.
func TestSchedulerScheduleOne(t *testing.T) {
	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	client := clientsetfake.NewSimpleClientset(&testNode)
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
	errS := errors.New("scheduler")
	errB := errors.New("binder")

	table := []struct {
		name             string
		injectBindError  error      // error returned by the fake binding reactor
		sendPod          *v1.Pod    // pod handed to scheduleOne via NextPod
		algo             core.ScheduleAlgorithm
		expectErrorPod   *v1.Pod    // pod passed to the Error handler, if any
		expectForgetPod  *v1.Pod    // pod forgotten from the cache, if any
		expectAssumedPod *v1.Pod    // pod assumed into the cache, if any
		expectError      error
		expectBind       *v1.Binding
		eventReason      string
	}{
		{
			name:             "bind assumed pod scheduled",
			sendPod:          podWithID("foo", ""),
			algo:             mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
			expectAssumedPod: podWithID("foo", testNode.Name),
			eventReason:      "Scheduled",
		},
		{
			name:           "error pod failed scheduling",
			sendPod:        podWithID("foo", ""),
			algo:           mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
			expectError:    errS,
			expectErrorPod: podWithID("foo", ""),
			eventReason:    "FailedScheduling",
		},
		{
			name:             "error bind forget pod failed scheduling",
			sendPod:          podWithID("foo", ""),
			algo:             mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
			expectAssumedPod: podWithID("foo", testNode.Name),
			injectBindError:  errB,
			expectError:      errors.New("plugin \"DefaultBinder\" failed to bind pod \"/foo\": binder"),
			expectErrorPod:   podWithID("foo", testNode.Name),
			expectForgetPod:  podWithID("foo", testNode.Name),
			eventReason:      "FailedScheduling",
		}, {
			name:        "deleting pod",
			sendPod:     deletingPod("foo"),
			algo:        mockScheduler{core.ScheduleResult{}, nil},
			eventReason: "FailedScheduling",
		},
	}

	stop := make(chan struct{})
	defer close(stop)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)

	for _, item := range table {
		t.Run(item.name, func(t *testing.T) {
			var gotError error
			var gotPod *v1.Pod
			var gotForgetPod *v1.Pod
			var gotAssumedPod *v1.Pod
			var gotBinding *v1.Binding
			// Fake cache records what scheduleOne assumes/forgets so we can
			// assert on it after the run.
			sCache := &fakecache.Cache{
				ForgetFunc: func(pod *v1.Pod) {
					gotForgetPod = pod
				},
				AssumeFunc: func(pod *v1.Pod) {
					gotAssumedPod = pod
				},
				IsAssumedPodFunc: func(pod *v1.Pod) bool {
					if pod == nil || gotAssumedPod == nil {
						return false
					}
					return pod.UID == gotAssumedPod.UID
				},
			}
			client := clientsetfake.NewSimpleClientset(item.sendPod)
			// Intercept the pods/binding create to capture the binding object
			// and optionally inject a bind error.
			client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
				if action.GetSubresource() != "binding" {
					return false, nil, nil
				}
				gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
				return true, gotBinding, item.injectBindError
			})
			fwk, err := st.NewFramework([]st.RegisterPluginFunc{
				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
			}, framework.WithClientSet(client))
			if err != nil {
				t.Fatal(err)
			}

			s := &Scheduler{
				SchedulerCache:      sCache,
				Algorithm:           item.algo,
				podConditionUpdater: fakePodConditionUpdater{},
				Error: func(p *framework.PodInfo, err error) {
					gotPod = p.Pod
					gotError = err
				},
				NextPod: func() *framework.PodInfo {
					return &framework.PodInfo{Pod: item.sendPod}
				},
				Framework:    fwk,
				Recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
				VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
			}
			called := make(chan struct{})
			// Each case emits exactly one event; its reason tells success from
			// failure.
			stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
				e, _ := obj.(*v1beta1.Event)
				if e.Reason != item.eventReason {
					t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
				}
				close(called)
			})
			s.scheduleOne(context.Background())
			// Wait for the event before inspecting the recorded side effects.
			<-called
			if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
				t.Errorf("assumed pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) {
				t.Errorf("error pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectForgetPod, gotForgetPod; !reflect.DeepEqual(e, a) {
				t.Errorf("forget pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) {
				t.Errorf("error: wanted %v, got %v", e, a)
			}
			if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
				t.Errorf("got binding diff (-want, +got): %s", diff)
			}
			stopFunc()
		})
	}
}
  310. func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
  311. stop := make(chan struct{})
  312. defer close(stop)
  313. queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
  314. scache := internalcache.New(100*time.Millisecond, stop)
  315. pod := podWithPort("pod.Name", "", 8080)
  316. node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
  317. scache.AddNode(&node)
  318. client := clientsetfake.NewSimpleClientset(&node)
  319. informerFactory := informers.NewSharedInformerFactory(client, 0)
  320. fns := []st.RegisterPluginFunc{
  321. st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
  322. st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
  323. st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
  324. }
  325. scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, pod, &node, fns...)
  326. waitPodExpireChan := make(chan struct{})
  327. timeout := make(chan struct{})
  328. errChan := make(chan error)
  329. go func() {
  330. for {
  331. select {
  332. case <-timeout:
  333. return
  334. default:
  335. }
  336. pods, err := scache.List(labels.Everything())
  337. if err != nil {
  338. errChan <- fmt.Errorf("cache.List failed: %v", err)
  339. return
  340. }
  341. if len(pods) == 0 {
  342. close(waitPodExpireChan)
  343. return
  344. }
  345. time.Sleep(100 * time.Millisecond)
  346. }
  347. }()
  348. // waiting for the assumed pod to expire
  349. select {
  350. case err := <-errChan:
  351. t.Fatal(err)
  352. case <-waitPodExpireChan:
  353. case <-time.After(wait.ForeverTestTimeout):
  354. close(timeout)
  355. t.Fatalf("timeout timeout in waiting pod expire after %v", wait.ForeverTestTimeout)
  356. }
  357. // We use conflicted pod ports to incur fit predicate failure if first pod not removed.
  358. secondPod := podWithPort("bar", "", 8080)
  359. queuedPodStore.Add(secondPod)
  360. scheduler.scheduleOne(context.Background())
  361. select {
  362. case b := <-bindingChan:
  363. expectBinding := &v1.Binding{
  364. ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
  365. Target: v1.ObjectReference{Kind: "Node", Name: node.Name},
  366. }
  367. if !reflect.DeepEqual(expectBinding, b) {
  368. t.Errorf("binding want=%v, get=%v", expectBinding, b)
  369. }
  370. case <-time.After(wait.ForeverTestTimeout):
  371. t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
  372. }
  373. }
// TestSchedulerNoPhantomPodAfterDelete verifies that while an assumed pod
// occupies a host port, a second pod wanting the same port fails with a
// FitError, and that after the first pod is added and then removed from the
// cache (mimicking a user delete), the second pod schedules successfully.
func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	// Long TTL: the assumed pod will NOT expire on its own during this test.
	scache := internalcache.New(10*time.Minute, stop)
	firstPod := podWithPort("pod.Name", "", 8080)
	node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	scache.AddNode(&node)
	client := clientsetfake.NewSimpleClientset(&node)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	fns := []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
	}
	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, firstPod, &node, fns...)

	// We use conflicted pod ports to incur fit predicate failure.
	secondPod := podWithPort("bar", "", 8080)
	queuedPodStore.Add(secondPod)
	// queuedPodStore: [bar:8080]
	// cache: [(assumed)foo:8080]
	scheduler.scheduleOne(context.Background())
	select {
	case err := <-errChan:
		// The port conflict must surface as an unschedulable FitError on the
		// single node.
		expectErr := &core.FitError{
			Pod:         secondPod,
			NumAllNodes: 1,
			FilteredNodesStatuses: framework.NodeToStatusMap{
				node.Name: framework.NewStatus(
					framework.Unschedulable,
					nodeports.ErrReason,
				),
			},
		}
		if !reflect.DeepEqual(expectErr, err) {
			t.Errorf("err want=%v, get=%v", expectErr, err)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout in fitting after %v", wait.ForeverTestTimeout)
	}

	// We mimic the workflow of cache behavior when a pod is removed by user.
	// Note: if the schedulernodeinfo timeout would be super short, the first pod would expire
	// and would be removed itself (without any explicit actions on schedulernodeinfo). Even in that case,
	// explicitly AddPod will as well correct the behavior.
	firstPod.Spec.NodeName = node.Name
	if err := scache.AddPod(firstPod); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := scache.RemovePod(firstPod); err != nil {
		t.Fatalf("err: %v", err)
	}

	// With the conflicting pod gone, the second pod must now bind to the node.
	queuedPodStore.Add(secondPod)
	scheduler.scheduleOne(context.Background())
	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if !reflect.DeepEqual(expectBinding, b) {
			t.Errorf("binding want=%v, get=%v", expectBinding, b)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
	}
}
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
//
// setupTestSchedulerWithOnePodOnNode builds a test scheduler (see
// setupTestScheduler), runs one scheduleOne pass for pod, and verifies that
// the expected binding to node was produced before returning. On return the
// pod is assumed in scache.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
	informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
	scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...)

	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)

	queuedPodStore.Add(pod)
	// queuedPodStore: [foo:8080]
	// cache: []
	scheduler.scheduleOne(context.Background())
	// queuedPodStore: []
	// cache: [(assumed)foo:8080]
	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: pod.Name, UID: types.UID(pod.Name)},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if !reflect.DeepEqual(expectBinding, b) {
			t.Errorf("binding want=%v, get=%v", expectBinding, b)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
	}
	return scheduler, bindingChan, errChan
}
// TestSchedulerFailedSchedulingReasons schedules a pod whose resource requests
// exceed every node's capacity, and verifies that the resulting FitError
// aggregates per-node Insufficient cpu/memory statuses without producing an
// overly long (spammy) message.
func TestSchedulerFailedSchedulingReasons(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	scache := internalcache.New(10*time.Minute, stop)

	// Design the baseline for the pods, and we will make nodes that dont fit it later.
	var cpu = int64(4)
	var mem = int64(500)
	podWithTooBigResourceRequests := podWithResources("bar", "", v1.ResourceList{
		v1.ResourceCPU:    *(resource.NewQuantity(cpu, resource.DecimalSI)),
		v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
	}, v1.ResourceList{
		v1.ResourceCPU:    *(resource.NewQuantity(cpu, resource.DecimalSI)),
		v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
	})

	// create several nodes which cannot schedule the above pod
	// (half the CPU and a fifth of the memory the pod requests).
	var nodes []*v1.Node
	var objects []runtime.Object
	for i := 0; i < 100; i++ {
		uid := fmt.Sprintf("machine%v", i)
		node := v1.Node{
			ObjectMeta: metav1.ObjectMeta{Name: uid, UID: types.UID(uid)},
			Status: v1.NodeStatus{
				Capacity: v1.ResourceList{
					v1.ResourceCPU:    *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
					v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
					v1.ResourcePods:   *(resource.NewQuantity(10, resource.DecimalSI)),
				},
				Allocatable: v1.ResourceList{
					v1.ResourceCPU:    *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
					v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
					v1.ResourcePods:   *(resource.NewQuantity(10, resource.DecimalSI)),
				}},
		}
		scache.AddNode(&node)
		nodes = append(nodes, &node)
		objects = append(objects, &node)
	}
	client := clientsetfake.NewSimpleClientset(objects...)
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
	failedNodeStatues := framework.NodeToStatusMap{}
	for _, node := range nodes {
		failedNodeStatues[node.Name] = framework.NewStatus(
			framework.Unschedulable,
			fmt.Sprintf("Insufficient %v", v1.ResourceCPU),
			fmt.Sprintf("Insufficient %v", v1.ResourceMemory),
		)
	}
	fns := []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		st.RegisterPluginAsExtensions(noderesources.FitName, noderesources.NewFit, "Filter", "PreFilter"),
	}
	scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...)

	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)

	queuedPodStore.Add(podWithTooBigResourceRequests)
	scheduler.scheduleOne(context.Background())
	select {
	case err := <-errChan:
		expectErr := &core.FitError{
			Pod:                   podWithTooBigResourceRequests,
			NumAllNodes:           len(nodes),
			FilteredNodesStatuses: failedNodeStatues,
		}
		// The rendered error must stay short despite 100 failing nodes.
		if len(fmt.Sprint(expectErr)) > 150 {
			t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr)))
		}
		if !reflect.DeepEqual(expectErr, err) {
			t.Errorf("\n err \nWANT=%+v,\nGOT=%+v", expectErr, err)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
	}
}
  543. // queuedPodStore: pods queued before processing.
  544. // scache: scheduler cache that might contain assumed pods.
  545. func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, recorder events.EventRecorder, fakeVolumeBinder *volumebinder.VolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
  546. if fakeVolumeBinder == nil {
  547. // Create default volume binder if it didn't set.
  548. fakeVolumeBinder = volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true})
  549. }
  550. bindingChan := make(chan *v1.Binding, 1)
  551. client := clientsetfake.NewSimpleClientset()
  552. client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
  553. var b *v1.Binding
  554. if action.GetSubresource() == "binding" {
  555. b := action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
  556. bindingChan <- b
  557. }
  558. return true, b, nil
  559. })
  560. fwk, _ := st.NewFramework(fns, framework.WithClientSet(client), framework.WithVolumeBinder(fakeVolumeBinder))
  561. algo := core.NewGenericScheduler(
  562. scache,
  563. internalqueue.NewSchedulingQueue(nil),
  564. internalcache.NewEmptySnapshot(),
  565. fwk,
  566. []core.SchedulerExtender{},
  567. nil,
  568. informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
  569. informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
  570. false,
  571. schedulerapi.DefaultPercentageOfNodesToScore,
  572. false,
  573. )
  574. errChan := make(chan error, 1)
  575. sched := &Scheduler{
  576. SchedulerCache: scache,
  577. Algorithm: algo,
  578. NextPod: func() *framework.PodInfo {
  579. return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)}
  580. },
  581. Error: func(p *framework.PodInfo, err error) {
  582. errChan <- err
  583. },
  584. Recorder: &events.FakeRecorder{},
  585. podConditionUpdater: fakePodConditionUpdater{},
  586. podPreemptor: fakePodPreemptor{},
  587. Framework: fwk,
  588. VolumeBinder: fakeVolumeBinder,
  589. }
  590. if recorder != nil {
  591. sched.Recorder = recorder
  592. }
  593. return sched, bindingChan, errChan
  594. }
// setupTestSchedulerWithVolumeBinding builds a test scheduler (see
// setupTestScheduler) whose single queued pod references a PVC, with the
// volumebinding filter plugin registered and the given fake volume binder
// installed. One node ("machine1") and the matching PVC exist in the fake
// clientset.
func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	// The pod mounts a PVC so the volumebinding filter has work to do.
	pod := podWithID("foo", "")
	pod.Namespace = "foo-ns"
	pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: "testVol",
		VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "testPVC"}}})
	queuedPodStore.Add(pod)
	scache := internalcache.New(10*time.Minute, stop)
	scache.AddNode(&testNode)
	testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
	client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	recorder := broadcaster.NewRecorder(scheme.Scheme, "scheduler")
	fns := []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		st.RegisterFilterPlugin(volumebinding.Name, volumebinding.New),
	}
	s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, recorder, fakeVolumeBinder, fns...)
	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)
	// NOTE(review): setupTestScheduler already installs fakeVolumeBinder on
	// the Scheduler; this reassignment looks redundant — confirm and remove.
	s.VolumeBinder = fakeVolumeBinder
	return s, bindingChan, errChan
}
  620. // This is a workaround because golint complains that errors cannot
  621. // end with punctuation. However, the real predicate error message does
  622. // end with a period.
  623. func makePredicateError(failReason string) error {
  624. s := fmt.Sprintf("0/1 nodes are available: %v.", failReason)
  625. return fmt.Errorf(s)
  626. }
  627. func TestSchedulerWithVolumeBinding(t *testing.T) {
  628. findErr := fmt.Errorf("find err")
  629. assumeErr := fmt.Errorf("assume err")
  630. bindErr := fmt.Errorf("bind err")
  631. client := clientsetfake.NewSimpleClientset()
  632. eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
  633. // This can be small because we wait for pod to finish scheduling first
  634. chanTimeout := 2 * time.Second
  635. table := []struct {
  636. name string
  637. expectError error
  638. expectPodBind *v1.Binding
  639. expectAssumeCalled bool
  640. expectBindCalled bool
  641. eventReason string
  642. volumeBinderConfig *volumescheduling.FakeVolumeBinderConfig
  643. }{
  644. {
  645. name: "all bound",
  646. volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
  647. AllBound: true,
  648. FindUnboundSatsified: true,
  649. FindBoundSatsified: true,
  650. },
  651. expectAssumeCalled: true,
  652. expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
  653. eventReason: "Scheduled",
  654. },
  655. {
  656. name: "bound/invalid pv affinity",
  657. volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
  658. AllBound: true,
  659. FindUnboundSatsified: true,
  660. FindBoundSatsified: false,
  661. },
  662. eventReason: "FailedScheduling",
  663. expectError: makePredicateError("1 node(s) had volume node affinity conflict"),
  664. },
  665. {
  666. name: "unbound/no matches",
  667. volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
  668. FindUnboundSatsified: false,
  669. FindBoundSatsified: true,
  670. },
  671. eventReason: "FailedScheduling",
  672. expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind"),
  673. },
  674. {
  675. name: "bound and unbound unsatisfied",
  676. volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
  677. FindUnboundSatsified: false,
  678. FindBoundSatsified: false,
  679. },
  680. eventReason: "FailedScheduling",
  681. expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"),
  682. },
  683. {
  684. name: "unbound/found matches/bind succeeds",
  685. volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
  686. FindUnboundSatsified: true,
  687. FindBoundSatsified: true,
  688. },
  689. expectAssumeCalled: true,
  690. expectBindCalled: true,
  691. expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
  692. eventReason: "Scheduled",
  693. },
  694. {
  695. name: "predicate error",
  696. volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
  697. FindErr: findErr,
  698. },
  699. eventReason: "FailedScheduling",
  700. expectError: fmt.Errorf("running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
  701. },
  702. {
  703. name: "assume error",
  704. volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
  705. FindUnboundSatsified: true,
  706. FindBoundSatsified: true,
  707. AssumeErr: assumeErr,
  708. },
  709. expectAssumeCalled: true,
  710. eventReason: "FailedScheduling",
  711. expectError: assumeErr,
  712. },
  713. {
  714. name: "bind error",
  715. volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
  716. FindUnboundSatsified: true,
  717. FindBoundSatsified: true,
  718. BindErr: bindErr,
  719. },
  720. expectAssumeCalled: true,
  721. expectBindCalled: true,
  722. eventReason: "FailedScheduling",
  723. expectError: bindErr,
  724. },
  725. }
  726. for _, item := range table {
  727. t.Run(item.name, func(t *testing.T) {
  728. stop := make(chan struct{})
  729. fakeVolumeBinder := volumebinder.NewFakeVolumeBinder(item.volumeBinderConfig)
  730. internalBinder, ok := fakeVolumeBinder.Binder.(*volumescheduling.FakeVolumeBinder)
  731. if !ok {
  732. t.Fatalf("Failed to get fake volume binder")
  733. }
  734. s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster)
  735. eventChan := make(chan struct{})
  736. stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
  737. e, _ := obj.(*v1beta1.Event)
  738. if e, a := item.eventReason, e.Reason; e != a {
  739. t.Errorf("expected %v, got %v", e, a)
  740. }
  741. close(eventChan)
  742. })
  743. s.scheduleOne(context.Background())
  744. // Wait for pod to succeed or fail scheduling
  745. select {
  746. case <-eventChan:
  747. case <-time.After(wait.ForeverTestTimeout):
  748. t.Fatalf("scheduling timeout after %v", wait.ForeverTestTimeout)
  749. }
  750. stopFunc()
  751. // Wait for scheduling to return an error
  752. select {
  753. case err := <-errChan:
  754. if item.expectError == nil || !reflect.DeepEqual(item.expectError.Error(), err.Error()) {
  755. t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectError, err)
  756. }
  757. case <-time.After(chanTimeout):
  758. if item.expectError != nil {
  759. t.Errorf("did not receive error after %v", chanTimeout)
  760. }
  761. }
  762. // Wait for pod to succeed binding
  763. select {
  764. case b := <-bindingChan:
  765. if !reflect.DeepEqual(item.expectPodBind, b) {
  766. t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectPodBind, b)
  767. }
  768. case <-time.After(chanTimeout):
  769. if item.expectPodBind != nil {
  770. t.Errorf("did not receive pod binding after %v", chanTimeout)
  771. }
  772. }
  773. if item.expectAssumeCalled != internalBinder.AssumeCalled {
  774. t.Errorf("expectedAssumeCall %v", item.expectAssumeCalled)
  775. }
  776. if item.expectBindCalled != internalBinder.BindCalled {
  777. t.Errorf("expectedBindCall %v", item.expectBindCalled)
  778. }
  779. close(stop)
  780. })
  781. }
  782. }
  783. func TestInitPolicyFromFile(t *testing.T) {
  784. dir, err := ioutil.TempDir(os.TempDir(), "policy")
  785. if err != nil {
  786. t.Errorf("unexpected error: %v", err)
  787. }
  788. defer os.RemoveAll(dir)
  789. for i, test := range []struct {
  790. policy string
  791. expectedPredicates sets.String
  792. }{
  793. // Test json format policy file
  794. {
  795. policy: `{
  796. "kind" : "Policy",
  797. "apiVersion" : "v1",
  798. "predicates" : [
  799. {"name" : "PredicateOne"},
  800. {"name" : "PredicateTwo"}
  801. ],
  802. "priorities" : [
  803. {"name" : "PriorityOne", "weight" : 1},
  804. {"name" : "PriorityTwo", "weight" : 5}
  805. ]
  806. }`,
  807. expectedPredicates: sets.NewString(
  808. "PredicateOne",
  809. "PredicateTwo",
  810. ),
  811. },
  812. // Test yaml format policy file
  813. {
  814. policy: `apiVersion: v1
  815. kind: Policy
  816. predicates:
  817. - name: PredicateOne
  818. - name: PredicateTwo
  819. priorities:
  820. - name: PriorityOne
  821. weight: 1
  822. - name: PriorityTwo
  823. weight: 5
  824. `,
  825. expectedPredicates: sets.NewString(
  826. "PredicateOne",
  827. "PredicateTwo",
  828. ),
  829. },
  830. } {
  831. file := fmt.Sprintf("scheduler-policy-config-file-%d", i)
  832. fullPath := path.Join(dir, file)
  833. if err := ioutil.WriteFile(fullPath, []byte(test.policy), 0644); err != nil {
  834. t.Fatalf("Failed writing a policy config file: %v", err)
  835. }
  836. policy := &schedulerapi.Policy{}
  837. if err := initPolicyFromFile(fullPath, policy); err != nil {
  838. t.Fatalf("Failed writing a policy config file: %v", err)
  839. }
  840. // Verify that the policy is initialized correctly.
  841. schedPredicates := sets.NewString()
  842. for _, p := range policy.Predicates {
  843. schedPredicates.Insert(p.Name)
  844. }
  845. schedPrioritizers := sets.NewString()
  846. for _, p := range policy.Priorities {
  847. schedPrioritizers.Insert(p.Name)
  848. }
  849. if !schedPredicates.Equal(test.expectedPredicates) {
  850. t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
  851. }
  852. }
  853. }
  854. func TestSchedulerBinding(t *testing.T) {
  855. table := []struct {
  856. podName string
  857. extenders []core.SchedulerExtender
  858. wantBinderID int
  859. name string
  860. }{
  861. {
  862. name: "the extender is not a binder",
  863. podName: "pod0",
  864. extenders: []core.SchedulerExtender{
  865. &fakeExtender{isBinder: false, interestedPodName: "pod0"},
  866. },
  867. wantBinderID: -1, // default binding.
  868. },
  869. {
  870. name: "one of the extenders is a binder and interested in pod",
  871. podName: "pod0",
  872. extenders: []core.SchedulerExtender{
  873. &fakeExtender{isBinder: false, interestedPodName: "pod0"},
  874. &fakeExtender{isBinder: true, interestedPodName: "pod0"},
  875. },
  876. wantBinderID: 1,
  877. },
  878. {
  879. name: "one of the extenders is a binder, but not interested in pod",
  880. podName: "pod1",
  881. extenders: []core.SchedulerExtender{
  882. &fakeExtender{isBinder: false, interestedPodName: "pod1"},
  883. &fakeExtender{isBinder: true, interestedPodName: "pod0"},
  884. },
  885. wantBinderID: -1, // default binding.
  886. },
  887. }
  888. for _, test := range table {
  889. t.Run(test.name, func(t *testing.T) {
  890. pod := &v1.Pod{
  891. ObjectMeta: metav1.ObjectMeta{
  892. Name: test.podName,
  893. },
  894. }
  895. defaultBound := false
  896. client := clientsetfake.NewSimpleClientset(pod)
  897. client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
  898. if action.GetSubresource() == "binding" {
  899. defaultBound = true
  900. }
  901. return false, nil, nil
  902. })
  903. fwk, err := st.NewFramework([]st.RegisterPluginFunc{
  904. st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
  905. st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
  906. }, framework.WithClientSet(client))
  907. if err != nil {
  908. t.Fatal(err)
  909. }
  910. stop := make(chan struct{})
  911. defer close(stop)
  912. scache := internalcache.New(100*time.Millisecond, stop)
  913. algo := core.NewGenericScheduler(
  914. scache,
  915. nil,
  916. nil,
  917. fwk,
  918. test.extenders,
  919. nil,
  920. nil,
  921. nil,
  922. false,
  923. 0,
  924. false,
  925. )
  926. sched := Scheduler{
  927. Algorithm: algo,
  928. Framework: fwk,
  929. Recorder: &events.FakeRecorder{},
  930. SchedulerCache: scache,
  931. }
  932. err = sched.bind(context.Background(), pod, "node", nil)
  933. if err != nil {
  934. t.Error(err)
  935. }
  936. // Checking default binding.
  937. if wantBound := test.wantBinderID == -1; defaultBound != wantBound {
  938. t.Errorf("got bound with default binding: %v, want %v", defaultBound, wantBound)
  939. }
  940. // Checking extenders binding.
  941. for i, ext := range test.extenders {
  942. wantBound := i == test.wantBinderID
  943. if gotBound := ext.(*fakeExtender).gotBind; gotBound != wantBound {
  944. t.Errorf("got bound with extender #%d: %v, want %v", i, gotBound, wantBound)
  945. }
  946. }
  947. })
  948. }
  949. }