endpoints_controller_test.go 44 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package endpoint
  14. import (
  15. "fmt"
  16. "net/http"
  17. "net/http/httptest"
  18. "testing"
  19. "time"
  20. "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/runtime"
  23. "k8s.io/apimachinery/pkg/runtime/schema"
  24. "k8s.io/apimachinery/pkg/util/intstr"
  25. "k8s.io/apimachinery/pkg/util/sets"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. "k8s.io/client-go/informers"
  28. clientset "k8s.io/client-go/kubernetes"
  29. restclient "k8s.io/client-go/rest"
  30. "k8s.io/client-go/tools/cache"
  31. utiltesting "k8s.io/client-go/util/testing"
  32. "k8s.io/kubernetes/pkg/api/testapi"
  33. endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
  34. api "k8s.io/kubernetes/pkg/apis/core"
  35. "k8s.io/kubernetes/pkg/controller"
  36. )
  37. var alwaysReady = func() bool { return true }
  38. var neverReady = func() bool { return false }
  39. var emptyNodeName string
  40. var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
  41. var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
  42. var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
  43. func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
  44. for i := 0; i < nPods+nNotReady; i++ {
  45. p := &v1.Pod{
  46. TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
  47. ObjectMeta: metav1.ObjectMeta{
  48. Namespace: namespace,
  49. Name: fmt.Sprintf("pod%d", i),
  50. Labels: map[string]string{"foo": "bar"},
  51. },
  52. Spec: v1.PodSpec{
  53. Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
  54. },
  55. Status: v1.PodStatus{
  56. PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
  57. Conditions: []v1.PodCondition{
  58. {
  59. Type: v1.PodReady,
  60. Status: v1.ConditionTrue,
  61. },
  62. },
  63. },
  64. }
  65. if i >= nPods {
  66. p.Status.Conditions[0].Status = v1.ConditionFalse
  67. }
  68. for j := 0; j < nPorts; j++ {
  69. p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
  70. v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)})
  71. }
  72. store.Add(p)
  73. }
  74. }
  75. func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
  76. for i := 0; i < nPods; i++ {
  77. p := &v1.Pod{
  78. TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
  79. ObjectMeta: metav1.ObjectMeta{
  80. Namespace: namespace,
  81. Name: fmt.Sprintf("pod%d", i),
  82. Labels: map[string]string{"foo": "bar"},
  83. },
  84. Spec: v1.PodSpec{
  85. RestartPolicy: restartPolicy,
  86. Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
  87. },
  88. Status: v1.PodStatus{
  89. PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
  90. Phase: podPhase,
  91. Conditions: []v1.PodCondition{
  92. {
  93. Type: v1.PodReady,
  94. Status: v1.ConditionFalse,
  95. },
  96. },
  97. },
  98. }
  99. for j := 0; j < nPorts; j++ {
  100. p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
  101. v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)})
  102. }
  103. store.Add(p)
  104. }
  105. }
  106. func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
  107. fakeEndpointsHandler := utiltesting.FakeHandler{
  108. StatusCode: http.StatusOK,
  109. ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{}),
  110. }
  111. mux := http.NewServeMux()
  112. mux.Handle(testapi.Default.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler)
  113. mux.Handle(testapi.Default.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler)
  114. mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
  115. t.Errorf("unexpected request: %v", req.RequestURI)
  116. http.Error(res, "", http.StatusNotFound)
  117. })
  118. return httptest.NewServer(mux), &fakeEndpointsHandler
  119. }
  120. type endpointController struct {
  121. *EndpointController
  122. podStore cache.Store
  123. serviceStore cache.Store
  124. endpointsStore cache.Store
  125. }
  126. func newController(url string) *endpointController {
  127. client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  128. informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  129. endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
  130. informerFactory.Core().V1().Endpoints(), client)
  131. endpoints.podsSynced = alwaysReady
  132. endpoints.servicesSynced = alwaysReady
  133. endpoints.endpointsSynced = alwaysReady
  134. return &endpointController{
  135. endpoints,
  136. informerFactory.Core().V1().Pods().Informer().GetStore(),
  137. informerFactory.Core().V1().Services().Informer().GetStore(),
  138. informerFactory.Core().V1().Endpoints().Informer().GetStore(),
  139. }
  140. }
  141. func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
  142. ns := metav1.NamespaceDefault
  143. testServer, endpointsHandler := makeTestServer(t, ns)
  144. defer testServer.Close()
  145. endpoints := newController(testServer.URL)
  146. endpoints.endpointsStore.Add(&v1.Endpoints{
  147. ObjectMeta: metav1.ObjectMeta{
  148. Name: "foo",
  149. Namespace: ns,
  150. ResourceVersion: "1",
  151. },
  152. Subsets: []v1.EndpointSubset{{
  153. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  154. Ports: []v1.EndpointPort{{Port: 1000}},
  155. }},
  156. })
  157. endpoints.serviceStore.Add(&v1.Service{
  158. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  159. Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
  160. })
  161. endpoints.syncService(ns + "/foo")
  162. endpointsHandler.ValidateRequestCount(t, 0)
  163. }
  164. func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
  165. ns := metav1.NamespaceDefault
  166. testServer, endpointsHandler := makeTestServer(t, ns)
  167. defer testServer.Close()
  168. endpoints := newController(testServer.URL)
  169. endpoints.endpointsStore.Add(&v1.Endpoints{
  170. ObjectMeta: metav1.ObjectMeta{
  171. Name: "foo",
  172. Namespace: ns,
  173. ResourceVersion: "1",
  174. },
  175. Subsets: nil,
  176. })
  177. endpoints.serviceStore.Add(&v1.Service{
  178. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  179. Spec: v1.ServiceSpec{
  180. Selector: map[string]string{"foo": "bar"},
  181. Ports: []v1.ServicePort{{Port: 80}},
  182. },
  183. })
  184. endpoints.syncService(ns + "/foo")
  185. endpointsHandler.ValidateRequestCount(t, 0)
  186. }
  187. func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
  188. ns := metav1.NamespaceDefault
  189. testServer, endpointsHandler := makeTestServer(t, ns)
  190. defer testServer.Close()
  191. endpoints := newController(testServer.URL)
  192. endpoints.endpointsStore.Add(&v1.Endpoints{
  193. ObjectMeta: metav1.ObjectMeta{
  194. Name: "foo",
  195. Namespace: ns,
  196. ResourceVersion: "1",
  197. },
  198. Subsets: []v1.EndpointSubset{},
  199. })
  200. endpoints.serviceStore.Add(&v1.Service{
  201. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  202. Spec: v1.ServiceSpec{
  203. Selector: map[string]string{"foo": "bar"},
  204. Ports: []v1.ServicePort{{Port: 80}},
  205. },
  206. })
  207. endpoints.syncService(ns + "/foo")
  208. endpointsHandler.ValidateRequestCount(t, 0)
  209. }
  210. func TestSyncEndpointsNewNoSubsets(t *testing.T) {
  211. ns := metav1.NamespaceDefault
  212. testServer, endpointsHandler := makeTestServer(t, ns)
  213. defer testServer.Close()
  214. endpoints := newController(testServer.URL)
  215. endpoints.serviceStore.Add(&v1.Service{
  216. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  217. Spec: v1.ServiceSpec{
  218. Selector: map[string]string{"foo": "bar"},
  219. Ports: []v1.ServicePort{{Port: 80}},
  220. },
  221. })
  222. endpoints.syncService(ns + "/foo")
  223. endpointsHandler.ValidateRequestCount(t, 1)
  224. }
  225. func TestCheckLeftoverEndpoints(t *testing.T) {
  226. ns := metav1.NamespaceDefault
  227. testServer, _ := makeTestServer(t, ns)
  228. defer testServer.Close()
  229. endpoints := newController(testServer.URL)
  230. endpoints.endpointsStore.Add(&v1.Endpoints{
  231. ObjectMeta: metav1.ObjectMeta{
  232. Name: "foo",
  233. Namespace: ns,
  234. ResourceVersion: "1",
  235. },
  236. Subsets: []v1.EndpointSubset{{
  237. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  238. Ports: []v1.EndpointPort{{Port: 1000}},
  239. }},
  240. })
  241. endpoints.checkLeftoverEndpoints()
  242. if e, a := 1, endpoints.queue.Len(); e != a {
  243. t.Fatalf("Expected %v, got %v", e, a)
  244. }
  245. got, _ := endpoints.queue.Get()
  246. if e, a := ns+"/foo", got; e != a {
  247. t.Errorf("Expected %v, got %v", e, a)
  248. }
  249. }
  250. func TestSyncEndpointsProtocolTCP(t *testing.T) {
  251. ns := "other"
  252. testServer, endpointsHandler := makeTestServer(t, ns)
  253. defer testServer.Close()
  254. endpoints := newController(testServer.URL)
  255. endpoints.endpointsStore.Add(&v1.Endpoints{
  256. ObjectMeta: metav1.ObjectMeta{
  257. Name: "foo",
  258. Namespace: ns,
  259. ResourceVersion: "1",
  260. },
  261. Subsets: []v1.EndpointSubset{{
  262. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  263. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  264. }},
  265. })
  266. addPods(endpoints.podStore, ns, 1, 1, 0)
  267. endpoints.serviceStore.Add(&v1.Service{
  268. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  269. Spec: v1.ServiceSpec{
  270. Selector: map[string]string{},
  271. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  272. },
  273. })
  274. endpoints.syncService(ns + "/foo")
  275. endpointsHandler.ValidateRequestCount(t, 1)
  276. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  277. ObjectMeta: metav1.ObjectMeta{
  278. Name: "foo",
  279. Namespace: ns,
  280. ResourceVersion: "1",
  281. },
  282. Subsets: []v1.EndpointSubset{{
  283. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  284. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  285. }},
  286. })
  287. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  288. }
  289. func TestSyncEndpointsProtocolUDP(t *testing.T) {
  290. ns := "other"
  291. testServer, endpointsHandler := makeTestServer(t, ns)
  292. defer testServer.Close()
  293. endpoints := newController(testServer.URL)
  294. endpoints.endpointsStore.Add(&v1.Endpoints{
  295. ObjectMeta: metav1.ObjectMeta{
  296. Name: "foo",
  297. Namespace: ns,
  298. ResourceVersion: "1",
  299. },
  300. Subsets: []v1.EndpointSubset{{
  301. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  302. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
  303. }},
  304. })
  305. addPods(endpoints.podStore, ns, 1, 1, 0)
  306. endpoints.serviceStore.Add(&v1.Service{
  307. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  308. Spec: v1.ServiceSpec{
  309. Selector: map[string]string{},
  310. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "UDP"}},
  311. },
  312. })
  313. endpoints.syncService(ns + "/foo")
  314. endpointsHandler.ValidateRequestCount(t, 1)
  315. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  316. ObjectMeta: metav1.ObjectMeta{
  317. Name: "foo",
  318. Namespace: ns,
  319. ResourceVersion: "1",
  320. },
  321. Subsets: []v1.EndpointSubset{{
  322. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  323. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}},
  324. }},
  325. })
  326. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  327. }
  328. func TestSyncEndpointsProtocolSCTP(t *testing.T) {
  329. ns := "other"
  330. testServer, endpointsHandler := makeTestServer(t, ns)
  331. defer testServer.Close()
  332. endpoints := newController(testServer.URL)
  333. endpoints.endpointsStore.Add(&v1.Endpoints{
  334. ObjectMeta: metav1.ObjectMeta{
  335. Name: "foo",
  336. Namespace: ns,
  337. ResourceVersion: "1",
  338. },
  339. Subsets: []v1.EndpointSubset{{
  340. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  341. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
  342. }},
  343. })
  344. addPods(endpoints.podStore, ns, 1, 1, 0)
  345. endpoints.serviceStore.Add(&v1.Service{
  346. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  347. Spec: v1.ServiceSpec{
  348. Selector: map[string]string{},
  349. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "SCTP"}},
  350. },
  351. })
  352. endpoints.syncService(ns + "/foo")
  353. endpointsHandler.ValidateRequestCount(t, 1)
  354. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  355. ObjectMeta: metav1.ObjectMeta{
  356. Name: "foo",
  357. Namespace: ns,
  358. ResourceVersion: "1",
  359. },
  360. Subsets: []v1.EndpointSubset{{
  361. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  362. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}},
  363. }},
  364. })
  365. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  366. }
  367. func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
  368. ns := "other"
  369. testServer, endpointsHandler := makeTestServer(t, ns)
  370. defer testServer.Close()
  371. endpoints := newController(testServer.URL)
  372. endpoints.endpointsStore.Add(&v1.Endpoints{
  373. ObjectMeta: metav1.ObjectMeta{
  374. Name: "foo",
  375. Namespace: ns,
  376. ResourceVersion: "1",
  377. },
  378. Subsets: []v1.EndpointSubset{},
  379. })
  380. addPods(endpoints.podStore, ns, 1, 1, 0)
  381. endpoints.serviceStore.Add(&v1.Service{
  382. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  383. Spec: v1.ServiceSpec{
  384. Selector: map[string]string{},
  385. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  386. },
  387. })
  388. endpoints.syncService(ns + "/foo")
  389. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  390. ObjectMeta: metav1.ObjectMeta{
  391. Name: "foo",
  392. Namespace: ns,
  393. ResourceVersion: "1",
  394. },
  395. Subsets: []v1.EndpointSubset{{
  396. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  397. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  398. }},
  399. })
  400. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  401. }
  402. func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
  403. ns := "other"
  404. testServer, endpointsHandler := makeTestServer(t, ns)
  405. defer testServer.Close()
  406. endpoints := newController(testServer.URL)
  407. endpoints.endpointsStore.Add(&v1.Endpoints{
  408. ObjectMeta: metav1.ObjectMeta{
  409. Name: "foo",
  410. Namespace: ns,
  411. ResourceVersion: "1",
  412. },
  413. Subsets: []v1.EndpointSubset{},
  414. })
  415. addPods(endpoints.podStore, ns, 0, 1, 1)
  416. endpoints.serviceStore.Add(&v1.Service{
  417. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  418. Spec: v1.ServiceSpec{
  419. Selector: map[string]string{},
  420. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  421. },
  422. })
  423. endpoints.syncService(ns + "/foo")
  424. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  425. ObjectMeta: metav1.ObjectMeta{
  426. Name: "foo",
  427. Namespace: ns,
  428. ResourceVersion: "1",
  429. },
  430. Subsets: []v1.EndpointSubset{{
  431. NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  432. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  433. }},
  434. })
  435. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  436. }
  437. func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
  438. ns := "other"
  439. testServer, endpointsHandler := makeTestServer(t, ns)
  440. defer testServer.Close()
  441. endpoints := newController(testServer.URL)
  442. endpoints.endpointsStore.Add(&v1.Endpoints{
  443. ObjectMeta: metav1.ObjectMeta{
  444. Name: "foo",
  445. Namespace: ns,
  446. ResourceVersion: "1",
  447. },
  448. Subsets: []v1.EndpointSubset{},
  449. })
  450. addPods(endpoints.podStore, ns, 1, 1, 1)
  451. endpoints.serviceStore.Add(&v1.Service{
  452. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  453. Spec: v1.ServiceSpec{
  454. Selector: map[string]string{},
  455. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  456. },
  457. })
  458. endpoints.syncService(ns + "/foo")
  459. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  460. ObjectMeta: metav1.ObjectMeta{
  461. Name: "foo",
  462. Namespace: ns,
  463. ResourceVersion: "1",
  464. },
  465. Subsets: []v1.EndpointSubset{{
  466. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  467. NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
  468. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  469. }},
  470. })
  471. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  472. }
  473. func TestSyncEndpointsItemsPreexisting(t *testing.T) {
  474. ns := "bar"
  475. testServer, endpointsHandler := makeTestServer(t, ns)
  476. defer testServer.Close()
  477. endpoints := newController(testServer.URL)
  478. endpoints.endpointsStore.Add(&v1.Endpoints{
  479. ObjectMeta: metav1.ObjectMeta{
  480. Name: "foo",
  481. Namespace: ns,
  482. ResourceVersion: "1",
  483. },
  484. Subsets: []v1.EndpointSubset{{
  485. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  486. Ports: []v1.EndpointPort{{Port: 1000}},
  487. }},
  488. })
  489. addPods(endpoints.podStore, ns, 1, 1, 0)
  490. endpoints.serviceStore.Add(&v1.Service{
  491. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  492. Spec: v1.ServiceSpec{
  493. Selector: map[string]string{"foo": "bar"},
  494. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  495. },
  496. })
  497. endpoints.syncService(ns + "/foo")
  498. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  499. ObjectMeta: metav1.ObjectMeta{
  500. Name: "foo",
  501. Namespace: ns,
  502. ResourceVersion: "1",
  503. },
  504. Subsets: []v1.EndpointSubset{{
  505. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  506. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  507. }},
  508. })
  509. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  510. }
  511. func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
  512. ns := metav1.NamespaceDefault
  513. testServer, endpointsHandler := makeTestServer(t, ns)
  514. defer testServer.Close()
  515. endpoints := newController(testServer.URL)
  516. endpoints.endpointsStore.Add(&v1.Endpoints{
  517. ObjectMeta: metav1.ObjectMeta{
  518. ResourceVersion: "1",
  519. Name: "foo",
  520. Namespace: ns,
  521. },
  522. Subsets: []v1.EndpointSubset{{
  523. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  524. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  525. }},
  526. })
  527. addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0)
  528. endpoints.serviceStore.Add(&v1.Service{
  529. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
  530. Spec: v1.ServiceSpec{
  531. Selector: map[string]string{"foo": "bar"},
  532. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  533. },
  534. })
  535. endpoints.syncService(ns + "/foo")
  536. endpointsHandler.ValidateRequestCount(t, 0)
  537. }
  538. func TestSyncEndpointsItems(t *testing.T) {
  539. ns := "other"
  540. testServer, endpointsHandler := makeTestServer(t, ns)
  541. defer testServer.Close()
  542. endpoints := newController(testServer.URL)
  543. addPods(endpoints.podStore, ns, 3, 2, 0)
  544. addPods(endpoints.podStore, "blah", 5, 2, 0) // make sure these aren't found!
  545. endpoints.serviceStore.Add(&v1.Service{
  546. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  547. Spec: v1.ServiceSpec{
  548. Selector: map[string]string{"foo": "bar"},
  549. Ports: []v1.ServicePort{
  550. {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
  551. {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
  552. },
  553. },
  554. })
  555. endpoints.syncService("other/foo")
  556. expectedSubsets := []v1.EndpointSubset{{
  557. Addresses: []v1.EndpointAddress{
  558. {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
  559. {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
  560. {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
  561. },
  562. Ports: []v1.EndpointPort{
  563. {Name: "port0", Port: 8080, Protocol: "TCP"},
  564. {Name: "port1", Port: 8088, Protocol: "TCP"},
  565. },
  566. }}
  567. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  568. ObjectMeta: metav1.ObjectMeta{
  569. ResourceVersion: "",
  570. Name: "foo",
  571. },
  572. Subsets: endptspkg.SortSubsets(expectedSubsets),
  573. })
  574. endpointsHandler.ValidateRequestCount(t, 1)
  575. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
  576. }
  577. func TestSyncEndpointsItemsWithLabels(t *testing.T) {
  578. ns := "other"
  579. testServer, endpointsHandler := makeTestServer(t, ns)
  580. defer testServer.Close()
  581. endpoints := newController(testServer.URL)
  582. addPods(endpoints.podStore, ns, 3, 2, 0)
  583. serviceLabels := map[string]string{"foo": "bar"}
  584. endpoints.serviceStore.Add(&v1.Service{
  585. ObjectMeta: metav1.ObjectMeta{
  586. Name: "foo",
  587. Namespace: ns,
  588. Labels: serviceLabels,
  589. },
  590. Spec: v1.ServiceSpec{
  591. Selector: map[string]string{"foo": "bar"},
  592. Ports: []v1.ServicePort{
  593. {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
  594. {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
  595. },
  596. },
  597. })
  598. endpoints.syncService(ns + "/foo")
  599. expectedSubsets := []v1.EndpointSubset{{
  600. Addresses: []v1.EndpointAddress{
  601. {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
  602. {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
  603. {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
  604. },
  605. Ports: []v1.EndpointPort{
  606. {Name: "port0", Port: 8080, Protocol: "TCP"},
  607. {Name: "port1", Port: 8088, Protocol: "TCP"},
  608. },
  609. }}
  610. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  611. ObjectMeta: metav1.ObjectMeta{
  612. ResourceVersion: "",
  613. Name: "foo",
  614. Labels: serviceLabels,
  615. },
  616. Subsets: endptspkg.SortSubsets(expectedSubsets),
  617. })
  618. endpointsHandler.ValidateRequestCount(t, 1)
  619. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
  620. }
  621. func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
  622. ns := "bar"
  623. testServer, endpointsHandler := makeTestServer(t, ns)
  624. defer testServer.Close()
  625. endpoints := newController(testServer.URL)
  626. endpoints.endpointsStore.Add(&v1.Endpoints{
  627. ObjectMeta: metav1.ObjectMeta{
  628. Name: "foo",
  629. Namespace: ns,
  630. ResourceVersion: "1",
  631. Labels: map[string]string{
  632. "foo": "bar",
  633. },
  634. },
  635. Subsets: []v1.EndpointSubset{{
  636. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  637. Ports: []v1.EndpointPort{{Port: 1000}},
  638. }},
  639. })
  640. addPods(endpoints.podStore, ns, 1, 1, 0)
  641. serviceLabels := map[string]string{"baz": "blah"}
  642. endpoints.serviceStore.Add(&v1.Service{
  643. ObjectMeta: metav1.ObjectMeta{
  644. Name: "foo",
  645. Namespace: ns,
  646. Labels: serviceLabels,
  647. },
  648. Spec: v1.ServiceSpec{
  649. Selector: map[string]string{"foo": "bar"},
  650. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  651. },
  652. })
  653. endpoints.syncService(ns + "/foo")
  654. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  655. ObjectMeta: metav1.ObjectMeta{
  656. Name: "foo",
  657. Namespace: ns,
  658. ResourceVersion: "1",
  659. Labels: serviceLabels,
  660. },
  661. Subsets: []v1.EndpointSubset{{
  662. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  663. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  664. }},
  665. })
  666. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  667. }
  668. func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
  669. var tests = []struct {
  670. podsSynced func() bool
  671. servicesSynced func() bool
  672. endpointsSynced func() bool
  673. shouldUpdateEndpoints bool
  674. }{
  675. {neverReady, alwaysReady, alwaysReady, false},
  676. {alwaysReady, neverReady, alwaysReady, false},
  677. {alwaysReady, alwaysReady, neverReady, false},
  678. {alwaysReady, alwaysReady, alwaysReady, true},
  679. }
  680. for _, test := range tests {
  681. func() {
  682. ns := "other"
  683. testServer, endpointsHandler := makeTestServer(t, ns)
  684. defer testServer.Close()
  685. endpoints := newController(testServer.URL)
  686. addPods(endpoints.podStore, ns, 1, 1, 0)
  687. service := &v1.Service{
  688. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  689. Spec: v1.ServiceSpec{
  690. Selector: map[string]string{},
  691. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  692. },
  693. }
  694. endpoints.serviceStore.Add(service)
  695. endpoints.enqueueService(service)
  696. endpoints.podsSynced = test.podsSynced
  697. endpoints.servicesSynced = test.servicesSynced
  698. endpoints.endpointsSynced = test.endpointsSynced
  699. endpoints.workerLoopPeriod = 10 * time.Millisecond
  700. stopCh := make(chan struct{})
  701. defer close(stopCh)
  702. go endpoints.Run(1, stopCh)
  703. // cache.WaitForCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
  704. // To ensure we get all updates, including unexpected ones, we need to wait at least as long as
  705. // a single cache sync period and worker period, with some fudge room.
  706. time.Sleep(150 * time.Millisecond)
  707. if test.shouldUpdateEndpoints {
  708. // Ensure the work queue has been processed by looping for up to a second to prevent flakes.
  709. wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
  710. return endpoints.queue.Len() == 0, nil
  711. })
  712. endpointsHandler.ValidateRequestCount(t, 1)
  713. } else {
  714. endpointsHandler.ValidateRequestCount(t, 0)
  715. }
  716. }()
  717. }
  718. }
  719. func TestSyncEndpointsHeadlessService(t *testing.T) {
  720. ns := "headless"
  721. testServer, endpointsHandler := makeTestServer(t, ns)
  722. defer testServer.Close()
  723. endpoints := newController(testServer.URL)
  724. endpoints.endpointsStore.Add(&v1.Endpoints{
  725. ObjectMeta: metav1.ObjectMeta{
  726. Name: "foo",
  727. Namespace: ns,
  728. ResourceVersion: "1",
  729. },
  730. Subsets: []v1.EndpointSubset{{
  731. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  732. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  733. }},
  734. })
  735. addPods(endpoints.podStore, ns, 1, 1, 0)
  736. endpoints.serviceStore.Add(&v1.Service{
  737. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  738. Spec: v1.ServiceSpec{
  739. Selector: map[string]string{},
  740. ClusterIP: api.ClusterIPNone,
  741. Ports: []v1.ServicePort{},
  742. },
  743. })
  744. endpoints.syncService(ns + "/foo")
  745. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  746. ObjectMeta: metav1.ObjectMeta{
  747. Name: "foo",
  748. Namespace: ns,
  749. ResourceVersion: "1",
  750. },
  751. Subsets: []v1.EndpointSubset{{
  752. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  753. Ports: []v1.EndpointPort{},
  754. }},
  755. })
  756. endpointsHandler.ValidateRequestCount(t, 1)
  757. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  758. }
  759. func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
  760. ns := "other"
  761. testServer, endpointsHandler := makeTestServer(t, ns)
  762. defer testServer.Close()
  763. endpoints := newController(testServer.URL)
  764. endpoints.endpointsStore.Add(&v1.Endpoints{
  765. ObjectMeta: metav1.ObjectMeta{
  766. Name: "foo",
  767. Namespace: ns,
  768. ResourceVersion: "1",
  769. Labels: map[string]string{
  770. "foo": "bar",
  771. },
  772. },
  773. Subsets: []v1.EndpointSubset{},
  774. })
  775. addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
  776. endpoints.serviceStore.Add(&v1.Service{
  777. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  778. Spec: v1.ServiceSpec{
  779. Selector: map[string]string{"foo": "bar"},
  780. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  781. },
  782. })
  783. endpoints.syncService(ns + "/foo")
  784. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  785. ObjectMeta: metav1.ObjectMeta{
  786. Name: "foo",
  787. Namespace: ns,
  788. ResourceVersion: "1",
  789. },
  790. Subsets: []v1.EndpointSubset{},
  791. })
  792. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  793. }
  794. func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
  795. ns := "other"
  796. testServer, endpointsHandler := makeTestServer(t, ns)
  797. defer testServer.Close()
  798. endpoints := newController(testServer.URL)
  799. endpoints.endpointsStore.Add(&v1.Endpoints{
  800. ObjectMeta: metav1.ObjectMeta{
  801. Name: "foo",
  802. Namespace: ns,
  803. ResourceVersion: "1",
  804. Labels: map[string]string{
  805. "foo": "bar",
  806. },
  807. },
  808. Subsets: []v1.EndpointSubset{},
  809. })
  810. addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
  811. endpoints.serviceStore.Add(&v1.Service{
  812. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  813. Spec: v1.ServiceSpec{
  814. Selector: map[string]string{"foo": "bar"},
  815. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  816. },
  817. })
  818. endpoints.syncService(ns + "/foo")
  819. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  820. ObjectMeta: metav1.ObjectMeta{
  821. Name: "foo",
  822. Namespace: ns,
  823. ResourceVersion: "1",
  824. },
  825. Subsets: []v1.EndpointSubset{},
  826. })
  827. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  828. }
  829. func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
  830. ns := "other"
  831. testServer, endpointsHandler := makeTestServer(t, ns)
  832. defer testServer.Close()
  833. endpoints := newController(testServer.URL)
  834. endpoints.endpointsStore.Add(&v1.Endpoints{
  835. ObjectMeta: metav1.ObjectMeta{
  836. Name: "foo",
  837. Namespace: ns,
  838. ResourceVersion: "1",
  839. Labels: map[string]string{
  840. "foo": "bar",
  841. },
  842. },
  843. Subsets: []v1.EndpointSubset{},
  844. })
  845. addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
  846. endpoints.serviceStore.Add(&v1.Service{
  847. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  848. Spec: v1.ServiceSpec{
  849. Selector: map[string]string{"foo": "bar"},
  850. Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  851. },
  852. })
  853. endpoints.syncService(ns + "/foo")
  854. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  855. ObjectMeta: metav1.ObjectMeta{
  856. Name: "foo",
  857. Namespace: ns,
  858. ResourceVersion: "1",
  859. },
  860. Subsets: []v1.EndpointSubset{},
  861. })
  862. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  863. }
  864. func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
  865. ns := metav1.NamespaceDefault
  866. testServer, endpointsHandler := makeTestServer(t, ns)
  867. defer testServer.Close()
  868. endpoints := newController(testServer.URL)
  869. endpoints.serviceStore.Add(&v1.Service{
  870. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  871. Spec: v1.ServiceSpec{
  872. Selector: map[string]string{"foo": "bar"},
  873. ClusterIP: "None",
  874. Ports: nil,
  875. },
  876. })
  877. addPods(endpoints.podStore, ns, 1, 1, 0)
  878. endpoints.syncService(ns + "/foo")
  879. endpointsHandler.ValidateRequestCount(t, 1)
  880. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  881. ObjectMeta: metav1.ObjectMeta{
  882. Name: "foo",
  883. },
  884. Subsets: []v1.EndpointSubset{{
  885. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  886. Ports: nil,
  887. }},
  888. })
  889. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
  890. }
  891. // There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here.
  892. // Just list all of the 3 false cases and 3 of the 12 true cases.
  893. func TestShouldPodBeInEndpoints(t *testing.T) {
  894. testCases := []struct {
  895. name string
  896. pod *v1.Pod
  897. expected bool
  898. }{
  899. // Pod should not be in endpoints cases:
  900. {
  901. name: "Failed pod with Never RestartPolicy",
  902. pod: &v1.Pod{
  903. Spec: v1.PodSpec{
  904. RestartPolicy: v1.RestartPolicyNever,
  905. },
  906. Status: v1.PodStatus{
  907. Phase: v1.PodFailed,
  908. },
  909. },
  910. expected: false,
  911. },
  912. {
  913. name: "Succeeded pod with Never RestartPolicy",
  914. pod: &v1.Pod{
  915. Spec: v1.PodSpec{
  916. RestartPolicy: v1.RestartPolicyNever,
  917. },
  918. Status: v1.PodStatus{
  919. Phase: v1.PodSucceeded,
  920. },
  921. },
  922. expected: false,
  923. },
  924. {
  925. name: "Succeeded pod with OnFailure RestartPolicy",
  926. pod: &v1.Pod{
  927. Spec: v1.PodSpec{
  928. RestartPolicy: v1.RestartPolicyOnFailure,
  929. },
  930. Status: v1.PodStatus{
  931. Phase: v1.PodSucceeded,
  932. },
  933. },
  934. expected: false,
  935. },
  936. // Pod should be in endpoints cases:
  937. {
  938. name: "Failed pod with Always RestartPolicy",
  939. pod: &v1.Pod{
  940. Spec: v1.PodSpec{
  941. RestartPolicy: v1.RestartPolicyAlways,
  942. },
  943. Status: v1.PodStatus{
  944. Phase: v1.PodFailed,
  945. },
  946. },
  947. expected: true,
  948. },
  949. {
  950. name: "Pending pod with Never RestartPolicy",
  951. pod: &v1.Pod{
  952. Spec: v1.PodSpec{
  953. RestartPolicy: v1.RestartPolicyNever,
  954. },
  955. Status: v1.PodStatus{
  956. Phase: v1.PodPending,
  957. },
  958. },
  959. expected: true,
  960. },
  961. {
  962. name: "Unknown pod with OnFailure RestartPolicy",
  963. pod: &v1.Pod{
  964. Spec: v1.PodSpec{
  965. RestartPolicy: v1.RestartPolicyOnFailure,
  966. },
  967. Status: v1.PodStatus{
  968. Phase: v1.PodUnknown,
  969. },
  970. },
  971. expected: true,
  972. },
  973. }
  974. for _, test := range testCases {
  975. result := shouldPodBeInEndpoints(test.pod)
  976. if result != test.expected {
  977. t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result)
  978. }
  979. }
  980. }
  981. func TestPodToEndpointAddress(t *testing.T) {
  982. podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
  983. ns := "test"
  984. addPods(podStore, ns, 1, 1, 0)
  985. pods := podStore.List()
  986. if len(pods) != 1 {
  987. t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
  988. return
  989. }
  990. pod := pods[0].(*v1.Pod)
  991. epa := podToEndpointAddress(pod)
  992. if epa.IP != pod.Status.PodIP {
  993. t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
  994. }
  995. if *(epa.NodeName) != pod.Spec.NodeName {
  996. t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
  997. }
  998. if epa.TargetRef.Kind != "Pod" {
  999. t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
  1000. }
  1001. if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
  1002. t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
  1003. }
  1004. if epa.TargetRef.Name != pod.ObjectMeta.Name {
  1005. t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
  1006. }
  1007. if epa.TargetRef.UID != pod.ObjectMeta.UID {
  1008. t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
  1009. }
  1010. if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
  1011. t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
  1012. }
  1013. }
  1014. func TestPodChanged(t *testing.T) {
  1015. podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
  1016. ns := "test"
  1017. addPods(podStore, ns, 1, 1, 0)
  1018. pods := podStore.List()
  1019. if len(pods) != 1 {
  1020. t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
  1021. return
  1022. }
  1023. oldPod := pods[0].(*v1.Pod)
  1024. newPod := oldPod.DeepCopy()
  1025. if podChanged(oldPod, newPod) {
  1026. t.Errorf("Expected pod to be unchanged for copied pod")
  1027. }
  1028. newPod.Spec.NodeName = "changed"
  1029. if !podChanged(oldPod, newPod) {
  1030. t.Errorf("Expected pod to be changed for pod with NodeName changed")
  1031. }
  1032. newPod.Spec.NodeName = oldPod.Spec.NodeName
  1033. newPod.ObjectMeta.ResourceVersion = "changed"
  1034. if podChanged(oldPod, newPod) {
  1035. t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
  1036. }
  1037. newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
  1038. newPod.Status.PodIP = "1.2.3.1"
  1039. if !podChanged(oldPod, newPod) {
  1040. t.Errorf("Expected pod to be changed with pod IP address change")
  1041. }
  1042. newPod.Status.PodIP = oldPod.Status.PodIP
  1043. newPod.ObjectMeta.Name = "wrong-name"
  1044. if !podChanged(oldPod, newPod) {
  1045. t.Errorf("Expected pod to be changed with pod name change")
  1046. }
  1047. newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
  1048. saveConditions := oldPod.Status.Conditions
  1049. oldPod.Status.Conditions = nil
  1050. if !podChanged(oldPod, newPod) {
  1051. t.Errorf("Expected pod to be changed with pod readiness change")
  1052. }
  1053. oldPod.Status.Conditions = saveConditions
  1054. now := metav1.NewTime(time.Now().UTC())
  1055. newPod.ObjectMeta.DeletionTimestamp = &now
  1056. if !podChanged(oldPod, newPod) {
  1057. t.Errorf("Expected pod to be changed with DeletionTimestamp change")
  1058. }
  1059. newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
  1060. }
  1061. func TestDetermineNeededServiceUpdates(t *testing.T) {
  1062. testCases := []struct {
  1063. name string
  1064. a sets.String
  1065. b sets.String
  1066. union sets.String
  1067. xor sets.String
  1068. }{
  1069. {
  1070. name: "no services changed",
  1071. a: sets.NewString("a", "b", "c"),
  1072. b: sets.NewString("a", "b", "c"),
  1073. xor: sets.NewString(),
  1074. union: sets.NewString("a", "b", "c"),
  1075. },
  1076. {
  1077. name: "all old services removed, new services added",
  1078. a: sets.NewString("a", "b", "c"),
  1079. b: sets.NewString("d", "e", "f"),
  1080. xor: sets.NewString("a", "b", "c", "d", "e", "f"),
  1081. union: sets.NewString("a", "b", "c", "d", "e", "f"),
  1082. },
  1083. {
  1084. name: "all old services removed, no new services added",
  1085. a: sets.NewString("a", "b", "c"),
  1086. b: sets.NewString(),
  1087. xor: sets.NewString("a", "b", "c"),
  1088. union: sets.NewString("a", "b", "c"),
  1089. },
  1090. {
  1091. name: "no old services, but new services added",
  1092. a: sets.NewString(),
  1093. b: sets.NewString("a", "b", "c"),
  1094. xor: sets.NewString("a", "b", "c"),
  1095. union: sets.NewString("a", "b", "c"),
  1096. },
  1097. {
  1098. name: "one service removed, one service added, two unchanged",
  1099. a: sets.NewString("a", "b", "c"),
  1100. b: sets.NewString("b", "c", "d"),
  1101. xor: sets.NewString("a", "d"),
  1102. union: sets.NewString("a", "b", "c", "d"),
  1103. },
  1104. {
  1105. name: "no services",
  1106. a: sets.NewString(),
  1107. b: sets.NewString(),
  1108. xor: sets.NewString(),
  1109. union: sets.NewString(),
  1110. },
  1111. }
  1112. for _, testCase := range testCases {
  1113. retval := determineNeededServiceUpdates(testCase.a, testCase.b, false)
  1114. if !retval.Equal(testCase.xor) {
  1115. t.Errorf("%s (with podChanged=false): expected: %v got: %v", testCase.name, testCase.xor.List(), retval.List())
  1116. }
  1117. retval = determineNeededServiceUpdates(testCase.a, testCase.b, true)
  1118. if !retval.Equal(testCase.union) {
  1119. t.Errorf("%s (with podChanged=true): expected: %v got: %v", testCase.name, testCase.union.List(), retval.List())
  1120. }
  1121. }
  1122. }
  1123. func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
  1124. ns := "other"
  1125. testServer, endpointsHandler := makeTestServer(t, ns)
  1126. defer testServer.Close()
  1127. endpoints := newController(testServer.URL)
  1128. endpoints.endpointsStore.Add(&v1.Endpoints{
  1129. ObjectMeta: metav1.ObjectMeta{
  1130. Name: "foo",
  1131. Namespace: ns,
  1132. ResourceVersion: "1",
  1133. },
  1134. Subsets: []v1.EndpointSubset{{
  1135. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1136. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1137. }},
  1138. })
  1139. addPods(endpoints.podStore, ns, 1, 1, 0)
  1140. endpoints.serviceStore.Add(&v1.Service{
  1141. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
  1142. Spec: v1.ServiceSpec{
  1143. Selector: map[string]string{},
  1144. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  1145. },
  1146. })
  1147. endpoints.syncService(ns + "/foo")
  1148. endpointsHandler.ValidateRequestCount(t, 1)
  1149. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  1150. ObjectMeta: metav1.ObjectMeta{
  1151. Name: "foo",
  1152. Namespace: ns,
  1153. ResourceVersion: "1",
  1154. Annotations: map[string]string{
  1155. v1.EndpointsLastChangeTriggerTime: triggerTimeString,
  1156. },
  1157. },
  1158. Subsets: []v1.EndpointSubset{{
  1159. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1160. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1161. }},
  1162. })
  1163. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  1164. }
  1165. func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
  1166. ns := "other"
  1167. testServer, endpointsHandler := makeTestServer(t, ns)
  1168. defer testServer.Close()
  1169. endpoints := newController(testServer.URL)
  1170. endpoints.endpointsStore.Add(&v1.Endpoints{
  1171. ObjectMeta: metav1.ObjectMeta{
  1172. Name: "foo",
  1173. Namespace: ns,
  1174. ResourceVersion: "1",
  1175. Annotations: map[string]string{
  1176. v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
  1177. },
  1178. },
  1179. Subsets: []v1.EndpointSubset{{
  1180. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1181. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1182. }},
  1183. })
  1184. addPods(endpoints.podStore, ns, 1, 1, 0)
  1185. endpoints.serviceStore.Add(&v1.Service{
  1186. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
  1187. Spec: v1.ServiceSpec{
  1188. Selector: map[string]string{},
  1189. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  1190. },
  1191. })
  1192. endpoints.syncService(ns + "/foo")
  1193. endpointsHandler.ValidateRequestCount(t, 1)
  1194. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  1195. ObjectMeta: metav1.ObjectMeta{
  1196. Name: "foo",
  1197. Namespace: ns,
  1198. ResourceVersion: "1",
  1199. Annotations: map[string]string{
  1200. v1.EndpointsLastChangeTriggerTime: triggerTimeString,
  1201. },
  1202. },
  1203. Subsets: []v1.EndpointSubset{{
  1204. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1205. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1206. }},
  1207. })
  1208. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  1209. }
  1210. func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
  1211. ns := "other"
  1212. testServer, endpointsHandler := makeTestServer(t, ns)
  1213. defer testServer.Close()
  1214. endpoints := newController(testServer.URL)
  1215. endpoints.endpointsStore.Add(&v1.Endpoints{
  1216. ObjectMeta: metav1.ObjectMeta{
  1217. Name: "foo",
  1218. Namespace: ns,
  1219. ResourceVersion: "1",
  1220. Annotations: map[string]string{
  1221. v1.EndpointsLastChangeTriggerTime: triggerTimeString,
  1222. },
  1223. },
  1224. Subsets: []v1.EndpointSubset{{
  1225. Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  1226. Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  1227. }},
  1228. })
  1229. // Neither pod nor service has trigger time, this should cause annotation to be cleared.
  1230. addPods(endpoints.podStore, ns, 1, 1, 0)
  1231. endpoints.serviceStore.Add(&v1.Service{
  1232. ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
  1233. Spec: v1.ServiceSpec{
  1234. Selector: map[string]string{},
  1235. Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  1236. },
  1237. })
  1238. endpoints.syncService(ns + "/foo")
  1239. endpointsHandler.ValidateRequestCount(t, 1)
  1240. data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
  1241. ObjectMeta: metav1.ObjectMeta{
  1242. Name: "foo",
  1243. Namespace: ns,
  1244. ResourceVersion: "1",
  1245. Annotations: map[string]string{}, // Annotation not set anymore.
  1246. },
  1247. Subsets: []v1.EndpointSubset{{
  1248. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  1249. Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  1250. }},
  1251. })
  1252. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  1253. }