resource_consumer_handler.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "fmt"
  16. "net/http"
  17. "net/url"
  18. "strconv"
  19. "sync"
  20. "time"
  21. "k8s.io/kubernetes/test/images/resource-consumer/common"
  22. )
  23. // ResourceConsumerHandler holds metrics for a resource consumer.
  24. type ResourceConsumerHandler struct {
  25. metrics map[string]float64
  26. metricsLock sync.Mutex
  27. }
  28. // NewResourceConsumerHandler creates and initializes a ResourceConsumerHandler to defaults.
  29. func NewResourceConsumerHandler() *ResourceConsumerHandler {
  30. return &ResourceConsumerHandler{metrics: map[string]float64{}}
  31. }
  32. func (handler *ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  33. // handle exposing metrics in Prometheus format (both GET & POST)
  34. if req.URL.Path == common.MetricsAddress {
  35. handler.handleMetrics(w)
  36. return
  37. }
  38. if req.Method != "POST" {
  39. http.Error(w, common.BadRequest, http.StatusBadRequest)
  40. return
  41. }
  42. // parsing POST request data and URL data
  43. if err := req.ParseForm(); err != nil {
  44. http.Error(w, err.Error(), http.StatusBadRequest)
  45. return
  46. }
  47. // handle consumeCPU
  48. if req.URL.Path == common.ConsumeCPUAddress {
  49. handler.handleConsumeCPU(w, req.Form)
  50. return
  51. }
  52. // handle consumeMem
  53. if req.URL.Path == common.ConsumeMemAddress {
  54. handler.handleConsumeMem(w, req.Form)
  55. return
  56. }
  57. // handle getCurrentStatus
  58. if req.URL.Path == common.GetCurrentStatusAddress {
  59. handler.handleGetCurrentStatus(w)
  60. return
  61. }
  62. // handle bumpMetric
  63. if req.URL.Path == common.BumpMetricAddress {
  64. handler.handleBumpMetric(w, req.Form)
  65. return
  66. }
  67. http.Error(w, fmt.Sprintf("%s: %s", common.UnknownFunction, req.URL.Path), http.StatusNotFound)
  68. }
  69. func (handler *ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, query url.Values) {
  70. // getting string data for consumeCPU
  71. durationSecString := query.Get(common.DurationSecQuery)
  72. millicoresString := query.Get(common.MillicoresQuery)
  73. if durationSecString == "" || millicoresString == "" {
  74. http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
  75. return
  76. }
  77. // convert data (strings to ints) for consumeCPU
  78. durationSec, durationSecError := strconv.Atoi(durationSecString)
  79. millicores, millicoresError := strconv.Atoi(millicoresString)
  80. if durationSecError != nil || millicoresError != nil {
  81. http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
  82. return
  83. }
  84. go ConsumeCPU(millicores, durationSec)
  85. fmt.Fprintln(w, common.ConsumeCPUAddress[1:])
  86. fmt.Fprintln(w, millicores, common.MillicoresQuery)
  87. fmt.Fprintln(w, durationSec, common.DurationSecQuery)
  88. }
  89. func (handler *ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) {
  90. // getting string data for consumeMem
  91. durationSecString := query.Get(common.DurationSecQuery)
  92. megabytesString := query.Get(common.MegabytesQuery)
  93. if durationSecString == "" || megabytesString == "" {
  94. http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
  95. return
  96. }
  97. // convert data (strings to ints) for consumeMem
  98. durationSec, durationSecError := strconv.Atoi(durationSecString)
  99. megabytes, megabytesError := strconv.Atoi(megabytesString)
  100. if durationSecError != nil || megabytesError != nil {
  101. http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
  102. return
  103. }
  104. go ConsumeMem(megabytes, durationSec)
  105. fmt.Fprintln(w, common.ConsumeMemAddress[1:])
  106. fmt.Fprintln(w, megabytes, common.MegabytesQuery)
  107. fmt.Fprintln(w, durationSec, common.DurationSecQuery)
  108. }
  109. func (handler *ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) {
  110. GetCurrentStatus()
  111. fmt.Fprintln(w, "Warning: not implemented!")
  112. fmt.Fprint(w, common.GetCurrentStatusAddress[1:])
  113. }
  114. func (handler *ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) {
  115. handler.metricsLock.Lock()
  116. defer handler.metricsLock.Unlock()
  117. for k, v := range handler.metrics {
  118. fmt.Fprintf(w, "# HELP %s info message.\n", k)
  119. fmt.Fprintf(w, "# TYPE %s gauge\n", k)
  120. fmt.Fprintf(w, "%s %f\n", k, v)
  121. }
  122. }
  123. func (handler *ResourceConsumerHandler) bumpMetric(metric string, delta float64, duration time.Duration) {
  124. handler.metricsLock.Lock()
  125. if _, ok := handler.metrics[metric]; ok {
  126. handler.metrics[metric] += delta
  127. } else {
  128. handler.metrics[metric] = delta
  129. }
  130. handler.metricsLock.Unlock()
  131. time.Sleep(duration)
  132. handler.metricsLock.Lock()
  133. handler.metrics[metric] -= delta
  134. handler.metricsLock.Unlock()
  135. }
  136. func (handler *ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) {
  137. // getting string data for handleBumpMetric
  138. metric := query.Get(common.MetricNameQuery)
  139. deltaString := query.Get(common.DeltaQuery)
  140. durationSecString := query.Get(common.DurationSecQuery)
  141. if durationSecString == "" || metric == "" || deltaString == "" {
  142. http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
  143. return
  144. }
  145. // convert data (strings to ints/floats) for handleBumpMetric
  146. durationSec, durationSecError := strconv.Atoi(durationSecString)
  147. delta, deltaError := strconv.ParseFloat(deltaString, 64)
  148. if durationSecError != nil || deltaError != nil {
  149. http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
  150. return
  151. }
  152. go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second)
  153. fmt.Fprintln(w, common.BumpMetricAddress[1:])
  154. fmt.Fprintln(w, metric, common.MetricNameQuery)
  155. fmt.Fprintln(w, delta, common.DeltaQuery)
  156. fmt.Fprintln(w, durationSec, common.DurationSecQuery)
  157. }