guestbook.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package guestbook
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "io/ioutil"
  18. "log"
  19. "net"
  20. "net/http"
  21. "net/url"
  22. "strings"
  23. "time"
  24. "github.com/spf13/cobra"
  25. utilnet "k8s.io/apimachinery/pkg/util/net"
  26. )
  27. // CmdGuestbook is used by agnhost Cobra.
  28. var CmdGuestbook = &cobra.Command{
  29. Use: "guestbook",
  30. Short: "Creates a HTTP server with various endpoints representing a guestbook app",
  31. Long: `Starts a HTTP server on the given --http-port (default: 80), serving various endpoints representing a guestbook app. The endpoints and their purpose are:
  32. - /register: A guestbook slave will subscribe to a master, to its given --slaveof endpoint. The master will then push any updates it receives to its registered slaves through the --backend-port.
  33. - /get: Returns '{"data": value}', where the value is the stored value for the given key if non-empty, or the entire store.
  34. - /set: Will set the given key-value pair in its own store and propagate it to its slaves, if any. Will return '{"data": "Updated"}' to the caller on success.
  35. - /guestbook: Will proxy the request to agnhost-master if the given cmd is 'set', or agnhost-slave if the given cmd is 'get'.`,
  36. Args: cobra.MaximumNArgs(0),
  37. Run: main,
  38. }
  39. var (
  40. httpPort string
  41. backendPort string
  42. slaveOf string
  43. slaves []string
  44. store map[string]interface{}
  45. )
  46. const (
  47. timeout = time.Duration(15) * time.Second
  48. sleep = time.Duration(1) * time.Second
  49. )
  50. func init() {
  51. CmdGuestbook.Flags().StringVar(&httpPort, "http-port", "80", "HTTP Listen Port")
  52. CmdGuestbook.Flags().StringVar(&backendPort, "backend-port", "6379", "Backend's HTTP Listen Port")
  53. CmdGuestbook.Flags().StringVar(&slaveOf, "slaveof", "", "The host's name to register to")
  54. store = make(map[string]interface{})
  55. }
  56. func main(cmd *cobra.Command, args []string) {
  57. go registerNode(slaveOf, backendPort)
  58. startHTTPServer(httpPort)
  59. }
  60. func registerNode(registerTo, port string) {
  61. if registerTo == "" {
  62. return
  63. }
  64. hostPort := net.JoinHostPort(registerTo, backendPort)
  65. _, err := net.ResolveTCPAddr("tcp", hostPort)
  66. if err != nil {
  67. log.Fatalf("--slaveof param and/or --backend-port param are invalid. %v", err)
  68. return
  69. }
  70. start := time.Now()
  71. for time.Since(start) < timeout {
  72. response, err := dialHTTP("register", hostPort)
  73. if err != nil {
  74. log.Printf("encountered error while registering to master: %v. Retrying in 1 second.", err)
  75. time.Sleep(sleep)
  76. continue
  77. }
  78. responseJSON := make(map[string]interface{})
  79. err = json.Unmarshal([]byte(response), &responseJSON)
  80. if err != nil {
  81. log.Fatalf("Error while unmarshaling master's response: %v", err)
  82. }
  83. var ok bool
  84. store, ok = responseJSON["data"].(map[string]interface{})
  85. if !ok {
  86. log.Fatalf("Could not cast responseJSON: %s", responseJSON["data"])
  87. }
  88. log.Printf("Registered to node: %s", registerTo)
  89. return
  90. }
  91. log.Fatal("Timed out while registering to master.")
  92. }
  93. func startHTTPServer(port string) {
  94. http.HandleFunc("/register", registerHandler)
  95. http.HandleFunc("/get", getHandler)
  96. http.HandleFunc("/set", setHandler)
  97. http.HandleFunc("/guestbook", guestbookHandler)
  98. log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
  99. }
  100. // registerHandler will register the caller in this server's list of slaves.
  101. // /set requests will be propagated to slaves, if any.
  102. func registerHandler(w http.ResponseWriter, r *http.Request) {
  103. ip, _, err := net.SplitHostPort(r.RemoteAddr)
  104. if err != nil {
  105. fmt.Fprintf(w, "userip: %q is not IP:port", r.RemoteAddr)
  106. return
  107. }
  108. log.Printf("GET /register, IP: %s", ip)
  109. // send all the store to the slave as well.
  110. output := make(map[string]interface{})
  111. output["data"] = store
  112. bytes, err := json.Marshal(output)
  113. if err != nil {
  114. http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
  115. return
  116. }
  117. fmt.Fprint(w, string(bytes))
  118. slaves = append(slaves, ip)
  119. log.Printf("Node '%s' registered.", ip)
  120. }
  121. // getHandler will return '{"data": value}', where value is the stored value for
  122. // the given key if non-empty, or entire store.
  123. func getHandler(w http.ResponseWriter, r *http.Request) {
  124. values, err := url.Parse(r.URL.RequestURI())
  125. if err != nil {
  126. http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
  127. return
  128. }
  129. key := values.Query().Get("key")
  130. log.Printf("GET /get?key=%s", key)
  131. output := make(map[string]interface{})
  132. if key == "" {
  133. output["data"] = store
  134. } else {
  135. value, found := store[key]
  136. if !found {
  137. value = ""
  138. }
  139. output["data"] = value
  140. }
  141. bytes, err := json.Marshal(output)
  142. if err == nil {
  143. fmt.Fprint(w, string(bytes))
  144. } else {
  145. http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
  146. }
  147. }
  148. // setHandler will set the given key-value pair in its own store and propagate
  149. // it to its slaves, if any. Will return '{"message": "Updated"}' to the caller on success.
  150. func setHandler(w http.ResponseWriter, r *http.Request) {
  151. values, err := url.Parse(r.URL.RequestURI())
  152. if err != nil {
  153. http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
  154. return
  155. }
  156. key := values.Query().Get("key")
  157. value := values.Query().Get("value")
  158. log.Printf("GET /set?key=%s&value=%s", key, value)
  159. if key == "" {
  160. http.Error(w, "cannot set with empty key.", http.StatusBadRequest)
  161. return
  162. }
  163. store[key] = value
  164. request := fmt.Sprintf("set?key=%s&value=%s", key, value)
  165. for _, slave := range slaves {
  166. hostPort := net.JoinHostPort(slave, backendPort)
  167. _, err = dialHTTP(request, hostPort)
  168. if err != nil {
  169. http.Error(w, fmt.Sprintf("encountered error while propagating to slave '%s': %v", slave, err), http.StatusExpectationFailed)
  170. return
  171. }
  172. }
  173. output := map[string]string{}
  174. output["message"] = "Updated"
  175. bytes, err := json.Marshal(output)
  176. if err == nil {
  177. fmt.Fprint(w, string(bytes))
  178. } else {
  179. http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
  180. }
  181. }
  182. // guestbookHandler will proxy the request to agnhost-master if the given cmd is
  183. // 'set' or agnhost-slave if the given cmd is 'get'.
  184. func guestbookHandler(w http.ResponseWriter, r *http.Request) {
  185. values, err := url.Parse(r.URL.RequestURI())
  186. if err != nil {
  187. http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
  188. return
  189. }
  190. cmd := strings.ToLower(values.Query().Get("cmd"))
  191. key := values.Query().Get("key")
  192. value := values.Query().Get("value")
  193. log.Printf("GET /guestbook?cmd=%s&key=%s&value=%s", cmd, key, value)
  194. if cmd != "get" && cmd != "set" {
  195. http.Error(w, fmt.Sprintf("unsupported cmd: '%s'", cmd), http.StatusBadRequest)
  196. return
  197. }
  198. if cmd == "set" && key == "" {
  199. http.Error(w, "cannot set with empty key.", http.StatusBadRequest)
  200. return
  201. }
  202. host := "agnhost-master"
  203. if cmd == "get" {
  204. host = "agnhost-slave"
  205. }
  206. hostPort := net.JoinHostPort(host, backendPort)
  207. _, err = net.ResolveTCPAddr("tcp", hostPort)
  208. if err != nil {
  209. http.Error(w, fmt.Sprintf("host and/or port param are invalid. %v", err), http.StatusBadRequest)
  210. return
  211. }
  212. request := fmt.Sprintf("%s?key=%s&value=%s", cmd, key, value)
  213. response, err := dialHTTP(request, hostPort)
  214. if err == nil {
  215. fmt.Fprint(w, response)
  216. } else {
  217. http.Error(w, fmt.Sprintf("encountered error: %v", err), http.StatusExpectationFailed)
  218. }
  219. }
  220. func dialHTTP(request, hostPort string) (string, error) {
  221. transport := utilnet.SetTransportDefaults(&http.Transport{})
  222. httpClient := createHTTPClient(transport)
  223. resp, err := httpClient.Get(fmt.Sprintf("http://%s/%s", hostPort, request))
  224. defer transport.CloseIdleConnections()
  225. if err == nil {
  226. defer resp.Body.Close()
  227. body, err := ioutil.ReadAll(resp.Body)
  228. if err == nil {
  229. return string(body), nil
  230. }
  231. }
  232. return "", err
  233. }
  234. func createHTTPClient(transport *http.Transport) *http.Client {
  235. client := &http.Client{
  236. Transport: transport,
  237. Timeout: 5 * time.Second,
  238. }
  239. return client
  240. }