123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- /*
- Copyright 2019 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package statefulset
- import (
- "context"
- "fmt"
- "path/filepath"
- "strings"
- "time"
- appsv1 "k8s.io/api/apps/v1"
- v1 "k8s.io/api/core/v1"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- clientset "k8s.io/client-go/kubernetes"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- "k8s.io/kubernetes/test/e2e/framework"
- "k8s.io/kubernetes/test/e2e/manifest"
- )
- // CreateStatefulSet creates a StatefulSet from the manifest at manifestPath in the Namespace ns using kubectl create.
- func CreateStatefulSet(c clientset.Interface, manifestPath, ns string) *appsv1.StatefulSet {
- mkpath := func(file string) string {
- return filepath.Join(manifestPath, file)
- }
- framework.Logf("Parsing statefulset from %v", mkpath("statefulset.yaml"))
- ss, err := manifest.StatefulSetFromManifest(mkpath("statefulset.yaml"), ns)
- framework.ExpectNoError(err)
- framework.Logf("Parsing service from %v", mkpath("service.yaml"))
- svc, err := manifest.SvcFromManifest(mkpath("service.yaml"))
- framework.ExpectNoError(err)
- framework.Logf(fmt.Sprintf("creating " + ss.Name + " service"))
- _, err = c.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{})
- framework.ExpectNoError(err)
- framework.Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector))
- _, err = c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
- framework.ExpectNoError(err)
- WaitForRunningAndReady(c, *ss.Spec.Replicas, ss)
- return ss
- }
- // GetPodList gets the current Pods in ss.
- func GetPodList(c clientset.Interface, ss *appsv1.StatefulSet) *v1.PodList {
- selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
- framework.ExpectNoError(err)
- podList, err := c.CoreV1().Pods(ss.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String()})
- framework.ExpectNoError(err)
- return podList
- }
- // DeleteAllStatefulSets deletes all StatefulSet API Objects in Namespace ns.
- func DeleteAllStatefulSets(c clientset.Interface, ns string) {
- ssList, err := c.AppsV1().StatefulSets(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Everything().String()})
- framework.ExpectNoError(err)
- // Scale down each statefulset, then delete it completely.
- // Deleting a pvc without doing this will leak volumes, #25101.
- errList := []string{}
- for i := range ssList.Items {
- ss := &ssList.Items[i]
- var err error
- if ss, err = Scale(c, ss, 0); err != nil {
- errList = append(errList, fmt.Sprintf("%v", err))
- }
- WaitForStatusReplicas(c, ss, 0)
- framework.Logf("Deleting statefulset %v", ss.Name)
- // Use OrphanDependents=false so it's deleted synchronously.
- // We already made sure the Pods are gone inside Scale().
- if err := c.AppsV1().StatefulSets(ss.Namespace).Delete(context.TODO(), ss.Name, &metav1.DeleteOptions{OrphanDependents: new(bool)}); err != nil {
- errList = append(errList, fmt.Sprintf("%v", err))
- }
- }
- // pvs are global, so we need to wait for the exact ones bound to the statefulset pvcs.
- pvNames := sets.NewString()
- // TODO: Don't assume all pvcs in the ns belong to a statefulset
- pvcPollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
- pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Everything().String()})
- if err != nil {
- framework.Logf("WARNING: Failed to list pvcs, retrying %v", err)
- return false, nil
- }
- for _, pvc := range pvcList.Items {
- pvNames.Insert(pvc.Spec.VolumeName)
- // TODO: Double check that there are no pods referencing the pvc
- framework.Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName)
- if err := c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvc.Name, nil); err != nil {
- return false, nil
- }
- }
- return true, nil
- })
- if pvcPollErr != nil {
- errList = append(errList, fmt.Sprintf("Timeout waiting for pvc deletion."))
- }
- pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
- pvList, err := c.CoreV1().PersistentVolumes().List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Everything().String()})
- if err != nil {
- framework.Logf("WARNING: Failed to list pvs, retrying %v", err)
- return false, nil
- }
- waitingFor := []string{}
- for _, pv := range pvList.Items {
- if pvNames.Has(pv.Name) {
- waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status))
- }
- }
- if len(waitingFor) == 0 {
- return true, nil
- }
- framework.Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n"))
- return false, nil
- })
- if pollErr != nil {
- errList = append(errList, fmt.Sprintf("Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs."))
- }
- if len(errList) != 0 {
- framework.ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n")))
- }
- }
- // Scale scales ss to count replicas.
- func Scale(c clientset.Interface, ss *appsv1.StatefulSet, count int32) (*appsv1.StatefulSet, error) {
- name := ss.Name
- ns := ss.Namespace
- framework.Logf("Scaling statefulset %s to %d", name, count)
- ss = update(c, ns, name, func(ss *appsv1.StatefulSet) { *(ss.Spec.Replicas) = count })
- var statefulPodList *v1.PodList
- pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, func() (bool, error) {
- statefulPodList = GetPodList(c, ss)
- if int32(len(statefulPodList.Items)) == count {
- return true, nil
- }
- return false, nil
- })
- if pollErr != nil {
- unhealthy := []string{}
- for _, statefulPod := range statefulPodList.Items {
- delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, podutil.IsPodReady(&statefulPod)
- if delTs != nil || phase != v1.PodRunning || !readiness {
- unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
- }
- }
- return ss, fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, StatefulSetTimeout, unhealthy)
- }
- return ss, nil
- }
- // UpdateReplicas updates the replicas of ss to count.
- func UpdateReplicas(c clientset.Interface, ss *appsv1.StatefulSet, count int32) {
- update(c, ss.Namespace, ss.Name, func(ss *appsv1.StatefulSet) { *(ss.Spec.Replicas) = count })
- }
- // Restart scales ss to 0 and then back to its previous number of replicas.
- func Restart(c clientset.Interface, ss *appsv1.StatefulSet) {
- oldReplicas := *(ss.Spec.Replicas)
- ss, err := Scale(c, ss, 0)
- framework.ExpectNoError(err)
- // Wait for controller to report the desired number of Pods.
- // This way we know the controller has observed all Pod deletions
- // before we scale it back up.
- WaitForStatusReplicas(c, ss, 0)
- update(c, ss.Namespace, ss.Name, func(ss *appsv1.StatefulSet) { *(ss.Spec.Replicas) = oldReplicas })
- }
- // CheckHostname verifies that all Pods in ss have the correct Hostname. If the returned error is not nil than verification failed.
- func CheckHostname(c clientset.Interface, ss *appsv1.StatefulSet) error {
- cmd := "printf $(hostname)"
- podList := GetPodList(c, ss)
- for _, statefulPod := range podList.Items {
- hostname, err := framework.RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
- if err != nil {
- return err
- }
- if hostname != statefulPod.Name {
- return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name)
- }
- }
- return nil
- }
- // CheckMount checks that the mount at mountPath is valid for all Pods in ss.
- func CheckMount(c clientset.Interface, ss *appsv1.StatefulSet, mountPath string) error {
- for _, cmd := range []string{
- // Print inode, size etc
- fmt.Sprintf("ls -idlh %v", mountPath),
- // Print subdirs
- fmt.Sprintf("find %v", mountPath),
- // Try writing
- fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))),
- } {
- if err := ExecInStatefulPods(c, ss, cmd); err != nil {
- return fmt.Errorf("failed to execute %v, error: %v", cmd, err)
- }
- }
- return nil
- }
- // CheckServiceName asserts that the ServiceName for ss is equivalent to expectedServiceName.
- func CheckServiceName(ss *appsv1.StatefulSet, expectedServiceName string) error {
- framework.Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName)
- if expectedServiceName != ss.Spec.ServiceName {
- return fmt.Errorf("wrong service name governing statefulset. Expected %s got %s",
- expectedServiceName, ss.Spec.ServiceName)
- }
- return nil
- }
- // ExecInStatefulPods executes cmd in all Pods in ss. If a error occurs it is returned and cmd is not execute in any subsequent Pods.
- func ExecInStatefulPods(c clientset.Interface, ss *appsv1.StatefulSet, cmd string) error {
- podList := GetPodList(c, ss)
- for _, statefulPod := range podList.Items {
- stdout, err := framework.RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
- framework.Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // udpate updates a statefulset, and it is only used within rest.go
- func update(c clientset.Interface, ns, name string, update func(ss *appsv1.StatefulSet)) *appsv1.StatefulSet {
- for i := 0; i < 3; i++ {
- ss, err := c.AppsV1().StatefulSets(ns).Get(context.TODO(), name, metav1.GetOptions{})
- if err != nil {
- framework.Failf("failed to get statefulset %q: %v", name, err)
- }
- update(ss)
- ss, err = c.AppsV1().StatefulSets(ns).Update(context.TODO(), ss, metav1.UpdateOptions{})
- if err == nil {
- return ss
- }
- if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
- framework.Failf("failed to update statefulset %q: %v", name, err)
- }
- }
- framework.Failf("too many retries draining statefulset %q", name)
- return nil
- }
|