- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package job
- import (
- "fmt"
- "strconv"
- "testing"
- "time"
- batch "k8s.io/api/batch/v1"
- "k8s.io/api/core/v1"
- apiequality "k8s.io/apimachinery/pkg/api/equality"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/util/rand"
- "k8s.io/apimachinery/pkg/util/uuid"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apimachinery/pkg/watch"
- "k8s.io/client-go/informers"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/fake"
- restclient "k8s.io/client-go/rest"
- core "k8s.io/client-go/testing"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/util/workqueue"
- _ "k8s.io/kubernetes/pkg/apis/core/install"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/testutil"
- )
- var alwaysReady = func() bool { return true }
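- // newJob returns a Job fixture with the given parallelism, completions and backoff
- // limit; pass -1 for parallelism or completions to leave the corresponding field nil.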
- func newJob(parallelism, completions, backoffLimit int32) *batch.Job {
- j := &batch.Job{
- TypeMeta: metav1.TypeMeta{Kind: "Job"},
- ObjectMeta: metav1.ObjectMeta{
- Name: "foobar",
- UID: uuid.NewUUID(),
- Namespace: metav1.NamespaceDefault,
- },
- Spec: batch.JobSpec{
- Selector: &metav1.LabelSelector{
- MatchLabels: map[string]string{"foo": "bar"},
- },
- Template: v1.PodTemplateSpec{
- ObjectMeta: metav1.ObjectMeta{
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {Image: "foo/bar"},
- },
- },
- },
- },
- }
- // Special case: -1 for either completions or parallelism means leave nil (negative is not allowed
- // in practice by validation).
- if completions >= 0 {
- j.Spec.Completions = &completions
- } else {
- j.Spec.Completions = nil
- }
- if parallelism >= 0 {
- j.Spec.Parallelism = &parallelism
- } else {
- j.Spec.Parallelism = nil
- }
- j.Spec.BackoffLimit = &backoffLimit
- return j
- }
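- // newJobControllerFromClient builds a JobController backed by a shared informer
- // factory for the given client, with pod control replaced by a fake.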
- func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) {
- sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
- jm := NewJobController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient)
- jm.podControl = &controller.FakePodControl{}
- return jm, sharedInformers
- }
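- // newPod returns a pod owned by the given job and labeled with the job's selector labels.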
- func newPod(name string, job *batch.Job) *v1.Pod {
- return &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Labels: job.Spec.Selector.MatchLabels,
- Namespace: job.Namespace,
- OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)},
- },
- }
- }
- // newPodList creates count pods in the given phase for the given job.
- func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod {
- pods := []v1.Pod{}
- for i := int32(0); i < count; i++ {
- newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
- newPod.Status = v1.PodStatus{Phase: status}
- pods = append(pods, *newPod)
- }
- return pods
- }
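- // setPodsStatuses adds the requested numbers of pending, running, succeeded and
- // failed pods for the job to the pod indexer.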
- func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods int32) {
- for _, pod := range newPodList(pendingPods, v1.PodPending, job) {
- podIndexer.Add(&pod)
- }
- for _, pod := range newPodList(activePods, v1.PodRunning, job) {
- podIndexer.Add(&pod)
- }
- for _, pod := range newPodList(succeededPods, v1.PodSucceeded, job) {
- podIndexer.Add(&pod)
- }
- for _, pod := range newPodList(failedPods, v1.PodFailed, job) {
- podIndexer.Add(&pod)
- }
- }
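- // TestControllerSyncJob verifies the pod creations, deletions and job status produced
- // by a single syncJob call for a range of job and pod configurations.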
- func TestControllerSyncJob(t *testing.T) {
- jobConditionComplete := batch.JobComplete
- jobConditionFailed := batch.JobFailed
- testCases := map[string]struct {
- // job setup
- parallelism int32
- completions int32
- backoffLimit int32
- deleting bool
- podLimit int
- // pod setup
- podControllerError error
- jobKeyForget bool
- pendingPods int32
- activePods int32
- succeededPods int32
- failedPods int32
- // expectations
- expectedCreations int32
- expectedDeletions int32
- expectedActive int32
- expectedSucceeded int32
- expectedFailed int32
- expectedCondition *batch.JobConditionType
- expectedConditionReason string
- }{
- "job start": {
- 2, 5, 6, false, 0,
- nil, true, 0, 0, 0, 0,
- 2, 0, 2, 0, 0, nil, "",
- },
- "WQ job start": {
- 2, -1, 6, false, 0,
- nil, true, 0, 0, 0, 0,
- 2, 0, 2, 0, 0, nil, "",
- },
- "pending pods": {
- 2, 5, 6, false, 0,
- nil, true, 2, 0, 0, 0,
- 0, 0, 2, 0, 0, nil, "",
- },
- "correct # of pods": {
- 2, 5, 6, false, 0,
- nil, true, 0, 2, 0, 0,
- 0, 0, 2, 0, 0, nil, "",
- },
- "WQ job: correct # of pods": {
- 2, -1, 6, false, 0,
- nil, true, 0, 2, 0, 0,
- 0, 0, 2, 0, 0, nil, "",
- },
- "too few active pods": {
- 2, 5, 6, false, 0,
- nil, true, 0, 1, 1, 0,
- 1, 0, 2, 1, 0, nil, "",
- },
- "too few active pods with a dynamic job": {
- 2, -1, 6, false, 0,
- nil, true, 0, 1, 0, 0,
- 1, 0, 2, 0, 0, nil, "",
- },
- "too few active pods, with controller error": {
- 2, 5, 6, false, 0,
- fmt.Errorf("Fake error"), true, 0, 1, 1, 0,
- 1, 0, 1, 1, 0, nil, "",
- },
- "too many active pods": {
- 2, 5, 6, false, 0,
- nil, true, 0, 3, 0, 0,
- 0, 1, 2, 0, 0, nil, "",
- },
- "too many active pods, with controller error": {
- 2, 5, 6, false, 0,
- fmt.Errorf("Fake error"), true, 0, 3, 0, 0,
- 0, 1, 3, 0, 0, nil, "",
- },
- "failed + succeed pods: reset backoff delay": {
- 2, 5, 6, false, 0,
- fmt.Errorf("Fake error"), true, 0, 1, 1, 1,
- 1, 0, 1, 1, 1, nil, "",
- },
- "only new failed pod": {
- 2, 5, 6, false, 0,
- fmt.Errorf("Fake error"), false, 0, 1, 0, 1,
- 1, 0, 1, 0, 1, nil, "",
- },
- "job finish": {
- 2, 5, 6, false, 0,
- nil, true, 0, 0, 5, 0,
- 0, 0, 0, 5, 0, nil, "",
- },
- "WQ job finishing": {
- 2, -1, 6, false, 0,
- nil, true, 0, 1, 1, 0,
- 0, 0, 1, 1, 0, nil, "",
- },
- "WQ job all finished": {
- 2, -1, 6, false, 0,
- nil, true, 0, 0, 2, 0,
- 0, 0, 0, 2, 0, &jobConditionComplete, "",
- },
- "WQ job all finished despite one failure": {
- 2, -1, 6, false, 0,
- nil, true, 0, 0, 1, 1,
- 0, 0, 0, 1, 1, &jobConditionComplete, "",
- },
- "more active pods than completions": {
- 2, 5, 6, false, 0,
- nil, true, 0, 10, 0, 0,
- 0, 8, 2, 0, 0, nil, "",
- },
- "status change": {
- 2, 5, 6, false, 0,
- nil, true, 0, 2, 2, 0,
- 0, 0, 2, 2, 0, nil, "",
- },
- "deleting job": {
- 2, 5, 6, true, 0,
- nil, true, 1, 1, 1, 0,
- 0, 0, 2, 1, 0, nil, "",
- },
- "limited pods": {
- 100, 200, 6, false, 10,
- nil, true, 0, 0, 0, 0,
- 10, 0, 10, 0, 0, nil, "",
- },
- "too many job failures": {
- 2, 5, 0, true, 0,
- nil, true, 0, 0, 0, 1,
- 0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
- },
- }
- for name, tc := range testCases {
- // job manager setup
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- var actual *batch.Job
- manager.updateHandler = func(job *batch.Job) error {
- actual = job
- return nil
- }
- // job & pods setup
- job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
- if tc.deleting {
- now := metav1.Now()
- job.DeletionTimestamp = &now
- }
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
- podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
- setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods)
- // run
- forget, err := manager.syncJob(testutil.GetKey(job, t))
- // syncJob must return an error (so the key is requeued) when the pod controller fails.
- if tc.podControllerError != nil {
- if err == nil {
- t.Errorf("%s: Syncing jobs would return error when podController exception", name)
- }
- } else {
- if err != nil && (tc.podLimit == 0 || fakePodControl.CreateCallCount < tc.podLimit) {
- t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
- }
- }
- if forget != tc.jobKeyForget {
- t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.jobKeyForget, forget)
- }
- // validate created/deleted pods
- if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
- t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.Templates))
- }
- if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
- t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName))
- }
- // Each create should have an accompanying ControllerRef.
- if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) {
- t.Errorf("%s: unexpected number of ControllerRefs. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.ControllerRefs))
- }
- // Make sure the ControllerRefs are correct.
- for _, controllerRef := range fakePodControl.ControllerRefs {
- if got, want := controllerRef.APIVersion, "batch/v1"; got != want {
- t.Errorf("controllerRef.APIVersion = %q, want %q", got, want)
- }
- if got, want := controllerRef.Kind, "Job"; got != want {
- t.Errorf("controllerRef.Kind = %q, want %q", got, want)
- }
- if got, want := controllerRef.Name, job.Name; got != want {
- t.Errorf("controllerRef.Name = %q, want %q", got, want)
- }
- if got, want := controllerRef.UID, job.UID; got != want {
- t.Errorf("controllerRef.UID = %q, want %q", got, want)
- }
- if controllerRef.Controller == nil || *controllerRef.Controller != true {
- t.Errorf("controllerRef.Controller is not set to true")
- }
- }
- // validate status
- if actual.Status.Active != tc.expectedActive {
- t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active)
- }
- if actual.Status.Succeeded != tc.expectedSucceeded {
- t.Errorf("%s: unexpected number of succeeded pods. Expected %d, saw %d\n", name, tc.expectedSucceeded, actual.Status.Succeeded)
- }
- if actual.Status.Failed != tc.expectedFailed {
- t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailed, actual.Status.Failed)
- }
- if actual.Status.StartTime == nil {
- t.Errorf("%s: .status.startTime was not set", name)
- }
- // validate conditions
- if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
- t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
- }
- // validate slow start
- expectedLimit := 0
- for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ {
- expectedLimit += controller.SlowStartInitialBatchSize << pass
- }
- if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit {
- t.Errorf("%s: Unexpected number of create calls. Expected <= %d, saw %d\n", name, fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount)
- }
- }
- }
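- // TestSyncJobPastDeadline verifies that jobs whose activeDeadlineSeconds has elapsed
- // are marked failed and their active pods deleted.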
- func TestSyncJobPastDeadline(t *testing.T) {
- testCases := map[string]struct {
- // job setup
- parallelism int32
- completions int32
- activeDeadlineSeconds int64
- startTime int64
- backoffLimit int32
- // pod setup
- activePods int32
- succeededPods int32
- failedPods int32
- // expectations
- expectedForGetKey bool
- expectedDeletions int32
- expectedActive int32
- expectedSucceeded int32
- expectedFailed int32
- expectedConditionReason string
- }{
- "activeDeadlineSeconds less than single pod execution": {
- 1, 1, 10, 15, 6,
- 1, 0, 0,
- true, 1, 0, 0, 1, "DeadlineExceeded",
- },
- "activeDeadlineSeconds bigger than single pod execution": {
- 1, 2, 10, 15, 6,
- 1, 1, 0,
- true, 1, 0, 1, 1, "DeadlineExceeded",
- },
- "activeDeadlineSeconds times-out before any pod starts": {
- 1, 1, 10, 10, 6,
- 0, 0, 0,
- true, 0, 0, 0, 0, "DeadlineExceeded",
- },
- "activeDeadlineSeconds with backofflimit reach": {
- 1, 1, 1, 10, 0,
- 0, 0, 1,
- true, 0, 0, 0, 1, "BackoffLimitExceeded",
- },
- }
- for name, tc := range testCases {
- // job manager setup
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- var actual *batch.Job
- manager.updateHandler = func(job *batch.Job) error {
- actual = job
- return nil
- }
- // job & pods setup
- job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
- job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
- start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0)
- job.Status.StartTime = &start
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
- podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
- setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods)
- // run
- forget, err := manager.syncJob(testutil.GetKey(job, t))
- if err != nil {
- t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
- }
- if forget != tc.expectedForGetKey {
- t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.expectedForGetKey, forget)
- }
- // validate created/deleted pods
- if int32(len(fakePodControl.Templates)) != 0 {
- t.Errorf("%s: unexpected number of creates. Expected 0, saw %d\n", name, len(fakePodControl.Templates))
- }
- if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
- t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName))
- }
- // validate status
- if actual.Status.Active != tc.expectedActive {
- t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active)
- }
- if actual.Status.Succeeded != tc.expectedSucceeded {
- t.Errorf("%s: unexpected number of succeeded pods. Expected %d, saw %d\n", name, tc.expectedSucceeded, actual.Status.Succeeded)
- }
- if actual.Status.Failed != tc.expectedFailed {
- t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailed, actual.Status.Failed)
- }
- if actual.Status.StartTime == nil {
- t.Errorf("%s: .status.startTime was not set", name)
- }
- // validate conditions
- if !getCondition(actual, batch.JobFailed, tc.expectedConditionReason) {
- t.Errorf("%s: expected fail condition. Got %#v", name, actual.Status.Conditions)
- }
- }
- }
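- // getCondition reports whether the job has a true condition of the given type and reason.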
- func getCondition(job *batch.Job, condition batch.JobConditionType, reason string) bool {
- for _, v := range job.Status.Conditions {
- if v.Type == condition && v.Status == v1.ConditionTrue && v.Reason == reason {
- return true
- }
- }
- return false
- }
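- // TestSyncPastDeadlineJobFinished verifies that a job already marked failed for
- // exceeding its deadline is not modified by a subsequent sync.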
- func TestSyncPastDeadlineJobFinished(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- var actual *batch.Job
- manager.updateHandler = func(job *batch.Job) error {
- actual = job
- return nil
- }
- job := newJob(1, 1, 6)
- activeDeadlineSeconds := int64(10)
- job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
- start := metav1.Unix(metav1.Now().Time.Unix()-15, 0)
- job.Status.StartTime = &start
- job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
- forget, err := manager.syncJob(testutil.GetKey(job, t))
- if err != nil {
- t.Errorf("Unexpected error when syncing jobs %v", err)
- }
- if !forget {
- t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
- }
- if len(fakePodControl.Templates) != 0 {
- t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
- }
- if len(fakePodControl.DeletePodName) != 0 {
- t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
- }
- if actual != nil {
- t.Error("Unexpected job modification")
- }
- }
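- // TestSyncJobComplete verifies that syncing an already complete job leaves its conditions unchanged.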
- func TestSyncJobComplete(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- job := newJob(1, 1, 6)
- job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
- forget, err := manager.syncJob(testutil.GetKey(job, t))
- if err != nil {
- t.Fatalf("Unexpected error when syncing jobs %v", err)
- }
- if !forget {
- t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
- }
- actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
- if err != nil {
- t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
- }
- // Verify that after syncing a complete job, the conditions are the same.
- if got, expected := len(actual.Status.Conditions), 1; got != expected {
- t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got)
- }
- }
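- // TestSyncJobDeleted verifies that syncing a job missing from the store creates and deletes no pods.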
- func TestSyncJobDeleted(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- manager, _ := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- manager.updateHandler = func(job *batch.Job) error { return nil }
- job := newJob(2, 2, 6)
- forget, err := manager.syncJob(testutil.GetKey(job, t))
- if err != nil {
- t.Errorf("Unexpected error when syncing jobs %v", err)
- }
- if !forget {
- t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
- }
- if len(fakePodControl.Templates) != 0 {
- t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
- }
- if len(fakePodControl.DeletePodName) != 0 {
- t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
- }
- }
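- // TestSyncJobUpdateRequeue verifies that a failed status update surfaces the error and
- // leaves the job key in the queue for another attempt.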
- func TestSyncJobUpdateRequeue(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- updateError := fmt.Errorf("Update error")
- manager.updateHandler = func(job *batch.Job) error {
- manager.queue.AddRateLimited(testutil.GetKey(job, t))
- return updateError
- }
- job := newJob(2, 2, 6)
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
- forget, err := manager.syncJob(testutil.GetKey(job, t))
- if err == nil || err != updateError {
- t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
- }
- if forget != false {
- t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
- }
- t.Log("Waiting for a job in the queue")
- key, _ := manager.queue.Get()
- expectedKey := testutil.GetKey(job, t)
- if key != expectedKey {
- t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key)
- }
- }
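- // TestJobPodLookup verifies that getPodJobs matches pods to jobs only when both
- // namespace and selector match.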
- func TestJobPodLookup(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- testCases := []struct {
- job *batch.Job
- pod *v1.Pod
- expectedName string
- }{
- // pods without labels don't match any job
- {
- job: &batch.Job{
- ObjectMeta: metav1.ObjectMeta{Name: "basic"},
- },
- pod: &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: metav1.NamespaceAll},
- },
- expectedName: "",
- },
- // matching labels, different namespace
- {
- job: &batch.Job{
- ObjectMeta: metav1.ObjectMeta{Name: "foo"},
- Spec: batch.JobSpec{
- Selector: &metav1.LabelSelector{
- MatchLabels: map[string]string{"foo": "bar"},
- },
- },
- },
- pod: &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo2",
- Namespace: "ns",
- Labels: map[string]string{"foo": "bar"},
- },
- },
- expectedName: "",
- },
- // matching ns and labels returns
- {
- job: &batch.Job{
- ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "ns"},
- Spec: batch.JobSpec{
- Selector: &metav1.LabelSelector{
- MatchExpressions: []metav1.LabelSelectorRequirement{
- {
- Key: "foo",
- Operator: metav1.LabelSelectorOpIn,
- Values: []string{"bar"},
- },
- },
- },
- },
- },
- pod: &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo3",
- Namespace: "ns",
- Labels: map[string]string{"foo": "bar"},
- },
- },
- expectedName: "bar",
- },
- }
- for _, tc := range testCases {
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job)
- if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 {
- if got, want := len(jobs), 1; got != want {
- t.Errorf("len(jobs) = %v, want %v", got, want)
- }
- job := jobs[0]
- if tc.expectedName != job.Name {
- t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName)
- }
- } else if tc.expectedName != "" {
- t.Errorf("Expected a job %v pod %v, found none", tc.expectedName, tc.pod.Name)
- }
- }
- }
- func TestGetPodsForJob(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job2 := newJob(1, 1, 6)
- job2.Name = "job2"
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
- pod1 := newPod("pod1", job1)
- pod2 := newPod("pod2", job2)
- pod3 := newPod("pod3", job1)
- // Make pod3 an orphan that doesn't match. It should be ignored.
- pod3.OwnerReferences = nil
- pod3.Labels = nil
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod3)
- pods, err := jm.getPodsForJob(job1)
- if err != nil {
- t.Fatalf("getPodsForJob() error: %v", err)
- }
- if got, want := len(pods), 1; got != want {
- t.Errorf("len(pods) = %v, want %v", got, want)
- }
- if got, want := pods[0].Name, "pod1"; got != want {
- t.Errorf("pod.Name = %v, want %v", got, want)
- }
- pods, err = jm.getPodsForJob(job2)
- if err != nil {
- t.Fatalf("getPodsForJob() error: %v", err)
- }
- if got, want := len(pods), 1; got != want {
- t.Errorf("len(pods) = %v, want %v", got, want)
- }
- if got, want := pods[0].Name, "pod2"; got != want {
- t.Errorf("pod.Name = %v, want %v", got, want)
- }
- }
- func TestGetPodsForJobAdopt(t *testing.T) {
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- clientset := fake.NewSimpleClientset(job1)
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- pod1 := newPod("pod1", job1)
- pod2 := newPod("pod2", job1)
- // Make this pod an orphan. It should still be returned because it's adopted.
- pod2.OwnerReferences = nil
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
- pods, err := jm.getPodsForJob(job1)
- if err != nil {
- t.Fatalf("getPodsForJob() error: %v", err)
- }
- if got, want := len(pods), 2; got != want {
- t.Errorf("len(pods) = %v, want %v", got, want)
- }
- }
- func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job1.DeletionTimestamp = &metav1.Time{}
- clientset := fake.NewSimpleClientset(job1)
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- pod1 := newPod("pod1", job1)
- pod2 := newPod("pod2", job1)
- // Make this pod an orphan. It should not be adopted because the Job is being deleted.
- pod2.OwnerReferences = nil
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
- pods, err := jm.getPodsForJob(job1)
- if err != nil {
- t.Fatalf("getPodsForJob() error: %v", err)
- }
- if got, want := len(pods), 1; got != want {
- t.Errorf("len(pods) = %v, want %v", got, want)
- }
- if got, want := pods[0].Name, pod1.Name; got != want {
- t.Errorf("pod.Name = %q, want %q", got, want)
- }
- }
- func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) {
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- // The up-to-date object says it's being deleted.
- job1.DeletionTimestamp = &metav1.Time{}
- clientset := fake.NewSimpleClientset(job1)
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- // The cache says it's NOT being deleted.
- cachedJob := *job1
- cachedJob.DeletionTimestamp = nil
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(&cachedJob)
- pod1 := newPod("pod1", job1)
- pod2 := newPod("pod2", job1)
- // Make this pod an orphan. It should not be adopted because the Job is being deleted.
- pod2.OwnerReferences = nil
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
- pods, err := jm.getPodsForJob(job1)
- if err != nil {
- t.Fatalf("getPodsForJob() error: %v", err)
- }
- if got, want := len(pods), 1; got != want {
- t.Errorf("len(pods) = %v, want %v", got, want)
- }
- if got, want := pods[0].Name, pod1.Name; got != want {
- t.Errorf("pod.Name = %q, want %q", got, want)
- }
- }
- func TestGetPodsForJobRelease(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- pod1 := newPod("pod1", job1)
- pod2 := newPod("pod2", job1)
- // Make this pod not match, even though it's owned. It should be released.
- pod2.Labels = nil
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
- pods, err := jm.getPodsForJob(job1)
- if err != nil {
- t.Fatalf("getPodsForJob() error: %v", err)
- }
- if got, want := len(pods), 1; got != want {
- t.Errorf("len(pods) = %v, want %v", got, want)
- }
- if got, want := pods[0].Name, "pod1"; got != want {
- t.Errorf("pod.Name = %v, want %v", got, want)
- }
- }
- func TestAddPod(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job2 := newJob(1, 1, 6)
- job2.Name = "job2"
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
- pod1 := newPod("pod1", job1)
- pod2 := newPod("pod2", job2)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
- jm.addPod(pod1)
- if got, want := jm.queue.Len(), 1; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- key, done := jm.queue.Get()
- if key == nil || done {
- t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
- }
- expectedKey, _ := controller.KeyFunc(job1)
- if got, want := key.(string), expectedKey; got != want {
- t.Errorf("queue.Get() = %v, want %v", got, want)
- }
- jm.addPod(pod2)
- if got, want := jm.queue.Len(), 1; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- key, done = jm.queue.Get()
- if key == nil || done {
- t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
- }
- expectedKey, _ = controller.KeyFunc(job2)
- if got, want := key.(string), expectedKey; got != want {
- t.Errorf("queue.Get() = %v, want %v", got, want)
- }
- }
- func TestAddPodOrphan(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job2 := newJob(1, 1, 6)
- job2.Name = "job2"
- job3 := newJob(1, 1, 6)
- job3.Name = "job3"
- job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)
- pod1 := newPod("pod1", job1)
- // Make pod an orphan. Expect all matching controllers to be queued.
- pod1.OwnerReferences = nil
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- jm.addPod(pod1)
- if got, want := jm.queue.Len(), 2; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- }
- func TestUpdatePod(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job2 := newJob(1, 1, 6)
- job2.Name = "job2"
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
- pod1 := newPod("pod1", job1)
- pod2 := newPod("pod2", job2)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
- prev := *pod1
- bumpResourceVersion(pod1)
- jm.updatePod(&prev, pod1)
- if got, want := jm.queue.Len(), 1; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- key, done := jm.queue.Get()
- if key == nil || done {
- t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
- }
- expectedKey, _ := controller.KeyFunc(job1)
- if got, want := key.(string), expectedKey; got != want {
- t.Errorf("queue.Get() = %v, want %v", got, want)
- }
- prev = *pod2
- bumpResourceVersion(pod2)
- jm.updatePod(&prev, pod2)
- if got, want := jm.queue.Len(), 1; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- key, done = jm.queue.Get()
- if key == nil || done {
- t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
- }
- expectedKey, _ = controller.KeyFunc(job2)
- if got, want := key.(string), expectedKey; got != want {
- t.Errorf("queue.Get() = %v, want %v", got, want)
- }
- }
- func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job2 := newJob(1, 1, 6)
- job2.Name = "job2"
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
- pod1 := newPod("pod1", job1)
- pod1.OwnerReferences = nil
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- // Labels changed on orphan. Expect newly matching controllers to queue.
- prev := *pod1
- prev.Labels = map[string]string{"foo2": "bar2"}
- bumpResourceVersion(pod1)
- jm.updatePod(&prev, pod1)
- if got, want := jm.queue.Len(), 2; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- }
- func TestUpdatePodChangeControllerRef(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job2 := newJob(1, 1, 6)
- job2.Name = "job2"
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
- pod1 := newPod("pod1", job1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- // Changed ControllerRef. Expect both old and new to queue.
- prev := *pod1
- prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(job2, controllerKind)}
- bumpResourceVersion(pod1)
- jm.updatePod(&prev, pod1)
- if got, want := jm.queue.Len(), 2; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- }
- func TestUpdatePodRelease(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job2 := newJob(1, 1, 6)
- job2.Name = "job2"
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
- pod1 := newPod("pod1", job1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- // Remove ControllerRef. Expect all matching to queue for adoption.
- prev := *pod1
- pod1.OwnerReferences = nil
- bumpResourceVersion(pod1)
- jm.updatePod(&prev, pod1)
- if got, want := jm.queue.Len(), 2; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- }
- func TestDeletePod(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job2 := newJob(1, 1, 6)
- job2.Name = "job2"
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
- pod1 := newPod("pod1", job1)
- pod2 := newPod("pod2", job2)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
- jm.deletePod(pod1)
- if got, want := jm.queue.Len(), 1; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- key, done := jm.queue.Get()
- if key == nil || done {
- t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
- }
- expectedKey, _ := controller.KeyFunc(job1)
- if got, want := key.(string), expectedKey; got != want {
- t.Errorf("queue.Get() = %v, want %v", got, want)
- }
- jm.deletePod(pod2)
- if got, want := jm.queue.Len(), 1; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- key, done = jm.queue.Get()
- if key == nil || done {
- t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
- }
- expectedKey, _ = controller.KeyFunc(job2)
- if got, want := key.(string), expectedKey; got != want {
- t.Errorf("queue.Get() = %v, want %v", got, want)
- }
- }
- func TestDeletePodOrphan(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- jm.podStoreSynced = alwaysReady
- jm.jobStoreSynced = alwaysReady
- job1 := newJob(1, 1, 6)
- job1.Name = "job1"
- job2 := newJob(1, 1, 6)
- job2.Name = "job2"
- job3 := newJob(1, 1, 6)
- job3.Name = "job3"
- job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
- informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)
- pod1 := newPod("pod1", job1)
- pod1.OwnerReferences = nil
- informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
- jm.deletePod(pod1)
- if got, want := jm.queue.Len(), 0; got != want {
- t.Fatalf("queue.Len() = %v, want %v", got, want)
- }
- }
- type FakeJobExpectations struct {
- *controller.ControllerExpectations
- satisfied bool
- expSatisfied func()
- }
- func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
- fe.expSatisfied()
- return fe.satisfied
- }
- // TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods
- // and checking expectations.
- func TestSyncJobExpectations(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- manager.updateHandler = func(job *batch.Job) error { return nil }
- job := newJob(2, 2, 6)
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
- pods := newPodList(2, v1.PodPending, job)
- podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
- podIndexer.Add(&pods[0])
- manager.expectations = FakeJobExpectations{
- controller.NewControllerExpectations(), true, func() {
- // If we check active pods before checking expectations, the job
- // will create a new replica because it doesn't see this pod, but
- // has fulfilled its expectations.
- podIndexer.Add(&pods[1])
- },
- }
- manager.syncJob(testutil.GetKey(job, t))
- if len(fakePodControl.Templates) != 0 {
- t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
- }
- if len(fakePodControl.DeletePodName) != 0 {
- t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
- }
- }
- func TestWatchJobs(t *testing.T) {
- clientset := fake.NewSimpleClientset()
- fakeWatch := watch.NewFake()
- clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- var testJob batch.Job
- received := make(chan struct{})
- // The update sent through the fakeWatcher should make its way into the workqueue,
- // and eventually into the syncHandler.
- manager.syncHandler = func(key string) (bool, error) {
- defer close(received)
- ns, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- t.Errorf("Error getting namespace/name from key %v: %v", key, err)
- }
- job, err := manager.jobLister.Jobs(ns).Get(name)
- if err != nil || job == nil {
- t.Errorf("Expected to find job under key %v: %v", key, err)
- return true, nil
- }
- if !apiequality.Semantic.DeepDerivative(*job, testJob) {
- t.Errorf("Expected %#v, but got %#v", testJob, *job)
- }
- return true, nil
- }
- // Start only the job watcher and the workqueue, send a watch event,
- // and make sure it hits the sync method.
- stopCh := make(chan struct{})
- defer close(stopCh)
- sharedInformerFactory.Start(stopCh)
- go manager.Run(1, stopCh)
- // Send a new job and make sure it reaches the syncHandler.
- testJob.Namespace = "bar"
- testJob.Name = "foo"
- fakeWatch.Add(&testJob)
- t.Log("Waiting for job to reach syncHandler")
- <-received
- }
- func TestWatchPods(t *testing.T) {
- testJob := newJob(2, 2, 6)
- clientset := fake.NewSimpleClientset(testJob)
- fakeWatch := watch.NewFake()
- clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- // Put the job into the store; the pod arrives through the fake watch below.
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(testJob)
- received := make(chan struct{})
- // A pod update sent through the fakeWatcher should resolve its managing job
- // and send that job into the syncHandler.
- manager.syncHandler = func(key string) (bool, error) {
- ns, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- t.Errorf("Error getting namespace/name from key %v: %v", key, err)
- }
- job, err := manager.jobLister.Jobs(ns).Get(name)
- if err != nil {
- t.Errorf("Expected to find job under key %v: %v", key, err)
- }
- if !apiequality.Semantic.DeepDerivative(job, testJob) {
- t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
- close(received)
- return true, nil
- }
- close(received)
- return true, nil
- }
- // Start only the pod watcher and the workqueue, send a watch event,
- // and make sure it hits the sync method for the right job.
- stopCh := make(chan struct{})
- defer close(stopCh)
- go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh)
- go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
- pods := newPodList(1, v1.PodRunning, testJob)
- testPod := pods[0]
- testPod.Status.Phase = v1.PodFailed
- fakeWatch.Add(&testPod)
- t.Log("Waiting for pod to reach syncHandler")
- <-received
- }
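- // bumpResourceVersion increments the object's resource version to simulate an update.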
- func bumpResourceVersion(obj metav1.Object) {
- ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32)
- obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
- }
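- // pods describes how many pods of each phase to add to the pod indexer in one
- // step of TestJobBackoffReset.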
- type pods struct {
- pending int32
- active int32
- succeed int32
- failed int32
- }
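- // TestJobBackoffReset verifies that the rate-limited requeue count for a job is reset
- // once a subsequent sync no longer observes new failures.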
- func TestJobBackoffReset(t *testing.T) {
- testCases := map[string]struct {
- // job setup
- parallelism int32
- completions int32
- backoffLimit int32
- // pod setup - each row is additive!
- pods []pods
- }{
- "parallelism=1": {
- 1, 2, 1,
- []pods{
- {0, 1, 0, 1},
- {0, 0, 1, 0},
- },
- },
- "parallelism=2 (just failure)": {
- 2, 2, 1,
- []pods{
- {0, 2, 0, 1},
- {0, 0, 1, 0},
- },
- },
- }
- for name, tc := range testCases {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- var actual *batch.Job
- manager.updateHandler = func(job *batch.Job) error {
- actual = job
- return nil
- }
- // job & pods setup
- job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
- key := testutil.GetKey(job, t)
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
- podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
- setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed)
- manager.queue.Add(key)
- manager.processNextWorkItem()
- retries := manager.queue.NumRequeues(key)
- if retries != 1 {
- t.Errorf("%s: expected exactly 1 retry, got %d", name, retries)
- }
- job = actual
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion)
- setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed)
- manager.processNextWorkItem()
- retries = manager.queue.NumRequeues(key)
- if retries != 0 {
- t.Errorf("%s: expected exactly 0 retries, got %d", name, retries)
- }
- if getCondition(actual, batch.JobFailed, "BackoffLimitExceeded") {
- t.Errorf("%s: unexpected job failure", name)
- }
- }
- }
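- // fakeRateLimitingQueue stubs the work queue, recording the item and delay passed to
- // AddAfter so tests can assert on the computed backoff.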
- var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{}
- type fakeRateLimitingQueue struct {
- workqueue.Interface
- requeues int
- item interface{}
- duration time.Duration
- }
- func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {}
- func (f *fakeRateLimitingQueue) Forget(item interface{}) {
- f.requeues = 0
- }
- func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int {
- return f.requeues
- }
- func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) {
- f.item = item
- f.duration = duration
- }
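- // TestJobBackoff verifies that updatePod requeues the job with a delay proportional to
- // the number of previous requeues when a pod has failed, and with no delay otherwise.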
- func TestJobBackoff(t *testing.T) {
- job := newJob(1, 1, 1)
- oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
- oldPod.Status.Phase = v1.PodRunning
- oldPod.ResourceVersion = "1"
- newPod := oldPod.DeepCopy()
- newPod.ResourceVersion = "2"
- testCases := map[string]struct {
- // inputs
- requeues int
- phase v1.PodPhase
- // expectation
- backoff int
- }{
- "1st failure": {0, v1.PodFailed, 0},
- "2nd failure": {1, v1.PodFailed, 1},
- "3rd failure": {2, v1.PodFailed, 2},
- "1st success": {0, v1.PodSucceeded, 0},
- "2nd success": {1, v1.PodSucceeded, 0},
- "1st running": {0, v1.PodSucceeded, 0},
- "2nd running": {1, v1.PodSucceeded, 0},
- }
- for name, tc := range testCases {
- t.Run(name, func(t *testing.T) {
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- queue := &fakeRateLimitingQueue{}
- manager.queue = queue
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
- queue.requeues = tc.requeues
- newPod.Status.Phase = tc.phase
- manager.updatePod(oldPod, newPod)
- if queue.duration.Nanoseconds() != int64(tc.backoff)*DefaultJobBackOff.Nanoseconds() {
- t.Errorf("unexpected backoff %v", queue.duration)
- }
- })
- }
- }
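- // TestJobBackoffForOnFailure verifies that container restart counts are counted against
- // the backoff limit when the pod template's restart policy is OnFailure.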
- func TestJobBackoffForOnFailure(t *testing.T) {
- jobConditionFailed := batch.JobFailed
- testCases := map[string]struct {
- // job setup
- parallelism int32
- completions int32
- backoffLimit int32
- // pod setup
- jobKeyForget bool
- restartCounts []int32
- podPhase v1.PodPhase
- // expectations
- expectedActive int32
- expectedSucceeded int32
- expectedFailed int32
- expectedCondition *batch.JobConditionType
- expectedConditionReason string
- }{
- "backoffLimit 0 should have 1 pod active": {
- 1, 1, 0,
- true, []int32{0}, v1.PodRunning,
- 1, 0, 0, nil, "",
- },
- "backoffLimit 1 with restartCount 0 should have 1 pod active": {
- 1, 1, 1,
- true, []int32{0}, v1.PodRunning,
- 1, 0, 0, nil, "",
- },
- "backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
- 1, 1, 1,
- true, []int32{1}, v1.PodRunning,
- 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
- },
- "backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
- 1, 1, 1,
- true, []int32{1}, v1.PodPending,
- 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
- },
- "too many job failures with podRunning - single pod": {
- 1, 5, 2,
- true, []int32{2}, v1.PodRunning,
- 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
- },
- "too many job failures with podPending - single pod": {
- 1, 5, 2,
- true, []int32{2}, v1.PodPending,
- 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
- },
- "too many job failures with podRunning - multiple pods": {
- 2, 5, 2,
- true, []int32{1, 1}, v1.PodRunning,
- 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
- },
- "too many job failures with podPending - multiple pods": {
- 2, 5, 2,
- true, []int32{1, 1}, v1.PodPending,
- 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
- },
- "not enough failures": {
- 2, 5, 3,
- true, []int32{1, 1}, v1.PodRunning,
- 2, 0, 0, nil, "",
- },
- }
- for name, tc := range testCases {
- t.Run(name, func(t *testing.T) {
- // job manager setup
- clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
- fakePodControl := controller.FakePodControl{}
- manager.podControl = &fakePodControl
- manager.podStoreSynced = alwaysReady
- manager.jobStoreSynced = alwaysReady
- var actual *batch.Job
- manager.updateHandler = func(job *batch.Job) error {
- actual = job
- return nil
- }
- // job & pods setup
- job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
- job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
- sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
- podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
- for i, pod := range newPodList(int32(len(tc.restartCounts)), tc.podPhase, job) {
- pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}}
- podIndexer.Add(&pod)
- }
- // run
- forget, err := manager.syncJob(testutil.GetKey(job, t))
- if err != nil {
- t.Errorf("unexpected error syncing job. Got %#v", err)
- }
- if forget != tc.jobKeyForget {
- t.Errorf("unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget)
- }
- // validate status
- if actual.Status.Active != tc.expectedActive {
- t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
- }
- if actual.Status.Succeeded != tc.expectedSucceeded {
- t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
- }
- if actual.Status.Failed != tc.expectedFailed {
- t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
- }
- // validate conditions
- if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
- t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions)
- }
- })
- }
- }
|