service_health.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package healthcheck
  14. import (
  15. "fmt"
  16. "net"
  17. "net/http"
  18. "strings"
  19. "sync"
  20. "github.com/lithammer/dedent"
  21. "k8s.io/klog"
  22. "k8s.io/api/core/v1"
  23. "k8s.io/apimachinery/pkg/types"
  24. "k8s.io/client-go/tools/record"
  25. api "k8s.io/kubernetes/pkg/apis/core"
  26. )
// ServiceHealthServer serves HTTP endpoints for each service name, with results
// based on the endpoints. If there are 0 endpoints for a service, it returns a
// 503 "Service Unavailable" error (telling LBs not to use this node). If there
// are 1 or more endpoints, it returns a 200 "OK".
type ServiceHealthServer interface {
	// SyncServices makes the new set of services active. Services that were
	// open before and are absent from the new set will be closed. Services
	// that are new will be opened. Services that existed and are in the new
	// set with the same port will be left alone. The value of the map is the
	// healthcheck-port to listen on.
	SyncServices(newServices map[types.NamespacedName]uint16) error
	// SyncEndpoints makes the new set of endpoints active. Endpoints for
	// services that do not exist will be dropped. The value of the map is the
	// number of endpoints the service has on this node.
	SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
  42. func newServiceHealthServer(hostname string, recorder record.EventRecorder, listener listener, factory httpServerFactory) ServiceHealthServer {
  43. return &server{
  44. hostname: hostname,
  45. recorder: recorder,
  46. listener: listener,
  47. httpFactory: factory,
  48. services: map[types.NamespacedName]*hcInstance{},
  49. }
  50. }
  51. // NewServiceHealthServer allocates a new service healthcheck server manager
  52. func NewServiceHealthServer(hostname string, recorder record.EventRecorder) ServiceHealthServer {
  53. return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{})
  54. }
// server is the ServiceHealthServer implementation returned by
// newServiceHealthServer. It tracks one hcInstance per service.
type server struct {
	// hostname is included in the message reported when a healthcheck
	// listener cannot be started.
	hostname string
	recorder record.EventRecorder // can be nil
	// listener opens the listening socket for each healthcheck port.
	listener listener
	// httpFactory creates the HTTP server for each healthcheck address.
	httpFactory httpServerFactory

	// lock is held for writing by SyncServices and SyncEndpoints and for
	// reading by the HTTP handler while it looks up services.
	lock sync.RWMutex
	// services holds the currently tracked healthchecks, keyed by service.
	services map[types.NamespacedName]*hcInstance
}
  63. func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
  64. hcs.lock.Lock()
  65. defer hcs.lock.Unlock()
  66. // Remove any that are not needed any more.
  67. for nsn, svc := range hcs.services {
  68. if port, found := newServices[nsn]; !found || port != svc.port {
  69. klog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port)
  70. if err := svc.listener.Close(); err != nil {
  71. klog.Errorf("Close(%v): %v", svc.listener.Addr(), err)
  72. }
  73. delete(hcs.services, nsn)
  74. }
  75. }
  76. // Add any that are needed.
  77. for nsn, port := range newServices {
  78. if hcs.services[nsn] != nil {
  79. klog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port)
  80. continue
  81. }
  82. klog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port)
  83. svc := &hcInstance{port: port}
  84. addr := fmt.Sprintf(":%d", port)
  85. svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs})
  86. var err error
  87. svc.listener, err = hcs.listener.Listen(addr)
  88. if err != nil {
  89. msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
  90. if hcs.recorder != nil {
  91. hcs.recorder.Eventf(
  92. &v1.ObjectReference{
  93. Kind: "Service",
  94. Namespace: nsn.Namespace,
  95. Name: nsn.Name,
  96. UID: types.UID(nsn.String()),
  97. }, api.EventTypeWarning, "FailedToStartServiceHealthcheck", msg)
  98. }
  99. klog.Error(msg)
  100. continue
  101. }
  102. hcs.services[nsn] = svc
  103. go func(nsn types.NamespacedName, svc *hcInstance) {
  104. // Serve() will exit when the listener is closed.
  105. klog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port)
  106. if err := svc.server.Serve(svc.listener); err != nil {
  107. klog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err)
  108. return
  109. }
  110. klog.V(3).Infof("Healthcheck %q closed", nsn.String())
  111. }(nsn, svc)
  112. }
  113. return nil
  114. }
// hcInstance is the per-service state kept by server: the port being served,
// the listener and HTTP server bound to it, and the latest endpoint count.
type hcInstance struct {
	// port is the healthcheck port requested for the service.
	port uint16
	// listener is the socket the healthcheck is served on; closing it stops
	// the healthcheck.
	listener net.Listener
	// server is the HTTP server that serves requests arriving on listener.
	server    httpServer
	endpoints int // number of local endpoints for a service
}
// hcHandler is the HTTP handler for a single service's healthcheck. It looks
// up the service by name in the owning server on every request.
type hcHandler struct {
	// name identifies the service this handler reports on.
	name types.NamespacedName
	// hcs is the server whose services map is consulted for the current state.
	hcs *server
}

// Compile-time check that hcHandler implements http.Handler.
var _ http.Handler = hcHandler{}
  126. func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
  127. h.hcs.lock.RLock()
  128. svc, ok := h.hcs.services[h.name]
  129. if !ok || svc == nil {
  130. h.hcs.lock.RUnlock()
  131. klog.Errorf("Received request for closed healthcheck %q", h.name.String())
  132. return
  133. }
  134. count := svc.endpoints
  135. h.hcs.lock.RUnlock()
  136. resp.Header().Set("Content-Type", "application/json")
  137. resp.Header().Set("X-Content-Type-Options", "nosniff")
  138. if count == 0 {
  139. resp.WriteHeader(http.StatusServiceUnavailable)
  140. } else {
  141. resp.WriteHeader(http.StatusOK)
  142. }
  143. fmt.Fprint(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
  144. {
  145. "service": {
  146. "namespace": %q,
  147. "name": %q
  148. },
  149. "localEndpoints": %d
  150. }
  151. `, h.name.Namespace, h.name.Name, count)), "\n"))
  152. }
  153. func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
  154. hcs.lock.Lock()
  155. defer hcs.lock.Unlock()
  156. for nsn, count := range newEndpoints {
  157. if hcs.services[nsn] == nil {
  158. klog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", nsn.String())
  159. continue
  160. }
  161. klog.V(3).Infof("Reporting %d endpoints for healthcheck %q", count, nsn.String())
  162. hcs.services[nsn].endpoints = count
  163. }
  164. for nsn, hci := range hcs.services {
  165. if _, found := newEndpoints[nsn]; !found {
  166. hci.endpoints = 0
  167. }
  168. }
  169. return nil
  170. }
// FakeServiceHealthServer is a fake ServiceHealthServer for test programs.
// It holds no state; both of its Sync methods do nothing and return nil.
type FakeServiceHealthServer struct{}
  173. // NewFakeServiceHealthServer allocates a new fake service healthcheck server manager
  174. func NewFakeServiceHealthServer() ServiceHealthServer {
  175. return FakeServiceHealthServer{}
  176. }
  177. // SyncServices is part of ServiceHealthServer
  178. func (fake FakeServiceHealthServer) SyncServices(_ map[types.NamespacedName]uint16) error {
  179. return nil
  180. }
  181. // SyncEndpoints is part of ServiceHealthServer
  182. func (fake FakeServiceHealthServer) SyncEndpoints(_ map[types.NamespacedName]int) error {
  183. return nil
  184. }