123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package config
- import (
- "reflect"
- "sync"
- "testing"
- "time"
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/watch"
- informers "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes/fake"
- ktesting "k8s.io/client-go/testing"
- )
- func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
- service1v1 := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
- Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}}}
- service1v2 := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
- Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}}}
- service2 := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s2"},
- Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 30}}}}
- // Setup fake api client.
- client := fake.NewSimpleClientset()
- fakeWatch := watch.NewFake()
- client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
- stopCh := make(chan struct{})
- defer close(stopCh)
- handler := NewServiceHandlerMock()
- sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
- serviceConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
- serviceConfig.RegisterEventHandler(handler)
- go sharedInformers.Start(stopCh)
- go serviceConfig.Run(stopCh)
- // Add the first service
- fakeWatch.Add(service1v1)
- handler.ValidateServices(t, []*v1.Service{service1v1})
- // Add another service
- fakeWatch.Add(service2)
- handler.ValidateServices(t, []*v1.Service{service1v1, service2})
- // Modify service1
- fakeWatch.Modify(service1v2)
- handler.ValidateServices(t, []*v1.Service{service1v2, service2})
- // Delete service1
- fakeWatch.Delete(service1v2)
- handler.ValidateServices(t, []*v1.Service{service2})
- // Delete service2
- fakeWatch.Delete(service2)
- handler.ValidateServices(t, []*v1.Service{})
- }
- func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
- endpoints1v1 := &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{
- {IP: "1.2.3.4"},
- },
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- }
- endpoints1v2 := &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{
- {IP: "1.2.3.4"},
- {IP: "4.3.2.1"},
- },
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- }
- endpoints2 := &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"},
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{
- {IP: "5.6.7.8"},
- },
- Ports: []v1.EndpointPort{{Port: 80, Protocol: "TCP"}},
- }},
- }
- // Setup fake api client.
- client := fake.NewSimpleClientset()
- fakeWatch := watch.NewFake()
- client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
- stopCh := make(chan struct{})
- defer close(stopCh)
- handler := NewEndpointsHandlerMock()
- sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
- endpointsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute)
- endpointsConfig.RegisterEventHandler(handler)
- go sharedInformers.Start(stopCh)
- go endpointsConfig.Run(stopCh)
- // Add the first endpoints
- fakeWatch.Add(endpoints1v1)
- handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1})
- // Add another endpoints
- fakeWatch.Add(endpoints2)
- handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1, endpoints2})
- // Modify endpoints1
- fakeWatch.Modify(endpoints1v2)
- handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v2, endpoints2})
- // Delete endpoints1
- fakeWatch.Delete(endpoints1v2)
- handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints2})
- // Delete endpoints2
- fakeWatch.Delete(endpoints2)
- handler.ValidateEndpoints(t, []*v1.Endpoints{})
- }
- func newSvcHandler(t *testing.T, svcs []*v1.Service, done func()) ServiceHandler {
- shm := &ServiceHandlerMock{
- state: make(map[types.NamespacedName]*v1.Service),
- }
- shm.process = func(services []*v1.Service) {
- defer done()
- if !reflect.DeepEqual(services, svcs) {
- t.Errorf("Unexpected services: %#v, expected: %#v", services, svcs)
- }
- }
- return shm
- }
- func newEpsHandler(t *testing.T, eps []*v1.Endpoints, done func()) EndpointsHandler {
- ehm := &EndpointsHandlerMock{
- state: make(map[types.NamespacedName]*v1.Endpoints),
- }
- ehm.process = func(endpoints []*v1.Endpoints) {
- defer done()
- if !reflect.DeepEqual(eps, endpoints) {
- t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, eps)
- }
- }
- return ehm
- }
- func TestInitialSync(t *testing.T) {
- svc1 := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
- Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
- }
- svc2 := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
- Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
- }
- eps1 := &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
- }
- eps2 := &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
- }
- var wg sync.WaitGroup
- // Wait for both services and endpoints handler.
- wg.Add(2)
- // Setup fake api client.
- client := fake.NewSimpleClientset(svc1, svc2, eps2, eps1)
- sharedInformers := informers.NewSharedInformerFactory(client, 0)
- svcConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), 0)
- epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0)
- svcHandler := newSvcHandler(t, []*v1.Service{svc2, svc1}, wg.Done)
- svcConfig.RegisterEventHandler(svcHandler)
- epsHandler := newEpsHandler(t, []*v1.Endpoints{eps2, eps1}, wg.Done)
- epsConfig.RegisterEventHandler(epsHandler)
- stopCh := make(chan struct{})
- defer close(stopCh)
- go sharedInformers.Start(stopCh)
- go svcConfig.Run(stopCh)
- go epsConfig.Run(stopCh)
- wg.Wait()
- }
|