taint_manager.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"context"
	"fmt"
	"hash/fnv"
	"io"
	"math"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/apis/core/helper"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
)

const (
	// TODO (k82cn): Figure out a reasonable number of workers/channels and propagate
	// the number of workers up, making it a parameter of the Run() function.

	// NodeUpdateChannelSize defines the size of the channel for node update events.
	NodeUpdateChannelSize = 10
	// UpdateWorkerSize defines the number of workers handling node and/or pod updates.
	UpdateWorkerSize     = 8
	podUpdateChannelSize = 1
	retries              = 5
)

type nodeUpdateItem struct {
	nodeName string
}

type podUpdateItem struct {
	podName      string
	podNamespace string
	nodeName     string
}

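// hash maps a string key onto one of `max` update workers, so that all updates
// for the same node name are always handled by the same worker.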
func hash(val string, max int) int {
	hasher := fnv.New32a()
	io.WriteString(hasher, val)
	return int(hasher.Sum32() % uint32(max))
}

// GetPodFunc returns the pod for the specified name/namespace, or a NotFound error if missing.
type GetPodFunc func(name, namespace string) (*v1.Pod, error)

// GetNodeFunc returns the node for the specified name, or a NotFound error if missing.
type GetNodeFunc func(name string) (*v1.Node, error)

// GetPodsByNodeNameFunc returns the list of pods assigned to the specified node.
type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)

// NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
	client                clientset.Interface
	recorder              record.EventRecorder
	getPod                GetPodFunc
	getNode               GetNodeFunc
	getPodsAssignedToNode GetPodsByNodeNameFunc

	taintEvictionQueue *TimedWorkerQueue
	// keeps a map from nodeName to all noExecute taints on that Node
	taintedNodesLock sync.Mutex
	taintedNodes     map[string][]v1.Taint

	nodeUpdateChannels []chan nodeUpdateItem
	podUpdateChannels  []chan podUpdateItem

	nodeUpdateQueue workqueue.Interface
	podUpdateQueue  workqueue.Interface
}

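// deletePodHandler returns a work function that deletes the given Pod via the API server,
// retrying up to `retries` times and, if emitEventFunc is non-nil, emitting an eviction
// event before the first attempt.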
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		for i := 0; i < retries; i++ {
			err = c.CoreV1().Pods(ns).Delete(context.TODO(), name, metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}

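// getNoExecuteTaints filters the given taints and returns only those with the NoExecute effect.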
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
	result := []v1.Taint{}
	for i := range taints {
		if taints[i].Effect == v1.TaintEffectNoExecute {
			result = append(result, taints[i])
		}
	}
	return result
}

// getMinTolerationTime returns the minimum toleration time from the given slice, or -1 if it is infinite.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
	minTolerationTime := int64(math.MaxInt64)
	if len(tolerations) == 0 {
		return 0
	}

	for i := range tolerations {
		if tolerations[i].TolerationSeconds != nil {
			tolerationSeconds := *(tolerations[i].TolerationSeconds)
			if tolerationSeconds <= 0 {
				return 0
			} else if tolerationSeconds < minTolerationTime {
				minTolerationTime = tolerationSeconds
			}
		}
	}

	if minTolerationTime == int64(math.MaxInt64) {
		return -1
	}
	return time.Duration(minTolerationTime) * time.Second
}

// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use the passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
	eventBroadcaster.StartLogging(klog.Infof)
	if c != nil {
		klog.V(0).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
	} else {
		klog.Fatalf("kubeClient is nil when starting NodeController")
	}

	tm := &NoExecuteTaintManager{
		client:                c,
		recorder:              recorder,
		getPod:                getPod,
		getNode:               getNode,
		getPodsAssignedToNode: getPodsAssignedToNode,
		taintedNodes:          make(map[string][]v1.Taint),

		nodeUpdateQueue: workqueue.NewNamed("noexec_taint_node"),
		podUpdateQueue:  workqueue.NewNamed("noexec_taint_pod"),
	}
	tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))

	return tm
}

// Run starts the NoExecuteTaintManager, which will run in a loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	klog.V(0).Infof("Starting NoExecuteTaintManager")

	for i := 0; i < UpdateWorkerSize; i++ {
		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
	}

	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(nodeUpdateItem)
			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.nodeUpdateQueue.Done(item)
				return
			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			// The fact that pods are processed by the same worker as nodes is used to avoid races
			// between the node worker setting tc.taintedNodes and the pod worker reading it to decide
			// whether to delete the pod.
			// It's possible that even without this assumption this code is still correct.
			podUpdate := item.(podUpdateItem)
			hash := hash(podUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.podUpdateQueue.Done(item)
				return
			case tc.podUpdateChannels[hash] <- podUpdate:
				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
			}
		}
	}(stopCh)

	wg := sync.WaitGroup{}
	wg.Add(UpdateWorkerSize)
	for i := 0; i < UpdateWorkerSize; i++ {
		go tc.worker(i, wg.Done, stopCh)
	}
	wg.Wait()
}

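// worker processes node and pod updates from this worker's channels until stopCh is closed,
// always draining pending node updates before handling a pod update.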
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
	defer done()

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want the user (or the system) to wait until the PodUpdate queue is drained before
	// we can start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			return
		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(nodeUpdate)
			tc.nodeUpdateQueue.Done(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannels[worker]:
			// If we found a Pod update we need to empty the Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
					tc.handleNodeUpdate(nodeUpdate)
					tc.nodeUpdateQueue.Done(nodeUpdate)
				default:
					break priority
				}
			}
			// After the Node queue is emptied we process the podUpdate.
			tc.handlePodUpdate(podUpdate)
			tc.podUpdateQueue.Done(podUpdate)
		}
	}
}

// PodUpdated is used to notify NoExecuteTaintManager about Pod changes.
func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
	podName := ""
	podNamespace := ""
	nodeName := ""
	oldTolerations := []v1.Toleration{}
	if oldPod != nil {
		podName = oldPod.Name
		podNamespace = oldPod.Namespace
		nodeName = oldPod.Spec.NodeName
		oldTolerations = oldPod.Spec.Tolerations
	}
	newTolerations := []v1.Toleration{}
	if newPod != nil {
		podName = newPod.Name
		podNamespace = newPod.Namespace
		nodeName = newPod.Spec.NodeName
		newTolerations = newPod.Spec.Tolerations
	}

	if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
		return
	}
	updateItem := podUpdateItem{
		podName:      podName,
		podNamespace: podNamespace,
		nodeName:     nodeName,
	}

	tc.podUpdateQueue.Add(updateItem)
}

// NodeUpdated is used to notify NoExecuteTaintManager about Node changes.
func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
	nodeName := ""
	oldTaints := []v1.Taint{}
	if oldNode != nil {
		nodeName = oldNode.Name
		oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
	}

	newTaints := []v1.Taint{}
	if newNode != nil {
		nodeName = newNode.Name
		newTaints = getNoExecuteTaints(newNode.Spec.Taints)
	}

	if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
		return
	}
	updateItem := nodeUpdateItem{
		nodeName: nodeName,
	}

	tc.nodeUpdateQueue.Add(updateItem)
}

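// cancelWorkWithEvent cancels any scheduled eviction for the given Pod and emits a
// cancellation event when work was actually cancelled.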
func (tc *NoExecuteTaintManager) cancelWorkWithEvent(nsName types.NamespacedName) {
	if tc.taintEvictionQueue.CancelWork(nsName.String()) {
		tc.emitCancelPodDeletionEvent(nsName)
	}
}

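// processPodOnNode schedules, reschedules, or cancels the eviction of a Pod, based on whether
// its tolerations match the node's NoExecute taints and on the minimum toleration time.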
func (tc *NoExecuteTaintManager) processPodOnNode(
	podNamespacedName types.NamespacedName,
	nodeName string,
	tolerations []v1.Toleration,
	taints []v1.Taint,
	now time.Time,
) {
	if len(taints) == 0 {
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
	if !allTolerated {
		klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
		// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
		tc.cancelWorkWithEvent(podNamespacedName)
		tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
		return
	}
	minTolerationTime := getMinTolerationTime(usedTolerations)
	// getMinTolerationTime returns a negative value to denote infinite toleration.
	if minTolerationTime < 0 {
		klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
		return
	}

	startTime := now
	triggerTime := startTime.Add(minTolerationTime)
	scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
	if scheduledEviction != nil {
		startTime = scheduledEviction.CreatedAt
		if startTime.Add(minTolerationTime).Before(triggerTime) {
			return
		}
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}

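// handlePodUpdate looks up the Pod and, if it is still assigned to the node it was queued for,
// re-evaluates its eviction against the currently known NoExecute taints of that node.
// A NotFound error cancels any pending eviction for the Pod.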
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
	pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
			klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
			tc.cancelWorkWithEvent(podNamespacedName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
		return
	}

	// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
	if pod.Spec.NodeName != podUpdate.nodeName {
		return
	}

	// Create or Update
	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
	nodeName := pod.Spec.NodeName
	if nodeName == "" {
		return
	}
	taints, ok := func() ([]v1.Taint, bool) {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		taints, ok := tc.taintedNodes[nodeName]
		return taints, ok
	}()
	// It's possible that the Node was deleted, or its Taints were removed earlier, which already
	// triggered eviction cancelling if it was needed.
	if !ok {
		return
	}
	tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}

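// handleNodeUpdate refreshes the cached NoExecute taints for the node and then schedules or
// cancels evictions for every Pod currently assigned to it. A NotFound error removes the node
// from the taint cache.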
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
	node, err := tc.getNode(nodeUpdate.nodeName)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
			tc.taintedNodesLock.Lock()
			defer tc.taintedNodesLock.Unlock()
			delete(tc.taintedNodes, nodeUpdate.nodeName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
		return
	}

	// Create or Update
	klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
	taints := getNoExecuteTaints(node.Spec.Taints)
	func() {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
		if len(taints) == 0 {
			delete(tc.taintedNodes, node.Name)
		} else {
			tc.taintedNodes[node.Name] = taints
		}
	}()

	// It is critical that we update tc.taintedNodes before we call getPodsAssignedToNode:
	// getPodsAssignedToNode can be delayed as long as all future updates to pods will call
	// tc.PodUpdated, which will use tc.taintedNodes to potentially delete delayed pods.
	pods, err := tc.getPodsAssignedToNode(node.Name)
	if err != nil {
		klog.Errorf(err.Error())
		return
	}
	if len(pods) == 0 {
		return
	}
	// Short circuit, to make this controller a bit faster.
	if len(taints) == 0 {
		klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
		for i := range pods {
			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
		}
		return
	}

	now := time.Now()
	for _, pod := range pods {
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
	}
}

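// emitPodDeletionEvent records an event announcing that the Pod has been marked for deletion
// by the taint manager.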
func (tc *NoExecuteTaintManager) emitPodDeletionEvent(nsName types.NamespacedName) {
	if tc.recorder == nil {
		return
	}
	ref := &v1.ObjectReference{
		Kind:      "Pod",
		Name:      nsName.Name,
		Namespace: nsName.Namespace,
	}
	tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
}

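// emitCancelPodDeletionEvent records an event announcing that a previously scheduled deletion
// of the Pod has been cancelled.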
func (tc *NoExecuteTaintManager) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
	if tc.recorder == nil {
		return
	}
	ref := &v1.ObjectReference{
		Kind:      "Pod",
		Name:      nsName.Name,
		Namespace: nsName.Namespace,
	}
	tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
}