prober.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package prober
  14. import (
  15. "fmt"
  16. "io"
  17. "net"
  18. "net/http"
  19. "net/url"
  20. "strconv"
  21. "strings"
  22. "time"
  23. "k8s.io/api/core/v1"
  24. "k8s.io/apimachinery/pkg/util/intstr"
  25. "k8s.io/client-go/tools/record"
  26. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  27. "k8s.io/kubernetes/pkg/kubelet/events"
  28. "k8s.io/kubernetes/pkg/kubelet/prober/results"
  29. "k8s.io/kubernetes/pkg/kubelet/util/format"
  30. "k8s.io/kubernetes/pkg/probe"
  31. execprobe "k8s.io/kubernetes/pkg/probe/exec"
  32. httprobe "k8s.io/kubernetes/pkg/probe/http"
  33. tcprobe "k8s.io/kubernetes/pkg/probe/tcp"
  34. "k8s.io/utils/exec"
  35. "k8s.io/klog"
  36. )
// maxProbeRetries is the attempt budget that probe passes to
// runProbeWithRetries for a single probe invocation.
const maxProbeRetries = 3
// prober bundles the exec, HTTP and TCP probe implementations together with
// the collaborators needed to run a probe against a container and to record
// events about the outcome.
type prober struct {
	exec execprobe.Prober
	// Each probe type needs its own httprobe instance so that they don't
	// share a connection pool, which can cause collisions to the
	// same host:port and transient failures. See #49740.
	readinessHttp httprobe.Prober
	livenessHttp  httprobe.Prober
	tcp           tcprobe.Prober
	// runner executes exec-probe commands inside the container.
	runner kubecontainer.ContainerCommandRunner
	// refManager resolves a container ID to the reference that probe events
	// are recorded against.
	refManager *kubecontainer.RefManager
	// recorder emits the warning events for failed or warning probe results.
	recorder record.EventRecorder
}
  51. // NewProber creates a Prober, it takes a command runner and
  52. // several container info managers.
  53. func newProber(
  54. runner kubecontainer.ContainerCommandRunner,
  55. refManager *kubecontainer.RefManager,
  56. recorder record.EventRecorder) *prober {
  57. const followNonLocalRedirects = false
  58. return &prober{
  59. exec: execprobe.New(),
  60. readinessHttp: httprobe.New(followNonLocalRedirects),
  61. livenessHttp: httprobe.New(followNonLocalRedirects),
  62. tcp: tcprobe.New(),
  63. runner: runner,
  64. refManager: refManager,
  65. recorder: recorder,
  66. }
  67. }
  68. // probe probes the container.
  69. func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
  70. var probeSpec *v1.Probe
  71. switch probeType {
  72. case readiness:
  73. probeSpec = container.ReadinessProbe
  74. case liveness:
  75. probeSpec = container.LivenessProbe
  76. default:
  77. return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType)
  78. }
  79. ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)
  80. if probeSpec == nil {
  81. klog.Warningf("%s probe for %s is nil", probeType, ctrName)
  82. return results.Success, nil
  83. }
  84. result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
  85. if err != nil || (result != probe.Success && result != probe.Warning) {
  86. // Probe failed in one way or another.
  87. ref, hasRef := pb.refManager.GetRef(containerID)
  88. if !hasRef {
  89. klog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
  90. }
  91. if err != nil {
  92. klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
  93. if hasRef {
  94. pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
  95. }
  96. } else { // result != probe.Success
  97. klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
  98. if hasRef {
  99. pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
  100. }
  101. }
  102. return results.Failure, err
  103. }
  104. if result == probe.Warning {
  105. if ref, hasRef := pb.refManager.GetRef(containerID); hasRef {
  106. pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
  107. }
  108. klog.V(3).Infof("%s probe for %q succeeded with a warning: %s", probeType, ctrName, output)
  109. } else {
  110. klog.V(3).Infof("%s probe for %q succeeded", probeType, ctrName)
  111. }
  112. return results.Success, nil
  113. }
  114. // runProbeWithRetries tries to probe the container in a finite loop, it returns the last result
  115. // if it never succeeds.
  116. func (pb *prober) runProbeWithRetries(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
  117. var err error
  118. var result probe.Result
  119. var output string
  120. for i := 0; i < retries; i++ {
  121. result, output, err = pb.runProbe(probeType, p, pod, status, container, containerID)
  122. if err == nil {
  123. return result, output, nil
  124. }
  125. }
  126. return result, output, err
  127. }
  128. // buildHeaderMap takes a list of HTTPHeader <name, value> string
  129. // pairs and returns a populated string->[]string http.Header map.
  130. func buildHeader(headerList []v1.HTTPHeader) http.Header {
  131. headers := make(http.Header)
  132. for _, header := range headerList {
  133. headers[header.Name] = append(headers[header.Name], header.Value)
  134. }
  135. return headers
  136. }
  137. func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
  138. timeout := time.Duration(p.TimeoutSeconds) * time.Second
  139. if p.Exec != nil {
  140. klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
  141. command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
  142. return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
  143. }
  144. if p.HTTPGet != nil {
  145. scheme := strings.ToLower(string(p.HTTPGet.Scheme))
  146. host := p.HTTPGet.Host
  147. if host == "" {
  148. host = status.PodIP
  149. }
  150. port, err := extractPort(p.HTTPGet.Port, container)
  151. if err != nil {
  152. return probe.Unknown, "", err
  153. }
  154. path := p.HTTPGet.Path
  155. klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
  156. url := formatURL(scheme, host, port, path)
  157. headers := buildHeader(p.HTTPGet.HTTPHeaders)
  158. klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
  159. if probeType == liveness {
  160. return pb.livenessHttp.Probe(url, headers, timeout)
  161. } else { // readiness
  162. return pb.readinessHttp.Probe(url, headers, timeout)
  163. }
  164. }
  165. if p.TCPSocket != nil {
  166. port, err := extractPort(p.TCPSocket.Port, container)
  167. if err != nil {
  168. return probe.Unknown, "", err
  169. }
  170. host := p.TCPSocket.Host
  171. if host == "" {
  172. host = status.PodIP
  173. }
  174. klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
  175. return pb.tcp.Probe(host, port, timeout)
  176. }
  177. klog.Warningf("Failed to find probe builder for container: %v", container)
  178. return probe.Unknown, "", fmt.Errorf("Missing probe handler for %s:%s", format.Pod(pod), container.Name)
  179. }
  180. func extractPort(param intstr.IntOrString, container v1.Container) (int, error) {
  181. port := -1
  182. var err error
  183. switch param.Type {
  184. case intstr.Int:
  185. port = param.IntValue()
  186. case intstr.String:
  187. if port, err = findPortByName(container, param.StrVal); err != nil {
  188. // Last ditch effort - maybe it was an int stored as string?
  189. if port, err = strconv.Atoi(param.StrVal); err != nil {
  190. return port, err
  191. }
  192. }
  193. default:
  194. return port, fmt.Errorf("IntOrString had no kind: %+v", param)
  195. }
  196. if port > 0 && port < 65536 {
  197. return port, nil
  198. }
  199. return port, fmt.Errorf("invalid port number: %v", port)
  200. }
  201. // findPortByName is a helper function to look up a port in a container by name.
  202. func findPortByName(container v1.Container, portName string) (int, error) {
  203. for _, port := range container.Ports {
  204. if port.Name == portName {
  205. return int(port.ContainerPort), nil
  206. }
  207. }
  208. return 0, fmt.Errorf("port %s not found", portName)
  209. }
  210. // formatURL formats a URL from args. For testability.
  211. func formatURL(scheme string, host string, port int, path string) *url.URL {
  212. u, err := url.Parse(path)
  213. // Something is busted with the path, but it's too late to reject it. Pass it along as is.
  214. if err != nil {
  215. u = &url.URL{
  216. Path: path,
  217. }
  218. }
  219. u.Scheme = scheme
  220. u.Host = net.JoinHostPort(host, strconv.Itoa(port))
  221. return u
  222. }
// execInContainer is the value newExecInContainer returns as an exec.Cmd.
// Only CombinedOutput does real work, by delegating to run; the remaining
// methods are stubs that either do nothing or return an "unimplemented"
// error.
type execInContainer struct {
	// run executes a command in a container. Combined stdout and stderr output is always returned. An
	// error is returned if one occurred.
	run func() ([]byte, error)
}
  228. func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
  229. return execInContainer{func() ([]byte, error) {
  230. return pb.runner.RunInContainer(containerID, cmd, timeout)
  231. }}
  232. }
// Run is not supported by execInContainer; it always returns an
// "unimplemented" error.
func (eic execInContainer) Run() error {
	return fmt.Errorf("unimplemented")
}
// CombinedOutput invokes the wrapped run function and returns its output and
// error unchanged.
func (eic execInContainer) CombinedOutput() ([]byte, error) {
	return eic.run()
}
// Output is not supported by execInContainer; it always returns nil output
// and an "unimplemented" error.
func (eic execInContainer) Output() ([]byte, error) {
	return nil, fmt.Errorf("unimplemented")
}
// SetDir is a no-op; dir is ignored.
func (eic execInContainer) SetDir(dir string) {
	//unimplemented
}
// SetStdin is a no-op; in is ignored.
func (eic execInContainer) SetStdin(in io.Reader) {
	//unimplemented
}
// SetStdout is a no-op; out is ignored.
func (eic execInContainer) SetStdout(out io.Writer) {
	//unimplemented
}
// SetStderr is a no-op; out is ignored.
func (eic execInContainer) SetStderr(out io.Writer) {
	//unimplemented
}
// SetEnv is a no-op; env is ignored.
func (eic execInContainer) SetEnv(env []string) {
	//unimplemented
}
// Stop is a no-op.
func (eic execInContainer) Stop() {
	//unimplemented
}
// Start is not supported by execInContainer; it always returns an
// "unimplemented" error.
func (eic execInContainer) Start() error {
	return fmt.Errorf("unimplemented")
}
// Wait is not supported by execInContainer; it always returns an
// "unimplemented" error.
func (eic execInContainer) Wait() error {
	return fmt.Errorf("unimplemented")
}
// StdoutPipe is not supported by execInContainer; it always returns a nil
// pipe and an "unimplemented" error.
func (eic execInContainer) StdoutPipe() (io.ReadCloser, error) {
	return nil, fmt.Errorf("unimplemented")
}
// StderrPipe is not supported by execInContainer; it always returns a nil
// pipe and an "unimplemented" error.
func (eic execInContainer) StderrPipe() (io.ReadCloser, error) {
	return nil, fmt.Errorf("unimplemented")
}