controller.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "flag"
  16. "fmt"
  17. "log"
  18. "net/http"
  19. "net/url"
  20. "strconv"
  21. "sync"
  22. "k8s.io/kubernetes/test/images/resource-consumer/common"
  23. )
  // Command-line flags: the port this controller listens on, and the
  // coordinates (service name, namespace, port) that createConsumerURL uses to
  // address the consumer service.
  var (
  	port                     = flag.Int("port", 8080, "Port number.")
  	consumerPort             = flag.Int("consumer-port", 8080, "Port number of consumers.")
  	consumerServiceName      = flag.String("consumer-service-name", "resource-consumer", "Name of service containing resource consumers.")
  	consumerServiceNamespace = flag.String("consumer-service-namespace", "default", "Namespace of service containing resource consumers.")
  )
  28. func main() {
  29. flag.Parse()
  30. mgr := newController()
  31. log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), mgr))
  32. }
  // controller is the http.Handler that fans incoming consume/bump requests
  // out to the consumer service. Both fields live on the controller itself, so
  // they are shared by every request the same controller value serves.
  type controller struct {
  	// waitGroup counts the consumer requests that are still in flight; the
  	// handle* methods Add to it and block on Wait, the sendOne* methods call
  	// Done.
  	waitGroup sync.WaitGroup
  	// responseWriterLock is held by the sendOne* methods while they write
  	// their result line to the http.ResponseWriter.
  	responseWriterLock sync.Mutex
  }
  37. func newController() *controller {
  38. c := &controller{}
  39. return c
  40. }
  41. func (c *controller) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  42. if req.Method != "POST" {
  43. http.Error(w, common.BadRequest, http.StatusBadRequest)
  44. return
  45. }
  46. // parsing POST request data and URL data
  47. if err := req.ParseForm(); err != nil {
  48. http.Error(w, err.Error(), http.StatusBadRequest)
  49. return
  50. }
  51. // handle consumeCPU
  52. if req.URL.Path == common.ConsumeCPUAddress {
  53. c.handleConsumeCPU(w, req.Form)
  54. return
  55. }
  56. // handle consumeMem
  57. if req.URL.Path == common.ConsumeMemAddress {
  58. c.handleConsumeMem(w, req.Form)
  59. return
  60. }
  61. // handle bumpMetric
  62. if req.URL.Path == common.BumpMetricAddress {
  63. c.handleBumpMetric(w, req.Form)
  64. return
  65. }
  66. http.Error(w, common.UnknownFunction, http.StatusNotFound)
  67. }
  68. func (c *controller) handleConsumeCPU(w http.ResponseWriter, query url.Values) {
  69. // getting string data for consumeCPU
  70. durationSecString := query.Get(common.DurationSecQuery)
  71. millicoresString := query.Get(common.MillicoresQuery)
  72. requestSizeInMillicoresString := query.Get(common.RequestSizeInMillicoresQuery)
  73. if durationSecString == "" || millicoresString == "" || requestSizeInMillicoresString == "" {
  74. http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
  75. return
  76. }
  77. // convert data (strings to ints) for consumeCPU
  78. durationSec, durationSecError := strconv.Atoi(durationSecString)
  79. millicores, millicoresError := strconv.Atoi(millicoresString)
  80. requestSizeInMillicores, requestSizeInMillicoresError := strconv.Atoi(requestSizeInMillicoresString)
  81. if durationSecError != nil || millicoresError != nil || requestSizeInMillicoresError != nil || requestSizeInMillicores <= 0 {
  82. http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
  83. return
  84. }
  85. count := millicores / requestSizeInMillicores
  86. rest := millicores - count*requestSizeInMillicores
  87. fmt.Fprintf(w, "RC manager: sending %v requests to consume %v millicores each and 1 request to consume %v millicores\n", count, requestSizeInMillicores, rest)
  88. if count > 0 {
  89. c.waitGroup.Add(count)
  90. c.sendConsumeCPURequests(w, count, requestSizeInMillicores, durationSec)
  91. }
  92. if rest > 0 {
  93. c.waitGroup.Add(1)
  94. go c.sendOneConsumeCPURequest(w, rest, durationSec)
  95. }
  96. c.waitGroup.Wait()
  97. }
  98. func (c *controller) handleConsumeMem(w http.ResponseWriter, query url.Values) {
  99. // getting string data for consumeMem
  100. durationSecString := query.Get(common.DurationSecQuery)
  101. megabytesString := query.Get(common.MegabytesQuery)
  102. requestSizeInMegabytesString := query.Get(common.RequestSizeInMegabytesQuery)
  103. if durationSecString == "" || megabytesString == "" || requestSizeInMegabytesString == "" {
  104. http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
  105. return
  106. }
  107. // convert data (strings to ints) for consumeMem
  108. durationSec, durationSecError := strconv.Atoi(durationSecString)
  109. megabytes, megabytesError := strconv.Atoi(megabytesString)
  110. requestSizeInMegabytes, requestSizeInMegabytesError := strconv.Atoi(requestSizeInMegabytesString)
  111. if durationSecError != nil || megabytesError != nil || requestSizeInMegabytesError != nil || requestSizeInMegabytes <= 0 {
  112. http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
  113. return
  114. }
  115. count := megabytes / requestSizeInMegabytes
  116. rest := megabytes - count*requestSizeInMegabytes
  117. fmt.Fprintf(w, "RC manager: sending %v requests to consume %v MB each and 1 request to consume %v MB\n", count, requestSizeInMegabytes, rest)
  118. if count > 0 {
  119. c.waitGroup.Add(count)
  120. c.sendConsumeMemRequests(w, count, requestSizeInMegabytes, durationSec)
  121. }
  122. if rest > 0 {
  123. c.waitGroup.Add(1)
  124. go c.sendOneConsumeMemRequest(w, rest, durationSec)
  125. }
  126. c.waitGroup.Wait()
  127. }
  128. func (c *controller) handleBumpMetric(w http.ResponseWriter, query url.Values) {
  129. // getting string data for handleBumpMetric
  130. metric := query.Get(common.MetricNameQuery)
  131. deltaString := query.Get(common.DeltaQuery)
  132. durationSecString := query.Get(common.DurationSecQuery)
  133. requestSizeCustomMetricString := query.Get(common.RequestSizeCustomMetricQuery)
  134. if durationSecString == "" || metric == "" || deltaString == "" || requestSizeCustomMetricString == "" {
  135. http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
  136. return
  137. }
  138. // convert data (strings to ints/floats) for handleBumpMetric
  139. durationSec, durationSecError := strconv.Atoi(durationSecString)
  140. delta, deltaError := strconv.Atoi(deltaString)
  141. requestSizeCustomMetric, requestSizeCustomMetricError := strconv.Atoi(requestSizeCustomMetricString)
  142. if durationSecError != nil || deltaError != nil || requestSizeCustomMetricError != nil || requestSizeCustomMetric <= 0 {
  143. http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
  144. return
  145. }
  146. count := delta / requestSizeCustomMetric
  147. rest := delta - count*requestSizeCustomMetric
  148. fmt.Fprintf(w, "RC manager: sending %v requests to bump custom metric by %v each and 1 request to bump by %v\n", count, requestSizeCustomMetric, rest)
  149. if count > 0 {
  150. c.waitGroup.Add(count)
  151. c.sendConsumeCustomMetric(w, metric, count, requestSizeCustomMetric, durationSec)
  152. }
  153. if rest > 0 {
  154. c.waitGroup.Add(1)
  155. go c.sendOneConsumeCustomMetric(w, metric, rest, durationSec)
  156. }
  157. c.waitGroup.Wait()
  158. }
  159. func (c *controller) sendConsumeCPURequests(w http.ResponseWriter, requests, millicores, durationSec int) {
  160. for i := 0; i < requests; i++ {
  161. go c.sendOneConsumeCPURequest(w, millicores, durationSec)
  162. }
  163. }
  164. func (c *controller) sendConsumeMemRequests(w http.ResponseWriter, requests, megabytes, durationSec int) {
  165. for i := 0; i < requests; i++ {
  166. go c.sendOneConsumeMemRequest(w, megabytes, durationSec)
  167. }
  168. }
  169. func (c *controller) sendConsumeCustomMetric(w http.ResponseWriter, metric string, requests, delta, durationSec int) {
  170. for i := 0; i < requests; i++ {
  171. go c.sendOneConsumeCustomMetric(w, metric, delta, durationSec)
  172. }
  173. }
  174. func createConsumerURL(suffix string) string {
  175. return fmt.Sprintf("http://%s.%s.svc.cluster.local:%d%s", *consumerServiceName, *consumerServiceNamespace, *consumerPort, suffix)
  176. }
  177. // sendOneConsumeCPURequest sends POST request for cpu consumption
  178. func (c *controller) sendOneConsumeCPURequest(w http.ResponseWriter, millicores int, durationSec int) {
  179. defer c.waitGroup.Done()
  180. query := createConsumerURL(common.ConsumeCPUAddress)
  181. _, err := http.PostForm(query, url.Values{common.MillicoresQuery: {strconv.Itoa(millicores)}, common.DurationSecQuery: {strconv.Itoa(durationSec)}})
  182. c.responseWriterLock.Lock()
  183. defer c.responseWriterLock.Unlock()
  184. if err != nil {
  185. fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err)
  186. return
  187. }
  188. fmt.Fprintf(w, "Consumed %d millicores\n", millicores)
  189. }
  190. // sendOneConsumeMemRequest sends POST request for memory consumption
  191. func (c *controller) sendOneConsumeMemRequest(w http.ResponseWriter, megabytes int, durationSec int) {
  192. defer c.waitGroup.Done()
  193. query := createConsumerURL(common.ConsumeMemAddress)
  194. _, err := http.PostForm(query, url.Values{common.MegabytesQuery: {strconv.Itoa(megabytes)}, common.DurationSecQuery: {strconv.Itoa(durationSec)}})
  195. c.responseWriterLock.Lock()
  196. defer c.responseWriterLock.Unlock()
  197. if err != nil {
  198. fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err)
  199. return
  200. }
  201. fmt.Fprintf(w, "Consumed %d megabytes\n", megabytes)
  202. }
  203. // sendOneConsumeCustomMetric sends POST request for custom metric consumption
  204. func (c *controller) sendOneConsumeCustomMetric(w http.ResponseWriter, customMetricName string, delta int, durationSec int) {
  205. defer c.waitGroup.Done()
  206. query := createConsumerURL(common.BumpMetricAddress)
  207. _, err := http.PostForm(query,
  208. url.Values{common.MetricNameQuery: {customMetricName}, common.DurationSecQuery: {strconv.Itoa(durationSec)}, common.DeltaQuery: {strconv.Itoa(delta)}})
  209. c.responseWriterLock.Lock()
  210. defer c.responseWriterLock.Unlock()
  211. if err != nil {
  212. fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err)
  213. return
  214. }
  215. fmt.Fprintf(w, "Bumped metric %s by %d\n", customMetricName, delta)
  216. }