123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package evictions
- import (
- "fmt"
- "net/http/httptest"
- "reflect"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "k8s.io/api/core/v1"
- "k8s.io/api/policy/v1beta1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- "k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/apimachinery/pkg/util/wait"
- cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
- "k8s.io/client-go/dynamic"
- "k8s.io/client-go/informers"
- clientset "k8s.io/client-go/kubernetes"
- restclient "k8s.io/client-go/rest"
- "k8s.io/client-go/restmapper"
- "k8s.io/client-go/scale"
- "k8s.io/client-go/tools/cache"
- "k8s.io/kubernetes/pkg/controller/disruption"
- "k8s.io/kubernetes/test/integration/framework"
- )
- const (
- numOfEvictions = 10
- )
- // TestConcurrentEvictionRequests is to make sure pod disruption budgets (PDB) controller is able to
- // handle concurrent eviction requests. Original issue:#37605
- func TestConcurrentEvictionRequests(t *testing.T) {
- podNameFormat := "test-pod-%d"
- s, closeFn, rm, informers, clientSet := rmSetup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t)
- defer framework.DeleteTestingNamespace(ns, s, t)
- stopCh := make(chan struct{})
- informers.Start(stopCh)
- go rm.Run(stopCh)
- defer close(stopCh)
- config := restclient.Config{Host: s.URL}
- clientSet, err := clientset.NewForConfig(&config)
- if err != nil {
- t.Fatalf("Failed to create clientset: %v", err)
- }
- var gracePeriodSeconds int64 = 30
- deleteOption := &metav1.DeleteOptions{
- GracePeriodSeconds: &gracePeriodSeconds,
- }
- // Generate numOfEvictions pods to evict
- for i := 0; i < numOfEvictions; i++ {
- podName := fmt.Sprintf(podNameFormat, i)
- pod := newPod(podName)
- if _, err := clientSet.CoreV1().Pods(ns.Name).Create(pod); err != nil {
- t.Errorf("Failed to create pod: %v", err)
- }
- addPodConditionReady(pod)
- if _, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(pod); err != nil {
- t.Fatal(err)
- }
- }
- waitToObservePods(t, informers.Core().V1().Pods().Informer(), numOfEvictions, v1.PodRunning)
- pdb := newPDB()
- if _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil {
- t.Errorf("Failed to create PodDisruptionBudget: %v", err)
- }
- waitPDBStable(t, clientSet, numOfEvictions, ns.Name, pdb.Name)
- var numberPodsEvicted uint32
- errCh := make(chan error, 3*numOfEvictions)
- var wg sync.WaitGroup
- // spawn numOfEvictions goroutines to concurrently evict the pods
- for i := 0; i < numOfEvictions; i++ {
- wg.Add(1)
- go func(id int, errCh chan error) {
- defer wg.Done()
- podName := fmt.Sprintf(podNameFormat, id)
- eviction := newEviction(ns.Name, podName, deleteOption)
- err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
- e := clientSet.PolicyV1beta1().Evictions(ns.Name).Evict(eviction)
- switch {
- case errors.IsTooManyRequests(e):
- return false, nil
- case errors.IsConflict(e):
- return false, fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e)
- case e == nil:
- return true, nil
- default:
- return false, e
- }
- })
- if err != nil {
- errCh <- err
- // should not return here otherwise we would leak the pod
- }
- _, err = clientSet.CoreV1().Pods(ns.Name).Get(podName, metav1.GetOptions{})
- switch {
- case errors.IsNotFound(err):
- atomic.AddUint32(&numberPodsEvicted, 1)
- // pod was evicted and deleted so return from goroutine immediately
- return
- case err == nil:
- // this shouldn't happen if the pod was evicted successfully
- errCh <- fmt.Errorf("Pod %q is expected to be evicted", podName)
- default:
- errCh <- err
- }
- // delete pod which still exists due to error
- e := clientSet.CoreV1().Pods(ns.Name).Delete(podName, deleteOption)
- if e != nil {
- errCh <- e
- }
- }(i, errCh)
- }
- wg.Wait()
- close(errCh)
- var errList []error
- if err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Delete(pdb.Name, deleteOption); err != nil {
- errList = append(errList, fmt.Errorf("Failed to delete PodDisruptionBudget: %v", err))
- }
- for err := range errCh {
- errList = append(errList, err)
- }
- if len(errList) > 0 {
- t.Fatal(utilerrors.NewAggregate(errList))
- }
- if atomic.LoadUint32(&numberPodsEvicted) != numOfEvictions {
- t.Fatalf("fewer number of successful evictions than expected : %d", numberPodsEvicted)
- }
- }
- // TestTerminalPodEviction ensures that PDB is not checked for terminal pods.
- func TestTerminalPodEviction(t *testing.T) {
- s, closeFn, rm, informers, clientSet := rmSetup(t)
- defer closeFn()
- ns := framework.CreateTestingNamespace("terminalpod-eviction", s, t)
- defer framework.DeleteTestingNamespace(ns, s, t)
- stopCh := make(chan struct{})
- informers.Start(stopCh)
- go rm.Run(stopCh)
- defer close(stopCh)
- config := restclient.Config{Host: s.URL}
- clientSet, err := clientset.NewForConfig(&config)
- if err != nil {
- t.Fatalf("Failed to create clientset: %v", err)
- }
- var gracePeriodSeconds int64 = 30
- deleteOption := &metav1.DeleteOptions{
- GracePeriodSeconds: &gracePeriodSeconds,
- }
- pod := newPod("test-terminal-pod1")
- if _, err := clientSet.CoreV1().Pods(ns.Name).Create(pod); err != nil {
- t.Errorf("Failed to create pod: %v", err)
- }
- addPodConditionSucceeded(pod)
- if _, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(pod); err != nil {
- t.Fatal(err)
- }
- waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, v1.PodSucceeded)
- pdb := newPDB()
- if _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil {
- t.Errorf("Failed to create PodDisruptionBudget: %v", err)
- }
- waitPDBStable(t, clientSet, 1, ns.Name, pdb.Name)
- pdbList, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).List(metav1.ListOptions{})
- if err != nil {
- t.Fatalf("Error while listing pod disruption budget")
- }
- oldPdb := pdbList.Items[0]
- eviction := newEviction(ns.Name, pod.Name, deleteOption)
- err = wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
- e := clientSet.PolicyV1beta1().Evictions(ns.Name).Evict(eviction)
- switch {
- case errors.IsTooManyRequests(e):
- return false, nil
- case errors.IsConflict(e):
- return false, fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e)
- case e == nil:
- return true, nil
- default:
- return false, e
- }
- })
- if err != nil {
- t.Fatalf("Eviction of pod failed %v", err)
- }
- pdbList, err = clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).List(metav1.ListOptions{})
- if err != nil {
- t.Fatalf("Error while listing pod disruption budget")
- }
- newPdb := pdbList.Items[0]
- // We shouldn't see an update in pod disruption budget status' generation number as we are evicting terminal pods without checking for pod disruption.
- if !reflect.DeepEqual(newPdb.Status.ObservedGeneration, oldPdb.Status.ObservedGeneration) {
- t.Fatalf("Expected the pdb generation to be of same value %v but got %v", newPdb.Status.ObservedGeneration, oldPdb.Status.ObservedGeneration)
- }
- if err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Delete(pdb.Name, deleteOption); err != nil {
- t.Fatalf("Failed to delete pod disruption budget")
- }
- }
- func newPod(podName string) *v1.Pod {
- return &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: podName,
- Labels: map[string]string{"app": "test-evictions"},
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "fake-name",
- Image: "fakeimage",
- },
- },
- },
- }
- }
- func addPodConditionSucceeded(pod *v1.Pod) {
- pod.Status = v1.PodStatus{
- Phase: v1.PodSucceeded,
- Conditions: []v1.PodCondition{
- {
- Type: v1.PodReady,
- Status: v1.ConditionTrue,
- },
- },
- }
- }
- func addPodConditionReady(pod *v1.Pod) {
- pod.Status = v1.PodStatus{
- Phase: v1.PodRunning,
- Conditions: []v1.PodCondition{
- {
- Type: v1.PodReady,
- Status: v1.ConditionTrue,
- },
- },
- }
- }
- func newPDB() *v1beta1.PodDisruptionBudget {
- return &v1beta1.PodDisruptionBudget{
- ObjectMeta: metav1.ObjectMeta{
- Name: "test-pdb",
- },
- Spec: v1beta1.PodDisruptionBudgetSpec{
- MinAvailable: &intstr.IntOrString{
- Type: intstr.Int,
- IntVal: 0,
- },
- Selector: &metav1.LabelSelector{
- MatchLabels: map[string]string{"app": "test-evictions"},
- },
- },
- }
- }
- func newEviction(ns, evictionName string, deleteOption *metav1.DeleteOptions) *v1beta1.Eviction {
- return &v1beta1.Eviction{
- TypeMeta: metav1.TypeMeta{
- APIVersion: "Policy/v1beta1",
- Kind: "Eviction",
- },
- ObjectMeta: metav1.ObjectMeta{
- Name: evictionName,
- Namespace: ns,
- },
- DeleteOptions: deleteOption,
- }
- }
- func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface) {
- masterConfig := framework.NewIntegrationTestMasterConfig()
- _, s, closeFn := framework.RunAMaster(masterConfig)
- config := restclient.Config{Host: s.URL}
- clientSet, err := clientset.NewForConfig(&config)
- if err != nil {
- t.Fatalf("Error in create clientset: %v", err)
- }
- resyncPeriod := 12 * time.Hour
- informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), resyncPeriod)
- client := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller"))
- discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
- mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
- scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
- scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
- if err != nil {
- t.Fatalf("Error in create scaleClient: %v", err)
- }
- rm := disruption.NewDisruptionController(
- informers.Core().V1().Pods(),
- informers.Policy().V1beta1().PodDisruptionBudgets(),
- informers.Core().V1().ReplicationControllers(),
- informers.Apps().V1().ReplicaSets(),
- informers.Apps().V1().Deployments(),
- informers.Apps().V1().StatefulSets(),
- client,
- mapper,
- scaleClient,
- )
- return s, closeFn, rm, informers, clientSet
- }
- // wait for the podInformer to observe the pods. Call this function before
- // running the RS controller to prevent the rc manager from creating new pods
- // rather than adopting the existing ones.
- func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int, phase v1.PodPhase) {
- if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
- objects := podInformer.GetIndexer().List()
- if len(objects) != podNum {
- return false, nil
- }
- for _, obj := range objects {
- pod := obj.(*v1.Pod)
- if pod.Status.Phase != phase {
- return false, nil
- }
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
- func waitPDBStable(t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) {
- if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
- pdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns).Get(pdbName, metav1.GetOptions{})
- if err != nil {
- return false, err
- }
- if pdb.Status.CurrentHealthy != podNum {
- return false, nil
- }
- return true, nil
- }); err != nil {
- t.Fatal(err)
- }
- }
|