runners.go 54 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package utils
  14. import (
  15. "context"
  16. "fmt"
  17. "math"
  18. "os"
  19. "strings"
  20. "sync"
  21. "time"
  22. apps "k8s.io/api/apps/v1"
  23. batch "k8s.io/api/batch/v1"
  24. v1 "k8s.io/api/core/v1"
  25. storage "k8s.io/api/storage/v1"
  26. storagev1beta1 "k8s.io/api/storage/v1beta1"
  27. apiequality "k8s.io/apimachinery/pkg/api/equality"
  28. apierrors "k8s.io/apimachinery/pkg/api/errors"
  29. "k8s.io/apimachinery/pkg/api/resource"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/fields"
  32. "k8s.io/apimachinery/pkg/labels"
  33. "k8s.io/apimachinery/pkg/runtime/schema"
  34. "k8s.io/apimachinery/pkg/types"
  35. "k8s.io/apimachinery/pkg/util/json"
  36. "k8s.io/apimachinery/pkg/util/sets"
  37. "k8s.io/apimachinery/pkg/util/strategicpatch"
  38. "k8s.io/apimachinery/pkg/util/uuid"
  39. "k8s.io/apimachinery/pkg/util/wait"
  40. clientset "k8s.io/client-go/kubernetes"
  41. scaleclient "k8s.io/client-go/scale"
  42. "k8s.io/client-go/util/workqueue"
  43. batchinternal "k8s.io/kubernetes/pkg/apis/batch"
  44. api "k8s.io/kubernetes/pkg/apis/core"
  45. extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
  46. "k8s.io/klog"
  47. )
  48. const (
  49. // String used to mark pod deletion
  50. nonExist = "NonExist"
  51. )
  52. func removePtr(replicas *int32) int32 {
  53. if replicas == nil {
  54. return 0
  55. }
  56. return *replicas
  57. }
  58. func WaitUntilPodIsScheduled(c clientset.Interface, name, namespace string, timeout time.Duration) (*v1.Pod, error) {
  59. // Wait until it's scheduled
  60. p, err := c.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{ResourceVersion: "0"})
  61. if err == nil && p.Spec.NodeName != "" {
  62. return p, nil
  63. }
  64. pollingPeriod := 200 * time.Millisecond
  65. startTime := time.Now()
  66. for startTime.Add(timeout).After(time.Now()) {
  67. time.Sleep(pollingPeriod)
  68. p, err := c.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{ResourceVersion: "0"})
  69. if err == nil && p.Spec.NodeName != "" {
  70. return p, nil
  71. }
  72. }
  73. return nil, fmt.Errorf("Timed out after %v when waiting for pod %v/%v to start.", timeout, namespace, name)
  74. }
  75. func RunPodAndGetNodeName(c clientset.Interface, pod *v1.Pod, timeout time.Duration) (string, error) {
  76. name := pod.Name
  77. namespace := pod.Namespace
  78. if err := CreatePodWithRetries(c, namespace, pod); err != nil {
  79. return "", err
  80. }
  81. p, err := WaitUntilPodIsScheduled(c, name, namespace, timeout)
  82. if err != nil {
  83. return "", err
  84. }
  85. return p.Spec.NodeName, nil
  86. }
  87. type RunObjectConfig interface {
  88. Run() error
  89. GetName() string
  90. GetNamespace() string
  91. GetKind() schema.GroupKind
  92. GetClient() clientset.Interface
  93. GetScalesGetter() scaleclient.ScalesGetter
  94. SetClient(clientset.Interface)
  95. SetScalesClient(scaleclient.ScalesGetter)
  96. GetReplicas() int
  97. GetLabelValue(string) (string, bool)
  98. GetGroupResource() schema.GroupResource
  99. GetGroupVersionResource() schema.GroupVersionResource
  100. }
  101. type RCConfig struct {
  102. Affinity *v1.Affinity
  103. Client clientset.Interface
  104. ScalesGetter scaleclient.ScalesGetter
  105. Image string
  106. Command []string
  107. Name string
  108. Namespace string
  109. PollInterval time.Duration
  110. Timeout time.Duration
  111. PodStatusFile *os.File
  112. Replicas int
  113. CpuRequest int64 // millicores
  114. CpuLimit int64 // millicores
  115. MemRequest int64 // bytes
  116. MemLimit int64 // bytes
  117. GpuLimit int64 // count
  118. ReadinessProbe *v1.Probe
  119. DNSPolicy *v1.DNSPolicy
  120. PriorityClassName string
  121. TerminationGracePeriodSeconds *int64
  122. Lifecycle *v1.Lifecycle
  123. // Env vars, set the same for every pod.
  124. Env map[string]string
  125. // Extra labels and annotations added to every pod.
  126. Labels map[string]string
  127. Annotations map[string]string
  128. // Node selector for pods in the RC.
  129. NodeSelector map[string]string
  130. // Tolerations for pods in the RC.
  131. Tolerations []v1.Toleration
  132. // Ports to declare in the container (map of name to containerPort).
  133. Ports map[string]int
  134. // Ports to declare in the container as host and container ports.
  135. HostPorts map[string]int
  136. Volumes []v1.Volume
  137. VolumeMounts []v1.VolumeMount
  138. // Pointer to a list of pods; if non-nil, will be set to a list of pods
  139. // created by this RC by RunRC.
  140. CreatedPods *[]*v1.Pod
  141. // Maximum allowable container failures. If exceeded, RunRC returns an error.
  142. // Defaults to replicas*0.1 if unspecified.
  143. MaxContainerFailures *int
  144. // Maximum allowed pod deletions count. If exceeded, RunRC returns an error.
  145. // Defaults to 0.
  146. MaxAllowedPodDeletions int
  147. // If set to false starting RC will print progress, otherwise only errors will be printed.
  148. Silent bool
  149. // If set this function will be used to print log lines instead of klog.
  150. LogFunc func(fmt string, args ...interface{})
  151. // If set those functions will be used to gather data from Nodes - in integration tests where no
  152. // kubelets are running those variables should be nil.
  153. NodeDumpFunc func(c clientset.Interface, nodeNames []string, logFunc func(fmt string, args ...interface{}))
  154. ContainerDumpFunc func(c clientset.Interface, ns string, logFunc func(ftm string, args ...interface{}))
  155. // Names of the secrets and configmaps to mount.
  156. SecretNames []string
  157. ConfigMapNames []string
  158. ServiceAccountTokenProjections int
  159. }
  160. func (rc *RCConfig) RCConfigLog(fmt string, args ...interface{}) {
  161. if rc.LogFunc != nil {
  162. rc.LogFunc(fmt, args...)
  163. }
  164. klog.Infof(fmt, args...)
  165. }
  166. type DeploymentConfig struct {
  167. RCConfig
  168. }
  169. type ReplicaSetConfig struct {
  170. RCConfig
  171. }
  172. type JobConfig struct {
  173. RCConfig
  174. }
  175. // podInfo contains pod information useful for debugging e2e tests.
  176. type podInfo struct {
  177. oldHostname string
  178. oldPhase string
  179. hostname string
  180. phase string
  181. }
  182. // PodDiff is a map of pod name to podInfos
  183. type PodDiff map[string]*podInfo
  184. // Print formats and prints the give PodDiff.
  185. func (p PodDiff) String(ignorePhases sets.String) string {
  186. ret := ""
  187. for name, info := range p {
  188. if ignorePhases.Has(info.phase) {
  189. continue
  190. }
  191. if info.phase == nonExist {
  192. ret += fmt.Sprintf("Pod %v was deleted, had phase %v and host %v\n", name, info.oldPhase, info.oldHostname)
  193. continue
  194. }
  195. phaseChange, hostChange := false, false
  196. msg := fmt.Sprintf("Pod %v ", name)
  197. if info.oldPhase != info.phase {
  198. phaseChange = true
  199. if info.oldPhase == nonExist {
  200. msg += fmt.Sprintf("in phase %v ", info.phase)
  201. } else {
  202. msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase)
  203. }
  204. }
  205. if info.oldHostname != info.hostname {
  206. hostChange = true
  207. if info.oldHostname == nonExist || info.oldHostname == "" {
  208. msg += fmt.Sprintf("assigned host %v ", info.hostname)
  209. } else {
  210. msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname)
  211. }
  212. }
  213. if phaseChange || hostChange {
  214. ret += msg + "\n"
  215. }
  216. }
  217. return ret
  218. }
  219. // DeletedPods returns a slice of pods that were present at the beginning
  220. // and then disappeared.
  221. func (p PodDiff) DeletedPods() []string {
  222. var deletedPods []string
  223. for podName, podInfo := range p {
  224. if podInfo.hostname == nonExist {
  225. deletedPods = append(deletedPods, podName)
  226. }
  227. }
  228. return deletedPods
  229. }
  230. // Diff computes a PodDiff given 2 lists of pods.
  231. func Diff(oldPods []*v1.Pod, curPods []*v1.Pod) PodDiff {
  232. podInfoMap := PodDiff{}
  233. // New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
  234. for _, pod := range curPods {
  235. podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
  236. }
  237. // Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
  238. for _, pod := range oldPods {
  239. if info, ok := podInfoMap[pod.Name]; ok {
  240. info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase)
  241. } else {
  242. podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)}
  243. }
  244. }
  245. return podInfoMap
  246. }
  247. // RunDeployment Launches (and verifies correctness) of a Deployment
  248. // and will wait for all pods it spawns to become "Running".
  249. // It's the caller's responsibility to clean up externally (i.e. use the
  250. // namespace lifecycle for handling Cleanup).
  251. func RunDeployment(config DeploymentConfig) error {
  252. err := config.create()
  253. if err != nil {
  254. return err
  255. }
  256. return config.start()
  257. }
  258. func (config *DeploymentConfig) Run() error {
  259. return RunDeployment(*config)
  260. }
  261. func (config *DeploymentConfig) GetKind() schema.GroupKind {
  262. return extensionsinternal.Kind("Deployment")
  263. }
  264. func (config *DeploymentConfig) GetGroupResource() schema.GroupResource {
  265. return extensionsinternal.Resource("deployments")
  266. }
  267. func (config *DeploymentConfig) GetGroupVersionResource() schema.GroupVersionResource {
  268. return extensionsinternal.SchemeGroupVersion.WithResource("deployments")
  269. }
  270. func (config *DeploymentConfig) create() error {
  271. deployment := &apps.Deployment{
  272. ObjectMeta: metav1.ObjectMeta{
  273. Name: config.Name,
  274. },
  275. Spec: apps.DeploymentSpec{
  276. Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  277. Selector: &metav1.LabelSelector{
  278. MatchLabels: map[string]string{
  279. "name": config.Name,
  280. },
  281. },
  282. Template: v1.PodTemplateSpec{
  283. ObjectMeta: metav1.ObjectMeta{
  284. Labels: map[string]string{"name": config.Name},
  285. Annotations: config.Annotations,
  286. },
  287. Spec: v1.PodSpec{
  288. Affinity: config.Affinity,
  289. TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
  290. Containers: []v1.Container{
  291. {
  292. Name: config.Name,
  293. Image: config.Image,
  294. Command: config.Command,
  295. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  296. Lifecycle: config.Lifecycle,
  297. },
  298. },
  299. },
  300. },
  301. },
  302. }
  303. if len(config.SecretNames) > 0 {
  304. attachSecrets(&deployment.Spec.Template, config.SecretNames)
  305. }
  306. if len(config.ConfigMapNames) > 0 {
  307. attachConfigMaps(&deployment.Spec.Template, config.ConfigMapNames)
  308. }
  309. for i := 0; i < config.ServiceAccountTokenProjections; i++ {
  310. attachServiceAccountTokenProjection(&deployment.Spec.Template, fmt.Sprintf("tok-%d", i))
  311. }
  312. config.applyTo(&deployment.Spec.Template)
  313. if err := CreateDeploymentWithRetries(config.Client, config.Namespace, deployment); err != nil {
  314. return fmt.Errorf("Error creating deployment: %v", err)
  315. }
  316. config.RCConfigLog("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, removePtr(deployment.Spec.Replicas))
  317. return nil
  318. }
  319. // RunReplicaSet launches (and verifies correctness) of a ReplicaSet
  320. // and waits until all the pods it launches to reach the "Running" state.
  321. // It's the caller's responsibility to clean up externally (i.e. use the
  322. // namespace lifecycle for handling Cleanup).
  323. func RunReplicaSet(config ReplicaSetConfig) error {
  324. err := config.create()
  325. if err != nil {
  326. return err
  327. }
  328. return config.start()
  329. }
  330. func (config *ReplicaSetConfig) Run() error {
  331. return RunReplicaSet(*config)
  332. }
  333. func (config *ReplicaSetConfig) GetKind() schema.GroupKind {
  334. return extensionsinternal.Kind("ReplicaSet")
  335. }
  336. func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource {
  337. return extensionsinternal.Resource("replicasets")
  338. }
  339. func (config *ReplicaSetConfig) GetGroupVersionResource() schema.GroupVersionResource {
  340. return extensionsinternal.SchemeGroupVersion.WithResource("replicasets")
  341. }
  342. func (config *ReplicaSetConfig) create() error {
  343. rs := &apps.ReplicaSet{
  344. ObjectMeta: metav1.ObjectMeta{
  345. Name: config.Name,
  346. },
  347. Spec: apps.ReplicaSetSpec{
  348. Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  349. Selector: &metav1.LabelSelector{
  350. MatchLabels: map[string]string{
  351. "name": config.Name,
  352. },
  353. },
  354. Template: v1.PodTemplateSpec{
  355. ObjectMeta: metav1.ObjectMeta{
  356. Labels: map[string]string{"name": config.Name},
  357. Annotations: config.Annotations,
  358. },
  359. Spec: v1.PodSpec{
  360. Affinity: config.Affinity,
  361. TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
  362. Containers: []v1.Container{
  363. {
  364. Name: config.Name,
  365. Image: config.Image,
  366. Command: config.Command,
  367. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  368. Lifecycle: config.Lifecycle,
  369. },
  370. },
  371. },
  372. },
  373. },
  374. }
  375. if len(config.SecretNames) > 0 {
  376. attachSecrets(&rs.Spec.Template, config.SecretNames)
  377. }
  378. if len(config.ConfigMapNames) > 0 {
  379. attachConfigMaps(&rs.Spec.Template, config.ConfigMapNames)
  380. }
  381. config.applyTo(&rs.Spec.Template)
  382. if err := CreateReplicaSetWithRetries(config.Client, config.Namespace, rs); err != nil {
  383. return fmt.Errorf("Error creating replica set: %v", err)
  384. }
  385. config.RCConfigLog("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, removePtr(rs.Spec.Replicas))
  386. return nil
  387. }
  388. // RunJob baunches (and verifies correctness) of a Job
  389. // and will wait for all pods it spawns to become "Running".
  390. // It's the caller's responsibility to clean up externally (i.e. use the
  391. // namespace lifecycle for handling Cleanup).
  392. func RunJob(config JobConfig) error {
  393. err := config.create()
  394. if err != nil {
  395. return err
  396. }
  397. return config.start()
  398. }
  399. func (config *JobConfig) Run() error {
  400. return RunJob(*config)
  401. }
  402. func (config *JobConfig) GetKind() schema.GroupKind {
  403. return batchinternal.Kind("Job")
  404. }
  405. func (config *JobConfig) GetGroupResource() schema.GroupResource {
  406. return batchinternal.Resource("jobs")
  407. }
  408. func (config *JobConfig) GetGroupVersionResource() schema.GroupVersionResource {
  409. return batchinternal.SchemeGroupVersion.WithResource("jobs")
  410. }
  411. func (config *JobConfig) create() error {
  412. job := &batch.Job{
  413. ObjectMeta: metav1.ObjectMeta{
  414. Name: config.Name,
  415. },
  416. Spec: batch.JobSpec{
  417. Parallelism: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  418. Completions: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  419. Template: v1.PodTemplateSpec{
  420. ObjectMeta: metav1.ObjectMeta{
  421. Labels: map[string]string{"name": config.Name},
  422. Annotations: config.Annotations,
  423. },
  424. Spec: v1.PodSpec{
  425. Affinity: config.Affinity,
  426. TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
  427. Containers: []v1.Container{
  428. {
  429. Name: config.Name,
  430. Image: config.Image,
  431. Command: config.Command,
  432. Lifecycle: config.Lifecycle,
  433. },
  434. },
  435. RestartPolicy: v1.RestartPolicyOnFailure,
  436. },
  437. },
  438. },
  439. }
  440. if len(config.SecretNames) > 0 {
  441. attachSecrets(&job.Spec.Template, config.SecretNames)
  442. }
  443. if len(config.ConfigMapNames) > 0 {
  444. attachConfigMaps(&job.Spec.Template, config.ConfigMapNames)
  445. }
  446. config.applyTo(&job.Spec.Template)
  447. if err := CreateJobWithRetries(config.Client, config.Namespace, job); err != nil {
  448. return fmt.Errorf("Error creating job: %v", err)
  449. }
  450. config.RCConfigLog("Created job with name: %v, namespace: %v, parallelism/completions: %v", job.Name, config.Namespace, job.Spec.Parallelism)
  451. return nil
  452. }
  453. // RunRC Launches (and verifies correctness) of a Replication Controller
  454. // and will wait for all pods it spawns to become "Running".
  455. // It's the caller's responsibility to clean up externally (i.e. use the
  456. // namespace lifecycle for handling Cleanup).
  457. func RunRC(config RCConfig) error {
  458. err := config.create()
  459. if err != nil {
  460. return err
  461. }
  462. return config.start()
  463. }
  464. func (config *RCConfig) Run() error {
  465. return RunRC(*config)
  466. }
  467. func (config *RCConfig) GetName() string {
  468. return config.Name
  469. }
  470. func (config *RCConfig) GetNamespace() string {
  471. return config.Namespace
  472. }
  473. func (config *RCConfig) GetKind() schema.GroupKind {
  474. return api.Kind("ReplicationController")
  475. }
  476. func (config *RCConfig) GetGroupResource() schema.GroupResource {
  477. return api.Resource("replicationcontrollers")
  478. }
  479. func (config *RCConfig) GetGroupVersionResource() schema.GroupVersionResource {
  480. return api.SchemeGroupVersion.WithResource("replicationcontrollers")
  481. }
  482. func (config *RCConfig) GetClient() clientset.Interface {
  483. return config.Client
  484. }
  485. func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter {
  486. return config.ScalesGetter
  487. }
  488. func (config *RCConfig) SetClient(c clientset.Interface) {
  489. config.Client = c
  490. }
  491. func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) {
  492. config.ScalesGetter = getter
  493. }
  494. func (config *RCConfig) GetReplicas() int {
  495. return config.Replicas
  496. }
  497. func (config *RCConfig) GetLabelValue(key string) (string, bool) {
  498. value, found := config.Labels[key]
  499. return value, found
  500. }
  501. func (config *RCConfig) create() error {
  502. dnsDefault := v1.DNSDefault
  503. if config.DNSPolicy == nil {
  504. config.DNSPolicy = &dnsDefault
  505. }
  506. one := int64(1)
  507. rc := &v1.ReplicationController{
  508. ObjectMeta: metav1.ObjectMeta{
  509. Name: config.Name,
  510. },
  511. Spec: v1.ReplicationControllerSpec{
  512. Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  513. Selector: map[string]string{
  514. "name": config.Name,
  515. },
  516. Template: &v1.PodTemplateSpec{
  517. ObjectMeta: metav1.ObjectMeta{
  518. Labels: map[string]string{"name": config.Name},
  519. Annotations: config.Annotations,
  520. },
  521. Spec: v1.PodSpec{
  522. Affinity: config.Affinity,
  523. Containers: []v1.Container{
  524. {
  525. Name: config.Name,
  526. Image: config.Image,
  527. Command: config.Command,
  528. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  529. ReadinessProbe: config.ReadinessProbe,
  530. Lifecycle: config.Lifecycle,
  531. },
  532. },
  533. DNSPolicy: *config.DNSPolicy,
  534. NodeSelector: config.NodeSelector,
  535. Tolerations: config.Tolerations,
  536. TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(&one),
  537. PriorityClassName: config.PriorityClassName,
  538. },
  539. },
  540. },
  541. }
  542. if len(config.SecretNames) > 0 {
  543. attachSecrets(rc.Spec.Template, config.SecretNames)
  544. }
  545. if len(config.ConfigMapNames) > 0 {
  546. attachConfigMaps(rc.Spec.Template, config.ConfigMapNames)
  547. }
  548. config.applyTo(rc.Spec.Template)
  549. if err := CreateRCWithRetries(config.Client, config.Namespace, rc); err != nil {
  550. return fmt.Errorf("Error creating replication controller: %v", err)
  551. }
  552. config.RCConfigLog("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, removePtr(rc.Spec.Replicas))
  553. return nil
  554. }
  555. func (config *RCConfig) applyTo(template *v1.PodTemplateSpec) {
  556. if config.Env != nil {
  557. for k, v := range config.Env {
  558. c := &template.Spec.Containers[0]
  559. c.Env = append(c.Env, v1.EnvVar{Name: k, Value: v})
  560. }
  561. }
  562. if config.Labels != nil {
  563. for k, v := range config.Labels {
  564. template.ObjectMeta.Labels[k] = v
  565. }
  566. }
  567. if config.NodeSelector != nil {
  568. template.Spec.NodeSelector = make(map[string]string)
  569. for k, v := range config.NodeSelector {
  570. template.Spec.NodeSelector[k] = v
  571. }
  572. }
  573. if config.Tolerations != nil {
  574. template.Spec.Tolerations = append([]v1.Toleration{}, config.Tolerations...)
  575. }
  576. if config.Ports != nil {
  577. for k, v := range config.Ports {
  578. c := &template.Spec.Containers[0]
  579. c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v)})
  580. }
  581. }
  582. if config.HostPorts != nil {
  583. for k, v := range config.HostPorts {
  584. c := &template.Spec.Containers[0]
  585. c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)})
  586. }
  587. }
  588. if config.CpuLimit > 0 || config.MemLimit > 0 || config.GpuLimit > 0 {
  589. template.Spec.Containers[0].Resources.Limits = v1.ResourceList{}
  590. }
  591. if config.CpuLimit > 0 {
  592. template.Spec.Containers[0].Resources.Limits[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
  593. }
  594. if config.MemLimit > 0 {
  595. template.Spec.Containers[0].Resources.Limits[v1.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
  596. }
  597. if config.CpuRequest > 0 || config.MemRequest > 0 {
  598. template.Spec.Containers[0].Resources.Requests = v1.ResourceList{}
  599. }
  600. if config.CpuRequest > 0 {
  601. template.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
  602. }
  603. if config.MemRequest > 0 {
  604. template.Spec.Containers[0].Resources.Requests[v1.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
  605. }
  606. if config.GpuLimit > 0 {
  607. template.Spec.Containers[0].Resources.Limits["nvidia.com/gpu"] = *resource.NewQuantity(config.GpuLimit, resource.DecimalSI)
  608. }
  609. if config.Lifecycle != nil {
  610. template.Spec.Containers[0].Lifecycle = config.Lifecycle
  611. }
  612. if len(config.Volumes) > 0 {
  613. template.Spec.Volumes = config.Volumes
  614. }
  615. if len(config.VolumeMounts) > 0 {
  616. template.Spec.Containers[0].VolumeMounts = config.VolumeMounts
  617. }
  618. if config.PriorityClassName != "" {
  619. template.Spec.PriorityClassName = config.PriorityClassName
  620. }
  621. }
  622. type RCStartupStatus struct {
  623. Expected int
  624. Terminating int
  625. Running int
  626. RunningButNotReady int
  627. Waiting int
  628. Pending int
  629. Scheduled int
  630. Unknown int
  631. Inactive int
  632. FailedContainers int
  633. Created []*v1.Pod
  634. ContainerRestartNodes sets.String
  635. }
  636. func (s *RCStartupStatus) String(name string) string {
  637. return fmt.Sprintf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
  638. name, len(s.Created), s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady)
  639. }
  640. func ComputeRCStartupStatus(pods []*v1.Pod, expected int) RCStartupStatus {
  641. startupStatus := RCStartupStatus{
  642. Expected: expected,
  643. Created: make([]*v1.Pod, 0, expected),
  644. ContainerRestartNodes: sets.NewString(),
  645. }
  646. for _, p := range pods {
  647. if p.DeletionTimestamp != nil {
  648. startupStatus.Terminating++
  649. continue
  650. }
  651. startupStatus.Created = append(startupStatus.Created, p)
  652. if p.Status.Phase == v1.PodRunning {
  653. ready := false
  654. for _, c := range p.Status.Conditions {
  655. if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
  656. ready = true
  657. break
  658. }
  659. }
  660. if ready {
  661. // Only count a pod is running when it is also ready.
  662. startupStatus.Running++
  663. } else {
  664. startupStatus.RunningButNotReady++
  665. }
  666. for _, v := range FailedContainers(p) {
  667. startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts
  668. startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName)
  669. }
  670. } else if p.Status.Phase == v1.PodPending {
  671. if p.Spec.NodeName == "" {
  672. startupStatus.Waiting++
  673. } else {
  674. startupStatus.Pending++
  675. }
  676. } else if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed {
  677. startupStatus.Inactive++
  678. } else if p.Status.Phase == v1.PodUnknown {
  679. startupStatus.Unknown++
  680. }
  681. // Record count of scheduled pods (useful for computing scheduler throughput).
  682. if p.Spec.NodeName != "" {
  683. startupStatus.Scheduled++
  684. }
  685. }
  686. return startupStatus
  687. }
  688. func (config *RCConfig) start() error {
  689. // Don't force tests to fail if they don't care about containers restarting.
  690. var maxContainerFailures int
  691. if config.MaxContainerFailures == nil {
  692. maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
  693. } else {
  694. maxContainerFailures = *config.MaxContainerFailures
  695. }
  696. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
  697. ps, err := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
  698. if err != nil {
  699. return err
  700. }
  701. defer ps.Stop()
  702. interval := config.PollInterval
  703. if interval <= 0 {
  704. interval = 10 * time.Second
  705. }
  706. timeout := config.Timeout
  707. if timeout <= 0 {
  708. timeout = 5 * time.Minute
  709. }
  710. oldPods := make([]*v1.Pod, 0)
  711. oldRunning := 0
  712. lastChange := time.Now()
  713. podDeletionsCount := 0
  714. for oldRunning != config.Replicas {
  715. time.Sleep(interval)
  716. pods := ps.List()
  717. startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
  718. if config.CreatedPods != nil {
  719. *config.CreatedPods = startupStatus.Created
  720. }
  721. if !config.Silent {
  722. config.RCConfigLog(startupStatus.String(config.Name))
  723. }
  724. if config.PodStatusFile != nil {
  725. fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady)
  726. }
  727. if startupStatus.FailedContainers > maxContainerFailures {
  728. if config.NodeDumpFunc != nil {
  729. config.NodeDumpFunc(config.Client, startupStatus.ContainerRestartNodes.List(), config.RCConfigLog)
  730. }
  731. if config.ContainerDumpFunc != nil {
  732. // Get the logs from the failed containers to help diagnose what caused them to fail
  733. config.ContainerDumpFunc(config.Client, config.Namespace, config.RCConfigLog)
  734. }
  735. return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures)
  736. }
  737. diff := Diff(oldPods, pods)
  738. deletedPods := diff.DeletedPods()
  739. podDeletionsCount += len(deletedPods)
  740. if podDeletionsCount > config.MaxAllowedPodDeletions {
  741. // Number of pods which disappeared is over threshold
  742. err := fmt.Errorf("%d pods disappeared for %s: %v", podDeletionsCount, config.Name, strings.Join(deletedPods, ", "))
  743. config.RCConfigLog(err.Error())
  744. config.RCConfigLog(diff.String(sets.NewString()))
  745. return err
  746. }
  747. if len(pods) > len(oldPods) || startupStatus.Running > oldRunning {
  748. lastChange = time.Now()
  749. }
  750. oldPods = pods
  751. oldRunning = startupStatus.Running
  752. if time.Since(lastChange) > timeout {
  753. break
  754. }
  755. }
  756. if oldRunning != config.Replicas {
  757. // List only pods from a given replication controller.
  758. options := metav1.ListOptions{LabelSelector: label.String()}
  759. if pods, err := config.Client.CoreV1().Pods(config.Namespace).List(context.TODO(), options); err == nil {
  760. for _, pod := range pods.Items {
  761. config.RCConfigLog("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp)
  762. }
  763. } else {
  764. config.RCConfigLog("Can't list pod debug info: %v", err)
  765. }
  766. return fmt.Errorf("Only %d pods started out of %d", oldRunning, config.Replicas)
  767. }
  768. return nil
  769. }
  770. // Simplified version of RunRC, that does not create RC, but creates plain Pods.
  771. // Optionally waits for pods to start running (if waitForRunning == true).
  772. // The number of replicas must be non-zero.
  773. func StartPods(c clientset.Interface, replicas int, namespace string, podNamePrefix string,
  774. pod v1.Pod, waitForRunning bool, logFunc func(fmt string, args ...interface{})) error {
  775. // no pod to start
  776. if replicas < 1 {
  777. panic("StartPods: number of replicas must be non-zero")
  778. }
  779. startPodsID := string(uuid.NewUUID()) // So that we can label and find them
  780. for i := 0; i < replicas; i++ {
  781. podName := fmt.Sprintf("%v-%v", podNamePrefix, i)
  782. pod.ObjectMeta.Name = podName
  783. pod.ObjectMeta.Labels["name"] = podName
  784. pod.ObjectMeta.Labels["startPodsID"] = startPodsID
  785. pod.Spec.Containers[0].Name = podName
  786. if err := CreatePodWithRetries(c, namespace, &pod); err != nil {
  787. return err
  788. }
  789. }
  790. logFunc("Waiting for running...")
  791. if waitForRunning {
  792. label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID}))
  793. err := WaitForPodsWithLabelRunning(c, namespace, label)
  794. if err != nil {
  795. return fmt.Errorf("Error waiting for %d pods to be running - probably a timeout: %v", replicas, err)
  796. }
  797. }
  798. return nil
  799. }
  800. // Wait up to 10 minutes for all matching pods to become Running and at least one
  801. // matching pod exists.
  802. func WaitForPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector) error {
  803. return WaitForEnoughPodsWithLabelRunning(c, ns, label, -1)
  804. }
  805. // Wait up to 10 minutes for at least 'replicas' many pods to be Running and at least
  806. // one matching pod exists. If 'replicas' is < 0, wait for all matching pods running.
  807. func WaitForEnoughPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector, replicas int) error {
  808. running := false
  809. ps, err := NewPodStore(c, ns, label, fields.Everything())
  810. if err != nil {
  811. return err
  812. }
  813. defer ps.Stop()
  814. for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
  815. pods := ps.List()
  816. if len(pods) == 0 {
  817. continue
  818. }
  819. runningPodsCount := 0
  820. for _, p := range pods {
  821. if p.Status.Phase == v1.PodRunning {
  822. runningPodsCount++
  823. }
  824. }
  825. if (replicas < 0 && runningPodsCount < len(pods)) || (runningPodsCount < replicas) {
  826. continue
  827. }
  828. running = true
  829. break
  830. }
  831. if !running {
  832. return fmt.Errorf("Timeout while waiting for pods with labels %q to be running", label.String())
  833. }
  834. return nil
  835. }
  836. type CountToStrategy struct {
  837. Count int
  838. Strategy PrepareNodeStrategy
  839. }
  840. type TestNodePreparer interface {
  841. PrepareNodes() error
  842. CleanupNodes() error
  843. }
  844. type PrepareNodeStrategy interface {
  845. // Modify pre-created Node objects before the test starts.
  846. PreparePatch(node *v1.Node) []byte
  847. // Create or modify any objects that depend on the node before the test starts.
  848. // Caller will re-try when http.StatusConflict error is returned.
  849. PrepareDependentObjects(node *v1.Node, client clientset.Interface) error
  850. // Clean up any node modifications after the test finishes.
  851. CleanupNode(node *v1.Node) *v1.Node
  852. // Clean up any objects that depend on the node after the test finishes.
  853. // Caller will re-try when http.StatusConflict error is returned.
  854. CleanupDependentObjects(nodeName string, client clientset.Interface) error
  855. }
  856. type TrivialNodePrepareStrategy struct{}
  857. var _ PrepareNodeStrategy = &TrivialNodePrepareStrategy{}
  858. func (*TrivialNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
  859. return []byte{}
  860. }
  861. func (*TrivialNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
  862. nodeCopy := *node
  863. return &nodeCopy
  864. }
  865. func (*TrivialNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
  866. return nil
  867. }
  868. func (*TrivialNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
  869. return nil
  870. }
  871. type LabelNodePrepareStrategy struct {
  872. LabelKey string
  873. LabelValue string
  874. }
  875. var _ PrepareNodeStrategy = &LabelNodePrepareStrategy{}
  876. func NewLabelNodePrepareStrategy(labelKey string, labelValue string) *LabelNodePrepareStrategy {
  877. return &LabelNodePrepareStrategy{
  878. LabelKey: labelKey,
  879. LabelValue: labelValue,
  880. }
  881. }
  882. func (s *LabelNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
  883. labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, s.LabelValue)
  884. patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
  885. return []byte(patch)
  886. }
  887. func (s *LabelNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
  888. nodeCopy := node.DeepCopy()
  889. if node.Labels != nil && len(node.Labels[s.LabelKey]) != 0 {
  890. delete(nodeCopy.Labels, s.LabelKey)
  891. }
  892. return nodeCopy
  893. }
  894. func (*LabelNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
  895. return nil
  896. }
  897. func (*LabelNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
  898. return nil
  899. }
  900. // NodeAllocatableStrategy fills node.status.allocatable and csiNode.spec.drivers[*].allocatable.
  901. // csiNode is created if it does not exist. On cleanup, any csiNode.spec.drivers[*].allocatable is
  902. // set to nil.
  903. type NodeAllocatableStrategy struct {
  904. // Node.status.allocatable to fill to all nodes.
  905. NodeAllocatable map[v1.ResourceName]string
  906. // Map <driver_name> -> VolumeNodeResources to fill into csiNode.spec.drivers[<driver_name>].
  907. CsiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources
  908. // List of in-tree volume plugins migrated to CSI.
  909. MigratedPlugins []string
  910. }
  911. var _ PrepareNodeStrategy = &NodeAllocatableStrategy{}
  912. func NewNodeAllocatableStrategy(nodeAllocatable map[v1.ResourceName]string, csiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources, migratedPlugins []string) *NodeAllocatableStrategy {
  913. return &NodeAllocatableStrategy{
  914. NodeAllocatable: nodeAllocatable,
  915. CsiNodeAllocatable: csiNodeAllocatable,
  916. MigratedPlugins: migratedPlugins,
  917. }
  918. }
  919. func (s *NodeAllocatableStrategy) PreparePatch(node *v1.Node) []byte {
  920. newNode := node.DeepCopy()
  921. for name, value := range s.NodeAllocatable {
  922. newNode.Status.Allocatable[name] = resource.MustParse(value)
  923. }
  924. oldJSON, err := json.Marshal(node)
  925. if err != nil {
  926. panic(err)
  927. }
  928. newJSON, err := json.Marshal(newNode)
  929. if err != nil {
  930. panic(err)
  931. }
  932. patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
  933. if err != nil {
  934. panic(err)
  935. }
  936. return patch
  937. }
  938. func (s *NodeAllocatableStrategy) CleanupNode(node *v1.Node) *v1.Node {
  939. nodeCopy := node.DeepCopy()
  940. for name := range s.NodeAllocatable {
  941. delete(nodeCopy.Status.Allocatable, name)
  942. }
  943. return nodeCopy
  944. }
  945. func (s *NodeAllocatableStrategy) createCSINode(nodeName string, client clientset.Interface) error {
  946. csiNode := &storagev1beta1.CSINode{
  947. ObjectMeta: metav1.ObjectMeta{
  948. Name: nodeName,
  949. Annotations: map[string]string{
  950. v1.MigratedPluginsAnnotationKey: strings.Join(s.MigratedPlugins, ","),
  951. },
  952. },
  953. Spec: storagev1beta1.CSINodeSpec{
  954. Drivers: []storagev1beta1.CSINodeDriver{},
  955. },
  956. }
  957. for driver, allocatable := range s.CsiNodeAllocatable {
  958. d := storagev1beta1.CSINodeDriver{
  959. Name: driver,
  960. Allocatable: allocatable,
  961. NodeID: nodeName,
  962. }
  963. csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
  964. }
  965. _, err := client.StorageV1beta1().CSINodes().Create(context.TODO(), csiNode, metav1.CreateOptions{})
  966. if apierrors.IsAlreadyExists(err) {
  967. // Something created CSINode instance after we checked it did not exist.
  968. // Make the caller to re-try PrepareDependentObjects by returning Conflict error
  969. err = apierrors.NewConflict(storagev1beta1.Resource("csinodes"), nodeName, err)
  970. }
  971. return err
  972. }
  973. func (s *NodeAllocatableStrategy) updateCSINode(csiNode *storagev1beta1.CSINode, client clientset.Interface) error {
  974. for driverName, allocatable := range s.CsiNodeAllocatable {
  975. found := false
  976. for i, driver := range csiNode.Spec.Drivers {
  977. if driver.Name == driverName {
  978. found = true
  979. csiNode.Spec.Drivers[i].Allocatable = allocatable
  980. break
  981. }
  982. }
  983. if !found {
  984. d := storagev1beta1.CSINodeDriver{
  985. Name: driverName,
  986. Allocatable: allocatable,
  987. }
  988. csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
  989. }
  990. }
  991. csiNode.Annotations[v1.MigratedPluginsAnnotationKey] = strings.Join(s.MigratedPlugins, ",")
  992. _, err := client.StorageV1beta1().CSINodes().Update(context.TODO(), csiNode, metav1.UpdateOptions{})
  993. return err
  994. }
  995. func (s *NodeAllocatableStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
  996. csiNode, err := client.StorageV1beta1().CSINodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
  997. if err != nil {
  998. if apierrors.IsNotFound(err) {
  999. return s.createCSINode(node.Name, client)
  1000. }
  1001. return err
  1002. }
  1003. return s.updateCSINode(csiNode, client)
  1004. }
  1005. func (s *NodeAllocatableStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
  1006. csiNode, err := client.StorageV1beta1().CSINodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  1007. if err != nil {
  1008. if apierrors.IsNotFound(err) {
  1009. return nil
  1010. }
  1011. return err
  1012. }
  1013. for driverName := range s.CsiNodeAllocatable {
  1014. for i, driver := range csiNode.Spec.Drivers {
  1015. if driver.Name == driverName {
  1016. csiNode.Spec.Drivers[i].Allocatable = nil
  1017. }
  1018. }
  1019. }
  1020. return s.updateCSINode(csiNode, client)
  1021. }
  1022. // UniqueNodeLabelStrategy sets a unique label for each node.
  1023. type UniqueNodeLabelStrategy struct {
  1024. LabelKey string
  1025. }
  1026. var _ PrepareNodeStrategy = &UniqueNodeLabelStrategy{}
  1027. func NewUniqueNodeLabelStrategy(labelKey string) *UniqueNodeLabelStrategy {
  1028. return &UniqueNodeLabelStrategy{
  1029. LabelKey: labelKey,
  1030. }
  1031. }
  1032. func (s *UniqueNodeLabelStrategy) PreparePatch(*v1.Node) []byte {
  1033. labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, string(uuid.NewUUID()))
  1034. patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
  1035. return []byte(patch)
  1036. }
  1037. func (s *UniqueNodeLabelStrategy) CleanupNode(node *v1.Node) *v1.Node {
  1038. nodeCopy := node.DeepCopy()
  1039. if node.Labels != nil && len(node.Labels[s.LabelKey]) != 0 {
  1040. delete(nodeCopy.Labels, s.LabelKey)
  1041. }
  1042. return nodeCopy
  1043. }
  1044. func (*UniqueNodeLabelStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
  1045. return nil
  1046. }
  1047. func (*UniqueNodeLabelStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
  1048. return nil
  1049. }
  1050. func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNodeStrategy) error {
  1051. var err error
  1052. patch := strategy.PreparePatch(node)
  1053. if len(patch) == 0 {
  1054. return nil
  1055. }
  1056. for attempt := 0; attempt < retries; attempt++ {
  1057. if _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err == nil {
  1058. break
  1059. }
  1060. if !apierrors.IsConflict(err) {
  1061. return fmt.Errorf("Error while applying patch %v to Node %v: %v", string(patch), node.Name, err)
  1062. }
  1063. time.Sleep(100 * time.Millisecond)
  1064. }
  1065. if err != nil {
  1066. return fmt.Errorf("Too many conflicts when applying patch %v to Node %v: %s", string(patch), node.Name, err)
  1067. }
  1068. for attempt := 0; attempt < retries; attempt++ {
  1069. if err = strategy.PrepareDependentObjects(node, client); err == nil {
  1070. break
  1071. }
  1072. if !apierrors.IsConflict(err) {
  1073. return fmt.Errorf("Error while preparing objects for node %s: %s", node.Name, err)
  1074. }
  1075. time.Sleep(100 * time.Millisecond)
  1076. }
  1077. if err != nil {
  1078. return fmt.Errorf("Too many conflicts when creating objects for node %s: %s", node.Name, err)
  1079. }
  1080. return nil
  1081. }
  1082. func DoCleanupNode(client clientset.Interface, nodeName string, strategy PrepareNodeStrategy) error {
  1083. var err error
  1084. for attempt := 0; attempt < retries; attempt++ {
  1085. node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  1086. if err != nil {
  1087. return fmt.Errorf("Skipping cleanup of Node: failed to get Node %v: %v", nodeName, err)
  1088. }
  1089. updatedNode := strategy.CleanupNode(node)
  1090. if apiequality.Semantic.DeepEqual(node, updatedNode) {
  1091. return nil
  1092. }
  1093. if _, err = client.CoreV1().Nodes().Update(context.TODO(), updatedNode, metav1.UpdateOptions{}); err == nil {
  1094. break
  1095. }
  1096. if !apierrors.IsConflict(err) {
  1097. return fmt.Errorf("Error when updating Node %v: %v", nodeName, err)
  1098. }
  1099. time.Sleep(100 * time.Millisecond)
  1100. }
  1101. if err != nil {
  1102. return fmt.Errorf("Too many conflicts when trying to cleanup Node %v: %s", nodeName, err)
  1103. }
  1104. for attempt := 0; attempt < retries; attempt++ {
  1105. err = strategy.CleanupDependentObjects(nodeName, client)
  1106. if err == nil {
  1107. break
  1108. }
  1109. if !apierrors.IsConflict(err) {
  1110. return fmt.Errorf("Error when cleaning up Node %v objects: %v", nodeName, err)
  1111. }
  1112. time.Sleep(100 * time.Millisecond)
  1113. }
  1114. if err != nil {
  1115. return fmt.Errorf("Too many conflicts when trying to cleanup Node %v objects: %s", nodeName, err)
  1116. }
  1117. return nil
  1118. }
  1119. type TestPodCreateStrategy func(client clientset.Interface, namespace string, podCount int) error
  1120. type CountToPodStrategy struct {
  1121. Count int
  1122. Strategy TestPodCreateStrategy
  1123. }
  1124. type TestPodCreatorConfig map[string][]CountToPodStrategy
  1125. func NewTestPodCreatorConfig() *TestPodCreatorConfig {
  1126. config := make(TestPodCreatorConfig)
  1127. return &config
  1128. }
  1129. func (c *TestPodCreatorConfig) AddStrategy(
  1130. namespace string, podCount int, strategy TestPodCreateStrategy) {
  1131. (*c)[namespace] = append((*c)[namespace], CountToPodStrategy{Count: podCount, Strategy: strategy})
  1132. }
  1133. type TestPodCreator struct {
  1134. Client clientset.Interface
  1135. // namespace -> count -> strategy
  1136. Config *TestPodCreatorConfig
  1137. }
  1138. func NewTestPodCreator(client clientset.Interface, config *TestPodCreatorConfig) *TestPodCreator {
  1139. return &TestPodCreator{
  1140. Client: client,
  1141. Config: config,
  1142. }
  1143. }
  1144. func (c *TestPodCreator) CreatePods() error {
  1145. for ns, v := range *(c.Config) {
  1146. for _, countToStrategy := range v {
  1147. if err := countToStrategy.Strategy(c.Client, ns, countToStrategy.Count); err != nil {
  1148. return err
  1149. }
  1150. }
  1151. }
  1152. return nil
  1153. }
  1154. func MakePodSpec() v1.PodSpec {
  1155. return v1.PodSpec{
  1156. Containers: []v1.Container{{
  1157. Name: "pause",
  1158. Image: "k8s.gcr.io/pause:3.2",
  1159. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  1160. Resources: v1.ResourceRequirements{
  1161. Limits: v1.ResourceList{
  1162. v1.ResourceCPU: resource.MustParse("100m"),
  1163. v1.ResourceMemory: resource.MustParse("500Mi"),
  1164. },
  1165. Requests: v1.ResourceList{
  1166. v1.ResourceCPU: resource.MustParse("100m"),
  1167. v1.ResourceMemory: resource.MustParse("500Mi"),
  1168. },
  1169. },
  1170. }},
  1171. }
  1172. }
  1173. func makeCreatePod(client clientset.Interface, namespace string, podTemplate *v1.Pod) error {
  1174. if err := CreatePodWithRetries(client, namespace, podTemplate); err != nil {
  1175. return fmt.Errorf("Error creating pod: %v", err)
  1176. }
  1177. return nil
  1178. }
  1179. func CreatePod(client clientset.Interface, namespace string, podCount int, podTemplate *v1.Pod) error {
  1180. var createError error
  1181. lock := sync.Mutex{}
  1182. createPodFunc := func(i int) {
  1183. if err := makeCreatePod(client, namespace, podTemplate); err != nil {
  1184. lock.Lock()
  1185. defer lock.Unlock()
  1186. createError = err
  1187. }
  1188. }
  1189. if podCount < 30 {
  1190. workqueue.ParallelizeUntil(context.TODO(), podCount, podCount, createPodFunc)
  1191. } else {
  1192. workqueue.ParallelizeUntil(context.TODO(), 30, podCount, createPodFunc)
  1193. }
  1194. return createError
  1195. }
  1196. func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int, bindVolume bool) error {
  1197. var createError error
  1198. lock := sync.Mutex{}
  1199. createPodFunc := func(i int) {
  1200. pvcName := fmt.Sprintf("pvc-%d", i)
  1201. // pvc
  1202. pvc := claimTemplate.DeepCopy()
  1203. pvc.Name = pvcName
  1204. // pv
  1205. pv := factory(i)
  1206. // PVs are cluster-wide resources.
  1207. // Prepend a namespace to make the name globally unique.
  1208. pv.Name = fmt.Sprintf("%s-%s", namespace, pv.Name)
  1209. if bindVolume {
  1210. // bind pv to "pvc-$i"
  1211. pv.Spec.ClaimRef = &v1.ObjectReference{
  1212. Kind: "PersistentVolumeClaim",
  1213. Namespace: namespace,
  1214. Name: pvcName,
  1215. APIVersion: "v1",
  1216. }
  1217. pv.Status.Phase = v1.VolumeBound
  1218. // bind pvc to "pv-$i"
  1219. // pvc.Spec.VolumeName = pv.Name
  1220. pvc.Status.Phase = v1.ClaimBound
  1221. } else {
  1222. pv.Status.Phase = v1.VolumeAvailable
  1223. }
  1224. if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
  1225. lock.Lock()
  1226. defer lock.Unlock()
  1227. createError = fmt.Errorf("error creating PV: %s", err)
  1228. return
  1229. }
  1230. // We need to update status separately, as creating persistentvolumes resets status to the default one
  1231. // (so with Status.Phase will be equal to PersistentVolumePhase).
  1232. if _, err := client.CoreV1().PersistentVolumes().UpdateStatus(context.TODO(), pv, metav1.UpdateOptions{}); err != nil {
  1233. lock.Lock()
  1234. defer lock.Unlock()
  1235. createError = fmt.Errorf("error creating PV: %s", err)
  1236. return
  1237. }
  1238. if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
  1239. lock.Lock()
  1240. defer lock.Unlock()
  1241. createError = fmt.Errorf("error creating PVC: %s", err)
  1242. return
  1243. }
  1244. // pod
  1245. pod := podTemplate.DeepCopy()
  1246. pod.Spec.Volumes = []v1.Volume{
  1247. {
  1248. Name: "vol",
  1249. VolumeSource: v1.VolumeSource{
  1250. PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  1251. ClaimName: pvcName,
  1252. },
  1253. },
  1254. },
  1255. }
  1256. if err := makeCreatePod(client, namespace, pod); err != nil {
  1257. lock.Lock()
  1258. defer lock.Unlock()
  1259. createError = err
  1260. return
  1261. }
  1262. }
  1263. if count < 30 {
  1264. workqueue.ParallelizeUntil(context.TODO(), count, count, createPodFunc)
  1265. } else {
  1266. workqueue.ParallelizeUntil(context.TODO(), 30, count, createPodFunc)
  1267. }
  1268. return createError
  1269. }
  1270. func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *v1.Pod) error {
  1271. rc := &v1.ReplicationController{
  1272. ObjectMeta: metav1.ObjectMeta{
  1273. Name: controllerName,
  1274. },
  1275. Spec: v1.ReplicationControllerSpec{
  1276. Replicas: func(i int) *int32 { x := int32(i); return &x }(podCount),
  1277. Selector: map[string]string{"name": controllerName},
  1278. Template: &v1.PodTemplateSpec{
  1279. ObjectMeta: metav1.ObjectMeta{
  1280. Labels: map[string]string{"name": controllerName},
  1281. },
  1282. Spec: podTemplate.Spec,
  1283. },
  1284. },
  1285. }
  1286. if err := CreateRCWithRetries(client, namespace, rc); err != nil {
  1287. return fmt.Errorf("Error creating replication controller: %v", err)
  1288. }
  1289. return nil
  1290. }
  1291. func NewCustomCreatePodStrategy(podTemplate *v1.Pod) TestPodCreateStrategy {
  1292. return func(client clientset.Interface, namespace string, podCount int) error {
  1293. return CreatePod(client, namespace, podCount, podTemplate)
  1294. }
  1295. }
  1296. // volumeFactory creates an unique PersistentVolume for given integer.
  1297. type volumeFactory func(uniqueID int) *v1.PersistentVolume
  1298. func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
  1299. return func(client clientset.Interface, namespace string, podCount int) error {
  1300. return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount, true /* bindVolume */)
  1301. }
  1302. }
  1303. func makeUnboundPersistentVolumeClaim(storageClass string) *v1.PersistentVolumeClaim {
  1304. return &v1.PersistentVolumeClaim{
  1305. Spec: v1.PersistentVolumeClaimSpec{
  1306. AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
  1307. StorageClassName: &storageClass,
  1308. Resources: v1.ResourceRequirements{
  1309. Requests: v1.ResourceList{
  1310. v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
  1311. },
  1312. },
  1313. },
  1314. }
  1315. }
  1316. func NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
  1317. return func(client clientset.Interface, namespace string, podCount int) error {
  1318. volumeBindingMode := storage.VolumeBindingWaitForFirstConsumer
  1319. storageClass := &storage.StorageClass{
  1320. ObjectMeta: metav1.ObjectMeta{
  1321. Name: "storage-class-1",
  1322. },
  1323. Provisioner: "kubernetes.io/gce-pd",
  1324. VolumeBindingMode: &volumeBindingMode,
  1325. }
  1326. claimTemplate := makeUnboundPersistentVolumeClaim(storageClass.Name)
  1327. if err := CreateStorageClassWithRetries(client, storageClass); err != nil {
  1328. return fmt.Errorf("failed to create storage class: %v", err)
  1329. }
  1330. factoryWithStorageClass := func(i int) *v1.PersistentVolume {
  1331. pv := factory(i)
  1332. pv.Spec.StorageClassName = storageClass.Name
  1333. return pv
  1334. }
  1335. return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factoryWithStorageClass, podTemplate, podCount, false /* bindVolume */)
  1336. }
  1337. }
  1338. func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
  1339. basePod := &v1.Pod{
  1340. ObjectMeta: metav1.ObjectMeta{
  1341. GenerateName: "simple-pod-",
  1342. },
  1343. Spec: MakePodSpec(),
  1344. }
  1345. return NewCustomCreatePodStrategy(basePod)
  1346. }
  1347. func NewSimpleWithControllerCreatePodStrategy(controllerName string) TestPodCreateStrategy {
  1348. return func(client clientset.Interface, namespace string, podCount int) error {
  1349. basePod := &v1.Pod{
  1350. ObjectMeta: metav1.ObjectMeta{
  1351. GenerateName: controllerName + "-pod-",
  1352. Labels: map[string]string{"name": controllerName},
  1353. },
  1354. Spec: MakePodSpec(),
  1355. }
  1356. if err := createController(client, controllerName, namespace, podCount, basePod); err != nil {
  1357. return err
  1358. }
  1359. return CreatePod(client, namespace, podCount, basePod)
  1360. }
  1361. }
  1362. type SecretConfig struct {
  1363. Content map[string]string
  1364. Client clientset.Interface
  1365. Name string
  1366. Namespace string
  1367. // If set this function will be used to print log lines instead of klog.
  1368. LogFunc func(fmt string, args ...interface{})
  1369. }
  1370. func (config *SecretConfig) Run() error {
  1371. secret := &v1.Secret{
  1372. ObjectMeta: metav1.ObjectMeta{
  1373. Name: config.Name,
  1374. },
  1375. StringData: map[string]string{},
  1376. }
  1377. for k, v := range config.Content {
  1378. secret.StringData[k] = v
  1379. }
  1380. if err := CreateSecretWithRetries(config.Client, config.Namespace, secret); err != nil {
  1381. return fmt.Errorf("Error creating secret: %v", err)
  1382. }
  1383. config.LogFunc("Created secret %v/%v", config.Namespace, config.Name)
  1384. return nil
  1385. }
  1386. func (config *SecretConfig) Stop() error {
  1387. if err := DeleteResourceWithRetries(config.Client, api.Kind("Secret"), config.Namespace, config.Name, &metav1.DeleteOptions{}); err != nil {
  1388. return fmt.Errorf("Error deleting secret: %v", err)
  1389. }
  1390. config.LogFunc("Deleted secret %v/%v", config.Namespace, config.Name)
  1391. return nil
  1392. }
  1393. // TODO: attach secrets using different possibilities: env vars, image pull secrets.
  1394. func attachSecrets(template *v1.PodTemplateSpec, secretNames []string) {
  1395. volumes := make([]v1.Volume, 0, len(secretNames))
  1396. mounts := make([]v1.VolumeMount, 0, len(secretNames))
  1397. for _, name := range secretNames {
  1398. volumes = append(volumes, v1.Volume{
  1399. Name: name,
  1400. VolumeSource: v1.VolumeSource{
  1401. Secret: &v1.SecretVolumeSource{
  1402. SecretName: name,
  1403. },
  1404. },
  1405. })
  1406. mounts = append(mounts, v1.VolumeMount{
  1407. Name: name,
  1408. MountPath: fmt.Sprintf("/%v", name),
  1409. })
  1410. }
  1411. template.Spec.Volumes = volumes
  1412. template.Spec.Containers[0].VolumeMounts = mounts
  1413. }
  1414. type ConfigMapConfig struct {
  1415. Content map[string]string
  1416. Client clientset.Interface
  1417. Name string
  1418. Namespace string
  1419. // If set this function will be used to print log lines instead of klog.
  1420. LogFunc func(fmt string, args ...interface{})
  1421. }
  1422. func (config *ConfigMapConfig) Run() error {
  1423. configMap := &v1.ConfigMap{
  1424. ObjectMeta: metav1.ObjectMeta{
  1425. Name: config.Name,
  1426. },
  1427. Data: map[string]string{},
  1428. }
  1429. for k, v := range config.Content {
  1430. configMap.Data[k] = v
  1431. }
  1432. if err := CreateConfigMapWithRetries(config.Client, config.Namespace, configMap); err != nil {
  1433. return fmt.Errorf("Error creating configmap: %v", err)
  1434. }
  1435. config.LogFunc("Created configmap %v/%v", config.Namespace, config.Name)
  1436. return nil
  1437. }
  1438. func (config *ConfigMapConfig) Stop() error {
  1439. if err := DeleteResourceWithRetries(config.Client, api.Kind("ConfigMap"), config.Namespace, config.Name, &metav1.DeleteOptions{}); err != nil {
  1440. return fmt.Errorf("Error deleting configmap: %v", err)
  1441. }
  1442. config.LogFunc("Deleted configmap %v/%v", config.Namespace, config.Name)
  1443. return nil
  1444. }
  1445. // TODO: attach configmaps using different possibilities: env vars.
  1446. func attachConfigMaps(template *v1.PodTemplateSpec, configMapNames []string) {
  1447. volumes := make([]v1.Volume, 0, len(configMapNames))
  1448. mounts := make([]v1.VolumeMount, 0, len(configMapNames))
  1449. for _, name := range configMapNames {
  1450. volumes = append(volumes, v1.Volume{
  1451. Name: name,
  1452. VolumeSource: v1.VolumeSource{
  1453. ConfigMap: &v1.ConfigMapVolumeSource{
  1454. LocalObjectReference: v1.LocalObjectReference{
  1455. Name: name,
  1456. },
  1457. },
  1458. },
  1459. })
  1460. mounts = append(mounts, v1.VolumeMount{
  1461. Name: name,
  1462. MountPath: fmt.Sprintf("/%v", name),
  1463. })
  1464. }
  1465. template.Spec.Volumes = volumes
  1466. template.Spec.Containers[0].VolumeMounts = mounts
  1467. }
  1468. func (config *RCConfig) getTerminationGracePeriodSeconds(defaultGrace *int64) *int64 {
  1469. if config.TerminationGracePeriodSeconds == nil || *config.TerminationGracePeriodSeconds < 0 {
  1470. return defaultGrace
  1471. }
  1472. return config.TerminationGracePeriodSeconds
  1473. }
  1474. func attachServiceAccountTokenProjection(template *v1.PodTemplateSpec, name string) {
  1475. template.Spec.Containers[0].VolumeMounts = append(template.Spec.Containers[0].VolumeMounts,
  1476. v1.VolumeMount{
  1477. Name: name,
  1478. MountPath: "/var/service-account-tokens/" + name,
  1479. })
  1480. template.Spec.Volumes = append(template.Spec.Volumes,
  1481. v1.Volume{
  1482. Name: name,
  1483. VolumeSource: v1.VolumeSource{
  1484. Projected: &v1.ProjectedVolumeSource{
  1485. Sources: []v1.VolumeProjection{
  1486. {
  1487. ServiceAccountToken: &v1.ServiceAccountTokenProjection{
  1488. Path: "token",
  1489. Audience: name,
  1490. },
  1491. },
  1492. {
  1493. ConfigMap: &v1.ConfigMapProjection{
  1494. LocalObjectReference: v1.LocalObjectReference{
  1495. Name: "kube-root-ca-crt",
  1496. },
  1497. Items: []v1.KeyToPath{
  1498. {
  1499. Key: "ca.crt",
  1500. Path: "ca.crt",
  1501. },
  1502. },
  1503. },
  1504. },
  1505. {
  1506. DownwardAPI: &v1.DownwardAPIProjection{
  1507. Items: []v1.DownwardAPIVolumeFile{
  1508. {
  1509. Path: "namespace",
  1510. FieldRef: &v1.ObjectFieldSelector{
  1511. APIVersion: "v1",
  1512. FieldPath: "metadata.namespace",
  1513. },
  1514. },
  1515. },
  1516. },
  1517. },
  1518. },
  1519. },
  1520. },
  1521. })
  1522. }
  1523. type DaemonConfig struct {
  1524. Client clientset.Interface
  1525. Name string
  1526. Namespace string
  1527. Image string
  1528. // If set this function will be used to print log lines instead of klog.
  1529. LogFunc func(fmt string, args ...interface{})
  1530. // How long we wait for DaemonSet to become running.
  1531. Timeout time.Duration
  1532. }
  1533. func (config *DaemonConfig) Run() error {
  1534. if config.Image == "" {
  1535. config.Image = "k8s.gcr.io/pause:3.2"
  1536. }
  1537. nameLabel := map[string]string{
  1538. "name": config.Name + "-daemon",
  1539. }
  1540. daemon := &apps.DaemonSet{
  1541. ObjectMeta: metav1.ObjectMeta{
  1542. Name: config.Name,
  1543. },
  1544. Spec: apps.DaemonSetSpec{
  1545. Template: v1.PodTemplateSpec{
  1546. ObjectMeta: metav1.ObjectMeta{
  1547. Labels: nameLabel,
  1548. },
  1549. Spec: v1.PodSpec{
  1550. Containers: []v1.Container{
  1551. {
  1552. Name: config.Name,
  1553. Image: config.Image,
  1554. },
  1555. },
  1556. },
  1557. },
  1558. },
  1559. }
  1560. if err := CreateDaemonSetWithRetries(config.Client, config.Namespace, daemon); err != nil {
  1561. return fmt.Errorf("Error creating daemonset: %v", err)
  1562. }
  1563. var nodes *v1.NodeList
  1564. var err error
  1565. for i := 0; i < retries; i++ {
  1566. // Wait for all daemons to be running
  1567. nodes, err = config.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"})
  1568. if err == nil {
  1569. break
  1570. } else if i+1 == retries {
  1571. return fmt.Errorf("Error listing Nodes while waiting for DaemonSet %v: %v", config.Name, err)
  1572. }
  1573. }
  1574. timeout := config.Timeout
  1575. if timeout <= 0 {
  1576. timeout = 5 * time.Minute
  1577. }
  1578. ps, err := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
  1579. if err != nil {
  1580. return err
  1581. }
  1582. defer ps.Stop()
  1583. err = wait.Poll(time.Second, timeout, func() (bool, error) {
  1584. pods := ps.List()
  1585. nodeHasDaemon := sets.NewString()
  1586. for _, pod := range pods {
  1587. podReady, _ := PodRunningReady(pod)
  1588. if pod.Spec.NodeName != "" && podReady {
  1589. nodeHasDaemon.Insert(pod.Spec.NodeName)
  1590. }
  1591. }
  1592. running := len(nodeHasDaemon)
  1593. config.LogFunc("Found %v/%v Daemons %v running", running, config.Name, len(nodes.Items))
  1594. return running == len(nodes.Items), nil
  1595. })
  1596. if err != nil {
  1597. config.LogFunc("Timed out while waiting for DaemonSet %v/%v to be running.", config.Namespace, config.Name)
  1598. } else {
  1599. config.LogFunc("Created Daemon %v/%v", config.Namespace, config.Name)
  1600. }
  1601. return err
  1602. }