watch_restart_test.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package apimachinery
  14. import (
  15. "context"
  16. "fmt"
  17. "reflect"
  18. "testing"
  19. "time"
  20. corev1 "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/fields"
  23. "k8s.io/apimachinery/pkg/runtime"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. "k8s.io/apimachinery/pkg/watch"
  27. "k8s.io/client-go/kubernetes"
  28. restclient "k8s.io/client-go/rest"
  29. "k8s.io/client-go/tools/cache"
  30. watchtools "k8s.io/client-go/tools/watch"
  31. "k8s.io/kubernetes/pkg/api/testapi"
  32. "k8s.io/kubernetes/test/integration/framework"
  33. )
  34. func noopNormalization(output []string) []string {
  35. return output
  36. }
  37. func normalizeInformerOutputFunc(initialVal string) func(output []string) []string {
  38. return func(output []string) []string {
  39. result := make([]string, 0, len(output))
  40. // Removes initial value and all of its direct repetitions
  41. lastVal := initialVal
  42. for _, v := range output {
  43. // Make values unique as informer(List+Watch) duplicates some events
  44. if v == lastVal {
  45. continue
  46. }
  47. result = append(result, v)
  48. lastVal = v
  49. }
  50. return result
  51. }
  52. }
  53. func noop() {}
  54. func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
  55. // Has to be longer than 5 seconds
  56. timeout := 2 * time.Minute
  57. // Set up a master
  58. masterConfig := framework.NewIntegrationTestMasterConfig()
  59. // Timeout is set random between MinRequestTimeout and 2x
  60. masterConfig.GenericConfig.MinRequestTimeout = int(timeout.Seconds()) / 4
  61. _, s, closeFn := framework.RunAMaster(masterConfig)
  62. defer closeFn()
  63. config := &restclient.Config{
  64. Host: s.URL,
  65. ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[corev1.GroupName].GroupVersion()},
  66. }
  67. namespaceObject := framework.CreateTestingNamespace("retry-watch", s, t)
  68. defer framework.DeleteTestingNamespace(namespaceObject, s, t)
  69. getListFunc := func(c *kubernetes.Clientset, secret *corev1.Secret) func(options metav1.ListOptions) *corev1.SecretList {
  70. return func(options metav1.ListOptions) *corev1.SecretList {
  71. options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
  72. res, err := c.CoreV1().Secrets(secret.Namespace).List(options)
  73. if err != nil {
  74. t.Fatalf("Failed to list Secrets: %v", err)
  75. }
  76. return res
  77. }
  78. }
  79. getWatchFunc := func(c *kubernetes.Clientset, secret *corev1.Secret) func(options metav1.ListOptions) (watch.Interface, error) {
  80. return func(options metav1.ListOptions) (watch.Interface, error) {
  81. options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
  82. res, err := c.CoreV1().Secrets(secret.Namespace).Watch(options)
  83. if err != nil {
  84. t.Fatalf("Failed to create a watcher on Secrets: %v", err)
  85. }
  86. return res, err
  87. }
  88. }
  89. generateEvents := func(t *testing.T, c *kubernetes.Clientset, secret *corev1.Secret, referenceOutput *[]string, stopChan chan struct{}, stoppedChan chan struct{}) {
  90. defer close(stoppedChan)
  91. counter := 0
  92. // These 5 seconds are here to protect against a race at the end when we could write something there at the same time as watch.Until ends
  93. softTimeout := timeout - 5*time.Second
  94. if softTimeout < 0 {
  95. panic("Timeout has to be grater than 5 seconds!")
  96. }
  97. endChannel := time.After(softTimeout)
  98. for {
  99. select {
  100. // TODO: get this lower once we figure out how to extend ETCD cache
  101. case <-time.After(1000 * time.Millisecond):
  102. counter = counter + 1
  103. patch := fmt.Sprintf(`{"metadata": {"annotations": {"count": "%d"}}}`, counter)
  104. _, err := c.CoreV1().Secrets(secret.Namespace).Patch(secret.Name, types.StrategicMergePatchType, []byte(patch))
  105. if err != nil {
  106. t.Fatalf("Failed to patch secret: %v", err)
  107. }
  108. *referenceOutput = append(*referenceOutput, fmt.Sprintf("%d", counter))
  109. case <-endChannel:
  110. return
  111. case <-stopChan:
  112. return
  113. }
  114. }
  115. }
  116. initialCount := "0"
  117. newTestSecret := func(name string) *corev1.Secret {
  118. return &corev1.Secret{
  119. ObjectMeta: metav1.ObjectMeta{
  120. Name: name,
  121. Namespace: namespaceObject.Name,
  122. Annotations: map[string]string{
  123. "count": initialCount,
  124. },
  125. },
  126. Data: map[string][]byte{
  127. "data": []byte("value1\n"),
  128. },
  129. }
  130. }
  131. tt := []struct {
  132. name string
  133. succeed bool
  134. secret *corev1.Secret
  135. getWatcher func(c *kubernetes.Clientset, secret *corev1.Secret) (watch.Interface, error, func())
  136. normalizeOutputFunc func(referenceOutput []string) []string
  137. }{
  138. {
  139. name: "regular watcher should fail",
  140. succeed: false,
  141. secret: newTestSecret("secret-01"),
  142. getWatcher: func(c *kubernetes.Clientset, secret *corev1.Secret) (watch.Interface, error, func()) {
  143. options := metav1.ListOptions{
  144. ResourceVersion: secret.ResourceVersion,
  145. }
  146. w, err := getWatchFunc(c, secret)(options)
  147. return w, err, noop
  148. }, // regular watcher; unfortunately destined to fail
  149. normalizeOutputFunc: noopNormalization,
  150. },
  151. {
  152. name: "RetryWatcher survives closed watches",
  153. succeed: true,
  154. secret: newTestSecret("secret-02"),
  155. getWatcher: func(c *kubernetes.Clientset, secret *corev1.Secret) (watch.Interface, error, func()) {
  156. lw := &cache.ListWatch{
  157. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  158. return getWatchFunc(c, secret)(options)
  159. },
  160. }
  161. w, err := watchtools.NewRetryWatcher(secret.ResourceVersion, lw)
  162. return w, err, func() { <-w.Done() }
  163. },
  164. normalizeOutputFunc: noopNormalization,
  165. },
  166. {
  167. name: "InformerWatcher survives closed watches",
  168. succeed: true,
  169. secret: newTestSecret("secret-03"),
  170. getWatcher: func(c *kubernetes.Clientset, secret *corev1.Secret) (watch.Interface, error, func()) {
  171. lw := &cache.ListWatch{
  172. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  173. return getListFunc(c, secret)(options), nil
  174. },
  175. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  176. return getWatchFunc(c, secret)(options)
  177. },
  178. }
  179. _, _, w, done := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{})
  180. return w, nil, func() { <-done }
  181. },
  182. normalizeOutputFunc: normalizeInformerOutputFunc(initialCount),
  183. },
  184. }
  185. for _, tmptc := range tt {
  186. tc := tmptc // we need to copy it for parallel runs
  187. t.Run(tc.name, func(t *testing.T) {
  188. c, err := kubernetes.NewForConfig(config)
  189. if err != nil {
  190. t.Fatalf("Failed to create clientset: %v", err)
  191. }
  192. secret, err := c.CoreV1().Secrets(tc.secret.Namespace).Create(tc.secret)
  193. if err != nil {
  194. t.Fatalf("Failed to create testing secret %s/%s: %v", tc.secret.Namespace, tc.secret.Name, err)
  195. }
  196. watcher, err, doneFn := tc.getWatcher(c, secret)
  197. if err != nil {
  198. t.Fatalf("Failed to create watcher: %v", err)
  199. }
  200. defer doneFn()
  201. var referenceOutput []string
  202. var output []string
  203. stopChan := make(chan struct{})
  204. stoppedChan := make(chan struct{})
  205. go generateEvents(t, c, secret, &referenceOutput, stopChan, stoppedChan)
  206. // Record current time to be able to asses if the timeout has been reached
  207. startTime := time.Now()
  208. ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
  209. defer cancel()
  210. _, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
  211. s, ok := event.Object.(*corev1.Secret)
  212. if !ok {
  213. t.Fatalf("Received an object that is not a Secret: %#v", event.Object)
  214. }
  215. output = append(output, s.Annotations["count"])
  216. // Watch will never end voluntarily
  217. return false, nil
  218. })
  219. watchDuration := time.Since(startTime)
  220. close(stopChan)
  221. <-stoppedChan
  222. output = tc.normalizeOutputFunc(output)
  223. t.Logf("Watch duration: %v; timeout: %v", watchDuration, timeout)
  224. if err == nil && !tc.succeed {
  225. t.Fatalf("Watch should have timed out but it exited without an error!")
  226. }
  227. if err != wait.ErrWaitTimeout && tc.succeed {
  228. t.Fatalf("Watch exited with error: %v!", err)
  229. }
  230. if watchDuration < timeout && tc.succeed {
  231. t.Fatalf("Watch should have timed out after %v but it timed out prematurely after %v!", timeout, watchDuration)
  232. }
  233. if watchDuration >= timeout && !tc.succeed {
  234. t.Fatalf("Watch should have timed out but it succeeded!")
  235. }
  236. if tc.succeed && !reflect.DeepEqual(referenceOutput, output) {
  237. t.Fatalf("Reference and real output differ! We must have lost some events or read some multiple times!\nRef: %#v\nReal: %#v", referenceOutput, output)
  238. }
  239. })
  240. }
  241. }