wait.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wait
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "strings"
  20. "time"
  21. "github.com/spf13/cobra"
  22. apierrors "k8s.io/apimachinery/pkg/api/errors"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  25. "k8s.io/apimachinery/pkg/fields"
  26. "k8s.io/apimachinery/pkg/runtime"
  27. "k8s.io/apimachinery/pkg/runtime/schema"
  28. "k8s.io/apimachinery/pkg/types"
  29. "k8s.io/apimachinery/pkg/util/wait"
  30. "k8s.io/apimachinery/pkg/watch"
  31. "k8s.io/cli-runtime/pkg/genericclioptions"
  32. "k8s.io/cli-runtime/pkg/printers"
  33. "k8s.io/cli-runtime/pkg/resource"
  34. "k8s.io/client-go/dynamic"
  35. watchtools "k8s.io/client-go/tools/watch"
  36. cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
  37. "k8s.io/kubernetes/pkg/kubectl/util/templates"
  38. )
  39. var (
  40. waitLong = templates.LongDesc(`
  41. Experimental: Wait for a specific condition on one or many resources.
  42. The command takes multiple resources and waits until the specified condition
  43. is seen in the Status field of every given resource.
  44. Alternatively, the command can wait for the given set of resources to be deleted
  45. by providing the "delete" keyword as the value to the --for flag.
  46. A successful message will be printed to stdout indicating when the specified
  47. condition has been met. One can use -o option to change to output destination.`)
  48. waitExample = templates.Examples(`
  49. # Wait for the pod "busybox1" to contain the status condition of type "Ready".
  50. kubectl wait --for=condition=Ready pod/busybox1
  51. # Wait for the pod "busybox1" to be deleted, with a timeout of 60s, after having issued the "delete" command.
  52. kubectl delete pod/busybox1
  53. kubectl wait --for=delete pod/busybox1 --timeout=60s`)
  54. )
  // errNoMatchingResources is the sentinel error reported by RunWait when the
  // resource query visited zero resources.
  var errNoMatchingResources = errors.New("no matching resources found")
  57. // WaitFlags directly reflect the information that CLI is gathering via flags. They will be converted to Options, which
  58. // reflect the runtime requirements for the command. This structure reduces the transformation to wiring and makes
  59. // the logic itself easy to unit test
  60. type WaitFlags struct {
  61. RESTClientGetter genericclioptions.RESTClientGetter
  62. PrintFlags *genericclioptions.PrintFlags
  63. ResourceBuilderFlags *genericclioptions.ResourceBuilderFlags
  64. Timeout time.Duration
  65. ForCondition string
  66. genericclioptions.IOStreams
  67. }
  68. // NewWaitFlags returns a default WaitFlags
  69. func NewWaitFlags(restClientGetter genericclioptions.RESTClientGetter, streams genericclioptions.IOStreams) *WaitFlags {
  70. return &WaitFlags{
  71. RESTClientGetter: restClientGetter,
  72. PrintFlags: genericclioptions.NewPrintFlags("condition met"),
  73. ResourceBuilderFlags: genericclioptions.NewResourceBuilderFlags().
  74. WithLabelSelector("").
  75. WithFieldSelector("").
  76. WithAll(false).
  77. WithAllNamespaces(false).
  78. WithAll(false).
  79. WithLatest(),
  80. Timeout: 30 * time.Second,
  81. IOStreams: streams,
  82. }
  83. }
  84. // NewCmdWait returns a cobra command for waiting
  85. func NewCmdWait(restClientGetter genericclioptions.RESTClientGetter, streams genericclioptions.IOStreams) *cobra.Command {
  86. flags := NewWaitFlags(restClientGetter, streams)
  87. cmd := &cobra.Command{
  88. Use: "wait ([-f FILENAME] | resource.group/resource.name | resource.group [(-l label | --all)]) [--for=delete|--for condition=available]",
  89. Short: "Experimental: Wait for a specific condition on one or many resources.",
  90. Long: waitLong,
  91. Example: waitExample,
  92. DisableFlagsInUseLine: true,
  93. Run: func(cmd *cobra.Command, args []string) {
  94. o, err := flags.ToOptions(args)
  95. cmdutil.CheckErr(err)
  96. err = o.RunWait()
  97. cmdutil.CheckErr(err)
  98. },
  99. SuggestFor: []string{"list", "ps"},
  100. }
  101. flags.AddFlags(cmd)
  102. return cmd
  103. }
  104. // AddFlags registers flags for a cli
  105. func (flags *WaitFlags) AddFlags(cmd *cobra.Command) {
  106. flags.PrintFlags.AddFlags(cmd)
  107. flags.ResourceBuilderFlags.AddFlags(cmd.Flags())
  108. cmd.Flags().DurationVar(&flags.Timeout, "timeout", flags.Timeout, "The length of time to wait before giving up. Zero means check once and don't wait, negative means wait for a week.")
  109. cmd.Flags().StringVar(&flags.ForCondition, "for", flags.ForCondition, "The condition to wait on: [delete|condition=condition-name].")
  110. }
  111. // ToOptions converts from CLI inputs to runtime inputs
  112. func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {
  113. printer, err := flags.PrintFlags.ToPrinter()
  114. if err != nil {
  115. return nil, err
  116. }
  117. builder := flags.ResourceBuilderFlags.ToBuilder(flags.RESTClientGetter, args)
  118. clientConfig, err := flags.RESTClientGetter.ToRESTConfig()
  119. if err != nil {
  120. return nil, err
  121. }
  122. dynamicClient, err := dynamic.NewForConfig(clientConfig)
  123. if err != nil {
  124. return nil, err
  125. }
  126. conditionFn, err := conditionFuncFor(flags.ForCondition, flags.ErrOut)
  127. if err != nil {
  128. return nil, err
  129. }
  130. effectiveTimeout := flags.Timeout
  131. if effectiveTimeout < 0 {
  132. effectiveTimeout = 168 * time.Hour
  133. }
  134. o := &WaitOptions{
  135. ResourceFinder: builder,
  136. DynamicClient: dynamicClient,
  137. Timeout: effectiveTimeout,
  138. Printer: printer,
  139. ConditionFn: conditionFn,
  140. IOStreams: flags.IOStreams,
  141. }
  142. return o, nil
  143. }
  144. func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) {
  145. if strings.ToLower(condition) == "delete" {
  146. return IsDeleted, nil
  147. }
  148. if strings.HasPrefix(condition, "condition=") {
  149. conditionName := condition[len("condition="):]
  150. conditionValue := "true"
  151. if equalsIndex := strings.Index(conditionName, "="); equalsIndex != -1 {
  152. conditionValue = conditionName[equalsIndex+1:]
  153. conditionName = conditionName[0:equalsIndex]
  154. }
  155. return ConditionalWait{
  156. conditionName: conditionName,
  157. conditionStatus: conditionValue,
  158. errOut: errOut,
  159. }.IsConditionMet, nil
  160. }
  161. return nil, fmt.Errorf("unrecognized condition: %q", condition)
  162. }
  163. // ResourceLocation holds the location of a resource
  164. type ResourceLocation struct {
  165. GroupResource schema.GroupResource
  166. Namespace string
  167. Name string
  168. }
  169. // UIDMap maps ResourceLocation with UID
  170. type UIDMap map[ResourceLocation]types.UID
  171. // WaitOptions is a set of options that allows you to wait. This is the object reflects the runtime needs of a wait
  172. // command, making the logic itself easy to unit test with our existing mocks.
  173. type WaitOptions struct {
  174. ResourceFinder genericclioptions.ResourceFinder
  175. // UIDMap maps a resource location to a UID. It is optional, but ConditionFuncs may choose to use it to make the result
  176. // more reliable. For instance, delete can look for UID consistency during delegated calls.
  177. UIDMap UIDMap
  178. DynamicClient dynamic.Interface
  179. Timeout time.Duration
  180. Printer printers.ResourcePrinter
  181. ConditionFn ConditionFunc
  182. genericclioptions.IOStreams
  183. }
  184. // ConditionFunc is the interface for providing condition checks
  185. type ConditionFunc func(info *resource.Info, o *WaitOptions) (finalObject runtime.Object, done bool, err error)
  186. // RunWait runs the waiting logic
  187. func (o *WaitOptions) RunWait() error {
  188. visitCount := 0
  189. err := o.ResourceFinder.Do().Visit(func(info *resource.Info, err error) error {
  190. if err != nil {
  191. return err
  192. }
  193. visitCount++
  194. finalObject, success, err := o.ConditionFn(info, o)
  195. if success {
  196. o.Printer.PrintObj(finalObject, o.Out)
  197. return nil
  198. }
  199. if err == nil {
  200. return fmt.Errorf("%v unsatisified for unknown reason", finalObject)
  201. }
  202. return err
  203. })
  204. if err != nil {
  205. return err
  206. }
  207. if visitCount == 0 {
  208. return errNoMatchingResources
  209. }
  210. return err
  211. }
  212. // IsDeleted is a condition func for waiting for something to be deleted
  213. func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
  214. endTime := time.Now().Add(o.Timeout)
  215. for {
  216. if len(info.Name) == 0 {
  217. return info.Object, false, fmt.Errorf("resource name must be provided")
  218. }
  219. nameSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
  220. // List with a name field selector to get the current resourceVersion to watch from (not the object's resourceVersion)
  221. gottenObjList, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(metav1.ListOptions{FieldSelector: nameSelector})
  222. if apierrors.IsNotFound(err) {
  223. return info.Object, true, nil
  224. }
  225. if err != nil {
  226. // TODO this could do something slightly fancier if we wish
  227. return info.Object, false, err
  228. }
  229. if len(gottenObjList.Items) != 1 {
  230. return info.Object, true, nil
  231. }
  232. gottenObj := &gottenObjList.Items[0]
  233. resourceLocation := ResourceLocation{
  234. GroupResource: info.Mapping.Resource.GroupResource(),
  235. Namespace: gottenObj.GetNamespace(),
  236. Name: gottenObj.GetName(),
  237. }
  238. if uid, ok := o.UIDMap[resourceLocation]; ok {
  239. if gottenObj.GetUID() != uid {
  240. return gottenObj, true, nil
  241. }
  242. }
  243. watchOptions := metav1.ListOptions{}
  244. watchOptions.FieldSelector = nameSelector
  245. watchOptions.ResourceVersion = gottenObjList.GetResourceVersion()
  246. objWatch, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(watchOptions)
  247. if err != nil {
  248. return gottenObj, false, err
  249. }
  250. timeout := endTime.Sub(time.Now())
  251. errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
  252. if timeout < 0 {
  253. // we're out of time
  254. return gottenObj, false, errWaitTimeoutWithName
  255. }
  256. ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
  257. watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, Wait{errOut: o.ErrOut}.IsDeleted)
  258. cancel()
  259. switch {
  260. case err == nil:
  261. return watchEvent.Object, true, nil
  262. case err == watchtools.ErrWatchClosed:
  263. continue
  264. case err == wait.ErrWaitTimeout:
  265. if watchEvent != nil {
  266. return watchEvent.Object, false, errWaitTimeoutWithName
  267. }
  268. return gottenObj, false, errWaitTimeoutWithName
  269. default:
  270. return gottenObj, false, err
  271. }
  272. }
  273. }
  // Wait carries helper methods for handling watch events, including error
  // reporting.
  type Wait struct {
  	// errOut receives a message for every error event seen on the watch.
  	errOut io.Writer
  }
  278. // IsDeleted returns true if the object is deleted. It prints any errors it encounters.
  279. func (w Wait) IsDeleted(event watch.Event) (bool, error) {
  280. switch event.Type {
  281. case watch.Error:
  282. // keep waiting in the event we see an error - we expect the watch to be closed by
  283. // the server if the error is unrecoverable.
  284. err := apierrors.FromObject(event.Object)
  285. fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
  286. return false, nil
  287. case watch.Deleted:
  288. return true, nil
  289. default:
  290. return false, nil
  291. }
  292. }
  // ConditionalWait holds the information needed to check an API status
  // condition.
  type ConditionalWait struct {
  	// conditionName is compared against the "type" of each status condition.
  	conditionName string
  	// conditionStatus is the "status" value the matching condition must have.
  	conditionStatus string
  	// errOut is written to if an error occurs
  	errOut io.Writer
  }
  300. // IsConditionMet is a conditionfunc for waiting on an API condition to be met
  301. func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
  302. endTime := time.Now().Add(o.Timeout)
  303. for {
  304. if len(info.Name) == 0 {
  305. return info.Object, false, fmt.Errorf("resource name must be provided")
  306. }
  307. nameSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
  308. var gottenObj *unstructured.Unstructured
  309. // List with a name field selector to get the current resourceVersion to watch from (not the object's resourceVersion)
  310. gottenObjList, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(metav1.ListOptions{FieldSelector: nameSelector})
  311. resourceVersion := ""
  312. switch {
  313. case err != nil:
  314. return info.Object, false, err
  315. case len(gottenObjList.Items) != 1:
  316. resourceVersion = gottenObjList.GetResourceVersion()
  317. default:
  318. gottenObj = &gottenObjList.Items[0]
  319. conditionMet, err := w.checkCondition(gottenObj)
  320. if conditionMet {
  321. return gottenObj, true, nil
  322. }
  323. if err != nil {
  324. return gottenObj, false, err
  325. }
  326. resourceVersion = gottenObjList.GetResourceVersion()
  327. }
  328. watchOptions := metav1.ListOptions{}
  329. watchOptions.FieldSelector = nameSelector
  330. watchOptions.ResourceVersion = resourceVersion
  331. objWatch, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(watchOptions)
  332. if err != nil {
  333. return gottenObj, false, err
  334. }
  335. timeout := endTime.Sub(time.Now())
  336. errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
  337. if timeout < 0 {
  338. // we're out of time
  339. return gottenObj, false, errWaitTimeoutWithName
  340. }
  341. ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
  342. watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, w.isConditionMet)
  343. cancel()
  344. switch {
  345. case err == nil:
  346. return watchEvent.Object, true, nil
  347. case err == watchtools.ErrWatchClosed:
  348. continue
  349. case err == wait.ErrWaitTimeout:
  350. if watchEvent != nil {
  351. return watchEvent.Object, false, errWaitTimeoutWithName
  352. }
  353. return gottenObj, false, errWaitTimeoutWithName
  354. default:
  355. return gottenObj, false, err
  356. }
  357. }
  358. }
  359. func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
  360. conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
  361. if err != nil {
  362. return false, err
  363. }
  364. if !found {
  365. return false, nil
  366. }
  367. for _, conditionUncast := range conditions {
  368. condition := conditionUncast.(map[string]interface{})
  369. name, found, err := unstructured.NestedString(condition, "type")
  370. if !found || err != nil || strings.ToLower(name) != strings.ToLower(w.conditionName) {
  371. continue
  372. }
  373. status, found, err := unstructured.NestedString(condition, "status")
  374. if !found || err != nil {
  375. continue
  376. }
  377. return strings.ToLower(status) == strings.ToLower(w.conditionStatus), nil
  378. }
  379. return false, nil
  380. }
  381. func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
  382. if event.Type == watch.Error {
  383. // keep waiting in the event we see an error - we expect the watch to be closed by
  384. // the server
  385. err := apierrors.FromObject(event.Object)
  386. fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
  387. return false, nil
  388. }
  389. if event.Type == watch.Deleted {
  390. // this will chain back out, result in another get and an return false back up the chain
  391. return false, nil
  392. }
  393. obj := event.Object.(*unstructured.Unstructured)
  394. return w.checkCondition(obj)
  395. }
  396. func extendErrWaitTimeout(err error, info *resource.Info) error {
  397. return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
  398. }