deployment.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package utils
  14. import (
  15. "fmt"
  16. "time"
  17. "github.com/davecgh/go-spew/spew"
  18. apps "k8s.io/api/apps/v1"
  19. "k8s.io/api/core/v1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. clientset "k8s.io/client-go/kubernetes"
  23. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  24. deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
  25. labelsutil "k8s.io/kubernetes/pkg/util/labels"
  26. )
  // LogfFn is a printf-style logging callback: it receives a format string
  // followed by the arguments to substitute into it.
  type LogfFn func(format string, args ...interface{})
  28. func LogReplicaSetsOfDeployment(deployment *apps.Deployment, allOldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, logf LogfFn) {
  29. if newRS != nil {
  30. logf(spew.Sprintf("New ReplicaSet %q of Deployment %q:\n%+v", newRS.Name, deployment.Name, *newRS))
  31. } else {
  32. logf("New ReplicaSet of Deployment %q is nil.", deployment.Name)
  33. }
  34. if len(allOldRSs) > 0 {
  35. logf("All old ReplicaSets of Deployment %q:", deployment.Name)
  36. }
  37. for i := range allOldRSs {
  38. logf(spew.Sprintf("%+v", *allOldRSs[i]))
  39. }
  40. }
  41. func LogPodsOfDeployment(c clientset.Interface, deployment *apps.Deployment, rsList []*apps.ReplicaSet, logf LogfFn) {
  42. minReadySeconds := deployment.Spec.MinReadySeconds
  43. podListFunc := func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
  44. return c.CoreV1().Pods(namespace).List(options)
  45. }
  46. podList, err := deploymentutil.ListPods(deployment, rsList, podListFunc)
  47. if err != nil {
  48. logf("Failed to list Pods of Deployment %q: %v", deployment.Name, err)
  49. return
  50. }
  51. for _, pod := range podList.Items {
  52. availability := "not available"
  53. if podutil.IsPodAvailable(&pod, minReadySeconds, metav1.Now()) {
  54. availability = "available"
  55. }
  56. logf(spew.Sprintf("Pod %q is %s:\n%+v", pod.Name, availability, pod))
  57. }
  58. }
  59. // Waits for the deployment to complete.
  60. // If during a rolling update (rolling == true), returns an error if the deployment's
  61. // rolling update strategy (max unavailable or max surge) is broken at any times.
  62. // It's not seen as a rolling update if shortly after a scaling event or the deployment is just created.
  63. func waitForDeploymentCompleteMaybeCheckRolling(c clientset.Interface, d *apps.Deployment, rolling bool, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  64. var (
  65. deployment *apps.Deployment
  66. reason string
  67. )
  68. err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  69. var err error
  70. deployment, err = c.AppsV1().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{})
  71. if err != nil {
  72. return false, err
  73. }
  74. // If during a rolling update, make sure rolling update strategy isn't broken at any times.
  75. if rolling {
  76. reason, err = checkRollingUpdateStatus(c, deployment, logf)
  77. if err != nil {
  78. return false, err
  79. }
  80. logf(reason)
  81. }
  82. // When the deployment status and its underlying resources reach the desired state, we're done
  83. if deploymentutil.DeploymentComplete(d, &deployment.Status) {
  84. return true, nil
  85. }
  86. reason = fmt.Sprintf("deployment status: %#v", deployment.Status)
  87. logf(reason)
  88. return false, nil
  89. })
  90. if err == wait.ErrWaitTimeout {
  91. err = fmt.Errorf("%s", reason)
  92. }
  93. if err != nil {
  94. return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", d.Name, err)
  95. }
  96. return nil
  97. }
  98. func checkRollingUpdateStatus(c clientset.Interface, deployment *apps.Deployment, logf LogfFn) (string, error) {
  99. var reason string
  100. oldRSs, allOldRSs, newRS, err := deploymentutil.GetAllReplicaSets(deployment, c.AppsV1())
  101. if err != nil {
  102. return "", err
  103. }
  104. if newRS == nil {
  105. // New RC hasn't been created yet.
  106. reason = "new replica set hasn't been created yet"
  107. return reason, nil
  108. }
  109. allRSs := append(oldRSs, newRS)
  110. // The old/new ReplicaSets need to contain the pod-template-hash label
  111. for i := range allRSs {
  112. if !labelsutil.SelectorHasLabel(allRSs[i].Spec.Selector, apps.DefaultDeploymentUniqueLabelKey) {
  113. reason = "all replica sets need to contain the pod-template-hash label"
  114. return reason, nil
  115. }
  116. }
  117. // Check max surge and min available
  118. totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
  119. maxCreated := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
  120. if totalCreated > maxCreated {
  121. LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
  122. LogPodsOfDeployment(c, deployment, allRSs, logf)
  123. return "", fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
  124. }
  125. minAvailable := deploymentutil.MinAvailable(deployment)
  126. if deployment.Status.AvailableReplicas < minAvailable {
  127. LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
  128. LogPodsOfDeployment(c, deployment, allRSs, logf)
  129. return "", fmt.Errorf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
  130. }
  131. return "", nil
  132. }
  133. // Waits for the deployment to complete, and check rolling update strategy isn't broken at any times.
  134. // Rolling update strategy should not be broken during a rolling update.
  135. func WaitForDeploymentCompleteAndCheckRolling(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  136. rolling := true
  137. return waitForDeploymentCompleteMaybeCheckRolling(c, d, rolling, logf, pollInterval, pollTimeout)
  138. }
  139. // Waits for the deployment to complete, and don't check if rolling update strategy is broken.
  140. // Rolling update strategy is used only during a rolling update, and can be violated in other situations,
  141. // such as shortly after a scaling event or the deployment is just created.
  142. func WaitForDeploymentComplete(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  143. rolling := false
  144. return waitForDeploymentCompleteMaybeCheckRolling(c, d, rolling, logf, pollInterval, pollTimeout)
  145. }
  146. // WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
  147. // Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
  148. func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  149. var deployment *apps.Deployment
  150. var newRS *apps.ReplicaSet
  151. var reason string
  152. err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  153. var err error
  154. deployment, err = c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
  155. if err != nil {
  156. return false, err
  157. }
  158. // The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
  159. newRS, err = deploymentutil.GetNewReplicaSet(deployment, c.AppsV1())
  160. if err != nil {
  161. return false, err
  162. }
  163. if err := checkRevisionAndImage(deployment, newRS, revision, image); err != nil {
  164. reason = err.Error()
  165. logf(reason)
  166. return false, nil
  167. }
  168. return true, nil
  169. })
  170. if err == wait.ErrWaitTimeout {
  171. LogReplicaSetsOfDeployment(deployment, nil, newRS, logf)
  172. err = fmt.Errorf(reason)
  173. }
  174. if newRS == nil {
  175. return fmt.Errorf("deployment %q failed to create new replica set", deploymentName)
  176. }
  177. if err != nil {
  178. return fmt.Errorf("error waiting for deployment %q (got %s / %s) and new replica set %q (got %s / %s) revision and image to match expectation (expected %s / %s): %v", deploymentName, deployment.Annotations[deploymentutil.RevisionAnnotation], deployment.Spec.Template.Spec.Containers[0].Image, newRS.Name, newRS.Annotations[deploymentutil.RevisionAnnotation], newRS.Spec.Template.Spec.Containers[0].Image, revision, image, err)
  179. }
  180. return nil
  181. }
  182. // CheckDeploymentRevisionAndImage checks if the input deployment's and its new replica set's revision and image are as expected.
  183. func CheckDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName, revision, image string) error {
  184. deployment, err := c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
  185. if err != nil {
  186. return fmt.Errorf("unable to get deployment %s during revision check: %v", deploymentName, err)
  187. }
  188. // Check revision of the new replica set of this deployment
  189. newRS, err := deploymentutil.GetNewReplicaSet(deployment, c.AppsV1())
  190. if err != nil {
  191. return fmt.Errorf("unable to get new replicaset of deployment %s during revision check: %v", deploymentName, err)
  192. }
  193. return checkRevisionAndImage(deployment, newRS, revision, image)
  194. }
  195. func checkRevisionAndImage(deployment *apps.Deployment, newRS *apps.ReplicaSet, revision, image string) error {
  196. // The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
  197. if newRS == nil {
  198. return fmt.Errorf("new replicaset for deployment %q is yet to be created", deployment.Name)
  199. }
  200. if !labelsutil.SelectorHasLabel(newRS.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey) {
  201. return fmt.Errorf("new replica set %q doesn't have %q label selector", newRS.Name, apps.DefaultDeploymentUniqueLabelKey)
  202. }
  203. // Check revision of this deployment, and of the new replica set of this deployment
  204. if deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
  205. return fmt.Errorf("deployment %q doesn't have the required revision set", deployment.Name)
  206. }
  207. if newRS.Annotations == nil || newRS.Annotations[deploymentutil.RevisionAnnotation] != revision {
  208. return fmt.Errorf("new replicaset %q doesn't have the required revision set", newRS.Name)
  209. }
  210. // Check the image of this deployment, and of the new replica set of this deployment
  211. if !containsImage(deployment.Spec.Template.Spec.Containers, image) {
  212. return fmt.Errorf("deployment %q doesn't have the required image %s set", deployment.Name, image)
  213. }
  214. if !containsImage(newRS.Spec.Template.Spec.Containers, image) {
  215. return fmt.Errorf("new replica set %q doesn't have the required image %s.", newRS.Name, image)
  216. }
  217. return nil
  218. }
  219. func containsImage(containers []v1.Container, imageName string) bool {
  220. for _, container := range containers {
  221. if container.Image == imageName {
  222. return true
  223. }
  224. }
  225. return false
  226. }
  227. type UpdateDeploymentFunc func(d *apps.Deployment)
  228. func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate UpdateDeploymentFunc, logf LogfFn, pollInterval, pollTimeout time.Duration) (*apps.Deployment, error) {
  229. var deployment *apps.Deployment
  230. var updateErr error
  231. pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  232. var err error
  233. if deployment, err = c.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{}); err != nil {
  234. return false, err
  235. }
  236. // Apply the update, then attempt to push it to the apiserver.
  237. applyUpdate(deployment)
  238. if deployment, err = c.AppsV1().Deployments(namespace).Update(deployment); err == nil {
  239. logf("Updating deployment %s", name)
  240. return true, nil
  241. }
  242. updateErr = err
  243. return false, nil
  244. })
  245. if pollErr == wait.ErrWaitTimeout {
  246. pollErr = fmt.Errorf("couldn't apply the provided updated to deployment %q: %v", name, updateErr)
  247. }
  248. return deployment, pollErr
  249. }
  250. func WaitForObservedDeployment(c clientset.Interface, ns, deploymentName string, desiredGeneration int64) error {
  251. return deploymentutil.WaitForObservedDeployment(func() (*apps.Deployment, error) {
  252. return c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
  253. }, desiredGeneration, 2*time.Second, 1*time.Minute)
  254. }
  255. // WaitForDeploymentRollbackCleared waits for given deployment either started rolling back or doesn't need to rollback.
  256. func WaitForDeploymentRollbackCleared(c clientset.Interface, ns, deploymentName string, pollInterval, pollTimeout time.Duration) error {
  257. err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  258. deployment, err := c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
  259. if err != nil {
  260. return false, err
  261. }
  262. // Rollback not set or is kicked off
  263. if deployment.Annotations[apps.DeprecatedRollbackTo] == "" {
  264. return true, nil
  265. }
  266. return false, nil
  267. })
  268. if err != nil {
  269. return fmt.Errorf("error waiting for deployment %s rollbackTo to be cleared: %v", deploymentName, err)
  270. }
  271. return nil
  272. }
  273. // WaitForDeploymentUpdatedReplicasGTE waits for given deployment to be observed by the controller and has at least a number of updatedReplicas
  274. func WaitForDeploymentUpdatedReplicasGTE(c clientset.Interface, ns, deploymentName string, minUpdatedReplicas int32, desiredGeneration int64, pollInterval, pollTimeout time.Duration) error {
  275. var deployment *apps.Deployment
  276. err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  277. d, err := c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
  278. if err != nil {
  279. return false, err
  280. }
  281. deployment = d
  282. return deployment.Status.ObservedGeneration >= desiredGeneration && deployment.Status.UpdatedReplicas >= minUpdatedReplicas, nil
  283. })
  284. if err != nil {
  285. return fmt.Errorf("error waiting for deployment %q to have at least %d updatedReplicas: %v; latest .status.updatedReplicas: %d", deploymentName, minUpdatedReplicas, err, deployment.Status.UpdatedReplicas)
  286. }
  287. return nil
  288. }
  289. func WaitForDeploymentWithCondition(c clientset.Interface, ns, deploymentName, reason string, condType apps.DeploymentConditionType, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
  290. var deployment *apps.Deployment
  291. pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
  292. d, err := c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
  293. if err != nil {
  294. return false, err
  295. }
  296. deployment = d
  297. cond := deploymentutil.GetDeploymentCondition(deployment.Status, condType)
  298. return cond != nil && cond.Reason == reason, nil
  299. })
  300. if pollErr == wait.ErrWaitTimeout {
  301. pollErr = fmt.Errorf("deployment %q never updated with the desired condition and reason, latest deployment conditions: %+v", deployment.Name, deployment.Status.Conditions)
  302. _, allOldRSs, newRS, err := deploymentutil.GetAllReplicaSets(deployment, c.AppsV1())
  303. if err == nil {
  304. LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
  305. LogPodsOfDeployment(c, deployment, append(allOldRSs, newRS), logf)
  306. }
  307. }
  308. return pollErr
  309. }