density.go
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package windows

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/test/e2e/framework"
	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	imageutils "k8s.io/kubernetes/test/utils/image"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
)
var _ = SIGDescribe("[Feature:Windows] Density [Serial] [Slow]", func() {
	f := framework.NewDefaultFramework("density-test-windows")

	ginkgo.Context("create a batch of pods", func() {
		// TODO(coufon): the values are generous; set more precise limits with benchmark data
		// and add more tests
		dTests := []densityTest{
			{
				podsNr:   10,
				interval: 0 * time.Millisecond,
				// percentile limits of single pod startup latency
				podStartupLimits: e2emetrics.LatencyMetric{
					Perc50: 30 * time.Second,
					Perc90: 54 * time.Second,
					Perc99: 59 * time.Second,
				},
				// upper bound of the startup latency of a batch of pods
				podBatchStartupLimit: 10 * time.Minute,
			},
		}

		for _, testArg := range dTests {
			itArg := testArg
			desc := fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval", itArg.podsNr, itArg.interval)
			ginkgo.It(desc, func() {
				itArg.createMethod = "batch"
				runDensityBatchTest(f, itArg)
			})
		}
	})
})
// densityTest describes a single density test case.
type densityTest struct {
	// number of pods
	podsNr int
	// interval between pod creations (rate control)
	interval time.Duration
	// create pods in 'batch' or 'sequence'
	createMethod string
	// API QPS limit
	APIQPSLimit int
	// performance limits
	podStartupLimits     e2emetrics.LatencyMetric
	podBatchStartupLimit time.Duration
}
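// The value below is an illustrative sketch, not part of the upstream suite: it shows how a
// hypothetical "sequence" test case could be configured with the densityTest fields above
// (createMethod and APIQPSLimit are otherwise unused in this file). The concrete numbers are
// assumptions chosen only for illustration; the blank identifier keeps the example compilable
// without being consumed anywhere.
var _ = densityTest{
	podsNr:       10,
	interval:     1 * time.Second,
	createMethod: "sequence",
	APIQPSLimit:  20,
	podStartupLimits: e2emetrics.LatencyMetric{
		Perc50: 30 * time.Second,
		Perc90: 54 * time.Second,
		Perc99: 59 * time.Second,
	},
	podBatchStartupLimit: 10 * time.Minute,
}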
// runDensityBatchTest runs the density batch pod creation test.
func runDensityBatchTest(f *framework.Framework, testArg densityTest) (time.Duration, []e2emetrics.PodLatencyData) {
	const (
		podType = "density_test_pod"
	)
	var (
		mutex      = &sync.Mutex{}
		watchTimes = make(map[string]metav1.Time)
		stopCh     = make(chan struct{})
	)

	// create the test pod specifications
	pods := newDensityTestPods(testArg.podsNr, false, imageutils.GetPauseImageName(), podType)

	// the controller watches for pod status changes
	controller := newInformerWatchPod(f, mutex, watchTimes, podType)
	go controller.Run(stopCh)
	defer close(stopCh)

	ginkgo.By("Creating a batch of pods")
	// createBatchPodWithRateControl returns a map from pod name to creation timestamp.
	createTimes := createBatchPodWithRateControl(f, pods, testArg.interval)

	ginkgo.By("Waiting for all Pods to be observed by the watch...")
	gomega.Eventually(func() bool {
		return len(watchTimes) == testArg.podsNr
	}, 10*time.Minute, 10*time.Second).Should(gomega.BeTrue())

	if len(watchTimes) < testArg.podsNr {
		framework.Failf("Timeout reached waiting for all Pods to be observed by the watch.")
	}

	// Analyze results
	var (
		firstCreate metav1.Time
		lastRunning metav1.Time
		init        = true
		e2eLags     = make([]e2emetrics.PodLatencyData, 0)
	)

	for name, create := range createTimes {
		watch, ok := watchTimes[name]
		framework.ExpectEqual(ok, true)

		e2eLags = append(e2eLags,
			e2emetrics.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})

		if !init {
			if firstCreate.Time.After(create.Time) {
				firstCreate = create
			}
			if lastRunning.Time.Before(watch.Time) {
				lastRunning = watch
			}
		} else {
			init = false
			firstCreate, lastRunning = create, watch
		}
	}

	sort.Sort(e2emetrics.LatencySlice(e2eLags))
	batchLag := lastRunning.Time.Sub(firstCreate.Time)

	deletePodsSync(f, pods)

	return batchLag, e2eLags
}
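// verifyDensityLimits is a minimal sketch, not part of the upstream test (which currently
// discards the values returned by runDensityBatchTest). It shows how the measured batch lag
// and the sorted per-pod startup latencies could be checked against the limits configured in
// densityTest. The helper name and the percentile indexing are illustrative assumptions.
func verifyDensityLimits(testArg densityTest, batchLag time.Duration, e2eLags []e2emetrics.PodLatencyData) {
	if batchLag > testArg.podBatchStartupLimit {
		framework.Failf("batch pod startup lag %v exceeded limit %v", batchLag, testArg.podBatchStartupLimit)
	}
	if len(e2eLags) == 0 {
		return
	}
	// e2eLags is sorted in ascending latency order before runDensityBatchTest returns it.
	perc50 := e2eLags[len(e2eLags)/2].Latency
	perc90 := e2eLags[len(e2eLags)*9/10].Latency
	perc99 := e2eLags[len(e2eLags)*99/100].Latency
	if perc50 > testArg.podStartupLimits.Perc50 ||
		perc90 > testArg.podStartupLimits.Perc90 ||
		perc99 > testArg.podStartupLimits.Perc99 {
		framework.Failf("pod startup latency exceeded limits: P50=%v P90=%v P99=%v", perc50, perc90, perc99)
	}
}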
// createBatchPodWithRateControl creates a batch of pods concurrently, using one goroutine per creation.
// Consecutive creations are separated by interval for throughput control.
func createBatchPodWithRateControl(f *framework.Framework, pods []*v1.Pod, interval time.Duration) map[string]metav1.Time {
	createTimes := make(map[string]metav1.Time)
	for _, pod := range pods {
		createTimes[pod.ObjectMeta.Name] = metav1.Now()
		go f.PodClient().Create(pod)
		time.Sleep(interval)
	}
	return createTimes
}
// newInformerWatchPod creates an informer that records, for each test pod, the time it is
// first observed in the Running phase.
func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]metav1.Time, podType string) cache.Controller {
	ns := f.Namespace.Name
	checkPodRunning := func(p *v1.Pod) {
		mutex.Lock()
		defer mutex.Unlock()
		defer ginkgo.GinkgoRecover()

		if p.Status.Phase == v1.PodRunning {
			if _, found := watchTimes[p.Name]; !found {
				watchTimes[p.Name] = metav1.Now()
			}
		}
	}

	_, controller := cache.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType}).String()
				obj, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), options)
				return runtime.Object(obj), err
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType}).String()
				return f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), options)
			},
		},
		&v1.Pod{},
		0,
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				p, ok := obj.(*v1.Pod)
				framework.ExpectEqual(ok, true)
				go checkPodRunning(p)
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				p, ok := newObj.(*v1.Pod)
				framework.ExpectEqual(ok, true)
				go checkPodRunning(p)
			},
		},
	)
	return controller
}
// newDensityTestPods creates a list of pod specifications for the test.
func newDensityTestPods(numPods int, volume bool, imageName, podType string) []*v1.Pod {
	var pods []*v1.Pod

	for i := 0; i < numPods; i++ {
		podName := "test-" + string(uuid.NewUUID())
		pod := v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name: podName,
				Labels: map[string]string{
					"type": podType,
					"name": podName,
				},
			},
			Spec: v1.PodSpec{
				// Restart policy is Always (the default).
				Containers: []v1.Container{
					{
						Image: imageName,
						Name:  podName,
					},
				},
				NodeSelector: map[string]string{
					"kubernetes.io/os": "windows",
				},
			},
		}

		if volume {
			pod.Spec.Containers[0].VolumeMounts = []v1.VolumeMount{
				{MountPath: "/test-volume-mnt", Name: podName + "-volume"},
			}
			pod.Spec.Volumes = []v1.Volume{
				{Name: podName + "-volume", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
			}
		}

		pods = append(pods, &pod)
	}

	return pods
}
// deletePodsSync deletes a list of pods and blocks until they have disappeared.
func deletePodsSync(f *framework.Framework, pods []*v1.Pod) {
	var wg sync.WaitGroup
	for _, pod := range pods {
		wg.Add(1)
		go func(pod *v1.Pod) {
			defer ginkgo.GinkgoRecover()
			defer wg.Done()

			err := f.PodClient().Delete(context.TODO(), pod.ObjectMeta.Name, metav1.NewDeleteOptions(30))
			framework.ExpectNoError(err)

			err = e2epod.WaitForPodToDisappear(f.ClientSet, f.Namespace.Name, pod.ObjectMeta.Name, labels.Everything(),
				30*time.Second, 10*time.Minute)
			framework.ExpectNoError(err)
		}(pod)
	}
	wg.Wait()
}