policy_static.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cpumanager
  14. import (
  15. "fmt"
  16. v1 "k8s.io/api/core/v1"
  17. "k8s.io/klog"
  18. v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
  19. "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
  20. "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
  21. "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
  22. "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
  23. "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
  24. )
// PolicyStatic is the name of the static policy. It is the value reported
// by staticPolicy.Name.
const PolicyStatic policyName = "static"
// staticPolicy is a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
//
// This policy allocates CPUs exclusively for a container if all the following
// conditions are met:
//
// - The pod QoS class is Guaranteed.
// - The CPU request is a positive integer.
//
// The static policy maintains the following sets of logical CPUs:
//
// - SHARED: Burstable, BestEffort, and non-integral Guaranteed containers
// run here. Initially this contains all CPU IDs on the system. As
// exclusive allocations are created and destroyed, this CPU set shrinks
// and grows, accordingly. This is stored in the state as the default
// CPU set.
//
// - RESERVED: A subset of the shared pool which is not exclusively
// allocatable. The membership of this pool is static for the lifetime of
// the Kubelet. The size of the reserved pool is
// ceil(systemreserved.cpu + kubereserved.cpu).
// Reserved CPUs are taken topologically starting with lowest-indexed
// physical core, as reported by cAdvisor.
//
// - ASSIGNABLE: Equal to SHARED - RESERVED. Exclusive CPUs are allocated
// from this pool.
//
// - EXCLUSIVE ALLOCATIONS: CPU sets assigned exclusively to one container.
// These are stored as explicit assignments in the state.
//
// When an exclusive allocation is made, the static policy also updates the
// default cpuset in the state abstraction. The CPU manager's periodic
// reconcile loop takes care of rewriting the cpuset in cgroupfs for any
// containers that may be running in the shared pool. For this reason,
// applications running within exclusively-allocated containers must tolerate
// potentially sharing their allocated CPUs for up to the CPU manager
// reconcile period.
type staticPolicy struct {
	// topology describes the CPUs of the machine. It is used to enumerate
	// all CPUs and to look up the NUMA node and socket a CPU belongs to.
	topology *topology.CPUTopology
	// reserved is the set of CPUs that is not available for exclusive
	// assignment (the RESERVED pool described above).
	reserved cpuset.CPUSet
	// affinity is the topology manager store that is queried for a
	// container's topology affinity hint when CPUs are allocated.
	affinity topologymanager.Store
}
// Compile-time check that *staticPolicy implements the Policy interface.
var _ Policy = &staticPolicy{}
  75. // NewStaticPolicy returns a CPU manager policy that does not change CPU
  76. // assignments for exclusively pinned guaranteed containers after the main
  77. // container process starts.
  78. func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) (Policy, error) {
  79. allCPUs := topology.CPUDetails.CPUs()
  80. var reserved cpuset.CPUSet
  81. if reservedCPUs.Size() > 0 {
  82. reserved = reservedCPUs
  83. } else {
  84. // takeByTopology allocates CPUs associated with low-numbered cores from
  85. // allCPUs.
  86. //
  87. // For example: Given a system with 8 CPUs available and HT enabled,
  88. // if numReservedCPUs=2, then reserved={0,4}
  89. reserved, _ = takeByTopology(topology, allCPUs, numReservedCPUs)
  90. }
  91. if reserved.Size() != numReservedCPUs {
  92. err := fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs)
  93. return nil, err
  94. }
  95. klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
  96. return &staticPolicy{
  97. topology: topology,
  98. reserved: reserved,
  99. affinity: affinity,
  100. }, nil
  101. }
// Name returns the name of this policy, i.e. PolicyStatic as a string.
func (p *staticPolicy) Name() string {
	return string(PolicyStatic)
}
  105. func (p *staticPolicy) Start(s state.State) error {
  106. if err := p.validateState(s); err != nil {
  107. klog.Errorf("[cpumanager] static policy invalid state: %v, please drain node and remove policy state file", err)
  108. return err
  109. }
  110. return nil
  111. }
  112. func (p *staticPolicy) validateState(s state.State) error {
  113. tmpAssignments := s.GetCPUAssignments()
  114. tmpDefaultCPUset := s.GetDefaultCPUSet()
  115. // Default cpuset cannot be empty when assignments exist
  116. if tmpDefaultCPUset.IsEmpty() {
  117. if len(tmpAssignments) != 0 {
  118. return fmt.Errorf("default cpuset cannot be empty")
  119. }
  120. // state is empty initialize
  121. allCPUs := p.topology.CPUDetails.CPUs()
  122. s.SetDefaultCPUSet(allCPUs)
  123. return nil
  124. }
  125. // State has already been initialized from file (is not empty)
  126. // 1. Check if the reserved cpuset is not part of default cpuset because:
  127. // - kube/system reserved have changed (increased) - may lead to some containers not being able to start
  128. // - user tampered with file
  129. if !p.reserved.Intersection(tmpDefaultCPUset).Equals(p.reserved) {
  130. return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
  131. p.reserved.String(), tmpDefaultCPUset.String())
  132. }
  133. // 2. Check if state for static policy is consistent
  134. for pod := range tmpAssignments {
  135. for container, cset := range tmpAssignments[pod] {
  136. // None of the cpu in DEFAULT cset should be in s.assignments
  137. if !tmpDefaultCPUset.Intersection(cset).IsEmpty() {
  138. return fmt.Errorf("pod: %s, container: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"",
  139. pod, container, cset.String(), tmpDefaultCPUset.String())
  140. }
  141. }
  142. }
  143. // 3. It's possible that the set of available CPUs has changed since
  144. // the state was written. This can be due to for example
  145. // offlining a CPU when kubelet is not running. If this happens,
  146. // CPU manager will run into trouble when later it tries to
  147. // assign non-existent CPUs to containers. Validate that the
  148. // topology that was received during CPU manager startup matches with
  149. // the set of CPUs stored in the state.
  150. totalKnownCPUs := tmpDefaultCPUset.Clone()
  151. tmpCPUSets := []cpuset.CPUSet{}
  152. for pod := range tmpAssignments {
  153. for _, cset := range tmpAssignments[pod] {
  154. tmpCPUSets = append(tmpCPUSets, cset)
  155. }
  156. }
  157. totalKnownCPUs = totalKnownCPUs.UnionAll(tmpCPUSets)
  158. if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) {
  159. return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
  160. p.topology.CPUDetails.CPUs().String(), totalKnownCPUs.String())
  161. }
  162. return nil
  163. }
  164. // assignableCPUs returns the set of unassigned CPUs minus the reserved set.
  165. func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
  166. return s.GetDefaultCPUSet().Difference(p.reserved)
  167. }
  168. func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
  169. if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
  170. klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s)", pod.Name, container.Name)
  171. // container belongs in an exclusively allocated pool
  172. if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
  173. klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
  174. return nil
  175. }
  176. // Call Topology Manager to get the aligned socket affinity across all hint providers.
  177. hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
  178. klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
  179. // Allocate CPUs according to the NUMA affinity contained in the hint.
  180. cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity)
  181. if err != nil {
  182. klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err)
  183. return err
  184. }
  185. s.SetCPUSet(string(pod.UID), container.Name, cpuset)
  186. }
  187. // container belongs in the shared pool (nothing to do; use default cpuset)
  188. return nil
  189. }
  190. func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
  191. klog.Infof("[cpumanager] static policy: RemoveContainer (pod: %s, container: %s)", podUID, containerName)
  192. if toRelease, ok := s.GetCPUSet(podUID, containerName); ok {
  193. s.Delete(podUID, containerName)
  194. // Mutate the shared pool, adding released cpus.
  195. s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
  196. }
  197. return nil
  198. }
  199. func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask) (cpuset.CPUSet, error) {
  200. klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)
  201. // If there are aligned CPUs in numaAffinity, attempt to take those first.
  202. result := cpuset.NewCPUSet()
  203. if numaAffinity != nil {
  204. alignedCPUs := cpuset.NewCPUSet()
  205. for _, numaNodeID := range numaAffinity.GetBits() {
  206. alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
  207. }
  208. numAlignedToAlloc := alignedCPUs.Size()
  209. if numCPUs < numAlignedToAlloc {
  210. numAlignedToAlloc = numCPUs
  211. }
  212. alignedCPUs, err := takeByTopology(p.topology, alignedCPUs, numAlignedToAlloc)
  213. if err != nil {
  214. return cpuset.NewCPUSet(), err
  215. }
  216. result = result.Union(alignedCPUs)
  217. }
  218. // Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
  219. remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size())
  220. if err != nil {
  221. return cpuset.NewCPUSet(), err
  222. }
  223. result = result.Union(remainingCPUs)
  224. // Remove allocated CPUs from the shared CPUSet.
  225. s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))
  226. klog.Infof("[cpumanager] allocateCPUs: returning \"%v\"", result)
  227. return result, nil
  228. }
  229. func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
  230. if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
  231. return 0
  232. }
  233. cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
  234. if cpuQuantity.Value()*1000 != cpuQuantity.MilliValue() {
  235. return 0
  236. }
  237. // Safe downcast to do for all systems with < 2.1 billion CPUs.
  238. // Per the language spec, `int` is guaranteed to be at least 32 bits wide.
  239. // https://golang.org/ref/spec#Numeric_types
  240. return int(cpuQuantity.Value())
  241. }
  242. func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
  243. // If there are no CPU resources requested for this container, we do not
  244. // generate any topology hints.
  245. if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
  246. return nil
  247. }
  248. // Get a count of how many guaranteed CPUs have been requested.
  249. requested := p.guaranteedCPUs(pod, container)
  250. // If there are no guaranteed CPUs being requested, we do not generate
  251. // any topology hints. This can happen, for example, because init
  252. // containers don't have to have guaranteed CPUs in order for the pod
  253. // to still be in the Guaranteed QOS tier.
  254. if requested == 0 {
  255. return nil
  256. }
  257. // Short circuit to regenerate the same hints if there are already
  258. // guaranteed CPUs allocated to the Container. This might happen after a
  259. // kubelet restart, for example.
  260. if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
  261. if allocated.Size() != requested {
  262. klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", string(pod.UID), container.Name, requested, allocated.Size())
  263. return map[string][]topologymanager.TopologyHint{
  264. string(v1.ResourceCPU): {},
  265. }
  266. }
  267. klog.Infof("[cpumanager] Regenerating TopologyHints for CPUs already allocated to (pod %v, container %v)", string(pod.UID), container.Name)
  268. return map[string][]topologymanager.TopologyHint{
  269. string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, requested),
  270. }
  271. }
  272. // Get a list of available CPUs.
  273. available := p.assignableCPUs(s)
  274. // Generate hints.
  275. cpuHints := p.generateCPUTopologyHints(available, requested)
  276. klog.Infof("[cpumanager] TopologyHints generated for pod '%v', container '%v': %v", pod.Name, container.Name, cpuHints)
  277. return map[string][]topologymanager.TopologyHint{
  278. string(v1.ResourceCPU): cpuHints,
  279. }
  280. }
  281. // generateCPUtopologyHints generates a set of TopologyHints given the set of
  282. // available CPUs and the number of CPUs being requested.
  283. //
  284. // It follows the convention of marking all hints that have the same number of
  285. // bits set as the narrowest matching NUMANodeAffinity with 'Preferred: true', and
  286. // marking all others with 'Preferred: false'.
  287. func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, request int) []topologymanager.TopologyHint {
  288. // Initialize minAffinitySize to include all NUMA Nodes.
  289. minAffinitySize := p.topology.CPUDetails.NUMANodes().Size()
  290. // Initialize minSocketsOnMinAffinity to include all Sockets.
  291. minSocketsOnMinAffinity := p.topology.CPUDetails.Sockets().Size()
  292. // Iterate through all combinations of socket bitmask and build hints from them.
  293. hints := []topologymanager.TopologyHint{}
  294. bitmask.IterateBitMasks(p.topology.CPUDetails.NUMANodes().ToSlice(), func(mask bitmask.BitMask) {
  295. // First, update minAffinitySize and minSocketsOnMinAffinity for the
  296. // current request size.
  297. cpusInMask := p.topology.CPUDetails.CPUsInNUMANodes(mask.GetBits()...).Size()
  298. socketsInMask := p.topology.CPUDetails.SocketsInNUMANodes(mask.GetBits()...).Size()
  299. if cpusInMask >= request && mask.Count() < minAffinitySize {
  300. minAffinitySize = mask.Count()
  301. if socketsInMask < minSocketsOnMinAffinity {
  302. minSocketsOnMinAffinity = socketsInMask
  303. }
  304. }
  305. // Then check to see if we have enough CPUs available on the current
  306. // socket bitmask to satisfy the CPU request.
  307. numMatching := 0
  308. for _, c := range availableCPUs.ToSlice() {
  309. if mask.IsSet(p.topology.CPUDetails[c].NUMANodeID) {
  310. numMatching++
  311. }
  312. }
  313. // If we don't, then move onto the next combination.
  314. if numMatching < request {
  315. return
  316. }
  317. // Otherwise, create a new hint from the socket bitmask and add it to the
  318. // list of hints. We set all hint preferences to 'false' on the first
  319. // pass through.
  320. hints = append(hints, topologymanager.TopologyHint{
  321. NUMANodeAffinity: mask,
  322. Preferred: false,
  323. })
  324. })
  325. // Loop back through all hints and update the 'Preferred' field based on
  326. // counting the number of bits sets in the affinity mask and comparing it
  327. // to the minAffinitySize. Only those with an equal number of bits set (and
  328. // with a minimal set of sockets) will be considered preferred.
  329. for i := range hints {
  330. if hints[i].NUMANodeAffinity.Count() == minAffinitySize {
  331. nodes := hints[i].NUMANodeAffinity.GetBits()
  332. numSockets := p.topology.CPUDetails.SocketsInNUMANodes(nodes...).Size()
  333. if numSockets == minSocketsOnMinAffinity {
  334. hints[i].Preferred = true
  335. }
  336. }
  337. }
  338. return hints
  339. }