memory_limits.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package windows

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"regexp"
	"strconv"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/client-go/kubernetes/scheme"
	kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
	"k8s.io/kubernetes/test/e2e/framework"
	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	imageutils "k8s.io/kubernetes/test/utils/image"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
)
var _ = SIGDescribe("[Feature:Windows] Memory Limits [Serial] [Slow]", func() {

	f := framework.NewDefaultFramework("memory-limit-test-windows")

	ginkgo.BeforeEach(func() {
		// NOTE(vyta): these tests are Windows specific
		e2eskipper.SkipUnlessNodeOSDistroIs("windows")
	})

	ginkgo.Context("Allocatable node memory", func() {
		ginkgo.It("should be equal to a calculated allocatable memory value", func() {
			checkNodeAllocatableTest(f)
		})
	})

	ginkgo.Context("attempt to deploy past allocatable memory limits", func() {
		ginkgo.It("should fail deployments of pods once there isn't enough memory", func() {
			overrideAllocatableMemoryTest(f, 4)
		})
	})
})
type nodeMemory struct {
	// capacity
	capacity resource.Quantity
	// allocatable memory
	allocatable resource.Quantity
	// memory reserved for OS level processes
	systemReserve resource.Quantity
	// memory reserved for kubelet (not implemented)
	kubeReserve resource.Quantity
	// grace period memory limit (not implemented)
	softEviction resource.Quantity
	// no grace period memory limit
	hardEviction resource.Quantity
}
// checkNodeAllocatableTest checks that a calculated value for NodeAllocatable,
// derived from capacity minus the configured reservations, is equal to the
// value reported in the node status.
func checkNodeAllocatableTest(f *framework.Framework) {
	nodeMem := getNodeMemory(f)
	framework.Logf("nodeMem says: %+v", nodeMem)

	// calculate the allocatable mem based on capacity - reserved amounts
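	// Expected value computed by this test (kubeReserve and softEviction are
	// currently always zero here, per the notes on nodeMemory):
	//   allocatable = capacity - systemReserve - kubeReserve - softEviction - hardEviction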
	calculatedNodeAlloc := nodeMem.capacity.DeepCopy()
	calculatedNodeAlloc.Sub(nodeMem.systemReserve)
	calculatedNodeAlloc.Sub(nodeMem.kubeReserve)
	calculatedNodeAlloc.Sub(nodeMem.softEviction)
	calculatedNodeAlloc.Sub(nodeMem.hardEviction)

	ginkgo.By(fmt.Sprintf("Checking stated allocatable memory %v against calculated allocatable memory %v", &nodeMem.allocatable, calculatedNodeAlloc))

	// sanity check against stated allocatable
	framework.ExpectEqual(calculatedNodeAlloc.Cmp(nodeMem.allocatable), 0)
}
// overrideAllocatableMemoryTest deploys `allocatablePods + 1` pods, each with a memory
// limit of `1/allocatablePods` of the total allocatable memory, then confirms that the
// last pod fails to schedule with a FailedScheduling event.
func overrideAllocatableMemoryTest(f *framework.Framework, allocatablePods int) {
	const (
		podType = "memory_limit_test_pod"
	)

	totalAllocatable := getTotalAllocatableMemory(f)

	memValue := totalAllocatable.Value()
	memPerPod := memValue / int64(allocatablePods)
	ginkgo.By(fmt.Sprintf("Deploying %d pods with mem limit %v, then one additional pod", allocatablePods, memPerPod))

	// these pods should all fit and schedule successfully
	pods := newMemLimitTestPods(allocatablePods, imageutils.GetPauseImageName(), podType, strconv.FormatInt(memPerPod, 10))
	f.PodClient().CreateBatch(pods)

	failurePods := newMemLimitTestPods(1, imageutils.GetPauseImageName(), podType, strconv.FormatInt(memPerPod, 10))
	f.PodClient().Create(failurePods[0])
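	// The extra pod should not fit on any Windows node; wait for the scheduler
	// to report a FailedScheduling warning event for it.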
	gomega.Eventually(func() bool {
		eventList, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
		framework.ExpectNoError(err)
		for _, e := range eventList.Items {
			// Look for an event that shows FailedScheduling
			if e.Type == "Warning" && e.Reason == "FailedScheduling" && e.InvolvedObject.Name == failurePods[0].ObjectMeta.Name {
				framework.Logf("Found %+v event with message %+v", e.Reason, e.Message)
				return true
			}
		}
		return false
	}, 3*time.Minute, 10*time.Second).Should(gomega.Equal(true))
}
// newMemLimitTestPods creates a list of pod specifications for the test.
func newMemLimitTestPods(numPods int, imageName, podType string, memoryLimit string) []*v1.Pod {
	var pods []*v1.Pod

	memLimitQuantity, err := resource.ParseQuantity(memoryLimit)
	framework.ExpectNoError(err)

	for i := 0; i < numPods; i++ {
		podName := "test-" + string(uuid.NewUUID())
		pod := v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name: podName,
				Labels: map[string]string{
					"type": podType,
					"name": podName,
				},
			},
			Spec: v1.PodSpec{
				// Restart policy is Always (the default).
				Containers: []v1.Container{
					{
						Image: imageName,
						Name:  podName,
						Resources: v1.ResourceRequirements{
							Limits: v1.ResourceList{
								v1.ResourceMemory: memLimitQuantity,
							},
						},
					},
				},
				// Schedule only onto Windows nodes.
				NodeSelector: map[string]string{
					"beta.kubernetes.io/os": "windows",
				},
			},
		}

		pods = append(pods, &pod)
	}

	return pods
}
// getNodeMemory populates a nodeMemory struct with information from the first
// Windows node found in the cluster.
func getNodeMemory(f *framework.Framework) nodeMemory {
	selector := labels.Set{"beta.kubernetes.io/os": "windows"}.AsSelector()
	nodeList, err := f.ClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
		LabelSelector: selector.String(),
	})
	framework.ExpectNoError(err)

	// Assuming that agent nodes have the same config
	// Make sure there is at least one agent node, then use the first one for info
	framework.ExpectNotEqual(len(nodeList.Items), 0)

	ginkgo.By("Getting memory details from node status and kubelet config")

	status := nodeList.Items[0].Status
	nodeName := nodeList.Items[0].ObjectMeta.Name

	kubeletConfig, err := getCurrentKubeletConfig(nodeName, f.Namespace.Name)
	framework.ExpectNoError(err)
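	// The reservation and eviction settings below may be absent from the kubelet
	// config; when parsing fails, fall back to a zero quantity.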
	systemReserve, err := resource.ParseQuantity(kubeletConfig.SystemReserved["memory"])
	if err != nil {
		systemReserve = *resource.NewQuantity(0, resource.BinarySI)
	}
	kubeReserve, err := resource.ParseQuantity(kubeletConfig.KubeReserved["memory"])
	if err != nil {
		kubeReserve = *resource.NewQuantity(0, resource.BinarySI)
	}
	hardEviction, err := resource.ParseQuantity(kubeletConfig.EvictionHard["memory.available"])
	if err != nil {
		hardEviction = *resource.NewQuantity(0, resource.BinarySI)
	}
	softEviction, err := resource.ParseQuantity(kubeletConfig.EvictionSoft["memory.available"])
	if err != nil {
		softEviction = *resource.NewQuantity(0, resource.BinarySI)
	}

	nodeMem := nodeMemory{
		capacity:      status.Capacity[v1.ResourceMemory],
		allocatable:   status.Allocatable[v1.ResourceMemory],
		systemReserve: systemReserve,
		hardEviction:  hardEviction,
		// these are not implemented and are here for future use - will always be 0 at the moment
		kubeReserve:  kubeReserve,
		softEviction: softEviction,
	}

	return nodeMem
}
// getTotalAllocatableMemory gets the sum of all agent nodes' allocatable memory
func getTotalAllocatableMemory(f *framework.Framework) *resource.Quantity {
	selector := labels.Set{"beta.kubernetes.io/os": "windows"}.AsSelector()
	nodeList, err := f.ClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
		LabelSelector: selector.String(),
	})
	framework.ExpectNoError(err)

	ginkgo.By("Summing allocatable memory across all agent nodes")

	totalAllocatable := resource.NewQuantity(0, resource.BinarySI)

	for _, node := range nodeList.Items {
		status := node.Status
		totalAllocatable.Add(status.Allocatable[v1.ResourceMemory])
	}
	return totalAllocatable
}
// getCurrentKubeletConfig fetches the named node's kubelet configuration via its
// /configz endpoint; modified from test/e2e_node/util.go
func getCurrentKubeletConfig(nodeName, namespace string) (*kubeletconfig.KubeletConfiguration, error) {
	resp := pollConfigz(5*time.Minute, 5*time.Second, nodeName, namespace)
	kubeCfg, err := decodeConfigz(resp)
	if err != nil {
		return nil, err
	}
	return kubeCfg, nil
}
// pollConfigz causes the test to fail, or returns a status 200 response from the
// kubelet's /configz endpoint
func pollConfigz(timeout time.Duration, pollInterval time.Duration, nodeName, namespace string) *http.Response {
	// start a local kubectl proxy so the kubelet's /configz endpoint can be reached
	// through the apiserver's node proxy subresource
	ginkgo.By("Opening proxy to cluster")
	tk := e2ekubectl.NewTestKubeconfig(framework.TestContext.CertDir, framework.TestContext.Host, framework.TestContext.KubeConfig, framework.TestContext.KubeContext, framework.TestContext.KubectlPath, namespace)
	cmd := tk.KubectlCmd("proxy", "-p", "0")
	stdout, stderr, err := framework.StartCmdAndStreamOutput(cmd)
	framework.ExpectNoError(err)
	defer stdout.Close()
	defer stderr.Close()
	defer framework.TryKill(cmd)
	buf := make([]byte, 128)
	var n int
	n, err = stdout.Read(buf)
	framework.ExpectNoError(err)
	output := string(buf[:n])
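	// kubectl proxy was started with "-p 0", so it binds an ephemeral port and
	// prints it on startup; parse the port out of that startup line.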
	proxyRegexp := regexp.MustCompile("Starting to serve on 127.0.0.1:([0-9]+)")
	match := proxyRegexp.FindStringSubmatch(output)
	framework.ExpectEqual(len(match), 2)
	port, err := strconv.Atoi(match[1])
	framework.ExpectNoError(err)

	ginkgo.By("http requesting node kubelet /configz")
	endpoint := fmt.Sprintf("http://127.0.0.1:%d/api/v1/nodes/%s/proxy/configz", port, nodeName)
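	// Note: the endpoint above is plain HTTP through the local proxy, so the
	// permissive TLS setting in the transport below only matters if the request
	// were ever sent over HTTPS.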
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	client := &http.Client{Transport: tr}
	req, err := http.NewRequest("GET", endpoint, nil)
	framework.ExpectNoError(err)
	req.Header.Add("Accept", "application/json")
	var resp *http.Response
	gomega.Eventually(func() bool {
		resp, err = client.Do(req)
		if err != nil {
			framework.Logf("Failed to get /configz, retrying. Error: %v", err)
			return false
		}
		if resp.StatusCode != 200 {
			framework.Logf("/configz response status not 200, retrying. Response was: %+v", resp)
			return false
		}
		return true
	}, timeout, pollInterval).Should(gomega.Equal(true))

	return resp
}
// decodeConfigz decodes the http response from /configz and returns a
// kubeletconfig.KubeletConfiguration (internal type).
func decodeConfigz(resp *http.Response) (*kubeletconfig.KubeletConfiguration, error) {
	// This hack is needed because /configz reports the following structure:
	// {"kubeletconfig": {the JSON representation of kubeletconfigv1beta1.KubeletConfiguration}}
	type configzWrapper struct {
		ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"`
	}

	configz := configzWrapper{}
	kubeCfg := kubeletconfig.KubeletConfiguration{}

	contentsBytes, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	err = json.Unmarshal(contentsBytes, &configz)
	if err != nil {
		return nil, err
	}
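	// Convert the versioned (v1beta1) configuration into the internal
	// KubeletConfiguration type.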
	err = scheme.Scheme.Convert(&configz.ComponentConfig, &kubeCfg, nil)
	if err != nil {
		return nil, err
	}

	return &kubeCfg, nil
}