statefulset_utils.go 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package framework
  14. import (
  15. "fmt"
  16. "path/filepath"
  17. "reflect"
  18. "regexp"
  19. "sort"
  20. "strconv"
  21. "strings"
  22. "time"
  23. apps "k8s.io/api/apps/v1"
  24. appsV1beta2 "k8s.io/api/apps/v1beta2"
  25. "k8s.io/api/core/v1"
  26. apierrs "k8s.io/apimachinery/pkg/api/errors"
  27. "k8s.io/apimachinery/pkg/api/resource"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/labels"
  30. "k8s.io/apimachinery/pkg/util/intstr"
  31. "k8s.io/apimachinery/pkg/util/sets"
  32. "k8s.io/apimachinery/pkg/util/wait"
  33. clientset "k8s.io/client-go/kubernetes"
  34. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  35. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  36. "k8s.io/kubernetes/test/e2e/manifest"
  37. imageutils "k8s.io/kubernetes/test/utils/image"
  38. )
  39. const (
  40. // StatefulSetPoll is a poll interval for StatefulSet tests
  41. StatefulSetPoll = 10 * time.Second
  42. // StatefulSetTimeout is a timeout interval for StatefulSet operations
  43. StatefulSetTimeout = 10 * time.Minute
  44. // StatefulPodTimeout is a timeout for stateful pods to change state
  45. StatefulPodTimeout = 5 * time.Minute
  46. )
  47. // CreateStatefulSetService creates a Headless Service with Name name and Selector set to match labels.
  48. func CreateStatefulSetService(name string, labels map[string]string) *v1.Service {
  49. headlessService := &v1.Service{
  50. ObjectMeta: metav1.ObjectMeta{
  51. Name: name,
  52. },
  53. Spec: v1.ServiceSpec{
  54. Selector: labels,
  55. },
  56. }
  57. headlessService.Spec.Ports = []v1.ServicePort{
  58. {Port: 80, Name: "http", Protocol: v1.ProtocolTCP},
  59. }
  60. headlessService.Spec.ClusterIP = "None"
  61. return headlessService
  62. }
  63. // StatefulSetTester is a struct that contains utility methods for testing StatefulSet related functionality. It uses a
  64. // clientset.Interface to communicate with the API server.
  65. type StatefulSetTester struct {
  66. c clientset.Interface
  67. }
  68. // NewStatefulSetTester creates a StatefulSetTester that uses c to interact with the API server.
  69. func NewStatefulSetTester(c clientset.Interface) *StatefulSetTester {
  70. return &StatefulSetTester{c}
  71. }
  72. // GetStatefulSet gets the StatefulSet named name in namespace.
  73. func (s *StatefulSetTester) GetStatefulSet(namespace, name string) *apps.StatefulSet {
  74. ss, err := s.c.AppsV1().StatefulSets(namespace).Get(name, metav1.GetOptions{})
  75. if err != nil {
  76. Failf("Failed to get StatefulSet %s/%s: %v", namespace, name, err)
  77. }
  78. return ss
  79. }
  80. // CreateStatefulSet creates a StatefulSet from the manifest at manifestPath in the Namespace ns using kubectl create.
  81. func (s *StatefulSetTester) CreateStatefulSet(manifestPath, ns string) *apps.StatefulSet {
  82. mkpath := func(file string) string {
  83. return filepath.Join(manifestPath, file)
  84. }
  85. e2elog.Logf("Parsing statefulset from %v", mkpath("statefulset.yaml"))
  86. ss, err := manifest.StatefulSetFromManifest(mkpath("statefulset.yaml"), ns)
  87. ExpectNoError(err)
  88. e2elog.Logf("Parsing service from %v", mkpath("service.yaml"))
  89. svc, err := manifest.SvcFromManifest(mkpath("service.yaml"))
  90. ExpectNoError(err)
  91. e2elog.Logf(fmt.Sprintf("creating " + ss.Name + " service"))
  92. _, err = s.c.CoreV1().Services(ns).Create(svc)
  93. ExpectNoError(err)
  94. e2elog.Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector))
  95. _, err = s.c.AppsV1().StatefulSets(ns).Create(ss)
  96. ExpectNoError(err)
  97. s.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
  98. return ss
  99. }
  100. // CheckMount checks that the mount at mountPath is valid for all Pods in ss.
  101. func (s *StatefulSetTester) CheckMount(ss *apps.StatefulSet, mountPath string) error {
  102. for _, cmd := range []string{
  103. // Print inode, size etc
  104. fmt.Sprintf("ls -idlh %v", mountPath),
  105. // Print subdirs
  106. fmt.Sprintf("find %v", mountPath),
  107. // Try writing
  108. fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))),
  109. } {
  110. if err := s.ExecInStatefulPods(ss, cmd); err != nil {
  111. return fmt.Errorf("failed to execute %v, error: %v", cmd, err)
  112. }
  113. }
  114. return nil
  115. }
  116. // ExecInStatefulPods executes cmd in all Pods in ss. If a error occurs it is returned and cmd is not execute in any subsequent Pods.
  117. func (s *StatefulSetTester) ExecInStatefulPods(ss *apps.StatefulSet, cmd string) error {
  118. podList := s.GetPodList(ss)
  119. for _, statefulPod := range podList.Items {
  120. stdout, err := RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
  121. e2elog.Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout)
  122. if err != nil {
  123. return err
  124. }
  125. }
  126. return nil
  127. }
  128. // CheckHostname verifies that all Pods in ss have the correct Hostname. If the returned error is not nil than verification failed.
  129. func (s *StatefulSetTester) CheckHostname(ss *apps.StatefulSet) error {
  130. cmd := "printf $(hostname)"
  131. podList := s.GetPodList(ss)
  132. for _, statefulPod := range podList.Items {
  133. hostname, err := RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
  134. if err != nil {
  135. return err
  136. }
  137. if hostname != statefulPod.Name {
  138. return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name)
  139. }
  140. }
  141. return nil
  142. }
  143. // Saturate waits for all Pods in ss to become Running and Ready.
  144. func (s *StatefulSetTester) Saturate(ss *apps.StatefulSet) {
  145. var i int32
  146. for i = 0; i < *(ss.Spec.Replicas); i++ {
  147. e2elog.Logf("Waiting for stateful pod at index %v to enter Running", i)
  148. s.WaitForRunning(i+1, i, ss)
  149. e2elog.Logf("Resuming stateful pod at index %v", i)
  150. s.ResumeNextPod(ss)
  151. }
  152. }
  153. // DeleteStatefulPodAtIndex deletes the Pod with ordinal index in ss.
  154. func (s *StatefulSetTester) DeleteStatefulPodAtIndex(index int, ss *apps.StatefulSet) {
  155. name := getStatefulSetPodNameAtIndex(index, ss)
  156. noGrace := int64(0)
  157. if err := s.c.CoreV1().Pods(ss.Namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil {
  158. Failf("Failed to delete stateful pod %v for StatefulSet %v/%v: %v", name, ss.Namespace, ss.Name, err)
  159. }
  160. }
  161. // VerifyStatefulPodFunc is a func that examines a StatefulSetPod.
  162. type VerifyStatefulPodFunc func(*v1.Pod)
  163. // VerifyPodAtIndex applies a visitor patter to the Pod at index in ss. verify is applied to the Pod to "visit" it.
  164. func (s *StatefulSetTester) VerifyPodAtIndex(index int, ss *apps.StatefulSet, verify VerifyStatefulPodFunc) {
  165. name := getStatefulSetPodNameAtIndex(index, ss)
  166. pod, err := s.c.CoreV1().Pods(ss.Namespace).Get(name, metav1.GetOptions{})
  167. ExpectNoError(err, fmt.Sprintf("Failed to get stateful pod %s for StatefulSet %s/%s", name, ss.Namespace, ss.Name))
  168. verify(pod)
  169. }
  170. func getStatefulSetPodNameAtIndex(index int, ss *apps.StatefulSet) string {
  171. // TODO: we won't use "-index" as the name strategy forever,
  172. // pull the name out from an identity mapper.
  173. return fmt.Sprintf("%v-%v", ss.Name, index)
  174. }
  175. // Scale scales ss to count replicas.
  176. func (s *StatefulSetTester) Scale(ss *apps.StatefulSet, count int32) (*apps.StatefulSet, error) {
  177. name := ss.Name
  178. ns := ss.Namespace
  179. e2elog.Logf("Scaling statefulset %s to %d", name, count)
  180. ss = s.update(ns, name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count })
  181. var statefulPodList *v1.PodList
  182. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
  183. statefulPodList = s.GetPodList(ss)
  184. if int32(len(statefulPodList.Items)) == count {
  185. return true, nil
  186. }
  187. return false, nil
  188. })
  189. if pollErr != nil {
  190. unhealthy := []string{}
  191. for _, statefulPod := range statefulPodList.Items {
  192. delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, podutil.IsPodReady(&statefulPod)
  193. if delTs != nil || phase != v1.PodRunning || !readiness {
  194. unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
  195. }
  196. }
  197. return ss, fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, StatefulSetTimeout, unhealthy)
  198. }
  199. return ss, nil
  200. }
  201. // UpdateReplicas updates the replicas of ss to count.
  202. func (s *StatefulSetTester) UpdateReplicas(ss *apps.StatefulSet, count int32) {
  203. s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = count })
  204. }
  205. // Restart scales ss to 0 and then back to its previous number of replicas.
  206. func (s *StatefulSetTester) Restart(ss *apps.StatefulSet) {
  207. oldReplicas := *(ss.Spec.Replicas)
  208. ss, err := s.Scale(ss, 0)
  209. ExpectNoError(err)
  210. // Wait for controller to report the desired number of Pods.
  211. // This way we know the controller has observed all Pod deletions
  212. // before we scale it back up.
  213. s.WaitForStatusReplicas(ss, 0)
  214. s.update(ss.Namespace, ss.Name, func(ss *apps.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas })
  215. }
  216. func (s *StatefulSetTester) update(ns, name string, update func(ss *apps.StatefulSet)) *apps.StatefulSet {
  217. for i := 0; i < 3; i++ {
  218. ss, err := s.c.AppsV1().StatefulSets(ns).Get(name, metav1.GetOptions{})
  219. if err != nil {
  220. Failf("failed to get statefulset %q: %v", name, err)
  221. }
  222. update(ss)
  223. ss, err = s.c.AppsV1().StatefulSets(ns).Update(ss)
  224. if err == nil {
  225. return ss
  226. }
  227. if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
  228. Failf("failed to update statefulset %q: %v", name, err)
  229. }
  230. }
  231. Failf("too many retries draining statefulset %q", name)
  232. return nil
  233. }
  234. // GetPodList gets the current Pods in ss.
  235. func (s *StatefulSetTester) GetPodList(ss *apps.StatefulSet) *v1.PodList {
  236. selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
  237. ExpectNoError(err)
  238. podList, err := s.c.CoreV1().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
  239. ExpectNoError(err)
  240. return podList
  241. }
  242. // ConfirmStatefulPodCount asserts that the current number of Pods in ss is count waiting up to timeout for ss to
  243. // to scale to count.
  244. func (s *StatefulSetTester) ConfirmStatefulPodCount(count int, ss *apps.StatefulSet, timeout time.Duration, hard bool) {
  245. start := time.Now()
  246. deadline := start.Add(timeout)
  247. for t := time.Now(); t.Before(deadline); t = time.Now() {
  248. podList := s.GetPodList(ss)
  249. statefulPodCount := len(podList.Items)
  250. if statefulPodCount != count {
  251. logPodStates(podList.Items)
  252. if hard {
  253. Failf("StatefulSet %v scaled unexpectedly scaled to %d -> %d replicas", ss.Name, count, len(podList.Items))
  254. } else {
  255. e2elog.Logf("StatefulSet %v has not reached scale %d, at %d", ss.Name, count, statefulPodCount)
  256. }
  257. time.Sleep(1 * time.Second)
  258. continue
  259. }
  260. e2elog.Logf("Verifying statefulset %v doesn't scale past %d for another %+v", ss.Name, count, deadline.Sub(t))
  261. time.Sleep(1 * time.Second)
  262. }
  263. }
  264. // WaitForRunning waits for numPodsRunning in ss to be Running and for the first
  265. // numPodsReady ordinals to be Ready.
  266. func (s *StatefulSetTester) WaitForRunning(numPodsRunning, numPodsReady int32, ss *apps.StatefulSet) {
  267. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
  268. func() (bool, error) {
  269. podList := s.GetPodList(ss)
  270. s.SortStatefulPods(podList)
  271. if int32(len(podList.Items)) < numPodsRunning {
  272. e2elog.Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numPodsRunning)
  273. return false, nil
  274. }
  275. if int32(len(podList.Items)) > numPodsRunning {
  276. return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numPodsRunning, len(podList.Items))
  277. }
  278. for _, p := range podList.Items {
  279. shouldBeReady := getStatefulPodOrdinal(&p) < int(numPodsReady)
  280. isReady := podutil.IsPodReady(&p)
  281. desiredReadiness := shouldBeReady == isReady
  282. e2elog.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
  283. if p.Status.Phase != v1.PodRunning || !desiredReadiness {
  284. return false, nil
  285. }
  286. }
  287. return true, nil
  288. })
  289. if pollErr != nil {
  290. Failf("Failed waiting for pods to enter running: %v", pollErr)
  291. }
  292. }
  293. // WaitForState periodically polls for the ss and its pods until the until function returns either true or an error
  294. func (s *StatefulSetTester) WaitForState(ss *apps.StatefulSet, until func(*apps.StatefulSet, *v1.PodList) (bool, error)) {
  295. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
  296. func() (bool, error) {
  297. ssGet, err := s.c.AppsV1().StatefulSets(ss.Namespace).Get(ss.Name, metav1.GetOptions{})
  298. if err != nil {
  299. return false, err
  300. }
  301. podList := s.GetPodList(ssGet)
  302. return until(ssGet, podList)
  303. })
  304. if pollErr != nil {
  305. Failf("Failed waiting for state update: %v", pollErr)
  306. }
  307. }
  308. // WaitForStatus waits for the StatefulSetStatus's ObservedGeneration to be greater than or equal to set's Generation.
  309. // The returned StatefulSet contains such a StatefulSetStatus
  310. func (s *StatefulSetTester) WaitForStatus(set *apps.StatefulSet) *apps.StatefulSet {
  311. s.WaitForState(set, func(set2 *apps.StatefulSet, pods *v1.PodList) (bool, error) {
  312. if set2.Status.ObservedGeneration >= set.Generation {
  313. set = set2
  314. return true, nil
  315. }
  316. return false, nil
  317. })
  318. return set
  319. }
  320. // WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready.
  321. func (s *StatefulSetTester) WaitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) {
  322. s.WaitForRunning(numStatefulPods, numStatefulPods, ss)
  323. }
  324. // WaitForPodReady waits for the Pod named podName in set to exist and have a Ready condition.
  325. func (s *StatefulSetTester) WaitForPodReady(set *apps.StatefulSet, podName string) (*apps.StatefulSet, *v1.PodList) {
  326. var pods *v1.PodList
  327. s.WaitForState(set, func(set2 *apps.StatefulSet, pods2 *v1.PodList) (bool, error) {
  328. set = set2
  329. pods = pods2
  330. for i := range pods.Items {
  331. if pods.Items[i].Name == podName {
  332. return podutil.IsPodReady(&pods.Items[i]), nil
  333. }
  334. }
  335. return false, nil
  336. })
  337. return set, pods
  338. }
  339. // WaitForPodNotReady waist for the Pod named podName in set to exist and to not have a Ready condition.
  340. func (s *StatefulSetTester) WaitForPodNotReady(set *apps.StatefulSet, podName string) (*apps.StatefulSet, *v1.PodList) {
  341. var pods *v1.PodList
  342. s.WaitForState(set, func(set2 *apps.StatefulSet, pods2 *v1.PodList) (bool, error) {
  343. set = set2
  344. pods = pods2
  345. for i := range pods.Items {
  346. if pods.Items[i].Name == podName {
  347. return !podutil.IsPodReady(&pods.Items[i]), nil
  348. }
  349. }
  350. return false, nil
  351. })
  352. return set, pods
  353. }
  354. // WaitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to
  355. // complete. set must have a RollingUpdateStatefulSetStrategyType.
  356. func (s *StatefulSetTester) WaitForRollingUpdate(set *apps.StatefulSet) (*apps.StatefulSet, *v1.PodList) {
  357. var pods *v1.PodList
  358. if set.Spec.UpdateStrategy.Type != apps.RollingUpdateStatefulSetStrategyType {
  359. Failf("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s",
  360. set.Namespace,
  361. set.Name,
  362. set.Spec.UpdateStrategy.Type)
  363. }
  364. s.WaitForState(set, func(set2 *apps.StatefulSet, pods2 *v1.PodList) (bool, error) {
  365. set = set2
  366. pods = pods2
  367. if len(pods.Items) < int(*set.Spec.Replicas) {
  368. return false, nil
  369. }
  370. if set.Status.UpdateRevision != set.Status.CurrentRevision {
  371. e2elog.Logf("Waiting for StatefulSet %s/%s to complete update",
  372. set.Namespace,
  373. set.Name,
  374. )
  375. s.SortStatefulPods(pods)
  376. for i := range pods.Items {
  377. if pods.Items[i].Labels[apps.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
  378. e2elog.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
  379. pods.Items[i].Namespace,
  380. pods.Items[i].Name,
  381. set.Status.UpdateRevision,
  382. pods.Items[i].Labels[apps.StatefulSetRevisionLabel])
  383. }
  384. }
  385. return false, nil
  386. }
  387. return true, nil
  388. })
  389. return set, pods
  390. }
  391. // WaitForPartitionedRollingUpdate waits for all Pods in set to exist and have the correct revision. set must have
  392. // a RollingUpdateStatefulSetStrategyType with a non-nil RollingUpdate and Partition. All Pods with ordinals less
  393. // than or equal to the Partition are expected to be at set's current revision. All other Pods are expected to be
  394. // at its update revision.
  395. func (s *StatefulSetTester) WaitForPartitionedRollingUpdate(set *apps.StatefulSet) (*apps.StatefulSet, *v1.PodList) {
  396. var pods *v1.PodList
  397. if set.Spec.UpdateStrategy.Type != apps.RollingUpdateStatefulSetStrategyType {
  398. Failf("StatefulSet %s/%s attempt to wait for partitioned update with updateStrategy %s",
  399. set.Namespace,
  400. set.Name,
  401. set.Spec.UpdateStrategy.Type)
  402. }
  403. if set.Spec.UpdateStrategy.RollingUpdate == nil || set.Spec.UpdateStrategy.RollingUpdate.Partition == nil {
  404. Failf("StatefulSet %s/%s attempt to wait for partitioned update with nil RollingUpdate or nil Partition",
  405. set.Namespace,
  406. set.Name)
  407. }
  408. s.WaitForState(set, func(set2 *apps.StatefulSet, pods2 *v1.PodList) (bool, error) {
  409. set = set2
  410. pods = pods2
  411. partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
  412. if len(pods.Items) < int(*set.Spec.Replicas) {
  413. return false, nil
  414. }
  415. if partition <= 0 && set.Status.UpdateRevision != set.Status.CurrentRevision {
  416. e2elog.Logf("Waiting for StatefulSet %s/%s to complete update",
  417. set.Namespace,
  418. set.Name,
  419. )
  420. s.SortStatefulPods(pods)
  421. for i := range pods.Items {
  422. if pods.Items[i].Labels[apps.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
  423. e2elog.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
  424. pods.Items[i].Namespace,
  425. pods.Items[i].Name,
  426. set.Status.UpdateRevision,
  427. pods.Items[i].Labels[apps.StatefulSetRevisionLabel])
  428. }
  429. }
  430. return false, nil
  431. }
  432. for i := int(*set.Spec.Replicas) - 1; i >= partition; i-- {
  433. if pods.Items[i].Labels[apps.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
  434. e2elog.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
  435. pods.Items[i].Namespace,
  436. pods.Items[i].Name,
  437. set.Status.UpdateRevision,
  438. pods.Items[i].Labels[apps.StatefulSetRevisionLabel])
  439. return false, nil
  440. }
  441. }
  442. return true, nil
  443. })
  444. return set, pods
  445. }
  446. // WaitForRunningAndNotReady waits for numStatefulPods in ss to be Running and not Ready.
  447. func (s *StatefulSetTester) WaitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) {
  448. s.WaitForRunning(numStatefulPods, 0, ss)
  449. }
  450. var httpProbe = &v1.Probe{
  451. Handler: v1.Handler{
  452. HTTPGet: &v1.HTTPGetAction{
  453. Path: "/index.html",
  454. Port: intstr.IntOrString{IntVal: 80},
  455. },
  456. },
  457. PeriodSeconds: 1,
  458. SuccessThreshold: 1,
  459. FailureThreshold: 1,
  460. }
  461. // SetHTTPProbe sets the pod template's ReadinessProbe for Nginx StatefulSet containers.
  462. // This probe can then be controlled with BreakHTTPProbe() and RestoreHTTPProbe().
  463. // Note that this cannot be used together with PauseNewPods().
  464. func (s *StatefulSetTester) SetHTTPProbe(ss *apps.StatefulSet) {
  465. ss.Spec.Template.Spec.Containers[0].ReadinessProbe = httpProbe
  466. }
  467. // BreakHTTPProbe breaks the readiness probe for Nginx StatefulSet containers in ss.
  468. func (s *StatefulSetTester) BreakHTTPProbe(ss *apps.StatefulSet) error {
  469. path := httpProbe.HTTPGet.Path
  470. if path == "" {
  471. return fmt.Errorf("Path expected to be not empty: %v", path)
  472. }
  473. // Ignore 'mv' errors to make this idempotent.
  474. cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/ || true", path)
  475. return s.ExecInStatefulPods(ss, cmd)
  476. }
  477. // BreakPodHTTPProbe breaks the readiness probe for Nginx StatefulSet containers in one pod.
  478. func (s *StatefulSetTester) BreakPodHTTPProbe(ss *apps.StatefulSet, pod *v1.Pod) error {
  479. path := httpProbe.HTTPGet.Path
  480. if path == "" {
  481. return fmt.Errorf("Path expected to be not empty: %v", path)
  482. }
  483. // Ignore 'mv' errors to make this idempotent.
  484. cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/ || true", path)
  485. stdout, err := RunHostCmdWithRetries(pod.Namespace, pod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
  486. e2elog.Logf("stdout of %v on %v: %v", cmd, pod.Name, stdout)
  487. return err
  488. }
  489. // RestoreHTTPProbe restores the readiness probe for Nginx StatefulSet containers in ss.
  490. func (s *StatefulSetTester) RestoreHTTPProbe(ss *apps.StatefulSet) error {
  491. path := httpProbe.HTTPGet.Path
  492. if path == "" {
  493. return fmt.Errorf("Path expected to be not empty: %v", path)
  494. }
  495. // Ignore 'mv' errors to make this idempotent.
  496. cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/ || true", path)
  497. return s.ExecInStatefulPods(ss, cmd)
  498. }
  499. // RestorePodHTTPProbe restores the readiness probe for Nginx StatefulSet containers in pod.
  500. func (s *StatefulSetTester) RestorePodHTTPProbe(ss *apps.StatefulSet, pod *v1.Pod) error {
  501. path := httpProbe.HTTPGet.Path
  502. if path == "" {
  503. return fmt.Errorf("Path expected to be not empty: %v", path)
  504. }
  505. // Ignore 'mv' errors to make this idempotent.
  506. cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/ || true", path)
  507. stdout, err := RunHostCmdWithRetries(pod.Namespace, pod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
  508. e2elog.Logf("stdout of %v on %v: %v", cmd, pod.Name, stdout)
  509. return err
  510. }
  511. var pauseProbe = &v1.Probe{
  512. Handler: v1.Handler{
  513. Exec: &v1.ExecAction{Command: []string{"test", "-f", "/data/statefulset-continue"}},
  514. },
  515. PeriodSeconds: 1,
  516. SuccessThreshold: 1,
  517. FailureThreshold: 1,
  518. }
  519. func hasPauseProbe(pod *v1.Pod) bool {
  520. probe := pod.Spec.Containers[0].ReadinessProbe
  521. return probe != nil && reflect.DeepEqual(probe.Exec.Command, pauseProbe.Exec.Command)
  522. }
  523. // PauseNewPods adds an always-failing ReadinessProbe to the StatefulSet PodTemplate.
  524. // This causes all newly-created Pods to stay Unready until they are manually resumed
  525. // with ResumeNextPod().
  526. // Note that this cannot be used together with SetHTTPProbe().
  527. func (s *StatefulSetTester) PauseNewPods(ss *apps.StatefulSet) {
  528. ss.Spec.Template.Spec.Containers[0].ReadinessProbe = pauseProbe
  529. }
  530. // ResumeNextPod allows the next Pod in the StatefulSet to continue by removing the ReadinessProbe
  531. // added by PauseNewPods(), if it's still there.
  532. // It fails the test if it finds any pods that are not in phase Running,
  533. // or if it finds more than one paused Pod existing at the same time.
  534. // This is a no-op if there are no paused pods.
  535. func (s *StatefulSetTester) ResumeNextPod(ss *apps.StatefulSet) {
  536. podList := s.GetPodList(ss)
  537. resumedPod := ""
  538. for _, pod := range podList.Items {
  539. if pod.Status.Phase != v1.PodRunning {
  540. Failf("Found pod in phase %q, cannot resume", pod.Status.Phase)
  541. }
  542. if podutil.IsPodReady(&pod) || !hasPauseProbe(&pod) {
  543. continue
  544. }
  545. if resumedPod != "" {
  546. Failf("Found multiple paused stateful pods: %v and %v", pod.Name, resumedPod)
  547. }
  548. _, err := RunHostCmdWithRetries(pod.Namespace, pod.Name, "dd if=/dev/zero of=/data/statefulset-continue bs=1 count=1 conv=fsync", StatefulSetPoll, StatefulPodTimeout)
  549. ExpectNoError(err)
  550. e2elog.Logf("Resumed pod %v", pod.Name)
  551. resumedPod = pod.Name
  552. }
  553. }
  554. // WaitForStatusReadyReplicas waits for the ss.Status.ReadyReplicas to be equal to expectedReplicas
  555. func (s *StatefulSetTester) WaitForStatusReadyReplicas(ss *apps.StatefulSet, expectedReplicas int32) {
  556. e2elog.Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)
  557. ns, name := ss.Namespace, ss.Name
  558. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
  559. func() (bool, error) {
  560. ssGet, err := s.c.AppsV1().StatefulSets(ns).Get(name, metav1.GetOptions{})
  561. if err != nil {
  562. return false, err
  563. }
  564. if ssGet.Status.ObservedGeneration < ss.Generation {
  565. return false, nil
  566. }
  567. if ssGet.Status.ReadyReplicas != expectedReplicas {
  568. e2elog.Logf("Waiting for stateful set status.readyReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.ReadyReplicas)
  569. return false, nil
  570. }
  571. return true, nil
  572. })
  573. if pollErr != nil {
  574. Failf("Failed waiting for stateful set status.readyReplicas updated to %d: %v", expectedReplicas, pollErr)
  575. }
  576. }
  577. // WaitForStatusReplicas waits for the ss.Status.Replicas to be equal to expectedReplicas
  578. func (s *StatefulSetTester) WaitForStatusReplicas(ss *apps.StatefulSet, expectedReplicas int32) {
  579. e2elog.Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)
  580. ns, name := ss.Namespace, ss.Name
  581. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
  582. func() (bool, error) {
  583. ssGet, err := s.c.AppsV1().StatefulSets(ns).Get(name, metav1.GetOptions{})
  584. if err != nil {
  585. return false, err
  586. }
  587. if ssGet.Status.ObservedGeneration < ss.Generation {
  588. return false, nil
  589. }
  590. if ssGet.Status.Replicas != expectedReplicas {
  591. e2elog.Logf("Waiting for stateful set status.replicas to become %d, currently %d", expectedReplicas, ssGet.Status.Replicas)
  592. return false, nil
  593. }
  594. return true, nil
  595. })
  596. if pollErr != nil {
  597. Failf("Failed waiting for stateful set status.replicas updated to %d: %v", expectedReplicas, pollErr)
  598. }
  599. }
  600. // CheckServiceName asserts that the ServiceName for ss is equivalent to expectedServiceName.
  601. func (s *StatefulSetTester) CheckServiceName(ss *apps.StatefulSet, expectedServiceName string) error {
  602. e2elog.Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName)
  603. if expectedServiceName != ss.Spec.ServiceName {
  604. return fmt.Errorf("Wrong service name governing statefulset. Expected %s got %s",
  605. expectedServiceName, ss.Spec.ServiceName)
  606. }
  607. return nil
  608. }
  609. // SortStatefulPods sorts pods by their ordinals
  610. func (s *StatefulSetTester) SortStatefulPods(pods *v1.PodList) {
  611. sort.Sort(statefulPodsByOrdinal(pods.Items))
  612. }
  613. // DeleteAllStatefulSets deletes all StatefulSet API Objects in Namespace ns.
  614. func DeleteAllStatefulSets(c clientset.Interface, ns string) {
  615. sst := &StatefulSetTester{c: c}
  616. ssList, err := c.AppsV1().StatefulSets(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
  617. ExpectNoError(err)
  618. // Scale down each statefulset, then delete it completely.
  619. // Deleting a pvc without doing this will leak volumes, #25101.
  620. errList := []string{}
  621. for i := range ssList.Items {
  622. ss := &ssList.Items[i]
  623. var err error
  624. if ss, err = sst.Scale(ss, 0); err != nil {
  625. errList = append(errList, fmt.Sprintf("%v", err))
  626. }
  627. sst.WaitForStatusReplicas(ss, 0)
  628. e2elog.Logf("Deleting statefulset %v", ss.Name)
  629. // Use OrphanDependents=false so it's deleted synchronously.
  630. // We already made sure the Pods are gone inside Scale().
  631. if err := c.AppsV1().StatefulSets(ss.Namespace).Delete(ss.Name, &metav1.DeleteOptions{OrphanDependents: new(bool)}); err != nil {
  632. errList = append(errList, fmt.Sprintf("%v", err))
  633. }
  634. }
  635. // pvs are global, so we need to wait for the exact ones bound to the statefulset pvcs.
  636. pvNames := sets.NewString()
  637. // TODO: Don't assume all pvcs in the ns belong to a statefulset
  638. pvcPollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
  639. pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
  640. if err != nil {
  641. e2elog.Logf("WARNING: Failed to list pvcs, retrying %v", err)
  642. return false, nil
  643. }
  644. for _, pvc := range pvcList.Items {
  645. pvNames.Insert(pvc.Spec.VolumeName)
  646. // TODO: Double check that there are no pods referencing the pvc
  647. e2elog.Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName)
  648. if err := c.CoreV1().PersistentVolumeClaims(ns).Delete(pvc.Name, nil); err != nil {
  649. return false, nil
  650. }
  651. }
  652. return true, nil
  653. })
  654. if pvcPollErr != nil {
  655. errList = append(errList, fmt.Sprintf("Timeout waiting for pvc deletion."))
  656. }
  657. pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
  658. pvList, err := c.CoreV1().PersistentVolumes().List(metav1.ListOptions{LabelSelector: labels.Everything().String()})
  659. if err != nil {
  660. e2elog.Logf("WARNING: Failed to list pvs, retrying %v", err)
  661. return false, nil
  662. }
  663. waitingFor := []string{}
  664. for _, pv := range pvList.Items {
  665. if pvNames.Has(pv.Name) {
  666. waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status))
  667. }
  668. }
  669. if len(waitingFor) == 0 {
  670. return true, nil
  671. }
  672. e2elog.Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n"))
  673. return false, nil
  674. })
  675. if pollErr != nil {
  676. errList = append(errList, fmt.Sprintf("Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs."))
  677. }
  678. if len(errList) != 0 {
  679. ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n")))
  680. }
  681. }
  682. // NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets.
  683. func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim {
  684. return v1.PersistentVolumeClaim{
  685. ObjectMeta: metav1.ObjectMeta{
  686. Name: name,
  687. },
  688. Spec: v1.PersistentVolumeClaimSpec{
  689. AccessModes: []v1.PersistentVolumeAccessMode{
  690. v1.ReadWriteOnce,
  691. },
  692. Resources: v1.ResourceRequirements{
  693. Requests: v1.ResourceList{
  694. v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
  695. },
  696. },
  697. },
  698. }
  699. }
  700. // NewStatefulSet creates a new NGINX StatefulSet for testing. The StatefulSet is named name, is in namespace ns,
  701. // statefulPodsMounts are the mounts that will be backed by PVs. podsMounts are the mounts that are mounted directly
  702. // to the Pod. labels are the labels that will be usd for the StatefulSet selector.
  703. func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *apps.StatefulSet {
  704. mounts := append(statefulPodMounts, podMounts...)
  705. claims := []v1.PersistentVolumeClaim{}
  706. for _, m := range statefulPodMounts {
  707. claims = append(claims, NewStatefulSetPVC(m.Name))
  708. }
  709. vols := []v1.Volume{}
  710. for _, m := range podMounts {
  711. vols = append(vols, v1.Volume{
  712. Name: m.Name,
  713. VolumeSource: v1.VolumeSource{
  714. HostPath: &v1.HostPathVolumeSource{
  715. Path: fmt.Sprintf("/tmp/%v", m.Name),
  716. },
  717. },
  718. })
  719. }
  720. return &apps.StatefulSet{
  721. TypeMeta: metav1.TypeMeta{
  722. Kind: "StatefulSet",
  723. APIVersion: "apps/v1",
  724. },
  725. ObjectMeta: metav1.ObjectMeta{
  726. Name: name,
  727. Namespace: ns,
  728. },
  729. Spec: apps.StatefulSetSpec{
  730. Selector: &metav1.LabelSelector{
  731. MatchLabels: labels,
  732. },
  733. Replicas: func(i int32) *int32 { return &i }(replicas),
  734. Template: v1.PodTemplateSpec{
  735. ObjectMeta: metav1.ObjectMeta{
  736. Labels: labels,
  737. Annotations: map[string]string{},
  738. },
  739. Spec: v1.PodSpec{
  740. Containers: []v1.Container{
  741. {
  742. Name: "nginx",
  743. Image: imageutils.GetE2EImage(imageutils.Nginx),
  744. VolumeMounts: mounts,
  745. SecurityContext: &v1.SecurityContext{},
  746. },
  747. },
  748. Volumes: vols,
  749. },
  750. },
  751. UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
  752. VolumeClaimTemplates: claims,
  753. ServiceName: governingSvcName,
  754. },
  755. }
  756. }
  757. // NewStatefulSetScale creates a new StatefulSet scale subresource and returns it
  758. func NewStatefulSetScale(ss *apps.StatefulSet) *appsV1beta2.Scale {
  759. return &appsV1beta2.Scale{
  760. // TODO: Create a variant of ObjectMeta type that only contains the fields below.
  761. ObjectMeta: metav1.ObjectMeta{
  762. Name: ss.Name,
  763. Namespace: ss.Namespace,
  764. },
  765. Spec: appsV1beta2.ScaleSpec{
  766. Replicas: *(ss.Spec.Replicas),
  767. },
  768. Status: appsV1beta2.ScaleStatus{
  769. Replicas: ss.Status.Replicas,
  770. },
  771. }
  772. }
  773. var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")
  774. func getStatefulPodOrdinal(pod *v1.Pod) int {
  775. ordinal := -1
  776. subMatches := statefulPodRegex.FindStringSubmatch(pod.Name)
  777. if len(subMatches) < 3 {
  778. return ordinal
  779. }
  780. if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil {
  781. ordinal = int(i)
  782. }
  783. return ordinal
  784. }
  785. type statefulPodsByOrdinal []v1.Pod
  786. func (sp statefulPodsByOrdinal) Len() int {
  787. return len(sp)
  788. }
  789. func (sp statefulPodsByOrdinal) Swap(i, j int) {
  790. sp[i], sp[j] = sp[j], sp[i]
  791. }
  792. func (sp statefulPodsByOrdinal) Less(i, j int) bool {
  793. return getStatefulPodOrdinal(&sp[i]) < getStatefulPodOrdinal(&sp[j])
  794. }
  795. type updateStatefulSetFunc func(*apps.StatefulSet)
  796. // UpdateStatefulSetWithRetries updates statfulset template with retries.
  797. func UpdateStatefulSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateStatefulSetFunc) (statefulSet *apps.StatefulSet, err error) {
  798. statefulSets := c.AppsV1().StatefulSets(namespace)
  799. var updateErr error
  800. pollErr := wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
  801. if statefulSet, err = statefulSets.Get(name, metav1.GetOptions{}); err != nil {
  802. return false, err
  803. }
  804. // Apply the update, then attempt to push it to the apiserver.
  805. applyUpdate(statefulSet)
  806. if statefulSet, err = statefulSets.Update(statefulSet); err == nil {
  807. e2elog.Logf("Updating stateful set %s", name)
  808. return true, nil
  809. }
  810. updateErr = err
  811. return false, nil
  812. })
  813. if pollErr == wait.ErrWaitTimeout {
  814. pollErr = fmt.Errorf("couldn't apply the provided updated to stateful set %q: %v", name, updateErr)
  815. }
  816. return statefulSet, pollErr
  817. }