job_controller.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job

import (
	"fmt"
	"math"
	"reflect"
	"sort"
	"sync"
	"time"

	batch "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	batchinformers "k8s.io/client-go/informers/batch/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	batchv1listers "k8s.io/client-go/listers/batch/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/util/metrics"
	"k8s.io/utils/integer"
)
const statusUpdateRetries = 3

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")

var (
	// DefaultJobBackOff is the default backoff period, exported for the e2e test
	DefaultJobBackOff = 10 * time.Second
	// MaxJobBackOff is the max backoff period, exported for the e2e test
	MaxJobBackOff = 360 * time.Second
)
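
// With these defaults a failing Job is requeued after roughly 10s, 20s, 40s,
// 80s, 160s, 320s and then every 360s: both the workqueue's exponential
// failure rate limiter configured in NewJobController and getBackoff below
// double the delay per retry and cap it at MaxJobBackOff.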
type JobController struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// To allow injection of updateJobStatus for testing.
	updateHandler func(job *batch.Job) error
	syncHandler   func(jobKey string) (bool, error)
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced cache.InformerSynced
	// jobStoreSynced returns true if the job store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	jobStoreSynced cache.InformerSynced

	// A TTLCache of pod creates/deletes each job expects to see
	expectations controller.ControllerExpectationsInterface

	// A store of jobs
	jobLister batchv1listers.JobLister

	// A store of pods, populated by the podController
	podStore corelisters.PodLister

	// Jobs that need to be updated
	queue workqueue.RateLimitingInterface

	recorder record.EventRecorder
}
func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	jm := &JobController{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
		},
		expectations: controller.NewControllerExpectations(),
		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
		recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
	}

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			jm.enqueueController(obj, true)
		},
		UpdateFunc: jm.updateJob,
		DeleteFunc: func(obj interface{}) {
			jm.enqueueController(obj, true)
		},
	})
	jm.jobLister = jobInformer.Lister()
	jm.jobStoreSynced = jobInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.addPod,
		UpdateFunc: jm.updatePod,
		DeleteFunc: jm.deletePod,
	})
	jm.podStore = podInformer.Lister()
	jm.podStoreSynced = podInformer.Informer().HasSynced

	jm.updateHandler = jm.updateJobStatus
	jm.syncHandler = jm.syncJob

	return jm
}
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer jm.queue.ShutDown()

	klog.Infof("Starting job controller")
	defer klog.Infof("Shutting down job controller")

	if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(jm.worker, time.Second, stopCh)
	}

	<-stopCh
}
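
// A minimal wiring sketch (not part of this file; assumes a shared informer
// factory along the lines of what kube-controller-manager builds from
// client-go):
//
//	factory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)
//	jm := NewJobController(factory.Core().V1().Pods(), factory.Batch().V1().Jobs(), kubeClient)
//	factory.Start(stopCh)
//	go jm.Run(5, stopCh)
//
// Run blocks until stopCh is closed, so callers typically start it in its own
// goroutine once the informer caches have begun syncing.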
// getPodJobs returns a list of Jobs that potentially match a Pod.
func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
	jobs, err := jm.jobLister.GetPodJobs(pod)
	if err != nil {
		return nil
	}
	if len(jobs) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
	}
	ret := make([]*batch.Job, 0, len(jobs))
	for i := range jobs {
		ret = append(ret, &jobs[i])
	}
	return ret
}
// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (jm *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	job, err := jm.jobLister.Jobs(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if job.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return job
}
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (jm *JobController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	if pod.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible a new pod shows up in a state
		// that is already pending deletion. Prevent the pod from being counted as a creation
		// observation.
		jm.deletePod(pod)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		job := jm.resolveControllerRef(pod.Namespace, controllerRef)
		if job == nil {
			return
		}
		jobKey, err := controller.KeyFunc(job)
		if err != nil {
			return
		}
		jm.expectations.CreationObserved(jobKey)
		jm.enqueueController(job, true)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching controllers and sync
	// them to see if anyone wants to adopt it.
	// DO NOT observe creation because no controller should be waiting for an
	// orphan.
	for _, job := range jm.getPodJobs(pod) {
		jm.enqueueController(job, true)
	}
}
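
// Note on expectations: CreationObserved decrements the number of pod
// creations this job is still waiting to see. Once the outstanding count
// reaches zero (or the expectation record expires), SatisfiedExpectations
// returns true and syncJob is allowed to act on the pod cache again, which
// keeps the controller from over-creating pods based on a stale cache.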
// When a pod is updated, figure out what job(s) manage it and wake them up.
// If the labels of the pod have changed we need to awaken both the old
// and new job. old and cur must be *v1.Pod types.
func (jm *JobController) updatePod(old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}
	if curPod.DeletionTimestamp != nil {
		// When a pod is deleted gracefully, its deletion timestamp is first modified to reflect
		// a grace period, and only after that time has passed does the kubelet actually delete it
		// from the store. We receive an update for the modification of the deletion timestamp and
		// expect the job to create more pods asap, not wait until the kubelet actually deletes
		// the pod.
		jm.deletePod(curPod)
		return
	}

	// The only time we want the backoff to kick in is when the pod failed.
	immediate := curPod.Status.Phase != v1.PodFailed

	curControllerRef := metav1.GetControllerOf(curPod)
	oldControllerRef := metav1.GetControllerOf(oldPod)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
			jm.enqueueController(job, immediate)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)
		if job == nil {
			return
		}
		jm.enqueueController(job, immediate)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if labelChanged || controllerRefChanged {
		for _, job := range jm.getPodJobs(curPod) {
			jm.enqueueController(job, immediate)
		}
	}
}
// When a pod is deleted, enqueue the job that manages the pod and update its expectations.
// obj could be a *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (jm *JobController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new job will not be woken up until the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	job := jm.resolveControllerRef(pod.Namespace, controllerRef)
	if job == nil {
		return
	}
	jobKey, err := controller.KeyFunc(job)
	if err != nil {
		return
	}
	jm.expectations.DeletionObserved(jobKey)
	jm.enqueueController(job, true)
}
func (jm *JobController) updateJob(old, cur interface{}) {
	oldJob := old.(*batch.Job)
	curJob := cur.(*batch.Job)

	// KeyFunc never returns an error for a valid object.
	key, err := controller.KeyFunc(curJob)
	if err != nil {
		return
	}
	jm.enqueueController(curJob, true)

	// Check if we need to schedule a new resync for ActiveDeadlineSeconds.
	if curJob.Status.StartTime != nil {
		curADS := curJob.Spec.ActiveDeadlineSeconds
		if curADS == nil {
			return
		}
		oldADS := oldJob.Spec.ActiveDeadlineSeconds
		if oldADS == nil || *oldADS != *curADS {
			now := metav1.Now()
			start := curJob.Status.StartTime.Time
			passed := now.Time.Sub(start)
			total := time.Duration(*curADS) * time.Second
			// AddAfter will handle total < passed
			jm.queue.AddAfter(key, total-passed)
			klog.V(4).Infof("job ActiveDeadlineSeconds updated, will resync after %v", total-passed)
		}
	}
}
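
// Worked example: if ActiveDeadlineSeconds is raised from 300 to 600 while the
// job has already been running for 250s, the key is re-added after 350s, so
// the deadline check in syncJob (pastActiveDeadline) fires close to the new
// deadline instead of waiting for an unrelated event.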
// obj could be a *batch.Job, or a DeletionFinalStateUnknown marker item.
// immediate tells the controller to enqueue the job without a backoff delay;
// callers pass false only after observing a failed pod, so that repeated
// failures are retried with an exponentially growing delay.
func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	backoff := time.Duration(0)
	if !immediate {
		backoff = getBackoff(jm.queue, key)
	}

	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
	// deterministically avoid syncing controllers that fight over pods. Currently, we only
	// ensure that the same controller is synced for a given pod. When we periodically relist
	// all controllers there will still be some replica instability. One way to handle this is
	// by querying the store for all controllers that this controller overlaps, as well as all
	// controllers that overlap this controller, and sorting them.
	jm.queue.AddAfter(key, backoff)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {
	for jm.processNextWorkItem() {
	}
}

func (jm *JobController) processNextWorkItem() bool {
	key, quit := jm.queue.Get()
	if quit {
		return false
	}
	defer jm.queue.Done(key)

	forget, err := jm.syncHandler(key.(string))
	if err == nil {
		if forget {
			jm.queue.Forget(key)
		}
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
	jm.queue.AddRateLimited(key)
	return true
}
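
// Forget resets the rate limiter's per-key failure count, so the next error
// for this job starts backing off from DefaultJobBackOff again instead of
// continuing from the already-inflated delay. A non-nil error takes the other
// path: the key is re-added through AddRateLimited and the delay keeps growing
// toward MaxJobBackOff.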
// getPodsForJob returns the set of pods that this Job should manage.
// It also reconciles ControllerRef by adopting/orphaning.
// Note that the returned Pods are pointers into the cache.
func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
	selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
	}
	// List all pods to include those that don't match the selector anymore
	// but have a ControllerRef pointing to this controller.
	pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}
	// If any adoptions are attempted, we should first recheck for deletion
	// with an uncached quorum read sometime after listing Pods (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(j.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != j.UID {
			return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID)
		}
		return fresh, nil
	})
	cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc)
	return cm.ClaimPods(pods)
}
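
// ClaimPods returns only the pods this Job owns after reconciliation: pods
// that match the selector but have no controller are adopted (their
// ControllerRef is patched to point at this Job), while owned pods that no
// longer match the selector are released. The uncached quorum read above
// guards against adopting on behalf of a Job that has already been deleted.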
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
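// Return contract: the boolean result ("forget") tells processNextWorkItem to
// clear the key's rate-limiter history when true; returning a non-nil error
// re-enqueues the job with exponential backoff. syncJob deliberately returns
// an error when it detects new pod failures so the next retry is delayed.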
func (jm *JobController) syncJob(key string) (bool, error) {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
	}()

	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return false, err
	}
	if len(ns) == 0 || len(name) == 0 {
		return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
	}
	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.V(4).Infof("Job has been deleted: %v", key)
			jm.expectations.DeleteExpectations(key)
			return true, nil
		}
		return false, err
	}
	job := *sharedJob

	// If the job was finished previously, we don't want to redo the termination.
	if IsJobFinished(&job) {
		return true, nil
	}

	// Retrieve the number of previous retries for this key.
	previousRetry := jm.queue.NumRequeues(key)

	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

	pods, err := jm.getPodsForJob(&job)
	if err != nil {
		return false, err
	}

	activePods := controller.FilterActivePods(pods)
	active := int32(len(activePods))
	succeeded, failed := getStatus(pods)
	conditions := len(job.Status.Conditions)

	// The job is starting for the first time: record the start time and, if an
	// ActiveDeadlineSeconds is set, schedule a sync for when the deadline expires.
	if job.Status.StartTime == nil {
		now := metav1.Now()
		job.Status.StartTime = &now
		if job.Spec.ActiveDeadlineSeconds != nil {
			klog.V(4).Infof("Job %s has ActiveDeadlineSeconds, will sync after %d seconds",
				key, *job.Spec.ActiveDeadlineSeconds)
			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
		}
	}
	var manageJobErr error
	jobFailed := false
	var failureReason string
	var failureMessage string

	jobHaveNewFailure := failed > job.Status.Failed
	// New failures happen when the status does not yet reflect the failures and active
	// is different from parallelism; otherwise the previous controller loop failed to
	// update the status, so even if we pick up a failure here it is not a new one.
	exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
		(int32(previousRetry)+1 > *job.Spec.BackoffLimit)

	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
		// The backoff limit has been exceeded, measured either as the sum of container
		// restarts (restartPolicy: OnFailure) or as controller retries after newly
		// failed pods (restartPolicy: Never).
		jobFailed = true
		failureReason = "BackoffLimitExceeded"
		failureMessage = "Job has reached the specified backoff limit"
	} else if pastActiveDeadline(&job) {
		jobFailed = true
		failureReason = "DeadlineExceeded"
		failureMessage = "Job was active longer than specified deadline"
	}
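
	// Failure accounting differs by restart policy: with restartPolicy: Never a
	// failed container produces a new failed pod, so failures are tracked through
	// failed-pod counts plus the workqueue's requeue count; with restartPolicy:
	// OnFailure the kubelet restarts containers in place, so failures are tracked
	// by summing container restart counts in pastBackoffLimitOnFailure.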
	if jobFailed {
		errCh := make(chan error, active)
		jm.deleteJobPods(&job, activePods, errCh)
		select {
		case manageJobErr = <-errCh:
			if manageJobErr != nil {
				break
			}
		default:
		}

		// Update status values accordingly.
		failed += active
		active = 0
		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
	} else {
		if jobNeedsSync && job.DeletionTimestamp == nil {
			active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
		}
		completions := succeeded
		complete := false
		if job.Spec.Completions == nil {
			// This type of job is complete when any pod exits with success.
			// Each pod is capable of determining whether or not the entire Job is done.
			// Subsequent pods are not expected to fail, but if they do, the failure is
			// ignored. Once any pod succeeds, the controller waits for remaining pods
			// to finish, and then the job is complete.
			if succeeded > 0 && active == 0 {
				complete = true
			}
		} else {
			// Job specifies a number of completions. This type of job signals
			// success by having that number of successes. Since we do not
			// start more pods than there are remaining completions, there should
			// not be any remaining active pods once this count is reached.
			if completions >= *job.Spec.Completions {
				complete = true
				if active > 0 {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
				}
				if completions > *job.Spec.Completions {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
				}
			}
		}
		if complete {
			job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
			now := metav1.Now()
			job.Status.CompletionTime = &now
		}
	}
	forget := false
	// Check if the number of succeeded pods increased since the last check. If so, "forget" should be true.
	// This logic is linked to https://github.com/kubernetes/kubernetes/issues/56853, which aims to
	// improve the Job backoff policy when parallelism > 1 and a few pods failed but others succeeded.
	// In this case, we should clear the backoff delay.
	if job.Status.Succeeded < succeeded {
		forget = true
	}

	// No need to update the job if the status hasn't changed since last time.
	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
		job.Status.Active = active
		job.Status.Succeeded = succeeded
		job.Status.Failed = failed

		if err := jm.updateHandler(&job); err != nil {
			return forget, err
		}

		if jobHaveNewFailure && !IsJobFinished(&job) {
			// Returning an error will re-enqueue the Job after the backoff period.
			return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
		}

		forget = true
	}

	return forget, manageJobErr
}
func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
	// TODO: below code should be replaced with pod termination resulting in
	// pod failures, rather than killing pods. Unfortunately no such solution
	// exists at the moment. There's an open discussion on the topic in
	// https://github.com/kubernetes/kubernetes/issues/14602 which might give
	// some sort of solution to the above problem.
	// Kill the remaining active pods concurrently.
	wait := sync.WaitGroup{}
	nbPods := len(pods)
	wait.Add(nbPods)
	for i := int32(0); i < int32(nbPods); i++ {
		go func(ix int32) {
			defer wait.Done()
			if err := jm.podControl.DeletePod(job.Namespace, pods[ix].Name, job); err != nil {
				defer utilruntime.HandleError(err)
				klog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", pods[ix].Name, job.Namespace, job.Name)
				errCh <- err
			}
		}(i)
	}
	wait.Wait()
}
// pastBackoffLimitOnFailure checks if the sum of container restart counts exceeds BackoffLimit.
// This method applies only to pods with restartPolicy == OnFailure.
func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
		return false
	}
	result := int32(0)
	for i := range pods {
		po := pods[i]
		if po.Status.Phase == v1.PodRunning || po.Status.Phase == v1.PodPending {
			for j := range po.Status.InitContainerStatuses {
				stat := po.Status.InitContainerStatuses[j]
				result += stat.RestartCount
			}
			for j := range po.Status.ContainerStatuses {
				stat := po.Status.ContainerStatuses[j]
				result += stat.RestartCount
			}
		}
	}
	if *job.Spec.BackoffLimit == 0 {
		return result > 0
	}
	return result >= *job.Spec.BackoffLimit
}
// pastActiveDeadline checks if the job has the ActiveDeadlineSeconds field set and if it is exceeded.
func pastActiveDeadline(job *batch.Job) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
		return false
	}
	now := metav1.Now()
	start := job.Status.StartTime.Time
	duration := now.Time.Sub(start)
	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}

func newCondition(conditionType batch.JobConditionType, reason, message string) batch.JobCondition {
	return batch.JobCondition{
		Type:               conditionType,
		Status:             v1.ConditionTrue,
		LastProbeTime:      metav1.Now(),
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}
}
// getStatus returns the number of succeeded and failed pods of a job.
func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
	succeeded = int32(filterPods(pods, v1.PodSucceeded))
	failed = int32(filterPods(pods, v1.PodFailed))
	return
}
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
	var activeLock sync.Mutex
	active := int32(len(activePods))
	parallelism := *job.Spec.Parallelism
	jobKey, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
		return 0, nil
	}

	var errCh chan error
	if active > parallelism {
		diff := active - parallelism
		errCh = make(chan error, diff)
		jm.expectations.ExpectDeletions(jobKey, int(diff))
		klog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(activePods))

		active -= diff
		wait := sync.WaitGroup{}
		wait.Add(int(diff))
		for i := int32(0); i < diff; i++ {
			go func(ix int32) {
				defer wait.Done()
				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
					defer utilruntime.HandleError(err)
					// Decrement the expected number of deletes because the informer won't observe this deletion.
					klog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
					jm.expectations.DeletionObserved(jobKey)
					activeLock.Lock()
					active++
					activeLock.Unlock()
					errCh <- err
				}
			}(i)
		}
		wait.Wait()
	} else if active < parallelism {
		wantActive := int32(0)
		if job.Spec.Completions == nil {
			// Job does not specify a number of completions. Therefore, the number of
			// active pods should equal parallelism, unless the job has seen at least
			// one success, in which case leave whatever is running running.
			if succeeded > 0 {
				wantActive = active
			} else {
				wantActive = parallelism
			}
		} else {
			// Job specifies a specific number of completions. Therefore, the number
			// of active pods should never exceed the number of remaining completions.
			wantActive = *job.Spec.Completions - succeeded
			if wantActive > parallelism {
				wantActive = parallelism
			}
		}
		diff := wantActive - active
		if diff < 0 {
			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
			diff = 0
		}
		jm.expectations.ExpectCreations(jobKey, int(diff))
		errCh = make(chan error, diff)
		klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

		active += diff
		wait := sync.WaitGroup{}

		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails. Conveniently, this also prevents the
		// event spam that those failures would generate.
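		// Worked example (assuming controller.SlowStartInitialBatchSize is 1): to
		// create diff = 20 pods, the loop below issues batches of 1, 2, 4, 8 and
		// finally 5 pods. If a batch reports errors, the remaining creations are
		// skipped, their expectations are decremented, and the next sync retries
		// the slow start from scratch.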
		for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
			errorCount := len(errCh)
			wait.Add(int(batchSize))
			for i := int32(0); i < batchSize; i++ {
				go func() {
					defer wait.Done()
					err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
					if err != nil && errors.IsTimeout(err) {
						// Pod is created but its initialization has timed out.
						// If the initialization is successful eventually, the
						// controller will observe the creation via the informer.
						// If the initialization fails, or if the pod keeps
						// uninitialized for a long time, the informer will not
						// receive any update, and the controller will create a new
						// pod when the expectation expires.
						return
					}
					if err != nil {
						defer utilruntime.HandleError(err)
						// Decrement the expected number of creates because the informer won't observe this pod
						klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
						jm.expectations.CreationObserved(jobKey)
						activeLock.Lock()
						active--
						activeLock.Unlock()
						errCh <- err
					}
				}()
			}
			wait.Wait()

			// Any skipped pods that we never attempted to start shouldn't be expected.
			skippedPods := diff - batchSize
			if errorCount < len(errCh) && skippedPods > 0 {
				klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
				active -= skippedPods
				for i := int32(0); i < skippedPods; i++ {
					// Decrement the expected number of creates because the informer won't observe this pod
					jm.expectations.CreationObserved(jobKey)
				}
				// The skipped pods will be retried later. The next controller resync will
				// retry the slow start process.
				break
			}
			diff -= batchSize
		}
	}
	select {
	case err := <-errCh:
		// All errors have been reported before; we only need to inform the controller
		// that there was an error and that it should retry this job once more next time.
		if err != nil {
			return active, err
		}
	default:
	}

	return active, nil
}
func (jm *JobController) updateJobStatus(job *batch.Job) error {
	jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
	var err error
	for i := 0; i <= statusUpdateRetries; i++ {
		var newJob *batch.Job
		newJob, err = jobClient.Get(job.Name, metav1.GetOptions{})
		if err != nil {
			break
		}
		newJob.Status = job.Status
		if _, err = jobClient.UpdateStatus(newJob); err == nil {
			break
		}
	}
	return err
}
func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
	exp := queue.NumRequeues(key)

	if exp <= 0 {
		return time.Duration(0)
	}

	// The backoff is capped such that the 'calculated' value never overflows.
	backoff := float64(DefaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
	if backoff > math.MaxInt64 {
		return MaxJobBackOff
	}

	calculated := time.Duration(backoff)
	if calculated > MaxJobBackOff {
		return MaxJobBackOff
	}
	return calculated
}
// filterPods returns the number of pods in the given phase.
func filterPods(pods []*v1.Pod, phase v1.PodPhase) int {
	result := 0
	for i := range pods {
		if phase == pods[i].Status.Phase {
			result++
		}
	}
	return result
}