api_test.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  /*
  Copyright 2014 The Kubernetes Authors.
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
  */
  13. package config
  14. import (
  15. "reflect"
  16. "sync"
  17. "testing"
  18. "time"
  19. "k8s.io/api/core/v1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/types"
  22. "k8s.io/apimachinery/pkg/watch"
  23. informers "k8s.io/client-go/informers"
  24. "k8s.io/client-go/kubernetes/fake"
  25. ktesting "k8s.io/client-go/testing"
  26. )
  27. func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
  28. service1v1 := &v1.Service{
  29. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
  30. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}}}
  31. service1v2 := &v1.Service{
  32. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
  33. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}}}
  34. service2 := &v1.Service{
  35. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s2"},
  36. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 30}}}}
  37. // Setup fake api client.
  38. client := fake.NewSimpleClientset()
  39. fakeWatch := watch.NewFake()
  40. client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
  41. stopCh := make(chan struct{})
  42. defer close(stopCh)
  43. handler := NewServiceHandlerMock()
  44. sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
  45. serviceConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
  46. serviceConfig.RegisterEventHandler(handler)
  47. go sharedInformers.Start(stopCh)
  48. go serviceConfig.Run(stopCh)
  49. // Add the first service
  50. fakeWatch.Add(service1v1)
  51. handler.ValidateServices(t, []*v1.Service{service1v1})
  52. // Add another service
  53. fakeWatch.Add(service2)
  54. handler.ValidateServices(t, []*v1.Service{service1v1, service2})
  55. // Modify service1
  56. fakeWatch.Modify(service1v2)
  57. handler.ValidateServices(t, []*v1.Service{service1v2, service2})
  58. // Delete service1
  59. fakeWatch.Delete(service1v2)
  60. handler.ValidateServices(t, []*v1.Service{service2})
  61. // Delete service2
  62. fakeWatch.Delete(service2)
  63. handler.ValidateServices(t, []*v1.Service{})
  64. }
  65. func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
  66. endpoints1v1 := &v1.Endpoints{
  67. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
  68. Subsets: []v1.EndpointSubset{{
  69. Addresses: []v1.EndpointAddress{
  70. {IP: "1.2.3.4"},
  71. },
  72. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  73. }},
  74. }
  75. endpoints1v2 := &v1.Endpoints{
  76. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
  77. Subsets: []v1.EndpointSubset{{
  78. Addresses: []v1.EndpointAddress{
  79. {IP: "1.2.3.4"},
  80. {IP: "4.3.2.1"},
  81. },
  82. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  83. }},
  84. }
  85. endpoints2 := &v1.Endpoints{
  86. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"},
  87. Subsets: []v1.EndpointSubset{{
  88. Addresses: []v1.EndpointAddress{
  89. {IP: "5.6.7.8"},
  90. },
  91. Ports: []v1.EndpointPort{{Port: 80, Protocol: "TCP"}},
  92. }},
  93. }
  94. // Setup fake api client.
  95. client := fake.NewSimpleClientset()
  96. fakeWatch := watch.NewFake()
  97. client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
  98. stopCh := make(chan struct{})
  99. defer close(stopCh)
  100. handler := NewEndpointsHandlerMock()
  101. sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
  102. endpointsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute)
  103. endpointsConfig.RegisterEventHandler(handler)
  104. go sharedInformers.Start(stopCh)
  105. go endpointsConfig.Run(stopCh)
  106. // Add the first endpoints
  107. fakeWatch.Add(endpoints1v1)
  108. handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1})
  109. // Add another endpoints
  110. fakeWatch.Add(endpoints2)
  111. handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1, endpoints2})
  112. // Modify endpoints1
  113. fakeWatch.Modify(endpoints1v2)
  114. handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v2, endpoints2})
  115. // Delete endpoints1
  116. fakeWatch.Delete(endpoints1v2)
  117. handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints2})
  118. // Delete endpoints2
  119. fakeWatch.Delete(endpoints2)
  120. handler.ValidateEndpoints(t, []*v1.Endpoints{})
  121. }
  122. func newSvcHandler(t *testing.T, svcs []*v1.Service, done func()) ServiceHandler {
  123. shm := &ServiceHandlerMock{
  124. state: make(map[types.NamespacedName]*v1.Service),
  125. }
  126. shm.process = func(services []*v1.Service) {
  127. defer done()
  128. if !reflect.DeepEqual(services, svcs) {
  129. t.Errorf("Unexpected services: %#v, expected: %#v", services, svcs)
  130. }
  131. }
  132. return shm
  133. }
  134. func newEpsHandler(t *testing.T, eps []*v1.Endpoints, done func()) EndpointsHandler {
  135. ehm := &EndpointsHandlerMock{
  136. state: make(map[types.NamespacedName]*v1.Endpoints),
  137. }
  138. ehm.process = func(endpoints []*v1.Endpoints) {
  139. defer done()
  140. if !reflect.DeepEqual(eps, endpoints) {
  141. t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, eps)
  142. }
  143. }
  144. return ehm
  145. }
  146. func TestInitialSync(t *testing.T) {
  147. svc1 := &v1.Service{
  148. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
  149. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
  150. }
  151. svc2 := &v1.Service{
  152. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
  153. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
  154. }
  155. eps1 := &v1.Endpoints{
  156. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
  157. }
  158. eps2 := &v1.Endpoints{
  159. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
  160. }
  161. var wg sync.WaitGroup
  162. // Wait for both services and endpoints handler.
  163. wg.Add(2)
  164. // Setup fake api client.
  165. client := fake.NewSimpleClientset(svc1, svc2, eps2, eps1)
  166. sharedInformers := informers.NewSharedInformerFactory(client, 0)
  167. svcConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), 0)
  168. epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0)
  169. svcHandler := newSvcHandler(t, []*v1.Service{svc2, svc1}, wg.Done)
  170. svcConfig.RegisterEventHandler(svcHandler)
  171. epsHandler := newEpsHandler(t, []*v1.Endpoints{eps2, eps1}, wg.Done)
  172. epsConfig.RegisterEventHandler(epsHandler)
  173. stopCh := make(chan struct{})
  174. defer close(stopCh)
  175. go sharedInformers.Start(stopCh)
  176. go svcConfig.Run(stopCh)
  177. go epsConfig.Run(stopCh)
  178. wg.Wait()
  179. }