drain.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package drain

import (
	"errors"
	"fmt"
	"math"
	"time"

	"github.com/spf13/cobra"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/cli-runtime/pkg/genericclioptions"
	"k8s.io/cli-runtime/pkg/printers"
	"k8s.io/cli-runtime/pkg/resource"
	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
	"k8s.io/kubernetes/pkg/kubectl/drain"
	"k8s.io/kubernetes/pkg/kubectl/scheme"
	"k8s.io/kubernetes/pkg/kubectl/util/i18n"
	"k8s.io/kubernetes/pkg/kubectl/util/templates"
)
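// DrainCmdOptions holds the configuration shared by the cordon, uncordon,
// and drain commands: print flags, the target namespace, the drain.Helper
// that performs the actual work, and the node infos resolved from the
// command-line arguments.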
type DrainCmdOptions struct {
	PrintFlags *genericclioptions.PrintFlags
	ToPrinter  func(string) (printers.ResourcePrinterFunc, error)

	Namespace string

	drainer   *drain.Helper
	nodeInfos []*resource.Info

	genericclioptions.IOStreams
}
var (
	cordonLong = templates.LongDesc(i18n.T(`
		Mark node as unschedulable.`))

	cordonExample = templates.Examples(i18n.T(`
		# Mark node "foo" as unschedulable.
		kubectl cordon foo`))
)
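// NewCmdCordon returns the "cordon" command, which marks the selected nodes
// as unschedulable so that no new pods are placed on them.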
func NewCmdCordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
	o := NewDrainCmdOptions(f, ioStreams)

	cmd := &cobra.Command{
		Use:                   "cordon NODE",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Mark node as unschedulable"),
		Long:                  cordonLong,
		Example:               cordonExample,
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(o.Complete(f, cmd, args))
			cmdutil.CheckErr(o.RunCordonOrUncordon(true))
		},
	}
	cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on")
	cmdutil.AddDryRunFlag(cmd)
	return cmd
}
var (
	uncordonLong = templates.LongDesc(i18n.T(`
		Mark node as schedulable.`))

	uncordonExample = templates.Examples(i18n.T(`
		# Mark node "foo" as schedulable.
		kubectl uncordon foo`))
)
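// NewCmdUncordon returns the "uncordon" command, which marks the selected
// nodes as schedulable again.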
func NewCmdUncordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
	o := NewDrainCmdOptions(f, ioStreams)

	cmd := &cobra.Command{
		Use:                   "uncordon NODE",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Mark node as schedulable"),
		Long:                  uncordonLong,
		Example:               uncordonExample,
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(o.Complete(f, cmd, args))
			cmdutil.CheckErr(o.RunCordonOrUncordon(false))
		},
	}
	cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on")
	cmdutil.AddDryRunFlag(cmd)
	return cmd
}
var (
	drainLong = templates.LongDesc(i18n.T(`
		Drain node in preparation for maintenance.

		The given node will be marked unschedulable to prevent new pods from arriving.
		'drain' evicts the pods if the API server supports
		[eviction](http://kubernetes.io/docs/admin/disruptions/). Otherwise, it will use normal
		DELETE to delete the pods.
		'drain' evicts or deletes all pods except mirror pods (which cannot be deleted through
		the API server). If there are DaemonSet-managed pods, drain will not proceed
		without --ignore-daemonsets, and regardless it will not delete any
		DaemonSet-managed pods, because those pods would be immediately replaced by the
		DaemonSet controller, which ignores unschedulable markings. If there are any
		pods that are neither mirror pods nor managed by ReplicationController,
		ReplicaSet, DaemonSet, StatefulSet or Job, then drain will not delete any pods unless you
		use --force. --force will also allow deletion to proceed if the managing resource of one
		or more pods is missing.

		'drain' waits for graceful termination. You should not operate on the machine until
		the command completes.

		When you are ready to put the node back into service, use kubectl uncordon, which
		will make the node schedulable again.

		![Workflow](http://kubernetes.io/images/docs/kubectl_drain.svg)`))

	drainExample = templates.Examples(i18n.T(`
		# Drain node "foo", even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet on it.
		kubectl drain foo --force

		# As above, but abort if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet, and use a grace period of 15 minutes.
		kubectl drain foo --grace-period=900`))
)
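// NewDrainCmdOptions returns a DrainCmdOptions with defaults applied:
// GracePeriodSeconds of -1 defers to each pod's own grace period, and
// warnings from the helper go to the command's error stream.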
func NewDrainCmdOptions(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *DrainCmdOptions {
	return &DrainCmdOptions{
		PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme),
		IOStreams:  ioStreams,
		drainer: &drain.Helper{
			GracePeriodSeconds: -1,
			ErrOut:             ioStreams.ErrOut,
		},
	}
}
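// NewCmdDrain returns the "drain" command, which cordons the selected nodes
// and then evicts or deletes the pods running on them.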
func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
	o := NewDrainCmdOptions(f, ioStreams)

	cmd := &cobra.Command{
		Use:                   "drain NODE",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Drain node in preparation for maintenance"),
		Long:                  drainLong,
		Example:               drainExample,
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(o.Complete(f, cmd, args))
			cmdutil.CheckErr(o.RunDrain())
		},
	}
	cmd.Flags().BoolVar(&o.drainer.Force, "force", o.drainer.Force, "Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet.")
	cmd.Flags().BoolVar(&o.drainer.IgnoreAllDaemonSets, "ignore-daemonsets", o.drainer.IgnoreAllDaemonSets, "Ignore DaemonSet-managed pods.")
	cmd.Flags().BoolVar(&o.drainer.DeleteLocalData, "delete-local-data", o.drainer.DeleteLocalData, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
	cmd.Flags().IntVar(&o.drainer.GracePeriodSeconds, "grace-period", o.drainer.GracePeriodSeconds, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
	cmd.Flags().DurationVar(&o.drainer.Timeout, "timeout", o.drainer.Timeout, "The length of time to wait before giving up, zero means infinite")
	cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on")
	cmd.Flags().StringVarP(&o.drainer.PodSelector, "pod-selector", "", o.drainer.PodSelector, "Label selector to filter pods on the node")
	cmdutil.AddDryRunFlag(cmd)
	return cmd
}
// Complete populates some fields from the factory, grabs command line
// arguments and looks up the node using Builder
func (o *DrainCmdOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
	var err error

	if len(args) == 0 && !cmd.Flags().Changed("selector") {
		return cmdutil.UsageErrorf(cmd, "USAGE: %s [flags]", cmd.Use)
	}
	if len(args) > 0 && len(o.drainer.Selector) > 0 {
		return cmdutil.UsageErrorf(cmd, "cannot specify both a node name and a --selector option")
	}

	o.drainer.DryRun = cmdutil.GetDryRunFlag(cmd)

	if o.drainer.Client, err = f.KubernetesClientSet(); err != nil {
		return err
	}

	if len(o.drainer.PodSelector) > 0 {
		if _, err := labels.Parse(o.drainer.PodSelector); err != nil {
			return errors.New("--pod-selector=<pod_selector> must be a valid label selector")
		}
	}

	o.nodeInfos = []*resource.Info{}

	o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace()
	if err != nil {
		return err
	}

	o.ToPrinter = func(operation string) (printers.ResourcePrinterFunc, error) {
		o.PrintFlags.NamePrintFlags.Operation = operation
		if o.drainer.DryRun {
			o.PrintFlags.Complete("%s (dry run)")
		}
		printer, err := o.PrintFlags.ToPrinter()
		if err != nil {
			return nil, err
		}
		return printer.PrintObj, nil
	}

	builder := f.NewBuilder().
		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
		NamespaceParam(o.Namespace).DefaultNamespace().
		ResourceNames("nodes", args...).
		SingleResourceType().
		Flatten()

	if len(o.drainer.Selector) > 0 {
		builder = builder.LabelSelectorParam(o.drainer.Selector).
			ResourceTypes("nodes")
	}

	r := builder.Do()
	if err = r.Err(); err != nil {
		return err
	}

	return r.Visit(func(info *resource.Info, err error) error {
		if err != nil {
			return err
		}
		if info.Mapping.Resource.GroupResource() != (schema.GroupResource{Group: "", Resource: "nodes"}) {
			return fmt.Errorf("expected resource of type node, got %q", info.Mapping.Resource)
		}
		o.nodeInfos = append(o.nodeInfos, info)
		return nil
	})
}
// RunDrain runs the 'drain' command
func (o *DrainCmdOptions) RunDrain() error {
	if err := o.RunCordonOrUncordon(true); err != nil {
		return err
	}

	printObj, err := o.ToPrinter("drained")
	if err != nil {
		return err
	}

	drainedNodes := sets.NewString()
	var fatal error

	for _, info := range o.nodeInfos {
		var err error
		if !o.drainer.DryRun {
			err = o.deleteOrEvictPodsSimple(info)
		}
		if err == nil || o.drainer.DryRun {
			drainedNodes.Insert(info.Name)
			printObj(info.Object, o.Out)
		} else {
			// Abort on the first failure and report which nodes are still pending.
			fmt.Fprintf(o.ErrOut, "error: unable to drain node %q, aborting command...\n\n", info.Name)
			remainingNodes := []string{}
			fatal = err
			for _, remainingInfo := range o.nodeInfos {
				if drainedNodes.Has(remainingInfo.Name) {
					continue
				}
				remainingNodes = append(remainingNodes, remainingInfo.Name)
			}

			if len(remainingNodes) > 0 {
				fmt.Fprintf(o.ErrOut, "There are pending nodes to be drained:\n")
				for _, nodeName := range remainingNodes {
					fmt.Fprintf(o.ErrOut, " %s\n", nodeName)
				}
			}
			break
		}
	}

	return fatal
}
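// deleteOrEvictPodsSimple resolves the pods on a single node, surfaces any
// warnings, and evicts or deletes them; if an error interrupts the process,
// it lists the pods still pending on the node.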
func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error {
	list, errs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
	if errs != nil {
		return utilerrors.NewAggregate(errs)
	}
	if warnings := list.Warnings(); warnings != "" {
		fmt.Fprintf(o.ErrOut, "WARNING: %s\n", warnings)
	}

	if err := o.deleteOrEvictPods(list.Pods()); err != nil {
		pendingList, newErrs := o.drainer.GetPodsForDeletion(nodeInfo.Name)

		fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err)
		for _, pendingPod := range pendingList.Pods() {
			fmt.Fprintf(o.ErrOut, "%s/%s\n", "pod", pendingPod.Name)
		}
		if newErrs != nil {
			fmt.Fprintf(o.ErrOut, "following errors also occurred:\n%s", utilerrors.NewAggregate(newErrs))
		}
		return err
	}
	return nil
}
// deleteOrEvictPods deletes or evicts the pods on the api server
func (o *DrainCmdOptions) deleteOrEvictPods(pods []corev1.Pod) error {
	if len(pods) == 0 {
		return nil
	}

	policyGroupVersion, err := drain.CheckEvictionSupport(o.drainer.Client)
	if err != nil {
		return err
	}

	getPodFn := func(namespace, name string) (*corev1.Pod, error) {
		return o.drainer.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
	}

	if len(policyGroupVersion) > 0 {
		return o.evictPods(pods, policyGroupVersion, getPodFn)
	}
	return o.deletePods(pods, getPodFn)
}
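// evictPods evicts the given pods concurrently, one goroutine per pod. An
// eviction rejected with 429 TooManyRequests (typically a PodDisruptionBudget
// refusing it) is retried every 5 seconds; each evicted pod is then awaited
// until it has actually terminated.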
func (o *DrainCmdOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
	// Buffer one slot per pod so every worker goroutine can deliver its
	// result and none is leaked if the global timeout fires first.
	returnCh := make(chan error, len(pods))

	for _, pod := range pods {
		go func(pod corev1.Pod, returnCh chan error) {
			for {
				fmt.Fprintf(o.Out, "evicting pod %q\n", pod.Name)
				err := o.drainer.EvictPod(pod, policyGroupVersion)
				if err == nil {
					break
				} else if apierrors.IsNotFound(err) {
					returnCh <- nil
					return
				} else if apierrors.IsTooManyRequests(err) {
					fmt.Fprintf(o.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err)
					time.Sleep(5 * time.Second)
				} else {
					returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
					return
				}
			}
			_, err := o.waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn)
			if err == nil {
				returnCh <- nil
			} else {
				returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
			}
		}(pod, returnCh)
	}

	doneCount := 0
	var errors []error

	// 0 timeout means infinite, we use MaxInt64 to represent it.
	var globalTimeout time.Duration
	if o.drainer.Timeout == 0 {
		globalTimeout = time.Duration(math.MaxInt64)
	} else {
		globalTimeout = o.drainer.Timeout
	}
	globalTimeoutCh := time.After(globalTimeout)
	numPods := len(pods)
	for doneCount < numPods {
		select {
		case err := <-returnCh:
			doneCount++
			if err != nil {
				errors = append(errors, err)
			}
		case <-globalTimeoutCh:
			return fmt.Errorf("drain did not complete within %v", globalTimeout)
		}
	}
	return utilerrors.NewAggregate(errors)
}
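// deletePods deletes the given pods with plain DELETE calls (the fallback
// when the eviction API is unavailable), then waits up to the global timeout
// for all of them to disappear.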
func (o *DrainCmdOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
	// 0 timeout means infinite, we use MaxInt64 to represent it.
	var globalTimeout time.Duration
	if o.drainer.Timeout == 0 {
		globalTimeout = time.Duration(math.MaxInt64)
	} else {
		globalTimeout = o.drainer.Timeout
	}

	for _, pod := range pods {
		err := o.drainer.DeletePod(pod)
		if err != nil && !apierrors.IsNotFound(err) {
			return err
		}
	}

	_, err := o.waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn)
	return err
}
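// waitForDelete polls until every pod in the list is gone or has been
// replaced by a pod with a different UID, printing each pod as it is
// confirmed evicted or deleted. It returns the pods still pending along
// with any polling error.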
func (o *DrainCmdOptions) waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) {
	var verbStr string
	if usingEviction {
		verbStr = "evicted"
	} else {
		verbStr = "deleted"
	}
	printObj, err := o.ToPrinter(verbStr)
	if err != nil {
		return pods, err
	}

	err = wait.PollImmediate(interval, timeout, func() (bool, error) {
		pendingPods := []corev1.Pod{}
		for i, pod := range pods {
			p, err := getPodFn(pod.Namespace, pod.Name)
			if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
				// The pod is gone, or its name has been reused by a new pod.
				printObj(&pod, o.Out)
				continue
			} else if err != nil {
				return false, err
			} else {
				pendingPods = append(pendingPods, pods[i])
			}
		}
		pods = pendingPods
		if len(pendingPods) > 0 {
			return false, nil
		}
		return true, nil
	})
	return pods, err
}
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
// "Unschedulable" is passed as the first arg.
func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error {
	cordonOrUncordon := "cordon"
	if !desired {
		cordonOrUncordon = "un" + cordonOrUncordon
	}

	for _, nodeInfo := range o.nodeInfos {
		printError := func(err error) {
			fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: %v\n", cordonOrUncordon, nodeInfo.Name, err)
		}

		gvk := nodeInfo.ResourceMapping().GroupVersionKind
		if gvk.Kind == "Node" {
			c, err := drain.NewCordonHelperFromRuntimeObject(nodeInfo.Object, scheme.Scheme, gvk)
			if err != nil {
				printError(err)
				continue
			}

			if updateRequired := c.UpdateIfRequired(desired); !updateRequired {
				printObj, err := o.ToPrinter(already(desired))
				if err != nil {
					fmt.Fprintf(o.ErrOut, "error: %v\n", err)
					continue
				}
				printObj(nodeInfo.Object, o.Out)
			} else {
				if !o.drainer.DryRun {
					err, patchErr := c.PatchOrReplace(o.drainer.Client)
					if patchErr != nil {
						printError(patchErr)
					}
					if err != nil {
						printError(err)
						continue
					}
				}
				printObj, err := o.ToPrinter(changed(desired))
				if err != nil {
					fmt.Fprintf(o.ErrOut, "%v\n", err)
					continue
				}
				printObj(nodeInfo.Object, o.Out)
			}
		} else {
			printObj, err := o.ToPrinter("skipped")
			if err != nil {
				fmt.Fprintf(o.ErrOut, "%v\n", err)
				continue
			}
			printObj(nodeInfo.Object, o.Out)
		}
	}

	return nil
}
// already() and changed() return suitable strings for {un,}cordoning
func already(desired bool) string {
	if desired {
		return "already cordoned"
	}
	return "already uncordoned"
}

func changed(desired bool) string {
	if desired {
		return "cordoned"
	}
	return "uncordoned"
}