util.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "context"
  16. "fmt"
  17. "net/http/httptest"
  18. "testing"
  19. "time"
  20. appsv1 "k8s.io/api/apps/v1"
  21. "k8s.io/api/core/v1"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/labels"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. "k8s.io/client-go/informers"
  27. clientset "k8s.io/client-go/kubernetes"
  28. typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
  29. typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
  30. restclient "k8s.io/client-go/rest"
  31. "k8s.io/client-go/util/retry"
  32. //svc "k8s.io/kubernetes/pkg/api/v1/service"
  33. "k8s.io/kubernetes/pkg/controller/statefulset"
  34. "k8s.io/kubernetes/test/integration/framework"
  35. )
  36. const (
  37. pollInterval = 100 * time.Millisecond
  38. pollTimeout = 60 * time.Second
  39. )
  40. func labelMap() map[string]string {
  41. return map[string]string{"foo": "bar"}
  42. }
  43. // newService returns a service with a fake name for StatefulSet to be created soon
  44. func newHeadlessService(namespace string) *v1.Service {
  45. return &v1.Service{
  46. TypeMeta: metav1.TypeMeta{
  47. Kind: "Service",
  48. APIVersion: "v1",
  49. },
  50. ObjectMeta: metav1.ObjectMeta{
  51. Namespace: namespace,
  52. Name: "fake-service-name",
  53. },
  54. Spec: v1.ServiceSpec{
  55. ClusterIP: "None",
  56. Ports: []v1.ServicePort{
  57. {Port: 80, Name: "http", Protocol: "TCP"},
  58. },
  59. Selector: labelMap(),
  60. },
  61. }
  62. }
  63. // newSTS returns a StatefulSet with a fake container image
  64. func newSTS(name, namespace string, replicas int) *appsv1.StatefulSet {
  65. replicasCopy := int32(replicas)
  66. return &appsv1.StatefulSet{
  67. TypeMeta: metav1.TypeMeta{
  68. Kind: "StatefulSet",
  69. APIVersion: "apps/v1",
  70. },
  71. ObjectMeta: metav1.ObjectMeta{
  72. Namespace: namespace,
  73. Name: name,
  74. },
  75. Spec: appsv1.StatefulSetSpec{
  76. PodManagementPolicy: appsv1.ParallelPodManagement,
  77. Replicas: &replicasCopy,
  78. Selector: &metav1.LabelSelector{
  79. MatchLabels: labelMap(),
  80. },
  81. Template: v1.PodTemplateSpec{
  82. ObjectMeta: metav1.ObjectMeta{
  83. Labels: labelMap(),
  84. },
  85. Spec: v1.PodSpec{
  86. Containers: []v1.Container{
  87. {
  88. Name: "fake-name",
  89. Image: "fakeimage",
  90. VolumeMounts: []v1.VolumeMount{
  91. {Name: "datadir", MountPath: "/data/"},
  92. {Name: "home", MountPath: "/home"},
  93. },
  94. },
  95. },
  96. Volumes: []v1.Volume{
  97. {
  98. Name: "datadir",
  99. VolumeSource: v1.VolumeSource{
  100. HostPath: &v1.HostPathVolumeSource{
  101. Path: fmt.Sprintf("/tmp/%v", "datadir"),
  102. },
  103. },
  104. },
  105. {
  106. Name: "home",
  107. VolumeSource: v1.VolumeSource{
  108. HostPath: &v1.HostPathVolumeSource{
  109. Path: fmt.Sprintf("/tmp/%v", "home"),
  110. },
  111. },
  112. },
  113. },
  114. },
  115. },
  116. ServiceName: "fake-service-name",
  117. UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
  118. Type: appsv1.RollingUpdateStatefulSetStrategyType,
  119. },
  120. VolumeClaimTemplates: []v1.PersistentVolumeClaim{
  121. // for volume mount "datadir"
  122. newStatefulSetPVC("fake-pvc-name"),
  123. },
  124. },
  125. }
  126. }
  127. func newStatefulSetPVC(name string) v1.PersistentVolumeClaim {
  128. return v1.PersistentVolumeClaim{
  129. ObjectMeta: metav1.ObjectMeta{
  130. Name: name,
  131. Annotations: map[string]string{
  132. "volume.alpha.kubernetes.io/storage-class": "anything",
  133. },
  134. },
  135. Spec: v1.PersistentVolumeClaimSpec{
  136. AccessModes: []v1.PersistentVolumeAccessMode{
  137. v1.ReadWriteOnce,
  138. },
  139. Resources: v1.ResourceRequirements{
  140. Requests: v1.ResourceList{
  141. v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
  142. },
  143. },
  144. },
  145. }
  146. }
  147. // scSetup sets up necessities for Statefulset integration test, including master, apiserver, informers, and clientset
  148. func scSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
  149. masterConfig := framework.NewIntegrationTestMasterConfig()
  150. _, s, closeFn := framework.RunAMaster(masterConfig)
  151. config := restclient.Config{Host: s.URL}
  152. clientSet, err := clientset.NewForConfig(&config)
  153. if err != nil {
  154. t.Fatalf("error in create clientset: %v", err)
  155. }
  156. resyncPeriod := 12 * time.Hour
  157. informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod)
  158. sc := statefulset.NewStatefulSetController(
  159. informers.Core().V1().Pods(),
  160. informers.Apps().V1().StatefulSets(),
  161. informers.Core().V1().PersistentVolumeClaims(),
  162. informers.Apps().V1().ControllerRevisions(),
  163. clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")),
  164. )
  165. return s, closeFn, sc, informers, clientSet
  166. }
  167. // Run STS controller and informers
  168. func runControllerAndInformers(sc *statefulset.StatefulSetController, informers informers.SharedInformerFactory) chan struct{} {
  169. stopCh := make(chan struct{})
  170. informers.Start(stopCh)
  171. go sc.Run(5, stopCh)
  172. return stopCh
  173. }
  174. func createHeadlessService(t *testing.T, clientSet clientset.Interface, headlessService *v1.Service) {
  175. _, err := clientSet.CoreV1().Services(headlessService.Namespace).Create(context.TODO(), headlessService, metav1.CreateOptions{})
  176. if err != nil {
  177. t.Fatalf("failed creating headless service: %v", err)
  178. }
  179. }
  180. func createSTSsPods(t *testing.T, clientSet clientset.Interface, stss []*appsv1.StatefulSet, pods []*v1.Pod) ([]*appsv1.StatefulSet, []*v1.Pod) {
  181. var createdSTSs []*appsv1.StatefulSet
  182. var createdPods []*v1.Pod
  183. for _, sts := range stss {
  184. createdSTS, err := clientSet.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{})
  185. if err != nil {
  186. t.Fatalf("failed to create sts %s: %v", sts.Name, err)
  187. }
  188. createdSTSs = append(createdSTSs, createdSTS)
  189. }
  190. for _, pod := range pods {
  191. createdPod, err := clientSet.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
  192. if err != nil {
  193. t.Fatalf("failed to create pod %s: %v", pod.Name, err)
  194. }
  195. createdPods = append(createdPods, createdPod)
  196. }
  197. return createdSTSs, createdPods
  198. }
  199. // Verify .Status.Replicas is equal to .Spec.Replicas
  200. func waitSTSStable(t *testing.T, clientSet clientset.Interface, sts *appsv1.StatefulSet) {
  201. stsClient := clientSet.AppsV1().StatefulSets(sts.Namespace)
  202. desiredGeneration := sts.Generation
  203. if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  204. newSTS, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
  205. if err != nil {
  206. return false, err
  207. }
  208. return newSTS.Status.Replicas == *newSTS.Spec.Replicas && newSTS.Status.ObservedGeneration >= desiredGeneration, nil
  209. }); err != nil {
  210. t.Fatalf("failed to verify .Status.Replicas is equal to .Spec.Replicas for sts %s: %v", sts.Name, err)
  211. }
  212. }
  213. func updatePod(t *testing.T, podClient typedv1.PodInterface, podName string, updateFunc func(*v1.Pod)) *v1.Pod {
  214. var pod *v1.Pod
  215. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  216. newPod, err := podClient.Get(context.TODO(), podName, metav1.GetOptions{})
  217. if err != nil {
  218. return err
  219. }
  220. updateFunc(newPod)
  221. pod, err = podClient.Update(context.TODO(), newPod, metav1.UpdateOptions{})
  222. return err
  223. }); err != nil {
  224. t.Fatalf("failed to update pod %s: %v", podName, err)
  225. }
  226. return pod
  227. }
  228. func updatePodStatus(t *testing.T, podClient typedv1.PodInterface, podName string, updateStatusFunc func(*v1.Pod)) *v1.Pod {
  229. var pod *v1.Pod
  230. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  231. newPod, err := podClient.Get(context.TODO(), podName, metav1.GetOptions{})
  232. if err != nil {
  233. return err
  234. }
  235. updateStatusFunc(newPod)
  236. pod, err = podClient.UpdateStatus(context.TODO(), newPod, metav1.UpdateOptions{})
  237. return err
  238. }); err != nil {
  239. t.Fatalf("failed to update status of pod %s: %v", podName, err)
  240. }
  241. return pod
  242. }
  243. func getPods(t *testing.T, podClient typedv1.PodInterface, labelMap map[string]string) *v1.PodList {
  244. podSelector := labels.Set(labelMap).AsSelector()
  245. options := metav1.ListOptions{LabelSelector: podSelector.String()}
  246. pods, err := podClient.List(context.TODO(), options)
  247. if err != nil {
  248. t.Fatalf("failed obtaining a list of pods that match the pod labels %v: %v", labelMap, err)
  249. }
  250. if pods == nil {
  251. t.Fatalf("obtained a nil list of pods")
  252. }
  253. return pods
  254. }
  255. func updateSTS(t *testing.T, stsClient typedappsv1.StatefulSetInterface, stsName string, updateFunc func(*appsv1.StatefulSet)) *appsv1.StatefulSet {
  256. var sts *appsv1.StatefulSet
  257. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  258. newSTS, err := stsClient.Get(context.TODO(), stsName, metav1.GetOptions{})
  259. if err != nil {
  260. return err
  261. }
  262. updateFunc(newSTS)
  263. sts, err = stsClient.Update(context.TODO(), newSTS, metav1.UpdateOptions{})
  264. return err
  265. }); err != nil {
  266. t.Fatalf("failed to update sts %s: %v", stsName, err)
  267. }
  268. return sts
  269. }
  270. // Update .Spec.Replicas to replicas and verify .Status.Replicas is changed accordingly
  271. func scaleSTS(t *testing.T, c clientset.Interface, sts *appsv1.StatefulSet, replicas int32) {
  272. stsClient := c.AppsV1().StatefulSets(sts.Namespace)
  273. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  274. newSTS, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
  275. if err != nil {
  276. return err
  277. }
  278. *newSTS.Spec.Replicas = replicas
  279. sts, err = stsClient.Update(context.TODO(), newSTS, metav1.UpdateOptions{})
  280. return err
  281. }); err != nil {
  282. t.Fatalf("failed to update .Spec.Replicas to %d for sts %s: %v", replicas, sts.Name, err)
  283. }
  284. waitSTSStable(t, c, sts)
  285. }