taint_manager.go
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler

import (
	"fmt"
	"hash/fnv"
	"io"
	"sync"
	"time"

	"k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/apis/core/helper"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/klog"
)
const (
	// TODO (k82cn): Figure out a reasonable number of workers/channels and propagate
	// the number of workers up, making it a parameter of the Run() function.

	// NodeUpdateChannelSize defines the size of channel for node update events.
	NodeUpdateChannelSize = 10
	// UpdateWorkerSize defines the number of workers for node and/or pod updates.
	UpdateWorkerSize = 8
	// podUpdateChannelSize defines the size of channel for pod update events.
	podUpdateChannelSize = 1
	// retries is the number of attempts made for API calls (pod deletion and listing) before giving up.
	retries = 5
)
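// nodeUpdateItem carries the name of a Node whose update needs to be processed.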
type nodeUpdateItem struct {
	nodeName string
}
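// podUpdateItem identifies a Pod whose update needs to be processed, together with
// the name of the Node it was assigned to when the update was observed.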
type podUpdateItem struct {
	podName      string
	podNamespace string
	nodeName     string
}
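// hash deterministically maps val to a worker index in [0, max), so that updates
// for the same key are always handled by the same worker.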
func hash(val string, max int) int {
	hasher := fnv.New32a()
	io.WriteString(hasher, val)
	return int(hasher.Sum32() % uint32(max))
}
// GetPodFunc returns the pod for the specified name/namespace, or a NotFound error if missing.
type GetPodFunc func(name, namespace string) (*v1.Pod, error)

// GetNodeFunc returns the node for the specified name, or a NotFound error if missing.
type GetNodeFunc func(name string) (*v1.Node, error)

// NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
	client   clientset.Interface
	recorder record.EventRecorder
	getPod   GetPodFunc
	getNode  GetNodeFunc

	taintEvictionQueue *TimedWorkerQueue
	// keeps a map from nodeName to all noExecute taints on that Node
	taintedNodesLock sync.Mutex
	taintedNodes     map[string][]v1.Taint

	nodeUpdateChannels []chan nodeUpdateItem
	podUpdateChannels  []chan podUpdateItem

	nodeUpdateQueue workqueue.Interface
	podUpdateQueue  workqueue.Interface
}
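// deletePodHandler returns a TimedWorkerQueue handler that deletes the Pod named in the
// WorkArgs, emitting an event via emitEventFunc (if non-nil) and retrying the deletion
// up to `retries` times.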
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		for i := 0; i < retries; i++ {
			err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}
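// getNoExecuteTaints filters the given taints and returns only those with the NoExecute effect.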
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
	result := []v1.Taint{}
	for i := range taints {
		if taints[i].Effect == v1.TaintEffectNoExecute {
			result = append(result, taints[i])
		}
	}
	return result
}
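// getPodsAssignedToNode lists all Pods bound to the given node, retrying the List call
// up to `retries` times on error before giving up.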
func getPodsAssignedToNode(c clientset.Interface, nodeName string) ([]v1.Pod, error) {
	selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
	pods, err := c.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{
		FieldSelector: selector.String(),
		LabelSelector: labels.Everything().String(),
	})
	for i := 0; i < retries && err != nil; i++ {
		pods, err = c.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{
			FieldSelector: selector.String(),
			LabelSelector: labels.Everything().String(),
		})
		time.Sleep(100 * time.Millisecond)
	}
	if err != nil {
		return []v1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName)
	}
	return pods.Items, nil
}
// getMinTolerationTime returns minimal toleration time from the given slice, or -1 if it's infinite.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
	minTolerationTime := int64(-1)
	if len(tolerations) == 0 {
		return 0
	}
	for i := range tolerations {
		if tolerations[i].TolerationSeconds != nil {
			tolerationSeconds := *(tolerations[i].TolerationSeconds)
			if tolerationSeconds <= 0 {
				return 0
			} else if tolerationSeconds < minTolerationTime || minTolerationTime == -1 {
				minTolerationTime = tolerationSeconds
			}
		}
	}
	return time.Duration(minTolerationTime) * time.Second
}
// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc) *NoExecuteTaintManager {
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
	eventBroadcaster.StartLogging(klog.Infof)
	if c != nil {
		klog.V(0).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
	} else {
		klog.Fatalf("kubeClient is nil when starting NodeController")
	}

	tm := &NoExecuteTaintManager{
		client:       c,
		recorder:     recorder,
		getPod:       getPod,
		getNode:      getNode,
		taintedNodes: make(map[string][]v1.Taint),

		nodeUpdateQueue: workqueue.NewNamed("noexec_taint_node"),
		podUpdateQueue:  workqueue.NewNamed("noexec_taint_pod"),
	}
	tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))

	return tm
}
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	klog.V(0).Infof("Starting NoExecuteTaintManager")

	for i := 0; i < UpdateWorkerSize; i++ {
		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
	}

	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(nodeUpdateItem)
			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.nodeUpdateQueue.Done(item)
				return
			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(podUpdateItem)
			hash := hash(podUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.podUpdateQueue.Done(item)
				return
			case tc.podUpdateChannels[hash] <- podUpdate:
				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
			}
		}
	}(stopCh)

	wg := sync.WaitGroup{}
	wg.Add(UpdateWorkerSize)
	for i := 0; i < UpdateWorkerSize; i++ {
		go tc.worker(i, wg.Done, stopCh)
	}
	wg.Wait()
}
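// worker processes items from this worker's node and pod update channels until stopCh is closed,
// draining all pending node updates before handling each pod update.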
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
	defer done()

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			return
		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(nodeUpdate)
			tc.nodeUpdateQueue.Done(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannels[worker]:
			// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
					tc.handleNodeUpdate(nodeUpdate)
					tc.nodeUpdateQueue.Done(nodeUpdate)
				default:
					break priority
				}
			}
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
			tc.podUpdateQueue.Done(podUpdate)
		}
	}
}
// PodUpdated is used to notify NoExecuteTaintManager about Pod changes.
func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
	podName := ""
	podNamespace := ""
	nodeName := ""
	oldTolerations := []v1.Toleration{}
	if oldPod != nil {
		podName = oldPod.Name
		podNamespace = oldPod.Namespace
		nodeName = oldPod.Spec.NodeName
		oldTolerations = oldPod.Spec.Tolerations
	}
	newTolerations := []v1.Toleration{}
	if newPod != nil {
		podName = newPod.Name
		podNamespace = newPod.Namespace
		nodeName = newPod.Spec.NodeName
		newTolerations = newPod.Spec.Tolerations
	}

	if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
		return
	}
	updateItem := podUpdateItem{
		podName:      podName,
		podNamespace: podNamespace,
		nodeName:     nodeName,
	}

	tc.podUpdateQueue.Add(updateItem)
}
// NodeUpdated is used to notify NoExecuteTaintManager about Node changes.
func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
	nodeName := ""
	oldTaints := []v1.Taint{}
	if oldNode != nil {
		nodeName = oldNode.Name
		oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
	}

	newTaints := []v1.Taint{}
	if newNode != nil {
		nodeName = newNode.Name
		newTaints = getNoExecuteTaints(newNode.Spec.Taints)
	}

	if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
		return
	}
	updateItem := nodeUpdateItem{
		nodeName: nodeName,
	}

	tc.nodeUpdateQueue.Add(updateItem)
}
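// cancelWorkWithEvent cancels any scheduled eviction for the given Pod and, if something
// was actually cancelled, emits a corresponding event.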
func (tc *NoExecuteTaintManager) cancelWorkWithEvent(nsName types.NamespacedName) {
	if tc.taintEvictionQueue.CancelWork(nsName.String()) {
		tc.emitCancelPodDeletionEvent(nsName)
	}
}
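// processPodOnNode decides whether the given Pod should be evicted from the Node based on the
// Node's NoExecute taints and the Pod's tolerations, and schedules (or cancels) the eviction
// in the taintEvictionQueue accordingly.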
func (tc *NoExecuteTaintManager) processPodOnNode(
	podNamespacedName types.NamespacedName,
	nodeName string,
	tolerations []v1.Toleration,
	taints []v1.Taint,
	now time.Time,
) {
	if len(taints) == 0 {
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
	if !allTolerated {
		klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
		// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
		tc.cancelWorkWithEvent(podNamespacedName)
		tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
		return
	}
	minTolerationTime := getMinTolerationTime(usedTolerations)
	// getMinTolerationTime returns negative value to denote infinite toleration.
	if minTolerationTime < 0 {
		klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
		return
	}

	startTime := now
	triggerTime := startTime.Add(minTolerationTime)
	scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
	if scheduledEviction != nil {
		startTime = scheduledEviction.CreatedAt
		if startTime.Add(minTolerationTime).Before(triggerTime) {
			return
		}
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
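// handlePodUpdate re-evaluates a single Pod against the currently known NoExecute taints of the
// Node it runs on; if the Pod no longer exists, any pending eviction is cancelled.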
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
	pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
			klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
			tc.cancelWorkWithEvent(podNamespacedName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
		return
	}

	// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
	if pod.Spec.NodeName != podUpdate.nodeName {
		return
	}

	// Create or Update
	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
	nodeName := pod.Spec.NodeName
	if nodeName == "" {
		return
	}
	taints, ok := func() ([]v1.Taint, bool) {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		taints, ok := tc.taintedNodes[nodeName]
		return taints, ok
	}()
	// It's possible that Node was deleted, or Taints were removed before, which triggered
	// eviction cancelling if it was needed.
	if !ok {
		return
	}
	tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
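// handleNodeUpdate refreshes the cached NoExecute taints for a Node and re-evaluates every Pod
// assigned to it, scheduling or cancelling evictions as needed. A deleted Node is removed from
// the cache.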
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
	node, err := tc.getNode(nodeUpdate.nodeName)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
			tc.taintedNodesLock.Lock()
			defer tc.taintedNodesLock.Unlock()
			delete(tc.taintedNodes, nodeUpdate.nodeName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
		return
	}

	// Create or Update
	klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
	taints := getNoExecuteTaints(node.Spec.Taints)
	func() {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
		if len(taints) == 0 {
			delete(tc.taintedNodes, node.Name)
		} else {
			tc.taintedNodes[node.Name] = taints
		}
	}()

	pods, err := getPodsAssignedToNode(tc.client, node.Name)
	if err != nil {
		klog.Errorf(err.Error())
		return
	}
	if len(pods) == 0 {
		return
	}
	// Short circuit, to make this controller a bit faster.
	if len(taints) == 0 {
		klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
		for i := range pods {
			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
		}
		return
	}

	now := time.Now()
	for i := range pods {
		pod := &pods[i]
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
	}
}
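// emitPodDeletionEvent records an event noting that the Pod has been marked for deletion
// by the taint manager. It is a no-op when no recorder is configured.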
func (tc *NoExecuteTaintManager) emitPodDeletionEvent(nsName types.NamespacedName) {
	if tc.recorder == nil {
		return
	}
	ref := &v1.ObjectReference{
		Kind:      "Pod",
		Name:      nsName.Name,
		Namespace: nsName.Namespace,
	}
	tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
}
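// emitCancelPodDeletionEvent records an event noting that a previously scheduled deletion of
// the Pod has been cancelled. It is a no-op when no recorder is configured.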
func (tc *NoExecuteTaintManager) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
	if tc.recorder == nil {
		return
	}
	ref := &v1.ObjectReference{
		Kind:      "Pod",
		Name:      nsName.Name,
		Namespace: nsName.Namespace,
	}
	tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
}