|
- package queue
- import (
- "fmt"
- "reflect"
- "sync"
- "time"
- "k8s.io/klog"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- ktypes "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/tools/cache"
- framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
- "k8s.io/kubernetes/pkg/scheduler/internal/heap"
- "k8s.io/kubernetes/pkg/scheduler/metrics"
- "k8s.io/kubernetes/pkg/scheduler/util"
- )
// Internal settings shared by the queue implementation in this file.
const (
	// queueClosed is the error text Pop returns once the queue has been closed.
	queueClosed = "scheduling queue is closed"

	// unschedulableQTimeInterval is the age a pod must exceed in the
	// unschedulable queue before flushUnschedulableQLeftover moves it out.
	unschedulableQTimeInterval = time.Minute
)
// Default backoff bounds used when no Option overrides them.
const (
	// DefaultPodInitialBackoffDuration is the default starting backoff for a
	// pod; calculateBackoffDuration doubles it for every attempt after the first.
	DefaultPodInitialBackoffDuration time.Duration = time.Second

	// DefaultPodMaxBackoffDuration is the default upper bound that
	// calculateBackoffDuration returns once doubling exceeds it.
	DefaultPodMaxBackoffDuration time.Duration = 10 * time.Second
)
- type SchedulingQueue interface {
- Add(pod *v1.Pod) error
-
-
-
- AddUnschedulableIfNotPresent(pod *framework.PodInfo, podSchedulingCycle int64) error
-
-
-
- SchedulingCycle() int64
-
-
- Pop() (*framework.PodInfo, error)
- Update(oldPod, newPod *v1.Pod) error
- Delete(pod *v1.Pod) error
- MoveAllToActiveOrBackoffQueue(event string)
- AssignedPodAdded(pod *v1.Pod)
- AssignedPodUpdated(pod *v1.Pod)
- NominatedPodsForNode(nodeName string) []*v1.Pod
- PendingPods() []*v1.Pod
-
-
- Close()
-
-
- UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
-
- DeleteNominatedPodIfExists(pod *v1.Pod)
-
- NumUnschedulablePods() int
-
- Run()
- }
- func NewSchedulingQueue(fwk framework.Framework, opts ...Option) SchedulingQueue {
- return NewPriorityQueue(fwk, opts...)
- }
- func NominatedNodeName(pod *v1.Pod) string {
- return pod.Status.NominatedNodeName
- }
- type PriorityQueue struct {
- stop chan struct{}
- clock util.Clock
-
- podInitialBackoffDuration time.Duration
-
- podMaxBackoffDuration time.Duration
- lock sync.RWMutex
- cond sync.Cond
-
-
- activeQ *heap.Heap
-
-
- podBackoffQ *heap.Heap
-
- unschedulableQ *UnschedulablePodsMap
-
-
- nominatedPods *nominatedPodMap
-
-
- schedulingCycle int64
-
-
-
-
- moveRequestCycle int64
-
-
- closed bool
- }
- type priorityQueueOptions struct {
- clock util.Clock
- podInitialBackoffDuration time.Duration
- podMaxBackoffDuration time.Duration
- }
- type Option func(*priorityQueueOptions)
- // WithClock sets clock for PriorityQueue, the default clock is util.RealClock.
- func WithClock(clock util.Clock) Option {
- return func(o *priorityQueueOptions) {
- o.clock = clock
- }
- }
- func WithPodInitialBackoffDuration(duration time.Duration) Option {
- return func(o *priorityQueueOptions) {
- o.podInitialBackoffDuration = duration
- }
- }
- func WithPodMaxBackoffDuration(duration time.Duration) Option {
- return func(o *priorityQueueOptions) {
- o.podMaxBackoffDuration = duration
- }
- }
- var defaultPriorityQueueOptions = priorityQueueOptions{
- clock: util.RealClock{},
- podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
- podMaxBackoffDuration: DefaultPodMaxBackoffDuration,
- }
- var _ SchedulingQueue = &PriorityQueue{}
- func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
- return &framework.PodInfo{
- Pod: pod,
- }
- }
- func NewPriorityQueue(
- fwk framework.Framework,
- opts ...Option,
- ) *PriorityQueue {
- options := defaultPriorityQueueOptions
- for _, opt := range opts {
- opt(&options)
- }
- comp := func(podInfo1, podInfo2 interface{}) bool {
- pInfo1 := podInfo1.(*framework.PodInfo)
- pInfo2 := podInfo2.(*framework.PodInfo)
- return fwk.QueueSortFunc()(pInfo1, pInfo2)
- }
- pq := &PriorityQueue{
- clock: options.clock,
- stop: make(chan struct{}),
- podInitialBackoffDuration: options.podInitialBackoffDuration,
- podMaxBackoffDuration: options.podMaxBackoffDuration,
- activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
- unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
- nominatedPods: newNominatedPodMap(),
- moveRequestCycle: -1,
- }
- pq.cond.L = &pq.lock
- pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
- return pq
- }
- func (p *PriorityQueue) Run() {
- go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
- go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
- }
- func (p *PriorityQueue) Add(pod *v1.Pod) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- pInfo := p.newPodInfo(pod)
- if err := p.activeQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
- return err
- }
- if p.unschedulableQ.get(pod) != nil {
- klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
- p.unschedulableQ.delete(pod)
- }
-
- if err := p.podBackoffQ.Delete(pInfo); err == nil {
- klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
- }
- metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
- p.nominatedPods.add(pod, "")
- p.cond.Broadcast()
- return nil
- }
- func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
- return ktypes.NamespacedName{
- Namespace: pod.Namespace,
- Name: pod.Name,
- }
- }
- func (p *PriorityQueue) isPodBackingoff(podInfo *framework.PodInfo) bool {
- boTime := p.getBackoffTime(podInfo)
- return boTime.After(p.clock.Now())
- }
- func (p *PriorityQueue) SchedulingCycle() int64 {
- p.lock.RLock()
- defer p.lock.RUnlock()
- return p.schedulingCycle
- }
- func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, podSchedulingCycle int64) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- pod := pInfo.Pod
- if p.unschedulableQ.get(pod) != nil {
- return fmt.Errorf("pod: %v is already present in unschedulable queue", nsNameForPod(pod))
- }
-
- pInfo.Timestamp = p.clock.Now()
- if _, exists, _ := p.activeQ.Get(pInfo); exists {
- return fmt.Errorf("pod: %v is already present in the active queue", nsNameForPod(pod))
- }
- if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
- return fmt.Errorf("pod %v is already present in the backoff queue", nsNameForPod(pod))
- }
-
-
- if p.moveRequestCycle >= podSchedulingCycle {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
- }
- metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
- } else {
- p.unschedulableQ.addOrUpdate(pInfo)
- metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
- }
- p.nominatedPods.add(pod, "")
- return nil
- }
- func (p *PriorityQueue) flushBackoffQCompleted() {
- p.lock.Lock()
- defer p.lock.Unlock()
- for {
- rawPodInfo := p.podBackoffQ.Peek()
- if rawPodInfo == nil {
- return
- }
- pod := rawPodInfo.(*framework.PodInfo).Pod
- boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo))
- if boTime.After(p.clock.Now()) {
- return
- }
- _, err := p.podBackoffQ.Pop()
- if err != nil {
- klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
- return
- }
- p.activeQ.Add(rawPodInfo)
- metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
- defer p.cond.Broadcast()
- }
- }
- func (p *PriorityQueue) flushUnschedulableQLeftover() {
- p.lock.Lock()
- defer p.lock.Unlock()
- var podsToMove []*framework.PodInfo
- currentTime := p.clock.Now()
- for _, pInfo := range p.unschedulableQ.podInfoMap {
- lastScheduleTime := pInfo.Timestamp
- if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
- podsToMove = append(podsToMove, pInfo)
- }
- }
- if len(podsToMove) > 0 {
- p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
- }
- }
- func (p *PriorityQueue) Pop() (*framework.PodInfo, error) {
- p.lock.Lock()
- defer p.lock.Unlock()
- for p.activeQ.Len() == 0 {
-
-
-
- if p.closed {
- return nil, fmt.Errorf(queueClosed)
- }
- p.cond.Wait()
- }
- obj, err := p.activeQ.Pop()
- if err != nil {
- return nil, err
- }
- pInfo := obj.(*framework.PodInfo)
- pInfo.Attempts++
- p.schedulingCycle++
- return pInfo, err
- }
- func isPodUpdated(oldPod, newPod *v1.Pod) bool {
- strip := func(pod *v1.Pod) *v1.Pod {
- p := pod.DeepCopy()
- p.ResourceVersion = ""
- p.Generation = 0
- p.Status = v1.PodStatus{}
- return p
- }
- return !reflect.DeepEqual(strip(oldPod), strip(newPod))
- }
- func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- if oldPod != nil {
- oldPodInfo := newPodInfoNoTimestamp(oldPod)
-
- if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
- p.nominatedPods.update(oldPod, newPod)
- err := p.activeQ.Update(updatePod(oldPodInfo, newPod))
- return err
- }
-
- if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
- p.nominatedPods.update(oldPod, newPod)
- p.podBackoffQ.Delete(oldPodInfo)
- err := p.activeQ.Add(updatePod(oldPodInfo, newPod))
- if err == nil {
- p.cond.Broadcast()
- }
- return err
- }
- }
-
- if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
- p.nominatedPods.update(oldPod, newPod)
- if isPodUpdated(oldPod, newPod) {
- p.unschedulableQ.delete(usPodInfo.Pod)
- err := p.activeQ.Add(updatePod(usPodInfo, newPod))
- if err == nil {
- p.cond.Broadcast()
- }
- return err
- }
-
- p.unschedulableQ.addOrUpdate(updatePod(usPodInfo, newPod))
- return nil
- }
-
- err := p.activeQ.Add(p.newPodInfo(newPod))
- if err == nil {
- p.nominatedPods.add(newPod, "")
- p.cond.Broadcast()
- }
- return err
- }
- func (p *PriorityQueue) Delete(pod *v1.Pod) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.nominatedPods.delete(pod)
- err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
- if err != nil {
- p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
- p.unschedulableQ.delete(pod)
- }
- return nil
- }
- func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
- p.lock.Lock()
- p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd)
- p.lock.Unlock()
- }
- func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
- p.lock.Lock()
- p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate)
- p.lock.Unlock()
- }
- func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
- p.lock.Lock()
- defer p.lock.Unlock()
- unschedulablePods := make([]*framework.PodInfo, 0, len(p.unschedulableQ.podInfoMap))
- for _, pInfo := range p.unschedulableQ.podInfoMap {
- unschedulablePods = append(unschedulablePods, pInfo)
- }
- p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
- p.moveRequestCycle = p.schedulingCycle
- p.cond.Broadcast()
- }
- func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) {
- for _, pInfo := range podInfoList {
- pod := pInfo.Pod
- if p.isPodBackingoff(pInfo) {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
- } else {
- metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
- p.unschedulableQ.delete(pod)
- }
- } else {
- if err := p.activeQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
- } else {
- metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
- p.unschedulableQ.delete(pod)
- }
- }
- }
- p.moveRequestCycle = p.schedulingCycle
- p.cond.Broadcast()
- }
- func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.PodInfo {
- var podsToMove []*framework.PodInfo
- for _, pInfo := range p.unschedulableQ.podInfoMap {
- up := pInfo.Pod
- affinity := up.Spec.Affinity
- if affinity != nil && affinity.PodAffinity != nil {
- terms := util.GetPodAffinityTerms(affinity.PodAffinity)
- for _, term := range terms {
- namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
- selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
- if err != nil {
- klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
- }
- if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
- podsToMove = append(podsToMove, pInfo)
- break
- }
- }
- }
- }
- return podsToMove
- }
- func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
- p.lock.RLock()
- defer p.lock.RUnlock()
- return p.nominatedPods.podsForNode(nodeName)
- }
- func (p *PriorityQueue) PendingPods() []*v1.Pod {
- p.lock.RLock()
- defer p.lock.RUnlock()
- result := []*v1.Pod{}
- for _, pInfo := range p.activeQ.List() {
- result = append(result, pInfo.(*framework.PodInfo).Pod)
- }
- for _, pInfo := range p.podBackoffQ.List() {
- result = append(result, pInfo.(*framework.PodInfo).Pod)
- }
- for _, pInfo := range p.unschedulableQ.podInfoMap {
- result = append(result, pInfo.Pod)
- }
- return result
- }
- func (p *PriorityQueue) Close() {
- p.lock.Lock()
- defer p.lock.Unlock()
- close(p.stop)
- p.closed = true
- p.cond.Broadcast()
- }
- func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
- p.lock.Lock()
- p.nominatedPods.delete(pod)
- p.lock.Unlock()
- }
- func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
- p.lock.Lock()
- p.nominatedPods.add(pod, nodeName)
- p.lock.Unlock()
- }
- func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
- pInfo1 := podInfo1.(*framework.PodInfo)
- pInfo2 := podInfo2.(*framework.PodInfo)
- bo1 := p.getBackoffTime(pInfo1)
- bo2 := p.getBackoffTime(pInfo2)
- return bo1.Before(bo2)
- }
- func (p *PriorityQueue) NumUnschedulablePods() int {
- p.lock.RLock()
- defer p.lock.RUnlock()
- return len(p.unschedulableQ.podInfoMap)
- }
- func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
- now := p.clock.Now()
- return &framework.PodInfo{
- Pod: pod,
- Timestamp: now,
- InitialAttemptTimestamp: now,
- }
- }
- func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
- duration := p.calculateBackoffDuration(podInfo)
- backoffTime := podInfo.Timestamp.Add(duration)
- return backoffTime
- }
- func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration {
- duration := p.podInitialBackoffDuration
- for i := 1; i < podInfo.Attempts; i++ {
- duration = duration * 2
- if duration > p.podMaxBackoffDuration {
- return p.podMaxBackoffDuration
- }
- }
- return duration
- }
- func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo {
- pInfo := oldPodInfo.(*framework.PodInfo)
- pInfo.Pod = newPod
- return pInfo
- }
- type UnschedulablePodsMap struct {
-
- podInfoMap map[string]*framework.PodInfo
- keyFunc func(*v1.Pod) string
- // metricRecorder updates the counter when elements of an unschedulablePodsMap
- // get added or removed, and it does nothing if it's nil
- metricRecorder metrics.MetricRecorder
- }
- // Add adds a pod to the unschedulable podInfoMap.
- func (u *UnschedulablePodsMap) addOrUpdate(pInfo *framework.PodInfo) {
- podID := u.keyFunc(pInfo.Pod)
- if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
- u.metricRecorder.Inc()
- }
- u.podInfoMap[podID] = pInfo
- }
- func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
- podID := u.keyFunc(pod)
- if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
- u.metricRecorder.Dec()
- }
- delete(u.podInfoMap, podID)
- }
- func (u *UnschedulablePodsMap) get(pod *v1.Pod) *framework.PodInfo {
- podKey := u.keyFunc(pod)
- if pInfo, exists := u.podInfoMap[podKey]; exists {
- return pInfo
- }
- return nil
- }
- func (u *UnschedulablePodsMap) clear() {
- u.podInfoMap = make(map[string]*framework.PodInfo)
- if u.metricRecorder != nil {
- u.metricRecorder.Clear()
- }
- }
- func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
- return &UnschedulablePodsMap{
- podInfoMap: make(map[string]*framework.PodInfo),
- keyFunc: util.GetPodFullName,
- metricRecorder: metricRecorder,
- }
- }
- type nominatedPodMap struct {
-
-
-
- nominatedPods map[string][]*v1.Pod
-
-
- nominatedPodToNode map[ktypes.UID]string
- }
- func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
-
-
- npm.delete(p)
- nnn := nodeName
- if len(nnn) == 0 {
- nnn = NominatedNodeName(p)
- if len(nnn) == 0 {
- return
- }
- }
- npm.nominatedPodToNode[p.UID] = nnn
- for _, np := range npm.nominatedPods[nnn] {
- if np.UID == p.UID {
- klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
- return
- }
- }
- npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
- }
- func (npm *nominatedPodMap) delete(p *v1.Pod) {
- nnn, ok := npm.nominatedPodToNode[p.UID]
- if !ok {
- return
- }
- for i, np := range npm.nominatedPods[nnn] {
- if np.UID == p.UID {
- npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
- if len(npm.nominatedPods[nnn]) == 0 {
- delete(npm.nominatedPods, nnn)
- }
- break
- }
- }
- delete(npm.nominatedPodToNode, p.UID)
- }
- func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
-
-
-
- nodeName := ""
-
-
-
-
- if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPod) == "" {
- if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
-
- nodeName = nnn
- }
- }
-
-
- npm.delete(oldPod)
- npm.add(newPod, nodeName)
- }
- func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
- if list, ok := npm.nominatedPods[nodeName]; ok {
- return list
- }
- return nil
- }
- func newNominatedPodMap() *nominatedPodMap {
- return &nominatedPodMap{
- nominatedPods: make(map[string][]*v1.Pod),
- nominatedPodToNode: make(map[ktypes.UID]string),
- }
- }
- func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo {
- return func() *framework.PodInfo {
- podInfo, err := queue.Pop()
- if err == nil {
- klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
- return podInfo
- }
- klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
- return nil
- }
- }
- func podInfoKeyFunc(obj interface{}) (string, error) {
- return cache.MetaNamespaceKeyFunc(obj.(*framework.PodInfo).Pod)
- }
|