topology.go
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// This suite tests volume topology.
package testsuites

import (
	"context"
	"fmt"
	"math/rand"

	"github.com/onsi/ginkgo"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	"k8s.io/kubernetes/test/e2e/storage/testpatterns"
)
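
// topologyTestSuite implements the TestSuite interface for the volume
// topology tests.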
type topologyTestSuite struct {
	tsInfo TestSuiteInfo
}
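
// topologyTest holds the per-test state: the driver configuration, the
// volume resource and pod under test, the discovered node topologies, and
// the migration op counts captured at setup time.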
type topologyTest struct {
	config        *PerTestConfig
	driverCleanup func()

	intreeOps   opCounts
	migratedOps opCounts

	resource      VolumeResource
	pod           *v1.Pod
	allTopologies []topology
}
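
// topology is one topology domain: the set of topology labels
// (e.g. "topology.kubernetes.io/zone": "us-east-1a") that identifies it.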
type topology map[string]string

var _ TestSuite = &topologyTestSuite{}

// InitTopologyTestSuite returns topologyTestSuite that implements TestSuite interface
func InitTopologyTestSuite() TestSuite {
	return &topologyTestSuite{
		tsInfo: TestSuiteInfo{
			Name: "topology",
			TestPatterns: []testpatterns.TestPattern{
				testpatterns.TopologyImmediate,
				testpatterns.TopologyDelayed,
			},
		},
	}
}
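
// GetTestSuiteInfo returns the TestSuiteInfo for this suite.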
func (t *topologyTestSuite) GetTestSuiteInfo() TestSuiteInfo {
	return t.tsInfo
}
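
// SkipRedundantSuite is a no-op; this suite skips no patterns as redundant.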
func (t *topologyTestSuite) SkipRedundantSuite(driver TestDriver, pattern testpatterns.TestPattern) {
}
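
// DefineTests registers the topology test cases for the given driver and
// test pattern.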
func (t *topologyTestSuite) DefineTests(driver TestDriver, pattern testpatterns.TestPattern) {
	var (
		dInfo   = driver.GetDriverInfo()
		dDriver DynamicPVTestDriver
		cs      clientset.Interface
		err     error
	)

	ginkgo.BeforeEach(func() {
		// Check preconditions.
		ok := false
		dDriver, ok = driver.(DynamicPVTestDriver)
		if !ok {
			e2eskipper.Skipf("Driver %s doesn't support %v -- skipping", dInfo.Name, pattern.VolType)
		}

		if !dInfo.Capabilities[CapTopology] {
			e2eskipper.Skipf("Driver %q does not support topology - skipping", dInfo.Name)
		}
	})

	// This intentionally comes after checking the preconditions because it
	// registers its own BeforeEach which creates the namespace. Beware that it
	// also registers an AfterEach which renders f unusable. Any code using
	// f must run inside an It or Context callback.
	f := framework.NewDefaultFramework("topology")
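
	// init performs the expensive per-test setup: it installs the driver,
	// collects the node topologies, and prepares the StorageClass and PVC
	// shared by the test cases below.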
	init := func() topologyTest {
		l := topologyTest{}

		// Now do the more expensive test initialization.
		l.config, l.driverCleanup = driver.PrepareTest(f)

		l.resource = VolumeResource{
			Config:  l.config,
			Pattern: pattern,
		}

		// After driver is installed, check driver topologies on nodes
		cs = f.ClientSet
		keys := dInfo.TopologyKeys
		if len(keys) == 0 {
			e2eskipper.Skipf("Driver didn't provide topology keys -- skipping")
		}

		if dInfo.NumAllowedTopologies == 0 {
			// Any plugin that supports topology defaults to 1 topology
			dInfo.NumAllowedTopologies = 1
		}
		// We collect 1 additional topology, if possible, for the conflicting topology test
		// case, but it's not needed for the positive test
		l.allTopologies, err = t.getCurrentTopologies(cs, keys, dInfo.NumAllowedTopologies+1)
		framework.ExpectNoError(err, "failed to get current driver topologies")
		if len(l.allTopologies) < dInfo.NumAllowedTopologies {
			e2eskipper.Skipf("Not enough topologies in cluster -- skipping")
		}

		l.resource.Sc = dDriver.GetDynamicProvisionStorageClass(l.config, pattern.FsType)
		framework.ExpectNotEqual(l.resource.Sc, nil, "driver failed to provide a StorageClass")
		l.resource.Sc.VolumeBindingMode = &pattern.BindingMode

		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
		driverVolumeSizeRange := dDriver.GetDriverInfo().SupportedSizeRange
		claimSize, err := getSizeRangesIntersection(testVolumeSizeRange, driverVolumeSizeRange)
		framework.ExpectNoError(err, "determine intersection of test size range %+v and driver size range %+v", testVolumeSizeRange, driverVolumeSizeRange)
		l.resource.Pvc = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
			ClaimSize:        claimSize,
			StorageClassName: &(l.resource.Sc.Name),
		}, l.config.Framework.Namespace.Name)

		l.intreeOps, l.migratedOps = getMigrationVolumeOpCounts(f.ClientSet, dInfo.InTreePluginName)
		return l
	}
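
	// cleanup releases everything init created and validates the migration
	// volume op counts.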
	cleanup := func(l topologyTest) {
		t.CleanupResources(cs, &l)
		err := tryFunc(l.driverCleanup)
		l.driverCleanup = nil
		framework.ExpectNoError(err, "while cleaning up driver")

		validateMigrationVolumeOpCounts(f.ClientSet, dInfo.InTreePluginName, l.intreeOps, l.migratedOps)
	}

	ginkgo.It("should provision a volume and schedule a pod with AllowedTopologies", func() {
		l := init()
		defer func() {
			cleanup(l)
		}()

		// If possible, exclude one topology, otherwise allow them all
		excludedIndex := -1
		if len(l.allTopologies) > dInfo.NumAllowedTopologies {
			excludedIndex = rand.Intn(len(l.allTopologies))
		}
		allowedTopologies := t.setAllowedTopologies(l.resource.Sc, l.allTopologies, excludedIndex)

		t.createResources(cs, &l, nil)

		err = e2epod.WaitForPodRunningInNamespace(cs, l.pod)
		framework.ExpectNoError(err)

		ginkgo.By("Verifying pod scheduled to correct node")
		pod, err := cs.CoreV1().Pods(l.pod.Namespace).Get(context.TODO(), l.pod.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)

		node, err := cs.CoreV1().Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
		framework.ExpectNoError(err)

		t.verifyNodeTopology(node, allowedTopologies)
	})

	ginkgo.It("should fail to schedule a pod which has topologies that conflict with AllowedTopologies", func() {
		l := init()
		defer func() {
			cleanup(l)
		}()

		if len(l.allTopologies) < dInfo.NumAllowedTopologies+1 {
			e2eskipper.Skipf("Not enough topologies in cluster -- skipping")
		}

		// Exclude one topology
		excludedIndex := rand.Intn(len(l.allTopologies))
		t.setAllowedTopologies(l.resource.Sc, l.allTopologies, excludedIndex)

		// Set pod nodeSelector to the excluded topology
		exprs := []v1.NodeSelectorRequirement{}
		for k, v := range l.allTopologies[excludedIndex] {
			exprs = append(exprs, v1.NodeSelectorRequirement{
				Key:      k,
				Operator: v1.NodeSelectorOpIn,
				Values:   []string{v},
			})
		}

		affinity := &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: exprs,
						},
					},
				},
			},
		}

		t.createResources(cs, &l, affinity)

		// Wait for the pod to fail scheduling. With delayed binding, the
		// scheduler reports an error before provisioning; with immediate
		// binding, the volume is provisioned but the pod cannot be scheduled.
		err = e2epod.WaitForPodNameUnschedulableInNamespace(cs, l.pod.Name, l.pod.Namespace)
		framework.ExpectNoError(err)
	})
}

// getCurrentTopologies goes through all Nodes and returns up to maxCount
// unique driver topologies.
func (t *topologyTestSuite) getCurrentTopologies(cs clientset.Interface, keys []string, maxCount int) ([]topology, error) {
	nodes, err := e2enode.GetReadySchedulableNodes(cs)
	if err != nil {
		return nil, err
	}

	topos := []topology{}

	// TODO: scale?
	for _, n := range nodes.Items {
		topo := map[string]string{}
		for _, k := range keys {
			v, ok := n.Labels[k]
			if !ok {
				return nil, fmt.Errorf("node %v missing topology label %v", n.Name, k)
			}
			topo[k] = v
		}

		found := false
		for _, existingTopo := range topos {
			if topologyEqual(existingTopo, topo) {
				found = true
				break
			}
		}
		if !found {
			framework.Logf("found topology %v", topo)
			topos = append(topos, topo)
		}
		if len(topos) >= maxCount {
			break
		}
	}
	return topos, nil
}

// topologyEqual reports whether t1 and t2 contain exactly the same key/value
// pairs. (reflect.DeepEqual doesn't seem to work here.)
func topologyEqual(t1, t2 topology) bool {
	if len(t1) != len(t2) {
		return false
	}
	for k1, v1 := range t1 {
		if v2, ok := t2[k1]; !ok || v1 != v2 {
			return false
		}
	}
	return true
}

// setAllowedTopologies sets sc.AllowedTopologies from topos, excluding the
// topology at excludedIndex. excludedIndex can be -1 to specify that nothing
// should be excluded. It returns the list of allowed topologies generated.
func (t *topologyTestSuite) setAllowedTopologies(sc *storagev1.StorageClass, topos []topology, excludedIndex int) []topology {
	allowedTopologies := []topology{}
	sc.AllowedTopologies = []v1.TopologySelectorTerm{}
	for i := 0; i < len(topos); i++ {
		if i != excludedIndex {
			exprs := []v1.TopologySelectorLabelRequirement{}
			for k, v := range topos[i] {
				exprs = append(exprs, v1.TopologySelectorLabelRequirement{
					Key:    k,
					Values: []string{v},
				})
			}
			sc.AllowedTopologies = append(sc.AllowedTopologies, v1.TopologySelectorTerm{MatchLabelExpressions: exprs})
			allowedTopologies = append(allowedTopologies, topos[i])
		}
	}
	return allowedTopologies
}
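
// verifyNodeTopology fails the test unless one of the node's topology
// labels matches one of the allowed topologies.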
func (t *topologyTestSuite) verifyNodeTopology(node *v1.Node, allowedTopos []topology) {
	for _, topo := range allowedTopos {
		for k, v := range topo {
			nodeV := node.Labels[k]
			if nodeV == v {
				return
			}
		}
	}
	framework.Failf("node %v topology labels %+v don't match allowed topologies %+v", node.Name, node.Labels, allowedTopos)
}
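
// createResources creates the StorageClass, PVC, and pod for a test case.
// A non-nil affinity is applied to the pod; the conflicting-topology test
// uses this to pin the pod to the excluded topology.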
func (t *topologyTestSuite) createResources(cs clientset.Interface, l *topologyTest, affinity *v1.Affinity) {
	var err error
	framework.Logf("Creating storage class object and pvc object for driver - sc: %v, pvc: %v", l.resource.Sc, l.resource.Pvc)

	ginkgo.By("Creating sc")
	l.resource.Sc, err = cs.StorageV1().StorageClasses().Create(context.TODO(), l.resource.Sc, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	ginkgo.By("Creating pvc")
	l.resource.Pvc, err = cs.CoreV1().PersistentVolumeClaims(l.resource.Pvc.Namespace).Create(context.TODO(), l.resource.Pvc, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	ginkgo.By("Creating pod")
	l.pod = e2epod.MakeSecPod(l.config.Framework.Namespace.Name,
		[]*v1.PersistentVolumeClaim{l.resource.Pvc},
		nil,
		false,
		"",
		false,
		false,
		e2epv.SELinuxLabel,
		nil)
	l.pod.Spec.Affinity = affinity
	l.pod, err = cs.CoreV1().Pods(l.pod.Namespace).Create(context.TODO(), l.pod, metav1.CreateOptions{})
	framework.ExpectNoError(err)
}
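
// CleanupResources deletes the pod and the volume resources created by
// createResources.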
func (t *topologyTestSuite) CleanupResources(cs clientset.Interface, l *topologyTest) {
	if l.pod != nil {
		ginkgo.By("Deleting pod")
		err := e2epod.DeletePodWithWait(cs, l.pod)
		framework.ExpectNoError(err, "while deleting pod")
	}

	err := l.resource.CleanupResource()
	framework.ExpectNoError(err, "while cleaning up resource")
}