daemon_set.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apps

import (
	"fmt"
	"reflect"
	"strings"
	"time"

	apps "k8s.io/api/apps/v1"
	"k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/controller/daemon"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	"k8s.io/kubernetes/test/e2e/framework"
	e2elog "k8s.io/kubernetes/test/e2e/framework/log"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
)

const (
	// this should not be a multiple of 5, because node status updates
	// every 5 seconds. See https://github.com/kubernetes/kubernetes/pull/14915.
	dsRetryPeriod  = 1 * time.Second
	dsRetryTimeout = 5 * time.Minute

	daemonsetLabelPrefix = "daemonset-"
	daemonsetNameLabel   = daemonsetLabelPrefix + "name"
	daemonsetColorLabel  = daemonsetLabelPrefix + "color"
)

// NamespaceNodeSelectors lists the annotation keys, such as
// scheduler.alpha.kubernetes.io/node-selector, used for assigning node selector labels to namespaces.
var NamespaceNodeSelectors = []string{"scheduler.alpha.kubernetes.io/node-selector"}

// This test must be run in serial because it assumes the Daemon Set pods will
// always get scheduled. If we run other tests in parallel, this may not
// happen. In the future, running in parallel may work if we have an eviction
// model which lets the DS controller kick out other pods to make room.
// See http://issues.k8s.io/21767 for more details
var _ = SIGDescribe("Daemon set [Serial]", func() {
	var f *framework.Framework

	ginkgo.AfterEach(func() {
		// Clean up
		daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(metav1.ListOptions{})
		framework.ExpectNoError(err, "unable to dump DaemonSets")
		if daemonsets != nil && len(daemonsets.Items) > 0 {
			for _, ds := range daemonsets.Items {
				ginkgo.By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
				framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
				err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, &ds))
				framework.ExpectNoError(err, "error waiting for daemon pod to be reaped")
			}
		}
		if daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(metav1.ListOptions{}); err == nil {
			e2elog.Logf("daemonset: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), daemonsets))
		} else {
			e2elog.Logf("unable to dump daemonsets: %v", err)
		}
		if pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(metav1.ListOptions{}); err == nil {
			e2elog.Logf("pods: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), pods))
		} else {
			e2elog.Logf("unable to dump pods: %v", err)
		}
		err = clearDaemonSetNodeLabels(f.ClientSet)
		framework.ExpectNoError(err)
	})

	f = framework.NewDefaultFramework("daemonsets")

	image := NginxImage
	dsName := "daemon-set"

	var ns string
	var c clientset.Interface

	ginkgo.BeforeEach(func() {
		ns = f.Namespace.Name

		c = f.ClientSet
		updatedNS, err := updateNamespaceAnnotations(c, ns)
		framework.ExpectNoError(err)

		ns = updatedNS.Name

		err = clearDaemonSetNodeLabels(c)
		framework.ExpectNoError(err)
	})

	/*
	  Testname: DaemonSet-Creation
	  Description: A conformant Kubernetes distribution MUST support the creation of DaemonSets. When a DaemonSet
	  Pod is deleted, the DaemonSet controller MUST create a replacement Pod.
	*/
	framework.ConformanceIt("should run and stop simple daemon", func() {
		label := map[string]string{daemonsetNameLabel: dsName}

		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
		ds, err := c.AppsV1().DaemonSets(ns).Create(newDaemonSet(dsName, image, label))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Stop a daemon pod, check that the daemon pod is revived.")
		podList := listDaemonPods(c, ns, label)
		pod := podList.Items[0]
		err = c.CoreV1().Pods(ns).Delete(pod.Name, nil)
		framework.ExpectNoError(err)
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to revive")
	})

	/*
	  Testname: DaemonSet-NodeSelection
	  Description: A conformant Kubernetes distribution MUST support DaemonSet Pod node selection via label
	  selectors.
	*/
	framework.ConformanceIt("should run and stop complex daemon", func() {
		complexLabel := map[string]string{daemonsetNameLabel: dsName}
		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
		e2elog.Logf("Creating daemon %q with a node selector", dsName)
		ds := newDaemonSet(dsName, image, complexLabel)
		ds.Spec.Template.Spec.NodeSelector = nodeSelector
		ds, err := c.AppsV1().DaemonSets(ns).Create(ds)
		framework.ExpectNoError(err)

		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")

		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
		nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
		gomega.Expect(len(nodeList.Items)).To(gomega.BeNumerically(">", 0))
		newNode, err := setDaemonSetNodeLabels(c, nodeList.Items[0].Name, nodeSelector)
		framework.ExpectNoError(err, "error setting labels on node")
		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
		gomega.Expect(len(daemonSetLabels)).To(gomega.Equal(1))
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{newNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Update the node label to green, and wait for daemons to be unscheduled")
		nodeSelector[daemonsetColorLabel] = "green"
		greenNode, err := setDaemonSetNodeLabels(c, nodeList.Items[0].Name, nodeSelector)
		framework.ExpectNoError(err, "error removing labels on node")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")

		ginkgo.By("Update DaemonSet node selector to green, and change its update strategy to RollingUpdate")
		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"nodeSelector":{"%s":"%s"}}},"updateStrategy":{"type":"RollingUpdate"}}}`,
			daemonsetColorLabel, greenNode.Labels[daemonsetColorLabel])
		ds, err = c.AppsV1().DaemonSets(ns).Patch(dsName, types.StrategicMergePatchType, []byte(patch))
		framework.ExpectNoError(err, "error patching daemon set")
		daemonSetLabels, _ = separateDaemonSetNodeLabels(greenNode.Labels)
		gomega.Expect(len(daemonSetLabels)).To(gomega.Equal(1))
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{greenNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)
	})

	// We defer adding this test to conformance pending the disposition of moving DaemonSet scheduling logic to the
	// default scheduler.
	ginkgo.It("should run and stop complex daemon with node affinity", func() {
		complexLabel := map[string]string{daemonsetNameLabel: dsName}
		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
		e2elog.Logf("Creating daemon %q with a node affinity", dsName)
		ds := newDaemonSet(dsName, image, complexLabel)
		ds.Spec.Template.Spec.Affinity = &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      daemonsetColorLabel,
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{nodeSelector[daemonsetColorLabel]},
								},
							},
						},
					},
				},
			},
		}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ds)
		framework.ExpectNoError(err)

		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")

		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
		nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
		gomega.Expect(len(nodeList.Items)).To(gomega.BeNumerically(">", 0))
		newNode, err := setDaemonSetNodeLabels(c, nodeList.Items[0].Name, nodeSelector)
		framework.ExpectNoError(err, "error setting labels on node")
		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
		gomega.Expect(len(daemonSetLabels)).To(gomega.Equal(1))
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{newNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Remove the node label and wait for daemons to be unscheduled")
		_, err = setDaemonSetNodeLabels(c, nodeList.Items[0].Name, map[string]string{})
		framework.ExpectNoError(err, "error removing labels on node")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
	})

	/*
	  Testname: DaemonSet-FailedPodCreation
	  Description: A conformant Kubernetes distribution MUST create new DaemonSet Pods when they fail.
	*/
	framework.ConformanceIt("should retry creating failed daemon pods", func() {
		label := map[string]string{daemonsetNameLabel: dsName}

		ginkgo.By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName))
		ds, err := c.AppsV1().DaemonSets(ns).Create(newDaemonSet(dsName, image, label))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.")
		podList := listDaemonPods(c, ns, label)
		pod := podList.Items[0]
		pod.ResourceVersion = ""
		pod.Status.Phase = v1.PodFailed
		_, err = c.CoreV1().Pods(ns).UpdateStatus(&pod)
		framework.ExpectNoError(err, "error failing a daemon pod")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to revive")

		ginkgo.By("Wait for the failed daemon pod to be completely deleted.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, waitFailedDaemonPodDeleted(c, &pod))
		framework.ExpectNoError(err, "error waiting for the failed daemon pod to be completely deleted")
	})

	// This test should not be added to conformance. We will consider deprecating OnDelete when the
	// extensions/v1beta1 and apps/v1beta1 are removed.
	ginkgo.It("should not update pod when spec was updated and update strategy is OnDelete", func() {
		label := map[string]string{daemonsetNameLabel: dsName}

		e2elog.Logf("Creating simple daemon set %s", dsName)
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{Type: apps.OnDeleteDaemonSetStrategyType}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ds)
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(c, ns, label, 1)
		first := curHistory(listDaemonHistories(c, ns, label), ds)
		firstHash := first.Labels[apps.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(first.Revision).To(gomega.Equal(int64(1)))
		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash)

		ginkgo.By("Update daemon pods image.")
		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, RedisImage)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(dsName, types.StrategicMergePatchType, []byte(patch))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods images aren't updated.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, ds, image, 0))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(c, ns, label, 2)
		cur := curHistory(listDaemonHistories(c, ns, label), ds)
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
		gomega.Expect(cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]).NotTo(gomega.Equal(firstHash))
		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash)
	})

	/*
	  Testname: DaemonSet-RollingUpdate
	  Description: A conformant Kubernetes distribution MUST support DaemonSet RollingUpdates.
	*/
	framework.ConformanceIt("should update pod when spec was updated and update strategy is RollingUpdate", func() {
		label := map[string]string{daemonsetNameLabel: dsName}

		e2elog.Logf("Creating simple daemon set %s", dsName)
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{Type: apps.RollingUpdateDaemonSetStrategyType}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ds)
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(c, ns, label, 1)
		cur := curHistory(listDaemonHistories(c, ns, label), ds)
		hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(1)))
		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)

		ginkgo.By("Update daemon pods image.")
		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, RedisImage)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(dsName, types.StrategicMergePatchType, []byte(patch))
		framework.ExpectNoError(err)

		// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
		// Get the number of nodes, and set the timeout appropriately.
		nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
		framework.ExpectNoError(err)
		nodeCount := len(nodes.Items)
		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second

		ginkgo.By("Check that daemon pods images are updated.")
		err = wait.PollImmediate(dsRetryPeriod, retryTimeout, checkDaemonPodsImageAndAvailability(c, ds, RedisImage, 1))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(c, ns, label, 2)
		cur = curHistory(listDaemonHistories(c, ns, label), ds)
		hash = cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)
	})

	/*
	  Testname: DaemonSet-Rollback
	  Description: A conformant Kubernetes distribution MUST support automated, minimally disruptive
	  rollback of updates to a DaemonSet.
	*/
	framework.ConformanceIt("should rollback without unnecessary restarts", func() {
		schedulableNodes := framework.GetReadySchedulableNodesOrDie(c)
		gomega.Expect(len(schedulableNodes.Items)).To(gomega.BeNumerically(">", 1), "Conformance test suite needs a cluster with at least 2 nodes.")
		e2elog.Logf("Create a RollingUpdate DaemonSet")
		label := map[string]string{daemonsetNameLabel: dsName}
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{Type: apps.RollingUpdateDaemonSetStrategyType}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ds)
		framework.ExpectNoError(err)

		e2elog.Logf("Check that daemon pods launch on every node of the cluster")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		e2elog.Logf("Update the DaemonSet to trigger a rollout")
		// We use a nonexistent image here, so that the rollout can never finish.
		newImage := "foo:non-existent"
		newDS, err := framework.UpdateDaemonSetWithRetries(c, ns, ds.Name, func(update *apps.DaemonSet) {
			update.Spec.Template.Spec.Containers[0].Image = newImage
		})
		framework.ExpectNoError(err)

		// Make sure we're in the middle of a rollout
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkAtLeastOneNewPod(c, ns, label, newImage))
		framework.ExpectNoError(err)

		pods := listDaemonPods(c, ns, label)
		var existingPods, newPods []*v1.Pod
		for i := range pods.Items {
			pod := pods.Items[i]
			image := pod.Spec.Containers[0].Image
			switch image {
			case ds.Spec.Template.Spec.Containers[0].Image:
				existingPods = append(existingPods, &pod)
			case newDS.Spec.Template.Spec.Containers[0].Image:
				newPods = append(newPods, &pod)
			default:
				framework.Failf("unexpected pod found, image = %s", image)
			}
		}
		schedulableNodes = framework.GetReadySchedulableNodesOrDie(c)
		if len(schedulableNodes.Items) < 2 {
			gomega.Expect(len(existingPods)).To(gomega.Equal(0))
		} else {
			gomega.Expect(len(existingPods)).NotTo(gomega.Equal(0))
		}
		gomega.Expect(len(newPods)).NotTo(gomega.Equal(0))

		e2elog.Logf("Roll back the DaemonSet before rollout is complete")
		rollbackDS, err := framework.UpdateDaemonSetWithRetries(c, ns, ds.Name, func(update *apps.DaemonSet) {
			update.Spec.Template.Spec.Containers[0].Image = image
		})
		framework.ExpectNoError(err)

		e2elog.Logf("Make sure DaemonSet rollback is complete")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, rollbackDS, image, 1))
		framework.ExpectNoError(err)

		// After rollback is done, compare the current pods with the old pods seen during the rollout
		// to make sure they were not restarted.
		pods = listDaemonPods(c, ns, label)
		rollbackPods := map[string]bool{}
		for _, pod := range pods.Items {
			rollbackPods[pod.Name] = true
		}
		for _, pod := range existingPods {
			gomega.Expect(rollbackPods[pod.Name]).To(gomega.BeTrue(), fmt.Sprintf("pod %s was unexpectedly restarted during rollback", pod.Name))
		}
	})
})

// getDaemonSetImagePatch generates a strategic merge patch for updating a DaemonSet's container image.
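// For example, getDaemonSetImagePatch("app", "foo:bar") produces:
//   {"spec":{"template":{"spec":{"containers":[{"name":"app","image":"foo:bar"}]}}}}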
func getDaemonSetImagePatch(containerName, containerImage string) string {
	return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)
}

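// newDaemonSet returns a DaemonSet manifest with the given name and selector/template labels,
// whose pods run a single container named "app" with the given image, exposing port 9376.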
func newDaemonSet(dsName, image string, label map[string]string) *apps.DaemonSet {
	return &apps.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: dsName,
		},
		Spec: apps.DaemonSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: label,
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: label,
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "app",
							Image: image,
							Ports: []v1.ContainerPort{{ContainerPort: 9376}},
						},
					},
				},
			},
		},
	}
}

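// listDaemonPods lists the pods in the namespace that match the given labels and fails the
// test if none are found.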
func listDaemonPods(c clientset.Interface, ns string, label map[string]string) *v1.PodList {
	selector := labels.Set(label).AsSelector()
	options := metav1.ListOptions{LabelSelector: selector.String()}
	podList, err := c.CoreV1().Pods(ns).List(options)
	framework.ExpectNoError(err)
	gomega.Expect(len(podList.Items)).To(gomega.BeNumerically(">", 0))
	return podList
}

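// separateDaemonSetNodeLabels splits a node's labels into those created by this test
// (prefixed with "daemonset-") and all other labels.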
func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
	daemonSetLabels := map[string]string{}
	otherLabels := map[string]string{}
	for k, v := range labels {
		if strings.HasPrefix(k, daemonsetLabelPrefix) {
			daemonSetLabels[k] = v
		} else {
			otherLabels[k] = v
		}
	}
	return daemonSetLabels, otherLabels
}

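// clearDaemonSetNodeLabels removes the test-created labels from every ready, schedulable node.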
func clearDaemonSetNodeLabels(c clientset.Interface) error {
	nodeList := framework.GetReadySchedulableNodesOrDie(c)
	for _, node := range nodeList.Items {
		_, err := setDaemonSetNodeLabels(c, node.Name, map[string]string{})
		if err != nil {
			return err
		}
	}
	return nil
}

// updateNamespaceAnnotations sets the node-selector-related annotations on the test namespace to empty.
func updateNamespaceAnnotations(c clientset.Interface, nsName string) (*v1.Namespace, error) {
	nsClient := c.CoreV1().Namespaces()

	ns, err := nsClient.Get(nsName, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	if ns.Annotations == nil {
		ns.Annotations = make(map[string]string)
	}

	for _, n := range NamespaceNodeSelectors {
		ns.Annotations[n] = ""
	}

	return nsClient.Update(ns)
}

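// setDaemonSetNodeLabels replaces the test-created ("daemonset-" prefixed) labels on the named
// node with the given label set, retrying on resource-version conflicts, and returns the
// updated node.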
func setDaemonSetNodeLabels(c clientset.Interface, nodeName string, labels map[string]string) (*v1.Node, error) {
	nodeClient := c.CoreV1().Nodes()
	var newNode *v1.Node
	var newLabels map[string]string
	err := wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, func() (bool, error) {
		node, err := nodeClient.Get(nodeName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}

		// remove all labels this test is creating
		daemonSetLabels, otherLabels := separateDaemonSetNodeLabels(node.Labels)
		if reflect.DeepEqual(daemonSetLabels, labels) {
			// The node already carries exactly the requested test labels; record them so the
			// length check below does not fail spuriously.
			newLabels = daemonSetLabels
			newNode = node
			return true, nil
		}
		node.Labels = otherLabels
		for k, v := range labels {
			node.Labels[k] = v
		}
		newNode, err = nodeClient.Update(node)
		if err == nil {
			newLabels, _ = separateDaemonSetNodeLabels(newNode.Labels)
			return true, err
		}
		if se, ok := err.(*apierrors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
			e2elog.Logf("failed to update node due to resource version conflict")
			return false, nil
		}
		return false, err
	})
	if err != nil {
		return nil, err
	} else if len(newLabels) != len(labels) {
		return nil, fmt.Errorf("could not set daemon set test labels as expected")
	}

	return newNode, nil
}

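// checkDaemonPodOnNodes returns a poll function that succeeds once each node in nodeNames is
// running exactly one available daemon pod controlled by ds and no other node is running any.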
func checkDaemonPodOnNodes(f *framework.Framework, ds *apps.DaemonSet, nodeNames []string) func() (bool, error) {
	return func() (bool, error) {
		podList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(metav1.ListOptions{})
		if err != nil {
			e2elog.Logf("could not get the pod list: %v", err)
			return false, nil
		}
		pods := podList.Items

		nodesToPodCount := make(map[string]int)
		for _, pod := range pods {
			if !metav1.IsControlledBy(&pod, ds) {
				continue
			}
			if pod.DeletionTimestamp != nil {
				continue
			}
			if podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
				nodesToPodCount[pod.Spec.NodeName]++
			}
		}
		e2elog.Logf("Number of nodes with available pods: %d", len(nodesToPodCount))

		// Ensure that exactly 1 pod is running on all nodes in nodeNames.
		for _, nodeName := range nodeNames {
			if nodesToPodCount[nodeName] != 1 {
				e2elog.Logf("Node %s is running %d daemon pods, expected 1", nodeName, nodesToPodCount[nodeName])
				return false, nil
			}
		}
		e2elog.Logf("Number of running nodes: %d, number of available pods: %d", len(nodeNames), len(nodesToPodCount))
		// Ensure that sizes of the lists are the same. We've verified that every element of nodeNames is in
		// nodesToPodCount, so verifying the lengths are equal ensures that there aren't pods running on any
		// other nodes.
		return len(nodesToPodCount) == len(nodeNames), nil
	}
}

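// checkRunningOnAllNodes returns a poll function that succeeds once the DaemonSet is running
// one pod on every node it can schedule onto.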
func checkRunningOnAllNodes(f *framework.Framework, ds *apps.DaemonSet) func() (bool, error) {
	return func() (bool, error) {
		nodeNames := schedulableNodes(f.ClientSet, ds)
		return checkDaemonPodOnNodes(f, ds, nodeNames)()
	}
}

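// schedulableNodes returns the names of the nodes that the given DaemonSet can schedule pods
// onto, as determined by the daemon controller's predicates.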
func schedulableNodes(c clientset.Interface, ds *apps.DaemonSet) []string {
	nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
	framework.ExpectNoError(err)
	nodeNames := make([]string, 0)
	for _, node := range nodeList.Items {
		if !canScheduleOnNode(node, ds) {
			e2elog.Logf("DaemonSet pods can't tolerate node %s with taints %+v, skip checking this node", node.Name, node.Spec.Taints)
			continue
		}
		nodeNames = append(nodeNames, node.Name)
	}
	return nodeNames
}

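// checkAtLeastOneNewPod returns a poll function that succeeds once at least one daemon pod is
// running the new image, i.e. once a rollout is in progress.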
func checkAtLeastOneNewPod(c clientset.Interface, ns string, label map[string]string, newImage string) func() (bool, error) {
	return func() (bool, error) {
		pods := listDaemonPods(c, ns, label)
		for _, pod := range pods.Items {
			if pod.Spec.Containers[0].Image == newImage {
				return true, nil
			}
		}
		return false, nil
	}
}

// canScheduleOnNode checks if a given DaemonSet can schedule pods on the given node
func canScheduleOnNode(node v1.Node, ds *apps.DaemonSet) bool {
	newPod := daemon.NewPod(ds, node.Name)
	nodeInfo := schedulernodeinfo.NewNodeInfo()
	nodeInfo.SetNode(&node)
	fit, _, err := daemon.Predicates(newPod, nodeInfo)
	if err != nil {
		framework.Failf("Can't test DaemonSet predicates for node %s: %v", node.Name, err)
		return false
	}
	return fit
}

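// checkRunningOnNoNodes returns a poll function that succeeds once the DaemonSet has no
// available pods running on any node.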
func checkRunningOnNoNodes(f *framework.Framework, ds *apps.DaemonSet) func() (bool, error) {
	return checkDaemonPodOnNodes(f, ds, make([]string, 0))
}

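// checkDaemonStatus fetches the named DaemonSet and returns an error if the desired number of
// scheduled pods matches neither the currently scheduled nor the ready count.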
func checkDaemonStatus(f *framework.Framework, dsName string) error {
	ds, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Get(dsName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("could not get daemon set from v1: %v", err)
	}
	desired, scheduled, ready := ds.Status.DesiredNumberScheduled, ds.Status.CurrentNumberScheduled, ds.Status.NumberReady
	if desired != scheduled && desired != ready {
		return fmt.Errorf("error in daemon status. DesiredScheduled: %d, CurrentScheduled: %d, Ready: %d", desired, scheduled, ready)
	}
	return nil
}

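// checkDaemonPodsImageAndAvailability returns a poll function that succeeds once every
// schedulable node runs an updated daemon pod with the given image; it aborts with an error if
// more than maxUnavailable pods are unavailable at any check.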
func checkDaemonPodsImageAndAvailability(c clientset.Interface, ds *apps.DaemonSet, image string, maxUnavailable int) func() (bool, error) {
	return func() (bool, error) {
		podList, err := c.CoreV1().Pods(ds.Namespace).List(metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		pods := podList.Items

		unavailablePods := 0
		nodesToUpdatedPodCount := make(map[string]int)
		for _, pod := range pods {
			if !metav1.IsControlledBy(&pod, ds) {
				continue
			}
			podImage := pod.Spec.Containers[0].Image
			if podImage != image {
				e2elog.Logf("Wrong image for pod: %s. Expected: %s, got: %s.", pod.Name, image, podImage)
			} else {
				nodesToUpdatedPodCount[pod.Spec.NodeName]++
			}
			if !podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
				e2elog.Logf("Pod %s is not available", pod.Name)
				unavailablePods++
			}
		}
		if unavailablePods > maxUnavailable {
			return false, fmt.Errorf("number of unavailable pods: %d is greater than maxUnavailable: %d", unavailablePods, maxUnavailable)
		}
		// Make sure every daemon pod on the node has been updated
		nodeNames := schedulableNodes(c, ds)
		for _, node := range nodeNames {
			if nodesToUpdatedPodCount[node] == 0 {
				return false, nil
			}
		}
		return true, nil
	}
}

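// checkDaemonSetPodsLabels asserts that every pod in the list carries a non-empty
// controller-revision hash label and, when hash is non-empty, that the label matches it.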
func checkDaemonSetPodsLabels(podList *v1.PodList, hash string) {
	for _, pod := range podList.Items {
		podHash := pod.Labels[apps.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(len(podHash)).To(gomega.BeNumerically(">", 0))
		if len(hash) > 0 {
			gomega.Expect(podHash).To(gomega.Equal(hash))
		}
	}
}

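// waitForHistoryCreated waits until exactly numHistory ControllerRevisions matching the given
// labels exist in the namespace.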
func waitForHistoryCreated(c clientset.Interface, ns string, label map[string]string, numHistory int) {
	listHistoryFn := func() (bool, error) {
		selector := labels.Set(label).AsSelector()
		options := metav1.ListOptions{LabelSelector: selector.String()}
		historyList, err := c.AppsV1().ControllerRevisions(ns).List(options)
		if err != nil {
			return false, err
		}
		if len(historyList.Items) == numHistory {
			return true, nil
		}
		e2elog.Logf("%d/%d controllerrevisions created.", len(historyList.Items), numHistory)
		return false, nil
	}
	err := wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, listHistoryFn)
	framework.ExpectNoError(err, "error waiting for controllerrevisions to be created")
}

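// listDaemonHistories lists the ControllerRevisions matching the given labels and fails the
// test if none are found.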
func listDaemonHistories(c clientset.Interface, ns string, label map[string]string) *apps.ControllerRevisionList {
	selector := labels.Set(label).AsSelector()
	options := metav1.ListOptions{LabelSelector: selector.String()}
	historyList, err := c.AppsV1().ControllerRevisions(ns).List(options)
	framework.ExpectNoError(err)
	gomega.Expect(len(historyList.Items)).To(gomega.BeNumerically(">", 0))
	return historyList
}

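// curHistory returns the ControllerRevision that matches the DaemonSet's current pod template,
// asserting that exactly one such revision exists and that every revision carries the hash label.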
func curHistory(historyList *apps.ControllerRevisionList, ds *apps.DaemonSet) *apps.ControllerRevision {
	var curHistory *apps.ControllerRevision
	foundCurHistories := 0
	for i := range historyList.Items {
		history := &historyList.Items[i]
		// Every history should have the hash label
		gomega.Expect(len(history.Labels[apps.DefaultDaemonSetUniqueLabelKey])).To(gomega.BeNumerically(">", 0))
		match, err := daemon.Match(ds, history)
		framework.ExpectNoError(err)
		if match {
			curHistory = history
			foundCurHistories++
		}
	}
	gomega.Expect(foundCurHistories).To(gomega.Equal(1))
	gomega.Expect(curHistory).NotTo(gomega.BeNil())
	return curHistory
}

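// waitFailedDaemonPodDeleted returns a poll function that succeeds once the given failed pod
// has been completely deleted from the API server.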
func waitFailedDaemonPodDeleted(c clientset.Interface, pod *v1.Pod) func() (bool, error) {
	return func() (bool, error) {
		if _, err := c.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}); err != nil {
			if apierrors.IsNotFound(err) {
				return true, nil
			}
			return false, fmt.Errorf("failed to get failed daemon pod %q: %v", pod.Name, err)
		}
		return false, nil
	}
}