wait.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "context"
  16. "fmt"
  17. appsv1 "k8s.io/api/apps/v1"
  18. v1 "k8s.io/api/core/v1"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/util/wait"
  21. clientset "k8s.io/client-go/kubernetes"
  22. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  23. "k8s.io/kubernetes/test/e2e/framework"
  24. )
  25. // WaitForRunning waits for numPodsRunning in ss to be Running and for the first
  26. // numPodsReady ordinals to be Ready.
  27. func WaitForRunning(c clientset.Interface, numPodsRunning, numPodsReady int32, ss *appsv1.StatefulSet) {
  28. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
  29. func() (bool, error) {
  30. podList := GetPodList(c, ss)
  31. SortStatefulPods(podList)
  32. if int32(len(podList.Items)) < numPodsRunning {
  33. framework.Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numPodsRunning)
  34. return false, nil
  35. }
  36. if int32(len(podList.Items)) > numPodsRunning {
  37. return false, fmt.Errorf("too many pods scheduled, expected %d got %d", numPodsRunning, len(podList.Items))
  38. }
  39. for _, p := range podList.Items {
  40. shouldBeReady := getStatefulPodOrdinal(&p) < int(numPodsReady)
  41. isReady := podutil.IsPodReady(&p)
  42. desiredReadiness := shouldBeReady == isReady
  43. framework.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
  44. if p.Status.Phase != v1.PodRunning || !desiredReadiness {
  45. return false, nil
  46. }
  47. }
  48. return true, nil
  49. })
  50. if pollErr != nil {
  51. framework.Failf("Failed waiting for pods to enter running: %v", pollErr)
  52. }
  53. }
  54. // WaitForState periodically polls for the ss and its pods until the until function returns either true or an error
  55. func WaitForState(c clientset.Interface, ss *appsv1.StatefulSet, until func(*appsv1.StatefulSet, *v1.PodList) (bool, error)) {
  56. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
  57. func() (bool, error) {
  58. ssGet, err := c.AppsV1().StatefulSets(ss.Namespace).Get(context.TODO(), ss.Name, metav1.GetOptions{})
  59. if err != nil {
  60. return false, err
  61. }
  62. podList := GetPodList(c, ssGet)
  63. return until(ssGet, podList)
  64. })
  65. if pollErr != nil {
  66. framework.Failf("Failed waiting for state update: %v", pollErr)
  67. }
  68. }
  69. // WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready.
  70. func WaitForRunningAndReady(c clientset.Interface, numStatefulPods int32, ss *appsv1.StatefulSet) {
  71. WaitForRunning(c, numStatefulPods, numStatefulPods, ss)
  72. }
  73. // WaitForPodReady waits for the Pod named podName in set to exist and have a Ready condition.
  74. func WaitForPodReady(c clientset.Interface, set *appsv1.StatefulSet, podName string) (*appsv1.StatefulSet, *v1.PodList) {
  75. var pods *v1.PodList
  76. WaitForState(c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
  77. set = set2
  78. pods = pods2
  79. for i := range pods.Items {
  80. if pods.Items[i].Name == podName {
  81. return podutil.IsPodReady(&pods.Items[i]), nil
  82. }
  83. }
  84. return false, nil
  85. })
  86. return set, pods
  87. }
  88. // WaitForStatusReadyReplicas waits for the ss.Status.ReadyReplicas to be equal to expectedReplicas
  89. func WaitForStatusReadyReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) {
  90. framework.Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)
  91. ns, name := ss.Namespace, ss.Name
  92. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
  93. func() (bool, error) {
  94. ssGet, err := c.AppsV1().StatefulSets(ns).Get(context.TODO(), name, metav1.GetOptions{})
  95. if err != nil {
  96. return false, err
  97. }
  98. if ssGet.Status.ObservedGeneration < ss.Generation {
  99. return false, nil
  100. }
  101. if ssGet.Status.ReadyReplicas != expectedReplicas {
  102. framework.Logf("Waiting for stateful set status.readyReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.ReadyReplicas)
  103. return false, nil
  104. }
  105. return true, nil
  106. })
  107. if pollErr != nil {
  108. framework.Failf("Failed waiting for stateful set status.readyReplicas updated to %d: %v", expectedReplicas, pollErr)
  109. }
  110. }
  111. // WaitForStatusReplicas waits for the ss.Status.Replicas to be equal to expectedReplicas
  112. func WaitForStatusReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) {
  113. framework.Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)
  114. ns, name := ss.Namespace, ss.Name
  115. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
  116. func() (bool, error) {
  117. ssGet, err := c.AppsV1().StatefulSets(ns).Get(context.TODO(), name, metav1.GetOptions{})
  118. if err != nil {
  119. return false, err
  120. }
  121. if ssGet.Status.ObservedGeneration < ss.Generation {
  122. return false, nil
  123. }
  124. if ssGet.Status.Replicas != expectedReplicas {
  125. framework.Logf("Waiting for stateful set status.replicas to become %d, currently %d", expectedReplicas, ssGet.Status.Replicas)
  126. return false, nil
  127. }
  128. return true, nil
  129. })
  130. if pollErr != nil {
  131. framework.Failf("Failed waiting for stateful set status.replicas updated to %d: %v", expectedReplicas, pollErr)
  132. }
  133. }
  134. // Saturate waits for all Pods in ss to become Running and Ready.
  135. func Saturate(c clientset.Interface, ss *appsv1.StatefulSet) {
  136. var i int32
  137. for i = 0; i < *(ss.Spec.Replicas); i++ {
  138. framework.Logf("Waiting for stateful pod at index %v to enter Running", i)
  139. WaitForRunning(c, i+1, i, ss)
  140. framework.Logf("Resuming stateful pod at index %v", i)
  141. ResumeNextPod(c, ss)
  142. }
  143. }