runners.go 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package utils
  14. import (
  15. "context"
  16. "fmt"
  17. "math"
  18. "os"
  19. "strings"
  20. "sync"
  21. "time"
  22. apps "k8s.io/api/apps/v1"
  23. batch "k8s.io/api/batch/v1"
  24. v1 "k8s.io/api/core/v1"
  25. apiequality "k8s.io/apimachinery/pkg/api/equality"
  26. apierrs "k8s.io/apimachinery/pkg/api/errors"
  27. "k8s.io/apimachinery/pkg/api/resource"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/fields"
  30. "k8s.io/apimachinery/pkg/labels"
  31. "k8s.io/apimachinery/pkg/runtime/schema"
  32. "k8s.io/apimachinery/pkg/types"
  33. "k8s.io/apimachinery/pkg/util/sets"
  34. "k8s.io/apimachinery/pkg/util/uuid"
  35. "k8s.io/apimachinery/pkg/util/wait"
  36. clientset "k8s.io/client-go/kubernetes"
  37. scaleclient "k8s.io/client-go/scale"
  38. "k8s.io/client-go/util/workqueue"
  39. batchinternal "k8s.io/kubernetes/pkg/apis/batch"
  40. api "k8s.io/kubernetes/pkg/apis/core"
  41. extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
  42. "k8s.io/klog"
  43. )
  44. const (
  45. // String used to mark pod deletion
  46. nonExist = "NonExist"
  47. )
  48. func removePtr(replicas *int32) int32 {
  49. if replicas == nil {
  50. return 0
  51. }
  52. return *replicas
  53. }
  54. func WaitUntilPodIsScheduled(c clientset.Interface, name, namespace string, timeout time.Duration) (*v1.Pod, error) {
  55. // Wait until it's scheduled
  56. p, err := c.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{ResourceVersion: "0"})
  57. if err == nil && p.Spec.NodeName != "" {
  58. return p, nil
  59. }
  60. pollingPeriod := 200 * time.Millisecond
  61. startTime := time.Now()
  62. for startTime.Add(timeout).After(time.Now()) {
  63. time.Sleep(pollingPeriod)
  64. p, err := c.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{ResourceVersion: "0"})
  65. if err == nil && p.Spec.NodeName != "" {
  66. return p, nil
  67. }
  68. }
  69. return nil, fmt.Errorf("Timed out after %v when waiting for pod %v/%v to start.", timeout, namespace, name)
  70. }
  71. func RunPodAndGetNodeName(c clientset.Interface, pod *v1.Pod, timeout time.Duration) (string, error) {
  72. name := pod.Name
  73. namespace := pod.Namespace
  74. if err := CreatePodWithRetries(c, namespace, pod); err != nil {
  75. return "", err
  76. }
  77. p, err := WaitUntilPodIsScheduled(c, name, namespace, timeout)
  78. if err != nil {
  79. return "", err
  80. }
  81. return p.Spec.NodeName, nil
  82. }
  83. type RunObjectConfig interface {
  84. Run() error
  85. GetName() string
  86. GetNamespace() string
  87. GetKind() schema.GroupKind
  88. GetClient() clientset.Interface
  89. GetScalesGetter() scaleclient.ScalesGetter
  90. SetClient(clientset.Interface)
  91. SetScalesClient(scaleclient.ScalesGetter)
  92. GetReplicas() int
  93. GetLabelValue(string) (string, bool)
  94. GetGroupResource() schema.GroupResource
  95. }
  96. type RCConfig struct {
  97. Affinity *v1.Affinity
  98. Client clientset.Interface
  99. ScalesGetter scaleclient.ScalesGetter
  100. Image string
  101. Command []string
  102. Name string
  103. Namespace string
  104. PollInterval time.Duration
  105. Timeout time.Duration
  106. PodStatusFile *os.File
  107. Replicas int
  108. CpuRequest int64 // millicores
  109. CpuLimit int64 // millicores
  110. MemRequest int64 // bytes
  111. MemLimit int64 // bytes
  112. GpuLimit int64 // count
  113. ReadinessProbe *v1.Probe
  114. DNSPolicy *v1.DNSPolicy
  115. PriorityClassName string
  116. // Env vars, set the same for every pod.
  117. Env map[string]string
  118. // Extra labels and annotations added to every pod.
  119. Labels map[string]string
  120. Annotations map[string]string
  121. // Node selector for pods in the RC.
  122. NodeSelector map[string]string
  123. // Tolerations for pods in the RC.
  124. Tolerations []v1.Toleration
  125. // Ports to declare in the container (map of name to containerPort).
  126. Ports map[string]int
  127. // Ports to declare in the container as host and container ports.
  128. HostPorts map[string]int
  129. Volumes []v1.Volume
  130. VolumeMounts []v1.VolumeMount
  131. // Pointer to a list of pods; if non-nil, will be set to a list of pods
  132. // created by this RC by RunRC.
  133. CreatedPods *[]*v1.Pod
  134. // Maximum allowable container failures. If exceeded, RunRC returns an error.
  135. // Defaults to replicas*0.1 if unspecified.
  136. MaxContainerFailures *int
  137. // If set to false starting RC will print progress, otherwise only errors will be printed.
  138. Silent bool
  139. // If set this function will be used to print log lines instead of klog.
  140. LogFunc func(fmt string, args ...interface{})
  141. // If set those functions will be used to gather data from Nodes - in integration tests where no
  142. // kubelets are running those variables should be nil.
  143. NodeDumpFunc func(c clientset.Interface, nodeNames []string, logFunc func(fmt string, args ...interface{}))
  144. ContainerDumpFunc func(c clientset.Interface, ns string, logFunc func(ftm string, args ...interface{}))
  145. // Names of the secrets and configmaps to mount.
  146. SecretNames []string
  147. ConfigMapNames []string
  148. ServiceAccountTokenProjections int
  149. }
  150. func (rc *RCConfig) RCConfigLog(fmt string, args ...interface{}) {
  151. if rc.LogFunc != nil {
  152. rc.LogFunc(fmt, args...)
  153. }
  154. klog.Infof(fmt, args...)
  155. }
  156. type DeploymentConfig struct {
  157. RCConfig
  158. }
  159. type ReplicaSetConfig struct {
  160. RCConfig
  161. }
  162. type JobConfig struct {
  163. RCConfig
  164. }
  165. // podInfo contains pod information useful for debugging e2e tests.
  166. type podInfo struct {
  167. oldHostname string
  168. oldPhase string
  169. hostname string
  170. phase string
  171. }
  172. // PodDiff is a map of pod name to podInfos
  173. type PodDiff map[string]*podInfo
  174. // Print formats and prints the give PodDiff.
  175. func (p PodDiff) String(ignorePhases sets.String) string {
  176. ret := ""
  177. for name, info := range p {
  178. if ignorePhases.Has(info.phase) {
  179. continue
  180. }
  181. if info.phase == nonExist {
  182. ret += fmt.Sprintf("Pod %v was deleted, had phase %v and host %v\n", name, info.oldPhase, info.oldHostname)
  183. continue
  184. }
  185. phaseChange, hostChange := false, false
  186. msg := fmt.Sprintf("Pod %v ", name)
  187. if info.oldPhase != info.phase {
  188. phaseChange = true
  189. if info.oldPhase == nonExist {
  190. msg += fmt.Sprintf("in phase %v ", info.phase)
  191. } else {
  192. msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase)
  193. }
  194. }
  195. if info.oldHostname != info.hostname {
  196. hostChange = true
  197. if info.oldHostname == nonExist || info.oldHostname == "" {
  198. msg += fmt.Sprintf("assigned host %v ", info.hostname)
  199. } else {
  200. msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname)
  201. }
  202. }
  203. if phaseChange || hostChange {
  204. ret += msg + "\n"
  205. }
  206. }
  207. return ret
  208. }
  209. // DeletedPods returns a slice of pods that were present at the beginning
  210. // and then disappeared.
  211. func (p PodDiff) DeletedPods() []string {
  212. var deletedPods []string
  213. for podName, podInfo := range p {
  214. if podInfo.hostname == nonExist {
  215. deletedPods = append(deletedPods, podName)
  216. }
  217. }
  218. return deletedPods
  219. }
  220. // Diff computes a PodDiff given 2 lists of pods.
  221. func Diff(oldPods []*v1.Pod, curPods []*v1.Pod) PodDiff {
  222. podInfoMap := PodDiff{}
  223. // New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
  224. for _, pod := range curPods {
  225. podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
  226. }
  227. // Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
  228. for _, pod := range oldPods {
  229. if info, ok := podInfoMap[pod.Name]; ok {
  230. info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase)
  231. } else {
  232. podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)}
  233. }
  234. }
  235. return podInfoMap
  236. }
  237. // RunDeployment Launches (and verifies correctness) of a Deployment
  238. // and will wait for all pods it spawns to become "Running".
  239. // It's the caller's responsibility to clean up externally (i.e. use the
  240. // namespace lifecycle for handling Cleanup).
  241. func RunDeployment(config DeploymentConfig) error {
  242. err := config.create()
  243. if err != nil {
  244. return err
  245. }
  246. return config.start()
  247. }
  248. func (config *DeploymentConfig) Run() error {
  249. return RunDeployment(*config)
  250. }
  251. func (config *DeploymentConfig) GetKind() schema.GroupKind {
  252. return extensionsinternal.Kind("Deployment")
  253. }
  254. func (config *DeploymentConfig) GetGroupResource() schema.GroupResource {
  255. return extensionsinternal.Resource("deployments")
  256. }
  257. func (config *DeploymentConfig) create() error {
  258. deployment := &apps.Deployment{
  259. ObjectMeta: metav1.ObjectMeta{
  260. Name: config.Name,
  261. },
  262. Spec: apps.DeploymentSpec{
  263. Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  264. Selector: &metav1.LabelSelector{
  265. MatchLabels: map[string]string{
  266. "name": config.Name,
  267. },
  268. },
  269. Template: v1.PodTemplateSpec{
  270. ObjectMeta: metav1.ObjectMeta{
  271. Labels: map[string]string{"name": config.Name},
  272. Annotations: config.Annotations,
  273. },
  274. Spec: v1.PodSpec{
  275. Affinity: config.Affinity,
  276. Containers: []v1.Container{
  277. {
  278. Name: config.Name,
  279. Image: config.Image,
  280. Command: config.Command,
  281. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  282. },
  283. },
  284. },
  285. },
  286. },
  287. }
  288. if len(config.SecretNames) > 0 {
  289. attachSecrets(&deployment.Spec.Template, config.SecretNames)
  290. }
  291. if len(config.ConfigMapNames) > 0 {
  292. attachConfigMaps(&deployment.Spec.Template, config.ConfigMapNames)
  293. }
  294. for i := 0; i < config.ServiceAccountTokenProjections; i++ {
  295. attachServiceAccountTokenProjection(&deployment.Spec.Template, fmt.Sprintf("tok-%d", i))
  296. }
  297. config.applyTo(&deployment.Spec.Template)
  298. if err := CreateDeploymentWithRetries(config.Client, config.Namespace, deployment); err != nil {
  299. return fmt.Errorf("Error creating deployment: %v", err)
  300. }
  301. config.RCConfigLog("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, removePtr(deployment.Spec.Replicas))
  302. return nil
  303. }
  304. // RunReplicaSet launches (and verifies correctness) of a ReplicaSet
  305. // and waits until all the pods it launches to reach the "Running" state.
  306. // It's the caller's responsibility to clean up externally (i.e. use the
  307. // namespace lifecycle for handling Cleanup).
  308. func RunReplicaSet(config ReplicaSetConfig) error {
  309. err := config.create()
  310. if err != nil {
  311. return err
  312. }
  313. return config.start()
  314. }
  315. func (config *ReplicaSetConfig) Run() error {
  316. return RunReplicaSet(*config)
  317. }
  318. func (config *ReplicaSetConfig) GetKind() schema.GroupKind {
  319. return extensionsinternal.Kind("ReplicaSet")
  320. }
  321. func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource {
  322. return extensionsinternal.Resource("replicasets")
  323. }
  324. func (config *ReplicaSetConfig) create() error {
  325. rs := &apps.ReplicaSet{
  326. ObjectMeta: metav1.ObjectMeta{
  327. Name: config.Name,
  328. },
  329. Spec: apps.ReplicaSetSpec{
  330. Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  331. Selector: &metav1.LabelSelector{
  332. MatchLabels: map[string]string{
  333. "name": config.Name,
  334. },
  335. },
  336. Template: v1.PodTemplateSpec{
  337. ObjectMeta: metav1.ObjectMeta{
  338. Labels: map[string]string{"name": config.Name},
  339. Annotations: config.Annotations,
  340. },
  341. Spec: v1.PodSpec{
  342. Affinity: config.Affinity,
  343. Containers: []v1.Container{
  344. {
  345. Name: config.Name,
  346. Image: config.Image,
  347. Command: config.Command,
  348. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  349. },
  350. },
  351. },
  352. },
  353. },
  354. }
  355. if len(config.SecretNames) > 0 {
  356. attachSecrets(&rs.Spec.Template, config.SecretNames)
  357. }
  358. if len(config.ConfigMapNames) > 0 {
  359. attachConfigMaps(&rs.Spec.Template, config.ConfigMapNames)
  360. }
  361. config.applyTo(&rs.Spec.Template)
  362. if err := CreateReplicaSetWithRetries(config.Client, config.Namespace, rs); err != nil {
  363. return fmt.Errorf("Error creating replica set: %v", err)
  364. }
  365. config.RCConfigLog("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, removePtr(rs.Spec.Replicas))
  366. return nil
  367. }
  368. // RunJob baunches (and verifies correctness) of a Job
  369. // and will wait for all pods it spawns to become "Running".
  370. // It's the caller's responsibility to clean up externally (i.e. use the
  371. // namespace lifecycle for handling Cleanup).
  372. func RunJob(config JobConfig) error {
  373. err := config.create()
  374. if err != nil {
  375. return err
  376. }
  377. return config.start()
  378. }
  379. func (config *JobConfig) Run() error {
  380. return RunJob(*config)
  381. }
  382. func (config *JobConfig) GetKind() schema.GroupKind {
  383. return batchinternal.Kind("Job")
  384. }
  385. func (config *JobConfig) GetGroupResource() schema.GroupResource {
  386. return batchinternal.Resource("jobs")
  387. }
  388. func (config *JobConfig) create() error {
  389. job := &batch.Job{
  390. ObjectMeta: metav1.ObjectMeta{
  391. Name: config.Name,
  392. },
  393. Spec: batch.JobSpec{
  394. Parallelism: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  395. Completions: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  396. Template: v1.PodTemplateSpec{
  397. ObjectMeta: metav1.ObjectMeta{
  398. Labels: map[string]string{"name": config.Name},
  399. Annotations: config.Annotations,
  400. },
  401. Spec: v1.PodSpec{
  402. Affinity: config.Affinity,
  403. Containers: []v1.Container{
  404. {
  405. Name: config.Name,
  406. Image: config.Image,
  407. Command: config.Command,
  408. },
  409. },
  410. RestartPolicy: v1.RestartPolicyOnFailure,
  411. },
  412. },
  413. },
  414. }
  415. if len(config.SecretNames) > 0 {
  416. attachSecrets(&job.Spec.Template, config.SecretNames)
  417. }
  418. if len(config.ConfigMapNames) > 0 {
  419. attachConfigMaps(&job.Spec.Template, config.ConfigMapNames)
  420. }
  421. config.applyTo(&job.Spec.Template)
  422. if err := CreateJobWithRetries(config.Client, config.Namespace, job); err != nil {
  423. return fmt.Errorf("Error creating job: %v", err)
  424. }
  425. config.RCConfigLog("Created job with name: %v, namespace: %v, parallelism/completions: %v", job.Name, config.Namespace, job.Spec.Parallelism)
  426. return nil
  427. }
  428. // RunRC Launches (and verifies correctness) of a Replication Controller
  429. // and will wait for all pods it spawns to become "Running".
  430. // It's the caller's responsibility to clean up externally (i.e. use the
  431. // namespace lifecycle for handling Cleanup).
  432. func RunRC(config RCConfig) error {
  433. err := config.create()
  434. if err != nil {
  435. return err
  436. }
  437. return config.start()
  438. }
  439. func (config *RCConfig) Run() error {
  440. return RunRC(*config)
  441. }
  442. func (config *RCConfig) GetName() string {
  443. return config.Name
  444. }
  445. func (config *RCConfig) GetNamespace() string {
  446. return config.Namespace
  447. }
  448. func (config *RCConfig) GetKind() schema.GroupKind {
  449. return api.Kind("ReplicationController")
  450. }
  451. func (config *RCConfig) GetGroupResource() schema.GroupResource {
  452. return api.Resource("replicationcontrollers")
  453. }
  454. func (config *RCConfig) GetClient() clientset.Interface {
  455. return config.Client
  456. }
  457. func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter {
  458. return config.ScalesGetter
  459. }
  460. func (config *RCConfig) SetClient(c clientset.Interface) {
  461. config.Client = c
  462. }
  463. func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) {
  464. config.ScalesGetter = getter
  465. }
  466. func (config *RCConfig) GetReplicas() int {
  467. return config.Replicas
  468. }
  469. func (config *RCConfig) GetLabelValue(key string) (string, bool) {
  470. value, found := config.Labels[key]
  471. return value, found
  472. }
  473. func (config *RCConfig) create() error {
  474. dnsDefault := v1.DNSDefault
  475. if config.DNSPolicy == nil {
  476. config.DNSPolicy = &dnsDefault
  477. }
  478. one := int64(1)
  479. rc := &v1.ReplicationController{
  480. ObjectMeta: metav1.ObjectMeta{
  481. Name: config.Name,
  482. },
  483. Spec: v1.ReplicationControllerSpec{
  484. Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  485. Selector: map[string]string{
  486. "name": config.Name,
  487. },
  488. Template: &v1.PodTemplateSpec{
  489. ObjectMeta: metav1.ObjectMeta{
  490. Labels: map[string]string{"name": config.Name},
  491. Annotations: config.Annotations,
  492. },
  493. Spec: v1.PodSpec{
  494. Affinity: config.Affinity,
  495. Containers: []v1.Container{
  496. {
  497. Name: config.Name,
  498. Image: config.Image,
  499. Command: config.Command,
  500. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  501. ReadinessProbe: config.ReadinessProbe,
  502. },
  503. },
  504. DNSPolicy: *config.DNSPolicy,
  505. NodeSelector: config.NodeSelector,
  506. Tolerations: config.Tolerations,
  507. TerminationGracePeriodSeconds: &one,
  508. PriorityClassName: config.PriorityClassName,
  509. },
  510. },
  511. },
  512. }
  513. if len(config.SecretNames) > 0 {
  514. attachSecrets(rc.Spec.Template, config.SecretNames)
  515. }
  516. if len(config.ConfigMapNames) > 0 {
  517. attachConfigMaps(rc.Spec.Template, config.ConfigMapNames)
  518. }
  519. config.applyTo(rc.Spec.Template)
  520. if err := CreateRCWithRetries(config.Client, config.Namespace, rc); err != nil {
  521. return fmt.Errorf("Error creating replication controller: %v", err)
  522. }
  523. config.RCConfigLog("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, removePtr(rc.Spec.Replicas))
  524. return nil
  525. }
  526. func (config *RCConfig) applyTo(template *v1.PodTemplateSpec) {
  527. if config.Env != nil {
  528. for k, v := range config.Env {
  529. c := &template.Spec.Containers[0]
  530. c.Env = append(c.Env, v1.EnvVar{Name: k, Value: v})
  531. }
  532. }
  533. if config.Labels != nil {
  534. for k, v := range config.Labels {
  535. template.ObjectMeta.Labels[k] = v
  536. }
  537. }
  538. if config.NodeSelector != nil {
  539. template.Spec.NodeSelector = make(map[string]string)
  540. for k, v := range config.NodeSelector {
  541. template.Spec.NodeSelector[k] = v
  542. }
  543. }
  544. if config.Tolerations != nil {
  545. template.Spec.Tolerations = append([]v1.Toleration{}, config.Tolerations...)
  546. }
  547. if config.Ports != nil {
  548. for k, v := range config.Ports {
  549. c := &template.Spec.Containers[0]
  550. c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v)})
  551. }
  552. }
  553. if config.HostPorts != nil {
  554. for k, v := range config.HostPorts {
  555. c := &template.Spec.Containers[0]
  556. c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)})
  557. }
  558. }
  559. if config.CpuLimit > 0 || config.MemLimit > 0 || config.GpuLimit > 0 {
  560. template.Spec.Containers[0].Resources.Limits = v1.ResourceList{}
  561. }
  562. if config.CpuLimit > 0 {
  563. template.Spec.Containers[0].Resources.Limits[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
  564. }
  565. if config.MemLimit > 0 {
  566. template.Spec.Containers[0].Resources.Limits[v1.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
  567. }
  568. if config.CpuRequest > 0 || config.MemRequest > 0 {
  569. template.Spec.Containers[0].Resources.Requests = v1.ResourceList{}
  570. }
  571. if config.CpuRequest > 0 {
  572. template.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
  573. }
  574. if config.MemRequest > 0 {
  575. template.Spec.Containers[0].Resources.Requests[v1.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
  576. }
  577. if config.GpuLimit > 0 {
  578. template.Spec.Containers[0].Resources.Limits["nvidia.com/gpu"] = *resource.NewQuantity(config.GpuLimit, resource.DecimalSI)
  579. }
  580. if len(config.Volumes) > 0 {
  581. template.Spec.Volumes = config.Volumes
  582. }
  583. if len(config.VolumeMounts) > 0 {
  584. template.Spec.Containers[0].VolumeMounts = config.VolumeMounts
  585. }
  586. if config.PriorityClassName != "" {
  587. template.Spec.PriorityClassName = config.PriorityClassName
  588. }
  589. }
  590. type RCStartupStatus struct {
  591. Expected int
  592. Terminating int
  593. Running int
  594. RunningButNotReady int
  595. Waiting int
  596. Pending int
  597. Scheduled int
  598. Unknown int
  599. Inactive int
  600. FailedContainers int
  601. Created []*v1.Pod
  602. ContainerRestartNodes sets.String
  603. }
  604. func (s *RCStartupStatus) String(name string) string {
  605. return fmt.Sprintf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
  606. name, len(s.Created), s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady)
  607. }
  608. func ComputeRCStartupStatus(pods []*v1.Pod, expected int) RCStartupStatus {
  609. startupStatus := RCStartupStatus{
  610. Expected: expected,
  611. Created: make([]*v1.Pod, 0, expected),
  612. ContainerRestartNodes: sets.NewString(),
  613. }
  614. for _, p := range pods {
  615. if p.DeletionTimestamp != nil {
  616. startupStatus.Terminating++
  617. continue
  618. }
  619. startupStatus.Created = append(startupStatus.Created, p)
  620. if p.Status.Phase == v1.PodRunning {
  621. ready := false
  622. for _, c := range p.Status.Conditions {
  623. if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
  624. ready = true
  625. break
  626. }
  627. }
  628. if ready {
  629. // Only count a pod is running when it is also ready.
  630. startupStatus.Running++
  631. } else {
  632. startupStatus.RunningButNotReady++
  633. }
  634. for _, v := range FailedContainers(p) {
  635. startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts
  636. startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName)
  637. }
  638. } else if p.Status.Phase == v1.PodPending {
  639. if p.Spec.NodeName == "" {
  640. startupStatus.Waiting++
  641. } else {
  642. startupStatus.Pending++
  643. }
  644. } else if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed {
  645. startupStatus.Inactive++
  646. } else if p.Status.Phase == v1.PodUnknown {
  647. startupStatus.Unknown++
  648. }
  649. // Record count of scheduled pods (useful for computing scheduler throughput).
  650. if p.Spec.NodeName != "" {
  651. startupStatus.Scheduled++
  652. }
  653. }
  654. return startupStatus
  655. }
  656. func (config *RCConfig) start() error {
  657. // Don't force tests to fail if they don't care about containers restarting.
  658. var maxContainerFailures int
  659. if config.MaxContainerFailures == nil {
  660. maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
  661. } else {
  662. maxContainerFailures = *config.MaxContainerFailures
  663. }
  664. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
  665. ps, err := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
  666. if err != nil {
  667. return err
  668. }
  669. defer ps.Stop()
  670. interval := config.PollInterval
  671. if interval <= 0 {
  672. interval = 10 * time.Second
  673. }
  674. timeout := config.Timeout
  675. if timeout <= 0 {
  676. timeout = 5 * time.Minute
  677. }
  678. oldPods := make([]*v1.Pod, 0)
  679. oldRunning := 0
  680. lastChange := time.Now()
  681. for oldRunning != config.Replicas {
  682. time.Sleep(interval)
  683. pods := ps.List()
  684. startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
  685. if config.CreatedPods != nil {
  686. *config.CreatedPods = startupStatus.Created
  687. }
  688. if !config.Silent {
  689. config.RCConfigLog(startupStatus.String(config.Name))
  690. }
  691. if config.PodStatusFile != nil {
  692. fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady)
  693. }
  694. if startupStatus.FailedContainers > maxContainerFailures {
  695. if config.NodeDumpFunc != nil {
  696. config.NodeDumpFunc(config.Client, startupStatus.ContainerRestartNodes.List(), config.RCConfigLog)
  697. }
  698. if config.ContainerDumpFunc != nil {
  699. // Get the logs from the failed containers to help diagnose what caused them to fail
  700. config.ContainerDumpFunc(config.Client, config.Namespace, config.RCConfigLog)
  701. }
  702. return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures)
  703. }
  704. diff := Diff(oldPods, pods)
  705. deletedPods := diff.DeletedPods()
  706. if len(deletedPods) != 0 {
  707. // There are some pods that have disappeared.
  708. err := fmt.Errorf("%d pods disappeared for %s: %v", len(deletedPods), config.Name, strings.Join(deletedPods, ", "))
  709. config.RCConfigLog(err.Error())
  710. config.RCConfigLog(diff.String(sets.NewString()))
  711. return err
  712. }
  713. if len(pods) > len(oldPods) || startupStatus.Running > oldRunning {
  714. lastChange = time.Now()
  715. }
  716. oldPods = pods
  717. oldRunning = startupStatus.Running
  718. if time.Since(lastChange) > timeout {
  719. break
  720. }
  721. }
  722. if oldRunning != config.Replicas {
  723. // List only pods from a given replication controller.
  724. options := metav1.ListOptions{LabelSelector: label.String()}
  725. if pods, err := config.Client.CoreV1().Pods(config.Namespace).List(options); err == nil {
  726. for _, pod := range pods.Items {
  727. config.RCConfigLog("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp)
  728. }
  729. } else {
  730. config.RCConfigLog("Can't list pod debug info: %v", err)
  731. }
  732. return fmt.Errorf("Only %d pods started out of %d", oldRunning, config.Replicas)
  733. }
  734. return nil
  735. }
  736. // Simplified version of RunRC, that does not create RC, but creates plain Pods.
  737. // Optionally waits for pods to start running (if waitForRunning == true).
  738. // The number of replicas must be non-zero.
  739. func StartPods(c clientset.Interface, replicas int, namespace string, podNamePrefix string,
  740. pod v1.Pod, waitForRunning bool, logFunc func(fmt string, args ...interface{})) error {
  741. // no pod to start
  742. if replicas < 1 {
  743. panic("StartPods: number of replicas must be non-zero")
  744. }
  745. startPodsID := string(uuid.NewUUID()) // So that we can label and find them
  746. for i := 0; i < replicas; i++ {
  747. podName := fmt.Sprintf("%v-%v", podNamePrefix, i)
  748. pod.ObjectMeta.Name = podName
  749. pod.ObjectMeta.Labels["name"] = podName
  750. pod.ObjectMeta.Labels["startPodsID"] = startPodsID
  751. pod.Spec.Containers[0].Name = podName
  752. if err := CreatePodWithRetries(c, namespace, &pod); err != nil {
  753. return err
  754. }
  755. }
  756. logFunc("Waiting for running...")
  757. if waitForRunning {
  758. label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID}))
  759. err := WaitForPodsWithLabelRunning(c, namespace, label)
  760. if err != nil {
  761. return fmt.Errorf("Error waiting for %d pods to be running - probably a timeout: %v", replicas, err)
  762. }
  763. }
  764. return nil
  765. }
  766. // Wait up to 10 minutes for all matching pods to become Running and at least one
  767. // matching pod exists.
  768. func WaitForPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector) error {
  769. return WaitForEnoughPodsWithLabelRunning(c, ns, label, -1)
  770. }
  771. // Wait up to 10 minutes for at least 'replicas' many pods to be Running and at least
  772. // one matching pod exists. If 'replicas' is < 0, wait for all matching pods running.
  773. func WaitForEnoughPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector, replicas int) error {
  774. running := false
  775. ps, err := NewPodStore(c, ns, label, fields.Everything())
  776. if err != nil {
  777. return err
  778. }
  779. defer ps.Stop()
  780. for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
  781. pods := ps.List()
  782. if len(pods) == 0 {
  783. continue
  784. }
  785. runningPodsCount := 0
  786. for _, p := range pods {
  787. if p.Status.Phase == v1.PodRunning {
  788. runningPodsCount++
  789. }
  790. }
  791. if (replicas < 0 && runningPodsCount < len(pods)) || (runningPodsCount < replicas) {
  792. continue
  793. }
  794. running = true
  795. break
  796. }
  797. if !running {
  798. return fmt.Errorf("Timeout while waiting for pods with labels %q to be running", label.String())
  799. }
  800. return nil
  801. }
  802. type CountToStrategy struct {
  803. Count int
  804. Strategy PrepareNodeStrategy
  805. }
  806. type TestNodePreparer interface {
  807. PrepareNodes() error
  808. CleanupNodes() error
  809. }
  810. type PrepareNodeStrategy interface {
  811. PreparePatch(node *v1.Node) []byte
  812. CleanupNode(node *v1.Node) *v1.Node
  813. }
  814. type TrivialNodePrepareStrategy struct{}
  815. func (*TrivialNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
  816. return []byte{}
  817. }
  818. func (*TrivialNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
  819. nodeCopy := *node
  820. return &nodeCopy
  821. }
  822. type LabelNodePrepareStrategy struct {
  823. labelKey string
  824. labelValue string
  825. }
  826. func NewLabelNodePrepareStrategy(labelKey string, labelValue string) *LabelNodePrepareStrategy {
  827. return &LabelNodePrepareStrategy{
  828. labelKey: labelKey,
  829. labelValue: labelValue,
  830. }
  831. }
  832. func (s *LabelNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
  833. labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.labelKey, s.labelValue)
  834. patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
  835. return []byte(patch)
  836. }
  837. func (s *LabelNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
  838. nodeCopy := node.DeepCopy()
  839. if node.Labels != nil && len(node.Labels[s.labelKey]) != 0 {
  840. delete(nodeCopy.Labels, s.labelKey)
  841. }
  842. return nodeCopy
  843. }
  844. func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNodeStrategy) error {
  845. var err error
  846. patch := strategy.PreparePatch(node)
  847. if len(patch) == 0 {
  848. return nil
  849. }
  850. for attempt := 0; attempt < retries; attempt++ {
  851. if _, err = client.CoreV1().Nodes().Patch(node.Name, types.MergePatchType, []byte(patch)); err == nil {
  852. return nil
  853. }
  854. if !apierrs.IsConflict(err) {
  855. return fmt.Errorf("Error while applying patch %v to Node %v: %v", string(patch), node.Name, err)
  856. }
  857. time.Sleep(100 * time.Millisecond)
  858. }
  859. return fmt.Errorf("To many conflicts when applying patch %v to Node %v", string(patch), node.Name)
  860. }
  861. func DoCleanupNode(client clientset.Interface, nodeName string, strategy PrepareNodeStrategy) error {
  862. for attempt := 0; attempt < retries; attempt++ {
  863. node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
  864. if err != nil {
  865. return fmt.Errorf("Skipping cleanup of Node: failed to get Node %v: %v", nodeName, err)
  866. }
  867. updatedNode := strategy.CleanupNode(node)
  868. if apiequality.Semantic.DeepEqual(node, updatedNode) {
  869. return nil
  870. }
  871. if _, err = client.CoreV1().Nodes().Update(updatedNode); err == nil {
  872. return nil
  873. }
  874. if !apierrs.IsConflict(err) {
  875. return fmt.Errorf("Error when updating Node %v: %v", nodeName, err)
  876. }
  877. time.Sleep(100 * time.Millisecond)
  878. }
  879. return fmt.Errorf("To many conflicts when trying to cleanup Node %v", nodeName)
  880. }
  881. type TestPodCreateStrategy func(client clientset.Interface, namespace string, podCount int) error
  882. type CountToPodStrategy struct {
  883. Count int
  884. Strategy TestPodCreateStrategy
  885. }
  886. type TestPodCreatorConfig map[string][]CountToPodStrategy
  887. func NewTestPodCreatorConfig() *TestPodCreatorConfig {
  888. config := make(TestPodCreatorConfig)
  889. return &config
  890. }
  891. func (c *TestPodCreatorConfig) AddStrategy(
  892. namespace string, podCount int, strategy TestPodCreateStrategy) {
  893. (*c)[namespace] = append((*c)[namespace], CountToPodStrategy{Count: podCount, Strategy: strategy})
  894. }
  895. type TestPodCreator struct {
  896. Client clientset.Interface
  897. // namespace -> count -> strategy
  898. Config *TestPodCreatorConfig
  899. }
  900. func NewTestPodCreator(client clientset.Interface, config *TestPodCreatorConfig) *TestPodCreator {
  901. return &TestPodCreator{
  902. Client: client,
  903. Config: config,
  904. }
  905. }
  906. func (c *TestPodCreator) CreatePods() error {
  907. for ns, v := range *(c.Config) {
  908. for _, countToStrategy := range v {
  909. if err := countToStrategy.Strategy(c.Client, ns, countToStrategy.Count); err != nil {
  910. return err
  911. }
  912. }
  913. }
  914. return nil
  915. }
  916. func MakePodSpec() v1.PodSpec {
  917. return v1.PodSpec{
  918. Containers: []v1.Container{{
  919. Name: "pause",
  920. Image: "k8s.gcr.io/pause:3.1",
  921. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  922. Resources: v1.ResourceRequirements{
  923. Limits: v1.ResourceList{
  924. v1.ResourceCPU: resource.MustParse("100m"),
  925. v1.ResourceMemory: resource.MustParse("500Mi"),
  926. },
  927. Requests: v1.ResourceList{
  928. v1.ResourceCPU: resource.MustParse("100m"),
  929. v1.ResourceMemory: resource.MustParse("500Mi"),
  930. },
  931. },
  932. }},
  933. }
  934. }
  935. func makeCreatePod(client clientset.Interface, namespace string, podTemplate *v1.Pod) error {
  936. if err := CreatePodWithRetries(client, namespace, podTemplate); err != nil {
  937. return fmt.Errorf("Error creating pod: %v", err)
  938. }
  939. return nil
  940. }
  941. func CreatePod(client clientset.Interface, namespace string, podCount int, podTemplate *v1.Pod) error {
  942. var createError error
  943. lock := sync.Mutex{}
  944. createPodFunc := func(i int) {
  945. if err := makeCreatePod(client, namespace, podTemplate); err != nil {
  946. lock.Lock()
  947. defer lock.Unlock()
  948. createError = err
  949. }
  950. }
  951. if podCount < 30 {
  952. workqueue.ParallelizeUntil(context.TODO(), podCount, podCount, createPodFunc)
  953. } else {
  954. workqueue.ParallelizeUntil(context.TODO(), 30, podCount, createPodFunc)
  955. }
  956. return createError
  957. }
  958. func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *v1.Pod) error {
  959. rc := &v1.ReplicationController{
  960. ObjectMeta: metav1.ObjectMeta{
  961. Name: controllerName,
  962. },
  963. Spec: v1.ReplicationControllerSpec{
  964. Replicas: func(i int) *int32 { x := int32(i); return &x }(podCount),
  965. Selector: map[string]string{"name": controllerName},
  966. Template: &v1.PodTemplateSpec{
  967. ObjectMeta: metav1.ObjectMeta{
  968. Labels: map[string]string{"name": controllerName},
  969. },
  970. Spec: podTemplate.Spec,
  971. },
  972. },
  973. }
  974. if err := CreateRCWithRetries(client, namespace, rc); err != nil {
  975. return fmt.Errorf("Error creating replication controller: %v", err)
  976. }
  977. return nil
  978. }
  979. func NewCustomCreatePodStrategy(podTemplate *v1.Pod) TestPodCreateStrategy {
  980. return func(client clientset.Interface, namespace string, podCount int) error {
  981. return CreatePod(client, namespace, podCount, podTemplate)
  982. }
  983. }
  984. func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
  985. basePod := &v1.Pod{
  986. ObjectMeta: metav1.ObjectMeta{
  987. GenerateName: "simple-pod-",
  988. },
  989. Spec: MakePodSpec(),
  990. }
  991. return NewCustomCreatePodStrategy(basePod)
  992. }
  993. func NewSimpleWithControllerCreatePodStrategy(controllerName string) TestPodCreateStrategy {
  994. return func(client clientset.Interface, namespace string, podCount int) error {
  995. basePod := &v1.Pod{
  996. ObjectMeta: metav1.ObjectMeta{
  997. GenerateName: controllerName + "-pod-",
  998. Labels: map[string]string{"name": controllerName},
  999. },
  1000. Spec: MakePodSpec(),
  1001. }
  1002. if err := createController(client, controllerName, namespace, podCount, basePod); err != nil {
  1003. return err
  1004. }
  1005. return CreatePod(client, namespace, podCount, basePod)
  1006. }
  1007. }
  1008. type SecretConfig struct {
  1009. Content map[string]string
  1010. Client clientset.Interface
  1011. Name string
  1012. Namespace string
  1013. // If set this function will be used to print log lines instead of klog.
  1014. LogFunc func(fmt string, args ...interface{})
  1015. }
  1016. func (config *SecretConfig) Run() error {
  1017. secret := &v1.Secret{
  1018. ObjectMeta: metav1.ObjectMeta{
  1019. Name: config.Name,
  1020. },
  1021. StringData: map[string]string{},
  1022. }
  1023. for k, v := range config.Content {
  1024. secret.StringData[k] = v
  1025. }
  1026. if err := CreateSecretWithRetries(config.Client, config.Namespace, secret); err != nil {
  1027. return fmt.Errorf("Error creating secret: %v", err)
  1028. }
  1029. config.LogFunc("Created secret %v/%v", config.Namespace, config.Name)
  1030. return nil
  1031. }
  1032. func (config *SecretConfig) Stop() error {
  1033. if err := DeleteResourceWithRetries(config.Client, api.Kind("Secret"), config.Namespace, config.Name, &metav1.DeleteOptions{}); err != nil {
  1034. return fmt.Errorf("Error deleting secret: %v", err)
  1035. }
  1036. config.LogFunc("Deleted secret %v/%v", config.Namespace, config.Name)
  1037. return nil
  1038. }
  1039. // TODO: attach secrets using different possibilities: env vars, image pull secrets.
  1040. func attachSecrets(template *v1.PodTemplateSpec, secretNames []string) {
  1041. volumes := make([]v1.Volume, 0, len(secretNames))
  1042. mounts := make([]v1.VolumeMount, 0, len(secretNames))
  1043. for _, name := range secretNames {
  1044. volumes = append(volumes, v1.Volume{
  1045. Name: name,
  1046. VolumeSource: v1.VolumeSource{
  1047. Secret: &v1.SecretVolumeSource{
  1048. SecretName: name,
  1049. },
  1050. },
  1051. })
  1052. mounts = append(mounts, v1.VolumeMount{
  1053. Name: name,
  1054. MountPath: fmt.Sprintf("/%v", name),
  1055. })
  1056. }
  1057. template.Spec.Volumes = volumes
  1058. template.Spec.Containers[0].VolumeMounts = mounts
  1059. }
  1060. type ConfigMapConfig struct {
  1061. Content map[string]string
  1062. Client clientset.Interface
  1063. Name string
  1064. Namespace string
  1065. // If set this function will be used to print log lines instead of klog.
  1066. LogFunc func(fmt string, args ...interface{})
  1067. }
  1068. func (config *ConfigMapConfig) Run() error {
  1069. configMap := &v1.ConfigMap{
  1070. ObjectMeta: metav1.ObjectMeta{
  1071. Name: config.Name,
  1072. },
  1073. Data: map[string]string{},
  1074. }
  1075. for k, v := range config.Content {
  1076. configMap.Data[k] = v
  1077. }
  1078. if err := CreateConfigMapWithRetries(config.Client, config.Namespace, configMap); err != nil {
  1079. return fmt.Errorf("Error creating configmap: %v", err)
  1080. }
  1081. config.LogFunc("Created configmap %v/%v", config.Namespace, config.Name)
  1082. return nil
  1083. }
  1084. func (config *ConfigMapConfig) Stop() error {
  1085. if err := DeleteResourceWithRetries(config.Client, api.Kind("ConfigMap"), config.Namespace, config.Name, &metav1.DeleteOptions{}); err != nil {
  1086. return fmt.Errorf("Error deleting configmap: %v", err)
  1087. }
  1088. config.LogFunc("Deleted configmap %v/%v", config.Namespace, config.Name)
  1089. return nil
  1090. }
  1091. // TODO: attach configmaps using different possibilities: env vars.
  1092. func attachConfigMaps(template *v1.PodTemplateSpec, configMapNames []string) {
  1093. volumes := make([]v1.Volume, 0, len(configMapNames))
  1094. mounts := make([]v1.VolumeMount, 0, len(configMapNames))
  1095. for _, name := range configMapNames {
  1096. volumes = append(volumes, v1.Volume{
  1097. Name: name,
  1098. VolumeSource: v1.VolumeSource{
  1099. ConfigMap: &v1.ConfigMapVolumeSource{
  1100. LocalObjectReference: v1.LocalObjectReference{
  1101. Name: name,
  1102. },
  1103. },
  1104. },
  1105. })
  1106. mounts = append(mounts, v1.VolumeMount{
  1107. Name: name,
  1108. MountPath: fmt.Sprintf("/%v", name),
  1109. })
  1110. }
  1111. template.Spec.Volumes = volumes
  1112. template.Spec.Containers[0].VolumeMounts = mounts
  1113. }
  1114. func attachServiceAccountTokenProjection(template *v1.PodTemplateSpec, name string) {
  1115. template.Spec.Containers[0].VolumeMounts = append(template.Spec.Containers[0].VolumeMounts,
  1116. v1.VolumeMount{
  1117. Name: name,
  1118. MountPath: "/var/service-account-tokens/" + name,
  1119. })
  1120. template.Spec.Volumes = append(template.Spec.Volumes,
  1121. v1.Volume{
  1122. Name: name,
  1123. VolumeSource: v1.VolumeSource{
  1124. Projected: &v1.ProjectedVolumeSource{
  1125. Sources: []v1.VolumeProjection{
  1126. {
  1127. ServiceAccountToken: &v1.ServiceAccountTokenProjection{
  1128. Path: "token",
  1129. Audience: name,
  1130. },
  1131. },
  1132. {
  1133. ConfigMap: &v1.ConfigMapProjection{
  1134. LocalObjectReference: v1.LocalObjectReference{
  1135. Name: "kube-root-ca-crt",
  1136. },
  1137. Items: []v1.KeyToPath{
  1138. {
  1139. Key: "ca.crt",
  1140. Path: "ca.crt",
  1141. },
  1142. },
  1143. },
  1144. },
  1145. {
  1146. DownwardAPI: &v1.DownwardAPIProjection{
  1147. Items: []v1.DownwardAPIVolumeFile{
  1148. {
  1149. Path: "namespace",
  1150. FieldRef: &v1.ObjectFieldSelector{
  1151. APIVersion: "v1",
  1152. FieldPath: "metadata.namespace",
  1153. },
  1154. },
  1155. },
  1156. },
  1157. },
  1158. },
  1159. },
  1160. },
  1161. })
  1162. }
  1163. type DaemonConfig struct {
  1164. Client clientset.Interface
  1165. Name string
  1166. Namespace string
  1167. Image string
  1168. // If set this function will be used to print log lines instead of klog.
  1169. LogFunc func(fmt string, args ...interface{})
  1170. // How long we wait for DaemonSet to become running.
  1171. Timeout time.Duration
  1172. }
  1173. func (config *DaemonConfig) Run() error {
  1174. if config.Image == "" {
  1175. config.Image = "k8s.gcr.io/pause:3.1"
  1176. }
  1177. nameLabel := map[string]string{
  1178. "name": config.Name + "-daemon",
  1179. }
  1180. daemon := &apps.DaemonSet{
  1181. ObjectMeta: metav1.ObjectMeta{
  1182. Name: config.Name,
  1183. },
  1184. Spec: apps.DaemonSetSpec{
  1185. Template: v1.PodTemplateSpec{
  1186. ObjectMeta: metav1.ObjectMeta{
  1187. Labels: nameLabel,
  1188. },
  1189. Spec: v1.PodSpec{
  1190. Containers: []v1.Container{
  1191. {
  1192. Name: config.Name,
  1193. Image: config.Image,
  1194. },
  1195. },
  1196. },
  1197. },
  1198. },
  1199. }
  1200. if err := CreateDaemonSetWithRetries(config.Client, config.Namespace, daemon); err != nil {
  1201. return fmt.Errorf("Error creating daemonset: %v", err)
  1202. }
  1203. var nodes *v1.NodeList
  1204. var err error
  1205. for i := 0; i < retries; i++ {
  1206. // Wait for all daemons to be running
  1207. nodes, err = config.Client.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"})
  1208. if err == nil {
  1209. break
  1210. } else if i+1 == retries {
  1211. return fmt.Errorf("Error listing Nodes while waiting for DaemonSet %v: %v", config.Name, err)
  1212. }
  1213. }
  1214. timeout := config.Timeout
  1215. if timeout <= 0 {
  1216. timeout = 5 * time.Minute
  1217. }
  1218. ps, err := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
  1219. if err != nil {
  1220. return err
  1221. }
  1222. defer ps.Stop()
  1223. err = wait.Poll(time.Second, timeout, func() (bool, error) {
  1224. pods := ps.List()
  1225. nodeHasDaemon := sets.NewString()
  1226. for _, pod := range pods {
  1227. podReady, _ := PodRunningReady(pod)
  1228. if pod.Spec.NodeName != "" && podReady {
  1229. nodeHasDaemon.Insert(pod.Spec.NodeName)
  1230. }
  1231. }
  1232. running := len(nodeHasDaemon)
  1233. config.LogFunc("Found %v/%v Daemons %v running", running, config.Name, len(nodes.Items))
  1234. return running == len(nodes.Items), nil
  1235. })
  1236. if err != nil {
  1237. config.LogFunc("Timed out while waiting for DaemonsSet %v/%v to be running.", config.Namespace, config.Name)
  1238. } else {
  1239. config.LogFunc("Created Daemon %v/%v", config.Namespace, config.Name)
  1240. }
  1241. return err
  1242. }