deployment_util.go 39 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package util
  14. import (
  15. "fmt"
  16. "math"
  17. "sort"
  18. "strconv"
  19. "strings"
  20. "time"
  21. "k8s.io/klog"
  22. apps "k8s.io/api/apps/v1"
  23. v1 "k8s.io/api/core/v1"
  24. apiequality "k8s.io/apimachinery/pkg/api/equality"
  25. "k8s.io/apimachinery/pkg/api/meta"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/runtime"
  28. "k8s.io/apimachinery/pkg/types"
  29. intstrutil "k8s.io/apimachinery/pkg/util/intstr"
  30. "k8s.io/apimachinery/pkg/util/wait"
  31. appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
  32. "k8s.io/kubernetes/pkg/controller"
  33. labelsutil "k8s.io/kubernetes/pkg/util/labels"
  34. "k8s.io/utils/integer"
  35. )
const (
	// RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence.
	RevisionAnnotation = "deployment.kubernetes.io/revision"
	// RevisionHistoryAnnotation maintains the history of all old revisions that a replica set has served for a deployment.
	RevisionHistoryAnnotation = "deployment.kubernetes.io/revision-history"
	// DesiredReplicasAnnotation is the desired replicas for a deployment recorded as an annotation
	// in its replica sets. Helps in separating scaling events from the rollout process and for
	// determining if the new replica set for a deployment is really saturated.
	DesiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas"
	// MaxReplicasAnnotation is the maximum replicas a deployment can have at a given point, which
	// is deployment.spec.replicas + maxSurge. Used by the underlying replica sets to estimate their
	// proportions in case the deployment has surge replicas.
	MaxReplicasAnnotation = "deployment.kubernetes.io/max-replicas"

	// RollbackRevisionNotFound is the rollback event reason used when the requested revision is not found.
	RollbackRevisionNotFound = "DeploymentRollbackRevisionNotFound"
	// RollbackTemplateUnchanged is the rollback event reason used when the template is unchanged.
	RollbackTemplateUnchanged = "DeploymentRollbackTemplateUnchanged"
	// RollbackDone is the rollback event reason used when the rollback is done.
	RollbackDone = "DeploymentRollback"

	// Reasons for deployment conditions
	//
	// Progressing:

	// ReplicaSetUpdatedReason is added in a deployment when one of its replica sets is updated as part
	// of the rollout process.
	ReplicaSetUpdatedReason = "ReplicaSetUpdated"
	// FailedRSCreateReason is added in a deployment when it cannot create a new replica set.
	FailedRSCreateReason = "ReplicaSetCreateError"
	// NewReplicaSetReason is added in a deployment when it creates a new replica set.
	NewReplicaSetReason = "NewReplicaSetCreated"
	// FoundNewRSReason is added in a deployment when it adopts an existing replica set.
	FoundNewRSReason = "FoundNewReplicaSet"
	// NewRSAvailableReason is added in a deployment when its newest replica set is made available,
	// i.e. the number of new pods that have passed readiness checks and run for at least minReadySeconds
	// is at least the minimum available pods that need to run for the deployment.
	NewRSAvailableReason = "NewReplicaSetAvailable"
	// TimedOutReason is added in a deployment when its newest replica set fails to show any progress
	// within the given deadline (progressDeadlineSeconds).
	TimedOutReason = "ProgressDeadlineExceeded"
	// PausedDeployReason is added in a deployment when it is paused. Lack of progress shouldn't be
	// estimated once a deployment is paused.
	PausedDeployReason = "DeploymentPaused"
	// ResumedDeployReason is added in a deployment when it is resumed. Useful for not accidentally
	// failing deployments that paused amidst a rollout and are bounded by a deadline.
	ResumedDeployReason = "DeploymentResumed"

	//
	// Available:

	// MinimumReplicasAvailable is added in a deployment when it has its minimum replicas required available.
	MinimumReplicasAvailable = "MinimumReplicasAvailable"
	// MinimumReplicasUnavailable is added in a deployment when it doesn't have the minimum required replicas
	// available.
	MinimumReplicasUnavailable = "MinimumReplicasUnavailable"
)
  88. // NewDeploymentCondition creates a new deployment condition.
  89. func NewDeploymentCondition(condType apps.DeploymentConditionType, status v1.ConditionStatus, reason, message string) *apps.DeploymentCondition {
  90. return &apps.DeploymentCondition{
  91. Type: condType,
  92. Status: status,
  93. LastUpdateTime: metav1.Now(),
  94. LastTransitionTime: metav1.Now(),
  95. Reason: reason,
  96. Message: message,
  97. }
  98. }
  99. // GetDeploymentCondition returns the condition with the provided type.
  100. func GetDeploymentCondition(status apps.DeploymentStatus, condType apps.DeploymentConditionType) *apps.DeploymentCondition {
  101. for i := range status.Conditions {
  102. c := status.Conditions[i]
  103. if c.Type == condType {
  104. return &c
  105. }
  106. }
  107. return nil
  108. }
  109. // SetDeploymentCondition updates the deployment to include the provided condition. If the condition that
  110. // we are about to add already exists and has the same status and reason then we are not going to update.
  111. func SetDeploymentCondition(status *apps.DeploymentStatus, condition apps.DeploymentCondition) {
  112. currentCond := GetDeploymentCondition(*status, condition.Type)
  113. if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
  114. return
  115. }
  116. // Do not update lastTransitionTime if the status of the condition doesn't change.
  117. if currentCond != nil && currentCond.Status == condition.Status {
  118. condition.LastTransitionTime = currentCond.LastTransitionTime
  119. }
  120. newConditions := filterOutCondition(status.Conditions, condition.Type)
  121. status.Conditions = append(newConditions, condition)
  122. }
  123. // RemoveDeploymentCondition removes the deployment condition with the provided type.
  124. func RemoveDeploymentCondition(status *apps.DeploymentStatus, condType apps.DeploymentConditionType) {
  125. status.Conditions = filterOutCondition(status.Conditions, condType)
  126. }
  127. // filterOutCondition returns a new slice of deployment conditions without conditions with the provided type.
  128. func filterOutCondition(conditions []apps.DeploymentCondition, condType apps.DeploymentConditionType) []apps.DeploymentCondition {
  129. var newConditions []apps.DeploymentCondition
  130. for _, c := range conditions {
  131. if c.Type == condType {
  132. continue
  133. }
  134. newConditions = append(newConditions, c)
  135. }
  136. return newConditions
  137. }
  138. // ReplicaSetToDeploymentCondition converts a replica set condition into a deployment condition.
  139. // Useful for promoting replica set failure conditions into deployments.
  140. func ReplicaSetToDeploymentCondition(cond apps.ReplicaSetCondition) apps.DeploymentCondition {
  141. return apps.DeploymentCondition{
  142. Type: apps.DeploymentConditionType(cond.Type),
  143. Status: cond.Status,
  144. LastTransitionTime: cond.LastTransitionTime,
  145. LastUpdateTime: cond.LastTransitionTime,
  146. Reason: cond.Reason,
  147. Message: cond.Message,
  148. }
  149. }
  150. // SetDeploymentRevision updates the revision for a deployment.
  151. func SetDeploymentRevision(deployment *apps.Deployment, revision string) bool {
  152. updated := false
  153. if deployment.Annotations == nil {
  154. deployment.Annotations = make(map[string]string)
  155. }
  156. if deployment.Annotations[RevisionAnnotation] != revision {
  157. deployment.Annotations[RevisionAnnotation] = revision
  158. updated = true
  159. }
  160. return updated
  161. }
  162. // MaxRevision finds the highest revision in the replica sets
  163. func MaxRevision(allRSs []*apps.ReplicaSet) int64 {
  164. max := int64(0)
  165. for _, rs := range allRSs {
  166. if v, err := Revision(rs); err != nil {
  167. // Skip the replica sets when it failed to parse their revision information
  168. klog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
  169. } else if v > max {
  170. max = v
  171. }
  172. }
  173. return max
  174. }
  175. // LastRevision finds the second max revision number in all replica sets (the last revision)
  176. func LastRevision(allRSs []*apps.ReplicaSet) int64 {
  177. max, secMax := int64(0), int64(0)
  178. for _, rs := range allRSs {
  179. if v, err := Revision(rs); err != nil {
  180. // Skip the replica sets when it failed to parse their revision information
  181. klog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
  182. } else if v >= max {
  183. secMax = max
  184. max = v
  185. } else if v > secMax {
  186. secMax = v
  187. }
  188. }
  189. return secMax
  190. }
  191. // Revision returns the revision number of the input object.
  192. func Revision(obj runtime.Object) (int64, error) {
  193. acc, err := meta.Accessor(obj)
  194. if err != nil {
  195. return 0, err
  196. }
  197. v, ok := acc.GetAnnotations()[RevisionAnnotation]
  198. if !ok {
  199. return 0, nil
  200. }
  201. return strconv.ParseInt(v, 10, 64)
  202. }
  203. // SetNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
  204. // copying required deployment annotations to it; it returns true if replica set's annotation is changed.
  205. func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.ReplicaSet, newRevision string, exists bool, revHistoryLimitInChars int) bool {
  206. // First, copy deployment's annotations (except for apply and revision annotations)
  207. annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS)
  208. // Then, update replica set's revision annotation
  209. if newRS.Annotations == nil {
  210. newRS.Annotations = make(map[string]string)
  211. }
  212. oldRevision, ok := newRS.Annotations[RevisionAnnotation]
  213. // The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number
  214. // of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and
  215. // newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision.
  216. oldRevisionInt, err := strconv.ParseInt(oldRevision, 10, 64)
  217. if err != nil {
  218. if oldRevision != "" {
  219. klog.Warningf("Updating replica set revision OldRevision not int %s", err)
  220. return false
  221. }
  222. //If the RS annotation is empty then initialise it to 0
  223. oldRevisionInt = 0
  224. }
  225. newRevisionInt, err := strconv.ParseInt(newRevision, 10, 64)
  226. if err != nil {
  227. klog.Warningf("Updating replica set revision NewRevision not int %s", err)
  228. return false
  229. }
  230. if oldRevisionInt < newRevisionInt {
  231. newRS.Annotations[RevisionAnnotation] = newRevision
  232. annotationChanged = true
  233. klog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision)
  234. }
  235. // If a revision annotation already existed and this replica set was updated with a new revision
  236. // then that means we are rolling back to this replica set. We need to preserve the old revisions
  237. // for historical information.
  238. if ok && oldRevisionInt < newRevisionInt {
  239. revisionHistoryAnnotation := newRS.Annotations[RevisionHistoryAnnotation]
  240. oldRevisions := strings.Split(revisionHistoryAnnotation, ",")
  241. if len(oldRevisions[0]) == 0 {
  242. newRS.Annotations[RevisionHistoryAnnotation] = oldRevision
  243. } else {
  244. totalLen := len(revisionHistoryAnnotation) + len(oldRevision) + 1
  245. // index for the starting position in oldRevisions
  246. start := 0
  247. for totalLen > revHistoryLimitInChars && start < len(oldRevisions) {
  248. totalLen = totalLen - len(oldRevisions[start]) - 1
  249. start++
  250. }
  251. if totalLen <= revHistoryLimitInChars {
  252. oldRevisions = append(oldRevisions[start:], oldRevision)
  253. newRS.Annotations[RevisionHistoryAnnotation] = strings.Join(oldRevisions, ",")
  254. } else {
  255. klog.Warningf("Not appending revision due to length limit of %v reached", revHistoryLimitInChars)
  256. }
  257. }
  258. }
  259. // If the new replica set is about to be created, we need to add replica annotations to it.
  260. if !exists && SetReplicasAnnotations(newRS, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+MaxSurge(*deployment)) {
  261. annotationChanged = true
  262. }
  263. return annotationChanged
  264. }
// annotationsToSkip is the set of annotation keys for which skipCopyAnnotation
// reports true, i.e. the annotations that are not copied between a deployment
// and its replica sets.
var annotationsToSkip = map[string]bool{
	v1.LastAppliedConfigAnnotation: true,
	RevisionAnnotation:             true,
	RevisionHistoryAnnotation:      true,
	DesiredReplicasAnnotation:      true,
	MaxReplicasAnnotation:          true,
	apps.DeprecatedRollbackTo:      true,
}
  273. // skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
  274. // TODO: How to decide which annotations should / should not be copied?
  275. // See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615
  276. func skipCopyAnnotation(key string) bool {
  277. return annotationsToSkip[key]
  278. }
  279. // copyDeploymentAnnotationsToReplicaSet copies deployment's annotations to replica set's annotations,
  280. // and returns true if replica set's annotation is changed.
  281. // Note that apply and revision annotations are not copied.
  282. func copyDeploymentAnnotationsToReplicaSet(deployment *apps.Deployment, rs *apps.ReplicaSet) bool {
  283. rsAnnotationsChanged := false
  284. if rs.Annotations == nil {
  285. rs.Annotations = make(map[string]string)
  286. }
  287. for k, v := range deployment.Annotations {
  288. // newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated
  289. // by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of
  290. // deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable.
  291. if skipCopyAnnotation(k) || rs.Annotations[k] == v {
  292. continue
  293. }
  294. rs.Annotations[k] = v
  295. rsAnnotationsChanged = true
  296. }
  297. return rsAnnotationsChanged
  298. }
  299. // SetDeploymentAnnotationsTo sets deployment's annotations as given RS's annotations.
  300. // This action should be done if and only if the deployment is rolling back to this rs.
  301. // Note that apply and revision annotations are not changed.
  302. func SetDeploymentAnnotationsTo(deployment *apps.Deployment, rollbackToRS *apps.ReplicaSet) {
  303. deployment.Annotations = getSkippedAnnotations(deployment.Annotations)
  304. for k, v := range rollbackToRS.Annotations {
  305. if !skipCopyAnnotation(k) {
  306. deployment.Annotations[k] = v
  307. }
  308. }
  309. }
  310. func getSkippedAnnotations(annotations map[string]string) map[string]string {
  311. skippedAnnotations := make(map[string]string)
  312. for k, v := range annotations {
  313. if skipCopyAnnotation(k) {
  314. skippedAnnotations[k] = v
  315. }
  316. }
  317. return skippedAnnotations
  318. }
  319. // FindActiveOrLatest returns the only active or the latest replica set in case there is at most one active
  320. // replica set. If there are more active replica sets, then we should proportionally scale them.
  321. func FindActiveOrLatest(newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) *apps.ReplicaSet {
  322. if newRS == nil && len(oldRSs) == 0 {
  323. return nil
  324. }
  325. sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs)))
  326. allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
  327. switch len(allRSs) {
  328. case 0:
  329. // If there is no active replica set then we should return the newest.
  330. if newRS != nil {
  331. return newRS
  332. }
  333. return oldRSs[0]
  334. case 1:
  335. return allRSs[0]
  336. default:
  337. return nil
  338. }
  339. }
  340. // GetDesiredReplicasAnnotation returns the number of desired replicas
  341. func GetDesiredReplicasAnnotation(rs *apps.ReplicaSet) (int32, bool) {
  342. return getIntFromAnnotation(rs, DesiredReplicasAnnotation)
  343. }
  344. func getMaxReplicasAnnotation(rs *apps.ReplicaSet) (int32, bool) {
  345. return getIntFromAnnotation(rs, MaxReplicasAnnotation)
  346. }
  347. func getIntFromAnnotation(rs *apps.ReplicaSet, annotationKey string) (int32, bool) {
  348. annotationValue, ok := rs.Annotations[annotationKey]
  349. if !ok {
  350. return int32(0), false
  351. }
  352. intValue, err := strconv.Atoi(annotationValue)
  353. if err != nil {
  354. klog.V(2).Infof("Cannot convert the value %q with annotation key %q for the replica set %q", annotationValue, annotationKey, rs.Name)
  355. return int32(0), false
  356. }
  357. return int32(intValue), true
  358. }
  359. // SetReplicasAnnotations sets the desiredReplicas and maxReplicas into the annotations
  360. func SetReplicasAnnotations(rs *apps.ReplicaSet, desiredReplicas, maxReplicas int32) bool {
  361. updated := false
  362. if rs.Annotations == nil {
  363. rs.Annotations = make(map[string]string)
  364. }
  365. desiredString := fmt.Sprintf("%d", desiredReplicas)
  366. if hasString := rs.Annotations[DesiredReplicasAnnotation]; hasString != desiredString {
  367. rs.Annotations[DesiredReplicasAnnotation] = desiredString
  368. updated = true
  369. }
  370. maxString := fmt.Sprintf("%d", maxReplicas)
  371. if hasString := rs.Annotations[MaxReplicasAnnotation]; hasString != maxString {
  372. rs.Annotations[MaxReplicasAnnotation] = maxString
  373. updated = true
  374. }
  375. return updated
  376. }
  377. // ReplicasAnnotationsNeedUpdate return true if ReplicasAnnotations need to be updated
  378. func ReplicasAnnotationsNeedUpdate(rs *apps.ReplicaSet, desiredReplicas, maxReplicas int32) bool {
  379. if rs.Annotations == nil {
  380. return true
  381. }
  382. desiredString := fmt.Sprintf("%d", desiredReplicas)
  383. if hasString := rs.Annotations[DesiredReplicasAnnotation]; hasString != desiredString {
  384. return true
  385. }
  386. maxString := fmt.Sprintf("%d", maxReplicas)
  387. if hasString := rs.Annotations[MaxReplicasAnnotation]; hasString != maxString {
  388. return true
  389. }
  390. return false
  391. }
  392. // MaxUnavailable returns the maximum unavailable pods a rolling deployment can take.
  393. func MaxUnavailable(deployment apps.Deployment) int32 {
  394. if !IsRollingUpdate(&deployment) || *(deployment.Spec.Replicas) == 0 {
  395. return int32(0)
  396. }
  397. // Error caught by validation
  398. _, maxUnavailable, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas))
  399. if maxUnavailable > *deployment.Spec.Replicas {
  400. return *deployment.Spec.Replicas
  401. }
  402. return maxUnavailable
  403. }
  404. // MinAvailable returns the minimum available pods of a given deployment
  405. func MinAvailable(deployment *apps.Deployment) int32 {
  406. if !IsRollingUpdate(deployment) {
  407. return int32(0)
  408. }
  409. return *(deployment.Spec.Replicas) - MaxUnavailable(*deployment)
  410. }
  411. // MaxSurge returns the maximum surge pods a rolling deployment can take.
  412. func MaxSurge(deployment apps.Deployment) int32 {
  413. if !IsRollingUpdate(&deployment) {
  414. return int32(0)
  415. }
  416. // Error caught by validation
  417. maxSurge, _, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas))
  418. return maxSurge
  419. }
  420. // GetProportion will estimate the proportion for the provided replica set using 1. the current size
  421. // of the parent deployment, 2. the replica count that needs be added on the replica sets of the
  422. // deployment, and 3. the total replicas added in the replica sets of the deployment so far.
  423. func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 {
  424. if rs == nil || *(rs.Spec.Replicas) == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded {
  425. return int32(0)
  426. }
  427. rsFraction := getReplicaSetFraction(*rs, d)
  428. allowed := deploymentReplicasToAdd - deploymentReplicasAdded
  429. if deploymentReplicasToAdd > 0 {
  430. // Use the minimum between the replica set fraction and the maximum allowed replicas
  431. // when scaling up. This way we ensure we will not scale up more than the allowed
  432. // replicas we can add.
  433. return integer.Int32Min(rsFraction, allowed)
  434. }
  435. // Use the maximum between the replica set fraction and the maximum allowed replicas
  436. // when scaling down. This way we ensure we will not scale down more than the allowed
  437. // replicas we can remove.
  438. return integer.Int32Max(rsFraction, allowed)
  439. }
  440. // getReplicaSetFraction estimates the fraction of replicas a replica set can have in
  441. // 1. a scaling event during a rollout or 2. when scaling a paused deployment.
  442. func getReplicaSetFraction(rs apps.ReplicaSet, d apps.Deployment) int32 {
  443. // If we are scaling down to zero then the fraction of this replica set is its whole size (negative)
  444. if *(d.Spec.Replicas) == int32(0) {
  445. return -*(rs.Spec.Replicas)
  446. }
  447. deploymentReplicas := *(d.Spec.Replicas) + MaxSurge(d)
  448. annotatedReplicas, ok := getMaxReplicasAnnotation(&rs)
  449. if !ok {
  450. // If we cannot find the annotation then fallback to the current deployment size. Note that this
  451. // will not be an accurate proportion estimation in case other replica sets have different values
  452. // which means that the deployment was scaled at some point but we at least will stay in limits
  453. // due to the min-max comparisons in getProportion.
  454. annotatedReplicas = d.Status.Replicas
  455. }
  456. // We should never proportionally scale up from zero which means rs.spec.replicas and annotatedReplicas
  457. // will never be zero here.
  458. newRSsize := (float64(*(rs.Spec.Replicas) * deploymentReplicas)) / float64(annotatedReplicas)
  459. return integer.RoundToInt32(newRSsize) - *(rs.Spec.Replicas)
  460. }
  461. // GetAllReplicaSets returns the old and new replica sets targeted by the given Deployment. It gets PodList and ReplicaSetList from client interface.
  462. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
  463. // The third returned value is the new replica set, and it may be nil if it doesn't exist yet.
  464. func GetAllReplicaSets(deployment *apps.Deployment, c appsclient.AppsV1Interface) ([]*apps.ReplicaSet, []*apps.ReplicaSet, *apps.ReplicaSet, error) {
  465. rsList, err := ListReplicaSets(deployment, RsListFromClient(c))
  466. if err != nil {
  467. return nil, nil, nil, err
  468. }
  469. oldRSes, allOldRSes := FindOldReplicaSets(deployment, rsList)
  470. newRS := FindNewReplicaSet(deployment, rsList)
  471. return oldRSes, allOldRSes, newRS, nil
  472. }
  473. // GetOldReplicaSets returns the old replica sets targeted by the given Deployment; get PodList and ReplicaSetList from client interface.
  474. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
  475. func GetOldReplicaSets(deployment *apps.Deployment, c appsclient.AppsV1Interface) ([]*apps.ReplicaSet, []*apps.ReplicaSet, error) {
  476. rsList, err := ListReplicaSets(deployment, RsListFromClient(c))
  477. if err != nil {
  478. return nil, nil, err
  479. }
  480. oldRSes, allOldRSes := FindOldReplicaSets(deployment, rsList)
  481. return oldRSes, allOldRSes, nil
  482. }
  483. // GetNewReplicaSet returns a replica set that matches the intent of the given deployment; get ReplicaSetList from client interface.
  484. // Returns nil if the new replica set doesn't exist yet.
  485. func GetNewReplicaSet(deployment *apps.Deployment, c appsclient.AppsV1Interface) (*apps.ReplicaSet, error) {
  486. rsList, err := ListReplicaSets(deployment, RsListFromClient(c))
  487. if err != nil {
  488. return nil, err
  489. }
  490. return FindNewReplicaSet(deployment, rsList), nil
  491. }
  492. // RsListFromClient returns an rsListFunc that wraps the given client.
  493. func RsListFromClient(c appsclient.AppsV1Interface) RsListFunc {
  494. return func(namespace string, options metav1.ListOptions) ([]*apps.ReplicaSet, error) {
  495. rsList, err := c.ReplicaSets(namespace).List(options)
  496. if err != nil {
  497. return nil, err
  498. }
  499. var ret []*apps.ReplicaSet
  500. for i := range rsList.Items {
  501. ret = append(ret, &rsList.Items[i])
  502. }
  503. return ret, err
  504. }
  505. }
// TODO: switch RsListFunc and podListFunc to full namespacers

// RsListFunc returns the ReplicaSets found in the given namespace (first
// argument) for the given metav1.ListOptions (second argument).
type RsListFunc func(string, metav1.ListOptions) ([]*apps.ReplicaSet, error)

// podListFunc returns the PodList found in the given namespace (first
// argument) for the given metav1.ListOptions (second argument).
type podListFunc func(string, metav1.ListOptions) (*v1.PodList, error)
  511. // ListReplicaSets returns a slice of RSes the given deployment targets.
  512. // Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
  513. // because only the controller itself should do that.
  514. // However, it does filter out anything whose ControllerRef doesn't match.
  515. func ListReplicaSets(deployment *apps.Deployment, getRSList RsListFunc) ([]*apps.ReplicaSet, error) {
  516. // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
  517. // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830.
  518. namespace := deployment.Namespace
  519. selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
  520. if err != nil {
  521. return nil, err
  522. }
  523. options := metav1.ListOptions{LabelSelector: selector.String()}
  524. all, err := getRSList(namespace, options)
  525. if err != nil {
  526. return nil, err
  527. }
  528. // Only include those whose ControllerRef matches the Deployment.
  529. owned := make([]*apps.ReplicaSet, 0, len(all))
  530. for _, rs := range all {
  531. if metav1.IsControlledBy(rs, deployment) {
  532. owned = append(owned, rs)
  533. }
  534. }
  535. return owned, nil
  536. }
  537. // ListPods returns a list of pods the given deployment targets.
  538. // This needs a list of ReplicaSets for the Deployment,
  539. // which can be found with ListReplicaSets().
  540. // Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
  541. // because only the controller itself should do that.
  542. // However, it does filter out anything whose ControllerRef doesn't match.
  543. func ListPods(deployment *apps.Deployment, rsList []*apps.ReplicaSet, getPodList podListFunc) (*v1.PodList, error) {
  544. namespace := deployment.Namespace
  545. selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
  546. if err != nil {
  547. return nil, err
  548. }
  549. options := metav1.ListOptions{LabelSelector: selector.String()}
  550. all, err := getPodList(namespace, options)
  551. if err != nil {
  552. return all, err
  553. }
  554. // Only include those whose ControllerRef points to a ReplicaSet that is in
  555. // turn owned by this Deployment.
  556. rsMap := make(map[types.UID]bool, len(rsList))
  557. for _, rs := range rsList {
  558. rsMap[rs.UID] = true
  559. }
  560. owned := &v1.PodList{Items: make([]v1.Pod, 0, len(all.Items))}
  561. for i := range all.Items {
  562. pod := &all.Items[i]
  563. controllerRef := metav1.GetControllerOf(pod)
  564. if controllerRef != nil && rsMap[controllerRef.UID] {
  565. owned.Items = append(owned.Items, *pod)
  566. }
  567. }
  568. return owned, nil
  569. }
  570. // EqualIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash]
  571. // We ignore pod-template-hash because:
  572. // 1. The hash result would be different upon podTemplateSpec API changes
  573. // (e.g. the addition of a new field will cause the hash code to change)
  574. // 2. The deployment template won't have hash labels
  575. func EqualIgnoreHash(template1, template2 *v1.PodTemplateSpec) bool {
  576. t1Copy := template1.DeepCopy()
  577. t2Copy := template2.DeepCopy()
  578. // Remove hash labels from template.Labels before comparing
  579. delete(t1Copy.Labels, apps.DefaultDeploymentUniqueLabelKey)
  580. delete(t2Copy.Labels, apps.DefaultDeploymentUniqueLabelKey)
  581. return apiequality.Semantic.DeepEqual(t1Copy, t2Copy)
  582. }
  583. // FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template).
  584. func FindNewReplicaSet(deployment *apps.Deployment, rsList []*apps.ReplicaSet) *apps.ReplicaSet {
  585. sort.Sort(controller.ReplicaSetsByCreationTimestamp(rsList))
  586. for i := range rsList {
  587. if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) {
  588. // In rare cases, such as after cluster upgrades, Deployment may end up with
  589. // having more than one new ReplicaSets that have the same template as its template,
  590. // see https://github.com/kubernetes/kubernetes/issues/40415
  591. // We deterministically choose the oldest new ReplicaSet.
  592. return rsList[i]
  593. }
  594. }
  595. // new ReplicaSet does not exist.
  596. return nil
  597. }
  598. // FindOldReplicaSets returns the old replica sets targeted by the given Deployment, with the given slice of RSes.
  599. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
  600. func FindOldReplicaSets(deployment *apps.Deployment, rsList []*apps.ReplicaSet) ([]*apps.ReplicaSet, []*apps.ReplicaSet) {
  601. var requiredRSs []*apps.ReplicaSet
  602. var allRSs []*apps.ReplicaSet
  603. newRS := FindNewReplicaSet(deployment, rsList)
  604. for _, rs := range rsList {
  605. // Filter out new replica set
  606. if newRS != nil && rs.UID == newRS.UID {
  607. continue
  608. }
  609. allRSs = append(allRSs, rs)
  610. if *(rs.Spec.Replicas) != 0 {
  611. requiredRSs = append(requiredRSs, rs)
  612. }
  613. }
  614. return requiredRSs, allRSs
  615. }
  616. // SetFromReplicaSetTemplate sets the desired PodTemplateSpec from a replica set template to the given deployment.
  617. func SetFromReplicaSetTemplate(deployment *apps.Deployment, template v1.PodTemplateSpec) *apps.Deployment {
  618. deployment.Spec.Template.ObjectMeta = template.ObjectMeta
  619. deployment.Spec.Template.Spec = template.Spec
  620. deployment.Spec.Template.ObjectMeta.Labels = labelsutil.CloneAndRemoveLabel(
  621. deployment.Spec.Template.ObjectMeta.Labels,
  622. apps.DefaultDeploymentUniqueLabelKey)
  623. return deployment
  624. }
  625. // GetReplicaCountForReplicaSets returns the sum of Replicas of the given replica sets.
  626. func GetReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int32 {
  627. totalReplicas := int32(0)
  628. for _, rs := range replicaSets {
  629. if rs != nil {
  630. totalReplicas += *(rs.Spec.Replicas)
  631. }
  632. }
  633. return totalReplicas
  634. }
  635. // GetActualReplicaCountForReplicaSets returns the sum of actual replicas of the given replica sets.
  636. func GetActualReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int32 {
  637. totalActualReplicas := int32(0)
  638. for _, rs := range replicaSets {
  639. if rs != nil {
  640. totalActualReplicas += rs.Status.Replicas
  641. }
  642. }
  643. return totalActualReplicas
  644. }
  645. // GetReadyReplicaCountForReplicaSets returns the number of ready pods corresponding to the given replica sets.
  646. func GetReadyReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int32 {
  647. totalReadyReplicas := int32(0)
  648. for _, rs := range replicaSets {
  649. if rs != nil {
  650. totalReadyReplicas += rs.Status.ReadyReplicas
  651. }
  652. }
  653. return totalReadyReplicas
  654. }
  655. // GetAvailableReplicaCountForReplicaSets returns the number of available pods corresponding to the given replica sets.
  656. func GetAvailableReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int32 {
  657. totalAvailableReplicas := int32(0)
  658. for _, rs := range replicaSets {
  659. if rs != nil {
  660. totalAvailableReplicas += rs.Status.AvailableReplicas
  661. }
  662. }
  663. return totalAvailableReplicas
  664. }
  665. // IsRollingUpdate returns true if the strategy type is a rolling update.
  666. func IsRollingUpdate(deployment *apps.Deployment) bool {
  667. return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType
  668. }
  669. // DeploymentComplete considers a deployment to be complete once all of its desired replicas
  670. // are updated and available, and no old pods are running.
  671. func DeploymentComplete(deployment *apps.Deployment, newStatus *apps.DeploymentStatus) bool {
  672. return newStatus.UpdatedReplicas == *(deployment.Spec.Replicas) &&
  673. newStatus.Replicas == *(deployment.Spec.Replicas) &&
  674. newStatus.AvailableReplicas == *(deployment.Spec.Replicas) &&
  675. newStatus.ObservedGeneration >= deployment.Generation
  676. }
  677. // DeploymentProgressing reports progress for a deployment. Progress is estimated by comparing the
  678. // current with the new status of the deployment that the controller is observing. More specifically,
  679. // when new pods are scaled up or become ready or available, or old pods are scaled down, then we
  680. // consider the deployment is progressing.
  681. func DeploymentProgressing(deployment *apps.Deployment, newStatus *apps.DeploymentStatus) bool {
  682. oldStatus := deployment.Status
  683. // Old replicas that need to be scaled down
  684. oldStatusOldReplicas := oldStatus.Replicas - oldStatus.UpdatedReplicas
  685. newStatusOldReplicas := newStatus.Replicas - newStatus.UpdatedReplicas
  686. return (newStatus.UpdatedReplicas > oldStatus.UpdatedReplicas) ||
  687. (newStatusOldReplicas < oldStatusOldReplicas) ||
  688. newStatus.ReadyReplicas > deployment.Status.ReadyReplicas ||
  689. newStatus.AvailableReplicas > deployment.Status.AvailableReplicas
  690. }
  691. // used for unit testing
  692. var nowFn = func() time.Time { return time.Now() }
  693. // DeploymentTimedOut considers a deployment to have timed out once its condition that reports progress
  694. // is older than progressDeadlineSeconds or a Progressing condition with a TimedOutReason reason already
  695. // exists.
  696. func DeploymentTimedOut(deployment *apps.Deployment, newStatus *apps.DeploymentStatus) bool {
  697. if !HasProgressDeadline(deployment) {
  698. return false
  699. }
  700. // Look for the Progressing condition. If it doesn't exist, we have no base to estimate progress.
  701. // If it's already set with a TimedOutReason reason, we have already timed out, no need to check
  702. // again.
  703. condition := GetDeploymentCondition(*newStatus, apps.DeploymentProgressing)
  704. if condition == nil {
  705. return false
  706. }
  707. // If the previous condition has been a successful rollout then we shouldn't try to
  708. // estimate any progress. Scenario:
  709. //
  710. // * progressDeadlineSeconds is smaller than the difference between now and the time
  711. // the last rollout finished in the past.
  712. // * the creation of a new ReplicaSet triggers a resync of the Deployment prior to the
  713. // cached copy of the Deployment getting updated with the status.condition that indicates
  714. // the creation of the new ReplicaSet.
  715. //
  716. // The Deployment will be resynced and eventually its Progressing condition will catch
  717. // up with the state of the world.
  718. if condition.Reason == NewRSAvailableReason {
  719. return false
  720. }
  721. if condition.Reason == TimedOutReason {
  722. return true
  723. }
  724. // Look at the difference in seconds between now and the last time we reported any
  725. // progress or tried to create a replica set, or resumed a paused deployment and
  726. // compare against progressDeadlineSeconds.
  727. from := condition.LastUpdateTime
  728. now := nowFn()
  729. delta := time.Duration(*deployment.Spec.ProgressDeadlineSeconds) * time.Second
  730. timedOut := from.Add(delta).Before(now)
  731. klog.V(4).Infof("Deployment %q timed out (%t) [last progress check: %v - now: %v]", deployment.Name, timedOut, from, now)
  732. return timedOut
  733. }
  734. // NewRSNewReplicas calculates the number of replicas a deployment's new RS should have.
  735. // When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it.
  736. // 1) The new RS is saturated: newRS's replicas == deployment's replicas
  737. // 2) Max number of pods allowed is reached: deployment's replicas + maxSurge == all RSs' replicas
  738. func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
  739. switch deployment.Spec.Strategy.Type {
  740. case apps.RollingUpdateDeploymentStrategyType:
  741. // Check if we can scale up.
  742. maxSurge, err := intstrutil.GetValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
  743. if err != nil {
  744. return 0, err
  745. }
  746. // Find the total number of pods
  747. currentPodCount := GetReplicaCountForReplicaSets(allRSs)
  748. maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
  749. if currentPodCount >= maxTotalPods {
  750. // Cannot scale up.
  751. return *(newRS.Spec.Replicas), nil
  752. }
  753. // Scale up.
  754. scaleUpCount := maxTotalPods - currentPodCount
  755. // Do not exceed the number of desired replicas.
  756. scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
  757. return *(newRS.Spec.Replicas) + scaleUpCount, nil
  758. case apps.RecreateDeploymentStrategyType:
  759. return *(deployment.Spec.Replicas), nil
  760. default:
  761. return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
  762. }
  763. }
  764. // IsSaturated checks if the new replica set is saturated by comparing its size with its deployment size.
  765. // Both the deployment and the replica set have to believe this replica set can own all of the desired
  766. // replicas in the deployment and the annotation helps in achieving that. All pods of the ReplicaSet
  767. // need to be available.
  768. func IsSaturated(deployment *apps.Deployment, rs *apps.ReplicaSet) bool {
  769. if rs == nil {
  770. return false
  771. }
  772. desiredString := rs.Annotations[DesiredReplicasAnnotation]
  773. desired, err := strconv.Atoi(desiredString)
  774. if err != nil {
  775. return false
  776. }
  777. return *(rs.Spec.Replicas) == *(deployment.Spec.Replicas) &&
  778. int32(desired) == *(deployment.Spec.Replicas) &&
  779. rs.Status.AvailableReplicas == *(deployment.Spec.Replicas)
  780. }
  781. // WaitForObservedDeployment polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration.
  782. // Returns error if polling timesout.
  783. func WaitForObservedDeployment(getDeploymentFunc func() (*apps.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error {
  784. // TODO: This should take clientset.Interface when all code is updated to use clientset. Keeping it this way allows the function to be used by callers who have client.Interface.
  785. return wait.PollImmediate(interval, timeout, func() (bool, error) {
  786. deployment, err := getDeploymentFunc()
  787. if err != nil {
  788. return false, err
  789. }
  790. return deployment.Status.ObservedGeneration >= desiredGeneration, nil
  791. })
  792. }
  793. // ResolveFenceposts resolves both maxSurge and maxUnavailable. This needs to happen in one
  794. // step. For example:
  795. //
  796. // 2 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1), then old(-1), then new(+1)
  797. // 1 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1)
  798. // 2 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
  799. // 1 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1)
  800. // 2 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
  801. // 1 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1)
  802. func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired int32) (int32, int32, error) {
  803. surge, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxSurge, intstrutil.FromInt(0)), int(desired), true)
  804. if err != nil {
  805. return 0, 0, err
  806. }
  807. unavailable, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxUnavailable, intstrutil.FromInt(0)), int(desired), false)
  808. if err != nil {
  809. return 0, 0, err
  810. }
  811. if surge == 0 && unavailable == 0 {
  812. // Validation should never allow the user to explicitly use zero values for both maxSurge
  813. // maxUnavailable. Due to rounding down maxUnavailable though, it may resolve to zero.
  814. // If both fenceposts resolve to zero, then we should set maxUnavailable to 1 on the
  815. // theory that surge might not work due to quota.
  816. unavailable = 1
  817. }
  818. return int32(surge), int32(unavailable), nil
  819. }
  820. // HasProgressDeadline checks if the Deployment d is expected to surface the reason
  821. // "ProgressDeadlineExceeded" when the Deployment progress takes longer than expected time.
  822. func HasProgressDeadline(d *apps.Deployment) bool {
  823. return d.Spec.ProgressDeadlineSeconds != nil && *d.Spec.ProgressDeadlineSeconds != math.MaxInt32
  824. }
  825. // HasRevisionHistoryLimit checks if the Deployment d is expected to keep a specified number of
  826. // old replicaSets. These replicaSets are mainly kept with the purpose of rollback.
  827. // The RevisionHistoryLimit can start from 0 (no retained replicasSet). When set to math.MaxInt32,
  828. // the Deployment will keep all revisions.
  829. func HasRevisionHistoryLimit(d *apps.Deployment) bool {
  830. return d.Spec.RevisionHistoryLimit != nil && *d.Spec.RevisionHistoryLimit != math.MaxInt32
  831. }