controller_utils.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node

import (
	"context"
	"fmt"
	"strings"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"

	v1 "k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	appsv1listers "k8s.io/client-go/listers/apps/v1"
	utilpod "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
	nodepkg "k8s.io/kubernetes/pkg/util/node"

	"k8s.io/klog"
)

// DeletePods deletes all pods on the given node through the apiserver
// (master), and returns true if any pods were deleted, or were found
// pending deletion.
func DeletePods(kubeClient clientset.Interface, pods []*v1.Pod, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) {
	remaining := false
	var updateErrList []error

	if len(pods) > 0 {
		RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
	}

	for i := range pods {
		// Defensive check, also needed for tests.
		if pods[i].Spec.NodeName != nodeName {
			continue
		}

		// Pod will be modified, so making copy is required.
		pod := pods[i].DeepCopy()
		// Set reason and message in the pod object.
		if _, err := SetPodTerminationReason(kubeClient, pod, nodeName); err != nil {
			if apierrors.IsConflict(err) {
				updateErrList = append(updateErrList,
					fmt.Errorf("update status failed for pod %q: %v", format.Pod(pod), err))
				continue
			}
		}
		// If the pod has already been marked for deletion, we still return true that there are remaining pods.
		if pod.DeletionGracePeriodSeconds != nil {
			remaining = true
			continue
		}
		// If the pod is managed by a daemonset, ignore it.
		if _, err := daemonStore.GetPodDaemonSets(pod); err == nil {
			// No error means at least one daemonset was found.
			continue
		}

		klog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name)
		recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
		if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, nil); err != nil {
			if apierrors.IsNotFound(err) {
				// NotFound error means that pod was already deleted.
				// There is nothing left to do with this pod.
				continue
			}
			return false, err
		}
		remaining = true
	}

	if len(updateErrList) > 0 {
		return false, utilerrors.NewAggregate(updateErrList)
	}
	return remaining, nil
}
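
// Illustrative usage sketch (not part of the controller code): evicting the
// pods of an unresponsive node in a test. The fake clientset, fake recorder,
// and informer factory come from k8s.io/client-go test helpers, and the
// "node-1"/"node-uid-1" values are hypothetical.
//
//	kubeClient := fake.NewSimpleClientset(pod)             // k8s.io/client-go/kubernetes/fake
//	recorder := record.NewFakeRecorder(10)                 // buffered fake EventRecorder
//	factory := informers.NewSharedInformerFactory(kubeClient, 0)
//	dsLister := factory.Apps().V1().DaemonSets().Lister()
//	remaining, err := DeletePods(kubeClient, []*v1.Pod{pod}, recorder, "node-1", "node-uid-1", dsLister)
//	// remaining is true while any pod on the node is still pending deletion.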

// SetPodTerminationReason attempts to set a reason and message in the
// pod status, updates it in the apiserver, and returns an error if it
// encounters one.
func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
	if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
		return pod, nil
	}

	pod.Status.Reason = nodepkg.NodeUnreachablePodReason
	pod.Status.Message = fmt.Sprintf(nodepkg.NodeUnreachablePodMessage, nodeName, pod.Name)

	var updatedPod *v1.Pod
	var err error
	if updatedPod, err = kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
		return nil, err
	}
	return updatedPod, nil
}
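
// Usage sketch (illustrative; "node-1" is a hypothetical node name). On
// success the returned pod's status carries the unreachable reason and
// message defined in k8s.io/kubernetes/pkg/util/node.
//
//	updated, err := SetPodTerminationReason(kubeClient, pod, "node-1")
//	if err == nil {
//		// updated.Status.Reason == nodepkg.NodeUnreachablePodReason
//	}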

// MarkPodsNotReady updates the ready status of the given pods running on
// the given node to false, and returns an error if any of the status
// updates fail.
func MarkPodsNotReady(kubeClient clientset.Interface, pods []*v1.Pod, nodeName string) error {
	klog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)

	errMsg := []string{}
	for i := range pods {
		// Defensive check, also needed for tests.
		if pods[i].Spec.NodeName != nodeName {
			continue
		}

		// Pod will be modified, so making copy is required.
		pod := pods[i].DeepCopy()
		for _, cond := range pod.Status.Conditions {
			if cond.Type == v1.PodReady {
				cond.Status = v1.ConditionFalse
				if !utilpod.UpdatePodCondition(&pod.Status, &cond) {
					break
				}

				klog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
				_, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
				if err != nil {
					if apierrors.IsNotFound(err) {
						// NotFound error means that pod was already deleted.
						// There is nothing left to do with this pod.
						continue
					}
					klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
					errMsg = append(errMsg, fmt.Sprintf("%v", err))
				}
				break
			}
		}
	}

	if len(errMsg) == 0 {
		return nil
	}
	return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}
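
// Usage sketch (illustrative; the node name is hypothetical):
//
//	if err := MarkPodsNotReady(kubeClient, pods, "node-1"); err != nil {
//		utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node node-1: %v", err))
//	}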

// RecordNodeEvent records an event related to a node.
func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
	ref := &v1.ObjectReference{
		Kind:      "Node",
		Name:      nodeName,
		UID:       types.UID(nodeUID),
		Namespace: "",
	}
	klog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
	recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}

// RecordNodeStatusChange records an event related to a node status change. (Common to lifecycle and ipam)
func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
	ref := &v1.ObjectReference{
		Kind:      "Node",
		Name:      node.Name,
		UID:       node.UID,
		Namespace: "",
	}
	klog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name)
	// TODO: This requires a transaction, either both node status is updated
	// and event is recorded or neither should happen, see issue #6055.
	recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
}
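
// Usage sketch (illustrative) for both recorders; the recorder and node
// variables, reason, and status strings below are hypothetical examples:
//
//	RecordNodeEvent(recorder, node.Name, string(node.UID), v1.EventTypeNormal, "NodeReady", "Node became ready")
//	RecordNodeStatusChange(recorder, node, "NodeNotReady")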

// SwapNodeControllerTaint adds the given taints to the node and removes the
// given taints from it, returning true in case of success and false
// otherwise.
func SwapNodeControllerTaint(kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool {
	for _, taintToAdd := range taintsToAdd {
		now := metav1.Now()
		taintToAdd.TimeAdded = &now
	}

	err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintsToAdd...)
	if err != nil {
		utilruntime.HandleError(
			fmt.Errorf(
				"unable to taint %+v unresponsive Node %q: %v",
				taintsToAdd,
				node.Name,
				err))
		return false
	}
	klog.V(4).Infof("Added %+v Taint to Node %v", taintsToAdd, node.Name)

	err = controller.RemoveTaintOffNode(kubeClient, node.Name, node, taintsToRemove...)
	if err != nil {
		utilruntime.HandleError(
			fmt.Errorf(
				"unable to remove %+v unneeded taint from unresponsive Node %q: %v",
				taintsToRemove,
				node.Name,
				err))
		return false
	}
	klog.V(4).Infof("Made sure that Node %+v has no %v Taint", node.Name, taintsToRemove)

	return true
}
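
// Usage sketch (illustrative): swapping the not-ready taint for the
// unreachable taint when a node stops reporting. The keys below are the
// well-known node lifecycle taints; the swap direction is just an example.
//
//	unreachable := &v1.Taint{Key: "node.kubernetes.io/unreachable", Effect: v1.TaintEffectNoExecute}
//	notReady := &v1.Taint{Key: "node.kubernetes.io/not-ready", Effect: v1.TaintEffectNoExecute}
//	if !SwapNodeControllerTaint(kubeClient, []*v1.Taint{unreachable}, []*v1.Taint{notReady}, node) {
//		// Swap failed; callers typically requeue the node and retry.
//	}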

// AddOrUpdateLabelsOnNode updates the labels on the node and returns true on
// success and false on failure.
func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
	err := controller.AddOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate)
	if err != nil {
		utilruntime.HandleError(
			fmt.Errorf(
				"unable to update labels %+v for Node %q: %v",
				labelsToUpdate,
				node.Name,
				err))
		return false
	}
	klog.V(4).Infof("Updated labels %+v to Node %v", labelsToUpdate, node.Name)
	return true
}
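
// Usage sketch (illustrative; the label key/value are hypothetical):
//
//	labels := map[string]string{"topology.kubernetes.io/zone": "us-east-1a"}
//	if !AddOrUpdateLabelsOnNode(kubeClient, labels, node) {
//		// Update failed; callers typically retry on the next sync.
//	}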

// CreateAddNodeHandler creates an add node handler.
func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
	return func(originalObj interface{}) {
		node := originalObj.(*v1.Node).DeepCopy()
		if err := f(node); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %v", err))
		}
	}
}

// CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam)
func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
	return func(origOldObj, origNewObj interface{}) {
		node := origNewObj.(*v1.Node).DeepCopy()
		prevNode := origOldObj.(*v1.Node).DeepCopy()

		if err := f(prevNode, node); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error while processing Node Update: %v", err))
		}
	}
}

// CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam)
func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
	return func(originalObj interface{}) {
		originalNode, isNode := originalObj.(*v1.Node)
		// We can get DeletedFinalStateUnknown instead of *v1.Node here and
		// we need to handle that correctly. #34692
		if !isNode {
			deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown)
			if !ok {
				klog.Errorf("Received unexpected object: %v", originalObj)
				return
			}
			originalNode, ok = deletedState.Obj.(*v1.Node)
			if !ok {
				klog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
				return
			}
		}
		node := originalNode.DeepCopy()
		if err := f(node); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error while processing Node Delete: %v", err))
		}
	}
}
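
// Usage sketch (illustrative): wiring the three handlers into a shared node
// informer. The factory and the onAdd/onUpdate/onDelete callbacks are
// hypothetical names, not part of this package.
//
//	nodeInformer := factory.Core().V1().Nodes().Informer()
//	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
//		AddFunc:    CreateAddNodeHandler(onAdd),       // onAdd: func(*v1.Node) error
//		UpdateFunc: CreateUpdateNodeHandler(onUpdate), // onUpdate: func(old, new *v1.Node) error
//		DeleteFunc: CreateDeleteNodeHandler(onDelete), // onDelete: func(*v1.Node) error
//	})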

// GetNodeCondition extracts the provided condition from the given status.
// It returns the index of the located condition and a pointer to it, or
// -1 and nil if the condition is not present.
func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) {
	if status == nil {
		return -1, nil
	}
	for i := range status.Conditions {
		if status.Conditions[i].Type == conditionType {
			return i, &status.Conditions[i]
		}
	}
	return -1, nil
}
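
// Usage sketch (illustrative): checking whether a node reports Ready.
//
//	_, readyCondition := GetNodeCondition(&node.Status, v1.NodeReady)
//	if readyCondition != nil && readyCondition.Status == v1.ConditionTrue {
//		// Node is reporting Ready.
//	}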