roundrobin_test.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package winuserspace
  14. import (
  15. "net"
  16. "testing"
  17. "k8s.io/api/core/v1"
  18. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  19. "k8s.io/apimachinery/pkg/types"
  20. "k8s.io/kubernetes/pkg/proxy"
  21. )
  22. func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
  23. loadBalancer := NewLoadBalancerRR()
  24. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
  25. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  26. if err == nil {
  27. t.Errorf("Didn't fail with non-existent service")
  28. }
  29. if len(endpoint) != 0 {
  30. t.Errorf("Got an endpoint")
  31. }
  32. }
  33. func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
  34. endpoint, err := loadBalancer.NextEndpoint(service, netaddr, false)
  35. if err != nil {
  36. t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
  37. }
  38. if endpoint != expected {
  39. t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
  40. }
  41. }
  42. func expectEndpointWithSessionAffinityReset(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
  43. endpoint, err := loadBalancer.NextEndpoint(service, netaddr, true)
  44. if err != nil {
  45. t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
  46. }
  47. if endpoint != expected {
  48. t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
  49. }
  50. }
  51. func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
  52. loadBalancer := NewLoadBalancerRR()
  53. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  54. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  55. if err == nil || len(endpoint) != 0 {
  56. t.Errorf("Didn't fail with non-existent service")
  57. }
  58. endpoints := &v1.Endpoints{
  59. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  60. Subsets: []v1.EndpointSubset{{
  61. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}},
  62. Ports: []v1.EndpointPort{{Name: "p", Port: 40}},
  63. }},
  64. }
  65. loadBalancer.OnEndpointsAdd(endpoints)
  66. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  67. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  68. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  69. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  70. }
  71. func stringsInSlice(haystack []string, needles ...string) bool {
  72. for _, needle := range needles {
  73. found := false
  74. for i := range haystack {
  75. if haystack[i] == needle {
  76. found = true
  77. break
  78. }
  79. }
  80. if found == false {
  81. return false
  82. }
  83. }
  84. return true
  85. }
  86. func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
  87. loadBalancer := NewLoadBalancerRR()
  88. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  89. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  90. if err == nil || len(endpoint) != 0 {
  91. t.Errorf("Didn't fail with non-existent service")
  92. }
  93. endpoints := &v1.Endpoints{
  94. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  95. Subsets: []v1.EndpointSubset{{
  96. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  97. Ports: []v1.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}},
  98. }},
  99. }
  100. loadBalancer.OnEndpointsAdd(endpoints)
  101. shuffledEndpoints := loadBalancer.services[service].endpoints
  102. if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
  103. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  104. }
  105. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
  106. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
  107. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
  108. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
  109. }
  110. func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
  111. loadBalancer := NewLoadBalancerRR()
  112. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  113. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
  114. endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
  115. if err == nil || len(endpoint) != 0 {
  116. t.Errorf("Didn't fail with non-existent service")
  117. }
  118. endpoints := &v1.Endpoints{
  119. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  120. Subsets: []v1.EndpointSubset{
  121. {
  122. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}},
  123. Ports: []v1.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 2}},
  124. },
  125. {
  126. Addresses: []v1.EndpointAddress{{IP: "endpoint3"}},
  127. Ports: []v1.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 4}},
  128. },
  129. },
  130. }
  131. loadBalancer.OnEndpointsAdd(endpoints)
  132. shuffledEndpoints := loadBalancer.services[serviceP].endpoints
  133. if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") {
  134. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  135. }
  136. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  137. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  138. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
  139. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  140. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  141. if !stringsInSlice(shuffledEndpoints, "endpoint1:2", "endpoint2:2", "endpoint3:4") {
  142. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  143. }
  144. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  145. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  146. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
  147. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  148. }
  149. func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
  150. loadBalancer := NewLoadBalancerRR()
  151. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  152. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
  153. endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
  154. if err == nil || len(endpoint) != 0 {
  155. t.Errorf("Didn't fail with non-existent service")
  156. }
  157. endpointsv1 := &v1.Endpoints{
  158. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  159. Subsets: []v1.EndpointSubset{
  160. {
  161. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}},
  162. Ports: []v1.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 10}},
  163. },
  164. {
  165. Addresses: []v1.EndpointAddress{{IP: "endpoint2"}},
  166. Ports: []v1.EndpointPort{{Name: "p", Port: 2}, {Name: "q", Port: 20}},
  167. },
  168. {
  169. Addresses: []v1.EndpointAddress{{IP: "endpoint3"}},
  170. Ports: []v1.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 30}},
  171. },
  172. },
  173. }
  174. loadBalancer.OnEndpointsAdd(endpointsv1)
  175. shuffledEndpoints := loadBalancer.services[serviceP].endpoints
  176. if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") {
  177. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  178. }
  179. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  180. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  181. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
  182. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  183. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  184. if !stringsInSlice(shuffledEndpoints, "endpoint1:10", "endpoint2:20", "endpoint3:30") {
  185. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  186. }
  187. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  188. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  189. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
  190. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  191. // Then update the configuration with one fewer endpoints, make sure
  192. // we start in the beginning again
  193. endpointsv2 := &v1.Endpoints{
  194. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  195. Subsets: []v1.EndpointSubset{
  196. {
  197. Addresses: []v1.EndpointAddress{{IP: "endpoint4"}},
  198. Ports: []v1.EndpointPort{{Name: "p", Port: 4}, {Name: "q", Port: 40}},
  199. },
  200. {
  201. Addresses: []v1.EndpointAddress{{IP: "endpoint5"}},
  202. Ports: []v1.EndpointPort{{Name: "p", Port: 5}, {Name: "q", Port: 50}},
  203. },
  204. },
  205. }
  206. loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
  207. shuffledEndpoints = loadBalancer.services[serviceP].endpoints
  208. if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") {
  209. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  210. }
  211. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  212. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  213. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  214. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  215. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  216. if !stringsInSlice(shuffledEndpoints, "endpoint4:40", "endpoint5:50") {
  217. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  218. }
  219. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  220. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  221. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  222. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  223. // Clear endpoints
  224. endpointsv3 := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
  225. loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
  226. endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false)
  227. if err == nil || len(endpoint) != 0 {
  228. t.Errorf("Didn't fail with non-existent service")
  229. }
  230. }
  231. func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
  232. loadBalancer := NewLoadBalancerRR()
  233. fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  234. barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"}
  235. endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil, false)
  236. if err == nil || len(endpoint) != 0 {
  237. t.Errorf("Didn't fail with non-existent service")
  238. }
  239. endpoints1 := &v1.Endpoints{
  240. ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace},
  241. Subsets: []v1.EndpointSubset{
  242. {
  243. Addresses: []v1.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}, {IP: "endpoint3"}},
  244. Ports: []v1.EndpointPort{{Name: "p", Port: 123}},
  245. },
  246. },
  247. }
  248. endpoints2 := &v1.Endpoints{
  249. ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace},
  250. Subsets: []v1.EndpointSubset{
  251. {
  252. Addresses: []v1.EndpointAddress{{IP: "endpoint4"}, {IP: "endpoint5"}, {IP: "endpoint6"}},
  253. Ports: []v1.EndpointPort{{Name: "p", Port: 456}},
  254. },
  255. },
  256. }
  257. loadBalancer.OnEndpointsAdd(endpoints1)
  258. loadBalancer.OnEndpointsAdd(endpoints2)
  259. shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints
  260. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
  261. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
  262. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[2], nil)
  263. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
  264. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
  265. shuffledBarEndpoints := loadBalancer.services[barServiceP].endpoints
  266. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  267. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  268. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  269. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  270. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  271. // Then update the configuration by removing foo
  272. loadBalancer.OnEndpointsDelete(endpoints1)
  273. endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false)
  274. if err == nil || len(endpoint) != 0 {
  275. t.Errorf("Didn't fail with non-existent service")
  276. }
  277. // but bar is still there, and we continue RR from where we left off.
  278. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  279. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  280. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  281. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  282. }
  283. func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
  284. loadBalancer := NewLoadBalancerRR()
  285. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  286. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  287. if err == nil || len(endpoint) != 0 {
  288. t.Errorf("Didn't fail with non-existent service")
  289. }
  290. // Call NewService() before OnEndpointsUpdate()
  291. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  292. endpoints := &v1.Endpoints{
  293. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  294. Subsets: []v1.EndpointSubset{
  295. {Addresses: []v1.EndpointAddress{{IP: "endpoint1"}}, Ports: []v1.EndpointPort{{Port: 1}}},
  296. {Addresses: []v1.EndpointAddress{{IP: "endpoint2"}}, Ports: []v1.EndpointPort{{Port: 2}}},
  297. {Addresses: []v1.EndpointAddress{{IP: "endpoint3"}}, Ports: []v1.EndpointPort{{Port: 3}}},
  298. },
  299. }
  300. loadBalancer.OnEndpointsAdd(endpoints)
  301. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  302. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  303. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  304. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  305. if err != nil {
  306. t.Errorf("Didn't find a service for %s: %v", service, err)
  307. }
  308. expectEndpoint(t, loadBalancer, service, ep1, client1)
  309. expectEndpoint(t, loadBalancer, service, ep1, client1)
  310. expectEndpoint(t, loadBalancer, service, ep1, client1)
  311. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  312. if err != nil {
  313. t.Errorf("Didn't find a service for %s: %v", service, err)
  314. }
  315. expectEndpoint(t, loadBalancer, service, ep2, client2)
  316. expectEndpoint(t, loadBalancer, service, ep2, client2)
  317. expectEndpoint(t, loadBalancer, service, ep2, client2)
  318. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  319. if err != nil {
  320. t.Errorf("Didn't find a service for %s: %v", service, err)
  321. }
  322. expectEndpoint(t, loadBalancer, service, ep3, client3)
  323. expectEndpoint(t, loadBalancer, service, ep3, client3)
  324. expectEndpoint(t, loadBalancer, service, ep3, client3)
  325. expectEndpoint(t, loadBalancer, service, ep1, client1)
  326. expectEndpoint(t, loadBalancer, service, ep2, client2)
  327. expectEndpoint(t, loadBalancer, service, ep3, client3)
  328. expectEndpoint(t, loadBalancer, service, ep1, client1)
  329. expectEndpoint(t, loadBalancer, service, ep2, client2)
  330. expectEndpoint(t, loadBalancer, service, ep3, client3)
  331. }
  332. func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
  333. loadBalancer := NewLoadBalancerRR()
  334. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  335. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  336. if err == nil || len(endpoint) != 0 {
  337. t.Errorf("Didn't fail with non-existent service")
  338. }
  339. // Call OnEndpointsUpdate() before NewService()
  340. endpoints := &v1.Endpoints{
  341. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  342. Subsets: []v1.EndpointSubset{
  343. {Addresses: []v1.EndpointAddress{{IP: "endpoint1"}}, Ports: []v1.EndpointPort{{Port: 1}}},
  344. {Addresses: []v1.EndpointAddress{{IP: "endpoint2"}}, Ports: []v1.EndpointPort{{Port: 2}}},
  345. },
  346. }
  347. loadBalancer.OnEndpointsAdd(endpoints)
  348. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  349. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  350. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  351. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  352. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  353. if err != nil {
  354. t.Errorf("Didn't find a service for %s: %v", service, err)
  355. }
  356. expectEndpoint(t, loadBalancer, service, ep1, client1)
  357. expectEndpoint(t, loadBalancer, service, ep1, client1)
  358. expectEndpoint(t, loadBalancer, service, ep1, client1)
  359. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  360. if err != nil {
  361. t.Errorf("Didn't find a service for %s: %v", service, err)
  362. }
  363. expectEndpoint(t, loadBalancer, service, ep2, client2)
  364. expectEndpoint(t, loadBalancer, service, ep2, client2)
  365. expectEndpoint(t, loadBalancer, service, ep2, client2)
  366. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  367. if err != nil {
  368. t.Errorf("Didn't find a service for %s: %v", service, err)
  369. }
  370. expectEndpoint(t, loadBalancer, service, ep3, client3)
  371. expectEndpoint(t, loadBalancer, service, ep3, client3)
  372. expectEndpoint(t, loadBalancer, service, ep3, client3)
  373. expectEndpoint(t, loadBalancer, service, ep1, client1)
  374. expectEndpoint(t, loadBalancer, service, ep2, client2)
  375. expectEndpoint(t, loadBalancer, service, ep3, client3)
  376. expectEndpoint(t, loadBalancer, service, ep1, client1)
  377. expectEndpoint(t, loadBalancer, service, ep2, client2)
  378. expectEndpoint(t, loadBalancer, service, ep3, client3)
  379. }
  380. func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
  381. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  382. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  383. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  384. client4 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 4), Port: 0}
  385. client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0}
  386. client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
  387. loadBalancer := NewLoadBalancerRR()
  388. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  389. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  390. if err == nil || len(endpoint) != 0 {
  391. t.Errorf("Didn't fail with non-existent service")
  392. }
  393. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  394. endpointsv1 := &v1.Endpoints{
  395. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  396. Subsets: []v1.EndpointSubset{
  397. {
  398. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  399. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  400. },
  401. },
  402. }
  403. loadBalancer.OnEndpointsAdd(endpointsv1)
  404. shuffledEndpoints := loadBalancer.services[service].endpoints
  405. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  406. client1Endpoint := shuffledEndpoints[0]
  407. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  408. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  409. client2Endpoint := shuffledEndpoints[1]
  410. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  411. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
  412. client3Endpoint := shuffledEndpoints[2]
  413. endpointsv2 := &v1.Endpoints{
  414. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  415. Subsets: []v1.EndpointSubset{
  416. {
  417. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  418. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}},
  419. },
  420. },
  421. }
  422. loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
  423. shuffledEndpoints = loadBalancer.services[service].endpoints
  424. if client1Endpoint == "endpoint:3" {
  425. client1Endpoint = shuffledEndpoints[0]
  426. } else if client2Endpoint == "endpoint:3" {
  427. client2Endpoint = shuffledEndpoints[0]
  428. } else if client3Endpoint == "endpoint:3" {
  429. client3Endpoint = shuffledEndpoints[0]
  430. }
  431. expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
  432. expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
  433. expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
  434. endpointsv3 := &v1.Endpoints{
  435. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  436. Subsets: []v1.EndpointSubset{
  437. {
  438. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  439. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 4}},
  440. },
  441. },
  442. }
  443. loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
  444. shuffledEndpoints = loadBalancer.services[service].endpoints
  445. expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
  446. expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
  447. expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
  448. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4)
  449. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5)
  450. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6)
  451. }
  452. func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
  453. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  454. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  455. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  456. loadBalancer := NewLoadBalancerRR()
  457. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  458. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  459. if err == nil || len(endpoint) != 0 {
  460. t.Errorf("Didn't fail with non-existent service")
  461. }
  462. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  463. endpointsv1 := &v1.Endpoints{
  464. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  465. Subsets: []v1.EndpointSubset{
  466. {
  467. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  468. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  469. },
  470. },
  471. }
  472. loadBalancer.OnEndpointsAdd(endpointsv1)
  473. shuffledEndpoints := loadBalancer.services[service].endpoints
  474. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  475. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  476. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  477. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  478. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
  479. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  480. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  481. // Then update the configuration with one fewer endpoints, make sure
  482. // we start in the beginning again
  483. endpointsv2 := &v1.Endpoints{
  484. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  485. Subsets: []v1.EndpointSubset{
  486. {
  487. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  488. Ports: []v1.EndpointPort{{Port: 4}, {Port: 5}},
  489. },
  490. },
  491. }
  492. loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
  493. shuffledEndpoints = loadBalancer.services[service].endpoints
  494. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  495. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  496. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  497. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  498. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  499. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  500. // Clear endpoints
  501. endpointsv3 := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
  502. loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
  503. endpoint, err = loadBalancer.NextEndpoint(service, nil, false)
  504. if err == nil || len(endpoint) != 0 {
  505. t.Errorf("Didn't fail with non-existent service")
  506. }
  507. }
  508. func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
  509. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  510. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  511. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  512. loadBalancer := NewLoadBalancerRR()
  513. fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  514. endpoint, err := loadBalancer.NextEndpoint(fooService, nil, false)
  515. if err == nil || len(endpoint) != 0 {
  516. t.Errorf("Didn't fail with non-existent service")
  517. }
  518. loadBalancer.NewService(fooService, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  519. endpoints1 := &v1.Endpoints{
  520. ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
  521. Subsets: []v1.EndpointSubset{
  522. {
  523. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  524. Ports: []v1.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  525. },
  526. },
  527. }
  528. barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""}
  529. loadBalancer.NewService(barService, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  530. endpoints2 := &v1.Endpoints{
  531. ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
  532. Subsets: []v1.EndpointSubset{
  533. {
  534. Addresses: []v1.EndpointAddress{{IP: "endpoint"}},
  535. Ports: []v1.EndpointPort{{Port: 4}, {Port: 5}},
  536. },
  537. },
  538. }
  539. loadBalancer.OnEndpointsAdd(endpoints1)
  540. loadBalancer.OnEndpointsAdd(endpoints2)
  541. shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
  542. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  543. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  544. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  545. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  546. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  547. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  548. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  549. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  550. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  551. shuffledBarEndpoints := loadBalancer.services[barService].endpoints
  552. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  553. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  554. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  555. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  556. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  557. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  558. // Then update the configuration by removing foo
  559. loadBalancer.OnEndpointsDelete(endpoints1)
  560. endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false)
  561. if err == nil || len(endpoint) != 0 {
  562. t.Errorf("Didn't fail with non-existent service")
  563. }
  564. // but bar is still there, and we continue RR from where we left off.
  565. shuffledBarEndpoints = loadBalancer.services[barService].endpoints
  566. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  567. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  568. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  569. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  570. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  571. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  572. }
  573. func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
  574. loadBalancer := NewLoadBalancerRR()
  575. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  576. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  577. if err == nil || len(endpoint) != 0 {
  578. t.Errorf("Didn't fail with non-existent service")
  579. }
  580. // Call NewService() before OnEndpointsUpdate()
  581. loadBalancer.NewService(service, v1.ServiceAffinityClientIP, int(v1.DefaultClientIPServiceAffinitySeconds))
  582. endpoints := &v1.Endpoints{
  583. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  584. Subsets: []v1.EndpointSubset{
  585. {Addresses: []v1.EndpointAddress{{IP: "endpoint1"}}, Ports: []v1.EndpointPort{{Port: 1}}},
  586. {Addresses: []v1.EndpointAddress{{IP: "endpoint2"}}, Ports: []v1.EndpointPort{{Port: 2}}},
  587. {Addresses: []v1.EndpointAddress{{IP: "endpoint3"}}, Ports: []v1.EndpointPort{{Port: 3}}},
  588. },
  589. }
  590. loadBalancer.OnEndpointsAdd(endpoints)
  591. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  592. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  593. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  594. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  595. if err != nil {
  596. t.Errorf("Didn't find a service for %s: %v", service, err)
  597. }
  598. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  599. if err != nil {
  600. t.Errorf("Didn't find a service for %s: %v", service, err)
  601. }
  602. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  603. if err != nil {
  604. t.Errorf("Didn't find a service for %s: %v", service, err)
  605. }
  606. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client1)
  607. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client1)
  608. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
  609. expectEndpoint(t, loadBalancer, service, ep2, client2)
  610. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
  611. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
  612. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
  613. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
  614. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
  615. }