replica_set_test.go 67 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package replicaset
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "math/rand"
  19. "net/http/httptest"
  20. "net/url"
  21. "reflect"
  22. "sort"
  23. "strings"
  24. "sync"
  25. "testing"
  26. "time"
  27. apps "k8s.io/api/apps/v1"
  28. "k8s.io/api/core/v1"
  29. apiequality "k8s.io/apimachinery/pkg/api/equality"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/runtime"
  32. "k8s.io/apimachinery/pkg/runtime/schema"
  33. "k8s.io/apimachinery/pkg/util/sets"
  34. "k8s.io/apimachinery/pkg/util/uuid"
  35. "k8s.io/apimachinery/pkg/util/wait"
  36. "k8s.io/apimachinery/pkg/watch"
  37. "k8s.io/client-go/informers"
  38. clientset "k8s.io/client-go/kubernetes"
  39. "k8s.io/client-go/kubernetes/fake"
  40. restclient "k8s.io/client-go/rest"
  41. core "k8s.io/client-go/testing"
  42. "k8s.io/client-go/tools/cache"
  43. utiltesting "k8s.io/client-go/util/testing"
  44. "k8s.io/client-go/util/workqueue"
  45. "k8s.io/klog"
  46. "k8s.io/kubernetes/pkg/controller"
  47. . "k8s.io/kubernetes/pkg/controller/testutil"
  48. "k8s.io/kubernetes/pkg/securitycontext"
  49. )
  50. var (
  51. informerSyncTimeout = 30 * time.Second
  52. )
  53. func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) {
  54. informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  55. ret := NewReplicaSetController(
  56. informers.Apps().V1().ReplicaSets(),
  57. informers.Core().V1().Pods(),
  58. client,
  59. burstReplicas,
  60. )
  61. ret.podListerSynced = alwaysReady
  62. ret.rsListerSynced = alwaysReady
  63. return ret, informers
  64. }
  65. func skipListerFunc(verb string, url url.URL) bool {
  66. if verb != "GET" {
  67. return false
  68. }
  69. if strings.HasSuffix(url.Path, "/pods") || strings.Contains(url.Path, "/replicasets") {
  70. return true
  71. }
  72. return false
  73. }
  74. var alwaysReady = func() bool { return true }
  75. func newReplicaSet(replicas int, selectorMap map[string]string) *apps.ReplicaSet {
  76. isController := true
  77. rs := &apps.ReplicaSet{
  78. TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "ReplicaSet"},
  79. ObjectMeta: metav1.ObjectMeta{
  80. UID: uuid.NewUUID(),
  81. Name: "foobar",
  82. Namespace: metav1.NamespaceDefault,
  83. OwnerReferences: []metav1.OwnerReference{
  84. {UID: "123", Controller: &isController},
  85. },
  86. ResourceVersion: "18",
  87. },
  88. Spec: apps.ReplicaSetSpec{
  89. Replicas: func() *int32 { i := int32(replicas); return &i }(),
  90. Selector: &metav1.LabelSelector{MatchLabels: selectorMap},
  91. Template: v1.PodTemplateSpec{
  92. ObjectMeta: metav1.ObjectMeta{
  93. Labels: map[string]string{
  94. "name": "foo",
  95. "type": "production",
  96. },
  97. },
  98. Spec: v1.PodSpec{
  99. Containers: []v1.Container{
  100. {
  101. Image: "foo/bar",
  102. TerminationMessagePath: v1.TerminationMessagePathDefault,
  103. ImagePullPolicy: v1.PullIfNotPresent,
  104. SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
  105. },
  106. },
  107. RestartPolicy: v1.RestartPolicyAlways,
  108. DNSPolicy: v1.DNSDefault,
  109. NodeSelector: map[string]string{
  110. "baz": "blah",
  111. },
  112. },
  113. },
  114. },
  115. }
  116. return rs
  117. }
  118. // create a pod with the given phase for the given rs (same selectors and namespace)
  119. func newPod(name string, rs *apps.ReplicaSet, status v1.PodPhase, lastTransitionTime *metav1.Time, properlyOwned bool) *v1.Pod {
  120. var conditions []v1.PodCondition
  121. if status == v1.PodRunning {
  122. condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
  123. if lastTransitionTime != nil {
  124. condition.LastTransitionTime = *lastTransitionTime
  125. }
  126. conditions = append(conditions, condition)
  127. }
  128. var controllerReference metav1.OwnerReference
  129. if properlyOwned {
  130. var trueVar = true
  131. controllerReference = metav1.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
  132. }
  133. return &v1.Pod{
  134. ObjectMeta: metav1.ObjectMeta{
  135. UID: uuid.NewUUID(),
  136. Name: name,
  137. Namespace: rs.Namespace,
  138. Labels: rs.Spec.Selector.MatchLabels,
  139. OwnerReferences: []metav1.OwnerReference{controllerReference},
  140. },
  141. Status: v1.PodStatus{Phase: status, Conditions: conditions},
  142. }
  143. }
  144. // create count pods with the given phase for the given ReplicaSet (same selectors and namespace), and add them to the store.
  145. func newPodList(store cache.Store, count int, status v1.PodPhase, labelMap map[string]string, rs *apps.ReplicaSet, name string) *v1.PodList {
  146. pods := []v1.Pod{}
  147. var trueVar = true
  148. controllerReference := metav1.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
  149. for i := 0; i < count; i++ {
  150. pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status, nil, false)
  151. pod.ObjectMeta.Labels = labelMap
  152. pod.OwnerReferences = []metav1.OwnerReference{controllerReference}
  153. if store != nil {
  154. store.Add(pod)
  155. }
  156. pods = append(pods, *pod)
  157. }
  158. return &v1.PodList{
  159. Items: pods,
  160. }
  161. }
  162. // processSync initiates a sync via processNextWorkItem() to test behavior that
  163. // depends on both functions (such as re-queueing on sync error).
  164. func processSync(rsc *ReplicaSetController, key string) error {
  165. // Save old syncHandler and replace with one that captures the error.
  166. oldSyncHandler := rsc.syncHandler
  167. defer func() {
  168. rsc.syncHandler = oldSyncHandler
  169. }()
  170. var syncErr error
  171. rsc.syncHandler = func(key string) error {
  172. syncErr = oldSyncHandler(key)
  173. return syncErr
  174. }
  175. rsc.queue.Add(key)
  176. rsc.processNextWorkItem()
  177. return syncErr
  178. }
  179. func validateSyncReplicaSet(fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) error {
  180. if e, a := expectedCreates, len(fakePodControl.Templates); e != a {
  181. return fmt.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a)
  182. }
  183. if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a {
  184. return fmt.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a)
  185. }
  186. if e, a := expectedPatches, len(fakePodControl.Patches); e != a {
  187. return fmt.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a)
  188. }
  189. return nil
  190. }
  191. func TestSyncReplicaSetDoesNothing(t *testing.T) {
  192. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  193. fakePodControl := controller.FakePodControl{}
  194. stopCh := make(chan struct{})
  195. defer close(stopCh)
  196. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
  197. // 2 running pods, a controller with 2 replicas, sync is a no-op
  198. labelMap := map[string]string{"foo": "bar"}
  199. rsSpec := newReplicaSet(2, labelMap)
  200. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
  201. newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 2, v1.PodRunning, labelMap, rsSpec, "pod")
  202. manager.podControl = &fakePodControl
  203. manager.syncReplicaSet(GetKey(rsSpec, t))
  204. err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
  205. if err != nil {
  206. t.Fatal(err)
  207. }
  208. }
  209. func TestDeleteFinalStateUnknown(t *testing.T) {
  210. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  211. fakePodControl := controller.FakePodControl{}
  212. stopCh := make(chan struct{})
  213. defer close(stopCh)
  214. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
  215. manager.podControl = &fakePodControl
  216. received := make(chan string)
  217. manager.syncHandler = func(key string) error {
  218. received <- key
  219. return nil
  220. }
  221. // The DeletedFinalStateUnknown object should cause the ReplicaSet manager to insert
  222. // the controller matching the selectors of the deleted pod into the work queue.
  223. labelMap := map[string]string{"foo": "bar"}
  224. rsSpec := newReplicaSet(1, labelMap)
  225. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
  226. pods := newPodList(nil, 1, v1.PodRunning, labelMap, rsSpec, "pod")
  227. manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
  228. go manager.worker()
  229. expected := GetKey(rsSpec, t)
  230. select {
  231. case key := <-received:
  232. if key != expected {
  233. t.Errorf("Unexpected sync all for ReplicaSet %v, expected %v", key, expected)
  234. }
  235. case <-time.After(wait.ForeverTestTimeout):
  236. t.Errorf("Processing DeleteFinalStateUnknown took longer than expected")
  237. }
  238. }
  239. // Tell the rs to create 100 replicas, but simulate a limit (like a quota limit)
  240. // of 10, and verify that the rs doesn't make 100 create calls per sync pass
  241. func TestSyncReplicaSetCreateFailures(t *testing.T) {
  242. fakePodControl := controller.FakePodControl{}
  243. fakePodControl.CreateLimit = 10
  244. labelMap := map[string]string{"foo": "bar"}
  245. rs := newReplicaSet(fakePodControl.CreateLimit*10, labelMap)
  246. client := fake.NewSimpleClientset(rs)
  247. stopCh := make(chan struct{})
  248. defer close(stopCh)
  249. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
  250. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
  251. manager.podControl = &fakePodControl
  252. manager.syncReplicaSet(GetKey(rs, t))
  253. err := validateSyncReplicaSet(&fakePodControl, fakePodControl.CreateLimit, 0, 0)
  254. if err != nil {
  255. t.Fatal(err)
  256. }
  257. expectedLimit := 0
  258. for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ {
  259. expectedLimit += controller.SlowStartInitialBatchSize << pass
  260. }
  261. if fakePodControl.CreateCallCount > expectedLimit {
  262. t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount)
  263. }
  264. }
  265. func TestSyncReplicaSetDormancy(t *testing.T) {
  266. // Setup a test server so we can lie about the current state of pods
  267. fakeHandler := utiltesting.FakeHandler{
  268. StatusCode: 200,
  269. ResponseBody: "{}",
  270. SkipRequestFn: skipListerFunc,
  271. T: t,
  272. }
  273. testServer := httptest.NewServer(&fakeHandler)
  274. defer testServer.Close()
  275. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  276. fakePodControl := controller.FakePodControl{}
  277. stopCh := make(chan struct{})
  278. defer close(stopCh)
  279. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
  280. manager.podControl = &fakePodControl
  281. labelMap := map[string]string{"foo": "bar"}
  282. rsSpec := newReplicaSet(2, labelMap)
  283. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
  284. newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap, rsSpec, "pod")
  285. // Creates a replica and sets expectations
  286. rsSpec.Status.Replicas = 1
  287. rsSpec.Status.ReadyReplicas = 1
  288. rsSpec.Status.AvailableReplicas = 1
  289. manager.syncReplicaSet(GetKey(rsSpec, t))
  290. err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
  291. if err != nil {
  292. t.Fatal(err)
  293. }
  294. // Expectations prevents replicas but not an update on status
  295. rsSpec.Status.Replicas = 0
  296. rsSpec.Status.ReadyReplicas = 0
  297. rsSpec.Status.AvailableReplicas = 0
  298. fakePodControl.Clear()
  299. manager.syncReplicaSet(GetKey(rsSpec, t))
  300. err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
  301. if err != nil {
  302. t.Fatal(err)
  303. }
  304. // Get the key for the controller
  305. rsKey, err := controller.KeyFunc(rsSpec)
  306. if err != nil {
  307. t.Errorf("Couldn't get key for object %#v: %v", rsSpec, err)
  308. }
  309. // Lowering expectations should lead to a sync that creates a replica, however the
  310. // fakePodControl error will prevent this, leaving expectations at 0, 0
  311. manager.expectations.CreationObserved(rsKey)
  312. rsSpec.Status.Replicas = 1
  313. rsSpec.Status.ReadyReplicas = 1
  314. rsSpec.Status.AvailableReplicas = 1
  315. fakePodControl.Clear()
  316. fakePodControl.Err = fmt.Errorf("fake Error")
  317. manager.syncReplicaSet(GetKey(rsSpec, t))
  318. err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
  319. if err != nil {
  320. t.Fatal(err)
  321. }
  322. // This replica should not need a Lowering of expectations, since the previous create failed
  323. fakePodControl.Clear()
  324. fakePodControl.Err = nil
  325. manager.syncReplicaSet(GetKey(rsSpec, t))
  326. err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
  327. if err != nil {
  328. t.Fatal(err)
  329. }
  330. // 2 PUT for the ReplicaSet status during dormancy window.
  331. // Note that the pod creates go through pod control so they're not recorded.
  332. fakeHandler.ValidateRequestCount(t, 2)
  333. }
  334. func TestGetReplicaSetsWithSameController(t *testing.T) {
  335. someRS := newReplicaSet(1, map[string]string{"foo": "bar"})
  336. someRS.Name = "rs1"
  337. relatedRS := newReplicaSet(1, map[string]string{"foo": "baz"})
  338. relatedRS.Name = "rs2"
  339. unrelatedRS := newReplicaSet(1, map[string]string{"foo": "quux"})
  340. unrelatedRS.Name = "rs3"
  341. unrelatedRS.ObjectMeta.OwnerReferences[0].UID = "456"
  342. pendingDeletionRS := newReplicaSet(1, map[string]string{"foo": "xyzzy"})
  343. pendingDeletionRS.Name = "rs4"
  344. pendingDeletionRS.ObjectMeta.OwnerReferences[0].UID = "789"
  345. now := metav1.Now()
  346. pendingDeletionRS.DeletionTimestamp = &now
  347. stopCh := make(chan struct{})
  348. defer close(stopCh)
  349. manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
  350. testCases := []struct {
  351. name string
  352. rss []*apps.ReplicaSet
  353. rs *apps.ReplicaSet
  354. expectedRSs []*apps.ReplicaSet
  355. }{
  356. {
  357. name: "expect to get back a ReplicaSet that is pending deletion",
  358. rss: []*apps.ReplicaSet{pendingDeletionRS, unrelatedRS},
  359. rs: pendingDeletionRS,
  360. expectedRSs: []*apps.ReplicaSet{pendingDeletionRS},
  361. },
  362. {
  363. name: "expect to get back only the given ReplicaSet if there is no related ReplicaSet",
  364. rss: []*apps.ReplicaSet{someRS, unrelatedRS},
  365. rs: someRS,
  366. expectedRSs: []*apps.ReplicaSet{someRS},
  367. },
  368. {
  369. name: "expect to get back the given ReplicaSet as well as any related ReplicaSet but not an unrelated ReplicaSet",
  370. rss: []*apps.ReplicaSet{someRS, relatedRS, unrelatedRS},
  371. rs: someRS,
  372. expectedRSs: []*apps.ReplicaSet{someRS, relatedRS},
  373. },
  374. }
  375. for _, c := range testCases {
  376. for _, r := range c.rss {
  377. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r)
  378. }
  379. actualRSs := manager.getReplicaSetsWithSameController(c.rs)
  380. var actualRSNames, expectedRSNames []string
  381. for _, r := range actualRSs {
  382. actualRSNames = append(actualRSNames, r.Name)
  383. }
  384. for _, r := range c.expectedRSs {
  385. expectedRSNames = append(expectedRSNames, r.Name)
  386. }
  387. sort.Strings(actualRSNames)
  388. sort.Strings(expectedRSNames)
  389. if !reflect.DeepEqual(actualRSNames, expectedRSNames) {
  390. t.Errorf("Got [%s]; expected [%s]", strings.Join(actualRSNames, ", "), strings.Join(expectedRSNames, ", "))
  391. }
  392. }
  393. }
  394. func TestPodControllerLookup(t *testing.T) {
  395. stopCh := make(chan struct{})
  396. defer close(stopCh)
  397. manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
  398. testCases := []struct {
  399. inRSs []*apps.ReplicaSet
  400. pod *v1.Pod
  401. outRSName string
  402. }{
  403. // pods without labels don't match any ReplicaSets
  404. {
  405. inRSs: []*apps.ReplicaSet{
  406. {ObjectMeta: metav1.ObjectMeta{Name: "basic"}}},
  407. pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: metav1.NamespaceAll}},
  408. outRSName: "",
  409. },
  410. // Matching labels, not namespace
  411. {
  412. inRSs: []*apps.ReplicaSet{
  413. {
  414. ObjectMeta: metav1.ObjectMeta{Name: "foo"},
  415. Spec: apps.ReplicaSetSpec{
  416. Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
  417. },
  418. },
  419. },
  420. pod: &v1.Pod{
  421. ObjectMeta: metav1.ObjectMeta{
  422. Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
  423. outRSName: "",
  424. },
  425. // Matching ns and labels returns the key to the ReplicaSet, not the ReplicaSet name
  426. {
  427. inRSs: []*apps.ReplicaSet{
  428. {
  429. ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "ns"},
  430. Spec: apps.ReplicaSetSpec{
  431. Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
  432. },
  433. },
  434. },
  435. pod: &v1.Pod{
  436. ObjectMeta: metav1.ObjectMeta{
  437. Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
  438. outRSName: "bar",
  439. },
  440. }
  441. for _, c := range testCases {
  442. for _, r := range c.inRSs {
  443. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r)
  444. }
  445. if rss := manager.getPodReplicaSets(c.pod); rss != nil {
  446. if len(rss) != 1 {
  447. t.Errorf("len(rss) = %v, want %v", len(rss), 1)
  448. continue
  449. }
  450. rs := rss[0]
  451. if c.outRSName != rs.Name {
  452. t.Errorf("Got replica set %+v expected %+v", rs.Name, c.outRSName)
  453. }
  454. } else if c.outRSName != "" {
  455. t.Errorf("Expected a replica set %v pod %v, found none", c.outRSName, c.pod.Name)
  456. }
  457. }
  458. }
  459. // byName sorts pods by their names.
  460. type byName []*v1.Pod
  461. func (pods byName) Len() int { return len(pods) }
  462. func (pods byName) Swap(i, j int) { pods[i], pods[j] = pods[j], pods[i] }
  463. func (pods byName) Less(i, j int) bool { return pods[i].Name < pods[j].Name }
  464. func TestRelatedPodsLookup(t *testing.T) {
  465. someRS := newReplicaSet(1, map[string]string{"foo": "bar"})
  466. someRS.Name = "foo1"
  467. relatedRS := newReplicaSet(1, map[string]string{"foo": "baz"})
  468. relatedRS.Name = "foo2"
  469. unrelatedRS := newReplicaSet(1, map[string]string{"foo": "quux"})
  470. unrelatedRS.Name = "bar1"
  471. unrelatedRS.ObjectMeta.OwnerReferences[0].UID = "456"
  472. pendingDeletionRS := newReplicaSet(1, map[string]string{"foo": "xyzzy"})
  473. pendingDeletionRS.Name = "foo3"
  474. pendingDeletionRS.ObjectMeta.OwnerReferences[0].UID = "789"
  475. now := metav1.Now()
  476. pendingDeletionRS.DeletionTimestamp = &now
  477. pod1 := newPod("pod1", someRS, v1.PodRunning, nil, true)
  478. pod2 := newPod("pod2", someRS, v1.PodRunning, nil, true)
  479. pod3 := newPod("pod3", relatedRS, v1.PodRunning, nil, true)
  480. pod4 := newPod("pod4", unrelatedRS, v1.PodRunning, nil, true)
  481. stopCh := make(chan struct{})
  482. defer close(stopCh)
  483. manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
  484. testCases := []struct {
  485. name string
  486. rss []*apps.ReplicaSet
  487. pods []*v1.Pod
  488. rs *apps.ReplicaSet
  489. expectedPodNames []string
  490. }{
  491. {
  492. name: "expect to get a pod even if its owning ReplicaSet is pending deletion",
  493. rss: []*apps.ReplicaSet{pendingDeletionRS, unrelatedRS},
  494. rs: pendingDeletionRS,
  495. pods: []*v1.Pod{newPod("pod", pendingDeletionRS, v1.PodRunning, nil, true)},
  496. expectedPodNames: []string{"pod"},
  497. },
  498. {
  499. name: "expect to get only the ReplicaSet's own pods if there is no related ReplicaSet",
  500. rss: []*apps.ReplicaSet{someRS, unrelatedRS},
  501. rs: someRS,
  502. pods: []*v1.Pod{pod1, pod2, pod4},
  503. expectedPodNames: []string{"pod1", "pod2"},
  504. },
  505. {
  506. name: "expect to get own pods as well as any related ReplicaSet's but not an unrelated ReplicaSet's",
  507. rss: []*apps.ReplicaSet{someRS, relatedRS, unrelatedRS},
  508. rs: someRS,
  509. pods: []*v1.Pod{pod1, pod2, pod3, pod4},
  510. expectedPodNames: []string{"pod1", "pod2", "pod3"},
  511. },
  512. }
  513. for _, c := range testCases {
  514. for _, r := range c.rss {
  515. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r)
  516. }
  517. for _, pod := range c.pods {
  518. informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
  519. manager.addPod(pod)
  520. }
  521. actualPods, err := manager.getIndirectlyRelatedPods(c.rs)
  522. if err != nil {
  523. t.Errorf("Unexpected error from getIndirectlyRelatedPods: %v", err)
  524. }
  525. var actualPodNames []string
  526. for _, pod := range actualPods {
  527. actualPodNames = append(actualPodNames, pod.Name)
  528. }
  529. sort.Strings(actualPodNames)
  530. sort.Strings(c.expectedPodNames)
  531. if !reflect.DeepEqual(actualPodNames, c.expectedPodNames) {
  532. t.Errorf("Got [%s]; expected [%s]", strings.Join(actualPodNames, ", "), strings.Join(c.expectedPodNames, ", "))
  533. }
  534. }
  535. }
  536. func TestWatchControllers(t *testing.T) {
  537. fakeWatch := watch.NewFake()
  538. client := fake.NewSimpleClientset()
  539. client.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fakeWatch, nil))
  540. stopCh := make(chan struct{})
  541. defer close(stopCh)
  542. informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  543. manager := NewReplicaSetController(
  544. informers.Apps().V1().ReplicaSets(),
  545. informers.Core().V1().Pods(),
  546. client,
  547. BurstReplicas,
  548. )
  549. informers.Start(stopCh)
  550. var testRSSpec apps.ReplicaSet
  551. received := make(chan string)
  552. // The update sent through the fakeWatcher should make its way into the workqueue,
  553. // and eventually into the syncHandler. The handler validates the received controller
  554. // and closes the received channel to indicate that the test can finish.
  555. manager.syncHandler = func(key string) error {
  556. obj, exists, err := informers.Apps().V1().ReplicaSets().Informer().GetIndexer().GetByKey(key)
  557. if !exists || err != nil {
  558. t.Errorf("Expected to find replica set under key %v", key)
  559. }
  560. rsSpec := *obj.(*apps.ReplicaSet)
  561. if !apiequality.Semantic.DeepDerivative(rsSpec, testRSSpec) {
  562. t.Errorf("Expected %#v, but got %#v", testRSSpec, rsSpec)
  563. }
  564. close(received)
  565. return nil
  566. }
  567. // Start only the ReplicaSet watcher and the workqueue, send a watch event,
  568. // and make sure it hits the sync method.
  569. go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
  570. testRSSpec.Name = "foo"
  571. fakeWatch.Add(&testRSSpec)
  572. select {
  573. case <-received:
  574. case <-time.After(wait.ForeverTestTimeout):
  575. t.Errorf("unexpected timeout from result channel")
  576. }
  577. }
  578. func TestWatchPods(t *testing.T) {
  579. client := fake.NewSimpleClientset()
  580. fakeWatch := watch.NewFake()
  581. client.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
  582. stopCh := make(chan struct{})
  583. defer close(stopCh)
  584. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
  585. // Put one ReplicaSet into the shared informer
  586. labelMap := map[string]string{"foo": "bar"}
  587. testRSSpec := newReplicaSet(1, labelMap)
  588. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(testRSSpec)
  589. received := make(chan string)
  590. // The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and
  591. // send it into the syncHandler.
  592. manager.syncHandler = func(key string) error {
  593. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  594. if err != nil {
  595. t.Errorf("Error splitting key: %v", err)
  596. }
  597. rsSpec, err := manager.rsLister.ReplicaSets(namespace).Get(name)
  598. if err != nil {
  599. t.Errorf("Expected to find replica set under key %v: %v", key, err)
  600. }
  601. if !apiequality.Semantic.DeepDerivative(rsSpec, testRSSpec) {
  602. t.Errorf("\nExpected %#v,\nbut got %#v", testRSSpec, rsSpec)
  603. }
  604. close(received)
  605. return nil
  606. }
  607. // Start only the pod watcher and the workqueue, send a watch event,
  608. // and make sure it hits the sync method for the right ReplicaSet.
  609. go informers.Core().V1().Pods().Informer().Run(stopCh)
  610. go manager.Run(1, stopCh)
  611. pods := newPodList(nil, 1, v1.PodRunning, labelMap, testRSSpec, "pod")
  612. testPod := pods.Items[0]
  613. testPod.Status.Phase = v1.PodFailed
  614. fakeWatch.Add(&testPod)
  615. select {
  616. case <-received:
  617. case <-time.After(wait.ForeverTestTimeout):
  618. t.Errorf("unexpected timeout from result channel")
  619. }
  620. }
  621. func TestUpdatePods(t *testing.T) {
  622. stopCh := make(chan struct{})
  623. defer close(stopCh)
  624. manager, informers := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas)
  625. received := make(chan string)
  626. manager.syncHandler = func(key string) error {
  627. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  628. if err != nil {
  629. t.Errorf("Error splitting key: %v", err)
  630. }
  631. rsSpec, err := manager.rsLister.ReplicaSets(namespace).Get(name)
  632. if err != nil {
  633. t.Errorf("Expected to find replica set under key %v: %v", key, err)
  634. }
  635. received <- rsSpec.Name
  636. return nil
  637. }
  638. go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
  639. // Put 2 ReplicaSets and one pod into the informers
  640. labelMap1 := map[string]string{"foo": "bar"}
  641. testRSSpec1 := newReplicaSet(1, labelMap1)
  642. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(testRSSpec1)
  643. testRSSpec2 := *testRSSpec1
  644. labelMap2 := map[string]string{"bar": "foo"}
  645. testRSSpec2.Spec.Selector = &metav1.LabelSelector{MatchLabels: labelMap2}
  646. testRSSpec2.Name = "barfoo"
  647. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(&testRSSpec2)
  648. isController := true
  649. controllerRef1 := metav1.OwnerReference{UID: testRSSpec1.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec1.Name, Controller: &isController}
  650. controllerRef2 := metav1.OwnerReference{UID: testRSSpec2.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec2.Name, Controller: &isController}
  651. // case 1: Pod with a ControllerRef
  652. pod1 := newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
  653. pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1}
  654. pod1.ResourceVersion = "1"
  655. pod2 := pod1
  656. pod2.Labels = labelMap2
  657. pod2.ResourceVersion = "2"
  658. manager.updatePod(&pod1, &pod2)
  659. expected := sets.NewString(testRSSpec1.Name)
  660. for _, name := range expected.List() {
  661. t.Logf("Expecting update for %+v", name)
  662. select {
  663. case got := <-received:
  664. if !expected.Has(got) {
  665. t.Errorf("Expected keys %#v got %v", expected, got)
  666. }
  667. case <-time.After(wait.ForeverTestTimeout):
  668. t.Errorf("Expected update notifications for replica sets")
  669. }
  670. }
  671. // case 2: Remove ControllerRef (orphan). Expect to sync label-matching RS.
  672. pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
  673. pod1.ResourceVersion = "1"
  674. pod1.Labels = labelMap2
  675. pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2}
  676. pod2 = pod1
  677. pod2.OwnerReferences = nil
  678. pod2.ResourceVersion = "2"
  679. manager.updatePod(&pod1, &pod2)
  680. expected = sets.NewString(testRSSpec2.Name)
  681. for _, name := range expected.List() {
  682. t.Logf("Expecting update for %+v", name)
  683. select {
  684. case got := <-received:
  685. if !expected.Has(got) {
  686. t.Errorf("Expected keys %#v got %v", expected, got)
  687. }
  688. case <-time.After(wait.ForeverTestTimeout):
  689. t.Errorf("Expected update notifications for replica sets")
  690. }
  691. }
  692. // case 2: Remove ControllerRef (orphan). Expect to sync both former owner and
  693. // any label-matching RS.
  694. pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
  695. pod1.ResourceVersion = "1"
  696. pod1.Labels = labelMap2
  697. pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1}
  698. pod2 = pod1
  699. pod2.OwnerReferences = nil
  700. pod2.ResourceVersion = "2"
  701. manager.updatePod(&pod1, &pod2)
  702. expected = sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
  703. for _, name := range expected.List() {
  704. t.Logf("Expecting update for %+v", name)
  705. select {
  706. case got := <-received:
  707. if !expected.Has(got) {
  708. t.Errorf("Expected keys %#v got %v", expected, got)
  709. }
  710. case <-time.After(wait.ForeverTestTimeout):
  711. t.Errorf("Expected update notifications for replica sets")
  712. }
  713. }
  714. // case 4: Keep ControllerRef, change labels. Expect to sync owning RS.
  715. pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
  716. pod1.ResourceVersion = "1"
  717. pod1.Labels = labelMap1
  718. pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2}
  719. pod2 = pod1
  720. pod2.Labels = labelMap2
  721. pod2.ResourceVersion = "2"
  722. manager.updatePod(&pod1, &pod2)
  723. expected = sets.NewString(testRSSpec2.Name)
  724. for _, name := range expected.List() {
  725. t.Logf("Expecting update for %+v", name)
  726. select {
  727. case got := <-received:
  728. if !expected.Has(got) {
  729. t.Errorf("Expected keys %#v got %v", expected, got)
  730. }
  731. case <-time.After(wait.ForeverTestTimeout):
  732. t.Errorf("Expected update notifications for replica sets")
  733. }
  734. }
  735. }
  736. func TestControllerUpdateRequeue(t *testing.T) {
  737. // This server should force a requeue of the controller because it fails to update status.Replicas.
  738. labelMap := map[string]string{"foo": "bar"}
  739. rs := newReplicaSet(1, labelMap)
  740. client := fake.NewSimpleClientset(rs)
  741. client.PrependReactor("update", "replicasets",
  742. func(action core.Action) (bool, runtime.Object, error) {
  743. if action.GetSubresource() != "status" {
  744. return false, nil, nil
  745. }
  746. return true, nil, errors.New("failed to update status")
  747. })
  748. stopCh := make(chan struct{})
  749. defer close(stopCh)
  750. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
  751. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
  752. rs.Status = apps.ReplicaSetStatus{Replicas: 2}
  753. newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap, rs, "pod")
  754. fakePodControl := controller.FakePodControl{}
  755. manager.podControl = &fakePodControl
  756. // Enqueue once. Then process it. Disable rate-limiting for this.
  757. manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter())
  758. manager.enqueueRS(rs)
  759. manager.processNextWorkItem()
  760. // It should have been requeued.
  761. if got, want := manager.queue.Len(), 1; got != want {
  762. t.Errorf("queue.Len() = %v, want %v", got, want)
  763. }
  764. }
  765. func TestControllerUpdateStatusWithFailure(t *testing.T) {
  766. rs := newReplicaSet(1, map[string]string{"foo": "bar"})
  767. fakeClient := &fake.Clientset{}
  768. fakeClient.AddReactor("get", "replicasets", func(action core.Action) (bool, runtime.Object, error) { return true, rs, nil })
  769. fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  770. return true, &apps.ReplicaSet{}, fmt.Errorf("fake error")
  771. })
  772. fakeRSClient := fakeClient.AppsV1().ReplicaSets("default")
  773. numReplicas := int32(10)
  774. newStatus := apps.ReplicaSetStatus{Replicas: numReplicas}
  775. updateReplicaSetStatus(fakeRSClient, rs, newStatus)
  776. updates, gets := 0, 0
  777. for _, a := range fakeClient.Actions() {
  778. if a.GetResource().Resource != "replicasets" {
  779. t.Errorf("Unexpected action %+v", a)
  780. continue
  781. }
  782. switch action := a.(type) {
  783. case core.GetAction:
  784. gets++
  785. // Make sure the get is for the right ReplicaSet even though the update failed.
  786. if action.GetName() != rs.Name {
  787. t.Errorf("Expected get for ReplicaSet %v, got %+v instead", rs.Name, action.GetName())
  788. }
  789. case core.UpdateAction:
  790. updates++
  791. // Confirm that the update has the right status.Replicas even though the Get
  792. // returned a ReplicaSet with replicas=1.
  793. if c, ok := action.GetObject().(*apps.ReplicaSet); !ok {
  794. t.Errorf("Expected a ReplicaSet as the argument to update, got %T", c)
  795. } else if c.Status.Replicas != numReplicas {
  796. t.Errorf("Expected update for ReplicaSet to contain replicas %v, got %v instead",
  797. numReplicas, c.Status.Replicas)
  798. }
  799. default:
  800. t.Errorf("Unexpected action %+v", a)
  801. break
  802. }
  803. }
  804. if gets != 1 || updates != 2 {
  805. t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates)
  806. }
  807. }
  808. // TODO: This test is too hairy for a unittest. It should be moved to an E2E suite.
  809. func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
  810. labelMap := map[string]string{"foo": "bar"}
  811. rsSpec := newReplicaSet(numReplicas, labelMap)
  812. client := fake.NewSimpleClientset(rsSpec)
  813. fakePodControl := controller.FakePodControl{}
  814. stopCh := make(chan struct{})
  815. defer close(stopCh)
  816. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas)
  817. manager.podControl = &fakePodControl
  818. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
  819. expectedPods := int32(0)
  820. pods := newPodList(nil, numReplicas, v1.PodPending, labelMap, rsSpec, "pod")
  821. rsKey, err := controller.KeyFunc(rsSpec)
  822. if err != nil {
  823. t.Errorf("Couldn't get key for object %#v: %v", rsSpec, err)
  824. }
  825. // Size up the controller, then size it down, and confirm the expected create/delete pattern
  826. for _, replicas := range []int32{int32(numReplicas), 0} {
  827. *(rsSpec.Spec.Replicas) = replicas
  828. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
  829. for i := 0; i < numReplicas; i += burstReplicas {
  830. manager.syncReplicaSet(GetKey(rsSpec, t))
  831. // The store accrues active pods. It's also used by the ReplicaSet to determine how many
  832. // replicas to create.
  833. activePods := int32(len(informers.Core().V1().Pods().Informer().GetIndexer().List()))
  834. if replicas != 0 {
  835. // This is the number of pods currently "in flight". They were created by the
  836. // ReplicaSet controller above, which then puts the ReplicaSet to sleep till
  837. // all of them have been observed.
  838. expectedPods = replicas - activePods
  839. if expectedPods > int32(burstReplicas) {
  840. expectedPods = int32(burstReplicas)
  841. }
  842. // This validates the ReplicaSet manager sync actually created pods
  843. err := validateSyncReplicaSet(&fakePodControl, int(expectedPods), 0, 0)
  844. if err != nil {
  845. t.Fatal(err)
  846. }
  847. // This simulates the watch events for all but 1 of the expected pods.
  848. // None of these should wake the controller because it has expectations==BurstReplicas.
  849. for i := int32(0); i < expectedPods-1; i++ {
  850. informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[i])
  851. manager.addPod(&pods.Items[i])
  852. }
  853. podExp, exists, err := manager.expectations.GetExpectations(rsKey)
  854. if !exists || err != nil {
  855. t.Fatalf("Did not find expectations for rs.")
  856. }
  857. if add, _ := podExp.GetExpectations(); add != 1 {
  858. t.Fatalf("Expectations are wrong %v", podExp)
  859. }
  860. } else {
  861. expectedPods = (replicas - activePods) * -1
  862. if expectedPods > int32(burstReplicas) {
  863. expectedPods = int32(burstReplicas)
  864. }
  865. err := validateSyncReplicaSet(&fakePodControl, 0, int(expectedPods), 0)
  866. if err != nil {
  867. t.Fatal(err)
  868. }
  869. // To accurately simulate a watch we must delete the exact pods
  870. // the rs is waiting for.
  871. expectedDels := manager.expectations.GetUIDs(GetKey(rsSpec, t))
  872. podsToDelete := []*v1.Pod{}
  873. isController := true
  874. for _, key := range expectedDels.List() {
  875. nsName := strings.Split(key, "/")
  876. podsToDelete = append(podsToDelete, &v1.Pod{
  877. ObjectMeta: metav1.ObjectMeta{
  878. Name: nsName[1],
  879. Namespace: nsName[0],
  880. Labels: rsSpec.Spec.Selector.MatchLabels,
  881. OwnerReferences: []metav1.OwnerReference{
  882. {UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController},
  883. },
  884. },
  885. })
  886. }
  887. // Don't delete all pods because we confirm that the last pod
  888. // has exactly one expectation at the end, to verify that we
  889. // don't double delete.
  890. for i := range podsToDelete[1:] {
  891. informers.Core().V1().Pods().Informer().GetIndexer().Delete(podsToDelete[i])
  892. manager.deletePod(podsToDelete[i])
  893. }
  894. podExp, exists, err := manager.expectations.GetExpectations(rsKey)
  895. if !exists || err != nil {
  896. t.Fatalf("Did not find expectations for ReplicaSet.")
  897. }
  898. if _, del := podExp.GetExpectations(); del != 1 {
  899. t.Fatalf("Expectations are wrong %v", podExp)
  900. }
  901. }
  902. // Check that the ReplicaSet didn't take any action for all the above pods
  903. fakePodControl.Clear()
  904. manager.syncReplicaSet(GetKey(rsSpec, t))
  905. err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
  906. if err != nil {
  907. t.Fatal(err)
  908. }
  909. // Create/Delete the last pod
  910. // The last add pod will decrease the expectation of the ReplicaSet to 0,
  911. // which will cause it to create/delete the remaining replicas up to burstReplicas.
  912. if replicas != 0 {
  913. informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[expectedPods-1])
  914. manager.addPod(&pods.Items[expectedPods-1])
  915. } else {
  916. expectedDel := manager.expectations.GetUIDs(GetKey(rsSpec, t))
  917. if expectedDel.Len() != 1 {
  918. t.Fatalf("Waiting on unexpected number of deletes.")
  919. }
  920. nsName := strings.Split(expectedDel.List()[0], "/")
  921. isController := true
  922. lastPod := &v1.Pod{
  923. ObjectMeta: metav1.ObjectMeta{
  924. Name: nsName[1],
  925. Namespace: nsName[0],
  926. Labels: rsSpec.Spec.Selector.MatchLabels,
  927. OwnerReferences: []metav1.OwnerReference{
  928. {UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController},
  929. },
  930. },
  931. }
  932. informers.Core().V1().Pods().Informer().GetIndexer().Delete(lastPod)
  933. manager.deletePod(lastPod)
  934. }
  935. pods.Items = pods.Items[expectedPods:]
  936. }
  937. // Confirm that we've created the right number of replicas
  938. activePods := int32(len(informers.Core().V1().Pods().Informer().GetIndexer().List()))
  939. if activePods != *(rsSpec.Spec.Replicas) {
  940. t.Fatalf("Unexpected number of active pods, expected %d, got %d", *(rsSpec.Spec.Replicas), activePods)
  941. }
  942. // Replenish the pod list, since we cut it down sizing up
  943. pods = newPodList(nil, int(replicas), v1.PodRunning, labelMap, rsSpec, "pod")
  944. }
  945. }
  946. func TestControllerBurstReplicas(t *testing.T) {
  947. doTestControllerBurstReplicas(t, 5, 30)
  948. doTestControllerBurstReplicas(t, 5, 12)
  949. doTestControllerBurstReplicas(t, 3, 2)
  950. }
  951. type FakeRSExpectations struct {
  952. *controller.ControllerExpectations
  953. satisfied bool
  954. expSatisfied func()
  955. }
  956. func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool {
  957. fe.expSatisfied()
  958. return fe.satisfied
  959. }
  960. // TestRSSyncExpectations tests that a pod cannot sneak in between counting active pods
  961. // and checking expectations.
  962. func TestRSSyncExpectations(t *testing.T) {
  963. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  964. fakePodControl := controller.FakePodControl{}
  965. stopCh := make(chan struct{})
  966. defer close(stopCh)
  967. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 2)
  968. manager.podControl = &fakePodControl
  969. labelMap := map[string]string{"foo": "bar"}
  970. rsSpec := newReplicaSet(2, labelMap)
  971. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
  972. pods := newPodList(nil, 2, v1.PodPending, labelMap, rsSpec, "pod")
  973. informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[0])
  974. postExpectationsPod := pods.Items[1]
  975. manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{
  976. controller.NewControllerExpectations(), true, func() {
  977. // If we check active pods before checking expectataions, the
  978. // ReplicaSet will create a new replica because it doesn't see
  979. // this pod, but has fulfilled its expectations.
  980. informers.Core().V1().Pods().Informer().GetIndexer().Add(&postExpectationsPod)
  981. },
  982. })
  983. manager.syncReplicaSet(GetKey(rsSpec, t))
  984. err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
  985. if err != nil {
  986. t.Fatal(err)
  987. }
  988. }
  989. func TestDeleteControllerAndExpectations(t *testing.T) {
  990. rs := newReplicaSet(1, map[string]string{"foo": "bar"})
  991. client := fake.NewSimpleClientset(rs)
  992. stopCh := make(chan struct{})
  993. defer close(stopCh)
  994. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10)
  995. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
  996. fakePodControl := controller.FakePodControl{}
  997. manager.podControl = &fakePodControl
  998. // This should set expectations for the ReplicaSet
  999. manager.syncReplicaSet(GetKey(rs, t))
  1000. err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
  1001. if err != nil {
  1002. t.Fatal(err)
  1003. }
  1004. fakePodControl.Clear()
  1005. // Get the ReplicaSet key
  1006. rsKey, err := controller.KeyFunc(rs)
  1007. if err != nil {
  1008. t.Errorf("Couldn't get key for object %#v: %v", rs, err)
  1009. }
  1010. // This is to simulate a concurrent addPod, that has a handle on the expectations
  1011. // as the controller deletes it.
  1012. podExp, exists, err := manager.expectations.GetExpectations(rsKey)
  1013. if !exists || err != nil {
  1014. t.Errorf("No expectations found for ReplicaSet")
  1015. }
  1016. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Delete(rs)
  1017. manager.deleteRS(rs)
  1018. manager.syncReplicaSet(GetKey(rs, t))
  1019. if _, exists, err = manager.expectations.GetExpectations(rsKey); exists {
  1020. t.Errorf("Found expectations, expected none since the ReplicaSet has been deleted.")
  1021. }
  1022. // This should have no effect, since we've deleted the ReplicaSet.
  1023. podExp.Add(-1, 0)
  1024. informers.Core().V1().Pods().Informer().GetIndexer().Replace(make([]interface{}, 0), "0")
  1025. manager.syncReplicaSet(GetKey(rs, t))
  1026. err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
  1027. if err != nil {
  1028. t.Fatal(err)
  1029. }
  1030. }
  1031. func TestExpectationsOnRecreate(t *testing.T) {
  1032. client := fake.NewSimpleClientset()
  1033. stopCh := make(chan struct{})
  1034. defer close(stopCh)
  1035. f := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
  1036. manager := NewReplicaSetController(
  1037. f.Apps().V1().ReplicaSets(),
  1038. f.Core().V1().Pods(),
  1039. client,
  1040. 100,
  1041. )
  1042. f.Start(stopCh)
  1043. fakePodControl := controller.FakePodControl{}
  1044. manager.podControl = &fakePodControl
  1045. if manager.queue.Len() != 0 {
  1046. t.Fatal("Unexpected item in the queue")
  1047. }
  1048. oldRS := newReplicaSet(1, map[string]string{"foo": "bar"})
  1049. oldRS, err := client.AppsV1().ReplicaSets(oldRS.Namespace).Create(context.TODO(), oldRS, metav1.CreateOptions{})
  1050. if err != nil {
  1051. t.Fatal(err)
  1052. }
  1053. err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
  1054. klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len())
  1055. return manager.queue.Len() == 1, nil
  1056. })
  1057. if err != nil {
  1058. t.Fatalf("initial RS didn't result in new item in the queue: %v", err)
  1059. }
  1060. ok := manager.processNextWorkItem()
  1061. if !ok {
  1062. t.Fatal("queue is shutting down")
  1063. }
  1064. err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
  1065. if err != nil {
  1066. t.Fatal(err)
  1067. }
  1068. fakePodControl.Clear()
  1069. oldRSKey, err := controller.KeyFunc(oldRS)
  1070. if err != nil {
  1071. t.Fatal(err)
  1072. }
  1073. rsExp, exists, err := manager.expectations.GetExpectations(oldRSKey)
  1074. if err != nil {
  1075. t.Fatal(err)
  1076. }
  1077. if !exists {
  1078. t.Errorf("No expectations found for ReplicaSet %q", oldRSKey)
  1079. }
  1080. if rsExp.Fulfilled() {
  1081. t.Errorf("There should be unfulfiled expectation for creating new pods for ReplicaSet %q", oldRSKey)
  1082. }
  1083. if manager.queue.Len() != 0 {
  1084. t.Fatal("Unexpected item in the queue")
  1085. }
  1086. err = client.AppsV1().ReplicaSets(oldRS.Namespace).Delete(context.TODO(), oldRS.Name, &metav1.DeleteOptions{})
  1087. if err != nil {
  1088. t.Fatal(err)
  1089. }
  1090. err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
  1091. klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len())
  1092. return manager.queue.Len() == 1, nil
  1093. })
  1094. if err != nil {
  1095. t.Fatalf("Deleting RS didn't result in new item in the queue: %v", err)
  1096. }
  1097. rsExp, exists, err = manager.expectations.GetExpectations(oldRSKey)
  1098. if err != nil {
  1099. t.Fatal(err)
  1100. }
  1101. if exists {
  1102. t.Errorf("There should be no expectations for ReplicaSet %q after it was deleted", oldRSKey)
  1103. }
  1104. // skip sync for the delete event so we only see the new RS in sync
  1105. key, quit := manager.queue.Get()
  1106. if quit {
  1107. t.Fatal("Queue is shutting down!")
  1108. }
  1109. manager.queue.Done(key)
  1110. if key != oldRSKey {
  1111. t.Fatal("Keys should be equal!")
  1112. }
  1113. if manager.queue.Len() != 0 {
  1114. t.Fatal("Unexpected item in the queue")
  1115. }
  1116. newRS := oldRS.DeepCopy()
  1117. newRS.UID = uuid.NewUUID()
  1118. newRS, err = client.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), newRS, metav1.CreateOptions{})
  1119. if err != nil {
  1120. t.Fatal(err)
  1121. }
  1122. // Sanity check
  1123. if newRS.UID == oldRS.UID {
  1124. t.Fatal("New RS has the same UID as the old one!")
  1125. }
  1126. err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
  1127. klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len())
  1128. return manager.queue.Len() == 1, nil
  1129. })
  1130. if err != nil {
  1131. t.Fatalf("Re-creating RS didn't result in new item in the queue: %v", err)
  1132. }
  1133. ok = manager.processNextWorkItem()
  1134. if !ok {
  1135. t.Fatal("Queue is shutting down!")
  1136. }
  1137. newRSKey, err := controller.KeyFunc(newRS)
  1138. if err != nil {
  1139. t.Fatal(err)
  1140. }
  1141. rsExp, exists, err = manager.expectations.GetExpectations(newRSKey)
  1142. if err != nil {
  1143. t.Fatal(err)
  1144. }
  1145. if !exists {
  1146. t.Errorf("No expectations found for ReplicaSet %q", oldRSKey)
  1147. }
  1148. if rsExp.Fulfilled() {
  1149. t.Errorf("There should be unfulfiled expectation for creating new pods for ReplicaSet %q", oldRSKey)
  1150. }
  1151. err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
  1152. if err != nil {
  1153. t.Fatal(err)
  1154. }
  1155. fakePodControl.Clear()
  1156. }
  1157. // shuffle returns a new shuffled list of container controllers.
  1158. func shuffle(controllers []*apps.ReplicaSet) []*apps.ReplicaSet {
  1159. numControllers := len(controllers)
  1160. randIndexes := rand.Perm(numControllers)
  1161. shuffled := make([]*apps.ReplicaSet, numControllers)
  1162. for i := 0; i < numControllers; i++ {
  1163. shuffled[i] = controllers[randIndexes[i]]
  1164. }
  1165. return shuffled
  1166. }
  1167. func TestOverlappingRSs(t *testing.T) {
  1168. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  1169. labelMap := map[string]string{"foo": "bar"}
  1170. stopCh := make(chan struct{})
  1171. defer close(stopCh)
  1172. manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10)
  1173. // Create 10 ReplicaSets, shuffled them randomly and insert them into the
  1174. // ReplicaSet controller's store.
  1175. // All use the same CreationTimestamp since ControllerRef should be able
  1176. // to handle that.
  1177. timestamp := metav1.Date(2014, time.December, 0, 0, 0, 0, 0, time.Local)
  1178. var controllers []*apps.ReplicaSet
  1179. for j := 1; j < 10; j++ {
  1180. rsSpec := newReplicaSet(1, labelMap)
  1181. rsSpec.CreationTimestamp = timestamp
  1182. rsSpec.Name = fmt.Sprintf("rs%d", j)
  1183. controllers = append(controllers, rsSpec)
  1184. }
  1185. shuffledControllers := shuffle(controllers)
  1186. for j := range shuffledControllers {
  1187. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(shuffledControllers[j])
  1188. }
  1189. // Add a pod with a ControllerRef and make sure only the corresponding
  1190. // ReplicaSet is synced. Pick a RS in the middle since the old code used to
  1191. // sort by name if all timestamps were equal.
  1192. rs := controllers[3]
  1193. pods := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod")
  1194. pod := &pods.Items[0]
  1195. isController := true
  1196. pod.OwnerReferences = []metav1.OwnerReference{
  1197. {UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController},
  1198. }
  1199. rsKey := GetKey(rs, t)
  1200. manager.addPod(pod)
  1201. queueRS, _ := manager.queue.Get()
  1202. if queueRS != rsKey {
  1203. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  1204. }
  1205. }
  1206. func TestDeletionTimestamp(t *testing.T) {
  1207. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
  1208. labelMap := map[string]string{"foo": "bar"}
  1209. stopCh := make(chan struct{})
  1210. defer close(stopCh)
  1211. manager, informers := testNewReplicaSetControllerFromClient(c, stopCh, 10)
  1212. rs := newReplicaSet(1, labelMap)
  1213. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
  1214. rsKey, err := controller.KeyFunc(rs)
  1215. if err != nil {
  1216. t.Errorf("Couldn't get key for object %#v: %v", rs, err)
  1217. }
  1218. pod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0]
  1219. pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
  1220. pod.ResourceVersion = "1"
  1221. manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
  1222. // A pod added with a deletion timestamp should decrement deletions, not creations.
  1223. manager.addPod(&pod)
  1224. queueRS, _ := manager.queue.Get()
  1225. if queueRS != rsKey {
  1226. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  1227. }
  1228. manager.queue.Done(rsKey)
  1229. podExp, exists, err := manager.expectations.GetExpectations(rsKey)
  1230. if !exists || err != nil || !podExp.Fulfilled() {
  1231. t.Fatalf("Wrong expectations %#v", podExp)
  1232. }
  1233. // An update from no deletion timestamp to having one should be treated
  1234. // as a deletion.
  1235. oldPod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0]
  1236. oldPod.ResourceVersion = "2"
  1237. manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
  1238. manager.updatePod(&oldPod, &pod)
  1239. queueRS, _ = manager.queue.Get()
  1240. if queueRS != rsKey {
  1241. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  1242. }
  1243. manager.queue.Done(rsKey)
  1244. podExp, exists, err = manager.expectations.GetExpectations(rsKey)
  1245. if !exists || err != nil || !podExp.Fulfilled() {
  1246. t.Fatalf("Wrong expectations %#v", podExp)
  1247. }
  1248. // An update to the pod (including an update to the deletion timestamp)
  1249. // should not be counted as a second delete.
  1250. isController := true
  1251. secondPod := &v1.Pod{
  1252. ObjectMeta: metav1.ObjectMeta{
  1253. Namespace: pod.Namespace,
  1254. Name: "secondPod",
  1255. Labels: pod.Labels,
  1256. OwnerReferences: []metav1.OwnerReference{
  1257. {UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController},
  1258. },
  1259. },
  1260. }
  1261. manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)})
  1262. oldPod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
  1263. oldPod.ResourceVersion = "2"
  1264. manager.updatePod(&oldPod, &pod)
  1265. podExp, exists, err = manager.expectations.GetExpectations(rsKey)
  1266. if !exists || err != nil || podExp.Fulfilled() {
  1267. t.Fatalf("Wrong expectations %#v", podExp)
  1268. }
  1269. // A pod with a non-nil deletion timestamp should also be ignored by the
  1270. // delete handler, because it's already been counted in the update.
  1271. manager.deletePod(&pod)
  1272. podExp, exists, err = manager.expectations.GetExpectations(rsKey)
  1273. if !exists || err != nil || podExp.Fulfilled() {
  1274. t.Fatalf("Wrong expectations %#v", podExp)
  1275. }
  1276. // Deleting the second pod should clear expectations.
  1277. manager.deletePod(secondPod)
  1278. queueRS, _ = manager.queue.Get()
  1279. if queueRS != rsKey {
  1280. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  1281. }
  1282. manager.queue.Done(rsKey)
  1283. podExp, exists, err = manager.expectations.GetExpectations(rsKey)
  1284. if !exists || err != nil || !podExp.Fulfilled() {
  1285. t.Fatalf("Wrong expectations %#v", podExp)
  1286. }
  1287. }
  1288. // setupManagerWithGCEnabled creates a RS manager with a fakePodControl
  1289. func setupManagerWithGCEnabled(stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl, informers informers.SharedInformerFactory) {
  1290. c := fake.NewSimpleClientset(objs...)
  1291. fakePodControl = &controller.FakePodControl{}
  1292. manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas)
  1293. manager.podControl = fakePodControl
  1294. return manager, fakePodControl, informers
  1295. }
  1296. func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
  1297. labelMap := map[string]string{"foo": "bar"}
  1298. rs := newReplicaSet(2, labelMap)
  1299. stopCh := make(chan struct{})
  1300. defer close(stopCh)
  1301. manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs)
  1302. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
  1303. var trueVar = true
  1304. otherControllerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
  1305. // add to podLister a matching Pod controlled by another controller. Expect no patch.
  1306. pod := newPod("pod", rs, v1.PodRunning, nil, true)
  1307. pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference}
  1308. informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
  1309. err := manager.syncReplicaSet(GetKey(rs, t))
  1310. if err != nil {
  1311. t.Fatal(err)
  1312. }
  1313. // because the matching pod already has a controller, so 2 pods should be created.
  1314. err = validateSyncReplicaSet(fakePodControl, 2, 0, 0)
  1315. if err != nil {
  1316. t.Fatal(err)
  1317. }
  1318. }
  1319. func TestPatchPodFails(t *testing.T) {
  1320. labelMap := map[string]string{"foo": "bar"}
  1321. rs := newReplicaSet(2, labelMap)
  1322. stopCh := make(chan struct{})
  1323. defer close(stopCh)
  1324. manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs)
  1325. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
  1326. // add to podLister two matching pods. Expect two patches to take control
  1327. // them.
  1328. informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod1", rs, v1.PodRunning, nil, false))
  1329. informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod2", rs, v1.PodRunning, nil, false))
  1330. // let both patches fail. The rs controller will assume it fails to take
  1331. // control of the pods and requeue to try again.
  1332. fakePodControl.Err = fmt.Errorf("fake Error")
  1333. rsKey := GetKey(rs, t)
  1334. err := processSync(manager, rsKey)
  1335. if err == nil || !strings.Contains(err.Error(), "fake Error") {
  1336. t.Errorf("expected fake Error, got %+v", err)
  1337. }
  1338. // 2 patches to take control of pod1 and pod2 (both fail).
  1339. err = validateSyncReplicaSet(fakePodControl, 0, 0, 2)
  1340. if err != nil {
  1341. t.Fatal(err)
  1342. }
  1343. // RS should requeue itself.
  1344. queueRS, _ := manager.queue.Get()
  1345. if queueRS != rsKey {
  1346. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  1347. }
  1348. }
  1349. // RS controller shouldn't adopt or create more pods if the rc is about to be
  1350. // deleted.
  1351. func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
  1352. labelMap := map[string]string{"foo": "bar"}
  1353. rs := newReplicaSet(2, labelMap)
  1354. now := metav1.Now()
  1355. rs.DeletionTimestamp = &now
  1356. stopCh := make(chan struct{})
  1357. defer close(stopCh)
  1358. manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs)
  1359. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
  1360. pod1 := newPod("pod1", rs, v1.PodRunning, nil, false)
  1361. informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
  1362. // no patch, no create
  1363. err := manager.syncReplicaSet(GetKey(rs, t))
  1364. if err != nil {
  1365. t.Fatal(err)
  1366. }
  1367. err = validateSyncReplicaSet(fakePodControl, 0, 0, 0)
  1368. if err != nil {
  1369. t.Fatal(err)
  1370. }
  1371. }
  1372. func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) {
  1373. labelMap := map[string]string{"foo": "bar"}
  1374. // Bare client says it IS deleted.
  1375. rs := newReplicaSet(2, labelMap)
  1376. now := metav1.Now()
  1377. rs.DeletionTimestamp = &now
  1378. stopCh := make(chan struct{})
  1379. defer close(stopCh)
  1380. manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs)
  1381. // Lister (cache) says it's NOT deleted.
  1382. rs2 := *rs
  1383. rs2.DeletionTimestamp = nil
  1384. informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(&rs2)
  1385. // Recheck occurs if a matching orphan is present.
  1386. pod1 := newPod("pod1", rs, v1.PodRunning, nil, false)
  1387. informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
  1388. // sync should abort.
  1389. err := manager.syncReplicaSet(GetKey(rs, t))
  1390. if err == nil {
  1391. t.Error("syncReplicaSet() err = nil, expected non-nil")
  1392. }
  1393. // no patch, no create.
  1394. err = validateSyncReplicaSet(fakePodControl, 0, 0, 0)
  1395. if err != nil {
  1396. t.Fatal(err)
  1397. }
  1398. }
  1399. var (
  1400. imagePullBackOff apps.ReplicaSetConditionType = "ImagePullBackOff"
  1401. condImagePullBackOff = func() apps.ReplicaSetCondition {
  1402. return apps.ReplicaSetCondition{
  1403. Type: imagePullBackOff,
  1404. Status: v1.ConditionTrue,
  1405. Reason: "NonExistentImage",
  1406. }
  1407. }
  1408. condReplicaFailure = func() apps.ReplicaSetCondition {
  1409. return apps.ReplicaSetCondition{
  1410. Type: apps.ReplicaSetReplicaFailure,
  1411. Status: v1.ConditionTrue,
  1412. Reason: "OtherFailure",
  1413. }
  1414. }
  1415. condReplicaFailure2 = func() apps.ReplicaSetCondition {
  1416. return apps.ReplicaSetCondition{
  1417. Type: apps.ReplicaSetReplicaFailure,
  1418. Status: v1.ConditionTrue,
  1419. Reason: "AnotherFailure",
  1420. }
  1421. }
  1422. status = func() *apps.ReplicaSetStatus {
  1423. return &apps.ReplicaSetStatus{
  1424. Conditions: []apps.ReplicaSetCondition{condReplicaFailure()},
  1425. }
  1426. }
  1427. )
  1428. func TestGetCondition(t *testing.T) {
  1429. exampleStatus := status()
  1430. tests := []struct {
  1431. name string
  1432. status apps.ReplicaSetStatus
  1433. condType apps.ReplicaSetConditionType
  1434. expected bool
  1435. }{
  1436. {
  1437. name: "condition exists",
  1438. status: *exampleStatus,
  1439. condType: apps.ReplicaSetReplicaFailure,
  1440. expected: true,
  1441. },
  1442. {
  1443. name: "condition does not exist",
  1444. status: *exampleStatus,
  1445. condType: imagePullBackOff,
  1446. expected: false,
  1447. },
  1448. }
  1449. for _, test := range tests {
  1450. cond := GetCondition(test.status, test.condType)
  1451. exists := cond != nil
  1452. if exists != test.expected {
  1453. t.Errorf("%s: expected condition to exist: %t, got: %t", test.name, test.expected, exists)
  1454. }
  1455. }
  1456. }
  1457. func TestSetCondition(t *testing.T) {
  1458. tests := []struct {
  1459. name string
  1460. status *apps.ReplicaSetStatus
  1461. cond apps.ReplicaSetCondition
  1462. expectedStatus *apps.ReplicaSetStatus
  1463. }{
  1464. {
  1465. name: "set for the first time",
  1466. status: &apps.ReplicaSetStatus{},
  1467. cond: condReplicaFailure(),
  1468. expectedStatus: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condReplicaFailure()}},
  1469. },
  1470. {
  1471. name: "simple set",
  1472. status: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condImagePullBackOff()}},
  1473. cond: condReplicaFailure(),
  1474. expectedStatus: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condImagePullBackOff(), condReplicaFailure()}},
  1475. },
  1476. {
  1477. name: "overwrite",
  1478. status: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condReplicaFailure()}},
  1479. cond: condReplicaFailure2(),
  1480. expectedStatus: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condReplicaFailure2()}},
  1481. },
  1482. }
  1483. for _, test := range tests {
  1484. SetCondition(test.status, test.cond)
  1485. if !reflect.DeepEqual(test.status, test.expectedStatus) {
  1486. t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status)
  1487. }
  1488. }
  1489. }
  1490. func TestRemoveCondition(t *testing.T) {
  1491. tests := []struct {
  1492. name string
  1493. status *apps.ReplicaSetStatus
  1494. condType apps.ReplicaSetConditionType
  1495. expectedStatus *apps.ReplicaSetStatus
  1496. }{
  1497. {
  1498. name: "remove from empty status",
  1499. status: &apps.ReplicaSetStatus{},
  1500. condType: apps.ReplicaSetReplicaFailure,
  1501. expectedStatus: &apps.ReplicaSetStatus{},
  1502. },
  1503. {
  1504. name: "simple remove",
  1505. status: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condReplicaFailure()}},
  1506. condType: apps.ReplicaSetReplicaFailure,
  1507. expectedStatus: &apps.ReplicaSetStatus{},
  1508. },
  1509. {
  1510. name: "doesn't remove anything",
  1511. status: status(),
  1512. condType: imagePullBackOff,
  1513. expectedStatus: status(),
  1514. },
  1515. }
  1516. for _, test := range tests {
  1517. RemoveCondition(test.status, test.condType)
  1518. if !reflect.DeepEqual(test.status, test.expectedStatus) {
  1519. t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status)
  1520. }
  1521. }
  1522. }
  1523. func TestSlowStartBatch(t *testing.T) {
  1524. fakeErr := fmt.Errorf("fake error")
  1525. callCnt := 0
  1526. callLimit := 0
  1527. var lock sync.Mutex
  1528. fn := func() error {
  1529. lock.Lock()
  1530. defer lock.Unlock()
  1531. callCnt++
  1532. if callCnt > callLimit {
  1533. return fakeErr
  1534. }
  1535. return nil
  1536. }
  1537. tests := []struct {
  1538. name string
  1539. count int
  1540. callLimit int
  1541. fn func() error
  1542. expectedSuccesses int
  1543. expectedErr error
  1544. expectedCallCnt int
  1545. }{
  1546. {
  1547. name: "callLimit = 0 (all fail)",
  1548. count: 10,
  1549. callLimit: 0,
  1550. fn: fn,
  1551. expectedSuccesses: 0,
  1552. expectedErr: fakeErr,
  1553. expectedCallCnt: 1, // 1(first batch): function will be called at least once
  1554. },
  1555. {
  1556. name: "callLimit = count (all succeed)",
  1557. count: 10,
  1558. callLimit: 10,
  1559. fn: fn,
  1560. expectedSuccesses: 10,
  1561. expectedErr: nil,
  1562. expectedCallCnt: 10, // 1(first batch) + 2(2nd batch) + 4(3rd batch) + 3(4th batch) = 10
  1563. },
  1564. {
  1565. name: "callLimit < count (some succeed)",
  1566. count: 10,
  1567. callLimit: 5,
  1568. fn: fn,
  1569. expectedSuccesses: 5,
  1570. expectedErr: fakeErr,
  1571. expectedCallCnt: 7, // 1(first batch) + 2(2nd batch) + 4(3rd batch) = 7
  1572. },
  1573. }
  1574. for _, test := range tests {
  1575. callCnt = 0
  1576. callLimit = test.callLimit
  1577. successes, err := slowStartBatch(test.count, 1, test.fn)
  1578. if successes != test.expectedSuccesses {
  1579. t.Errorf("%s: unexpected processed batch size, expected %d, got %d", test.name, test.expectedSuccesses, successes)
  1580. }
  1581. if err != test.expectedErr {
  1582. t.Errorf("%s: unexpected processed batch size, expected %v, got %v", test.name, test.expectedErr, err)
  1583. }
  1584. // verify that slowStartBatch stops trying more calls after a batch fails
  1585. if callCnt != test.expectedCallCnt {
  1586. t.Errorf("%s: slowStartBatch() still tries calls after a batch fails, expected %d calls, got %d", test.name, test.expectedCallCnt, callCnt)
  1587. }
  1588. }
  1589. }
  1590. func TestGetPodsToDelete(t *testing.T) {
  1591. labelMap := map[string]string{"name": "foo"}
  1592. rs := newReplicaSet(1, labelMap)
  1593. // an unscheduled, pending pod
  1594. unscheduledPendingPod := newPod("unscheduled-pending-pod", rs, v1.PodPending, nil, true)
  1595. // a scheduled, pending pod
  1596. scheduledPendingPod := newPod("scheduled-pending-pod", rs, v1.PodPending, nil, true)
  1597. scheduledPendingPod.Spec.NodeName = "fake-node"
  1598. // a scheduled, running, not-ready pod
  1599. scheduledRunningNotReadyPod := newPod("scheduled-running-not-ready-pod", rs, v1.PodRunning, nil, true)
  1600. scheduledRunningNotReadyPod.Spec.NodeName = "fake-node"
  1601. scheduledRunningNotReadyPod.Status.Conditions = []v1.PodCondition{
  1602. {
  1603. Type: v1.PodReady,
  1604. Status: v1.ConditionFalse,
  1605. },
  1606. }
  1607. // a scheduled, running, ready pod on fake-node-1
  1608. scheduledRunningReadyPodOnNode1 := newPod("scheduled-running-ready-pod-on-node-1", rs, v1.PodRunning, nil, true)
  1609. scheduledRunningReadyPodOnNode1.Spec.NodeName = "fake-node-1"
  1610. scheduledRunningReadyPodOnNode1.Status.Conditions = []v1.PodCondition{
  1611. {
  1612. Type: v1.PodReady,
  1613. Status: v1.ConditionTrue,
  1614. },
  1615. }
  1616. // a scheduled, running, ready pod on fake-node-2
  1617. scheduledRunningReadyPodOnNode2 := newPod("scheduled-running-ready-pod-on-node-2", rs, v1.PodRunning, nil, true)
  1618. scheduledRunningReadyPodOnNode2.Spec.NodeName = "fake-node-2"
  1619. scheduledRunningReadyPodOnNode2.Status.Conditions = []v1.PodCondition{
  1620. {
  1621. Type: v1.PodReady,
  1622. Status: v1.ConditionTrue,
  1623. },
  1624. }
  1625. tests := []struct {
  1626. name string
  1627. pods []*v1.Pod
  1628. // related defaults to pods if nil.
  1629. related []*v1.Pod
  1630. diff int
  1631. expectedPodsToDelete []*v1.Pod
  1632. }{
  1633. // Order used when selecting pods for deletion:
  1634. // an unscheduled, pending pod
  1635. // a scheduled, pending pod
  1636. // a scheduled, running, not-ready pod
  1637. // a scheduled, running, ready pod on same node as a related pod
  1638. // a scheduled, running, ready pod not on node with related pods
  1639. // Note that a pending pod cannot be ready
  1640. {
  1641. name: "len(pods) = 0 (i.e., diff = 0 too)",
  1642. pods: []*v1.Pod{},
  1643. diff: 0,
  1644. expectedPodsToDelete: []*v1.Pod{},
  1645. },
  1646. {
  1647. name: "diff = len(pods)",
  1648. pods: []*v1.Pod{
  1649. scheduledRunningNotReadyPod,
  1650. scheduledRunningReadyPodOnNode1,
  1651. },
  1652. diff: 2,
  1653. expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod, scheduledRunningReadyPodOnNode1},
  1654. },
  1655. {
  1656. name: "diff < len(pods)",
  1657. pods: []*v1.Pod{
  1658. scheduledRunningReadyPodOnNode1,
  1659. scheduledRunningNotReadyPod,
  1660. },
  1661. diff: 1,
  1662. expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod},
  1663. },
  1664. {
  1665. name: "various pod phases and conditions, diff = len(pods)",
  1666. pods: []*v1.Pod{
  1667. scheduledRunningReadyPodOnNode1,
  1668. scheduledRunningReadyPodOnNode1,
  1669. scheduledRunningReadyPodOnNode2,
  1670. scheduledRunningNotReadyPod,
  1671. scheduledPendingPod,
  1672. unscheduledPendingPod,
  1673. },
  1674. diff: 6,
  1675. expectedPodsToDelete: []*v1.Pod{
  1676. scheduledRunningReadyPodOnNode1,
  1677. scheduledRunningReadyPodOnNode1,
  1678. scheduledRunningReadyPodOnNode2,
  1679. scheduledRunningNotReadyPod,
  1680. scheduledPendingPod,
  1681. unscheduledPendingPod,
  1682. },
  1683. },
  1684. {
  1685. name: "various pod phases and conditions, diff = len(pods), relatedPods empty",
  1686. pods: []*v1.Pod{
  1687. scheduledRunningReadyPodOnNode1,
  1688. scheduledRunningReadyPodOnNode1,
  1689. scheduledRunningReadyPodOnNode2,
  1690. scheduledRunningNotReadyPod,
  1691. scheduledPendingPod,
  1692. unscheduledPendingPod,
  1693. },
  1694. related: []*v1.Pod{},
  1695. diff: 6,
  1696. expectedPodsToDelete: []*v1.Pod{
  1697. scheduledRunningReadyPodOnNode1,
  1698. scheduledRunningReadyPodOnNode1,
  1699. scheduledRunningReadyPodOnNode2,
  1700. scheduledRunningNotReadyPod,
  1701. scheduledPendingPod,
  1702. unscheduledPendingPod,
  1703. },
  1704. },
  1705. {
  1706. name: "scheduled vs unscheduled, diff < len(pods)",
  1707. pods: []*v1.Pod{
  1708. scheduledPendingPod,
  1709. unscheduledPendingPod,
  1710. },
  1711. diff: 1,
  1712. expectedPodsToDelete: []*v1.Pod{
  1713. unscheduledPendingPod,
  1714. },
  1715. },
  1716. {
  1717. name: "ready vs not-ready, diff < len(pods)",
  1718. pods: []*v1.Pod{
  1719. scheduledRunningReadyPodOnNode1,
  1720. scheduledRunningNotReadyPod,
  1721. scheduledRunningNotReadyPod,
  1722. },
  1723. diff: 2,
  1724. expectedPodsToDelete: []*v1.Pod{
  1725. scheduledRunningNotReadyPod,
  1726. scheduledRunningNotReadyPod,
  1727. },
  1728. },
  1729. {
  1730. name: "ready and colocated with another ready pod vs not colocated, diff < len(pods)",
  1731. pods: []*v1.Pod{
  1732. scheduledRunningReadyPodOnNode1,
  1733. scheduledRunningReadyPodOnNode2,
  1734. },
  1735. related: []*v1.Pod{
  1736. scheduledRunningReadyPodOnNode1,
  1737. scheduledRunningReadyPodOnNode2,
  1738. scheduledRunningReadyPodOnNode2,
  1739. },
  1740. diff: 1,
  1741. expectedPodsToDelete: []*v1.Pod{
  1742. scheduledRunningReadyPodOnNode2,
  1743. },
  1744. },
  1745. {
  1746. name: "pending vs running, diff < len(pods)",
  1747. pods: []*v1.Pod{
  1748. scheduledPendingPod,
  1749. scheduledRunningNotReadyPod,
  1750. },
  1751. diff: 1,
  1752. expectedPodsToDelete: []*v1.Pod{
  1753. scheduledPendingPod,
  1754. },
  1755. },
  1756. {
  1757. name: "various pod phases and conditions, diff < len(pods)",
  1758. pods: []*v1.Pod{
  1759. scheduledRunningReadyPodOnNode1,
  1760. scheduledRunningReadyPodOnNode2,
  1761. scheduledRunningNotReadyPod,
  1762. scheduledPendingPod,
  1763. unscheduledPendingPod,
  1764. },
  1765. diff: 3,
  1766. expectedPodsToDelete: []*v1.Pod{
  1767. unscheduledPendingPod,
  1768. scheduledPendingPod,
  1769. scheduledRunningNotReadyPod,
  1770. },
  1771. },
  1772. }
  1773. for _, test := range tests {
  1774. related := test.related
  1775. if related == nil {
  1776. related = test.pods
  1777. }
  1778. podsToDelete := getPodsToDelete(test.pods, related, test.diff)
  1779. if len(podsToDelete) != len(test.expectedPodsToDelete) {
  1780. t.Errorf("%s: unexpected pods to delete, expected %v, got %v", test.name, test.expectedPodsToDelete, podsToDelete)
  1781. }
  1782. if !reflect.DeepEqual(podsToDelete, test.expectedPodsToDelete) {
  1783. t.Errorf("%s: unexpected pods to delete, expected %v, got %v", test.name, test.expectedPodsToDelete, podsToDelete)
  1784. }
  1785. }
  1786. }
  1787. func TestGetPodKeys(t *testing.T) {
  1788. labelMap := map[string]string{"name": "foo"}
  1789. rs := newReplicaSet(1, labelMap)
  1790. pod1 := newPod("pod1", rs, v1.PodRunning, nil, true)
  1791. pod2 := newPod("pod2", rs, v1.PodRunning, nil, true)
  1792. tests := []struct {
  1793. name string
  1794. pods []*v1.Pod
  1795. expectedPodKeys []string
  1796. }{
  1797. {
  1798. "len(pods) = 0 (i.e., pods = nil)",
  1799. []*v1.Pod{},
  1800. []string{},
  1801. },
  1802. {
  1803. "len(pods) > 0",
  1804. []*v1.Pod{
  1805. pod1,
  1806. pod2,
  1807. },
  1808. []string{"default/pod1", "default/pod2"},
  1809. },
  1810. }
  1811. for _, test := range tests {
  1812. podKeys := getPodKeys(test.pods)
  1813. if len(podKeys) != len(test.expectedPodKeys) {
  1814. t.Errorf("%s: unexpected keys for pods to delete, expected %v, got %v", test.name, test.expectedPodKeys, podKeys)
  1815. }
  1816. for i := 0; i < len(podKeys); i++ {
  1817. if podKeys[i] != test.expectedPodKeys[i] {
  1818. t.Errorf("%s: unexpected keys for pods to delete, expected %v, got %v", test.name, test.expectedPodKeys, podKeys)
  1819. }
  1820. }
  1821. }
  1822. }