prober_manager.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/tools/record"
	"k8s.io/component-base/metrics"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/features"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
)

// ProberResults stores the cumulative number of a probe by result as prometheus metrics.
var ProberResults = metrics.NewCounterVec(
	&metrics.CounterOpts{
		Subsystem:      "prober",
		Name:           "probe_total",
		Help:           "Cumulative number of a liveness, readiness or startup probe for a container by result.",
		StabilityLevel: metrics.ALPHA,
	},
	[]string{"probe_type",
		"result",
		"container",
		"pod",
		"namespace",
		"pod_uid"},
)

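// Illustrative sketch, not part of the original file: a caller holding a pod, a
// container and a probeType value (hypothetical names below) could record a
// probe outcome against this counter roughly as follows, using the probeResult*
// constants declared further down as the "result" label:
//
//	ProberResults.With(metrics.Labels{
//		"probe_type": pt.String(), // e.g. "Readiness"
//		"result":     probeResultSuccessful,
//		"container":  c.Name,
//		"pod":        pod.Name,
//		"namespace":  pod.Namespace,
//		"pod_uid":    string(pod.UID),
//	}).Inc()
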
// Manager manages pod probing. It creates a probe "worker" for every container that specifies a
// probe (AddPod). The worker periodically probes its assigned container and caches the results. The
// manager uses the cached probe results to set the appropriate Ready state in the PodStatus when
// requested (UpdatePodStatus). Updating probe parameters is not currently supported.
// TODO: Move liveness probing out of the runtime, to here.
type Manager interface {
	// AddPod creates new probe workers for every container probe. This should be called for every
	// pod created.
	AddPod(pod *v1.Pod)

	// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
	// deleting cached results.
	RemovePod(pod *v1.Pod)

	// CleanupPods handles cleaning up pods which should no longer be running.
	// It takes a map of "desired pods" which should not be cleaned up.
	CleanupPods(desiredPods map[types.UID]sets.Empty)

	// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
	// container based on container running status, cached probe results and worker states.
	UpdatePodStatus(types.UID, *v1.PodStatus)

	// Start starts the Manager sync loops.
	Start()
}

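// Illustrative lifecycle sketch, inferred from the interface documentation above
// (variable names are hypothetical): a kubelet-style caller would typically use
// a Manager along these lines:
//
//	m := NewManager(statusManager, livenessManager, startupManager, runner, refManager, recorder)
//	m.Start()                           // begin consuming readiness/startup updates
//	m.AddPod(pod)                       // on pod creation: spawn probe workers
//	m.UpdatePodStatus(pod.UID, &status) // on status sync: fill in Ready/Started
//	m.RemovePod(pod)                    // on pod deletion: stop its workers
//	m.CleanupPods(desiredPods)          // during housekeeping: stop orphaned workers
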
type manager struct {
	// Map of active workers for probes
	workers map[probeKey]*worker
	// Lock for accessing & mutating workers
	workerLock sync.RWMutex

	// The statusManager cache provides pod IP and container IDs for probing.
	statusManager status.Manager

	// readinessManager manages the results of readiness probes
	readinessManager results.Manager

	// livenessManager manages the results of liveness probes
	livenessManager results.Manager

	// startupManager manages the results of startup probes
	startupManager results.Manager

	// prober executes the probe actions.
	prober *prober
}

// NewManager creates a Manager for pod probing.
func NewManager(
	statusManager status.Manager,
	livenessManager results.Manager,
	startupManager results.Manager,
	runner kubecontainer.ContainerCommandRunner,
	refManager *kubecontainer.RefManager,
	recorder record.EventRecorder) Manager {

	prober := newProber(runner, refManager, recorder)
	readinessManager := results.NewManager()
	return &manager{
		statusManager:    statusManager,
		prober:           prober,
		readinessManager: readinessManager,
		livenessManager:  livenessManager,
		startupManager:   startupManager,
		workers:          make(map[probeKey]*worker),
	}
}

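// Note (editorial): the readiness results manager is constructed here rather
// than injected, while the liveness and startup results managers are supplied
// by the caller. Within this file, readiness results are consumed only by the
// manager's own updateReadiness loop below.
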
// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
	// Start syncing readiness.
	go wait.Forever(m.updateReadiness, 0)
	// Start syncing startup.
	go wait.Forever(m.updateStartup, 0)
}

// Key uniquely identifying container probes
type probeKey struct {
	podUID        types.UID
	containerName string
	probeType     probeType
}

// Type of probe (liveness, readiness or startup)
type probeType int

const (
	liveness probeType = iota
	readiness
	startup

	probeResultSuccessful string = "successful"
	probeResultFailed     string = "failed"
	probeResultUnknown    string = "unknown"
)

// For debugging.
func (t probeType) String() string {
	switch t {
	case readiness:
		return "Readiness"
	case liveness:
		return "Liveness"
	case startup:
		return "Startup"
	default:
		return "UNKNOWN"
	}
}

func (m *manager) AddPod(pod *v1.Pod) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()

	key := probeKey{podUID: pod.UID}
	for _, c := range pod.Spec.Containers {
		key.containerName = c.Name

		if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
			key.probeType = startup
			if _, ok := m.workers[key]; ok {
				klog.Errorf("Startup probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, startup, pod, c)
			m.workers[key] = w
			go w.run()
		}

		if c.ReadinessProbe != nil {
			key.probeType = readiness
			if _, ok := m.workers[key]; ok {
				klog.Errorf("Readiness probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, readiness, pod, c)
			m.workers[key] = w
			go w.run()
		}

		if c.LivenessProbe != nil {
			key.probeType = liveness
			if _, ok := m.workers[key]; ok {
				klog.Errorf("Liveness probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, liveness, pod, c)
			m.workers[key] = w
			go w.run()
		}
	}
}

func (m *manager) RemovePod(pod *v1.Pod) {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	key := probeKey{podUID: pod.UID}
	for _, c := range pod.Spec.Containers {
		key.containerName = c.Name
		for _, probeType := range [...]probeType{readiness, liveness, startup} {
			key.probeType = probeType
			if worker, ok := m.workers[key]; ok {
				worker.stop()
			}
		}
	}
}

func (m *manager) CleanupPods(desiredPods map[types.UID]sets.Empty) {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	for key, worker := range m.workers {
		if _, ok := desiredPods[key.podUID]; !ok {
			worker.stop()
		}
	}
}

func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
	for i, c := range podStatus.ContainerStatuses {
		var ready bool
		if c.State.Running == nil {
			ready = false
		} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
			ready = result == results.Success
		} else {
			// Check whether there is a probe which hasn't run yet.
			_, exists := m.getWorker(podUID, c.Name, readiness)
			ready = !exists
		}
		podStatus.ContainerStatuses[i].Ready = ready

		var started bool
		if c.State.Running == nil {
			started = false
		} else if !utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
			// The container is running; assume it is started if the StartupProbe feature is disabled.
			started = true
		} else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
			started = result == results.Success
		} else {
			// Check whether there is a probe which hasn't run yet.
			_, exists := m.getWorker(podUID, c.Name, startup)
			started = !exists
		}
		podStatus.ContainerStatuses[i].Started = &started
	}

	// Init containers are ready if they have exited with success or if a readiness probe has
	// succeeded.
	for i, c := range podStatus.InitContainerStatuses {
		var ready bool
		if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
			ready = true
		}
		podStatus.InitContainerStatuses[i].Ready = ready
	}
}

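// Summary of the decisions above (editorial, same order as the code): a
// container that is not running is never Ready; a cached readiness result, if
// present, decides; otherwise the container is Ready only when no readiness
// worker exists for it, i.e. no probe is configured, since an existing worker
// that has not reported yet means the probe simply hasn't run. Started follows
// the same pattern using the startup probe, except that any running container
// is assumed started when the StartupProbe feature gate is disabled.
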
// getWorker returns the probe worker for the given pod, container and probe type, if one exists.
func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()
	worker, ok := m.workers[probeKey{podUID, containerName, probeType}]
	return worker, ok
}

// Called by the worker after exiting.
func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()
	delete(m.workers, probeKey{podUID, containerName, probeType})
}

// workerCount returns the total number of probe workers. For testing.
func (m *manager) workerCount() int {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()
	return len(m.workers)
}

func (m *manager) updateReadiness() {
	update := <-m.readinessManager.Updates()

	ready := update.Result == results.Success
	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}

func (m *manager) updateStartup() {
	update := <-m.startupManager.Updates()

	started := update.Result == results.Success
	m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
}
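
// Note (editorial): each call to updateReadiness and updateStartup blocks until
// one update arrives on the corresponding results channel, so the
// wait.Forever(..., 0) loops started in Start process the update streams one
// result at a time without busy-waiting.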