cpu_assignment.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cpumanager

import (
	"fmt"
	"sort"

	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)
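
// cpuAccumulator tracks an in-progress allocation: the topology being
// allocated against, the CPUs still assignable, the number of CPUs still
// required, and the CPUs claimed so far.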
type cpuAccumulator struct {
	topo          *topology.CPUTopology
	details       topology.CPUDetails
	numCPUsNeeded int
	result        cpuset.CPUSet
}
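
// newCPUAccumulator returns an accumulator over the given topology, seeded
// with the assignable CPUs and the number of CPUs requested.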
func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator {
	return &cpuAccumulator{
		topo:          topo,
		details:       topo.CPUDetails.KeepOnly(availableCPUs),
		numCPUsNeeded: numCPUs,
		result:        cpuset.NewCPUSet(),
	}
}
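
// take claims the given CPUs: they are added to the result, removed from the
// pool of assignable CPUs, and deducted from the outstanding request.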
func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
	a.result = a.result.Union(cpus)
	a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
	a.numCPUsNeeded -= cpus.Size()
}

// Returns true if the supplied socket is fully available in `a.details`.
func (a *cpuAccumulator) isSocketFree(socketID int) bool {
	return a.details.CPUsInSocket(socketID).Size() == a.topo.CPUsPerSocket()
}

// Returns true if the supplied core is fully available in `a.details`.
func (a *cpuAccumulator) isCoreFree(coreID int) bool {
	return a.details.CPUsInCore(coreID).Size() == a.topo.CPUsPerCore()
}

// Returns free socket IDs as a slice sorted by:
// - socket ID, ascending.
func (a *cpuAccumulator) freeSockets() []int {
	return a.details.Sockets().Filter(a.isSocketFree).ToSlice()
}

// Returns free core IDs as a slice sorted by:
// - the number of whole available cores on the socket, ascending
// - socket ID, ascending
// - core ID, ascending
func (a *cpuAccumulator) freeCores() []int {
	socketIDs := a.details.Sockets().ToSliceNoSort()
	sort.Slice(socketIDs,
		func(i, j int) bool {
			iCores := a.details.CoresInSocket(socketIDs[i]).Filter(a.isCoreFree)
			jCores := a.details.CoresInSocket(socketIDs[j]).Filter(a.isCoreFree)
			// Compare free-core counts first and fall back to socket ID
			// only on ties, so the comparator is a strict weak ordering.
			if iCores.Size() != jCores.Size() {
				return iCores.Size() < jCores.Size()
			}
			return socketIDs[i] < socketIDs[j]
		})

	coreIDs := []int{}
	for _, s := range socketIDs {
		coreIDs = append(coreIDs, a.details.CoresInSocket(s).Filter(a.isCoreFree).ToSlice()...)
	}
	return coreIDs
}
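
// For illustration (hypothetical values): if socket 0 has one whole free
// core (core 1) and socket 1 has two (cores 2 and 3), freeCores returns
// [1, 2, 3]; socket 0 sorts first on its lower free-core count.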

// Returns free CPU IDs as a slice sorted by:
// - socket affinity with result, descending
// - number of CPUs available on the same socket, ascending
// - number of CPUs available on the same core, ascending
// - socket ID, ascending
// - core ID, ascending
func (a *cpuAccumulator) freeCPUs() []int {
	result := []int{}
	cores := a.details.Cores().ToSlice()
	sort.Slice(
		cores,
		func(i, j int) bool {
			iCore := cores[i]
			jCore := cores[j]

			iCPUs := a.topo.CPUDetails.CPUsInCore(iCore).ToSlice()
			jCPUs := a.topo.CPUDetails.CPUsInCore(jCore).ToSlice()

			iSocket := a.topo.CPUDetails[iCPUs[0]].SocketID
			jSocket := a.topo.CPUDetails[jCPUs[0]].SocketID

			// Compute the number of CPUs in the result that reside on the
			// same socket as each core.
			iSocketColoScore := a.topo.CPUDetails.CPUsInSocket(iSocket).Intersection(a.result).Size()
			jSocketColoScore := a.topo.CPUDetails.CPUsInSocket(jSocket).Intersection(a.result).Size()

			// Compute the number of available CPUs on the same socket as
			// each core.
			iSocketFreeScore := a.details.CPUsInSocket(iSocket).Size()
			jSocketFreeScore := a.details.CPUsInSocket(jSocket).Size()

			// Compute the number of available CPUs on each core.
			iCoreFreeScore := a.details.CPUsInCore(iCore).Size()
			jCoreFreeScore := a.details.CPUsInCore(jCore).Size()

			// Compare criteria in priority order, falling through to the
			// next criterion only on ties, so the comparator is a strict
			// weak ordering.
			if iSocketColoScore != jSocketColoScore {
				return iSocketColoScore > jSocketColoScore
			}
			if iSocketFreeScore != jSocketFreeScore {
				return iSocketFreeScore < jSocketFreeScore
			}
			if iCoreFreeScore != jCoreFreeScore {
				return iCoreFreeScore < jCoreFreeScore
			}
			if iSocket != jSocket {
				return iSocket < jSocket
			}
			return iCore < jCore
		})

	// For each core, append sorted CPU IDs to result.
	for _, core := range cores {
		result = append(result, a.details.CPUsInCore(core).ToSlice()...)
	}
	return result
}
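
// needs reports whether at least n more CPUs are still required.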
func (a *cpuAccumulator) needs(n int) bool {
	return a.numCPUsNeeded >= n
}
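
// isSatisfied reports whether the request has been fully met.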
func (a *cpuAccumulator) isSatisfied() bool {
	return a.numCPUsNeeded < 1
}
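
// isFailed reports whether the request can no longer be met from the
// remaining assignable CPUs.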
func (a *cpuAccumulator) isFailed() bool {
	return a.numCPUsNeeded > a.details.CPUs().Size()
}
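
// takeByTopology selects numCPUs from availableCPUs using a topology-aware
// best-fit: whole sockets first, then whole cores, then individual threads,
// preferring threads whose socket has already contributed CPUs to the result.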
func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
	acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
	if acc.isSatisfied() {
		return acc.result, nil
	}
	if acc.isFailed() {
		return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request")
	}

	// Algorithm: topology-aware best-fit
	// 1. Acquire whole sockets, if available and the container requires at
	//    least a socket's-worth of CPUs.
	if acc.needs(acc.topo.CPUsPerSocket()) {
		for _, s := range acc.freeSockets() {
			klog.V(4).Infof("[cpumanager] takeByTopology: claiming socket [%d]", s)
			acc.take(acc.details.CPUsInSocket(s))
			if acc.isSatisfied() {
				return acc.result, nil
			}
			if !acc.needs(acc.topo.CPUsPerSocket()) {
				break
			}
		}
	}

	// 2. Acquire whole cores, if available and the container requires at
	//    least a core's-worth of CPUs.
	if acc.needs(acc.topo.CPUsPerCore()) {
		for _, c := range acc.freeCores() {
			klog.V(4).Infof("[cpumanager] takeByTopology: claiming core [%d]", c)
			acc.take(acc.details.CPUsInCore(c))
			if acc.isSatisfied() {
				return acc.result, nil
			}
			if !acc.needs(acc.topo.CPUsPerCore()) {
				break
			}
		}
	}

	// 3. Acquire single threads, preferring to fill partially-allocated cores
	//    on the same sockets as the whole cores we have already taken in this
	//    allocation.
	for _, c := range acc.freeCPUs() {
		klog.V(4).Infof("[cpumanager] takeByTopology: claiming CPU [%d]", c)
		if acc.needs(1) {
			acc.take(cpuset.NewCPUSet(c))
		}
		if acc.isSatisfied() {
			return acc.result, nil
		}
	}

	return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
}
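
// The sketch below is a hypothetical usage example, not part of the
// allocator. It assumes the CPUTopology/CPUInfo field names exposed by the
// topology package at this vintage (NumCPUs, NumSockets, NumCores,
// CPUDetails, SocketID, CoreID) and walks takeByTopology through a
// dual-socket, hyper-threaded machine.
func exampleTakeByTopology() {
	// 8 logical CPUs: 2 sockets x 2 cores per socket x 2 threads per core.
	topo := &topology.CPUTopology{
		NumCPUs:    8,
		NumSockets: 2,
		NumCores:   4,
		CPUDetails: topology.CPUDetails{
			0: {CoreID: 0, SocketID: 0},
			4: {CoreID: 0, SocketID: 0},
			1: {CoreID: 1, SocketID: 0},
			5: {CoreID: 1, SocketID: 0},
			2: {CoreID: 2, SocketID: 1},
			6: {CoreID: 2, SocketID: 1},
			3: {CoreID: 3, SocketID: 1},
			7: {CoreID: 3, SocketID: 1},
		},
	}

	// Requesting 6 CPUs: step 1 claims all of socket 0 (CPUs 0, 1, 4, 5),
	// step 2 claims one whole core on socket 1 (CPUs 2, 6), and step 3 is
	// never reached.
	result, err := takeByTopology(topo, cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), 6)
	fmt.Println(result, err)
}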