cpu_manager.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cpumanager
  14. import (
  15. "fmt"
  16. "math"
  17. "sync"
  18. "time"
  19. cadvisorapi "github.com/google/cadvisor/info/v1"
  20. "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. "k8s.io/klog"
  23. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
  24. "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
  25. "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
  26. "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
  27. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  28. "k8s.io/kubernetes/pkg/kubelet/status"
  29. )
  30. // ActivePodsFunc is a function that returns a list of pods to reconcile.
  31. type ActivePodsFunc func() []*v1.Pod
  32. type runtimeService interface {
  33. UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error
  34. }
  35. type policyName string
  36. // cpuManagerStateFileName is the file name where cpu manager stores its state
  37. const cpuManagerStateFileName = "cpu_manager_state"
  38. // Manager interface provides methods for Kubelet to manage pod cpus.
  39. type Manager interface {
  40. // Start is called during Kubelet initialization.
  41. Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService)
  42. // AddContainer is called between container create and container start
  43. // so that initial CPU affinity settings can be written through to the
  44. // container runtime before the first process begins to execute.
  45. AddContainer(p *v1.Pod, c *v1.Container, containerID string) error
  46. // RemoveContainer is called after Kubelet decides to kill or delete a
  47. // container. After this call, the CPU manager stops trying to reconcile
  48. // that container and any CPUs dedicated to the container are freed.
  49. RemoveContainer(containerID string) error
  50. // State returns a read-only interface to the internal CPU manager state.
  51. State() state.Reader
  52. }
  53. type manager struct {
  54. sync.Mutex
  55. policy Policy
  56. // reconcilePeriod is the duration between calls to reconcileState.
  57. reconcilePeriod time.Duration
  58. // state allows pluggable CPU assignment policies while sharing a common
  59. // representation of state for the system to inspect and reconcile.
  60. state state.State
  61. // containerRuntime is the container runtime service interface needed
  62. // to make UpdateContainerResources() calls against the containers.
  63. containerRuntime runtimeService
  64. // activePods is a method for listing active pods on the node
  65. // so all the containers can be updated in the reconciliation loop.
  66. activePods ActivePodsFunc
  67. // podStatusProvider provides a method for obtaining pod statuses
  68. // and the containerID of their containers
  69. podStatusProvider status.PodStatusProvider
  70. machineInfo *cadvisorapi.MachineInfo
  71. nodeAllocatableReservation v1.ResourceList
  72. }
  73. var _ Manager = &manager{}
  74. // NewManager creates new cpu manager based on provided policy
  75. func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string) (Manager, error) {
  76. var policy Policy
  77. switch policyName(cpuPolicyName) {
  78. case PolicyNone:
  79. policy = NewNonePolicy()
  80. case PolicyStatic:
  81. topo, err := topology.Discover(machineInfo)
  82. if err != nil {
  83. return nil, err
  84. }
  85. klog.Infof("[cpumanager] detected CPU topology: %v", topo)
  86. reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
  87. if !ok {
  88. // The static policy cannot initialize without this information.
  89. return nil, fmt.Errorf("[cpumanager] unable to determine reserved CPU resources for static policy")
  90. }
  91. if reservedCPUs.IsZero() {
  92. // The static policy requires this to be nonzero. Zero CPU reservation
  93. // would allow the shared pool to be completely exhausted. At that point
  94. // either we would violate our guarantee of exclusivity or need to evict
  95. // any pod that has at least one container that requires zero CPUs.
  96. // See the comments in policy_static.go for more details.
  97. return nil, fmt.Errorf("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
  98. }
  99. // Take the ceiling of the reservation, since fractional CPUs cannot be
  100. // exclusively allocated.
  101. reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
  102. numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
  103. policy = NewStaticPolicy(topo, numReservedCPUs)
  104. default:
  105. klog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone)
  106. policy = NewNonePolicy()
  107. }
  108. stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuManagerStateFileName, policy.Name())
  109. if err != nil {
  110. return nil, fmt.Errorf("could not initialize checkpoint manager: %v", err)
  111. }
  112. manager := &manager{
  113. policy: policy,
  114. reconcilePeriod: reconcilePeriod,
  115. state: stateImpl,
  116. machineInfo: machineInfo,
  117. nodeAllocatableReservation: nodeAllocatableReservation,
  118. }
  119. return manager, nil
  120. }
  121. func (m *manager) Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) {
  122. klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
  123. klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
  124. m.activePods = activePods
  125. m.podStatusProvider = podStatusProvider
  126. m.containerRuntime = containerRuntime
  127. m.policy.Start(m.state)
  128. if m.policy.Name() == string(PolicyNone) {
  129. return
  130. }
  131. go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
  132. }
  133. func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
  134. m.Lock()
  135. err := m.policy.AddContainer(m.state, p, c, containerID)
  136. if err != nil {
  137. klog.Errorf("[cpumanager] AddContainer error: %v", err)
  138. m.Unlock()
  139. return err
  140. }
  141. cpus := m.state.GetCPUSetOrDefault(containerID)
  142. m.Unlock()
  143. if !cpus.IsEmpty() {
  144. err = m.updateContainerCPUSet(containerID, cpus)
  145. if err != nil {
  146. klog.Errorf("[cpumanager] AddContainer error: %v", err)
  147. m.Lock()
  148. err := m.policy.RemoveContainer(m.state, containerID)
  149. if err != nil {
  150. klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
  151. }
  152. m.Unlock()
  153. }
  154. return err
  155. }
  156. klog.V(5).Infof("[cpumanager] update container resources is skipped due to cpu set is empty")
  157. return nil
  158. }
  159. func (m *manager) RemoveContainer(containerID string) error {
  160. m.Lock()
  161. defer m.Unlock()
  162. err := m.policy.RemoveContainer(m.state, containerID)
  163. if err != nil {
  164. klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
  165. return err
  166. }
  167. return nil
  168. }
  169. func (m *manager) State() state.Reader {
  170. return m.state
  171. }
  // reconciledContainer identifies a container visited by reconcileState.
  // Fields that could not be determined are left empty. The field order is
  // relied on by unkeyed composite literals.
  type reconciledContainer struct {
  	podName, containerName, containerID string
  }
  177. func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
  178. success = []reconciledContainer{}
  179. failure = []reconciledContainer{}
  180. activeContainers := make(map[string]*v1.Pod)
  181. for _, pod := range m.activePods() {
  182. allContainers := pod.Spec.InitContainers
  183. allContainers = append(allContainers, pod.Spec.Containers...)
  184. status, ok := m.podStatusProvider.GetPodStatus(pod.UID)
  185. for _, container := range allContainers {
  186. if !ok {
  187. klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name)
  188. failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
  189. break
  190. }
  191. containerID, err := findContainerIDByName(&status, container.Name)
  192. if err != nil {
  193. klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
  194. failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
  195. continue
  196. }
  197. // Check whether container is present in state, there may be 3 reasons why it's not present:
  198. // - policy does not want to track the container
  199. // - kubelet has just been restarted - and there is no previous state file
  200. // - container has been removed from state by RemoveContainer call (DeletionTimestamp is set)
  201. if _, ok := m.state.GetCPUSet(containerID); !ok {
  202. if status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil {
  203. klog.V(4).Infof("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
  204. err := m.AddContainer(pod, &container, containerID)
  205. if err != nil {
  206. klog.Errorf("[cpumanager] reconcileState: failed to add container (pod: %s, container: %s, container id: %s, error: %v)", pod.Name, container.Name, containerID, err)
  207. failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
  208. continue
  209. }
  210. } else {
  211. // if DeletionTimestamp is set, pod has already been removed from state
  212. // skip the pod/container since it's not running and will be deleted soon
  213. continue
  214. }
  215. }
  216. activeContainers[containerID] = pod
  217. cset := m.state.GetCPUSetOrDefault(containerID)
  218. if cset.IsEmpty() {
  219. // NOTE: This should not happen outside of tests.
  220. klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name)
  221. failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
  222. continue
  223. }
  224. klog.V(4).Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, container.Name, containerID, cset)
  225. err = m.updateContainerCPUSet(containerID, cset)
  226. if err != nil {
  227. klog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, container.Name, containerID, cset, err)
  228. failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
  229. continue
  230. }
  231. success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
  232. }
  233. }
  234. for containerID := range m.state.GetCPUAssignments() {
  235. if pod, ok := activeContainers[containerID]; !ok {
  236. err := m.RemoveContainer(containerID)
  237. if err != nil {
  238. klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
  239. failure = append(failure, reconciledContainer{pod.Name, "", containerID})
  240. }
  241. }
  242. }
  243. return success, failure
  244. }
  245. func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
  246. allStatuses := status.InitContainerStatuses
  247. allStatuses = append(allStatuses, status.ContainerStatuses...)
  248. for _, container := range allStatuses {
  249. if container.Name == name && container.ContainerID != "" {
  250. cid := &kubecontainer.ContainerID{}
  251. err := cid.ParseString(container.ContainerID)
  252. if err != nil {
  253. return "", err
  254. }
  255. return cid.ID, nil
  256. }
  257. }
  258. return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
  259. }
  260. func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error {
  261. // TODO: Consider adding a `ResourceConfigForContainer` helper in
  262. // helpers_linux.go similar to what exists for pods.
  263. // It would be better to pass the full container resources here instead of
  264. // this patch-like partial resources.
  265. return m.containerRuntime.UpdateContainerResources(
  266. containerID,
  267. &runtimeapi.LinuxContainerResources{
  268. CpusetCpus: cpus.String(),
  269. })
  270. }