pod_workers.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kubelet
  14. import (
  15. "fmt"
  16. "strings"
  17. "sync"
  18. "time"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/types"
  21. "k8s.io/apimachinery/pkg/util/runtime"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. "k8s.io/client-go/tools/record"
  24. "k8s.io/klog"
  25. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  26. "k8s.io/kubernetes/pkg/kubelet/events"
  27. "k8s.io/kubernetes/pkg/kubelet/eviction"
  28. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  29. "k8s.io/kubernetes/pkg/kubelet/util/format"
  30. "k8s.io/kubernetes/pkg/kubelet/util/queue"
  31. )
  // OnCompleteFunc is a callback invoked once an operation has finished.
  // A nil err reports success; a non-nil err reports that the operation did
  // not complete successfully.
  type OnCompleteFunc func(err error)
  35. // PodStatusFunc is a function that is invoked to generate a pod status.
  36. type PodStatusFunc func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus
  37. // KillPodOptions are options when performing a pod update whose update type is kill.
  38. type KillPodOptions struct {
  39. // PodStatusFunc is the function to invoke to set pod status in response to a kill request.
  40. PodStatusFunc PodStatusFunc
  41. // PodTerminationGracePeriodSecondsOverride is optional override to use if a pod is being killed as part of kill operation.
  42. PodTerminationGracePeriodSecondsOverride *int64
  43. }
  44. // UpdatePodOptions is an options struct to pass to a UpdatePod operation.
  45. type UpdatePodOptions struct {
  46. // pod to update
  47. Pod *v1.Pod
  48. // the mirror pod for the pod to update, if it is a static pod
  49. MirrorPod *v1.Pod
  50. // the type of update (create, update, sync, kill)
  51. UpdateType kubetypes.SyncPodType
  52. // optional callback function when operation completes
  53. // this callback is not guaranteed to be completed since a pod worker may
  54. // drop update requests if it was fulfilling a previous request. this is
  55. // only guaranteed to be invoked in response to a kill pod request which is
  56. // always delivered.
  57. OnCompleteFunc OnCompleteFunc
  58. // if update type is kill, use the specified options to kill the pod.
  59. KillPodOptions *KillPodOptions
  60. }
  61. // PodWorkers is an abstract interface for testability.
  62. type PodWorkers interface {
  63. UpdatePod(options *UpdatePodOptions)
  64. ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
  65. ForgetWorker(uid types.UID)
  66. }
  67. // syncPodOptions provides the arguments to a SyncPod operation.
  68. type syncPodOptions struct {
  69. // the mirror pod for the pod to sync, if it is a static pod
  70. mirrorPod *v1.Pod
  71. // pod to sync
  72. pod *v1.Pod
  73. // the type of update (create, update, sync)
  74. updateType kubetypes.SyncPodType
  75. // the current status
  76. podStatus *kubecontainer.PodStatus
  77. // if update type is kill, use the specified options to kill the pod.
  78. killPodOptions *KillPodOptions
  79. }
  80. // the function to invoke to perform a sync.
  81. type syncPodFnType func(options syncPodOptions) error
  const (
  	// workerResyncIntervalJitterFactor is the jitter factor applied to
  	// resyncInterval.
  	workerResyncIntervalJitterFactor = 0.5

  	// workerBackOffPeriodJitterFactor is the jitter factor applied to both
  	// backOffPeriod and backOffOnTransientErrorPeriod.
  	workerBackOffPeriodJitterFactor = 0.5

  	// backOffOnTransientErrorPeriod is the back-off used after a transient
  	// error occurred.
  	backOffOnTransientErrorPeriod = time.Second
  )
  90. type podWorkers struct {
  91. // Protects all per worker fields.
  92. podLock sync.Mutex
  93. // Tracks all running per-pod goroutines - per-pod goroutine will be
  94. // processing updates received through its corresponding channel.
  95. podUpdates map[types.UID]chan UpdatePodOptions
  96. // Track the current state of per-pod goroutines.
  97. // Currently all update request for a given pod coming when another
  98. // update of this pod is being processed are ignored.
  99. isWorking map[types.UID]bool
  100. // Tracks the last undelivered work item for this pod - a work item is
  101. // undelivered if it comes in while the worker is working.
  102. lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions
  103. workQueue queue.WorkQueue
  104. // This function is run to sync the desired stated of pod.
  105. // NOTE: This function has to be thread-safe - it can be called for
  106. // different pods at the same time.
  107. syncPodFn syncPodFnType
  108. // The EventRecorder to use
  109. recorder record.EventRecorder
  110. // backOffPeriod is the duration to back off when there is a sync error.
  111. backOffPeriod time.Duration
  112. // resyncInterval is the duration to wait until the next sync.
  113. resyncInterval time.Duration
  114. // podCache stores kubecontainer.PodStatus for all pods.
  115. podCache kubecontainer.Cache
  116. }
  117. func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
  118. resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
  119. return &podWorkers{
  120. podUpdates: map[types.UID]chan UpdatePodOptions{},
  121. isWorking: map[types.UID]bool{},
  122. lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
  123. syncPodFn: syncPodFn,
  124. recorder: recorder,
  125. workQueue: workQueue,
  126. resyncInterval: resyncInterval,
  127. backOffPeriod: backOffPeriod,
  128. podCache: podCache,
  129. }
  130. }
  131. func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
  132. var lastSyncTime time.Time
  133. for update := range podUpdates {
  134. err := func() error {
  135. podUID := update.Pod.UID
  136. // This is a blocking call that would return only if the cache
  137. // has an entry for the pod that is newer than minRuntimeCache
  138. // Time. This ensures the worker doesn't start syncing until
  139. // after the cache is at least newer than the finished time of
  140. // the previous sync.
  141. status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
  142. if err != nil {
  143. // This is the legacy event thrown by manage pod loop
  144. // all other events are now dispatched from syncPodFn
  145. p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
  146. return err
  147. }
  148. err = p.syncPodFn(syncPodOptions{
  149. mirrorPod: update.MirrorPod,
  150. pod: update.Pod,
  151. podStatus: status,
  152. killPodOptions: update.KillPodOptions,
  153. updateType: update.UpdateType,
  154. })
  155. lastSyncTime = time.Now()
  156. return err
  157. }()
  158. // notify the call-back function if the operation succeeded or not
  159. if update.OnCompleteFunc != nil {
  160. update.OnCompleteFunc(err)
  161. }
  162. if err != nil {
  163. // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
  164. klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
  165. }
  166. p.wrapUp(update.Pod.UID, err)
  167. }
  168. }
  169. // Apply the new setting to the specified pod.
  170. // If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
  171. // Update requests are ignored if a kill pod request is pending.
  172. func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
  173. pod := options.Pod
  174. uid := pod.UID
  175. var podUpdates chan UpdatePodOptions
  176. var exists bool
  177. p.podLock.Lock()
  178. defer p.podLock.Unlock()
  179. if podUpdates, exists = p.podUpdates[uid]; !exists {
  180. // We need to have a buffer here, because checkForUpdates() method that
  181. // puts an update into channel is called from the same goroutine where
  182. // the channel is consumed. However, it is guaranteed that in such case
  183. // the channel is empty, so buffer of size 1 is enough.
  184. podUpdates = make(chan UpdatePodOptions, 1)
  185. p.podUpdates[uid] = podUpdates
  186. // Creating a new pod worker either means this is a new pod, or that the
  187. // kubelet just restarted. In either case the kubelet is willing to believe
  188. // the status of the pod for the first pod worker sync. See corresponding
  189. // comment in syncPod.
  190. go func() {
  191. defer runtime.HandleCrash()
  192. p.managePodLoop(podUpdates)
  193. }()
  194. }
  195. if !p.isWorking[pod.UID] {
  196. p.isWorking[pod.UID] = true
  197. podUpdates <- *options
  198. } else {
  199. // if a request to kill a pod is pending, we do not let anything overwrite that request.
  200. update, found := p.lastUndeliveredWorkUpdate[pod.UID]
  201. if !found || update.UpdateType != kubetypes.SyncPodKill {
  202. p.lastUndeliveredWorkUpdate[pod.UID] = *options
  203. }
  204. }
  205. }
  206. func (p *podWorkers) removeWorker(uid types.UID) {
  207. if ch, ok := p.podUpdates[uid]; ok {
  208. close(ch)
  209. delete(p.podUpdates, uid)
  210. // If there is an undelivered work update for this pod we need to remove it
  211. // since per-pod goroutine won't be able to put it to the already closed
  212. // channel when it finishes processing the current work update.
  213. if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached {
  214. delete(p.lastUndeliveredWorkUpdate, uid)
  215. }
  216. }
  217. }
  218. func (p *podWorkers) ForgetWorker(uid types.UID) {
  219. p.podLock.Lock()
  220. defer p.podLock.Unlock()
  221. p.removeWorker(uid)
  222. }
  223. func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
  224. p.podLock.Lock()
  225. defer p.podLock.Unlock()
  226. for key := range p.podUpdates {
  227. if _, exists := desiredPods[key]; !exists {
  228. p.removeWorker(key)
  229. }
  230. }
  231. }
  232. func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
  233. // Requeue the last update if the last sync returned error.
  234. switch {
  235. case syncErr == nil:
  236. // No error; requeue at the regular resync interval.
  237. p.workQueue.Enqueue(uid, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
  238. case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
  239. // Network is not ready; back off for short period of time and retry as network might be ready soon.
  240. p.workQueue.Enqueue(uid, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
  241. default:
  242. // Error occurred during the sync; back off and then retry.
  243. p.workQueue.Enqueue(uid, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
  244. }
  245. p.checkForUpdates(uid)
  246. }
  247. func (p *podWorkers) checkForUpdates(uid types.UID) {
  248. p.podLock.Lock()
  249. defer p.podLock.Unlock()
  250. if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
  251. p.podUpdates[uid] <- workUpdate
  252. delete(p.lastUndeliveredWorkUpdate, uid)
  253. } else {
  254. p.isWorking[uid] = false
  255. }
  256. }
  257. // killPodNow returns a KillPodFunc that can be used to kill a pod.
  258. // It is intended to be injected into other modules that need to kill a pod.
  259. func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
  260. return func(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error {
  261. // determine the grace period to use when killing the pod
  262. gracePeriod := int64(0)
  263. if gracePeriodOverride != nil {
  264. gracePeriod = *gracePeriodOverride
  265. } else if pod.Spec.TerminationGracePeriodSeconds != nil {
  266. gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
  267. }
  268. // we timeout and return an error if we don't get a callback within a reasonable time.
  269. // the default timeout is relative to the grace period (we settle on 10s to wait for kubelet->runtime traffic to complete in sigkill)
  270. timeout := int64(gracePeriod + (gracePeriod / 2))
  271. minTimeout := int64(10)
  272. if timeout < minTimeout {
  273. timeout = minTimeout
  274. }
  275. timeoutDuration := time.Duration(timeout) * time.Second
  276. // open a channel we block against until we get a result
  277. type response struct {
  278. err error
  279. }
  280. ch := make(chan response, 1)
  281. podWorkers.UpdatePod(&UpdatePodOptions{
  282. Pod: pod,
  283. UpdateType: kubetypes.SyncPodKill,
  284. OnCompleteFunc: func(err error) {
  285. ch <- response{err: err}
  286. },
  287. KillPodOptions: &KillPodOptions{
  288. PodStatusFunc: func(p *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
  289. return status
  290. },
  291. PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
  292. },
  293. })
  294. // wait for either a response, or a timeout
  295. select {
  296. case r := <-ch:
  297. return r.err
  298. case <-time.After(timeoutDuration):
  299. recorder.Eventf(pod, v1.EventTypeWarning, events.ExceededGracePeriod, "Container runtime did not kill the pod within specified grace period.")
  300. return fmt.Errorf("timeout waiting to kill pod")
  301. }
  302. }
  303. }