/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ttlafterfinished

import (
	"context"
	"fmt"
	"time"

	"k8s.io/klog"

	batch "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	batchinformers "k8s.io/client-go/informers/batch/v1"
	clientset "k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	batchlisters "k8s.io/client-go/listers/batch/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
	"k8s.io/kubectl/pkg/scheme"
	"k8s.io/kubernetes/pkg/controller"
	jobutil "k8s.io/kubernetes/pkg/controller/job"
)

// Controller watches for changes to Job API objects. Triggered by Job creation
// and updates, it enqueues Jobs that have a non-nil `.spec.ttlSecondsAfterFinished`
// onto the `queue`. The Controller has workers that consume `queue` and check
// whether a Job's TTL has expired; if it hasn't, the worker re-adds the Job to
// the queue at the time the TTL is expected to expire; if it has, the worker
// sends a request to the API server to delete the Job.
// This is implemented outside of the Job controller for separation of concerns,
// and because it will be extended to handle other finishable resource types.
type Controller struct {
	client   clientset.Interface
	recorder record.EventRecorder

	// jLister can list/get Jobs from the shared informer's store
	jLister batchlisters.JobLister

	// jListerSynced returns true if the Job store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	jListerSynced cache.InformerSynced

	// Jobs whose TTL the controller will check, and which it will attempt to delete when the TTL expires.
	queue workqueue.RateLimitingInterface

	// The clock for tracking time
	clock clock.Clock
}

// New creates an instance of Controller.
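//
// A minimal usage sketch follows; the shared informer factory, client, and
// stop-channel wiring shown here are assumed surrounding code, not part of
// this package:
//
//	factory := informers.NewSharedInformerFactory(client, 0)
//	tc := ttlafterfinished.New(factory.Batch().V1().Jobs(), client)
//	factory.Start(stopCh)
//	go tc.Run(5, stopCh)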
func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("ttl_after_finished_controller", client.CoreV1().RESTClient().GetRateLimiter())
	}

	tc := &Controller{
		client:   client,
		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
		queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
	}

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    tc.addJob,
		UpdateFunc: tc.updateJob,
	})

	tc.jLister = jobInformer.Lister()
	tc.jListerSynced = jobInformer.Informer().HasSynced

	tc.clock = clock.RealClock{}

	return tc
}

// Run starts the workers to clean up Jobs.
func (tc *Controller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer tc.queue.ShutDown()

	klog.Infof("Starting TTL after finished controller")
	defer klog.Infof("Shutting down TTL after finished controller")

	if !cache.WaitForNamedCacheSync("TTL after finished", stopCh, tc.jListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(tc.worker, time.Second, stopCh)
	}

	<-stopCh
}

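// addJob enqueues the added Job for cleanup if it has finished, has a TTL set,
// and is not being deleted.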
func (tc *Controller) addJob(obj interface{}) {
	job := obj.(*batch.Job)
	klog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)

	if job.DeletionTimestamp == nil && needsCleanup(job) {
		tc.enqueue(job)
	}
}

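// updateJob enqueues the updated Job for cleanup if it has finished, has a TTL
// set, and is not being deleted.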
func (tc *Controller) updateJob(old, cur interface{}) {
	job := cur.(*batch.Job)
	klog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)

	if job.DeletionTimestamp == nil && needsCleanup(job) {
		tc.enqueue(job)
	}
}

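// enqueue adds the Job's key to the work queue for immediate processing.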
func (tc *Controller) enqueue(job *batch.Job) {
	klog.V(4).Infof("Add job %s/%s to cleanup", job.Namespace, job.Name)
	key, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
		return
	}

	tc.queue.Add(key)
}

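// enqueueAfter adds the Job's key to the work queue after the given delay, so
// the Job is re-checked once its TTL is expected to have expired.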
func (tc *Controller) enqueueAfter(job *batch.Job, after time.Duration) {
	key, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
		return
	}

	tc.queue.AddAfter(key, after)
}

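// worker keeps processing items from the work queue until the queue is shut down.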
func (tc *Controller) worker() {
	for tc.processNextWorkItem() {
	}
}

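// processNextWorkItem pops one key off the work queue, processes the
// corresponding Job, and hands the result to handleErr. It returns false when
// the queue is shutting down.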
func (tc *Controller) processNextWorkItem() bool {
	key, quit := tc.queue.Get()
	if quit {
		return false
	}
	defer tc.queue.Done(key)

	err := tc.processJob(key.(string))
	tc.handleErr(err, key)

	return true
}

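// handleErr forgets the key on success; otherwise it records the error and
// re-adds the key to the queue with rate limiting.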
func (tc *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		tc.queue.Forget(key)
		return
	}

	utilruntime.HandleError(fmt.Errorf("error cleaning up Job %v, will retry: %v", key, err))
	tc.queue.AddRateLimited(key)
}

// processJob checks the Job's state and TTL, and deletes the Job once it has
// finished and its TTL has expired. If the Job hasn't finished or its TTL
// hasn't expired, it is re-added to the queue at the time the TTL is expected
// to expire.
// This function is not meant to be invoked concurrently with the same key.
func (tc *Controller) processJob(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	klog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
	// Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
	job, err := tc.jLister.Jobs(namespace).Get(name)
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	if expired, err := tc.processTTL(job); err != nil {
		return err
	} else if !expired {
		return nil
	}

	// The Job's TTL is assumed to have expired, but the Job TTL might be stale.
	// Before deleting the Job, do a final sanity check.
	// If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
	// The latest Job may have a different UID, but it's fine because the checks will be run again.
	fresh, err := tc.client.BatchV1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}
	// Use the latest Job TTL to see if the TTL truly expires.
	if expired, err := tc.processTTL(fresh); err != nil {
		return err
	} else if !expired {
		return nil
	}
	// Cascade deletes the Job if its TTL has truly expired.
	policy := metav1.DeletePropagationForeground
	options := metav1.DeleteOptions{
		PropagationPolicy: &policy,
		Preconditions:     &metav1.Preconditions{UID: &fresh.UID},
	}
	klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
	return tc.client.BatchV1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options)
}

// processTTL checks whether a given Job's TTL has expired; if the TTL will
// expire later, it adds the Job back to the queue at the time the TTL is
// expected to expire.
func (tc *Controller) processTTL(job *batch.Job) (expired bool, err error) {
	// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
	if job.DeletionTimestamp != nil || !needsCleanup(job) {
		return false, nil
	}

	now := tc.clock.Now()
	t, err := timeLeft(job, &now)
	if err != nil {
		return false, err
	}

	// TTL has expired
	if *t <= 0 {
		return true, nil
	}

	tc.enqueueAfter(job, *t)
	return false, nil
}

// needsCleanup checks whether a Job has finished and has a TTL set.
func needsCleanup(j *batch.Job) bool {
	return j.Spec.TTLSecondsAfterFinished != nil && jobutil.IsJobFinished(j)
}

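// getFinishAndExpireTime returns the UTC time the Job finished and the UTC time
// its TTL expires. It returns an error if the Job has not finished or has no
// TTL set.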
func getFinishAndExpireTime(j *batch.Job) (*time.Time, *time.Time, error) {
	if !needsCleanup(j) {
		return nil, nil, fmt.Errorf("job %s/%s should not be cleaned up", j.Namespace, j.Name)
	}
	finishAt, err := jobFinishTime(j)
	if err != nil {
		return nil, nil, err
	}
	finishAtUTC := finishAt.UTC()
	expireAtUTC := finishAtUTC.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
	return &finishAtUTC, &expireAtUTC, nil
}

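// timeLeft returns the duration remaining until the Job's TTL expires, measured
// from `since`; a non-positive value means the TTL has already expired.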
func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, error) {
	finishAt, expireAt, err := getFinishAndExpireTime(j)
	if err != nil {
		return nil, err
	}
	if finishAt.UTC().After(since.UTC()) {
		klog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name)
	}
	remaining := expireAt.UTC().Sub(since.UTC())
	klog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())
	return &remaining, nil
}

// jobFinishTime takes an already finished Job and returns the time it finished.
func jobFinishTime(finishedJob *batch.Job) (metav1.Time, error) {
	for _, c := range finishedJob.Status.Conditions {
		if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
			finishAt := c.LastTransitionTime
			if finishAt.IsZero() {
				return metav1.Time{}, fmt.Errorf("unable to find the time when the Job %s/%s finished", finishedJob.Namespace, finishedJob.Name)
			}
			return c.LastTransitionTime, nil
		}
	}

	// This should never happen if the Job has finished
	return metav1.Time{}, fmt.Errorf("unable to find the status of the finished Job %s/%s", finishedJob.Namespace, finishedJob.Name)
}