service_test.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package proxy
  14. import (
  15. "net"
  16. "testing"
  17. "github.com/davecgh/go-spew/spew"
  18. "k8s.io/api/core/v1"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/types"
  21. "k8s.io/apimachinery/pkg/util/intstr"
  22. "k8s.io/apimachinery/pkg/util/sets"
  23. )
  24. const testHostname = "test-hostname"
  25. func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServiceInfo)) *BaseServiceInfo {
  26. info := &BaseServiceInfo{
  27. ClusterIP: net.ParseIP(clusterIP),
  28. Port: port,
  29. Protocol: v1.Protocol(protocol),
  30. }
  31. if healthcheckNodePort != 0 {
  32. info.HealthCheckNodePort = healthcheckNodePort
  33. }
  34. for _, svcInfoFunc := range svcInfoFuncs {
  35. svcInfoFunc(info)
  36. }
  37. return info
  38. }
  39. func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
  40. svc := &v1.Service{
  41. ObjectMeta: metav1.ObjectMeta{
  42. Name: name,
  43. Namespace: namespace,
  44. Annotations: map[string]string{},
  45. },
  46. Spec: v1.ServiceSpec{},
  47. Status: v1.ServiceStatus{},
  48. }
  49. svcFunc(svc)
  50. return svc
  51. }
  52. func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort {
  53. svcPort := v1.ServicePort{
  54. Name: name,
  55. Protocol: protocol,
  56. Port: port,
  57. NodePort: nodeport,
  58. TargetPort: intstr.FromInt(targetPort),
  59. }
  60. return append(array, svcPort)
  61. }
  62. func makeNSN(namespace, name string) types.NamespacedName {
  63. return types.NamespacedName{Namespace: namespace, Name: name}
  64. }
  65. func makeServicePortName(ns, name, port string) ServicePortName {
  66. return ServicePortName{
  67. NamespacedName: makeNSN(ns, name),
  68. Port: port,
  69. }
  70. }
  71. func TestServiceToServiceMap(t *testing.T) {
  72. svcTracker := NewServiceChangeTracker(nil, nil, nil)
  73. trueVal := true
  74. falseVal := false
  75. testClusterIPv4 := "10.0.0.1"
  76. testExternalIPv4 := "8.8.8.8"
  77. testSourceRangeIPv4 := "0.0.0.0/1"
  78. testClusterIPv6 := "2001:db8:85a3:0:0:8a2e:370:7334"
  79. testExternalIPv6 := "2001:db8:85a3:0:0:8a2e:370:7335"
  80. testSourceRangeIPv6 := "2001:db8::/32"
  81. testCases := []struct {
  82. desc string
  83. service *v1.Service
  84. expected map[ServicePortName]*BaseServiceInfo
  85. isIPv6Mode *bool
  86. }{
  87. {
  88. desc: "nothing",
  89. service: nil,
  90. expected: map[ServicePortName]*BaseServiceInfo{},
  91. },
  92. {
  93. desc: "headless service",
  94. service: makeTestService("ns2", "headless", func(svc *v1.Service) {
  95. svc.Spec.Type = v1.ServiceTypeClusterIP
  96. svc.Spec.ClusterIP = v1.ClusterIPNone
  97. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
  98. }),
  99. expected: map[ServicePortName]*BaseServiceInfo{},
  100. },
  101. {
  102. desc: "headless sctp service",
  103. service: makeTestService("ns2", "headless", func(svc *v1.Service) {
  104. svc.Spec.Type = v1.ServiceTypeClusterIP
  105. svc.Spec.ClusterIP = v1.ClusterIPNone
  106. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sip", "SCTP", 7777, 0, 0)
  107. }),
  108. expected: map[ServicePortName]*BaseServiceInfo{},
  109. },
  110. {
  111. desc: "headless service without port",
  112. service: makeTestService("ns2", "headless-without-port", func(svc *v1.Service) {
  113. svc.Spec.Type = v1.ServiceTypeClusterIP
  114. svc.Spec.ClusterIP = v1.ClusterIPNone
  115. }),
  116. expected: map[ServicePortName]*BaseServiceInfo{},
  117. },
  118. {
  119. desc: "cluster ip service",
  120. service: makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
  121. svc.Spec.Type = v1.ServiceTypeClusterIP
  122. svc.Spec.ClusterIP = "172.16.55.4"
  123. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 0)
  124. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "UDP", 1235, 5321, 0)
  125. }),
  126. expected: map[ServicePortName]*BaseServiceInfo{
  127. makeServicePortName("ns2", "cluster-ip", "p1"): makeTestServiceInfo("172.16.55.4", 1234, "UDP", 0),
  128. makeServicePortName("ns2", "cluster-ip", "p2"): makeTestServiceInfo("172.16.55.4", 1235, "UDP", 0),
  129. },
  130. },
  131. {
  132. desc: "nodeport service",
  133. service: makeTestService("ns2", "node-port", func(svc *v1.Service) {
  134. svc.Spec.Type = v1.ServiceTypeNodePort
  135. svc.Spec.ClusterIP = "172.16.55.10"
  136. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "UDP", 345, 678, 0)
  137. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 344, 677, 0)
  138. }),
  139. expected: map[ServicePortName]*BaseServiceInfo{
  140. makeServicePortName("ns2", "node-port", "port1"): makeTestServiceInfo("172.16.55.10", 345, "UDP", 0),
  141. makeServicePortName("ns2", "node-port", "port2"): makeTestServiceInfo("172.16.55.10", 344, "TCP", 0),
  142. },
  143. },
  144. {
  145. desc: "load balancer service",
  146. service: makeTestService("ns1", "load-balancer", func(svc *v1.Service) {
  147. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  148. svc.Spec.ClusterIP = "172.16.55.11"
  149. svc.Spec.LoadBalancerIP = "5.6.7.8"
  150. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port3", "UDP", 8675, 30061, 7000)
  151. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port4", "UDP", 8676, 30062, 7001)
  152. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  153. Ingress: []v1.LoadBalancerIngress{
  154. {IP: "10.1.2.4"},
  155. },
  156. }
  157. }),
  158. expected: map[ServicePortName]*BaseServiceInfo{
  159. makeServicePortName("ns1", "load-balancer", "port3"): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0),
  160. makeServicePortName("ns1", "load-balancer", "port4"): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0),
  161. },
  162. },
  163. {
  164. desc: "load balancer service with only local traffic policy",
  165. service: makeTestService("ns1", "only-local-load-balancer", func(svc *v1.Service) {
  166. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  167. svc.Spec.ClusterIP = "172.16.55.12"
  168. svc.Spec.LoadBalancerIP = "5.6.7.8"
  169. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "portx", "UDP", 8677, 30063, 7002)
  170. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "porty", "UDP", 8678, 30064, 7003)
  171. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  172. Ingress: []v1.LoadBalancerIngress{
  173. {IP: "10.1.2.3"},
  174. },
  175. }
  176. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  177. svc.Spec.HealthCheckNodePort = 345
  178. }),
  179. expected: map[ServicePortName]*BaseServiceInfo{
  180. makeServicePortName("ns1", "only-local-load-balancer", "portx"): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345),
  181. makeServicePortName("ns1", "only-local-load-balancer", "porty"): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345),
  182. },
  183. },
  184. {
  185. desc: "external name service",
  186. service: makeTestService("ns2", "external-name", func(svc *v1.Service) {
  187. svc.Spec.Type = v1.ServiceTypeExternalName
  188. svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
  189. svc.Spec.ExternalName = "foo2.bar.com"
  190. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "portz", "UDP", 1235, 5321, 0)
  191. }),
  192. expected: map[ServicePortName]*BaseServiceInfo{},
  193. },
  194. {
  195. desc: "service with ipv6 clusterIP under ipv4 mode, service should be filtered",
  196. service: &v1.Service{
  197. ObjectMeta: metav1.ObjectMeta{
  198. Name: "invalidIPv6InIPV4Mode",
  199. Namespace: "test",
  200. },
  201. Spec: v1.ServiceSpec{
  202. ClusterIP: testClusterIPv6,
  203. Ports: []v1.ServicePort{
  204. {
  205. Name: "testPort",
  206. Port: int32(12345),
  207. Protocol: v1.ProtocolTCP,
  208. },
  209. },
  210. },
  211. },
  212. isIPv6Mode: &falseVal,
  213. },
  214. {
  215. desc: "service with ipv4 clusterIP under ipv6 mode, service should be filtered",
  216. service: &v1.Service{
  217. ObjectMeta: metav1.ObjectMeta{
  218. Name: "invalidIPv4InIPV6Mode",
  219. Namespace: "test",
  220. },
  221. Spec: v1.ServiceSpec{
  222. ClusterIP: testClusterIPv4,
  223. Ports: []v1.ServicePort{
  224. {
  225. Name: "testPort",
  226. Port: int32(12345),
  227. Protocol: v1.ProtocolTCP,
  228. },
  229. },
  230. },
  231. },
  232. isIPv6Mode: &trueVal,
  233. },
  234. {
  235. desc: "service with ipv4 configurations under ipv4 mode",
  236. service: &v1.Service{
  237. ObjectMeta: metav1.ObjectMeta{
  238. Name: "validIPv4",
  239. Namespace: "test",
  240. },
  241. Spec: v1.ServiceSpec{
  242. ClusterIP: testClusterIPv4,
  243. ExternalIPs: []string{testExternalIPv4},
  244. LoadBalancerSourceRanges: []string{testSourceRangeIPv4},
  245. Ports: []v1.ServicePort{
  246. {
  247. Name: "testPort",
  248. Port: int32(12345),
  249. Protocol: v1.ProtocolTCP,
  250. },
  251. },
  252. },
  253. },
  254. expected: map[ServicePortName]*BaseServiceInfo{
  255. makeServicePortName("test", "validIPv4", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) {
  256. info.ExternalIPs = []string{testExternalIPv4}
  257. info.LoadBalancerSourceRanges = []string{testSourceRangeIPv4}
  258. }),
  259. },
  260. isIPv6Mode: &falseVal,
  261. },
  262. {
  263. desc: "service with ipv6 configurations under ipv6 mode",
  264. service: &v1.Service{
  265. ObjectMeta: metav1.ObjectMeta{
  266. Name: "validIPv6",
  267. Namespace: "test",
  268. },
  269. Spec: v1.ServiceSpec{
  270. ClusterIP: testClusterIPv6,
  271. ExternalIPs: []string{testExternalIPv6},
  272. LoadBalancerSourceRanges: []string{testSourceRangeIPv6},
  273. Ports: []v1.ServicePort{
  274. {
  275. Name: "testPort",
  276. Port: int32(12345),
  277. Protocol: v1.ProtocolTCP,
  278. },
  279. },
  280. },
  281. },
  282. expected: map[ServicePortName]*BaseServiceInfo{
  283. makeServicePortName("test", "validIPv6", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) {
  284. info.ExternalIPs = []string{testExternalIPv6}
  285. info.LoadBalancerSourceRanges = []string{testSourceRangeIPv6}
  286. }),
  287. },
  288. isIPv6Mode: &trueVal,
  289. },
  290. {
  291. desc: "service with both ipv4 and ipv6 configurations under ipv4 mode, ipv6 fields should be filtered",
  292. service: &v1.Service{
  293. ObjectMeta: metav1.ObjectMeta{
  294. Name: "filterIPv6InIPV4Mode",
  295. Namespace: "test",
  296. },
  297. Spec: v1.ServiceSpec{
  298. ClusterIP: testClusterIPv4,
  299. ExternalIPs: []string{testExternalIPv4, testExternalIPv6},
  300. LoadBalancerSourceRanges: []string{testSourceRangeIPv4, testSourceRangeIPv6},
  301. Ports: []v1.ServicePort{
  302. {
  303. Name: "testPort",
  304. Port: int32(12345),
  305. Protocol: v1.ProtocolTCP,
  306. },
  307. },
  308. },
  309. },
  310. expected: map[ServicePortName]*BaseServiceInfo{
  311. makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) {
  312. info.ExternalIPs = []string{testExternalIPv4}
  313. info.LoadBalancerSourceRanges = []string{testSourceRangeIPv4}
  314. }),
  315. },
  316. isIPv6Mode: &falseVal,
  317. },
  318. {
  319. desc: "service with both ipv4 and ipv6 configurations under ipv6 mode, ipv4 fields should be filtered",
  320. service: &v1.Service{
  321. ObjectMeta: metav1.ObjectMeta{
  322. Name: "filterIPv4InIPV6Mode",
  323. Namespace: "test",
  324. },
  325. Spec: v1.ServiceSpec{
  326. ClusterIP: testClusterIPv6,
  327. ExternalIPs: []string{testExternalIPv4, testExternalIPv6},
  328. LoadBalancerSourceRanges: []string{testSourceRangeIPv4, testSourceRangeIPv6},
  329. Ports: []v1.ServicePort{
  330. {
  331. Name: "testPort",
  332. Port: int32(12345),
  333. Protocol: v1.ProtocolTCP,
  334. },
  335. },
  336. },
  337. },
  338. expected: map[ServicePortName]*BaseServiceInfo{
  339. makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) {
  340. info.ExternalIPs = []string{testExternalIPv6}
  341. info.LoadBalancerSourceRanges = []string{testSourceRangeIPv6}
  342. }),
  343. },
  344. isIPv6Mode: &trueVal,
  345. },
  346. }
  347. for _, tc := range testCases {
  348. svcTracker.isIPv6Mode = tc.isIPv6Mode
  349. // outputs
  350. newServices := svcTracker.serviceToServiceMap(tc.service)
  351. if len(newServices) != len(tc.expected) {
  352. t.Errorf("[%s] expected %d new, got %d: %v", tc.desc, len(tc.expected), len(newServices), spew.Sdump(newServices))
  353. }
  354. for svcKey, expectedInfo := range tc.expected {
  355. svcInfo := newServices[svcKey].(*BaseServiceInfo)
  356. if !svcInfo.ClusterIP.Equal(expectedInfo.ClusterIP) ||
  357. svcInfo.Port != expectedInfo.Port ||
  358. svcInfo.Protocol != expectedInfo.Protocol ||
  359. svcInfo.HealthCheckNodePort != expectedInfo.HealthCheckNodePort ||
  360. !sets.NewString(svcInfo.ExternalIPs...).Equal(sets.NewString(expectedInfo.ExternalIPs...)) ||
  361. !sets.NewString(svcInfo.LoadBalancerSourceRanges...).Equal(sets.NewString(expectedInfo.LoadBalancerSourceRanges...)) {
  362. t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo)
  363. }
  364. }
  365. }
  366. }
  367. type FakeProxier struct {
  368. endpointsChanges *EndpointChangeTracker
  369. serviceChanges *ServiceChangeTracker
  370. serviceMap ServiceMap
  371. endpointsMap EndpointsMap
  372. hostname string
  373. }
  374. func newFakeProxier() *FakeProxier {
  375. return &FakeProxier{
  376. serviceMap: make(ServiceMap),
  377. serviceChanges: NewServiceChangeTracker(nil, nil, nil),
  378. endpointsMap: make(EndpointsMap),
  379. endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil),
  380. }
  381. }
  382. func makeServiceMap(fake *FakeProxier, allServices ...*v1.Service) {
  383. for i := range allServices {
  384. fake.addService(allServices[i])
  385. }
  386. }
  387. func (fake *FakeProxier) addService(service *v1.Service) {
  388. fake.serviceChanges.Update(nil, service)
  389. }
  390. func (fake *FakeProxier) updateService(oldService *v1.Service, service *v1.Service) {
  391. fake.serviceChanges.Update(oldService, service)
  392. }
  393. func (fake *FakeProxier) deleteService(service *v1.Service) {
  394. fake.serviceChanges.Update(service, nil)
  395. }
  396. func TestUpdateServiceMapHeadless(t *testing.T) {
  397. fp := newFakeProxier()
  398. makeServiceMap(fp,
  399. makeTestService("ns2", "headless", func(svc *v1.Service) {
  400. svc.Spec.Type = v1.ServiceTypeClusterIP
  401. svc.Spec.ClusterIP = v1.ClusterIPNone
  402. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
  403. }),
  404. makeTestService("ns2", "headless-without-port", func(svc *v1.Service) {
  405. svc.Spec.Type = v1.ServiceTypeClusterIP
  406. svc.Spec.ClusterIP = v1.ClusterIPNone
  407. }),
  408. )
  409. // Headless service should be ignored
  410. result := UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  411. if len(fp.serviceMap) != 0 {
  412. t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
  413. }
  414. // No proxied services, so no healthchecks
  415. if len(result.HCServiceNodePorts) != 0 {
  416. t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
  417. }
  418. if len(result.UDPStaleClusterIP) != 0 {
  419. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  420. }
  421. }
  422. func TestUpdateServiceTypeExternalName(t *testing.T) {
  423. fp := newFakeProxier()
  424. makeServiceMap(fp,
  425. makeTestService("ns2", "external-name", func(svc *v1.Service) {
  426. svc.Spec.Type = v1.ServiceTypeExternalName
  427. svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
  428. svc.Spec.ExternalName = "foo2.bar.com"
  429. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
  430. }),
  431. )
  432. result := UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  433. if len(fp.serviceMap) != 0 {
  434. t.Errorf("expected service map length 0, got %v", fp.serviceMap)
  435. }
  436. // No proxied services, so no healthchecks
  437. if len(result.HCServiceNodePorts) != 0 {
  438. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  439. }
  440. if len(result.UDPStaleClusterIP) != 0 {
  441. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
  442. }
  443. }
  444. func TestBuildServiceMapAddRemove(t *testing.T) {
  445. fp := newFakeProxier()
  446. services := []*v1.Service{
  447. makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
  448. svc.Spec.Type = v1.ServiceTypeClusterIP
  449. svc.Spec.ClusterIP = "172.16.55.4"
  450. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "UDP", 1234, 4321, 0)
  451. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "UDP", 1235, 5321, 0)
  452. }),
  453. makeTestService("ns2", "node-port", func(svc *v1.Service) {
  454. svc.Spec.Type = v1.ServiceTypeNodePort
  455. svc.Spec.ClusterIP = "172.16.55.10"
  456. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "UDP", 345, 678, 0)
  457. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 344, 677, 0)
  458. }),
  459. makeTestService("ns1", "load-balancer", func(svc *v1.Service) {
  460. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  461. svc.Spec.ClusterIP = "172.16.55.11"
  462. svc.Spec.LoadBalancerIP = "5.6.7.8"
  463. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
  464. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
  465. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  466. Ingress: []v1.LoadBalancerIngress{
  467. {IP: "10.1.2.4"},
  468. },
  469. }
  470. }),
  471. makeTestService("ns1", "only-local-load-balancer", func(svc *v1.Service) {
  472. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  473. svc.Spec.ClusterIP = "172.16.55.12"
  474. svc.Spec.LoadBalancerIP = "5.6.7.8"
  475. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
  476. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
  477. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  478. Ingress: []v1.LoadBalancerIngress{
  479. {IP: "10.1.2.3"},
  480. },
  481. }
  482. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  483. svc.Spec.HealthCheckNodePort = 345
  484. }),
  485. }
  486. for i := range services {
  487. fp.addService(services[i])
  488. }
  489. result := UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  490. if len(fp.serviceMap) != 8 {
  491. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  492. }
  493. // The only-local-loadbalancer ones get added
  494. if len(result.HCServiceNodePorts) != 1 {
  495. t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
  496. } else {
  497. nsn := makeNSN("ns1", "only-local-load-balancer")
  498. if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
  499. t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
  500. }
  501. }
  502. if len(result.UDPStaleClusterIP) != 0 {
  503. // Services only added, so nothing stale yet
  504. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  505. }
  506. // Remove some stuff
  507. // oneService is a modification of services[0] with removed first port.
  508. oneService := makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
  509. svc.Spec.Type = v1.ServiceTypeClusterIP
  510. svc.Spec.ClusterIP = "172.16.55.4"
  511. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "UDP", 1235, 5321, 0)
  512. })
  513. fp.updateService(services[0], oneService)
  514. fp.deleteService(services[1])
  515. fp.deleteService(services[2])
  516. fp.deleteService(services[3])
  517. result = UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  518. if len(fp.serviceMap) != 1 {
  519. t.Errorf("expected service map length 1, got %v", fp.serviceMap)
  520. }
  521. if len(result.HCServiceNodePorts) != 0 {
  522. t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
  523. }
  524. // All services but one were deleted. While you'd expect only the ClusterIPs
  525. // from the three deleted services here, we still have the ClusterIP for
  526. // the not-deleted service, because one of it's ServicePorts was deleted.
  527. expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
  528. if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
  529. t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList())
  530. }
  531. for _, ip := range expectedStaleUDPServices {
  532. if !result.UDPStaleClusterIP.Has(ip) {
  533. t.Errorf("expected stale UDP service service %s", ip)
  534. }
  535. }
  536. }
  537. func TestBuildServiceMapServiceUpdate(t *testing.T) {
  538. fp := newFakeProxier()
  539. servicev1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
  540. svc.Spec.Type = v1.ServiceTypeClusterIP
  541. svc.Spec.ClusterIP = "172.16.55.4"
  542. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 0)
  543. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "TCP", 1235, 5321, 0)
  544. })
  545. servicev2 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
  546. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  547. svc.Spec.ClusterIP = "172.16.55.4"
  548. svc.Spec.LoadBalancerIP = "5.6.7.8"
  549. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 7002)
  550. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "TCP", 1235, 5321, 7003)
  551. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  552. Ingress: []v1.LoadBalancerIngress{
  553. {IP: "10.1.2.3"},
  554. },
  555. }
  556. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  557. svc.Spec.HealthCheckNodePort = 345
  558. })
  559. fp.addService(servicev1)
  560. result := UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  561. if len(fp.serviceMap) != 2 {
  562. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  563. }
  564. if len(result.HCServiceNodePorts) != 0 {
  565. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  566. }
  567. if len(result.UDPStaleClusterIP) != 0 {
  568. // Services only added, so nothing stale yet
  569. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  570. }
  571. // Change service to load-balancer
  572. fp.updateService(servicev1, servicev2)
  573. result = UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  574. if len(fp.serviceMap) != 2 {
  575. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  576. }
  577. if len(result.HCServiceNodePorts) != 1 {
  578. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  579. }
  580. if len(result.UDPStaleClusterIP) != 0 {
  581. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
  582. }
  583. // No change; make sure the service map stays the same and there are
  584. // no health-check changes
  585. fp.updateService(servicev2, servicev2)
  586. result = UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  587. if len(fp.serviceMap) != 2 {
  588. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  589. }
  590. if len(result.HCServiceNodePorts) != 1 {
  591. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  592. }
  593. if len(result.UDPStaleClusterIP) != 0 {
  594. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
  595. }
  596. // And back to ClusterIP
  597. fp.updateService(servicev2, servicev1)
  598. result = UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  599. if len(fp.serviceMap) != 2 {
  600. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  601. }
  602. if len(result.HCServiceNodePorts) != 0 {
  603. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  604. }
  605. if len(result.UDPStaleClusterIP) != 0 {
  606. // Services only added, so nothing stale yet
  607. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  608. }
  609. }