cpu_manager.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager

import (
	"fmt"
	"math"
	"sync"
	"time"

	cadvisorapi "github.com/google/cadvisor/info/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	"k8s.io/kubernetes/pkg/kubelet/config"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/status"
)
// ActivePodsFunc is a function that returns a list of pods to reconcile.
type ActivePodsFunc func() []*v1.Pod
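
// runtimeService abstracts the single container runtime call the CPU manager
// needs: updating a running container's Linux resources (here, its cpuset).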
type runtimeService interface {
	UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error
}
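
// policyName identifies a CPU manager policy (see PolicyNone and PolicyStatic).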
type policyName string

// cpuManagerStateFileName is the file name where cpu manager stores its state
const cpuManagerStateFileName = "cpu_manager_state"
// Manager interface provides methods for Kubelet to manage pod cpus.
type Manager interface {
	// Start is called during Kubelet initialization.
	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error

	// AddContainer is called between container create and container start
	// so that initial CPU affinity settings can be written through to the
	// container runtime before the first process begins to execute.
	AddContainer(p *v1.Pod, c *v1.Container, containerID string) error

	// RemoveContainer is called after Kubelet decides to kill or delete a
	// container. After this call, the CPU manager stops trying to reconcile
	// that container and any CPUs dedicated to the container are freed.
	RemoveContainer(containerID string) error

	// State returns a read-only interface to the internal CPU manager state.
	State() state.Reader

	// GetTopologyHints implements the topologymanager.HintProvider Interface
	// and is consulted to achieve NUMA aware resource alignment among this
	// and other resource controllers.
	GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
}
type manager struct {
	sync.Mutex
	policy Policy

	// reconcilePeriod is the duration between calls to reconcileState.
	reconcilePeriod time.Duration

	// state allows pluggable CPU assignment policies while sharing a common
	// representation of state for the system to inspect and reconcile.
	state state.State

	// containerRuntime is the container runtime service interface needed
	// to make UpdateContainerResources() calls against the containers.
	containerRuntime runtimeService

	// activePods is a method for listing active pods on the node
	// so all the containers can be updated in the reconciliation loop.
	activePods ActivePodsFunc

	// podStatusProvider provides a method for obtaining pod statuses
	// and the containerID of their containers
	podStatusProvider status.PodStatusProvider
	// containerMap provides a mapping from (pod, container) -> containerID
	// for all containers in a pod.
	containerMap containermap.ContainerMap

	// topology holds the CPU topology detected on this node (populated only
	// for policies that need it, such as the static policy).
	topology *topology.CPUTopology

	// nodeAllocatableReservation holds the node allocatable reservation
	// (for example systemreserved.cpu + kubereserved.cpu) used to determine
	// how many CPUs the static policy reserves.
	nodeAllocatableReservation v1.ResourceList
	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
	// We use it to determine when we can purge inactive pods from checkpointed state.
	sourcesReady config.SourcesReady

	// stateFileDirectory holds the directory where the state file for checkpoints is held.
	stateFileDirectory string
}
var _ Manager = &manager{}
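
// sourcesReadyStub is a no-op config.SourcesReady implementation that always
// reports ready; it is used until Start() installs the real sourcesReady.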
type sourcesReadyStub struct{}

func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool          { return true }
// NewManager creates a new cpu manager based on the provided policy.
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, numaNodeInfo topology.NUMANodeInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
	var topo *topology.CPUTopology
	var policy Policy

	switch policyName(cpuPolicyName) {

	case PolicyNone:
		policy = NewNonePolicy()

	case PolicyStatic:
		var err error
		topo, err = topology.Discover(machineInfo, numaNodeInfo)
		if err != nil {
			return nil, err
		}
		klog.Infof("[cpumanager] detected CPU topology: %v", topo)

		reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
		if !ok {
			// The static policy cannot initialize without this information.
			return nil, fmt.Errorf("[cpumanager] unable to determine reserved CPU resources for static policy")
		}
		if reservedCPUs.IsZero() {
			// The static policy requires this to be nonzero. Zero CPU reservation
			// would allow the shared pool to be completely exhausted. At that point
			// either we would violate our guarantee of exclusivity or need to evict
			// any pod that has at least one container that requires zero CPUs.
			// See the comments in policy_static.go for more details.
			return nil, fmt.Errorf("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
		}

		// Take the ceiling of the reservation, since fractional CPUs cannot be
		// exclusively allocated.
		reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
		numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
		if err != nil {
			return nil, fmt.Errorf("new static policy error: %v", err)
		}

	default:
		return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
	}

	manager := &manager{
		policy:                     policy,
		reconcilePeriod:            reconcilePeriod,
		topology:                   topo,
		nodeAllocatableReservation: nodeAllocatableReservation,
		stateFileDirectory:         stateFileDirectory,
	}
	manager.sourcesReady = &sourcesReadyStub{}
	return manager, nil
}
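
// The sketch below is illustrative only and is not part of this file's API:
// it shows how a caller (for example the kubelet's container manager) might
// construct and start the CPU manager. The variables machineInfo, numaNodeInfo,
// reservation, stateDir, affinityStore, activePods, sourcesReady, statusProvider,
// runtime, and initialContainers are assumed to exist in the caller.
//
//	mgr, err := NewManager(string(PolicyStatic), 10*time.Second, machineInfo, numaNodeInfo,
//		cpuset.NewCPUSet(), reservation, stateDir, affinityStore)
//	if err != nil {
//		// handle error
//	}
//	if err := mgr.Start(activePods, sourcesReady, statusProvider, runtime, initialContainers); err != nil {
//		// handle error
//	}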
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
	klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
	klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
	m.sourcesReady = sourcesReady
	m.activePods = activePods
	m.podStatusProvider = podStatusProvider
	m.containerRuntime = containerRuntime
	m.containerMap = initialContainers

	stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
	if err != nil {
		klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v, please drain node and remove policy state file", err)
		return err
	}
	m.state = stateImpl

	err = m.policy.Start(m.state)
	if err != nil {
		klog.Errorf("[cpumanager] policy start error: %v", err)
		return err
	}

	if m.policy.Name() == string(PolicyNone) {
		return nil
	}
	// Periodically call m.reconcileState() to keep the CPU sets of all pods
	// in sync with the guaranteed CPUs handed out among them.
	go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
	return nil
}
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
	m.Lock()
	// Proactively remove CPUs from init containers that have already run.
	// They are guaranteed to have run to completion before any other
	// container is run.
	for _, initContainer := range p.Spec.InitContainers {
		if c.Name != initContainer.Name {
			err := m.policyRemoveContainerByRef(string(p.UID), initContainer.Name)
			if err != nil {
				klog.Warningf("[cpumanager] unable to remove init container (pod: %s, container: %s, error: %v)", string(p.UID), initContainer.Name, err)
			}
		}
	}

	// Call down into the policy to assign this container CPUs if required.
	err := m.policyAddContainer(p, c, containerID)
	if err != nil {
		klog.Errorf("[cpumanager] AddContainer error: %v", err)
		m.Unlock()
		return err
	}

	// Get the CPUs just assigned to the container (or fall back to the default
	// CPUSet if none were assigned).
	cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
	m.Unlock()

	if !cpus.IsEmpty() {
		err = m.updateContainerCPUSet(containerID, cpus)
		if err != nil {
			klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err)
			m.Lock()
			err := m.policyRemoveContainerByID(containerID)
			if err != nil {
				klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
			}
			m.Unlock()
		}
		return err
	}
	klog.V(5).Infof("[cpumanager] skipping container resource update; assigned cpuset is empty")
	return nil
}
func (m *manager) RemoveContainer(containerID string) error {
	m.Lock()
	defer m.Unlock()

	err := m.policyRemoveContainerByID(containerID)
	if err != nil {
		klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
		return err
	}
	return nil
}
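
// policyAddContainer asks the active policy to allocate CPUs for the container
// and, on success, records the (pod, container) -> containerID mapping.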
func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
	err := m.policy.AddContainer(m.state, p, c)
	if err == nil {
		m.containerMap.Add(string(p.UID), c.Name, containerID)
	}
	return err
}
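
// policyRemoveContainerByID releases any CPUs assigned to the container
// identified by containerID. Unknown container IDs are treated as a no-op.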
func (m *manager) policyRemoveContainerByID(containerID string) error {
	podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
	if err != nil {
		return nil
	}

	err = m.policy.RemoveContainer(m.state, podUID, containerName)
	if err == nil {
		m.containerMap.RemoveByContainerID(containerID)
	}
	return err
}
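
// policyRemoveContainerByRef releases any CPUs assigned to the container
// identified by its (podUID, containerName) pair and drops it from the map.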
func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
	err := m.policy.RemoveContainer(m.state, podUID, containerName)
	if err == nil {
		m.containerMap.RemoveByContainerRef(podUID, containerName)
	}
	return err
}
func (m *manager) State() state.Reader {
	return m.state
}
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
	// Garbage collect any stranded resources before providing TopologyHints
	m.removeStaleState()
	// Delegate to active policy
	return m.policy.GetTopologyHints(m.state, pod, container)
}
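
// reconciledContainer identifies a single container visited by reconcileState,
// reported back in its success or failure lists.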
type reconciledContainer struct {
	podName       string
	containerName string
	containerID   string
}
func (m *manager) removeStaleState() {
	// Only once all sources are ready do we attempt to remove any stale state.
	// This ensures that the call to `m.activePods()` below will succeed with
	// the actual active pods list.
	if !m.sourcesReady.AllReady() {
		return
	}
	// We grab the lock to ensure that no new containers will grab CPUs while
	// executing the code below. Without this lock, it's possible that we end up
	// removing state that is newly added by an asynchronous call to
	// AddContainer() during the execution of this code.
	m.Lock()
	defer m.Unlock()

	// Get the list of active pods.
	activePods := m.activePods()

	// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
	activeContainers := make(map[string]map[string]struct{})
	for _, pod := range activePods {
		activeContainers[string(pod.UID)] = make(map[string]struct{})
		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
			activeContainers[string(pod.UID)][container.Name] = struct{}{}
		}
	}

	// Loop through the CPUManager state. Remove any state for containers not
	// in the `activeContainers` list built above.
	assignments := m.state.GetCPUAssignments()
	for podUID := range assignments {
		for containerName := range assignments[podUID] {
			if _, ok := activeContainers[podUID][containerName]; !ok {
				klog.Errorf("[cpumanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName)
				err := m.policyRemoveContainerByRef(podUID, containerName)
				if err != nil {
					klog.Errorf("[cpumanager] removeStaleState: failed to remove (pod %s, container %s), error: %v", podUID, containerName, err)
				}
			}
		}
	}
}
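
// reconcileState runs one pass of the reconciliation loop: it garbage collects
// stale state, then re-applies the assigned (or default) cpuset to every
// running container in every active pod, returning the containers it updated
// successfully and the ones it could not.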
func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
	success = []reconciledContainer{}
	failure = []reconciledContainer{}

	m.removeStaleState()
	for _, pod := range m.activePods() {
		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
		if !ok {
			klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name)
			failure = append(failure, reconciledContainer{pod.Name, "", ""})
			continue
		}

		allContainers := pod.Spec.InitContainers
		allContainers = append(allContainers, pod.Spec.Containers...)
		for _, container := range allContainers {
			containerID, err := findContainerIDByName(&pstatus, container.Name)
			if err != nil {
				klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
				continue
			}

			cstatus, err := findContainerStatusByName(&pstatus, container.Name)
			if err != nil {
				klog.Warningf("[cpumanager] reconcileState: skipping container; container status not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
				continue
			}
			if cstatus.State.Waiting != nil ||
				(cstatus.State.Waiting == nil && cstatus.State.Running == nil && cstatus.State.Terminated == nil) {
				klog.Warningf("[cpumanager] reconcileState: skipping container; container still in the waiting state (pod: %s, container: %s)", pod.Name, container.Name)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
				continue
			}
			if cstatus.State.Terminated != nil {
				// Since the container is terminated, we know it is safe to
				// remove it without any reconciliation. Removing the container
				// will also remove it from the `containerMap` so that this
				// container will be skipped next time around the loop.
				_, _, err := m.containerMap.GetContainerRef(containerID)
				if err == nil {
					klog.Warningf("[cpumanager] reconcileState: skipping container; already terminated (pod: %s, container id: %s)", pod.Name, containerID)
					err := m.RemoveContainer(containerID)
					if err != nil {
						klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
						failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
					}
				}
				continue
			}
			// Once we make it here we know we have a running container.
			// Idempotently add it to the containerMap in case it is missing.
			// This can happen after a kubelet restart, for example.
			m.containerMap.Add(string(pod.UID), container.Name, containerID)
			cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
			if cset.IsEmpty() {
				// NOTE: This should not happen outside of tests.
				klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
				continue
			}

			klog.V(4).Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, container.Name, containerID, cset)
			err = m.updateContainerCPUSet(containerID, cset)
			if err != nil {
				klog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, container.Name, containerID, cset, err)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
				continue
			}
			success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
		}
	}
	return success, failure
}
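
// findContainerIDByName looks up the runtime container ID for the named
// container in the given pod status, stripping the runtime prefix.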
func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
	allStatuses := status.InitContainerStatuses
	allStatuses = append(allStatuses, status.ContainerStatuses...)
	for _, container := range allStatuses {
		if container.Name == name && container.ContainerID != "" {
			cid := &kubecontainer.ContainerID{}
			err := cid.ParseString(container.ContainerID)
			if err != nil {
				return "", err
			}
			return cid.ID, nil
		}
	}
	return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
}
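
// findContainerStatusByName returns the status entry for the named container
// from either the init or regular container statuses of the given pod status.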
func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.ContainerStatus, error) {
	for _, status := range append(status.InitContainerStatuses, status.ContainerStatuses...) {
		if status.Name == name {
			return &status, nil
		}
	}
	return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name)
}
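
// updateContainerCPUSet writes the given cpuset through to the container
// runtime via UpdateContainerResources.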
func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error {
	// TODO: Consider adding a `ResourceConfigForContainer` helper in
	// helpers_linux.go similar to what exists for pods.
	// It would be better to pass the full container resources here instead of
	// this patch-like partial resources.
	return m.containerRuntime.UpdateContainerResources(
		containerID,
		&runtimeapi.LinuxContainerResources{
			CpusetCpus: cpus.String(),
		})
}