123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cpumanager
- import (
- "fmt"
- "sort"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
- "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
- )
- type cpuAccumulator struct {
- topo *topology.CPUTopology
- details topology.CPUDetails
- numCPUsNeeded int
- result cpuset.CPUSet
- }
- func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator {
- return &cpuAccumulator{
- topo: topo,
- details: topo.CPUDetails.KeepOnly(availableCPUs),
- numCPUsNeeded: numCPUs,
- result: cpuset.NewCPUSet(),
- }
- }
- func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
- a.result = a.result.Union(cpus)
- a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result))
- a.numCPUsNeeded -= cpus.Size()
- }
- // Returns true if the supplied socket is fully available in `topoDetails`.
- func (a *cpuAccumulator) isSocketFree(socketID int) bool {
- return a.details.CPUsInSocket(socketID).Size() == a.topo.CPUsPerSocket()
- }
- // Returns true if the supplied core is fully available in `topoDetails`.
- func (a *cpuAccumulator) isCoreFree(coreID int) bool {
- return a.details.CPUsInCore(coreID).Size() == a.topo.CPUsPerCore()
- }
- // Returns free socket IDs as a slice sorted by:
- // - socket ID, ascending.
- func (a *cpuAccumulator) freeSockets() []int {
- return a.details.Sockets().Filter(a.isSocketFree).ToSlice()
- }
- // Returns core IDs as a slice sorted by:
- // - the number of whole available cores on the socket, ascending
- // - socket ID, ascending
- // - core ID, ascending
- func (a *cpuAccumulator) freeCores() []int {
- socketIDs := a.details.Sockets().ToSliceNoSort()
- sort.Slice(socketIDs,
- func(i, j int) bool {
- iCores := a.details.CoresInSocket(socketIDs[i]).Filter(a.isCoreFree)
- jCores := a.details.CoresInSocket(socketIDs[j]).Filter(a.isCoreFree)
- return iCores.Size() < jCores.Size() || socketIDs[i] < socketIDs[j]
- })
- coreIDs := []int{}
- for _, s := range socketIDs {
- coreIDs = append(coreIDs, a.details.CoresInSocket(s).Filter(a.isCoreFree).ToSlice()...)
- }
- return coreIDs
- }
- // Returns CPU IDs as a slice sorted by:
- // - socket affinity with result
- // - number of CPUs available on the same socket
- // - number of CPUs available on the same core
- // - socket ID.
- // - core ID.
- func (a *cpuAccumulator) freeCPUs() []int {
- result := []int{}
- cores := a.details.Cores().ToSlice()
- sort.Slice(
- cores,
- func(i, j int) bool {
- iCore := cores[i]
- jCore := cores[j]
- iCPUs := a.topo.CPUDetails.CPUsInCore(iCore).ToSlice()
- jCPUs := a.topo.CPUDetails.CPUsInCore(jCore).ToSlice()
- iSocket := a.topo.CPUDetails[iCPUs[0]].SocketID
- jSocket := a.topo.CPUDetails[jCPUs[0]].SocketID
- // Compute the number of CPUs in the result reside on the same socket
- // as each core.
- iSocketColoScore := a.topo.CPUDetails.CPUsInSocket(iSocket).Intersection(a.result).Size()
- jSocketColoScore := a.topo.CPUDetails.CPUsInSocket(jSocket).Intersection(a.result).Size()
- // Compute the number of available CPUs available on the same socket
- // as each core.
- iSocketFreeScore := a.details.CPUsInSocket(iSocket).Size()
- jSocketFreeScore := a.details.CPUsInSocket(jSocket).Size()
- // Compute the number of available CPUs on each core.
- iCoreFreeScore := a.details.CPUsInCore(iCore).Size()
- jCoreFreeScore := a.details.CPUsInCore(jCore).Size()
- return iSocketColoScore > jSocketColoScore ||
- iSocketFreeScore < jSocketFreeScore ||
- iCoreFreeScore < jCoreFreeScore ||
- iSocket < jSocket ||
- iCore < jCore
- })
- // For each core, append sorted CPU IDs to result.
- for _, core := range cores {
- result = append(result, a.details.CPUsInCore(core).ToSlice()...)
- }
- return result
- }
- func (a *cpuAccumulator) needs(n int) bool {
- return a.numCPUsNeeded >= n
- }
- func (a *cpuAccumulator) isSatisfied() bool {
- return a.numCPUsNeeded < 1
- }
- func (a *cpuAccumulator) isFailed() bool {
- return a.numCPUsNeeded > a.details.CPUs().Size()
- }
- func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
- acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
- if acc.isSatisfied() {
- return acc.result, nil
- }
- if acc.isFailed() {
- return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request")
- }
- // Algorithm: topology-aware best-fit
- // 1. Acquire whole sockets, if available and the container requires at
- // least a socket's-worth of CPUs.
- if acc.needs(acc.topo.CPUsPerSocket()) {
- for _, s := range acc.freeSockets() {
- klog.V(4).Infof("[cpumanager] takeByTopology: claiming socket [%d]", s)
- acc.take(acc.details.CPUsInSocket(s))
- if acc.isSatisfied() {
- return acc.result, nil
- }
- if !acc.needs(acc.topo.CPUsPerSocket()) {
- break
- }
- }
- }
- // 2. Acquire whole cores, if available and the container requires at least
- // a core's-worth of CPUs.
- if acc.needs(acc.topo.CPUsPerCore()) {
- for _, c := range acc.freeCores() {
- klog.V(4).Infof("[cpumanager] takeByTopology: claiming core [%d]", c)
- acc.take(acc.details.CPUsInCore(c))
- if acc.isSatisfied() {
- return acc.result, nil
- }
- if !acc.needs(acc.topo.CPUsPerCore()) {
- break
- }
- }
- }
- // 3. Acquire single threads, preferring to fill partially-allocated cores
- // on the same sockets as the whole cores we have already taken in this
- // allocation.
- for _, c := range acc.freeCPUs() {
- klog.V(4).Infof("[cpumanager] takeByTopology: claiming CPU [%d]", c)
- if acc.needs(1) {
- acc.take(cpuset.NewCPUSet(c))
- }
- if acc.isSatisfied() {
- return acc.result, nil
- }
- }
- return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
- }
|