proxier_test.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package winuserspace
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "net"
  18. "net/http"
  19. "net/http/httptest"
  20. "net/url"
  21. "os"
  22. "strconv"
  23. "sync/atomic"
  24. "testing"
  25. "time"
  26. "k8s.io/api/core/v1"
  27. discovery "k8s.io/api/discovery/v1beta1"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/types"
  30. "k8s.io/apimachinery/pkg/util/runtime"
  31. "k8s.io/kubernetes/pkg/proxy"
  32. netshtest "k8s.io/kubernetes/pkg/util/netsh/testing"
  33. )
  34. const (
  35. udpIdleTimeoutForTest = 250 * time.Millisecond
  36. )
  37. func joinHostPort(host string, port int) string {
  38. return net.JoinHostPort(host, fmt.Sprintf("%d", port))
  39. }
  40. func waitForClosedPortTCP(p *Proxier, proxyPort int) error {
  41. for i := 0; i < 50; i++ {
  42. conn, err := net.Dial("tcp", joinHostPort("", proxyPort))
  43. if err != nil {
  44. return nil
  45. }
  46. conn.Close()
  47. time.Sleep(1 * time.Millisecond)
  48. }
  49. return fmt.Errorf("port %d still open", proxyPort)
  50. }
  51. func waitForClosedPortUDP(p *Proxier, proxyPort int) error {
  52. for i := 0; i < 50; i++ {
  53. conn, err := net.Dial("udp", joinHostPort("", proxyPort))
  54. if err != nil {
  55. return nil
  56. }
  57. conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
  58. // To detect a closed UDP port write, then read.
  59. _, err = conn.Write([]byte("x"))
  60. if err != nil {
  61. if e, ok := err.(net.Error); ok && !e.Timeout() {
  62. return nil
  63. }
  64. }
  65. var buf [4]byte
  66. _, err = conn.Read(buf[0:])
  67. if err != nil {
  68. if e, ok := err.(net.Error); ok && !e.Timeout() {
  69. return nil
  70. }
  71. }
  72. conn.Close()
  73. time.Sleep(1 * time.Millisecond)
  74. }
  75. return fmt.Errorf("port %d still open", proxyPort)
  76. }
  77. // udpEchoServer is a simple echo server in UDP, intended for testing the proxy.
  78. type udpEchoServer struct {
  79. net.PacketConn
  80. }
  81. func newUDPEchoServer() (*udpEchoServer, error) {
  82. packetconn, err := net.ListenPacket("udp", ":0")
  83. if err != nil {
  84. return nil, err
  85. }
  86. return &udpEchoServer{packetconn}, nil
  87. }
  88. func (r *udpEchoServer) Loop() {
  89. var buffer [4096]byte
  90. for {
  91. n, cliAddr, err := r.ReadFrom(buffer[0:])
  92. if err != nil {
  93. fmt.Printf("ReadFrom failed: %v\n", err)
  94. continue
  95. }
  96. r.WriteTo(buffer[0:n], cliAddr)
  97. }
  98. }
  99. var tcpServerPort int32
  100. var udpServerPort int32
  101. func TestMain(m *testing.M) {
  102. // Don't handle panics
  103. runtime.ReallyCrash = true
  104. // TCP setup.
  105. tcp := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  106. w.WriteHeader(http.StatusOK)
  107. w.Write([]byte(r.URL.Path[1:]))
  108. }))
  109. defer tcp.Close()
  110. u, err := url.Parse(tcp.URL)
  111. if err != nil {
  112. panic(fmt.Sprintf("failed to parse: %v", err))
  113. }
  114. _, port, err := net.SplitHostPort(u.Host)
  115. if err != nil {
  116. panic(fmt.Sprintf("failed to parse: %v", err))
  117. }
  118. tcpServerPortValue, err := strconv.Atoi(port)
  119. if err != nil {
  120. panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
  121. }
  122. tcpServerPort = int32(tcpServerPortValue)
  123. // UDP setup.
  124. udp, err := newUDPEchoServer()
  125. if err != nil {
  126. panic(fmt.Sprintf("failed to make a UDP server: %v", err))
  127. }
  128. _, port, err = net.SplitHostPort(udp.LocalAddr().String())
  129. if err != nil {
  130. panic(fmt.Sprintf("failed to parse: %v", err))
  131. }
  132. udpServerPortValue, err := strconv.Atoi(port)
  133. if err != nil {
  134. panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
  135. }
  136. udpServerPort = int32(udpServerPortValue)
  137. go udp.Loop()
  138. ret := m.Run()
  139. // it should be safe to call Close() multiple times.
  140. tcp.Close()
  141. os.Exit(ret)
  142. }
  143. func testEchoTCP(t *testing.T, address string, port int) {
  144. path := "aaaaa"
  145. res, err := http.Get("http://" + address + ":" + fmt.Sprintf("%d", port) + "/" + path)
  146. if err != nil {
  147. t.Fatalf("error connecting to server: %v", err)
  148. }
  149. defer res.Body.Close()
  150. data, err := ioutil.ReadAll(res.Body)
  151. if err != nil {
  152. t.Errorf("error reading data: %v %v", err, string(data))
  153. }
  154. if string(data) != path {
  155. t.Errorf("expected: %s, got %s", path, string(data))
  156. }
  157. }
  158. func testEchoUDP(t *testing.T, address string, port int) {
  159. data := "abc123"
  160. conn, err := net.Dial("udp", joinHostPort(address, port))
  161. if err != nil {
  162. t.Fatalf("error connecting to server: %v", err)
  163. }
  164. if _, err := conn.Write([]byte(data)); err != nil {
  165. t.Fatalf("error sending to server: %v", err)
  166. }
  167. var resp [1024]byte
  168. n, err := conn.Read(resp[0:])
  169. if err != nil {
  170. t.Errorf("error receiving data: %v", err)
  171. }
  172. if string(resp[0:n]) != data {
  173. t.Errorf("expected: %s, got %s", data, string(resp[0:n]))
  174. }
  175. }
  176. func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
  177. var got int32
  178. for i := 0; i < 600; i++ {
  179. got = atomic.LoadInt32(&p.numProxyLoops)
  180. if got == want {
  181. return
  182. }
  183. time.Sleep(100 * time.Millisecond)
  184. }
  185. t.Errorf("expected %d ProxyLoops running, got %d", want, got)
  186. }
  187. func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time.Duration) {
  188. var got int
  189. now := time.Now()
  190. deadline := now.Add(timeout)
  191. for time.Now().Before(deadline) {
  192. s.activeClients.mu.Lock()
  193. got = len(s.activeClients.clients)
  194. s.activeClients.mu.Unlock()
  195. if got == want {
  196. return
  197. }
  198. time.Sleep(500 * time.Millisecond)
  199. }
  200. t.Errorf("expected %d ProxyClients live, got %d", want, got)
  201. }
  202. func getPortNum(t *testing.T, addr string) int {
  203. _, portStr, err := net.SplitHostPort(addr)
  204. if err != nil {
  205. t.Errorf("error getting port from %s", addr)
  206. return 0
  207. }
  208. portNum, err := strconv.Atoi(portStr)
  209. if err != nil {
  210. t.Errorf("error getting port from %s", addr)
  211. return 0
  212. }
  213. return portNum
  214. }
  215. func TestTCPProxy(t *testing.T) {
  216. lb := NewLoadBalancerRR()
  217. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  218. lb.OnEndpointsAdd(&v1.Endpoints{
  219. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  220. Subsets: []v1.EndpointSubset{{
  221. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  222. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  223. }},
  224. })
  225. listenIP := "0.0.0.0"
  226. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. waitForNumProxyLoops(t, p, 0)
  231. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  232. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
  233. if err != nil {
  234. t.Fatalf("error adding new service: %#v", err)
  235. }
  236. testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  237. waitForNumProxyLoops(t, p, 1)
  238. }
  239. func TestUDPProxy(t *testing.T) {
  240. lb := NewLoadBalancerRR()
  241. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  242. lb.OnEndpointsAdd(&v1.Endpoints{
  243. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  244. Subsets: []v1.EndpointSubset{{
  245. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  246. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  247. }},
  248. })
  249. listenIP := "0.0.0.0"
  250. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  251. if err != nil {
  252. t.Fatal(err)
  253. }
  254. waitForNumProxyLoops(t, p, 0)
  255. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  256. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
  257. if err != nil {
  258. t.Fatalf("error adding new service: %#v", err)
  259. }
  260. testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  261. waitForNumProxyLoops(t, p, 1)
  262. }
  263. func TestUDPProxyTimeout(t *testing.T) {
  264. lb := NewLoadBalancerRR()
  265. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  266. lb.OnEndpointsAdd(&v1.Endpoints{
  267. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  268. Subsets: []v1.EndpointSubset{{
  269. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  270. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  271. }},
  272. })
  273. listenIP := "0.0.0.0"
  274. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  275. if err != nil {
  276. t.Fatal(err)
  277. }
  278. waitForNumProxyLoops(t, p, 0)
  279. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  280. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
  281. if err != nil {
  282. t.Fatalf("error adding new service: %#v", err)
  283. }
  284. waitForNumProxyLoops(t, p, 1)
  285. testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  286. // When connecting to a UDP service endpoint, there should be a Conn for proxy.
  287. waitForNumProxyClients(t, svcInfo, 1, time.Second)
  288. // If conn has no activity for serviceInfo.timeout since last Read/Write, it should be closed because of timeout.
  289. waitForNumProxyClients(t, svcInfo, 0, 2*time.Second)
  290. }
  291. func TestMultiPortProxy(t *testing.T) {
  292. lb := NewLoadBalancerRR()
  293. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"}
  294. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"}
  295. lb.OnEndpointsAdd(&v1.Endpoints{
  296. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  297. Subsets: []v1.EndpointSubset{{
  298. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  299. Ports: []v1.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}},
  300. }},
  301. })
  302. lb.OnEndpointsAdd(&v1.Endpoints{
  303. ObjectMeta: metav1.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace},
  304. Subsets: []v1.EndpointSubset{{
  305. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  306. Ports: []v1.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}},
  307. }},
  308. })
  309. listenIP := "0.0.0.0"
  310. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  311. if err != nil {
  312. t.Fatal(err)
  313. }
  314. waitForNumProxyLoops(t, p, 0)
  315. servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP}
  316. svcInfoP, err := p.addServicePortPortal(servicePortPortalNameP, "TCP", listenIP, 0, time.Second)
  317. if err != nil {
  318. t.Fatalf("error adding new service: %#v", err)
  319. }
  320. testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfoP.socket.Addr().String()))
  321. waitForNumProxyLoops(t, p, 1)
  322. servicePortPortalNameQ := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceQ.Namespace, Name: serviceQ.Name}, Port: serviceQ.Port, PortalIPName: listenIP}
  323. svcInfoQ, err := p.addServicePortPortal(servicePortPortalNameQ, "UDP", listenIP, 0, time.Second)
  324. if err != nil {
  325. t.Fatalf("error adding new service: %#v", err)
  326. }
  327. testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfoQ.socket.Addr().String()))
  328. waitForNumProxyLoops(t, p, 2)
  329. }
  330. func TestMultiPortOnServiceAdd(t *testing.T) {
  331. lb := NewLoadBalancerRR()
  332. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  333. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
  334. serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
  335. listenIP := "0.0.0.0"
  336. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  337. if err != nil {
  338. t.Fatal(err)
  339. }
  340. waitForNumProxyLoops(t, p, 0)
  341. p.OnServiceAdd(&v1.Service{
  342. ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  343. Spec: v1.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []v1.ServicePort{{
  344. Name: "p",
  345. Port: 0,
  346. Protocol: "TCP",
  347. }, {
  348. Name: "q",
  349. Port: 0,
  350. Protocol: "UDP",
  351. }}},
  352. })
  353. waitForNumProxyLoops(t, p, 2)
  354. servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP}
  355. svcInfo, exists := p.getServiceInfo(servicePortPortalNameP)
  356. if !exists {
  357. t.Fatalf("can't find serviceInfo for %s", servicePortPortalNameP)
  358. }
  359. if svcInfo.portal.ip != "0.0.0.0" || svcInfo.portal.port != 0 || svcInfo.protocol != "TCP" {
  360. t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo)
  361. }
  362. servicePortPortalNameQ := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceQ.Namespace, Name: serviceQ.Name}, Port: serviceQ.Port, PortalIPName: listenIP}
  363. svcInfo, exists = p.getServiceInfo(servicePortPortalNameQ)
  364. if !exists {
  365. t.Fatalf("can't find serviceInfo for %s", servicePortPortalNameQ)
  366. }
  367. if svcInfo.portal.ip != "0.0.0.0" || svcInfo.portal.port != 0 || svcInfo.protocol != "UDP" {
  368. t.Errorf("unexpected serviceInfo for %s: %#v", serviceQ, svcInfo)
  369. }
  370. servicePortPortalNameX := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceX.Namespace, Name: serviceX.Name}, Port: serviceX.Port, PortalIPName: listenIP}
  371. svcInfo, exists = p.getServiceInfo(servicePortPortalNameX)
  372. if exists {
  373. t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo)
  374. }
  375. }
  376. // Helper: Stops the proxy for the named service.
  377. func stopProxyByName(proxier *Proxier, service ServicePortPortalName) error {
  378. info, found := proxier.getServiceInfo(service)
  379. if !found {
  380. return fmt.Errorf("unknown service: %s", service)
  381. }
  382. return proxier.stopProxy(service, info)
  383. }
  384. func TestTCPProxyStop(t *testing.T) {
  385. lb := NewLoadBalancerRR()
  386. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  387. lb.OnEndpointsAdd(&v1.Endpoints{
  388. ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
  389. Subsets: []v1.EndpointSubset{{
  390. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  391. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  392. }},
  393. })
  394. listenIP := "0.0.0.0"
  395. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  396. if err != nil {
  397. t.Fatal(err)
  398. }
  399. waitForNumProxyLoops(t, p, 0)
  400. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  401. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
  402. if err != nil {
  403. t.Fatalf("error adding new service: %#v", err)
  404. }
  405. if !svcInfo.isAlive() {
  406. t.Fatalf("wrong value for isAlive(): expected true")
  407. }
  408. conn, err := net.Dial("tcp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
  409. if err != nil {
  410. t.Fatalf("error connecting to proxy: %v", err)
  411. }
  412. conn.Close()
  413. waitForNumProxyLoops(t, p, 1)
  414. stopProxyByName(p, servicePortPortalName)
  415. if svcInfo.isAlive() {
  416. t.Fatalf("wrong value for isAlive(): expected false")
  417. }
  418. // Wait for the port to really close.
  419. if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
  420. t.Fatalf(err.Error())
  421. }
  422. waitForNumProxyLoops(t, p, 0)
  423. }
  424. func TestUDPProxyStop(t *testing.T) {
  425. lb := NewLoadBalancerRR()
  426. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  427. lb.OnEndpointsAdd(&v1.Endpoints{
  428. ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
  429. Subsets: []v1.EndpointSubset{{
  430. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  431. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  432. }},
  433. })
  434. listenIP := "0.0.0.0"
  435. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  436. if err != nil {
  437. t.Fatal(err)
  438. }
  439. waitForNumProxyLoops(t, p, 0)
  440. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  441. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
  442. if err != nil {
  443. t.Fatalf("error adding new service: %#v", err)
  444. }
  445. conn, err := net.Dial("udp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
  446. if err != nil {
  447. t.Fatalf("error connecting to proxy: %v", err)
  448. }
  449. conn.Close()
  450. waitForNumProxyLoops(t, p, 1)
  451. stopProxyByName(p, servicePortPortalName)
  452. // Wait for the port to really close.
  453. if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
  454. t.Fatalf(err.Error())
  455. }
  456. waitForNumProxyLoops(t, p, 0)
  457. }
  458. func TestTCPProxyUpdateDelete(t *testing.T) {
  459. lb := NewLoadBalancerRR()
  460. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  461. lb.OnEndpointsAdd(&v1.Endpoints{
  462. ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
  463. Subsets: []v1.EndpointSubset{{
  464. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  465. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  466. }},
  467. })
  468. listenIP := "0.0.0.0"
  469. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  470. if err != nil {
  471. t.Fatal(err)
  472. }
  473. waitForNumProxyLoops(t, p, 0)
  474. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  475. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
  476. if err != nil {
  477. t.Fatalf("error adding new service: %#v", err)
  478. }
  479. fmt.Println("here0")
  480. conn, err := net.Dial("tcp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
  481. if err != nil {
  482. t.Fatalf("error connecting to proxy: %v", err)
  483. }
  484. conn.Close()
  485. waitForNumProxyLoops(t, p, 1)
  486. p.OnServiceDelete(&v1.Service{
  487. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  488. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  489. Name: "p",
  490. Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
  491. Protocol: "TCP",
  492. }}},
  493. })
  494. if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
  495. t.Fatalf(err.Error())
  496. }
  497. waitForNumProxyLoops(t, p, 0)
  498. }
  499. func TestUDPProxyUpdateDelete(t *testing.T) {
  500. lb := NewLoadBalancerRR()
  501. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  502. lb.OnEndpointsAdd(&v1.Endpoints{
  503. ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
  504. Subsets: []v1.EndpointSubset{{
  505. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  506. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  507. }},
  508. })
  509. listenIP := "0.0.0.0"
  510. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  511. if err != nil {
  512. t.Fatal(err)
  513. }
  514. waitForNumProxyLoops(t, p, 0)
  515. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  516. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
  517. if err != nil {
  518. t.Fatalf("error adding new service: %#v", err)
  519. }
  520. conn, err := net.Dial("udp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
  521. if err != nil {
  522. t.Fatalf("error connecting to proxy: %v", err)
  523. }
  524. conn.Close()
  525. waitForNumProxyLoops(t, p, 1)
  526. p.OnServiceDelete(&v1.Service{
  527. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  528. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  529. Name: "p",
  530. Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
  531. Protocol: "UDP",
  532. }}},
  533. })
  534. if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
  535. t.Fatalf(err.Error())
  536. }
  537. waitForNumProxyLoops(t, p, 0)
  538. }
  539. func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
  540. lb := NewLoadBalancerRR()
  541. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  542. endpoint := &v1.Endpoints{
  543. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  544. Subsets: []v1.EndpointSubset{{
  545. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  546. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  547. }},
  548. }
  549. lb.OnEndpointsAdd(endpoint)
  550. listenIP := "0.0.0.0"
  551. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  552. if err != nil {
  553. t.Fatal(err)
  554. }
  555. waitForNumProxyLoops(t, p, 0)
  556. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  557. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
  558. if err != nil {
  559. t.Fatalf("error adding new service: %#v", err)
  560. }
  561. conn, err := net.Dial("tcp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
  562. if err != nil {
  563. t.Fatalf("error connecting to proxy: %v", err)
  564. }
  565. conn.Close()
  566. waitForNumProxyLoops(t, p, 1)
  567. p.OnServiceDelete(&v1.Service{
  568. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  569. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  570. Name: "p",
  571. Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
  572. Protocol: "TCP",
  573. }}},
  574. })
  575. if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
  576. t.Fatalf(err.Error())
  577. }
  578. waitForNumProxyLoops(t, p, 0)
  579. // need to add endpoint here because it got clean up during service delete
  580. lb.OnEndpointsAdd(endpoint)
  581. p.OnServiceAdd(&v1.Service{
  582. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  583. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  584. Name: "p",
  585. Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
  586. Protocol: "TCP",
  587. }}},
  588. })
  589. svcInfo, exists := p.getServiceInfo(servicePortPortalName)
  590. if !exists {
  591. t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
  592. }
  593. testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  594. waitForNumProxyLoops(t, p, 1)
  595. }
  596. func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
  597. lb := NewLoadBalancerRR()
  598. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  599. endpoint := &v1.Endpoints{
  600. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  601. Subsets: []v1.EndpointSubset{{
  602. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  603. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  604. }},
  605. }
  606. lb.OnEndpointsAdd(endpoint)
  607. listenIP := "0.0.0.0"
  608. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  609. if err != nil {
  610. t.Fatal(err)
  611. }
  612. waitForNumProxyLoops(t, p, 0)
  613. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  614. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
  615. if err != nil {
  616. t.Fatalf("error adding new service: %#v", err)
  617. }
  618. conn, err := net.Dial("udp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
  619. if err != nil {
  620. t.Fatalf("error connecting to proxy: %v", err)
  621. }
  622. conn.Close()
  623. waitForNumProxyLoops(t, p, 1)
  624. p.OnServiceDelete(&v1.Service{
  625. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  626. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  627. Name: "p",
  628. Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
  629. Protocol: "UDP",
  630. }}},
  631. })
  632. if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
  633. t.Fatalf(err.Error())
  634. }
  635. waitForNumProxyLoops(t, p, 0)
  636. // need to add endpoint here because it got clean up during service delete
  637. lb.OnEndpointsAdd(endpoint)
  638. p.OnServiceAdd(&v1.Service{
  639. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  640. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  641. Name: "p",
  642. Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
  643. Protocol: "UDP",
  644. }}},
  645. })
  646. svcInfo, exists := p.getServiceInfo(servicePortPortalName)
  647. if !exists {
  648. t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
  649. }
  650. testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  651. waitForNumProxyLoops(t, p, 1)
  652. }
  653. func TestTCPProxyUpdatePort(t *testing.T) {
  654. lb := NewLoadBalancerRR()
  655. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  656. lb.OnEndpointsAdd(&v1.Endpoints{
  657. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  658. Subsets: []v1.EndpointSubset{{
  659. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  660. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  661. }},
  662. })
  663. listenIP := "0.0.0.0"
  664. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  665. if err != nil {
  666. t.Fatal(err)
  667. }
  668. waitForNumProxyLoops(t, p, 0)
  669. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  670. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
  671. if err != nil {
  672. t.Fatalf("error adding new service: %#v", err)
  673. }
  674. testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  675. waitForNumProxyLoops(t, p, 1)
  676. p.OnServiceAdd(&v1.Service{
  677. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  678. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  679. Name: "p",
  680. Port: 0,
  681. Protocol: "TCP",
  682. }}},
  683. })
  684. // Wait for the socket to actually get free.
  685. if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
  686. t.Fatalf(err.Error())
  687. }
  688. svcInfo, exists := p.getServiceInfo(servicePortPortalName)
  689. if !exists {
  690. t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
  691. }
  692. testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  693. // This is a bit async, but this should be sufficient.
  694. time.Sleep(500 * time.Millisecond)
  695. waitForNumProxyLoops(t, p, 1)
  696. }
  697. func TestUDPProxyUpdatePort(t *testing.T) {
  698. lb := NewLoadBalancerRR()
  699. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  700. lb.OnEndpointsAdd(&v1.Endpoints{
  701. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  702. Subsets: []v1.EndpointSubset{{
  703. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  704. Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}},
  705. }},
  706. })
  707. listenIP := "0.0.0.0"
  708. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  709. if err != nil {
  710. t.Fatal(err)
  711. }
  712. waitForNumProxyLoops(t, p, 0)
  713. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  714. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
  715. if err != nil {
  716. t.Fatalf("error adding new service: %#v", err)
  717. }
  718. waitForNumProxyLoops(t, p, 1)
  719. p.OnServiceAdd(&v1.Service{
  720. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  721. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  722. Name: "p",
  723. Port: 0,
  724. Protocol: "UDP",
  725. }}},
  726. })
  727. // Wait for the socket to actually get free.
  728. if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
  729. t.Fatalf(err.Error())
  730. }
  731. svcInfo, exists := p.getServiceInfo(servicePortPortalName)
  732. if !exists {
  733. t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
  734. }
  735. testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  736. waitForNumProxyLoops(t, p, 1)
  737. }
  738. func TestProxyUpdatePublicIPs(t *testing.T) {
  739. lb := NewLoadBalancerRR()
  740. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  741. lb.OnEndpointsAdd(&v1.Endpoints{
  742. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  743. Subsets: []v1.EndpointSubset{{
  744. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  745. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  746. }},
  747. })
  748. listenIP := "0.0.0.0"
  749. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  750. if err != nil {
  751. t.Fatal(err)
  752. }
  753. waitForNumProxyLoops(t, p, 0)
  754. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  755. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
  756. if err != nil {
  757. t.Fatalf("error adding new service: %#v", err)
  758. }
  759. testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  760. waitForNumProxyLoops(t, p, 1)
  761. p.OnServiceAdd(&v1.Service{
  762. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  763. Spec: v1.ServiceSpec{
  764. Ports: []v1.ServicePort{{
  765. Name: "p",
  766. Port: int32(svcInfo.portal.port),
  767. Protocol: "TCP",
  768. }},
  769. ClusterIP: svcInfo.portal.ip,
  770. ExternalIPs: []string{"0.0.0.0"},
  771. },
  772. })
  773. // Wait for the socket to actually get free.
  774. if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
  775. t.Fatalf(err.Error())
  776. }
  777. svcInfo, exists := p.getServiceInfo(servicePortPortalName)
  778. if !exists {
  779. t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
  780. }
  781. testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  782. // This is a bit async, but this should be sufficient.
  783. time.Sleep(500 * time.Millisecond)
  784. waitForNumProxyLoops(t, p, 1)
  785. }
  786. func TestProxyUpdatePortal(t *testing.T) {
  787. lb := NewLoadBalancerRR()
  788. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
  789. endpoint := &v1.Endpoints{
  790. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  791. Subsets: []v1.EndpointSubset{{
  792. Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
  793. Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
  794. }},
  795. }
  796. lb.OnEndpointsAdd(endpoint)
  797. listenIP := "0.0.0.0"
  798. p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
  799. if err != nil {
  800. t.Fatal(err)
  801. }
  802. waitForNumProxyLoops(t, p, 0)
  803. servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
  804. svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
  805. if err != nil {
  806. t.Fatalf("error adding new service: %#v", err)
  807. }
  808. testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  809. waitForNumProxyLoops(t, p, 1)
  810. svcv0 := &v1.Service{
  811. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  812. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  813. Name: "p",
  814. Port: int32(svcInfo.portal.port),
  815. Protocol: "TCP",
  816. }}},
  817. }
  818. svcv1 := &v1.Service{
  819. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  820. Spec: v1.ServiceSpec{ClusterIP: "", Ports: []v1.ServicePort{{
  821. Name: "p",
  822. Port: int32(svcInfo.portal.port),
  823. Protocol: "TCP",
  824. }}},
  825. }
  826. p.OnServiceUpdate(svcv0, svcv1)
  827. _, exists := p.getServiceInfo(servicePortPortalName)
  828. if exists {
  829. t.Fatalf("service with empty ClusterIP should not be included in the proxy")
  830. }
  831. svcv2 := &v1.Service{
  832. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  833. Spec: v1.ServiceSpec{ClusterIP: "None", Ports: []v1.ServicePort{{
  834. Name: "p",
  835. Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
  836. Protocol: "TCP",
  837. }}},
  838. }
  839. p.OnServiceUpdate(svcv1, svcv2)
  840. _, exists = p.getServiceInfo(servicePortPortalName)
  841. if exists {
  842. t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
  843. }
  844. svcv3 := &v1.Service{
  845. ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  846. Spec: v1.ServiceSpec{ClusterIP: listenIP, Ports: []v1.ServicePort{{
  847. Name: "p",
  848. Port: int32(svcInfo.portal.port),
  849. Protocol: "TCP",
  850. }}},
  851. }
  852. p.OnServiceUpdate(svcv2, svcv3)
  853. lb.OnEndpointsAdd(endpoint)
  854. svcInfo, exists = p.getServiceInfo(servicePortPortalName)
  855. if !exists {
  856. t.Fatalf("service with ClusterIP set not found in the proxy")
  857. }
  858. testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
  859. waitForNumProxyLoops(t, p, 1)
  860. }
  861. func TestNoopEndpointSlice(t *testing.T) {
  862. p := Proxier{}
  863. p.OnEndpointSliceAdd(&discovery.EndpointSlice{})
  864. p.OnEndpointSliceUpdate(&discovery.EndpointSlice{}, &discovery.EndpointSlice{})
  865. p.OnEndpointSliceDelete(&discovery.EndpointSlice{})
  866. p.OnEndpointSlicesSynced()
  867. }
  868. // TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in