operation_executor_test.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package operationexecutor
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "strconv"
  18. "testing"
  19. "time"
  20. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  21. )
  22. const (
  23. numPluginsToRegister = 2
  24. numPluginsToUnregister = 2
  25. )
  26. var _ OperationGenerator = &fakeOperationGenerator{}
  27. var socketDir string
  28. func init() {
  29. d, err := ioutil.TempDir("", "operation_executor_test")
  30. if err != nil {
  31. panic(fmt.Sprintf("Could not create a temp directory: %s", d))
  32. }
  33. socketDir = d
  34. }
  35. func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) {
  36. ch, quit, oe := setup()
  37. for i := 0; i < numPluginsToRegister; i++ {
  38. socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
  39. oe.RegisterPlugin(socketPath, false /* foundInDeprecatedDir */, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
  40. }
  41. if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
  42. t.Fatalf("Unable to start register operations in Concurrent for plugins")
  43. }
  44. }
  45. func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
  46. ch, quit, oe := setup()
  47. socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
  48. for i := 0; i < numPluginsToRegister; i++ {
  49. oe.RegisterPlugin(socketPath, false /* foundInDeprecatedDir */, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
  50. }
  51. if !isOperationRunSerially(ch, quit) {
  52. t.Fatalf("Unable to start register operations serially for plugins")
  53. }
  54. }
  55. func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) {
  56. ch, quit, oe := setup()
  57. for i := 0; i < numPluginsToUnregister; i++ {
  58. socketPath := "socket-path" + strconv.Itoa(i)
  59. oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */)
  60. }
  61. if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
  62. t.Fatalf("Unable to start unregister operations in Concurrent for plugins")
  63. }
  64. }
  65. func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) {
  66. ch, quit, oe := setup()
  67. socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
  68. for i := 0; i < numPluginsToUnregister; i++ {
  69. oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */)
  70. }
  71. if !isOperationRunSerially(ch, quit) {
  72. t.Fatalf("Unable to start unregister operations serially for plugins")
  73. }
  74. }
  // fakeOperationGenerator is the test stand-in for OperationGenerator. The
  // functions it generates do no plugin work; they only signal on ch and then
  // wait on quit.
  type fakeOperationGenerator struct {
  	// ch receives one value each time a generated operation starts running;
  	// quit, once closed, lets every waiting generated operation return.
  	ch, quit chan interface{}
  }
  79. func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
  80. return &fakeOperationGenerator{
  81. ch: ch,
  82. quit: quit,
  83. }
  84. }
  85. func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
  86. socketPath string,
  87. foundInDeprecatedDir bool,
  88. timestamp time.Time,
  89. pluginHandlers map[string]cache.PluginHandler,
  90. actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
  91. opFunc := func() error {
  92. startOperationAndBlock(fopg.ch, fopg.quit)
  93. return nil
  94. }
  95. return opFunc
  96. }
  97. func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
  98. socketPath string,
  99. pluginHandlers map[string]cache.PluginHandler,
  100. actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
  101. opFunc := func() error {
  102. startOperationAndBlock(fopg.ch, fopg.quit)
  103. return nil
  104. }
  105. return opFunc
  106. }
  // isOperationRunSerially reports whether at most one operation announced
  // itself on ch. It returns false as soon as a second start is received, and
  // true once 5 seconds pass without a new start (the timer is re-armed after
  // each received value). quit is closed on return, whatever the result, so
  // that operations blocked on it can finish.
  func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
  	defer close(quit)

  	started := 0
  	for {
  		select {
  		case <-ch:
  			started++
  			if started > 1 {
  				// A second operation began while the first was still blocked.
  				return false
  			}
  		case <-time.After(5 * time.Second):
  			return true
  		}
  	}
  }
  // isOperationRunConcurrently reports whether numOperationsToRun operations
  // announced themselves on ch. It returns true as soon as the running count of
  // received starts equals numOperationsToRun, and false once 5 seconds pass
  // without a new start (the timer is re-armed after each received value).
  // quit is closed on return, whatever the result, so that operations blocked
  // on it can finish.
  func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
  	defer close(quit)

  	started := 0
  	for {
  		select {
  		case <-ch:
  			started++
  			if started == numOperationsToRun {
  				return true
  			}
  		case <-time.After(5 * time.Second):
  			return false
  		}
  	}
  }
  141. func setup() (chan interface{}, chan interface{}, OperationExecutor) {
  142. ch, quit := make(chan interface{}), make(chan interface{})
  143. return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
  144. }
  // startOperationAndBlock announces that an operation has begun by sending a
  // nil value on ch, then waits on quit. It returns once a receive from quit
  // succeeds, which the tests in this file trigger by closing the channel.
  func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
  	var startSignal interface{} // nil: only the send itself carries meaning
  	ch <- startSignal

  	<-quit
  }
  150. }