/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cpumanager

import (
	"fmt"
	"math"
	"sync"
	"time"

	cadvisorapi "github.com/google/cadvisor/info/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	"k8s.io/kubernetes/pkg/kubelet/config"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/status"
)

// ActivePodsFunc is a function that returns a list of pods to reconcile.
type ActivePodsFunc func() []*v1.Pod

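// runtimeService is the subset of the CRI runtime service that the CPU
// manager needs in order to write cpuset updates through to running
// containers.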
type runtimeService interface {
	UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error
}

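// policyName is the name of a CPU manager policy, e.g. "none" or "static".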
type policyName string

// cpuManagerStateFileName is the name of the file in which the CPU manager
// checkpoints its state.
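// The checkpoint typically lands at /var/lib/kubelet/cpu_manager_state,
// assuming the default kubelet root directory.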
const cpuManagerStateFileName = "cpu_manager_state"

// Manager interface provides methods for Kubelet to manage pod cpus.
type Manager interface {
	// Start is called during Kubelet initialization.
	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error

	// AddContainer is called between container create and container start
	// so that initial CPU affinity settings can be written through to the
	// container runtime before the first process begins to execute.
	AddContainer(p *v1.Pod, c *v1.Container, containerID string) error

	// RemoveContainer is called after Kubelet decides to kill or delete a
	// container. After this call, the CPU manager stops trying to reconcile
	// that container and any CPUs dedicated to the container are freed.
	RemoveContainer(containerID string) error

	// State returns a read-only interface to the internal CPU manager state.
	State() state.Reader

	// GetTopologyHints implements the topologymanager.HintProvider interface
	// and is consulted to achieve NUMA-aware resource alignment among this
	// and other resource controllers.
	GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
}

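// manager implements the Manager interface. It delegates CPU assignment
// decisions to the configured policy and persists the resulting assignments
// in a checkpointed state file.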
type manager struct {
	sync.Mutex
	policy Policy

	// reconcilePeriod is the duration between calls to reconcileState.
	reconcilePeriod time.Duration

	// state allows pluggable CPU assignment policies while sharing a common
	// representation of state for the system to inspect and reconcile.
	state state.State

	// containerRuntime is the container runtime service interface needed
	// to make UpdateContainerResources() calls against the containers.
	containerRuntime runtimeService

	// activePods is a method for listing active pods on the node
	// so all the containers can be updated in the reconciliation loop.
	activePods ActivePodsFunc

	// podStatusProvider provides a method for obtaining pod statuses
	// and the containerID of their containers.
	podStatusProvider status.PodStatusProvider

	// containerMap provides a mapping from (pod, container) -> containerID
	// for all containers in a pod.
	containerMap containermap.ContainerMap

	topology *topology.CPUTopology

	nodeAllocatableReservation v1.ResourceList

	// sourcesReady provides the readiness of kubelet configuration sources
	// such as apiserver update readiness. We use it to determine when we can
	// purge inactive pods from the checkpointed state.
	sourcesReady config.SourcesReady

	// stateFileDirectory holds the directory where the state file for
	// checkpoints is held.
	stateFileDirectory string
}

var _ Manager = &manager{}

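// sourcesReadyStub is a SourcesReady implementation that always reports
// ready. NewManager installs it as a placeholder; Start replaces it with
// the real kubelet sources.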
type sourcesReadyStub struct{}

func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool          { return true }

// NewManager creates a new CPU manager based on the provided policy.
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, numaNodeInfo topology.NUMANodeInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
	var topo *topology.CPUTopology
	var policy Policy

	switch policyName(cpuPolicyName) {

	case PolicyNone:
		policy = NewNonePolicy()

	case PolicyStatic:
		var err error
		topo, err = topology.Discover(machineInfo, numaNodeInfo)
		if err != nil {
			return nil, err
		}
		klog.Infof("[cpumanager] detected CPU topology: %v", topo)

		reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
		if !ok {
			// The static policy cannot initialize without this information.
			return nil, fmt.Errorf("[cpumanager] unable to determine reserved CPU resources for static policy")
		}
		if reservedCPUs.IsZero() {
			// The static policy requires this to be nonzero. Zero CPU reservation
			// would allow the shared pool to be completely exhausted. At that point
			// either we would violate our guarantee of exclusivity or need to evict
			// any pod that has at least one container that requires zero CPUs.
			// See the comments in policy_static.go for more details.
			return nil, fmt.Errorf("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
		}

		// Take the ceiling of the reservation, since fractional CPUs cannot be
		// exclusively allocated.
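		// For example, a reservation of 1500m (1.5 CPUs) rounds up to 2
		// exclusively reserved CPUs.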
		reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
		numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
		if err != nil {
			return nil, fmt.Errorf("new static policy error: %v", err)
		}

	default:
		return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
	}

	manager := &manager{
		policy:                     policy,
		reconcilePeriod:            reconcilePeriod,
		topology:                   topo,
		nodeAllocatableReservation: nodeAllocatableReservation,
		stateFileDirectory:         stateFileDirectory,
	}
	manager.sourcesReady = &sourcesReadyStub{}
	return manager, nil
}

func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
	klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
	klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
	m.sourcesReady = sourcesReady
	m.activePods = activePods
	m.podStatusProvider = podStatusProvider
	m.containerRuntime = containerRuntime
	m.containerMap = initialContainers

	stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
	if err != nil {
		klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v, please drain node and remove policy state file", err)
		return err
	}
	m.state = stateImpl

	err = m.policy.Start(m.state)
	if err != nil {
		klog.Errorf("[cpumanager] policy start error: %v", err)
		return err
	}

	if m.policy.Name() == string(PolicyNone) {
		return nil
	}
	// Periodically call m.reconcileState() to keep the CPU sets of all pods
	// in sync with the guaranteed CPUs that have been handed out to them.
	go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
	return nil
}

func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
	m.Lock()
	// Proactively remove CPUs from init containers that have already run.
	// They are guaranteed to have run to completion before any other
	// container is run.
	for _, initContainer := range p.Spec.InitContainers {
		if c.Name != initContainer.Name {
			err := m.policyRemoveContainerByRef(string(p.UID), initContainer.Name)
			if err != nil {
				klog.Warningf("[cpumanager] unable to remove init container (pod: %s, container: %s, error: %v)", string(p.UID), initContainer.Name, err)
			}
		}
	}

	// Call down into the policy to assign this container CPUs if required.
	err := m.policyAddContainer(p, c, containerID)
	if err != nil {
		klog.Errorf("[cpumanager] AddContainer error: %v", err)
		m.Unlock()
		return err
	}

	// Get the CPUs just assigned to the container (or fall back to the default
	// CPUSet if none were assigned).
	cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
	m.Unlock()

	if !cpus.IsEmpty() {
		err = m.updateContainerCPUSet(containerID, cpus)
		if err != nil {
			klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err)
			m.Lock()
			err := m.policyRemoveContainerByID(containerID)
			if err != nil {
				klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
			}
			m.Unlock()
		}
		return err
	}

	klog.V(5).Infof("[cpumanager] skipping update of container resources: assigned cpuset is empty")
	return nil
}

func (m *manager) RemoveContainer(containerID string) error {
	m.Lock()
	defer m.Unlock()

	err := m.policyRemoveContainerByID(containerID)
	if err != nil {
		klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
		return err
	}
	return nil
}

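// policyAddContainer asks the active policy to allocate CPUs for the
// container and, on success, records the (pod, container) -> containerID
// mapping for later lookups.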
func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
	err := m.policy.AddContainer(m.state, p, c)
	if err == nil {
		m.containerMap.Add(string(p.UID), c.Name, containerID)
	}
	return err
}

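// policyRemoveContainerByID releases a container's CPUs given its runtime
// container ID. An ID with no containerMap entry is treated as already
// removed, so no error is returned.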
func (m *manager) policyRemoveContainerByID(containerID string) error {
	podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
	if err != nil {
		return nil
	}

	err = m.policy.RemoveContainer(m.state, podUID, containerName)
	if err == nil {
		m.containerMap.RemoveByContainerID(containerID)
	}

	return err
}

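// policyRemoveContainerByRef releases a container's CPUs given its
// (podUID, containerName) reference.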
func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
	err := m.policy.RemoveContainer(m.state, podUID, containerName)
	if err == nil {
		m.containerMap.RemoveByContainerRef(podUID, containerName)
	}

	return err
}

func (m *manager) State() state.Reader {
	return m.state
}

func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
	// Garbage collect any stranded resources before providing TopologyHints.
	m.removeStaleState()
	// Delegate to active policy.
	return m.policy.GetTopologyHints(m.state, pod, container)
}

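// reconciledContainer identifies a single container visited during a
// reconciliation pass.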
type reconciledContainer struct {
	podName       string
	containerName string
	containerID   string
}

func (m *manager) removeStaleState() {
	// Only once all sources are ready do we attempt to remove any stale state.
	// This ensures that the call to `m.activePods()` below will succeed with
	// the actual active pods list.
	if !m.sourcesReady.AllReady() {
		return
	}

	// We grab the lock to ensure that no new containers will grab CPUs while
	// executing the code below. Without this lock, it's possible that we end
	// up removing state that is newly added by an asynchronous call to
	// AddContainer() during the execution of this code.
	m.Lock()
	defer m.Unlock()

	// Get the list of active pods.
	activePods := m.activePods()

	// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
	activeContainers := make(map[string]map[string]struct{})
	for _, pod := range activePods {
		activeContainers[string(pod.UID)] = make(map[string]struct{})
		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
			activeContainers[string(pod.UID)][container.Name] = struct{}{}
		}
	}

	// Loop through the CPUManager state. Remove any state for containers not
	// in the `activeContainers` list built above.
	assignments := m.state.GetCPUAssignments()
	for podUID := range assignments {
		for containerName := range assignments[podUID] {
			if _, ok := activeContainers[podUID][containerName]; !ok {
				klog.Errorf("[cpumanager] removeStaleState: removing (pod: %s, container: %s)", podUID, containerName)
				err := m.policyRemoveContainerByRef(podUID, containerName)
				if err != nil {
					klog.Errorf("[cpumanager] removeStaleState: failed to remove (pod: %s, container: %s, error: %v)", podUID, containerName, err)
				}
			}
		}
	}
}

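// reconcileState pushes the currently assigned (or default) cpuset of every
// running container in every active pod down to the container runtime. It
// returns the containers that were successfully updated alongside those that
// failed or were skipped.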
func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
	success = []reconciledContainer{}
	failure = []reconciledContainer{}

	m.removeStaleState()
	for _, pod := range m.activePods() {
		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
		if !ok {
			klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name)
			failure = append(failure, reconciledContainer{pod.Name, "", ""})
			continue
		}

		allContainers := pod.Spec.InitContainers
		allContainers = append(allContainers, pod.Spec.Containers...)
		for _, container := range allContainers {
			containerID, err := findContainerIDByName(&pstatus, container.Name)
			if err != nil {
				klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
				continue
			}

			cstatus, err := findContainerStatusByName(&pstatus, container.Name)
			if err != nil {
				klog.Warningf("[cpumanager] reconcileState: skipping container; container status not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
				continue
			}

			if cstatus.State.Waiting != nil ||
				(cstatus.State.Waiting == nil && cstatus.State.Running == nil && cstatus.State.Terminated == nil) {
				klog.Warningf("[cpumanager] reconcileState: skipping container; container still in the waiting state (pod: %s, container: %s)", pod.Name, container.Name)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
				continue
			}

			if cstatus.State.Terminated != nil {
				// Since the container is terminated, we know it is safe to
				// remove it without any reconciliation. Removing the container
				// will also remove it from the `containerMap` so that this
				// container will be skipped next time around the loop.
				_, _, err := m.containerMap.GetContainerRef(containerID)
				if err == nil {
					klog.Warningf("[cpumanager] reconcileState: skipping container; already terminated (pod: %s, container id: %s)", pod.Name, containerID)
					err := m.RemoveContainer(containerID)
					if err != nil {
						klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
						failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
					}
				}
				continue
			}

			// Once we make it here we know we have a running container.
			// Idempotently add it to the containerMap in case it is missing.
			// This can happen after a kubelet restart, for example.
			m.containerMap.Add(string(pod.UID), container.Name, containerID)

			cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
			if cset.IsEmpty() {
				// NOTE: This should not happen outside of tests.
				klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
				continue
			}

			klog.V(4).Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, container.Name, containerID, cset)
			err = m.updateContainerCPUSet(containerID, cset)
			if err != nil {
				klog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, container.Name, containerID, cset, err)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
				continue
			}
			success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
		}
	}
	return success, failure
}

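// findContainerIDByName returns the runtime container ID recorded in the pod
// status for the container with the given name.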
func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
	allStatuses := status.InitContainerStatuses
	allStatuses = append(allStatuses, status.ContainerStatuses...)
	for _, container := range allStatuses {
		if container.Name == name && container.ContainerID != "" {
			cid := &kubecontainer.ContainerID{}
			err := cid.ParseString(container.ContainerID)
			if err != nil {
				return "", err
			}
			return cid.ID, nil
		}
	}
	return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
}

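// findContainerStatusByName returns the status entry in the pod status for
// the container with the given name.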
func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.ContainerStatus, error) {
	for _, containerStatus := range append(status.InitContainerStatuses, status.ContainerStatuses...) {
		if containerStatus.Name == name {
			return &containerStatus, nil
		}
	}
	return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name)
}

func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error {
	// TODO: Consider adding a `ResourceConfigForContainer` helper in
	// helpers_linux.go similar to what exists for pods.
	// It would be better to pass the full container resources here instead of
	// this patch-like partial resources.
	return m.containerRuntime.UpdateContainerResources(
		containerID,
		&runtimeapi.LinuxContainerResources{
			CpusetCpus: cpus.String(),
		})
}