123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package healthcheck
- import (
- "fmt"
- "net"
- "net/http"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/lithammer/dedent"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/clock"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/tools/record"
- api "k8s.io/kubernetes/pkg/apis/core"
- )
- var nodeHealthzRetryInterval = 60 * time.Second
- // Server serves HTTP endpoints for each service name, with results
- // based on the endpoints. If there are 0 endpoints for a service, it returns a
- // 503 "Service Unavailable" error (telling LBs not to use this node). If there
- // are 1 or more endpoints, it returns a 200 "OK".
- type Server interface {
- // Make the new set of services be active. Services that were open before
- // will be closed. Services that are new will be opened. Service that
- // existed and are in the new set will be left alone. The value of the map
- // is the healthcheck-port to listen on.
- SyncServices(newServices map[types.NamespacedName]uint16) error
- // Make the new set of endpoints be active. Endpoints for services that do
- // not exist will be dropped. The value of the map is the number of
- // endpoints the service has on this node.
- SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
- }
- // Listener allows for testing of Server. If the Listener argument
- // to NewServer() is nil, the real net.Listen function will be used.
- type Listener interface {
- // Listen is very much like net.Listen, except the first arg (network) is
- // fixed to be "tcp".
- Listen(addr string) (net.Listener, error)
- }
- // HTTPServerFactory allows for testing of Server. If the
- // HTTPServerFactory argument to NewServer() is nil, the real
- // http.Server type will be used.
- type HTTPServerFactory interface {
- // New creates an instance of a type satisfying HTTPServer. This is
- // designed to include http.Server.
- New(addr string, handler http.Handler) HTTPServer
- }
- // HTTPServer allows for testing of Server.
- type HTTPServer interface {
- // Server is designed so that http.Server satisfies this interface,
- Serve(listener net.Listener) error
- }
- // NewServer allocates a new healthcheck server manager. If either
- // of the injected arguments are nil, defaults will be used.
- func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server {
- if listener == nil {
- listener = stdNetListener{}
- }
- if httpServerFactory == nil {
- httpServerFactory = stdHTTPServerFactory{}
- }
- return &server{
- hostname: hostname,
- recorder: recorder,
- listener: listener,
- httpFactory: httpServerFactory,
- services: map[types.NamespacedName]*hcInstance{},
- }
- }
- // Implement Listener in terms of net.Listen.
- type stdNetListener struct{}
- func (stdNetListener) Listen(addr string) (net.Listener, error) {
- return net.Listen("tcp", addr)
- }
- var _ Listener = stdNetListener{}
- // Implement HTTPServerFactory in terms of http.Server.
- type stdHTTPServerFactory struct{}
- func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
- return &http.Server{
- Addr: addr,
- Handler: handler,
- }
- }
- var _ HTTPServerFactory = stdHTTPServerFactory{}
- type server struct {
- hostname string
- recorder record.EventRecorder // can be nil
- listener Listener
- httpFactory HTTPServerFactory
- lock sync.RWMutex
- services map[types.NamespacedName]*hcInstance
- }
- func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
- hcs.lock.Lock()
- defer hcs.lock.Unlock()
- // Remove any that are not needed any more.
- for nsn, svc := range hcs.services {
- if port, found := newServices[nsn]; !found || port != svc.port {
- klog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port)
- if err := svc.listener.Close(); err != nil {
- klog.Errorf("Close(%v): %v", svc.listener.Addr(), err)
- }
- delete(hcs.services, nsn)
- }
- }
- // Add any that are needed.
- for nsn, port := range newServices {
- if hcs.services[nsn] != nil {
- klog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port)
- continue
- }
- klog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port)
- svc := &hcInstance{port: port}
- addr := fmt.Sprintf(":%d", port)
- svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs})
- var err error
- svc.listener, err = hcs.listener.Listen(addr)
- if err != nil {
- msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
- if hcs.recorder != nil {
- hcs.recorder.Eventf(
- &v1.ObjectReference{
- Kind: "Service",
- Namespace: nsn.Namespace,
- Name: nsn.Name,
- UID: types.UID(nsn.String()),
- }, api.EventTypeWarning, "FailedToStartServiceHealthcheck", msg)
- }
- klog.Error(msg)
- continue
- }
- hcs.services[nsn] = svc
- go func(nsn types.NamespacedName, svc *hcInstance) {
- // Serve() will exit when the listener is closed.
- klog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port)
- if err := svc.server.Serve(svc.listener); err != nil {
- klog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err)
- return
- }
- klog.V(3).Infof("Healthcheck %q closed", nsn.String())
- }(nsn, svc)
- }
- return nil
- }
- type hcInstance struct {
- port uint16
- listener net.Listener
- server HTTPServer
- endpoints int // number of local endpoints for a service
- }
- type hcHandler struct {
- name types.NamespacedName
- hcs *server
- }
- var _ http.Handler = hcHandler{}
- func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- h.hcs.lock.RLock()
- svc, ok := h.hcs.services[h.name]
- if !ok || svc == nil {
- h.hcs.lock.RUnlock()
- klog.Errorf("Received request for closed healthcheck %q", h.name.String())
- return
- }
- count := svc.endpoints
- h.hcs.lock.RUnlock()
- resp.Header().Set("Content-Type", "application/json")
- if count == 0 {
- resp.WriteHeader(http.StatusServiceUnavailable)
- } else {
- resp.WriteHeader(http.StatusOK)
- }
- fmt.Fprintf(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
- {
- "service": {
- "namespace": %q,
- "name": %q
- },
- "localEndpoints": %d
- }
- `, h.name.Namespace, h.name.Name, count)), "\n"))
- }
- func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
- hcs.lock.Lock()
- defer hcs.lock.Unlock()
- for nsn, count := range newEndpoints {
- if hcs.services[nsn] == nil {
- klog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", nsn.String())
- continue
- }
- klog.V(3).Infof("Reporting %d endpoints for healthcheck %q", count, nsn.String())
- hcs.services[nsn].endpoints = count
- }
- for nsn, hci := range hcs.services {
- if _, found := newEndpoints[nsn]; !found {
- hci.endpoints = 0
- }
- }
- return nil
- }
- // HealthzUpdater allows callers to update healthz timestamp only.
- type HealthzUpdater interface {
- UpdateTimestamp()
- }
- // HealthzServer returns 200 "OK" by default. Once timestamp has been
- // updated, it verifies we don't exceed max no respond duration since
- // last update.
- type HealthzServer struct {
- listener Listener
- httpFactory HTTPServerFactory
- clock clock.Clock
- addr string
- port int32
- healthTimeout time.Duration
- recorder record.EventRecorder
- nodeRef *v1.ObjectReference
- lastUpdated atomic.Value
- }
- // NewDefaultHealthzServer returns a default healthz http server.
- func NewDefaultHealthzServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
- return newHealthzServer(nil, nil, nil, addr, healthTimeout, recorder, nodeRef)
- }
- func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
- if listener == nil {
- listener = stdNetListener{}
- }
- if httpServerFactory == nil {
- httpServerFactory = stdHTTPServerFactory{}
- }
- if c == nil {
- c = clock.RealClock{}
- }
- return &HealthzServer{
- listener: listener,
- httpFactory: httpServerFactory,
- clock: c,
- addr: addr,
- healthTimeout: healthTimeout,
- recorder: recorder,
- nodeRef: nodeRef,
- }
- }
- // UpdateTimestamp updates the lastUpdated timestamp.
- func (hs *HealthzServer) UpdateTimestamp() {
- hs.lastUpdated.Store(hs.clock.Now())
- }
- // Run starts the healthz http server and returns.
- func (hs *HealthzServer) Run() {
- serveMux := http.NewServeMux()
- serveMux.Handle("/healthz", healthzHandler{hs: hs})
- server := hs.httpFactory.New(hs.addr, serveMux)
- go wait.Until(func() {
- klog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr)
- listener, err := hs.listener.Listen(hs.addr)
- if err != nil {
- msg := fmt.Sprintf("Failed to start node healthz on %s: %v", hs.addr, err)
- if hs.recorder != nil {
- hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartNodeHealthcheck", msg)
- }
- klog.Error(msg)
- return
- }
- if err := server.Serve(listener); err != nil {
- klog.Errorf("Healthz closed with error: %v", err)
- return
- }
- klog.Error("Unexpected healthz closed.")
- }, nodeHealthzRetryInterval, wait.NeverStop)
- }
- type healthzHandler struct {
- hs *HealthzServer
- }
- func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- lastUpdated := time.Time{}
- if val := h.hs.lastUpdated.Load(); val != nil {
- lastUpdated = val.(time.Time)
- }
- currentTime := h.hs.clock.Now()
- resp.Header().Set("Content-Type", "application/json")
- if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
- resp.WriteHeader(http.StatusServiceUnavailable)
- } else {
- resp.WriteHeader(http.StatusOK)
- }
- fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
- }
|