daemonset_test.go 34 KB


  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package daemonset
  14. import (
  15. "fmt"
  16. "net/http/httptest"
  17. "testing"
  18. "time"
  19. apps "k8s.io/api/apps/v1"
  20. "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/util/intstr"
  25. "k8s.io/apimachinery/pkg/util/uuid"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. utilfeature "k8s.io/apiserver/pkg/util/feature"
  28. "k8s.io/client-go/informers"
  29. clientset "k8s.io/client-go/kubernetes"
  30. appstyped "k8s.io/client-go/kubernetes/typed/apps/v1"
  31. corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
  32. restclient "k8s.io/client-go/rest"
  33. "k8s.io/client-go/tools/cache"
  34. "k8s.io/client-go/tools/record"
  35. "k8s.io/client-go/util/flowcontrol"
  36. "k8s.io/client-go/util/retry"
  37. "k8s.io/component-base/featuregate"
  38. featuregatetesting "k8s.io/component-base/featuregate/testing"
  39. "k8s.io/kubernetes/pkg/api/legacyscheme"
  40. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  41. "k8s.io/kubernetes/pkg/controller"
  42. "k8s.io/kubernetes/pkg/controller/daemon"
  43. "k8s.io/kubernetes/pkg/features"
  44. "k8s.io/kubernetes/pkg/scheduler"
  45. "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
  46. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  47. "k8s.io/kubernetes/pkg/scheduler/factory"
  48. labelsutil "k8s.io/kubernetes/pkg/util/labels"
  49. "k8s.io/kubernetes/test/integration/framework"
  50. )
  51. var zero = int64(0)
  52. func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) {
  53. masterConfig := framework.NewIntegrationTestMasterConfig()
  54. _, server, closeFn := framework.RunAMaster(masterConfig)
  55. config := restclient.Config{Host: server.URL}
  56. clientSet, err := clientset.NewForConfig(&config)
  57. if err != nil {
  58. t.Fatalf("Error in creating clientset: %v", err)
  59. }
  60. resyncPeriod := 12 * time.Hour
  61. informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-informers")), resyncPeriod)
  62. dc, err := daemon.NewDaemonSetsController(
  63. informers.Apps().V1().DaemonSets(),
  64. informers.Apps().V1().ControllerRevisions(),
  65. informers.Core().V1().Pods(),
  66. informers.Core().V1().Nodes(),
  67. clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-controller")),
  68. flowcontrol.NewBackOff(5*time.Second, 15*time.Minute),
  69. )
  70. if err != nil {
  71. t.Fatalf("error creating DaemonSets controller: %v", err)
  72. }
  73. return server, closeFn, dc, informers, clientSet
  74. }
  75. func setupScheduler(
  76. t *testing.T,
  77. cs clientset.Interface,
  78. informerFactory informers.SharedInformerFactory,
  79. stopCh chan struct{},
  80. ) {
  81. // If ScheduleDaemonSetPods is disabled, do not start scheduler.
  82. if !utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
  83. return
  84. }
  85. // Enable Features.
  86. algorithmprovider.ApplyFeatureGates()
  87. schedulerConfigFactory := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
  88. SchedulerName: v1.DefaultSchedulerName,
  89. Client: cs,
  90. NodeInformer: informerFactory.Core().V1().Nodes(),
  91. PodInformer: informerFactory.Core().V1().Pods(),
  92. PvInformer: informerFactory.Core().V1().PersistentVolumes(),
  93. PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
  94. ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
  95. ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
  96. StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
  97. ServiceInformer: informerFactory.Core().V1().Services(),
  98. PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
  99. StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
  100. HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
  101. DisablePreemption: false,
  102. PercentageOfNodesToScore: 100,
  103. StopCh: stopCh,
  104. })
  105. schedulerConfig, err := schedulerConfigFactory.Create()
  106. if err != nil {
  107. t.Fatalf("Couldn't create scheduler config: %v", err)
  108. }
  109. // TODO: Replace NewFromConfig and AddAllEventHandlers with scheduler.New() in
  110. // all test/integration tests.
  111. sched := scheduler.NewFromConfig(schedulerConfig)
  112. scheduler.AddAllEventHandlers(sched,
  113. v1.DefaultSchedulerName,
  114. informerFactory.Core().V1().Nodes(),
  115. informerFactory.Core().V1().Pods(),
  116. informerFactory.Core().V1().PersistentVolumes(),
  117. informerFactory.Core().V1().PersistentVolumeClaims(),
  118. informerFactory.Core().V1().Services(),
  119. informerFactory.Storage().V1().StorageClasses(),
  120. )
  121. eventBroadcaster := record.NewBroadcaster()
  122. schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
  123. legacyscheme.Scheme,
  124. v1.EventSource{Component: v1.DefaultSchedulerName},
  125. )
  126. eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{
  127. Interface: cs.CoreV1().Events(""),
  128. })
  129. algorithmprovider.ApplyFeatureGates()
  130. go sched.Run()
  131. }
  132. func testLabels() map[string]string {
  133. return map[string]string{"name": "test"}
  134. }
  135. func newDaemonSet(name, namespace string) *apps.DaemonSet {
  136. two := int32(2)
  137. return &apps.DaemonSet{
  138. TypeMeta: metav1.TypeMeta{
  139. Kind: "DaemonSet",
  140. APIVersion: "apps/v1",
  141. },
  142. ObjectMeta: metav1.ObjectMeta{
  143. Namespace: namespace,
  144. Name: name,
  145. },
  146. Spec: apps.DaemonSetSpec{
  147. RevisionHistoryLimit: &two,
  148. Selector: &metav1.LabelSelector{MatchLabels: testLabels()},
  149. UpdateStrategy: apps.DaemonSetUpdateStrategy{
  150. Type: apps.OnDeleteDaemonSetStrategyType,
  151. },
  152. Template: v1.PodTemplateSpec{
  153. ObjectMeta: metav1.ObjectMeta{
  154. Labels: testLabels(),
  155. },
  156. Spec: v1.PodSpec{
  157. Containers: []v1.Container{{Name: "foo", Image: "bar"}},
  158. TerminationGracePeriodSeconds: &zero,
  159. },
  160. },
  161. },
  162. }
  163. }
  164. func cleanupDaemonSets(t *testing.T, cs clientset.Interface, ds *apps.DaemonSet) {
  165. ds, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
  166. if err != nil {
  167. t.Errorf("Failed to get DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
  168. return
  169. }
  170. // We set the nodeSelector to a random label. This label is nearly guaranteed
  171. // to not be set on any node so the DameonSetController will start deleting
  172. // daemon pods. Once it's done deleting the daemon pods, it's safe to delete
  173. // the DaemonSet.
  174. ds.Spec.Template.Spec.NodeSelector = map[string]string{
  175. string(uuid.NewUUID()): string(uuid.NewUUID()),
  176. }
  177. // force update to avoid version conflict
  178. ds.ResourceVersion = ""
  179. if ds, err = cs.AppsV1().DaemonSets(ds.Namespace).Update(ds); err != nil {
  180. t.Errorf("Failed to update DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
  181. return
  182. }
  183. // Wait for the daemon set controller to kill all the daemon pods.
  184. if err := wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
  185. updatedDS, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
  186. if err != nil {
  187. return false, nil
  188. }
  189. return updatedDS.Status.CurrentNumberScheduled+updatedDS.Status.NumberMisscheduled == 0, nil
  190. }); err != nil {
  191. t.Errorf("Failed to kill the pods of DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
  192. return
  193. }
  194. falseVar := false
  195. deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar}
  196. if err := cs.AppsV1().DaemonSets(ds.Namespace).Delete(ds.Name, deleteOptions); err != nil {
  197. t.Errorf("Failed to delete DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
  198. }
  199. }
  200. func newRollbackStrategy() *apps.DaemonSetUpdateStrategy {
  201. one := intstr.FromInt(1)
  202. return &apps.DaemonSetUpdateStrategy{
  203. Type: apps.RollingUpdateDaemonSetStrategyType,
  204. RollingUpdate: &apps.RollingUpdateDaemonSet{MaxUnavailable: &one},
  205. }
  206. }
  207. func newOnDeleteStrategy() *apps.DaemonSetUpdateStrategy {
  208. return &apps.DaemonSetUpdateStrategy{
  209. Type: apps.OnDeleteDaemonSetStrategyType,
  210. }
  211. }
  212. func updateStrategies() []*apps.DaemonSetUpdateStrategy {
  213. return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()}
  214. }
  215. func featureGates() []featuregate.Feature {
  216. return []featuregate.Feature{
  217. features.ScheduleDaemonSetPods,
  218. }
  219. }
  220. func allocatableResources(memory, cpu string) v1.ResourceList {
  221. return v1.ResourceList{
  222. v1.ResourceMemory: resource.MustParse(memory),
  223. v1.ResourceCPU: resource.MustParse(cpu),
  224. v1.ResourcePods: resource.MustParse("100"),
  225. }
  226. }
  227. func resourcePodSpec(nodeName, memory, cpu string) v1.PodSpec {
  228. return v1.PodSpec{
  229. NodeName: nodeName,
  230. Containers: []v1.Container{
  231. {
  232. Name: "foo",
  233. Image: "bar",
  234. Resources: v1.ResourceRequirements{
  235. Requests: v1.ResourceList{
  236. v1.ResourceMemory: resource.MustParse(memory),
  237. v1.ResourceCPU: resource.MustParse(cpu),
  238. },
  239. },
  240. },
  241. },
  242. TerminationGracePeriodSeconds: &zero,
  243. }
  244. }
  245. func newNode(name string, label map[string]string) *v1.Node {
  246. return &v1.Node{
  247. TypeMeta: metav1.TypeMeta{
  248. Kind: "Node",
  249. APIVersion: "v1",
  250. },
  251. ObjectMeta: metav1.ObjectMeta{
  252. Name: name,
  253. Labels: label,
  254. Namespace: metav1.NamespaceNone,
  255. },
  256. Status: v1.NodeStatus{
  257. Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
  258. Allocatable: v1.ResourceList{v1.ResourcePods: resource.MustParse("100")},
  259. },
  260. }
  261. }
  262. func addNodes(nodeClient corev1client.NodeInterface, startIndex, numNodes int, label map[string]string, t *testing.T) {
  263. for i := startIndex; i < startIndex+numNodes; i++ {
  264. _, err := nodeClient.Create(newNode(fmt.Sprintf("node-%d", i), label))
  265. if err != nil {
  266. t.Fatalf("Failed to create node: %v", err)
  267. }
  268. }
  269. }
  270. func validateDaemonSetPodsAndMarkReady(
  271. podClient corev1client.PodInterface,
  272. podInformer cache.SharedIndexInformer,
  273. numberPods int,
  274. t *testing.T,
  275. ) {
  276. if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
  277. objects := podInformer.GetIndexer().List()
  278. if len(objects) != numberPods {
  279. return false, nil
  280. }
  281. for _, object := range objects {
  282. pod := object.(*v1.Pod)
  283. ownerReferences := pod.ObjectMeta.OwnerReferences
  284. if len(ownerReferences) != 1 {
  285. return false, fmt.Errorf("Pod %s has %d OwnerReferences, expected only 1", pod.Name, len(ownerReferences))
  286. }
  287. controllerRef := ownerReferences[0]
  288. if got, want := controllerRef.Kind, "DaemonSet"; got != want {
  289. t.Errorf("controllerRef.Kind = %q, want %q", got, want)
  290. }
  291. if controllerRef.Controller == nil || *controllerRef.Controller != true {
  292. t.Errorf("controllerRef.Controller is not set to true")
  293. }
  294. if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 {
  295. podCopy := pod.DeepCopy()
  296. podCopy.Status = v1.PodStatus{
  297. Phase: v1.PodRunning,
  298. Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}},
  299. }
  300. _, err := podClient.UpdateStatus(podCopy)
  301. if err != nil {
  302. return false, err
  303. }
  304. }
  305. }
  306. return true, nil
  307. }); err != nil {
  308. t.Fatal(err)
  309. }
  310. }
  311. // podUnschedulable returns a condition function that returns true if the given pod
  312. // gets unschedulable status.
  313. func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  314. return func() (bool, error) {
  315. pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
  316. if errors.IsNotFound(err) {
  317. return false, nil
  318. }
  319. if err != nil {
  320. // This could be a connection error so we want to retry.
  321. return false, nil
  322. }
  323. _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
  324. return cond != nil && cond.Status == v1.ConditionFalse &&
  325. cond.Reason == v1.PodReasonUnschedulable, nil
  326. }
  327. }
  328. // waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
  329. // an error if it does not become unschedulable within the given timeout.
  330. func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
  331. return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
  332. }
  333. // waitForPodUnschedule waits for a pod to fail scheduling and returns
  334. // an error if it does not become unschedulable within the timeout duration (30 seconds).
  335. func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
  336. return waitForPodUnschedulableWithTimeout(cs, pod, 10*time.Second)
  337. }
  338. // waitForPodsCreated waits for number of pods are created.
  339. func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error {
  340. return wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
  341. objects := podInformer.GetIndexer().List()
  342. return len(objects) == num, nil
  343. })
  344. }
  345. func waitForDaemonSetAndControllerRevisionCreated(c clientset.Interface, name string, namespace string) error {
  346. return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
  347. ds, err := c.AppsV1().DaemonSets(namespace).Get(name, metav1.GetOptions{})
  348. if err != nil {
  349. return false, err
  350. }
  351. if ds == nil {
  352. return false, nil
  353. }
  354. revs, err := c.AppsV1().ControllerRevisions(namespace).List(metav1.ListOptions{})
  355. if err != nil {
  356. return false, err
  357. }
  358. if revs.Size() == 0 {
  359. return false, nil
  360. }
  361. for _, rev := range revs.Items {
  362. for _, oref := range rev.OwnerReferences {
  363. if oref.Kind == "DaemonSet" && oref.UID == ds.UID {
  364. return true, nil
  365. }
  366. }
  367. }
  368. return false, nil
  369. })
  370. }
  371. func hashAndNameForDaemonSet(ds *apps.DaemonSet) (string, string) {
  372. hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount))
  373. name := ds.Name + "-" + hash
  374. return hash, name
  375. }
  376. func validateDaemonSetCollisionCount(dsClient appstyped.DaemonSetInterface, dsName string, expCount int32, t *testing.T) {
  377. ds, err := dsClient.Get(dsName, metav1.GetOptions{})
  378. if err != nil {
  379. t.Fatalf("Failed to look up DaemonSet: %v", err)
  380. }
  381. collisionCount := ds.Status.CollisionCount
  382. if *collisionCount != expCount {
  383. t.Fatalf("Expected collisionCount to be %d, but found %d", expCount, *collisionCount)
  384. }
  385. }
  386. func validateDaemonSetStatus(
  387. dsClient appstyped.DaemonSetInterface,
  388. dsName string,
  389. expectedNumberReady int32,
  390. t *testing.T) {
  391. if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
  392. ds, err := dsClient.Get(dsName, metav1.GetOptions{})
  393. if err != nil {
  394. return false, err
  395. }
  396. return ds.Status.NumberReady == expectedNumberReady, nil
  397. }); err != nil {
  398. t.Fatal(err)
  399. }
  400. }
  401. func validateFailedPlacementEvent(eventClient corev1client.EventInterface, t *testing.T) {
  402. if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
  403. eventList, err := eventClient.List(metav1.ListOptions{})
  404. if err != nil {
  405. return false, err
  406. }
  407. if len(eventList.Items) == 0 {
  408. return false, nil
  409. }
  410. if len(eventList.Items) > 1 {
  411. t.Errorf("Expected 1 event got %d", len(eventList.Items))
  412. }
  413. event := eventList.Items[0]
  414. if event.Type != v1.EventTypeWarning {
  415. t.Errorf("Event type expected %s got %s", v1.EventTypeWarning, event.Type)
  416. }
  417. if event.Reason != daemon.FailedPlacementReason {
  418. t.Errorf("Event reason expected %s got %s", daemon.FailedPlacementReason, event.Reason)
  419. }
  420. return true, nil
  421. }); err != nil {
  422. t.Fatal(err)
  423. }
  424. }
  425. func updateDS(t *testing.T, dsClient appstyped.DaemonSetInterface, dsName string, updateFunc func(*apps.DaemonSet)) *apps.DaemonSet {
  426. var ds *apps.DaemonSet
  427. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  428. newDS, err := dsClient.Get(dsName, metav1.GetOptions{})
  429. if err != nil {
  430. return err
  431. }
  432. updateFunc(newDS)
  433. ds, err = dsClient.Update(newDS)
  434. return err
  435. }); err != nil {
  436. t.Fatalf("Failed to update DaemonSet: %v", err)
  437. }
  438. return ds
  439. }
  440. func forEachFeatureGate(t *testing.T, tf func(t *testing.T)) {
  441. for _, fg := range featureGates() {
  442. for _, f := range []bool{true, false} {
  443. func() {
  444. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, fg, f)()
  445. t.Run(fmt.Sprintf("%v (%t)", fg, f), tf)
  446. }()
  447. }
  448. }
  449. }
  450. func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) {
  451. for _, strategy := range updateStrategies() {
  452. t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy),
  453. func(tt *testing.T) { tf(tt, strategy) })
  454. }
  455. }
  456. func TestOneNodeDaemonLaunchesPod(t *testing.T) {
  457. forEachFeatureGate(t, func(t *testing.T) {
  458. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  459. server, closeFn, dc, informers, clientset := setup(t)
  460. defer closeFn()
  461. ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
  462. defer framework.DeleteTestingNamespace(ns, server, t)
  463. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  464. podClient := clientset.CoreV1().Pods(ns.Name)
  465. nodeClient := clientset.CoreV1().Nodes()
  466. podInformer := informers.Core().V1().Pods().Informer()
  467. stopCh := make(chan struct{})
  468. defer close(stopCh)
  469. // Start Scheduler
  470. setupScheduler(t, clientset, informers, stopCh)
  471. informers.Start(stopCh)
  472. go dc.Run(5, stopCh)
  473. ds := newDaemonSet("foo", ns.Name)
  474. ds.Spec.UpdateStrategy = *strategy
  475. _, err := dsClient.Create(ds)
  476. if err != nil {
  477. t.Fatalf("Failed to create DaemonSet: %v", err)
  478. }
  479. defer cleanupDaemonSets(t, clientset, ds)
  480. _, err = nodeClient.Create(newNode("single-node", nil))
  481. if err != nil {
  482. t.Fatalf("Failed to create node: %v", err)
  483. }
  484. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
  485. validateDaemonSetStatus(dsClient, ds.Name, 1, t)
  486. })
  487. })
  488. }
  489. func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
  490. forEachFeatureGate(t, func(t *testing.T) {
  491. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  492. server, closeFn, dc, informers, clientset := setup(t)
  493. defer closeFn()
  494. ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
  495. defer framework.DeleteTestingNamespace(ns, server, t)
  496. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  497. podClient := clientset.CoreV1().Pods(ns.Name)
  498. nodeClient := clientset.CoreV1().Nodes()
  499. podInformer := informers.Core().V1().Pods().Informer()
  500. stopCh := make(chan struct{})
  501. defer close(stopCh)
  502. informers.Start(stopCh)
  503. go dc.Run(5, stopCh)
  504. // Start Scheduler
  505. setupScheduler(t, clientset, informers, stopCh)
  506. ds := newDaemonSet("foo", ns.Name)
  507. ds.Spec.UpdateStrategy = *strategy
  508. _, err := dsClient.Create(ds)
  509. if err != nil {
  510. t.Fatalf("Failed to create DaemonSet: %v", err)
  511. }
  512. defer cleanupDaemonSets(t, clientset, ds)
  513. addNodes(nodeClient, 0, 5, nil, t)
  514. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
  515. validateDaemonSetStatus(dsClient, ds.Name, 5, t)
  516. })
  517. })
  518. }
  519. func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
  520. forEachFeatureGate(t, func(t *testing.T) {
  521. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  522. server, closeFn, dc, informers, clientset := setup(t)
  523. defer closeFn()
  524. ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
  525. defer framework.DeleteTestingNamespace(ns, server, t)
  526. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  527. podClient := clientset.CoreV1().Pods(ns.Name)
  528. nodeClient := clientset.CoreV1().Nodes()
  529. podInformer := informers.Core().V1().Pods().Informer()
  530. stopCh := make(chan struct{})
  531. defer close(stopCh)
  532. informers.Start(stopCh)
  533. go dc.Run(5, stopCh)
  534. // Start Scheduler
  535. setupScheduler(t, clientset, informers, stopCh)
  536. ds := newDaemonSet("foo", ns.Name)
  537. ds.Spec.UpdateStrategy = *strategy
  538. ds.Spec.Template.Spec.Affinity = &v1.Affinity{
  539. NodeAffinity: &v1.NodeAffinity{
  540. RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
  541. NodeSelectorTerms: []v1.NodeSelectorTerm{
  542. {
  543. MatchExpressions: []v1.NodeSelectorRequirement{
  544. {
  545. Key: "zone",
  546. Operator: v1.NodeSelectorOpIn,
  547. Values: []string{"test"},
  548. },
  549. },
  550. },
  551. {
  552. MatchFields: []v1.NodeSelectorRequirement{
  553. {
  554. Key: schedulerapi.NodeFieldSelectorKeyNodeName,
  555. Operator: v1.NodeSelectorOpIn,
  556. Values: []string{"node-1"},
  557. },
  558. },
  559. },
  560. },
  561. },
  562. },
  563. }
  564. _, err := dsClient.Create(ds)
  565. if err != nil {
  566. t.Fatalf("Failed to create DaemonSet: %v", err)
  567. }
  568. defer cleanupDaemonSets(t, clientset, ds)
  569. addNodes(nodeClient, 0, 2, nil, t)
  570. // Two nodes with labels
  571. addNodes(nodeClient, 2, 2, map[string]string{
  572. "zone": "test",
  573. }, t)
  574. addNodes(nodeClient, 4, 2, nil, t)
  575. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 3, t)
  576. validateDaemonSetStatus(dsClient, ds.Name, 3, t)
  577. })
  578. })
  579. }
  580. func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
  581. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  582. server, closeFn, dc, informers, clientset := setup(t)
  583. defer closeFn()
  584. ns := framework.CreateTestingNamespace("simple-daemonset-test", server, t)
  585. defer framework.DeleteTestingNamespace(ns, server, t)
  586. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  587. podClient := clientset.CoreV1().Pods(ns.Name)
  588. nodeClient := clientset.CoreV1().Nodes()
  589. podInformer := informers.Core().V1().Pods().Informer()
  590. stopCh := make(chan struct{})
  591. defer close(stopCh)
  592. informers.Start(stopCh)
  593. go dc.Run(5, stopCh)
  594. // Start Scheduler
  595. setupScheduler(t, clientset, informers, stopCh)
  596. ds := newDaemonSet("foo", ns.Name)
  597. ds.Spec.UpdateStrategy = *strategy
  598. _, err := dsClient.Create(ds)
  599. if err != nil {
  600. t.Fatalf("Failed to create DaemonSet: %v", err)
  601. }
  602. defer cleanupDaemonSets(t, clientset, ds)
  603. node := newNode("single-node", nil)
  604. node.Status.Conditions = []v1.NodeCondition{
  605. {Type: v1.NodeReady, Status: v1.ConditionFalse},
  606. }
  607. _, err = nodeClient.Create(node)
  608. if err != nil {
  609. t.Fatalf("Failed to create node: %v", err)
  610. }
  611. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
  612. validateDaemonSetStatus(dsClient, ds.Name, 1, t)
  613. })
  614. }
  615. // When ScheduleDaemonSetPods is disabled, DaemonSets should not launch onto nodes with insufficient capacity.
  616. // Look for TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled, we don't need this test anymore.
  617. func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
  618. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ScheduleDaemonSetPods, false)()
  619. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  620. server, closeFn, dc, informers, clientset := setup(t)
  621. defer closeFn()
  622. ns := framework.CreateTestingNamespace("insufficient-capacity", server, t)
  623. defer framework.DeleteTestingNamespace(ns, server, t)
  624. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  625. nodeClient := clientset.CoreV1().Nodes()
  626. eventClient := clientset.CoreV1().Events(ns.Namespace)
  627. stopCh := make(chan struct{})
  628. defer close(stopCh)
  629. informers.Start(stopCh)
  630. go dc.Run(5, stopCh)
  631. ds := newDaemonSet("foo", ns.Name)
  632. ds.Spec.Template.Spec = resourcePodSpec("node-with-limited-memory", "120M", "75m")
  633. ds.Spec.UpdateStrategy = *strategy
  634. _, err := dsClient.Create(ds)
  635. if err != nil {
  636. t.Fatalf("Failed to create DaemonSet: %v", err)
  637. }
  638. defer cleanupDaemonSets(t, clientset, ds)
  639. node := newNode("node-with-limited-memory", nil)
  640. node.Status.Allocatable = allocatableResources("100M", "200m")
  641. _, err = nodeClient.Create(node)
  642. if err != nil {
  643. t.Fatalf("Failed to create node: %v", err)
  644. }
  645. validateFailedPlacementEvent(eventClient, t)
  646. })
  647. }
  648. // TestInsufficientCapacityNodeDaemonSetCreateButNotLaunchPod tests that when "ScheduleDaemonSetPods"
  649. // feature is enabled, the DaemonSet should create Pods for all the nodes regardless of available resource
  650. // on the nodes, and kube-scheduler should not schedule Pods onto the nodes with insufficient resource.
  651. func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T) {
  652. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ScheduleDaemonSetPods, true)()
  653. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  654. server, closeFn, dc, informers, clientset := setup(t)
  655. defer closeFn()
  656. ns := framework.CreateTestingNamespace("insufficient-capacity", server, t)
  657. defer framework.DeleteTestingNamespace(ns, server, t)
  658. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  659. podClient := clientset.CoreV1().Pods(ns.Name)
  660. podInformer := informers.Core().V1().Pods().Informer()
  661. nodeClient := clientset.CoreV1().Nodes()
  662. stopCh := make(chan struct{})
  663. defer close(stopCh)
  664. informers.Start(stopCh)
  665. go dc.Run(5, stopCh)
  666. // Start Scheduler
  667. setupScheduler(t, clientset, informers, stopCh)
  668. ds := newDaemonSet("foo", ns.Name)
  669. ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m")
  670. ds.Spec.UpdateStrategy = *strategy
  671. ds, err := dsClient.Create(ds)
  672. if err != nil {
  673. t.Fatalf("Failed to create DaemonSet: %v", err)
  674. }
  675. defer cleanupDaemonSets(t, clientset, ds)
  676. node := newNode("node-with-limited-memory", nil)
  677. node.Status.Allocatable = allocatableResources("100M", "200m")
  678. _, err = nodeClient.Create(node)
  679. if err != nil {
  680. t.Fatalf("Failed to create node: %v", err)
  681. }
  682. if err := waitForPodsCreated(podInformer, 1); err != nil {
  683. t.Errorf("Failed to wait for pods created: %v", err)
  684. }
  685. objects := podInformer.GetIndexer().List()
  686. for _, object := range objects {
  687. pod := object.(*v1.Pod)
  688. if err := waitForPodUnschedulable(clientset, pod); err != nil {
  689. t.Errorf("Failed to wait for unschedulable status of pod %+v", pod)
  690. }
  691. }
  692. node1 := newNode("node-with-enough-memory", nil)
  693. node1.Status.Allocatable = allocatableResources("200M", "2000m")
  694. _, err = nodeClient.Create(node1)
  695. if err != nil {
  696. t.Fatalf("Failed to create node: %v", err)
  697. }
  698. // When ScheduleDaemonSetPods enabled, 2 pods are created. But only one
  699. // of two Pods is scheduled by default scheduler.
  700. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
  701. validateDaemonSetStatus(dsClient, ds.Name, 1, t)
  702. })
  703. }
  704. // TestLaunchWithHashCollision tests that a DaemonSet can be updated even if there is a
  705. // hash collision with an existing ControllerRevision
  706. func TestLaunchWithHashCollision(t *testing.T) {
  707. server, closeFn, dc, informers, clientset := setup(t)
  708. defer closeFn()
  709. ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
  710. defer framework.DeleteTestingNamespace(ns, server, t)
  711. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  712. podInformer := informers.Core().V1().Pods().Informer()
  713. nodeClient := clientset.CoreV1().Nodes()
  714. stopCh := make(chan struct{})
  715. defer close(stopCh)
  716. informers.Start(stopCh)
  717. go dc.Run(1, stopCh)
  718. setupScheduler(t, clientset, informers, stopCh)
  719. // Create single node
  720. _, err := nodeClient.Create(newNode("single-node", nil))
  721. if err != nil {
  722. t.Fatalf("Failed to create node: %v", err)
  723. }
  724. // Create new DaemonSet with RollingUpdate strategy
  725. orgDs := newDaemonSet("foo", ns.Name)
  726. oneIntString := intstr.FromInt(1)
  727. orgDs.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{
  728. Type: apps.RollingUpdateDaemonSetStrategyType,
  729. RollingUpdate: &apps.RollingUpdateDaemonSet{
  730. MaxUnavailable: &oneIntString,
  731. },
  732. }
  733. ds, err := dsClient.Create(orgDs)
  734. if err != nil {
  735. t.Fatalf("Failed to create DaemonSet: %v", err)
  736. }
  737. // Wait for the DaemonSet to be created before proceeding
  738. err = waitForDaemonSetAndControllerRevisionCreated(clientset, ds.Name, ds.Namespace)
  739. if err != nil {
  740. t.Fatalf("Failed to create DaemonSet: %v", err)
  741. }
  742. ds, err = dsClient.Get(ds.Name, metav1.GetOptions{})
  743. if err != nil {
  744. t.Fatalf("Failed to get DaemonSet: %v", err)
  745. }
  746. var orgCollisionCount int32
  747. if ds.Status.CollisionCount != nil {
  748. orgCollisionCount = *ds.Status.CollisionCount
  749. }
  750. // Look up the ControllerRevision for the DaemonSet
  751. _, name := hashAndNameForDaemonSet(ds)
  752. revision, err := clientset.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
  753. if err != nil || revision == nil {
  754. t.Fatalf("Failed to look up ControllerRevision: %v", err)
  755. }
  756. // Create a "fake" ControllerRevision that we know will create a hash collision when we make
  757. // the next update
  758. one := int64(1)
  759. ds.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
  760. newHash, newName := hashAndNameForDaemonSet(ds)
  761. newRevision := &apps.ControllerRevision{
  762. ObjectMeta: metav1.ObjectMeta{
  763. Name: newName,
  764. Namespace: ds.Namespace,
  765. Labels: labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, newHash),
  766. Annotations: ds.Annotations,
  767. OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, apps.SchemeGroupVersion.WithKind("DaemonSet"))},
  768. },
  769. Data: revision.Data,
  770. Revision: revision.Revision + 1,
  771. }
  772. _, err = clientset.AppsV1().ControllerRevisions(ds.Namespace).Create(newRevision)
  773. if err != nil {
  774. t.Fatalf("Failed to create ControllerRevision: %v", err)
  775. }
  776. // Make an update of the DaemonSet which we know will create a hash collision when
  777. // the next ControllerRevision is created.
  778. ds = updateDS(t, dsClient, ds.Name, func(updateDS *apps.DaemonSet) {
  779. updateDS.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
  780. })
  781. // Wait for any pod with the latest Spec to exist
  782. err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
  783. objects := podInformer.GetIndexer().List()
  784. for _, object := range objects {
  785. pod := object.(*v1.Pod)
  786. if *pod.Spec.TerminationGracePeriodSeconds == *ds.Spec.Template.Spec.TerminationGracePeriodSeconds {
  787. return true, nil
  788. }
  789. }
  790. return false, nil
  791. })
  792. if err != nil {
  793. t.Fatalf("Failed to wait for Pods with the latest Spec to be created: %v", err)
  794. }
  795. validateDaemonSetCollisionCount(dsClient, ds.Name, orgCollisionCount+1, t)
  796. }
  797. // TestTaintedNode tests that no matter "ScheduleDaemonSetPods" feature is enabled or not
  798. // tainted node isn't expected to have pod scheduled
  799. func TestTaintedNode(t *testing.T) {
  800. forEachFeatureGate(t, func(t *testing.T) {
  801. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  802. server, closeFn, dc, informers, clientset := setup(t)
  803. defer closeFn()
  804. ns := framework.CreateTestingNamespace("tainted-node", server, t)
  805. defer framework.DeleteTestingNamespace(ns, server, t)
  806. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  807. podClient := clientset.CoreV1().Pods(ns.Name)
  808. podInformer := informers.Core().V1().Pods().Informer()
  809. nodeClient := clientset.CoreV1().Nodes()
  810. stopCh := make(chan struct{})
  811. defer close(stopCh)
  812. // Start Scheduler
  813. setupScheduler(t, clientset, informers, stopCh)
  814. informers.Start(stopCh)
  815. go dc.Run(5, stopCh)
  816. ds := newDaemonSet("foo", ns.Name)
  817. ds.Spec.UpdateStrategy = *strategy
  818. ds, err := dsClient.Create(ds)
  819. if err != nil {
  820. t.Fatalf("Failed to create DaemonSet: %v", err)
  821. }
  822. defer cleanupDaemonSets(t, clientset, ds)
  823. nodeWithTaint := newNode("node-with-taint", nil)
  824. nodeWithTaint.Spec.Taints = []v1.Taint{{Key: "key1", Value: "val1", Effect: "NoSchedule"}}
  825. _, err = nodeClient.Create(nodeWithTaint)
  826. if err != nil {
  827. t.Fatalf("Failed to create nodeWithTaint: %v", err)
  828. }
  829. nodeWithoutTaint := newNode("node-without-taint", nil)
  830. _, err = nodeClient.Create(nodeWithoutTaint)
  831. if err != nil {
  832. t.Fatalf("Failed to create nodeWithoutTaint: %v", err)
  833. }
  834. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
  835. validateDaemonSetStatus(dsClient, ds.Name, 1, t)
  836. // remove taint from nodeWithTaint
  837. nodeWithTaint, err = nodeClient.Get("node-with-taint", metav1.GetOptions{})
  838. if err != nil {
  839. t.Fatalf("Failed to retrieve nodeWithTaint: %v", err)
  840. }
  841. nodeWithTaintCopy := nodeWithTaint.DeepCopy()
  842. nodeWithTaintCopy.Spec.Taints = []v1.Taint{}
  843. _, err = nodeClient.Update(nodeWithTaintCopy)
  844. if err != nil {
  845. t.Fatalf("Failed to update nodeWithTaint: %v", err)
  846. }
  847. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
  848. validateDaemonSetStatus(dsClient, ds.Name, 2, t)
  849. })
  850. })
  851. }
  852. // TestUnschedulableNodeDaemonDoesLaunchPod tests that the DaemonSet Pods can still be scheduled
  853. // to the Unschedulable nodes when TaintNodesByCondition are enabled.
  854. func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
  855. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TaintNodesByCondition, true)()
  856. forEachFeatureGate(t, func(t *testing.T) {
  857. forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
  858. server, closeFn, dc, informers, clientset := setup(t)
  859. defer closeFn()
  860. ns := framework.CreateTestingNamespace("daemonset-unschedulable-test", server, t)
  861. defer framework.DeleteTestingNamespace(ns, server, t)
  862. dsClient := clientset.AppsV1().DaemonSets(ns.Name)
  863. podClient := clientset.CoreV1().Pods(ns.Name)
  864. nodeClient := clientset.CoreV1().Nodes()
  865. podInformer := informers.Core().V1().Pods().Informer()
  866. stopCh := make(chan struct{})
  867. defer close(stopCh)
  868. informers.Start(stopCh)
  869. go dc.Run(5, stopCh)
  870. // Start Scheduler
  871. setupScheduler(t, clientset, informers, stopCh)
  872. ds := newDaemonSet("foo", ns.Name)
  873. ds.Spec.UpdateStrategy = *strategy
  874. ds.Spec.Template.Spec.HostNetwork = true
  875. _, err := dsClient.Create(ds)
  876. if err != nil {
  877. t.Fatalf("Failed to create DaemonSet: %v", err)
  878. }
  879. defer cleanupDaemonSets(t, clientset, ds)
  880. // Creates unschedulable node.
  881. node := newNode("unschedulable-node", nil)
  882. node.Spec.Unschedulable = true
  883. node.Spec.Taints = []v1.Taint{
  884. {
  885. Key: schedulerapi.TaintNodeUnschedulable,
  886. Effect: v1.TaintEffectNoSchedule,
  887. },
  888. }
  889. _, err = nodeClient.Create(node)
  890. if err != nil {
  891. t.Fatalf("Failed to create node: %v", err)
  892. }
  893. // Creates network-unavailable node.
  894. nodeNU := newNode("network-unavailable-node", nil)
  895. nodeNU.Status.Conditions = []v1.NodeCondition{
  896. {Type: v1.NodeReady, Status: v1.ConditionFalse},
  897. {Type: v1.NodeNetworkUnavailable, Status: v1.ConditionTrue},
  898. }
  899. nodeNU.Spec.Taints = []v1.Taint{
  900. {
  901. Key: schedulerapi.TaintNodeNetworkUnavailable,
  902. Effect: v1.TaintEffectNoSchedule,
  903. },
  904. }
  905. _, err = nodeClient.Create(nodeNU)
  906. if err != nil {
  907. t.Fatalf("Failed to create node: %v", err)
  908. }
  909. validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
  910. validateDaemonSetStatus(dsClient, ds.Name, 2, t)
  911. })
  912. })
  913. }