service_util.go

  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package framework
  14. import (
  15. "bytes"
  16. "fmt"
  17. "net"
  18. "sort"
  19. "strconv"
  20. "strings"
  21. "time"
  22. v1 "k8s.io/api/core/v1"
  23. policyv1beta1 "k8s.io/api/policy/v1beta1"
  24. "k8s.io/apimachinery/pkg/api/errors"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/labels"
  27. "k8s.io/apimachinery/pkg/util/intstr"
  28. utilnet "k8s.io/apimachinery/pkg/util/net"
  29. "k8s.io/apimachinery/pkg/util/sets"
  30. "k8s.io/apimachinery/pkg/util/uuid"
  31. "k8s.io/apimachinery/pkg/util/wait"
  32. clientset "k8s.io/client-go/kubernetes"
  33. "k8s.io/client-go/util/retry"
  34. api "k8s.io/kubernetes/pkg/apis/core"
  35. "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
  36. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  37. e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
  38. testutils "k8s.io/kubernetes/test/utils"
  39. imageutils "k8s.io/kubernetes/test/utils/image"
  40. "github.com/onsi/ginkgo"
  41. )
  42. const (
  43. // KubeProxyLagTimeout is the maximum time a kube-proxy daemon on a node is allowed
  44. // to not notice a Service update, such as type=NodePort.
  45. // TODO: This timeout should be O(10s), observed values are O(1m), 5m is very
  46. // liberal. Fix tracked in #20567.
  47. KubeProxyLagTimeout = 5 * time.Minute
  48. // KubeProxyEndpointLagTimeout is the maximum time a kube-proxy daemon on a node is allowed
  49. // to not notice an Endpoint update.
  50. KubeProxyEndpointLagTimeout = 30 * time.Second
  51. // LoadBalancerLagTimeoutDefault is the maximum time a load balancer is allowed to
  52. // not respond after creation.
  53. LoadBalancerLagTimeoutDefault = 2 * time.Minute
  54. // LoadBalancerLagTimeoutAWS is the delay between ELB creation and serving traffic
  55. // on AWS. A few minutes is typical, so use 10m.
  56. LoadBalancerLagTimeoutAWS = 10 * time.Minute
  57. // LoadBalancerCreateTimeoutDefault is the default time to wait for a load balancer to be created/modified.
  58. // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
  59. LoadBalancerCreateTimeoutDefault = 20 * time.Minute
  60. // LoadBalancerCreateTimeoutLarge is the maximum time to wait for a load balancer to be created/modified.
  61. LoadBalancerCreateTimeoutLarge = 2 * time.Hour
  62. // LoadBalancerCleanupTimeout is the time allowed for the load balancer to clean up, proportional to the number of apps/Ingresses.
  63. // Bring the cleanup timeout back down to 5m once b/33588344 is resolved.
  64. LoadBalancerCleanupTimeout = 15 * time.Minute
  65. // LoadBalancerPollTimeout is the maximum time to wait when polling the load balancer.
  66. // On average it takes ~6 minutes for a single backend to come online in GCE.
  67. LoadBalancerPollTimeout = 15 * time.Minute
  68. // LoadBalancerPollInterval is the interval at which the load balancer is polled.
  69. LoadBalancerPollInterval = 30 * time.Second
  70. // LargeClusterMinNodesNumber is the minimum number of nodes a cluster must have to be considered large.
  71. LargeClusterMinNodesNumber = 100
  72. // MaxNodesForEndpointsTests is the maximum number of nodes to use when testing endpoints.
  73. // Don't test with more than 3 nodes.
  74. // Many tests create an endpoint per node; in large clusters this is
  75. // resource- and time-intensive.
  76. MaxNodesForEndpointsTests = 3
  77. // ServiceTestTimeout is used for most polling/waiting activities
  78. ServiceTestTimeout = 60 * time.Second
  79. // GCPMaxInstancesInInstanceGroup is the maximum number of instances supported in
  80. // one instance group on GCP.
  81. GCPMaxInstancesInInstanceGroup = 2000
  82. // AffinityConfirmCount is the number of needed continuous requests to confirm that
  83. // affinity is enabled.
  84. AffinityConfirmCount = 15
  85. )
  86. // ServiceNodePortRange should match whatever the default/configured range is
  87. var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
  88. // ServiceTestJig is a test jig to help service testing.
  89. type ServiceTestJig struct {
  90. ID string
  91. Name string
  92. Client clientset.Interface
  93. Labels map[string]string
  94. }
  95. // PodNode is a pod-node pair indicating which node a given pod is running on
  96. type PodNode struct {
  97. // Pod represents pod name
  98. Pod string
  99. // Node represents node name
  100. Node string
  101. }
  102. // NewServiceTestJig allocates and inits a new ServiceTestJig.
  103. func NewServiceTestJig(client clientset.Interface, name string) *ServiceTestJig {
  104. j := &ServiceTestJig{}
  105. j.Client = client
  106. j.Name = name
  107. j.ID = j.Name + "-" + string(uuid.NewUUID())
  108. j.Labels = map[string]string{"testid": j.ID}
  109. return j
  110. }
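// Illustrative usage (a sketch, not part of the original file): a typical e2e test
// drives the jig roughly as follows, assuming cs is a clientset.Interface and ns is
// the name of an existing test namespace:
//
//	jig := NewServiceTestJig(cs, "svc-under-test")
//	svc := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) {
//		svc.Spec.Type = v1.ServiceTypeNodePort
//	})
//	jig.RunOrFail(ns, nil) // back the Service with one netexec pod
//	jig.SanityCheckService(svc, v1.ServiceTypeNodePort)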
  111. // newServiceTemplate returns the default v1.Service template for this jig, but
  112. // does not actually create the Service. The default Service has the same name
  113. // as the jig and exposes the given port.
  114. func (j *ServiceTestJig) newServiceTemplate(namespace string, proto v1.Protocol, port int32) *v1.Service {
  115. service := &v1.Service{
  116. ObjectMeta: metav1.ObjectMeta{
  117. Namespace: namespace,
  118. Name: j.Name,
  119. Labels: j.Labels,
  120. },
  121. Spec: v1.ServiceSpec{
  122. Selector: j.Labels,
  123. Ports: []v1.ServicePort{
  124. {
  125. Protocol: proto,
  126. Port: port,
  127. },
  128. },
  129. },
  130. }
  131. return service
  132. }
  133. // CreateTCPServiceWithPort creates a new TCP Service with given port based on the
  134. // jig's defaults. Callers can provide a function to tweak the Service object before
  135. // it is created.
  136. func (j *ServiceTestJig) CreateTCPServiceWithPort(namespace string, tweak func(svc *v1.Service), port int32) *v1.Service {
  137. svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, port)
  138. if tweak != nil {
  139. tweak(svc)
  140. }
  141. result, err := j.Client.CoreV1().Services(namespace).Create(svc)
  142. if err != nil {
  143. Failf("Failed to create TCP Service %q: %v", svc.Name, err)
  144. }
  145. return result
  146. }
  147. // CreateTCPServiceOrFail creates a new TCP Service based on the jig's
  148. // defaults. Callers can provide a function to tweak the Service object before
  149. // it is created.
  150. func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
  151. svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, 80)
  152. if tweak != nil {
  153. tweak(svc)
  154. }
  155. result, err := j.Client.CoreV1().Services(namespace).Create(svc)
  156. if err != nil {
  157. Failf("Failed to create TCP Service %q: %v", svc.Name, err)
  158. }
  159. return result
  160. }
  161. // CreateUDPServiceOrFail creates a new UDP Service based on the jig's
  162. // defaults. Callers can provide a function to tweak the Service object before
  163. // it is created.
  164. func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
  165. svc := j.newServiceTemplate(namespace, v1.ProtocolUDP, 80)
  166. if tweak != nil {
  167. tweak(svc)
  168. }
  169. result, err := j.Client.CoreV1().Services(namespace).Create(svc)
  170. if err != nil {
  171. Failf("Failed to create UDP Service %q: %v", svc.Name, err)
  172. }
  173. return result
  174. }
  175. // CreateExternalNameServiceOrFail creates a new ExternalName type Service based on the jig's defaults.
  176. // Callers can provide a function to tweak the Service object before it is created.
  177. func (j *ServiceTestJig) CreateExternalNameServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
  178. svc := &v1.Service{
  179. ObjectMeta: metav1.ObjectMeta{
  180. Namespace: namespace,
  181. Name: j.Name,
  182. Labels: j.Labels,
  183. },
  184. Spec: v1.ServiceSpec{
  185. Selector: j.Labels,
  186. ExternalName: "foo.example.com",
  187. Type: v1.ServiceTypeExternalName,
  188. },
  189. }
  190. if tweak != nil {
  191. tweak(svc)
  192. }
  193. result, err := j.Client.CoreV1().Services(namespace).Create(svc)
  194. if err != nil {
  195. Failf("Failed to create ExternalName Service %q: %v", svc.Name, err)
  196. }
  197. return result
  198. }
  199. // CreateServiceWithServicePort creates a new Service with ServicePort.
  200. func (j *ServiceTestJig) CreateServiceWithServicePort(labels map[string]string, namespace string, ports []v1.ServicePort) (*v1.Service, error) {
  201. service := &v1.Service{
  202. ObjectMeta: metav1.ObjectMeta{
  203. Name: j.Name,
  204. },
  205. Spec: v1.ServiceSpec{
  206. Selector: labels,
  207. Ports: ports,
  208. },
  209. }
  210. return j.Client.CoreV1().Services(namespace).Create(service)
  211. }
  212. // ChangeServiceType updates the given service's ServiceType to the given newType.
  213. func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType v1.ServiceType, timeout time.Duration) {
  214. ingressIP := ""
  215. svc := j.UpdateServiceOrFail(namespace, name, func(s *v1.Service) {
  216. for _, ing := range s.Status.LoadBalancer.Ingress {
  217. if ing.IP != "" {
  218. ingressIP = ing.IP
  219. }
  220. }
  221. s.Spec.Type = newType
  222. s.Spec.Ports[0].NodePort = 0
  223. })
  224. if ingressIP != "" {
  225. j.WaitForLoadBalancerDestroyOrFail(namespace, svc.Name, ingressIP, int(svc.Spec.Ports[0].Port), timeout)
  226. }
  227. }
  228. // CreateOnlyLocalNodePortService creates a NodePort service with
  229. // ExternalTrafficPolicy set to Local and sanity checks its nodePort.
  230. // If createPod is true, it also creates an RC with 1 replica of
  231. // the standard netexec container used everywhere in this test.
  232. func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *v1.Service {
  233. ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and ExternalTrafficPolicy=Local")
  234. svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
  235. svc.Spec.Type = v1.ServiceTypeNodePort
  236. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  237. svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 80}}
  238. })
  239. if createPod {
  240. ginkgo.By("creating a pod to be part of the service " + serviceName)
  241. j.RunOrFail(namespace, nil)
  242. }
  243. j.SanityCheckService(svc, v1.ServiceTypeNodePort)
  244. return svc
  245. }
  246. // CreateOnlyLocalLoadBalancerService creates a loadbalancer service with
  247. // ExternalTrafficPolicy set to Local and waits for it to acquire an ingress IP.
  248. // If createPod is true, it also creates an RC with 1 replica of
  249. // the standard netexec container used everywhere in this test.
  250. func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool,
  251. tweak func(svc *v1.Service)) *v1.Service {
  252. ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and ExternalTrafficPolicy=Local")
  253. svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
  254. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  255. // We need to turn affinity off for our LB distribution tests
  256. svc.Spec.SessionAffinity = v1.ServiceAffinityNone
  257. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  258. if tweak != nil {
  259. tweak(svc)
  260. }
  261. })
  262. if createPod {
  263. ginkgo.By("creating a pod to be part of the service " + serviceName)
  264. j.RunOrFail(namespace, nil)
  265. }
  266. ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
  267. svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
  268. j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
  269. return svc
  270. }
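// Illustrative usage (a sketch, not part of the original file): ExternalTrafficPolicy=Local
// tests typically create the load balancer and then check which nodes actually host
// endpoints, assuming ns is a test namespace and j an initialized jig:
//
//	svc := j.CreateOnlyLocalLoadBalancerService(ns, j.Name, LoadBalancerCreateTimeoutDefault, true, nil)
//	ingress := GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
//	endpointNodes := j.GetEndpointNodes(svc) // map of node name -> external IPs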
  271. // CreateLoadBalancerService creates a loadbalancer service and waits
  272. // for it to acquire an ingress IP.
  273. func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string, timeout time.Duration, tweak func(svc *v1.Service)) *v1.Service {
  274. ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer")
  275. svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
  276. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  277. // We need to turn affinity off for our LB distribution tests
  278. svc.Spec.SessionAffinity = v1.ServiceAffinityNone
  279. if tweak != nil {
  280. tweak(svc)
  281. }
  282. })
  283. ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
  284. svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
  285. j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
  286. return svc
  287. }
  288. // GetNodeAddresses returns a list of addresses of the given addressType for the given node
  289. func GetNodeAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) {
  290. for j := range node.Status.Addresses {
  291. nodeAddress := &node.Status.Addresses[j]
  292. if nodeAddress.Type == addressType && nodeAddress.Address != "" {
  293. ips = append(ips, nodeAddress.Address)
  294. }
  295. }
  296. return
  297. }
  298. // CollectAddresses returns a list of addresses of the given addressType for the given list of nodes
  299. func CollectAddresses(nodes *v1.NodeList, addressType v1.NodeAddressType) []string {
  300. ips := []string{}
  301. for i := range nodes.Items {
  302. ips = append(ips, GetNodeAddresses(&nodes.Items[i], addressType)...)
  303. }
  304. return ips
  305. }
  306. // GetNodePublicIps returns a public IP list of nodes.
  307. func GetNodePublicIps(c clientset.Interface) ([]string, error) {
  308. nodes := GetReadySchedulableNodesOrDie(c)
  309. ips := CollectAddresses(nodes, v1.NodeExternalIP)
  310. if len(ips) == 0 {
  311. // If ExternalIP isn't set, assume the test programs can reach the InternalIP
  312. ips = CollectAddresses(nodes, v1.NodeInternalIP)
  313. }
  314. return ips, nil
  315. }
  316. // PickNodeIP picks one public node IP
  317. func PickNodeIP(c clientset.Interface) string {
  318. publicIps, err := GetNodePublicIps(c)
  319. ExpectNoError(err)
  320. if len(publicIps) == 0 {
  321. Failf("got unexpected number (%d) of public IPs", len(publicIps))
  322. }
  323. ip := publicIps[0]
  324. return ip
  325. }
  326. // PodNodePairs return PodNode pairs for all pods in a namespace
  327. func PodNodePairs(c clientset.Interface, ns string) ([]PodNode, error) {
  328. var result []PodNode
  329. podList, err := c.CoreV1().Pods(ns).List(metav1.ListOptions{})
  330. if err != nil {
  331. return result, err
  332. }
  333. for _, pod := range podList.Items {
  334. result = append(result, PodNode{
  335. Pod: pod.Name,
  336. Node: pod.Spec.NodeName,
  337. })
  338. }
  339. return result, nil
  340. }
  341. // GetEndpointNodes returns a map of nodenames:external-ip on which the
  342. // endpoints of the given Service are running.
  343. func (j *ServiceTestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
  344. nodes := j.GetNodes(MaxNodesForEndpointsTests)
  345. endpoints, err := j.Client.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
  346. if err != nil {
  347. Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
  348. }
  349. if len(endpoints.Subsets) == 0 {
  350. Failf("Endpoint has no subsets, cannot determine node addresses.")
  351. }
  352. epNodes := sets.NewString()
  353. for _, ss := range endpoints.Subsets {
  354. for _, e := range ss.Addresses {
  355. if e.NodeName != nil {
  356. epNodes.Insert(*e.NodeName)
  357. }
  358. }
  359. }
  360. nodeMap := map[string][]string{}
  361. for _, n := range nodes.Items {
  362. if epNodes.Has(n.Name) {
  363. nodeMap[n.Name] = GetNodeAddresses(&n, v1.NodeExternalIP)
  364. }
  365. }
  366. return nodeMap
  367. }
  368. // GetNodes returns the first maxNodesForTest nodes. Useful in large clusters
  369. // where we don't want to, e.g., create an endpoint per node.
  370. func (j *ServiceTestJig) GetNodes(maxNodesForTest int) (nodes *v1.NodeList) {
  371. nodes = GetReadySchedulableNodesOrDie(j.Client)
  372. if len(nodes.Items) <= maxNodesForTest {
  373. maxNodesForTest = len(nodes.Items)
  374. }
  375. nodes.Items = nodes.Items[:maxNodesForTest]
  376. return nodes
  377. }
  378. // GetNodesNames returns a list of names of the first maxNodesForTest nodes
  379. func (j *ServiceTestJig) GetNodesNames(maxNodesForTest int) []string {
  380. nodes := j.GetNodes(maxNodesForTest)
  381. nodesNames := []string{}
  382. for _, node := range nodes.Items {
  383. nodesNames = append(nodesNames, node.Name)
  384. }
  385. return nodesNames
  386. }
  387. // WaitForEndpointOnNode waits for a service endpoint on the given node.
  388. func (j *ServiceTestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string) {
  389. err := wait.PollImmediate(Poll, LoadBalancerCreateTimeoutDefault, func() (bool, error) {
  390. endpoints, err := j.Client.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
  391. if err != nil {
  392. e2elog.Logf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
  393. return false, nil
  394. }
  395. if len(endpoints.Subsets) == 0 {
  396. e2elog.Logf("Expect endpoints with subsets, got none.")
  397. return false, nil
  398. }
  399. // TODO: Handle multiple endpoints
  400. if len(endpoints.Subsets[0].Addresses) == 0 {
  401. e2elog.Logf("Expected Ready endpoints - found none")
  402. return false, nil
  403. }
  404. epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
  405. e2elog.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, epHostName)
  406. if epHostName != nodeName {
  407. e2elog.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
  408. return false, nil
  409. }
  410. return true, nil
  411. })
  412. ExpectNoError(err)
  413. }
  414. // SanityCheckService performs sanity checks on the given service
  415. func (j *ServiceTestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) {
  416. if svc.Spec.Type != svcType {
  417. Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
  418. }
  419. if svcType != v1.ServiceTypeExternalName {
  420. if svc.Spec.ExternalName != "" {
  421. Failf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
  422. }
  423. if svc.Spec.ClusterIP != api.ClusterIPNone && svc.Spec.ClusterIP == "" {
  424. Failf("didn't get ClusterIP for non-ExternamName service")
  425. }
  426. } else {
  427. if svc.Spec.ClusterIP != "" {
  428. Failf("unexpected Spec.ClusterIP (%s) for ExternamName service, expected empty", svc.Spec.ClusterIP)
  429. }
  430. }
  431. expectNodePorts := false
  432. if svcType != v1.ServiceTypeClusterIP && svcType != v1.ServiceTypeExternalName {
  433. expectNodePorts = true
  434. }
  435. for i, port := range svc.Spec.Ports {
  436. hasNodePort := (port.NodePort != 0)
  437. if hasNodePort != expectNodePorts {
  438. Failf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
  439. }
  440. if hasNodePort {
  441. if !ServiceNodePortRange.Contains(int(port.NodePort)) {
  442. Failf("out-of-range nodePort (%d) for service", port.NodePort)
  443. }
  444. }
  445. }
  446. expectIngress := false
  447. if svcType == v1.ServiceTypeLoadBalancer {
  448. expectIngress = true
  449. }
  450. hasIngress := len(svc.Status.LoadBalancer.Ingress) != 0
  451. if hasIngress != expectIngress {
  452. Failf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress))
  453. }
  454. if hasIngress {
  455. for i, ing := range svc.Status.LoadBalancer.Ingress {
  456. if ing.IP == "" && ing.Hostname == "" {
  457. Failf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
  458. }
  459. }
  460. }
  461. }
  462. // UpdateService fetches a service, calls the update function on it, and
  463. // then attempts to send the updated service. It tries up to 3 times in the
  464. // face of timeouts and conflicts.
  465. func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*v1.Service)) (*v1.Service, error) {
  466. for i := 0; i < 3; i++ {
  467. service, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
  468. if err != nil {
  469. return nil, fmt.Errorf("failed to get Service %q: %v", name, err)
  470. }
  471. update(service)
  472. service, err = j.Client.CoreV1().Services(namespace).Update(service)
  473. if err == nil {
  474. return service, nil
  475. }
  476. if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
  477. return nil, fmt.Errorf("failed to update Service %q: %v", name, err)
  478. }
  479. }
  480. return nil, fmt.Errorf("too many retries updating Service %q", name)
  481. }
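// Illustrative usage (a sketch, not part of the original file): because UpdateService
// re-fetches the object before every attempt, the mutation should be written against
// the freshly read Service, e.g. (ns is assumed to be the service's namespace):
//
//	svc, err := j.UpdateService(ns, j.Name, func(s *v1.Service) {
//		s.Spec.SessionAffinity = v1.ServiceAffinityClientIP
//	})
//	if err != nil {
//		Failf("failed to enable session affinity: %v", err)
//	}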
  482. // UpdateServiceOrFail fetches a service, calls the update function on it, and
  483. // then attempts to send the updated service. It tries up to 3 times in the
  484. // face of timeouts and conflicts.
  485. func (j *ServiceTestJig) UpdateServiceOrFail(namespace, name string, update func(*v1.Service)) *v1.Service {
  486. svc, err := j.UpdateService(namespace, name, update)
  487. if err != nil {
  488. Failf(err.Error())
  489. }
  490. return svc
  491. }
  492. // WaitForNewIngressIPOrFail waits for the given service to get a new ingress IP, or fails after the given timeout
  493. func (j *ServiceTestJig) WaitForNewIngressIPOrFail(namespace, name, existingIP string, timeout time.Duration) *v1.Service {
  494. e2elog.Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, name)
  495. service := j.waitForConditionOrFail(namespace, name, timeout, "have a new ingress IP", func(svc *v1.Service) bool {
  496. if len(svc.Status.LoadBalancer.Ingress) == 0 {
  497. return false
  498. }
  499. ip := svc.Status.LoadBalancer.Ingress[0].IP
  500. if ip == "" || ip == existingIP {
  501. return false
  502. }
  503. return true
  504. })
  505. return service
  506. }
  507. // ChangeServiceNodePortOrFail changes node ports of the given service.
  508. func (j *ServiceTestJig) ChangeServiceNodePortOrFail(namespace, name string, initial int) *v1.Service {
  509. var err error
  510. var service *v1.Service
  511. for i := 1; i < ServiceNodePortRange.Size; i++ {
  512. offs1 := initial - ServiceNodePortRange.Base
  513. offs2 := (offs1 + i) % ServiceNodePortRange.Size
  514. newPort := ServiceNodePortRange.Base + offs2
  515. service, err = j.UpdateService(namespace, name, func(s *v1.Service) {
  516. s.Spec.Ports[0].NodePort = int32(newPort)
  517. })
  518. if err != nil && strings.Contains(err.Error(), portallocator.ErrAllocated.Error()) {
  519. e2elog.Logf("tried nodePort %d, but it is in use, will try another", newPort)
  520. continue
  521. }
  522. // Otherwise err was nil or err was a real error
  523. break
  524. }
  525. if err != nil {
  526. Failf("Could not change the nodePort: %v", err)
  527. }
  528. return service
  529. }
  530. // WaitForLoadBalancerOrFail waits for the given service to have a LoadBalancer, or fails after the given timeout
  531. func (j *ServiceTestJig) WaitForLoadBalancerOrFail(namespace, name string, timeout time.Duration) *v1.Service {
  532. e2elog.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, name)
  533. service := j.waitForConditionOrFail(namespace, name, timeout, "have a load balancer", func(svc *v1.Service) bool {
  534. return len(svc.Status.LoadBalancer.Ingress) > 0
  535. })
  536. return service
  537. }
  538. // WaitForLoadBalancerDestroyOrFail waits for the given service's LoadBalancer to be destroyed, or fails after the given timeout
  539. func (j *ServiceTestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip string, port int, timeout time.Duration) *v1.Service {
  540. // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
  541. defer func() {
  542. if err := EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil {
  543. e2elog.Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
  544. }
  545. }()
  546. e2elog.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, name)
  547. service := j.waitForConditionOrFail(namespace, name, timeout, "have no load balancer", func(svc *v1.Service) bool {
  548. return len(svc.Status.LoadBalancer.Ingress) == 0
  549. })
  550. return service
  551. }
  552. func (j *ServiceTestJig) waitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1.Service) bool) *v1.Service {
  553. var service *v1.Service
  554. pollFunc := func() (bool, error) {
  555. svc, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
  556. if err != nil {
  557. return false, err
  558. }
  559. if conditionFn(svc) {
  560. service = svc
  561. return true, nil
  562. }
  563. return false, nil
  564. }
  565. if err := wait.PollImmediate(Poll, timeout, pollFunc); err != nil {
  566. Failf("Timed out waiting for service %q to %s", name, message)
  567. }
  568. return service
  569. }
  570. // newRCTemplate returns the default v1.ReplicationController object for
  571. // this jig, but does not actually create the RC. The default RC has the same
  572. // name as the jig and runs the "netexec" container.
  573. func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationController {
  574. var replicas int32 = 1
  575. var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
  576. rc := &v1.ReplicationController{
  577. ObjectMeta: metav1.ObjectMeta{
  578. Namespace: namespace,
  579. Name: j.Name,
  580. Labels: j.Labels,
  581. },
  582. Spec: v1.ReplicationControllerSpec{
  583. Replicas: &replicas,
  584. Selector: j.Labels,
  585. Template: &v1.PodTemplateSpec{
  586. ObjectMeta: metav1.ObjectMeta{
  587. Labels: j.Labels,
  588. },
  589. Spec: v1.PodSpec{
  590. Containers: []v1.Container{
  591. {
  592. Name: "netexec",
  593. Image: imageutils.GetE2EImage(imageutils.Netexec),
  594. Args: []string{"--http-port=80", "--udp-port=80"},
  595. ReadinessProbe: &v1.Probe{
  596. PeriodSeconds: 3,
  597. Handler: v1.Handler{
  598. HTTPGet: &v1.HTTPGetAction{
  599. Port: intstr.FromInt(80),
  600. Path: "/hostName",
  601. },
  602. },
  603. },
  604. },
  605. },
  606. TerminationGracePeriodSeconds: &grace,
  607. },
  608. },
  609. },
  610. }
  611. return rc
  612. }
  613. // AddRCAntiAffinity adds AntiAffinity to the given ReplicationController.
  614. func (j *ServiceTestJig) AddRCAntiAffinity(rc *v1.ReplicationController) {
  615. var replicas int32 = 2
  616. rc.Spec.Replicas = &replicas
  617. if rc.Spec.Template.Spec.Affinity == nil {
  618. rc.Spec.Template.Spec.Affinity = &v1.Affinity{}
  619. }
  620. if rc.Spec.Template.Spec.Affinity.PodAntiAffinity == nil {
  621. rc.Spec.Template.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{}
  622. }
  623. rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
  624. rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
  625. v1.PodAffinityTerm{
  626. LabelSelector: &metav1.LabelSelector{MatchLabels: j.Labels},
  627. Namespaces: nil,
  628. TopologyKey: "kubernetes.io/hostname",
  629. })
  630. }
  631. // CreatePDBOrFail returns a PodDisruptionBudget for the given ReplicationController, or fails if a PodDisruptionBudget isn't ready
  632. func (j *ServiceTestJig) CreatePDBOrFail(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
  633. pdb := j.newPDBTemplate(namespace, rc)
  634. newPdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Create(pdb)
  635. if err != nil {
  636. Failf("Failed to create PDB %q %v", pdb.Name, err)
  637. }
  638. if err := j.waitForPdbReady(namespace); err != nil {
  639. Failf("Failed waiting for PDB to be ready: %v", err)
  640. }
  641. return newPdb
  642. }
  643. // newPDBTemplate returns the default policyv1beta1.PodDisruptionBudget object for
  644. // this jig, but does not actually create the PDB. The default PDB specifies a
  645. // MinAvailable of N-1 and matches the pods created by the RC.
  646. func (j *ServiceTestJig) newPDBTemplate(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
  647. minAvailable := intstr.FromInt(int(*rc.Spec.Replicas) - 1)
  648. pdb := &policyv1beta1.PodDisruptionBudget{
  649. ObjectMeta: metav1.ObjectMeta{
  650. Namespace: namespace,
  651. Name: j.Name,
  652. Labels: j.Labels,
  653. },
  654. Spec: policyv1beta1.PodDisruptionBudgetSpec{
  655. MinAvailable: &minAvailable,
  656. Selector: &metav1.LabelSelector{MatchLabels: j.Labels},
  657. },
  658. }
  659. return pdb
  660. }
  661. // RunOrFail creates a ReplicationController and Pod(s) and waits for the
  662. // Pod(s) to be running. Callers can provide a function to tweak the RC object
  663. // before it is created.
  664. func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.ReplicationController)) *v1.ReplicationController {
  665. rc := j.newRCTemplate(namespace)
  666. if tweak != nil {
  667. tweak(rc)
  668. }
  669. result, err := j.Client.CoreV1().ReplicationControllers(namespace).Create(rc)
  670. if err != nil {
  671. Failf("Failed to create RC %q: %v", rc.Name, err)
  672. }
  673. pods, err := j.waitForPodsCreated(namespace, int(*(rc.Spec.Replicas)))
  674. if err != nil {
  675. Failf("Failed to create pods: %v", err)
  676. }
  677. if err := j.waitForPodsReady(namespace, pods); err != nil {
  678. Failf("Failed waiting for pods to be running: %v", err)
  679. }
  680. return result
  681. }
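// Illustrative usage (a sketch, not part of the original file): disruption-related
// tests typically spread two replicas across nodes and guard them with a PDB,
// assuming ns is a test namespace:
//
//	rc := j.RunOrFail(ns, j.AddRCAntiAffinity) // bumps replicas to 2 and adds required pod anti-affinity
//	pdb := j.CreatePDBOrFail(ns, rc)           // MinAvailable = replicas-1 = 1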
  682. // Scale scales the RC to the given number of replicas and waits for the pods to be ready.
  683. func (j *ServiceTestJig) Scale(namespace string, replicas int) {
  684. rc := j.Name
  685. scale, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rc, metav1.GetOptions{})
  686. if err != nil {
  687. Failf("Failed to get scale for RC %q: %v", rc, err)
  688. }
  689. scale.Spec.Replicas = int32(replicas)
  690. _, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rc, scale)
  691. if err != nil {
  692. Failf("Failed to scale RC %q: %v", rc, err)
  693. }
  694. pods, err := j.waitForPodsCreated(namespace, replicas)
  695. if err != nil {
  696. Failf("Failed waiting for pods: %v", err)
  697. }
  698. if err := j.waitForPodsReady(namespace, pods); err != nil {
  699. Failf("Failed waiting for pods to be running: %v", err)
  700. }
  701. }
  702. func (j *ServiceTestJig) waitForPdbReady(namespace string) error {
  703. timeout := 2 * time.Minute
  704. for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
  705. pdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Get(j.Name, metav1.GetOptions{})
  706. if err != nil {
  707. return err
  708. }
  709. if pdb.Status.PodDisruptionsAllowed > 0 {
  710. return nil
  711. }
  712. }
  713. return fmt.Errorf("timeout waiting for PDB %q to be ready", j.Name)
  714. }
  715. func (j *ServiceTestJig) waitForPodsCreated(namespace string, replicas int) ([]string, error) {
  716. timeout := 2 * time.Minute
  717. // List the pods, making sure we observe all the replicas.
  718. label := labels.SelectorFromSet(labels.Set(j.Labels))
  719. e2elog.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
  720. for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
  721. options := metav1.ListOptions{LabelSelector: label.String()}
  722. pods, err := j.Client.CoreV1().Pods(namespace).List(options)
  723. if err != nil {
  724. return nil, err
  725. }
  726. found := []string{}
  727. for _, pod := range pods.Items {
  728. if pod.DeletionTimestamp != nil {
  729. continue
  730. }
  731. found = append(found, pod.Name)
  732. }
  733. if len(found) == replicas {
  734. e2elog.Logf("Found all %d pods", replicas)
  735. return found, nil
  736. }
  737. e2elog.Logf("Found %d/%d pods - will retry", len(found), replicas)
  738. }
  739. return nil, fmt.Errorf("timeout waiting for %d pods to be created", replicas)
  740. }
  741. func (j *ServiceTestJig) waitForPodsReady(namespace string, pods []string) error {
  742. timeout := 2 * time.Minute
  743. if !CheckPodsRunningReady(j.Client, namespace, pods, timeout) {
  744. return fmt.Errorf("timeout waiting for %d pods to be ready", len(pods))
  745. }
  746. return nil
  747. }
  748. // newNetexecPodSpec returns the pod spec of netexec pod
  749. func newNetexecPodSpec(podName string, httpPort, udpPort int32, hostNetwork bool) *v1.Pod {
  750. pod := &v1.Pod{
  751. ObjectMeta: metav1.ObjectMeta{
  752. Name: podName,
  753. },
  754. Spec: v1.PodSpec{
  755. Containers: []v1.Container{
  756. {
  757. Name: "netexec",
  758. Image: netexecImageName,
  759. Command: []string{
  760. "/netexec",
  761. fmt.Sprintf("--http-port=%d", httpPort),
  762. fmt.Sprintf("--udp-port=%d", udpPort),
  763. },
  764. Ports: []v1.ContainerPort{
  765. {
  766. Name: "http",
  767. ContainerPort: httpPort,
  768. },
  769. {
  770. Name: "udp",
  771. ContainerPort: udpPort,
  772. },
  773. },
  774. },
  775. },
  776. HostNetwork: hostNetwork,
  777. },
  778. }
  779. return pod
  780. }
  781. // LaunchNetexecPodOnNode launches a netexec pod on the given node.
  782. func (j *ServiceTestJig) LaunchNetexecPodOnNode(f *Framework, nodeName, podName string, httpPort, udpPort int32, hostNetwork bool) {
  783. e2elog.Logf("Creating netexec pod %q on node %v in namespace %q", podName, nodeName, f.Namespace.Name)
  784. pod := newNetexecPodSpec(podName, httpPort, udpPort, hostNetwork)
  785. pod.Spec.NodeName = nodeName
  786. pod.ObjectMeta.Labels = j.Labels
  787. podClient := f.ClientSet.CoreV1().Pods(f.Namespace.Name)
  788. _, err := podClient.Create(pod)
  789. ExpectNoError(err)
  790. ExpectNoError(f.WaitForPodRunning(podName))
  791. e2elog.Logf("Netexec pod %q in namespace %q running", pod.Name, f.Namespace.Name)
  792. }
  793. // newEchoServerPodSpec returns the pod spec of echo server pod
  794. func newEchoServerPodSpec(podName string) *v1.Pod {
  795. port := 8080
  796. pod := &v1.Pod{
  797. ObjectMeta: metav1.ObjectMeta{
  798. Name: podName,
  799. },
  800. Spec: v1.PodSpec{
  801. Containers: []v1.Container{
  802. {
  803. Name: "echoserver",
  804. Image: imageutils.GetE2EImage(imageutils.EchoServer),
  805. Ports: []v1.ContainerPort{{ContainerPort: int32(port)}},
  806. },
  807. },
  808. RestartPolicy: v1.RestartPolicyNever,
  809. },
  810. }
  811. return pod
  812. }
  813. // LaunchEchoserverPodOnNode launches a pod serving HTTP on port 8080 to act
  814. // as the target for the source IP preservation test. The client's source IP is
  815. // echoed back by the web server.
  816. func (j *ServiceTestJig) LaunchEchoserverPodOnNode(f *Framework, nodeName, podName string) {
  817. e2elog.Logf("Creating echo server pod %q in namespace %q", podName, f.Namespace.Name)
  818. pod := newEchoServerPodSpec(podName)
  819. pod.Spec.NodeName = nodeName
  820. pod.ObjectMeta.Labels = j.Labels
  821. podClient := f.ClientSet.CoreV1().Pods(f.Namespace.Name)
  822. _, err := podClient.Create(pod)
  823. ExpectNoError(err)
  824. ExpectNoError(f.WaitForPodRunning(podName))
  825. e2elog.Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name)
  826. }
  827. // TestReachableHTTP tests that the given host serves HTTP on the given port.
  828. func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.Duration) {
  829. j.TestReachableHTTPWithRetriableErrorCodes(host, port, []int{}, timeout)
  830. }
  831. // TestReachableHTTPWithRetriableErrorCodes tests that the given host serves HTTP on the given port with the given retriableErrCodes.
  832. func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableErrCodes []int, timeout time.Duration) {
  833. pollfn := func() (bool, error) {
  834. result := PokeHTTP(host, port, "/echo?msg=hello",
  835. &HTTPPokeParams{
  836. BodyContains: "hello",
  837. RetriableCodes: retriableErrCodes,
  838. })
  839. if result.Status == HTTPSuccess {
  840. return true, nil
  841. }
  842. return false, nil // caller can retry
  843. }
  844. if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
  845. if err == wait.ErrWaitTimeout {
  846. Failf("Could not reach HTTP service through %v:%v after %v", host, port, timeout)
  847. } else {
  848. Failf("Failed to reach HTTP service through %v:%v: %v", host, port, err)
  849. }
  850. }
  851. }
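// Illustrative usage (a sketch, not part of the original file): NodePort reachability
// is usually asserted against a node's public IP with the kube-proxy lag allowance;
// nodeIP (e.g. from PickNodeIP) and nodePort (from the Service spec) are assumed:
//
//	j.TestReachableHTTP(nodeIP, nodePort, KubeProxyLagTimeout)
//	j.TestNotReachableHTTP(nodeIP, oldNodePort, KubeProxyLagTimeout)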
  852. // TestNotReachableHTTP tests that an HTTP request doesn't connect to the given host and port.
  853. func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) {
  854. pollfn := func() (bool, error) {
  855. result := PokeHTTP(host, port, "/", nil)
  856. if result.Code == 0 {
  857. return true, nil
  858. }
  859. return false, nil // caller can retry
  860. }
  861. if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
  862. Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err)
  863. }
  864. }
  865. // TestRejectedHTTP tests that the given host rejects an HTTP request on the given port.
  866. func (j *ServiceTestJig) TestRejectedHTTP(host string, port int, timeout time.Duration) {
  867. pollfn := func() (bool, error) {
  868. result := PokeHTTP(host, port, "/", nil)
  869. if result.Status == HTTPRefused {
  870. return true, nil
  871. }
  872. return false, nil // caller can retry
  873. }
  874. if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
  875. Failf("HTTP service %v:%v not rejected: %v", host, port, err)
  876. }
  877. }
  878. // TestReachableUDP tests that the given host serves UDP on the given port.
  879. func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
  880. pollfn := func() (bool, error) {
  881. result := PokeUDP(host, port, "echo hello", &UDPPokeParams{
  882. Timeout: 3 * time.Second,
  883. Response: "hello",
  884. })
  885. if result.Status == UDPSuccess {
  886. return true, nil
  887. }
  888. return false, nil // caller can retry
  889. }
  890. if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
  891. Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
  892. }
  893. }
  894. // TestNotReachableUDP tests that the given host doesn't serve UDP on the given port.
  895. func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) {
  896. pollfn := func() (bool, error) {
  897. result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
  898. if result.Status != UDPSuccess && result.Status != UDPError {
  899. return true, nil
  900. }
  901. return false, nil // caller can retry
  902. }
  903. if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
  904. Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err)
  905. }
  906. }
  907. // TestRejectedUDP tests that the given host rejects a UDP request on the given port.
  908. func (j *ServiceTestJig) TestRejectedUDP(host string, port int, timeout time.Duration) {
  909. pollfn := func() (bool, error) {
  910. result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
  911. if result.Status == UDPRefused {
  912. return true, nil
  913. }
  914. return false, nil // caller can retry
  915. }
  916. if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
  917. Failf("UDP service %v:%v not rejected: %v", host, port, err)
  918. }
  919. }
  920. // GetHTTPContent returns the content of the given url by HTTP.
  921. func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
  922. var body bytes.Buffer
  923. if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
  924. result := PokeHTTP(host, port, url, nil)
  925. if result.Status == HTTPSuccess {
  926. body.Write(result.Body)
  927. return true, nil
  928. }
  929. return false, nil
  930. }); pollErr != nil {
  931. Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
  932. }
  933. return body
  934. }
  935. func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
  936. ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
  937. url := fmt.Sprintf("http://%s%s", ipPort, request)
  938. if ip == "" || port == 0 {
  939. Failf("Got empty IP for reachability check (%s)", url)
  940. return false, fmt.Errorf("invalid input ip or port")
  941. }
  942. e2elog.Logf("Testing HTTP health check on %v", url)
  943. resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second)
  944. if err != nil {
  945. e2elog.Logf("Got error testing for reachability of %s: %v", url, err)
  946. return false, err
  947. }
  948. defer resp.Body.Close()
  953. // HealthCheck responder returns 503 for no local endpoints
  954. if resp.StatusCode == 503 {
  955. return false, nil
  956. }
  957. // HealthCheck responder returns 200 for non-zero local endpoints
  958. if resp.StatusCode == 200 {
  959. return true, nil
  960. }
  961. return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
  962. }
  963. // TestHTTPHealthCheckNodePort tests an HTTP connection by the given request to the given host and port.
  964. func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error {
  965. count := 0
  966. condition := func() (bool, error) {
  967. success, _ := testHTTPHealthCheckNodePort(host, port, request)
  968. if success && expectSucceed ||
  969. !success && !expectSucceed {
  970. count++
  971. }
  972. if count >= threshold {
  973. return true, nil
  974. }
  975. return false, nil
  976. }
  977. if err := wait.PollImmediate(time.Second, timeout, condition); err != nil {
  978. return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v:%v, got %d", threshold, expectSucceed, host, port, count)
  979. }
  980. return nil
  981. }
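// Illustrative usage (a sketch, not part of the original file): for an
// ExternalTrafficPolicy=Local service the health check node port is expected to
// return 200 only on nodes that host an endpoint; nodeIPWithEndpoint, the "/healthz"
// request path, and the threshold value are assumptions here:
//
//	hcPort := int(svc.Spec.HealthCheckNodePort)
//	if err := j.TestHTTPHealthCheckNodePort(nodeIPWithEndpoint, hcPort, "/healthz", KubeProxyEndpointLagTimeout, true, 5); err != nil {
//		Failf("health check did not pass on endpoint node: %v", err)
//	}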
  982. // ServiceTestFixture is a simple helper class to avoid too much boilerplate in tests
  983. type ServiceTestFixture struct {
  984. ServiceName string
  985. Namespace string
  986. Client clientset.Interface
  987. TestID string
  988. Labels map[string]string
  989. rcs map[string]bool
  990. services map[string]bool
  991. Name string
  992. Image string
  993. }
  994. // NewServerTest creates a new ServiceTestFixture for the tests.
  995. func NewServerTest(client clientset.Interface, namespace string, serviceName string) *ServiceTestFixture {
  996. t := &ServiceTestFixture{}
  997. t.Client = client
  998. t.Namespace = namespace
  999. t.ServiceName = serviceName
  1000. t.TestID = t.ServiceName + "-" + string(uuid.NewUUID())
  1001. t.Labels = map[string]string{
  1002. "testid": t.TestID,
  1003. }
  1004. t.rcs = make(map[string]bool)
  1005. t.services = make(map[string]bool)
  1006. t.Name = "webserver"
  1007. t.Image = imageutils.GetE2EImage(imageutils.TestWebserver)
  1008. return t
  1009. }
  1010. // BuildServiceSpec builds default config for a service (which can then be changed)
  1011. func (t *ServiceTestFixture) BuildServiceSpec() *v1.Service {
  1012. service := &v1.Service{
  1013. ObjectMeta: metav1.ObjectMeta{
  1014. Name: t.ServiceName,
  1015. Namespace: t.Namespace,
  1016. },
  1017. Spec: v1.ServiceSpec{
  1018. Selector: t.Labels,
  1019. Ports: []v1.ServicePort{{
  1020. Port: 80,
  1021. TargetPort: intstr.FromInt(80),
  1022. }},
  1023. },
  1024. }
  1025. return service
  1026. }
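// Illustrative usage (a sketch, not part of the original file): the fixture is meant
// to own the objects it creates so that Cleanup can remove them; cs and ns are
// assumed to be a clientset.Interface and a test namespace:
//
//	t := NewServerTest(cs, ns, "my-service")
//	defer t.Cleanup()
//	svc := t.BuildServiceSpec()
//	svc.Spec.Type = v1.ServiceTypeNodePort
//	if _, err := t.CreateService(svc); err != nil {
//		Failf("failed to create service: %v", err)
//	}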
  1027. // CreateRC creates a replication controller and records it for cleanup.
  1028. func (t *ServiceTestFixture) CreateRC(rc *v1.ReplicationController) (*v1.ReplicationController, error) {
  1029. rc, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Create(rc)
  1030. if err == nil {
  1031. t.rcs[rc.Name] = true
  1032. }
  1033. return rc, err
  1034. }
  1035. // CreateService creates a service and records it for cleanup.
  1036. func (t *ServiceTestFixture) CreateService(service *v1.Service) (*v1.Service, error) {
  1037. result, err := t.Client.CoreV1().Services(t.Namespace).Create(service)
  1038. if err == nil {
  1039. t.services[service.Name] = true
  1040. }
  1041. return result, err
  1042. }
  1043. // DeleteService deletes a service and removes it from the cleanup list.
  1044. func (t *ServiceTestFixture) DeleteService(serviceName string) error {
  1045. err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil)
  1046. if err == nil {
  1047. delete(t.services, serviceName)
  1048. }
  1049. return err
  1050. }
  1051. // Cleanup cleans all ReplicationControllers and Services which this object holds.
  1052. func (t *ServiceTestFixture) Cleanup() []error {
  1053. var errs []error
  1054. for rcName := range t.rcs {
  1055. ginkgo.By("stopping RC " + rcName + " in namespace " + t.Namespace)
  1056. err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
  1057. // First, resize the RC to 0.
  1058. old, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Get(rcName, metav1.GetOptions{})
  1059. if err != nil {
  1060. if errors.IsNotFound(err) {
  1061. return nil
  1062. }
  1063. return err
  1064. }
  1065. x := int32(0)
  1066. old.Spec.Replicas = &x
  1067. if _, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Update(old); err != nil {
  1068. if errors.IsNotFound(err) {
  1069. return nil
  1070. }
  1071. return err
  1072. }
  1073. return nil
  1074. })
  1075. if err != nil {
  1076. errs = append(errs, err)
  1077. }
  1078. // TODO(mikedanese): Wait.
  1079. // Then, delete the RC altogether.
  1080. if err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Delete(rcName, nil); err != nil {
  1081. if !errors.IsNotFound(err) {
  1082. errs = append(errs, err)
  1083. }
  1084. }
  1085. }
  1086. for serviceName := range t.services {
  1087. ginkgo.By("deleting service " + serviceName + " in namespace " + t.Namespace)
  1088. err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil)
  1089. if err != nil {
  1090. if !errors.IsNotFound(err) {
  1091. errs = append(errs, err)
  1092. }
  1093. }
  1094. }
  1095. return errs
  1096. }
  1097. // GetIngressPoint returns a host on which ingress serves.
  1098. func GetIngressPoint(ing *v1.LoadBalancerIngress) string {
  1099. host := ing.IP
  1100. if host == "" {
  1101. host = ing.Hostname
  1102. }
  1103. return host
  1104. }
  1105. // UpdateService fetches a service, calls the update function on it,
  1106. // and then attempts to send the updated service. It tries up to 3
  1107. // times in the face of timeouts and conflicts.
  1108. func UpdateService(c clientset.Interface, namespace, serviceName string, update func(*v1.Service)) (*v1.Service, error) {
  1109. var service *v1.Service
  1110. var err error
  1111. for i := 0; i < 3; i++ {
  1112. service, err = c.CoreV1().Services(namespace).Get(serviceName, metav1.GetOptions{})
  1113. if err != nil {
  1114. return service, err
  1115. }
  1116. update(service)
  1117. service, err = c.CoreV1().Services(namespace).Update(service)
  1118. if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
  1119. return service, err
  1120. }
  1121. }
  1122. return service, err
  1123. }
  1124. // StartServeHostnameService creates a replication controller that serves its
  1125. // hostname and a service on top of it.
  1126. func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {
  1127. podNames := make([]string, replicas)
  1128. name := svc.ObjectMeta.Name
  1129. ginkgo.By("creating service " + name + " in namespace " + ns)
  1130. _, err := c.CoreV1().Services(ns).Create(svc)
  1131. if err != nil {
  1132. return podNames, "", err
  1133. }
  1134. var createdPods []*v1.Pod
  1135. maxContainerFailures := 0
  1136. config := testutils.RCConfig{
  1137. Client: c,
  1138. Image: ServeHostnameImage,
  1139. Name: name,
  1140. Namespace: ns,
  1141. PollInterval: 3 * time.Second,
  1142. Timeout: PodReadyBeforeTimeout,
  1143. Replicas: replicas,
  1144. CreatedPods: &createdPods,
  1145. MaxContainerFailures: &maxContainerFailures,
  1146. }
  1147. err = RunRC(config)
  1148. if err != nil {
  1149. return podNames, "", err
  1150. }
  1151. if len(createdPods) != replicas {
  1152. return podNames, "", fmt.Errorf("incorrect number of running pods: %v", len(createdPods))
  1153. }
  1154. for i := range createdPods {
  1155. podNames[i] = createdPods[i].ObjectMeta.Name
  1156. }
  1157. sort.StringSlice(podNames).Sort()
  1158. service, err := c.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
  1159. if err != nil {
  1160. return podNames, "", err
  1161. }
  1162. if service.Spec.ClusterIP == "" {
  1163. return podNames, "", fmt.Errorf("service IP is blank for %v", name)
  1164. }
  1165. serviceIP := service.Spec.ClusterIP
  1166. return podNames, serviceIP, nil
  1167. }
  1168. // StopServeHostnameService stops the given service.
  1169. func StopServeHostnameService(clientset clientset.Interface, ns, name string) error {
  1170. if err := DeleteRCAndWaitForGC(clientset, ns, name); err != nil {
  1171. return err
  1172. }
  1173. if err := clientset.CoreV1().Services(ns).Delete(name, nil); err != nil {
  1174. return err
  1175. }
  1176. return nil
  1177. }
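// Illustrative usage (a sketch, not part of the original file): a typical
// serve-hostname round trip, assuming cs is a clientset.Interface, svc a ClusterIP
// Service spec, ns a test namespace, and host an SSH-able node address:
//
//	podNames, serviceIP, err := StartServeHostnameService(cs, svc, ns, 3)
//	ExpectNoError(err)
//	defer StopServeHostnameService(cs, ns, svc.Name)
//	ExpectNoError(VerifyServeHostnameServiceUp(cs, ns, host, podNames, serviceIP, int(svc.Spec.Ports[0].Port)))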

// VerifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the
// given host and from within a pod. The host is expected to be an SSH-able node
// in the cluster. Each pod in the service is expected to echo its name. These
// names are compared with the given expectedPods list after a sort | uniq.
func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
	execPodName := CreateExecPodOrFail(c, ns, "execpod-", nil)
	defer func() {
		DeletePodOrFail(c, ns, execPodName)
	}()

	// Loop a bunch of times - the proxy is randomized, so we want a good
	// chance of hitting each backend at least once.
	buildCommand := func(wget string) string {
		serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
		return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s 2>&1 || true; echo; done",
			50*len(expectedPods), wget, serviceIPPort)
	}
	commands := []func() string{
		// verify service from node
		func() string {
			cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -")
			e2elog.Logf("Executing cmd %q on host %v", cmd, host)
			result, err := e2essh.SSH(cmd, host, TestContext.Provider)
			if err != nil || result.Code != 0 {
				e2essh.LogResult(result)
				e2elog.Logf("error while SSH-ing to node: %v", err)
			}
			return result.Stdout
		},
		// verify service from pod
		func() string {
			cmd := buildCommand("wget -q -T 1 -O -")
			e2elog.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPodName)
			// TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
			output, err := RunHostCmd(ns, execPodName, cmd)
			if err != nil {
				e2elog.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPodName, err, output)
			}
			return output
		},
	}

	expectedEndpoints := sets.NewString(expectedPods...)
	ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
	for _, cmdFunc := range commands {
		passed := false
		gotEndpoints := sets.NewString()

		// Retry cmdFunc for a while
		for start := time.Now(); time.Since(start) < KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
			for _, endpoint := range strings.Split(cmdFunc(), "\n") {
				trimmedEp := strings.TrimSpace(endpoint)
				if trimmedEp != "" {
					gotEndpoints.Insert(trimmedEp)
				}
			}
			// TODO: simply checking that the retrieved endpoints are a superset
			// of the expected ones lets us ignore intermittent network flakes that
			// result in output like "wget timed out", but these should be rare
			// and we need a better way to track how often they occur.
			if gotEndpoints.IsSuperset(expectedEndpoints) {
				if !gotEndpoints.Equal(expectedEndpoints) {
					e2elog.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
				}
				passed = true
				break
			}
			e2elog.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
		}

		if !passed {
			// Sort the lists so they're easier to visually diff.
			exp := expectedEndpoints.List()
			got := gotEndpoints.List()
			sort.StringSlice(exp).Sort()
			sort.StringSlice(got).Sort()
			return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
		}
	}
	return nil
}
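
// A sketch of a typical verification call, reusing the pod names and cluster IP
// returned by StartServeHostnameService. nodeIP stands for the address of an
// SSH-able node and port 80 matches the service sketched above; both are
// assumptions for illustration:
//
//	if err := VerifyServeHostnameServiceUp(c, ns, nodeIP, podNames, serviceIP, 80); err != nil {
//		Failf("service was not reachable from the node or the exec pod: %v", err)
//	}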

// VerifyServeHostnameServiceDown verifies that the given service isn't served.
func VerifyServeHostnameServiceDown(c clientset.Interface, host string, serviceIP string, servicePort int) error {
	ipPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
	// The current versions of curl included in CentOS and RHEL distros
	// misinterpret square brackets around IPv6 as globbing, so use the -g
	// argument to disable globbing to handle the IPv6 case.
	command := fmt.Sprintf(
		"curl -g -s --connect-timeout 2 http://%s && exit 99", ipPort)

	for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
		result, err := e2essh.SSH(command, host, TestContext.Provider)
		if err != nil {
			e2essh.LogResult(result)
			e2elog.Logf("error while SSH-ing to node: %v", err)
		}
		if result.Code != 99 {
			return nil
		}
		e2elog.Logf("service still alive - still waiting")
	}
	return fmt.Errorf("waiting for service to be down timed out")
}
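
// Note on the curl command above: "&& exit 99" makes a successful fetch surface
// as exit code 99, so any other exit code means the connection failed and the
// service is treated as down. A sketch of pairing it with the stop helper
// (nodeIP and port 80 are the same illustrative assumptions as above):
//
//	if err := StopServeHostnameService(c, ns, svc.Name); err != nil {
//		Failf("failed to stop service: %v", err)
//	}
//	if err := VerifyServeHostnameServiceDown(c, nodeIP, serviceIP, 80); err != nil {
//		Failf("service should no longer be reachable: %v", err)
//	}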

// CleanupServiceResources cleans up service Type=LoadBalancer resources.
func CleanupServiceResources(c clientset.Interface, loadBalancerName, region, zone string) {
	TestContext.CloudConfig.Provider.CleanupServiceResources(c, loadBalancerName, region, zone)
}

// DescribeSvc logs the output of kubectl describe svc for the given namespace
func DescribeSvc(ns string) {
	e2elog.Logf("\nOutput of kubectl describe svc:\n")
	desc, _ := RunKubectl(
		"describe", "svc", fmt.Sprintf("--namespace=%v", ns))
	e2elog.Logf(desc)
}

// CreateServiceSpec returns a Service object for testing.
func CreateServiceSpec(serviceName, externalName string, isHeadless bool, selector map[string]string) *v1.Service {
	headlessService := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name: serviceName,
		},
		Spec: v1.ServiceSpec{
			Selector: selector,
		},
	}
	if externalName != "" {
		headlessService.Spec.Type = v1.ServiceTypeExternalName
		headlessService.Spec.ExternalName = externalName
	} else {
		headlessService.Spec.Ports = []v1.ServicePort{
			{Port: 80, Name: "http", Protocol: v1.ProtocolTCP},
		}
	}
	if isHeadless {
		headlessService.Spec.ClusterIP = "None"
	}
	return headlessService
}
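
// CreateServiceSpec yields three shapes depending on its arguments; a short
// sketch of each (the service names and selectors are illustrative):
//
//	// Plain ClusterIP service exposing TCP port 80.
//	clusterIP := CreateServiceSpec("backend", "", false, map[string]string{"app": "backend"})
//
//	// Headless service: same port and selector, but ClusterIP pinned to "None".
//	headless := CreateServiceSpec("backend-headless", "", true, map[string]string{"app": "backend"})
//
//	// ExternalName service: no ports, just a DNS alias to the external name.
//	external := CreateServiceSpec("backend-ext", "backend.example.com", false, nil)
//	_, _, _ = clusterIP, headless, external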

// EnableAndDisableInternalLB returns two functions for enabling and disabling the internal load balancer
// setting for the supported cloud providers (currently GCE/GKE and Azure) and empty functions for others.
func EnableAndDisableInternalLB() (enable func(svc *v1.Service), disable func(svc *v1.Service)) {
	return TestContext.CloudConfig.Provider.EnableAndDisableInternalLB()
}

// GetServiceLoadBalancerCreationTimeout returns a timeout value for creating a load balancer of a service.
func GetServiceLoadBalancerCreationTimeout(cs clientset.Interface) time.Duration {
	if nodes := GetReadySchedulableNodesOrDie(cs); len(nodes.Items) > LargeClusterMinNodesNumber {
		return LoadBalancerCreateTimeoutLarge
	}
	return LoadBalancerCreateTimeoutDefault
}
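
// GetServiceLoadBalancerCreationTimeout only widens the timeout on clusters with
// more than LargeClusterMinNodesNumber ready nodes. A sketch of feeding it into
// a polling loop; waitForLoadBalancerIngress is a hypothetical helper standing
// in for whatever condition a test actually polls:
//
//	timeout := GetServiceLoadBalancerCreationTimeout(cs)
//	if err := wait.PollImmediate(Poll, timeout, func() (bool, error) {
//		return waitForLoadBalancerIngress(cs, ns, svcName)
//	}); err != nil {
//		Failf("load balancer was not provisioned within %v: %v", timeout, err)
//	}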

// affinityTracker tracks the destination of a request for the affinity tests.
type affinityTracker struct {
	hostTrace []string
}

// recordHost records which host served a response.
func (at *affinityTracker) recordHost(host string) {
	at.hostTrace = append(at.hostTrace, host)
	e2elog.Logf("Received response from host: %s", host)
}

// checkHostTrace checks whether the most recent count responses all came from
// the same host. fulfilled is true once at least count responses have been
// recorded; affinityHolds is true when they all share one host.
func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) {
	fulfilled = (len(at.hostTrace) >= count)
	if len(at.hostTrace) == 0 {
		return fulfilled, true
	}
	last := at.hostTrace[0:]
	if len(at.hostTrace)-count >= 0 {
		last = at.hostTrace[len(at.hostTrace)-count:]
	}
	host := at.hostTrace[len(at.hostTrace)-1]
	for _, h := range last {
		if h != host {
			return fulfilled, false
		}
	}
	return fulfilled, true
}
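
// A self-contained sketch of the tracker's semantics (host names are made up):
//
//	var at affinityTracker
//	at.recordHost("pod-a")
//	at.recordHost("pod-a")
//	at.recordHost("pod-b")
//	fulfilled, holds := at.checkHostTrace(3)
//	// fulfilled == true: three responses have been recorded.
//	// holds == false: the last three responses did not all come from one host.
//	_, _ = fulfilled, holds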

func checkAffinityFailed(tracker affinityTracker, err string) {
	e2elog.Logf("%v", tracker.hostTrace)
	Failf(err)
}

// CheckAffinity tests whether service session affinity works as expected.
// If affinity is expected to hold, it returns true once the same response has
// been observed AffinityConfirmCount times in a row. If affinity is not
// expected, it keeps polling until different responses have been observed.
// It returns false only on unexpected errors.
func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIP string, targetPort int, shouldHold bool) bool {
	targetIPPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
	cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIPPort)
	timeout := ServiceTestTimeout
	if execPod == nil {
		timeout = LoadBalancerPollTimeout
	}
	var tracker affinityTracker
	if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
		if execPod != nil {
			stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd)
			if err != nil {
				e2elog.Logf("Failed to get response from %s. Retry until timeout", targetIPPort)
				return false, nil
			}
			tracker.recordHost(stdout)
		} else {
			rawResponse := jig.GetHTTPContent(targetIP, targetPort, timeout, "")
			tracker.recordHost(rawResponse.String())
		}
		trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount)
		if !shouldHold && !affinityHolds {
			return true, nil
		}
		if shouldHold && trackerFulfilled && affinityHolds {
			return true, nil
		}
		return false, nil
	}); pollErr != nil {
		trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount)
		if pollErr != wait.ErrWaitTimeout {
			checkAffinityFailed(tracker, pollErr.Error())
			return false
		}
		if !trackerFulfilled {
			checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIPPort))
		}
		if shouldHold {
			checkAffinityFailed(tracker, "Affinity should hold but didn't.")
		} else {
			checkAffinityFailed(tracker, "Affinity shouldn't hold but did.")
		}
		return true
	}
	return true
}
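
// A sketch of how CheckAffinity is typically driven from a test: with ClientIP
// session affinity configured on the service, affinity is expected to hold; with
// affinity set to None it is expected to break. jig, execPod, svcIP, and
// servicePort are placeholders for values the surrounding test already has:
//
//	// Expect requests from execPod to keep landing on the same backend.
//	if !CheckAffinity(jig, execPod, svcIP, servicePort, true) {
//		Failf("affinity should hold for %s:%d but did not", svcIP, servicePort)
//	}
//
//	// Expect requests to spread across backends once affinity is disabled.
//	if !CheckAffinity(jig, execPod, svcIP, servicePort, false) {
//		Failf("affinity should not hold for %s:%d but did", svcIP, servicePort)
//	}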