123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package server
- import (
- "context"
- "crypto/tls"
- "fmt"
- "io"
- "net"
- "net/http"
- "net/http/pprof"
- "net/url"
- "path"
- "reflect"
- goruntime "runtime"
- "strconv"
- "strings"
- "time"
- restful "github.com/emicklei/go-restful"
- cadvisormetrics "github.com/google/cadvisor/container"
- cadvisorapi "github.com/google/cadvisor/info/v1"
- "github.com/google/cadvisor/metrics"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "google.golang.org/grpc"
- "k8s.io/klog"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/proxy"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apiserver/pkg/authentication/authenticator"
- "k8s.io/apiserver/pkg/authorization/authorizer"
- "k8s.io/apiserver/pkg/server/healthz"
- "k8s.io/apiserver/pkg/server/httplog"
- "k8s.io/apiserver/pkg/server/routes"
- "k8s.io/apiserver/pkg/util/flushwriter"
- "k8s.io/component-base/logs"
- compbasemetrics "k8s.io/component-base/metrics"
- "k8s.io/kubernetes/pkg/api/legacyscheme"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/pkg/apis/core/v1/validation"
- "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
- podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
- "k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/prober"
- servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
- "k8s.io/kubernetes/pkg/kubelet/server/portforward"
- remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
- "k8s.io/kubernetes/pkg/kubelet/server/stats"
- "k8s.io/kubernetes/pkg/kubelet/server/streaming"
- kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
- "k8s.io/kubernetes/pkg/kubelet/util"
- "k8s.io/kubernetes/pkg/util/configz"
- )
- const (
- metricsPath = "/metrics"
- cadvisorMetricsPath = "/metrics/cadvisor"
- resourceMetricsPathPrefix = "/metrics/resource"
- proberMetricsPath = "/metrics/probes"
- specPath = "/spec/"
- statsPath = "/stats/"
- logsPath = "/logs/"
- )
- // Server is a http.Handler which exposes kubelet functionality over HTTP.
- type Server struct {
- auth AuthInterface
- host HostInterface
- restfulCont containerInterface
- resourceAnalyzer stats.ResourceAnalyzer
- redirectContainerStreaming bool
- }
- // TLSOptions holds the TLS options.
- type TLSOptions struct {
- Config *tls.Config
- CertFile string
- KeyFile string
- }
- // containerInterface defines the restful.Container functions used on the root container
- type containerInterface interface {
- Add(service *restful.WebService) *restful.Container
- Handle(path string, handler http.Handler)
- Filter(filter restful.FilterFunction)
- ServeHTTP(w http.ResponseWriter, r *http.Request)
- RegisteredWebServices() []*restful.WebService
- // RegisteredHandlePaths returns the paths of handlers registered directly with the container (non-web-services)
- // Used to test filters are being applied on non-web-service handlers
- RegisteredHandlePaths() []string
- }
- // filteringContainer delegates all Handle(...) calls to Container.HandleWithFilter(...),
- // so we can ensure restful.FilterFunctions are used for all handlers
- type filteringContainer struct {
- *restful.Container
- registeredHandlePaths []string
- }
- func (a *filteringContainer) Handle(path string, handler http.Handler) {
- a.HandleWithFilter(path, handler)
- a.registeredHandlePaths = append(a.registeredHandlePaths, path)
- }
- func (a *filteringContainer) RegisteredHandlePaths() []string {
- return a.registeredHandlePaths
- }
- // ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
- func ListenAndServeKubeletServer(
- host HostInterface,
- resourceAnalyzer stats.ResourceAnalyzer,
- address net.IP,
- port uint,
- tlsOptions *TLSOptions,
- auth AuthInterface,
- enableCAdvisorJSONEndpoints,
- enableDebuggingHandlers,
- enableContentionProfiling,
- redirectContainerStreaming bool,
- criHandler http.Handler) {
- klog.Infof("Starting to listen on %s:%d", address, port)
- handler := NewServer(host, resourceAnalyzer, auth, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, redirectContainerStreaming, criHandler)
- s := &http.Server{
- Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
- Handler: &handler,
- MaxHeaderBytes: 1 << 20,
- }
- if tlsOptions != nil {
- s.TLSConfig = tlsOptions.Config
- // Passing empty strings as the cert and key files means no
- // cert/keys are specified and GetCertificate in the TLSConfig
- // should be called instead.
- klog.Fatal(s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile))
- } else {
- klog.Fatal(s.ListenAndServe())
- }
- }
- // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
- func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, enableCAdvisorJSONEndpoints bool) {
- klog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
- s := NewServer(host, resourceAnalyzer, nil, enableCAdvisorJSONEndpoints, false, false, false, nil)
- server := &http.Server{
- Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
- Handler: &s,
- MaxHeaderBytes: 1 << 20,
- }
- klog.Fatal(server.ListenAndServe())
- }
- // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
- func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) {
- server := grpc.NewServer()
- podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider))
- l, err := util.CreateListener(socket)
- if err != nil {
- klog.Fatalf("Failed to create listener for podResources endpoint: %v", err)
- }
- klog.Fatal(server.Serve(l))
- }
- // AuthInterface contains all methods required by the auth filters
- type AuthInterface interface {
- authenticator.Request
- authorizer.RequestAttributesGetter
- authorizer.Authorizer
- }
- // HostInterface contains all the kubelet methods required by the server.
- // For testability.
- type HostInterface interface {
- stats.Provider
- GetVersionInfo() (*cadvisorapi.VersionInfo, error)
- GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
- GetRunningPods() ([]*v1.Pod, error)
- RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
- GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
- ServeLogs(w http.ResponseWriter, req *http.Request)
- ResyncInterval() time.Duration
- GetHostname() string
- LatestLoopEntryTime() time.Time
- GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error)
- GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error)
- GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
- }
- // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
- func NewServer(
- host HostInterface,
- resourceAnalyzer stats.ResourceAnalyzer,
- auth AuthInterface,
- enableCAdvisorJSONEndpoints,
- enableDebuggingHandlers,
- enableContentionProfiling,
- redirectContainerStreaming bool,
- criHandler http.Handler) Server {
- server := Server{
- host: host,
- resourceAnalyzer: resourceAnalyzer,
- auth: auth,
- restfulCont: &filteringContainer{Container: restful.NewContainer()},
- redirectContainerStreaming: redirectContainerStreaming,
- }
- if auth != nil {
- server.InstallAuthFilter()
- }
- server.InstallDefaultHandlers(enableCAdvisorJSONEndpoints)
- if enableDebuggingHandlers {
- server.InstallDebuggingHandlers(criHandler)
- if enableContentionProfiling {
- goruntime.SetBlockProfileRate(1)
- }
- } else {
- server.InstallDebuggingDisabledHandlers()
- }
- return server
- }
- // InstallAuthFilter installs authentication filters with the restful Container.
- func (s *Server) InstallAuthFilter() {
- s.restfulCont.Filter(func(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
- // Authenticate
- info, ok, err := s.auth.AuthenticateRequest(req.Request)
- if err != nil {
- klog.Errorf("Unable to authenticate the request due to an error: %v", err)
- resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
- return
- }
- if !ok {
- resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
- return
- }
- // Get authorization attributes
- attrs := s.auth.GetRequestAttributes(info.User, req.Request)
- // Authorize
- decision, _, err := s.auth.Authorize(attrs)
- if err != nil {
- msg := fmt.Sprintf("Authorization error (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
- klog.Errorf(msg, err)
- resp.WriteErrorString(http.StatusInternalServerError, msg)
- return
- }
- if decision != authorizer.DecisionAllow {
- msg := fmt.Sprintf("Forbidden (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
- klog.V(2).Info(msg)
- resp.WriteErrorString(http.StatusForbidden, msg)
- return
- }
- // Continue
- chain.ProcessFilter(req, resp)
- })
- }
- // InstallDefaultHandlers registers the default set of supported HTTP request
- // patterns with the restful Container.
- func (s *Server) InstallDefaultHandlers(enableCAdvisorJSONEndpoints bool) {
- healthz.InstallHandler(s.restfulCont,
- healthz.PingHealthz,
- healthz.LogHealthz,
- healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
- )
- ws := new(restful.WebService)
- ws.
- Path("/pods").
- Produces(restful.MIME_JSON)
- ws.Route(ws.GET("").
- To(s.getPods).
- Operation("getPods"))
- s.restfulCont.Add(ws)
- s.restfulCont.Add(stats.CreateHandlers(statsPath, s.host, s.resourceAnalyzer, enableCAdvisorJSONEndpoints))
- s.restfulCont.Handle(metricsPath, prometheus.Handler())
- // cAdvisor metrics are exposed under the secured handler as well
- r := prometheus.NewRegistry()
- includedMetrics := cadvisormetrics.MetricSet{
- cadvisormetrics.CpuUsageMetrics: struct{}{},
- cadvisormetrics.MemoryUsageMetrics: struct{}{},
- cadvisormetrics.CpuLoadMetrics: struct{}{},
- cadvisormetrics.DiskIOMetrics: struct{}{},
- cadvisormetrics.DiskUsageMetrics: struct{}{},
- cadvisormetrics.NetworkUsageMetrics: struct{}{},
- cadvisormetrics.AcceleratorUsageMetrics: struct{}{},
- cadvisormetrics.AppMetrics: struct{}{},
- }
- r.MustRegister(metrics.NewPrometheusCollector(prometheusHostAdapter{s.host}, containerPrometheusLabelsFunc(s.host), includedMetrics))
- s.restfulCont.Handle(cadvisorMetricsPath,
- promhttp.HandlerFor(r, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}),
- )
- v1alpha1ResourceRegistry := prometheus.NewRegistry()
- v1alpha1ResourceRegistry.MustRegister(stats.NewPrometheusResourceMetricCollector(s.resourceAnalyzer, v1alpha1.Config()))
- s.restfulCont.Handle(path.Join(resourceMetricsPathPrefix, v1alpha1.Version),
- promhttp.HandlerFor(v1alpha1ResourceRegistry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}),
- )
- // prober metrics are exposed under a different endpoint
- p := prometheus.NewRegistry()
- compbasemetrics.RegisterProcessStartTime(p)
- p.MustRegister(prober.ProberResults)
- s.restfulCont.Handle(proberMetricsPath,
- promhttp.HandlerFor(p, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}),
- )
- if enableCAdvisorJSONEndpoints {
- ws := new(restful.WebService)
- ws.
- Path(specPath).
- Produces(restful.MIME_JSON)
- ws.Route(ws.GET("").
- To(s.getSpec).
- Operation("getSpec").
- Writes(cadvisorapi.MachineInfo{}))
- s.restfulCont.Add(ws)
- }
- }
- const pprofBasePath = "/debug/pprof/"
- // InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
- func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
- klog.Infof("Adding debug handlers to kubelet server.")
- ws := new(restful.WebService)
- ws.
- Path("/run")
- ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
- To(s.getRun).
- Operation("getRun"))
- ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
- To(s.getRun).
- Operation("getRun"))
- s.restfulCont.Add(ws)
- ws = new(restful.WebService)
- ws.
- Path("/exec")
- ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
- To(s.getExec).
- Operation("getExec"))
- ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
- To(s.getExec).
- Operation("getExec"))
- ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
- To(s.getExec).
- Operation("getExec"))
- ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
- To(s.getExec).
- Operation("getExec"))
- s.restfulCont.Add(ws)
- ws = new(restful.WebService)
- ws.
- Path("/attach")
- ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
- To(s.getAttach).
- Operation("getAttach"))
- ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
- To(s.getAttach).
- Operation("getAttach"))
- ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
- To(s.getAttach).
- Operation("getAttach"))
- ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
- To(s.getAttach).
- Operation("getAttach"))
- s.restfulCont.Add(ws)
- ws = new(restful.WebService)
- ws.
- Path("/portForward")
- ws.Route(ws.GET("/{podNamespace}/{podID}").
- To(s.getPortForward).
- Operation("getPortForward"))
- ws.Route(ws.POST("/{podNamespace}/{podID}").
- To(s.getPortForward).
- Operation("getPortForward"))
- ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}").
- To(s.getPortForward).
- Operation("getPortForward"))
- ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
- To(s.getPortForward).
- Operation("getPortForward"))
- s.restfulCont.Add(ws)
- ws = new(restful.WebService)
- ws.
- Path(logsPath)
- ws.Route(ws.GET("").
- To(s.getLogs).
- Operation("getLogs"))
- ws.Route(ws.GET("/{logpath:*}").
- To(s.getLogs).
- Operation("getLogs").
- Param(ws.PathParameter("logpath", "path to the log").DataType("string")))
- s.restfulCont.Add(ws)
- ws = new(restful.WebService)
- ws.
- Path("/containerLogs")
- ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
- To(s.getContainerLogs).
- Operation("getContainerLogs"))
- s.restfulCont.Add(ws)
- configz.InstallHandler(s.restfulCont)
- handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {
- name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)
- switch name {
- case "profile":
- pprof.Profile(resp, req.Request)
- case "symbol":
- pprof.Symbol(resp, req.Request)
- case "cmdline":
- pprof.Cmdline(resp, req.Request)
- case "trace":
- pprof.Trace(resp, req.Request)
- default:
- pprof.Index(resp, req.Request)
- }
- }
- // Setup pprof handlers.
- ws = new(restful.WebService).Path(pprofBasePath)
- ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
- handlePprofEndpoint(req, resp)
- })).Doc("pprof endpoint")
- s.restfulCont.Add(ws)
- // Setup flags handlers.
- // so far, only logging related endpoints are considered valid to add for these debug flags.
- s.restfulCont.Handle("/debug/flags/v", routes.StringFlagPutHandler(logs.GlogSetter))
- // The /runningpods endpoint is used for testing only.
- ws = new(restful.WebService)
- ws.
- Path("/runningpods/").
- Produces(restful.MIME_JSON)
- ws.Route(ws.GET("").
- To(s.getRunningPods).
- Operation("getRunningPods"))
- s.restfulCont.Add(ws)
- if criHandler != nil {
- s.restfulCont.Handle("/cri/", criHandler)
- }
- }
- // InstallDebuggingDisabledHandlers registers the HTTP request patterns that provide better error message
- func (s *Server) InstallDebuggingDisabledHandlers() {
- h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- http.Error(w, "Debug endpoints are disabled.", http.StatusMethodNotAllowed)
- })
- paths := []string{
- "/run/", "/exec/", "/attach/", "/portForward/", "/containerLogs/",
- "/runningpods/", pprofBasePath, logsPath}
- for _, p := range paths {
- s.restfulCont.Handle(p, h)
- }
- }
- // Checks if kubelet's sync loop that updates containers is working.
- func (s *Server) syncLoopHealthCheck(req *http.Request) error {
- duration := s.host.ResyncInterval() * 2
- minDuration := time.Minute * 5
- if duration < minDuration {
- duration = minDuration
- }
- enterLoopTime := s.host.LatestLoopEntryTime()
- if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
- return fmt.Errorf("sync Loop took longer than expected")
- }
- return nil
- }
- // getContainerLogs handles containerLogs request against the Kubelet
- func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
- podNamespace := request.PathParameter("podNamespace")
- podID := request.PathParameter("podID")
- containerName := request.PathParameter("containerName")
- ctx := request.Request.Context()
- if len(podID) == 0 {
- // TODO: Why return JSON when the rest return plaintext errors?
- // TODO: Why return plaintext errors?
- response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podID."}`))
- return
- }
- if len(containerName) == 0 {
- // TODO: Why return JSON when the rest return plaintext errors?
- response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing container name."}`))
- return
- }
- if len(podNamespace) == 0 {
- // TODO: Why return JSON when the rest return plaintext errors?
- response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podNamespace."}`))
- return
- }
- query := request.Request.URL.Query()
- // backwards compatibility for the "tail" query parameter
- if tail := request.QueryParameter("tail"); len(tail) > 0 {
- query["tailLines"] = []string{tail}
- // "all" is the same as omitting tail
- if tail == "all" {
- delete(query, "tailLines")
- }
- }
- // container logs on the kubelet are locked to the v1 API version of PodLogOptions
- logOptions := &v1.PodLogOptions{}
- if err := legacyscheme.ParameterCodec.DecodeParameters(query, v1.SchemeGroupVersion, logOptions); err != nil {
- response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
- return
- }
- logOptions.TypeMeta = metav1.TypeMeta{}
- if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
- response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
- return
- }
- pod, ok := s.host.GetPodByName(podNamespace, podID)
- if !ok {
- response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist", podID))
- return
- }
- // Check if containerName is valid.
- containerExists := false
- for _, container := range pod.Spec.Containers {
- if container.Name == containerName {
- containerExists = true
- break
- }
- }
- if !containerExists {
- for _, container := range pod.Spec.InitContainers {
- if container.Name == containerName {
- containerExists = true
- break
- }
- }
- }
- if !containerExists {
- response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q", containerName, podID))
- return
- }
- if _, ok := response.ResponseWriter.(http.Flusher); !ok {
- response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
- return
- }
- fw := flushwriter.Wrap(response.ResponseWriter)
- response.Header().Set("Transfer-Encoding", "chunked")
- if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
- response.WriteError(http.StatusBadRequest, err)
- return
- }
- }
- // encodePods creates an v1.PodList object from pods and returns the encoded
- // PodList.
- func encodePods(pods []*v1.Pod) (data []byte, err error) {
- podList := new(v1.PodList)
- for _, pod := range pods {
- podList.Items = append(podList.Items, *pod)
- }
- // TODO: this needs to be parameterized to the kubelet, not hardcoded. Depends on Kubelet
- // as API server refactor.
- // TODO: Locked to v1, needs to be made generic
- codec := legacyscheme.Codecs.LegacyCodec(schema.GroupVersion{Group: v1.GroupName, Version: "v1"})
- return runtime.Encode(codec, podList)
- }
- // getPods returns a list of pods bound to the Kubelet and their spec.
- func (s *Server) getPods(request *restful.Request, response *restful.Response) {
- pods := s.host.GetPods()
- data, err := encodePods(pods)
- if err != nil {
- response.WriteError(http.StatusInternalServerError, err)
- return
- }
- writeJSONResponse(response, data)
- }
- // getRunningPods returns a list of pods running on Kubelet. The list is
- // provided by the container runtime, and is different from the list returned
- // by getPods, which is a set of desired pods to run.
- func (s *Server) getRunningPods(request *restful.Request, response *restful.Response) {
- pods, err := s.host.GetRunningPods()
- if err != nil {
- response.WriteError(http.StatusInternalServerError, err)
- return
- }
- data, err := encodePods(pods)
- if err != nil {
- response.WriteError(http.StatusInternalServerError, err)
- return
- }
- writeJSONResponse(response, data)
- }
- // getLogs handles logs requests against the Kubelet.
- func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
- s.host.ServeLogs(response, request.Request)
- }
- // getSpec handles spec requests against the Kubelet.
- func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
- info, err := s.host.GetCachedMachineInfo()
- if err != nil {
- response.WriteError(http.StatusInternalServerError, err)
- return
- }
- response.WriteEntity(info)
- }
- type execRequestParams struct {
- podNamespace string
- podName string
- podUID types.UID
- containerName string
- cmd []string
- }
- func getExecRequestParams(req *restful.Request) execRequestParams {
- return execRequestParams{
- podNamespace: req.PathParameter("podNamespace"),
- podName: req.PathParameter("podID"),
- podUID: types.UID(req.PathParameter("uid")),
- containerName: req.PathParameter("containerName"),
- cmd: req.Request.URL.Query()[api.ExecCommandParam],
- }
- }
- type portForwardRequestParams struct {
- podNamespace string
- podName string
- podUID types.UID
- }
- func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams {
- return portForwardRequestParams{
- podNamespace: req.PathParameter("podNamespace"),
- podName: req.PathParameter("podID"),
- podUID: types.UID(req.PathParameter("uid")),
- }
- }
- type responder struct {
- errorMessage string
- }
- func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
- klog.Errorf("Error while proxying request: %v", err)
- http.Error(w, err.Error(), http.StatusInternalServerError)
- }
- // proxyStream proxies stream to url.
- func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
- // TODO(random-liu): Set MaxBytesPerSec to throttle the stream.
- handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{})
- handler.ServeHTTP(w, r)
- }
- // getAttach handles requests to attach to a container.
- func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
- params := getExecRequestParams(request)
- streamOpts, err := remotecommandserver.NewOptions(request.Request)
- if err != nil {
- utilruntime.HandleError(err)
- response.WriteError(http.StatusBadRequest, err)
- return
- }
- pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
- if !ok {
- response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
- return
- }
- podFullName := kubecontainer.GetPodFullName(pod)
- url, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts)
- if err != nil {
- streaming.WriteError(err, response.ResponseWriter)
- return
- }
- if s.redirectContainerStreaming {
- http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
- return
- }
- proxyStream(response.ResponseWriter, request.Request, url)
- }
- // getExec handles requests to run a command inside a container.
- func (s *Server) getExec(request *restful.Request, response *restful.Response) {
- params := getExecRequestParams(request)
- streamOpts, err := remotecommandserver.NewOptions(request.Request)
- if err != nil {
- utilruntime.HandleError(err)
- response.WriteError(http.StatusBadRequest, err)
- return
- }
- pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
- if !ok {
- response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
- return
- }
- podFullName := kubecontainer.GetPodFullName(pod)
- url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
- if err != nil {
- streaming.WriteError(err, response.ResponseWriter)
- return
- }
- if s.redirectContainerStreaming {
- http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
- return
- }
- proxyStream(response.ResponseWriter, request.Request, url)
- }
- // getRun handles requests to run a command inside a container.
- func (s *Server) getRun(request *restful.Request, response *restful.Response) {
- params := getExecRequestParams(request)
- pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
- if !ok {
- response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
- return
- }
- // For legacy reasons, run uses different query param than exec.
- params.cmd = strings.Split(request.QueryParameter("cmd"), " ")
- data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), params.podUID, params.containerName, params.cmd)
- if err != nil {
- response.WriteError(http.StatusInternalServerError, err)
- return
- }
- writeJSONResponse(response, data)
- }
- // Derived from go-restful writeJSON.
- func writeJSONResponse(response *restful.Response, data []byte) {
- if data == nil {
- response.WriteHeader(http.StatusOK)
- // do not write a nil representation
- return
- }
- response.Header().Set(restful.HEADER_ContentType, restful.MIME_JSON)
- response.WriteHeader(http.StatusOK)
- if _, err := response.Write(data); err != nil {
- klog.Errorf("Error writing response: %v", err)
- }
- }
- // getPortForward handles a new restful port forward request. It determines the
- // pod name and uid and then calls ServePortForward.
- func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
- params := getPortForwardRequestParams(request)
- portForwardOptions, err := portforward.NewV4Options(request.Request)
- if err != nil {
- utilruntime.HandleError(err)
- response.WriteError(http.StatusBadRequest, err)
- return
- }
- pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
- if !ok {
- response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
- return
- }
- if len(params.podUID) > 0 && pod.UID != params.podUID {
- response.WriteError(http.StatusNotFound, fmt.Errorf("pod not found"))
- return
- }
- url, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
- if err != nil {
- streaming.WriteError(err, response.ResponseWriter)
- return
- }
- if s.redirectContainerStreaming {
- http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
- return
- }
- proxyStream(response.ResponseWriter, request.Request, url)
- }
- // trimURLPath trims a URL path.
- // For paths in the format of "/metrics/xxx", "metrics/xxx" is returned;
- // For all other paths, the first part of the path is returned.
- func trimURLPath(path string) string {
- parts := strings.SplitN(strings.TrimPrefix(path, "/"), "/", 3)
- if len(parts) == 0 {
- return path
- }
- if parts[0] == "metrics" && len(parts) > 1 {
- return fmt.Sprintf("%s/%s", parts[0], parts[1])
- }
- return parts[0]
- }
- var longRunningRequestPathMap = map[string]bool{
- "exec": true,
- "attach": true,
- "portforward": true,
- "debug": true,
- }
- // isLongRunningRequest determines whether the request is long-running or not.
- func isLongRunningRequest(path string) bool {
- _, ok := longRunningRequestPathMap[path]
- return ok
- }
- var statusesNoTracePred = httplog.StatusIsNot(
- http.StatusOK,
- http.StatusFound,
- http.StatusMovedPermanently,
- http.StatusTemporaryRedirect,
- http.StatusBadRequest,
- http.StatusNotFound,
- http.StatusSwitchingProtocols,
- )
- // ServeHTTP responds to HTTP requests on the Kubelet.
- func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- defer httplog.NewLogged(req, &w).StacktraceWhen(statusesNoTracePred).Log()
- // monitor http requests
- var serverType string
- if s.auth == nil {
- serverType = "readonly"
- } else {
- serverType = "readwrite"
- }
- method, path := req.Method, trimURLPath(req.URL.Path)
- longRunning := strconv.FormatBool(isLongRunningRequest(path))
- servermetrics.HTTPRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
- servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
- defer servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Dec()
- startTime := time.Now()
- defer servermetrics.HTTPRequestsDuration.WithLabelValues(method, path, serverType, longRunning).Observe(servermetrics.SinceInSeconds(startTime))
- s.restfulCont.ServeHTTP(w, req)
- }
- // prometheusHostAdapter adapts the HostInterface to the interface expected by the
- // cAdvisor prometheus collector.
- type prometheusHostAdapter struct {
- host HostInterface
- }
- func (a prometheusHostAdapter) SubcontainersInfo(containerName string, query *cadvisorapi.ContainerInfoRequest) ([]*cadvisorapi.ContainerInfo, error) {
- all, err := a.host.GetRawContainerInfo(containerName, query, true)
- items := make([]*cadvisorapi.ContainerInfo, 0, len(all))
- for _, v := range all {
- items = append(items, v)
- }
- return items, err
- }
- func (a prometheusHostAdapter) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
- return a.host.GetVersionInfo()
- }
- func (a prometheusHostAdapter) GetMachineInfo() (*cadvisorapi.MachineInfo, error) {
- return a.host.GetCachedMachineInfo()
- }
- func containerPrometheusLabelsFunc(s stats.Provider) metrics.ContainerLabelsFunc {
- // containerPrometheusLabels maps cAdvisor labels to prometheus labels.
- return func(c *cadvisorapi.ContainerInfo) map[string]string {
- // Prometheus requires that all metrics in the same family have the same labels,
- // so we arrange to supply blank strings for missing labels
- var name, image, podName, namespace, containerName string
- if len(c.Aliases) > 0 {
- name = c.Aliases[0]
- }
- image = c.Spec.Image
- if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNameLabel]; ok {
- podName = v
- }
- if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNamespaceLabel]; ok {
- namespace = v
- }
- if v, ok := c.Spec.Labels[kubelettypes.KubernetesContainerNameLabel]; ok {
- containerName = v
- }
- // Associate pod cgroup with pod so we have an accurate accounting of sandbox
- if podName == "" && namespace == "" {
- if pod, found := s.GetPodByCgroupfs(c.Name); found {
- podName = pod.Name
- namespace = pod.Namespace
- }
- }
- set := map[string]string{
- metrics.LabelID: c.Name,
- metrics.LabelName: name,
- metrics.LabelImage: image,
- "pod_name": podName,
- "pod": podName,
- "namespace": namespace,
- "container_name": containerName,
- "container": containerName,
- }
- return set
- }
- }
|