nettest.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // A tiny web server for checking networking connectivity.
  14. //
  15. // Will dial out to, and expect to hear from, every pod that is a member of
  16. // the service passed in the flag -service.
  17. //
  18. // Will serve a webserver on given -port.
  19. //
  20. // Visit /read to see the current state, or /quit to shut down.
  21. //
  22. // Visit /status to see pass/running/fail determination. (literally, it will
  23. // return one of those words.)
  24. //
  25. // /write is used by other network test pods to register connectivity.
  26. package nettest
  27. import (
  28. "bytes"
  29. "context"
  30. "encoding/json"
  31. "fmt"
  32. "io/ioutil"
  33. "log"
  34. "net"
  35. "net/http"
  36. "os"
  37. "os/signal"
  38. "sync"
  39. "syscall"
  40. "time"
  41. "github.com/spf13/cobra"
  42. v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  43. "k8s.io/apimachinery/pkg/util/sets"
  44. "k8s.io/apimachinery/pkg/version"
  45. clientset "k8s.io/client-go/kubernetes"
  46. restclient "k8s.io/client-go/rest"
  47. )
  48. var (
  49. port int
  50. peerCount int
  51. service string
  52. namespace string
  53. delayShutdown int
  54. )
// CmdNettest is the cobra command for the "nettest" subcommand (used by
// agnhost). It accepts no positional arguments and runs main; its flags are
// registered in init.
var CmdNettest = &cobra.Command{
	Use:   "nettest",
	Short: "Starts a tiny web server for checking networking connectivity",
	Long: `Starts a web server for checking networking connectivity on the given "--port".
Will dial out to, and expect to hear from, every pod that is a member of the service
passed in the flag "--service".
The web server will have the following endpoints:
- "/read": to see the current state, or "/quit" to shut down.
- "/status": to see "pass/running/fail" determination. (literally, it will return
one of those words.)
- "/write": is used by other network test pods to register connectivity.`,
	// Reject any positional arguments; everything is configured via flags.
	Args: cobra.MaximumNArgs(0),
	Run:  main,
}
  70. func init() {
  71. CmdNettest.Flags().IntVar(&port, "port", 8080, "Port number to serve at.")
  72. CmdNettest.Flags().IntVar(&peerCount, "peers", 8, "Must find at least this many peers for the test to pass.")
  73. CmdNettest.Flags().StringVar(&service, "service", "nettest", "Service to find other network test pods in.")
  74. CmdNettest.Flags().StringVar(&namespace, "namespace", "default", "Namespace of this pod. TODO: kubernetes should make this discoverable.")
  75. CmdNettest.Flags().IntVar(&delayShutdown, "delay-shutdown", 0, "Number of seconds to delay shutdown when receiving SIGTERM.")
  76. }
  77. // State tracks the internal state of our little http server.
  78. // It's returned verbatim over the /read endpoint.
  79. type State struct {
  80. // Hostname is set once and never changed-- it's always safe to read.
  81. Hostname string
  82. // The below fields require that lock is held before reading or writing.
  83. Sent map[string]int
  84. Received map[string]int
  85. Errors []string
  86. Log []string
  87. StillContactingPeers bool
  88. lock sync.Mutex
  89. }
  90. func (s *State) doneContactingPeers() {
  91. s.lock.Lock()
  92. defer s.lock.Unlock()
  93. s.StillContactingPeers = false
  94. }
  95. // serveStatus returns "pass", "running", or "fail".
  96. func (s *State) serveStatus(w http.ResponseWriter, r *http.Request) {
  97. s.lock.Lock()
  98. defer s.lock.Unlock()
  99. if len(s.Sent) >= peerCount && len(s.Received) >= peerCount {
  100. fmt.Fprintf(w, "pass")
  101. return
  102. }
  103. if s.StillContactingPeers {
  104. fmt.Fprintf(w, "running")
  105. return
  106. }
  107. // Logf can't be called while holding the lock, so defer using a goroutine
  108. go s.Logf("Declaring failure for %s/%s with %d sent and %d received and %d peers", namespace, service, len(s.Sent), len(s.Received), peerCount)
  109. fmt.Fprintf(w, "fail")
  110. }
  111. // serveRead writes our json encoded state
  112. func (s *State) serveRead(w http.ResponseWriter, r *http.Request) {
  113. s.lock.Lock()
  114. defer s.lock.Unlock()
  115. w.WriteHeader(http.StatusOK)
  116. b, err := json.MarshalIndent(s, "", "\t")
  117. s.appendErr(err)
  118. _, err = w.Write(b)
  119. s.appendErr(err)
  120. }
  121. // WritePost is the format that (json encoded) requests to the /write handler should take.
  122. type WritePost struct {
  123. Source string
  124. Dest string
  125. }
  126. // WriteResp is returned by /write
  127. type WriteResp struct {
  128. Hostname string
  129. }
  130. // serveWrite records the contact in our state.
  131. func (s *State) serveWrite(w http.ResponseWriter, r *http.Request) {
  132. defer r.Body.Close()
  133. s.lock.Lock()
  134. defer s.lock.Unlock()
  135. w.WriteHeader(http.StatusOK)
  136. var wp WritePost
  137. s.appendErr(json.NewDecoder(r.Body).Decode(&wp))
  138. if wp.Source == "" {
  139. s.appendErr(fmt.Errorf("%v: Got request with no source", s.Hostname))
  140. } else {
  141. if s.Received == nil {
  142. s.Received = map[string]int{}
  143. }
  144. s.Received[wp.Source]++
  145. }
  146. s.appendErr(json.NewEncoder(w).Encode(&WriteResp{Hostname: s.Hostname}))
  147. }
  148. // appendErr adds err to the list, if err is not nil. s must be locked.
  149. func (s *State) appendErr(err error) {
  150. if err != nil {
  151. s.Errors = append(s.Errors, err.Error())
  152. }
  153. }
  154. // Logf writes to the log message list. s must not be locked.
  155. // s's Log member will drop an old message if it would otherwise
  156. // become longer than 500 messages.
  157. func (s *State) Logf(format string, args ...interface{}) {
  158. s.lock.Lock()
  159. defer s.lock.Unlock()
  160. s.Log = append(s.Log, fmt.Sprintf(format, args...))
  161. if len(s.Log) > 500 {
  162. s.Log = s.Log[1:]
  163. }
  164. }
  165. // s must not be locked
  166. func (s *State) appendSuccessfulSend(toHostname string) {
  167. s.lock.Lock()
  168. defer s.lock.Unlock()
  169. if s.Sent == nil {
  170. s.Sent = map[string]int{}
  171. }
  172. s.Sent[toHostname]++
  173. }
  174. var (
  175. // Our one and only state object
  176. state State
  177. )
  178. func main(cmd *cobra.Command, args []string) {
  179. if service == "" {
  180. log.Fatal("Must provide -service flag.")
  181. }
  182. hostname, err := os.Hostname()
  183. if err != nil {
  184. log.Fatalf("Error getting hostname: %v", err)
  185. }
  186. if delayShutdown > 0 {
  187. termCh := make(chan os.Signal)
  188. signal.Notify(termCh, syscall.SIGTERM)
  189. go func() {
  190. <-termCh
  191. log.Printf("Sleeping %d seconds before exit ...", delayShutdown)
  192. time.Sleep(time.Duration(delayShutdown) * time.Second)
  193. os.Exit(0)
  194. }()
  195. }
  196. state := State{
  197. Hostname: hostname,
  198. StillContactingPeers: true,
  199. }
  200. go contactOthers(&state)
  201. http.HandleFunc("/quit", func(w http.ResponseWriter, r *http.Request) {
  202. os.Exit(0)
  203. })
  204. http.HandleFunc("/read", state.serveRead)
  205. http.HandleFunc("/write", state.serveWrite)
  206. http.HandleFunc("/status", state.serveStatus)
  207. go log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
  208. select {}
  209. }
  210. // Find all sibling pods in the service and post to their /write handler.
  211. func contactOthers(state *State) {
  212. var (
  213. versionInfo *version.Info
  214. err error
  215. )
  216. sleepTime := 5 * time.Second
  217. // In large cluster getting all endpoints is pretty expensive.
  218. // Thus, we will limit ourselves to send on average at most 10 such
  219. // requests per second
  220. if sleepTime < time.Duration(peerCount/10)*time.Second {
  221. sleepTime = time.Duration(peerCount/10) * time.Second
  222. }
  223. timeout := 5 * time.Minute
  224. // Similarly we need to bump timeout so that it is reasonable in large
  225. // clusters.
  226. if timeout < time.Duration(peerCount)*time.Second {
  227. timeout = time.Duration(peerCount) * time.Second
  228. }
  229. defer state.doneContactingPeers()
  230. config, err := restclient.InClusterConfig()
  231. if err != nil {
  232. log.Fatalf("Unable to create config; error: %v\n", err)
  233. }
  234. config.ContentType = "application/vnd.kubernetes.protobuf"
  235. client, err := clientset.NewForConfig(config)
  236. if err != nil {
  237. log.Fatalf("Unable to create client; error: %v\n", err)
  238. }
  239. // Try to get the server version until <timeout>; we use a timeout because
  240. // the pod might not have immediate network connectivity.
  241. for start := time.Now(); time.Since(start) < timeout; time.Sleep(sleepTime) {
  242. // Double check that worked by getting the server version.
  243. if versionInfo, err = client.Discovery().ServerVersion(); err != nil {
  244. log.Printf("Unable to get server version: %v; retrying.\n", err)
  245. } else {
  246. log.Printf("Server version: %#v\n", versionInfo)
  247. break
  248. }
  249. time.Sleep(1 * time.Second)
  250. }
  251. if err != nil {
  252. log.Fatalf("Unable to contact Kubernetes: %v\n", err)
  253. }
  254. for start := time.Now(); time.Since(start) < timeout; time.Sleep(sleepTime) {
  255. eps := getWebserverEndpoints(client)
  256. if eps.Len() >= peerCount {
  257. break
  258. }
  259. state.Logf("%v/%v has %v endpoints (%v), which is less than %v as expected. Waiting for all endpoints to come up.", namespace, service, len(eps), eps.List(), peerCount)
  260. }
  261. // Do this repeatedly, in case there's some propagation delay with getting
  262. // newly started pods into the endpoints list.
  263. for i := 0; i < 15; i++ {
  264. eps := getWebserverEndpoints(client)
  265. for ep := range eps {
  266. state.Logf("Attempting to contact %s", ep)
  267. contactSingle(ep, state)
  268. }
  269. time.Sleep(sleepTime)
  270. }
  271. }
  272. //getWebserverEndpoints returns the webserver endpoints as a set of String, each in the format like "http://{ip}:{port}"
  273. func getWebserverEndpoints(client clientset.Interface) sets.String {
  274. endpoints, err := client.CoreV1().Endpoints(namespace).Get(context.TODO(), service, v1.GetOptions{})
  275. eps := sets.String{}
  276. if err != nil {
  277. state.Logf("Unable to read the endpoints for %v/%v: %v.", namespace, service, err)
  278. return eps
  279. }
  280. for _, ss := range endpoints.Subsets {
  281. for _, a := range ss.Addresses {
  282. for _, p := range ss.Ports {
  283. ipPort := net.JoinHostPort(a.IP, fmt.Sprint(p.Port))
  284. eps.Insert(fmt.Sprintf("http://%s", ipPort))
  285. }
  286. }
  287. }
  288. return eps
  289. }
  290. // contactSingle dials the address 'e' and tries to POST to its /write address.
  291. func contactSingle(e string, state *State) {
  292. body, err := json.Marshal(&WritePost{
  293. Dest: e,
  294. Source: state.Hostname,
  295. })
  296. if err != nil {
  297. log.Fatalf("json marshal error: %v", err)
  298. }
  299. resp, err := http.Post(e+"/write", "application/json", bytes.NewReader(body))
  300. if err != nil {
  301. state.Logf("Warning: unable to contact the endpoint %q: %v", e, err)
  302. return
  303. }
  304. defer resp.Body.Close()
  305. body, err = ioutil.ReadAll(resp.Body)
  306. if err != nil {
  307. state.Logf("Warning: unable to read response from '%v': '%v'", e, err)
  308. return
  309. }
  310. var wr WriteResp
  311. err = json.Unmarshal(body, &wr)
  312. if err != nil {
  313. state.Logf("Warning: unable to unmarshal response (%v) from '%v': '%v'", string(body), e, err)
  314. return
  315. }
  316. state.appendSuccessfulSend(wr.Hostname)
  317. }