reconciler_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package reconciler
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "os"
  18. "testing"
  19. "time"
  20. "github.com/stretchr/testify/require"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. "k8s.io/client-go/tools/record"
  23. pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
  24. registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
  25. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  26. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
  27. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
  28. )
  29. const (
  30. // reconcilerLoopSleepDuration is the amount of time the reconciler loop
  31. // waits between successive executions
  32. reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
  33. )
  34. var (
  35. socketDir string
  36. supportedVersions = []string{"v1beta1", "v1beta2"}
  37. )
  38. func init() {
  39. d, err := ioutil.TempDir("", "reconciler_test")
  40. if err != nil {
  41. panic(fmt.Sprintf("Could not create a temp directory: %s", d))
  42. }
  43. socketDir = d
  44. }
  45. func cleanup(t *testing.T) {
  46. require.NoError(t, os.RemoveAll(socketDir))
  47. os.MkdirAll(socketDir, 0755)
  48. }
  49. func runReconciler(reconciler Reconciler) {
  50. go reconciler.Run(wait.NeverStop)
  51. }
  52. func waitForRegistration(
  53. t *testing.T,
  54. socketPath string,
  55. previousTimestamp time.Time,
  56. asw cache.ActualStateOfWorld) {
  57. err := retryWithExponentialBackOff(
  58. time.Duration(500*time.Millisecond),
  59. func() (bool, error) {
  60. registeredPlugins := asw.GetRegisteredPlugins()
  61. for _, plugin := range registeredPlugins {
  62. if plugin.SocketPath == socketPath && plugin.Timestamp.After(previousTimestamp) {
  63. return true, nil
  64. }
  65. }
  66. return false, nil
  67. },
  68. )
  69. if err != nil {
  70. t.Fatalf("Timed out waiting for plugin to be registered:\n%s.", socketPath)
  71. }
  72. }
  73. func waitForUnregistration(
  74. t *testing.T,
  75. socketPath string,
  76. asw cache.ActualStateOfWorld) {
  77. err := retryWithExponentialBackOff(
  78. time.Duration(500*time.Millisecond),
  79. func() (bool, error) {
  80. registeredPlugins := asw.GetRegisteredPlugins()
  81. for _, plugin := range registeredPlugins {
  82. if plugin.SocketPath == socketPath {
  83. return false, nil
  84. }
  85. }
  86. return true, nil
  87. },
  88. )
  89. if err != nil {
  90. t.Fatalf("Timed out waiting for plugin to be unregistered:\n%s.", socketPath)
  91. }
  92. }
  93. func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  94. backoff := wait.Backoff{
  95. Duration: initialDuration,
  96. Factor: 3,
  97. Jitter: 0,
  98. Steps: 6,
  99. }
  100. return wait.ExponentialBackoff(backoff, fn)
  101. }
  102. type DummyImpl struct {
  103. dummy string
  104. }
  105. func NewDummyImpl() *DummyImpl {
  106. return &DummyImpl{}
  107. }
  108. // ValidatePlugin is a dummy implementation
  109. func (d *DummyImpl) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
  110. return nil
  111. }
  112. // RegisterPlugin is a dummy implementation
  113. func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
  114. return nil
  115. }
  116. // DeRegisterPlugin is a dummy implementation
  117. func (d *DummyImpl) DeRegisterPlugin(pluginName string) {
  118. }
  119. // Calls Run()
  120. // Verifies that asw and dsw have no plugins
  121. func Test_Run_Positive_DoNothing(t *testing.T) {
  122. defer cleanup(t)
  123. dsw := cache.NewDesiredStateOfWorld()
  124. asw := cache.NewActualStateOfWorld()
  125. fakeRecorder := &record.FakeRecorder{}
  126. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  127. fakeRecorder,
  128. ))
  129. reconciler := NewReconciler(
  130. oex,
  131. reconcilerLoopSleepDuration,
  132. dsw,
  133. asw,
  134. )
  135. // Act
  136. runReconciler(reconciler)
  137. // Get dsw and asw plugins; they should both be empty
  138. if len(asw.GetRegisteredPlugins()) != 0 {
  139. t.Fatalf("Test_Run_Positive_DoNothing: actual state of world should be empty but it's not")
  140. }
  141. if len(dsw.GetPluginsToRegister()) != 0 {
  142. t.Fatalf("Test_Run_Positive_DoNothing: desired state of world should be empty but it's not")
  143. }
  144. }
  145. // Populates desiredStateOfWorld cache with one plugin.
  146. // Calls Run()
  147. // Verifies the actual state of world contains that plugin
  148. func Test_Run_Positive_Register(t *testing.T) {
  149. defer cleanup(t)
  150. dsw := cache.NewDesiredStateOfWorld()
  151. asw := cache.NewActualStateOfWorld()
  152. di := NewDummyImpl()
  153. fakeRecorder := &record.FakeRecorder{}
  154. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  155. fakeRecorder,
  156. ))
  157. reconciler := NewReconciler(
  158. oex,
  159. reconcilerLoopSleepDuration,
  160. dsw,
  161. asw,
  162. )
  163. reconciler.AddHandler(pluginwatcherapi.DevicePlugin, cache.PluginHandler(di))
  164. // Start the reconciler to fill ASW.
  165. stopChan := make(chan struct{})
  166. defer close(stopChan)
  167. go reconciler.Run(stopChan)
  168. socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
  169. pluginName := fmt.Sprintf("example-plugin")
  170. p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  171. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  172. timestampBeforeRegistration := time.Now()
  173. dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
  174. waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
  175. // Get asw plugins; it should contain the added plugin
  176. aswPlugins := asw.GetRegisteredPlugins()
  177. if len(aswPlugins) != 1 {
  178. t.Fatalf("Test_Run_Positive_Register: actual state of world length should be one but it's %d", len(aswPlugins))
  179. }
  180. if aswPlugins[0].SocketPath != socketPath {
  181. t.Fatalf("Test_Run_Positive_Register: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
  182. }
  183. }
  184. // Populates desiredStateOfWorld cache with one plugin
  185. // Calls Run()
  186. // Verifies there is one plugin now in actual state of world.
  187. // Deletes plugin from desired state of world.
  188. // Verifies that plugin no longer exists in actual state of world.
  189. func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
  190. defer cleanup(t)
  191. dsw := cache.NewDesiredStateOfWorld()
  192. asw := cache.NewActualStateOfWorld()
  193. di := NewDummyImpl()
  194. fakeRecorder := &record.FakeRecorder{}
  195. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  196. fakeRecorder,
  197. ))
  198. reconciler := NewReconciler(
  199. oex,
  200. reconcilerLoopSleepDuration,
  201. dsw,
  202. asw,
  203. )
  204. reconciler.AddHandler(pluginwatcherapi.DevicePlugin, cache.PluginHandler(di))
  205. // Start the reconciler to fill ASW.
  206. stopChan := make(chan struct{})
  207. defer close(stopChan)
  208. go reconciler.Run(stopChan)
  209. socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
  210. pluginName := fmt.Sprintf("example-plugin")
  211. p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  212. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  213. timestampBeforeRegistration := time.Now()
  214. dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
  215. waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
  216. // Get asw plugins; it should contain the added plugin
  217. aswPlugins := asw.GetRegisteredPlugins()
  218. if len(aswPlugins) != 1 {
  219. t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
  220. }
  221. if aswPlugins[0].SocketPath != socketPath {
  222. t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
  223. }
  224. dsw.RemovePlugin(socketPath)
  225. waitForUnregistration(t, socketPath, asw)
  226. // Get asw plugins; it should no longer contain the added plugin
  227. aswPlugins = asw.GetRegisteredPlugins()
  228. if len(aswPlugins) != 0 {
  229. t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be zero but it's %d", len(aswPlugins))
  230. }
  231. }
  232. // Populates desiredStateOfWorld cache with one plugin
  233. // Calls Run()
  234. // Then update the timestamp of the plugin
  235. // Verifies that the plugin is reregistered.
  236. // Verifies the plugin with updated timestamp now in actual state of world.
  237. func Test_Run_Positive_ReRegister(t *testing.T) {
  238. defer cleanup(t)
  239. dsw := cache.NewDesiredStateOfWorld()
  240. asw := cache.NewActualStateOfWorld()
  241. di := NewDummyImpl()
  242. fakeRecorder := &record.FakeRecorder{}
  243. oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  244. fakeRecorder,
  245. ))
  246. reconciler := NewReconciler(
  247. oex,
  248. reconcilerLoopSleepDuration,
  249. dsw,
  250. asw,
  251. )
  252. reconciler.AddHandler(pluginwatcherapi.DevicePlugin, cache.PluginHandler(di))
  253. // Start the reconciler to fill ASW.
  254. stopChan := make(chan struct{})
  255. defer close(stopChan)
  256. go reconciler.Run(stopChan)
  257. socketPath := fmt.Sprintf("%s/plugin2.sock", socketDir)
  258. pluginName := fmt.Sprintf("example-plugin2")
  259. p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  260. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  261. timestampBeforeRegistration := time.Now()
  262. dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
  263. waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
  264. timeStampBeforeReRegistration := time.Now()
  265. // Add the plugin again to update the timestamp
  266. dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
  267. // This should trigger a deregistration and a regitration
  268. // The process of unregistration and reregistration can happen so fast that
  269. // we are not able to catch it with waitForUnregistration, so here we are checking
  270. // the plugin has an updated timestamp.
  271. waitForRegistration(t, socketPath, timeStampBeforeReRegistration, asw)
  272. // Get asw plugins; it should contain the added plugin
  273. aswPlugins := asw.GetRegisteredPlugins()
  274. if len(aswPlugins) != 1 {
  275. t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
  276. }
  277. if aswPlugins[0].SocketPath != socketPath {
  278. t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
  279. }
  280. }