prober_manager.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
)

// ProberResults stores the cumulative number of probes by result as prometheus metrics.
var ProberResults = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: "prober",
		Name:      "probe_total",
		Help:      "Cumulative number of a liveness or readiness probe for a container by result.",
	},
	[]string{"probe_type",
		"result",
		"container",
		"pod",
		"namespace",
		"pod_uid"},
)

// Manager manages pod probing. It creates a probe "worker" for every container that specifies a
// probe (AddPod). The worker periodically probes its assigned container and caches the results. The
// manager uses the cached probe results to set the appropriate Ready state in the PodStatus when
// requested (UpdatePodStatus). Updating probe parameters is not currently supported.
// TODO: Move liveness probing out of the runtime, to here.
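//
// A minimal usage sketch from a caller's point of view (the pod, pod status,
// and collaborating managers are assumed to be supplied by the kubelet; they
// are not defined in this file):
//
//	m := prober.NewManager(statusManager, livenessManager, runner, refManager, recorder)
//	m.Start()                              // begin syncing readiness results
//	m.AddPod(pod)                          // start probe workers for the pod's containers
//	m.UpdatePodStatus(pod.UID, &podStatus) // fill in Ready based on cached results
//	m.RemovePod(pod)                       // stop workers and drop cached results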
type Manager interface {
	// AddPod creates new probe workers for every container probe. This should be called for every
	// pod created.
	AddPod(pod *v1.Pod)

	// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
	// deleting cached results.
	RemovePod(pod *v1.Pod)

	// CleanupPods handles cleaning up pods which should no longer be running.
	// It takes a list of "active pods" which should not be cleaned up.
	CleanupPods(activePods []*v1.Pod)

	// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
	// container based on container running status, cached probe results and worker states.
	UpdatePodStatus(types.UID, *v1.PodStatus)

	// Start starts the Manager sync loops.
	Start()
}

type manager struct {
	// Map of active workers for probes
	workers map[probeKey]*worker
	// Lock for accessing & mutating workers
	workerLock sync.RWMutex

	// The statusManager cache provides pod IP and container IDs for probing.
	statusManager status.Manager

	// readinessManager manages the results of readiness probes
	readinessManager results.Manager

	// livenessManager manages the results of liveness probes
	livenessManager results.Manager

	// prober executes the probe actions.
	prober *prober
}
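
// NewManager creates a Manager for pod probing, wiring together the prober,
// the readiness results cache, and the supplied status and liveness managers.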
func NewManager(
	statusManager status.Manager,
	livenessManager results.Manager,
	runner kubecontainer.ContainerCommandRunner,
	refManager *kubecontainer.RefManager,
	recorder record.EventRecorder) Manager {

	prober := newProber(runner, refManager, recorder)
	readinessManager := results.NewManager()
	return &manager{
		statusManager:    statusManager,
		prober:           prober,
		readinessManager: readinessManager,
		livenessManager:  livenessManager,
		workers:          make(map[probeKey]*worker),
	}
}

// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
	// Start syncing readiness.
	go wait.Forever(m.updateReadiness, 0)
}

// Key uniquely identifying container probes
type probeKey struct {
	podUID        types.UID
	containerName string
	probeType     probeType
}

// Type of probe (readiness or liveness)
type probeType int

const (
	liveness probeType = iota
	readiness

	probeResultSuccessful string = "successful"
	probeResultFailed     string = "failed"
	probeResultUnknown    string = "unknown"
)

// For debugging.
func (t probeType) String() string {
	switch t {
	case readiness:
		return "Readiness"
	case liveness:
		return "Liveness"
	default:
		return "UNKNOWN"
	}
}
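
// AddPod starts a probe worker, and a goroutine to run it, for every readiness
// and liveness probe defined on the pod's containers.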
func (m *manager) AddPod(pod *v1.Pod) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()

	key := probeKey{podUID: pod.UID}
	for _, c := range pod.Spec.Containers {
		key.containerName = c.Name

		if c.ReadinessProbe != nil {
			key.probeType = readiness
			if _, ok := m.workers[key]; ok {
				klog.Errorf("Readiness probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, readiness, pod, c)
			m.workers[key] = w
			go w.run()
		}

		if c.LivenessProbe != nil {
			key.probeType = liveness
			if _, ok := m.workers[key]; ok {
				klog.Errorf("Liveness probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, liveness, pod, c)
			m.workers[key] = w
			go w.run()
		}
	}
}

func (m *manager) RemovePod(pod *v1.Pod) {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	key := probeKey{podUID: pod.UID}
	for _, c := range pod.Spec.Containers {
		key.containerName = c.Name
		for _, probeType := range [...]probeType{readiness, liveness} {
			key.probeType = probeType
			if worker, ok := m.workers[key]; ok {
				worker.stop()
			}
		}
	}
}

func (m *manager) CleanupPods(activePods []*v1.Pod) {
	desiredPods := make(map[types.UID]sets.Empty)
	for _, pod := range activePods {
		desiredPods[pod.UID] = sets.Empty{}
	}

	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	for key, worker := range m.workers {
		if _, ok := desiredPods[key.podUID]; !ok {
			worker.stop()
		}
	}
}
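
// UpdatePodStatus sets the Ready field of each container status based on the
// container's running state, the cached readiness result, and whether a
// readiness worker exists for it. Init containers are marked ready once they
// have terminated with exit code 0.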
func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
	for i, c := range podStatus.ContainerStatuses {
		var ready bool
		if c.State.Running == nil {
			ready = false
		} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
			ready = result == results.Success
		} else {
			// Check whether there is a readiness probe which hasn't run yet.
			_, exists := m.getWorker(podUID, c.Name, readiness)
			ready = !exists
		}
		podStatus.ContainerStatuses[i].Ready = ready
	}
	// init containers are ready if they have exited with success or if a readiness probe has
	// succeeded.
	for i, c := range podStatus.InitContainerStatuses {
		var ready bool
		if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
			ready = true
		}
		podStatus.InitContainerStatuses[i].Ready = ready
	}
}
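
// getWorker returns the probe worker for the given pod, container, and probe
// type, if one exists.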
func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()
	worker, ok := m.workers[probeKey{podUID, containerName, probeType}]
	return worker, ok
}

// Called by the worker after exiting.
func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()
	delete(m.workers, probeKey{podUID, containerName, probeType})
}

// workerCount returns the total number of probe workers. For testing.
func (m *manager) workerCount() int {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()
	return len(m.workers)
}
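
// updateReadiness blocks until a single readiness update is available, then
// propagates the result to the status manager. Start runs it in a loop.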
func (m *manager) updateReadiness() {
	update := <-m.readinessManager.Updates()

	ready := update.Result == results.Success
	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}