manager_test.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package devicemanager
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "os"
  18. "path/filepath"
  19. "reflect"
  20. "testing"
  21. "time"
  22. "github.com/stretchr/testify/assert"
  23. "github.com/stretchr/testify/require"
  24. v1 "k8s.io/api/core/v1"
  25. "k8s.io/apimachinery/pkg/api/resource"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/util/sets"
  28. "k8s.io/apimachinery/pkg/util/uuid"
  29. "k8s.io/apimachinery/pkg/util/wait"
  30. "k8s.io/client-go/tools/record"
  31. pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
  32. watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
  33. "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
  34. "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
  35. "k8s.io/kubernetes/pkg/kubelet/config"
  36. "k8s.io/kubernetes/pkg/kubelet/lifecycle"
  37. "k8s.io/kubernetes/pkg/kubelet/pluginmanager"
  38. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  39. )
// testResourceName is the extended-resource name advertised by the stub
// device plugins throughout these tests.
const (
	testResourceName = "fake-domain/resource"
)
  43. func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error) {
  44. socketDir, err = ioutil.TempDir("", "device_plugin")
  45. if err != nil {
  46. return
  47. }
  48. socketName = socketDir + "/server.sock"
  49. pluginSocketName = socketDir + "/device-plugin.sock"
  50. os.MkdirAll(socketDir, 0755)
  51. return
  52. }
  53. func TestNewManagerImpl(t *testing.T) {
  54. socketDir, socketName, _, err := tmpSocketDir()
  55. topologyStore := topologymanager.NewFakeManager()
  56. require.NoError(t, err)
  57. defer os.RemoveAll(socketDir)
  58. _, err = newManagerImpl(socketName, nil, topologyStore)
  59. require.NoError(t, err)
  60. os.RemoveAll(socketDir)
  61. }
  62. func TestNewManagerImplStart(t *testing.T) {
  63. socketDir, socketName, pluginSocketName, err := tmpSocketDir()
  64. require.NoError(t, err)
  65. defer os.RemoveAll(socketDir)
  66. m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
  67. cleanup(t, m, p)
  68. // Stop should tolerate being called more than once.
  69. cleanup(t, m, p)
  70. }
  71. func TestNewManagerImplStartProbeMode(t *testing.T) {
  72. socketDir, socketName, pluginSocketName, err := tmpSocketDir()
  73. require.NoError(t, err)
  74. defer os.RemoveAll(socketDir)
  75. m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
  76. cleanup(t, m, p)
  77. }
  78. // Tests that the device plugin manager correctly handles registration and re-registration by
  79. // making sure that after registration, devices are correctly updated and if a re-registration
  80. // happens, we will NOT delete devices; and no orphaned devices left.
  81. func TestDevicePluginReRegistration(t *testing.T) {
  82. socketDir, socketName, pluginSocketName, err := tmpSocketDir()
  83. require.NoError(t, err)
  84. defer os.RemoveAll(socketDir)
  85. devs := []*pluginapi.Device{
  86. {ID: "Dev1", Health: pluginapi.Healthy},
  87. {ID: "Dev2", Health: pluginapi.Healthy},
  88. }
  89. devsForRegistration := []*pluginapi.Device{
  90. {ID: "Dev3", Health: pluginapi.Healthy},
  91. }
  92. for _, preStartContainerFlag := range []bool{false, true} {
  93. m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName)
  94. p1.Register(socketName, testResourceName, "")
  95. select {
  96. case <-ch:
  97. case <-time.After(5 * time.Second):
  98. t.Fatalf("timeout while waiting for manager update")
  99. }
  100. capacity, allocatable, _ := m.GetCapacity()
  101. resourceCapacity := capacity[v1.ResourceName(testResourceName)]
  102. resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
  103. require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
  104. require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
  105. p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag)
  106. err = p2.Start()
  107. require.NoError(t, err)
  108. p2.Register(socketName, testResourceName, "")
  109. select {
  110. case <-ch:
  111. case <-time.After(5 * time.Second):
  112. t.Fatalf("timeout while waiting for manager update")
  113. }
  114. capacity, allocatable, _ = m.GetCapacity()
  115. resourceCapacity = capacity[v1.ResourceName(testResourceName)]
  116. resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
  117. require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
  118. require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")
  119. // Test the scenario that a plugin re-registers with different devices.
  120. p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag)
  121. err = p3.Start()
  122. require.NoError(t, err)
  123. p3.Register(socketName, testResourceName, "")
  124. select {
  125. case <-ch:
  126. case <-time.After(5 * time.Second):
  127. t.Fatalf("timeout while waiting for manager update")
  128. }
  129. capacity, allocatable, _ = m.GetCapacity()
  130. resourceCapacity = capacity[v1.ResourceName(testResourceName)]
  131. resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
  132. require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
  133. require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.")
  134. p2.Stop()
  135. p3.Stop()
  136. cleanup(t, m, p1)
  137. }
  138. }
// Tests that the device plugin manager correctly handles registration and re-registration by
// making sure that after registration, devices are correctly updated and if a re-registration
// happens, we will NOT delete devices; and no orphaned devices left.
// While testing above scenario, plugin discovery and registration will be done using
// Kubelet probe based mechanism
func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
	require.NoError(t, err)
	defer os.RemoveAll(socketDir)
	devs := []*pluginapi.Device{
		{ID: "Dev1", Health: pluginapi.Healthy},
		{ID: "Dev2", Health: pluginapi.Healthy},
	}
	devsForRegistration := []*pluginapi.Device{
		{ID: "Dev3", Health: pluginapi.Healthy},
	}
	// In probe mode no explicit Register call is made: the plugin manager
	// watches the socket directory and registration happens via discovery,
	// so each Start below eventually triggers a manager callback on ch.
	m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)
	// Wait for the first callback to be issued.
	select {
	case <-ch:
	case <-time.After(5 * time.Second):
		t.FailNow()
	}
	capacity, allocatable, _ := m.GetCapacity()
	resourceCapacity := capacity[v1.ResourceName(testResourceName)]
	resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
	// Re-register the same device set through a new plugin socket: the
	// reported counts must be unchanged.
	p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false)
	err = p2.Start()
	require.NoError(t, err)
	// Wait for the second callback to be issued.
	select {
	case <-ch:
	case <-time.After(5 * time.Second):
		t.FailNow()
	}
	capacity, allocatable, _ = m.GetCapacity()
	resourceCapacity = capacity[v1.ResourceName(testResourceName)]
	resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
	// Test the scenario that a plugin re-registers with different devices.
	p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false)
	err = p3.Start()
	require.NoError(t, err)
	// Wait for the third callback to be issued.
	select {
	case <-ch:
	case <-time.After(5 * time.Second):
		t.FailNow()
	}
	capacity, allocatable, _ = m.GetCapacity()
	resourceCapacity = capacity[v1.ResourceName(testResourceName)]
	resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
	require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of previous registered should be removed")
	p2.Stop()
	p3.Stop()
	cleanup(t, m, p1)
}
// setupDeviceManager creates and starts a device manager listening on
// socketName. It wraps the manager's callback so that every device update is
// also signalled on the returned channel, letting tests synchronize with the
// manager. The devs parameter is unused here; devices come from the plugin
// stubs set up separately.
func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) {
	topologyStore := topologymanager.NewFakeManager()
	m, err := newManagerImpl(socketName, nil, topologyStore)
	require.NoError(t, err)
	updateChan := make(chan interface{})
	// Install the caller's callback (if any) first, so the wrapper below
	// chains it ahead of the channel notification.
	if callback != nil {
		m.callback = callback
	}
	originalCallback := m.callback
	m.callback = func(resourceName string, devices []pluginapi.Device) {
		originalCallback(resourceName, devices)
		// Unbuffered send: blocks until the test receives, keeping the test
		// and the manager callback in lock-step.
		updateChan <- new(interface{})
	}
	// No active pods are needed for these tests.
	activePods := func() []*v1.Pod {
		return []*v1.Pod{}
	}
	err = m.Start(activePods, &sourcesReadyStub{})
	require.NoError(t, err)
	return m, updateChan
}
  220. func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *Stub {
  221. p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false)
  222. err := p.Start()
  223. require.NoError(t, err)
  224. return p
  225. }
  226. func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) pluginmanager.PluginManager {
  227. pluginManager := pluginmanager.NewPluginManager(
  228. filepath.Dir(pluginSocketName), /* sockDir */
  229. &record.FakeRecorder{},
  230. )
  231. runPluginManager(pluginManager)
  232. pluginManager.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
  233. return pluginManager
  234. }
  235. func runPluginManager(pluginManager pluginmanager.PluginManager) {
  236. sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
  237. go pluginManager.Run(sourcesReady, wait.NeverStop)
  238. }
  239. func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) {
  240. m, updateChan := setupDeviceManager(t, devs, callback, socketName)
  241. p := setupDevicePlugin(t, devs, pluginSocketName)
  242. return m, updateChan, p
  243. }
  244. func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, pluginmanager.PluginManager) {
  245. m, updateChan := setupDeviceManager(t, devs, callback, socketName)
  246. pm := setupPluginManager(t, pluginSocketName, m)
  247. p := setupDevicePlugin(t, devs, pluginSocketName)
  248. return m, updateChan, p, pm
  249. }
// cleanup stops the stub plugin and then the manager. Both Stop methods are
// expected to tolerate repeated calls (see TestNewManagerImplStart).
func cleanup(t *testing.T, m Manager, p *Stub) {
	p.Stop()
	m.Stop()
}
  254. func TestUpdateCapacityAllocatable(t *testing.T) {
  255. socketDir, socketName, _, err := tmpSocketDir()
  256. topologyStore := topologymanager.NewFakeManager()
  257. require.NoError(t, err)
  258. defer os.RemoveAll(socketDir)
  259. testManager, err := newManagerImpl(socketName, nil, topologyStore)
  260. as := assert.New(t)
  261. as.NotNil(testManager)
  262. as.Nil(err)
  263. devs := []pluginapi.Device{
  264. {ID: "Device1", Health: pluginapi.Healthy},
  265. {ID: "Device2", Health: pluginapi.Healthy},
  266. {ID: "Device3", Health: pluginapi.Unhealthy},
  267. }
  268. callback := testManager.genericDeviceUpdateCallback
  269. // Adds three devices for resource1, two healthy and one unhealthy.
  270. // Expects capacity for resource1 to be 2.
  271. resourceName1 := "domain1.com/resource1"
  272. e1 := &endpointImpl{}
  273. testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil}
  274. callback(resourceName1, devs)
  275. capacity, allocatable, removedResources := testManager.GetCapacity()
  276. resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
  277. as.True(ok)
  278. resource1Allocatable, ok := allocatable[v1.ResourceName(resourceName1)]
  279. as.True(ok)
  280. as.Equal(int64(3), resource1Capacity.Value())
  281. as.Equal(int64(2), resource1Allocatable.Value())
  282. as.Equal(0, len(removedResources))
  283. // Deletes an unhealthy device should NOT change allocatable but change capacity.
  284. devs1 := devs[:len(devs)-1]
  285. callback(resourceName1, devs1)
  286. capacity, allocatable, removedResources = testManager.GetCapacity()
  287. resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
  288. as.True(ok)
  289. resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
  290. as.True(ok)
  291. as.Equal(int64(2), resource1Capacity.Value())
  292. as.Equal(int64(2), resource1Allocatable.Value())
  293. as.Equal(0, len(removedResources))
  294. // Updates a healthy device to unhealthy should reduce allocatable by 1.
  295. devs[1].Health = pluginapi.Unhealthy
  296. callback(resourceName1, devs)
  297. capacity, allocatable, removedResources = testManager.GetCapacity()
  298. resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
  299. as.True(ok)
  300. resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
  301. as.True(ok)
  302. as.Equal(int64(3), resource1Capacity.Value())
  303. as.Equal(int64(1), resource1Allocatable.Value())
  304. as.Equal(0, len(removedResources))
  305. // Deletes a healthy device should reduce capacity and allocatable by 1.
  306. devs2 := devs[1:]
  307. callback(resourceName1, devs2)
  308. capacity, allocatable, removedResources = testManager.GetCapacity()
  309. resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
  310. as.True(ok)
  311. resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
  312. as.True(ok)
  313. as.Equal(int64(0), resource1Allocatable.Value())
  314. as.Equal(int64(2), resource1Capacity.Value())
  315. as.Equal(0, len(removedResources))
  316. // Tests adding another resource.
  317. resourceName2 := "resource2"
  318. e2 := &endpointImpl{}
  319. testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil}
  320. callback(resourceName2, devs)
  321. capacity, allocatable, removedResources = testManager.GetCapacity()
  322. as.Equal(2, len(capacity))
  323. resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
  324. as.True(ok)
  325. resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)]
  326. as.True(ok)
  327. as.Equal(int64(3), resource2Capacity.Value())
  328. as.Equal(int64(1), resource2Allocatable.Value())
  329. as.Equal(0, len(removedResources))
  330. // Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
  331. // is removed from capacity and it no longer exists in healthyDevices after the call.
  332. e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second))
  333. capacity, allocatable, removed := testManager.GetCapacity()
  334. as.Equal([]string{resourceName1}, removed)
  335. as.NotContains(capacity, v1.ResourceName(resourceName1))
  336. as.NotContains(allocatable, v1.ResourceName(resourceName1))
  337. val, ok := capacity[v1.ResourceName(resourceName2)]
  338. as.True(ok)
  339. as.Equal(int64(3), val.Value())
  340. as.NotContains(testManager.healthyDevices, resourceName1)
  341. as.NotContains(testManager.unhealthyDevices, resourceName1)
  342. as.NotContains(testManager.endpoints, resourceName1)
  343. as.Equal(1, len(testManager.endpoints))
  344. // Stops resourceName2 endpoint. Verifies its stopTime is set, allocate and
  345. // preStartContainer calls return errors.
  346. e2.stop()
  347. as.False(e2.stopTime.IsZero())
  348. _, err = e2.allocate([]string{"Device1"})
  349. reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2))
  350. _, err = e2.preStartContainer([]string{"Device1"})
  351. reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2))
  352. // Marks resourceName2 unhealthy and verifies its capacity/allocatable are
  353. // correctly updated.
  354. testManager.markResourceUnhealthy(resourceName2)
  355. capacity, allocatable, removed = testManager.GetCapacity()
  356. val, ok = capacity[v1.ResourceName(resourceName2)]
  357. as.True(ok)
  358. as.Equal(int64(3), val.Value())
  359. val, ok = allocatable[v1.ResourceName(resourceName2)]
  360. as.True(ok)
  361. as.Equal(int64(0), val.Value())
  362. as.Empty(removed)
  363. // Writes and re-reads checkpoints. Verifies we create a stopped endpoint
  364. // for resourceName2, its capacity is set to zero, and we still consider
  365. // it as a DevicePlugin resource. This makes sure any pod that was scheduled
  366. // during the time of propagating capacity change to the scheduler will be
  367. // properly rejected instead of being incorrectly started.
  368. err = testManager.writeCheckpoint()
  369. as.Nil(err)
  370. testManager.healthyDevices = make(map[string]sets.String)
  371. testManager.unhealthyDevices = make(map[string]sets.String)
  372. err = testManager.readCheckpoint()
  373. as.Nil(err)
  374. as.Equal(1, len(testManager.endpoints))
  375. as.Contains(testManager.endpoints, resourceName2)
  376. capacity, allocatable, removed = testManager.GetCapacity()
  377. val, ok = capacity[v1.ResourceName(resourceName2)]
  378. as.True(ok)
  379. as.Equal(int64(0), val.Value())
  380. val, ok = allocatable[v1.ResourceName(resourceName2)]
  381. as.True(ok)
  382. as.Equal(int64(0), val.Value())
  383. as.Empty(removed)
  384. as.True(testManager.isDevicePluginResource(resourceName2))
  385. }
  386. func constructDevices(devices []string) sets.String {
  387. ret := sets.NewString()
  388. for _, dev := range devices {
  389. ret.Insert(dev)
  390. }
  391. return ret
  392. }
  393. func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.ContainerAllocateResponse {
  394. resp := &pluginapi.ContainerAllocateResponse{}
  395. for k, v := range devices {
  396. resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
  397. HostPath: k,
  398. ContainerPath: v,
  399. Permissions: "mrw",
  400. })
  401. }
  402. for k, v := range mounts {
  403. resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
  404. ContainerPath: k,
  405. HostPath: v,
  406. ReadOnly: true,
  407. })
  408. }
  409. resp.Envs = make(map[string]string)
  410. for k, v := range envs {
  411. resp.Envs[k] = v
  412. }
  413. return resp
  414. }
// TestCheckpoint verifies that pod-device assignments and allocated-device
// state survive a writeCheckpoint/readCheckpoint round-trip.
func TestCheckpoint(t *testing.T) {
	resourceName1 := "domain1.com/resource1"
	resourceName2 := "domain2.com/resource2"
	as := assert.New(t)
	tmpDir, err := ioutil.TempDir("", "checkpoint")
	as.Nil(err)
	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
	as.Nil(err)
	testManager := &ManagerImpl{
		endpoints:         make(map[string]endpointInfo),
		healthyDevices:    make(map[string]sets.String),
		unhealthyDevices:  make(map[string]sets.String),
		allocatedDevices:  make(map[string]sets.String),
		podDevices:        make(podDevices),
		checkpointManager: ckm,
	}
	// Seed several pod/container/resource assignments to be checkpointed:
	// two resources in pod1/con1, one more device in pod1/con2, and one in
	// a second pod.
	testManager.podDevices.insert("pod1", "con1", resourceName1,
		constructDevices([]string{"dev1", "dev2"}),
		constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"},
			map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
	testManager.podDevices.insert("pod1", "con1", resourceName2,
		constructDevices([]string{"dev1", "dev2"}),
		constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"},
			map[string]string{"/home/r2lib1": "/usr/r2lib1"},
			map[string]string{"r2devices": "dev1 dev2"}))
	testManager.podDevices.insert("pod1", "con2", resourceName1,
		constructDevices([]string{"dev3"}),
		constructAllocResp(map[string]string{"/dev/r1dev3": "/dev/r1dev3"},
			map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
	testManager.podDevices.insert("pod2", "con1", resourceName1,
		constructDevices([]string{"dev4"}),
		constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
			map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
	testManager.healthyDevices[resourceName1] = sets.NewString()
	testManager.healthyDevices[resourceName1].Insert("dev1")
	testManager.healthyDevices[resourceName1].Insert("dev2")
	testManager.healthyDevices[resourceName1].Insert("dev3")
	testManager.healthyDevices[resourceName1].Insert("dev4")
	testManager.healthyDevices[resourceName1].Insert("dev5")
	testManager.healthyDevices[resourceName2] = sets.NewString()
	testManager.healthyDevices[resourceName2].Insert("dev1")
	testManager.healthyDevices[resourceName2].Insert("dev2")
	// NOTE(review): expectedPodDevices and expectedAllDevices alias the
	// manager's live maps; podDevices is replaced with a fresh map below, but
	// healthyDevices is not — confirm readCheckpoint installs new sets so the
	// final DeepEqual is a meaningful comparison.
	expectedPodDevices := testManager.podDevices
	expectedAllocatedDevices := testManager.podDevices.devices()
	expectedAllDevices := testManager.healthyDevices
	err = testManager.writeCheckpoint()
	as.Nil(err)
	// Wipe in-memory pod-device state, then restore it from the checkpoint.
	testManager.podDevices = make(podDevices)
	err = testManager.readCheckpoint()
	as.Nil(err)
	as.Equal(len(expectedPodDevices), len(testManager.podDevices))
	// Compare restored per-container device sets and run-container options
	// against the pre-checkpoint state.
	for podUID, containerDevices := range expectedPodDevices {
		for conName, resources := range containerDevices {
			for resource := range resources {
				expDevices := expectedPodDevices.containerDevices(podUID, conName, resource)
				testDevices := testManager.podDevices.containerDevices(podUID, conName, resource)
				as.True(reflect.DeepEqual(expDevices, testDevices))
				opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
				opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName)
				as.Equal(len(opts1.Envs), len(opts2.Envs))
				as.Equal(len(opts1.Mounts), len(opts2.Mounts))
				as.Equal(len(opts1.Devices), len(opts2.Devices))
			}
		}
	}
	as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
	as.True(reflect.DeepEqual(expectedAllDevices, testManager.healthyDevices))
}
// activePodsStub provides a mutable pod list behind an ActivePodsFunc-shaped
// getter, so tests can change the set of "active" pods between allocations.
type activePodsStub struct {
	// activePods is the list returned by getActivePods.
	activePods []*v1.Pod
}

// getActivePods returns the current stubbed pod list.
func (a *activePodsStub) getActivePods() []*v1.Pod {
	return a.activePods
}

// updateActivePods replaces the stubbed pod list.
func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
	a.activePods = newPods
}
// MockEndpoint is a minimal endpoint implementation whose allocate behavior
// is customizable per test via allocateFunc; all other operations are no-ops.
type MockEndpoint struct {
	// allocateFunc, when non-nil, services allocate calls.
	allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error)
	// initChan receives the device IDs passed to preStartContainer.
	initChan chan []string
}

// stop is a no-op.
func (m *MockEndpoint) stop() {}

// run is a no-op.
func (m *MockEndpoint) run() {}

// callback is a no-op.
func (m *MockEndpoint) callback(resourceName string, devices []pluginapi.Device) {}

// preStartContainer records the requested device IDs on initChan and
// succeeds. NOTE(review): the send blocks until a reader drains initChan —
// confirm tests using preStartContainer always receive from it.
func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
	m.initChan <- devs
	return &pluginapi.PreStartContainerResponse{}, nil
}

// allocate delegates to allocateFunc when set; otherwise returns nil, nil.
func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
	if m.allocateFunc != nil {
		return m.allocateFunc(devs)
	}
	return nil, nil
}

// isStopped always reports the endpoint as running.
func (m *MockEndpoint) isStopped() bool { return false }

// stopGracePeriodExpired always reports the grace period as not expired.
func (m *MockEndpoint) stopGracePeriodExpired() bool { return false }
  511. func makePod(limits v1.ResourceList) *v1.Pod {
  512. return &v1.Pod{
  513. ObjectMeta: metav1.ObjectMeta{
  514. UID: uuid.NewUUID(),
  515. },
  516. Spec: v1.PodSpec{
  517. Containers: []v1.Container{
  518. {
  519. Resources: v1.ResourceRequirements{
  520. Limits: limits,
  521. },
  522. },
  523. },
  524. },
  525. }
  526. }
  527. func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*ManagerImpl, error) {
  528. monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
  529. ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
  530. if err != nil {
  531. return nil, err
  532. }
  533. testManager := &ManagerImpl{
  534. socketdir: tmpDir,
  535. callback: monitorCallback,
  536. healthyDevices: make(map[string]sets.String),
  537. unhealthyDevices: make(map[string]sets.String),
  538. allocatedDevices: make(map[string]sets.String),
  539. endpoints: make(map[string]endpointInfo),
  540. podDevices: make(podDevices),
  541. topologyAffinityStore: topologymanager.NewFakeManager(),
  542. activePods: activePods,
  543. sourcesReady: &sourcesReadyStub{},
  544. checkpointManager: ckm,
  545. }
  546. for _, res := range testRes {
  547. testManager.healthyDevices[res.resourceName] = sets.NewString()
  548. for _, dev := range res.devs {
  549. testManager.healthyDevices[res.resourceName].Insert(dev)
  550. }
  551. if res.resourceName == "domain1.com/resource1" {
  552. testManager.endpoints[res.resourceName] = endpointInfo{
  553. e: &MockEndpoint{allocateFunc: allocateStubFunc()},
  554. opts: nil,
  555. }
  556. }
  557. if res.resourceName == "domain2.com/resource2" {
  558. testManager.endpoints[res.resourceName] = endpointInfo{
  559. e: &MockEndpoint{
  560. allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
  561. resp := new(pluginapi.ContainerAllocateResponse)
  562. resp.Envs = make(map[string]string)
  563. for _, dev := range devs {
  564. switch dev {
  565. case "dev3":
  566. resp.Envs["key2"] = "val2"
  567. case "dev4":
  568. resp.Envs["key2"] = "val3"
  569. }
  570. }
  571. resps := new(pluginapi.AllocateResponse)
  572. resps.ContainerResponses = append(resps.ContainerResponses, resp)
  573. return resps, nil
  574. },
  575. },
  576. opts: nil,
  577. }
  578. }
  579. }
  580. return testManager, nil
  581. }
  582. func getTestNodeInfo(allocatable v1.ResourceList) *schedulernodeinfo.NodeInfo {
  583. cachedNode := &v1.Node{
  584. Status: v1.NodeStatus{
  585. Allocatable: allocatable,
  586. },
  587. }
  588. nodeInfo := &schedulernodeinfo.NodeInfo{}
  589. nodeInfo.SetNode(cachedNode)
  590. return nodeInfo
  591. }
// TestResource describes one extended resource used to seed the test manager.
type TestResource struct {
	resourceName     string            // extended-resource name, e.g. "domain1.com/resource1"
	resourceQuantity resource.Quantity // quantity requested in test pod specs
	devs             []string          // device IDs advertised for this resource
}
  597. func TestPodContainerDeviceAllocation(t *testing.T) {
  598. res1 := TestResource{
  599. resourceName: "domain1.com/resource1",
  600. resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
  601. devs: []string{"dev1", "dev2"},
  602. }
  603. res2 := TestResource{
  604. resourceName: "domain2.com/resource2",
  605. resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
  606. devs: []string{"dev3", "dev4"},
  607. }
  608. testResources := make([]TestResource, 2)
  609. testResources = append(testResources, res1)
  610. testResources = append(testResources, res2)
  611. as := require.New(t)
  612. podsStub := activePodsStub{
  613. activePods: []*v1.Pod{},
  614. }
  615. tmpDir, err := ioutil.TempDir("", "checkpoint")
  616. as.Nil(err)
  617. defer os.RemoveAll(tmpDir)
  618. nodeInfo := getTestNodeInfo(v1.ResourceList{})
  619. testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
  620. as.Nil(err)
  621. testPods := []*v1.Pod{
  622. makePod(v1.ResourceList{
  623. v1.ResourceName(res1.resourceName): res1.resourceQuantity,
  624. v1.ResourceName("cpu"): res1.resourceQuantity,
  625. v1.ResourceName(res2.resourceName): res2.resourceQuantity}),
  626. makePod(v1.ResourceList{
  627. v1.ResourceName(res1.resourceName): res2.resourceQuantity}),
  628. makePod(v1.ResourceList{
  629. v1.ResourceName(res2.resourceName): res2.resourceQuantity}),
  630. }
  631. testCases := []struct {
  632. description string
  633. testPod *v1.Pod
  634. expectedContainerOptsLen []int
  635. expectedAllocatedResName1 int
  636. expectedAllocatedResName2 int
  637. expErr error
  638. }{
  639. {
  640. description: "Successful allocation of two Res1 resources and one Res2 resource",
  641. testPod: testPods[0],
  642. expectedContainerOptsLen: []int{3, 2, 2},
  643. expectedAllocatedResName1: 2,
  644. expectedAllocatedResName2: 1,
  645. expErr: nil,
  646. },
  647. {
  648. description: "Requesting to create a pod without enough resources should fail",
  649. testPod: testPods[1],
  650. expectedContainerOptsLen: nil,
  651. expectedAllocatedResName1: 2,
  652. expectedAllocatedResName2: 1,
  653. expErr: fmt.Errorf("requested number of devices unavailable for domain1.com/resource1. Requested: 1, Available: 0"),
  654. },
  655. {
  656. description: "Successful allocation of all available Res1 resources and Res2 resources",
  657. testPod: testPods[2],
  658. expectedContainerOptsLen: []int{0, 0, 1},
  659. expectedAllocatedResName1: 2,
  660. expectedAllocatedResName2: 2,
  661. expErr: nil,
  662. },
  663. }
  664. activePods := []*v1.Pod{}
  665. for _, testCase := range testCases {
  666. pod := testCase.testPod
  667. activePods = append(activePods, pod)
  668. podsStub.updateActivePods(activePods)
  669. err := testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
  670. if !reflect.DeepEqual(err, testCase.expErr) {
  671. t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v",
  672. testCase.description, testCase.expErr, err)
  673. }
  674. runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
  675. if testCase.expErr == nil {
  676. as.Nil(err)
  677. }
  678. if testCase.expectedContainerOptsLen == nil {
  679. as.Nil(runContainerOpts)
  680. } else {
  681. as.Equal(len(runContainerOpts.Devices), testCase.expectedContainerOptsLen[0])
  682. as.Equal(len(runContainerOpts.Mounts), testCase.expectedContainerOptsLen[1])
  683. as.Equal(len(runContainerOpts.Envs), testCase.expectedContainerOptsLen[2])
  684. }
  685. as.Equal(testCase.expectedAllocatedResName1, testManager.allocatedDevices[res1.resourceName].Len())
  686. as.Equal(testCase.expectedAllocatedResName2, testManager.allocatedDevices[res2.resourceName].Len())
  687. }
  688. }
  689. func TestInitContainerDeviceAllocation(t *testing.T) {
  690. // Requesting to create a pod that requests resourceName1 in init containers and normal containers
  691. // should succeed with devices allocated to init containers reallocated to normal containers.
  692. res1 := TestResource{
  693. resourceName: "domain1.com/resource1",
  694. resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
  695. devs: []string{"dev1", "dev2"},
  696. }
  697. res2 := TestResource{
  698. resourceName: "domain2.com/resource2",
  699. resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
  700. devs: []string{"dev3", "dev4"},
  701. }
  702. testResources := make([]TestResource, 2)
  703. testResources = append(testResources, res1)
  704. testResources = append(testResources, res2)
  705. as := require.New(t)
  706. podsStub := activePodsStub{
  707. activePods: []*v1.Pod{},
  708. }
  709. nodeInfo := getTestNodeInfo(v1.ResourceList{})
  710. tmpDir, err := ioutil.TempDir("", "checkpoint")
  711. as.Nil(err)
  712. defer os.RemoveAll(tmpDir)
  713. testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
  714. as.Nil(err)
  715. podWithPluginResourcesInInitContainers := &v1.Pod{
  716. ObjectMeta: metav1.ObjectMeta{
  717. UID: uuid.NewUUID(),
  718. },
  719. Spec: v1.PodSpec{
  720. InitContainers: []v1.Container{
  721. {
  722. Name: string(uuid.NewUUID()),
  723. Resources: v1.ResourceRequirements{
  724. Limits: v1.ResourceList{
  725. v1.ResourceName(res1.resourceName): res2.resourceQuantity,
  726. },
  727. },
  728. },
  729. {
  730. Name: string(uuid.NewUUID()),
  731. Resources: v1.ResourceRequirements{
  732. Limits: v1.ResourceList{
  733. v1.ResourceName(res1.resourceName): res1.resourceQuantity,
  734. },
  735. },
  736. },
  737. },
  738. Containers: []v1.Container{
  739. {
  740. Name: string(uuid.NewUUID()),
  741. Resources: v1.ResourceRequirements{
  742. Limits: v1.ResourceList{
  743. v1.ResourceName(res1.resourceName): res2.resourceQuantity,
  744. v1.ResourceName(res2.resourceName): res2.resourceQuantity,
  745. },
  746. },
  747. },
  748. {
  749. Name: string(uuid.NewUUID()),
  750. Resources: v1.ResourceRequirements{
  751. Limits: v1.ResourceList{
  752. v1.ResourceName(res1.resourceName): res2.resourceQuantity,
  753. v1.ResourceName(res2.resourceName): res2.resourceQuantity,
  754. },
  755. },
  756. },
  757. },
  758. },
  759. }
  760. podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers})
  761. err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: podWithPluginResourcesInInitContainers})
  762. as.Nil(err)
  763. podUID := string(podWithPluginResourcesInInitContainers.UID)
  764. initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name
  765. initCont2 := podWithPluginResourcesInInitContainers.Spec.InitContainers[1].Name
  766. normalCont1 := podWithPluginResourcesInInitContainers.Spec.Containers[0].Name
  767. normalCont2 := podWithPluginResourcesInInitContainers.Spec.Containers[1].Name
  768. initCont1Devices := testManager.podDevices.containerDevices(podUID, initCont1, res1.resourceName)
  769. initCont2Devices := testManager.podDevices.containerDevices(podUID, initCont2, res1.resourceName)
  770. normalCont1Devices := testManager.podDevices.containerDevices(podUID, normalCont1, res1.resourceName)
  771. normalCont2Devices := testManager.podDevices.containerDevices(podUID, normalCont2, res1.resourceName)
  772. as.Equal(1, initCont1Devices.Len())
  773. as.Equal(2, initCont2Devices.Len())
  774. as.Equal(1, normalCont1Devices.Len())
  775. as.Equal(1, normalCont2Devices.Len())
  776. as.True(initCont2Devices.IsSuperset(initCont1Devices))
  777. as.True(initCont2Devices.IsSuperset(normalCont1Devices))
  778. as.True(initCont2Devices.IsSuperset(normalCont2Devices))
  779. as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len())
  780. }
  781. func TestSanitizeNodeAllocatable(t *testing.T) {
  782. resourceName1 := "domain1.com/resource1"
  783. devID1 := "dev1"
  784. resourceName2 := "domain2.com/resource2"
  785. devID2 := "dev2"
  786. as := assert.New(t)
  787. monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
  788. tmpDir, err := ioutil.TempDir("", "checkpoint")
  789. as.Nil(err)
  790. ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
  791. as.Nil(err)
  792. testManager := &ManagerImpl{
  793. callback: monitorCallback,
  794. allocatedDevices: make(map[string]sets.String),
  795. healthyDevices: make(map[string]sets.String),
  796. podDevices: make(podDevices),
  797. checkpointManager: ckm,
  798. }
  799. // require one of resource1 and one of resource2
  800. testManager.allocatedDevices[resourceName1] = sets.NewString()
  801. testManager.allocatedDevices[resourceName1].Insert(devID1)
  802. testManager.allocatedDevices[resourceName2] = sets.NewString()
  803. testManager.allocatedDevices[resourceName2].Insert(devID2)
  804. cachedNode := &v1.Node{
  805. Status: v1.NodeStatus{
  806. Allocatable: v1.ResourceList{
  807. // has no resource1 and two of resource2
  808. v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
  809. },
  810. },
  811. }
  812. nodeInfo := &schedulernodeinfo.NodeInfo{}
  813. nodeInfo.SetNode(cachedNode)
  814. testManager.sanitizeNodeAllocatable(nodeInfo)
  815. allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
  816. // allocatable in nodeInfo is less than needed, should update
  817. as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
  818. // allocatable in nodeInfo is more than needed, should skip updating
  819. as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
  820. }
  821. func TestDevicePreStartContainer(t *testing.T) {
  822. // Ensures that if device manager is indicated to invoke `PreStartContainer` RPC
  823. // by device plugin, then device manager invokes PreStartContainer at endpoint interface.
  824. // Also verifies that final allocation of mounts, envs etc is same as expected.
  825. res1 := TestResource{
  826. resourceName: "domain1.com/resource1",
  827. resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
  828. devs: []string{"dev1", "dev2"},
  829. }
  830. as := require.New(t)
  831. podsStub := activePodsStub{
  832. activePods: []*v1.Pod{},
  833. }
  834. tmpDir, err := ioutil.TempDir("", "checkpoint")
  835. as.Nil(err)
  836. defer os.RemoveAll(tmpDir)
  837. nodeInfo := getTestNodeInfo(v1.ResourceList{})
  838. testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1})
  839. as.Nil(err)
  840. ch := make(chan []string, 1)
  841. testManager.endpoints[res1.resourceName] = endpointInfo{
  842. e: &MockEndpoint{
  843. initChan: ch,
  844. allocateFunc: allocateStubFunc(),
  845. },
  846. opts: &pluginapi.DevicePluginOptions{PreStartRequired: true},
  847. }
  848. pod := makePod(v1.ResourceList{
  849. v1.ResourceName(res1.resourceName): res1.resourceQuantity})
  850. activePods := []*v1.Pod{}
  851. activePods = append(activePods, pod)
  852. podsStub.updateActivePods(activePods)
  853. err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
  854. as.Nil(err)
  855. runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
  856. as.Nil(err)
  857. var initializedDevs []string
  858. select {
  859. case <-time.After(time.Second):
  860. t.Fatalf("Timed out while waiting on channel for response from PreStartContainer RPC stub")
  861. case initializedDevs = <-ch:
  862. break
  863. }
  864. as.Contains(initializedDevs, "dev1")
  865. as.Contains(initializedDevs, "dev2")
  866. as.Equal(len(initializedDevs), len(res1.devs))
  867. expectedResps, err := allocateStubFunc()([]string{"dev1", "dev2"})
  868. as.Nil(err)
  869. as.Equal(1, len(expectedResps.ContainerResponses))
  870. expectedResp := expectedResps.ContainerResponses[0]
  871. as.Equal(len(runContainerOpts.Devices), len(expectedResp.Devices))
  872. as.Equal(len(runContainerOpts.Mounts), len(expectedResp.Mounts))
  873. as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs))
  874. }
  875. func TestResetExtendedResource(t *testing.T) {
  876. as := assert.New(t)
  877. tmpDir, err := ioutil.TempDir("", "checkpoint")
  878. as.Nil(err)
  879. ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
  880. as.Nil(err)
  881. testManager := &ManagerImpl{
  882. endpoints: make(map[string]endpointInfo),
  883. healthyDevices: make(map[string]sets.String),
  884. unhealthyDevices: make(map[string]sets.String),
  885. allocatedDevices: make(map[string]sets.String),
  886. podDevices: make(podDevices),
  887. checkpointManager: ckm,
  888. }
  889. extendedResourceName := "domain.com/resource"
  890. testManager.podDevices.insert("pod", "con", extendedResourceName,
  891. constructDevices([]string{"dev1"}),
  892. constructAllocResp(map[string]string{"/dev/dev1": "/dev/dev1"},
  893. map[string]string{"/home/lib1": "/usr/lib1"}, map[string]string{}))
  894. testManager.healthyDevices[extendedResourceName] = sets.NewString()
  895. testManager.healthyDevices[extendedResourceName].Insert("dev1")
  896. // checkpoint is present, indicating node hasn't been recreated
  897. err = testManager.writeCheckpoint()
  898. as.Nil(err)
  899. as.False(testManager.ShouldResetExtendedResourceCapacity())
  900. // checkpoint is absent, representing node recreation
  901. ckpts, err := ckm.ListCheckpoints()
  902. as.Nil(err)
  903. for _, ckpt := range ckpts {
  904. err = ckm.RemoveCheckpoint(ckpt)
  905. as.Nil(err)
  906. }
  907. as.True(testManager.ShouldResetExtendedResourceCapacity())
  908. }
  909. func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) {
  910. return func(devs []string) (*pluginapi.AllocateResponse, error) {
  911. resp := new(pluginapi.ContainerAllocateResponse)
  912. resp.Envs = make(map[string]string)
  913. for _, dev := range devs {
  914. switch dev {
  915. case "dev1":
  916. resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
  917. ContainerPath: "/dev/aaa",
  918. HostPath: "/dev/aaa",
  919. Permissions: "mrw",
  920. })
  921. resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
  922. ContainerPath: "/dev/bbb",
  923. HostPath: "/dev/bbb",
  924. Permissions: "mrw",
  925. })
  926. resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
  927. ContainerPath: "/container_dir1/file1",
  928. HostPath: "host_dir1/file1",
  929. ReadOnly: true,
  930. })
  931. case "dev2":
  932. resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
  933. ContainerPath: "/dev/ccc",
  934. HostPath: "/dev/ccc",
  935. Permissions: "mrw",
  936. })
  937. resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
  938. ContainerPath: "/container_dir1/file2",
  939. HostPath: "host_dir1/file2",
  940. ReadOnly: true,
  941. })
  942. resp.Envs["key1"] = "val1"
  943. }
  944. }
  945. resps := new(pluginapi.AllocateResponse)
  946. resps.ContainerResponses = append(resps.ContainerResponses, resp)
  947. return resps, nil
  948. }
  949. }