networking_utils.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package framework
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "io/ioutil"
  18. "net"
  19. "net/http"
  20. "strconv"
  21. "strings"
  22. "time"
  23. "github.com/onsi/ginkgo"
  24. "k8s.io/api/core/v1"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/labels"
  27. "k8s.io/apimachinery/pkg/util/intstr"
  28. utilnet "k8s.io/apimachinery/pkg/util/net"
  29. "k8s.io/apimachinery/pkg/util/rand"
  30. "k8s.io/apimachinery/pkg/util/sets"
  31. "k8s.io/apimachinery/pkg/util/uuid"
  32. "k8s.io/apimachinery/pkg/util/wait"
  33. clientset "k8s.io/client-go/kubernetes"
  34. coreclientset "k8s.io/client-go/kubernetes/typed/core/v1"
  35. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  36. imageutils "k8s.io/kubernetes/test/utils/image"
  37. )
  38. const (
  39. // EndpointHTTPPort is an endpoint HTTP port for testing.
  40. EndpointHTTPPort = 8080
  41. // EndpointUDPPort is an endpoint UDP port for testing.
  42. EndpointUDPPort = 8081
  43. testContainerHTTPPort = 8080
  44. // ClusterHTTPPort is a cluster HTTP port for testing.
  45. ClusterHTTPPort = 80
  46. // ClusterUDPPort is a cluster UDP port for testing.
  47. ClusterUDPPort = 90
  48. testPodName = "test-container-pod"
  49. hostTestPodName = "host-test-container-pod"
  50. nodePortServiceName = "node-port-service"
  51. sessionAffinityServiceName = "session-affinity-service"
  52. // wait time between poll attempts of a Service vip and/or nodePort.
  53. // coupled with testTries to produce a net timeout value.
  54. hitEndpointRetryDelay = 2 * time.Second
  55. // Number of retries to hit a given set of endpoints. Needs to be high
  56. // because we verify iptables statistical rr loadbalancing.
  57. testTries = 30
  58. // Maximum number of pods in a test, to make test work in large clusters.
  59. maxNetProxyPodsCount = 10
  60. // SessionAffinityChecks is the number of checks made against a given set of endpoints when session affinity is enabled.
  61. SessionAffinityChecks = 10
  62. // RegexIPv4 is a regex to match IPv4 addresses
  63. RegexIPv4 = "(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)"
  64. // RegexIPv6 is a regex to match IPv6 addresses
  65. RegexIPv6 = "(?:(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){6})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:::(?:(?:(?:[0-9a-fA-F]{1,4})):){5})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){4})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,1}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){3})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,2}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){2})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,3}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:[0-9a-fA-F]{1,4})):)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,4}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,5}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,6}(?:(?:[0-9a-fA-F]{1,4})))?::))))"
  66. )
  67. var netexecImageName = imageutils.GetE2EImage(imageutils.Netexec)
  68. // NewNetworkingTestConfig creates and sets up a new test config helper.
  69. func NewNetworkingTestConfig(f *Framework) *NetworkingTestConfig {
  70. config := &NetworkingTestConfig{f: f, Namespace: f.Namespace.Name, HostNetwork: true}
  71. ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
  72. config.setup(getServiceSelector())
  73. return config
  74. }
  75. // NewCoreNetworkingTestConfig creates and sets up a new test config helper for Node E2E.
  76. func NewCoreNetworkingTestConfig(f *Framework, hostNetwork bool) *NetworkingTestConfig {
  77. config := &NetworkingTestConfig{f: f, Namespace: f.Namespace.Name, HostNetwork: hostNetwork}
  78. ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
  79. config.setupCore(getServiceSelector())
  80. return config
  81. }
  82. func getServiceSelector() map[string]string {
  83. ginkgo.By("creating a selector")
  84. selectorName := "selector-" + string(uuid.NewUUID())
  85. serviceSelector := map[string]string{
  86. selectorName: "true",
  87. }
  88. return serviceSelector
  89. }
  90. // NetworkingTestConfig is a convenience class around some utility methods
  91. // for testing kubeproxy/networking/services/endpoints.
  92. type NetworkingTestConfig struct {
  93. // TestContaienrPod is a test pod running the netexec image. It is capable
  94. // of executing tcp/udp requests against ip:port.
  95. TestContainerPod *v1.Pod
  96. // HostTestContainerPod is a pod running using the hostexec image.
  97. HostTestContainerPod *v1.Pod
  98. // if the HostTestContainerPod is running with HostNetwork=true.
  99. HostNetwork bool
  100. // EndpointPods are the pods belonging to the Service created by this
  101. // test config. Each invocation of `setup` creates a service with
  102. // 1 pod per node running the netexecImage.
  103. EndpointPods []*v1.Pod
  104. f *Framework
  105. podClient *PodClient
  106. // NodePortService is a Service with Type=NodePort spanning over all
  107. // endpointPods.
  108. NodePortService *v1.Service
  109. // SessionAffinityService is a Service with SessionAffinity=ClientIP
  110. // spanning over all endpointPods.
  111. SessionAffinityService *v1.Service
  112. // ExternalAddrs is a list of external IPs of nodes in the cluster.
  113. ExternalAddrs []string
  114. // Nodes is a list of nodes in the cluster.
  115. Nodes []v1.Node
  116. // MaxTries is the number of retries tolerated for tests run against
  117. // endpoints and services created by this config.
  118. MaxTries int
  119. // The ClusterIP of the Service reated by this test config.
  120. ClusterIP string
  121. // External ip of first node for use in nodePort testing.
  122. NodeIP string
  123. // The http/udp nodePorts of the Service.
  124. NodeHTTPPort int
  125. NodeUDPPort int
  126. // The kubernetes namespace within which all resources for this
  127. // config are created
  128. Namespace string
  129. }
  130. // DialFromEndpointContainer executes a curl via kubectl exec in an endpoint container.
  131. func (config *NetworkingTestConfig) DialFromEndpointContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
  132. config.DialFromContainer(protocol, config.EndpointPods[0].Status.PodIP, targetIP, EndpointHTTPPort, targetPort, maxTries, minTries, expectedEps)
  133. }
  134. // DialFromTestContainer executes a curl via kubectl exec in a test container.
  135. func (config *NetworkingTestConfig) DialFromTestContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
  136. config.DialFromContainer(protocol, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, maxTries, minTries, expectedEps)
  137. }
  138. // diagnoseMissingEndpoints prints debug information about the endpoints that
  139. // are NOT in the given list of foundEndpoints. These are the endpoints we
  140. // expected a response from.
  141. func (config *NetworkingTestConfig) diagnoseMissingEndpoints(foundEndpoints sets.String) {
  142. for _, e := range config.EndpointPods {
  143. if foundEndpoints.Has(e.Name) {
  144. continue
  145. }
  146. e2elog.Logf("\nOutput of kubectl describe pod %v/%v:\n", e.Namespace, e.Name)
  147. desc, _ := RunKubectl(
  148. "describe", "pod", e.Name, fmt.Sprintf("--namespace=%v", e.Namespace))
  149. e2elog.Logf(desc)
  150. }
  151. }
  152. // EndpointHostnames returns a set of hostnames for existing endpoints.
  153. func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
  154. expectedEps := sets.NewString()
  155. for _, p := range config.EndpointPods {
  156. expectedEps.Insert(p.Name)
  157. }
  158. return expectedEps
  159. }
  160. // DialFromContainer executes a curl via kubectl exec in a test container,
  161. // which might then translate to a tcp or udp request based on the protocol
  162. // argument in the url.
  163. // - minTries is the minimum number of curl attempts required before declaring
  164. // success. Set to 0 if you'd like to return as soon as all endpoints respond
  165. // at least once.
  166. // - maxTries is the maximum number of curl attempts. If this many attempts pass
  167. // and we don't see all expected endpoints, the test fails.
  168. // - expectedEps is the set of endpointnames to wait for. Typically this is also
  169. // the hostname reported by each pod in the service through /hostName.
  170. // maxTries == minTries will confirm that we see the expected endpoints and no
  171. // more for maxTries. Use this if you want to eg: fail a readiness check on a
  172. // pod and confirm it doesn't show up as an endpoint.
  173. func (config *NetworkingTestConfig) DialFromContainer(protocol, containerIP, targetIP string, containerHTTPPort, targetPort, maxTries, minTries int, expectedEps sets.String) {
  174. ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
  175. // The current versions of curl included in CentOS and RHEL distros
  176. // misinterpret square brackets around IPv6 as globbing, so use the -g
  177. // argument to disable globbing to handle the IPv6 case.
  178. cmd := fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=hostName&protocol=%s&host=%s&port=%d&tries=1'",
  179. ipPort,
  180. protocol,
  181. targetIP,
  182. targetPort)
  183. eps := sets.NewString()
  184. for i := 0; i < maxTries; i++ {
  185. stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.HostTestContainerPod.Name, cmd)
  186. if err != nil {
  187. // A failure to kubectl exec counts as a try, not a hard fail.
  188. // Also note that we will keep failing for maxTries in tests where
  189. // we confirm unreachability.
  190. e2elog.Logf("Failed to execute %q: %v, stdout: %q, stderr %q", cmd, err, stdout, stderr)
  191. } else {
  192. var output map[string][]string
  193. if err := json.Unmarshal([]byte(stdout), &output); err != nil {
  194. e2elog.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
  195. cmd, config.HostTestContainerPod.Name, stdout, err)
  196. continue
  197. }
  198. for _, hostName := range output["responses"] {
  199. trimmed := strings.TrimSpace(hostName)
  200. if trimmed != "" {
  201. eps.Insert(trimmed)
  202. }
  203. }
  204. }
  205. e2elog.Logf("Waiting for endpoints: %v", expectedEps.Difference(eps))
  206. // Check against i+1 so we exit if minTries == maxTries.
  207. if (eps.Equal(expectedEps) || eps.Len() == 0 && expectedEps.Len() == 0) && i+1 >= minTries {
  208. return
  209. }
  210. // TODO: get rid of this delay #36281
  211. time.Sleep(hitEndpointRetryDelay)
  212. }
  213. config.diagnoseMissingEndpoints(eps)
  214. Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", maxTries, cmd, eps, expectedEps)
  215. }
  216. // GetEndpointsFromTestContainer executes a curl via kubectl exec in a test container.
  217. func (config *NetworkingTestConfig) GetEndpointsFromTestContainer(protocol, targetIP string, targetPort, tries int) (sets.String, error) {
  218. return config.GetEndpointsFromContainer(protocol, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, tries)
  219. }
  220. // GetEndpointsFromContainer executes a curl via kubectl exec in a test container,
  221. // which might then translate to a tcp or udp request based on the protocol argument
  222. // in the url. It returns all different endpoints from multiple retries.
  223. // - tries is the number of curl attempts. If this many attempts pass and
  224. // we don't see any endpoints, the test fails.
  225. func (config *NetworkingTestConfig) GetEndpointsFromContainer(protocol, containerIP, targetIP string, containerHTTPPort, targetPort, tries int) (sets.String, error) {
  226. ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
  227. // The current versions of curl included in CentOS and RHEL distros
  228. // misinterpret square brackets around IPv6 as globbing, so use the -g
  229. // argument to disable globbing to handle the IPv6 case.
  230. cmd := fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=hostName&protocol=%s&host=%s&port=%d&tries=1'",
  231. ipPort,
  232. protocol,
  233. targetIP,
  234. targetPort)
  235. eps := sets.NewString()
  236. for i := 0; i < tries; i++ {
  237. stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.HostTestContainerPod.Name, cmd)
  238. if err != nil {
  239. // A failure to kubectl exec counts as a try, not a hard fail.
  240. // Also note that we will keep failing for maxTries in tests where
  241. // we confirm unreachability.
  242. e2elog.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
  243. } else {
  244. e2elog.Logf("Tries: %d, in try: %d, stdout: %v, stderr: %v, command run in: %#v", tries, i, stdout, stderr, config.HostTestContainerPod)
  245. var output map[string][]string
  246. if err := json.Unmarshal([]byte(stdout), &output); err != nil {
  247. e2elog.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
  248. cmd, config.HostTestContainerPod.Name, stdout, err)
  249. continue
  250. }
  251. for _, hostName := range output["responses"] {
  252. trimmed := strings.TrimSpace(hostName)
  253. if trimmed != "" {
  254. eps.Insert(trimmed)
  255. }
  256. }
  257. // TODO: get rid of this delay #36281
  258. time.Sleep(hitEndpointRetryDelay)
  259. }
  260. }
  261. return eps, nil
  262. }
  263. // DialFromNode executes a tcp or udp request based on protocol via kubectl exec
  264. // in a test container running with host networking.
  265. // - minTries is the minimum number of curl attempts required before declaring
  266. // success. Set to 0 if you'd like to return as soon as all endpoints respond
  267. // at least once.
  268. // - maxTries is the maximum number of curl attempts. If this many attempts pass
  269. // and we don't see all expected endpoints, the test fails.
  270. // maxTries == minTries will confirm that we see the expected endpoints and no
  271. // more for maxTries. Use this if you want to eg: fail a readiness check on a
  272. // pod and confirm it doesn't show up as an endpoint.
  273. func (config *NetworkingTestConfig) DialFromNode(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
  274. var cmd string
  275. if protocol == "udp" {
  276. // TODO: It would be enough to pass 1s+epsilon to timeout, but unfortunately
  277. // busybox timeout doesn't support non-integer values.
  278. cmd = fmt.Sprintf("echo hostName | nc -w 1 -u %s %d", targetIP, targetPort)
  279. } else {
  280. ipPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
  281. // The current versions of curl included in CentOS and RHEL distros
  282. // misinterpret square brackets around IPv6 as globbing, so use the -g
  283. // argument to disable globbing to handle the IPv6 case.
  284. cmd = fmt.Sprintf("curl -g -q -s --max-time 15 --connect-timeout 1 http://%s/hostName", ipPort)
  285. }
  286. // TODO: This simply tells us that we can reach the endpoints. Check that
  287. // the probability of hitting a specific endpoint is roughly the same as
  288. // hitting any other.
  289. eps := sets.NewString()
  290. filterCmd := fmt.Sprintf("%s | grep -v '^\\s*$'", cmd)
  291. for i := 0; i < maxTries; i++ {
  292. stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.HostTestContainerPod.Name, filterCmd)
  293. if err != nil || len(stderr) > 0 {
  294. // A failure to exec command counts as a try, not a hard fail.
  295. // Also note that we will keep failing for maxTries in tests where
  296. // we confirm unreachability.
  297. e2elog.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", filterCmd, err, stdout, stderr)
  298. } else {
  299. trimmed := strings.TrimSpace(stdout)
  300. if trimmed != "" {
  301. eps.Insert(trimmed)
  302. }
  303. }
  304. // Check against i+1 so we exit if minTries == maxTries.
  305. if eps.Equal(expectedEps) && i+1 >= minTries {
  306. e2elog.Logf("Found all expected endpoints: %+v", eps.List())
  307. return
  308. }
  309. e2elog.Logf("Waiting for %+v endpoints (expected=%+v, actual=%+v)", expectedEps.Difference(eps).List(), expectedEps.List(), eps.List())
  310. // TODO: get rid of this delay #36281
  311. time.Sleep(hitEndpointRetryDelay)
  312. }
  313. config.diagnoseMissingEndpoints(eps)
  314. Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", maxTries, cmd, eps, expectedEps)
  315. }
  316. // GetSelfURL executes a curl against the given path via kubectl exec into a
  317. // test container running with host networking, and fails if the output
  318. // doesn't match the expected string.
  319. func (config *NetworkingTestConfig) GetSelfURL(port int32, path string, expected string) {
  320. cmd := fmt.Sprintf("curl -i -q -s --connect-timeout 1 http://localhost:%d%s", port, path)
  321. ginkgo.By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
  322. config.executeCurlCmd(cmd, expected)
  323. }
  324. // GetSelfURLStatusCode executes a curl against the given path via kubectl exec into a
  325. // test container running with host networking, and fails if the returned status
  326. // code doesn't match the expected string.
  327. func (config *NetworkingTestConfig) GetSelfURLStatusCode(port int32, path string, expected string) {
  328. // check status code
  329. cmd := fmt.Sprintf("curl -o /dev/null -i -q -s -w %%{http_code} --connect-timeout 1 http://localhost:%d%s", port, path)
  330. ginkgo.By(fmt.Sprintf("Checking status code against http://localhost:%d%s", port, path))
  331. config.executeCurlCmd(cmd, expected)
  332. }
  333. func (config *NetworkingTestConfig) executeCurlCmd(cmd string, expected string) {
  334. // These are arbitrary timeouts. The curl command should pass on first try,
  335. // unless remote server is starved/bootstrapping/restarting etc.
  336. const retryInterval = 1 * time.Second
  337. const retryTimeout = 30 * time.Second
  338. podName := config.HostTestContainerPod.Name
  339. var msg string
  340. if pollErr := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
  341. stdout, err := RunHostCmd(config.Namespace, podName, cmd)
  342. if err != nil {
  343. msg = fmt.Sprintf("failed executing cmd %v in %v/%v: %v", cmd, config.Namespace, podName, err)
  344. e2elog.Logf(msg)
  345. return false, nil
  346. }
  347. if !strings.Contains(stdout, expected) {
  348. msg = fmt.Sprintf("successfully executed %v in %v/%v, but output '%v' doesn't contain expected string '%v'", cmd, config.Namespace, podName, stdout, expected)
  349. e2elog.Logf(msg)
  350. return false, nil
  351. }
  352. return true, nil
  353. }); pollErr != nil {
  354. e2elog.Logf("\nOutput of kubectl describe pod %v/%v:\n", config.Namespace, podName)
  355. desc, _ := RunKubectl(
  356. "describe", "pod", podName, fmt.Sprintf("--namespace=%v", config.Namespace))
  357. e2elog.Logf("%s", desc)
  358. Failf("Timed out in %v: %v", retryTimeout, msg)
  359. }
  360. }
  361. func (config *NetworkingTestConfig) createNetShellPodSpec(podName, hostname string) *v1.Pod {
  362. probe := &v1.Probe{
  363. InitialDelaySeconds: 10,
  364. TimeoutSeconds: 30,
  365. PeriodSeconds: 10,
  366. SuccessThreshold: 1,
  367. FailureThreshold: 3,
  368. Handler: v1.Handler{
  369. HTTPGet: &v1.HTTPGetAction{
  370. Path: "/healthz",
  371. Port: intstr.IntOrString{IntVal: EndpointHTTPPort},
  372. },
  373. },
  374. }
  375. pod := &v1.Pod{
  376. TypeMeta: metav1.TypeMeta{
  377. Kind: "Pod",
  378. APIVersion: "v1",
  379. },
  380. ObjectMeta: metav1.ObjectMeta{
  381. Name: podName,
  382. Namespace: config.Namespace,
  383. },
  384. Spec: v1.PodSpec{
  385. Containers: []v1.Container{
  386. {
  387. Name: "webserver",
  388. Image: netexecImageName,
  389. ImagePullPolicy: v1.PullIfNotPresent,
  390. Command: []string{
  391. "/netexec",
  392. fmt.Sprintf("--http-port=%d", EndpointHTTPPort),
  393. fmt.Sprintf("--udp-port=%d", EndpointUDPPort),
  394. },
  395. Ports: []v1.ContainerPort{
  396. {
  397. Name: "http",
  398. ContainerPort: EndpointHTTPPort,
  399. },
  400. {
  401. Name: "udp",
  402. ContainerPort: EndpointUDPPort,
  403. Protocol: v1.ProtocolUDP,
  404. },
  405. },
  406. LivenessProbe: probe,
  407. ReadinessProbe: probe,
  408. },
  409. },
  410. NodeSelector: map[string]string{
  411. "kubernetes.io/hostname": hostname,
  412. },
  413. },
  414. }
  415. return pod
  416. }
  417. func (config *NetworkingTestConfig) createTestPodSpec() *v1.Pod {
  418. pod := &v1.Pod{
  419. TypeMeta: metav1.TypeMeta{
  420. Kind: "Pod",
  421. APIVersion: "v1",
  422. },
  423. ObjectMeta: metav1.ObjectMeta{
  424. Name: testPodName,
  425. Namespace: config.Namespace,
  426. },
  427. Spec: v1.PodSpec{
  428. Containers: []v1.Container{
  429. {
  430. Name: "webserver",
  431. Image: netexecImageName,
  432. ImagePullPolicy: v1.PullIfNotPresent,
  433. Command: []string{
  434. "/netexec",
  435. fmt.Sprintf("--http-port=%d", EndpointHTTPPort),
  436. fmt.Sprintf("--udp-port=%d", EndpointUDPPort),
  437. },
  438. Ports: []v1.ContainerPort{
  439. {
  440. Name: "http",
  441. ContainerPort: testContainerHTTPPort,
  442. },
  443. },
  444. },
  445. },
  446. },
  447. }
  448. return pod
  449. }
  450. func (config *NetworkingTestConfig) createNodePortServiceSpec(svcName string, selector map[string]string, enableSessionAffinity bool) *v1.Service {
  451. sessionAffinity := v1.ServiceAffinityNone
  452. if enableSessionAffinity {
  453. sessionAffinity = v1.ServiceAffinityClientIP
  454. }
  455. return &v1.Service{
  456. ObjectMeta: metav1.ObjectMeta{
  457. Name: svcName,
  458. },
  459. Spec: v1.ServiceSpec{
  460. Type: v1.ServiceTypeNodePort,
  461. Ports: []v1.ServicePort{
  462. {Port: ClusterHTTPPort, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(EndpointHTTPPort)},
  463. {Port: ClusterUDPPort, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(EndpointUDPPort)},
  464. },
  465. Selector: selector,
  466. SessionAffinity: sessionAffinity,
  467. },
  468. }
  469. }
  470. func (config *NetworkingTestConfig) createNodePortService(selector map[string]string) {
  471. config.NodePortService = config.createService(config.createNodePortServiceSpec(nodePortServiceName, selector, false))
  472. }
  473. func (config *NetworkingTestConfig) createSessionAffinityService(selector map[string]string) {
  474. config.SessionAffinityService = config.createService(config.createNodePortServiceSpec(sessionAffinityServiceName, selector, true))
  475. }
  476. // DeleteNodePortService deletes NodePort service.
  477. func (config *NetworkingTestConfig) DeleteNodePortService() {
  478. err := config.getServiceClient().Delete(config.NodePortService.Name, nil)
  479. ExpectNoError(err, "error while deleting NodePortService. err:%v)", err)
  480. time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted.
  481. }
  482. func (config *NetworkingTestConfig) createTestPods() {
  483. testContainerPod := config.createTestPodSpec()
  484. hostTestContainerPod := NewExecPodSpec(config.Namespace, hostTestPodName, config.HostNetwork)
  485. config.createPod(testContainerPod)
  486. config.createPod(hostTestContainerPod)
  487. ExpectNoError(config.f.WaitForPodRunning(testContainerPod.Name))
  488. ExpectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name))
  489. var err error
  490. config.TestContainerPod, err = config.getPodClient().Get(testContainerPod.Name, metav1.GetOptions{})
  491. if err != nil {
  492. Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
  493. }
  494. config.HostTestContainerPod, err = config.getPodClient().Get(hostTestContainerPod.Name, metav1.GetOptions{})
  495. if err != nil {
  496. Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
  497. }
  498. }
  499. func (config *NetworkingTestConfig) createService(serviceSpec *v1.Service) *v1.Service {
  500. _, err := config.getServiceClient().Create(serviceSpec)
  501. ExpectNoError(err, fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
  502. err = WaitForService(config.f.ClientSet, config.Namespace, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
  503. ExpectNoError(err, fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err))
  504. createdService, err := config.getServiceClient().Get(serviceSpec.Name, metav1.GetOptions{})
  505. ExpectNoError(err, fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
  506. return createdService
  507. }
  508. // setupCore sets up the pods and core test config
  509. // mainly for simplified node e2e setup
  510. func (config *NetworkingTestConfig) setupCore(selector map[string]string) {
  511. ginkgo.By("Creating the service pods in kubernetes")
  512. podName := "netserver"
  513. config.EndpointPods = config.createNetProxyPods(podName, selector)
  514. ginkgo.By("Creating test pods")
  515. config.createTestPods()
  516. epCount := len(config.EndpointPods)
  517. config.MaxTries = epCount*epCount + testTries
  518. }
  519. // setup includes setupCore and also sets up services
  520. func (config *NetworkingTestConfig) setup(selector map[string]string) {
  521. config.setupCore(selector)
  522. ginkgo.By("Getting node addresses")
  523. ExpectNoError(WaitForAllNodesSchedulable(config.f.ClientSet, 10*time.Minute))
  524. nodeList := GetReadySchedulableNodesOrDie(config.f.ClientSet)
  525. config.ExternalAddrs = NodeAddresses(nodeList, v1.NodeExternalIP)
  526. SkipUnlessNodeCountIsAtLeast(2)
  527. config.Nodes = nodeList.Items
  528. ginkgo.By("Creating the service on top of the pods in kubernetes")
  529. config.createNodePortService(selector)
  530. config.createSessionAffinityService(selector)
  531. for _, p := range config.NodePortService.Spec.Ports {
  532. switch p.Protocol {
  533. case v1.ProtocolUDP:
  534. config.NodeUDPPort = int(p.NodePort)
  535. case v1.ProtocolTCP:
  536. config.NodeHTTPPort = int(p.NodePort)
  537. default:
  538. continue
  539. }
  540. }
  541. config.ClusterIP = config.NodePortService.Spec.ClusterIP
  542. if len(config.ExternalAddrs) != 0 {
  543. config.NodeIP = config.ExternalAddrs[0]
  544. } else {
  545. internalAddrs := NodeAddresses(nodeList, v1.NodeInternalIP)
  546. config.NodeIP = internalAddrs[0]
  547. }
  548. }
  549. func (config *NetworkingTestConfig) cleanup() {
  550. nsClient := config.getNamespacesClient()
  551. nsList, err := nsClient.List(metav1.ListOptions{})
  552. if err == nil {
  553. for _, ns := range nsList.Items {
  554. if strings.Contains(ns.Name, config.f.BaseName) && ns.Name != config.Namespace {
  555. nsClient.Delete(ns.Name, nil)
  556. }
  557. }
  558. }
  559. }
  560. // shuffleNodes copies nodes from the specified slice into a copy in random
  561. // order. It returns a new slice.
  562. func shuffleNodes(nodes []v1.Node) []v1.Node {
  563. shuffled := make([]v1.Node, len(nodes))
  564. perm := rand.Perm(len(nodes))
  565. for i, j := range perm {
  566. shuffled[j] = nodes[i]
  567. }
  568. return shuffled
  569. }
  570. func (config *NetworkingTestConfig) createNetProxyPods(podName string, selector map[string]string) []*v1.Pod {
  571. ExpectNoError(WaitForAllNodesSchedulable(config.f.ClientSet, 10*time.Minute))
  572. nodeList := GetReadySchedulableNodesOrDie(config.f.ClientSet)
  573. // To make this test work reasonably fast in large clusters,
  574. // we limit the number of NetProxyPods to no more than
  575. // maxNetProxyPodsCount on random nodes.
  576. nodes := shuffleNodes(nodeList.Items)
  577. if len(nodes) > maxNetProxyPodsCount {
  578. nodes = nodes[:maxNetProxyPodsCount]
  579. }
  580. // create pods, one for each node
  581. createdPods := make([]*v1.Pod, 0, len(nodes))
  582. for i, n := range nodes {
  583. podName := fmt.Sprintf("%s-%d", podName, i)
  584. hostname, _ := n.Labels["kubernetes.io/hostname"]
  585. pod := config.createNetShellPodSpec(podName, hostname)
  586. pod.ObjectMeta.Labels = selector
  587. createdPod := config.createPod(pod)
  588. createdPods = append(createdPods, createdPod)
  589. }
  590. // wait that all of them are up
  591. runningPods := make([]*v1.Pod, 0, len(nodes))
  592. for _, p := range createdPods {
  593. ExpectNoError(config.f.WaitForPodReady(p.Name))
  594. rp, err := config.getPodClient().Get(p.Name, metav1.GetOptions{})
  595. ExpectNoError(err)
  596. runningPods = append(runningPods, rp)
  597. }
  598. return runningPods
  599. }
  600. // DeleteNetProxyPod deletes the first endpoint pod and waits for it being removed.
  601. func (config *NetworkingTestConfig) DeleteNetProxyPod() {
  602. pod := config.EndpointPods[0]
  603. config.getPodClient().Delete(pod.Name, metav1.NewDeleteOptions(0))
  604. config.EndpointPods = config.EndpointPods[1:]
  605. // wait for pod being deleted.
  606. err := WaitForPodToDisappear(config.f.ClientSet, config.Namespace, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout)
  607. if err != nil {
  608. Failf("Failed to delete %s pod: %v", pod.Name, err)
  609. }
  610. // wait for endpoint being removed.
  611. err = WaitForServiceEndpointsNum(config.f.ClientSet, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
  612. if err != nil {
  613. Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
  614. }
  615. // wait for kube-proxy to catch up with the pod being deleted.
  616. time.Sleep(5 * time.Second)
  617. }
  618. func (config *NetworkingTestConfig) createPod(pod *v1.Pod) *v1.Pod {
  619. return config.getPodClient().Create(pod)
  620. }
  621. func (config *NetworkingTestConfig) getPodClient() *PodClient {
  622. if config.podClient == nil {
  623. config.podClient = config.f.PodClient()
  624. }
  625. return config.podClient
  626. }
  627. func (config *NetworkingTestConfig) getServiceClient() coreclientset.ServiceInterface {
  628. return config.f.ClientSet.CoreV1().Services(config.Namespace)
  629. }
  630. func (config *NetworkingTestConfig) getNamespacesClient() coreclientset.NamespaceInterface {
  631. return config.f.ClientSet.CoreV1().Namespaces()
  632. }
  633. // CheckReachabilityFromPod checks reachability from the specified pod.
  634. func CheckReachabilityFromPod(expectToBeReachable bool, timeout time.Duration, namespace, pod, target string) {
  635. cmd := fmt.Sprintf("wget -T 5 -qO- %q", target)
  636. err := wait.PollImmediate(Poll, timeout, func() (bool, error) {
  637. _, err := RunHostCmd(namespace, pod, cmd)
  638. if expectToBeReachable && err != nil {
  639. e2elog.Logf("Expect target to be reachable. But got err: %v. Retry until timeout", err)
  640. return false, nil
  641. }
  642. if !expectToBeReachable && err == nil {
  643. e2elog.Logf("Expect target NOT to be reachable. But it is reachable. Retry until timeout")
  644. return false, nil
  645. }
  646. return true, nil
  647. })
  648. ExpectNoError(err)
  649. }
// HTTPPokeParams is a struct for HTTP poke parameters. It carries the optional
// settings of PokeHTTP.
type HTTPPokeParams struct {
	// Timeout is used as the timeout of the HTTP client that performs the GET.
	Timeout time.Duration
	// ExpectCode is the status code that counts as success. PokeHTTP treats 0
	// as http.StatusOK (200).
	ExpectCode int
	// BodyContains, when non-empty, is a substring the response body must
	// contain for the poke to succeed.
	BodyContains string
	// RetriableCodes lists status codes that are reported as HTTPRetryCode
	// instead of HTTPWrongCode when they differ from ExpectCode.
	RetriableCodes []int
}
// HTTPPokeResult is a struct for HTTP poke result, as returned by PokeHTTP.
type HTTPPokeResult struct {
	// Status classifies the outcome of the poke.
	Status HTTPPokeStatus
	// Code is the HTTP status code: 0 if the connection was not made.
	Code int
	// Error is set if there was any error, i.e. for every status other than
	// HTTPSuccess.
	Error error
	// Body is the response body. It is set once the body has been read
	// successfully, even when the poke is then reported as a failure.
	Body []byte
}
// HTTPPokeStatus is the string type representing the outcome category of an
// HTTP poke; see the HTTP* constants for its values.
type HTTPPokeStatus string
// Status values reported by PokeHTTP.
const (
	// HTTPSuccess means the response had the expected status code and, if
	// requested, contained the expected body substring.
	HTTPSuccess HTTPPokeStatus = "Success"
	// HTTPError means the request failed with an error that is neither a
	// timeout nor a refused connection, or the response body could not be read.
	HTTPError HTTPPokeStatus = "UnknownError"
	// HTTPTimeout means the request failed with a network error that reports
	// itself as a timeout.
	HTTPTimeout HTTPPokeStatus = "TimedOut"
	// HTTPRefused means the request failed with an error whose text contains
	// "connection refused".
	HTTPRefused HTTPPokeStatus = "ConnectionRefused"
	// HTTPRetryCode means the status code was not the expected one but is
	// listed in HTTPPokeParams.RetriableCodes.
	HTTPRetryCode HTTPPokeStatus = "RetryCode"
	// HTTPWrongCode means the status code was neither the expected one nor a
	// retriable one.
	HTTPWrongCode HTTPPokeStatus = "WrongCode"
	// HTTPBadResponse means the status code matched but the body did not
	// contain HTTPPokeParams.BodyContains.
	HTTPBadResponse HTTPPokeStatus = "BadResponse"
	// Any time we add new errors, we should audit all callers of this.
)
  683. // PokeHTTP tries to connect to a host on a port for a given URL path. Callers
  684. // can specify additional success parameters, if desired.
  685. //
  686. // The result status will be characterized as precisely as possible, given the
  687. // known users of this.
  688. //
  689. // The result code will be zero in case of any failure to connect, or non-zero
  690. // if the HTTP transaction completed (even if the other test params make this a
  691. // failure).
  692. //
  693. // The result error will be populated for any status other than Success.
  694. //
  695. // The result body will be populated if the HTTP transaction was completed, even
  696. // if the other test params make this a failure).
  697. func PokeHTTP(host string, port int, path string, params *HTTPPokeParams) HTTPPokeResult {
  698. hostPort := net.JoinHostPort(host, strconv.Itoa(port))
  699. url := fmt.Sprintf("http://%s%s", hostPort, path)
  700. ret := HTTPPokeResult{}
  701. // Sanity check inputs, because it has happened. These are the only things
  702. // that should hard fail the test - they are basically ASSERT()s.
  703. if host == "" {
  704. Failf("Got empty host for HTTP poke (%s)", url)
  705. return ret
  706. }
  707. if port == 0 {
  708. Failf("Got port==0 for HTTP poke (%s)", url)
  709. return ret
  710. }
  711. // Set default params.
  712. if params == nil {
  713. params = &HTTPPokeParams{}
  714. }
  715. if params.ExpectCode == 0 {
  716. params.ExpectCode = http.StatusOK
  717. }
  718. e2elog.Logf("Poking %q", url)
  719. resp, err := httpGetNoConnectionPoolTimeout(url, params.Timeout)
  720. if err != nil {
  721. ret.Error = err
  722. neterr, ok := err.(net.Error)
  723. if ok && neterr.Timeout() {
  724. ret.Status = HTTPTimeout
  725. } else if strings.Contains(err.Error(), "connection refused") {
  726. ret.Status = HTTPRefused
  727. } else {
  728. ret.Status = HTTPError
  729. }
  730. e2elog.Logf("Poke(%q): %v", url, err)
  731. return ret
  732. }
  733. ret.Code = resp.StatusCode
  734. defer resp.Body.Close()
  735. body, err := ioutil.ReadAll(resp.Body)
  736. if err != nil {
  737. ret.Status = HTTPError
  738. ret.Error = fmt.Errorf("error reading HTTP body: %v", err)
  739. e2elog.Logf("Poke(%q): %v", url, ret.Error)
  740. return ret
  741. }
  742. ret.Body = make([]byte, len(body))
  743. copy(ret.Body, body)
  744. if resp.StatusCode != params.ExpectCode {
  745. for _, code := range params.RetriableCodes {
  746. if resp.StatusCode == code {
  747. ret.Error = fmt.Errorf("retriable status code: %d", resp.StatusCode)
  748. ret.Status = HTTPRetryCode
  749. e2elog.Logf("Poke(%q): %v", url, ret.Error)
  750. return ret
  751. }
  752. }
  753. ret.Status = HTTPWrongCode
  754. ret.Error = fmt.Errorf("bad status code: %d", resp.StatusCode)
  755. e2elog.Logf("Poke(%q): %v", url, ret.Error)
  756. return ret
  757. }
  758. if params.BodyContains != "" && !strings.Contains(string(body), params.BodyContains) {
  759. ret.Status = HTTPBadResponse
  760. ret.Error = fmt.Errorf("response does not contain expected substring: %q", string(body))
  761. e2elog.Logf("Poke(%q): %v", url, ret.Error)
  762. return ret
  763. }
  764. ret.Status = HTTPSuccess
  765. e2elog.Logf("Poke(%q): success", url)
  766. return ret
  767. }
  768. // Does an HTTP GET, but does not reuse TCP connections
  769. // This masks problems where the iptables rule has changed, but we don't see it
  770. func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
  771. tr := utilnet.SetTransportDefaults(&http.Transport{
  772. DisableKeepAlives: true,
  773. })
  774. client := &http.Client{
  775. Transport: tr,
  776. Timeout: timeout,
  777. }
  778. return client.Get(url)
  779. }
// UDPPokeParams is a struct for UDP poke parameters. It carries the optional
// settings of PokeUDP.
type UDPPokeParams struct {
	// Timeout, when non-zero, is applied by PokeUDP as a deadline on the
	// connection after the request has been written.
	Timeout time.Duration
	// Response, when non-empty, is the exact reply PokeUDP expects.
	Response string
}
// UDPPokeResult is a struct for UDP poke result, as returned by PokeUDP.
type UDPPokeResult struct {
	// Status classifies the outcome of the poke.
	Status UDPPokeStatus
	// Error is set if there was any error, i.e. for every status other than
	// UDPSuccess.
	Error error
	// Response holds the bytes read from the connection. It is set once a
	// read succeeded, even when the reply is then reported as a bad response.
	Response []byte
}
// UDPPokeStatus is the string type representing the outcome category of a UDP
// poke; see the UDP* constants for its values.
type UDPPokeStatus string
// Status values reported by PokeUDP.
const (
	// UDPSuccess means a reply was read and, if an expected response was
	// given, matched it.
	UDPSuccess UDPPokeStatus = "Success"
	// UDPError means dialing or setting the deadline failed, or a write/read
	// failed with an error that is neither a timeout nor a refused connection.
	UDPError UDPPokeStatus = "UnknownError"
	// UDPTimeout means a write or read failed with a network error that
	// reports itself as a timeout.
	UDPTimeout UDPPokeStatus = "TimedOut"
	// UDPRefused means a write or read failed with an error whose text
	// contains "connection refused".
	UDPRefused UDPPokeStatus = "ConnectionRefused"
	// UDPBadResponse means a reply was read but differs from
	// UDPPokeParams.Response.
	UDPBadResponse UDPPokeStatus = "BadResponse"
	// Any time we add new errors, we should audit all callers of this.
)
  806. // PokeUDP tries to connect to a host on a port and send the given request. Callers
  807. // can specify additional success parameters, if desired.
  808. //
  809. // The result status will be characterized as precisely as possible, given the
  810. // known users of this.
  811. //
  812. // The result error will be populated for any status other than Success.
  813. //
  814. // The result response will be populated if the UDP transaction was completed, even
  815. // if the other test params make this a failure).
  816. func PokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult {
  817. hostPort := net.JoinHostPort(host, strconv.Itoa(port))
  818. url := fmt.Sprintf("udp://%s", hostPort)
  819. ret := UDPPokeResult{}
  820. // Sanity check inputs, because it has happened. These are the only things
  821. // that should hard fail the test - they are basically ASSERT()s.
  822. if host == "" {
  823. Failf("Got empty host for UDP poke (%s)", url)
  824. return ret
  825. }
  826. if port == 0 {
  827. Failf("Got port==0 for UDP poke (%s)", url)
  828. return ret
  829. }
  830. // Set default params.
  831. if params == nil {
  832. params = &UDPPokeParams{}
  833. }
  834. e2elog.Logf("Poking %v", url)
  835. con, err := net.Dial("udp", hostPort)
  836. if err != nil {
  837. ret.Status = UDPError
  838. ret.Error = err
  839. e2elog.Logf("Poke(%q): %v", url, err)
  840. return ret
  841. }
  842. _, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
  843. if err != nil {
  844. ret.Error = err
  845. neterr, ok := err.(net.Error)
  846. if ok && neterr.Timeout() {
  847. ret.Status = UDPTimeout
  848. } else if strings.Contains(err.Error(), "connection refused") {
  849. ret.Status = UDPRefused
  850. } else {
  851. ret.Status = UDPError
  852. }
  853. e2elog.Logf("Poke(%q): %v", url, err)
  854. return ret
  855. }
  856. if params.Timeout != 0 {
  857. err = con.SetDeadline(time.Now().Add(params.Timeout))
  858. if err != nil {
  859. ret.Status = UDPError
  860. ret.Error = err
  861. e2elog.Logf("Poke(%q): %v", url, err)
  862. return ret
  863. }
  864. }
  865. bufsize := len(params.Response) + 1
  866. if bufsize == 0 {
  867. bufsize = 4096
  868. }
  869. var buf = make([]byte, bufsize)
  870. n, err := con.Read(buf)
  871. if err != nil {
  872. ret.Error = err
  873. neterr, ok := err.(net.Error)
  874. if ok && neterr.Timeout() {
  875. ret.Status = UDPTimeout
  876. } else if strings.Contains(err.Error(), "connection refused") {
  877. ret.Status = UDPRefused
  878. } else {
  879. ret.Status = UDPError
  880. }
  881. e2elog.Logf("Poke(%q): %v", url, err)
  882. return ret
  883. }
  884. ret.Response = buf[0:n]
  885. if params.Response != "" && string(ret.Response) != params.Response {
  886. ret.Status = UDPBadResponse
  887. ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
  888. e2elog.Logf("Poke(%q): %v", url, ret.Error)
  889. return ret
  890. }
  891. ret.Status = UDPSuccess
  892. e2elog.Logf("Poke(%q): success", url)
  893. return ret
  894. }
  895. // TestHitNodesFromOutside checkes HTTP connectivity from outside.
  896. func TestHitNodesFromOutside(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String) error {
  897. return TestHitNodesFromOutsideWithCount(externalIP, httpPort, timeout, expectedHosts, 1)
  898. }
  899. // TestHitNodesFromOutsideWithCount checkes HTTP connectivity from outside with count.
  900. func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String,
  901. countToSucceed int) error {
  902. e2elog.Logf("Waiting up to %v for satisfying expectedHosts for %v times", timeout, countToSucceed)
  903. hittedHosts := sets.NewString()
  904. count := 0
  905. condition := func() (bool, error) {
  906. result := PokeHTTP(externalIP, int(httpPort), "/hostname", &HTTPPokeParams{Timeout: 1 * time.Second})
  907. if result.Status != HTTPSuccess {
  908. return false, nil
  909. }
  910. hittedHost := strings.TrimSpace(string(result.Body))
  911. if !expectedHosts.Has(hittedHost) {
  912. e2elog.Logf("Error hitting unexpected host: %v, reset counter: %v", hittedHost, count)
  913. count = 0
  914. return false, nil
  915. }
  916. if !hittedHosts.Has(hittedHost) {
  917. hittedHosts.Insert(hittedHost)
  918. e2elog.Logf("Missing %+v, got %+v", expectedHosts.Difference(hittedHosts), hittedHosts)
  919. }
  920. if hittedHosts.Equal(expectedHosts) {
  921. count++
  922. if count >= countToSucceed {
  923. return true, nil
  924. }
  925. }
  926. return false, nil
  927. }
  928. if err := wait.Poll(time.Second, timeout, condition); err != nil {
  929. return fmt.Errorf("error waiting for expectedHosts: %v, hittedHosts: %v, count: %v, expected count: %v",
  930. expectedHosts, hittedHosts, count, countToSucceed)
  931. }
  932. return nil
  933. }
  934. // TestUnderTemporaryNetworkFailure blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status.
  935. // At the end (even in case of errors), the network traffic is brought back to normal.
  936. // This function executes commands on a node so it will work only for some
  937. // environments.
  938. func TestUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) {
  939. host, err := GetNodeExternalIP(node)
  940. if err != nil {
  941. Failf("Error getting node external ip : %v", err)
  942. }
  943. masterAddresses := GetAllMasterAddresses(c)
  944. ginkgo.By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
  945. defer func() {
  946. // This code will execute even if setting the iptables rule failed.
  947. // It is on purpose because we may have an error even if the new rule
  948. // had been inserted. (yes, we could look at the error code and ssh error
  949. // separately, but I prefer to stay on the safe side).
  950. ginkgo.By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
  951. for _, masterAddress := range masterAddresses {
  952. UnblockNetwork(host, masterAddress)
  953. }
  954. }()
  955. e2elog.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
  956. if !WaitForNodeToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
  957. Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
  958. }
  959. for _, masterAddress := range masterAddresses {
  960. BlockNetwork(host, masterAddress)
  961. }
  962. e2elog.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
  963. if !WaitForNodeToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
  964. Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
  965. }
  966. testFunc()
  967. // network traffic is unblocked in a deferred function
  968. }