daemonset_test.go 30 KB


  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package daemonset
  14. import (
  15. "context"
  16. "fmt"
  17. "net/http/httptest"
  18. "testing"
  19. "time"
  20. apps "k8s.io/api/apps/v1"
  21. v1 "k8s.io/api/core/v1"
  22. apierrors "k8s.io/apimachinery/pkg/api/errors"
  23. "k8s.io/apimachinery/pkg/api/resource"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/util/intstr"
  26. "k8s.io/apimachinery/pkg/util/uuid"
  27. "k8s.io/apimachinery/pkg/util/wait"
  28. "k8s.io/client-go/informers"
  29. clientset "k8s.io/client-go/kubernetes"
  30. appstyped "k8s.io/client-go/kubernetes/typed/apps/v1"
  31. corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
  32. restclient "k8s.io/client-go/rest"
  33. "k8s.io/client-go/tools/cache"
  34. "k8s.io/client-go/tools/events"
  35. "k8s.io/client-go/util/flowcontrol"
  36. "k8s.io/client-go/util/retry"
  37. "k8s.io/kubernetes/pkg/api/legacyscheme"
  38. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  39. api "k8s.io/kubernetes/pkg/apis/core"
  40. "k8s.io/kubernetes/pkg/controller"
  41. "k8s.io/kubernetes/pkg/controller/daemon"
  42. "k8s.io/kubernetes/pkg/scheduler"
  43. labelsutil "k8s.io/kubernetes/pkg/util/labels"
  44. "k8s.io/kubernetes/test/integration/framework"
  45. )
  46. var zero = int64(0)
  47. func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) {
  48. masterConfig := framework.NewIntegrationTestMasterConfig()
  49. _, server, closeFn := framework.RunAMaster(masterConfig)
  50. config := restclient.Config{Host: server.URL}
  51. clientSet, err := clientset.NewForConfig(&config)
  52. if err != nil {
  53. t.Fatalf("Error in creating clientset: %v", err)
  54. }
  55. resyncPeriod := 12 * time.Hour
  56. informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-informers")), resyncPeriod)
  57. dc, err := daemon.NewDaemonSetsController(
  58. informers.Apps().V1().DaemonSets(),
  59. informers.Apps().V1().ControllerRevisions(),
  60. informers.Core().V1().Pods(),
  61. informers.Core().V1().Nodes(),
  62. clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-controller")),
  63. flowcontrol.NewBackOff(5*time.Second, 15*time.Minute),
  64. )
  65. if err != nil {
  66. t.Fatalf("error creating DaemonSets controller: %v", err)
  67. }
  68. return server, closeFn, dc, informers, clientSet
  69. }
  70. func setupScheduler(
  71. ctx context.Context,
  72. t *testing.T,
  73. cs clientset.Interface,
  74. informerFactory informers.SharedInformerFactory,
  75. ) {
  76. eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
  77. Interface: cs.EventsV1beta1().Events(""),
  78. })
  79. sched, err := scheduler.New(
  80. cs,
  81. informerFactory,
  82. informerFactory.Core().V1().Pods(),
  83. eventBroadcaster.NewRecorder(
  84. legacyscheme.Scheme,
  85. v1.DefaultSchedulerName,
  86. ),
  87. ctx.Done(),
  88. )
  89. if err != nil {
  90. t.Fatalf("Couldn't create scheduler: %v", err)
  91. }
  92. eventBroadcaster.StartRecordingToSink(ctx.Done())
  93. go sched.Run(ctx)
  94. return
  95. }
  96. func testLabels() map[string]string {
  97. return map[string]string{"name": "test"}
  98. }
  99. func newDaemonSet(name, namespace string) *apps.DaemonSet {
  100. two := int32(2)
  101. return &apps.DaemonSet{
  102. TypeMeta: metav1.TypeMeta{
  103. Kind: "DaemonSet",
  104. APIVersion: "apps/v1",
  105. },
  106. ObjectMeta: metav1.ObjectMeta{
  107. Namespace: namespace,
  108. Name: name,
  109. },
  110. Spec: apps.DaemonSetSpec{
  111. RevisionHistoryLimit: &two,
  112. Selector: &metav1.LabelSelector{MatchLabels: testLabels()},
  113. UpdateStrategy: apps.DaemonSetUpdateStrategy{
  114. Type: apps.OnDeleteDaemonSetStrategyType,
  115. },
  116. Template: v1.PodTemplateSpec{
  117. ObjectMeta: metav1.ObjectMeta{
  118. Labels: testLabels(),
  119. },
  120. Spec: v1.PodSpec{
  121. Containers: []v1.Container{{Name: "foo", Image: "bar"}},
  122. TerminationGracePeriodSeconds: &zero,
  123. },
  124. },
  125. },
  126. }
  127. }
  128. func cleanupDaemonSets(t *testing.T, cs clientset.Interface, ds *apps.DaemonSet) {
  129. ds, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
  130. if err != nil {
  131. t.Errorf("Failed to get DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
  132. return
  133. }
  134. // We set the nodeSelector to a random label. This label is nearly guaranteed
  135. // to not be set on any node so the DameonSetController will start deleting
  136. // daemon pods. Once it's done deleting the daemon pods, it's safe to delete
  137. // the DaemonSet.
  138. ds.Spec.Template.Spec.NodeSelector = map[string]string{
  139. string(uuid.NewUUID()): string(uuid.NewUUID()),
  140. }
  141. // force update to avoid version conflict
  142. ds.ResourceVersion = ""
  143. if ds, err = cs.AppsV1().DaemonSets(ds.Namespace).Update(context.TODO(), ds, metav1.UpdateOptions{}); err != nil {
  144. t.Errorf("Failed to update DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
  145. return
  146. }
  147. // Wait for the daemon set controller to kill all the daemon pods.
  148. if err := wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
  149. updatedDS, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
  150. if err != nil {
  151. return false, nil
  152. }
  153. return updatedDS.Status.CurrentNumberScheduled+updatedDS.Status.NumberMisscheduled == 0, nil
  154. }); err != nil {
  155. t.Errorf("Failed to kill the pods of DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
  156. return
  157. }
  158. falseVar := false
  159. deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar}
  160. if err := cs.AppsV1().DaemonSets(ds.Namespace).Delete(context.TODO(), ds.Name, deleteOptions); err != nil {
  161. t.Errorf("Failed to delete DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
  162. }
  163. }
  164. func newRollbackStrategy() *apps.DaemonSetUpdateStrategy {
  165. one := intstr.FromInt(1)
  166. return &apps.DaemonSetUpdateStrategy{
  167. Type: apps.RollingUpdateDaemonSetStrategyType,
  168. RollingUpdate: &apps.RollingUpdateDaemonSet{MaxUnavailable: &one},
  169. }
  170. }
  171. func newOnDeleteStrategy() *apps.DaemonSetUpdateStrategy {
  172. return &apps.DaemonSetUpdateStrategy{
  173. Type: apps.OnDeleteDaemonSetStrategyType,
  174. }
  175. }
  176. func updateStrategies() []*apps.DaemonSetUpdateStrategy {
  177. return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()}
  178. }
  179. func allocatableResources(memory, cpu string) v1.ResourceList {
  180. return v1.ResourceList{
  181. v1.ResourceMemory: resource.MustParse(memory),
  182. v1.ResourceCPU: resource.MustParse(cpu),
  183. v1.ResourcePods: resource.MustParse("100"),
  184. }
  185. }
  186. func resourcePodSpec(nodeName, memory, cpu string) v1.PodSpec {
  187. return v1.PodSpec{
  188. NodeName: nodeName,
  189. Containers: []v1.Container{
  190. {
  191. Name: "foo",
  192. Image: "bar",
  193. Resources: v1.ResourceRequirements{
  194. Requests: v1.ResourceList{
  195. v1.ResourceMemory: resource.MustParse(memory),
  196. v1.ResourceCPU: resource.MustParse(cpu),
  197. },
  198. },
  199. },
  200. },
  201. TerminationGracePeriodSeconds: &zero,
  202. }
  203. }
  204. func newNode(name string, label map[string]string) *v1.Node {
  205. return &v1.Node{
  206. TypeMeta: metav1.TypeMeta{
  207. Kind: "Node",
  208. APIVersion: "v1",
  209. },
  210. ObjectMeta: metav1.ObjectMeta{
  211. Name: name,
  212. Labels: label,
  213. Namespace: metav1.NamespaceNone,
  214. },
  215. Status: v1.NodeStatus{
  216. Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
  217. Allocatable: v1.ResourceList{v1.ResourcePods: resource.MustParse("100")},
  218. },
  219. }
  220. }
  221. func addNodes(nodeClient corev1client.NodeInterface, startIndex, numNodes int, label map[string]string, t *testing.T) {
  222. for i := startIndex; i < startIndex+numNodes; i++ {
  223. _, err := nodeClient.Create(context.TODO(), newNode(fmt.Sprintf("node-%d", i), label), metav1.CreateOptions{})
  224. if err != nil {
  225. t.Fatalf("Failed to create node: %v", err)
  226. }
  227. }
  228. }
  229. func validateDaemonSetPodsAndMarkReady(
  230. podClient corev1client.PodInterface,
  231. podInformer cache.SharedIndexInformer,
  232. numberPods int,
  233. t *testing.T,
  234. ) {
  235. if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
  236. objects := podInformer.GetIndexer().List()
  237. if len(objects) != numberPods {
  238. return false, nil
  239. }
  240. for _, object := range objects {
  241. pod := object.(*v1.Pod)
  242. ownerReferences := pod.ObjectMeta.OwnerReferences
  243. if len(ownerReferences) != 1 {
  244. return false, fmt.Errorf("Pod %s has %d OwnerReferences, expected only 1", pod.Name, len(ownerReferences))
  245. }
  246. controllerRef := ownerReferences[0]
  247. if got, want := controllerRef.Kind, "DaemonSet"; got != want {
  248. t.Errorf("controllerRef.Kind = %q, want %q", got, want)
  249. }
  250. if controllerRef.Controller == nil || *controllerRef.Controller != true {
  251. t.Errorf("controllerRef.Controller is not set to true")
  252. }
  253. if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 {
  254. podCopy := pod.DeepCopy()
  255. podCopy.Status = v1.PodStatus{
  256. Phase: v1.PodRunning,
  257. Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}},
  258. }
  259. _, err := podClient.UpdateStatus(context.TODO(), podCopy, metav1.UpdateOptions{})
  260. if err != nil {
  261. return false, err
  262. }
  263. }
  264. }
  265. return true, nil
  266. }); err != nil {
  267. t.Fatal(err)
  268. }
  269. }
  270. // podUnschedulable returns a condition function that returns true if the given pod
  271. // gets unschedulable status.
  272. func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  273. return func() (bool, error) {
  274. pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  275. if apierrors.IsNotFound(err) {
  276. return false, nil
  277. }
  278. if err != nil {
  279. // This could be a connection error so we want to retry.
  280. return false, nil
  281. }
  282. _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
  283. return cond != nil && cond.Status == v1.ConditionFalse &&
  284. cond.Reason == v1.PodReasonUnschedulable, nil
  285. }
  286. }
  287. // waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
  288. // an error if it does not become unschedulable within the given timeout.
  289. func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
  290. return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
  291. }
  292. // waitForPodUnschedule waits for a pod to fail scheduling and returns
  293. // an error if it does not become unschedulable within the timeout duration (30 seconds).
  294. func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
  295. return waitForPodUnschedulableWithTimeout(cs, pod, 10*time.Second)
  296. }
  297. // waitForPodsCreated waits for number of pods are created.
  298. func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error {
  299. return wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
  300. objects := podInformer.GetIndexer().List()
  301. return len(objects) == num, nil
  302. })
  303. }
  304. func waitForDaemonSetAndControllerRevisionCreated(c clientset.Interface, name string, namespace string) error {
  305. return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
  306. ds, err := c.AppsV1().DaemonSets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
  307. if err != nil {
  308. return false, err
  309. }
  310. if ds == nil {
  311. return false, nil
  312. }
  313. revs, err := c.AppsV1().ControllerRevisions(namespace).List(context.TODO(), metav1.ListOptions{})
  314. if err != nil {
  315. return false, err
  316. }
  317. if revs.Size() == 0 {
  318. return false, nil
  319. }
  320. for _, rev := range revs.Items {
  321. for _, oref := range rev.OwnerReferences {
  322. if oref.Kind == "DaemonSet" && oref.UID == ds.UID {
  323. return true, nil
  324. }
  325. }
  326. }
  327. return false, nil
  328. })
  329. }
  330. func hashAndNameForDaemonSet(ds *apps.DaemonSet) (string, string) {
  331. hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount))
  332. name := ds.Name + "-" + hash
  333. return hash, name
  334. }
  335. func validateDaemonSetCollisionCount(dsClient appstyped.DaemonSetInterface, dsName string, expCount int32, t *testing.T) {
  336. ds, err := dsClient.Get(context.TODO(), dsName, metav1.GetOptions{})
  337. if err != nil {
  338. t.Fatalf("Failed to look up DaemonSet: %v", err)
  339. }
  340. collisionCount := ds.Status.CollisionCount
  341. if *collisionCount != expCount {
  342. t.Fatalf("Expected collisionCount to be %d, but found %d", expCount, *collisionCount)
  343. }
  344. }
  345. func validateDaemonSetStatus(
  346. dsClient appstyped.DaemonSetInterface,
  347. dsName string,
  348. expectedNumberReady int32,
  349. t *testing.T) {
  350. if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
  351. ds, err := dsClient.Get(context.TODO(), dsName, metav1.GetOptions{})
  352. if err != nil {
  353. return false, err
  354. }
  355. return ds.Status.NumberReady == expectedNumberReady, nil
  356. }); err != nil {
  357. t.Fatal(err)
  358. }
  359. }
  360. func updateDS(t *testing.T, dsClient appstyped.DaemonSetInterface, dsName string, updateFunc func(*apps.DaemonSet)) *apps.DaemonSet {
  361. var ds *apps.DaemonSet
  362. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  363. newDS, err := dsClient.Get(context.TODO(), dsName, metav1.GetOptions{})
  364. if err != nil {
  365. return err
  366. }
  367. updateFunc(newDS)
  368. ds, err = dsClient.Update(context.TODO(), newDS, metav1.UpdateOptions{})
  369. return err
  370. }); err != nil {
  371. t.Fatalf("Failed to update DaemonSet: %v", err)
  372. }
  373. return ds
  374. }
  375. func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) {
  376. for _, strategy := range updateStrategies() {
  377. t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy),
  378. func(tt *testing.T) { tf(tt, strategy) })
  379. }
  380. }
  381. func TestOneNodeDaemonLaunchesPod(t *testing.T) {
  382. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  383. server, closeFn, dc, informers, clientset := setup(t)
  384. defer closeFn()
  385. ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
  386. defer framework.DeleteTestingNamespace(ns, server, t)
  387. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  388. podClient := clientset.CoreV1().Pods(ns.Name)
  389. nodeClient := clientset.CoreV1().Nodes()
  390. podInformer := informers.Core().V1().Pods().Informer()
  391. ctx, cancel := context.WithCancel(context.Background())
  392. defer cancel()
  393. // Start Scheduler
  394. setupScheduler(ctx, t, clientset, informers)
  395. informers.Start(ctx.Done())
  396. go dc.Run(5, ctx.Done())
  397. ds := newDaemonSet("foo", ns.Name)
  398. ds.Spec.UpdateStrategy = *strategy
  399. _, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
  400. if err != nil {
  401. t.Fatalf("Failed to create DaemonSet: %v", err)
  402. }
  403. defer cleanupDaemonSets(t, clientset, ds)
  404. _, err = nodeClient.Create(context.TODO(), newNode("single-node", nil), metav1.CreateOptions{})
  405. if err != nil {
  406. t.Fatalf("Failed to create node: %v", err)
  407. }
  408. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
  409. validateDaemonSetStatus(dsClient, ds.Name, 1, t)
  410. })
  411. }
  412. func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
  413. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  414. server, closeFn, dc, informers, clientset := setup(t)
  415. defer closeFn()
  416. ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
  417. defer framework.DeleteTestingNamespace(ns, server, t)
  418. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  419. podClient := clientset.CoreV1().Pods(ns.Name)
  420. nodeClient := clientset.CoreV1().Nodes()
  421. podInformer := informers.Core().V1().Pods().Informer()
  422. ctx, cancel := context.WithCancel(context.Background())
  423. defer cancel()
  424. informers.Start(ctx.Done())
  425. go dc.Run(5, ctx.Done())
  426. // Start Scheduler
  427. setupScheduler(ctx, t, clientset, informers)
  428. ds := newDaemonSet("foo", ns.Name)
  429. ds.Spec.UpdateStrategy = *strategy
  430. _, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
  431. if err != nil {
  432. t.Fatalf("Failed to create DaemonSet: %v", err)
  433. }
  434. defer cleanupDaemonSets(t, clientset, ds)
  435. addNodes(nodeClient, 0, 5, nil, t)
  436. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
  437. validateDaemonSetStatus(dsClient, ds.Name, 5, t)
  438. })
  439. }
  440. func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
  441. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  442. server, closeFn, dc, informers, clientset := setup(t)
  443. defer closeFn()
  444. ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
  445. defer framework.DeleteTestingNamespace(ns, server, t)
  446. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  447. podClient := clientset.CoreV1().Pods(ns.Name)
  448. nodeClient := clientset.CoreV1().Nodes()
  449. podInformer := informers.Core().V1().Pods().Informer()
  450. ctx, cancel := context.WithCancel(context.Background())
  451. defer cancel()
  452. informers.Start(ctx.Done())
  453. go dc.Run(5, ctx.Done())
  454. // Start Scheduler
  455. setupScheduler(ctx, t, clientset, informers)
  456. ds := newDaemonSet("foo", ns.Name)
  457. ds.Spec.UpdateStrategy = *strategy
  458. ds.Spec.Template.Spec.Affinity = &v1.Affinity{
  459. NodeAffinity: &v1.NodeAffinity{
  460. RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
  461. NodeSelectorTerms: []v1.NodeSelectorTerm{
  462. {
  463. MatchExpressions: []v1.NodeSelectorRequirement{
  464. {
  465. Key: "zone",
  466. Operator: v1.NodeSelectorOpIn,
  467. Values: []string{"test"},
  468. },
  469. },
  470. },
  471. {
  472. MatchFields: []v1.NodeSelectorRequirement{
  473. {
  474. Key: api.ObjectNameField,
  475. Operator: v1.NodeSelectorOpIn,
  476. Values: []string{"node-1"},
  477. },
  478. },
  479. },
  480. },
  481. },
  482. },
  483. }
  484. _, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
  485. if err != nil {
  486. t.Fatalf("Failed to create DaemonSet: %v", err)
  487. }
  488. defer cleanupDaemonSets(t, clientset, ds)
  489. addNodes(nodeClient, 0, 2, nil, t)
  490. // Two nodes with labels
  491. addNodes(nodeClient, 2, 2, map[string]string{
  492. "zone": "test",
  493. }, t)
  494. addNodes(nodeClient, 4, 2, nil, t)
  495. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 3, t)
  496. validateDaemonSetStatus(dsClient, ds.Name, 3, t)
  497. })
  498. }
  499. func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
  500. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  501. server, closeFn, dc, informers, clientset := setup(t)
  502. defer closeFn()
  503. ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
  504. defer framework.DeleteTestingNamespace(ns, server, t)
  505. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  506. podClient := clientset.CoreV1().Pods(ns.Name)
  507. nodeClient := clientset.CoreV1().Nodes()
  508. podInformer := informers.Core().V1().Pods().Informer()
  509. ctx, cancel := context.WithCancel(context.Background())
  510. defer cancel()
  511. informers.Start(ctx.Done())
  512. go dc.Run(5, ctx.Done())
  513. // Start Scheduler
  514. setupScheduler(ctx, t, clientset, informers)
  515. ds := newDaemonSet("foo", ns.Name)
  516. ds.Spec.UpdateStrategy = *strategy
  517. _, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
  518. if err != nil {
  519. t.Fatalf("Failed to create DaemonSet: %v", err)
  520. }
  521. defer cleanupDaemonSets(t, clientset, ds)
  522. node := newNode("single-node", nil)
  523. node.Status.Conditions = []v1.NodeCondition{
  524. {Type: v1.NodeReady, Status: v1.ConditionFalse},
  525. }
  526. _, err = nodeClient.Create(context.TODO(), node, metav1.CreateOptions{})
  527. if err != nil {
  528. t.Fatalf("Failed to create node: %v", err)
  529. }
  530. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
  531. validateDaemonSetStatus(dsClient, ds.Name, 1, t)
  532. })
  533. }
  534. // TestInsufficientCapacityNodeDaemonSetCreateButNotLaunchPod tests thaat the DaemonSet should create
  535. // Pods for all the nodes regardless of available resource on the nodes, and kube-scheduler should
  536. // not schedule Pods onto the nodes with insufficient resource.
  537. func TestInsufficientCapacityNode(t *testing.T) {
  538. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  539. server, closeFn, dc, informers, clientset := setup(t)
  540. defer closeFn()
  541. ns := framework.CreateTestingNamespace("insufficient-capacity", server, t)
  542. defer framework.DeleteTestingNamespace(ns, server, t)
  543. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  544. podClient := clientset.CoreV1().Pods(ns.Name)
  545. podInformer := informers.Core().V1().Pods().Informer()
  546. nodeClient := clientset.CoreV1().Nodes()
  547. ctx, cancel := context.WithCancel(context.Background())
  548. defer cancel()
  549. informers.Start(ctx.Done())
  550. go dc.Run(5, ctx.Done())
  551. // Start Scheduler
  552. setupScheduler(ctx, t, clientset, informers)
  553. ds := newDaemonSet("foo", ns.Name)
  554. ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m")
  555. ds.Spec.UpdateStrategy = *strategy
  556. ds, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
  557. if err != nil {
  558. t.Fatalf("Failed to create DaemonSet: %v", err)
  559. }
  560. defer cleanupDaemonSets(t, clientset, ds)
  561. node := newNode("node-with-limited-memory", nil)
  562. node.Status.Allocatable = allocatableResources("100M", "200m")
  563. _, err = nodeClient.Create(context.TODO(), node, metav1.CreateOptions{})
  564. if err != nil {
  565. t.Fatalf("Failed to create node: %v", err)
  566. }
  567. if err := waitForPodsCreated(podInformer, 1); err != nil {
  568. t.Errorf("Failed to wait for pods created: %v", err)
  569. }
  570. objects := podInformer.GetIndexer().List()
  571. for _, object := range objects {
  572. pod := object.(*v1.Pod)
  573. if err := waitForPodUnschedulable(clientset, pod); err != nil {
  574. t.Errorf("Failed to wait for unschedulable status of pod %+v", pod)
  575. }
  576. }
  577. node1 := newNode("node-with-enough-memory", nil)
  578. node1.Status.Allocatable = allocatableResources("200M", "2000m")
  579. _, err = nodeClient.Create(context.TODO(), node1, metav1.CreateOptions{})
  580. if err != nil {
  581. t.Fatalf("Failed to create node: %v", err)
  582. }
  583. // 2 pods are created. But only one of two Pods is scheduled by default scheduler.
  584. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
  585. validateDaemonSetStatus(dsClient, ds.Name, 1, t)
  586. })
  587. }
  588. // TestLaunchWithHashCollision tests that a DaemonSet can be updated even if there is a
  589. // hash collision with an existing ControllerRevision
  590. func TestLaunchWithHashCollision(t *testing.T) {
  591. server, closeFn, dc, informers, clientset := setup(t)
  592. defer closeFn()
  593. ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
  594. defer framework.DeleteTestingNamespace(ns, server, t)
  595. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  596. podInformer := informers.Core().V1().Pods().Informer()
  597. nodeClient := clientset.CoreV1().Nodes()
  598. ctx, cancel := context.WithCancel(context.Background())
  599. defer cancel()
  600. informers.Start(ctx.Done())
  601. go dc.Run(5, ctx.Done())
  602. // Start Scheduler
  603. setupScheduler(ctx, t, clientset, informers)
  604. // Create single node
  605. _, err := nodeClient.Create(context.TODO(), newNode("single-node", nil), metav1.CreateOptions{})
  606. if err != nil {
  607. t.Fatalf("Failed to create node: %v", err)
  608. }
  609. // Create new DaemonSet with RollingUpdate strategy
  610. orgDs := newDaemonSet("foo", ns.Name)
  611. oneIntString := intstr.FromInt(1)
  612. orgDs.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{
  613. Type: apps.RollingUpdateDaemonSetStrategyType,
  614. RollingUpdate: &apps.RollingUpdateDaemonSet{
  615. MaxUnavailable: &oneIntString,
  616. },
  617. }
  618. ds, err := dsClient.Create(context.TODO(), orgDs, metav1.CreateOptions{})
  619. if err != nil {
  620. t.Fatalf("Failed to create DaemonSet: %v", err)
  621. }
  622. // Wait for the DaemonSet to be created before proceeding
  623. err = waitForDaemonSetAndControllerRevisionCreated(clientset, ds.Name, ds.Namespace)
  624. if err != nil {
  625. t.Fatalf("Failed to create DaemonSet: %v", err)
  626. }
  627. ds, err = dsClient.Get(context.TODO(), ds.Name, metav1.GetOptions{})
  628. if err != nil {
  629. t.Fatalf("Failed to get DaemonSet: %v", err)
  630. }
  631. var orgCollisionCount int32
  632. if ds.Status.CollisionCount != nil {
  633. orgCollisionCount = *ds.Status.CollisionCount
  634. }
  635. // Look up the ControllerRevision for the DaemonSet
  636. _, name := hashAndNameForDaemonSet(ds)
  637. revision, err := clientset.AppsV1().ControllerRevisions(ds.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
  638. if err != nil || revision == nil {
  639. t.Fatalf("Failed to look up ControllerRevision: %v", err)
  640. }
  641. // Create a "fake" ControllerRevision that we know will create a hash collision when we make
  642. // the next update
  643. one := int64(1)
  644. ds.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
  645. newHash, newName := hashAndNameForDaemonSet(ds)
  646. newRevision := &apps.ControllerRevision{
  647. ObjectMeta: metav1.ObjectMeta{
  648. Name: newName,
  649. Namespace: ds.Namespace,
  650. Labels: labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, newHash),
  651. Annotations: ds.Annotations,
  652. OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, apps.SchemeGroupVersion.WithKind("DaemonSet"))},
  653. },
  654. Data: revision.Data,
  655. Revision: revision.Revision + 1,
  656. }
  657. _, err = clientset.AppsV1().ControllerRevisions(ds.Namespace).Create(context.TODO(), newRevision, metav1.CreateOptions{})
  658. if err != nil {
  659. t.Fatalf("Failed to create ControllerRevision: %v", err)
  660. }
  661. // Make an update of the DaemonSet which we know will create a hash collision when
  662. // the next ControllerRevision is created.
  663. ds = updateDS(t, dsClient, ds.Name, func(updateDS *apps.DaemonSet) {
  664. updateDS.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
  665. })
  666. // Wait for any pod with the latest Spec to exist
  667. err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
  668. objects := podInformer.GetIndexer().List()
  669. for _, object := range objects {
  670. pod := object.(*v1.Pod)
  671. if *pod.Spec.TerminationGracePeriodSeconds == *ds.Spec.Template.Spec.TerminationGracePeriodSeconds {
  672. return true, nil
  673. }
  674. }
  675. return false, nil
  676. })
  677. if err != nil {
  678. t.Fatalf("Failed to wait for Pods with the latest Spec to be created: %v", err)
  679. }
  680. validateDaemonSetCollisionCount(dsClient, ds.Name, orgCollisionCount+1, t)
  681. }
  682. // TestTaintedNode tests tainted node isn't expected to have pod scheduled
  683. func TestTaintedNode(t *testing.T) {
  684. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  685. server, closeFn, dc, informers, clientset := setup(t)
  686. defer closeFn()
  687. ns := framework.CreateTestingNamespace("tainted-node", server, t)
  688. defer framework.DeleteTestingNamespace(ns, server, t)
  689. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  690. podClient := clientset.CoreV1().Pods(ns.Name)
  691. podInformer := informers.Core().V1().Pods().Informer()
  692. nodeClient := clientset.CoreV1().Nodes()
  693. ctx, cancel := context.WithCancel(context.Background())
  694. defer cancel()
  695. informers.Start(ctx.Done())
  696. go dc.Run(5, ctx.Done())
  697. // Start Scheduler
  698. setupScheduler(ctx, t, clientset, informers)
  699. ds := newDaemonSet("foo", ns.Name)
  700. ds.Spec.UpdateStrategy = *strategy
  701. ds, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
  702. if err != nil {
  703. t.Fatalf("Failed to create DaemonSet: %v", err)
  704. }
  705. defer cleanupDaemonSets(t, clientset, ds)
  706. nodeWithTaint := newNode("node-with-taint", nil)
  707. nodeWithTaint.Spec.Taints = []v1.Taint{{Key: "key1", Value: "val1", Effect: "NoSchedule"}}
  708. _, err = nodeClient.Create(context.TODO(), nodeWithTaint, metav1.CreateOptions{})
  709. if err != nil {
  710. t.Fatalf("Failed to create nodeWithTaint: %v", err)
  711. }
  712. nodeWithoutTaint := newNode("node-without-taint", nil)
  713. _, err = nodeClient.Create(context.TODO(), nodeWithoutTaint, metav1.CreateOptions{})
  714. if err != nil {
  715. t.Fatalf("Failed to create nodeWithoutTaint: %v", err)
  716. }
  717. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
  718. validateDaemonSetStatus(dsClient, ds.Name, 1, t)
  719. // remove taint from nodeWithTaint
  720. nodeWithTaint, err = nodeClient.Get(context.TODO(), "node-with-taint", metav1.GetOptions{})
  721. if err != nil {
  722. t.Fatalf("Failed to retrieve nodeWithTaint: %v", err)
  723. }
  724. nodeWithTaintCopy := nodeWithTaint.DeepCopy()
  725. nodeWithTaintCopy.Spec.Taints = []v1.Taint{}
  726. _, err = nodeClient.Update(context.TODO(), nodeWithTaintCopy, metav1.UpdateOptions{})
  727. if err != nil {
  728. t.Fatalf("Failed to update nodeWithTaint: %v", err)
  729. }
  730. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
  731. validateDaemonSetStatus(dsClient, ds.Name, 2, t)
  732. })
  733. }
  734. // TestUnschedulableNodeDaemonDoesLaunchPod tests that the DaemonSet Pods can still be scheduled
  735. // to the Unschedulable nodes.
  736. func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
  737. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  738. server, closeFn, dc, informers, clientset := setup(t)
  739. defer closeFn()
  740. ns := framework.CreateTestingNamespace("daemonset-unschedulable-test", server, t)
  741. defer framework.DeleteTestingNamespace(ns, server, t)
  742. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  743. podClient := clientset.CoreV1().Pods(ns.Name)
  744. nodeClient := clientset.CoreV1().Nodes()
  745. podInformer := informers.Core().V1().Pods().Informer()
  746. ctx, cancel := context.WithCancel(context.Background())
  747. defer cancel()
  748. informers.Start(ctx.Done())
  749. go dc.Run(5, ctx.Done())
  750. // Start Scheduler
  751. setupScheduler(ctx, t, clientset, informers)
  752. ds := newDaemonSet("foo", ns.Name)
  753. ds.Spec.UpdateStrategy = *strategy
  754. ds.Spec.Template.Spec.HostNetwork = true
  755. _, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
  756. if err != nil {
  757. t.Fatalf("Failed to create DaemonSet: %v", err)
  758. }
  759. defer cleanupDaemonSets(t, clientset, ds)
  760. // Creates unschedulable node.
  761. node := newNode("unschedulable-node", nil)
  762. node.Spec.Unschedulable = true
  763. node.Spec.Taints = []v1.Taint{
  764. {
  765. Key: v1.TaintNodeUnschedulable,
  766. Effect: v1.TaintEffectNoSchedule,
  767. },
  768. }
  769. _, err = nodeClient.Create(context.TODO(), node, metav1.CreateOptions{})
  770. if err != nil {
  771. t.Fatalf("Failed to create node: %v", err)
  772. }
  773. // Creates network-unavailable node.
  774. nodeNU := newNode("network-unavailable-node", nil)
  775. nodeNU.Status.Conditions = []v1.NodeCondition{
  776. {Type: v1.NodeReady, Status: v1.ConditionFalse},
  777. {Type: v1.NodeNetworkUnavailable, Status: v1.ConditionTrue},
  778. }
  779. nodeNU.Spec.Taints = []v1.Taint{
  780. {
  781. Key: v1.TaintNodeNetworkUnavailable,
  782. Effect: v1.TaintEffectNoSchedule,
  783. },
  784. }
  785. _, err = nodeClient.Create(context.TODO(), nodeNU, metav1.CreateOptions{})
  786. if err != nil {
  787. t.Fatalf("Failed to create node: %v", err)
  788. }
  789. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
  790. validateDaemonSetStatus(dsClient, ds.Name, 2, t)
  791. })
  792. }