density.go

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package windows

import (
	"fmt"
	"sort"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/test/e2e/framework"
	imageutils "k8s.io/kubernetes/test/utils/image"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
)

var _ = SIGDescribe("[Feature:Windows] Density [Serial] [Slow]", func() {
	f := framework.NewDefaultFramework("density-test-windows")

	ginkgo.Context("create a batch of pods", func() {
		// TODO(coufon): the values are generous; set more precise limits with benchmark data
		// and add more tests.
		dTests := []densityTest{
			{
				podsNr:   10,
				interval: 0 * time.Millisecond,
				// percentile limits of single pod startup latency
				podStartupLimits: framework.LatencyMetric{
					Perc50: 30 * time.Second,
					Perc90: 54 * time.Second,
					Perc99: 59 * time.Second,
				},
				// upper bound of startup latency for the whole batch of pods
				podBatchStartupLimit: 10 * time.Minute,
			},
		}

		for _, testArg := range dTests {
			itArg := testArg
			desc := fmt.Sprintf("latency/resource should be within limit when creating %d pods with %v interval", itArg.podsNr, itArg.interval)
			ginkgo.It(desc, func() {
				itArg.createMethod = "batch"
				runDensityBatchTest(f, itArg)
			})
		}
	})
})

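// densityTest describes the parameters and performance limits of a single density run.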
type densityTest struct {
	// number of pods
	podsNr int
	// number of background pods
	bgPodsNr int
	// interval between pod creations (rate control)
	interval time.Duration
	// create pods in 'batch' or 'sequence'
	createMethod string
	// API QPS limit
	APIQPSLimit int
	// performance limits
	cpuLimits            framework.ContainersCPUSummary
	memLimits            framework.ResourceUsagePerContainer
	podStartupLimits     framework.LatencyMetric
	podBatchStartupLimit time.Duration
}

// runDensityBatchTest runs the density batch pod creation test.
func runDensityBatchTest(f *framework.Framework, testArg densityTest) (time.Duration, []framework.PodLatencyData) {
	const (
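		// podType is the value of the "type" label used to select the pods created by this test.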
		podType = "density_test_pod"
	)
	var (
		mutex      = &sync.Mutex{}
		watchTimes = make(map[string]metav1.Time)
		stopCh     = make(chan struct{})
	)

	// create test pod data structure
	pods := newDensityTestPods(testArg.podsNr, false, imageutils.GetPauseImageName(), podType)

	// the controller watches the change of pod status
	controller := newInformerWatchPod(f, mutex, watchTimes, podType)
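	// Run the informer in the background; it records the first time each pod is seen Running.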
	go controller.Run(stopCh)
	defer close(stopCh)

	ginkgo.By("Creating a batch of pods")
	// createBatchPodWithRateControl returns a map from pod name to creation timestamp.
	createTimes := createBatchPodWithRateControl(f, pods, testArg.interval)

	ginkgo.By("Waiting for all Pods to be observed by the watch...")
	gomega.Eventually(func() bool {
		// Hold the mutex while reading watchTimes; the informer callbacks write to it concurrently.
		mutex.Lock()
		defer mutex.Unlock()
		return len(watchTimes) == testArg.podsNr
	}, 10*time.Minute, 10*time.Second).Should(gomega.BeTrue())
	if len(watchTimes) < testArg.podsNr {
		framework.Failf("Timeout reached waiting for all Pods to be observed by the watch.")
	}

	// Analyze results
	var (
		firstCreate metav1.Time
		lastRunning metav1.Time
		init        = true
		e2eLags     = make([]framework.PodLatencyData, 0)
	)
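	// For each pod, the e2e lag is the time from its creation request to the watch observing it
	// Running; firstCreate and lastRunning bound the whole batch.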
	for name, create := range createTimes {
		watch, ok := watchTimes[name]
		gomega.Expect(ok).To(gomega.Equal(true))
		e2eLags = append(e2eLags,
			framework.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})
		if !init {
			if firstCreate.Time.After(create.Time) {
				firstCreate = create
			}
			if lastRunning.Time.Before(watch.Time) {
				lastRunning = watch
			}
		} else {
			init = false
			firstCreate, lastRunning = create, watch
		}
	}
	sort.Sort(framework.LatencySlice(e2eLags))
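	// batchLag is the wall-clock time from the first pod creation to the last pod observed Running.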
	batchLag := lastRunning.Time.Sub(firstCreate.Time)

	deletePodsSync(f, pods)
	return batchLag, e2eLags
}

// createBatchPodWithRateControl creates a batch of pods concurrently, using one goroutine per
// creation, and sleeps for the given interval between creations to control throughput.
func createBatchPodWithRateControl(f *framework.Framework, pods []*v1.Pod, interval time.Duration) map[string]metav1.Time {
	createTimes := make(map[string]metav1.Time)
	for _, pod := range pods {
		createTimes[pod.ObjectMeta.Name] = metav1.Now()
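		// Create asynchronously so the sleep below paces the request rate rather than API latency.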
		go f.PodClient().Create(pod)
		time.Sleep(interval)
	}
	return createTimes
}

// newInformerWatchPod creates an informer to check whether all pods are running.
func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]metav1.Time, podType string) cache.Controller {
	ns := f.Namespace.Name
	checkPodRunning := func(p *v1.Pod) {
		mutex.Lock()
		defer mutex.Unlock()
		defer ginkgo.GinkgoRecover()
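		// Record only the first time each pod is observed in the Running phase.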
		if p.Status.Phase == v1.PodRunning {
			if _, found := watchTimes[p.Name]; !found {
				watchTimes[p.Name] = metav1.Now()
			}
		}
	}

	_, controller := cache.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType}).String()
				obj, err := f.ClientSet.CoreV1().Pods(ns).List(options)
				return runtime.Object(obj), err
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType}).String()
				return f.ClientSet.CoreV1().Pods(ns).Watch(options)
			},
		},
		&v1.Pod{},
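		// A resync period of 0 disables periodic re-delivery of objects already in the cache.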
		0,
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				p, ok := obj.(*v1.Pod)
				gomega.Expect(ok).To(gomega.Equal(true))
				go checkPodRunning(p)
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				p, ok := newObj.(*v1.Pod)
				gomega.Expect(ok).To(gomega.Equal(true))
				go checkPodRunning(p)
			},
		},
	)
	return controller
}

// newDensityTestPods creates a list of pod specifications for the test.
func newDensityTestPods(numPods int, volume bool, imageName, podType string) []*v1.Pod {
	var pods []*v1.Pod
	for i := 0; i < numPods; i++ {
		podName := "test-" + string(uuid.NewUUID())
		pod := v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name: podName,
				Labels: map[string]string{
					"type": podType,
					"name": podName,
				},
			},
			Spec: v1.PodSpec{
				// Restart policy is Always (the default).
				Containers: []v1.Container{
					{
						Image: imageName,
						Name:  podName,
					},
				},
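				// Schedule only onto Windows nodes.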
				NodeSelector: map[string]string{
					"beta.kubernetes.io/os": "windows",
				},
			},
		}
		if volume {
			pod.Spec.Containers[0].VolumeMounts = []v1.VolumeMount{
				{MountPath: "/test-volume-mnt", Name: podName + "-volume"},
			}
			pod.Spec.Volumes = []v1.Volume{
				{Name: podName + "-volume", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
			}
		}
		pods = append(pods, &pod)
	}
	return pods
}

// deletePodsSync deletes a list of pods and blocks until they disappear.
func deletePodsSync(f *framework.Framework, pods []*v1.Pod) {
	var wg sync.WaitGroup
	for _, pod := range pods {
		wg.Add(1)
		go func(pod *v1.Pod) {
			defer ginkgo.GinkgoRecover()
			defer wg.Done()
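			// Delete with a 30-second grace period, then wait for the pod to disappear entirely.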
			err := f.PodClient().Delete(pod.ObjectMeta.Name, metav1.NewDeleteOptions(30))
			framework.ExpectNoError(err)
			gomega.Expect(framework.WaitForPodToDisappear(f.ClientSet, f.Namespace.Name, pod.ObjectMeta.Name, labels.Everything(),
				30*time.Second, 10*time.Minute)).NotTo(gomega.HaveOccurred())
		}(pod)
	}
	wg.Wait()
}