proxier_health.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package healthcheck
  14. import (
  15. "fmt"
  16. "net/http"
  17. "sync/atomic"
  18. "time"
  19. "k8s.io/klog"
  20. "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/util/clock"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. "k8s.io/client-go/tools/record"
  24. api "k8s.io/kubernetes/pkg/apis/core"
  25. )
  // proxierHealthzRetryInterval is the period passed to wait.Until in Run: after
  // the healthz listener fails to start or the server stops serving, the serving
  // function is run again after this interval.
  var proxierHealthzRetryInterval = time.Minute
  // ProxierHealthUpdater is the narrow surface the proxier uses to report its
  // progress to the healthz server; callers can only record timestamps (and
  // start the server), never read health state back.
  type ProxierHealthUpdater interface {
  	// Run starts the healthz http server and returns.
  	Run()

  	// QueuedUpdate is to be called when the proxier receives a Service or
  	// Endpoints event carrying information that requires the service rules
  	// to be updated.
  	QueuedUpdate()

  	// Updated is to be called once the proxier has successfully updated the
  	// service rules so that they reflect the current state.
  	Updated()
  }
  38. var _ ProxierHealthUpdater = &proxierHealthServer{}
  39. // proxierHealthServer returns 200 "OK" by default. It verifies that the delay between
  40. // QueuedUpdate() calls and Updated() calls never exceeds healthTimeout.
  41. type proxierHealthServer struct {
  42. listener listener
  43. httpFactory httpServerFactory
  44. clock clock.Clock
  45. addr string
  46. healthTimeout time.Duration
  47. recorder record.EventRecorder
  48. nodeRef *v1.ObjectReference
  49. lastUpdated atomic.Value
  50. lastQueued atomic.Value
  51. }
  52. // NewProxierHealthServer returns a proxier health http server.
  53. func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) ProxierHealthUpdater {
  54. return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, recorder, nodeRef)
  55. }
  56. func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer {
  57. return &proxierHealthServer{
  58. listener: listener,
  59. httpFactory: httpServerFactory,
  60. clock: c,
  61. addr: addr,
  62. healthTimeout: healthTimeout,
  63. recorder: recorder,
  64. nodeRef: nodeRef,
  65. }
  66. }
  67. // Updated updates the lastUpdated timestamp.
  68. func (hs *proxierHealthServer) Updated() {
  69. hs.lastUpdated.Store(hs.clock.Now())
  70. }
  71. // QueuedUpdate updates the lastQueued timestamp.
  72. func (hs *proxierHealthServer) QueuedUpdate() {
  73. hs.lastQueued.Store(hs.clock.Now())
  74. }
  75. // Run starts the healthz http server and returns.
  76. func (hs *proxierHealthServer) Run() {
  77. serveMux := http.NewServeMux()
  78. serveMux.Handle("/healthz", healthzHandler{hs: hs})
  79. server := hs.httpFactory.New(hs.addr, serveMux)
  80. go wait.Until(func() {
  81. klog.V(3).Infof("Starting goroutine for proxier healthz on %s", hs.addr)
  82. listener, err := hs.listener.Listen(hs.addr)
  83. if err != nil {
  84. msg := fmt.Sprintf("Failed to start proxier healthz on %s: %v", hs.addr, err)
  85. if hs.recorder != nil {
  86. hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartProxierHealthcheck", msg)
  87. }
  88. klog.Error(msg)
  89. return
  90. }
  91. if err := server.Serve(listener); err != nil {
  92. klog.Errorf("Proxier healthz closed with error: %v", err)
  93. return
  94. }
  95. klog.Error("Unexpected proxier healthz closed.")
  96. }, proxierHealthzRetryInterval, wait.NeverStop)
  97. }
  98. type healthzHandler struct {
  99. hs *proxierHealthServer
  100. }
  101. func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
  102. var lastQueued, lastUpdated time.Time
  103. if val := h.hs.lastQueued.Load(); val != nil {
  104. lastQueued = val.(time.Time)
  105. }
  106. if val := h.hs.lastUpdated.Load(); val != nil {
  107. lastUpdated = val.(time.Time)
  108. }
  109. currentTime := h.hs.clock.Now()
  110. healthy := false
  111. switch {
  112. case lastUpdated.IsZero():
  113. // The proxy is healthy while it's starting up
  114. // TODO: this makes it useless as a readinessProbe. Consider changing
  115. // to only become healthy after the proxy is fully synced.
  116. healthy = true
  117. case lastUpdated.After(lastQueued):
  118. // We've processed all updates
  119. healthy = true
  120. case currentTime.Sub(lastQueued) < h.hs.healthTimeout:
  121. // There's an unprocessed update queued, but it's not late yet
  122. healthy = true
  123. }
  124. resp.Header().Set("Content-Type", "application/json")
  125. resp.Header().Set("X-Content-Type-Options", "nosniff")
  126. if !healthy {
  127. resp.WriteHeader(http.StatusServiceUnavailable)
  128. } else {
  129. resp.WriteHeader(http.StatusOK)
  130. // In older releases, the returned "lastUpdated" time indicated the last
  131. // time the proxier sync loop ran, even if nothing had changed. To
  132. // preserve compatibility, we use the same semantics: the returned
  133. // lastUpdated value is "recent" if the server is healthy. The kube-proxy
  134. // metrics provide more detailed information.
  135. lastUpdated = currentTime
  136. }
  137. fmt.Fprintf(resp, `{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)
  138. }