extender.go 15 KB


  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package core
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "fmt"
  18. "net/http"
  19. "strings"
  20. "time"
  21. v1 "k8s.io/api/core/v1"
  22. utilnet "k8s.io/apimachinery/pkg/util/net"
  23. "k8s.io/apimachinery/pkg/util/sets"
  24. restclient "k8s.io/client-go/rest"
  25. "k8s.io/kubernetes/pkg/scheduler/algorithm"
  26. schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
  27. schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
  28. )
  29. const (
  30. // DefaultExtenderTimeout defines the default extender timeout in second.
  31. DefaultExtenderTimeout = 5 * time.Second
  32. )
  33. // HTTPExtender implements the algorithm.SchedulerExtender interface.
  34. type HTTPExtender struct {
  35. extenderURL string
  36. preemptVerb string
  37. filterVerb string
  38. prioritizeVerb string
  39. bindVerb string
  40. weight int
  41. client *http.Client
  42. nodeCacheCapable bool
  43. managedResources sets.String
  44. ignorable bool
  45. }
  46. func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, error) {
  47. var cfg restclient.Config
  48. if config.TLSConfig != nil {
  49. cfg.TLSClientConfig.Insecure = config.TLSConfig.Insecure
  50. cfg.TLSClientConfig.ServerName = config.TLSConfig.ServerName
  51. cfg.TLSClientConfig.CertFile = config.TLSConfig.CertFile
  52. cfg.TLSClientConfig.KeyFile = config.TLSConfig.KeyFile
  53. cfg.TLSClientConfig.CAFile = config.TLSConfig.CAFile
  54. cfg.TLSClientConfig.CertData = config.TLSConfig.CertData
  55. cfg.TLSClientConfig.KeyData = config.TLSConfig.KeyData
  56. cfg.TLSClientConfig.CAData = config.TLSConfig.CAData
  57. }
  58. if config.EnableHTTPS {
  59. hasCA := len(cfg.CAFile) > 0 || len(cfg.CAData) > 0
  60. if !hasCA {
  61. cfg.Insecure = true
  62. }
  63. }
  64. tlsConfig, err := restclient.TLSConfigFor(&cfg)
  65. if err != nil {
  66. return nil, err
  67. }
  68. if tlsConfig != nil {
  69. return utilnet.SetTransportDefaults(&http.Transport{
  70. TLSClientConfig: tlsConfig,
  71. }), nil
  72. }
  73. return utilnet.SetTransportDefaults(&http.Transport{}), nil
  74. }
  75. // NewHTTPExtender creates an HTTPExtender object.
  76. func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerExtender, error) {
  77. if config.HTTPTimeout.Nanoseconds() == 0 {
  78. config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
  79. }
  80. transport, err := makeTransport(config)
  81. if err != nil {
  82. return nil, err
  83. }
  84. client := &http.Client{
  85. Transport: transport,
  86. Timeout: config.HTTPTimeout,
  87. }
  88. managedResources := sets.NewString()
  89. for _, r := range config.ManagedResources {
  90. managedResources.Insert(string(r.Name))
  91. }
  92. return &HTTPExtender{
  93. extenderURL: config.URLPrefix,
  94. preemptVerb: config.PreemptVerb,
  95. filterVerb: config.FilterVerb,
  96. prioritizeVerb: config.PrioritizeVerb,
  97. bindVerb: config.BindVerb,
  98. weight: config.Weight,
  99. client: client,
  100. nodeCacheCapable: config.NodeCacheCapable,
  101. managedResources: managedResources,
  102. ignorable: config.Ignorable,
  103. }, nil
  104. }
  105. // Name returns extenderURL to identify the extender.
  106. func (h *HTTPExtender) Name() string {
  107. return h.extenderURL
  108. }
  109. // IsIgnorable returns true indicates scheduling should not fail when this extender
  110. // is unavailable
  111. func (h *HTTPExtender) IsIgnorable() bool {
  112. return h.ignorable
  113. }
  114. // SupportsPreemption returns true if an extender supports preemption.
  115. // An extender should have preempt verb defined and enabled its own node cache.
  116. func (h *HTTPExtender) SupportsPreemption() bool {
  117. return len(h.preemptVerb) > 0
  118. }
  119. // ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in extender.
  120. func (h *HTTPExtender) ProcessPreemption(
  121. pod *v1.Pod,
  122. nodeToVictims map[*v1.Node]*schedulerapi.Victims,
  123. nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
  124. ) (map[*v1.Node]*schedulerapi.Victims, error) {
  125. var (
  126. result schedulerapi.ExtenderPreemptionResult
  127. args *schedulerapi.ExtenderPreemptionArgs
  128. )
  129. if !h.SupportsPreemption() {
  130. return nil, fmt.Errorf("preempt verb is not defined for extender %v but run into ProcessPreemption", h.extenderURL)
  131. }
  132. if h.nodeCacheCapable {
  133. // If extender has cached node info, pass NodeNameToMetaVictims in args.
  134. nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeToVictims)
  135. args = &schedulerapi.ExtenderPreemptionArgs{
  136. Pod: pod,
  137. NodeNameToMetaVictims: nodeNameToMetaVictims,
  138. }
  139. } else {
  140. nodeNameToVictims := convertToNodeNameToVictims(nodeToVictims)
  141. args = &schedulerapi.ExtenderPreemptionArgs{
  142. Pod: pod,
  143. NodeNameToVictims: nodeNameToVictims,
  144. }
  145. }
  146. if err := h.send(h.preemptVerb, args, &result); err != nil {
  147. return nil, err
  148. }
  149. // Extender will always return NodeNameToMetaVictims.
  150. // So let's convert it to NodeToVictims by using NodeNameToInfo.
  151. newNodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeNameToInfo)
  152. if err != nil {
  153. return nil, err
  154. }
  155. // Do not override nodeToVictims
  156. return newNodeToVictims, nil
  157. }
  158. // convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers,
  159. // such as UIDs and names, to object pointers.
  160. func (h *HTTPExtender) convertToNodeToVictims(
  161. nodeNameToMetaVictims map[string]*schedulerapi.MetaVictims,
  162. nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
  163. ) (map[*v1.Node]*schedulerapi.Victims, error) {
  164. nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
  165. for nodeName, metaVictims := range nodeNameToMetaVictims {
  166. victims := &schedulerapi.Victims{
  167. Pods: []*v1.Pod{},
  168. }
  169. for _, metaPod := range metaVictims.Pods {
  170. pod, err := h.convertPodUIDToPod(metaPod, nodeName, nodeNameToInfo)
  171. if err != nil {
  172. return nil, err
  173. }
  174. victims.Pods = append(victims.Pods, pod)
  175. }
  176. nodeToVictims[nodeNameToInfo[nodeName].Node()] = victims
  177. }
  178. return nodeToVictims, nil
  179. }
  180. // convertPodUIDToPod returns v1.Pod object for given MetaPod and node name.
  181. // The v1.Pod object is restored by nodeInfo.Pods().
  182. // It should return error if there's cache inconsistency between default scheduler and extender
  183. // so that this pod or node is missing from nodeNameToInfo.
  184. func (h *HTTPExtender) convertPodUIDToPod(
  185. metaPod *schedulerapi.MetaPod,
  186. nodeName string,
  187. nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) (*v1.Pod, error) {
  188. var nodeInfo *schedulernodeinfo.NodeInfo
  189. if nodeInfo, ok := nodeNameToInfo[nodeName]; ok {
  190. for _, pod := range nodeInfo.Pods() {
  191. if string(pod.UID) == metaPod.UID {
  192. return pod, nil
  193. }
  194. }
  195. return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node",
  196. h.extenderURL, metaPod, nodeInfo.Node().Name)
  197. }
  198. return nil, fmt.Errorf("extender: %v claims to preempt on node: %v but the node is not found in nodeNameToInfo map",
  199. h.extenderURL, nodeInfo.Node().Name)
  200. }
  201. // convertToNodeNameToMetaVictims converts from struct type to meta types.
  202. func convertToNodeNameToMetaVictims(
  203. nodeToVictims map[*v1.Node]*schedulerapi.Victims,
  204. ) map[string]*schedulerapi.MetaVictims {
  205. nodeNameToVictims := map[string]*schedulerapi.MetaVictims{}
  206. for node, victims := range nodeToVictims {
  207. metaVictims := &schedulerapi.MetaVictims{
  208. Pods: []*schedulerapi.MetaPod{},
  209. }
  210. for _, pod := range victims.Pods {
  211. metaPod := &schedulerapi.MetaPod{
  212. UID: string(pod.UID),
  213. }
  214. metaVictims.Pods = append(metaVictims.Pods, metaPod)
  215. }
  216. nodeNameToVictims[node.GetName()] = metaVictims
  217. }
  218. return nodeNameToVictims
  219. }
  220. // convertToNodeNameToVictims converts from node type to node name as key.
  221. func convertToNodeNameToVictims(
  222. nodeToVictims map[*v1.Node]*schedulerapi.Victims,
  223. ) map[string]*schedulerapi.Victims {
  224. nodeNameToVictims := map[string]*schedulerapi.Victims{}
  225. for node, victims := range nodeToVictims {
  226. nodeNameToVictims[node.GetName()] = victims
  227. }
  228. return nodeNameToVictims
  229. }
  230. // Filter based on extender implemented predicate functions. The filtered list is
  231. // expected to be a subset of the supplied list. failedNodesMap optionally contains
  232. // the list of failed nodes and failure reasons.
  233. func (h *HTTPExtender) Filter(
  234. pod *v1.Pod,
  235. nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
  236. ) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
  237. var (
  238. result schedulerapi.ExtenderFilterResult
  239. nodeList *v1.NodeList
  240. nodeNames *[]string
  241. nodeResult []*v1.Node
  242. args *schedulerapi.ExtenderArgs
  243. )
  244. if h.filterVerb == "" {
  245. return nodes, schedulerapi.FailedNodesMap{}, nil
  246. }
  247. if h.nodeCacheCapable {
  248. nodeNameSlice := make([]string, 0, len(nodes))
  249. for _, node := range nodes {
  250. nodeNameSlice = append(nodeNameSlice, node.Name)
  251. }
  252. nodeNames = &nodeNameSlice
  253. } else {
  254. nodeList = &v1.NodeList{}
  255. for _, node := range nodes {
  256. nodeList.Items = append(nodeList.Items, *node)
  257. }
  258. }
  259. args = &schedulerapi.ExtenderArgs{
  260. Pod: pod,
  261. Nodes: nodeList,
  262. NodeNames: nodeNames,
  263. }
  264. if err := h.send(h.filterVerb, args, &result); err != nil {
  265. return nil, nil, err
  266. }
  267. if result.Error != "" {
  268. return nil, nil, fmt.Errorf(result.Error)
  269. }
  270. if h.nodeCacheCapable && result.NodeNames != nil {
  271. nodeResult = make([]*v1.Node, 0, len(*result.NodeNames))
  272. for i := range *result.NodeNames {
  273. nodeResult = append(nodeResult, nodeNameToInfo[(*result.NodeNames)[i]].Node())
  274. }
  275. } else if result.Nodes != nil {
  276. nodeResult = make([]*v1.Node, 0, len(result.Nodes.Items))
  277. for i := range result.Nodes.Items {
  278. nodeResult = append(nodeResult, &result.Nodes.Items[i])
  279. }
  280. }
  281. return nodeResult, result.FailedNodes, nil
  282. }
  283. // Prioritize based on extender implemented priority functions. Weight*priority is added
  284. // up for each such priority function. The returned score is added to the score computed
  285. // by Kubernetes scheduler. The total score is used to do the host selection.
  286. func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, int, error) {
  287. var (
  288. result schedulerapi.HostPriorityList
  289. nodeList *v1.NodeList
  290. nodeNames *[]string
  291. args *schedulerapi.ExtenderArgs
  292. )
  293. if h.prioritizeVerb == "" {
  294. result := schedulerapi.HostPriorityList{}
  295. for _, node := range nodes {
  296. result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0})
  297. }
  298. return &result, 0, nil
  299. }
  300. if h.nodeCacheCapable {
  301. nodeNameSlice := make([]string, 0, len(nodes))
  302. for _, node := range nodes {
  303. nodeNameSlice = append(nodeNameSlice, node.Name)
  304. }
  305. nodeNames = &nodeNameSlice
  306. } else {
  307. nodeList = &v1.NodeList{}
  308. for _, node := range nodes {
  309. nodeList.Items = append(nodeList.Items, *node)
  310. }
  311. }
  312. args = &schedulerapi.ExtenderArgs{
  313. Pod: pod,
  314. Nodes: nodeList,
  315. NodeNames: nodeNames,
  316. }
  317. if err := h.send(h.prioritizeVerb, args, &result); err != nil {
  318. return nil, 0, err
  319. }
  320. return &result, h.weight, nil
  321. }
  322. //------------------------------------------------------------------------------------------------
  323. //------------------------------------------------------------------------------------------------
  324. // ---------START OF CUSTOMIZATION------------------------------------------------------------------
  325. //------------------------------------------------------------------------------------------------
  326. //------------------------------------------------------------------------------------------------
  327. func (h *HTTPExtender) CustomPrioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.CustomHostPriorityList, int, error) {
  328. var (
  329. result schedulerapi.CustomHostPriorityList
  330. nodeList *v1.NodeList
  331. nodeNames *[]string
  332. args *schedulerapi.ExtenderArgs
  333. )
  334. if h.prioritizeVerb == "" {
  335. result := schedulerapi.CustomHostPriorityList{}
  336. for _, node := range nodes {
  337. result = append(result, schedulerapi.CustomHostPriority{Host: node.Name, Score: 0})
  338. }
  339. return &result, 0, nil
  340. }
  341. if h.nodeCacheCapable {
  342. nodeNameSlice := make([]string, 0, len(nodes))
  343. for _, node := range nodes {
  344. nodeNameSlice = append(nodeNameSlice, node.Name)
  345. }
  346. nodeNames = &nodeNameSlice
  347. } else {
  348. nodeList = &v1.NodeList{}
  349. for _, node := range nodes {
  350. nodeList.Items = append(nodeList.Items, *node)
  351. }
  352. }
  353. args = &schedulerapi.ExtenderArgs{
  354. Pod: pod,
  355. Nodes: nodeList,
  356. NodeNames: nodeNames,
  357. }
  358. if err := h.send(h.prioritizeVerb, args, &result); err != nil {
  359. return nil, 0, err
  360. }
  361. return &result, h.weight, nil
  362. }
  363. //------------------------------------------------------------------------------------------------
  364. //------------------------------------------------------------------------------------------------
  365. // ---------END OF CUSTOMIZATION------------------------------------------------------------------
  366. //------------------------------------------------------------------------------------------------
  367. //------------------------------------------------------------------------------------------------
  368. // Bind delegates the action of binding a pod to a node to the extender.
  369. func (h *HTTPExtender) Bind(binding *v1.Binding) error {
  370. var result schedulerapi.ExtenderBindingResult
  371. if !h.IsBinder() {
  372. // This shouldn't happen as this extender wouldn't have become a Binder.
  373. return fmt.Errorf("Unexpected empty bindVerb in extender")
  374. }
  375. req := &schedulerapi.ExtenderBindingArgs{
  376. PodName: binding.Name,
  377. PodNamespace: binding.Namespace,
  378. PodUID: binding.UID,
  379. Node: binding.Target.Name,
  380. }
  381. if err := h.send(h.bindVerb, &req, &result); err != nil {
  382. return err
  383. }
  384. if result.Error != "" {
  385. return fmt.Errorf(result.Error)
  386. }
  387. return nil
  388. }
  389. // IsBinder returns whether this extender is configured for the Bind method.
  390. func (h *HTTPExtender) IsBinder() bool {
  391. return h.bindVerb != ""
  392. }
  393. // Helper function to send messages to the extender
  394. func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
  395. out, err := json.Marshal(args)
  396. if err != nil {
  397. return err
  398. }
  399. url := strings.TrimRight(h.extenderURL, "/") + "/" + action
  400. req, err := http.NewRequest("POST", url, bytes.NewReader(out))
  401. if err != nil {
  402. return err
  403. }
  404. req.Header.Set("Content-Type", "application/json")
  405. resp, err := h.client.Do(req)
  406. if err != nil {
  407. return err
  408. }
  409. defer resp.Body.Close()
  410. if resp.StatusCode != http.StatusOK {
  411. return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
  412. }
  413. return json.NewDecoder(resp.Body).Decode(result)
  414. }
  415. // IsInterested returns true if at least one extended resource requested by
  416. // this pod is managed by this extender.
  417. func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
  418. if h.managedResources.Len() == 0 {
  419. return true
  420. }
  421. if h.hasManagedResources(pod.Spec.Containers) {
  422. return true
  423. }
  424. if h.hasManagedResources(pod.Spec.InitContainers) {
  425. return true
  426. }
  427. return false
  428. }
  429. func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool {
  430. for i := range containers {
  431. container := &containers[i]
  432. for resourceName := range container.Resources.Requests {
  433. if h.managedResources.Has(string(resourceName)) {
  434. return true
  435. }
  436. }
  437. for resourceName := range container.Resources.Limits {
  438. if h.managedResources.Has(string(resourceName)) {
  439. return true
  440. }
  441. }
  442. }
  443. return false
  444. }