kubelet.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	cadvisorapi "github.com/google/cadvisor/info/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/certificate"
	"k8s.io/client-go/util/flowcontrol"
	cloudprovider "k8s.io/cloud-provider"
	internalapi "k8s.io/cri-api/pkg/apis"
	"k8s.io/klog"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/features"
	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
	pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
	kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
	"k8s.io/kubernetes/pkg/kubelet/cloudresource"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	"k8s.io/kubernetes/pkg/kubelet/config"
	"k8s.io/kubernetes/pkg/kubelet/configmap"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/dockershim"
	dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	"k8s.io/kubernetes/pkg/kubelet/images"
	"k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/logs"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
	"k8s.io/kubernetes/pkg/kubelet/network/dns"
	"k8s.io/kubernetes/pkg/kubelet/nodelease"
	oomwatcher "k8s.io/kubernetes/pkg/kubelet/oom"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
	"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
	plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
	"k8s.io/kubernetes/pkg/kubelet/preemption"
	"k8s.io/kubernetes/pkg/kubelet/prober"
	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/kubelet/remote"
	"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
	"k8s.io/kubernetes/pkg/kubelet/secret"
	"k8s.io/kubernetes/pkg/kubelet/server"
	servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
	serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
	"k8s.io/kubernetes/pkg/kubelet/server/streaming"
	"k8s.io/kubernetes/pkg/kubelet/stats"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/kubelet/sysctl"
	"k8s.io/kubernetes/pkg/kubelet/token"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/kubelet/util"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
	"k8s.io/kubernetes/pkg/kubelet/util/manager"
	"k8s.io/kubernetes/pkg/kubelet/util/queue"
	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/pkg/security/apparmor"
	sysctlwhitelist "k8s.io/kubernetes/pkg/security/podsecuritypolicy/sysctl"
	utildbus "k8s.io/kubernetes/pkg/util/dbus"
	utilipt "k8s.io/kubernetes/pkg/util/iptables"
	"k8s.io/kubernetes/pkg/util/mount"
	nodeutil "k8s.io/kubernetes/pkg/util/node"
	"k8s.io/kubernetes/pkg/util/oom"
	"k8s.io/kubernetes/pkg/util/selinux"
	"k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/volume/csi"
	"k8s.io/kubernetes/pkg/volume/util/subpath"
	utilexec "k8s.io/utils/exec"
	"k8s.io/utils/integer"
)
const (
	// Max amount of time to wait for the container runtime to come up.
	maxWaitForContainerRuntime = 30 * time.Second

	// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
	nodeStatusUpdateRetry = 5

	// ContainerLogsDir is the location of container logs.
	ContainerLogsDir = "/var/log/containers"

	// MaxContainerBackOff is the max backoff period, exported for the e2e test
	MaxContainerBackOff = 300 * time.Second

	// Capacity of the channel for storing pods to kill. A small number should
	// suffice because a goroutine is dedicated to check the channel and does
	// not block on anything else.
	podKillingChannelCapacity = 50

	// Period for performing global cleanup tasks.
	housekeepingPeriod = time.Second * 2

	// Period for performing eviction monitoring.
	// TODO: ensure this is in sync with internal cadvisor housekeeping.
	evictionMonitoringPeriod = time.Second * 10

	// The path in containers' filesystems where the hosts file is mounted.
	etcHostsPath = "/etc/hosts"

	// Capacity of the channel for receiving pod lifecycle events. This number
	// is a bit arbitrary and may be adjusted in the future.
	plegChannelCapacity = 1000

	// Generic PLEG relies on relisting for discovering container events.
	// A longer period means that kubelet will take longer to detect container
	// changes and to update pod status. On the other hand, a shorter period
	// will cause more frequent relisting (e.g., container runtime operations),
	// leading to higher cpu usage.
	// Note that even though we set the period to 1s, the relisting itself can
	// take more than 1s to finish if the container runtime responds slowly
	// and/or when there are many container changes in one cycle.
	plegRelistPeriod = time.Second * 1
	// backOffPeriod is the period to back off when pod syncing results in an
	// error. It is also used as the base period for the exponential backoff of
	// container restarts and image pulls.
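	// For example, with backOffPeriod = 10s and MaxContainerBackOff = 300s,
	// successive restart delays roughly double -- 10s, 20s, 40s, ... -- until
	// capped at 300s (illustrative; this assumes the doubling behavior of
	// flowcontrol.Backoff used below).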
	backOffPeriod = time.Second * 10

	// ContainerGCPeriod is the period for performing container garbage collection.
	ContainerGCPeriod = time.Minute

	// ImageGCPeriod is the period for performing image garbage collection.
	ImageGCPeriod = 5 * time.Minute

	// Minimum number of dead containers to keep in a pod
	minDeadContainerInPod = 1
)
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}

// Option is a functional option type for Kubelet
type Option func(*Kubelet)
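
// As an illustration only (this helper is hypothetical, not part of the
// upstream source), an Option supplied via Dependencies.Options can tweak the
// Kubelet after construction, e.g. to inject a fake clock in tests:
//
//	func withClock(c clock.Clock) Option {
//		return func(k *Kubelet) { k.clock = c }
//	}
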
// Bootstrap is a bootstrapping interface for kubelet, targets the initialization protocol
type Bootstrap interface {
	GetConfiguration() kubeletconfiginternal.KubeletConfiguration
	BirthCry()
	StartGarbageCollection()
	ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling bool)
	ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool)
	ListenAndServePodResources()
	Run(<-chan kubetypes.PodUpdate)
	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}
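
// In rough terms (a sketch of the caller side, assuming the cmd/kubelet
// startup flow; names below are placeholders), a bootstrapper announces
// itself, starts garbage collection, runs the sync loop fed by the pod config
// channel, and serves the kubelet API:
//
//	k.BirthCry()
//	k.StartGarbageCollection()
//	go k.Run(podCfg.Updates())
//	k.ListenAndServe(addr, port, tlsOptions, auth,
//		enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling)
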
// Builder creates and initializes a Kubelet instance
type Builder func(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	kubeDeps *Dependencies,
	crOptions *config.ContainerRuntimeOptions,
	containerRuntime string,
	runtimeCgroups string,
	hostnameOverride string,
	nodeIP string,
	providerID string,
	cloudProvider string,
	certDirectory string,
	rootDirectory string,
	registerNode bool,
	registerWithTaints []api.Taint,
	allowedUnsafeSysctls []string,
	remoteRuntimeEndpoint string,
	remoteImageEndpoint string,
	experimentalMounterPath string,
	experimentalKernelMemcgNotification bool,
	experimentalCheckNodeCapabilitiesBeforeMount bool,
	experimentalNodeAllocatableIgnoreEvictionThreshold bool,
	minimumGCAge metav1.Duration,
	maxPerPodContainerCount int32,
	maxContainerCount int32,
	masterServiceNamespace string,
	registerSchedulable bool,
	nonMasqueradeCIDR string,
	keepTerminatedPodVolumes bool,
	nodeLabels map[string]string,
	seccompProfileRoot string,
	bootstrapCheckpointPath string,
	nodeStatusMaxImages int32) (Bootstrap, error)

// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
type Dependencies struct {
	Options []Option

	// Injected Dependencies
	Auth server.AuthInterface
	CAdvisorInterface cadvisor.Interface
	Cloud cloudprovider.Interface
	ContainerManager cm.ContainerManager
	DockerClientConfig *dockershim.ClientConfig
	EventClient v1core.EventsGetter
	HeartbeatClient clientset.Interface
	OnHeartbeatFailure func()
	KubeClient clientset.Interface
	Mounter mount.Interface
	OOMAdjuster *oom.OOMAdjuster
	OSInterface kubecontainer.OSInterface
	PodConfig *config.PodConfig
	Recorder record.EventRecorder
	Subpather subpath.Interface
	VolumePlugins []volume.VolumePlugin
	DynamicPluginProber volume.DynamicPluginProber
	TLSOptions *server.TLSOptions
	KubeletConfigController *kubeletconfig.Controller
}
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// define file config source
	if kubeCfg.StaticPodPath != "" {
		klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.StaticPodURL != "" {
		klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

	// Restore from the checkpoint path
	// NOTE: This MUST happen before creating the apiserver source
	// below, or the checkpoint would override the source of truth.
	var updatechannel chan<- interface{}
	if bootstrapCheckpointPath != "" {
		klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
		updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
		if err != nil {
			return nil, err
		}
	}

	if kubeDeps.KubeClient != nil {
		klog.Infof("Watching apiserver")
		if updatechannel == nil {
			updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		}
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
	}
	return cfg, nil
}
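
// To summarize the wiring above: pod specs can arrive from up to three
// sources -- a static-pod directory (kubetypes.FileSource), a manifest URL
// (kubetypes.HTTPSource), and the apiserver watch (kubetypes.ApiserverSource).
// config.PodConfig merges the per-source channels into the single update
// stream that ultimately feeds Kubelet.Run.
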
func getRuntimeAndImageServices(remoteRuntimeEndpoint string, remoteImageEndpoint string, runtimeRequestTimeout metav1.Duration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
	rs, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout.Duration)
	if err != nil {
		return nil, nil, err
	}
	is, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout.Duration)
	if err != nil {
		return nil, nil, err
	}
	return rs, is, nil
}
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	kubeDeps *Dependencies,
	crOptions *config.ContainerRuntimeOptions,
	containerRuntime string,
	runtimeCgroups string,
	hostnameOverride string,
	nodeIP string,
	providerID string,
	cloudProvider string,
	certDirectory string,
	rootDirectory string,
	registerNode bool,
	registerWithTaints []api.Taint,
	allowedUnsafeSysctls []string,
	remoteRuntimeEndpoint string,
	remoteImageEndpoint string,
	experimentalMounterPath string,
	experimentalKernelMemcgNotification bool,
	experimentalCheckNodeCapabilitiesBeforeMount bool,
	experimentalNodeAllocatableIgnoreEvictionThreshold bool,
	minimumGCAge metav1.Duration,
	maxPerPodContainerCount int32,
	maxContainerCount int32,
	masterServiceNamespace string,
	registerSchedulable bool,
	nonMasqueradeCIDR string,
	keepTerminatedPodVolumes bool,
	nodeLabels map[string]string,
	seccompProfileRoot string,
	bootstrapCheckpointPath string,
	nodeStatusMaxImages int32) (*Kubelet, error) {
	if rootDirectory == "" {
		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
	}
	if kubeCfg.SyncFrequency.Duration <= 0 {
		return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
	}

	if kubeCfg.MakeIPTablesUtilChains {
		if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
			return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
		}
		if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
			return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
		}
		if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
			return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
		}
	}

	hostname, err := nodeutil.GetHostname(hostnameOverride)
	if err != nil {
		return nil, err
	}
	// Query the cloud provider for our node name, default to hostname
	nodeName := types.NodeName(hostname)
	if kubeDeps.Cloud != nil {
		var err error
		instances, ok := kubeDeps.Cloud.Instances()
		if !ok {
			return nil, fmt.Errorf("failed to get instances from cloud provider")
		}
		nodeName, err = instances.CurrentNodeName(context.TODO(), hostname)
		if err != nil {
			return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
		}
		klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
	}

	if kubeDeps.PodConfig == nil {
		var err error
		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
		if err != nil {
			return nil, err
		}
	}

	containerGCPolicy := kubecontainer.ContainerGCPolicy{
		MinAge: minimumGCAge.Duration,
		MaxPerPodContainer: int(maxPerPodContainerCount),
		MaxContainers: int(maxContainerCount),
	}

	daemonEndpoints := &v1.NodeDaemonEndpoints{
		KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
	}

	imageGCPolicy := images.ImageGCPolicy{
		MinAge: kubeCfg.ImageMinimumGCAge.Duration,
		HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
		LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
	}

	enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
	if experimentalNodeAllocatableIgnoreEvictionThreshold {
		// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
		enforceNodeAllocatable = []string{}
	}
	thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
	if err != nil {
		return nil, err
	}
	evictionConfig := eviction.Config{
		PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
		MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
		Thresholds: thresholds,
		KernelMemcgNotification: experimentalKernelMemcgNotification,
		PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
	}

	serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	if kubeDeps.KubeClient != nil {
		serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
		r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
		go r.Run(wait.NeverStop)
	}
	serviceLister := corelisters.NewServiceLister(serviceIndexer)

	nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	if kubeDeps.KubeClient != nil {
		fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
		nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
		r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
		go r.Run(wait.NeverStop)
	}
	nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
	// TODO: get the real node object of ourselves,
	// and use the real node name and UID.
	// TODO: what is the namespace for a node?
	nodeRef := &v1.ObjectReference{
		Kind: "Node",
		Name: string(nodeName),
		UID: types.UID(nodeName),
		Namespace: "",
	}

	containerRefManager := kubecontainer.NewRefManager()

	oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder)

	clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
	for _, ipEntry := range kubeCfg.ClusterDNS {
		ip := net.ParseIP(ipEntry)
		if ip == nil {
			klog.Warningf("Invalid clusterDNS IP %q", ipEntry)
		} else {
			clusterDNS = append(clusterDNS, ip)
		}
	}
	httpClient := &http.Client{}
	parsedNodeIP := net.ParseIP(nodeIP)
	protocol := utilipt.ProtocolIpv4
	if parsedNodeIP != nil && parsedNodeIP.To4() == nil {
		klog.V(0).Infof("IPv6 node IP (%s), assume IPv6 operation", nodeIP)
		protocol = utilipt.ProtocolIpv6
	}
	klet := &Kubelet{
		hostname: hostname,
		hostnameOverridden: len(hostnameOverride) > 0,
		nodeName: nodeName,
		kubeClient: kubeDeps.KubeClient,
		heartbeatClient: kubeDeps.HeartbeatClient,
		onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
		rootDirectory: rootDirectory,
		resyncInterval: kubeCfg.SyncFrequency.Duration,
		sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
		registerNode: registerNode,
		registerWithTaints: registerWithTaints,
		registerSchedulable: registerSchedulable,
		dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
		serviceLister: serviceLister,
		nodeInfo: nodeInfo,
		masterServiceNamespace: masterServiceNamespace,
		streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
		recorder: kubeDeps.Recorder,
		cadvisor: kubeDeps.CAdvisorInterface,
		cloud: kubeDeps.Cloud,
		externalCloudProvider: cloudprovider.IsExternal(cloudProvider),
		providerID: providerID,
		nodeRef: nodeRef,
		nodeLabels: nodeLabels,
		nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
		nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration,
		os: kubeDeps.OSInterface,
		oomWatcher: oomWatcher,
		cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
		cgroupRoot: kubeCfg.CgroupRoot,
		mounter: kubeDeps.Mounter,
		subpather: kubeDeps.Subpather,
		maxPods: int(kubeCfg.MaxPods),
		podsPerCore: int(kubeCfg.PodsPerCore),
		syncLoopMonitor: atomic.Value{},
		daemonEndpoints: daemonEndpoints,
		containerManager: kubeDeps.ContainerManager,
		containerRuntimeName: containerRuntime,
		redirectContainerStreaming: crOptions.RedirectContainerStreaming,
		nodeIP: parsedNodeIP,
		nodeIPValidator: validateNodeIP,
		clock: clock.RealClock{},
		enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
		iptClient: utilipt.New(utilexec.New(), utildbus.New(), protocol),
		makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
		iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
		iptablesDropBit: int(kubeCfg.IPTablesDropBit),
		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
		keepTerminatedPodVolumes: keepTerminatedPodVolumes,
		nodeStatusMaxImages: nodeStatusMaxImages,
		enablePluginsWatcher: utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher),
	}
	if klet.cloud != nil {
		klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
	}

	var secretManager secret.Manager
	var configMapManager configmap.Manager
	switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
	case kubeletconfiginternal.WatchChangeDetectionStrategy:
		secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
		configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
	case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
		secretManager = secret.NewCachingSecretManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
		configMapManager = configmap.NewCachingConfigMapManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
	case kubeletconfiginternal.GetChangeDetectionStrategy:
		secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
		configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
	default:
		return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
	}
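	// Roughly speaking, the three strategies trade freshness for apiserver
	// load: Watch keeps referenced secrets/configmaps current via individual
	// watches, the TTL cache re-fetches them after a node-configurable TTL,
	// and Get reads them from the apiserver on every access.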
	klet.secretManager = secretManager
	klet.configMapManager = configMapManager

	if klet.experimentalHostUserNamespaceDefaulting {
		klog.Infof("Experimental host user namespace defaulting is enabled.")
	}

	machineInfo, err := klet.cadvisor.MachineInfo()
	if err != nil {
		return nil, err
	}
	klet.machineInfo = machineInfo

	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

	klet.livenessManager = proberesults.NewManager()

	klet.podCache = kubecontainer.NewCache()
	var checkpointManager checkpointmanager.CheckpointManager
	if bootstrapCheckpointPath != "" {
		checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
		}
	}
	// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
	klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)

	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

	if remoteRuntimeEndpoint != "" {
		// remoteImageEndpoint is the same as remoteRuntimeEndpoint if not explicitly specified
		if remoteImageEndpoint == "" {
			remoteImageEndpoint = remoteRuntimeEndpoint
		}
	}
	// TODO: These need to become arguments to a standalone docker shim.
	pluginSettings := dockershim.NetworkPluginSettings{
		HairpinMode: kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode),
		NonMasqueradeCIDR: nonMasqueradeCIDR,
		PluginName: crOptions.NetworkPluginName,
		PluginConfDir: crOptions.CNIConfDir,
		PluginBinDirString: crOptions.CNIBinDir,
		MTU: int(crOptions.NetworkPluginMTU),
	}

	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
	// If left nil, the legacy log provider is not needed.
	var legacyLogProvider kuberuntime.LegacyLogProvider
	switch containerRuntime {
	case kubetypes.DockerContainerRuntime:
		// Create and start the CRI shim running as a grpc server.
		streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
		ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
			&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
		if err != nil {
			return nil, err
		}
		if crOptions.RedirectContainerStreaming {
			klet.criHandler = ds
		}

		// The unix socket for kubelet <-> dockershim communication.
		klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
			remoteRuntimeEndpoint,
			remoteImageEndpoint)
		klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
		server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
		if err := server.Start(); err != nil {
			return nil, err
		}

		// Create dockerLegacyService when the logging driver is not supported.
		supported, err := ds.IsCRISupportedLogDriver()
		if err != nil {
			return nil, err
		}
		if !supported {
			klet.dockerLegacyService = ds
			legacyLogProvider = ds
		}
	case kubetypes.RemoteContainerRuntime:
		// No-op.
		break
	default:
		return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
	}
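	// Either way, the kubelet talks to the runtime over CRI from here on:
	// with the docker runtime, remoteRuntimeEndpoint points at the dockershim
	// socket served above, so the remote services below connect back to it.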
	runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
	if err != nil {
		return nil, err
	}
	klet.runtimeService = runtimeService

	if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
	}

	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
		klet.livenessManager,
		seccompProfileRoot,
		containerRefManager,
		machineInfo,
		klet,
		kubeDeps.OSInterface,
		klet,
		httpClient,
		imageBackOff,
		kubeCfg.SerializeImagePulls,
		float32(kubeCfg.RegistryPullQPS),
		int(kubeCfg.RegistryBurst),
		kubeCfg.CPUCFSQuota,
		kubeCfg.CPUCFSQuotaPeriod,
		runtimeService,
		imageService,
		kubeDeps.ContainerManager.InternalContainerLifecycle(),
		legacyLogProvider,
		klet.runtimeClassManager,
	)
	if err != nil {
		return nil, err
	}
	klet.containerRuntime = runtime
	klet.streamingRuntime = runtime
	klet.runner = runtime

	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	klet.runtimeCache = runtimeCache

	if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
		klet.StatsProvider = stats.NewCadvisorStatsProvider(
			klet.cadvisor,
			klet.resourceAnalyzer,
			klet.podManager,
			klet.runtimeCache,
			klet.containerRuntime,
			klet.statusManager)
	} else {
		klet.StatsProvider = stats.NewCRIStatsProvider(
			klet.cadvisor,
			klet.resourceAnalyzer,
			klet.podManager,
			klet.runtimeCache,
			runtimeService,
			imageService,
			stats.NewLogMetricsService(),
			kubecontainer.RealOS{})
	}
	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
	klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
	if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
		klog.Errorf("Pod CIDR update failed %v", err)
	}

	// setup containerGC
	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
	if err != nil {
		return nil, err
	}
	klet.containerGC = containerGC
	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

	// setup imageManager
	imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
	}
	klet.imageManager = imageManager

	if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
		// setup containerLogManager for CRI container runtime
		containerLogManager, err := logs.NewContainerLogManager(
			klet.runtimeService,
			kubeCfg.ContainerLogMaxSize,
			int(kubeCfg.ContainerLogMaxFiles),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
		}
		klet.containerLogManager = containerLogManager
	} else {
		klet.containerLogManager = logs.NewStubContainerLogManager()
	}

	if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
		klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
		}
		kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
			cert := klet.serverCertificateManager.Current()
			if cert == nil {
				return nil, fmt.Errorf("no serving certificate available for the kubelet")
			}
			return cert, nil
		}
	}
	klet.probeManager = prober.NewManager(
		klet.statusManager,
		klet.livenessManager,
		klet.runner,
		containerRefManager,
		kubeDeps.Recorder)

	tokenManager := token.NewManager(kubeDeps.KubeClient)
	// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init),
	// which affect node ready status. This function must be called before Kubelet is initialized so that the Node
	// ReadyState is accurate with the storage state.
	klet.volumePluginMgr, err =
		NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
	if err != nil {
		return nil, err
	}

	if klet.enablePluginsWatcher {
		klet.pluginManager = pluginmanager.NewPluginManager(
			klet.getPluginsRegistrationDir(), /* sockDir */
			klet.getPluginsDir(), /* deprecatedSockDir */
			kubeDeps.Recorder,
		)
	}
	// If the experimentalMounterPath flag is set, we do not want to
	// check node capabilities since the mount path is not the default
	if len(experimentalMounterPath) != 0 {
		experimentalCheckNodeCapabilitiesBeforeMount = false
		// Replace the nameserver in containerized-mounter's rootfs/etc/resolv.conf with kubelet.ClusterDNS
		// so that service names can be resolved
		klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
	}
	// setup volumeManager
	klet.volumeManager = volumemanager.NewVolumeManager(
		kubeCfg.EnableControllerAttachDetach,
		nodeName,
		klet.podManager,
		klet.statusManager,
		klet.kubeClient,
		klet.volumePluginMgr,
		klet.containerRuntime,
		kubeDeps.Mounter,
		klet.getPodsDir(),
		kubeDeps.Recorder,
		experimentalCheckNodeCapabilitiesBeforeMount,
		keepTerminatedPodVolumes)

	klet.reasonCache = NewReasonCache()
	klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
	klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
	klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)

	// setup eviction manager
	evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)

	klet.evictionManager = evictionManager
	klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
	if utilfeature.DefaultFeatureGate.Enabled(features.Sysctls) {
		// add sysctl admission
		runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
		if err != nil {
			return nil, err
		}

		// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec.
		// Hence, we concatenate those two lists.
		safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
		sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
		if err != nil {
			return nil, err
		}
		klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
		klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)
	}
	// enable active deadline handler
	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
	if err != nil {
		return nil, err
	}
	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
	klet.AddPodSyncHandler(activeDeadlineHandler)

	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
	// apply functional Options
	for _, opt := range kubeDeps.Options {
		opt(klet)
	}

	klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
	klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
	klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
	if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
		klet.nodeLeaseController = nodelease.NewController(klet.clock, klet.heartbeatClient, string(klet.nodeName), kubeCfg.NodeLeaseDurationSeconds, kubeCfg.NodeStatusUpdateFrequency.Duration, klet.onRepeatedHeartbeatFailure)
	}
	klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewProcMountAdmitHandler(klet.containerRuntime))

	// Finally, put the most recent version of the config on the Kubelet, so
	// people can see how it was configured.
	klet.kubeletConfiguration = *kubeCfg

	// Generating the status funcs should be the last thing we do,
	// since this relies on the rest of the Kubelet having been constructed.
	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

	return klet, nil
}
type serviceLister interface {
	List(labels.Selector) ([]*v1.Service, error)
}

// Kubelet is the main kubelet implementation.
type Kubelet struct {
	kubeletConfiguration kubeletconfiginternal.KubeletConfiguration

	// hostname is the hostname the kubelet detected or was given via flag/config
	hostname string
	// hostnameOverridden indicates the hostname was overridden via flag/config
	hostnameOverridden bool

	nodeName types.NodeName
	runtimeCache kubecontainer.RuntimeCache
	kubeClient clientset.Interface
	heartbeatClient clientset.Interface
	iptClient utilipt.Interface
	rootDirectory string

	lastObservedNodeAddressesMux sync.RWMutex
	lastObservedNodeAddresses []v1.NodeAddress

	// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
	onRepeatedHeartbeatFailure func()

	// podWorkers handle syncing Pods in response to events.
	podWorkers PodWorkers

	// resyncInterval is the interval between periodic full reconciliations of
	// pods on this node.
	resyncInterval time.Duration

	// sourcesReady records the sources seen by the kubelet, it is thread-safe.
	sourcesReady config.SourcesReady

	// podManager is a facade that abstracts away the various sources of pods
	// this Kubelet services.
	podManager kubepod.Manager

	// Needed to observe and respond to situations that could impact node stability
	evictionManager eviction.Manager

	// Optional, defaults to /logs/ from /var/log
	logServer http.Handler
	// Optional, defaults to simple Docker implementation
	runner kubecontainer.ContainerCommandRunner

	// cAdvisor used for container information.
	cadvisor cadvisor.Interface

	// Set to true to have the node register itself with the apiserver.
	registerNode bool
	// List of taints to add to a node object when the kubelet registers itself.
	registerWithTaints []api.Taint
	// Set to true to have the node register itself as schedulable.
	registerSchedulable bool
	// for internal bookkeeping; access only from within registerWithApiserver
	registrationCompleted bool
	// dnsConfigurer is used for setting up DNS resolver configuration when launching pods.
	dnsConfigurer *dns.Configurer

	// masterServiceNamespace is the namespace that the master service is exposed in.
	masterServiceNamespace string
	// serviceLister knows how to list services
	serviceLister serviceLister
	// nodeInfo knows how to get information about the node for this kubelet.
	nodeInfo predicates.NodeInfo

	// a list of node labels to register
	nodeLabels map[string]string

	// Last timestamp when runtime responded on ping.
	// Mutex is used to protect this value.
	runtimeState *runtimeState

	// Volume plugins.
	volumePluginMgr *volume.VolumePluginMgr

	// Handles container probing.
	probeManager prober.Manager
	// Manages container health check results.
	livenessManager proberesults.Manager

	// How long to keep idle streaming command execution/port forwarding
	// connections open before terminating them
	streamingConnectionIdleTimeout time.Duration

	// The EventRecorder to use
	recorder record.EventRecorder

	// Policy for handling garbage collection of dead containers.
	containerGC kubecontainer.ContainerGC

	// Manager for image garbage collection.
	imageManager images.ImageGCManager

	// Manager for container logs.
	containerLogManager logs.ContainerLogManager

	// Secret manager.
	secretManager secret.Manager

	// ConfigMap manager.
	configMapManager configmap.Manager

	// Cached MachineInfo returned by cadvisor.
	machineInfo *cadvisorapi.MachineInfo

	// Handles certificate rotations.
	serverCertificateManager certificate.Manager

	// Syncs pods statuses with apiserver; also used as a cache of statuses.
	statusManager status.Manager

	// VolumeManager runs a set of asynchronous loops that figure out which
	// volumes need to be attached/mounted/unmounted/detached based on the pods
	// scheduled on this node and makes it so.
	volumeManager volumemanager.VolumeManager

	// Cloud provider interface.
	cloud cloudprovider.Interface
	// Handles requests to cloud provider with timeout
	cloudResourceSyncManager cloudresource.SyncManager

	// Indicates that the node initialization happens in an external cloud controller
	externalCloudProvider bool
	// Reference to this node.
	nodeRef *v1.ObjectReference

	// The name of the container runtime
	containerRuntimeName string

	// redirectContainerStreaming enables container streaming redirect.
	redirectContainerStreaming bool

	// Container runtime.
	containerRuntime kubecontainer.Runtime

	// Streaming runtime handles container streaming.
	streamingRuntime kubecontainer.StreamingRuntime

	// Container runtime service (needed by container runtime Start()).
	// TODO(CD): try to make this available without holding a reference in this
	// struct. For example, by adding a getter to generic runtime.
	runtimeService internalapi.RuntimeService

	// reasonCache caches the failure reason of the last creation of all containers, which is
	// used for generating ContainerStatus.
	reasonCache *ReasonCache
	// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
	// feature is not enabled, it is also the frequency that kubelet posts node status to master.
	// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
	// in nodecontroller. There are several constraints:
	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
	//    N means number of retries allowed for kubelet to post node status. It is pointless
	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
	//    The constant must be less than podEvictionTimeout.
	// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
	//    status. Kubelet may fail to update node status reliably if the value is too small,
	//    as it takes time to gather all necessary node information.
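	// As a worked example of constraint 1 (the concrete values here are
	// illustrative assumptions, not prescribed defaults): with
	// nodeStatusUpdateFrequency = 10s and nodeStatusUpdateRetry = 5, fresh
	// status arrives at most every 10s, so a nodeMonitorGracePeriod below 10s
	// would be pointless, while one of roughly N*10s tolerates N missed updates.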
	nodeStatusUpdateFrequency time.Duration

	// nodeStatusReportFrequency is the frequency that kubelet posts node
	// status to master. It is only used when node lease feature is enabled.
	nodeStatusReportFrequency time.Duration

	// lastStatusReportTime is the time when node status was last reported.
	lastStatusReportTime time.Time

	// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
	// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
	syncNodeStatusMux sync.Mutex
	// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
	// This lock is used by Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
	updatePodCIDRMux sync.Mutex
	// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
	// This lock is used by Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
	updateRuntimeMux sync.Mutex

	// nodeLeaseController claims and renews the node lease for this Kubelet
	nodeLeaseController nodelease.Controller

	// Generates pod events.
	pleg pleg.PodLifecycleEventGenerator

	// Store kubecontainer.PodStatus for all pods.
	podCache kubecontainer.Cache

	// os is a facade for various syscalls that need to be mocked during testing.
	os kubecontainer.OSInterface

	// Watcher of out of memory events.
	oomWatcher oomwatcher.Watcher

	// Monitor resource usage
	resourceAnalyzer serverstats.ResourceAnalyzer

	// Whether or not we should have the QOS cgroup hierarchy for resource management
	cgroupsPerQOS bool

	// If non-empty, pass this to the container runtime as the root cgroup.
	cgroupRoot string

	// Mounter to use for volumes.
	mounter mount.Interface

	// subpather to execute subpath actions
	subpather subpath.Interface

	// Manager of non-Runtime containers.
	containerManager cm.ContainerManager
	// Maximum number of pods that can be run by this kubelet
	maxPods int
	// Monitor Kubelet's sync loop
	syncLoopMonitor atomic.Value

	// Container restart Backoff
	backOff *flowcontrol.Backoff

	// Channel for sending pods to kill.
	podKillingCh chan *kubecontainer.PodPair
	// Information about the ports opened by daemons on the node running this kubelet server.
	daemonEndpoints *v1.NodeDaemonEndpoints
	// A queue used to trigger pod workers.
	workQueue queue.WorkQueue

	// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
	oneTimeInitializer sync.Once

	// If non-nil, use this IP address for the node
	nodeIP net.IP

	// use this function to validate the kubelet nodeIP
	nodeIPValidator func(net.IP) error
	// If non-empty, this is a unique identifier for the node in an external database, e.g., the cloud provider
	providerID string
	// clock is an interface that provides time related functionality in a way that makes it
	// easy to test the code.
	clock clock.Clock

	// handlers called during the tryUpdateNodeStatus cycle
	setNodeStatusFuncs []func(*v1.Node) error

	lastNodeUnschedulableLock sync.Mutex
	// maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
	lastNodeUnschedulable bool

	// TODO: think about moving this to be centralized in PodWorkers in follow-on.
	// the list of handlers to call during pod admission.
	admitHandlers lifecycle.PodAdmitHandlers
	// softAdmitHandlers are applied to the pod after it is admitted by the Kubelet, but before it is
	// run. A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely. If a
	// rejected pod should not be recreated, or the scheduler is not aware of the rejection rule, the
	// admission rule should be applied by a softAdmitHandler.
	softAdmitHandlers lifecycle.PodAdmitHandlers
	// the list of handlers to call during pod sync loop.
	lifecycle.PodSyncLoopHandlers
	// the list of handlers to call during pod sync.
	lifecycle.PodSyncHandlers
	// the number of allowed pods per core
	podsPerCore int
	// enableControllerAttachDetach indicates the Attach/Detach controller
	// should manage attachment/detachment of volumes scheduled to this node,
	// and disable kubelet from executing any attach/detach operations
	enableControllerAttachDetach bool
	// trigger deleting containers in a pod
	containerDeletor *podContainerDeletor
	// config iptables util rules
	makeIPTablesUtilChains bool
	// The bit of the fwmark space to mark packets for SNAT.
	iptablesMasqueradeBit int
	// The bit of the fwmark space to mark packets for dropping.
	iptablesDropBit int
	// The AppArmor validator for checking whether AppArmor is supported.
	appArmorValidator apparmor.Validator
	// The handler serving CRI streaming calls (exec/attach/port-forward).
	criHandler http.Handler
	// experimentalHostUserNamespaceDefaulting sets userns=true when users request host namespaces (pid, ipc, net),
	// are using non-namespaced capabilities (mknod, sys_time, sys_module), the pod contains a privileged container,
	// or using host path volumes.
	// This should only be enabled when the container runtime is performing user remapping AND if the
	// experimental behavior is desired.
	experimentalHostUserNamespaceDefaulting bool
	// dockerLegacyService contains some legacy methods for backward compatibility.
	// It should be set only when docker is using non json-file logging driver.
	dockerLegacyService dockershim.DockerLegacyService
	// StatsProvider provides the node and the container stats.
	*stats.StatsProvider
	// This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node.
	// This can be useful for debugging volume related issues.
	keepTerminatedPodVolumes bool // DEPRECATED
	// pluginmanager runs a set of asynchronous loops that figure out which
	// plugins need to be registered/unregistered based on this node and makes it so.
	pluginManager pluginmanager.PluginManager
	// This flag sets a maximum number of images to report in the node status.
	nodeStatusMaxImages int32
	// This flag indicates that kubelet should start plugin watcher utility server for discovering Kubelet plugins
	enablePluginsWatcher bool
	// Handles RuntimeClass objects for the Kubelet.
	runtimeClassManager *runtimeclass.Manager
}

// setupDataDirs creates:
// 1. the root directory
// 2. the pods directory
// 3. the plugins directory
// 4. the pod-resources directory
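//
// As an illustrative sketch only (assuming the conventional default root of
// /var/lib/kubelet; treat the exact names as assumptions from this file's
// helper methods), the resulting layout looks like:
//
//	/var/lib/kubelet                  <- root directory
//	/var/lib/kubelet/pods             <- pods directory
//	/var/lib/kubelet/plugins          <- plugins directory
//	/var/lib/kubelet/pod-resources    <- pod-resources directory
//
// plus a plugin registration directory under the same root.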
func (kl *Kubelet) setupDataDirs() error {
	kl.rootDirectory = path.Clean(kl.rootDirectory)
	pluginRegistrationDir := kl.getPluginsRegistrationDir()
	pluginsDir := kl.getPluginsDir()
	if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
		return fmt.Errorf("error creating root directory: %v", err)
	}
	if err := kl.mounter.MakeRShared(kl.getRootDir()); err != nil {
		return fmt.Errorf("error configuring root directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
		return fmt.Errorf("error creating pods directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
		return fmt.Errorf("error creating plugins directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPluginsRegistrationDir(), 0750); err != nil {
		return fmt.Errorf("error creating plugins registry directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
		return fmt.Errorf("error creating podresources directory: %v", err)
	}
	if selinux.SELinuxEnabled() {
		err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
		if err != nil {
			klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", pluginRegistrationDir, err)
		}
		err = selinux.SetFileLabel(pluginsDir, config.KubeletPluginsDirSELinuxLabel)
		if err != nil {
			klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", pluginsDir, err)
		}
	}
	return nil
}

// StartGarbageCollection starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
	loggedContainerGCFailure := false
	go wait.Until(func() {
		if err := kl.containerGC.GarbageCollect(); err != nil {
			klog.Errorf("Container garbage collection failed: %v", err)
			kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
			loggedContainerGCFailure = true
		} else {
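			// Editorial note on the pattern below: after a failed run, the
			// next success is logged at V(1) (visible in default logs) rather
			// than the usual V(4), so the recovery is easy to spot.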
			var vLevel klog.Level = 4
			if loggedContainerGCFailure {
				vLevel = 1
				loggedContainerGCFailure = false
			}
			klog.V(vLevel).Infof("Container garbage collection succeeded")
		}
	}, ContainerGCPeriod, wait.NeverStop)
	// when the high threshold is set to 100, stub the image GC manager
	if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
		klog.V(2).Infof("ImageGCHighThresholdPercent is set to 100; disabling image GC")
		return
	}
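	// For reference (a configuration sketch, not code from this file): the
	// threshold checked above comes from the KubeletConfiguration field
	// imageGCHighThresholdPercent, so a kubelet config file containing
	//
	//	apiVersion: kubelet.config.k8s.io/v1beta1
	//	kind: KubeletConfiguration
	//	imageGCHighThresholdPercent: 100
	//
	// turns image garbage collection off entirely.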
	prevImageGCFailed := false
	go wait.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			if prevImageGCFailed {
				klog.Errorf("Image garbage collection failed multiple times in a row: %v", err)
				// Only create an event for repeated failures
				kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
			} else {
				klog.Errorf("Image garbage collection failed once. Stats initialization may not have completed yet: %v", err)
			}
			prevImageGCFailed = true
		} else {
			var vLevel klog.Level = 4
			if prevImageGCFailed {
				vLevel = 1
				prevImageGCFailed = false
			}
			klog.V(vLevel).Infof("Image garbage collection succeeded")
		}
	}, ImageGCPeriod, wait.NeverStop)
}

// initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules() error {
	// Prometheus metrics.
	metrics.Register(
		kl.runtimeCache,
		collectors.NewVolumeStatsCollector(kl),
		collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
	)
	metrics.SetNodeName(kl.nodeName)
	servermetrics.Register()
	// Setup filesystem directories.
	if err := kl.setupDataDirs(); err != nil {
		return err
	}
	// If the container logs directory does not exist, create it.
	if _, err := os.Stat(ContainerLogsDir); err != nil {
		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
			klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
		}
	}
	// Start the image manager.
	kl.imageManager.Start()
	// Start the certificate manager if it was enabled.
	if kl.serverCertificateManager != nil {
		kl.serverCertificateManager.Start()
	}
	// Start out of memory watcher.
	if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
		return fmt.Errorf("failed to start OOM watcher: %v", err)
	}
	// Start resource analyzer
	kl.resourceAnalyzer.Start()
	return nil
}

// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
func (kl *Kubelet) initializeRuntimeDependentModules() {
	if err := kl.cadvisor.Start(); err != nil {
		// Fail kubelet and rely on the babysitter to retry starting kubelet.
		// TODO(random-liu): Add backoff logic in the babysitter
		klog.Fatalf("Failed to start cAdvisor %v", err)
	}
	// trigger on-demand stats collection once so that we have capacity information for ephemeral storage.
	// ignore any errors, since if stats collection is not successful, the container manager will fail to start below.
	kl.StatsProvider.GetCgroupStats("/", true)
	// Start container manager.
	node, err := kl.getNodeAnyWay()
	if err != nil {
		// Fail kubelet and rely on the babysitter to retry starting kubelet.
		klog.Fatalf("Kubelet failed to get node info: %v", err)
	}
	// containerManager must start after cAdvisor because it needs filesystem capacity information
	if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
		// Fail kubelet and rely on the babysitter to retry starting kubelet.
		klog.Fatalf("Failed to start ContainerManager %v", err)
	}
	// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
	kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
	// container log manager must start after container runtime is up to retrieve information from container runtime
	// and inform container to reopen log file after log rotation.
	kl.containerLogManager.Start()
	if kl.enablePluginsWatcher {
		// Adding Registration Callback function for CSI Driver
		kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
		// Adding Registration Callback function for Device Manager
		kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
		// Start the plugin manager
		klog.V(4).Infof("starting plugin manager")
		go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
	}
}

// Run starts the kubelet reacting to config updates
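//
// An illustrative start-up sequence from the kubelet command layer (a sketch;
// podCfg here stands for the pod config source multiplexer, and the exact
// call sites may differ):
//
//	kl.BirthCry()
//	kl.StartGarbageCollection()
//	go kl.Run(podCfg.Updates())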
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		klog.Warning("No api server defined - no node status update will be sent.")
	}
	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.Fatal(err)
	}
	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
	if kl.kubeClient != nil {
		// Start syncing node status immediately; this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()
		// start syncing lease
		if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
			go kl.nodeLeaseController.Run(wait.NeverStop)
		}
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
	// Start loop to sync iptables util rules
	if kl.makeIPTablesUtilChains {
		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
	}
	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()
	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}
	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}

// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare a v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
//   start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
//   already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not throw an event if this operation returns an error.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	// pull out the required options
	pod := o.pod
	mirrorPod := o.mirrorPod
	podStatus := o.podStatus
	updateType := o.updateType
	// if we want to kill a pod, do it now!
	if updateType == kubetypes.SyncPodKill {
		killPodOptions := o.killPodOptions
		if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
			return fmt.Errorf("kill pod options are required if update type is kill")
		}
		apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
		kl.statusManager.SetPodStatus(pod, apiPodStatus)
		// we kill the pod with the specified grace period since this is a termination
		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			// there was an error killing the pod, so we return that error directly
			utilruntime.HandleError(err)
			return err
		}
		return nil
	}
	// Latency measurements for the main workflow are relative to the
	// first time the pod was seen by the API server.
	var firstSeenTime time.Time
	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
	}
	// Record pod worker start latency if being created
	// TODO: make pod workers record their own latencies
	if updateType == kubetypes.SyncPodCreate {
		if !firstSeenTime.IsZero() {
			// This is the first time we are syncing the pod. Record the latency
			// since kubelet first saw the pod if firstSeenTime is set.
			metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
			metrics.DeprecatedPodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
		} else {
			klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
		}
	}
	// Generate final API pod status with pod and status manager status
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
	// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
	// set pod IP to hostIP directly in runtime.GetPodStatus
	podStatus.IP = apiPodStatus.PodIP
	// Record the time it takes for the pod to become running.
	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
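	// Editorial note: in the condition below, && binds tighter than ||, so the
	// metric is observed either when no previous status exists at all, or on a
	// Pending -> Running transition with a known firstSeenTime.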
	if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
		!firstSeenTime.IsZero() {
		metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
		metrics.DeprecatedPodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
	}
	runnable := kl.canRunPod(pod)
	if !runnable.Admit {
		// Pod is not runnable; update the Pod and Container statuses to why.
		apiPodStatus.Reason = runnable.Reason
		apiPodStatus.Message = runnable.Message
		// Waiting containers are not being created.
		const waitingReason = "Blocked"
		for _, cs := range apiPodStatus.InitContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
		for _, cs := range apiPodStatus.ContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
	}
	// Update status in the status manager
	kl.statusManager.SetPodStatus(pod, apiPodStatus)
	// Kill pod if it should not be running
	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
		var syncErr error
		if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			syncErr = fmt.Errorf("error killing pod: %v", err)
			utilruntime.HandleError(syncErr)
		} else {
			if !runnable.Admit {
				// There was no error killing the pod, but the pod cannot be run.
				// Return an error to signal that the sync loop should back off.
				syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
			}
		}
		return syncErr
	}
	// If the network plugin is not ready, only start the pod if it uses the host network
	if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
		return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
	}
	// Create Cgroups for the pod and apply resource parameters
	// to them if cgroups-per-qos flag is enabled.
	pcm := kl.containerManager.NewPodContainerManager()
	// If pod has already been terminated then we need not create
	// or update the pod's cgroup
	if !kl.podIsTerminated(pod) {
		// When the kubelet is restarted with the cgroups-per-qos
		// flag enabled, all the pod's running containers
		// should be killed intermittently and brought back up
		// under the qos cgroup hierarchy.
		// Check if this is the pod's first sync
		firstSync := true
		for _, containerStatus := range apiPodStatus.ContainerStatuses {
			if containerStatus.State.Running != nil {
				firstSync = false
				break
			}
		}
		// Don't kill containers in the pod if the pod's cgroups already
		// exist or the pod is running for the first time
		podKilled := false
		if !pcm.Exists(pod) && !firstSync {
			if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
				podKilled = true
			}
		}
		// Create and update the pod's cgroups
		// Don't create cgroups for a run-once pod if it was killed above
		// The current policy is not to restart the run once pods when
		// the kubelet is restarted with the new flag as run once pods are
		// expected to run only once and if the kubelet is restarted then
		// they are not expected to run again.
		// We don't create and apply updates to cgroup if it's a run-once pod and was killed above
		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
			if !pcm.Exists(pod) {
				if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
					klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
				}
				if err := pcm.EnsureExists(pod); err != nil {
					kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
					return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
				}
			}
		}
	}
	// Create Mirror Pod for Static Pod if it doesn't already exist
	if kubepod.IsStaticPod(pod) {
		podFullName := kubecontainer.GetPodFullName(pod)
		deleted := false
		if mirrorPod != nil {
			if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
				// The mirror pod is semantically different from the static pod. Remove
				// it. The mirror pod will get recreated later.
				klog.Infof("Trying to delete pod %s %v", podFullName, mirrorPod.ObjectMeta.UID)
				var err error
				deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
				if deleted {
					klog.Warningf("Deleted mirror pod %q because it is outdated", format.Pod(mirrorPod))
				} else if err != nil {
					klog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
				}
			}
		}
		if mirrorPod == nil || deleted {
			node, err := kl.GetNode()
			if err != nil || node.DeletionTimestamp != nil {
				klog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
			} else {
				klog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
				if err := kl.podManager.CreateMirrorPod(pod); err != nil {
					klog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
				}
			}
		}
	}
	// Make data directories for the pod
	if err := kl.makePodDataDirs(pod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
		klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
		return err
	}
	// Volume manager will not mount volumes for terminated pods
	if !kl.podIsTerminated(pod) {
		// Wait for volumes to attach/mount
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
			klog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
			return err
		}
	}
	// Fetch the pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)
	// Call the container runtime's SyncPod callback
	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	if err := result.Error(); err != nil {
		// Do not return error if the only failures were pods in backoff
		for _, r := range result.SyncResults {
			if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
				// Do not record an event here, as we keep all event logging for sync pod failures
				// local to container runtime so we get better errors
				return err
			}
		}
		return nil
	}
	return nil
}

// getPodsToSync returns pods that should be resynchronized. Currently, the
// following pods should be resynchronized:
// * pods whose work is ready
// * pods for which an internal module has requested a sync
func (kl *Kubelet) getPodsToSync() []*v1.Pod {
	allPods := kl.podManager.GetPods()
	podUIDs := kl.workQueue.GetWork()
	podUIDSet := sets.NewString()
	for _, podUID := range podUIDs {
		podUIDSet.Insert(string(podUID))
	}
	var podsToSync []*v1.Pod
	for _, pod := range allPods {
		if podUIDSet.Has(string(pod.UID)) {
			// The work of the pod is ready
			podsToSync = append(podsToSync, pod)
			continue
		}
		for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
			if podSyncLoopHandler.ShouldSync(pod) {
				podsToSync = append(podsToSync, pod)
				break
			}
		}
	}
	return podsToSync
}

// deletePod deletes the pod from the internal state of the kubelet by:
// 1. stopping the associated pod worker asynchronously
// 2. signaling to kill the pod by sending on the podKillingCh channel
//
// deletePod returns an error if not all sources are ready or the pod is not
// found in the runtime cache.
func (kl *Kubelet) deletePod(pod *v1.Pod) error {
	if pod == nil {
		return fmt.Errorf("deletePod does not allow nil pod")
	}
	if !kl.sourcesReady.AllReady() {
		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
		// for sources that haven't reported yet.
		return fmt.Errorf("skipping delete because sources aren't ready yet")
	}
	kl.podWorkers.ForgetWorker(pod.UID)
	// The runtime cache may not have been updated with the pod yet, but that's okay
	// because the periodic cleanup routine will attempt to delete again later.
	runningPods, err := kl.runtimeCache.GetPods()
	if err != nil {
		return fmt.Errorf("error listing containers: %v", err)
	}
	runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
	if runningPod.IsEmpty() {
		return fmt.Errorf("pod not found")
	}
	podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
	kl.podKillingCh <- &podPair
	// TODO: delete the mirror pod here?
	// We leave the volume/directory cleanup to the periodic cleanup routine.
	return nil
}

// rejectPod records an event about the pod with the given reason and message,
// and updates the pod to the failed phase in the status manager.
func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
	kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
	kl.statusManager.SetPodStatus(pod, v1.PodStatus{
		Phase:   v1.PodFailed,
		Reason:  reason,
		Message: "Pod " + message})
}

// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is the new pod, while "pods" are all the admitted pods.
// The function returns a boolean value indicating whether the pod
// can be admitted, a brief single-word reason, and a message explaining why
// the pod cannot be admitted.
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
	// the kubelet will invoke each pod admit handler in sequence
	// if any handler rejects, the pod is rejected.
	// TODO: move out of disk check into a pod admitter
	// TODO: out of resource eviction should have a pod admitter call-out
	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
	for _, podAdmitHandler := range kl.admitHandlers {
		if result := podAdmitHandler.Admit(attrs); !result.Admit {
			return false, result.Reason, result.Message
		}
	}
	return true, "", ""
}

func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
	attrs := &lifecycle.PodAdmitAttributes{Pod: pod}
	// Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive.
	attrs.OtherPods = kl.filterOutTerminatedPods(kl.podManager.GetPods())
	for _, handler := range kl.softAdmitHandlers {
		if result := handler.Admit(attrs); !result.Admit {
			return result
		}
	}
	return lifecycle.PodAdmitResult{Admit: true}
}

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, it runs a sync against the desired state and the running
// state. If no changes are seen to the configuration, it synchronizes the last
// known desired state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.Info("Starting kubelet main sync loop.")
	// The syncTicker wakes up kubelet to check if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
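	// With these constants, consecutive failed iterations below sleep for
	// 100ms, 200ms, 400ms, ..., capped at 5s, and the delay resets to the
	// base after the first successful iteration.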
	duration := base
	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.Infof("skipping pod synchronization - %v", err)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base
		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. housekeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated. In other words, case statements
// are evaluated in random order, and you cannot assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
//   handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
//   containers have failed liveness checks
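//
// A minimal illustration of that select property (a sketch, not kubelet
// code): when both channels are ready, either case may be chosen on any
// given iteration.
//
//	select {
//	case <-a:
//		// may run even if b was ready first
//	case <-b:
//		// and vice versa
//	}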
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as an UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.RESTORE:
			klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
			// These are pods restored from the checkpoint. Treat them as new
			// pods.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		}
		if u.Op != kubetypes.RESTORE {
			// If the update type is RESTORE, it means that the update is from
			// the pod checkpoints and may be incomplete. Do not mark the
			// source as ready.
			// Mark the source ready after receiving at least one update from the
			// source. Once all the sources are marked ready, various cleanup
			// routines will start reclaiming resources. It is important that this
			// takes place only after kubelet calls the update handler to process
			// the update to ensure the internal pod cache is up-to-date.
			kl.sourcesReady.AddSource(u.Source)
		}
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
		handler.HandlePodSyncs(podsToSync)
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			// The liveness manager detected a failure; sync the pod.
			// We should not use the pod from livenessManager, because it is never updated after
			// initialization.
			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
			if !ok {
				// If the pod no longer exists, ignore the update.
				klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
				break
			}
			klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready, skip housekeeping, as we may
			// accidentally delete pods from unready sources.
			klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			klog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				klog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork performs no further action beyond
// forcing a final status update when the pod has a deletion timestamp set.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			// If the pod is in a terminated state, there is no pod worker to
			// handle the work item. Check if the DeletionTimestamp has been
			// set, and force a status update to trigger a pod deletion request
			// to the apiserver.
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			if err != nil {
				metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
				metrics.DeprecatedPodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
			}
		},
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}

// TODO: handle mirror pods in a separate component (issue #17251)
func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
	// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
	// corresponding static pod. Send update to the pod worker if the static
	// pod exists.
	if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
	}
}

// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		// Responsible for checking limits in resolv.conf
		if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
			kl.dnsConfigurer.CheckLimitsForResolvConf()
		}
		existingPods := kl.podManager.GetPods()
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not
			// terminated.
			// We failed the pods that we rejected, so activePods includes all
			// admitted pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)
			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}

// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		// Responsible for checking limits in resolv.conf
		if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
			kl.dnsConfigurer.CheckLimitsForResolvConf()
		}
		kl.podManager.UpdatePod(pod)
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		// TODO: Evaluate if we need to validate and reject updates.
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
	}
}

// HandlePodRemoves is the callback in the SyncHandler interface for pods
// being removed from a config source.
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.DeletePod(pod)
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		// Deletion is allowed to fail because the periodic cleanup routine
		// will trigger deletion again.
		if err := kl.deletePod(pod); err != nil {
			klog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
		}
		kl.probeManager.RemovePod(pod)
	}
}

// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled.
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		// Update the pod in the pod manager; the status manager periodically
		// reconciles against the pod manager.
		kl.podManager.UpdatePod(pod)
		// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
		if status.NeedToReconcilePodReadiness(pod) {
			mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
			kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
		}
		// After an evicted pod is synced, all dead containers in the pod can be removed.
		if eviction.PodIsEvicted(pod.Status) {
			if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
				kl.containerDeletor.deleteContainersInPod("", podStatus, true)
			}
		}
	}
}

// HandlePodSyncs is the callback in the SyncHandler interface for pods
// that should be dispatched to pod workers for sync.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
	}
}

// LatestLoopEntryTime returns the last time in the sync loop monitor.
func (kl *Kubelet) LatestLoopEntryTime() time.Time {
	val := kl.syncLoopMonitor.Load()
	if val == nil {
		return time.Time{}
	}
	return val.(time.Time)
}

// updateRuntimeUp calls the container runtime status callback, initializing
// the runtime dependent modules when the container runtime first comes up.
// It logs an error if the status check fails; if the status check is OK, it
// updates the container runtime uptime in the kubelet runtimeState.
func (kl *Kubelet) updateRuntimeUp() {
	kl.updateRuntimeMux.Lock()
	defer kl.updateRuntimeMux.Unlock()
	s, err := kl.containerRuntime.Status()
	if err != nil {
		klog.Errorf("Container runtime sanity check failed: %v", err)
		return
	}
	if s == nil {
		klog.Errorf("Container runtime status is nil")
		return
	}
	// Periodically log the whole runtime status for debugging.
	// TODO(random-liu): Consider to send node event when optional
	// condition is unmet.
	klog.V(4).Infof("Container runtime status: %v", s)
	networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
	if networkReady == nil || !networkReady.Status {
		klog.Errorf("Container runtime network not ready: %v", networkReady)
		kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
	} else {
		// Set nil if the container runtime network is ready.
		kl.runtimeState.setNetworkState(nil)
	}
	// TODO(random-liu): Add runtime error in runtimeState, and update it
	// when runtime is not ready, so that the information in RuntimeReady
	// condition will be propagated to NodeReady condition.
	runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
	// If RuntimeReady is not set or is false, report an error.
	if runtimeReady == nil || !runtimeReady.Status {
		klog.Errorf("Container runtime not ready: %v", runtimeReady)
		return
	}
	kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
	kl.runtimeState.setRuntimeSync(kl.clock.Now())
}

// GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration {
	return kl.kubeletConfiguration
}

// BirthCry sends an event that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
	// Make an event that kubelet restarted.
	kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
}

// ResyncInterval returns the interval used for periodic syncs.
func (kl *Kubelet) ResyncInterval() time.Duration {
	return kl.resyncInterval
}

// ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling bool) {
	server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, kl.redirectContainerStreaming, kl.criHandler)
}

// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool) {
	server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, enableCAdvisorJSONEndpoints)
}

// ListenAndServePodResources runs the kubelet podresources grpc service
func (kl *Kubelet) ListenAndServePodResources() {
	socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
	if err != nil {
		klog.V(2).Infof("Failed to get local endpoint for PodResources endpoint: %v", err)
		return
	}
	server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager)
}

// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
	if podStatus, err := kl.podCache.Get(podID); err == nil {
		removeAll := false
		if syncedPod, ok := kl.podManager.GetPodByUID(podID); ok {
			// generate the api status using the cached runtime status to get up-to-date ContainerStatuses
			apiPodStatus := kl.generateAPIPodStatus(syncedPod, podStatus)
			// When an evicted or deleted pod has already synced, all containers can be removed.
			removeAll = eviction.PodIsEvicted(syncedPod.Status) || (syncedPod.DeletionTimestamp != nil && notRunning(apiPodStatus.ContainerStatuses))
		}
		kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
	}
}

// fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
// is applied and tries to update pod CIDR immediately. After pod CIDR is updated it fires off
// a runtime update and a node status update. The function returns after one successful node
// status update. It is executed only during kubelet start-up, which improves the latency to a
// ready node by updating the pod CIDR, runtime status, and node statuses ASAP.
func (kl *Kubelet) fastStatusUpdateOnce() {
	for {
		time.Sleep(100 * time.Millisecond)
		node, err := kl.GetNode()
		if err != nil {
			klog.Errorf(err.Error())
			continue
		}
		if node.Spec.PodCIDR != "" {
			if _, err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
				klog.Errorf("Pod CIDR update failed %v", err)
				continue
			}
			kl.updateRuntimeUp()
			kl.syncNodeStatus()
			return
		}
	}
}

// isSyncPodWorthy filters out events that are not worthy of pod syncing
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
	// ContainerRemoved doesn't affect pod state
	return event.Type != pleg.ContainerRemoved
}

// Gets the streaming server configuration to use with in-process CRI shims.
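// There are two modes here, as the body below shows: with
// RedirectContainerStreaming disabled, the shim serves exec/attach/port-forward
// streams itself on an ephemeral localhost port; with it enabled, requests are
// redirected to the relative /cri/ path so the kubelet's own server (with its
// TLS configuration, when present) fronts the streams.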
func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions) *streaming.Config {
	config := &streaming.Config{
		StreamIdleTimeout:               kubeCfg.StreamingConnectionIdleTimeout.Duration,
		StreamCreationTimeout:           streaming.DefaultConfig.StreamCreationTimeout,
		SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
		SupportedPortForwardProtocols:   streaming.DefaultConfig.SupportedPortForwardProtocols,
	}
	if !crOptions.RedirectContainerStreaming {
		config.Addr = net.JoinHostPort("localhost", "0")
	} else {
		// Use a relative redirect (no scheme or host).
		config.BaseURL = &url.URL{
			Path: "/cri/",
		}
		if kubeDeps.TLSOptions != nil {
			config.TLSConfig = kubeDeps.TLSOptions.Config
		}
	}
	return config
}