123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- /*
- Copyright 2018 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package statefulset
- import (
- "fmt"
- "net/http/httptest"
- "testing"
- "time"
- appsv1 "k8s.io/api/apps/v1"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/informers"
- clientset "k8s.io/client-go/kubernetes"
- typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
- typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
- restclient "k8s.io/client-go/rest"
- "k8s.io/client-go/util/retry"
- //svc "k8s.io/kubernetes/pkg/api/v1/service"
- "k8s.io/kubernetes/pkg/controller/statefulset"
- "k8s.io/kubernetes/test/integration/framework"
- )
- const (
- pollInterval = 100 * time.Millisecond
- pollTimeout = 60 * time.Second
- )
- func labelMap() map[string]string {
- return map[string]string{"foo": "bar"}
- }
- // newService returns a service with a fake name for StatefulSet to be created soon
- func newHeadlessService(namespace string) *v1.Service {
- return &v1.Service{
- TypeMeta: metav1.TypeMeta{
- Kind: "Service",
- APIVersion: "v1",
- },
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: "fake-service-name",
- },
- Spec: v1.ServiceSpec{
- ClusterIP: "None",
- Ports: []v1.ServicePort{
- {Port: 80, Name: "http", Protocol: "TCP"},
- },
- Selector: labelMap(),
- },
- }
- }
- // newSTS returns a StatefulSet with a fake container image
- func newSTS(name, namespace string, replicas int) *appsv1.StatefulSet {
- replicasCopy := int32(replicas)
- return &appsv1.StatefulSet{
- TypeMeta: metav1.TypeMeta{
- Kind: "StatefulSet",
- APIVersion: "apps/v1",
- },
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: name,
- },
- Spec: appsv1.StatefulSetSpec{
- PodManagementPolicy: appsv1.ParallelPodManagement,
- Replicas: &replicasCopy,
- Selector: &metav1.LabelSelector{
- MatchLabels: labelMap(),
- },
- Template: v1.PodTemplateSpec{
- ObjectMeta: metav1.ObjectMeta{
- Labels: labelMap(),
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "fake-name",
- Image: "fakeimage",
- VolumeMounts: []v1.VolumeMount{
- {Name: "datadir", MountPath: "/data/"},
- {Name: "home", MountPath: "/home"},
- },
- },
- },
- Volumes: []v1.Volume{
- {
- Name: "datadir",
- VolumeSource: v1.VolumeSource{
- HostPath: &v1.HostPathVolumeSource{
- Path: fmt.Sprintf("/tmp/%v", "datadir"),
- },
- },
- },
- {
- Name: "home",
- VolumeSource: v1.VolumeSource{
- HostPath: &v1.HostPathVolumeSource{
- Path: fmt.Sprintf("/tmp/%v", "home"),
- },
- },
- },
- },
- },
- },
- ServiceName: "fake-service-name",
- UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
- Type: appsv1.RollingUpdateStatefulSetStrategyType,
- },
- VolumeClaimTemplates: []v1.PersistentVolumeClaim{
- // for volume mount "datadir"
- newStatefulSetPVC("fake-pvc-name"),
- },
- },
- }
- }
- func newStatefulSetPVC(name string) v1.PersistentVolumeClaim {
- return v1.PersistentVolumeClaim{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Annotations: map[string]string{
- "volume.alpha.kubernetes.io/storage-class": "anything",
- },
- },
- Spec: v1.PersistentVolumeClaimSpec{
- AccessModes: []v1.PersistentVolumeAccessMode{
- v1.ReadWriteOnce,
- },
- Resources: v1.ResourceRequirements{
- Requests: v1.ResourceList{
- v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
- },
- },
- },
- }
- }
- // scSetup sets up necessities for Statefulset integration test, including master, apiserver, informers, and clientset
- func scSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
- masterConfig := framework.NewIntegrationTestMasterConfig()
- _, s, closeFn := framework.RunAMaster(masterConfig)
- config := restclient.Config{Host: s.URL}
- clientSet, err := clientset.NewForConfig(&config)
- if err != nil {
- t.Fatalf("error in create clientset: %v", err)
- }
- resyncPeriod := 12 * time.Hour
- informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod)
- sc := statefulset.NewStatefulSetController(
- informers.Core().V1().Pods(),
- informers.Apps().V1().StatefulSets(),
- informers.Core().V1().PersistentVolumeClaims(),
- informers.Apps().V1().ControllerRevisions(),
- clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")),
- )
- return s, closeFn, sc, informers, clientSet
- }
- // Run STS controller and informers
- func runControllerAndInformers(sc *statefulset.StatefulSetController, informers informers.SharedInformerFactory) chan struct{} {
- stopCh := make(chan struct{})
- informers.Start(stopCh)
- go sc.Run(5, stopCh)
- return stopCh
- }
- func createHeadlessService(t *testing.T, clientSet clientset.Interface, headlessService *v1.Service) {
- _, err := clientSet.CoreV1().Services(headlessService.Namespace).Create(headlessService)
- if err != nil {
- t.Fatalf("failed creating headless service: %v", err)
- }
- }
- func createSTSsPods(t *testing.T, clientSet clientset.Interface, stss []*appsv1.StatefulSet, pods []*v1.Pod) ([]*appsv1.StatefulSet, []*v1.Pod) {
- var createdSTSs []*appsv1.StatefulSet
- var createdPods []*v1.Pod
- for _, sts := range stss {
- createdSTS, err := clientSet.AppsV1().StatefulSets(sts.Namespace).Create(sts)
- if err != nil {
- t.Fatalf("failed to create sts %s: %v", sts.Name, err)
- }
- createdSTSs = append(createdSTSs, createdSTS)
- }
- for _, pod := range pods {
- createdPod, err := clientSet.CoreV1().Pods(pod.Namespace).Create(pod)
- if err != nil {
- t.Fatalf("failed to create pod %s: %v", pod.Name, err)
- }
- createdPods = append(createdPods, createdPod)
- }
- return createdSTSs, createdPods
- }
- // Verify .Status.Replicas is equal to .Spec.Replicas
- func waitSTSStable(t *testing.T, clientSet clientset.Interface, sts *appsv1.StatefulSet) {
- stsClient := clientSet.AppsV1().StatefulSets(sts.Namespace)
- desiredGeneration := sts.Generation
- if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
- newSTS, err := stsClient.Get(sts.Name, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- return newSTS.Status.Replicas == *newSTS.Spec.Replicas && newSTS.Status.ObservedGeneration >= desiredGeneration, nil
- }); err != nil {
- t.Fatalf("failed to verify .Status.Replicas is equal to .Spec.Replicas for sts %s: %v", sts.Name, err)
- }
- }
- func updatePod(t *testing.T, podClient typedv1.PodInterface, podName string, updateFunc func(*v1.Pod)) *v1.Pod {
- var pod *v1.Pod
- if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
- newPod, err := podClient.Get(podName, metav1.GetOptions{})
- if err != nil {
- return err
- }
- updateFunc(newPod)
- pod, err = podClient.Update(newPod)
- return err
- }); err != nil {
- t.Fatalf("failed to update pod %s: %v", podName, err)
- }
- return pod
- }
- func updatePodStatus(t *testing.T, podClient typedv1.PodInterface, podName string, updateStatusFunc func(*v1.Pod)) *v1.Pod {
- var pod *v1.Pod
- if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
- newPod, err := podClient.Get(podName, metav1.GetOptions{})
- if err != nil {
- return err
- }
- updateStatusFunc(newPod)
- pod, err = podClient.UpdateStatus(newPod)
- return err
- }); err != nil {
- t.Fatalf("failed to update status of pod %s: %v", podName, err)
- }
- return pod
- }
- func getPods(t *testing.T, podClient typedv1.PodInterface, labelMap map[string]string) *v1.PodList {
- podSelector := labels.Set(labelMap).AsSelector()
- options := metav1.ListOptions{LabelSelector: podSelector.String()}
- pods, err := podClient.List(options)
- if err != nil {
- t.Fatalf("failed obtaining a list of pods that match the pod labels %v: %v", labelMap, err)
- }
- if pods == nil {
- t.Fatalf("obtained a nil list of pods")
- }
- return pods
- }
- func updateSTS(t *testing.T, stsClient typedappsv1.StatefulSetInterface, stsName string, updateFunc func(*appsv1.StatefulSet)) *appsv1.StatefulSet {
- var sts *appsv1.StatefulSet
- if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
- newSTS, err := stsClient.Get(stsName, metav1.GetOptions{})
- if err != nil {
- return err
- }
- updateFunc(newSTS)
- sts, err = stsClient.Update(newSTS)
- return err
- }); err != nil {
- t.Fatalf("failed to update sts %s: %v", stsName, err)
- }
- return sts
- }
- // Update .Spec.Replicas to replicas and verify .Status.Replicas is changed accordingly
- func scaleSTS(t *testing.T, c clientset.Interface, sts *appsv1.StatefulSet, replicas int32) {
- stsClient := c.AppsV1().StatefulSets(sts.Namespace)
- if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
- newSTS, err := stsClient.Get(sts.Name, metav1.GetOptions{})
- if err != nil {
- return err
- }
- *newSTS.Spec.Replicas = replicas
- sts, err = stsClient.Update(newSTS)
- return err
- }); err != nil {
- t.Fatalf("failed to update .Spec.Replicas to %d for sts %s: %v", replicas, sts.Name, err)
- }
- waitSTSStable(t, c, sts)
- }
|