plugin_watcher_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pluginwatcher
  14. import (
  15. "flag"
  16. "fmt"
  17. "io/ioutil"
  18. "os"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/stretchr/testify/require"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. "k8s.io/klog"
  25. registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
  26. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  27. )
  28. var (
  29. socketDir string
  30. deprecatedSocketDir string
  31. supportedVersions = []string{"v1beta1", "v1beta2"}
  32. )
  33. func init() {
  34. var logLevel string
  35. klog.InitFlags(flag.CommandLine)
  36. flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
  37. flag.StringVar(&logLevel, "logLevel", "6", "test")
  38. flag.Lookup("v").Value.Set(logLevel)
  39. d, err := ioutil.TempDir("", "plugin_test")
  40. if err != nil {
  41. panic(fmt.Sprintf("Could not create a temp directory: %s", d))
  42. }
  43. d2, err := ioutil.TempDir("", "deprecated_plugin_test")
  44. if err != nil {
  45. panic(fmt.Sprintf("Could not create a temp directory: %s", d))
  46. }
  47. socketDir = d
  48. deprecatedSocketDir = d2
  49. }
  50. func cleanup(t *testing.T) {
  51. require.NoError(t, os.RemoveAll(socketDir))
  52. require.NoError(t, os.RemoveAll(deprecatedSocketDir))
  53. os.MkdirAll(socketDir, 0755)
  54. os.MkdirAll(deprecatedSocketDir, 0755)
  55. }
  56. func waitForRegistration(
  57. t *testing.T,
  58. socketPath string,
  59. dsw cache.DesiredStateOfWorld) {
  60. err := retryWithExponentialBackOff(
  61. time.Duration(500*time.Millisecond),
  62. func() (bool, error) {
  63. if dsw.PluginExists(socketPath) {
  64. return true, nil
  65. }
  66. return false, nil
  67. },
  68. )
  69. if err != nil {
  70. t.Fatalf("Timed out waiting for plugin to be added to desired state of world cache:\n%s.", socketPath)
  71. }
  72. }
  73. func waitForUnregistration(
  74. t *testing.T,
  75. socketPath string,
  76. dsw cache.DesiredStateOfWorld) {
  77. err := retryWithExponentialBackOff(
  78. time.Duration(500*time.Millisecond),
  79. func() (bool, error) {
  80. if !dsw.PluginExists(socketPath) {
  81. return true, nil
  82. }
  83. return false, nil
  84. },
  85. )
  86. if err != nil {
  87. t.Fatalf("Timed out waiting for plugin to be unregistered:\n%s.", socketPath)
  88. }
  89. }
  90. func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  91. backoff := wait.Backoff{
  92. Duration: initialDuration,
  93. Factor: 3,
  94. Jitter: 0,
  95. Steps: 6,
  96. }
  97. return wait.ExponentialBackoff(backoff, fn)
  98. }
  99. func TestPluginRegistration(t *testing.T) {
  100. defer cleanup(t)
  101. dsw := cache.NewDesiredStateOfWorld()
  102. newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
  103. for i := 0; i < 10; i++ {
  104. socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
  105. pluginName := fmt.Sprintf("example-plugin-%d", i)
  106. p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  107. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  108. pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
  109. waitForRegistration(t, pluginInfo.SocketPath, dsw)
  110. // Check the desired state for plugins
  111. dswPlugins := dsw.GetPluginsToRegister()
  112. if len(dswPlugins) != 1 {
  113. t.Fatalf("TestPluginRegistration: desired state of world length should be 1 but it's %d", len(dswPlugins))
  114. }
  115. // Stop the plugin; the plugin should be removed from the desired state of world cache
  116. require.NoError(t, p.Stop())
  117. // The following doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event
  118. waitForUnregistration(t, pluginInfo.SocketPath, dsw)
  119. dswPlugins = dsw.GetPluginsToRegister()
  120. if len(dswPlugins) != 0 {
  121. t.Fatalf("TestPluginRegistration: desired state of world length should be 0 but it's %d", len(dswPlugins))
  122. }
  123. }
  124. }
  125. func TestPluginRegistrationDeprecated(t *testing.T) {
  126. defer cleanup(t)
  127. dsw := cache.NewDesiredStateOfWorld()
  128. newWatcher(t, true /* testDeprecatedDir */, dsw, wait.NeverStop)
  129. // Test plugins in deprecated dir
  130. for i := 0; i < 10; i++ {
  131. endpoint := fmt.Sprintf("%s/dep-plugin-%d.sock", deprecatedSocketDir, i)
  132. pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
  133. p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, endpoint, supportedVersions...)
  134. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  135. pluginInfo := GetPluginInfo(p, true /* testDeprecatedDir */)
  136. waitForRegistration(t, pluginInfo.SocketPath, dsw)
  137. // Check the desired state for plugins
  138. dswPlugins := dsw.GetPluginsToRegister()
  139. if len(dswPlugins) != i+1 {
  140. t.Fatalf("TestPluginRegistrationDeprecated: desired state of world length should be %d but it's %d", i+1, len(dswPlugins))
  141. }
  142. }
  143. }
  144. func TestPluginRegistrationSameName(t *testing.T) {
  145. defer cleanup(t)
  146. dsw := cache.NewDesiredStateOfWorld()
  147. newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
  148. // Make 10 plugins with the same name and same type but different socket path;
  149. // all 10 should be in desired state of world cache
  150. pluginName := "dep-example-plugin"
  151. for i := 0; i < 10; i++ {
  152. socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
  153. p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  154. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  155. pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
  156. waitForRegistration(t, pluginInfo.SocketPath, dsw)
  157. // Check the desired state for plugins
  158. dswPlugins := dsw.GetPluginsToRegister()
  159. if len(dswPlugins) != i+1 {
  160. t.Fatalf("TestPluginRegistrationSameName: desired state of world length should be %d but it's %d", i+1, len(dswPlugins))
  161. }
  162. }
  163. }
  164. func TestPluginReRegistration(t *testing.T) {
  165. defer cleanup(t)
  166. dsw := cache.NewDesiredStateOfWorld()
  167. newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
  168. // Create a plugin first, we are then going to remove the plugin, update the plugin with a different name
  169. // and recreate it.
  170. socketPath := fmt.Sprintf("%s/plugin-reregistration.sock", socketDir)
  171. pluginName := "reregister-plugin"
  172. p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  173. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  174. pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
  175. lastTimestamp := time.Now()
  176. waitForRegistration(t, pluginInfo.SocketPath, dsw)
  177. // Remove this plugin, then recreate it again with a different name for 10 times
  178. // The updated plugin should be in the desired state of world cache
  179. for i := 0; i < 10; i++ {
  180. // Stop the plugin; the plugin should be removed from the desired state of world cache
  181. // The plugin removel doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event
  182. require.NoError(t, p.Stop())
  183. waitForUnregistration(t, pluginInfo.SocketPath, dsw)
  184. // Add the plugin again
  185. pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
  186. p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  187. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  188. waitForRegistration(t, pluginInfo.SocketPath, dsw)
  189. // Check the dsw cache. The updated plugin should be the only plugin in it
  190. dswPlugins := dsw.GetPluginsToRegister()
  191. if len(dswPlugins) != 1 {
  192. t.Fatalf("TestPluginReRegistration: desired state of world length should be 1 but it's %d", len(dswPlugins))
  193. }
  194. if !dswPlugins[0].Timestamp.After(lastTimestamp) {
  195. t.Fatalf("TestPluginReRegistration: for plugin %s timestamp of plugin is not updated", pluginName)
  196. }
  197. lastTimestamp = dswPlugins[0].Timestamp
  198. }
  199. }
  200. func TestPluginRegistrationAtKubeletStart(t *testing.T) {
  201. defer cleanup(t)
  202. plugins := make([]*examplePlugin, 10)
  203. for i := 0; i < len(plugins); i++ {
  204. socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
  205. pluginName := fmt.Sprintf("example-plugin-%d", i)
  206. p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
  207. require.NoError(t, p.Serve("v1beta1", "v1beta2"))
  208. defer func(p *examplePlugin) {
  209. require.NoError(t, p.Stop())
  210. }(p)
  211. plugins[i] = p
  212. }
  213. dsw := cache.NewDesiredStateOfWorld()
  214. newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
  215. var wg sync.WaitGroup
  216. for i := 0; i < len(plugins); i++ {
  217. wg.Add(1)
  218. go func(p *examplePlugin) {
  219. defer wg.Done()
  220. pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
  221. // Validate that the plugin is in the desired state cache
  222. waitForRegistration(t, pluginInfo.SocketPath, dsw)
  223. }(plugins[i])
  224. }
  225. c := make(chan struct{})
  226. go func() {
  227. defer close(c)
  228. wg.Wait()
  229. }()
  230. select {
  231. case <-c:
  232. return
  233. case <-time.After(wait.ForeverTestTimeout):
  234. t.Fatalf("Timeout while waiting for the plugin registration status")
  235. }
  236. }
  237. func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool {
  238. select {
  239. case status := <-statusChan:
  240. return status.PluginRegistered
  241. case <-time.After(wait.ForeverTestTimeout):
  242. t.Fatalf("Timed out while waiting for registration status")
  243. }
  244. return false
  245. }
  246. func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan examplePluginEvent) bool {
  247. select {
  248. case event := <-eventChan:
  249. return event == expected
  250. case <-time.After(wait.ForeverTestTimeout):
  251. t.Fatalf("Timed out while waiting for registration status %v", expected)
  252. }
  253. return false
  254. }
  255. func newWatcher(t *testing.T, testDeprecatedDir bool, desiredStateOfWorldCache cache.DesiredStateOfWorld, stopCh <-chan struct{}) *Watcher {
  256. depSocketDir := ""
  257. if testDeprecatedDir {
  258. depSocketDir = deprecatedSocketDir
  259. }
  260. w := NewWatcher(socketDir, depSocketDir, desiredStateOfWorldCache)
  261. require.NoError(t, w.Start(stopCh))
  262. return w
  263. }
  264. func TestFoundInDeprecatedDir(t *testing.T) {
  265. testCases := []struct {
  266. sockDir string
  267. deprecatedSockDir string
  268. socketPath string
  269. expectFoundInDeprecatedDir bool
  270. }{
  271. {
  272. sockDir: "/var/lib/kubelet/plugins_registry",
  273. deprecatedSockDir: "/var/lib/kubelet/plugins",
  274. socketPath: "/var/lib/kubelet/plugins_registry/mydriver.foo/csi.sock",
  275. expectFoundInDeprecatedDir: false,
  276. },
  277. {
  278. sockDir: "/var/lib/kubelet/plugins_registry",
  279. deprecatedSockDir: "/var/lib/kubelet/plugins",
  280. socketPath: "/var/lib/kubelet/plugins/mydriver.foo/csi.sock",
  281. expectFoundInDeprecatedDir: true,
  282. },
  283. {
  284. sockDir: "/var/lib/kubelet/plugins_registry",
  285. deprecatedSockDir: "/var/lib/kubelet/plugins",
  286. socketPath: "/var/lib/kubelet/plugins_registry",
  287. expectFoundInDeprecatedDir: false,
  288. },
  289. {
  290. sockDir: "/var/lib/kubelet/plugins_registry",
  291. deprecatedSockDir: "/var/lib/kubelet/plugins",
  292. socketPath: "/var/lib/kubelet/plugins",
  293. expectFoundInDeprecatedDir: true,
  294. },
  295. {
  296. sockDir: "/var/lib/kubelet/plugins_registry",
  297. deprecatedSockDir: "/var/lib/kubelet/plugins",
  298. socketPath: "/var/lib/kubelet/plugins/kubernetes.io",
  299. expectFoundInDeprecatedDir: true,
  300. },
  301. {
  302. sockDir: "/var/lib/kubelet/plugins_registry",
  303. deprecatedSockDir: "/var/lib/kubelet/plugins",
  304. socketPath: "/var/lib/kubelet/plugins/my.driver.com",
  305. expectFoundInDeprecatedDir: true,
  306. },
  307. {
  308. sockDir: "/var/lib/kubelet/plugins_registry",
  309. deprecatedSockDir: "/var/lib/kubelet/plugins",
  310. socketPath: "/var/lib/kubelet/plugins_registry",
  311. expectFoundInDeprecatedDir: false,
  312. },
  313. {
  314. sockDir: "/var/lib/kubelet/plugins_registry",
  315. deprecatedSockDir: "/var/lib/kubelet/plugins",
  316. socketPath: "/var/lib/kubelet/plugins_registry/kubernetes.io",
  317. expectFoundInDeprecatedDir: false,
  318. },
  319. {
  320. sockDir: "/var/lib/kubelet/plugins_registry",
  321. deprecatedSockDir: "/var/lib/kubelet/plugins",
  322. socketPath: "/var/lib/kubelet/plugins_registry/my.driver.com",
  323. expectFoundInDeprecatedDir: false,
  324. },
  325. }
  326. for _, tc := range testCases {
  327. // Arrange & Act
  328. watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir, cache.NewDesiredStateOfWorld())
  329. actualFoundInDeprecatedDir := watcher.foundInDeprecatedDir(tc.socketPath)
  330. // Assert
  331. if tc.expectFoundInDeprecatedDir != actualFoundInDeprecatedDir {
  332. t.Fatalf("expecting actualFoundInDeprecatedDir=%v, but got %v for testcase: %#v", tc.expectFoundInDeprecatedDir, actualFoundInDeprecatedDir, tc)
  333. }
  334. }
  335. }
  336. func TestContainsBlacklistedDir(t *testing.T) {
  337. testCases := []struct {
  338. sockDir string
  339. deprecatedSockDir string
  340. path string
  341. expected bool
  342. }{
  343. {
  344. sockDir: "/var/lib/kubelet/plugins_registry",
  345. deprecatedSockDir: "/var/lib/kubelet/plugins",
  346. path: "/var/lib/kubelet/plugins_registry/mydriver.foo/csi.sock",
  347. expected: false,
  348. },
  349. {
  350. sockDir: "/var/lib/kubelet/plugins_registry",
  351. deprecatedSockDir: "/var/lib/kubelet/plugins",
  352. path: "/var/lib/kubelet/plugins/mydriver.foo/csi.sock",
  353. expected: false,
  354. },
  355. {
  356. sockDir: "/var/lib/kubelet/plugins_registry",
  357. deprecatedSockDir: "/var/lib/kubelet/plugins",
  358. path: "/var/lib/kubelet/plugins_registry",
  359. expected: false,
  360. },
  361. {
  362. sockDir: "/var/lib/kubelet/plugins_registry",
  363. deprecatedSockDir: "/var/lib/kubelet/plugins",
  364. path: "/var/lib/kubelet/plugins",
  365. expected: false,
  366. },
  367. {
  368. sockDir: "/var/lib/kubelet/plugins_registry",
  369. deprecatedSockDir: "/var/lib/kubelet/plugins",
  370. path: "/var/lib/kubelet/plugins/kubernetes.io",
  371. expected: true,
  372. },
  373. {
  374. sockDir: "/var/lib/kubelet/plugins_registry",
  375. deprecatedSockDir: "/var/lib/kubelet/plugins",
  376. path: "/var/lib/kubelet/plugins/kubernetes.io/csi.sock",
  377. expected: true,
  378. },
  379. {
  380. sockDir: "/var/lib/kubelet/plugins_registry",
  381. deprecatedSockDir: "/var/lib/kubelet/plugins",
  382. path: "/var/lib/kubelet/plugins/kubernetes.io/my.plugin/csi.sock",
  383. expected: true,
  384. },
  385. {
  386. sockDir: "/var/lib/kubelet/plugins_registry",
  387. deprecatedSockDir: "/var/lib/kubelet/plugins",
  388. path: "/var/lib/kubelet/plugins/kubernetes.io/",
  389. expected: true,
  390. },
  391. {
  392. sockDir: "/var/lib/kubelet/plugins_registry",
  393. deprecatedSockDir: "/var/lib/kubelet/plugins",
  394. path: "/var/lib/kubelet/plugins/my.driver.com",
  395. expected: false,
  396. },
  397. {
  398. sockDir: "/var/lib/kubelet/plugins_registry",
  399. deprecatedSockDir: "/var/lib/kubelet/plugins",
  400. path: "/var/lib/kubelet/plugins_registry",
  401. expected: false,
  402. },
  403. {
  404. sockDir: "/var/lib/kubelet/plugins_registry",
  405. deprecatedSockDir: "/var/lib/kubelet/plugins",
  406. path: "/var/lib/kubelet/plugins_registry/kubernetes.io",
  407. expected: false, // New (non-deprecated dir) has no blacklist
  408. },
  409. {
  410. sockDir: "/var/lib/kubelet/plugins_registry",
  411. deprecatedSockDir: "/var/lib/kubelet/plugins",
  412. path: "/var/lib/kubelet/plugins_registry/my.driver.com",
  413. expected: false,
  414. },
  415. {
  416. sockDir: "/var/lib/kubelet/plugins_registry",
  417. deprecatedSockDir: "/var/lib/kubelet/plugins",
  418. path: "/var/lib/kubelet/plugins/my-kubernetes.io-plugin",
  419. expected: false,
  420. },
  421. {
  422. sockDir: "/var/lib/kubelet/plugins_registry",
  423. deprecatedSockDir: "/var/lib/kubelet/plugins",
  424. path: "/var/lib/kubelet/plugins/my-kubernetes.io-plugin/csi.sock",
  425. expected: false,
  426. },
  427. {
  428. sockDir: "/var/lib/kubelet/plugins_registry",
  429. deprecatedSockDir: "/var/lib/kubelet/plugins",
  430. path: "/var/lib/kubelet/plugins/kubernetes.io-plugin",
  431. expected: false,
  432. },
  433. {
  434. sockDir: "/var/lib/kubelet/plugins_registry",
  435. deprecatedSockDir: "/var/lib/kubelet/plugins",
  436. path: "/var/lib/kubelet/plugins/kubernetes.io-plugin/csi.sock",
  437. expected: false,
  438. },
  439. {
  440. sockDir: "/var/lib/kubelet/plugins_registry",
  441. deprecatedSockDir: "/var/lib/kubelet/plugins",
  442. path: "/var/lib/kubelet/plugins/kubernetes.io-plugin/",
  443. expected: false,
  444. },
  445. }
  446. for _, tc := range testCases {
  447. // Arrange & Act
  448. watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir, cache.NewDesiredStateOfWorld())
  449. actual := watcher.containsBlacklistedDir(tc.path)
  450. // Assert
  451. if tc.expected != actual {
  452. t.Fatalf("expecting %v but got %v for testcase: %#v", tc.expected, actual, tc)
  453. }
  454. }
  455. }