plugin_manager_test.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pluginmanager
import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/record"
	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
	"k8s.io/kubernetes/pkg/kubelet/config"
	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
)
  29. var (
  30. socketDir string
  31. supportedVersions = []string{"v1beta1", "v1beta2"}
  32. )
  33. type fakePluginHandler struct {
  34. validatePluginCalled bool
  35. registerPluginCalled bool
  36. deregisterPluginCalled bool
  37. sync.RWMutex
  38. }
  39. func newFakePluginHandler() *fakePluginHandler {
  40. return &fakePluginHandler{
  41. validatePluginCalled: false,
  42. registerPluginCalled: false,
  43. deregisterPluginCalled: false,
  44. }
  45. }
  46. // ValidatePlugin is a fake method
  47. func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
  48. f.Lock()
  49. defer f.Unlock()
  50. f.validatePluginCalled = true
  51. return nil
  52. }
  53. // RegisterPlugin is a fake method
  54. func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
  55. f.Lock()
  56. defer f.Unlock()
  57. f.registerPluginCalled = true
  58. return nil
  59. }
  60. // DeRegisterPlugin is a fake method
  61. func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
  62. f.Lock()
  63. defer f.Unlock()
  64. f.deregisterPluginCalled = true
  65. }
  66. func init() {
  67. d, err := ioutil.TempDir("", "plugin_manager_test")
  68. if err != nil {
  69. panic(fmt.Sprintf("Could not create a temp directory: %s", d))
  70. }
  71. socketDir = d
  72. }
  73. func cleanup(t *testing.T) {
  74. require.NoError(t, os.RemoveAll(socketDir))
  75. os.MkdirAll(socketDir, 0755)
  76. }
  77. func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler) {
  78. err := retryWithExponentialBackOff(
  79. time.Duration(500*time.Millisecond),
  80. func() (bool, error) {
  81. fakePluginHandler.Lock()
  82. defer fakePluginHandler.Unlock()
  83. if fakePluginHandler.validatePluginCalled && fakePluginHandler.registerPluginCalled {
  84. return true, nil
  85. }
  86. return false, nil
  87. },
  88. )
  89. if err != nil {
  90. t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.")
  91. }
  92. }
  93. func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  94. backoff := wait.Backoff{
  95. Duration: initialDuration,
  96. Factor: 3,
  97. Jitter: 0,
  98. Steps: 6,
  99. }
  100. return wait.ExponentialBackoff(backoff, fn)
  101. }
  102. func TestPluginRegistration(t *testing.T) {
  103. defer cleanup(t)
  104. pluginManager := newTestPluginManager(socketDir)
  105. // Start the plugin manager
  106. stopChan := make(chan struct{})
  107. defer close(stopChan)
  108. go func() {
  109. sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
  110. pluginManager.Run(sourcesReady, stopChan)
  111. }()
  112. // Add handler for device plugin
  113. fakeHandler := newFakePluginHandler()
  114. pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler)
  115. // Add a new plugin
  116. socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
  117. pluginName := "example-plugin"
  118. p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  119. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  120. // Verify that the plugin is registered
  121. waitForRegistration(t, fakeHandler)
  122. }
  123. func newTestPluginManager(
  124. sockDir string) PluginManager {
  125. pm := NewPluginManager(
  126. sockDir,
  127. &record.FakeRecorder{},
  128. )
  129. return pm
  130. }