statefulset_test.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "context"
  16. "fmt"
  17. "testing"
  18. appsv1 "k8s.io/api/apps/v1"
  19. v1 "k8s.io/api/core/v1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  22. "k8s.io/apimachinery/pkg/util/json"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. "k8s.io/client-go/dynamic"
  25. apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
  26. "k8s.io/kubernetes/test/integration/framework"
  27. )
  28. // TestVolumeTemplateNoopUpdate ensures embedded StatefulSet objects with embedded PersistentVolumes can be updated
  29. func TestVolumeTemplateNoopUpdate(t *testing.T) {
  30. // Start the server with default storage setup
  31. server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd())
  32. if err != nil {
  33. t.Fatal(err)
  34. }
  35. defer server.TearDownFn()
  36. c, err := dynamic.NewForConfig(server.ClientConfig)
  37. if err != nil {
  38. t.Fatal(err)
  39. }
  40. // Use an unstructured client to ensure we send exactly the bytes we expect for the embedded PVC template
  41. sts := &unstructured.Unstructured{}
  42. err = json.Unmarshal([]byte(`{
  43. "apiVersion": "apps/v1",
  44. "kind": "StatefulSet",
  45. "metadata": {"name": "web"},
  46. "spec": {
  47. "selector": {"matchLabels": {"app": "nginx"}},
  48. "serviceName": "nginx",
  49. "replicas": 3,
  50. "template": {
  51. "metadata": {"labels": {"app": "nginx"}},
  52. "spec": {
  53. "terminationGracePeriodSeconds": 10,
  54. "containers": [{
  55. "name": "nginx",
  56. "image": "k8s.gcr.io/nginx-slim:0.8",
  57. "ports": [{"containerPort": 80,"name": "web"}],
  58. "volumeMounts": [{"name": "www","mountPath": "/usr/share/nginx/html"}]
  59. }]
  60. }
  61. },
  62. "volumeClaimTemplates": [{
  63. "apiVersion": "v1",
  64. "kind": "PersistentVolumeClaim",
  65. "metadata": {"name": "www"},
  66. "spec": {
  67. "accessModes": ["ReadWriteOnce"],
  68. "storageClassName": "my-storage-class",
  69. "resources": {"requests": {"storage": "1Gi"}}
  70. }
  71. }
  72. ]
  73. }
  74. }`), &sts.Object)
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. stsClient := c.Resource(appsv1.SchemeGroupVersion.WithResource("statefulsets")).Namespace("default")
  79. // Create the statefulset
  80. persistedSTS, err := stsClient.Create(sts, metav1.CreateOptions{})
  81. if err != nil {
  82. t.Fatal(err)
  83. }
  84. // Update with the original spec (all the same defaulting should apply, should be a no-op and pass validation
  85. originalSpec, ok, err := unstructured.NestedFieldCopy(sts.Object, "spec")
  86. if err != nil || !ok {
  87. t.Fatal(err, ok)
  88. }
  89. err = unstructured.SetNestedField(persistedSTS.Object, originalSpec, "spec")
  90. if err != nil {
  91. t.Fatal(err)
  92. }
  93. _, err = stsClient.Update(persistedSTS, metav1.UpdateOptions{})
  94. if err != nil {
  95. t.Fatal(err)
  96. }
  97. }
  98. func TestSpecReplicasChange(t *testing.T) {
  99. s, closeFn, rm, informers, c := scSetup(t)
  100. defer closeFn()
  101. ns := framework.CreateTestingNamespace("test-spec-replicas-change", s, t)
  102. defer framework.DeleteTestingNamespace(ns, s, t)
  103. stopCh := runControllerAndInformers(rm, informers)
  104. defer close(stopCh)
  105. createHeadlessService(t, c, newHeadlessService(ns.Name))
  106. sts := newSTS("sts", ns.Name, 2)
  107. stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
  108. sts = stss[0]
  109. waitSTSStable(t, c, sts)
  110. // Update .Spec.Replicas and verify .Status.Replicas is changed accordingly
  111. scaleSTS(t, c, sts, 3)
  112. scaleSTS(t, c, sts, 0)
  113. scaleSTS(t, c, sts, 2)
  114. // Add a template annotation change to test STS's status does update
  115. // without .Spec.Replicas change
  116. stsClient := c.AppsV1().StatefulSets(ns.Name)
  117. var oldGeneration int64
  118. newSTS := updateSTS(t, stsClient, sts.Name, func(sts *appsv1.StatefulSet) {
  119. oldGeneration = sts.Generation
  120. sts.Spec.Template.Annotations = map[string]string{"test": "annotation"}
  121. })
  122. savedGeneration := newSTS.Generation
  123. if savedGeneration == oldGeneration {
  124. t.Fatalf("failed to verify .Generation has incremented for sts %s", sts.Name)
  125. }
  126. if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  127. newSTS, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
  128. if err != nil {
  129. return false, err
  130. }
  131. return newSTS.Status.ObservedGeneration >= savedGeneration, nil
  132. }); err != nil {
  133. t.Fatalf("failed to verify .Status.ObservedGeneration has incremented for sts %s: %v", sts.Name, err)
  134. }
  135. }
  136. func TestDeletingAndFailedPods(t *testing.T) {
  137. s, closeFn, rm, informers, c := scSetup(t)
  138. defer closeFn()
  139. ns := framework.CreateTestingNamespace("test-deleting-and-failed-pods", s, t)
  140. defer framework.DeleteTestingNamespace(ns, s, t)
  141. stopCh := runControllerAndInformers(rm, informers)
  142. defer close(stopCh)
  143. labelMap := labelMap()
  144. sts := newSTS("sts", ns.Name, 2)
  145. stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
  146. sts = stss[0]
  147. waitSTSStable(t, c, sts)
  148. // Verify STS creates 2 pods
  149. podClient := c.CoreV1().Pods(ns.Name)
  150. pods := getPods(t, podClient, labelMap)
  151. if len(pods.Items) != 2 {
  152. t.Fatalf("len(pods) = %d, want 2", len(pods.Items))
  153. }
  154. // Set first pod as deleting pod
  155. // Set finalizers for the pod to simulate pending deletion status
  156. deletingPod := &pods.Items[0]
  157. updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) {
  158. pod.Finalizers = []string{"fake.example.com/blockDeletion"}
  159. })
  160. if err := c.CoreV1().Pods(ns.Name).Delete(context.TODO(), deletingPod.Name, &metav1.DeleteOptions{}); err != nil {
  161. t.Fatalf("error deleting pod %s: %v", deletingPod.Name, err)
  162. }
  163. // Set second pod as failed pod
  164. failedPod := &pods.Items[1]
  165. updatePodStatus(t, podClient, failedPod.Name, func(pod *v1.Pod) {
  166. pod.Status.Phase = v1.PodFailed
  167. })
  168. if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  169. // Verify only 2 pods exist: deleting pod and new pod replacing failed pod
  170. pods = getPods(t, podClient, labelMap)
  171. if len(pods.Items) != 2 {
  172. return false, nil
  173. }
  174. // Verify deleting pod still exists
  175. // Immediately return false with an error if it does not exist
  176. if pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID {
  177. return false, fmt.Errorf("expected deleting pod %s still exists, but it is not found", deletingPod.Name)
  178. }
  179. // Verify failed pod does not exist anymore
  180. if pods.Items[0].UID == failedPod.UID || pods.Items[1].UID == failedPod.UID {
  181. return false, nil
  182. }
  183. // Verify both pods have non-failed status
  184. return pods.Items[0].Status.Phase != v1.PodFailed && pods.Items[1].Status.Phase != v1.PodFailed, nil
  185. }); err != nil {
  186. t.Fatalf("failed to verify failed pod %s has been replaced with a new non-failed pod, and deleting pod %s survives: %v", failedPod.Name, deletingPod.Name, err)
  187. }
  188. // Remove finalizers of deleting pod to simulate successful deletion
  189. updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) {
  190. pod.Finalizers = []string{}
  191. })
  192. if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  193. // Verify only 2 pods exist: new non-deleting pod replacing deleting pod and the non-failed pod
  194. pods = getPods(t, podClient, labelMap)
  195. if len(pods.Items) != 2 {
  196. return false, nil
  197. }
  198. // Verify deleting pod does not exist anymore
  199. return pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID, nil
  200. }); err != nil {
  201. t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err)
  202. }
  203. }