util.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "fmt"
  16. "net/http"
  17. "net/http/httptest"
  18. "testing"
  19. "time"
  20. "k8s.io/api/core/v1"
  21. policy "k8s.io/api/policy/v1beta1"
  22. "k8s.io/apimachinery/pkg/api/errors"
  23. "k8s.io/apimachinery/pkg/api/resource"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/labels"
  26. "k8s.io/apimachinery/pkg/runtime/schema"
  27. "k8s.io/apimachinery/pkg/util/uuid"
  28. "k8s.io/apimachinery/pkg/util/wait"
  29. "k8s.io/apiserver/pkg/admission"
  30. cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
  31. "k8s.io/client-go/dynamic"
  32. "k8s.io/client-go/informers"
  33. coreinformers "k8s.io/client-go/informers/core/v1"
  34. clientset "k8s.io/client-go/kubernetes"
  35. clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
  36. corelisters "k8s.io/client-go/listers/core/v1"
  37. restclient "k8s.io/client-go/rest"
  38. "k8s.io/client-go/restmapper"
  39. "k8s.io/client-go/scale"
  40. "k8s.io/client-go/tools/record"
  41. "k8s.io/kubernetes/pkg/api/legacyscheme"
  42. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  43. "k8s.io/kubernetes/pkg/controller"
  44. "k8s.io/kubernetes/pkg/controller/disruption"
  45. "k8s.io/kubernetes/pkg/scheduler"
  46. schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
  47. // Register defaults in pkg/scheduler/algorithmprovider.
  48. _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
  49. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  50. "k8s.io/kubernetes/pkg/scheduler/factory"
  51. schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  52. taintutils "k8s.io/kubernetes/pkg/util/taints"
  53. "k8s.io/kubernetes/test/integration/framework"
  54. imageutils "k8s.io/kubernetes/test/utils/image"
  55. )
  56. type testContext struct {
  57. closeFn framework.CloseFunc
  58. httpServer *httptest.Server
  59. ns *v1.Namespace
  60. clientSet *clientset.Clientset
  61. informerFactory informers.SharedInformerFactory
  62. schedulerConfigFactory factory.Configurator
  63. schedulerConfig *factory.Config
  64. scheduler *scheduler.Scheduler
  65. stopCh chan struct{}
  66. }
  67. // createConfiguratorWithPodInformer creates a configurator for scheduler.
  68. func createConfiguratorWithPodInformer(
  69. schedulerName string,
  70. clientSet clientset.Interface,
  71. podInformer coreinformers.PodInformer,
  72. informerFactory informers.SharedInformerFactory,
  73. pluginRegistry schedulerframework.Registry,
  74. plugins *schedulerconfig.Plugins,
  75. pluginConfig []schedulerconfig.PluginConfig,
  76. stopCh <-chan struct{},
  77. ) factory.Configurator {
  78. return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
  79. SchedulerName: schedulerName,
  80. Client: clientSet,
  81. NodeInformer: informerFactory.Core().V1().Nodes(),
  82. PodInformer: podInformer,
  83. PvInformer: informerFactory.Core().V1().PersistentVolumes(),
  84. PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
  85. ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
  86. ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
  87. StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
  88. ServiceInformer: informerFactory.Core().V1().Services(),
  89. PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
  90. StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
  91. Registry: pluginRegistry,
  92. Plugins: plugins,
  93. PluginConfig: pluginConfig,
  94. HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
  95. DisablePreemption: false,
  96. PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
  97. BindTimeoutSeconds: 600,
  98. StopCh: stopCh,
  99. })
  100. }
  101. // initTestMasterAndScheduler initializes a test environment and creates a master with default
  102. // configuration.
  103. func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface) *testContext {
  104. context := testContext{
  105. stopCh: make(chan struct{}),
  106. }
  107. // 1. Create master
  108. h := &framework.MasterHolder{Initialized: make(chan struct{})}
  109. s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  110. <-h.Initialized
  111. h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
  112. }))
  113. masterConfig := framework.NewIntegrationTestMasterConfig()
  114. if admission != nil {
  115. masterConfig.GenericConfig.AdmissionControl = admission
  116. }
  117. _, context.httpServer, context.closeFn = framework.RunAMasterUsingServer(masterConfig, s, h)
  118. if nsPrefix != "default" {
  119. context.ns = framework.CreateTestingNamespace(nsPrefix+string(uuid.NewUUID()), s, t)
  120. } else {
  121. context.ns = framework.CreateTestingNamespace("default", s, t)
  122. }
  123. // 2. Create kubeclient
  124. context.clientSet = clientset.NewForConfigOrDie(
  125. &restclient.Config{
  126. QPS: -1, Host: s.URL,
  127. ContentConfig: restclient.ContentConfig{
  128. GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"},
  129. },
  130. },
  131. )
  132. return &context
  133. }
  134. // initTestScheduler initializes a test environment and creates a scheduler with default
  135. // configuration.
  136. func initTestScheduler(
  137. t *testing.T,
  138. context *testContext,
  139. setPodInformer bool,
  140. policy *schedulerapi.Policy,
  141. ) *testContext {
  142. // Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
  143. // feature gate is enabled at the same time.
  144. return initTestSchedulerWithOptions(t, context, setPodInformer, policy, schedulerframework.NewRegistry(),
  145. nil, []schedulerconfig.PluginConfig{}, false, time.Second)
  146. }
  147. // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
  148. // configuration and other options.
  149. func initTestSchedulerWithOptions(
  150. t *testing.T,
  151. context *testContext,
  152. setPodInformer bool,
  153. policy *schedulerapi.Policy,
  154. pluginRegistry schedulerframework.Registry,
  155. plugins *schedulerconfig.Plugins,
  156. pluginConfig []schedulerconfig.PluginConfig,
  157. disablePreemption bool,
  158. resyncPeriod time.Duration,
  159. ) *testContext {
  160. // 1. Create scheduler
  161. context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod)
  162. var podInformer coreinformers.PodInformer
  163. // create independent pod informer if required
  164. if setPodInformer {
  165. podInformer = factory.NewPodInformer(context.clientSet, 12*time.Hour)
  166. } else {
  167. podInformer = context.informerFactory.Core().V1().Pods()
  168. }
  169. context.schedulerConfigFactory = createConfiguratorWithPodInformer(
  170. v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, pluginRegistry, plugins,
  171. pluginConfig, context.stopCh)
  172. var err error
  173. if policy != nil {
  174. context.schedulerConfig, err = context.schedulerConfigFactory.CreateFromConfig(*policy)
  175. } else {
  176. context.schedulerConfig, err = context.schedulerConfigFactory.Create()
  177. }
  178. if err != nil {
  179. t.Fatalf("Couldn't create scheduler config: %v", err)
  180. }
  181. // set DisablePreemption option
  182. context.schedulerConfig.DisablePreemption = disablePreemption
  183. context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)
  184. scheduler.AddAllEventHandlers(context.scheduler,
  185. v1.DefaultSchedulerName,
  186. context.informerFactory.Core().V1().Nodes(),
  187. podInformer,
  188. context.informerFactory.Core().V1().PersistentVolumes(),
  189. context.informerFactory.Core().V1().PersistentVolumeClaims(),
  190. context.informerFactory.Core().V1().Services(),
  191. context.informerFactory.Storage().V1().StorageClasses(),
  192. )
  193. // set setPodInformer if provided.
  194. if setPodInformer {
  195. go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
  196. controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
  197. }
  198. eventBroadcaster := record.NewBroadcaster()
  199. context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
  200. legacyscheme.Scheme,
  201. v1.EventSource{Component: v1.DefaultSchedulerName},
  202. )
  203. eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
  204. Interface: context.clientSet.CoreV1().Events(""),
  205. })
  206. context.informerFactory.Start(context.schedulerConfig.StopEverything)
  207. context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything)
  208. context.scheduler.Run()
  209. return context
  210. }
  211. // initDisruptionController initializes and runs a Disruption Controller to properly
  212. // update PodDisuptionBudget objects.
  213. func initDisruptionController(t *testing.T, context *testContext) *disruption.DisruptionController {
  214. informers := informers.NewSharedInformerFactory(context.clientSet, 12*time.Hour)
  215. discoveryClient := cacheddiscovery.NewMemCacheClient(context.clientSet.Discovery())
  216. mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
  217. config := restclient.Config{Host: context.httpServer.URL}
  218. scaleKindResolver := scale.NewDiscoveryScaleKindResolver(context.clientSet.Discovery())
  219. scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
  220. if err != nil {
  221. t.Fatalf("Error in create scaleClient: %v", err)
  222. }
  223. dc := disruption.NewDisruptionController(
  224. informers.Core().V1().Pods(),
  225. informers.Policy().V1beta1().PodDisruptionBudgets(),
  226. informers.Core().V1().ReplicationControllers(),
  227. informers.Apps().V1().ReplicaSets(),
  228. informers.Apps().V1().Deployments(),
  229. informers.Apps().V1().StatefulSets(),
  230. context.clientSet,
  231. mapper,
  232. scaleClient)
  233. informers.Start(context.schedulerConfig.StopEverything)
  234. informers.WaitForCacheSync(context.schedulerConfig.StopEverything)
  235. go dc.Run(context.schedulerConfig.StopEverything)
  236. return dc
  237. }
  238. // initTest initializes a test environment and creates master and scheduler with default
  239. // configuration.
  240. func initTest(t *testing.T, nsPrefix string) *testContext {
  241. return initTestScheduler(t, initTestMaster(t, nsPrefix, nil), true, nil)
  242. }
  243. // initTestDisablePreemption initializes a test environment and creates master and scheduler with default
  244. // configuration but with pod preemption disabled.
  245. func initTestDisablePreemption(t *testing.T, nsPrefix string) *testContext {
  246. return initTestSchedulerWithOptions(
  247. t, initTestMaster(t, nsPrefix, nil), true, nil,
  248. schedulerframework.NewRegistry(), nil, []schedulerconfig.PluginConfig{},
  249. true, time.Second)
  250. }
  251. // cleanupTest deletes the scheduler and the test namespace. It should be called
  252. // at the end of a test.
  253. func cleanupTest(t *testing.T, context *testContext) {
  254. // Kill the scheduler.
  255. close(context.stopCh)
  256. // Cleanup nodes.
  257. context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
  258. framework.DeleteTestingNamespace(context.ns, context.httpServer, t)
  259. context.closeFn()
  260. }
  261. // waitForReflection waits till the passFunc confirms that the object it expects
  262. // to see is in the store. Used to observe reflected events.
  263. func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
  264. passFunc func(n interface{}) bool) error {
  265. nodes := []*v1.Node{}
  266. err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
  267. n, err := nodeLister.Get(key)
  268. switch {
  269. case err == nil && passFunc(n):
  270. return true, nil
  271. case errors.IsNotFound(err):
  272. nodes = append(nodes, nil)
  273. case err != nil:
  274. t.Errorf("Unexpected error: %v", err)
  275. default:
  276. nodes = append(nodes, n)
  277. }
  278. return false, nil
  279. })
  280. if err != nil {
  281. t.Logf("Logging consecutive node versions received from store:")
  282. for i, n := range nodes {
  283. t.Logf("%d: %#v", i, n)
  284. }
  285. }
  286. return err
  287. }
  288. // nodeHasLabels returns a function that checks if a node has all the given labels.
  289. func nodeHasLabels(cs clientset.Interface, nodeName string, labels map[string]string) wait.ConditionFunc {
  290. return func() (bool, error) {
  291. node, err := cs.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
  292. if errors.IsNotFound(err) {
  293. return false, nil
  294. }
  295. if err != nil {
  296. // This could be a connection error so we want to retry.
  297. return false, nil
  298. }
  299. for k, v := range labels {
  300. if node.Labels == nil || node.Labels[k] != v {
  301. return false, nil
  302. }
  303. }
  304. return true, nil
  305. }
  306. }
  307. // waitForNodeLabels waits for the given node to have all the given labels.
  308. func waitForNodeLabels(cs clientset.Interface, nodeName string, labels map[string]string) error {
  309. return wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, nodeHasLabels(cs, nodeName, labels))
  310. }
  311. // initNode returns a node with the given resource list and images. If 'res' is nil, a predefined amount of
  312. // resource will be used.
  313. func initNode(name string, res *v1.ResourceList, images []v1.ContainerImage) *v1.Node {
  314. // if resource is nil, we use a default amount of resources for the node.
  315. if res == nil {
  316. res = &v1.ResourceList{
  317. v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
  318. }
  319. }
  320. n := &v1.Node{
  321. ObjectMeta: metav1.ObjectMeta{Name: name},
  322. Spec: v1.NodeSpec{Unschedulable: false},
  323. Status: v1.NodeStatus{
  324. Capacity: *res,
  325. Images: images,
  326. },
  327. }
  328. return n
  329. }
  330. // createNode creates a node with the given resource list.
  331. func createNode(cs clientset.Interface, name string, res *v1.ResourceList) (*v1.Node, error) {
  332. return cs.CoreV1().Nodes().Create(initNode(name, res, nil))
  333. }
  334. // createNodeWithImages creates a node with the given resource list and images.
  335. func createNodeWithImages(cs clientset.Interface, name string, res *v1.ResourceList, images []v1.ContainerImage) (*v1.Node, error) {
  336. return cs.CoreV1().Nodes().Create(initNode(name, res, images))
  337. }
  338. // updateNodeStatus updates the status of node.
  339. func updateNodeStatus(cs clientset.Interface, node *v1.Node) error {
  340. _, err := cs.CoreV1().Nodes().UpdateStatus(node)
  341. return err
  342. }
  343. // createNodes creates `numNodes` nodes. The created node names will be in the
  344. // form of "`prefix`-X" where X is an ordinal.
  345. func createNodes(cs clientset.Interface, prefix string, res *v1.ResourceList, numNodes int) ([]*v1.Node, error) {
  346. nodes := make([]*v1.Node, numNodes)
  347. for i := 0; i < numNodes; i++ {
  348. nodeName := fmt.Sprintf("%v-%d", prefix, i)
  349. node, err := createNode(cs, nodeName, res)
  350. if err != nil {
  351. return nodes[:], err
  352. }
  353. nodes[i] = node
  354. }
  355. return nodes[:], nil
  356. }
  357. // nodeTainted return a condition function that returns true if the given node contains
  358. // the taints.
  359. func nodeTainted(cs clientset.Interface, nodeName string, taints []v1.Taint) wait.ConditionFunc {
  360. return func() (bool, error) {
  361. node, err := cs.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
  362. if err != nil {
  363. return false, err
  364. }
  365. if len(taints) != len(node.Spec.Taints) {
  366. return false, nil
  367. }
  368. for _, taint := range taints {
  369. if !taintutils.TaintExists(node.Spec.Taints, &taint) {
  370. return false, nil
  371. }
  372. }
  373. return true, nil
  374. }
  375. }
  376. // waitForNodeTaints waits for a node to have the target taints and returns
  377. // an error if it does not have taints within the given timeout.
  378. func waitForNodeTaints(cs clientset.Interface, node *v1.Node, taints []v1.Taint) error {
  379. return wait.Poll(100*time.Millisecond, 30*time.Second, nodeTainted(cs, node.Name, taints))
  380. }
  381. // cleanupNodes deletes all nodes.
  382. func cleanupNodes(cs clientset.Interface, t *testing.T) {
  383. err := cs.CoreV1().Nodes().DeleteCollection(
  384. metav1.NewDeleteOptions(0), metav1.ListOptions{})
  385. if err != nil {
  386. t.Errorf("error while deleting all nodes: %v", err)
  387. }
  388. }
  389. type pausePodConfig struct {
  390. Name string
  391. Namespace string
  392. Affinity *v1.Affinity
  393. Annotations, Labels, NodeSelector map[string]string
  394. Resources *v1.ResourceRequirements
  395. Tolerations []v1.Toleration
  396. NodeName string
  397. SchedulerName string
  398. Priority *int32
  399. }
  400. // initPausePod initializes a pod API object from the given config. It is used
  401. // mainly in pod creation process.
  402. func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
  403. pod := &v1.Pod{
  404. ObjectMeta: metav1.ObjectMeta{
  405. Name: conf.Name,
  406. Namespace: conf.Namespace,
  407. Labels: conf.Labels,
  408. Annotations: conf.Annotations,
  409. },
  410. Spec: v1.PodSpec{
  411. NodeSelector: conf.NodeSelector,
  412. Affinity: conf.Affinity,
  413. Containers: []v1.Container{
  414. {
  415. Name: conf.Name,
  416. Image: imageutils.GetPauseImageName(),
  417. },
  418. },
  419. Tolerations: conf.Tolerations,
  420. NodeName: conf.NodeName,
  421. SchedulerName: conf.SchedulerName,
  422. Priority: conf.Priority,
  423. },
  424. }
  425. if conf.Resources != nil {
  426. pod.Spec.Containers[0].Resources = *conf.Resources
  427. }
  428. return pod
  429. }
  430. // createPausePod creates a pod with "Pause" image and the given config and
  431. // return its pointer and error status.
  432. func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
  433. return cs.CoreV1().Pods(p.Namespace).Create(p)
  434. }
  435. // createPausePodWithResource creates a pod with "Pause" image and the given
  436. // resources and returns its pointer and error status. The resource list can be
  437. // nil.
  438. func createPausePodWithResource(cs clientset.Interface, podName string,
  439. nsName string, res *v1.ResourceList) (*v1.Pod, error) {
  440. var conf pausePodConfig
  441. if res == nil {
  442. conf = pausePodConfig{
  443. Name: podName,
  444. Namespace: nsName,
  445. }
  446. } else {
  447. conf = pausePodConfig{
  448. Name: podName,
  449. Namespace: nsName,
  450. Resources: &v1.ResourceRequirements{
  451. Requests: *res,
  452. },
  453. }
  454. }
  455. return createPausePod(cs, initPausePod(cs, &conf))
  456. }
  457. // runPausePod creates a pod with "Pause" image and the given config and waits
  458. // until it is scheduled. It returns its pointer and error status.
  459. func runPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
  460. pod, err := cs.CoreV1().Pods(pod.Namespace).Create(pod)
  461. if err != nil {
  462. return nil, fmt.Errorf("Error creating pause pod: %v", err)
  463. }
  464. if err = waitForPodToSchedule(cs, pod); err != nil {
  465. return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err)
  466. }
  467. if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}); err != nil {
  468. return pod, fmt.Errorf("Error getting pod %v/%v info: %v", pod.Namespace, pod.Name, err)
  469. }
  470. return pod, nil
  471. }
  472. type podWithContainersConfig struct {
  473. Name string
  474. Namespace string
  475. Containers []v1.Container
  476. }
  477. // initPodWithContainers initializes a pod API object from the given config. This is used primarily for generating
  478. // pods with containers each having a specific image.
  479. func initPodWithContainers(cs clientset.Interface, conf *podWithContainersConfig) *v1.Pod {
  480. pod := &v1.Pod{
  481. ObjectMeta: metav1.ObjectMeta{
  482. Name: conf.Name,
  483. Namespace: conf.Namespace,
  484. },
  485. Spec: v1.PodSpec{
  486. Containers: conf.Containers,
  487. },
  488. }
  489. return pod
  490. }
  491. // runPodWithContainers creates a pod with given config and containers and waits
  492. // until it is scheduled. It returns its pointer and error status.
  493. func runPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
  494. pod, err := cs.CoreV1().Pods(pod.Namespace).Create(pod)
  495. if err != nil {
  496. return nil, fmt.Errorf("Error creating pod-with-containers: %v", err)
  497. }
  498. if err = waitForPodToSchedule(cs, pod); err != nil {
  499. return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
  500. }
  501. if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}); err != nil {
  502. return pod, fmt.Errorf("Error getting pod %v info: %v", pod.Name, err)
  503. }
  504. return pod, nil
  505. }
  506. // podDeleted returns true if a pod is not found in the given namespace.
  507. func podDeleted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  508. return func() (bool, error) {
  509. pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
  510. if errors.IsNotFound(err) {
  511. return true, nil
  512. }
  513. if pod.DeletionTimestamp != nil {
  514. return true, nil
  515. }
  516. return false, nil
  517. }
  518. }
  519. // podIsGettingEvicted returns true if the pod's deletion timestamp is set.
  520. func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  521. return func() (bool, error) {
  522. pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
  523. if err != nil {
  524. return false, err
  525. }
  526. if pod.DeletionTimestamp != nil {
  527. return true, nil
  528. }
  529. return false, nil
  530. }
  531. }
  532. // podScheduled returns true if a node is assigned to the given pod.
  533. func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  534. return func() (bool, error) {
  535. pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
  536. if errors.IsNotFound(err) {
  537. return false, nil
  538. }
  539. if err != nil {
  540. // This could be a connection error so we want to retry.
  541. return false, nil
  542. }
  543. if pod.Spec.NodeName == "" {
  544. return false, nil
  545. }
  546. return true, nil
  547. }
  548. }
  549. // podUnschedulable returns a condition function that returns true if the given pod
  550. // gets unschedulable status.
  551. func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  552. return func() (bool, error) {
  553. pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
  554. if errors.IsNotFound(err) {
  555. return false, nil
  556. }
  557. if err != nil {
  558. // This could be a connection error so we want to retry.
  559. return false, nil
  560. }
  561. _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
  562. return cond != nil && cond.Status == v1.ConditionFalse &&
  563. cond.Reason == v1.PodReasonUnschedulable, nil
  564. }
  565. }
  566. // podSchedulingError returns a condition function that returns true if the given pod
  567. // gets unschedulable status for reasons other than "Unschedulable". The scheduler
  568. // records such reasons in case of error.
  569. func podSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  570. return func() (bool, error) {
  571. pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
  572. if errors.IsNotFound(err) {
  573. return false, nil
  574. }
  575. if err != nil {
  576. // This could be a connection error so we want to retry.
  577. return false, nil
  578. }
  579. _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
  580. return cond != nil && cond.Status == v1.ConditionFalse &&
  581. cond.Reason != v1.PodReasonUnschedulable, nil
  582. }
  583. }
  584. // waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
  585. // an error if it does not scheduled within the given timeout.
  586. func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
  587. return wait.Poll(100*time.Millisecond, timeout, podScheduled(cs, pod.Namespace, pod.Name))
  588. }
  589. // waitForPodToSchedule waits for a pod to get scheduled and returns an error if
  590. // it does not get scheduled within the timeout duration (30 seconds).
  591. func waitForPodToSchedule(cs clientset.Interface, pod *v1.Pod) error {
  592. return waitForPodToScheduleWithTimeout(cs, pod, 30*time.Second)
  593. }
  594. // waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
  595. // an error if it does not become unschedulable within the given timeout.
  596. func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
  597. return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
  598. }
  599. // waitForPodUnschedule waits for a pod to fail scheduling and returns
  600. // an error if it does not become unschedulable within the timeout duration (30 seconds).
  601. func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
  602. return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
  603. }
  604. // waitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
  605. // the expected values.
  606. func waitForPDBsStable(context *testContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
  607. return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
  608. pdbList, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).List(metav1.ListOptions{})
  609. if err != nil {
  610. return false, err
  611. }
  612. if len(pdbList.Items) != len(pdbs) {
  613. return false, nil
  614. }
  615. for i, pdb := range pdbs {
  616. found := false
  617. for _, cpdb := range pdbList.Items {
  618. if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
  619. found = true
  620. if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
  621. return false, nil
  622. }
  623. }
  624. }
  625. if !found {
  626. return false, nil
  627. }
  628. }
  629. return true, nil
  630. })
  631. }
  632. // waitCachedPodsStable waits until scheduler cache has the given pods.
  633. func waitCachedPodsStable(context *testContext, pods []*v1.Pod) error {
  634. return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
  635. cachedPods, err := context.scheduler.Config().SchedulerCache.List(labels.Everything())
  636. if err != nil {
  637. return false, err
  638. }
  639. if len(pods) != len(cachedPods) {
  640. return false, nil
  641. }
  642. for _, p := range pods {
  643. actualPod, err1 := context.clientSet.CoreV1().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{})
  644. if err1 != nil {
  645. return false, err1
  646. }
  647. cachedPod, err2 := context.scheduler.Config().SchedulerCache.GetPod(actualPod)
  648. if err2 != nil || cachedPod == nil {
  649. return false, err2
  650. }
  651. }
  652. return true, nil
  653. })
  654. }
  655. // deletePod deletes the given pod in the given namespace.
  656. func deletePod(cs clientset.Interface, podName string, nsName string) error {
  657. return cs.CoreV1().Pods(nsName).Delete(podName, metav1.NewDeleteOptions(0))
  658. }
  659. // cleanupPods deletes the given pods and waits for them to be actually deleted.
  660. func cleanupPods(cs clientset.Interface, t *testing.T, pods []*v1.Pod) {
  661. for _, p := range pods {
  662. err := cs.CoreV1().Pods(p.Namespace).Delete(p.Name, metav1.NewDeleteOptions(0))
  663. if err != nil && !errors.IsNotFound(err) {
  664. t.Errorf("error while deleting pod %v/%v: %v", p.Namespace, p.Name, err)
  665. }
  666. }
  667. for _, p := range pods {
  668. if err := wait.Poll(time.Millisecond, wait.ForeverTestTimeout,
  669. podDeleted(cs, p.Namespace, p.Name)); err != nil {
  670. t.Errorf("error while waiting for pod %v/%v to get deleted: %v", p.Namespace, p.Name, err)
  671. }
  672. }
  673. }
  674. // noPodsInNamespace returns true if no pods in the given namespace.
  675. func noPodsInNamespace(c clientset.Interface, podNamespace string) wait.ConditionFunc {
  676. return func() (bool, error) {
  677. pods, err := c.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
  678. if err != nil {
  679. return false, err
  680. }
  681. return len(pods.Items) == 0, nil
  682. }
  683. }
  684. // cleanupPodsInNamespace deletes the pods in the given namespace and waits for them to
  685. // be actually deleted.
  686. func cleanupPodsInNamespace(cs clientset.Interface, t *testing.T, ns string) {
  687. if err := cs.CoreV1().Pods(ns).DeleteCollection(nil, metav1.ListOptions{}); err != nil {
  688. t.Errorf("error while listing pod in namespace %v: %v", ns, err)
  689. return
  690. }
  691. if err := wait.Poll(time.Second, wait.ForeverTestTimeout,
  692. noPodsInNamespace(cs, ns)); err != nil {
  693. t.Errorf("error while waiting for pods in namespace %v: %v", ns, err)
  694. }
  695. }
  696. func waitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) {
  697. schedulerCacheIsEmpty := func() (bool, error) {
  698. snapshot := sched.Cache().Snapshot()
  699. return len(snapshot.Nodes) == 0 && len(snapshot.AssumedPods) == 0, nil
  700. }
  701. if err := wait.Poll(time.Second, wait.ForeverTestTimeout, schedulerCacheIsEmpty); err != nil {
  702. t.Errorf("Failed to wait for scheduler cache cleanup: %v", err)
  703. }
  704. }