util.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "context"
  16. "fmt"
  17. "net/http"
  18. "net/http/httptest"
  19. "testing"
  20. "time"
  21. v1 "k8s.io/api/core/v1"
  22. policy "k8s.io/api/policy/v1beta1"
  23. apierrors "k8s.io/apimachinery/pkg/api/errors"
  24. "k8s.io/apimachinery/pkg/api/resource"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/labels"
  27. "k8s.io/apimachinery/pkg/runtime"
  28. "k8s.io/apimachinery/pkg/runtime/schema"
  29. "k8s.io/apimachinery/pkg/util/uuid"
  30. "k8s.io/apimachinery/pkg/util/wait"
  31. "k8s.io/apiserver/pkg/admission"
  32. cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
  33. "k8s.io/client-go/dynamic"
  34. "k8s.io/client-go/informers"
  35. coreinformers "k8s.io/client-go/informers/core/v1"
  36. clientset "k8s.io/client-go/kubernetes"
  37. corelisters "k8s.io/client-go/listers/core/v1"
  38. restclient "k8s.io/client-go/rest"
  39. "k8s.io/client-go/restmapper"
  40. "k8s.io/client-go/scale"
  41. "k8s.io/client-go/tools/cache"
  42. "k8s.io/client-go/tools/events"
  43. "k8s.io/kubernetes/pkg/api/legacyscheme"
  44. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  45. "k8s.io/kubernetes/pkg/controller/disruption"
  46. "k8s.io/kubernetes/pkg/scheduler"
  47. schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
  48. "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
  49. schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1"
  50. taintutils "k8s.io/kubernetes/pkg/util/taints"
  51. "k8s.io/kubernetes/test/integration/framework"
  52. imageutils "k8s.io/kubernetes/test/utils/image"
  53. )
  54. type testContext struct {
  55. closeFn framework.CloseFunc
  56. httpServer *httptest.Server
  57. ns *v1.Namespace
  58. clientSet *clientset.Clientset
  59. informerFactory informers.SharedInformerFactory
  60. scheduler *scheduler.Scheduler
  61. ctx context.Context
  62. cancelFn context.CancelFunc
  63. }
  64. func createAlgorithmSourceFromPolicy(policy *schedulerapi.Policy, clientSet clientset.Interface) schedulerapi.SchedulerAlgorithmSource {
  65. // Serialize the Policy object into a ConfigMap later.
  66. info, ok := runtime.SerializerInfoForMediaType(scheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
  67. if !ok {
  68. panic("could not find json serializer")
  69. }
  70. encoder := scheme.Codecs.EncoderForVersion(info.Serializer, schedulerapiv1.SchemeGroupVersion)
  71. policyString := runtime.EncodeOrDie(encoder, policy)
  72. configPolicyName := "scheduler-custom-policy-config"
  73. policyConfigMap := v1.ConfigMap{
  74. ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: configPolicyName},
  75. Data: map[string]string{schedulerapi.SchedulerPolicyConfigMapKey: policyString},
  76. }
  77. policyConfigMap.APIVersion = "v1"
  78. clientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &policyConfigMap, metav1.CreateOptions{})
  79. return schedulerapi.SchedulerAlgorithmSource{
  80. Policy: &schedulerapi.SchedulerPolicySource{
  81. ConfigMap: &schedulerapi.SchedulerPolicyConfigMapSource{
  82. Namespace: policyConfigMap.Namespace,
  83. Name: policyConfigMap.Name,
  84. },
  85. },
  86. }
  87. }
  88. // initTestMasterAndScheduler initializes a test environment and creates a master with default
  89. // configuration.
  90. func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface) *testContext {
  91. ctx, cancelFunc := context.WithCancel(context.Background())
  92. testCtx := testContext{
  93. ctx: ctx,
  94. cancelFn: cancelFunc,
  95. }
  96. // 1. Create master
  97. h := &framework.MasterHolder{Initialized: make(chan struct{})}
  98. s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  99. <-h.Initialized
  100. h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
  101. }))
  102. masterConfig := framework.NewIntegrationTestMasterConfig()
  103. if admission != nil {
  104. masterConfig.GenericConfig.AdmissionControl = admission
  105. }
  106. _, testCtx.httpServer, testCtx.closeFn = framework.RunAMasterUsingServer(masterConfig, s, h)
  107. if nsPrefix != "default" {
  108. testCtx.ns = framework.CreateTestingNamespace(nsPrefix+string(uuid.NewUUID()), s, t)
  109. } else {
  110. testCtx.ns = framework.CreateTestingNamespace("default", s, t)
  111. }
  112. // 2. Create kubeclient
  113. testCtx.clientSet = clientset.NewForConfigOrDie(
  114. &restclient.Config{
  115. QPS: -1, Host: s.URL,
  116. ContentConfig: restclient.ContentConfig{
  117. GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"},
  118. },
  119. },
  120. )
  121. return &testCtx
  122. }
  123. // initTestScheduler initializes a test environment and creates a scheduler with default
  124. // configuration.
  125. func initTestScheduler(
  126. t *testing.T,
  127. testCtx *testContext,
  128. setPodInformer bool,
  129. policy *schedulerapi.Policy,
  130. ) *testContext {
  131. // Pod preemption is enabled by default scheduler configuration.
  132. return initTestSchedulerWithOptions(t, testCtx, setPodInformer, policy, time.Second)
  133. }
  134. // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
  135. // configuration and other options.
  136. func initTestSchedulerWithOptions(
  137. t *testing.T,
  138. testCtx *testContext,
  139. setPodInformer bool,
  140. policy *schedulerapi.Policy,
  141. resyncPeriod time.Duration,
  142. opts ...scheduler.Option,
  143. ) *testContext {
  144. // 1. Create scheduler
  145. testCtx.informerFactory = informers.NewSharedInformerFactory(testCtx.clientSet, resyncPeriod)
  146. var podInformer coreinformers.PodInformer
  147. // create independent pod informer if required
  148. if setPodInformer {
  149. podInformer = scheduler.NewPodInformer(testCtx.clientSet, 12*time.Hour)
  150. } else {
  151. podInformer = testCtx.informerFactory.Core().V1().Pods()
  152. }
  153. var err error
  154. eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
  155. Interface: testCtx.clientSet.EventsV1beta1().Events(""),
  156. })
  157. recorder := eventBroadcaster.NewRecorder(
  158. legacyscheme.Scheme,
  159. v1.DefaultSchedulerName,
  160. )
  161. if policy != nil {
  162. opts = append(opts, scheduler.WithAlgorithmSource(createAlgorithmSourceFromPolicy(policy, testCtx.clientSet)))
  163. }
  164. opts = append([]scheduler.Option{scheduler.WithBindTimeoutSeconds(600)}, opts...)
  165. testCtx.scheduler, err = scheduler.New(
  166. testCtx.clientSet,
  167. testCtx.informerFactory,
  168. podInformer,
  169. recorder,
  170. testCtx.ctx.Done(),
  171. opts...,
  172. )
  173. if err != nil {
  174. t.Fatalf("Couldn't create scheduler: %v", err)
  175. }
  176. // set setPodInformer if provided.
  177. if setPodInformer {
  178. go podInformer.Informer().Run(testCtx.scheduler.StopEverything)
  179. cache.WaitForNamedCacheSync("scheduler", testCtx.scheduler.StopEverything, podInformer.Informer().HasSynced)
  180. }
  181. stopCh := make(chan struct{})
  182. eventBroadcaster.StartRecordingToSink(stopCh)
  183. testCtx.informerFactory.Start(testCtx.scheduler.StopEverything)
  184. testCtx.informerFactory.WaitForCacheSync(testCtx.scheduler.StopEverything)
  185. go testCtx.scheduler.Run(testCtx.ctx)
  186. return testCtx
  187. }
  188. // initDisruptionController initializes and runs a Disruption Controller to properly
  189. // update PodDisuptionBudget objects.
  190. func initDisruptionController(t *testing.T, testCtx *testContext) *disruption.DisruptionController {
  191. informers := informers.NewSharedInformerFactory(testCtx.clientSet, 12*time.Hour)
  192. discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.clientSet.Discovery())
  193. mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
  194. config := restclient.Config{Host: testCtx.httpServer.URL}
  195. scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.clientSet.Discovery())
  196. scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
  197. if err != nil {
  198. t.Fatalf("Error in create scaleClient: %v", err)
  199. }
  200. dc := disruption.NewDisruptionController(
  201. informers.Core().V1().Pods(),
  202. informers.Policy().V1beta1().PodDisruptionBudgets(),
  203. informers.Core().V1().ReplicationControllers(),
  204. informers.Apps().V1().ReplicaSets(),
  205. informers.Apps().V1().Deployments(),
  206. informers.Apps().V1().StatefulSets(),
  207. testCtx.clientSet,
  208. mapper,
  209. scaleClient)
  210. informers.Start(testCtx.scheduler.StopEverything)
  211. informers.WaitForCacheSync(testCtx.scheduler.StopEverything)
  212. go dc.Run(testCtx.scheduler.StopEverything)
  213. return dc
  214. }
  215. // initTest initializes a test environment and creates master and scheduler with default
  216. // configuration.
  217. func initTest(t *testing.T, nsPrefix string) *testContext {
  218. return initTestScheduler(t, initTestMaster(t, nsPrefix, nil), true, nil)
  219. }
  220. // initTestDisablePreemption initializes a test environment and creates master and scheduler with default
  221. // configuration but with pod preemption disabled.
  222. func initTestDisablePreemption(t *testing.T, nsPrefix string) *testContext {
  223. return initTestSchedulerWithOptions(
  224. t, initTestMaster(t, nsPrefix, nil), true, nil,
  225. time.Second, scheduler.WithPreemptionDisabled(true))
  226. }
  227. // cleanupTest deletes the scheduler and the test namespace. It should be called
  228. // at the end of a test.
  229. func cleanupTest(t *testing.T, testCtx *testContext) {
  230. // Kill the scheduler.
  231. testCtx.cancelFn()
  232. // Cleanup nodes.
  233. testCtx.clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), nil, metav1.ListOptions{})
  234. framework.DeleteTestingNamespace(testCtx.ns, testCtx.httpServer, t)
  235. testCtx.closeFn()
  236. }
  237. // waitForReflection waits till the passFunc confirms that the object it expects
  238. // to see is in the store. Used to observe reflected events.
  239. func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
  240. passFunc func(n interface{}) bool) error {
  241. nodes := []*v1.Node{}
  242. err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
  243. n, err := nodeLister.Get(key)
  244. switch {
  245. case err == nil && passFunc(n):
  246. return true, nil
  247. case apierrors.IsNotFound(err):
  248. nodes = append(nodes, nil)
  249. case err != nil:
  250. t.Errorf("Unexpected error: %v", err)
  251. default:
  252. nodes = append(nodes, n)
  253. }
  254. return false, nil
  255. })
  256. if err != nil {
  257. t.Logf("Logging consecutive node versions received from store:")
  258. for i, n := range nodes {
  259. t.Logf("%d: %#v", i, n)
  260. }
  261. }
  262. return err
  263. }
  264. // nodeHasLabels returns a function that checks if a node has all the given labels.
  265. func nodeHasLabels(cs clientset.Interface, nodeName string, labels map[string]string) wait.ConditionFunc {
  266. return func() (bool, error) {
  267. node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  268. if err != nil {
  269. // This could be a connection error so we want to retry.
  270. return false, nil
  271. }
  272. for k, v := range labels {
  273. if node.Labels == nil || node.Labels[k] != v {
  274. return false, nil
  275. }
  276. }
  277. return true, nil
  278. }
  279. }
  280. // waitForNodeLabels waits for the given node to have all the given labels.
  281. func waitForNodeLabels(cs clientset.Interface, nodeName string, labels map[string]string) error {
  282. return wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, nodeHasLabels(cs, nodeName, labels))
  283. }
  284. // initNode returns a node with the given resource list and images. If 'res' is nil, a predefined amount of
  285. // resource will be used.
  286. func initNode(name string, res *v1.ResourceList, images []v1.ContainerImage) *v1.Node {
  287. // if resource is nil, we use a default amount of resources for the node.
  288. if res == nil {
  289. res = &v1.ResourceList{
  290. v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
  291. }
  292. }
  293. n := &v1.Node{
  294. ObjectMeta: metav1.ObjectMeta{Name: name},
  295. Spec: v1.NodeSpec{Unschedulable: false},
  296. Status: v1.NodeStatus{
  297. Capacity: *res,
  298. Images: images,
  299. },
  300. }
  301. return n
  302. }
  303. // createNode creates a node with the given resource list.
  304. func createNode(cs clientset.Interface, name string, res *v1.ResourceList) (*v1.Node, error) {
  305. return cs.CoreV1().Nodes().Create(context.TODO(), initNode(name, res, nil), metav1.CreateOptions{})
  306. }
  307. // createNodeWithImages creates a node with the given resource list and images.
  308. func createNodeWithImages(cs clientset.Interface, name string, res *v1.ResourceList, images []v1.ContainerImage) (*v1.Node, error) {
  309. return cs.CoreV1().Nodes().Create(context.TODO(), initNode(name, res, images), metav1.CreateOptions{})
  310. }
  311. // updateNodeStatus updates the status of node.
  312. func updateNodeStatus(cs clientset.Interface, node *v1.Node) error {
  313. _, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{})
  314. return err
  315. }
  316. // createNodes creates `numNodes` nodes. The created node names will be in the
  317. // form of "`prefix`-X" where X is an ordinal.
  318. func createNodes(cs clientset.Interface, prefix string, res *v1.ResourceList, numNodes int) ([]*v1.Node, error) {
  319. nodes := make([]*v1.Node, numNodes)
  320. for i := 0; i < numNodes; i++ {
  321. nodeName := fmt.Sprintf("%v-%d", prefix, i)
  322. node, err := createNode(cs, nodeName, res)
  323. if err != nil {
  324. return nodes[:], err
  325. }
  326. nodes[i] = node
  327. }
  328. return nodes[:], nil
  329. }
  330. // nodeTainted return a condition function that returns true if the given node contains
  331. // the taints.
  332. func nodeTainted(cs clientset.Interface, nodeName string, taints []v1.Taint) wait.ConditionFunc {
  333. return func() (bool, error) {
  334. node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  335. if err != nil {
  336. return false, err
  337. }
  338. // node.Spec.Taints may have more taints
  339. if len(taints) > len(node.Spec.Taints) {
  340. return false, nil
  341. }
  342. for _, taint := range taints {
  343. if !taintutils.TaintExists(node.Spec.Taints, &taint) {
  344. return false, nil
  345. }
  346. }
  347. return true, nil
  348. }
  349. }
  350. func addTaintToNode(cs clientset.Interface, nodeName string, taint v1.Taint) error {
  351. node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  352. if err != nil {
  353. return err
  354. }
  355. copy := node.DeepCopy()
  356. copy.Spec.Taints = append(copy.Spec.Taints, taint)
  357. _, err = cs.CoreV1().Nodes().Update(context.TODO(), copy, metav1.UpdateOptions{})
  358. return err
  359. }
  360. // waitForNodeTaints waits for a node to have the target taints and returns
  361. // an error if it does not have taints within the given timeout.
  362. func waitForNodeTaints(cs clientset.Interface, node *v1.Node, taints []v1.Taint) error {
  363. return wait.Poll(100*time.Millisecond, 30*time.Second, nodeTainted(cs, node.Name, taints))
  364. }
  365. // cleanupNodes deletes all nodes.
  366. func cleanupNodes(cs clientset.Interface, t *testing.T) {
  367. err := cs.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.NewDeleteOptions(0), metav1.ListOptions{})
  368. if err != nil {
  369. t.Errorf("error while deleting all nodes: %v", err)
  370. }
  371. }
  372. type pausePodConfig struct {
  373. Name string
  374. Namespace string
  375. Affinity *v1.Affinity
  376. Annotations, Labels, NodeSelector map[string]string
  377. Resources *v1.ResourceRequirements
  378. Tolerations []v1.Toleration
  379. NodeName string
  380. SchedulerName string
  381. Priority *int32
  382. PriorityClassName string
  383. }
  384. // initPausePod initializes a pod API object from the given config. It is used
  385. // mainly in pod creation process.
  386. func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod {
  387. pod := &v1.Pod{
  388. ObjectMeta: metav1.ObjectMeta{
  389. Name: conf.Name,
  390. Namespace: conf.Namespace,
  391. Labels: conf.Labels,
  392. Annotations: conf.Annotations,
  393. },
  394. Spec: v1.PodSpec{
  395. NodeSelector: conf.NodeSelector,
  396. Affinity: conf.Affinity,
  397. Containers: []v1.Container{
  398. {
  399. Name: conf.Name,
  400. Image: imageutils.GetPauseImageName(),
  401. },
  402. },
  403. Tolerations: conf.Tolerations,
  404. NodeName: conf.NodeName,
  405. SchedulerName: conf.SchedulerName,
  406. Priority: conf.Priority,
  407. PriorityClassName: conf.PriorityClassName,
  408. },
  409. }
  410. if conf.Resources != nil {
  411. pod.Spec.Containers[0].Resources = *conf.Resources
  412. }
  413. return pod
  414. }
  415. // createPausePod creates a pod with "Pause" image and the given config and
  416. // return its pointer and error status.
  417. func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
  418. return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{})
  419. }
  420. // createPausePodWithResource creates a pod with "Pause" image and the given
  421. // resources and returns its pointer and error status. The resource list can be
  422. // nil.
  423. func createPausePodWithResource(cs clientset.Interface, podName string,
  424. nsName string, res *v1.ResourceList) (*v1.Pod, error) {
  425. var conf pausePodConfig
  426. if res == nil {
  427. conf = pausePodConfig{
  428. Name: podName,
  429. Namespace: nsName,
  430. }
  431. } else {
  432. conf = pausePodConfig{
  433. Name: podName,
  434. Namespace: nsName,
  435. Resources: &v1.ResourceRequirements{
  436. Requests: *res,
  437. },
  438. }
  439. }
  440. return createPausePod(cs, initPausePod(cs, &conf))
  441. }
  442. // runPausePod creates a pod with "Pause" image and the given config and waits
  443. // until it is scheduled. It returns its pointer and error status.
  444. func runPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
  445. pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
  446. if err != nil {
  447. return nil, fmt.Errorf("Error creating pause pod: %v", err)
  448. }
  449. if err = waitForPodToSchedule(cs, pod); err != nil {
  450. return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err)
  451. }
  452. if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
  453. return pod, fmt.Errorf("Error getting pod %v/%v info: %v", pod.Namespace, pod.Name, err)
  454. }
  455. return pod, nil
  456. }
  457. type podWithContainersConfig struct {
  458. Name string
  459. Namespace string
  460. Containers []v1.Container
  461. }
  462. // initPodWithContainers initializes a pod API object from the given config. This is used primarily for generating
  463. // pods with containers each having a specific image.
  464. func initPodWithContainers(cs clientset.Interface, conf *podWithContainersConfig) *v1.Pod {
  465. pod := &v1.Pod{
  466. ObjectMeta: metav1.ObjectMeta{
  467. Name: conf.Name,
  468. Namespace: conf.Namespace,
  469. },
  470. Spec: v1.PodSpec{
  471. Containers: conf.Containers,
  472. },
  473. }
  474. return pod
  475. }
  476. // runPodWithContainers creates a pod with given config and containers and waits
  477. // until it is scheduled. It returns its pointer and error status.
  478. func runPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
  479. pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
  480. if err != nil {
  481. return nil, fmt.Errorf("Error creating pod-with-containers: %v", err)
  482. }
  483. if err = waitForPodToSchedule(cs, pod); err != nil {
  484. return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
  485. }
  486. if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
  487. return pod, fmt.Errorf("Error getting pod %v info: %v", pod.Name, err)
  488. }
  489. return pod, nil
  490. }
  491. // podDeleted returns true if a pod is not found in the given namespace.
  492. func podDeleted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  493. return func() (bool, error) {
  494. pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  495. if apierrors.IsNotFound(err) {
  496. return true, nil
  497. }
  498. if pod.DeletionTimestamp != nil {
  499. return true, nil
  500. }
  501. return false, nil
  502. }
  503. }
  504. // podIsGettingEvicted returns true if the pod's deletion timestamp is set.
  505. func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  506. return func() (bool, error) {
  507. pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  508. if err != nil {
  509. return false, err
  510. }
  511. if pod.DeletionTimestamp != nil {
  512. return true, nil
  513. }
  514. return false, nil
  515. }
  516. }
  517. // podScheduled returns true if a node is assigned to the given pod.
  518. func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  519. return func() (bool, error) {
  520. pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  521. if err != nil {
  522. // This could be a connection error so we want to retry.
  523. return false, nil
  524. }
  525. if pod.Spec.NodeName == "" {
  526. return false, nil
  527. }
  528. return true, nil
  529. }
  530. }
  531. // podScheduledIn returns true if a given pod is placed onto one of the expected nodes.
  532. func podScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionFunc {
  533. return func() (bool, error) {
  534. pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  535. if err != nil {
  536. // This could be a connection error so we want to retry.
  537. return false, nil
  538. }
  539. if pod.Spec.NodeName == "" {
  540. return false, nil
  541. }
  542. for _, nodeName := range nodeNames {
  543. if pod.Spec.NodeName == nodeName {
  544. return true, nil
  545. }
  546. }
  547. return false, nil
  548. }
  549. }
  550. // podUnschedulable returns a condition function that returns true if the given pod
  551. // gets unschedulable status.
  552. func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  553. return func() (bool, error) {
  554. pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  555. if err != nil {
  556. // This could be a connection error so we want to retry.
  557. return false, nil
  558. }
  559. _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
  560. return cond != nil && cond.Status == v1.ConditionFalse &&
  561. cond.Reason == v1.PodReasonUnschedulable, nil
  562. }
  563. }
  564. // podSchedulingError returns a condition function that returns true if the given pod
  565. // gets unschedulable status for reasons other than "Unschedulable". The scheduler
  566. // records such reasons in case of error.
  567. func podSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  568. return func() (bool, error) {
  569. pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  570. if err != nil {
  571. // This could be a connection error so we want to retry.
  572. return false, nil
  573. }
  574. _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
  575. return cond != nil && cond.Status == v1.ConditionFalse &&
  576. cond.Reason != v1.PodReasonUnschedulable, nil
  577. }
  578. }
  579. // waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
  580. // an error if it does not scheduled within the given timeout.
  581. func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
  582. return wait.Poll(100*time.Millisecond, timeout, podScheduled(cs, pod.Namespace, pod.Name))
  583. }
  584. // waitForPodToSchedule waits for a pod to get scheduled and returns an error if
  585. // it does not get scheduled within the timeout duration (30 seconds).
  586. func waitForPodToSchedule(cs clientset.Interface, pod *v1.Pod) error {
  587. return waitForPodToScheduleWithTimeout(cs, pod, 30*time.Second)
  588. }
  589. // waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
  590. // an error if it does not become unschedulable within the given timeout.
  591. func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
  592. return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
  593. }
  594. // waitForPodUnschedule waits for a pod to fail scheduling and returns
  595. // an error if it does not become unschedulable within the timeout duration (30 seconds).
  596. func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
  597. return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
  598. }
  599. // waitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
  600. // the expected values.
  601. func waitForPDBsStable(testCtx *testContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
  602. return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
  603. pdbList, err := testCtx.clientSet.PolicyV1beta1().PodDisruptionBudgets(testCtx.ns.Name).List(context.TODO(), metav1.ListOptions{})
  604. if err != nil {
  605. return false, err
  606. }
  607. if len(pdbList.Items) != len(pdbs) {
  608. return false, nil
  609. }
  610. for i, pdb := range pdbs {
  611. found := false
  612. for _, cpdb := range pdbList.Items {
  613. if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
  614. found = true
  615. if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
  616. return false, nil
  617. }
  618. }
  619. }
  620. if !found {
  621. return false, nil
  622. }
  623. }
  624. return true, nil
  625. })
  626. }
  627. // waitCachedPodsStable waits until scheduler cache has the given pods.
  628. func waitCachedPodsStable(testCtx *testContext, pods []*v1.Pod) error {
  629. return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
  630. cachedPods, err := testCtx.scheduler.SchedulerCache.List(labels.Everything())
  631. if err != nil {
  632. return false, err
  633. }
  634. if len(pods) != len(cachedPods) {
  635. return false, nil
  636. }
  637. for _, p := range pods {
  638. actualPod, err1 := testCtx.clientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
  639. if err1 != nil {
  640. return false, err1
  641. }
  642. cachedPod, err2 := testCtx.scheduler.SchedulerCache.GetPod(actualPod)
  643. if err2 != nil || cachedPod == nil {
  644. return false, err2
  645. }
  646. }
  647. return true, nil
  648. })
  649. }
  650. // deletePod deletes the given pod in the given namespace.
  651. func deletePod(cs clientset.Interface, podName string, nsName string) error {
  652. return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, metav1.NewDeleteOptions(0))
  653. }
  654. func getPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) {
  655. return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  656. }
  657. // cleanupPods deletes the given pods and waits for them to be actually deleted.
  658. func cleanupPods(cs clientset.Interface, t *testing.T, pods []*v1.Pod) {
  659. for _, p := range pods {
  660. err := cs.CoreV1().Pods(p.Namespace).Delete(context.TODO(), p.Name, metav1.NewDeleteOptions(0))
  661. if err != nil && !apierrors.IsNotFound(err) {
  662. t.Errorf("error while deleting pod %v/%v: %v", p.Namespace, p.Name, err)
  663. }
  664. }
  665. for _, p := range pods {
  666. if err := wait.Poll(time.Millisecond, wait.ForeverTestTimeout,
  667. podDeleted(cs, p.Namespace, p.Name)); err != nil {
  668. t.Errorf("error while waiting for pod %v/%v to get deleted: %v", p.Namespace, p.Name, err)
  669. }
  670. }
  671. }
  672. // noPodsInNamespace returns true if no pods in the given namespace.
  673. func noPodsInNamespace(c clientset.Interface, podNamespace string) wait.ConditionFunc {
  674. return func() (bool, error) {
  675. pods, err := c.CoreV1().Pods(podNamespace).List(context.TODO(), metav1.ListOptions{})
  676. if err != nil {
  677. return false, err
  678. }
  679. return len(pods.Items) == 0, nil
  680. }
  681. }
  682. // cleanupPodsInNamespace deletes the pods in the given namespace and waits for them to
  683. // be actually deleted.
  684. func cleanupPodsInNamespace(cs clientset.Interface, t *testing.T, ns string) {
  685. if err := cs.CoreV1().Pods(ns).DeleteCollection(context.TODO(), nil, metav1.ListOptions{}); err != nil {
  686. t.Errorf("error while listing pod in namespace %v: %v", ns, err)
  687. return
  688. }
  689. if err := wait.Poll(time.Second, wait.ForeverTestTimeout,
  690. noPodsInNamespace(cs, ns)); err != nil {
  691. t.Errorf("error while waiting for pods in namespace %v: %v", ns, err)
  692. }
  693. }
  694. func waitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) {
  695. schedulerCacheIsEmpty := func() (bool, error) {
  696. dump := sched.Cache().Dump()
  697. return len(dump.Nodes) == 0 && len(dump.AssumedPods) == 0, nil
  698. }
  699. if err := wait.Poll(time.Second, wait.ForeverTestTimeout, schedulerCacheIsEmpty); err != nil {
  700. t.Errorf("Failed to wait for scheduler cache cleanup: %v", err)
  701. }
  702. }