manager.go 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package devicemanager
  14. import (
  15. "context"
  16. "fmt"
  17. "net"
  18. "os"
  19. "path/filepath"
  20. "sort"
  21. "sync"
  22. "time"
  23. "google.golang.org/grpc"
  24. "k8s.io/klog"
  25. v1 "k8s.io/api/core/v1"
  26. "k8s.io/apimachinery/pkg/api/resource"
  27. errorsutil "k8s.io/apimachinery/pkg/util/errors"
  28. "k8s.io/apimachinery/pkg/util/sets"
  29. utilfeature "k8s.io/apiserver/pkg/util/feature"
  30. pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
  31. v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  32. "k8s.io/kubernetes/pkg/features"
  33. podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
  34. "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
  35. "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
  36. cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
  37. "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
  38. "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
  39. "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
  40. "k8s.io/kubernetes/pkg/kubelet/config"
  41. "k8s.io/kubernetes/pkg/kubelet/lifecycle"
  42. "k8s.io/kubernetes/pkg/kubelet/metrics"
  43. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  44. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  45. "k8s.io/kubernetes/pkg/util/selinux"
  46. )
  47. // ActivePodsFunc is a function that returns a list of pods to reconcile.
  48. type ActivePodsFunc func() []*v1.Pod
  49. // monitorCallback is the function called when a device's health state changes,
  50. // or new devices are reported, or old devices are deleted.
  51. // Updated contains the most recent state of the Device.
  52. type monitorCallback func(resourceName string, devices []pluginapi.Device)
  53. // ManagerImpl is the structure in charge of managing Device Plugins.
  54. type ManagerImpl struct {
  55. socketname string
  56. socketdir string
  57. endpoints map[string]endpointInfo // Key is ResourceName
  58. mutex sync.Mutex
  59. server *grpc.Server
  60. wg sync.WaitGroup
  61. // activePods is a method for listing active pods on the node
  62. // so the amount of pluginResources requested by existing pods
  63. // could be counted when updating allocated devices
  64. activePods ActivePodsFunc
  65. // sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
  66. // We use it to determine when we can purge inactive pods from checkpointed state.
  67. sourcesReady config.SourcesReady
  68. // callback is used for updating devices' states in one time call.
  69. // e.g. a new device is advertised, two old devices are deleted and a running device fails.
  70. callback monitorCallback
  71. // allDevices is a map by resource name of all the devices currently registered to the device manager
  72. allDevices map[string]map[string]pluginapi.Device
  73. // healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
  74. healthyDevices map[string]sets.String
  75. // unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
  76. unhealthyDevices map[string]sets.String
  77. // allocatedDevices contains allocated deviceIds, keyed by resourceName.
  78. allocatedDevices map[string]sets.String
  79. // podDevices contains pod to allocated device mapping.
  80. podDevices podDevices
  81. checkpointManager checkpointmanager.CheckpointManager
  82. // List of NUMA Nodes available on the underlying machine
  83. numaNodes []int
  84. // Store of Topology Affinties that the Device Manager can query.
  85. topologyAffinityStore topologymanager.Store
  86. }
  87. type endpointInfo struct {
  88. e endpoint
  89. opts *pluginapi.DevicePluginOptions
  90. }
  91. type sourcesReadyStub struct{}
  92. func (s *sourcesReadyStub) AddSource(source string) {}
  93. func (s *sourcesReadyStub) AllReady() bool { return true }
  94. // NewManagerImpl creates a new manager.
  95. func NewManagerImpl(numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
  96. return newManagerImpl(pluginapi.KubeletSocket, numaNodeInfo, topologyAffinityStore)
  97. }
  98. func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
  99. klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
  100. if socketPath == "" || !filepath.IsAbs(socketPath) {
  101. return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
  102. }
  103. var numaNodes []int
  104. for node := range numaNodeInfo {
  105. numaNodes = append(numaNodes, node)
  106. }
  107. dir, file := filepath.Split(socketPath)
  108. manager := &ManagerImpl{
  109. endpoints: make(map[string]endpointInfo),
  110. socketname: file,
  111. socketdir: dir,
  112. allDevices: make(map[string]map[string]pluginapi.Device),
  113. healthyDevices: make(map[string]sets.String),
  114. unhealthyDevices: make(map[string]sets.String),
  115. allocatedDevices: make(map[string]sets.String),
  116. podDevices: make(podDevices),
  117. numaNodes: numaNodes,
  118. topologyAffinityStore: topologyAffinityStore,
  119. }
  120. manager.callback = manager.genericDeviceUpdateCallback
  121. // The following structures are populated with real implementations in manager.Start()
  122. // Before that, initializes them to perform no-op operations.
  123. manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
  124. manager.sourcesReady = &sourcesReadyStub{}
  125. checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
  126. if err != nil {
  127. return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
  128. }
  129. manager.checkpointManager = checkpointManager
  130. return manager, nil
  131. }
  132. func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
  133. m.mutex.Lock()
  134. m.healthyDevices[resourceName] = sets.NewString()
  135. m.unhealthyDevices[resourceName] = sets.NewString()
  136. m.allDevices[resourceName] = make(map[string]pluginapi.Device)
  137. for _, dev := range devices {
  138. m.allDevices[resourceName][dev.ID] = dev
  139. if dev.Health == pluginapi.Healthy {
  140. m.healthyDevices[resourceName].Insert(dev.ID)
  141. } else {
  142. m.unhealthyDevices[resourceName].Insert(dev.ID)
  143. }
  144. }
  145. m.mutex.Unlock()
  146. if err := m.writeCheckpoint(); err != nil {
  147. klog.Errorf("writing checkpoint encountered %v", err)
  148. }
  149. }
  150. func (m *ManagerImpl) removeContents(dir string) error {
  151. d, err := os.Open(dir)
  152. if err != nil {
  153. return err
  154. }
  155. defer d.Close()
  156. names, err := d.Readdirnames(-1)
  157. if err != nil {
  158. return err
  159. }
  160. var errs []error
  161. for _, name := range names {
  162. filePath := filepath.Join(dir, name)
  163. if filePath == m.checkpointFile() {
  164. continue
  165. }
  166. stat, err := os.Stat(filePath)
  167. if err != nil {
  168. klog.Errorf("Failed to stat file %s: %v", filePath, err)
  169. continue
  170. }
  171. if stat.IsDir() {
  172. continue
  173. }
  174. err = os.RemoveAll(filePath)
  175. if err != nil {
  176. errs = append(errs, err)
  177. klog.Errorf("Failed to remove file %s: %v", filePath, err)
  178. continue
  179. }
  180. }
  181. return errorsutil.NewAggregate(errs)
  182. }
  183. // checkpointFile returns device plugin checkpoint file path.
  184. func (m *ManagerImpl) checkpointFile() string {
  185. return filepath.Join(m.socketdir, kubeletDeviceManagerCheckpoint)
  186. }
  187. // Start starts the Device Plugin Manager and start initialization of
  188. // podDevices and allocatedDevices information from checkpointed state and
  189. // starts device plugin registration service.
  190. func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
  191. klog.V(2).Infof("Starting Device Plugin manager")
  192. m.activePods = activePods
  193. m.sourcesReady = sourcesReady
  194. // Loads in allocatedDevices information from disk.
  195. err := m.readCheckpoint()
  196. if err != nil {
  197. klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
  198. }
  199. socketPath := filepath.Join(m.socketdir, m.socketname)
  200. if err = os.MkdirAll(m.socketdir, 0750); err != nil {
  201. return err
  202. }
  203. if selinux.SELinuxEnabled() {
  204. if err := selinux.SetFileLabel(m.socketdir, config.KubeletPluginsDirSELinuxLabel); err != nil {
  205. klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", m.socketdir, err)
  206. }
  207. }
  208. // Removes all stale sockets in m.socketdir. Device plugins can monitor
  209. // this and use it as a signal to re-register with the new Kubelet.
  210. if err := m.removeContents(m.socketdir); err != nil {
  211. klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
  212. }
  213. s, err := net.Listen("unix", socketPath)
  214. if err != nil {
  215. klog.Errorf(errListenSocket+" %v", err)
  216. return err
  217. }
  218. m.wg.Add(1)
  219. m.server = grpc.NewServer([]grpc.ServerOption{}...)
  220. pluginapi.RegisterRegistrationServer(m.server, m)
  221. go func() {
  222. defer m.wg.Done()
  223. m.server.Serve(s)
  224. }()
  225. klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
  226. return nil
  227. }
  228. // GetWatcherHandler returns the plugin handler
  229. func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
  230. if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil {
  231. klog.Errorf("Failed to create deprecation file at %s", m.socketdir)
  232. } else {
  233. f.Close()
  234. klog.V(4).Infof("created deprecation file %s", f.Name())
  235. }
  236. return cache.PluginHandler(m)
  237. }
  238. // ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
  239. func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
  240. klog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions)
  241. if !m.isVersionCompatibleWithPlugin(versions) {
  242. return fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
  243. }
  244. if !v1helper.IsExtendedResourceName(v1.ResourceName(pluginName)) {
  245. return fmt.Errorf("invalid name of device plugin socket: %s", fmt.Sprintf(errInvalidResourceName, pluginName))
  246. }
  247. return nil
  248. }
  249. // RegisterPlugin starts the endpoint and registers it
  250. // TODO: Start the endpoint and wait for the First ListAndWatch call
  251. // before registering the plugin
  252. func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
  253. klog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)
  254. e, err := newEndpointImpl(endpoint, pluginName, m.callback)
  255. if err != nil {
  256. return fmt.Errorf("failed to dial device plugin with socketPath %s: %v", endpoint, err)
  257. }
  258. options, err := e.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
  259. if err != nil {
  260. return fmt.Errorf("failed to get device plugin options: %v", err)
  261. }
  262. m.registerEndpoint(pluginName, options, e)
  263. go m.runEndpoint(pluginName, e)
  264. return nil
  265. }
  266. // DeRegisterPlugin deregisters the plugin
  267. // TODO work on the behavior for deregistering plugins
  268. // e.g: Should we delete the resource
  269. func (m *ManagerImpl) DeRegisterPlugin(pluginName string) {
  270. m.mutex.Lock()
  271. defer m.mutex.Unlock()
  272. // Note: This will mark the resource unhealthy as per the behavior
  273. // in runEndpoint
  274. if eI, ok := m.endpoints[pluginName]; ok {
  275. eI.e.stop()
  276. }
  277. }
  278. func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
  279. // TODO(vikasc): Currently this is fine as we only have a single supported version. When we do need to support
  280. // multiple versions in the future, we may need to extend this function to return a supported version.
  281. // E.g., say kubelet supports v1beta1 and v1beta2, and we get v1alpha1 and v1beta1 from a device plugin,
  282. // this function should return v1beta1
  283. for _, version := range versions {
  284. for _, supportedVersion := range pluginapi.SupportedVersions {
  285. if version == supportedVersion {
  286. return true
  287. }
  288. }
  289. }
  290. return false
  291. }
  292. func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
  293. devicesToReuse := make(map[string]sets.String)
  294. for _, container := range pod.Spec.InitContainers {
  295. if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
  296. return err
  297. }
  298. m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
  299. }
  300. for _, container := range pod.Spec.Containers {
  301. if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
  302. return err
  303. }
  304. m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
  305. }
  306. return nil
  307. }
  308. // Allocate is the call that you can use to allocate a set of devices
  309. // from the registered device plugins.
  310. func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
  311. pod := attrs.Pod
  312. err := m.allocatePodResources(pod)
  313. if err != nil {
  314. klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
  315. return err
  316. }
  317. m.mutex.Lock()
  318. defer m.mutex.Unlock()
  319. // quick return if no pluginResources requested
  320. if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
  321. return nil
  322. }
  323. m.sanitizeNodeAllocatable(node)
  324. return nil
  325. }
  326. // Register registers a device plugin.
  327. func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
  328. klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
  329. metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
  330. var versionCompatible bool
  331. for _, v := range pluginapi.SupportedVersions {
  332. if r.Version == v {
  333. versionCompatible = true
  334. break
  335. }
  336. }
  337. if !versionCompatible {
  338. errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
  339. klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
  340. return &pluginapi.Empty{}, fmt.Errorf(errorString)
  341. }
  342. if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
  343. errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
  344. klog.Infof("Bad registration request from device plugin: %s", errorString)
  345. return &pluginapi.Empty{}, fmt.Errorf(errorString)
  346. }
  347. // TODO: for now, always accepts newest device plugin. Later may consider to
  348. // add some policies here, e.g., verify whether an old device plugin with the
  349. // same resource name is still alive to determine whether we want to accept
  350. // the new registration.
  351. go m.addEndpoint(r)
  352. return &pluginapi.Empty{}, nil
  353. }
  354. // Stop is the function that can stop the gRPC server.
  355. // Can be called concurrently, more than once, and is safe to call
  356. // without a prior Start.
  357. func (m *ManagerImpl) Stop() error {
  358. m.mutex.Lock()
  359. defer m.mutex.Unlock()
  360. for _, eI := range m.endpoints {
  361. eI.e.stop()
  362. }
  363. if m.server == nil {
  364. return nil
  365. }
  366. m.server.Stop()
  367. m.wg.Wait()
  368. m.server = nil
  369. return nil
  370. }
  371. func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e endpoint) {
  372. m.mutex.Lock()
  373. defer m.mutex.Unlock()
  374. m.endpoints[resourceName] = endpointInfo{e: e, opts: options}
  375. klog.V(2).Infof("Registered endpoint %v", e)
  376. }
  377. func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
  378. e.run()
  379. e.stop()
  380. m.mutex.Lock()
  381. defer m.mutex.Unlock()
  382. if old, ok := m.endpoints[resourceName]; ok && old.e == e {
  383. m.markResourceUnhealthy(resourceName)
  384. }
  385. klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
  386. }
  387. func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
  388. new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
  389. if err != nil {
  390. klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
  391. return
  392. }
  393. m.registerEndpoint(r.ResourceName, r.Options, new)
  394. go func() {
  395. m.runEndpoint(r.ResourceName, new)
  396. }()
  397. }
  398. func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
  399. klog.V(2).Infof("Mark all resources Unhealthy for resource %s", resourceName)
  400. healthyDevices := sets.NewString()
  401. if _, ok := m.healthyDevices[resourceName]; ok {
  402. healthyDevices = m.healthyDevices[resourceName]
  403. m.healthyDevices[resourceName] = sets.NewString()
  404. }
  405. if _, ok := m.unhealthyDevices[resourceName]; !ok {
  406. m.unhealthyDevices[resourceName] = sets.NewString()
  407. }
  408. m.unhealthyDevices[resourceName] = m.unhealthyDevices[resourceName].Union(healthyDevices)
  409. }
  410. // GetCapacity is expected to be called when Kubelet updates its node status.
  411. // The first returned variable contains the registered device plugin resource capacity.
  412. // The second returned variable contains the registered device plugin resource allocatable.
  413. // The third returned variable contains previously registered resources that are no longer active.
  414. // Kubelet uses this information to update resource capacity/allocatable in its node status.
  415. // After the call, device plugin can remove the inactive resources from its internal list as the
  416. // change is already reflected in Kubelet node status.
  417. // Note in the special case after Kubelet restarts, device plugin resource capacities can
  418. // temporarily drop to zero till corresponding device plugins re-register. This is OK because
  419. // cm.UpdatePluginResource() run during predicate Admit guarantees we adjust nodeinfo
  420. // capacity for already allocated pods so that they can continue to run. However, new pods
  421. // requiring device plugin resources will not be scheduled till device plugin re-registers.
  422. func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
  423. needsUpdateCheckpoint := false
  424. var capacity = v1.ResourceList{}
  425. var allocatable = v1.ResourceList{}
  426. deletedResources := sets.NewString()
  427. m.mutex.Lock()
  428. for resourceName, devices := range m.healthyDevices {
  429. eI, ok := m.endpoints[resourceName]
  430. if (ok && eI.e.stopGracePeriodExpired()) || !ok {
  431. // The resources contained in endpoints and (un)healthyDevices
  432. // should always be consistent. Otherwise, we run with the risk
  433. // of failing to garbage collect non-existing resources or devices.
  434. if !ok {
  435. klog.Errorf("unexpected: healthyDevices and endpoints are out of sync")
  436. }
  437. delete(m.endpoints, resourceName)
  438. delete(m.healthyDevices, resourceName)
  439. deletedResources.Insert(resourceName)
  440. needsUpdateCheckpoint = true
  441. } else {
  442. capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
  443. allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
  444. }
  445. }
  446. for resourceName, devices := range m.unhealthyDevices {
  447. eI, ok := m.endpoints[resourceName]
  448. if (ok && eI.e.stopGracePeriodExpired()) || !ok {
  449. if !ok {
  450. klog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
  451. }
  452. delete(m.endpoints, resourceName)
  453. delete(m.unhealthyDevices, resourceName)
  454. deletedResources.Insert(resourceName)
  455. needsUpdateCheckpoint = true
  456. } else {
  457. capacityCount := capacity[v1.ResourceName(resourceName)]
  458. unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
  459. capacityCount.Add(unhealthyCount)
  460. capacity[v1.ResourceName(resourceName)] = capacityCount
  461. }
  462. }
  463. m.mutex.Unlock()
  464. if needsUpdateCheckpoint {
  465. if err := m.writeCheckpoint(); err != nil {
  466. klog.Errorf("writing checkpoint encountered %v", err)
  467. }
  468. }
  469. return capacity, allocatable, deletedResources.UnsortedList()
  470. }
  471. // Checkpoints device to container allocation information to disk.
  472. func (m *ManagerImpl) writeCheckpoint() error {
  473. m.mutex.Lock()
  474. registeredDevs := make(map[string][]string)
  475. for resource, devices := range m.healthyDevices {
  476. registeredDevs[resource] = devices.UnsortedList()
  477. }
  478. data := checkpoint.New(m.podDevices.toCheckpointData(),
  479. registeredDevs)
  480. m.mutex.Unlock()
  481. err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
  482. if err != nil {
  483. err2 := fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
  484. klog.Warning(err2)
  485. return err2
  486. }
  487. return nil
  488. }
  489. // Reads device to container allocation information from disk, and populates
  490. // m.allocatedDevices accordingly.
  491. func (m *ManagerImpl) readCheckpoint() error {
  492. registeredDevs := make(map[string][]string)
  493. devEntries := make([]checkpoint.PodDevicesEntry, 0)
  494. cp := checkpoint.New(devEntries, registeredDevs)
  495. err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
  496. if err != nil {
  497. if err == errors.ErrCheckpointNotFound {
  498. klog.Warningf("Failed to retrieve checkpoint for %q: %v", kubeletDeviceManagerCheckpoint, err)
  499. return nil
  500. }
  501. return err
  502. }
  503. m.mutex.Lock()
  504. defer m.mutex.Unlock()
  505. podDevices, registeredDevs := cp.GetData()
  506. m.podDevices.fromCheckpointData(podDevices)
  507. m.allocatedDevices = m.podDevices.devices()
  508. for resource := range registeredDevs {
  509. // During start up, creates empty healthyDevices list so that the resource capacity
  510. // will stay zero till the corresponding device plugin re-registers.
  511. m.healthyDevices[resource] = sets.NewString()
  512. m.unhealthyDevices[resource] = sets.NewString()
  513. m.endpoints[resource] = endpointInfo{e: newStoppedEndpointImpl(resource), opts: nil}
  514. }
  515. return nil
  516. }
  517. // UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
  518. func (m *ManagerImpl) UpdateAllocatedDevices() {
  519. activePods := m.activePods()
  520. if !m.sourcesReady.AllReady() {
  521. return
  522. }
  523. m.mutex.Lock()
  524. defer m.mutex.Unlock()
  525. podsToBeRemoved := m.podDevices.pods()
  526. for _, pod := range activePods {
  527. podsToBeRemoved.Delete(string(pod.UID))
  528. }
  529. if len(podsToBeRemoved) <= 0 {
  530. return
  531. }
  532. klog.V(3).Infof("pods to be removed: %v", podsToBeRemoved.List())
  533. m.podDevices.delete(podsToBeRemoved.List())
  534. // Regenerated allocatedDevices after we update pod allocation information.
  535. m.allocatedDevices = m.podDevices.devices()
  536. }
  537. // Returns list of device Ids we need to allocate with Allocate rpc call.
  538. // Returns empty list in case we don't need to issue the Allocate rpc call.
  539. func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
  540. m.mutex.Lock()
  541. defer m.mutex.Unlock()
  542. needed := required
  543. // Gets list of devices that have already been allocated.
  544. // This can happen if a container restarts for example.
  545. devices := m.podDevices.containerDevices(podUID, contName, resource)
  546. if devices != nil {
  547. klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
  548. needed = needed - devices.Len()
  549. // A pod's resource is not expected to change once admitted by the API server,
  550. // so just fail loudly here. We can revisit this part if this no longer holds.
  551. if needed != 0 {
  552. return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, devices.Len(), required)
  553. }
  554. }
  555. if needed == 0 {
  556. // No change, no work.
  557. return nil, nil
  558. }
  559. klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, podUID, contName)
  560. // Needs to allocate additional devices.
  561. if _, ok := m.healthyDevices[resource]; !ok {
  562. return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
  563. }
  564. devices = sets.NewString()
  565. // Allocates from reusableDevices list first.
  566. for device := range reusableDevices {
  567. devices.Insert(device)
  568. needed--
  569. if needed == 0 {
  570. return devices, nil
  571. }
  572. }
  573. // Needs to allocate additional devices.
  574. if m.allocatedDevices[resource] == nil {
  575. m.allocatedDevices[resource] = sets.NewString()
  576. }
  577. // Gets Devices in use.
  578. devicesInUse := m.allocatedDevices[resource]
  579. // Gets a list of available devices.
  580. available := m.healthyDevices[resource].Difference(devicesInUse)
  581. if available.Len() < needed {
  582. return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
  583. }
  584. // By default, pull devices from the unsorted list of available devices.
  585. allocated := available.UnsortedList()[:needed]
  586. // If topology alignment is desired, update allocated to the set of devices
  587. // with the best alignment.
  588. hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
  589. if m.deviceHasTopologyAlignment(resource) && hint.NUMANodeAffinity != nil {
  590. allocated = m.takeByTopology(resource, available, hint.NUMANodeAffinity, needed)
  591. }
  592. // Updates m.allocatedDevices with allocated devices to prevent them
  593. // from being allocated to other pods/containers, given that we are
  594. // not holding lock during the rpc call.
  595. for _, device := range allocated {
  596. m.allocatedDevices[resource].Insert(device)
  597. devices.Insert(device)
  598. }
  599. return devices, nil
  600. }
  601. func (m *ManagerImpl) takeByTopology(resource string, available sets.String, affinity bitmask.BitMask, request int) []string {
  602. // Build a map of NUMA Nodes to the devices associated with them. A
  603. // device may be associated to multiple NUMA nodes at the same time. If an
  604. // available device does not have any NUMA Nodes associated with it, add it
  605. // to a list of NUMA Nodes for the fake NUMANode -1.
  606. perNodeDevices := make(map[int]sets.String)
  607. nodeWithoutTopology := -1
  608. for d := range available {
  609. if m.allDevices[resource][d].Topology == nil || len(m.allDevices[resource][d].Topology.Nodes) == 0 {
  610. if _, ok := perNodeDevices[nodeWithoutTopology]; !ok {
  611. perNodeDevices[nodeWithoutTopology] = sets.NewString()
  612. }
  613. perNodeDevices[nodeWithoutTopology].Insert(d)
  614. continue
  615. }
  616. for _, node := range m.allDevices[resource][d].Topology.Nodes {
  617. if _, ok := perNodeDevices[int(node.ID)]; !ok {
  618. perNodeDevices[int(node.ID)] = sets.NewString()
  619. }
  620. perNodeDevices[int(node.ID)].Insert(d)
  621. }
  622. }
  623. // Get a flat list of all of the nodes associated with available devices.
  624. var nodes []int
  625. for node := range perNodeDevices {
  626. nodes = append(nodes, node)
  627. }
  628. // Sort the list of nodes by how many devices they contain.
  629. sort.Slice(nodes, func(i, j int) bool {
  630. return perNodeDevices[i].Len() < perNodeDevices[j].Len()
  631. })
  632. // Generate three sorted lists of devices. Devices in the first list come
  633. // from valid NUMA Nodes contained in the affinity mask. Devices in the
  634. // second list come from valid NUMA Nodes not in the affinity mask. Devices
  635. // in the third list come from devices with no NUMA Node association (i.e.
  636. // those mapped to the fake NUMA Node -1). Because we loop through the
  637. // sorted list of NUMA nodes in order, within each list, devices are sorted
  638. // by their connection to NUMA Nodes with more devices on them.
  639. var fromAffinity []string
  640. var notFromAffinity []string
  641. var withoutTopology []string
  642. for d := range available {
  643. // Since the same device may be associated with multiple NUMA Nodes. We
  644. // need to be careful not to add each device to multiple lists. The
  645. // logic below ensures this by breaking after the first NUMA node that
  646. // has the device is encountered.
  647. for _, n := range nodes {
  648. if perNodeDevices[n].Has(d) {
  649. if n == nodeWithoutTopology {
  650. withoutTopology = append(withoutTopology, d)
  651. } else if affinity.IsSet(n) {
  652. fromAffinity = append(fromAffinity, d)
  653. } else {
  654. notFromAffinity = append(notFromAffinity, d)
  655. }
  656. break
  657. }
  658. }
  659. }
  660. // Concatenate the lists above return the first 'request' devices from it..
  661. return append(append(fromAffinity, notFromAffinity...), withoutTopology...)[:request]
  662. }
  663. // allocateContainerResources attempts to allocate all of required device
  664. // plugin resources for the input container, issues an Allocate rpc request
  665. // for each new device resource requirement, processes their AllocateResponses,
  666. // and updates the cached containerDevices on success.
  667. func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
  668. podUID := string(pod.UID)
  669. contName := container.Name
  670. allocatedDevicesUpdated := false
  671. // Extended resources are not allowed to be overcommitted.
  672. // Since device plugin advertises extended resources,
  673. // therefore Requests must be equal to Limits and iterating
  674. // over the Limits should be sufficient.
  675. for k, v := range container.Resources.Limits {
  676. resource := string(k)
  677. needed := int(v.Value())
  678. klog.V(3).Infof("needs %d %s", needed, resource)
  679. if !m.isDevicePluginResource(resource) {
  680. continue
  681. }
  682. // Updates allocatedDevices to garbage collect any stranded resources
  683. // before doing the device plugin allocation.
  684. if !allocatedDevicesUpdated {
  685. m.UpdateAllocatedDevices()
  686. allocatedDevicesUpdated = true
  687. }
  688. allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
  689. if err != nil {
  690. return err
  691. }
  692. if allocDevices == nil || len(allocDevices) <= 0 {
  693. continue
  694. }
  695. startRPCTime := time.Now()
  696. // Manager.Allocate involves RPC calls to device plugin, which
  697. // could be heavy-weight. Therefore we want to perform this operation outside
  698. // mutex lock. Note if Allocate call fails, we may leave container resources
  699. // partially allocated for the failed container. We rely on UpdateAllocatedDevices()
  700. // to garbage collect these resources later. Another side effect is that if
  701. // we have X resource A and Y resource B in total, and two containers, container1
  702. // and container2 both require X resource A and Y resource B. Both allocation
  703. // requests may fail if we serve them in mixed order.
  704. // TODO: may revisit this part later if we see inefficient resource allocation
  705. // in real use as the result of this. Should also consider to parallelize device
  706. // plugin Allocate grpc calls if it becomes common that a container may require
  707. // resources from multiple device plugins.
  708. m.mutex.Lock()
  709. eI, ok := m.endpoints[resource]
  710. m.mutex.Unlock()
  711. if !ok {
  712. m.mutex.Lock()
  713. m.allocatedDevices = m.podDevices.devices()
  714. m.mutex.Unlock()
  715. return fmt.Errorf("unknown Device Plugin %s", resource)
  716. }
  717. devs := allocDevices.UnsortedList()
  718. // TODO: refactor this part of code to just append a ContainerAllocationRequest
  719. // in a passed in AllocateRequest pointer, and issues a single Allocate call per pod.
  720. klog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
  721. resp, err := eI.e.allocate(devs)
  722. metrics.DevicePluginAllocationDuration.WithLabelValues(resource).Observe(metrics.SinceInSeconds(startRPCTime))
  723. if err != nil {
  724. // In case of allocation failure, we want to restore m.allocatedDevices
  725. // to the actual allocated state from m.podDevices.
  726. m.mutex.Lock()
  727. m.allocatedDevices = m.podDevices.devices()
  728. m.mutex.Unlock()
  729. return err
  730. }
  731. if len(resp.ContainerResponses) == 0 {
  732. return fmt.Errorf("no containers return in allocation response %v", resp)
  733. }
  734. // Update internal cached podDevices state.
  735. m.mutex.Lock()
  736. m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
  737. m.mutex.Unlock()
  738. }
  739. // Checkpoints device to container allocation information.
  740. return m.writeCheckpoint()
  741. }
  742. // GetDeviceRunContainerOptions checks whether we have cached containerDevices
  743. // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
  744. // for the found one. An empty struct is returned in case no cached state is found.
  745. func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
  746. podUID := string(pod.UID)
  747. contName := container.Name
  748. needsReAllocate := false
  749. for k := range container.Resources.Limits {
  750. resource := string(k)
  751. if !m.isDevicePluginResource(resource) {
  752. continue
  753. }
  754. err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
  755. if err != nil {
  756. return nil, err
  757. }
  758. // This is a device plugin resource yet we don't have cached
  759. // resource state. This is likely due to a race during node
  760. // restart. We re-issue allocate request to cover this race.
  761. if m.podDevices.containerDevices(podUID, contName, resource) == nil {
  762. needsReAllocate = true
  763. }
  764. }
  765. if needsReAllocate {
  766. klog.V(2).Infof("needs re-allocate device plugin resources for pod %s", podUID)
  767. if err := m.allocatePodResources(pod); err != nil {
  768. return nil, err
  769. }
  770. }
  771. m.mutex.Lock()
  772. defer m.mutex.Unlock()
  773. return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
  774. }
  775. // callPreStartContainerIfNeeded issues PreStartContainer grpc call for device plugin resource
  776. // with PreStartRequired option set.
  777. func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
  778. m.mutex.Lock()
  779. eI, ok := m.endpoints[resource]
  780. if !ok {
  781. m.mutex.Unlock()
  782. return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
  783. }
  784. if eI.opts == nil || !eI.opts.PreStartRequired {
  785. m.mutex.Unlock()
  786. klog.V(4).Infof("Plugin options indicate to skip PreStartContainer for resource: %s", resource)
  787. return nil
  788. }
  789. devices := m.podDevices.containerDevices(podUID, contName, resource)
  790. if devices == nil {
  791. m.mutex.Unlock()
  792. return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource)
  793. }
  794. m.mutex.Unlock()
  795. devs := devices.UnsortedList()
  796. klog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, podUID)
  797. _, err := eI.e.preStartContainer(devs)
  798. if err != nil {
  799. return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
  800. }
  801. // TODO: Add metrics support for init RPC
  802. return nil
  803. }
  804. // sanitizeNodeAllocatable scans through allocatedDevices in the device manager
  805. // and if necessary, updates allocatableResource in nodeInfo to at least equal to
  806. // the allocated capacity. This allows pods that have already been scheduled on
  807. // the node to pass GeneralPredicates admission checking even upon device plugin failure.
  808. func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulernodeinfo.NodeInfo) {
  809. var newAllocatableResource *schedulernodeinfo.Resource
  810. allocatableResource := node.AllocatableResource()
  811. if allocatableResource.ScalarResources == nil {
  812. allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
  813. }
  814. for resource, devices := range m.allocatedDevices {
  815. needed := devices.Len()
  816. quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
  817. if ok && int(quant) >= needed {
  818. continue
  819. }
  820. // Needs to update nodeInfo.AllocatableResource to make sure
  821. // NodeInfo.allocatableResource at least equal to the capacity already allocated.
  822. if newAllocatableResource == nil {
  823. newAllocatableResource = allocatableResource.Clone()
  824. }
  825. newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
  826. }
  827. if newAllocatableResource != nil {
  828. node.SetAllocatableResource(newAllocatableResource)
  829. }
  830. }
  831. func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
  832. _, registeredResource := m.healthyDevices[resource]
  833. _, allocatedResource := m.allocatedDevices[resource]
  834. // Return true if this is either an active device plugin resource or
  835. // a resource we have previously allocated.
  836. if registeredResource || allocatedResource {
  837. return true
  838. }
  839. return false
  840. }
  841. // GetDevices returns the devices used by the specified container
  842. func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
  843. m.mutex.Lock()
  844. defer m.mutex.Unlock()
  845. return m.podDevices.getContainerDevices(podUID, containerName)
  846. }
  847. // ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed or not,
  848. // depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node
  849. // has been recreated.
  850. func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
  851. if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) {
  852. checkpoints, err := m.checkpointManager.ListCheckpoints()
  853. if err != nil {
  854. return false
  855. }
  856. return len(checkpoints) == 0
  857. }
  858. return false
  859. }