evictions_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package evictions
  14. import (
  15. "context"
  16. "fmt"
  17. "net/http/httptest"
  18. "reflect"
  19. "sync"
  20. "sync/atomic"
  21. "testing"
  22. "time"
  23. "k8s.io/api/core/v1"
  24. "k8s.io/api/policy/v1beta1"
  25. apierrors "k8s.io/apimachinery/pkg/api/errors"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  28. "k8s.io/apimachinery/pkg/util/intstr"
  29. "k8s.io/apimachinery/pkg/util/wait"
  30. cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
  31. "k8s.io/client-go/dynamic"
  32. "k8s.io/client-go/informers"
  33. clientset "k8s.io/client-go/kubernetes"
  34. restclient "k8s.io/client-go/rest"
  35. "k8s.io/client-go/restmapper"
  36. "k8s.io/client-go/scale"
  37. "k8s.io/client-go/tools/cache"
  38. "k8s.io/kubernetes/pkg/controller/disruption"
  39. "k8s.io/kubernetes/test/integration/framework"
  40. )
  // numOfEvictions is the number of pods created, and then evicted concurrently,
  // by TestConcurrentEvictionRequests.
  const numOfEvictions = 10
  44. // TestConcurrentEvictionRequests is to make sure pod disruption budgets (PDB) controller is able to
  45. // handle concurrent eviction requests. Original issue:#37605
  46. func TestConcurrentEvictionRequests(t *testing.T) {
  47. podNameFormat := "test-pod-%d"
  48. s, closeFn, rm, informers, clientSet := rmSetup(t)
  49. defer closeFn()
  50. ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t)
  51. defer framework.DeleteTestingNamespace(ns, s, t)
  52. stopCh := make(chan struct{})
  53. informers.Start(stopCh)
  54. go rm.Run(stopCh)
  55. defer close(stopCh)
  56. config := restclient.Config{Host: s.URL}
  57. clientSet, err := clientset.NewForConfig(&config)
  58. if err != nil {
  59. t.Fatalf("Failed to create clientset: %v", err)
  60. }
  61. var gracePeriodSeconds int64 = 30
  62. deleteOption := &metav1.DeleteOptions{
  63. GracePeriodSeconds: &gracePeriodSeconds,
  64. }
  65. // Generate numOfEvictions pods to evict
  66. for i := 0; i < numOfEvictions; i++ {
  67. podName := fmt.Sprintf(podNameFormat, i)
  68. pod := newPod(podName)
  69. if _, err := clientSet.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
  70. t.Errorf("Failed to create pod: %v", err)
  71. }
  72. addPodConditionReady(pod)
  73. if _, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
  74. t.Fatal(err)
  75. }
  76. }
  77. waitToObservePods(t, informers.Core().V1().Pods().Informer(), numOfEvictions, v1.PodRunning)
  78. pdb := newPDB()
  79. if _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Create(context.TODO(), pdb, metav1.CreateOptions{}); err != nil {
  80. t.Errorf("Failed to create PodDisruptionBudget: %v", err)
  81. }
  82. waitPDBStable(t, clientSet, numOfEvictions, ns.Name, pdb.Name)
  83. var numberPodsEvicted uint32
  84. errCh := make(chan error, 3*numOfEvictions)
  85. var wg sync.WaitGroup
  86. // spawn numOfEvictions goroutines to concurrently evict the pods
  87. for i := 0; i < numOfEvictions; i++ {
  88. wg.Add(1)
  89. go func(id int, errCh chan error) {
  90. defer wg.Done()
  91. podName := fmt.Sprintf(podNameFormat, id)
  92. eviction := newEviction(ns.Name, podName, deleteOption)
  93. err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
  94. e := clientSet.PolicyV1beta1().Evictions(ns.Name).Evict(eviction)
  95. switch {
  96. case apierrors.IsTooManyRequests(e):
  97. return false, nil
  98. case apierrors.IsConflict(e):
  99. return false, fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e)
  100. case e == nil:
  101. return true, nil
  102. default:
  103. return false, e
  104. }
  105. })
  106. if err != nil {
  107. errCh <- err
  108. // should not return here otherwise we would leak the pod
  109. }
  110. _, err = clientSet.CoreV1().Pods(ns.Name).Get(context.TODO(), podName, metav1.GetOptions{})
  111. switch {
  112. case apierrors.IsNotFound(err):
  113. atomic.AddUint32(&numberPodsEvicted, 1)
  114. // pod was evicted and deleted so return from goroutine immediately
  115. return
  116. case err == nil:
  117. // this shouldn't happen if the pod was evicted successfully
  118. errCh <- fmt.Errorf("Pod %q is expected to be evicted", podName)
  119. default:
  120. errCh <- err
  121. }
  122. // delete pod which still exists due to error
  123. e := clientSet.CoreV1().Pods(ns.Name).Delete(context.TODO(), podName, deleteOption)
  124. if e != nil {
  125. errCh <- e
  126. }
  127. }(i, errCh)
  128. }
  129. wg.Wait()
  130. close(errCh)
  131. var errList []error
  132. if err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Delete(context.TODO(), pdb.Name, deleteOption); err != nil {
  133. errList = append(errList, fmt.Errorf("Failed to delete PodDisruptionBudget: %v", err))
  134. }
  135. for err := range errCh {
  136. errList = append(errList, err)
  137. }
  138. if len(errList) > 0 {
  139. t.Fatal(utilerrors.NewAggregate(errList))
  140. }
  141. if atomic.LoadUint32(&numberPodsEvicted) != numOfEvictions {
  142. t.Fatalf("fewer number of successful evictions than expected : %d", numberPodsEvicted)
  143. }
  144. }
  145. // TestTerminalPodEviction ensures that PDB is not checked for terminal pods.
  146. func TestTerminalPodEviction(t *testing.T) {
  147. s, closeFn, rm, informers, clientSet := rmSetup(t)
  148. defer closeFn()
  149. ns := framework.CreateTestingNamespace("terminalpod-eviction", s, t)
  150. defer framework.DeleteTestingNamespace(ns, s, t)
  151. stopCh := make(chan struct{})
  152. informers.Start(stopCh)
  153. go rm.Run(stopCh)
  154. defer close(stopCh)
  155. config := restclient.Config{Host: s.URL}
  156. clientSet, err := clientset.NewForConfig(&config)
  157. if err != nil {
  158. t.Fatalf("Failed to create clientset: %v", err)
  159. }
  160. var gracePeriodSeconds int64 = 30
  161. deleteOption := &metav1.DeleteOptions{
  162. GracePeriodSeconds: &gracePeriodSeconds,
  163. }
  164. pod := newPod("test-terminal-pod1")
  165. if _, err := clientSet.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
  166. t.Errorf("Failed to create pod: %v", err)
  167. }
  168. addPodConditionSucceeded(pod)
  169. if _, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
  170. t.Fatal(err)
  171. }
  172. waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, v1.PodSucceeded)
  173. pdb := newPDB()
  174. if _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Create(context.TODO(), pdb, metav1.CreateOptions{}); err != nil {
  175. t.Errorf("Failed to create PodDisruptionBudget: %v", err)
  176. }
  177. waitPDBStable(t, clientSet, 1, ns.Name, pdb.Name)
  178. pdbList, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).List(context.TODO(), metav1.ListOptions{})
  179. if err != nil {
  180. t.Fatalf("Error while listing pod disruption budget")
  181. }
  182. oldPdb := pdbList.Items[0]
  183. eviction := newEviction(ns.Name, pod.Name, deleteOption)
  184. err = wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
  185. e := clientSet.PolicyV1beta1().Evictions(ns.Name).Evict(eviction)
  186. switch {
  187. case apierrors.IsTooManyRequests(e):
  188. return false, nil
  189. case apierrors.IsConflict(e):
  190. return false, fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e)
  191. case e == nil:
  192. return true, nil
  193. default:
  194. return false, e
  195. }
  196. })
  197. if err != nil {
  198. t.Fatalf("Eviction of pod failed %v", err)
  199. }
  200. pdbList, err = clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).List(context.TODO(), metav1.ListOptions{})
  201. if err != nil {
  202. t.Fatalf("Error while listing pod disruption budget")
  203. }
  204. newPdb := pdbList.Items[0]
  205. // We shouldn't see an update in pod disruption budget status' generation number as we are evicting terminal pods without checking for pod disruption.
  206. if !reflect.DeepEqual(newPdb.Status.ObservedGeneration, oldPdb.Status.ObservedGeneration) {
  207. t.Fatalf("Expected the pdb generation to be of same value %v but got %v", newPdb.Status.ObservedGeneration, oldPdb.Status.ObservedGeneration)
  208. }
  209. if err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Delete(context.TODO(), pdb.Name, deleteOption); err != nil {
  210. t.Fatalf("Failed to delete pod disruption budget")
  211. }
  212. }
  213. func newPod(podName string) *v1.Pod {
  214. return &v1.Pod{
  215. ObjectMeta: metav1.ObjectMeta{
  216. Name: podName,
  217. Labels: map[string]string{"app": "test-evictions"},
  218. },
  219. Spec: v1.PodSpec{
  220. Containers: []v1.Container{
  221. {
  222. Name: "fake-name",
  223. Image: "fakeimage",
  224. },
  225. },
  226. },
  227. }
  228. }
  229. func addPodConditionSucceeded(pod *v1.Pod) {
  230. pod.Status = v1.PodStatus{
  231. Phase: v1.PodSucceeded,
  232. Conditions: []v1.PodCondition{
  233. {
  234. Type: v1.PodReady,
  235. Status: v1.ConditionTrue,
  236. },
  237. },
  238. }
  239. }
  240. func addPodConditionReady(pod *v1.Pod) {
  241. pod.Status = v1.PodStatus{
  242. Phase: v1.PodRunning,
  243. Conditions: []v1.PodCondition{
  244. {
  245. Type: v1.PodReady,
  246. Status: v1.ConditionTrue,
  247. },
  248. },
  249. }
  250. }
  251. func newPDB() *v1beta1.PodDisruptionBudget {
  252. return &v1beta1.PodDisruptionBudget{
  253. ObjectMeta: metav1.ObjectMeta{
  254. Name: "test-pdb",
  255. },
  256. Spec: v1beta1.PodDisruptionBudgetSpec{
  257. MinAvailable: &intstr.IntOrString{
  258. Type: intstr.Int,
  259. IntVal: 0,
  260. },
  261. Selector: &metav1.LabelSelector{
  262. MatchLabels: map[string]string{"app": "test-evictions"},
  263. },
  264. },
  265. }
  266. }
  267. func newEviction(ns, evictionName string, deleteOption *metav1.DeleteOptions) *v1beta1.Eviction {
  268. return &v1beta1.Eviction{
  269. TypeMeta: metav1.TypeMeta{
  270. APIVersion: "Policy/v1beta1",
  271. Kind: "Eviction",
  272. },
  273. ObjectMeta: metav1.ObjectMeta{
  274. Name: evictionName,
  275. Namespace: ns,
  276. },
  277. DeleteOptions: deleteOption,
  278. }
  279. }
  280. func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface) {
  281. masterConfig := framework.NewIntegrationTestMasterConfig()
  282. _, s, closeFn := framework.RunAMaster(masterConfig)
  283. config := restclient.Config{Host: s.URL}
  284. clientSet, err := clientset.NewForConfig(&config)
  285. if err != nil {
  286. t.Fatalf("Error in create clientset: %v", err)
  287. }
  288. resyncPeriod := 12 * time.Hour
  289. informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), resyncPeriod)
  290. client := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller"))
  291. discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
  292. mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
  293. scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
  294. scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
  295. if err != nil {
  296. t.Fatalf("Error in create scaleClient: %v", err)
  297. }
  298. rm := disruption.NewDisruptionController(
  299. informers.Core().V1().Pods(),
  300. informers.Policy().V1beta1().PodDisruptionBudgets(),
  301. informers.Core().V1().ReplicationControllers(),
  302. informers.Apps().V1().ReplicaSets(),
  303. informers.Apps().V1().Deployments(),
  304. informers.Apps().V1().StatefulSets(),
  305. client,
  306. mapper,
  307. scaleClient,
  308. )
  309. return s, closeFn, rm, informers, clientSet
  310. }
  311. // wait for the podInformer to observe the pods. Call this function before
  312. // running the RS controller to prevent the rc manager from creating new pods
  313. // rather than adopting the existing ones.
  314. func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int, phase v1.PodPhase) {
  315. if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
  316. objects := podInformer.GetIndexer().List()
  317. if len(objects) != podNum {
  318. return false, nil
  319. }
  320. for _, obj := range objects {
  321. pod := obj.(*v1.Pod)
  322. if pod.Status.Phase != phase {
  323. return false, nil
  324. }
  325. }
  326. return true, nil
  327. }); err != nil {
  328. t.Fatal(err)
  329. }
  330. }
  331. func waitPDBStable(t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) {
  332. if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
  333. pdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), pdbName, metav1.GetOptions{})
  334. if err != nil {
  335. return false, err
  336. }
  337. if pdb.Status.CurrentHealthy != podNum {
  338. return false, nil
  339. }
  340. return true, nil
  341. }); err != nil {
  342. t.Fatal(err)
  343. }
  344. }