worker.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
	"math/rand"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/component-base/metrics"
	"k8s.io/klog"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
)
// worker handles the periodic probing of its assigned container. Each worker has a goroutine
// associated with it which runs the probe loop until the container permanently terminates, or the
// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
// container IDs.
type worker struct {
	// Channel for stopping the probe.
	stopCh chan struct{}

	// The pod containing this probe (read-only)
	pod *v1.Pod

	// The container to probe (read-only)
	container v1.Container

	// Describes the probe configuration (read-only)
	spec *v1.Probe

	// The type of the worker.
	probeType probeType

	// The probe value during the initial delay.
	initialValue results.Result

	// Where to store this worker's results.
	resultsManager results.Manager
	probeManager   *manager

	// The last known container ID for this worker.
	containerID kubecontainer.ContainerID

	// The last probe result for this worker.
	lastResult results.Result

	// How many times in a row the probe has returned the same result.
	resultRun int

	// If set, skip probing.
	onHold bool

	// proberResultsMetricLabels holds the labels attached to this worker
	// for the ProberResults metric by result.
	proberResultsSuccessfulMetricLabels metrics.Labels
	proberResultsFailedMetricLabels     metrics.Labels
	proberResultsUnknownMetricLabels    metrics.Labels
}
// Creates and starts a new probe worker.
func newWorker(
	m *manager,
	probeType probeType,
	pod *v1.Pod,
	container v1.Container) *worker {

	w := &worker{
		stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
		pod:          pod,
		container:    container,
		probeType:    probeType,
		probeManager: m,
	}

	switch probeType {
	case readiness:
		w.spec = container.ReadinessProbe
		w.resultsManager = m.readinessManager
		w.initialValue = results.Failure
	case liveness:
		w.spec = container.LivenessProbe
		w.resultsManager = m.livenessManager
		w.initialValue = results.Success
	case startup:
		w.spec = container.StartupProbe
		w.resultsManager = m.startupManager
		w.initialValue = results.Unknown
	}

	basicMetricLabels := metrics.Labels{
		"probe_type": w.probeType.String(),
		"container":  w.container.Name,
		"pod":        w.pod.Name,
		"namespace":  w.pod.Namespace,
		"pod_uid":    string(w.pod.UID),
	}

	w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
	w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful

	w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
	w.proberResultsFailedMetricLabels["result"] = probeResultFailed

	w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
	w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown

	return w
}
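
// A rough sketch of the expected call pattern, assuming the wiring in the probe
// manager (not shown in this file): the manager constructs one worker per
// (pod, container, probe type) and runs the probe loop in its own goroutine,
// roughly:
//
//	w := newWorker(m, readiness, pod, container)
//	go w.run()
//
// The bookkeeping around the manager's workers map lives in the manager, not here.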
// run periodically probes the container.
func (w *worker) run() {
	probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

	// If the kubelet restarted, the probes could be started in rapid succession.
	// Let the worker wait for a random portion of probeTickerPeriod before probing.
	time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

	probeTicker := time.NewTicker(probeTickerPeriod)

	defer func() {
		// Clean up.
		probeTicker.Stop()
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}

		w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
		ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
		ProberResults.Delete(w.proberResultsFailedMetricLabels)
		ProberResults.Delete(w.proberResultsUnknownMetricLabels)
	}()

probeLoop:
	for w.doProbe() {
		// Wait for next probe tick.
		select {
		case <-w.stopCh:
			break probeLoop
		case <-probeTicker.C:
			// continue
		}
	}
}
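
// A worked example of the jitter above, assuming PeriodSeconds is 10:
//
//	probeTickerPeriod == 10 * time.Second
//	rand.Float64() == 0.37 (say)  =>  initial sleep of roughly 3.7s
//
// after which the ticker fires every 10s. The random offset spreads probe load
// when many workers start at once, e.g. after a kubelet restart.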
// stop stops the probe worker. The worker handles cleanup and removes itself from its manager.
// It is safe to call stop multiple times.
func (w *worker) stop() {
	select {
	case w.stopCh <- struct{}{}:
	default: // Non-blocking.
	}
}
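
// A small usage note: stopCh is buffered with capacity one, so the first stop()
// queues a signal even while run() is mid-probe, and repeated calls hit the
// default case instead of blocking, e.g.:
//
//	w.stop() // queues the stop signal
//	w.stop() // no-op; a signal is already pending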
// doProbe probes the container once and records the result.
// Returns whether the worker should continue.
func (w *worker) doProbe() (keepGoing bool) {
	defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
	defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

	status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
	if !ok {
		// Either the pod has not been created yet, or it was already deleted.
		klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
		return true
	}

	// Worker should terminate if pod is terminated.
	if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
		klog.V(3).Infof("Pod %v %v, exiting probe worker",
			format.Pod(w.pod), status.Phase)
		return false
	}

	c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
	if !ok || len(c.ContainerID) == 0 {
		// Either the container has not been created yet, or it was deleted.
		klog.V(3).Infof("Probe target container not found: %v - %v",
			format.Pod(w.pod), w.container.Name)
		return true // Wait for more information.
	}

	if w.containerID.String() != c.ContainerID {
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}
		w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
		w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
		// We've got a new container; resume probing.
		w.onHold = false
	}

	if w.onHold {
		// Worker is on hold until there is a new container.
		return true
	}

	if c.State.Running == nil {
		klog.V(3).Infof("Non-running container probed: %v - %v",
			format.Pod(w.pod), w.container.Name)
		if !w.containerID.IsEmpty() {
			w.resultsManager.Set(w.containerID, results.Failure, w.pod)
		}
		// Abort if the container will not be restarted.
		return c.State.Terminated == nil ||
			w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
	}

	// Probe disabled for InitialDelaySeconds.
	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
		return true
	}

	if c.Started != nil && *c.Started {
		// Stop probing for startup once container has started.
		if w.probeType == startup {
			return true
		}
	} else {
		// Disable other probes until container has started.
		if w.probeType != startup {
			return true
		}
	}

	// TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
	// the full container environment here, OR we must make a call to the CRI in order to get those environment
	// values from the running container.
	result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
	if err != nil {
		// Prober error, throw away the result.
		return true
	}

	switch result {
	case results.Success:
		ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
	case results.Failure:
		ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
	default:
		ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
	}

	if w.lastResult == result {
		w.resultRun++
	} else {
		w.lastResult = result
		w.resultRun = 1
	}

	if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
		(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
		// Success or failure is below threshold - leave the probe state unchanged.
		return true
	}

	w.resultsManager.Set(w.containerID, result, w.pod)

	if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
		// The container failed a liveness/startup check; it will need to be restarted.
		// Stop probing until we see a new container ID. This is to reduce the
		// chance of hitting #21751, where running `docker exec` when a
		// container is being stopped may lead to corrupted container state.
		w.onHold = true
		w.resultRun = 0
	}

	return true
}
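
// A worked example of the threshold handling above, assuming a probe spec such as:
//
//	probe := &v1.Probe{
//		InitialDelaySeconds: 5,
//		PeriodSeconds:       10,
//		SuccessThreshold:    1,
//		FailureThreshold:    3,
//	}
//
// Two consecutive failures leave the recorded result unchanged; only the third
// consecutive failure calls resultsManager.Set with results.Failure, which for a
// liveness or startup probe also places the worker on hold until a new container
// ID is observed.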

func deepCopyPrometheusLabels(m metrics.Labels) metrics.Labels {
	ret := make(metrics.Labels, len(m))
	for k, v := range m {
		ret[k] = v
	}
	return ret
}