server.go 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package server
  14. import (
  15. "context"
  16. "crypto/tls"
  17. "fmt"
  18. "io"
  19. "net"
  20. "net/http"
  21. "net/http/pprof"
  22. "net/url"
  23. "path"
  24. "reflect"
  25. goruntime "runtime"
  26. "strconv"
  27. "strings"
  28. "time"
  29. restful "github.com/emicklei/go-restful"
  30. cadvisormetrics "github.com/google/cadvisor/container"
  31. cadvisorapi "github.com/google/cadvisor/info/v1"
  32. "github.com/google/cadvisor/metrics"
  33. "github.com/prometheus/client_golang/prometheus"
  34. "github.com/prometheus/client_golang/prometheus/promhttp"
  35. "google.golang.org/grpc"
  36. "k8s.io/klog"
  37. v1 "k8s.io/api/core/v1"
  38. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  39. "k8s.io/apimachinery/pkg/runtime"
  40. "k8s.io/apimachinery/pkg/runtime/schema"
  41. "k8s.io/apimachinery/pkg/types"
  42. "k8s.io/apimachinery/pkg/util/proxy"
  43. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  44. "k8s.io/apiserver/pkg/authentication/authenticator"
  45. "k8s.io/apiserver/pkg/authorization/authorizer"
  46. "k8s.io/apiserver/pkg/server/healthz"
  47. "k8s.io/apiserver/pkg/server/httplog"
  48. "k8s.io/apiserver/pkg/server/routes"
  49. "k8s.io/apiserver/pkg/util/flushwriter"
  50. "k8s.io/component-base/logs"
  51. compbasemetrics "k8s.io/component-base/metrics"
  52. "k8s.io/kubernetes/pkg/api/legacyscheme"
  53. api "k8s.io/kubernetes/pkg/apis/core"
  54. "k8s.io/kubernetes/pkg/apis/core/v1/validation"
  55. "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
  56. podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
  57. "k8s.io/kubernetes/pkg/kubelet/apis/resourcemetrics/v1alpha1"
  58. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  59. "k8s.io/kubernetes/pkg/kubelet/prober"
  60. servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
  61. "k8s.io/kubernetes/pkg/kubelet/server/portforward"
  62. remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
  63. "k8s.io/kubernetes/pkg/kubelet/server/stats"
  64. "k8s.io/kubernetes/pkg/kubelet/server/streaming"
  65. kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
  66. "k8s.io/kubernetes/pkg/kubelet/util"
  67. "k8s.io/kubernetes/pkg/util/configz"
  68. )
  69. const (
  70. metricsPath = "/metrics"
  71. cadvisorMetricsPath = "/metrics/cadvisor"
  72. resourceMetricsPathPrefix = "/metrics/resource"
  73. proberMetricsPath = "/metrics/probes"
  74. specPath = "/spec/"
  75. statsPath = "/stats/"
  76. logsPath = "/logs/"
  77. )
  78. // Server is a http.Handler which exposes kubelet functionality over HTTP.
  79. type Server struct {
  80. auth AuthInterface
  81. host HostInterface
  82. restfulCont containerInterface
  83. resourceAnalyzer stats.ResourceAnalyzer
  84. redirectContainerStreaming bool
  85. }
  86. // TLSOptions holds the TLS options.
  87. type TLSOptions struct {
  88. Config *tls.Config
  89. CertFile string
  90. KeyFile string
  91. }
  92. // containerInterface defines the restful.Container functions used on the root container
  93. type containerInterface interface {
  94. Add(service *restful.WebService) *restful.Container
  95. Handle(path string, handler http.Handler)
  96. Filter(filter restful.FilterFunction)
  97. ServeHTTP(w http.ResponseWriter, r *http.Request)
  98. RegisteredWebServices() []*restful.WebService
  99. // RegisteredHandlePaths returns the paths of handlers registered directly with the container (non-web-services)
  100. // Used to test filters are being applied on non-web-service handlers
  101. RegisteredHandlePaths() []string
  102. }
  103. // filteringContainer delegates all Handle(...) calls to Container.HandleWithFilter(...),
  104. // so we can ensure restful.FilterFunctions are used for all handlers
  105. type filteringContainer struct {
  106. *restful.Container
  107. registeredHandlePaths []string
  108. }
  109. func (a *filteringContainer) Handle(path string, handler http.Handler) {
  110. a.HandleWithFilter(path, handler)
  111. a.registeredHandlePaths = append(a.registeredHandlePaths, path)
  112. }
  113. func (a *filteringContainer) RegisteredHandlePaths() []string {
  114. return a.registeredHandlePaths
  115. }
  116. // ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
  117. func ListenAndServeKubeletServer(
  118. host HostInterface,
  119. resourceAnalyzer stats.ResourceAnalyzer,
  120. address net.IP,
  121. port uint,
  122. tlsOptions *TLSOptions,
  123. auth AuthInterface,
  124. enableCAdvisorJSONEndpoints,
  125. enableDebuggingHandlers,
  126. enableContentionProfiling,
  127. redirectContainerStreaming bool,
  128. criHandler http.Handler) {
  129. klog.Infof("Starting to listen on %s:%d", address, port)
  130. handler := NewServer(host, resourceAnalyzer, auth, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, redirectContainerStreaming, criHandler)
  131. s := &http.Server{
  132. Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
  133. Handler: &handler,
  134. MaxHeaderBytes: 1 << 20,
  135. }
  136. if tlsOptions != nil {
  137. s.TLSConfig = tlsOptions.Config
  138. // Passing empty strings as the cert and key files means no
  139. // cert/keys are specified and GetCertificate in the TLSConfig
  140. // should be called instead.
  141. klog.Fatal(s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile))
  142. } else {
  143. klog.Fatal(s.ListenAndServe())
  144. }
  145. }
  146. // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
  147. func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, enableCAdvisorJSONEndpoints bool) {
  148. klog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
  149. s := NewServer(host, resourceAnalyzer, nil, enableCAdvisorJSONEndpoints, false, false, false, nil)
  150. server := &http.Server{
  151. Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
  152. Handler: &s,
  153. MaxHeaderBytes: 1 << 20,
  154. }
  155. klog.Fatal(server.ListenAndServe())
  156. }
  157. // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
  158. func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) {
  159. server := grpc.NewServer()
  160. podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider))
  161. l, err := util.CreateListener(socket)
  162. if err != nil {
  163. klog.Fatalf("Failed to create listener for podResources endpoint: %v", err)
  164. }
  165. klog.Fatal(server.Serve(l))
  166. }
  167. // AuthInterface contains all methods required by the auth filters
  168. type AuthInterface interface {
  169. authenticator.Request
  170. authorizer.RequestAttributesGetter
  171. authorizer.Authorizer
  172. }
  173. // HostInterface contains all the kubelet methods required by the server.
  174. // For testability.
  175. type HostInterface interface {
  176. stats.Provider
  177. GetVersionInfo() (*cadvisorapi.VersionInfo, error)
  178. GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
  179. GetRunningPods() ([]*v1.Pod, error)
  180. RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
  181. GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
  182. ServeLogs(w http.ResponseWriter, req *http.Request)
  183. ResyncInterval() time.Duration
  184. GetHostname() string
  185. LatestLoopEntryTime() time.Time
  186. GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error)
  187. GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error)
  188. GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
  189. }
  190. // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
  191. func NewServer(
  192. host HostInterface,
  193. resourceAnalyzer stats.ResourceAnalyzer,
  194. auth AuthInterface,
  195. enableCAdvisorJSONEndpoints,
  196. enableDebuggingHandlers,
  197. enableContentionProfiling,
  198. redirectContainerStreaming bool,
  199. criHandler http.Handler) Server {
  200. server := Server{
  201. host: host,
  202. resourceAnalyzer: resourceAnalyzer,
  203. auth: auth,
  204. restfulCont: &filteringContainer{Container: restful.NewContainer()},
  205. redirectContainerStreaming: redirectContainerStreaming,
  206. }
  207. if auth != nil {
  208. server.InstallAuthFilter()
  209. }
  210. server.InstallDefaultHandlers(enableCAdvisorJSONEndpoints)
  211. if enableDebuggingHandlers {
  212. server.InstallDebuggingHandlers(criHandler)
  213. if enableContentionProfiling {
  214. goruntime.SetBlockProfileRate(1)
  215. }
  216. } else {
  217. server.InstallDebuggingDisabledHandlers()
  218. }
  219. return server
  220. }
  221. // InstallAuthFilter installs authentication filters with the restful Container.
  222. func (s *Server) InstallAuthFilter() {
  223. s.restfulCont.Filter(func(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
  224. // Authenticate
  225. info, ok, err := s.auth.AuthenticateRequest(req.Request)
  226. if err != nil {
  227. klog.Errorf("Unable to authenticate the request due to an error: %v", err)
  228. resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
  229. return
  230. }
  231. if !ok {
  232. resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
  233. return
  234. }
  235. // Get authorization attributes
  236. attrs := s.auth.GetRequestAttributes(info.User, req.Request)
  237. // Authorize
  238. decision, _, err := s.auth.Authorize(attrs)
  239. if err != nil {
  240. msg := fmt.Sprintf("Authorization error (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
  241. klog.Errorf(msg, err)
  242. resp.WriteErrorString(http.StatusInternalServerError, msg)
  243. return
  244. }
  245. if decision != authorizer.DecisionAllow {
  246. msg := fmt.Sprintf("Forbidden (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
  247. klog.V(2).Info(msg)
  248. resp.WriteErrorString(http.StatusForbidden, msg)
  249. return
  250. }
  251. // Continue
  252. chain.ProcessFilter(req, resp)
  253. })
  254. }
  255. // InstallDefaultHandlers registers the default set of supported HTTP request
  256. // patterns with the restful Container.
  257. func (s *Server) InstallDefaultHandlers(enableCAdvisorJSONEndpoints bool) {
  258. healthz.InstallHandler(s.restfulCont,
  259. healthz.PingHealthz,
  260. healthz.LogHealthz,
  261. healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
  262. )
  263. ws := new(restful.WebService)
  264. ws.
  265. Path("/pods").
  266. Produces(restful.MIME_JSON)
  267. ws.Route(ws.GET("").
  268. To(s.getPods).
  269. Operation("getPods"))
  270. s.restfulCont.Add(ws)
  271. s.restfulCont.Add(stats.CreateHandlers(statsPath, s.host, s.resourceAnalyzer, enableCAdvisorJSONEndpoints))
  272. s.restfulCont.Handle(metricsPath, prometheus.Handler())
  273. // cAdvisor metrics are exposed under the secured handler as well
  274. r := prometheus.NewRegistry()
  275. includedMetrics := cadvisormetrics.MetricSet{
  276. cadvisormetrics.CpuUsageMetrics: struct{}{},
  277. cadvisormetrics.MemoryUsageMetrics: struct{}{},
  278. cadvisormetrics.CpuLoadMetrics: struct{}{},
  279. cadvisormetrics.DiskIOMetrics: struct{}{},
  280. cadvisormetrics.DiskUsageMetrics: struct{}{},
  281. cadvisormetrics.NetworkUsageMetrics: struct{}{},
  282. cadvisormetrics.AcceleratorUsageMetrics: struct{}{},
  283. cadvisormetrics.AppMetrics: struct{}{},
  284. }
  285. r.MustRegister(metrics.NewPrometheusCollector(prometheusHostAdapter{s.host}, containerPrometheusLabelsFunc(s.host), includedMetrics))
  286. s.restfulCont.Handle(cadvisorMetricsPath,
  287. promhttp.HandlerFor(r, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}),
  288. )
  289. v1alpha1ResourceRegistry := prometheus.NewRegistry()
  290. v1alpha1ResourceRegistry.MustRegister(stats.NewPrometheusResourceMetricCollector(s.resourceAnalyzer, v1alpha1.Config()))
  291. s.restfulCont.Handle(path.Join(resourceMetricsPathPrefix, v1alpha1.Version),
  292. promhttp.HandlerFor(v1alpha1ResourceRegistry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}),
  293. )
  294. // prober metrics are exposed under a different endpoint
  295. p := prometheus.NewRegistry()
  296. compbasemetrics.RegisterProcessStartTime(p)
  297. p.MustRegister(prober.ProberResults)
  298. s.restfulCont.Handle(proberMetricsPath,
  299. promhttp.HandlerFor(p, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}),
  300. )
  301. if enableCAdvisorJSONEndpoints {
  302. ws := new(restful.WebService)
  303. ws.
  304. Path(specPath).
  305. Produces(restful.MIME_JSON)
  306. ws.Route(ws.GET("").
  307. To(s.getSpec).
  308. Operation("getSpec").
  309. Writes(cadvisorapi.MachineInfo{}))
  310. s.restfulCont.Add(ws)
  311. }
  312. }
  313. const pprofBasePath = "/debug/pprof/"
  314. // InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
  315. func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
  316. klog.Infof("Adding debug handlers to kubelet server.")
  317. ws := new(restful.WebService)
  318. ws.
  319. Path("/run")
  320. ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  321. To(s.getRun).
  322. Operation("getRun"))
  323. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  324. To(s.getRun).
  325. Operation("getRun"))
  326. s.restfulCont.Add(ws)
  327. ws = new(restful.WebService)
  328. ws.
  329. Path("/exec")
  330. ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  331. To(s.getExec).
  332. Operation("getExec"))
  333. ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  334. To(s.getExec).
  335. Operation("getExec"))
  336. ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
  337. To(s.getExec).
  338. Operation("getExec"))
  339. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  340. To(s.getExec).
  341. Operation("getExec"))
  342. s.restfulCont.Add(ws)
  343. ws = new(restful.WebService)
  344. ws.
  345. Path("/attach")
  346. ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  347. To(s.getAttach).
  348. Operation("getAttach"))
  349. ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  350. To(s.getAttach).
  351. Operation("getAttach"))
  352. ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
  353. To(s.getAttach).
  354. Operation("getAttach"))
  355. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  356. To(s.getAttach).
  357. Operation("getAttach"))
  358. s.restfulCont.Add(ws)
  359. ws = new(restful.WebService)
  360. ws.
  361. Path("/portForward")
  362. ws.Route(ws.GET("/{podNamespace}/{podID}").
  363. To(s.getPortForward).
  364. Operation("getPortForward"))
  365. ws.Route(ws.POST("/{podNamespace}/{podID}").
  366. To(s.getPortForward).
  367. Operation("getPortForward"))
  368. ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}").
  369. To(s.getPortForward).
  370. Operation("getPortForward"))
  371. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
  372. To(s.getPortForward).
  373. Operation("getPortForward"))
  374. s.restfulCont.Add(ws)
  375. ws = new(restful.WebService)
  376. ws.
  377. Path(logsPath)
  378. ws.Route(ws.GET("").
  379. To(s.getLogs).
  380. Operation("getLogs"))
  381. ws.Route(ws.GET("/{logpath:*}").
  382. To(s.getLogs).
  383. Operation("getLogs").
  384. Param(ws.PathParameter("logpath", "path to the log").DataType("string")))
  385. s.restfulCont.Add(ws)
  386. ws = new(restful.WebService)
  387. ws.
  388. Path("/containerLogs")
  389. ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  390. To(s.getContainerLogs).
  391. Operation("getContainerLogs"))
  392. s.restfulCont.Add(ws)
  393. configz.InstallHandler(s.restfulCont)
  394. handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {
  395. name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)
  396. switch name {
  397. case "profile":
  398. pprof.Profile(resp, req.Request)
  399. case "symbol":
  400. pprof.Symbol(resp, req.Request)
  401. case "cmdline":
  402. pprof.Cmdline(resp, req.Request)
  403. case "trace":
  404. pprof.Trace(resp, req.Request)
  405. default:
  406. pprof.Index(resp, req.Request)
  407. }
  408. }
  409. // Setup pprof handlers.
  410. ws = new(restful.WebService).Path(pprofBasePath)
  411. ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
  412. handlePprofEndpoint(req, resp)
  413. })).Doc("pprof endpoint")
  414. s.restfulCont.Add(ws)
  415. // Setup flags handlers.
  416. // so far, only logging related endpoints are considered valid to add for these debug flags.
  417. s.restfulCont.Handle("/debug/flags/v", routes.StringFlagPutHandler(logs.GlogSetter))
  418. // The /runningpods endpoint is used for testing only.
  419. ws = new(restful.WebService)
  420. ws.
  421. Path("/runningpods/").
  422. Produces(restful.MIME_JSON)
  423. ws.Route(ws.GET("").
  424. To(s.getRunningPods).
  425. Operation("getRunningPods"))
  426. s.restfulCont.Add(ws)
  427. if criHandler != nil {
  428. s.restfulCont.Handle("/cri/", criHandler)
  429. }
  430. }
  431. // InstallDebuggingDisabledHandlers registers the HTTP request patterns that provide better error message
  432. func (s *Server) InstallDebuggingDisabledHandlers() {
  433. h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  434. http.Error(w, "Debug endpoints are disabled.", http.StatusMethodNotAllowed)
  435. })
  436. paths := []string{
  437. "/run/", "/exec/", "/attach/", "/portForward/", "/containerLogs/",
  438. "/runningpods/", pprofBasePath, logsPath}
  439. for _, p := range paths {
  440. s.restfulCont.Handle(p, h)
  441. }
  442. }
  443. // Checks if kubelet's sync loop that updates containers is working.
  444. func (s *Server) syncLoopHealthCheck(req *http.Request) error {
  445. duration := s.host.ResyncInterval() * 2
  446. minDuration := time.Minute * 5
  447. if duration < minDuration {
  448. duration = minDuration
  449. }
  450. enterLoopTime := s.host.LatestLoopEntryTime()
  451. if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
  452. return fmt.Errorf("sync Loop took longer than expected")
  453. }
  454. return nil
  455. }
  456. // getContainerLogs handles containerLogs request against the Kubelet
  457. func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
  458. podNamespace := request.PathParameter("podNamespace")
  459. podID := request.PathParameter("podID")
  460. containerName := request.PathParameter("containerName")
  461. ctx := request.Request.Context()
  462. if len(podID) == 0 {
  463. // TODO: Why return JSON when the rest return plaintext errors?
  464. // TODO: Why return plaintext errors?
  465. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podID."}`))
  466. return
  467. }
  468. if len(containerName) == 0 {
  469. // TODO: Why return JSON when the rest return plaintext errors?
  470. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing container name."}`))
  471. return
  472. }
  473. if len(podNamespace) == 0 {
  474. // TODO: Why return JSON when the rest return plaintext errors?
  475. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podNamespace."}`))
  476. return
  477. }
  478. query := request.Request.URL.Query()
  479. // backwards compatibility for the "tail" query parameter
  480. if tail := request.QueryParameter("tail"); len(tail) > 0 {
  481. query["tailLines"] = []string{tail}
  482. // "all" is the same as omitting tail
  483. if tail == "all" {
  484. delete(query, "tailLines")
  485. }
  486. }
  487. // container logs on the kubelet are locked to the v1 API version of PodLogOptions
  488. logOptions := &v1.PodLogOptions{}
  489. if err := legacyscheme.ParameterCodec.DecodeParameters(query, v1.SchemeGroupVersion, logOptions); err != nil {
  490. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
  491. return
  492. }
  493. logOptions.TypeMeta = metav1.TypeMeta{}
  494. if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
  495. response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
  496. return
  497. }
  498. pod, ok := s.host.GetPodByName(podNamespace, podID)
  499. if !ok {
  500. response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist", podID))
  501. return
  502. }
  503. // Check if containerName is valid.
  504. containerExists := false
  505. for _, container := range pod.Spec.Containers {
  506. if container.Name == containerName {
  507. containerExists = true
  508. break
  509. }
  510. }
  511. if !containerExists {
  512. for _, container := range pod.Spec.InitContainers {
  513. if container.Name == containerName {
  514. containerExists = true
  515. break
  516. }
  517. }
  518. }
  519. if !containerExists {
  520. response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q", containerName, podID))
  521. return
  522. }
  523. if _, ok := response.ResponseWriter.(http.Flusher); !ok {
  524. response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
  525. return
  526. }
  527. fw := flushwriter.Wrap(response.ResponseWriter)
  528. response.Header().Set("Transfer-Encoding", "chunked")
  529. if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
  530. response.WriteError(http.StatusBadRequest, err)
  531. return
  532. }
  533. }
  534. // encodePods creates an v1.PodList object from pods and returns the encoded
  535. // PodList.
  536. func encodePods(pods []*v1.Pod) (data []byte, err error) {
  537. podList := new(v1.PodList)
  538. for _, pod := range pods {
  539. podList.Items = append(podList.Items, *pod)
  540. }
  541. // TODO: this needs to be parameterized to the kubelet, not hardcoded. Depends on Kubelet
  542. // as API server refactor.
  543. // TODO: Locked to v1, needs to be made generic
  544. codec := legacyscheme.Codecs.LegacyCodec(schema.GroupVersion{Group: v1.GroupName, Version: "v1"})
  545. return runtime.Encode(codec, podList)
  546. }
  547. // getPods returns a list of pods bound to the Kubelet and their spec.
  548. func (s *Server) getPods(request *restful.Request, response *restful.Response) {
  549. pods := s.host.GetPods()
  550. data, err := encodePods(pods)
  551. if err != nil {
  552. response.WriteError(http.StatusInternalServerError, err)
  553. return
  554. }
  555. writeJSONResponse(response, data)
  556. }
  557. // getRunningPods returns a list of pods running on Kubelet. The list is
  558. // provided by the container runtime, and is different from the list returned
  559. // by getPods, which is a set of desired pods to run.
  560. func (s *Server) getRunningPods(request *restful.Request, response *restful.Response) {
  561. pods, err := s.host.GetRunningPods()
  562. if err != nil {
  563. response.WriteError(http.StatusInternalServerError, err)
  564. return
  565. }
  566. data, err := encodePods(pods)
  567. if err != nil {
  568. response.WriteError(http.StatusInternalServerError, err)
  569. return
  570. }
  571. writeJSONResponse(response, data)
  572. }
  573. // getLogs handles logs requests against the Kubelet.
  574. func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
  575. s.host.ServeLogs(response, request.Request)
  576. }
  577. // getSpec handles spec requests against the Kubelet.
  578. func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
  579. info, err := s.host.GetCachedMachineInfo()
  580. if err != nil {
  581. response.WriteError(http.StatusInternalServerError, err)
  582. return
  583. }
  584. response.WriteEntity(info)
  585. }
  586. type execRequestParams struct {
  587. podNamespace string
  588. podName string
  589. podUID types.UID
  590. containerName string
  591. cmd []string
  592. }
  593. func getExecRequestParams(req *restful.Request) execRequestParams {
  594. return execRequestParams{
  595. podNamespace: req.PathParameter("podNamespace"),
  596. podName: req.PathParameter("podID"),
  597. podUID: types.UID(req.PathParameter("uid")),
  598. containerName: req.PathParameter("containerName"),
  599. cmd: req.Request.URL.Query()[api.ExecCommandParam],
  600. }
  601. }
  602. type portForwardRequestParams struct {
  603. podNamespace string
  604. podName string
  605. podUID types.UID
  606. }
  607. func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams {
  608. return portForwardRequestParams{
  609. podNamespace: req.PathParameter("podNamespace"),
  610. podName: req.PathParameter("podID"),
  611. podUID: types.UID(req.PathParameter("uid")),
  612. }
  613. }
  614. type responder struct {
  615. errorMessage string
  616. }
  617. func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
  618. klog.Errorf("Error while proxying request: %v", err)
  619. http.Error(w, err.Error(), http.StatusInternalServerError)
  620. }
  621. // proxyStream proxies stream to url.
  622. func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
  623. // TODO(random-liu): Set MaxBytesPerSec to throttle the stream.
  624. handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{})
  625. handler.ServeHTTP(w, r)
  626. }
  627. // getAttach handles requests to attach to a container.
  628. func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
  629. params := getExecRequestParams(request)
  630. streamOpts, err := remotecommandserver.NewOptions(request.Request)
  631. if err != nil {
  632. utilruntime.HandleError(err)
  633. response.WriteError(http.StatusBadRequest, err)
  634. return
  635. }
  636. pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
  637. if !ok {
  638. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  639. return
  640. }
  641. podFullName := kubecontainer.GetPodFullName(pod)
  642. url, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts)
  643. if err != nil {
  644. streaming.WriteError(err, response.ResponseWriter)
  645. return
  646. }
  647. if s.redirectContainerStreaming {
  648. http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
  649. return
  650. }
  651. proxyStream(response.ResponseWriter, request.Request, url)
  652. }
  653. // getExec handles requests to run a command inside a container.
  654. func (s *Server) getExec(request *restful.Request, response *restful.Response) {
  655. params := getExecRequestParams(request)
  656. streamOpts, err := remotecommandserver.NewOptions(request.Request)
  657. if err != nil {
  658. utilruntime.HandleError(err)
  659. response.WriteError(http.StatusBadRequest, err)
  660. return
  661. }
  662. pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
  663. if !ok {
  664. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  665. return
  666. }
  667. podFullName := kubecontainer.GetPodFullName(pod)
  668. url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
  669. if err != nil {
  670. streaming.WriteError(err, response.ResponseWriter)
  671. return
  672. }
  673. if s.redirectContainerStreaming {
  674. http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
  675. return
  676. }
  677. proxyStream(response.ResponseWriter, request.Request, url)
  678. }
  679. // getRun handles requests to run a command inside a container.
  680. func (s *Server) getRun(request *restful.Request, response *restful.Response) {
  681. params := getExecRequestParams(request)
  682. pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
  683. if !ok {
  684. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  685. return
  686. }
  687. // For legacy reasons, run uses different query param than exec.
  688. params.cmd = strings.Split(request.QueryParameter("cmd"), " ")
  689. data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), params.podUID, params.containerName, params.cmd)
  690. if err != nil {
  691. response.WriteError(http.StatusInternalServerError, err)
  692. return
  693. }
  694. writeJSONResponse(response, data)
  695. }
  696. // Derived from go-restful writeJSON.
  697. func writeJSONResponse(response *restful.Response, data []byte) {
  698. if data == nil {
  699. response.WriteHeader(http.StatusOK)
  700. // do not write a nil representation
  701. return
  702. }
  703. response.Header().Set(restful.HEADER_ContentType, restful.MIME_JSON)
  704. response.WriteHeader(http.StatusOK)
  705. if _, err := response.Write(data); err != nil {
  706. klog.Errorf("Error writing response: %v", err)
  707. }
  708. }
  709. // getPortForward handles a new restful port forward request. It determines the
  710. // pod name and uid and then calls ServePortForward.
  711. func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
  712. params := getPortForwardRequestParams(request)
  713. portForwardOptions, err := portforward.NewV4Options(request.Request)
  714. if err != nil {
  715. utilruntime.HandleError(err)
  716. response.WriteError(http.StatusBadRequest, err)
  717. return
  718. }
  719. pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
  720. if !ok {
  721. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  722. return
  723. }
  724. if len(params.podUID) > 0 && pod.UID != params.podUID {
  725. response.WriteError(http.StatusNotFound, fmt.Errorf("pod not found"))
  726. return
  727. }
  728. url, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
  729. if err != nil {
  730. streaming.WriteError(err, response.ResponseWriter)
  731. return
  732. }
  733. if s.redirectContainerStreaming {
  734. http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
  735. return
  736. }
  737. proxyStream(response.ResponseWriter, request.Request, url)
  738. }
  739. // trimURLPath trims a URL path.
  740. // For paths in the format of "/metrics/xxx", "metrics/xxx" is returned;
  741. // For all other paths, the first part of the path is returned.
  742. func trimURLPath(path string) string {
  743. parts := strings.SplitN(strings.TrimPrefix(path, "/"), "/", 3)
  744. if len(parts) == 0 {
  745. return path
  746. }
  747. if parts[0] == "metrics" && len(parts) > 1 {
  748. return fmt.Sprintf("%s/%s", parts[0], parts[1])
  749. }
  750. return parts[0]
  751. }
  752. var longRunningRequestPathMap = map[string]bool{
  753. "exec": true,
  754. "attach": true,
  755. "portforward": true,
  756. "debug": true,
  757. }
  758. // isLongRunningRequest determines whether the request is long-running or not.
  759. func isLongRunningRequest(path string) bool {
  760. _, ok := longRunningRequestPathMap[path]
  761. return ok
  762. }
  763. var statusesNoTracePred = httplog.StatusIsNot(
  764. http.StatusOK,
  765. http.StatusFound,
  766. http.StatusMovedPermanently,
  767. http.StatusTemporaryRedirect,
  768. http.StatusBadRequest,
  769. http.StatusNotFound,
  770. http.StatusSwitchingProtocols,
  771. )
  772. // ServeHTTP responds to HTTP requests on the Kubelet.
  773. func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  774. defer httplog.NewLogged(req, &w).StacktraceWhen(statusesNoTracePred).Log()
  775. // monitor http requests
  776. var serverType string
  777. if s.auth == nil {
  778. serverType = "readonly"
  779. } else {
  780. serverType = "readwrite"
  781. }
  782. method, path := req.Method, trimURLPath(req.URL.Path)
  783. longRunning := strconv.FormatBool(isLongRunningRequest(path))
  784. servermetrics.HTTPRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
  785. servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
  786. defer servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Dec()
  787. startTime := time.Now()
  788. defer servermetrics.HTTPRequestsDuration.WithLabelValues(method, path, serverType, longRunning).Observe(servermetrics.SinceInSeconds(startTime))
  789. s.restfulCont.ServeHTTP(w, req)
  790. }
  791. // prometheusHostAdapter adapts the HostInterface to the interface expected by the
  792. // cAdvisor prometheus collector.
  793. type prometheusHostAdapter struct {
  794. host HostInterface
  795. }
  796. func (a prometheusHostAdapter) SubcontainersInfo(containerName string, query *cadvisorapi.ContainerInfoRequest) ([]*cadvisorapi.ContainerInfo, error) {
  797. all, err := a.host.GetRawContainerInfo(containerName, query, true)
  798. items := make([]*cadvisorapi.ContainerInfo, 0, len(all))
  799. for _, v := range all {
  800. items = append(items, v)
  801. }
  802. return items, err
  803. }
  804. func (a prometheusHostAdapter) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
  805. return a.host.GetVersionInfo()
  806. }
  807. func (a prometheusHostAdapter) GetMachineInfo() (*cadvisorapi.MachineInfo, error) {
  808. return a.host.GetCachedMachineInfo()
  809. }
  810. func containerPrometheusLabelsFunc(s stats.Provider) metrics.ContainerLabelsFunc {
  811. // containerPrometheusLabels maps cAdvisor labels to prometheus labels.
  812. return func(c *cadvisorapi.ContainerInfo) map[string]string {
  813. // Prometheus requires that all metrics in the same family have the same labels,
  814. // so we arrange to supply blank strings for missing labels
  815. var name, image, podName, namespace, containerName string
  816. if len(c.Aliases) > 0 {
  817. name = c.Aliases[0]
  818. }
  819. image = c.Spec.Image
  820. if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNameLabel]; ok {
  821. podName = v
  822. }
  823. if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNamespaceLabel]; ok {
  824. namespace = v
  825. }
  826. if v, ok := c.Spec.Labels[kubelettypes.KubernetesContainerNameLabel]; ok {
  827. containerName = v
  828. }
  829. // Associate pod cgroup with pod so we have an accurate accounting of sandbox
  830. if podName == "" && namespace == "" {
  831. if pod, found := s.GetPodByCgroupfs(c.Name); found {
  832. podName = pod.Name
  833. namespace = pod.Namespace
  834. }
  835. }
  836. set := map[string]string{
  837. metrics.LabelID: c.Name,
  838. metrics.LabelName: name,
  839. metrics.LabelImage: image,
  840. "pod_name": podName,
  841. "pod": podName,
  842. "namespace": namespace,
  843. "container_name": containerName,
  844. "container": containerName,
  845. }
  846. return set
  847. }
  848. }