extender.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package core
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "fmt"
  18. "net/http"
  19. "strings"
  20. "time"
  21. "k8s.io/api/core/v1"
  22. utilnet "k8s.io/apimachinery/pkg/util/net"
  23. "k8s.io/apimachinery/pkg/util/sets"
  24. restclient "k8s.io/client-go/rest"
  25. schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
  26. extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
  27. "k8s.io/kubernetes/pkg/scheduler/listers"
  28. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  29. )
  // DefaultExtenderTimeout is the HTTP client timeout, five seconds, that
  // NewHTTPExtender falls back to when the extender configuration leaves the
  // timeout at zero.
  const DefaultExtenderTimeout = 5 * time.Second
  34. // SchedulerExtender is an interface for external processes to influence scheduling
  35. // decisions made by Kubernetes. This is typically needed for resources not directly
  36. // managed by Kubernetes.
  37. type SchedulerExtender interface {
  38. // Name returns a unique name that identifies the extender.
  39. Name() string
  40. // Filter based on extender-implemented predicate functions. The filtered list is
  41. // expected to be a subset of the supplied list. failedNodesMap optionally contains
  42. // the list of failed nodes and failure reasons.
  43. Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
  44. // Prioritize based on extender-implemented priority functions. The returned scores & weight
  45. // are used to compute the weighted score for an extender. The weighted scores are added to
  46. // the scores computed by Kubernetes scheduler. The total scores are used to do the host selection.
  47. Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error)
  48. // Bind delegates the action of binding a pod to a node to the extender.
  49. Bind(binding *v1.Binding) error
  50. // IsBinder returns whether this extender is configured for the Bind method.
  51. IsBinder() bool
  52. // IsInterested returns true if at least one extended resource requested by
  53. // this pod is managed by this extender.
  54. IsInterested(pod *v1.Pod) bool
  55. // ProcessPreemption returns nodes with their victim pods processed by extender based on
  56. // given:
  57. // 1. Pod to schedule
  58. // 2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process.
  59. // 3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled.
  60. // The possible changes made by extender may include:
  61. // 1. Subset of given candidate nodes after preemption phase of extender.
  62. // 2. A different set of victim pod for every given candidate node after preemption phase of extender.
  63. ProcessPreemption(
  64. pod *v1.Pod,
  65. nodeToVictims map[*v1.Node]*extenderv1.Victims,
  66. nodeInfos listers.NodeInfoLister) (map[*v1.Node]*extenderv1.Victims, error)
  67. // SupportsPreemption returns if the scheduler extender support preemption or not.
  68. SupportsPreemption() bool
  69. // IsIgnorable returns true indicates scheduling should not fail when this extender
  70. // is unavailable. This gives scheduler ability to fail fast and tolerate non-critical extenders as well.
  71. IsIgnorable() bool
  72. }
  73. // HTTPExtender implements the SchedulerExtender interface.
  74. type HTTPExtender struct {
  75. extenderURL string
  76. preemptVerb string
  77. filterVerb string
  78. prioritizeVerb string
  79. bindVerb string
  80. weight int64
  81. client *http.Client
  82. nodeCacheCapable bool
  83. managedResources sets.String
  84. ignorable bool
  85. }
  86. func makeTransport(config *schedulerapi.Extender) (http.RoundTripper, error) {
  87. var cfg restclient.Config
  88. if config.TLSConfig != nil {
  89. cfg.TLSClientConfig.Insecure = config.TLSConfig.Insecure
  90. cfg.TLSClientConfig.ServerName = config.TLSConfig.ServerName
  91. cfg.TLSClientConfig.CertFile = config.TLSConfig.CertFile
  92. cfg.TLSClientConfig.KeyFile = config.TLSConfig.KeyFile
  93. cfg.TLSClientConfig.CAFile = config.TLSConfig.CAFile
  94. cfg.TLSClientConfig.CertData = config.TLSConfig.CertData
  95. cfg.TLSClientConfig.KeyData = config.TLSConfig.KeyData
  96. cfg.TLSClientConfig.CAData = config.TLSConfig.CAData
  97. }
  98. if config.EnableHTTPS {
  99. hasCA := len(cfg.CAFile) > 0 || len(cfg.CAData) > 0
  100. if !hasCA {
  101. cfg.Insecure = true
  102. }
  103. }
  104. tlsConfig, err := restclient.TLSConfigFor(&cfg)
  105. if err != nil {
  106. return nil, err
  107. }
  108. if tlsConfig != nil {
  109. return utilnet.SetTransportDefaults(&http.Transport{
  110. TLSClientConfig: tlsConfig,
  111. }), nil
  112. }
  113. return utilnet.SetTransportDefaults(&http.Transport{}), nil
  114. }
  115. // NewHTTPExtender creates an HTTPExtender object.
  116. func NewHTTPExtender(config *schedulerapi.Extender) (SchedulerExtender, error) {
  117. if config.HTTPTimeout.Nanoseconds() == 0 {
  118. config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
  119. }
  120. transport, err := makeTransport(config)
  121. if err != nil {
  122. return nil, err
  123. }
  124. client := &http.Client{
  125. Transport: transport,
  126. Timeout: config.HTTPTimeout,
  127. }
  128. managedResources := sets.NewString()
  129. for _, r := range config.ManagedResources {
  130. managedResources.Insert(string(r.Name))
  131. }
  132. return &HTTPExtender{
  133. extenderURL: config.URLPrefix,
  134. preemptVerb: config.PreemptVerb,
  135. filterVerb: config.FilterVerb,
  136. prioritizeVerb: config.PrioritizeVerb,
  137. bindVerb: config.BindVerb,
  138. weight: config.Weight,
  139. client: client,
  140. nodeCacheCapable: config.NodeCacheCapable,
  141. managedResources: managedResources,
  142. ignorable: config.Ignorable,
  143. }, nil
  144. }
  145. // Equal is used to check if two extenders are equal
  146. // ignoring the client field, exported for testing
  147. func Equal(e1, e2 *HTTPExtender) bool {
  148. if e1.extenderURL != e2.extenderURL {
  149. return false
  150. }
  151. if e1.preemptVerb != e2.preemptVerb {
  152. return false
  153. }
  154. if e1.prioritizeVerb != e2.prioritizeVerb {
  155. return false
  156. }
  157. if e1.bindVerb != e2.bindVerb {
  158. return false
  159. }
  160. if e1.weight != e2.weight {
  161. return false
  162. }
  163. if e1.nodeCacheCapable != e2.nodeCacheCapable {
  164. return false
  165. }
  166. if !e1.managedResources.Equal(e2.managedResources) {
  167. return false
  168. }
  169. if e1.ignorable != e2.ignorable {
  170. return false
  171. }
  172. return true
  173. }
  174. // Name returns extenderURL to identify the extender.
  175. func (h *HTTPExtender) Name() string {
  176. return h.extenderURL
  177. }
  178. // IsIgnorable returns true indicates scheduling should not fail when this extender
  179. // is unavailable
  180. func (h *HTTPExtender) IsIgnorable() bool {
  181. return h.ignorable
  182. }
  183. // SupportsPreemption returns true if an extender supports preemption.
  184. // An extender should have preempt verb defined and enabled its own node cache.
  185. func (h *HTTPExtender) SupportsPreemption() bool {
  186. return len(h.preemptVerb) > 0
  187. }
  188. // ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in extender.
  189. func (h *HTTPExtender) ProcessPreemption(
  190. pod *v1.Pod,
  191. nodeToVictims map[*v1.Node]*extenderv1.Victims,
  192. nodeInfos listers.NodeInfoLister,
  193. ) (map[*v1.Node]*extenderv1.Victims, error) {
  194. var (
  195. result extenderv1.ExtenderPreemptionResult
  196. args *extenderv1.ExtenderPreemptionArgs
  197. )
  198. if !h.SupportsPreemption() {
  199. return nil, fmt.Errorf("preempt verb is not defined for extender %v but run into ProcessPreemption", h.extenderURL)
  200. }
  201. if h.nodeCacheCapable {
  202. // If extender has cached node info, pass NodeNameToMetaVictims in args.
  203. nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeToVictims)
  204. args = &extenderv1.ExtenderPreemptionArgs{
  205. Pod: pod,
  206. NodeNameToMetaVictims: nodeNameToMetaVictims,
  207. }
  208. } else {
  209. nodeNameToVictims := convertToNodeNameToVictims(nodeToVictims)
  210. args = &extenderv1.ExtenderPreemptionArgs{
  211. Pod: pod,
  212. NodeNameToVictims: nodeNameToVictims,
  213. }
  214. }
  215. if err := h.send(h.preemptVerb, args, &result); err != nil {
  216. return nil, err
  217. }
  218. // Extender will always return NodeNameToMetaVictims.
  219. // So let's convert it to NodeToVictims by using NodeNameToInfo.
  220. newNodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeInfos)
  221. if err != nil {
  222. return nil, err
  223. }
  224. // Do not override nodeToVictims
  225. return newNodeToVictims, nil
  226. }
  227. // convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers,
  228. // such as UIDs and names, to object pointers.
  229. func (h *HTTPExtender) convertToNodeToVictims(
  230. nodeNameToMetaVictims map[string]*extenderv1.MetaVictims,
  231. nodeInfos listers.NodeInfoLister,
  232. ) (map[*v1.Node]*extenderv1.Victims, error) {
  233. nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
  234. for nodeName, metaVictims := range nodeNameToMetaVictims {
  235. nodeInfo, err := nodeInfos.Get(nodeName)
  236. if err != nil {
  237. return nil, err
  238. }
  239. victims := &extenderv1.Victims{
  240. Pods: []*v1.Pod{},
  241. }
  242. for _, metaPod := range metaVictims.Pods {
  243. pod, err := h.convertPodUIDToPod(metaPod, nodeInfo)
  244. if err != nil {
  245. return nil, err
  246. }
  247. victims.Pods = append(victims.Pods, pod)
  248. }
  249. nodeToVictims[nodeInfo.Node()] = victims
  250. }
  251. return nodeToVictims, nil
  252. }
  253. // convertPodUIDToPod returns v1.Pod object for given MetaPod and node info.
  254. // The v1.Pod object is restored by nodeInfo.Pods().
  255. // It returns an error if there's cache inconsistency between default scheduler
  256. // and extender, i.e. when the pod is not found in nodeInfo.Pods.
  257. func (h *HTTPExtender) convertPodUIDToPod(
  258. metaPod *extenderv1.MetaPod,
  259. nodeInfo *schedulernodeinfo.NodeInfo) (*v1.Pod, error) {
  260. for _, pod := range nodeInfo.Pods() {
  261. if string(pod.UID) == metaPod.UID {
  262. return pod, nil
  263. }
  264. }
  265. return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node",
  266. h.extenderURL, metaPod, nodeInfo.Node().Name)
  267. }
  268. // convertToNodeNameToMetaVictims converts from struct type to meta types.
  269. func convertToNodeNameToMetaVictims(
  270. nodeToVictims map[*v1.Node]*extenderv1.Victims,
  271. ) map[string]*extenderv1.MetaVictims {
  272. nodeNameToVictims := map[string]*extenderv1.MetaVictims{}
  273. for node, victims := range nodeToVictims {
  274. metaVictims := &extenderv1.MetaVictims{
  275. Pods: []*extenderv1.MetaPod{},
  276. }
  277. for _, pod := range victims.Pods {
  278. metaPod := &extenderv1.MetaPod{
  279. UID: string(pod.UID),
  280. }
  281. metaVictims.Pods = append(metaVictims.Pods, metaPod)
  282. }
  283. nodeNameToVictims[node.GetName()] = metaVictims
  284. }
  285. return nodeNameToVictims
  286. }
  287. // convertToNodeNameToVictims converts from node type to node name as key.
  288. func convertToNodeNameToVictims(
  289. nodeToVictims map[*v1.Node]*extenderv1.Victims,
  290. ) map[string]*extenderv1.Victims {
  291. nodeNameToVictims := map[string]*extenderv1.Victims{}
  292. for node, victims := range nodeToVictims {
  293. nodeNameToVictims[node.GetName()] = victims
  294. }
  295. return nodeNameToVictims
  296. }
  297. // Filter based on extender implemented predicate functions. The filtered list is
  298. // expected to be a subset of the supplied list; otherwise the function returns an error.
  299. // failedNodesMap optionally contains the list of failed nodes and failure reasons.
  300. func (h *HTTPExtender) Filter(
  301. pod *v1.Pod,
  302. nodes []*v1.Node,
  303. ) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
  304. var (
  305. result extenderv1.ExtenderFilterResult
  306. nodeList *v1.NodeList
  307. nodeNames *[]string
  308. nodeResult []*v1.Node
  309. args *extenderv1.ExtenderArgs
  310. )
  311. fromNodeName := make(map[string]*v1.Node)
  312. for _, n := range nodes {
  313. fromNodeName[n.Name] = n
  314. }
  315. if h.filterVerb == "" {
  316. return nodes, extenderv1.FailedNodesMap{}, nil
  317. }
  318. if h.nodeCacheCapable {
  319. nodeNameSlice := make([]string, 0, len(nodes))
  320. for _, node := range nodes {
  321. nodeNameSlice = append(nodeNameSlice, node.Name)
  322. }
  323. nodeNames = &nodeNameSlice
  324. } else {
  325. nodeList = &v1.NodeList{}
  326. for _, node := range nodes {
  327. nodeList.Items = append(nodeList.Items, *node)
  328. }
  329. }
  330. args = &extenderv1.ExtenderArgs{
  331. Pod: pod,
  332. Nodes: nodeList,
  333. NodeNames: nodeNames,
  334. }
  335. if err := h.send(h.filterVerb, args, &result); err != nil {
  336. return nil, nil, err
  337. }
  338. if result.Error != "" {
  339. return nil, nil, fmt.Errorf(result.Error)
  340. }
  341. if h.nodeCacheCapable && result.NodeNames != nil {
  342. nodeResult = make([]*v1.Node, len(*result.NodeNames))
  343. for i, nodeName := range *result.NodeNames {
  344. if n, ok := fromNodeName[nodeName]; ok {
  345. nodeResult[i] = n
  346. } else {
  347. return nil, nil, fmt.Errorf(
  348. "extender %q claims a filtered node %q which is not found in the input node list",
  349. h.extenderURL, nodeName)
  350. }
  351. }
  352. } else if result.Nodes != nil {
  353. nodeResult = make([]*v1.Node, len(result.Nodes.Items))
  354. for i := range result.Nodes.Items {
  355. nodeResult[i] = &result.Nodes.Items[i]
  356. }
  357. }
  358. return nodeResult, result.FailedNodes, nil
  359. }
  360. // Prioritize based on extender implemented priority functions. Weight*priority is added
  361. // up for each such priority function. The returned score is added to the score computed
  362. // by Kubernetes scheduler. The total score is used to do the host selection.
  363. func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*extenderv1.HostPriorityList, int64, error) {
  364. var (
  365. result extenderv1.HostPriorityList
  366. nodeList *v1.NodeList
  367. nodeNames *[]string
  368. args *extenderv1.ExtenderArgs
  369. )
  370. if h.prioritizeVerb == "" {
  371. result := extenderv1.HostPriorityList{}
  372. for _, node := range nodes {
  373. result = append(result, extenderv1.HostPriority{Host: node.Name, Score: 0})
  374. }
  375. return &result, 0, nil
  376. }
  377. if h.nodeCacheCapable {
  378. nodeNameSlice := make([]string, 0, len(nodes))
  379. for _, node := range nodes {
  380. nodeNameSlice = append(nodeNameSlice, node.Name)
  381. }
  382. nodeNames = &nodeNameSlice
  383. } else {
  384. nodeList = &v1.NodeList{}
  385. for _, node := range nodes {
  386. nodeList.Items = append(nodeList.Items, *node)
  387. }
  388. }
  389. args = &extenderv1.ExtenderArgs{
  390. Pod: pod,
  391. Nodes: nodeList,
  392. NodeNames: nodeNames,
  393. }
  394. if err := h.send(h.prioritizeVerb, args, &result); err != nil {
  395. return nil, 0, err
  396. }
  397. return &result, h.weight, nil
  398. }
  399. // Bind delegates the action of binding a pod to a node to the extender.
  400. func (h *HTTPExtender) Bind(binding *v1.Binding) error {
  401. var result extenderv1.ExtenderBindingResult
  402. if !h.IsBinder() {
  403. // This shouldn't happen as this extender wouldn't have become a Binder.
  404. return fmt.Errorf("Unexpected empty bindVerb in extender")
  405. }
  406. req := &extenderv1.ExtenderBindingArgs{
  407. PodName: binding.Name,
  408. PodNamespace: binding.Namespace,
  409. PodUID: binding.UID,
  410. Node: binding.Target.Name,
  411. }
  412. if err := h.send(h.bindVerb, &req, &result); err != nil {
  413. return err
  414. }
  415. if result.Error != "" {
  416. return fmt.Errorf(result.Error)
  417. }
  418. return nil
  419. }
  420. // IsBinder returns whether this extender is configured for the Bind method.
  421. func (h *HTTPExtender) IsBinder() bool {
  422. return h.bindVerb != ""
  423. }
  424. // Helper function to send messages to the extender
  425. func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
  426. out, err := json.Marshal(args)
  427. if err != nil {
  428. return err
  429. }
  430. url := strings.TrimRight(h.extenderURL, "/") + "/" + action
  431. req, err := http.NewRequest("POST", url, bytes.NewReader(out))
  432. if err != nil {
  433. return err
  434. }
  435. req.Header.Set("Content-Type", "application/json")
  436. resp, err := h.client.Do(req)
  437. if err != nil {
  438. return err
  439. }
  440. defer resp.Body.Close()
  441. if resp.StatusCode != http.StatusOK {
  442. return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
  443. }
  444. return json.NewDecoder(resp.Body).Decode(result)
  445. }
  446. // IsInterested returns true if at least one extended resource requested by
  447. // this pod is managed by this extender.
  448. func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
  449. if h.managedResources.Len() == 0 {
  450. return true
  451. }
  452. if h.hasManagedResources(pod.Spec.Containers) {
  453. return true
  454. }
  455. if h.hasManagedResources(pod.Spec.InitContainers) {
  456. return true
  457. }
  458. return false
  459. }
  460. func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool {
  461. for i := range containers {
  462. container := &containers[i]
  463. for resourceName := range container.Resources.Requests {
  464. if h.managedResources.Has(string(resourceName)) {
  465. return true
  466. }
  467. }
  468. for resourceName := range container.Resources.Limits {
  469. if h.managedResources.Has(string(resourceName)) {
  470. return true
  471. }
  472. }
  473. }
  474. return false
  475. }