123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package core
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "net/http"
- "strings"
- "time"
- v1 "k8s.io/api/core/v1"
- utilnet "k8s.io/apimachinery/pkg/util/net"
- "k8s.io/apimachinery/pkg/util/sets"
- restclient "k8s.io/client-go/rest"
- "k8s.io/kubernetes/pkg/scheduler/algorithm"
- schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
- schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- )
- const (
- // DefaultExtenderTimeout defines the default extender timeout in second.
- DefaultExtenderTimeout = 5 * time.Second
- )
- // HTTPExtender implements the algorithm.SchedulerExtender interface.
- type HTTPExtender struct {
- extenderURL string
- preemptVerb string
- filterVerb string
- prioritizeVerb string
- bindVerb string
- weight int
- client *http.Client
- nodeCacheCapable bool
- managedResources sets.String
- ignorable bool
- }
- func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, error) {
- var cfg restclient.Config
- if config.TLSConfig != nil {
- cfg.TLSClientConfig.Insecure = config.TLSConfig.Insecure
- cfg.TLSClientConfig.ServerName = config.TLSConfig.ServerName
- cfg.TLSClientConfig.CertFile = config.TLSConfig.CertFile
- cfg.TLSClientConfig.KeyFile = config.TLSConfig.KeyFile
- cfg.TLSClientConfig.CAFile = config.TLSConfig.CAFile
- cfg.TLSClientConfig.CertData = config.TLSConfig.CertData
- cfg.TLSClientConfig.KeyData = config.TLSConfig.KeyData
- cfg.TLSClientConfig.CAData = config.TLSConfig.CAData
- }
- if config.EnableHTTPS {
- hasCA := len(cfg.CAFile) > 0 || len(cfg.CAData) > 0
- if !hasCA {
- cfg.Insecure = true
- }
- }
- tlsConfig, err := restclient.TLSConfigFor(&cfg)
- if err != nil {
- return nil, err
- }
- if tlsConfig != nil {
- return utilnet.SetTransportDefaults(&http.Transport{
- TLSClientConfig: tlsConfig,
- }), nil
- }
- return utilnet.SetTransportDefaults(&http.Transport{}), nil
- }
- // NewHTTPExtender creates an HTTPExtender object.
- func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerExtender, error) {
- if config.HTTPTimeout.Nanoseconds() == 0 {
- config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
- }
- transport, err := makeTransport(config)
- if err != nil {
- return nil, err
- }
- client := &http.Client{
- Transport: transport,
- Timeout: config.HTTPTimeout,
- }
- managedResources := sets.NewString()
- for _, r := range config.ManagedResources {
- managedResources.Insert(string(r.Name))
- }
- return &HTTPExtender{
- extenderURL: config.URLPrefix,
- preemptVerb: config.PreemptVerb,
- filterVerb: config.FilterVerb,
- prioritizeVerb: config.PrioritizeVerb,
- bindVerb: config.BindVerb,
- weight: config.Weight,
- client: client,
- nodeCacheCapable: config.NodeCacheCapable,
- managedResources: managedResources,
- ignorable: config.Ignorable,
- }, nil
- }
- // Name returns extenderURL to identify the extender.
- func (h *HTTPExtender) Name() string {
- return h.extenderURL
- }
- // IsIgnorable returns true indicates scheduling should not fail when this extender
- // is unavailable
- func (h *HTTPExtender) IsIgnorable() bool {
- return h.ignorable
- }
- // SupportsPreemption returns true if an extender supports preemption.
- // An extender should have preempt verb defined and enabled its own node cache.
- func (h *HTTPExtender) SupportsPreemption() bool {
- return len(h.preemptVerb) > 0
- }
- // ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in extender.
- func (h *HTTPExtender) ProcessPreemption(
- pod *v1.Pod,
- nodeToVictims map[*v1.Node]*schedulerapi.Victims,
- nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
- ) (map[*v1.Node]*schedulerapi.Victims, error) {
- var (
- result schedulerapi.ExtenderPreemptionResult
- args *schedulerapi.ExtenderPreemptionArgs
- )
- if !h.SupportsPreemption() {
- return nil, fmt.Errorf("preempt verb is not defined for extender %v but run into ProcessPreemption", h.extenderURL)
- }
- if h.nodeCacheCapable {
- // If extender has cached node info, pass NodeNameToMetaVictims in args.
- nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeToVictims)
- args = &schedulerapi.ExtenderPreemptionArgs{
- Pod: pod,
- NodeNameToMetaVictims: nodeNameToMetaVictims,
- }
- } else {
- nodeNameToVictims := convertToNodeNameToVictims(nodeToVictims)
- args = &schedulerapi.ExtenderPreemptionArgs{
- Pod: pod,
- NodeNameToVictims: nodeNameToVictims,
- }
- }
- if err := h.send(h.preemptVerb, args, &result); err != nil {
- return nil, err
- }
- // Extender will always return NodeNameToMetaVictims.
- // So let's convert it to NodeToVictims by using NodeNameToInfo.
- newNodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeNameToInfo)
- if err != nil {
- return nil, err
- }
- // Do not override nodeToVictims
- return newNodeToVictims, nil
- }
- // convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers,
- // such as UIDs and names, to object pointers.
- func (h *HTTPExtender) convertToNodeToVictims(
- nodeNameToMetaVictims map[string]*schedulerapi.MetaVictims,
- nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
- ) (map[*v1.Node]*schedulerapi.Victims, error) {
- nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
- for nodeName, metaVictims := range nodeNameToMetaVictims {
- victims := &schedulerapi.Victims{
- Pods: []*v1.Pod{},
- }
- for _, metaPod := range metaVictims.Pods {
- pod, err := h.convertPodUIDToPod(metaPod, nodeName, nodeNameToInfo)
- if err != nil {
- return nil, err
- }
- victims.Pods = append(victims.Pods, pod)
- }
- nodeToVictims[nodeNameToInfo[nodeName].Node()] = victims
- }
- return nodeToVictims, nil
- }
- // convertPodUIDToPod returns v1.Pod object for given MetaPod and node name.
- // The v1.Pod object is restored by nodeInfo.Pods().
- // It should return error if there's cache inconsistency between default scheduler and extender
- // so that this pod or node is missing from nodeNameToInfo.
- func (h *HTTPExtender) convertPodUIDToPod(
- metaPod *schedulerapi.MetaPod,
- nodeName string,
- nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) (*v1.Pod, error) {
- var nodeInfo *schedulernodeinfo.NodeInfo
- if nodeInfo, ok := nodeNameToInfo[nodeName]; ok {
- for _, pod := range nodeInfo.Pods() {
- if string(pod.UID) == metaPod.UID {
- return pod, nil
- }
- }
- return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node",
- h.extenderURL, metaPod, nodeInfo.Node().Name)
- }
- return nil, fmt.Errorf("extender: %v claims to preempt on node: %v but the node is not found in nodeNameToInfo map",
- h.extenderURL, nodeInfo.Node().Name)
- }
- // convertToNodeNameToMetaVictims converts from struct type to meta types.
- func convertToNodeNameToMetaVictims(
- nodeToVictims map[*v1.Node]*schedulerapi.Victims,
- ) map[string]*schedulerapi.MetaVictims {
- nodeNameToVictims := map[string]*schedulerapi.MetaVictims{}
- for node, victims := range nodeToVictims {
- metaVictims := &schedulerapi.MetaVictims{
- Pods: []*schedulerapi.MetaPod{},
- }
- for _, pod := range victims.Pods {
- metaPod := &schedulerapi.MetaPod{
- UID: string(pod.UID),
- }
- metaVictims.Pods = append(metaVictims.Pods, metaPod)
- }
- nodeNameToVictims[node.GetName()] = metaVictims
- }
- return nodeNameToVictims
- }
- // convertToNodeNameToVictims converts from node type to node name as key.
- func convertToNodeNameToVictims(
- nodeToVictims map[*v1.Node]*schedulerapi.Victims,
- ) map[string]*schedulerapi.Victims {
- nodeNameToVictims := map[string]*schedulerapi.Victims{}
- for node, victims := range nodeToVictims {
- nodeNameToVictims[node.GetName()] = victims
- }
- return nodeNameToVictims
- }
- // Filter based on extender implemented predicate functions. The filtered list is
- // expected to be a subset of the supplied list. failedNodesMap optionally contains
- // the list of failed nodes and failure reasons.
- func (h *HTTPExtender) Filter(
- pod *v1.Pod,
- nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
- ) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
- var (
- result schedulerapi.ExtenderFilterResult
- nodeList *v1.NodeList
- nodeNames *[]string
- nodeResult []*v1.Node
- args *schedulerapi.ExtenderArgs
- )
- if h.filterVerb == "" {
- return nodes, schedulerapi.FailedNodesMap{}, nil
- }
- if h.nodeCacheCapable {
- nodeNameSlice := make([]string, 0, len(nodes))
- for _, node := range nodes {
- nodeNameSlice = append(nodeNameSlice, node.Name)
- }
- nodeNames = &nodeNameSlice
- } else {
- nodeList = &v1.NodeList{}
- for _, node := range nodes {
- nodeList.Items = append(nodeList.Items, *node)
- }
- }
- args = &schedulerapi.ExtenderArgs{
- Pod: pod,
- Nodes: nodeList,
- NodeNames: nodeNames,
- }
- if err := h.send(h.filterVerb, args, &result); err != nil {
- return nil, nil, err
- }
- if result.Error != "" {
- return nil, nil, fmt.Errorf(result.Error)
- }
- if h.nodeCacheCapable && result.NodeNames != nil {
- nodeResult = make([]*v1.Node, 0, len(*result.NodeNames))
- for i := range *result.NodeNames {
- nodeResult = append(nodeResult, nodeNameToInfo[(*result.NodeNames)[i]].Node())
- }
- } else if result.Nodes != nil {
- nodeResult = make([]*v1.Node, 0, len(result.Nodes.Items))
- for i := range result.Nodes.Items {
- nodeResult = append(nodeResult, &result.Nodes.Items[i])
- }
- }
- return nodeResult, result.FailedNodes, nil
- }
- // Prioritize based on extender implemented priority functions. Weight*priority is added
- // up for each such priority function. The returned score is added to the score computed
- // by Kubernetes scheduler. The total score is used to do the host selection.
- func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, int, error) {
- var (
- result schedulerapi.HostPriorityList
- nodeList *v1.NodeList
- nodeNames *[]string
- args *schedulerapi.ExtenderArgs
- )
- if h.prioritizeVerb == "" {
- result := schedulerapi.HostPriorityList{}
- for _, node := range nodes {
- result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0})
- }
- return &result, 0, nil
- }
- if h.nodeCacheCapable {
- nodeNameSlice := make([]string, 0, len(nodes))
- for _, node := range nodes {
- nodeNameSlice = append(nodeNameSlice, node.Name)
- }
- nodeNames = &nodeNameSlice
- } else {
- nodeList = &v1.NodeList{}
- for _, node := range nodes {
- nodeList.Items = append(nodeList.Items, *node)
- }
- }
- args = &schedulerapi.ExtenderArgs{
- Pod: pod,
- Nodes: nodeList,
- NodeNames: nodeNames,
- }
- if err := h.send(h.prioritizeVerb, args, &result); err != nil {
- return nil, 0, err
- }
- return &result, h.weight, nil
- }
- //------------------------------------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- // ---------START OF CUSTOMIZATION------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- func (h *HTTPExtender) CustomPrioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.CustomHostPriorityList, int, error) {
- var (
- result schedulerapi.CustomHostPriorityList
- nodeList *v1.NodeList
- nodeNames *[]string
- args *schedulerapi.ExtenderArgs
- )
- if h.prioritizeVerb == "" {
- result := schedulerapi.CustomHostPriorityList{}
- for _, node := range nodes {
- result = append(result, schedulerapi.CustomHostPriority{Host: node.Name, Score: 0})
- }
- return &result, 0, nil
- }
- if h.nodeCacheCapable {
- nodeNameSlice := make([]string, 0, len(nodes))
- for _, node := range nodes {
- nodeNameSlice = append(nodeNameSlice, node.Name)
- }
- nodeNames = &nodeNameSlice
- } else {
- nodeList = &v1.NodeList{}
- for _, node := range nodes {
- nodeList.Items = append(nodeList.Items, *node)
- }
- }
- args = &schedulerapi.ExtenderArgs{
- Pod: pod,
- Nodes: nodeList,
- NodeNames: nodeNames,
- }
- if err := h.send(h.prioritizeVerb, args, &result); err != nil {
- return nil, 0, err
- }
- return &result, h.weight, nil
- }
- //------------------------------------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- // ---------END OF CUSTOMIZATION------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- // Bind delegates the action of binding a pod to a node to the extender.
- func (h *HTTPExtender) Bind(binding *v1.Binding) error {
- var result schedulerapi.ExtenderBindingResult
- if !h.IsBinder() {
- // This shouldn't happen as this extender wouldn't have become a Binder.
- return fmt.Errorf("Unexpected empty bindVerb in extender")
- }
- req := &schedulerapi.ExtenderBindingArgs{
- PodName: binding.Name,
- PodNamespace: binding.Namespace,
- PodUID: binding.UID,
- Node: binding.Target.Name,
- }
- if err := h.send(h.bindVerb, &req, &result); err != nil {
- return err
- }
- if result.Error != "" {
- return fmt.Errorf(result.Error)
- }
- return nil
- }
- // IsBinder returns whether this extender is configured for the Bind method.
- func (h *HTTPExtender) IsBinder() bool {
- return h.bindVerb != ""
- }
- // Helper function to send messages to the extender
- func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
- out, err := json.Marshal(args)
- if err != nil {
- return err
- }
- url := strings.TrimRight(h.extenderURL, "/") + "/" + action
- req, err := http.NewRequest("POST", url, bytes.NewReader(out))
- if err != nil {
- return err
- }
- req.Header.Set("Content-Type", "application/json")
- resp, err := h.client.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
- }
- return json.NewDecoder(resp.Body).Decode(result)
- }
- // IsInterested returns true if at least one extended resource requested by
- // this pod is managed by this extender.
- func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
- if h.managedResources.Len() == 0 {
- return true
- }
- if h.hasManagedResources(pod.Spec.Containers) {
- return true
- }
- if h.hasManagedResources(pod.Spec.InitContainers) {
- return true
- }
- return false
- }
- func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool {
- for i := range containers {
- container := &containers[i]
- for resourceName := range container.Resources.Requests {
- if h.managedResources.Has(string(resourceName)) {
- return true
- }
- }
- for resourceName := range container.Resources.Limits {
- if h.managedResources.Has(string(resourceName)) {
- return true
- }
- }
- }
- return false
- }
|