watch_restart_test.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package apimachinery
  14. import (
  15. "context"
  16. "fmt"
  17. "reflect"
  18. "testing"
  19. "time"
  20. corev1 "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/fields"
  23. "k8s.io/apimachinery/pkg/runtime"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. "k8s.io/apimachinery/pkg/watch"
  27. "k8s.io/client-go/kubernetes"
  28. restclient "k8s.io/client-go/rest"
  29. "k8s.io/client-go/tools/cache"
  30. watchtools "k8s.io/client-go/tools/watch"
  31. "k8s.io/kubernetes/test/integration/framework"
  32. )
  33. func noopNormalization(output []string) []string {
  34. return output
  35. }
  36. func normalizeInformerOutputFunc(initialVal string) func(output []string) []string {
  37. return func(output []string) []string {
  38. result := make([]string, 0, len(output))
  39. // Removes initial value and all of its direct repetitions
  40. lastVal := initialVal
  41. for _, v := range output {
  42. // Make values unique as informer(List+Watch) duplicates some events
  43. if v == lastVal {
  44. continue
  45. }
  46. result = append(result, v)
  47. lastVal = v
  48. }
  49. return result
  50. }
  51. }
  52. func noop() {}
  53. func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
  54. // Has to be longer than 5 seconds
  55. timeout := 2 * time.Minute
  56. // Set up a master
  57. masterConfig := framework.NewIntegrationTestMasterConfig()
  58. // Timeout is set random between MinRequestTimeout and 2x
  59. masterConfig.GenericConfig.MinRequestTimeout = int(timeout.Seconds()) / 4
  60. _, s, closeFn := framework.RunAMaster(masterConfig)
  61. defer closeFn()
  62. config := &restclient.Config{
  63. Host: s.URL,
  64. }
  65. namespaceObject := framework.CreateTestingNamespace("retry-watch", s, t)
  66. defer framework.DeleteTestingNamespace(namespaceObject, s, t)
  67. getListFunc := func(c *kubernetes.Clientset, secret *corev1.Secret) func(options metav1.ListOptions) *corev1.SecretList {
  68. return func(options metav1.ListOptions) *corev1.SecretList {
  69. options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
  70. res, err := c.CoreV1().Secrets(secret.Namespace).List(context.TODO(), options)
  71. if err != nil {
  72. t.Fatalf("Failed to list Secrets: %v", err)
  73. }
  74. return res
  75. }
  76. }
  77. getWatchFunc := func(c *kubernetes.Clientset, secret *corev1.Secret) func(options metav1.ListOptions) (watch.Interface, error) {
  78. return func(options metav1.ListOptions) (watch.Interface, error) {
  79. options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
  80. res, err := c.CoreV1().Secrets(secret.Namespace).Watch(context.TODO(), options)
  81. if err != nil {
  82. t.Fatalf("Failed to create a watcher on Secrets: %v", err)
  83. }
  84. return res, err
  85. }
  86. }
  87. generateEvents := func(t *testing.T, c *kubernetes.Clientset, secret *corev1.Secret, referenceOutput *[]string, stopChan chan struct{}, stoppedChan chan struct{}) {
  88. defer close(stoppedChan)
  89. counter := 0
  90. // These 5 seconds are here to protect against a race at the end when we could write something there at the same time as watch.Until ends
  91. softTimeout := timeout - 5*time.Second
  92. if softTimeout < 0 {
  93. panic("Timeout has to be grater than 5 seconds!")
  94. }
  95. endChannel := time.After(softTimeout)
  96. for {
  97. select {
  98. // TODO: get this lower once we figure out how to extend ETCD cache
  99. case <-time.After(1000 * time.Millisecond):
  100. counter = counter + 1
  101. patch := fmt.Sprintf(`{"metadata": {"annotations": {"count": "%d"}}}`, counter)
  102. _, err := c.CoreV1().Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
  103. if err != nil {
  104. t.Fatalf("Failed to patch secret: %v", err)
  105. }
  106. *referenceOutput = append(*referenceOutput, fmt.Sprintf("%d", counter))
  107. case <-endChannel:
  108. return
  109. case <-stopChan:
  110. return
  111. }
  112. }
  113. }
  114. initialCount := "0"
  115. newTestSecret := func(name string) *corev1.Secret {
  116. return &corev1.Secret{
  117. ObjectMeta: metav1.ObjectMeta{
  118. Name: name,
  119. Namespace: namespaceObject.Name,
  120. Annotations: map[string]string{
  121. "count": initialCount,
  122. },
  123. },
  124. Data: map[string][]byte{
  125. "data": []byte("value1\n"),
  126. },
  127. }
  128. }
  129. tt := []struct {
  130. name string
  131. succeed bool
  132. secret *corev1.Secret
  133. getWatcher func(c *kubernetes.Clientset, secret *corev1.Secret) (watch.Interface, error, func())
  134. normalizeOutputFunc func(referenceOutput []string) []string
  135. }{
  136. {
  137. name: "regular watcher should fail",
  138. succeed: false,
  139. secret: newTestSecret("secret-01"),
  140. getWatcher: func(c *kubernetes.Clientset, secret *corev1.Secret) (watch.Interface, error, func()) {
  141. options := metav1.ListOptions{
  142. ResourceVersion: secret.ResourceVersion,
  143. }
  144. w, err := getWatchFunc(c, secret)(options)
  145. return w, err, noop
  146. }, // regular watcher; unfortunately destined to fail
  147. normalizeOutputFunc: noopNormalization,
  148. },
  149. {
  150. name: "RetryWatcher survives closed watches",
  151. succeed: true,
  152. secret: newTestSecret("secret-02"),
  153. getWatcher: func(c *kubernetes.Clientset, secret *corev1.Secret) (watch.Interface, error, func()) {
  154. lw := &cache.ListWatch{
  155. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  156. return getWatchFunc(c, secret)(options)
  157. },
  158. }
  159. w, err := watchtools.NewRetryWatcher(secret.ResourceVersion, lw)
  160. return w, err, func() { <-w.Done() }
  161. },
  162. normalizeOutputFunc: noopNormalization,
  163. },
  164. {
  165. name: "InformerWatcher survives closed watches",
  166. succeed: true,
  167. secret: newTestSecret("secret-03"),
  168. getWatcher: func(c *kubernetes.Clientset, secret *corev1.Secret) (watch.Interface, error, func()) {
  169. lw := &cache.ListWatch{
  170. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  171. return getListFunc(c, secret)(options), nil
  172. },
  173. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  174. return getWatchFunc(c, secret)(options)
  175. },
  176. }
  177. _, _, w, done := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{})
  178. return w, nil, func() { <-done }
  179. },
  180. normalizeOutputFunc: normalizeInformerOutputFunc(initialCount),
  181. },
  182. }
  183. for _, tmptc := range tt {
  184. tc := tmptc // we need to copy it for parallel runs
  185. t.Run(tc.name, func(t *testing.T) {
  186. c, err := kubernetes.NewForConfig(config)
  187. if err != nil {
  188. t.Fatalf("Failed to create clientset: %v", err)
  189. }
  190. secret, err := c.CoreV1().Secrets(tc.secret.Namespace).Create(context.TODO(), tc.secret, metav1.CreateOptions{})
  191. if err != nil {
  192. t.Fatalf("Failed to create testing secret %s/%s: %v", tc.secret.Namespace, tc.secret.Name, err)
  193. }
  194. watcher, err, doneFn := tc.getWatcher(c, secret)
  195. if err != nil {
  196. t.Fatalf("Failed to create watcher: %v", err)
  197. }
  198. defer doneFn()
  199. var referenceOutput []string
  200. var output []string
  201. stopChan := make(chan struct{})
  202. stoppedChan := make(chan struct{})
  203. go generateEvents(t, c, secret, &referenceOutput, stopChan, stoppedChan)
  204. // Record current time to be able to asses if the timeout has been reached
  205. startTime := time.Now()
  206. ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
  207. defer cancel()
  208. _, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
  209. s, ok := event.Object.(*corev1.Secret)
  210. if !ok {
  211. t.Fatalf("Received an object that is not a Secret: %#v", event.Object)
  212. }
  213. output = append(output, s.Annotations["count"])
  214. // Watch will never end voluntarily
  215. return false, nil
  216. })
  217. watchDuration := time.Since(startTime)
  218. close(stopChan)
  219. <-stoppedChan
  220. output = tc.normalizeOutputFunc(output)
  221. t.Logf("Watch duration: %v; timeout: %v", watchDuration, timeout)
  222. if err == nil && !tc.succeed {
  223. t.Fatalf("Watch should have timed out but it exited without an error!")
  224. }
  225. if err != wait.ErrWaitTimeout && tc.succeed {
  226. t.Fatalf("Watch exited with error: %v!", err)
  227. }
  228. if watchDuration < timeout && tc.succeed {
  229. t.Fatalf("Watch should have timed out after %v but it timed out prematurely after %v!", timeout, watchDuration)
  230. }
  231. if watchDuration >= timeout && !tc.succeed {
  232. t.Fatalf("Watch should have timed out but it succeeded!")
  233. }
  234. if tc.succeed && !reflect.DeepEqual(referenceOutput, output) {
  235. t.Fatalf("Reference and real output differ! We must have lost some events or read some multiple times!\nRef: %#v\nReal: %#v", referenceOutput, output)
  236. }
  237. })
  238. }
  239. }