healthcheck.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package healthcheck
  14. import (
  15. "fmt"
  16. "net"
  17. "net/http"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/lithammer/dedent"
  23. "k8s.io/klog"
  24. "k8s.io/api/core/v1"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/clock"
  27. "k8s.io/apimachinery/pkg/util/wait"
  28. "k8s.io/client-go/tools/record"
  29. api "k8s.io/kubernetes/pkg/apis/core"
  30. )
  // nodeHealthzRetryInterval is the period handed to wait.Until by
  // HealthzServer.Run, i.e. the pause between attempts to listen and serve the
  // node healthz endpoint. It is a variable rather than a constant, so it can
  // be reassigned.
  var nodeHealthzRetryInterval = time.Minute
  32. // Server serves HTTP endpoints for each service name, with results
  33. // based on the endpoints. If there are 0 endpoints for a service, it returns a
  34. // 503 "Service Unavailable" error (telling LBs not to use this node). If there
  35. // are 1 or more endpoints, it returns a 200 "OK".
  36. type Server interface {
  37. // Make the new set of services be active. Services that were open before
  38. // will be closed. Services that are new will be opened. Service that
  39. // existed and are in the new set will be left alone. The value of the map
  40. // is the healthcheck-port to listen on.
  41. SyncServices(newServices map[types.NamespacedName]uint16) error
  42. // Make the new set of endpoints be active. Endpoints for services that do
  43. // not exist will be dropped. The value of the map is the number of
  44. // endpoints the service has on this node.
  45. SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
  46. }
  // Listener is a seam that lets tests replace the network layer of Server.
  // Passing a nil Listener to NewServer() selects the real net.Listen function.
  type Listener interface {
  	// Listen behaves much like net.Listen, except that the network argument
  	// is not supplied by the caller: it is fixed to "tcp".
  	Listen(addr string) (net.Listener, error)
  }
  54. // HTTPServerFactory allows for testing of Server. If the
  55. // HTTPServerFactory argument to NewServer() is nil, the real
  56. // http.Server type will be used.
  57. type HTTPServerFactory interface {
  58. // New creates an instance of a type satisfying HTTPServer. This is
  59. // designed to include http.Server.
  60. New(addr string, handler http.Handler) HTTPServer
  61. }
  // HTTPServer is a seam that lets tests replace the HTTP server used by
  // Server.
  type HTTPServer interface {
  	// Serve has the same shape as (*http.Server).Serve, so that http.Server
  	// satisfies this interface.
  	Serve(listener net.Listener) error
  }
  67. // NewServer allocates a new healthcheck server manager. If either
  68. // of the injected arguments are nil, defaults will be used.
  69. func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server {
  70. if listener == nil {
  71. listener = stdNetListener{}
  72. }
  73. if httpServerFactory == nil {
  74. httpServerFactory = stdHTTPServerFactory{}
  75. }
  76. return &server{
  77. hostname: hostname,
  78. recorder: recorder,
  79. listener: listener,
  80. httpFactory: httpServerFactory,
  81. services: map[types.NamespacedName]*hcInstance{},
  82. }
  83. }
  84. // Implement Listener in terms of net.Listen.
  85. type stdNetListener struct{}
  86. func (stdNetListener) Listen(addr string) (net.Listener, error) {
  87. return net.Listen("tcp", addr)
  88. }
  89. var _ Listener = stdNetListener{}
  90. // Implement HTTPServerFactory in terms of http.Server.
  91. type stdHTTPServerFactory struct{}
  92. func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
  93. return &http.Server{
  94. Addr: addr,
  95. Handler: handler,
  96. }
  97. }
  98. var _ HTTPServerFactory = stdHTTPServerFactory{}
  99. type server struct {
  100. hostname string
  101. recorder record.EventRecorder // can be nil
  102. listener Listener
  103. httpFactory HTTPServerFactory
  104. lock sync.RWMutex
  105. services map[types.NamespacedName]*hcInstance
  106. }
  107. func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
  108. hcs.lock.Lock()
  109. defer hcs.lock.Unlock()
  110. // Remove any that are not needed any more.
  111. for nsn, svc := range hcs.services {
  112. if port, found := newServices[nsn]; !found || port != svc.port {
  113. klog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port)
  114. if err := svc.listener.Close(); err != nil {
  115. klog.Errorf("Close(%v): %v", svc.listener.Addr(), err)
  116. }
  117. delete(hcs.services, nsn)
  118. }
  119. }
  120. // Add any that are needed.
  121. for nsn, port := range newServices {
  122. if hcs.services[nsn] != nil {
  123. klog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port)
  124. continue
  125. }
  126. klog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port)
  127. svc := &hcInstance{port: port}
  128. addr := fmt.Sprintf(":%d", port)
  129. svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs})
  130. var err error
  131. svc.listener, err = hcs.listener.Listen(addr)
  132. if err != nil {
  133. msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
  134. if hcs.recorder != nil {
  135. hcs.recorder.Eventf(
  136. &v1.ObjectReference{
  137. Kind: "Service",
  138. Namespace: nsn.Namespace,
  139. Name: nsn.Name,
  140. UID: types.UID(nsn.String()),
  141. }, api.EventTypeWarning, "FailedToStartServiceHealthcheck", msg)
  142. }
  143. klog.Error(msg)
  144. continue
  145. }
  146. hcs.services[nsn] = svc
  147. go func(nsn types.NamespacedName, svc *hcInstance) {
  148. // Serve() will exit when the listener is closed.
  149. klog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port)
  150. if err := svc.server.Serve(svc.listener); err != nil {
  151. klog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err)
  152. return
  153. }
  154. klog.V(3).Infof("Healthcheck %q closed", nsn.String())
  155. }(nsn, svc)
  156. }
  157. return nil
  158. }
  159. type hcInstance struct {
  160. port uint16
  161. listener net.Listener
  162. server HTTPServer
  163. endpoints int // number of local endpoints for a service
  164. }
  165. type hcHandler struct {
  166. name types.NamespacedName
  167. hcs *server
  168. }
  169. var _ http.Handler = hcHandler{}
  170. func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
  171. h.hcs.lock.RLock()
  172. svc, ok := h.hcs.services[h.name]
  173. if !ok || svc == nil {
  174. h.hcs.lock.RUnlock()
  175. klog.Errorf("Received request for closed healthcheck %q", h.name.String())
  176. return
  177. }
  178. count := svc.endpoints
  179. h.hcs.lock.RUnlock()
  180. resp.Header().Set("Content-Type", "application/json")
  181. if count == 0 {
  182. resp.WriteHeader(http.StatusServiceUnavailable)
  183. } else {
  184. resp.WriteHeader(http.StatusOK)
  185. }
  186. fmt.Fprintf(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
  187. {
  188. "service": {
  189. "namespace": %q,
  190. "name": %q
  191. },
  192. "localEndpoints": %d
  193. }
  194. `, h.name.Namespace, h.name.Name, count)), "\n"))
  195. }
  196. func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
  197. hcs.lock.Lock()
  198. defer hcs.lock.Unlock()
  199. for nsn, count := range newEndpoints {
  200. if hcs.services[nsn] == nil {
  201. klog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", nsn.String())
  202. continue
  203. }
  204. klog.V(3).Infof("Reporting %d endpoints for healthcheck %q", count, nsn.String())
  205. hcs.services[nsn].endpoints = count
  206. }
  207. for nsn, hci := range hcs.services {
  208. if _, found := newEndpoints[nsn]; !found {
  209. hci.endpoints = 0
  210. }
  211. }
  212. return nil
  213. }
  // HealthzUpdater is the narrow view of a healthz server given to callers
  // that only need to update the healthz timestamp.
  type HealthzUpdater interface {
  	// UpdateTimestamp updates the healthz timestamp.
  	UpdateTimestamp()
  }
  218. // HealthzServer returns 200 "OK" by default. Once timestamp has been
  219. // updated, it verifies we don't exceed max no respond duration since
  220. // last update.
  221. type HealthzServer struct {
  222. listener Listener
  223. httpFactory HTTPServerFactory
  224. clock clock.Clock
  225. addr string
  226. port int32
  227. healthTimeout time.Duration
  228. recorder record.EventRecorder
  229. nodeRef *v1.ObjectReference
  230. lastUpdated atomic.Value
  231. }
  232. // NewDefaultHealthzServer returns a default healthz http server.
  233. func NewDefaultHealthzServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
  234. return newHealthzServer(nil, nil, nil, addr, healthTimeout, recorder, nodeRef)
  235. }
  236. func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
  237. if listener == nil {
  238. listener = stdNetListener{}
  239. }
  240. if httpServerFactory == nil {
  241. httpServerFactory = stdHTTPServerFactory{}
  242. }
  243. if c == nil {
  244. c = clock.RealClock{}
  245. }
  246. return &HealthzServer{
  247. listener: listener,
  248. httpFactory: httpServerFactory,
  249. clock: c,
  250. addr: addr,
  251. healthTimeout: healthTimeout,
  252. recorder: recorder,
  253. nodeRef: nodeRef,
  254. }
  255. }
  256. // UpdateTimestamp updates the lastUpdated timestamp.
  257. func (hs *HealthzServer) UpdateTimestamp() {
  258. hs.lastUpdated.Store(hs.clock.Now())
  259. }
  260. // Run starts the healthz http server and returns.
  261. func (hs *HealthzServer) Run() {
  262. serveMux := http.NewServeMux()
  263. serveMux.Handle("/healthz", healthzHandler{hs: hs})
  264. server := hs.httpFactory.New(hs.addr, serveMux)
  265. go wait.Until(func() {
  266. klog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr)
  267. listener, err := hs.listener.Listen(hs.addr)
  268. if err != nil {
  269. msg := fmt.Sprintf("Failed to start node healthz on %s: %v", hs.addr, err)
  270. if hs.recorder != nil {
  271. hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartNodeHealthcheck", msg)
  272. }
  273. klog.Error(msg)
  274. return
  275. }
  276. if err := server.Serve(listener); err != nil {
  277. klog.Errorf("Healthz closed with error: %v", err)
  278. return
  279. }
  280. klog.Error("Unexpected healthz closed.")
  281. }, nodeHealthzRetryInterval, wait.NeverStop)
  282. }
  283. type healthzHandler struct {
  284. hs *HealthzServer
  285. }
  286. func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
  287. lastUpdated := time.Time{}
  288. if val := h.hs.lastUpdated.Load(); val != nil {
  289. lastUpdated = val.(time.Time)
  290. }
  291. currentTime := h.hs.clock.Now()
  292. resp.Header().Set("Content-Type", "application/json")
  293. if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
  294. resp.WriteHeader(http.StatusServiceUnavailable)
  295. } else {
  296. resp.WriteHeader(http.StatusOK)
  297. }
  298. fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
  299. }