jig.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package service
  14. import (
  15. "context"
  16. "fmt"
  17. "net"
  18. "regexp"
  19. "strconv"
  20. "strings"
  21. "time"
  22. "github.com/onsi/ginkgo"
  23. v1 "k8s.io/api/core/v1"
  24. policyv1beta1 "k8s.io/api/policy/v1beta1"
  25. apierrors "k8s.io/apimachinery/pkg/api/errors"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/fields"
  28. "k8s.io/apimachinery/pkg/labels"
  29. "k8s.io/apimachinery/pkg/runtime"
  30. "k8s.io/apimachinery/pkg/util/intstr"
  31. utilnet "k8s.io/apimachinery/pkg/util/net"
  32. "k8s.io/apimachinery/pkg/util/sets"
  33. "k8s.io/apimachinery/pkg/util/uuid"
  34. "k8s.io/apimachinery/pkg/util/wait"
  35. "k8s.io/apimachinery/pkg/watch"
  36. clientset "k8s.io/client-go/kubernetes"
  37. "k8s.io/client-go/tools/cache"
  38. "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
  39. "k8s.io/kubernetes/test/e2e/framework"
  40. e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
  41. e2enode "k8s.io/kubernetes/test/e2e/framework/node"
  42. e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
  43. e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
  44. testutils "k8s.io/kubernetes/test/utils"
  45. imageutils "k8s.io/kubernetes/test/utils/image"
  46. )
// NodePortRange should match whatever the default/configured range is.
// With Base 30000 and Size 2768 it spans ports Base..Base+Size-1, i.e.
// 30000-32767 inclusive. sanityCheckService validates allocated node ports
// against it, and ChangeServiceNodePort picks candidate ports from it.
var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
// TestJig is a test jig to help service testing.
// It bundles the client, namespace and identity used to create and look up
// the Service, ReplicationController and PodDisruptionBudget it manages.
type TestJig struct {
	// Client is the clientset used for every API call the jig makes.
	Client clientset.Interface
	// Namespace is where the jig creates and looks up its objects.
	Namespace string
	// Name is used as the object name of the Service, RC and PDB the jig creates.
	Name string
	// ID is Name, a dash, and a freshly generated UUID (set by NewTestJig).
	ID string
	// Labels is put on the objects the jig creates and used as their selector.
	Labels map[string]string
}
  57. // NewTestJig allocates and inits a new TestJig.
  58. func NewTestJig(client clientset.Interface, namespace, name string) *TestJig {
  59. j := &TestJig{}
  60. j.Client = client
  61. j.Namespace = namespace
  62. j.Name = name
  63. j.ID = j.Name + "-" + string(uuid.NewUUID())
  64. j.Labels = map[string]string{"testid": j.ID}
  65. return j
  66. }
  67. // newServiceTemplate returns the default v1.Service template for this j, but
  68. // does not actually create the Service. The default Service has the same name
  69. // as the j and exposes the given port.
  70. func (j *TestJig) newServiceTemplate(proto v1.Protocol, port int32) *v1.Service {
  71. service := &v1.Service{
  72. ObjectMeta: metav1.ObjectMeta{
  73. Namespace: j.Namespace,
  74. Name: j.Name,
  75. Labels: j.Labels,
  76. },
  77. Spec: v1.ServiceSpec{
  78. Selector: j.Labels,
  79. Ports: []v1.ServicePort{
  80. {
  81. Protocol: proto,
  82. Port: port,
  83. },
  84. },
  85. },
  86. }
  87. return service
  88. }
  89. // CreateTCPServiceWithPort creates a new TCP Service with given port based on the
  90. // j's defaults. Callers can provide a function to tweak the Service object before
  91. // it is created.
  92. func (j *TestJig) CreateTCPServiceWithPort(tweak func(svc *v1.Service), port int32) (*v1.Service, error) {
  93. svc := j.newServiceTemplate(v1.ProtocolTCP, port)
  94. if tweak != nil {
  95. tweak(svc)
  96. }
  97. result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
  98. if err != nil {
  99. return nil, fmt.Errorf("failed to create TCP Service %q: %v", svc.Name, err)
  100. }
  101. return j.sanityCheckService(result, svc.Spec.Type)
  102. }
  103. // CreateTCPService creates a new TCP Service based on the j's
  104. // defaults. Callers can provide a function to tweak the Service object before
  105. // it is created.
  106. func (j *TestJig) CreateTCPService(tweak func(svc *v1.Service)) (*v1.Service, error) {
  107. svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
  108. if tweak != nil {
  109. tweak(svc)
  110. }
  111. result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
  112. if err != nil {
  113. return nil, fmt.Errorf("failed to create TCP Service %q: %v", svc.Name, err)
  114. }
  115. return j.sanityCheckService(result, svc.Spec.Type)
  116. }
  117. // CreateUDPService creates a new UDP Service based on the j's
  118. // defaults. Callers can provide a function to tweak the Service object before
  119. // it is created.
  120. func (j *TestJig) CreateUDPService(tweak func(svc *v1.Service)) (*v1.Service, error) {
  121. svc := j.newServiceTemplate(v1.ProtocolUDP, 80)
  122. if tweak != nil {
  123. tweak(svc)
  124. }
  125. result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
  126. if err != nil {
  127. return nil, fmt.Errorf("failed to create UDP Service %q: %v", svc.Name, err)
  128. }
  129. return j.sanityCheckService(result, svc.Spec.Type)
  130. }
  131. // CreateExternalNameService creates a new ExternalName type Service based on the j's defaults.
  132. // Callers can provide a function to tweak the Service object before it is created.
  133. func (j *TestJig) CreateExternalNameService(tweak func(svc *v1.Service)) (*v1.Service, error) {
  134. svc := &v1.Service{
  135. ObjectMeta: metav1.ObjectMeta{
  136. Namespace: j.Namespace,
  137. Name: j.Name,
  138. Labels: j.Labels,
  139. },
  140. Spec: v1.ServiceSpec{
  141. Selector: j.Labels,
  142. ExternalName: "foo.example.com",
  143. Type: v1.ServiceTypeExternalName,
  144. },
  145. }
  146. if tweak != nil {
  147. tweak(svc)
  148. }
  149. result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
  150. if err != nil {
  151. return nil, fmt.Errorf("failed to create ExternalName Service %q: %v", svc.Name, err)
  152. }
  153. return j.sanityCheckService(result, svc.Spec.Type)
  154. }
  155. // ChangeServiceType updates the given service's ServiceType to the given newType.
  156. func (j *TestJig) ChangeServiceType(newType v1.ServiceType, timeout time.Duration) error {
  157. ingressIP := ""
  158. svc, err := j.UpdateService(func(s *v1.Service) {
  159. for _, ing := range s.Status.LoadBalancer.Ingress {
  160. if ing.IP != "" {
  161. ingressIP = ing.IP
  162. }
  163. }
  164. s.Spec.Type = newType
  165. s.Spec.Ports[0].NodePort = 0
  166. })
  167. if err != nil {
  168. return err
  169. }
  170. if ingressIP != "" {
  171. _, err = j.WaitForLoadBalancerDestroy(ingressIP, int(svc.Spec.Ports[0].Port), timeout)
  172. }
  173. return err
  174. }
  175. // CreateOnlyLocalNodePortService creates a NodePort service with
  176. // ExternalTrafficPolicy set to Local and sanity checks its nodePort.
  177. // If createPod is true, it also creates an RC with 1 replica of
  178. // the standard netexec container used everywhere in this test.
  179. func (j *TestJig) CreateOnlyLocalNodePortService(createPod bool) (*v1.Service, error) {
  180. ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=NodePort and ExternalTrafficPolicy=Local")
  181. svc, err := j.CreateTCPService(func(svc *v1.Service) {
  182. svc.Spec.Type = v1.ServiceTypeNodePort
  183. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  184. svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 80}}
  185. })
  186. if err != nil {
  187. return nil, err
  188. }
  189. if createPod {
  190. ginkgo.By("creating a pod to be part of the service " + j.Name)
  191. _, err = j.Run(nil)
  192. if err != nil {
  193. return nil, err
  194. }
  195. }
  196. return svc, nil
  197. }
  198. // CreateOnlyLocalLoadBalancerService creates a loadbalancer service with
  199. // ExternalTrafficPolicy set to Local and waits for it to acquire an ingress IP.
  200. // If createPod is true, it also creates an RC with 1 replica of
  201. // the standard netexec container used everywhere in this test.
  202. func (j *TestJig) CreateOnlyLocalLoadBalancerService(timeout time.Duration, createPod bool,
  203. tweak func(svc *v1.Service)) (*v1.Service, error) {
  204. _, err := j.CreateLoadBalancerService(timeout, func(svc *v1.Service) {
  205. ginkgo.By("setting ExternalTrafficPolicy=Local")
  206. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  207. if tweak != nil {
  208. tweak(svc)
  209. }
  210. })
  211. if err != nil {
  212. return nil, err
  213. }
  214. if createPod {
  215. ginkgo.By("creating a pod to be part of the service " + j.Name)
  216. _, err = j.Run(nil)
  217. if err != nil {
  218. return nil, err
  219. }
  220. }
  221. ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
  222. return j.WaitForLoadBalancer(timeout)
  223. }
  224. // CreateLoadBalancerService creates a loadbalancer service and waits
  225. // for it to acquire an ingress IP.
  226. func (j *TestJig) CreateLoadBalancerService(timeout time.Duration, tweak func(svc *v1.Service)) (*v1.Service, error) {
  227. ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer")
  228. svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
  229. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  230. // We need to turn affinity off for our LB distribution tests
  231. svc.Spec.SessionAffinity = v1.ServiceAffinityNone
  232. if tweak != nil {
  233. tweak(svc)
  234. }
  235. _, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
  236. if err != nil {
  237. return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %v", svc.Name, err)
  238. }
  239. ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
  240. return j.WaitForLoadBalancer(timeout)
  241. }
  242. // GetEndpointNodes returns a map of nodenames:external-ip on which the
  243. // endpoints of the Service are running.
  244. func (j *TestJig) GetEndpointNodes() (map[string][]string, error) {
  245. nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests)
  246. if err != nil {
  247. return nil, err
  248. }
  249. epNodes, err := j.GetEndpointNodeNames()
  250. if err != nil {
  251. return nil, err
  252. }
  253. nodeMap := map[string][]string{}
  254. for _, n := range nodes.Items {
  255. if epNodes.Has(n.Name) {
  256. nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
  257. }
  258. }
  259. return nodeMap, nil
  260. }
  261. // GetEndpointNodeNames returns a string set of node names on which the
  262. // endpoints of the given Service are running.
  263. func (j *TestJig) GetEndpointNodeNames() (sets.String, error) {
  264. endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
  265. if err != nil {
  266. return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
  267. }
  268. if len(endpoints.Subsets) == 0 {
  269. return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses")
  270. }
  271. epNodes := sets.NewString()
  272. for _, ss := range endpoints.Subsets {
  273. for _, e := range ss.Addresses {
  274. if e.NodeName != nil {
  275. epNodes.Insert(*e.NodeName)
  276. }
  277. }
  278. }
  279. return epNodes, nil
  280. }
  281. // WaitForEndpointOnNode waits for a service endpoint on the given node.
  282. func (j *TestJig) WaitForEndpointOnNode(nodeName string) error {
  283. return wait.PollImmediate(framework.Poll, LoadBalancerPropagationTimeoutDefault, func() (bool, error) {
  284. endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
  285. if err != nil {
  286. framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
  287. return false, nil
  288. }
  289. if len(endpoints.Subsets) == 0 {
  290. framework.Logf("Expect endpoints with subsets, got none.")
  291. return false, nil
  292. }
  293. // TODO: Handle multiple endpoints
  294. if len(endpoints.Subsets[0].Addresses) == 0 {
  295. framework.Logf("Expected Ready endpoints - found none")
  296. return false, nil
  297. }
  298. epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
  299. framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName)
  300. if epHostName != nodeName {
  301. framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
  302. return false, nil
  303. }
  304. return true, nil
  305. })
  306. }
  307. // WaitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout
  308. func (j *TestJig) WaitForAvailableEndpoint(timeout time.Duration) error {
  309. //Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run
  310. endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
  311. stopCh := make(chan struct{})
  312. endpointAvailable := false
  313. var controller cache.Controller
  314. _, controller = cache.NewInformer(
  315. &cache.ListWatch{
  316. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  317. options.FieldSelector = endpointSelector.String()
  318. obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(context.TODO(), options)
  319. return runtime.Object(obj), err
  320. },
  321. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  322. options.FieldSelector = endpointSelector.String()
  323. return j.Client.CoreV1().Endpoints(j.Namespace).Watch(context.TODO(), options)
  324. },
  325. },
  326. &v1.Endpoints{},
  327. 0,
  328. cache.ResourceEventHandlerFuncs{
  329. AddFunc: func(obj interface{}) {
  330. if e, ok := obj.(*v1.Endpoints); ok {
  331. if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
  332. endpointAvailable = true
  333. }
  334. }
  335. },
  336. UpdateFunc: func(old, cur interface{}) {
  337. if e, ok := cur.(*v1.Endpoints); ok {
  338. if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
  339. endpointAvailable = true
  340. }
  341. }
  342. },
  343. },
  344. )
  345. defer func() {
  346. close(stopCh)
  347. }()
  348. go controller.Run(stopCh)
  349. err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
  350. return endpointAvailable, nil
  351. })
  352. if err != nil {
  353. return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)
  354. }
  355. return nil
  356. }
  357. // sanityCheckService performs sanity checks on the given service; in particular, ensuring
  358. // that creating/updating a service allocates IPs, ports, etc, as needed. It does not
  359. // check for ingress assignment as that happens asynchronously after the Service is created.
  360. func (j *TestJig) sanityCheckService(svc *v1.Service, svcType v1.ServiceType) (*v1.Service, error) {
  361. if svcType == "" {
  362. svcType = v1.ServiceTypeClusterIP
  363. }
  364. if svc.Spec.Type != svcType {
  365. return nil, fmt.Errorf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
  366. }
  367. if svcType != v1.ServiceTypeExternalName {
  368. if svc.Spec.ExternalName != "" {
  369. return nil, fmt.Errorf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
  370. }
  371. if svc.Spec.ClusterIP == "" {
  372. return nil, fmt.Errorf("didn't get ClusterIP for non-ExternalName service")
  373. }
  374. } else {
  375. if svc.Spec.ClusterIP != "" {
  376. return nil, fmt.Errorf("unexpected Spec.ClusterIP (%s) for ExternalName service, expected empty", svc.Spec.ClusterIP)
  377. }
  378. }
  379. expectNodePorts := false
  380. if svcType != v1.ServiceTypeClusterIP && svcType != v1.ServiceTypeExternalName {
  381. expectNodePorts = true
  382. }
  383. for i, port := range svc.Spec.Ports {
  384. hasNodePort := (port.NodePort != 0)
  385. if hasNodePort != expectNodePorts {
  386. return nil, fmt.Errorf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
  387. }
  388. if hasNodePort {
  389. if !NodePortRange.Contains(int(port.NodePort)) {
  390. return nil, fmt.Errorf("out-of-range nodePort (%d) for service", port.NodePort)
  391. }
  392. }
  393. }
  394. // FIXME: this fails for tests that were changed from LoadBalancer to ClusterIP.
  395. // if svcType != v1.ServiceTypeLoadBalancer {
  396. // if len(svc.Status.LoadBalancer.Ingress) != 0 {
  397. // return nil, fmt.Errorf("unexpected Status.LoadBalancer.Ingress on non-LoadBalancer service")
  398. // }
  399. // }
  400. return svc, nil
  401. }
  402. // UpdateService fetches a service, calls the update function on it, and
  403. // then attempts to send the updated service. It tries up to 3 times in the
  404. // face of timeouts and conflicts.
  405. func (j *TestJig) UpdateService(update func(*v1.Service)) (*v1.Service, error) {
  406. for i := 0; i < 3; i++ {
  407. service, err := j.Client.CoreV1().Services(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
  408. if err != nil {
  409. return nil, fmt.Errorf("failed to get Service %q: %v", j.Name, err)
  410. }
  411. update(service)
  412. result, err := j.Client.CoreV1().Services(j.Namespace).Update(context.TODO(), service, metav1.UpdateOptions{})
  413. if err == nil {
  414. return j.sanityCheckService(result, service.Spec.Type)
  415. }
  416. if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
  417. return nil, fmt.Errorf("failed to update Service %q: %v", j.Name, err)
  418. }
  419. }
  420. return nil, fmt.Errorf("too many retries updating Service %q", j.Name)
  421. }
  422. // WaitForNewIngressIP waits for the given service to get a new ingress IP, or returns an error after the given timeout
  423. func (j *TestJig) WaitForNewIngressIP(existingIP string, timeout time.Duration) (*v1.Service, error) {
  424. framework.Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, j.Name)
  425. service, err := j.waitForCondition(timeout, "have a new ingress IP", func(svc *v1.Service) bool {
  426. if len(svc.Status.LoadBalancer.Ingress) == 0 {
  427. return false
  428. }
  429. ip := svc.Status.LoadBalancer.Ingress[0].IP
  430. if ip == "" || ip == existingIP {
  431. return false
  432. }
  433. return true
  434. })
  435. if err != nil {
  436. return nil, err
  437. }
  438. return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
  439. }
  440. // ChangeServiceNodePort changes node ports of the given service.
  441. func (j *TestJig) ChangeServiceNodePort(initial int) (*v1.Service, error) {
  442. var err error
  443. var service *v1.Service
  444. for i := 1; i < NodePortRange.Size; i++ {
  445. offs1 := initial - NodePortRange.Base
  446. offs2 := (offs1 + i) % NodePortRange.Size
  447. newPort := NodePortRange.Base + offs2
  448. service, err = j.UpdateService(func(s *v1.Service) {
  449. s.Spec.Ports[0].NodePort = int32(newPort)
  450. })
  451. if err != nil && strings.Contains(err.Error(), portallocator.ErrAllocated.Error()) {
  452. framework.Logf("tried nodePort %d, but it is in use, will try another", newPort)
  453. continue
  454. }
  455. // Otherwise err was nil or err was a real error
  456. break
  457. }
  458. return service, err
  459. }
  460. // WaitForLoadBalancer waits the given service to have a LoadBalancer, or returns an error after the given timeout
  461. func (j *TestJig) WaitForLoadBalancer(timeout time.Duration) (*v1.Service, error) {
  462. framework.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, j.Name)
  463. service, err := j.waitForCondition(timeout, "have a load balancer", func(svc *v1.Service) bool {
  464. return len(svc.Status.LoadBalancer.Ingress) > 0
  465. })
  466. if err != nil {
  467. return nil, err
  468. }
  469. for i, ing := range service.Status.LoadBalancer.Ingress {
  470. if ing.IP == "" && ing.Hostname == "" {
  471. return nil, fmt.Errorf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
  472. }
  473. }
  474. return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
  475. }
  476. // WaitForLoadBalancerDestroy waits the given service to destroy a LoadBalancer, or returns an error after the given timeout
  477. func (j *TestJig) WaitForLoadBalancerDestroy(ip string, port int, timeout time.Duration) (*v1.Service, error) {
  478. // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
  479. defer func() {
  480. if err := framework.EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil {
  481. framework.Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
  482. }
  483. }()
  484. framework.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, j.Name)
  485. service, err := j.waitForCondition(timeout, "have no load balancer", func(svc *v1.Service) bool {
  486. return len(svc.Status.LoadBalancer.Ingress) == 0
  487. })
  488. if err != nil {
  489. return nil, err
  490. }
  491. return j.sanityCheckService(service, service.Spec.Type)
  492. }
  493. func (j *TestJig) waitForCondition(timeout time.Duration, message string, conditionFn func(*v1.Service) bool) (*v1.Service, error) {
  494. var service *v1.Service
  495. pollFunc := func() (bool, error) {
  496. svc, err := j.Client.CoreV1().Services(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
  497. if err != nil {
  498. return false, err
  499. }
  500. if conditionFn(svc) {
  501. service = svc
  502. return true, nil
  503. }
  504. return false, nil
  505. }
  506. if err := wait.PollImmediate(framework.Poll, timeout, pollFunc); err != nil {
  507. return nil, fmt.Errorf("timed out waiting for service %q to %s", j.Name, message)
  508. }
  509. return service, nil
  510. }
  511. // newRCTemplate returns the default v1.ReplicationController object for
  512. // this j, but does not actually create the RC. The default RC has the same
  513. // name as the j and runs the "netexec" container.
  514. func (j *TestJig) newRCTemplate() *v1.ReplicationController {
  515. var replicas int32 = 1
  516. var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
  517. rc := &v1.ReplicationController{
  518. ObjectMeta: metav1.ObjectMeta{
  519. Namespace: j.Namespace,
  520. Name: j.Name,
  521. Labels: j.Labels,
  522. },
  523. Spec: v1.ReplicationControllerSpec{
  524. Replicas: &replicas,
  525. Selector: j.Labels,
  526. Template: &v1.PodTemplateSpec{
  527. ObjectMeta: metav1.ObjectMeta{
  528. Labels: j.Labels,
  529. },
  530. Spec: v1.PodSpec{
  531. Containers: []v1.Container{
  532. {
  533. Name: "netexec",
  534. Image: imageutils.GetE2EImage(imageutils.Agnhost),
  535. Args: []string{"netexec", "--http-port=80", "--udp-port=80"},
  536. ReadinessProbe: &v1.Probe{
  537. PeriodSeconds: 3,
  538. Handler: v1.Handler{
  539. HTTPGet: &v1.HTTPGetAction{
  540. Port: intstr.FromInt(80),
  541. Path: "/hostName",
  542. },
  543. },
  544. },
  545. },
  546. },
  547. TerminationGracePeriodSeconds: &grace,
  548. },
  549. },
  550. },
  551. }
  552. return rc
  553. }
  554. // AddRCAntiAffinity adds AntiAffinity to the given ReplicationController.
  555. func (j *TestJig) AddRCAntiAffinity(rc *v1.ReplicationController) {
  556. var replicas int32 = 2
  557. rc.Spec.Replicas = &replicas
  558. if rc.Spec.Template.Spec.Affinity == nil {
  559. rc.Spec.Template.Spec.Affinity = &v1.Affinity{}
  560. }
  561. if rc.Spec.Template.Spec.Affinity.PodAntiAffinity == nil {
  562. rc.Spec.Template.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{}
  563. }
  564. rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
  565. rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
  566. v1.PodAffinityTerm{
  567. LabelSelector: &metav1.LabelSelector{MatchLabels: j.Labels},
  568. Namespaces: nil,
  569. TopologyKey: "kubernetes.io/hostname",
  570. })
  571. }
  572. // CreatePDB returns a PodDisruptionBudget for the given ReplicationController, or returns an error if a PodDisruptionBudget isn't ready
  573. func (j *TestJig) CreatePDB(rc *v1.ReplicationController) (*policyv1beta1.PodDisruptionBudget, error) {
  574. pdb := j.newPDBTemplate(rc)
  575. newPdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(j.Namespace).Create(context.TODO(), pdb, metav1.CreateOptions{})
  576. if err != nil {
  577. return nil, fmt.Errorf("failed to create PDB %q %v", pdb.Name, err)
  578. }
  579. if err := j.waitForPdbReady(); err != nil {
  580. return nil, fmt.Errorf("failed waiting for PDB to be ready: %v", err)
  581. }
  582. return newPdb, nil
  583. }
  584. // newPDBTemplate returns the default policyv1beta1.PodDisruptionBudget object for
  585. // this j, but does not actually create the PDB. The default PDB specifies a
  586. // MinAvailable of N-1 and matches the pods created by the RC.
  587. func (j *TestJig) newPDBTemplate(rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
  588. minAvailable := intstr.FromInt(int(*rc.Spec.Replicas) - 1)
  589. pdb := &policyv1beta1.PodDisruptionBudget{
  590. ObjectMeta: metav1.ObjectMeta{
  591. Namespace: j.Namespace,
  592. Name: j.Name,
  593. Labels: j.Labels,
  594. },
  595. Spec: policyv1beta1.PodDisruptionBudgetSpec{
  596. MinAvailable: &minAvailable,
  597. Selector: &metav1.LabelSelector{MatchLabels: j.Labels},
  598. },
  599. }
  600. return pdb
  601. }
  602. // Run creates a ReplicationController and Pod(s) and waits for the
  603. // Pod(s) to be running. Callers can provide a function to tweak the RC object
  604. // before it is created.
  605. func (j *TestJig) Run(tweak func(rc *v1.ReplicationController)) (*v1.ReplicationController, error) {
  606. rc := j.newRCTemplate()
  607. if tweak != nil {
  608. tweak(rc)
  609. }
  610. result, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).Create(context.TODO(), rc, metav1.CreateOptions{})
  611. if err != nil {
  612. return nil, fmt.Errorf("failed to create RC %q: %v", rc.Name, err)
  613. }
  614. pods, err := j.waitForPodsCreated(int(*(rc.Spec.Replicas)))
  615. if err != nil {
  616. return nil, fmt.Errorf("failed to create pods: %v", err)
  617. }
  618. if err := j.waitForPodsReady(pods); err != nil {
  619. return nil, fmt.Errorf("failed waiting for pods to be running: %v", err)
  620. }
  621. return result, nil
  622. }
  623. // Scale scales pods to the given replicas
  624. func (j *TestJig) Scale(replicas int) error {
  625. rc := j.Name
  626. scale, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).GetScale(context.TODO(), rc, metav1.GetOptions{})
  627. if err != nil {
  628. return fmt.Errorf("failed to get scale for RC %q: %v", rc, err)
  629. }
  630. scale.ResourceVersion = "" // indicate the scale update should be unconditional
  631. scale.Spec.Replicas = int32(replicas)
  632. _, err = j.Client.CoreV1().ReplicationControllers(j.Namespace).UpdateScale(context.TODO(), rc, scale, metav1.UpdateOptions{})
  633. if err != nil {
  634. return fmt.Errorf("failed to scale RC %q: %v", rc, err)
  635. }
  636. pods, err := j.waitForPodsCreated(replicas)
  637. if err != nil {
  638. return fmt.Errorf("failed waiting for pods: %v", err)
  639. }
  640. if err := j.waitForPodsReady(pods); err != nil {
  641. return fmt.Errorf("failed waiting for pods to be running: %v", err)
  642. }
  643. return nil
  644. }
  645. func (j *TestJig) waitForPdbReady() error {
  646. timeout := 2 * time.Minute
  647. for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
  648. pdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})
  649. if err != nil {
  650. return err
  651. }
  652. if pdb.Status.DisruptionsAllowed > 0 {
  653. return nil
  654. }
  655. }
  656. return fmt.Errorf("timeout waiting for PDB %q to be ready", j.Name)
  657. }
  658. func (j *TestJig) waitForPodsCreated(replicas int) ([]string, error) {
  659. timeout := 2 * time.Minute
  660. // List the pods, making sure we observe all the replicas.
  661. label := labels.SelectorFromSet(labels.Set(j.Labels))
  662. framework.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
  663. for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
  664. options := metav1.ListOptions{LabelSelector: label.String()}
  665. pods, err := j.Client.CoreV1().Pods(j.Namespace).List(context.TODO(), options)
  666. if err != nil {
  667. return nil, err
  668. }
  669. found := []string{}
  670. for _, pod := range pods.Items {
  671. if pod.DeletionTimestamp != nil {
  672. continue
  673. }
  674. found = append(found, pod.Name)
  675. }
  676. if len(found) == replicas {
  677. framework.Logf("Found all %d pods", replicas)
  678. return found, nil
  679. }
  680. framework.Logf("Found %d/%d pods - will retry", len(found), replicas)
  681. }
  682. return nil, fmt.Errorf("timeout waiting for %d pods to be created", replicas)
  683. }
  684. func (j *TestJig) waitForPodsReady(pods []string) error {
  685. timeout := 2 * time.Minute
  686. if !e2epod.CheckPodsRunningReady(j.Client, j.Namespace, pods, timeout) {
  687. return fmt.Errorf("timeout waiting for %d pods to be ready", len(pods))
  688. }
  689. return nil
  690. }
  691. func testReachabilityOverServiceName(serviceName string, sp v1.ServicePort, execPod *v1.Pod) error {
  692. return testEndpointReachability(serviceName, sp.Port, sp.Protocol, execPod)
  693. }
  694. func testReachabilityOverClusterIP(clusterIP string, sp v1.ServicePort, execPod *v1.Pod) error {
  695. // If .spec.clusterIP is set to "" or "None" for service, ClusterIP is not created, so reachability can not be tested over clusterIP:servicePort
  696. isClusterIPV46, err := regexp.MatchString(e2enetwork.RegexIPv4+"||"+e2enetwork.RegexIPv6, clusterIP)
  697. if err != nil {
  698. return fmt.Errorf("unable to parse ClusterIP: %s", clusterIP)
  699. }
  700. if isClusterIPV46 {
  701. return testEndpointReachability(clusterIP, sp.Port, sp.Protocol, execPod)
  702. }
  703. return nil
  704. }
  705. func testReachabilityOverNodePorts(nodes *v1.NodeList, sp v1.ServicePort, pod *v1.Pod, clusterIP string) error {
  706. internalAddrs := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
  707. externalAddrs := e2enode.CollectAddresses(nodes, v1.NodeExternalIP)
  708. isClusterIPV4 := net.ParseIP(clusterIP).To4() != nil
  709. for _, internalAddr := range internalAddrs {
  710. // If the node's internal address points to localhost, then we are not
  711. // able to test the service reachability via that address
  712. if isInvalidOrLocalhostAddress(internalAddr) {
  713. framework.Logf("skipping testEndpointReachability() for internal adddress %s", internalAddr)
  714. continue
  715. }
  716. isNodeInternalIPV4 := net.ParseIP(internalAddr).To4() != nil
  717. // Check service reachability on the node internalIP which is same family
  718. // as clusterIP
  719. if isClusterIPV4 != isNodeInternalIPV4 {
  720. framework.Logf("skipping testEndpointReachability() for internal adddress %s as it does not match clusterIP (%s) family", internalAddr, clusterIP)
  721. continue
  722. }
  723. err := testEndpointReachability(internalAddr, sp.NodePort, sp.Protocol, pod)
  724. if err != nil {
  725. return err
  726. }
  727. }
  728. for _, externalAddr := range externalAddrs {
  729. isNodeExternalIPV4 := net.ParseIP(externalAddr).To4() != nil
  730. if isClusterIPV4 != isNodeExternalIPV4 {
  731. framework.Logf("skipping testEndpointReachability() for external adddress %s as it does not match clusterIP (%s) family", externalAddr, clusterIP)
  732. continue
  733. }
  734. err := testEndpointReachability(externalAddr, sp.NodePort, sp.Protocol, pod)
  735. if err != nil {
  736. return err
  737. }
  738. }
  739. return nil
  740. }
  // isInvalidOrLocalhostAddress reports whether ip fails to parse as an IP
  // address or is a loopback address (as defined by net.IP.IsLoopback).
  func isInvalidOrLocalhostAddress(ip string) bool {
  	parsed := net.ParseIP(ip)
  	return parsed == nil || parsed.IsLoopback()
  }
  750. // testEndpointReachability tests reachability to endpoints (i.e. IP, ServiceName) and ports. Test request is initiated from specified execPod.
  751. // TCP and UDP protocol based service are supported at this moment
  752. // TODO: add support to test SCTP Protocol based services.
  753. func testEndpointReachability(endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod) error {
  754. ep := net.JoinHostPort(endpoint, strconv.Itoa(int(port)))
  755. cmd := ""
  756. switch protocol {
  757. case v1.ProtocolTCP:
  758. cmd = fmt.Sprintf("nc -zv -t -w 2 %s %v", endpoint, port)
  759. case v1.ProtocolUDP:
  760. cmd = fmt.Sprintf("nc -zv -u -w 2 %s %v", endpoint, port)
  761. default:
  762. return fmt.Errorf("service reachablity check is not supported for %v", protocol)
  763. }
  764. err := wait.PollImmediate(1*time.Second, ServiceReachabilityShortPollTimeout, func() (bool, error) {
  765. if _, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil {
  766. framework.Logf("Service reachability failing with error: %v\nRetrying...", err)
  767. return false, nil
  768. }
  769. return true, nil
  770. })
  771. if err != nil {
  772. return fmt.Errorf("service is not reachable within %v timeout on endpoint %s over %s protocol", ServiceReachabilityShortPollTimeout, ep, protocol)
  773. }
  774. return nil
  775. }
  776. // checkClusterIPServiceReachability ensures that service of type ClusterIP is reachable over
  777. // - ServiceName:ServicePort, ClusterIP:ServicePort
  778. func (j *TestJig) checkClusterIPServiceReachability(svc *v1.Service, pod *v1.Pod) error {
  779. clusterIP := svc.Spec.ClusterIP
  780. servicePorts := svc.Spec.Ports
  781. err := j.WaitForAvailableEndpoint(ServiceEndpointsTimeout)
  782. if err != nil {
  783. return err
  784. }
  785. for _, servicePort := range servicePorts {
  786. err = testReachabilityOverServiceName(svc.Name, servicePort, pod)
  787. if err != nil {
  788. return err
  789. }
  790. err = testReachabilityOverClusterIP(clusterIP, servicePort, pod)
  791. if err != nil {
  792. return err
  793. }
  794. }
  795. return nil
  796. }
  797. // checkNodePortServiceReachability ensures that service of type nodePort are reachable
  798. // - Internal clients should be reachable to service over -
  799. // ServiceName:ServicePort, ClusterIP:ServicePort and NodeInternalIPs:NodePort
  800. // - External clients should be reachable to service over -
  801. // NodePublicIPs:NodePort
  802. func (j *TestJig) checkNodePortServiceReachability(svc *v1.Service, pod *v1.Pod) error {
  803. clusterIP := svc.Spec.ClusterIP
  804. servicePorts := svc.Spec.Ports
  805. // Consider only 2 nodes for testing
  806. nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, 2)
  807. if err != nil {
  808. return err
  809. }
  810. err = j.WaitForAvailableEndpoint(ServiceEndpointsTimeout)
  811. if err != nil {
  812. return err
  813. }
  814. for _, servicePort := range servicePorts {
  815. err = testReachabilityOverServiceName(svc.Name, servicePort, pod)
  816. if err != nil {
  817. return err
  818. }
  819. err = testReachabilityOverClusterIP(clusterIP, servicePort, pod)
  820. if err != nil {
  821. return err
  822. }
  823. err = testReachabilityOverNodePorts(nodes, servicePort, pod, clusterIP)
  824. if err != nil {
  825. return err
  826. }
  827. }
  828. return nil
  829. }
  830. // checkExternalServiceReachability ensures service of type externalName resolves to IP address and no fake externalName is set
  831. // FQDN of kubernetes is used as externalName(for air tight platforms).
  832. func (j *TestJig) checkExternalServiceReachability(svc *v1.Service, pod *v1.Pod) error {
  833. // Service must resolve to IP
  834. cmd := fmt.Sprintf("nslookup %s", svc.Name)
  835. _, err := framework.RunHostCmd(pod.Namespace, pod.Name, cmd)
  836. if err != nil {
  837. return fmt.Errorf("ExternalName service %q must resolve to IP", pod.Namespace+"/"+pod.Name)
  838. }
  839. return nil
  840. }
  841. // CheckServiceReachability ensures that request are served by the services. Only supports Services with type ClusterIP, NodePort and ExternalName.
  842. func (j *TestJig) CheckServiceReachability(svc *v1.Service, pod *v1.Pod) error {
  843. svcType := svc.Spec.Type
  844. _, err := j.sanityCheckService(svc, svcType)
  845. if err != nil {
  846. return err
  847. }
  848. switch svcType {
  849. case v1.ServiceTypeClusterIP:
  850. return j.checkClusterIPServiceReachability(svc, pod)
  851. case v1.ServiceTypeNodePort:
  852. return j.checkNodePortServiceReachability(svc, pod)
  853. case v1.ServiceTypeExternalName:
  854. return j.checkExternalServiceReachability(svc, pod)
  855. default:
  856. return fmt.Errorf("unsupported service type \"%s\" to verify service reachability for \"%s\" service. This may due to diverse implementation of the service type", svcType, svc.Name)
  857. }
  858. }
  859. // CreateServicePods creates a replication controller with the label same as service. Service listens to HTTP.
  860. func (j *TestJig) CreateServicePods(replica int) error {
  861. config := testutils.RCConfig{
  862. Client: j.Client,
  863. Name: j.Name,
  864. Image: framework.ServeHostnameImage,
  865. Command: []string{"/agnhost", "serve-hostname"},
  866. Namespace: j.Namespace,
  867. Labels: j.Labels,
  868. PollInterval: 3 * time.Second,
  869. Timeout: framework.PodReadyBeforeTimeout,
  870. Replicas: replica,
  871. }
  872. return e2erc.RunRC(config)
  873. }
  874. // CreateTCPUDPServicePods creates a replication controller with the label same as service. Service listens to TCP and UDP.
  875. func (j *TestJig) CreateTCPUDPServicePods(replica int) error {
  876. config := testutils.RCConfig{
  877. Client: j.Client,
  878. Name: j.Name,
  879. Image: framework.ServeHostnameImage,
  880. Command: []string{"/agnhost", "serve-hostname", "--http=false", "--tcp", "--udp"},
  881. Namespace: j.Namespace,
  882. Labels: j.Labels,
  883. PollInterval: 3 * time.Second,
  884. Timeout: framework.PodReadyBeforeTimeout,
  885. Replicas: replica,
  886. }
  887. return e2erc.RunRC(config)
  888. }