roundrobin_test.go 30 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package winuserspace
  14. import (
  15. "net"
  16. "testing"
  17. "k8s.io/api/core/v1"
  18. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  19. "k8s.io/apimachinery/pkg/types"
  20. "k8s.io/kubernetes/pkg/proxy"
  21. )
  22. func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
  23. loadBalancer := NewLoadBalancerRR()
  24. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
  25. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  26. if err == nil {
  27. t.Errorf("Didn't fail with non-existent service")
  28. }
  29. if len(endpoint) != 0 {
  30. t.Errorf("Got an endpoint")
  31. }
  32. }
  33. func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
  34. endpoint, err := loadBalancer.NextEndpoint(service, netaddr, false)
  35. if err != nil {
  36. t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
  37. }
  38. if endpoint != expected {
  39. t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
  40. }
  41. }
  42. func expectEndpointWithSessionAffinityReset(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
  43. endpoint, err := loadBalancer.NextEndpoint(service, netaddr, true)
  44. if err != nil {
  45. t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
  46. }
  47. if endpoint != expected {
  48. t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
  49. }
  50. }
  51. func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
  52. loadBalancer := NewLoadBalancerRR()
  53. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  54. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  55. if err == nil || len(endpoint) != 0 {
  56. t.Errorf("Didn't fail with non-existent service")
  57. }
  58. endpoints := &v1.Endpoints{
  59. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  60. Subsets: []v1.EndpointSubset{{
  61. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}},
  62. Ports: []v1.EndpointPort{{Name: "p", Port: 40}},
  63. }},
  64. }
  65. loadBalancer.OnEndpointsAdd(endpoints)
  66. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  67. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  68. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  69. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  70. }
  71. func stringsInSlice(haystack []string, needles ...string) bool {
  72. for _, needle := range needles {
  73. found := false
  74. for i := range haystack {
  75. if haystack[i] == needle {
  76. found = true
  77. break
  78. }
  79. }
  80. if found == false {
  81. return false
  82. }
  83. }
  84. return true
  85. }
  86. func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
  87. loadBalancer := NewLoadBalancerRR()
  88. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  89. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  90. if err == nil || len(endpoint) != 0 {
  91. t.Errorf("Didn't fail with non-existent service")
  92. }
  93. endpoints := &v1.Endpoints{
  94. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  95. Subsets: []v1.EndpointSubset{{
  96. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  97. Ports: []v1.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}},
  98. }},
  99. }
  100. loadBalancer.OnEndpointsAdd(endpoints)
  101. shuffledEndpoints := loadBalancer.services[service].endpoints
  102. if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
  103. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  104. }
  105. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
  106. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
  107. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
  108. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
  109. }
  110. func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
  111. loadBalancer := NewLoadBalancerRR()
  112. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  113. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
  114. endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
  115. if err == nil || len(endpoint) != 0 {
  116. t.Errorf("Didn't fail with non-existent service")
  117. }
  118. endpoints := &v1.Endpoints{
  119. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  120. Subsets: []v1.EndpointSubset{
  121. {
  122. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}},
  123. Ports: []v1.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 2}},
  124. },
  125. {
  126. Addresses: []v1.EndpointAddress{{IP: "endpoint3"}},
  127. Ports: []v1.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 4}},
  128. },
  129. },
  130. }
  131. loadBalancer.OnEndpointsAdd(endpoints)
  132. shuffledEndpoints := loadBalancer.services[serviceP].endpoints
  133. if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") {
  134. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  135. }
  136. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  137. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  138. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
  139. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  140. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  141. if !stringsInSlice(shuffledEndpoints, "endpoint1:2", "endpoint2:2", "endpoint3:4") {
  142. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  143. }
  144. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  145. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  146. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
  147. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  148. }
  149. func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
  150. loadBalancer := NewLoadBalancerRR()
  151. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  152. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
  153. endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
  154. if err == nil || len(endpoint) != 0 {
  155. t.Errorf("Didn't fail with non-existent service")
  156. }
  157. endpointsv1 := &v1.Endpoints{
  158. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  159. Subsets: []v1.EndpointSubset{
  160. {
  161. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}},
  162. Ports: []v1.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 10}},
  163. },
  164. {
  165. Addresses: []v1.EndpointAddress{{IP: "endpoint2"}},
  166. Ports: []v1.EndpointPort{{Name: "p", Port: 2}, {Name: "q", Port: 20}},
  167. },
  168. {
  169. Addresses: []v1.EndpointAddress{{IP: "endpoint3"}},
  170. Ports: []v1.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 30}},
  171. },
  172. },
  173. }
  174. loadBalancer.OnEndpointsAdd(endpointsv1)
  175. shuffledEndpoints := loadBalancer.services[serviceP].endpoints
  176. if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") {
  177. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  178. }
  179. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  180. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  181. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
  182. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  183. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  184. if !stringsInSlice(shuffledEndpoints, "endpoint1:10", "endpoint2:20", "endpoint3:30") {
  185. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  186. }
  187. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  188. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  189. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
  190. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  191. // Then update the configuration with one fewer endpoints, make sure
  192. // we start in the beginning again
  193. endpointsv2 := &v1.Endpoints{
  194. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  195. Subsets: []v1.EndpointSubset{
  196. {
  197. Addresses: []v1.EndpointAddress{{IP: "endpoint4"}},
  198. Ports: []v1.EndpointPort{{Name: "p", Port: 4}, {Name: "q", Port: 40}},
  199. },
  200. {
  201. Addresses: []v1.EndpointAddress{{IP: "endpoint5"}},
  202. Ports: []v1.EndpointPort{{Name: "p", Port: 5}, {Name: "q", Port: 50}},
  203. },
  204. },
  205. }
  206. loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
  207. shuffledEndpoints = loadBalancer.services[serviceP].endpoints
  208. if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") {
  209. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  210. }
  211. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  212. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  213. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  214. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  215. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  216. if !stringsInSlice(shuffledEndpoints, "endpoint4:40", "endpoint5:50") {
  217. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  218. }
  219. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  220. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  221. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  222. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  223. // Clear endpoints
  224. endpointsv3 := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
  225. loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
  226. endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false)
  227. if err == nil || len(endpoint) != 0 {
  228. t.Errorf("Didn't fail with non-existent service")
  229. }
  230. }
  231. func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
  232. loadBalancer := NewLoadBalancerRR()
  233. fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  234. barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"}
  235. endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil, false)
  236. if err == nil || len(endpoint) != 0 {
  237. t.Errorf("Didn't fail with non-existent service")
  238. }
  239. endpoints1 := &v1.Endpoints{
  240. ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace},
  241. Subsets: []v1.EndpointSubset{
  242. {
  243. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}, {IP: "endpoint3"}},
  244. Ports: []v1.EndpointPort{{Name: "p", Port: 123}},
  245. },
  246. },
  247. }
  248. endpoints2 := &v1.Endpoints{
  249. ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace},
  250. Subsets: []v1.EndpointSubset{
  251. {
  252. Addresses: []v1.EndpointAddress{{IP: "endpoint4"}, {IP: "endpoint5"}, {IP: "endpoint6"}},
  253. Ports: []v1.EndpointPort{{Name: "p", Port: 456}},
  254. },
  255. },
  256. }
  257. loadBalancer.OnEndpointsAdd(endpoints1)
  258. loadBalancer.OnEndpointsAdd(endpoints2)
  259. shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints
  260. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
  261. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
  262. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[2], nil)
  263. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
  264. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
  265. shuffledBarEndpoints := loadBalancer.services[barServiceP].endpoints
  266. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  267. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  268. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  269. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  270. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  271. // Then update the configuration by removing foo
  272. loadBalancer.OnEndpointsDelete(endpoints1)
  273. endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false)
  274. if err == nil || len(endpoint) != 0 {
  275. t.Errorf("Didn't fail with non-existent service")
  276. }
  277. // but bar is still there, and we continue RR from where we left off.
  278. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  279. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  280. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  281. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  282. }
  283. func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
  284. loadBalancer := NewLoadBalancerRR()
  285. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  286. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  287. if err == nil || len(endpoint) != 0 {
  288. t.Errorf("Didn't fail with non-existent service")
  289. }
  290. // Call NewService() before OnEndpointsUpdate()
  291. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  292. endpoints := &v1.Endpoints{
  293. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  294. Subsets: []v1.EndpointSubset{
  295. {Addresses: []v1.EndpointAddress{{IP: "endpoint1"}}, Ports: []v1.EndpointPort{{Port: 1}}},
  296. {Addresses: []v1.EndpointAddress{{IP: "endpoint2"}}, Ports: []v1.EndpointPort{{Port: 2}}},
  297. {Addresses: []v1.EndpointAddress{{IP: "endpoint3"}}, Ports: []v1.EndpointPort{{Port: 3}}},
  298. },
  299. }
  300. loadBalancer.OnEndpointsAdd(endpoints)
  301. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  302. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  303. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  304. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  305. if err != nil {
  306. t.Errorf("Didn't find a service for %s: %v", service, err)
  307. }
  308. expectEndpoint(t, loadBalancer, service, ep1, client1)
  309. expectEndpoint(t, loadBalancer, service, ep1, client1)
  310. expectEndpoint(t, loadBalancer, service, ep1, client1)
  311. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  312. if err != nil {
  313. t.Errorf("Didn't find a service for %s: %v", service, err)
  314. }
  315. expectEndpoint(t, loadBalancer, service, ep2, client2)
  316. expectEndpoint(t, loadBalancer, service, ep2, client2)
  317. expectEndpoint(t, loadBalancer, service, ep2, client2)
  318. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  319. if err != nil {
  320. t.Errorf("Didn't find a service for %s: %v", service, err)
  321. }
  322. expectEndpoint(t, loadBalancer, service, ep3, client3)
  323. expectEndpoint(t, loadBalancer, service, ep3, client3)
  324. expectEndpoint(t, loadBalancer, service, ep3, client3)
  325. expectEndpoint(t, loadBalancer, service, ep1, client1)
  326. expectEndpoint(t, loadBalancer, service, ep2, client2)
  327. expectEndpoint(t, loadBalancer, service, ep3, client3)
  328. expectEndpoint(t, loadBalancer, service, ep1, client1)
  329. expectEndpoint(t, loadBalancer, service, ep2, client2)
  330. expectEndpoint(t, loadBalancer, service, ep3, client3)
  331. }
  332. func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
  333. loadBalancer := NewLoadBalancerRR()
  334. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  335. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  336. if err == nil || len(endpoint) != 0 {
  337. t.Errorf("Didn't fail with non-existent service")
  338. }
  339. // Call OnEndpointsUpdate() before NewService()
  340. endpoints := &v1.Endpoints{
  341. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  342. Subsets: []v1.EndpointSubset{
  343. {Addresses: []v1.EndpointAddress{{IP: "endpoint1"}}, Ports: []v1.EndpointPort{{Port: 1}}},
  344. {Addresses: []v1.EndpointAddress{{IP: "endpoint2"}}, Ports: []v1.EndpointPort{{Port: 2}}},
  345. },
  346. }
  347. loadBalancer.OnEndpointsAdd(endpoints)
  348. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  349. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  350. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  351. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  352. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  353. if err != nil {
  354. t.Errorf("Didn't find a service for %s: %v", service, err)
  355. }
  356. expectEndpoint(t, loadBalancer, service, ep1, client1)
  357. expectEndpoint(t, loadBalancer, service, ep1, client1)
  358. expectEndpoint(t, loadBalancer, service, ep1, client1)
  359. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  360. if err != nil {
  361. t.Errorf("Didn't find a service for %s: %v", service, err)
  362. }
  363. expectEndpoint(t, loadBalancer, service, ep2, client2)
  364. expectEndpoint(t, loadBalancer, service, ep2, client2)
  365. expectEndpoint(t, loadBalancer, service, ep2, client2)
  366. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  367. if err != nil {
  368. t.Errorf("Didn't find a service for %s: %v", service, err)
  369. }
  370. expectEndpoint(t, loadBalancer, service, ep3, client3)
  371. expectEndpoint(t, loadBalancer, service, ep3, client3)
  372. expectEndpoint(t, loadBalancer, service, ep3, client3)
  373. expectEndpoint(t, loadBalancer, service, ep1, client1)
  374. expectEndpoint(t, loadBalancer, service, ep2, client2)
  375. expectEndpoint(t, loadBalancer, service, ep3, client3)
  376. expectEndpoint(t, loadBalancer, service, ep1, client1)
  377. expectEndpoint(t, loadBalancer, service, ep2, client2)
  378. expectEndpoint(t, loadBalancer, service, ep3, client3)
  379. }
  380. func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
  381. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  382. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  383. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  384. client4 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 4), Port: 0}
  385. client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0}
  386. client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
  387. loadBalancer := NewLoadBalancerRR()
  388. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  389. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  390. if err == nil || len(endpoint) != 0 {
  391. t.Errorf("Didn't fail with non-existent service")
  392. }
  393. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  394. endpointsv1 := &v1.Endpoints{
  395. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  396. Subsets: []v1.EndpointSubset{
  397. {
  398. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  399. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  400. },
  401. },
  402. }
  403. loadBalancer.OnEndpointsAdd(endpointsv1)
  404. shuffledEndpoints := loadBalancer.services[service].endpoints
  405. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  406. client1Endpoint := shuffledEndpoints[0]
  407. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  408. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  409. client2Endpoint := shuffledEndpoints[1]
  410. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  411. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
  412. client3Endpoint := shuffledEndpoints[2]
  413. endpointsv2 := &v1.Endpoints{
  414. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  415. Subsets: []v1.EndpointSubset{
  416. {
  417. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  418. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}},
  419. },
  420. },
  421. }
  422. loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
  423. shuffledEndpoints = loadBalancer.services[service].endpoints
  424. if client1Endpoint == "endpoint:3" {
  425. client1Endpoint = shuffledEndpoints[0]
  426. } else if client2Endpoint == "endpoint:3" {
  427. client2Endpoint = shuffledEndpoints[0]
  428. } else if client3Endpoint == "endpoint:3" {
  429. client3Endpoint = shuffledEndpoints[0]
  430. }
  431. expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
  432. expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
  433. expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
  434. endpointsv3 := &v1.Endpoints{
  435. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  436. Subsets: []v1.EndpointSubset{
  437. {
  438. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  439. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 4}},
  440. },
  441. },
  442. }
  443. loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
  444. shuffledEndpoints = loadBalancer.services[service].endpoints
  445. expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
  446. expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
  447. expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
  448. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4)
  449. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5)
  450. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6)
  451. }
  452. func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
  453. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  454. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  455. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  456. loadBalancer := NewLoadBalancerRR()
  457. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  458. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  459. if err == nil || len(endpoint) != 0 {
  460. t.Errorf("Didn't fail with non-existent service")
  461. }
  462. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  463. endpointsv1 := &v1.Endpoints{
  464. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  465. Subsets: []v1.EndpointSubset{
  466. {
  467. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  468. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  469. },
  470. },
  471. }
  472. loadBalancer.OnEndpointsAdd(endpointsv1)
  473. shuffledEndpoints := loadBalancer.services[service].endpoints
  474. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  475. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  476. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  477. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  478. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
  479. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  480. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  481. // Then update the configuration with one fewer endpoints, make sure
  482. // we start in the beginning again
  483. endpointsv2 := &v1.Endpoints{
  484. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  485. Subsets: []v1.EndpointSubset{
  486. {
  487. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  488. Ports: []v1.EndpointPort{{Port: 4}, {Port: 5}},
  489. },
  490. },
  491. }
  492. loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
  493. shuffledEndpoints = loadBalancer.services[service].endpoints
  494. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  495. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  496. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  497. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  498. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  499. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  500. // Clear endpoints
  501. endpointsv3 := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
  502. loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
  503. endpoint, err = loadBalancer.NextEndpoint(service, nil, false)
  504. if err == nil || len(endpoint) != 0 {
  505. t.Errorf("Didn't fail with non-existent service")
  506. }
  507. }
  508. func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
  509. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  510. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  511. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  512. loadBalancer := NewLoadBalancerRR()
  513. fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  514. endpoint, err := loadBalancer.NextEndpoint(fooService, nil, false)
  515. if err == nil || len(endpoint) != 0 {
  516. t.Errorf("Didn't fail with non-existent service")
  517. }
  518. loadBalancer.NewService(fooService, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  519. endpoints1 := &v1.Endpoints{
  520. ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
  521. Subsets: []v1.EndpointSubset{
  522. {
  523. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  524. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  525. },
  526. },
  527. }
  528. barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""}
  529. loadBalancer.NewService(barService, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  530. endpoints2 := &v1.Endpoints{
  531. ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
  532. Subsets: []v1.EndpointSubset{
  533. {
  534. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  535. Ports: []v1.EndpointPort{{Port: 4}, {Port: 5}},
  536. },
  537. },
  538. }
  539. loadBalancer.OnEndpointsAdd(endpoints1)
  540. loadBalancer.OnEndpointsAdd(endpoints2)
  541. shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
  542. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  543. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  544. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  545. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  546. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  547. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  548. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  549. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  550. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  551. shuffledBarEndpoints := loadBalancer.services[barService].endpoints
  552. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  553. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  554. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  555. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  556. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  557. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  558. // Then update the configuration by removing foo
  559. loadBalancer.OnEndpointsDelete(endpoints1)
  560. endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false)
  561. if err == nil || len(endpoint) != 0 {
  562. t.Errorf("Didn't fail with non-existent service")
  563. }
  564. // but bar is still there, and we continue RR from where we left off.
  565. shuffledBarEndpoints = loadBalancer.services[barService].endpoints
  566. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  567. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  568. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  569. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  570. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  571. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  572. }
  573. func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
  574. loadBalancer := NewLoadBalancerRR()
  575. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  576. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  577. if err == nil || len(endpoint) != 0 {
  578. t.Errorf("Didn't fail with non-existent service")
  579. }
  580. // Call NewService() before OnEndpointsUpdate()
  581. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  582. endpoints := &v1.Endpoints{
  583. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  584. Subsets: []v1.EndpointSubset{
  585. {Addresses: []v1.EndpointAddress{{IP: "endpoint1"}}, Ports: []v1.EndpointPort{{Port: 1}}},
  586. {Addresses: []v1.EndpointAddress{{IP: "endpoint2"}}, Ports: []v1.EndpointPort{{Port: 2}}},
  587. {Addresses: []v1.EndpointAddress{{IP: "endpoint3"}}, Ports: []v1.EndpointPort{{Port: 3}}},
  588. },
  589. }
  590. loadBalancer.OnEndpointsAdd(endpoints)
  591. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  592. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  593. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  594. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  595. if err != nil {
  596. t.Errorf("Didn't find a service for %s: %v", service, err)
  597. }
  598. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  599. if err != nil {
  600. t.Errorf("Didn't find a service for %s: %v", service, err)
  601. }
  602. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  603. if err != nil {
  604. t.Errorf("Didn't find a service for %s: %v", service, err)
  605. }
  606. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client1)
  607. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client1)
  608. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
  609. expectEndpoint(t, loadBalancer, service, ep2, client2)
  610. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
  611. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
  612. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
  613. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
  614. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
  615. }