legacy_metrics_client_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package metrics
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "io"
  19. "testing"
  20. "time"
  21. v1 "k8s.io/api/core/v1"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/labels"
  25. "k8s.io/apimachinery/pkg/runtime"
  26. "k8s.io/client-go/kubernetes/fake"
  27. restclient "k8s.io/client-go/rest"
  28. core "k8s.io/client-go/testing"
  29. heapster "k8s.io/heapster/metrics/api/v1/types"
  30. metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
  31. "github.com/stretchr/testify/assert"
  32. )
  33. var fixedTimestamp = time.Date(2015, time.November, 10, 12, 30, 0, 0, time.UTC)
  34. func (w fakeResponseWrapper) DoRaw(context.Context) ([]byte, error) {
  35. return w.raw, nil
  36. }
  37. func (w fakeResponseWrapper) Stream(context.Context) (io.ReadCloser, error) {
  38. return nil, nil
  39. }
  40. func newFakeResponseWrapper(raw []byte) fakeResponseWrapper {
  41. return fakeResponseWrapper{raw: raw}
  42. }
  43. type fakeResponseWrapper struct {
  44. raw []byte
  45. }
  46. // timestamp is used for establishing order on metricPoints
  47. type metricPoint struct {
  48. level uint64
  49. timestamp int
  50. }
  51. type testCase struct {
  52. desiredMetricValues PodMetricsInfo
  53. desiredError error
  54. replicas int
  55. targetTimestamp int
  56. window time.Duration
  57. reportedMetricsPoints [][]metricPoint
  58. reportedPodMetrics [][]int64
  59. namespace string
  60. selector labels.Selector
  61. metricSelector labels.Selector
  62. resourceName v1.ResourceName
  63. metricName string
  64. }
  65. func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
  66. namespace := "test-namespace"
  67. tc.namespace = namespace
  68. podNamePrefix := "test-pod"
  69. podLabels := map[string]string{"name": podNamePrefix}
  70. tc.selector = labels.SelectorFromSet(podLabels)
  71. // it's a resource test if we have a resource name
  72. isResource := len(tc.resourceName) > 0
  73. fakeClient := &fake.Clientset{}
  74. fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
  75. obj := &v1.PodList{}
  76. for i := 0; i < tc.replicas; i++ {
  77. podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
  78. pod := buildPod(namespace, podName, podLabels, v1.PodRunning, "1024")
  79. obj.Items = append(obj.Items, pod)
  80. }
  81. return true, obj, nil
  82. })
  83. if isResource {
  84. fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) {
  85. metrics := metricsapi.PodMetricsList{}
  86. for i, containers := range tc.reportedPodMetrics {
  87. metric := metricsapi.PodMetrics{
  88. ObjectMeta: metav1.ObjectMeta{
  89. Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
  90. Namespace: namespace,
  91. },
  92. Timestamp: metav1.Time{Time: offsetTimestampBy(tc.targetTimestamp)},
  93. Window: metav1.Duration{Duration: tc.window},
  94. Containers: []metricsapi.ContainerMetrics{},
  95. }
  96. for j, cpu := range containers {
  97. cm := metricsapi.ContainerMetrics{
  98. Name: fmt.Sprintf("%s-%d-container-%d", podNamePrefix, i, j),
  99. Usage: v1.ResourceList{
  100. v1.ResourceCPU: *resource.NewMilliQuantity(
  101. cpu,
  102. resource.DecimalSI),
  103. v1.ResourceMemory: *resource.NewQuantity(
  104. int64(1024*1024),
  105. resource.BinarySI),
  106. },
  107. }
  108. metric.Containers = append(metric.Containers, cm)
  109. }
  110. metrics.Items = append(metrics.Items, metric)
  111. }
  112. heapsterRawMemResponse, _ := json.Marshal(&metrics)
  113. return true, newFakeResponseWrapper(heapsterRawMemResponse), nil
  114. })
  115. } else {
  116. fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) {
  117. metrics := heapster.MetricResultList{}
  118. var latestTimestamp time.Time
  119. for _, reportedMetricPoints := range tc.reportedMetricsPoints {
  120. var heapsterMetricPoints []heapster.MetricPoint
  121. for _, reportedMetricPoint := range reportedMetricPoints {
  122. timestamp := offsetTimestampBy(reportedMetricPoint.timestamp)
  123. if latestTimestamp.Before(timestamp) {
  124. latestTimestamp = timestamp
  125. }
  126. heapsterMetricPoint := heapster.MetricPoint{Timestamp: timestamp, Value: reportedMetricPoint.level, FloatValue: nil}
  127. heapsterMetricPoints = append(heapsterMetricPoints, heapsterMetricPoint)
  128. }
  129. metric := heapster.MetricResult{
  130. Metrics: heapsterMetricPoints,
  131. LatestTimestamp: latestTimestamp,
  132. }
  133. metrics.Items = append(metrics.Items, metric)
  134. }
  135. heapsterRawMemResponse, _ := json.Marshal(&metrics)
  136. return true, newFakeResponseWrapper(heapsterRawMemResponse), nil
  137. })
  138. }
  139. return fakeClient
  140. }
  141. func buildPod(namespace, podName string, podLabels map[string]string, phase v1.PodPhase, request string) v1.Pod {
  142. return v1.Pod{
  143. ObjectMeta: metav1.ObjectMeta{
  144. Name: podName,
  145. Namespace: namespace,
  146. Labels: podLabels,
  147. },
  148. Spec: v1.PodSpec{
  149. Containers: []v1.Container{
  150. {
  151. Resources: v1.ResourceRequirements{
  152. Requests: v1.ResourceList{
  153. v1.ResourceCPU: resource.MustParse(request),
  154. },
  155. },
  156. },
  157. },
  158. },
  159. Status: v1.PodStatus{
  160. Phase: phase,
  161. Conditions: []v1.PodCondition{
  162. {
  163. Type: v1.PodReady,
  164. Status: v1.ConditionTrue,
  165. },
  166. },
  167. },
  168. }
  169. }
  170. func (tc *testCase) verifyResults(t *testing.T, metrics PodMetricsInfo, timestamp time.Time, err error) {
  171. if tc.desiredError != nil {
  172. assert.Error(t, err, "there should be an error retrieving the metrics")
  173. assert.Contains(t, fmt.Sprintf("%v", err), fmt.Sprintf("%v", tc.desiredError), "the error message should be eas expected")
  174. return
  175. }
  176. assert.NoError(t, err, "there should be no error retrieving the metrics")
  177. assert.NotNil(t, metrics, "there should be metrics returned")
  178. if len(metrics) != len(tc.desiredMetricValues) {
  179. t.Errorf("Not equal:\nexpected: %v\nactual: %v", tc.desiredMetricValues, metrics)
  180. } else {
  181. for k, m := range metrics {
  182. if !m.Timestamp.Equal(tc.desiredMetricValues[k].Timestamp) ||
  183. m.Window != tc.desiredMetricValues[k].Window ||
  184. m.Value != tc.desiredMetricValues[k].Value {
  185. t.Errorf("Not equal:\nexpected: %v\nactual: %v", tc.desiredMetricValues, metrics)
  186. break
  187. }
  188. }
  189. }
  190. targetTimestamp := offsetTimestampBy(tc.targetTimestamp)
  191. assert.True(t, targetTimestamp.Equal(timestamp), fmt.Sprintf("the timestamp should be as expected (%s) but was %s", targetTimestamp, timestamp))
  192. }
  193. func (tc *testCase) runTest(t *testing.T) {
  194. testClient := tc.prepareTestClient(t)
  195. metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterScheme, DefaultHeapsterService, DefaultHeapsterPort)
  196. isResource := len(tc.resourceName) > 0
  197. if isResource {
  198. info, timestamp, err := metricsClient.GetResourceMetric(tc.resourceName, tc.namespace, tc.selector)
  199. tc.verifyResults(t, info, timestamp, err)
  200. } else {
  201. info, timestamp, err := metricsClient.GetRawMetric(tc.metricName, tc.namespace, tc.selector, tc.metricSelector)
  202. tc.verifyResults(t, info, timestamp, err)
  203. }
  204. }
  205. func TestCPU(t *testing.T) {
  206. targetTimestamp := 1
  207. window := 30 * time.Second
  208. tc := testCase{
  209. replicas: 3,
  210. desiredMetricValues: PodMetricsInfo{
  211. "test-pod-0": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  212. "test-pod-1": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  213. "test-pod-2": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  214. },
  215. resourceName: v1.ResourceCPU,
  216. targetTimestamp: targetTimestamp,
  217. window: window,
  218. reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
  219. }
  220. tc.runTest(t)
  221. }
  222. func TestQPS(t *testing.T) {
  223. targetTimestamp := 1
  224. tc := testCase{
  225. replicas: 3,
  226. desiredMetricValues: PodMetricsInfo{
  227. "test-pod-0": PodMetric{Value: 10000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
  228. "test-pod-1": PodMetric{Value: 20000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
  229. "test-pod-2": PodMetric{Value: 10000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
  230. },
  231. metricName: "qps",
  232. targetTimestamp: targetTimestamp,
  233. reportedMetricsPoints: [][]metricPoint{{{10, 1}}, {{20, 1}}, {{10, 1}}},
  234. }
  235. tc.runTest(t)
  236. }
  237. func TestQpsSumEqualZero(t *testing.T) {
  238. targetTimestamp := 0
  239. tc := testCase{
  240. replicas: 3,
  241. desiredMetricValues: PodMetricsInfo{
  242. "test-pod-0": PodMetric{Value: 0, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
  243. "test-pod-1": PodMetric{Value: 0, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
  244. "test-pod-2": PodMetric{Value: 0, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
  245. },
  246. metricName: "qps",
  247. targetTimestamp: targetTimestamp,
  248. reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}},
  249. }
  250. tc.runTest(t)
  251. }
  252. func TestCPUMoreMetrics(t *testing.T) {
  253. targetTimestamp := 10
  254. window := 30 * time.Second
  255. tc := testCase{
  256. replicas: 5,
  257. desiredMetricValues: PodMetricsInfo{
  258. "test-pod-0": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  259. "test-pod-1": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  260. "test-pod-2": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  261. "test-pod-3": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  262. "test-pod-4": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  263. },
  264. resourceName: v1.ResourceCPU,
  265. targetTimestamp: targetTimestamp,
  266. window: window,
  267. reportedPodMetrics: [][]int64{{1000, 2000, 2000}, {5000}, {1000, 1000, 1000, 2000}, {4000, 1000}, {5000}},
  268. }
  269. tc.runTest(t)
  270. }
  271. func TestCPUMissingMetrics(t *testing.T) {
  272. targetTimestamp := 0
  273. window := 30 * time.Second
  274. tc := testCase{
  275. replicas: 3,
  276. desiredMetricValues: PodMetricsInfo{
  277. "test-pod-0": PodMetric{Value: 4000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  278. },
  279. resourceName: v1.ResourceCPU,
  280. targetTimestamp: targetTimestamp,
  281. window: window,
  282. reportedPodMetrics: [][]int64{{4000}},
  283. }
  284. tc.runTest(t)
  285. }
  286. func TestQpsMissingMetrics(t *testing.T) {
  287. tc := testCase{
  288. replicas: 3,
  289. desiredError: fmt.Errorf("requested metrics for 3 pods, got metrics for 1"),
  290. metricName: "qps",
  291. targetTimestamp: 1,
  292. reportedMetricsPoints: [][]metricPoint{{{4000, 4}}},
  293. }
  294. tc.runTest(t)
  295. }
  296. func TestQpsSuperfluousMetrics(t *testing.T) {
  297. tc := testCase{
  298. replicas: 3,
  299. desiredError: fmt.Errorf("requested metrics for 3 pods, got metrics for 6"),
  300. metricName: "qps",
  301. reportedMetricsPoints: [][]metricPoint{{{1000, 1}}, {{2000, 4}}, {{2000, 1}}, {{4000, 5}}, {{2000, 1}}, {{4000, 4}}},
  302. }
  303. tc.runTest(t)
  304. }
  305. func TestCPUEmptyMetrics(t *testing.T) {
  306. tc := testCase{
  307. replicas: 3,
  308. resourceName: v1.ResourceCPU,
  309. desiredError: fmt.Errorf("no metrics returned from heapster"),
  310. reportedMetricsPoints: [][]metricPoint{},
  311. reportedPodMetrics: [][]int64{},
  312. }
  313. tc.runTest(t)
  314. }
  315. func TestQpsEmptyEntries(t *testing.T) {
  316. targetTimestamp := 4
  317. tc := testCase{
  318. replicas: 3,
  319. metricName: "qps",
  320. desiredMetricValues: PodMetricsInfo{
  321. "test-pod-0": PodMetric{Value: 4000000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
  322. "test-pod-2": PodMetric{Value: 2000000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
  323. },
  324. targetTimestamp: targetTimestamp,
  325. reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {}, {{2000, 4}}},
  326. }
  327. tc.runTest(t)
  328. }
  329. func TestCPUZeroReplicas(t *testing.T) {
  330. tc := testCase{
  331. replicas: 0,
  332. resourceName: v1.ResourceCPU,
  333. desiredError: fmt.Errorf("no metrics returned from heapster"),
  334. reportedPodMetrics: [][]int64{},
  335. }
  336. tc.runTest(t)
  337. }
  338. func TestCPUEmptyMetricsForOnePod(t *testing.T) {
  339. targetTimestamp := 0
  340. window := 30 * time.Second
  341. tc := testCase{
  342. replicas: 3,
  343. resourceName: v1.ResourceCPU,
  344. desiredMetricValues: PodMetricsInfo{
  345. "test-pod-0": PodMetric{Value: 100, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  346. "test-pod-1": PodMetric{Value: 700, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
  347. },
  348. targetTimestamp: targetTimestamp,
  349. window: window,
  350. reportedPodMetrics: [][]int64{{100}, {300, 400}, {}},
  351. }
  352. tc.runTest(t)
  353. }
  354. func offsetTimestampBy(t int) time.Time {
  355. return fixedTimestamp.Add(time.Duration(t) * time.Minute)
  356. }