listwatch_test.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package tests
  14. import (
  15. "context"
  16. "net/http/httptest"
  17. "net/url"
  18. "testing"
  19. "time"
  20. "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/fields"
  23. "k8s.io/apimachinery/pkg/runtime"
  24. "k8s.io/apimachinery/pkg/runtime/schema"
  25. "k8s.io/apimachinery/pkg/watch"
  26. clientset "k8s.io/client-go/kubernetes"
  27. restclient "k8s.io/client-go/rest"
  28. . "k8s.io/client-go/tools/cache"
  29. watchtools "k8s.io/client-go/tools/watch"
  30. utiltesting "k8s.io/client-go/util/testing"
  31. "k8s.io/kubernetes/pkg/api/testapi"
  32. )
  33. func parseSelectorOrDie(s string) fields.Selector {
  34. selector, err := fields.ParseSelector(s)
  35. if err != nil {
  36. panic(err)
  37. }
  38. return selector
  39. }
  40. // buildQueryValues is a convenience function for knowing if a namespace should be in a query param or not
  41. func buildQueryValues(query url.Values) url.Values {
  42. v := url.Values{}
  43. if query != nil {
  44. for key, values := range query {
  45. for _, value := range values {
  46. v.Add(key, value)
  47. }
  48. }
  49. }
  50. return v
  51. }
  52. func buildLocation(resourcePath string, query url.Values) string {
  53. return resourcePath + "?" + query.Encode()
  54. }
  55. func TestListWatchesCanList(t *testing.T) {
  56. fieldSelectorQueryParamName := metav1.FieldSelectorQueryParam("v1")
  57. table := []struct {
  58. location string
  59. resource string
  60. namespace string
  61. fieldSelector fields.Selector
  62. }{
  63. // Node
  64. {
  65. location: testapi.Default.ResourcePath("nodes", metav1.NamespaceAll, ""),
  66. resource: "nodes",
  67. namespace: metav1.NamespaceAll,
  68. fieldSelector: parseSelectorOrDie(""),
  69. },
  70. // pod with "assigned" field selector.
  71. {
  72. location: buildLocation(
  73. testapi.Default.ResourcePath("pods", metav1.NamespaceAll, ""),
  74. buildQueryValues(url.Values{fieldSelectorQueryParamName: []string{"spec.host="}})),
  75. resource: "pods",
  76. namespace: metav1.NamespaceAll,
  77. fieldSelector: fields.Set{"spec.host": ""}.AsSelector(),
  78. },
  79. // pod in namespace "foo"
  80. {
  81. location: buildLocation(
  82. testapi.Default.ResourcePath("pods", "foo", ""),
  83. buildQueryValues(url.Values{fieldSelectorQueryParamName: []string{"spec.host="}})),
  84. resource: "pods",
  85. namespace: "foo",
  86. fieldSelector: fields.Set{"spec.host": ""}.AsSelector(),
  87. },
  88. }
  89. for _, item := range table {
  90. handler := utiltesting.FakeHandler{
  91. StatusCode: 500,
  92. ResponseBody: "",
  93. T: t,
  94. }
  95. server := httptest.NewServer(&handler)
  96. defer server.Close()
  97. client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  98. lw := NewListWatchFromClient(client.CoreV1().RESTClient(), item.resource, item.namespace, item.fieldSelector)
  99. lw.DisableChunking = true
  100. // This test merely tests that the correct request is made.
  101. lw.List(metav1.ListOptions{})
  102. handler.ValidateRequest(t, item.location, "GET", nil)
  103. }
  104. }
  105. func TestListWatchesCanWatch(t *testing.T) {
  106. fieldSelectorQueryParamName := metav1.FieldSelectorQueryParam("v1")
  107. table := []struct {
  108. rv string
  109. location string
  110. resource string
  111. namespace string
  112. fieldSelector fields.Selector
  113. }{
  114. // Node
  115. {
  116. location: buildLocation(
  117. testapi.Default.ResourcePath("nodes", metav1.NamespaceAll, ""),
  118. buildQueryValues(url.Values{"watch": []string{"true"}})),
  119. rv: "",
  120. resource: "nodes",
  121. namespace: metav1.NamespaceAll,
  122. fieldSelector: parseSelectorOrDie(""),
  123. },
  124. {
  125. location: buildLocation(
  126. testapi.Default.ResourcePath("nodes", metav1.NamespaceAll, ""),
  127. buildQueryValues(url.Values{"resourceVersion": []string{"42"}, "watch": []string{"true"}})),
  128. rv: "42",
  129. resource: "nodes",
  130. namespace: metav1.NamespaceAll,
  131. fieldSelector: parseSelectorOrDie(""),
  132. },
  133. // pod with "assigned" field selector.
  134. {
  135. location: buildLocation(
  136. testapi.Default.ResourcePath("pods", metav1.NamespaceAll, ""),
  137. buildQueryValues(url.Values{fieldSelectorQueryParamName: []string{"spec.host="}, "resourceVersion": []string{"0"}, "watch": []string{"true"}})),
  138. rv: "0",
  139. resource: "pods",
  140. namespace: metav1.NamespaceAll,
  141. fieldSelector: fields.Set{"spec.host": ""}.AsSelector(),
  142. },
  143. // pod with namespace foo and assigned field selector
  144. {
  145. location: buildLocation(
  146. testapi.Default.ResourcePath("pods", "foo", ""),
  147. buildQueryValues(url.Values{fieldSelectorQueryParamName: []string{"spec.host="}, "resourceVersion": []string{"0"}, "watch": []string{"true"}})),
  148. rv: "0",
  149. resource: "pods",
  150. namespace: "foo",
  151. fieldSelector: fields.Set{"spec.host": ""}.AsSelector(),
  152. },
  153. }
  154. for _, item := range table {
  155. handler := utiltesting.FakeHandler{
  156. StatusCode: 500,
  157. ResponseBody: "",
  158. T: t,
  159. }
  160. server := httptest.NewServer(&handler)
  161. defer server.Close()
  162. client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  163. lw := NewListWatchFromClient(client.CoreV1().RESTClient(), item.resource, item.namespace, item.fieldSelector)
  164. // This test merely tests that the correct request is made.
  165. lw.Watch(metav1.ListOptions{ResourceVersion: item.rv})
  166. handler.ValidateRequest(t, item.location, "GET", nil)
  167. }
  168. }
  169. type lw struct {
  170. list runtime.Object
  171. watch watch.Interface
  172. }
  173. func (w lw) List(options metav1.ListOptions) (runtime.Object, error) {
  174. return w.list, nil
  175. }
  176. func (w lw) Watch(options metav1.ListOptions) (watch.Interface, error) {
  177. return w.watch, nil
  178. }
  179. func TestListWatchUntil(t *testing.T) {
  180. fw := watch.NewFake()
  181. go func() {
  182. obj := &v1.Pod{
  183. ObjectMeta: metav1.ObjectMeta{
  184. ResourceVersion: "2",
  185. },
  186. }
  187. fw.Modify(obj)
  188. }()
  189. listwatch := lw{
  190. list: &v1.PodList{
  191. ListMeta: metav1.ListMeta{
  192. ResourceVersion: "1",
  193. },
  194. Items: []v1.Pod{{}},
  195. },
  196. watch: fw,
  197. }
  198. conditions := []watchtools.ConditionFunc{
  199. func(event watch.Event) (bool, error) {
  200. t.Logf("got %#v", event)
  201. return event.Type == watch.Added, nil
  202. },
  203. func(event watch.Event) (bool, error) {
  204. t.Logf("got %#v", event)
  205. return event.Type == watch.Modified, nil
  206. },
  207. }
  208. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  209. defer cancel()
  210. lastEvent, err := watchtools.ListWatchUntil(ctx, listwatch, conditions...)
  211. if err != nil {
  212. t.Fatalf("expected nil error, got %#v", err)
  213. }
  214. if lastEvent == nil {
  215. t.Fatal("expected an event")
  216. }
  217. if lastEvent.Type != watch.Modified {
  218. t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
  219. }
  220. if got, isPod := lastEvent.Object.(*v1.Pod); !isPod {
  221. t.Fatalf("expected a pod event, got %#v", got)
  222. }
  223. }