rolling_updater.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubectl

import (
	"fmt"
	"io"
	"strconv"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/wait"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/client-go/util/retry"
	"k8s.io/kubernetes/pkg/kubectl/util"
	deploymentutil "k8s.io/kubernetes/pkg/kubectl/util/deployment"
	"k8s.io/kubernetes/pkg/kubectl/util/podutils"
	"k8s.io/utils/integer"
	utilpointer "k8s.io/utils/pointer"
)

func valOrZero(val *int32) int32 {
	if val == nil {
		return int32(0)
	}
	return *val
}

const (
	kubectlAnnotationPrefix    = "kubectl.kubernetes.io/"
	sourceIDAnnotation         = kubectlAnnotationPrefix + "update-source-id"
	desiredReplicasAnnotation  = kubectlAnnotationPrefix + "desired-replicas"
	originalReplicasAnnotation = kubectlAnnotationPrefix + "original-replicas"
	nextControllerAnnotation   = kubectlAnnotationPrefix + "next-controller-id"
)

// RollingUpdaterConfig is the configuration for a rolling deployment process.
type RollingUpdaterConfig struct {
	// Out is a writer for progress output.
	Out io.Writer
	// OldRc is an existing controller to be replaced.
	OldRc *corev1.ReplicationController
	// NewRc is a controller that will take ownership of updated pods (will be
	// created if needed).
	NewRc *corev1.ReplicationController
	// UpdatePeriod is the time to wait between individual pod updates.
	UpdatePeriod time.Duration
	// Interval is the time to wait between polling controller status after
	// update.
	Interval time.Duration
	// Timeout is the time to wait for controller updates before giving up.
	Timeout time.Duration
	// MinReadySeconds is the minimum number of seconds a pod must be ready
	// before it is counted as available.
	MinReadySeconds int32
	// CleanupPolicy defines the cleanup action to take after the deployment is
	// complete.
	CleanupPolicy RollingUpdaterCleanupPolicy
	// MaxUnavailable is the maximum number of pods that can be unavailable during the update.
	// Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%).
	// Absolute number is calculated from percentage by rounding up.
	// This cannot be 0 if MaxSurge is 0.
	// By default, a fixed value of 1 is used.
	// Example: when this is set to 30%, the old RC can be scaled down to 70% of desired pods
	// immediately when the rolling update starts. Once new pods are ready, the old RC
	// can be scaled down further, followed by scaling up the new RC, ensuring
	// that the total number of pods available at all times during the update is at
	// least 70% of desired pods.
	MaxUnavailable intstr.IntOrString
	// MaxSurge is the maximum number of pods that can be scheduled above the desired number of pods.
	// Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%).
	// This cannot be 0 if MaxUnavailable is 0.
	// Absolute number is calculated from percentage by rounding up.
	// By default, a value of 1 is used.
	// Example: when this is set to 30%, the new RC can be scaled up immediately
	// when the rolling update starts, such that the total number of old and new pods does not exceed
	// 130% of desired pods. Once old pods have been killed, the new RC can be scaled up
	// further, ensuring that the total number of pods running at any time during
	// the update is at most 130% of desired pods.
	MaxSurge intstr.IntOrString
	// OnProgress is invoked, if set, during each scale cycle, to allow the caller to perform
	// additional logic or abort the scale. If an error is returned the cleanup method will not
	// be invoked. The percentage value is a synthetic "progress" calculation that represents
	// the approximate percentage completion.
	OnProgress func(oldRc, newRc *corev1.ReplicationController, percentage int) error
}
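
// The interplay between MaxSurge and MaxUnavailable above is easiest to see with
// concrete numbers. The sketch below is illustrative only (resolveExampleBounds is
// a hypothetical helper, not used by the updater): it resolves both fields the same
// way Update does, via deploymentutil.ResolveFenceposts. With desired=10,
// MaxSurge=30% and MaxUnavailable=20%, the bounds resolve to 3 and 2, so the update
// keeps at least 10-2=8 pods available and never runs more than 10+3=13 pods.
func resolveExampleBounds() (maxSurge, maxUnavailable int32, err error) {
	desired := int32(10)
	surge := intstr.FromString("30%")       // 30% of 10 desired pods -> 3
	unavailable := intstr.FromString("20%") // 20% of 10 desired pods -> 2
	return deploymentutil.ResolveFenceposts(&surge, &unavailable, desired)
}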

// RollingUpdaterCleanupPolicy is a cleanup action to take after the
// deployment is complete.
type RollingUpdaterCleanupPolicy string

const (
	// DeleteRollingUpdateCleanupPolicy means delete the old controller.
	DeleteRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Delete"
	// PreserveRollingUpdateCleanupPolicy means keep the old controller.
	PreserveRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Preserve"
	// RenameRollingUpdateCleanupPolicy means delete the old controller, and rename
	// the new controller to the name of the old controller.
	RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename"
)

// RollingUpdater provides methods for updating replicated pods in a predictable,
// fault-tolerant way.
type RollingUpdater struct {
	rcClient    corev1client.ReplicationControllersGetter
	podClient   corev1client.PodsGetter
	scaleClient scaleclient.ScalesGetter
	// Namespace for resources
	ns string
	// scaleAndWait scales a controller and returns its updated state.
	scaleAndWait func(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error)
	// getOrCreateTargetController gets and validates an existing controller or
	// makes a new one.
	getOrCreateTargetController func(controller *corev1.ReplicationController, sourceID string) (*corev1.ReplicationController, bool, error)
	// cleanup performs post deployment cleanup tasks for newRc and oldRc.
	cleanup func(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error
	// getReadyPods returns the amount of old and new ready pods.
	getReadyPods func(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error)
	// nowFn returns the current time used to calculate the minReadySeconds
	nowFn func() metav1.Time
}

// NewRollingUpdater creates a RollingUpdater from a client.
func NewRollingUpdater(namespace string, rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, sc scaleclient.ScalesGetter) *RollingUpdater {
	updater := &RollingUpdater{
		rcClient:    rcClient,
		podClient:   podClient,
		scaleClient: sc,
		ns:          namespace,
	}
	// Inject real implementations.
	updater.scaleAndWait = updater.scaleAndWaitWithScaler
	updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient
	updater.getReadyPods = updater.readyPods
	updater.cleanup = updater.cleanupWithClients
	updater.nowFn = func() metav1.Time { return metav1.Now() }
	return updater
}
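
// Putting the constructor and config together: the sketch below is a minimal,
// illustrative driver (runExampleRollingUpdate is hypothetical and not part of the
// kubectl flow), assuming the caller already has typed core/v1 clients, a scale
// client, and old/new controllers prepared the way kubectl rolling-update prepares
// them (e.g. via CreateNewControllerFromCurrentController below).
func runExampleRollingUpdate(out io.Writer, rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, sc scaleclient.ScalesGetter, oldRc, newRc *corev1.ReplicationController) error {
	updater := NewRollingUpdater(oldRc.Namespace, rcClient, podClient, sc)
	return updater.Update(&RollingUpdaterConfig{
		Out:             out,
		OldRc:           oldRc,
		NewRc:           newRc,
		UpdatePeriod:    1 * time.Second,
		Interval:        3 * time.Second,
		Timeout:         5 * time.Minute,
		MinReadySeconds: 0,
		CleanupPolicy:   RenameRollingUpdateCleanupPolicy,
		MaxUnavailable:  intstr.FromInt(0),
		MaxSurge:        intstr.FromString("25%"),
	})
}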

// Update all pods for a ReplicationController (oldRc) by creating a new
// controller (newRc) with 0 replicas, and synchronously scaling oldRc and
// newRc until oldRc has 0 replicas and newRc has the original # of desired
// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy.
//
// Each interval, the updater will attempt to make progress however it can
// without violating any availability constraints defined by the config. This
// means the amount scaled up or down each interval will vary based on the
// timeliness of readiness and the updater will always try to make progress,
// even slowly.
//
// If an update from oldRc to newRc is already in progress, we attempt to
// drive it to completion. If an error occurs at any step of the update, the
// error will be returned.
//
// A scaling event (either up or down) is considered progress; if no progress
// is made within the config.Timeout, an error is returned.
//
// TODO: make this handle performing a rollback of a partially completed
// rollout.
func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
	out := config.Out
	oldRc := config.OldRc
	scaleRetryParams := NewRetryParams(config.Interval, config.Timeout)

	// Find an existing controller (for continuing an interrupted update) or
	// create a new one if necessary.
	sourceID := fmt.Sprintf("%s:%s", oldRc.Name, oldRc.UID)
	newRc, existed, err := r.getOrCreateTargetController(config.NewRc, sourceID)
	if err != nil {
		return err
	}
	if existed {
		fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newRc.Name)
	} else {
		fmt.Fprintf(out, "Created %s\n", newRc.Name)
	}

	// Extract the desired replica count from the controller.
	desiredAnnotation, err := strconv.Atoi(newRc.Annotations[desiredReplicasAnnotation])
	if err != nil {
		return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
			newRc.Name, desiredReplicasAnnotation, newRc.Annotations[desiredReplicasAnnotation])
	}
	desired := int32(desiredAnnotation)

	// Extract the original replica count from the old controller, adding the
	// annotation if it doesn't yet exist.
	_, hasOriginalAnnotation := oldRc.Annotations[originalReplicasAnnotation]
	if !hasOriginalAnnotation {
		existing, err := r.rcClient.ReplicationControllers(oldRc.Namespace).Get(oldRc.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		originReplicas := strconv.Itoa(int(valOrZero(existing.Spec.Replicas)))
		applyUpdate := func(rc *corev1.ReplicationController) {
			if rc.Annotations == nil {
				rc.Annotations = map[string]string{}
			}
			rc.Annotations[originalReplicasAnnotation] = originReplicas
		}
		if oldRc, err = updateRcWithRetries(r.rcClient, existing.Namespace, existing, applyUpdate); err != nil {
			return err
		}
	}

	// maxSurge is the maximum scaling increment and maxUnavailable is the maximum
	// number of pods that can be unavailable during a rollout.
	maxSurge, maxUnavailable, err := deploymentutil.ResolveFenceposts(&config.MaxSurge, &config.MaxUnavailable, desired)
	if err != nil {
		return err
	}
	// Validate maximums.
	if desired > 0 && maxUnavailable == 0 && maxSurge == 0 {
		return fmt.Errorf("one of maxSurge or maxUnavailable must be specified")
	}
	// The minimum pods which must remain available throughout the update,
	// calculated for internal convenience.
	minAvailable := int32(integer.IntMax(0, int(desired-maxUnavailable)))
	// If the desired new scale is 0, then the max unavailable is necessarily
	// the effective scale of the old RC regardless of the configuration
	// (equivalent to 100% maxUnavailable).
	if desired == 0 {
		maxUnavailable = valOrZero(oldRc.Spec.Replicas)
		minAvailable = 0
	}

	fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (keep %d pods available, don't exceed %d pods)\n",
		newRc.Name, valOrZero(newRc.Spec.Replicas), desired, oldRc.Name, valOrZero(oldRc.Spec.Replicas), minAvailable, desired+maxSurge)

	// give a caller incremental notification and allow them to exit early
	goal := desired - valOrZero(newRc.Spec.Replicas)
	if goal < 0 {
		goal = -goal
	}
	progress := func(complete bool) error {
		if config.OnProgress == nil {
			return nil
		}
		progress := desired - valOrZero(newRc.Spec.Replicas)
		if progress < 0 {
			progress = -progress
		}
		percentage := 100
		if !complete && goal > 0 {
			percentage = int((goal - progress) * 100 / goal)
		}
		return config.OnProgress(oldRc, newRc, percentage)
	}

	// Scale newRc and oldRc until newRc has the desired number of replicas and
	// oldRc has 0 replicas.
	progressDeadline := time.Now().UnixNano() + config.Timeout.Nanoseconds()
	for valOrZero(newRc.Spec.Replicas) != desired || valOrZero(oldRc.Spec.Replicas) != 0 {
		// Store the existing replica counts for progress timeout tracking.
		newReplicas := valOrZero(newRc.Spec.Replicas)
		oldReplicas := valOrZero(oldRc.Spec.Replicas)

		// Scale up as much as possible.
		scaledRc, err := r.scaleUp(newRc, oldRc, desired, maxSurge, maxUnavailable, scaleRetryParams, config)
		if err != nil {
			return err
		}
		newRc = scaledRc

		// notify the caller if necessary
		if err := progress(false); err != nil {
			return err
		}

		// Wait between scaling operations for things to settle.
		time.Sleep(config.UpdatePeriod)

		// Scale down as much as possible.
		scaledRc, err = r.scaleDown(newRc, oldRc, desired, minAvailable, maxUnavailable, maxSurge, config)
		if err != nil {
			return err
		}
		oldRc = scaledRc

		// notify the caller if necessary
		if err := progress(false); err != nil {
			return err
		}

		// If we are making progress, continue to advance the progress deadline.
		// Otherwise, time out with an error.
		progressMade := (valOrZero(newRc.Spec.Replicas) != newReplicas) || (valOrZero(oldRc.Spec.Replicas) != oldReplicas)
		if progressMade {
			progressDeadline = time.Now().UnixNano() + config.Timeout.Nanoseconds()
		} else if time.Now().UnixNano() > progressDeadline {
			return fmt.Errorf("timed out waiting for any update progress to be made")
		}
	}

	// notify the caller if necessary
	if err := progress(true); err != nil {
		return err
	}

	// Housekeeping and cleanup policy execution.
	return r.cleanup(oldRc, newRc, config)
}
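
// The "percentage" passed to OnProgress by Update is purely synthetic. The sketch
// below mirrors that arithmetic (exampleProgressPercentage is a hypothetical
// helper; Update computes this inline): with desired=10 and newRc starting at 0
// replicas, the goal is 10; once newRc reaches 4 replicas, the remaining distance
// is 6, so the reported percentage is (10-6)*100/10 = 40.
func exampleProgressPercentage(desired, newReplicas int32, complete bool) int {
	goal := desired // assumes newRc started at 0 replicas
	remaining := desired - newReplicas
	if remaining < 0 {
		remaining = -remaining
	}
	if complete || goal == 0 {
		return 100
	}
	return int((goal - remaining) * 100 / goal)
}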

// scaleUp scales up newRc to desired by whatever increment is possible given
// the configured surge threshold. scaleUp will safely no-op as necessary when
// it detects redundancy or other relevant conditions.
func (r *RollingUpdater) scaleUp(newRc, oldRc *corev1.ReplicationController, desired, maxSurge, maxUnavailable int32, scaleRetryParams *RetryParams, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
	// If we're already at the desired, do nothing.
	if valOrZero(newRc.Spec.Replicas) == desired {
		return newRc, nil
	}

	// Scale up as far as we can based on the surge limit.
	increment := (desired + maxSurge) - (valOrZero(oldRc.Spec.Replicas) + valOrZero(newRc.Spec.Replicas))
	// If the old is already scaled down, go ahead and scale all the way up.
	if valOrZero(oldRc.Spec.Replicas) == 0 {
		increment = desired - valOrZero(newRc.Spec.Replicas)
	}
	// We can't scale up without violating the surge limit, so do nothing.
	if increment <= 0 {
		return newRc, nil
	}
	// Increase the replica count, and deal with fenceposts.
	nextVal := valOrZero(newRc.Spec.Replicas) + increment
	newRc.Spec.Replicas = &nextVal
	if valOrZero(newRc.Spec.Replicas) > desired {
		newRc.Spec.Replicas = &desired
	}
	// Perform the scale-up.
	fmt.Fprintf(config.Out, "Scaling %s up to %d\n", newRc.Name, valOrZero(newRc.Spec.Replicas))
	scaledRc, err := r.scaleAndWait(newRc, scaleRetryParams, scaleRetryParams)
	if err != nil {
		return nil, err
	}
	return scaledRc, nil
}
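
// A concrete pass through scaleUp's surge math (illustrative only; the numbers are
// hypothetical): with desired=10, maxSurge=3, oldRc at 7 replicas and newRc at 5,
// the increment is (10+3)-(7+5)=1, so newRc is scaled to 6. Once oldRc reaches 0,
// the surge bound no longer matters and newRc is scaled straight to desired.
func exampleScaleUpIncrement(desired, maxSurge, oldReplicas, newReplicas int32) int32 {
	if oldReplicas == 0 {
		return desired - newReplicas
	}
	return (desired + maxSurge) - (oldReplicas + newReplicas)
}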

// scaleDown scales down oldRc to 0 at whatever decrement possible given the
// thresholds defined on the config. scaleDown will safely no-op as necessary
// when it detects redundancy or other relevant conditions.
func (r *RollingUpdater) scaleDown(newRc, oldRc *corev1.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int32, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
	// Already scaled down; do nothing.
	if valOrZero(oldRc.Spec.Replicas) == 0 {
		return oldRc, nil
	}
	// Get ready pods. We shouldn't block, otherwise in case both old and new
	// pods are unavailable then the rolling update process blocks.
	// Timeout-wise we are already covered by the progress check.
	_, newAvailable, err := r.getReadyPods(oldRc, newRc, config.MinReadySeconds)
	if err != nil {
		return nil, err
	}
	// The old controller is considered as part of the total because we want to
	// maintain minimum availability even with a volatile old controller.
	// Scale down as much as possible while maintaining minimum availability.
	allPods := valOrZero(oldRc.Spec.Replicas) + valOrZero(newRc.Spec.Replicas)
	newUnavailable := valOrZero(newRc.Spec.Replicas) - newAvailable
	decrement := allPods - minAvailable - newUnavailable
	// The decrement normally shouldn't drop below 0 because the available count
	// always starts below the old replica count, but the old replica count can
	// decrement due to externalities like pod deaths in the replica set. This
	// will be considered a transient condition; do nothing and try again later
	// with new readiness values.
	//
	// If the most we can scale is 0, it means we can't scale down without
	// violating the minimum. Do nothing and try again later when conditions may
	// have changed.
	if decrement <= 0 {
		return oldRc, nil
	}
	// Reduce the replica count, and deal with fenceposts.
	nextOldVal := valOrZero(oldRc.Spec.Replicas) - decrement
	oldRc.Spec.Replicas = &nextOldVal
	if valOrZero(oldRc.Spec.Replicas) < 0 {
		oldRc.Spec.Replicas = utilpointer.Int32Ptr(0)
	}
	// If the new is already fully scaled and available up to the desired size, go
	// ahead and scale old all the way down.
	if valOrZero(newRc.Spec.Replicas) == desired && newAvailable == desired {
		oldRc.Spec.Replicas = utilpointer.Int32Ptr(0)
	}
	// Perform the scale-down.
	fmt.Fprintf(config.Out, "Scaling %s down to %d\n", oldRc.Name, valOrZero(oldRc.Spec.Replicas))
	retryWait := &RetryParams{config.Interval, config.Timeout}
	scaledRc, err := r.scaleAndWait(oldRc, retryWait, retryWait)
	if err != nil {
		return nil, err
	}
	return scaledRc, nil
}
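
// The matching pass through scaleDown's availability math (illustrative,
// hypothetical numbers): with oldRc at 7, newRc at 6 of which 4 are available, and
// minAvailable=8, the total is 13 pods, 2 of the new pods are unavailable, and the
// decrement is 13-8-2=3, so oldRc is scaled from 7 down to 4.
func exampleScaleDownDecrement(oldReplicas, newReplicas, newAvailable, minAvailable int32) int32 {
	allPods := oldReplicas + newReplicas
	newUnavailable := newReplicas - newAvailable
	return allPods - minAvailable - newUnavailable
}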

// scaleAndWaitWithScaler scales a controller using a Scaler and a real client.
func (r *RollingUpdater) scaleAndWaitWithScaler(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error) {
	scaler := NewScaler(r.scaleClient)
	if err := scaler.Scale(rc.Namespace, rc.Name, uint(valOrZero(rc.Spec.Replicas)), &ScalePrecondition{-1, ""}, retry, wait, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil {
		return nil, err
	}
	return r.rcClient.ReplicationControllers(rc.Namespace).Get(rc.Name, metav1.GetOptions{})
}

// readyPods returns the old and new ready counts for their pods.
// If a pod is observed as being ready, it's considered ready even
// if it later becomes not ready.
func (r *RollingUpdater) readyPods(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error) {
	controllers := []*corev1.ReplicationController{oldRc, newRc}
	oldReady := int32(0)
	newReady := int32(0)
	if r.nowFn == nil {
		r.nowFn = func() metav1.Time { return metav1.Now() }
	}

	for i := range controllers {
		controller := controllers[i]
		selector := labels.Set(controller.Spec.Selector).AsSelector()
		options := metav1.ListOptions{LabelSelector: selector.String()}
		pods, err := r.podClient.Pods(controller.Namespace).List(options)
		if err != nil {
			return 0, 0, err
		}
		for _, v1Pod := range pods.Items {
			// Do not count deleted pods as ready
			if v1Pod.DeletionTimestamp != nil {
				continue
			}
			if !podutils.IsPodAvailable(&v1Pod, minReadySeconds, r.nowFn()) {
				continue
			}
			switch controller.Name {
			case oldRc.Name:
				oldReady++
			case newRc.Name:
				newReady++
			}
		}
	}
	return oldReady, newReady, nil
}

// getOrCreateTargetControllerWithClient looks for an existing controller with
// sourceID. If found, the existing controller is returned with true
// indicating that the controller already exists. If the controller isn't
// found, a new one is created and returned along with false indicating the
// controller was created.
//
// Existing controllers are validated to ensure their sourceIDAnnotation
// matches sourceID; if there's a mismatch, an error is returned.
func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *corev1.ReplicationController, sourceID string) (*corev1.ReplicationController, bool, error) {
	existingRc, err := r.existingController(controller)
	if err != nil {
		if !errors.IsNotFound(err) {
			// There was an error trying to find the controller; don't assume we
			// should create it.
			return nil, false, err
		}
		if valOrZero(controller.Spec.Replicas) <= 0 {
			return nil, false, fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d", controller.Name, valOrZero(controller.Spec.Replicas))
		}
		// The controller wasn't found, so create it.
		if controller.Annotations == nil {
			controller.Annotations = map[string]string{}
		}
		controller.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", valOrZero(controller.Spec.Replicas))
		controller.Annotations[sourceIDAnnotation] = sourceID
		controller.Spec.Replicas = utilpointer.Int32Ptr(0)
		newRc, err := r.rcClient.ReplicationControllers(r.ns).Create(controller)
		return newRc, false, err
	}
	// Validate and use the existing controller.
	annotations := existingRc.Annotations
	source := annotations[sourceIDAnnotation]
	_, ok := annotations[desiredReplicasAnnotation]
	if source != sourceID || !ok {
		return nil, false, fmt.Errorf("Missing/unexpected annotations for controller %s, expected %s : %s", controller.Name, sourceID, annotations)
	}
	return existingRc, true, nil
}
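
// The annotations stamped onto a freshly created target controller by the function
// above, reconstructed as a sketch (exampleTargetAnnotations is a hypothetical
// helper): the desired replica count is parked in an annotation while the spec
// itself is reset to 0 replicas so Update can grow it gradually, and the source id
// ties the target back to "<oldRc.Name>:<oldRc.UID>".
func exampleTargetAnnotations(oldRc *corev1.ReplicationController, desired int32) map[string]string {
	return map[string]string{
		desiredReplicasAnnotation: fmt.Sprintf("%d", desired),
		sourceIDAnnotation:        fmt.Sprintf("%s:%s", oldRc.Name, oldRc.UID),
	}
}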

// existingController verifies whether the controller already exists.
func (r *RollingUpdater) existingController(controller *corev1.ReplicationController) (*corev1.ReplicationController, error) {
	// With no rc name, only a generate name, there is no existing rc.
	if len(controller.Name) == 0 && len(controller.GenerateName) > 0 {
		return nil, errors.NewNotFound(corev1.Resource("replicationcontrollers"), controller.Name)
	}
	// The controller name is required to get the rc back.
	return r.rcClient.ReplicationControllers(controller.Namespace).Get(controller.Name, metav1.GetOptions{})
}

// cleanupWithClients performs cleanup tasks after the rolling update. Update
// process related annotations are removed from oldRc and newRc. The
// CleanupPolicy on config is executed.
func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error {
	// Clean up annotations
	var err error
	newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(newRc.Name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	applyUpdate := func(rc *corev1.ReplicationController) {
		delete(rc.Annotations, sourceIDAnnotation)
		delete(rc.Annotations, desiredReplicasAnnotation)
	}
	if newRc, err = updateRcWithRetries(r.rcClient, r.ns, newRc, applyUpdate); err != nil {
		return err
	}

	if err = wait.Poll(config.Interval, config.Timeout, ControllerHasDesiredReplicas(r.rcClient, newRc)); err != nil {
		return err
	}
	newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(newRc.Name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	switch config.CleanupPolicy {
	case DeleteRollingUpdateCleanupPolicy:
		// delete old rc
		fmt.Fprintf(config.Out, "Update succeeded. Deleting %s\n", oldRc.Name)
		return r.rcClient.ReplicationControllers(r.ns).Delete(oldRc.Name, nil)
	case RenameRollingUpdateCleanupPolicy:
		// delete old rc
		fmt.Fprintf(config.Out, "Update succeeded. Deleting old controller: %s\n", oldRc.Name)
		if err := r.rcClient.ReplicationControllers(r.ns).Delete(oldRc.Name, nil); err != nil {
			return err
		}
		fmt.Fprintf(config.Out, "Renaming %s to %s\n", newRc.Name, oldRc.Name)
		return Rename(r.rcClient, newRc, oldRc.Name)
	case PreserveRollingUpdateCleanupPolicy:
		return nil
	default:
		return nil
	}
}

func Rename(c corev1client.ReplicationControllersGetter, rc *corev1.ReplicationController, newName string) error {
	oldName := rc.Name
	rc.Name = newName
	rc.ResourceVersion = ""
	// First delete the oldName RC and orphan its pods.
	policy := metav1.DeletePropagationOrphan
	err := c.ReplicationControllers(rc.Namespace).Delete(oldName, &metav1.DeleteOptions{PropagationPolicy: &policy})
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	err = wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
		_, err := c.ReplicationControllers(rc.Namespace).Get(oldName, metav1.GetOptions{})
		if err == nil {
			return false, nil
		} else if errors.IsNotFound(err) {
			return true, nil
		} else {
			return false, err
		}
	})
	if err != nil {
		return err
	}
	// Then create the same RC with the new name.
	_, err = c.ReplicationControllers(rc.Namespace).Create(rc)
	return err
}

func LoadExistingNextReplicationController(c corev1client.ReplicationControllersGetter, namespace, newName string) (*corev1.ReplicationController, error) {
	if len(newName) == 0 {
		return nil, nil
	}
	newRc, err := c.ReplicationControllers(namespace).Get(newName, metav1.GetOptions{})
	if err != nil && errors.IsNotFound(err) {
		return nil, nil
	}
	return newRc, err
}

type NewControllerConfig struct {
	Namespace        string
	OldName, NewName string
	Image            string
	Container        string
	DeploymentKey    string
	PullPolicy       corev1.PullPolicy
}

func CreateNewControllerFromCurrentController(rcClient corev1client.ReplicationControllersGetter, codec runtime.Codec, cfg *NewControllerConfig) (*corev1.ReplicationController, error) {
	containerIndex := 0
	// load the old RC into the "new" RC
	newRc, err := rcClient.ReplicationControllers(cfg.Namespace).Get(cfg.OldName, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	if len(cfg.Container) != 0 {
		containerFound := false
		for i, c := range newRc.Spec.Template.Spec.Containers {
			if c.Name == cfg.Container {
				containerIndex = i
				containerFound = true
				break
			}
		}
		if !containerFound {
			return nil, fmt.Errorf("container %s not found in pod", cfg.Container)
		}
	}

	if len(newRc.Spec.Template.Spec.Containers) > 1 && len(cfg.Container) == 0 {
		return nil, fmt.Errorf("must specify container to update when updating a multi-container pod")
	}

	if len(newRc.Spec.Template.Spec.Containers) == 0 {
		return nil, fmt.Errorf("pod has no containers! (%v)", newRc)
	}
	newRc.Spec.Template.Spec.Containers[containerIndex].Image = cfg.Image
	if len(cfg.PullPolicy) != 0 {
		newRc.Spec.Template.Spec.Containers[containerIndex].ImagePullPolicy = cfg.PullPolicy
	}

	newHash, err := util.HashObject(newRc, codec)
	if err != nil {
		return nil, err
	}

	if len(cfg.NewName) == 0 {
		cfg.NewName = fmt.Sprintf("%s-%s", newRc.Name, newHash)
	}
	newRc.Name = cfg.NewName

	newRc.Spec.Selector[cfg.DeploymentKey] = newHash
	newRc.Spec.Template.Labels[cfg.DeploymentKey] = newHash
	// Clear resource version after hashing so that identical updates get different hashes.
	newRc.ResourceVersion = ""
	return newRc, nil
}
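
// A sketch of how the helper above is typically driven (illustrative only;
// exampleNextController is hypothetical, and "deployment" stands in for whatever
// deployment label key the caller uses): the result is a not-yet-created controller
// named "<old>-<hash>" whose selector and pod template both carry
// deploymentKey=<hash>, ready to be handed to RollingUpdaterConfig.NewRc.
func exampleNextController(rcClient corev1client.ReplicationControllersGetter, codec runtime.Codec, namespace, oldName, image string) (*corev1.ReplicationController, error) {
	return CreateNewControllerFromCurrentController(rcClient, codec, &NewControllerConfig{
		Namespace:     namespace,
		OldName:       oldName,
		Image:         image,
		DeploymentKey: "deployment",
	})
}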

func AbortRollingUpdate(c *RollingUpdaterConfig) error {
	// Swap the controllers
	tmp := c.OldRc
	c.OldRc = c.NewRc
	c.NewRc = tmp

	if c.NewRc.Annotations == nil {
		c.NewRc.Annotations = map[string]string{}
	}
	c.NewRc.Annotations[sourceIDAnnotation] = fmt.Sprintf("%s:%s", c.OldRc.Name, c.OldRc.UID)

	// Use the original value since the replica count change from old to new
	// could be asymmetric. If we don't know the original count, we can't safely
	// roll back to a known good size.
	originalSize, foundOriginal := tmp.Annotations[originalReplicasAnnotation]
	if !foundOriginal {
		return fmt.Errorf("couldn't find original replica count of %q", tmp.Name)
	}
	fmt.Fprintf(c.Out, "Setting %q replicas to %s\n", c.NewRc.Name, originalSize)
	c.NewRc.Annotations[desiredReplicasAnnotation] = originalSize
	c.CleanupPolicy = DeleteRollingUpdateCleanupPolicy
	return nil
}
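
// Rolling back with the helper above amounts to mutating an existing config and
// re-running the updater. A minimal sketch (exampleAbort is hypothetical), assuming
// config was built for the interrupted update: AbortRollingUpdate swaps
// OldRc/NewRc and restores the original size, and a subsequent Update drives the
// rollback to completion.
func exampleAbort(rcClient corev1client.ReplicationControllersGetter, updater *RollingUpdater, config *RollingUpdaterConfig) error {
	if err := AbortRollingUpdate(config); err != nil {
		return err
	}
	// Persist the swapped annotations before driving the rollback, since
	// getOrCreateTargetController validates them against the live object.
	if _, err := rcClient.ReplicationControllers(config.NewRc.Namespace).Update(config.NewRc); err != nil {
		return err
	}
	return updater.Update(config)
}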

func GetNextControllerAnnotation(rc *corev1.ReplicationController) (string, bool) {
	res, found := rc.Annotations[nextControllerAnnotation]
	return res, found
}

func SetNextControllerAnnotation(rc *corev1.ReplicationController, name string) {
	if rc.Annotations == nil {
		rc.Annotations = map[string]string{}
	}
	rc.Annotations[nextControllerAnnotation] = name
}

func UpdateExistingReplicationController(rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, oldRc *corev1.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*corev1.ReplicationController, error) {
	if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
		SetNextControllerAnnotation(oldRc, newName)
		return AddDeploymentKeyToReplicationController(oldRc, rcClient, podClient, deploymentKey, deploymentValue, namespace, out)
	}

	// If we didn't need to update the controller for the deployment key, we still need to write
	// the "next" controller.
	applyUpdate := func(rc *corev1.ReplicationController) {
		SetNextControllerAnnotation(rc, newName)
	}
	return updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate)
}

func AddDeploymentKeyToReplicationController(oldRc *corev1.ReplicationController, rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, deploymentKey, deploymentValue, namespace string, out io.Writer) (*corev1.ReplicationController, error) {
	var err error
	// First, update the template label. This ensures that any newly created pods will have the new label.
	applyUpdate := func(rc *corev1.ReplicationController) {
		if rc.Spec.Template.Labels == nil {
			rc.Spec.Template.Labels = map[string]string{}
		}
		rc.Spec.Template.Labels[deploymentKey] = deploymentValue
	}
	if oldRc, err = updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate); err != nil {
		return nil, err
	}

	// Update all pods managed by the rc to have the new hash label, so they are correctly adopted.
	// TODO: extract the code from the label command and re-use it here.
	selector := labels.SelectorFromSet(oldRc.Spec.Selector)
	options := metav1.ListOptions{LabelSelector: selector.String()}
	podList, err := podClient.Pods(namespace).List(options)
	if err != nil {
		return nil, err
	}
	for ix := range podList.Items {
		pod := &podList.Items[ix]
		applyUpdate := func(p *corev1.Pod) {
			if p.Labels == nil {
				p.Labels = map[string]string{
					deploymentKey: deploymentValue,
				}
			} else {
				p.Labels[deploymentKey] = deploymentValue
			}
		}
		if pod, err = updatePodWithRetries(podClient, namespace, pod, applyUpdate); err != nil {
			return nil, err
		}
	}

	if oldRc.Spec.Selector == nil {
		oldRc.Spec.Selector = map[string]string{}
	}
	// Copy the old selector, so that we can scrub out any orphaned pods.
	selectorCopy := map[string]string{}
	for k, v := range oldRc.Spec.Selector {
		selectorCopy[k] = v
	}
	applyUpdate = func(rc *corev1.ReplicationController) {
		rc.Spec.Selector[deploymentKey] = deploymentValue
	}
	// Update the selector of the rc so it manages all the pods we updated above.
	if oldRc, err = updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate); err != nil {
		return nil, err
	}

	// Clean up any orphaned pods that don't have the new label; this can happen if the rc manager
	// doesn't see the update to its pod template and creates a new pod with the old labels after
	// we've finished re-adopting existing pods to the rc.
	selector = labels.SelectorFromSet(selectorCopy)
	options = metav1.ListOptions{LabelSelector: selector.String()}
	if podList, err = podClient.Pods(namespace).List(options); err != nil {
		return nil, err
	}
	for ix := range podList.Items {
		pod := &podList.Items[ix]
		if value, found := pod.Labels[deploymentKey]; !found || value != deploymentValue {
			if err := podClient.Pods(namespace).Delete(pod.Name, nil); err != nil {
				return nil, err
			}
		}
	}

	return oldRc, nil
}

type updateRcFunc func(controller *corev1.ReplicationController)

// updateRcWithRetries retries updating the given rc on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updateRcWithRetries(rcClient corev1client.ReplicationControllersGetter, namespace string, rc *corev1.ReplicationController, applyUpdate updateRcFunc) (*corev1.ReplicationController, error) {
	// Deep copy the rc in case we failed on Get during the retry loop.
	oldRc := rc.DeepCopy()
	err := retry.RetryOnConflict(retry.DefaultBackoff, func() (e error) {
		// Apply the update, then attempt to push it to the apiserver.
		applyUpdate(rc)
		if rc, e = rcClient.ReplicationControllers(namespace).Update(rc); e == nil {
			// rc contains the latest controller post update
			return
		}
		updateErr := e
		// Update the controller with the latest resource version. If the update
		// failed, we can't trust rc, so use oldRc.Name.
		if rc, e = rcClient.ReplicationControllers(namespace).Get(oldRc.Name, metav1.GetOptions{}); e != nil {
			// The Get failed: Value in rc cannot be trusted.
			rc = oldRc
		}
		// Only return the error from update
		return updateErr
	})
	// If the error is non-nil, the returned controller cannot be trusted; if it is nil,
	// the returned controller contains the applied update.
	return rc, err
}
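
// Typical use of the retry helper above: mutate only inside applyUpdate so the
// change can be re-applied to a freshly fetched object after a conflict. An
// illustrative sketch (exampleAnnotateRc and the "example-next" value are
// hypothetical; the annotation key is the package's own):
func exampleAnnotateRc(rcClient corev1client.ReplicationControllersGetter, rc *corev1.ReplicationController) (*corev1.ReplicationController, error) {
	return updateRcWithRetries(rcClient, rc.Namespace, rc, func(rc *corev1.ReplicationController) {
		if rc.Annotations == nil {
			rc.Annotations = map[string]string{}
		}
		rc.Annotations[nextControllerAnnotation] = "example-next"
	})
}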

type updatePodFunc func(pod *corev1.Pod)

// updatePodWithRetries retries updating the given pod on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updatePodWithRetries(podClient corev1client.PodsGetter, namespace string, pod *corev1.Pod, applyUpdate updatePodFunc) (*corev1.Pod, error) {
	// Deep copy the pod in case we failed on Get during the retry loop.
	oldPod := pod.DeepCopy()
	err := retry.RetryOnConflict(retry.DefaultBackoff, func() (e error) {
		// Apply the update, then attempt to push it to the apiserver.
		applyUpdate(pod)
		if pod, e = podClient.Pods(namespace).Update(pod); e == nil {
			return
		}
		updateErr := e
		if pod, e = podClient.Pods(namespace).Get(oldPod.Name, metav1.GetOptions{}); e != nil {
			pod = oldPod
		}
		// Only return the error from update
		return updateErr
	})
	// If the error is non-nil, the returned pod cannot be trusted; if it is nil,
	// the returned pod contains the applied update.
	return pod, err
}

func FindSourceController(r corev1client.ReplicationControllersGetter, namespace, name string) (*corev1.ReplicationController, error) {
	list, err := r.ReplicationControllers(namespace).List(metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	for ix := range list.Items {
		rc := &list.Items[ix]
		if rc.Annotations != nil && strings.HasPrefix(rc.Annotations[sourceIDAnnotation], name) {
			return rc, nil
		}
	}
	return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name)
}