manager.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package devicemanager
  14. import (
  15. "context"
  16. "fmt"
  17. "net"
  18. "os"
  19. "path/filepath"
  20. "sync"
  21. "time"
  22. "google.golang.org/grpc"
  23. "k8s.io/klog"
  24. v1 "k8s.io/api/core/v1"
  25. "k8s.io/apimachinery/pkg/api/resource"
  26. "k8s.io/apimachinery/pkg/util/sets"
  27. utilfeature "k8s.io/apiserver/pkg/util/feature"
  28. v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  29. "k8s.io/kubernetes/pkg/features"
  30. pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
  31. podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
  32. "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
  33. "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
  34. "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
  35. "k8s.io/kubernetes/pkg/kubelet/config"
  36. "k8s.io/kubernetes/pkg/kubelet/lifecycle"
  37. "k8s.io/kubernetes/pkg/kubelet/metrics"
  38. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  39. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  40. "k8s.io/kubernetes/pkg/util/selinux"
  41. )
  // ActivePodsFunc is a function that returns a list of pods to reconcile.
  // The manager calls it before allocating devices so that devices still
  // recorded for pods missing from the returned list can be released.
  type ActivePodsFunc func() []*v1.Pod
  // monitorCallback is the function called when a device's health state changes,
  // or new devices are reported, or old devices are deleted.
  // resourceName identifies the resource the report is for, and devices carries
  // the most recent state of its devices. The manager's own callback replaces
  // its healthy/unhealthy sets for resourceName with exactly this list.
  type monitorCallback func(resourceName string, devices []pluginapi.Device)
  // ManagerImpl is the structure in charge of managing Device Plugins.
  type ManagerImpl struct {
  	// socketname is the file part of the socket path the manager was created with.
  	socketname string

  	// socketdir is the directory part of that socket path. It is also the
  	// directory handed to the checkpoint manager at construction time.
  	socketdir string

  	// endpoints holds the registered plugin endpoints. Key is ResourceName.
  	endpoints map[string]endpointInfo

  	// mutex is held by the methods of this type while they read or modify
  	// endpoints, the device maps and podDevices.
  	mutex sync.Mutex

  	// server is the gRPC registration server created by Start; Stop resets it to nil.
  	server *grpc.Server

  	// wg tracks the goroutine that serves the gRPC server so Stop can wait for it.
  	wg sync.WaitGroup

  	// activePods is a method for listing active pods on the node
  	// so the amount of pluginResources requested by existing pods
  	// could be counted when updating allocated devices
  	activePods ActivePodsFunc

  	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
  	// We use it to determine when we can purge inactive pods from checkpointed state.
  	sourcesReady config.SourcesReady

  	// callback is used for updating devices' states in one time call.
  	// e.g. a new device is advertised, two old devices are deleted and a running device fails.
  	callback monitorCallback

  	// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
  	healthyDevices map[string]sets.String

  	// unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
  	unhealthyDevices map[string]sets.String

  	// allocatedDevices contains allocated deviceIds, keyed by resourceName.
  	allocatedDevices map[string]sets.String

  	// podDevices contains pod to allocated device mapping.
  	podDevices podDevices

  	// checkpointManager persists and restores the device allocation checkpoint.
  	checkpointManager checkpointmanager.CheckpointManager
  }
  // endpointInfo pairs a device plugin endpoint with the plugin options that
  // were recorded when the endpoint was registered.
  type endpointInfo struct {
  	// e is the endpoint used to drive the plugin (run, stop, allocate).
  	e endpoint

  	// opts holds the plugin's options. It is nil for the stopped placeholder
  	// endpoints that readCheckpoint creates for checkpointed resources.
  	opts *pluginapi.DevicePluginOptions
  }
  // sourcesReadyStub is the placeholder assigned to ManagerImpl.sourcesReady
  // by newManagerImpl until Start installs the real implementation.
  type sourcesReadyStub struct{}

  // AddSource is a no-op.
  func (s *sourcesReadyStub) AddSource(source string) {}

  // AllReady always reports true.
  func (s *sourcesReadyStub) AllReady() bool { return true }
  83. // NewManagerImpl creates a new manager.
  84. func NewManagerImpl() (*ManagerImpl, error) {
  85. return newManagerImpl(pluginapi.KubeletSocket)
  86. }
  87. func newManagerImpl(socketPath string) (*ManagerImpl, error) {
  88. klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
  89. if socketPath == "" || !filepath.IsAbs(socketPath) {
  90. return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
  91. }
  92. dir, file := filepath.Split(socketPath)
  93. manager := &ManagerImpl{
  94. endpoints: make(map[string]endpointInfo),
  95. socketname: file,
  96. socketdir: dir,
  97. healthyDevices: make(map[string]sets.String),
  98. unhealthyDevices: make(map[string]sets.String),
  99. allocatedDevices: make(map[string]sets.String),
  100. podDevices: make(podDevices),
  101. }
  102. manager.callback = manager.genericDeviceUpdateCallback
  103. // The following structures are populated with real implementations in manager.Start()
  104. // Before that, initializes them to perform no-op operations.
  105. manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
  106. manager.sourcesReady = &sourcesReadyStub{}
  107. checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
  108. if err != nil {
  109. return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
  110. }
  111. manager.checkpointManager = checkpointManager
  112. return manager, nil
  113. }
  114. func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
  115. m.mutex.Lock()
  116. m.healthyDevices[resourceName] = sets.NewString()
  117. m.unhealthyDevices[resourceName] = sets.NewString()
  118. for _, dev := range devices {
  119. if dev.Health == pluginapi.Healthy {
  120. m.healthyDevices[resourceName].Insert(dev.ID)
  121. } else {
  122. m.unhealthyDevices[resourceName].Insert(dev.ID)
  123. }
  124. }
  125. m.mutex.Unlock()
  126. m.writeCheckpoint()
  127. }
  128. func (m *ManagerImpl) removeContents(dir string) error {
  129. d, err := os.Open(dir)
  130. if err != nil {
  131. return err
  132. }
  133. defer d.Close()
  134. names, err := d.Readdirnames(-1)
  135. if err != nil {
  136. return err
  137. }
  138. for _, name := range names {
  139. filePath := filepath.Join(dir, name)
  140. if filePath == m.checkpointFile() {
  141. continue
  142. }
  143. stat, err := os.Stat(filePath)
  144. if err != nil {
  145. klog.Errorf("Failed to stat file %s: %v", filePath, err)
  146. continue
  147. }
  148. if stat.IsDir() {
  149. continue
  150. }
  151. err = os.RemoveAll(filePath)
  152. if err != nil {
  153. return err
  154. }
  155. }
  156. return nil
  157. }
  158. // checkpointFile returns device plugin checkpoint file path.
  159. func (m *ManagerImpl) checkpointFile() string {
  160. return filepath.Join(m.socketdir, kubeletDeviceManagerCheckpoint)
  161. }
  162. // Start starts the Device Plugin Manager and start initialization of
  163. // podDevices and allocatedDevices information from checkpointed state and
  164. // starts device plugin registration service.
  165. func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
  166. klog.V(2).Infof("Starting Device Plugin manager")
  167. m.activePods = activePods
  168. m.sourcesReady = sourcesReady
  169. // Loads in allocatedDevices information from disk.
  170. err := m.readCheckpoint()
  171. if err != nil {
  172. klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
  173. }
  174. socketPath := filepath.Join(m.socketdir, m.socketname)
  175. os.MkdirAll(m.socketdir, 0755)
  176. if selinux.SELinuxEnabled() {
  177. if err := selinux.SetFileLabel(m.socketdir, config.KubeletPluginsDirSELinuxLabel); err != nil {
  178. klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", m.socketdir, err)
  179. }
  180. }
  181. // Removes all stale sockets in m.socketdir. Device plugins can monitor
  182. // this and use it as a signal to re-register with the new Kubelet.
  183. if err := m.removeContents(m.socketdir); err != nil {
  184. klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
  185. }
  186. s, err := net.Listen("unix", socketPath)
  187. if err != nil {
  188. klog.Errorf(errListenSocket+" %v", err)
  189. return err
  190. }
  191. m.wg.Add(1)
  192. m.server = grpc.NewServer([]grpc.ServerOption{}...)
  193. pluginapi.RegisterRegistrationServer(m.server, m)
  194. go func() {
  195. defer m.wg.Done()
  196. m.server.Serve(s)
  197. }()
  198. klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
  199. return nil
  200. }
  201. // GetWatcherHandler returns the plugin handler
  202. func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
  203. if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil {
  204. klog.Errorf("Failed to create deprecation file at %s", m.socketdir)
  205. } else {
  206. f.Close()
  207. klog.V(4).Infof("created deprecation file %s", f.Name())
  208. }
  209. return cache.PluginHandler(m)
  210. }
  211. // ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
  212. func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
  213. klog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions)
  214. if !m.isVersionCompatibleWithPlugin(versions) {
  215. return fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
  216. }
  217. if !v1helper.IsExtendedResourceName(v1.ResourceName(pluginName)) {
  218. return fmt.Errorf("invalid name of device plugin socket: %s", fmt.Sprintf(errInvalidResourceName, pluginName))
  219. }
  220. return nil
  221. }
  222. // RegisterPlugin starts the endpoint and registers it
  223. // TODO: Start the endpoint and wait for the First ListAndWatch call
  224. // before registering the plugin
  225. func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
  226. klog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)
  227. e, err := newEndpointImpl(endpoint, pluginName, m.callback)
  228. if err != nil {
  229. return fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", endpoint, err)
  230. }
  231. options, err := e.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
  232. if err != nil {
  233. return fmt.Errorf("Failed to get device plugin options: %v", err)
  234. }
  235. m.registerEndpoint(pluginName, options, e)
  236. go m.runEndpoint(pluginName, e)
  237. return nil
  238. }
  239. // DeRegisterPlugin deregisters the plugin
  240. // TODO work on the behavior for deregistering plugins
  241. // e.g: Should we delete the resource
  242. func (m *ManagerImpl) DeRegisterPlugin(pluginName string) {
  243. m.mutex.Lock()
  244. defer m.mutex.Unlock()
  245. // Note: This will mark the resource unhealthy as per the behavior
  246. // in runEndpoint
  247. if eI, ok := m.endpoints[pluginName]; ok {
  248. eI.e.stop()
  249. }
  250. }
  251. func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
  252. // TODO(vikasc): Currently this is fine as we only have a single supported version. When we do need to support
  253. // multiple versions in the future, we may need to extend this function to return a supported version.
  254. // E.g., say kubelet supports v1beta1 and v1beta2, and we get v1alpha1 and v1beta1 from a device plugin,
  255. // this function should return v1beta1
  256. for _, version := range versions {
  257. for _, supportedVersion := range pluginapi.SupportedVersions {
  258. if version == supportedVersion {
  259. return true
  260. }
  261. }
  262. }
  263. return false
  264. }
  265. func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
  266. devicesToReuse := make(map[string]sets.String)
  267. for _, container := range pod.Spec.InitContainers {
  268. if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
  269. return err
  270. }
  271. m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
  272. }
  273. for _, container := range pod.Spec.Containers {
  274. if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
  275. return err
  276. }
  277. m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
  278. }
  279. return nil
  280. }
  281. // Allocate is the call that you can use to allocate a set of devices
  282. // from the registered device plugins.
  283. func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
  284. pod := attrs.Pod
  285. err := m.allocatePodResources(pod)
  286. if err != nil {
  287. klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
  288. return err
  289. }
  290. m.mutex.Lock()
  291. defer m.mutex.Unlock()
  292. // quick return if no pluginResources requested
  293. if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
  294. return nil
  295. }
  296. m.sanitizeNodeAllocatable(node)
  297. return nil
  298. }
  299. // Register registers a device plugin.
  300. func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
  301. klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
  302. metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
  303. metrics.DeprecatedDevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
  304. var versionCompatible bool
  305. for _, v := range pluginapi.SupportedVersions {
  306. if r.Version == v {
  307. versionCompatible = true
  308. break
  309. }
  310. }
  311. if !versionCompatible {
  312. errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
  313. klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
  314. return &pluginapi.Empty{}, fmt.Errorf(errorString)
  315. }
  316. if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
  317. errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
  318. klog.Infof("Bad registration request from device plugin: %s", errorString)
  319. return &pluginapi.Empty{}, fmt.Errorf(errorString)
  320. }
  321. // TODO: for now, always accepts newest device plugin. Later may consider to
  322. // add some policies here, e.g., verify whether an old device plugin with the
  323. // same resource name is still alive to determine whether we want to accept
  324. // the new registration.
  325. go m.addEndpoint(r)
  326. return &pluginapi.Empty{}, nil
  327. }
  328. // Stop is the function that can stop the gRPC server.
  329. // Can be called concurrently, more than once, and is safe to call
  330. // without a prior Start.
  331. func (m *ManagerImpl) Stop() error {
  332. m.mutex.Lock()
  333. defer m.mutex.Unlock()
  334. for _, eI := range m.endpoints {
  335. eI.e.stop()
  336. }
  337. if m.server == nil {
  338. return nil
  339. }
  340. m.server.Stop()
  341. m.wg.Wait()
  342. m.server = nil
  343. return nil
  344. }
  345. func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e endpoint) {
  346. m.mutex.Lock()
  347. defer m.mutex.Unlock()
  348. m.endpoints[resourceName] = endpointInfo{e: e, opts: options}
  349. klog.V(2).Infof("Registered endpoint %v", e)
  350. }
  351. func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
  352. e.run()
  353. e.stop()
  354. m.mutex.Lock()
  355. defer m.mutex.Unlock()
  356. if old, ok := m.endpoints[resourceName]; ok && old.e == e {
  357. m.markResourceUnhealthy(resourceName)
  358. }
  359. klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
  360. }
  361. func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
  362. new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
  363. if err != nil {
  364. klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
  365. return
  366. }
  367. m.registerEndpoint(r.ResourceName, r.Options, new)
  368. go func() {
  369. m.runEndpoint(r.ResourceName, new)
  370. }()
  371. }
  372. func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
  373. klog.V(2).Infof("Mark all resources Unhealthy for resource %s", resourceName)
  374. healthyDevices := sets.NewString()
  375. if _, ok := m.healthyDevices[resourceName]; ok {
  376. healthyDevices = m.healthyDevices[resourceName]
  377. m.healthyDevices[resourceName] = sets.NewString()
  378. }
  379. if _, ok := m.unhealthyDevices[resourceName]; !ok {
  380. m.unhealthyDevices[resourceName] = sets.NewString()
  381. }
  382. m.unhealthyDevices[resourceName] = m.unhealthyDevices[resourceName].Union(healthyDevices)
  383. }
  384. // GetCapacity is expected to be called when Kubelet updates its node status.
  385. // The first returned variable contains the registered device plugin resource capacity.
  386. // The second returned variable contains the registered device plugin resource allocatable.
  387. // The third returned variable contains previously registered resources that are no longer active.
  388. // Kubelet uses this information to update resource capacity/allocatable in its node status.
  389. // After the call, device plugin can remove the inactive resources from its internal list as the
  390. // change is already reflected in Kubelet node status.
  391. // Note in the special case after Kubelet restarts, device plugin resource capacities can
  392. // temporarily drop to zero till corresponding device plugins re-register. This is OK because
  393. // cm.UpdatePluginResource() run during predicate Admit guarantees we adjust nodeinfo
  394. // capacity for already allocated pods so that they can continue to run. However, new pods
  395. // requiring device plugin resources will not be scheduled till device plugin re-registers.
  396. func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
  397. needsUpdateCheckpoint := false
  398. var capacity = v1.ResourceList{}
  399. var allocatable = v1.ResourceList{}
  400. deletedResources := sets.NewString()
  401. m.mutex.Lock()
  402. for resourceName, devices := range m.healthyDevices {
  403. eI, ok := m.endpoints[resourceName]
  404. if (ok && eI.e.stopGracePeriodExpired()) || !ok {
  405. // The resources contained in endpoints and (un)healthyDevices
  406. // should always be consistent. Otherwise, we run with the risk
  407. // of failing to garbage collect non-existing resources or devices.
  408. if !ok {
  409. klog.Errorf("unexpected: healthyDevices and endpoints are out of sync")
  410. }
  411. delete(m.endpoints, resourceName)
  412. delete(m.healthyDevices, resourceName)
  413. deletedResources.Insert(resourceName)
  414. needsUpdateCheckpoint = true
  415. } else {
  416. capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
  417. allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
  418. }
  419. }
  420. for resourceName, devices := range m.unhealthyDevices {
  421. eI, ok := m.endpoints[resourceName]
  422. if (ok && eI.e.stopGracePeriodExpired()) || !ok {
  423. if !ok {
  424. klog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
  425. }
  426. delete(m.endpoints, resourceName)
  427. delete(m.unhealthyDevices, resourceName)
  428. deletedResources.Insert(resourceName)
  429. needsUpdateCheckpoint = true
  430. } else {
  431. capacityCount := capacity[v1.ResourceName(resourceName)]
  432. unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
  433. capacityCount.Add(unhealthyCount)
  434. capacity[v1.ResourceName(resourceName)] = capacityCount
  435. }
  436. }
  437. m.mutex.Unlock()
  438. if needsUpdateCheckpoint {
  439. m.writeCheckpoint()
  440. }
  441. return capacity, allocatable, deletedResources.UnsortedList()
  442. }
  443. // Checkpoints device to container allocation information to disk.
  444. func (m *ManagerImpl) writeCheckpoint() error {
  445. m.mutex.Lock()
  446. registeredDevs := make(map[string][]string)
  447. for resource, devices := range m.healthyDevices {
  448. registeredDevs[resource] = devices.UnsortedList()
  449. }
  450. data := checkpoint.New(m.podDevices.toCheckpointData(),
  451. registeredDevs)
  452. m.mutex.Unlock()
  453. err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
  454. if err != nil {
  455. return fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
  456. }
  457. return nil
  458. }
  459. // Reads device to container allocation information from disk, and populates
  460. // m.allocatedDevices accordingly.
  461. func (m *ManagerImpl) readCheckpoint() error {
  462. registeredDevs := make(map[string][]string)
  463. devEntries := make([]checkpoint.PodDevicesEntry, 0)
  464. cp := checkpoint.New(devEntries, registeredDevs)
  465. err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
  466. if err != nil {
  467. if err == errors.ErrCheckpointNotFound {
  468. klog.Warningf("Failed to retrieve checkpoint for %q: %v", kubeletDeviceManagerCheckpoint, err)
  469. return nil
  470. }
  471. return err
  472. }
  473. m.mutex.Lock()
  474. defer m.mutex.Unlock()
  475. podDevices, registeredDevs := cp.GetData()
  476. m.podDevices.fromCheckpointData(podDevices)
  477. m.allocatedDevices = m.podDevices.devices()
  478. for resource := range registeredDevs {
  479. // During start up, creates empty healthyDevices list so that the resource capacity
  480. // will stay zero till the corresponding device plugin re-registers.
  481. m.healthyDevices[resource] = sets.NewString()
  482. m.unhealthyDevices[resource] = sets.NewString()
  483. m.endpoints[resource] = endpointInfo{e: newStoppedEndpointImpl(resource), opts: nil}
  484. }
  485. return nil
  486. }
  487. // updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to
  488. // terminated pods. Returns error on failure.
  489. func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
  490. if !m.sourcesReady.AllReady() {
  491. return
  492. }
  493. m.mutex.Lock()
  494. defer m.mutex.Unlock()
  495. activePodUids := sets.NewString()
  496. for _, pod := range activePods {
  497. activePodUids.Insert(string(pod.UID))
  498. }
  499. allocatedPodUids := m.podDevices.pods()
  500. podsToBeRemoved := allocatedPodUids.Difference(activePodUids)
  501. if len(podsToBeRemoved) <= 0 {
  502. return
  503. }
  504. klog.V(3).Infof("pods to be removed: %v", podsToBeRemoved.List())
  505. m.podDevices.delete(podsToBeRemoved.List())
  506. // Regenerated allocatedDevices after we update pod allocation information.
  507. m.allocatedDevices = m.podDevices.devices()
  508. }
  509. // Returns list of device Ids we need to allocate with Allocate rpc call.
  510. // Returns empty list in case we don't need to issue the Allocate rpc call.
  511. func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
  512. m.mutex.Lock()
  513. defer m.mutex.Unlock()
  514. needed := required
  515. // Gets list of devices that have already been allocated.
  516. // This can happen if a container restarts for example.
  517. devices := m.podDevices.containerDevices(podUID, contName, resource)
  518. if devices != nil {
  519. klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
  520. needed = needed - devices.Len()
  521. // A pod's resource is not expected to change once admitted by the API server,
  522. // so just fail loudly here. We can revisit this part if this no longer holds.
  523. if needed != 0 {
  524. return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, devices.Len(), required)
  525. }
  526. }
  527. if needed == 0 {
  528. // No change, no work.
  529. return nil, nil
  530. }
  531. klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, podUID, contName)
  532. // Needs to allocate additional devices.
  533. if _, ok := m.healthyDevices[resource]; !ok {
  534. return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
  535. }
  536. devices = sets.NewString()
  537. // Allocates from reusableDevices list first.
  538. for device := range reusableDevices {
  539. devices.Insert(device)
  540. needed--
  541. if needed == 0 {
  542. return devices, nil
  543. }
  544. }
  545. // Needs to allocate additional devices.
  546. if m.allocatedDevices[resource] == nil {
  547. m.allocatedDevices[resource] = sets.NewString()
  548. }
  549. // Gets Devices in use.
  550. devicesInUse := m.allocatedDevices[resource]
  551. // Gets a list of available devices.
  552. available := m.healthyDevices[resource].Difference(devicesInUse)
  553. if available.Len() < needed {
  554. return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
  555. }
  556. allocated := available.UnsortedList()[:needed]
  557. // Updates m.allocatedDevices with allocated devices to prevent them
  558. // from being allocated to other pods/containers, given that we are
  559. // not holding lock during the rpc call.
  560. for _, device := range allocated {
  561. m.allocatedDevices[resource].Insert(device)
  562. devices.Insert(device)
  563. }
  564. return devices, nil
  565. }
  566. // allocateContainerResources attempts to allocate all of required device
  567. // plugin resources for the input container, issues an Allocate rpc request
  568. // for each new device resource requirement, processes their AllocateResponses,
  569. // and updates the cached containerDevices on success.
  570. func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
  571. podUID := string(pod.UID)
  572. contName := container.Name
  573. allocatedDevicesUpdated := false
  574. // Extended resources are not allowed to be overcommitted.
  575. // Since device plugin advertises extended resources,
  576. // therefore Requests must be equal to Limits and iterating
  577. // over the Limits should be sufficient.
  578. for k, v := range container.Resources.Limits {
  579. resource := string(k)
  580. needed := int(v.Value())
  581. klog.V(3).Infof("needs %d %s", needed, resource)
  582. if !m.isDevicePluginResource(resource) {
  583. continue
  584. }
  585. // Updates allocatedDevices to garbage collect any stranded resources
  586. // before doing the device plugin allocation.
  587. if !allocatedDevicesUpdated {
  588. m.updateAllocatedDevices(m.activePods())
  589. allocatedDevicesUpdated = true
  590. }
  591. allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
  592. if err != nil {
  593. return err
  594. }
  595. if allocDevices == nil || len(allocDevices) <= 0 {
  596. continue
  597. }
  598. startRPCTime := time.Now()
  599. // Manager.Allocate involves RPC calls to device plugin, which
  600. // could be heavy-weight. Therefore we want to perform this operation outside
  601. // mutex lock. Note if Allocate call fails, we may leave container resources
  602. // partially allocated for the failed container. We rely on updateAllocatedDevices()
  603. // to garbage collect these resources later. Another side effect is that if
  604. // we have X resource A and Y resource B in total, and two containers, container1
  605. // and container2 both require X resource A and Y resource B. Both allocation
  606. // requests may fail if we serve them in mixed order.
  607. // TODO: may revisit this part later if we see inefficient resource allocation
  608. // in real use as the result of this. Should also consider to parallelize device
  609. // plugin Allocate grpc calls if it becomes common that a container may require
  610. // resources from multiple device plugins.
  611. m.mutex.Lock()
  612. eI, ok := m.endpoints[resource]
  613. m.mutex.Unlock()
  614. if !ok {
  615. m.mutex.Lock()
  616. m.allocatedDevices = m.podDevices.devices()
  617. m.mutex.Unlock()
  618. return fmt.Errorf("Unknown Device Plugin %s", resource)
  619. }
  620. devs := allocDevices.UnsortedList()
  621. // TODO: refactor this part of code to just append a ContainerAllocationRequest
  622. // in a passed in AllocateRequest pointer, and issues a single Allocate call per pod.
  623. klog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
  624. resp, err := eI.e.allocate(devs)
  625. metrics.DevicePluginAllocationDuration.WithLabelValues(resource).Observe(metrics.SinceInSeconds(startRPCTime))
  626. metrics.DeprecatedDevicePluginAllocationLatency.WithLabelValues(resource).Observe(metrics.SinceInMicroseconds(startRPCTime))
  627. if err != nil {
  628. // In case of allocation failure, we want to restore m.allocatedDevices
  629. // to the actual allocated state from m.podDevices.
  630. m.mutex.Lock()
  631. m.allocatedDevices = m.podDevices.devices()
  632. m.mutex.Unlock()
  633. return err
  634. }
  635. if len(resp.ContainerResponses) == 0 {
  636. return fmt.Errorf("No containers return in allocation response %v", resp)
  637. }
  638. // Update internal cached podDevices state.
  639. m.mutex.Lock()
  640. m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
  641. m.mutex.Unlock()
  642. }
  643. // Checkpoints device to container allocation information.
  644. return m.writeCheckpoint()
  645. }
  646. // GetDeviceRunContainerOptions checks whether we have cached containerDevices
  647. // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
  648. // for the found one. An empty struct is returned in case no cached state is found.
  649. func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
  650. podUID := string(pod.UID)
  651. contName := container.Name
  652. needsReAllocate := false
  653. for k := range container.Resources.Limits {
  654. resource := string(k)
  655. if !m.isDevicePluginResource(resource) {
  656. continue
  657. }
  658. err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
  659. if err != nil {
  660. return nil, err
  661. }
  662. // This is a device plugin resource yet we don't have cached
  663. // resource state. This is likely due to a race during node
  664. // restart. We re-issue allocate request to cover this race.
  665. if m.podDevices.containerDevices(podUID, contName, resource) == nil {
  666. needsReAllocate = true
  667. }
  668. }
  669. if needsReAllocate {
  670. klog.V(2).Infof("needs re-allocate device plugin resources for pod %s", podUID)
  671. m.allocatePodResources(pod)
  672. }
  673. m.mutex.Lock()
  674. defer m.mutex.Unlock()
  675. return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
  676. }
  677. // callPreStartContainerIfNeeded issues PreStartContainer grpc call for device plugin resource
  678. // with PreStartRequired option set.
  679. func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
  680. m.mutex.Lock()
  681. eI, ok := m.endpoints[resource]
  682. if !ok {
  683. m.mutex.Unlock()
  684. return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
  685. }
  686. if eI.opts == nil || !eI.opts.PreStartRequired {
  687. m.mutex.Unlock()
  688. klog.V(4).Infof("Plugin options indicate to skip PreStartContainer for resource: %s", resource)
  689. return nil
  690. }
  691. devices := m.podDevices.containerDevices(podUID, contName, resource)
  692. if devices == nil {
  693. m.mutex.Unlock()
  694. return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource)
  695. }
  696. m.mutex.Unlock()
  697. devs := devices.UnsortedList()
  698. klog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, podUID)
  699. _, err := eI.e.preStartContainer(devs)
  700. if err != nil {
  701. return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
  702. }
  703. // TODO: Add metrics support for init RPC
  704. return nil
  705. }
  706. // sanitizeNodeAllocatable scans through allocatedDevices in the device manager
  707. // and if necessary, updates allocatableResource in nodeInfo to at least equal to
  708. // the allocated capacity. This allows pods that have already been scheduled on
  709. // the node to pass GeneralPredicates admission checking even upon device plugin failure.
  710. func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulernodeinfo.NodeInfo) {
  711. var newAllocatableResource *schedulernodeinfo.Resource
  712. allocatableResource := node.AllocatableResource()
  713. if allocatableResource.ScalarResources == nil {
  714. allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
  715. }
  716. for resource, devices := range m.allocatedDevices {
  717. needed := devices.Len()
  718. quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
  719. if ok && int(quant) >= needed {
  720. continue
  721. }
  722. // Needs to update nodeInfo.AllocatableResource to make sure
  723. // NodeInfo.allocatableResource at least equal to the capacity already allocated.
  724. if newAllocatableResource == nil {
  725. newAllocatableResource = allocatableResource.Clone()
  726. }
  727. newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
  728. }
  729. if newAllocatableResource != nil {
  730. node.SetAllocatableResource(newAllocatableResource)
  731. }
  732. }
  733. func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
  734. _, registeredResource := m.healthyDevices[resource]
  735. _, allocatedResource := m.allocatedDevices[resource]
  736. // Return true if this is either an active device plugin resource or
  737. // a resource we have previously allocated.
  738. if registeredResource || allocatedResource {
  739. return true
  740. }
  741. return false
  742. }
  743. // GetDevices returns the devices used by the specified container
  744. func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
  745. m.mutex.Lock()
  746. defer m.mutex.Unlock()
  747. return m.podDevices.getContainerDevices(podUID, containerName)
  748. }
  749. // ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed or not,
  750. // depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node
  751. // has been recreated.
  752. func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
  753. if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) {
  754. checkpoints, err := m.checkpointManager.ListCheckpoints()
  755. if err != nil {
  756. return false
  757. }
  758. return len(checkpoints) == 0
  759. }
  760. return false
  761. }