123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package scheduler
- // This file tests the scheduler extender.
- import (
- "encoding/json"
- "fmt"
- "net/http"
- "net/http/httptest"
- "strings"
- "testing"
- "time"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/wait"
- clientset "k8s.io/client-go/kubernetes"
- _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
- schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
- imageutils "k8s.io/kubernetes/test/utils/image"
- )
// Extender verbs. The test extender picks a handler by checking whether the
// request path contains one of these strings, and the same values are used as
// the verbs in the scheduler policy.
const (
	// filter selects the extender's filter handler.
	filter = "filter"
	// prioritize selects the extender's prioritize handler.
	prioritize = "prioritize"
	// bind selects the extender's bind handler.
	bind = "bind"
)

// extendedResourceName is the extended resource that the test pod requests and
// that the policy declares as managed by an extender.
const extendedResourceName = "foo.com/bar"
- type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error)
- type priorityFunc func(pod *v1.Pod, nodes *v1.NodeList) (*schedulerapi.HostPriorityList, error)
- type priorityConfig struct {
- function priorityFunc
- weight int
- }
- type Extender struct {
- name string
- predicates []fitPredicate
- prioritizers []priorityConfig
- nodeCacheCapable bool
- Client clientset.Interface
- }
- func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Request) {
- decoder := json.NewDecoder(req.Body)
- defer req.Body.Close()
- encoder := json.NewEncoder(w)
- if strings.Contains(req.URL.Path, filter) || strings.Contains(req.URL.Path, prioritize) {
- var args schedulerapi.ExtenderArgs
- if err := decoder.Decode(&args); err != nil {
- http.Error(w, "Decode error", http.StatusBadRequest)
- return
- }
- if strings.Contains(req.URL.Path, filter) {
- resp := &schedulerapi.ExtenderFilterResult{}
- resp, err := e.Filter(&args)
- if err != nil {
- resp.Error = err.Error()
- }
- if err := encoder.Encode(resp); err != nil {
- t.Fatalf("Failed to encode %v", resp)
- }
- } else if strings.Contains(req.URL.Path, prioritize) {
- // Prioritize errors are ignored. Default k8s priorities or another extender's
- // priorities may be applied.
- priorities, _ := e.Prioritize(&args)
- if err := encoder.Encode(priorities); err != nil {
- t.Fatalf("Failed to encode %+v", priorities)
- }
- }
- } else if strings.Contains(req.URL.Path, bind) {
- var args schedulerapi.ExtenderBindingArgs
- if err := decoder.Decode(&args); err != nil {
- http.Error(w, "Decode error", http.StatusBadRequest)
- return
- }
- resp := &schedulerapi.ExtenderBindingResult{}
- if err := e.Bind(&args); err != nil {
- resp.Error = err.Error()
- }
- if err := encoder.Encode(resp); err != nil {
- t.Fatalf("Failed to encode %+v", resp)
- }
- } else {
- http.Error(w, "Unknown method", http.StatusNotFound)
- }
- }
- func (e *Extender) filterUsingNodeCache(args *schedulerapi.ExtenderArgs) (*schedulerapi.ExtenderFilterResult, error) {
- nodeSlice := make([]string, 0)
- failedNodesMap := schedulerapi.FailedNodesMap{}
- for _, nodeName := range *args.NodeNames {
- fits := true
- for _, predicate := range e.predicates {
- fit, err := predicate(args.Pod,
- &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
- if err != nil {
- return &schedulerapi.ExtenderFilterResult{
- Nodes: nil,
- NodeNames: nil,
- FailedNodes: schedulerapi.FailedNodesMap{},
- Error: err.Error(),
- }, err
- }
- if !fit {
- fits = false
- break
- }
- }
- if fits {
- nodeSlice = append(nodeSlice, nodeName)
- } else {
- failedNodesMap[nodeName] = fmt.Sprintf("extender failed: %s", e.name)
- }
- }
- return &schedulerapi.ExtenderFilterResult{
- Nodes: nil,
- NodeNames: &nodeSlice,
- FailedNodes: failedNodesMap,
- }, nil
- }
- func (e *Extender) Filter(args *schedulerapi.ExtenderArgs) (*schedulerapi.ExtenderFilterResult, error) {
- filtered := []v1.Node{}
- failedNodesMap := schedulerapi.FailedNodesMap{}
- if e.nodeCacheCapable {
- return e.filterUsingNodeCache(args)
- }
- for _, node := range args.Nodes.Items {
- fits := true
- for _, predicate := range e.predicates {
- fit, err := predicate(args.Pod, &node)
- if err != nil {
- return &schedulerapi.ExtenderFilterResult{
- Nodes: &v1.NodeList{},
- NodeNames: nil,
- FailedNodes: schedulerapi.FailedNodesMap{},
- Error: err.Error(),
- }, err
- }
- if !fit {
- fits = false
- break
- }
- }
- if fits {
- filtered = append(filtered, node)
- } else {
- failedNodesMap[node.Name] = fmt.Sprintf("extender failed: %s", e.name)
- }
- }
- return &schedulerapi.ExtenderFilterResult{
- Nodes: &v1.NodeList{Items: filtered},
- NodeNames: nil,
- FailedNodes: failedNodesMap,
- }, nil
- }
- func (e *Extender) Prioritize(args *schedulerapi.ExtenderArgs) (*schedulerapi.HostPriorityList, error) {
- result := schedulerapi.HostPriorityList{}
- combinedScores := map[string]int{}
- var nodes = &v1.NodeList{Items: []v1.Node{}}
- if e.nodeCacheCapable {
- for _, nodeName := range *args.NodeNames {
- nodes.Items = append(nodes.Items, v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
- }
- } else {
- nodes = args.Nodes
- }
- for _, prioritizer := range e.prioritizers {
- weight := prioritizer.weight
- if weight == 0 {
- continue
- }
- priorityFunc := prioritizer.function
- prioritizedList, err := priorityFunc(args.Pod, nodes)
- if err != nil {
- return &schedulerapi.HostPriorityList{}, err
- }
- for _, hostEntry := range *prioritizedList {
- combinedScores[hostEntry.Host] += hostEntry.Score * weight
- }
- }
- for host, score := range combinedScores {
- result = append(result, schedulerapi.HostPriority{Host: host, Score: score})
- }
- return &result, nil
- }
- func (e *Extender) Bind(binding *schedulerapi.ExtenderBindingArgs) error {
- b := &v1.Binding{
- ObjectMeta: metav1.ObjectMeta{Namespace: binding.PodNamespace, Name: binding.PodName, UID: binding.PodUID},
- Target: v1.ObjectReference{
- Kind: "Node",
- Name: binding.Node,
- },
- }
- return e.Client.CoreV1().Pods(b.Namespace).Bind(b)
- }
- func machine1_2_3Predicate(pod *v1.Pod, node *v1.Node) (bool, error) {
- if node.Name == "machine1" || node.Name == "machine2" || node.Name == "machine3" {
- return true, nil
- }
- return false, nil
- }
- func machine2_3_5Predicate(pod *v1.Pod, node *v1.Node) (bool, error) {
- if node.Name == "machine2" || node.Name == "machine3" || node.Name == "machine5" {
- return true, nil
- }
- return false, nil
- }
- func machine2Prioritizer(pod *v1.Pod, nodes *v1.NodeList) (*schedulerapi.HostPriorityList, error) {
- result := schedulerapi.HostPriorityList{}
- for _, node := range nodes.Items {
- score := 1
- if node.Name == "machine2" {
- score = 10
- }
- result = append(result, schedulerapi.HostPriority{
- Host: node.Name,
- Score: score,
- })
- }
- return &result, nil
- }
- func machine3Prioritizer(pod *v1.Pod, nodes *v1.NodeList) (*schedulerapi.HostPriorityList, error) {
- result := schedulerapi.HostPriorityList{}
- for _, node := range nodes.Items {
- score := 1
- if node.Name == "machine3" {
- score = 10
- }
- result = append(result, schedulerapi.HostPriority{
- Host: node.Name,
- Score: score,
- })
- }
- return &result, nil
- }
- func TestSchedulerExtender(t *testing.T) {
- context := initTestMaster(t, "scheduler-extender", nil)
- clientSet := context.clientSet
- extender1 := &Extender{
- name: "extender1",
- predicates: []fitPredicate{machine1_2_3Predicate},
- prioritizers: []priorityConfig{{machine2Prioritizer, 1}},
- }
- es1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- extender1.serveHTTP(t, w, req)
- }))
- defer es1.Close()
- extender2 := &Extender{
- name: "extender2",
- predicates: []fitPredicate{machine2_3_5Predicate},
- prioritizers: []priorityConfig{{machine3Prioritizer, 1}},
- Client: clientSet,
- }
- es2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- extender2.serveHTTP(t, w, req)
- }))
- defer es2.Close()
- extender3 := &Extender{
- name: "extender3",
- predicates: []fitPredicate{machine1_2_3Predicate},
- prioritizers: []priorityConfig{{machine2Prioritizer, 5}},
- nodeCacheCapable: true,
- }
- es3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- extender3.serveHTTP(t, w, req)
- }))
- defer es3.Close()
- policy := schedulerapi.Policy{
- ExtenderConfigs: []schedulerapi.ExtenderConfig{
- {
- URLPrefix: es1.URL,
- FilterVerb: filter,
- PrioritizeVerb: prioritize,
- Weight: 3,
- EnableHTTPS: false,
- },
- {
- URLPrefix: es2.URL,
- FilterVerb: filter,
- PrioritizeVerb: prioritize,
- BindVerb: bind,
- Weight: 4,
- EnableHTTPS: false,
- ManagedResources: []schedulerapi.ExtenderManagedResource{
- {
- Name: extendedResourceName,
- IgnoredByScheduler: true,
- },
- },
- },
- {
- URLPrefix: es3.URL,
- FilterVerb: filter,
- PrioritizeVerb: prioritize,
- Weight: 10,
- EnableHTTPS: false,
- NodeCacheCapable: true,
- },
- },
- }
- policy.APIVersion = "v1"
- context = initTestScheduler(t, context, false, &policy)
- defer cleanupTest(t, context)
- DoTestPodScheduling(context.ns, t, clientSet)
- }
- func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface) {
- // NOTE: This test cannot run in parallel, because it is creating and deleting
- // non-namespaced objects (Nodes).
- defer cs.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
- goodCondition := v1.NodeCondition{
- Type: v1.NodeReady,
- Status: v1.ConditionTrue,
- Reason: fmt.Sprintf("schedulable condition"),
- LastHeartbeatTime: metav1.Time{Time: time.Now()},
- }
- node := &v1.Node{
- Spec: v1.NodeSpec{Unschedulable: false},
- Status: v1.NodeStatus{
- Capacity: v1.ResourceList{
- v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
- },
- Conditions: []v1.NodeCondition{goodCondition},
- },
- }
- for ii := 0; ii < 5; ii++ {
- node.Name = fmt.Sprintf("machine%d", ii+1)
- if _, err := cs.CoreV1().Nodes().Create(node); err != nil {
- t.Fatalf("Failed to create nodes: %v", err)
- }
- }
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{Name: "extender-test-pod"},
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "container",
- Image: imageutils.GetPauseImageName(),
- Resources: v1.ResourceRequirements{
- Limits: v1.ResourceList{
- extendedResourceName: *resource.NewQuantity(1, resource.DecimalSI),
- },
- },
- },
- },
- },
- }
- myPod, err := cs.CoreV1().Pods(ns.Name).Create(pod)
- if err != nil {
- t.Fatalf("Failed to create pod: %v", err)
- }
- err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(cs, myPod.Namespace, myPod.Name))
- if err != nil {
- t.Fatalf("Failed to schedule pod: %v", err)
- }
- myPod, err = cs.CoreV1().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{})
- if err != nil {
- t.Fatalf("Failed to get pod: %v", err)
- } else if myPod.Spec.NodeName != "machine2" {
- t.Fatalf("Failed to schedule using extender, expected machine2, got %v", myPod.Spec.NodeName)
- }
- var gracePeriod int64
- if err := cs.CoreV1().Pods(ns.Name).Delete(myPod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}); err != nil {
- t.Fatalf("Failed to delete pod: %v", err)
- }
- _, err = cs.CoreV1().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{})
- if err == nil {
- t.Fatalf("Failed to delete pod: %v", err)
- }
- t.Logf("Scheduled pod using extenders")
- }
|