123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- /*
- Copyright 2019 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package reconciler
- import (
- "fmt"
- "io/ioutil"
- "os"
- "testing"
- "time"
- "github.com/stretchr/testify/require"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/tools/record"
- registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
- "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
- "k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
- "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
- )
// reconcilerLoopSleepDuration is how long the reconciler loop pauses between
// two consecutive passes. The tests in this file use one nanosecond.
const reconcilerLoopSleepDuration time.Duration = time.Nanosecond
// socketDir is the directory in which the tests create their plugin sockets.
// It starts out empty and is assigned by init().
var socketDir string

// supportedVersions lists the versions handed to the example plugin that the
// tests in this file construct.
var supportedVersions = []string{
	"v1beta1",
	"v1beta2",
}
- func init() {
- d, err := ioutil.TempDir("", "reconciler_test")
- if err != nil {
- panic(fmt.Sprintf("Could not create a temp directory: %s", d))
- }
- socketDir = d
- }
- func cleanup(t *testing.T) {
- require.NoError(t, os.RemoveAll(socketDir))
- os.MkdirAll(socketDir, 0755)
- }
- func runReconciler(reconciler Reconciler) {
- go reconciler.Run(wait.NeverStop)
- }
- func waitForRegistration(
- t *testing.T,
- socketPath string,
- previousTimestamp time.Time,
- asw cache.ActualStateOfWorld) {
- err := retryWithExponentialBackOff(
- time.Duration(500*time.Millisecond),
- func() (bool, error) {
- registeredPlugins := asw.GetRegisteredPlugins()
- for _, plugin := range registeredPlugins {
- if plugin.SocketPath == socketPath && plugin.Timestamp.After(previousTimestamp) {
- return true, nil
- }
- }
- return false, nil
- },
- )
- if err != nil {
- t.Fatalf("Timed out waiting for plugin to be registered:\n%s.", socketPath)
- }
- }
- func waitForUnregistration(
- t *testing.T,
- socketPath string,
- asw cache.ActualStateOfWorld) {
- err := retryWithExponentialBackOff(
- time.Duration(500*time.Millisecond),
- func() (bool, error) {
- registeredPlugins := asw.GetRegisteredPlugins()
- for _, plugin := range registeredPlugins {
- if plugin.SocketPath == socketPath {
- return false, nil
- }
- }
- return true, nil
- },
- )
- if err != nil {
- t.Fatalf("Timed out waiting for plugin to be unregistered:\n%s.", socketPath)
- }
- }
- func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
- backoff := wait.Backoff{
- Duration: initialDuration,
- Factor: 3,
- Jitter: 0,
- Steps: 6,
- }
- return wait.ExponentialBackoff(backoff, fn)
- }
// DummyImpl is a no-op handler that the tests in this file register with the
// reconciler as a plugin handler. It carries no state.
type DummyImpl struct{}

// NewDummyImpl returns a pointer to a new DummyImpl.
func NewDummyImpl() *DummyImpl {
	return new(DummyImpl)
}

// ValidatePlugin ignores its arguments and always returns nil.
func (*DummyImpl) ValidatePlugin(pluginName, endpoint string, versions []string) error {
	return nil
}

// RegisterPlugin ignores its arguments and always returns nil.
func (*DummyImpl) RegisterPlugin(pluginName, endpoint string, versions []string) error {
	return nil
}

// DeRegisterPlugin ignores its argument and does nothing.
func (*DummyImpl) DeRegisterPlugin(pluginName string) {}
- // Calls Run()
- // Verifies that asw and dsw have no plugins
- func Test_Run_Positive_DoNothing(t *testing.T) {
- defer cleanup(t)
- dsw := cache.NewDesiredStateOfWorld()
- asw := cache.NewActualStateOfWorld()
- fakeRecorder := &record.FakeRecorder{}
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- fakeRecorder,
- ))
- reconciler := NewReconciler(
- oex,
- reconcilerLoopSleepDuration,
- dsw,
- asw,
- )
- // Act
- runReconciler(reconciler)
- // Get dsw and asw plugins; they should both be empty
- if len(asw.GetRegisteredPlugins()) != 0 {
- t.Fatalf("Test_Run_Positive_DoNothing: actual state of world should be empty but it's not")
- }
- if len(dsw.GetPluginsToRegister()) != 0 {
- t.Fatalf("Test_Run_Positive_DoNothing: desired state of world should be empty but it's not")
- }
- }
- // Populates desiredStateOfWorld cache with one plugin.
- // Calls Run()
- // Verifies the actual state of world contains that plugin
- func Test_Run_Positive_Register(t *testing.T) {
- defer cleanup(t)
- dsw := cache.NewDesiredStateOfWorld()
- asw := cache.NewActualStateOfWorld()
- di := NewDummyImpl()
- fakeRecorder := &record.FakeRecorder{}
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- fakeRecorder,
- ))
- reconciler := NewReconciler(
- oex,
- reconcilerLoopSleepDuration,
- dsw,
- asw,
- )
- reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
- // Start the reconciler to fill ASW.
- stopChan := make(chan struct{})
- defer close(stopChan)
- go reconciler.Run(stopChan)
- socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
- pluginName := fmt.Sprintf("example-plugin")
- p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
- require.NoError(t, p.Serve("v1beta1", "v1beta2"))
- timestampBeforeRegistration := time.Now()
- dsw.AddOrUpdatePlugin(socketPath)
- waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
- // Get asw plugins; it should contain the added plugin
- aswPlugins := asw.GetRegisteredPlugins()
- if len(aswPlugins) != 1 {
- t.Fatalf("Test_Run_Positive_Register: actual state of world length should be one but it's %d", len(aswPlugins))
- }
- if aswPlugins[0].SocketPath != socketPath {
- t.Fatalf("Test_Run_Positive_Register: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
- }
- }
- // Populates desiredStateOfWorld cache with one plugin
- // Calls Run()
- // Verifies there is one plugin now in actual state of world.
- // Deletes plugin from desired state of world.
- // Verifies that plugin no longer exists in actual state of world.
- func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
- defer cleanup(t)
- dsw := cache.NewDesiredStateOfWorld()
- asw := cache.NewActualStateOfWorld()
- di := NewDummyImpl()
- fakeRecorder := &record.FakeRecorder{}
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- fakeRecorder,
- ))
- reconciler := NewReconciler(
- oex,
- reconcilerLoopSleepDuration,
- dsw,
- asw,
- )
- reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
- // Start the reconciler to fill ASW.
- stopChan := make(chan struct{})
- defer close(stopChan)
- go reconciler.Run(stopChan)
- socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
- pluginName := fmt.Sprintf("example-plugin")
- p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
- require.NoError(t, p.Serve("v1beta1", "v1beta2"))
- timestampBeforeRegistration := time.Now()
- dsw.AddOrUpdatePlugin(socketPath)
- waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
- // Get asw plugins; it should contain the added plugin
- aswPlugins := asw.GetRegisteredPlugins()
- if len(aswPlugins) != 1 {
- t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
- }
- if aswPlugins[0].SocketPath != socketPath {
- t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
- }
- dsw.RemovePlugin(socketPath)
- waitForUnregistration(t, socketPath, asw)
- // Get asw plugins; it should no longer contain the added plugin
- aswPlugins = asw.GetRegisteredPlugins()
- if len(aswPlugins) != 0 {
- t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be zero but it's %d", len(aswPlugins))
- }
- }
- // Populates desiredStateOfWorld cache with one plugin
- // Calls Run()
- // Then update the timestamp of the plugin
- // Verifies that the plugin is reregistered.
- // Verifies the plugin with updated timestamp now in actual state of world.
- func Test_Run_Positive_ReRegister(t *testing.T) {
- defer cleanup(t)
- dsw := cache.NewDesiredStateOfWorld()
- asw := cache.NewActualStateOfWorld()
- di := NewDummyImpl()
- fakeRecorder := &record.FakeRecorder{}
- oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
- fakeRecorder,
- ))
- reconciler := NewReconciler(
- oex,
- reconcilerLoopSleepDuration,
- dsw,
- asw,
- )
- reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
- // Start the reconciler to fill ASW.
- stopChan := make(chan struct{})
- defer close(stopChan)
- go reconciler.Run(stopChan)
- socketPath := fmt.Sprintf("%s/plugin2.sock", socketDir)
- pluginName := fmt.Sprintf("example-plugin2")
- p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
- require.NoError(t, p.Serve("v1beta1", "v1beta2"))
- timestampBeforeRegistration := time.Now()
- dsw.AddOrUpdatePlugin(socketPath)
- waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
- timeStampBeforeReRegistration := time.Now()
- // Add the plugin again to update the timestamp
- dsw.AddOrUpdatePlugin(socketPath)
- // This should trigger a deregistration and a regitration
- // The process of unregistration and reregistration can happen so fast that
- // we are not able to catch it with waitForUnregistration, so here we are checking
- // the plugin has an updated timestamp.
- waitForRegistration(t, socketPath, timeStampBeforeReRegistration, asw)
- // Get asw plugins; it should contain the added plugin
- aswPlugins := asw.GetRegisteredPlugins()
- if len(aswPlugins) != 1 {
- t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
- }
- if aswPlugins[0].SocketPath != socketPath {
- t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
- }
- }
|