wait.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package apiclient
  14. import (
  15. "fmt"
  16. "io"
  17. "net/http"
  18. "time"
  19. "github.com/pkg/errors"
  20. "k8s.io/api/core/v1"
  21. apierrors "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. netutil "k8s.io/apimachinery/pkg/util/net"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. clientset "k8s.io/client-go/kubernetes"
  26. kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
  27. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  28. )
  29. // Waiter is an interface for waiting for criteria in Kubernetes to happen
  30. type Waiter interface {
  31. // WaitForAPI waits for the API Server's /healthz endpoint to become "ok"
  32. WaitForAPI() error
  33. // WaitForPodsWithLabel waits for Pods in the kube-system namespace to become Ready
  34. WaitForPodsWithLabel(kvLabel string) error
  35. // WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted
  36. WaitForPodToDisappear(staticPodName string) error
  37. // WaitForStaticPodSingleHash fetches sha256 hash for the control plane static pod
  38. WaitForStaticPodSingleHash(nodeName string, component string) (string, error)
  39. // WaitForStaticPodHashChange waits for the given static pod component's static pod hash to get updated.
  40. // By doing that we can be sure that the kubelet has restarted the given Static Pod
  41. WaitForStaticPodHashChange(nodeName, component, previousHash string) error
  42. // WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods
  43. WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
  44. // WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
  45. WaitForHealthyKubelet(initalTimeout time.Duration, healthzEndpoint string) error
  46. // WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
  47. WaitForKubeletAndFunc(f func() error) error
  48. // SetTimeout adjusts the timeout to the specified duration
  49. SetTimeout(timeout time.Duration)
  50. }
// KubeWaiter is an implementation of Waiter that is backed by a Kubernetes client
type KubeWaiter struct {
	// client is the clientset used to query the API server.
	client clientset.Interface
	// timeout is the upper bound handed to each polling loop started by the
	// Wait* methods; it can be changed with SetTimeout.
	timeout time.Duration
	// writer receives progress messages (used by WaitForPodsWithLabel).
	writer io.Writer
}
  57. // NewKubeWaiter returns a new Waiter object that talks to the given Kubernetes cluster
  58. func NewKubeWaiter(client clientset.Interface, timeout time.Duration, writer io.Writer) Waiter {
  59. return &KubeWaiter{
  60. client: client,
  61. timeout: timeout,
  62. writer: writer,
  63. }
  64. }
  65. // WaitForAPI waits for the API Server's /healthz endpoint to report "ok"
  66. func (w *KubeWaiter) WaitForAPI() error {
  67. start := time.Now()
  68. return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
  69. healthStatus := 0
  70. w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
  71. if healthStatus != http.StatusOK {
  72. return false, nil
  73. }
  74. fmt.Printf("[apiclient] All control plane components are healthy after %f seconds\n", time.Since(start).Seconds())
  75. return true, nil
  76. })
  77. }
  78. // WaitForPodsWithLabel will lookup pods with the given label and wait until they are all
  79. // reporting status as running.
  80. func (w *KubeWaiter) WaitForPodsWithLabel(kvLabel string) error {
  81. lastKnownPodNumber := -1
  82. return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
  83. listOpts := metav1.ListOptions{LabelSelector: kvLabel}
  84. pods, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).List(listOpts)
  85. if err != nil {
  86. fmt.Fprintf(w.writer, "[apiclient] Error getting Pods with label selector %q [%v]\n", kvLabel, err)
  87. return false, nil
  88. }
  89. if lastKnownPodNumber != len(pods.Items) {
  90. fmt.Fprintf(w.writer, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), kvLabel)
  91. lastKnownPodNumber = len(pods.Items)
  92. }
  93. if len(pods.Items) == 0 {
  94. return false, nil
  95. }
  96. for _, pod := range pods.Items {
  97. if pod.Status.Phase != v1.PodRunning {
  98. return false, nil
  99. }
  100. }
  101. return true, nil
  102. })
  103. }
  104. // WaitForPodToDisappear blocks until it timeouts or gets a "NotFound" response from the API Server when getting the Static Pod in question
  105. func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
  106. return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
  107. _, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(podName, metav1.GetOptions{})
  108. if apierrors.IsNotFound(err) {
  109. fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName)
  110. return true, nil
  111. }
  112. return false, nil
  113. })
  114. }
  115. // WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
  116. func (w *KubeWaiter) WaitForHealthyKubelet(initalTimeout time.Duration, healthzEndpoint string) error {
  117. time.Sleep(initalTimeout)
  118. fmt.Printf("[kubelet-check] Initial timeout of %v passed.\n", initalTimeout)
  119. return TryRunCommand(func() error {
  120. client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})}
  121. resp, err := client.Get(healthzEndpoint)
  122. if err != nil {
  123. fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.")
  124. fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' failed with error: %v.\n", healthzEndpoint, err)
  125. return err
  126. }
  127. defer resp.Body.Close()
  128. if resp.StatusCode != http.StatusOK {
  129. fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.")
  130. fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' returned HTTP code %d\n", healthzEndpoint, resp.StatusCode)
  131. return errors.New("the kubelet healthz endpoint is unhealthy")
  132. }
  133. return nil
  134. }, 5) // a failureThreshold of five means waiting for a total of 155 seconds
  135. }
  136. // WaitForKubeletAndFunc waits primarily for the function f to execute, even though it might take some time. If that takes a long time, and the kubelet
  137. // /healthz continuously are unhealthy, kubeadm will error out after a period of exponential backoff
  138. func (w *KubeWaiter) WaitForKubeletAndFunc(f func() error) error {
  139. errorChan := make(chan error)
  140. go func(errC chan error, waiter Waiter) {
  141. if err := waiter.WaitForHealthyKubelet(40*time.Second, fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)); err != nil {
  142. errC <- err
  143. }
  144. }(errorChan, w)
  145. go func(errC chan error, waiter Waiter) {
  146. // This main goroutine sends whatever the f function returns (error or not) to the channel
  147. // This in order to continue on success (nil error), or just fail if the function returns an error
  148. errC <- f()
  149. }(errorChan, w)
  150. // This call is blocking until one of the goroutines sends to errorChan
  151. return <-errorChan
  152. }
// SetTimeout adjusts the timeout to the specified duration.
// The value is stored on the waiter and is read by the polling methods each
// time they are called.
func (w *KubeWaiter) SetTimeout(timeout time.Duration) {
	w.timeout = timeout
}
  157. // WaitForStaticPodControlPlaneHashes blocks until it timeouts or gets a hash map for all components and their Static Pods
  158. func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) {
  159. componentHash := ""
  160. var err error
  161. mirrorPodHashes := map[string]string{}
  162. for _, component := range kubeadmconstants.ControlPlaneComponents {
  163. err = wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
  164. componentHash, err = getStaticPodSingleHash(w.client, nodeName, component)
  165. if err != nil {
  166. return false, nil
  167. }
  168. return true, nil
  169. })
  170. if err != nil {
  171. return nil, err
  172. }
  173. mirrorPodHashes[component] = componentHash
  174. }
  175. return mirrorPodHashes, nil
  176. }
  177. // WaitForStaticPodSingleHash blocks until it timeouts or gets a hash for a single component and its Static Pod
  178. func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component string) (string, error) {
  179. componentPodHash := ""
  180. var err error
  181. err = wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
  182. componentPodHash, err = getStaticPodSingleHash(w.client, nodeName, component)
  183. if err != nil {
  184. return false, nil
  185. }
  186. return true, nil
  187. })
  188. return componentPodHash, err
  189. }
  190. // WaitForStaticPodHashChange blocks until it timeouts or notices that the Mirror Pod (for the Static Pod, respectively) has changed
  191. // This implicitly means this function blocks until the kubelet has restarted the Static Pod in question
  192. func (w *KubeWaiter) WaitForStaticPodHashChange(nodeName, component, previousHash string) error {
  193. return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
  194. hash, err := getStaticPodSingleHash(w.client, nodeName, component)
  195. if err != nil {
  196. return false, nil
  197. }
  198. // We should continue polling until the UID changes
  199. if hash == previousHash {
  200. return false, nil
  201. }
  202. return true, nil
  203. })
  204. }
  205. // getStaticPodSingleHash computes hashes for a single Static Pod resource
  206. func getStaticPodSingleHash(client clientset.Interface, nodeName string, component string) (string, error) {
  207. staticPodName := fmt.Sprintf("%s-%s", component, nodeName)
  208. staticPod, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(staticPodName, metav1.GetOptions{})
  209. if err != nil {
  210. return "", err
  211. }
  212. staticPodHash := staticPod.Annotations[kubetypes.ConfigHashAnnotationKey]
  213. fmt.Printf("Static pod: %s hash: %s\n", staticPodName, staticPodHash)
  214. return staticPodHash, nil
  215. }
  216. // TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned
  217. func TryRunCommand(f func() error, failureThreshold int) error {
  218. backoff := wait.Backoff{
  219. Duration: 5 * time.Second,
  220. Factor: 2, // double the timeout for every failure
  221. Steps: failureThreshold,
  222. }
  223. return wait.ExponentialBackoff(backoff, func() (bool, error) {
  224. err := f()
  225. if err != nil {
  226. // Retry until the timeout
  227. return false, nil
  228. }
  229. // The last f() call was a success, return cleanly
  230. return true, nil
  231. })
  232. }