/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
	"math/rand"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/component-base/metrics"
	"k8s.io/klog"

	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
)

// worker handles the periodic probing of its assigned container. Each worker has a goroutine
// associated with it that runs the probe loop until the container permanently terminates, or
// the stop channel is closed. The worker uses the probe manager's statusManager to get
// up-to-date container IDs.
type worker struct {
	// Channel for stopping the probe.
	stopCh chan struct{}

	// The pod containing this probe (read-only).
	pod *v1.Pod

	// The container to probe (read-only).
	container v1.Container

	// Describes the probe configuration (read-only).
	spec *v1.Probe

	// The type of the worker.
	probeType probeType

	// The probe value during the initial delay.
	initialValue results.Result

	// Where to store this worker's results.
	resultsManager results.Manager
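
	// The probe manager that owns this worker; it supplies up-to-date pod
	// status and performs the actual probes.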
	probeManager *manager

	// The last known container ID for this worker.
	containerID kubecontainer.ContainerID
	// The last probe result for this worker.
	lastResult results.Result
	// How many times in a row the probe has returned the same result.
	resultRun int

	// If set, skip probing.
	onHold bool

	// The label sets attached to this worker's series of the ProberResults
	// metric, one per result value.
	proberResultsSuccessfulMetricLabels metrics.Labels
	proberResultsFailedMetricLabels     metrics.Labels
	proberResultsUnknownMetricLabels    metrics.Labels
}
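
// For orientation, a sketch of the typical worker lifecycle as driven by the
// prober manager (the surrounding wiring here is illustrative, not an exact
// call site):
//
//	w := newWorker(m, readiness, pod, container)
//	go w.run() // probes until the container permanently terminates
//	...
//	w.stop()   // non-blocking; the worker cleans up and removes itself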

// newWorker creates a new probe worker for the given container and probe type.
// The caller starts the probe loop by invoking run, typically in its own goroutine.
func newWorker(
	m *manager,
	probeType probeType,
	pod *v1.Pod,
	container v1.Container) *worker {

	w := &worker{
		stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
		pod:          pod,
		container:    container,
		probeType:    probeType,
		probeManager: m,
	}

	switch probeType {
	case readiness:
		w.spec = container.ReadinessProbe
		w.resultsManager = m.readinessManager
		// A container is not ready until its first successful readiness probe.
		w.initialValue = results.Failure
	case liveness:
		w.spec = container.LivenessProbe
		w.resultsManager = m.livenessManager
		// Assume a container is live until a probe proves otherwise, so it is
		// not restarted before the first liveness probe runs.
		w.initialValue = results.Success
	case startup:
		w.spec = container.StartupProbe
		w.resultsManager = m.startupManager
		// A startup probe's result is unknown until the first probe completes.
		w.initialValue = results.Unknown
	}

	basicMetricLabels := metrics.Labels{
		"probe_type": w.probeType.String(),
		"container":  w.container.Name,
		"pod":        w.pod.Name,
		"namespace":  w.pod.Namespace,
		"pod_uid":    string(w.pod.UID),
	}
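
	// Precompute one label set per result value. Each set must be its own map
	// so the per-result series can be incremented independently and deleted
	// precisely when the worker shuts down.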
	w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
	w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful

	w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
	w.proberResultsFailedMetricLabels["result"] = probeResultFailed

	w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
	w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown

	return w
}

// run periodically probes the container.
func (w *worker) run() {
	probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

	// If the kubelet restarted, the probes could be started in rapid succession.
	// Let the worker wait for a random portion of the ticker period before probing.
	time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

	probeTicker := time.NewTicker(probeTickerPeriod)

	defer func() {
		// Clean up.
		probeTicker.Stop()
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}

		w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
		ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
		ProberResults.Delete(w.proberResultsFailedMetricLabels)
		ProberResults.Delete(w.proberResultsUnknownMetricLabels)
	}()

probeLoop:
	for w.doProbe() {
		// Wait for next probe tick.
		select {
		case <-w.stopCh:
			break probeLoop
		case <-probeTicker.C:
			// continue
		}
	}
}

// stop stops the probe worker. The worker handles cleanup and removes itself from its manager.
// It is safe to call stop multiple times.
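// Because stopCh is buffered with capacity 1, a stop issued before run reaches
// its select is still delivered rather than lost, and extra calls while a stop
// signal is already pending are simply dropped.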
func (w *worker) stop() {
	select {
	case w.stopCh <- struct{}{}:
	default: // Non-blocking.
	}
}

// doProbe probes the container once and records the result.
// Returns whether the worker should continue.
func (w *worker) doProbe() (keepGoing bool) {
	defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
	defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

	status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
	if !ok {
		// Either the pod has not been created yet, or it was already deleted.
		klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
		return true
	}

	// Worker should terminate if pod is terminated.
	if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
		klog.V(3).Infof("Pod %v %v, exiting probe worker",
			format.Pod(w.pod), status.Phase)
		return false
	}

	c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
	if !ok || len(c.ContainerID) == 0 {
		// Either the container has not been created yet, or it was deleted.
		klog.V(3).Infof("Probe target container not found: %v - %v",
			format.Pod(w.pod), w.container.Name)
		return true // Wait for more information.
	}

	if w.containerID.String() != c.ContainerID {
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}
		w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
		w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
		// We've got a new container; resume probing.
		w.onHold = false
	}

	if w.onHold {
		// Worker is on hold until there is a new container.
		return true
	}

	if c.State.Running == nil {
		klog.V(3).Infof("Non-running container probed: %v - %v",
			format.Pod(w.pod), w.container.Name)
		if !w.containerID.IsEmpty() {
			w.resultsManager.Set(w.containerID, results.Failure, w.pod)
		}
		// Abort if the container has terminated and will not be restarted.
		return c.State.Terminated == nil ||
			w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
	}

	// Probe disabled for InitialDelaySeconds.
	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
		return true
	}
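
	// Gate probes on startup state: only the startup probe runs before the
	// container reports Started, and it stops running once the container has.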
	if c.Started != nil && *c.Started {
		// Stop probing for startup once container has started.
		if w.probeType == startup {
			return true
		}
	} else {
		// Disable other probes until container has started.
		if w.probeType != startup {
			return true
		}
	}

	// TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
	// the full container environment here, OR we must make a call to the CRI in order to get those environment
	// values from the running container.
	result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
	if err != nil {
		// Prober error, throw away the result.
		return true
	}

	switch result {
	case results.Success:
		ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
	case results.Failure:
		ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
	default:
		ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
	}

	if w.lastResult == result {
		w.resultRun++
	} else {
		w.lastResult = result
		w.resultRun = 1
	}
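
	// Example: with FailureThreshold=3 (the Kubernetes default), a previously
	// successful probe must fail three consecutive times before the failure is
	// recorded below; a single flake leaves the published state unchanged.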
	if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
		(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
		// Success or failure is below threshold - leave the probe state unchanged.
		return true
	}
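
	// The new result has held for its configured threshold; publish it.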
	w.resultsManager.Set(w.containerID, result, w.pod)

	if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
		// If the container fails a liveness/startup check, it will need to be restarted.
		// Stop probing until we see a new container ID. This is to reduce the
		// chance of hitting #21751, where running `docker exec` when a
		// container is being stopped may lead to corrupted container state.
		w.onHold = true
		w.resultRun = 0
	}

	return true
}

// deepCopyPrometheusLabels returns a copy of the given label set, so callers
// can attach different "result" values without mutating the shared base map.
func deepCopyPrometheusLabels(m metrics.Labels) metrics.Labels {
	ret := make(metrics.Labels, len(m))
	for k, v := range m {
		ret[k] = v
	}
	return ret
}