cronjob_controller.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cronjob
  14. /*
  15. I did not use watch or expectations. Those add a lot of corner cases, and we aren't
  16. expecting a large volume of jobs or scheduledJobs. (We are favoring correctness
  17. over scalability. If we find a single controller thread is too slow because
  18. there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
  19. If we find the load on the API server is too high, we can use a watch and
  20. UndeltaStore.)
  21. Just periodically list jobs and SJs, and then reconcile them.
  22. */
  23. import (
  24. "context"
  25. "fmt"
  26. "sort"
  27. "time"
  28. "k8s.io/klog"
  29. batchv1 "k8s.io/api/batch/v1"
  30. batchv1beta1 "k8s.io/api/batch/v1beta1"
  31. "k8s.io/api/core/v1"
  32. "k8s.io/apimachinery/pkg/api/errors"
  33. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  34. "k8s.io/apimachinery/pkg/runtime"
  35. "k8s.io/apimachinery/pkg/types"
  36. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  37. "k8s.io/apimachinery/pkg/util/wait"
  38. clientset "k8s.io/client-go/kubernetes"
  39. "k8s.io/client-go/kubernetes/scheme"
  40. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  41. "k8s.io/client-go/tools/pager"
  42. "k8s.io/client-go/tools/record"
  43. ref "k8s.io/client-go/tools/reference"
  44. "k8s.io/component-base/metrics/prometheus/ratelimiter"
  45. )
  46. // Utilities for dealing with Jobs and CronJobs and time.
// controllerKind is the GroupVersionKind identifying this controller's
// resource type: the batch/v1beta1 scheme group-version combined with the
// kind "CronJob".
var controllerKind = batchv1beta1.SchemeGroupVersion.WithKind("CronJob")
// Controller is a controller for CronJobs. It periodically lists every Job
// and CronJob through kubeClient and reconciles them (see syncAll).
type Controller struct {
	// kubeClient is the API client used to list Jobs and CronJobs.
	kubeClient clientset.Interface
	// jobControl is handed to syncOne and cleanupFinishedJobs to get, create
	// and delete Jobs.
	jobControl jobControlInterface
	// sjControl is used to write CronJob status updates.
	sjControl sjControlInterface
	// podControl is populated by NewController.
	// NOTE(review): it is not referenced by any code visible in this file.
	podControl podControlInterface
	// recorder emits events attached to the CronJob being reconciled.
	recorder record.EventRecorder
}
  57. // NewController creates and initializes a new Controller.
  58. func NewController(kubeClient clientset.Interface) (*Controller, error) {
  59. eventBroadcaster := record.NewBroadcaster()
  60. eventBroadcaster.StartLogging(klog.Infof)
  61. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  62. if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
  63. if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
  64. return nil, err
  65. }
  66. }
  67. jm := &Controller{
  68. kubeClient: kubeClient,
  69. jobControl: realJobControl{KubeClient: kubeClient},
  70. sjControl: &realSJControl{KubeClient: kubeClient},
  71. podControl: &realPodControl{KubeClient: kubeClient},
  72. recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
  73. }
  74. return jm, nil
  75. }
  76. // Run starts the main goroutine responsible for watching and syncing jobs.
  77. func (jm *Controller) Run(stopCh <-chan struct{}) {
  78. defer utilruntime.HandleCrash()
  79. klog.Infof("Starting CronJob Manager")
  80. // Check things every 10 second.
  81. go wait.Until(jm.syncAll, 10*time.Second, stopCh)
  82. <-stopCh
  83. klog.Infof("Shutting down CronJob Manager")
  84. }
  85. // syncAll lists all the CronJobs and Jobs and reconciles them.
  86. func (jm *Controller) syncAll() {
  87. // List children (Jobs) before parents (CronJob).
  88. // This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
  89. // we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
  90. // Note that this only works because we are NOT using any caches here.
  91. jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
  92. return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
  93. }
  94. js := make([]batchv1.Job, 0)
  95. err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
  96. jobTmp, ok := object.(*batchv1.Job)
  97. if !ok {
  98. return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
  99. }
  100. js = append(js, *jobTmp)
  101. return nil
  102. })
  103. if err != nil {
  104. utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
  105. return
  106. }
  107. klog.V(4).Infof("Found %d jobs", len(js))
  108. cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
  109. return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)
  110. }
  111. jobsBySj := groupJobsByParent(js)
  112. klog.V(4).Infof("Found %d groups", len(jobsBySj))
  113. err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
  114. sj, ok := object.(*batchv1beta1.CronJob)
  115. if !ok {
  116. return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", sj)
  117. }
  118. syncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
  119. cleanupFinishedJobs(sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder)
  120. return nil
  121. })
  122. if err != nil {
  123. utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
  124. return
  125. }
  126. }
  127. // cleanupFinishedJobs cleanups finished jobs created by a CronJob
  128. func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
  129. sjc sjControlInterface, recorder record.EventRecorder) {
  130. // If neither limits are active, there is no need to do anything.
  131. if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil {
  132. return
  133. }
  134. failedJobs := []batchv1.Job{}
  135. succesfulJobs := []batchv1.Job{}
  136. for _, job := range js {
  137. isFinished, finishedStatus := getFinishedStatus(&job)
  138. if isFinished && finishedStatus == batchv1.JobComplete {
  139. succesfulJobs = append(succesfulJobs, job)
  140. } else if isFinished && finishedStatus == batchv1.JobFailed {
  141. failedJobs = append(failedJobs, job)
  142. }
  143. }
  144. if sj.Spec.SuccessfulJobsHistoryLimit != nil {
  145. removeOldestJobs(sj,
  146. succesfulJobs,
  147. jc,
  148. *sj.Spec.SuccessfulJobsHistoryLimit,
  149. recorder)
  150. }
  151. if sj.Spec.FailedJobsHistoryLimit != nil {
  152. removeOldestJobs(sj,
  153. failedJobs,
  154. jc,
  155. *sj.Spec.FailedJobsHistoryLimit,
  156. recorder)
  157. }
  158. // Update the CronJob, in case jobs were removed from the list.
  159. if _, err := sjc.UpdateStatus(sj); err != nil {
  160. nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
  161. klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
  162. }
  163. }
  164. // removeOldestJobs removes the oldest jobs from a list of jobs
  165. func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
  166. numToDelete := len(js) - int(maxJobs)
  167. if numToDelete <= 0 {
  168. return
  169. }
  170. nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
  171. klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)
  172. sort.Sort(byJobStartTime(js))
  173. for i := 0; i < numToDelete; i++ {
  174. klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
  175. deleteJob(sj, &js[i], jc, recorder)
  176. }
  177. }
  178. // syncOne reconciles a CronJob with a list of any Jobs that it created.
  179. // All known jobs created by "sj" should be included in "js".
  180. // The current time is passed in to facilitate testing.
  181. // It has no receiver, to facilitate testing.
  182. func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
  183. nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
  184. childrenJobs := make(map[types.UID]bool)
  185. for _, j := range js {
  186. childrenJobs[j.ObjectMeta.UID] = true
  187. found := inActiveList(*sj, j.ObjectMeta.UID)
  188. if !found && !IsJobFinished(&j) {
  189. recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
  190. // We found an unfinished job that has us as the parent, but it is not in our Active list.
  191. // This could happen if we crashed right after creating the Job and before updating the status,
  192. // or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
  193. // a job that they wanted us to adopt.
  194. // TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
  195. // stop users from creating jobs if they have permission. It is assumed that if a
  196. // user has permission to create a job within a namespace, then they have permission to make any scheduledJob
  197. // in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
  198. // TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
  199. } else if found && IsJobFinished(&j) {
  200. _, status := getFinishedStatus(&j)
  201. deleteFromActiveList(sj, j.ObjectMeta.UID)
  202. recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
  203. }
  204. }
  205. // Remove any job reference from the active list if the corresponding job does not exist any more.
  206. // Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
  207. // job running.
  208. for _, j := range sj.Status.Active {
  209. if found := childrenJobs[j.UID]; !found {
  210. recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
  211. deleteFromActiveList(sj, j.UID)
  212. }
  213. }
  214. updatedSJ, err := sjc.UpdateStatus(sj)
  215. if err != nil {
  216. klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
  217. return
  218. }
  219. *sj = *updatedSJ
  220. if sj.DeletionTimestamp != nil {
  221. // The CronJob is being deleted.
  222. // Don't do anything other than updating status.
  223. return
  224. }
  225. if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
  226. klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
  227. return
  228. }
  229. times, err := getRecentUnmetScheduleTimes(*sj, now)
  230. if err != nil {
  231. recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
  232. klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
  233. return
  234. }
  235. // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
  236. if len(times) == 0 {
  237. klog.V(4).Infof("No unmet start times for %s", nameForLog)
  238. return
  239. }
  240. if len(times) > 1 {
  241. klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
  242. }
  243. scheduledTime := times[len(times)-1]
  244. tooLate := false
  245. if sj.Spec.StartingDeadlineSeconds != nil {
  246. tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
  247. }
  248. if tooLate {
  249. klog.V(4).Infof("Missed starting window for %s", nameForLog)
  250. recorder.Eventf(sj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
  251. // TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
  252. // the miss every cycle. In order to avoid sending multiple events, and to avoid processing
  253. // the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
  254. // Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
  255. // Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
  256. // and event the next time we process it, and also so the user looking at the status
  257. // can see easily that there was a missed execution.
  258. return
  259. }
  260. if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
  261. // Regardless which source of information we use for the set of active jobs,
  262. // there is some risk that we won't see an active job when there is one.
  263. // (because we haven't seen the status update to the SJ or the created pod).
  264. // So it is theoretically possible to have concurrency with Forbid.
  265. // As long the as the invocations are "far enough apart in time", this usually won't happen.
  266. //
  267. // TODO: for Forbid, we could use the same name for every execution, as a lock.
  268. // With replace, we could use a name that is deterministic per execution time.
  269. // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
  270. klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
  271. return
  272. }
  273. if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
  274. for _, j := range sj.Status.Active {
  275. klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
  276. job, err := jc.GetJob(j.Namespace, j.Name)
  277. if err != nil {
  278. recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
  279. return
  280. }
  281. if !deleteJob(sj, job, jc, recorder) {
  282. return
  283. }
  284. }
  285. }
  286. jobReq, err := getJobFromTemplate(sj, scheduledTime)
  287. if err != nil {
  288. klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
  289. return
  290. }
  291. jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
  292. if err != nil {
  293. // If the namespace is being torn down, we can safely ignore
  294. // this error since all subsequent creations will fail.
  295. if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
  296. recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
  297. }
  298. return
  299. }
  300. klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
  301. recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
  302. // ------------------------------------------------------------------ //
  303. // If this process restarts at this point (after posting a job, but
  304. // before updating the status), then we might try to start the job on
  305. // the next time. Actually, if we re-list the SJs and Jobs on the next
  306. // iteration of syncAll, we might not see our own status update, and
  307. // then post one again. So, we need to use the job name as a lock to
  308. // prevent us from making the job twice (name the job with hash of its
  309. // scheduled time).
  310. // Add the just-started job to the status list.
  311. ref, err := getRef(jobResp)
  312. if err != nil {
  313. klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
  314. } else {
  315. sj.Status.Active = append(sj.Status.Active, *ref)
  316. }
  317. sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
  318. if _, err := sjc.UpdateStatus(sj); err != nil {
  319. klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
  320. }
  321. return
  322. }
  323. // deleteJob reaps a job, deleting the job, the pods and the reference in the active list
  324. func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
  325. nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
  326. // delete the job itself...
  327. if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
  328. recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
  329. klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
  330. return false
  331. }
  332. // ... and its reference from active list
  333. deleteFromActiveList(sj, job.ObjectMeta.UID)
  334. recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
  335. return true
  336. }
  337. func getRef(object runtime.Object) (*v1.ObjectReference, error) {
  338. return ref.GetReference(scheme.Scheme, object)
  339. }