framework.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package v1alpha1
  14. import (
  15. "fmt"
  16. "time"
  17. "k8s.io/api/core/v1"
  18. "k8s.io/apimachinery/pkg/runtime"
  19. "k8s.io/apimachinery/pkg/types"
  20. "k8s.io/klog"
  21. "k8s.io/kubernetes/pkg/scheduler/apis/config"
  22. "k8s.io/kubernetes/pkg/scheduler/internal/cache"
  23. )
  24. // framework is the component responsible for initializing and running scheduler
  25. // plugins.
  26. type framework struct {
  27. registry Registry
  28. nodeInfoSnapshot *cache.NodeInfoSnapshot
  29. waitingPods *waitingPodsMap
  30. plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
  31. queueSortPlugins []QueueSortPlugin
  32. reservePlugins []ReservePlugin
  33. prebindPlugins []PrebindPlugin
  34. postbindPlugins []PostbindPlugin
  35. unreservePlugins []UnreservePlugin
  36. permitPlugins []PermitPlugin
  37. }
  // maxTimeout is the longest RunPermitPlugins will keep a pod waiting, even if
// a permit plugin requests a longer duration.
const maxTimeout = 15 * time.Minute
  42. var _ = Framework(&framework{})
  43. // NewFramework initializes plugins given the configuration and the registry.
  44. func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) {
  45. f := &framework{
  46. registry: r,
  47. nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
  48. plugins: make(map[string]Plugin),
  49. waitingPods: newWaitingPodsMap(),
  50. }
  51. if plugins == nil {
  52. return f, nil
  53. }
  54. // get needed plugins from config
  55. pg := pluginsNeeded(plugins)
  56. if len(pg) == 0 {
  57. return f, nil
  58. }
  59. pluginConfig := pluginNameToConfig(args)
  60. for name, factory := range r {
  61. // initialize only needed plugins
  62. if _, ok := pg[name]; !ok {
  63. continue
  64. }
  65. // find the config args of a plugin
  66. pc := pluginConfig[name]
  67. p, err := factory(pc, f)
  68. if err != nil {
  69. return nil, fmt.Errorf("error initializing plugin %v: %v", name, err)
  70. }
  71. f.plugins[name] = p
  72. }
  73. if plugins.Reserve != nil {
  74. for _, r := range plugins.Reserve.Enabled {
  75. if pg, ok := f.plugins[r.Name]; ok {
  76. p, ok := pg.(ReservePlugin)
  77. if !ok {
  78. return nil, fmt.Errorf("plugin %v does not extend reserve plugin", r.Name)
  79. }
  80. f.reservePlugins = append(f.reservePlugins, p)
  81. } else {
  82. return nil, fmt.Errorf("reserve plugin %v does not exist", r.Name)
  83. }
  84. }
  85. }
  86. if plugins.PreBind != nil {
  87. for _, pb := range plugins.PreBind.Enabled {
  88. if pg, ok := f.plugins[pb.Name]; ok {
  89. p, ok := pg.(PrebindPlugin)
  90. if !ok {
  91. return nil, fmt.Errorf("plugin %v does not extend prebind plugin", pb.Name)
  92. }
  93. f.prebindPlugins = append(f.prebindPlugins, p)
  94. } else {
  95. return nil, fmt.Errorf("prebind plugin %v does not exist", pb.Name)
  96. }
  97. }
  98. }
  99. if plugins.PostBind != nil {
  100. for _, pb := range plugins.PostBind.Enabled {
  101. if pg, ok := f.plugins[pb.Name]; ok {
  102. p, ok := pg.(PostbindPlugin)
  103. if !ok {
  104. return nil, fmt.Errorf("plugin %v does not extend postbind plugin", pb.Name)
  105. }
  106. f.postbindPlugins = append(f.postbindPlugins, p)
  107. } else {
  108. return nil, fmt.Errorf("postbind plugin %v does not exist", pb.Name)
  109. }
  110. }
  111. }
  112. if plugins.Unreserve != nil {
  113. for _, ur := range plugins.Unreserve.Enabled {
  114. if pg, ok := f.plugins[ur.Name]; ok {
  115. p, ok := pg.(UnreservePlugin)
  116. if !ok {
  117. return nil, fmt.Errorf("plugin %v does not extend unreserve plugin", ur.Name)
  118. }
  119. f.unreservePlugins = append(f.unreservePlugins, p)
  120. } else {
  121. return nil, fmt.Errorf("unreserve plugin %v does not exist", ur.Name)
  122. }
  123. }
  124. }
  125. if plugins.Permit != nil {
  126. for _, pr := range plugins.Permit.Enabled {
  127. if pg, ok := f.plugins[pr.Name]; ok {
  128. p, ok := pg.(PermitPlugin)
  129. if !ok {
  130. return nil, fmt.Errorf("plugin %v does not extend permit plugin", pr.Name)
  131. }
  132. f.permitPlugins = append(f.permitPlugins, p)
  133. } else {
  134. return nil, fmt.Errorf("permit plugin %v does not exist", pr.Name)
  135. }
  136. }
  137. }
  138. if plugins.QueueSort != nil {
  139. for _, qs := range plugins.QueueSort.Enabled {
  140. if pg, ok := f.plugins[qs.Name]; ok {
  141. p, ok := pg.(QueueSortPlugin)
  142. if !ok {
  143. return nil, fmt.Errorf("plugin %v does not extend queue sort plugin", qs.Name)
  144. }
  145. f.queueSortPlugins = append(f.queueSortPlugins, p)
  146. if len(f.queueSortPlugins) > 1 {
  147. return nil, fmt.Errorf("only one queue sort plugin can be enabled")
  148. }
  149. } else {
  150. return nil, fmt.Errorf("queue sort plugin %v does not exist", qs.Name)
  151. }
  152. }
  153. }
  154. return f, nil
  155. }
  156. // QueueSortFunc returns the function to sort pods in scheduling queue
  157. func (f *framework) QueueSortFunc() LessFunc {
  158. if len(f.queueSortPlugins) == 0 {
  159. return nil
  160. }
  161. // Only one QueueSort plugin can be enabled.
  162. return f.queueSortPlugins[0].Less
  163. }
  164. // RunPrebindPlugins runs the set of configured prebind plugins. It returns a
  165. // failure (bool) if any of the plugins returns an error. It also returns an
  166. // error containing the rejection message or the error occurred in the plugin.
  167. func (f *framework) RunPrebindPlugins(
  168. pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
  169. for _, pl := range f.prebindPlugins {
  170. status := pl.Prebind(pc, pod, nodeName)
  171. if !status.IsSuccess() {
  172. if status.Code() == Unschedulable {
  173. msg := fmt.Sprintf("rejected by %v at prebind: %v", pl.Name(), status.Message())
  174. klog.V(4).Infof(msg)
  175. return NewStatus(status.Code(), msg)
  176. }
  177. msg := fmt.Sprintf("error while running %v prebind plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
  178. klog.Error(msg)
  179. return NewStatus(Error, msg)
  180. }
  181. }
  182. return nil
  183. }
  184. // RunPostbindPlugins runs the set of configured postbind plugins.
  185. func (f *framework) RunPostbindPlugins(
  186. pc *PluginContext, pod *v1.Pod, nodeName string) {
  187. for _, pl := range f.postbindPlugins {
  188. pl.Postbind(pc, pod, nodeName)
  189. }
  190. }
  191. // RunReservePlugins runs the set of configured reserve plugins. If any of these
  192. // plugins returns an error, it does not continue running the remaining ones and
  193. // returns the error. In such case, pod will not be scheduled.
  194. func (f *framework) RunReservePlugins(
  195. pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
  196. for _, pl := range f.reservePlugins {
  197. status := pl.Reserve(pc, pod, nodeName)
  198. if !status.IsSuccess() {
  199. msg := fmt.Sprintf("error while running %v reserve plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
  200. klog.Error(msg)
  201. return NewStatus(Error, msg)
  202. }
  203. }
  204. return nil
  205. }
  206. // RunUnreservePlugins runs the set of configured unreserve plugins.
  207. func (f *framework) RunUnreservePlugins(
  208. pc *PluginContext, pod *v1.Pod, nodeName string) {
  209. for _, pl := range f.unreservePlugins {
  210. pl.Unreserve(pc, pod, nodeName)
  211. }
  212. }
  213. // RunPermitPlugins runs the set of configured permit plugins. If any of these
  214. // plugins returns a status other than "Success" or "Wait", it does not continue
  215. // running the remaining plugins and returns an error. Otherwise, if any of the
  216. // plugins returns "Wait", then this function will block for the timeout period
  217. // returned by the plugin, if the time expires, then it will return an error.
  218. // Note that if multiple plugins asked to wait, then we wait for the minimum
  219. // timeout duration.
  220. func (f *framework) RunPermitPlugins(
  221. pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
  222. timeout := maxTimeout
  223. statusCode := Success
  224. for _, pl := range f.permitPlugins {
  225. status, d := pl.Permit(pc, pod, nodeName)
  226. if !status.IsSuccess() {
  227. if status.Code() == Unschedulable {
  228. msg := fmt.Sprintf("rejected by %v at permit: %v", pl.Name(), status.Message())
  229. klog.V(4).Infof(msg)
  230. return NewStatus(status.Code(), msg)
  231. }
  232. if status.Code() == Wait {
  233. // Use the minimum timeout duration.
  234. if timeout > d {
  235. timeout = d
  236. }
  237. statusCode = Wait
  238. } else {
  239. msg := fmt.Sprintf("error while running %v permit plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
  240. klog.Error(msg)
  241. return NewStatus(Error, msg)
  242. }
  243. }
  244. }
  245. // We now wait for the minimum duration if at least one plugin asked to
  246. // wait (and no plugin rejected the pod)
  247. if statusCode == Wait {
  248. w := newWaitingPod(pod)
  249. f.waitingPods.add(w)
  250. defer f.waitingPods.remove(pod.UID)
  251. timer := time.NewTimer(timeout)
  252. klog.V(4).Infof("waiting for %v for pod %v at permit", timeout, pod.Name)
  253. select {
  254. case <-timer.C:
  255. msg := fmt.Sprintf("pod %v rejected due to timeout after waiting %v at permit", pod.Name, timeout)
  256. klog.V(4).Infof(msg)
  257. return NewStatus(Unschedulable, msg)
  258. case s := <-w.s:
  259. if !s.IsSuccess() {
  260. if s.Code() == Unschedulable {
  261. msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message())
  262. klog.V(4).Infof(msg)
  263. return NewStatus(s.Code(), msg)
  264. }
  265. msg := fmt.Sprintf("error received while waiting at permit for pod %v: %v", pod.Name, s.Message())
  266. klog.Error(msg)
  267. return NewStatus(Error, msg)
  268. }
  269. }
  270. }
  271. return nil
  272. }
  273. // NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
  274. // is taken at the beginning of a scheduling cycle and remains unchanged until a
  275. // pod finishes "Reserve". There is no guarantee that the information remains
  276. // unchanged after "Reserve".
  277. func (f *framework) NodeInfoSnapshot() *cache.NodeInfoSnapshot {
  278. return f.nodeInfoSnapshot
  279. }
  280. // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
  281. func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
  282. f.waitingPods.iterate(callback)
  283. }
  284. // GetWaitingPod returns a reference to a WaitingPod given its UID.
  285. func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
  286. return f.waitingPods.get(uid)
  287. }
  288. func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown {
  289. pc := make(map[string]*runtime.Unknown, 0)
  290. for _, p := range args {
  291. pc[p.Name] = &p.Args
  292. }
  293. return pc
  294. }
  295. func pluginsNeeded(plugins *config.Plugins) map[string]struct{} {
  296. pgMap := make(map[string]struct{}, 0)
  297. if plugins == nil {
  298. return pgMap
  299. }
  300. find := func(pgs *config.PluginSet) {
  301. if pgs == nil {
  302. return
  303. }
  304. for _, pg := range pgs.Enabled {
  305. pgMap[pg.Name] = struct{}{}
  306. }
  307. }
  308. find(plugins.QueueSort)
  309. find(plugins.PreFilter)
  310. find(plugins.Filter)
  311. find(plugins.PostFilter)
  312. find(plugins.Score)
  313. find(plugins.NormalizeScore)
  314. find(plugins.Reserve)
  315. find(plugins.Permit)
  316. find(plugins.PreBind)
  317. find(plugins.Bind)
  318. find(plugins.PostBind)
  319. find(plugins.Unreserve)
  320. return pgMap
  321. }