- // +build linux
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cm
- import (
- "bytes"
- "fmt"
- "io/ioutil"
- "os"
- "path"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/opencontainers/runc/libcontainer/cgroups"
- "github.com/opencontainers/runc/libcontainer/cgroups/fs"
- "github.com/opencontainers/runc/libcontainer/configs"
- "k8s.io/klog"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- "k8s.io/apimachinery/pkg/util/sets"
- utilversion "k8s.io/apimachinery/pkg/util/version"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- "k8s.io/client-go/tools/record"
- internalapi "k8s.io/cri-api/pkg/apis"
- kubefeatures "k8s.io/kubernetes/pkg/features"
- podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
- "k8s.io/kubernetes/pkg/kubelet/cadvisor"
- "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
- "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
- cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
- "k8s.io/kubernetes/pkg/kubelet/config"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/lifecycle"
- "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
- "k8s.io/kubernetes/pkg/kubelet/qos"
- "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
- "k8s.io/kubernetes/pkg/kubelet/status"
- schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- "k8s.io/kubernetes/pkg/util/mount"
- "k8s.io/kubernetes/pkg/util/oom"
- "k8s.io/kubernetes/pkg/util/procfs"
- utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
- utilpath "k8s.io/utils/path"
- )
- const (
- // DockerMemoryLimitThresholdPercent is the percent of machine memory capacity
- // used to calculate the docker memory resource container's hard limit, as a
- // workaround for a docker memory leak. See kubernetes/issues/9881 for details.
- DockerMemoryLimitThresholdPercent = 70
- // MinDockerMemoryLimit is the minimum memory limit allocated to the docker container: 150Mi.
- MinDockerMemoryLimit = 150 * 1024 * 1024
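- // A sketch of how these two constants are expected to combine; the names
- // machineMemoryCapacity and memoryLimit are illustrative, and the actual
- // computation lives in the code that sizes the docker resource container:
- //
- //   memoryLimit := machineMemoryCapacity * DockerMemoryLimitThresholdPercent / 100
- //   if memoryLimit < MinDockerMemoryLimit {
- //       memoryLimit = MinDockerMemoryLimit
- //   }
- //
- // e.g. on a 4Gi machine this yields ~2.8Gi; on a 128Mi machine the 150Mi floor applies.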
- dockerProcessName = "docker"
- dockerPidFile = "/var/run/docker.pid"
- containerdProcessName = "docker-containerd"
- containerdPidFile = "/run/docker/libcontainerd/docker-containerd.pid"
- )
- var (
- // The docker version in which containerd was introduced.
- containerdAPIVersion = utilversion.MustParseGeneric("1.23")
- )
- // A non-user container tracked by the Kubelet.
- type systemContainer struct {
- // Absolute name of the container.
- name string
- // CPU limit in millicores.
- cpuMillicores int64
- // Function that ensures the state of the container.
- // m is the cgroup manager for the specified container.
- ensureStateFunc func(m *fs.Manager) error
- // Manager for the cgroups of the external container.
- manager *fs.Manager
- }
- func newSystemCgroups(containerName string) *systemContainer {
- return &systemContainer{
- name: containerName,
- manager: createManager(containerName),
- }
- }
- type containerManagerImpl struct {
- sync.RWMutex
- cadvisorInterface cadvisor.Interface
- mountUtil mount.Interface
- NodeConfig
- status Status
- // External containers being managed.
- systemContainers []*systemContainer
- qosContainers QOSContainersInfo
- // Tasks that are run periodically
- periodicTasks []func()
- // Holds all the mounted cgroup subsystems
- subsystems *CgroupSubsystems
- nodeInfo *v1.Node
- // Interface for cgroup management
- cgroupManager CgroupManager
- // Capacity of this node.
- capacity v1.ResourceList
- // Capacity of this node, including internal resources.
- internalCapacity v1.ResourceList
- // Absolute cgroupfs path to a cgroup that Kubelet needs to place all pods under.
- // This path includes a top-level container for enforcing Node Allocatable.
- cgroupRoot CgroupName
- // Event recorder interface.
- recorder record.EventRecorder
- // Interface for QoS cgroup management
- qosContainerManager QOSContainerManager
- // Interface for exporting and allocating devices reported by device plugins.
- deviceManager devicemanager.Manager
- // Interface for CPU affinity management.
- cpuManager cpumanager.Manager
- }
- type features struct {
- cpuHardcapping bool
- }
- var _ ContainerManager = &containerManagerImpl{}
- // validateSystemRequirements checks if the required cgroup subsystems are mounted.
- // As of now, only 'cpu' and 'memory' are required.
- // CPU quota is a soft requirement.
- func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
- const (
- cgroupMountType = "cgroup"
- localErr = "system validation failed"
- )
- var (
- cpuMountPoint string
- f features
- )
- mountPoints, err := mountUtil.List()
- if err != nil {
- return f, fmt.Errorf("%s - %v", localErr, err)
- }
- expectedCgroups := sets.NewString("cpu", "cpuacct", "cpuset", "memory")
- for _, mountPoint := range mountPoints {
- if mountPoint.Type == cgroupMountType {
- for _, opt := range mountPoint.Opts {
- if expectedCgroups.Has(opt) {
- expectedCgroups.Delete(opt)
- }
- if opt == "cpu" {
- cpuMountPoint = mountPoint.Path
- }
- }
- }
- }
- if expectedCgroups.Len() > 0 {
- return f, fmt.Errorf("%s - Following Cgroup subsystem not mounted: %v", localErr, expectedCgroups.List())
- }
- // Check if cpu quota is available.
- // The CPU cgroup is required, so it is expected to be mounted at this point.
- periodExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_period_us"))
- if err != nil {
- klog.Errorf("failed to detect if CPU cgroup cpu.cfs_period_us is available - %v", err)
- }
- quotaExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_quota_us"))
- if err != nil {
- klog.Errorf("failed to detect if CPU cgroup cpu.cfs_quota_us is available - %v", err)
- }
- if quotaExists && periodExists {
- f.cpuHardcapping = true
- }
- return f, nil
- }
- // TODO(vmarmol): Add limits to the system containers.
- // Takes the absolute name of the specified containers.
- // An empty container name disables use of the specified container.
- func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
- // Mitigation of the issue fixed in master where runc's hugetlb prefix for
- // page sizes measured in KiB is "kB", but the correct prefix is "KB".
- // See https://github.com/opencontainers/runc/pull/2065
- // and https://github.com/kubernetes/kubernetes/pull/78495
- // for more info.
- for i, pageSize := range fs.HugePageSizes {
- fs.HugePageSizes[i] = strings.ReplaceAll(pageSize, "kB", "KB")
- }
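- // For example, a 64KiB hugepage size that runc reports as "64kB" becomes
- // "64KB", matching the kernel's hugetlb.64KB.* cgroup file names; sizes
- // without a "kB" suffix (e.g. "2MB", "1GB") pass through unchanged.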
- subsystems, err := GetCgroupSubsystems()
- if err != nil {
- return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
- }
- if failSwapOn {
- // Check whether swap is enabled. The Kubelet does not support running with swap enabled.
- swapData, err := ioutil.ReadFile("/proc/swaps")
- if err != nil {
- return nil, err
- }
- swapData = bytes.TrimSpace(swapData) // extra trailing \n
- swapLines := strings.Split(string(swapData), "\n")
- // If there is more than one line (the table header) in /proc/swaps, swap is
- // enabled and we should error out unless --fail-swap-on is set to false.
- if len(swapLines) > 1 {
- return nil, fmt.Errorf("running with swap on is not supported, please disable swap or set the --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines)
- }
- }
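- // Illustration of the check above: with swap disabled, /proc/swaps contains
- // only the header line and the length check passes:
- //
- //   Filename                Type        Size    Used    Priority
- //
- // With a swap device enabled, an additional line such as
- //
- //   /dev/sda2               partition   8388604 0       -2
- //
- // appears, and the constructor returns an error.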
- var capacity = v1.ResourceList{}
- var internalCapacity = v1.ResourceList{}
- // It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
- // machine info is computed and cached once as part of cAdvisor object creation.
- // But `RootFsInfo` and `ImagesFsInfo` are not available yet, so they are queried later, once the manager starts.
- machineInfo, err := cadvisorInterface.MachineInfo()
- if err != nil {
- return nil, err
- }
- capacity = cadvisor.CapacityFromMachineInfo(machineInfo)
- for k, v := range capacity {
- internalCapacity[k] = v
- }
- pidlimits, err := pidlimit.Stats()
- if err == nil && pidlimits != nil && pidlimits.MaxPID != nil {
- internalCapacity[pidlimit.PIDs] = *resource.NewQuantity(
- int64(*pidlimits.MaxPID),
- resource.DecimalSI)
- }
- // Turn CgroupRoot from a string (in cgroupfs path format) to internal CgroupName
- cgroupRoot := ParseCgroupfsToCgroupName(nodeConfig.CgroupRoot)
- cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
- // Check if Cgroup-root actually exists on the node
- if nodeConfig.CgroupsPerQOS {
- // CgroupRoot does default to / when enabled, but this check guards against regressions.
- if nodeConfig.CgroupRoot == "" {
- return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root")
- }
- // we need to check that the cgroup root actually exists for each subsystem
- // of note, we always use the cgroupfs driver when performing this check since
- // the input is provided in that format.
- // this is important because we do not want any name conversion to occur.
- if !cgroupManager.Exists(cgroupRoot) {
- return nil, fmt.Errorf("invalid configuration: cgroup-root %q doesn't exist", cgroupRoot)
- }
- klog.Infof("container manager verified user specified cgroup-root exists: %v", cgroupRoot)
- // Include the top level cgroup for enforcing node allocatable into cgroup-root.
- // This way, all sub modules can avoid having to understand the concept of node allocatable.
- cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
- }
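- // e.g. with --cgroup-root=/ and cgroups-per-qos enabled, the effective
- // cgroupRoot becomes the node allocatable cgroup, typically "/kubepods" in
- // cgroupfs notation (rendered as "kubepods.slice" by the systemd driver).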
- klog.Infof("Creating Container Manager object based on Node Config: %+v", nodeConfig)
- qosContainerManager, err := NewQOSContainerManager(subsystems, cgroupRoot, nodeConfig, cgroupManager)
- if err != nil {
- return nil, err
- }
- cm := &containerManagerImpl{
- cadvisorInterface: cadvisorInterface,
- mountUtil: mountUtil,
- NodeConfig: nodeConfig,
- subsystems: subsystems,
- cgroupManager: cgroupManager,
- capacity: capacity,
- internalCapacity: internalCapacity,
- cgroupRoot: cgroupRoot,
- recorder: recorder,
- qosContainerManager: qosContainerManager,
- }
- klog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
- if devicePluginEnabled {
- cm.deviceManager, err = devicemanager.NewManagerImpl()
- } else {
- cm.deviceManager, err = devicemanager.NewManagerStub()
- }
- if err != nil {
- return nil, err
- }
- // Initialize CPU manager
- if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
- cm.cpuManager, err = cpumanager.NewManager(
- nodeConfig.ExperimentalCPUManagerPolicy,
- nodeConfig.ExperimentalCPUManagerReconcilePeriod,
- machineInfo,
- cm.GetNodeAllocatableReservation(),
- nodeConfig.KubeletRootDir,
- )
- if err != nil {
- klog.Errorf("failed to initialize cpu manager: %v", err)
- return nil, err
- }
- }
- return cm, nil
- }
- // NewPodContainerManager is a factory method that returns a PodContainerManager object.
- // If QoS cgroups are enabled, it returns the general pod container manager;
- // otherwise it returns a no-op manager that essentially does nothing.
- func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
- if cm.NodeConfig.CgroupsPerQOS {
- return &podContainerManagerImpl{
- qosContainersInfo: cm.GetQOSContainersInfo(),
- subsystems: cm.subsystems,
- cgroupManager: cm.cgroupManager,
- podPidsLimit: cm.ExperimentalPodPidsLimit,
- enforceCPULimits: cm.EnforceCPULimits,
- cpuCFSQuotaPeriod: uint64(cm.CPUCFSQuotaPeriod / time.Microsecond),
- }
- }
- return &podContainerManagerNoop{
- cgroupRoot: cm.cgroupRoot,
- }
- }
- func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
- return &internalContainerLifecycleImpl{cm.cpuManager}
- }
- // Create a cgroup container manager.
- func createManager(containerName string) *fs.Manager {
- allowAllDevices := true
- return &fs.Manager{
- Cgroups: &configs.Cgroup{
- Parent: "/",
- Name: containerName,
- Resources: &configs.Resources{
- AllowAllDevices: &allowAllDevices,
- },
- },
- }
- }
- type KernelTunableBehavior string
- const (
- KernelTunableWarn KernelTunableBehavior = "warn"
- KernelTunableError KernelTunableBehavior = "error"
- KernelTunableModify KernelTunableBehavior = "modify"
- )
- // setupKernelTunables validates kernel tunable flags are set as expected
- // depending upon the specified option, it will either warn, error, or modify the kernel tunable flags
- func setupKernelTunables(option KernelTunableBehavior) error {
- desiredState := map[string]int{
- utilsysctl.VmOvercommitMemory: utilsysctl.VmOvercommitMemoryAlways,
- utilsysctl.VmPanicOnOOM: utilsysctl.VmPanicOnOOMInvokeOOMKiller,
- utilsysctl.KernelPanic: utilsysctl.KernelPanicRebootTimeout,
- utilsysctl.KernelPanicOnOops: utilsysctl.KernelPanicOnOopsAlways,
- utilsysctl.RootMaxKeys: utilsysctl.RootMaxKeysSetting,
- utilsysctl.RootMaxBytes: utilsysctl.RootMaxBytesSetting,
- }
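- // For illustration, the desired state above corresponds to the following
- // sysctl settings (values as defined in pkg/util/sysctl at the time of writing):
- //
- //   sysctl -w vm.overcommit_memory=1 vm.panic_on_oom=0 \
- //       kernel.panic=10 kernel.panic_on_oops=1 \
- //       kernel.keys.root_maxkeys=1000000 kernel.keys.root_maxbytes=25000000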
- sysctl := utilsysctl.New()
- errList := []error{}
- for flag, expectedValue := range desiredState {
- val, err := sysctl.GetSysctl(flag)
- if err != nil {
- errList = append(errList, err)
- continue
- }
- if val == expectedValue {
- continue
- }
- switch option {
- case KernelTunableError:
- errList = append(errList, fmt.Errorf("Invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val))
- case KernelTunableWarn:
- klog.V(2).Infof("Invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val)
- case KernelTunableModify:
- klog.V(2).Infof("Updating kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val)
- err = sysctl.SetSysctl(flag, expectedValue)
- if err != nil {
- errList = append(errList, err)
- }
- }
- }
- return utilerrors.NewAggregate(errList)
- }
- func (cm *containerManagerImpl) setupNode(activePods ActivePodsFunc) error {
- f, err := validateSystemRequirements(cm.mountUtil)
- if err != nil {
- return err
- }
- if !f.cpuHardcapping {
- cm.status.SoftRequirements = fmt.Errorf("CPU hardcapping unsupported")
- }
- b := KernelTunableModify
- if cm.GetNodeConfig().ProtectKernelDefaults {
- b = KernelTunableError
- }
- if err := setupKernelTunables(b); err != nil {
- return err
- }
- // Setup top level qos containers only if CgroupsPerQOS flag is specified as true
- if cm.NodeConfig.CgroupsPerQOS {
- if err := cm.createNodeAllocatableCgroups(); err != nil {
- return err
- }
- err = cm.qosContainerManager.Start(cm.getNodeAllocatableAbsolute, activePods)
- if err != nil {
- return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
- }
- }
- // Enforce Node Allocatable (if required)
- if err := cm.enforceNodeAllocatableCgroups(); err != nil {
- return err
- }
- systemContainers := []*systemContainer{}
- if cm.ContainerRuntime == "docker" {
- // With the docker-CRI integration, dockershim will manage the cgroups
- // and oom score for the docker processes.
- // In the future, NodeSpec should mandate the cgroup that the
- // runtime processes need to be in. For now, we still check the
- // cgroup for docker periodically, so that kubelet can recognize
- // the cgroup for docker and serve stats for the runtime.
- // TODO(#27097): Fix this after NodeSpec is clearly defined.
- cm.periodicTasks = append(cm.periodicTasks, func() {
- klog.V(4).Infof("[ContainerManager]: Adding periodic tasks for docker CRI integration")
- cont, err := getContainerNameForProcess(dockerProcessName, dockerPidFile)
- if err != nil {
- klog.Error(err)
- return
- }
- klog.V(2).Infof("[ContainerManager]: Discovered runtime cgroups name: %s", cont)
- cm.Lock()
- defer cm.Unlock()
- cm.RuntimeCgroupsName = cont
- })
- }
- if cm.SystemCgroupsName != "" {
- if cm.SystemCgroupsName == "/" {
- return fmt.Errorf("system container cannot be root (\"/\")")
- }
- cont := newSystemCgroups(cm.SystemCgroupsName)
- cont.ensureStateFunc = func(manager *fs.Manager) error {
- return ensureSystemCgroups("/", manager)
- }
- systemContainers = append(systemContainers, cont)
- }
- if cm.KubeletCgroupsName != "" {
- cont := newSystemCgroups(cm.KubeletCgroupsName)
- allowAllDevices := true
- manager := fs.Manager{
- Cgroups: &configs.Cgroup{
- Parent: "/",
- Name: cm.KubeletCgroupsName,
- Resources: &configs.Resources{
- AllowAllDevices: &allowAllDevices,
- },
- },
- }
- cont.ensureStateFunc = func(_ *fs.Manager) error {
- return ensureProcessInContainerWithOOMScore(os.Getpid(), qos.KubeletOOMScoreAdj, &manager)
- }
- systemContainers = append(systemContainers, cont)
- } else {
- cm.periodicTasks = append(cm.periodicTasks, func() {
- if err := ensureProcessInContainerWithOOMScore(os.Getpid(), qos.KubeletOOMScoreAdj, nil); err != nil {
- klog.Error(err)
- return
- }
- cont, err := getContainer(os.Getpid())
- if err != nil {
- klog.Errorf("failed to find cgroups of kubelet - %v", err)
- return
- }
- cm.Lock()
- defer cm.Unlock()
- cm.KubeletCgroupsName = cont
- })
- }
- cm.systemContainers = systemContainers
- return nil
- }
- func getContainerNameForProcess(name, pidFile string) (string, error) {
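- // e.g. for (dockerProcessName, dockerPidFile) this typically yields a cgroup
- // path such as "/system.slice/docker.service" on systemd hosts (illustrative),
- // or "" when no matching process is found.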
- pids, err := getPidsForProcess(name, pidFile)
- if err != nil {
- return "", fmt.Errorf("failed to detect process id for %q - %v", name, err)
- }
- if len(pids) == 0 {
- return "", nil
- }
- cont, err := getContainer(pids[0])
- if err != nil {
- return "", err
- }
- return cont, nil
- }
- func (cm *containerManagerImpl) GetNodeConfig() NodeConfig {
- cm.RLock()
- defer cm.RUnlock()
- return cm.NodeConfig
- }
- // GetPodCgroupRoot returns the literal cgroupfs value for the cgroup containing all pods.
- func (cm *containerManagerImpl) GetPodCgroupRoot() string {
- return cm.cgroupManager.Name(cm.cgroupRoot)
- }
- func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems {
- return cm.subsystems
- }
- func (cm *containerManagerImpl) GetQOSContainersInfo() QOSContainersInfo {
- return cm.qosContainerManager.GetQOSContainersInfo()
- }
- func (cm *containerManagerImpl) UpdateQOSCgroups() error {
- return cm.qosContainerManager.UpdateCgroups()
- }
- func (cm *containerManagerImpl) Status() Status {
- cm.RLock()
- defer cm.RUnlock()
- return cm.status
- }
- func (cm *containerManagerImpl) Start(node *v1.Node,
- activePods ActivePodsFunc,
- sourcesReady config.SourcesReady,
- podStatusProvider status.PodStatusProvider,
- runtimeService internalapi.RuntimeService) error {
- // Start the CPU manager (it was initialized in NewContainerManager).
- if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
- cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), podStatusProvider, runtimeService)
- }
- // cache the node Info including resource capacity and
- // allocatable of the node
- cm.nodeInfo = node
- if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LocalStorageCapacityIsolation) {
- rootfs, err := cm.cadvisorInterface.RootFsInfo()
- if err != nil {
- return fmt.Errorf("failed to get rootfs info: %v", err)
- }
- for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
- cm.capacity[rName] = rCap
- }
- }
- // Ensure that node allocatable configuration is valid.
- if err := cm.validateNodeAllocatable(); err != nil {
- return err
- }
- // Setup the node
- if err := cm.setupNode(activePods); err != nil {
- return err
- }
- // Don't run a background thread if there are no ensureStateFuncs.
- hasEnsureStateFuncs := false
- for _, cont := range cm.systemContainers {
- if cont.ensureStateFunc != nil {
- hasEnsureStateFuncs = true
- break
- }
- }
- if hasEnsureStateFuncs {
- // Run ensure state functions every minute.
- go wait.Until(func() {
- for _, cont := range cm.systemContainers {
- if cont.ensureStateFunc != nil {
- if err := cont.ensureStateFunc(cont.manager); err != nil {
- klog.Warningf("[ContainerManager] Failed to ensure state of %q: %v", cont.name, err)
- }
- }
- }
- }, time.Minute, wait.NeverStop)
- }
- if len(cm.periodicTasks) > 0 {
- go wait.Until(func() {
- for _, task := range cm.periodicTasks {
- if task != nil {
- task()
- }
- }
- }, 5*time.Minute, wait.NeverStop)
- }
- // Starts device manager.
- if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
- return err
- }
- return nil
- }
- func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
- return cm.deviceManager.GetWatcherHandler()
- }
- // TODO: move the GetResources logic to PodContainerManager.
- func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
- opts := &kubecontainer.RunContainerOptions{}
- // Allocate should already be called during predicateAdmitHandler.Admit(),
- // just try to fetch device runtime information from cached state here
- devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
- if err != nil {
- return nil, err
- } else if devOpts == nil {
- return opts, nil
- }
- opts.Devices = append(opts.Devices, devOpts.Devices...)
- opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
- opts.Envs = append(opts.Envs, devOpts.Envs...)
- opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
- return opts, nil
- }
- func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
- return cm.deviceManager.Allocate(node, attrs)
- }
- func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
- cpuLimit := int64(0)
- // Sum up resources of all external containers.
- for _, cont := range cm.systemContainers {
- cpuLimit += cont.cpuMillicores
- }
- return v1.ResourceList{
- v1.ResourceCPU: *resource.NewMilliQuantity(
- cpuLimit,
- resource.DecimalSI),
- }
- }
- func isProcessRunningInHost(pid int) (bool, error) {
- // Get init pid namespace.
- initPidNs, err := os.Readlink("/proc/1/ns/pid")
- if err != nil {
- return false, fmt.Errorf("failed to find pid namespace of init process")
- }
- klog.V(10).Infof("init pid ns is %q", initPidNs)
- processPidNs, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/pid", pid))
- if err != nil {
- return false, fmt.Errorf("failed to find pid namespace of process %q", pid)
- }
- klog.V(10).Infof("Pid %d pid ns is %q", pid, processPidNs)
- return initPidNs == processPidNs, nil
- }
- func getPidFromPidFile(pidFile string) (int, error) {
- file, err := os.Open(pidFile)
- if err != nil {
- return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
- }
- defer file.Close()
- data, err := ioutil.ReadAll(file)
- if err != nil {
- return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
- }
- // pid files commonly end with a trailing newline, so trim before parsing.
- pid, err := strconv.Atoi(strings.TrimSpace(string(data)))
- if err != nil {
- return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
- }
- return pid, nil
- }
- func getPidsForProcess(name, pidFile string) ([]int, error) {
- if len(pidFile) == 0 {
- return procfs.PidOf(name)
- }
- pid, err := getPidFromPidFile(pidFile)
- if err == nil {
- return []int{pid}, nil
- }
- // Try to lookup pid by process name
- pids, err2 := procfs.PidOf(name)
- if err2 == nil {
- return pids, nil
- }
- // Return error from getPidFromPidFile since that should have worked
- // and is the real source of the problem.
- klog.V(4).Infof("unable to get pid from %s: %v", pidFile, err)
- return []int{}, err
- }
- // Ensures that the Docker daemon is in the desired container.
- // Temporarily export the function to be used by dockershim.
- // TODO(yujuhong): Move this function to dockershim once kubelet migrates to
- // dockershim as the default.
- func EnsureDockerInContainer(dockerAPIVersion *utilversion.Version, oomScoreAdj int, manager *fs.Manager) error {
- type process struct{ name, file string }
- dockerProcs := []process{{dockerProcessName, dockerPidFile}}
- if dockerAPIVersion.AtLeast(containerdAPIVersion) {
- dockerProcs = append(dockerProcs, process{containerdProcessName, containerdPidFile})
- }
- var errs []error
- for _, proc := range dockerProcs {
- pids, err := getPidsForProcess(proc.name, proc.file)
- if err != nil {
- errs = append(errs, fmt.Errorf("failed to get pids for %q: %v", proc.name, err))
- continue
- }
- // Move if the pid is not already in the desired container.
- for _, pid := range pids {
- if err := ensureProcessInContainerWithOOMScore(pid, oomScoreAdj, manager); err != nil {
- errs = append(errs, fmt.Errorf("errors moving %q pid: %v", proc.name, err))
- }
- }
- }
- return utilerrors.NewAggregate(errs)
- }
- func ensureProcessInContainerWithOOMScore(pid int, oomScoreAdj int, manager *fs.Manager) error {
- if runningInHost, err := isProcessRunningInHost(pid); err != nil {
- // Err on the side of caution. Avoid moving the docker daemon unless we are able to identify its context.
- return err
- } else if !runningInHost {
- // Process is running inside a container. Don't touch that.
- klog.V(2).Infof("pid %d is not running in the host namespaces", pid)
- return nil
- }
- var errs []error
- if manager != nil {
- cont, err := getContainer(pid)
- if err != nil {
- errs = append(errs, fmt.Errorf("failed to find container of PID %d: %v", pid, err))
- }
- if cont != manager.Cgroups.Name {
- err = manager.Apply(pid)
- if err != nil {
- errs = append(errs, fmt.Errorf("failed to move PID %d (in %q) to %q: %v", pid, cont, manager.Cgroups.Name, err))
- }
- }
- }
- // Also apply oom-score-adj to processes
- oomAdjuster := oom.NewOOMAdjuster()
- klog.V(5).Infof("attempting to apply oom_score_adj of %d to pid %d", oomScoreAdj, pid)
- if err := oomAdjuster.ApplyOOMScoreAdj(pid, oomScoreAdj); err != nil {
- klog.V(3).Infof("Failed to apply oom_score_adj %d for pid %d: %v", oomScoreAdj, pid, err)
- errs = append(errs, fmt.Errorf("failed to apply oom score %d to PID %d: %v", oomScoreAdj, pid, err))
- }
- return utilerrors.NewAggregate(errs)
- }
- // getContainer returns the cgroup associated with the specified pid.
- // It enforces a unified hierarchy for memory and cpu cgroups.
- // On systemd environments, it uses the name=systemd cgroup for the specified pid.
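- //
- // A typical cgroup v1 /proc/<pid>/cgroup file looks like (illustrative):
- //
- //   11:memory:/system.slice/docker.service
- //   3:cpu,cpuacct:/system.slice/docker.service
- //   1:name=systemd:/system.slice/docker.service
- //
- // Here the cpu and memory hierarchies agree, so "/system.slice/docker.service" is returned.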
- func getContainer(pid int) (string, error) {
- cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid))
- if err != nil {
- return "", err
- }
- cpu, found := cgs["cpu"]
- if !found {
- return "", cgroups.NewNotFoundError("cpu")
- }
- memory, found := cgs["memory"]
- if !found {
- return "", cgroups.NewNotFoundError("memory")
- }
- // since we use this container for accounting, we need to ensure it's a unified hierarchy.
- if cpu != memory {
- return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified. cpu: %s, memory: %s", cpu, memory)
- }
- // on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls)
- // cpu and memory accounting is off by default, users may choose to enable it per unit or globally.
- // users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true).
- // users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true
- // we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal.
- // for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory
- // cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers.
- // as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet.
- // in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally).
- if systemd, found := cgs["name=systemd"]; found {
- if systemd != cpu {
- klog.Warningf("CPUAccounting not enabled for pid: %d", pid)
- }
- if systemd != memory {
- klog.Warningf("MemoryAccounting not enabled for pid: %d", pid)
- }
- return systemd, nil
- }
- return cpu, nil
- }
- // Ensures the system container is created and that all non-kernel processes in
- // the root cgroup are moved to it (pid 1 and kernel threads are left in place).
- //
- // The reason for leaving kernel threads in the root cgroup is that we don't want to tie the
- // execution of these threads to the to-be-defined /system quota and create priority inversions.
- func ensureSystemCgroups(rootCgroupPath string, manager *fs.Manager) error {
- // Move non-kernel PIDs to the system container.
- attemptsRemaining := 10
- var errs []error
- for attemptsRemaining >= 0 {
- // Only keep errors on latest attempt.
- errs = []error{}
- attemptsRemaining--
- allPids, err := cmutil.GetPids(rootCgroupPath)
- if err != nil {
- errs = append(errs, fmt.Errorf("failed to list PIDs for root: %v", err))
- continue
- }
- // Remove kernel pids and other protected PIDs (pid 1, PIDs already in system & kubelet containers)
- pids := make([]int, 0, len(allPids))
- for _, pid := range allPids {
- if pid == 1 || isKernelPid(pid) {
- continue
- }
- pids = append(pids, pid)
- }
- klog.Infof("Found %d PIDs in root, %d of them are not to be moved", len(allPids), len(allPids)-len(pids))
- // Check if we have moved all the non-kernel PIDs.
- if len(pids) == 0 {
- break
- }
- klog.Infof("Moving non-kernel processes: %v", pids)
- for _, pid := range pids {
- err := manager.Apply(pid)
- if err != nil {
- errs = append(errs, fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, manager.Cgroups.Name, err))
- }
- }
- }
- if attemptsRemaining < 0 {
- errs = append(errs, fmt.Errorf("ran out of attempts to create system containers %q", manager.Cgroups.Name))
- }
- return utilerrors.NewAggregate(errs)
- }
- // Determines whether the specified PID is a kernel PID.
- func isKernelPid(pid int) bool {
- // Kernel threads have no associated executable.
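- // e.g. /proc/2/exe (kthreadd) is a dangling symlink, so Readlink fails and
- // pid 2 is treated as a kernel thread, while /proc/1/exe resolves to the init
- // binary. A process that exits mid-scan also fails Readlink; treating it as a
- // kernel pid is harmless since a vanished pid cannot be moved anyway.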
- _, err := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid))
- return err != nil
- }
- func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
- return cm.capacity
- }
- func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
- return cm.deviceManager.GetCapacity()
- }
- func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
- return cm.deviceManager.GetDevices(podUID, containerName)
- }
- func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
- return cm.deviceManager.ShouldResetExtendedResourceCapacity()
- }