|
- package queue
- import (
- "fmt"
- "reflect"
- "sync"
- "time"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- ktypes "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/tools/cache"
- "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
- priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
- framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
- "k8s.io/kubernetes/pkg/scheduler/metrics"
- "k8s.io/kubernetes/pkg/scheduler/util"
- )
// queueClosed is the error text Pop reports once the queue has been closed and
// activeQ is empty.
var queueClosed = "scheduling queue is closed"

// unschedulableQTimeInterval is how long a pod may stay in unschedulableQ
// before flushUnschedulableQLeftover moves it out (to activeQ or podBackoffQ).
const unschedulableQTimeInterval = 60 * time.Second
- type SchedulingQueue interface {
- Add(pod *v1.Pod) error
- AddIfNotPresent(pod *v1.Pod) error
-
-
-
- AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error
-
-
-
- SchedulingCycle() int64
-
-
- Pop() (*v1.Pod, error)
- Update(oldPod, newPod *v1.Pod) error
- Delete(pod *v1.Pod) error
- MoveAllToActiveQueue()
- AssignedPodAdded(pod *v1.Pod)
- AssignedPodUpdated(pod *v1.Pod)
- NominatedPodsForNode(nodeName string) []*v1.Pod
- PendingPods() []*v1.Pod
-
-
- Close()
-
-
- UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
-
- DeleteNominatedPodIfExists(pod *v1.Pod)
-
- NumUnschedulablePods() int
- }
- func NewSchedulingQueue(stop <-chan struct{}, fwk framework.Framework) SchedulingQueue {
- return NewPriorityQueue(stop, fwk)
- }
- func NominatedNodeName(pod *v1.Pod) string {
- return pod.Status.NominatedNodeName
- }
// PriorityQueue implements SchedulingQueue. Pending pods live in one of three
// places: activeQ (a heap popped by Pop), podBackoffQ (a heap ordered by
// backoff expiry, see podsCompareBackoffCompleted), or unschedulableQ.
type PriorityQueue struct {
	// stop is handed to the background loops started by run.
	stop <-chan struct{}
	// clock supplies timestamps and "now" for backoff checks; injected via
	// NewPriorityQueueWithClock.
	clock util.Clock

	// podBackoff tracks per-pod backoff expiry times.
	podBackoff *PodBackoffMap
	// lock protects the queue's mutable state; methods acquire it before
	// touching the queues.
	lock sync.RWMutex
	// cond uses lock as its Locker (set in NewPriorityQueueWithClock); Pop
	// waits on it while activeQ is empty, and writers Broadcast after adding
	// to activeQ or closing the queue.
	cond sync.Cond

	// activeQ holds pods ready to be handed out by Pop, ordered by the
	// queue-sort comparator chosen at construction.
	activeQ *util.Heap

	// podBackoffQ holds pods whose backoff has not yet expired;
	// flushBackoffQCompleted moves them to activeQ once it has.
	podBackoffQ *util.Heap

	// unschedulableQ holds pods parked after a failed scheduling attempt.
	unschedulableQ *UnschedulablePodsMap

	// nominatedPods indexes pods by the node they are nominated to.
	nominatedPods *nominatedPodMap

	// schedulingCycle is incremented by every successful Pop.
	schedulingCycle int64

	// moveRequestCycle is set to schedulingCycle whenever pods are moved out
	// of unschedulableQ. AddUnschedulableIfNotPresent sends a pod to
	// podBackoffQ instead of unschedulableQ when it failed in a cycle
	// <= moveRequestCycle. Starts at -1 so that no cycle qualifies initially.
	moveRequestCycle int64

	// closed is set by Close; Pop returns an error instead of waiting once
	// closed is true and activeQ is empty.
	closed bool
}
- var _ = SchedulingQueue(&PriorityQueue{})
- func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
- return &framework.PodInfo{
- Pod: pod,
- }
- }
- func activeQComp(podInfo1, podInfo2 interface{}) bool {
- pInfo1 := podInfo1.(*framework.PodInfo)
- pInfo2 := podInfo2.(*framework.PodInfo)
- prio1 := util.GetPodPriority(pInfo1.Pod)
- prio2 := util.GetPodPriority(pInfo2.Pod)
- return (prio1 > prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
- }
- func NewPriorityQueue(stop <-chan struct{}, fwk framework.Framework) *PriorityQueue {
- return NewPriorityQueueWithClock(stop, util.RealClock{}, fwk)
- }
- func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk framework.Framework) *PriorityQueue {
- comp := activeQComp
- if fwk != nil {
- if queueSortFunc := fwk.QueueSortFunc(); queueSortFunc != nil {
- comp = func(podInfo1, podInfo2 interface{}) bool {
- pInfo1 := podInfo1.(*framework.PodInfo)
- pInfo2 := podInfo2.(*framework.PodInfo)
- return queueSortFunc(pInfo1, pInfo2)
- }
- }
- }
- pq := &PriorityQueue{
- clock: clock,
- stop: stop,
- podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
- activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
- unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
- nominatedPods: newNominatedPodMap(),
- moveRequestCycle: -1,
- }
- pq.cond.L = &pq.lock
- pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
- pq.run()
- return pq
- }
- func (p *PriorityQueue) run() {
- go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
- go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
- }
- func (p *PriorityQueue) Add(pod *v1.Pod) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- pInfo := p.newPodInfo(pod)
- if err := p.activeQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
- return err
- }
- if p.unschedulableQ.get(pod) != nil {
- klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
- p.unschedulableQ.delete(pod)
- }
-
- if err := p.podBackoffQ.Delete(pInfo); err == nil {
- klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
- }
- p.nominatedPods.add(pod, "")
- p.cond.Broadcast()
- return nil
- }
- func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- if p.unschedulableQ.get(pod) != nil {
- return nil
- }
- pInfo := p.newPodInfo(pod)
- if _, exists, _ := p.activeQ.Get(pInfo); exists {
- return nil
- }
- if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
- return nil
- }
- err := p.activeQ.Add(pInfo)
- if err != nil {
- klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
- } else {
- p.nominatedPods.add(pod, "")
- p.cond.Broadcast()
- }
- return err
- }
- func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
- return ktypes.NamespacedName{
- Namespace: pod.Namespace,
- Name: pod.Name,
- }
- }
- func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) {
- p.podBackoff.ClearPodBackoff(nsNameForPod(pod))
- }
- func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
- boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
- if !exists {
- return false
- }
- return boTime.After(p.clock.Now())
- }
- func (p *PriorityQueue) backoffPod(pod *v1.Pod) {
- p.podBackoff.CleanupPodsCompletesBackingoff()
- podID := nsNameForPod(pod)
- boTime, found := p.podBackoff.GetBackoffTime(podID)
- if !found || boTime.Before(p.clock.Now()) {
- p.podBackoff.BackoffPod(podID)
- }
- }
- func (p *PriorityQueue) SchedulingCycle() int64 {
- p.lock.RLock()
- defer p.lock.RUnlock()
- return p.schedulingCycle
- }
- func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- if p.unschedulableQ.get(pod) != nil {
- return fmt.Errorf("pod is already present in unschedulableQ")
- }
- pInfo := p.newPodInfo(pod)
- if _, exists, _ := p.activeQ.Get(pInfo); exists {
- return fmt.Errorf("pod is already present in the activeQ")
- }
- if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
- return fmt.Errorf("pod is already present in the backoffQ")
- }
-
- p.backoffPod(pod)
-
-
- if p.moveRequestCycle >= podSchedulingCycle {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
- }
- } else {
- p.unschedulableQ.addOrUpdate(pInfo)
- }
- p.nominatedPods.add(pod, "")
- return nil
- }
- func (p *PriorityQueue) flushBackoffQCompleted() {
- p.lock.Lock()
- defer p.lock.Unlock()
- for {
- rawPodInfo := p.podBackoffQ.Peek()
- if rawPodInfo == nil {
- return
- }
- pod := rawPodInfo.(*framework.PodInfo).Pod
- boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
- if !found {
- klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
- p.podBackoffQ.Pop()
- p.activeQ.Add(rawPodInfo)
- defer p.cond.Broadcast()
- continue
- }
- if boTime.After(p.clock.Now()) {
- return
- }
- _, err := p.podBackoffQ.Pop()
- if err != nil {
- klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
- return
- }
- p.activeQ.Add(rawPodInfo)
- defer p.cond.Broadcast()
- }
- }
- func (p *PriorityQueue) flushUnschedulableQLeftover() {
- p.lock.Lock()
- defer p.lock.Unlock()
- var podsToMove []*framework.PodInfo
- currentTime := p.clock.Now()
- for _, pInfo := range p.unschedulableQ.podInfoMap {
- lastScheduleTime := pInfo.Timestamp
- if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
- podsToMove = append(podsToMove, pInfo)
- }
- }
- if len(podsToMove) > 0 {
- p.movePodsToActiveQueue(podsToMove)
- }
- }
- func (p *PriorityQueue) Pop() (*v1.Pod, error) {
- p.lock.Lock()
- defer p.lock.Unlock()
- for p.activeQ.Len() == 0 {
-
-
-
- if p.closed {
- return nil, fmt.Errorf(queueClosed)
- }
- p.cond.Wait()
- }
- obj, err := p.activeQ.Pop()
- if err != nil {
- return nil, err
- }
- pInfo := obj.(*framework.PodInfo)
- p.schedulingCycle++
- return pInfo.Pod, err
- }
- func isPodUpdated(oldPod, newPod *v1.Pod) bool {
- strip := func(pod *v1.Pod) *v1.Pod {
- p := pod.DeepCopy()
- p.ResourceVersion = ""
- p.Generation = 0
- p.Status = v1.PodStatus{}
- return p
- }
- return !reflect.DeepEqual(strip(oldPod), strip(newPod))
- }
- func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- if oldPod != nil {
- oldPodInfo := newPodInfoNoTimestamp(oldPod)
-
- if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
- p.nominatedPods.update(oldPod, newPod)
- newPodInfo := newPodInfoNoTimestamp(newPod)
- newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp
- err := p.activeQ.Update(newPodInfo)
- return err
- }
-
- if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
- p.nominatedPods.update(oldPod, newPod)
- p.podBackoffQ.Delete(newPodInfoNoTimestamp(oldPod))
- newPodInfo := newPodInfoNoTimestamp(newPod)
- newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp
- err := p.activeQ.Add(newPodInfo)
- if err == nil {
- p.cond.Broadcast()
- }
- return err
- }
- }
-
- if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
- p.nominatedPods.update(oldPod, newPod)
- newPodInfo := newPodInfoNoTimestamp(newPod)
- newPodInfo.Timestamp = usPodInfo.Timestamp
- if isPodUpdated(oldPod, newPod) {
-
- p.clearPodBackoff(newPod)
- p.unschedulableQ.delete(usPodInfo.Pod)
- err := p.activeQ.Add(newPodInfo)
- if err == nil {
- p.cond.Broadcast()
- }
- return err
- }
-
- p.unschedulableQ.addOrUpdate(newPodInfo)
- return nil
- }
-
- err := p.activeQ.Add(p.newPodInfo(newPod))
- if err == nil {
- p.nominatedPods.add(newPod, "")
- p.cond.Broadcast()
- }
- return err
- }
- func (p *PriorityQueue) Delete(pod *v1.Pod) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.nominatedPods.delete(pod)
- err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
- if err != nil {
- p.clearPodBackoff(pod)
- p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
- p.unschedulableQ.delete(pod)
- }
- return nil
- }
- func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
- p.lock.Lock()
- p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
- p.lock.Unlock()
- }
- func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
- p.lock.Lock()
- p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
- p.lock.Unlock()
- }
- func (p *PriorityQueue) MoveAllToActiveQueue() {
- p.lock.Lock()
- defer p.lock.Unlock()
- for _, pInfo := range p.unschedulableQ.podInfoMap {
- pod := pInfo.Pod
- if p.isPodBackingOff(pod) {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
- }
- } else {
- if err := p.activeQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
- }
- }
- }
- p.unschedulableQ.clear()
- p.moveRequestCycle = p.schedulingCycle
- p.cond.Broadcast()
- }
- func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*framework.PodInfo) {
- for _, pInfo := range podInfoList {
- pod := pInfo.Pod
- if p.isPodBackingOff(pod) {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
- }
- } else {
- if err := p.activeQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
- }
- }
- p.unschedulableQ.delete(pod)
- }
- p.moveRequestCycle = p.schedulingCycle
- p.cond.Broadcast()
- }
- func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.PodInfo {
- var podsToMove []*framework.PodInfo
- for _, pInfo := range p.unschedulableQ.podInfoMap {
- up := pInfo.Pod
- affinity := up.Spec.Affinity
- if affinity != nil && affinity.PodAffinity != nil {
- terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
- for _, term := range terms {
- namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(up, &term)
- selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
- if err != nil {
- klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
- }
- if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
- podsToMove = append(podsToMove, pInfo)
- break
- }
- }
- }
- }
- return podsToMove
- }
- func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
- p.lock.RLock()
- defer p.lock.RUnlock()
- return p.nominatedPods.podsForNode(nodeName)
- }
- func (p *PriorityQueue) PendingPods() []*v1.Pod {
- p.lock.RLock()
- defer p.lock.RUnlock()
- result := []*v1.Pod{}
- for _, pInfo := range p.activeQ.List() {
- result = append(result, pInfo.(*framework.PodInfo).Pod)
- }
- for _, pInfo := range p.podBackoffQ.List() {
- result = append(result, pInfo.(*framework.PodInfo).Pod)
- }
- for _, pInfo := range p.unschedulableQ.podInfoMap {
- result = append(result, pInfo.Pod)
- }
- return result
- }
- func (p *PriorityQueue) Close() {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.closed = true
- p.cond.Broadcast()
- }
- func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
- p.lock.Lock()
- p.nominatedPods.delete(pod)
- p.lock.Unlock()
- }
- func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
- p.lock.Lock()
- p.nominatedPods.add(pod, nodeName)
- p.lock.Unlock()
- }
- func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
- pInfo1 := podInfo1.(*framework.PodInfo)
- pInfo2 := podInfo2.(*framework.PodInfo)
- bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod))
- bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod))
- return bo1.Before(bo2)
- }
- func (p *PriorityQueue) NumUnschedulablePods() int {
- p.lock.RLock()
- defer p.lock.RUnlock()
- return len(p.unschedulableQ.podInfoMap)
- }
- func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
- if p.clock == nil {
- return &framework.PodInfo{
- Pod: pod,
- }
- }
- return &framework.PodInfo{
- Pod: pod,
- Timestamp: p.clock.Now(),
- }
- }
// UnschedulablePodsMap holds pods that failed scheduling, keyed by keyFunc
// (util.GetPodFullName when built by newUnschedulablePodsMap).
type UnschedulablePodsMap struct {
	// podInfoMap maps a pod's key to its PodInfo.
	podInfoMap map[string]*framework.PodInfo
	// keyFunc derives the map key from a pod.
	keyFunc func(*v1.Pod) string
	// metricRecorder updates the counter when elements of an unschedulablePodsMap
	// get added or removed, and it does nothing if it's nil
	metricRecorder metrics.MetricRecorder
}
- // Add adds a pod to the unschedulable podInfoMap.
- func (u *UnschedulablePodsMap) addOrUpdate(pInfo *framework.PodInfo) {
- podID := u.keyFunc(pInfo.Pod)
- if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
- u.metricRecorder.Inc()
- }
- u.podInfoMap[podID] = pInfo
- }
- func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
- podID := u.keyFunc(pod)
- if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
- u.metricRecorder.Dec()
- }
- delete(u.podInfoMap, podID)
- }
- func (u *UnschedulablePodsMap) get(pod *v1.Pod) *framework.PodInfo {
- podKey := u.keyFunc(pod)
- if pInfo, exists := u.podInfoMap[podKey]; exists {
- return pInfo
- }
- return nil
- }
- func (u *UnschedulablePodsMap) clear() {
- u.podInfoMap = make(map[string]*framework.PodInfo)
- if u.metricRecorder != nil {
- u.metricRecorder.Clear()
- }
- }
- func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
- return &UnschedulablePodsMap{
- podInfoMap: make(map[string]*framework.PodInfo),
- keyFunc: util.GetPodFullName,
- metricRecorder: metricRecorder,
- }
- }
// nominatedPodMap indexes pods that have a nominated node, both by node and by
// pod UID.
type nominatedPodMap struct {
	// nominatedPods maps a node name to the pods nominated to run on it. A node
	// whose list becomes empty is removed from the map.
	nominatedPods map[string][]*v1.Pod

	// nominatedPodToNode maps a pod UID to its nominated node name, so a pod's
	// entry can be found and removed without knowing the node.
	nominatedPodToNode map[ktypes.UID]string
}
- func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
-
-
- npm.delete(p)
- nnn := nodeName
- if len(nnn) == 0 {
- nnn = NominatedNodeName(p)
- if len(nnn) == 0 {
- return
- }
- }
- npm.nominatedPodToNode[p.UID] = nnn
- for _, np := range npm.nominatedPods[nnn] {
- if np.UID == p.UID {
- klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
- return
- }
- }
- npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
- }
- func (npm *nominatedPodMap) delete(p *v1.Pod) {
- nnn, ok := npm.nominatedPodToNode[p.UID]
- if !ok {
- return
- }
- for i, np := range npm.nominatedPods[nnn] {
- if np.UID == p.UID {
- npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
- if len(npm.nominatedPods[nnn]) == 0 {
- delete(npm.nominatedPods, nnn)
- }
- break
- }
- }
- delete(npm.nominatedPodToNode, p.UID)
- }
- func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
-
-
-
- nodeName := ""
-
-
-
-
- if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPod) == "" {
- if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
-
- nodeName = nnn
- }
- }
-
-
- npm.delete(oldPod)
- npm.add(newPod, nodeName)
- }
- func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
- if list, ok := npm.nominatedPods[nodeName]; ok {
- return list
- }
- return nil
- }
- func newNominatedPodMap() *nominatedPodMap {
- return &nominatedPodMap{
- nominatedPods: make(map[string][]*v1.Pod),
- nominatedPodToNode: make(map[ktypes.UID]string),
- }
- }
- func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod {
- return func() *v1.Pod {
- pod, err := queue.Pop()
- if err == nil {
- klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
- return pod
- }
- klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
- return nil
- }
- }
- func podInfoKeyFunc(obj interface{}) (string, error) {
- return cache.MetaNamespaceKeyFunc(obj.(*framework.PodInfo).Pod)
- }
|