proxier_test.go 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package userspace
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "net"
  18. "net/http"
  19. "net/http/httptest"
  20. "net/url"
  21. "os"
  22. "reflect"
  23. "strconv"
  24. "sync/atomic"
  25. "testing"
  26. "time"
  27. "k8s.io/api/core/v1"
  28. discovery "k8s.io/api/discovery/v1beta1"
  29. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  30. "k8s.io/apimachinery/pkg/types"
  31. "k8s.io/apimachinery/pkg/util/runtime"
  32. "k8s.io/apimachinery/pkg/util/wait"
  33. "k8s.io/kubernetes/pkg/proxy"
  34. ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
  35. "k8s.io/utils/exec"
  36. fakeexec "k8s.io/utils/exec/testing"
  37. )
  38. const (
  39. udpIdleTimeoutForTest = 250 * time.Millisecond
  40. )
  41. func joinHostPort(host string, port int) string {
  42. return net.JoinHostPort(host, fmt.Sprintf("%d", port))
  43. }
  44. func waitForClosedPortTCP(p *Proxier, proxyPort int) error {
  45. for i := 0; i < 50; i++ {
  46. conn, err := net.Dial("tcp", joinHostPort("", proxyPort))
  47. if err != nil {
  48. return nil
  49. }
  50. conn.Close()
  51. time.Sleep(1 * time.Millisecond)
  52. }
  53. return fmt.Errorf("port %d still open", proxyPort)
  54. }
  55. func waitForClosedPortUDP(p *Proxier, proxyPort int) error {
  56. for i := 0; i < 50; i++ {
  57. conn, err := net.Dial("udp", joinHostPort("", proxyPort))
  58. if err != nil {
  59. return nil
  60. }
  61. conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
  62. // To detect a closed UDP port write, then read.
  63. _, err = conn.Write([]byte("x"))
  64. if err != nil {
  65. if e, ok := err.(net.Error); ok && !e.Timeout() {
  66. return nil
  67. }
  68. }
  69. var buf [4]byte
  70. _, err = conn.Read(buf[0:])
  71. if err != nil {
  72. if e, ok := err.(net.Error); ok && !e.Timeout() {
  73. return nil
  74. }
  75. }
  76. conn.Close()
  77. time.Sleep(1 * time.Millisecond)
  78. }
  79. return fmt.Errorf("port %d still open", proxyPort)
  80. }
  81. func waitForServiceInfo(p *Proxier, service proxy.ServicePortName) (*ServiceInfo, bool) {
  82. var svcInfo *ServiceInfo
  83. var exists bool
  84. wait.PollImmediate(50*time.Millisecond, 3*time.Second, func() (bool, error) {
  85. svcInfo, exists = p.getServiceInfo(service)
  86. return exists, nil
  87. })
  88. return svcInfo, exists
  89. }
  90. // udpEchoServer is a simple echo server in UDP, intended for testing the proxy.
  91. type udpEchoServer struct {
  92. net.PacketConn
  93. }
  94. func newUDPEchoServer() (*udpEchoServer, error) {
  95. packetconn, err := net.ListenPacket("udp", ":0")
  96. if err != nil {
  97. return nil, err
  98. }
  99. return &udpEchoServer{packetconn}, nil
  100. }
  101. func (r *udpEchoServer) Loop() {
  102. var buffer [4096]byte
  103. for {
  104. n, cliAddr, err := r.ReadFrom(buffer[0:])
  105. if err != nil {
  106. fmt.Printf("ReadFrom failed: %v\n", err)
  107. continue
  108. }
  109. r.WriteTo(buffer[0:n], cliAddr)
  110. }
  111. }
  112. var tcpServerPort int32
  113. var udpServerPort int32
  114. func TestMain(m *testing.M) {
  115. // Don't handle panics
  116. runtime.ReallyCrash = true
  117. // TCP setup.
  118. tcp := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  119. w.WriteHeader(http.StatusOK)
  120. w.Write([]byte(r.URL.Path[1:]))
  121. }))
  122. defer tcp.Close()
  123. u, err := url.Parse(tcp.URL)
  124. if err != nil {
  125. panic(fmt.Sprintf("failed to parse: %v", err))
  126. }
  127. _, port, err := net.SplitHostPort(u.Host)
  128. if err != nil {
  129. panic(fmt.Sprintf("failed to parse: %v", err))
  130. }
  131. tcpServerPortValue, err := strconv.Atoi(port)
  132. if err != nil {
  133. panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
  134. }
  135. tcpServerPort = int32(tcpServerPortValue)
  136. // UDP setup.
  137. udp, err := newUDPEchoServer()
  138. if err != nil {
  139. panic(fmt.Sprintf("failed to make a UDP server: %v", err))
  140. }
  141. _, port, err = net.SplitHostPort(udp.LocalAddr().String())
  142. if err != nil {
  143. panic(fmt.Sprintf("failed to parse: %v", err))
  144. }
  145. udpServerPortValue, err := strconv.Atoi(port)
  146. if err != nil {
  147. panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
  148. }
  149. udpServerPort = int32(udpServerPortValue)
  150. go udp.Loop()
  151. ret := m.Run()
  152. // it should be safe to call Close() multiple times.
  153. tcp.Close()
  154. os.Exit(ret)
  155. }
  156. func testEchoTCP(t *testing.T, address string, port int) {
  157. path := "aaaaa"
  158. res, err := http.Get("http://" + address + ":" + fmt.Sprintf("%d", port) + "/" + path)
  159. if err != nil {
  160. t.Fatalf("error connecting to server: %v", err)
  161. }
  162. defer res.Body.Close()
  163. data, err := ioutil.ReadAll(res.Body)
  164. if err != nil {
  165. t.Errorf("error reading data: %v %v", err, string(data))
  166. }
  167. if string(data) != path {
  168. t.Errorf("expected: %s, got %s", path, string(data))
  169. }
  170. }
  171. func testEchoUDP(t *testing.T, address string, port int) {
  172. data := "abc123"
  173. conn, err := net.Dial("udp", joinHostPort(address, port))
  174. if err != nil {
  175. t.Fatalf("error connecting to server: %v", err)
  176. }
  177. if _, err := conn.Write([]byte(data)); err != nil {
  178. t.Fatalf("error sending to server: %v", err)
  179. }
  180. var resp [1024]byte
  181. n, err := conn.Read(resp[0:])
  182. if err != nil {
  183. t.Errorf("error receiving data: %v", err)
  184. }
  185. if string(resp[0:n]) != data {
  186. t.Errorf("expected: %s, got %s", data, string(resp[0:n]))
  187. }
  188. }
  189. func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
  190. var got int32
  191. for i := 0; i < 600; i++ {
  192. got = atomic.LoadInt32(&p.numProxyLoops)
  193. if got == want {
  194. return
  195. }
  196. time.Sleep(100 * time.Millisecond)
  197. }
  198. t.Errorf("expected %d ProxyLoops running, got %d", want, got)
  199. }
  200. func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time.Duration) {
  201. var got int
  202. now := time.Now()
  203. deadline := now.Add(timeout)
  204. for time.Now().Before(deadline) {
  205. s.ActiveClients.Mu.Lock()
  206. got = len(s.ActiveClients.Clients)
  207. s.ActiveClients.Mu.Unlock()
  208. if got == want {
  209. return
  210. }
  211. time.Sleep(500 * time.Millisecond)
  212. }
  213. t.Errorf("expected %d ProxyClients live, got %d", want, got)
  214. }
  215. func startProxier(p *Proxier, t *testing.T) {
  216. go func() {
  217. p.SyncLoop()
  218. }()
  219. waitForNumProxyLoops(t, p, 0)
  220. p.OnServiceSynced()
  221. p.OnEndpointsSynced()
  222. }
  223. func TestTCPProxy(t *testing.T) {
  224. lb := NewLoadBalancerRR()
  225. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  226. lb.OnEndpointsAdd(&v1.Endpoints{
  227. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  228. Subsets: []v1.EndpointSubset{{
  229. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  230. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  231. }},
  232. })
  233. fexec := makeFakeExec()
  234. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  235. if err != nil {
  236. t.Fatal(err)
  237. }
  238. startProxier(p, t)
  239. defer p.shutdown()
  240. svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
  241. if err != nil {
  242. t.Fatalf("error adding new service: %#v", err)
  243. }
  244. testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
  245. waitForNumProxyLoops(t, p, 1)
  246. }
  247. func TestUDPProxy(t *testing.T) {
  248. lb := NewLoadBalancerRR()
  249. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  250. lb.OnEndpointsAdd(&v1.Endpoints{
  251. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  252. Subsets: []v1.EndpointSubset{{
  253. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  254. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  255. }},
  256. })
  257. fexec := makeFakeExec()
  258. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  259. if err != nil {
  260. t.Fatal(err)
  261. }
  262. startProxier(p, t)
  263. defer p.shutdown()
  264. svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
  265. if err != nil {
  266. t.Fatalf("error adding new service: %#v", err)
  267. }
  268. testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
  269. waitForNumProxyLoops(t, p, 1)
  270. }
  271. func TestUDPProxyTimeout(t *testing.T) {
  272. lb := NewLoadBalancerRR()
  273. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  274. lb.OnEndpointsAdd(&v1.Endpoints{
  275. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  276. Subsets: []v1.EndpointSubset{{
  277. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  278. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  279. }},
  280. })
  281. fexec := makeFakeExec()
  282. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  283. if err != nil {
  284. t.Fatal(err)
  285. }
  286. startProxier(p, t)
  287. defer p.shutdown()
  288. svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
  289. if err != nil {
  290. t.Fatalf("error adding new service: %#v", err)
  291. }
  292. waitForNumProxyLoops(t, p, 1)
  293. testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
  294. // When connecting to a UDP service endpoint, there should be a Conn for proxy.
  295. waitForNumProxyClients(t, svcInfo, 1, time.Second)
  296. // If conn has no activity for serviceInfo.timeout since last Read/Write, it should be closed because of timeout.
  297. waitForNumProxyClients(t, svcInfo, 0, 2*time.Second)
  298. }
  299. func TestMultiPortProxy(t *testing.T) {
  300. lb := NewLoadBalancerRR()
  301. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"}
  302. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"}
  303. lb.OnEndpointsAdd(&v1.Endpoints{
  304. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  305. Subsets: []v1.EndpointSubset{{
  306. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  307. Ports: []v1.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}},
  308. }},
  309. })
  310. lb.OnEndpointsAdd(&v1.Endpoints{
  311. ObjectMeta: metav1.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace},
  312. Subsets: []v1.EndpointSubset{{
  313. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  314. Ports: []v1.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}},
  315. }},
  316. })
  317. fexec := makeFakeExec()
  318. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  319. if err != nil {
  320. t.Fatal(err)
  321. }
  322. startProxier(p, t)
  323. defer p.shutdown()
  324. svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
  325. if err != nil {
  326. t.Fatalf("error adding new service: %#v", err)
  327. }
  328. testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort)
  329. waitForNumProxyLoops(t, p, 1)
  330. svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second)
  331. if err != nil {
  332. t.Fatalf("error adding new service: %#v", err)
  333. }
  334. testEchoUDP(t, "127.0.0.1", svcInfoQ.proxyPort)
  335. waitForNumProxyLoops(t, p, 2)
  336. }
  337. func TestMultiPortOnServiceAdd(t *testing.T) {
  338. lb := NewLoadBalancerRR()
  339. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  340. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
  341. serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
  342. fexec := makeFakeExec()
  343. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  344. if err != nil {
  345. t.Fatal(err)
  346. }
  347. startProxier(p, t)
  348. defer p.shutdown()
  349. p.OnServiceAdd(&v1.Service{
  350. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  351. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  352. Name: "p",
  353. Port: 80,
  354. Protocol: "TCP",
  355. }, {
  356. Name: "q",
  357. Port: 81,
  358. Protocol: "UDP",
  359. }}},
  360. })
  361. waitForNumProxyLoops(t, p, 2)
  362. svcInfo, exists := waitForServiceInfo(p, serviceP)
  363. if !exists {
  364. t.Fatalf("can't find serviceInfo for %s", serviceP)
  365. }
  366. if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 80 || svcInfo.protocol != "TCP" {
  367. t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo)
  368. }
  369. svcInfo, exists = waitForServiceInfo(p, serviceQ)
  370. if !exists {
  371. t.Fatalf("can't find serviceInfo for %s", serviceQ)
  372. }
  373. if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 81 || svcInfo.protocol != "UDP" {
  374. t.Errorf("unexpected serviceInfo for %s: %#v", serviceQ, svcInfo)
  375. }
  376. svcInfo, exists = p.getServiceInfo(serviceX)
  377. if exists {
  378. t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo)
  379. }
  380. }
  381. // Helper: Stops the proxy for the named service.
  382. func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
  383. proxier.mu.Lock()
  384. defer proxier.mu.Unlock()
  385. info, found := proxier.serviceMap[service]
  386. if !found {
  387. return fmt.Errorf("unknown service: %s", service)
  388. }
  389. return proxier.stopProxy(service, info)
  390. }
  391. func TestTCPProxyStop(t *testing.T) {
  392. lb := NewLoadBalancerRR()
  393. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  394. lb.OnEndpointsAdd(&v1.Endpoints{
  395. ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
  396. Subsets: []v1.EndpointSubset{{
  397. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  398. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  399. }},
  400. })
  401. fexec := makeFakeExec()
  402. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  403. if err != nil {
  404. t.Fatal(err)
  405. }
  406. startProxier(p, t)
  407. defer p.shutdown()
  408. svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
  409. if err != nil {
  410. t.Fatalf("error adding new service: %#v", err)
  411. }
  412. if !svcInfo.IsAlive() {
  413. t.Fatalf("wrong value for IsAlive(): expected true")
  414. }
  415. conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
  416. if err != nil {
  417. t.Fatalf("error connecting to proxy: %v", err)
  418. }
  419. conn.Close()
  420. waitForNumProxyLoops(t, p, 1)
  421. stopProxyByName(p, service)
  422. if svcInfo.IsAlive() {
  423. t.Fatalf("wrong value for IsAlive(): expected false")
  424. }
  425. // Wait for the port to really close.
  426. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
  427. t.Fatalf(err.Error())
  428. }
  429. waitForNumProxyLoops(t, p, 0)
  430. }
  431. func TestUDPProxyStop(t *testing.T) {
  432. lb := NewLoadBalancerRR()
  433. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  434. lb.OnEndpointsAdd(&v1.Endpoints{
  435. ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
  436. Subsets: []v1.EndpointSubset{{
  437. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  438. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  439. }},
  440. })
  441. fexec := makeFakeExec()
  442. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  443. if err != nil {
  444. t.Fatal(err)
  445. }
  446. startProxier(p, t)
  447. defer p.shutdown()
  448. svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
  449. if err != nil {
  450. t.Fatalf("error adding new service: %#v", err)
  451. }
  452. conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
  453. if err != nil {
  454. t.Fatalf("error connecting to proxy: %v", err)
  455. }
  456. conn.Close()
  457. waitForNumProxyLoops(t, p, 1)
  458. stopProxyByName(p, service)
  459. // Wait for the port to really close.
  460. if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
  461. t.Fatalf(err.Error())
  462. }
  463. waitForNumProxyLoops(t, p, 0)
  464. }
  465. func TestTCPProxyUpdateDelete(t *testing.T) {
  466. lb := NewLoadBalancerRR()
  467. servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  468. lb.OnEndpointsAdd(&v1.Endpoints{
  469. ObjectMeta: metav1.ObjectMeta{Namespace: servicePortName.Namespace, Name: servicePortName.Name},
  470. Subsets: []v1.EndpointSubset{{
  471. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  472. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  473. }},
  474. })
  475. fexec := makeFakeExec()
  476. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  477. if err != nil {
  478. t.Fatal(err)
  479. }
  480. startProxier(p, t)
  481. defer p.shutdown()
  482. service := &v1.Service{
  483. ObjectMeta: metav1.ObjectMeta{Name: servicePortName.Name, Namespace: servicePortName.Namespace},
  484. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  485. Name: "p",
  486. Port: 9997,
  487. Protocol: "TCP",
  488. }}},
  489. }
  490. p.OnServiceAdd(service)
  491. waitForNumProxyLoops(t, p, 1)
  492. p.OnServiceDelete(service)
  493. if err := waitForClosedPortTCP(p, int(service.Spec.Ports[0].Port)); err != nil {
  494. t.Fatalf(err.Error())
  495. }
  496. waitForNumProxyLoops(t, p, 0)
  497. }
  498. func TestUDPProxyUpdateDelete(t *testing.T) {
  499. lb := NewLoadBalancerRR()
  500. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  501. lb.OnEndpointsAdd(&v1.Endpoints{
  502. ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
  503. Subsets: []v1.EndpointSubset{{
  504. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  505. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  506. }},
  507. })
  508. fexec := makeFakeExec()
  509. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  510. if err != nil {
  511. t.Fatal(err)
  512. }
  513. startProxier(p, t)
  514. defer p.shutdown()
  515. svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
  516. if err != nil {
  517. t.Fatalf("error adding new service: %#v", err)
  518. }
  519. conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
  520. if err != nil {
  521. t.Fatalf("error connecting to proxy: %v", err)
  522. }
  523. conn.Close()
  524. waitForNumProxyLoops(t, p, 1)
  525. p.OnServiceDelete(&v1.Service{
  526. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  527. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  528. Name: "p",
  529. Port: int32(svcInfo.proxyPort),
  530. Protocol: "UDP",
  531. }}},
  532. })
  533. if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
  534. t.Fatalf(err.Error())
  535. }
  536. waitForNumProxyLoops(t, p, 0)
  537. }
  538. func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
  539. lb := NewLoadBalancerRR()
  540. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  541. endpoint := &v1.Endpoints{
  542. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  543. Subsets: []v1.EndpointSubset{{
  544. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  545. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  546. }},
  547. }
  548. lb.OnEndpointsAdd(endpoint)
  549. fexec := makeFakeExec()
  550. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  551. if err != nil {
  552. t.Fatal(err)
  553. }
  554. startProxier(p, t)
  555. defer p.shutdown()
  556. svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
  557. if err != nil {
  558. t.Fatalf("error adding new service: %#v", err)
  559. }
  560. conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
  561. if err != nil {
  562. t.Fatalf("error connecting to proxy: %v", err)
  563. }
  564. conn.Close()
  565. waitForNumProxyLoops(t, p, 1)
  566. p.OnServiceDelete(&v1.Service{
  567. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  568. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  569. Name: "p",
  570. Port: int32(svcInfo.proxyPort),
  571. Protocol: "TCP",
  572. }}},
  573. })
  574. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
  575. t.Fatalf(err.Error())
  576. }
  577. waitForNumProxyLoops(t, p, 0)
  578. // need to add endpoint here because it got clean up during service delete
  579. lb.OnEndpointsAdd(endpoint)
  580. p.OnServiceAdd(&v1.Service{
  581. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  582. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  583. Name: "p",
  584. Port: int32(svcInfo.proxyPort),
  585. Protocol: "TCP",
  586. }}},
  587. })
  588. svcInfo, exists := waitForServiceInfo(p, service)
  589. if !exists {
  590. t.Fatalf("can't find serviceInfo for %s", service)
  591. }
  592. testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
  593. waitForNumProxyLoops(t, p, 1)
  594. }
  595. func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
  596. lb := NewLoadBalancerRR()
  597. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  598. endpoint := &v1.Endpoints{
  599. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  600. Subsets: []v1.EndpointSubset{{
  601. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  602. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  603. }},
  604. }
  605. lb.OnEndpointsAdd(endpoint)
  606. fexec := makeFakeExec()
  607. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  608. if err != nil {
  609. t.Fatal(err)
  610. }
  611. startProxier(p, t)
  612. defer p.shutdown()
  613. svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
  614. if err != nil {
  615. t.Fatalf("error adding new service: %#v", err)
  616. }
  617. conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
  618. if err != nil {
  619. t.Fatalf("error connecting to proxy: %v", err)
  620. }
  621. conn.Close()
  622. waitForNumProxyLoops(t, p, 1)
  623. p.OnServiceDelete(&v1.Service{
  624. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  625. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  626. Name: "p",
  627. Port: int32(svcInfo.proxyPort),
  628. Protocol: "UDP",
  629. }}},
  630. })
  631. if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
  632. t.Fatalf(err.Error())
  633. }
  634. waitForNumProxyLoops(t, p, 0)
  635. // need to add endpoint here because it got clean up during service delete
  636. lb.OnEndpointsAdd(endpoint)
  637. p.OnServiceAdd(&v1.Service{
  638. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  639. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  640. Name: "p",
  641. Port: int32(svcInfo.proxyPort),
  642. Protocol: "UDP",
  643. }}},
  644. })
  645. svcInfo, exists := waitForServiceInfo(p, service)
  646. if !exists {
  647. t.Fatalf("can't find serviceInfo")
  648. }
  649. testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
  650. waitForNumProxyLoops(t, p, 1)
  651. }
  652. func TestTCPProxyUpdatePort(t *testing.T) {
  653. lb := NewLoadBalancerRR()
  654. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  655. lb.OnEndpointsAdd(&v1.Endpoints{
  656. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  657. Subsets: []v1.EndpointSubset{{
  658. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  659. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  660. }},
  661. })
  662. fexec := makeFakeExec()
  663. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  664. if err != nil {
  665. t.Fatal(err)
  666. }
  667. startProxier(p, t)
  668. defer p.shutdown()
  669. svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
  670. if err != nil {
  671. t.Fatalf("error adding new service: %#v", err)
  672. }
  673. testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
  674. waitForNumProxyLoops(t, p, 1)
  675. p.OnServiceAdd(&v1.Service{
  676. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  677. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  678. Name: "p",
  679. Port: 99,
  680. Protocol: "TCP",
  681. }}},
  682. })
  683. // Wait for the socket to actually get free.
  684. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
  685. t.Fatalf(err.Error())
  686. }
  687. svcInfo, exists := waitForServiceInfo(p, service)
  688. if !exists {
  689. t.Fatalf("can't find serviceInfo")
  690. }
  691. testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
  692. // This is a bit async, but this should be sufficient.
  693. time.Sleep(500 * time.Millisecond)
  694. waitForNumProxyLoops(t, p, 1)
  695. }
  696. func TestUDPProxyUpdatePort(t *testing.T) {
  697. lb := NewLoadBalancerRR()
  698. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  699. lb.OnEndpointsAdd(&v1.Endpoints{
  700. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  701. Subsets: []v1.EndpointSubset{{
  702. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  703. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  704. }},
  705. })
  706. fexec := makeFakeExec()
  707. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  708. if err != nil {
  709. t.Fatal(err)
  710. }
  711. startProxier(p, t)
  712. defer p.shutdown()
  713. svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
  714. if err != nil {
  715. t.Fatalf("error adding new service: %#v", err)
  716. }
  717. waitForNumProxyLoops(t, p, 1)
  718. p.OnServiceAdd(&v1.Service{
  719. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  720. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  721. Name: "p",
  722. Port: 99,
  723. Protocol: "UDP",
  724. }}},
  725. })
  726. // Wait for the socket to actually get free.
  727. if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
  728. t.Fatalf(err.Error())
  729. }
  730. svcInfo, exists := waitForServiceInfo(p, service)
  731. if !exists {
  732. t.Fatalf("can't find serviceInfo")
  733. }
  734. testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
  735. waitForNumProxyLoops(t, p, 1)
  736. }
  737. func TestProxyUpdatePublicIPs(t *testing.T) {
  738. lb := NewLoadBalancerRR()
  739. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  740. lb.OnEndpointsAdd(&v1.Endpoints{
  741. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  742. Subsets: []v1.EndpointSubset{{
  743. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  744. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  745. }},
  746. })
  747. fexec := makeFakeExec()
  748. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  749. if err != nil {
  750. t.Fatal(err)
  751. }
  752. startProxier(p, t)
  753. defer p.shutdown()
  754. svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
  755. if err != nil {
  756. t.Fatalf("error adding new service: %#v", err)
  757. }
  758. testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
  759. waitForNumProxyLoops(t, p, 1)
  760. p.OnServiceAdd(&v1.Service{
  761. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  762. Spec: v1.ServiceSpec{
  763. Ports: []v1.ServicePort{{
  764. Name: "p",
  765. Port: int32(svcInfo.portal.port),
  766. Protocol: "TCP",
  767. }},
  768. ClusterIP: svcInfo.portal.ip.String(),
  769. ExternalIPs: []string{"4.3.2.1"},
  770. },
  771. })
  772. // Wait for the socket to actually get free.
  773. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
  774. t.Fatalf(err.Error())
  775. }
  776. svcInfo, exists := waitForServiceInfo(p, service)
  777. if !exists {
  778. t.Fatalf("can't find serviceInfo")
  779. }
  780. testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
  781. // This is a bit async, but this should be sufficient.
  782. time.Sleep(500 * time.Millisecond)
  783. waitForNumProxyLoops(t, p, 1)
  784. }
  785. func TestProxyUpdatePortal(t *testing.T) {
  786. lb := NewLoadBalancerRR()
  787. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  788. endpoint := &v1.Endpoints{
  789. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  790. Subsets: []v1.EndpointSubset{{
  791. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  792. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  793. }},
  794. }
  795. lb.OnEndpointsAdd(endpoint)
  796. fexec := makeFakeExec()
  797. p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
  798. if err != nil {
  799. t.Fatal(err)
  800. }
  801. startProxier(p, t)
  802. defer p.shutdown()
  803. svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
  804. if err != nil {
  805. t.Fatalf("error adding new service: %#v", err)
  806. }
  807. testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
  808. waitForNumProxyLoops(t, p, 1)
  809. svcv0 := &v1.Service{
  810. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  811. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  812. Name: "p",
  813. Port: int32(svcInfo.proxyPort),
  814. Protocol: "TCP",
  815. }}},
  816. }
  817. svcv1 := &v1.Service{
  818. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  819. Spec: v1.ServiceSpec{ClusterIP: "", Ports: []v1.ServicePort{{
  820. Name: "p",
  821. Port: int32(svcInfo.proxyPort),
  822. Protocol: "TCP",
  823. }}},
  824. }
  825. p.OnServiceUpdate(svcv0, svcv1)
  826. // Wait for the service to be removed because it had an empty ClusterIP
  827. var exists bool
  828. for i := 0; i < 50; i++ {
  829. _, exists = p.getServiceInfo(service)
  830. if !exists {
  831. break
  832. }
  833. time.Sleep(50 * time.Millisecond)
  834. }
  835. if exists {
  836. t.Fatalf("service with empty ClusterIP should not be included in the proxy")
  837. }
  838. svcv2 := &v1.Service{
  839. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  840. Spec: v1.ServiceSpec{ClusterIP: "None", Ports: []v1.ServicePort{{
  841. Name: "p",
  842. Port: int32(svcInfo.proxyPort),
  843. Protocol: "TCP",
  844. }}},
  845. }
  846. p.OnServiceUpdate(svcv1, svcv2)
  847. _, exists = p.getServiceInfo(service)
  848. if exists {
  849. t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
  850. }
  851. svcv3 := &v1.Service{
  852. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  853. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  854. Name: "p",
  855. Port: int32(svcInfo.proxyPort),
  856. Protocol: "TCP",
  857. }}},
  858. }
  859. p.OnServiceUpdate(svcv2, svcv3)
  860. lb.OnEndpointsAdd(endpoint)
  861. svcInfo, exists = waitForServiceInfo(p, service)
  862. if !exists {
  863. t.Fatalf("service with ClusterIP set not found in the proxy")
  864. }
  865. testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
  866. waitForNumProxyLoops(t, p, 1)
  867. }
  868. type fakeRunner struct{}
  869. // assert fakeAsyncRunner is a ProxyProvider
  870. var _ asyncRunnerInterface = &fakeRunner{}
  871. func (f fakeRunner) Run() {
  872. }
  873. func (f fakeRunner) Loop(stop <-chan struct{}) {
  874. }
  875. func TestOnServiceAddChangeMap(t *testing.T) {
  876. fexec := makeFakeExec()
  877. // Use long minSyncPeriod so we can test that immediate syncs work
  878. p, err := createProxier(NewLoadBalancerRR(), net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Minute, udpIdleTimeoutForTest, newProxySocket)
  879. if err != nil {
  880. t.Fatal(err)
  881. }
  882. // Fake out sync runner
  883. p.syncRunner = fakeRunner{}
  884. serviceMeta := metav1.ObjectMeta{Namespace: "testnamespace", Name: "testname"}
  885. service := &v1.Service{
  886. ObjectMeta: serviceMeta,
  887. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
  888. Name: "p",
  889. Port: 99,
  890. Protocol: "TCP",
  891. }}},
  892. }
  893. serviceUpdate := &v1.Service{
  894. ObjectMeta: serviceMeta,
  895. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.5", Ports: []v1.ServicePort{{
  896. Name: "p",
  897. Port: 100,
  898. Protocol: "TCP",
  899. }}},
  900. }
  901. serviceUpdate2 := &v1.Service{
  902. ObjectMeta: serviceMeta,
  903. Spec: v1.ServiceSpec{ClusterIP: "1.2.3.6", Ports: []v1.ServicePort{{
  904. Name: "p",
  905. Port: 101,
  906. Protocol: "TCP",
  907. }}},
  908. }
  909. type onServiceTest struct {
  910. detail string
  911. changes []serviceChange
  912. expectedChange *serviceChange
  913. }
  914. tests := []onServiceTest{
  915. {
  916. detail: "add",
  917. changes: []serviceChange{
  918. {current: service},
  919. },
  920. expectedChange: &serviceChange{
  921. current: service,
  922. },
  923. },
  924. {
  925. detail: "add+update=add",
  926. changes: []serviceChange{
  927. {current: service},
  928. {
  929. previous: service,
  930. current: serviceUpdate,
  931. },
  932. },
  933. expectedChange: &serviceChange{
  934. current: serviceUpdate,
  935. },
  936. },
  937. {
  938. detail: "add+del=none",
  939. changes: []serviceChange{
  940. {current: service},
  941. {previous: service},
  942. },
  943. },
  944. {
  945. detail: "update+update=update",
  946. changes: []serviceChange{
  947. {
  948. previous: service,
  949. current: serviceUpdate,
  950. },
  951. {
  952. previous: serviceUpdate,
  953. current: serviceUpdate2,
  954. },
  955. },
  956. expectedChange: &serviceChange{
  957. previous: service,
  958. current: serviceUpdate2,
  959. },
  960. },
  961. {
  962. detail: "update+del=del",
  963. changes: []serviceChange{
  964. {
  965. previous: service,
  966. current: serviceUpdate,
  967. },
  968. {previous: serviceUpdate},
  969. },
  970. // change collapsing always keeps the oldest service
  971. // info since correct unmerging depends on the least
  972. // recent update, not the most current.
  973. expectedChange: &serviceChange{
  974. previous: service,
  975. },
  976. },
  977. {
  978. detail: "del+add=update",
  979. changes: []serviceChange{
  980. {previous: service},
  981. {current: serviceUpdate},
  982. },
  983. expectedChange: &serviceChange{
  984. previous: service,
  985. current: serviceUpdate,
  986. },
  987. },
  988. }
  989. for _, test := range tests {
  990. for _, change := range test.changes {
  991. p.serviceChange(change.previous, change.current, test.detail)
  992. }
  993. if test.expectedChange != nil {
  994. if len(p.serviceChanges) != 1 {
  995. t.Fatalf("[%s] expected 1 service change but found %d", test.detail, len(p.serviceChanges))
  996. }
  997. expectedService := test.expectedChange.current
  998. if expectedService == nil {
  999. expectedService = test.expectedChange.previous
  1000. }
  1001. svcName := types.NamespacedName{Namespace: expectedService.Namespace, Name: expectedService.Name}
  1002. change, ok := p.serviceChanges[svcName]
  1003. if !ok {
  1004. t.Fatalf("[%s] did not find service change for %v", test.detail, svcName)
  1005. }
  1006. if !reflect.DeepEqual(change.previous, test.expectedChange.previous) {
  1007. t.Fatalf("[%s] change previous service and expected previous service don't match\nchange: %+v\nexp: %+v", test.detail, change.previous, test.expectedChange.previous)
  1008. }
  1009. if !reflect.DeepEqual(change.current, test.expectedChange.current) {
  1010. t.Fatalf("[%s] change current service and expected current service don't match\nchange: %+v\nexp: %+v", test.detail, change.current, test.expectedChange.current)
  1011. }
  1012. } else {
  1013. if len(p.serviceChanges) != 0 {
  1014. t.Fatalf("[%s] expected no service changes but found %d", test.detail, len(p.serviceChanges))
  1015. }
  1016. }
  1017. }
  1018. }
  1019. func TestNoopEndpointSlice(t *testing.T) {
  1020. p := Proxier{}
  1021. p.OnEndpointSliceAdd(&discovery.EndpointSlice{})
  1022. p.OnEndpointSliceUpdate(&discovery.EndpointSlice{}, &discovery.EndpointSlice{})
  1023. p.OnEndpointSliceDelete(&discovery.EndpointSlice{})
  1024. p.OnEndpointSlicesSynced()
  1025. }
  1026. func makeFakeExec() *fakeexec.FakeExec {
  1027. fcmd := fakeexec.FakeCmd{
  1028. CombinedOutputScript: []fakeexec.FakeAction{
  1029. func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
  1030. },
  1031. }
  1032. return &fakeexec.FakeExec{
  1033. CommandScript: []fakeexec.FakeCommandAction{
  1034. func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
  1035. },
  1036. LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
  1037. }
  1038. }
  1039. // TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in