123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package node
- import (
- "fmt"
- "strings"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/types"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/api/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- appsv1listers "k8s.io/client-go/listers/apps/v1"
- utilpod "k8s.io/kubernetes/pkg/api/v1/pod"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/kubelet/util/format"
- nodepkg "k8s.io/kubernetes/pkg/util/node"
- "k8s.io/klog"
- )
- // DeletePods will delete all pods from master running on given node,
- // and return true if any pods were deleted, or were found pending
- // deletion.
- func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) {
- remaining := false
- selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String()
- options := metav1.ListOptions{FieldSelector: selector}
- pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(options)
- var updateErrList []error
- if err != nil {
- return remaining, err
- }
- if len(pods.Items) > 0 {
- RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
- }
- for _, pod := range pods.Items {
- // Defensive check, also needed for tests.
- if pod.Spec.NodeName != nodeName {
- continue
- }
- // Set reason and message in the pod object.
- if _, err = SetPodTerminationReason(kubeClient, &pod, nodeName); err != nil {
- if apierrors.IsConflict(err) {
- updateErrList = append(updateErrList,
- fmt.Errorf("update status failed for pod %q: %v", format.Pod(&pod), err))
- continue
- }
- }
- // if the pod has already been marked for deletion, we still return true that there are remaining pods.
- if pod.DeletionGracePeriodSeconds != nil {
- remaining = true
- continue
- }
- // if the pod is managed by a daemonset, ignore it
- _, err := daemonStore.GetPodDaemonSets(&pod)
- if err == nil { // No error means at least one daemonset was found
- continue
- }
- klog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name)
- recorder.Eventf(&pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
- if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
- return false, err
- }
- remaining = true
- }
- if len(updateErrList) > 0 {
- return false, utilerrors.NewAggregate(updateErrList)
- }
- return remaining, nil
- }
- // SetPodTerminationReason attempts to set a reason and message in the
- // pod status, updates it in the apiserver, and returns an error if it
- // encounters one.
- func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
- if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
- return pod, nil
- }
- pod.Status.Reason = nodepkg.NodeUnreachablePodReason
- pod.Status.Message = fmt.Sprintf(nodepkg.NodeUnreachablePodMessage, nodeName, pod.Name)
- var updatedPod *v1.Pod
- var err error
- if updatedPod, err = kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil {
- return nil, err
- }
- return updatedPod, nil
- }
- // MarkAllPodsNotReady updates ready status of all pods running on
- // given node from master return true if success
- func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
- nodeName := node.Name
- klog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
- opts := metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String()}
- pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(opts)
- if err != nil {
- return err
- }
- errMsg := []string{}
- for _, pod := range pods.Items {
- // Defensive check, also needed for tests.
- if pod.Spec.NodeName != nodeName {
- continue
- }
- for _, cond := range pod.Status.Conditions {
- if cond.Type == v1.PodReady {
- cond.Status = v1.ConditionFalse
- if !utilpod.UpdatePodCondition(&pod.Status, &cond) {
- break
- }
- klog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
- _, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(&pod)
- if err != nil {
- klog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
- errMsg = append(errMsg, fmt.Sprintf("%v", err))
- }
- break
- }
- }
- }
- if len(errMsg) == 0 {
- return nil
- }
- return fmt.Errorf("%v", strings.Join(errMsg, "; "))
- }
- // RecordNodeEvent records a event related to a node.
- func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
- ref := &v1.ObjectReference{
- Kind: "Node",
- Name: nodeName,
- UID: types.UID(nodeUID),
- Namespace: "",
- }
- klog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
- recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
- }
- // RecordNodeStatusChange records a event related to a node status change. (Common to lifecycle and ipam)
- func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
- ref := &v1.ObjectReference{
- Kind: "Node",
- Name: node.Name,
- UID: node.UID,
- Namespace: "",
- }
- klog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name)
- // TODO: This requires a transaction, either both node status is updated
- // and event is recorded or neither should happen, see issue #6055.
- recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
- }
- // SwapNodeControllerTaint returns true in case of success and false
- // otherwise.
- func SwapNodeControllerTaint(kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool {
- for _, taintToAdd := range taintsToAdd {
- now := metav1.Now()
- taintToAdd.TimeAdded = &now
- }
- err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintsToAdd...)
- if err != nil {
- utilruntime.HandleError(
- fmt.Errorf(
- "unable to taint %+v unresponsive Node %q: %v",
- taintsToAdd,
- node.Name,
- err))
- return false
- }
- klog.V(4).Infof("Added %+v Taint to Node %v", taintsToAdd, node.Name)
- err = controller.RemoveTaintOffNode(kubeClient, node.Name, node, taintsToRemove...)
- if err != nil {
- utilruntime.HandleError(
- fmt.Errorf(
- "unable to remove %+v unneeded taint from unresponsive Node %q: %v",
- taintsToRemove,
- node.Name,
- err))
- return false
- }
- klog.V(4).Infof("Made sure that Node %+v has no %v Taint", node.Name, taintsToRemove)
- return true
- }
- // AddOrUpdateLabelsOnNode updates the labels on the node and returns true on
- // success and false on failure.
- func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
- err := controller.AddOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate)
- if err != nil {
- utilruntime.HandleError(
- fmt.Errorf(
- "unable to update labels %+v for Node %q: %v",
- labelsToUpdate,
- node.Name,
- err))
- return false
- }
- klog.V(4).Infof("Updated labels %+v to Node %v", labelsToUpdate, node.Name)
- return true
- }
- // CreateAddNodeHandler creates an add node handler.
- func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
- return func(originalObj interface{}) {
- node := originalObj.(*v1.Node).DeepCopy()
- if err := f(node); err != nil {
- utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %v", err))
- }
- }
- }
- // CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam)
- func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
- return func(origOldObj, origNewObj interface{}) {
- node := origNewObj.(*v1.Node).DeepCopy()
- prevNode := origOldObj.(*v1.Node).DeepCopy()
- if err := f(prevNode, node); err != nil {
- utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err))
- }
- }
- }
- // CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam)
- func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
- return func(originalObj interface{}) {
- originalNode, isNode := originalObj.(*v1.Node)
- // We can get DeletedFinalStateUnknown instead of *v1.Node here and
- // we need to handle that correctly. #34692
- if !isNode {
- deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown)
- if !ok {
- klog.Errorf("Received unexpected object: %v", originalObj)
- return
- }
- originalNode, ok = deletedState.Obj.(*v1.Node)
- if !ok {
- klog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
- return
- }
- }
- node := originalNode.DeepCopy()
- if err := f(node); err != nil {
- utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err))
- }
- }
- }
- // GetNodeCondition extracts the provided condition from the given status and returns that.
- // Returns nil and -1 if the condition is not present, and the index of the located condition.
- func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) {
- if status == nil {
- return -1, nil
- }
- for i := range status.Conditions {
- if status.Conditions[i].Type == conditionType {
- return i, &status.Conditions[i]
- }
- }
- return -1, nil
- }
|