/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
    "context"
    "crypto/tls"
    "fmt"
    "math"
    "net"
    "net/http"
    "net/url"
    "os"
    "path"
    "sort"
    "sync"
    "sync/atomic"
    "time"

    cadvisorapi "github.com/google/cadvisor/info/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/clock"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/wait"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    clientset "k8s.io/client-go/kubernetes"
    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    corelisters "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/certificate"
    "k8s.io/client-go/util/flowcontrol"
    cloudprovider "k8s.io/cloud-provider"
    internalapi "k8s.io/cri-api/pkg/apis"
    "k8s.io/klog"
    api "k8s.io/kubernetes/pkg/apis/core"
    "k8s.io/kubernetes/pkg/features"
    kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
    pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
    "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
    "k8s.io/kubernetes/pkg/kubelet/cadvisor"
    kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
    "k8s.io/kubernetes/pkg/kubelet/cloudresource"
    "k8s.io/kubernetes/pkg/kubelet/cm"
    "k8s.io/kubernetes/pkg/kubelet/config"
    "k8s.io/kubernetes/pkg/kubelet/configmap"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    "k8s.io/kubernetes/pkg/kubelet/dockershim"
    dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
    "k8s.io/kubernetes/pkg/kubelet/events"
    "k8s.io/kubernetes/pkg/kubelet/eviction"
    "k8s.io/kubernetes/pkg/kubelet/images"
    "k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
    "k8s.io/kubernetes/pkg/kubelet/kuberuntime"
    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
    "k8s.io/kubernetes/pkg/kubelet/logs"
    "k8s.io/kubernetes/pkg/kubelet/metrics"
    "k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
    "k8s.io/kubernetes/pkg/kubelet/network/dns"
    "k8s.io/kubernetes/pkg/kubelet/nodelease"
    oomwatcher "k8s.io/kubernetes/pkg/kubelet/oom"
    "k8s.io/kubernetes/pkg/kubelet/pleg"
    "k8s.io/kubernetes/pkg/kubelet/pluginmanager"
    plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
    "k8s.io/kubernetes/pkg/kubelet/preemption"
    "k8s.io/kubernetes/pkg/kubelet/prober"
    proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
    "k8s.io/kubernetes/pkg/kubelet/remote"
    "k8s.io/kubernetes/pkg/kubelet/runtimeclass"
    "k8s.io/kubernetes/pkg/kubelet/secret"
    "k8s.io/kubernetes/pkg/kubelet/server"
    servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
    serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
    "k8s.io/kubernetes/pkg/kubelet/server/streaming"
    "k8s.io/kubernetes/pkg/kubelet/stats"
    "k8s.io/kubernetes/pkg/kubelet/status"
    "k8s.io/kubernetes/pkg/kubelet/sysctl"
    "k8s.io/kubernetes/pkg/kubelet/token"
    kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    "k8s.io/kubernetes/pkg/kubelet/util"
    "k8s.io/kubernetes/pkg/kubelet/util/format"
    "k8s.io/kubernetes/pkg/kubelet/util/manager"
    "k8s.io/kubernetes/pkg/kubelet/util/queue"
    "k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
    "k8s.io/kubernetes/pkg/kubelet/volumemanager"
    "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
    "k8s.io/kubernetes/pkg/security/apparmor"
    sysctlwhitelist "k8s.io/kubernetes/pkg/security/podsecuritypolicy/sysctl"
    utildbus "k8s.io/kubernetes/pkg/util/dbus"
    utilipt "k8s.io/kubernetes/pkg/util/iptables"
    "k8s.io/kubernetes/pkg/util/mount"
    nodeutil "k8s.io/kubernetes/pkg/util/node"
    "k8s.io/kubernetes/pkg/util/oom"
    "k8s.io/kubernetes/pkg/util/selinux"
    "k8s.io/kubernetes/pkg/volume"
    "k8s.io/kubernetes/pkg/volume/csi"
    "k8s.io/kubernetes/pkg/volume/util/subpath"
    utilexec "k8s.io/utils/exec"
    "k8s.io/utils/integer"
)
const (
    // Max amount of time to wait for the container runtime to come up.
    maxWaitForContainerRuntime = 30 * time.Second
    // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status fails.
    nodeStatusUpdateRetry = 5
    // ContainerLogsDir is the location of container logs.
    ContainerLogsDir = "/var/log/containers"
    // MaxContainerBackOff is the max backoff period, exported for the e2e test
    MaxContainerBackOff = 300 * time.Second
    // Capacity of the channel for storing pods to kill. A small number should
    // suffice because a goroutine is dedicated to check the channel and does
    // not block on anything else.
    podKillingChannelCapacity = 50
    // Period for performing global cleanup tasks.
    housekeepingPeriod = time.Second * 2
    // Period for performing eviction monitoring.
    // TODO ensure this is in sync with internal cadvisor housekeeping.
    evictionMonitoringPeriod = time.Second * 10
    // The path in containers' filesystems where the hosts file is mounted.
    etcHostsPath = "/etc/hosts"
    // Capacity of the channel for receiving pod lifecycle events. This number
    // is a bit arbitrary and may be adjusted in the future.
    plegChannelCapacity = 1000
    // Generic PLEG relies on relisting for discovering container events.
    // A longer period means that kubelet will take longer to detect container
    // changes and to update pod status. On the other hand, a shorter period
    // will cause more frequent relisting (e.g., container runtime operations),
    // leading to higher cpu usage.
    // Note that even though we set the period to 1s, the relisting itself can
    // take more than 1s to finish if the container runtime responds slowly
    // and/or when there are many container changes in one cycle.
    plegRelistPeriod = time.Second * 1
    // backOffPeriod is the period to back off when pod syncing results in an
    // error. It is also used as the base period for the exponential backoff
    // of container restarts and image pulls.
    backOffPeriod = time.Second * 10
    // ContainerGCPeriod is the period for performing container garbage collection.
    ContainerGCPeriod = time.Minute
    // ImageGCPeriod is the period for performing image garbage collection.
    ImageGCPeriod = 5 * time.Minute
    // Minimum number of dead containers to keep in a pod
    minDeadContainerInPod = 1
)
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
    HandlePodAdditions(pods []*v1.Pod)
    HandlePodUpdates(pods []*v1.Pod)
    HandlePodRemoves(pods []*v1.Pod)
    HandlePodReconcile(pods []*v1.Pod)
    HandlePodSyncs(pods []*v1.Pod)
    HandlePodCleanups() error
}

// Option is a functional option type for Kubelet
type Option func(*Kubelet)
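
// NewMainKubelet applies every entry of Dependencies.Options, in order, once
// the Kubelet has been constructed. Illustrative (hypothetical) usage:
//
//    kubeDeps.Options = append(kubeDeps.Options, func(k *Kubelet) {
//        k.nodeStatusMaxImages = 50
//    })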

// Bootstrap is a bootstrapping interface for the kubelet; it captures the initialization protocol.
type Bootstrap interface {
    GetConfiguration() kubeletconfiginternal.KubeletConfiguration
    BirthCry()
    StartGarbageCollection()
    ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling bool)
    ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool)
    ListenAndServePodResources()
    Run(<-chan kubetypes.PodUpdate)
    RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

// Builder creates and initializes a Kubelet instance
type Builder func(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    containerRuntime string,
    runtimeCgroups string,
    hostnameOverride string,
    nodeIP string,
    providerID string,
    cloudProvider string,
    certDirectory string,
    rootDirectory string,
    registerNode bool,
    registerWithTaints []api.Taint,
    allowedUnsafeSysctls []string,
    remoteRuntimeEndpoint string,
    remoteImageEndpoint string,
    experimentalMounterPath string,
    experimentalKernelMemcgNotification bool,
    experimentalCheckNodeCapabilitiesBeforeMount bool,
    experimentalNodeAllocatableIgnoreEvictionThreshold bool,
    minimumGCAge metav1.Duration,
    maxPerPodContainerCount int32,
    maxContainerCount int32,
    masterServiceNamespace string,
    registerSchedulable bool,
    nonMasqueradeCIDR string,
    keepTerminatedPodVolumes bool,
    nodeLabels map[string]string,
    seccompProfileRoot string,
    bootstrapCheckpointPath string,
    nodeStatusMaxImages int32) (Bootstrap, error)

// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
type Dependencies struct {
    Options []Option

    // Injected Dependencies
    Auth server.AuthInterface
    CAdvisorInterface cadvisor.Interface
    Cloud cloudprovider.Interface
    ContainerManager cm.ContainerManager
    DockerClientConfig *dockershim.ClientConfig
    EventClient v1core.EventsGetter
    HeartbeatClient clientset.Interface
    OnHeartbeatFailure func()
    KubeClient clientset.Interface
    Mounter mount.Interface
    OOMAdjuster *oom.OOMAdjuster
    OSInterface kubecontainer.OSInterface
    PodConfig *config.PodConfig
    Recorder record.EventRecorder
    Subpather subpath.Interface
    VolumePlugins []volume.VolumePlugin
    DynamicPluginProber volume.DynamicPluginProber
    TLSOptions *server.TLSOptions
    KubeletConfigController *kubeletconfig.Controller
}

// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
    manifestURLHeader := make(http.Header)
    if len(kubeCfg.StaticPodURLHeader) > 0 {
        for k, v := range kubeCfg.StaticPodURLHeader {
            for i := range v {
                manifestURLHeader.Add(k, v[i])
            }
        }
    }

    // source of all configuration
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // define file config source
    if kubeCfg.StaticPodPath != "" {
        klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
        config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
    }

    // define url config source
    if kubeCfg.StaticPodURL != "" {
        klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
        config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    }

    // Restore from the checkpoint path
    // NOTE: This MUST happen before creating the apiserver source
    // below, or the checkpoint would override the source of truth.
    var updatechannel chan<- interface{}
    if bootstrapCheckpointPath != "" {
        klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
        updatechannel = cfg.Channel(kubetypes.ApiserverSource)
        err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
        if err != nil {
            return nil, err
        }
    }

    if kubeDeps.KubeClient != nil {
        klog.Infof("Watching apiserver")
        if updatechannel == nil {
            updatechannel = cfg.Channel(kubetypes.ApiserverSource)
        }
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
    }
    return cfg, nil
}
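
// The merged config above is consumed as a stream of pod updates; a sketch of
// the consuming side (this is the channel ultimately handed to Bootstrap.Run):
//
//    for u := range cfg.Updates() {
//        // u.Op is the operation (e.g. ADD, UPDATE, REMOVE), u.Pods the
//        // affected pods, and u.Source the originating source name.
//    }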

func getRuntimeAndImageServices(remoteRuntimeEndpoint string, remoteImageEndpoint string, runtimeRequestTimeout metav1.Duration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
    rs, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout.Duration)
    if err != nil {
        return nil, nil, err
    }
    is, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout.Duration)
    if err != nil {
        return nil, nil, err
    }
    return rs, is, nil
}

// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    containerRuntime string,
    runtimeCgroups string,
    hostnameOverride string,
    nodeIP string,
    providerID string,
    cloudProvider string,
    certDirectory string,
    rootDirectory string,
    registerNode bool,
    registerWithTaints []api.Taint,
    allowedUnsafeSysctls []string,
    remoteRuntimeEndpoint string,
    remoteImageEndpoint string,
    experimentalMounterPath string,
    experimentalKernelMemcgNotification bool,
    experimentalCheckNodeCapabilitiesBeforeMount bool,
    experimentalNodeAllocatableIgnoreEvictionThreshold bool,
    minimumGCAge metav1.Duration,
    maxPerPodContainerCount int32,
    maxContainerCount int32,
    masterServiceNamespace string,
    registerSchedulable bool,
    nonMasqueradeCIDR string,
    keepTerminatedPodVolumes bool,
    nodeLabels map[string]string,
    seccompProfileRoot string,
    bootstrapCheckpointPath string,
    nodeStatusMaxImages int32) (*Kubelet, error) {
    if rootDirectory == "" {
        return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
    }
    if kubeCfg.SyncFrequency.Duration <= 0 {
        return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
    }

    if kubeCfg.MakeIPTablesUtilChains {
        if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
            return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
        }
        if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
            return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
        }
        if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
            return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
        }
    }

    hostname, err := nodeutil.GetHostname(hostnameOverride)
    if err != nil {
        return nil, err
    }
    // Query the cloud provider for our node name, default to hostname
    nodeName := types.NodeName(hostname)
    if kubeDeps.Cloud != nil {
        var err error
        instances, ok := kubeDeps.Cloud.Instances()
        if !ok {
            return nil, fmt.Errorf("failed to get instances from cloud provider")
        }
        nodeName, err = instances.CurrentNodeName(context.TODO(), hostname)
        if err != nil {
            return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
        }
        klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
    }

    if kubeDeps.PodConfig == nil {
        var err error
        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
        if err != nil {
            return nil, err
        }
    }

    containerGCPolicy := kubecontainer.ContainerGCPolicy{
        MinAge: minimumGCAge.Duration,
        MaxPerPodContainer: int(maxPerPodContainerCount),
        MaxContainers: int(maxContainerCount),
    }

    daemonEndpoints := &v1.NodeDaemonEndpoints{
        KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
    }

    imageGCPolicy := images.ImageGCPolicy{
        MinAge: kubeCfg.ImageMinimumGCAge.Duration,
        HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
        LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
    }

    enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
    if experimentalNodeAllocatableIgnoreEvictionThreshold {
        // Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
        enforceNodeAllocatable = []string{}
    }
    thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
    if err != nil {
        return nil, err
    }
    evictionConfig := eviction.Config{
        PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
        MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
        Thresholds: thresholds,
        KernelMemcgNotification: experimentalKernelMemcgNotification,
        PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
    }
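
    // The reflector below keeps serviceIndexer populated with every Service in
    // the cluster; the kubelet reads it through serviceLister, e.g. when
    // building the service environment variables injected into containers.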
    serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    if kubeDeps.KubeClient != nil {
        serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
        r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
        go r.Run(wait.NeverStop)
    }
    serviceLister := corelisters.NewServiceLister(serviceIndexer)
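
    // The node reflector is scoped to this kubelet's own Node object via a
    // field selector on the object name, so it only receives events for the
    // node it runs on.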
    nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
    if kubeDeps.KubeClient != nil {
        fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
        nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
        r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
        go r.Run(wait.NeverStop)
    }
    nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}

    // TODO: get the real node object of ourself,
    // and use the real node name and UID.
    // TODO: what is namespace for node?
    nodeRef := &v1.ObjectReference{
        Kind: "Node",
        Name: string(nodeName),
        UID: types.UID(nodeName),
        Namespace: "",
    }

    containerRefManager := kubecontainer.NewRefManager()

    oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder)

    clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
    for _, ipEntry := range kubeCfg.ClusterDNS {
        ip := net.ParseIP(ipEntry)
        if ip == nil {
            klog.Warningf("Invalid clusterDNS IP %q", ipEntry)
        } else {
            clusterDNS = append(clusterDNS, ip)
        }
    }
    httpClient := &http.Client{}
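
    // Infer the iptables protocol family from the configured node IP: a node
    // IP that parses but is not IPv4 switches the kubelet's iptables client
    // to IPv6.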
    parsedNodeIP := net.ParseIP(nodeIP)
    protocol := utilipt.ProtocolIpv4
    if parsedNodeIP != nil && parsedNodeIP.To4() == nil {
        klog.V(0).Infof("IPv6 node IP (%s), assume IPv6 operation", nodeIP)
        protocol = utilipt.ProtocolIpv6
    }

    klet := &Kubelet{
        hostname: hostname,
        hostnameOverridden: len(hostnameOverride) > 0,
        nodeName: nodeName,
        kubeClient: kubeDeps.KubeClient,
        heartbeatClient: kubeDeps.HeartbeatClient,
        onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
        rootDirectory: rootDirectory,
        resyncInterval: kubeCfg.SyncFrequency.Duration,
        sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
        registerNode: registerNode,
        registerWithTaints: registerWithTaints,
        registerSchedulable: registerSchedulable,
        dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
        serviceLister: serviceLister,
        nodeInfo: nodeInfo,
        masterServiceNamespace: masterServiceNamespace,
        streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
        recorder: kubeDeps.Recorder,
        cadvisor: kubeDeps.CAdvisorInterface,
        cloud: kubeDeps.Cloud,
        externalCloudProvider: cloudprovider.IsExternal(cloudProvider),
        providerID: providerID,
        nodeRef: nodeRef,
        nodeLabels: nodeLabels,
        nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
        nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration,
        os: kubeDeps.OSInterface,
        oomWatcher: oomWatcher,
        cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
        cgroupRoot: kubeCfg.CgroupRoot,
        mounter: kubeDeps.Mounter,
        subpather: kubeDeps.Subpather,
        maxPods: int(kubeCfg.MaxPods),
        podsPerCore: int(kubeCfg.PodsPerCore),
        syncLoopMonitor: atomic.Value{},
        daemonEndpoints: daemonEndpoints,
        containerManager: kubeDeps.ContainerManager,
        containerRuntimeName: containerRuntime,
        redirectContainerStreaming: crOptions.RedirectContainerStreaming,
        nodeIP: parsedNodeIP,
        nodeIPValidator: validateNodeIP,
        clock: clock.RealClock{},
        enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
        iptClient: utilipt.New(utilexec.New(), utildbus.New(), protocol),
        makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
        iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
        iptablesDropBit: int(kubeCfg.IPTablesDropBit),
        experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
        keepTerminatedPodVolumes: keepTerminatedPodVolumes,
        nodeStatusMaxImages: nodeStatusMaxImages,
        enablePluginsWatcher: utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher),
    }

    if klet.cloud != nil {
        klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
    }
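
    // Select how secret and configmap contents are tracked: Watch maintains a
    // watch per referenced object, TTLCache serves from a cache whose TTL can
    // be tuned via a node annotation, and Get fetches from the apiserver on
    // every access.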
    var secretManager secret.Manager
    var configMapManager configmap.Manager
    switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
    case kubeletconfiginternal.WatchChangeDetectionStrategy:
        secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
        configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
    case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
        secretManager = secret.NewCachingSecretManager(
            kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
        configMapManager = configmap.NewCachingConfigMapManager(
            kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
    case kubeletconfiginternal.GetChangeDetectionStrategy:
        secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
        configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
    default:
        return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
    }

    klet.secretManager = secretManager
    klet.configMapManager = configMapManager

    if klet.experimentalHostUserNamespaceDefaulting {
        klog.Infof("Experimental host user namespace defaulting is enabled.")
    }

    machineInfo, err := klet.cadvisor.MachineInfo()
    if err != nil {
        return nil, err
    }
    klet.machineInfo = machineInfo

    imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

    klet.livenessManager = proberesults.NewManager()

    klet.podCache = kubecontainer.NewCache()
    var checkpointManager checkpointmanager.CheckpointManager
    if bootstrapCheckpointPath != "" {
        checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
        if err != nil {
            return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
        }
    }
    // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
    klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)

    klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

    if remoteRuntimeEndpoint != "" {
        // remoteImageEndpoint is the same as remoteRuntimeEndpoint if not explicitly specified
        if remoteImageEndpoint == "" {
            remoteImageEndpoint = remoteRuntimeEndpoint
        }
    }

    // TODO: These need to become arguments to a standalone docker shim.
    pluginSettings := dockershim.NetworkPluginSettings{
        HairpinMode: kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode),
        NonMasqueradeCIDR: nonMasqueradeCIDR,
        PluginName: crOptions.NetworkPluginName,
        PluginConfDir: crOptions.CNIConfDir,
        PluginBinDirString: crOptions.CNIBinDir,
        MTU: int(crOptions.NetworkPluginMTU),
    }

    klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)

    // If left nil, the legacy log provider is unneeded.
    var legacyLogProvider kuberuntime.LegacyLogProvider
    switch containerRuntime {
    case kubetypes.DockerContainerRuntime:
        // Create and start the CRI shim running as a grpc server.
        streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
        ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
            &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
        if err != nil {
            return nil, err
        }
        if crOptions.RedirectContainerStreaming {
            klet.criHandler = ds
        }

        // The unix socket for kubelet <-> dockershim communication.
        klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
            remoteRuntimeEndpoint,
            remoteImageEndpoint)
        klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
        server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
        if err := server.Start(); err != nil {
            return nil, err
        }

        // Create dockerLegacyService when the logging driver is not supported.
        supported, err := ds.IsCRISupportedLogDriver()
        if err != nil {
            return nil, err
        }
        if !supported {
            klet.dockerLegacyService = ds
            legacyLogProvider = ds
        }
    case kubetypes.RemoteContainerRuntime:
        // No-op.
        break
    default:
        return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
    }
    runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
    if err != nil {
        return nil, err
    }
    klet.runtimeService = runtimeService

    if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
        klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
    }

    runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
        kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
        klet.livenessManager,
        seccompProfileRoot,
        containerRefManager,
        machineInfo,
        klet,
        kubeDeps.OSInterface,
        klet,
        httpClient,
        imageBackOff,
        kubeCfg.SerializeImagePulls,
        float32(kubeCfg.RegistryPullQPS),
        int(kubeCfg.RegistryBurst),
        kubeCfg.CPUCFSQuota,
        kubeCfg.CPUCFSQuotaPeriod,
        runtimeService,
        imageService,
        kubeDeps.ContainerManager.InternalContainerLifecycle(),
        legacyLogProvider,
        klet.runtimeClassManager,
    )
    if err != nil {
        return nil, err
    }
    klet.containerRuntime = runtime
    klet.streamingRuntime = runtime
    klet.runner = runtime

    runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
    if err != nil {
        return nil, err
    }
    klet.runtimeCache = runtimeCache

    if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
        klet.StatsProvider = stats.NewCadvisorStatsProvider(
            klet.cadvisor,
            klet.resourceAnalyzer,
            klet.podManager,
            klet.runtimeCache,
            klet.containerRuntime,
            klet.statusManager)
    } else {
        klet.StatsProvider = stats.NewCRIStatsProvider(
            klet.cadvisor,
            klet.resourceAnalyzer,
            klet.podManager,
            klet.runtimeCache,
            runtimeService,
            imageService,
            stats.NewLogMetricsService(),
            kubecontainer.RealOS{})
    }
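
    // The generic PLEG drives the sync loop: it relists containers every
    // plegRelistPeriod, diffs the result against podCache, and emits pod
    // lifecycle events into a channel of capacity plegChannelCapacity.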
    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
    klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
    klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
    if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
        klog.Errorf("Pod CIDR update failed %v", err)
    }

    // setup containerGC
    containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
    if err != nil {
        return nil, err
    }
    klet.containerGC = containerGC
    klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

    // setup imageManager
    imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager

    if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
        // setup containerLogManager for CRI container runtime
        containerLogManager, err := logs.NewContainerLogManager(
            klet.runtimeService,
            kubeCfg.ContainerLogMaxSize,
            int(kubeCfg.ContainerLogMaxFiles),
        )
        if err != nil {
            return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
        }
        klet.containerLogManager = containerLogManager
    } else {
        klet.containerLogManager = logs.NewStubContainerLogManager()
    }
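
    // With server TLS bootstrap and certificate rotation enabled, the kubelet
    // serves with a certificate obtained from the cluster's certificates API
    // and rotated by the certificate manager, rather than a static one.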
    if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
        klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
        if err != nil {
            return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
        }
        kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
            cert := klet.serverCertificateManager.Current()
            if cert == nil {
                return nil, fmt.Errorf("no serving certificate available for the kubelet")
            }
            return cert, nil
        }
    }

    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.runner,
        containerRefManager,
        kubeDeps.Recorder)

    tokenManager := token.NewManager(kubeDeps.KubeClient)

    // NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
    // which affects node ready status. This function must be called before Kubelet is initialized so that the Node
    // ReadyState reflects the storage state.
    klet.volumePluginMgr, err =
        NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
    if err != nil {
        return nil, err
    }
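
    // The plugin manager watches the plugin registration directories for
    // sockets placed there by plugins (e.g. device plugins and CSI drivers)
    // and handles their registration handshake with the kubelet.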
    if klet.enablePluginsWatcher {
        klet.pluginManager = pluginmanager.NewPluginManager(
            klet.getPluginsRegistrationDir(), /* sockDir */
            klet.getPluginsDir(), /* deprecatedSockDir */
            kubeDeps.Recorder,
        )
    }

    // If the experimentalMounterPath flag is set, we do not want to
    // check node capabilities since the mount path is not the default
    if len(experimentalMounterPath) != 0 {
        experimentalCheckNodeCapabilitiesBeforeMount = false
        // Replace the nameserver in containerized-mounter's rootfs/etc/resolv.conf with kubelet.ClusterDNS
        // so that service names can be resolved
        klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
    }

    // setup volumeManager
    klet.volumeManager = volumemanager.NewVolumeManager(
        kubeCfg.EnableControllerAttachDetach,
        nodeName,
        klet.podManager,
        klet.statusManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime,
        kubeDeps.Mounter,
        klet.getPodsDir(),
        kubeDeps.Recorder,
        experimentalCheckNodeCapabilitiesBeforeMount,
        keepTerminatedPodVolumes)
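
    // The pieces below form the per-pod retry machinery: reasonCache records
    // why the last container creation failed, workQueue schedules pods for
    // re-sync, and podWorkers run syncPod per pod, requeueing failed syncs
    // with backOffPeriod-based backoff.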
    klet.reasonCache = NewReasonCache()
    klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
    klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

    klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
    klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)

    // setup eviction manager
    evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)

    klet.evictionManager = evictionManager
    klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

    if utilfeature.DefaultFeatureGate.Enabled(features.Sysctls) {
        // add sysctl admission
        runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
        if err != nil {
            return nil, err
        }

        // Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec.
        // Hence, we concatenate those two lists.
        safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
        sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
        if err != nil {
            return nil, err
        }
        klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
        klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)
    }

    // enable active deadline handler
    activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
    if err != nil {
        return nil, err
    }
    klet.AddPodSyncLoopHandler(activeDeadlineHandler)
    klet.AddPodSyncHandler(activeDeadlineHandler)

    criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
    klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))

    // Apply functional Options.
    for _, opt := range kubeDeps.Options {
        opt(klet)
    }

    klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
    klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
    klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
    if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
        klet.nodeLeaseController = nodelease.NewController(klet.clock, klet.heartbeatClient, string(klet.nodeName), kubeCfg.NodeLeaseDurationSeconds, kubeCfg.NodeStatusUpdateFrequency.Duration, klet.onRepeatedHeartbeatFailure)
    }
    klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewProcMountAdmitHandler(klet.containerRuntime))

    // Finally, put the most recent version of the config on the Kubelet, so
    // people can see how it was configured.
    klet.kubeletConfiguration = *kubeCfg

    // Generating the status funcs should be the last thing we do,
    // since this relies on the rest of the Kubelet having been constructed.
    klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

    return klet, nil
}

type serviceLister interface {
    List(labels.Selector) ([]*v1.Service, error)
}

// Kubelet is the main kubelet implementation.
type Kubelet struct {
    kubeletConfiguration kubeletconfiginternal.KubeletConfiguration

    // hostname is the hostname the kubelet detected or was given via flag/config
    hostname string
    // hostnameOverridden indicates the hostname was overridden via flag/config
    hostnameOverridden bool

    nodeName types.NodeName
    runtimeCache kubecontainer.RuntimeCache
    kubeClient clientset.Interface
    heartbeatClient clientset.Interface
    iptClient utilipt.Interface
    rootDirectory string

    lastObservedNodeAddressesMux sync.RWMutex
    lastObservedNodeAddresses []v1.NodeAddress

    // onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
    onRepeatedHeartbeatFailure func()

    // podWorkers handle syncing Pods in response to events.
    podWorkers PodWorkers

    // resyncInterval is the interval between periodic full reconciliations of
    // pods on this node.
    resyncInterval time.Duration

    // sourcesReady records the sources seen by the kubelet, it is thread-safe.
    sourcesReady config.SourcesReady

    // podManager is a facade that abstracts away the various sources of pods
    // this Kubelet services.
    podManager kubepod.Manager

    // Needed to observe and respond to situations that could impact node stability
    evictionManager eviction.Manager

    // Optional, defaults to /logs/ from /var/log
    logServer http.Handler
    // Optional, defaults to simple Docker implementation
    runner kubecontainer.ContainerCommandRunner

    // cAdvisor used for container information.
    cadvisor cadvisor.Interface

    // Set to true to have the node register itself with the apiserver.
    registerNode bool
    // List of taints to add to a node object when the kubelet registers itself.
    registerWithTaints []api.Taint
    // Set to true to have the node register itself as schedulable.
    registerSchedulable bool
    // for internal bookkeeping; access only from within registerWithApiserver
    registrationCompleted bool

    // dnsConfigurer is used for setting up DNS resolver configuration when launching pods.
    dnsConfigurer *dns.Configurer

    // masterServiceNamespace is the namespace that the master service is exposed in.
    masterServiceNamespace string
    // serviceLister knows how to list services
    serviceLister serviceLister
    // nodeInfo knows how to get information about the node for this kubelet.
    nodeInfo predicates.NodeInfo
    // a list of node labels to register
    nodeLabels map[string]string

    // Last timestamp when runtime responded on ping.
    // Mutex is used to protect this value.
    runtimeState *runtimeState

    // Volume plugins.
    volumePluginMgr *volume.VolumePluginMgr

    // Handles container probing.
    probeManager prober.Manager
    // Manages container health check results.
    livenessManager proberesults.Manager

    // How long to keep idle streaming command execution/port forwarding
    // connections open before terminating them
    streamingConnectionIdleTimeout time.Duration

    // The EventRecorder to use
    recorder record.EventRecorder

    // Policy for handling garbage collection of dead containers.
    containerGC kubecontainer.ContainerGC

    // Manager for image garbage collection.
    imageManager images.ImageGCManager

    // Manager for container logs.
    containerLogManager logs.ContainerLogManager

    // Secret manager.
    secretManager secret.Manager

    // ConfigMap manager.
    configMapManager configmap.Manager

    // Cached MachineInfo returned by cadvisor.
    machineInfo *cadvisorapi.MachineInfo

    // Handles certificate rotations.
    serverCertificateManager certificate.Manager

    // Syncs pods statuses with apiserver; also used as a cache of statuses.
    statusManager status.Manager

    // VolumeManager runs a set of asynchronous loops that figure out which
    // volumes need to be attached/mounted/unmounted/detached based on the pods
    // scheduled on this node and makes it so.
    volumeManager volumemanager.VolumeManager

    // Cloud provider interface.
    cloud cloudprovider.Interface
    // Handles requests to cloud provider with timeout
    cloudResourceSyncManager cloudresource.SyncManager
    // Indicates that the node initialization happens in an external cloud controller
    externalCloudProvider bool

    // Reference to this node.
    nodeRef *v1.ObjectReference

    // The name of the container runtime
    containerRuntimeName string

    // redirectContainerStreaming enables container streaming redirect.
    redirectContainerStreaming bool

    // Container runtime.
    containerRuntime kubecontainer.Runtime

    // Streaming runtime handles container streaming.
    streamingRuntime kubecontainer.StreamingRuntime

    // Container runtime service (needed by container runtime Start()).
    // TODO(CD): try to make this available without holding a reference in this
    // struct. For example, by adding a getter to generic runtime.
    runtimeService internalapi.RuntimeService

    // reasonCache caches the failure reason of the last creation of all containers, which is
    // used for generating ContainerStatus.
    reasonCache *ReasonCache

    // nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
    // feature is not enabled, it is also the frequency that kubelet posts node status to master.
    // In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
    // in nodecontroller. There are several constraints:
    // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
    //    N means number of retries allowed for kubelet to post node status. It is pointless
    //    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
    //    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
    //    The constant must be less than podEvictionTimeout.
    // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
    //    status. Kubelet may fail to update node status reliably if the value is too small,
    //    as it takes time to gather all necessary node information.
    nodeStatusUpdateFrequency time.Duration

    // nodeStatusReportFrequency is the frequency that kubelet posts node
    // status to master. It is only used when node lease feature is enabled.
    nodeStatusReportFrequency time.Duration

    // lastStatusReportTime is the time when node status was last reported.
    lastStatusReportTime time.Time

    // syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
    // This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
    syncNodeStatusMux sync.Mutex
    // updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
    // This lock is used by the Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
    updatePodCIDRMux sync.Mutex
    // updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
    // This lock is used by Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
    updateRuntimeMux sync.Mutex

    // nodeLeaseController claims and renews the node lease for this Kubelet
    nodeLeaseController nodelease.Controller

    // Generates pod events.
    pleg pleg.PodLifecycleEventGenerator

    // Store kubecontainer.PodStatus for all pods.
    podCache kubecontainer.Cache

    // os is a facade for various syscalls that need to be mocked during testing.
    os kubecontainer.OSInterface

    // Watcher of out of memory events.
    oomWatcher oomwatcher.Watcher

    // Monitor resource usage
    resourceAnalyzer serverstats.ResourceAnalyzer

    // Whether or not we should have the QOS cgroup hierarchy for resource management
    cgroupsPerQOS bool

    // If non-empty, pass this to the container runtime as the root cgroup.
    cgroupRoot string

    // Mounter to use for volumes.
    mounter mount.Interface

    // subpather to execute subpath actions
    subpather subpath.Interface

    // Manager of non-Runtime containers.
    containerManager cm.ContainerManager

    // Maximum Number of Pods which can be run by this Kubelet
    maxPods int

    // Monitor Kubelet's sync loop
    syncLoopMonitor atomic.Value

    // Container restart Backoff
    backOff *flowcontrol.Backoff

    // Channel for sending pods to kill.
    podKillingCh chan *kubecontainer.PodPair

    // Information about the ports which are opened by daemons on Node running this Kubelet server.
    daemonEndpoints *v1.NodeDaemonEndpoints

    // A queue used to trigger pod workers.
    workQueue queue.WorkQueue

    // oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
    oneTimeInitializer sync.Once

    // If non-nil, use this IP address for the node
    nodeIP net.IP

    // use this function to validate the kubelet nodeIP
    nodeIPValidator func(net.IP) error

    // If non-nil, this is a unique identifier for the node in an external database, e.g. cloudprovider
    providerID string

    // clock is an interface that provides time related functionality in a way that makes it
    // easy to test the code.
    clock clock.Clock

    // handlers called during the tryUpdateNodeStatus cycle
    setNodeStatusFuncs []func(*v1.Node) error

    lastNodeUnschedulableLock sync.Mutex
    // maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
    lastNodeUnschedulable bool

    // TODO: think about moving this to be centralized in PodWorkers in follow-on.
    // the list of handlers to call during pod admission.
    admitHandlers lifecycle.PodAdmitHandlers
    // softAdmitHandlers are applied to the pod after it is admitted by the Kubelet, but before it is
    // run. A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely. If a
    // rejected pod should not be recreated, or the scheduler is not aware of the rejection rule, the
    // admission rule should be applied by a softAdmitHandler.
    softAdmitHandlers lifecycle.PodAdmitHandlers

    // the list of handlers to call during pod sync loop.
    lifecycle.PodSyncLoopHandlers

    // the list of handlers to call during pod sync.
    lifecycle.PodSyncHandlers
    // the number of allowed pods per core
    podsPerCore int

    // enableControllerAttachDetach indicates the Attach/Detach controller
    // should manage attachment/detachment of volumes scheduled to this node,
    // and disable kubelet from executing any attach/detach operations
    enableControllerAttachDetach bool

    // trigger deleting containers in a pod
    containerDeletor *podContainerDeletor

    // config iptables util rules
    makeIPTablesUtilChains bool

    // The bit of the fwmark space to mark packets for SNAT.
    iptablesMasqueradeBit int

    // The bit of the fwmark space to mark packets for dropping.
    iptablesDropBit int

    // The AppArmor validator for checking whether AppArmor is supported.
    appArmorValidator apparmor.Validator

    // The handler serving CRI streaming calls (exec/attach/port-forward).
    criHandler http.Handler

    // experimentalHostUserNamespaceDefaulting sets userns=true when a pod requests host namespaces
    // (pid, ipc, net), uses non-namespaced capabilities (mknod, sys_time, sys_module), contains a
    // privileged container, or uses host path volumes.
    // This should only be enabled when the container runtime is performing user remapping AND if the
    // experimental behavior is desired.
    experimentalHostUserNamespaceDefaulting bool

    // dockerLegacyService contains some legacy methods for backward compatibility.
    // It should be set only when docker is using a non-json-file logging driver.
    dockerLegacyService dockershim.DockerLegacyService

    // StatsProvider provides the node and the container stats.
    *stats.StatsProvider

    // This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node.
    // This can be useful for debugging volume related issues.
    keepTerminatedPodVolumes bool // DEPRECATED

    // pluginManager runs a set of asynchronous loops that figure out which
    // plugins need to be registered/unregistered based on this node and makes it so.
    pluginManager pluginmanager.PluginManager

    // This flag sets a maximum number of images to report in the node status.
    nodeStatusMaxImages int32

    // This flag indicates that kubelet should start the plugin watcher utility server for discovering Kubelet plugins
    enablePluginsWatcher bool

    // Handles RuntimeClass objects for the Kubelet.
    runtimeClassManager *runtimeclass.Manager
}

// setupDataDirs creates:
// 1. the root directory
// 2. the pods directory
// 3. the plugins directory
// 4. the pod-resources directory
func (kl *Kubelet) setupDataDirs() error {
    kl.rootDirectory = path.Clean(kl.rootDirectory)
    pluginRegistrationDir := kl.getPluginsRegistrationDir()
    pluginsDir := kl.getPluginsDir()
    if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
        return fmt.Errorf("error creating root directory: %v", err)
    }
    if err := kl.mounter.MakeRShared(kl.getRootDir()); err != nil {
        return fmt.Errorf("error configuring root directory: %v", err)
    }
    if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
        return fmt.Errorf("error creating pods directory: %v", err)
    }
    if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
        return fmt.Errorf("error creating plugins directory: %v", err)
    }
    if err := os.MkdirAll(kl.getPluginsRegistrationDir(), 0750); err != nil {
        return fmt.Errorf("error creating plugins registry directory: %v", err)
    }
    if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
        return fmt.Errorf("error creating podresources directory: %v", err)
    }
    if selinux.SELinuxEnabled() {
        err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
        if err != nil {
            klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", pluginRegistrationDir, err)
        }
        err = selinux.SetFileLabel(pluginsDir, config.KubeletPluginsDirSELinuxLabel)
        if err != nil {
            klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", pluginsDir, err)
        }
    }
    return nil
}
- // StartGarbageCollection starts garbage collection threads.
- func (kl *Kubelet) StartGarbageCollection() {
- loggedContainerGCFailure := false
- go wait.Until(func() {
- if err := kl.containerGC.GarbageCollect(); err != nil {
- klog.Errorf("Container garbage collection failed: %v", err)
- kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
- loggedContainerGCFailure = true
- } else {
- var vLevel klog.Level = 4
- if loggedContainerGCFailure {
- vLevel = 1
- loggedContainerGCFailure = false
- }
- klog.V(vLevel).Infof("Container garbage collection succeeded")
- }
- }, ContainerGCPeriod, wait.NeverStop)
- // when the high threshold is set to 100, image GC is disabled
- if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
- klog.V(2).Infof("ImageGCHighThresholdPercent is set to 100; disabling image GC")
- return
- }
- prevImageGCFailed := false
- go wait.Until(func() {
- if err := kl.imageManager.GarbageCollect(); err != nil {
- if prevImageGCFailed {
- klog.Errorf("Image garbage collection failed multiple times in a row: %v", err)
- // Only create an event for repeated failures
- kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
- } else {
- klog.Errorf("Image garbage collection failed once. Stats initialization may not have completed yet: %v", err)
- }
- prevImageGCFailed = true
- } else {
- var vLevel klog.Level = 4
- if prevImageGCFailed {
- vLevel = 1
- prevImageGCFailed = false
- }
- klog.V(vLevel).Infof("Image garbage collection succeeded")
- }
- }, ImageGCPeriod, wait.NeverStop)
- }
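- // Both GC loops above share a pattern worth noting: a periodic task whose
- // failures are logged as errors and whose first success after a failure is
- // logged more loudly than routine successes. A stdlib-only sketch of that
- // pattern (runPeriodic and its names are illustrative, not kubelet APIs;
- // assumes the log and time imports):
- func runPeriodic(task func() error, period time.Duration, stop <-chan struct{}) {
- failed := false
- ticker := time.NewTicker(period)
- defer ticker.Stop()
- for {
- select {
- case <-stop:
- return
- case <-ticker.C:
- if err := task(); err != nil {
- failed = true
- log.Printf("task failed: %v", err)
- } else if failed {
- failed = false
- log.Printf("task recovered") // first success after a failure is promoted
- }
- }
- }
- }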
- // initializeModules will initialize internal modules that do not require the container runtime to be up.
- // Note that the modules here must not depend on modules that are not initialized here.
- func (kl *Kubelet) initializeModules() error {
- // Prometheus metrics.
- metrics.Register(
- kl.runtimeCache,
- collectors.NewVolumeStatsCollector(kl),
- collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
- )
- metrics.SetNodeName(kl.nodeName)
- servermetrics.Register()
- // Setup filesystem directories.
- if err := kl.setupDataDirs(); err != nil {
- return err
- }
- // If the container logs directory does not exist, create it.
- if _, err := os.Stat(ContainerLogsDir); err != nil {
- if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
- klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
- }
- }
- // Start the image manager.
- kl.imageManager.Start()
- // Start the certificate manager if it was enabled.
- if kl.serverCertificateManager != nil {
- kl.serverCertificateManager.Start()
- }
- // Start out of memory watcher.
- if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
- return fmt.Errorf("Failed to start OOM watcher %v", err)
- }
- // Start resource analyzer
- kl.resourceAnalyzer.Start()
- return nil
- }
- // initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
- func (kl *Kubelet) initializeRuntimeDependentModules() {
- if err := kl.cadvisor.Start(); err != nil {
- // Fail kubelet and rely on the babysitter to retry starting kubelet.
- // TODO(random-liu): Add backoff logic in the babysitter
- klog.Fatalf("Failed to start cAdvisor %v", err)
- }
- // trigger on-demand stats collection once so that we have capacity information for ephemeral storage.
- // ignore any errors, since if stats collection is not successful, the container manager will fail to start below.
- kl.StatsProvider.GetCgroupStats("/", true)
- // Start container manager.
- node, err := kl.getNodeAnyWay()
- if err != nil {
- // Fail kubelet and rely on the babysitter to retry starting kubelet.
- klog.Fatalf("Kubelet failed to get node info: %v", err)
- }
- // containerManager must start after cAdvisor because it needs filesystem capacity information
- if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
- // Fail kubelet and rely on the babysitter to retry starting kubelet.
- klog.Fatalf("Failed to start ContainerManager %v", err)
- }
- // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
- kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
- // container log manager must start after container runtime is up to retrieve information from container runtime
- // and inform container to reopen log file after log rotation.
- kl.containerLogManager.Start()
- if kl.enablePluginsWatcher {
- // Adding Registration Callback function for CSI Driver
- kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
- // Adding Registration Callback function for Device Manager
- kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
- // Start the plugin manager
- klog.V(4).Infof("starting plugin manager")
- go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
- }
- }
- // Run starts the kubelet, reacting to config updates.
- func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
- if kl.logServer == nil {
- kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
- }
- if kl.kubeClient == nil {
- klog.Warning("No api server defined - no node status update will be sent.")
- }
- // Start the cloud provider sync manager
- if kl.cloudResourceSyncManager != nil {
- go kl.cloudResourceSyncManager.Run(wait.NeverStop)
- }
- if err := kl.initializeModules(); err != nil {
- kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
- klog.Fatal(err)
- }
- // Start volume manager
- go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
- if kl.kubeClient != nil {
- // Start syncing node status immediately, this may set up things the runtime needs to run.
- go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
- go kl.fastStatusUpdateOnce()
- // start syncing lease
- if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
- go kl.nodeLeaseController.Run(wait.NeverStop)
- }
- }
- go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
- // Start loop to sync iptables util rules
- if kl.makeIPTablesUtilChains {
- go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
- }
- // Start a goroutine responsible for killing pods (that are not properly
- // handled by pod workers).
- go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
- // Start component sync loops.
- kl.statusManager.Start()
- kl.probeManager.Start()
- // Start syncing RuntimeClasses if enabled.
- if kl.runtimeClassManager != nil {
- kl.runtimeClassManager.Start(wait.NeverStop)
- }
- // Start the pod lifecycle event generator.
- kl.pleg.Start()
- kl.syncLoop(updates, kl)
- }
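- // Run leans on two primitives from k8s.io/apimachinery/pkg/util/wait used
- // throughout this file: wait.Until(f, period, stopCh) runs f every period
- // until stopCh is closed, and wait.NeverStop is a channel that is never
- // closed. A simplified, stdlib-only sketch of the idea (illustrative, not
- // the actual implementation, which also handles jitter and crash recovery):
- func until(f func(), period time.Duration, stop <-chan struct{}) {
- for {
- select {
- case <-stop:
- return
- default:
- }
- f()
- select {
- case <-stop:
- return
- case <-time.After(period):
- }
- }
- }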
- // syncPod is the transaction script for the sync of a single pod.
- //
- // Arguments:
- //
- // o - the SyncPodOptions for this invocation
- //
- // The workflow is:
- // * If the pod is being created, record pod worker start latency
- // * Call generateAPIPodStatus to prepare a v1.PodStatus for the pod
- // * If the pod is being seen as running for the first time, record pod
- // start latency
- // * Update the status of the pod in the status manager
- // * Kill the pod if it should not be running
- // * Create a mirror pod if the pod is a static pod, and does not
- // already have a mirror pod
- // * Create the data directories for the pod if they do not exist
- // * Wait for volumes to attach/mount
- // * Fetch the pull secrets for the pod
- // * Call the container runtime's SyncPod callback
- // * Update the traffic shaping for the pod's ingress and egress limits
- //
- // If any step of this workflow errors, the error is returned, and is repeated
- // on the next syncPod call.
- //
- // This operation writes all events that are dispatched in order to provide
- // the most accurate information possible about an error situation to aid debugging.
- // Callers should not throw an event if this operation returns an error.
- func (kl *Kubelet) syncPod(o syncPodOptions) error {
- // pull out the required options
- pod := o.pod
- mirrorPod := o.mirrorPod
- podStatus := o.podStatus
- updateType := o.updateType
- // if we want to kill a pod, do it now!
- if updateType == kubetypes.SyncPodKill {
- killPodOptions := o.killPodOptions
- if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
- return fmt.Errorf("kill pod options are required if update type is kill")
- }
- apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
- kl.statusManager.SetPodStatus(pod, apiPodStatus)
- // we kill the pod with the specified grace period since this is a termination
- if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
- kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
- // there was an error killing the pod, so we return that error directly
- utilruntime.HandleError(err)
- return err
- }
- return nil
- }
- // Latency measurements for the main workflow are relative to the
- // first time the pod was seen by the API server.
- var firstSeenTime time.Time
- if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
- firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
- }
- // Record pod worker start latency if being created
- // TODO: make pod workers record their own latencies
- if updateType == kubetypes.SyncPodCreate {
- if !firstSeenTime.IsZero() {
- // This is the first time we are syncing the pod. Record the latency
- // since kubelet first saw the pod if firstSeenTime is set.
- metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
- metrics.DeprecatedPodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
- } else {
- klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
- }
- }
- // Generate final API pod status with pod and status manager status
- apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
- // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
- // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
- // set pod IP to hostIP directly in runtime.GetPodStatus
- podStatus.IP = apiPodStatus.PodIP
- // Record the time it takes for the pod to become running.
- existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
- if !ok || (existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
- !firstSeenTime.IsZero()) {
- metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
- metrics.DeprecatedPodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
- }
- runnable := kl.canRunPod(pod)
- if !runnable.Admit {
- // Pod is not runnable; update the Pod and Container statuses to why.
- apiPodStatus.Reason = runnable.Reason
- apiPodStatus.Message = runnable.Message
- // Waiting containers are not being created; record why they are blocked.
- const waitingReason = "Blocked"
- for _, cs := range apiPodStatus.InitContainerStatuses {
- if cs.State.Waiting != nil {
- cs.State.Waiting.Reason = waitingReason
- }
- }
- for _, cs := range apiPodStatus.ContainerStatuses {
- if cs.State.Waiting != nil {
- cs.State.Waiting.Reason = waitingReason
- }
- }
- }
- // Update status in the status manager
- kl.statusManager.SetPodStatus(pod, apiPodStatus)
- // Kill pod if it should not be running
- if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
- var syncErr error
- if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
- kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
- syncErr = fmt.Errorf("error killing pod: %v", err)
- utilruntime.HandleError(syncErr)
- } else {
- if !runnable.Admit {
- // There was no error killing the pod, but the pod cannot be run.
- // Return an error to signal that the sync loop should back off.
- syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
- }
- }
- return syncErr
- }
- // If the network plugin is not ready, only start the pod if it uses the host network
- if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
- kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
- return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
- }
- // Create Cgroups for the pod and apply resource parameters
- // to them if cgroups-per-qos flag is enabled.
- pcm := kl.containerManager.NewPodContainerManager()
- // If pod has already been terminated then we need not create
- // or update the pod's cgroup
- if !kl.podIsTerminated(pod) {
- // When the kubelet is restarted with the cgroups-per-qos
- // flag enabled, all of the pod's running containers
- // should be killed and brought back up
- // under the qos cgroup hierarchy.
- // Check if this is the pod's first sync
- firstSync := true
- for _, containerStatus := range apiPodStatus.ContainerStatuses {
- if containerStatus.State.Running != nil {
- firstSync = false
- break
- }
- }
- // Don't kill containers in the pod if the pod's cgroups already
- // exist or if the pod is running for the first time
- podKilled := false
- if !pcm.Exists(pod) && !firstSync {
- if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
- podKilled = true
- }
- }
- // Create and update the pod's cgroups.
- // Don't create cgroups for a run-once pod if it was killed above:
- // the current policy is not to restart run-once pods when the
- // kubelet is restarted with the new flag, since run-once pods are
- // expected to run only once and are not expected to run again
- // after a kubelet restart.
- if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
- if !pcm.Exists(pod) {
- if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
- klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
- }
- if err := pcm.EnsureExists(pod); err != nil {
- kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
- return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
- }
- }
- }
- }
- // Create Mirror Pod for Static Pod if it doesn't already exist
- if kubepod.IsStaticPod(pod) {
- podFullName := kubecontainer.GetPodFullName(pod)
- deleted := false
- if mirrorPod != nil {
- if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
- // The mirror pod is semantically different from the static pod. Remove
- // it. The mirror pod will get recreated later.
- klog.Infof("Trying to delete pod %s %v", podFullName, mirrorPod.ObjectMeta.UID)
- var err error
- deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
- if deleted {
- klog.Warningf("Deleted mirror pod %q because it is outdated", format.Pod(mirrorPod))
- } else if err != nil {
- klog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
- }
- }
- }
- if mirrorPod == nil || deleted {
- node, err := kl.GetNode()
- if err != nil || node.DeletionTimestamp != nil {
- klog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
- } else {
- klog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
- if err := kl.podManager.CreateMirrorPod(pod); err != nil {
- klog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
- }
- }
- }
- }
- // Make data directories for the pod
- if err := kl.makePodDataDirs(pod); err != nil {
- kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
- klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
- return err
- }
- // Volume manager will not mount volumes for terminated pods
- if !kl.podIsTerminated(pod) {
- // Wait for volumes to attach/mount
- if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
- kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
- klog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
- return err
- }
- }
- // Fetch the pull secrets for the pod
- pullSecrets := kl.getPullSecretsForPod(pod)
- // Call the container runtime's SyncPod callback
- result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
- kl.reasonCache.Update(pod.UID, result)
- if err := result.Error(); err != nil {
- // Do not return error if the only failures were pods in backoff
- for _, r := range result.SyncResults {
- if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
- // Do not record an event here, as we keep all event logging for sync pod failures
- // local to container runtime so we get better errors
- return err
- }
- }
- return nil
- }
- return nil
- }
- // Get pods which should be resynchronized. Currently, the following pods should be resynchronized:
- // * pods whose work is ready.
- // * pods for which an internal module requests a sync.
- func (kl *Kubelet) getPodsToSync() []*v1.Pod {
- allPods := kl.podManager.GetPods()
- podUIDs := kl.workQueue.GetWork()
- podUIDSet := sets.NewString()
- for _, podUID := range podUIDs {
- podUIDSet.Insert(string(podUID))
- }
- var podsToSync []*v1.Pod
- for _, pod := range allPods {
- if podUIDSet.Has(string(pod.UID)) {
- // The work of the pod is ready
- podsToSync = append(podsToSync, pod)
- continue
- }
- for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
- if podSyncLoopHandler.ShouldSync(pod) {
- podsToSync = append(podsToSync, pod)
- break
- }
- }
- }
- return podsToSync
- }
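- // The sets.NewString usage above is just an O(1) membership set. A
- // stdlib-only equivalent, shown as a sketch (uidSet is an illustrative
- // helper, not part of the kubelet):
- func uidSet(uids []string) map[string]struct{} {
- s := make(map[string]struct{}, len(uids))
- for _, u := range uids {
- s[u] = struct{}{} // empty struct values carry no payload
- }
- return s
- }
- // Usage: if _, ok := s[string(pod.UID)]; ok { /* the pod's work is ready */ }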
- // deletePod deletes the pod from the internal state of the kubelet by:
- // 1. stopping the associated pod worker asynchronously
- // 2. signaling to kill the pod by sending on the podKillingCh channel
- //
- // deletePod returns an error if not all sources are ready or the pod is not
- // found in the runtime cache.
- func (kl *Kubelet) deletePod(pod *v1.Pod) error {
- if pod == nil {
- return fmt.Errorf("deletePod does not allow nil pod")
- }
- if !kl.sourcesReady.AllReady() {
- // If the sources aren't ready, skip deletion, as we may accidentally delete pods
- // for sources that haven't reported yet.
- return fmt.Errorf("skipping delete because sources aren't ready yet")
- }
- kl.podWorkers.ForgetWorker(pod.UID)
- // The runtime cache may not have been updated with the pod yet, but that's okay
- // because the periodic cleanup routine will attempt the deletion again later.
- runningPods, err := kl.runtimeCache.GetPods()
- if err != nil {
- return fmt.Errorf("error listing containers: %v", err)
- }
- runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
- if runningPod.IsEmpty() {
- return fmt.Errorf("pod not found")
- }
- podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
- kl.podKillingCh <- &podPair
- // TODO: delete the mirror pod here?
- // We leave the volume/directory cleanup to the periodic cleanup routine.
- return nil
- }
- // rejectPod records an event about the pod with the given reason and message,
- // and updates the pod to the failed phase in the status manager.
- func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
- kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
- kl.statusManager.SetPodStatus(pod, v1.PodStatus{
- Phase: v1.PodFailed,
- Reason: reason,
- Message: "Pod " + message})
- }
- // canAdmitPod determines if a pod can be admitted, and gives a reason if it
- // cannot. "pod" is new pod, while "pods" are all admitted pods
- // The function returns a boolean value indicating whether the pod
- // can be admitted, a brief single-word reason and a message explaining why
- // the pod cannot be admitted.
- func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
- // the kubelet will invoke each pod admit handler in sequence
- // if any handler rejects, the pod is rejected.
- // TODO: move out of disk check into a pod admitter
- // TODO: out of resource eviction should have a pod admitter call-out
- attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
- for _, podAdmitHandler := range kl.admitHandlers {
- if result := podAdmitHandler.Admit(attrs); !result.Admit {
- return false, result.Reason, result.Message
- }
- }
- return true, "", ""
- }
- func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
- attrs := &lifecycle.PodAdmitAttributes{Pod: pod}
- // Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive.
- attrs.OtherPods = kl.filterOutTerminatedPods(kl.podManager.GetPods())
- for _, handler := range kl.softAdmitHandlers {
- if result := handler.Admit(attrs); !result.Admit {
- return result
- }
- }
- return lifecycle.PodAdmitResult{Admit: true}
- }
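- // canAdmitPod and canRunPod both apply the same chain-of-responsibility
- // pattern: run every handler in order and stop at the first rejection. A
- // self-contained sketch under illustrative stand-in types (admitResult and
- // admitter are not the kubelet's lifecycle types):
- type admitResult struct {
- Admit bool
- Reason string
- Message string
- }
- type admitter interface {
- Admit(pod *v1.Pod) admitResult
- }
- func admitAll(handlers []admitter, pod *v1.Pod) admitResult {
- for _, h := range handlers {
- if r := h.Admit(pod); !r.Admit {
- return r // first rejection wins; later handlers never run
- }
- }
- return admitResult{Admit: true}
- }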
- // syncLoop is the main loop for processing changes. It watches for changes from
- // three channels (file, apiserver, and http) and creates a union of them. For
- // any new change seen, will run a sync against desired state and running state. If
- // no changes are seen to the configuration, will synchronize the last known desired
- // state every sync-frequency seconds. Never returns.
- func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
- klog.Info("Starting kubelet main sync loop.")
- // The syncTicker wakes up the kubelet to check whether any pod workers
- // need to be synced. A one-second period is sufficient because the
- // sync interval is defaulted to 10s.
- syncTicker := time.NewTicker(time.Second)
- defer syncTicker.Stop()
- housekeepingTicker := time.NewTicker(housekeepingPeriod)
- defer housekeepingTicker.Stop()
- plegCh := kl.pleg.Watch()
- const (
- base = 100 * time.Millisecond
- max = 5 * time.Second
- factor = 2
- )
- duration := base
- for {
- if err := kl.runtimeState.runtimeErrors(); err != nil {
- klog.Infof("skipping pod synchronization - %v", err)
- // exponential backoff
- time.Sleep(duration)
- duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
- continue
- }
- // reset backoff if we have a success
- duration = base
- kl.syncLoopMonitor.Store(kl.clock.Now())
- if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
- break
- }
- kl.syncLoopMonitor.Store(kl.clock.Now())
- }
- }
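- // The backoff arithmetic above is a standard capped exponential backoff.
- // Isolated as a sketch (nextBackoff is an illustrative helper, not a
- // kubelet function), using the same constants as syncLoop:
- func nextBackoff(d time.Duration) time.Duration {
- const (
- max = 5 * time.Second
- factor = 2
- )
- // double the delay on each consecutive failure, capped at max
- return time.Duration(math.Min(float64(max), float64(factor)*float64(d)))
- }
- // On success the caller resets the delay to base (100ms), so a healthy
- // runtime is re-checked quickly while a broken one backs off toward 5s.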
- // syncLoopIteration reads from various channels and dispatches pods to the
- // given handler.
- //
- // Arguments:
- // 1. configCh: a channel to read config events from
- // 2. handler: the SyncHandler to dispatch pods to
- // 3. syncCh: a channel to read periodic sync events from
- // 4. housekeepingCh: a channel to read housekeeping events from
- // 5. plegCh: a channel to read PLEG updates from
- //
- // Events are also read from the kubelet liveness manager's update channel.
- //
- // The workflow is to read from one of the channels, handle that event, and
- // update the timestamp in the sync loop monitor.
- //
- // Note that, despite the syntactic similarity to a switch statement,
- // the cases of a select are evaluated in pseudorandom order when
- // multiple channels are ready to read. In other words, you cannot
- // assume the cases are evaluated in source order if multiple channels
- // have events. (A small demonstration appears after this function.)
- //
- // With that in mind, in truly no particular order, the different channels
- // are handled as follows:
- //
- // * configCh: dispatch the pods for the config change to the appropriate
- // handler callback for the event type
- // * plegCh: update the runtime cache; sync pod
- // * syncCh: sync all pods waiting for sync
- // * housekeepingCh: trigger cleanup of pods
- // * liveness manager: sync pods that have failed or in which one or more
- // containers have failed liveness checks
- func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
- syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
- select {
- case u, open := <-configCh:
- // Update from a config source; dispatch it to the right handler
- // callback.
- if !open {
- klog.Errorf("Update channel is closed. Exiting the sync loop.")
- return false
- }
- switch u.Op {
- case kubetypes.ADD:
- klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
- // After restarting, kubelet will get all existing pods through
- // ADD as if they are new pods. These pods will then go through the
- // admission process and *may* be rejected. This can be resolved
- // once we have checkpointing.
- handler.HandlePodAdditions(u.Pods)
- case kubetypes.UPDATE:
- klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
- handler.HandlePodUpdates(u.Pods)
- case kubetypes.REMOVE:
- klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
- handler.HandlePodRemoves(u.Pods)
- case kubetypes.RECONCILE:
- klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
- handler.HandlePodReconcile(u.Pods)
- case kubetypes.DELETE:
- klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
- // DELETE is treated as an UPDATE because of graceful deletion.
- handler.HandlePodUpdates(u.Pods)
- case kubetypes.RESTORE:
- klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
- // These are pods restored from the checkpoint. Treat them as new
- // pods.
- handler.HandlePodAdditions(u.Pods)
- case kubetypes.SET:
- // TODO: Do we want to support this?
- klog.Errorf("Kubelet does not support snapshot update")
- }
- if u.Op != kubetypes.RESTORE {
- // If the update type is RESTORE, it means that the update is from
- // the pod checkpoints and may be incomplete. Do not mark the
- // source as ready.
- // Mark the source ready after receiving at least one update from the
- // source. Once all the sources are marked ready, various cleanup
- // routines will start reclaiming resources. It is important that this
- // takes place only after kubelet calls the update handler to process
- // the update to ensure the internal pod cache is up-to-date.
- kl.sourcesReady.AddSource(u.Source)
- }
- case e := <-plegCh:
- if isSyncPodWorthy(e) {
- // PLEG event for a pod; sync it.
- if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
- klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
- handler.HandlePodSyncs([]*v1.Pod{pod})
- } else {
- // If the pod no longer exists, ignore the event.
- klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
- }
- }
- if e.Type == pleg.ContainerDied {
- if containerID, ok := e.Data.(string); ok {
- kl.cleanUpContainersInPod(e.ID, containerID)
- }
- }
- case <-syncCh:
- // Sync pods waiting for sync
- podsToSync := kl.getPodsToSync()
- if len(podsToSync) == 0 {
- break
- }
- klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
- handler.HandlePodSyncs(podsToSync)
- case update := <-kl.livenessManager.Updates():
- if update.Result == proberesults.Failure {
- // The liveness manager detected a failure; sync the pod.
- // We should not use the pod from livenessManager, because it is never updated after
- // initialization.
- pod, ok := kl.podManager.GetPodByUID(update.PodUID)
- if !ok {
- // If the pod no longer exists, ignore the update.
- klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
- break
- }
- klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
- handler.HandlePodSyncs([]*v1.Pod{pod})
- }
- case <-housekeepingCh:
- if !kl.sourcesReady.AllReady() {
- // If the sources aren't ready, skip housekeeping, as we may
- // accidentally delete pods from unready sources.
- klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
- } else {
- klog.V(4).Infof("SyncLoop (housekeeping)")
- if err := handler.HandlePodCleanups(); err != nil {
- klog.Errorf("Failed cleaning pods: %v", err)
- }
- }
- }
- return true
- }
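- // The pseudorandom select behavior called out in the comment block above is
- // easy to demonstrate. A self-contained, stdlib-only sketch (not kubelet
- // code); run it a few times and the print order varies:
- func selectOrderDemo() {
- a := make(chan string, 1)
- b := make(chan string, 1)
- a <- "config"
- b <- "pleg"
- for i := 0; i < 2; i++ {
- select {
- case v := <-a:
- fmt.Println(v) // may print first or second, chosen at pseudorandom
- case v := <-b:
- fmt.Println(v)
- }
- }
- }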
- // dispatchWork starts the asynchronous sync of the pod in a pod worker.
- // If the pod is terminated, dispatchWork does not start a worker; it only forces a status update when the pod has a deletion timestamp.
- func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
- if kl.podIsTerminated(pod) {
- if pod.DeletionTimestamp != nil {
- // If the pod is in a terminated state, there is no pod worker to
- // handle the work item. Check if the DeletionTimestamp has been
- // set, and force a status update to trigger a pod deletion request
- // to the apiserver.
- kl.statusManager.TerminatePod(pod)
- }
- return
- }
- // Run the sync in an async worker.
- kl.podWorkers.UpdatePod(&UpdatePodOptions{
- Pod: pod,
- MirrorPod: mirrorPod,
- UpdateType: syncType,
- OnCompleteFunc: func(err error) {
- if err != nil {
- metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
- metrics.DeprecatedPodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
- }
- },
- })
- // Note the number of containers for new pods.
- if syncType == kubetypes.SyncPodCreate {
- metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
- }
- }
- // TODO: handle mirror pods in a separate component (issue #17251)
- func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
- // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
- // corresponding static pod. Send update to the pod worker if the static
- // pod exists.
- if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
- kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
- }
- }
- // HandlePodAdditions is the callback in SyncHandler for pods being added from
- // a config source.
- func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
- start := kl.clock.Now()
- sort.Sort(sliceutils.PodsByCreationTime(pods))
- for _, pod := range pods {
- // Responsible for checking limits in resolv.conf
- if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
- kl.dnsConfigurer.CheckLimitsForResolvConf()
- }
- existingPods := kl.podManager.GetPods()
- // Always add the pod to the pod manager. Kubelet relies on the pod
- // manager as the source of truth for the desired state. If a pod does
- // not exist in the pod manager, it means that it has been deleted in
- // the apiserver and no action (other than cleanup) is required.
- kl.podManager.AddPod(pod)
- if kubepod.IsMirrorPod(pod) {
- kl.handleMirrorPod(pod, start)
- continue
- }
- if !kl.podIsTerminated(pod) {
- // Only go through the admission process if the pod is not
- // terminated.
- // We have already failed the pods we rejected, so activePods includes all admitted
- // pods that are alive.
- activePods := kl.filterOutTerminatedPods(existingPods)
- // Check if we can admit the pod; if not, reject it.
- if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
- kl.rejectPod(pod, reason, message)
- continue
- }
- }
- mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
- kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
- kl.probeManager.AddPod(pod)
- }
- }
- // HandlePodUpdates is the callback in the SyncHandler interface for pods
- // being updated from a config source.
- func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
- start := kl.clock.Now()
- for _, pod := range pods {
- // Responsible for checking limits in resolv.conf
- if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
- kl.dnsConfigurer.CheckLimitsForResolvConf()
- }
- kl.podManager.UpdatePod(pod)
- if kubepod.IsMirrorPod(pod) {
- kl.handleMirrorPod(pod, start)
- continue
- }
- // TODO: Evaluate if we need to validate and reject updates.
- mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
- kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
- }
- }
- // HandlePodRemoves is the callback in the SyncHandler interface for pods
- // being removed from a config source.
- func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
- start := kl.clock.Now()
- for _, pod := range pods {
- kl.podManager.DeletePod(pod)
- if kubepod.IsMirrorPod(pod) {
- kl.handleMirrorPod(pod, start)
- continue
- }
- // Deletion is allowed to fail because the periodic cleanup routine
- // will trigger deletion again.
- if err := kl.deletePod(pod); err != nil {
- klog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
- }
- kl.probeManager.RemovePod(pod)
- }
- }
- // HandlePodReconcile is the callback in the SyncHandler interface for pods
- // that should be reconciled.
- func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
- start := kl.clock.Now()
- for _, pod := range pods {
- // Update the pod in the pod manager; the status manager will periodically
- // reconcile against the pod manager.
- kl.podManager.UpdatePod(pod)
- // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
- if status.NeedToReconcilePodReadiness(pod) {
- mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
- kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
- }
- // After an evicted pod is synced, all dead containers in the pod can be removed.
- if eviction.PodIsEvicted(pod.Status) {
- if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
- kl.containerDeletor.deleteContainersInPod("", podStatus, true)
- }
- }
- }
- }
- // HandlePodSyncs is the callback in the syncHandler interface for pods
- // that should be dispatched to pod workers for sync.
- func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
- start := kl.clock.Now()
- for _, pod := range pods {
- mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
- kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
- }
- }
- // LatestLoopEntryTime returns the last time in the sync loop monitor.
- func (kl *Kubelet) LatestLoopEntryTime() time.Time {
- val := kl.syncLoopMonitor.Load()
- if val == nil {
- return time.Time{}
- }
- return val.(time.Time)
- }
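- // syncLoopMonitor is a sync/atomic.Value used as a heartbeat: the sync loop
- // Stores a timestamp on every iteration, and LatestLoopEntryTime Loads it,
- // treating nil as "never recorded". A minimal sketch of the same pattern
- // (assumes the sync/atomic import; heartbeat and the helpers are
- // illustrative names):
- var heartbeat atomic.Value // holds a time.Time
- func beat() { heartbeat.Store(time.Now()) }
- func lastBeat() time.Time {
- if v := heartbeat.Load(); v != nil {
- return v.(time.Time) // safe assertion: only time.Time values are stored
- }
- return time.Time{}
- }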
- // updateRuntimeUp calls the container runtime status callback, initializing
- // the runtime-dependent modules when the container runtime first comes up,
- // and logs an error if the status check fails. If the status check is OK,
- // the container runtime sync time is updated in the kubelet runtimeState.
- func (kl *Kubelet) updateRuntimeUp() {
- kl.updateRuntimeMux.Lock()
- defer kl.updateRuntimeMux.Unlock()
- s, err := kl.containerRuntime.Status()
- if err != nil {
- klog.Errorf("Container runtime sanity check failed: %v", err)
- return
- }
- if s == nil {
- klog.Errorf("Container runtime status is nil")
- return
- }
- // Periodically log the whole runtime status for debugging.
- // TODO(random-liu): Consider to send node event when optional
- // condition is unmet.
- klog.V(4).Infof("Container runtime status: %v", s)
- networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
- if networkReady == nil || !networkReady.Status {
- klog.Errorf("Container runtime network not ready: %v", networkReady)
- kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
- } else {
- // Set nil if the container runtime network is ready.
- kl.runtimeState.setNetworkState(nil)
- }
- // TODO(random-liu): Add runtime error in runtimeState, and update it
- // when runtime is not ready, so that the information in RuntimeReady
- // condition will be propagated to NodeReady condition.
- runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
- // If RuntimeReady is not set or is false, report an error.
- if runtimeReady == nil || !runtimeReady.Status {
- klog.Errorf("Container runtime not ready: %v", runtimeReady)
- return
- }
- kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
- kl.runtimeState.setRuntimeSync(kl.clock.Now())
- }
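- // kl.oneTimeInitializer is a sync.Once: however many times updateRuntimeUp
- // observes a healthy runtime, the runtime-dependent modules are initialized
- // exactly once. The underlying pattern, as a sketch (names illustrative):
- var initOnce sync.Once
- func onRuntimeHealthy() {
- initOnce.Do(func() {
- // expensive one-time initialization; subsequent calls are no-ops
- })
- }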
- // GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
- func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration {
- return kl.kubeletConfiguration
- }
- // BirthCry sends an event that the kubelet has started up.
- func (kl *Kubelet) BirthCry() {
- // Make an event that kubelet restarted.
- kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
- }
- // ResyncInterval returns the interval used for periodic syncs.
- func (kl *Kubelet) ResyncInterval() time.Duration {
- return kl.resyncInterval
- }
- // ListenAndServe runs the kubelet HTTP server.
- func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling bool) {
- server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, kl.redirectContainerStreaming, kl.criHandler)
- }
- // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
- func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool) {
- server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, enableCAdvisorJSONEndpoints)
- }
- // ListenAndServePodResources runs the kubelet podresources grpc service
- func (kl *Kubelet) ListenAndServePodResources() {
- socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
- if err != nil {
- klog.V(2).Infof("Failed to get local endpoint for PodResources endpoint: %v", err)
- return
- }
- server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager)
- }
- // cleanUpContainersInPod deletes the eligible dead containers in a pod. Depending on the configuration, the latest dead containers may be kept around.
- func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
- if podStatus, err := kl.podCache.Get(podID); err == nil {
- removeAll := false
- if syncedPod, ok := kl.podManager.GetPodByUID(podID); ok {
- // generate the api status using the cached runtime status to get up-to-date ContainerStatuses
- apiPodStatus := kl.generateAPIPodStatus(syncedPod, podStatus)
- // When an evicted or deleted pod has already synced, all containers can be removed.
- removeAll = eviction.PodIsEvicted(syncedPod.Status) || (syncedPod.DeletionTimestamp != nil && notRunning(apiPodStatus.ContainerStatuses))
- }
- kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
- }
- }
- // fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
- // is applied, and tries to update the pod CIDR immediately. After the pod CIDR is updated, it fires off
- // a runtime update and a node status update. The function returns after one successful node status update.
- // It runs only during kubelet startup, which improves the latency to a ready node by updating the
- // pod CIDR, runtime status, and node status as soon as possible.
- func (kl *Kubelet) fastStatusUpdateOnce() {
- for {
- time.Sleep(100 * time.Millisecond)
- node, err := kl.GetNode()
- if err != nil {
- klog.Error(err)
- continue
- }
- if node.Spec.PodCIDR != "" {
- if _, err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
- klog.Errorf("Pod CIDR update failed %v", err)
- continue
- }
- kl.updateRuntimeUp()
- kl.syncNodeStatus()
- return
- }
- }
- }
- // isSyncPodWorthy filters out events that are not worthy of pod syncing
- func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
- // ContainerRemoved doesn't affect pod state
- return event.Type != pleg.ContainerRemoved
- }
- // Gets the streaming server configuration to use with in-process CRI shims.
- func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions) *streaming.Config {
- config := &streaming.Config{
- StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
- StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
- SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
- SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
- }
- if !crOptions.RedirectContainerStreaming {
- config.Addr = net.JoinHostPort("localhost", "0")
- } else {
- // Use a relative redirect (no scheme or host).
- config.BaseURL = &url.URL{
- Path: "/cri/",
- }
- if kubeDeps.TLSOptions != nil {
- config.TLSConfig = kubeDeps.TLSOptions.Config
- }
- }
- return config
- }
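- // A note on the relative redirect above: a url.URL with only Path set
- // renders without a scheme or host, so clients resolve it against the
- // address they already used to reach the kubelet. A tiny sketch of that
- // stdlib behavior (relativeRedirectDemo is illustrative):
- func relativeRedirectDemo() string {
- u := &url.URL{Path: "/cri/"}
- return u.String() // "/cri/" — no scheme or host, resolved relative to the server
- }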