/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
	"fmt"
	"strconv"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/features"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/probe"
)

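// Make panics recovered by util/runtime.HandleCrash fatal instead of merely
// logged, so a panicking probe worker fails the test run.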
func init() {
	runtime.ReallyCrash = true
}

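// defaultProbe is a minimal exec probe shared by the test pods below:
// it probes every second and fails after three consecutive misses.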
var defaultProbe = &v1.Probe{
	Handler: v1.Handler{
		Exec: &v1.ExecAction{},
	},
	TimeoutSeconds:   1,
	PeriodSeconds:    1,
	SuccessThreshold: 1,
	FailureThreshold: 3,
}

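// TestAddRemovePods verifies that AddPod starts one worker per probed
// container and none for unprobed ones, and that RemovePod stops those
// workers; removing an unknown or already-removed pod must be a no-op.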
func TestAddRemovePods(t *testing.T) {
	noProbePod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: "no_probe_pod",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "no_probe1",
			}, {
				Name: "no_probe2",
			}, {
				Name: "no_probe3",
			}},
		},
	}
	probePod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: "probe_pod",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "probe1",
			}, {
				Name:           "readiness",
				ReadinessProbe: defaultProbe,
			}, {
				Name: "probe2",
			}, {
				Name:          "liveness",
				LivenessProbe: defaultProbe,
			}, {
				Name: "probe3",
			}, {
				Name:         "startup",
				StartupProbe: defaultProbe,
			}},
		},
	}

	m := newTestManager()
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StartupProbe, true)()
	defer cleanup(t, m)
	if err := expectProbes(m, nil); err != nil {
		t.Error(err)
	}

	// Adding a pod with no probes should be a no-op.
	m.AddPod(&noProbePod)
	if err := expectProbes(m, nil); err != nil {
		t.Error(err)
	}

	// Adding a pod with probes.
	m.AddPod(&probePod)
	probePaths := []probeKey{
		{"probe_pod", "readiness", readiness},
		{"probe_pod", "liveness", liveness},
		{"probe_pod", "startup", startup},
	}
	if err := expectProbes(m, probePaths); err != nil {
		t.Error(err)
	}

	// Removing un-probed pod.
	m.RemovePod(&noProbePod)
	if err := expectProbes(m, probePaths); err != nil {
		t.Error(err)
	}

	// Removing probed pod.
	m.RemovePod(&probePod)
	if err := waitForWorkerExit(m, probePaths); err != nil {
		t.Fatal(err)
	}
	if err := expectProbes(m, nil); err != nil {
		t.Error(err)
	}

	// Removing already removed pods should be a no-op.
	m.RemovePod(&probePod)
	if err := expectProbes(m, nil); err != nil {
		t.Error(err)
	}
}

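// TestCleanupPods verifies that CleanupPods stops the workers of every pod
// absent from the desired set while leaving the desired pods' workers running.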
func TestCleanupPods(t *testing.T) {
	m := newTestManager()
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StartupProbe, true)()
	defer cleanup(t, m)
	podToCleanup := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: "pod_cleanup",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:           "prober1",
				ReadinessProbe: defaultProbe,
			}, {
				Name:          "prober2",
				LivenessProbe: defaultProbe,
			}, {
				Name:         "prober3",
				StartupProbe: defaultProbe,
			}},
		},
	}
	podToKeep := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: "pod_keep",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:           "prober1",
				ReadinessProbe: defaultProbe,
			}, {
				Name:          "prober2",
				LivenessProbe: defaultProbe,
			}, {
				Name:         "prober3",
				StartupProbe: defaultProbe,
			}},
		},
	}
	m.AddPod(&podToCleanup)
	m.AddPod(&podToKeep)

	desiredPods := map[types.UID]sets.Empty{}
	desiredPods[podToKeep.UID] = sets.Empty{}
	m.CleanupPods(desiredPods)

	removedProbes := []probeKey{
		{"pod_cleanup", "prober1", readiness},
		{"pod_cleanup", "prober2", liveness},
		{"pod_cleanup", "prober3", startup},
	}
	expectedProbes := []probeKey{
		{"pod_keep", "prober1", readiness},
		{"pod_keep", "prober2", liveness},
		{"pod_keep", "prober3", startup},
	}
	if err := waitForWorkerExit(m, removedProbes); err != nil {
		t.Fatal(err)
	}
	if err := expectProbes(m, expectedProbes); err != nil {
		t.Error(err)
	}
}

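// TestCleanupRepeated verifies that calling CleanupPods repeatedly while many
// workers are still running is safe: it must not panic or deadlock.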
func TestCleanupRepeated(t *testing.T) {
	m := newTestManager()
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StartupProbe, true)()
	defer cleanup(t, m)
	podTemplate := v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:           "prober1",
				ReadinessProbe: defaultProbe,
				LivenessProbe:  defaultProbe,
				StartupProbe:   defaultProbe,
			}},
		},
	}

	const numTestPods = 100
	for i := 0; i < numTestPods; i++ {
		pod := podTemplate
		pod.UID = types.UID(strconv.Itoa(i))
		m.AddPod(&pod)
	}

	for i := 0; i < 10; i++ {
		m.CleanupPods(map[types.UID]sets.Empty{})
	}
}

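// TestUpdatePodStatus verifies how UpdatePodStatus derives each container's
// readiness from its state and the cached probe results.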
func TestUpdatePodStatus(t *testing.T) {
	unprobed := v1.ContainerStatus{
		Name:        "unprobed_container",
		ContainerID: "test://unprobed_container_id",
		State: v1.ContainerState{
			Running: &v1.ContainerStateRunning{},
		},
	}
	probedReady := v1.ContainerStatus{
		Name:        "probed_container_ready",
		ContainerID: "test://probed_container_ready_id",
		State: v1.ContainerState{
			Running: &v1.ContainerStateRunning{},
		},
	}
	probedPending := v1.ContainerStatus{
		Name:        "probed_container_pending",
		ContainerID: "test://probed_container_pending_id",
		State: v1.ContainerState{
			Running: &v1.ContainerStateRunning{},
		},
	}
	probedUnready := v1.ContainerStatus{
		Name:        "probed_container_unready",
		ContainerID: "test://probed_container_unready_id",
		State: v1.ContainerState{
			Running: &v1.ContainerStateRunning{},
		},
	}
	terminated := v1.ContainerStatus{
		Name:        "terminated_container",
		ContainerID: "test://terminated_container_id",
		State: v1.ContainerState{
			Terminated: &v1.ContainerStateTerminated{},
		},
	}
	podStatus := v1.PodStatus{
		Phase: v1.PodRunning,
		ContainerStatuses: []v1.ContainerStatus{
			unprobed, probedReady, probedPending, probedUnready, terminated,
		},
	}

	m := newTestManager()
	// no cleanup: using fake workers.

	// Setup probe "workers" and cached results.
	m.workers = map[probeKey]*worker{
		{testPodUID, unprobed.Name, liveness}:       {},
		{testPodUID, probedReady.Name, readiness}:   {},
		{testPodUID, probedPending.Name, readiness}: {},
		{testPodUID, probedUnready.Name, readiness}: {},
		{testPodUID, terminated.Name, readiness}:    {},
	}
	m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, &v1.Pod{})
	m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, &v1.Pod{})
	m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, &v1.Pod{})

	m.UpdatePodStatus(testPodUID, &podStatus)

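	// UpdatePodStatus should leave containers without a readiness worker
	// ready, honor the cached results for running containers, and force
	// containers that are pending (no result yet) or terminated to not-ready.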
	expectedReadiness := map[probeKey]bool{
		{testPodUID, unprobed.Name, readiness}:      true,
		{testPodUID, probedReady.Name, readiness}:   true,
		{testPodUID, probedPending.Name, readiness}: false,
		{testPodUID, probedUnready.Name, readiness}: false,
		{testPodUID, terminated.Name, readiness}:    false,
	}
	for _, c := range podStatus.ContainerStatuses {
		expected, ok := expectedReadiness[probeKey{testPodUID, c.Name, readiness}]
		if !ok {
			t.Fatalf("Missing expectation for test case: %v", c.Name)
		}
		if expected != c.Ready {
			t.Errorf("Unexpected readiness for container %v: Expected %v but got %v",
				c.Name, expected, c.Ready)
		}
	}
}

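// TestUpdateReadiness drives a real readiness worker end to end: the status
// manager must observe the container as ready while the prober succeeds and
// as not-ready once it starts failing.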
func TestUpdateReadiness(t *testing.T) {
	testPod := getTestPod()
	setTestProbe(testPod, readiness, v1.Probe{})
	m := newTestManager()
	defer cleanup(t, m)

	// Start syncing readiness without leaking goroutine.
	stopCh := make(chan struct{})
	go wait.Until(m.updateReadiness, 0, stopCh)
	defer func() {
		close(stopCh)
		// Send an update to exit updateReadiness()
		m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{})
	}()

	exec := syncExecProber{}
	exec.set(probe.Success, nil)
	m.prober.exec = &exec

	m.statusManager.SetPodStatus(testPod, getTestRunningStatus())

	m.AddPod(testPod)
	probePaths := []probeKey{{testPodUID, testContainerName, readiness}}
	if err := expectProbes(m, probePaths); err != nil {
		t.Error(err)
	}

	// Wait for ready status.
	if err := waitForReadyStatus(m, true); err != nil {
		t.Error(err)
	}

	// Prober fails.
	exec.set(probe.Failure, nil)

	// Wait for failed status.
	if err := waitForReadyStatus(m, false); err != nil {
		t.Error(err)
	}
}

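// expectProbes returns an error unless the manager's running workers match
// expectedProbes exactly.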
func expectProbes(m *manager, expectedProbes []probeKey) error {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	var unexpected []probeKey
	missing := make([]probeKey, len(expectedProbes))
	copy(missing, expectedProbes)

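	// Cross each running worker off the missing list; a worker that matches
	// nothing is unexpected, and whatever stays in missing was never started.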
outer:
	for probePath := range m.workers {
		for i, expectedPath := range missing {
			if probePath == expectedPath {
				missing = append(missing[:i], missing[i+1:]...)
				continue outer
			}
		}
		unexpected = append(unexpected, probePath)
	}

	if len(missing) == 0 && len(unexpected) == 0 {
		return nil // Yay!
	}
	return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
}

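// interval is the polling interval used by the wait helpers below.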
const interval = 1 * time.Second

// Wait for the given workers to exit & clean up.
func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
	for _, w := range workerPaths {
		condition := func() (bool, error) {
			_, exists := m.getWorker(w.podUID, w.containerName, w.probeType)
			return !exists, nil
		}
		if exited, _ := condition(); exited {
			continue // Already exited, no need to poll.
		}
		klog.Infof("Polling %v", w)
		if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
			return err
		}
	}
	return nil
}

// Wait for the pod's single container to report the given ready state.
func waitForReadyStatus(m *manager, ready bool) error {
	condition := func() (bool, error) {
		status, ok := m.statusManager.GetPodStatus(testPodUID)
		if !ok {
			return false, fmt.Errorf("status not found: %q", testPodUID)
		}
		if len(status.ContainerStatuses) != 1 {
			return false, fmt.Errorf("expected single container, found %d", len(status.ContainerStatuses))
		}
		if status.ContainerStatuses[0].ContainerID != testContainerID.String() {
			return false, fmt.Errorf("expected container %q, found %q",
				testContainerID, status.ContainerStatuses[0].ContainerID)
		}
		return status.ContainerStatuses[0].Ready == ready, nil
	}
	klog.Infof("Polling for ready state %v", ready)
	if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
		return err
	}
	return nil
}

// cleanup running probes to avoid leaking goroutines.
func cleanup(t *testing.T, m *manager) {
	m.CleanupPods(nil)

	condition := func() (bool, error) {
		workerCount := m.workerCount()
		if workerCount > 0 {
			klog.Infof("Waiting for %d workers to exit...", workerCount)
		}
		return workerCount == 0, nil
	}
	if exited, _ := condition(); exited {
		return // Already exited, no need to poll.
	}
	if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
		t.Fatalf("Error during cleanup: %v", err)
	}
}