/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"reflect"
	"strings"
	"testing"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
	priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

func deepEqualWithoutGeneration(t *testing.T, testcase int, actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) {
	if (actual == nil) != (expected == nil) {
		t.Error("One of the actual or expected is nil and the other is not!")
	}
	// Ignore generation field.
	if actual != nil {
		actual.info.SetGeneration(0)
	}
	if expected != nil {
		expected.SetGeneration(0)
	}
	if actual != nil && !reflect.DeepEqual(actual.info, expected) {
		t.Errorf("#%d: node info get=%s, want=%s", testcase, actual.info, expected)
	}
}

type hostPortInfoParam struct {
	protocol, ip string
	port         int32
}

type hostPortInfoBuilder struct {
	inputs []hostPortInfoParam
}

func newHostPortInfoBuilder() *hostPortInfoBuilder {
	return &hostPortInfoBuilder{}
}

func (b *hostPortInfoBuilder) add(protocol, ip string, port int32) *hostPortInfoBuilder {
	b.inputs = append(b.inputs, hostPortInfoParam{protocol, ip, port})
	return b
}

func (b *hostPortInfoBuilder) build() schedulernodeinfo.HostPortInfo {
	res := make(schedulernodeinfo.HostPortInfo)
	for _, param := range b.inputs {
		res.Add(param.ip, param.protocol, param.port)
	}
	return res
}

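// For reference, the expectations below construct HostPortInfo values such as
//	newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build()
// which is equivalent to calling Add(ip, protocol, port) on an empty HostPortInfo for each entry.
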
func newNodeInfo(requestedResource *schedulernodeinfo.Resource,
	nonzeroRequest *schedulernodeinfo.Resource,
	pods []*v1.Pod,
	usedPorts schedulernodeinfo.HostPortInfo,
	imageStates map[string]*schedulernodeinfo.ImageStateSummary,
) *schedulernodeinfo.NodeInfo {
	nodeInfo := schedulernodeinfo.NewNodeInfo(pods...)
	nodeInfo.SetRequestedResource(requestedResource)
	nodeInfo.SetNonZeroRequest(nonzeroRequest)
	nodeInfo.SetUsedPorts(usedPorts)
	nodeInfo.SetImageStates(imageStates)
	return nodeInfo
}

// TestAssumePodScheduled tests that after a pod is assumed, its information is
// aggregated at the node level.
func TestAssumePodScheduled(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-nonzero", "", "", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "100m", "500", "example.com/foo:3", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "example.com/foo:5", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "100m", "500", "random-invalid-extended-key:100", []v1.ContainerPort{{}}),
	}
	tests := []struct {
		pods      []*v1.Pod
		wNodeInfo *schedulernodeinfo.NodeInfo
	}{{
		pods: []*v1.Pod{testPods[0]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[1], testPods[2]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			[]*v1.Pod{testPods[1], testPods[2]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, { // test non-zero request
		pods: []*v1.Pod{testPods[3]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 0,
				Memory:   0,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: priorityutil.DefaultMilliCPURequest,
				Memory:   priorityutil.DefaultMemoryRequest,
			},
			[]*v1.Pod{testPods[3]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[4]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU:        100,
				Memory:          500,
				ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 3},
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[4]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[4], testPods[5]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU:        300,
				Memory:          1524,
				ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 8},
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			[]*v1.Pod{testPods[4], testPods[5]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[6]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[6]},
			newHostPortInfoBuilder().build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	},
	}

	for i, tt := range tests {
		cache := newSchedulerCache(time.Second, time.Second, nil)
		for _, pod := range tt.pods {
			if err := cache.AssumePod(pod); err != nil {
				t.Fatalf("AssumePod failed: %v", err)
			}
		}
		n := cache.nodes[nodeName]
		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)

		for _, pod := range tt.pods {
			if err := cache.ForgetPod(pod); err != nil {
				t.Fatalf("ForgetPod failed: %v", err)
			}
		}
		if cache.nodes[nodeName] != nil {
			t.Errorf("NodeInfo should be cleaned for %s", nodeName)
		}
	}
}

type testExpirePodStruct struct {
	pod         *v1.Pod
	assumedTime time.Time
}

func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error {
	if err := cache.AssumePod(pod); err != nil {
		return err
	}
	return cache.finishBinding(pod, assumedTime)
}

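// Note: finishBinding records assumedTime as the time the assumed pod finished binding,
// so a later cleanupAssumedPods(cleanupTime) call expires assumed pods whose TTL has
// elapsed relative to that time (see the "assumedTime + ttl < cleanupTime" comment in
// TestExpirePod below).
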
// TestExpirePod tests that assumed pods will be removed if expired.
// The removal will be reflected in node info.
func TestExpirePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	now := time.Now()
	ttl := 10 * time.Second
	tests := []struct {
		pods        []*testExpirePodStruct
		cleanupTime time.Time
		wNodeInfo   *schedulernodeinfo.NodeInfo
	}{{ // assumed pod would expire
		pods: []*testExpirePodStruct{
			{pod: testPods[0], assumedTime: now},
		},
		cleanupTime: now.Add(2 * ttl),
		wNodeInfo:   nil,
	}, { // first one would expire, second one would not.
		pods: []*testExpirePodStruct{
			{pod: testPods[0], assumedTime: now},
			{pod: testPods[1], assumedTime: now.Add(3 * ttl / 2)},
		},
		cleanupTime: now.Add(2 * ttl),
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			[]*v1.Pod{testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		for _, pod := range tt.pods {
			if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
		}
		// pods that have assumedTime + ttl < cleanupTime will get expired and removed
		cache.cleanupAssumedPods(tt.cleanupTime)
		n := cache.nodes[nodeName]
		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
	}
}

// TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
// The pod info should still exist after manually expiring unconfirmed pods.
func TestAddPodWillConfirm(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	now := time.Now()
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		wNodeInfo    *schedulernodeinfo.NodeInfo
	}{{ // two pods were assumed at the same time, but only the first one is Add()ed and gets confirmed.
		podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
		podsToAdd:    []*v1.Pod{testPods[0]},
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		for _, podToAssume := range tt.podsToAssume {
			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
		}
		for _, podToAdd := range tt.podsToAdd {
			if err := cache.AddPod(podToAdd); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
		}
		cache.cleanupAssumedPods(now.Add(2 * ttl))
		// check after expiration. confirmed pods shouldn't be expired.
		n := cache.nodes[nodeName]
		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
	}
}

func TestSnapshot(t *testing.T) {
	nodeName := "node"
	now := time.Now()
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
	}{{ // two pods were assumed at the same time, but only the first one is Add()ed and gets confirmed.
		podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
		podsToAdd:    []*v1.Pod{testPods[0]},
	}}

	for _, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		for _, podToAssume := range tt.podsToAssume {
			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
				t.Errorf("assumePod failed: %v", err)
			}
		}
		for _, podToAdd := range tt.podsToAdd {
			if err := cache.AddPod(podToAdd); err != nil {
				t.Errorf("AddPod failed: %v", err)
			}
		}

		snapshot := cache.Snapshot()
		if len(snapshot.Nodes) != len(cache.nodes) {
			t.Errorf("Unequal number of nodes in the cache and its snapshot. expected: %v, got: %v", len(cache.nodes), len(snapshot.Nodes))
		}
		for name, ni := range snapshot.Nodes {
			nItem := cache.nodes[name]
			if !reflect.DeepEqual(ni, nItem.info) {
				t.Errorf("expect \n%+v; got \n%+v", nItem.info, ni)
			}
		}
		if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) {
			t.Errorf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods)
		}
	}
}

// TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod.
func TestAddPodWillReplaceAssumed(t *testing.T) {
	now := time.Now()
	ttl := 10 * time.Second

	assumedPod := makeBasePod(t, "assumed-node-1", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	addedPod := makeBasePod(t, "actual-node", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	updatedPod := makeBasePod(t, "actual-node", "test-1", "200m", "500", "", []v1.ContainerPort{{HostPort: 90}})

	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		podsToUpdate [][]*v1.Pod
		wNodeInfo    map[string]*schedulernodeinfo.NodeInfo
	}{{
		podsToAssume: []*v1.Pod{assumedPod.DeepCopy()},
		podsToAdd:    []*v1.Pod{addedPod.DeepCopy()},
		podsToUpdate: [][]*v1.Pod{{addedPod.DeepCopy(), updatedPod.DeepCopy()}},
		wNodeInfo: map[string]*schedulernodeinfo.NodeInfo{
			"assumed-node": nil,
			"actual-node": newNodeInfo(
				&schedulernodeinfo.Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				&schedulernodeinfo.Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				[]*v1.Pod{updatedPod.DeepCopy()},
				newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
				make(map[string]*schedulernodeinfo.ImageStateSummary),
			),
		},
	}}

	for i, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		for _, podToAssume := range tt.podsToAssume {
			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
		}
		for _, podToAdd := range tt.podsToAdd {
			if err := cache.AddPod(podToAdd); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
		}
		for _, podToUpdate := range tt.podsToUpdate {
			if err := cache.UpdatePod(podToUpdate[0], podToUpdate[1]); err != nil {
				t.Fatalf("UpdatePod failed: %v", err)
			}
		}
		for nodeName, expected := range tt.wNodeInfo {
			t.Log(nodeName)
			n := cache.nodes[nodeName]
			deepEqualWithoutGeneration(t, i, n, expected)
		}
	}
}

// TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
func TestAddPodAfterExpiration(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	ttl := 10 * time.Second
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		pod       *v1.Pod
		wNodeInfo *schedulernodeinfo.NodeInfo
	}{{
		pod: basePod,
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{basePod},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}}

	now := time.Now()
	for i, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
			t.Fatalf("assumePod failed: %v", err)
		}
		cache.cleanupAssumedPods(now.Add(2 * ttl))
		// It should be expired and removed.
		n := cache.nodes[nodeName]
		if n != nil {
			t.Errorf("#%d: expecting nil node info, but get=%v", i, n)
		}
		if err := cache.AddPod(tt.pod); err != nil {
			t.Fatalf("AddPod failed: %v", err)
		}
		// check after expiration. confirmed pods shouldn't be expired.
		n = cache.nodes[nodeName]
		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
	}
}

// TestUpdatePod tests that an added pod can be updated, and the node info is updated accordingly.
func TestUpdatePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAdd    []*v1.Pod
		podsToUpdate []*v1.Pod
		wNodeInfo    []*schedulernodeinfo.NodeInfo
	}{{ // add a pod and then update it twice
		podsToAdd:    []*v1.Pod{testPods[0]},
		podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
		wNodeInfo: []*schedulernodeinfo.NodeInfo{newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			[]*v1.Pod{testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		), newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		)},
	}}

	for _, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		for _, podToAdd := range tt.podsToAdd {
			if err := cache.AddPod(podToAdd); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
		}

		for i := range tt.podsToUpdate {
			if i == 0 {
				continue
			}
			if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil {
				t.Fatalf("UpdatePod failed: %v", err)
			}
			// check the node info after each update.
			n := cache.nodes[nodeName]
			deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
		}
	}
}

// TestUpdatePodAndGet tests that Get always returns the latest pod state.
func TestUpdatePodAndGet(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		pod         *v1.Pod
		podToUpdate *v1.Pod
		handler     func(cache Cache, pod *v1.Pod) error
		assumePod   bool
	}{
		{
			pod:         testPods[0],
			podToUpdate: testPods[0],
			handler: func(cache Cache, pod *v1.Pod) error {
				return cache.AssumePod(pod)
			},
			assumePod: true,
		},
		{
			pod:         testPods[0],
			podToUpdate: testPods[1],
			handler: func(cache Cache, pod *v1.Pod) error {
				return cache.AddPod(pod)
			},
			assumePod: false,
		},
	}

	for _, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		if err := tt.handler(cache, tt.pod); err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
		if !tt.assumePod {
			if err := cache.UpdatePod(tt.pod, tt.podToUpdate); err != nil {
				t.Fatalf("UpdatePod failed: %v", err)
			}
		}
		cachedPod, err := cache.GetPod(tt.pod)
		if err != nil {
			t.Fatalf("GetPod failed: %v", err)
		}
		if !reflect.DeepEqual(tt.podToUpdate, cachedPod) {
			t.Fatalf("pod get=%s, want=%s", cachedPod, tt.podToUpdate)
		}
	}
}

// TestExpireAddUpdatePod tests the sequence in which a pod is expired, added, then updated.
func TestExpireAddUpdatePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		podsToUpdate []*v1.Pod
		wNodeInfo    []*schedulernodeinfo.NodeInfo
	}{{ // Pod is assumed, expired, and added. Then it would be updated twice.
		podsToAssume: []*v1.Pod{testPods[0]},
		podsToAdd:    []*v1.Pod{testPods[0]},
		podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
		wNodeInfo: []*schedulernodeinfo.NodeInfo{newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			[]*v1.Pod{testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		), newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		)},
	}}

	now := time.Now()
	for _, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		for _, podToAssume := range tt.podsToAssume {
			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
		}
		cache.cleanupAssumedPods(now.Add(2 * ttl))

		for _, podToAdd := range tt.podsToAdd {
			if err := cache.AddPod(podToAdd); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
		}

		for i := range tt.podsToUpdate {
			if i == 0 {
				continue
			}
			if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil {
				t.Fatalf("UpdatePod failed: %v", err)
			}
			// check after expiration. confirmed pods shouldn't be expired.
			n := cache.nodes[nodeName]
			deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
		}
	}
}

func makePodWithEphemeralStorage(nodeName, ephemeralStorage string) *v1.Pod {
	req := v1.ResourceList{
		v1.ResourceEphemeralStorage: resource.MustParse(ephemeralStorage),
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default-namespace",
			Name:      "pod-with-ephemeral-storage",
			UID:       types.UID("pod-with-ephemeral-storage"),
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: req,
				},
			}},
			NodeName: nodeName,
		},
	}
}

func TestEphemeralStorageResource(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	podE := makePodWithEphemeralStorage(nodeName, "500")
	tests := []struct {
		pod       *v1.Pod
		wNodeInfo *schedulernodeinfo.NodeInfo
	}{
		{
			pod: podE,
			wNodeInfo: newNodeInfo(
				&schedulernodeinfo.Resource{
					EphemeralStorage: 500,
				},
				&schedulernodeinfo.Resource{
					MilliCPU: priorityutil.DefaultMilliCPURequest,
					Memory:   priorityutil.DefaultMemoryRequest,
				},
				[]*v1.Pod{podE},
				schedulernodeinfo.HostPortInfo{},
				make(map[string]*schedulernodeinfo.ImageStateSummary),
			),
		},
	}
	for i, tt := range tests {
		cache := newSchedulerCache(time.Second, time.Second, nil)
		if err := cache.AddPod(tt.pod); err != nil {
			t.Fatalf("AddPod failed: %v", err)
		}
		n := cache.nodes[nodeName]
		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)

		if err := cache.RemovePod(tt.pod); err != nil {
			t.Fatalf("RemovePod failed: %v", err)
		}
		n = cache.nodes[nodeName]
		if n != nil {
			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
		}
	}
}

// TestRemovePod tests that after an added pod is removed, its information is subtracted from the node info.
func TestRemovePod(t *testing.T) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	nodeName := "node"
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		pod       *v1.Pod
		wNodeInfo *schedulernodeinfo.NodeInfo
	}{{
		pod: basePod,
		wNodeInfo: newNodeInfo(
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&schedulernodeinfo.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{basePod},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*schedulernodeinfo.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		cache := newSchedulerCache(time.Second, time.Second, nil)
		if err := cache.AddPod(tt.pod); err != nil {
			t.Fatalf("AddPod failed: %v", err)
		}
		n := cache.nodes[nodeName]
		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)

		if err := cache.RemovePod(tt.pod); err != nil {
			t.Fatalf("RemovePod failed: %v", err)
		}
		n = cache.nodes[nodeName]
		if n != nil {
			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
		}
	}
}

func TestForgetPod(t *testing.T) {
	nodeName := "node"
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		pods []*v1.Pod
	}{{
		pods: []*v1.Pod{basePod},
	}}
	now := time.Now()
	ttl := 10 * time.Second

	for i, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		for _, pod := range tt.pods {
			if err := assumeAndFinishBinding(cache, pod, now); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
			isAssumed, err := cache.IsAssumedPod(pod)
			if err != nil {
				t.Fatalf("IsAssumedPod failed: %v.", err)
			}
			if !isAssumed {
				t.Fatalf("Pod is expected to be assumed.")
			}
			assumedPod, err := cache.GetPod(pod)
			if err != nil {
				t.Fatalf("GetPod failed: %v.", err)
			}
			if assumedPod.Namespace != pod.Namespace {
				t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
			}
			if assumedPod.Name != pod.Name {
				t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
			}
		}
		for _, pod := range tt.pods {
			if err := cache.ForgetPod(pod); err != nil {
				t.Fatalf("ForgetPod failed: %v", err)
			}
			isAssumed, err := cache.IsAssumedPod(pod)
			if err != nil {
				t.Fatalf("IsAssumedPod failed: %v.", err)
			}
			if isAssumed {
				t.Fatalf("Pod is expected to be unassumed.")
			}
		}
		cache.cleanupAssumedPods(now.Add(2 * ttl))
		if n := cache.nodes[nodeName]; n != nil {
			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
		}
	}
}

// getResourceRequest returns the total resource request of all containers in the Pod,
// excluding initContainers.
func getResourceRequest(pod *v1.Pod) v1.ResourceList {
	result := &schedulernodeinfo.Resource{}
	for _, container := range pod.Spec.Containers {
		result.Add(container.Resources.Requests)
	}
	return result.ResourceList()
}

// buildNodeInfo creates a NodeInfo by simulating node operations in cache.
func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *schedulernodeinfo.NodeInfo {
	expected := schedulernodeinfo.NewNodeInfo()
	// Simulate SetNode.
	expected.SetNode(node)
	expected.SetAllocatableResource(schedulernodeinfo.NewResource(node.Status.Allocatable))
	expected.SetTaints(node.Spec.Taints)
	expected.SetGeneration(expected.GetGeneration() + 1)
	for _, pod := range pods {
		// Simulate AddPod
		pods := append(expected.Pods(), pod)
		expected.SetPods(pods)
		requestedResource := expected.RequestedResource()
		newRequestedResource := &requestedResource
		newRequestedResource.Add(getResourceRequest(pod))
		expected.SetRequestedResource(newRequestedResource)
		nonZeroRequest := expected.NonZeroRequest()
		newNonZeroRequest := &nonZeroRequest
		newNonZeroRequest.Add(getResourceRequest(pod))
		expected.SetNonZeroRequest(newNonZeroRequest)
		expected.UpdateUsedPorts(pod, true)
		expected.SetGeneration(expected.GetGeneration() + 1)
	}
	return expected
}

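// Note: buildNodeInfo bumps the expected generation once for the simulated SetNode and once
// per added pod, mirroring the per-mutation increments the cache performs. The exact values
// are not asserted directly; TestNodeOperators overwrites the expected generation with the
// cached one because generations are globally unique.
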
// TestNodeOperators tests node operations of cache, including add, update
// and remove.
func TestNodeOperators(t *testing.T) {
	// Test data
	nodeName := "test-node"
	cpu1 := resource.MustParse("1000m")
	mem100m := resource.MustParse("100m")
	cpuHalf := resource.MustParse("500m")
	mem50m := resource.MustParse("50m")
	resourceFooName := "example.com/foo"
	resourceFoo := resource.MustParse("1")

	tests := []struct {
		node *v1.Node
		pods []*v1.Pod
	}{
		{
			node: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeName,
				},
				Status: v1.NodeStatus{
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:                   cpu1,
						v1.ResourceMemory:                mem100m,
						v1.ResourceName(resourceFooName): resourceFoo,
					},
				},
				Spec: v1.NodeSpec{
					Taints: []v1.Taint{
						{
							Key:    "test-key",
							Value:  "test-value",
							Effect: v1.TaintEffectPreferNoSchedule,
						},
					},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod1",
						UID:  types.UID("pod1"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
								Ports: []v1.ContainerPort{
									{
										Name:          "http",
										HostPort:      80,
										ContainerPort: 80,
									},
								},
							},
						},
					},
				},
			},
		},
		{
			node: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeName,
				},
				Status: v1.NodeStatus{
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:                   cpu1,
						v1.ResourceMemory:                mem100m,
						v1.ResourceName(resourceFooName): resourceFoo,
					},
				},
				Spec: v1.NodeSpec{
					Taints: []v1.Taint{
						{
							Key:    "test-key",
							Value:  "test-value",
							Effect: v1.TaintEffectPreferNoSchedule,
						},
					},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod1",
						UID:  types.UID("pod1"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod2",
						UID:  types.UID("pod2"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
							},
						},
					},
				},
			},
		},
	}

	for _, test := range tests {
		expected := buildNodeInfo(test.node, test.pods)
		node := test.node

		cache := newSchedulerCache(time.Second, time.Second, nil)
		cache.AddNode(node)
		for _, pod := range test.pods {
			cache.AddPod(pod)
		}

		// Case 1: the node was added into cache successfully.
		got, found := cache.nodes[node.Name]
		if !found {
			t.Errorf("Failed to find node %v in internal cache.", node.Name)
		}
		if cache.nodeTree.NumNodes() != 1 || cache.nodeTree.Next() != node.Name {
			t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
		}

		// Generations are globally unique. We check in our unit tests that they are incremented correctly.
		expected.SetGeneration(got.info.GetGeneration())
		if !reflect.DeepEqual(got.info, expected) {
			t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
		}

		// Case 2: dump cached nodes successfully.
		cachedNodes := NewNodeInfoSnapshot()
		cache.UpdateNodeInfoSnapshot(cachedNodes)
		newNode, found := cachedNodes.NodeInfoMap[node.Name]
		if !found || len(cachedNodes.NodeInfoMap) != 1 {
			t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
		}
		expected.SetGeneration(newNode.GetGeneration())
		if !reflect.DeepEqual(newNode, expected) {
			t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
		}

		// Case 3: update node attribute successfully.
		node.Status.Allocatable[v1.ResourceMemory] = mem50m
		allocatableResource := expected.AllocatableResource()
		newAllocatableResource := &allocatableResource
		newAllocatableResource.Memory = mem50m.Value()
		expected.SetAllocatableResource(newAllocatableResource)

		cache.UpdateNode(nil, node)
		got, found = cache.nodes[node.Name]
		if !found {
			t.Errorf("Failed to find node %v in schedulernodeinfo after UpdateNode.", node.Name)
		}
		if got.info.GetGeneration() <= expected.GetGeneration() {
			t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration())
		}
		expected.SetGeneration(got.info.GetGeneration())

		if !reflect.DeepEqual(got.info, expected) {
			t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
		}
		// Check nodeTree after update
		if cache.nodeTree.NumNodes() != 1 || cache.nodeTree.Next() != node.Name {
			t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
		}

		// Case 4: the node cannot be removed if its pod list is not empty.
		cache.RemoveNode(node)
		if _, found := cache.nodes[node.Name]; !found {
			t.Errorf("The node %v should not be removed if pods is not empty.", node.Name)
		}
		// Check nodeTree after remove. The node should be removed from the nodeTree even if there are
		// still pods on it.
		if cache.nodeTree.NumNodes() != 0 || cache.nodeTree.Next() != "" {
			t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
		}
	}
}

// TestSchedulerCache_UpdateNodeInfoSnapshot tests UpdateNodeInfoSnapshot function of cache.
func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
	// Create a few nodes to be used in tests.
	nodes := []*v1.Node{}
	for i := 0; i < 10; i++ {
		node := &v1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: fmt.Sprintf("test-node%v", i),
			},
			Status: v1.NodeStatus{
				Allocatable: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("1000m"),
					v1.ResourceMemory: resource.MustParse("100m"),
				},
			},
		}
		nodes = append(nodes, node)
	}
	// Create a few nodes as updated versions of the above nodes
	updatedNodes := []*v1.Node{}
	for _, n := range nodes {
		updatedNode := n.DeepCopy()
		updatedNode.Status.Allocatable = v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse("2000m"),
			v1.ResourceMemory: resource.MustParse("500m"),
		}
		updatedNodes = append(updatedNodes, updatedNode)
	}

	// Create a few pods for tests.
	pods := []*v1.Pod{}
	for i := 0; i < 10; i++ {
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("test-pod%v", i),
				Namespace: "test-ns",
				UID:       types.UID(fmt.Sprintf("test-puid%v", i)),
			},
			Spec: v1.PodSpec{
				NodeName: fmt.Sprintf("test-node%v", i),
			},
		}
		pods = append(pods, pod)
	}
	// Create a few pods as updated versions of the above pods.
	updatedPods := []*v1.Pod{}
	for _, p := range pods {
		updatedPod := p.DeepCopy()
		priority := int32(1000)
		updatedPod.Spec.Priority = &priority
		updatedPods = append(updatedPods, updatedPod)
	}

	var cache *schedulerCache
	var snapshot *NodeInfoSnapshot
	type operation = func()

	addNode := func(i int) operation {
		return func() {
			cache.AddNode(nodes[i])
		}
	}
	removeNode := func(i int) operation {
		return func() {
			cache.RemoveNode(nodes[i])
		}
	}
	updateNode := func(i int) operation {
		return func() {
			cache.UpdateNode(nodes[i], updatedNodes[i])
		}
	}
	addPod := func(i int) operation {
		return func() {
			cache.AddPod(pods[i])
		}
	}
	removePod := func(i int) operation {
		return func() {
			cache.RemovePod(pods[i])
		}
	}
	updatePod := func(i int) operation {
		return func() {
			cache.UpdatePod(pods[i], updatedPods[i])
		}
	}
	updateSnapshot := func() operation {
		return func() {
			cache.UpdateNodeInfoSnapshot(snapshot)
			if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
				t.Error(err)
			}
		}
	}

	tests := []struct {
		name       string
		operations []operation
		expected   []*v1.Node
	}{
		{
			name:       "Empty cache",
			operations: []operation{},
			expected:   []*v1.Node{},
		},
		{
			name:       "Single node",
			operations: []operation{addNode(1)},
			expected:   []*v1.Node{nodes[1]},
		},
		{
			name: "Add node, remove it, add it again",
			operations: []operation{
				addNode(1), updateSnapshot(), removeNode(1), addNode(1),
			},
			expected: []*v1.Node{nodes[1]},
		},
		{
			name: "Add a few nodes, and snapshot in the middle",
			operations: []operation{
				addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2),
				updateSnapshot(), addNode(3),
			},
			expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]},
		},
		{
			name: "Add a few nodes, and snapshot in the end",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6),
			},
			expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]},
		},
		{
			name: "Remove non-existing node",
			operations: []operation{
				addNode(0), addNode(1), updateSnapshot(), removeNode(8),
			},
			expected: []*v1.Node{nodes[1], nodes[0]},
		},
		{
			name: "Update some nodes",
			operations: []operation{
				addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[5], nodes[0]},
		},
		{
			name: "Add a few nodes, and remove all of them",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(0), removeNode(2), removeNode(5), removeNode(6),
			},
			expected: []*v1.Node{},
		},
		{
			name: "Add a few nodes, and remove some of them",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(0), removeNode(6),
			},
			expected: []*v1.Node{nodes[5], nodes[2]},
		},
		{
			name: "Add a few nodes, remove all of them, and add more",
			operations: []operation{
				addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(2), removeNode(5), removeNode(6), updateSnapshot(),
				addNode(7), addNode(9),
			},
			expected: []*v1.Node{nodes[9], nodes[7]},
		},
		{
			name: "Update nodes in particular order",
			operations: []operation{
				addNode(8), updateNode(2), updateNode(8), updateSnapshot(),
				addNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[8], nodes[2]},
		},
		{
			name: "Add some nodes and some pods",
			operations: []operation{
				addNode(0), addNode(2), addNode(8), updateSnapshot(),
				addPod(8), addPod(2),
			},
			expected: []*v1.Node{nodes[2], nodes[8], nodes[0]},
		},
		{
			name: "Updating a pod moves its node to the head",
			operations: []operation{
				addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0),
			},
			expected: []*v1.Node{nodes[0], nodes[4], nodes[2]},
		},
		{
			name: "Remove pod from non-existing node",
			operations: []operation{
				addNode(0), addPod(0), addNode(2), updateSnapshot(), removePod(3),
			},
			expected: []*v1.Node{nodes[2], nodes[0]},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			cache = newSchedulerCache(time.Second, time.Second, nil)
			snapshot = NewNodeInfoSnapshot()

			for _, op := range test.operations {
				op()
			}

			if len(test.expected) != len(cache.nodes) {
				t.Errorf("unexpected number of nodes. Expected: %v, got: %v", len(test.expected), len(cache.nodes))
			}
			var i int
			// Check that cache is in the expected state.
			for node := cache.headNode; node != nil; node = node.next {
				if node.info.Node().Name != test.expected[i].Name {
					t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
				}
				i++
			}
			// Make sure we visited all the cached nodes in the above for loop.
			if i != len(cache.nodes) {
				t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i)
			}

			// Always update the snapshot at the end of operations and compare it.
			cache.UpdateNodeInfoSnapshot(snapshot)
			if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
				t.Error(err)
			}
		})
	}
}

func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *NodeInfoSnapshot) error {
	if len(snapshot.NodeInfoMap) != len(cache.nodes) {
		return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap))
	}
	for name, ni := range cache.nodes {
		if !reflect.DeepEqual(snapshot.NodeInfoMap[name], ni.info) {
			return fmt.Errorf("unexpected node info. Expected: %v, got: %v", ni.info, snapshot.NodeInfoMap[name])
		}
	}
	return nil
}

func BenchmarkList1kNodes30kPods(b *testing.B) {
	cache := setupCacheOf1kNodes30kPods(b)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cache.List(labels.Everything())
	}
}

func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
	// Enable volumesOnNodeForBalancing to do balanced resource allocation
	defer featuregatetesting.SetFeatureGateDuringTest(nil, utilfeature.DefaultFeatureGate, features.BalanceAttachedNodeVolumes, true)()
	cache := setupCacheOf1kNodes30kPods(b)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cachedNodes := NewNodeInfoSnapshot()
		cache.UpdateNodeInfoSnapshot(cachedNodes)
	}
}

func BenchmarkExpirePods(b *testing.B) {
	podNums := []int{
		100,
		1000,
		10000,
	}
	for _, podNum := range podNums {
		name := fmt.Sprintf("%dPods", podNum)
		b.Run(name, func(b *testing.B) {
			benchmarkExpire(b, podNum)
		})
	}
}

func benchmarkExpire(b *testing.B, podNum int) {
	now := time.Now()
	for n := 0; n < b.N; n++ {
		// Exclude per-iteration cache setup from the timed section; only
		// cleanupAssumedPods is measured.
		b.StopTimer()
		cache := setupCacheWithAssumedPods(b, podNum, now)
		b.StartTimer()
		cache.cleanupAssumedPods(now.Add(2 * time.Second))
	}
}

type testingMode interface {
	Fatalf(format string, args ...interface{})
}

func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, ports []v1.ContainerPort) *v1.Pod {
	req := v1.ResourceList{}
	if cpu != "" {
		req = v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse(cpu),
			v1.ResourceMemory: resource.MustParse(mem),
		}
		if extended != "" {
			parts := strings.Split(extended, ":")
			if len(parts) != 2 {
				t.Fatalf("Invalid extended resource string: \"%s\"", extended)
			}
			req[v1.ResourceName(parts[0])] = resource.MustParse(parts[1])
		}
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:       types.UID(objName),
			Namespace: "node_info_cache_test",
			Name:      objName,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: req,
				},
				Ports: ports,
			}},
			NodeName: nodeName,
		},
	}
}

func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
	cache := newSchedulerCache(time.Second, time.Second, nil)
	for i := 0; i < 1000; i++ {
		nodeName := fmt.Sprintf("node-%d", i)
		for j := 0; j < 30; j++ {
			objName := fmt.Sprintf("%s-pod-%d", nodeName, j)
			pod := makeBasePod(b, nodeName, objName, "0", "0", "", nil)

			if err := cache.AddPod(pod); err != nil {
				b.Fatalf("AddPod failed: %v", err)
			}
		}
	}
	return cache
}

func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache {
	cache := newSchedulerCache(time.Second, time.Second, nil)
	for i := 0; i < podNum; i++ {
		nodeName := fmt.Sprintf("node-%d", i/10)
		objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
		pod := makeBasePod(b, nodeName, objName, "0", "0", "", nil)

		err := assumeAndFinishBinding(cache, pod, assumedTime)
		if err != nil {
			b.Fatalf("assumePod failed: %v", err)
		}
	}
	return cache
}