roundrobin_test.go 31 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package winuserspace
  14. import (
  15. "net"
  16. "testing"
  17. "k8s.io/api/core/v1"
  18. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  19. "k8s.io/apimachinery/pkg/types"
  20. "k8s.io/kubernetes/pkg/proxy"
  21. )
  22. func TestValidateWorks(t *testing.T) {
  23. if isValidEndpoint(&hostPortPair{}) {
  24. t.Errorf("Didn't fail for empty set")
  25. }
  26. if isValidEndpoint(&hostPortPair{host: "foobar"}) {
  27. t.Errorf("Didn't fail with invalid port")
  28. }
  29. if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
  30. t.Errorf("Didn't fail with a negative port")
  31. }
  32. if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
  33. t.Errorf("Failed a valid config.")
  34. }
  35. }
  36. func TestFilterWorks(t *testing.T) {
  37. endpoints := []hostPortPair{
  38. {host: "foobar", port: 1},
  39. {host: "foobar", port: 2},
  40. {host: "foobar", port: -1},
  41. {host: "foobar", port: 3},
  42. {host: "foobar", port: -2},
  43. }
  44. filtered := flattenValidEndpoints(endpoints)
  45. if len(filtered) != 3 {
  46. t.Errorf("Failed to filter to the correct size")
  47. }
  48. if filtered[0] != "foobar:1" {
  49. t.Errorf("Index zero is not foobar:1")
  50. }
  51. if filtered[1] != "foobar:2" {
  52. t.Errorf("Index one is not foobar:2")
  53. }
  54. if filtered[2] != "foobar:3" {
  55. t.Errorf("Index two is not foobar:3")
  56. }
  57. }
  58. func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
  59. loadBalancer := NewLoadBalancerRR()
  60. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
  61. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  62. if err == nil {
  63. t.Errorf("Didn't fail with non-existent service")
  64. }
  65. if len(endpoint) != 0 {
  66. t.Errorf("Got an endpoint")
  67. }
  68. }
  69. func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
  70. endpoint, err := loadBalancer.NextEndpoint(service, netaddr, false)
  71. if err != nil {
  72. t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
  73. }
  74. if endpoint != expected {
  75. t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
  76. }
  77. }
  78. func expectEndpointWithSessionAffinityReset(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
  79. endpoint, err := loadBalancer.NextEndpoint(service, netaddr, true)
  80. if err != nil {
  81. t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
  82. }
  83. if endpoint != expected {
  84. t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
  85. }
  86. }
  87. func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
  88. loadBalancer := NewLoadBalancerRR()
  89. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  90. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  91. if err == nil || len(endpoint) != 0 {
  92. t.Errorf("Didn't fail with non-existent service")
  93. }
  94. endpoints := &v1.Endpoints{
  95. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  96. Subsets: []v1.EndpointSubset{{
  97. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}},
  98. Ports: []v1.EndpointPort{{Name: "p", Port: 40}},
  99. }},
  100. }
  101. loadBalancer.OnEndpointsAdd(endpoints)
  102. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  103. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  104. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  105. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  106. }
  107. func stringsInSlice(haystack []string, needles ...string) bool {
  108. for _, needle := range needles {
  109. found := false
  110. for i := range haystack {
  111. if haystack[i] == needle {
  112. found = true
  113. break
  114. }
  115. }
  116. if found == false {
  117. return false
  118. }
  119. }
  120. return true
  121. }
  122. func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
  123. loadBalancer := NewLoadBalancerRR()
  124. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  125. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  126. if err == nil || len(endpoint) != 0 {
  127. t.Errorf("Didn't fail with non-existent service")
  128. }
  129. endpoints := &v1.Endpoints{
  130. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  131. Subsets: []v1.EndpointSubset{{
  132. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  133. Ports: []v1.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}},
  134. }},
  135. }
  136. loadBalancer.OnEndpointsAdd(endpoints)
  137. shuffledEndpoints := loadBalancer.services[service].endpoints
  138. if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
  139. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  140. }
  141. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
  142. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
  143. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
  144. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
  145. }
  146. func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
  147. loadBalancer := NewLoadBalancerRR()
  148. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  149. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
  150. endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
  151. if err == nil || len(endpoint) != 0 {
  152. t.Errorf("Didn't fail with non-existent service")
  153. }
  154. endpoints := &v1.Endpoints{
  155. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  156. Subsets: []v1.EndpointSubset{
  157. {
  158. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}},
  159. Ports: []v1.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 2}},
  160. },
  161. {
  162. Addresses: []v1.EndpointAddress{{IP: "endpoint3"}},
  163. Ports: []v1.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 4}},
  164. },
  165. },
  166. }
  167. loadBalancer.OnEndpointsAdd(endpoints)
  168. shuffledEndpoints := loadBalancer.services[serviceP].endpoints
  169. if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") {
  170. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  171. }
  172. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  173. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  174. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
  175. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  176. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  177. if !stringsInSlice(shuffledEndpoints, "endpoint1:2", "endpoint2:2", "endpoint3:4") {
  178. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  179. }
  180. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  181. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  182. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
  183. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  184. }
  185. func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
  186. loadBalancer := NewLoadBalancerRR()
  187. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  188. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
  189. endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
  190. if err == nil || len(endpoint) != 0 {
  191. t.Errorf("Didn't fail with non-existent service")
  192. }
  193. endpointsv1 := &v1.Endpoints{
  194. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  195. Subsets: []v1.EndpointSubset{
  196. {
  197. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}},
  198. Ports: []v1.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 10}},
  199. },
  200. {
  201. Addresses: []v1.EndpointAddress{{IP: "endpoint2"}},
  202. Ports: []v1.EndpointPort{{Name: "p", Port: 2}, {Name: "q", Port: 20}},
  203. },
  204. {
  205. Addresses: []v1.EndpointAddress{{IP: "endpoint3"}},
  206. Ports: []v1.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 30}},
  207. },
  208. },
  209. }
  210. loadBalancer.OnEndpointsAdd(endpointsv1)
  211. shuffledEndpoints := loadBalancer.services[serviceP].endpoints
  212. if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") {
  213. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  214. }
  215. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  216. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  217. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
  218. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  219. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  220. if !stringsInSlice(shuffledEndpoints, "endpoint1:10", "endpoint2:20", "endpoint3:30") {
  221. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  222. }
  223. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  224. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  225. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
  226. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  227. // Then update the configuration with one fewer endpoints, make sure
  228. // we start in the beginning again
  229. endpointsv2 := &v1.Endpoints{
  230. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  231. Subsets: []v1.EndpointSubset{
  232. {
  233. Addresses: []v1.EndpointAddress{{IP: "endpoint4"}},
  234. Ports: []v1.EndpointPort{{Name: "p", Port: 4}, {Name: "q", Port: 40}},
  235. },
  236. {
  237. Addresses: []v1.EndpointAddress{{IP: "endpoint5"}},
  238. Ports: []v1.EndpointPort{{Name: "p", Port: 5}, {Name: "q", Port: 50}},
  239. },
  240. },
  241. }
  242. loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
  243. shuffledEndpoints = loadBalancer.services[serviceP].endpoints
  244. if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") {
  245. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  246. }
  247. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  248. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  249. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  250. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  251. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  252. if !stringsInSlice(shuffledEndpoints, "endpoint4:40", "endpoint5:50") {
  253. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  254. }
  255. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  256. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  257. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  258. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  259. // Clear endpoints
  260. endpointsv3 := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
  261. loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
  262. endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false)
  263. if err == nil || len(endpoint) != 0 {
  264. t.Errorf("Didn't fail with non-existent service")
  265. }
  266. }
  267. func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
  268. loadBalancer := NewLoadBalancerRR()
  269. fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  270. barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"}
  271. endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil, false)
  272. if err == nil || len(endpoint) != 0 {
  273. t.Errorf("Didn't fail with non-existent service")
  274. }
  275. endpoints1 := &v1.Endpoints{
  276. ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace},
  277. Subsets: []v1.EndpointSubset{
  278. {
  279. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}, {IP: "endpoint3"}},
  280. Ports: []v1.EndpointPort{{Name: "p", Port: 123}},
  281. },
  282. },
  283. }
  284. endpoints2 := &v1.Endpoints{
  285. ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace},
  286. Subsets: []v1.EndpointSubset{
  287. {
  288. Addresses: []v1.EndpointAddress{{IP: "endpoint4"}, {IP: "endpoint5"}, {IP: "endpoint6"}},
  289. Ports: []v1.EndpointPort{{Name: "p", Port: 456}},
  290. },
  291. },
  292. }
  293. loadBalancer.OnEndpointsAdd(endpoints1)
  294. loadBalancer.OnEndpointsAdd(endpoints2)
  295. shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints
  296. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
  297. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
  298. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[2], nil)
  299. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
  300. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
  301. shuffledBarEndpoints := loadBalancer.services[barServiceP].endpoints
  302. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  303. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  304. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  305. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  306. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  307. // Then update the configuration by removing foo
  308. loadBalancer.OnEndpointsDelete(endpoints1)
  309. endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false)
  310. if err == nil || len(endpoint) != 0 {
  311. t.Errorf("Didn't fail with non-existent service")
  312. }
  313. // but bar is still there, and we continue RR from where we left off.
  314. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  315. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  316. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  317. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  318. }
  319. func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
  320. loadBalancer := NewLoadBalancerRR()
  321. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  322. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  323. if err == nil || len(endpoint) != 0 {
  324. t.Errorf("Didn't fail with non-existent service")
  325. }
  326. // Call NewService() before OnEndpointsUpdate()
  327. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  328. endpoints := &v1.Endpoints{
  329. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  330. Subsets: []v1.EndpointSubset{
  331. {Addresses: []v1.EndpointAddress{{IP: "endpoint1"}}, Ports: []v1.EndpointPort{{Port: 1}}},
  332. {Addresses: []v1.EndpointAddress{{IP: "endpoint2"}}, Ports: []v1.EndpointPort{{Port: 2}}},
  333. {Addresses: []v1.EndpointAddress{{IP: "endpoint3"}}, Ports: []v1.EndpointPort{{Port: 3}}},
  334. },
  335. }
  336. loadBalancer.OnEndpointsAdd(endpoints)
  337. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  338. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  339. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  340. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  341. if err != nil {
  342. t.Errorf("Didn't find a service for %s: %v", service, err)
  343. }
  344. expectEndpoint(t, loadBalancer, service, ep1, client1)
  345. expectEndpoint(t, loadBalancer, service, ep1, client1)
  346. expectEndpoint(t, loadBalancer, service, ep1, client1)
  347. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  348. if err != nil {
  349. t.Errorf("Didn't find a service for %s: %v", service, err)
  350. }
  351. expectEndpoint(t, loadBalancer, service, ep2, client2)
  352. expectEndpoint(t, loadBalancer, service, ep2, client2)
  353. expectEndpoint(t, loadBalancer, service, ep2, client2)
  354. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  355. if err != nil {
  356. t.Errorf("Didn't find a service for %s: %v", service, err)
  357. }
  358. expectEndpoint(t, loadBalancer, service, ep3, client3)
  359. expectEndpoint(t, loadBalancer, service, ep3, client3)
  360. expectEndpoint(t, loadBalancer, service, ep3, client3)
  361. expectEndpoint(t, loadBalancer, service, ep1, client1)
  362. expectEndpoint(t, loadBalancer, service, ep2, client2)
  363. expectEndpoint(t, loadBalancer, service, ep3, client3)
  364. expectEndpoint(t, loadBalancer, service, ep1, client1)
  365. expectEndpoint(t, loadBalancer, service, ep2, client2)
  366. expectEndpoint(t, loadBalancer, service, ep3, client3)
  367. }
  368. func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
  369. loadBalancer := NewLoadBalancerRR()
  370. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  371. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  372. if err == nil || len(endpoint) != 0 {
  373. t.Errorf("Didn't fail with non-existent service")
  374. }
  375. // Call OnEndpointsUpdate() before NewService()
  376. endpoints := &v1.Endpoints{
  377. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  378. Subsets: []v1.EndpointSubset{
  379. {Addresses: []v1.EndpointAddress{{IP: "endpoint1"}}, Ports: []v1.EndpointPort{{Port: 1}}},
  380. {Addresses: []v1.EndpointAddress{{IP: "endpoint2"}}, Ports: []v1.EndpointPort{{Port: 2}}},
  381. },
  382. }
  383. loadBalancer.OnEndpointsAdd(endpoints)
  384. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  385. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  386. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  387. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  388. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  389. if err != nil {
  390. t.Errorf("Didn't find a service for %s: %v", service, err)
  391. }
  392. expectEndpoint(t, loadBalancer, service, ep1, client1)
  393. expectEndpoint(t, loadBalancer, service, ep1, client1)
  394. expectEndpoint(t, loadBalancer, service, ep1, client1)
  395. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  396. if err != nil {
  397. t.Errorf("Didn't find a service for %s: %v", service, err)
  398. }
  399. expectEndpoint(t, loadBalancer, service, ep2, client2)
  400. expectEndpoint(t, loadBalancer, service, ep2, client2)
  401. expectEndpoint(t, loadBalancer, service, ep2, client2)
  402. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  403. if err != nil {
  404. t.Errorf("Didn't find a service for %s: %v", service, err)
  405. }
  406. expectEndpoint(t, loadBalancer, service, ep3, client3)
  407. expectEndpoint(t, loadBalancer, service, ep3, client3)
  408. expectEndpoint(t, loadBalancer, service, ep3, client3)
  409. expectEndpoint(t, loadBalancer, service, ep1, client1)
  410. expectEndpoint(t, loadBalancer, service, ep2, client2)
  411. expectEndpoint(t, loadBalancer, service, ep3, client3)
  412. expectEndpoint(t, loadBalancer, service, ep1, client1)
  413. expectEndpoint(t, loadBalancer, service, ep2, client2)
  414. expectEndpoint(t, loadBalancer, service, ep3, client3)
  415. }
  416. func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
  417. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  418. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  419. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  420. client4 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 4), Port: 0}
  421. client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0}
  422. client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
  423. loadBalancer := NewLoadBalancerRR()
  424. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  425. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  426. if err == nil || len(endpoint) != 0 {
  427. t.Errorf("Didn't fail with non-existent service")
  428. }
  429. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  430. endpointsv1 := &v1.Endpoints{
  431. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  432. Subsets: []v1.EndpointSubset{
  433. {
  434. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  435. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  436. },
  437. },
  438. }
  439. loadBalancer.OnEndpointsAdd(endpointsv1)
  440. shuffledEndpoints := loadBalancer.services[service].endpoints
  441. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  442. client1Endpoint := shuffledEndpoints[0]
  443. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  444. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  445. client2Endpoint := shuffledEndpoints[1]
  446. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  447. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
  448. client3Endpoint := shuffledEndpoints[2]
  449. endpointsv2 := &v1.Endpoints{
  450. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  451. Subsets: []v1.EndpointSubset{
  452. {
  453. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  454. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}},
  455. },
  456. },
  457. }
  458. loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
  459. shuffledEndpoints = loadBalancer.services[service].endpoints
  460. if client1Endpoint == "endpoint:3" {
  461. client1Endpoint = shuffledEndpoints[0]
  462. } else if client2Endpoint == "endpoint:3" {
  463. client2Endpoint = shuffledEndpoints[0]
  464. } else if client3Endpoint == "endpoint:3" {
  465. client3Endpoint = shuffledEndpoints[0]
  466. }
  467. expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
  468. expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
  469. expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
  470. endpointsv3 := &v1.Endpoints{
  471. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  472. Subsets: []v1.EndpointSubset{
  473. {
  474. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  475. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 4}},
  476. },
  477. },
  478. }
  479. loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
  480. shuffledEndpoints = loadBalancer.services[service].endpoints
  481. expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
  482. expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
  483. expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
  484. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4)
  485. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5)
  486. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6)
  487. }
  488. func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
  489. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  490. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  491. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  492. loadBalancer := NewLoadBalancerRR()
  493. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  494. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  495. if err == nil || len(endpoint) != 0 {
  496. t.Errorf("Didn't fail with non-existent service")
  497. }
  498. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  499. endpointsv1 := &v1.Endpoints{
  500. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  501. Subsets: []v1.EndpointSubset{
  502. {
  503. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  504. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  505. },
  506. },
  507. }
  508. loadBalancer.OnEndpointsAdd(endpointsv1)
  509. shuffledEndpoints := loadBalancer.services[service].endpoints
  510. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  511. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  512. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  513. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  514. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
  515. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  516. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  517. // Then update the configuration with one fewer endpoints, make sure
  518. // we start in the beginning again
  519. endpointsv2 := &v1.Endpoints{
  520. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  521. Subsets: []v1.EndpointSubset{
  522. {
  523. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  524. Ports: []v1.EndpointPort{{Port: 4}, {Port: 5}},
  525. },
  526. },
  527. }
  528. loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
  529. shuffledEndpoints = loadBalancer.services[service].endpoints
  530. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  531. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  532. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  533. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  534. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  535. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  536. // Clear endpoints
  537. endpointsv3 := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
  538. loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
  539. endpoint, err = loadBalancer.NextEndpoint(service, nil, false)
  540. if err == nil || len(endpoint) != 0 {
  541. t.Errorf("Didn't fail with non-existent service")
  542. }
  543. }
  544. func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
  545. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  546. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  547. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  548. loadBalancer := NewLoadBalancerRR()
  549. fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  550. endpoint, err := loadBalancer.NextEndpoint(fooService, nil, false)
  551. if err == nil || len(endpoint) != 0 {
  552. t.Errorf("Didn't fail with non-existent service")
  553. }
  554. loadBalancer.NewService(fooService, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  555. endpoints1 := &v1.Endpoints{
  556. ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
  557. Subsets: []v1.EndpointSubset{
  558. {
  559. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  560. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  561. },
  562. },
  563. }
  564. barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""}
  565. loadBalancer.NewService(barService, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  566. endpoints2 := &v1.Endpoints{
  567. ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
  568. Subsets: []v1.EndpointSubset{
  569. {
  570. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  571. Ports: []v1.EndpointPort{{Port: 4}, {Port: 5}},
  572. },
  573. },
  574. }
  575. loadBalancer.OnEndpointsAdd(endpoints1)
  576. loadBalancer.OnEndpointsAdd(endpoints2)
  577. shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
  578. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  579. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  580. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  581. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  582. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  583. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  584. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  585. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  586. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  587. shuffledBarEndpoints := loadBalancer.services[barService].endpoints
  588. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  589. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  590. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  591. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  592. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  593. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  594. // Then update the configuration by removing foo
  595. loadBalancer.OnEndpointsDelete(endpoints1)
  596. endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false)
  597. if err == nil || len(endpoint) != 0 {
  598. t.Errorf("Didn't fail with non-existent service")
  599. }
  600. // but bar is still there, and we continue RR from where we left off.
  601. shuffledBarEndpoints = loadBalancer.services[barService].endpoints
  602. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  603. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  604. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  605. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  606. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  607. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  608. }
  609. func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
  610. loadBalancer := NewLoadBalancerRR()
  611. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  612. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  613. if err == nil || len(endpoint) != 0 {
  614. t.Errorf("Didn't fail with non-existent service")
  615. }
  616. // Call NewService() before OnEndpointsUpdate()
  617. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  618. endpoints := &v1.Endpoints{
  619. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  620. Subsets: []v1.EndpointSubset{
  621. {Addresses: []v1.EndpointAddress{{IP: "endpoint1"}}, Ports: []v1.EndpointPort{{Port: 1}}},
  622. {Addresses: []v1.EndpointAddress{{IP: "endpoint2"}}, Ports: []v1.EndpointPort{{Port: 2}}},
  623. {Addresses: []v1.EndpointAddress{{IP: "endpoint3"}}, Ports: []v1.EndpointPort{{Port: 3}}},
  624. },
  625. }
  626. loadBalancer.OnEndpointsAdd(endpoints)
  627. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  628. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  629. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  630. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  631. if err != nil {
  632. t.Errorf("Didn't find a service for %s: %v", service, err)
  633. }
  634. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  635. if err != nil {
  636. t.Errorf("Didn't find a service for %s: %v", service, err)
  637. }
  638. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  639. if err != nil {
  640. t.Errorf("Didn't find a service for %s: %v", service, err)
  641. }
  642. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client1)
  643. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client1)
  644. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
  645. expectEndpoint(t, loadBalancer, service, ep2, client2)
  646. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
  647. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
  648. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
  649. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
  650. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
  651. }