12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package scheduler
- import (
- "context"
- "errors"
- "fmt"
- "io/ioutil"
- "os"
- "path"
- "reflect"
- "testing"
- "time"
- "github.com/google/go-cmp/cmp"
- v1 "k8s.io/api/core/v1"
- "k8s.io/api/events/v1beta1"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/informers"
- clientsetfake "k8s.io/client-go/kubernetes/fake"
- "k8s.io/client-go/kubernetes/scheme"
- clienttesting "k8s.io/client-go/testing"
- clientcache "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/events"
- volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
- schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
- "k8s.io/kubernetes/pkg/scheduler/core"
- frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
- "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
- "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
- "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
- "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
- "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
- framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
- internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
- fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
- internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
- st "k8s.io/kubernetes/pkg/scheduler/testing"
- "k8s.io/kubernetes/pkg/scheduler/volumebinder"
- )
- type fakePodConditionUpdater struct{}
- func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondition) error {
- return nil
- }
- type fakePodPreemptor struct{}
- func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
- return pod, nil
- }
- func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error {
- return nil
- }
- func (fp fakePodPreemptor) setNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
- return nil
- }
- func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error {
- return nil
- }
- func podWithID(id, desiredHost string) *v1.Pod {
- return &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: id,
- UID: types.UID(id),
- SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
- },
- Spec: v1.PodSpec{
- NodeName: desiredHost,
- },
- }
- }
- func deletingPod(id string) *v1.Pod {
- deletionTimestamp := metav1.Now()
- return &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: id,
- UID: types.UID(id),
- DeletionTimestamp: &deletionTimestamp,
- SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
- },
- Spec: v1.PodSpec{
- NodeName: "",
- },
- }
- }
- func podWithPort(id, desiredHost string, port int) *v1.Pod {
- pod := podWithID(id, desiredHost)
- pod.Spec.Containers = []v1.Container{
- {Name: "ctr", Ports: []v1.ContainerPort{{HostPort: int32(port)}}},
- }
- return pod
- }
- func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v1.ResourceList) *v1.Pod {
- pod := podWithID(id, desiredHost)
- pod.Spec.Containers = []v1.Container{
- {Name: "ctr", Resources: v1.ResourceRequirements{Limits: limits, Requests: requests}},
- }
- return pod
- }
- type mockScheduler struct {
- result core.ScheduleResult
- err error
- }
- func (es mockScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
- return es.result, es.err
- }
- func (es mockScheduler) Extenders() []core.SchedulerExtender {
- return nil
- }
- func (es mockScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
- return nil, nil, nil, nil
- }
- func (es mockScheduler) Snapshot() error {
- return nil
- }
- func (es mockScheduler) Framework() framework.Framework {
- return nil
- }
// TestSchedulerCreation verifies that New succeeds with default options, and
// fails when an out-of-tree plugin registry re-registers a plugin name that
// already exists in the in-tree registry.
func TestSchedulerCreation(t *testing.T) {
	client := clientsetfake.NewSimpleClientset()
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
	stopCh := make(chan struct{})
	defer close(stopCh)
	_, err := New(client,
		informerFactory,
		NewPodInformer(client, 0),
		eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
		stopCh,
		WithPodInitialBackoffSeconds(1),
		WithPodMaxBackoffSeconds(10),
	)
	if err != nil {
		t.Fatalf("Failed to create scheduler: %v", err)
	}
	// Test case for when a plugin name in frameworkOutOfTreeRegistry already exist in defaultRegistry.
	// Grab an arbitrary plugin name that is known to exist in the in-tree registry.
	fakeFrameworkPluginName := ""
	for name := range frameworkplugins.NewInTreeRegistry() {
		fakeFrameworkPluginName = name
		break
	}
	registryFake := map[string]framework.PluginFactory{
		fakeFrameworkPluginName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
			return nil, nil
		},
	}
	// Registering the duplicate name out-of-tree must make New fail.
	_, err = New(client,
		informerFactory,
		NewPodInformer(client, 0),
		eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
		stopCh,
		WithPodInitialBackoffSeconds(1),
		WithPodMaxBackoffSeconds(10),
		WithFrameworkOutOfTreeRegistry(registryFake),
	)
	if err == nil {
		t.Fatalf("Create scheduler should fail")
	}
}
// TestSchedulerScheduleOne exercises a single scheduleOne() pass with a mock
// scheduling algorithm and a fake cache, covering: a successful bind, an
// algorithm error, a bind error (the assumed pod must be forgotten), and a
// pod that is already being deleted. Each case also checks the emitted
// event reason.
func TestSchedulerScheduleOne(t *testing.T) {
	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	client := clientsetfake.NewSimpleClientset(&testNode)
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
	errS := errors.New("scheduler")
	errB := errors.New("binder")
	table := []struct {
		name             string
		injectBindError  error
		sendPod          *v1.Pod
		algo             core.ScheduleAlgorithm
		expectErrorPod   *v1.Pod
		expectForgetPod  *v1.Pod
		expectAssumedPod *v1.Pod
		expectError      error
		expectBind       *v1.Binding
		eventReason      string
	}{
		{
			name:             "bind assumed pod scheduled",
			sendPod:          podWithID("foo", ""),
			algo:             mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
			expectAssumedPod: podWithID("foo", testNode.Name),
			eventReason:      "Scheduled",
		},
		{
			name:           "error pod failed scheduling",
			sendPod:        podWithID("foo", ""),
			algo:           mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
			expectError:    errS,
			expectErrorPod: podWithID("foo", ""),
			eventReason:    "FailedScheduling",
		},
		{
			name:             "error bind forget pod failed scheduling",
			sendPod:          podWithID("foo", ""),
			algo:             mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
			expectAssumedPod: podWithID("foo", testNode.Name),
			injectBindError:  errB,
			expectError:      errors.New("plugin \"DefaultBinder\" failed to bind pod \"/foo\": binder"),
			expectErrorPod:   podWithID("foo", testNode.Name),
			expectForgetPod:  podWithID("foo", testNode.Name),
			eventReason:      "FailedScheduling",
		}, {
			name:        "deleting pod",
			sendPod:     deletingPod("foo"),
			algo:        mockScheduler{core.ScheduleResult{}, nil},
			eventReason: "FailedScheduling",
		},
	}
	stop := make(chan struct{})
	defer close(stop)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)
	for _, item := range table {
		t.Run(item.name, func(t *testing.T) {
			var gotError error
			var gotPod *v1.Pod
			var gotForgetPod *v1.Pod
			var gotAssumedPod *v1.Pod
			var gotBinding *v1.Binding
			// Fake cache records which pods were assumed and forgotten.
			sCache := &fakecache.Cache{
				ForgetFunc: func(pod *v1.Pod) {
					gotForgetPod = pod
				},
				AssumeFunc: func(pod *v1.Pod) {
					gotAssumedPod = pod
				},
				IsAssumedPodFunc: func(pod *v1.Pod) bool {
					if pod == nil || gotAssumedPod == nil {
						return false
					}
					return pod.UID == gotAssumedPod.UID
				},
			}
			client := clientsetfake.NewSimpleClientset(item.sendPod)
			// Intercept binding creations: capture the binding and optionally
			// inject the case's bind error.
			client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
				if action.GetSubresource() != "binding" {
					return false, nil, nil
				}
				gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
				return true, gotBinding, item.injectBindError
			})
			fwk, err := st.NewFramework([]st.RegisterPluginFunc{
				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
			}, framework.WithClientSet(client))
			if err != nil {
				t.Fatal(err)
			}
			s := &Scheduler{
				SchedulerCache:      sCache,
				Algorithm:           item.algo,
				podConditionUpdater: fakePodConditionUpdater{},
				Error: func(p *framework.PodInfo, err error) {
					gotPod = p.Pod
					gotError = err
				},
				NextPod: func() *framework.PodInfo {
					return &framework.PodInfo{Pod: item.sendPod}
				},
				Framework:    fwk,
				Recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
				VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
			}
			called := make(chan struct{})
			// Watch for the single emitted event and verify its reason.
			stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
				e, _ := obj.(*v1beta1.Event)
				if e.Reason != item.eventReason {
					t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
				}
				close(called)
			})
			s.scheduleOne(context.Background())
			// Block until the event watcher has fired before asserting.
			<-called
			if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
				t.Errorf("assumed pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) {
				t.Errorf("error pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectForgetPod, gotForgetPod; !reflect.DeepEqual(e, a) {
				t.Errorf("forget pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) {
				t.Errorf("error: wanted %v, got %v", e, a)
			}
			if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
				t.Errorf("got binding diff (-want, +got): %s", diff)
			}
			stopFunc()
		})
	}
}
- func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
- stop := make(chan struct{})
- defer close(stop)
- queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
- scache := internalcache.New(100*time.Millisecond, stop)
- pod := podWithPort("pod.Name", "", 8080)
- node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
- scache.AddNode(&node)
- client := clientsetfake.NewSimpleClientset(&node)
- informerFactory := informers.NewSharedInformerFactory(client, 0)
- fns := []st.RegisterPluginFunc{
- st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
- st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
- st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
- }
- scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, pod, &node, fns...)
- waitPodExpireChan := make(chan struct{})
- timeout := make(chan struct{})
- errChan := make(chan error)
- go func() {
- for {
- select {
- case <-timeout:
- return
- default:
- }
- pods, err := scache.List(labels.Everything())
- if err != nil {
- errChan <- fmt.Errorf("cache.List failed: %v", err)
- return
- }
- if len(pods) == 0 {
- close(waitPodExpireChan)
- return
- }
- time.Sleep(100 * time.Millisecond)
- }
- }()
- // waiting for the assumed pod to expire
- select {
- case err := <-errChan:
- t.Fatal(err)
- case <-waitPodExpireChan:
- case <-time.After(wait.ForeverTestTimeout):
- close(timeout)
- t.Fatalf("timeout timeout in waiting pod expire after %v", wait.ForeverTestTimeout)
- }
- // We use conflicted pod ports to incur fit predicate failure if first pod not removed.
- secondPod := podWithPort("bar", "", 8080)
- queuedPodStore.Add(secondPod)
- scheduler.scheduleOne(context.Background())
- select {
- case b := <-bindingChan:
- expectBinding := &v1.Binding{
- ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
- Target: v1.ObjectReference{Kind: "Node", Name: node.Name},
- }
- if !reflect.DeepEqual(expectBinding, b) {
- t.Errorf("binding want=%v, get=%v", expectBinding, b)
- }
- case <-time.After(wait.ForeverTestTimeout):
- t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
- }
- }
// TestSchedulerNoPhantomPodAfterDelete verifies that after an assumed pod is
// explicitly added and then removed from the cache (mimicking a user delete),
// a second pod with a conflicting host port can be scheduled onto the node.
// Before the removal, the conflicting pod must fail with a FitError.
func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	// Long TTL so the assumed pod does NOT expire on its own during the test.
	scache := internalcache.New(10*time.Minute, stop)
	firstPod := podWithPort("pod.Name", "", 8080)
	node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	scache.AddNode(&node)
	client := clientsetfake.NewSimpleClientset(&node)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	fns := []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
	}
	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, firstPod, &node, fns...)
	// We use conflicted pod ports to incur fit predicate failure.
	secondPod := podWithPort("bar", "", 8080)
	queuedPodStore.Add(secondPod)
	// queuedPodStore: [bar:8080]
	// cache: [(assumed)foo:8080]
	scheduler.scheduleOne(context.Background())
	// While the first pod is still assumed, the second pod must be rejected
	// by the node-ports filter.
	select {
	case err := <-errChan:
		expectErr := &core.FitError{
			Pod:         secondPod,
			NumAllNodes: 1,
			FilteredNodesStatuses: framework.NodeToStatusMap{
				node.Name: framework.NewStatus(
					framework.Unschedulable,
					nodeports.ErrReason,
				),
			},
		}
		if !reflect.DeepEqual(expectErr, err) {
			t.Errorf("err want=%v, get=%v", expectErr, err)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout in fitting after %v", wait.ForeverTestTimeout)
	}
	// We mimic the workflow of cache behavior when a pod is removed by user.
	// Note: if the schedulernodeinfo timeout would be super short, the first pod would expire
	// and would be removed itself (without any explicit actions on schedulernodeinfo). Even in that case,
	// explicitly AddPod will as well correct the behavior.
	firstPod.Spec.NodeName = node.Name
	if err := scache.AddPod(firstPod); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := scache.RemovePod(firstPod); err != nil {
		t.Fatalf("err: %v", err)
	}
	queuedPodStore.Add(secondPod)
	scheduler.scheduleOne(context.Background())
	// With the first pod gone, the second pod must now bind to the node.
	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if !reflect.DeepEqual(expectBinding, b) {
			t.Errorf("binding want=%v, get=%v", expectBinding, b)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
	}
}
// setupTestSchedulerWithOnePodOnNode builds a test scheduler via
// setupTestScheduler, schedules the given pod once so it ends up assumed on
// the node, and asserts the resulting binding before returning the scheduler
// with its binding and error channels.
//
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
	informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
	scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...)
	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)
	queuedPodStore.Add(pod)
	// queuedPodStore: [foo:8080]
	// cache: []
	scheduler.scheduleOne(context.Background())
	// queuedPodStore: []
	// cache: [(assumed)foo:8080]
	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: pod.Name, UID: types.UID(pod.Name)},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if !reflect.DeepEqual(expectBinding, b) {
			t.Errorf("binding want=%v, get=%v", expectBinding, b)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
	}
	return scheduler, bindingChan, errChan
}
// TestSchedulerFailedSchedulingReasons schedules a pod that is too big for
// every one of 100 undersized nodes and checks that the resulting FitError
// carries per-node unschedulable statuses while its rendered message stays
// short (non-spammy).
func TestSchedulerFailedSchedulingReasons(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	scache := internalcache.New(10*time.Minute, stop)
	// Design the baseline for the pods, and we will make nodes that don't fit it later.
	var cpu = int64(4)
	var mem = int64(500)
	podWithTooBigResourceRequests := podWithResources("bar", "", v1.ResourceList{
		v1.ResourceCPU:    *(resource.NewQuantity(cpu, resource.DecimalSI)),
		v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
	}, v1.ResourceList{
		v1.ResourceCPU:    *(resource.NewQuantity(cpu, resource.DecimalSI)),
		v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
	})
	// create several nodes which cannot schedule the above pod
	var nodes []*v1.Node
	var objects []runtime.Object
	for i := 0; i < 100; i++ {
		uid := fmt.Sprintf("machine%v", i)
		node := v1.Node{
			ObjectMeta: metav1.ObjectMeta{Name: uid, UID: types.UID(uid)},
			Status: v1.NodeStatus{
				// Half the CPU and a fifth of the memory the pod requests.
				Capacity: v1.ResourceList{
					v1.ResourceCPU:    *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
					v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
					v1.ResourcePods:   *(resource.NewQuantity(10, resource.DecimalSI)),
				},
				Allocatable: v1.ResourceList{
					v1.ResourceCPU:    *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
					v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
					v1.ResourcePods:   *(resource.NewQuantity(10, resource.DecimalSI)),
				}},
		}
		scache.AddNode(&node)
		nodes = append(nodes, &node)
		objects = append(objects, &node)
	}
	client := clientsetfake.NewSimpleClientset(objects...)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
	failedNodeStatues := framework.NodeToStatusMap{}
	for _, node := range nodes {
		failedNodeStatues[node.Name] = framework.NewStatus(
			framework.Unschedulable,
			fmt.Sprintf("Insufficient %v", v1.ResourceCPU),
			fmt.Sprintf("Insufficient %v", v1.ResourceMemory),
		)
	}
	fns := []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		st.RegisterPluginAsExtensions(noderesources.FitName, noderesources.NewFit, "Filter", "PreFilter"),
	}
	scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...)
	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)
	queuedPodStore.Add(podWithTooBigResourceRequests)
	scheduler.scheduleOne(context.Background())
	select {
	case err := <-errChan:
		expectErr := &core.FitError{
			Pod:                   podWithTooBigResourceRequests,
			NumAllNodes:           len(nodes),
			FilteredNodesStatuses: failedNodeStatues,
		}
		if len(fmt.Sprint(expectErr)) > 150 {
			t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr)))
		}
		if !reflect.DeepEqual(expectErr, err) {
			t.Errorf("\n err \nWANT=%+v,\nGOT=%+v", expectErr, err)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
	}
}
- // queuedPodStore: pods queued before processing.
- // scache: scheduler cache that might contain assumed pods.
- func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, recorder events.EventRecorder, fakeVolumeBinder *volumebinder.VolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
- if fakeVolumeBinder == nil {
- // Create default volume binder if it didn't set.
- fakeVolumeBinder = volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true})
- }
- bindingChan := make(chan *v1.Binding, 1)
- client := clientsetfake.NewSimpleClientset()
- client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
- var b *v1.Binding
- if action.GetSubresource() == "binding" {
- b := action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
- bindingChan <- b
- }
- return true, b, nil
- })
- fwk, _ := st.NewFramework(fns, framework.WithClientSet(client), framework.WithVolumeBinder(fakeVolumeBinder))
- algo := core.NewGenericScheduler(
- scache,
- internalqueue.NewSchedulingQueue(nil),
- internalcache.NewEmptySnapshot(),
- fwk,
- []core.SchedulerExtender{},
- nil,
- informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
- informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
- false,
- schedulerapi.DefaultPercentageOfNodesToScore,
- false,
- )
- errChan := make(chan error, 1)
- sched := &Scheduler{
- SchedulerCache: scache,
- Algorithm: algo,
- NextPod: func() *framework.PodInfo {
- return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)}
- },
- Error: func(p *framework.PodInfo, err error) {
- errChan <- err
- },
- Recorder: &events.FakeRecorder{},
- podConditionUpdater: fakePodConditionUpdater{},
- podPreemptor: fakePodPreemptor{},
- Framework: fwk,
- VolumeBinder: fakeVolumeBinder,
- }
- if recorder != nil {
- sched.Recorder = recorder
- }
- return sched, bindingChan, errChan
- }
// setupTestSchedulerWithVolumeBinding builds a test scheduler whose queue
// holds one pod ("foo" in namespace "foo-ns") referencing PVC "testPVC", with
// the volumebinding filter registered and the given fake volume binder
// installed. Returns the scheduler plus its binding and error channels.
func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	pod := podWithID("foo", "")
	pod.Namespace = "foo-ns"
	pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: "testVol",
		VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "testPVC"}}})
	queuedPodStore.Add(pod)
	scache := internalcache.New(10*time.Minute, stop)
	scache.AddNode(&testNode)
	testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
	client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	recorder := broadcaster.NewRecorder(scheme.Scheme, "scheduler")
	fns := []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		st.RegisterFilterPlugin(volumebinding.Name, volumebinding.New),
	}
	s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, recorder, fakeVolumeBinder, fns...)
	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)
	// NOTE(review): setupTestScheduler already installs fakeVolumeBinder, so
	// this assignment looks redundant — confirm before removing.
	s.VolumeBinder = fakeVolumeBinder
	return s, bindingChan, errChan
}
// makePredicateError builds the error message produced when a predicate
// rejects the single test node.
//
// This is a workaround because golint complains that errors cannot
// end with punctuation. However, the real predicate error message does
// end with a period.
func makePredicateError(failReason string) error {
	s := fmt.Sprintf("0/1 nodes are available: %v.", failReason)
	// Use errors.New rather than fmt.Errorf(s): s is data, not a format
	// string, so any '%' in failReason would be mangled (and go vet flags
	// non-constant format strings).
	return errors.New(s)
}
// TestSchedulerWithVolumeBinding drives scheduleOne() against a fake volume
// binder configured per table case, checking the emitted event reason, the
// error (if any), the pod binding (if any), and whether the binder's Assume
// and Bind entry points were invoked.
func TestSchedulerWithVolumeBinding(t *testing.T) {
	findErr := fmt.Errorf("find err")
	assumeErr := fmt.Errorf("assume err")
	bindErr := fmt.Errorf("bind err")
	client := clientsetfake.NewSimpleClientset()
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
	// This can be small because we wait for pod to finish scheduling first
	chanTimeout := 2 * time.Second
	table := []struct {
		name               string
		expectError        error
		expectPodBind      *v1.Binding
		expectAssumeCalled bool
		expectBindCalled   bool
		eventReason        string
		volumeBinderConfig *volumescheduling.FakeVolumeBinderConfig
	}{
		{
			name: "all bound",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				AllBound:             true,
				FindUnboundSatsified: true,
				FindBoundSatsified:   true,
			},
			expectAssumeCalled: true,
			expectPodBind:      &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
			eventReason:        "Scheduled",
		},
		{
			name: "bound/invalid pv affinity",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				AllBound:             true,
				FindUnboundSatsified: true,
				FindBoundSatsified:   false,
			},
			eventReason: "FailedScheduling",
			expectError: makePredicateError("1 node(s) had volume node affinity conflict"),
		},
		{
			name: "unbound/no matches",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: false,
				FindBoundSatsified:   true,
			},
			eventReason: "FailedScheduling",
			expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind"),
		},
		{
			name: "bound and unbound unsatisfied",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: false,
				FindBoundSatsified:   false,
			},
			eventReason: "FailedScheduling",
			expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"),
		},
		{
			name: "unbound/found matches/bind succeeds",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: true,
				FindBoundSatsified:   true,
			},
			expectAssumeCalled: true,
			expectBindCalled:   true,
			expectPodBind:      &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
			eventReason:        "Scheduled",
		},
		{
			name: "predicate error",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindErr: findErr,
			},
			eventReason: "FailedScheduling",
			expectError: fmt.Errorf("running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
		},
		{
			name: "assume error",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: true,
				FindBoundSatsified:   true,
				AssumeErr:            assumeErr,
			},
			expectAssumeCalled: true,
			eventReason:        "FailedScheduling",
			expectError:        assumeErr,
		},
		{
			name: "bind error",
			volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
				FindUnboundSatsified: true,
				FindBoundSatsified:   true,
				BindErr:              bindErr,
			},
			expectAssumeCalled: true,
			expectBindCalled:   true,
			eventReason:        "FailedScheduling",
			expectError:        bindErr,
		},
	}
	for _, item := range table {
		t.Run(item.name, func(t *testing.T) {
			stop := make(chan struct{})
			fakeVolumeBinder := volumebinder.NewFakeVolumeBinder(item.volumeBinderConfig)
			// Keep a handle on the inner fake so Assume/Bind call flags can
			// be inspected after the scheduling cycle.
			internalBinder, ok := fakeVolumeBinder.Binder.(*volumescheduling.FakeVolumeBinder)
			if !ok {
				t.Fatalf("Failed to get fake volume binder")
			}
			s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster)
			eventChan := make(chan struct{})
			stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
				e, _ := obj.(*v1beta1.Event)
				if e, a := item.eventReason, e.Reason; e != a {
					t.Errorf("expected %v, got %v", e, a)
				}
				close(eventChan)
			})
			s.scheduleOne(context.Background())
			// Wait for pod to succeed or fail scheduling
			select {
			case <-eventChan:
			case <-time.After(wait.ForeverTestTimeout):
				t.Fatalf("scheduling timeout after %v", wait.ForeverTestTimeout)
			}
			stopFunc()
			// Wait for scheduling to return an error
			select {
			case err := <-errChan:
				if item.expectError == nil || !reflect.DeepEqual(item.expectError.Error(), err.Error()) {
					t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectError, err)
				}
			case <-time.After(chanTimeout):
				if item.expectError != nil {
					t.Errorf("did not receive error after %v", chanTimeout)
				}
			}
			// Wait for pod to succeed binding
			select {
			case b := <-bindingChan:
				if !reflect.DeepEqual(item.expectPodBind, b) {
					t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectPodBind, b)
				}
			case <-time.After(chanTimeout):
				if item.expectPodBind != nil {
					t.Errorf("did not receive pod binding after %v", chanTimeout)
				}
			}
			if item.expectAssumeCalled != internalBinder.AssumeCalled {
				t.Errorf("expectedAssumeCall %v", item.expectAssumeCalled)
			}
			if item.expectBindCalled != internalBinder.BindCalled {
				t.Errorf("expectedBindCall %v", item.expectBindCalled)
			}
			close(stop)
		})
	}
}
- func TestInitPolicyFromFile(t *testing.T) {
- dir, err := ioutil.TempDir(os.TempDir(), "policy")
- if err != nil {
- t.Errorf("unexpected error: %v", err)
- }
- defer os.RemoveAll(dir)
- for i, test := range []struct {
- policy string
- expectedPredicates sets.String
- }{
- // Test json format policy file
- {
- policy: `{
- "kind" : "Policy",
- "apiVersion" : "v1",
- "predicates" : [
- {"name" : "PredicateOne"},
- {"name" : "PredicateTwo"}
- ],
- "priorities" : [
- {"name" : "PriorityOne", "weight" : 1},
- {"name" : "PriorityTwo", "weight" : 5}
- ]
- }`,
- expectedPredicates: sets.NewString(
- "PredicateOne",
- "PredicateTwo",
- ),
- },
- // Test yaml format policy file
- {
- policy: `apiVersion: v1
- kind: Policy
- predicates:
- - name: PredicateOne
- - name: PredicateTwo
- priorities:
- - name: PriorityOne
- weight: 1
- - name: PriorityTwo
- weight: 5
- `,
- expectedPredicates: sets.NewString(
- "PredicateOne",
- "PredicateTwo",
- ),
- },
- } {
- file := fmt.Sprintf("scheduler-policy-config-file-%d", i)
- fullPath := path.Join(dir, file)
- if err := ioutil.WriteFile(fullPath, []byte(test.policy), 0644); err != nil {
- t.Fatalf("Failed writing a policy config file: %v", err)
- }
- policy := &schedulerapi.Policy{}
- if err := initPolicyFromFile(fullPath, policy); err != nil {
- t.Fatalf("Failed writing a policy config file: %v", err)
- }
- // Verify that the policy is initialized correctly.
- schedPredicates := sets.NewString()
- for _, p := range policy.Predicates {
- schedPredicates.Insert(p.Name)
- }
- schedPrioritizers := sets.NewString()
- for _, p := range policy.Priorities {
- schedPrioritizers.Insert(p.Name)
- }
- if !schedPredicates.Equal(test.expectedPredicates) {
- t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
- }
- }
- }
- func TestSchedulerBinding(t *testing.T) {
- table := []struct {
- podName string
- extenders []core.SchedulerExtender
- wantBinderID int
- name string
- }{
- {
- name: "the extender is not a binder",
- podName: "pod0",
- extenders: []core.SchedulerExtender{
- &fakeExtender{isBinder: false, interestedPodName: "pod0"},
- },
- wantBinderID: -1, // default binding.
- },
- {
- name: "one of the extenders is a binder and interested in pod",
- podName: "pod0",
- extenders: []core.SchedulerExtender{
- &fakeExtender{isBinder: false, interestedPodName: "pod0"},
- &fakeExtender{isBinder: true, interestedPodName: "pod0"},
- },
- wantBinderID: 1,
- },
- {
- name: "one of the extenders is a binder, but not interested in pod",
- podName: "pod1",
- extenders: []core.SchedulerExtender{
- &fakeExtender{isBinder: false, interestedPodName: "pod1"},
- &fakeExtender{isBinder: true, interestedPodName: "pod0"},
- },
- wantBinderID: -1, // default binding.
- },
- }
- for _, test := range table {
- t.Run(test.name, func(t *testing.T) {
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: test.podName,
- },
- }
- defaultBound := false
- client := clientsetfake.NewSimpleClientset(pod)
- client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
- if action.GetSubresource() == "binding" {
- defaultBound = true
- }
- return false, nil, nil
- })
- fwk, err := st.NewFramework([]st.RegisterPluginFunc{
- st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
- st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
- }, framework.WithClientSet(client))
- if err != nil {
- t.Fatal(err)
- }
- stop := make(chan struct{})
- defer close(stop)
- scache := internalcache.New(100*time.Millisecond, stop)
- algo := core.NewGenericScheduler(
- scache,
- nil,
- nil,
- fwk,
- test.extenders,
- nil,
- nil,
- nil,
- false,
- 0,
- false,
- )
- sched := Scheduler{
- Algorithm: algo,
- Framework: fwk,
- Recorder: &events.FakeRecorder{},
- SchedulerCache: scache,
- }
- err = sched.bind(context.Background(), pod, "node", nil)
- if err != nil {
- t.Error(err)
- }
- // Checking default binding.
- if wantBound := test.wantBinderID == -1; defaultBound != wantBound {
- t.Errorf("got bound with default binding: %v, want %v", defaultBound, wantBound)
- }
- // Checking extenders binding.
- for i, ext := range test.extenders {
- wantBound := i == test.wantBinderID
- if gotBound := ext.(*fakeExtender).gotBind; gotBound != wantBound {
- t.Errorf("got bound with extender #%d: %v, want %v", i, gotBound, wantBound)
- }
- }
- })
- }
- }
|