generic.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pleg
  14. import (
  15. "fmt"
  16. "sync/atomic"
  17. "time"
  18. "k8s.io/apimachinery/pkg/types"
  19. "k8s.io/apimachinery/pkg/util/clock"
  20. "k8s.io/apimachinery/pkg/util/sets"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
  23. "k8s.io/klog"
  24. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  25. "k8s.io/kubernetes/pkg/kubelet/metrics"
  26. )
  27. // GenericPLEG is an extremely simple generic PLEG that relies solely on
  28. // periodic listing to discover container changes. It should be used
  29. // as temporary replacement for container runtimes do not support a proper
  30. // event generator yet.
  31. //
  32. // Note that GenericPLEG assumes that a container would not be created,
  33. // terminated, and garbage collected within one relist period. If such an
  34. // incident happens, GenenricPLEG would miss all events regarding this
  35. // container. In the case of relisting failure, the window may become longer.
  36. // Note that this assumption is not unique -- many kubelet internal components
  37. // rely on terminated containers as tombstones for bookkeeping purposes. The
  38. // garbage collector is implemented to work with such situations. However, to
  39. // guarantee that kubelet can handle missing container events, it is
  40. // recommended to set the relist period short and have an auxiliary, longer
  41. // periodic sync in kubelet as the safety net.
  42. type GenericPLEG struct {
  43. // The period for relisting.
  44. relistPeriod time.Duration
  45. // The container runtime.
  46. runtime kubecontainer.Runtime
  47. // The channel from which the subscriber listens events.
  48. eventChannel chan *PodLifecycleEvent
  49. // The internal cache for pod/container information.
  50. podRecords podRecords
  51. // Time of the last relisting.
  52. relistTime atomic.Value
  53. // Cache for storing the runtime states required for syncing pods.
  54. cache kubecontainer.Cache
  55. // For testability.
  56. clock clock.Clock
  57. // Pods that failed to have their status retrieved during a relist. These pods will be
  58. // retried during the next relisting.
  59. podsToReinspect map[types.UID]*kubecontainer.Pod
  60. }
  // plegContainerState mirrors kubecontainer.ContainerState one-to-one, with
  // one addition: the "non-existent" state, which is introduced here so that
  // every state transition scenario can be expressed.
  type plegContainerState string

  // Container states tracked by PLEG.
  const (
  	plegContainerRunning     plegContainerState = "running"
  	plegContainerExited      plegContainerState = "exited"
  	plegContainerUnknown     plegContainerState = "unknown"
  	plegContainerNonExistent plegContainerState = "non-existent"
  )

  // relistThreshold must exceed the relisting period plus the time a relist
  // itself takes, which can vary significantly. The value is intentionally
  // conservative so that the health status does not flip between healthy and
  // unhealthy.
  const relistThreshold = 3 * time.Minute
  75. func convertState(state kubecontainer.ContainerState) plegContainerState {
  76. switch state {
  77. case kubecontainer.ContainerStateCreated:
  78. // kubelet doesn't use the "created" state yet, hence convert it to "unknown".
  79. return plegContainerUnknown
  80. case kubecontainer.ContainerStateRunning:
  81. return plegContainerRunning
  82. case kubecontainer.ContainerStateExited:
  83. return plegContainerExited
  84. case kubecontainer.ContainerStateUnknown:
  85. return plegContainerUnknown
  86. default:
  87. panic(fmt.Sprintf("unrecognized container state: %v", state))
  88. }
  89. }
  90. type podRecord struct {
  91. old *kubecontainer.Pod
  92. current *kubecontainer.Pod
  93. }
  94. type podRecords map[types.UID]*podRecord
  95. // NewGenericPLEG instantiates a new GenericPLEG object and return it.
  96. func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
  97. relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
  98. return &GenericPLEG{
  99. relistPeriod: relistPeriod,
  100. runtime: runtime,
  101. eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
  102. podRecords: make(podRecords),
  103. cache: cache,
  104. clock: clock,
  105. }
  106. }
  107. // Watch returns a channel from which the subscriber can receive PodLifecycleEvent
  108. // events.
  109. // TODO: support multiple subscribers.
  110. func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
  111. return g.eventChannel
  112. }
  113. // Start spawns a goroutine to relist periodically.
  114. func (g *GenericPLEG) Start() {
  115. go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
  116. }
  117. // Healthy check if PLEG work properly.
  118. // relistThreshold is the maximum interval between two relist.
  119. func (g *GenericPLEG) Healthy() (bool, error) {
  120. relistTime := g.getRelistTime()
  121. if relistTime.IsZero() {
  122. return false, fmt.Errorf("pleg has yet to be successful")
  123. }
  124. elapsed := g.clock.Since(relistTime)
  125. if elapsed > relistThreshold {
  126. return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
  127. }
  128. return true, nil
  129. }
  130. func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
  131. if newState == oldState {
  132. return nil
  133. }
  134. klog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
  135. switch newState {
  136. case plegContainerRunning:
  137. return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
  138. case plegContainerExited:
  139. return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
  140. case plegContainerUnknown:
  141. return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
  142. case plegContainerNonExistent:
  143. switch oldState {
  144. case plegContainerExited:
  145. // We already reported that the container died before.
  146. return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
  147. default:
  148. return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
  149. }
  150. default:
  151. panic(fmt.Sprintf("unrecognized container state: %v", newState))
  152. }
  153. }
  154. func (g *GenericPLEG) getRelistTime() time.Time {
  155. val := g.relistTime.Load()
  156. if val == nil {
  157. return time.Time{}
  158. }
  159. return val.(time.Time)
  160. }
  161. func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
  162. g.relistTime.Store(timestamp)
  163. }
  164. // relist queries the container runtime for list of pods/containers, compare
  165. // with the internal pods/containers, and generates events accordingly.
  166. func (g *GenericPLEG) relist() {
  167. klog.V(5).Infof("GenericPLEG: Relisting")
  168. if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
  169. metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
  170. metrics.DeprecatedPLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
  171. }
  172. timestamp := g.clock.Now()
  173. defer func() {
  174. metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
  175. metrics.DeprecatedPLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp))
  176. }()
  177. // Get all the pods.
  178. podList, err := g.runtime.GetPods(true)
  179. if err != nil {
  180. klog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
  181. return
  182. }
  183. g.updateRelistTime(timestamp)
  184. pods := kubecontainer.Pods(podList)
  185. g.podRecords.setCurrent(pods)
  186. // Compare the old and the current pods, and generate events.
  187. eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
  188. for pid := range g.podRecords {
  189. oldPod := g.podRecords.getOld(pid)
  190. pod := g.podRecords.getCurrent(pid)
  191. // Get all containers in the old and the new pod.
  192. allContainers := getContainersFromPods(oldPod, pod)
  193. for _, container := range allContainers {
  194. events := computeEvents(oldPod, pod, &container.ID)
  195. for _, e := range events {
  196. updateEvents(eventsByPodID, e)
  197. }
  198. }
  199. }
  200. var needsReinspection map[types.UID]*kubecontainer.Pod
  201. if g.cacheEnabled() {
  202. needsReinspection = make(map[types.UID]*kubecontainer.Pod)
  203. }
  204. // If there are events associated with a pod, we should update the
  205. // podCache.
  206. for pid, events := range eventsByPodID {
  207. pod := g.podRecords.getCurrent(pid)
  208. if g.cacheEnabled() {
  209. // updateCache() will inspect the pod and update the cache. If an
  210. // error occurs during the inspection, we want PLEG to retry again
  211. // in the next relist. To achieve this, we do not update the
  212. // associated podRecord of the pod, so that the change will be
  213. // detect again in the next relist.
  214. // TODO: If many pods changed during the same relist period,
  215. // inspecting the pod and getting the PodStatus to update the cache
  216. // serially may take a while. We should be aware of this and
  217. // parallelize if needed.
  218. if err := g.updateCache(pod, pid); err != nil {
  219. // Rely on updateCache calling GetPodStatus to log the actual error.
  220. klog.V(4).Infof("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
  221. // make sure we try to reinspect the pod during the next relisting
  222. needsReinspection[pid] = pod
  223. continue
  224. } else if _, found := g.podsToReinspect[pid]; found {
  225. // this pod was in the list to reinspect and we did so because it had events, so remove it
  226. // from the list (we don't want the reinspection code below to inspect it a second time in
  227. // this relist execution)
  228. delete(g.podsToReinspect, pid)
  229. }
  230. }
  231. // Update the internal storage and send out the events.
  232. g.podRecords.update(pid)
  233. for i := range events {
  234. // Filter out events that are not reliable and no other components use yet.
  235. if events[i].Type == ContainerChanged {
  236. continue
  237. }
  238. select {
  239. case g.eventChannel <- events[i]:
  240. default:
  241. metrics.PLEGDiscardEvents.WithLabelValues().Inc()
  242. klog.Error("event channel is full, discard this relist() cycle event")
  243. }
  244. }
  245. }
  246. if g.cacheEnabled() {
  247. // reinspect any pods that failed inspection during the previous relist
  248. if len(g.podsToReinspect) > 0 {
  249. klog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
  250. for pid, pod := range g.podsToReinspect {
  251. if err := g.updateCache(pod, pid); err != nil {
  252. // Rely on updateCache calling GetPodStatus to log the actual error.
  253. klog.V(5).Infof("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
  254. needsReinspection[pid] = pod
  255. }
  256. }
  257. }
  258. // Update the cache timestamp. This needs to happen *after*
  259. // all pods have been properly updated in the cache.
  260. g.cache.UpdateTime(timestamp)
  261. }
  262. // make sure we retain the list of pods that need reinspecting the next time relist is called
  263. g.podsToReinspect = needsReinspection
  264. }
  265. func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
  266. cidSet := sets.NewString()
  267. var containers []*kubecontainer.Container
  268. for _, p := range pods {
  269. if p == nil {
  270. continue
  271. }
  272. for _, c := range p.Containers {
  273. cid := string(c.ID.ID)
  274. if cidSet.Has(cid) {
  275. continue
  276. }
  277. cidSet.Insert(cid)
  278. containers = append(containers, c)
  279. }
  280. // Update sandboxes as containers
  281. // TODO: keep track of sandboxes explicitly.
  282. for _, c := range p.Sandboxes {
  283. cid := string(c.ID.ID)
  284. if cidSet.Has(cid) {
  285. continue
  286. }
  287. cidSet.Insert(cid)
  288. containers = append(containers, c)
  289. }
  290. }
  291. return containers
  292. }
  293. func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
  294. var pid types.UID
  295. if oldPod != nil {
  296. pid = oldPod.ID
  297. } else if newPod != nil {
  298. pid = newPod.ID
  299. }
  300. oldState := getContainerState(oldPod, cid)
  301. newState := getContainerState(newPod, cid)
  302. return generateEvents(pid, cid.ID, oldState, newState)
  303. }
  304. func (g *GenericPLEG) cacheEnabled() bool {
  305. return g.cache != nil
  306. }
  307. // getPodIP preserves an older cached status' pod IP if the new status has no pod IP
  308. // and its sandboxes have exited
  309. func (g *GenericPLEG) getPodIP(pid types.UID, status *kubecontainer.PodStatus) string {
  310. if status.IP != "" {
  311. return status.IP
  312. }
  313. oldStatus, err := g.cache.Get(pid)
  314. if err != nil || oldStatus.IP == "" {
  315. return ""
  316. }
  317. for _, sandboxStatus := range status.SandboxStatuses {
  318. // If at least one sandbox is ready, then use this status update's pod IP
  319. if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
  320. return status.IP
  321. }
  322. }
  323. if len(status.SandboxStatuses) == 0 {
  324. // Without sandboxes (which built-in runtimes like rkt don't report)
  325. // look at all the container statuses, and if any containers are
  326. // running then use the new pod IP
  327. for _, containerStatus := range status.ContainerStatuses {
  328. if containerStatus.State == kubecontainer.ContainerStateCreated || containerStatus.State == kubecontainer.ContainerStateRunning {
  329. return status.IP
  330. }
  331. }
  332. }
  333. // For pods with no ready containers or sandboxes (like exited pods)
  334. // use the old status' pod IP
  335. return oldStatus.IP
  336. }
  337. func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
  338. if pod == nil {
  339. // The pod is missing in the current relist. This means that
  340. // the pod has no visible (active or inactive) containers.
  341. klog.V(4).Infof("PLEG: Delete status for pod %q", string(pid))
  342. g.cache.Delete(pid)
  343. return nil
  344. }
  345. timestamp := g.clock.Now()
  346. // TODO: Consider adding a new runtime method
  347. // GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
  348. // all containers again.
  349. status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
  350. klog.V(4).Infof("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err)
  351. if err == nil {
  352. // Preserve the pod IP across cache updates if the new IP is empty.
  353. // When a pod is torn down, kubelet may race with PLEG and retrieve
  354. // a pod status after network teardown, but the kubernetes API expects
  355. // the completed pod's IP to be available after the pod is dead.
  356. status.IP = g.getPodIP(pid, status)
  357. }
  358. g.cache.Set(pod.ID, status, err, timestamp)
  359. return err
  360. }
  361. func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
  362. if e == nil {
  363. return
  364. }
  365. eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
  366. }
  367. func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
  368. // Default to the non-existent state.
  369. state := plegContainerNonExistent
  370. if pod == nil {
  371. return state
  372. }
  373. c := pod.FindContainerByID(*cid)
  374. if c != nil {
  375. return convertState(c.State)
  376. }
  377. // Search through sandboxes too.
  378. c = pod.FindSandboxByID(*cid)
  379. if c != nil {
  380. return convertState(c.State)
  381. }
  382. return state
  383. }
  384. func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod {
  385. r, ok := pr[id]
  386. if !ok {
  387. return nil
  388. }
  389. return r.old
  390. }
  391. func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod {
  392. r, ok := pr[id]
  393. if !ok {
  394. return nil
  395. }
  396. return r.current
  397. }
  398. func (pr podRecords) setCurrent(pods []*kubecontainer.Pod) {
  399. for i := range pr {
  400. pr[i].current = nil
  401. }
  402. for _, pod := range pods {
  403. if r, ok := pr[pod.ID]; ok {
  404. r.current = pod
  405. } else {
  406. pr[pod.ID] = &podRecord{current: pod}
  407. }
  408. }
  409. }
  410. func (pr podRecords) update(id types.UID) {
  411. r, ok := pr[id]
  412. if !ok {
  413. return
  414. }
  415. pr.updateInternal(id, r)
  416. }
  417. func (pr podRecords) updateInternal(id types.UID, r *podRecord) {
  418. if r.current == nil {
  419. // Pod no longer exists; delete the entry.
  420. delete(pr, id)
  421. return
  422. }
  423. r.old = r.current
  424. r.current = nil
  425. }