job.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgrades

import (
	"context"
	"fmt"
	"strings"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
	"k8s.io/kubernetes/test/e2e/upgrades"

	"github.com/onsi/ginkgo"
)

// JobUpgradeTest is a test harness for batch Jobs.
type JobUpgradeTest struct {
	job       *batchv1.Job
	namespace string
}

// Name returns the tracking name of the test.
func (JobUpgradeTest) Name() string { return "[sig-apps] job-upgrade" }

// Setup starts a Job with a parallelism of 2 and 2 completions running.
func (t *JobUpgradeTest) Setup(f *framework.Framework) {
	t.namespace = f.Namespace.Name

	ginkgo.By("Creating a job")
	t.job = e2ejob.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil, 6)
	job, err := e2ejob.CreateJob(f.ClientSet, t.namespace, t.job)
	t.job = job
	framework.ExpectNoError(err)

	ginkgo.By("Ensuring active pods == parallelism")
	err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, t.namespace, job.Name, 2)
	framework.ExpectNoError(err)
}

// Test verifies that the Job's Pods are running after an upgrade.
func (t *JobUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
	// Block until the upgrade driver signals that the upgrade has finished.
	<-done

	ginkgo.By("Ensuring active pods == parallelism")
	err := ensureAllJobPodsRunning(f.ClientSet, t.namespace, t.job.Name, 2)
	framework.ExpectNoError(err)
}

// Teardown cleans up any remaining resources.
func (t *JobUpgradeTest) Teardown(f *framework.Framework) {
	// rely on the namespace deletion to clean up everything
}

// ensureAllJobPodsRunning uses c to check whether the Job named jobName in ns
// is running, returning an error if the expected parallelism is not
// satisfied.
func ensureAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error {
	// List the Job's Pods via the job-name selector label.
	label := labels.SelectorFromSet(labels.Set(map[string]string{e2ejob.JobSelectorKey: jobName}))
	options := metav1.ListOptions{LabelSelector: label.String()}
	pods, err := c.CoreV1().Pods(ns).List(context.TODO(), options)
	if err != nil {
		return err
	}

	// Count the Running Pods and collect a short per-Pod summary for the
	// error message.
	podsSummary := make([]string, 0, parallelism)
	count := int32(0)
	for _, p := range pods.Items {
		if p.Status.Phase == v1.PodRunning {
			count++
		}
		podsSummary = append(podsSummary, fmt.Sprintf("%s (%s: %s)", p.ObjectMeta.Name, p.Status.Phase, p.Status.Message))
	}
	if count != parallelism {
		return fmt.Errorf("job has %d of %d expected running pods: %s", count, parallelism, strings.Join(podsSummary, ", "))
	}
	return nil
}
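
// Illustration only, not part of this file: a harness like JobUpgradeTest is
// typically collected into an upgrade suite as an upgrades.Test and driven
// through Setup, Test, and Teardown around the cluster upgrade. A minimal
// sketch, assuming such a suite exists (the upgradeTests variable below is
// hypothetical):
//
//	var upgradeTests = []upgrades.Test{
//		&JobUpgradeTest{},
//	}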