- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package daemon
- import (
- "bytes"
- "fmt"
- "reflect"
- "sort"
- "k8s.io/klog"
- apps "k8s.io/api/apps/v1"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/runtime"
- intstrutil "k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/apimachinery/pkg/util/json"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/daemon/util"
- labelsutil "k8s.io/kubernetes/pkg/util/labels"
- )
- // rollingUpdate deletes old daemon set pods, making sure that no more than
- // ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable at any time
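- // For example (illustrative numbers): with 10 nodes that should run the daemon
- // pod and MaxUnavailable set to 20%, the rollout keeps at most 2 pods
- // unavailable at a time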
- func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
- nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
- if err != nil {
- return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
- }
- _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
- maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
- if err != nil {
- return fmt.Errorf("Couldn't get unavailable numbers: %v", err)
- }
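- // Split the old pods into those that are available (ready for at least
- // ds.Spec.MinReadySeconds) and those that are not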
- oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)
- // From oldPods, mark all pods that are not available for deletion
- var oldPodsToDelete []string
- klog.V(4).Infof("Marking all unavailable old pods for deletion")
- for _, pod := range oldUnavailablePods {
- // Skip terminating pods. We won't delete them again
- if pod.DeletionTimestamp != nil {
- continue
- }
- klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
- oldPodsToDelete = append(oldPodsToDelete, pod.Name)
- }
- klog.V(4).Infof("Marking old pods for deletion")
- for _, pod := range oldAvailablePods {
- if numUnavailable >= maxUnavailable {
- klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
- break
- }
- klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
- oldPodsToDelete = append(oldPodsToDelete, pod.Name)
- numUnavailable++
- }
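- // Delete the pods marked above. The empty slice means no pods are created in
- // this pass; replacement pods are created by the controller's regular manage
- // logic once the deletions are observed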
- return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
- }
- // constructHistory finds all histories controlled by the given DaemonSet, and
- // updates the current history's revision number, or creates the current history if needed.
- // It also deduplicates the current history, and adds missing unique labels to existing histories.
- func (dsc *DaemonSetsController) constructHistory(ds *apps.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) {
- var histories []*apps.ControllerRevision
- var currentHistories []*apps.ControllerRevision
- histories, err = dsc.controlledHistories(ds)
- if err != nil {
- return nil, nil, err
- }
- for _, history := range histories {
- // Add the unique label if it's not already added to the history
- // We use the history name instead of computing a hash, so that we don't need to worry about hash collisions
- if _, ok := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; !ok {
- toUpdate := history.DeepCopy()
- toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = toUpdate.Name
- history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(toUpdate)
- if err != nil {
- return nil, nil, err
- }
- }
- // Compare histories with ds to separate cur and old history
- found := false
- found, err = Match(ds, history)
- if err != nil {
- return nil, nil, err
- }
- if found {
- currentHistories = append(currentHistories, history)
- } else {
- old = append(old, history)
- }
- }
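- // The current revision number is one greater than the largest revision among
- // the old histories (maxRevision returns 0 when the list is empty)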
- currRevision := maxRevision(old) + 1
- switch len(currentHistories) {
- case 0:
- // Create a new history if the current one isn't found
- cur, err = dsc.snapshot(ds, currRevision)
- if err != nil {
- return nil, nil, err
- }
- default:
- cur, err = dsc.dedupCurHistories(ds, currentHistories)
- if err != nil {
- return nil, nil, err
- }
- // Update revision number if necessary
- if cur.Revision < currRevision {
- toUpdate := cur.DeepCopy()
- toUpdate.Revision = currRevision
- _, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(toUpdate)
- if err != nil {
- return nil, nil, err
- }
- }
- }
- return cur, old, err
- }
- func (dsc *DaemonSetsController) cleanupHistory(ds *apps.DaemonSet, old []*apps.ControllerRevision) error {
- nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
- if err != nil {
- return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
- }
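- // Spec.RevisionHistoryLimit is defaulted (to 10) for apps/v1 DaemonSets, so it
- // is assumed to be non-nil here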
- toKeep := int(*ds.Spec.RevisionHistoryLimit)
- toKill := len(old) - toKeep
- if toKill <= 0 {
- return nil
- }
- // Find all hashes of live pods
- liveHashes := make(map[string]bool)
- for _, pods := range nodesToDaemonPods {
- for _, pod := range pods {
- if hash := pod.Labels[apps.DefaultDaemonSetUniqueLabelKey]; len(hash) > 0 {
- liveHashes[hash] = true
- }
- }
- }
- // Find all live history with the above hashes
- liveHistory := make(map[string]bool)
- for _, history := range old {
- if hash := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; liveHashes[hash] {
- liveHistory[history.Name] = true
- }
- }
- // Clean up old history from lowest to highest revision (from oldest to newest)
- sort.Sort(historiesByRevision(old))
- for _, history := range old {
- if toKill <= 0 {
- break
- }
- if liveHistory[history.Name] {
- continue
- }
- // Clean up
- err := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(history.Name, nil)
- if err != nil {
- return err
- }
- toKill--
- }
- return nil
- }
- // maxRevision returns the max revision number of the given list of histories, or 0 if the list is empty
- func maxRevision(histories []*apps.ControllerRevision) int64 {
- max := int64(0)
- for _, history := range histories {
- if history.Revision > max {
- max = history.Revision
- }
- }
- return max
- }
- func (dsc *DaemonSetsController) dedupCurHistories(ds *apps.DaemonSet, curHistories []*apps.ControllerRevision) (*apps.ControllerRevision, error) {
- if len(curHistories) == 1 {
- return curHistories[0], nil
- }
- var maxRevision int64
- var keepCur *apps.ControllerRevision
- for _, cur := range curHistories {
- if cur.Revision >= maxRevision {
- keepCur = cur
- maxRevision = cur.Revision
- }
- }
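- // keepCur is now the current history with the highest revision; the remaining
- // entries in curHistories are duplicates that are deleted below, after any pods
- // still labeled with an old hash are relabeled to keepCur's hash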
- // Clean up duplicates and relabel pods
- for _, cur := range curHistories {
- if cur.Name == keepCur.Name {
- continue
- }
- // Relabel pods before dedup
- pods, err := dsc.getDaemonPods(ds)
- if err != nil {
- return nil, err
- }
- for _, pod := range pods {
- if pod.Labels[apps.DefaultDaemonSetUniqueLabelKey] != keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey] {
- toUpdate := pod.DeepCopy()
- if toUpdate.Labels == nil {
- toUpdate.Labels = make(map[string]string)
- }
- toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
- _, err = dsc.kubeClient.CoreV1().Pods(ds.Namespace).Update(toUpdate)
- if err != nil {
- return nil, err
- }
- }
- }
- // Remove duplicates
- err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(cur.Name, nil)
- if err != nil {
- return nil, err
- }
- }
- return keepCur, nil
- }
- // controlledHistories returns all ControllerRevisions controlled by the given DaemonSet.
- // This also reconciles ControllerRef by adopting/orphaning.
- // Note that returned histories are pointers to objects in the cache.
- // If you want to modify one, you need to deep-copy it first.
- func (dsc *DaemonSetsController) controlledHistories(ds *apps.DaemonSet) ([]*apps.ControllerRevision, error) {
- selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
- if err != nil {
- return nil, err
- }
- // List all histories to include those that don't match the selector anymore
- // but have a ControllerRef pointing to the controller.
- histories, err := dsc.historyLister.List(labels.Everything())
- if err != nil {
- return nil, err
- }
- // If any adoptions are attempted, we should first recheck for deletion with
- // an uncached quorum read sometime after listing Pods (see #42639).
- canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
- fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
- if err != nil {
- return nil, err
- }
- if fresh.UID != ds.UID {
- return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
- }
- return fresh, nil
- })
- // Use ControllerRefManager to adopt/orphan as needed.
- cm := controller.NewControllerRevisionControllerRefManager(dsc.crControl, ds, selector, controllerKind, canAdoptFunc)
- return cm.ClaimControllerRevisions(histories)
- }
- // Match checks whether the given DaemonSet's template matches the template stored in the given history.
- func Match(ds *apps.DaemonSet, history *apps.ControllerRevision) (bool, error) {
- patch, err := getPatch(ds)
- if err != nil {
- return false, err
- }
- return bytes.Equal(patch, history.Data.Raw), nil
- }
- // getPatch returns a strategic merge patch that can be applied to restore a DaemonSet to a
- // previous version. If the returned error is nil, the patch is valid. The current state that we save is just the
- // PodTemplateSpec. We can modify this later to encompass more state (or less) and remain compatible with previously
- // recorded patches.
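- // For example, the generated patch has the following shape (template contents
- // elided):
- //   {"spec":{"template":{"$patch":"replace", ...}}}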
- func getPatch(ds *apps.DaemonSet) ([]byte, error) {
- dsBytes, err := json.Marshal(ds)
- if err != nil {
- return nil, err
- }
- var raw map[string]interface{}
- err = json.Unmarshal(dsBytes, &raw)
- if err != nil {
- return nil, err
- }
- objCopy := make(map[string]interface{})
- specCopy := make(map[string]interface{})
- // Create a patch of the DaemonSet that replaces spec.template
- spec := raw["spec"].(map[string]interface{})
- template := spec["template"].(map[string]interface{})
- specCopy["template"] = template
- template["$patch"] = "replace"
- objCopy["spec"] = specCopy
- patch, err := json.Marshal(objCopy)
- return patch, err
- }
- func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*apps.ControllerRevision, error) {
- patch, err := getPatch(ds)
- if err != nil {
- return nil, err
- }
- hash := controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)
- name := ds.Name + "-" + hash
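- // The hash incorporates the collision count, so bumping the count on a name
- // collision (below) yields a different name on the next attempt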
- history := &apps.ControllerRevision{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: ds.Namespace,
- Labels: labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, hash),
- Annotations: ds.Annotations,
- OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, controllerKind)},
- },
- Data: runtime.RawExtension{Raw: patch},
- Revision: revision,
- }
- history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(history)
- if outerErr := err; errors.IsAlreadyExists(outerErr) {
- // TODO: Is it okay to get from historyLister?
- existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
- if getErr != nil {
- return nil, getErr
- }
- // Check if we already created it
- done, matchErr := Match(ds, existedHistory)
- if matchErr != nil {
- return nil, matchErr
- }
- if done {
- return existedHistory, nil
- }
- // Handle name collisions between different histories
- // Get the latest DaemonSet from the API server to make sure collision count is only increased when necessary
- currDS, getErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
- if getErr != nil {
- return nil, getErr
- }
- // If the collision count used to compute the hash was in fact stale, there's no need to bump it; just retry
- if !reflect.DeepEqual(currDS.Status.CollisionCount, ds.Status.CollisionCount) {
- return nil, fmt.Errorf("found a stale collision count (%d, expected %d) of DaemonSet %q while processing; will retry until it is updated", ds.Status.CollisionCount, currDS.Status.CollisionCount, ds.Name)
- }
- if currDS.Status.CollisionCount == nil {
- currDS.Status.CollisionCount = new(int32)
- }
- *currDS.Status.CollisionCount++
- _, updateErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).UpdateStatus(currDS)
- if updateErr != nil {
- return nil, updateErr
- }
- klog.V(2).Infof("Found a hash collision for DaemonSet %q - bumping collisionCount to %d to resolve it", ds.Name, *currDS.Status.CollisionCount)
- return nil, outerErr
- }
- return history, err
- }
- func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *apps.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod, hash string) ([]*v1.Pod, []*v1.Pod) {
- var newPods []*v1.Pod
- var oldPods []*v1.Pod
- for _, pods := range nodeToDaemonPods {
- for _, pod := range pods {
- // If the returned error is not nil, we have a parse error.
- // The controller handles this via the hash.
- generation, err := util.GetTemplateGeneration(ds)
- if err != nil {
- generation = nil
- }
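- // Roughly, a pod counts as updated if its hash label matches the given hash,
- // or its pod-template-generation label matches the DaemonSet's template
- // generation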
- if util.IsPodUpdated(pod, hash, generation) {
- newPods = append(newPods, pod)
- } else {
- oldPods = append(oldPods, pod)
- }
- }
- }
- return newPods, oldPods
- }
- func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) {
- klog.V(4).Infof("Getting unavailable numbers")
- var numUnavailable, desiredNumberScheduled int
- for i := range nodeList {
- node := nodeList[i]
- wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
- if err != nil {
- return -1, -1, err
- }
- if !wantToRun {
- continue
- }
- desiredNumberScheduled++
- daemonPods, exists := nodeToDaemonPods[node.Name]
- if !exists {
- numUnavailable++
- continue
- }
- available := false
- for _, pod := range daemonPods {
- // For the purposes of the update, we ensure that the pod is both available and not terminating
- if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) && pod.DeletionTimestamp == nil {
- available = true
- break
- }
- }
- if !available {
- numUnavailable++
- }
- }
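- // Resolve MaxUnavailable against the number of nodes that should run a daemon
- // pod; the true argument requests rounding percentage values up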
- maxUnavailable, err := intstrutil.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, desiredNumberScheduled, true)
- if err != nil {
- return -1, -1, fmt.Errorf("Invalid value for MaxUnavailable: %v", err)
- }
- klog.V(4).Infof(" DaemonSet %s/%s, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxUnavailable, numUnavailable)
- return maxUnavailable, numUnavailable, nil
- }
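- // historiesByRevision sorts ControllerRevisions by ascending revision number;
- // cleanupHistory uses it to consider the oldest histories for deletion first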
- type historiesByRevision []*apps.ControllerRevision
- func (h historiesByRevision) Len() int { return len(h) }
- func (h historiesByRevision) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
- func (h historiesByRevision) Less(i, j int) bool {
- return h[i].Revision < h[j].Revision
- }
|