ports.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
  14. This soak tests places a specified number of pods on each node and then
  15. repeatedly sends queries to a service running on these pods via
  16. a serivce
  17. */
  18. package endpoints
  19. import (
  20. "fmt"
  21. "sort"
  22. "time"
  23. "github.com/onsi/ginkgo"
  24. v1 "k8s.io/api/core/v1"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/types"
  27. clientset "k8s.io/client-go/kubernetes"
  28. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  29. )
  30. // ServiceStartTimeout is how long to wait for a service endpoint to be resolvable.
  31. const ServiceStartTimeout = 3 * time.Minute
  32. // PortsByPodName is a map that maps pod name to container ports.
  33. type PortsByPodName map[string][]int
  34. // PortsByPodUID is a map that maps pod UID to container ports.
  35. type PortsByPodUID map[types.UID][]int
  36. // GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints.
  37. func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
  38. m := PortsByPodUID{}
  39. for _, ss := range ep.Subsets {
  40. for _, port := range ss.Ports {
  41. for _, addr := range ss.Addresses {
  42. containerPort := port.Port
  43. if _, ok := m[addr.TargetRef.UID]; !ok {
  44. m[addr.TargetRef.UID] = make([]int, 0)
  45. }
  46. m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], int(containerPort))
  47. }
  48. }
  49. }
  50. return m
  51. }
  52. func translatePodNameToUID(c clientset.Interface, ns string, expectedEndpoints PortsByPodName) (PortsByPodUID, error) {
  53. portsByUID := make(PortsByPodUID)
  54. for name, portList := range expectedEndpoints {
  55. pod, err := c.CoreV1().Pods(ns).Get(name, metav1.GetOptions{})
  56. if err != nil {
  57. return nil, fmt.Errorf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
  58. }
  59. portsByUID[pod.ObjectMeta.UID] = portList
  60. }
  61. return portsByUID, nil
  62. }
  63. func validatePorts(ep PortsByPodUID, expectedEndpoints PortsByPodUID) error {
  64. if len(ep) != len(expectedEndpoints) {
  65. // should not happen because we check this condition before
  66. return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints)
  67. }
  68. for podUID := range expectedEndpoints {
  69. if _, ok := ep[podUID]; !ok {
  70. return fmt.Errorf("endpoint %v not found", podUID)
  71. }
  72. if len(ep[podUID]) != len(expectedEndpoints[podUID]) {
  73. return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
  74. }
  75. sort.Ints(ep[podUID])
  76. sort.Ints(expectedEndpoints[podUID])
  77. for index := range ep[podUID] {
  78. if ep[podUID][index] != expectedEndpoints[podUID][index] {
  79. return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
  80. }
  81. }
  82. }
  83. return nil
  84. }
  85. // ValidateEndpointsPorts validates that the given service exists and is served by the given expectedEndpoints.
  86. func ValidateEndpointsPorts(c clientset.Interface, namespace, serviceName string, expectedEndpoints PortsByPodName) error {
  87. ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
  88. i := 1
  89. for start := time.Now(); time.Since(start) < ServiceStartTimeout; time.Sleep(1 * time.Second) {
  90. ep, err := c.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
  91. if err != nil {
  92. e2elog.Logf("Get endpoints failed (%v elapsed, ignoring for 5s): %v", time.Since(start), err)
  93. continue
  94. }
  95. portsByPodUID := GetContainerPortsByPodUID(ep)
  96. expectedPortsByPodUID, err := translatePodNameToUID(c, namespace, expectedEndpoints)
  97. if err != nil {
  98. return err
  99. }
  100. if len(portsByPodUID) == len(expectedEndpoints) {
  101. err := validatePorts(portsByPodUID, expectedPortsByPodUID)
  102. if err != nil {
  103. return err
  104. }
  105. e2elog.Logf("successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)",
  106. serviceName, namespace, expectedEndpoints, time.Since(start))
  107. return nil
  108. }
  109. if i%5 == 0 {
  110. e2elog.Logf("Unexpected endpoints: found %v, expected %v (%v elapsed, will retry)", portsByPodUID, expectedEndpoints, time.Since(start))
  111. }
  112. i++
  113. }
  114. if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}); err == nil {
  115. for _, pod := range pods.Items {
  116. e2elog.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
  117. }
  118. } else {
  119. e2elog.Logf("Can't list pod debug info: %v", err)
  120. }
  121. return fmt.Errorf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, ServiceStartTimeout)
  122. }