prepull.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package upgrade
  14. import (
  15. "fmt"
  16. "time"
  17. "github.com/pkg/errors"
  18. apps "k8s.io/api/apps/v1"
  19. v1 "k8s.io/api/core/v1"
  20. apierrors "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. clientset "k8s.io/client-go/kubernetes"
  23. kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
  24. "k8s.io/kubernetes/cmd/kubeadm/app/constants"
  25. "k8s.io/kubernetes/cmd/kubeadm/app/images"
  26. "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
  27. utilpointer "k8s.io/utils/pointer"
  28. )
  29. const (
  30. prepullPrefix = "upgrade-prepull-"
  31. )
  32. // Prepuller defines an interface for performing a prepull operation in a create-wait-delete fashion in parallel
  33. type Prepuller interface {
  34. CreateFunc(string) error
  35. WaitFunc(string)
  36. DeleteFunc(string) error
  37. }
  38. // DaemonSetPrepuller makes sure the control-plane images are available on all control-planes
  39. type DaemonSetPrepuller struct {
  40. client clientset.Interface
  41. cfg *kubeadmapi.ClusterConfiguration
  42. waiter apiclient.Waiter
  43. }
  44. // NewDaemonSetPrepuller creates a new instance of the DaemonSetPrepuller struct
  45. func NewDaemonSetPrepuller(client clientset.Interface, waiter apiclient.Waiter, cfg *kubeadmapi.ClusterConfiguration) *DaemonSetPrepuller {
  46. return &DaemonSetPrepuller{
  47. client: client,
  48. cfg: cfg,
  49. waiter: waiter,
  50. }
  51. }
  52. // CreateFunc creates a DaemonSet for making the image available on every relevant node
  53. func (d *DaemonSetPrepuller) CreateFunc(component string) error {
  54. var image string
  55. if component == constants.Etcd {
  56. image = images.GetEtcdImage(d.cfg)
  57. } else {
  58. image = images.GetKubernetesImage(component, d.cfg)
  59. }
  60. ds := buildPrePullDaemonSet(component, image)
  61. // Create the DaemonSet in the API Server
  62. if err := apiclient.CreateOrUpdateDaemonSet(d.client, ds); err != nil {
  63. return errors.Wrapf(err, "unable to create a DaemonSet for prepulling the component %q", component)
  64. }
  65. return nil
  66. }
  67. // WaitFunc waits for all Pods in the specified DaemonSet to be in the Running state
  68. func (d *DaemonSetPrepuller) WaitFunc(component string) {
  69. fmt.Printf("[upgrade/prepull] Prepulling image for component %s.\n", component)
  70. d.waiter.WaitForPodsWithLabel("k8s-app=upgrade-prepull-" + component)
  71. }
  72. // DeleteFunc deletes the DaemonSet used for making the image available on every relevant node
  73. func (d *DaemonSetPrepuller) DeleteFunc(component string) error {
  74. dsName := addPrepullPrefix(component)
  75. // TODO: The IsNotFound() check is required in cases where the DaemonSet is missing.
  76. // Investigate why this happens: https://github.com/kubernetes/kubeadm/issues/1700
  77. if err := apiclient.DeleteDaemonSetForeground(d.client, metav1.NamespaceSystem, dsName); err != nil && !apierrors.IsNotFound(err) {
  78. return errors.Wrapf(err, "unable to cleanup the DaemonSet used for prepulling %s", component)
  79. }
  80. fmt.Printf("[upgrade/prepull] Prepulled image for component %s.\n", component)
  81. return nil
  82. }
  83. // PrepullImagesInParallel creates DaemonSets synchronously but waits in parallel for the images to pull
  84. func PrepullImagesInParallel(kubePrepuller Prepuller, timeout time.Duration, componentsToPrepull []string) error {
  85. fmt.Printf("[upgrade/prepull] Will prepull images for components %v\n", componentsToPrepull)
  86. timeoutChan := time.After(timeout)
  87. // Synchronously create the DaemonSets
  88. for _, component := range componentsToPrepull {
  89. if err := kubePrepuller.CreateFunc(component); err != nil {
  90. return err
  91. }
  92. }
  93. // Create a channel for streaming data from goroutines that run in parallel to a blocking for loop that cleans up
  94. prePulledChan := make(chan string, len(componentsToPrepull))
  95. for _, component := range componentsToPrepull {
  96. go func(c string) {
  97. // Wait as long as needed. This WaitFunc call should be blocking until completion
  98. kubePrepuller.WaitFunc(c)
  99. // When the task is done, go ahead and cleanup by sending the name to the channel
  100. prePulledChan <- c
  101. }(component)
  102. }
  103. // This call blocks until all expected messages are received from the channel or errors out if timeoutChan fires.
  104. // For every successful wait, kubePrepuller.DeleteFunc is executed
  105. if err := waitForItemsFromChan(timeoutChan, prePulledChan, len(componentsToPrepull), kubePrepuller.DeleteFunc); err != nil {
  106. return err
  107. }
  108. fmt.Println("[upgrade/prepull] Successfully prepulled the images for all the control plane components")
  109. return nil
  110. }
  111. // waitForItemsFromChan waits for n elements from stringChan with a timeout. For every item received from stringChan, cleanupFunc is executed
  112. func waitForItemsFromChan(timeoutChan <-chan time.Time, stringChan chan string, n int, cleanupFunc func(string) error) error {
  113. i := 0
  114. for {
  115. select {
  116. case <-timeoutChan:
  117. return errors.New("the prepull operation timed out")
  118. case result := <-stringChan:
  119. i++
  120. // If the cleanup function errors; error here as well
  121. if err := cleanupFunc(result); err != nil {
  122. return err
  123. }
  124. if i == n {
  125. return nil
  126. }
  127. }
  128. }
  129. }
  130. // addPrepullPrefix adds the prepull prefix for this functionality; can be used in names, labels, etc.
  131. func addPrepullPrefix(component string) string {
  132. return fmt.Sprintf("%s%s", prepullPrefix, component)
  133. }
  134. // buildPrePullDaemonSet builds the DaemonSet that ensures the control plane image is available
  135. func buildPrePullDaemonSet(component, image string) *apps.DaemonSet {
  136. var gracePeriodSecs int64
  137. return &apps.DaemonSet{
  138. ObjectMeta: metav1.ObjectMeta{
  139. Name: addPrepullPrefix(component),
  140. Namespace: metav1.NamespaceSystem,
  141. },
  142. Spec: apps.DaemonSetSpec{
  143. Selector: &metav1.LabelSelector{
  144. MatchLabels: map[string]string{
  145. "k8s-app": addPrepullPrefix(component),
  146. },
  147. },
  148. Template: v1.PodTemplateSpec{
  149. ObjectMeta: metav1.ObjectMeta{
  150. Labels: map[string]string{
  151. "k8s-app": addPrepullPrefix(component),
  152. },
  153. },
  154. Spec: v1.PodSpec{
  155. Containers: []v1.Container{
  156. {
  157. Name: component,
  158. Image: image,
  159. Command: []string{"/bin/sleep", "3600"},
  160. },
  161. },
  162. NodeSelector: map[string]string{
  163. constants.LabelNodeRoleMaster: "",
  164. },
  165. Tolerations: []v1.Toleration{constants.ControlPlaneToleration},
  166. TerminationGracePeriodSeconds: &gracePeriodSecs,
  167. // Explicitly add a PodSecurityContext to allow these Pods to run as non-root.
  168. // This prevents restrictive PSPs from blocking the Pod creation.
  169. SecurityContext: &v1.PodSecurityContext{
  170. RunAsUser: utilpointer.Int64Ptr(999),
  171. },
  172. },
  173. },
  174. },
  175. }
  176. }