/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. package kubelet
  14. import (
  15. "context"
  16. "crypto/tls"
  17. "fmt"
  18. "math"
  19. "net"
  20. "net/http"
  21. "net/url"
  22. "os"
  23. "path"
  24. "sort"
  25. "strings"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. cadvisorapi "github.com/google/cadvisor/info/v1"
  30. utilexec "k8s.io/utils/exec"
  31. "k8s.io/utils/integer"
  32. "k8s.io/utils/mount"
  33. utilnet "k8s.io/utils/net"
  34. v1 "k8s.io/api/core/v1"
  35. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  36. "k8s.io/apimachinery/pkg/fields"
  37. "k8s.io/apimachinery/pkg/labels"
  38. "k8s.io/apimachinery/pkg/types"
  39. "k8s.io/apimachinery/pkg/util/clock"
  40. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  41. "k8s.io/apimachinery/pkg/util/sets"
  42. "k8s.io/apimachinery/pkg/util/wait"
  43. utilfeature "k8s.io/apiserver/pkg/util/feature"
  44. clientset "k8s.io/client-go/kubernetes"
  45. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  46. corelisters "k8s.io/client-go/listers/core/v1"
  47. "k8s.io/client-go/tools/cache"
  48. "k8s.io/client-go/tools/record"
  49. "k8s.io/client-go/util/certificate"
  50. "k8s.io/client-go/util/flowcontrol"
  51. cloudprovider "k8s.io/cloud-provider"
  52. internalapi "k8s.io/cri-api/pkg/apis"
  53. "k8s.io/klog"
  54. pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
  55. api "k8s.io/kubernetes/pkg/apis/core"
  56. "k8s.io/kubernetes/pkg/features"
  57. kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
  58. "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
  59. "k8s.io/kubernetes/pkg/kubelet/cadvisor"
  60. kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
  61. "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
  62. "k8s.io/kubernetes/pkg/kubelet/cloudresource"
  63. "k8s.io/kubernetes/pkg/kubelet/cm"
  64. "k8s.io/kubernetes/pkg/kubelet/config"
  65. "k8s.io/kubernetes/pkg/kubelet/configmap"
  66. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  67. "k8s.io/kubernetes/pkg/kubelet/dockershim"
  68. dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
  69. "k8s.io/kubernetes/pkg/kubelet/events"
  70. "k8s.io/kubernetes/pkg/kubelet/eviction"
  71. "k8s.io/kubernetes/pkg/kubelet/images"
  72. "k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
  73. "k8s.io/kubernetes/pkg/kubelet/kuberuntime"
  74. "k8s.io/kubernetes/pkg/kubelet/lifecycle"
  75. "k8s.io/kubernetes/pkg/kubelet/logs"
  76. "k8s.io/kubernetes/pkg/kubelet/metrics"
  77. "k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
  78. "k8s.io/kubernetes/pkg/kubelet/network/dns"
  79. "k8s.io/kubernetes/pkg/kubelet/nodelease"
  80. oomwatcher "k8s.io/kubernetes/pkg/kubelet/oom"
  81. "k8s.io/kubernetes/pkg/kubelet/pleg"
  82. "k8s.io/kubernetes/pkg/kubelet/pluginmanager"
  83. plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  84. kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
  85. "k8s.io/kubernetes/pkg/kubelet/preemption"
  86. "k8s.io/kubernetes/pkg/kubelet/prober"
  87. proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
  88. "k8s.io/kubernetes/pkg/kubelet/remote"
  89. "k8s.io/kubernetes/pkg/kubelet/runtimeclass"
  90. "k8s.io/kubernetes/pkg/kubelet/secret"
  91. "k8s.io/kubernetes/pkg/kubelet/server"
  92. servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
  93. serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
  94. "k8s.io/kubernetes/pkg/kubelet/server/streaming"
  95. "k8s.io/kubernetes/pkg/kubelet/stats"
  96. "k8s.io/kubernetes/pkg/kubelet/status"
  97. "k8s.io/kubernetes/pkg/kubelet/sysctl"
  98. "k8s.io/kubernetes/pkg/kubelet/token"
  99. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  100. "k8s.io/kubernetes/pkg/kubelet/util"
  101. "k8s.io/kubernetes/pkg/kubelet/util/format"
  102. "k8s.io/kubernetes/pkg/kubelet/util/manager"
  103. "k8s.io/kubernetes/pkg/kubelet/util/queue"
  104. "k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
  105. "k8s.io/kubernetes/pkg/kubelet/volumemanager"
  106. "k8s.io/kubernetes/pkg/security/apparmor"
  107. sysctlwhitelist "k8s.io/kubernetes/pkg/security/podsecuritypolicy/sysctl"
  108. utilipt "k8s.io/kubernetes/pkg/util/iptables"
  109. nodeutil "k8s.io/kubernetes/pkg/util/node"
  110. "k8s.io/kubernetes/pkg/util/oom"
  111. "k8s.io/kubernetes/pkg/util/selinux"
  112. "k8s.io/kubernetes/pkg/volume"
  113. "k8s.io/kubernetes/pkg/volume/csi"
  114. "k8s.io/kubernetes/pkg/volume/util/hostutil"
  115. "k8s.io/kubernetes/pkg/volume/util/subpath"
  116. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  117. )
  118. const (
  119. // Max amount of time to wait for the container runtime to come up.
  120. maxWaitForContainerRuntime = 30 * time.Second
  121. // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
  122. nodeStatusUpdateRetry = 5
  123. // ContainerLogsDir is the location of container logs.
  124. ContainerLogsDir = "/var/log/containers"
  125. // MaxContainerBackOff is the max backoff period, exported for the e2e test
  126. MaxContainerBackOff = 300 * time.Second
  127. // Capacity of the channel for storing pods to kill. A small number should
  128. // suffice because a goroutine is dedicated to check the channel and does
  129. // not block on anything else.
  130. podKillingChannelCapacity = 50
  131. // Period for performing global cleanup tasks.
  132. housekeepingPeriod = time.Second * 2
  133. // Period for performing eviction monitoring.
  134. // TODO ensure this is in sync with internal cadvisor housekeeping.
  135. evictionMonitoringPeriod = time.Second * 10
  136. // The path in containers' filesystems where the hosts file is mounted.
  137. etcHostsPath = "/etc/hosts"
  138. // Capacity of the channel for receiving pod lifecycle events. This number
  139. // is a bit arbitrary and may be adjusted in the future.
  140. plegChannelCapacity = 1000
  141. // Generic PLEG relies on relisting for discovering container events.
  142. // A longer period means that kubelet will take longer to detect container
  143. // changes and to update pod status. On the other hand, a shorter period
  144. // will cause more frequent relisting (e.g., container runtime operations),
  145. // leading to higher cpu usage.
  146. // Note that even though we set the period to 1s, the relisting itself can
  147. // take more than 1s to finish if the container runtime responds slowly
  148. // and/or when there are many container changes in one cycle.
  149. plegRelistPeriod = time.Second * 1
  150. // backOffPeriod is the period to back off when pod syncing results in an
  151. // error. It is also used as the base period for the exponential backoff
  152. // container restarts and image pulls.
  153. backOffPeriod = time.Second * 10
  154. // ContainerGCPeriod is the period for performing container garbage collection.
  155. ContainerGCPeriod = time.Minute
  156. // ImageGCPeriod is the period for performing image garbage collection.
  157. ImageGCPeriod = 5 * time.Minute
  158. // Minimum number of dead containers to keep in a pod
  159. minDeadContainerInPod = 1
  160. )
  161. // SyncHandler is an interface implemented by Kubelet, for testability
  162. type SyncHandler interface {
  163. HandlePodAdditions(pods []*v1.Pod)
  164. HandlePodUpdates(pods []*v1.Pod)
  165. HandlePodRemoves(pods []*v1.Pod)
  166. HandlePodReconcile(pods []*v1.Pod)
  167. HandlePodSyncs(pods []*v1.Pod)
  168. HandlePodCleanups() error
  169. }
  170. // Option is a functional option type for Kubelet
  171. type Option func(*Kubelet)
  172. // Bootstrap is a bootstrapping interface for kubelet, targets the initialization protocol
  173. type Bootstrap interface {
  174. GetConfiguration() kubeletconfiginternal.KubeletConfiguration
  175. BirthCry()
  176. StartGarbageCollection()
  177. ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling bool)
  178. ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool)
  179. ListenAndServePodResources()
  180. Run(<-chan kubetypes.PodUpdate)
  181. RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
  182. }
  183. // Builder creates and initializes a Kubelet instance
  184. type Builder func(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
  185. kubeDeps *Dependencies,
  186. crOptions *config.ContainerRuntimeOptions,
  187. containerRuntime string,
  188. runtimeCgroups string,
  189. hostnameOverride string,
  190. nodeIP string,
  191. providerID string,
  192. cloudProvider string,
  193. certDirectory string,
  194. rootDirectory string,
  195. registerNode bool,
  196. registerWithTaints []api.Taint,
  197. allowedUnsafeSysctls []string,
  198. remoteRuntimeEndpoint string,
  199. remoteImageEndpoint string,
  200. experimentalMounterPath string,
  201. experimentalKernelMemcgNotification bool,
  202. experimentalCheckNodeCapabilitiesBeforeMount bool,
  203. experimentalNodeAllocatableIgnoreEvictionThreshold bool,
  204. minimumGCAge metav1.Duration,
  205. maxPerPodContainerCount int32,
  206. maxContainerCount int32,
  207. masterServiceNamespace string,
  208. registerSchedulable bool,
  209. nonMasqueradeCIDR string,
  210. keepTerminatedPodVolumes bool,
  211. nodeLabels map[string]string,
  212. seccompProfileRoot string,
  213. bootstrapCheckpointPath string,
  214. nodeStatusMaxImages int32) (Bootstrap, error)
  215. // Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
  216. // at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
  217. // these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
  218. type Dependencies struct {
  219. Options []Option
  220. // Injected Dependencies
  221. Auth server.AuthInterface
  222. CAdvisorInterface cadvisor.Interface
  223. Cloud cloudprovider.Interface
  224. ContainerManager cm.ContainerManager
  225. DockerClientConfig *dockershim.ClientConfig
  226. EventClient v1core.EventsGetter
  227. HeartbeatClient clientset.Interface
  228. OnHeartbeatFailure func()
  229. KubeClient clientset.Interface
  230. Mounter mount.Interface
  231. HostUtil hostutil.HostUtils
  232. OOMAdjuster *oom.OOMAdjuster
  233. OSInterface kubecontainer.OSInterface
  234. PodConfig *config.PodConfig
  235. Recorder record.EventRecorder
  236. Subpather subpath.Interface
  237. VolumePlugins []volume.VolumePlugin
  238. DynamicPluginProber volume.DynamicPluginProber
  239. TLSOptions *server.TLSOptions
  240. KubeletConfigController *kubeletconfig.Controller
  241. RemoteRuntimeService internalapi.RuntimeService
  242. RemoteImageService internalapi.ImageManagerService
  243. criHandler http.Handler
  244. dockerLegacyService dockershim.DockerLegacyService
  245. // remove it after cadvisor.UsingLegacyCadvisorStats dropped.
  246. useLegacyCadvisorStats bool
  247. }
  248. // makePodSourceConfig creates a config.PodConfig from the given
  249. // KubeletConfiguration or returns an error.
  250. func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
  251. manifestURLHeader := make(http.Header)
  252. if len(kubeCfg.StaticPodURLHeader) > 0 {
  253. for k, v := range kubeCfg.StaticPodURLHeader {
  254. for i := range v {
  255. manifestURLHeader.Add(k, v[i])
  256. }
  257. }
  258. }
  259. // source of all configuration
  260. cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
  261. // define file config source
  262. if kubeCfg.StaticPodPath != "" {
  263. klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
  264. config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
  265. }
  266. // define url config source
  267. if kubeCfg.StaticPodURL != "" {
  268. klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
  269. config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
  270. }
  271. // Restore from the checkpoint path
  272. // NOTE: This MUST happen before creating the apiserver source
  273. // below, or the checkpoint would override the source of truth.
  274. var updatechannel chan<- interface{}
  275. if bootstrapCheckpointPath != "" {
  276. klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
  277. updatechannel = cfg.Channel(kubetypes.ApiserverSource)
  278. err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
  279. if err != nil {
  280. return nil, err
  281. }
  282. }
  283. if kubeDeps.KubeClient != nil {
  284. klog.Infof("Watching apiserver")
  285. if updatechannel == nil {
  286. updatechannel = cfg.Channel(kubetypes.ApiserverSource)
  287. }
  288. config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
  289. }
  290. return cfg, nil
  291. }
  292. // PreInitRuntimeService will init runtime service before RunKubelet.
  293. func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
  294. kubeDeps *Dependencies,
  295. crOptions *config.ContainerRuntimeOptions,
  296. containerRuntime string,
  297. runtimeCgroups string,
  298. remoteRuntimeEndpoint string,
  299. remoteImageEndpoint string,
  300. nonMasqueradeCIDR string) error {
  301. if remoteRuntimeEndpoint != "" {
  302. // remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
  303. if remoteImageEndpoint == "" {
  304. remoteImageEndpoint = remoteRuntimeEndpoint
  305. }
  306. }
  307. switch containerRuntime {
  308. case kubetypes.DockerContainerRuntime:
  309. // TODO: These need to become arguments to a standalone docker shim.
  310. pluginSettings := dockershim.NetworkPluginSettings{
  311. HairpinMode: kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode),
  312. NonMasqueradeCIDR: nonMasqueradeCIDR,
  313. PluginName: crOptions.NetworkPluginName,
  314. PluginConfDir: crOptions.CNIConfDir,
  315. PluginBinDirString: crOptions.CNIBinDir,
  316. PluginCacheDir: crOptions.CNICacheDir,
  317. MTU: int(crOptions.NetworkPluginMTU),
  318. }
  319. // Create and start the CRI shim running as a grpc server.
  320. streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
  321. ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
  322. &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
  323. if err != nil {
  324. return err
  325. }
  326. if crOptions.RedirectContainerStreaming {
  327. kubeDeps.criHandler = ds
  328. }
  329. // The unix socket for kubelet <-> dockershim communication, dockershim start before runtime service init.
  330. klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
  331. remoteRuntimeEndpoint,
  332. remoteImageEndpoint)
  333. klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
  334. dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
  335. if err := dockerServer.Start(); err != nil {
  336. return err
  337. }
  338. // Create dockerLegacyService when the logging driver is not supported.
  339. supported, err := ds.IsCRISupportedLogDriver()
  340. if err != nil {
  341. return err
  342. }
  343. if !supported {
  344. kubeDeps.dockerLegacyService = ds
  345. }
  346. case kubetypes.RemoteContainerRuntime:
  347. // No-op.
  348. break
  349. default:
  350. return fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
  351. }
  352. var err error
  353. if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
  354. return err
  355. }
  356. if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
  357. return err
  358. }
  359. kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint)
  360. return nil
  361. }
  362. // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
  363. // No initialization of Kubelet and its modules should happen here.
  364. func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
  365. kubeDeps *Dependencies,
  366. crOptions *config.ContainerRuntimeOptions,
  367. containerRuntime string,
  368. hostnameOverride string,
  369. nodeIP string,
  370. providerID string,
  371. cloudProvider string,
  372. certDirectory string,
  373. rootDirectory string,
  374. registerNode bool,
  375. registerWithTaints []api.Taint,
  376. allowedUnsafeSysctls []string,
  377. experimentalMounterPath string,
  378. experimentalKernelMemcgNotification bool,
  379. experimentalCheckNodeCapabilitiesBeforeMount bool,
  380. experimentalNodeAllocatableIgnoreEvictionThreshold bool,
  381. minimumGCAge metav1.Duration,
  382. maxPerPodContainerCount int32,
  383. maxContainerCount int32,
  384. masterServiceNamespace string,
  385. registerSchedulable bool,
  386. keepTerminatedPodVolumes bool,
  387. nodeLabels map[string]string,
  388. seccompProfileRoot string,
  389. bootstrapCheckpointPath string,
  390. nodeStatusMaxImages int32) (*Kubelet, error) {
  391. if rootDirectory == "" {
  392. return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
  393. }
  394. if kubeCfg.SyncFrequency.Duration <= 0 {
  395. return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
  396. }
  397. if kubeCfg.MakeIPTablesUtilChains {
  398. if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
  399. return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
  400. }
  401. if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
  402. return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
  403. }
  404. if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
  405. return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
  406. }
  407. }
  408. hostname, err := nodeutil.GetHostname(hostnameOverride)
  409. if err != nil {
  410. return nil, err
  411. }
  412. // Query the cloud provider for our node name, default to hostname
  413. nodeName := types.NodeName(hostname)
  414. if kubeDeps.Cloud != nil {
  415. var err error
  416. instances, ok := kubeDeps.Cloud.Instances()
  417. if !ok {
  418. return nil, fmt.Errorf("failed to get instances from cloud provider")
  419. }
  420. nodeName, err = instances.CurrentNodeName(context.TODO(), hostname)
  421. if err != nil {
  422. return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
  423. }
  424. klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
  425. }
  426. if kubeDeps.PodConfig == nil {
  427. var err error
  428. kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
  429. if err != nil {
  430. return nil, err
  431. }
  432. }
  433. containerGCPolicy := kubecontainer.ContainerGCPolicy{
  434. MinAge: minimumGCAge.Duration,
  435. MaxPerPodContainer: int(maxPerPodContainerCount),
  436. MaxContainers: int(maxContainerCount),
  437. }
  438. daemonEndpoints := &v1.NodeDaemonEndpoints{
  439. KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
  440. }
  441. imageGCPolicy := images.ImageGCPolicy{
  442. MinAge: kubeCfg.ImageMinimumGCAge.Duration,
  443. HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
  444. LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
  445. }
  446. enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
  447. if experimentalNodeAllocatableIgnoreEvictionThreshold {
  448. // Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
  449. enforceNodeAllocatable = []string{}
  450. }
  451. thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
  452. if err != nil {
  453. return nil, err
  454. }
  455. evictionConfig := eviction.Config{
  456. PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
  457. MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
  458. Thresholds: thresholds,
  459. KernelMemcgNotification: experimentalKernelMemcgNotification,
  460. PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
  461. }
  462. serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
  463. if kubeDeps.KubeClient != nil {
  464. serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
  465. r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
  466. go r.Run(wait.NeverStop)
  467. }
  468. serviceLister := corelisters.NewServiceLister(serviceIndexer)
  469. nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
  470. if kubeDeps.KubeClient != nil {
  471. fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
  472. nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
  473. r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
  474. go r.Run(wait.NeverStop)
  475. }
  476. nodeLister := corelisters.NewNodeLister(nodeIndexer)
  477. // TODO: get the real node object of ourself,
  478. // and use the real node name and UID.
  479. // TODO: what is namespace for node?
  480. nodeRef := &v1.ObjectReference{
  481. Kind: "Node",
  482. Name: string(nodeName),
  483. UID: types.UID(nodeName),
  484. Namespace: "",
  485. }
  486. containerRefManager := kubecontainer.NewRefManager()
  487. oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder)
  488. if err != nil {
  489. return nil, err
  490. }
  491. clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
  492. for _, ipEntry := range kubeCfg.ClusterDNS {
  493. ip := net.ParseIP(ipEntry)
  494. if ip == nil {
  495. klog.Warningf("Invalid clusterDNS ip '%q'", ipEntry)
  496. } else {
  497. clusterDNS = append(clusterDNS, ip)
  498. }
  499. }
  500. httpClient := &http.Client{}
  501. parsedNodeIP := net.ParseIP(nodeIP)
  502. protocol := utilipt.ProtocolIpv4
  503. if utilnet.IsIPv6(parsedNodeIP) {
  504. klog.V(0).Infof("IPv6 node IP (%s), assume IPv6 operation", nodeIP)
  505. protocol = utilipt.ProtocolIpv6
  506. }
  507. klet := &Kubelet{
  508. hostname: hostname,
  509. hostnameOverridden: len(hostnameOverride) > 0,
  510. nodeName: nodeName,
  511. kubeClient: kubeDeps.KubeClient,
  512. heartbeatClient: kubeDeps.HeartbeatClient,
  513. onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
  514. rootDirectory: rootDirectory,
  515. resyncInterval: kubeCfg.SyncFrequency.Duration,
  516. sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
  517. registerNode: registerNode,
  518. registerWithTaints: registerWithTaints,
  519. registerSchedulable: registerSchedulable,
  520. dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
  521. serviceLister: serviceLister,
  522. nodeLister: nodeLister,
  523. masterServiceNamespace: masterServiceNamespace,
  524. streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
  525. recorder: kubeDeps.Recorder,
  526. cadvisor: kubeDeps.CAdvisorInterface,
  527. cloud: kubeDeps.Cloud,
  528. externalCloudProvider: cloudprovider.IsExternal(cloudProvider),
  529. providerID: providerID,
  530. nodeRef: nodeRef,
  531. nodeLabels: nodeLabels,
  532. nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
  533. nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration,
  534. os: kubeDeps.OSInterface,
  535. oomWatcher: oomWatcher,
  536. cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
  537. cgroupRoot: kubeCfg.CgroupRoot,
  538. mounter: kubeDeps.Mounter,
  539. hostutil: kubeDeps.HostUtil,
  540. subpather: kubeDeps.Subpather,
  541. maxPods: int(kubeCfg.MaxPods),
  542. podsPerCore: int(kubeCfg.PodsPerCore),
  543. syncLoopMonitor: atomic.Value{},
  544. daemonEndpoints: daemonEndpoints,
  545. containerManager: kubeDeps.ContainerManager,
  546. containerRuntimeName: containerRuntime,
  547. redirectContainerStreaming: crOptions.RedirectContainerStreaming,
  548. nodeIP: parsedNodeIP,
  549. nodeIPValidator: validateNodeIP,
  550. clock: clock.RealClock{},
  551. enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
  552. iptClient: utilipt.New(utilexec.New(), protocol),
  553. makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
  554. iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
  555. iptablesDropBit: int(kubeCfg.IPTablesDropBit),
  556. experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
  557. keepTerminatedPodVolumes: keepTerminatedPodVolumes,
  558. nodeStatusMaxImages: nodeStatusMaxImages,
  559. }
  560. if klet.cloud != nil {
  561. klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
  562. }
  563. var secretManager secret.Manager
  564. var configMapManager configmap.Manager
  565. switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
  566. case kubeletconfiginternal.WatchChangeDetectionStrategy:
  567. secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
  568. configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
  569. case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
  570. secretManager = secret.NewCachingSecretManager(
  571. kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
  572. configMapManager = configmap.NewCachingConfigMapManager(
  573. kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
  574. case kubeletconfiginternal.GetChangeDetectionStrategy:
  575. secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
  576. configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
  577. default:
  578. return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
  579. }
  580. klet.secretManager = secretManager
  581. klet.configMapManager = configMapManager
  582. if klet.experimentalHostUserNamespaceDefaulting {
  583. klog.Infof("Experimental host user namespace defaulting is enabled.")
  584. }
  585. machineInfo, err := klet.cadvisor.MachineInfo()
  586. if err != nil {
  587. return nil, err
  588. }
  589. klet.machineInfo = machineInfo
  590. imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
  591. klet.livenessManager = proberesults.NewManager()
  592. klet.startupManager = proberesults.NewManager()
  593. klet.podCache = kubecontainer.NewCache()
  594. var checkpointManager checkpointmanager.CheckpointManager
  595. if bootstrapCheckpointPath != "" {
  596. checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
  597. if err != nil {
  598. return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
  599. }
  600. }
  601. // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
  602. mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
  603. klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager, checkpointManager)
  604. klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
  605. klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
  606. klet.dockerLegacyService = kubeDeps.dockerLegacyService
  607. klet.criHandler = kubeDeps.criHandler
  608. klet.runtimeService = kubeDeps.RemoteRuntimeService
  609. if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
  610. klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
  611. }
  612. runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
  613. kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
  614. klet.livenessManager,
  615. klet.startupManager,
  616. seccompProfileRoot,
  617. containerRefManager,
  618. machineInfo,
  619. klet,
  620. kubeDeps.OSInterface,
  621. klet,
  622. httpClient,
  623. imageBackOff,
  624. kubeCfg.SerializeImagePulls,
  625. float32(kubeCfg.RegistryPullQPS),
  626. int(kubeCfg.RegistryBurst),
  627. kubeCfg.CPUCFSQuota,
  628. kubeCfg.CPUCFSQuotaPeriod,
  629. kubeDeps.RemoteRuntimeService,
  630. kubeDeps.RemoteImageService,
  631. kubeDeps.ContainerManager.InternalContainerLifecycle(),
  632. kubeDeps.dockerLegacyService,
  633. klet.runtimeClassManager,
  634. )
  635. if err != nil {
  636. return nil, err
  637. }
  638. klet.containerRuntime = runtime
  639. klet.streamingRuntime = runtime
  640. klet.runner = runtime
  641. runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
  642. if err != nil {
  643. return nil, err
  644. }
  645. klet.runtimeCache = runtimeCache
  646. if kubeDeps.useLegacyCadvisorStats {
  647. klet.StatsProvider = stats.NewCadvisorStatsProvider(
  648. klet.cadvisor,
  649. klet.resourceAnalyzer,
  650. klet.podManager,
  651. klet.runtimeCache,
  652. klet.containerRuntime,
  653. klet.statusManager)
  654. } else {
  655. klet.StatsProvider = stats.NewCRIStatsProvider(
  656. klet.cadvisor,
  657. klet.resourceAnalyzer,
  658. klet.podManager,
  659. klet.runtimeCache,
  660. kubeDeps.RemoteRuntimeService,
  661. kubeDeps.RemoteImageService,
  662. stats.NewLogMetricsService(),
  663. kubecontainer.RealOS{})
  664. }
  665. klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
  666. klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
  667. klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
  668. if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
  669. klog.Errorf("Pod CIDR update failed %v", err)
  670. }
  671. // setup containerGC
  672. containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
  673. if err != nil {
  674. return nil, err
  675. }
  676. klet.containerGC = containerGC
  677. klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
  678. // setup imageManager
  679. imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
  680. if err != nil {
  681. return nil, fmt.Errorf("failed to initialize image manager: %v", err)
  682. }
  683. klet.imageManager = imageManager
  684. if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
  685. // setup containerLogManager for CRI container runtime
  686. containerLogManager, err := logs.NewContainerLogManager(
  687. klet.runtimeService,
  688. kubeCfg.ContainerLogMaxSize,
  689. int(kubeCfg.ContainerLogMaxFiles),
  690. )
  691. if err != nil {
  692. return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
  693. }
  694. klet.containerLogManager = containerLogManager
  695. } else {
  696. klet.containerLogManager = logs.NewStubContainerLogManager()
  697. }
  698. if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
  699. klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
  700. if err != nil {
  701. return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
  702. }
  703. kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
  704. cert := klet.serverCertificateManager.Current()
  705. if cert == nil {
  706. return nil, fmt.Errorf("no serving certificate available for the kubelet")
  707. }
  708. return cert, nil
  709. }
  710. }
  711. klet.probeManager = prober.NewManager(
  712. klet.statusManager,
  713. klet.livenessManager,
  714. klet.startupManager,
  715. klet.runner,
  716. containerRefManager,
  717. kubeDeps.Recorder)
  718. tokenManager := token.NewManager(kubeDeps.KubeClient)
  719. // NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
  720. // which affects node ready status. This function must be called before Kubelet is initialized so that the Node
  721. // ReadyState is accurate with the storage state.
  722. klet.volumePluginMgr, err =
  723. NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
  724. if err != nil {
  725. return nil, err
  726. }
  727. klet.pluginManager = pluginmanager.NewPluginManager(
  728. klet.getPluginsRegistrationDir(), /* sockDir */
  729. kubeDeps.Recorder,
  730. )
  731. // If the experimentalMounterPathFlag is set, we do not want to
  732. // check node capabilities since the mount path is not the default
  733. if len(experimentalMounterPath) != 0 {
  734. experimentalCheckNodeCapabilitiesBeforeMount = false
  735. // Replace the nameserver in containerized-mounter's rootfs/etc/resolve.conf with kubelet.ClusterDNS
  736. // so that service name could be resolved
  737. klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
  738. }
  739. // setup volumeManager
  740. klet.volumeManager = volumemanager.NewVolumeManager(
  741. kubeCfg.EnableControllerAttachDetach,
  742. nodeName,
  743. klet.podManager,
  744. klet.statusManager,
  745. klet.kubeClient,
  746. klet.volumePluginMgr,
  747. klet.containerRuntime,
  748. kubeDeps.Mounter,
  749. kubeDeps.HostUtil,
  750. klet.getPodsDir(),
  751. kubeDeps.Recorder,
  752. experimentalCheckNodeCapabilitiesBeforeMount,
  753. keepTerminatedPodVolumes,
  754. volumepathhandler.NewBlockVolumePathHandler())
  755. klet.reasonCache = NewReasonCache()
  756. klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
  757. klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
  758. klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
  759. klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
  760. // setup eviction manager
  761. evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
  762. klet.evictionManager = evictionManager
  763. klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
  764. if utilfeature.DefaultFeatureGate.Enabled(features.Sysctls) {
  765. // Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec.
  766. // Hence, we concatenate those two lists.
  767. safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
  768. sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
  769. if err != nil {
  770. return nil, err
  771. }
  772. klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)
  773. }
  774. // enable active deadline handler
  775. activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
  776. if err != nil {
  777. return nil, err
  778. }
  779. klet.AddPodSyncLoopHandler(activeDeadlineHandler)
  780. klet.AddPodSyncHandler(activeDeadlineHandler)
  781. if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
  782. klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())
  783. }
  784. criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
  785. klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
  786. // apply functional Option's
  787. for _, opt := range kubeDeps.Options {
  788. opt(klet)
  789. }
  790. klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
  791. klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
  792. klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
  793. klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewProcMountAdmitHandler(klet.containerRuntime))
  794. klet.nodeLeaseController = nodelease.NewController(klet.clock, klet.heartbeatClient, string(klet.nodeName), kubeCfg.NodeLeaseDurationSeconds, klet.onRepeatedHeartbeatFailure)
  795. // Finally, put the most recent version of the config on the Kubelet, so
  796. // people can see how it was configured.
  797. klet.kubeletConfiguration = *kubeCfg
  798. // Generating the status funcs should be the last thing we do,
  799. // since this relies on the rest of the Kubelet having been constructed.
  800. klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
  801. return klet, nil
  802. }
// serviceLister is the subset of service-listing behavior the Kubelet needs;
// satisfied by corelisters.ServiceLister's namespace-agnostic List.
type serviceLister interface {
	List(labels.Selector) ([]*v1.Service, error)
}
// Kubelet is the main kubelet implementation.
type Kubelet struct {
	// kubeletConfiguration is the most recent configuration this Kubelet was constructed with.
	kubeletConfiguration kubeletconfiginternal.KubeletConfiguration

	// hostname is the hostname the kubelet detected or was given via flag/config
	hostname string
	// hostnameOverridden indicates the hostname was overridden via flag/config
	hostnameOverridden bool

	// nodeName is the name this kubelet uses for its Node object.
	nodeName types.NodeName
	// runtimeCache caches the set of pods known to the container runtime.
	runtimeCache kubecontainer.RuntimeCache
	// kubeClient talks to the API server for everything except heartbeats.
	kubeClient clientset.Interface
	// heartbeatClient is a dedicated client for node status/lease heartbeats.
	heartbeatClient clientset.Interface
	// iptClient manipulates iptables rules (protocol chosen from the node IP).
	iptClient utilipt.Interface
	// rootDirectory is the kubelet's root data directory on disk.
	rootDirectory string

	// lastObservedNodeAddressesMux guards lastObservedNodeAddresses.
	lastObservedNodeAddressesMux sync.RWMutex
	// lastObservedNodeAddresses is the most recent set of addresses reported for this node.
	lastObservedNodeAddresses []v1.NodeAddress

	// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
	onRepeatedHeartbeatFailure func()

	// podWorkers handle syncing Pods in response to events.
	podWorkers PodWorkers

	// resyncInterval is the interval between periodic full reconciliations of
	// pods on this node.
	resyncInterval time.Duration

	// sourcesReady records the sources seen by the kubelet, it is thread-safe.
	sourcesReady config.SourcesReady

	// podManager is a facade that abstracts away the various sources of pods
	// this Kubelet services.
	podManager kubepod.Manager

	// Needed to observe and respond to situations that could impact node stability
	evictionManager eviction.Manager

	// Optional, defaults to /logs/ from /var/log
	logServer http.Handler
	// Optional, defaults to simple Docker implementation
	runner kubecontainer.ContainerCommandRunner

	// cAdvisor used for container information.
	cadvisor cadvisor.Interface

	// Set to true to have the node register itself with the apiserver.
	registerNode bool
	// List of taints to add to a node object when the kubelet registers itself.
	registerWithTaints []api.Taint
	// Set to true to have the node register itself as schedulable.
	registerSchedulable bool
	// for internal book keeping; access only from within registerWithApiserver
	registrationCompleted bool

	// dnsConfigurer is used for setting up DNS resolver configuration when launching pods.
	dnsConfigurer *dns.Configurer

	// masterServiceNamespace is the namespace that the master service is exposed in.
	masterServiceNamespace string
	// serviceLister knows how to list services
	serviceLister serviceLister
	// nodeLister knows how to list nodes
	nodeLister corelisters.NodeLister

	// a list of node labels to register
	nodeLabels map[string]string

	// Last timestamp when runtime responded on ping.
	// Mutex is used to protect this value.
	runtimeState *runtimeState

	// Volume plugins.
	volumePluginMgr *volume.VolumePluginMgr

	// Handles container probing.
	probeManager prober.Manager
	// Manages container health check results.
	livenessManager proberesults.Manager
	// Manages container startup probe results.
	startupManager proberesults.Manager

	// How long to keep idle streaming command execution/port forwarding
	// connections open before terminating them
	streamingConnectionIdleTimeout time.Duration

	// The EventRecorder to use
	recorder record.EventRecorder

	// Policy for handling garbage collection of dead containers.
	containerGC kubecontainer.ContainerGC

	// Manager for image garbage collection.
	imageManager images.ImageGCManager

	// Manager for container logs.
	containerLogManager logs.ContainerLogManager

	// Secret manager.
	secretManager secret.Manager

	// ConfigMap manager.
	configMapManager configmap.Manager

	// Cached MachineInfo returned by cadvisor.
	machineInfo *cadvisorapi.MachineInfo

	// Handles certificate rotations.
	serverCertificateManager certificate.Manager

	// Syncs pods statuses with apiserver; also used as a cache of statuses.
	statusManager status.Manager

	// VolumeManager runs a set of asynchronous loops that figure out which
	// volumes need to be attached/mounted/unmounted/detached based on the pods
	// scheduled on this node and makes it so.
	volumeManager volumemanager.VolumeManager

	// Cloud provider interface.
	cloud cloudprovider.Interface
	// Handles requests to cloud provider with timeout
	cloudResourceSyncManager cloudresource.SyncManager

	// Indicates that the node initialization happens in an external cloud controller
	externalCloudProvider bool
	// Reference to this node.
	nodeRef *v1.ObjectReference

	// The name of the container runtime
	containerRuntimeName string

	// redirectContainerStreaming enables container streaming redirect.
	redirectContainerStreaming bool

	// Container runtime.
	containerRuntime kubecontainer.Runtime

	// Streaming runtime handles container streaming.
	streamingRuntime kubecontainer.StreamingRuntime

	// Container runtime service (needed by container runtime Start()).
	// TODO(CD): try to make this available without holding a reference in this
	// struct. For example, by adding a getter to generic runtime.
	runtimeService internalapi.RuntimeService

	// reasonCache caches the failure reason of the last creation of all containers, which is
	// used for generating ContainerStatus.
	reasonCache *ReasonCache

	// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
	// feature is not enabled, it is also the frequency that kubelet posts node status to master.
	// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
	// in nodecontroller. There are several constraints:
	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
	//    N means number of retries allowed for kubelet to post node status. It is pointless
	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
	//    The constant must be less than podEvictionTimeout.
	// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
	//    status. Kubelet may fail to update node status reliably if the value is too small,
	//    as it takes time to gather all necessary node information.
	nodeStatusUpdateFrequency time.Duration

	// nodeStatusReportFrequency is the frequency that kubelet posts node
	// status to master. It is only used when node lease feature is enabled.
	nodeStatusReportFrequency time.Duration

	// lastStatusReportTime is the time when node status was last reported.
	lastStatusReportTime time.Time

	// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
	// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
	syncNodeStatusMux sync.Mutex

	// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
	// This lock is used by Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
	updatePodCIDRMux sync.Mutex

	// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
	// This lock is used by Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
	updateRuntimeMux sync.Mutex

	// nodeLeaseController claims and renews the node lease for this Kubelet
	nodeLeaseController nodelease.Controller

	// Generates pod events.
	pleg pleg.PodLifecycleEventGenerator

	// Store kubecontainer.PodStatus for all pods.
	podCache kubecontainer.Cache

	// os is a facade for various syscalls that need to be mocked during testing.
	os kubecontainer.OSInterface

	// Watcher of out of memory events.
	oomWatcher oomwatcher.Watcher

	// Monitor resource usage
	resourceAnalyzer serverstats.ResourceAnalyzer

	// Whether or not we should have the QOS cgroup hierarchy for resource management
	cgroupsPerQOS bool

	// If non-empty, pass this to the container runtime as the root cgroup.
	cgroupRoot string

	// Mounter to use for volumes.
	mounter mount.Interface

	// hostutil to interact with filesystems
	hostutil hostutil.HostUtils

	// subpather to execute subpath actions
	subpather subpath.Interface

	// Manager of non-Runtime containers.
	containerManager cm.ContainerManager

	// Maximum Number of Pods which can be run by this Kubelet
	maxPods int

	// Monitor Kubelet's sync loop
	syncLoopMonitor atomic.Value

	// Container restart Backoff
	backOff *flowcontrol.Backoff

	// Channel for sending pods to kill.
	podKillingCh chan *kubecontainer.PodPair

	// Information about the ports which are opened by daemons on Node running this Kubelet server.
	daemonEndpoints *v1.NodeDaemonEndpoints

	// A queue used to trigger pod workers.
	workQueue queue.WorkQueue

	// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
	oneTimeInitializer sync.Once

	// If non-nil, use this IP address for the node
	nodeIP net.IP

	// use this function to validate the kubelet nodeIP
	nodeIPValidator func(net.IP) error

	// If non-nil, this is a unique identifier for the node in an external database, eg. cloudprovider
	providerID string

	// clock is an interface that provides time related functionality in a way that makes it
	// easy to test the code.
	clock clock.Clock

	// handlers called during the tryUpdateNodeStatus cycle
	setNodeStatusFuncs []func(*v1.Node) error

	// lastNodeUnschedulableLock guards lastNodeUnschedulable.
	lastNodeUnschedulableLock sync.Mutex
	// maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
	lastNodeUnschedulable bool

	// TODO: think about moving this to be centralized in PodWorkers in follow-on.
	// the list of handlers to call during pod admission.
	admitHandlers lifecycle.PodAdmitHandlers

	// softAdmithandlers are applied to the pod after it is admitted by the Kubelet, but before it is
	// run. A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely. If a
	// rejected pod should not be recreated, or the scheduler is not aware of the rejection rule, the
	// admission rule should be applied by a softAdmitHandler.
	softAdmitHandlers lifecycle.PodAdmitHandlers

	// the list of handlers to call during pod sync loop.
	lifecycle.PodSyncLoopHandlers

	// the list of handlers to call during pod sync.
	lifecycle.PodSyncHandlers

	// the number of allowed pods per core
	podsPerCore int

	// enableControllerAttachDetach indicates the Attach/Detach controller
	// should manage attachment/detachment of volumes scheduled to this node,
	// and disable kubelet from executing any attach/detach operations
	enableControllerAttachDetach bool

	// trigger deleting containers in a pod
	containerDeletor *podContainerDeletor

	// config iptables util rules
	makeIPTablesUtilChains bool

	// The bit of the fwmark space to mark packets for SNAT.
	iptablesMasqueradeBit int

	// The bit of the fwmark space to mark packets for dropping.
	iptablesDropBit int

	// The AppArmor validator for checking whether AppArmor is supported.
	appArmorValidator apparmor.Validator

	// The handler serving CRI streaming calls (exec/attach/port-forward).
	criHandler http.Handler

	// experimentalHostUserNamespaceDefaulting sets userns=true when users request host namespaces (pid, ipc, net),
	// are using non-namespaced capabilities (mknod, sys_time, sys_module), the pod contains a privileged container,
	// or using host path volumes.
	// This should only be enabled when the container runtime is performing user remapping AND if the
	// experimental behavior is desired.
	experimentalHostUserNamespaceDefaulting bool

	// dockerLegacyService contains some legacy methods for backward compatibility.
	// It should be set only when docker is using non json-file logging driver.
	dockerLegacyService dockershim.DockerLegacyService

	// StatsProvider provides the node and the container stats.
	*stats.StatsProvider

	// This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node.
	// This can be useful for debugging volume related issues.
	keepTerminatedPodVolumes bool // DEPRECATED

	// pluginmanager runs a set of asynchronous loops that figure out which
	// plugins need to be registered/unregistered based on this node and makes it so.
	pluginManager pluginmanager.PluginManager

	// This flag sets a maximum number of images to report in the node status.
	nodeStatusMaxImages int32

	// Handles RuntimeClass objects for the Kubelet.
	runtimeClassManager *runtimeclass.Manager
}
  1048. // setupDataDirs creates:
  1049. // 1. the root directory
  1050. // 2. the pods directory
  1051. // 3. the plugins directory
  1052. // 4. the pod-resources directory
  1053. func (kl *Kubelet) setupDataDirs() error {
  1054. kl.rootDirectory = path.Clean(kl.rootDirectory)
  1055. pluginRegistrationDir := kl.getPluginsRegistrationDir()
  1056. pluginsDir := kl.getPluginsDir()
  1057. if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
  1058. return fmt.Errorf("error creating root directory: %v", err)
  1059. }
  1060. if err := kl.hostutil.MakeRShared(kl.getRootDir()); err != nil {
  1061. return fmt.Errorf("error configuring root directory: %v", err)
  1062. }
  1063. if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
  1064. return fmt.Errorf("error creating pods directory: %v", err)
  1065. }
  1066. if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
  1067. return fmt.Errorf("error creating plugins directory: %v", err)
  1068. }
  1069. if err := os.MkdirAll(kl.getPluginsRegistrationDir(), 0750); err != nil {
  1070. return fmt.Errorf("error creating plugins registry directory: %v", err)
  1071. }
  1072. if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
  1073. return fmt.Errorf("error creating podresources directory: %v", err)
  1074. }
  1075. if selinux.SELinuxEnabled() {
  1076. err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
  1077. if err != nil {
  1078. klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", pluginRegistrationDir, err)
  1079. }
  1080. err = selinux.SetFileLabel(pluginsDir, config.KubeletPluginsDirSELinuxLabel)
  1081. if err != nil {
  1082. klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", pluginsDir, err)
  1083. }
  1084. }
  1085. return nil
  1086. }
  1087. // StartGarbageCollection starts garbage collection threads.
  1088. func (kl *Kubelet) StartGarbageCollection() {
  1089. loggedContainerGCFailure := false
  1090. go wait.Until(func() {
  1091. if err := kl.containerGC.GarbageCollect(); err != nil {
  1092. klog.Errorf("Container garbage collection failed: %v", err)
  1093. kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
  1094. loggedContainerGCFailure = true
  1095. } else {
  1096. var vLevel klog.Level = 4
  1097. if loggedContainerGCFailure {
  1098. vLevel = 1
  1099. loggedContainerGCFailure = false
  1100. }
  1101. klog.V(vLevel).Infof("Container garbage collection succeeded")
  1102. }
  1103. }, ContainerGCPeriod, wait.NeverStop)
  1104. // when the high threshold is set to 100, stub the image GC manager
  1105. if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
  1106. klog.V(2).Infof("ImageGCHighThresholdPercent is set 100, Disable image GC")
  1107. return
  1108. }
  1109. prevImageGCFailed := false
  1110. go wait.Until(func() {
  1111. if err := kl.imageManager.GarbageCollect(); err != nil {
  1112. if prevImageGCFailed {
  1113. klog.Errorf("Image garbage collection failed multiple times in a row: %v", err)
  1114. // Only create an event for repeated failures
  1115. kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
  1116. } else {
  1117. klog.Errorf("Image garbage collection failed once. Stats initialization may not have completed yet: %v", err)
  1118. }
  1119. prevImageGCFailed = true
  1120. } else {
  1121. var vLevel klog.Level = 4
  1122. if prevImageGCFailed {
  1123. vLevel = 1
  1124. prevImageGCFailed = false
  1125. }
  1126. klog.V(vLevel).Infof("Image garbage collection succeeded")
  1127. }
  1128. }, ImageGCPeriod, wait.NeverStop)
  1129. }
  1130. // initializeModules will initialize internal modules that do not require the container runtime to be up.
  1131. // Note that the modules here must not depend on modules that are not initialized here.
  1132. func (kl *Kubelet) initializeModules() error {
  1133. // Prometheus metrics.
  1134. metrics.Register(
  1135. kl.runtimeCache,
  1136. collectors.NewVolumeStatsCollector(kl),
  1137. collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
  1138. )
  1139. metrics.SetNodeName(kl.nodeName)
  1140. servermetrics.Register()
  1141. // Setup filesystem directories.
  1142. if err := kl.setupDataDirs(); err != nil {
  1143. return err
  1144. }
  1145. // If the container logs directory does not exist, create it.
  1146. if _, err := os.Stat(ContainerLogsDir); err != nil {
  1147. if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
  1148. klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
  1149. }
  1150. }
  1151. // Start the image manager.
  1152. kl.imageManager.Start()
  1153. // Start the certificate manager if it was enabled.
  1154. if kl.serverCertificateManager != nil {
  1155. kl.serverCertificateManager.Start()
  1156. }
  1157. // Start out of memory watcher.
  1158. if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
  1159. return fmt.Errorf("failed to start OOM watcher %v", err)
  1160. }
  1161. // Start resource analyzer
  1162. kl.resourceAnalyzer.Start()
  1163. return nil
  1164. }
// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
//
// NOTE: the start order below is load-bearing. cAdvisor must be running
// before the container manager (filesystem capacity) and the eviction
// manager (imagefs detection); the container log manager needs the runtime
// itself. Any hard failure calls klog.Fatalf, exiting the process so an
// external supervisor can restart the kubelet.
func (kl *Kubelet) initializeRuntimeDependentModules() {
	if err := kl.cadvisor.Start(); err != nil {
		// Fail kubelet and rely on the babysitter to retry starting kubelet.
		// TODO(random-liu): Add backoff logic in the babysitter
		klog.Fatalf("Failed to start cAdvisor %v", err)
	}
	// trigger on-demand stats collection once so that we have capacity information for ephemeral storage.
	// ignore any errors, since if stats collection is not successful, the container manager will fail to start below.
	kl.StatsProvider.GetCgroupStats("/", true)
	// Start container manager.
	node, err := kl.getNodeAnyWay()
	if err != nil {
		// Fail kubelet and rely on the babysitter to retry starting kubelet.
		klog.Fatalf("Kubelet failed to get node info: %v", err)
	}
	// containerManager must start after cAdvisor because it needs filesystem capacity information
	if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
		// Fail kubelet and rely on the babysitter to retry starting kubelet.
		klog.Fatalf("Failed to start ContainerManager %v", err)
	}
	// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
	kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
	// container log manager must start after container runtime is up to retrieve information from container runtime
	// and inform container to reopen log file after log rotation.
	kl.containerLogManager.Start()
	// Adding Registration Callback function for CSI Driver
	kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
	// Adding Registration Callback function for Device Manager
	kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
	// Start the plugin manager
	klog.V(4).Infof("starting plugin manager")
	go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
}
// Run starts the kubelet reacting to config updates.
//
// It wires up all long-running components (cloud sync, volume manager, node
// status/lease sync, runtime probing, pod killer, status/probe managers,
// PLEG) and then blocks forever in syncLoop processing updates.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// Fall back to serving host logs directly if no log server was injected.
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	// A nil kubeClient means standalone mode: keep running, but never report
	// node status to an apiserver.
	if kl.kubeClient == nil {
		klog.Warning("No api server defined - no node status update will be sent.")
	}
	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}
	// Runtime-independent modules must come up before anything below;
	// failure here is fatal (process exits, supervisor restarts).
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.Fatal(err)
	}
	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()
		// start syncing lease
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	// Periodically probe the runtime; this also triggers
	// initializeRuntimeDependentModules once the runtime is up.
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}
	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()
	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}
	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	// Blocks forever; kl itself is the SyncHandler.
	kl.syncLoop(updates, kl)
}
  1243. // syncPod is the transaction script for the sync of a single pod.
  1244. //
  1245. // Arguments:
  1246. //
  1247. // o - the SyncPodOptions for this invocation
  1248. //
  1249. // The workflow is:
  1250. // * If the pod is being created, record pod worker start latency
  1251. // * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
  1252. // * If the pod is being seen as running for the first time, record pod
  1253. // start latency
  1254. // * Update the status of the pod in the status manager
  1255. // * Kill the pod if it should not be running
  1256. // * Create a mirror pod if the pod is a static pod, and does not
  1257. // already have a mirror pod
  1258. // * Create the data directories for the pod if they do not exist
  1259. // * Wait for volumes to attach/mount
  1260. // * Fetch the pull secrets for the pod
  1261. // * Call the container runtime's SyncPod callback
  1262. // * Update the traffic shaping for the pod's ingress and egress limits
  1263. //
  1264. // If any step of this workflow errors, the error is returned, and is repeated
  1265. // on the next syncPod call.
  1266. //
  1267. // This operation writes all events that are dispatched in order to provide
  1268. // the most accurate information possible about an error situation to aid debugging.
  1269. // Callers should not throw an event if this operation returns an error.
  1270. func (kl *Kubelet) syncPod(o syncPodOptions) error {
  1271. // pull out the required options
  1272. pod := o.pod
  1273. mirrorPod := o.mirrorPod
  1274. podStatus := o.podStatus
  1275. updateType := o.updateType
  1276. // if we want to kill a pod, do it now!
  1277. if updateType == kubetypes.SyncPodKill {
  1278. killPodOptions := o.killPodOptions
  1279. if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
  1280. return fmt.Errorf("kill pod options are required if update type is kill")
  1281. }
  1282. apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
  1283. kl.statusManager.SetPodStatus(pod, apiPodStatus)
  1284. // we kill the pod with the specified grace period since this is a termination
  1285. if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
  1286. kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
  1287. // there was an error killing the pod, so we return that error directly
  1288. utilruntime.HandleError(err)
  1289. return err
  1290. }
  1291. return nil
  1292. }
  1293. // Latency measurements for the main workflow are relative to the
  1294. // first time the pod was seen by the API server.
  1295. var firstSeenTime time.Time
  1296. if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
  1297. firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
  1298. }
  1299. // Record pod worker start latency if being created
  1300. // TODO: make pod workers record their own latencies
  1301. if updateType == kubetypes.SyncPodCreate {
  1302. if !firstSeenTime.IsZero() {
  1303. // This is the first time we are syncing the pod. Record the latency
  1304. // since kubelet first saw the pod if firstSeenTime is set.
  1305. metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
  1306. } else {
  1307. klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
  1308. }
  1309. }
  1310. // Generate final API pod status with pod and status manager status
  1311. apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
  1312. // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
  1313. // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
  1314. // set pod IP to hostIP directly in runtime.GetPodStatus
  1315. podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
  1316. for _, ipInfo := range apiPodStatus.PodIPs {
  1317. podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
  1318. }
  1319. if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
  1320. podStatus.IPs = []string{apiPodStatus.PodIP}
  1321. }
  1322. // Record the time it takes for the pod to become running.
  1323. existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
  1324. if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
  1325. !firstSeenTime.IsZero() {
  1326. metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
  1327. }
  1328. runnable := kl.canRunPod(pod)
  1329. if !runnable.Admit {
  1330. // Pod is not runnable; update the Pod and Container statuses to why.
  1331. apiPodStatus.Reason = runnable.Reason
  1332. apiPodStatus.Message = runnable.Message
  1333. // Waiting containers are not creating.
  1334. const waitingReason = "Blocked"
  1335. for _, cs := range apiPodStatus.InitContainerStatuses {
  1336. if cs.State.Waiting != nil {
  1337. cs.State.Waiting.Reason = waitingReason
  1338. }
  1339. }
  1340. for _, cs := range apiPodStatus.ContainerStatuses {
  1341. if cs.State.Waiting != nil {
  1342. cs.State.Waiting.Reason = waitingReason
  1343. }
  1344. }
  1345. }
  1346. // Update status in the status manager
  1347. kl.statusManager.SetPodStatus(pod, apiPodStatus)
  1348. // Kill pod if it should not be running
  1349. if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
  1350. var syncErr error
  1351. if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
  1352. kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
  1353. syncErr = fmt.Errorf("error killing pod: %v", err)
  1354. utilruntime.HandleError(syncErr)
  1355. } else {
  1356. if !runnable.Admit {
  1357. // There was no error killing the pod, but the pod cannot be run.
  1358. // Return an error to signal that the sync loop should back off.
  1359. syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
  1360. }
  1361. }
  1362. return syncErr
  1363. }
  1364. // If the network plugin is not ready, only start the pod if it uses the host network
  1365. if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
  1366. kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
  1367. return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
  1368. }
  1369. // Create Cgroups for the pod and apply resource parameters
  1370. // to them if cgroups-per-qos flag is enabled.
  1371. pcm := kl.containerManager.NewPodContainerManager()
  1372. // If pod has already been terminated then we need not create
  1373. // or update the pod's cgroup
  1374. if !kl.podIsTerminated(pod) {
  1375. // When the kubelet is restarted with the cgroups-per-qos
  1376. // flag enabled, all the pod's running containers
  1377. // should be killed intermittently and brought back up
  1378. // under the qos cgroup hierarchy.
  1379. // Check if this is the pod's first sync
  1380. firstSync := true
  1381. for _, containerStatus := range apiPodStatus.ContainerStatuses {
  1382. if containerStatus.State.Running != nil {
  1383. firstSync = false
  1384. break
  1385. }
  1386. }
  1387. // Don't kill containers in pod if pod's cgroups already
  1388. // exists or the pod is running for the first time
  1389. podKilled := false
  1390. if !pcm.Exists(pod) && !firstSync {
  1391. if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
  1392. podKilled = true
  1393. }
  1394. }
  1395. // Create and Update pod's Cgroups
  1396. // Don't create cgroups for run once pod if it was killed above
  1397. // The current policy is not to restart the run once pods when
  1398. // the kubelet is restarted with the new flag as run once pods are
  1399. // expected to run only once and if the kubelet is restarted then
  1400. // they are not expected to run again.
  1401. // We don't create and apply updates to cgroup if its a run once pod and was killed above
  1402. if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
  1403. if !pcm.Exists(pod) {
  1404. if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
  1405. klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
  1406. }
  1407. if err := pcm.EnsureExists(pod); err != nil {
  1408. kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
  1409. return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
  1410. }
  1411. }
  1412. }
  1413. }
  1414. // Create Mirror Pod for Static Pod if it doesn't already exist
  1415. if kubetypes.IsStaticPod(pod) {
  1416. podFullName := kubecontainer.GetPodFullName(pod)
  1417. deleted := false
  1418. if mirrorPod != nil {
  1419. if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
  1420. // The mirror pod is semantically different from the static pod. Remove
  1421. // it. The mirror pod will get recreated later.
  1422. klog.Infof("Trying to delete pod %s %v", podFullName, mirrorPod.ObjectMeta.UID)
  1423. var err error
  1424. deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
  1425. if deleted {
  1426. klog.Warningf("Deleted mirror pod %q because it is outdated", format.Pod(mirrorPod))
  1427. } else if err != nil {
  1428. klog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
  1429. }
  1430. }
  1431. }
  1432. if mirrorPod == nil || deleted {
  1433. node, err := kl.GetNode()
  1434. if err != nil || node.DeletionTimestamp != nil {
  1435. klog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
  1436. } else {
  1437. klog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
  1438. if err := kl.podManager.CreateMirrorPod(pod); err != nil {
  1439. klog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
  1440. }
  1441. }
  1442. }
  1443. }
  1444. // Make data directories for the pod
  1445. if err := kl.makePodDataDirs(pod); err != nil {
  1446. kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
  1447. klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
  1448. return err
  1449. }
  1450. // Volume manager will not mount volumes for terminated pods
  1451. if !kl.podIsTerminated(pod) {
  1452. // Wait for volumes to attach/mount
  1453. if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
  1454. kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
  1455. klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
  1456. return err
  1457. }
  1458. }
  1459. // Fetch the pull secrets for the pod
  1460. pullSecrets := kl.getPullSecretsForPod(pod)
  1461. // Call the container runtime's SyncPod callback
  1462. result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
  1463. kl.reasonCache.Update(pod.UID, result)
  1464. if err := result.Error(); err != nil {
  1465. // Do not return error if the only failures were pods in backoff
  1466. for _, r := range result.SyncResults {
  1467. if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
  1468. // Do not record an event here, as we keep all event logging for sync pod failures
  1469. // local to container runtime so we get better errors
  1470. return err
  1471. }
  1472. }
  1473. return nil
  1474. }
  1475. return nil
  1476. }
  1477. // Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
  1478. // * pod whose work is ready.
  1479. // * internal modules that request sync of a pod.
  1480. func (kl *Kubelet) getPodsToSync() []*v1.Pod {
  1481. allPods := kl.podManager.GetPods()
  1482. podUIDs := kl.workQueue.GetWork()
  1483. podUIDSet := sets.NewString()
  1484. for _, podUID := range podUIDs {
  1485. podUIDSet.Insert(string(podUID))
  1486. }
  1487. var podsToSync []*v1.Pod
  1488. for _, pod := range allPods {
  1489. if podUIDSet.Has(string(pod.UID)) {
  1490. // The work of the pod is ready
  1491. podsToSync = append(podsToSync, pod)
  1492. continue
  1493. }
  1494. for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
  1495. if podSyncLoopHandler.ShouldSync(pod) {
  1496. podsToSync = append(podsToSync, pod)
  1497. break
  1498. }
  1499. }
  1500. }
  1501. return podsToSync
  1502. }
  1503. // deletePod deletes the pod from the internal state of the kubelet by:
  1504. // 1. stopping the associated pod worker asynchronously
  1505. // 2. signaling to kill the pod by sending on the podKillingCh channel
  1506. //
  1507. // deletePod returns an error if not all sources are ready or the pod is not
  1508. // found in the runtime cache.
  1509. func (kl *Kubelet) deletePod(pod *v1.Pod) error {
  1510. if pod == nil {
  1511. return fmt.Errorf("deletePod does not allow nil pod")
  1512. }
  1513. if !kl.sourcesReady.AllReady() {
  1514. // If the sources aren't ready, skip deletion, as we may accidentally delete pods
  1515. // for sources that haven't reported yet.
  1516. return fmt.Errorf("skipping delete because sources aren't ready yet")
  1517. }
  1518. kl.podWorkers.ForgetWorker(pod.UID)
  1519. // Runtime cache may not have been updated to with the pod, but it's okay
  1520. // because the periodic cleanup routine will attempt to delete again later.
  1521. runningPods, err := kl.runtimeCache.GetPods()
  1522. if err != nil {
  1523. return fmt.Errorf("error listing containers: %v", err)
  1524. }
  1525. runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
  1526. if runningPod.IsEmpty() {
  1527. return fmt.Errorf("pod not found")
  1528. }
  1529. podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
  1530. kl.podKillingCh <- &podPair
  1531. // TODO: delete the mirror pod here?
  1532. // We leave the volume/directory cleanup to the periodic cleanup routine.
  1533. return nil
  1534. }
  1535. // rejectPod records an event about the pod with the given reason and message,
  1536. // and updates the pod to the failed phase in the status manage.
  1537. func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
  1538. kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
  1539. kl.statusManager.SetPodStatus(pod, v1.PodStatus{
  1540. Phase: v1.PodFailed,
  1541. Reason: reason,
  1542. Message: "Pod " + message})
  1543. }
  1544. // canAdmitPod determines if a pod can be admitted, and gives a reason if it
  1545. // cannot. "pod" is new pod, while "pods" are all admitted pods
  1546. // The function returns a boolean value indicating whether the pod
  1547. // can be admitted, a brief single-word reason and a message explaining why
  1548. // the pod cannot be admitted.
  1549. func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
  1550. // the kubelet will invoke each pod admit handler in sequence
  1551. // if any handler rejects, the pod is rejected.
  1552. // TODO: move out of disk check into a pod admitter
  1553. // TODO: out of resource eviction should have a pod admitter call-out
  1554. attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
  1555. for _, podAdmitHandler := range kl.admitHandlers {
  1556. if result := podAdmitHandler.Admit(attrs); !result.Admit {
  1557. return false, result.Reason, result.Message
  1558. }
  1559. }
  1560. return true, "", ""
  1561. }
  1562. func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
  1563. attrs := &lifecycle.PodAdmitAttributes{Pod: pod}
  1564. // Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive.
  1565. attrs.OtherPods = kl.filterOutTerminatedPods(kl.podManager.GetPods())
  1566. for _, handler := range kl.softAdmitHandlers {
  1567. if result := handler.Admit(attrs); !result.Admit {
  1568. return result
  1569. }
  1570. }
  1571. return lifecycle.PodAdmitResult{Admit: true}
  1572. }
  1573. // syncLoop is the main loop for processing changes. It watches for changes from
  1574. // three channels (file, apiserver, and http) and creates a union of them. For
  1575. // any new change seen, will run a sync against desired state and running state. If
  1576. // no changes are seen to the configuration, will synchronize the last known desired
  1577. // state every sync-frequency seconds. Never returns.
  1578. func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
  1579. klog.Info("Starting kubelet main sync loop.")
  1580. // The syncTicker wakes up kubelet to checks if there are any pod workers
  1581. // that need to be sync'd. A one-second period is sufficient because the
  1582. // sync interval is defaulted to 10s.
  1583. syncTicker := time.NewTicker(time.Second)
  1584. defer syncTicker.Stop()
  1585. housekeepingTicker := time.NewTicker(housekeepingPeriod)
  1586. defer housekeepingTicker.Stop()
  1587. plegCh := kl.pleg.Watch()
  1588. const (
  1589. base = 100 * time.Millisecond
  1590. max = 5 * time.Second
  1591. factor = 2
  1592. )
  1593. duration := base
  1594. // Responsible for checking limits in resolv.conf
  1595. // The limits do not have anything to do with individual pods
  1596. // Since this is called in syncLoop, we don't need to call it anywhere else
  1597. if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
  1598. kl.dnsConfigurer.CheckLimitsForResolvConf()
  1599. }
  1600. for {
  1601. if err := kl.runtimeState.runtimeErrors(); err != nil {
  1602. klog.Errorf("skipping pod synchronization - %v", err)
  1603. // exponential backoff
  1604. time.Sleep(duration)
  1605. duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
  1606. continue
  1607. }
  1608. // reset backoff if we have a success
  1609. duration = base
  1610. kl.syncLoopMonitor.Store(kl.clock.Now())
  1611. if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
  1612. break
  1613. }
  1614. kl.syncLoopMonitor.Store(kl.clock.Now())
  1615. }
  1616. }
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. housekeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated. In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
//   handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
//   containers have failed liveness checks
//
// Returns false only when configCh is closed, which terminates syncLoop.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.RESTORE:
			klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
			// These are pods restored from the checkpoint. Treat them as new
			// pods.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		}
		if u.Op != kubetypes.RESTORE {
			// If the update type is RESTORE, it means that the update is from
			// the pod checkpoints and may be incomplete. Do not mark the
			// source as ready.
			// Mark the source ready after receiving at least one update from the
			// source. Once all the sources are marked ready, various cleanup
			// routines will start reclaiming resources. It is important that this
			// takes place only after kubelet calls the update handler to process
			// the update to ensure the internal pod cache is up-to-date.
			kl.sourcesReady.AddSource(u.Source)
		}
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}
		// Clean up dead containers regardless of whether the event was sync-worthy.
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			// Note: break exits this select case only; the function still
			// falls through to return true below.
			break
		}
		klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
		handler.HandlePodSyncs(podsToSync)
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			// The liveness manager detected a failure; sync the pod.
			// We should not use the pod from livenessManager, because it is never updated after
			// initialization.
			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
			if !ok {
				// If the pod no longer exists, ignore the update.
				klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
				break
			}
			klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			klog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				klog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has already terminated, dispatchWork does not start a worker;
// it only forces a final status update when the pod is pending deletion,
// then returns without dispatching any work.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			// If the pod is in a terminated state, there is no pod worker to
			// handle the work item. Check if the DeletionTimestamp has been
			// set, and force a status update to trigger a pod deletion request
			// to the apiserver.
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			// Record the worker duration only for syncs that ended in error;
			// successful syncs are not observed here.
			if err != nil {
				metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
			}
		},
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
  1781. // TODO: handle mirror pods in a separate component (issue #17251)
  1782. func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
  1783. // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
  1784. // corresponding static pod. Send update to the pod worker if the static
  1785. // pod exists.
  1786. if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
  1787. kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
  1788. }
  1789. }
  1790. // HandlePodAdditions is the callback in SyncHandler for pods being added from
  1791. // a config source.
  1792. func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
  1793. start := kl.clock.Now()
  1794. sort.Sort(sliceutils.PodsByCreationTime(pods))
  1795. for _, pod := range pods {
  1796. existingPods := kl.podManager.GetPods()
  1797. // Always add the pod to the pod manager. Kubelet relies on the pod
  1798. // manager as the source of truth for the desired state. If a pod does
  1799. // not exist in the pod manager, it means that it has been deleted in
  1800. // the apiserver and no action (other than cleanup) is required.
  1801. kl.podManager.AddPod(pod)
  1802. if kubetypes.IsMirrorPod(pod) {
  1803. kl.handleMirrorPod(pod, start)
  1804. continue
  1805. }
  1806. if !kl.podIsTerminated(pod) {
  1807. // Only go through the admission process if the pod is not
  1808. // terminated.
  1809. // We failed pods that we rejected, so activePods include all admitted
  1810. // pods that are alive.
  1811. activePods := kl.filterOutTerminatedPods(existingPods)
  1812. // Check if we can admit the pod; if not, reject it.
  1813. if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
  1814. kl.rejectPod(pod, reason, message)
  1815. continue
  1816. }
  1817. }
  1818. mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
  1819. kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
  1820. kl.probeManager.AddPod(pod)
  1821. }
  1822. }
  1823. // HandlePodUpdates is the callback in the SyncHandler interface for pods
  1824. // being updated from a config source.
  1825. func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
  1826. start := kl.clock.Now()
  1827. for _, pod := range pods {
  1828. kl.podManager.UpdatePod(pod)
  1829. if kubetypes.IsMirrorPod(pod) {
  1830. kl.handleMirrorPod(pod, start)
  1831. continue
  1832. }
  1833. // TODO: Evaluate if we need to validate and reject updates.
  1834. mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
  1835. kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
  1836. }
  1837. }
  1838. // HandlePodRemoves is the callback in the SyncHandler interface for pods
  1839. // being removed from a config source.
  1840. func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
  1841. start := kl.clock.Now()
  1842. for _, pod := range pods {
  1843. kl.podManager.DeletePod(pod)
  1844. if kubetypes.IsMirrorPod(pod) {
  1845. kl.handleMirrorPod(pod, start)
  1846. continue
  1847. }
  1848. // Deletion is allowed to fail because the periodic cleanup routine
  1849. // will trigger deletion again.
  1850. if err := kl.deletePod(pod); err != nil {
  1851. klog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
  1852. }
  1853. kl.probeManager.RemovePod(pod)
  1854. }
  1855. }
  1856. // HandlePodReconcile is the callback in the SyncHandler interface for pods
  1857. // that should be reconciled.
  1858. func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
  1859. start := kl.clock.Now()
  1860. for _, pod := range pods {
  1861. // Update the pod in pod manager, status manager will do periodically reconcile according
  1862. // to the pod manager.
  1863. kl.podManager.UpdatePod(pod)
  1864. // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
  1865. if status.NeedToReconcilePodReadiness(pod) {
  1866. mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
  1867. kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
  1868. }
  1869. // After an evicted pod is synced, all dead containers in the pod can be removed.
  1870. if eviction.PodIsEvicted(pod.Status) {
  1871. if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
  1872. kl.containerDeletor.deleteContainersInPod("", podStatus, true)
  1873. }
  1874. }
  1875. }
  1876. }
  1877. // HandlePodSyncs is the callback in the syncHandler interface for pods
  1878. // that should be dispatched to pod workers for sync.
  1879. func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
  1880. start := kl.clock.Now()
  1881. for _, pod := range pods {
  1882. mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
  1883. kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
  1884. }
  1885. }
  1886. // LatestLoopEntryTime returns the last time in the sync loop monitor.
  1887. func (kl *Kubelet) LatestLoopEntryTime() time.Time {
  1888. val := kl.syncLoopMonitor.Load()
  1889. if val == nil {
  1890. return time.Time{}
  1891. }
  1892. return val.(time.Time)
  1893. }
  1894. // updateRuntimeUp calls the container runtime status callback, initializing
  1895. // the runtime dependent modules when the container runtime first comes up,
  1896. // and returns an error if the status check fails. If the status check is OK,
  1897. // update the container runtime uptime in the kubelet runtimeState.
  1898. func (kl *Kubelet) updateRuntimeUp() {
  1899. kl.updateRuntimeMux.Lock()
  1900. defer kl.updateRuntimeMux.Unlock()
  1901. s, err := kl.containerRuntime.Status()
  1902. if err != nil {
  1903. klog.Errorf("Container runtime sanity check failed: %v", err)
  1904. return
  1905. }
  1906. if s == nil {
  1907. klog.Errorf("Container runtime status is nil")
  1908. return
  1909. }
  1910. // Periodically log the whole runtime status for debugging.
  1911. // TODO(random-liu): Consider to send node event when optional
  1912. // condition is unmet.
  1913. klog.V(4).Infof("Container runtime status: %v", s)
  1914. networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
  1915. if networkReady == nil || !networkReady.Status {
  1916. klog.Errorf("Container runtime network not ready: %v", networkReady)
  1917. kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
  1918. } else {
  1919. // Set nil if the container runtime network is ready.
  1920. kl.runtimeState.setNetworkState(nil)
  1921. }
  1922. // information in RuntimeReady condition will be propagated to NodeReady condition.
  1923. runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
  1924. // If RuntimeReady is not set or is false, report an error.
  1925. if runtimeReady == nil || !runtimeReady.Status {
  1926. err := fmt.Errorf("Container runtime not ready: %v", runtimeReady)
  1927. klog.Error(err)
  1928. kl.runtimeState.setRuntimeState(err)
  1929. return
  1930. }
  1931. kl.runtimeState.setRuntimeState(nil)
  1932. kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
  1933. kl.runtimeState.setRuntimeSync(kl.clock.Now())
  1934. }
  1935. // GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
  1936. func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration {
  1937. return kl.kubeletConfiguration
  1938. }
  1939. // BirthCry sends an event that the kubelet has started up.
  1940. func (kl *Kubelet) BirthCry() {
  1941. // Make an event that kubelet restarted.
  1942. kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
  1943. }
  1944. // ResyncInterval returns the interval used for periodic syncs.
  1945. func (kl *Kubelet) ResyncInterval() time.Duration {
  1946. return kl.resyncInterval
  1947. }
  1948. // ListenAndServe runs the kubelet HTTP server.
  1949. func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling bool) {
  1950. server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, kl.redirectContainerStreaming, kl.criHandler)
  1951. }
  1952. // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
  1953. func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool) {
  1954. server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, enableCAdvisorJSONEndpoints)
  1955. }
  1956. // ListenAndServePodResources runs the kubelet podresources grpc service
  1957. func (kl *Kubelet) ListenAndServePodResources() {
  1958. socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
  1959. if err != nil {
  1960. klog.V(2).Infof("Failed to get local endpoint for PodResources endpoint: %v", err)
  1961. return
  1962. }
  1963. server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager)
  1964. }
  1965. // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
  1966. func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
  1967. if podStatus, err := kl.podCache.Get(podID); err == nil {
  1968. removeAll := false
  1969. if syncedPod, ok := kl.podManager.GetPodByUID(podID); ok {
  1970. // generate the api status using the cached runtime status to get up-to-date ContainerStatuses
  1971. apiPodStatus := kl.generateAPIPodStatus(syncedPod, podStatus)
  1972. // When an evicted or deleted pod has already synced, all containers can be removed.
  1973. removeAll = eviction.PodIsEvicted(syncedPod.Status) || (syncedPod.DeletionTimestamp != nil && notRunning(apiPodStatus.ContainerStatuses))
  1974. }
  1975. kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
  1976. }
  1977. }
  1978. // fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
  1979. // is applied and tries to update pod CIDR immediately. After pod CIDR is updated it fires off
  1980. // a runtime update and a node status update. Function returns after one successful node status update.
  1981. // Function is executed only during Kubelet start which improves latency to ready node by updating
  1982. // pod CIDR, runtime status and node statuses ASAP.
  1983. func (kl *Kubelet) fastStatusUpdateOnce() {
  1984. for {
  1985. time.Sleep(100 * time.Millisecond)
  1986. node, err := kl.GetNode()
  1987. if err != nil {
  1988. klog.Errorf(err.Error())
  1989. continue
  1990. }
  1991. if len(node.Spec.PodCIDRs) != 0 {
  1992. podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
  1993. if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
  1994. klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err)
  1995. continue
  1996. }
  1997. kl.updateRuntimeUp()
  1998. kl.syncNodeStatus()
  1999. return
  2000. }
  2001. }
  2002. }
  2003. // isSyncPodWorthy filters out events that are not worthy of pod syncing
  2004. func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
  2005. // ContainerRemoved doesn't affect pod state
  2006. return event.Type != pleg.ContainerRemoved
  2007. }
  2008. // Gets the streaming server configuration to use with in-process CRI shims.
  2009. func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions) *streaming.Config {
  2010. config := &streaming.Config{
  2011. StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
  2012. StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
  2013. SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
  2014. SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
  2015. }
  2016. if !crOptions.RedirectContainerStreaming {
  2017. config.Addr = net.JoinHostPort("localhost", "0")
  2018. } else {
  2019. // Use a relative redirect (no scheme or host).
  2020. config.BaseURL = &url.URL{
  2021. Path: "/cri/",
  2022. }
  2023. if kubeDeps.TLSOptions != nil {
  2024. config.TLSConfig = kubeDeps.TLSOptions.Config
  2025. }
  2026. }
  2027. return config
  2028. }