123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package prober
- import (
- "fmt"
- "io"
- "net"
- "net/http"
- "net/url"
- "strconv"
- "strings"
- "time"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/client-go/tools/record"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/events"
- "k8s.io/kubernetes/pkg/kubelet/prober/results"
- "k8s.io/kubernetes/pkg/kubelet/util/format"
- "k8s.io/kubernetes/pkg/probe"
- execprobe "k8s.io/kubernetes/pkg/probe/exec"
- httprobe "k8s.io/kubernetes/pkg/probe/http"
- tcprobe "k8s.io/kubernetes/pkg/probe/tcp"
- "k8s.io/utils/exec"
- "k8s.io/klog"
- )
// maxProbeRetries is the attempt limit handed to runProbeWithRetries for a
// single probe invocation.
const (
	maxProbeRetries = 3
)
- // Prober helps to check the liveness/readiness of a container.
- type prober struct {
- exec execprobe.Prober
- // probe types needs different httprobe instances so they don't
- // share a connection pool which can cause collsions to the
- // same host:port and transient failures. See #49740.
- readinessHttp httprobe.Prober
- livenessHttp httprobe.Prober
- tcp tcprobe.Prober
- runner kubecontainer.ContainerCommandRunner
- refManager *kubecontainer.RefManager
- recorder record.EventRecorder
- }
- // NewProber creates a Prober, it takes a command runner and
- // several container info managers.
- func newProber(
- runner kubecontainer.ContainerCommandRunner,
- refManager *kubecontainer.RefManager,
- recorder record.EventRecorder) *prober {
- const followNonLocalRedirects = false
- return &prober{
- exec: execprobe.New(),
- readinessHttp: httprobe.New(followNonLocalRedirects),
- livenessHttp: httprobe.New(followNonLocalRedirects),
- tcp: tcprobe.New(),
- runner: runner,
- refManager: refManager,
- recorder: recorder,
- }
- }
- // probe probes the container.
- func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
- var probeSpec *v1.Probe
- switch probeType {
- case readiness:
- probeSpec = container.ReadinessProbe
- case liveness:
- probeSpec = container.LivenessProbe
- default:
- return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType)
- }
- ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)
- if probeSpec == nil {
- klog.Warningf("%s probe for %s is nil", probeType, ctrName)
- return results.Success, nil
- }
- result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
- if err != nil || (result != probe.Success && result != probe.Warning) {
- // Probe failed in one way or another.
- ref, hasRef := pb.refManager.GetRef(containerID)
- if !hasRef {
- klog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
- }
- if err != nil {
- klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
- if hasRef {
- pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
- }
- } else { // result != probe.Success
- klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
- if hasRef {
- pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
- }
- }
- return results.Failure, err
- }
- if result == probe.Warning {
- if ref, hasRef := pb.refManager.GetRef(containerID); hasRef {
- pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
- }
- klog.V(3).Infof("%s probe for %q succeeded with a warning: %s", probeType, ctrName, output)
- } else {
- klog.V(3).Infof("%s probe for %q succeeded", probeType, ctrName)
- }
- return results.Success, nil
- }
- // runProbeWithRetries tries to probe the container in a finite loop, it returns the last result
- // if it never succeeds.
- func (pb *prober) runProbeWithRetries(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
- var err error
- var result probe.Result
- var output string
- for i := 0; i < retries; i++ {
- result, output, err = pb.runProbe(probeType, p, pod, status, container, containerID)
- if err == nil {
- return result, output, nil
- }
- }
- return result, output, err
- }
- // buildHeaderMap takes a list of HTTPHeader <name, value> string
- // pairs and returns a populated string->[]string http.Header map.
- func buildHeader(headerList []v1.HTTPHeader) http.Header {
- headers := make(http.Header)
- for _, header := range headerList {
- headers[header.Name] = append(headers[header.Name], header.Value)
- }
- return headers
- }
- func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
- timeout := time.Duration(p.TimeoutSeconds) * time.Second
- if p.Exec != nil {
- klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
- command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
- return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
- }
- if p.HTTPGet != nil {
- scheme := strings.ToLower(string(p.HTTPGet.Scheme))
- host := p.HTTPGet.Host
- if host == "" {
- host = status.PodIP
- }
- port, err := extractPort(p.HTTPGet.Port, container)
- if err != nil {
- return probe.Unknown, "", err
- }
- path := p.HTTPGet.Path
- klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
- url := formatURL(scheme, host, port, path)
- headers := buildHeader(p.HTTPGet.HTTPHeaders)
- klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
- if probeType == liveness {
- return pb.livenessHttp.Probe(url, headers, timeout)
- } else { // readiness
- return pb.readinessHttp.Probe(url, headers, timeout)
- }
- }
- if p.TCPSocket != nil {
- port, err := extractPort(p.TCPSocket.Port, container)
- if err != nil {
- return probe.Unknown, "", err
- }
- host := p.TCPSocket.Host
- if host == "" {
- host = status.PodIP
- }
- klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
- return pb.tcp.Probe(host, port, timeout)
- }
- klog.Warningf("Failed to find probe builder for container: %v", container)
- return probe.Unknown, "", fmt.Errorf("Missing probe handler for %s:%s", format.Pod(pod), container.Name)
- }
- func extractPort(param intstr.IntOrString, container v1.Container) (int, error) {
- port := -1
- var err error
- switch param.Type {
- case intstr.Int:
- port = param.IntValue()
- case intstr.String:
- if port, err = findPortByName(container, param.StrVal); err != nil {
- // Last ditch effort - maybe it was an int stored as string?
- if port, err = strconv.Atoi(param.StrVal); err != nil {
- return port, err
- }
- }
- default:
- return port, fmt.Errorf("IntOrString had no kind: %+v", param)
- }
- if port > 0 && port < 65536 {
- return port, nil
- }
- return port, fmt.Errorf("invalid port number: %v", port)
- }
- // findPortByName is a helper function to look up a port in a container by name.
- func findPortByName(container v1.Container, portName string) (int, error) {
- for _, port := range container.Ports {
- if port.Name == portName {
- return int(port.ContainerPort), nil
- }
- }
- return 0, fmt.Errorf("port %s not found", portName)
- }
// formatURL assembles a URL from its parts. For testability.
//
// path is parsed as a URL reference, so a query string in it is preserved.
// If path cannot be parsed it is too late to reject it, so it is passed along
// verbatim as the URL path. Scheme and host:port always come from the
// arguments.
func formatURL(scheme string, host string, port int, path string) *url.URL {
	hostPort := net.JoinHostPort(host, strconv.Itoa(port))

	target, parseErr := url.Parse(path)
	if parseErr != nil {
		return &url.URL{Scheme: scheme, Host: hostPort, Path: path}
	}
	target.Scheme = scheme
	target.Host = hostPort
	return target
}
// execInContainer wraps a single callback that runs a command in a container.
type execInContainer struct {
	// run executes the command in the container. The combined stdout and
	// stderr output is always returned, together with an error if one
	// occurred.
	run func() ([]byte, error)
}
- func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
- return execInContainer{func() ([]byte, error) {
- return pb.runner.RunInContainer(containerID, cmd, timeout)
- }}
- }
- func (eic execInContainer) Run() error {
- return fmt.Errorf("unimplemented")
- }
- func (eic execInContainer) CombinedOutput() ([]byte, error) {
- return eic.run()
- }
- func (eic execInContainer) Output() ([]byte, error) {
- return nil, fmt.Errorf("unimplemented")
- }
- func (eic execInContainer) SetDir(dir string) {
- //unimplemented
- }
- func (eic execInContainer) SetStdin(in io.Reader) {
- //unimplemented
- }
- func (eic execInContainer) SetStdout(out io.Writer) {
- //unimplemented
- }
- func (eic execInContainer) SetStderr(out io.Writer) {
- //unimplemented
- }
- func (eic execInContainer) SetEnv(env []string) {
- //unimplemented
- }
- func (eic execInContainer) Stop() {
- //unimplemented
- }
- func (eic execInContainer) Start() error {
- return fmt.Errorf("unimplemented")
- }
- func (eic execInContainer) Wait() error {
- return fmt.Errorf("unimplemented")
- }
- func (eic execInContainer) StdoutPipe() (io.ReadCloser, error) {
- return nil, fmt.Errorf("unimplemented")
- }
- func (eic execInContainer) StderrPipe() (io.ReadCloser, error) {
- return nil, fmt.Errorf("unimplemented")
- }
|