attach_detach_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package volume
  14. import (
  15. "fmt"
  16. "net/http/httptest"
  17. "testing"
  18. "time"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/resource"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/runtime/schema"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. utilfeature "k8s.io/apiserver/pkg/util/feature"
  25. clientgoinformers "k8s.io/client-go/informers"
  26. clientset "k8s.io/client-go/kubernetes"
  27. restclient "k8s.io/client-go/rest"
  28. "k8s.io/client-go/tools/cache"
  29. fakecloud "k8s.io/cloud-provider/fake"
  30. "k8s.io/kubernetes/pkg/controller/volume/attachdetach"
  31. volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
  32. "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
  33. persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
  34. "k8s.io/kubernetes/pkg/features"
  35. "k8s.io/kubernetes/pkg/volume"
  36. volumetest "k8s.io/kubernetes/pkg/volume/testing"
  37. "k8s.io/kubernetes/pkg/volume/util"
  38. "k8s.io/kubernetes/test/integration/framework"
  39. )
  40. func fakePodWithVol(namespace string) *v1.Pod {
  41. fakePod := &v1.Pod{
  42. ObjectMeta: metav1.ObjectMeta{
  43. Namespace: namespace,
  44. Name: "fakepod",
  45. },
  46. Spec: v1.PodSpec{
  47. Containers: []v1.Container{
  48. {
  49. Name: "fake-container",
  50. Image: "nginx",
  51. VolumeMounts: []v1.VolumeMount{
  52. {
  53. Name: "fake-mount",
  54. MountPath: "/var/www/html",
  55. },
  56. },
  57. },
  58. },
  59. Volumes: []v1.Volume{
  60. {
  61. Name: "fake-mount",
  62. VolumeSource: v1.VolumeSource{
  63. HostPath: &v1.HostPathVolumeSource{
  64. Path: "/var/www/html",
  65. },
  66. },
  67. },
  68. },
  69. NodeName: "node-sandbox",
  70. },
  71. }
  72. return fakePod
  73. }
  74. func fakePodWithPVC(name, pvcName, namespace string) (*v1.Pod, *v1.PersistentVolumeClaim) {
  75. fakePod := &v1.Pod{
  76. ObjectMeta: metav1.ObjectMeta{
  77. Namespace: namespace,
  78. Name: name,
  79. },
  80. Spec: v1.PodSpec{
  81. Containers: []v1.Container{
  82. {
  83. Name: "fake-container",
  84. Image: "nginx",
  85. VolumeMounts: []v1.VolumeMount{
  86. {
  87. Name: "fake-mount",
  88. MountPath: "/var/www/html",
  89. },
  90. },
  91. },
  92. },
  93. Volumes: []v1.Volume{
  94. {
  95. Name: "fake-mount",
  96. VolumeSource: v1.VolumeSource{
  97. PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  98. ClaimName: pvcName,
  99. },
  100. },
  101. },
  102. },
  103. NodeName: "node-sandbox",
  104. },
  105. }
  106. class := "fake-sc"
  107. fakePVC := &v1.PersistentVolumeClaim{
  108. ObjectMeta: metav1.ObjectMeta{
  109. Namespace: namespace,
  110. Name: pvcName,
  111. },
  112. Spec: v1.PersistentVolumeClaimSpec{
  113. AccessModes: []v1.PersistentVolumeAccessMode{
  114. v1.ReadWriteOnce,
  115. },
  116. Resources: v1.ResourceRequirements{
  117. Requests: v1.ResourceList{
  118. v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"),
  119. },
  120. },
  121. StorageClassName: &class,
  122. },
  123. }
  124. return fakePod, fakePVC
  125. }
  126. type podCountFunc func(int) bool
  127. var defaultTimerConfig = attachdetach.TimerConfig{
  128. ReconcilerLoopPeriod: 100 * time.Millisecond,
  129. ReconcilerMaxWaitForUnmountDuration: 6 * time.Second,
  130. DesiredStateOfWorldPopulatorLoopSleepPeriod: 1 * time.Second,
  131. DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second,
  132. }
  133. // Via integration test we can verify that if pod delete
  134. // event is somehow missed by AttachDetach controller - it still
  135. // gets cleaned up by Desired State of World populator.
  136. func TestPodDeletionWithDswp(t *testing.T) {
  137. _, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
  138. defer closeFn()
  139. namespaceName := "test-pod-deletion"
  140. node := &v1.Node{
  141. ObjectMeta: metav1.ObjectMeta{
  142. Name: "node-sandbox",
  143. Annotations: map[string]string{
  144. util.ControllerManagedAttachAnnotation: "true",
  145. },
  146. },
  147. }
  148. ns := framework.CreateTestingNamespace(namespaceName, server, t)
  149. defer framework.DeleteTestingNamespace(ns, server, t)
  150. testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig)
  151. pod := fakePodWithVol(namespaceName)
  152. podStopCh := make(chan struct{})
  153. if _, err := testClient.CoreV1().Nodes().Create(node); err != nil {
  154. t.Fatalf("Failed to created node : %v", err)
  155. }
  156. go informers.Core().V1().Nodes().Informer().Run(podStopCh)
  157. if _, err := testClient.CoreV1().Pods(ns.Name).Create(pod); err != nil {
  158. t.Errorf("Failed to create pod : %v", err)
  159. }
  160. podInformer := informers.Core().V1().Pods().Informer()
  161. go podInformer.Run(podStopCh)
  162. // start controller loop
  163. stopCh := make(chan struct{})
  164. go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
  165. go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
  166. initCSIObjects(stopCh, informers)
  167. go ctrl.Run(stopCh)
  168. waitToObservePods(t, podInformer, 1)
  169. podKey, err := cache.MetaNamespaceKeyFunc(pod)
  170. if err != nil {
  171. t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
  172. }
  173. podInformerObj, _, err := podInformer.GetStore().GetByKey(podKey)
  174. if err != nil {
  175. t.Fatalf("Pod not found in Pod Informer cache : %v", err)
  176. }
  177. waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
  178. // let's stop pod events from getting triggered
  179. close(podStopCh)
  180. err = podInformer.GetStore().Delete(podInformerObj)
  181. if err != nil {
  182. t.Fatalf("Error deleting pod : %v", err)
  183. }
  184. waitToObservePods(t, podInformer, 0)
  185. // the populator loop turns every 1 minute
  186. waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 80*time.Second, "expected 0 pods in dsw after pod delete", 0)
  187. close(stopCh)
  188. }
  189. func initCSIObjects(stopCh chan struct{}, informers clientgoinformers.SharedInformerFactory) {
  190. if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
  191. utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
  192. go informers.Storage().V1beta1().CSINodes().Informer().Run(stopCh)
  193. }
  194. if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  195. go informers.Storage().V1beta1().CSIDrivers().Informer().Run(stopCh)
  196. }
  197. }
  198. func TestPodUpdateWithWithADC(t *testing.T) {
  199. _, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
  200. defer closeFn()
  201. namespaceName := "test-pod-update"
  202. node := &v1.Node{
  203. ObjectMeta: metav1.ObjectMeta{
  204. Name: "node-sandbox",
  205. Annotations: map[string]string{
  206. util.ControllerManagedAttachAnnotation: "true",
  207. },
  208. },
  209. }
  210. ns := framework.CreateTestingNamespace(namespaceName, server, t)
  211. defer framework.DeleteTestingNamespace(ns, server, t)
  212. testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig)
  213. pod := fakePodWithVol(namespaceName)
  214. podStopCh := make(chan struct{})
  215. if _, err := testClient.CoreV1().Nodes().Create(node); err != nil {
  216. t.Fatalf("Failed to created node : %v", err)
  217. }
  218. go informers.Core().V1().Nodes().Informer().Run(podStopCh)
  219. if _, err := testClient.CoreV1().Pods(ns.Name).Create(pod); err != nil {
  220. t.Errorf("Failed to create pod : %v", err)
  221. }
  222. podInformer := informers.Core().V1().Pods().Informer()
  223. go podInformer.Run(podStopCh)
  224. // start controller loop
  225. stopCh := make(chan struct{})
  226. go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
  227. go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
  228. initCSIObjects(stopCh, informers)
  229. go ctrl.Run(stopCh)
  230. waitToObservePods(t, podInformer, 1)
  231. podKey, err := cache.MetaNamespaceKeyFunc(pod)
  232. if err != nil {
  233. t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
  234. }
  235. _, _, err = podInformer.GetStore().GetByKey(podKey)
  236. if err != nil {
  237. t.Fatalf("Pod not found in Pod Informer cache : %v", err)
  238. }
  239. waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
  240. pod.Status.Phase = v1.PodSucceeded
  241. if _, err := testClient.CoreV1().Pods(ns.Name).UpdateStatus(pod); err != nil {
  242. t.Errorf("Failed to update pod : %v", err)
  243. }
  244. waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected 0 pods in dsw after pod completion", 0)
  245. close(podStopCh)
  246. close(stopCh)
  247. }
  248. func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
  249. _, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
  250. defer closeFn()
  251. namespaceName := "test-pod-update"
  252. node := &v1.Node{
  253. ObjectMeta: metav1.ObjectMeta{
  254. Name: "node-sandbox",
  255. Annotations: map[string]string{
  256. util.ControllerManagedAttachAnnotation: "true",
  257. util.KeepTerminatedPodVolumesAnnotation: "true",
  258. },
  259. },
  260. }
  261. ns := framework.CreateTestingNamespace(namespaceName, server, t)
  262. defer framework.DeleteTestingNamespace(ns, server, t)
  263. testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig)
  264. pod := fakePodWithVol(namespaceName)
  265. podStopCh := make(chan struct{})
  266. if _, err := testClient.CoreV1().Nodes().Create(node); err != nil {
  267. t.Fatalf("Failed to created node : %v", err)
  268. }
  269. go informers.Core().V1().Nodes().Informer().Run(podStopCh)
  270. if _, err := testClient.CoreV1().Pods(ns.Name).Create(pod); err != nil {
  271. t.Errorf("Failed to create pod : %v", err)
  272. }
  273. podInformer := informers.Core().V1().Pods().Informer()
  274. go podInformer.Run(podStopCh)
  275. // start controller loop
  276. stopCh := make(chan struct{})
  277. go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
  278. go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
  279. initCSIObjects(stopCh, informers)
  280. go ctrl.Run(stopCh)
  281. waitToObservePods(t, podInformer, 1)
  282. podKey, err := cache.MetaNamespaceKeyFunc(pod)
  283. if err != nil {
  284. t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
  285. }
  286. _, _, err = podInformer.GetStore().GetByKey(podKey)
  287. if err != nil {
  288. t.Fatalf("Pod not found in Pod Informer cache : %v", err)
  289. }
  290. waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
  291. pod.Status.Phase = v1.PodSucceeded
  292. if _, err := testClient.CoreV1().Pods(ns.Name).UpdateStatus(pod); err != nil {
  293. t.Errorf("Failed to update pod : %v", err)
  294. }
  295. waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected non-zero pods in dsw if KeepTerminatedPodVolumesAnnotation is set", 1)
  296. close(podStopCh)
  297. close(stopCh)
  298. }
  299. // wait for the podInformer to observe the pods. Call this function before
  300. // running the RC manager to prevent the rc manager from creating new pods
  301. // rather than adopting the existing ones.
  302. func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int) {
  303. if err := wait.Poll(100*time.Millisecond, 60*time.Second, func() (bool, error) {
  304. objects := podInformer.GetIndexer().List()
  305. if len(objects) == podNum {
  306. return true, nil
  307. }
  308. return false, nil
  309. }); err != nil {
  310. t.Fatal(err)
  311. }
  312. }
  313. // wait for pods to be observed in desired state of world
  314. func waitForPodsInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld) {
  315. if err := wait.Poll(time.Millisecond*500, wait.ForeverTestTimeout, func() (bool, error) {
  316. pods := dswp.GetPodToAdd()
  317. if len(pods) > 0 {
  318. return true, nil
  319. }
  320. return false, nil
  321. }); err != nil {
  322. t.Fatalf("Pod not added to desired state of world : %v", err)
  323. }
  324. }
  325. // wait for pods to be observed in desired state of world
  326. func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, checkTimeout time.Duration, failMessage string, podCount int) {
  327. if err := wait.Poll(time.Millisecond*500, checkTimeout, func() (bool, error) {
  328. pods := dswp.GetPodToAdd()
  329. if len(pods) == podCount {
  330. return true, nil
  331. }
  332. return false, nil
  333. }); err != nil {
  334. t.Fatalf("%s but got error %v", failMessage, err)
  335. }
  336. }
  337. func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
  338. config := restclient.Config{
  339. Host: server.URL,
  340. ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}},
  341. QPS: 1000000,
  342. Burst: 1000000,
  343. }
  344. resyncPeriod := 12 * time.Hour
  345. testClient := clientset.NewForConfigOrDie(&config)
  346. host := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)
  347. plugin := &volumetest.FakeVolumePlugin{
  348. PluginName: provisionerPluginName,
  349. Host: host,
  350. Config: volume.VolumeConfig{},
  351. LastProvisionerOptions: volume.VolumeOptions{},
  352. NewAttacherCallCount: 0,
  353. NewDetacherCallCount: 0,
  354. Mounters: nil,
  355. Unmounters: nil,
  356. Attachers: nil,
  357. Detachers: nil,
  358. }
  359. plugins := []volume.VolumePlugin{plugin}
  360. cloud := &fakecloud.Cloud{}
  361. informers := clientgoinformers.NewSharedInformerFactory(testClient, resyncPeriod)
  362. ctrl, err := attachdetach.NewAttachDetachController(
  363. testClient,
  364. informers.Core().V1().Pods(),
  365. informers.Core().V1().Nodes(),
  366. informers.Core().V1().PersistentVolumeClaims(),
  367. informers.Core().V1().PersistentVolumes(),
  368. informers.Storage().V1beta1().CSINodes(),
  369. informers.Storage().V1beta1().CSIDrivers(),
  370. cloud,
  371. plugins,
  372. nil, /* prober */
  373. false,
  374. 5*time.Second,
  375. timers)
  376. if err != nil {
  377. t.Fatalf("Error creating AttachDetach : %v", err)
  378. }
  379. // create pv controller
  380. controllerOptions := persistentvolumeoptions.NewPersistentVolumeControllerOptions()
  381. params := persistentvolume.ControllerParameters{
  382. KubeClient: testClient,
  383. SyncPeriod: controllerOptions.PVClaimBinderSyncPeriod,
  384. VolumePlugins: plugins,
  385. Cloud: nil,
  386. ClusterName: "volume-test-cluster",
  387. VolumeInformer: informers.Core().V1().PersistentVolumes(),
  388. ClaimInformer: informers.Core().V1().PersistentVolumeClaims(),
  389. ClassInformer: informers.Storage().V1().StorageClasses(),
  390. PodInformer: informers.Core().V1().Pods(),
  391. NodeInformer: informers.Core().V1().Nodes(),
  392. EnableDynamicProvisioning: false,
  393. }
  394. pvCtrl, err := persistentvolume.NewController(params)
  395. if err != nil {
  396. t.Fatalf("Failed to create PV controller: %v", err)
  397. }
  398. return testClient, ctrl, pvCtrl, informers
  399. }
  400. // Via integration test we can verify that if pod add
  401. // event is somehow missed by AttachDetach controller - it still
  402. // gets added by Desired State of World populator.
  403. func TestPodAddedByDswp(t *testing.T) {
  404. _, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
  405. defer closeFn()
  406. namespaceName := "test-pod-deletion"
  407. node := &v1.Node{
  408. ObjectMeta: metav1.ObjectMeta{
  409. Name: "node-sandbox",
  410. Annotations: map[string]string{
  411. util.ControllerManagedAttachAnnotation: "true",
  412. },
  413. },
  414. }
  415. ns := framework.CreateTestingNamespace(namespaceName, server, t)
  416. defer framework.DeleteTestingNamespace(ns, server, t)
  417. testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig)
  418. pod := fakePodWithVol(namespaceName)
  419. podStopCh := make(chan struct{})
  420. if _, err := testClient.CoreV1().Nodes().Create(node); err != nil {
  421. t.Fatalf("Failed to created node : %v", err)
  422. }
  423. go informers.Core().V1().Nodes().Informer().Run(podStopCh)
  424. if _, err := testClient.CoreV1().Pods(ns.Name).Create(pod); err != nil {
  425. t.Errorf("Failed to create pod : %v", err)
  426. }
  427. podInformer := informers.Core().V1().Pods().Informer()
  428. go podInformer.Run(podStopCh)
  429. // start controller loop
  430. stopCh := make(chan struct{})
  431. go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
  432. go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
  433. initCSIObjects(stopCh, informers)
  434. go ctrl.Run(stopCh)
  435. waitToObservePods(t, podInformer, 1)
  436. podKey, err := cache.MetaNamespaceKeyFunc(pod)
  437. if err != nil {
  438. t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
  439. }
  440. _, _, err = podInformer.GetStore().GetByKey(podKey)
  441. if err != nil {
  442. t.Fatalf("Pod not found in Pod Informer cache : %v", err)
  443. }
  444. waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
  445. // let's stop pod events from getting triggered
  446. close(podStopCh)
  447. podNew := pod.DeepCopy()
  448. newPodName := "newFakepod"
  449. podNew.SetName(newPodName)
  450. err = podInformer.GetStore().Add(podNew)
  451. if err != nil {
  452. t.Fatalf("Error adding pod : %v", err)
  453. }
  454. waitToObservePods(t, podInformer, 2)
  455. // the findAndAddActivePods loop turns every 3 minute
  456. waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 200*time.Second, "expected 2 pods in dsw after pod addition", 2)
  457. close(stopCh)
  458. }
  459. func TestPVCBoundWithADC(t *testing.T) {
  460. _, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
  461. defer closeFn()
  462. namespaceName := "test-pod-deletion"
  463. ns := framework.CreateTestingNamespace(namespaceName, server, t)
  464. defer framework.DeleteTestingNamespace(ns, server, t)
  465. testClient, ctrl, pvCtrl, informers := createAdClients(ns, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
  466. ReconcilerLoopPeriod: 100 * time.Millisecond,
  467. ReconcilerMaxWaitForUnmountDuration: 6 * time.Second,
  468. DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
  469. // Use high duration to disable DesiredStateOfWorldPopulator.findAndAddActivePods loop in test.
  470. DesiredStateOfWorldPopulatorListPodsRetryDuration: 24 * time.Hour,
  471. })
  472. node := &v1.Node{
  473. ObjectMeta: metav1.ObjectMeta{
  474. Name: "node-sandbox",
  475. Annotations: map[string]string{
  476. util.ControllerManagedAttachAnnotation: "true",
  477. },
  478. },
  479. }
  480. if _, err := testClient.CoreV1().Nodes().Create(node); err != nil {
  481. t.Fatalf("Failed to created node : %v", err)
  482. }
  483. // pods with pvc not bound
  484. pvcs := []*v1.PersistentVolumeClaim{}
  485. for i := 0; i < 3; i++ {
  486. pod, pvc := fakePodWithPVC(fmt.Sprintf("fakepod-pvcnotbound-%d", i), fmt.Sprintf("fakepvc-%d", i), namespaceName)
  487. if _, err := testClient.CoreV1().Pods(pod.Namespace).Create(pod); err != nil {
  488. t.Errorf("Failed to create pod : %v", err)
  489. }
  490. if _, err := testClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(pvc); err != nil {
  491. t.Errorf("Failed to create pvc : %v", err)
  492. }
  493. pvcs = append(pvcs, pvc)
  494. }
  495. // pod with no pvc
  496. podNew := fakePodWithVol(namespaceName)
  497. podNew.SetName("fakepod")
  498. if _, err := testClient.CoreV1().Pods(podNew.Namespace).Create(podNew); err != nil {
  499. t.Errorf("Failed to create pod : %v", err)
  500. }
  501. // start controller loop
  502. stopCh := make(chan struct{})
  503. informers.Start(stopCh)
  504. informers.WaitForCacheSync(stopCh)
  505. initCSIObjects(stopCh, informers)
  506. go ctrl.Run(stopCh)
  507. go pvCtrl.Run(stopCh)
  508. waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4)
  509. // Give attachdetach controller enough time to populate pods into DSWP.
  510. time.Sleep(10 * time.Second)
  511. waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 1 pod in dsw", 1)
  512. for _, pvc := range pvcs {
  513. createPVForPVC(t, testClient, pvc)
  514. }
  515. waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 4 pods in dsw after PVCs are bound", 4)
  516. close(stopCh)
  517. }
  518. // Create PV for PVC, pv controller will bind them together.
  519. func createPVForPVC(t *testing.T, testClient *clientset.Clientset, pvc *v1.PersistentVolumeClaim) {
  520. pv := &v1.PersistentVolume{
  521. ObjectMeta: metav1.ObjectMeta{
  522. Name: fmt.Sprintf("fakepv-%s", pvc.Name),
  523. },
  524. Spec: v1.PersistentVolumeSpec{
  525. Capacity: pvc.Spec.Resources.Requests,
  526. AccessModes: pvc.Spec.AccessModes,
  527. PersistentVolumeSource: v1.PersistentVolumeSource{
  528. HostPath: &v1.HostPathVolumeSource{
  529. Path: "/var/www/html",
  530. },
  531. },
  532. ClaimRef: &v1.ObjectReference{Name: pvc.Name, Namespace: pvc.Namespace},
  533. StorageClassName: *pvc.Spec.StorageClassName,
  534. },
  535. }
  536. if _, err := testClient.CoreV1().PersistentVolumes().Create(pv); err != nil {
  537. t.Errorf("Failed to create pv : %v", err)
  538. }
  539. }