- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package pleg
- import (
- "fmt"
- "sync/atomic"
- "time"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/clock"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
- "k8s.io/klog"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/metrics"
- )
- // GenericPLEG is an extremely simple generic PLEG that relies solely on
- // periodic listing to discover container changes. It should be used
- // as a temporary replacement for container runtimes that do not support a
- // proper event generator yet.
- //
- // Note that GenericPLEG assumes that a container would not be created,
- // terminated, and garbage collected within one relist period. If such an
- // incident happens, GenericPLEG would miss all events regarding this
- // container. In the case of relisting failure, the window may become longer.
- // Note that this assumption is not unique -- many kubelet internal components
- // rely on terminated containers as tombstones for bookkeeping purposes. The
- // garbage collector is implemented to work with such situations. However, to
- // guarantee that kubelet can handle missing container events, it is
- // recommended to set the relist period short and have an auxiliary, longer
- // periodic sync in kubelet as the safety net.
- type GenericPLEG struct {
- // The period for relisting.
- relistPeriod time.Duration
- // The container runtime.
- runtime kubecontainer.Runtime
- // The channel from which the subscriber listens for events.
- eventChannel chan *PodLifecycleEvent
- // The internal cache for pod/container information.
- podRecords podRecords
- // Time of the last relisting.
- relistTime atomic.Value
- // Cache for storing the runtime states required for syncing pods.
- cache kubecontainer.Cache
- // For testability.
- clock clock.Clock
- // Pods that failed to have their status retrieved during a relist. These pods will be
- // retried during the next relisting.
- podsToReinspect map[types.UID]*kubecontainer.Pod
- }
- // plegContainerState has a one-to-one mapping to the
- // kubecontainer.ContainerState except for the non-existent state. This state
- // is introduced here to complete the state transition scenarios.
- type plegContainerState string
- const (
- plegContainerRunning plegContainerState = "running"
- plegContainerExited plegContainerState = "exited"
- plegContainerUnknown plegContainerState = "unknown"
- plegContainerNonExistent plegContainerState = "non-existent"
- // The threshold needs to be greater than the relisting period + the
- // relisting time, which can vary significantly. Set a conservative
- // threshold to avoid flipping between healthy and unhealthy.
- relistThreshold = 3 * time.Minute
- )
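- // convertState converts a kubecontainer.ContainerState to the corresponding
- // plegContainerState.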
- func convertState(state kubecontainer.ContainerState) plegContainerState {
- switch state {
- case kubecontainer.ContainerStateCreated:
- // kubelet doesn't use the "created" state yet, hence convert it to "unknown".
- return plegContainerUnknown
- case kubecontainer.ContainerStateRunning:
- return plegContainerRunning
- case kubecontainer.ContainerStateExited:
- return plegContainerExited
- case kubecontainer.ContainerStateUnknown:
- return plegContainerUnknown
- default:
- panic(fmt.Sprintf("unrecognized container state: %v", state))
- }
- }
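- // podRecord stores the previous (old) and the latest (current) observation
- // of a pod.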
- type podRecord struct {
- old *kubecontainer.Pod
- current *kubecontainer.Pod
- }
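- // podRecords maps a pod UID to its podRecord.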
- type podRecords map[types.UID]*podRecord
- // NewGenericPLEG instantiates a new GenericPLEG object and returns it.
- func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
- relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
- return &GenericPLEG{
- relistPeriod: relistPeriod,
- runtime: runtime,
- eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
- podRecords: make(podRecords),
- cache: cache,
- clock: clock,
- }
- }
- // Watch returns a channel from which the subscriber can receive PodLifecycleEvent
- // events.
- // TODO: support multiple subscribers.
- func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
- return g.eventChannel
- }
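- // A minimal usage sketch (illustrative only; the runtime, cache, channel
- // capacity, and relist period are placeholders supplied by the caller):
- //
- //   pleg := NewGenericPLEG(runtime, 1000, time.Second, cache, clock.RealClock{})
- //   pleg.Start()
- //   for event := range pleg.Watch() {
- //       // Handle ContainerStarted, ContainerDied, ContainerRemoved, ...
- //       _ = event
- //   }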
- // Start spawns a goroutine to relist periodically.
- func (g *GenericPLEG) Start() {
- go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
- }
- // Healthy checks if PLEG works properly.
- // relistThreshold is the maximum interval between two relists.
- func (g *GenericPLEG) Healthy() (bool, error) {
- relistTime := g.getRelistTime()
- if relistTime.IsZero() {
- return false, fmt.Errorf("pleg has yet to be successful")
- }
- elapsed := g.clock.Since(relistTime)
- if elapsed > relistThreshold {
- return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
- }
- return true, nil
- }
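- // generateEvents generates pod lifecycle events for a container by comparing
- // its old and new states. It returns nil if the state is unchanged.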
- func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
- if newState == oldState {
- return nil
- }
- klog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
- switch newState {
- case plegContainerRunning:
- return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
- case plegContainerExited:
- return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
- case plegContainerUnknown:
- return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
- case plegContainerNonExistent:
- switch oldState {
- case plegContainerExited:
- // We already reported that the container died before.
- return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
- default:
- return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
- }
- default:
- panic(fmt.Sprintf("unrecognized container state: %v", newState))
- }
- }
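- // getRelistTime returns the time of the last relist, or the zero time if no
- // relist has completed yet.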
- func (g *GenericPLEG) getRelistTime() time.Time {
- val := g.relistTime.Load()
- if val == nil {
- return time.Time{}
- }
- return val.(time.Time)
- }
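- // updateRelistTime records the given timestamp as the time of the last relist.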
- func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
- g.relistTime.Store(timestamp)
- }
- // relist queries the container runtime for the list of pods/containers,
- // compares it with the internal pods/containers, and generates events accordingly.
- func (g *GenericPLEG) relist() {
- klog.V(5).Infof("GenericPLEG: Relisting")
- if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
- metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
- metrics.DeprecatedPLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
- }
- timestamp := g.clock.Now()
- defer func() {
- metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
- metrics.DeprecatedPLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp))
- }()
- // Get all the pods.
- podList, err := g.runtime.GetPods(true)
- if err != nil {
- klog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
- return
- }
- g.updateRelistTime(timestamp)
- pods := kubecontainer.Pods(podList)
- g.podRecords.setCurrent(pods)
- // Compare the old and the current pods, and generate events.
- eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
- for pid := range g.podRecords {
- oldPod := g.podRecords.getOld(pid)
- pod := g.podRecords.getCurrent(pid)
- // Get all containers in the old and the new pod.
- allContainers := getContainersFromPods(oldPod, pod)
- for _, container := range allContainers {
- events := computeEvents(oldPod, pod, &container.ID)
- for _, e := range events {
- updateEvents(eventsByPodID, e)
- }
- }
- }
- var needsReinspection map[types.UID]*kubecontainer.Pod
- if g.cacheEnabled() {
- needsReinspection = make(map[types.UID]*kubecontainer.Pod)
- }
- // If there are events associated with a pod, we should update the
- // podCache.
- for pid, events := range eventsByPodID {
- pod := g.podRecords.getCurrent(pid)
- if g.cacheEnabled() {
- // updateCache() will inspect the pod and update the cache. If an
- // error occurs during the inspection, we want PLEG to retry in the
- // next relist. To achieve this, we do not update the associated
- // podRecord of the pod, so that the change will be detected again
- // in the next relist.
- // TODO: If many pods changed during the same relist period,
- // inspecting the pod and getting the PodStatus to update the cache
- // serially may take a while. We should be aware of this and
- // parallelize if needed.
- if err := g.updateCache(pod, pid); err != nil {
- // Rely on updateCache calling GetPodStatus to log the actual error.
- klog.V(4).Infof("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
- // make sure we try to reinspect the pod during the next relisting
- needsReinspection[pid] = pod
- continue
- } else if _, found := g.podsToReinspect[pid]; found {
- // this pod was in the list to reinspect and we did so because it had events, so remove it
- // from the list (we don't want the reinspection code below to inspect it a second time in
- // this relist execution)
- delete(g.podsToReinspect, pid)
- }
- }
- // Update the internal storage and send out the events.
- g.podRecords.update(pid)
- for i := range events {
- // Filter out events that are not reliable and that no other components use yet.
- if events[i].Type == ContainerChanged {
- continue
- }
- select {
- case g.eventChannel <- events[i]:
- default:
- metrics.PLEGDiscardEvents.WithLabelValues().Inc()
- klog.Error("event channel is full, discard this relist() cycle event")
- }
- }
- }
- if g.cacheEnabled() {
- // reinspect any pods that failed inspection during the previous relist
- if len(g.podsToReinspect) > 0 {
- klog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
- for pid, pod := range g.podsToReinspect {
- if err := g.updateCache(pod, pid); err != nil {
- // Rely on updateCache calling GetPodStatus to log the actual error.
- klog.V(5).Infof("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
- needsReinspection[pid] = pod
- }
- }
- }
- // Update the cache timestamp. This needs to happen *after*
- // all pods have been properly updated in the cache.
- g.cache.UpdateTime(timestamp)
- }
- // make sure we retain the list of pods that need reinspecting the next time relist is called
- g.podsToReinspect = needsReinspection
- }
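- // getContainersFromPods collects the containers and sandboxes from the given
- // pods, deduplicated by container ID.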
- func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
- cidSet := sets.NewString()
- var containers []*kubecontainer.Container
- for _, p := range pods {
- if p == nil {
- continue
- }
- for _, c := range p.Containers {
- cid := string(c.ID.ID)
- if cidSet.Has(cid) {
- continue
- }
- cidSet.Insert(cid)
- containers = append(containers, c)
- }
- // Update sandboxes as containers
- // TODO: keep track of sandboxes explicitly.
- for _, c := range p.Sandboxes {
- cid := string(c.ID.ID)
- if cidSet.Has(cid) {
- continue
- }
- cidSet.Insert(cid)
- containers = append(containers, c)
- }
- }
- return containers
- }
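- // computeEvents generates the lifecycle events for the container with the
- // given ID by comparing its state in the old and the new pod.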
- func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
- var pid types.UID
- if oldPod != nil {
- pid = oldPod.ID
- } else if newPod != nil {
- pid = newPod.ID
- }
- oldState := getContainerState(oldPod, cid)
- newState := getContainerState(newPod, cid)
- return generateEvents(pid, cid.ID, oldState, newState)
- }
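- // cacheEnabled reports whether a pod cache has been configured.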
- func (g *GenericPLEG) cacheEnabled() bool {
- return g.cache != nil
- }
- // getPodIP preserves an older cached status' pod IP if the new status has no pod IP
- // and its sandboxes have exited
- func (g *GenericPLEG) getPodIP(pid types.UID, status *kubecontainer.PodStatus) string {
- if status.IP != "" {
- return status.IP
- }
- oldStatus, err := g.cache.Get(pid)
- if err != nil || oldStatus.IP == "" {
- return ""
- }
- for _, sandboxStatus := range status.SandboxStatuses {
- // If at least one sandbox is ready, then use this status update's pod IP
- if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
- return status.IP
- }
- }
- if len(status.SandboxStatuses) == 0 {
- // Without sandboxes (which built-in runtimes like rkt don't report),
- // look at all the container statuses, and if any containers are
- // running then use the new pod IP.
- for _, containerStatus := range status.ContainerStatuses {
- if containerStatus.State == kubecontainer.ContainerStateCreated || containerStatus.State == kubecontainer.ContainerStateRunning {
- return status.IP
- }
- }
- }
- // For pods with no ready containers or sandboxes (like exited pods)
- // use the old status' pod IP
- return oldStatus.IP
- }
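- // updateCache queries the runtime for the pod's latest status and writes it
- // to the cache. A nil pod means the pod is gone and its cache entry is deleted.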
- func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
- if pod == nil {
- // The pod is missing in the current relist. This means that
- // the pod has no visible (active or inactive) containers.
- klog.V(4).Infof("PLEG: Delete status for pod %q", string(pid))
- g.cache.Delete(pid)
- return nil
- }
- timestamp := g.clock.Now()
- // TODO: Consider adding a new runtime method
- // GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
- // all containers again.
- status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
- klog.V(4).Infof("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err)
- if err == nil {
- // Preserve the pod IP across cache updates if the new IP is empty.
- // When a pod is torn down, kubelet may race with PLEG and retrieve
- // a pod status after network teardown, but the kubernetes API expects
- // the completed pod's IP to be available after the pod is dead.
- status.IP = g.getPodIP(pid, status)
- }
- g.cache.Set(pod.ID, status, err, timestamp)
- return err
- }
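- // updateEvents appends a non-nil event to the per-pod event map.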
- func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
- if e == nil {
- return
- }
- eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
- }
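- // getContainerState returns the PLEG state of the container (or sandbox) with
- // the given ID in the pod, or the non-existent state if it cannot be found.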
- func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
- // Default to the non-existent state.
- state := plegContainerNonExistent
- if pod == nil {
- return state
- }
- c := pod.FindContainerByID(*cid)
- if c != nil {
- return convertState(c.State)
- }
- // Search through sandboxes too.
- c = pod.FindSandboxByID(*cid)
- if c != nil {
- return convertState(c.State)
- }
- return state
- }
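- // getOld returns the previous observation of the pod, or nil if there is none.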
- func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod {
- r, ok := pr[id]
- if !ok {
- return nil
- }
- return r.old
- }
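- // getCurrent returns the latest observation of the pod, or nil if there is none.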
- func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod {
- r, ok := pr[id]
- if !ok {
- return nil
- }
- return r.current
- }
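- // setCurrent clears all current entries and records the pods from the latest
- // relist as the current observations.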
- func (pr podRecords) setCurrent(pods []*kubecontainer.Pod) {
- for i := range pr {
- pr[i].current = nil
- }
- for _, pod := range pods {
- if r, ok := pr[pod.ID]; ok {
- r.current = pod
- } else {
- pr[pod.ID] = &podRecord{current: pod}
- }
- }
- }
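- // update promotes the pod's current observation to old, or removes the record
- // entirely if the pod no longer exists.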
- func (pr podRecords) update(id types.UID) {
- r, ok := pr[id]
- if !ok {
- return
- }
- pr.updateInternal(id, r)
- }
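- // updateInternal moves r.current to r.old, deleting the record if the pod no
- // longer exists.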
- func (pr podRecords) updateInternal(id types.UID, r *podRecord) {
- if r.current == nil {
- // Pod no longer exists; delete the entry.
- delete(pr, id)
- return
- }
- r.old = r.current
- r.current = nil
- }