lease_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package reconcilers
  14. /*
  15. Original Source:
  16. https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler_test.go
  17. */
  18. import (
  19. "context"
  20. "net"
  21. "reflect"
  22. "testing"
  23. corev1 "k8s.io/api/core/v1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/client-go/kubernetes/fake"
  26. )
  // fakeLeases is an in-memory stand-in for a lease store used by the tests in
  // this file.
  type fakeLeases struct {
  	// keys maps a lease IP to whether that lease was written through
  	// UpdateLease (true) or merely seeded through SetKeys (false).
  	keys map[string]bool
  }
  30. var _ Leases = &fakeLeases{}
  31. func newFakeLeases() *fakeLeases {
  32. return &fakeLeases{make(map[string]bool)}
  33. }
  34. func (f *fakeLeases) ListLeases() ([]string, error) {
  35. res := make([]string, 0, len(f.keys))
  36. for ip := range f.keys {
  37. res = append(res, ip)
  38. }
  39. return res, nil
  40. }
  41. func (f *fakeLeases) UpdateLease(ip string) error {
  42. f.keys[ip] = true
  43. return nil
  44. }
  45. func (f *fakeLeases) RemoveLease(ip string) error {
  46. delete(f.keys, ip)
  47. return nil
  48. }
  49. func (f *fakeLeases) SetKeys(keys []string) {
  50. for _, ip := range keys {
  51. f.keys[ip] = false
  52. }
  53. }
  54. func (f *fakeLeases) GetUpdatedKeys() []string {
  55. res := []string{}
  56. for ip, updated := range f.keys {
  57. if updated {
  58. res = append(res, ip)
  59. }
  60. }
  61. return res
  62. }
  63. func TestLeaseEndpointReconciler(t *testing.T) {
  64. ns := corev1.NamespaceDefault
  65. om := func(name string) metav1.ObjectMeta {
  66. return metav1.ObjectMeta{Namespace: ns, Name: name}
  67. }
  68. reconcileTests := []struct {
  69. testName string
  70. serviceName string
  71. ip string
  72. endpointPorts []corev1.EndpointPort
  73. endpointKeys []string
  74. endpoints *corev1.EndpointsList
  75. expectUpdate *corev1.Endpoints // nil means none expected
  76. }{
  77. {
  78. testName: "no existing endpoints",
  79. serviceName: "foo",
  80. ip: "1.2.3.4",
  81. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  82. endpoints: nil,
  83. expectUpdate: &corev1.Endpoints{
  84. ObjectMeta: om("foo"),
  85. Subsets: []corev1.EndpointSubset{{
  86. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  87. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  88. }},
  89. },
  90. },
  91. {
  92. testName: "existing endpoints satisfy",
  93. serviceName: "foo",
  94. ip: "1.2.3.4",
  95. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  96. endpoints: &corev1.EndpointsList{
  97. Items: []corev1.Endpoints{{
  98. ObjectMeta: om("foo"),
  99. Subsets: []corev1.EndpointSubset{{
  100. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  101. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  102. }},
  103. }},
  104. },
  105. },
  106. {
  107. testName: "existing endpoints satisfy + refresh existing key",
  108. serviceName: "foo",
  109. ip: "1.2.3.4",
  110. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  111. endpointKeys: []string{"1.2.3.4"},
  112. endpoints: &corev1.EndpointsList{
  113. Items: []corev1.Endpoints{{
  114. ObjectMeta: om("foo"),
  115. Subsets: []corev1.EndpointSubset{{
  116. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  117. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  118. }},
  119. }},
  120. },
  121. },
  122. {
  123. testName: "existing endpoints satisfy but too many",
  124. serviceName: "foo",
  125. ip: "1.2.3.4",
  126. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  127. endpoints: &corev1.EndpointsList{
  128. Items: []corev1.Endpoints{{
  129. ObjectMeta: om("foo"),
  130. Subsets: []corev1.EndpointSubset{{
  131. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}},
  132. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  133. }},
  134. }},
  135. },
  136. expectUpdate: &corev1.Endpoints{
  137. ObjectMeta: om("foo"),
  138. Subsets: []corev1.EndpointSubset{{
  139. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  140. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  141. }},
  142. },
  143. },
  144. {
  145. testName: "existing endpoints satisfy but too many + extra masters",
  146. serviceName: "foo",
  147. ip: "1.2.3.4",
  148. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  149. endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
  150. endpoints: &corev1.EndpointsList{
  151. Items: []corev1.Endpoints{{
  152. ObjectMeta: om("foo"),
  153. Subsets: []corev1.EndpointSubset{{
  154. Addresses: []corev1.EndpointAddress{
  155. {IP: "1.2.3.4"},
  156. {IP: "4.3.2.1"},
  157. {IP: "4.3.2.2"},
  158. {IP: "4.3.2.3"},
  159. {IP: "4.3.2.4"},
  160. },
  161. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  162. }},
  163. }},
  164. },
  165. expectUpdate: &corev1.Endpoints{
  166. ObjectMeta: om("foo"),
  167. Subsets: []corev1.EndpointSubset{{
  168. Addresses: []corev1.EndpointAddress{
  169. {IP: "1.2.3.4"},
  170. {IP: "4.3.2.2"},
  171. {IP: "4.3.2.3"},
  172. {IP: "4.3.2.4"},
  173. },
  174. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  175. }},
  176. },
  177. },
  178. {
  179. testName: "existing endpoints satisfy but too many + extra masters + delete first",
  180. serviceName: "foo",
  181. ip: "4.3.2.4",
  182. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  183. endpointKeys: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
  184. endpoints: &corev1.EndpointsList{
  185. Items: []corev1.Endpoints{{
  186. ObjectMeta: om("foo"),
  187. Subsets: []corev1.EndpointSubset{{
  188. Addresses: []corev1.EndpointAddress{
  189. {IP: "1.2.3.4"},
  190. {IP: "4.3.2.1"},
  191. {IP: "4.3.2.2"},
  192. {IP: "4.3.2.3"},
  193. {IP: "4.3.2.4"},
  194. },
  195. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  196. }},
  197. }},
  198. },
  199. expectUpdate: &corev1.Endpoints{
  200. ObjectMeta: om("foo"),
  201. Subsets: []corev1.EndpointSubset{{
  202. Addresses: []corev1.EndpointAddress{
  203. {IP: "4.3.2.1"},
  204. {IP: "4.3.2.2"},
  205. {IP: "4.3.2.3"},
  206. {IP: "4.3.2.4"},
  207. },
  208. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  209. }},
  210. },
  211. },
  212. {
  213. testName: "existing endpoints current IP missing",
  214. serviceName: "foo",
  215. ip: "4.3.2.2",
  216. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  217. endpointKeys: []string{"4.3.2.1"},
  218. endpoints: &corev1.EndpointsList{
  219. Items: []corev1.Endpoints{{
  220. ObjectMeta: om("foo"),
  221. Subsets: []corev1.EndpointSubset{{
  222. Addresses: []corev1.EndpointAddress{
  223. {IP: "4.3.2.1"},
  224. },
  225. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  226. }},
  227. }},
  228. },
  229. expectUpdate: &corev1.Endpoints{
  230. ObjectMeta: om("foo"),
  231. Subsets: []corev1.EndpointSubset{{
  232. Addresses: []corev1.EndpointAddress{
  233. {IP: "4.3.2.1"},
  234. {IP: "4.3.2.2"},
  235. },
  236. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  237. }},
  238. },
  239. },
  240. {
  241. testName: "existing endpoints wrong name",
  242. serviceName: "foo",
  243. ip: "1.2.3.4",
  244. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  245. endpoints: &corev1.EndpointsList{
  246. Items: []corev1.Endpoints{{
  247. ObjectMeta: om("bar"),
  248. Subsets: []corev1.EndpointSubset{{
  249. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  250. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  251. }},
  252. }},
  253. },
  254. expectUpdate: &corev1.Endpoints{
  255. ObjectMeta: om("foo"),
  256. Subsets: []corev1.EndpointSubset{{
  257. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  258. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  259. }},
  260. },
  261. },
  262. {
  263. testName: "existing endpoints wrong IP",
  264. serviceName: "foo",
  265. ip: "1.2.3.4",
  266. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  267. endpoints: &corev1.EndpointsList{
  268. Items: []corev1.Endpoints{{
  269. ObjectMeta: om("foo"),
  270. Subsets: []corev1.EndpointSubset{{
  271. Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}},
  272. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  273. }},
  274. }},
  275. },
  276. expectUpdate: &corev1.Endpoints{
  277. ObjectMeta: om("foo"),
  278. Subsets: []corev1.EndpointSubset{{
  279. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  280. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  281. }},
  282. },
  283. },
  284. {
  285. testName: "existing endpoints wrong port",
  286. serviceName: "foo",
  287. ip: "1.2.3.4",
  288. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  289. endpoints: &corev1.EndpointsList{
  290. Items: []corev1.Endpoints{{
  291. ObjectMeta: om("foo"),
  292. Subsets: []corev1.EndpointSubset{{
  293. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  294. Ports: []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}},
  295. }},
  296. }},
  297. },
  298. expectUpdate: &corev1.Endpoints{
  299. ObjectMeta: om("foo"),
  300. Subsets: []corev1.EndpointSubset{{
  301. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  302. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  303. }},
  304. },
  305. },
  306. {
  307. testName: "existing endpoints wrong protocol",
  308. serviceName: "foo",
  309. ip: "1.2.3.4",
  310. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  311. endpoints: &corev1.EndpointsList{
  312. Items: []corev1.Endpoints{{
  313. ObjectMeta: om("foo"),
  314. Subsets: []corev1.EndpointSubset{{
  315. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  316. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}},
  317. }},
  318. }},
  319. },
  320. expectUpdate: &corev1.Endpoints{
  321. ObjectMeta: om("foo"),
  322. Subsets: []corev1.EndpointSubset{{
  323. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  324. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  325. }},
  326. },
  327. },
  328. {
  329. testName: "existing endpoints wrong port name",
  330. serviceName: "foo",
  331. ip: "1.2.3.4",
  332. endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
  333. endpoints: &corev1.EndpointsList{
  334. Items: []corev1.Endpoints{{
  335. ObjectMeta: om("foo"),
  336. Subsets: []corev1.EndpointSubset{{
  337. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  338. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  339. }},
  340. }},
  341. },
  342. expectUpdate: &corev1.Endpoints{
  343. ObjectMeta: om("foo"),
  344. Subsets: []corev1.EndpointSubset{{
  345. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  346. Ports: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
  347. }},
  348. },
  349. },
  350. {
  351. testName: "existing endpoints extra service ports satisfy",
  352. serviceName: "foo",
  353. ip: "1.2.3.4",
  354. endpointPorts: []corev1.EndpointPort{
  355. {Name: "foo", Port: 8080, Protocol: "TCP"},
  356. {Name: "bar", Port: 1000, Protocol: "TCP"},
  357. {Name: "baz", Port: 1010, Protocol: "TCP"},
  358. },
  359. endpoints: &corev1.EndpointsList{
  360. Items: []corev1.Endpoints{{
  361. ObjectMeta: om("foo"),
  362. Subsets: []corev1.EndpointSubset{{
  363. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  364. Ports: []corev1.EndpointPort{
  365. {Name: "foo", Port: 8080, Protocol: "TCP"},
  366. {Name: "bar", Port: 1000, Protocol: "TCP"},
  367. {Name: "baz", Port: 1010, Protocol: "TCP"},
  368. },
  369. }},
  370. }},
  371. },
  372. },
  373. {
  374. testName: "existing endpoints extra service ports missing port",
  375. serviceName: "foo",
  376. ip: "1.2.3.4",
  377. endpointPorts: []corev1.EndpointPort{
  378. {Name: "foo", Port: 8080, Protocol: "TCP"},
  379. {Name: "bar", Port: 1000, Protocol: "TCP"},
  380. },
  381. endpoints: &corev1.EndpointsList{
  382. Items: []corev1.Endpoints{{
  383. ObjectMeta: om("foo"),
  384. Subsets: []corev1.EndpointSubset{{
  385. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  386. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  387. }},
  388. }},
  389. },
  390. expectUpdate: &corev1.Endpoints{
  391. ObjectMeta: om("foo"),
  392. Subsets: []corev1.EndpointSubset{{
  393. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  394. Ports: []corev1.EndpointPort{
  395. {Name: "foo", Port: 8080, Protocol: "TCP"},
  396. {Name: "bar", Port: 1000, Protocol: "TCP"},
  397. },
  398. }},
  399. },
  400. },
  401. }
  402. for _, test := range reconcileTests {
  403. fakeLeases := newFakeLeases()
  404. fakeLeases.SetKeys(test.endpointKeys)
  405. clientset := fake.NewSimpleClientset()
  406. if test.endpoints != nil {
  407. for _, ep := range test.endpoints.Items {
  408. if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(context.TODO(), &ep, metav1.CreateOptions{}); err != nil {
  409. t.Errorf("case %q: unexpected error: %v", test.testName, err)
  410. continue
  411. }
  412. }
  413. }
  414. epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()}
  415. r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
  416. err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true)
  417. if err != nil {
  418. t.Errorf("case %q: unexpected error: %v", test.testName, err)
  419. }
  420. actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(context.TODO(), test.serviceName, metav1.GetOptions{})
  421. if err != nil {
  422. t.Errorf("case %q: unexpected error: %v", test.testName, err)
  423. }
  424. if test.expectUpdate != nil {
  425. if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) {
  426. t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
  427. }
  428. }
  429. if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip {
  430. t.Errorf("case %q: expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", test.testName, updatedKeys)
  431. }
  432. }
  433. nonReconcileTests := []struct {
  434. testName string
  435. serviceName string
  436. ip string
  437. endpointPorts []corev1.EndpointPort
  438. endpointKeys []string
  439. endpoints *corev1.EndpointsList
  440. expectUpdate *corev1.Endpoints // nil means none expected
  441. }{
  442. {
  443. testName: "existing endpoints extra service ports missing port no update",
  444. serviceName: "foo",
  445. ip: "1.2.3.4",
  446. endpointPorts: []corev1.EndpointPort{
  447. {Name: "foo", Port: 8080, Protocol: "TCP"},
  448. {Name: "bar", Port: 1000, Protocol: "TCP"},
  449. },
  450. endpoints: &corev1.EndpointsList{
  451. Items: []corev1.Endpoints{{
  452. ObjectMeta: om("foo"),
  453. Subsets: []corev1.EndpointSubset{{
  454. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  455. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  456. }},
  457. }},
  458. },
  459. expectUpdate: nil,
  460. },
  461. {
  462. testName: "existing endpoints extra service ports, wrong ports, wrong IP",
  463. serviceName: "foo",
  464. ip: "1.2.3.4",
  465. endpointPorts: []corev1.EndpointPort{
  466. {Name: "foo", Port: 8080, Protocol: "TCP"},
  467. {Name: "bar", Port: 1000, Protocol: "TCP"},
  468. },
  469. endpoints: &corev1.EndpointsList{
  470. Items: []corev1.Endpoints{{
  471. ObjectMeta: om("foo"),
  472. Subsets: []corev1.EndpointSubset{{
  473. Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}},
  474. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  475. }},
  476. }},
  477. },
  478. expectUpdate: &corev1.Endpoints{
  479. ObjectMeta: om("foo"),
  480. Subsets: []corev1.EndpointSubset{{
  481. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  482. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  483. }},
  484. },
  485. },
  486. {
  487. testName: "no existing endpoints",
  488. serviceName: "foo",
  489. ip: "1.2.3.4",
  490. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  491. endpoints: nil,
  492. expectUpdate: &corev1.Endpoints{
  493. ObjectMeta: om("foo"),
  494. Subsets: []corev1.EndpointSubset{{
  495. Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
  496. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  497. }},
  498. },
  499. },
  500. }
  501. for _, test := range nonReconcileTests {
  502. t.Run(test.testName, func(t *testing.T) {
  503. fakeLeases := newFakeLeases()
  504. fakeLeases.SetKeys(test.endpointKeys)
  505. clientset := fake.NewSimpleClientset()
  506. if test.endpoints != nil {
  507. for _, ep := range test.endpoints.Items {
  508. if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(context.TODO(), &ep, metav1.CreateOptions{}); err != nil {
  509. t.Errorf("case %q: unexpected error: %v", test.testName, err)
  510. continue
  511. }
  512. }
  513. }
  514. epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()}
  515. r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
  516. err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false)
  517. if err != nil {
  518. t.Errorf("case %q: unexpected error: %v", test.testName, err)
  519. }
  520. actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(context.TODO(), test.serviceName, metav1.GetOptions{})
  521. if err != nil {
  522. t.Errorf("case %q: unexpected error: %v", test.testName, err)
  523. }
  524. if test.expectUpdate != nil {
  525. if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) {
  526. t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
  527. }
  528. }
  529. if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip {
  530. t.Errorf("case %q: expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", test.testName, updatedKeys)
  531. }
  532. })
  533. }
  534. }
  535. func TestLeaseRemoveEndpoints(t *testing.T) {
  536. ns := corev1.NamespaceDefault
  537. om := func(name string) metav1.ObjectMeta {
  538. return metav1.ObjectMeta{Namespace: ns, Name: name}
  539. }
  540. stopTests := []struct {
  541. testName string
  542. serviceName string
  543. ip string
  544. endpointPorts []corev1.EndpointPort
  545. endpointKeys []string
  546. endpoints *corev1.EndpointsList
  547. expectUpdate *corev1.Endpoints // nil means none expected
  548. }{
  549. {
  550. testName: "successful stop reconciling",
  551. serviceName: "foo",
  552. ip: "1.2.3.4",
  553. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  554. endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
  555. endpoints: &corev1.EndpointsList{
  556. Items: []corev1.Endpoints{{
  557. ObjectMeta: om("foo"),
  558. Subsets: []corev1.EndpointSubset{{
  559. Addresses: []corev1.EndpointAddress{
  560. {IP: "1.2.3.4"},
  561. {IP: "4.3.2.2"},
  562. {IP: "4.3.2.3"},
  563. {IP: "4.3.2.4"},
  564. },
  565. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  566. }},
  567. }},
  568. },
  569. expectUpdate: &corev1.Endpoints{
  570. ObjectMeta: om("foo"),
  571. Subsets: []corev1.EndpointSubset{{
  572. Addresses: []corev1.EndpointAddress{
  573. {IP: "4.3.2.2"},
  574. {IP: "4.3.2.3"},
  575. {IP: "4.3.2.4"},
  576. },
  577. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  578. }},
  579. },
  580. },
  581. {
  582. testName: "stop reconciling with ip not in endpoint ip list",
  583. serviceName: "foo",
  584. ip: "5.6.7.8",
  585. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  586. endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
  587. endpoints: &corev1.EndpointsList{
  588. Items: []corev1.Endpoints{{
  589. ObjectMeta: om("foo"),
  590. Subsets: []corev1.EndpointSubset{{
  591. Addresses: []corev1.EndpointAddress{
  592. {IP: "1.2.3.4"},
  593. {IP: "4.3.2.2"},
  594. {IP: "4.3.2.3"},
  595. {IP: "4.3.2.4"},
  596. },
  597. Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  598. }},
  599. }},
  600. },
  601. },
  602. {
  603. testName: "endpoint with no subset",
  604. serviceName: "foo",
  605. ip: "5.6.7.8",
  606. endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
  607. endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
  608. endpoints: &corev1.EndpointsList{
  609. Items: []corev1.Endpoints{{
  610. ObjectMeta: om("foo"),
  611. Subsets: nil,
  612. }},
  613. },
  614. },
  615. }
  616. for _, test := range stopTests {
  617. t.Run(test.testName, func(t *testing.T) {
  618. fakeLeases := newFakeLeases()
  619. fakeLeases.SetKeys(test.endpointKeys)
  620. clientset := fake.NewSimpleClientset()
  621. for _, ep := range test.endpoints.Items {
  622. if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(context.TODO(), &ep, metav1.CreateOptions{}); err != nil {
  623. t.Errorf("case %q: unexpected error: %v", test.testName, err)
  624. continue
  625. }
  626. }
  627. epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()}
  628. r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
  629. err := r.RemoveEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts)
  630. if err != nil {
  631. t.Errorf("case %q: unexpected error: %v", test.testName, err)
  632. }
  633. actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(context.TODO(), test.serviceName, metav1.GetOptions{})
  634. if err != nil {
  635. t.Errorf("case %q: unexpected error: %v", test.testName, err)
  636. }
  637. if test.expectUpdate != nil {
  638. if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) {
  639. t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
  640. }
  641. }
  642. for _, key := range fakeLeases.GetUpdatedKeys() {
  643. if key == test.ip {
  644. t.Errorf("case %q: Found ip %s in leases but shouldn't be there", test.testName, key)
  645. }
  646. }
  647. })
  648. }
  649. }