prober_manager_test.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober

import (
	"fmt"
	"strconv"
	"testing"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/probe"
)

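// Make panics in prober goroutines crash the test binary instead of being
// swallowed by runtime.HandleCrash, so they surface as test failures.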
func init() {
	runtime.ReallyCrash = true
}

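// defaultProbe is a minimal exec probe shared by the test pods below.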
var defaultProbe *v1.Probe = &v1.Probe{
	Handler: v1.Handler{
		Exec: &v1.ExecAction{},
	},
	TimeoutSeconds:   1,
	PeriodSeconds:    1,
	SuccessThreshold: 1,
	FailureThreshold: 3,
}

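// TestAddRemovePods verifies that probe workers are started when pods with
// probes are added and shut down when those pods are removed.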
func TestAddRemovePods(t *testing.T) {
	noProbePod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: "no_probe_pod",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "no_probe1",
			}, {
				Name: "no_probe2",
			}},
		},
	}

	probePod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: "probe_pod",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "no_probe1",
			}, {
				Name:           "readiness",
				ReadinessProbe: defaultProbe,
			}, {
				Name: "no_probe2",
			}, {
				Name:          "liveness",
				LivenessProbe: defaultProbe,
			}},
		},
	}

	m := newTestManager()
	defer cleanup(t, m)
	if err := expectProbes(m, nil); err != nil {
		t.Error(err)
	}

	// Adding a pod with no probes should be a no-op.
	m.AddPod(&noProbePod)
	if err := expectProbes(m, nil); err != nil {
		t.Error(err)
	}

	// Adding a pod with probes.
	m.AddPod(&probePod)
	probePaths := []probeKey{
		{"probe_pod", "readiness", readiness},
		{"probe_pod", "liveness", liveness},
	}
	if err := expectProbes(m, probePaths); err != nil {
		t.Error(err)
	}

	// Removing un-probed pod.
	m.RemovePod(&noProbePod)
	if err := expectProbes(m, probePaths); err != nil {
		t.Error(err)
	}

	// Removing probed pod.
	m.RemovePod(&probePod)
	if err := waitForWorkerExit(m, probePaths); err != nil {
		t.Fatal(err)
	}
	if err := expectProbes(m, nil); err != nil {
		t.Error(err)
	}

	// Removing already removed pods should be a no-op.
	m.RemovePod(&probePod)
	if err := expectProbes(m, nil); err != nil {
		t.Error(err)
	}
}

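// TestCleanupPods verifies that CleanupPods removes the workers for pods that
// are no longer desired while keeping those for pods that are.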
func TestCleanupPods(t *testing.T) {
	m := newTestManager()
	defer cleanup(t, m)
	podToCleanup := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: "pod_cleanup",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:           "prober1",
				ReadinessProbe: defaultProbe,
			}, {
				Name:          "prober2",
				LivenessProbe: defaultProbe,
			}},
		},
	}
	podToKeep := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: "pod_keep",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:           "prober1",
				ReadinessProbe: defaultProbe,
			}, {
				Name:          "prober2",
				LivenessProbe: defaultProbe,
			}},
		},
	}
	m.AddPod(&podToCleanup)
	m.AddPod(&podToKeep)

	m.CleanupPods([]*v1.Pod{&podToKeep})

	removedProbes := []probeKey{
		{"pod_cleanup", "prober1", readiness},
		{"pod_cleanup", "prober2", liveness},
	}
	expectedProbes := []probeKey{
		{"pod_keep", "prober1", readiness},
		{"pod_keep", "prober2", liveness},
	}
	if err := waitForWorkerExit(m, removedProbes); err != nil {
		t.Fatal(err)
	}
	if err := expectProbes(m, expectedProbes); err != nil {
		t.Error(err)
	}
}

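// TestCleanupRepeated adds many probed pods and then calls CleanupPods
// repeatedly to check that repeated cleanup is safe.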
func TestCleanupRepeated(t *testing.T) {
	m := newTestManager()
	defer cleanup(t, m)
	podTemplate := v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:           "prober1",
				ReadinessProbe: defaultProbe,
				LivenessProbe:  defaultProbe,
			}},
		},
	}

	const numTestPods = 100
	for i := 0; i < numTestPods; i++ {
		pod := podTemplate
		pod.UID = types.UID(strconv.Itoa(i))
		m.AddPod(&pod)
	}

	for i := 0; i < 10; i++ {
		m.CleanupPods([]*v1.Pod{})
	}
}

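// TestUpdatePodStatus verifies that UpdatePodStatus fills in container
// readiness from the cached probe results: running containers without a
// readiness worker default to ready, probed containers take their cached
// result (or unready if none is cached yet), and terminated containers are
// always unready.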
func TestUpdatePodStatus(t *testing.T) {
	unprobed := v1.ContainerStatus{
		Name:        "unprobed_container",
		ContainerID: "test://unprobed_container_id",
		State: v1.ContainerState{
			Running: &v1.ContainerStateRunning{},
		},
	}
	probedReady := v1.ContainerStatus{
		Name:        "probed_container_ready",
		ContainerID: "test://probed_container_ready_id",
		State: v1.ContainerState{
			Running: &v1.ContainerStateRunning{},
		},
	}
	probedPending := v1.ContainerStatus{
		Name:        "probed_container_pending",
		ContainerID: "test://probed_container_pending_id",
		State: v1.ContainerState{
			Running: &v1.ContainerStateRunning{},
		},
	}
	probedUnready := v1.ContainerStatus{
		Name:        "probed_container_unready",
		ContainerID: "test://probed_container_unready_id",
		State: v1.ContainerState{
			Running: &v1.ContainerStateRunning{},
		},
	}
	terminated := v1.ContainerStatus{
		Name:        "terminated_container",
		ContainerID: "test://terminated_container_id",
		State: v1.ContainerState{
			Terminated: &v1.ContainerStateTerminated{},
		},
	}
	podStatus := v1.PodStatus{
		Phase: v1.PodRunning,
		ContainerStatuses: []v1.ContainerStatus{
			unprobed, probedReady, probedPending, probedUnready, terminated,
		},
	}

	m := newTestManager()
	// no cleanup: using fake workers.

	// Setup probe "workers" and cached results.
	m.workers = map[probeKey]*worker{
		{testPodUID, unprobed.Name, liveness}:       {},
		{testPodUID, probedReady.Name, readiness}:   {},
		{testPodUID, probedPending.Name, readiness}: {},
		{testPodUID, probedUnready.Name, readiness}: {},
		{testPodUID, terminated.Name, readiness}:    {},
	}
	m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, &v1.Pod{})
	m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, &v1.Pod{})
	m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, &v1.Pod{})

	m.UpdatePodStatus(testPodUID, &podStatus)

	expectedReadiness := map[probeKey]bool{
		{testPodUID, unprobed.Name, readiness}:      true,
		{testPodUID, probedReady.Name, readiness}:   true,
		{testPodUID, probedPending.Name, readiness}: false,
		{testPodUID, probedUnready.Name, readiness}: false,
		{testPodUID, terminated.Name, readiness}:    false,
	}
	for _, c := range podStatus.ContainerStatuses {
		expected, ok := expectedReadiness[probeKey{testPodUID, c.Name, readiness}]
		if !ok {
			t.Fatalf("Missing expectation for test case: %v", c.Name)
		}
		if expected != c.Ready {
			t.Errorf("Unexpected readiness for container %v: Expected %v but got %v",
				c.Name, expected, c.Ready)
		}
	}
}

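// TestUpdateReadiness verifies end-to-end that a readiness worker updates the
// status manager: the container becomes ready while its exec probe succeeds
// and unready once the probe fails.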
func TestUpdateReadiness(t *testing.T) {
	testPod := getTestPod()
	setTestProbe(testPod, readiness, v1.Probe{})
	m := newTestManager()
	defer cleanup(t, m)

	// Start syncing readiness without leaking the goroutine.
	stopCh := make(chan struct{})
	go wait.Until(m.updateReadiness, 0, stopCh)
	defer func() {
		close(stopCh)
		// Send an update to exit updateReadiness()
		m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{})
	}()

	exec := syncExecProber{}
	exec.set(probe.Success, nil)
	m.prober.exec = &exec

	m.statusManager.SetPodStatus(testPod, getTestRunningStatus())

	m.AddPod(testPod)
	probePaths := []probeKey{{testPodUID, testContainerName, readiness}}
	if err := expectProbes(m, probePaths); err != nil {
		t.Error(err)
	}

	// Wait for ready status.
	if err := waitForReadyStatus(m, true); err != nil {
		t.Error(err)
	}

	// Prober fails.
	exec.set(probe.Failure, nil)

	// Wait for failed status.
	if err := waitForReadyStatus(m, false); err != nil {
		t.Error(err)
	}
}

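// expectProbes checks that the manager's worker map contains exactly the
// expected probe keys.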
func expectProbes(m *manager, expectedProbes []probeKey) error {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	var unexpected []probeKey
	missing := make([]probeKey, len(expectedProbes))
	copy(missing, expectedProbes)

outer:
	for probePath := range m.workers {
		for i, expectedPath := range missing {
			if probePath == expectedPath {
				missing = append(missing[:i], missing[i+1:]...)
				continue outer
			}
		}
		unexpected = append(unexpected, probePath)
	}

	if len(missing) == 0 && len(unexpected) == 0 {
		return nil // Yay!
	}

	return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
}

const interval = 1 * time.Second

// Wait for the given workers to exit & clean up.
func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
	for _, w := range workerPaths {
		condition := func() (bool, error) {
			_, exists := m.getWorker(w.podUID, w.containerName, w.probeType)
			return !exists, nil
		}
		if exited, _ := condition(); exited {
			continue // Already exited, no need to poll.
		}
		klog.Infof("Polling %v", w)
		if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
			return err
		}
	}

	return nil
}

// Wait for the test pod's status to report the given ready state.
func waitForReadyStatus(m *manager, ready bool) error {
	condition := func() (bool, error) {
		status, ok := m.statusManager.GetPodStatus(testPodUID)
		if !ok {
			return false, fmt.Errorf("status not found: %q", testPodUID)
		}
		if len(status.ContainerStatuses) != 1 {
			return false, fmt.Errorf("expected single container, found %d", len(status.ContainerStatuses))
		}
		if status.ContainerStatuses[0].ContainerID != testContainerID.String() {
			return false, fmt.Errorf("expected container %q, found %q",
				testContainerID, status.ContainerStatuses[0].ContainerID)
		}
		return status.ContainerStatuses[0].Ready == ready, nil
	}
	klog.Infof("Polling for ready state %v", ready)
	if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
		return err
	}

	return nil
}

// cleanup running probes to avoid leaking goroutines.
func cleanup(t *testing.T, m *manager) {
	m.CleanupPods(nil)

	condition := func() (bool, error) {
		workerCount := m.workerCount()
		if workerCount > 0 {
			klog.Infof("Waiting for %d workers to exit...", workerCount)
		}
		return workerCount == 0, nil
	}
	if exited, _ := condition(); exited {
		return // Already exited, no need to poll.
	}
	if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
		t.Fatalf("Error during cleanup: %v", err)
	}
}