gc_controller.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package podgc
  14. import (
  15. "context"
  16. "sort"
  17. "sync"
  18. "time"
  19. v1 "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/labels"
  23. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  24. "k8s.io/apimachinery/pkg/util/sets"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. coreinformers "k8s.io/client-go/informers/core/v1"
  27. clientset "k8s.io/client-go/kubernetes"
  28. corelisters "k8s.io/client-go/listers/core/v1"
  29. "k8s.io/client-go/tools/cache"
  30. "k8s.io/client-go/util/workqueue"
  31. "k8s.io/component-base/metrics/prometheus/ratelimiter"
  32. "k8s.io/klog"
  33. )
  34. const (
  35. // gcCheckPeriod defines frequency of running main controller loop
  36. gcCheckPeriod = 20 * time.Second
  37. // quarantineTime defines how long Orphaned GC waits for nodes to show up
  38. // in an informer before issuing a GET call to check if they are truly gone
  39. quarantineTime = 40 * time.Second
  40. )
  41. type PodGCController struct {
  42. kubeClient clientset.Interface
  43. podLister corelisters.PodLister
  44. podListerSynced cache.InformerSynced
  45. nodeLister corelisters.NodeLister
  46. nodeListerSynced cache.InformerSynced
  47. nodeQueue workqueue.DelayingInterface
  48. deletePod func(namespace, name string) error
  49. terminatedPodThreshold int
  50. }
  51. func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInformer,
  52. nodeInformer coreinformers.NodeInformer, terminatedPodThreshold int) *PodGCController {
  53. if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
  54. ratelimiter.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
  55. }
  56. gcc := &PodGCController{
  57. kubeClient: kubeClient,
  58. terminatedPodThreshold: terminatedPodThreshold,
  59. podLister: podInformer.Lister(),
  60. podListerSynced: podInformer.Informer().HasSynced,
  61. nodeLister: nodeInformer.Lister(),
  62. nodeListerSynced: nodeInformer.Informer().HasSynced,
  63. nodeQueue: workqueue.NewNamedDelayingQueue("orphaned_pods_nodes"),
  64. deletePod: func(namespace, name string) error {
  65. klog.Infof("PodGC is force deleting Pod: %v/%v", namespace, name)
  66. return kubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.NewDeleteOptions(0))
  67. },
  68. }
  69. return gcc
  70. }
  71. func (gcc *PodGCController) Run(stop <-chan struct{}) {
  72. defer utilruntime.HandleCrash()
  73. klog.Infof("Starting GC controller")
  74. defer gcc.nodeQueue.ShutDown()
  75. defer klog.Infof("Shutting down GC controller")
  76. if !cache.WaitForNamedCacheSync("GC", stop, gcc.podListerSynced, gcc.nodeListerSynced) {
  77. return
  78. }
  79. go wait.Until(gcc.gc, gcCheckPeriod, stop)
  80. <-stop
  81. }
  82. func (gcc *PodGCController) gc() {
  83. pods, err := gcc.podLister.List(labels.Everything())
  84. if err != nil {
  85. klog.Errorf("Error while listing all pods: %v", err)
  86. return
  87. }
  88. nodes, err := gcc.nodeLister.List(labels.Everything())
  89. if err != nil {
  90. klog.Errorf("Error while listing all nodes: %v", err)
  91. return
  92. }
  93. if gcc.terminatedPodThreshold > 0 {
  94. gcc.gcTerminated(pods)
  95. }
  96. gcc.gcOrphaned(pods, nodes)
  97. gcc.gcUnscheduledTerminating(pods)
  98. }
  99. func isPodTerminated(pod *v1.Pod) bool {
  100. if phase := pod.Status.Phase; phase != v1.PodPending && phase != v1.PodRunning && phase != v1.PodUnknown {
  101. return true
  102. }
  103. return false
  104. }
  105. func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
  106. terminatedPods := []*v1.Pod{}
  107. for _, pod := range pods {
  108. if isPodTerminated(pod) {
  109. terminatedPods = append(terminatedPods, pod)
  110. }
  111. }
  112. terminatedPodCount := len(terminatedPods)
  113. deleteCount := terminatedPodCount - gcc.terminatedPodThreshold
  114. if deleteCount > terminatedPodCount {
  115. deleteCount = terminatedPodCount
  116. }
  117. if deleteCount <= 0 {
  118. return
  119. }
  120. klog.Infof("garbage collecting %v pods", deleteCount)
  121. // sort only when necessary
  122. sort.Sort(byCreationTimestamp(terminatedPods))
  123. var wait sync.WaitGroup
  124. for i := 0; i < deleteCount; i++ {
  125. wait.Add(1)
  126. go func(namespace string, name string) {
  127. defer wait.Done()
  128. if err := gcc.deletePod(namespace, name); err != nil {
  129. // ignore not founds
  130. defer utilruntime.HandleError(err)
  131. }
  132. }(terminatedPods[i].Namespace, terminatedPods[i].Name)
  133. }
  134. wait.Wait()
  135. }
  136. // gcOrphaned deletes pods that are bound to nodes that don't exist.
  137. func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod, nodes []*v1.Node) {
  138. klog.V(4).Infof("GC'ing orphaned")
  139. existingNodeNames := sets.NewString()
  140. for _, node := range nodes {
  141. existingNodeNames.Insert(node.Name)
  142. }
  143. // Add newly found unknown nodes to quarantine
  144. for _, pod := range pods {
  145. if pod.Spec.NodeName != "" && !existingNodeNames.Has(pod.Spec.NodeName) {
  146. gcc.nodeQueue.AddAfter(pod.Spec.NodeName, quarantineTime)
  147. }
  148. }
  149. // Check if nodes are still missing after quarantine period
  150. deletedNodesNames, quit := gcc.discoverDeletedNodes(existingNodeNames)
  151. if quit {
  152. return
  153. }
  154. // Delete orphaned pods
  155. for _, pod := range pods {
  156. if !deletedNodesNames.Has(pod.Spec.NodeName) {
  157. continue
  158. }
  159. klog.V(2).Infof("Found orphaned Pod %v/%v assigned to the Node %v. Deleting.", pod.Namespace, pod.Name, pod.Spec.NodeName)
  160. if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
  161. utilruntime.HandleError(err)
  162. } else {
  163. klog.V(0).Infof("Forced deletion of orphaned Pod %v/%v succeeded", pod.Namespace, pod.Name)
  164. }
  165. }
  166. }
  167. func (gcc *PodGCController) discoverDeletedNodes(existingNodeNames sets.String) (sets.String, bool) {
  168. deletedNodesNames := sets.NewString()
  169. for gcc.nodeQueue.Len() > 0 {
  170. item, quit := gcc.nodeQueue.Get()
  171. if quit {
  172. return nil, true
  173. }
  174. nodeName := item.(string)
  175. if !existingNodeNames.Has(nodeName) {
  176. exists, err := gcc.checkIfNodeExists(nodeName)
  177. switch {
  178. case err != nil:
  179. klog.Errorf("Error while getting node %q: %v", nodeName, err)
  180. // Node will be added back to the queue in the subsequent loop if still needed
  181. case !exists:
  182. deletedNodesNames.Insert(nodeName)
  183. }
  184. }
  185. gcc.nodeQueue.Done(item)
  186. }
  187. return deletedNodesNames, false
  188. }
  189. func (gcc *PodGCController) checkIfNodeExists(name string) (bool, error) {
  190. _, fetchErr := gcc.kubeClient.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
  191. if errors.IsNotFound(fetchErr) {
  192. return false, nil
  193. }
  194. return fetchErr == nil, fetchErr
  195. }
  196. // gcUnscheduledTerminating deletes pods that are terminating and haven't been scheduled to a particular node.
  197. func (gcc *PodGCController) gcUnscheduledTerminating(pods []*v1.Pod) {
  198. klog.V(4).Infof("GC'ing unscheduled pods which are terminating.")
  199. for _, pod := range pods {
  200. if pod.DeletionTimestamp == nil || len(pod.Spec.NodeName) > 0 {
  201. continue
  202. }
  203. klog.V(2).Infof("Found unscheduled terminating Pod %v/%v not assigned to any Node. Deleting.", pod.Namespace, pod.Name)
  204. if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
  205. utilruntime.HandleError(err)
  206. } else {
  207. klog.V(0).Infof("Forced deletion of unscheduled terminating Pod %v/%v succeeded", pod.Namespace, pod.Name)
  208. }
  209. }
  210. }
  211. // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
  212. type byCreationTimestamp []*v1.Pod
  213. func (o byCreationTimestamp) Len() int { return len(o) }
  214. func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
  215. func (o byCreationTimestamp) Less(i, j int) bool {
  216. if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
  217. return o[i].Name < o[j].Name
  218. }
  219. return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
  220. }