server.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package server
  14. import (
  15. "context"
  16. "crypto/tls"
  17. "fmt"
  18. "io"
  19. "net"
  20. "net/http"
  21. "net/http/pprof"
  22. "net/url"
  23. "path"
  24. "reflect"
  25. goruntime "runtime"
  26. "strconv"
  27. "strings"
  28. "time"
  29. "github.com/emicklei/go-restful"
  30. cadvisormetrics "github.com/google/cadvisor/container"
  31. cadvisorapi "github.com/google/cadvisor/info/v1"
  32. "github.com/google/cadvisor/metrics"
  33. "google.golang.org/grpc"
  34. "k8s.io/klog"
  35. "k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
  36. "k8s.io/api/core/v1"
  37. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  38. "k8s.io/apimachinery/pkg/runtime"
  39. "k8s.io/apimachinery/pkg/runtime/schema"
  40. "k8s.io/apimachinery/pkg/types"
  41. "k8s.io/apimachinery/pkg/util/proxy"
  42. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  43. "k8s.io/apiserver/pkg/authentication/authenticator"
  44. "k8s.io/apiserver/pkg/authorization/authorizer"
  45. "k8s.io/apiserver/pkg/server/healthz"
  46. "k8s.io/apiserver/pkg/server/httplog"
  47. "k8s.io/apiserver/pkg/server/routes"
  48. "k8s.io/apiserver/pkg/util/flushwriter"
  49. "k8s.io/component-base/logs"
  50. compbasemetrics "k8s.io/component-base/metrics"
  51. "k8s.io/component-base/metrics/legacyregistry"
  52. "k8s.io/kubernetes/pkg/api/legacyscheme"
  53. api "k8s.io/kubernetes/pkg/apis/core"
  54. "k8s.io/kubernetes/pkg/apis/core/v1/validation"
  55. "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
  56. podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
  57. "k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1"
  58. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  59. "k8s.io/kubernetes/pkg/kubelet/prober"
  60. servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
  61. "k8s.io/kubernetes/pkg/kubelet/server/portforward"
  62. remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
  63. "k8s.io/kubernetes/pkg/kubelet/server/stats"
  64. "k8s.io/kubernetes/pkg/kubelet/server/streaming"
  65. kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
  66. "k8s.io/kubernetes/pkg/kubelet/util"
  67. "k8s.io/kubernetes/pkg/util/configz"
  68. )
  69. const (
  70. metricsPath = "/metrics"
  71. cadvisorMetricsPath = "/metrics/cadvisor"
  72. resourceMetricsPath = "/metrics/resource"
  73. proberMetricsPath = "/metrics/probes"
  74. specPath = "/spec/"
  75. statsPath = "/stats/"
  76. logsPath = "/logs/"
  77. )
  78. // Server is a http.Handler which exposes kubelet functionality over HTTP.
  79. type Server struct {
  80. auth AuthInterface
  81. host HostInterface
  82. restfulCont containerInterface
  83. metricsBuckets map[string]bool
  84. resourceAnalyzer stats.ResourceAnalyzer
  85. redirectContainerStreaming bool
  86. }
  87. // TLSOptions holds the TLS options.
  88. type TLSOptions struct {
  89. Config *tls.Config
  90. CertFile string
  91. KeyFile string
  92. }
  93. // containerInterface defines the restful.Container functions used on the root container
  94. type containerInterface interface {
  95. Add(service *restful.WebService) *restful.Container
  96. Handle(path string, handler http.Handler)
  97. Filter(filter restful.FilterFunction)
  98. ServeHTTP(w http.ResponseWriter, r *http.Request)
  99. RegisteredWebServices() []*restful.WebService
  100. // RegisteredHandlePaths returns the paths of handlers registered directly with the container (non-web-services)
  101. // Used to test filters are being applied on non-web-service handlers
  102. RegisteredHandlePaths() []string
  103. }
  104. // filteringContainer delegates all Handle(...) calls to Container.HandleWithFilter(...),
  105. // so we can ensure restful.FilterFunctions are used for all handlers
  106. type filteringContainer struct {
  107. *restful.Container
  108. registeredHandlePaths []string
  109. }
  110. func (a *filteringContainer) Handle(path string, handler http.Handler) {
  111. a.HandleWithFilter(path, handler)
  112. a.registeredHandlePaths = append(a.registeredHandlePaths, path)
  113. }
  114. func (a *filteringContainer) RegisteredHandlePaths() []string {
  115. return a.registeredHandlePaths
  116. }
  117. // ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
  118. func ListenAndServeKubeletServer(
  119. host HostInterface,
  120. resourceAnalyzer stats.ResourceAnalyzer,
  121. address net.IP,
  122. port uint,
  123. tlsOptions *TLSOptions,
  124. auth AuthInterface,
  125. enableCAdvisorJSONEndpoints,
  126. enableDebuggingHandlers,
  127. enableContentionProfiling,
  128. redirectContainerStreaming bool,
  129. criHandler http.Handler) {
  130. klog.Infof("Starting to listen on %s:%d", address, port)
  131. handler := NewServer(host, resourceAnalyzer, auth, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, redirectContainerStreaming, criHandler)
  132. s := &http.Server{
  133. Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
  134. Handler: &handler,
  135. ReadTimeout: 4 * 60 * time.Minute,
  136. WriteTimeout: 4 * 60 * time.Minute,
  137. MaxHeaderBytes: 1 << 20,
  138. }
  139. if tlsOptions != nil {
  140. s.TLSConfig = tlsOptions.Config
  141. // Passing empty strings as the cert and key files means no
  142. // cert/keys are specified and GetCertificate in the TLSConfig
  143. // should be called instead.
  144. klog.Fatal(s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile))
  145. } else {
  146. klog.Fatal(s.ListenAndServe())
  147. }
  148. }
  149. // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
  150. func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, enableCAdvisorJSONEndpoints bool) {
  151. klog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
  152. s := NewServer(host, resourceAnalyzer, nil, enableCAdvisorJSONEndpoints, false, false, false, nil)
  153. server := &http.Server{
  154. Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
  155. Handler: &s,
  156. MaxHeaderBytes: 1 << 20,
  157. }
  158. klog.Fatal(server.ListenAndServe())
  159. }
  160. // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
  161. func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) {
  162. server := grpc.NewServer()
  163. podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider))
  164. l, err := util.CreateListener(socket)
  165. if err != nil {
  166. klog.Fatalf("Failed to create listener for podResources endpoint: %v", err)
  167. }
  168. klog.Fatal(server.Serve(l))
  169. }
  170. // AuthInterface contains all methods required by the auth filters
  171. type AuthInterface interface {
  172. authenticator.Request
  173. authorizer.RequestAttributesGetter
  174. authorizer.Authorizer
  175. }
  176. // HostInterface contains all the kubelet methods required by the server.
  177. // For testability.
  178. type HostInterface interface {
  179. stats.Provider
  180. GetVersionInfo() (*cadvisorapi.VersionInfo, error)
  181. GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
  182. GetRunningPods() ([]*v1.Pod, error)
  183. RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
  184. GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
  185. ServeLogs(w http.ResponseWriter, req *http.Request)
  186. ResyncInterval() time.Duration
  187. GetHostname() string
  188. LatestLoopEntryTime() time.Time
  189. GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error)
  190. GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error)
  191. GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
  192. }
  193. // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
  194. func NewServer(
  195. host HostInterface,
  196. resourceAnalyzer stats.ResourceAnalyzer,
  197. auth AuthInterface,
  198. enableCAdvisorJSONEndpoints,
  199. enableDebuggingHandlers,
  200. enableContentionProfiling,
  201. redirectContainerStreaming bool,
  202. criHandler http.Handler) Server {
  203. server := Server{
  204. host: host,
  205. resourceAnalyzer: resourceAnalyzer,
  206. auth: auth,
  207. restfulCont: &filteringContainer{Container: restful.NewContainer()},
  208. metricsBuckets: make(map[string]bool),
  209. redirectContainerStreaming: redirectContainerStreaming,
  210. }
  211. if auth != nil {
  212. server.InstallAuthFilter()
  213. }
  214. server.InstallDefaultHandlers(enableCAdvisorJSONEndpoints)
  215. if enableDebuggingHandlers {
  216. server.InstallDebuggingHandlers(criHandler)
  217. if enableContentionProfiling {
  218. goruntime.SetBlockProfileRate(1)
  219. }
  220. } else {
  221. server.InstallDebuggingDisabledHandlers()
  222. }
  223. return server
  224. }
  225. // InstallAuthFilter installs authentication filters with the restful Container.
  226. func (s *Server) InstallAuthFilter() {
  227. s.restfulCont.Filter(func(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
  228. // Authenticate
  229. info, ok, err := s.auth.AuthenticateRequest(req.Request)
  230. if err != nil {
  231. klog.Errorf("Unable to authenticate the request due to an error: %v", err)
  232. resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
  233. return
  234. }
  235. if !ok {
  236. resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
  237. return
  238. }
  239. // Get authorization attributes
  240. attrs := s.auth.GetRequestAttributes(info.User, req.Request)
  241. // Authorize
  242. decision, _, err := s.auth.Authorize(req.Request.Context(), attrs)
  243. if err != nil {
  244. msg := fmt.Sprintf("Authorization error (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
  245. klog.Errorf(msg, err)
  246. resp.WriteErrorString(http.StatusInternalServerError, msg)
  247. return
  248. }
  249. if decision != authorizer.DecisionAllow {
  250. msg := fmt.Sprintf("Forbidden (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
  251. klog.V(2).Info(msg)
  252. resp.WriteErrorString(http.StatusForbidden, msg)
  253. return
  254. }
  255. // Continue
  256. chain.ProcessFilter(req, resp)
  257. })
  258. }
  259. // addMetricsBucketMatcher adds a regexp matcher and the relevant bucket to use when
  260. // it matches. Please be aware this is not thread safe and should not be used dynamically
  261. func (s *Server) addMetricsBucketMatcher(bucket string) {
  262. s.metricsBuckets[bucket] = true
  263. }
  264. // getMetricBucket find the appropriate metrics reporting bucket for the given path
  265. func (s *Server) getMetricBucket(path string) string {
  266. root := getURLRootPath(path)
  267. if s.metricsBuckets[root] == true {
  268. return root
  269. }
  270. return "Invalid path"
  271. }
  272. // InstallDefaultHandlers registers the default set of supported HTTP request
  273. // patterns with the restful Container.
  274. func (s *Server) InstallDefaultHandlers(enableCAdvisorJSONEndpoints bool) {
  275. s.addMetricsBucketMatcher("healthz")
  276. healthz.InstallHandler(s.restfulCont,
  277. healthz.PingHealthz,
  278. healthz.LogHealthz,
  279. healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
  280. )
  281. s.addMetricsBucketMatcher("pods")
  282. ws := new(restful.WebService)
  283. ws.
  284. Path("/pods").
  285. Produces(restful.MIME_JSON)
  286. ws.Route(ws.GET("").
  287. To(s.getPods).
  288. Operation("getPods"))
  289. s.restfulCont.Add(ws)
  290. s.addMetricsBucketMatcher("stats")
  291. s.restfulCont.Add(stats.CreateHandlers(statsPath, s.host, s.resourceAnalyzer, enableCAdvisorJSONEndpoints))
  292. s.addMetricsBucketMatcher("metrics")
  293. s.addMetricsBucketMatcher("metrics/cadvisor")
  294. s.addMetricsBucketMatcher("metrics/probes")
  295. s.addMetricsBucketMatcher("metrics/resource/v1alpha1")
  296. s.addMetricsBucketMatcher("metrics/resource")
  297. //lint:ignore SA1019 https://github.com/kubernetes/enhancements/issues/1206
  298. s.restfulCont.Handle(metricsPath, legacyregistry.Handler())
  299. // cAdvisor metrics are exposed under the secured handler as well
  300. r := compbasemetrics.NewKubeRegistry()
  301. includedMetrics := cadvisormetrics.MetricSet{
  302. cadvisormetrics.CpuUsageMetrics: struct{}{},
  303. cadvisormetrics.MemoryUsageMetrics: struct{}{},
  304. cadvisormetrics.CpuLoadMetrics: struct{}{},
  305. cadvisormetrics.DiskIOMetrics: struct{}{},
  306. cadvisormetrics.DiskUsageMetrics: struct{}{},
  307. cadvisormetrics.NetworkUsageMetrics: struct{}{},
  308. cadvisormetrics.AcceleratorUsageMetrics: struct{}{},
  309. cadvisormetrics.AppMetrics: struct{}{},
  310. cadvisormetrics.ProcessMetrics: struct{}{},
  311. }
  312. r.RawMustRegister(metrics.NewPrometheusCollector(prometheusHostAdapter{s.host}, containerPrometheusLabelsFunc(s.host), includedMetrics))
  313. s.restfulCont.Handle(cadvisorMetricsPath,
  314. compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
  315. )
  316. // deprecated endpoint which will be removed in release 1.20.0+.
  317. s.addMetricsBucketMatcher("metrics/resource/v1alpha1")
  318. v1alpha1ResourceRegistry := compbasemetrics.NewKubeRegistry()
  319. v1alpha1ResourceRegistry.CustomMustRegister(stats.NewPrometheusResourceMetricCollector(s.resourceAnalyzer, v1alpha1.Config()))
  320. s.restfulCont.Handle(path.Join(resourceMetricsPath, v1alpha1.Version),
  321. compbasemetrics.HandlerFor(v1alpha1ResourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
  322. )
  323. s.addMetricsBucketMatcher("metrics/resource")
  324. resourceRegistry := compbasemetrics.NewKubeRegistry()
  325. resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(s.resourceAnalyzer))
  326. s.restfulCont.Handle(resourceMetricsPath,
  327. compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
  328. )
  329. // prober metrics are exposed under a different endpoint
  330. s.addMetricsBucketMatcher("metrics/probes")
  331. p := compbasemetrics.NewKubeRegistry()
  332. _ = compbasemetrics.RegisterProcessStartTime(p.Register)
  333. p.MustRegister(prober.ProberResults)
  334. s.restfulCont.Handle(proberMetricsPath,
  335. compbasemetrics.HandlerFor(p, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
  336. )
  337. s.addMetricsBucketMatcher("spec")
  338. if enableCAdvisorJSONEndpoints {
  339. ws := new(restful.WebService)
  340. ws.
  341. Path(specPath).
  342. Produces(restful.MIME_JSON)
  343. ws.Route(ws.GET("").
  344. To(s.getSpec).
  345. Operation("getSpec").
  346. Writes(cadvisorapi.MachineInfo{}))
  347. s.restfulCont.Add(ws)
  348. }
  349. }
  350. const pprofBasePath = "/debug/pprof/"
  351. // InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
  352. func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
  353. klog.Infof("Adding debug handlers to kubelet server.")
  354. s.addMetricsBucketMatcher("run")
  355. ws := new(restful.WebService)
  356. ws.
  357. Path("/run")
  358. ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  359. To(s.getRun).
  360. Operation("getRun"))
  361. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  362. To(s.getRun).
  363. Operation("getRun"))
  364. s.restfulCont.Add(ws)
  365. s.addMetricsBucketMatcher("exec")
  366. ws = new(restful.WebService)
  367. ws.
  368. Path("/exec")
  369. ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  370. To(s.getExec).
  371. Operation("getExec"))
  372. ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  373. To(s.getExec).
  374. Operation("getExec"))
  375. ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
  376. To(s.getExec).
  377. Operation("getExec"))
  378. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  379. To(s.getExec).
  380. Operation("getExec"))
  381. s.restfulCont.Add(ws)
  382. s.addMetricsBucketMatcher("attach")
  383. ws = new(restful.WebService)
  384. ws.
  385. Path("/attach")
  386. ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  387. To(s.getAttach).
  388. Operation("getAttach"))
  389. ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  390. To(s.getAttach).
  391. Operation("getAttach"))
  392. ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
  393. To(s.getAttach).
  394. Operation("getAttach"))
  395. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  396. To(s.getAttach).
  397. Operation("getAttach"))
  398. s.restfulCont.Add(ws)
  399. s.addMetricsBucketMatcher("portForward")
  400. ws = new(restful.WebService)
  401. ws.
  402. Path("/portForward")
  403. ws.Route(ws.GET("/{podNamespace}/{podID}").
  404. To(s.getPortForward).
  405. Operation("getPortForward"))
  406. ws.Route(ws.POST("/{podNamespace}/{podID}").
  407. To(s.getPortForward).
  408. Operation("getPortForward"))
  409. ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}").
  410. To(s.getPortForward).
  411. Operation("getPortForward"))
  412. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
  413. To(s.getPortForward).
  414. Operation("getPortForward"))
  415. s.restfulCont.Add(ws)
  416. s.addMetricsBucketMatcher("logs")
  417. ws = new(restful.WebService)
  418. ws.
  419. Path(logsPath)
  420. ws.Route(ws.GET("").
  421. To(s.getLogs).
  422. Operation("getLogs"))
  423. ws.Route(ws.GET("/{logpath:*}").
  424. To(s.getLogs).
  425. Operation("getLogs").
  426. Param(ws.PathParameter("logpath", "path to the log").DataType("string")))
  427. s.restfulCont.Add(ws)
  428. s.addMetricsBucketMatcher("containerLogs")
  429. ws = new(restful.WebService)
  430. ws.
  431. Path("/containerLogs")
  432. ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  433. To(s.getContainerLogs).
  434. Operation("getContainerLogs"))
  435. s.restfulCont.Add(ws)
  436. s.addMetricsBucketMatcher("configz")
  437. configz.InstallHandler(s.restfulCont)
  438. s.addMetricsBucketMatcher("debug")
  439. handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {
  440. name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)
  441. switch name {
  442. case "profile":
  443. pprof.Profile(resp, req.Request)
  444. case "symbol":
  445. pprof.Symbol(resp, req.Request)
  446. case "cmdline":
  447. pprof.Cmdline(resp, req.Request)
  448. case "trace":
  449. pprof.Trace(resp, req.Request)
  450. default:
  451. pprof.Index(resp, req.Request)
  452. }
  453. }
  454. // Setup pprof handlers.
  455. ws = new(restful.WebService).Path(pprofBasePath)
  456. ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
  457. handlePprofEndpoint(req, resp)
  458. })).Doc("pprof endpoint")
  459. s.restfulCont.Add(ws)
  460. // Setup flags handlers.
  461. // so far, only logging related endpoints are considered valid to add for these debug flags.
  462. s.restfulCont.Handle("/debug/flags/v", routes.StringFlagPutHandler(logs.GlogSetter))
  463. // The /runningpods endpoint is used for testing only.
  464. s.addMetricsBucketMatcher("runningpods")
  465. ws = new(restful.WebService)
  466. ws.
  467. Path("/runningpods/").
  468. Produces(restful.MIME_JSON)
  469. ws.Route(ws.GET("").
  470. To(s.getRunningPods).
  471. Operation("getRunningPods"))
  472. s.restfulCont.Add(ws)
  473. s.addMetricsBucketMatcher("cri")
  474. if criHandler != nil {
  475. s.restfulCont.Handle("/cri/", criHandler)
  476. }
  477. }
  478. // InstallDebuggingDisabledHandlers registers the HTTP request patterns that provide better error message
  479. func (s *Server) InstallDebuggingDisabledHandlers() {
  480. h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  481. http.Error(w, "Debug endpoints are disabled.", http.StatusMethodNotAllowed)
  482. })
  483. s.addMetricsBucketMatcher("run")
  484. s.addMetricsBucketMatcher("exec")
  485. s.addMetricsBucketMatcher("attach")
  486. s.addMetricsBucketMatcher("portForward")
  487. s.addMetricsBucketMatcher("containerLogs")
  488. s.addMetricsBucketMatcher("runningpods")
  489. s.addMetricsBucketMatcher("pprof")
  490. s.addMetricsBucketMatcher("logs")
  491. paths := []string{
  492. "/run/", "/exec/", "/attach/", "/portForward/", "/containerLogs/",
  493. "/runningpods/", pprofBasePath, logsPath}
  494. for _, p := range paths {
  495. s.restfulCont.Handle(p, h)
  496. }
  497. }
  498. // Checks if kubelet's sync loop that updates containers is working.
  499. func (s *Server) syncLoopHealthCheck(req *http.Request) error {
  500. duration := s.host.ResyncInterval() * 2
  501. minDuration := time.Minute * 5
  502. if duration < minDuration {
  503. duration = minDuration
  504. }
  505. enterLoopTime := s.host.LatestLoopEntryTime()
  506. if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
  507. return fmt.Errorf("sync Loop took longer than expected")
  508. }
  509. return nil
  510. }
  511. // getContainerLogs handles containerLogs request against the Kubelet
  512. func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
  513. podNamespace := request.PathParameter("podNamespace")
  514. podID := request.PathParameter("podID")
  515. containerName := request.PathParameter("containerName")
  516. ctx := request.Request.Context()
  517. if len(podID) == 0 {
  518. // TODO: Why return JSON when the rest return plaintext errors?
  519. // TODO: Why return plaintext errors?
  520. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podID."}`))
  521. return
  522. }
  523. if len(containerName) == 0 {
  524. // TODO: Why return JSON when the rest return plaintext errors?
  525. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing container name."}`))
  526. return
  527. }
  528. if len(podNamespace) == 0 {
  529. // TODO: Why return JSON when the rest return plaintext errors?
  530. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podNamespace."}`))
  531. return
  532. }
  533. query := request.Request.URL.Query()
  534. // backwards compatibility for the "tail" query parameter
  535. if tail := request.QueryParameter("tail"); len(tail) > 0 {
  536. query["tailLines"] = []string{tail}
  537. // "all" is the same as omitting tail
  538. if tail == "all" {
  539. delete(query, "tailLines")
  540. }
  541. }
  542. // container logs on the kubelet are locked to the v1 API version of PodLogOptions
  543. logOptions := &v1.PodLogOptions{}
  544. if err := legacyscheme.ParameterCodec.DecodeParameters(query, v1.SchemeGroupVersion, logOptions); err != nil {
  545. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
  546. return
  547. }
  548. logOptions.TypeMeta = metav1.TypeMeta{}
  549. if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
  550. response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
  551. return
  552. }
  553. pod, ok := s.host.GetPodByName(podNamespace, podID)
  554. if !ok {
  555. response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist", podID))
  556. return
  557. }
  558. // Check if containerName is valid.
  559. if kubecontainer.GetContainerSpec(pod, containerName) == nil {
  560. response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q", containerName, podID))
  561. return
  562. }
  563. if _, ok := response.ResponseWriter.(http.Flusher); !ok {
  564. response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
  565. return
  566. }
  567. fw := flushwriter.Wrap(response.ResponseWriter)
  568. response.Header().Set("Transfer-Encoding", "chunked")
  569. if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
  570. response.WriteError(http.StatusBadRequest, err)
  571. return
  572. }
  573. }
  574. // encodePods creates an v1.PodList object from pods and returns the encoded
  575. // PodList.
  576. func encodePods(pods []*v1.Pod) (data []byte, err error) {
  577. podList := new(v1.PodList)
  578. for _, pod := range pods {
  579. podList.Items = append(podList.Items, *pod)
  580. }
  581. // TODO: this needs to be parameterized to the kubelet, not hardcoded. Depends on Kubelet
  582. // as API server refactor.
  583. // TODO: Locked to v1, needs to be made generic
  584. codec := legacyscheme.Codecs.LegacyCodec(schema.GroupVersion{Group: v1.GroupName, Version: "v1"})
  585. return runtime.Encode(codec, podList)
  586. }
  587. // getPods returns a list of pods bound to the Kubelet and their spec.
  588. func (s *Server) getPods(request *restful.Request, response *restful.Response) {
  589. pods := s.host.GetPods()
  590. data, err := encodePods(pods)
  591. if err != nil {
  592. response.WriteError(http.StatusInternalServerError, err)
  593. return
  594. }
  595. writeJSONResponse(response, data)
  596. }
  597. // getRunningPods returns a list of pods running on Kubelet. The list is
  598. // provided by the container runtime, and is different from the list returned
  599. // by getPods, which is a set of desired pods to run.
  600. func (s *Server) getRunningPods(request *restful.Request, response *restful.Response) {
  601. pods, err := s.host.GetRunningPods()
  602. if err != nil {
  603. response.WriteError(http.StatusInternalServerError, err)
  604. return
  605. }
  606. data, err := encodePods(pods)
  607. if err != nil {
  608. response.WriteError(http.StatusInternalServerError, err)
  609. return
  610. }
  611. writeJSONResponse(response, data)
  612. }
  613. // getLogs handles logs requests against the Kubelet.
  614. func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
  615. s.host.ServeLogs(response, request.Request)
  616. }
  617. // getSpec handles spec requests against the Kubelet.
  618. func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
  619. info, err := s.host.GetCachedMachineInfo()
  620. if err != nil {
  621. response.WriteError(http.StatusInternalServerError, err)
  622. return
  623. }
  624. response.WriteEntity(info)
  625. }
  626. type execRequestParams struct {
  627. podNamespace string
  628. podName string
  629. podUID types.UID
  630. containerName string
  631. cmd []string
  632. }
  633. func getExecRequestParams(req *restful.Request) execRequestParams {
  634. return execRequestParams{
  635. podNamespace: req.PathParameter("podNamespace"),
  636. podName: req.PathParameter("podID"),
  637. podUID: types.UID(req.PathParameter("uid")),
  638. containerName: req.PathParameter("containerName"),
  639. cmd: req.Request.URL.Query()[api.ExecCommandParam],
  640. }
  641. }
  642. type portForwardRequestParams struct {
  643. podNamespace string
  644. podName string
  645. podUID types.UID
  646. }
  647. func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams {
  648. return portForwardRequestParams{
  649. podNamespace: req.PathParameter("podNamespace"),
  650. podName: req.PathParameter("podID"),
  651. podUID: types.UID(req.PathParameter("uid")),
  652. }
  653. }
  654. type responder struct{}
  655. func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
  656. klog.Errorf("Error while proxying request: %v", err)
  657. http.Error(w, err.Error(), http.StatusInternalServerError)
  658. }
  659. // proxyStream proxies stream to url.
  660. func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
  661. // TODO(random-liu): Set MaxBytesPerSec to throttle the stream.
  662. handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{})
  663. handler.ServeHTTP(w, r)
  664. }
  665. // getAttach handles requests to attach to a container.
  666. func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
  667. params := getExecRequestParams(request)
  668. streamOpts, err := remotecommandserver.NewOptions(request.Request)
  669. if err != nil {
  670. utilruntime.HandleError(err)
  671. response.WriteError(http.StatusBadRequest, err)
  672. return
  673. }
  674. pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
  675. if !ok {
  676. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  677. return
  678. }
  679. podFullName := kubecontainer.GetPodFullName(pod)
  680. url, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts)
  681. if err != nil {
  682. streaming.WriteError(err, response.ResponseWriter)
  683. return
  684. }
  685. if s.redirectContainerStreaming {
  686. http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
  687. return
  688. }
  689. proxyStream(response.ResponseWriter, request.Request, url)
  690. }
  691. // getExec handles requests to run a command inside a container.
  692. func (s *Server) getExec(request *restful.Request, response *restful.Response) {
  693. params := getExecRequestParams(request)
  694. streamOpts, err := remotecommandserver.NewOptions(request.Request)
  695. if err != nil {
  696. utilruntime.HandleError(err)
  697. response.WriteError(http.StatusBadRequest, err)
  698. return
  699. }
  700. pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
  701. if !ok {
  702. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  703. return
  704. }
  705. podFullName := kubecontainer.GetPodFullName(pod)
  706. url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
  707. if err != nil {
  708. streaming.WriteError(err, response.ResponseWriter)
  709. return
  710. }
  711. if s.redirectContainerStreaming {
  712. http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
  713. return
  714. }
  715. proxyStream(response.ResponseWriter, request.Request, url)
  716. }
  717. // getRun handles requests to run a command inside a container.
  718. func (s *Server) getRun(request *restful.Request, response *restful.Response) {
  719. params := getExecRequestParams(request)
  720. pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
  721. if !ok {
  722. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  723. return
  724. }
  725. // For legacy reasons, run uses different query param than exec.
  726. params.cmd = strings.Split(request.QueryParameter("cmd"), " ")
  727. data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), params.podUID, params.containerName, params.cmd)
  728. if err != nil {
  729. response.WriteError(http.StatusInternalServerError, err)
  730. return
  731. }
  732. writeJSONResponse(response, data)
  733. }
  734. // Derived from go-restful writeJSON.
  735. func writeJSONResponse(response *restful.Response, data []byte) {
  736. if data == nil {
  737. response.WriteHeader(http.StatusOK)
  738. // do not write a nil representation
  739. return
  740. }
  741. response.Header().Set(restful.HEADER_ContentType, restful.MIME_JSON)
  742. response.WriteHeader(http.StatusOK)
  743. if _, err := response.Write(data); err != nil {
  744. klog.Errorf("Error writing response: %v", err)
  745. }
  746. }
  747. // getPortForward handles a new restful port forward request. It determines the
  748. // pod name and uid and then calls ServePortForward.
  749. func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
  750. params := getPortForwardRequestParams(request)
  751. portForwardOptions, err := portforward.NewV4Options(request.Request)
  752. if err != nil {
  753. utilruntime.HandleError(err)
  754. response.WriteError(http.StatusBadRequest, err)
  755. return
  756. }
  757. pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
  758. if !ok {
  759. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  760. return
  761. }
  762. if len(params.podUID) > 0 && pod.UID != params.podUID {
  763. response.WriteError(http.StatusNotFound, fmt.Errorf("pod not found"))
  764. return
  765. }
  766. url, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
  767. if err != nil {
  768. streaming.WriteError(err, response.ResponseWriter)
  769. return
  770. }
  771. if s.redirectContainerStreaming {
  772. http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
  773. return
  774. }
  775. proxyStream(response.ResponseWriter, request.Request, url)
  776. }
  777. // getURLRootPath trims a URL path.
  778. // For paths in the format of "/metrics/xxx", "metrics/xxx" is returned;
  779. // For all other paths, the first part of the path is returned.
  780. func getURLRootPath(path string) string {
  781. parts := strings.SplitN(strings.TrimPrefix(path, "/"), "/", 3)
  782. if len(parts) == 0 {
  783. return path
  784. }
  785. if parts[0] == "metrics" && len(parts) > 1 {
  786. return fmt.Sprintf("%s/%s", parts[0], parts[1])
  787. }
  788. return parts[0]
  789. }
  790. var longRunningRequestPathMap = map[string]bool{
  791. "exec": true,
  792. "attach": true,
  793. "portforward": true,
  794. "debug": true,
  795. }
  796. // isLongRunningRequest determines whether the request is long-running or not.
  797. func isLongRunningRequest(path string) bool {
  798. _, ok := longRunningRequestPathMap[path]
  799. return ok
  800. }
  801. var statusesNoTracePred = httplog.StatusIsNot(
  802. http.StatusOK,
  803. http.StatusFound,
  804. http.StatusMovedPermanently,
  805. http.StatusTemporaryRedirect,
  806. http.StatusBadRequest,
  807. http.StatusNotFound,
  808. http.StatusSwitchingProtocols,
  809. )
  810. // ServeHTTP responds to HTTP requests on the Kubelet.
  811. func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  812. handler := httplog.WithLogging(s.restfulCont, statusesNoTracePred)
  813. // monitor http requests
  814. var serverType string
  815. if s.auth == nil {
  816. serverType = "readonly"
  817. } else {
  818. serverType = "readwrite"
  819. }
  820. method, path := req.Method, s.getMetricBucket(req.URL.Path)
  821. longRunning := strconv.FormatBool(isLongRunningRequest(path))
  822. servermetrics.HTTPRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
  823. servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
  824. defer servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Dec()
  825. startTime := time.Now()
  826. defer servermetrics.HTTPRequestsDuration.WithLabelValues(method, path, serverType, longRunning).Observe(servermetrics.SinceInSeconds(startTime))
  827. handler.ServeHTTP(w, req)
  828. }
  829. // prometheusHostAdapter adapts the HostInterface to the interface expected by the
  830. // cAdvisor prometheus collector.
  831. type prometheusHostAdapter struct {
  832. host HostInterface
  833. }
  834. func (a prometheusHostAdapter) SubcontainersInfo(containerName string, query *cadvisorapi.ContainerInfoRequest) ([]*cadvisorapi.ContainerInfo, error) {
  835. all, err := a.host.GetRawContainerInfo(containerName, query, true)
  836. items := make([]*cadvisorapi.ContainerInfo, 0, len(all))
  837. for _, v := range all {
  838. items = append(items, v)
  839. }
  840. return items, err
  841. }
  842. func (a prometheusHostAdapter) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
  843. return a.host.GetVersionInfo()
  844. }
  845. func (a prometheusHostAdapter) GetMachineInfo() (*cadvisorapi.MachineInfo, error) {
  846. return a.host.GetCachedMachineInfo()
  847. }
  848. func containerPrometheusLabelsFunc(s stats.Provider) metrics.ContainerLabelsFunc {
  849. // containerPrometheusLabels maps cAdvisor labels to prometheus labels.
  850. return func(c *cadvisorapi.ContainerInfo) map[string]string {
  851. // Prometheus requires that all metrics in the same family have the same labels,
  852. // so we arrange to supply blank strings for missing labels
  853. var name, image, podName, namespace, containerName string
  854. if len(c.Aliases) > 0 {
  855. name = c.Aliases[0]
  856. }
  857. image = c.Spec.Image
  858. if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNameLabel]; ok {
  859. podName = v
  860. }
  861. if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNamespaceLabel]; ok {
  862. namespace = v
  863. }
  864. if v, ok := c.Spec.Labels[kubelettypes.KubernetesContainerNameLabel]; ok {
  865. containerName = v
  866. }
  867. // Associate pod cgroup with pod so we have an accurate accounting of sandbox
  868. if podName == "" && namespace == "" {
  869. if pod, found := s.GetPodByCgroupfs(c.Name); found {
  870. podName = pod.Name
  871. namespace = pod.Namespace
  872. }
  873. }
  874. set := map[string]string{
  875. metrics.LabelID: c.Name,
  876. metrics.LabelName: name,
  877. metrics.LabelImage: image,
  878. "pod": podName,
  879. "namespace": namespace,
  880. "container": containerName,
  881. }
  882. return set
  883. }
  884. }