endpointslicecache_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package proxy
  14. import (
  15. "fmt"
  16. "reflect"
  17. "testing"
  18. "k8s.io/api/core/v1"
  19. discovery "k8s.io/api/discovery/v1beta1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/runtime"
  22. "k8s.io/apimachinery/pkg/types"
  23. utilpointer "k8s.io/utils/pointer"
  24. )
  25. func TestEndpointsMapFromESC(t *testing.T) {
  26. testCases := map[string]struct {
  27. endpointSlices []*discovery.EndpointSlice
  28. hostname string
  29. namespacedName types.NamespacedName
  30. expectedMap map[ServicePortName][]*BaseEndpointInfo
  31. }{
  32. "1 slice, 2 hosts, ports 80,443": {
  33. namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
  34. hostname: "host1",
  35. endpointSlices: []*discovery.EndpointSlice{
  36. generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
  37. },
  38. expectedMap: map[ServicePortName][]*BaseEndpointInfo{
  39. makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
  40. &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false},
  41. &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
  42. &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false},
  43. },
  44. makeServicePortName("ns1", "svc1", "port-1", v1.ProtocolTCP): {
  45. &BaseEndpointInfo{Endpoint: "10.0.1.1:443", IsLocal: false},
  46. &BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true},
  47. &BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: false},
  48. },
  49. },
  50. },
  51. "2 slices, same port": {
  52. namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
  53. endpointSlices: []*discovery.EndpointSlice{
  54. generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  55. generateEndpointSlice("svc1", "ns1", 2, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  56. },
  57. expectedMap: map[ServicePortName][]*BaseEndpointInfo{
  58. makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
  59. &BaseEndpointInfo{Endpoint: "10.0.1.1:80"},
  60. &BaseEndpointInfo{Endpoint: "10.0.1.2:80"},
  61. &BaseEndpointInfo{Endpoint: "10.0.1.3:80"},
  62. &BaseEndpointInfo{Endpoint: "10.0.2.1:80"},
  63. &BaseEndpointInfo{Endpoint: "10.0.2.2:80"},
  64. &BaseEndpointInfo{Endpoint: "10.0.2.3:80"},
  65. },
  66. },
  67. },
  68. // 2 slices, with some overlapping endpoints, result should be a union
  69. // of the 2.
  70. "2 overlapping slices, same port": {
  71. namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
  72. endpointSlices: []*discovery.EndpointSlice{
  73. generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  74. generateEndpointSlice("svc1", "ns1", 1, 4, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  75. },
  76. expectedMap: map[ServicePortName][]*BaseEndpointInfo{
  77. makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
  78. &BaseEndpointInfo{Endpoint: "10.0.1.1:80"},
  79. &BaseEndpointInfo{Endpoint: "10.0.1.2:80"},
  80. &BaseEndpointInfo{Endpoint: "10.0.1.3:80"},
  81. &BaseEndpointInfo{Endpoint: "10.0.1.4:80"},
  82. },
  83. },
  84. },
  85. // 2 slices with all endpoints overlapping, more unready in first than
  86. // second. If an endpoint is marked ready, we add it to the
  87. // EndpointsMap, even if conditions.Ready isn't true for another
  88. // matching endpoint
  89. "2 slices, overlapping endpoints, some endpoints unready in 1 or both": {
  90. namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
  91. endpointSlices: []*discovery.EndpointSlice{
  92. generateEndpointSlice("svc1", "ns1", 1, 10, 3, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  93. generateEndpointSlice("svc1", "ns1", 1, 10, 6, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  94. },
  95. expectedMap: map[ServicePortName][]*BaseEndpointInfo{
  96. makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
  97. &BaseEndpointInfo{Endpoint: "10.0.1.10:80"},
  98. &BaseEndpointInfo{Endpoint: "10.0.1.1:80"},
  99. &BaseEndpointInfo{Endpoint: "10.0.1.2:80"},
  100. &BaseEndpointInfo{Endpoint: "10.0.1.3:80"},
  101. &BaseEndpointInfo{Endpoint: "10.0.1.4:80"},
  102. &BaseEndpointInfo{Endpoint: "10.0.1.5:80"},
  103. &BaseEndpointInfo{Endpoint: "10.0.1.7:80"},
  104. &BaseEndpointInfo{Endpoint: "10.0.1.8:80"},
  105. &BaseEndpointInfo{Endpoint: "10.0.1.9:80"},
  106. },
  107. },
  108. },
  109. "2 slices, overlapping endpoints, all unready": {
  110. namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
  111. endpointSlices: []*discovery.EndpointSlice{
  112. generateEndpointSlice("svc1", "ns1", 1, 10, 1, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  113. generateEndpointSlice("svc1", "ns1", 1, 10, 1, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  114. },
  115. expectedMap: map[ServicePortName][]*BaseEndpointInfo{},
  116. },
  117. "3 slices with different services and namespaces": {
  118. namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
  119. endpointSlices: []*discovery.EndpointSlice{
  120. generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  121. generateEndpointSlice("svc2", "ns1", 2, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  122. generateEndpointSlice("svc1", "ns2", 3, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
  123. },
  124. expectedMap: map[ServicePortName][]*BaseEndpointInfo{
  125. makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
  126. &BaseEndpointInfo{Endpoint: "10.0.1.1:80"},
  127. &BaseEndpointInfo{Endpoint: "10.0.1.2:80"},
  128. &BaseEndpointInfo{Endpoint: "10.0.1.3:80"},
  129. },
  130. },
  131. },
  132. // Ensuring that nil port value will not break things. This will
  133. // represent all ports in the future, but that has not been implemented
  134. // yet.
  135. "Nil port should not break anything": {
  136. namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
  137. hostname: "host1",
  138. endpointSlices: []*discovery.EndpointSlice{
  139. generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{nil}),
  140. },
  141. expectedMap: map[ServicePortName][]*BaseEndpointInfo{},
  142. },
  143. }
  144. for name, tc := range testCases {
  145. t.Run(name, func(t *testing.T) {
  146. esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
  147. cmc := newCacheMutationCheck(tc.endpointSlices)
  148. for _, endpointSlice := range tc.endpointSlices {
  149. esCache.updatePending(endpointSlice, false)
  150. }
  151. compareEndpointsMapsStr(t, esCache.getEndpointsMap(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending), tc.expectedMap)
  152. cmc.Check(t)
  153. })
  154. }
  155. }
  156. func TestEndpointInfoByServicePort(t *testing.T) {
  157. testCases := map[string]struct {
  158. namespacedName types.NamespacedName
  159. endpointSlices []*discovery.EndpointSlice
  160. hostname string
  161. expectedMap spToEndpointMap
  162. }{
  163. "simple use case with 3 endpoints": {
  164. namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
  165. hostname: "host1",
  166. endpointSlices: []*discovery.EndpointSlice{
  167. generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80)}),
  168. },
  169. expectedMap: spToEndpointMap{
  170. makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
  171. "10.0.1.1": &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}},
  172. "10.0.1.2": &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true, Topology: map[string]string{"kubernetes.io/hostname": "host1"}},
  173. "10.0.1.3": &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}},
  174. },
  175. },
  176. },
  177. }
  178. for name, tc := range testCases {
  179. t.Run(name, func(t *testing.T) {
  180. esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
  181. for _, endpointSlice := range tc.endpointSlices {
  182. esCache.updatePending(endpointSlice, false)
  183. }
  184. got := esCache.endpointInfoByServicePort(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending)
  185. if !reflect.DeepEqual(got, tc.expectedMap) {
  186. t.Errorf("endpointInfoByServicePort does not match. Want: %+v, Got: %+v", tc.expectedMap, got)
  187. }
  188. })
  189. }
  190. }
  191. func TestEsInfoChanged(t *testing.T) {
  192. p80 := int32(80)
  193. p443 := int32(443)
  194. tcpProto := v1.ProtocolTCP
  195. port80 := discovery.EndpointPort{Port: &p80, Name: utilpointer.StringPtr("http"), Protocol: &tcpProto}
  196. port443 := discovery.EndpointPort{Port: &p443, Name: utilpointer.StringPtr("https"), Protocol: &tcpProto}
  197. endpoint1 := discovery.Endpoint{Addresses: []string{"10.0.1.0"}}
  198. endpoint2 := discovery.Endpoint{Addresses: []string{"10.0.1.1"}}
  199. objMeta := metav1.ObjectMeta{
  200. Name: "foo",
  201. Namespace: "bar",
  202. Labels: map[string]string{discovery.LabelServiceName: "svc1"},
  203. }
  204. testCases := map[string]struct {
  205. cache *EndpointSliceCache
  206. initialSlice *discovery.EndpointSlice
  207. updatedSlice *discovery.EndpointSlice
  208. expectChanged bool
  209. }{
  210. "identical slices, ports only": {
  211. cache: NewEndpointSliceCache("", nil, nil, nil),
  212. initialSlice: &discovery.EndpointSlice{
  213. ObjectMeta: objMeta,
  214. Ports: []discovery.EndpointPort{port80},
  215. },
  216. updatedSlice: &discovery.EndpointSlice{
  217. ObjectMeta: objMeta,
  218. Ports: []discovery.EndpointPort{port80},
  219. },
  220. expectChanged: false,
  221. },
  222. "identical slices, ports out of order": {
  223. cache: NewEndpointSliceCache("", nil, nil, nil),
  224. initialSlice: &discovery.EndpointSlice{
  225. ObjectMeta: objMeta,
  226. Ports: []discovery.EndpointPort{port443, port80},
  227. },
  228. updatedSlice: &discovery.EndpointSlice{
  229. ObjectMeta: objMeta,
  230. Ports: []discovery.EndpointPort{port80, port443},
  231. },
  232. expectChanged: false,
  233. },
  234. "port removed": {
  235. cache: NewEndpointSliceCache("", nil, nil, nil),
  236. initialSlice: &discovery.EndpointSlice{
  237. ObjectMeta: objMeta,
  238. Ports: []discovery.EndpointPort{port443, port80},
  239. },
  240. updatedSlice: &discovery.EndpointSlice{
  241. ObjectMeta: objMeta,
  242. Ports: []discovery.EndpointPort{port443},
  243. },
  244. expectChanged: true,
  245. },
  246. "port added": {
  247. cache: NewEndpointSliceCache("", nil, nil, nil),
  248. initialSlice: &discovery.EndpointSlice{
  249. ObjectMeta: objMeta,
  250. Ports: []discovery.EndpointPort{port443},
  251. },
  252. updatedSlice: &discovery.EndpointSlice{
  253. ObjectMeta: objMeta,
  254. Ports: []discovery.EndpointPort{port443, port80},
  255. },
  256. expectChanged: true,
  257. },
  258. "identical with endpoints": {
  259. cache: NewEndpointSliceCache("", nil, nil, nil),
  260. initialSlice: &discovery.EndpointSlice{
  261. ObjectMeta: objMeta,
  262. Ports: []discovery.EndpointPort{port443},
  263. Endpoints: []discovery.Endpoint{endpoint1, endpoint2},
  264. },
  265. updatedSlice: &discovery.EndpointSlice{
  266. ObjectMeta: objMeta,
  267. Ports: []discovery.EndpointPort{port443},
  268. Endpoints: []discovery.Endpoint{endpoint1, endpoint2},
  269. },
  270. expectChanged: false,
  271. },
  272. "identical with endpoints out of order": {
  273. cache: NewEndpointSliceCache("", nil, nil, nil),
  274. initialSlice: &discovery.EndpointSlice{
  275. ObjectMeta: objMeta,
  276. Ports: []discovery.EndpointPort{port443},
  277. Endpoints: []discovery.Endpoint{endpoint1, endpoint2},
  278. },
  279. updatedSlice: &discovery.EndpointSlice{
  280. ObjectMeta: objMeta,
  281. Ports: []discovery.EndpointPort{port443},
  282. Endpoints: []discovery.Endpoint{endpoint2, endpoint1},
  283. },
  284. expectChanged: false,
  285. },
  286. "identical with endpoint added": {
  287. cache: NewEndpointSliceCache("", nil, nil, nil),
  288. initialSlice: &discovery.EndpointSlice{
  289. ObjectMeta: objMeta,
  290. Ports: []discovery.EndpointPort{port443},
  291. Endpoints: []discovery.Endpoint{endpoint1},
  292. },
  293. updatedSlice: &discovery.EndpointSlice{
  294. ObjectMeta: objMeta,
  295. Ports: []discovery.EndpointPort{port443},
  296. Endpoints: []discovery.Endpoint{endpoint2, endpoint1},
  297. },
  298. expectChanged: true,
  299. },
  300. }
  301. for name, tc := range testCases {
  302. t.Run(name, func(t *testing.T) {
  303. cmc := newCacheMutationCheck([]*discovery.EndpointSlice{tc.initialSlice})
  304. if tc.initialSlice != nil {
  305. tc.cache.updatePending(tc.initialSlice, false)
  306. tc.cache.checkoutChanges()
  307. }
  308. serviceKey, sliceKey, err := endpointSliceCacheKeys(tc.updatedSlice)
  309. if err != nil {
  310. t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err)
  311. }
  312. esInfo := newEndpointSliceInfo(tc.updatedSlice, false)
  313. changed := tc.cache.esInfoChanged(serviceKey, sliceKey, esInfo)
  314. if tc.expectChanged != changed {
  315. t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed)
  316. }
  317. cmc.Check(t)
  318. })
  319. }
  320. }
  321. func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, offset, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
  322. tcpProtocol := v1.ProtocolTCP
  323. endpointSlice := &discovery.EndpointSlice{
  324. ObjectMeta: metav1.ObjectMeta{
  325. Name: fmt.Sprintf("%s-%d", serviceName, sliceNum),
  326. Namespace: namespace,
  327. Labels: map[string]string{discovery.LabelServiceName: serviceName},
  328. },
  329. Ports: []discovery.EndpointPort{},
  330. AddressType: discovery.AddressTypeIPv4,
  331. Endpoints: []discovery.Endpoint{},
  332. }
  333. for i, portNum := range portNums {
  334. endpointSlice.Ports = append(endpointSlice.Ports, discovery.EndpointPort{
  335. Name: utilpointer.StringPtr(fmt.Sprintf("port-%d", i)),
  336. Port: portNum,
  337. Protocol: &tcpProtocol,
  338. })
  339. }
  340. for i := 1; i <= numEndpoints; i++ {
  341. endpoint := discovery.Endpoint{
  342. Addresses: []string{fmt.Sprintf("10.0.%d.%d", offset, i)},
  343. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(i%unreadyMod != 0)},
  344. }
  345. if len(hosts) > 0 {
  346. endpoint.Topology = map[string]string{
  347. "kubernetes.io/hostname": hosts[i%len(hosts)],
  348. }
  349. }
  350. endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpoint)
  351. }
  352. return endpointSlice
  353. }
  354. func generateEndpointSlice(serviceName, namespace string, sliceNum, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
  355. return generateEndpointSliceWithOffset(serviceName, namespace, sliceNum, sliceNum, numEndpoints, unreadyMod, hosts, portNums)
  356. }
  357. // cacheMutationCheck helps ensure that cached objects have not been changed
  358. // in any way throughout a test run.
  359. type cacheMutationCheck struct {
  360. objects []cacheObject
  361. }
  362. // cacheObject stores a reference to an original object as well as a deep copy
  363. // of that object to track any mutations in the original object.
  364. type cacheObject struct {
  365. original runtime.Object
  366. deepCopy runtime.Object
  367. }
  368. // newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
  369. func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
  370. cmc := cacheMutationCheck{}
  371. for _, endpointSlice := range endpointSlices {
  372. cmc.Add(endpointSlice)
  373. }
  374. return cmc
  375. }
  376. // Add appends a runtime.Object and a deep copy of that object into the
  377. // cacheMutationCheck.
  378. func (cmc *cacheMutationCheck) Add(o runtime.Object) {
  379. cmc.objects = append(cmc.objects, cacheObject{
  380. original: o,
  381. deepCopy: o.DeepCopyObject(),
  382. })
  383. }
  384. // Check verifies that no objects in the cacheMutationCheck have been mutated.
  385. func (cmc *cacheMutationCheck) Check(t *testing.T) {
  386. for _, o := range cmc.objects {
  387. if !reflect.DeepEqual(o.original, o.deepCopy) {
  388. // Cached objects can't be safely mutated and instead should be deep
  389. // copied before changed in any way.
  390. t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
  391. }
  392. }
  393. }