controller_utils.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
	"fmt"
	"strings"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientset "k8s.io/client-go/kubernetes"
	appsv1listers "k8s.io/client-go/listers/apps/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog"
	utilpod "k8s.io/kubernetes/pkg/api/v1/pod"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
	nodepkg "k8s.io/kubernetes/pkg/util/node"
)

// DeletePods deletes all pods on the given node through the API server, and
// returns true if any pods were deleted, or were found pending deletion.
func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) {
	remaining := false
	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String()
	options := metav1.ListOptions{FieldSelector: selector}
	pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(options)
	var updateErrList []error

	if err != nil {
		return remaining, err
	}

	if len(pods.Items) > 0 {
		RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
	}

	for _, pod := range pods.Items {
		// Defensive check, also needed for tests.
		if pod.Spec.NodeName != nodeName {
			continue
		}

		// Set reason and message in the pod object.
		if _, err = SetPodTerminationReason(kubeClient, &pod, nodeName); err != nil {
			if apierrors.IsConflict(err) {
				updateErrList = append(updateErrList,
					fmt.Errorf("update status failed for pod %q: %v", format.Pod(&pod), err))
				continue
			}
		}
		// If the pod has already been marked for deletion, we still count it as remaining.
		if pod.DeletionGracePeriodSeconds != nil {
			remaining = true
			continue
		}
		// If the pod is managed by a daemonset, ignore it.
		_, err := daemonStore.GetPodDaemonSets(&pod)
		if err == nil { // No error means at least one daemonset was found.
			continue
		}

		klog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name)
		recorder.Eventf(&pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
		if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
			return false, err
		}
		remaining = true
	}

	if len(updateErrList) > 0 {
		return false, utilerrors.NewAggregate(updateErrList)
	}
	return remaining, nil
}

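// The sketch below is illustrative only (not part of the original file): it
// shows how a caller might drive DeletePods and treat the "remaining" result
// as a signal to requeue the node. evictNodePods is a hypothetical name.
func evictNodePods(kubeClient clientset.Interface, recorder record.EventRecorder, node *v1.Node, daemonStore appsv1listers.DaemonSetLister) error {
	remaining, err := DeletePods(kubeClient, recorder, node.Name, string(node.UID), daemonStore)
	if err != nil {
		return err
	}
	if remaining {
		// Some pods are still terminating; the caller should retry later.
		klog.V(2).Infof("Pods on node %v are still terminating; will retry", node.Name)
	}
	return nil
}
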
// SetPodTerminationReason attempts to set a reason and message in the
// pod status, updates it in the apiserver, and returns an error if it
// encounters one.
func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
	if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
		return pod, nil
	}

	pod.Status.Reason = nodepkg.NodeUnreachablePodReason
	pod.Status.Message = fmt.Sprintf(nodepkg.NodeUnreachablePodMessage, nodeName, pod.Name)

	var updatedPod *v1.Pod
	var err error
	if updatedPod, err = kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil {
		return nil, err
	}
	return updatedPod, nil
}

// MarkAllPodsNotReady updates the ready status of all pods running on the
// given node to false, and returns an aggregate error if any update fails.
func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
	nodeName := node.Name
	klog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
	opts := metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String()}
	pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(opts)
	if err != nil {
		return err
	}

	errMsg := []string{}
	for _, pod := range pods.Items {
		// Defensive check, also needed for tests.
		if pod.Spec.NodeName != nodeName {
			continue
		}

		for _, cond := range pod.Status.Conditions {
			if cond.Type == v1.PodReady {
				cond.Status = v1.ConditionFalse
				if !utilpod.UpdatePodCondition(&pod.Status, &cond) {
					break
				}
				klog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
				_, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(&pod)
				if err != nil {
					klog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
					errMsg = append(errMsg, fmt.Sprintf("%v", err))
				}
				break
			}
		}
	}
	if len(errMsg) == 0 {
		return nil
	}
	return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}

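// Hypothetical usage sketch (not in the original file): a controller that has
// decided a node is unreachable can mark its pods NotReady so that service
// endpoints stop routing to them while eviction is still in flight.
func handleNodeUnreachable(kubeClient clientset.Interface, node *v1.Node) {
	if err := MarkAllPodsNotReady(kubeClient, node); err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v", node.Name, err))
	}
}
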
// RecordNodeEvent records an event related to a node.
func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
	ref := &v1.ObjectReference{
		Kind:      "Node",
		Name:      nodeName,
		UID:       types.UID(nodeUID),
		Namespace: "",
	}
	klog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
	recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}

// RecordNodeStatusChange records an event related to a node status change. (Common to lifecycle and ipam)
func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
	ref := &v1.ObjectReference{
		Kind:      "Node",
		Name:      node.Name,
		UID:       node.UID,
		Namespace: "",
	}
	klog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name)
	// TODO: This requires a transaction, either both node status is updated
	// and event is recorded or neither should happen, see issue #6055.
	recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
}

// SwapNodeControllerTaint adds the given taints to the node and removes the
// ones that are no longer needed, returning true on success and false
// otherwise.
func SwapNodeControllerTaint(kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool {
	for _, taintToAdd := range taintsToAdd {
		now := metav1.Now()
		taintToAdd.TimeAdded = &now
	}

	err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintsToAdd...)
	if err != nil {
		utilruntime.HandleError(
			fmt.Errorf(
				"unable to taint %+v unresponsive Node %q: %v",
				taintsToAdd,
				node.Name,
				err))
		return false
	}
	klog.V(4).Infof("Added %+v Taint to Node %v", taintsToAdd, node.Name)

	err = controller.RemoveTaintOffNode(kubeClient, node.Name, node, taintsToRemove...)
	if err != nil {
		utilruntime.HandleError(
			fmt.Errorf(
				"unable to remove %+v unneeded taint from unresponsive Node %q: %v",
				taintsToRemove,
				node.Name,
				err))
		return false
	}
	klog.V(4).Infof("Made sure that Node %v has no %+v Taint", node.Name, taintsToRemove)

	return true
}

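// Illustrative sketch (not part of the original file): swapping the
// "not ready" taint for the "unreachable" taint when a node stops reporting.
// The taint keys below are placeholders, not the real node.kubernetes.io
// constants used by the lifecycle controller.
func markNodeUnreachable(kubeClient clientset.Interface, node *v1.Node) bool {
	unreachableTaint := &v1.Taint{Key: "example.com/unreachable", Effect: v1.TaintEffectNoExecute}
	notReadyTaint := &v1.Taint{Key: "example.com/not-ready", Effect: v1.TaintEffectNoExecute}
	return SwapNodeControllerTaint(kubeClient, []*v1.Taint{unreachableTaint}, []*v1.Taint{notReadyTaint}, node)
}
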
// AddOrUpdateLabelsOnNode updates the labels on the node and returns true on
// success and false on failure.
func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
	err := controller.AddOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate)
	if err != nil {
		utilruntime.HandleError(
			fmt.Errorf(
				"unable to update labels %+v for Node %q: %v",
				labelsToUpdate,
				node.Name,
				err))
		return false
	}
	klog.V(4).Infof("Updated labels %+v on Node %v", labelsToUpdate, node.Name)
	return true
}

// CreateAddNodeHandler creates an add node handler.
func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
	return func(originalObj interface{}) {
		node := originalObj.(*v1.Node).DeepCopy()
		if err := f(node); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %v", err))
		}
	}
}

// CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam)
func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
	return func(origOldObj, origNewObj interface{}) {
		node := origNewObj.(*v1.Node).DeepCopy()
		prevNode := origOldObj.(*v1.Node).DeepCopy()

		if err := f(prevNode, node); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error while processing Node Update: %v", err))
		}
	}
}

// CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam)
func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
	return func(originalObj interface{}) {
		originalNode, isNode := originalObj.(*v1.Node)
		// We can get DeletedFinalStateUnknown instead of *v1.Node here and
		// we need to handle that correctly. #34692
		if !isNode {
			deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown)
			if !ok {
				klog.Errorf("Received unexpected object: %v", originalObj)
				return
			}
			originalNode, ok = deletedState.Obj.(*v1.Node)
			if !ok {
				klog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
				return
			}
		}
		node := originalNode.DeepCopy()
		if err := f(node); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error while processing Node Delete: %v", err))
		}
	}
}

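// Hypothetical wiring sketch (not in the original file): the handler
// constructors above are designed to be plugged into a shared informer's
// event handler set. nodeInformer is an assumed cache.SharedIndexInformer
// supplied by the caller's informer factory.
func registerNodeHandlers(nodeInformer cache.SharedIndexInformer, onAdd, onDelete func(*v1.Node) error) {
	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    CreateAddNodeHandler(onAdd),
		DeleteFunc: CreateDeleteNodeHandler(onDelete),
	})
}
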
// GetNodeCondition extracts the provided condition from the given status.
// It returns the index of the located condition and a pointer to it, or
// -1 and nil if the condition is not present.
func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) {
	if status == nil {
		return -1, nil
	}
	for i := range status.Conditions {
		if status.Conditions[i].Type == conditionType {
			return i, &status.Conditions[i]
		}
	}
	return -1, nil
}
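
// Minimal usage sketch (not part of the original file): treat a missing
// Ready condition as not ready. nodeIsReady is a hypothetical helper name.
func nodeIsReady(node *v1.Node) bool {
	_, readyCondition := GetNodeCondition(&node.Status, v1.NodeReady)
	return readyCondition != nil && readyCondition.Status == v1.ConditionTrue
}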