generic.go
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pleg

import (
	"fmt"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
	"k8s.io/klog"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
)
// GenericPLEG is an extremely simple generic PLEG that relies solely on
// periodic listing to discover container changes. It should be used
// as a temporary replacement for container runtimes that do not support a
// proper event generator yet.
//
// Note that GenericPLEG assumes that a container would not be created,
// terminated, and garbage collected within one relist period. If such an
// incident happens, GenericPLEG would miss all events regarding this
// container. In the case of relisting failure, the window may become longer.
// Note that this assumption is not unique -- many kubelet internal components
// rely on terminated containers as tombstones for bookkeeping purposes. The
// garbage collector is implemented to work with such situations. However, to
// guarantee that kubelet can handle missing container events, it is
// recommended to set the relist period short and have an auxiliary, longer
// periodic sync in kubelet as the safety net.
type GenericPLEG struct {
	// The period for relisting.
	relistPeriod time.Duration
	// The container runtime.
	runtime kubecontainer.Runtime
	// The channel from which the subscriber listens for events.
	eventChannel chan *PodLifecycleEvent
	// The internal cache for pod/container information.
	podRecords podRecords
	// Time of the last relisting.
	relistTime atomic.Value
	// Cache for storing the runtime states required for syncing pods.
	cache kubecontainer.Cache
	// For testability.
	clock clock.Clock
	// Pods that failed to have their status retrieved during a relist. These pods will be
	// retried during the next relisting.
	podsToReinspect map[types.UID]*kubecontainer.Pod
}
// plegContainerState has a one-to-one mapping to the
// kubecontainer.ContainerState except for the non-existent state. This state
// is introduced here to complete the state transition scenarios.
type plegContainerState string

const (
	plegContainerRunning     plegContainerState = "running"
	plegContainerExited      plegContainerState = "exited"
	plegContainerUnknown     plegContainerState = "unknown"
	plegContainerNonExistent plegContainerState = "non-existent"

	// The threshold needs to be greater than the relisting period + the
	// relisting time, which can vary significantly. Set a conservative
	// threshold to avoid flipping between healthy and unhealthy.
	relistThreshold = 3 * time.Minute
)
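
// convertState maps a kubecontainer.ContainerState to the corresponding
// plegContainerState; it panics on states it does not recognize.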
func convertState(state kubecontainer.ContainerState) plegContainerState {
	switch state {
	case kubecontainer.ContainerStateCreated:
		// kubelet doesn't use the "created" state yet, hence convert it to "unknown".
		return plegContainerUnknown
	case kubecontainer.ContainerStateRunning:
		return plegContainerRunning
	case kubecontainer.ContainerStateExited:
		return plegContainerExited
	case kubecontainer.ContainerStateUnknown:
		return plegContainerUnknown
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", state))
	}
}
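
// podRecord holds the pod as observed in the previous (old) and the latest
// (current) relist.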
type podRecord struct {
	old     *kubecontainer.Pod
	current *kubecontainer.Pod
}

type podRecords map[types.UID]*podRecord
// NewGenericPLEG instantiates a new GenericPLEG object and returns it.
func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
	relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
	return &GenericPLEG{
		relistPeriod: relistPeriod,
		runtime:      runtime,
		eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
		podRecords:   make(podRecords),
		cache:        cache,
		clock:        clock,
	}
}
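
// A minimal usage sketch, assuming a runtime and cache are already wired up.
// The capacity and period below are hypothetical placeholders; kubelet derives
// its own values from configuration:
//
//	pleg := NewGenericPLEG(runtime, 1000, time.Second, cache, clock.RealClock{})
//	pleg.Start()
//	for event := range pleg.Watch() {
//		// React to the PodLifecycleEvent, e.g. trigger a pod sync.
//		_ = event
//	}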
// Watch returns a channel from which the subscriber can receive PodLifecycleEvent
// events.
// TODO: support multiple subscribers.
func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
	return g.eventChannel
}
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
// Healthy checks whether the PLEG is working properly.
// relistThreshold is the maximum interval between two relists.
func (g *GenericPLEG) Healthy() (bool, error) {
	relistTime := g.getRelistTime()
	if relistTime.IsZero() {
		return false, fmt.Errorf("pleg has yet to be successful")
	}
	// Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
	metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
	elapsed := g.clock.Since(relistTime)
	if elapsed > relistThreshold {
		return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
	}
	return true, nil
}
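
// generateEvents computes the pod lifecycle events implied by a container
// transitioning from oldState to newState; it returns nil when the state is
// unchanged.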
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
	if newState == oldState {
		return nil
	}

	klog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
	switch newState {
	case plegContainerRunning:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
	case plegContainerExited:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
	case plegContainerUnknown:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
	case plegContainerNonExistent:
		switch oldState {
		case plegContainerExited:
			// We already reported that the container died before.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
		default:
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
		}
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", newState))
	}
}
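
// getRelistTime returns the timestamp of the last successful relist, or the
// zero time if no relist has completed yet.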
func (g *GenericPLEG) getRelistTime() time.Time {
	val := g.relistTime.Load()
	if val == nil {
		return time.Time{}
	}
	return val.(time.Time)
}
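
// updateRelistTime records the given timestamp as the time of the last
// successful relist.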
func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
	g.relistTime.Store(timestamp)
}
// relist queries the container runtime for the list of pods/containers,
// compares it with the internal pods/containers, and generates events
// accordingly.
func (g *GenericPLEG) relist() {
	klog.V(5).Infof("GenericPLEG: Relisting")

	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
		metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
	}

	timestamp := g.clock.Now()
	defer func() {
		metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
	}()

	// Get all the pods.
	podList, err := g.runtime.GetPods(true)
	if err != nil {
		klog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
		return
	}

	g.updateRelistTime(timestamp)

	pods := kubecontainer.Pods(podList)
	// Update the running pod and container counts.
	updateRunningPodAndContainerMetrics(pods)
	g.podRecords.setCurrent(pods)

	// Compare the old and the current pods, and generate events.
	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}

	var needsReinspection map[types.UID]*kubecontainer.Pod
	if g.cacheEnabled() {
		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
	}

	// If there are events associated with a pod, we should update the
	// podCache.
	for pid, events := range eventsByPodID {
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			// updateCache() will inspect the pod and update the cache. If an
			// error occurs during the inspection, we want PLEG to retry again
			// in the next relist. To achieve this, we do not update the
			// associated podRecord of the pod, so that the change will be
			// detected again in the next relist.
			// TODO: If many pods changed during the same relist period,
			// inspecting the pod and getting the PodStatus to update the cache
			// serially may take a while. We should be aware of this and
			// parallelize if needed.
			if err := g.updateCache(pod, pid); err != nil {
				// Rely on updateCache calling GetPodStatus to log the actual error.
				klog.V(4).Infof("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)

				// make sure we try to reinspect the pod during the next relisting
				needsReinspection[pid] = pod

				continue
			} else {
				// this pod was in the list to reinspect and we did so because it had events, so remove it
				// from the list (we don't want the reinspection code below to inspect it a second time in
				// this relist execution)
				delete(g.podsToReinspect, pid)
			}
		}
		// Update the internal storage and send out the events.
		g.podRecords.update(pid)
		for i := range events {
			// Filter out events that are not reliable and no other components use yet.
			if events[i].Type == ContainerChanged {
				continue
			}
			select {
			case g.eventChannel <- events[i]:
			default:
				metrics.PLEGDiscardEvents.Inc()
				klog.Error("event channel is full, discard this relist() cycle event")
			}
		}
	}

	if g.cacheEnabled() {
		// reinspect any pods that failed inspection during the previous relist
		if len(g.podsToReinspect) > 0 {
			klog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
			for pid, pod := range g.podsToReinspect {
				if err := g.updateCache(pod, pid); err != nil {
					// Rely on updateCache calling GetPodStatus to log the actual error.
					klog.V(5).Infof("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
					needsReinspection[pid] = pod
				}
			}
		}

		// Update the cache timestamp. This needs to happen *after*
		// all pods have been properly updated in the cache.
		g.cache.UpdateTime(timestamp)
	}

	// make sure we retain the list of pods that need reinspecting the next time relist is called
	g.podsToReinspect = needsReinspection
}
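
// getContainersFromPods collects the containers (and, for now, sandboxes) of
// the given pods, de-duplicated by container ID. Nil pods are skipped.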
func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
	cidSet := sets.NewString()
	var containers []*kubecontainer.Container
	for _, p := range pods {
		if p == nil {
			continue
		}
		for _, c := range p.Containers {
			cid := string(c.ID.ID)
			if cidSet.Has(cid) {
				continue
			}
			cidSet.Insert(cid)
			containers = append(containers, c)
		}
		// Update sandboxes as containers.
		// TODO: keep track of sandboxes explicitly.
		for _, c := range p.Sandboxes {
			cid := string(c.ID.ID)
			if cidSet.Has(cid) {
				continue
			}
			cidSet.Insert(cid)
			containers = append(containers, c)
		}
	}
	return containers
}
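
// computeEvents compares the states of the container identified by cid in the
// old and the new pod, and returns the resulting lifecycle events.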
func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
	var pid types.UID
	if oldPod != nil {
		pid = oldPod.ID
	} else if newPod != nil {
		pid = newPod.ID
	}
	oldState := getContainerState(oldPod, cid)
	newState := getContainerState(newPod, cid)
	return generateEvents(pid, cid.ID, oldState, newState)
}
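
// cacheEnabled reports whether a pod cache was supplied at construction time.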
func (g *GenericPLEG) cacheEnabled() bool {
	return g.cache != nil
}
// getPodIPs preserves an older cached status' pod IPs if the new status has no
// pod IPs and its sandboxes have exited.
func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) []string {
	if len(status.IPs) != 0 {
		return status.IPs
	}

	oldStatus, err := g.cache.Get(pid)
	if err != nil || len(oldStatus.IPs) == 0 {
		return nil
	}

	for _, sandboxStatus := range status.SandboxStatuses {
		// If at least one sandbox is ready, then use this status update's pod IPs.
		if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
			return status.IPs
		}
	}

	// For pods with no ready containers or sandboxes (like exited pods),
	// use the old status' pod IPs.
	return oldStatus.IPs
}
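
// updateCache inspects the pod via the runtime and writes the resulting status
// (or the inspection error) into the cache; a nil pod deletes the cache entry.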
func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
	if pod == nil {
		// The pod is missing in the current relist. This means that
		// the pod has no visible (active or inactive) containers.
		klog.V(4).Infof("PLEG: Delete status for pod %q", string(pid))
		g.cache.Delete(pid)
		return nil
	}

	timestamp := g.clock.Now()
	// TODO: Consider adding a new runtime method
	// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
	// all containers again.
	status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
	klog.V(4).Infof("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err)
	if err == nil {
		// Preserve the pod IP across cache updates if the new IP is empty.
		// When a pod is torn down, kubelet may race with PLEG and retrieve
		// a pod status after network teardown, but the kubernetes API expects
		// the completed pod's IP to be available after the pod is dead.
		status.IPs = g.getPodIPs(pid, status)
	}

	g.cache.Set(pod.ID, status, err, timestamp)
	return err
}
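
// updateEvents appends a non-nil event to the per-pod event map.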
func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
	if e == nil {
		return
	}
	eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
}
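
// getContainerState looks up the container (or sandbox) with the given ID in
// the pod and converts its state; missing containers map to the non-existent
// state.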
func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
	// Default to the non-existent state.
	state := plegContainerNonExistent
	if pod == nil {
		return state
	}
	c := pod.FindContainerByID(*cid)
	if c != nil {
		return convertState(c.State)
	}
	// Search through sandboxes too.
	c = pod.FindSandboxByID(*cid)
	if c != nil {
		return convertState(c.State)
	}
	return state
}
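
// updateRunningPodAndContainerMetrics updates the running pod and container
// count metrics from the given relisted pods.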
func updateRunningPodAndContainerMetrics(pods []*kubecontainer.Pod) {
	// Set the number of running pods.
	metrics.RunningPodCount.Set(float64(len(pods)))
	// Intermediate map to store the count of each "container_state".
	containerStateCount := make(map[string]int)
	for _, pod := range pods {
		containers := pod.Containers
		for _, container := range containers {
			// Update the corresponding "container_state" count used to set the
			// gauge vector metric below.
			containerStateCount[string(container.State)]++
		}
	}
	for key, value := range containerStateCount {
		metrics.RunningContainerCount.WithLabelValues(key).Set(float64(value))
	}
}
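
// getOld returns the pod as observed in the previous relist, or nil.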
func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod {
	r, ok := pr[id]
	if !ok {
		return nil
	}
	return r.old
}
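
// getCurrent returns the pod as observed in the latest relist, or nil.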
func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod {
	r, ok := pr[id]
	if !ok {
		return nil
	}
	return r.current
}
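
// setCurrent clears all current entries and records the pods observed in the
// latest relist.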
func (pr podRecords) setCurrent(pods []*kubecontainer.Pod) {
	for i := range pr {
		pr[i].current = nil
	}
	for _, pod := range pods {
		if r, ok := pr[pod.ID]; ok {
			r.current = pod
		} else {
			pr[pod.ID] = &podRecord{current: pod}
		}
	}
}
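
// update promotes the current observation of the pod to the old one, deleting
// the record if the pod no longer exists.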
func (pr podRecords) update(id types.UID) {
	r, ok := pr[id]
	if !ok {
		return
	}
	pr.updateInternal(id, r)
}
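
// updateInternal promotes r.current to r.old in place.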
func (pr podRecords) updateInternal(id types.UID, r *podRecord) {
	if r.current == nil {
		// Pod no longer exists; delete the entry.
		delete(pr, id)
		return
	}
	r.old = r.current
	r.current = nil
}