/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package devicemanager

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"reflect"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/record"
	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
	watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
	"k8s.io/kubernetes/pkg/kubelet/config"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

const (
	testResourceName = "fake-domain/resource"
)

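// tmpSocketDir creates a temporary directory and returns the paths used for
// the device manager socket (server.sock) and the device plugin stub socket
// (device-plugin.sock) inside it.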
func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error) {
	socketDir, err = ioutil.TempDir("", "device_plugin")
	if err != nil {
		return
	}
	socketName = socketDir + "/server.sock"
	pluginSocketName = socketDir + "/device-plugin.sock"
	err = os.MkdirAll(socketDir, 0755)
	return
}

func TestNewManagerImpl(t *testing.T) {
	socketDir, socketName, _, err := tmpSocketDir()
	require.NoError(t, err)
	defer os.RemoveAll(socketDir)
	_, err = newManagerImpl(socketName)
	require.NoError(t, err)
	os.RemoveAll(socketDir)
}

func TestNewManagerImplStart(t *testing.T) {
	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
	require.NoError(t, err)
	defer os.RemoveAll(socketDir)
	m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
	cleanup(t, m, p)
	// Stop should tolerate being called more than once.
	cleanup(t, m, p)
}

func TestNewManagerImplStartProbeMode(t *testing.T) {
	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
	require.NoError(t, err)
	defer os.RemoveAll(socketDir)
	m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
	cleanup(t, m, p)
}

// Tests that the device plugin manager correctly handles registration and re-registration:
// after registration the devices are updated, a re-registration does NOT delete existing
// devices, and no orphaned devices are left behind.
func TestDevicePluginReRegistration(t *testing.T) {
	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
	require.NoError(t, err)
	defer os.RemoveAll(socketDir)
	devs := []*pluginapi.Device{
		{ID: "Dev1", Health: pluginapi.Healthy},
		{ID: "Dev2", Health: pluginapi.Healthy},
	}
	devsForRegistration := []*pluginapi.Device{
		{ID: "Dev3", Health: pluginapi.Healthy},
	}
	for _, preStartContainerFlag := range []bool{false, true} {
		m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName)
		p1.Register(socketName, testResourceName, "")
		select {
		case <-ch:
		case <-time.After(5 * time.Second):
			t.Fatalf("timeout while waiting for manager update")
		}
		capacity, allocatable, _ := m.GetCapacity()
		resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)]
		resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)]
		require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
		require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")

		p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag)
		err = p2.Start()
		require.NoError(t, err)
		p2.Register(socketName, testResourceName, "")
		select {
		case <-ch:
		case <-time.After(5 * time.Second):
			t.Fatalf("timeout while waiting for manager update")
		}
		capacity, allocatable, _ = m.GetCapacity()
		resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
		resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
		require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
		require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")

		// Test the scenario that a plugin re-registers with different devices.
		p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag)
		err = p3.Start()
		require.NoError(t, err)
		p3.Register(socketName, testResourceName, "")
		select {
		case <-ch:
		case <-time.After(5 * time.Second):
			t.Fatalf("timeout while waiting for manager update")
		}
		capacity, allocatable, _ = m.GetCapacity()
		resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
		resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
		require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
		require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of the previously registered plugin should be removed.")
		p2.Stop()
		p3.Stop()
		cleanup(t, m, p1)
	}
}

// Tests that the device plugin manager correctly handles registration and re-registration:
// after registration the devices are updated, a re-registration does NOT delete existing
// devices, and no orphaned devices are left behind. In this variant, plugin discovery and
// registration are done through the kubelet's probe-based (plugin watcher) mechanism.
func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
	require.NoError(t, err)
	defer os.RemoveAll(socketDir)
	devs := []*pluginapi.Device{
		{ID: "Dev1", Health: pluginapi.Healthy},
		{ID: "Dev2", Health: pluginapi.Healthy},
	}
	devsForRegistration := []*pluginapi.Device{
		{ID: "Dev3", Health: pluginapi.Healthy},
	}
	m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)

	// Wait for the first callback to be issued.
	select {
	case <-ch:
	case <-time.After(5 * time.Second):
		t.FailNow()
	}
	capacity, allocatable, _ := m.GetCapacity()
	resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)]
	resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)]
	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")

	p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false)
	err = p2.Start()
	require.NoError(t, err)
	// Wait for the second callback to be issued.
	select {
	case <-ch:
	case <-time.After(5 * time.Second):
		t.FailNow()
	}
	capacity, allocatable, _ = m.GetCapacity()
	resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
	resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")

	// Test the scenario that a plugin re-registers with different devices.
	p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false)
	err = p3.Start()
	require.NoError(t, err)
	// Wait for the third callback to be issued.
	select {
	case <-ch:
	case <-time.After(5 * time.Second):
		t.FailNow()
	}
	capacity, allocatable, _ = m.GetCapacity()
	resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
	resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
	require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of the previously registered plugin should be removed")
	p2.Stop()
	p3.Stop()
	cleanup(t, m, p1)
}

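// setupDeviceManager creates and starts a device manager listening on socketName,
// wrapping its callback so that every device update is also signalled on the
// returned channel.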
func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) {
	m, err := newManagerImpl(socketName)
	require.NoError(t, err)
	updateChan := make(chan interface{})

	if callback != nil {
		m.callback = callback
	}

	originalCallback := m.callback
	m.callback = func(resourceName string, devices []pluginapi.Device) {
		originalCallback(resourceName, devices)
		updateChan <- new(interface{})
	}
	activePods := func() []*v1.Pod {
		return []*v1.Pod{}
	}
	err = m.Start(activePods, &sourcesReadyStub{})
	require.NoError(t, err)

	return m, updateChan
}

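// setupDevicePlugin starts a device plugin stub that serves the given devices
// on pluginSocketName.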
func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *Stub {
	p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false)
	err := p.Start()
	require.NoError(t, err)
	return p
}

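// setupPluginManager starts a plugin manager watching the directory of
// pluginSocketName and registers the device manager's plugin watcher handler with it.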
func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) pluginmanager.PluginManager {
	pluginManager := pluginmanager.NewPluginManager(
		filepath.Dir(pluginSocketName), /* sockDir */
		"",                             /* deprecatedSockDir */
		&record.FakeRecorder{},
	)

	runPluginManager(pluginManager)
	pluginManager.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
	return pluginManager
}

func runPluginManager(pluginManager pluginmanager.PluginManager) {
	sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
	go pluginManager.Run(sourcesReady, wait.NeverStop)
}

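// setup starts a device manager and a device plugin stub; the caller is
// responsible for registering the plugin with the manager.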
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) {
	m, updateChan := setupDeviceManager(t, devs, callback, socketName)
	p := setupDevicePlugin(t, devs, pluginSocketName)
	return m, updateChan, p
}

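// setupInProbeMode additionally starts a plugin manager so that the device plugin
// stub is discovered through the kubelet's probe-based plugin watcher.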
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, pluginmanager.PluginManager) {
	m, updateChan := setupDeviceManager(t, devs, callback, socketName)
	pm := setupPluginManager(t, pluginSocketName, m)
	p := setupDevicePlugin(t, devs, pluginSocketName)
	return m, updateChan, p, pm
}

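// cleanup stops the device plugin stub and then the device manager.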
func cleanup(t *testing.T, m Manager, p *Stub) {
	p.Stop()
	m.Stop()
}

func TestUpdateCapacityAllocatable(t *testing.T) {
	socketDir, socketName, _, err := tmpSocketDir()
	require.NoError(t, err)
	defer os.RemoveAll(socketDir)
	testManager, err := newManagerImpl(socketName)
	as := assert.New(t)
	as.NotNil(testManager)
	as.Nil(err)

	devs := []pluginapi.Device{
		{ID: "Device1", Health: pluginapi.Healthy},
		{ID: "Device2", Health: pluginapi.Healthy},
		{ID: "Device3", Health: pluginapi.Unhealthy},
	}
	callback := testManager.genericDeviceUpdateCallback

	// Adds three devices for resource1, two healthy and one unhealthy.
	// Expects capacity for resource1 to be 3 and allocatable to be 2.
	resourceName1 := "domain1.com/resource1"
	e1 := &endpointImpl{}
	testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil}
	callback(resourceName1, devs)
	capacity, allocatable, removedResources := testManager.GetCapacity()
	resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
	as.True(ok)
	resource1Allocatable, ok := allocatable[v1.ResourceName(resourceName1)]
	as.True(ok)
	as.Equal(int64(3), resource1Capacity.Value())
	as.Equal(int64(2), resource1Allocatable.Value())
	as.Equal(0, len(removedResources))

	// Deleting an unhealthy device should NOT change allocatable but should reduce capacity.
	devs1 := devs[:len(devs)-1]
	callback(resourceName1, devs1)
	capacity, allocatable, removedResources = testManager.GetCapacity()
	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
	as.True(ok)
	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
	as.True(ok)
	as.Equal(int64(2), resource1Capacity.Value())
	as.Equal(int64(2), resource1Allocatable.Value())
	as.Equal(0, len(removedResources))

	// Marking a healthy device unhealthy should reduce allocatable by 1.
	devs[1].Health = pluginapi.Unhealthy
	callback(resourceName1, devs)
	capacity, allocatable, removedResources = testManager.GetCapacity()
	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
	as.True(ok)
	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
	as.True(ok)
	as.Equal(int64(3), resource1Capacity.Value())
	as.Equal(int64(1), resource1Allocatable.Value())
	as.Equal(0, len(removedResources))

	// Deleting a healthy device should reduce capacity and allocatable by 1.
	devs2 := devs[1:]
	callback(resourceName1, devs2)
	capacity, allocatable, removedResources = testManager.GetCapacity()
	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
	as.True(ok)
	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
	as.True(ok)
	as.Equal(int64(0), resource1Allocatable.Value())
	as.Equal(int64(2), resource1Capacity.Value())
	as.Equal(0, len(removedResources))

	// Tests adding another resource.
	resourceName2 := "resource2"
	e2 := &endpointImpl{}
	testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil}
	callback(resourceName2, devs)
	capacity, allocatable, removedResources = testManager.GetCapacity()
	as.Equal(2, len(capacity))
	resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
	as.True(ok)
	resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)]
	as.True(ok)
	as.Equal(int64(3), resource2Capacity.Value())
	as.Equal(int64(1), resource2Allocatable.Value())
	as.Equal(0, len(removedResources))

	// Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
	// is removed from capacity and it no longer exists in healthyDevices after the call.
	e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second))
	capacity, allocatable, removed := testManager.GetCapacity()
	as.Equal([]string{resourceName1}, removed)
	_, ok = capacity[v1.ResourceName(resourceName1)]
	as.False(ok)
	val, ok := capacity[v1.ResourceName(resourceName2)]
	as.True(ok)
	as.Equal(int64(3), val.Value())
	_, ok = testManager.healthyDevices[resourceName1]
	as.False(ok)
	_, ok = testManager.unhealthyDevices[resourceName1]
	as.False(ok)
	_, ok = testManager.endpoints[resourceName1]
	as.False(ok)
	as.Equal(1, len(testManager.endpoints))

	// Stops resourceName2 endpoint. Verifies its stopTime is set, and that allocate and
	// preStartContainer calls return errors.
	e2.stop()
	as.False(e2.stopTime.IsZero())
	_, err = e2.allocate([]string{"Device1"})
	as.True(reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)))
	_, err = e2.preStartContainer([]string{"Device1"})
	as.True(reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)))

	// Marks resourceName2 unhealthy and verifies its capacity/allocatable are
	// correctly updated.
	testManager.markResourceUnhealthy(resourceName2)
	capacity, allocatable, removed = testManager.GetCapacity()
	val, ok = capacity[v1.ResourceName(resourceName2)]
	as.True(ok)
	as.Equal(int64(3), val.Value())
	val, ok = allocatable[v1.ResourceName(resourceName2)]
	as.True(ok)
	as.Equal(int64(0), val.Value())
	as.Empty(removed)

	// Writes and re-reads checkpoints. Verifies we create a stopped endpoint
	// for resourceName2, its capacity is set to zero, and we still consider
	// it as a DevicePlugin resource. This makes sure any pod that was scheduled
	// during the time of propagating capacity change to the scheduler will be
	// properly rejected instead of being incorrectly started.
	err = testManager.writeCheckpoint()
	as.Nil(err)
	testManager.healthyDevices = make(map[string]sets.String)
	testManager.unhealthyDevices = make(map[string]sets.String)
	err = testManager.readCheckpoint()
	as.Nil(err)
	as.Equal(1, len(testManager.endpoints))
	_, ok = testManager.endpoints[resourceName2]
	as.True(ok)
	capacity, allocatable, removed = testManager.GetCapacity()
	val, ok = capacity[v1.ResourceName(resourceName2)]
	as.True(ok)
	as.Equal(int64(0), val.Value())
	as.Empty(removed)
	as.True(testManager.isDevicePluginResource(resourceName2))
}

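// constructDevices returns a string set containing the given device IDs.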
func constructDevices(devices []string) sets.String {
	ret := sets.NewString()
	for _, dev := range devices {
		ret.Insert(dev)
	}
	return ret
}

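// constructAllocResp builds a ContainerAllocateResponse from maps of device specs
// (host path to container path), mounts (container path to host path), and
// environment variables.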
func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.ContainerAllocateResponse {
	resp := &pluginapi.ContainerAllocateResponse{}
	for k, v := range devices {
		resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
			HostPath:      k,
			ContainerPath: v,
			Permissions:   "mrw",
		})
	}
	for k, v := range mounts {
		resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
			ContainerPath: k,
			HostPath:      v,
			ReadOnly:      true,
		})
	}
	resp.Envs = make(map[string]string)
	for k, v := range envs {
		resp.Envs[k] = v
	}
	return resp
}

func TestCheckpoint(t *testing.T) {
	resourceName1 := "domain1.com/resource1"
	resourceName2 := "domain2.com/resource2"
	as := assert.New(t)
	tmpDir, err := ioutil.TempDir("", "checkpoint")
	as.Nil(err)
	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
	as.Nil(err)
	testManager := &ManagerImpl{
		endpoints:         make(map[string]endpointInfo),
		healthyDevices:    make(map[string]sets.String),
		unhealthyDevices:  make(map[string]sets.String),
		allocatedDevices:  make(map[string]sets.String),
		podDevices:        make(podDevices),
		checkpointManager: ckm,
	}

	testManager.podDevices.insert("pod1", "con1", resourceName1,
		constructDevices([]string{"dev1", "dev2"}),
		constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"},
			map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
	testManager.podDevices.insert("pod1", "con1", resourceName2,
		constructDevices([]string{"dev1", "dev2"}),
		constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"},
			map[string]string{"/home/r2lib1": "/usr/r2lib1"},
			map[string]string{"r2devices": "dev1 dev2"}))
	testManager.podDevices.insert("pod1", "con2", resourceName1,
		constructDevices([]string{"dev3"}),
		constructAllocResp(map[string]string{"/dev/r1dev3": "/dev/r1dev3"},
			map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
	testManager.podDevices.insert("pod2", "con1", resourceName1,
		constructDevices([]string{"dev4"}),
		constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
			map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))

	testManager.healthyDevices[resourceName1] = sets.NewString()
	testManager.healthyDevices[resourceName1].Insert("dev1")
	testManager.healthyDevices[resourceName1].Insert("dev2")
	testManager.healthyDevices[resourceName1].Insert("dev3")
	testManager.healthyDevices[resourceName1].Insert("dev4")
	testManager.healthyDevices[resourceName1].Insert("dev5")
	testManager.healthyDevices[resourceName2] = sets.NewString()
	testManager.healthyDevices[resourceName2].Insert("dev1")
	testManager.healthyDevices[resourceName2].Insert("dev2")

	expectedPodDevices := testManager.podDevices
	expectedAllocatedDevices := testManager.podDevices.devices()
	expectedAllDevices := testManager.healthyDevices

	err = testManager.writeCheckpoint()
	as.Nil(err)
	testManager.podDevices = make(podDevices)
	err = testManager.readCheckpoint()
	as.Nil(err)

	as.Equal(len(expectedPodDevices), len(testManager.podDevices))
	for podUID, containerDevices := range expectedPodDevices {
		for conName, resources := range containerDevices {
			for resource := range resources {
				expDevices := expectedPodDevices.containerDevices(podUID, conName, resource)
				testDevices := testManager.podDevices.containerDevices(podUID, conName, resource)
				as.True(reflect.DeepEqual(expDevices, testDevices))
				opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
				opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName)
				as.Equal(len(opts1.Envs), len(opts2.Envs))
				as.Equal(len(opts1.Mounts), len(opts2.Mounts))
				as.Equal(len(opts1.Devices), len(opts2.Devices))
			}
		}
	}
	as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
	as.True(reflect.DeepEqual(expectedAllDevices, testManager.healthyDevices))
}

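// activePodsStub provides a mutable ActivePodsFunc implementation for tests.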
type activePodsStub struct {
	activePods []*v1.Pod
}

func (a *activePodsStub) getActivePods() []*v1.Pod {
	return a.activePods
}

func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
	a.activePods = newPods
}

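// MockEndpoint is a fake endpoint: its allocate behavior can be customized through
// allocateFunc, and preStartContainer reports the device IDs it was called with on initChan.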
type MockEndpoint struct {
	allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error)
	initChan     chan []string
}

func (m *MockEndpoint) stop() {}
func (m *MockEndpoint) run()  {}

func (m *MockEndpoint) callback(resourceName string, devices []pluginapi.Device) {}

func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
	m.initChan <- devs
	return &pluginapi.PreStartContainerResponse{}, nil
}

func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
	if m.allocateFunc != nil {
		return m.allocateFunc(devs)
	}
	return nil, nil
}

func (m *MockEndpoint) isStopped() bool { return false }

func (m *MockEndpoint) stopGracePeriodExpired() bool { return false }

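// makePod returns a pod with a single container whose resource limits are set
// to the given list.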
func makePod(limits v1.ResourceList) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: uuid.NewUUID(),
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Resources: v1.ResourceRequirements{
						Limits: limits,
					},
				},
			},
		},
	}
}

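// getTestManager builds a ManagerImpl wired with mock endpoints for the given
// test resources, checkpointing under tmpDir.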
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*ManagerImpl, error) {
	monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
	if err != nil {
		return nil, err
	}
	testManager := &ManagerImpl{
		socketdir:         tmpDir,
		callback:          monitorCallback,
		healthyDevices:    make(map[string]sets.String),
		unhealthyDevices:  make(map[string]sets.String),
		allocatedDevices:  make(map[string]sets.String),
		endpoints:         make(map[string]endpointInfo),
		podDevices:        make(podDevices),
		activePods:        activePods,
		sourcesReady:      &sourcesReadyStub{},
		checkpointManager: ckm,
	}

	for _, res := range testRes {
		testManager.healthyDevices[res.resourceName] = sets.NewString()
		for _, dev := range res.devs {
			testManager.healthyDevices[res.resourceName].Insert(dev)
		}
		if res.resourceName == "domain1.com/resource1" {
			testManager.endpoints[res.resourceName] = endpointInfo{
				e:    &MockEndpoint{allocateFunc: allocateStubFunc()},
				opts: nil,
			}
		}
		if res.resourceName == "domain2.com/resource2" {
			testManager.endpoints[res.resourceName] = endpointInfo{
				e: &MockEndpoint{
					allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
						resp := new(pluginapi.ContainerAllocateResponse)
						resp.Envs = make(map[string]string)
						for _, dev := range devs {
							switch dev {
							case "dev3":
								resp.Envs["key2"] = "val2"
							case "dev4":
								resp.Envs["key2"] = "val3"
							}
						}
						resps := new(pluginapi.AllocateResponse)
						resps.ContainerResponses = append(resps.ContainerResponses, resp)
						return resps, nil
					},
				},
				opts: nil,
			}
		}
	}
	return testManager, nil
}

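// getTestNodeInfo returns a scheduler NodeInfo for a node with the given
// allocatable resources.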
func getTestNodeInfo(allocatable v1.ResourceList) *schedulernodeinfo.NodeInfo {
	cachedNode := &v1.Node{
		Status: v1.NodeStatus{
			Allocatable: allocatable,
		},
	}
	nodeInfo := &schedulernodeinfo.NodeInfo{}
	nodeInfo.SetNode(cachedNode)
	return nodeInfo
}

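// TestResource describes an extended resource used by the allocation tests:
// its name, the quantity requested in test pod specs, and the backing device IDs.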
type TestResource struct {
	resourceName     string
	resourceQuantity resource.Quantity
	devs             []string
}

func TestPodContainerDeviceAllocation(t *testing.T) {
	res1 := TestResource{
		resourceName:     "domain1.com/resource1",
		resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
		devs:             []string{"dev1", "dev2"},
	}
	res2 := TestResource{
		resourceName:     "domain2.com/resource2",
		resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
		devs:             []string{"dev3", "dev4"},
	}
	testResources := make([]TestResource, 0, 2)
	testResources = append(testResources, res1)
	testResources = append(testResources, res2)

	as := require.New(t)
	podsStub := activePodsStub{
		activePods: []*v1.Pod{},
	}
	tmpDir, err := ioutil.TempDir("", "checkpoint")
	as.Nil(err)
	defer os.RemoveAll(tmpDir)
	nodeInfo := getTestNodeInfo(v1.ResourceList{})
	testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
	as.Nil(err)

	testPods := []*v1.Pod{
		makePod(v1.ResourceList{
			v1.ResourceName(res1.resourceName): res1.resourceQuantity,
			v1.ResourceName("cpu"):             res1.resourceQuantity,
			v1.ResourceName(res2.resourceName): res2.resourceQuantity}),
		makePod(v1.ResourceList{
			v1.ResourceName(res1.resourceName): res2.resourceQuantity}),
		makePod(v1.ResourceList{
			v1.ResourceName(res2.resourceName): res2.resourceQuantity}),
	}
	testCases := []struct {
		description               string
		testPod                   *v1.Pod
		expectedContainerOptsLen  []int
		expectedAllocatedResName1 int
		expectedAllocatedResName2 int
		expErr                    error
	}{
		{
			description:               "Successful allocation of two Res1 resources and one Res2 resource",
			testPod:                   testPods[0],
			expectedContainerOptsLen:  []int{3, 2, 2},
			expectedAllocatedResName1: 2,
			expectedAllocatedResName2: 1,
			expErr:                    nil,
		},
		{
			description:               "Requesting to create a pod without enough resources should fail",
			testPod:                   testPods[1],
			expectedContainerOptsLen:  nil,
			expectedAllocatedResName1: 2,
			expectedAllocatedResName2: 1,
			expErr:                    fmt.Errorf("requested number of devices unavailable for domain1.com/resource1. Requested: 1, Available: 0"),
		},
		{
			description:               "Successful allocation of all available Res1 resources and Res2 resources",
			testPod:                   testPods[2],
			expectedContainerOptsLen:  []int{0, 0, 1},
			expectedAllocatedResName1: 2,
			expectedAllocatedResName2: 2,
			expErr:                    nil,
		},
	}
	activePods := []*v1.Pod{}
	for _, testCase := range testCases {
		pod := testCase.testPod
		activePods = append(activePods, pod)
		podsStub.updateActivePods(activePods)
		err := testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
		if !reflect.DeepEqual(err, testCase.expErr) {
			t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v",
				testCase.description, testCase.expErr, err)
		}
		runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
		as.Nil(err)
		if testCase.expectedContainerOptsLen == nil {
			as.Nil(runContainerOpts)
		} else {
			as.Equal(len(runContainerOpts.Devices), testCase.expectedContainerOptsLen[0])
			as.Equal(len(runContainerOpts.Mounts), testCase.expectedContainerOptsLen[1])
			as.Equal(len(runContainerOpts.Envs), testCase.expectedContainerOptsLen[2])
		}
		as.Equal(testCase.expectedAllocatedResName1, testManager.allocatedDevices[res1.resourceName].Len())
		as.Equal(testCase.expectedAllocatedResName2, testManager.allocatedDevices[res2.resourceName].Len())
	}
}

// TestInitContainerDeviceAllocation verifies that a pod requesting resourceName1 in both
// init containers and regular containers is admitted, and that devices allocated to init
// containers are reused by the regular containers.
func TestInitContainerDeviceAllocation(t *testing.T) {
	res1 := TestResource{
		resourceName:     "domain1.com/resource1",
		resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
		devs:             []string{"dev1", "dev2"},
	}
	res2 := TestResource{
		resourceName:     "domain2.com/resource2",
		resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
		devs:             []string{"dev3", "dev4"},
	}
	testResources := make([]TestResource, 0, 2)
	testResources = append(testResources, res1)
	testResources = append(testResources, res2)

	as := require.New(t)
	podsStub := activePodsStub{
		activePods: []*v1.Pod{},
	}
	nodeInfo := getTestNodeInfo(v1.ResourceList{})
	tmpDir, err := ioutil.TempDir("", "checkpoint")
	as.Nil(err)
	defer os.RemoveAll(tmpDir)
	testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
	as.Nil(err)

	podWithPluginResourcesInInitContainers := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID: uuid.NewUUID(),
		},
		Spec: v1.PodSpec{
			InitContainers: []v1.Container{
				{
					Name: string(uuid.NewUUID()),
					Resources: v1.ResourceRequirements{
						Limits: v1.ResourceList{
							v1.ResourceName(res1.resourceName): res2.resourceQuantity,
						},
					},
				},
				{
					Name: string(uuid.NewUUID()),
					Resources: v1.ResourceRequirements{
						Limits: v1.ResourceList{
							v1.ResourceName(res1.resourceName): res1.resourceQuantity,
						},
					},
				},
			},
			Containers: []v1.Container{
				{
					Name: string(uuid.NewUUID()),
					Resources: v1.ResourceRequirements{
						Limits: v1.ResourceList{
							v1.ResourceName(res1.resourceName): res2.resourceQuantity,
							v1.ResourceName(res2.resourceName): res2.resourceQuantity,
						},
					},
				},
				{
					Name: string(uuid.NewUUID()),
					Resources: v1.ResourceRequirements{
						Limits: v1.ResourceList{
							v1.ResourceName(res1.resourceName): res2.resourceQuantity,
							v1.ResourceName(res2.resourceName): res2.resourceQuantity,
						},
					},
				},
			},
		},
	}
	podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers})
	err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: podWithPluginResourcesInInitContainers})
	as.Nil(err)

	podUID := string(podWithPluginResourcesInInitContainers.UID)
	initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name
	initCont2 := podWithPluginResourcesInInitContainers.Spec.InitContainers[1].Name
	normalCont1 := podWithPluginResourcesInInitContainers.Spec.Containers[0].Name
	normalCont2 := podWithPluginResourcesInInitContainers.Spec.Containers[1].Name
	initCont1Devices := testManager.podDevices.containerDevices(podUID, initCont1, res1.resourceName)
	initCont2Devices := testManager.podDevices.containerDevices(podUID, initCont2, res1.resourceName)
	normalCont1Devices := testManager.podDevices.containerDevices(podUID, normalCont1, res1.resourceName)
	normalCont2Devices := testManager.podDevices.containerDevices(podUID, normalCont2, res1.resourceName)
	as.Equal(1, initCont1Devices.Len())
	as.Equal(2, initCont2Devices.Len())
	as.Equal(1, normalCont1Devices.Len())
	as.Equal(1, normalCont2Devices.Len())
	as.True(initCont2Devices.IsSuperset(initCont1Devices))
	as.True(initCont2Devices.IsSuperset(normalCont1Devices))
	as.True(initCont2Devices.IsSuperset(normalCont2Devices))
	as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len())
}

func TestSanitizeNodeAllocatable(t *testing.T) {
	resourceName1 := "domain1.com/resource1"
	devID1 := "dev1"
	resourceName2 := "domain2.com/resource2"
	devID2 := "dev2"

	as := assert.New(t)
	monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
	tmpDir, err := ioutil.TempDir("", "checkpoint")
	as.Nil(err)
	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
	as.Nil(err)
	testManager := &ManagerImpl{
		callback:          monitorCallback,
		allocatedDevices:  make(map[string]sets.String),
		healthyDevices:    make(map[string]sets.String),
		podDevices:        make(podDevices),
		checkpointManager: ckm,
	}
	// require one of resource1 and one of resource2
	testManager.allocatedDevices[resourceName1] = sets.NewString()
	testManager.allocatedDevices[resourceName1].Insert(devID1)
	testManager.allocatedDevices[resourceName2] = sets.NewString()
	testManager.allocatedDevices[resourceName2].Insert(devID2)

	cachedNode := &v1.Node{
		Status: v1.NodeStatus{
			Allocatable: v1.ResourceList{
				// has no resource1 and two of resource2
				v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
			},
		},
	}
	nodeInfo := &schedulernodeinfo.NodeInfo{}
	nodeInfo.SetNode(cachedNode)

	testManager.sanitizeNodeAllocatable(nodeInfo)

	allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
	// allocatable in nodeInfo is less than needed, should update
	as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
	// allocatable in nodeInfo is more than needed, should skip updating
	as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
}

// TestDevicePreStartContainer verifies that when a device plugin requests the
// PreStartContainer RPC, the device manager invokes preStartContainer on the
// endpoint, and that the final allocation of devices, mounts, and envs matches
// the expected response.
func TestDevicePreStartContainer(t *testing.T) {
	res1 := TestResource{
		resourceName:     "domain1.com/resource1",
		resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
		devs:             []string{"dev1", "dev2"},
	}

	as := require.New(t)
	podsStub := activePodsStub{
		activePods: []*v1.Pod{},
	}
	tmpDir, err := ioutil.TempDir("", "checkpoint")
	as.Nil(err)
	defer os.RemoveAll(tmpDir)
	nodeInfo := getTestNodeInfo(v1.ResourceList{})

	testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1})
	as.Nil(err)

	ch := make(chan []string, 1)
	testManager.endpoints[res1.resourceName] = endpointInfo{
		e: &MockEndpoint{
			initChan:     ch,
			allocateFunc: allocateStubFunc(),
		},
		opts: &pluginapi.DevicePluginOptions{PreStartRequired: true},
	}
	pod := makePod(v1.ResourceList{
		v1.ResourceName(res1.resourceName): res1.resourceQuantity})
	activePods := []*v1.Pod{}
	activePods = append(activePods, pod)
	podsStub.updateActivePods(activePods)
	err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
	as.Nil(err)
	runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
	as.Nil(err)

	var initializedDevs []string
	select {
	case <-time.After(time.Second):
		t.Fatalf("Timed out while waiting on channel for response from PreStartContainer RPC stub")
	case initializedDevs = <-ch:
		break
	}

	as.Contains(initializedDevs, "dev1")
	as.Contains(initializedDevs, "dev2")
	as.Equal(len(initializedDevs), len(res1.devs))

	expectedResps, err := allocateStubFunc()([]string{"dev1", "dev2"})
	as.Nil(err)
	as.Equal(1, len(expectedResps.ContainerResponses))
	expectedResp := expectedResps.ContainerResponses[0]
	as.Equal(len(runContainerOpts.Devices), len(expectedResp.Devices))
	as.Equal(len(runContainerOpts.Mounts), len(expectedResp.Mounts))
	as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs))
}

func TestResetExtendedResource(t *testing.T) {
	as := assert.New(t)
	tmpDir, err := ioutil.TempDir("", "checkpoint")
	as.Nil(err)
	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
	as.Nil(err)
	testManager := &ManagerImpl{
		endpoints:         make(map[string]endpointInfo),
		healthyDevices:    make(map[string]sets.String),
		unhealthyDevices:  make(map[string]sets.String),
		allocatedDevices:  make(map[string]sets.String),
		podDevices:        make(podDevices),
		checkpointManager: ckm,
	}

	extendedResourceName := "domain.com/resource"
	testManager.podDevices.insert("pod", "con", extendedResourceName,
		constructDevices([]string{"dev1"}),
		constructAllocResp(map[string]string{"/dev/dev1": "/dev/dev1"},
			map[string]string{"/home/lib1": "/usr/lib1"}, map[string]string{}))

	testManager.healthyDevices[extendedResourceName] = sets.NewString()
	testManager.healthyDevices[extendedResourceName].Insert("dev1")

	// checkpoint is present, indicating node hasn't been recreated
	err = testManager.writeCheckpoint()
	as.Nil(err)
	as.False(testManager.ShouldResetExtendedResourceCapacity())

	// checkpoint is absent, representing node recreation
	ckpts, err := ckm.ListCheckpoints()
	as.Nil(err)
	for _, ckpt := range ckpts {
		err = ckm.RemoveCheckpoint(ckpt)
		as.Nil(err)
	}
	as.True(testManager.ShouldResetExtendedResourceCapacity())
}

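// allocateStubFunc returns an allocate function that builds a canned AllocateResponse
// for devices "dev1" and "dev2", including device specs, mounts, and an environment variable.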
func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) {
	return func(devs []string) (*pluginapi.AllocateResponse, error) {
		resp := new(pluginapi.ContainerAllocateResponse)
		resp.Envs = make(map[string]string)
		for _, dev := range devs {
			switch dev {
			case "dev1":
				resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
					ContainerPath: "/dev/aaa",
					HostPath:      "/dev/aaa",
					Permissions:   "mrw",
				})
				resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
					ContainerPath: "/dev/bbb",
					HostPath:      "/dev/bbb",
					Permissions:   "mrw",
				})
				resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
					ContainerPath: "/container_dir1/file1",
					HostPath:      "host_dir1/file1",
					ReadOnly:      true,
				})
			case "dev2":
				resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
					ContainerPath: "/dev/ccc",
					HostPath:      "/dev/ccc",
					Permissions:   "mrw",
				})
				resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
					ContainerPath: "/container_dir1/file2",
					HostPath:      "host_dir1/file2",
					ReadOnly:      true,
				})
				resp.Envs["key1"] = "val1"
			}
		}
		resps := new(pluginapi.AllocateResponse)
		resps.ContainerResponses = append(resps.ContainerResponses, resp)
		return resps, nil
	}
}