utils.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package network
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "net"
  20. "net/http"
  21. "strconv"
  22. "strings"
  23. "time"
  24. "github.com/onsi/ginkgo"
  25. v1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/labels"
  28. "k8s.io/apimachinery/pkg/util/intstr"
  29. utilnet "k8s.io/apimachinery/pkg/util/net"
  30. "k8s.io/apimachinery/pkg/util/sets"
  31. "k8s.io/apimachinery/pkg/util/uuid"
  32. "k8s.io/apimachinery/pkg/util/wait"
  33. clientset "k8s.io/client-go/kubernetes"
  34. coreclientset "k8s.io/client-go/kubernetes/typed/core/v1"
  35. "k8s.io/kubernetes/test/e2e/framework"
  36. e2enode "k8s.io/kubernetes/test/e2e/framework/node"
  37. e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
  38. e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
  39. imageutils "k8s.io/kubernetes/test/utils/image"
  40. )
  // Ports used by the endpoint pods, the test container and the Services.
  const (
  	// EndpointHTTPPort is an endpoint HTTP port for testing.
  	EndpointHTTPPort = 8080
  	// EndpointUDPPort is an endpoint UDP port for testing.
  	EndpointUDPPort = 8081
  	// testContainerHTTPPort is the HTTP port the test container pod listens on.
  	testContainerHTTPPort = 8080
  	// ClusterHTTPPort is a cluster HTTP port for testing.
  	ClusterHTTPPort = 80
  	// ClusterUDPPort is a cluster UDP port for testing.
  	ClusterUDPPort = 90
  )

  // Fixed object names created by the test config.
  const (
  	testPodName                = "test-container-pod"
  	hostTestPodName            = "host-test-container-pod"
  	nodePortServiceName        = "node-port-service"
  	sessionAffinityServiceName = "session-affinity-service"
  )

  // Retry and sizing knobs.
  const (
  	// hitEndpointRetryDelay is the wait time between poll attempts of a
  	// Service vip and/or nodePort; together with testTries it yields the
  	// overall timeout.
  	hitEndpointRetryDelay = time.Second * 2
  	// testTries is the number of retries to hit a given set of endpoints.
  	// Needs to be high because we verify iptables statistical rr loadbalancing.
  	testTries = 30
  	// maxNetProxyPodsCount is the maximum number of pods in a test, to make
  	// the test work in large clusters.
  	maxNetProxyPodsCount = 10
  	// SessionAffinityChecks is number of checks to hit a given set of endpoints when enable session affinity.
  	SessionAffinityChecks = 10

  	resizeNodeReadyTimeout    = time.Minute * 2
  	resizeNodeNotReadyTimeout = time.Minute * 2
  )

  // Address-matching patterns.
  const (
  	// RegexIPv4 is a regex to match IPv4 addresses
  	RegexIPv4 = `(?:\d+)\.(?:\d+)\.(?:\d+)\.(?:\d+)`
  	// RegexIPv6 is a regex to match IPv6 addresses
  	RegexIPv6 = "(?:(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){6})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:::(?:(?:(?:[0-9a-fA-F]{1,4})):){5})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){4})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,1}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){3})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,2}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){2})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,3}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:[0-9a-fA-F]{1,4})):)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,4}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,5}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,6}(?:(?:[0-9a-fA-F]{1,4})))?::))))"
  )

  // netexec dial commands.
  const (
  	// echoHostname makes the destination echo its hostname.
  	echoHostname = "hostname"
  )
  75. // NetexecImageName is the image name for agnhost.
  76. var NetexecImageName = imageutils.GetE2EImage(imageutils.Agnhost)
  77. // NewNetworkingTestConfig creates and sets up a new test config helper.
  78. func NewNetworkingTestConfig(f *framework.Framework, hostNetwork bool) *NetworkingTestConfig {
  79. config := &NetworkingTestConfig{f: f, Namespace: f.Namespace.Name, HostNetwork: hostNetwork}
  80. ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
  81. config.setup(getServiceSelector())
  82. return config
  83. }
  84. // NewCoreNetworkingTestConfig creates and sets up a new test config helper for Node E2E.
  85. func NewCoreNetworkingTestConfig(f *framework.Framework, hostNetwork bool) *NetworkingTestConfig {
  86. config := &NetworkingTestConfig{f: f, Namespace: f.Namespace.Name, HostNetwork: hostNetwork}
  87. ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
  88. config.setupCore(getServiceSelector())
  89. return config
  90. }
  91. func getServiceSelector() map[string]string {
  92. ginkgo.By("creating a selector")
  93. selectorName := "selector-" + string(uuid.NewUUID())
  94. serviceSelector := map[string]string{
  95. selectorName: "true",
  96. }
  97. return serviceSelector
  98. }
  // NetworkingTestConfig is a convenience class around some utility methods
  // for testing kubeproxy/networking/services/endpoints.
  type NetworkingTestConfig struct {
  	// TestContainerPod is a test pod running the netexec image. It is capable
  	// of executing tcp/udp requests against ip:port.
  	TestContainerPod *v1.Pod
  	// HostTestContainerPod is the exec pod created by createTestPods. It is
  	// only created and populated when HostNetwork is true; otherwise it stays
  	// nil.
  	HostTestContainerPod *v1.Pod
  	// HostNetwork reports whether the HostTestContainerPod is created and
  	// runs with HostNetwork=true.
  	HostNetwork bool
  	// EndpointPods are the pods belonging to the Service created by this
  	// test config. They are created by createNetProxyPods during setupCore.
  	EndpointPods []*v1.Pod
  	// f is the e2e framework instance used for pod exec and client access.
  	f *framework.Framework
  	// podClient is the pod client used by this config.
  	// NOTE(review): it is not assigned anywhere in the visible code;
  	// presumably populated lazily by getPodClient — confirm.
  	podClient *framework.PodClient
  	// NodePortService is a Service with Type=NodePort spanning over all
  	// endpointPods.
  	NodePortService *v1.Service
  	// SessionAffinityService is a Service with SessionAffinity=ClientIP
  	// spanning over all endpointPods.
  	SessionAffinityService *v1.Service
  	// ExternalAddr is a single address: the result of
  	// e2enode.FirstAddress(nodes, NodeExternalIP) over the ready schedulable
  	// nodes. setup treats an empty value as "no external address available".
  	ExternalAddr string
  	// Nodes is the list of ready schedulable nodes captured during setup.
  	Nodes []v1.Node
  	// MaxTries is the number of retries tolerated for tests run against
  	// endpoints and services created by this config. setupCore sets it to
  	// len(EndpointPods)^2 + testTries.
  	MaxTries int
  	// ClusterIP is the ClusterIP of the NodePortService created by this test
  	// config.
  	ClusterIP string
  	// NodeIP is the node address used for nodePort testing: ExternalAddr when
  	// it is non-empty, otherwise the first internal node address.
  	NodeIP string
  	// NodeHTTPPort and NodeUDPPort are the TCP and UDP nodePorts of the
  	// NodePortService.
  	NodeHTTPPort int
  	NodeUDPPort int
  	// Namespace is the kubernetes namespace within which all resources for
  	// this config are created.
  	Namespace string
  }
  139. // DialFromEndpointContainer executes a curl via kubectl exec in an endpoint container.
  140. func (config *NetworkingTestConfig) DialFromEndpointContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
  141. config.DialFromContainer(protocol, echoHostname, config.EndpointPods[0].Status.PodIP, targetIP, EndpointHTTPPort, targetPort, maxTries, minTries, expectedEps)
  142. }
  143. // DialFromTestContainer executes a curl via kubectl exec in a test container.
  144. func (config *NetworkingTestConfig) DialFromTestContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
  145. config.DialFromContainer(protocol, echoHostname, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, maxTries, minTries, expectedEps)
  146. }
  147. // DialEchoFromTestContainer executes a curl via kubectl exec in a test container. The response is expected to match the echoMessage.
  148. func (config *NetworkingTestConfig) DialEchoFromTestContainer(protocol, targetIP string, targetPort, maxTries, minTries int, echoMessage string) {
  149. expectedResponse := sets.NewString()
  150. expectedResponse.Insert(echoMessage)
  151. var dialCommand string
  152. // NOTE(claudiub): netexec /dialCommand will send a request to the given targetIP and targetPort as follows:
  153. // for HTTP: it will send a request to: http://targetIP:targetPort/dialCommand
  154. // for UDP: it will send targetCommand as a message. The consumer receives the data message and looks for
  155. // a few starting strings, including echo, and treats it accordingly.
  156. if protocol == "http" {
  157. dialCommand = fmt.Sprintf("echo?msg=%s", echoMessage)
  158. } else {
  159. dialCommand = fmt.Sprintf("echo%%20%s", echoMessage)
  160. }
  161. config.DialFromContainer(protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, maxTries, minTries, expectedResponse)
  162. }
  163. // diagnoseMissingEndpoints prints debug information about the endpoints that
  164. // are NOT in the given list of foundEndpoints. These are the endpoints we
  165. // expected a response from.
  166. func (config *NetworkingTestConfig) diagnoseMissingEndpoints(foundEndpoints sets.String) {
  167. for _, e := range config.EndpointPods {
  168. if foundEndpoints.Has(e.Name) {
  169. continue
  170. }
  171. framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", e.Namespace, e.Name)
  172. desc, _ := framework.RunKubectl(
  173. e.Namespace, "describe", "pod", e.Name, fmt.Sprintf("--namespace=%v", e.Namespace))
  174. framework.Logf(desc)
  175. }
  176. }
  177. // EndpointHostnames returns a set of hostnames for existing endpoints.
  178. func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
  179. expectedEps := sets.NewString()
  180. for _, p := range config.EndpointPods {
  181. expectedEps.Insert(p.Name)
  182. }
  183. return expectedEps
  184. }
  185. // DialFromContainer executes a curl via kubectl exec in a test container,
  186. // which might then translate to a tcp or udp request based on the protocol
  187. // argument in the url.
  188. // - minTries is the minimum number of curl attempts required before declaring
  189. // success. Set to 0 if you'd like to return as soon as all endpoints respond
  190. // at least once.
  191. // - maxTries is the maximum number of curl attempts. If this many attempts pass
  192. // and we don't see all expected endpoints, the test fails.
  193. // - targetIP is the source Pod IP that will dial the given dialCommand using the given protocol.
  194. // - dialCommand is the command that the targetIP will send to the targetIP using the given protocol.
  195. // the dialCommand should be formatted properly for the protocol (http: URL path+parameters,
  196. // udp: command%20parameters, where parameters are optional)
  197. // - expectedResponses is the unordered set of responses to wait for. The responses are based on
  198. // the dialCommand; for example, for the dialCommand "hostname", the expectedResponses
  199. // should contain the hostnames reported by each pod in the service through /hostName.
  200. // maxTries == minTries will confirm that we see the expected endpoints and no
  201. // more for maxTries. Use this if you want to eg: fail a readiness check on a
  202. // pod and confirm it doesn't show up as an endpoint.
  203. func (config *NetworkingTestConfig) DialFromContainer(protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort, maxTries, minTries int, expectedResponses sets.String) {
  204. ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
  205. // The current versions of curl included in CentOS and RHEL distros
  206. // misinterpret square brackets around IPv6 as globbing, so use the -g
  207. // argument to disable globbing to handle the IPv6 case.
  208. cmd := fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
  209. ipPort,
  210. dialCommand,
  211. protocol,
  212. targetIP,
  213. targetPort)
  214. responses := sets.NewString()
  215. for i := 0; i < maxTries; i++ {
  216. stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.TestContainerPod.Name, cmd)
  217. if err != nil {
  218. // A failure to kubectl exec counts as a try, not a hard fail.
  219. // Also note that we will keep failing for maxTries in tests where
  220. // we confirm unreachability.
  221. framework.Logf("Failed to execute %q: %v, stdout: %q, stderr %q", cmd, err, stdout, stderr)
  222. } else {
  223. var output map[string][]string
  224. if err := json.Unmarshal([]byte(stdout), &output); err != nil {
  225. framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
  226. cmd, config.HostTestContainerPod.Name, stdout, err)
  227. continue
  228. }
  229. for _, response := range output["responses"] {
  230. trimmed := strings.TrimSpace(response)
  231. if trimmed != "" {
  232. responses.Insert(trimmed)
  233. }
  234. }
  235. }
  236. framework.Logf("Waiting for responses: %v", expectedResponses.Difference(responses))
  237. // Check against i+1 so we exit if minTries == maxTries.
  238. if (responses.Equal(expectedResponses) || responses.Len() == 0 && expectedResponses.Len() == 0) && i+1 >= minTries {
  239. return
  240. }
  241. // TODO: get rid of this delay #36281
  242. time.Sleep(hitEndpointRetryDelay)
  243. }
  244. if dialCommand == echoHostname {
  245. config.diagnoseMissingEndpoints(responses)
  246. }
  247. framework.Failf("Failed to find expected responses:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", maxTries, cmd, responses, expectedResponses)
  248. }
  249. // GetEndpointsFromTestContainer executes a curl via kubectl exec in a test container.
  250. func (config *NetworkingTestConfig) GetEndpointsFromTestContainer(protocol, targetIP string, targetPort, tries int) (sets.String, error) {
  251. return config.GetEndpointsFromContainer(protocol, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, tries)
  252. }
  253. // GetEndpointsFromContainer executes a curl via kubectl exec in a test container,
  254. // which might then translate to a tcp or udp request based on the protocol argument
  255. // in the url. It returns all different endpoints from multiple retries.
  256. // - tries is the number of curl attempts. If this many attempts pass and
  257. // we don't see any endpoints, the test fails.
  258. func (config *NetworkingTestConfig) GetEndpointsFromContainer(protocol, containerIP, targetIP string, containerHTTPPort, targetPort, tries int) (sets.String, error) {
  259. ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
  260. // The current versions of curl included in CentOS and RHEL distros
  261. // misinterpret square brackets around IPv6 as globbing, so use the -g
  262. // argument to disable globbing to handle the IPv6 case.
  263. cmd := fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=hostName&protocol=%s&host=%s&port=%d&tries=1'",
  264. ipPort,
  265. protocol,
  266. targetIP,
  267. targetPort)
  268. eps := sets.NewString()
  269. for i := 0; i < tries; i++ {
  270. stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.TestContainerPod.Name, cmd)
  271. if err != nil {
  272. // A failure to kubectl exec counts as a try, not a hard fail.
  273. // Also note that we will keep failing for maxTries in tests where
  274. // we confirm unreachability.
  275. framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
  276. } else {
  277. framework.Logf("Tries: %d, in try: %d, stdout: %v, stderr: %v, command run in: %#v", tries, i, stdout, stderr, config.HostTestContainerPod)
  278. var output map[string][]string
  279. if err := json.Unmarshal([]byte(stdout), &output); err != nil {
  280. framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
  281. cmd, config.HostTestContainerPod.Name, stdout, err)
  282. continue
  283. }
  284. for _, hostName := range output["responses"] {
  285. trimmed := strings.TrimSpace(hostName)
  286. if trimmed != "" {
  287. eps.Insert(trimmed)
  288. }
  289. }
  290. // TODO: get rid of this delay #36281
  291. time.Sleep(hitEndpointRetryDelay)
  292. }
  293. }
  294. return eps, nil
  295. }
  296. // DialFromNode executes a tcp or udp request based on protocol via kubectl exec
  297. // in a test container running with host networking.
  298. // - minTries is the minimum number of curl attempts required before declaring
  299. // success. Set to 0 if you'd like to return as soon as all endpoints respond
  300. // at least once.
  301. // - maxTries is the maximum number of curl attempts. If this many attempts pass
  302. // and we don't see all expected endpoints, the test fails.
  303. // maxTries == minTries will confirm that we see the expected endpoints and no
  304. // more for maxTries. Use this if you want to eg: fail a readiness check on a
  305. // pod and confirm it doesn't show up as an endpoint.
  306. func (config *NetworkingTestConfig) DialFromNode(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
  307. var cmd string
  308. if protocol == "udp" {
  309. // TODO: It would be enough to pass 1s+epsilon to timeout, but unfortunately
  310. // busybox timeout doesn't support non-integer values.
  311. cmd = fmt.Sprintf("echo hostName | nc -w 1 -u %s %d", targetIP, targetPort)
  312. } else {
  313. ipPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
  314. // The current versions of curl included in CentOS and RHEL distros
  315. // misinterpret square brackets around IPv6 as globbing, so use the -g
  316. // argument to disable globbing to handle the IPv6 case.
  317. cmd = fmt.Sprintf("curl -g -q -s --max-time 15 --connect-timeout 1 http://%s/hostName", ipPort)
  318. }
  319. // TODO: This simply tells us that we can reach the endpoints. Check that
  320. // the probability of hitting a specific endpoint is roughly the same as
  321. // hitting any other.
  322. eps := sets.NewString()
  323. filterCmd := fmt.Sprintf("%s | grep -v '^\\s*$'", cmd)
  324. for i := 0; i < maxTries; i++ {
  325. stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.HostTestContainerPod.Name, filterCmd)
  326. if err != nil || len(stderr) > 0 {
  327. // A failure to exec command counts as a try, not a hard fail.
  328. // Also note that we will keep failing for maxTries in tests where
  329. // we confirm unreachability.
  330. framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", filterCmd, err, stdout, stderr)
  331. } else {
  332. trimmed := strings.TrimSpace(stdout)
  333. if trimmed != "" {
  334. eps.Insert(trimmed)
  335. }
  336. }
  337. // Check against i+1 so we exit if minTries == maxTries.
  338. if eps.Equal(expectedEps) && i+1 >= minTries {
  339. framework.Logf("Found all expected endpoints: %+v", eps.List())
  340. return
  341. }
  342. framework.Logf("Waiting for %+v endpoints (expected=%+v, actual=%+v)", expectedEps.Difference(eps).List(), expectedEps.List(), eps.List())
  343. // TODO: get rid of this delay #36281
  344. time.Sleep(hitEndpointRetryDelay)
  345. }
  346. config.diagnoseMissingEndpoints(eps)
  347. framework.Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", maxTries, cmd, eps, expectedEps)
  348. }
  349. // GetSelfURL executes a curl against the given path via kubectl exec into a
  350. // test container running with host networking, and fails if the output
  351. // doesn't match the expected string.
  352. func (config *NetworkingTestConfig) GetSelfURL(port int32, path string, expected string) {
  353. cmd := fmt.Sprintf("curl -i -q -s --connect-timeout 1 http://localhost:%d%s", port, path)
  354. ginkgo.By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
  355. config.executeCurlCmd(cmd, expected)
  356. }
  357. // GetSelfURLStatusCode executes a curl against the given path via kubectl exec into a
  358. // test container running with host networking, and fails if the returned status
  359. // code doesn't match the expected string.
  360. func (config *NetworkingTestConfig) GetSelfURLStatusCode(port int32, path string, expected string) {
  361. // check status code
  362. cmd := fmt.Sprintf("curl -o /dev/null -i -q -s -w %%{http_code} --connect-timeout 1 http://localhost:%d%s", port, path)
  363. ginkgo.By(fmt.Sprintf("Checking status code against http://localhost:%d%s", port, path))
  364. config.executeCurlCmd(cmd, expected)
  365. }
  366. func (config *NetworkingTestConfig) executeCurlCmd(cmd string, expected string) {
  367. // These are arbitrary timeouts. The curl command should pass on first try,
  368. // unless remote server is starved/bootstrapping/restarting etc.
  369. const retryInterval = 1 * time.Second
  370. const retryTimeout = 30 * time.Second
  371. podName := config.HostTestContainerPod.Name
  372. var msg string
  373. if pollErr := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
  374. stdout, err := framework.RunHostCmd(config.Namespace, podName, cmd)
  375. if err != nil {
  376. msg = fmt.Sprintf("failed executing cmd %v in %v/%v: %v", cmd, config.Namespace, podName, err)
  377. framework.Logf(msg)
  378. return false, nil
  379. }
  380. if !strings.Contains(stdout, expected) {
  381. msg = fmt.Sprintf("successfully executed %v in %v/%v, but output '%v' doesn't contain expected string '%v'", cmd, config.Namespace, podName, stdout, expected)
  382. framework.Logf(msg)
  383. return false, nil
  384. }
  385. return true, nil
  386. }); pollErr != nil {
  387. framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", config.Namespace, podName)
  388. desc, _ := framework.RunKubectl(
  389. config.Namespace, "describe", "pod", podName, fmt.Sprintf("--namespace=%v", config.Namespace))
  390. framework.Logf("%s", desc)
  391. framework.Failf("Timed out in %v: %v", retryTimeout, msg)
  392. }
  393. }
  394. func (config *NetworkingTestConfig) createNetShellPodSpec(podName, hostname string) *v1.Pod {
  395. probe := &v1.Probe{
  396. InitialDelaySeconds: 10,
  397. TimeoutSeconds: 30,
  398. PeriodSeconds: 10,
  399. SuccessThreshold: 1,
  400. FailureThreshold: 3,
  401. Handler: v1.Handler{
  402. HTTPGet: &v1.HTTPGetAction{
  403. Path: "/healthz",
  404. Port: intstr.IntOrString{IntVal: EndpointHTTPPort},
  405. },
  406. },
  407. }
  408. pod := &v1.Pod{
  409. TypeMeta: metav1.TypeMeta{
  410. Kind: "Pod",
  411. APIVersion: "v1",
  412. },
  413. ObjectMeta: metav1.ObjectMeta{
  414. Name: podName,
  415. Namespace: config.Namespace,
  416. },
  417. Spec: v1.PodSpec{
  418. Containers: []v1.Container{
  419. {
  420. Name: "webserver",
  421. Image: NetexecImageName,
  422. ImagePullPolicy: v1.PullIfNotPresent,
  423. Args: []string{
  424. "netexec",
  425. fmt.Sprintf("--http-port=%d", EndpointHTTPPort),
  426. fmt.Sprintf("--udp-port=%d", EndpointUDPPort),
  427. },
  428. Ports: []v1.ContainerPort{
  429. {
  430. Name: "http",
  431. ContainerPort: EndpointHTTPPort,
  432. },
  433. {
  434. Name: "udp",
  435. ContainerPort: EndpointUDPPort,
  436. Protocol: v1.ProtocolUDP,
  437. },
  438. },
  439. LivenessProbe: probe,
  440. ReadinessProbe: probe,
  441. },
  442. },
  443. NodeSelector: map[string]string{
  444. "kubernetes.io/hostname": hostname,
  445. },
  446. },
  447. }
  448. return pod
  449. }
  450. func (config *NetworkingTestConfig) createTestPodSpec() *v1.Pod {
  451. pod := &v1.Pod{
  452. TypeMeta: metav1.TypeMeta{
  453. Kind: "Pod",
  454. APIVersion: "v1",
  455. },
  456. ObjectMeta: metav1.ObjectMeta{
  457. Name: testPodName,
  458. Namespace: config.Namespace,
  459. },
  460. Spec: v1.PodSpec{
  461. Containers: []v1.Container{
  462. {
  463. Name: "webserver",
  464. Image: NetexecImageName,
  465. ImagePullPolicy: v1.PullIfNotPresent,
  466. Args: []string{
  467. "netexec",
  468. fmt.Sprintf("--http-port=%d", EndpointHTTPPort),
  469. fmt.Sprintf("--udp-port=%d", EndpointUDPPort),
  470. },
  471. Ports: []v1.ContainerPort{
  472. {
  473. Name: "http",
  474. ContainerPort: testContainerHTTPPort,
  475. },
  476. },
  477. },
  478. },
  479. },
  480. }
  481. return pod
  482. }
  483. func (config *NetworkingTestConfig) createNodePortServiceSpec(svcName string, selector map[string]string, enableSessionAffinity bool) *v1.Service {
  484. sessionAffinity := v1.ServiceAffinityNone
  485. if enableSessionAffinity {
  486. sessionAffinity = v1.ServiceAffinityClientIP
  487. }
  488. return &v1.Service{
  489. ObjectMeta: metav1.ObjectMeta{
  490. Name: svcName,
  491. },
  492. Spec: v1.ServiceSpec{
  493. Type: v1.ServiceTypeNodePort,
  494. Ports: []v1.ServicePort{
  495. {Port: ClusterHTTPPort, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(EndpointHTTPPort)},
  496. {Port: ClusterUDPPort, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(EndpointUDPPort)},
  497. },
  498. Selector: selector,
  499. SessionAffinity: sessionAffinity,
  500. },
  501. }
  502. }
  503. func (config *NetworkingTestConfig) createNodePortService(selector map[string]string) {
  504. config.NodePortService = config.createService(config.createNodePortServiceSpec(nodePortServiceName, selector, false))
  505. }
  506. func (config *NetworkingTestConfig) createSessionAffinityService(selector map[string]string) {
  507. config.SessionAffinityService = config.createService(config.createNodePortServiceSpec(sessionAffinityServiceName, selector, true))
  508. }
  509. // DeleteNodePortService deletes NodePort service.
  510. func (config *NetworkingTestConfig) DeleteNodePortService() {
  511. err := config.getServiceClient().Delete(context.TODO(), config.NodePortService.Name, nil)
  512. framework.ExpectNoError(err, "error while deleting NodePortService. err:%v)", err)
  513. time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted.
  514. }
  515. func (config *NetworkingTestConfig) createTestPods() {
  516. testContainerPod := config.createTestPodSpec()
  517. hostTestContainerPod := e2epod.NewExecPodSpec(config.Namespace, hostTestPodName, config.HostNetwork)
  518. config.createPod(testContainerPod)
  519. if config.HostNetwork {
  520. config.createPod(hostTestContainerPod)
  521. }
  522. framework.ExpectNoError(config.f.WaitForPodRunning(testContainerPod.Name))
  523. var err error
  524. config.TestContainerPod, err = config.getPodClient().Get(context.TODO(), testContainerPod.Name, metav1.GetOptions{})
  525. if err != nil {
  526. framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
  527. }
  528. if config.HostNetwork {
  529. framework.ExpectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name))
  530. config.HostTestContainerPod, err = config.getPodClient().Get(context.TODO(), hostTestContainerPod.Name, metav1.GetOptions{})
  531. if err != nil {
  532. framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
  533. }
  534. }
  535. }
  536. func (config *NetworkingTestConfig) createService(serviceSpec *v1.Service) *v1.Service {
  537. _, err := config.getServiceClient().Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
  538. framework.ExpectNoError(err, fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
  539. err = framework.WaitForService(config.f.ClientSet, config.Namespace, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
  540. framework.ExpectNoError(err, fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err))
  541. createdService, err := config.getServiceClient().Get(context.TODO(), serviceSpec.Name, metav1.GetOptions{})
  542. framework.ExpectNoError(err, fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
  543. return createdService
  544. }
  545. // setupCore sets up the pods and core test config
  546. // mainly for simplified node e2e setup
  547. func (config *NetworkingTestConfig) setupCore(selector map[string]string) {
  548. ginkgo.By("Creating the service pods in kubernetes")
  549. podName := "netserver"
  550. config.EndpointPods = config.createNetProxyPods(podName, selector)
  551. ginkgo.By("Creating test pods")
  552. config.createTestPods()
  553. epCount := len(config.EndpointPods)
  554. config.MaxTries = epCount*epCount + testTries
  555. }
  556. // setup includes setupCore and also sets up services
  557. func (config *NetworkingTestConfig) setup(selector map[string]string) {
  558. config.setupCore(selector)
  559. ginkgo.By("Getting node addresses")
  560. framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.ClientSet, 10*time.Minute))
  561. nodeList, err := e2enode.GetReadySchedulableNodes(config.f.ClientSet)
  562. framework.ExpectNoError(err)
  563. config.ExternalAddr = e2enode.FirstAddress(nodeList, v1.NodeExternalIP)
  564. e2eskipper.SkipUnlessNodeCountIsAtLeast(2)
  565. config.Nodes = nodeList.Items
  566. ginkgo.By("Creating the service on top of the pods in kubernetes")
  567. config.createNodePortService(selector)
  568. config.createSessionAffinityService(selector)
  569. for _, p := range config.NodePortService.Spec.Ports {
  570. switch p.Protocol {
  571. case v1.ProtocolUDP:
  572. config.NodeUDPPort = int(p.NodePort)
  573. case v1.ProtocolTCP:
  574. config.NodeHTTPPort = int(p.NodePort)
  575. default:
  576. continue
  577. }
  578. }
  579. config.ClusterIP = config.NodePortService.Spec.ClusterIP
  580. if config.ExternalAddr != "" {
  581. config.NodeIP = config.ExternalAddr
  582. } else {
  583. config.NodeIP = e2enode.FirstAddress(nodeList, v1.NodeInternalIP)
  584. }
  585. }
  586. func (config *NetworkingTestConfig) createNetProxyPods(podName string, selector map[string]string) []*v1.Pod {
  587. framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.ClientSet, 10*time.Minute))
  588. nodeList, err := e2enode.GetBoundedReadySchedulableNodes(config.f.ClientSet, maxNetProxyPodsCount)
  589. framework.ExpectNoError(err)
  590. nodes := nodeList.Items
  591. // create pods, one for each node
  592. createdPods := make([]*v1.Pod, 0, len(nodes))
  593. for i, n := range nodes {
  594. podName := fmt.Sprintf("%s-%d", podName, i)
  595. hostname, _ := n.Labels["kubernetes.io/hostname"]
  596. pod := config.createNetShellPodSpec(podName, hostname)
  597. pod.ObjectMeta.Labels = selector
  598. createdPod := config.createPod(pod)
  599. createdPods = append(createdPods, createdPod)
  600. }
  601. // wait that all of them are up
  602. runningPods := make([]*v1.Pod, 0, len(nodes))
  603. for _, p := range createdPods {
  604. framework.ExpectNoError(config.f.WaitForPodReady(p.Name))
  605. rp, err := config.getPodClient().Get(context.TODO(), p.Name, metav1.GetOptions{})
  606. framework.ExpectNoError(err)
  607. runningPods = append(runningPods, rp)
  608. }
  609. return runningPods
  610. }
  611. // DeleteNetProxyPod deletes the first endpoint pod and waits for it being removed.
  612. func (config *NetworkingTestConfig) DeleteNetProxyPod() {
  613. pod := config.EndpointPods[0]
  614. config.getPodClient().Delete(context.TODO(), pod.Name, metav1.NewDeleteOptions(0))
  615. config.EndpointPods = config.EndpointPods[1:]
  616. // wait for pod being deleted.
  617. err := e2epod.WaitForPodToDisappear(config.f.ClientSet, config.Namespace, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout)
  618. if err != nil {
  619. framework.Failf("Failed to delete %s pod: %v", pod.Name, err)
  620. }
  621. // wait for endpoint being removed.
  622. err = framework.WaitForServiceEndpointsNum(config.f.ClientSet, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
  623. if err != nil {
  624. framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
  625. }
  626. // wait for kube-proxy to catch up with the pod being deleted.
  627. time.Sleep(5 * time.Second)
  628. }
  629. func (config *NetworkingTestConfig) createPod(pod *v1.Pod) *v1.Pod {
  630. return config.getPodClient().Create(pod)
  631. }
  632. func (config *NetworkingTestConfig) getPodClient() *framework.PodClient {
  633. if config.podClient == nil {
  634. config.podClient = config.f.PodClient()
  635. }
  636. return config.podClient
  637. }
  638. func (config *NetworkingTestConfig) getServiceClient() coreclientset.ServiceInterface {
  639. return config.f.ClientSet.CoreV1().Services(config.Namespace)
  640. }
  // HTTPPokeParams is a struct for HTTP poke parameters.
  type HTTPPokeParams struct {
  	// Timeout is passed to the HTTP client used for the poke.
  	Timeout time.Duration
  	// ExpectCode is the HTTP status code counted as success; zero is
  	// treated as 200.
  	ExpectCode int
  	// BodyContains, if non-empty, must be a substring of the response body
  	// for the poke to succeed.
  	BodyContains string
  	// RetriableCodes lists unexpected status codes that are reported as
  	// HTTPRetryCode instead of HTTPWrongCode.
  	RetriableCodes []int
  }
  648. // HTTPPokeResult is a struct for HTTP poke result.
  649. type HTTPPokeResult struct {
  650. Status HTTPPokeStatus
  651. Code int // HTTP code: 0 if the connection was not made
  652. Error error // if there was any error
  653. Body []byte // if code != 0
  654. }
  // HTTPPokeStatus is string for representing HTTP poke status.
  type HTTPPokeStatus string

  // Possible outcomes of an HTTP poke.
  // Any time we add new errors, we should audit all callers of this.
  const (
  	// HTTPSuccess means the poke succeeded.
  	HTTPSuccess HTTPPokeStatus = "Success"
  	// HTTPError means the poke failed with an error that has no more
  	// specific status.
  	HTTPError HTTPPokeStatus = "UnknownError"
  	// HTTPTimeout means the poke timed out.
  	HTTPTimeout HTTPPokeStatus = "TimedOut"
  	// HTTPRefused means the connection was refused.
  	HTTPRefused HTTPPokeStatus = "ConnectionRefused"
  	// HTTPRetryCode means the response carried a retriable status code.
  	HTTPRetryCode HTTPPokeStatus = "RetryCode"
  	// HTTPWrongCode means the response carried an unexpected status code.
  	HTTPWrongCode HTTPPokeStatus = "WrongCode"
  	// HTTPBadResponse means the response was not the expected one.
  	HTTPBadResponse HTTPPokeStatus = "BadResponse"
  )
  674. // PokeHTTP tries to connect to a host on a port for a given URL path. Callers
  675. // can specify additional success parameters, if desired.
  676. //
  677. // The result status will be characterized as precisely as possible, given the
  678. // known users of this.
  679. //
  680. // The result code will be zero in case of any failure to connect, or non-zero
  681. // if the HTTP transaction completed (even if the other test params make this a
  682. // failure).
  683. //
  684. // The result error will be populated for any status other than Success.
  685. //
  686. // The result body will be populated if the HTTP transaction was completed, even
  687. // if the other test params make this a failure).
  688. func PokeHTTP(host string, port int, path string, params *HTTPPokeParams) HTTPPokeResult {
  689. hostPort := net.JoinHostPort(host, strconv.Itoa(port))
  690. url := fmt.Sprintf("http://%s%s", hostPort, path)
  691. ret := HTTPPokeResult{}
  692. // Sanity check inputs, because it has happened. These are the only things
  693. // that should hard fail the test - they are basically ASSERT()s.
  694. if host == "" {
  695. framework.Failf("Got empty host for HTTP poke (%s)", url)
  696. return ret
  697. }
  698. if port == 0 {
  699. framework.Failf("Got port==0 for HTTP poke (%s)", url)
  700. return ret
  701. }
  702. // Set default params.
  703. if params == nil {
  704. params = &HTTPPokeParams{}
  705. }
  706. if params.ExpectCode == 0 {
  707. params.ExpectCode = http.StatusOK
  708. }
  709. framework.Logf("Poking %q", url)
  710. resp, err := httpGetNoConnectionPoolTimeout(url, params.Timeout)
  711. if err != nil {
  712. ret.Error = err
  713. neterr, ok := err.(net.Error)
  714. if ok && neterr.Timeout() {
  715. ret.Status = HTTPTimeout
  716. } else if strings.Contains(err.Error(), "connection refused") {
  717. ret.Status = HTTPRefused
  718. } else {
  719. ret.Status = HTTPError
  720. }
  721. framework.Logf("Poke(%q): %v", url, err)
  722. return ret
  723. }
  724. ret.Code = resp.StatusCode
  725. defer resp.Body.Close()
  726. body, err := ioutil.ReadAll(resp.Body)
  727. if err != nil {
  728. ret.Status = HTTPError
  729. ret.Error = fmt.Errorf("error reading HTTP body: %v", err)
  730. framework.Logf("Poke(%q): %v", url, ret.Error)
  731. return ret
  732. }
  733. ret.Body = make([]byte, len(body))
  734. copy(ret.Body, body)
  735. if resp.StatusCode != params.ExpectCode {
  736. for _, code := range params.RetriableCodes {
  737. if resp.StatusCode == code {
  738. ret.Error = fmt.Errorf("retriable status code: %d", resp.StatusCode)
  739. ret.Status = HTTPRetryCode
  740. framework.Logf("Poke(%q): %v", url, ret.Error)
  741. return ret
  742. }
  743. }
  744. ret.Status = HTTPWrongCode
  745. ret.Error = fmt.Errorf("bad status code: %d", resp.StatusCode)
  746. framework.Logf("Poke(%q): %v", url, ret.Error)
  747. return ret
  748. }
  749. if params.BodyContains != "" && !strings.Contains(string(body), params.BodyContains) {
  750. ret.Status = HTTPBadResponse
  751. ret.Error = fmt.Errorf("response does not contain expected substring: %q", string(body))
  752. framework.Logf("Poke(%q): %v", url, ret.Error)
  753. return ret
  754. }
  755. ret.Status = HTTPSuccess
  756. framework.Logf("Poke(%q): success", url)
  757. return ret
  758. }
  759. // Does an HTTP GET, but does not reuse TCP connections
  760. // This masks problems where the iptables rule has changed, but we don't see it
  761. func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
  762. tr := utilnet.SetTransportDefaults(&http.Transport{
  763. DisableKeepAlives: true,
  764. })
  765. client := &http.Client{
  766. Transport: tr,
  767. Timeout: timeout,
  768. }
  769. return client.Get(url)
  770. }
  771. // TestUnderTemporaryNetworkFailure blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status.
  772. // At the end (even in case of errors), the network traffic is brought back to normal.
  773. // This function executes commands on a node so it will work only for some
  774. // environments.
  775. func TestUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) {
  776. host, err := e2enode.GetExternalIP(node)
  777. if err != nil {
  778. framework.Failf("Error getting node external ip : %v", err)
  779. }
  780. masterAddresses := framework.GetAllMasterAddresses(c)
  781. ginkgo.By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
  782. defer func() {
  783. // This code will execute even if setting the iptables rule failed.
  784. // It is on purpose because we may have an error even if the new rule
  785. // had been inserted. (yes, we could look at the error code and ssh error
  786. // separately, but I prefer to stay on the safe side).
  787. ginkgo.By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
  788. for _, masterAddress := range masterAddresses {
  789. framework.UnblockNetwork(host, masterAddress)
  790. }
  791. }()
  792. framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
  793. if !e2enode.WaitConditionToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
  794. framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
  795. }
  796. for _, masterAddress := range masterAddresses {
  797. framework.BlockNetwork(host, masterAddress)
  798. }
  799. framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
  800. if !e2enode.WaitConditionToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
  801. framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
  802. }
  803. testFunc()
  804. // network traffic is unblocked in a deferred function
  805. }