pod_workers.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kubelet
  14. import (
  15. "fmt"
  16. "strings"
  17. "sync"
  18. "time"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/types"
  21. "k8s.io/apimachinery/pkg/util/runtime"
  22. "k8s.io/apimachinery/pkg/util/sets"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. "k8s.io/client-go/tools/record"
  25. "k8s.io/klog"
  26. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  27. "k8s.io/kubernetes/pkg/kubelet/events"
  28. "k8s.io/kubernetes/pkg/kubelet/eviction"
  29. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  30. "k8s.io/kubernetes/pkg/kubelet/util/format"
  31. "k8s.io/kubernetes/pkg/kubelet/util/queue"
  32. )
  33. // OnCompleteFunc is a function that is invoked when an operation completes.
  34. // If err is non-nil, the operation did not complete successfully.
  35. type OnCompleteFunc func(err error)
  36. // PodStatusFunc is a function that is invoked to generate a pod status.
  37. type PodStatusFunc func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus
// KillPodOptions are options when performing a pod update whose update type is kill.
type KillPodOptions struct {
	// PodStatusFunc is the function to invoke to set pod status in response to a kill request.
	PodStatusFunc PodStatusFunc
	// PodTerminationGracePeriodSecondsOverride is optional override to use if a pod is being killed as part of kill operation.
	// nil means no override was requested; killPodNow forwards its gracePeriodOverride argument here unchanged.
	// NOTE(review): unit presumably seconds, per the field name — confirm against the consumer in syncPod.
	PodTerminationGracePeriodSecondsOverride *int64
}
// UpdatePodOptions is an options struct to pass to an UpdatePod operation.
type UpdatePodOptions struct {
	// pod to update
	Pod *v1.Pod
	// the mirror pod for the pod to update, if it is a static pod
	MirrorPod *v1.Pod
	// the type of update (create, update, sync, kill)
	UpdateType kubetypes.SyncPodType
	// optional callback function when operation completes.
	// This callback is not guaranteed to be invoked, because a pod worker may
	// drop update requests that arrive while it is fulfilling a previous one.
	// In the podWorkers implementation:
	//   - only the most recent pending update is kept while the worker is busy;
	//   - pending updates are discarded when the worker is removed;
	//   - a pending kill request is never replaced by a later update, but a
	//     second kill request arriving while one is already pending is itself
	//     dropped, so its callback is not invoked either.
	OnCompleteFunc OnCompleteFunc
	// if update type is kill, use the specified options to kill the pod.
	KillPodOptions *KillPodOptions
}
// PodWorkers is an abstract interface for testability.
// It routes per-pod updates to worker goroutines and lets callers discard the
// workers of pods that no longer exist.
type PodWorkers interface {
	// UpdatePod submits options for the pod identified by options.Pod.UID.
	UpdatePod(options *UpdatePodOptions)
	// ForgetNonExistingPodWorkers removes the worker of every pod whose UID
	// is not a key of desiredPods.
	ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty)
	// ForgetWorker removes the worker, if any, of the pod with the given UID.
	ForgetWorker(uid types.UID)
}
// syncPodOptions provides the arguments to a SyncPod operation.
type syncPodOptions struct {
	// the mirror pod for the pod to sync, if it is a static pod
	mirrorPod *v1.Pod
	// pod to sync
	pod *v1.Pod
	// the type of update (create, update, sync, kill); managePodLoop forwards
	// the UpdateType of each received update unchanged, including kills.
	updateType kubetypes.SyncPodType
	// the current status, as read from the pod cache by managePodLoop
	// immediately before the sync
	podStatus *kubecontainer.PodStatus
	// if update type is kill, use the specified options to kill the pod.
	killPodOptions *KillPodOptions
}
  81. // the function to invoke to perform a sync.
  82. type syncPodFnType func(options syncPodOptions) error
  83. const (
  84. // jitter factor for resyncInterval
  85. workerResyncIntervalJitterFactor = 0.5
  86. // jitter factor for backOffPeriod and backOffOnTransientErrorPeriod
  87. workerBackOffPeriodJitterFactor = 0.5
  88. // backoff period when transient error occurred.
  89. backOffOnTransientErrorPeriod = time.Second
  90. )
// podWorkers is the PodWorkers implementation: it runs one goroutine per pod
// UID (managePodLoop) and serializes that pod's updates through a channel.
type podWorkers struct {
	// Protects all per worker fields.
	podLock sync.Mutex
	// Tracks all running per-pod goroutines - per-pod goroutine will be
	// processing updates received through its corresponding channel.
	podUpdates map[types.UID]chan UpdatePodOptions
	// Track the current state of per-pod goroutines: true while the pod's
	// worker is processing an update. An update for a pod that arrives while
	// its worker is busy is not sent right away; it is parked in
	// lastUndeliveredWorkUpdate (replacing any earlier parked update unless
	// that one is a kill) and delivered when the current update finishes.
	isWorking map[types.UID]bool
	// Tracks the last undelivered work item for this pod - a work item is
	// undelivered if it comes in while the worker is working.
	lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions
	// workQueue receives each pod's UID after every sync attempt, with a
	// delay chosen by wrapUp.
	workQueue queue.WorkQueue
	// This function is run to sync the desired stated of pod.
	// NOTE: This function has to be thread-safe - it can be called for
	// different pods at the same time.
	syncPodFn syncPodFnType
	// The EventRecorder to use
	recorder record.EventRecorder
	// backOffPeriod is the duration to back off when there is a sync error.
	backOffPeriod time.Duration
	// resyncInterval is the duration to wait until the next sync.
	resyncInterval time.Duration
	// podCache stores kubecontainer.PodStatus for all pods.
	podCache kubecontainer.Cache
}
  118. func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
  119. resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
  120. return &podWorkers{
  121. podUpdates: map[types.UID]chan UpdatePodOptions{},
  122. isWorking: map[types.UID]bool{},
  123. lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
  124. syncPodFn: syncPodFn,
  125. recorder: recorder,
  126. workQueue: workQueue,
  127. resyncInterval: resyncInterval,
  128. backOffPeriod: backOffPeriod,
  129. podCache: podCache,
  130. }
  131. }
  132. func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
  133. var lastSyncTime time.Time
  134. for update := range podUpdates {
  135. err := func() error {
  136. podUID := update.Pod.UID
  137. // This is a blocking call that would return only if the cache
  138. // has an entry for the pod that is newer than minRuntimeCache
  139. // Time. This ensures the worker doesn't start syncing until
  140. // after the cache is at least newer than the finished time of
  141. // the previous sync.
  142. status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
  143. if err != nil {
  144. // This is the legacy event thrown by manage pod loop
  145. // all other events are now dispatched from syncPodFn
  146. p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
  147. return err
  148. }
  149. err = p.syncPodFn(syncPodOptions{
  150. mirrorPod: update.MirrorPod,
  151. pod: update.Pod,
  152. podStatus: status,
  153. killPodOptions: update.KillPodOptions,
  154. updateType: update.UpdateType,
  155. })
  156. lastSyncTime = time.Now()
  157. return err
  158. }()
  159. // notify the call-back function if the operation succeeded or not
  160. if update.OnCompleteFunc != nil {
  161. update.OnCompleteFunc(err)
  162. }
  163. if err != nil {
  164. // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
  165. klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
  166. }
  167. p.wrapUp(update.Pod.UID, err)
  168. }
  169. }
  170. // Apply the new setting to the specified pod.
  171. // If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
  172. // Update requests are ignored if a kill pod request is pending.
  173. func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
  174. pod := options.Pod
  175. uid := pod.UID
  176. var podUpdates chan UpdatePodOptions
  177. var exists bool
  178. p.podLock.Lock()
  179. defer p.podLock.Unlock()
  180. if podUpdates, exists = p.podUpdates[uid]; !exists {
  181. // We need to have a buffer here, because checkForUpdates() method that
  182. // puts an update into channel is called from the same goroutine where
  183. // the channel is consumed. However, it is guaranteed that in such case
  184. // the channel is empty, so buffer of size 1 is enough.
  185. podUpdates = make(chan UpdatePodOptions, 1)
  186. p.podUpdates[uid] = podUpdates
  187. // Creating a new pod worker either means this is a new pod, or that the
  188. // kubelet just restarted. In either case the kubelet is willing to believe
  189. // the status of the pod for the first pod worker sync. See corresponding
  190. // comment in syncPod.
  191. go func() {
  192. defer runtime.HandleCrash()
  193. p.managePodLoop(podUpdates)
  194. }()
  195. }
  196. if !p.isWorking[pod.UID] {
  197. p.isWorking[pod.UID] = true
  198. podUpdates <- *options
  199. } else {
  200. // if a request to kill a pod is pending, we do not let anything overwrite that request.
  201. update, found := p.lastUndeliveredWorkUpdate[pod.UID]
  202. if !found || update.UpdateType != kubetypes.SyncPodKill {
  203. p.lastUndeliveredWorkUpdate[pod.UID] = *options
  204. }
  205. }
  206. }
  207. func (p *podWorkers) removeWorker(uid types.UID) {
  208. if ch, ok := p.podUpdates[uid]; ok {
  209. close(ch)
  210. delete(p.podUpdates, uid)
  211. // If there is an undelivered work update for this pod we need to remove it
  212. // since per-pod goroutine won't be able to put it to the already closed
  213. // channel when it finishes processing the current work update.
  214. delete(p.lastUndeliveredWorkUpdate, uid)
  215. }
  216. }
  217. func (p *podWorkers) ForgetWorker(uid types.UID) {
  218. p.podLock.Lock()
  219. defer p.podLock.Unlock()
  220. p.removeWorker(uid)
  221. }
  222. func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty) {
  223. p.podLock.Lock()
  224. defer p.podLock.Unlock()
  225. for key := range p.podUpdates {
  226. if _, exists := desiredPods[key]; !exists {
  227. p.removeWorker(key)
  228. }
  229. }
  230. }
  231. func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
  232. // Requeue the last update if the last sync returned error.
  233. switch {
  234. case syncErr == nil:
  235. // No error; requeue at the regular resync interval.
  236. p.workQueue.Enqueue(uid, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
  237. case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
  238. // Network is not ready; back off for short period of time and retry as network might be ready soon.
  239. p.workQueue.Enqueue(uid, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
  240. default:
  241. // Error occurred during the sync; back off and then retry.
  242. p.workQueue.Enqueue(uid, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
  243. }
  244. p.checkForUpdates(uid)
  245. }
  246. func (p *podWorkers) checkForUpdates(uid types.UID) {
  247. p.podLock.Lock()
  248. defer p.podLock.Unlock()
  249. if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
  250. p.podUpdates[uid] <- workUpdate
  251. delete(p.lastUndeliveredWorkUpdate, uid)
  252. } else {
  253. p.isWorking[uid] = false
  254. }
  255. }
  256. // killPodNow returns a KillPodFunc that can be used to kill a pod.
  257. // It is intended to be injected into other modules that need to kill a pod.
  258. func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
  259. return func(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error {
  260. // determine the grace period to use when killing the pod
  261. gracePeriod := int64(0)
  262. if gracePeriodOverride != nil {
  263. gracePeriod = *gracePeriodOverride
  264. } else if pod.Spec.TerminationGracePeriodSeconds != nil {
  265. gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
  266. }
  267. // we timeout and return an error if we don't get a callback within a reasonable time.
  268. // the default timeout is relative to the grace period (we settle on 10s to wait for kubelet->runtime traffic to complete in sigkill)
  269. timeout := int64(gracePeriod + (gracePeriod / 2))
  270. minTimeout := int64(10)
  271. if timeout < minTimeout {
  272. timeout = minTimeout
  273. }
  274. timeoutDuration := time.Duration(timeout) * time.Second
  275. // open a channel we block against until we get a result
  276. type response struct {
  277. err error
  278. }
  279. ch := make(chan response, 1)
  280. podWorkers.UpdatePod(&UpdatePodOptions{
  281. Pod: pod,
  282. UpdateType: kubetypes.SyncPodKill,
  283. OnCompleteFunc: func(err error) {
  284. ch <- response{err: err}
  285. },
  286. KillPodOptions: &KillPodOptions{
  287. PodStatusFunc: func(p *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
  288. return status
  289. },
  290. PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
  291. },
  292. })
  293. // wait for either a response, or a timeout
  294. select {
  295. case r := <-ch:
  296. return r.err
  297. case <-time.After(timeoutDuration):
  298. recorder.Eventf(pod, v1.EventTypeWarning, events.ExceededGracePeriod, "Container runtime did not kill the pod within specified grace period.")
  299. return fmt.Errorf("timeout waiting to kill pod")
  300. }
  301. }
  302. }