attach_detach_controller_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package attachdetach
  14. import (
  15. "context"
  16. "fmt"
  17. "testing"
  18. "time"
  19. v1 "k8s.io/api/core/v1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/labels"
  22. "k8s.io/apimachinery/pkg/types"
  23. "k8s.io/client-go/informers"
  24. kcache "k8s.io/client-go/tools/cache"
  25. "k8s.io/kubernetes/pkg/controller"
  26. "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
  27. controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
  28. "k8s.io/kubernetes/pkg/volume"
  29. )
  30. func Test_NewAttachDetachController_Positive(t *testing.T) {
  31. // Arrange
  32. fakeKubeClient := controllervolumetesting.CreateTestClient()
  33. informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
  34. // Act
  35. _, err := NewAttachDetachController(
  36. fakeKubeClient,
  37. informerFactory.Core().V1().Pods(),
  38. informerFactory.Core().V1().Nodes(),
  39. informerFactory.Core().V1().PersistentVolumeClaims(),
  40. informerFactory.Core().V1().PersistentVolumes(),
  41. informerFactory.Storage().V1().CSINodes(),
  42. informerFactory.Storage().V1beta1().CSIDrivers(),
  43. nil, /* cloud */
  44. nil, /* plugins */
  45. nil, /* prober */
  46. false,
  47. 5*time.Second,
  48. DefaultTimerConfig,
  49. )
  50. // Assert
  51. if err != nil {
  52. t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
  53. }
  54. }
  55. func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
  56. // Arrange
  57. fakeKubeClient := controllervolumetesting.CreateTestClient()
  58. informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
  59. podInformer := informerFactory.Core().V1().Pods()
  60. nodeInformer := informerFactory.Core().V1().Nodes()
  61. pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
  62. pvInformer := informerFactory.Core().V1().PersistentVolumes()
  63. adc := &attachDetachController{
  64. kubeClient: fakeKubeClient,
  65. pvcLister: pvcInformer.Lister(),
  66. pvcsSynced: pvcInformer.Informer().HasSynced,
  67. pvLister: pvInformer.Lister(),
  68. pvsSynced: pvInformer.Informer().HasSynced,
  69. podLister: podInformer.Lister(),
  70. podsSynced: podInformer.Informer().HasSynced,
  71. nodeLister: nodeInformer.Lister(),
  72. nodesSynced: nodeInformer.Informer().HasSynced,
  73. cloud: nil,
  74. }
  75. // Act
  76. plugins := controllervolumetesting.CreateTestPlugin()
  77. var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
  78. if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
  79. t.Fatalf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
  80. }
  81. adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
  82. adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
  83. err := adc.populateActualStateOfWorld()
  84. if err != nil {
  85. t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
  86. }
  87. err = adc.populateDesiredStateOfWorld()
  88. if err != nil {
  89. t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
  90. }
  91. // Test the ActualStateOfWorld contains all the node volumes
  92. nodes, err := adc.nodeLister.List(labels.Everything())
  93. if err != nil {
  94. t.Fatalf("Failed to list nodes in indexer. Expected: <no error> Actual: %v", err)
  95. }
  96. for _, node := range nodes {
  97. nodeName := types.NodeName(node.Name)
  98. for _, attachedVolume := range node.Status.VolumesAttached {
  99. found := adc.actualStateOfWorld.IsVolumeAttachedToNode(attachedVolume.Name, nodeName)
  100. if !found {
  101. t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name)
  102. }
  103. }
  104. }
  105. pods, err := adc.podLister.List(labels.Everything())
  106. if err != nil {
  107. t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
  108. }
  109. for _, pod := range pods {
  110. uniqueName := fmt.Sprintf("%s/%s", controllervolumetesting.TestPluginName, pod.Spec.Volumes[0].Name)
  111. nodeName := types.NodeName(pod.Spec.NodeName)
  112. found := adc.desiredStateOfWorld.VolumeExists(v1.UniqueVolumeName(uniqueName), nodeName)
  113. if !found {
  114. t.Fatalf("Run failed with error. Volume %s, node %s not found in DesiredStateOfWorld",
  115. pod.Spec.Volumes[0].Name,
  116. pod.Spec.NodeName)
  117. }
  118. }
  119. }
  120. func Test_AttachDetachControllerRecovery(t *testing.T) {
  121. attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{})
  122. newPod1 := controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
  123. attachDetachRecoveryTestCase(t, []*v1.Pod{newPod1}, []*v1.Pod{})
  124. newPod1 = controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
  125. attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{newPod1})
  126. newPod1 = controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
  127. newPod2 := controllervolumetesting.NewPodWithVolume("newpod-2", "volumeName3", "mynode-1")
  128. attachDetachRecoveryTestCase(t, []*v1.Pod{newPod1}, []*v1.Pod{newPod2})
  129. }
  130. func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 []*v1.Pod) {
  131. fakeKubeClient := controllervolumetesting.CreateTestClient()
  132. informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
  133. //informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
  134. plugins := controllervolumetesting.CreateTestPlugin()
  135. var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
  136. nodeInformer := informerFactory.Core().V1().Nodes().Informer()
  137. csiNodeInformer := informerFactory.Storage().V1().CSINodes().Informer()
  138. podInformer := informerFactory.Core().V1().Pods().Informer()
  139. var podsNum, extraPodsNum, nodesNum, i int
  140. stopCh := make(chan struct{})
  141. pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
  142. if err != nil {
  143. t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
  144. }
  145. for _, pod := range pods.Items {
  146. podToAdd := pod
  147. podInformer.GetIndexer().Add(&podToAdd)
  148. podsNum++
  149. }
  150. nodes, err := fakeKubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
  151. if err != nil {
  152. t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
  153. }
  154. for _, node := range nodes.Items {
  155. nodeToAdd := node
  156. nodeInformer.GetIndexer().Add(&nodeToAdd)
  157. nodesNum++
  158. }
  159. csiNodes, err := fakeKubeClient.StorageV1().CSINodes().List(context.TODO(), metav1.ListOptions{})
  160. if err != nil {
  161. t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
  162. }
  163. for _, csiNode := range csiNodes.Items {
  164. csiNodeToAdd := csiNode
  165. csiNodeInformer.GetIndexer().Add(&csiNodeToAdd)
  166. }
  167. informerFactory.Start(stopCh)
  168. if !kcache.WaitForNamedCacheSync("attach detach", stopCh,
  169. informerFactory.Core().V1().Pods().Informer().HasSynced,
  170. informerFactory.Core().V1().Nodes().Informer().HasSynced,
  171. informerFactory.Storage().V1().CSINodes().Informer().HasSynced) {
  172. t.Fatalf("Error waiting for the informer caches to sync")
  173. }
  174. // Make sure the nodes and pods are in the inforer cache
  175. i = 0
  176. nodeList, err := informerFactory.Core().V1().Nodes().Lister().List(labels.Everything())
  177. for len(nodeList) < nodesNum {
  178. if err != nil {
  179. t.Fatalf("Error getting list of nodes %v", err)
  180. }
  181. if i > 100 {
  182. t.Fatalf("Time out while waiting for the node informer sync: found %d nodes, expected %d nodes", len(nodeList), nodesNum)
  183. }
  184. time.Sleep(100 * time.Millisecond)
  185. nodeList, err = informerFactory.Core().V1().Nodes().Lister().List(labels.Everything())
  186. i++
  187. }
  188. i = 0
  189. podList, err := informerFactory.Core().V1().Pods().Lister().List(labels.Everything())
  190. for len(podList) < podsNum {
  191. if err != nil {
  192. t.Fatalf("Error getting list of nodes %v", err)
  193. }
  194. if i > 100 {
  195. t.Fatalf("Time out while waiting for the pod informer sync: found %d pods, expected %d pods", len(podList), podsNum)
  196. }
  197. time.Sleep(100 * time.Millisecond)
  198. podList, err = informerFactory.Core().V1().Pods().Lister().List(labels.Everything())
  199. i++
  200. }
  201. i = 0
  202. csiNodesList, err := informerFactory.Storage().V1().CSINodes().Lister().List(labels.Everything())
  203. for len(csiNodesList) < nodesNum {
  204. if err != nil {
  205. t.Fatalf("Error getting list of csi nodes %v", err)
  206. }
  207. if i > 100 {
  208. t.Fatalf("Time out while waiting for the csinodes informer sync: found %d csinodes, expected %d csinodes", len(csiNodesList), nodesNum)
  209. }
  210. time.Sleep(100 * time.Millisecond)
  211. csiNodesList, err = informerFactory.Storage().V1().CSINodes().Lister().List(labels.Everything())
  212. i++
  213. }
  214. // Create the controller
  215. adcObj, err := NewAttachDetachController(
  216. fakeKubeClient,
  217. informerFactory.Core().V1().Pods(),
  218. informerFactory.Core().V1().Nodes(),
  219. informerFactory.Core().V1().PersistentVolumeClaims(),
  220. informerFactory.Core().V1().PersistentVolumes(),
  221. informerFactory.Storage().V1().CSINodes(),
  222. informerFactory.Storage().V1beta1().CSIDrivers(),
  223. nil, /* cloud */
  224. plugins,
  225. prober,
  226. false,
  227. 1*time.Second,
  228. DefaultTimerConfig)
  229. if err != nil {
  230. t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
  231. }
  232. adc := adcObj.(*attachDetachController)
  233. // Populate ASW
  234. err = adc.populateActualStateOfWorld()
  235. if err != nil {
  236. t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
  237. }
  238. for _, newPod := range extraPods1 {
  239. // Add a new pod between ASW and DSW ppoulators
  240. _, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
  241. if err != nil {
  242. t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
  243. }
  244. extraPodsNum++
  245. podInformer.GetIndexer().Add(newPod)
  246. }
  247. // Populate DSW
  248. err = adc.populateDesiredStateOfWorld()
  249. if err != nil {
  250. t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
  251. }
  252. for _, newPod := range extraPods2 {
  253. // Add a new pod between DSW ppoulator and reconciler run
  254. _, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
  255. if err != nil {
  256. t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
  257. }
  258. extraPodsNum++
  259. podInformer.GetIndexer().Add(newPod)
  260. }
  261. go adc.reconciler.Run(stopCh)
  262. go adc.desiredStateOfWorldPopulator.Run(stopCh)
  263. defer close(stopCh)
  264. time.Sleep(time.Second * 1) // Wait so the reconciler calls sync at least once
  265. testPlugin := plugins[0].(*controllervolumetesting.TestPlugin)
  266. for i = 0; i <= 10; i++ {
  267. var attachedVolumesNum int = 0
  268. var detachedVolumesNum int = 0
  269. time.Sleep(time.Second * 1) // Wait for a second
  270. for _, volumeList := range testPlugin.GetAttachedVolumes() {
  271. attachedVolumesNum += len(volumeList)
  272. }
  273. for _, volumeList := range testPlugin.GetDetachedVolumes() {
  274. detachedVolumesNum += len(volumeList)
  275. }
  276. // All the "extra pods" should result in volume to be attached, the pods all share one volume
  277. // which should be attached (+1), the volumes found only in the nodes status should be detached
  278. if attachedVolumesNum == 1+extraPodsNum && detachedVolumesNum == nodesNum {
  279. break
  280. }
  281. if i == 10 { // 10 seconds time out
  282. t.Fatalf("Waiting for the volumes to attach/detach timed out: attached %d (expected %d); detached %d (%d)",
  283. attachedVolumesNum, 1+extraPodsNum, detachedVolumesNum, nodesNum)
  284. }
  285. }
  286. if testPlugin.GetErrorEncountered() {
  287. t.Fatalf("Fatal error encountered in the testing volume plugin")
  288. }
  289. }