reconciler_test.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpointslice

import (
	"context"
	"fmt"
	"reflect"
	"strings"
	"testing"
	"time"

	dto "github.com/prometheus/client_model/go"
	"github.com/stretchr/testify/assert"
	corev1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1beta1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	corelisters "k8s.io/client-go/listers/core/v1"
	k8stesting "k8s.io/client-go/testing"
	compmetrics "k8s.io/component-base/metrics"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
	utilpointer "k8s.io/utils/pointer"
)
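
// defaultMaxEndpointsPerSlice mirrors the EndpointSlice controller's default
// of 100 endpoints per slice (kube-controller-manager's
// --max-endpoints-per-slice flag).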
var defaultMaxEndpointsPerSlice = int32(100)

// Even when there are no pods, we want to have a placeholder slice for each service
func TestReconcileEmpty(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"
	svc, _ := newServiceAndEndpointMeta("foo", namespace)

	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, []*corev1.Pod{}, []*discovery.EndpointSlice{}, time.Now())

	expectActions(t, client.Actions(), 1, "create", "endpointslices")

	slices := fetchEndpointSlices(t, client, namespace)
	assert.Len(t, slices, 1, "Expected 1 endpoint slice")
	assert.Regexp(t, "^"+svc.Name, slices[0].Name)
	assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName])
	assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
	assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
	expectTrackedResourceVersion(t, r.endpointSliceTracker, &slices[0], "100")
	expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
}

// Given a single pod matching a service selector and no existing endpoint slices,
// a slice should be created
func TestReconcile1Pod(t *testing.T) {
	namespace := "test"
	ipv6Family := corev1.IPv6Protocol

	svcv4, _ := newServiceAndEndpointMeta("foo", namespace)
	svcv6, _ := newServiceAndEndpointMeta("foo", namespace)
	svcv6.Spec.IPFamily = &ipv6Family
	svcv6ClusterIP, _ := newServiceAndEndpointMeta("foo", namespace)
	svcv6ClusterIP.Spec.ClusterIP = "1234::5678:0000:0000:9abc:def1"

	pod1 := newPod(1, namespace, true, 1)
	pod1.Status.PodIPs = []corev1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}}
	pod1.Spec.Hostname = "example-hostname"
	node1 := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: pod1.Spec.NodeName,
			Labels: map[string]string{
				"topology.kubernetes.io/zone":   "us-central1-a",
				"topology.kubernetes.io/region": "us-central1",
			},
		},
	}

	testCases := map[string]struct {
		service             corev1.Service
		expectedAddressType discovery.AddressType
		expectedEndpoint    discovery.Endpoint
	}{
		"ipv4": {
			service:             svcv4,
			expectedAddressType: discovery.AddressTypeIPv4,
			expectedEndpoint: discovery.Endpoint{
				Addresses:  []string{"1.2.3.4"},
				Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
				Topology: map[string]string{
					"kubernetes.io/hostname":        "node-1",
					"topology.kubernetes.io/zone":   "us-central1-a",
					"topology.kubernetes.io/region": "us-central1",
				},
				TargetRef: &corev1.ObjectReference{
					Kind:      "Pod",
					Namespace: namespace,
					Name:      "pod1",
				},
			},
		},
		"ipv6": {
			service:             svcv6,
			expectedAddressType: discovery.AddressTypeIPv6,
			expectedEndpoint: discovery.Endpoint{
				Addresses:  []string{"1234::5678:0000:0000:9abc:def0"},
				Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
				Topology: map[string]string{
					"kubernetes.io/hostname":        "node-1",
					"topology.kubernetes.io/zone":   "us-central1-a",
					"topology.kubernetes.io/region": "us-central1",
				},
				TargetRef: &corev1.ObjectReference{
					Kind:      "Pod",
					Namespace: namespace,
					Name:      "pod1",
				},
			},
		},
		"ipv6-clusterip": {
			service:             svcv6ClusterIP,
			expectedAddressType: discovery.AddressTypeIPv6,
			expectedEndpoint: discovery.Endpoint{
				Addresses:  []string{"1234::5678:0000:0000:9abc:def0"},
				Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
				Topology: map[string]string{
					"kubernetes.io/hostname":        "node-1",
					"topology.kubernetes.io/zone":   "us-central1-a",
					"topology.kubernetes.io/region": "us-central1",
				},
				TargetRef: &corev1.ObjectReference{
					Kind:      "Pod",
					Namespace: namespace,
					Name:      "pod1",
				},
			},
		},
	}
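
	// Each case expects the published endpoint to carry the pod IP whose
	// family matches the Service: IPv4 by default, IPv6 when Spec.IPFamily or
	// an IPv6 ClusterIP indicates it (see the two PodIPs set on pod1 above).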
	for name, testCase := range testCases {
		t.Run(name, func(t *testing.T) {
			client := newClientset()
			setupMetrics()
			triggerTime := time.Now()
			r := newReconciler(client, []*corev1.Node{node1}, defaultMaxEndpointsPerSlice)
			reconcileHelper(t, r, &testCase.service, []*corev1.Pod{pod1}, []*discovery.EndpointSlice{}, triggerTime)

			if len(client.Actions()) != 1 {
				t.Errorf("Expected 1 clientset action, got %d", len(client.Actions()))
			}

			slices := fetchEndpointSlices(t, client, namespace)
			if len(slices) != 1 {
				t.Fatalf("Expected 1 EndpointSlice, got %d", len(slices))
			}

			slice := slices[0]
			if !strings.HasPrefix(slice.Name, testCase.service.Name) {
				t.Errorf("Expected EndpointSlice name to start with %s, got %s", testCase.service.Name, slice.Name)
			}
			if slice.Labels[discovery.LabelServiceName] != testCase.service.Name {
				t.Errorf("Expected EndpointSlice to have label set with %s value, got %s", testCase.service.Name, slice.Labels[discovery.LabelServiceName])
			}
			// expectedAddressType was previously set but never checked; assert it here.
			if slice.AddressType != testCase.expectedAddressType {
				t.Errorf("Expected EndpointSlice address type to be %s, got %s", testCase.expectedAddressType, slice.AddressType)
			}
			if slice.Annotations[corev1.EndpointsLastChangeTriggerTime] != triggerTime.Format(time.RFC3339Nano) {
				t.Errorf("Expected EndpointSlice trigger time annotation to be %s, got %s", triggerTime.Format(time.RFC3339Nano), slice.Annotations[corev1.EndpointsLastChangeTriggerTime])
			}
			if len(slice.Endpoints) != 1 {
				t.Fatalf("Expected 1 Endpoint, got %d", len(slice.Endpoints))
			}

			endpoint := slice.Endpoints[0]
			if !reflect.DeepEqual(endpoint, testCase.expectedEndpoint) {
				t.Errorf("Expected endpoint: %+v, got: %+v", testCase.expectedEndpoint, endpoint)
			}

			expectTrackedResourceVersion(t, r.endpointSliceTracker, &slice, "100")
			expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
		})
	}
}

// Given an existing endpoint slice and no pods matching the service, the
// existing slice should be updated to a placeholder (not deleted)
func TestReconcile1EndpointSlice(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"
	svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
	endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)

	_, createErr := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice1, metav1.CreateOptions{})
	assert.Nil(t, createErr, "Expected no error creating endpoint slice")

	numActionsBefore := len(client.Actions())
	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, []*corev1.Pod{}, []*discovery.EndpointSlice{endpointSlice1}, time.Now())
	assert.Len(t, client.Actions(), numActionsBefore+1, "Expected 1 additional clientset action")
	actions := client.Actions()
	assert.True(t, actions[numActionsBefore].Matches("update", "endpointslices"), "Action should be update endpoint slice")

	slices := fetchEndpointSlices(t, client, namespace)
	assert.Len(t, slices, 1, "Expected 1 endpoint slice")
	assert.Regexp(t, "^"+svc.Name, slices[0].Name)
	assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName])
	assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
	assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
	expectTrackedResourceVersion(t, r.endpointSliceTracker, &slices[0], "200")
	expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0})
}

// When a Service has PublishNotReadyAddresses set to true, corresponding
// Endpoints should be considered ready, even if the backing Pod is not.
func TestReconcile1EndpointSlicePublishNotReadyAddresses(t *testing.T) {
	client := newClientset()
	namespace := "test"
	svc, _ := newServiceAndEndpointMeta("foo", namespace)
	svc.Spec.PublishNotReadyAddresses = true

	// start with 50 pods, 1/3 not ready
	pods := []*corev1.Pod{}
	for i := 0; i < 50; i++ {
		ready := !(i%3 == 0)
		pods = append(pods, newPod(i, namespace, ready, 1))
	}

	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())

	// Only 1 action, an EndpointSlice create
	assert.Len(t, client.Actions(), 1, "Expected 1 additional clientset action")
	expectActions(t, client.Actions(), 1, "create", "endpointslices")

	// Every endpoint should be marked ready, even those backed by unready pods
	endpointSlices := fetchEndpointSlices(t, client, namespace)
	for _, endpointSlice := range endpointSlices {
		for i, endpoint := range endpointSlice.Endpoints {
			if !*endpoint.Conditions.Ready {
				t.Errorf("Expected endpoints[%d] to be ready", i)
			}
		}
	}
	expectUnorderedSlicesWithLengths(t, endpointSlices, []int{50})
}

// a simple use case with 250 pods matching a service and no existing slices
// reconcile should create 3 slices, completely filling 2 of them
func TestReconcileManyPods(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"
	svc, _ := newServiceAndEndpointMeta("foo", namespace)

	// start with 250 pods
	pods := []*corev1.Pod{}
	for i := 0; i < 250; i++ {
		ready := !(i%3 == 0)
		pods = append(pods, newPod(i, namespace, ready, 1))
	}

	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())

	// This is an ideal scenario where only 3 actions are required, and they're all creates
	assert.Len(t, client.Actions(), 3, "Expected 3 additional clientset actions")
	expectActions(t, client.Actions(), 3, "create", "endpointslices")

	// Two endpoint slices should be completely full, the remainder should be in another one
	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50})
	expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 250, removedPerSync: 0, numCreated: 3, numUpdated: 0, numDeleted: 0})
}

// now with preexisting slices, we have 250 pods matching a service
// the first endpoint slice contains 62 endpoints, all desired
// the second endpoint slice contains 61 endpoints, all desired
// that leaves 127 to add
// to minimize writes, our strategy is to create new slices for multiples of 100
// that leaves 27 to drop in an existing slice
// dropping them in the first slice will result in the slice being closest to full
// this approach requires 1 update + 1 create instead of 2 updates + 1 create
func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"
	svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)

	// start with 250 pods
	pods := []*corev1.Pod{}
	for i := 0; i < 250; i++ {
		ready := !(i%3 == 0)
		pods = append(pods, newPod(i, namespace, ready, 1))
	}

	// have approximately 1/4 in first slice
	endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
	for i := 1; i < len(pods)-4; i += 4 {
		endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
	}

	// have approximately 1/4 in second slice
	endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
	for i := 3; i < len(pods)-4; i += 4 {
		endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
	}

	existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
	cmc := newCacheMutationCheck(existingSlices)
	createEndpointSlices(t, client, namespace, existingSlices)

	numActionsBefore := len(client.Actions())
	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())

	actions := client.Actions()
	assert.Equal(t, numActionsBefore+2, len(actions), "Expected 2 additional client actions as part of reconcile")
	assert.True(t, actions[numActionsBefore].Matches("create", "endpointslices"), "First action should be create endpoint slice")
	assert.True(t, actions[numActionsBefore+1].Matches("update", "endpointslices"), "Second action should be update endpoint slice")

	// 1 new slice (0->100) + 1 updated slice (62->89)
	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100})
	expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0})

	// ensure cache mutation has not occurred
	cmc.Check(t)
}

// now with preexisting slices, we have 300 pods matching a service
// this scenario will show some less ideal allocation
// the first endpoint slice contains 74 endpoints, all desired
// the second endpoint slice contains 74 endpoints, all desired
// that leaves 152 to add
// to minimize writes, our strategy is to create new slices for multiples of 100
// that leaves 52 to drop in an existing slice
// that capacity could fit if split in the 2 existing slices
// to minimize writes though, reconcile creates a new slice with those 52 endpoints
// this approach requires 2 creates instead of 2 updates + 1 create
func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"
	svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)

	// start with 300 pods
	pods := []*corev1.Pod{}
	for i := 0; i < 300; i++ {
		ready := !(i%3 == 0)
		pods = append(pods, newPod(i, namespace, ready, 1))
	}

	// have approximately 1/4 in first slice
	endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
	for i := 1; i < len(pods)-4; i += 4 {
		endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
	}

	// have approximately 1/4 in second slice
	endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
	for i := 3; i < len(pods)-4; i += 4 {
		endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
	}

	existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
	cmc := newCacheMutationCheck(existingSlices)
	createEndpointSlices(t, client, namespace, existingSlices)

	numActionsBefore := len(client.Actions())
	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())

	actions := client.Actions()
	assert.Equal(t, numActionsBefore+2, len(actions), "Expected 2 additional client actions as part of reconcile")
	expectActions(t, client.Actions(), 2, "create", "endpointslices")

	// 2 new slices (100, 52) in addition to existing slices (74, 74)
	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52})
	expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0})

	// ensure cache mutation has not occurred
	cmc.Check(t)
}

// In some cases, such as a service port change, all slices for that service will require a change
// This test ensures that we are updating those slices and not calling create + delete for each
func TestReconcileEndpointSlicesUpdating(t *testing.T) {
	client := newClientset()
	namespace := "test"
	svc, _ := newServiceAndEndpointMeta("foo", namespace)

	// start with 250 pods
	pods := []*corev1.Pod{}
	for i := 0; i < 250; i++ {
		ready := !(i%3 == 0)
		pods = append(pods, newPod(i, namespace, ready, 1))
	}

	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
	numActionsExpected := 3
	assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")

	slices := fetchEndpointSlices(t, client, namespace)
	numActionsExpected++ // fetching the slices records a list action on the fake clientset

	expectUnorderedSlicesWithLengths(t, slices, []int{100, 100, 50})

	svc.Spec.Ports[0].TargetPort.IntVal = 81
	reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{&slices[0], &slices[1], &slices[2]}, time.Now())
	numActionsExpected += 3
	assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")
	expectActions(t, client.Actions(), 3, "update", "endpointslices")
	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50})
}

// In this test, we start with 10 slices that only have 30 endpoints each
// An initial reconcile makes no changes (as desired to limit writes)
// When we change a service port, all slices will need to be updated in some way
// reconcile repacks the endpoints into 3 slices, and deletes the extras
func TestReconcileEndpointSlicesRecycling(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"
	svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)

	// start with 300 pods
	pods := []*corev1.Pod{}
	for i := 0; i < 300; i++ {
		ready := !(i%3 == 0)
		pods = append(pods, newPod(i, namespace, ready, 1))
	}

	// generate 10 existing slices with 30 pods/endpoints each
	existingSlices := []*discovery.EndpointSlice{}
	for i, pod := range pods {
		sliceNum := i / 30
		if i%30 == 0 {
			existingSlices = append(existingSlices, newEmptyEndpointSlice(sliceNum, namespace, endpointMeta, svc))
		}
		existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
	}

	cmc := newCacheMutationCheck(existingSlices)
	createEndpointSlices(t, client, namespace, existingSlices)

	numActionsBefore := len(client.Actions())
	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
	// initial reconcile should be a no-op; all pods are accounted for in slices, no repacking should be done
	assert.Equal(t, numActionsBefore+0, len(client.Actions()), "Expected 0 additional client actions as part of reconcile")

	// changing a service port should require all slices to be updated, time for a repack
	svc.Spec.Ports[0].TargetPort.IntVal = 81
	reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
	// this should reflect 3 updates + 7 deletes
	assert.Equal(t, numActionsBefore+10, len(client.Actions()), "Expected 10 additional client actions as part of reconcile")

	// thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices
	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100})
	expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7})

	// ensure cache mutation has not occurred
	cmc.Check(t)
}

// In this test, we want to verify that endpoints are added to a slice that will
// be closest to full after the operation, even when slices are already marked
// for update.
func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"
	svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
	existingSlices := []*discovery.EndpointSlice{}
	pods := []*corev1.Pod{}

	slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
	for i := 0; i < 80; i++ {
		pod := newPod(i, namespace, true, 1)
		slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
		pods = append(pods, pod)
	}
	existingSlices = append(existingSlices, slice1)

	slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
	for i := 100; i < 120; i++ {
		pod := newPod(i, namespace, true, 1)
		slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
		pods = append(pods, pod)
	}
	existingSlices = append(existingSlices, slice2)

	cmc := newCacheMutationCheck(existingSlices)
	createEndpointSlices(t, client, namespace, existingSlices)

	// ensure that endpoints in each slice will be marked for update.
	for i, pod := range pods {
		if i%10 == 0 {
			pod.Status.Conditions = []corev1.PodCondition{{
				Type:   corev1.PodReady,
				Status: corev1.ConditionFalse,
			}}
		}
	}

	// add a few additional endpoints - no more than could fit in either slice.
	for i := 200; i < 215; i++ {
		pods = append(pods, newPod(i, namespace, true, 1))
	}

	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())

	// ensure that both endpoint slices have been updated
	expectActions(t, client.Actions(), 2, "update", "endpointslices")
	expectMetrics(t, expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 115, addedPerSync: 15, removedPerSync: 0, numCreated: 0, numUpdated: 2, numDeleted: 0})

	// additional pods should get added to fuller slice
	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20})

	// ensure cache mutation has not occurred
	cmc.Check(t)
}

// In this test, we want to verify that old EndpointSlices with a deprecated IP
// address type will be replaced with a newer IPv4 type.
func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"

	svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
	endpointMeta.AddressType = discovery.AddressTypeIP

	existingSlices := []*discovery.EndpointSlice{}
	pods := []*corev1.Pod{}

	slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
	for i := 0; i < 80; i++ {
		pod := newPod(i, namespace, true, 1)
		slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
		pods = append(pods, pod)
	}
	existingSlices = append(existingSlices, slice1)

	slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
	for i := 100; i < 150; i++ {
		pod := newPod(i, namespace, true, 1)
		slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
		pods = append(pods, pod)
	}
	existingSlices = append(existingSlices, slice2)

	createEndpointSlices(t, client, namespace, existingSlices)

	cmc := newCacheMutationCheck(existingSlices)
	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())

	// ensure that both original endpoint slices have been deleted
	expectActions(t, client.Actions(), 2, "delete", "endpointslices")

	endpointSlices := fetchEndpointSlices(t, client, namespace)

	// since this involved replacing both EndpointSlices, the result should be
	// perfectly packed.
	expectUnorderedSlicesWithLengths(t, endpointSlices, []int{100, 30})

	for _, endpointSlice := range endpointSlices {
		if endpointSlice.AddressType != discovery.AddressTypeIPv4 {
			t.Errorf("Expected address type to be IPv4, got %s", endpointSlice.AddressType)
		}
	}

	// ensure cache mutation has not occurred
	cmc.Check(t)
}

// Named ports can map to different port numbers on different pods.
// This test ensures that EndpointSlices are grouped correctly in that case.
func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"

	portNameIntStr := intstr.IntOrString{
		Type:   intstr.String,
		StrVal: "http",
	}

	svc := corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "named-port-example", Namespace: namespace},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{{
				TargetPort: portNameIntStr,
				Protocol:   corev1.ProtocolTCP,
			}},
			Selector: map[string]string{"foo": "bar"},
		},
	}

	// start with 300 pods
	pods := []*corev1.Pod{}
	for i := 0; i < 300; i++ {
		ready := !(i%3 == 0)
		portOffset := i % 5
		pod := newPod(i, namespace, ready, 1)
		pod.Spec.Containers[0].Ports = []corev1.ContainerPort{{
			Name:          portNameIntStr.StrVal,
			ContainerPort: int32(8080 + portOffset),
			Protocol:      corev1.ProtocolTCP,
		}}
		pods = append(pods, pod)
	}

	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())

	// reconcile should create 5 endpoint slices
	assert.Equal(t, 5, len(client.Actions()), "Expected 5 client actions as part of reconcile")
	expectActions(t, client.Actions(), 5, "create", "endpointslices")
	expectMetrics(t, expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 5, numUpdated: 0, numDeleted: 0})

	fetchedSlices := fetchEndpointSlices(t, client, namespace)

	// each slice should have 60 endpoints to match 5 unique variations of named port mapping
	expectUnorderedSlicesWithLengths(t, fetchedSlices, []int{60, 60, 60, 60, 60})

	// generate data structures for expected slice ports and address types
	protoTCP := corev1.ProtocolTCP
	expectedSlices := []discovery.EndpointSlice{}
	for i := range fetchedSlices {
		expectedSlices = append(expectedSlices, discovery.EndpointSlice{
			Ports: []discovery.EndpointPort{{
				Name:     utilpointer.StringPtr(""),
				Protocol: &protoTCP,
				Port:     utilpointer.Int32Ptr(int32(8080 + i)),
			}},
			AddressType: discovery.AddressTypeIPv4,
		})
	}

	// slices fetched should match expected address type and ports
	expectUnorderedSlicesWithTopLevelAttrs(t, fetchedSlices, expectedSlices)
}

// This test ensures that the maxEndpointsPerSlice configuration results in an
// appropriate distribution of endpoints among slices
func TestReconcileMaxEndpointsPerSlice(t *testing.T) {
	namespace := "test"
	svc, _ := newServiceAndEndpointMeta("foo", namespace)

	// start with 250 pods
	pods := []*corev1.Pod{}
	for i := 0; i < 250; i++ {
		ready := !(i%3 == 0)
		pods = append(pods, newPod(i, namespace, ready, 1))
	}
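
	// Each case reconciles the same 250 pods under a different cap on
	// endpoints per slice.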
	testCases := []struct {
		maxEndpointsPerSlice int32
		expectedSliceLengths []int
		expectedMetricValues expectedMetrics
	}{
		{
			maxEndpointsPerSlice: int32(50),
			expectedSliceLengths: []int{50, 50, 50, 50, 50},
			expectedMetricValues: expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 250, addedPerSync: 250, numCreated: 5},
		}, {
			maxEndpointsPerSlice: int32(80),
			expectedSliceLengths: []int{80, 80, 80, 10},
			expectedMetricValues: expectedMetrics{desiredSlices: 4, actualSlices: 4, desiredEndpoints: 250, addedPerSync: 250, numCreated: 4},
		}, {
			maxEndpointsPerSlice: int32(150),
			expectedSliceLengths: []int{150, 100},
			expectedMetricValues: expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 250, addedPerSync: 250, numCreated: 2},
		}, {
			maxEndpointsPerSlice: int32(250),
			expectedSliceLengths: []int{250},
			expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1},
		}, {
			maxEndpointsPerSlice: int32(500),
			expectedSliceLengths: []int{250},
			expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1},
		},
	}

	for _, testCase := range testCases {
		t.Run(fmt.Sprintf("maxEndpointsPerSlice: %d", testCase.maxEndpointsPerSlice), func(t *testing.T) {
			client := newClientset()
			setupMetrics()
			r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, testCase.maxEndpointsPerSlice)
			reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
			expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), testCase.expectedSliceLengths)
			expectMetrics(t, testCase.expectedMetricValues)
		})
	}
}

func TestReconcileEndpointSlicesMetrics(t *testing.T) {
	client := newClientset()
	setupMetrics()
	namespace := "test"
	svc, _ := newServiceAndEndpointMeta("foo", namespace)

	// start with 20 pods
	pods := []*corev1.Pod{}
	for i := 0; i < 20; i++ {
		pods = append(pods, newPod(i, namespace, true, 1))
	}

	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
	reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())

	actions := client.Actions()
	assert.Equal(t, 1, len(actions), "Expected 1 additional client action as part of reconcile")
	assert.True(t, actions[0].Matches("create", "endpointslices"), "First action should be create endpoint slice")
	expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 20, addedPerSync: 20, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})

	fetchedSlices := fetchEndpointSlices(t, client, namespace)
	reconcileHelper(t, r, &svc, pods[0:10], []*discovery.EndpointSlice{&fetchedSlices[0]}, time.Now())
	expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 10, addedPerSync: 20, removedPerSync: 10, numCreated: 1, numUpdated: 1, numDeleted: 0})
}

// Test Helpers
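
// newReconciler builds a reconciler around a fake clientset and a node lister
// seeded from the given nodes; tests drive reconcile() directly through
// reconcileHelper instead of running the full controller loop.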
func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *reconciler {
	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
	nodeInformer := informerFactory.Core().V1().Nodes()
	indexer := nodeInformer.Informer().GetIndexer()
	for _, node := range nodes {
		indexer.Add(node)
	}

	return &reconciler{
		client:               client,
		nodeLister:           corelisters.NewNodeLister(indexer),
		maxEndpointsPerSlice: maxEndpointsPerSlice,
		endpointSliceTracker: newEndpointSliceTracker(),
		metricsCache:         metrics.NewCache(maxEndpointsPerSlice),
	}
}

// ensures endpoint slices exist with the desired set of lengths
func expectUnorderedSlicesWithLengths(t *testing.T, endpointSlices []discovery.EndpointSlice, expectedLengths []int) {
	assert.Len(t, endpointSlices, len(expectedLengths), "Expected %d endpoint slices", len(expectedLengths))

	lengthsWithNoMatch := []int{}
	desiredLengths := expectedLengths
	actualLengths := []int{}
	for _, endpointSlice := range endpointSlices {
		actualLen := len(endpointSlice.Endpoints)
		actualLengths = append(actualLengths, actualLen)
		matchFound := false
		for i := 0; i < len(desiredLengths); i++ {
			if desiredLengths[i] == actualLen {
				matchFound = true
				desiredLengths = append(desiredLengths[:i], desiredLengths[i+1:]...)
				break
			}
		}

		if !matchFound {
			lengthsWithNoMatch = append(lengthsWithNoMatch, actualLen)
		}
	}

	if len(lengthsWithNoMatch) > 0 || len(desiredLengths) > 0 {
		t.Errorf("Actual slice lengths (%v) don't match expected (%v)", actualLengths, expectedLengths)
	}
}

// ensures endpoint slices exist with the desired set of ports and address types
func expectUnorderedSlicesWithTopLevelAttrs(t *testing.T, endpointSlices []discovery.EndpointSlice, expectedSlices []discovery.EndpointSlice) {
	t.Helper()
	assert.Len(t, endpointSlices, len(expectedSlices), "Expected %d endpoint slices", len(expectedSlices))

	slicesWithNoMatch := []discovery.EndpointSlice{}
	for _, endpointSlice := range endpointSlices {
		matchFound := false
		for i := 0; i < len(expectedSlices); i++ {
			if portsAndAddressTypeEqual(expectedSlices[i], endpointSlice) {
				matchFound = true
				expectedSlices = append(expectedSlices[:i], expectedSlices[i+1:]...)
				break
			}
		}

		if !matchFound {
			slicesWithNoMatch = append(slicesWithNoMatch, endpointSlice)
		}
	}

	assert.Len(t, slicesWithNoMatch, 0, "EndpointSlice(s) found without matching attributes")
	assert.Len(t, expectedSlices, 0, "Expected slice(s) not found in EndpointSlices")
}
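
// expectActions asserts that the last num actions recorded by the fake
// clientset match the given verb and resource.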
func expectActions(t *testing.T, actions []k8stesting.Action, num int, verb, resource string) {
	t.Helper()
	for i := 0; i < num; i++ {
		relativePos := len(actions) - i - 1
		assert.Equal(t, verb, actions[relativePos].GetVerb(), "Expected action -%d verb to be %s", i, verb)
		assert.Equal(t, resource, actions[relativePos].GetResource().Resource, "Expected action -%d resource to be %s", i, resource)
	}
}

func expectTrackedResourceVersion(t *testing.T, tracker *endpointSliceTracker, slice *discovery.EndpointSlice, expectedRV string) {
	rrv := tracker.relatedResourceVersions(slice)
	rv, tracked := rrv[slice.Name]
	if !tracked {
		t.Fatalf("Expected EndpointSlice %s to be tracked", slice.Name)
	}
	if rv != expectedRV {
		t.Errorf("Expected ResourceVersion of %s to be %s, got %s", slice.Name, expectedRV, rv)
	}
}

func portsAndAddressTypeEqual(slice1, slice2 discovery.EndpointSlice) bool {
	return apiequality.Semantic.DeepEqual(slice1.Ports, slice2.Ports) && apiequality.Semantic.DeepEqual(slice1.AddressType, slice2.AddressType)
}

func createEndpointSlices(t *testing.T, client *fake.Clientset, namespace string, endpointSlices []*discovery.EndpointSlice) {
	t.Helper()
	for _, endpointSlice := range endpointSlices {
		_, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("Expected no error creating Endpoint Slice, got: %v", err)
		}
	}
}

func fetchEndpointSlices(t *testing.T, client *fake.Clientset, namespace string) []discovery.EndpointSlice {
	t.Helper()
	fetchedSlices, err := client.DiscoveryV1beta1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		t.Fatalf("Expected no error fetching Endpoint Slices, got: %v", err)
		return []discovery.EndpointSlice{}
	}
	return fetchedSlices.Items
}

func reconcileHelper(t *testing.T, r *reconciler, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) {
	t.Helper()
	err := r.reconcile(service, pods, existingSlices, triggerTime)
	if err != nil {
		t.Fatalf("Expected no error reconciling Endpoint Slices, got: %v", err)
	}
}

// Metrics helpers
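
// Note: addedPerSync and removedPerSync are compared against histogram sample
// sums, so within a single test they accumulate across reconcile() calls
// (TestReconcileEndpointSlicesMetrics relies on this).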
type expectedMetrics struct {
	desiredSlices    int
	actualSlices     int
	desiredEndpoints int
	addedPerSync     int
	removedPerSync   int
	numCreated       int
	numUpdated       int
	numDeleted       int
}

func expectMetrics(t *testing.T, em expectedMetrics) {
	t.Helper()

	actualDesiredSlices := getGaugeMetricValue(t, metrics.DesiredEndpointSlices.WithLabelValues())
	if actualDesiredSlices != float64(em.desiredSlices) {
		t.Errorf("Expected desiredEndpointSlices to be %d, got %v", em.desiredSlices, actualDesiredSlices)
	}

	actualNumSlices := getGaugeMetricValue(t, metrics.NumEndpointSlices.WithLabelValues())
	if actualNumSlices != float64(em.actualSlices) {
		t.Errorf("Expected numEndpointSlices to be %d, got %v", em.actualSlices, actualNumSlices)
	}

	actualEndpointsDesired := getGaugeMetricValue(t, metrics.EndpointsDesired.WithLabelValues())
	if actualEndpointsDesired != float64(em.desiredEndpoints) {
		t.Errorf("Expected desiredEndpoints to be %d, got %v", em.desiredEndpoints, actualEndpointsDesired)
	}

	actualAddedPerSync := getHistogramMetricValue(t, metrics.EndpointsAddedPerSync.WithLabelValues())
	if actualAddedPerSync != float64(em.addedPerSync) {
		t.Errorf("Expected endpointsAddedPerSync to be %d, got %v", em.addedPerSync, actualAddedPerSync)
	}

	actualRemovedPerSync := getHistogramMetricValue(t, metrics.EndpointsRemovedPerSync.WithLabelValues())
	if actualRemovedPerSync != float64(em.removedPerSync) {
		t.Errorf("Expected endpointsRemovedPerSync to be %d, got %v", em.removedPerSync, actualRemovedPerSync)
	}

	actualCreated := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("create"))
	if actualCreated != float64(em.numCreated) {
		t.Errorf("Expected endpointSliceChangesCreated to be %d, got %v", em.numCreated, actualCreated)
	}

	actualUpdated := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("update"))
	if actualUpdated != float64(em.numUpdated) {
		t.Errorf("Expected endpointSliceChangesUpdated to be %d, got %v", em.numUpdated, actualUpdated)
	}

	actualDeleted := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("delete"))
	if actualDeleted != float64(em.numDeleted) {
		t.Errorf("Expected endpointSliceChangesDeleted to be %d, got %v", em.numDeleted, actualDeleted)
	}
}
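
// setupMetrics registers the endpointslice metrics and deletes any previously
// recorded samples so each test observes values starting from zero.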
func setupMetrics() {
	metrics.RegisterMetrics()
	metrics.NumEndpointSlices.Delete(map[string]string{})
	metrics.DesiredEndpointSlices.Delete(map[string]string{})
	metrics.EndpointsDesired.Delete(map[string]string{})
	metrics.EndpointsAddedPerSync.Delete(map[string]string{})
	metrics.EndpointsRemovedPerSync.Delete(map[string]string{})
	metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"})
	metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"})
	metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"})
}

func getGaugeMetricValue(t *testing.T, metric compmetrics.GaugeMetric) float64 {
	t.Helper()
	metricProto := &dto.Metric{}
	if err := metric.Write(metricProto); err != nil {
		t.Errorf("Error writing metric: %v", err)
	}
	return metricProto.Gauge.GetValue()
}

func getCounterMetricValue(t *testing.T, metric compmetrics.CounterMetric) float64 {
	t.Helper()
	metricProto := &dto.Metric{}
	// CounterMetric doesn't expose Write directly; go through the underlying Metric interface.
	if err := metric.(compmetrics.Metric).Write(metricProto); err != nil {
		t.Errorf("Error writing metric: %v", err)
	}
	return metricProto.Counter.GetValue()
}

func getHistogramMetricValue(t *testing.T, metric compmetrics.ObserverMetric) float64 {
	t.Helper()
	metricProto := &dto.Metric{}
	// ObserverMetric doesn't expose Write directly; go through the underlying Metric interface.
	if err := metric.(compmetrics.Metric).Write(metricProto); err != nil {
		t.Errorf("Error writing metric: %v", err)
	}
	return metricProto.Histogram.GetSampleSum()
}