config_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package config
  14. import (
  15. "reflect"
  16. "sort"
  17. "sync"
  18. "testing"
  19. "time"
  20. "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/types"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. "k8s.io/apimachinery/pkg/watch"
  25. informers "k8s.io/client-go/informers"
  26. "k8s.io/client-go/kubernetes/fake"
  27. ktesting "k8s.io/client-go/testing"
  28. )
  29. type sortedServices []*v1.Service
  30. func (s sortedServices) Len() int {
  31. return len(s)
  32. }
  33. func (s sortedServices) Swap(i, j int) {
  34. s[i], s[j] = s[j], s[i]
  35. }
  36. func (s sortedServices) Less(i, j int) bool {
  37. return s[i].Name < s[j].Name
  38. }
  39. type ServiceHandlerMock struct {
  40. lock sync.Mutex
  41. state map[types.NamespacedName]*v1.Service
  42. synced bool
  43. updated chan []*v1.Service
  44. process func([]*v1.Service)
  45. }
  46. func NewServiceHandlerMock() *ServiceHandlerMock {
  47. shm := &ServiceHandlerMock{
  48. state: make(map[types.NamespacedName]*v1.Service),
  49. updated: make(chan []*v1.Service, 5),
  50. }
  51. shm.process = func(services []*v1.Service) {
  52. shm.updated <- services
  53. }
  54. return shm
  55. }
  56. func (h *ServiceHandlerMock) OnServiceAdd(service *v1.Service) {
  57. h.lock.Lock()
  58. defer h.lock.Unlock()
  59. namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  60. h.state[namespacedName] = service
  61. h.sendServices()
  62. }
  63. func (h *ServiceHandlerMock) OnServiceUpdate(oldService, service *v1.Service) {
  64. h.lock.Lock()
  65. defer h.lock.Unlock()
  66. namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  67. h.state[namespacedName] = service
  68. h.sendServices()
  69. }
  70. func (h *ServiceHandlerMock) OnServiceDelete(service *v1.Service) {
  71. h.lock.Lock()
  72. defer h.lock.Unlock()
  73. namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  74. delete(h.state, namespacedName)
  75. h.sendServices()
  76. }
  77. func (h *ServiceHandlerMock) OnServiceSynced() {
  78. h.lock.Lock()
  79. defer h.lock.Unlock()
  80. h.synced = true
  81. h.sendServices()
  82. }
  83. func (h *ServiceHandlerMock) sendServices() {
  84. if !h.synced {
  85. return
  86. }
  87. services := make([]*v1.Service, 0, len(h.state))
  88. for _, svc := range h.state {
  89. services = append(services, svc)
  90. }
  91. sort.Sort(sortedServices(services))
  92. h.process(services)
  93. }
  94. func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*v1.Service) {
  95. // We might get 1 or more updates for N service updates, because we
  96. // over write older snapshots of services from the producer go-routine
  97. // if the consumer falls behind.
  98. var services []*v1.Service
  99. for {
  100. select {
  101. case services = <-h.updated:
  102. if reflect.DeepEqual(services, expectedServices) {
  103. return
  104. }
  105. // Unittests will hard timeout in 5m with a stack trace, prevent that
  106. // and surface a clearer reason for failure.
  107. case <-time.After(wait.ForeverTestTimeout):
  108. t.Errorf("Timed out. Expected %#v, Got %#v", expectedServices, services)
  109. return
  110. }
  111. }
  112. }
  113. type sortedEndpoints []*v1.Endpoints
  114. func (s sortedEndpoints) Len() int {
  115. return len(s)
  116. }
  117. func (s sortedEndpoints) Swap(i, j int) {
  118. s[i], s[j] = s[j], s[i]
  119. }
  120. func (s sortedEndpoints) Less(i, j int) bool {
  121. return s[i].Name < s[j].Name
  122. }
  123. type EndpointsHandlerMock struct {
  124. lock sync.Mutex
  125. state map[types.NamespacedName]*v1.Endpoints
  126. synced bool
  127. updated chan []*v1.Endpoints
  128. process func([]*v1.Endpoints)
  129. }
  130. func NewEndpointsHandlerMock() *EndpointsHandlerMock {
  131. ehm := &EndpointsHandlerMock{
  132. state: make(map[types.NamespacedName]*v1.Endpoints),
  133. updated: make(chan []*v1.Endpoints, 5),
  134. }
  135. ehm.process = func(endpoints []*v1.Endpoints) {
  136. ehm.updated <- endpoints
  137. }
  138. return ehm
  139. }
  140. func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *v1.Endpoints) {
  141. h.lock.Lock()
  142. defer h.lock.Unlock()
  143. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  144. h.state[namespacedName] = endpoints
  145. h.sendEndpoints()
  146. }
  147. func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  148. h.lock.Lock()
  149. defer h.lock.Unlock()
  150. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  151. h.state[namespacedName] = endpoints
  152. h.sendEndpoints()
  153. }
  154. func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *v1.Endpoints) {
  155. h.lock.Lock()
  156. defer h.lock.Unlock()
  157. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  158. delete(h.state, namespacedName)
  159. h.sendEndpoints()
  160. }
  161. func (h *EndpointsHandlerMock) OnEndpointsSynced() {
  162. h.lock.Lock()
  163. defer h.lock.Unlock()
  164. h.synced = true
  165. h.sendEndpoints()
  166. }
  167. func (h *EndpointsHandlerMock) sendEndpoints() {
  168. if !h.synced {
  169. return
  170. }
  171. endpoints := make([]*v1.Endpoints, 0, len(h.state))
  172. for _, eps := range h.state {
  173. endpoints = append(endpoints, eps)
  174. }
  175. sort.Sort(sortedEndpoints(endpoints))
  176. h.process(endpoints)
  177. }
  178. func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*v1.Endpoints) {
  179. // We might get 1 or more updates for N endpoint updates, because we
  180. // over write older snapshots of endpoints from the producer go-routine
  181. // if the consumer falls behind. Unittests will hard timeout in 5m.
  182. var endpoints []*v1.Endpoints
  183. for {
  184. select {
  185. case endpoints = <-h.updated:
  186. if reflect.DeepEqual(endpoints, expectedEndpoints) {
  187. return
  188. }
  189. // Unittests will hard timeout in 5m with a stack trace, prevent that
  190. // and surface a clearer reason for failure.
  191. case <-time.After(wait.ForeverTestTimeout):
  192. t.Errorf("Timed out. Expected %#v, Got %#v", expectedEndpoints, endpoints)
  193. return
  194. }
  195. }
  196. }
  197. func TestNewServiceAddedAndNotified(t *testing.T) {
  198. client := fake.NewSimpleClientset()
  199. fakeWatch := watch.NewFake()
  200. client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
  201. stopCh := make(chan struct{})
  202. defer close(stopCh)
  203. sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
  204. config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
  205. handler := NewServiceHandlerMock()
  206. config.RegisterEventHandler(handler)
  207. go sharedInformers.Start(stopCh)
  208. go config.Run(stopCh)
  209. service := &v1.Service{
  210. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
  211. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
  212. }
  213. fakeWatch.Add(service)
  214. handler.ValidateServices(t, []*v1.Service{service})
  215. }
  216. func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
  217. client := fake.NewSimpleClientset()
  218. fakeWatch := watch.NewFake()
  219. client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
  220. stopCh := make(chan struct{})
  221. defer close(stopCh)
  222. sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
  223. config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
  224. handler := NewServiceHandlerMock()
  225. config.RegisterEventHandler(handler)
  226. go sharedInformers.Start(stopCh)
  227. go config.Run(stopCh)
  228. service1 := &v1.Service{
  229. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
  230. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
  231. }
  232. fakeWatch.Add(service1)
  233. handler.ValidateServices(t, []*v1.Service{service1})
  234. service2 := &v1.Service{
  235. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
  236. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}},
  237. }
  238. fakeWatch.Add(service2)
  239. services := []*v1.Service{service2, service1}
  240. handler.ValidateServices(t, services)
  241. fakeWatch.Delete(service1)
  242. services = []*v1.Service{service2}
  243. handler.ValidateServices(t, services)
  244. }
  245. func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
  246. client := fake.NewSimpleClientset()
  247. fakeWatch := watch.NewFake()
  248. client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
  249. stopCh := make(chan struct{})
  250. defer close(stopCh)
  251. sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
  252. config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
  253. handler := NewServiceHandlerMock()
  254. handler2 := NewServiceHandlerMock()
  255. config.RegisterEventHandler(handler)
  256. config.RegisterEventHandler(handler2)
  257. go sharedInformers.Start(stopCh)
  258. go config.Run(stopCh)
  259. service1 := &v1.Service{
  260. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
  261. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
  262. }
  263. service2 := &v1.Service{
  264. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
  265. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}},
  266. }
  267. fakeWatch.Add(service1)
  268. fakeWatch.Add(service2)
  269. services := []*v1.Service{service2, service1}
  270. handler.ValidateServices(t, services)
  271. handler2.ValidateServices(t, services)
  272. }
  273. func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
  274. client := fake.NewSimpleClientset()
  275. fakeWatch := watch.NewFake()
  276. client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
  277. stopCh := make(chan struct{})
  278. defer close(stopCh)
  279. sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
  280. config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute)
  281. handler := NewEndpointsHandlerMock()
  282. handler2 := NewEndpointsHandlerMock()
  283. config.RegisterEventHandler(handler)
  284. config.RegisterEventHandler(handler2)
  285. go sharedInformers.Start(stopCh)
  286. go config.Run(stopCh)
  287. endpoints1 := &v1.Endpoints{
  288. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
  289. Subsets: []v1.EndpointSubset{{
  290. Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
  291. Ports: []v1.EndpointPort{{Port: 80}},
  292. }},
  293. }
  294. endpoints2 := &v1.Endpoints{
  295. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
  296. Subsets: []v1.EndpointSubset{{
  297. Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
  298. Ports: []v1.EndpointPort{{Port: 80}},
  299. }},
  300. }
  301. fakeWatch.Add(endpoints1)
  302. fakeWatch.Add(endpoints2)
  303. endpoints := []*v1.Endpoints{endpoints2, endpoints1}
  304. handler.ValidateEndpoints(t, endpoints)
  305. handler2.ValidateEndpoints(t, endpoints)
  306. }
  307. func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
  308. client := fake.NewSimpleClientset()
  309. fakeWatch := watch.NewFake()
  310. client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
  311. stopCh := make(chan struct{})
  312. defer close(stopCh)
  313. sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
  314. config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute)
  315. handler := NewEndpointsHandlerMock()
  316. handler2 := NewEndpointsHandlerMock()
  317. config.RegisterEventHandler(handler)
  318. config.RegisterEventHandler(handler2)
  319. go sharedInformers.Start(stopCh)
  320. go config.Run(stopCh)
  321. endpoints1 := &v1.Endpoints{
  322. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
  323. Subsets: []v1.EndpointSubset{{
  324. Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
  325. Ports: []v1.EndpointPort{{Port: 80}},
  326. }},
  327. }
  328. endpoints2 := &v1.Endpoints{
  329. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
  330. Subsets: []v1.EndpointSubset{{
  331. Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
  332. Ports: []v1.EndpointPort{{Port: 80}},
  333. }},
  334. }
  335. fakeWatch.Add(endpoints1)
  336. fakeWatch.Add(endpoints2)
  337. endpoints := []*v1.Endpoints{endpoints2, endpoints1}
  338. handler.ValidateEndpoints(t, endpoints)
  339. handler2.ValidateEndpoints(t, endpoints)
  340. // Add one more
  341. endpoints3 := &v1.Endpoints{
  342. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
  343. Subsets: []v1.EndpointSubset{{
  344. Addresses: []v1.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}},
  345. Ports: []v1.EndpointPort{{Port: 80}},
  346. }},
  347. }
  348. fakeWatch.Add(endpoints3)
  349. endpoints = []*v1.Endpoints{endpoints2, endpoints1, endpoints3}
  350. handler.ValidateEndpoints(t, endpoints)
  351. handler2.ValidateEndpoints(t, endpoints)
  352. // Update the "foo" service with new endpoints
  353. endpoints1v2 := &v1.Endpoints{
  354. ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
  355. Subsets: []v1.EndpointSubset{{
  356. Addresses: []v1.EndpointAddress{{IP: "7.7.7.7"}},
  357. Ports: []v1.EndpointPort{{Port: 80}},
  358. }},
  359. }
  360. fakeWatch.Modify(endpoints1v2)
  361. endpoints = []*v1.Endpoints{endpoints2, endpoints1v2, endpoints3}
  362. handler.ValidateEndpoints(t, endpoints)
  363. handler2.ValidateEndpoints(t, endpoints)
  364. // Remove "bar" endpoints
  365. fakeWatch.Delete(endpoints2)
  366. endpoints = []*v1.Endpoints{endpoints1v2, endpoints3}
  367. handler.ValidateEndpoints(t, endpoints)
  368. handler2.ValidateEndpoints(t, endpoints)
  369. }
  370. // TODO: Add a unittest for interrupts getting processed in a timely manner.
  371. // Currently this module has a circular dependency with config, and so it's
  372. // named config_test, which means even test methods need to be public. This
  373. // is refactoring that we can avoid by resolving the dependency.