scheduling_queue.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// This file contains structures that implement scheduling queue types.
// Scheduling queues hold pods waiting to be scheduled. This file implements a
// priority queue which has two sub-queues. One sub-queue holds pods that are
// being considered for scheduling. This is called activeQ. Another queue holds
// pods that have already been tried and are determined to be unschedulable.
// The latter is called unschedulableQ.
package queue

import (
	"fmt"
	"reflect"
	"sync"
	"time"

	"k8s.io/klog"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ktypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	"k8s.io/kubernetes/pkg/scheduler/internal/heap"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

const (
	// If a pod stays in unschedulableQ longer than unschedulableQTimeInterval,
	// the pod will be moved from unschedulableQ to activeQ.
	unschedulableQTimeInterval = 60 * time.Second

	queueClosed = "scheduling queue is closed"
)

const (
	// DefaultPodInitialBackoffDuration is the default value for the initial backoff duration
	// for unschedulable pods. To change the default podInitialBackoffDurationSeconds used by the
	// scheduler, update the ComponentConfig value in defaults.go.
	DefaultPodInitialBackoffDuration time.Duration = 1 * time.Second
	// DefaultPodMaxBackoffDuration is the default value for the max backoff duration
	// for unschedulable pods. To change the default podMaxBackoffDurationSeconds used by the
	// scheduler, update the ComponentConfig value in defaults.go.
	DefaultPodMaxBackoffDuration time.Duration = 10 * time.Second
)

// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
	Add(pod *v1.Pod) error
	// AddUnschedulableIfNotPresent adds an unschedulable pod back to the scheduling queue.
	// The podSchedulingCycle represents the current scheduling cycle number which can be
	// returned by calling SchedulingCycle().
	AddUnschedulableIfNotPresent(pod *framework.PodInfo, podSchedulingCycle int64) error
	// SchedulingCycle returns the current number of the scheduling cycle which is
	// cached by the scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (e.g. by calling Pop()) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the queue and returns it. It blocks if the
	// queue is empty and waits until a new item is added to the queue.
	Pop() (*framework.PodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveOrBackoffQueue(event string)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	NominatedPodsForNode(nodeName string) []*v1.Pod
	PendingPods() []*v1.Pod
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// UpdateNominatedPodForNode adds the given pod to the nominated pod map or
	// updates it if it already exists.
	UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
	// DeleteNominatedPodIfExists deletes nominatedPod from the internal cache.
	DeleteNominatedPodIfExists(pod *v1.Pod)
	// NumUnschedulablePods returns the number of unschedulable pods that exist
	// in the SchedulingQueue.
	NumUnschedulablePods() int
	// Run starts the goroutines managing the queue.
	Run()
}
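
// A minimal consumption sketch for the interface above (illustrative only; the
// scheduler's real event loop lives elsewhere, and scheduleOne is an assumed
// helper, not part of this package):
//
//	q.Run()
//	for {
//		podInfo, err := q.Pop() // blocks until a pod is available
//		if err != nil {
//			break // queue was closed
//		}
//		cycle := q.SchedulingCycle()
//		if err := scheduleOne(podInfo.Pod); err != nil {
//			// Unschedulable: hand the pod back along with the cycle number
//			// so a concurrent move request can be detected.
//			q.AddUnschedulableIfNotPresent(podInfo, cycle)
//		}
//	}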
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(fwk framework.Framework, opts ...Option) SchedulingQueue {
	return NewPriorityQueue(fwk, opts...)
}

// NominatedNodeName returns the nominated node name of a Pod.
func NominatedNodeName(pod *v1.Pod) string {
	return pod.Status.NominatedNodeName
}

// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has three sub-queues. One sub-queue holds pods that are being considered for
// scheduling. This is called activeQ and is a Heap. Another queue holds
// pods that have already been tried and are determined to be unschedulable. The
// latter is called unschedulableQ. The third queue holds pods that are moved from
// unschedulable queues and will be moved to the active queue when backoff is
// completed.
type PriorityQueue struct {
	stop  chan struct{}
	clock util.Clock

	// pod initial backoff duration.
	podInitialBackoffDuration time.Duration
	// pod maximum backoff duration.
	podMaxBackoffDuration time.Duration

	lock sync.RWMutex
	cond sync.Cond

	// activeQ is a heap structure that the scheduler actively looks at to find
	// pods to schedule. The head of the heap is the highest priority pod.
	activeQ *heap.Heap
	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ.
	podBackoffQ *heap.Heap
	// unschedulableQ holds pods that have been tried and determined unschedulable.
	unschedulableQ *UnschedulablePodsMap
	// nominatedPods is a structure that stores pods which are nominated to run
	// on nodes.
	nominatedPods *nominatedPodMap
	// schedulingCycle represents the sequence number of the scheduling cycle and
	// is incremented when a pod is popped.
	schedulingCycle int64
	// moveRequestCycle caches the sequence number of the scheduling cycle when we
	// received a move request. Unschedulable pods in and before this scheduling
	// cycle will be put back to activeQ if we were trying to schedule them
	// when we received the move request.
	moveRequestCycle int64

	// closed indicates that the queue is closed.
	// It is mainly used to let Pop() exit its control loop while waiting for an item.
	closed bool
}

type priorityQueueOptions struct {
	clock                     util.Clock
	podInitialBackoffDuration time.Duration
	podMaxBackoffDuration     time.Duration
}

// Option configures a PriorityQueue.
type Option func(*priorityQueueOptions)

// WithClock sets the clock for PriorityQueue. The default clock is util.RealClock.
func WithClock(clock util.Clock) Option {
	return func(o *priorityQueueOptions) {
		o.clock = clock
	}
}

// WithPodInitialBackoffDuration sets the pod initial backoff duration for PriorityQueue.
func WithPodInitialBackoffDuration(duration time.Duration) Option {
	return func(o *priorityQueueOptions) {
		o.podInitialBackoffDuration = duration
	}
}

// WithPodMaxBackoffDuration sets the pod max backoff duration for PriorityQueue.
func WithPodMaxBackoffDuration(duration time.Duration) Option {
	return func(o *priorityQueueOptions) {
		o.podMaxBackoffDuration = duration
	}
}

var defaultPriorityQueueOptions = priorityQueueOptions{
	clock:                     util.RealClock{},
	podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
	podMaxBackoffDuration:     DefaultPodMaxBackoffDuration,
}

// Making sure that PriorityQueue implements SchedulingQueue.
var _ SchedulingQueue = &PriorityQueue{}

// newPodInfoNoTimestamp builds a PodInfo object without a timestamp.
func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
	return &framework.PodInfo{
		Pod: pod,
	}
}

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
	fwk framework.Framework,
	opts ...Option,
) *PriorityQueue {
	options := defaultPriorityQueueOptions
	for _, opt := range opts {
		opt(&options)
	}

	comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.PodInfo)
		pInfo2 := podInfo2.(*framework.PodInfo)
		return fwk.QueueSortFunc()(pInfo1, pInfo2)
	}

	pq := &PriorityQueue{
		clock:                     options.clock,
		stop:                      make(chan struct{}),
		podInitialBackoffDuration: options.podInitialBackoffDuration,
		podMaxBackoffDuration:     options.podMaxBackoffDuration,
		activeQ:                   heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulableQ:            newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
		nominatedPods:             newNominatedPodMap(),
		moveRequestCycle:          -1,
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())

	return pq
}
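
// A construction sketch (illustrative; fwk is assumed to be an initialized
// framework.Framework). Options override the defaults declared above:
//
//	q := NewPriorityQueue(fwk,
//		WithPodInitialBackoffDuration(2*time.Second),
//		WithPodMaxBackoffDuration(30*time.Second),
//	)
//	q.Run() // start the background flushing goroutines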
// Run starts the goroutines that pump pods from podBackoffQ and unschedulableQ
// to activeQ.
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

// Add adds a pod to the active queue. It should be called only when a new pod
// is added, so there is no chance the pod is already in the active/unschedulable/backoff queues.
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pInfo := p.newPodInfo(pod)
	if err := p.activeQ.Add(pInfo); err != nil {
		klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
		return err
	}
	if p.unschedulableQ.get(pod) != nil {
		klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
		p.unschedulableQ.delete(pod)
	}
	// Delete the pod from backoffQ if it is backing off.
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
	}
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.nominatedPods.add(pod, "")
	p.cond.Broadcast()
	return nil
}

// nsNameForPod returns a NamespacedName for a pod.
func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
	return ktypes.NamespacedName{
		Namespace: pod.Namespace,
		Name:      pod.Name,
	}
}

// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.PodInfo) bool {
	boTime := p.getBackoffTime(podInfo)
	return boTime.After(p.clock.Now())
}

// SchedulingCycle returns the current scheduling cycle.
func (p *PriorityQueue) SchedulingCycle() int64 {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.schedulingCycle
}

// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulableQ`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, podSchedulingCycle int64) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pod := pInfo.Pod
	if p.unschedulableQ.get(pod) != nil {
		return fmt.Errorf("pod: %v is already present in unschedulable queue", nsNameForPod(pod))
	}

	// Refresh the timestamp since the pod is re-added.
	pInfo.Timestamp = p.clock.Now()
	if _, exists, _ := p.activeQ.Get(pInfo); exists {
		return fmt.Errorf("pod: %v is already present in the active queue", nsNameForPod(pod))
	}
	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
		return fmt.Errorf("pod %v is already present in the backoff queue", nsNameForPod(pod))
	}

	// If a move request has been received, move it to the BackoffQ, otherwise move
	// it to unschedulableQ.
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
		metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
	}

	p.nominatedPods.add(pod, "")
	return nil
}
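
// To make the moveRequestCycle check above concrete, a hypothetical timeline
// (the cycle numbers are illustrative):
//
//	cycle 7: Pop() returns pod P; the scheduler starts trying P.
//	cycle 7: a node is added, MoveAllToActiveOrBackoffQueue runs and sets
//	         moveRequestCycle = 7.
//	later:   P fails and AddUnschedulableIfNotPresent(pInfo, 7) is called.
//	         Since moveRequestCycle (7) >= podSchedulingCycle (7), P goes to
//	         podBackoffQ instead of unschedulableQ, so the cluster event that
//	         arrived mid-attempt is not lost.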
// flushBackoffQCompleted moves all pods from backoffQ which have completed
// backoff to activeQ.
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()

	for {
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			return
		}
		pod := rawPodInfo.(*framework.PodInfo).Pod
		boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo))
		if boTime.After(p.clock.Now()) {
			return
		}
		_, err := p.podBackoffQ.Pop()
		if err != nil {
			klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
			return
		}
		p.activeQ.Add(rawPodInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
		defer p.cond.Broadcast()
	}
}

// flushUnschedulableQLeftover moves pods which stay in unschedulableQ longer
// than unschedulableQTimeInterval to activeQ.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.PodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		lastScheduleTime := pInfo.Timestamp
		if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}

// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments the scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.PodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// When the queue is empty, invocation of Pop() is blocked until a new item is enqueued.
		// When Close() is called, p.closed is set and the condition is broadcast,
		// which causes this loop to continue and return from Pop().
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.PodInfo)
	pInfo.Attempts++
	p.schedulingCycle++
	return pInfo, err
}

// isPodUpdated checks if the pod is updated in a way that it may have become
// schedulable. It drops the status of the pod and compares it with the old version.
func isPodUpdated(oldPod, newPod *v1.Pod) bool {
	strip := func(pod *v1.Pod) *v1.Pod {
		p := pod.DeepCopy()
		p.ResourceVersion = ""
		p.Generation = 0
		p.Status = v1.PodStatus{}
		return p
	}
	return !reflect.DeepEqual(strip(oldPod), strip(newPod))
}
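
// An illustrative sketch of the distinction (oldPod is some *v1.Pod):
//
//	updated := oldPod.DeepCopy()
//	updated.ResourceVersion = "12345" // stripped before comparing
//	_ = isPodUpdated(oldPod, updated) // false: metadata/status-only change
//	updated.Spec.Tolerations = append(updated.Spec.Tolerations,
//		v1.Toleration{Operator: v1.TolerationOpExists})
//	_ = isPodUpdated(oldPod, updated) // true: the spec changed, so the pod
//	                                  // may have become schedulable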
// Update updates a pod in the active or backoff queue if present. Otherwise, it
// removes the item from the unschedulable queue if the pod is updated in a way
// that it may become schedulable and adds the updated one to the active queue.
// If the pod is not present in any of the queues, it is added to the active queue.
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if oldPod != nil {
		oldPodInfo := newPodInfoNoTimestamp(oldPod)
		// If the pod is already in the active queue, just update it there.
		if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
			p.nominatedPods.update(oldPod, newPod)
			err := p.activeQ.Update(updatePod(oldPodInfo, newPod))
			return err
		}

		// If the pod is in the backoff queue, update it there.
		if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
			p.nominatedPods.update(oldPod, newPod)
			p.podBackoffQ.Delete(oldPodInfo)
			err := p.activeQ.Add(updatePod(oldPodInfo, newPod))
			if err == nil {
				p.cond.Broadcast()
			}
			return err
		}
	}

	// If the pod is in the unschedulable queue, updating it may make it schedulable.
	if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
		p.nominatedPods.update(oldPod, newPod)
		if isPodUpdated(oldPod, newPod) {
			p.unschedulableQ.delete(usPodInfo.Pod)
			err := p.activeQ.Add(updatePod(usPodInfo, newPod))
			if err == nil {
				p.cond.Broadcast()
			}
			return err
		}
		// The pod is already in the unschedulable queue and hasn't been updated;
		// no need to back off again.
		p.unschedulableQ.addOrUpdate(updatePod(usPodInfo, newPod))
		return nil
	}
	// If the pod is not in any of the queues, we put it in the active queue.
	err := p.activeQ.Add(p.newPodInfo(newPod))
	if err == nil {
		p.nominatedPods.add(newPod, "")
		p.cond.Broadcast()
	}
	return err
}

// Delete deletes the item from either of the queues. It assumes the pod is
// only in one queue.
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.nominatedPods.delete(pod)
	err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
	if err != nil { // The item was probably not found in the activeQ.
		p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
		p.unschedulableQ.delete(pod)
	}
	return nil
}

// AssignedPodAdded is called when a bound pod is added. Creation of this pod
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
	p.lock.Lock()
	p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd)
	p.lock.Unlock()
}

// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
	p.lock.Lock()
	p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate)
	p.lock.Unlock()
}

// MoveAllToActiveOrBackoffQueue moves all pods from unschedulableQ to activeQ or backoffQ.
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives it after all the pods are in the
// queue and the head is the highest priority pod.
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
	p.lock.Lock()
	defer p.lock.Unlock()
	unschedulablePods := make([]*framework.PodInfo, 0, len(p.unschedulableQ.podInfoMap))
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		unschedulablePods = append(unschedulablePods, pInfo)
	}
	p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}

// NOTE: this function assumes the lock has been acquired by the caller.
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) {
	for _, pInfo := range podInfoList {
		pod := pInfo.Pod
		if p.isPodBackingoff(pInfo) {
			// The pod is still within its backoff window; park it in podBackoffQ.
			if err := p.podBackoffQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
			} else {
				metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		} else {
			if err := p.activeQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
			} else {
				metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		}
	}
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}

// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// NOTE: this function assumes the lock has been acquired by the caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.PodInfo {
	var podsToMove []*framework.PodInfo
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		up := pInfo.Pod
		affinity := up.Spec.Affinity
		if affinity != nil && affinity.PodAffinity != nil {
			terms := util.GetPodAffinityTerms(affinity.PodAffinity)
			for _, term := range terms {
				namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
				selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
				if err != nil {
					klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
				}
				if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
					podsToMove = append(podsToMove, pInfo)
					break
				}
			}
		}
	}
	return podsToMove
}

// NominatedPodsForNode returns pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node before they
// can be actually scheduled.
func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.nominatedPods.podsForNode(nodeName)
}

// PendingPods returns all the pending pods in the queue. This function is
// used for debugging purposes in the scheduler cache dumper and comparer.
func (p *PriorityQueue) PendingPods() []*v1.Pod {
	p.lock.RLock()
	defer p.lock.RUnlock()
	result := []*v1.Pod{}
	for _, pInfo := range p.activeQ.List() {
		result = append(result, pInfo.(*framework.PodInfo).Pod)
	}
	for _, pInfo := range p.podBackoffQ.List() {
		result = append(result, pInfo.(*framework.PodInfo).Pod)
	}
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		result = append(result, pInfo.Pod)
	}
	return result
}

// Close closes the priority queue.
func (p *PriorityQueue) Close() {
	p.lock.Lock()
	defer p.lock.Unlock()
	close(p.stop)
	p.closed = true
	p.cond.Broadcast()
}

// DeleteNominatedPodIfExists deletes the given pod from nominatedPods.
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
	p.lock.Lock()
	p.nominatedPods.delete(pod)
	p.lock.Unlock()
}

// UpdateNominatedPodForNode adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
	p.lock.Lock()
	p.nominatedPods.add(pod, nodeName)
	p.lock.Unlock()
}

func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.PodInfo)
	pInfo2 := podInfo2.(*framework.PodInfo)
	bo1 := p.getBackoffTime(pInfo1)
	bo2 := p.getBackoffTime(pInfo2)
	return bo1.Before(bo2)
}

// NumUnschedulablePods returns the number of unschedulable pods that exist
// in the SchedulingQueue.
func (p *PriorityQueue) NumUnschedulablePods() int {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return len(p.unschedulableQ.podInfoMap)
}

// newPodInfo builds a PodInfo object.
func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
	now := p.clock.Now()
	return &framework.PodInfo{
		Pod:                     pod,
		Timestamp:               now,
		InitialAttemptTimestamp: now,
	}
}

// getBackoffTime returns the time that podInfo completes backoff.
func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
	duration := p.calculateBackoffDuration(podInfo)
	backoffTime := podInfo.Timestamp.Add(duration)
	return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration {
	duration := p.podInitialBackoffDuration
	for i := 1; i < podInfo.Attempts; i++ {
		duration = duration * 2
		if duration > p.podMaxBackoffDuration {
			return p.podMaxBackoffDuration
		}
	}
	return duration
}
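
// A worked example of the doubling above, using the package defaults
// (podInitialBackoffDuration = 1s, podMaxBackoffDuration = 10s):
//
//	Attempts: 1   2   3   4   5    6   ...
//	Backoff:  1s  2s  4s  8s  10s  10s (capped at podMaxBackoffDuration)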
func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo {
	pInfo := oldPodInfo.(*framework.PodInfo)
	pInfo.Pod = newPod
	return pInfo
}

// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {
	// podInfoMap is a map keyed by a pod's full-name, and the value is a pointer
	// to the PodInfo.
	podInfoMap map[string]*framework.PodInfo
	keyFunc    func(*v1.Pod) string
	// metricRecorder updates the counter when elements of an unschedulablePodsMap
	// get added or removed, and it does nothing if it's nil.
	metricRecorder metrics.MetricRecorder
}

// addOrUpdate adds a pod to the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *framework.PodInfo) {
	podID := u.keyFunc(pInfo.Pod)
	if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
		u.metricRecorder.Inc()
	}
	u.podInfoMap[podID] = pInfo
}

// delete deletes a pod from the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
	podID := u.keyFunc(pod)
	if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
		u.metricRecorder.Dec()
	}
	delete(u.podInfoMap, podID)
}

// get returns the PodInfo if a pod with the same key as the key of the given "pod"
// is found in the map. It returns nil otherwise.
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *framework.PodInfo {
	podKey := u.keyFunc(pod)
	if pInfo, exists := u.podInfoMap[podKey]; exists {
		return pInfo
	}
	return nil
}

// clear removes all the entries from the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) clear() {
	u.podInfoMap = make(map[string]*framework.PodInfo)
	if u.metricRecorder != nil {
		u.metricRecorder.Clear()
	}
}

// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
	return &UnschedulablePodsMap{
		podInfoMap:     make(map[string]*framework.PodInfo),
		keyFunc:        util.GetPodFullName,
		metricRecorder: metricRecorder,
	}
}

// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// It exists because the nominatedNodeName of pod objects stored in the structure
// may be different than what the scheduler has here. We should be able to find
// pods by their UID and update/delete them.
type nominatedPodMap struct {
	// nominatedPods is a map keyed by a node name, and the value is a list of
	// pods which are nominated to run on the node. These are pods which can be in
	// the activeQ or unschedulableQ.
	nominatedPods map[string][]*v1.Pod
	// nominatedPodToNode is a map keyed by a Pod UID to the node name where it is
	// nominated.
	nominatedPodToNode map[ktypes.UID]string
}

func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
	// Always delete the pod if it already exists, to ensure we never store more
	// than one instance of the pod.
	npm.delete(p)

	nnn := nodeName
	if len(nnn) == 0 {
		nnn = NominatedNodeName(p)
		if len(nnn) == 0 {
			return
		}
	}
	npm.nominatedPodToNode[p.UID] = nnn
	for _, np := range npm.nominatedPods[nnn] {
		if np.UID == p.UID {
			klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
			return
		}
	}
	npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}

func (npm *nominatedPodMap) delete(p *v1.Pod) {
	nnn, ok := npm.nominatedPodToNode[p.UID]
	if !ok {
		return
	}
	for i, np := range npm.nominatedPods[nnn] {
		if np.UID == p.UID {
			npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
			if len(npm.nominatedPods[nnn]) == 0 {
				delete(npm.nominatedPods, nnn)
			}
			break
		}
	}
	delete(npm.nominatedPodToNode, p.UID)
}

func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
	// In some cases, an Update event with no "NominatedNode" present is received right
	// after a node ("NominatedNode") is reserved for this pod in memory.
	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
	nodeName := ""
	// We won't fall into the below `if` block if the Update event represents:
	// (1) NominatedNode info is added
	// (2) NominatedNode info is updated
	// (3) NominatedNode info is removed
	if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPod) == "" {
		if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
			// This is the only case in which we should continue reserving the NominatedNode.
			nodeName = nnn
		}
	}
	// We update irrespective of whether the nominatedNodeName changed, to ensure
	// that the pod pointer is updated.
	npm.delete(oldPod)
	npm.add(newPod, nodeName)
}

func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
	if list, ok := npm.nominatedPods[nodeName]; ok {
		return list
	}
	return nil
}

func newNominatedPodMap() *nominatedPodMap {
	return &nominatedPodMap{
		nominatedPods:      make(map[string][]*v1.Pod),
		nominatedPodToNode: make(map[ktypes.UID]string),
	}
}

// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue.
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo {
	return func() *framework.PodInfo {
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
			return podInfo
		}
		klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
		return nil
	}
}
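
// A wiring sketch (illustrative; sched is an assumed scheduler struct whose
// NextPod field is consulted by its scheduling loop):
//
//	q := NewSchedulingQueue(fwk)
//	sched.NextPod = MakeNextPodFunc(q)
//	// ... later, in the loop:
//	podInfo := sched.NextPod() // blocks until a pod can be popped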
func podInfoKeyFunc(obj interface{}) (string, error) {
	return cache.MetaNamespaceKeyFunc(obj.(*framework.PodInfo).Pod)
}