ttlafterfinished_controller.go

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ttlafterfinished

import (
	"context"
	"fmt"
	"time"

	batch "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	batchinformers "k8s.io/client-go/informers/batch/v1"
	clientset "k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	batchlisters "k8s.io/client-go/listers/batch/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
	"k8s.io/klog"
	"k8s.io/kubectl/pkg/scheme"
	"k8s.io/kubernetes/pkg/controller"
	jobutil "k8s.io/kubernetes/pkg/controller/job"
)

// Controller watches for changes of Jobs API objects. Triggered by Job creation
// and updates, it enqueues Jobs that have a non-nil `.spec.ttlSecondsAfterFinished`
// onto the `queue`. The Controller has workers that consume the `queue` and check
// whether a Job's TTL has expired: if it hasn't, the worker re-adds the Job to the
// queue at the time the TTL is expected to expire; if it has, the worker sends a
// request to the API server to delete the Job.
// This is implemented outside of the Job controller for separation of concerns,
// and because it will be extended to handle other finishable resource types.
type Controller struct {
	client   clientset.Interface
	recorder record.EventRecorder

	// jLister can list/get Jobs from the shared informer's store
	jLister batchlisters.JobLister

	// jListerSynced returns true if the Job store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	jListerSynced cache.InformerSynced

	// queue holds the Jobs whose TTL the controller checks and which it attempts
	// to delete once the TTL expires.
	queue workqueue.RateLimitingInterface

	// The clock for tracking time
	clock clock.Clock
}

// New creates an instance of Controller
func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("ttl_after_finished_controller", client.CoreV1().RESTClient().GetRateLimiter())
	}

	tc := &Controller{
		client:   client,
		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
		queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
	}

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    tc.addJob,
		UpdateFunc: tc.updateJob,
	})

	tc.jLister = jobInformer.Lister()
	tc.jListerSynced = jobInformer.Informer().HasSynced

	tc.clock = clock.RealClock{}

	return tc
}

// Run starts the workers to clean up Jobs.
func (tc *Controller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer tc.queue.ShutDown()

	klog.Infof("Starting TTL after finished controller")
	defer klog.Infof("Shutting down TTL after finished controller")

	if !cache.WaitForNamedCacheSync("TTL after finished", stopCh, tc.jListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(tc.worker, time.Second, stopCh)
	}

	<-stopCh
}
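
// addJob enqueues a newly observed Job if it is not being deleted and needs TTL-based cleanup.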
func (tc *Controller) addJob(obj interface{}) {
	job := obj.(*batch.Job)
	klog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)

	if job.DeletionTimestamp == nil && needsCleanup(job) {
		tc.enqueue(job)
	}
}
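
// updateJob enqueues the updated Job if it is not being deleted and needs TTL-based cleanup.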
func (tc *Controller) updateJob(old, cur interface{}) {
	job := cur.(*batch.Job)
	klog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)

	if job.DeletionTimestamp == nil && needsCleanup(job) {
		tc.enqueue(job)
	}
}
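
// enqueue adds the Job's namespace/name key to the work queue for immediate processing.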
func (tc *Controller) enqueue(job *batch.Job) {
	klog.V(4).Infof("Add job %s/%s to cleanup", job.Namespace, job.Name)
	key, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
		return
	}

	tc.queue.Add(key)
}
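
// enqueueAfter adds the Job's key to the work queue after the given delay,
// typically the time remaining until its TTL expires.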
func (tc *Controller) enqueueAfter(job *batch.Job, after time.Duration) {
	key, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
		return
	}

	tc.queue.AddAfter(key, after)
}
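
// worker keeps processing items from the work queue until the queue is shut down.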
func (tc *Controller) worker() {
	for tc.processNextWorkItem() {
	}
}
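
// processNextWorkItem takes one key off the queue, processes the corresponding Job,
// and reports the result to handleErr. It returns false only when the queue has been
// shut down.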
func (tc *Controller) processNextWorkItem() bool {
	key, quit := tc.queue.Get()
	if quit {
		return false
	}
	defer tc.queue.Done(key)

	err := tc.processJob(key.(string))
	tc.handleErr(err, key)

	return true
}
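
// handleErr forgets the key on success; otherwise it records the error and re-adds
// the key with rate limiting so the Job is retried.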
func (tc *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		tc.queue.Forget(key)
		return
	}

	utilruntime.HandleError(fmt.Errorf("error cleaning up Job %v, will retry: %v", key, err))
	tc.queue.AddRateLimited(key)
}

// processJob will check the Job's state and TTL and delete the Job when it
// finishes and its TTL after finished has expired. If the Job hasn't finished or
// its TTL hasn't expired, it will be added to the queue after the TTL is expected
// to expire.
// This function is not meant to be invoked concurrently with the same key.
func (tc *Controller) processJob(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	klog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
	// Ignore the Jobs that are already deleted or being deleted, or the ones that don't need cleanup.
	job, err := tc.jLister.Jobs(namespace).Get(name)
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	if expired, err := tc.processTTL(job); err != nil {
		return err
	} else if !expired {
		return nil
	}

	// The Job's TTL is assumed to have expired, but the Job TTL might be stale.
	// Before deleting the Job, do a final sanity check.
	// If the TTL is modified before we do this check, we cannot be sure whether the TTL truly expired.
	// The latest Job may have a different UID, but it's fine because the checks will be run again.
	fresh, err := tc.client.BatchV1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	// Use the latest Job TTL to see if the TTL truly expires.
	if expired, err := tc.processTTL(fresh); err != nil {
		return err
	} else if !expired {
		return nil
	}

	// Cascade delete the Job if the TTL has truly expired.
	policy := metav1.DeletePropagationForeground
	options := metav1.DeleteOptions{
		PropagationPolicy: &policy,
		Preconditions:     &metav1.Preconditions{UID: &fresh.UID},
	}
	klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
	return tc.client.BatchV1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options)
}

// processTTL checks whether a given Job's TTL has expired; if it hasn't, it adds
// the Job back to the queue at the time the TTL is expected to expire.
func (tc *Controller) processTTL(job *batch.Job) (expired bool, err error) {
	// We don't care about the Jobs that are going to be deleted, or the ones that don't need cleanup.
	if job.DeletionTimestamp != nil || !needsCleanup(job) {
		return false, nil
	}

	now := tc.clock.Now()
	t, err := timeLeft(job, &now)
	if err != nil {
		return false, err
	}

	// TTL has expired
	if *t <= 0 {
		return true, nil
	}

	tc.enqueueAfter(job, *t)
	return false, nil
}

// needsCleanup checks whether a Job has finished and has a TTL set.
func needsCleanup(j *batch.Job) bool {
	return j.Spec.TTLSecondsAfterFinished != nil && jobutil.IsJobFinished(j)
}
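
// getFinishAndExpireTime returns the Job's finish time and the time its TTL expires
// (finish time plus TTLSecondsAfterFinished), both in UTC.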
func getFinishAndExpireTime(j *batch.Job) (*time.Time, *time.Time, error) {
	if !needsCleanup(j) {
		return nil, nil, fmt.Errorf("job %s/%s should not be cleaned up", j.Namespace, j.Name)
	}
	finishAt, err := jobFinishTime(j)
	if err != nil {
		return nil, nil, err
	}
	finishAtUTC := finishAt.UTC()
	expireAtUTC := finishAtUTC.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
	return &finishAtUTC, &expireAtUTC, nil
}
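
// timeLeft returns the duration, relative to `since`, until the Job's TTL expires.
// A zero or negative result means the TTL has already expired.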
func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, error) {
	finishAt, expireAt, err := getFinishAndExpireTime(j)
	if err != nil {
		return nil, err
	}
	if finishAt.UTC().After(since.UTC()) {
		klog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name)
	}
	remaining := expireAt.UTC().Sub(since.UTC())
	klog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())
	return &remaining, nil
}

// jobFinishTime takes an already finished Job and returns the time it finished.
func jobFinishTime(finishedJob *batch.Job) (metav1.Time, error) {
	for _, c := range finishedJob.Status.Conditions {
		if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
			finishAt := c.LastTransitionTime
			if finishAt.IsZero() {
				return metav1.Time{}, fmt.Errorf("unable to find the time when the Job %s/%s finished", finishedJob.Namespace, finishedJob.Name)
			}
			return c.LastTransitionTime, nil
		}
	}

	// This should never happen if the Job has finished.
	return metav1.Time{}, fmt.Errorf("unable to find the status of the finished Job %s/%s", finishedJob.Namespace, finishedJob.Name)
}