plugin_manager_test.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pluginmanager
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "os"
  18. "sync"
  19. "testing"
  20. "time"
  21. "github.com/stretchr/testify/require"
  22. "k8s.io/apimachinery/pkg/util/sets"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. "k8s.io/client-go/tools/record"
  25. pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
  26. registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
  27. "k8s.io/kubernetes/pkg/kubelet/config"
  28. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  29. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
  30. )
  31. const (
  32. testHostname = "test-hostname"
  33. )
  34. var (
  35. socketDir string
  36. deprecatedSocketDir string
  37. supportedVersions = []string{"v1beta1", "v1beta2"}
  38. )
  39. // fake cache.PluginHandler
  40. type PluginHandler interface {
  41. ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error
  42. RegisterPlugin(pluginName, endpoint string, versions []string) error
  43. DeRegisterPlugin(pluginName string)
  44. }
  45. type fakePluginHandler struct {
  46. validatePluginCalled bool
  47. registerPluginCalled bool
  48. deregisterPluginCalled bool
  49. sync.RWMutex
  50. }
  51. func newFakePluginHandler() *fakePluginHandler {
  52. return &fakePluginHandler{
  53. validatePluginCalled: false,
  54. registerPluginCalled: false,
  55. deregisterPluginCalled: false,
  56. }
  57. }
  58. // ValidatePlugin is a fake method
  59. func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
  60. f.Lock()
  61. defer f.Unlock()
  62. f.validatePluginCalled = true
  63. return nil
  64. }
  65. // RegisterPlugin is a fake method
  66. func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
  67. f.Lock()
  68. defer f.Unlock()
  69. f.registerPluginCalled = true
  70. return nil
  71. }
  72. // DeRegisterPlugin is a fake method
  73. func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
  74. f.Lock()
  75. defer f.Unlock()
  76. f.deregisterPluginCalled = true
  77. return
  78. }
  79. func init() {
  80. d, err := ioutil.TempDir("", "plugin_manager_test")
  81. if err != nil {
  82. panic(fmt.Sprintf("Could not create a temp directory: %s", d))
  83. }
  84. d2, err := ioutil.TempDir("", "deprecateddir_plugin_manager_test")
  85. if err != nil {
  86. panic(fmt.Sprintf("Could not create a temp directory: %s", d))
  87. }
  88. socketDir = d
  89. deprecatedSocketDir = d2
  90. }
  91. func cleanup(t *testing.T) {
  92. require.NoError(t, os.RemoveAll(socketDir))
  93. require.NoError(t, os.RemoveAll(deprecatedSocketDir))
  94. os.MkdirAll(socketDir, 0755)
  95. os.MkdirAll(deprecatedSocketDir, 0755)
  96. }
  97. func newWatcher(
  98. t *testing.T, testDeprecatedDir bool,
  99. desiredStateOfWorldCache cache.DesiredStateOfWorld) *pluginwatcher.Watcher {
  100. depSocketDir := ""
  101. if testDeprecatedDir {
  102. depSocketDir = deprecatedSocketDir
  103. }
  104. w := pluginwatcher.NewWatcher(socketDir, depSocketDir, desiredStateOfWorldCache)
  105. require.NoError(t, w.Start(wait.NeverStop))
  106. return w
  107. }
  108. func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler) {
  109. err := retryWithExponentialBackOff(
  110. time.Duration(500*time.Millisecond),
  111. func() (bool, error) {
  112. fakePluginHandler.Lock()
  113. defer fakePluginHandler.Unlock()
  114. if fakePluginHandler.validatePluginCalled && fakePluginHandler.registerPluginCalled {
  115. return true, nil
  116. }
  117. return false, nil
  118. },
  119. )
  120. if err != nil {
  121. t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.")
  122. }
  123. }
  124. func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  125. backoff := wait.Backoff{
  126. Duration: initialDuration,
  127. Factor: 3,
  128. Jitter: 0,
  129. Steps: 6,
  130. }
  131. return wait.ExponentialBackoff(backoff, fn)
  132. }
  133. func TestPluginRegistration(t *testing.T) {
  134. defer cleanup(t)
  135. pluginManager := newTestPluginManager(socketDir, deprecatedSocketDir)
  136. // Start the plugin manager
  137. stopChan := make(chan struct{})
  138. defer close(stopChan)
  139. go func() {
  140. sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
  141. pluginManager.Run(sourcesReady, stopChan)
  142. }()
  143. // Add handler for device plugin
  144. fakeHandler := newFakePluginHandler()
  145. pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, fakeHandler)
  146. // Add a new plugin
  147. socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
  148. pluginName := "example-plugin"
  149. p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  150. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  151. // Verify that the plugin is registered
  152. waitForRegistration(t, fakeHandler)
  153. }
  154. func newTestPluginManager(
  155. sockDir string,
  156. deprecatedSockDir string) PluginManager {
  157. pm := NewPluginManager(
  158. sockDir,
  159. deprecatedSockDir,
  160. &record.FakeRecorder{},
  161. )
  162. return pm
  163. }