endpoints_controller_test.go 61 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package endpoint
  14. import (
  15. "fmt"
  16. "net/http"
  17. "net/http/httptest"
  18. "reflect"
  19. "strconv"
  20. "testing"
  21. "time"
  22. v1 "k8s.io/api/core/v1"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. "k8s.io/apimachinery/pkg/runtime/schema"
  26. "k8s.io/apimachinery/pkg/util/diff"
  27. "k8s.io/apimachinery/pkg/util/intstr"
  28. "k8s.io/apimachinery/pkg/util/wait"
  29. utilfeature "k8s.io/apiserver/pkg/util/feature"
  30. "k8s.io/client-go/informers"
  31. clientset "k8s.io/client-go/kubernetes"
  32. clientscheme "k8s.io/client-go/kubernetes/scheme"
  33. restclient "k8s.io/client-go/rest"
  34. "k8s.io/client-go/tools/cache"
  35. utiltesting "k8s.io/client-go/util/testing"
  36. featuregatetesting "k8s.io/component-base/featuregate/testing"
  37. endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
  38. api "k8s.io/kubernetes/pkg/apis/core"
  39. "k8s.io/kubernetes/pkg/controller"
  40. endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
  41. "k8s.io/kubernetes/pkg/features"
  42. )
  43. var alwaysReady = func() bool { return true }
  44. var neverReady = func() bool { return false }
  45. var emptyNodeName string
  46. var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
  47. var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
  48. var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
  49. func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack bool) *v1.Pod {
  50. p := &v1.Pod{
  51. TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
  52. ObjectMeta: metav1.ObjectMeta{
  53. Namespace: namespace,
  54. Name: fmt.Sprintf("pod%d", id),
  55. Labels: map[string]string{"foo": "bar"},
  56. },
  57. Spec: v1.PodSpec{
  58. Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
  59. },
  60. Status: v1.PodStatus{
  61. PodIP: fmt.Sprintf("1.2.3.%d", 4+id),
  62. Conditions: []v1.PodCondition{
  63. {
  64. Type: v1.PodReady,
  65. Status: v1.ConditionTrue,
  66. },
  67. },
  68. },
  69. }
  70. if !isReady {
  71. p.Status.Conditions[0].Status = v1.ConditionFalse
  72. }
  73. for j := 0; j < nPorts; j++ {
  74. p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
  75. v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
  76. }
  77. if makeDualstack {
  78. p.Status.PodIPs = []v1.PodIP{
  79. {
  80. IP: p.Status.PodIP,
  81. },
  82. {
  83. IP: fmt.Sprintf("2000::%d", id),
  84. },
  85. }
  86. }
  87. return p
  88. }
  89. func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, makeDualstack bool) {
  90. for i := 0; i < nPods+nNotReady; i++ {
  91. isReady := i < nPods
  92. pod := testPod(namespace, i, nPorts, isReady, makeDualstack)
  93. store.Add(pod)
  94. }
  95. }
  96. func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
  97. for i := 0; i < nPods; i++ {
  98. p := &v1.Pod{
  99. TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
  100. ObjectMeta: metav1.ObjectMeta{
  101. Namespace: namespace,
  102. Name: fmt.Sprintf("pod%d", i),
  103. Labels: map[string]string{"foo": "bar"},
  104. },
  105. Spec: v1.PodSpec{
  106. RestartPolicy: restartPolicy,
  107. Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
  108. },
  109. Status: v1.PodStatus{
  110. PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
  111. Phase: podPhase,
  112. Conditions: []v1.PodCondition{
  113. {
  114. Type: v1.PodReady,
  115. Status: v1.ConditionFalse,
  116. },
  117. },
  118. },
  119. }
  120. for j := 0; j < nPorts; j++ {
  121. p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
  122. v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
  123. }
  124. store.Add(p)
  125. }
  126. }
  127. func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
  128. fakeEndpointsHandler := utiltesting.FakeHandler{
  129. StatusCode: http.StatusOK,
  130. ResponseBody: runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}),
  131. }
  132. mux := http.NewServeMux()
  133. if namespace == "" {
  134. t.Fatal("namespace cannot be empty")
  135. }
  136. mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints", &fakeEndpointsHandler)
  137. mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints/", &fakeEndpointsHandler)
  138. mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
  139. t.Errorf("unexpected request: %v", req.RequestURI)
  140. http.Error(res, "", http.StatusNotFound)
  141. })
  142. return httptest.NewServer(mux), &fakeEndpointsHandler
  143. }
  144. type endpointController struct {
  145. *EndpointController
  146. podStore cache.Store
  147. serviceStore cache.Store
  148. endpointsStore cache.Store
  149. }
  150. func newController(url string, batchPeriod time.Duration) *endpointController {
  151. client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  152. informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  153. endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
  154. informerFactory.Core().V1().Endpoints(), client, batchPeriod)
  155. endpoints.podsSynced = alwaysReady
  156. endpoints.servicesSynced = alwaysReady
  157. endpoints.endpointsSynced = alwaysReady
  158. return &endpointController{
  159. endpoints,
  160. informerFactory.Core().V1().Pods().Informer().GetStore(),
  161. informerFactory.Core().V1().Services().Informer().GetStore(),
  162. informerFactory.Core().V1().Endpoints().Informer().GetStore(),
  163. }
  164. }
  165. func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
  166. ns := metav1.NamespaceDefault
  167. testServer, endpointsHandler := makeTestServer(t, ns)
  168. defer testServer.Close()
  169. endpoints := newController(testServer.URL, 0*time.Second)
  170. endpoints.endpointsStore.Add(&v1.Endpoints{
  171. ObjectMeta: metav1.ObjectMeta{
  172. Name: "foo",
  173. Namespace: ns,
  174. ResourceVersion: "1",
  175. },
  176. Subsets: []v1.EndpointSubset{{
  177. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  178. Ports: []v1.EndpointPort{{Port: 1000}},
  179. }},
  180. })
  181. endpoints.serviceStore.Add(&v1.Service{
  182. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  183. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
  184. })
  185. endpoints.syncService(ns + "/foo")
  186. endpointsHandler.ValidateRequestCount(t, 0)
  187. }
  188. func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
  189. ns := metav1.NamespaceDefault
  190. testServer, endpointsHandler := makeTestServer(t, ns)
  191. defer testServer.Close()
  192. endpoints := newController(testServer.URL, 0*time.Second)
  193. endpoints.endpointsStore.Add(&v1.Endpoints{
  194. ObjectMeta: metav1.ObjectMeta{
  195. Name: "foo",
  196. Namespace: ns,
  197. ResourceVersion: "1",
  198. },
  199. Subsets: nil,
  200. })
  201. endpoints.serviceStore.Add(&v1.Service{
  202. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  203. Spec: v1.ServiceSpec{
  204. Selector: map[string]string{"foo": "bar"},
  205. Ports: []v1.ServicePort{{Port: 80}},
  206. },
  207. })
  208. endpoints.syncService(ns + "/foo")
  209. endpointsHandler.ValidateRequestCount(t, 0)
  210. }
  211. func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
  212. ns := metav1.NamespaceDefault
  213. testServer, endpointsHandler := makeTestServer(t, ns)
  214. defer testServer.Close()
  215. endpoints := newController(testServer.URL, 0*time.Second)
  216. endpoints.endpointsStore.Add(&v1.Endpoints{
  217. ObjectMeta: metav1.ObjectMeta{
  218. Name: "foo",
  219. Namespace: ns,
  220. ResourceVersion: "1",
  221. },
  222. Subsets: []v1.EndpointSubset{},
  223. })
  224. endpoints.serviceStore.Add(&v1.Service{
  225. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  226. Spec: v1.ServiceSpec{
  227. Selector: map[string]string{"foo": "bar"},
  228. Ports: []v1.ServicePort{{Port: 80}},
  229. },
  230. })
  231. endpoints.syncService(ns + "/foo")
  232. endpointsHandler.ValidateRequestCount(t, 0)
  233. }
  234. func TestSyncEndpointsNewNoSubsets(t *testing.T) {
  235. ns := metav1.NamespaceDefault
  236. testServer, endpointsHandler := makeTestServer(t, ns)
  237. defer testServer.Close()
  238. endpoints := newController(testServer.URL, 0*time.Second)
  239. endpoints.serviceStore.Add(&v1.Service{
  240. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  241. Spec: v1.ServiceSpec{
  242. Selector: map[string]string{"foo": "bar"},
  243. Ports: []v1.ServicePort{{Port: 80}},
  244. },
  245. })
  246. endpoints.syncService(ns + "/foo")
  247. endpointsHandler.ValidateRequestCount(t, 1)
  248. }
  249. func TestCheckLeftoverEndpoints(t *testing.T) {
  250. ns := metav1.NamespaceDefault
  251. testServer, _ := makeTestServer(t, ns)
  252. defer testServer.Close()
  253. endpoints := newController(testServer.URL, 0*time.Second)
  254. endpoints.endpointsStore.Add(&v1.Endpoints{
  255. ObjectMeta: metav1.ObjectMeta{
  256. Name: "foo",
  257. Namespace: ns,
  258. ResourceVersion: "1",
  259. },
  260. Subsets: []v1.EndpointSubset{{
  261. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  262. Ports: []v1.EndpointPort{{Port: 1000}},
  263. }},
  264. })
  265. endpoints.checkLeftoverEndpoints()
  266. if e, a := 1, endpoints.queue.Len(); e != a {
  267. t.Fatalf("Expected %v, got %v", e, a)
  268. }
  269. got, _ := endpoints.queue.Get()
  270. if e, a := ns+"/foo", got; e != a {
  271. t.Errorf("Expected %v, got %v", e, a)
  272. }
  273. }
  274. func TestSyncEndpointsProtocolTCP(t *testing.T) {
  275. ns := "other"
  276. testServer, endpointsHandler := makeTestServer(t, ns)
  277. defer testServer.Close()
  278. endpoints := newController(testServer.URL, 0*time.Second)
  279. endpoints.endpointsStore.Add(&v1.Endpoints{
  280. ObjectMeta: metav1.ObjectMeta{
  281. Name: "foo",
  282. Namespace: ns,
  283. ResourceVersion: "1",
  284. },
  285. Subsets: []v1.EndpointSubset{{
  286. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  287. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  288. }},
  289. })
  290. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  291. endpoints.serviceStore.Add(&v1.Service{
  292. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  293. Spec: v1.ServiceSpec{
  294. Selector: map[string]string{},
  295. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  296. },
  297. })
  298. endpoints.syncService(ns + "/foo")
  299. endpointsHandler.ValidateRequestCount(t, 1)
  300. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  301. ObjectMeta: metav1.ObjectMeta{
  302. Name: "foo",
  303. Namespace: ns,
  304. ResourceVersion: "1",
  305. Labels: map[string]string{
  306. v1.IsHeadlessService: "",
  307. },
  308. },
  309. Subsets: []v1.EndpointSubset{{
  310. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  311. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  312. }},
  313. })
  314. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  315. }
  316. func TestSyncEndpointsProtocolUDP(t *testing.T) {
  317. ns := "other"
  318. testServer, endpointsHandler := makeTestServer(t, ns)
  319. defer testServer.Close()
  320. endpoints := newController(testServer.URL, 0*time.Second)
  321. endpoints.endpointsStore.Add(&v1.Endpoints{
  322. ObjectMeta: metav1.ObjectMeta{
  323. Name: "foo",
  324. Namespace: ns,
  325. ResourceVersion: "1",
  326. },
  327. Subsets: []v1.EndpointSubset{{
  328. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  329. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
  330. }},
  331. })
  332. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  333. endpoints.serviceStore.Add(&v1.Service{
  334. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  335. Spec: v1.ServiceSpec{
  336. Selector: map[string]string{},
  337. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "UDP"}},
  338. },
  339. })
  340. endpoints.syncService(ns + "/foo")
  341. endpointsHandler.ValidateRequestCount(t, 1)
  342. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  343. ObjectMeta: metav1.ObjectMeta{
  344. Name: "foo",
  345. Namespace: ns,
  346. ResourceVersion: "1",
  347. Labels: map[string]string{
  348. v1.IsHeadlessService: "",
  349. },
  350. },
  351. Subsets: []v1.EndpointSubset{{
  352. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  353. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}},
  354. }},
  355. })
  356. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  357. }
  358. func TestSyncEndpointsProtocolSCTP(t *testing.T) {
  359. ns := "other"
  360. testServer, endpointsHandler := makeTestServer(t, ns)
  361. defer testServer.Close()
  362. endpoints := newController(testServer.URL, 0*time.Second)
  363. endpoints.endpointsStore.Add(&v1.Endpoints{
  364. ObjectMeta: metav1.ObjectMeta{
  365. Name: "foo",
  366. Namespace: ns,
  367. ResourceVersion: "1",
  368. },
  369. Subsets: []v1.EndpointSubset{{
  370. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  371. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
  372. }},
  373. })
  374. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  375. endpoints.serviceStore.Add(&v1.Service{
  376. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  377. Spec: v1.ServiceSpec{
  378. Selector: map[string]string{},
  379. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "SCTP"}},
  380. },
  381. })
  382. endpoints.syncService(ns + "/foo")
  383. endpointsHandler.ValidateRequestCount(t, 1)
  384. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  385. ObjectMeta: metav1.ObjectMeta{
  386. Name: "foo",
  387. Namespace: ns,
  388. ResourceVersion: "1",
  389. Labels: map[string]string{
  390. v1.IsHeadlessService: "",
  391. },
  392. },
  393. Subsets: []v1.EndpointSubset{{
  394. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  395. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}},
  396. }},
  397. })
  398. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  399. }
  400. func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
  401. ns := "other"
  402. testServer, endpointsHandler := makeTestServer(t, ns)
  403. defer testServer.Close()
  404. endpoints := newController(testServer.URL, 0*time.Second)
  405. endpoints.endpointsStore.Add(&v1.Endpoints{
  406. ObjectMeta: metav1.ObjectMeta{
  407. Name: "foo",
  408. Namespace: ns,
  409. ResourceVersion: "1",
  410. },
  411. Subsets: []v1.EndpointSubset{},
  412. })
  413. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  414. endpoints.serviceStore.Add(&v1.Service{
  415. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  416. Spec: v1.ServiceSpec{
  417. Selector: map[string]string{},
  418. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  419. },
  420. })
  421. endpoints.syncService(ns + "/foo")
  422. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  423. ObjectMeta: metav1.ObjectMeta{
  424. Name: "foo",
  425. Namespace: ns,
  426. ResourceVersion: "1",
  427. Labels: map[string]string{
  428. v1.IsHeadlessService: "",
  429. },
  430. },
  431. Subsets: []v1.EndpointSubset{{
  432. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  433. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  434. }},
  435. })
  436. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  437. }
  438. func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
  439. ns := "other"
  440. testServer, endpointsHandler := makeTestServer(t, ns)
  441. defer testServer.Close()
  442. endpoints := newController(testServer.URL, 0*time.Second)
  443. endpoints.endpointsStore.Add(&v1.Endpoints{
  444. ObjectMeta: metav1.ObjectMeta{
  445. Name: "foo",
  446. Namespace: ns,
  447. ResourceVersion: "1",
  448. },
  449. Subsets: []v1.EndpointSubset{},
  450. })
  451. addPods(endpoints.podStore, ns, 0, 1, 1, false)
  452. endpoints.serviceStore.Add(&v1.Service{
  453. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  454. Spec: v1.ServiceSpec{
  455. Selector: map[string]string{},
  456. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  457. },
  458. })
  459. endpoints.syncService(ns + "/foo")
  460. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  461. ObjectMeta: metav1.ObjectMeta{
  462. Name: "foo",
  463. Namespace: ns,
  464. ResourceVersion: "1",
  465. Labels: map[string]string{
  466. v1.IsHeadlessService: "",
  467. },
  468. },
  469. Subsets: []v1.EndpointSubset{{
  470. NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  471. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  472. }},
  473. })
  474. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  475. }
  476. func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
  477. ns := "other"
  478. testServer, endpointsHandler := makeTestServer(t, ns)
  479. defer testServer.Close()
  480. endpoints := newController(testServer.URL, 0*time.Second)
  481. endpoints.endpointsStore.Add(&v1.Endpoints{
  482. ObjectMeta: metav1.ObjectMeta{
  483. Name: "foo",
  484. Namespace: ns,
  485. ResourceVersion: "1",
  486. },
  487. Subsets: []v1.EndpointSubset{},
  488. })
  489. addPods(endpoints.podStore, ns, 1, 1, 1, false)
  490. endpoints.serviceStore.Add(&v1.Service{
  491. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  492. Spec: v1.ServiceSpec{
  493. Selector: map[string]string{},
  494. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  495. },
  496. })
  497. endpoints.syncService(ns + "/foo")
  498. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  499. ObjectMeta: metav1.ObjectMeta{
  500. Name: "foo",
  501. Namespace: ns,
  502. ResourceVersion: "1",
  503. Labels: map[string]string{
  504. v1.IsHeadlessService: "",
  505. },
  506. },
  507. Subsets: []v1.EndpointSubset{{
  508. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  509. NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
  510. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  511. }},
  512. })
  513. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  514. }
  515. func TestSyncEndpointsItemsPreexisting(t *testing.T) {
  516. ns := "bar"
  517. testServer, endpointsHandler := makeTestServer(t, ns)
  518. defer testServer.Close()
  519. endpoints := newController(testServer.URL, 0*time.Second)
  520. endpoints.endpointsStore.Add(&v1.Endpoints{
  521. ObjectMeta: metav1.ObjectMeta{
  522. Name: "foo",
  523. Namespace: ns,
  524. ResourceVersion: "1",
  525. },
  526. Subsets: []v1.EndpointSubset{{
  527. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  528. Ports: []v1.EndpointPort{{Port: 1000}},
  529. }},
  530. })
  531. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  532. endpoints.serviceStore.Add(&v1.Service{
  533. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  534. Spec: v1.ServiceSpec{
  535. Selector: map[string]string{"foo": "bar"},
  536. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  537. },
  538. })
  539. endpoints.syncService(ns + "/foo")
  540. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  541. ObjectMeta: metav1.ObjectMeta{
  542. Name: "foo",
  543. Namespace: ns,
  544. ResourceVersion: "1",
  545. Labels: map[string]string{
  546. v1.IsHeadlessService: "",
  547. },
  548. },
  549. Subsets: []v1.EndpointSubset{{
  550. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  551. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  552. }},
  553. })
  554. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  555. }
  556. func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
  557. ns := metav1.NamespaceDefault
  558. testServer, endpointsHandler := makeTestServer(t, ns)
  559. defer testServer.Close()
  560. endpoints := newController(testServer.URL, 0*time.Second)
  561. endpoints.endpointsStore.Add(&v1.Endpoints{
  562. ObjectMeta: metav1.ObjectMeta{
  563. ResourceVersion: "1",
  564. Name: "foo",
  565. Namespace: ns,
  566. },
  567. Subsets: []v1.EndpointSubset{{
  568. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  569. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  570. }},
  571. })
  572. addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, false)
  573. endpoints.serviceStore.Add(&v1.Service{
  574. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
  575. Spec: v1.ServiceSpec{
  576. Selector: map[string]string{"foo": "bar"},
  577. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  578. },
  579. })
  580. endpoints.syncService(ns + "/foo")
  581. endpointsHandler.ValidateRequestCount(t, 0)
  582. }
  583. func TestSyncEndpointsItems(t *testing.T) {
  584. ns := "other"
  585. testServer, endpointsHandler := makeTestServer(t, ns)
  586. defer testServer.Close()
  587. endpoints := newController(testServer.URL, 0*time.Second)
  588. addPods(endpoints.podStore, ns, 3, 2, 0, false)
  589. addPods(endpoints.podStore, "blah", 5, 2, 0, false) // make sure these aren't found!
  590. endpoints.serviceStore.Add(&v1.Service{
  591. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  592. Spec: v1.ServiceSpec{
  593. Selector: map[string]string{"foo": "bar"},
  594. Ports: []v1.ServicePort{
  595. {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
  596. {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
  597. },
  598. },
  599. })
  600. endpoints.syncService("other/foo")
  601. expectedSubsets := []v1.EndpointSubset{{
  602. Addresses: []v1.EndpointAddress{
  603. {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
  604. {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
  605. {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
  606. },
  607. Ports: []v1.EndpointPort{
  608. {Name: "port0", Port: 8080, Protocol: "TCP"},
  609. {Name: "port1", Port: 8088, Protocol: "TCP"},
  610. },
  611. }}
  612. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  613. ObjectMeta: metav1.ObjectMeta{
  614. ResourceVersion: "",
  615. Name: "foo",
  616. Labels: map[string]string{
  617. v1.IsHeadlessService: "",
  618. },
  619. },
  620. Subsets: endptspkg.SortSubsets(expectedSubsets),
  621. })
  622. endpointsHandler.ValidateRequestCount(t, 1)
  623. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
  624. }
  625. func TestSyncEndpointsItemsWithLabels(t *testing.T) {
  626. ns := "other"
  627. testServer, endpointsHandler := makeTestServer(t, ns)
  628. defer testServer.Close()
  629. endpoints := newController(testServer.URL, 0*time.Second)
  630. addPods(endpoints.podStore, ns, 3, 2, 0, false)
  631. serviceLabels := map[string]string{"foo": "bar"}
  632. endpoints.serviceStore.Add(&v1.Service{
  633. ObjectMeta: metav1.ObjectMeta{
  634. Name: "foo",
  635. Namespace: ns,
  636. Labels: serviceLabels,
  637. },
  638. Spec: v1.ServiceSpec{
  639. Selector: map[string]string{"foo": "bar"},
  640. Ports: []v1.ServicePort{
  641. {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
  642. {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
  643. },
  644. },
  645. })
  646. endpoints.syncService(ns + "/foo")
  647. expectedSubsets := []v1.EndpointSubset{{
  648. Addresses: []v1.EndpointAddress{
  649. {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
  650. {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
  651. {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
  652. },
  653. Ports: []v1.EndpointPort{
  654. {Name: "port0", Port: 8080, Protocol: "TCP"},
  655. {Name: "port1", Port: 8088, Protocol: "TCP"},
  656. },
  657. }}
  658. serviceLabels[v1.IsHeadlessService] = ""
  659. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  660. ObjectMeta: metav1.ObjectMeta{
  661. ResourceVersion: "",
  662. Name: "foo",
  663. Labels: serviceLabels,
  664. },
  665. Subsets: endptspkg.SortSubsets(expectedSubsets),
  666. })
  667. endpointsHandler.ValidateRequestCount(t, 1)
  668. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
  669. }
  670. func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
  671. ns := "bar"
  672. testServer, endpointsHandler := makeTestServer(t, ns)
  673. defer testServer.Close()
  674. endpoints := newController(testServer.URL, 0*time.Second)
  675. endpoints.endpointsStore.Add(&v1.Endpoints{
  676. ObjectMeta: metav1.ObjectMeta{
  677. Name: "foo",
  678. Namespace: ns,
  679. ResourceVersion: "1",
  680. Labels: map[string]string{
  681. "foo": "bar",
  682. },
  683. },
  684. Subsets: []v1.EndpointSubset{{
  685. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  686. Ports: []v1.EndpointPort{{Port: 1000}},
  687. }},
  688. })
  689. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  690. serviceLabels := map[string]string{"baz": "blah"}
  691. endpoints.serviceStore.Add(&v1.Service{
  692. ObjectMeta: metav1.ObjectMeta{
  693. Name: "foo",
  694. Namespace: ns,
  695. Labels: serviceLabels,
  696. },
  697. Spec: v1.ServiceSpec{
  698. Selector: map[string]string{"foo": "bar"},
  699. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  700. },
  701. })
  702. endpoints.syncService(ns + "/foo")
  703. serviceLabels[v1.IsHeadlessService] = ""
  704. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  705. ObjectMeta: metav1.ObjectMeta{
  706. Name: "foo",
  707. Namespace: ns,
  708. ResourceVersion: "1",
  709. Labels: serviceLabels,
  710. },
  711. Subsets: []v1.EndpointSubset{{
  712. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  713. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  714. }},
  715. })
  716. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  717. }
  718. func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
  719. var tests = []struct {
  720. podsSynced func() bool
  721. servicesSynced func() bool
  722. endpointsSynced func() bool
  723. shouldUpdateEndpoints bool
  724. }{
  725. {neverReady, alwaysReady, alwaysReady, false},
  726. {alwaysReady, neverReady, alwaysReady, false},
  727. {alwaysReady, alwaysReady, neverReady, false},
  728. {alwaysReady, alwaysReady, alwaysReady, true},
  729. }
  730. for _, test := range tests {
  731. func() {
  732. ns := "other"
  733. testServer, endpointsHandler := makeTestServer(t, ns)
  734. defer testServer.Close()
  735. endpoints := newController(testServer.URL, 0*time.Second)
  736. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  737. service := &v1.Service{
  738. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  739. Spec: v1.ServiceSpec{
  740. Selector: map[string]string{},
  741. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  742. },
  743. }
  744. endpoints.serviceStore.Add(service)
  745. endpoints.onServiceUpdate(service)
  746. endpoints.podsSynced = test.podsSynced
  747. endpoints.servicesSynced = test.servicesSynced
  748. endpoints.endpointsSynced = test.endpointsSynced
  749. endpoints.workerLoopPeriod = 10 * time.Millisecond
  750. stopCh := make(chan struct{})
  751. defer close(stopCh)
  752. go endpoints.Run(1, stopCh)
  753. // cache.WaitForNamedCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
  754. // To ensure we get all updates, including unexpected ones, we need to wait at least as long as
  755. // a single cache sync period and worker period, with some fudge room.
  756. time.Sleep(150 * time.Millisecond)
  757. if test.shouldUpdateEndpoints {
  758. // Ensure the work queue has been processed by looping for up to a second to prevent flakes.
  759. wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
  760. return endpoints.queue.Len() == 0, nil
  761. })
  762. endpointsHandler.ValidateRequestCount(t, 1)
  763. } else {
  764. endpointsHandler.ValidateRequestCount(t, 0)
  765. }
  766. }()
  767. }
  768. }
  769. func TestSyncEndpointsHeadlessService(t *testing.T) {
  770. ns := "headless"
  771. testServer, endpointsHandler := makeTestServer(t, ns)
  772. defer testServer.Close()
  773. endpoints := newController(testServer.URL, 0*time.Second)
  774. endpoints.endpointsStore.Add(&v1.Endpoints{
  775. ObjectMeta: metav1.ObjectMeta{
  776. Name: "foo",
  777. Namespace: ns,
  778. ResourceVersion: "1",
  779. },
  780. Subsets: []v1.EndpointSubset{{
  781. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  782. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  783. }},
  784. })
  785. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  786. service := &v1.Service{
  787. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}},
  788. Spec: v1.ServiceSpec{
  789. Selector: map[string]string{},
  790. ClusterIP: api.ClusterIPNone,
  791. Ports: []v1.ServicePort{},
  792. },
  793. }
  794. originalService := service.DeepCopy()
  795. endpoints.serviceStore.Add(service)
  796. endpoints.syncService(ns + "/foo")
  797. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  798. ObjectMeta: metav1.ObjectMeta{
  799. Name: "foo",
  800. Namespace: ns,
  801. ResourceVersion: "1",
  802. Labels: map[string]string{
  803. "a": "b",
  804. v1.IsHeadlessService: "",
  805. },
  806. },
  807. Subsets: []v1.EndpointSubset{{
  808. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  809. Ports: []v1.EndpointPort{},
  810. }},
  811. })
  812. if !reflect.DeepEqual(originalService, service) {
  813. t.Fatalf("syncing endpoints changed service: %s", diff.ObjectReflectDiff(service, originalService))
  814. }
  815. endpointsHandler.ValidateRequestCount(t, 1)
  816. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  817. }
  818. func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
  819. ns := "other"
  820. testServer, endpointsHandler := makeTestServer(t, ns)
  821. defer testServer.Close()
  822. endpoints := newController(testServer.URL, 0*time.Second)
  823. endpoints.endpointsStore.Add(&v1.Endpoints{
  824. ObjectMeta: metav1.ObjectMeta{
  825. Name: "foo",
  826. Namespace: ns,
  827. ResourceVersion: "1",
  828. Labels: map[string]string{
  829. "foo": "bar",
  830. },
  831. },
  832. Subsets: []v1.EndpointSubset{},
  833. })
  834. addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
  835. endpoints.serviceStore.Add(&v1.Service{
  836. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  837. Spec: v1.ServiceSpec{
  838. Selector: map[string]string{"foo": "bar"},
  839. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  840. },
  841. })
  842. endpoints.syncService(ns + "/foo")
  843. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  844. ObjectMeta: metav1.ObjectMeta{
  845. Name: "foo",
  846. Namespace: ns,
  847. ResourceVersion: "1",
  848. Labels: map[string]string{
  849. v1.IsHeadlessService: "",
  850. },
  851. },
  852. Subsets: []v1.EndpointSubset{},
  853. })
  854. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  855. }
  856. func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
  857. ns := "other"
  858. testServer, endpointsHandler := makeTestServer(t, ns)
  859. defer testServer.Close()
  860. endpoints := newController(testServer.URL, 0*time.Second)
  861. endpoints.endpointsStore.Add(&v1.Endpoints{
  862. ObjectMeta: metav1.ObjectMeta{
  863. Name: "foo",
  864. Namespace: ns,
  865. ResourceVersion: "1",
  866. Labels: map[string]string{
  867. "foo": "bar",
  868. },
  869. },
  870. Subsets: []v1.EndpointSubset{},
  871. })
  872. addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
  873. endpoints.serviceStore.Add(&v1.Service{
  874. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  875. Spec: v1.ServiceSpec{
  876. Selector: map[string]string{"foo": "bar"},
  877. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  878. },
  879. })
  880. endpoints.syncService(ns + "/foo")
  881. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  882. ObjectMeta: metav1.ObjectMeta{
  883. Name: "foo",
  884. Namespace: ns,
  885. ResourceVersion: "1",
  886. Labels: map[string]string{
  887. v1.IsHeadlessService: "",
  888. },
  889. },
  890. Subsets: []v1.EndpointSubset{},
  891. })
  892. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  893. }
  894. func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
  895. ns := "other"
  896. testServer, endpointsHandler := makeTestServer(t, ns)
  897. defer testServer.Close()
  898. endpoints := newController(testServer.URL, 0*time.Second)
  899. endpoints.endpointsStore.Add(&v1.Endpoints{
  900. ObjectMeta: metav1.ObjectMeta{
  901. Name: "foo",
  902. Namespace: ns,
  903. ResourceVersion: "1",
  904. Labels: map[string]string{
  905. "foo": "bar",
  906. },
  907. },
  908. Subsets: []v1.EndpointSubset{},
  909. })
  910. addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
  911. endpoints.serviceStore.Add(&v1.Service{
  912. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  913. Spec: v1.ServiceSpec{
  914. Selector: map[string]string{"foo": "bar"},
  915. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  916. },
  917. })
  918. endpoints.syncService(ns + "/foo")
  919. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  920. ObjectMeta: metav1.ObjectMeta{
  921. Name: "foo",
  922. Namespace: ns,
  923. ResourceVersion: "1",
  924. Labels: map[string]string{
  925. v1.IsHeadlessService: "",
  926. },
  927. },
  928. Subsets: []v1.EndpointSubset{},
  929. })
  930. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  931. }
  932. func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
  933. ns := metav1.NamespaceDefault
  934. testServer, endpointsHandler := makeTestServer(t, ns)
  935. defer testServer.Close()
  936. endpoints := newController(testServer.URL, 0*time.Second)
  937. endpoints.serviceStore.Add(&v1.Service{
  938. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  939. Spec: v1.ServiceSpec{
  940. Selector: map[string]string{"foo": "bar"},
  941. ClusterIP: "None",
  942. Ports: nil,
  943. },
  944. })
  945. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  946. endpoints.syncService(ns + "/foo")
  947. endpointsHandler.ValidateRequestCount(t, 1)
  948. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  949. ObjectMeta: metav1.ObjectMeta{
  950. Name: "foo",
  951. Labels: map[string]string{
  952. v1.IsHeadlessService: "",
  953. },
  954. },
  955. Subsets: []v1.EndpointSubset{{
  956. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  957. Ports: nil,
  958. }},
  959. })
  960. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
  961. }
  962. // There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here.
  963. // Just list all of the 3 false cases and 3 of the 12 true cases.
  964. func TestShouldPodBeInEndpoints(t *testing.T) {
  965. testCases := []struct {
  966. name string
  967. pod *v1.Pod
  968. expected bool
  969. }{
  970. // Pod should not be in endpoints cases:
  971. {
  972. name: "Failed pod with Never RestartPolicy",
  973. pod: &v1.Pod{
  974. Spec: v1.PodSpec{
  975. RestartPolicy: v1.RestartPolicyNever,
  976. },
  977. Status: v1.PodStatus{
  978. Phase: v1.PodFailed,
  979. },
  980. },
  981. expected: false,
  982. },
  983. {
  984. name: "Succeeded pod with Never RestartPolicy",
  985. pod: &v1.Pod{
  986. Spec: v1.PodSpec{
  987. RestartPolicy: v1.RestartPolicyNever,
  988. },
  989. Status: v1.PodStatus{
  990. Phase: v1.PodSucceeded,
  991. },
  992. },
  993. expected: false,
  994. },
  995. {
  996. name: "Succeeded pod with OnFailure RestartPolicy",
  997. pod: &v1.Pod{
  998. Spec: v1.PodSpec{
  999. RestartPolicy: v1.RestartPolicyOnFailure,
  1000. },
  1001. Status: v1.PodStatus{
  1002. Phase: v1.PodSucceeded,
  1003. },
  1004. },
  1005. expected: false,
  1006. },
  1007. // Pod should be in endpoints cases:
  1008. {
  1009. name: "Failed pod with Always RestartPolicy",
  1010. pod: &v1.Pod{
  1011. Spec: v1.PodSpec{
  1012. RestartPolicy: v1.RestartPolicyAlways,
  1013. },
  1014. Status: v1.PodStatus{
  1015. Phase: v1.PodFailed,
  1016. },
  1017. },
  1018. expected: true,
  1019. },
  1020. {
  1021. name: "Pending pod with Never RestartPolicy",
  1022. pod: &v1.Pod{
  1023. Spec: v1.PodSpec{
  1024. RestartPolicy: v1.RestartPolicyNever,
  1025. },
  1026. Status: v1.PodStatus{
  1027. Phase: v1.PodPending,
  1028. },
  1029. },
  1030. expected: true,
  1031. },
  1032. {
  1033. name: "Unknown pod with OnFailure RestartPolicy",
  1034. pod: &v1.Pod{
  1035. Spec: v1.PodSpec{
  1036. RestartPolicy: v1.RestartPolicyOnFailure,
  1037. },
  1038. Status: v1.PodStatus{
  1039. Phase: v1.PodUnknown,
  1040. },
  1041. },
  1042. expected: true,
  1043. },
  1044. }
  1045. for _, test := range testCases {
  1046. result := shouldPodBeInEndpoints(test.pod)
  1047. if result != test.expected {
  1048. t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result)
  1049. }
  1050. }
  1051. }
  1052. func TestPodToEndpointAddressForService(t *testing.T) {
  1053. testCases := []struct {
  1054. name string
  1055. expectedEndPointIP string
  1056. enableDualStack bool
  1057. expectError bool
  1058. enableDualStackPod bool
  1059. service v1.Service
  1060. }{
  1061. {
  1062. name: "v4 service, in a single stack cluster",
  1063. expectedEndPointIP: "1.2.3.4",
  1064. enableDualStack: false,
  1065. expectError: false,
  1066. enableDualStackPod: false,
  1067. service: v1.Service{
  1068. Spec: v1.ServiceSpec{
  1069. ClusterIP: "10.0.0.1",
  1070. },
  1071. },
  1072. },
  1073. {
  1074. name: "v4 service, in a dual stack cluster",
  1075. expectedEndPointIP: "1.2.3.4",
  1076. enableDualStack: true,
  1077. expectError: false,
  1078. enableDualStackPod: true,
  1079. service: v1.Service{
  1080. Spec: v1.ServiceSpec{
  1081. ClusterIP: "10.0.0.1",
  1082. },
  1083. },
  1084. },
  1085. {
  1086. name: "v6 service, in a dual stack cluster. dual stack enabled",
  1087. expectedEndPointIP: "2000::0",
  1088. enableDualStack: true,
  1089. expectError: false,
  1090. enableDualStackPod: true,
  1091. service: v1.Service{
  1092. Spec: v1.ServiceSpec{
  1093. ClusterIP: "3000::1",
  1094. },
  1095. },
  1096. },
  1097. // in reality this is a misconfigured cluster
  1098. // i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6
  1099. // we are testing that we will keep producing the expected behavior
  1100. {
  1101. name: "v6 service, in a v4 only cluster. dual stack disabled",
  1102. expectedEndPointIP: "1.2.3.4",
  1103. enableDualStack: false,
  1104. expectError: false,
  1105. enableDualStackPod: false,
  1106. service: v1.Service{
  1107. Spec: v1.ServiceSpec{
  1108. ClusterIP: "3000::1",
  1109. },
  1110. },
  1111. },
  1112. {
  1113. name: "v6 service, in a v4 only cluster - dual stack enabled",
  1114. expectedEndPointIP: "1.2.3.4",
  1115. enableDualStack: true,
  1116. expectError: true,
  1117. enableDualStackPod: false,
  1118. service: v1.Service{
  1119. Spec: v1.ServiceSpec{
  1120. ClusterIP: "3000::1",
  1121. },
  1122. },
  1123. },
  1124. }
  1125. for _, tc := range testCases {
  1126. t.Run(tc.name, func(t *testing.T) {
  1127. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
  1128. podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
  1129. ns := "test"
  1130. addPods(podStore, ns, 1, 1, 0, tc.enableDualStackPod)
  1131. pods := podStore.List()
  1132. if len(pods) != 1 {
  1133. t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
  1134. }
  1135. pod := pods[0].(*v1.Pod)
  1136. epa, err := podToEndpointAddressForService(&tc.service, pod)
  1137. if err != nil && !tc.expectError {
  1138. t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
  1139. }
  1140. if err == nil && tc.expectError {
  1141. t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
  1142. }
  1143. if err != nil && tc.expectError {
  1144. return
  1145. }
  1146. if epa.IP != tc.expectedEndPointIP {
  1147. t.Fatalf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
  1148. }
  1149. if *(epa.NodeName) != pod.Spec.NodeName {
  1150. t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
  1151. }
  1152. if epa.TargetRef.Kind != "Pod" {
  1153. t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
  1154. }
  1155. if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
  1156. t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
  1157. }
  1158. if epa.TargetRef.Name != pod.ObjectMeta.Name {
  1159. t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
  1160. }
  1161. if epa.TargetRef.UID != pod.ObjectMeta.UID {
  1162. t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
  1163. }
  1164. if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
  1165. t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
  1166. }
  1167. })
  1168. }
  1169. }
  1170. func TestPodToEndpointAddress(t *testing.T) {
  1171. podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
  1172. ns := "test"
  1173. addPods(podStore, ns, 1, 1, 0, false)
  1174. pods := podStore.List()
  1175. if len(pods) != 1 {
  1176. t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
  1177. return
  1178. }
  1179. pod := pods[0].(*v1.Pod)
  1180. epa := podToEndpointAddress(pod)
  1181. if epa.IP != pod.Status.PodIP {
  1182. t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
  1183. }
  1184. if *(epa.NodeName) != pod.Spec.NodeName {
  1185. t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
  1186. }
  1187. if epa.TargetRef.Kind != "Pod" {
  1188. t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
  1189. }
  1190. if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
  1191. t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
  1192. }
  1193. if epa.TargetRef.Name != pod.ObjectMeta.Name {
  1194. t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
  1195. }
  1196. if epa.TargetRef.UID != pod.ObjectMeta.UID {
  1197. t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
  1198. }
  1199. if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
  1200. t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
  1201. }
  1202. }
  1203. func TestPodChanged(t *testing.T) {
  1204. podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
  1205. ns := "test"
  1206. addPods(podStore, ns, 1, 1, 0, false)
  1207. pods := podStore.List()
  1208. if len(pods) != 1 {
  1209. t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
  1210. return
  1211. }
  1212. oldPod := pods[0].(*v1.Pod)
  1213. newPod := oldPod.DeepCopy()
  1214. if podChangedHelper(oldPod, newPod, endpointChanged) {
  1215. t.Errorf("Expected pod to be unchanged for copied pod")
  1216. }
  1217. newPod.Spec.NodeName = "changed"
  1218. if !podChangedHelper(oldPod, newPod, endpointChanged) {
  1219. t.Errorf("Expected pod to be changed for pod with NodeName changed")
  1220. }
  1221. newPod.Spec.NodeName = oldPod.Spec.NodeName
  1222. newPod.ObjectMeta.ResourceVersion = "changed"
  1223. if podChangedHelper(oldPod, newPod, endpointChanged) {
  1224. t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
  1225. }
  1226. newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
  1227. newPod.Status.PodIP = "1.2.3.1"
  1228. if !podChangedHelper(oldPod, newPod, endpointChanged) {
  1229. t.Errorf("Expected pod to be changed with pod IP address change")
  1230. }
  1231. newPod.Status.PodIP = oldPod.Status.PodIP
  1232. /* dual stack tests */
  1233. // primary changes, because changing IPs is done by changing sandbox
  1234. // case 1: add new secondrary IP
  1235. newPod.Status.PodIP = "1.1.3.1"
  1236. newPod.Status.PodIPs = []v1.PodIP{
  1237. {
  1238. IP: "1.1.3.1",
  1239. },
  1240. {
  1241. IP: "2000::1",
  1242. },
  1243. }
  1244. if !podChangedHelper(oldPod, newPod, endpointChanged) {
  1245. t.Errorf("Expected pod to be changed with adding secondary IP")
  1246. }
  1247. // reset
  1248. newPod.Status.PodIPs = nil
  1249. newPod.Status.PodIP = oldPod.Status.PodIP
  1250. // case 2: removing a secondary IP
  1251. saved := oldPod.Status.PodIP
  1252. oldPod.Status.PodIP = "1.1.3.1"
  1253. oldPod.Status.PodIPs = []v1.PodIP{
  1254. {
  1255. IP: "1.1.3.1",
  1256. },
  1257. {
  1258. IP: "2000::1",
  1259. },
  1260. }
  1261. newPod.Status.PodIP = "1.2.3.4"
  1262. newPod.Status.PodIPs = []v1.PodIP{
  1263. {
  1264. IP: "1.2.3.4",
  1265. },
  1266. }
  1267. // reset
  1268. oldPod.Status.PodIPs = nil
  1269. newPod.Status.PodIPs = nil
  1270. oldPod.Status.PodIP = saved
  1271. newPod.Status.PodIP = saved
  1272. // case 3: change secondary
  1273. // case 2: removing a secondary IP
  1274. saved = oldPod.Status.PodIP
  1275. oldPod.Status.PodIP = "1.1.3.1"
  1276. oldPod.Status.PodIPs = []v1.PodIP{
  1277. {
  1278. IP: "1.1.3.1",
  1279. },
  1280. {
  1281. IP: "2000::1",
  1282. },
  1283. }
  1284. newPod.Status.PodIP = "1.2.3.4"
  1285. newPod.Status.PodIPs = []v1.PodIP{
  1286. {
  1287. IP: "1.2.3.4",
  1288. },
  1289. {
  1290. IP: "2000::2",
  1291. },
  1292. }
  1293. // reset
  1294. oldPod.Status.PodIPs = nil
  1295. newPod.Status.PodIPs = nil
  1296. oldPod.Status.PodIP = saved
  1297. newPod.Status.PodIP = saved
  1298. /* end dual stack testing */
  1299. newPod.ObjectMeta.Name = "wrong-name"
  1300. if !podChangedHelper(oldPod, newPod, endpointChanged) {
  1301. t.Errorf("Expected pod to be changed with pod name change")
  1302. }
  1303. newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
  1304. saveConditions := oldPod.Status.Conditions
  1305. oldPod.Status.Conditions = nil
  1306. if !podChangedHelper(oldPod, newPod, endpointChanged) {
  1307. t.Errorf("Expected pod to be changed with pod readiness change")
  1308. }
  1309. oldPod.Status.Conditions = saveConditions
  1310. now := metav1.NewTime(time.Now().UTC())
  1311. newPod.ObjectMeta.DeletionTimestamp = &now
  1312. if !podChangedHelper(oldPod, newPod, endpointChanged) {
  1313. t.Errorf("Expected pod to be changed with DeletionTimestamp change")
  1314. }
  1315. newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
  1316. }
  1317. func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
  1318. ns := "other"
  1319. testServer, endpointsHandler := makeTestServer(t, ns)
  1320. defer testServer.Close()
  1321. endpoints := newController(testServer.URL, 0*time.Second)
  1322. endpoints.endpointsStore.Add(&v1.Endpoints{
  1323. ObjectMeta: metav1.ObjectMeta{
  1324. Name: "foo",
  1325. Namespace: ns,
  1326. ResourceVersion: "1",
  1327. },
  1328. Subsets: []v1.EndpointSubset{{
  1329. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1330. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1331. }},
  1332. })
  1333. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  1334. endpoints.serviceStore.Add(&v1.Service{
  1335. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
  1336. Spec: v1.ServiceSpec{
  1337. Selector: map[string]string{},
  1338. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  1339. },
  1340. })
  1341. endpoints.syncService(ns + "/foo")
  1342. endpointsHandler.ValidateRequestCount(t, 1)
  1343. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1344. ObjectMeta: metav1.ObjectMeta{
  1345. Name: "foo",
  1346. Namespace: ns,
  1347. ResourceVersion: "1",
  1348. Annotations: map[string]string{
  1349. v1.EndpointsLastChangeTriggerTime: triggerTimeString,
  1350. },
  1351. Labels: map[string]string{
  1352. v1.IsHeadlessService: "",
  1353. },
  1354. },
  1355. Subsets: []v1.EndpointSubset{{
  1356. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1357. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1358. }},
  1359. })
  1360. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1361. }
  1362. func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
  1363. ns := "other"
  1364. testServer, endpointsHandler := makeTestServer(t, ns)
  1365. defer testServer.Close()
  1366. endpoints := newController(testServer.URL, 0*time.Second)
  1367. endpoints.endpointsStore.Add(&v1.Endpoints{
  1368. ObjectMeta: metav1.ObjectMeta{
  1369. Name: "foo",
  1370. Namespace: ns,
  1371. ResourceVersion: "1",
  1372. Annotations: map[string]string{
  1373. v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
  1374. },
  1375. },
  1376. Subsets: []v1.EndpointSubset{{
  1377. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1378. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1379. }},
  1380. })
  1381. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  1382. endpoints.serviceStore.Add(&v1.Service{
  1383. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
  1384. Spec: v1.ServiceSpec{
  1385. Selector: map[string]string{},
  1386. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  1387. },
  1388. })
  1389. endpoints.syncService(ns + "/foo")
  1390. endpointsHandler.ValidateRequestCount(t, 1)
  1391. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1392. ObjectMeta: metav1.ObjectMeta{
  1393. Name: "foo",
  1394. Namespace: ns,
  1395. ResourceVersion: "1",
  1396. Annotations: map[string]string{
  1397. v1.EndpointsLastChangeTriggerTime: triggerTimeString,
  1398. },
  1399. Labels: map[string]string{
  1400. v1.IsHeadlessService: "",
  1401. },
  1402. },
  1403. Subsets: []v1.EndpointSubset{{
  1404. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1405. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1406. }},
  1407. })
  1408. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1409. }
  1410. func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
  1411. ns := "other"
  1412. testServer, endpointsHandler := makeTestServer(t, ns)
  1413. defer testServer.Close()
  1414. endpoints := newController(testServer.URL, 0*time.Second)
  1415. endpoints.endpointsStore.Add(&v1.Endpoints{
  1416. ObjectMeta: metav1.ObjectMeta{
  1417. Name: "foo",
  1418. Namespace: ns,
  1419. ResourceVersion: "1",
  1420. Annotations: map[string]string{
  1421. v1.EndpointsLastChangeTriggerTime: triggerTimeString,
  1422. },
  1423. },
  1424. Subsets: []v1.EndpointSubset{{
  1425. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1426. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1427. }},
  1428. })
  1429. // Neither pod nor service has trigger time, this should cause annotation to be cleared.
  1430. addPods(endpoints.podStore, ns, 1, 1, 0, false)
  1431. endpoints.serviceStore.Add(&v1.Service{
  1432. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1433. Spec: v1.ServiceSpec{
  1434. Selector: map[string]string{},
  1435. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  1436. },
  1437. })
  1438. endpoints.syncService(ns + "/foo")
  1439. endpointsHandler.ValidateRequestCount(t, 1)
  1440. data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
  1441. ObjectMeta: metav1.ObjectMeta{
  1442. Name: "foo",
  1443. Namespace: ns,
  1444. ResourceVersion: "1",
  1445. Labels: map[string]string{
  1446. v1.IsHeadlessService: "",
  1447. }, // Annotation not set anymore.
  1448. },
  1449. Subsets: []v1.EndpointSubset{{
  1450. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1451. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1452. }},
  1453. })
  1454. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
  1455. }
  1456. // TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
  1457. // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
  1458. // TODO(mborsz): Migrate this test to mock clock when possible.
  1459. func TestPodUpdatesBatching(t *testing.T) {
  1460. type podUpdate struct {
  1461. delay time.Duration
  1462. podName string
  1463. podIP string
  1464. }
  1465. tests := []struct {
  1466. name string
  1467. batchPeriod time.Duration
  1468. podsCount int
  1469. updates []podUpdate
  1470. finalDelay time.Duration
  1471. wantRequestCount int
  1472. }{
  1473. {
  1474. name: "three updates with no batching",
  1475. batchPeriod: 0 * time.Second,
  1476. podsCount: 10,
  1477. updates: []podUpdate{
  1478. {
  1479. // endpoints.Run needs ~100 ms to start processing updates.
  1480. delay: 200 * time.Millisecond,
  1481. podName: "pod0",
  1482. podIP: "10.0.0.0",
  1483. },
  1484. {
  1485. delay: 100 * time.Millisecond,
  1486. podName: "pod1",
  1487. podIP: "10.0.0.1",
  1488. },
  1489. {
  1490. delay: 100 * time.Millisecond,
  1491. podName: "pod2",
  1492. podIP: "10.0.0.2",
  1493. },
  1494. },
  1495. finalDelay: 3 * time.Second,
  1496. wantRequestCount: 3,
  1497. },
  1498. {
  1499. name: "three updates in one batch",
  1500. batchPeriod: 1 * time.Second,
  1501. podsCount: 10,
  1502. updates: []podUpdate{
  1503. {
  1504. // endpoints.Run needs ~100 ms to start processing updates.
  1505. delay: 200 * time.Millisecond,
  1506. podName: "pod0",
  1507. podIP: "10.0.0.0",
  1508. },
  1509. {
  1510. delay: 100 * time.Millisecond,
  1511. podName: "pod1",
  1512. podIP: "10.0.0.1",
  1513. },
  1514. {
  1515. delay: 100 * time.Millisecond,
  1516. podName: "pod2",
  1517. podIP: "10.0.0.2",
  1518. },
  1519. },
  1520. finalDelay: 3 * time.Second,
  1521. wantRequestCount: 1,
  1522. },
  1523. {
  1524. name: "three updates in two batches",
  1525. batchPeriod: 1 * time.Second,
  1526. podsCount: 10,
  1527. updates: []podUpdate{
  1528. {
  1529. // endpoints.Run needs ~100 ms to start processing updates.
  1530. delay: 200 * time.Millisecond,
  1531. podName: "pod0",
  1532. podIP: "10.0.0.0",
  1533. },
  1534. {
  1535. delay: 100 * time.Millisecond,
  1536. podName: "pod1",
  1537. podIP: "10.0.0.1",
  1538. },
  1539. {
  1540. delay: 1 * time.Second,
  1541. podName: "pod2",
  1542. podIP: "10.0.0.2",
  1543. },
  1544. },
  1545. finalDelay: 3 * time.Second,
  1546. wantRequestCount: 2,
  1547. },
  1548. }
  1549. for _, tc := range tests {
  1550. t.Run(tc.name, func(t *testing.T) {
  1551. ns := "other"
  1552. resourceVersion := 1
  1553. testServer, endpointsHandler := makeTestServer(t, ns)
  1554. defer testServer.Close()
  1555. endpoints := newController(testServer.URL, tc.batchPeriod)
  1556. stopCh := make(chan struct{})
  1557. defer close(stopCh)
  1558. endpoints.podsSynced = alwaysReady
  1559. endpoints.servicesSynced = alwaysReady
  1560. endpoints.endpointsSynced = alwaysReady
  1561. endpoints.workerLoopPeriod = 10 * time.Millisecond
  1562. go endpoints.Run(1, stopCh)
  1563. addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
  1564. endpoints.serviceStore.Add(&v1.Service{
  1565. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1566. Spec: v1.ServiceSpec{
  1567. Selector: map[string]string{"foo": "bar"},
  1568. Ports: []v1.ServicePort{{Port: 80}},
  1569. },
  1570. })
  1571. for _, update := range tc.updates {
  1572. time.Sleep(update.delay)
  1573. old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
  1574. if err != nil {
  1575. t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
  1576. }
  1577. if !exists {
  1578. t.Fatalf("Pod %q doesn't exist", update.podName)
  1579. }
  1580. oldPod := old.(*v1.Pod)
  1581. newPod := oldPod.DeepCopy()
  1582. newPod.Status.PodIP = update.podIP
  1583. newPod.ResourceVersion = strconv.Itoa(resourceVersion)
  1584. resourceVersion++
  1585. endpoints.podStore.Update(newPod)
  1586. endpoints.updatePod(oldPod, newPod)
  1587. }
  1588. time.Sleep(tc.finalDelay)
  1589. endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
  1590. })
  1591. }
  1592. }
  1593. // TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
  1594. // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
  1595. // TODO(mborsz): Migrate this test to mock clock when possible.
  1596. func TestPodAddsBatching(t *testing.T) {
  1597. type podAdd struct {
  1598. delay time.Duration
  1599. }
  1600. tests := []struct {
  1601. name string
  1602. batchPeriod time.Duration
  1603. adds []podAdd
  1604. finalDelay time.Duration
  1605. wantRequestCount int
  1606. }{
  1607. {
  1608. name: "three adds with no batching",
  1609. batchPeriod: 0 * time.Second,
  1610. adds: []podAdd{
  1611. {
  1612. // endpoints.Run needs ~100 ms to start processing updates.
  1613. delay: 200 * time.Millisecond,
  1614. },
  1615. {
  1616. delay: 100 * time.Millisecond,
  1617. },
  1618. {
  1619. delay: 100 * time.Millisecond,
  1620. },
  1621. },
  1622. finalDelay: 3 * time.Second,
  1623. wantRequestCount: 3,
  1624. },
  1625. {
  1626. name: "three adds in one batch",
  1627. batchPeriod: 1 * time.Second,
  1628. adds: []podAdd{
  1629. {
  1630. // endpoints.Run needs ~100 ms to start processing updates.
  1631. delay: 200 * time.Millisecond,
  1632. },
  1633. {
  1634. delay: 100 * time.Millisecond,
  1635. },
  1636. {
  1637. delay: 100 * time.Millisecond,
  1638. },
  1639. },
  1640. finalDelay: 3 * time.Second,
  1641. wantRequestCount: 1,
  1642. },
  1643. {
  1644. name: "three adds in two batches",
  1645. batchPeriod: 1 * time.Second,
  1646. adds: []podAdd{
  1647. {
  1648. // endpoints.Run needs ~100 ms to start processing updates.
  1649. delay: 200 * time.Millisecond,
  1650. },
  1651. {
  1652. delay: 100 * time.Millisecond,
  1653. },
  1654. {
  1655. delay: 1 * time.Second,
  1656. },
  1657. },
  1658. finalDelay: 3 * time.Second,
  1659. wantRequestCount: 2,
  1660. },
  1661. }
  1662. for _, tc := range tests {
  1663. t.Run(tc.name, func(t *testing.T) {
  1664. ns := "other"
  1665. testServer, endpointsHandler := makeTestServer(t, ns)
  1666. defer testServer.Close()
  1667. endpoints := newController(testServer.URL, tc.batchPeriod)
  1668. stopCh := make(chan struct{})
  1669. defer close(stopCh)
  1670. endpoints.podsSynced = alwaysReady
  1671. endpoints.servicesSynced = alwaysReady
  1672. endpoints.endpointsSynced = alwaysReady
  1673. endpoints.workerLoopPeriod = 10 * time.Millisecond
  1674. go endpoints.Run(1, stopCh)
  1675. endpoints.serviceStore.Add(&v1.Service{
  1676. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1677. Spec: v1.ServiceSpec{
  1678. Selector: map[string]string{"foo": "bar"},
  1679. Ports: []v1.ServicePort{{Port: 80}},
  1680. },
  1681. })
  1682. for i, add := range tc.adds {
  1683. time.Sleep(add.delay)
  1684. p := testPod(ns, i, 1, true, false)
  1685. endpoints.podStore.Add(p)
  1686. endpoints.addPod(p)
  1687. }
  1688. time.Sleep(tc.finalDelay)
  1689. endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
  1690. })
  1691. }
  1692. }
  1693. // TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
  1694. // This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
  1695. // TODO(mborsz): Migrate this test to mock clock when possible.
  1696. func TestPodDeleteBatching(t *testing.T) {
  1697. type podDelete struct {
  1698. delay time.Duration
  1699. podName string
  1700. }
  1701. tests := []struct {
  1702. name string
  1703. batchPeriod time.Duration
  1704. podsCount int
  1705. deletes []podDelete
  1706. finalDelay time.Duration
  1707. wantRequestCount int
  1708. }{
  1709. {
  1710. name: "three deletes with no batching",
  1711. batchPeriod: 0 * time.Second,
  1712. podsCount: 10,
  1713. deletes: []podDelete{
  1714. {
  1715. // endpoints.Run needs ~100 ms to start processing updates.
  1716. delay: 200 * time.Millisecond,
  1717. podName: "pod0",
  1718. },
  1719. {
  1720. delay: 100 * time.Millisecond,
  1721. podName: "pod1",
  1722. },
  1723. {
  1724. delay: 100 * time.Millisecond,
  1725. podName: "pod2",
  1726. },
  1727. },
  1728. finalDelay: 3 * time.Second,
  1729. wantRequestCount: 3,
  1730. },
  1731. {
  1732. name: "three deletes in one batch",
  1733. batchPeriod: 1 * time.Second,
  1734. podsCount: 10,
  1735. deletes: []podDelete{
  1736. {
  1737. // endpoints.Run needs ~100 ms to start processing updates.
  1738. delay: 200 * time.Millisecond,
  1739. podName: "pod0",
  1740. },
  1741. {
  1742. delay: 100 * time.Millisecond,
  1743. podName: "pod1",
  1744. },
  1745. {
  1746. delay: 100 * time.Millisecond,
  1747. podName: "pod2",
  1748. },
  1749. },
  1750. finalDelay: 3 * time.Second,
  1751. wantRequestCount: 1,
  1752. },
  1753. {
  1754. name: "three deletes in two batches",
  1755. batchPeriod: 1 * time.Second,
  1756. podsCount: 10,
  1757. deletes: []podDelete{
  1758. {
  1759. // endpoints.Run needs ~100 ms to start processing updates.
  1760. delay: 200 * time.Millisecond,
  1761. podName: "pod0",
  1762. },
  1763. {
  1764. delay: 100 * time.Millisecond,
  1765. podName: "pod1",
  1766. },
  1767. {
  1768. delay: 1 * time.Second,
  1769. podName: "pod2",
  1770. },
  1771. },
  1772. finalDelay: 3 * time.Second,
  1773. wantRequestCount: 2,
  1774. },
  1775. }
  1776. for _, tc := range tests {
  1777. t.Run(tc.name, func(t *testing.T) {
  1778. ns := "other"
  1779. testServer, endpointsHandler := makeTestServer(t, ns)
  1780. defer testServer.Close()
  1781. endpoints := newController(testServer.URL, tc.batchPeriod)
  1782. stopCh := make(chan struct{})
  1783. defer close(stopCh)
  1784. endpoints.podsSynced = alwaysReady
  1785. endpoints.servicesSynced = alwaysReady
  1786. endpoints.endpointsSynced = alwaysReady
  1787. endpoints.workerLoopPeriod = 10 * time.Millisecond
  1788. go endpoints.Run(1, stopCh)
  1789. addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
  1790. endpoints.serviceStore.Add(&v1.Service{
  1791. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1792. Spec: v1.ServiceSpec{
  1793. Selector: map[string]string{"foo": "bar"},
  1794. Ports: []v1.ServicePort{{Port: 80}},
  1795. },
  1796. })
  1797. for _, update := range tc.deletes {
  1798. time.Sleep(update.delay)
  1799. old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
  1800. if err != nil {
  1801. t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
  1802. }
  1803. if !exists {
  1804. t.Fatalf("Pod %q doesn't exist", update.podName)
  1805. }
  1806. endpoints.podStore.Delete(old)
  1807. endpoints.deletePod(old)
  1808. }
  1809. time.Sleep(tc.finalDelay)
  1810. endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
  1811. })
  1812. }
  1813. }
  1814. func TestSyncEndpointsServiceNotFound(t *testing.T) {
  1815. ns := metav1.NamespaceDefault
  1816. testServer, endpointsHandler := makeTestServer(t, ns)
  1817. defer testServer.Close()
  1818. endpoints := newController(testServer.URL, 0)
  1819. endpoints.endpointsStore.Add(&v1.Endpoints{
  1820. ObjectMeta: metav1.ObjectMeta{
  1821. Name: "foo",
  1822. Namespace: ns,
  1823. ResourceVersion: "1",
  1824. },
  1825. })
  1826. endpoints.syncService(ns + "/foo")
  1827. endpointsHandler.ValidateRequestCount(t, 1)
  1828. endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil)
  1829. }
  1830. func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool {
  1831. podChanged, _ := endpointutil.PodChanged(oldPod, newPod, endpointChanged)
  1832. return podChanged
  1833. }