/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cronjob

/*
I did not use watch or expectations. Those add a lot of corner cases, and we aren't
expecting a large volume of jobs or scheduledJobs. (We are favoring correctness
over scalability. If we find a single controller thread is too slow because
there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
If we find the load on the API server is too high, we can use a watch and
UndeltaStore.)

Just periodically list jobs and SJs, and then reconcile them.
*/

import (
	"context"
	"fmt"
	"sort"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	batchv1beta1 "k8s.io/api/batch/v1beta1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/pager"
	"k8s.io/client-go/tools/record"
	ref "k8s.io/client-go/tools/reference"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
	"k8s.io/klog"
)

// Utilities for dealing with Jobs and CronJobs and time.

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batchv1beta1.SchemeGroupVersion.WithKind("CronJob")

// Controller is a controller for CronJobs.
type Controller struct {
	kubeClient clientset.Interface
	jobControl jobControlInterface
	sjControl  sjControlInterface
	podControl podControlInterface
	recorder   record.EventRecorder
}

// NewController creates and initializes a new Controller.
func NewController(kubeClient clientset.Interface) (*Controller, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}

	jm := &Controller{
		kubeClient: kubeClient,
		jobControl: realJobControl{KubeClient: kubeClient},
		sjControl:  &realSJControl{KubeClient: kubeClient},
		podControl: &realPodControl{KubeClient: kubeClient},
		recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
	}

	return jm, nil
}

// Run starts the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	klog.Infof("Starting CronJob Manager")
	// Check things every 10 seconds.
	go wait.Until(jm.syncAll, 10*time.Second, stopCh)
	<-stopCh
	klog.Infof("Shutting down CronJob Manager")
}
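
// A minimal usage sketch (an assumption for illustration, not part of this
// package's wiring): constructing the controller with an in-cluster clientset
// and running it until the stop channel is closed. runCronJobController is a
// hypothetical caller; it assumes imports of "k8s.io/client-go/rest" and
// "k8s.io/client-go/kubernetes".
//
//	func runCronJobController() error {
//		cfg, err := rest.InClusterConfig() // assumes an in-cluster environment
//		if err != nil {
//			return err
//		}
//		client, err := kubernetes.NewForConfig(cfg)
//		if err != nil {
//			return err
//		}
//		jm, err := NewController(client)
//		if err != nil {
//			return err
//		}
//		stopCh := make(chan struct{}) // close(stopCh) shuts the controller down
//		jm.Run(stopCh)                // blocks until stopCh is closed
//		return nil
//	}
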
// syncAll lists all the CronJobs and Jobs and reconciles them.
func (jm *Controller) syncAll() {
	// List children (Jobs) before parents (CronJob).
	// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
	// we must also see that the parent CronJob has a non-nil DeletionTimestamp (see #42639).
	// Note that this only works because we are NOT using any caches here.
	jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
	}

	js := make([]batchv1.Job, 0)
	err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		jobTmp, ok := object.(*batchv1.Job)
		if !ok {
			// Report the type of the object we actually received; the target of a
			// failed type assertion is nil, so %T on it would always print *batchv1.Job.
			return fmt.Errorf("expected type *batchv1.Job, got type %T", object)
		}
		js = append(js, *jobTmp)
		return nil
	})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
		return
	}
	klog.V(4).Infof("Found %d jobs", len(js))

	cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)
	}

	jobsBySj := groupJobsByParent(js)
	klog.V(4).Infof("Found %d groups", len(jobsBySj))

	err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		sj, ok := object.(*batchv1beta1.CronJob)
		if !ok {
			return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", object)
		}
		syncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
		cleanupFinishedJobs(sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder)
		return nil
	})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
		return
	}
}
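
// The paged-list pattern used in syncAll generalizes to any list call. A
// minimal sketch, assuming only a configured clientset; listAllPods is a
// hypothetical helper, not part of this package:
//
//	func listAllPods(client clientset.Interface) ([]v1.Pod, error) {
//		podListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
//			return client.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts)
//		}
//		pods := make([]v1.Pod, 0)
//		err := pager.New(pager.SimplePageFunc(podListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
//			pod, ok := object.(*v1.Pod)
//			if !ok {
//				return fmt.Errorf("expected type *v1.Pod, got type %T", object)
//			}
//			pods = append(pods, *pod)
//			return nil
//		})
//		return pods, err
//	}
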
// cleanupFinishedJobs cleans up the finished Jobs created by a CronJob,
// enforcing the CronJob's history limits.
func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
	sjc sjControlInterface, recorder record.EventRecorder) {
	// If neither limit is set, there is no need to do anything.
	if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil {
		return
	}

	failedJobs := []batchv1.Job{}
	successfulJobs := []batchv1.Job{}

	for _, job := range js {
		isFinished, finishedStatus := getFinishedStatus(&job)
		if isFinished && finishedStatus == batchv1.JobComplete {
			successfulJobs = append(successfulJobs, job)
		} else if isFinished && finishedStatus == batchv1.JobFailed {
			failedJobs = append(failedJobs, job)
		}
	}

	if sj.Spec.SuccessfulJobsHistoryLimit != nil {
		removeOldestJobs(sj,
			successfulJobs,
			jc,
			*sj.Spec.SuccessfulJobsHistoryLimit,
			recorder)
	}

	if sj.Spec.FailedJobsHistoryLimit != nil {
		removeOldestJobs(sj,
			failedJobs,
			jc,
			*sj.Spec.FailedJobsHistoryLimit,
			recorder)
	}

	// Update the CronJob, in case jobs were removed from the list.
	if _, err := sjc.UpdateStatus(sj); err != nil {
		nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
	}
}
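
// Worked example of the history limits above: with SuccessfulJobsHistoryLimit=3
// and FailedJobsHistoryLimit=1, a sync retains at most the three most recent
// successful Jobs and the single most recent failed Job; unfinished Jobs are
// never considered for cleanup.
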
// removeOldestJobs removes the oldest jobs from a list of jobs.
func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
	numToDelete := len(js) - int(maxJobs)
	if numToDelete <= 0 {
		return
	}

	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
	klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)

	sort.Sort(byJobStartTime(js))
	for i := 0; i < numToDelete; i++ {
		klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
		deleteJob(sj, &js[i], jc, recorder)
	}
}
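
// Worked example for removeOldestJobs: with five successful Jobs and
// SuccessfulJobsHistoryLimit=3, numToDelete = 5 - 3 = 2, so after sorting by
// start time the two oldest Jobs are deleted and the three newest are kept.
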
// syncOne reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "sj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

	childrenJobs := make(map[types.UID]bool)
	for _, j := range js {
		childrenJobs[j.ObjectMeta.UID] = true
		found := inActiveList(*sj, j.ObjectMeta.UID)
		if !found && !IsJobFinished(&j) {
			recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
			// We found an unfinished job that has us as the parent, but it is not in our Active list.
			// This could happen if we crashed right after creating the Job and before updating the status,
			// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
			// a job that they wanted us to adopt.

			// TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
			// stop users from creating jobs if they have permission. It is assumed that if a
			// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
			// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
			// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
		} else if found && IsJobFinished(&j) {
			_, status := getFinishedStatus(&j)
			deleteFromActiveList(sj, j.ObjectMeta.UID)
			recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
		}
	}

	// Remove any job reference from the active list if the corresponding job does not exist any more.
	// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
	// job running.
	for _, j := range sj.Status.Active {
		if found := childrenJobs[j.UID]; !found {
			recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
			deleteFromActiveList(sj, j.UID)
		}
	}

	updatedSJ, err := sjc.UpdateStatus(sj)
	if err != nil {
		klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
		return
	}
	*sj = *updatedSJ

	if sj.DeletionTimestamp != nil {
		// The CronJob is being deleted.
		// Don't do anything other than updating status.
		return
	}

	if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
		klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
		return
	}

	times, err := getRecentUnmetScheduleTimes(*sj, now)
	if err != nil {
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
		klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
		return
	}
	// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
	if len(times) == 0 {
		klog.V(4).Infof("No unmet start times for %s", nameForLog)
		return
	}
	if len(times) > 1 {
		klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
	}

	scheduledTime := times[len(times)-1]
	tooLate := false
	if sj.Spec.StartingDeadlineSeconds != nil {
		tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
	}
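	// Worked example: for a schedule point at 10:00:00 with
	// StartingDeadlineSeconds=200, the job may still be started until
	// 10:03:20; if now is 10:04:00, the window was missed and tooLate is true.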
	if tooLate {
		klog.V(4).Infof("Missed starting window for %s", nameForLog)
		recorder.Eventf(sj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
		// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
		// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
		// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
		// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
		// Status.LastScheduleTime, Status.LastMissedTime), so we won't generate
		// an event the next time we process it, and the user looking at the status
		// can easily see that there was a missed execution.
		return
	}
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
		// Regardless of which source of information we use for the set of active jobs,
		// there is some risk that we won't see an active job when there is one
		// (because we haven't seen the status update to the SJ or the created pod).
		// So it is theoretically possible to have concurrency with Forbid.
		// As long as the invocations are "far enough apart in time", this usually won't happen.
		//
		// TODO: for Forbid, we could use the same name for every execution, as a lock.
		// With Replace, we could use a name that is deterministic per execution time.
		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
		klog.V(4).Infof("Not starting job for %s because a prior execution is still running and the concurrency policy is Forbid", nameForLog)
		return
	}
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
		for _, j := range sj.Status.Active {
			klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

			job, err := jc.GetJob(j.Namespace, j.Name)
			if err != nil {
				recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
				return
			}
			if !deleteJob(sj, job, jc, recorder) {
				return
			}
		}
	}
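
	// Summary of the concurrency policies handled above: Allow starts the new
	// job regardless of active jobs, Forbid skips this start while any active
	// job remains, and Replace deletes the active jobs first and then falls
	// through to create the replacement.
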
	jobReq, err := getJobFromTemplate(sj, scheduledTime)
	if err != nil {
		klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
		return
	}
	jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
	if err != nil {
		// If the namespace is being torn down, we can safely ignore
		// this error since all subsequent creations will fail.
		if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
			recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		}
		return
	}
	klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

	// ------------------------------------------------------------------ //

	// If this process restarts at this point (after posting a job, but
	// before updating the status), then we might try to start the job
	// again the next time. Actually, if we re-list the SJs and Jobs on the
	// next iteration of syncAll, we might not see our own status update,
	// and then post one again. So, we need to use the job name as a lock to
	// prevent us from making the job twice (name the job with a hash of its
	// scheduled time).

	// Add the just-started job to the status list.
	ref, err := getRef(jobResp)
	if err != nil {
		klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
	} else {
		sj.Status.Active = append(sj.Status.Active, *ref)
	}
	sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
	if _, err := sjc.UpdateStatus(sj); err != nil {
		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
	}
}
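
// A minimal sketch of the name-as-lock idea described in syncOne: derive the
// Job name deterministically from the scheduled time so that a second create
// for the same execution fails with AlreadyExists instead of duplicating the
// job. scheduledJobName is illustrative and not necessarily how
// getJobFromTemplate builds names in this package.
//
//	func scheduledJobName(sj *batchv1beta1.CronJob, scheduledTime time.Time) string {
//		// Unix seconds uniquely identify a schedule point at 1s granularity.
//		return fmt.Sprintf("%s-%d", sj.Name, scheduledTime.Unix())
//	}
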
// deleteJob reaps a job, deleting the job, its pods, and the reference from the active list.
func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

	// delete the job itself...
	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Error deleting job: %v", err)
		klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
		return false
	}
	// ... and its reference from the active list
	deleteFromActiveList(sj, job.ObjectMeta.UID)
	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)

	return true
}

func getRef(object runtime.Object) (*v1.ObjectReference, error) {
	return ref.GetReference(scheme.Scheme, object)
}