ingress.go 16 KB


  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scale
  14. import (
  15. "context"
  16. "fmt"
  17. "io/ioutil"
  18. "sync"
  19. "time"
  20. appsv1 "k8s.io/api/apps/v1"
  21. v1 "k8s.io/api/core/v1"
  22. networkingv1beta1 "k8s.io/api/networking/v1beta1"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/util/intstr"
  25. clientset "k8s.io/client-go/kubernetes"
  26. imageutils "k8s.io/kubernetes/test/utils/image"
  27. "k8s.io/kubernetes/test/e2e/framework"
  28. "k8s.io/kubernetes/test/e2e/framework/ingress"
  29. "k8s.io/kubernetes/test/e2e/framework/providers/gce"
  30. )
  31. const (
  32. numIngressesSmall = 5
  33. numIngressesMedium = 20
  34. numIngressesLarge = 50
  35. numIngressesExtraLarge = 99
  36. scaleTestIngressNamePrefix = "ing-scale"
  37. scaleTestBackendName = "echoheaders-scale"
  38. scaleTestSecretName = "tls-secret-scale"
  39. scaleTestHostname = "scale.ingress.com"
  40. scaleTestNumBackends = 10
  41. scaleTestPollInterval = 15 * time.Second
  42. // We don't expect waitForIngress to take longer
  43. // than waitForIngressMaxTimeout.
  44. waitForIngressMaxTimeout = 80 * time.Minute
  45. ingressesCleanupTimeout = 80 * time.Minute
  46. )
  47. var (
  48. scaleTestLabels = map[string]string{
  49. "app": scaleTestBackendName,
  50. }
  51. )
  52. // IngressScaleFramework defines the framework for ingress scale testing.
  53. type IngressScaleFramework struct {
  54. Clientset clientset.Interface
  55. Jig *ingress.TestJig
  56. GCEController *gce.IngressController
  57. CloudConfig framework.CloudConfig
  58. Logger ingress.TestLogger
  59. Namespace string
  60. EnableTLS bool
  61. NumIngressesTest []int
  62. OutputFile string
  63. ScaleTestDeploy *appsv1.Deployment
  64. ScaleTestSvcs []*v1.Service
  65. ScaleTestIngs []*networkingv1beta1.Ingress
  66. // BatchCreateLatencies stores all ingress creation latencies, in different
  67. // batches.
  68. BatchCreateLatencies [][]time.Duration
  69. // BatchDurations stores the total duration for each ingress batch creation.
  70. BatchDurations []time.Duration
  71. // StepCreateLatencies stores the single ingress creation latency, which happens
  72. // after each ingress batch creation is complete.
  73. StepCreateLatencies []time.Duration
  74. // StepCreateLatencies stores the single ingress update latency, which happens
  75. // after each ingress batch creation is complete.
  76. StepUpdateLatencies []time.Duration
  77. }
  78. // NewIngressScaleFramework returns a new framework for ingress scale testing.
  79. func NewIngressScaleFramework(cs clientset.Interface, ns string, cloudConfig framework.CloudConfig) *IngressScaleFramework {
  80. return &IngressScaleFramework{
  81. Namespace: ns,
  82. Clientset: cs,
  83. CloudConfig: cloudConfig,
  84. Logger: &ingress.E2ELogger{},
  85. EnableTLS: true,
  86. NumIngressesTest: []int{
  87. numIngressesSmall,
  88. numIngressesMedium,
  89. numIngressesLarge,
  90. numIngressesExtraLarge,
  91. },
  92. }
  93. }
  94. // PrepareScaleTest prepares framework for ingress scale testing.
  95. func (f *IngressScaleFramework) PrepareScaleTest() error {
  96. f.Logger.Infof("Initializing ingress test suite and gce controller...")
  97. f.Jig = ingress.NewIngressTestJig(f.Clientset)
  98. f.Jig.Logger = f.Logger
  99. f.Jig.PollInterval = scaleTestPollInterval
  100. f.GCEController = &gce.IngressController{
  101. Client: f.Clientset,
  102. Cloud: f.CloudConfig,
  103. }
  104. if err := f.GCEController.Init(); err != nil {
  105. return fmt.Errorf("failed to initialize GCE controller: %v", err)
  106. }
  107. f.ScaleTestSvcs = []*v1.Service{}
  108. f.ScaleTestIngs = []*networkingv1beta1.Ingress{}
  109. return nil
  110. }
  111. // CleanupScaleTest cleans up framework for ingress scale testing.
  112. func (f *IngressScaleFramework) CleanupScaleTest() []error {
  113. var errs []error
  114. f.Logger.Infof("Cleaning up ingresses...")
  115. for _, ing := range f.ScaleTestIngs {
  116. if ing != nil {
  117. if err := f.Clientset.NetworkingV1beta1().Ingresses(ing.Namespace).Delete(context.TODO(), ing.Name, nil); err != nil {
  118. errs = append(errs, fmt.Errorf("error while deleting ingress %s/%s: %v", ing.Namespace, ing.Name, err))
  119. }
  120. }
  121. }
  122. f.Logger.Infof("Cleaning up services...")
  123. for _, svc := range f.ScaleTestSvcs {
  124. if svc != nil {
  125. if err := f.Clientset.CoreV1().Services(svc.Namespace).Delete(context.TODO(), svc.Name, nil); err != nil {
  126. errs = append(errs, fmt.Errorf("error while deleting service %s/%s: %v", svc.Namespace, svc.Name, err))
  127. }
  128. }
  129. }
  130. if f.ScaleTestDeploy != nil {
  131. f.Logger.Infof("Cleaning up deployment %s...", f.ScaleTestDeploy.Name)
  132. if err := f.Clientset.AppsV1().Deployments(f.ScaleTestDeploy.Namespace).Delete(context.TODO(), f.ScaleTestDeploy.Name, nil); err != nil {
  133. errs = append(errs, fmt.Errorf("error while delting deployment %s/%s: %v", f.ScaleTestDeploy.Namespace, f.ScaleTestDeploy.Name, err))
  134. }
  135. }
  136. f.Logger.Infof("Cleaning up cloud resources...")
  137. if err := f.GCEController.CleanupIngressControllerWithTimeout(ingressesCleanupTimeout); err != nil {
  138. errs = append(errs, err)
  139. }
  140. return errs
  141. }
  142. // RunScaleTest runs ingress scale testing.
  143. func (f *IngressScaleFramework) RunScaleTest() []error {
  144. var errs []error
  145. testDeploy := generateScaleTestBackendDeploymentSpec(scaleTestNumBackends)
  146. f.Logger.Infof("Creating deployment %s...", testDeploy.Name)
  147. testDeploy, err := f.Jig.Client.AppsV1().Deployments(f.Namespace).Create(context.TODO(), testDeploy, metav1.CreateOptions{})
  148. if err != nil {
  149. errs = append(errs, fmt.Errorf("failed to create deployment %s: %v", testDeploy.Name, err))
  150. return errs
  151. }
  152. f.ScaleTestDeploy = testDeploy
  153. if f.EnableTLS {
  154. f.Logger.Infof("Ensuring TLS secret %s...", scaleTestSecretName)
  155. if err := f.Jig.PrepareTLSSecret(f.Namespace, scaleTestSecretName, scaleTestHostname); err != nil {
  156. errs = append(errs, fmt.Errorf("failed to prepare TLS secret %s: %v", scaleTestSecretName, err))
  157. return errs
  158. }
  159. }
  160. // numIngsCreated keeps track of how many ingresses have been created.
  161. numIngsCreated := 0
  162. prepareIngsFunc := func(numIngsNeeded int) {
  163. var ingWg sync.WaitGroup
  164. numIngsToCreate := numIngsNeeded - numIngsCreated
  165. ingWg.Add(numIngsToCreate)
  166. svcQueue := make(chan *v1.Service, numIngsToCreate)
  167. ingQueue := make(chan *networkingv1beta1.Ingress, numIngsToCreate)
  168. errQueue := make(chan error, numIngsToCreate)
  169. latencyQueue := make(chan time.Duration, numIngsToCreate)
  170. start := time.Now()
  171. for ; numIngsCreated < numIngsNeeded; numIngsCreated++ {
  172. suffix := fmt.Sprintf("%d", numIngsCreated)
  173. go func() {
  174. defer ingWg.Done()
  175. start := time.Now()
  176. svcCreated, ingCreated, err := f.createScaleTestServiceIngress(suffix, f.EnableTLS)
  177. svcQueue <- svcCreated
  178. ingQueue <- ingCreated
  179. if err != nil {
  180. errQueue <- err
  181. return
  182. }
  183. f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
  184. if err := f.Jig.WaitForGivenIngressWithTimeout(ingCreated, false, waitForIngressMaxTimeout); err != nil {
  185. errQueue <- err
  186. return
  187. }
  188. elapsed := time.Since(start)
  189. f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
  190. latencyQueue <- elapsed
  191. }()
  192. }
  193. // Wait until all ingress creations are complete.
  194. f.Logger.Infof("Waiting for %d ingresses to come up...", numIngsToCreate)
  195. ingWg.Wait()
  196. close(svcQueue)
  197. close(ingQueue)
  198. close(errQueue)
  199. close(latencyQueue)
  200. elapsed := time.Since(start)
  201. for svc := range svcQueue {
  202. f.ScaleTestSvcs = append(f.ScaleTestSvcs, svc)
  203. }
  204. for ing := range ingQueue {
  205. f.ScaleTestIngs = append(f.ScaleTestIngs, ing)
  206. }
  207. var createLatencies []time.Duration
  208. for latency := range latencyQueue {
  209. createLatencies = append(createLatencies, latency)
  210. }
  211. f.BatchCreateLatencies = append(f.BatchCreateLatencies, createLatencies)
  212. if len(errQueue) != 0 {
  213. f.Logger.Errorf("Failed while creating services and ingresses, spent %v", elapsed)
  214. for err := range errQueue {
  215. errs = append(errs, err)
  216. }
  217. return
  218. }
  219. f.Logger.Infof("Spent %s for %d ingresses to come up", elapsed, numIngsToCreate)
  220. f.BatchDurations = append(f.BatchDurations, elapsed)
  221. }
  222. measureCreateUpdateFunc := func() {
  223. f.Logger.Infof("Create one more ingress and wait for it to come up")
  224. start := time.Now()
  225. svcCreated, ingCreated, err := f.createScaleTestServiceIngress(fmt.Sprintf("%d", numIngsCreated), f.EnableTLS)
  226. numIngsCreated = numIngsCreated + 1
  227. f.ScaleTestSvcs = append(f.ScaleTestSvcs, svcCreated)
  228. f.ScaleTestIngs = append(f.ScaleTestIngs, ingCreated)
  229. if err != nil {
  230. errs = append(errs, err)
  231. return
  232. }
  233. f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
  234. if err := f.Jig.WaitForGivenIngressWithTimeout(ingCreated, false, waitForIngressMaxTimeout); err != nil {
  235. errs = append(errs, err)
  236. return
  237. }
  238. elapsed := time.Since(start)
  239. f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
  240. f.StepCreateLatencies = append(f.StepCreateLatencies, elapsed)
  241. f.Logger.Infof("Updating ingress and wait for change to take effect")
  242. ingToUpdate, err := f.Clientset.NetworkingV1beta1().Ingresses(f.Namespace).Get(context.TODO(), ingCreated.Name, metav1.GetOptions{})
  243. if err != nil {
  244. errs = append(errs, err)
  245. return
  246. }
  247. addTestPathToIngress(ingToUpdate)
  248. start = time.Now()
  249. ingToUpdate, err = f.Clientset.NetworkingV1beta1().Ingresses(f.Namespace).Update(context.TODO(), ingToUpdate, metav1.UpdateOptions{})
  250. if err != nil {
  251. errs = append(errs, err)
  252. return
  253. }
  254. if err := f.Jig.WaitForGivenIngressWithTimeout(ingToUpdate, false, waitForIngressMaxTimeout); err != nil {
  255. errs = append(errs, err)
  256. return
  257. }
  258. elapsed = time.Since(start)
  259. f.Logger.Infof("Spent %s for updating ingress %s", elapsed, ingToUpdate.Name)
  260. f.StepUpdateLatencies = append(f.StepUpdateLatencies, elapsed)
  261. }
  262. defer f.dumpLatencies()
  263. for _, num := range f.NumIngressesTest {
  264. f.Logger.Infof("Create more ingresses until we reach %d ingresses", num)
  265. prepareIngsFunc(num)
  266. f.Logger.Infof("Measure create and update latency with %d ingresses", num)
  267. measureCreateUpdateFunc()
  268. if len(errs) != 0 {
  269. return errs
  270. }
  271. }
  272. return errs
  273. }
  274. func (f *IngressScaleFramework) dumpLatencies() {
  275. f.Logger.Infof("Dumping scale test latencies...")
  276. formattedData := f.GetFormattedLatencies()
  277. if f.OutputFile != "" {
  278. f.Logger.Infof("Dumping scale test latencies to file %s...", f.OutputFile)
  279. ioutil.WriteFile(f.OutputFile, []byte(formattedData), 0644)
  280. return
  281. }
  282. f.Logger.Infof("\n%v", formattedData)
  283. }
  284. // GetFormattedLatencies returns the formatted latencies output.
  285. // TODO: Need a better way/format for data output.
  286. func (f *IngressScaleFramework) GetFormattedLatencies() string {
  287. if len(f.NumIngressesTest) == 0 ||
  288. len(f.NumIngressesTest) != len(f.BatchCreateLatencies) ||
  289. len(f.NumIngressesTest) != len(f.BatchDurations) ||
  290. len(f.NumIngressesTest) != len(f.StepCreateLatencies) ||
  291. len(f.NumIngressesTest) != len(f.StepUpdateLatencies) {
  292. return "Failed to construct latencies output."
  293. }
  294. res := "--- Procedure logs ---\n"
  295. for i, latencies := range f.BatchCreateLatencies {
  296. res += fmt.Sprintf("Create %d ingresses parallelly, each of them takes below amount of time before starts serving traffic:\n", len(latencies))
  297. for _, latency := range latencies {
  298. res = res + fmt.Sprintf("- %v\n", latency)
  299. }
  300. res += fmt.Sprintf("Total duration for completing %d ingress creations: %v\n", len(latencies), f.BatchDurations[i])
  301. res += fmt.Sprintf("Duration to create one more ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
  302. res += fmt.Sprintf("Duration to update one ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
  303. }
  304. res = res + "--- Summary ---\n"
  305. var batchTotalStr, batchAvgStr, singleCreateStr, singleUpdateStr string
  306. for i, latencies := range f.BatchCreateLatencies {
  307. batchTotalStr += fmt.Sprintf("Batch creation total latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), f.BatchDurations[i])
  308. var avgLatency time.Duration
  309. for _, latency := range latencies {
  310. avgLatency = avgLatency + latency
  311. }
  312. avgLatency /= time.Duration(len(latencies))
  313. batchAvgStr += fmt.Sprintf("Batch creation average latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), avgLatency)
  314. singleCreateStr += fmt.Sprintf("Single ingress creation latency with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
  315. singleUpdateStr += fmt.Sprintf("Single ingress update latency with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
  316. }
  317. res += batchTotalStr + batchAvgStr + singleCreateStr + singleUpdateStr
  318. return res
  319. }
  320. func addTestPathToIngress(ing *networkingv1beta1.Ingress) {
  321. ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths = append(
  322. ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths,
  323. networkingv1beta1.HTTPIngressPath{
  324. Path: "/test",
  325. Backend: ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths[0].Backend,
  326. })
  327. }
  328. func (f *IngressScaleFramework) createScaleTestServiceIngress(suffix string, enableTLS bool) (*v1.Service, *networkingv1beta1.Ingress, error) {
  329. svcCreated, err := f.Clientset.CoreV1().Services(f.Namespace).Create(context.TODO(), generateScaleTestServiceSpec(suffix), metav1.CreateOptions{})
  330. if err != nil {
  331. return nil, nil, err
  332. }
  333. ingCreated, err := f.Clientset.NetworkingV1beta1().Ingresses(f.Namespace).Create(context.TODO(), generateScaleTestIngressSpec(suffix, enableTLS), metav1.CreateOptions{})
  334. if err != nil {
  335. return nil, nil, err
  336. }
  337. return svcCreated, ingCreated, nil
  338. }
  339. func generateScaleTestIngressSpec(suffix string, enableTLS bool) *networkingv1beta1.Ingress {
  340. ing := &networkingv1beta1.Ingress{
  341. ObjectMeta: metav1.ObjectMeta{
  342. Name: fmt.Sprintf("%s-%s", scaleTestIngressNamePrefix, suffix),
  343. },
  344. Spec: networkingv1beta1.IngressSpec{
  345. TLS: []networkingv1beta1.IngressTLS{
  346. {SecretName: scaleTestSecretName},
  347. },
  348. Rules: []networkingv1beta1.IngressRule{
  349. {
  350. Host: scaleTestHostname,
  351. IngressRuleValue: networkingv1beta1.IngressRuleValue{
  352. HTTP: &networkingv1beta1.HTTPIngressRuleValue{
  353. Paths: []networkingv1beta1.HTTPIngressPath{
  354. {
  355. Path: "/scale",
  356. Backend: networkingv1beta1.IngressBackend{
  357. ServiceName: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
  358. ServicePort: intstr.IntOrString{
  359. Type: intstr.Int,
  360. IntVal: 80,
  361. },
  362. },
  363. },
  364. },
  365. },
  366. },
  367. },
  368. },
  369. },
  370. }
  371. if enableTLS {
  372. ing.Spec.TLS = []networkingv1beta1.IngressTLS{
  373. {SecretName: scaleTestSecretName},
  374. }
  375. }
  376. return ing
  377. }
  378. func generateScaleTestServiceSpec(suffix string) *v1.Service {
  379. return &v1.Service{
  380. ObjectMeta: metav1.ObjectMeta{
  381. Name: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
  382. Labels: scaleTestLabels,
  383. },
  384. Spec: v1.ServiceSpec{
  385. Ports: []v1.ServicePort{{
  386. Name: "http",
  387. Protocol: v1.ProtocolTCP,
  388. Port: 80,
  389. TargetPort: intstr.FromInt(8080),
  390. }},
  391. Selector: scaleTestLabels,
  392. Type: v1.ServiceTypeNodePort,
  393. },
  394. }
  395. }
  396. func generateScaleTestBackendDeploymentSpec(numReplicas int32) *appsv1.Deployment {
  397. return &appsv1.Deployment{
  398. ObjectMeta: metav1.ObjectMeta{
  399. Name: scaleTestBackendName,
  400. },
  401. Spec: appsv1.DeploymentSpec{
  402. Replicas: &numReplicas,
  403. Selector: &metav1.LabelSelector{MatchLabels: scaleTestLabels},
  404. Template: v1.PodTemplateSpec{
  405. ObjectMeta: metav1.ObjectMeta{
  406. Labels: scaleTestLabels,
  407. },
  408. Spec: v1.PodSpec{
  409. Containers: []v1.Container{
  410. {
  411. Name: scaleTestBackendName,
  412. Image: imageutils.GetE2EImage(imageutils.EchoServer),
  413. Ports: []v1.ContainerPort{{ContainerPort: 8080}},
  414. ReadinessProbe: &v1.Probe{
  415. Handler: v1.Handler{
  416. HTTPGet: &v1.HTTPGetAction{
  417. Port: intstr.FromInt(8080),
  418. Path: "/healthz",
  419. },
  420. },
  421. FailureThreshold: 10,
  422. PeriodSeconds: 1,
  423. SuccessThreshold: 1,
  424. TimeoutSeconds: 1,
  425. },
  426. },
  427. },
  428. },
  429. },
  430. },
  431. }
  432. }