scheduling_queue.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// This file contains structures that implement scheduling queue types.
// Scheduling queues hold pods waiting to be scheduled. This file implements a
// priority queue which has three sub-queues. One sub-queue holds pods that are
// being considered for scheduling. This is called activeQ. Another queue holds
// pods that have already been tried and are determined to be unschedulable; it
// is called unschedulableQ. The third, podBackoffQ, holds pods that are waiting
// out their backoff period.

package queue

import (
	"fmt"
	"reflect"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ktypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	"k8s.io/kubernetes/pkg/scheduler/internal/heap"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

const (
	// If a pod stays in unschedulableQ longer than unschedulableQTimeInterval,
	// the pod will be moved from unschedulableQ to activeQ.
	unschedulableQTimeInterval = 60 * time.Second

	queueClosed = "scheduling queue is closed"
)

const (
	// DefaultPodInitialBackoffDuration is the default value for the initial backoff duration
	// for unschedulable pods. To change the default podInitialBackoffDurationSeconds used by the
	// scheduler, update the ComponentConfig value in defaults.go.
	DefaultPodInitialBackoffDuration time.Duration = 1 * time.Second
	// DefaultPodMaxBackoffDuration is the default value for the max backoff duration
	// for unschedulable pods. To change the default podMaxBackoffDurationSeconds used by the
	// scheduler, update the ComponentConfig value in defaults.go.
	DefaultPodMaxBackoffDuration time.Duration = 10 * time.Second
)

// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
	Add(pod *v1.Pod) error
	// AddUnschedulableIfNotPresent adds an unschedulable pod back to the scheduling queue.
	// The podSchedulingCycle represents the current scheduling cycle number which can be
	// returned by calling SchedulingCycle().
	AddUnschedulableIfNotPresent(pod *framework.PodInfo, podSchedulingCycle int64) error
	// SchedulingCycle returns the current scheduling cycle number, which is
	// cached by the scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (i.e., Pop() is called) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the queue and returns it. It blocks if the
	// queue is empty and waits until a new item is added to the queue.
	Pop() (*framework.PodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveOrBackoffQueue(event string)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	NominatedPodsForNode(nodeName string) []*v1.Pod
	PendingPods() []*v1.Pod
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// UpdateNominatedPodForNode adds the given pod to the nominated pod map or
	// updates it if it already exists.
	UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
	// DeleteNominatedPodIfExists deletes nominatedPod from the internal cache.
	DeleteNominatedPodIfExists(pod *v1.Pod)
	// NumUnschedulablePods returns the number of unschedulable pods in the SchedulingQueue.
	NumUnschedulablePods() int
	// Run starts the goroutines managing the queue.
	Run()
}

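// The sketch below is illustrative only and not part of the original file. It
// shows the intended produce/consume flow for a SchedulingQueue; "fwk" and
// "pod" are assumed to be a framework.Framework and a *v1.Pod built elsewhere:
//
//	q := NewSchedulingQueue(fwk)
//	q.Run()          // start the background flushing goroutines
//	defer q.Close()  // lets a blocked Pop() exit gracefully
//
//	_ = q.Add(pod)          // informer: a new pod arrives
//	podInfo, err := q.Pop() // scheduler loop: blocks until a pod is ready
//	if err == nil {
//		// ...attempt to schedule podInfo.Pod; if the attempt fails:
//		_ = q.AddUnschedulableIfNotPresent(podInfo, q.SchedulingCycle())
//	}
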
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(fwk framework.Framework, opts ...Option) SchedulingQueue {
	return NewPriorityQueue(fwk, opts...)
}

// NominatedNodeName returns the nominated node name of a Pod.
func NominatedNodeName(pod *v1.Pod) string {
	return pod.Status.NominatedNodeName
}

// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has three sub-queues. One sub-queue holds pods that are being considered for
// scheduling. This is called activeQ and is a Heap. Another queue holds
// pods that have already been tried and are determined to be unschedulable. The
// latter is called unschedulableQ. The third queue holds pods that are moved from
// the unschedulable queue and will be moved to the active queue when their
// backoff periods complete.
type PriorityQueue struct {
	stop  chan struct{}
	clock util.Clock
	// podBackoff tracks backoff for pods attempting to be rescheduled.
	podBackoff *PodBackoffMap

	lock sync.RWMutex
	cond sync.Cond

	// activeQ is a heap structure that the scheduler actively looks at to find pods
	// to schedule. The head of the heap is the highest priority pod.
	activeQ *heap.Heap
	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ.
	podBackoffQ *heap.Heap
	// unschedulableQ holds pods that have been tried and determined unschedulable.
	unschedulableQ *UnschedulablePodsMap
	// nominatedPods is a structure that stores pods which are nominated to run
	// on nodes.
	nominatedPods *nominatedPodMap
	// schedulingCycle represents the sequence number of the scheduling cycle and is
	// incremented when a pod is popped.
	schedulingCycle int64
	// moveRequestCycle caches the sequence number of the scheduling cycle when we
	// received a move request. Unschedulable pods in and before this scheduling
	// cycle will be put back into the activeQ if we were trying to schedule them
	// when we received the move request.
	moveRequestCycle int64

	// closed indicates that the queue is closed.
	// It is mainly used to let Pop() exit its control loop while waiting for an item.
	closed bool
}

type priorityQueueOptions struct {
	clock                     util.Clock
	podInitialBackoffDuration time.Duration
	podMaxBackoffDuration     time.Duration
}

// Option configures a PriorityQueue.
type Option func(*priorityQueueOptions)

// WithClock sets the clock for PriorityQueue. The default clock is util.RealClock.
func WithClock(clock util.Clock) Option {
	return func(o *priorityQueueOptions) {
		o.clock = clock
	}
}

// WithPodInitialBackoffDuration sets the pod initial backoff duration for PriorityQueue.
func WithPodInitialBackoffDuration(duration time.Duration) Option {
	return func(o *priorityQueueOptions) {
		o.podInitialBackoffDuration = duration
	}
}

// WithPodMaxBackoffDuration sets the pod max backoff duration for PriorityQueue.
func WithPodMaxBackoffDuration(duration time.Duration) Option {
	return func(o *priorityQueueOptions) {
		o.podMaxBackoffDuration = duration
	}
}

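// Illustrative only (not part of the original file): options are applied in
// order over defaultPriorityQueueOptions, so later options win. For example,
// this hypothetical configuration raises both backoff bounds:
//
//	pq := NewPriorityQueue(fwk,
//		WithPodInitialBackoffDuration(2*time.Second), // default is 1s
//		WithPodMaxBackoffDuration(20*time.Second),    // default is 10s
//	)
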
var defaultPriorityQueueOptions = priorityQueueOptions{
	clock:                     util.RealClock{},
	podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
	podMaxBackoffDuration:     DefaultPodMaxBackoffDuration,
}

// Making sure that PriorityQueue implements SchedulingQueue.
var _ SchedulingQueue = &PriorityQueue{}

// newPodInfoNoTimestamp builds a PodInfo object without timestamp.
func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
	return &framework.PodInfo{
		Pod: pod,
	}
}

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
	fwk framework.Framework,
	opts ...Option,
) *PriorityQueue {
	options := defaultPriorityQueueOptions
	for _, opt := range opts {
		opt(&options)
	}

	comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.PodInfo)
		pInfo2 := podInfo2.(*framework.PodInfo)
		return fwk.QueueSortFunc()(pInfo1, pInfo2)
	}

	pq := &PriorityQueue{
		clock:            options.clock,
		stop:             make(chan struct{}),
		podBackoff:       NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration, options.clock),
		activeQ:          heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulableQ:   newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
		nominatedPods:    newNominatedPodMap(),
		moveRequestCycle: -1,
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())

	return pq
}

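// Illustrative sketch (not part of the original file): constructing a queue
// with an injected clock. The fake clock is assumed to come from
// k8s.io/apimachinery/pkg/util/clock, whose FakeClock provides Now() and thus
// satisfies this package's util.Clock; tests can then advance the clock to
// expire backoffs deterministically.
//
//	fakeClock := clock.NewFakeClock(time.Now())
//	pq := NewPriorityQueue(fwk,
//		WithClock(fakeClock),
//		WithPodMaxBackoffDuration(30*time.Second),
//	)
//	pq.Run()
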
// Run starts the goroutines that periodically flush podBackoffQ and
// unschedulableQ into activeQ.
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1*time.Second, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

// Add adds a pod to the active queue. It should be called only when a new pod
// is added, so there is no chance the pod is already in the active/unschedulable/backoff queues.
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pInfo := p.newPodInfo(pod)
	if err := p.activeQ.Add(pInfo); err != nil {
		klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
		return err
	}
	if p.unschedulableQ.get(pod) != nil {
		klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
		p.unschedulableQ.delete(pod)
	}
	// Delete the pod from backoffQ if it is backing off.
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
	}
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.nominatedPods.add(pod, "")
	p.cond.Broadcast()

	return nil
}

// nsNameForPod returns a NamespacedName for the given pod.
func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
	return ktypes.NamespacedName{
		Namespace: pod.Namespace,
		Name:      pod.Name,
	}
}

// clearPodBackoff clears all backoff state for a pod (resets expiry).
func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) {
	p.podBackoff.ClearPodBackoff(nsNameForPod(pod))
}

// isPodBackingOff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
	boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
	if !exists {
		return false
	}
	return boTime.After(p.clock.Now())
}

// backoffPod checks whether the pod is currently undergoing backoff. If it is
// not, it updates the backoff timeout; otherwise it does nothing.
func (p *PriorityQueue) backoffPod(pod *v1.Pod) {
	p.podBackoff.CleanupPodsCompletesBackingoff()

	podID := nsNameForPod(pod)
	boTime, found := p.podBackoff.GetBackoffTime(podID)
	if !found || boTime.Before(p.clock.Now()) {
		p.podBackoff.BackoffPod(podID)
	}
}

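// Illustrative note (not part of the original file): with the defaults above,
// PodBackoffMap is expected to grow a pod's backoff window exponentially and
// cap it, so repeated failures for the same pod produce expiry delays of
// roughly 1s, 2s, 4s, 8s, then capped at 10s (DefaultPodInitialBackoffDuration
// doubling up to DefaultPodMaxBackoffDuration). The doubling and cap live in
// PodBackoffMap (pod_backoff.go), not here:
//
//	p.backoffPod(pod) // first failure: backoff expires ~1s from now
//	p.backoffPod(pod) // after that expiry: ~2s, then ~4s, ~8s, capped at 10s
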
// SchedulingCycle returns the current scheduling cycle.
func (p *PriorityQueue) SchedulingCycle() int64 {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.schedulingCycle
}

// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulableQ`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, podSchedulingCycle int64) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pod := pInfo.Pod
	if p.unschedulableQ.get(pod) != nil {
		return fmt.Errorf("pod: %v is already present in unschedulable queue", nsNameForPod(pod))
	}

	// Refresh the timestamp since the pod is re-added.
	pInfo.Timestamp = p.clock.Now()
	if _, exists, _ := p.activeQ.Get(pInfo); exists {
		return fmt.Errorf("pod: %v is already present in the active queue", nsNameForPod(pod))
	}
	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
		return fmt.Errorf("pod %v is already present in the backoff queue", nsNameForPod(pod))
	}

	// Every unschedulable pod is subject to backoff timers.
	p.backoffPod(pod)

	// If a move request has been received, move it to the backoff queue; otherwise
	// move it to unschedulableQ.
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
		metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
	}

	p.nominatedPods.add(pod, "")
	return nil
}

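// Illustrative timeline (not part of the original file) for the branch above.
// Suppose a pod is popped at schedulingCycle 5 and a move request arrives while
// the pod is being tried, setting moveRequestCycle to 5:
//
//	podInfo, _ := q.Pop()                      // SchedulingCycle() == 5
//	// ...a cluster event fires MoveAllToActiveOrBackoffQueue => moveRequestCycle = 5
//	q.AddUnschedulableIfNotPresent(podInfo, 5) // 5 >= 5: the pod goes to
//	                                           // podBackoffQ rather than
//	                                           // unschedulableQ, so the event
//	                                           // is not missed
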
// flushBackoffQCompleted moves all pods from backoffQ which have completed backoff into activeQ.
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()

	for {
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			return
		}
		pod := rawPodInfo.(*framework.PodInfo).Pod
		boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
		if !found {
			klog.Errorf("Unable to find backoff value for pod %v in backoff queue", nsNameForPod(pod))
			p.podBackoffQ.Pop()
			p.activeQ.Add(rawPodInfo)
			metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
			defer p.cond.Broadcast()
			continue
		}

		if boTime.After(p.clock.Now()) {
			return
		}
		_, err := p.podBackoffQ.Pop()
		if err != nil {
			klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
			return
		}
		p.activeQ.Add(rawPodInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
		defer p.cond.Broadcast()
	}
}

// flushUnschedulableQLeftover moves pods which stay in unschedulableQ longer than
// unschedulableQTimeInterval to activeQ.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.PodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		lastScheduleTime := pInfo.Timestamp
		if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}

// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments the scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.PodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// When the queue is empty, invocation of Pop() is blocked until a new item is enqueued.
		// When Close() is called, p.closed is set and the condition is broadcast,
		// which causes this loop to continue and return from Pop().
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.PodInfo)
	pInfo.Attempts++
	p.schedulingCycle++
	return pInfo, err
}

// isPodUpdated checks if the pod is updated in a way that it may have become
// schedulable. It drops the status, resource version, and generation of the pod
// and compares the result with the old version.
func isPodUpdated(oldPod, newPod *v1.Pod) bool {
	strip := func(pod *v1.Pod) *v1.Pod {
		p := pod.DeepCopy()
		p.ResourceVersion = ""
		p.Generation = 0
		p.Status = v1.PodStatus{}
		return p
	}
	return !reflect.DeepEqual(strip(oldPod), strip(newPod))
}

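// Illustrative only (not part of the original file): because strip clears
// ResourceVersion, Generation, and Status, a status-only change does not count
// as an update, while a spec or label change does:
//
//	newPod := oldPod.DeepCopy()
//	newPod.Status.Phase = v1.PodPending // status-only change
//	isPodUpdated(oldPod, newPod)        // false
//	newPod.Labels = map[string]string{"tier": "web"}
//	isPodUpdated(oldPod, newPod)        // true
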
// Update updates a pod in the active or backoff queue if present. Otherwise, it removes
// the item from the unschedulable queue if the pod is updated in a way that it may
// become schedulable and adds the updated one to the active queue.
// If the pod is not present in any of the queues, it is added to the active queue.
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if oldPod != nil {
		oldPodInfo := newPodInfoNoTimestamp(oldPod)
		// If the pod is already in the active queue, just update it there.
		if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
			p.nominatedPods.update(oldPod, newPod)
			err := p.activeQ.Update(updatePod(oldPodInfo, newPod))
			return err
		}

		// If the pod is in the backoff queue, update it there.
		if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
			p.nominatedPods.update(oldPod, newPod)
			p.podBackoffQ.Delete(oldPodInfo)
			err := p.activeQ.Add(updatePod(oldPodInfo, newPod))
			if err == nil {
				p.cond.Broadcast()
			}
			return err
		}
	}

	// If the pod is in the unschedulable queue, updating it may make it schedulable.
	if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
		p.nominatedPods.update(oldPod, newPod)
		if isPodUpdated(oldPod, newPod) {
			// If the pod is updated, reset its backoff.
			p.clearPodBackoff(newPod)
			p.unschedulableQ.delete(usPodInfo.Pod)
			err := p.activeQ.Add(updatePod(usPodInfo, newPod))
			if err == nil {
				p.cond.Broadcast()
			}
			return err
		}
		// The pod is already in the unschedulable queue and hasn't been updated; no need to back off again.
		p.unschedulableQ.addOrUpdate(updatePod(usPodInfo, newPod))
		return nil
	}

	// If the pod is not in any of the queues, we put it in the active queue.
	err := p.activeQ.Add(p.newPodInfo(newPod))
	if err == nil {
		p.nominatedPods.add(newPod, "")
		p.cond.Broadcast()
	}
	return err
}

// Delete deletes the item from any of the queues. It assumes the pod is
// only in one queue.
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.nominatedPods.delete(pod)
	err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
	if err != nil {
		// The item was probably not found in the activeQ; try the other queues.
		p.clearPodBackoff(pod)
		p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
		p.unschedulableQ.delete(pod)
	}
	return nil
}

// AssignedPodAdded is called when a bound pod is added. Creation of this pod
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
	p.lock.Lock()
	p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd)
	p.lock.Unlock()
}

// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
	p.lock.Lock()
	p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate)
	p.lock.Unlock()
}

// MoveAllToActiveOrBackoffQueue moves all pods from unschedulableQ to activeQ or backoffQ.
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives it after all the pods are in the
// queue and the head is the highest priority pod.
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
	p.lock.Lock()
	defer p.lock.Unlock()
	unschedulablePods := make([]*framework.PodInfo, 0, len(p.unschedulableQ.podInfoMap))
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		unschedulablePods = append(unschedulablePods, pInfo)
	}
	p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}

// NOTE: this function assumes the lock has been acquired by the caller.
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) {
	for _, pInfo := range podInfoList {
		pod := pInfo.Pod
		if p.isPodBackingOff(pod) {
			if err := p.podBackoffQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
			} else {
				metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		} else {
			if err := p.activeQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
			} else {
				metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		}
	}
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}

// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// NOTE: this function assumes the lock has been acquired by the caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.PodInfo {
	var podsToMove []*framework.PodInfo
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		up := pInfo.Pod
		affinity := up.Spec.Affinity
		if affinity != nil && affinity.PodAffinity != nil {
			terms := util.GetPodAffinityTerms(affinity.PodAffinity)
			for _, term := range terms {
				namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
				selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
				if err != nil {
					klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
				}
				if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
					podsToMove = append(podsToMove, pInfo)
					break
				}
			}
		}
	}
	return podsToMove
}

// NominatedPodsForNode returns pods that are nominated to run on the given node,
// but are waiting for other pods to be removed from the node before they
// can actually be scheduled.
func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.nominatedPods.podsForNode(nodeName)
}

// PendingPods returns all the pending pods in the queue. This function is
// used for debugging purposes in the scheduler cache dumper and comparer.
func (p *PriorityQueue) PendingPods() []*v1.Pod {
	p.lock.RLock()
	defer p.lock.RUnlock()
	result := []*v1.Pod{}
	for _, pInfo := range p.activeQ.List() {
		result = append(result, pInfo.(*framework.PodInfo).Pod)
	}
	for _, pInfo := range p.podBackoffQ.List() {
		result = append(result, pInfo.(*framework.PodInfo).Pod)
	}
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		result = append(result, pInfo.Pod)
	}
	return result
}

// Close closes the priority queue.
func (p *PriorityQueue) Close() {
	p.lock.Lock()
	defer p.lock.Unlock()
	close(p.stop)
	p.closed = true
	p.cond.Broadcast()
}

// DeleteNominatedPodIfExists deletes the pod from nominatedPods if it exists.
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
	p.lock.Lock()
	p.nominatedPods.delete(pod)
	p.lock.Unlock()
}

// UpdateNominatedPodForNode adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
	p.lock.Lock()
	p.nominatedPods.add(pod, nodeName)
	p.lock.Unlock()
}

func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.PodInfo)
	pInfo2 := podInfo2.(*framework.PodInfo)
	bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod))
	bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod))
	return bo1.Before(bo2)
}

// NumUnschedulablePods returns the number of unschedulable pods in the SchedulingQueue.
func (p *PriorityQueue) NumUnschedulablePods() int {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return len(p.unschedulableQ.podInfoMap)
}

// newPodInfo builds a PodInfo object.
func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
	now := p.clock.Now()
	return &framework.PodInfo{
		Pod:                     pod,
		Timestamp:               now,
		InitialAttemptTimestamp: now,
	}
}

func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo {
	pInfo := oldPodInfo.(*framework.PodInfo)
	pInfo.Pod = newPod
	return pInfo
}

// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {
	// podInfoMap is a map keyed by a pod's full-name, and the value is a pointer to the PodInfo.
	podInfoMap map[string]*framework.PodInfo
	keyFunc    func(*v1.Pod) string
	// metricRecorder updates the counter when elements of an unschedulablePodsMap
	// get added or removed, and it does nothing if it's nil.
	metricRecorder metrics.MetricRecorder
}

// addOrUpdate adds a pod to the unschedulable podInfoMap, or updates it if it
// is already present.
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *framework.PodInfo) {
	podID := u.keyFunc(pInfo.Pod)
	if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
		u.metricRecorder.Inc()
	}
	u.podInfoMap[podID] = pInfo
}

// delete deletes a pod from the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
	podID := u.keyFunc(pod)
	if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
		u.metricRecorder.Dec()
	}
	delete(u.podInfoMap, podID)
}

// get returns the PodInfo if a pod with the same key as the key of the given "pod"
// is found in the map. It returns nil otherwise.
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *framework.PodInfo {
	podKey := u.keyFunc(pod)
	if pInfo, exists := u.podInfoMap[podKey]; exists {
		return pInfo
	}
	return nil
}

// clear removes all the entries from the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) clear() {
	u.podInfoMap = make(map[string]*framework.PodInfo)
	if u.metricRecorder != nil {
		u.metricRecorder.Clear()
	}
}

// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
	return &UnschedulablePodsMap{
		podInfoMap:     make(map[string]*framework.PodInfo),
		keyFunc:        util.GetPodFullName,
		metricRecorder: metricRecorder,
	}
}

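// Illustrative only (not part of the original file): the map is keyed by
// util.GetPodFullName, so lookups survive pod object replacement as long as
// the pod's name and namespace are unchanged:
//
//	u := newUnschedulablePodsMap(nil) // nil recorder: metric updates are skipped
//	u.addOrUpdate(pInfo)
//	if got := u.get(pInfo.Pod); got != nil {
//		// same key => same entry, even if pInfo.Pod was deep-copied
//	}
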
// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// It exists because the nominatedNodeName of pod objects stored in the structure
// may be different from what the scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominatedPodMap struct {
	// nominatedPods is a map keyed by a node name, and the value is a list of
	// pods which are nominated to run on the node. These are pods which can be in
	// the activeQ or unschedulableQ.
	nominatedPods map[string][]*v1.Pod
	// nominatedPodToNode is a map keyed by a Pod UID to the node name where it is
	// nominated.
	nominatedPodToNode map[ktypes.UID]string
}

func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
	// Always delete the pod if it already exists, to ensure we never store more than
	// one instance of the pod.
	npm.delete(p)

	nnn := nodeName
	if len(nnn) == 0 {
		nnn = NominatedNodeName(p)
		if len(nnn) == 0 {
			return
		}
	}
	npm.nominatedPodToNode[p.UID] = nnn
	for _, np := range npm.nominatedPods[nnn] {
		if np.UID == p.UID {
			klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
			return
		}
	}
	npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}

func (npm *nominatedPodMap) delete(p *v1.Pod) {
	nnn, ok := npm.nominatedPodToNode[p.UID]
	if !ok {
		return
	}
	for i, np := range npm.nominatedPods[nnn] {
		if np.UID == p.UID {
			npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
			if len(npm.nominatedPods[nnn]) == 0 {
				delete(npm.nominatedPods, nnn)
			}
			break
		}
	}
	delete(npm.nominatedPodToNode, p.UID)
}

func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
	// In some cases, an Update event with no "NominatedNode" present is received right
	// after a node ("NominatedNode") is reserved for this pod in memory.
	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
	nodeName := ""
	// We won't fall into the below `if` block if the Update event represents:
	// (1) NominatedNode info is added
	// (2) NominatedNode info is updated
	// (3) NominatedNode info is removed
	if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPod) == "" {
		if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
			// This is the only case in which we should continue reserving the NominatedNode.
			nodeName = nnn
		}
	}
	// We update irrespective of whether the nominatedNodeName changed, to ensure
	// that the pod pointer is updated.
	npm.delete(oldPod)
	npm.add(newPod, nodeName)
}

func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
	if list, ok := npm.nominatedPods[nodeName]; ok {
		return list
	}
	return nil
}

func newNominatedPodMap() *nominatedPodMap {
	return &nominatedPodMap{
		nominatedPods:      make(map[string][]*v1.Pod),
		nominatedPodToNode: make(map[ktypes.UID]string),
	}
}

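// Illustrative only (not part of the original file): add falls back to the
// pod's own Status.NominatedNodeName when nodeName is empty, and delete is a
// no-op for unknown pods, so both are safe to call unconditionally:
//
//	npm := newNominatedPodMap()
//	npm.add(pod, "node-a") // explicit nomination wins over pod.Status
//	npm.add(pod, "")       // falls back to NominatedNodeName(pod); if that is
//	                       // also empty, the pod is dropped from the map
//	npm.delete(pod)        // no-op for pods that were never added
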
// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue.
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo {
	return func() *framework.PodInfo {
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
			return podInfo
		}
		klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
		return nil
	}
}

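// Illustrative only (not part of the original file): MakeNextPodFunc adapts the
// blocking queue to the scheduler's "give me the next pod" callback style:
//
//	nextPod := MakeNextPodFunc(q)
//	for {
//		podInfo := nextPod() // blocks; returns nil only on queue error/closure
//		if podInfo == nil {
//			break
//		}
//		// ...schedule podInfo.Pod
//	}
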
func podInfoKeyFunc(obj interface{}) (string, error) {
	return cache.MetaNamespaceKeyFunc(obj.(*framework.PodInfo).Pod)
}