runners.go 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package utils
  14. import (
  15. "context"
  16. "fmt"
  17. "math"
  18. "os"
  19. "strings"
  20. "sync"
  21. "time"
  22. apps "k8s.io/api/apps/v1"
  23. batch "k8s.io/api/batch/v1"
  24. v1 "k8s.io/api/core/v1"
  25. storagev1beta1 "k8s.io/api/storage/v1beta1"
  26. apiequality "k8s.io/apimachinery/pkg/api/equality"
  27. apierrors "k8s.io/apimachinery/pkg/api/errors"
  28. "k8s.io/apimachinery/pkg/api/resource"
  29. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  30. "k8s.io/apimachinery/pkg/fields"
  31. "k8s.io/apimachinery/pkg/labels"
  32. "k8s.io/apimachinery/pkg/runtime/schema"
  33. "k8s.io/apimachinery/pkg/types"
  34. "k8s.io/apimachinery/pkg/util/json"
  35. "k8s.io/apimachinery/pkg/util/sets"
  36. "k8s.io/apimachinery/pkg/util/strategicpatch"
  37. "k8s.io/apimachinery/pkg/util/uuid"
  38. "k8s.io/apimachinery/pkg/util/wait"
  39. clientset "k8s.io/client-go/kubernetes"
  40. scaleclient "k8s.io/client-go/scale"
  41. "k8s.io/client-go/util/workqueue"
  42. batchinternal "k8s.io/kubernetes/pkg/apis/batch"
  43. api "k8s.io/kubernetes/pkg/apis/core"
  44. extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
  45. "k8s.io/klog"
  46. )
  // nonExist is the placeholder stored as a pod's hostname and phase when the
  // pod is missing from one side of a PodDiff comparison: it marks the "old"
  // values of a newly created pod and the "current" values of a deleted one.
  const nonExist = "NonExist"
  // removePtr dereferences replicas, treating a nil pointer as zero.
  func removePtr(replicas *int32) int32 {
  	if replicas != nil {
  		return *replicas
  	}
  	return 0
  }
  57. func WaitUntilPodIsScheduled(c clientset.Interface, name, namespace string, timeout time.Duration) (*v1.Pod, error) {
  58. // Wait until it's scheduled
  59. p, err := c.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{ResourceVersion: "0"})
  60. if err == nil && p.Spec.NodeName != "" {
  61. return p, nil
  62. }
  63. pollingPeriod := 200 * time.Millisecond
  64. startTime := time.Now()
  65. for startTime.Add(timeout).After(time.Now()) {
  66. time.Sleep(pollingPeriod)
  67. p, err := c.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{ResourceVersion: "0"})
  68. if err == nil && p.Spec.NodeName != "" {
  69. return p, nil
  70. }
  71. }
  72. return nil, fmt.Errorf("Timed out after %v when waiting for pod %v/%v to start.", timeout, namespace, name)
  73. }
  74. func RunPodAndGetNodeName(c clientset.Interface, pod *v1.Pod, timeout time.Duration) (string, error) {
  75. name := pod.Name
  76. namespace := pod.Namespace
  77. if err := CreatePodWithRetries(c, namespace, pod); err != nil {
  78. return "", err
  79. }
  80. p, err := WaitUntilPodIsScheduled(c, name, namespace, timeout)
  81. if err != nil {
  82. return "", err
  83. }
  84. return p.Spec.NodeName, nil
  85. }
  86. type RunObjectConfig interface {
  87. Run() error
  88. GetName() string
  89. GetNamespace() string
  90. GetKind() schema.GroupKind
  91. GetClient() clientset.Interface
  92. GetScalesGetter() scaleclient.ScalesGetter
  93. SetClient(clientset.Interface)
  94. SetScalesClient(scaleclient.ScalesGetter)
  95. GetReplicas() int
  96. GetLabelValue(string) (string, bool)
  97. GetGroupResource() schema.GroupResource
  98. GetGroupVersionResource() schema.GroupVersionResource
  99. }
  100. type RCConfig struct {
  101. Affinity *v1.Affinity
  102. Client clientset.Interface
  103. ScalesGetter scaleclient.ScalesGetter
  104. Image string
  105. Command []string
  106. Name string
  107. Namespace string
  108. PollInterval time.Duration
  109. Timeout time.Duration
  110. PodStatusFile *os.File
  111. Replicas int
  112. CpuRequest int64 // millicores
  113. CpuLimit int64 // millicores
  114. MemRequest int64 // bytes
  115. MemLimit int64 // bytes
  116. GpuLimit int64 // count
  117. ReadinessProbe *v1.Probe
  118. DNSPolicy *v1.DNSPolicy
  119. PriorityClassName string
  120. TerminationGracePeriodSeconds *int64
  121. Lifecycle *v1.Lifecycle
  122. // Env vars, set the same for every pod.
  123. Env map[string]string
  124. // Extra labels and annotations added to every pod.
  125. Labels map[string]string
  126. Annotations map[string]string
  127. // Node selector for pods in the RC.
  128. NodeSelector map[string]string
  129. // Tolerations for pods in the RC.
  130. Tolerations []v1.Toleration
  131. // Ports to declare in the container (map of name to containerPort).
  132. Ports map[string]int
  133. // Ports to declare in the container as host and container ports.
  134. HostPorts map[string]int
  135. Volumes []v1.Volume
  136. VolumeMounts []v1.VolumeMount
  137. // Pointer to a list of pods; if non-nil, will be set to a list of pods
  138. // created by this RC by RunRC.
  139. CreatedPods *[]*v1.Pod
  140. // Maximum allowable container failures. If exceeded, RunRC returns an error.
  141. // Defaults to replicas*0.1 if unspecified.
  142. MaxContainerFailures *int
  143. // Maximum allowed pod deletions count. If exceeded, RunRC returns an error.
  144. // Defaults to 0.
  145. MaxAllowedPodDeletions int
  146. // If set to false starting RC will print progress, otherwise only errors will be printed.
  147. Silent bool
  148. // If set this function will be used to print log lines instead of klog.
  149. LogFunc func(fmt string, args ...interface{})
  150. // If set those functions will be used to gather data from Nodes - in integration tests where no
  151. // kubelets are running those variables should be nil.
  152. NodeDumpFunc func(c clientset.Interface, nodeNames []string, logFunc func(fmt string, args ...interface{}))
  153. ContainerDumpFunc func(c clientset.Interface, ns string, logFunc func(ftm string, args ...interface{}))
  154. // Names of the secrets and configmaps to mount.
  155. SecretNames []string
  156. ConfigMapNames []string
  157. ServiceAccountTokenProjections int
  158. }
  159. func (rc *RCConfig) RCConfigLog(fmt string, args ...interface{}) {
  160. if rc.LogFunc != nil {
  161. rc.LogFunc(fmt, args...)
  162. }
  163. klog.Infof(fmt, args...)
  164. }
  165. type DeploymentConfig struct {
  166. RCConfig
  167. }
  168. type ReplicaSetConfig struct {
  169. RCConfig
  170. }
  171. type JobConfig struct {
  172. RCConfig
  173. }
  // podInfo records, for a single pod, the node name and phase seen in an
  // older and a newer pod listing. It exists to help debug e2e tests.
  type podInfo struct {
  	// Values from the older listing.
  	oldHostname, oldPhase string
  	// Values from the newer listing.
  	hostname, phase string
  }
  181. // PodDiff is a map of pod name to podInfos
  182. type PodDiff map[string]*podInfo
  183. // Print formats and prints the give PodDiff.
  184. func (p PodDiff) String(ignorePhases sets.String) string {
  185. ret := ""
  186. for name, info := range p {
  187. if ignorePhases.Has(info.phase) {
  188. continue
  189. }
  190. if info.phase == nonExist {
  191. ret += fmt.Sprintf("Pod %v was deleted, had phase %v and host %v\n", name, info.oldPhase, info.oldHostname)
  192. continue
  193. }
  194. phaseChange, hostChange := false, false
  195. msg := fmt.Sprintf("Pod %v ", name)
  196. if info.oldPhase != info.phase {
  197. phaseChange = true
  198. if info.oldPhase == nonExist {
  199. msg += fmt.Sprintf("in phase %v ", info.phase)
  200. } else {
  201. msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase)
  202. }
  203. }
  204. if info.oldHostname != info.hostname {
  205. hostChange = true
  206. if info.oldHostname == nonExist || info.oldHostname == "" {
  207. msg += fmt.Sprintf("assigned host %v ", info.hostname)
  208. } else {
  209. msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname)
  210. }
  211. }
  212. if phaseChange || hostChange {
  213. ret += msg + "\n"
  214. }
  215. }
  216. return ret
  217. }
  218. // DeletedPods returns a slice of pods that were present at the beginning
  219. // and then disappeared.
  220. func (p PodDiff) DeletedPods() []string {
  221. var deletedPods []string
  222. for podName, podInfo := range p {
  223. if podInfo.hostname == nonExist {
  224. deletedPods = append(deletedPods, podName)
  225. }
  226. }
  227. return deletedPods
  228. }
  229. // Diff computes a PodDiff given 2 lists of pods.
  230. func Diff(oldPods []*v1.Pod, curPods []*v1.Pod) PodDiff {
  231. podInfoMap := PodDiff{}
  232. // New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
  233. for _, pod := range curPods {
  234. podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
  235. }
  236. // Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
  237. for _, pod := range oldPods {
  238. if info, ok := podInfoMap[pod.Name]; ok {
  239. info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase)
  240. } else {
  241. podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)}
  242. }
  243. }
  244. return podInfoMap
  245. }
  246. // RunDeployment Launches (and verifies correctness) of a Deployment
  247. // and will wait for all pods it spawns to become "Running".
  248. // It's the caller's responsibility to clean up externally (i.e. use the
  249. // namespace lifecycle for handling Cleanup).
  250. func RunDeployment(config DeploymentConfig) error {
  251. err := config.create()
  252. if err != nil {
  253. return err
  254. }
  255. return config.start()
  256. }
  257. func (config *DeploymentConfig) Run() error {
  258. return RunDeployment(*config)
  259. }
  260. func (config *DeploymentConfig) GetKind() schema.GroupKind {
  261. return extensionsinternal.Kind("Deployment")
  262. }
  263. func (config *DeploymentConfig) GetGroupResource() schema.GroupResource {
  264. return extensionsinternal.Resource("deployments")
  265. }
  266. func (config *DeploymentConfig) GetGroupVersionResource() schema.GroupVersionResource {
  267. return extensionsinternal.SchemeGroupVersion.WithResource("deployments")
  268. }
  269. func (config *DeploymentConfig) create() error {
  270. deployment := &apps.Deployment{
  271. ObjectMeta: metav1.ObjectMeta{
  272. Name: config.Name,
  273. },
  274. Spec: apps.DeploymentSpec{
  275. Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  276. Selector: &metav1.LabelSelector{
  277. MatchLabels: map[string]string{
  278. "name": config.Name,
  279. },
  280. },
  281. Template: v1.PodTemplateSpec{
  282. ObjectMeta: metav1.ObjectMeta{
  283. Labels: map[string]string{"name": config.Name},
  284. Annotations: config.Annotations,
  285. },
  286. Spec: v1.PodSpec{
  287. Affinity: config.Affinity,
  288. TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
  289. Containers: []v1.Container{
  290. {
  291. Name: config.Name,
  292. Image: config.Image,
  293. Command: config.Command,
  294. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  295. Lifecycle: config.Lifecycle,
  296. },
  297. },
  298. },
  299. },
  300. },
  301. }
  302. if len(config.SecretNames) > 0 {
  303. attachSecrets(&deployment.Spec.Template, config.SecretNames)
  304. }
  305. if len(config.ConfigMapNames) > 0 {
  306. attachConfigMaps(&deployment.Spec.Template, config.ConfigMapNames)
  307. }
  308. for i := 0; i < config.ServiceAccountTokenProjections; i++ {
  309. attachServiceAccountTokenProjection(&deployment.Spec.Template, fmt.Sprintf("tok-%d", i))
  310. }
  311. config.applyTo(&deployment.Spec.Template)
  312. if err := CreateDeploymentWithRetries(config.Client, config.Namespace, deployment); err != nil {
  313. return fmt.Errorf("Error creating deployment: %v", err)
  314. }
  315. config.RCConfigLog("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, removePtr(deployment.Spec.Replicas))
  316. return nil
  317. }
  318. // RunReplicaSet launches (and verifies correctness) of a ReplicaSet
  319. // and waits until all the pods it launches to reach the "Running" state.
  320. // It's the caller's responsibility to clean up externally (i.e. use the
  321. // namespace lifecycle for handling Cleanup).
  322. func RunReplicaSet(config ReplicaSetConfig) error {
  323. err := config.create()
  324. if err != nil {
  325. return err
  326. }
  327. return config.start()
  328. }
  329. func (config *ReplicaSetConfig) Run() error {
  330. return RunReplicaSet(*config)
  331. }
  332. func (config *ReplicaSetConfig) GetKind() schema.GroupKind {
  333. return extensionsinternal.Kind("ReplicaSet")
  334. }
  335. func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource {
  336. return extensionsinternal.Resource("replicasets")
  337. }
  338. func (config *ReplicaSetConfig) GetGroupVersionResource() schema.GroupVersionResource {
  339. return extensionsinternal.SchemeGroupVersion.WithResource("replicasets")
  340. }
  341. func (config *ReplicaSetConfig) create() error {
  342. rs := &apps.ReplicaSet{
  343. ObjectMeta: metav1.ObjectMeta{
  344. Name: config.Name,
  345. },
  346. Spec: apps.ReplicaSetSpec{
  347. Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  348. Selector: &metav1.LabelSelector{
  349. MatchLabels: map[string]string{
  350. "name": config.Name,
  351. },
  352. },
  353. Template: v1.PodTemplateSpec{
  354. ObjectMeta: metav1.ObjectMeta{
  355. Labels: map[string]string{"name": config.Name},
  356. Annotations: config.Annotations,
  357. },
  358. Spec: v1.PodSpec{
  359. Affinity: config.Affinity,
  360. TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
  361. Containers: []v1.Container{
  362. {
  363. Name: config.Name,
  364. Image: config.Image,
  365. Command: config.Command,
  366. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  367. Lifecycle: config.Lifecycle,
  368. },
  369. },
  370. },
  371. },
  372. },
  373. }
  374. if len(config.SecretNames) > 0 {
  375. attachSecrets(&rs.Spec.Template, config.SecretNames)
  376. }
  377. if len(config.ConfigMapNames) > 0 {
  378. attachConfigMaps(&rs.Spec.Template, config.ConfigMapNames)
  379. }
  380. config.applyTo(&rs.Spec.Template)
  381. if err := CreateReplicaSetWithRetries(config.Client, config.Namespace, rs); err != nil {
  382. return fmt.Errorf("Error creating replica set: %v", err)
  383. }
  384. config.RCConfigLog("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, removePtr(rs.Spec.Replicas))
  385. return nil
  386. }
  387. // RunJob baunches (and verifies correctness) of a Job
  388. // and will wait for all pods it spawns to become "Running".
  389. // It's the caller's responsibility to clean up externally (i.e. use the
  390. // namespace lifecycle for handling Cleanup).
  391. func RunJob(config JobConfig) error {
  392. err := config.create()
  393. if err != nil {
  394. return err
  395. }
  396. return config.start()
  397. }
  398. func (config *JobConfig) Run() error {
  399. return RunJob(*config)
  400. }
  401. func (config *JobConfig) GetKind() schema.GroupKind {
  402. return batchinternal.Kind("Job")
  403. }
  404. func (config *JobConfig) GetGroupResource() schema.GroupResource {
  405. return batchinternal.Resource("jobs")
  406. }
  407. func (config *JobConfig) GetGroupVersionResource() schema.GroupVersionResource {
  408. return batchinternal.SchemeGroupVersion.WithResource("jobs")
  409. }
  410. func (config *JobConfig) create() error {
  411. job := &batch.Job{
  412. ObjectMeta: metav1.ObjectMeta{
  413. Name: config.Name,
  414. },
  415. Spec: batch.JobSpec{
  416. Parallelism: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  417. Completions: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  418. Template: v1.PodTemplateSpec{
  419. ObjectMeta: metav1.ObjectMeta{
  420. Labels: map[string]string{"name": config.Name},
  421. Annotations: config.Annotations,
  422. },
  423. Spec: v1.PodSpec{
  424. Affinity: config.Affinity,
  425. TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
  426. Containers: []v1.Container{
  427. {
  428. Name: config.Name,
  429. Image: config.Image,
  430. Command: config.Command,
  431. Lifecycle: config.Lifecycle,
  432. },
  433. },
  434. RestartPolicy: v1.RestartPolicyOnFailure,
  435. },
  436. },
  437. },
  438. }
  439. if len(config.SecretNames) > 0 {
  440. attachSecrets(&job.Spec.Template, config.SecretNames)
  441. }
  442. if len(config.ConfigMapNames) > 0 {
  443. attachConfigMaps(&job.Spec.Template, config.ConfigMapNames)
  444. }
  445. config.applyTo(&job.Spec.Template)
  446. if err := CreateJobWithRetries(config.Client, config.Namespace, job); err != nil {
  447. return fmt.Errorf("Error creating job: %v", err)
  448. }
  449. config.RCConfigLog("Created job with name: %v, namespace: %v, parallelism/completions: %v", job.Name, config.Namespace, job.Spec.Parallelism)
  450. return nil
  451. }
  452. // RunRC Launches (and verifies correctness) of a Replication Controller
  453. // and will wait for all pods it spawns to become "Running".
  454. // It's the caller's responsibility to clean up externally (i.e. use the
  455. // namespace lifecycle for handling Cleanup).
  456. func RunRC(config RCConfig) error {
  457. err := config.create()
  458. if err != nil {
  459. return err
  460. }
  461. return config.start()
  462. }
  463. func (config *RCConfig) Run() error {
  464. return RunRC(*config)
  465. }
  466. func (config *RCConfig) GetName() string {
  467. return config.Name
  468. }
  469. func (config *RCConfig) GetNamespace() string {
  470. return config.Namespace
  471. }
  472. func (config *RCConfig) GetKind() schema.GroupKind {
  473. return api.Kind("ReplicationController")
  474. }
  475. func (config *RCConfig) GetGroupResource() schema.GroupResource {
  476. return api.Resource("replicationcontrollers")
  477. }
  478. func (config *RCConfig) GetGroupVersionResource() schema.GroupVersionResource {
  479. return api.SchemeGroupVersion.WithResource("replicationcontrollers")
  480. }
  481. func (config *RCConfig) GetClient() clientset.Interface {
  482. return config.Client
  483. }
  484. func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter {
  485. return config.ScalesGetter
  486. }
  487. func (config *RCConfig) SetClient(c clientset.Interface) {
  488. config.Client = c
  489. }
  490. func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) {
  491. config.ScalesGetter = getter
  492. }
  493. func (config *RCConfig) GetReplicas() int {
  494. return config.Replicas
  495. }
  496. func (config *RCConfig) GetLabelValue(key string) (string, bool) {
  497. value, found := config.Labels[key]
  498. return value, found
  499. }
  500. func (config *RCConfig) create() error {
  501. dnsDefault := v1.DNSDefault
  502. if config.DNSPolicy == nil {
  503. config.DNSPolicy = &dnsDefault
  504. }
  505. one := int64(1)
  506. rc := &v1.ReplicationController{
  507. ObjectMeta: metav1.ObjectMeta{
  508. Name: config.Name,
  509. },
  510. Spec: v1.ReplicationControllerSpec{
  511. Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
  512. Selector: map[string]string{
  513. "name": config.Name,
  514. },
  515. Template: &v1.PodTemplateSpec{
  516. ObjectMeta: metav1.ObjectMeta{
  517. Labels: map[string]string{"name": config.Name},
  518. Annotations: config.Annotations,
  519. },
  520. Spec: v1.PodSpec{
  521. Affinity: config.Affinity,
  522. Containers: []v1.Container{
  523. {
  524. Name: config.Name,
  525. Image: config.Image,
  526. Command: config.Command,
  527. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  528. ReadinessProbe: config.ReadinessProbe,
  529. Lifecycle: config.Lifecycle,
  530. },
  531. },
  532. DNSPolicy: *config.DNSPolicy,
  533. NodeSelector: config.NodeSelector,
  534. Tolerations: config.Tolerations,
  535. TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(&one),
  536. PriorityClassName: config.PriorityClassName,
  537. },
  538. },
  539. },
  540. }
  541. if len(config.SecretNames) > 0 {
  542. attachSecrets(rc.Spec.Template, config.SecretNames)
  543. }
  544. if len(config.ConfigMapNames) > 0 {
  545. attachConfigMaps(rc.Spec.Template, config.ConfigMapNames)
  546. }
  547. config.applyTo(rc.Spec.Template)
  548. if err := CreateRCWithRetries(config.Client, config.Namespace, rc); err != nil {
  549. return fmt.Errorf("Error creating replication controller: %v", err)
  550. }
  551. config.RCConfigLog("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, removePtr(rc.Spec.Replicas))
  552. return nil
  553. }
  554. func (config *RCConfig) applyTo(template *v1.PodTemplateSpec) {
  555. if config.Env != nil {
  556. for k, v := range config.Env {
  557. c := &template.Spec.Containers[0]
  558. c.Env = append(c.Env, v1.EnvVar{Name: k, Value: v})
  559. }
  560. }
  561. if config.Labels != nil {
  562. for k, v := range config.Labels {
  563. template.ObjectMeta.Labels[k] = v
  564. }
  565. }
  566. if config.NodeSelector != nil {
  567. template.Spec.NodeSelector = make(map[string]string)
  568. for k, v := range config.NodeSelector {
  569. template.Spec.NodeSelector[k] = v
  570. }
  571. }
  572. if config.Tolerations != nil {
  573. template.Spec.Tolerations = append([]v1.Toleration{}, config.Tolerations...)
  574. }
  575. if config.Ports != nil {
  576. for k, v := range config.Ports {
  577. c := &template.Spec.Containers[0]
  578. c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v)})
  579. }
  580. }
  581. if config.HostPorts != nil {
  582. for k, v := range config.HostPorts {
  583. c := &template.Spec.Containers[0]
  584. c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)})
  585. }
  586. }
  587. if config.CpuLimit > 0 || config.MemLimit > 0 || config.GpuLimit > 0 {
  588. template.Spec.Containers[0].Resources.Limits = v1.ResourceList{}
  589. }
  590. if config.CpuLimit > 0 {
  591. template.Spec.Containers[0].Resources.Limits[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
  592. }
  593. if config.MemLimit > 0 {
  594. template.Spec.Containers[0].Resources.Limits[v1.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
  595. }
  596. if config.CpuRequest > 0 || config.MemRequest > 0 {
  597. template.Spec.Containers[0].Resources.Requests = v1.ResourceList{}
  598. }
  599. if config.CpuRequest > 0 {
  600. template.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
  601. }
  602. if config.MemRequest > 0 {
  603. template.Spec.Containers[0].Resources.Requests[v1.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
  604. }
  605. if config.GpuLimit > 0 {
  606. template.Spec.Containers[0].Resources.Limits["nvidia.com/gpu"] = *resource.NewQuantity(config.GpuLimit, resource.DecimalSI)
  607. }
  608. if config.Lifecycle != nil {
  609. template.Spec.Containers[0].Lifecycle = config.Lifecycle
  610. }
  611. if len(config.Volumes) > 0 {
  612. template.Spec.Volumes = config.Volumes
  613. }
  614. if len(config.VolumeMounts) > 0 {
  615. template.Spec.Containers[0].VolumeMounts = config.VolumeMounts
  616. }
  617. if config.PriorityClassName != "" {
  618. template.Spec.PriorityClassName = config.PriorityClassName
  619. }
  620. }
  621. type RCStartupStatus struct {
  622. Expected int
  623. Terminating int
  624. Running int
  625. RunningButNotReady int
  626. Waiting int
  627. Pending int
  628. Scheduled int
  629. Unknown int
  630. Inactive int
  631. FailedContainers int
  632. Created []*v1.Pod
  633. ContainerRestartNodes sets.String
  634. }
  635. func (s *RCStartupStatus) String(name string) string {
  636. return fmt.Sprintf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
  637. name, len(s.Created), s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady)
  638. }
  639. func ComputeRCStartupStatus(pods []*v1.Pod, expected int) RCStartupStatus {
  640. startupStatus := RCStartupStatus{
  641. Expected: expected,
  642. Created: make([]*v1.Pod, 0, expected),
  643. ContainerRestartNodes: sets.NewString(),
  644. }
  645. for _, p := range pods {
  646. if p.DeletionTimestamp != nil {
  647. startupStatus.Terminating++
  648. continue
  649. }
  650. startupStatus.Created = append(startupStatus.Created, p)
  651. if p.Status.Phase == v1.PodRunning {
  652. ready := false
  653. for _, c := range p.Status.Conditions {
  654. if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
  655. ready = true
  656. break
  657. }
  658. }
  659. if ready {
  660. // Only count a pod is running when it is also ready.
  661. startupStatus.Running++
  662. } else {
  663. startupStatus.RunningButNotReady++
  664. }
  665. for _, v := range FailedContainers(p) {
  666. startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts
  667. startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName)
  668. }
  669. } else if p.Status.Phase == v1.PodPending {
  670. if p.Spec.NodeName == "" {
  671. startupStatus.Waiting++
  672. } else {
  673. startupStatus.Pending++
  674. }
  675. } else if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed {
  676. startupStatus.Inactive++
  677. } else if p.Status.Phase == v1.PodUnknown {
  678. startupStatus.Unknown++
  679. }
  680. // Record count of scheduled pods (useful for computing scheduler throughput).
  681. if p.Spec.NodeName != "" {
  682. startupStatus.Scheduled++
  683. }
  684. }
  685. return startupStatus
  686. }
  687. func (config *RCConfig) start() error {
  688. // Don't force tests to fail if they don't care about containers restarting.
  689. var maxContainerFailures int
  690. if config.MaxContainerFailures == nil {
  691. maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
  692. } else {
  693. maxContainerFailures = *config.MaxContainerFailures
  694. }
  695. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
  696. ps, err := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
  697. if err != nil {
  698. return err
  699. }
  700. defer ps.Stop()
  701. interval := config.PollInterval
  702. if interval <= 0 {
  703. interval = 10 * time.Second
  704. }
  705. timeout := config.Timeout
  706. if timeout <= 0 {
  707. timeout = 5 * time.Minute
  708. }
  709. oldPods := make([]*v1.Pod, 0)
  710. oldRunning := 0
  711. lastChange := time.Now()
  712. podDeletionsCount := 0
  713. for oldRunning != config.Replicas {
  714. time.Sleep(interval)
  715. pods := ps.List()
  716. startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
  717. if config.CreatedPods != nil {
  718. *config.CreatedPods = startupStatus.Created
  719. }
  720. if !config.Silent {
  721. config.RCConfigLog(startupStatus.String(config.Name))
  722. }
  723. if config.PodStatusFile != nil {
  724. fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady)
  725. }
  726. if startupStatus.FailedContainers > maxContainerFailures {
  727. if config.NodeDumpFunc != nil {
  728. config.NodeDumpFunc(config.Client, startupStatus.ContainerRestartNodes.List(), config.RCConfigLog)
  729. }
  730. if config.ContainerDumpFunc != nil {
  731. // Get the logs from the failed containers to help diagnose what caused them to fail
  732. config.ContainerDumpFunc(config.Client, config.Namespace, config.RCConfigLog)
  733. }
  734. return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures)
  735. }
  736. diff := Diff(oldPods, pods)
  737. deletedPods := diff.DeletedPods()
  738. podDeletionsCount += len(deletedPods)
  739. if podDeletionsCount > config.MaxAllowedPodDeletions {
  740. // Number of pods which disappeared is over threshold
  741. err := fmt.Errorf("%d pods disappeared for %s: %v", podDeletionsCount, config.Name, strings.Join(deletedPods, ", "))
  742. config.RCConfigLog(err.Error())
  743. config.RCConfigLog(diff.String(sets.NewString()))
  744. return err
  745. }
  746. if len(pods) > len(oldPods) || startupStatus.Running > oldRunning {
  747. lastChange = time.Now()
  748. }
  749. oldPods = pods
  750. oldRunning = startupStatus.Running
  751. if time.Since(lastChange) > timeout {
  752. break
  753. }
  754. }
  755. if oldRunning != config.Replicas {
  756. // List only pods from a given replication controller.
  757. options := metav1.ListOptions{LabelSelector: label.String()}
  758. if pods, err := config.Client.CoreV1().Pods(config.Namespace).List(context.TODO(), options); err == nil {
  759. for _, pod := range pods.Items {
  760. config.RCConfigLog("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp)
  761. }
  762. } else {
  763. config.RCConfigLog("Can't list pod debug info: %v", err)
  764. }
  765. return fmt.Errorf("Only %d pods started out of %d", oldRunning, config.Replicas)
  766. }
  767. return nil
  768. }
  769. // Simplified version of RunRC, that does not create RC, but creates plain Pods.
  770. // Optionally waits for pods to start running (if waitForRunning == true).
  771. // The number of replicas must be non-zero.
  772. func StartPods(c clientset.Interface, replicas int, namespace string, podNamePrefix string,
  773. pod v1.Pod, waitForRunning bool, logFunc func(fmt string, args ...interface{})) error {
  774. // no pod to start
  775. if replicas < 1 {
  776. panic("StartPods: number of replicas must be non-zero")
  777. }
  778. startPodsID := string(uuid.NewUUID()) // So that we can label and find them
  779. for i := 0; i < replicas; i++ {
  780. podName := fmt.Sprintf("%v-%v", podNamePrefix, i)
  781. pod.ObjectMeta.Name = podName
  782. pod.ObjectMeta.Labels["name"] = podName
  783. pod.ObjectMeta.Labels["startPodsID"] = startPodsID
  784. pod.Spec.Containers[0].Name = podName
  785. if err := CreatePodWithRetries(c, namespace, &pod); err != nil {
  786. return err
  787. }
  788. }
  789. logFunc("Waiting for running...")
  790. if waitForRunning {
  791. label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID}))
  792. err := WaitForPodsWithLabelRunning(c, namespace, label)
  793. if err != nil {
  794. return fmt.Errorf("Error waiting for %d pods to be running - probably a timeout: %v", replicas, err)
  795. }
  796. }
  797. return nil
  798. }
  799. // Wait up to 10 minutes for all matching pods to become Running and at least one
  800. // matching pod exists.
  801. func WaitForPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector) error {
  802. return WaitForEnoughPodsWithLabelRunning(c, ns, label, -1)
  803. }
  804. // Wait up to 10 minutes for at least 'replicas' many pods to be Running and at least
  805. // one matching pod exists. If 'replicas' is < 0, wait for all matching pods running.
  806. func WaitForEnoughPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector, replicas int) error {
  807. running := false
  808. ps, err := NewPodStore(c, ns, label, fields.Everything())
  809. if err != nil {
  810. return err
  811. }
  812. defer ps.Stop()
  813. for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
  814. pods := ps.List()
  815. if len(pods) == 0 {
  816. continue
  817. }
  818. runningPodsCount := 0
  819. for _, p := range pods {
  820. if p.Status.Phase == v1.PodRunning {
  821. runningPodsCount++
  822. }
  823. }
  824. if (replicas < 0 && runningPodsCount < len(pods)) || (runningPodsCount < replicas) {
  825. continue
  826. }
  827. running = true
  828. break
  829. }
  830. if !running {
  831. return fmt.Errorf("Timeout while waiting for pods with labels %q to be running", label.String())
  832. }
  833. return nil
  834. }
  835. type CountToStrategy struct {
  836. Count int
  837. Strategy PrepareNodeStrategy
  838. }
  839. type TestNodePreparer interface {
  840. PrepareNodes() error
  841. CleanupNodes() error
  842. }
  843. type PrepareNodeStrategy interface {
  844. // Modify pre-created Node objects before the test starts.
  845. PreparePatch(node *v1.Node) []byte
  846. // Create or modify any objects that depend on the node before the test starts.
  847. // Caller will re-try when http.StatusConflict error is returned.
  848. PrepareDependentObjects(node *v1.Node, client clientset.Interface) error
  849. // Clean up any node modifications after the test finishes.
  850. CleanupNode(node *v1.Node) *v1.Node
  851. // Clean up any objects that depend on the node after the test finishes.
  852. // Caller will re-try when http.StatusConflict error is returned.
  853. CleanupDependentObjects(nodeName string, client clientset.Interface) error
  854. }
  855. type TrivialNodePrepareStrategy struct{}
  856. var _ PrepareNodeStrategy = &TrivialNodePrepareStrategy{}
  857. func (*TrivialNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
  858. return []byte{}
  859. }
  860. func (*TrivialNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
  861. nodeCopy := *node
  862. return &nodeCopy
  863. }
  864. func (*TrivialNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
  865. return nil
  866. }
  867. func (*TrivialNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
  868. return nil
  869. }
  870. type LabelNodePrepareStrategy struct {
  871. LabelKey string
  872. LabelValue string
  873. }
  874. var _ PrepareNodeStrategy = &LabelNodePrepareStrategy{}
  875. func NewLabelNodePrepareStrategy(labelKey string, labelValue string) *LabelNodePrepareStrategy {
  876. return &LabelNodePrepareStrategy{
  877. LabelKey: labelKey,
  878. LabelValue: labelValue,
  879. }
  880. }
  881. func (s *LabelNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
  882. labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, s.LabelValue)
  883. patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
  884. return []byte(patch)
  885. }
  886. func (s *LabelNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
  887. nodeCopy := node.DeepCopy()
  888. if node.Labels != nil && len(node.Labels[s.LabelKey]) != 0 {
  889. delete(nodeCopy.Labels, s.LabelKey)
  890. }
  891. return nodeCopy
  892. }
  893. func (*LabelNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
  894. return nil
  895. }
  896. func (*LabelNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
  897. return nil
  898. }
  899. // NodeAllocatableStrategy fills node.status.allocatable and csiNode.spec.drivers[*].allocatable.
  900. // csiNode is created if it does not exist. On cleanup, any csiNode.spec.drivers[*].allocatable is
  901. // set to nil.
  902. type NodeAllocatableStrategy struct {
  903. // Node.status.allocatable to fill to all nodes.
  904. NodeAllocatable map[v1.ResourceName]string
  905. // Map <driver_name> -> VolumeNodeResources to fill into csiNode.spec.drivers[<driver_name>].
  906. CsiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources
  907. // List of in-tree volume plugins migrated to CSI.
  908. MigratedPlugins []string
  909. }
  910. var _ PrepareNodeStrategy = &NodeAllocatableStrategy{}
  911. func NewNodeAllocatableStrategy(nodeAllocatable map[v1.ResourceName]string, csiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources, migratedPlugins []string) *NodeAllocatableStrategy {
  912. return &NodeAllocatableStrategy{
  913. NodeAllocatable: nodeAllocatable,
  914. CsiNodeAllocatable: csiNodeAllocatable,
  915. MigratedPlugins: migratedPlugins,
  916. }
  917. }
  918. func (s *NodeAllocatableStrategy) PreparePatch(node *v1.Node) []byte {
  919. newNode := node.DeepCopy()
  920. for name, value := range s.NodeAllocatable {
  921. newNode.Status.Allocatable[name] = resource.MustParse(value)
  922. }
  923. oldJSON, err := json.Marshal(node)
  924. if err != nil {
  925. panic(err)
  926. }
  927. newJSON, err := json.Marshal(newNode)
  928. if err != nil {
  929. panic(err)
  930. }
  931. patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
  932. if err != nil {
  933. panic(err)
  934. }
  935. return patch
  936. }
  937. func (s *NodeAllocatableStrategy) CleanupNode(node *v1.Node) *v1.Node {
  938. nodeCopy := node.DeepCopy()
  939. for name := range s.NodeAllocatable {
  940. delete(nodeCopy.Status.Allocatable, name)
  941. }
  942. return nodeCopy
  943. }
  944. func (s *NodeAllocatableStrategy) createCSINode(nodeName string, client clientset.Interface) error {
  945. csiNode := &storagev1beta1.CSINode{
  946. ObjectMeta: metav1.ObjectMeta{
  947. Name: nodeName,
  948. Annotations: map[string]string{
  949. v1.MigratedPluginsAnnotationKey: strings.Join(s.MigratedPlugins, ","),
  950. },
  951. },
  952. Spec: storagev1beta1.CSINodeSpec{
  953. Drivers: []storagev1beta1.CSINodeDriver{},
  954. },
  955. }
  956. for driver, allocatable := range s.CsiNodeAllocatable {
  957. d := storagev1beta1.CSINodeDriver{
  958. Name: driver,
  959. Allocatable: allocatable,
  960. NodeID: nodeName,
  961. }
  962. csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
  963. }
  964. _, err := client.StorageV1beta1().CSINodes().Create(context.TODO(), csiNode, metav1.CreateOptions{})
  965. if apierrors.IsAlreadyExists(err) {
  966. // Something created CSINode instance after we checked it did not exist.
  967. // Make the caller to re-try PrepareDependentObjects by returning Conflict error
  968. err = apierrors.NewConflict(storagev1beta1.Resource("csinodes"), nodeName, err)
  969. }
  970. return err
  971. }
  972. func (s *NodeAllocatableStrategy) updateCSINode(csiNode *storagev1beta1.CSINode, client clientset.Interface) error {
  973. for driverName, allocatable := range s.CsiNodeAllocatable {
  974. found := false
  975. for i, driver := range csiNode.Spec.Drivers {
  976. if driver.Name == driverName {
  977. found = true
  978. csiNode.Spec.Drivers[i].Allocatable = allocatable
  979. break
  980. }
  981. }
  982. if !found {
  983. d := storagev1beta1.CSINodeDriver{
  984. Name: driverName,
  985. Allocatable: allocatable,
  986. }
  987. csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
  988. }
  989. }
  990. csiNode.Annotations[v1.MigratedPluginsAnnotationKey] = strings.Join(s.MigratedPlugins, ",")
  991. _, err := client.StorageV1beta1().CSINodes().Update(context.TODO(), csiNode, metav1.UpdateOptions{})
  992. return err
  993. }
  994. func (s *NodeAllocatableStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
  995. csiNode, err := client.StorageV1beta1().CSINodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
  996. if err != nil {
  997. if apierrors.IsNotFound(err) {
  998. return s.createCSINode(node.Name, client)
  999. }
  1000. return err
  1001. }
  1002. return s.updateCSINode(csiNode, client)
  1003. }
  1004. func (s *NodeAllocatableStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
  1005. csiNode, err := client.StorageV1beta1().CSINodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  1006. if err != nil {
  1007. if apierrors.IsNotFound(err) {
  1008. return nil
  1009. }
  1010. return err
  1011. }
  1012. for driverName := range s.CsiNodeAllocatable {
  1013. for i, driver := range csiNode.Spec.Drivers {
  1014. if driver.Name == driverName {
  1015. csiNode.Spec.Drivers[i].Allocatable = nil
  1016. }
  1017. }
  1018. }
  1019. return s.updateCSINode(csiNode, client)
  1020. }
  1021. // UniqueNodeLabelStrategy sets a unique label for each node.
  1022. type UniqueNodeLabelStrategy struct {
  1023. LabelKey string
  1024. }
  1025. var _ PrepareNodeStrategy = &UniqueNodeLabelStrategy{}
  1026. func NewUniqueNodeLabelStrategy(labelKey string) *UniqueNodeLabelStrategy {
  1027. return &UniqueNodeLabelStrategy{
  1028. LabelKey: labelKey,
  1029. }
  1030. }
  1031. func (s *UniqueNodeLabelStrategy) PreparePatch(*v1.Node) []byte {
  1032. labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, string(uuid.NewUUID()))
  1033. patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
  1034. return []byte(patch)
  1035. }
  1036. func (s *UniqueNodeLabelStrategy) CleanupNode(node *v1.Node) *v1.Node {
  1037. nodeCopy := node.DeepCopy()
  1038. if node.Labels != nil && len(node.Labels[s.LabelKey]) != 0 {
  1039. delete(nodeCopy.Labels, s.LabelKey)
  1040. }
  1041. return nodeCopy
  1042. }
  1043. func (*UniqueNodeLabelStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
  1044. return nil
  1045. }
  1046. func (*UniqueNodeLabelStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
  1047. return nil
  1048. }
  1049. func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNodeStrategy) error {
  1050. var err error
  1051. patch := strategy.PreparePatch(node)
  1052. if len(patch) == 0 {
  1053. return nil
  1054. }
  1055. for attempt := 0; attempt < retries; attempt++ {
  1056. if _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err == nil {
  1057. break
  1058. }
  1059. if !apierrors.IsConflict(err) {
  1060. return fmt.Errorf("Error while applying patch %v to Node %v: %v", string(patch), node.Name, err)
  1061. }
  1062. time.Sleep(100 * time.Millisecond)
  1063. }
  1064. if err != nil {
  1065. return fmt.Errorf("Too many conflicts when applying patch %v to Node %v: %s", string(patch), node.Name, err)
  1066. }
  1067. for attempt := 0; attempt < retries; attempt++ {
  1068. if err = strategy.PrepareDependentObjects(node, client); err == nil {
  1069. break
  1070. }
  1071. if !apierrors.IsConflict(err) {
  1072. return fmt.Errorf("Error while preparing objects for node %s: %s", node.Name, err)
  1073. }
  1074. time.Sleep(100 * time.Millisecond)
  1075. }
  1076. if err != nil {
  1077. return fmt.Errorf("Too many conflicts when creating objects for node %s: %s", node.Name, err)
  1078. }
  1079. return nil
  1080. }
  1081. func DoCleanupNode(client clientset.Interface, nodeName string, strategy PrepareNodeStrategy) error {
  1082. var err error
  1083. for attempt := 0; attempt < retries; attempt++ {
  1084. node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
  1085. if err != nil {
  1086. return fmt.Errorf("Skipping cleanup of Node: failed to get Node %v: %v", nodeName, err)
  1087. }
  1088. updatedNode := strategy.CleanupNode(node)
  1089. if apiequality.Semantic.DeepEqual(node, updatedNode) {
  1090. return nil
  1091. }
  1092. if _, err = client.CoreV1().Nodes().Update(context.TODO(), updatedNode, metav1.UpdateOptions{}); err == nil {
  1093. break
  1094. }
  1095. if !apierrors.IsConflict(err) {
  1096. return fmt.Errorf("Error when updating Node %v: %v", nodeName, err)
  1097. }
  1098. time.Sleep(100 * time.Millisecond)
  1099. }
  1100. if err != nil {
  1101. return fmt.Errorf("Too many conflicts when trying to cleanup Node %v: %s", nodeName, err)
  1102. }
  1103. for attempt := 0; attempt < retries; attempt++ {
  1104. err = strategy.CleanupDependentObjects(nodeName, client)
  1105. if err == nil {
  1106. break
  1107. }
  1108. if !apierrors.IsConflict(err) {
  1109. return fmt.Errorf("Error when cleaning up Node %v objects: %v", nodeName, err)
  1110. }
  1111. time.Sleep(100 * time.Millisecond)
  1112. }
  1113. if err != nil {
  1114. return fmt.Errorf("Too many conflicts when trying to cleanup Node %v objects: %s", nodeName, err)
  1115. }
  1116. return nil
  1117. }
  1118. type TestPodCreateStrategy func(client clientset.Interface, namespace string, podCount int) error
  1119. type CountToPodStrategy struct {
  1120. Count int
  1121. Strategy TestPodCreateStrategy
  1122. }
  1123. type TestPodCreatorConfig map[string][]CountToPodStrategy
  1124. func NewTestPodCreatorConfig() *TestPodCreatorConfig {
  1125. config := make(TestPodCreatorConfig)
  1126. return &config
  1127. }
  1128. func (c *TestPodCreatorConfig) AddStrategy(
  1129. namespace string, podCount int, strategy TestPodCreateStrategy) {
  1130. (*c)[namespace] = append((*c)[namespace], CountToPodStrategy{Count: podCount, Strategy: strategy})
  1131. }
  1132. type TestPodCreator struct {
  1133. Client clientset.Interface
  1134. // namespace -> count -> strategy
  1135. Config *TestPodCreatorConfig
  1136. }
  1137. func NewTestPodCreator(client clientset.Interface, config *TestPodCreatorConfig) *TestPodCreator {
  1138. return &TestPodCreator{
  1139. Client: client,
  1140. Config: config,
  1141. }
  1142. }
  1143. func (c *TestPodCreator) CreatePods() error {
  1144. for ns, v := range *(c.Config) {
  1145. for _, countToStrategy := range v {
  1146. if err := countToStrategy.Strategy(c.Client, ns, countToStrategy.Count); err != nil {
  1147. return err
  1148. }
  1149. }
  1150. }
  1151. return nil
  1152. }
  1153. func MakePodSpec() v1.PodSpec {
  1154. return v1.PodSpec{
  1155. Containers: []v1.Container{{
  1156. Name: "pause",
  1157. Image: "k8s.gcr.io/pause:3.1",
  1158. Ports: []v1.ContainerPort{{ContainerPort: 80}},
  1159. Resources: v1.ResourceRequirements{
  1160. Limits: v1.ResourceList{
  1161. v1.ResourceCPU: resource.MustParse("100m"),
  1162. v1.ResourceMemory: resource.MustParse("500Mi"),
  1163. },
  1164. Requests: v1.ResourceList{
  1165. v1.ResourceCPU: resource.MustParse("100m"),
  1166. v1.ResourceMemory: resource.MustParse("500Mi"),
  1167. },
  1168. },
  1169. }},
  1170. }
  1171. }
  1172. func makeCreatePod(client clientset.Interface, namespace string, podTemplate *v1.Pod) error {
  1173. if err := CreatePodWithRetries(client, namespace, podTemplate); err != nil {
  1174. return fmt.Errorf("Error creating pod: %v", err)
  1175. }
  1176. return nil
  1177. }
  1178. func CreatePod(client clientset.Interface, namespace string, podCount int, podTemplate *v1.Pod) error {
  1179. var createError error
  1180. lock := sync.Mutex{}
  1181. createPodFunc := func(i int) {
  1182. if err := makeCreatePod(client, namespace, podTemplate); err != nil {
  1183. lock.Lock()
  1184. defer lock.Unlock()
  1185. createError = err
  1186. }
  1187. }
  1188. if podCount < 30 {
  1189. workqueue.ParallelizeUntil(context.TODO(), podCount, podCount, createPodFunc)
  1190. } else {
  1191. workqueue.ParallelizeUntil(context.TODO(), 30, podCount, createPodFunc)
  1192. }
  1193. return createError
  1194. }
  1195. func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int) error {
  1196. var createError error
  1197. lock := sync.Mutex{}
  1198. createPodFunc := func(i int) {
  1199. pvcName := fmt.Sprintf("pvc-%d", i)
  1200. // pv
  1201. pv := factory(i)
  1202. // bind to "pvc-$i"
  1203. pv.Spec.ClaimRef = &v1.ObjectReference{
  1204. Kind: "PersistentVolumeClaim",
  1205. Namespace: namespace,
  1206. Name: pvcName,
  1207. APIVersion: "v1",
  1208. }
  1209. pv.Status.Phase = v1.VolumeBound
  1210. if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
  1211. lock.Lock()
  1212. defer lock.Unlock()
  1213. createError = fmt.Errorf("error creating PV: %s", err)
  1214. return
  1215. }
  1216. // pvc
  1217. pvc := claimTemplate.DeepCopy()
  1218. pvc.Name = pvcName
  1219. // bind to "pv-$i"
  1220. pvc.Spec.VolumeName = pv.Name
  1221. pvc.Status.Phase = v1.ClaimBound
  1222. if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
  1223. lock.Lock()
  1224. defer lock.Unlock()
  1225. createError = fmt.Errorf("error creating PVC: %s", err)
  1226. return
  1227. }
  1228. // pod
  1229. pod := podTemplate.DeepCopy()
  1230. pod.Spec.Volumes = []v1.Volume{
  1231. {
  1232. Name: "vol",
  1233. VolumeSource: v1.VolumeSource{
  1234. PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  1235. ClaimName: pvcName,
  1236. },
  1237. },
  1238. },
  1239. }
  1240. if err := makeCreatePod(client, namespace, pod); err != nil {
  1241. lock.Lock()
  1242. defer lock.Unlock()
  1243. createError = err
  1244. return
  1245. }
  1246. }
  1247. if count < 30 {
  1248. workqueue.ParallelizeUntil(context.TODO(), count, count, createPodFunc)
  1249. } else {
  1250. workqueue.ParallelizeUntil(context.TODO(), 30, count, createPodFunc)
  1251. }
  1252. return createError
  1253. }
  1254. func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *v1.Pod) error {
  1255. rc := &v1.ReplicationController{
  1256. ObjectMeta: metav1.ObjectMeta{
  1257. Name: controllerName,
  1258. },
  1259. Spec: v1.ReplicationControllerSpec{
  1260. Replicas: func(i int) *int32 { x := int32(i); return &x }(podCount),
  1261. Selector: map[string]string{"name": controllerName},
  1262. Template: &v1.PodTemplateSpec{
  1263. ObjectMeta: metav1.ObjectMeta{
  1264. Labels: map[string]string{"name": controllerName},
  1265. },
  1266. Spec: podTemplate.Spec,
  1267. },
  1268. },
  1269. }
  1270. if err := CreateRCWithRetries(client, namespace, rc); err != nil {
  1271. return fmt.Errorf("Error creating replication controller: %v", err)
  1272. }
  1273. return nil
  1274. }
  1275. func NewCustomCreatePodStrategy(podTemplate *v1.Pod) TestPodCreateStrategy {
  1276. return func(client clientset.Interface, namespace string, podCount int) error {
  1277. return CreatePod(client, namespace, podCount, podTemplate)
  1278. }
  1279. }
  1280. // volumeFactory creates an unique PersistentVolume for given integer.
  1281. type volumeFactory func(uniqueID int) *v1.PersistentVolume
  1282. func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
  1283. return func(client clientset.Interface, namespace string, podCount int) error {
  1284. return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount)
  1285. }
  1286. }
  1287. func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
  1288. basePod := &v1.Pod{
  1289. ObjectMeta: metav1.ObjectMeta{
  1290. GenerateName: "simple-pod-",
  1291. },
  1292. Spec: MakePodSpec(),
  1293. }
  1294. return NewCustomCreatePodStrategy(basePod)
  1295. }
  1296. func NewSimpleWithControllerCreatePodStrategy(controllerName string) TestPodCreateStrategy {
  1297. return func(client clientset.Interface, namespace string, podCount int) error {
  1298. basePod := &v1.Pod{
  1299. ObjectMeta: metav1.ObjectMeta{
  1300. GenerateName: controllerName + "-pod-",
  1301. Labels: map[string]string{"name": controllerName},
  1302. },
  1303. Spec: MakePodSpec(),
  1304. }
  1305. if err := createController(client, controllerName, namespace, podCount, basePod); err != nil {
  1306. return err
  1307. }
  1308. return CreatePod(client, namespace, podCount, basePod)
  1309. }
  1310. }
  1311. type SecretConfig struct {
  1312. Content map[string]string
  1313. Client clientset.Interface
  1314. Name string
  1315. Namespace string
  1316. // If set this function will be used to print log lines instead of klog.
  1317. LogFunc func(fmt string, args ...interface{})
  1318. }
  1319. func (config *SecretConfig) Run() error {
  1320. secret := &v1.Secret{
  1321. ObjectMeta: metav1.ObjectMeta{
  1322. Name: config.Name,
  1323. },
  1324. StringData: map[string]string{},
  1325. }
  1326. for k, v := range config.Content {
  1327. secret.StringData[k] = v
  1328. }
  1329. if err := CreateSecretWithRetries(config.Client, config.Namespace, secret); err != nil {
  1330. return fmt.Errorf("Error creating secret: %v", err)
  1331. }
  1332. config.LogFunc("Created secret %v/%v", config.Namespace, config.Name)
  1333. return nil
  1334. }
  1335. func (config *SecretConfig) Stop() error {
  1336. if err := DeleteResourceWithRetries(config.Client, api.Kind("Secret"), config.Namespace, config.Name, &metav1.DeleteOptions{}); err != nil {
  1337. return fmt.Errorf("Error deleting secret: %v", err)
  1338. }
  1339. config.LogFunc("Deleted secret %v/%v", config.Namespace, config.Name)
  1340. return nil
  1341. }
  1342. // TODO: attach secrets using different possibilities: env vars, image pull secrets.
  1343. func attachSecrets(template *v1.PodTemplateSpec, secretNames []string) {
  1344. volumes := make([]v1.Volume, 0, len(secretNames))
  1345. mounts := make([]v1.VolumeMount, 0, len(secretNames))
  1346. for _, name := range secretNames {
  1347. volumes = append(volumes, v1.Volume{
  1348. Name: name,
  1349. VolumeSource: v1.VolumeSource{
  1350. Secret: &v1.SecretVolumeSource{
  1351. SecretName: name,
  1352. },
  1353. },
  1354. })
  1355. mounts = append(mounts, v1.VolumeMount{
  1356. Name: name,
  1357. MountPath: fmt.Sprintf("/%v", name),
  1358. })
  1359. }
  1360. template.Spec.Volumes = volumes
  1361. template.Spec.Containers[0].VolumeMounts = mounts
  1362. }
  1363. type ConfigMapConfig struct {
  1364. Content map[string]string
  1365. Client clientset.Interface
  1366. Name string
  1367. Namespace string
  1368. // If set this function will be used to print log lines instead of klog.
  1369. LogFunc func(fmt string, args ...interface{})
  1370. }
  1371. func (config *ConfigMapConfig) Run() error {
  1372. configMap := &v1.ConfigMap{
  1373. ObjectMeta: metav1.ObjectMeta{
  1374. Name: config.Name,
  1375. },
  1376. Data: map[string]string{},
  1377. }
  1378. for k, v := range config.Content {
  1379. configMap.Data[k] = v
  1380. }
  1381. if err := CreateConfigMapWithRetries(config.Client, config.Namespace, configMap); err != nil {
  1382. return fmt.Errorf("Error creating configmap: %v", err)
  1383. }
  1384. config.LogFunc("Created configmap %v/%v", config.Namespace, config.Name)
  1385. return nil
  1386. }
  1387. func (config *ConfigMapConfig) Stop() error {
  1388. if err := DeleteResourceWithRetries(config.Client, api.Kind("ConfigMap"), config.Namespace, config.Name, &metav1.DeleteOptions{}); err != nil {
  1389. return fmt.Errorf("Error deleting configmap: %v", err)
  1390. }
  1391. config.LogFunc("Deleted configmap %v/%v", config.Namespace, config.Name)
  1392. return nil
  1393. }
  1394. // TODO: attach configmaps using different possibilities: env vars.
  1395. func attachConfigMaps(template *v1.PodTemplateSpec, configMapNames []string) {
  1396. volumes := make([]v1.Volume, 0, len(configMapNames))
  1397. mounts := make([]v1.VolumeMount, 0, len(configMapNames))
  1398. for _, name := range configMapNames {
  1399. volumes = append(volumes, v1.Volume{
  1400. Name: name,
  1401. VolumeSource: v1.VolumeSource{
  1402. ConfigMap: &v1.ConfigMapVolumeSource{
  1403. LocalObjectReference: v1.LocalObjectReference{
  1404. Name: name,
  1405. },
  1406. },
  1407. },
  1408. })
  1409. mounts = append(mounts, v1.VolumeMount{
  1410. Name: name,
  1411. MountPath: fmt.Sprintf("/%v", name),
  1412. })
  1413. }
  1414. template.Spec.Volumes = volumes
  1415. template.Spec.Containers[0].VolumeMounts = mounts
  1416. }
  1417. func (config *RCConfig) getTerminationGracePeriodSeconds(defaultGrace *int64) *int64 {
  1418. if config.TerminationGracePeriodSeconds == nil || *config.TerminationGracePeriodSeconds < 0 {
  1419. return defaultGrace
  1420. }
  1421. return config.TerminationGracePeriodSeconds
  1422. }
  1423. func attachServiceAccountTokenProjection(template *v1.PodTemplateSpec, name string) {
  1424. template.Spec.Containers[0].VolumeMounts = append(template.Spec.Containers[0].VolumeMounts,
  1425. v1.VolumeMount{
  1426. Name: name,
  1427. MountPath: "/var/service-account-tokens/" + name,
  1428. })
  1429. template.Spec.Volumes = append(template.Spec.Volumes,
  1430. v1.Volume{
  1431. Name: name,
  1432. VolumeSource: v1.VolumeSource{
  1433. Projected: &v1.ProjectedVolumeSource{
  1434. Sources: []v1.VolumeProjection{
  1435. {
  1436. ServiceAccountToken: &v1.ServiceAccountTokenProjection{
  1437. Path: "token",
  1438. Audience: name,
  1439. },
  1440. },
  1441. {
  1442. ConfigMap: &v1.ConfigMapProjection{
  1443. LocalObjectReference: v1.LocalObjectReference{
  1444. Name: "kube-root-ca-crt",
  1445. },
  1446. Items: []v1.KeyToPath{
  1447. {
  1448. Key: "ca.crt",
  1449. Path: "ca.crt",
  1450. },
  1451. },
  1452. },
  1453. },
  1454. {
  1455. DownwardAPI: &v1.DownwardAPIProjection{
  1456. Items: []v1.DownwardAPIVolumeFile{
  1457. {
  1458. Path: "namespace",
  1459. FieldRef: &v1.ObjectFieldSelector{
  1460. APIVersion: "v1",
  1461. FieldPath: "metadata.namespace",
  1462. },
  1463. },
  1464. },
  1465. },
  1466. },
  1467. },
  1468. },
  1469. },
  1470. })
  1471. }
  1472. type DaemonConfig struct {
  1473. Client clientset.Interface
  1474. Name string
  1475. Namespace string
  1476. Image string
  1477. // If set this function will be used to print log lines instead of klog.
  1478. LogFunc func(fmt string, args ...interface{})
  1479. // How long we wait for DaemonSet to become running.
  1480. Timeout time.Duration
  1481. }
  1482. func (config *DaemonConfig) Run() error {
  1483. if config.Image == "" {
  1484. config.Image = "k8s.gcr.io/pause:3.1"
  1485. }
  1486. nameLabel := map[string]string{
  1487. "name": config.Name + "-daemon",
  1488. }
  1489. daemon := &apps.DaemonSet{
  1490. ObjectMeta: metav1.ObjectMeta{
  1491. Name: config.Name,
  1492. },
  1493. Spec: apps.DaemonSetSpec{
  1494. Template: v1.PodTemplateSpec{
  1495. ObjectMeta: metav1.ObjectMeta{
  1496. Labels: nameLabel,
  1497. },
  1498. Spec: v1.PodSpec{
  1499. Containers: []v1.Container{
  1500. {
  1501. Name: config.Name,
  1502. Image: config.Image,
  1503. },
  1504. },
  1505. },
  1506. },
  1507. },
  1508. }
  1509. if err := CreateDaemonSetWithRetries(config.Client, config.Namespace, daemon); err != nil {
  1510. return fmt.Errorf("Error creating daemonset: %v", err)
  1511. }
  1512. var nodes *v1.NodeList
  1513. var err error
  1514. for i := 0; i < retries; i++ {
  1515. // Wait for all daemons to be running
  1516. nodes, err = config.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"})
  1517. if err == nil {
  1518. break
  1519. } else if i+1 == retries {
  1520. return fmt.Errorf("Error listing Nodes while waiting for DaemonSet %v: %v", config.Name, err)
  1521. }
  1522. }
  1523. timeout := config.Timeout
  1524. if timeout <= 0 {
  1525. timeout = 5 * time.Minute
  1526. }
  1527. ps, err := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
  1528. if err != nil {
  1529. return err
  1530. }
  1531. defer ps.Stop()
  1532. err = wait.Poll(time.Second, timeout, func() (bool, error) {
  1533. pods := ps.List()
  1534. nodeHasDaemon := sets.NewString()
  1535. for _, pod := range pods {
  1536. podReady, _ := PodRunningReady(pod)
  1537. if pod.Spec.NodeName != "" && podReady {
  1538. nodeHasDaemon.Insert(pod.Spec.NodeName)
  1539. }
  1540. }
  1541. running := len(nodeHasDaemon)
  1542. config.LogFunc("Found %v/%v Daemons %v running", running, config.Name, len(nodes.Items))
  1543. return running == len(nodes.Items), nil
  1544. })
  1545. if err != nil {
  1546. config.LogFunc("Timed out while waiting for DaemonSet %v/%v to be running.", config.Namespace, config.Name)
  1547. } else {
  1548. config.LogFunc("Created Daemon %v/%v", config.Namespace, config.Name)
  1549. }
  1550. return err
  1551. }