deployment.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package utils
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. "github.com/davecgh/go-spew/spew"
  19. apps "k8s.io/api/apps/v1"
  20. "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. clientset "k8s.io/client-go/kubernetes"
  24. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  25. deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
  26. labelsutil "k8s.io/kubernetes/pkg/util/labels"
  27. )
// LogfFn is a printf-style logging callback: it takes a format string
// followed by the arguments to substitute into it.
type LogfFn func(format string, args ...interface{})
  29. func LogReplicaSetsOfDeployment(deployment *apps.Deployment, allOldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, logf LogfFn) {
  30. if newRS != nil {
  31. logf(spew.Sprintf("New ReplicaSet %q of Deployment %q:\n%+v", newRS.Name, deployment.Name, *newRS))
  32. } else {
  33. logf("New ReplicaSet of Deployment %q is nil.", deployment.Name)
  34. }
  35. if len(allOldRSs) > 0 {
  36. logf("All old ReplicaSets of Deployment %q:", deployment.Name)
  37. }
  38. for i := range allOldRSs {
  39. logf(spew.Sprintf("%+v", *allOldRSs[i]))
  40. }
  41. }
  42. func LogPodsOfDeployment(c clientset.Interface, deployment *apps.Deployment, rsList []*apps.ReplicaSet, logf LogfFn) {
  43. minReadySeconds := deployment.Spec.MinReadySeconds
  44. podListFunc := func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
  45. return c.CoreV1().Pods(namespace).List(context.TODO(), options)
  46. }
  47. podList, err := deploymentutil.ListPods(deployment, rsList, podListFunc)
  48. if err != nil {
  49. logf("Failed to list Pods of Deployment %q: %v", deployment.Name, err)
  50. return
  51. }
  52. for _, pod := range podList.Items {
  53. availability := "not available"
  54. if podutil.IsPodAvailable(&pod, minReadySeconds, metav1.Now()) {
  55. availability = "available"
  56. }
  57. logf(spew.Sprintf("Pod %q is %s:\n%+v", pod.Name, availability, pod))
  58. }
  59. }
  60. // Waits for the deployment to complete.
  61. // If during a rolling update (rolling == true), returns an error if the deployment's
  62. // rolling update strategy (max unavailable or max surge) is broken at any times.
  63. // It's not seen as a rolling update if shortly after a scaling event or the deployment is just created.
  64. func waitForDeploymentCompleteMaybeCheckRolling(c clientset.Interface, d *apps.Deployment, rolling bool, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  65. var (
  66. deployment *apps.Deployment
  67. reason string
  68. )
  69. err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  70. var err error
  71. deployment, err = c.AppsV1().Deployments(d.Namespace).Get(context.TODO(), d.Name, metav1.GetOptions{})
  72. if err != nil {
  73. return false, err
  74. }
  75. // If during a rolling update, make sure rolling update strategy isn't broken at any times.
  76. if rolling {
  77. reason, err = checkRollingUpdateStatus(c, deployment, logf)
  78. if err != nil {
  79. return false, err
  80. }
  81. logf(reason)
  82. }
  83. // When the deployment status and its underlying resources reach the desired state, we're done
  84. if deploymentutil.DeploymentComplete(d, &deployment.Status) {
  85. return true, nil
  86. }
  87. reason = fmt.Sprintf("deployment status: %#v", deployment.Status)
  88. logf(reason)
  89. return false, nil
  90. })
  91. if err == wait.ErrWaitTimeout {
  92. err = fmt.Errorf("%s", reason)
  93. }
  94. if err != nil {
  95. return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", d.Name, err)
  96. }
  97. return nil
  98. }
  99. func checkRollingUpdateStatus(c clientset.Interface, deployment *apps.Deployment, logf LogfFn) (string, error) {
  100. var reason string
  101. oldRSs, allOldRSs, newRS, err := deploymentutil.GetAllReplicaSets(deployment, c.AppsV1())
  102. if err != nil {
  103. return "", err
  104. }
  105. if newRS == nil {
  106. // New RC hasn't been created yet.
  107. reason = "new replica set hasn't been created yet"
  108. return reason, nil
  109. }
  110. allRSs := append(oldRSs, newRS)
  111. // The old/new ReplicaSets need to contain the pod-template-hash label
  112. for i := range allRSs {
  113. if !labelsutil.SelectorHasLabel(allRSs[i].Spec.Selector, apps.DefaultDeploymentUniqueLabelKey) {
  114. reason = "all replica sets need to contain the pod-template-hash label"
  115. return reason, nil
  116. }
  117. }
  118. // Check max surge and min available
  119. totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
  120. maxCreated := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
  121. if totalCreated > maxCreated {
  122. LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
  123. LogPodsOfDeployment(c, deployment, allRSs, logf)
  124. return "", fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
  125. }
  126. minAvailable := deploymentutil.MinAvailable(deployment)
  127. if deployment.Status.AvailableReplicas < minAvailable {
  128. LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
  129. LogPodsOfDeployment(c, deployment, allRSs, logf)
  130. return "", fmt.Errorf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
  131. }
  132. return "", nil
  133. }
  134. // Waits for the deployment to complete, and check rolling update strategy isn't broken at any times.
  135. // Rolling update strategy should not be broken during a rolling update.
  136. func WaitForDeploymentCompleteAndCheckRolling(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  137. rolling := true
  138. return waitForDeploymentCompleteMaybeCheckRolling(c, d, rolling, logf, pollInterval, pollTimeout)
  139. }
  140. // Waits for the deployment to complete, and don't check if rolling update strategy is broken.
  141. // Rolling update strategy is used only during a rolling update, and can be violated in other situations,
  142. // such as shortly after a scaling event or the deployment is just created.
  143. func WaitForDeploymentComplete(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  144. rolling := false
  145. return waitForDeploymentCompleteMaybeCheckRolling(c, d, rolling, logf, pollInterval, pollTimeout)
  146. }
  147. // WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
  148. // Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
  149. func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  150. var deployment *apps.Deployment
  151. var newRS *apps.ReplicaSet
  152. var reason string
  153. err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  154. var err error
  155. deployment, err = c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
  156. if err != nil {
  157. return false, err
  158. }
  159. // The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
  160. newRS, err = deploymentutil.GetNewReplicaSet(deployment, c.AppsV1())
  161. if err != nil {
  162. return false, err
  163. }
  164. if err := checkRevisionAndImage(deployment, newRS, revision, image); err != nil {
  165. reason = err.Error()
  166. logf(reason)
  167. return false, nil
  168. }
  169. return true, nil
  170. })
  171. if err == wait.ErrWaitTimeout {
  172. LogReplicaSetsOfDeployment(deployment, nil, newRS, logf)
  173. err = fmt.Errorf(reason)
  174. }
  175. if newRS == nil {
  176. return fmt.Errorf("deployment %q failed to create new replica set", deploymentName)
  177. }
  178. if err != nil {
  179. return fmt.Errorf("error waiting for deployment %q (got %s / %s) and new replica set %q (got %s / %s) revision and image to match expectation (expected %s / %s): %v", deploymentName, deployment.Annotations[deploymentutil.RevisionAnnotation], deployment.Spec.Template.Spec.Containers[0].Image, newRS.Name, newRS.Annotations[deploymentutil.RevisionAnnotation], newRS.Spec.Template.Spec.Containers[0].Image, revision, image, err)
  180. }
  181. return nil
  182. }
  183. // CheckDeploymentRevisionAndImage checks if the input deployment's and its new replica set's revision and image are as expected.
  184. func CheckDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName, revision, image string) error {
  185. deployment, err := c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
  186. if err != nil {
  187. return fmt.Errorf("unable to get deployment %s during revision check: %v", deploymentName, err)
  188. }
  189. // Check revision of the new replica set of this deployment
  190. newRS, err := deploymentutil.GetNewReplicaSet(deployment, c.AppsV1())
  191. if err != nil {
  192. return fmt.Errorf("unable to get new replicaset of deployment %s during revision check: %v", deploymentName, err)
  193. }
  194. return checkRevisionAndImage(deployment, newRS, revision, image)
  195. }
  196. func checkRevisionAndImage(deployment *apps.Deployment, newRS *apps.ReplicaSet, revision, image string) error {
  197. // The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
  198. if newRS == nil {
  199. return fmt.Errorf("new replicaset for deployment %q is yet to be created", deployment.Name)
  200. }
  201. if !labelsutil.SelectorHasLabel(newRS.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey) {
  202. return fmt.Errorf("new replica set %q doesn't have %q label selector", newRS.Name, apps.DefaultDeploymentUniqueLabelKey)
  203. }
  204. // Check revision of this deployment, and of the new replica set of this deployment
  205. if deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
  206. return fmt.Errorf("deployment %q doesn't have the required revision set", deployment.Name)
  207. }
  208. if newRS.Annotations == nil || newRS.Annotations[deploymentutil.RevisionAnnotation] != revision {
  209. return fmt.Errorf("new replicaset %q doesn't have the required revision set", newRS.Name)
  210. }
  211. // Check the image of this deployment, and of the new replica set of this deployment
  212. if !containsImage(deployment.Spec.Template.Spec.Containers, image) {
  213. return fmt.Errorf("deployment %q doesn't have the required image %s set", deployment.Name, image)
  214. }
  215. if !containsImage(newRS.Spec.Template.Spec.Containers, image) {
  216. return fmt.Errorf("new replica set %q doesn't have the required image %s.", newRS.Name, image)
  217. }
  218. return nil
  219. }
  220. func containsImage(containers []v1.Container, imageName string) bool {
  221. for _, container := range containers {
  222. if container.Image == imageName {
  223. return true
  224. }
  225. }
  226. return false
  227. }
// UpdateDeploymentFunc is a callback that receives a Deployment pointer;
// UpdateDeploymentWithRetries calls it on a freshly fetched Deployment before
// sending that object to the API server.
type UpdateDeploymentFunc func(d *apps.Deployment)
  229. func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate UpdateDeploymentFunc, logf LogfFn, pollInterval, pollTimeout time.Duration) (*apps.Deployment, error) {
  230. var deployment *apps.Deployment
  231. var updateErr error
  232. pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  233. var err error
  234. if deployment, err = c.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{}); err != nil {
  235. return false, err
  236. }
  237. // Apply the update, then attempt to push it to the apiserver.
  238. applyUpdate(deployment)
  239. if deployment, err = c.AppsV1().Deployments(namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{}); err == nil {
  240. logf("Updating deployment %s", name)
  241. return true, nil
  242. }
  243. updateErr = err
  244. return false, nil
  245. })
  246. if pollErr == wait.ErrWaitTimeout {
  247. pollErr = fmt.Errorf("couldn't apply the provided updated to deployment %q: %v", name, updateErr)
  248. }
  249. return deployment, pollErr
  250. }
  251. func WaitForObservedDeployment(c clientset.Interface, ns, deploymentName string, desiredGeneration int64) error {
  252. return deploymentutil.WaitForObservedDeployment(func() (*apps.Deployment, error) {
  253. return c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
  254. }, desiredGeneration, 2*time.Second, 1*time.Minute)
  255. }
  256. // WaitForDeploymentRollbackCleared waits for given deployment either started rolling back or doesn't need to rollback.
  257. func WaitForDeploymentRollbackCleared(c clientset.Interface, ns, deploymentName string, pollInterval, pollTimeout time.Duration) error {
  258. err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  259. deployment, err := c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
  260. if err != nil {
  261. return false, err
  262. }
  263. // Rollback not set or is kicked off
  264. if deployment.Annotations[apps.DeprecatedRollbackTo] == "" {
  265. return true, nil
  266. }
  267. return false, nil
  268. })
  269. if err != nil {
  270. return fmt.Errorf("error waiting for deployment %s rollbackTo to be cleared: %v", deploymentName, err)
  271. }
  272. return nil
  273. }
  274. // WaitForDeploymentUpdatedReplicasGTE waits for given deployment to be observed by the controller and has at least a number of updatedReplicas
  275. func WaitForDeploymentUpdatedReplicasGTE(c clientset.Interface, ns, deploymentName string, minUpdatedReplicas int32, desiredGeneration int64, pollInterval, pollTimeout time.Duration) error {
  276. var deployment *apps.Deployment
  277. err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  278. d, err := c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
  279. if err != nil {
  280. return false, err
  281. }
  282. deployment = d
  283. return deployment.Status.ObservedGeneration >= desiredGeneration && deployment.Status.UpdatedReplicas >= minUpdatedReplicas, nil
  284. })
  285. if err != nil {
  286. return fmt.Errorf("error waiting for deployment %q to have at least %d updatedReplicas: %v; latest .status.updatedReplicas: %d", deploymentName, minUpdatedReplicas, err, deployment.Status.UpdatedReplicas)
  287. }
  288. return nil
  289. }
  290. func WaitForDeploymentWithCondition(c clientset.Interface, ns, deploymentName, reason string, condType apps.DeploymentConditionType, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  291. var deployment *apps.Deployment
  292. pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  293. d, err := c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
  294. if err != nil {
  295. return false, err
  296. }
  297. deployment = d
  298. cond := deploymentutil.GetDeploymentCondition(deployment.Status, condType)
  299. return cond != nil && cond.Reason == reason, nil
  300. })
  301. if pollErr == wait.ErrWaitTimeout {
  302. pollErr = fmt.Errorf("deployment %q never updated with the desired condition and reason, latest deployment conditions: %+v", deployment.Name, deployment.Status.Conditions)
  303. _, allOldRSs, newRS, err := deploymentutil.GetAllReplicaSets(deployment, c.AppsV1())
  304. if err == nil {
  305. LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
  306. LogPodsOfDeployment(c, deployment, append(allOldRSs, newRS), logf)
  307. }
  308. }
  309. return pollErr
  310. }