daemon_set.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apps

import (
	"context"
	"fmt"
	"reflect"
	"strings"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/controller/daemon"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	testutils "k8s.io/kubernetes/test/utils"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
)
const (
	// this should not be a multiple of 5, because node status updates
	// every 5 seconds. See https://github.com/kubernetes/kubernetes/pull/14915.
	dsRetryPeriod  = 1 * time.Second
	dsRetryTimeout = 5 * time.Minute

	daemonsetLabelPrefix = "daemonset-"
	daemonsetNameLabel   = daemonsetLabelPrefix + "name"
	daemonsetColorLabel  = daemonsetLabelPrefix + "color"
)
// NamespaceNodeSelectors lists the annotation keys, such as
// scheduler.alpha.kubernetes.io/node-selector, that assign node selector
// labels to namespaces.
var NamespaceNodeSelectors = []string{"scheduler.alpha.kubernetes.io/node-selector"}
type updateDSFunc func(*appsv1.DaemonSet)

// updateDaemonSetWithRetries updates daemonsets with the given applyUpdate func
// until it succeeds or a timeout expires.
func updateDaemonSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateDSFunc) (ds *appsv1.DaemonSet, err error) {
	daemonsets := c.AppsV1().DaemonSets(namespace)
	var updateErr error
	pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
		if ds, err = daemonsets.Get(context.TODO(), name, metav1.GetOptions{}); err != nil {
			if testutils.IsRetryableAPIError(err) {
				return false, nil
			}
			return false, err
		}
		// Apply the update, then attempt to push it to the apiserver.
		applyUpdate(ds)
		if ds, err = daemonsets.Update(context.TODO(), ds, metav1.UpdateOptions{}); err == nil {
			framework.Logf("Updating DaemonSet %s", name)
			return true, nil
		}
		updateErr = err
		return false, nil
	})
	if pollErr == wait.ErrWaitTimeout {
		pollErr = fmt.Errorf("couldn't apply the provided update to DaemonSet %q: %v", name, updateErr)
	}
	return ds, pollErr
}
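// A minimal usage sketch for updateDaemonSetWithRetries; the namespace, name,
// and image below are illustrative only. The helper re-fetches the object and
// retries on conflicts and retryable API errors, so the callback should only
// mutate the in-memory copy:
//
//	updated, err := updateDaemonSetWithRetries(c, "my-namespace", "daemon-set",
//		func(ds *appsv1.DaemonSet) {
//			ds.Spec.Template.Spec.Containers[0].Image = "registry.example/app:v2"
//		})
//	framework.ExpectNoError(err)
//	_ = updated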
// This test must be run in serial because it assumes the Daemon Set pods will
// always get scheduled. If we run other tests in parallel, this may not
// happen. In the future, running in parallel may work if we have an eviction
// model which lets the DS controller kick out other pods to make room.
// See http://issues.k8s.io/21767 for more details
var _ = SIGDescribe("Daemon set [Serial]", func() {
	var f *framework.Framework

	ginkgo.AfterEach(func() {
		// Clean up
		daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
		framework.ExpectNoError(err, "unable to dump DaemonSets")
		if daemonsets != nil && len(daemonsets.Items) > 0 {
			for _, ds := range daemonsets.Items {
				ginkgo.By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
				framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
				err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, &ds))
				framework.ExpectNoError(err, "error waiting for daemon pod to be reaped")
			}
		}
		if daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{}); err == nil {
			framework.Logf("daemonset: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), daemonsets))
		} else {
			framework.Logf("unable to dump daemonsets: %v", err)
		}
		if pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{}); err == nil {
			framework.Logf("pods: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), pods))
		} else {
			framework.Logf("unable to dump pods: %v", err)
		}
		err = clearDaemonSetNodeLabels(f.ClientSet)
		framework.ExpectNoError(err)
	})

	f = framework.NewDefaultFramework("daemonsets")

	image := WebserverImage
	dsName := "daemon-set"

	var ns string
	var c clientset.Interface

	ginkgo.BeforeEach(func() {
		ns = f.Namespace.Name

		c = f.ClientSet

		updatedNS, err := updateNamespaceAnnotations(c, ns)
		framework.ExpectNoError(err)

		ns = updatedNS.Name

		err = clearDaemonSetNodeLabels(c)
		framework.ExpectNoError(err)
	})
	/*
	  Testname: DaemonSet-Creation
	  Description: A conformant Kubernetes distribution MUST support the creation of DaemonSets. When a DaemonSet
	  Pod is deleted, the DaemonSet controller MUST create a replacement Pod.
	*/
	framework.ConformanceIt("should run and stop simple daemon", func() {
		label := map[string]string{daemonsetNameLabel: dsName}

		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSet(dsName, image, label), metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Stop a daemon pod, check that the daemon pod is revived.")
		podList := listDaemonPods(c, ns, label)
		pod := podList.Items[0]
		err = c.CoreV1().Pods(ns).Delete(context.TODO(), pod.Name, nil)
		framework.ExpectNoError(err)
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to revive")
	})
	/*
	  Testname: DaemonSet-NodeSelection
	  Description: A conformant Kubernetes distribution MUST support DaemonSet Pod node selection via label
	  selectors.
	*/
	framework.ConformanceIt("should run and stop complex daemon", func() {
		complexLabel := map[string]string{daemonsetNameLabel: dsName}
		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
		framework.Logf("Creating daemon %q with a node selector", dsName)
		ds := newDaemonSet(dsName, image, complexLabel)
		ds.Spec.Template.Spec.NodeSelector = nodeSelector
		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")

		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
		node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet)
		framework.ExpectNoError(err)
		newNode, err := setDaemonSetNodeLabels(c, node.Name, nodeSelector)
		framework.ExpectNoError(err, "error setting labels on node")
		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
		framework.ExpectEqual(len(daemonSetLabels), 1)
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{newNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Update the node label to green, and wait for daemons to be unscheduled")
		nodeSelector[daemonsetColorLabel] = "green"
		greenNode, err := setDaemonSetNodeLabels(c, node.Name, nodeSelector)
		framework.ExpectNoError(err, "error removing labels on node")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")

		ginkgo.By("Update DaemonSet node selector to green, and change its update strategy to RollingUpdate")
		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"nodeSelector":{"%s":"%s"}}},"updateStrategy":{"type":"RollingUpdate"}}}`,
			daemonsetColorLabel, greenNode.Labels[daemonsetColorLabel])
		ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err, "error patching daemon set")
		daemonSetLabels, _ = separateDaemonSetNodeLabels(greenNode.Labels)
		framework.ExpectEqual(len(daemonSetLabels), 1)
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{greenNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)
	})
	// We defer adding this test to conformance pending the disposition of moving DaemonSet scheduling logic to the
	// default scheduler.
	ginkgo.It("should run and stop complex daemon with node affinity", func() {
		complexLabel := map[string]string{daemonsetNameLabel: dsName}
		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
		framework.Logf("Creating daemon %q with a node affinity", dsName)
		ds := newDaemonSet(dsName, image, complexLabel)
		ds.Spec.Template.Spec.Affinity = &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      daemonsetColorLabel,
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{nodeSelector[daemonsetColorLabel]},
								},
							},
						},
					},
				},
			},
		}
		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")

		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
		node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet)
		framework.ExpectNoError(err)
		newNode, err := setDaemonSetNodeLabels(c, node.Name, nodeSelector)
		framework.ExpectNoError(err, "error setting labels on node")
		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
		framework.ExpectEqual(len(daemonSetLabels), 1)
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{newNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Remove the node label and wait for daemons to be unscheduled")
		_, err = setDaemonSetNodeLabels(c, node.Name, map[string]string{})
		framework.ExpectNoError(err, "error removing labels on node")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
	})
	/*
	  Testname: DaemonSet-FailedPodCreation
	  Description: A conformant Kubernetes distribution MUST create new DaemonSet Pods when they fail.
	*/
	framework.ConformanceIt("should retry creating failed daemon pods", func() {
		label := map[string]string{daemonsetNameLabel: dsName}

		ginkgo.By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName))
		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSet(dsName, image, label), metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.")
		podList := listDaemonPods(c, ns, label)
		pod := podList.Items[0]
		pod.ResourceVersion = ""
		pod.Status.Phase = v1.PodFailed
		_, err = c.CoreV1().Pods(ns).UpdateStatus(context.TODO(), &pod, metav1.UpdateOptions{})
		framework.ExpectNoError(err, "error failing a daemon pod")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to revive")

		ginkgo.By("Wait for the failed daemon pod to be completely deleted.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, waitFailedDaemonPodDeleted(c, &pod))
		framework.ExpectNoError(err, "error waiting for the failed daemon pod to be completely deleted")
	})
	// This test should not be added to conformance. We will consider deprecating OnDelete when the
	// extensions/v1beta1 and apps/v1beta1 are removed.
	ginkgo.It("should not update pod when spec was updated and update strategy is OnDelete", func() {
		label := map[string]string{daemonsetNameLabel: dsName}

		framework.Logf("Creating simple daemon set %s", dsName)
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType}
		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(c, ns, label, 1)
		first := curHistory(listDaemonHistories(c, ns, label), ds)
		firstHash := first.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		framework.ExpectEqual(first.Revision, int64(1))
		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash)

		ginkgo.By("Update daemon pods image.")
		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods images aren't updated.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, ds, image, 0))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(c, ns, label, 2)
		cur := curHistory(listDaemonHistories(c, ns, label), ds)
		framework.ExpectEqual(cur.Revision, int64(2))
		framework.ExpectNotEqual(cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey], firstHash)
		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash)
	})
	/*
	  Testname: DaemonSet-RollingUpdate
	  Description: A conformant Kubernetes distribution MUST support DaemonSet RollingUpdates.
	*/
	framework.ConformanceIt("should update pod when spec was updated and update strategy is RollingUpdate", func() {
		label := map[string]string{daemonsetNameLabel: dsName}

		framework.Logf("Creating simple daemon set %s", dsName)
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(c, ns, label, 1)
		cur := curHistory(listDaemonHistories(c, ns, label), ds)
		hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		framework.ExpectEqual(cur.Revision, int64(1))
		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)

		ginkgo.By("Update daemon pods image.")
		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err)

		// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
		// Get the number of nodes, and set the timeout appropriately.
		nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
		framework.ExpectNoError(err)
		nodeCount := len(nodes.Items)
		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second

		ginkgo.By("Check that daemon pods images are updated.")
		err = wait.PollImmediate(dsRetryPeriod, retryTimeout, checkDaemonPodsImageAndAvailability(c, ds, AgnhostImage, 1))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(c, ns, label, 2)
		cur = curHistory(listDaemonHistories(c, ns, label), ds)
		hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		framework.ExpectEqual(cur.Revision, int64(2))
		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)
	})
	/*
	  Testname: DaemonSet-Rollback
	  Description: A conformant Kubernetes distribution MUST support automated, minimally disruptive
	  rollback of updates to a DaemonSet.
	*/
	framework.ConformanceIt("should rollback without unnecessary restarts", func() {
		schedulableNodes, err := e2enode.GetReadySchedulableNodes(c)
		framework.ExpectNoError(err)
		gomega.Expect(len(schedulableNodes.Items)).To(gomega.BeNumerically(">", 1), "Conformance test suite needs a cluster with at least 2 nodes.")
		framework.Logf("Create a RollingUpdate DaemonSet")
		label := map[string]string{daemonsetNameLabel: dsName}
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
		ds, err = c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		framework.Logf("Check that daemon pods launch on every node of the cluster")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		framework.Logf("Update the DaemonSet to trigger a rollout")
		// We use a nonexistent image here, so that we make sure it won't finish
		newImage := "foo:non-existent"
		newDS, err := updateDaemonSetWithRetries(c, ns, ds.Name, func(update *appsv1.DaemonSet) {
			update.Spec.Template.Spec.Containers[0].Image = newImage
		})
		framework.ExpectNoError(err)

		// Make sure we're in the middle of a rollout
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkAtLeastOneNewPod(c, ns, label, newImage))
		framework.ExpectNoError(err)

		pods := listDaemonPods(c, ns, label)
		var existingPods, newPods []*v1.Pod
		for i := range pods.Items {
			pod := pods.Items[i]
			image := pod.Spec.Containers[0].Image
			switch image {
			case ds.Spec.Template.Spec.Containers[0].Image:
				existingPods = append(existingPods, &pod)
			case newDS.Spec.Template.Spec.Containers[0].Image:
				newPods = append(newPods, &pod)
			default:
				framework.Failf("unexpected pod found, image = %s", image)
			}
		}
		schedulableNodes, err = e2enode.GetReadySchedulableNodes(c)
		framework.ExpectNoError(err)
		if len(schedulableNodes.Items) < 2 {
			framework.ExpectEqual(len(existingPods), 0)
		} else {
			framework.ExpectNotEqual(len(existingPods), 0)
		}
		framework.ExpectNotEqual(len(newPods), 0)

		framework.Logf("Roll back the DaemonSet before rollout is complete")
		rollbackDS, err := updateDaemonSetWithRetries(c, ns, ds.Name, func(update *appsv1.DaemonSet) {
			update.Spec.Template.Spec.Containers[0].Image = image
		})
		framework.ExpectNoError(err)

		framework.Logf("Make sure DaemonSet rollback is complete")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, rollbackDS, image, 1))
		framework.ExpectNoError(err)

		// After rollback is done, compare current pods with previous old pods during rollout, to make sure they're not restarted
		pods = listDaemonPods(c, ns, label)
		rollbackPods := map[string]bool{}
		for _, pod := range pods.Items {
			rollbackPods[pod.Name] = true
		}
		for _, pod := range existingPods {
			framework.ExpectEqual(rollbackPods[pod.Name], true, fmt.Sprintf("pod %s was unexpectedly restarted during rollback", pod.Name))
		}
	})
})
// getDaemonSetImagePatch generates a patch for updating a DaemonSet's container image
func getDaemonSetImagePatch(containerName, containerImage string) string {
	return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)
}
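// For example, getDaemonSetImagePatch("app", "registry.example/app:v2") (arguments
// are illustrative) produces the strategic merge patch:
//
//	{"spec":{"template":{"spec":{"containers":[{"name":"app","image":"registry.example/app:v2"}]}}}}

// newDaemonSet returns an in-memory DaemonSet spec with the given name whose pods
// carry the given labels and run a single container named "app" serving the given
// image on port 9376.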
func newDaemonSet(dsName, image string, label map[string]string) *appsv1.DaemonSet {
	return &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: dsName,
		},
		Spec: appsv1.DaemonSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: label,
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: label,
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "app",
							Image: image,
							Ports: []v1.ContainerPort{{ContainerPort: 9376}},
						},
					},
				},
			},
		},
	}
}
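// A usage sketch for newDaemonSet (label values are illustrative); the returned
// object is only an in-memory spec and still has to be submitted to the API server:
//
//	ds := newDaemonSet("daemon-set", WebserverImage, map[string]string{daemonsetNameLabel: "daemon-set"})
//	ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
//	ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})

// listDaemonPods lists the pods in ns matching the given labels and fails the
// test if the list call errors or returns no pods.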
func listDaemonPods(c clientset.Interface, ns string, label map[string]string) *v1.PodList {
	selector := labels.Set(label).AsSelector()
	options := metav1.ListOptions{LabelSelector: selector.String()}
	podList, err := c.CoreV1().Pods(ns).List(context.TODO(), options)
	framework.ExpectNoError(err)
	gomega.Expect(len(podList.Items)).To(gomega.BeNumerically(">", 0))
	return podList
}
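// separateDaemonSetNodeLabels splits a node's labels into those created by this
// test (keys prefixed with "daemonset-") and all other labels.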
func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
	daemonSetLabels := map[string]string{}
	otherLabels := map[string]string{}
	for k, v := range labels {
		if strings.HasPrefix(k, daemonsetLabelPrefix) {
			daemonSetLabels[k] = v
		} else {
			otherLabels[k] = v
		}
	}
	return daemonSetLabels, otherLabels
}
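// clearDaemonSetNodeLabels removes the test-owned "daemonset-" labels from every
// ready, schedulable node in the cluster.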
func clearDaemonSetNodeLabels(c clientset.Interface) error {
	nodeList, err := e2enode.GetReadySchedulableNodes(c)
	if err != nil {
		return err
	}
	for _, node := range nodeList.Items {
		_, err := setDaemonSetNodeLabels(c, node.Name, map[string]string{})
		if err != nil {
			return err
		}
	}
	return nil
}
// updateNamespaceAnnotations sets the node-selector-related annotations on the
// test namespace to the empty string, so the namespace itself does not restrict
// where DaemonSet pods can be scheduled.
func updateNamespaceAnnotations(c clientset.Interface, nsName string) (*v1.Namespace, error) {
	nsClient := c.CoreV1().Namespaces()

	ns, err := nsClient.Get(context.TODO(), nsName, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	if ns.Annotations == nil {
		ns.Annotations = make(map[string]string)
	}

	for _, n := range NamespaceNodeSelectors {
		ns.Annotations[n] = ""
	}

	return nsClient.Update(context.TODO(), ns, metav1.UpdateOptions{})
}
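// For reference, after updateNamespaceAnnotations runs, the test namespace looks
// roughly like the sketch below (namespace name is illustrative); the empty value
// means no namespace-level node selector is applied:
//
//	apiVersion: v1
//	kind: Namespace
//	metadata:
//	  name: daemonsets-1234
//	  annotations:
//	    scheduler.alpha.kubernetes.io/node-selector: ""

// setDaemonSetNodeLabels replaces the test-owned "daemonset-" labels on the named
// node with exactly the given set, retrying on resource-version conflicts, and
// returns the updated node.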
func setDaemonSetNodeLabels(c clientset.Interface, nodeName string, labels map[string]string) (*v1.Node, error) {
	nodeClient := c.CoreV1().Nodes()
	var newNode *v1.Node
	var newLabels map[string]string
	err := wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, func() (bool, error) {
		node, err := nodeClient.Get(context.TODO(), nodeName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}

		// remove all labels this test is creating
		daemonSetLabels, otherLabels := separateDaemonSetNodeLabels(node.Labels)
		if reflect.DeepEqual(daemonSetLabels, labels) {
			newNode = node
			return true, nil
		}
		node.Labels = otherLabels
		for k, v := range labels {
			node.Labels[k] = v
		}
		newNode, err = nodeClient.Update(context.TODO(), node, metav1.UpdateOptions{})
		if err == nil {
			newLabels, _ = separateDaemonSetNodeLabels(newNode.Labels)
			return true, err
		}
		if se, ok := err.(*apierrors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
			framework.Logf("failed to update node due to resource version conflict")
			return false, nil
		}
		return false, err
	})
	if err != nil {
		return nil, err
	} else if len(newLabels) != len(labels) {
		return nil, fmt.Errorf("Could not set daemon set test labels as expected")
	}

	return newNode, nil
}
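// checkDaemonPodOnNodes returns a poll function that reports true once exactly
// one available daemon pod owned by ds is running on each of the given nodes and
// no available daemon pod is running on any other node.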
func checkDaemonPodOnNodes(f *framework.Framework, ds *appsv1.DaemonSet, nodeNames []string) func() (bool, error) {
	return func() (bool, error) {
		podList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			framework.Logf("could not get the pod list: %v", err)
			return false, nil
		}
		pods := podList.Items

		nodesToPodCount := make(map[string]int)
		for _, pod := range pods {
			if !metav1.IsControlledBy(&pod, ds) {
				continue
			}
			if pod.DeletionTimestamp != nil {
				continue
			}
			if podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
				nodesToPodCount[pod.Spec.NodeName]++
			}
		}
		framework.Logf("Number of nodes with available pods: %d", len(nodesToPodCount))

		// Ensure that exactly 1 pod is running on all nodes in nodeNames.
		for _, nodeName := range nodeNames {
			if nodesToPodCount[nodeName] != 1 {
				framework.Logf("Node %s is running %d daemon pods, expected 1", nodeName, nodesToPodCount[nodeName])
				return false, nil
			}
		}
		framework.Logf("Number of nodes expected to run daemon pods: %d, number of nodes running daemon pods: %d", len(nodeNames), len(nodesToPodCount))

		// Ensure that sizes of the lists are the same. We've verified that every element of nodeNames is in
		// nodesToPodCount, so verifying the lengths are equal ensures that there aren't pods running on any
		// other nodes.
		return len(nodesToPodCount) == len(nodeNames), nil
	}
}
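// checkRunningOnAllNodes returns a poll function that reports true once ds has
// exactly one available pod on every node that can schedule its pods.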
func checkRunningOnAllNodes(f *framework.Framework, ds *appsv1.DaemonSet) func() (bool, error) {
	return func() (bool, error) {
		nodeNames := schedulableNodes(f.ClientSet, ds)
		return checkDaemonPodOnNodes(f, ds, nodeNames)()
	}
}
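// schedulableNodes returns the names of all nodes on which ds can place pods,
// skipping nodes whose taints the DaemonSet does not tolerate.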
func schedulableNodes(c clientset.Interface, ds *appsv1.DaemonSet) []string {
	nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	framework.ExpectNoError(err)
	nodeNames := make([]string, 0)
	for _, node := range nodeList.Items {
		if !canScheduleOnNode(node, ds) {
			framework.Logf("DaemonSet pods can't tolerate node %s with taints %+v, skip checking this node", node.Name, node.Spec.Taints)
			continue
		}
		nodeNames = append(nodeNames, node.Name)
	}
	return nodeNames
}
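// checkAtLeastOneNewPod returns a poll function that reports true once at least
// one daemon pod matching label runs the new image, i.e. a rollout has started.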
func checkAtLeastOneNewPod(c clientset.Interface, ns string, label map[string]string, newImage string) func() (bool, error) {
	return func() (bool, error) {
		pods := listDaemonPods(c, ns, label)
		for _, pod := range pods.Items {
			if pod.Spec.Containers[0].Image == newImage {
				return true, nil
			}
		}
		return false, nil
	}
}

// canScheduleOnNode checks if a given DaemonSet can schedule pods on the given node
func canScheduleOnNode(node v1.Node, ds *appsv1.DaemonSet) bool {
	newPod := daemon.NewPod(ds, node.Name)
	nodeInfo := schedulernodeinfo.NewNodeInfo()
	nodeInfo.SetNode(&node)
	taints, err := nodeInfo.Taints()
	if err != nil {
		framework.Failf("Can't test DaemonSet predicates for node %s: %v", node.Name, err)
		return false
	}
	fitsNodeName, fitsNodeAffinity, fitsTaints := daemon.Predicates(newPod, &node, taints)
	return fitsNodeName && fitsNodeAffinity && fitsTaints
}
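// checkRunningOnNoNodes returns a poll function that reports true once no
// available daemon pod of ds is running on any node.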
func checkRunningOnNoNodes(f *framework.Framework, ds *appsv1.DaemonSet) func() (bool, error) {
	return checkDaemonPodOnNodes(f, ds, make([]string, 0))
}
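// checkDaemonStatus verifies that the DaemonSet's status reports as many
// scheduled and ready pods as desired.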
func checkDaemonStatus(f *framework.Framework, dsName string) error {
	ds, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Get(context.TODO(), dsName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("could not get DaemonSet %q: %v", dsName, err)
	}
	desired, scheduled, ready := ds.Status.DesiredNumberScheduled, ds.Status.CurrentNumberScheduled, ds.Status.NumberReady
	if desired != scheduled || desired != ready {
		return fmt.Errorf("error in daemon status. DesiredScheduled: %d, CurrentScheduled: %d, Ready: %d", desired, scheduled, ready)
	}
	return nil
}
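// checkDaemonPodsImageAndAvailability returns a poll function that reports true
// once every schedulable node runs an updated daemon pod with the given image,
// and errors out if more than maxUnavailable daemon pods are unavailable at any
// point during polling.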
func checkDaemonPodsImageAndAvailability(c clientset.Interface, ds *appsv1.DaemonSet, image string, maxUnavailable int) func() (bool, error) {
	return func() (bool, error) {
		podList, err := c.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		pods := podList.Items

		unavailablePods := 0
		nodesToUpdatedPodCount := make(map[string]int)
		for _, pod := range pods {
			if !metav1.IsControlledBy(&pod, ds) {
				continue
			}
			podImage := pod.Spec.Containers[0].Image
			if podImage != image {
				framework.Logf("Wrong image for pod: %s. Expected: %s, got: %s.", pod.Name, image, podImage)
			} else {
				nodesToUpdatedPodCount[pod.Spec.NodeName]++
			}
			if !podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
				framework.Logf("Pod %s is not available", pod.Name)
				unavailablePods++
			}
		}
		if unavailablePods > maxUnavailable {
			return false, fmt.Errorf("number of unavailable pods: %d is greater than maxUnavailable: %d", unavailablePods, maxUnavailable)
		}
		// Make sure every daemon pod on the node has been updated
		nodeNames := schedulableNodes(c, ds)
		for _, node := range nodeNames {
			if nodesToUpdatedPodCount[node] == 0 {
				return false, nil
			}
		}
		return true, nil
	}
}
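// checkDaemonSetPodsLabels asserts that every pod in podList carries a non-empty
// revision hash label (appsv1.DefaultDaemonSetUniqueLabelKey) and, if hash is
// non-empty, that the label matches hash.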
func checkDaemonSetPodsLabels(podList *v1.PodList, hash string) {
	for _, pod := range podList.Items {
		podHash := pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(len(podHash)).To(gomega.BeNumerically(">", 0))
		if len(hash) > 0 {
			framework.ExpectEqual(podHash, hash)
		}
	}
}
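// waitForHistoryCreated waits until exactly numHistory ControllerRevisions
// matching label exist in ns, failing the test on timeout.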
func waitForHistoryCreated(c clientset.Interface, ns string, label map[string]string, numHistory int) {
	listHistoryFn := func() (bool, error) {
		selector := labels.Set(label).AsSelector()
		options := metav1.ListOptions{LabelSelector: selector.String()}
		historyList, err := c.AppsV1().ControllerRevisions(ns).List(context.TODO(), options)
		if err != nil {
			return false, err
		}
		if len(historyList.Items) == numHistory {
			return true, nil
		}
		framework.Logf("%d/%d controllerrevisions created.", len(historyList.Items), numHistory)
		return false, nil
	}
	err := wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, listHistoryFn)
	framework.ExpectNoError(err, "error waiting for controllerrevisions to be created")
}
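// listDaemonHistories lists the ControllerRevisions in ns matching label and
// fails the test if the list call errors or returns no revisions.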
func listDaemonHistories(c clientset.Interface, ns string, label map[string]string) *appsv1.ControllerRevisionList {
	selector := labels.Set(label).AsSelector()
	options := metav1.ListOptions{LabelSelector: selector.String()}
	historyList, err := c.AppsV1().ControllerRevisions(ns).List(context.TODO(), options)
	framework.ExpectNoError(err)
	gomega.Expect(len(historyList.Items)).To(gomega.BeNumerically(">", 0))
	return historyList
}
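// curHistory returns the ControllerRevision in historyList that matches the
// current pod template of ds, failing the test unless exactly one such revision
// exists.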
func curHistory(historyList *appsv1.ControllerRevisionList, ds *appsv1.DaemonSet) *appsv1.ControllerRevision {
	var curHistory *appsv1.ControllerRevision
	foundCurHistories := 0
	for i := range historyList.Items {
		history := &historyList.Items[i]
		// Every history should have the hash label
		gomega.Expect(len(history.Labels[appsv1.DefaultDaemonSetUniqueLabelKey])).To(gomega.BeNumerically(">", 0))
		match, err := daemon.Match(ds, history)
		framework.ExpectNoError(err)
		if match {
			curHistory = history
			foundCurHistories++
		}
	}
	framework.ExpectEqual(foundCurHistories, 1)
	gomega.Expect(curHistory).NotTo(gomega.BeNil())
	return curHistory
}
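// waitFailedDaemonPodDeleted returns a poll function that reports true once the
// given failed daemon pod can no longer be found.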
func waitFailedDaemonPodDeleted(c clientset.Interface, pod *v1.Pod) func() (bool, error) {
	return func() (bool, error) {
		if _, err := c.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
			if apierrors.IsNotFound(err) {
				return true, nil
			}
			return false, fmt.Errorf("failed to get failed daemon pod %q: %v", pod.Name, err)
		}
		return false, nil
	}
}