12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package daemonset
- import (
- "fmt"
- "net/http/httptest"
- "testing"
- "time"
- apps "k8s.io/api/apps/v1"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/apimachinery/pkg/util/uuid"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- "k8s.io/client-go/informers"
- clientset "k8s.io/client-go/kubernetes"
- appstyped "k8s.io/client-go/kubernetes/typed/apps/v1"
- corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
- restclient "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/client-go/util/flowcontrol"
- "k8s.io/client-go/util/retry"
- "k8s.io/component-base/featuregate"
- featuregatetesting "k8s.io/component-base/featuregate/testing"
- "k8s.io/kubernetes/pkg/api/legacyscheme"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/daemon"
- "k8s.io/kubernetes/pkg/features"
- "k8s.io/kubernetes/pkg/scheduler"
- "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
- schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
- "k8s.io/kubernetes/pkg/scheduler/factory"
- labelsutil "k8s.io/kubernetes/pkg/util/labels"
- "k8s.io/kubernetes/test/integration/framework"
- )
// zero is a shared int64 zero value. Helpers in this file take its address
// when a *int64 field (TerminationGracePeriodSeconds) has to be set to 0.
var zero int64
- func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) {
- masterConfig := framework.NewIntegrationTestMasterConfig()
- _, server, closeFn := framework.RunAMaster(masterConfig)
- config := restclient.Config{Host: server.URL}
- clientSet, err := clientset.NewForConfig(&config)
- if err != nil {
- t.Fatalf("Error in creating clientset: %v", err)
- }
- resyncPeriod := 12 * time.Hour
- informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-informers")), resyncPeriod)
- dc, err := daemon.NewDaemonSetsController(
- informers.Apps().V1().DaemonSets(),
- informers.Apps().V1().ControllerRevisions(),
- informers.Core().V1().Pods(),
- informers.Core().V1().Nodes(),
- clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-controller")),
- flowcontrol.NewBackOff(5*time.Second, 15*time.Minute),
- )
- if err != nil {
- t.Fatalf("error creating DaemonSets controller: %v", err)
- }
- return server, closeFn, dc, informers, clientSet
- }
- func setupScheduler(
- t *testing.T,
- cs clientset.Interface,
- informerFactory informers.SharedInformerFactory,
- stopCh chan struct{},
- ) {
- // If ScheduleDaemonSetPods is disabled, do not start scheduler.
- if !utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
- return
- }
- // Enable Features.
- algorithmprovider.ApplyFeatureGates()
- schedulerConfigFactory := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
- SchedulerName: v1.DefaultSchedulerName,
- Client: cs,
- NodeInformer: informerFactory.Core().V1().Nodes(),
- PodInformer: informerFactory.Core().V1().Pods(),
- PvInformer: informerFactory.Core().V1().PersistentVolumes(),
- PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
- ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
- ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
- StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
- ServiceInformer: informerFactory.Core().V1().Services(),
- PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
- StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
- HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
- DisablePreemption: false,
- PercentageOfNodesToScore: 100,
- StopCh: stopCh,
- })
- schedulerConfig, err := schedulerConfigFactory.Create()
- if err != nil {
- t.Fatalf("Couldn't create scheduler config: %v", err)
- }
- // TODO: Replace NewFromConfig and AddAllEventHandlers with scheduler.New() in
- // all test/integration tests.
- sched := scheduler.NewFromConfig(schedulerConfig)
- scheduler.AddAllEventHandlers(sched,
- v1.DefaultSchedulerName,
- informerFactory.Core().V1().Nodes(),
- informerFactory.Core().V1().Pods(),
- informerFactory.Core().V1().PersistentVolumes(),
- informerFactory.Core().V1().PersistentVolumeClaims(),
- informerFactory.Core().V1().Services(),
- informerFactory.Storage().V1().StorageClasses(),
- )
- eventBroadcaster := record.NewBroadcaster()
- schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
- legacyscheme.Scheme,
- v1.EventSource{Component: v1.DefaultSchedulerName},
- )
- eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{
- Interface: cs.CoreV1().Events(""),
- })
- algorithmprovider.ApplyFeatureGates()
- go sched.Run()
- }
// testLabels returns a freshly allocated label set {"name": "test"}; it is
// used both as the DaemonSet selector and as the pod template labels.
func testLabels() map[string]string {
	labels := make(map[string]string, 1)
	labels["name"] = "test"
	return labels
}
- func newDaemonSet(name, namespace string) *apps.DaemonSet {
- two := int32(2)
- return &apps.DaemonSet{
- TypeMeta: metav1.TypeMeta{
- Kind: "DaemonSet",
- APIVersion: "apps/v1",
- },
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: name,
- },
- Spec: apps.DaemonSetSpec{
- RevisionHistoryLimit: &two,
- Selector: &metav1.LabelSelector{MatchLabels: testLabels()},
- UpdateStrategy: apps.DaemonSetUpdateStrategy{
- Type: apps.OnDeleteDaemonSetStrategyType,
- },
- Template: v1.PodTemplateSpec{
- ObjectMeta: metav1.ObjectMeta{
- Labels: testLabels(),
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{{Name: "foo", Image: "bar"}},
- TerminationGracePeriodSeconds: &zero,
- },
- },
- },
- }
- }
- func cleanupDaemonSets(t *testing.T, cs clientset.Interface, ds *apps.DaemonSet) {
- ds, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
- if err != nil {
- t.Errorf("Failed to get DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
- return
- }
- // We set the nodeSelector to a random label. This label is nearly guaranteed
- // to not be set on any node so the DameonSetController will start deleting
- // daemon pods. Once it's done deleting the daemon pods, it's safe to delete
- // the DaemonSet.
- ds.Spec.Template.Spec.NodeSelector = map[string]string{
- string(uuid.NewUUID()): string(uuid.NewUUID()),
- }
- // force update to avoid version conflict
- ds.ResourceVersion = ""
- if ds, err = cs.AppsV1().DaemonSets(ds.Namespace).Update(ds); err != nil {
- t.Errorf("Failed to update DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
- return
- }
- // Wait for the daemon set controller to kill all the daemon pods.
- if err := wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
- updatedDS, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
- if err != nil {
- return false, nil
- }
- return updatedDS.Status.CurrentNumberScheduled+updatedDS.Status.NumberMisscheduled == 0, nil
- }); err != nil {
- t.Errorf("Failed to kill the pods of DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
- return
- }
- falseVar := false
- deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar}
- if err := cs.AppsV1().DaemonSets(ds.Namespace).Delete(ds.Name, deleteOptions); err != nil {
- t.Errorf("Failed to delete DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
- }
- }
- func newRollbackStrategy() *apps.DaemonSetUpdateStrategy {
- one := intstr.FromInt(1)
- return &apps.DaemonSetUpdateStrategy{
- Type: apps.RollingUpdateDaemonSetStrategyType,
- RollingUpdate: &apps.RollingUpdateDaemonSet{MaxUnavailable: &one},
- }
- }
- func newOnDeleteStrategy() *apps.DaemonSetUpdateStrategy {
- return &apps.DaemonSetUpdateStrategy{
- Type: apps.OnDeleteDaemonSetStrategyType,
- }
- }
- func updateStrategies() []*apps.DaemonSetUpdateStrategy {
- return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()}
- }
- func featureGates() []featuregate.Feature {
- return []featuregate.Feature{
- features.ScheduleDaemonSetPods,
- }
- }
- func allocatableResources(memory, cpu string) v1.ResourceList {
- return v1.ResourceList{
- v1.ResourceMemory: resource.MustParse(memory),
- v1.ResourceCPU: resource.MustParse(cpu),
- v1.ResourcePods: resource.MustParse("100"),
- }
- }
- func resourcePodSpec(nodeName, memory, cpu string) v1.PodSpec {
- return v1.PodSpec{
- NodeName: nodeName,
- Containers: []v1.Container{
- {
- Name: "foo",
- Image: "bar",
- Resources: v1.ResourceRequirements{
- Requests: v1.ResourceList{
- v1.ResourceMemory: resource.MustParse(memory),
- v1.ResourceCPU: resource.MustParse(cpu),
- },
- },
- },
- },
- TerminationGracePeriodSeconds: &zero,
- }
- }
- func newNode(name string, label map[string]string) *v1.Node {
- return &v1.Node{
- TypeMeta: metav1.TypeMeta{
- Kind: "Node",
- APIVersion: "v1",
- },
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Labels: label,
- Namespace: metav1.NamespaceNone,
- },
- Status: v1.NodeStatus{
- Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
- Allocatable: v1.ResourceList{v1.ResourcePods: resource.MustParse("100")},
- },
- }
- }
- func addNodes(nodeClient corev1client.NodeInterface, startIndex, numNodes int, label map[string]string, t *testing.T) {
- for i := startIndex; i < startIndex+numNodes; i++ {
- _, err := nodeClient.Create(newNode(fmt.Sprintf("node-%d", i), label))
- if err != nil {
- t.Fatalf("Failed to create node: %v", err)
- }
- }
- }
- func validateDaemonSetPodsAndMarkReady(
- podClient corev1client.PodInterface,
- podInformer cache.SharedIndexInformer,
- numberPods int,
- t *testing.T,
- ) {
- if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
- objects := podInformer.GetIndexer().List()
- if len(objects) != numberPods {
- return false, nil
- }
- for _, object := range objects {
- pod := object.(*v1.Pod)
- ownerReferences := pod.ObjectMeta.OwnerReferences
- if len(ownerReferences) != 1 {
- return false, fmt.Errorf("Pod %s has %d OwnerReferences, expected only 1", pod.Name, len(ownerReferences))
- }
- controllerRef := ownerReferences[0]
- if got, want := controllerRef.Kind, "DaemonSet"; got != want {
- t.Errorf("controllerRef.Kind = %q, want %q", got, want)
- }
- if controllerRef.Controller == nil || *controllerRef.Controller != true {
- t.Errorf("controllerRef.Controller is not set to true")
- }
- if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 {
- podCopy := pod.DeepCopy()
- podCopy.Status = v1.PodStatus{
- Phase: v1.PodRunning,
- Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}},
- }
- _, err := podClient.UpdateStatus(podCopy)
- if err != nil {
- return false, err
- }
- }
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- // podUnschedulable returns a condition function that returns true if the given pod
- // gets unschedulable status.
- func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
- return func() (bool, error) {
- pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
- if errors.IsNotFound(err) {
- return false, nil
- }
- if err != nil {
- // This could be a connection error so we want to retry.
- return false, nil
- }
- _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
- return cond != nil && cond.Status == v1.ConditionFalse &&
- cond.Reason == v1.PodReasonUnschedulable, nil
- }
- }
- // waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
- // an error if it does not become unschedulable within the given timeout.
- func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
- return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
- }
- // waitForPodUnschedule waits for a pod to fail scheduling and returns
- // an error if it does not become unschedulable within the timeout duration (30 seconds).
- func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
- return waitForPodUnschedulableWithTimeout(cs, pod, 10*time.Second)
- }
- // waitForPodsCreated waits for number of pods are created.
- func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error {
- return wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
- objects := podInformer.GetIndexer().List()
- return len(objects) == num, nil
- })
- }
- func waitForDaemonSetAndControllerRevisionCreated(c clientset.Interface, name string, namespace string) error {
- return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
- ds, err := c.AppsV1().DaemonSets(namespace).Get(name, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- if ds == nil {
- return false, nil
- }
- revs, err := c.AppsV1().ControllerRevisions(namespace).List(metav1.ListOptions{})
- if err != nil {
- return false, err
- }
- if revs.Size() == 0 {
- return false, nil
- }
- for _, rev := range revs.Items {
- for _, oref := range rev.OwnerReferences {
- if oref.Kind == "DaemonSet" && oref.UID == ds.UID {
- return true, nil
- }
- }
- }
- return false, nil
- })
- }
- func hashAndNameForDaemonSet(ds *apps.DaemonSet) (string, string) {
- hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount))
- name := ds.Name + "-" + hash
- return hash, name
- }
- func validateDaemonSetCollisionCount(dsClient appstyped.DaemonSetInterface, dsName string, expCount int32, t *testing.T) {
- ds, err := dsClient.Get(dsName, metav1.GetOptions{})
- if err != nil {
- t.Fatalf("Failed to look up DaemonSet: %v", err)
- }
- collisionCount := ds.Status.CollisionCount
- if *collisionCount != expCount {
- t.Fatalf("Expected collisionCount to be %d, but found %d", expCount, *collisionCount)
- }
- }
- func validateDaemonSetStatus(
- dsClient appstyped.DaemonSetInterface,
- dsName string,
- expectedNumberReady int32,
- t *testing.T) {
- if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
- ds, err := dsClient.Get(dsName, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- return ds.Status.NumberReady == expectedNumberReady, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func validateFailedPlacementEvent(eventClient corev1client.EventInterface, t *testing.T) {
- if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
- eventList, err := eventClient.List(metav1.ListOptions{})
- if err != nil {
- return false, err
- }
- if len(eventList.Items) == 0 {
- return false, nil
- }
- if len(eventList.Items) > 1 {
- t.Errorf("Expected 1 event got %d", len(eventList.Items))
- }
- event := eventList.Items[0]
- if event.Type != v1.EventTypeWarning {
- t.Errorf("Event type expected %s got %s", v1.EventTypeWarning, event.Type)
- }
- if event.Reason != daemon.FailedPlacementReason {
- t.Errorf("Event reason expected %s got %s", daemon.FailedPlacementReason, event.Reason)
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func updateDS(t *testing.T, dsClient appstyped.DaemonSetInterface, dsName string, updateFunc func(*apps.DaemonSet)) *apps.DaemonSet {
- var ds *apps.DaemonSet
- if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
- newDS, err := dsClient.Get(dsName, metav1.GetOptions{})
- if err != nil {
- return err
- }
- updateFunc(newDS)
- ds, err = dsClient.Update(newDS)
- return err
- }); err != nil {
- t.Fatalf("Failed to update DaemonSet: %v", err)
- }
- return ds
- }
- func forEachFeatureGate(t *testing.T, tf func(t *testing.T)) {
- for _, fg := range featureGates() {
- for _, f := range []bool{true, false} {
- func() {
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, fg, f)()
- t.Run(fmt.Sprintf("%v (%t)", fg, f), tf)
- }()
- }
- }
- }
- func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) {
- for _, strategy := range updateStrategies() {
- t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy),
- func(tt *testing.T) { tf(tt, strategy) })
- }
- }
- func TestOneNodeDaemonLaunchesPod(t *testing.T) {
- forEachFeatureGate(t, func(t *testing.T) {
- forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
- server, closeFn, dc, informers, clientset := setup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
- defer framework.DeleteTestingNamespace(ns, server, t)
- dsClient := clientset.AppsV1().DaemonSets(ns.Name)
- podClient := clientset.CoreV1().Pods(ns.Name)
- nodeClient := clientset.CoreV1().Nodes()
- podInformer := informers.Core().V1().Pods().Informer()
- stopCh := make(chan struct{})
- defer close(stopCh)
- // Start Scheduler
- setupScheduler(t, clientset, informers, stopCh)
- informers.Start(stopCh)
- go dc.Run(5, stopCh)
- ds := newDaemonSet("foo", ns.Name)
- ds.Spec.UpdateStrategy = *strategy
- _, err := dsClient.Create(ds)
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- defer cleanupDaemonSets(t, clientset, ds)
- _, err = nodeClient.Create(newNode("single-node", nil))
- if err != nil {
- t.Fatalf("Failed to create node: %v", err)
- }
- validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
- validateDaemonSetStatus(dsClient, ds.Name, 1, t)
- })
- })
- }
- func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
- forEachFeatureGate(t, func(t *testing.T) {
- forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
- server, closeFn, dc, informers, clientset := setup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
- defer framework.DeleteTestingNamespace(ns, server, t)
- dsClient := clientset.AppsV1().DaemonSets(ns.Name)
- podClient := clientset.CoreV1().Pods(ns.Name)
- nodeClient := clientset.CoreV1().Nodes()
- podInformer := informers.Core().V1().Pods().Informer()
- stopCh := make(chan struct{})
- defer close(stopCh)
- informers.Start(stopCh)
- go dc.Run(5, stopCh)
- // Start Scheduler
- setupScheduler(t, clientset, informers, stopCh)
- ds := newDaemonSet("foo", ns.Name)
- ds.Spec.UpdateStrategy = *strategy
- _, err := dsClient.Create(ds)
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- defer cleanupDaemonSets(t, clientset, ds)
- addNodes(nodeClient, 0, 5, nil, t)
- validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
- validateDaemonSetStatus(dsClient, ds.Name, 5, t)
- })
- })
- }
- func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
- forEachFeatureGate(t, func(t *testing.T) {
- forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
- server, closeFn, dc, informers, clientset := setup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
- defer framework.DeleteTestingNamespace(ns, server, t)
- dsClient := clientset.AppsV1().DaemonSets(ns.Name)
- podClient := clientset.CoreV1().Pods(ns.Name)
- nodeClient := clientset.CoreV1().Nodes()
- podInformer := informers.Core().V1().Pods().Informer()
- stopCh := make(chan struct{})
- defer close(stopCh)
- informers.Start(stopCh)
- go dc.Run(5, stopCh)
- // Start Scheduler
- setupScheduler(t, clientset, informers, stopCh)
- ds := newDaemonSet("foo", ns.Name)
- ds.Spec.UpdateStrategy = *strategy
- ds.Spec.Template.Spec.Affinity = &v1.Affinity{
- NodeAffinity: &v1.NodeAffinity{
- RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
- NodeSelectorTerms: []v1.NodeSelectorTerm{
- {
- MatchExpressions: []v1.NodeSelectorRequirement{
- {
- Key: "zone",
- Operator: v1.NodeSelectorOpIn,
- Values: []string{"test"},
- },
- },
- },
- {
- MatchFields: []v1.NodeSelectorRequirement{
- {
- Key: schedulerapi.NodeFieldSelectorKeyNodeName,
- Operator: v1.NodeSelectorOpIn,
- Values: []string{"node-1"},
- },
- },
- },
- },
- },
- },
- }
- _, err := dsClient.Create(ds)
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- defer cleanupDaemonSets(t, clientset, ds)
- addNodes(nodeClient, 0, 2, nil, t)
- // Two nodes with labels
- addNodes(nodeClient, 2, 2, map[string]string{
- "zone": "test",
- }, t)
- addNodes(nodeClient, 4, 2, nil, t)
- validateDaemonSetPodsAndMarkReady(podClient, podInformer, 3, t)
- validateDaemonSetStatus(dsClient, ds.Name, 3, t)
- })
- })
- }
- func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
- forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
- server, closeFn, dc, informers, clientset := setup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
- defer framework.DeleteTestingNamespace(ns, server, t)
- dsClient := clientset.AppsV1().DaemonSets(ns.Name)
- podClient := clientset.CoreV1().Pods(ns.Name)
- nodeClient := clientset.CoreV1().Nodes()
- podInformer := informers.Core().V1().Pods().Informer()
- stopCh := make(chan struct{})
- defer close(stopCh)
- informers.Start(stopCh)
- go dc.Run(5, stopCh)
- // Start Scheduler
- setupScheduler(t, clientset, informers, stopCh)
- ds := newDaemonSet("foo", ns.Name)
- ds.Spec.UpdateStrategy = *strategy
- _, err := dsClient.Create(ds)
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- defer cleanupDaemonSets(t, clientset, ds)
- node := newNode("single-node", nil)
- node.Status.Conditions = []v1.NodeCondition{
- {Type: v1.NodeReady, Status: v1.ConditionFalse},
- }
- _, err = nodeClient.Create(node)
- if err != nil {
- t.Fatalf("Failed to create node: %v", err)
- }
- validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
- validateDaemonSetStatus(dsClient, ds.Name, 1, t)
- })
- }
- // When ScheduleDaemonSetPods is disabled, DaemonSets should not launch onto nodes with insufficient capacity.
- // Look for TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled, we don't need this test anymore.
- func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ScheduleDaemonSetPods, false)()
- forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
- server, closeFn, dc, informers, clientset := setup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("insufficient-capacity", server, t)
- defer framework.DeleteTestingNamespace(ns, server, t)
- dsClient := clientset.AppsV1().DaemonSets(ns.Name)
- nodeClient := clientset.CoreV1().Nodes()
- eventClient := clientset.CoreV1().Events(ns.Namespace)
- stopCh := make(chan struct{})
- defer close(stopCh)
- informers.Start(stopCh)
- go dc.Run(5, stopCh)
- ds := newDaemonSet("foo", ns.Name)
- ds.Spec.Template.Spec = resourcePodSpec("node-with-limited-memory", "120M", "75m")
- ds.Spec.UpdateStrategy = *strategy
- _, err := dsClient.Create(ds)
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- defer cleanupDaemonSets(t, clientset, ds)
- node := newNode("node-with-limited-memory", nil)
- node.Status.Allocatable = allocatableResources("100M", "200m")
- _, err = nodeClient.Create(node)
- if err != nil {
- t.Fatalf("Failed to create node: %v", err)
- }
- validateFailedPlacementEvent(eventClient, t)
- })
- }
- // TestInsufficientCapacityNodeDaemonSetCreateButNotLaunchPod tests that when "ScheduleDaemonSetPods"
- // feature is enabled, the DaemonSet should create Pods for all the nodes regardless of available resource
- // on the nodes, and kube-scheduler should not schedule Pods onto the nodes with insufficient resource.
- func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T) {
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ScheduleDaemonSetPods, true)()
- forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
- server, closeFn, dc, informers, clientset := setup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("insufficient-capacity", server, t)
- defer framework.DeleteTestingNamespace(ns, server, t)
- dsClient := clientset.AppsV1().DaemonSets(ns.Name)
- podClient := clientset.CoreV1().Pods(ns.Name)
- podInformer := informers.Core().V1().Pods().Informer()
- nodeClient := clientset.CoreV1().Nodes()
- stopCh := make(chan struct{})
- defer close(stopCh)
- informers.Start(stopCh)
- go dc.Run(5, stopCh)
- // Start Scheduler
- setupScheduler(t, clientset, informers, stopCh)
- ds := newDaemonSet("foo", ns.Name)
- ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m")
- ds.Spec.UpdateStrategy = *strategy
- ds, err := dsClient.Create(ds)
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- defer cleanupDaemonSets(t, clientset, ds)
- node := newNode("node-with-limited-memory", nil)
- node.Status.Allocatable = allocatableResources("100M", "200m")
- _, err = nodeClient.Create(node)
- if err != nil {
- t.Fatalf("Failed to create node: %v", err)
- }
- if err := waitForPodsCreated(podInformer, 1); err != nil {
- t.Errorf("Failed to wait for pods created: %v", err)
- }
- objects := podInformer.GetIndexer().List()
- for _, object := range objects {
- pod := object.(*v1.Pod)
- if err := waitForPodUnschedulable(clientset, pod); err != nil {
- t.Errorf("Failed to wait for unschedulable status of pod %+v", pod)
- }
- }
- node1 := newNode("node-with-enough-memory", nil)
- node1.Status.Allocatable = allocatableResources("200M", "2000m")
- _, err = nodeClient.Create(node1)
- if err != nil {
- t.Fatalf("Failed to create node: %v", err)
- }
- // When ScheduleDaemonSetPods enabled, 2 pods are created. But only one
- // of two Pods is scheduled by default scheduler.
- validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
- validateDaemonSetStatus(dsClient, ds.Name, 1, t)
- })
- }
- // TestLaunchWithHashCollision tests that a DaemonSet can be updated even if there is a
- // hash collision with an existing ControllerRevision
- func TestLaunchWithHashCollision(t *testing.T) {
- server, closeFn, dc, informers, clientset := setup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
- defer framework.DeleteTestingNamespace(ns, server, t)
- dsClient := clientset.AppsV1().DaemonSets(ns.Name)
- podInformer := informers.Core().V1().Pods().Informer()
- nodeClient := clientset.CoreV1().Nodes()
- stopCh := make(chan struct{})
- defer close(stopCh)
- informers.Start(stopCh)
- go dc.Run(1, stopCh)
- setupScheduler(t, clientset, informers, stopCh)
- // Create single node
- _, err := nodeClient.Create(newNode("single-node", nil))
- if err != nil {
- t.Fatalf("Failed to create node: %v", err)
- }
- // Create new DaemonSet with RollingUpdate strategy
- orgDs := newDaemonSet("foo", ns.Name)
- oneIntString := intstr.FromInt(1)
- orgDs.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{
- Type: apps.RollingUpdateDaemonSetStrategyType,
- RollingUpdate: &apps.RollingUpdateDaemonSet{
- MaxUnavailable: &oneIntString,
- },
- }
- ds, err := dsClient.Create(orgDs)
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- // Wait for the DaemonSet to be created before proceeding
- err = waitForDaemonSetAndControllerRevisionCreated(clientset, ds.Name, ds.Namespace)
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- ds, err = dsClient.Get(ds.Name, metav1.GetOptions{})
- if err != nil {
- t.Fatalf("Failed to get DaemonSet: %v", err)
- }
- var orgCollisionCount int32
- if ds.Status.CollisionCount != nil {
- orgCollisionCount = *ds.Status.CollisionCount
- }
- // Look up the ControllerRevision for the DaemonSet
- _, name := hashAndNameForDaemonSet(ds)
- revision, err := clientset.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
- if err != nil || revision == nil {
- t.Fatalf("Failed to look up ControllerRevision: %v", err)
- }
- // Create a "fake" ControllerRevision that we know will create a hash collision when we make
- // the next update
- one := int64(1)
- ds.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
- newHash, newName := hashAndNameForDaemonSet(ds)
- newRevision := &apps.ControllerRevision{
- ObjectMeta: metav1.ObjectMeta{
- Name: newName,
- Namespace: ds.Namespace,
- Labels: labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, newHash),
- Annotations: ds.Annotations,
- OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, apps.SchemeGroupVersion.WithKind("DaemonSet"))},
- },
- Data: revision.Data,
- Revision: revision.Revision + 1,
- }
- _, err = clientset.AppsV1().ControllerRevisions(ds.Namespace).Create(newRevision)
- if err != nil {
- t.Fatalf("Failed to create ControllerRevision: %v", err)
- }
- // Make an update of the DaemonSet which we know will create a hash collision when
- // the next ControllerRevision is created.
- ds = updateDS(t, dsClient, ds.Name, func(updateDS *apps.DaemonSet) {
- updateDS.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
- })
- // Wait for any pod with the latest Spec to exist
- err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
- objects := podInformer.GetIndexer().List()
- for _, object := range objects {
- pod := object.(*v1.Pod)
- if *pod.Spec.TerminationGracePeriodSeconds == *ds.Spec.Template.Spec.TerminationGracePeriodSeconds {
- return true, nil
- }
- }
- return false, nil
- })
- if err != nil {
- t.Fatalf("Failed to wait for Pods with the latest Spec to be created: %v", err)
- }
- validateDaemonSetCollisionCount(dsClient, ds.Name, orgCollisionCount+1, t)
- }
- // TestTaintedNode tests that, whether or not the "ScheduleDaemonSetPods" feature is enabled, a tainted
- // node isn't expected to have a pod scheduled; after the taint is removed the validated count goes from 1 to 2.
- func TestTaintedNode(t *testing.T) {
- forEachFeatureGate(t, func(t *testing.T) { // helper defined elsewhere; presumably runs the body per feature-gate setting - confirm
- forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { // helper defined elsewhere; supplies the update strategy applied below
- server, closeFn, dc, informers, clientset := setup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("tainted-node", server, t)
- defer framework.DeleteTestingNamespace(ns, server, t)
- dsClient := clientset.AppsV1().DaemonSets(ns.Name)
- podClient := clientset.CoreV1().Pods(ns.Name)
- podInformer := informers.Core().V1().Pods().Informer()
- nodeClient := clientset.CoreV1().Nodes()
- stopCh := make(chan struct{}) // handed to the scheduler, informers and controller started below
- defer close(stopCh)
- // Start the scheduler; in this test it is set up before the informers and the controller are started.
- setupScheduler(t, clientset, informers, stopCh)
- informers.Start(stopCh)
- go dc.Run(5, stopCh) // runs in the background; 5 is presumably the worker count - confirm
- ds := newDaemonSet("foo", ns.Name)
- ds.Spec.UpdateStrategy = *strategy // use the strategy passed in for this sub-test
- ds, err := dsClient.Create(ds)
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- defer cleanupDaemonSets(t, clientset, ds)
- nodeWithTaint := newNode("node-with-taint", nil)
- nodeWithTaint.Spec.Taints = []v1.Taint{{Key: "key1", Value: "val1", Effect: "NoSchedule"}} // arbitrary NoSchedule taint on this node only
- _, err = nodeClient.Create(nodeWithTaint)
- if err != nil {
- t.Fatalf("Failed to create nodeWithTaint: %v", err)
- }
- nodeWithoutTaint := newNode("node-without-taint", nil)
- _, err = nodeClient.Create(nodeWithoutTaint)
- if err != nil {
- t.Fatalf("Failed to create nodeWithoutTaint: %v", err)
- }
- validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t) // two nodes exist, one tainted: validate against a count of 1
- validateDaemonSetStatus(dsClient, ds.Name, 1, t)
- // Remove the taint from nodeWithTaint: re-read the node, clear Spec.Taints on a deep copy, and send the update.
- nodeWithTaint, err = nodeClient.Get("node-with-taint", metav1.GetOptions{})
- if err != nil {
- t.Fatalf("Failed to retrieve nodeWithTaint: %v", err)
- }
- nodeWithTaintCopy := nodeWithTaint.DeepCopy()
- nodeWithTaintCopy.Spec.Taints = []v1.Taint{}
- _, err = nodeClient.Update(nodeWithTaintCopy)
- if err != nil {
- t.Fatalf("Failed to update nodeWithTaint: %v", err)
- }
- validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t) // neither node is tainted now: validate against a count of 2
- validateDaemonSetStatus(dsClient, ds.Name, 2, t)
- })
- })
- }
- // TestUnschedulableNodeDaemonDoesLaunchPod tests that the DaemonSet Pods can still be scheduled
- // to unschedulable and network-unavailable nodes when the TaintNodesByCondition feature is enabled.
- func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TaintNodesByCondition, true)() // sets the gate to true now; the returned func runs on exit (presumably restoring the old value - confirm)
- forEachFeatureGate(t, func(t *testing.T) { // helper defined elsewhere; presumably runs the body per feature-gate setting - confirm
- forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { // helper defined elsewhere; supplies the update strategy applied below
- server, closeFn, dc, informers, clientset := setup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("daemonset-unschedulable-test", server, t)
- defer framework.DeleteTestingNamespace(ns, server, t)
- dsClient := clientset.AppsV1().DaemonSets(ns.Name)
- podClient := clientset.CoreV1().Pods(ns.Name)
- nodeClient := clientset.CoreV1().Nodes()
- podInformer := informers.Core().V1().Pods().Informer()
- stopCh := make(chan struct{}) // handed to the informers, controller and scheduler started below
- defer close(stopCh)
- informers.Start(stopCh)
- go dc.Run(5, stopCh) // runs in the background; 5 is presumably the worker count - confirm
- // Start the scheduler; in this test it is set up after the informers and the controller have been started.
- setupScheduler(t, clientset, informers, stopCh)
- ds := newDaemonSet("foo", ns.Name)
- ds.Spec.UpdateStrategy = *strategy // use the strategy passed in for this sub-test
- ds.Spec.Template.Spec.HostNetwork = true // NOTE(review): presumably needed so the pod may land on the network-unavailable node - confirm
- _, err := dsClient.Create(ds) // the returned object is discarded; the local ds is what cleanup and the status check use
- if err != nil {
- t.Fatalf("Failed to create DaemonSet: %v", err)
- }
- defer cleanupDaemonSets(t, clientset, ds)
- // Create an unschedulable node: Spec.Unschedulable is set and the TaintNodeUnschedulable NoSchedule taint is added explicitly.
- node := newNode("unschedulable-node", nil)
- node.Spec.Unschedulable = true
- node.Spec.Taints = []v1.Taint{
- {
- Key: schedulerapi.TaintNodeUnschedulable,
- Effect: v1.TaintEffectNoSchedule,
- },
- }
- _, err = nodeClient.Create(node)
- if err != nil {
- t.Fatalf("Failed to create node: %v", err)
- }
- // Create a network-unavailable node: Ready is false, NetworkUnavailable is true, and it carries the TaintNodeNetworkUnavailable NoSchedule taint.
- nodeNU := newNode("network-unavailable-node", nil)
- nodeNU.Status.Conditions = []v1.NodeCondition{
- {Type: v1.NodeReady, Status: v1.ConditionFalse},
- {Type: v1.NodeNetworkUnavailable, Status: v1.ConditionTrue},
- }
- nodeNU.Spec.Taints = []v1.Taint{
- {
- Key: schedulerapi.TaintNodeNetworkUnavailable,
- Effect: v1.TaintEffectNoSchedule,
- },
- }
- _, err = nodeClient.Create(nodeNU)
- if err != nil {
- t.Fatalf("Failed to create node: %v", err)
- }
- validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t) // both tainted nodes are counted: validate against a count of 2
- validateDaemonSetStatus(dsClient, ds.Name, 2, t)
- })
- })
- }
|