- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package framework
- import (
- "bytes"
- "fmt"
- "net"
- "sort"
- "strconv"
- "strings"
- "time"
- v1 "k8s.io/api/core/v1"
- policyv1beta1 "k8s.io/api/policy/v1beta1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/util/intstr"
- utilnet "k8s.io/apimachinery/pkg/util/net"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/uuid"
- "k8s.io/apimachinery/pkg/util/wait"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/util/retry"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
- e2elog "k8s.io/kubernetes/test/e2e/framework/log"
- e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
- testutils "k8s.io/kubernetes/test/utils"
- imageutils "k8s.io/kubernetes/test/utils/image"
- "github.com/onsi/ginkgo"
- )
- const (
- // KubeProxyLagTimeout is the maximum time a kube-proxy daemon on a node is allowed
- // to not notice a Service update, such as type=NodePort.
- // TODO: This timeout should be O(10s), observed values are O(1m), 5m is very
- // liberal. Fix tracked in #20567.
- KubeProxyLagTimeout = 5 * time.Minute
- // KubeProxyEndpointLagTimeout is the maximum time a kube-proxy daemon on a node is allowed
- // to not notice an Endpoint update.
- KubeProxyEndpointLagTimeout = 30 * time.Second
- // LoadBalancerLagTimeoutDefault is the maximum time a load balancer is allowed to
- // not respond after creation.
- LoadBalancerLagTimeoutDefault = 2 * time.Minute
- // LoadBalancerLagTimeoutAWS is the delay between ELB creation and serving traffic
- // on AWS. A few minutes is typical, so use 10m.
- LoadBalancerLagTimeoutAWS = 10 * time.Minute
- // LoadBalancerCreateTimeoutDefault is the default time to wait for a load balancer to be created/modified.
- // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
- LoadBalancerCreateTimeoutDefault = 20 * time.Minute
- // LoadBalancerCreateTimeoutLarge is the maximum time to wait for a load balancer to be created/modified.
- LoadBalancerCreateTimeoutLarge = 2 * time.Hour
- // LoadBalancerCleanupTimeout is the time allowed for the load balancer to clean up, proportional to the number of apps/Ingresses.
- // Bring the cleanup timeout back down to 5m once b/33588344 is resolved.
- LoadBalancerCleanupTimeout = 15 * time.Minute
- // LoadBalancerPollTimeout is the maximum time to wait when polling a load balancer.
- // On average it takes ~6 minutes for a single backend to come online in GCE.
- LoadBalancerPollTimeout = 15 * time.Minute
- // LoadBalancerPollInterval is the interval at which a load balancer is polled.
- LoadBalancerPollInterval = 30 * time.Second
- // LargeClusterMinNodesNumber is the minimum number of nodes a cluster must have
- // to be considered large.
- LargeClusterMinNodesNumber = 100
- // MaxNodesForEndpointsTests is the maximum number of nodes to use when testing
- // endpoints. Don't test with more than 3 nodes: many tests create an endpoint
- // per node, which is resource- and time-intensive in large clusters.
- MaxNodesForEndpointsTests = 3
- // ServiceTestTimeout is used for most polling/waiting activities
- ServiceTestTimeout = 60 * time.Second
- // GCPMaxInstancesInInstanceGroup is the maximum number of instances supported in
- // one instance group on GCP.
- GCPMaxInstancesInInstanceGroup = 2000
- // AffinityConfirmCount is the number of consecutive requests to the same backend
- // needed to confirm that session affinity is enabled.
- AffinityConfirmCount = 15
- )
- // ServiceNodePortRange should match whatever the default/configured range is
- var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
- // ServiceTestJig is a test jig to help service testing.
- type ServiceTestJig struct {
- ID string
- Name string
- Client clientset.Interface
- Labels map[string]string
- }
- // PodNode is a pod-node pair indicating which node a given pod is running on
- type PodNode struct {
- // Pod represents pod name
- Pod string
- // Node represents node name
- Node string
- }
- // NewServiceTestJig allocates and inits a new ServiceTestJig.
- func NewServiceTestJig(client clientset.Interface, name string) *ServiceTestJig {
- j := &ServiceTestJig{}
- j.Client = client
- j.Name = name
- j.ID = j.Name + "-" + string(uuid.NewUUID())
- j.Labels = map[string]string{"testid": j.ID}
- return j
- }
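- // exampleJigUsage is an illustrative sketch (assumed name), not part of the
- // original framework API: it shows the typical jig flow of creating a Service,
- // backing it with a pod, and sanity-checking the result.
- func exampleJigUsage(cs clientset.Interface, ns string) {
- jig := NewServiceTestJig(cs, "example-svc")
- // The tweak callback mutates the Service template before creation.
- svc := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeNodePort
- })
- // Start a pod matching the jig's labels so the Service has endpoints.
- jig.RunOrFail(ns, nil)
- jig.SanityCheckService(svc, v1.ServiceTypeNodePort)
- }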
- // newServiceTemplate returns the default v1.Service template for this jig, but
- // does not actually create the Service. The default Service has the same name
- // as the jig and exposes the given port.
- func (j *ServiceTestJig) newServiceTemplate(namespace string, proto v1.Protocol, port int32) *v1.Service {
- service := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: v1.ServiceSpec{
- Selector: j.Labels,
- Ports: []v1.ServicePort{
- {
- Protocol: proto,
- Port: port,
- },
- },
- },
- }
- return service
- }
- // CreateTCPServiceWithPort creates a new TCP Service with given port based on the
- // jig's defaults. Callers can provide a function to tweak the Service object before
- // it is created.
- func (j *ServiceTestJig) CreateTCPServiceWithPort(namespace string, tweak func(svc *v1.Service), port int32) *v1.Service {
- svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, port)
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.CoreV1().Services(namespace).Create(svc)
- if err != nil {
- Failf("Failed to create TCP Service %q: %v", svc.Name, err)
- }
- return result
- }
- // CreateTCPServiceOrFail creates a new TCP Service based on the jig's
- // defaults. Callers can provide a function to tweak the Service object before
- // it is created.
- func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
- svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, 80)
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.CoreV1().Services(namespace).Create(svc)
- if err != nil {
- Failf("Failed to create TCP Service %q: %v", svc.Name, err)
- }
- return result
- }
- // CreateUDPServiceOrFail creates a new UDP Service based on the jig's
- // defaults. Callers can provide a function to tweak the Service object before
- // it is created.
- func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
- svc := j.newServiceTemplate(namespace, v1.ProtocolUDP, 80)
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.CoreV1().Services(namespace).Create(svc)
- if err != nil {
- Failf("Failed to create UDP Service %q: %v", svc.Name, err)
- }
- return result
- }
- // CreateExternalNameServiceOrFail creates a new ExternalName type Service based on the jig's defaults.
- // Callers can provide a function to tweak the Service object before it is created.
- func (j *ServiceTestJig) CreateExternalNameServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
- svc := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: v1.ServiceSpec{
- Selector: j.Labels,
- ExternalName: "foo.example.com",
- Type: v1.ServiceTypeExternalName,
- },
- }
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.CoreV1().Services(namespace).Create(svc)
- if err != nil {
- Failf("Failed to create ExternalName Service %q: %v", svc.Name, err)
- }
- return result
- }
- // CreateServiceWithServicePort creates a new Service with ServicePort.
- func (j *ServiceTestJig) CreateServiceWithServicePort(labels map[string]string, namespace string, ports []v1.ServicePort) (*v1.Service, error) {
- service := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: j.Name,
- },
- Spec: v1.ServiceSpec{
- Selector: labels,
- Ports: ports,
- },
- }
- return j.Client.CoreV1().Services(namespace).Create(service)
- }
- // ChangeServiceType updates the given service's ServiceType to the given newType.
- func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType v1.ServiceType, timeout time.Duration) {
- ingressIP := ""
- svc := j.UpdateServiceOrFail(namespace, name, func(s *v1.Service) {
- for _, ing := range s.Status.LoadBalancer.Ingress {
- if ing.IP != "" {
- ingressIP = ing.IP
- }
- }
- s.Spec.Type = newType
- s.Spec.Ports[0].NodePort = 0
- })
- if ingressIP != "" {
- j.WaitForLoadBalancerDestroyOrFail(namespace, svc.Name, ingressIP, int(svc.Spec.Ports[0].Port), timeout)
- }
- }
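- // exampleChangeServiceType is an illustrative sketch (assumed name), not part
- // of the original file: downgrading a LoadBalancer Service to ClusterIP.
- // ChangeServiceType clears the node port and, because the Service previously
- // held an ingress IP, waits for the cloud load balancer to be torn down.
- func exampleChangeServiceType(jig *ServiceTestJig, ns, name string) {
- jig.ChangeServiceType(ns, name, v1.ServiceTypeClusterIP, LoadBalancerCleanupTimeout)
- }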
- // CreateOnlyLocalNodePortService creates a NodePort service with
- // ExternalTrafficPolicy set to Local and sanity checks its nodePort.
- // If createPod is true, it also creates an RC with 1 replica of
- // the standard netexec container used everywhere in this test.
- func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *v1.Service {
- ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and ExternalTrafficPolicy=Local")
- svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeNodePort
- svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
- svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 80}}
- })
- if createPod {
- ginkgo.By("creating a pod to be part of the service " + serviceName)
- j.RunOrFail(namespace, nil)
- }
- j.SanityCheckService(svc, v1.ServiceTypeNodePort)
- return svc
- }
- // CreateOnlyLocalLoadBalancerService creates a loadbalancer service with
- // ExternalTrafficPolicy set to Local and waits for it to acquire an ingress IP.
- // If createPod is true, it also creates an RC with 1 replica of
- // the standard netexec container used everywhere in this test.
- func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool,
- tweak func(svc *v1.Service)) *v1.Service {
- ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and ExternalTrafficPolicy=Local")
- svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeLoadBalancer
- // We need to turn affinity off for our LB distribution tests
- svc.Spec.SessionAffinity = v1.ServiceAffinityNone
- svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
- if tweak != nil {
- tweak(svc)
- }
- })
- if createPod {
- ginkgo.By("creating a pod to be part of the service " + serviceName)
- j.RunOrFail(namespace, nil)
- }
- ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
- svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
- j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
- return svc
- }
- // CreateLoadBalancerService creates a loadbalancer service and waits
- // for it to acquire an ingress IP.
- func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string, timeout time.Duration, tweak func(svc *v1.Service)) *v1.Service {
- ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer")
- svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeLoadBalancer
- // We need to turn affinity off for our LB distribution tests
- svc.Spec.SessionAffinity = v1.ServiceAffinityNone
- if tweak != nil {
- tweak(svc)
- }
- })
- ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
- svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
- j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
- return svc
- }
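- // exampleCreateLoadBalancer is an illustrative sketch (assumed name): create a
- // LoadBalancer Service with a provider-appropriate timeout, then probe the
- // resulting ingress point. Backing pods are assumed to exist already (e.g. via
- // RunOrFail); the lag timeout accounts for the window between LB creation and
- // the LB actually serving traffic.
- func exampleCreateLoadBalancer(jig *ServiceTestJig, cs clientset.Interface, ns string) {
- timeout := GetServiceLoadBalancerCreationTimeout(cs)
- svc := jig.CreateLoadBalancerService(ns, jig.Name, timeout, nil)
- ingress := GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
- jig.TestReachableHTTP(ingress, int(svc.Spec.Ports[0].Port), LoadBalancerLagTimeoutDefault)
- }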
- // GetNodeAddresses returns a list of addresses of the given addressType for the given node
- func GetNodeAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) {
- for j := range node.Status.Addresses {
- nodeAddress := &node.Status.Addresses[j]
- if nodeAddress.Type == addressType && nodeAddress.Address != "" {
- ips = append(ips, nodeAddress.Address)
- }
- }
- return
- }
- // CollectAddresses returns a list of addresses of the given addressType for the given list of nodes
- func CollectAddresses(nodes *v1.NodeList, addressType v1.NodeAddressType) []string {
- ips := []string{}
- for i := range nodes.Items {
- ips = append(ips, GetNodeAddresses(&nodes.Items[i], addressType)...)
- }
- return ips
- }
- // GetNodePublicIps returns a public IP list of nodes.
- func GetNodePublicIps(c clientset.Interface) ([]string, error) {
- nodes := GetReadySchedulableNodesOrDie(c)
- ips := CollectAddresses(nodes, v1.NodeExternalIP)
- if len(ips) == 0 {
- // If ExternalIP isn't set, assume the test programs can reach the InternalIP
- ips = CollectAddresses(nodes, v1.NodeInternalIP)
- }
- return ips, nil
- }
- // PickNodeIP picks one public node IP
- func PickNodeIP(c clientset.Interface) string {
- publicIps, err := GetNodePublicIps(c)
- ExpectNoError(err)
- if len(publicIps) == 0 {
- Failf("got unexpected number (%d) of public IPs", len(publicIps))
- }
- ip := publicIps[0]
- return ip
- }
- // PodNodePairs returns PodNode pairs for all pods in a namespace
- func PodNodePairs(c clientset.Interface, ns string) ([]PodNode, error) {
- var result []PodNode
- podList, err := c.CoreV1().Pods(ns).List(metav1.ListOptions{})
- if err != nil {
- return result, err
- }
- for _, pod := range podList.Items {
- result = append(result, PodNode{
- Pod: pod.Name,
- Node: pod.Spec.NodeName,
- })
- }
- return result, nil
- }
- // GetEndpointNodes returns a map of node name to external IP addresses for
- // the nodes on which the endpoints of the given Service are running.
- func (j *ServiceTestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
- nodes := j.GetNodes(MaxNodesForEndpointsTests)
- endpoints, err := j.Client.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
- if err != nil {
- Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
- }
- if len(endpoints.Subsets) == 0 {
- Failf("Endpoint has no subsets, cannot determine node addresses.")
- }
- epNodes := sets.NewString()
- for _, ss := range endpoints.Subsets {
- for _, e := range ss.Addresses {
- if e.NodeName != nil {
- epNodes.Insert(*e.NodeName)
- }
- }
- }
- nodeMap := map[string][]string{}
- for _, n := range nodes.Items {
- if epNodes.Has(n.Name) {
- nodeMap[n.Name] = GetNodeAddresses(&n, v1.NodeExternalIP)
- }
- }
- return nodeMap
- }
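- // exampleProbeEndpointNodes is an illustrative sketch (assumed name): use
- // GetEndpointNodes to hit only the nodes that actually host endpoints of a
- // NodePort Service, e.g. when verifying ExternalTrafficPolicy=Local behavior.
- func exampleProbeEndpointNodes(jig *ServiceTestJig, svc *v1.Service) {
- nodePort := int(svc.Spec.Ports[0].NodePort)
- for nodeName, ips := range jig.GetEndpointNodes(svc) {
- if len(ips) == 0 {
- Failf("node %s has no external IPs", nodeName)
- }
- jig.TestReachableHTTP(ips[0], nodePort, KubeProxyLagTimeout)
- }
- }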
- // GetNodes returns the first maxNodesForTest nodes. Useful in large clusters
- // where we don't want to, e.g., create an endpoint per node.
- func (j *ServiceTestJig) GetNodes(maxNodesForTest int) (nodes *v1.NodeList) {
- nodes = GetReadySchedulableNodesOrDie(j.Client)
- if len(nodes.Items) <= maxNodesForTest {
- maxNodesForTest = len(nodes.Items)
- }
- nodes.Items = nodes.Items[:maxNodesForTest]
- return nodes
- }
- // GetNodesNames returns a list of names of the first maxNodesForTest nodes
- func (j *ServiceTestJig) GetNodesNames(maxNodesForTest int) []string {
- nodes := j.GetNodes(maxNodesForTest)
- nodesNames := []string{}
- for _, node := range nodes.Items {
- nodesNames = append(nodesNames, node.Name)
- }
- return nodesNames
- }
- // WaitForEndpointOnNode waits for a service endpoint on the given node.
- func (j *ServiceTestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string) {
- err := wait.PollImmediate(Poll, LoadBalancerCreateTimeoutDefault, func() (bool, error) {
- endpoints, err := j.Client.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
- if err != nil {
- e2elog.Logf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
- return false, nil
- }
- if len(endpoints.Subsets) == 0 {
- e2elog.Logf("Expect endpoints with subsets, got none.")
- return false, nil
- }
- // TODO: Handle multiple endpoints
- if len(endpoints.Subsets[0].Addresses) == 0 {
- e2elog.Logf("Expected Ready endpoints - found none")
- return false, nil
- }
- epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
- e2elog.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, epHostName)
- if epHostName != nodeName {
- e2elog.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
- return false, nil
- }
- return true, nil
- })
- ExpectNoError(err)
- }
- // SanityCheckService performs sanity checks on the given service
- func (j *ServiceTestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) {
- if svc.Spec.Type != svcType {
- Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
- }
- if svcType != v1.ServiceTypeExternalName {
- if svc.Spec.ExternalName != "" {
- Failf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
- }
- if svc.Spec.ClusterIP != api.ClusterIPNone && svc.Spec.ClusterIP == "" {
- Failf("didn't get ClusterIP for non-ExternamName service")
- }
- } else {
- if svc.Spec.ClusterIP != "" {
- Failf("unexpected Spec.ClusterIP (%s) for ExternamName service, expected empty", svc.Spec.ClusterIP)
- }
- }
- expectNodePorts := false
- if svcType != v1.ServiceTypeClusterIP && svcType != v1.ServiceTypeExternalName {
- expectNodePorts = true
- }
- for i, port := range svc.Spec.Ports {
- hasNodePort := (port.NodePort != 0)
- if hasNodePort != expectNodePorts {
- Failf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
- }
- if hasNodePort {
- if !ServiceNodePortRange.Contains(int(port.NodePort)) {
- Failf("out-of-range nodePort (%d) for service", port.NodePort)
- }
- }
- }
- expectIngress := false
- if svcType == v1.ServiceTypeLoadBalancer {
- expectIngress = true
- }
- hasIngress := len(svc.Status.LoadBalancer.Ingress) != 0
- if hasIngress != expectIngress {
- Failf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress))
- }
- if hasIngress {
- for i, ing := range svc.Status.LoadBalancer.Ingress {
- if ing.IP == "" && ing.Hostname == "" {
- Failf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
- }
- }
- }
- }
- // UpdateService fetches a service, calls the update function on it, and
- // then attempts to send the updated service. It tries up to 3 times in the
- // face of timeouts and conflicts.
- func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*v1.Service)) (*v1.Service, error) {
- for i := 0; i < 3; i++ {
- service, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to get Service %q: %v", name, err)
- }
- update(service)
- service, err = j.Client.CoreV1().Services(namespace).Update(service)
- if err == nil {
- return service, nil
- }
- if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
- return nil, fmt.Errorf("failed to update Service %q: %v", name, err)
- }
- }
- return nil, fmt.Errorf("too many retries updating Service %q", name)
- }
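- // exampleToggleAffinity is an illustrative sketch (assumed name): use
- // UpdateService to flip session affinity. The get-mutate-update loop above
- // absorbs conflicts from concurrent writers, so the callback must be safe to
- // re-run against a freshly fetched object.
- func exampleToggleAffinity(jig *ServiceTestJig, ns, name string) (*v1.Service, error) {
- return jig.UpdateService(ns, name, func(s *v1.Service) {
- s.Spec.SessionAffinity = v1.ServiceAffinityClientIP
- })
- }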
- // UpdateServiceOrFail fetches a service, calls the update function on it, and
- // then attempts to send the updated service. It tries up to 3 times in the
- // face of timeouts and conflicts.
- func (j *ServiceTestJig) UpdateServiceOrFail(namespace, name string, update func(*v1.Service)) *v1.Service {
- svc, err := j.UpdateService(namespace, name, update)
- if err != nil {
- Failf(err.Error())
- }
- return svc
- }
- // WaitForNewIngressIPOrFail waits for the given service to get a new ingress IP, or fails after the given timeout
- func (j *ServiceTestJig) WaitForNewIngressIPOrFail(namespace, name, existingIP string, timeout time.Duration) *v1.Service {
- e2elog.Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, name)
- service := j.waitForConditionOrFail(namespace, name, timeout, "have a new ingress IP", func(svc *v1.Service) bool {
- if len(svc.Status.LoadBalancer.Ingress) == 0 {
- return false
- }
- ip := svc.Status.LoadBalancer.Ingress[0].IP
- if ip == "" || ip == existingIP {
- return false
- }
- return true
- })
- return service
- }
- // ChangeServiceNodePortOrFail changes the node port of the given service to an available port, or fails.
- func (j *ServiceTestJig) ChangeServiceNodePortOrFail(namespace, name string, initial int) *v1.Service {
- var err error
- var service *v1.Service
- for i := 1; i < ServiceNodePortRange.Size; i++ {
- offs1 := initial - ServiceNodePortRange.Base
- offs2 := (offs1 + i) % ServiceNodePortRange.Size
- newPort := ServiceNodePortRange.Base + offs2
- service, err = j.UpdateService(namespace, name, func(s *v1.Service) {
- s.Spec.Ports[0].NodePort = int32(newPort)
- })
- if err != nil && strings.Contains(err.Error(), portallocator.ErrAllocated.Error()) {
- e2elog.Logf("tried nodePort %d, but it is in use, will try another", newPort)
- continue
- }
- // Otherwise err was nil or err was a real error
- break
- }
- if err != nil {
- Failf("Could not change the nodePort: %v", err)
- }
- return service
- }
- // WaitForLoadBalancerOrFail waits for the given service to have a LoadBalancer, or fails after the given timeout
- func (j *ServiceTestJig) WaitForLoadBalancerOrFail(namespace, name string, timeout time.Duration) *v1.Service {
- e2elog.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, name)
- service := j.waitForConditionOrFail(namespace, name, timeout, "have a load balancer", func(svc *v1.Service) bool {
- return len(svc.Status.LoadBalancer.Ingress) > 0
- })
- return service
- }
- // WaitForLoadBalancerDestroyOrFail waits for the given service's LoadBalancer to be destroyed, or fails after the given timeout
- func (j *ServiceTestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip string, port int, timeout time.Duration) *v1.Service {
- // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
- defer func() {
- if err := EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil {
- e2elog.Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
- }
- }()
- e2elog.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, name)
- service := j.waitForConditionOrFail(namespace, name, timeout, "have no load balancer", func(svc *v1.Service) bool {
- return len(svc.Status.LoadBalancer.Ingress) == 0
- })
- return service
- }
- func (j *ServiceTestJig) waitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1.Service) bool) *v1.Service {
- var service *v1.Service
- pollFunc := func() (bool, error) {
- svc, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- if conditionFn(svc) {
- service = svc
- return true, nil
- }
- return false, nil
- }
- if err := wait.PollImmediate(Poll, timeout, pollFunc); err != nil {
- Failf("Timed out waiting for service %q to %s", name, message)
- }
- return service
- }
- // newRCTemplate returns the default v1.ReplicationController object for
- // this jig, but does not actually create the RC. The default RC has the same
- // name as the jig and runs the "netexec" container.
- func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationController {
- var replicas int32 = 1
- var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
- rc := &v1.ReplicationController{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: v1.ReplicationControllerSpec{
- Replicas: &replicas,
- Selector: j.Labels,
- Template: &v1.PodTemplateSpec{
- ObjectMeta: metav1.ObjectMeta{
- Labels: j.Labels,
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "netexec",
- Image: imageutils.GetE2EImage(imageutils.Netexec),
- Args: []string{"--http-port=80", "--udp-port=80"},
- ReadinessProbe: &v1.Probe{
- PeriodSeconds: 3,
- Handler: v1.Handler{
- HTTPGet: &v1.HTTPGetAction{
- Port: intstr.FromInt(80),
- Path: "/hostName",
- },
- },
- },
- },
- },
- TerminationGracePeriodSeconds: &grace,
- },
- },
- },
- }
- return rc
- }
- // AddRCAntiAffinity adds AntiAffinity to the given ReplicationController.
- func (j *ServiceTestJig) AddRCAntiAffinity(rc *v1.ReplicationController) {
- var replicas int32 = 2
- rc.Spec.Replicas = &replicas
- if rc.Spec.Template.Spec.Affinity == nil {
- rc.Spec.Template.Spec.Affinity = &v1.Affinity{}
- }
- if rc.Spec.Template.Spec.Affinity.PodAntiAffinity == nil {
- rc.Spec.Template.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{}
- }
- rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
- rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
- v1.PodAffinityTerm{
- LabelSelector: &metav1.LabelSelector{MatchLabels: j.Labels},
- Namespaces: nil,
- TopologyKey: "kubernetes.io/hostname",
- })
- }
- // CreatePDBOrFail creates a PodDisruptionBudget for the given ReplicationController and waits for it to be ready, or fails
- func (j *ServiceTestJig) CreatePDBOrFail(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
- pdb := j.newPDBTemplate(namespace, rc)
- newPdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Create(pdb)
- if err != nil {
- Failf("Failed to create PDB %q %v", pdb.Name, err)
- }
- if err := j.waitForPdbReady(namespace); err != nil {
- Failf("Failed waiting for PDB to be ready: %v", err)
- }
- return newPdb
- }
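- // exampleRunWithPDB is an illustrative sketch (assumed name): spread the RC's
- // replicas across nodes and protect them with a PodDisruptionBudget. Note that
- // AddRCAntiAffinity has the same signature as RunOrFail's tweak parameter, so
- // the method value can be passed directly.
- func exampleRunWithPDB(jig *ServiceTestJig, ns string) *policyv1beta1.PodDisruptionBudget {
- rc := jig.RunOrFail(ns, jig.AddRCAntiAffinity)
- return jig.CreatePDBOrFail(ns, rc)
- }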
- // newPDBTemplate returns the default policyv1beta1.PodDisruptionBudget object for
- // this jig, but does not actually create the PDB. The default PDB specifies a
- // MinAvailable of N-1 and matches the pods created by the RC.
- func (j *ServiceTestJig) newPDBTemplate(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
- minAvailable := intstr.FromInt(int(*rc.Spec.Replicas) - 1)
- pdb := &policyv1beta1.PodDisruptionBudget{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: policyv1beta1.PodDisruptionBudgetSpec{
- MinAvailable: &minAvailable,
- Selector: &metav1.LabelSelector{MatchLabels: j.Labels},
- },
- }
- return pdb
- }
- // RunOrFail creates a ReplicationController and Pod(s) and waits for the
- // Pod(s) to be running. Callers can provide a function to tweak the RC object
- // before it is created.
- func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.ReplicationController)) *v1.ReplicationController {
- rc := j.newRCTemplate(namespace)
- if tweak != nil {
- tweak(rc)
- }
- result, err := j.Client.CoreV1().ReplicationControllers(namespace).Create(rc)
- if err != nil {
- Failf("Failed to create RC %q: %v", rc.Name, err)
- }
- pods, err := j.waitForPodsCreated(namespace, int(*(rc.Spec.Replicas)))
- if err != nil {
- Failf("Failed to create pods: %v", err)
- }
- if err := j.waitForPodsReady(namespace, pods); err != nil {
- Failf("Failed waiting for pods to be running: %v", err)
- }
- return result
- }
- // Scale scales the RC to the given number of replicas and waits for the pods to be ready
- func (j *ServiceTestJig) Scale(namespace string, replicas int) {
- rc := j.Name
- scale, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rc, metav1.GetOptions{})
- if err != nil {
- Failf("Failed to get scale for RC %q: %v", rc, err)
- }
- scale.Spec.Replicas = int32(replicas)
- _, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rc, scale)
- if err != nil {
- Failf("Failed to scale RC %q: %v", rc, err)
- }
- pods, err := j.waitForPodsCreated(namespace, replicas)
- if err != nil {
- Failf("Failed waiting for pods: %v", err)
- }
- if err := j.waitForPodsReady(namespace, pods); err != nil {
- Failf("Failed waiting for pods to be running: %v", err)
- }
- }
- func (j *ServiceTestJig) waitForPdbReady(namespace string) error {
- timeout := 2 * time.Minute
- for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
- pdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Get(j.Name, metav1.GetOptions{})
- if err != nil {
- return err
- }
- if pdb.Status.PodDisruptionsAllowed > 0 {
- return nil
- }
- }
- return fmt.Errorf("timeout waiting for PDB %q to be ready", j.Name)
- }
- func (j *ServiceTestJig) waitForPodsCreated(namespace string, replicas int) ([]string, error) {
- timeout := 2 * time.Minute
- // List the pods, making sure we observe all the replicas.
- label := labels.SelectorFromSet(labels.Set(j.Labels))
- e2elog.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
- for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
- options := metav1.ListOptions{LabelSelector: label.String()}
- pods, err := j.Client.CoreV1().Pods(namespace).List(options)
- if err != nil {
- return nil, err
- }
- found := []string{}
- for _, pod := range pods.Items {
- if pod.DeletionTimestamp != nil {
- continue
- }
- found = append(found, pod.Name)
- }
- if len(found) == replicas {
- e2elog.Logf("Found all %d pods", replicas)
- return found, nil
- }
- e2elog.Logf("Found %d/%d pods - will retry", len(found), replicas)
- }
- return nil, fmt.Errorf("timeout waiting for %d pods to be created", replicas)
- }
- func (j *ServiceTestJig) waitForPodsReady(namespace string, pods []string) error {
- timeout := 2 * time.Minute
- if !CheckPodsRunningReady(j.Client, namespace, pods, timeout) {
- return fmt.Errorf("timeout waiting for %d pods to be ready", len(pods))
- }
- return nil
- }
- // newNetexecPodSpec returns the pod spec of the netexec pod
- func newNetexecPodSpec(podName string, httpPort, udpPort int32, hostNetwork bool) *v1.Pod {
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: podName,
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "netexec",
- Image: netexecImageName,
- Command: []string{
- "/netexec",
- fmt.Sprintf("--http-port=%d", httpPort),
- fmt.Sprintf("--udp-port=%d", udpPort),
- },
- Ports: []v1.ContainerPort{
- {
- Name: "http",
- ContainerPort: httpPort,
- },
- {
- Name: "udp",
- ContainerPort: udpPort,
- },
- },
- },
- },
- HostNetwork: hostNetwork,
- },
- }
- return pod
- }
- // LaunchNetexecPodOnNode launches a netexec pod on the given node.
- func (j *ServiceTestJig) LaunchNetexecPodOnNode(f *Framework, nodeName, podName string, httpPort, udpPort int32, hostNetwork bool) {
- e2elog.Logf("Creating netexec pod %q on node %v in namespace %q", podName, nodeName, f.Namespace.Name)
- pod := newNetexecPodSpec(podName, httpPort, udpPort, hostNetwork)
- pod.Spec.NodeName = nodeName
- pod.ObjectMeta.Labels = j.Labels
- podClient := f.ClientSet.CoreV1().Pods(f.Namespace.Name)
- _, err := podClient.Create(pod)
- ExpectNoError(err)
- ExpectNoError(f.WaitForPodRunning(podName))
- e2elog.Logf("Netexec pod %q in namespace %q running", pod.Name, f.Namespace.Name)
- }
- // newEchoServerPodSpec returns the pod spec of the echo server pod
- func newEchoServerPodSpec(podName string) *v1.Pod {
- port := 8080
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: podName,
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "echoserver",
- Image: imageutils.GetE2EImage(imageutils.EchoServer),
- Ports: []v1.ContainerPort{{ContainerPort: int32(port)}},
- },
- },
- RestartPolicy: v1.RestartPolicyNever,
- },
- }
- return pod
- }
- // LaunchEchoserverPodOnNode launches a pod serving HTTP on port 8080 to act
- // as the target for the source IP preservation test. The client's source IP is
- // echoed back by the web server.
- func (j *ServiceTestJig) LaunchEchoserverPodOnNode(f *Framework, nodeName, podName string) {
- e2elog.Logf("Creating echo server pod %q in namespace %q", podName, f.Namespace.Name)
- pod := newEchoServerPodSpec(podName)
- pod.Spec.NodeName = nodeName
- pod.ObjectMeta.Labels = j.Labels
- podClient := f.ClientSet.CoreV1().Pods(f.Namespace.Name)
- _, err := podClient.Create(pod)
- ExpectNoError(err)
- ExpectNoError(f.WaitForPodRunning(podName))
- e2elog.Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name)
- }
- // TestReachableHTTP tests that the given host serves HTTP on the given port.
- func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.Duration) {
- j.TestReachableHTTPWithRetriableErrorCodes(host, port, []int{}, timeout)
- }
- // TestReachableHTTPWithRetriableErrorCodes tests that the given host serves HTTP on the given port with the given retriableErrCodes.
- func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableErrCodes []int, timeout time.Duration) {
- pollfn := func() (bool, error) {
- result := PokeHTTP(host, port, "/echo?msg=hello",
- &HTTPPokeParams{
- BodyContains: "hello",
- RetriableCodes: retriableErrCodes,
- })
- if result.Status == HTTPSuccess {
- return true, nil
- }
- return false, nil // caller can retry
- }
- if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
- if err == wait.ErrWaitTimeout {
- Failf("Could not reach HTTP service through %v:%v after %v", host, port, timeout)
- } else {
- Failf("Failed to reach HTTP service through %v:%v: %v", host, port, err)
- }
- }
- }
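- // exampleProbeNodePort is an illustrative sketch (assumed name): probe a
- // NodePort Service through the external IP of every test node, treating 503s
- // as retriable while endpoints are still coming up.
- func exampleProbeNodePort(jig *ServiceTestJig, svc *v1.Service) {
- port := int(svc.Spec.Ports[0].NodePort)
- nodes := jig.GetNodes(MaxNodesForEndpointsTests)
- for _, ip := range CollectAddresses(nodes, v1.NodeExternalIP) {
- jig.TestReachableHTTPWithRetriableErrorCodes(ip, port, []int{503}, KubeProxyLagTimeout)
- }
- }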
- // TestNotReachableHTTP tests that an HTTP request doesn't connect to the given host and port.
- func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) {
- pollfn := func() (bool, error) {
- result := PokeHTTP(host, port, "/", nil)
- if result.Code == 0 {
- return true, nil
- }
- return false, nil // caller can retry
- }
- if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
- Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err)
- }
- }
- // TestRejectedHTTP tests that the given host rejects an HTTP request on the given port.
- func (j *ServiceTestJig) TestRejectedHTTP(host string, port int, timeout time.Duration) {
- pollfn := func() (bool, error) {
- result := PokeHTTP(host, port, "/", nil)
- if result.Status == HTTPRefused {
- return true, nil
- }
- return false, nil // caller can retry
- }
- if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
- Failf("HTTP service %v:%v not rejected: %v", host, port, err)
- }
- }
- // TestReachableUDP tests that the given host serves UDP on the given port.
- func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
- pollfn := func() (bool, error) {
- result := PokeUDP(host, port, "echo hello", &UDPPokeParams{
- Timeout: 3 * time.Second,
- Response: "hello",
- })
- if result.Status == UDPSuccess {
- return true, nil
- }
- return false, nil // caller can retry
- }
- if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
- Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
- }
- }
- // TestNotReachableUDP tests that the given host doesn't serve UDP on the given port.
- func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) {
- pollfn := func() (bool, error) {
- result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
- if result.Status != UDPSuccess && result.Status != UDPError {
- return true, nil
- }
- return false, nil // caller can retry
- }
- if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
- Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err)
- }
- }
- // TestRejectedUDP tests that the given host rejects a UDP request on the given port.
- func (j *ServiceTestJig) TestRejectedUDP(host string, port int, timeout time.Duration) {
- pollfn := func() (bool, error) {
- result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
- if result.Status == UDPRefused {
- return true, nil
- }
- return false, nil // caller can retry
- }
- if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
- Failf("UDP service %v:%v not rejected: %v", host, port, err)
- }
- }
- // GetHTTPContent returns the content of the given URL over HTTP.
- func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
- var body bytes.Buffer
- if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
- result := PokeHTTP(host, port, url, nil)
- if result.Status == HTTPSuccess {
- body.Write(result.Body)
- return true, nil
- }
- return false, nil
- }); pollErr != nil {
- Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
- }
- return body
- }
- func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
- ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
- url := fmt.Sprintf("http://%s%s", ipPort, request)
- if ip == "" || port == 0 {
- Failf("Got empty IP for reachability check (%s)", url)
- return false, fmt.Errorf("invalid input ip or port")
- }
- e2elog.Logf("Testing HTTP health check on %v", url)
- resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second)
- if err != nil {
- e2elog.Logf("Got error testing for reachability of %s: %v", url, err)
- return false, err
- }
- defer resp.Body.Close()
- // HealthCheck responder returns 503 for no local endpoints
- if resp.StatusCode == 503 {
- return false, nil
- }
- // HealthCheck responder returns 200 for non-zero local endpoints
- if resp.StatusCode == 200 {
- return true, nil
- }
- return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
- }
- // TestHTTPHealthCheckNodePort tests an HTTP connection via the given request path to the given host and port, counting matching responses until the threshold is met.
- func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error {
- count := 0
- condition := func() (bool, error) {
- success, _ := testHTTPHealthCheckNodePort(host, port, request)
- if success && expectSucceed ||
- !success && !expectSucceed {
- count++
- }
- if count >= threshold {
- return true, nil
- }
- return false, nil
- }
- if err := wait.PollImmediate(time.Second, timeout, condition); err != nil {
- return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v%v, got %d", threshold, expectSucceed, host, port, count)
- }
- return nil
- }
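- // exampleCheckHealthCheckNodePort is an illustrative sketch (assumed name and
- // assumed "/healthz" path): for an ExternalTrafficPolicy=Local Service, the
- // health check node port should answer 200 only on nodes that host an
- // endpoint, so expectSucceed is true for an endpoint node and false otherwise.
- func exampleCheckHealthCheckNodePort(jig *ServiceTestJig, svc *v1.Service, epNodeIP, otherNodeIP string) {
- hcPort := int(svc.Spec.HealthCheckNodePort)
- ExpectNoError(jig.TestHTTPHealthCheckNodePort(epNodeIP, hcPort, "/healthz", KubeProxyEndpointLagTimeout, true, 2))
- ExpectNoError(jig.TestHTTPHealthCheckNodePort(otherNodeIP, hcPort, "/healthz", KubeProxyEndpointLagTimeout, false, 2))
- }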
- // ServiceTestFixture is a simple helper to reduce boilerplate in tests
- type ServiceTestFixture struct {
- ServiceName string
- Namespace string
- Client clientset.Interface
- TestID string
- Labels map[string]string
- rcs map[string]bool
- services map[string]bool
- Name string
- Image string
- }
- // NewServerTest creates a new ServiceTestFixture for the tests.
- func NewServerTest(client clientset.Interface, namespace string, serviceName string) *ServiceTestFixture {
- t := &ServiceTestFixture{}
- t.Client = client
- t.Namespace = namespace
- t.ServiceName = serviceName
- t.TestID = t.ServiceName + "-" + string(uuid.NewUUID())
- t.Labels = map[string]string{
- "testid": t.TestID,
- }
- t.rcs = make(map[string]bool)
- t.services = make(map[string]bool)
- t.Name = "webserver"
- t.Image = imageutils.GetE2EImage(imageutils.TestWebserver)
- return t
- }
- // BuildServiceSpec builds default config for a service (which can then be changed)
- func (t *ServiceTestFixture) BuildServiceSpec() *v1.Service {
- service := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: t.ServiceName,
- Namespace: t.Namespace,
- },
- Spec: v1.ServiceSpec{
- Selector: t.Labels,
- Ports: []v1.ServicePort{{
- Port: 80,
- TargetPort: intstr.FromInt(80),
- }},
- },
- }
- return service
- }
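- // exampleFixtureFlow is an illustrative sketch (assumed name): the fixture
- // mirrors the jig but records created objects so Cleanup can tear them down
- // even if the test fails midway.
- func exampleFixtureFlow(cs clientset.Interface, ns string) {
- t := NewServerTest(cs, ns, "example-svc")
- defer t.Cleanup()
- svc := t.BuildServiceSpec()
- svc.Spec.Type = v1.ServiceTypeNodePort
- if _, err := t.CreateService(svc); err != nil {
- Failf("Failed to create service: %v", err)
- }
- }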
- // CreateRC creates a replication controller and records it for cleanup.
- func (t *ServiceTestFixture) CreateRC(rc *v1.ReplicationController) (*v1.ReplicationController, error) {
- rc, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Create(rc)
- if err == nil {
- t.rcs[rc.Name] = true
- }
- return rc, err
- }
- // CreateService creates a service, and records it for cleanup
- func (t *ServiceTestFixture) CreateService(service *v1.Service) (*v1.Service, error) {
- result, err := t.Client.CoreV1().Services(t.Namespace).Create(service)
- if err == nil {
- t.services[service.Name] = true
- }
- return result, err
- }
- // DeleteService deletes a service, and removes it from the cleanup list
- func (t *ServiceTestFixture) DeleteService(serviceName string) error {
- err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil)
- if err == nil {
- delete(t.services, serviceName)
- }
- return err
- }
- // Cleanup cleans all ReplicationControllers and Services which this object holds.
- func (t *ServiceTestFixture) Cleanup() []error {
- var errs []error
- for rcName := range t.rcs {
- ginkgo.By("stopping RC " + rcName + " in namespace " + t.Namespace)
- err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
- // First, resize the RC to 0.
- old, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Get(rcName, metav1.GetOptions{})
- if err != nil {
- if errors.IsNotFound(err) {
- return nil
- }
- return err
- }
- x := int32(0)
- old.Spec.Replicas = &x
- if _, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Update(old); err != nil {
- if errors.IsNotFound(err) {
- return nil
- }
- return err
- }
- return nil
- })
- if err != nil {
- errs = append(errs, err)
- }
- // TODO(mikedanese): Wait.
- // Then, delete the RC altogether.
- if err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Delete(rcName, nil); err != nil {
- if !errors.IsNotFound(err) {
- errs = append(errs, err)
- }
- }
- }
- for serviceName := range t.services {
- ginkgo.By("deleting service " + serviceName + " in namespace " + t.Namespace)
- err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil)
- if err != nil {
- if !errors.IsNotFound(err) {
- errs = append(errs, err)
- }
- }
- }
- return errs
- }
- // GetIngressPoint returns the host (IP or hostname) on which the ingress serves.
- func GetIngressPoint(ing *v1.LoadBalancerIngress) string {
- host := ing.IP
- if host == "" {
- host = ing.Hostname
- }
- return host
- }
- // UpdateService fetches a service, calls the update function on it,
- // and then attempts to send the updated service. It retries up to 2
- // times in the face of timeouts and conflicts.
- func UpdateService(c clientset.Interface, namespace, serviceName string, update func(*v1.Service)) (*v1.Service, error) {
- var service *v1.Service
- var err error
- for i := 0; i < 3; i++ {
- service, err = c.CoreV1().Services(namespace).Get(serviceName, metav1.GetOptions{})
- if err != nil {
- return service, err
- }
- update(service)
- service, err = c.CoreV1().Services(namespace).Update(service)
- if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
- return service, err
- }
- }
- return service, err
- }
- // StartServeHostnameService creates a replication controller that serves its
- // hostname and a service on top of it.
- func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {
- podNames := make([]string, replicas)
- name := svc.ObjectMeta.Name
- ginkgo.By("creating service " + name + " in namespace " + ns)
- _, err := c.CoreV1().Services(ns).Create(svc)
- if err != nil {
- return podNames, "", err
- }
- var createdPods []*v1.Pod
- maxContainerFailures := 0
- config := testutils.RCConfig{
- Client: c,
- Image: ServeHostnameImage,
- Name: name,
- Namespace: ns,
- PollInterval: 3 * time.Second,
- Timeout: PodReadyBeforeTimeout,
- Replicas: replicas,
- CreatedPods: &createdPods,
- MaxContainerFailures: &maxContainerFailures,
- }
- err = RunRC(config)
- if err != nil {
- return podNames, "", err
- }
- if len(createdPods) != replicas {
- return podNames, "", fmt.Errorf("incorrect number of running pods: %v", len(createdPods))
- }
- for i := range createdPods {
- podNames[i] = createdPods[i].ObjectMeta.Name
- }
- sort.StringSlice(podNames).Sort()
- service, err := c.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
- if err != nil {
- return podNames, "", err
- }
- if service.Spec.ClusterIP == "" {
- return podNames, "", fmt.Errorf("service IP is blank for %v", name)
- }
- serviceIP := service.Spec.ClusterIP
- return podNames, serviceIP, nil
- }
- // StopServeHostnameService stops the given service.
- func StopServeHostnameService(clientset clientset.Interface, ns, name string) error {
- if err := DeleteRCAndWaitForGC(clientset, ns, name); err != nil {
- return err
- }
- if err := clientset.CoreV1().Services(ns).Delete(name, nil); err != nil {
- return err
- }
- return nil
- }
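- // exampleServeHostnameRoundTrip is an illustrative sketch (assumed name): the
- // serve-hostname helpers are typically used together. Each backing pod echoes
- // its own name, so verification compares the set of names seen against the
- // pods that were created. The selector here assumes RunRC labels pods with
- // name=<rc name>.
- func exampleServeHostnameRoundTrip(c clientset.Interface, ns, host string) error {
- svc := CreateServiceSpec("serve-hostname", "", false, map[string]string{"name": "serve-hostname"})
- podNames, serviceIP, err := StartServeHostnameService(c, svc, ns, 3)
- if err != nil {
- return err
- }
- defer StopServeHostnameService(c, ns, svc.Name)
- return VerifyServeHostnameServiceUp(c, ns, host, podNames, serviceIP, int(svc.Spec.Ports[0].Port))
- }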
- // VerifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the
- // given host and from within a pod. The host is expected to be an SSH-able node
- // in the cluster. Each pod in the service is expected to echo its name. These
- // names are compared with the given expectedPods list after a sort | uniq.
- func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
- execPodName := CreateExecPodOrFail(c, ns, "execpod-", nil)
- defer func() {
- DeletePodOrFail(c, ns, execPodName)
- }()
- // Loop a bunch of times - the proxy is randomized, so we want a good
- // chance of hitting each backend at least once.
- buildCommand := func(wget string) string {
- serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
- return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s 2>&1 || true; echo; done",
- 50*len(expectedPods), wget, serviceIPPort)
- }
- commands := []func() string{
- // verify service from node
- func() string {
- cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -")
- e2elog.Logf("Executing cmd %q on host %v", cmd, host)
- result, err := e2essh.SSH(cmd, host, TestContext.Provider)
- if err != nil || result.Code != 0 {
- e2essh.LogResult(result)
- e2elog.Logf("error while SSH-ing to node: %v", err)
- }
- return result.Stdout
- },
- // verify service from pod
- func() string {
- cmd := buildCommand("wget -q -T 1 -O -")
- e2elog.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPodName)
- // TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
- output, err := RunHostCmd(ns, execPodName, cmd)
- if err != nil {
- e2elog.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPodName, err, output)
- }
- return output
- },
- }
- expectedEndpoints := sets.NewString(expectedPods...)
- ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
- for _, cmdFunc := range commands {
- passed := false
- gotEndpoints := sets.NewString()
- // Retry cmdFunc for a while
- for start := time.Now(); time.Since(start) < KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
- for _, endpoint := range strings.Split(cmdFunc(), "\n") {
- trimmedEp := strings.TrimSpace(endpoint)
- if trimmedEp != "" {
- gotEndpoints.Insert(trimmedEp)
- }
- }
- // TODO: simply checking that the retrieved endpoints are a superset
- // of the expected ones allows us to ignore intermittent network flakes that
- // result in output like "wget timed out", but these should be rare
- // and we need a better way to track how often it occurs.
- if gotEndpoints.IsSuperset(expectedEndpoints) {
- if !gotEndpoints.Equal(expectedEndpoints) {
- e2elog.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
- }
- passed = true
- break
- }
- e2elog.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
- }
- if !passed {
- // Sort the lists so they're easier to visually diff.
- exp := expectedEndpoints.List()
- got := gotEndpoints.List()
- sort.StringSlice(exp).Sort()
- sort.StringSlice(got).Sort()
- return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
- }
- }
- return nil
- }
- // VerifyServeHostnameServiceDown verifies that the given service isn't served.
- func VerifyServeHostnameServiceDown(c clientset.Interface, host string, serviceIP string, servicePort int) error {
- ipPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
- // The current versions of curl included in CentOS and RHEL distros
- // misinterpret square brackets around IPv6 as globbing, so use the -g
- // argument to disable globbing to handle the IPv6 case.
- command := fmt.Sprintf(
- "curl -g -s --connect-timeout 2 http://%s && exit 99", ipPort)
- for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
- result, err := e2essh.SSH(command, host, TestContext.Provider)
- if err != nil {
- e2essh.LogResult(result)
- e2elog.Logf("error while SSH-ing to node: %v", err)
- }
- if result.Code != 99 {
- return nil
- }
- e2elog.Logf("service still alive - still waiting")
- }
- return fmt.Errorf("waiting for service to be down timed out")
- }
- // CleanupServiceResources cleans up service Type=LoadBalancer resources.
- func CleanupServiceResources(c clientset.Interface, loadBalancerName, region, zone string) {
- TestContext.CloudConfig.Provider.CleanupServiceResources(c, loadBalancerName, region, zone)
- }
- // DescribeSvc logs the output of kubectl describe svc for the given namespace
- func DescribeSvc(ns string) {
- e2elog.Logf("\nOutput of kubectl describe svc:\n")
- desc, _ := RunKubectl(
- "describe", "svc", fmt.Sprintf("--namespace=%v", ns))
- e2elog.Logf(desc)
- }
- // CreateServiceSpec returns a Service object for testing.
- func CreateServiceSpec(serviceName, externalName string, isHeadless bool, selector map[string]string) *v1.Service {
- headlessService := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: serviceName,
- },
- Spec: v1.ServiceSpec{
- Selector: selector,
- },
- }
- if externalName != "" {
- headlessService.Spec.Type = v1.ServiceTypeExternalName
- headlessService.Spec.ExternalName = externalName
- } else {
- headlessService.Spec.Ports = []v1.ServicePort{
- {Port: 80, Name: "http", Protocol: v1.ProtocolTCP},
- }
- }
- if isHeadless {
- headlessService.Spec.ClusterIP = "None"
- }
- return headlessService
- }
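- // exampleServiceSpecs is an illustrative sketch (assumed name) of the three
- // Service shapes CreateServiceSpec can produce.
- func exampleServiceSpecs(selector map[string]string) []*v1.Service {
- return []*v1.Service{
- // Plain ClusterIP Service exposing port 80.
- CreateServiceSpec("plain", "", false, selector),
- // Headless Service (ClusterIP: None), e.g. for DNS-based pod discovery.
- CreateServiceSpec("headless", "", true, selector),
- // ExternalName Service aliasing an external DNS name.
- CreateServiceSpec("external", "db.example.com", false, selector),
- }
- }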
- // EnableAndDisableInternalLB returns two functions for enabling and disabling the internal load balancer
- // setting for the supported cloud providers (currently GCE/GKE and Azure) and empty functions for others.
- func EnableAndDisableInternalLB() (enable func(svc *v1.Service), disable func(svc *v1.Service)) {
- return TestContext.CloudConfig.Provider.EnableAndDisableInternalLB()
- }
- // GetServiceLoadBalancerCreationTimeout returns a timeout value for creating a load balancer of a service.
- func GetServiceLoadBalancerCreationTimeout(cs clientset.Interface) time.Duration {
- if nodes := GetReadySchedulableNodesOrDie(cs); len(nodes.Items) > LargeClusterMinNodesNumber {
- return LoadBalancerCreateTimeoutLarge
- }
- return LoadBalancerCreateTimeoutDefault
- }
- // affinityTracker tracks the destination of a request for the affinity tests.
- type affinityTracker struct {
- hostTrace []string
- }
- // recordHost records a response coming from the given host.
- func (at *affinityTracker) recordHost(host string) {
- at.hostTrace = append(at.hostTrace, host)
- e2elog.Logf("Received response from host: %s", host)
- }
- // checkHostTrace checks that the most recent count requests all went to the same host.
- func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) {
- fulfilled = (len(at.hostTrace) >= count)
- if len(at.hostTrace) == 0 {
- return fulfilled, true
- }
- last := at.hostTrace[0:]
- if len(at.hostTrace)-count >= 0 {
- last = at.hostTrace[len(at.hostTrace)-count:]
- }
- host := at.hostTrace[len(at.hostTrace)-1]
- for _, h := range last {
- if h != host {
- return fulfilled, false
- }
- }
- return fulfilled, true
- }
- func checkAffinityFailed(tracker affinityTracker, err string) {
- e2elog.Logf("%v", tracker.hostTrace)
- Failf(err)
- }
- // CheckAffinity tests whether service affinity works as expected.
- // If affinity is expected, the test returns true once AffinityConfirmCount
- // identical responses have been observed in a row. If affinity is not expected,
- // the test keeps observing until different responses are seen. The function
- // returns false only in case of unexpected errors.
- func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIP string, targetPort int, shouldHold bool) bool {
- targetIPPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
- cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIPPort)
- timeout := ServiceTestTimeout
- if execPod == nil {
- timeout = LoadBalancerPollTimeout
- }
- var tracker affinityTracker
- if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
- if execPod != nil {
- stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd)
- if err != nil {
- e2elog.Logf("Failed to get response from %s. Retry until timeout", targetIPPort)
- return false, nil
- }
- tracker.recordHost(stdout)
- } else {
- rawResponse := jig.GetHTTPContent(targetIP, targetPort, timeout, "")
- tracker.recordHost(rawResponse.String())
- }
- trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount)
- if !shouldHold && !affinityHolds {
- return true, nil
- }
- if shouldHold && trackerFulfilled && affinityHolds {
- return true, nil
- }
- return false, nil
- }); pollErr != nil {
- trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount)
- if pollErr != wait.ErrWaitTimeout {
- checkAffinityFailed(tracker, pollErr.Error())
- return false
- }
- if !trackerFulfilled {
- checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIPPort))
- }
- if shouldHold {
- checkAffinityFailed(tracker, "Affinity should hold but didn't.")
- } else {
- checkAffinityFailed(tracker, "Affinity shouldn't hold but did.")
- }
- return true
- }
- return true
- }
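- // exampleCheckAffinity is an illustrative sketch (assumed name): assert that
- // ClientIP affinity holds for a ClusterIP Service, probed from an exec pod
- // inside the cluster. Passing shouldHold=false instead waits to observe
- // responses from more than one backend.
- func exampleCheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, svc *v1.Service) {
- if !CheckAffinity(jig, execPod, svc.Spec.ClusterIP, int(svc.Spec.Ports[0].Port), true) {
- Failf("affinity was not maintained for service %s/%s", svc.Namespace, svc.Name)
- }
- }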