prober.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package prober
  14. import (
  15. "fmt"
  16. "io"
  17. "net"
  18. "net/http"
  19. "net/url"
  20. "strconv"
  21. "strings"
  22. "time"
  23. v1 "k8s.io/api/core/v1"
  24. "k8s.io/apimachinery/pkg/util/intstr"
  25. "k8s.io/client-go/tools/record"
  26. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  27. "k8s.io/kubernetes/pkg/kubelet/events"
  28. "k8s.io/kubernetes/pkg/kubelet/prober/results"
  29. "k8s.io/kubernetes/pkg/kubelet/util/format"
  30. "k8s.io/kubernetes/pkg/probe"
  31. execprobe "k8s.io/kubernetes/pkg/probe/exec"
  32. httpprobe "k8s.io/kubernetes/pkg/probe/http"
  33. tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
  34. "k8s.io/utils/exec"
  35. "k8s.io/klog"
  36. )
  // Probe retry tuning.
const (
	// maxProbeRetries is the attempt limit that probe passes to
	// runProbeWithRetries. A probe is re-run only when executing it returns
	// an error, not when it reports a failing result.
	maxProbeRetries = 3
)
  38. // Prober helps to check the liveness/readiness/startup of a container.
  39. type prober struct {
  40. exec execprobe.Prober
  41. // probe types needs different httprobe instances so they don't
  42. // share a connection pool which can cause collsions to the
  43. // same host:port and transient failures. See #49740.
  44. readinessHTTP httpprobe.Prober
  45. livenessHTTP httpprobe.Prober
  46. startupHTTP httpprobe.Prober
  47. tcp tcpprobe.Prober
  48. runner kubecontainer.ContainerCommandRunner
  49. refManager *kubecontainer.RefManager
  50. recorder record.EventRecorder
  51. }
  52. // NewProber creates a Prober, it takes a command runner and
  53. // several container info managers.
  54. func newProber(
  55. runner kubecontainer.ContainerCommandRunner,
  56. refManager *kubecontainer.RefManager,
  57. recorder record.EventRecorder) *prober {
  58. const followNonLocalRedirects = false
  59. return &prober{
  60. exec: execprobe.New(),
  61. readinessHTTP: httpprobe.New(followNonLocalRedirects),
  62. livenessHTTP: httpprobe.New(followNonLocalRedirects),
  63. startupHTTP: httpprobe.New(followNonLocalRedirects),
  64. tcp: tcpprobe.New(),
  65. runner: runner,
  66. refManager: refManager,
  67. recorder: recorder,
  68. }
  69. }
  70. // probe probes the container.
  71. func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
  72. var probeSpec *v1.Probe
  73. switch probeType {
  74. case readiness:
  75. probeSpec = container.ReadinessProbe
  76. case liveness:
  77. probeSpec = container.LivenessProbe
  78. case startup:
  79. probeSpec = container.StartupProbe
  80. default:
  81. return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
  82. }
  83. ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)
  84. if probeSpec == nil {
  85. klog.Warningf("%s probe for %s is nil", probeType, ctrName)
  86. return results.Success, nil
  87. }
  88. result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
  89. if err != nil || (result != probe.Success && result != probe.Warning) {
  90. // Probe failed in one way or another.
  91. ref, hasRef := pb.refManager.GetRef(containerID)
  92. if !hasRef {
  93. klog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
  94. }
  95. if err != nil {
  96. klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
  97. if hasRef {
  98. pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
  99. }
  100. } else { // result != probe.Success
  101. klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
  102. if hasRef {
  103. pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
  104. }
  105. }
  106. return results.Failure, err
  107. }
  108. if result == probe.Warning {
  109. if ref, hasRef := pb.refManager.GetRef(containerID); hasRef {
  110. pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
  111. }
  112. klog.V(3).Infof("%s probe for %q succeeded with a warning: %s", probeType, ctrName, output)
  113. } else {
  114. klog.V(3).Infof("%s probe for %q succeeded", probeType, ctrName)
  115. }
  116. return results.Success, nil
  117. }
  118. // runProbeWithRetries tries to probe the container in a finite loop, it returns the last result
  119. // if it never succeeds.
  120. func (pb *prober) runProbeWithRetries(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
  121. var err error
  122. var result probe.Result
  123. var output string
  124. for i := 0; i < retries; i++ {
  125. result, output, err = pb.runProbe(probeType, p, pod, status, container, containerID)
  126. if err == nil {
  127. return result, output, nil
  128. }
  129. }
  130. return result, output, err
  131. }
  132. // buildHeaderMap takes a list of HTTPHeader <name, value> string
  133. // pairs and returns a populated string->[]string http.Header map.
  134. func buildHeader(headerList []v1.HTTPHeader) http.Header {
  135. headers := make(http.Header)
  136. for _, header := range headerList {
  137. headers[header.Name] = append(headers[header.Name], header.Value)
  138. }
  139. return headers
  140. }
  141. func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
  142. timeout := time.Duration(p.TimeoutSeconds) * time.Second
  143. if p.Exec != nil {
  144. klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
  145. command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
  146. return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
  147. }
  148. if p.HTTPGet != nil {
  149. scheme := strings.ToLower(string(p.HTTPGet.Scheme))
  150. host := p.HTTPGet.Host
  151. if host == "" {
  152. host = status.PodIP
  153. }
  154. port, err := extractPort(p.HTTPGet.Port, container)
  155. if err != nil {
  156. return probe.Unknown, "", err
  157. }
  158. path := p.HTTPGet.Path
  159. klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
  160. url := formatURL(scheme, host, port, path)
  161. headers := buildHeader(p.HTTPGet.HTTPHeaders)
  162. klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
  163. switch probeType {
  164. case liveness:
  165. return pb.livenessHTTP.Probe(url, headers, timeout)
  166. case startup:
  167. return pb.startupHTTP.Probe(url, headers, timeout)
  168. default:
  169. return pb.readinessHTTP.Probe(url, headers, timeout)
  170. }
  171. }
  172. if p.TCPSocket != nil {
  173. port, err := extractPort(p.TCPSocket.Port, container)
  174. if err != nil {
  175. return probe.Unknown, "", err
  176. }
  177. host := p.TCPSocket.Host
  178. if host == "" {
  179. host = status.PodIP
  180. }
  181. klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
  182. return pb.tcp.Probe(host, port, timeout)
  183. }
  184. klog.Warningf("Failed to find probe builder for container: %v", container)
  185. return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
  186. }
  187. func extractPort(param intstr.IntOrString, container v1.Container) (int, error) {
  188. port := -1
  189. var err error
  190. switch param.Type {
  191. case intstr.Int:
  192. port = param.IntValue()
  193. case intstr.String:
  194. if port, err = findPortByName(container, param.StrVal); err != nil {
  195. // Last ditch effort - maybe it was an int stored as string?
  196. if port, err = strconv.Atoi(param.StrVal); err != nil {
  197. return port, err
  198. }
  199. }
  200. default:
  201. return port, fmt.Errorf("intOrString had no kind: %+v", param)
  202. }
  203. if port > 0 && port < 65536 {
  204. return port, nil
  205. }
  206. return port, fmt.Errorf("invalid port number: %v", port)
  207. }
  208. // findPortByName is a helper function to look up a port in a container by name.
  209. func findPortByName(container v1.Container, portName string) (int, error) {
  210. for _, port := range container.Ports {
  211. if port.Name == portName {
  212. return int(port.ContainerPort), nil
  213. }
  214. }
  215. return 0, fmt.Errorf("port %s not found", portName)
  216. }
  // formatURL assembles a probe target URL. path is parsed as a URL reference,
// which keeps any query string or fragment it carries. The scheme and
// host:port are then overwritten. A path that fails to parse is kept verbatim
// as the URL path: it is too late to reject it here. Split out for testability.
func formatURL(scheme string, host string, port int, path string) *url.URL {
	target := &url.URL{Path: path}
	if parsed, err := url.Parse(path); err == nil {
		target = parsed
	}
	target.Scheme = scheme
	// JoinHostPort brackets IPv6 literals, e.g. [::1]:8080.
	target.Host = net.JoinHostPort(host, strconv.Itoa(port))
	return target
}
  // execInContainer adapts a single in-container command execution to the
// exec.Cmd interface so it can be handed to the exec prober.
type execInContainer struct {
	// run performs the execution. It always returns the combined stdout and
	// stderr output, along with any error that occurred.
	run func() ([]byte, error)
	// writer, when non-nil, receives the output produced by Start. Both
	// SetStdout and SetStderr assign it.
	writer io.Writer
}
  236. func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
  237. return &execInContainer{run: func() ([]byte, error) {
  238. return pb.runner.RunInContainer(containerID, cmd, timeout)
  239. }}
  240. }
  241. func (eic *execInContainer) Run() error {
  242. return nil
  243. }
  244. func (eic *execInContainer) CombinedOutput() ([]byte, error) {
  245. return eic.run()
  246. }
  247. func (eic *execInContainer) Output() ([]byte, error) {
  248. return nil, fmt.Errorf("unimplemented")
  249. }
  250. func (eic *execInContainer) SetDir(dir string) {
  251. //unimplemented
  252. }
  253. func (eic *execInContainer) SetStdin(in io.Reader) {
  254. //unimplemented
  255. }
  256. func (eic *execInContainer) SetStdout(out io.Writer) {
  257. eic.writer = out
  258. }
  259. func (eic *execInContainer) SetStderr(out io.Writer) {
  260. eic.writer = out
  261. }
  262. func (eic *execInContainer) SetEnv(env []string) {
  263. //unimplemented
  264. }
  265. func (eic *execInContainer) Stop() {
  266. //unimplemented
  267. }
  268. func (eic *execInContainer) Start() error {
  269. data, err := eic.run()
  270. if eic.writer != nil {
  271. eic.writer.Write(data)
  272. }
  273. return err
  274. }
  275. func (eic *execInContainer) Wait() error {
  276. return nil
  277. }
  278. func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) {
  279. return nil, fmt.Errorf("unimplemented")
  280. }
  281. func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) {
  282. return nil, fmt.Errorf("unimplemented")
  283. }