endpointslice.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package network
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. v1 "k8s.io/api/core/v1"
  19. discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
  20. apierrors "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/util/intstr"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. clientset "k8s.io/client-go/kubernetes"
  25. "k8s.io/kubernetes/test/e2e/framework"
  26. imageutils "k8s.io/kubernetes/test/utils/image"
  27. "github.com/onsi/ginkgo"
  28. )
  29. var _ = SIGDescribe("EndpointSlice [Feature:EndpointSlice]", func() {
  30. version := "v1"
  31. ginkgo.Context("version "+version, func() {
  32. f := framework.NewDefaultFramework("endpointslice")
  33. var cs clientset.Interface
  34. var podClient *framework.PodClient
  35. ginkgo.BeforeEach(func() {
  36. cs = f.ClientSet
  37. podClient = f.PodClient()
  38. })
  39. labelPod1 := "pod1"
  40. labelPod2 := "pod2"
  41. labelPod3 := "pod3"
  42. labelShared12 := "shared12"
  43. labelValue := "on"
  44. ginkgo.It("should create Endpoints and EndpointSlices for Pods matching a Service", func() {
  45. pod1 := podClient.Create(&v1.Pod{
  46. ObjectMeta: metav1.ObjectMeta{
  47. Name: "pod1",
  48. Labels: map[string]string{
  49. labelPod1: labelValue,
  50. labelShared12: labelValue,
  51. },
  52. },
  53. Spec: v1.PodSpec{
  54. Containers: []v1.Container{
  55. {
  56. Name: "container1",
  57. Image: imageutils.GetE2EImage(imageutils.Nginx),
  58. Ports: []v1.ContainerPort{{
  59. Name: "example-name",
  60. ContainerPort: int32(3000),
  61. }},
  62. },
  63. },
  64. },
  65. })
  66. pod2 := podClient.Create(&v1.Pod{
  67. ObjectMeta: metav1.ObjectMeta{
  68. Name: "pod2",
  69. Labels: map[string]string{
  70. labelPod2: labelValue,
  71. labelShared12: labelValue,
  72. },
  73. },
  74. Spec: v1.PodSpec{
  75. Containers: []v1.Container{
  76. {
  77. Name: "container1",
  78. Image: imageutils.GetE2EImage(imageutils.Nginx),
  79. Ports: []v1.ContainerPort{{
  80. Name: "example-name",
  81. ContainerPort: int32(3001),
  82. }, {
  83. Name: "other-port",
  84. ContainerPort: int32(3002),
  85. }},
  86. },
  87. },
  88. },
  89. })
  90. svc1 := createServiceReportErr(cs, f.Namespace.Name, &v1.Service{
  91. ObjectMeta: metav1.ObjectMeta{
  92. Name: "example-int-port",
  93. },
  94. Spec: v1.ServiceSpec{
  95. Selector: map[string]string{labelPod1: labelValue},
  96. Ports: []v1.ServicePort{{
  97. Name: "example",
  98. Port: 80,
  99. TargetPort: intstr.FromInt(3000),
  100. Protocol: v1.ProtocolTCP,
  101. }},
  102. },
  103. })
  104. svc2 := createServiceReportErr(cs, f.Namespace.Name, &v1.Service{
  105. ObjectMeta: metav1.ObjectMeta{
  106. Name: "example-named-port",
  107. },
  108. Spec: v1.ServiceSpec{
  109. Selector: map[string]string{labelShared12: labelValue},
  110. Ports: []v1.ServicePort{{
  111. Name: "http",
  112. Port: 80,
  113. TargetPort: intstr.FromString("example-name"),
  114. Protocol: v1.ProtocolTCP,
  115. }},
  116. },
  117. })
  118. svc3 := createServiceReportErr(cs, f.Namespace.Name, &v1.Service{
  119. ObjectMeta: metav1.ObjectMeta{
  120. Name: "example-no-match",
  121. },
  122. Spec: v1.ServiceSpec{
  123. Selector: map[string]string{labelPod3: labelValue},
  124. Ports: []v1.ServicePort{{
  125. Name: "example-no-match",
  126. Port: 80,
  127. TargetPort: intstr.FromInt(8080),
  128. Protocol: v1.ProtocolTCP,
  129. }},
  130. },
  131. })
  132. err := wait.Poll(5*time.Second, 3*time.Minute, func() (bool, error) {
  133. if !podClient.PodIsReady(pod1.Name) {
  134. framework.Logf("Pod 1 not ready yet")
  135. return false, nil
  136. }
  137. if !podClient.PodIsReady(pod2.Name) {
  138. framework.Logf("Pod 2 not ready yet")
  139. return false, nil
  140. }
  141. var err error
  142. pod1, err = podClient.Get(context.TODO(), pod1.Name, metav1.GetOptions{})
  143. if err != nil {
  144. return false, err
  145. }
  146. pod2, err = podClient.Get(context.TODO(), pod2.Name, metav1.GetOptions{})
  147. if err != nil {
  148. return false, err
  149. }
  150. return true, nil
  151. })
  152. framework.ExpectNoError(err)
  153. ginkgo.By("referencing a single matching pod")
  154. expectEndpointsAndSlices(cs, f.Namespace.Name, svc1, []*v1.Pod{pod1}, 1, 1, false)
  155. ginkgo.By("referencing matching pods with named port")
  156. expectEndpointsAndSlices(cs, f.Namespace.Name, svc2, []*v1.Pod{pod1, pod2}, 2, 2, true)
  157. ginkgo.By("creating empty endpoints and endpointslices for no matching pods")
  158. expectEndpointsAndSlices(cs, f.Namespace.Name, svc3, []*v1.Pod{}, 0, 1, false)
  159. })
  160. })
  161. })
  162. // expectEndpointsAndSlices verifies that Endpoints and EndpointSlices exist for
  163. // a given Service and Namespace with the appropriate attributes set. This is a
  164. // relatively complex function as the order of attributes or resources is not
  165. // necessarily consistent. It is used as a helper function for the tests above
  166. // and takes some shortcuts with the assumption that those test cases will be
  167. // the only caller of this function.
  168. func expectEndpointsAndSlices(cs clientset.Interface, ns string, svc *v1.Service, pods []*v1.Pod, numSubsets, numSlices int, namedPort bool) {
  169. endpointSlices := []discoveryv1beta1.EndpointSlice{}
  170. endpoints := &v1.Endpoints{}
  171. err := wait.Poll(5*time.Second, 1*time.Minute, func() (bool, error) {
  172. endpointSlicesFound, matchingSlices := hasMatchingEndpointSlices(cs, ns, svc.Name, len(pods), numSlices)
  173. if !matchingSlices {
  174. framework.Logf("Matching EndpointSlices not found")
  175. return false, nil
  176. }
  177. endpointsFound, matchingEndpoints := hasMatchingEndpoints(cs, ns, svc.Name, len(pods), numSubsets)
  178. if !matchingEndpoints {
  179. framework.Logf("Matching EndpointSlices not found")
  180. return false, nil
  181. }
  182. endpointSlices = endpointSlicesFound
  183. endpoints = endpointsFound
  184. return true, nil
  185. })
  186. framework.ExpectNoError(err)
  187. podsByIP := map[string]*v1.Pod{}
  188. for _, pod := range pods {
  189. podsByIP[pod.Status.PodIP] = pod
  190. if len(pod.Spec.Containers) != 1 {
  191. framework.Failf("Expected pod to have 1 container, got %d", len(pod.Spec.Containers))
  192. }
  193. }
  194. if endpoints.Name != svc.Name {
  195. framework.Failf("Expected Endpoints name to be %s, got %s", svc.Name, endpoints.Name)
  196. }
  197. totalEndpointAddresses := 0
  198. for _, subset := range endpoints.Subsets {
  199. addresses := append(subset.Addresses, subset.NotReadyAddresses...)
  200. totalEndpointAddresses += len(addresses)
  201. if len(subset.Ports) != len(svc.Spec.Ports) {
  202. framework.Failf("Expected subset to have %d ports, got %d", len(svc.Spec.Ports), len(subset.Ports))
  203. }
  204. // If not a named port, the subset ports should directly correspond with
  205. // the Service ports.
  206. if !namedPort {
  207. for i, subsetPort := range subset.Ports {
  208. svcPort := svc.Spec.Ports[i]
  209. if subsetPort.Name != svcPort.Name {
  210. framework.Failf("Expected port name to be %s, got %s", svcPort.Name, subsetPort.Name)
  211. }
  212. if subsetPort.Protocol != svcPort.Protocol {
  213. framework.Failf("Expected protocol to be %s, got %s", svcPort.Protocol, subsetPort.Protocol)
  214. }
  215. if subsetPort.Port != svcPort.TargetPort.IntVal {
  216. framework.Failf("Expected port to be %d, got %d", svcPort.TargetPort.IntVal, subsetPort.Port)
  217. }
  218. }
  219. }
  220. for _, address := range addresses {
  221. pod, ok := podsByIP[address.IP]
  222. if !ok {
  223. framework.Failf("Unexpected address with IP: %s", address.IP)
  224. }
  225. ensurePodTargetRef(pod, address.TargetRef)
  226. // If a named port, the subset ports should directly correspond with
  227. // each individual pod.
  228. if namedPort {
  229. container := pod.Spec.Containers[0]
  230. for _, port := range container.Ports {
  231. if port.Name == svc.Spec.Ports[0].TargetPort.String() {
  232. subsetPort := subset.Ports[0]
  233. if subsetPort.Port != port.ContainerPort {
  234. framework.Failf("Expected subset port to be %d, got %d", port.ContainerPort, subsetPort.Port)
  235. }
  236. if subsetPort.Name != svc.Spec.Ports[0].Name {
  237. framework.Failf("Expected subset port name to be %s, got %s", svc.Spec.Ports[0].Name, subsetPort.Name)
  238. }
  239. }
  240. }
  241. }
  242. }
  243. }
  244. if len(pods) != totalEndpointAddresses {
  245. framework.Failf("Expected %d addresses, got %d", len(pods), totalEndpointAddresses)
  246. }
  247. if len(pods) == 0 && len(endpointSlices) != 1 {
  248. framework.Failf("Expected 1 EndpointSlice, got %d", len(endpointSlices))
  249. }
  250. totalEndpointSliceAddresses := 0
  251. for _, endpointSlice := range endpointSlices {
  252. totalEndpointSliceAddresses += len(endpointSlice.Endpoints)
  253. if len(pods) == 0 && len(endpointSlice.Ports) != 0 {
  254. framework.Failf("Expected EndpointSlice to have 0 ports, got %d", len(endpointSlice.Ports))
  255. }
  256. if len(pods) > 0 && len(endpointSlice.Ports) != len(svc.Spec.Ports) {
  257. framework.Failf("Expected EndpointSlice to have %d ports, got %d", len(svc.Spec.Ports), len(endpointSlice.Ports))
  258. }
  259. // If not a named port, the EndpointSlice ports should directly
  260. // correspond with the Service ports.
  261. if !namedPort {
  262. for i, esPort := range endpointSlice.Ports {
  263. svcPort := svc.Spec.Ports[i]
  264. if *esPort.Name != svcPort.Name {
  265. framework.Failf("Expected port name to be %s, got %s", svcPort.Name, *esPort.Name)
  266. }
  267. if *esPort.Protocol != svcPort.Protocol {
  268. framework.Failf("Expected protocol to be %s, got %s", svcPort.Protocol, *esPort.Protocol)
  269. }
  270. if *esPort.Port != svcPort.TargetPort.IntVal {
  271. framework.Failf("Expected port to be %d, got %d", svcPort.TargetPort.IntVal, *esPort.Port)
  272. }
  273. }
  274. }
  275. for _, endpoint := range endpointSlice.Endpoints {
  276. if len(endpoint.Addresses) == 0 {
  277. framework.Failf("Expected EndpointSlice endpoint to have at least 1 address")
  278. }
  279. pod, ok := podsByIP[endpoint.Addresses[0]]
  280. if !ok {
  281. framework.Failf("Unexpected address with IP: %s", endpoint.Addresses[0])
  282. }
  283. ensurePodTargetRef(pod, endpoint.TargetRef)
  284. // If a named port, the EndpointSlice ports should directly
  285. // correspond with each individual pod.
  286. if namedPort {
  287. container := pod.Spec.Containers[0]
  288. for _, port := range container.Ports {
  289. if port.Name == svc.Spec.Ports[0].TargetPort.String() {
  290. esPort := endpointSlice.Ports[0]
  291. if *esPort.Port != port.ContainerPort {
  292. framework.Failf("Expected EndpointSlice port to be %d, got %d", port.ContainerPort, *esPort.Port)
  293. }
  294. if *esPort.Name != svc.Spec.Ports[0].Name {
  295. framework.Failf("Expected EndpointSlice port name to be %s, got %s", svc.Spec.Ports[0].Name, *esPort.Name)
  296. }
  297. }
  298. }
  299. }
  300. }
  301. }
  302. if len(pods) != totalEndpointSliceAddresses {
  303. framework.Failf("Expected %d addresses, got %d", len(pods), totalEndpointSliceAddresses)
  304. }
  305. }
  306. // hasMatchingEndpointSlices returns any EndpointSlices that match the
  307. // conditions along with a boolean indicating if all the conditions have been
  308. // met.
  309. func hasMatchingEndpointSlices(cs clientset.Interface, ns, svcName string, numEndpoints, numSlices int) ([]discoveryv1beta1.EndpointSlice, bool) {
  310. listOptions := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1beta1.LabelServiceName, svcName)}
  311. esList, err := cs.DiscoveryV1beta1().EndpointSlices(ns).List(context.TODO(), listOptions)
  312. framework.ExpectNoError(err, "Error fetching EndpointSlice for %s/%s Service", ns, svcName)
  313. if len(esList.Items) == 0 {
  314. framework.Logf("EndpointSlice for %s/%s Service not found", ns, svcName)
  315. return []discoveryv1beta1.EndpointSlice{}, false
  316. }
  317. if len(esList.Items) != numSlices {
  318. framework.Logf("Expected %d EndpointSlices for %s/%s Service, got %d", numSlices, ns, svcName, len(esList.Items))
  319. return []discoveryv1beta1.EndpointSlice{}, false
  320. }
  321. actualNumEndpoints := 0
  322. for _, endpointSlice := range esList.Items {
  323. actualNumEndpoints += len(endpointSlice.Endpoints)
  324. }
  325. if actualNumEndpoints != numEndpoints {
  326. framework.Logf("EndpointSlices for %s/%s Service have %d/%d endpoints", ns, svcName, actualNumEndpoints, numEndpoints)
  327. return []discoveryv1beta1.EndpointSlice{}, false
  328. }
  329. return esList.Items, true
  330. }
  331. // hasMatchingEndpoints returns any Endpoints that match the conditions along
  332. // with a boolean indicating if all the conditions have been met.
  333. func hasMatchingEndpoints(cs clientset.Interface, ns, svcName string, numIPs, numSubsets int) (*v1.Endpoints, bool) {
  334. endpoints, err := cs.CoreV1().Endpoints(ns).Get(context.TODO(), svcName, metav1.GetOptions{})
  335. if err != nil {
  336. if apierrors.IsNotFound(err) {
  337. framework.Logf("Endpoints for %s/%s Service not found", ns, svcName)
  338. return nil, false
  339. }
  340. framework.ExpectNoError(err, "Error fetching Endpoints for %s/%s Service", ns, svcName)
  341. }
  342. if len(endpoints.Subsets) != numSubsets {
  343. framework.Logf("Endpoints for %s/%s Service with %d/%d Subsets", ns, svcName, len(endpoints.Subsets), numSubsets)
  344. return nil, false
  345. }
  346. actualNumIPs := 0
  347. for _, endpointSubset := range endpoints.Subsets {
  348. actualNumIPs += len(endpointSubset.Addresses) + len(endpointSubset.NotReadyAddresses)
  349. }
  350. if actualNumIPs != numIPs {
  351. framework.Logf("Endpoints for %s/%s Service with %d/%d IPs", ns, svcName, actualNumIPs, numIPs)
  352. return nil, false
  353. }
  354. return endpoints, true
  355. }
  356. // ensurePodTargetRef ensures that a Pod matches the provided target reference.
  357. func ensurePodTargetRef(pod *v1.Pod, targetRef *v1.ObjectReference) {
  358. if targetRef == nil {
  359. framework.Failf("Expected TargetRef to not be nil")
  360. }
  361. if targetRef.Kind != "Pod" {
  362. framework.Failf("Expected TargetRef.Kind to be Pod, got %s", targetRef.Kind)
  363. }
  364. if targetRef.Namespace != pod.Namespace {
  365. framework.Failf("Expected TargetRef.Namespace to be %s, got %s", pod.Namespace, targetRef.Namespace)
  366. }
  367. if targetRef.Name != pod.Name {
  368. framework.Failf("Expected TargetRef.Name to be %s, got %s", pod.Name, targetRef.Name)
  369. }
  370. if targetRef.UID != pod.UID {
  371. framework.Failf("Expected TargetRef.UID to be %s, got %s", pod.UID, targetRef.UID)
  372. }
  373. }
  374. // createServiceReportErr creates a Service and reports any associated error.
  375. func createServiceReportErr(cs clientset.Interface, ns string, service *v1.Service) *v1.Service {
  376. svc, err := cs.CoreV1().Services(ns).Create(context.TODO(), service, metav1.CreateOptions{})
  377. framework.ExpectNoError(err)
  378. return svc
  379. }