job_controller_test.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package job

import (
	"fmt"
	"strconv"
	"testing"
	"time"

	batch "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	restclient "k8s.io/client-go/rest"
	core "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	_ "k8s.io/kubernetes/pkg/apis/core/install"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/testutil"
)

var alwaysReady = func() bool { return true }

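// newJob returns a Job fixture with the given parallelism, completions, and
// backoff limit, selecting pods labeled "foo": "bar".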
func newJob(parallelism, completions, backoffLimit int32) *batch.Job {
	j := &batch.Job{
		TypeMeta: metav1.TypeMeta{Kind: "Job"},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foobar",
			UID:       uuid.NewUUID(),
			Namespace: metav1.NamespaceDefault,
		},
		Spec: batch.JobSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"foo": "bar"},
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"foo": "bar",
					},
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{Image: "foo/bar"},
					},
				},
			},
		},
	}
	// Special case: -1 for either completions or parallelism means leave nil
	// (negative values are not allowed in practice by validation).
	if completions >= 0 {
		j.Spec.Completions = &completions
	} else {
		j.Spec.Completions = nil
	}
	if parallelism >= 0 {
		j.Spec.Parallelism = &parallelism
	} else {
		j.Spec.Parallelism = nil
	}
	j.Spec.BackoffLimit = &backoffLimit
	return j
}

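// newJobControllerFromClient builds a JobController backed by shared informers
// for the given client, with pod control replaced by a fake.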
func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) {
	sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
	jm := NewJobController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient)
	jm.podControl = &controller.FakePodControl{}
	return jm, sharedInformers
}

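// newPod returns a pod that matches the given job's selector and carries a
// ControllerRef pointing at it.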
func newPod(name string, job *batch.Job) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:            name,
			Labels:          job.Spec.Selector.MatchLabels,
			Namespace:       job.Namespace,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)},
		},
	}
}

// newPodList creates count pods with the given phase for the given job.
func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod {
	pods := []v1.Pod{}
	for i := int32(0); i < count; i++ {
		newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
		newPod.Status = v1.PodStatus{Phase: status}
		pods = append(pods, *newPod)
	}
	return pods
}

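// setPodsStatuses seeds the pod indexer with the requested number of pods in
// each of the four phases, all belonging to the given job.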
func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods int32) {
	for _, pod := range newPodList(pendingPods, v1.PodPending, job) {
		pod := pod // copy so &pod does not alias the loop variable across iterations
		podIndexer.Add(&pod)
	}
	for _, pod := range newPodList(activePods, v1.PodRunning, job) {
		pod := pod
		podIndexer.Add(&pod)
	}
	for _, pod := range newPodList(succeededPods, v1.PodSucceeded, job) {
		pod := pod
		podIndexer.Add(&pod)
	}
	for _, pod := range newPodList(failedPods, v1.PodFailed, job) {
		pod := pod
		podIndexer.Add(&pod)
	}
}

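// TestControllerSyncJob drives syncJob through a table of job/pod states and
// verifies pod creations and deletions, ControllerRefs, status counts,
// conditions, and slow-start behavior.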
func TestControllerSyncJob(t *testing.T) {
	jobConditionComplete := batch.JobComplete
	jobConditionFailed := batch.JobFailed

	testCases := map[string]struct {
		// job setup
		parallelism  int32
		completions  int32
		backoffLimit int32
		deleting     bool
		podLimit     int

		// pod setup
		podControllerError error
		jobKeyForget       bool
		pendingPods        int32
		activePods         int32
		succeededPods      int32
		failedPods         int32

		// expectations
		expectedCreations       int32
		expectedDeletions       int32
		expectedActive          int32
		expectedSucceeded       int32
		expectedFailed          int32
		expectedCondition       *batch.JobConditionType
		expectedConditionReason string
	}{
		"job start": {
			2, 5, 6, false, 0,
			nil, true, 0, 0, 0, 0,
			2, 0, 2, 0, 0, nil, "",
		},
		"WQ job start": {
			2, -1, 6, false, 0,
			nil, true, 0, 0, 0, 0,
			2, 0, 2, 0, 0, nil, "",
		},
		"pending pods": {
			2, 5, 6, false, 0,
			nil, true, 2, 0, 0, 0,
			0, 0, 2, 0, 0, nil, "",
		},
		"correct # of pods": {
			2, 5, 6, false, 0,
			nil, true, 0, 2, 0, 0,
			0, 0, 2, 0, 0, nil, "",
		},
		"WQ job: correct # of pods": {
			2, -1, 6, false, 0,
			nil, true, 0, 2, 0, 0,
			0, 0, 2, 0, 0, nil, "",
		},
		"too few active pods": {
			2, 5, 6, false, 0,
			nil, true, 0, 1, 1, 0,
			1, 0, 2, 1, 0, nil, "",
		},
		"too few active pods with a dynamic job": {
			2, -1, 6, false, 0,
			nil, true, 0, 1, 0, 0,
			1, 0, 2, 0, 0, nil, "",
		},
		"too few active pods, with controller error": {
			2, 5, 6, false, 0,
			fmt.Errorf("Fake error"), true, 0, 1, 1, 0,
			1, 0, 1, 1, 0, nil, "",
		},
		"too many active pods": {
			2, 5, 6, false, 0,
			nil, true, 0, 3, 0, 0,
			0, 1, 2, 0, 0, nil, "",
		},
		"too many active pods, with controller error": {
			2, 5, 6, false, 0,
			fmt.Errorf("Fake error"), true, 0, 3, 0, 0,
			0, 1, 3, 0, 0, nil, "",
		},
		"failed + succeed pods: reset backoff delay": {
			2, 5, 6, false, 0,
			fmt.Errorf("Fake error"), true, 0, 1, 1, 1,
			1, 0, 1, 1, 1, nil, "",
		},
		"only new failed pod": {
			2, 5, 6, false, 0,
			fmt.Errorf("Fake error"), false, 0, 1, 0, 1,
			1, 0, 1, 0, 1, nil, "",
		},
		"job finish": {
			2, 5, 6, false, 0,
			nil, true, 0, 0, 5, 0,
			0, 0, 0, 5, 0, nil, "",
		},
		"WQ job finishing": {
			2, -1, 6, false, 0,
			nil, true, 0, 1, 1, 0,
			0, 0, 1, 1, 0, nil, "",
		},
		"WQ job all finished": {
			2, -1, 6, false, 0,
			nil, true, 0, 0, 2, 0,
			0, 0, 0, 2, 0, &jobConditionComplete, "",
		},
		"WQ job all finished despite one failure": {
			2, -1, 6, false, 0,
			nil, true, 0, 0, 1, 1,
			0, 0, 0, 1, 1, &jobConditionComplete, "",
		},
		"more active pods than completions": {
			2, 5, 6, false, 0,
			nil, true, 0, 10, 0, 0,
			0, 8, 2, 0, 0, nil, "",
		},
		"status change": {
			2, 5, 6, false, 0,
			nil, true, 0, 2, 2, 0,
			0, 0, 2, 2, 0, nil, "",
		},
		"deleting job": {
			2, 5, 6, true, 0,
			nil, true, 1, 1, 1, 0,
			0, 0, 2, 1, 0, nil, "",
		},
		"limited pods": {
			100, 200, 6, false, 10,
			nil, true, 0, 0, 0, 0,
			10, 0, 10, 0, 0, nil, "",
		},
		"too many job failures": {
			2, 5, 0, true, 0,
			nil, true, 0, 0, 0, 1,
			0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
		},
	}

	for name, tc := range testCases {
		// job manager setup
		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
		manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
		fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
		manager.podControl = &fakePodControl
		manager.podStoreSynced = alwaysReady
		manager.jobStoreSynced = alwaysReady
		var actual *batch.Job
		manager.updateHandler = func(job *batch.Job) error {
			actual = job
			return nil
		}

		// job & pods setup
		job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
		if tc.deleting {
			now := metav1.Now()
			job.DeletionTimestamp = &now
		}
		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
		podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
		setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods)

		// run
		forget, err := manager.syncJob(testutil.GetKey(job, t))

		// The syncJob task must be requeued when podControl reports an error.
		if tc.podControllerError != nil {
			if err == nil {
				t.Errorf("%s: expected an error syncing job when podControl fails", name)
			}
		} else {
			if err != nil && (tc.podLimit == 0 || fakePodControl.CreateCallCount < tc.podLimit) {
				t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
			}
		}
		if forget != tc.jobKeyForget {
			t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.jobKeyForget, forget)
		}
		// validate created/deleted pods
		if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
			t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.Templates))
		}
		if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
			t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName))
		}
		// Each create should have an accompanying ControllerRef.
		if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) {
			t.Errorf("%s: unexpected number of ControllerRefs. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.ControllerRefs))
		}
		// Make sure the ControllerRefs are correct.
		for _, controllerRef := range fakePodControl.ControllerRefs {
			if got, want := controllerRef.APIVersion, "batch/v1"; got != want {
				t.Errorf("controllerRef.APIVersion = %q, want %q", got, want)
			}
			if got, want := controllerRef.Kind, "Job"; got != want {
				t.Errorf("controllerRef.Kind = %q, want %q", got, want)
			}
			if got, want := controllerRef.Name, job.Name; got != want {
				t.Errorf("controllerRef.Name = %q, want %q", got, want)
			}
			if got, want := controllerRef.UID, job.UID; got != want {
				t.Errorf("controllerRef.UID = %q, want %q", got, want)
			}
			if controllerRef.Controller == nil || *controllerRef.Controller != true {
				t.Errorf("controllerRef.Controller is not set to true")
			}
		}
		// validate status
		if actual.Status.Active != tc.expectedActive {
			t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active)
		}
		if actual.Status.Succeeded != tc.expectedSucceeded {
			t.Errorf("%s: unexpected number of succeeded pods. Expected %d, saw %d\n", name, tc.expectedSucceeded, actual.Status.Succeeded)
		}
		if actual.Status.Failed != tc.expectedFailed {
			t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailed, actual.Status.Failed)
		}
		if actual.Status.StartTime == nil {
			t.Errorf("%s: .status.startTime was not set", name)
		}
		// validate conditions
		if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
			t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
		}
		// validate slow start
		expectedLimit := 0
		for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ {
			expectedLimit += controller.SlowStartInitialBatchSize << pass
		}
		if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit {
			t.Errorf("%s: unexpected number of create calls. Expected <= %d, saw %d\n", name, expectedLimit, fakePodControl.CreateCallCount)
		}
	}
}

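// TestSyncJobPastDeadline verifies that jobs exceeding activeDeadlineSeconds
// (or their backoff limit) are failed with the expected condition reason.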
func TestSyncJobPastDeadline(t *testing.T) {
	testCases := map[string]struct {
		// job setup
		parallelism           int32
		completions           int32
		activeDeadlineSeconds int64
		startTime             int64
		backoffLimit          int32

		// pod setup
		activePods    int32
		succeededPods int32
		failedPods    int32

		// expectations
		expectedForGetKey       bool
		expectedDeletions       int32
		expectedActive          int32
		expectedSucceeded       int32
		expectedFailed          int32
		expectedConditionReason string
	}{
		"activeDeadlineSeconds less than single pod execution": {
			1, 1, 10, 15, 6,
			1, 0, 0,
			true, 1, 0, 0, 1, "DeadlineExceeded",
		},
		"activeDeadlineSeconds bigger than single pod execution": {
			1, 2, 10, 15, 6,
			1, 1, 0,
			true, 1, 0, 1, 1, "DeadlineExceeded",
		},
		"activeDeadlineSeconds times-out before any pod starts": {
			1, 1, 10, 10, 6,
			0, 0, 0,
			true, 0, 0, 0, 0, "DeadlineExceeded",
		},
		"activeDeadlineSeconds with backoffLimit reached": {
			1, 1, 1, 10, 0,
			0, 0, 1,
			true, 0, 0, 0, 1, "BackoffLimitExceeded",
		},
	}

	for name, tc := range testCases {
		// job manager setup
		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
		manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
		fakePodControl := controller.FakePodControl{}
		manager.podControl = &fakePodControl
		manager.podStoreSynced = alwaysReady
		manager.jobStoreSynced = alwaysReady
		var actual *batch.Job
		manager.updateHandler = func(job *batch.Job) error {
			actual = job
			return nil
		}

		// job & pods setup
		job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
		job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
		start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0)
		job.Status.StartTime = &start
		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
		podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
		setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods)

		// run
		forget, err := manager.syncJob(testutil.GetKey(job, t))
		if err != nil {
			t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
		}
		if forget != tc.expectedForGetKey {
			t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.expectedForGetKey, forget)
		}
		// validate created/deleted pods
		if int32(len(fakePodControl.Templates)) != 0 {
			t.Errorf("%s: unexpected number of creates. Expected 0, saw %d\n", name, len(fakePodControl.Templates))
		}
		if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
			t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName))
		}
		// validate status
		if actual.Status.Active != tc.expectedActive {
			t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active)
		}
		if actual.Status.Succeeded != tc.expectedSucceeded {
			t.Errorf("%s: unexpected number of succeeded pods. Expected %d, saw %d\n", name, tc.expectedSucceeded, actual.Status.Succeeded)
		}
		if actual.Status.Failed != tc.expectedFailed {
			t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailed, actual.Status.Failed)
		}
		if actual.Status.StartTime == nil {
			t.Errorf("%s: .status.startTime was not set", name)
		}
		// validate conditions
		if !getCondition(actual, batch.JobFailed, tc.expectedConditionReason) {
			t.Errorf("%s: expected fail condition. Got %#v", name, actual.Status.Conditions)
		}
	}
}

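// getCondition reports whether the job has a true condition of the given type
// and reason.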
func getCondition(job *batch.Job, condition batch.JobConditionType, reason string) bool {
	for _, v := range job.Status.Conditions {
		if v.Type == condition && v.Status == v1.ConditionTrue && v.Reason == reason {
			return true
		}
	}
	return false
}

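// TestSyncPastDeadlineJobFinished verifies that a job already marked failed
// for exceeding its deadline is not modified again on resync.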
func TestSyncPastDeadlineJobFinished(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	fakePodControl := controller.FakePodControl{}
	manager.podControl = &fakePodControl
	manager.podStoreSynced = alwaysReady
	manager.jobStoreSynced = alwaysReady
	var actual *batch.Job
	manager.updateHandler = func(job *batch.Job) error {
		actual = job
		return nil
	}

	job := newJob(1, 1, 6)
	activeDeadlineSeconds := int64(10)
	job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
	start := metav1.Unix(metav1.Now().Time.Unix()-15, 0)
	job.Status.StartTime = &start
	job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)

	forget, err := manager.syncJob(testutil.GetKey(job, t))
	if err != nil {
		t.Errorf("Unexpected error when syncing jobs %v", err)
	}
	if !forget {
		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
	}
	if len(fakePodControl.Templates) != 0 {
		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
	}
	if len(fakePodControl.DeletePodName) != 0 {
		t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
	}
	if actual != nil {
		t.Error("Unexpected job modification")
	}
}

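// TestSyncJobComplete verifies that syncing an already-complete job leaves its
// conditions untouched.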
func TestSyncJobComplete(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	fakePodControl := controller.FakePodControl{}
	manager.podControl = &fakePodControl
	manager.podStoreSynced = alwaysReady
	manager.jobStoreSynced = alwaysReady

	job := newJob(1, 1, 6)
	job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)

	forget, err := manager.syncJob(testutil.GetKey(job, t))
	if err != nil {
		t.Fatalf("Unexpected error when syncing jobs %v", err)
	}
	if !forget {
		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
	}
	actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
	if err != nil {
		t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
	}
	// Verify that after syncing a complete job, the conditions are the same.
	if got, expected := len(actual.Status.Conditions), 1; got != expected {
		t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got)
	}
}

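// TestSyncJobDeleted verifies that syncing a job that is absent from the store
// neither creates nor deletes pods.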
func TestSyncJobDeleted(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	manager, _ := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	fakePodControl := controller.FakePodControl{}
	manager.podControl = &fakePodControl
	manager.podStoreSynced = alwaysReady
	manager.jobStoreSynced = alwaysReady
	manager.updateHandler = func(job *batch.Job) error { return nil }

	// The job is deliberately never added to the informer store, simulating a
	// job that has been deleted by the time its key is synced.
	job := newJob(2, 2, 6)
	forget, err := manager.syncJob(testutil.GetKey(job, t))
	if err != nil {
		t.Errorf("Unexpected error when syncing jobs %v", err)
	}
	if !forget {
		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
	}
	if len(fakePodControl.Templates) != 0 {
		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
	}
	if len(fakePodControl.DeletePodName) != 0 {
		t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
	}
}

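// TestSyncJobUpdateRequeue verifies that a failed status update surfaces the
// error and requeues the job key.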
func TestSyncJobUpdateRequeue(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	fakePodControl := controller.FakePodControl{}
	manager.podControl = &fakePodControl
	manager.podStoreSynced = alwaysReady
	manager.jobStoreSynced = alwaysReady
	updateError := fmt.Errorf("Update error")
	manager.updateHandler = func(job *batch.Job) error {
		manager.queue.AddRateLimited(testutil.GetKey(job, t))
		return updateError
	}

	job := newJob(2, 2, 6)
	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)

	forget, err := manager.syncJob(testutil.GetKey(job, t))
	if err == nil || err != updateError {
		t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
	}
	if forget != false {
		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
	}
	t.Log("Waiting for a job in the queue")
	key, _ := manager.queue.Get()
	expectedKey := testutil.GetKey(job, t)
	if key != expectedKey {
		t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key)
	}
}

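// TestJobPodLookup verifies that getPodJobs matches pods to jobs by namespace
// and label selector.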
func TestJobPodLookup(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	manager.podStoreSynced = alwaysReady
	manager.jobStoreSynced = alwaysReady
	testCases := []struct {
		job          *batch.Job
		pod          *v1.Pod
		expectedName string
	}{
		// pods without labels don't match any job
		{
			job: &batch.Job{
				ObjectMeta: metav1.ObjectMeta{Name: "basic"},
			},
			pod: &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: metav1.NamespaceAll},
			},
			expectedName: "",
		},
		// matching labels, different namespace
		{
			job: &batch.Job{
				ObjectMeta: metav1.ObjectMeta{Name: "foo"},
				Spec: batch.JobSpec{
					Selector: &metav1.LabelSelector{
						MatchLabels: map[string]string{"foo": "bar"},
					},
				},
			},
			pod: &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "foo2",
					Namespace: "ns",
					Labels:    map[string]string{"foo": "bar"},
				},
			},
			expectedName: "",
		},
		// matching namespace and labels returns the job
		{
			job: &batch.Job{
				ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "ns"},
				Spec: batch.JobSpec{
					Selector: &metav1.LabelSelector{
						MatchExpressions: []metav1.LabelSelectorRequirement{
							{
								Key:      "foo",
								Operator: metav1.LabelSelectorOpIn,
								Values:   []string{"bar"},
							},
						},
					},
				},
			},
			pod: &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "foo3",
					Namespace: "ns",
					Labels:    map[string]string{"foo": "bar"},
				},
			},
			expectedName: "bar",
		},
	}
	for _, tc := range testCases {
		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job)
		if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 {
			if got, want := len(jobs), 1; got != want {
				t.Errorf("len(jobs) = %v, want %v", got, want)
			}
			job := jobs[0]
			if tc.expectedName != job.Name {
				t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName)
			}
		} else if tc.expectedName != "" {
			t.Errorf("Expected a job %v pod %v, found none", tc.expectedName, tc.pod.Name)
		}
	}
}

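// TestGetPodsForJob verifies that each job gets back only the pods it owns and
// that match its selector.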
func TestGetPodsForJob(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job2 := newJob(1, 1, 6)
	job2.Name = "job2"
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)

	pod1 := newPod("pod1", job1)
	pod2 := newPod("pod2", job2)
	pod3 := newPod("pod3", job1)
	// Make pod3 an orphan that doesn't match. It should be ignored.
	pod3.OwnerReferences = nil
	pod3.Labels = nil
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod3)

	pods, err := jm.getPodsForJob(job1)
	if err != nil {
		t.Fatalf("getPodsForJob() error: %v", err)
	}
	if got, want := len(pods), 1; got != want {
		t.Errorf("len(pods) = %v, want %v", got, want)
	}
	if got, want := pods[0].Name, "pod1"; got != want {
		t.Errorf("pod.Name = %v, want %v", got, want)
	}

	pods, err = jm.getPodsForJob(job2)
	if err != nil {
		t.Fatalf("getPodsForJob() error: %v", err)
	}
	if got, want := len(pods), 1; got != want {
		t.Errorf("len(pods) = %v, want %v", got, want)
	}
	if got, want := pods[0].Name, "pod2"; got != want {
		t.Errorf("pod.Name = %v, want %v", got, want)
	}
}

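// TestGetPodsForJobAdopt verifies that a matching orphan pod is adopted and
// returned along with the owned pods.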
func TestGetPodsForJobAdopt(t *testing.T) {
	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	clientset := fake.NewSimpleClientset(job1)
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)

	pod1 := newPod("pod1", job1)
	pod2 := newPod("pod2", job1)
	// Make this pod an orphan. It should still be returned because it's adopted.
	pod2.OwnerReferences = nil
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)

	pods, err := jm.getPodsForJob(job1)
	if err != nil {
		t.Fatalf("getPodsForJob() error: %v", err)
	}
	if got, want := len(pods), 2; got != want {
		t.Errorf("len(pods) = %v, want %v", got, want)
	}
}

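// TestGetPodsForJobNoAdoptIfBeingDeleted verifies that orphan pods are not
// adopted while the job has a deletion timestamp.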
func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job1.DeletionTimestamp = &metav1.Time{}
	clientset := fake.NewSimpleClientset(job1)
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)

	pod1 := newPod("pod1", job1)
	pod2 := newPod("pod2", job1)
	// Make this pod an orphan. It should not be adopted because the Job is being deleted.
	pod2.OwnerReferences = nil
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)

	pods, err := jm.getPodsForJob(job1)
	if err != nil {
		t.Fatalf("getPodsForJob() error: %v", err)
	}
	if got, want := len(pods), 1; got != want {
		t.Errorf("len(pods) = %v, want %v", got, want)
	}
	if got, want := pods[0].Name, pod1.Name; got != want {
		t.Errorf("pod.Name = %q, want %q", got, want)
	}
}

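// TestGetPodsForJobNoAdoptIfBeingDeletedRace verifies that adoption is refused
// when the live object (but not the cached one) carries a deletion timestamp.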
func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) {
	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	// The up-to-date object says it's being deleted.
	job1.DeletionTimestamp = &metav1.Time{}
	clientset := fake.NewSimpleClientset(job1)
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	// The cache says it's NOT being deleted.
	cachedJob := *job1
	cachedJob.DeletionTimestamp = nil
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(&cachedJob)

	pod1 := newPod("pod1", job1)
	pod2 := newPod("pod2", job1)
	// Make this pod an orphan. It should not be adopted because the Job is being deleted.
	pod2.OwnerReferences = nil
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)

	pods, err := jm.getPodsForJob(job1)
	if err != nil {
		t.Fatalf("getPodsForJob() error: %v", err)
	}
	if got, want := len(pods), 1; got != want {
		t.Errorf("len(pods) = %v, want %v", got, want)
	}
	if got, want := pods[0].Name, pod1.Name; got != want {
		t.Errorf("pod.Name = %q, want %q", got, want)
	}
}

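// TestGetPodsForJobRelease verifies that an owned pod whose labels no longer
// match the selector is released.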
func TestGetPodsForJobRelease(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)

	pod1 := newPod("pod1", job1)
	pod2 := newPod("pod2", job1)
	// Make this pod not match, even though it's owned. It should be released.
	pod2.Labels = nil
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)

	pods, err := jm.getPodsForJob(job1)
	if err != nil {
		t.Fatalf("getPodsForJob() error: %v", err)
	}
	if got, want := len(pods), 1; got != want {
		t.Errorf("len(pods) = %v, want %v", got, want)
	}
	if got, want := pods[0].Name, "pod1"; got != want {
		t.Errorf("pod.Name = %v, want %v", got, want)
	}
}

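// TestAddPod verifies that adding a pod enqueues its controlling job.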
func TestAddPod(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job2 := newJob(1, 1, 6)
	job2.Name = "job2"
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)

	pod1 := newPod("pod1", job1)
	pod2 := newPod("pod2", job2)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)

	jm.addPod(pod1)
	if got, want := jm.queue.Len(), 1; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
	key, done := jm.queue.Get()
	if key == nil || done {
		t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
	}
	expectedKey, _ := controller.KeyFunc(job1)
	if got, want := key.(string), expectedKey; got != want {
		t.Errorf("queue.Get() = %v, want %v", got, want)
	}

	jm.addPod(pod2)
	if got, want := jm.queue.Len(), 1; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
	key, done = jm.queue.Get()
	if key == nil || done {
		t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
	}
	expectedKey, _ = controller.KeyFunc(job2)
	if got, want := key.(string), expectedKey; got != want {
		t.Errorf("queue.Get() = %v, want %v", got, want)
	}
}

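// TestAddPodOrphan verifies that adding an orphan pod enqueues every job whose
// selector matches it.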
func TestAddPodOrphan(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job2 := newJob(1, 1, 6)
	job2.Name = "job2"
	job3 := newJob(1, 1, 6)
	job3.Name = "job3"
	job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)

	pod1 := newPod("pod1", job1)
	// Make pod an orphan. Expect all matching controllers to be queued.
	pod1.OwnerReferences = nil
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)

	jm.addPod(pod1)
	if got, want := jm.queue.Len(), 2; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
}

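// TestUpdatePod verifies that a pod update enqueues the controlling job.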
func TestUpdatePod(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job2 := newJob(1, 1, 6)
	job2.Name = "job2"
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)

	pod1 := newPod("pod1", job1)
	pod2 := newPod("pod2", job2)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)

	prev := *pod1
	bumpResourceVersion(pod1)
	jm.updatePod(&prev, pod1)
	if got, want := jm.queue.Len(), 1; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
	key, done := jm.queue.Get()
	if key == nil || done {
		t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
	}
	expectedKey, _ := controller.KeyFunc(job1)
	if got, want := key.(string), expectedKey; got != want {
		t.Errorf("queue.Get() = %v, want %v", got, want)
	}

	prev = *pod2
	bumpResourceVersion(pod2)
	jm.updatePod(&prev, pod2)
	if got, want := jm.queue.Len(), 1; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
	key, done = jm.queue.Get()
	if key == nil || done {
		t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
	}
	expectedKey, _ = controller.KeyFunc(job2)
	if got, want := key.(string), expectedKey; got != want {
		t.Errorf("queue.Get() = %v, want %v", got, want)
	}
}

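// TestUpdatePodOrphanWithNewLabels verifies that relabeling an orphan pod
// enqueues all newly matching jobs.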
func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job2 := newJob(1, 1, 6)
	job2.Name = "job2"
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)

	pod1 := newPod("pod1", job1)
	pod1.OwnerReferences = nil
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)

	// Labels changed on orphan. Expect newly matching controllers to queue.
	prev := *pod1
	prev.Labels = map[string]string{"foo2": "bar2"}
	bumpResourceVersion(pod1)
	jm.updatePod(&prev, pod1)
	if got, want := jm.queue.Len(), 2; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
}

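// TestUpdatePodChangeControllerRef verifies that changing a pod's
// ControllerRef enqueues both the old and the new job.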
func TestUpdatePodChangeControllerRef(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job2 := newJob(1, 1, 6)
	job2.Name = "job2"
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)

	pod1 := newPod("pod1", job1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)

	// Changed ControllerRef. Expect both old and new to queue.
	prev := *pod1
	prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(job2, controllerKind)}
	bumpResourceVersion(pod1)
	jm.updatePod(&prev, pod1)
	if got, want := jm.queue.Len(), 2; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
}

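// TestUpdatePodRelease verifies that removing a pod's ControllerRef enqueues
// all matching jobs as adoption candidates.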
func TestUpdatePodRelease(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job2 := newJob(1, 1, 6)
	job2.Name = "job2"
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)

	pod1 := newPod("pod1", job1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)

	// Remove ControllerRef. Expect all matching to queue for adoption.
	prev := *pod1
	pod1.OwnerReferences = nil
	bumpResourceVersion(pod1)
	jm.updatePod(&prev, pod1)
	if got, want := jm.queue.Len(), 2; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
}

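// TestDeletePod verifies that deleting a pod enqueues its controlling job.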
func TestDeletePod(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job2 := newJob(1, 1, 6)
	job2.Name = "job2"
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)

	pod1 := newPod("pod1", job1)
	pod2 := newPod("pod2", job2)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)

	jm.deletePod(pod1)
	if got, want := jm.queue.Len(), 1; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
	key, done := jm.queue.Get()
	if key == nil || done {
		t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
	}
	expectedKey, _ := controller.KeyFunc(job1)
	if got, want := key.(string), expectedKey; got != want {
		t.Errorf("queue.Get() = %v, want %v", got, want)
	}

	jm.deletePod(pod2)
	if got, want := jm.queue.Len(), 1; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
	key, done = jm.queue.Get()
	if key == nil || done {
		t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
	}
	expectedKey, _ = controller.KeyFunc(job2)
	if got, want := key.(string), expectedKey; got != want {
		t.Errorf("queue.Get() = %v, want %v", got, want)
	}
}

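// TestDeletePodOrphan verifies that deleting an orphan pod enqueues nothing.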
func TestDeletePodOrphan(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	jm.podStoreSynced = alwaysReady
	jm.jobStoreSynced = alwaysReady

	job1 := newJob(1, 1, 6)
	job1.Name = "job1"
	job2 := newJob(1, 1, 6)
	job2.Name = "job2"
	job3 := newJob(1, 1, 6)
	job3.Name = "job3"
	job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)

	pod1 := newPod("pod1", job1)
	pod1.OwnerReferences = nil
	informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)

	jm.deletePod(pod1)
	if got, want := jm.queue.Len(), 0; got != want {
		t.Fatalf("queue.Len() = %v, want %v", got, want)
	}
}

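// FakeJobExpectations wraps ControllerExpectations and invokes a hook whenever
// expectations are checked, so tests can race pods in at that exact moment.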
type FakeJobExpectations struct {
	*controller.ControllerExpectations
	satisfied    bool
	expSatisfied func()
}

func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
	fe.expSatisfied()
	return fe.satisfied
}

// TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods
// and checking expectations.
func TestSyncJobExpectations(t *testing.T) {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	fakePodControl := controller.FakePodControl{}
	manager.podControl = &fakePodControl
	manager.podStoreSynced = alwaysReady
	manager.jobStoreSynced = alwaysReady
	manager.updateHandler = func(job *batch.Job) error { return nil }

	job := newJob(2, 2, 6)
	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
	pods := newPodList(2, v1.PodPending, job)
	podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
	podIndexer.Add(&pods[0])

	manager.expectations = FakeJobExpectations{
		controller.NewControllerExpectations(), true, func() {
			// If we check active pods before checking expectations, the job
			// will create a new replica because it doesn't see this pod, but
			// has fulfilled its expectations.
			podIndexer.Add(&pods[1])
		},
	}
	manager.syncJob(testutil.GetKey(job, t))
	if len(fakePodControl.Templates) != 0 {
		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
	}
	if len(fakePodControl.DeletePodName) != 0 {
		t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
	}
}

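// TestWatchJobs verifies that a job sent through a fake watcher reaches the
// syncHandler.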
func TestWatchJobs(t *testing.T) {
	clientset := fake.NewSimpleClientset()
	fakeWatch := watch.NewFake()
	clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	manager.podStoreSynced = alwaysReady
	manager.jobStoreSynced = alwaysReady

	var testJob batch.Job
	received := make(chan struct{})

	// The update sent through the fakeWatcher should make its way into the workqueue,
	// and eventually into the syncHandler.
	manager.syncHandler = func(key string) (bool, error) {
		defer close(received)
		ns, name, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			t.Errorf("Error getting namespace/name from key %v: %v", key, err)
		}
		job, err := manager.jobLister.Jobs(ns).Get(name)
		if err != nil || job == nil {
			t.Errorf("Expected to find job under key %v: %v", key, err)
			return true, nil
		}
		if !apiequality.Semantic.DeepDerivative(*job, testJob) {
			t.Errorf("Expected %#v, but got %#v", testJob, *job)
		}
		return true, nil
	}
	// Start only the job watcher and the workqueue, send a watch event,
	// and make sure it hits the sync method.
	stopCh := make(chan struct{})
	defer close(stopCh)
	sharedInformerFactory.Start(stopCh)
	go manager.Run(1, stopCh)

	// We're sending a new job to see if it reaches the syncHandler.
	testJob.Namespace = "bar"
	testJob.Name = "foo"
	fakeWatch.Add(&testJob)
	t.Log("Waiting for job to reach syncHandler")
	<-received
}

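// TestWatchPods verifies that a pod event sent through a fake watcher triggers
// a sync of the managing job.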
func TestWatchPods(t *testing.T) {
	testJob := newJob(2, 2, 6)
	clientset := fake.NewSimpleClientset(testJob)
	fakeWatch := watch.NewFake()
	clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	manager.podStoreSynced = alwaysReady
	manager.jobStoreSynced = alwaysReady

	// Put one job and one pod into the store
	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(testJob)
	received := make(chan struct{})
	// The pod update sent through the fakeWatcher should figure out the managing job and
	// send it into the syncHandler.
	manager.syncHandler = func(key string) (bool, error) {
		ns, name, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			t.Errorf("Error getting namespace/name from key %v: %v", key, err)
		}
		job, err := manager.jobLister.Jobs(ns).Get(name)
		if err != nil {
			t.Errorf("Expected to find job under key %v: %v", key, err)
		}
		if !apiequality.Semantic.DeepDerivative(job, testJob) {
			t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
			close(received)
			return true, nil
		}
		close(received)
		return true, nil
	}
	// Start only the pod watcher and the workqueue, send a watch event,
	// and make sure it hits the sync method for the right job.
	stopCh := make(chan struct{})
	defer close(stopCh)
	go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh)
	go wait.Until(manager.worker, 10*time.Millisecond, stopCh)

	pods := newPodList(1, v1.PodRunning, testJob)
	testPod := pods[0]
	testPod.Status.Phase = v1.PodFailed
	fakeWatch.Add(&testPod)

	t.Log("Waiting for pod to reach syncHandler")
	<-received
}

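// bumpResourceVersion increments an object's resource version to simulate an
// update coming from the API server.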
func bumpResourceVersion(obj metav1.Object) {
	ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32)
	obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
}

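// pods holds per-phase pod counts for one step of a backoff test.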
type pods struct {
	pending int32
	active  int32
	succeed int32
	failed  int32
}

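// TestJobBackoffReset verifies that the rate-limiter requeue count is reset
// once a previously failing job syncs cleanly.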

func TestJobBackoffReset(t *testing.T) {
	testCases := map[string]struct {
		// job setup
		parallelism  int32
		completions  int32
		backoffLimit int32

		// pod setup - each row is additive!
		pods []pods
	}{
		"parallelism=1": {
			1, 2, 1,
			[]pods{
				{0, 1, 0, 1},
				{0, 0, 1, 0},
			},
		},
		"parallelism=2 (just failure)": {
			2, 2, 1,
			[]pods{
				{0, 2, 0, 1},
				{0, 0, 1, 0},
			},
		},
	}

	for name, tc := range testCases {
		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
		DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
		manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
		fakePodControl := controller.FakePodControl{}
		manager.podControl = &fakePodControl
		manager.podStoreSynced = alwaysReady
		manager.jobStoreSynced = alwaysReady
		var actual *batch.Job
		manager.updateHandler = func(job *batch.Job) error {
			actual = job
			return nil
		}

		// job & pods setup
		job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
		key := testutil.GetKey(job, t)
		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
		podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()

		setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed)
		manager.queue.Add(key)
		manager.processNextWorkItem()
		retries := manager.queue.NumRequeues(key)
		if retries != 1 {
			t.Errorf("%s: expected exactly 1 retry, got %d", name, retries)
		}

		job = actual
		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion)
		setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed)
		manager.processNextWorkItem()
		retries = manager.queue.NumRequeues(key)
		if retries != 0 {
			t.Errorf("%s: expected exactly 0 retries, got %d", name, retries)
		}
		if getCondition(actual, batch.JobFailed, "BackoffLimitExceeded") {
			t.Errorf("%s: unexpected job failure", name)
		}
	}
}
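
// A sync that returns (true, nil) causes processNextWorkItem to call Forget
// on the key, which is why NumRequeues drops back to 0 on the second pass
// above. The fake queue below makes that interaction directly observable.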

var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{}

// fakeRateLimitingQueue records the controller's rate-limiter calls so tests
// can assert on the requeue count and on the delay passed to AddAfter.
type fakeRateLimitingQueue struct {
	workqueue.Interface
	requeues int
	item     interface{}
	duration time.Duration
}

func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {}
func (f *fakeRateLimitingQueue) Forget(item interface{}) {
	f.requeues = 0
}
func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int {
	return f.requeues
}
func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) {
	f.item = item
	f.duration = duration
}
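
// Illustrative sketch (not part of the original suite): the fake queue only
// records what the controller asked of it, so a test can assert on the exact
// item and delay handed to AddAfter, and on Forget resetting the requeue count.
func TestFakeRateLimitingQueueSketch(t *testing.T) {
	q := &fakeRateLimitingQueue{}
	q.AddAfter("default/job-1", 3*time.Second)
	if q.item != "default/job-1" || q.duration != 3*time.Second {
		t.Errorf("expected default/job-1 after 3s, recorded %v after %v", q.item, q.duration)
	}
	q.requeues = 5
	q.Forget("default/job-1")
	if got := q.NumRequeues("default/job-1"); got != 0 {
		t.Errorf("expected Forget to reset requeues to 0, got %d", got)
	}
}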

func TestJobBackoff(t *testing.T) {
	job := newJob(1, 1, 1)
	oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
	oldPod.Status.Phase = v1.PodRunning
	oldPod.ResourceVersion = "1"
	newPod := oldPod.DeepCopy()
	newPod.ResourceVersion = "2"

	testCases := map[string]struct {
		// inputs
		requeues int
		phase    v1.PodPhase

		// expectation
		backoff int
	}{
		"1st failure": {0, v1.PodFailed, 0},
		"2nd failure": {1, v1.PodFailed, 1},
		"3rd failure": {2, v1.PodFailed, 2},
		"1st success": {0, v1.PodSucceeded, 0},
		"2nd success": {1, v1.PodSucceeded, 0},
  1258. "1st running": {0, v1.PodSucceeded, 0},
  1259. "2nd running": {1, v1.PodSucceeded, 0},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
			manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
			fakePodControl := controller.FakePodControl{}
			manager.podControl = &fakePodControl
			manager.podStoreSynced = alwaysReady
			manager.jobStoreSynced = alwaysReady
			queue := &fakeRateLimitingQueue{}
			manager.queue = queue
			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)

			queue.requeues = tc.requeues
			newPod.Status.Phase = tc.phase
			manager.updatePod(oldPod, newPod)

			if expected := time.Duration(tc.backoff) * DefaultJobBackOff; queue.duration != expected {
				t.Errorf("unexpected backoff: expected %v, got %v", expected, queue.duration)
			}
		})
	}
}
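
// With RestartPolicy=OnFailure the controller counts container restarts
// (seeded below through ContainerStatuses.RestartCount) toward the job's
// backoffLimit, so a pod that is still Running or Pending can already have
// pushed the job past its limit.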
func TestJobBackoffForOnFailure(t *testing.T) {
	jobConditionFailed := batch.JobFailed

	testCases := map[string]struct {
		// job setup
		parallelism  int32
		completions  int32
		backoffLimit int32

		// pod setup
		jobKeyForget  bool
		restartCounts []int32
		podPhase      v1.PodPhase

		// expectations
		expectedActive          int32
		expectedSucceeded       int32
		expectedFailed          int32
		expectedCondition       *batch.JobConditionType
		expectedConditionReason string
	}{
		"backoffLimit 0 should have 1 pod active": {
			1, 1, 0,
			true, []int32{0}, v1.PodRunning,
			1, 0, 0, nil, "",
		},
		"backoffLimit 1 with restartCount 0 should have 1 pod active": {
			1, 1, 1,
			true, []int32{0}, v1.PodRunning,
			1, 0, 0, nil, "",
		},
		"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
			1, 1, 1,
			true, []int32{1}, v1.PodRunning,
			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
			1, 1, 1,
			true, []int32{1}, v1.PodPending,
			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"too many job failures with podRunning - single pod": {
			1, 5, 2,
			true, []int32{2}, v1.PodRunning,
			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"too many job failures with podPending - single pod": {
			1, 5, 2,
			true, []int32{2}, v1.PodPending,
			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"too many job failures with podRunning - multiple pods": {
			2, 5, 2,
			true, []int32{1, 1}, v1.PodRunning,
			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"too many job failures with podPending - multiple pods": {
			2, 5, 2,
			true, []int32{1, 1}, v1.PodPending,
			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"not enough failures": {
			2, 5, 3,
			true, []int32{1, 1}, v1.PodRunning,
			2, 0, 0, nil, "",
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			// job manager setup
			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
			manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
			fakePodControl := controller.FakePodControl{}
			manager.podControl = &fakePodControl
			manager.podStoreSynced = alwaysReady
			manager.jobStoreSynced = alwaysReady
			var actual *batch.Job
			manager.updateHandler = func(job *batch.Job) error {
				actual = job
				return nil
			}

			// job & pods setup
			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
			job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
			podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
			for i, pod := range newPodList(int32(len(tc.restartCounts)), tc.podPhase, job) {
				pod := pod // take a per-iteration copy so &pod below does not alias the loop variable
				pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}}
				podIndexer.Add(&pod)
			}

			// run
			forget, err := manager.syncJob(testutil.GetKey(job, t))
			if err != nil {
				t.Errorf("unexpected error syncing job. Got %#v", err)
			}
			if forget != tc.jobKeyForget {
				t.Errorf("unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget)
			}

			// validate status
			if actual.Status.Active != tc.expectedActive {
				t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
			}
			if actual.Status.Succeeded != tc.expectedSucceeded {
				t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
			}
			if actual.Status.Failed != tc.expectedFailed {
				t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
			}

			// validate conditions
			if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
				t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions)
			}
		})
	}
}
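
// Illustrative sketch (not part of the original suite): DeepDerivative, used
// in the watch test above, is like DeepEqual except that unset (nil or empty)
// fields in the first argument are ignored, so a sparse "expected" object can
// be checked against a fully populated one.
func TestDeepDerivativeSketch(t *testing.T) {
	full := newJob(2, 4, 6)
	sparse := &batch.Job{ObjectMeta: metav1.ObjectMeta{Name: full.Name, Namespace: full.Namespace}}
	if !apiequality.Semantic.DeepDerivative(sparse, full) {
		t.Errorf("expected the sparse job to be a derivative of the full job")
	}
}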