ingress.go 16 KB


  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scale
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "sync"
  18. "time"
  19. apps "k8s.io/api/apps/v1"
  20. "k8s.io/api/core/v1"
  21. networkingv1beta1 "k8s.io/api/networking/v1beta1"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/util/intstr"
  24. clientset "k8s.io/client-go/kubernetes"
  25. "k8s.io/kubernetes/test/e2e/framework"
  26. "k8s.io/kubernetes/test/e2e/framework/ingress"
  27. "k8s.io/kubernetes/test/e2e/framework/providers/gce"
  28. )
  // Ingress counts targeted by the successive stages of the scale test.
  const (
  	numIngressesSmall      = 5
  	numIngressesMedium     = 20
  	numIngressesLarge      = 50
  	numIngressesExtraLarge = 99
  )

  // Names and sizing of the resources created by the scale test.
  const (
  	scaleTestIngressNamePrefix = "ing-scale"
  	scaleTestBackendName       = "echoheaders-scale"
  	scaleTestSecretName        = "tls-secret-scale"
  	scaleTestHostname          = "scale.ingress.com"
  	scaleTestNumBackends       = 10
  )

  // Polling interval and upper bounds on waiting.
  const (
  	scaleTestPollInterval = 15 * time.Second

  	// waitForIngressMaxTimeout is the longest we expect waiting for a
  	// single ingress to take.
  	waitForIngressMaxTimeout = 80 * time.Minute
  	ingressesCleanupTimeout  = 80 * time.Minute
  )
  45. var (
  46. scaleTestLabels = map[string]string{
  47. "app": scaleTestBackendName,
  48. }
  49. )
  50. // IngressScaleFramework defines the framework for ingress scale testing.
  51. type IngressScaleFramework struct {
  52. Clientset clientset.Interface
  53. Jig *ingress.TestJig
  54. GCEController *gce.IngressController
  55. CloudConfig framework.CloudConfig
  56. Logger ingress.TestLogger
  57. Namespace string
  58. EnableTLS bool
  59. NumIngressesTest []int
  60. OutputFile string
  61. ScaleTestDeploy *apps.Deployment
  62. ScaleTestSvcs []*v1.Service
  63. ScaleTestIngs []*networkingv1beta1.Ingress
  64. // BatchCreateLatencies stores all ingress creation latencies, in different
  65. // batches.
  66. BatchCreateLatencies [][]time.Duration
  67. // BatchDurations stores the total duration for each ingress batch creation.
  68. BatchDurations []time.Duration
  69. // StepCreateLatencies stores the single ingress creation latency, which happens
  70. // after each ingress batch creation is complete.
  71. StepCreateLatencies []time.Duration
  72. // StepCreateLatencies stores the single ingress update latency, which happens
  73. // after each ingress batch creation is complete.
  74. StepUpdateLatencies []time.Duration
  75. }
  76. // NewIngressScaleFramework returns a new framework for ingress scale testing.
  77. func NewIngressScaleFramework(cs clientset.Interface, ns string, cloudConfig framework.CloudConfig) *IngressScaleFramework {
  78. return &IngressScaleFramework{
  79. Namespace: ns,
  80. Clientset: cs,
  81. CloudConfig: cloudConfig,
  82. Logger: &ingress.E2ELogger{},
  83. EnableTLS: true,
  84. NumIngressesTest: []int{
  85. numIngressesSmall,
  86. numIngressesMedium,
  87. numIngressesLarge,
  88. numIngressesExtraLarge,
  89. },
  90. }
  91. }
  92. // PrepareScaleTest prepares framework for ingress scale testing.
  93. func (f *IngressScaleFramework) PrepareScaleTest() error {
  94. f.Logger.Infof("Initializing ingress test suite and gce controller...")
  95. f.Jig = ingress.NewIngressTestJig(f.Clientset)
  96. f.Jig.Logger = f.Logger
  97. f.Jig.PollInterval = scaleTestPollInterval
  98. f.GCEController = &gce.IngressController{
  99. Client: f.Clientset,
  100. Cloud: f.CloudConfig,
  101. }
  102. if err := f.GCEController.Init(); err != nil {
  103. return fmt.Errorf("Failed to initialize GCE controller: %v", err)
  104. }
  105. f.ScaleTestSvcs = []*v1.Service{}
  106. f.ScaleTestIngs = []*networkingv1beta1.Ingress{}
  107. return nil
  108. }
  109. // CleanupScaleTest cleans up framework for ingress scale testing.
  110. func (f *IngressScaleFramework) CleanupScaleTest() []error {
  111. var errs []error
  112. f.Logger.Infof("Cleaning up ingresses...")
  113. for _, ing := range f.ScaleTestIngs {
  114. if ing != nil {
  115. if err := f.Clientset.NetworkingV1beta1().Ingresses(ing.Namespace).Delete(ing.Name, nil); err != nil {
  116. errs = append(errs, fmt.Errorf("Error while deleting ingress %s/%s: %v", ing.Namespace, ing.Name, err))
  117. }
  118. }
  119. }
  120. f.Logger.Infof("Cleaning up services...")
  121. for _, svc := range f.ScaleTestSvcs {
  122. if svc != nil {
  123. if err := f.Clientset.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil); err != nil {
  124. errs = append(errs, fmt.Errorf("Error while deleting service %s/%s: %v", svc.Namespace, svc.Name, err))
  125. }
  126. }
  127. }
  128. if f.ScaleTestDeploy != nil {
  129. f.Logger.Infof("Cleaning up deployment %s...", f.ScaleTestDeploy.Name)
  130. if err := f.Clientset.AppsV1().Deployments(f.ScaleTestDeploy.Namespace).Delete(f.ScaleTestDeploy.Name, nil); err != nil {
  131. errs = append(errs, fmt.Errorf("Error while delting deployment %s/%s: %v", f.ScaleTestDeploy.Namespace, f.ScaleTestDeploy.Name, err))
  132. }
  133. }
  134. f.Logger.Infof("Cleaning up cloud resources...")
  135. if err := f.GCEController.CleanupIngressControllerWithTimeout(ingressesCleanupTimeout); err != nil {
  136. errs = append(errs, err)
  137. }
  138. return errs
  139. }
  140. // RunScaleTest runs ingress scale testing.
  141. func (f *IngressScaleFramework) RunScaleTest() []error {
  142. var errs []error
  143. testDeploy := generateScaleTestBackendDeploymentSpec(scaleTestNumBackends)
  144. f.Logger.Infof("Creating deployment %s...", testDeploy.Name)
  145. testDeploy, err := f.Jig.Client.AppsV1().Deployments(f.Namespace).Create(testDeploy)
  146. if err != nil {
  147. errs = append(errs, fmt.Errorf("Failed to create deployment %s: %v", testDeploy.Name, err))
  148. return errs
  149. }
  150. f.ScaleTestDeploy = testDeploy
  151. if f.EnableTLS {
  152. f.Logger.Infof("Ensuring TLS secret %s...", scaleTestSecretName)
  153. if err := f.Jig.PrepareTLSSecret(f.Namespace, scaleTestSecretName, scaleTestHostname); err != nil {
  154. errs = append(errs, fmt.Errorf("Failed to prepare TLS secret %s: %v", scaleTestSecretName, err))
  155. return errs
  156. }
  157. }
  158. // numIngsCreated keeps track of how many ingresses have been created.
  159. numIngsCreated := 0
  160. prepareIngsFunc := func(numIngsNeeded int) {
  161. var ingWg sync.WaitGroup
  162. numIngsToCreate := numIngsNeeded - numIngsCreated
  163. ingWg.Add(numIngsToCreate)
  164. svcQueue := make(chan *v1.Service, numIngsToCreate)
  165. ingQueue := make(chan *networkingv1beta1.Ingress, numIngsToCreate)
  166. errQueue := make(chan error, numIngsToCreate)
  167. latencyQueue := make(chan time.Duration, numIngsToCreate)
  168. start := time.Now()
  169. for ; numIngsCreated < numIngsNeeded; numIngsCreated++ {
  170. suffix := fmt.Sprintf("%d", numIngsCreated)
  171. go func() {
  172. defer ingWg.Done()
  173. start := time.Now()
  174. svcCreated, ingCreated, err := f.createScaleTestServiceIngress(suffix, f.EnableTLS)
  175. svcQueue <- svcCreated
  176. ingQueue <- ingCreated
  177. if err != nil {
  178. errQueue <- err
  179. return
  180. }
  181. f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
  182. if err := f.Jig.WaitForGivenIngressWithTimeout(ingCreated, false, waitForIngressMaxTimeout); err != nil {
  183. errQueue <- err
  184. return
  185. }
  186. elapsed := time.Since(start)
  187. f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
  188. latencyQueue <- elapsed
  189. }()
  190. }
  191. // Wait until all ingress creations are complete.
  192. f.Logger.Infof("Waiting for %d ingresses to come up...", numIngsToCreate)
  193. ingWg.Wait()
  194. close(svcQueue)
  195. close(ingQueue)
  196. close(errQueue)
  197. close(latencyQueue)
  198. elapsed := time.Since(start)
  199. for svc := range svcQueue {
  200. f.ScaleTestSvcs = append(f.ScaleTestSvcs, svc)
  201. }
  202. for ing := range ingQueue {
  203. f.ScaleTestIngs = append(f.ScaleTestIngs, ing)
  204. }
  205. var createLatencies []time.Duration
  206. for latency := range latencyQueue {
  207. createLatencies = append(createLatencies, latency)
  208. }
  209. f.BatchCreateLatencies = append(f.BatchCreateLatencies, createLatencies)
  210. if len(errQueue) != 0 {
  211. f.Logger.Errorf("Failed while creating services and ingresses, spent %v", elapsed)
  212. for err := range errQueue {
  213. errs = append(errs, err)
  214. }
  215. return
  216. }
  217. f.Logger.Infof("Spent %s for %d ingresses to come up", elapsed, numIngsToCreate)
  218. f.BatchDurations = append(f.BatchDurations, elapsed)
  219. }
  220. measureCreateUpdateFunc := func() {
  221. f.Logger.Infof("Create one more ingress and wait for it to come up")
  222. start := time.Now()
  223. svcCreated, ingCreated, err := f.createScaleTestServiceIngress(fmt.Sprintf("%d", numIngsCreated), f.EnableTLS)
  224. numIngsCreated = numIngsCreated + 1
  225. f.ScaleTestSvcs = append(f.ScaleTestSvcs, svcCreated)
  226. f.ScaleTestIngs = append(f.ScaleTestIngs, ingCreated)
  227. if err != nil {
  228. errs = append(errs, err)
  229. return
  230. }
  231. f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
  232. if err := f.Jig.WaitForGivenIngressWithTimeout(ingCreated, false, waitForIngressMaxTimeout); err != nil {
  233. errs = append(errs, err)
  234. return
  235. }
  236. elapsed := time.Since(start)
  237. f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
  238. f.StepCreateLatencies = append(f.StepCreateLatencies, elapsed)
  239. f.Logger.Infof("Updating ingress and wait for change to take effect")
  240. ingToUpdate, err := f.Clientset.NetworkingV1beta1().Ingresses(f.Namespace).Get(ingCreated.Name, metav1.GetOptions{})
  241. if err != nil {
  242. errs = append(errs, err)
  243. return
  244. }
  245. addTestPathToIngress(ingToUpdate)
  246. start = time.Now()
  247. ingToUpdate, err = f.Clientset.NetworkingV1beta1().Ingresses(f.Namespace).Update(ingToUpdate)
  248. if err != nil {
  249. errs = append(errs, err)
  250. return
  251. }
  252. if err := f.Jig.WaitForGivenIngressWithTimeout(ingToUpdate, false, waitForIngressMaxTimeout); err != nil {
  253. errs = append(errs, err)
  254. return
  255. }
  256. elapsed = time.Since(start)
  257. f.Logger.Infof("Spent %s for updating ingress %s", elapsed, ingToUpdate.Name)
  258. f.StepUpdateLatencies = append(f.StepUpdateLatencies, elapsed)
  259. }
  260. defer f.dumpLatencies()
  261. for _, num := range f.NumIngressesTest {
  262. f.Logger.Infof("Create more ingresses until we reach %d ingresses", num)
  263. prepareIngsFunc(num)
  264. f.Logger.Infof("Measure create and update latency with %d ingresses", num)
  265. measureCreateUpdateFunc()
  266. if len(errs) != 0 {
  267. return errs
  268. }
  269. }
  270. return errs
  271. }
  272. func (f *IngressScaleFramework) dumpLatencies() {
  273. f.Logger.Infof("Dumping scale test latencies...")
  274. formattedData := f.GetFormattedLatencies()
  275. if f.OutputFile != "" {
  276. f.Logger.Infof("Dumping scale test latencies to file %s...", f.OutputFile)
  277. ioutil.WriteFile(f.OutputFile, []byte(formattedData), 0644)
  278. return
  279. }
  280. f.Logger.Infof("\n%v", formattedData)
  281. }
  282. // GetFormattedLatencies returns the formatted latencies output.
  283. // TODO: Need a better way/format for data output.
  284. func (f *IngressScaleFramework) GetFormattedLatencies() string {
  285. if len(f.NumIngressesTest) == 0 ||
  286. len(f.NumIngressesTest) != len(f.BatchCreateLatencies) ||
  287. len(f.NumIngressesTest) != len(f.BatchDurations) ||
  288. len(f.NumIngressesTest) != len(f.StepCreateLatencies) ||
  289. len(f.NumIngressesTest) != len(f.StepUpdateLatencies) {
  290. return "Failed to construct latencies output."
  291. }
  292. res := "--- Procedure logs ---\n"
  293. for i, latencies := range f.BatchCreateLatencies {
  294. res += fmt.Sprintf("Create %d ingresses parallelly, each of them takes below amount of time before starts serving traffic:\n", len(latencies))
  295. for _, latency := range latencies {
  296. res = res + fmt.Sprintf("- %v\n", latency)
  297. }
  298. res += fmt.Sprintf("Total duration for completing %d ingress creations: %v\n", len(latencies), f.BatchDurations[i])
  299. res += fmt.Sprintf("Duration to create one more ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
  300. res += fmt.Sprintf("Duration to update one ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
  301. }
  302. res = res + "--- Summary ---\n"
  303. var batchTotalStr, batchAvgStr, singleCreateStr, singleUpdateStr string
  304. for i, latencies := range f.BatchCreateLatencies {
  305. batchTotalStr += fmt.Sprintf("Batch creation total latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), f.BatchDurations[i])
  306. var avgLatency time.Duration
  307. for _, latency := range latencies {
  308. avgLatency = avgLatency + latency
  309. }
  310. avgLatency /= time.Duration(len(latencies))
  311. batchAvgStr += fmt.Sprintf("Batch creation average latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), avgLatency)
  312. singleCreateStr += fmt.Sprintf("Single ingress creation latency with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
  313. singleUpdateStr += fmt.Sprintf("Single ingress update latency with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
  314. }
  315. res += batchTotalStr + batchAvgStr + singleCreateStr + singleUpdateStr
  316. return res
  317. }
  318. func addTestPathToIngress(ing *networkingv1beta1.Ingress) {
  319. ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths = append(
  320. ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths,
  321. networkingv1beta1.HTTPIngressPath{
  322. Path: "/test",
  323. Backend: ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths[0].Backend,
  324. })
  325. }
  326. func (f *IngressScaleFramework) createScaleTestServiceIngress(suffix string, enableTLS bool) (*v1.Service, *networkingv1beta1.Ingress, error) {
  327. svcCreated, err := f.Clientset.CoreV1().Services(f.Namespace).Create(generateScaleTestServiceSpec(suffix))
  328. if err != nil {
  329. return nil, nil, err
  330. }
  331. ingCreated, err := f.Clientset.NetworkingV1beta1().Ingresses(f.Namespace).Create(generateScaleTestIngressSpec(suffix, enableTLS))
  332. if err != nil {
  333. return nil, nil, err
  334. }
  335. return svcCreated, ingCreated, nil
  336. }
  337. func generateScaleTestIngressSpec(suffix string, enableTLS bool) *networkingv1beta1.Ingress {
  338. ing := &networkingv1beta1.Ingress{
  339. ObjectMeta: metav1.ObjectMeta{
  340. Name: fmt.Sprintf("%s-%s", scaleTestIngressNamePrefix, suffix),
  341. },
  342. Spec: networkingv1beta1.IngressSpec{
  343. TLS: []networkingv1beta1.IngressTLS{
  344. {SecretName: scaleTestSecretName},
  345. },
  346. Rules: []networkingv1beta1.IngressRule{
  347. {
  348. Host: scaleTestHostname,
  349. IngressRuleValue: networkingv1beta1.IngressRuleValue{
  350. HTTP: &networkingv1beta1.HTTPIngressRuleValue{
  351. Paths: []networkingv1beta1.HTTPIngressPath{
  352. {
  353. Path: "/scale",
  354. Backend: networkingv1beta1.IngressBackend{
  355. ServiceName: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
  356. ServicePort: intstr.IntOrString{
  357. Type: intstr.Int,
  358. IntVal: 80,
  359. },
  360. },
  361. },
  362. },
  363. },
  364. },
  365. },
  366. },
  367. },
  368. }
  369. if enableTLS {
  370. ing.Spec.TLS = []networkingv1beta1.IngressTLS{
  371. {SecretName: scaleTestSecretName},
  372. }
  373. }
  374. return ing
  375. }
  376. func generateScaleTestServiceSpec(suffix string) *v1.Service {
  377. return &v1.Service{
  378. ObjectMeta: metav1.ObjectMeta{
  379. Name: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
  380. Labels: scaleTestLabels,
  381. },
  382. Spec: v1.ServiceSpec{
  383. Ports: []v1.ServicePort{{
  384. Name: "http",
  385. Protocol: v1.ProtocolTCP,
  386. Port: 80,
  387. TargetPort: intstr.FromInt(8080),
  388. }},
  389. Selector: scaleTestLabels,
  390. Type: v1.ServiceTypeNodePort,
  391. },
  392. }
  393. }
  394. func generateScaleTestBackendDeploymentSpec(numReplicas int32) *apps.Deployment {
  395. return &apps.Deployment{
  396. ObjectMeta: metav1.ObjectMeta{
  397. Name: scaleTestBackendName,
  398. },
  399. Spec: apps.DeploymentSpec{
  400. Replicas: &numReplicas,
  401. Selector: &metav1.LabelSelector{MatchLabels: scaleTestLabels},
  402. Template: v1.PodTemplateSpec{
  403. ObjectMeta: metav1.ObjectMeta{
  404. Labels: scaleTestLabels,
  405. },
  406. Spec: v1.PodSpec{
  407. Containers: []v1.Container{
  408. {
  409. Name: scaleTestBackendName,
  410. Image: "k8s.gcr.io/echoserver:1.10",
  411. Ports: []v1.ContainerPort{{ContainerPort: 8080}},
  412. ReadinessProbe: &v1.Probe{
  413. Handler: v1.Handler{
  414. HTTPGet: &v1.HTTPGetAction{
  415. Port: intstr.FromInt(8080),
  416. Path: "/healthz",
  417. },
  418. },
  419. FailureThreshold: 10,
  420. PeriodSeconds: 1,
  421. SuccessThreshold: 1,
  422. TimeoutSeconds: 1,
  423. },
  424. },
  425. },
  426. },
  427. },
  428. },
  429. }
  430. }