util.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package statefulset
  14. import (
  15. "fmt"
  16. "net/http/httptest"
  17. "testing"
  18. "time"
  19. appsv1 "k8s.io/api/apps/v1"
  20. "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/api/resource"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/labels"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. "k8s.io/client-go/informers"
  26. clientset "k8s.io/client-go/kubernetes"
  27. typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
  28. typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
  29. restclient "k8s.io/client-go/rest"
  30. "k8s.io/client-go/util/retry"
  31. //svc "k8s.io/kubernetes/pkg/api/v1/service"
  32. "k8s.io/kubernetes/pkg/controller/statefulset"
  33. "k8s.io/kubernetes/test/integration/framework"
  34. )
  35. const (
  36. pollInterval = 100 * time.Millisecond
  37. pollTimeout = 60 * time.Second
  38. )
  39. func labelMap() map[string]string {
  40. return map[string]string{"foo": "bar"}
  41. }
  42. // newService returns a service with a fake name for StatefulSet to be created soon
  43. func newHeadlessService(namespace string) *v1.Service {
  44. return &v1.Service{
  45. TypeMeta: metav1.TypeMeta{
  46. Kind: "Service",
  47. APIVersion: "v1",
  48. },
  49. ObjectMeta: metav1.ObjectMeta{
  50. Namespace: namespace,
  51. Name: "fake-service-name",
  52. },
  53. Spec: v1.ServiceSpec{
  54. ClusterIP: "None",
  55. Ports: []v1.ServicePort{
  56. {Port: 80, Name: "http", Protocol: "TCP"},
  57. },
  58. Selector: labelMap(),
  59. },
  60. }
  61. }
  62. // newSTS returns a StatefulSet with a fake container image
  63. func newSTS(name, namespace string, replicas int) *appsv1.StatefulSet {
  64. replicasCopy := int32(replicas)
  65. return &appsv1.StatefulSet{
  66. TypeMeta: metav1.TypeMeta{
  67. Kind: "StatefulSet",
  68. APIVersion: "apps/v1",
  69. },
  70. ObjectMeta: metav1.ObjectMeta{
  71. Namespace: namespace,
  72. Name: name,
  73. },
  74. Spec: appsv1.StatefulSetSpec{
  75. PodManagementPolicy: appsv1.ParallelPodManagement,
  76. Replicas: &replicasCopy,
  77. Selector: &metav1.LabelSelector{
  78. MatchLabels: labelMap(),
  79. },
  80. Template: v1.PodTemplateSpec{
  81. ObjectMeta: metav1.ObjectMeta{
  82. Labels: labelMap(),
  83. },
  84. Spec: v1.PodSpec{
  85. Containers: []v1.Container{
  86. {
  87. Name: "fake-name",
  88. Image: "fakeimage",
  89. VolumeMounts: []v1.VolumeMount{
  90. {Name: "datadir", MountPath: "/data/"},
  91. {Name: "home", MountPath: "/home"},
  92. },
  93. },
  94. },
  95. Volumes: []v1.Volume{
  96. {
  97. Name: "datadir",
  98. VolumeSource: v1.VolumeSource{
  99. HostPath: &v1.HostPathVolumeSource{
  100. Path: fmt.Sprintf("/tmp/%v", "datadir"),
  101. },
  102. },
  103. },
  104. {
  105. Name: "home",
  106. VolumeSource: v1.VolumeSource{
  107. HostPath: &v1.HostPathVolumeSource{
  108. Path: fmt.Sprintf("/tmp/%v", "home"),
  109. },
  110. },
  111. },
  112. },
  113. },
  114. },
  115. ServiceName: "fake-service-name",
  116. UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
  117. Type: appsv1.RollingUpdateStatefulSetStrategyType,
  118. },
  119. VolumeClaimTemplates: []v1.PersistentVolumeClaim{
  120. // for volume mount "datadir"
  121. newStatefulSetPVC("fake-pvc-name"),
  122. },
  123. },
  124. }
  125. }
  126. func newStatefulSetPVC(name string) v1.PersistentVolumeClaim {
  127. return v1.PersistentVolumeClaim{
  128. ObjectMeta: metav1.ObjectMeta{
  129. Name: name,
  130. Annotations: map[string]string{
  131. "volume.alpha.kubernetes.io/storage-class": "anything",
  132. },
  133. },
  134. Spec: v1.PersistentVolumeClaimSpec{
  135. AccessModes: []v1.PersistentVolumeAccessMode{
  136. v1.ReadWriteOnce,
  137. },
  138. Resources: v1.ResourceRequirements{
  139. Requests: v1.ResourceList{
  140. v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
  141. },
  142. },
  143. },
  144. }
  145. }
  146. // scSetup sets up necessities for Statefulset integration test, including master, apiserver, informers, and clientset
  147. func scSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
  148. masterConfig := framework.NewIntegrationTestMasterConfig()
  149. _, s, closeFn := framework.RunAMaster(masterConfig)
  150. config := restclient.Config{Host: s.URL}
  151. clientSet, err := clientset.NewForConfig(&config)
  152. if err != nil {
  153. t.Fatalf("error in create clientset: %v", err)
  154. }
  155. resyncPeriod := 12 * time.Hour
  156. informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod)
  157. sc := statefulset.NewStatefulSetController(
  158. informers.Core().V1().Pods(),
  159. informers.Apps().V1().StatefulSets(),
  160. informers.Core().V1().PersistentVolumeClaims(),
  161. informers.Apps().V1().ControllerRevisions(),
  162. clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")),
  163. )
  164. return s, closeFn, sc, informers, clientSet
  165. }
  166. // Run STS controller and informers
  167. func runControllerAndInformers(sc *statefulset.StatefulSetController, informers informers.SharedInformerFactory) chan struct{} {
  168. stopCh := make(chan struct{})
  169. informers.Start(stopCh)
  170. go sc.Run(5, stopCh)
  171. return stopCh
  172. }
  173. func createHeadlessService(t *testing.T, clientSet clientset.Interface, headlessService *v1.Service) {
  174. _, err := clientSet.CoreV1().Services(headlessService.Namespace).Create(headlessService)
  175. if err != nil {
  176. t.Fatalf("failed creating headless service: %v", err)
  177. }
  178. }
  179. func createSTSsPods(t *testing.T, clientSet clientset.Interface, stss []*appsv1.StatefulSet, pods []*v1.Pod) ([]*appsv1.StatefulSet, []*v1.Pod) {
  180. var createdSTSs []*appsv1.StatefulSet
  181. var createdPods []*v1.Pod
  182. for _, sts := range stss {
  183. createdSTS, err := clientSet.AppsV1().StatefulSets(sts.Namespace).Create(sts)
  184. if err != nil {
  185. t.Fatalf("failed to create sts %s: %v", sts.Name, err)
  186. }
  187. createdSTSs = append(createdSTSs, createdSTS)
  188. }
  189. for _, pod := range pods {
  190. createdPod, err := clientSet.CoreV1().Pods(pod.Namespace).Create(pod)
  191. if err != nil {
  192. t.Fatalf("failed to create pod %s: %v", pod.Name, err)
  193. }
  194. createdPods = append(createdPods, createdPod)
  195. }
  196. return createdSTSs, createdPods
  197. }
  198. // Verify .Status.Replicas is equal to .Spec.Replicas
  199. func waitSTSStable(t *testing.T, clientSet clientset.Interface, sts *appsv1.StatefulSet) {
  200. stsClient := clientSet.AppsV1().StatefulSets(sts.Namespace)
  201. desiredGeneration := sts.Generation
  202. if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  203. newSTS, err := stsClient.Get(sts.Name, metav1.GetOptions{})
  204. if err != nil {
  205. return false, err
  206. }
  207. return newSTS.Status.Replicas == *newSTS.Spec.Replicas && newSTS.Status.ObservedGeneration >= desiredGeneration, nil
  208. }); err != nil {
  209. t.Fatalf("failed to verify .Status.Replicas is equal to .Spec.Replicas for sts %s: %v", sts.Name, err)
  210. }
  211. }
  212. func updatePod(t *testing.T, podClient typedv1.PodInterface, podName string, updateFunc func(*v1.Pod)) *v1.Pod {
  213. var pod *v1.Pod
  214. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  215. newPod, err := podClient.Get(podName, metav1.GetOptions{})
  216. if err != nil {
  217. return err
  218. }
  219. updateFunc(newPod)
  220. pod, err = podClient.Update(newPod)
  221. return err
  222. }); err != nil {
  223. t.Fatalf("failed to update pod %s: %v", podName, err)
  224. }
  225. return pod
  226. }
  227. func updatePodStatus(t *testing.T, podClient typedv1.PodInterface, podName string, updateStatusFunc func(*v1.Pod)) *v1.Pod {
  228. var pod *v1.Pod
  229. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  230. newPod, err := podClient.Get(podName, metav1.GetOptions{})
  231. if err != nil {
  232. return err
  233. }
  234. updateStatusFunc(newPod)
  235. pod, err = podClient.UpdateStatus(newPod)
  236. return err
  237. }); err != nil {
  238. t.Fatalf("failed to update status of pod %s: %v", podName, err)
  239. }
  240. return pod
  241. }
  242. func getPods(t *testing.T, podClient typedv1.PodInterface, labelMap map[string]string) *v1.PodList {
  243. podSelector := labels.Set(labelMap).AsSelector()
  244. options := metav1.ListOptions{LabelSelector: podSelector.String()}
  245. pods, err := podClient.List(options)
  246. if err != nil {
  247. t.Fatalf("failed obtaining a list of pods that match the pod labels %v: %v", labelMap, err)
  248. }
  249. if pods == nil {
  250. t.Fatalf("obtained a nil list of pods")
  251. }
  252. return pods
  253. }
  254. func updateSTS(t *testing.T, stsClient typedappsv1.StatefulSetInterface, stsName string, updateFunc func(*appsv1.StatefulSet)) *appsv1.StatefulSet {
  255. var sts *appsv1.StatefulSet
  256. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  257. newSTS, err := stsClient.Get(stsName, metav1.GetOptions{})
  258. if err != nil {
  259. return err
  260. }
  261. updateFunc(newSTS)
  262. sts, err = stsClient.Update(newSTS)
  263. return err
  264. }); err != nil {
  265. t.Fatalf("failed to update sts %s: %v", stsName, err)
  266. }
  267. return sts
  268. }
  269. // Update .Spec.Replicas to replicas and verify .Status.Replicas is changed accordingly
  270. func scaleSTS(t *testing.T, c clientset.Interface, sts *appsv1.StatefulSet, replicas int32) {
  271. stsClient := c.AppsV1().StatefulSets(sts.Namespace)
  272. if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  273. newSTS, err := stsClient.Get(sts.Name, metav1.GetOptions{})
  274. if err != nil {
  275. return err
  276. }
  277. *newSTS.Spec.Replicas = replicas
  278. sts, err = stsClient.Update(newSTS)
  279. return err
  280. }); err != nil {
  281. t.Fatalf("failed to update .Spec.Replicas to %d for sts %s: %v", replicas, sts.Name, err)
  282. }
  283. waitSTSStable(t, c, sts)
  284. }