/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"errors"
	"fmt"
	"reflect"
	"strings"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)

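// deepEqualWithoutGeneration compares a cached nodeInfoListItem with an expected
// NodeInfo, zeroing the generation field on both sides first so the comparison
// ignores it.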
func deepEqualWithoutGeneration(actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) error {
	if (actual == nil) != (expected == nil) {
		return errors.New("one of the actual or expected is nil and the other is not")
	}
	// Ignore generation field.
	if actual != nil {
		actual.info.SetGeneration(0)
	}
	if expected != nil {
		expected.SetGeneration(0)
	}
	if actual != nil && !reflect.DeepEqual(actual.info, expected) {
		return fmt.Errorf("got node info %s, want %s", actual.info, expected)
	}
	return nil
}

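// hostPortInfoParam and hostPortInfoBuilder are small test helpers for building
// HostPortInfo fixtures from (protocol, ip, port) tuples.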
type hostPortInfoParam struct {
	protocol, ip string
	port         int32
}

type hostPortInfoBuilder struct {
	inputs []hostPortInfoParam
}

func newHostPortInfoBuilder() *hostPortInfoBuilder {
	return &hostPortInfoBuilder{}
}

func (b *hostPortInfoBuilder) add(protocol, ip string, port int32) *hostPortInfoBuilder {
	b.inputs = append(b.inputs, hostPortInfoParam{protocol, ip, port})
	return b
}

func (b *hostPortInfoBuilder) build() schedulernodeinfo.HostPortInfo {
	res := make(schedulernodeinfo.HostPortInfo)
	for _, param := range b.inputs {
		res.Add(param.ip, param.protocol, param.port)
	}
	return res
}

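// newNodeInfo builds an expected NodeInfo fixture from pre-aggregated requested
// resources, non-zero requests, pods, used host ports, and image states.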
func newNodeInfo(requestedResource *schedulernodeinfo.Resource,
	nonzeroRequest *schedulernodeinfo.Resource,
	pods []*v1.Pod,
	usedPorts schedulernodeinfo.HostPortInfo,
	imageStates map[string]*schedulernodeinfo.ImageStateSummary,
) *schedulernodeinfo.NodeInfo {
	nodeInfo := schedulernodeinfo.NewNodeInfo(pods...)
	nodeInfo.SetRequestedResource(requestedResource)
	nodeInfo.SetNonZeroRequest(nonzeroRequest)
	nodeInfo.SetUsedPorts(usedPorts)
	nodeInfo.SetImageStates(imageStates)
	return nodeInfo
}

// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
// at the node level.
func TestAssumePodScheduled(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-nonzero", "", "", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "100m", "500", "example.com/foo:3", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "example.com/foo:5", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "100m", "500", "random-invalid-extended-key:100", []v1.ContainerPort{{}}),
	}

	tests := []struct {
		pods      []*v1.Pod
		wNodeInfo *schedulernodeinfo.NodeInfo
	}{{
		pods: []*v1.Pod{testPods[0]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[1], testPods[2]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			[]*v1.Pod{testPods[1], testPods[2]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, { // test non-zero request
		pods: []*v1.Pod{testPods[3]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 0,
				Memory:   0,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: schedutil.DefaultMilliCPURequest,
				Memory:   schedutil.DefaultMemoryRequest,
			},
			[]*v1.Pod{testPods[3]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[4]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU:        100,
				Memory:          500,
				ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 3},
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[4]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[4], testPods[5]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU:        300,
				Memory:          1524,
				ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 8},
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			[]*v1.Pod{testPods[4], testPods[5]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[6]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[6]},
			newHostPortInfoBuilder().build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	},
	}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(time.Second, time.Second, nil)
			for _, pod := range tt.pods {
				if err := cache.AssumePod(pod); err != nil {
					t.Fatalf("AssumePod failed: %v", err)
				}
			}
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}

			for _, pod := range tt.pods {
				if err := cache.ForgetPod(pod); err != nil {
					t.Fatalf("ForgetPod failed: %v", err)
				}
				if err := isForgottenFromCache(pod, cache); err != nil {
					t.Errorf("pod %s: %v", pod.Name, err)
				}
			}
		})
	}
}

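// testExpirePodStruct describes one pod in the expiration tests: whether its
// binding finished and, if so, the time it was assumed.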
type testExpirePodStruct struct {
	pod         *v1.Pod
	finishBind  bool
	assumedTime time.Time
}

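// assumeAndFinishBinding assumes the pod into the cache and immediately marks
// its binding as finished at assumedTime, which starts the expiration clock.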
func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error {
	if err := cache.AssumePod(pod); err != nil {
		return err
	}
	return cache.finishBinding(pod, assumedTime)
}

// TestExpirePod tests that assumed pods will be removed if expired.
// The removal will be reflected in node info.
func TestExpirePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-3", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	now := time.Now()
	ttl := 10 * time.Second
	tests := []struct {
		pods        []*testExpirePodStruct
		cleanupTime time.Time
		wNodeInfo   *schedulernodeinfo.NodeInfo
	}{{ // assumed pod would expire
		pods: []*testExpirePodStruct{
			{pod: testPods[0], finishBind: true, assumedTime: now},
		},
		cleanupTime: now.Add(2 * ttl),
		wNodeInfo:   schedulernodeinfo.NewNodeInfo(),
	}, { // first one would expire, second and third would not.
		pods: []*testExpirePodStruct{
			{pod: testPods[0], finishBind: true, assumedTime: now},
			{pod: testPods[1], finishBind: true, assumedTime: now.Add(3 * ttl / 2)},
			{pod: testPods[2]},
		},
		cleanupTime: now.Add(2 * ttl),
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 400,
				Memory:   2048,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 400,
				Memory:   2048,
			},
			// Order gets altered when removing pods.
			[]*v1.Pod{testPods[2], testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)

			for _, pod := range tt.pods {
				if err := cache.AssumePod(pod.pod); err != nil {
					t.Fatal(err)
				}
				if !pod.finishBind {
					continue
				}
				if err := cache.finishBinding(pod.pod, pod.assumedTime); err != nil {
					t.Fatal(err)
				}
			}
			// pods that got bound and have assumedTime + ttl < cleanupTime will get
			// expired and removed
			cache.cleanupAssumedPods(tt.cleanupTime)
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
		})
	}
}

// TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
// The pod info should still exist after manually expiring unconfirmed pods.
func TestAddPodWillConfirm(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	now := time.Now()
	ttl := 10 * time.Second

	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		wNodeInfo    *schedulernodeinfo.NodeInfo
	}{{ // two pods were assumed at the same time, but only the first is Add()ed and confirmed.
		podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
		podsToAdd:    []*v1.Pod{testPods[0]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
				}
			}
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}
			cache.cleanupAssumedPods(now.Add(2 * ttl))
			// Check after expiration: confirmed pods shouldn't be expired.
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
		})
	}
}

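// TestSnapshot tests that Dump produces a snapshot whose nodes and assumed pods
// match the cache contents.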
func TestSnapshot(t *testing.T) {
	nodeName := "node"
	now := time.Now()
	ttl := 10 * time.Second

	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
	}{{ // two pods were assumed at the same time, but only the first is Add()ed and confirmed.
		podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
		podsToAdd:    []*v1.Pod{testPods[0]},
	}}

	for _, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		for _, podToAssume := range tt.podsToAssume {
			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
				t.Errorf("assumePod failed: %v", err)
			}
		}
		for _, podToAdd := range tt.podsToAdd {
			if err := cache.AddPod(podToAdd); err != nil {
				t.Errorf("AddPod failed: %v", err)
			}
		}

		snapshot := cache.Dump()
		if len(snapshot.Nodes) != len(cache.nodes) {
			t.Errorf("Unequal number of nodes in the cache and its snapshot. expected: %v, got: %v", len(cache.nodes), len(snapshot.Nodes))
		}
		for name, ni := range snapshot.Nodes {
			nItem := cache.nodes[name]
			if !reflect.DeepEqual(ni, nItem.info) {
				t.Errorf("expect \n%+v; got \n%+v", nItem.info, ni)
			}
		}
		if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) {
			t.Errorf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods)
		}
	}
}

// TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod.
func TestAddPodWillReplaceAssumed(t *testing.T) {
	now := time.Now()
	ttl := 10 * time.Second

	assumedPod := makeBasePod(t, "assumed-node-1", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	addedPod := makeBasePod(t, "actual-node", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	updatedPod := makeBasePod(t, "actual-node", "test-1", "200m", "500", "", []v1.ContainerPort{{HostPort: 90}})

	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		podsToUpdate [][]*v1.Pod
		wNodeInfo    map[string]*schedulernodeinfo.NodeInfo
	}{{
		podsToAssume: []*v1.Pod{assumedPod.DeepCopy()},
		podsToAdd:    []*v1.Pod{addedPod.DeepCopy()},
		podsToUpdate: [][]*v1.Pod{{addedPod.DeepCopy(), updatedPod.DeepCopy()}},
		wNodeInfo: map[string]*schedulernodeinfo.NodeInfo{
			"assumed-node": nil,
			"actual-node": newNodeInfo(
				&schedulernodeinfo.Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				&schedulernodeinfo.Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				[]*v1.Pod{updatedPod.DeepCopy()},
				newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
				make(map[string]*schedulernodeinfo.ImageStateSummary),
			),
		},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
				}
			}
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}
			for _, podToUpdate := range tt.podsToUpdate {
				if err := cache.UpdatePod(podToUpdate[0], podToUpdate[1]); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
			}
			for nodeName, expected := range tt.wNodeInfo {
				n := cache.nodes[nodeName]
				if err := deepEqualWithoutGeneration(n, expected); err != nil {
					t.Errorf("node %q: %v", nodeName, err)
				}
			}
		})
	}
}

// TestAddPodAfterExpiration tests that a pod being Add()ed will be added back even if it has expired.
func TestAddPodAfterExpiration(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	ttl := 10 * time.Second
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		pod       *v1.Pod
		wNodeInfo *schedulernodeinfo.NodeInfo
	}{{
		pod: basePod,
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{basePod},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			now := time.Now()
			cache := newSchedulerCache(ttl, time.Second, nil)
			if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
			cache.cleanupAssumedPods(now.Add(2 * ttl))
			// It should be expired and removed.
			if err := isForgottenFromCache(tt.pod, cache); err != nil {
				t.Error(err)
			}
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
			// Check after expiration: confirmed pods shouldn't be expired.
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
		})
	}
}

// TestUpdatePod tests that an added pod can be updated.
func TestUpdatePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAdd    []*v1.Pod
		podsToUpdate []*v1.Pod
		wNodeInfo    []*schedulernodeinfo.NodeInfo
	}{{ // add a pod and then update it twice
		podsToAdd:    []*v1.Pod{testPods[0]},
		podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
		wNodeInfo: []*schedulernodeinfo.NodeInfo{newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			[]*v1.Pod{testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		), newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		)},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}

			for j := range tt.podsToUpdate {
				if j == 0 {
					continue
				}
				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
				// Check the node info after each update.
				n := cache.nodes[nodeName]
				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
					t.Errorf("update %d: %v", j, err)
				}
			}
		})
	}
}

// TestUpdatePodAndGet tests that Get always returns the latest pod state.
func TestUpdatePodAndGet(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		pod         *v1.Pod
		podToUpdate *v1.Pod
		handler     func(cache Cache, pod *v1.Pod) error
		assumePod   bool
	}{
		{
			pod:         testPods[0],
			podToUpdate: testPods[0],
			handler: func(cache Cache, pod *v1.Pod) error {
				return cache.AssumePod(pod)
			},
			assumePod: true,
		},
		{
			pod:         testPods[0],
			podToUpdate: testPods[1],
			handler: func(cache Cache, pod *v1.Pod) error {
				return cache.AddPod(pod)
			},
			assumePod: false,
		},
	}

	for _, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)

		if err := tt.handler(cache, tt.pod); err != nil {
			t.Fatalf("unexpected err: %v", err)
		}

		if !tt.assumePod {
			if err := cache.UpdatePod(tt.pod, tt.podToUpdate); err != nil {
				t.Fatalf("UpdatePod failed: %v", err)
			}
		}

		cachedPod, err := cache.GetPod(tt.pod)
		if err != nil {
			t.Fatalf("GetPod failed: %v", err)
		}
		if !reflect.DeepEqual(tt.podToUpdate, cachedPod) {
			t.Fatalf("pod get=%s, want=%s", cachedPod, tt.podToUpdate)
		}
	}
}

// TestExpireAddUpdatePod tests the sequence in which a pod is expired, added, then updated.
func TestExpireAddUpdatePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		podsToUpdate []*v1.Pod
		wNodeInfo    []*schedulernodeinfo.NodeInfo
	}{{ // Pod is assumed, expired, and added. Then it would be updated twice.
		podsToAssume: []*v1.Pod{testPods[0]},
		podsToAdd:    []*v1.Pod{testPods[0]},
		podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
		wNodeInfo: []*schedulernodeinfo.NodeInfo{newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			[]*v1.Pod{testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		), newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		)},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			now := time.Now()
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
				}
			}
			cache.cleanupAssumedPods(now.Add(2 * ttl))

			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}

			for j := range tt.podsToUpdate {
				if j == 0 {
					continue
				}
				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
				// Check the node info after each update.
				n := cache.nodes[nodeName]
				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
					t.Errorf("update %d: %v", j, err)
				}
			}
		})
	}
}

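// makePodWithEphemeralStorage returns a pod bound to nodeName whose single
// container requests only the given amount of ephemeral storage.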
func makePodWithEphemeralStorage(nodeName, ephemeralStorage string) *v1.Pod {
	req := v1.ResourceList{
		v1.ResourceEphemeralStorage: resource.MustParse(ephemeralStorage),
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default-namespace",
			Name:      "pod-with-ephemeral-storage",
			UID:       types.UID("pod-with-ephemeral-storage"),
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: req,
				},
			}},
			NodeName: nodeName,
		},
	}
}

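// TestEphemeralStorageResource tests that a pod requesting only ephemeral
// storage is accounted for in the node info and is removed from the cache on RemovePod.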
func TestEphemeralStorageResource(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	podE := makePodWithEphemeralStorage(nodeName, "500")
	tests := []struct {
		pod       *v1.Pod
		wNodeInfo *schedulernodeinfo.NodeInfo
	}{
		{
			pod: podE,
			wNodeInfo: newNodeInfo(
				&schedulernodeinfo.Resource{
					EphemeralStorage: 500,
				},
				&schedulernodeinfo.Resource{
					MilliCPU: schedutil.DefaultMilliCPURequest,
					Memory:   schedutil.DefaultMemoryRequest,
				},
				[]*v1.Pod{podE},
				schedulernodeinfo.HostPortInfo{},
				make(map[string]*schedulernodeinfo.ImageStateSummary),
			),
		},
	}
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(time.Second, time.Second, nil)
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}

			if err := cache.RemovePod(tt.pod); err != nil {
				t.Fatalf("RemovePod failed: %v", err)
			}
			if _, err := cache.GetPod(tt.pod); err == nil {
				t.Errorf("pod was not deleted")
			}
		})
	}
}

// TestRemovePod tests that after an added pod is removed, its information is also subtracted.
func TestRemovePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	basePod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		nodes     []*v1.Node
		pod       *v1.Pod
		wNodeInfo *schedulernodeinfo.NodeInfo
	}{{
		nodes: []*v1.Node{
			{
				ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
			},
			{
				ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
			},
		},
		pod: basePod,
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{basePod},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			nodeName := tt.pod.Spec.NodeName
			cache := newSchedulerCache(time.Second, time.Second, nil)
			// Add pod succeeds even before adding the nodes.
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
			for _, n := range tt.nodes {
				if err := cache.AddNode(n); err != nil {
					t.Error(err)
				}
			}

			if err := cache.RemovePod(tt.pod); err != nil {
				t.Fatalf("RemovePod failed: %v", err)
			}

			if _, err := cache.GetPod(tt.pod); err == nil {
				t.Errorf("pod was not deleted")
			}

			// Node that owned the Pod should be at the head of the list.
			if cache.headNode.info.Node().Name != nodeName {
				t.Errorf("node %q is not at the head of the list", nodeName)
			}
		})
	}
}

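// TestForgetPod tests that ForgetPod removes a previously assumed pod: afterwards
// it is neither assumed nor retrievable from the cache.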
func TestForgetPod(t *testing.T) {
	nodeName := "node"
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	pods := []*v1.Pod{basePod}
	now := time.Now()
	ttl := 10 * time.Second

	cache := newSchedulerCache(ttl, time.Second, nil)
	for _, pod := range pods {
		if err := assumeAndFinishBinding(cache, pod, now); err != nil {
			t.Fatalf("assumePod failed: %v", err)
		}
		isAssumed, err := cache.IsAssumedPod(pod)
		if err != nil {
			t.Fatalf("IsAssumedPod failed: %v.", err)
		}
		if !isAssumed {
			t.Fatalf("Pod is expected to be assumed.")
		}
		assumedPod, err := cache.GetPod(pod)
		if err != nil {
			t.Fatalf("GetPod failed: %v.", err)
		}
		if assumedPod.Namespace != pod.Namespace {
			t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
		}
		if assumedPod.Name != pod.Name {
			t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
		}
	}
	for _, pod := range pods {
		if err := cache.ForgetPod(pod); err != nil {
			t.Fatalf("ForgetPod failed: %v", err)
		}
		if err := isForgottenFromCache(pod, cache); err != nil {
			t.Errorf("pod %q: %v", pod.Name, err)
		}
	}
}

// getResourceRequest returns the resource request of all containers in the Pod,
// excluding initContainers.
func getResourceRequest(pod *v1.Pod) v1.ResourceList {
	result := &schedulernodeinfo.Resource{}
	for _, container := range pod.Spec.Containers {
		result.Add(container.Resources.Requests)
	}

	return result.ResourceList()
}

// buildNodeInfo creates a NodeInfo by simulating node operations in cache.
func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *schedulernodeinfo.NodeInfo {
	expected := schedulernodeinfo.NewNodeInfo()

	// Simulate SetNode.
	expected.SetNode(node)
	expected.SetAllocatableResource(schedulernodeinfo.NewResource(node.Status.Allocatable))
	expected.SetTaints(node.Spec.Taints)
	expected.SetGeneration(expected.GetGeneration() + 1)

	for _, pod := range pods {
		// Simulate AddPod
		pods := append(expected.Pods(), pod)
		expected.SetPods(pods)
		requestedResource := expected.RequestedResource()
		newRequestedResource := &requestedResource
		newRequestedResource.Add(getResourceRequest(pod))
		expected.SetRequestedResource(newRequestedResource)
		nonZeroRequest := expected.NonZeroRequest()
		newNonZeroRequest := &nonZeroRequest
		newNonZeroRequest.Add(getResourceRequest(pod))
		expected.SetNonZeroRequest(newNonZeroRequest)
		expected.UpdateUsedPorts(pod, true)
		expected.SetGeneration(expected.GetGeneration() + 1)
	}

	return expected
}

// TestNodeOperators tests node operations of cache, including add, update
// and remove.
func TestNodeOperators(t *testing.T) {
	// Test data
	nodeName := "test-node"
	cpu1 := resource.MustParse("1000m")
	mem100m := resource.MustParse("100m")
	cpuHalf := resource.MustParse("500m")
	mem50m := resource.MustParse("50m")
	resourceFooName := "example.com/foo"
	resourceFoo := resource.MustParse("1")

	tests := []struct {
		node *v1.Node
		pods []*v1.Pod
	}{
		{
			node: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeName,
				},
				Status: v1.NodeStatus{
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:                   cpu1,
						v1.ResourceMemory:                mem100m,
						v1.ResourceName(resourceFooName): resourceFoo,
					},
				},
				Spec: v1.NodeSpec{
					Taints: []v1.Taint{
						{
							Key:    "test-key",
							Value:  "test-value",
							Effect: v1.TaintEffectPreferNoSchedule,
						},
					},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod1",
						UID:  types.UID("pod1"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
								Ports: []v1.ContainerPort{
									{
										Name:          "http",
										HostPort:      80,
										ContainerPort: 80,
									},
								},
							},
						},
					},
				},
			},
		},
		{
			node: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeName,
				},
				Status: v1.NodeStatus{
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:                   cpu1,
						v1.ResourceMemory:                mem100m,
						v1.ResourceName(resourceFooName): resourceFoo,
					},
				},
				Spec: v1.NodeSpec{
					Taints: []v1.Taint{
						{
							Key:    "test-key",
							Value:  "test-value",
							Effect: v1.TaintEffectPreferNoSchedule,
						},
					},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod1",
						UID:  types.UID("pod1"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod2",
						UID:  types.UID("pod2"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
							},
						},
					},
				},
			},
		},
	}

	for i, test := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			expected := buildNodeInfo(test.node, test.pods)
			node := test.node

			cache := newSchedulerCache(time.Second, time.Second, nil)
			if err := cache.AddNode(node); err != nil {
				t.Fatal(err)
			}
			for _, pod := range test.pods {
				if err := cache.AddPod(pod); err != nil {
					t.Fatal(err)
				}
			}

			// Step 1: the node was added into cache successfully.
			got, found := cache.nodes[node.Name]
			if !found {
				t.Errorf("Failed to find node %v in internal cache.", node.Name)
			}
			if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
				t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
			}

			// Generations are globally unique. We check in our unit tests that they are incremented correctly.
			expected.SetGeneration(got.info.GetGeneration())
			if !reflect.DeepEqual(got.info, expected) {
				t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
			}

			// Step 2: dump cached nodes successfully.
			cachedNodes := NewEmptySnapshot()
			if err := cache.UpdateSnapshot(cachedNodes); err != nil {
				t.Error(err)
			}
			newNode, found := cachedNodes.nodeInfoMap[node.Name]
			if !found || len(cachedNodes.nodeInfoMap) != 1 {
				t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
			}
			expected.SetGeneration(newNode.GetGeneration())
			if !reflect.DeepEqual(newNode, expected) {
				t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
			}

			// Step 3: update node attribute successfully.
			node.Status.Allocatable[v1.ResourceMemory] = mem50m
			allocatableResource := expected.AllocatableResource()
			newAllocatableResource := &allocatableResource
			newAllocatableResource.Memory = mem50m.Value()
			expected.SetAllocatableResource(newAllocatableResource)

			if err := cache.UpdateNode(nil, node); err != nil {
				t.Error(err)
			}
			got, found = cache.nodes[node.Name]
			if !found {
				t.Errorf("Failed to find node %v in schedulernodeinfo after UpdateNode.", node.Name)
			}
			if got.info.GetGeneration() <= expected.GetGeneration() {
				t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration())
			}
			expected.SetGeneration(got.info.GetGeneration())

			if !reflect.DeepEqual(got.info, expected) {
				t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
			}
			// Check nodeTree after update
			if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
				t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
			}

			// Step 4: the node can be removed even if it still has pods.
			if err := cache.RemoveNode(node); err != nil {
				t.Error(err)
			}
			if _, err := cache.GetNodeInfo(node.Name); err == nil {
				t.Errorf("The node %v should be removed.", node.Name)
			}
			// Check node is removed from nodeTree as well.
			if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
				t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
			}
			// Pods are still in the pods cache.
			for _, p := range test.pods {
				if _, err := cache.GetPod(p); err != nil {
					t.Error(err)
				}
			}

			// Step 5: removing pods for the removed node still succeeds.
			for _, p := range test.pods {
				if err := cache.RemovePod(p); err != nil {
					t.Error(err)
				}
				if _, err := cache.GetPod(p); err == nil {
					t.Errorf("pod %q still in cache", p.Name)
				}
			}
		})
	}
}

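// TestSchedulerCache_UpdateSnapshot applies sequences of node and pod operations
// to the cache and verifies that UpdateSnapshot keeps the snapshot consistent with
// the cache, including the order of the node linked list.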
func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
	// Create a few nodes to be used in tests.
	nodes := []*v1.Node{}
	for i := 0; i < 10; i++ {
		node := &v1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: fmt.Sprintf("test-node%v", i),
			},
			Status: v1.NodeStatus{
				Allocatable: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("1000m"),
					v1.ResourceMemory: resource.MustParse("100m"),
				},
			},
		}
		nodes = append(nodes, node)
	}
	// Create a few nodes as updated versions of the above nodes
	updatedNodes := []*v1.Node{}
	for _, n := range nodes {
		updatedNode := n.DeepCopy()
		updatedNode.Status.Allocatable = v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse("2000m"),
			v1.ResourceMemory: resource.MustParse("500m"),
		}
		updatedNodes = append(updatedNodes, updatedNode)
	}

	// Create a few pods for tests.
	pods := []*v1.Pod{}
	for i := 0; i < 10; i++ {
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("test-pod%v", i),
				Namespace: "test-ns",
				UID:       types.UID(fmt.Sprintf("test-puid%v", i)),
			},
			Spec: v1.PodSpec{
				NodeName: fmt.Sprintf("test-node%v", i),
			},
		}
		pods = append(pods, pod)
	}
	// Create a few pods as updated versions of the above pods.
	updatedPods := []*v1.Pod{}
	for _, p := range pods {
		updatedPod := p.DeepCopy()
		priority := int32(1000)
		updatedPod.Spec.Priority = &priority
		updatedPods = append(updatedPods, updatedPod)
	}

	// Add a couple of pods with affinity, on the first and second nodes.
	podsWithAffinity := []*v1.Pod{}
	for i := 0; i < 2; i++ {
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("test-pod%v", i),
				Namespace: "test-ns",
				UID:       types.UID(fmt.Sprintf("test-puid%v", i)),
			},
			Spec: v1.PodSpec{
				NodeName: fmt.Sprintf("test-node%v", i),
				Affinity: &v1.Affinity{
					PodAffinity: &v1.PodAffinity{},
				},
			},
		}
		podsWithAffinity = append(podsWithAffinity, pod)
	}

	var cache *schedulerCache
	var snapshot *Snapshot
	type operation = func()

	addNode := func(i int) operation {
		return func() {
			if err := cache.AddNode(nodes[i]); err != nil {
				t.Error(err)
			}
		}
	}
	removeNode := func(i int) operation {
		return func() {
			if err := cache.RemoveNode(nodes[i]); err != nil {
				t.Error(err)
			}
		}
	}
	updateNode := func(i int) operation {
		return func() {
			if err := cache.UpdateNode(nodes[i], updatedNodes[i]); err != nil {
				t.Error(err)
			}
		}
	}
	addPod := func(i int) operation {
		return func() {
			if err := cache.AddPod(pods[i]); err != nil {
				t.Error(err)
			}
		}
	}
	addPodWithAffinity := func(i int) operation {
		return func() {
			if err := cache.AddPod(podsWithAffinity[i]); err != nil {
				t.Error(err)
			}
		}
	}
	removePodWithAffinity := func(i int) operation {
		return func() {
			if err := cache.RemovePod(podsWithAffinity[i]); err != nil {
				t.Error(err)
			}
		}
	}
	updatePod := func(i int) operation {
		return func() {
			if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil {
				t.Error(err)
			}
		}
	}
	updateSnapshot := func() operation {
		return func() {
			cache.UpdateSnapshot(snapshot)
			if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
				t.Error(err)
			}
		}
	}

	tests := []struct {
		name                         string
		operations                   []operation
		expected                     []*v1.Node
		expectedHavePodsWithAffinity int
	}{
		{
			name:       "Empty cache",
			operations: []operation{},
			expected:   []*v1.Node{},
		},
		{
			name:       "Single node",
			operations: []operation{addNode(1)},
			expected:   []*v1.Node{nodes[1]},
		},
		{
			name: "Add node, remove it, add it again",
			operations: []operation{
				addNode(1), updateSnapshot(), removeNode(1), addNode(1),
			},
			expected: []*v1.Node{nodes[1]},
		},
		{
			name: "Add node and remove it in the same cycle, add it again",
			operations: []operation{
				addNode(1), updateSnapshot(), addNode(2), removeNode(1),
			},
			expected: []*v1.Node{nodes[2]},
		},
		{
			name: "Add a few nodes, and snapshot in the middle",
			operations: []operation{
				addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2),
				updateSnapshot(), addNode(3),
			},
			expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]},
		},
		{
			name: "Add a few nodes, and snapshot in the end",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6),
			},
			expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]},
		},
		{
			name: "Remove non-existing node",
			operations: []operation{
				addNode(0), addNode(1), updateSnapshot(),
			},
			expected: []*v1.Node{nodes[1], nodes[0]},
		},
		{
			name: "Update some nodes",
			operations: []operation{
				addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[5], nodes[0]},
		},
		{
			name: "Add a few nodes, and remove all of them",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(0), removeNode(2), removeNode(5), removeNode(6),
			},
			expected: []*v1.Node{},
		},
		{
			name: "Add a few nodes, and remove some of them",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(0), removeNode(6),
			},
			expected: []*v1.Node{nodes[5], nodes[2]},
		},
		{
			name: "Add a few nodes, remove all of them, and add more",
			operations: []operation{
				addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(2), removeNode(5), removeNode(6), updateSnapshot(),
				addNode(7), addNode(9),
			},
			expected: []*v1.Node{nodes[9], nodes[7]},
		},
		{
			name: "Update nodes in particular order",
			operations: []operation{
				addNode(8), updateNode(2), updateNode(8), updateSnapshot(),
				addNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[8], nodes[2]},
		},
		{
			name: "Add some nodes and some pods",
			operations: []operation{
				addNode(0), addNode(2), addNode(8), updateSnapshot(),
				addPod(8), addPod(2),
			},
			expected: []*v1.Node{nodes[2], nodes[8], nodes[0]},
		},
		{
			name: "Updating a pod moves its node to the head",
			operations: []operation{
				addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0),
			},
			expected: []*v1.Node{nodes[0], nodes[4], nodes[2]},
		},
		{
			name: "Remove pod from non-existing node",
			operations: []operation{
				addNode(0), addPod(0), addNode(2), updateSnapshot(),
			},
			expected: []*v1.Node{nodes[2], nodes[0]},
		},
		{
			name: "Add Pods with affinity",
			operations: []operation{
				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1),
			},
			expected:                     []*v1.Node{nodes[1], nodes[0]},
			expectedHavePodsWithAffinity: 1,
		},
		{
			name: "Add multiple nodes with pods with affinity",
			operations: []operation{
				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1), addPodWithAffinity(1), updateSnapshot(),
			},
			expected:                     []*v1.Node{nodes[1], nodes[0]},
			expectedHavePodsWithAffinity: 2,
		},
		{
			name: "Add then Remove pods with affinity",
			operations: []operation{
				addNode(0), addNode(1), addPodWithAffinity(0), updateSnapshot(), removePodWithAffinity(0), updateSnapshot(),
			},
			expected:                     []*v1.Node{nodes[0], nodes[1]},
			expectedHavePodsWithAffinity: 0,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			cache = newSchedulerCache(time.Second, time.Second, nil)
			snapshot = NewEmptySnapshot()

			for _, op := range test.operations {
				op()
			}

			if len(test.expected) != len(cache.nodes) {
				t.Errorf("unexpected number of nodes. Expected: %v, got: %v", len(test.expected), len(cache.nodes))
			}
			var i int
			// Check that cache is in the expected state.
			for node := cache.headNode; node != nil; node = node.next {
				if node.info.Node().Name != test.expected[i].Name {
					t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
				}
				i++
			}
			// Make sure we visited all the cached nodes in the above for loop.
			if i != len(cache.nodes) {
				t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i)
			}

			// Check number of nodes with pods with affinity
			if len(snapshot.havePodsWithAffinityNodeInfoList) != test.expectedHavePodsWithAffinity {
				t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.havePodsWithAffinityNodeInfoList))
			}

			// Always update the snapshot at the end of operations and compare it.
			if err := cache.UpdateSnapshot(snapshot); err != nil {
				t.Error(err)
			}
			if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
				t.Error(err)
			}
		})
	}
}

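// compareCacheWithNodeInfoSnapshot checks that the snapshot's node info map,
// node info list, and havePodsWithAffinity list are consistent with the cache
// and with the nodeTree iteration order.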
func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *Snapshot) error {
	// Compare the map.
	if len(snapshot.nodeInfoMap) != len(cache.nodes) {
		return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.nodeInfoMap))
	}
	for name, ni := range cache.nodes {
		if !reflect.DeepEqual(snapshot.nodeInfoMap[name], ni.info) {
			return fmt.Errorf("unexpected node info for node %q. Expected: %v, got: %v", name, ni.info, snapshot.nodeInfoMap[name])
		}
	}

	// Compare the lists.
	if len(snapshot.nodeInfoList) != len(cache.nodes) {
		return fmt.Errorf("unexpected number of nodes in NodeInfoList. Expected: %v, got: %v", len(cache.nodes), len(snapshot.nodeInfoList))
	}

	expectedNodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
	expectedHavePodsWithAffinityNodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
	for i := 0; i < cache.nodeTree.numNodes; i++ {
		nodeName := cache.nodeTree.next()
		if n := snapshot.nodeInfoMap[nodeName]; n != nil {
			expectedNodeInfoList = append(expectedNodeInfoList, n)
			if len(n.PodsWithAffinity()) > 0 {
				expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n)
			}
		} else {
			return fmt.Errorf("node %q exists in nodeTree but not in NodeInfoMap, this should not happen", nodeName)
		}
	}

	for i, expected := range expectedNodeInfoList {
		got := snapshot.nodeInfoList[i]
		if expected != got {
			return fmt.Errorf("unexpected NodeInfo pointer in NodeInfoList. Expected: %p, got: %p", expected, got)
		}
	}

	for i, expected := range expectedHavePodsWithAffinityNodeInfoList {
		got := snapshot.havePodsWithAffinityNodeInfoList[i]
		if expected != got {
			return fmt.Errorf("unexpected NodeInfo pointer in HavePodsWithAffinityNodeInfoList. Expected: %p, got: %p", expected, got)
		}
	}

	return nil
}

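// BenchmarkUpdate1kNodes30kPods measures UpdateSnapshot against a cache
// populated with 1000 node names holding 30 pods each.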
func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(nil, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	cache := setupCacheOf1kNodes30kPods(b)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cachedNodes := NewEmptySnapshot()
		cache.UpdateSnapshot(cachedNodes)
	}
}

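// BenchmarkExpirePods measures cleanupAssumedPods for 100, 1000, and 10000 assumed pods.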
func BenchmarkExpirePods(b *testing.B) {
	podNums := []int{
		100,
		1000,
		10000,
	}
	for _, podNum := range podNums {
		name := fmt.Sprintf("%dPods", podNum)
		b.Run(name, func(b *testing.B) {
			benchmarkExpire(b, podNum)
		})
	}
}

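// benchmarkExpire populates a cache with podNum assumed pods outside the timed
// section and measures a single cleanup pass per iteration.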
func benchmarkExpire(b *testing.B, podNum int) {
	now := time.Now()
	for n := 0; n < b.N; n++ {
		b.StopTimer()
		cache := setupCacheWithAssumedPods(b, podNum, now)
		b.StartTimer()
		cache.cleanupAssumedPods(now.Add(2 * time.Second))
	}
}

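// testingMode is the subset of the testing interfaces needed by makeBasePod,
// so it accepts both *testing.T and *testing.B.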
type testingMode interface {
	Fatalf(format string, args ...interface{})
}

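// makeBasePod returns a pod bound to nodeName with the given CPU and memory
// requests, an optional extended resource in "name:quantity" form, and the
// given container ports. If cpu is empty, no resource requests are set.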
func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, ports []v1.ContainerPort) *v1.Pod {
	req := v1.ResourceList{}
	if cpu != "" {
		req = v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse(cpu),
			v1.ResourceMemory: resource.MustParse(mem),
		}
		if extended != "" {
			parts := strings.Split(extended, ":")
			if len(parts) != 2 {
				t.Fatalf("Invalid extended resource string: \"%s\"", extended)
			}
			req[v1.ResourceName(parts[0])] = resource.MustParse(parts[1])
		}
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:       types.UID(objName),
			Namespace: "node_info_cache_test",
			Name:      objName,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: req,
				},
				Ports: ports,
			}},
			NodeName: nodeName,
		},
	}
}

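// setupCacheOf1kNodes30kPods returns a cache populated with 30 added pods on
// each of 1000 node names.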
func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
	cache := newSchedulerCache(time.Second, time.Second, nil)
	for i := 0; i < 1000; i++ {
		nodeName := fmt.Sprintf("node-%d", i)
		for j := 0; j < 30; j++ {
			objName := fmt.Sprintf("%s-pod-%d", nodeName, j)
			pod := makeBasePod(b, nodeName, objName, "0", "0", "", nil)

			if err := cache.AddPod(pod); err != nil {
				b.Fatalf("AddPod failed: %v", err)
			}
		}
	}
	return cache
}

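// setupCacheWithAssumedPods returns a cache with podNum pods assumed and bound
// at assumedTime, spread 10 pods per node.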
func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache {
	cache := newSchedulerCache(time.Second, time.Second, nil)
	for i := 0; i < podNum; i++ {
		nodeName := fmt.Sprintf("node-%d", i/10)
		objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
		pod := makeBasePod(b, nodeName, objName, "0", "0", "", nil)

		err := assumeAndFinishBinding(cache, pod, assumedTime)
		if err != nil {
			b.Fatalf("assumePod failed: %v", err)
		}
	}
	return cache
}

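// isForgottenFromCache returns an error if the pod is still assumed or can
// still be fetched from the cache.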
func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
	if assumed, err := c.IsAssumedPod(p); err != nil {
		return err
	} else if assumed {
		return errors.New("still assumed")
	}
	if _, err := c.GetPod(p); err == nil {
		return errors.New("still in cache")
	}
	return nil
}