scheduling_queue.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
// This file contains structures that implement scheduling queue types.
// Scheduling queues hold pods waiting to be scheduled. This file implements a
// priority queue which has three sub-queues. activeQ holds pods that are being
// considered for scheduling. unschedulableQ holds pods that have already been
// tried and were determined to be unschedulable. podBackoffQ holds pods that
// have been moved out of unschedulableQ and are waiting for their backoff to
// complete before they are moved to activeQ.
  19. package queue
  20. import (
  21. "fmt"
  22. "reflect"
  23. "sync"
  24. "time"
  25. "k8s.io/klog"
  26. "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. ktypes "k8s.io/apimachinery/pkg/types"
  29. "k8s.io/apimachinery/pkg/util/wait"
  30. "k8s.io/client-go/tools/cache"
  31. "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
  32. priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
  33. framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
  34. "k8s.io/kubernetes/pkg/scheduler/metrics"
  35. "k8s.io/kubernetes/pkg/scheduler/util"
  36. )
  37. var (
  38. queueClosed = "scheduling queue is closed"
  39. )
  40. // If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval,
  41. // the pod will be moved from unschedulableQ to activeQ.
  42. const unschedulableQTimeInterval = 60 * time.Second
  43. // SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
  44. // The interface follows a pattern similar to cache.FIFO and cache.Heap and
  45. // makes it easy to use those data structures as a SchedulingQueue.
  46. type SchedulingQueue interface {
  47. Add(pod *v1.Pod) error
  48. AddIfNotPresent(pod *v1.Pod) error
  49. // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
  50. // The podSchedulingCycle represents the current scheduling cycle number which can be
  51. // returned by calling SchedulingCycle().
  52. AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error
  53. // SchedulingCycle returns the current number of scheduling cycle which is
  54. // cached by scheduling queue. Normally, incrementing this number whenever
  55. // a pod is popped (e.g. called Pop()) is enough.
  56. SchedulingCycle() int64
  57. // Pop removes the head of the queue and returns it. It blocks if the
  58. // queue is empty and waits until a new item is added to the queue.
  59. Pop() (*v1.Pod, error)
  60. Update(oldPod, newPod *v1.Pod) error
  61. Delete(pod *v1.Pod) error
  62. MoveAllToActiveQueue()
  63. AssignedPodAdded(pod *v1.Pod)
  64. AssignedPodUpdated(pod *v1.Pod)
  65. NominatedPodsForNode(nodeName string) []*v1.Pod
  66. PendingPods() []*v1.Pod
  67. // Close closes the SchedulingQueue so that the goroutine which is
  68. // waiting to pop items can exit gracefully.
  69. Close()
  70. // UpdateNominatedPodForNode adds the given pod to the nominated pod map or
  71. // updates it if it already exists.
  72. UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
  73. // DeleteNominatedPodIfExists deletes nominatedPod from internal cache
  74. DeleteNominatedPodIfExists(pod *v1.Pod)
  75. // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
  76. NumUnschedulablePods() int
  77. }
  78. // NewSchedulingQueue initializes a priority queue as a new scheduling queue.
  79. func NewSchedulingQueue(stop <-chan struct{}, fwk framework.Framework) SchedulingQueue {
  80. return NewPriorityQueue(stop, fwk)
  81. }
  82. // NominatedNodeName returns nominated node name of a Pod.
  83. func NominatedNodeName(pod *v1.Pod) string {
  84. return pod.Status.NominatedNodeName
  85. }
  86. // PriorityQueue implements a scheduling queue.
  87. // The head of PriorityQueue is the highest priority pending pod. This structure
  88. // has three sub queues. One sub-queue holds pods that are being considered for
  89. // scheduling. This is called activeQ and is a Heap. Another queue holds
  90. // pods that are already tried and are determined to be unschedulable. The latter
  91. // is called unschedulableQ. The third queue holds pods that are moved from
  92. // unschedulable queues and will be moved to active queue when backoff are completed.
  93. type PriorityQueue struct {
  94. stop <-chan struct{}
  95. clock util.Clock
  96. // podBackoff tracks backoff for pods attempting to be rescheduled
  97. podBackoff *PodBackoffMap
  98. lock sync.RWMutex
  99. cond sync.Cond
  100. // activeQ is heap structure that scheduler actively looks at to find pods to
  101. // schedule. Head of heap is the highest priority pod.
  102. activeQ *util.Heap
  103. // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
  104. // are popped from this heap before the scheduler looks at activeQ
  105. podBackoffQ *util.Heap
  106. // unschedulableQ holds pods that have been tried and determined unschedulable.
  107. unschedulableQ *UnschedulablePodsMap
  108. // nominatedPods is a structures that stores pods which are nominated to run
  109. // on nodes.
  110. nominatedPods *nominatedPodMap
  111. // schedulingCycle represents sequence number of scheduling cycle and is incremented
  112. // when a pod is popped.
  113. schedulingCycle int64
  114. // moveRequestCycle caches the sequence number of scheduling cycle when we
  115. // received a move request. Unscheduable pods in and before this scheduling
  116. // cycle will be put back to activeQueue if we were trying to schedule them
  117. // when we received move request.
  118. moveRequestCycle int64
  119. // closed indicates that the queue is closed.
  120. // It is mainly used to let Pop() exit its control loop while waiting for an item.
  121. closed bool
  122. }
  123. // Making sure that PriorityQueue implements SchedulingQueue.
  124. var _ = SchedulingQueue(&PriorityQueue{})
  125. // newPodInfoNoTimestamp builds a PodInfo object without timestamp.
  126. func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
  127. return &framework.PodInfo{
  128. Pod: pod,
  129. }
  130. }
  131. // activeQComp is the function used by the activeQ heap algorithm to sort pods.
  132. // It sorts pods based on their priority. When priorities are equal, it uses
  133. // PodInfo.timestamp.
  134. func activeQComp(podInfo1, podInfo2 interface{}) bool {
  135. pInfo1 := podInfo1.(*framework.PodInfo)
  136. pInfo2 := podInfo2.(*framework.PodInfo)
  137. prio1 := util.GetPodPriority(pInfo1.Pod)
  138. prio2 := util.GetPodPriority(pInfo2.Pod)
  139. return (prio1 > prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
  140. }
  141. // NewPriorityQueue creates a PriorityQueue object.
  142. func NewPriorityQueue(stop <-chan struct{}, fwk framework.Framework) *PriorityQueue {
  143. return NewPriorityQueueWithClock(stop, util.RealClock{}, fwk)
  144. }
  145. // NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time.
  146. func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk framework.Framework) *PriorityQueue {
  147. comp := activeQComp
  148. if fwk != nil {
  149. if queueSortFunc := fwk.QueueSortFunc(); queueSortFunc != nil {
  150. comp = func(podInfo1, podInfo2 interface{}) bool {
  151. pInfo1 := podInfo1.(*framework.PodInfo)
  152. pInfo2 := podInfo2.(*framework.PodInfo)
  153. return queueSortFunc(pInfo1, pInfo2)
  154. }
  155. }
  156. }
  157. pq := &PriorityQueue{
  158. clock: clock,
  159. stop: stop,
  160. podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
  161. activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
  162. unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
  163. nominatedPods: newNominatedPodMap(),
  164. moveRequestCycle: -1,
  165. }
  166. pq.cond.L = &pq.lock
  167. pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
  168. pq.run()
  169. return pq
  170. }
  171. // run starts the goroutine to pump from podBackoffQ to activeQ
  172. func (p *PriorityQueue) run() {
  173. go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
  174. go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
  175. }
  176. // Add adds a pod to the active queue. It should be called only when a new pod
  177. // is added so there is no chance the pod is already in active/unschedulable/backoff queues
  178. func (p *PriorityQueue) Add(pod *v1.Pod) error {
  179. p.lock.Lock()
  180. defer p.lock.Unlock()
  181. pInfo := p.newPodInfo(pod)
  182. if err := p.activeQ.Add(pInfo); err != nil {
  183. klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
  184. return err
  185. }
  186. if p.unschedulableQ.get(pod) != nil {
  187. klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
  188. p.unschedulableQ.delete(pod)
  189. }
  190. // Delete pod from backoffQ if it is backing off
  191. if err := p.podBackoffQ.Delete(pInfo); err == nil {
  192. klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
  193. }
  194. p.nominatedPods.add(pod, "")
  195. p.cond.Broadcast()
  196. return nil
  197. }
  198. // AddIfNotPresent adds a pod to the active queue if it is not present in any of
  199. // the queues. If it is present in any, it doesn't do any thing.
  200. func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
  201. p.lock.Lock()
  202. defer p.lock.Unlock()
  203. if p.unschedulableQ.get(pod) != nil {
  204. return nil
  205. }
  206. pInfo := p.newPodInfo(pod)
  207. if _, exists, _ := p.activeQ.Get(pInfo); exists {
  208. return nil
  209. }
  210. if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
  211. return nil
  212. }
  213. err := p.activeQ.Add(pInfo)
  214. if err != nil {
  215. klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
  216. } else {
  217. p.nominatedPods.add(pod, "")
  218. p.cond.Broadcast()
  219. }
  220. return err
  221. }
  222. // nsNameForPod returns a namespacedname for a pod
  223. func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
  224. return ktypes.NamespacedName{
  225. Namespace: pod.Namespace,
  226. Name: pod.Name,
  227. }
  228. }
  229. // clearPodBackoff clears all backoff state for a pod (resets expiry)
  230. func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) {
  231. p.podBackoff.ClearPodBackoff(nsNameForPod(pod))
  232. }
  233. // isPodBackingOff returns true if a pod is still waiting for its backoff timer.
  234. // If this returns true, the pod should not be re-tried.
  235. func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
  236. boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
  237. if !exists {
  238. return false
  239. }
  240. return boTime.After(p.clock.Now())
  241. }
  242. // backoffPod checks if pod is currently undergoing backoff. If it is not it updates the backoff
  243. // timeout otherwise it does nothing.
  244. func (p *PriorityQueue) backoffPod(pod *v1.Pod) {
  245. p.podBackoff.CleanupPodsCompletesBackingoff()
  246. podID := nsNameForPod(pod)
  247. boTime, found := p.podBackoff.GetBackoffTime(podID)
  248. if !found || boTime.Before(p.clock.Now()) {
  249. p.podBackoff.BackoffPod(podID)
  250. }
  251. }
  252. // SchedulingCycle returns current scheduling cycle.
  253. func (p *PriorityQueue) SchedulingCycle() int64 {
  254. p.lock.RLock()
  255. defer p.lock.RUnlock()
  256. return p.schedulingCycle
  257. }
  258. // AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
  259. // the queue, unless it is already in the queue. Normally, PriorityQueue puts
  260. // unschedulable pods in `unschedulableQ`. But if there has been a recent move
  261. // request, then the pod is put in `podBackoffQ`.
  262. func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
  263. p.lock.Lock()
  264. defer p.lock.Unlock()
  265. if p.unschedulableQ.get(pod) != nil {
  266. return fmt.Errorf("pod is already present in unschedulableQ")
  267. }
  268. pInfo := p.newPodInfo(pod)
  269. if _, exists, _ := p.activeQ.Get(pInfo); exists {
  270. return fmt.Errorf("pod is already present in the activeQ")
  271. }
  272. if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
  273. return fmt.Errorf("pod is already present in the backoffQ")
  274. }
  275. // Every unschedulable pod is subject to backoff timers.
  276. p.backoffPod(pod)
  277. // If a move request has been received, move it to the BackoffQ, otherwise move
  278. // it to unschedulableQ.
  279. if p.moveRequestCycle >= podSchedulingCycle {
  280. if err := p.podBackoffQ.Add(pInfo); err != nil {
  281. return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
  282. }
  283. } else {
  284. p.unschedulableQ.addOrUpdate(pInfo)
  285. }
  286. p.nominatedPods.add(pod, "")
  287. return nil
  288. }
  289. // flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
  290. func (p *PriorityQueue) flushBackoffQCompleted() {
  291. p.lock.Lock()
  292. defer p.lock.Unlock()
  293. for {
  294. rawPodInfo := p.podBackoffQ.Peek()
  295. if rawPodInfo == nil {
  296. return
  297. }
  298. pod := rawPodInfo.(*framework.PodInfo).Pod
  299. boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
  300. if !found {
  301. klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
  302. p.podBackoffQ.Pop()
  303. p.activeQ.Add(rawPodInfo)
  304. defer p.cond.Broadcast()
  305. continue
  306. }
  307. if boTime.After(p.clock.Now()) {
  308. return
  309. }
  310. _, err := p.podBackoffQ.Pop()
  311. if err != nil {
  312. klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
  313. return
  314. }
  315. p.activeQ.Add(rawPodInfo)
  316. defer p.cond.Broadcast()
  317. }
  318. }
  319. // flushUnschedulableQLeftover moves pod which stays in unschedulableQ longer than the durationStayUnschedulableQ
  320. // to activeQ.
  321. func (p *PriorityQueue) flushUnschedulableQLeftover() {
  322. p.lock.Lock()
  323. defer p.lock.Unlock()
  324. var podsToMove []*framework.PodInfo
  325. currentTime := p.clock.Now()
  326. for _, pInfo := range p.unschedulableQ.podInfoMap {
  327. lastScheduleTime := pInfo.Timestamp
  328. if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
  329. podsToMove = append(podsToMove, pInfo)
  330. }
  331. }
  332. if len(podsToMove) > 0 {
  333. p.movePodsToActiveQueue(podsToMove)
  334. }
  335. }
  336. // Pop removes the head of the active queue and returns it. It blocks if the
  337. // activeQ is empty and waits until a new item is added to the queue. It
  338. // increments scheduling cycle when a pod is popped.
  339. func (p *PriorityQueue) Pop() (*v1.Pod, error) {
  340. p.lock.Lock()
  341. defer p.lock.Unlock()
  342. for p.activeQ.Len() == 0 {
  343. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  344. // When Close() is called, the p.closed is set and the condition is broadcast,
  345. // which causes this loop to continue and return from the Pop().
  346. if p.closed {
  347. return nil, fmt.Errorf(queueClosed)
  348. }
  349. p.cond.Wait()
  350. }
  351. obj, err := p.activeQ.Pop()
  352. if err != nil {
  353. return nil, err
  354. }
  355. pInfo := obj.(*framework.PodInfo)
  356. p.schedulingCycle++
  357. return pInfo.Pod, err
  358. }
  359. // isPodUpdated checks if the pod is updated in a way that it may have become
  360. // schedulable. It drops status of the pod and compares it with old version.
  361. func isPodUpdated(oldPod, newPod *v1.Pod) bool {
  362. strip := func(pod *v1.Pod) *v1.Pod {
  363. p := pod.DeepCopy()
  364. p.ResourceVersion = ""
  365. p.Generation = 0
  366. p.Status = v1.PodStatus{}
  367. return p
  368. }
  369. return !reflect.DeepEqual(strip(oldPod), strip(newPod))
  370. }
  371. // Update updates a pod in the active or backoff queue if present. Otherwise, it removes
  372. // the item from the unschedulable queue if pod is updated in a way that it may
  373. // become schedulable and adds the updated one to the active queue.
  374. // If pod is not present in any of the queues, it is added to the active queue.
  375. func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
  376. p.lock.Lock()
  377. defer p.lock.Unlock()
  378. if oldPod != nil {
  379. oldPodInfo := newPodInfoNoTimestamp(oldPod)
  380. // If the pod is already in the active queue, just update it there.
  381. if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
  382. p.nominatedPods.update(oldPod, newPod)
  383. newPodInfo := newPodInfoNoTimestamp(newPod)
  384. newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp
  385. err := p.activeQ.Update(newPodInfo)
  386. return err
  387. }
  388. // If the pod is in the backoff queue, update it there.
  389. if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
  390. p.nominatedPods.update(oldPod, newPod)
  391. p.podBackoffQ.Delete(newPodInfoNoTimestamp(oldPod))
  392. newPodInfo := newPodInfoNoTimestamp(newPod)
  393. newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp
  394. err := p.activeQ.Add(newPodInfo)
  395. if err == nil {
  396. p.cond.Broadcast()
  397. }
  398. return err
  399. }
  400. }
  401. // If the pod is in the unschedulable queue, updating it may make it schedulable.
  402. if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
  403. p.nominatedPods.update(oldPod, newPod)
  404. newPodInfo := newPodInfoNoTimestamp(newPod)
  405. newPodInfo.Timestamp = usPodInfo.Timestamp
  406. if isPodUpdated(oldPod, newPod) {
  407. // If the pod is updated reset backoff
  408. p.clearPodBackoff(newPod)
  409. p.unschedulableQ.delete(usPodInfo.Pod)
  410. err := p.activeQ.Add(newPodInfo)
  411. if err == nil {
  412. p.cond.Broadcast()
  413. }
  414. return err
  415. }
  416. // Pod is already in unschedulable queue and hasnt updated, no need to backoff again
  417. p.unschedulableQ.addOrUpdate(newPodInfo)
  418. return nil
  419. }
  420. // If pod is not in any of the queues, we put it in the active queue.
  421. err := p.activeQ.Add(p.newPodInfo(newPod))
  422. if err == nil {
  423. p.nominatedPods.add(newPod, "")
  424. p.cond.Broadcast()
  425. }
  426. return err
  427. }
  428. // Delete deletes the item from either of the two queues. It assumes the pod is
  429. // only in one queue.
  430. func (p *PriorityQueue) Delete(pod *v1.Pod) error {
  431. p.lock.Lock()
  432. defer p.lock.Unlock()
  433. p.nominatedPods.delete(pod)
  434. err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
  435. if err != nil { // The item was probably not found in the activeQ.
  436. p.clearPodBackoff(pod)
  437. p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
  438. p.unschedulableQ.delete(pod)
  439. }
  440. return nil
  441. }
  442. // AssignedPodAdded is called when a bound pod is added. Creation of this pod
  443. // may make pending pods with matching affinity terms schedulable.
  444. func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
  445. p.lock.Lock()
  446. p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
  447. p.lock.Unlock()
  448. }
  449. // AssignedPodUpdated is called when a bound pod is updated. Change of labels
  450. // may make pending pods with matching affinity terms schedulable.
  451. func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
  452. p.lock.Lock()
  453. p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
  454. p.lock.Unlock()
  455. }
  456. // MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This
  457. // function adds all pods and then signals the condition variable to ensure that
  458. // if Pop() is waiting for an item, it receives it after all the pods are in the
  459. // queue and the head is the highest priority pod.
  460. func (p *PriorityQueue) MoveAllToActiveQueue() {
  461. p.lock.Lock()
  462. defer p.lock.Unlock()
  463. for _, pInfo := range p.unschedulableQ.podInfoMap {
  464. pod := pInfo.Pod
  465. if p.isPodBackingOff(pod) {
  466. if err := p.podBackoffQ.Add(pInfo); err != nil {
  467. klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
  468. }
  469. } else {
  470. if err := p.activeQ.Add(pInfo); err != nil {
  471. klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
  472. }
  473. }
  474. }
  475. p.unschedulableQ.clear()
  476. p.moveRequestCycle = p.schedulingCycle
  477. p.cond.Broadcast()
  478. }
  479. // NOTE: this function assumes lock has been acquired in caller
  480. func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*framework.PodInfo) {
  481. for _, pInfo := range podInfoList {
  482. pod := pInfo.Pod
  483. if p.isPodBackingOff(pod) {
  484. if err := p.podBackoffQ.Add(pInfo); err != nil {
  485. klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
  486. }
  487. } else {
  488. if err := p.activeQ.Add(pInfo); err != nil {
  489. klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
  490. }
  491. }
  492. p.unschedulableQ.delete(pod)
  493. }
  494. p.moveRequestCycle = p.schedulingCycle
  495. p.cond.Broadcast()
  496. }
  497. // getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
  498. // any affinity term that matches "pod".
  499. // NOTE: this function assumes lock has been acquired in caller.
  500. func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.PodInfo {
  501. var podsToMove []*framework.PodInfo
  502. for _, pInfo := range p.unschedulableQ.podInfoMap {
  503. up := pInfo.Pod
  504. affinity := up.Spec.Affinity
  505. if affinity != nil && affinity.PodAffinity != nil {
  506. terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
  507. for _, term := range terms {
  508. namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(up, &term)
  509. selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
  510. if err != nil {
  511. klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
  512. }
  513. if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
  514. podsToMove = append(podsToMove, pInfo)
  515. break
  516. }
  517. }
  518. }
  519. }
  520. return podsToMove
  521. }
  522. // NominatedPodsForNode returns pods that are nominated to run on the given node,
  523. // but they are waiting for other pods to be removed from the node before they
  524. // can be actually scheduled.
  525. func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
  526. p.lock.RLock()
  527. defer p.lock.RUnlock()
  528. return p.nominatedPods.podsForNode(nodeName)
  529. }
  530. // PendingPods returns all the pending pods in the queue. This function is
  531. // used for debugging purposes in the scheduler cache dumper and comparer.
  532. func (p *PriorityQueue) PendingPods() []*v1.Pod {
  533. p.lock.RLock()
  534. defer p.lock.RUnlock()
  535. result := []*v1.Pod{}
  536. for _, pInfo := range p.activeQ.List() {
  537. result = append(result, pInfo.(*framework.PodInfo).Pod)
  538. }
  539. for _, pInfo := range p.podBackoffQ.List() {
  540. result = append(result, pInfo.(*framework.PodInfo).Pod)
  541. }
  542. for _, pInfo := range p.unschedulableQ.podInfoMap {
  543. result = append(result, pInfo.Pod)
  544. }
  545. return result
  546. }
  547. // Close closes the priority queue.
  548. func (p *PriorityQueue) Close() {
  549. p.lock.Lock()
  550. defer p.lock.Unlock()
  551. p.closed = true
  552. p.cond.Broadcast()
  553. }
  554. // DeleteNominatedPodIfExists deletes pod nominatedPods.
  555. func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
  556. p.lock.Lock()
  557. p.nominatedPods.delete(pod)
  558. p.lock.Unlock()
  559. }
  560. // UpdateNominatedPodForNode adds a pod to the nominated pods of the given node.
  561. // This is called during the preemption process after a node is nominated to run
  562. // the pod. We update the structure before sending a request to update the pod
  563. // object to avoid races with the following scheduling cycles.
  564. func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
  565. p.lock.Lock()
  566. p.nominatedPods.add(pod, nodeName)
  567. p.lock.Unlock()
  568. }
  569. func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
  570. pInfo1 := podInfo1.(*framework.PodInfo)
  571. pInfo2 := podInfo2.(*framework.PodInfo)
  572. bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod))
  573. bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod))
  574. return bo1.Before(bo2)
  575. }
  576. // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
  577. func (p *PriorityQueue) NumUnschedulablePods() int {
  578. p.lock.RLock()
  579. defer p.lock.RUnlock()
  580. return len(p.unschedulableQ.podInfoMap)
  581. }
  582. // newPodInfo builds a PodInfo object.
  583. func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
  584. if p.clock == nil {
  585. return &framework.PodInfo{
  586. Pod: pod,
  587. }
  588. }
  589. return &framework.PodInfo{
  590. Pod: pod,
  591. Timestamp: p.clock.Now(),
  592. }
  593. }
  594. // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
  595. // is used to implement unschedulableQ.
  596. type UnschedulablePodsMap struct {
  597. // podInfoMap is a map key by a pod's full-name and the value is a pointer to the PodInfo.
  598. podInfoMap map[string]*framework.PodInfo
  599. keyFunc func(*v1.Pod) string
  600. // metricRecorder updates the counter when elements of an unschedulablePodsMap
  601. // get added or removed, and it does nothing if it's nil
  602. metricRecorder metrics.MetricRecorder
  603. }
  604. // Add adds a pod to the unschedulable podInfoMap.
  605. func (u *UnschedulablePodsMap) addOrUpdate(pInfo *framework.PodInfo) {
  606. podID := u.keyFunc(pInfo.Pod)
  607. if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
  608. u.metricRecorder.Inc()
  609. }
  610. u.podInfoMap[podID] = pInfo
  611. }
  612. // Delete deletes a pod from the unschedulable podInfoMap.
  613. func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
  614. podID := u.keyFunc(pod)
  615. if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
  616. u.metricRecorder.Dec()
  617. }
  618. delete(u.podInfoMap, podID)
  619. }
  620. // Get returns the PodInfo if a pod with the same key as the key of the given "pod"
  621. // is found in the map. It returns nil otherwise.
  622. func (u *UnschedulablePodsMap) get(pod *v1.Pod) *framework.PodInfo {
  623. podKey := u.keyFunc(pod)
  624. if pInfo, exists := u.podInfoMap[podKey]; exists {
  625. return pInfo
  626. }
  627. return nil
  628. }
  629. // Clear removes all the entries from the unschedulable podInfoMap.
  630. func (u *UnschedulablePodsMap) clear() {
  631. u.podInfoMap = make(map[string]*framework.PodInfo)
  632. if u.metricRecorder != nil {
  633. u.metricRecorder.Clear()
  634. }
  635. }
  636. // newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
  637. func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
  638. return &UnschedulablePodsMap{
  639. podInfoMap: make(map[string]*framework.PodInfo),
  640. keyFunc: util.GetPodFullName,
  641. metricRecorder: metricRecorder,
  642. }
  643. }
  644. // nominatedPodMap is a structure that stores pods nominated to run on nodes.
  645. // It exists because nominatedNodeName of pod objects stored in the structure
  646. // may be different than what scheduler has here. We should be able to find pods
  647. // by their UID and update/delete them.
  648. type nominatedPodMap struct {
  649. // nominatedPods is a map keyed by a node name and the value is a list of
  650. // pods which are nominated to run on the node. These are pods which can be in
  651. // the activeQ or unschedulableQ.
  652. nominatedPods map[string][]*v1.Pod
  653. // nominatedPodToNode is map keyed by a Pod UID to the node name where it is
  654. // nominated.
  655. nominatedPodToNode map[ktypes.UID]string
  656. }
  657. func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
  658. // always delete the pod if it already exist, to ensure we never store more than
  659. // one instance of the pod.
  660. npm.delete(p)
  661. nnn := nodeName
  662. if len(nnn) == 0 {
  663. nnn = NominatedNodeName(p)
  664. if len(nnn) == 0 {
  665. return
  666. }
  667. }
  668. npm.nominatedPodToNode[p.UID] = nnn
  669. for _, np := range npm.nominatedPods[nnn] {
  670. if np.UID == p.UID {
  671. klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
  672. return
  673. }
  674. }
  675. npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
  676. }
  677. func (npm *nominatedPodMap) delete(p *v1.Pod) {
  678. nnn, ok := npm.nominatedPodToNode[p.UID]
  679. if !ok {
  680. return
  681. }
  682. for i, np := range npm.nominatedPods[nnn] {
  683. if np.UID == p.UID {
  684. npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
  685. if len(npm.nominatedPods[nnn]) == 0 {
  686. delete(npm.nominatedPods, nnn)
  687. }
  688. break
  689. }
  690. }
  691. delete(npm.nominatedPodToNode, p.UID)
  692. }
  693. func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
  694. // In some cases, an Update event with no "NominatedNode" present is received right
  695. // after a node("NominatedNode") is reserved for this pod in memory.
  696. // In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
  697. nodeName := ""
  698. // We won't fall into below `if` block if the Update event represents:
  699. // (1) NominatedNode info is added
  700. // (2) NominatedNode info is updated
  701. // (3) NominatedNode info is removed
  702. if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPod) == "" {
  703. if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
  704. // This is the only case we should continue reserving the NominatedNode
  705. nodeName = nnn
  706. }
  707. }
  708. // We update irrespective of the nominatedNodeName changed or not, to ensure
  709. // that pod pointer is updated.
  710. npm.delete(oldPod)
  711. npm.add(newPod, nodeName)
  712. }
  713. func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
  714. if list, ok := npm.nominatedPods[nodeName]; ok {
  715. return list
  716. }
  717. return nil
  718. }
  719. func newNominatedPodMap() *nominatedPodMap {
  720. return &nominatedPodMap{
  721. nominatedPods: make(map[string][]*v1.Pod),
  722. nominatedPodToNode: make(map[ktypes.UID]string),
  723. }
  724. }
  725. // MakeNextPodFunc returns a function to retrieve the next pod from a given
  726. // scheduling queue
  727. func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod {
  728. return func() *v1.Pod {
  729. pod, err := queue.Pop()
  730. if err == nil {
  731. klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
  732. return pod
  733. }
  734. klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
  735. return nil
  736. }
  737. }
  738. func podInfoKeyFunc(obj interface{}) (string, error) {
  739. return cache.MetaNamespaceKeyFunc(obj.(*framework.PodInfo).Pod)
  740. }