12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package service
- import (
- "context"
- "fmt"
- "net"
- "regexp"
- "strconv"
- "strings"
- "time"
- "github.com/onsi/ginkgo"
- v1 "k8s.io/api/core/v1"
- policyv1beta1 "k8s.io/api/policy/v1beta1"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/util/intstr"
- utilnet "k8s.io/apimachinery/pkg/util/net"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/uuid"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apimachinery/pkg/watch"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
- "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
- "k8s.io/kubernetes/test/e2e/framework"
- e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
- e2enode "k8s.io/kubernetes/test/e2e/framework/node"
- e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
- e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
- testutils "k8s.io/kubernetes/test/utils"
- imageutils "k8s.io/kubernetes/test/utils/image"
- )
- // NodePortRange should match whatever the default/configured range is
- var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
- // TestJig is a test jig to help service testing.
- type TestJig struct {
- Client clientset.Interface
- Namespace string
- Name string
- ID string
- Labels map[string]string
- }
- // NewTestJig allocates and inits a new TestJig.
- func NewTestJig(client clientset.Interface, namespace, name string) *TestJig {
- j := &TestJig{}
- j.Client = client
- j.Namespace = namespace
- j.Name = name
- j.ID = j.Name + "-" + string(uuid.NewUUID())
- j.Labels = map[string]string{"testid": j.ID}
- return j
- }
- // newServiceTemplate returns the default v1.Service template for this j, but
- // does not actually create the Service. The default Service has the same name
- // as the j and exposes the given port.
- func (j *TestJig) newServiceTemplate(proto v1.Protocol, port int32) *v1.Service {
- service := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: j.Namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: v1.ServiceSpec{
- Selector: j.Labels,
- Ports: []v1.ServicePort{
- {
- Protocol: proto,
- Port: port,
- },
- },
- },
- }
- return service
- }
- // CreateTCPServiceWithPort creates a new TCP Service with given port based on the
- // j's defaults. Callers can provide a function to tweak the Service object before
- // it is created.
- func (j *TestJig) CreateTCPServiceWithPort(tweak func(svc *v1.Service), port int32) (*v1.Service, error) {
- svc := j.newServiceTemplate(v1.ProtocolTCP, port)
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to create TCP Service %q: %v", svc.Name, err)
- }
- return j.sanityCheckService(result, svc.Spec.Type)
- }
- // CreateTCPService creates a new TCP Service based on the j's
- // defaults. Callers can provide a function to tweak the Service object before
- // it is created.
- func (j *TestJig) CreateTCPService(tweak func(svc *v1.Service)) (*v1.Service, error) {
- svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to create TCP Service %q: %v", svc.Name, err)
- }
- return j.sanityCheckService(result, svc.Spec.Type)
- }
- // CreateUDPService creates a new UDP Service based on the j's
- // defaults. Callers can provide a function to tweak the Service object before
- // it is created.
- func (j *TestJig) CreateUDPService(tweak func(svc *v1.Service)) (*v1.Service, error) {
- svc := j.newServiceTemplate(v1.ProtocolUDP, 80)
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to create UDP Service %q: %v", svc.Name, err)
- }
- return j.sanityCheckService(result, svc.Spec.Type)
- }
- // CreateExternalNameService creates a new ExternalName type Service based on the j's defaults.
- // Callers can provide a function to tweak the Service object before it is created.
- func (j *TestJig) CreateExternalNameService(tweak func(svc *v1.Service)) (*v1.Service, error) {
- svc := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: j.Namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: v1.ServiceSpec{
- Selector: j.Labels,
- ExternalName: "foo.example.com",
- Type: v1.ServiceTypeExternalName,
- },
- }
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to create ExternalName Service %q: %v", svc.Name, err)
- }
- return j.sanityCheckService(result, svc.Spec.Type)
- }
- // ChangeServiceType updates the given service's ServiceType to the given newType.
- func (j *TestJig) ChangeServiceType(newType v1.ServiceType, timeout time.Duration) error {
- ingressIP := ""
- svc, err := j.UpdateService(func(s *v1.Service) {
- for _, ing := range s.Status.LoadBalancer.Ingress {
- if ing.IP != "" {
- ingressIP = ing.IP
- }
- }
- s.Spec.Type = newType
- s.Spec.Ports[0].NodePort = 0
- })
- if err != nil {
- return err
- }
- if ingressIP != "" {
- _, err = j.WaitForLoadBalancerDestroy(ingressIP, int(svc.Spec.Ports[0].Port), timeout)
- }
- return err
- }
- // CreateOnlyLocalNodePortService creates a NodePort service with
- // ExternalTrafficPolicy set to Local and sanity checks its nodePort.
- // If createPod is true, it also creates an RC with 1 replica of
- // the standard netexec container used everywhere in this test.
- func (j *TestJig) CreateOnlyLocalNodePortService(createPod bool) (*v1.Service, error) {
- ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=NodePort and ExternalTrafficPolicy=Local")
- svc, err := j.CreateTCPService(func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeNodePort
- svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
- svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 80}}
- })
- if err != nil {
- return nil, err
- }
- if createPod {
- ginkgo.By("creating a pod to be part of the service " + j.Name)
- _, err = j.Run(nil)
- if err != nil {
- return nil, err
- }
- }
- return svc, nil
- }
- // CreateOnlyLocalLoadBalancerService creates a loadbalancer service with
- // ExternalTrafficPolicy set to Local and waits for it to acquire an ingress IP.
- // If createPod is true, it also creates an RC with 1 replica of
- // the standard netexec container used everywhere in this test.
- func (j *TestJig) CreateOnlyLocalLoadBalancerService(timeout time.Duration, createPod bool,
- tweak func(svc *v1.Service)) (*v1.Service, error) {
- _, err := j.CreateLoadBalancerService(timeout, func(svc *v1.Service) {
- ginkgo.By("setting ExternalTrafficPolicy=Local")
- svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
- if tweak != nil {
- tweak(svc)
- }
- })
- if err != nil {
- return nil, err
- }
- if createPod {
- ginkgo.By("creating a pod to be part of the service " + j.Name)
- _, err = j.Run(nil)
- if err != nil {
- return nil, err
- }
- }
- ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
- return j.WaitForLoadBalancer(timeout)
- }
- // CreateLoadBalancerService creates a loadbalancer service and waits
- // for it to acquire an ingress IP.
- func (j *TestJig) CreateLoadBalancerService(timeout time.Duration, tweak func(svc *v1.Service)) (*v1.Service, error) {
- ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer")
- svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
- svc.Spec.Type = v1.ServiceTypeLoadBalancer
- // We need to turn affinity off for our LB distribution tests
- svc.Spec.SessionAffinity = v1.ServiceAffinityNone
- if tweak != nil {
- tweak(svc)
- }
- _, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %v", svc.Name, err)
- }
- ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
- return j.WaitForLoadBalancer(timeout)
- }
- // GetEndpointNodes returns a map of nodenames:external-ip on which the
- // endpoints of the Service are running.
- func (j *TestJig) GetEndpointNodes() (map[string][]string, error) {
- nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests)
- if err != nil {
- return nil, err
- }
- epNodes, err := j.GetEndpointNodeNames()
- if err != nil {
- return nil, err
- }
- nodeMap := map[string][]string{}
- for _, n := range nodes.Items {
- if epNodes.Has(n.Name) {
- nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
- }
- }
- return nodeMap, nil
- }
- // GetEndpointNodeNames returns a string set of node names on which the
- // endpoints of the given Service are running.
- func (j *TestJig) GetEndpointNodeNames() (sets.String, error) {
- endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
- if err != nil {
- return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
- }
- if len(endpoints.Subsets) == 0 {
- return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses")
- }
- epNodes := sets.NewString()
- for _, ss := range endpoints.Subsets {
- for _, e := range ss.Addresses {
- if e.NodeName != nil {
- epNodes.Insert(*e.NodeName)
- }
- }
- }
- return epNodes, nil
- }
- // WaitForEndpointOnNode waits for a service endpoint on the given node.
- func (j *TestJig) WaitForEndpointOnNode(nodeName string) error {
- return wait.PollImmediate(framework.Poll, LoadBalancerPropagationTimeoutDefault, func() (bool, error) {
- endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
- if err != nil {
- framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
- return false, nil
- }
- if len(endpoints.Subsets) == 0 {
- framework.Logf("Expect endpoints with subsets, got none.")
- return false, nil
- }
- // TODO: Handle multiple endpoints
- if len(endpoints.Subsets[0].Addresses) == 0 {
- framework.Logf("Expected Ready endpoints - found none")
- return false, nil
- }
- epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
- framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName)
- if epHostName != nodeName {
- framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
- return false, nil
- }
- return true, nil
- })
- }
- // WaitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout
- func (j *TestJig) WaitForAvailableEndpoint(timeout time.Duration) error {
- //Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run
- endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
- stopCh := make(chan struct{})
- endpointAvailable := false
- var controller cache.Controller
- _, controller = cache.NewInformer(
- &cache.ListWatch{
- ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
- options.FieldSelector = endpointSelector.String()
- obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(context.TODO(), options)
- return runtime.Object(obj), err
- },
- WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
- options.FieldSelector = endpointSelector.String()
- return j.Client.CoreV1().Endpoints(j.Namespace).Watch(context.TODO(), options)
- },
- },
- &v1.Endpoints{},
- 0,
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- if e, ok := obj.(*v1.Endpoints); ok {
- if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
- endpointAvailable = true
- }
- }
- },
- UpdateFunc: func(old, cur interface{}) {
- if e, ok := cur.(*v1.Endpoints); ok {
- if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
- endpointAvailable = true
- }
- }
- },
- },
- )
- defer func() {
- close(stopCh)
- }()
- go controller.Run(stopCh)
- err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
- return endpointAvailable, nil
- })
- if err != nil {
- return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)
- }
- return nil
- }
- // sanityCheckService performs sanity checks on the given service; in particular, ensuring
- // that creating/updating a service allocates IPs, ports, etc, as needed. It does not
- // check for ingress assignment as that happens asynchronously after the Service is created.
- func (j *TestJig) sanityCheckService(svc *v1.Service, svcType v1.ServiceType) (*v1.Service, error) {
- if svcType == "" {
- svcType = v1.ServiceTypeClusterIP
- }
- if svc.Spec.Type != svcType {
- return nil, fmt.Errorf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
- }
- if svcType != v1.ServiceTypeExternalName {
- if svc.Spec.ExternalName != "" {
- return nil, fmt.Errorf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
- }
- if svc.Spec.ClusterIP == "" {
- return nil, fmt.Errorf("didn't get ClusterIP for non-ExternalName service")
- }
- } else {
- if svc.Spec.ClusterIP != "" {
- return nil, fmt.Errorf("unexpected Spec.ClusterIP (%s) for ExternalName service, expected empty", svc.Spec.ClusterIP)
- }
- }
- expectNodePorts := false
- if svcType != v1.ServiceTypeClusterIP && svcType != v1.ServiceTypeExternalName {
- expectNodePorts = true
- }
- for i, port := range svc.Spec.Ports {
- hasNodePort := (port.NodePort != 0)
- if hasNodePort != expectNodePorts {
- return nil, fmt.Errorf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
- }
- if hasNodePort {
- if !NodePortRange.Contains(int(port.NodePort)) {
- return nil, fmt.Errorf("out-of-range nodePort (%d) for service", port.NodePort)
- }
- }
- }
- // FIXME: this fails for tests that were changed from LoadBalancer to ClusterIP.
- // if svcType != v1.ServiceTypeLoadBalancer {
- // if len(svc.Status.LoadBalancer.Ingress) != 0 {
- // return nil, fmt.Errorf("unexpected Status.LoadBalancer.Ingress on non-LoadBalancer service")
- // }
- // }
- return svc, nil
- }
- // UpdateService fetches a service, calls the update function on it, and
- // then attempts to send the updated service. It tries up to 3 times in the
- // face of timeouts and conflicts.
- func (j *TestJig) UpdateService(update func(*v1.Service)) (*v1.Service, error) {
- for i := 0; i < 3; i++ {
- service, err := j.Client.CoreV1().Services(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to get Service %q: %v", j.Name, err)
- }
- update(service)
- result, err := j.Client.CoreV1().Services(j.Namespace).Update(context.TODO(), service, metav1.UpdateOptions{})
- if err == nil {
- return j.sanityCheckService(result, service.Spec.Type)
- }
- if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
- return nil, fmt.Errorf("failed to update Service %q: %v", j.Name, err)
- }
- }
- return nil, fmt.Errorf("too many retries updating Service %q", j.Name)
- }
- // WaitForNewIngressIP waits for the given service to get a new ingress IP, or returns an error after the given timeout
- func (j *TestJig) WaitForNewIngressIP(existingIP string, timeout time.Duration) (*v1.Service, error) {
- framework.Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, j.Name)
- service, err := j.waitForCondition(timeout, "have a new ingress IP", func(svc *v1.Service) bool {
- if len(svc.Status.LoadBalancer.Ingress) == 0 {
- return false
- }
- ip := svc.Status.LoadBalancer.Ingress[0].IP
- if ip == "" || ip == existingIP {
- return false
- }
- return true
- })
- if err != nil {
- return nil, err
- }
- return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
- }
- // ChangeServiceNodePort changes node ports of the given service.
- func (j *TestJig) ChangeServiceNodePort(initial int) (*v1.Service, error) {
- var err error
- var service *v1.Service
- for i := 1; i < NodePortRange.Size; i++ {
- offs1 := initial - NodePortRange.Base
- offs2 := (offs1 + i) % NodePortRange.Size
- newPort := NodePortRange.Base + offs2
- service, err = j.UpdateService(func(s *v1.Service) {
- s.Spec.Ports[0].NodePort = int32(newPort)
- })
- if err != nil && strings.Contains(err.Error(), portallocator.ErrAllocated.Error()) {
- framework.Logf("tried nodePort %d, but it is in use, will try another", newPort)
- continue
- }
- // Otherwise err was nil or err was a real error
- break
- }
- return service, err
- }
- // WaitForLoadBalancer waits the given service to have a LoadBalancer, or returns an error after the given timeout
- func (j *TestJig) WaitForLoadBalancer(timeout time.Duration) (*v1.Service, error) {
- framework.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, j.Name)
- service, err := j.waitForCondition(timeout, "have a load balancer", func(svc *v1.Service) bool {
- return len(svc.Status.LoadBalancer.Ingress) > 0
- })
- if err != nil {
- return nil, err
- }
- for i, ing := range service.Status.LoadBalancer.Ingress {
- if ing.IP == "" && ing.Hostname == "" {
- return nil, fmt.Errorf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
- }
- }
- return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
- }
- // WaitForLoadBalancerDestroy waits the given service to destroy a LoadBalancer, or returns an error after the given timeout
- func (j *TestJig) WaitForLoadBalancerDestroy(ip string, port int, timeout time.Duration) (*v1.Service, error) {
- // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
- defer func() {
- if err := framework.EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil {
- framework.Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
- }
- }()
- framework.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, j.Name)
- service, err := j.waitForCondition(timeout, "have no load balancer", func(svc *v1.Service) bool {
- return len(svc.Status.LoadBalancer.Ingress) == 0
- })
- if err != nil {
- return nil, err
- }
- return j.sanityCheckService(service, service.Spec.Type)
- }
- func (j *TestJig) waitForCondition(timeout time.Duration, message string, conditionFn func(*v1.Service) bool) (*v1.Service, error) {
- var service *v1.Service
- pollFunc := func() (bool, error) {
- svc, err := j.Client.CoreV1().Services(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- if conditionFn(svc) {
- service = svc
- return true, nil
- }
- return false, nil
- }
- if err := wait.PollImmediate(framework.Poll, timeout, pollFunc); err != nil {
- return nil, fmt.Errorf("timed out waiting for service %q to %s", j.Name, message)
- }
- return service, nil
- }
- // newRCTemplate returns the default v1.ReplicationController object for
- // this j, but does not actually create the RC. The default RC has the same
- // name as the j and runs the "netexec" container.
- func (j *TestJig) newRCTemplate() *v1.ReplicationController {
- var replicas int32 = 1
- var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
- rc := &v1.ReplicationController{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: j.Namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: v1.ReplicationControllerSpec{
- Replicas: &replicas,
- Selector: j.Labels,
- Template: &v1.PodTemplateSpec{
- ObjectMeta: metav1.ObjectMeta{
- Labels: j.Labels,
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "netexec",
- Image: imageutils.GetE2EImage(imageutils.Agnhost),
- Args: []string{"netexec", "--http-port=80", "--udp-port=80"},
- ReadinessProbe: &v1.Probe{
- PeriodSeconds: 3,
- Handler: v1.Handler{
- HTTPGet: &v1.HTTPGetAction{
- Port: intstr.FromInt(80),
- Path: "/hostName",
- },
- },
- },
- },
- },
- TerminationGracePeriodSeconds: &grace,
- },
- },
- },
- }
- return rc
- }
- // AddRCAntiAffinity adds AntiAffinity to the given ReplicationController.
- func (j *TestJig) AddRCAntiAffinity(rc *v1.ReplicationController) {
- var replicas int32 = 2
- rc.Spec.Replicas = &replicas
- if rc.Spec.Template.Spec.Affinity == nil {
- rc.Spec.Template.Spec.Affinity = &v1.Affinity{}
- }
- if rc.Spec.Template.Spec.Affinity.PodAntiAffinity == nil {
- rc.Spec.Template.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{}
- }
- rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
- rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
- v1.PodAffinityTerm{
- LabelSelector: &metav1.LabelSelector{MatchLabels: j.Labels},
- Namespaces: nil,
- TopologyKey: "kubernetes.io/hostname",
- })
- }
- // CreatePDB returns a PodDisruptionBudget for the given ReplicationController, or returns an error if a PodDisruptionBudget isn't ready
- func (j *TestJig) CreatePDB(rc *v1.ReplicationController) (*policyv1beta1.PodDisruptionBudget, error) {
- pdb := j.newPDBTemplate(rc)
- newPdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(j.Namespace).Create(context.TODO(), pdb, metav1.CreateOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to create PDB %q %v", pdb.Name, err)
- }
- if err := j.waitForPdbReady(); err != nil {
- return nil, fmt.Errorf("failed waiting for PDB to be ready: %v", err)
- }
- return newPdb, nil
- }
- // newPDBTemplate returns the default policyv1beta1.PodDisruptionBudget object for
- // this j, but does not actually create the PDB. The default PDB specifies a
- // MinAvailable of N-1 and matches the pods created by the RC.
- func (j *TestJig) newPDBTemplate(rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
- minAvailable := intstr.FromInt(int(*rc.Spec.Replicas) - 1)
- pdb := &policyv1beta1.PodDisruptionBudget{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: j.Namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: policyv1beta1.PodDisruptionBudgetSpec{
- MinAvailable: &minAvailable,
- Selector: &metav1.LabelSelector{MatchLabels: j.Labels},
- },
- }
- return pdb
- }
- // Run creates a ReplicationController and Pod(s) and waits for the
- // Pod(s) to be running. Callers can provide a function to tweak the RC object
- // before it is created.
- func (j *TestJig) Run(tweak func(rc *v1.ReplicationController)) (*v1.ReplicationController, error) {
- rc := j.newRCTemplate()
- if tweak != nil {
- tweak(rc)
- }
- result, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).Create(context.TODO(), rc, metav1.CreateOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to create RC %q: %v", rc.Name, err)
- }
- pods, err := j.waitForPodsCreated(int(*(rc.Spec.Replicas)))
- if err != nil {
- return nil, fmt.Errorf("failed to create pods: %v", err)
- }
- if err := j.waitForPodsReady(pods); err != nil {
- return nil, fmt.Errorf("failed waiting for pods to be running: %v", err)
- }
- return result, nil
- }
- // Scale scales pods to the given replicas
- func (j *TestJig) Scale(replicas int) error {
- rc := j.Name
- scale, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).GetScale(context.TODO(), rc, metav1.GetOptions{})
- if err != nil {
- return fmt.Errorf("failed to get scale for RC %q: %v", rc, err)
- }
- scale.ResourceVersion = "" // indicate the scale update should be unconditional
- scale.Spec.Replicas = int32(replicas)
- _, err = j.Client.CoreV1().ReplicationControllers(j.Namespace).UpdateScale(context.TODO(), rc, scale, metav1.UpdateOptions{})
- if err != nil {
- return fmt.Errorf("failed to scale RC %q: %v", rc, err)
- }
- pods, err := j.waitForPodsCreated(replicas)
- if err != nil {
- return fmt.Errorf("failed waiting for pods: %v", err)
- }
- if err := j.waitForPodsReady(pods); err != nil {
- return fmt.Errorf("failed waiting for pods to be running: %v", err)
- }
- return nil
- }
- func (j *TestJig) waitForPdbReady() error {
- timeout := 2 * time.Minute
- for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
- pdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
- if err != nil {
- return err
- }
- if pdb.Status.DisruptionsAllowed > 0 {
- return nil
- }
- }
- return fmt.Errorf("timeout waiting for PDB %q to be ready", j.Name)
- }
- func (j *TestJig) waitForPodsCreated(replicas int) ([]string, error) {
- timeout := 2 * time.Minute
- // List the pods, making sure we observe all the replicas.
- label := labels.SelectorFromSet(labels.Set(j.Labels))
- framework.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
- for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
- options := metav1.ListOptions{LabelSelector: label.String()}
- pods, err := j.Client.CoreV1().Pods(j.Namespace).List(context.TODO(), options)
- if err != nil {
- return nil, err
- }
- found := []string{}
- for _, pod := range pods.Items {
- if pod.DeletionTimestamp != nil {
- continue
- }
- found = append(found, pod.Name)
- }
- if len(found) == replicas {
- framework.Logf("Found all %d pods", replicas)
- return found, nil
- }
- framework.Logf("Found %d/%d pods - will retry", len(found), replicas)
- }
- return nil, fmt.Errorf("timeout waiting for %d pods to be created", replicas)
- }
- func (j *TestJig) waitForPodsReady(pods []string) error {
- timeout := 2 * time.Minute
- if !e2epod.CheckPodsRunningReady(j.Client, j.Namespace, pods, timeout) {
- return fmt.Errorf("timeout waiting for %d pods to be ready", len(pods))
- }
- return nil
- }
- func testReachabilityOverServiceName(serviceName string, sp v1.ServicePort, execPod *v1.Pod) error {
- return testEndpointReachability(serviceName, sp.Port, sp.Protocol, execPod)
- }
- func testReachabilityOverClusterIP(clusterIP string, sp v1.ServicePort, execPod *v1.Pod) error {
- // If .spec.clusterIP is set to "" or "None" for service, ClusterIP is not created, so reachability can not be tested over clusterIP:servicePort
- isClusterIPV46, err := regexp.MatchString(e2enetwork.RegexIPv4+"||"+e2enetwork.RegexIPv6, clusterIP)
- if err != nil {
- return fmt.Errorf("unable to parse ClusterIP: %s", clusterIP)
- }
- if isClusterIPV46 {
- return testEndpointReachability(clusterIP, sp.Port, sp.Protocol, execPod)
- }
- return nil
- }
- func testReachabilityOverNodePorts(nodes *v1.NodeList, sp v1.ServicePort, pod *v1.Pod, clusterIP string) error {
- internalAddrs := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
- externalAddrs := e2enode.CollectAddresses(nodes, v1.NodeExternalIP)
- isClusterIPV4 := net.ParseIP(clusterIP).To4() != nil
- for _, internalAddr := range internalAddrs {
- // If the node's internal address points to localhost, then we are not
- // able to test the service reachability via that address
- if isInvalidOrLocalhostAddress(internalAddr) {
- framework.Logf("skipping testEndpointReachability() for internal adddress %s", internalAddr)
- continue
- }
- isNodeInternalIPV4 := net.ParseIP(internalAddr).To4() != nil
- // Check service reachability on the node internalIP which is same family
- // as clusterIP
- if isClusterIPV4 != isNodeInternalIPV4 {
- framework.Logf("skipping testEndpointReachability() for internal adddress %s as it does not match clusterIP (%s) family", internalAddr, clusterIP)
- continue
- }
- err := testEndpointReachability(internalAddr, sp.NodePort, sp.Protocol, pod)
- if err != nil {
- return err
- }
- }
- for _, externalAddr := range externalAddrs {
- isNodeExternalIPV4 := net.ParseIP(externalAddr).To4() != nil
- if isClusterIPV4 != isNodeExternalIPV4 {
- framework.Logf("skipping testEndpointReachability() for external adddress %s as it does not match clusterIP (%s) family", externalAddr, clusterIP)
- continue
- }
- err := testEndpointReachability(externalAddr, sp.NodePort, sp.Protocol, pod)
- if err != nil {
- return err
- }
- }
- return nil
- }
// isInvalidOrLocalhostAddress reports whether the provided `ip` cannot be
// parsed as an IP address or is a loopback address.
func isInvalidOrLocalhostAddress(ip string) bool {
	parsed := net.ParseIP(ip)
	return parsed == nil || parsed.IsLoopback()
}
- // testEndpointReachability tests reachability to endpoints (i.e. IP, ServiceName) and ports. Test request is initiated from specified execPod.
- // TCP and UDP protocol based service are supported at this moment
- // TODO: add support to test SCTP Protocol based services.
- func testEndpointReachability(endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod) error {
- ep := net.JoinHostPort(endpoint, strconv.Itoa(int(port)))
- cmd := ""
- switch protocol {
- case v1.ProtocolTCP:
- cmd = fmt.Sprintf("nc -zv -t -w 2 %s %v", endpoint, port)
- case v1.ProtocolUDP:
- cmd = fmt.Sprintf("nc -zv -u -w 2 %s %v", endpoint, port)
- default:
- return fmt.Errorf("service reachablity check is not supported for %v", protocol)
- }
- err := wait.PollImmediate(1*time.Second, ServiceReachabilityShortPollTimeout, func() (bool, error) {
- if _, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil {
- framework.Logf("Service reachability failing with error: %v\nRetrying...", err)
- return false, nil
- }
- return true, nil
- })
- if err != nil {
- return fmt.Errorf("service is not reachable within %v timeout on endpoint %s over %s protocol", ServiceReachabilityShortPollTimeout, ep, protocol)
- }
- return nil
- }
- // checkClusterIPServiceReachability ensures that service of type ClusterIP is reachable over
- // - ServiceName:ServicePort, ClusterIP:ServicePort
- func (j *TestJig) checkClusterIPServiceReachability(svc *v1.Service, pod *v1.Pod) error {
- clusterIP := svc.Spec.ClusterIP
- servicePorts := svc.Spec.Ports
- err := j.WaitForAvailableEndpoint(ServiceEndpointsTimeout)
- if err != nil {
- return err
- }
- for _, servicePort := range servicePorts {
- err = testReachabilityOverServiceName(svc.Name, servicePort, pod)
- if err != nil {
- return err
- }
- err = testReachabilityOverClusterIP(clusterIP, servicePort, pod)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // checkNodePortServiceReachability ensures that service of type nodePort are reachable
- // - Internal clients should be reachable to service over -
- // ServiceName:ServicePort, ClusterIP:ServicePort and NodeInternalIPs:NodePort
- // - External clients should be reachable to service over -
- // NodePublicIPs:NodePort
- func (j *TestJig) checkNodePortServiceReachability(svc *v1.Service, pod *v1.Pod) error {
- clusterIP := svc.Spec.ClusterIP
- servicePorts := svc.Spec.Ports
- // Consider only 2 nodes for testing
- nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, 2)
- if err != nil {
- return err
- }
- err = j.WaitForAvailableEndpoint(ServiceEndpointsTimeout)
- if err != nil {
- return err
- }
- for _, servicePort := range servicePorts {
- err = testReachabilityOverServiceName(svc.Name, servicePort, pod)
- if err != nil {
- return err
- }
- err = testReachabilityOverClusterIP(clusterIP, servicePort, pod)
- if err != nil {
- return err
- }
- err = testReachabilityOverNodePorts(nodes, servicePort, pod, clusterIP)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // checkExternalServiceReachability ensures service of type externalName resolves to IP address and no fake externalName is set
- // FQDN of kubernetes is used as externalName(for air tight platforms).
- func (j *TestJig) checkExternalServiceReachability(svc *v1.Service, pod *v1.Pod) error {
- // Service must resolve to IP
- cmd := fmt.Sprintf("nslookup %s", svc.Name)
- _, err := framework.RunHostCmd(pod.Namespace, pod.Name, cmd)
- if err != nil {
- return fmt.Errorf("ExternalName service %q must resolve to IP", pod.Namespace+"/"+pod.Name)
- }
- return nil
- }
- // CheckServiceReachability ensures that request are served by the services. Only supports Services with type ClusterIP, NodePort and ExternalName.
- func (j *TestJig) CheckServiceReachability(svc *v1.Service, pod *v1.Pod) error {
- svcType := svc.Spec.Type
- _, err := j.sanityCheckService(svc, svcType)
- if err != nil {
- return err
- }
- switch svcType {
- case v1.ServiceTypeClusterIP:
- return j.checkClusterIPServiceReachability(svc, pod)
- case v1.ServiceTypeNodePort:
- return j.checkNodePortServiceReachability(svc, pod)
- case v1.ServiceTypeExternalName:
- return j.checkExternalServiceReachability(svc, pod)
- default:
- return fmt.Errorf("unsupported service type \"%s\" to verify service reachability for \"%s\" service. This may due to diverse implementation of the service type", svcType, svc.Name)
- }
- }
- // CreateServicePods creates a replication controller with the label same as service. Service listens to HTTP.
- func (j *TestJig) CreateServicePods(replica int) error {
- config := testutils.RCConfig{
- Client: j.Client,
- Name: j.Name,
- Image: framework.ServeHostnameImage,
- Command: []string{"/agnhost", "serve-hostname"},
- Namespace: j.Namespace,
- Labels: j.Labels,
- PollInterval: 3 * time.Second,
- Timeout: framework.PodReadyBeforeTimeout,
- Replicas: replica,
- }
- return e2erc.RunRC(config)
- }
- // CreateTCPUDPServicePods creates a replication controller with the label same as service. Service listens to TCP and UDP.
- func (j *TestJig) CreateTCPUDPServicePods(replica int) error {
- config := testutils.RCConfig{
- Client: j.Client,
- Name: j.Name,
- Image: framework.ServeHostnameImage,
- Command: []string{"/agnhost", "serve-hostname", "--http=false", "--tcp", "--udp"},
- Namespace: j.Namespace,
- Labels: j.Labels,
- PollInterval: 3 * time.Second,
- Timeout: framework.PodReadyBeforeTimeout,
- Replicas: replica,
- }
- return e2erc.RunRC(config)
- }
|