evictions_test.go 12 KB


  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package evictions
  14. import (
  15. "fmt"
  16. "net/http/httptest"
  17. "reflect"
  18. "sync"
  19. "sync/atomic"
  20. "testing"
  21. "time"
  22. "k8s.io/api/core/v1"
  23. "k8s.io/api/policy/v1beta1"
  24. "k8s.io/apimachinery/pkg/api/errors"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  27. "k8s.io/apimachinery/pkg/util/intstr"
  28. "k8s.io/apimachinery/pkg/util/wait"
  29. cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
  30. "k8s.io/client-go/dynamic"
  31. "k8s.io/client-go/informers"
  32. clientset "k8s.io/client-go/kubernetes"
  33. restclient "k8s.io/client-go/rest"
  34. "k8s.io/client-go/restmapper"
  35. "k8s.io/client-go/scale"
  36. "k8s.io/client-go/tools/cache"
  37. "k8s.io/kubernetes/pkg/controller/disruption"
  38. "k8s.io/kubernetes/test/integration/framework"
  39. )
  40. const (
  41. numOfEvictions = 10
  42. )
  43. // TestConcurrentEvictionRequests is to make sure pod disruption budgets (PDB) controller is able to
  44. // handle concurrent eviction requests. Original issue:#37605
  45. func TestConcurrentEvictionRequests(t *testing.T) {
  46. podNameFormat := "test-pod-%d"
  47. s, closeFn, rm, informers, clientSet := rmSetup(t)
  48. defer closeFn()
  49. ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t)
  50. defer framework.DeleteTestingNamespace(ns, s, t)
  51. stopCh := make(chan struct{})
  52. informers.Start(stopCh)
  53. go rm.Run(stopCh)
  54. defer close(stopCh)
  55. config := restclient.Config{Host: s.URL}
  56. clientSet, err := clientset.NewForConfig(&config)
  57. if err != nil {
  58. t.Fatalf("Failed to create clientset: %v", err)
  59. }
  60. var gracePeriodSeconds int64 = 30
  61. deleteOption := &metav1.DeleteOptions{
  62. GracePeriodSeconds: &gracePeriodSeconds,
  63. }
  64. // Generate numOfEvictions pods to evict
  65. for i := 0; i < numOfEvictions; i++ {
  66. podName := fmt.Sprintf(podNameFormat, i)
  67. pod := newPod(podName)
  68. if _, err := clientSet.CoreV1().Pods(ns.Name).Create(pod); err != nil {
  69. t.Errorf("Failed to create pod: %v", err)
  70. }
  71. addPodConditionReady(pod)
  72. if _, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(pod); err != nil {
  73. t.Fatal(err)
  74. }
  75. }
  76. waitToObservePods(t, informers.Core().V1().Pods().Informer(), numOfEvictions, v1.PodRunning)
  77. pdb := newPDB()
  78. if _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil {
  79. t.Errorf("Failed to create PodDisruptionBudget: %v", err)
  80. }
  81. waitPDBStable(t, clientSet, numOfEvictions, ns.Name, pdb.Name)
  82. var numberPodsEvicted uint32
  83. errCh := make(chan error, 3*numOfEvictions)
  84. var wg sync.WaitGroup
  85. // spawn numOfEvictions goroutines to concurrently evict the pods
  86. for i := 0; i < numOfEvictions; i++ {
  87. wg.Add(1)
  88. go func(id int, errCh chan error) {
  89. defer wg.Done()
  90. podName := fmt.Sprintf(podNameFormat, id)
  91. eviction := newEviction(ns.Name, podName, deleteOption)
  92. err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
  93. e := clientSet.PolicyV1beta1().Evictions(ns.Name).Evict(eviction)
  94. switch {
  95. case errors.IsTooManyRequests(e):
  96. return false, nil
  97. case errors.IsConflict(e):
  98. return false, fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e)
  99. case e == nil:
  100. return true, nil
  101. default:
  102. return false, e
  103. }
  104. })
  105. if err != nil {
  106. errCh <- err
  107. // should not return here otherwise we would leak the pod
  108. }
  109. _, err = clientSet.CoreV1().Pods(ns.Name).Get(podName, metav1.GetOptions{})
  110. switch {
  111. case errors.IsNotFound(err):
  112. atomic.AddUint32(&numberPodsEvicted, 1)
  113. // pod was evicted and deleted so return from goroutine immediately
  114. return
  115. case err == nil:
  116. // this shouldn't happen if the pod was evicted successfully
  117. errCh <- fmt.Errorf("Pod %q is expected to be evicted", podName)
  118. default:
  119. errCh <- err
  120. }
  121. // delete pod which still exists due to error
  122. e := clientSet.CoreV1().Pods(ns.Name).Delete(podName, deleteOption)
  123. if e != nil {
  124. errCh <- e
  125. }
  126. }(i, errCh)
  127. }
  128. wg.Wait()
  129. close(errCh)
  130. var errList []error
  131. if err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Delete(pdb.Name, deleteOption); err != nil {
  132. errList = append(errList, fmt.Errorf("Failed to delete PodDisruptionBudget: %v", err))
  133. }
  134. for err := range errCh {
  135. errList = append(errList, err)
  136. }
  137. if len(errList) > 0 {
  138. t.Fatal(utilerrors.NewAggregate(errList))
  139. }
  140. if atomic.LoadUint32(&numberPodsEvicted) != numOfEvictions {
  141. t.Fatalf("fewer number of successful evictions than expected : %d", numberPodsEvicted)
  142. }
  143. }
  144. // TestTerminalPodEviction ensures that PDB is not checked for terminal pods.
  145. func TestTerminalPodEviction(t *testing.T) {
  146. s, closeFn, rm, informers, clientSet := rmSetup(t)
  147. defer closeFn()
  148. ns := framework.CreateTestingNamespace("terminalpod-eviction", s, t)
  149. defer framework.DeleteTestingNamespace(ns, s, t)
  150. stopCh := make(chan struct{})
  151. informers.Start(stopCh)
  152. go rm.Run(stopCh)
  153. defer close(stopCh)
  154. config := restclient.Config{Host: s.URL}
  155. clientSet, err := clientset.NewForConfig(&config)
  156. if err != nil {
  157. t.Fatalf("Failed to create clientset: %v", err)
  158. }
  159. var gracePeriodSeconds int64 = 30
  160. deleteOption := &metav1.DeleteOptions{
  161. GracePeriodSeconds: &gracePeriodSeconds,
  162. }
  163. pod := newPod("test-terminal-pod1")
  164. if _, err := clientSet.CoreV1().Pods(ns.Name).Create(pod); err != nil {
  165. t.Errorf("Failed to create pod: %v", err)
  166. }
  167. addPodConditionSucceeded(pod)
  168. if _, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(pod); err != nil {
  169. t.Fatal(err)
  170. }
  171. waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, v1.PodSucceeded)
  172. pdb := newPDB()
  173. if _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil {
  174. t.Errorf("Failed to create PodDisruptionBudget: %v", err)
  175. }
  176. waitPDBStable(t, clientSet, 1, ns.Name, pdb.Name)
  177. pdbList, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).List(metav1.ListOptions{})
  178. if err != nil {
  179. t.Fatalf("Error while listing pod disruption budget")
  180. }
  181. oldPdb := pdbList.Items[0]
  182. eviction := newEviction(ns.Name, pod.Name, deleteOption)
  183. err = wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
  184. e := clientSet.PolicyV1beta1().Evictions(ns.Name).Evict(eviction)
  185. switch {
  186. case errors.IsTooManyRequests(e):
  187. return false, nil
  188. case errors.IsConflict(e):
  189. return false, fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e)
  190. case e == nil:
  191. return true, nil
  192. default:
  193. return false, e
  194. }
  195. })
  196. if err != nil {
  197. t.Fatalf("Eviction of pod failed %v", err)
  198. }
  199. pdbList, err = clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).List(metav1.ListOptions{})
  200. if err != nil {
  201. t.Fatalf("Error while listing pod disruption budget")
  202. }
  203. newPdb := pdbList.Items[0]
  204. // We shouldn't see an update in pod disruption budget status' generation number as we are evicting terminal pods without checking for pod disruption.
  205. if !reflect.DeepEqual(newPdb.Status.ObservedGeneration, oldPdb.Status.ObservedGeneration) {
  206. t.Fatalf("Expected the pdb generation to be of same value %v but got %v", newPdb.Status.ObservedGeneration, oldPdb.Status.ObservedGeneration)
  207. }
  208. if err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns.Name).Delete(pdb.Name, deleteOption); err != nil {
  209. t.Fatalf("Failed to delete pod disruption budget")
  210. }
  211. }
  212. func newPod(podName string) *v1.Pod {
  213. return &v1.Pod{
  214. ObjectMeta: metav1.ObjectMeta{
  215. Name: podName,
  216. Labels: map[string]string{"app": "test-evictions"},
  217. },
  218. Spec: v1.PodSpec{
  219. Containers: []v1.Container{
  220. {
  221. Name: "fake-name",
  222. Image: "fakeimage",
  223. },
  224. },
  225. },
  226. }
  227. }
  228. func addPodConditionSucceeded(pod *v1.Pod) {
  229. pod.Status = v1.PodStatus{
  230. Phase: v1.PodSucceeded,
  231. Conditions: []v1.PodCondition{
  232. {
  233. Type: v1.PodReady,
  234. Status: v1.ConditionTrue,
  235. },
  236. },
  237. }
  238. }
  239. func addPodConditionReady(pod *v1.Pod) {
  240. pod.Status = v1.PodStatus{
  241. Phase: v1.PodRunning,
  242. Conditions: []v1.PodCondition{
  243. {
  244. Type: v1.PodReady,
  245. Status: v1.ConditionTrue,
  246. },
  247. },
  248. }
  249. }
  250. func newPDB() *v1beta1.PodDisruptionBudget {
  251. return &v1beta1.PodDisruptionBudget{
  252. ObjectMeta: metav1.ObjectMeta{
  253. Name: "test-pdb",
  254. },
  255. Spec: v1beta1.PodDisruptionBudgetSpec{
  256. MinAvailable: &intstr.IntOrString{
  257. Type: intstr.Int,
  258. IntVal: 0,
  259. },
  260. Selector: &metav1.LabelSelector{
  261. MatchLabels: map[string]string{"app": "test-evictions"},
  262. },
  263. },
  264. }
  265. }
  266. func newEviction(ns, evictionName string, deleteOption *metav1.DeleteOptions) *v1beta1.Eviction {
  267. return &v1beta1.Eviction{
  268. TypeMeta: metav1.TypeMeta{
  269. APIVersion: "Policy/v1beta1",
  270. Kind: "Eviction",
  271. },
  272. ObjectMeta: metav1.ObjectMeta{
  273. Name: evictionName,
  274. Namespace: ns,
  275. },
  276. DeleteOptions: deleteOption,
  277. }
  278. }
  279. func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface) {
  280. masterConfig := framework.NewIntegrationTestMasterConfig()
  281. _, s, closeFn := framework.RunAMaster(masterConfig)
  282. config := restclient.Config{Host: s.URL}
  283. clientSet, err := clientset.NewForConfig(&config)
  284. if err != nil {
  285. t.Fatalf("Error in create clientset: %v", err)
  286. }
  287. resyncPeriod := 12 * time.Hour
  288. informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), resyncPeriod)
  289. client := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller"))
  290. discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
  291. mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
  292. scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
  293. scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
  294. if err != nil {
  295. t.Fatalf("Error in create scaleClient: %v", err)
  296. }
  297. rm := disruption.NewDisruptionController(
  298. informers.Core().V1().Pods(),
  299. informers.Policy().V1beta1().PodDisruptionBudgets(),
  300. informers.Core().V1().ReplicationControllers(),
  301. informers.Apps().V1().ReplicaSets(),
  302. informers.Apps().V1().Deployments(),
  303. informers.Apps().V1().StatefulSets(),
  304. client,
  305. mapper,
  306. scaleClient,
  307. )
  308. return s, closeFn, rm, informers, clientSet
  309. }
  310. // wait for the podInformer to observe the pods. Call this function before
  311. // running the RS controller to prevent the rc manager from creating new pods
  312. // rather than adopting the existing ones.
  313. func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int, phase v1.PodPhase) {
  314. if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
  315. objects := podInformer.GetIndexer().List()
  316. if len(objects) != podNum {
  317. return false, nil
  318. }
  319. for _, obj := range objects {
  320. pod := obj.(*v1.Pod)
  321. if pod.Status.Phase != phase {
  322. return false, nil
  323. }
  324. }
  325. return true, nil
  326. }); err != nil {
  327. t.Fatal(err)
  328. }
  329. }
  330. func waitPDBStable(t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) {
  331. if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
  332. pdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns).Get(pdbName, metav1.GetOptions{})
  333. if err != nil {
  334. return false, err
  335. }
  336. if pdb.Status.CurrentHealthy != podNum {
  337. return false, nil
  338. }
  339. return true, nil
  340. }); err != nil {
  341. t.Fatal(err)
  342. }
  343. }