12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package controller
- import (
- "context"
- "encoding/binary"
- "encoding/json"
- "fmt"
- "hash/fnv"
- "sync"
- "sync/atomic"
- "time"
- apps "k8s.io/api/apps/v1"
- v1 "k8s.io/api/core/v1"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/api/meta"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/clock"
- "k8s.io/apimachinery/pkg/util/rand"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/strategicpatch"
- "k8s.io/apimachinery/pkg/util/wait"
- clientset "k8s.io/client-go/kubernetes"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- clientretry "k8s.io/client-go/util/retry"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- _ "k8s.io/kubernetes/pkg/apis/core/install"
- "k8s.io/kubernetes/pkg/apis/core/validation"
- hashutil "k8s.io/kubernetes/pkg/util/hash"
- taintutils "k8s.io/kubernetes/pkg/util/taints"
- "k8s.io/utils/integer"
- "k8s.io/klog"
- )
const (
	// ExpectationsTimeout bounds how long a controller may stay asleep waiting on
	// expectations that never get fulfilled. If a watch drops a delete event for a
	// pod, or some other problem prevents the expectations from being updated, the
	// dormant controller is still woken up once this much time has passed, so it
	// can never sleep forever. Choose it based on the expected watch-event latency.
	//
	// For scale: a controller currently services (creates *and* observes the watch
	// events for) about 10 pods a second, i.e. roughly a minute for 500 pods.
	// Creation alone is limited to 20qps, and with 3000 pods over 100 nodes watch
	// events arrive with ~10-30s latency per pod.
	ExpectationsTimeout = 5 * time.Minute

	// SlowStartInitialBatchSize is the size of the first batch when pod creates
	// are batched; each following batch is twice the size of the previous one.
	// A value of 1 gives batches of 1, 2, 4, 8, ...; a value of 10 gives
	// 10, 20, 40, 80, ... Larger values cause more doomed API calls (and event
	// spam) once quota is denied. Smaller values cost more API round trips for
	// large batches.
	//
	// For N pods to start:
	//   doomed calls per sync once quota is exceeded = min(N, SlowStartInitialBatchSize)
	//   number of batches = 1 + floor(log_2(ceil(N / SlowStartInitialBatchSize)))
	SlowStartInitialBatchSize = 1
)
- var UpdateTaintBackoff = wait.Backoff{
- Steps: 5,
- Duration: 100 * time.Millisecond,
- Jitter: 1.0,
- }
- var UpdateLabelBackoff = wait.Backoff{
- Steps: 5,
- Duration: 100 * time.Millisecond,
- Jitter: 1.0,
- }
- var (
- KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
- podPhaseToOrdinal = map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
- )
// ResyncPeriodFunc reports a resync period each time it is called.
type ResyncPeriodFunc func() time.Duration

// NoResyncPeriodFunc always reports a zero period. Use it when resyncing is not needed.
func NoResyncPeriodFunc() time.Duration {
	var none time.Duration
	return none
}

// StaticResyncPeriodFunc returns a ResyncPeriodFunc that always reports
// resyncPeriod.
func StaticResyncPeriodFunc(resyncPeriod time.Duration) ResyncPeriodFunc {
	fixed := func() time.Duration { return resyncPeriod }
	return fixed
}
- // Expectations are a way for controllers to tell the controller manager what they expect. eg:
- // ControllerExpectations: {
- // controller1: expects 2 adds in 2 minutes
- // controller2: expects 2 dels in 2 minutes
- // controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met
- // }
- //
- // Implementation:
- // ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion
- // ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller
- //
- // * Once set expectations can only be lowered
- // * A controller isn't synced till its expectations are either fulfilled, or expire
- // * Controllers that don't set expectations will get woken up for every matching controllee
- // ExpKeyFunc to parse out the key from a ControlleeExpectation
- var ExpKeyFunc = func(obj interface{}) (string, error) {
- if e, ok := obj.(*ControlleeExpectations); ok {
- return e.key, nil
- }
- return "", fmt.Errorf("could not find key for obj %#v", obj)
- }
- // ControllerExpectationsInterface is an interface that allows users to set and wait on expectations.
- // Only abstracted out for testing.
- // Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
- // types of controllers, because the keys might conflict across types.
- type ControllerExpectationsInterface interface {
- GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
- SatisfiedExpectations(controllerKey string) bool
- DeleteExpectations(controllerKey string)
- SetExpectations(controllerKey string, add, del int) error
- ExpectCreations(controllerKey string, adds int) error
- ExpectDeletions(controllerKey string, dels int) error
- CreationObserved(controllerKey string)
- DeletionObserved(controllerKey string)
- RaiseExpectations(controllerKey string, add, del int)
- LowerExpectations(controllerKey string, add, del int)
- }
- // ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync.
- type ControllerExpectations struct {
- cache.Store
- }
- // GetExpectations returns the ControlleeExpectations of the given controller.
- func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
- exp, exists, err := r.GetByKey(controllerKey)
- if err == nil && exists {
- return exp.(*ControlleeExpectations), true, nil
- }
- return nil, false, err
- }
- // DeleteExpectations deletes the expectations of the given controller from the TTLStore.
- func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
- if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
- if err := r.Delete(exp); err != nil {
- klog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
- }
- }
- }
- // SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
- // Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
- // manager.
- func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
- if exp, exists, err := r.GetExpectations(controllerKey); exists {
- if exp.Fulfilled() {
- klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
- return true
- } else if exp.isExpired() {
- klog.V(4).Infof("Controller expectations expired %#v", exp)
- return true
- } else {
- klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
- return false
- }
- } else if err != nil {
- klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
- } else {
- // When a new controller is created, it doesn't have expectations.
- // When it doesn't see expected watch events for > TTL, the expectations expire.
- // - In this case it wakes up, creates/deletes controllees, and sets expectations again.
- // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
- // - In this case it continues without setting expectations till it needs to create/delete controllees.
- klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
- }
- // Trigger a sync if we either encountered and error (which shouldn't happen since we're
- // getting from local store) or this controller hasn't established expectations.
- return true
- }
- // TODO: Extend ExpirationCache to support explicit expiration.
- // TODO: Make this possible to disable in tests.
- // TODO: Support injection of clock.
- func (exp *ControlleeExpectations) isExpired() bool {
- return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
- }
- // SetExpectations registers new expectations for the given controller. Forgets existing expectations.
- func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
- exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
- klog.V(4).Infof("Setting expectations %#v", exp)
- return r.Add(exp)
- }
- func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
- return r.SetExpectations(controllerKey, adds, 0)
- }
- func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error {
- return r.SetExpectations(controllerKey, 0, dels)
- }
- // Decrements the expectation counts of the given controller.
- func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
- if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
- exp.Add(int64(-add), int64(-del))
- // The expectations might've been modified since the update on the previous line.
- klog.V(4).Infof("Lowered expectations %#v", exp)
- }
- }
- // Increments the expectation counts of the given controller.
- func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) {
- if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
- exp.Add(int64(add), int64(del))
- // The expectations might've been modified since the update on the previous line.
- klog.V(4).Infof("Raised expectations %#v", exp)
- }
- }
- // CreationObserved atomically decrements the `add` expectation count of the given controller.
- func (r *ControllerExpectations) CreationObserved(controllerKey string) {
- r.LowerExpectations(controllerKey, 1, 0)
- }
- // DeletionObserved atomically decrements the `del` expectation count of the given controller.
- func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
- r.LowerExpectations(controllerKey, 0, 1)
- }
// Expectations is anything that can report whether it has been fulfilled.
// Expectations that are never fulfilled expire instead.
type Expectations interface {
	Fulfilled() bool
}

// ControlleeExpectations tracks the creations and deletions a controller is
// still waiting to observe among its controllees.
type ControlleeExpectations struct {
	// add and del are accessed through sync/atomic. Because of a bug on 32-bit
	// platforms they must stay at the top of the struct, which keeps them
	// 64-bit aligned. See https://golang.org/pkg/sync/atomic/ for more information.
	add       int64
	del       int64
	key       string
	timestamp time.Time
}

// Add atomically adjusts the pending creation count by add and the pending
// deletion count by del. Negative values lower the counts.
func (e *ControlleeExpectations) Add(add, del int64) {
	atomic.AddInt64(&e.add, add)
	atomic.AddInt64(&e.del, del)
}

// Fulfilled reports whether no creations and no deletions remain outstanding.
func (e *ControlleeExpectations) Fulfilled() bool {
	// TODO: think about why this line being atomic doesn't matter
	if atomic.LoadInt64(&e.add) > 0 {
		return false
	}
	return atomic.LoadInt64(&e.del) <= 0
}

// GetExpectations returns the outstanding creation and deletion counts, in
// that order.
func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
	pendingAdds := atomic.LoadInt64(&e.add)
	pendingDels := atomic.LoadInt64(&e.del)
	return pendingAdds, pendingDels
}
- // NewControllerExpectations returns a store for ControllerExpectations.
- func NewControllerExpectations() *ControllerExpectations {
- return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
- }
- // UIDSetKeyFunc to parse out the key from a UIDSet.
- var UIDSetKeyFunc = func(obj interface{}) (string, error) {
- if u, ok := obj.(*UIDSet); ok {
- return u.key, nil
- }
- return "", fmt.Errorf("could not find key for obj %#v", obj)
- }
- // UIDSet holds a key and a set of UIDs. Used by the
- // UIDTrackingControllerExpectations to remember which UID it has seen/still
- // waiting for.
- type UIDSet struct {
- sets.String
- key string
- }
- // UIDTrackingControllerExpectations tracks the UID of the pods it deletes.
- // This cache is needed over plain old expectations to safely handle graceful
- // deletion. The desired behavior is to treat an update that sets the
- // DeletionTimestamp on an object as a delete. To do so consistently, one needs
- // to remember the expected deletes so they aren't double counted.
- // TODO: Track creates as well (#22599)
- type UIDTrackingControllerExpectations struct {
- ControllerExpectationsInterface
- // TODO: There is a much nicer way to do this that involves a single store,
- // a lock per entry, and a ControlleeExpectationsInterface type.
- uidStoreLock sync.Mutex
- // Store used for the UIDs associated with any expectation tracked via the
- // ControllerExpectationsInterface.
- uidStore cache.Store
- }
- // GetUIDs is a convenience method to avoid exposing the set of expected uids.
- // The returned set is not thread safe, all modifications must be made holding
- // the uidStoreLock.
- func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String {
- if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists {
- return uid.(*UIDSet).String
- }
- return nil
- }
- // ExpectDeletions records expectations for the given deleteKeys, against the given controller.
- func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error {
- expectedUIDs := sets.NewString()
- for _, k := range deletedKeys {
- expectedUIDs.Insert(k)
- }
- klog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys)
- u.uidStoreLock.Lock()
- defer u.uidStoreLock.Unlock()
- if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
- klog.Errorf("Clobbering existing delete keys: %+v", existing)
- }
- if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
- return err
- }
- return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
- }
- // DeletionObserved records the given deleteKey as a deletion, for the given rc.
- func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
- u.uidStoreLock.Lock()
- defer u.uidStoreLock.Unlock()
- uids := u.GetUIDs(rcKey)
- if uids != nil && uids.Has(deleteKey) {
- klog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey)
- u.ControllerExpectationsInterface.DeletionObserved(rcKey)
- uids.Delete(deleteKey)
- }
- }
- // DeleteExpectations deletes the UID set and invokes DeleteExpectations on the
- // underlying ControllerExpectationsInterface.
- func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) {
- u.uidStoreLock.Lock()
- defer u.uidStoreLock.Unlock()
- u.ControllerExpectationsInterface.DeleteExpectations(rcKey)
- if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
- if err := u.uidStore.Delete(uidExp); err != nil {
- klog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err)
- }
- }
- }
- // NewUIDTrackingControllerExpectations returns a wrapper around
- // ControllerExpectations that is aware of deleteKeys.
- func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
- return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
- }
// Reasons attached to pod events.
const (
	// FailedCreatePodReason is used in an event and in a replica set condition
	// when creating a pod for a replica set fails.
	FailedCreatePodReason = "FailedCreate"
	// SuccessfulCreatePodReason is used in an event when a pod for a replica
	// set has been created.
	SuccessfulCreatePodReason = "SuccessfulCreate"

	// FailedDeletePodReason is used in an event and in a replica set condition
	// when deleting a pod for a replica set fails.
	FailedDeletePodReason = "FailedDelete"
	// SuccessfulDeletePodReason is used in an event when a pod for a replica
	// set has been deleted.
	SuccessfulDeletePodReason = "SuccessfulDelete"
)
- // RSControlInterface is an interface that knows how to add or delete
- // ReplicaSets, as well as increment or decrement them. It is used
- // by the deployment controller to ease testing of actions that it takes.
- type RSControlInterface interface {
- PatchReplicaSet(namespace, name string, data []byte) error
- }
- // RealRSControl is the default implementation of RSControllerInterface.
- type RealRSControl struct {
- KubeClient clientset.Interface
- Recorder record.EventRecorder
- }
- var _ RSControlInterface = &RealRSControl{}
- func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) error {
- _, err := r.KubeClient.AppsV1().ReplicaSets(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
- return err
- }
- // TODO: merge the controller revision interface in controller_history.go with this one
- // ControllerRevisionControlInterface is an interface that knows how to patch
- // ControllerRevisions, as well as increment or decrement them. It is used
- // by the daemonset controller to ease testing of actions that it takes.
- type ControllerRevisionControlInterface interface {
- PatchControllerRevision(namespace, name string, data []byte) error
- }
- // RealControllerRevisionControl is the default implementation of ControllerRevisionControlInterface.
- type RealControllerRevisionControl struct {
- KubeClient clientset.Interface
- }
- var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{}
- func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name string, data []byte) error {
- _, err := r.KubeClient.AppsV1().ControllerRevisions(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
- return err
- }
- // PodControlInterface is an interface that knows how to add or delete pods
- // created as an interface to allow testing.
- type PodControlInterface interface {
- // CreatePods creates new pods according to the spec.
- CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
- // CreatePodsOnNode creates a new pod according to the spec on the specified node,
- // and sets the ControllerRef.
- CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
- // CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
- CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
- // DeletePod deletes the pod identified by podID.
- DeletePod(namespace string, podID string, object runtime.Object) error
- // PatchPod patches the pod.
- PatchPod(namespace, name string, data []byte) error
- }
- // RealPodControl is the default implementation of PodControlInterface.
- type RealPodControl struct {
- KubeClient clientset.Interface
- Recorder record.EventRecorder
- }
- var _ PodControlInterface = &RealPodControl{}
- func getPodsLabelSet(template *v1.PodTemplateSpec) labels.Set {
- desiredLabels := make(labels.Set)
- for k, v := range template.Labels {
- desiredLabels[k] = v
- }
- return desiredLabels
- }
- func getPodsFinalizers(template *v1.PodTemplateSpec) []string {
- desiredFinalizers := make([]string, len(template.Finalizers))
- copy(desiredFinalizers, template.Finalizers)
- return desiredFinalizers
- }
- func getPodsAnnotationSet(template *v1.PodTemplateSpec) labels.Set {
- desiredAnnotations := make(labels.Set)
- for k, v := range template.Annotations {
- desiredAnnotations[k] = v
- }
- return desiredAnnotations
- }
- func getPodsPrefix(controllerName string) string {
- // use the dash (if the name isn't too long) to make the pod name a bit prettier
- prefix := fmt.Sprintf("%s-", controllerName)
- if len(validation.ValidatePodName(prefix, true)) != 0 {
- prefix = controllerName
- }
- return prefix
- }
- func validateControllerRef(controllerRef *metav1.OwnerReference) error {
- if controllerRef == nil {
- return fmt.Errorf("controllerRef is nil")
- }
- if len(controllerRef.APIVersion) == 0 {
- return fmt.Errorf("controllerRef has empty APIVersion")
- }
- if len(controllerRef.Kind) == 0 {
- return fmt.Errorf("controllerRef has empty Kind")
- }
- if controllerRef.Controller == nil || *controllerRef.Controller != true {
- return fmt.Errorf("controllerRef.Controller is not set to true")
- }
- if controllerRef.BlockOwnerDeletion == nil || *controllerRef.BlockOwnerDeletion != true {
- return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set")
- }
- return nil
- }
- func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
- return r.createPods("", namespace, template, object, nil)
- }
- func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
- if err := validateControllerRef(controllerRef); err != nil {
- return err
- }
- return r.createPods("", namespace, template, controllerObject, controllerRef)
- }
- func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
- if err := validateControllerRef(controllerRef); err != nil {
- return err
- }
- return r.createPods(nodeName, namespace, template, object, controllerRef)
- }
- func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
- _, err := r.KubeClient.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
- return err
- }
- func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Object, controllerRef *metav1.OwnerReference) (*v1.Pod, error) {
- desiredLabels := getPodsLabelSet(template)
- desiredFinalizers := getPodsFinalizers(template)
- desiredAnnotations := getPodsAnnotationSet(template)
- accessor, err := meta.Accessor(parentObject)
- if err != nil {
- return nil, fmt.Errorf("parentObject does not have ObjectMeta, %v", err)
- }
- prefix := getPodsPrefix(accessor.GetName())
- pod := &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Labels: desiredLabels,
- Annotations: desiredAnnotations,
- GenerateName: prefix,
- Finalizers: desiredFinalizers,
- },
- }
- if controllerRef != nil {
- pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef)
- }
- pod.Spec = *template.Spec.DeepCopy()
- return pod, nil
- }
- func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
- pod, err := GetPodFromTemplate(template, object, controllerRef)
- if err != nil {
- return err
- }
- if len(nodeName) != 0 {
- pod.Spec.NodeName = nodeName
- }
- if len(labels.Set(pod.Labels)) == 0 {
- return fmt.Errorf("unable to create pods, no labels")
- }
- newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
- if err != nil {
- // only send an event if the namespace isn't terminating
- if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
- r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
- }
- return err
- }
- accessor, err := meta.Accessor(object)
- if err != nil {
- klog.Errorf("parentObject does not have ObjectMeta, %v", err)
- return nil
- }
- klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
- r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)
- return nil
- }
- func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
- accessor, err := meta.Accessor(object)
- if err != nil {
- return fmt.Errorf("object does not have ObjectMeta, %v", err)
- }
- klog.V(2).Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID)
- if err := r.KubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), podID, nil); err != nil && !apierrors.IsNotFound(err) {
- r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
- return fmt.Errorf("unable to delete pods: %v", err)
- }
- r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)
- return nil
- }
- type FakePodControl struct {
- sync.Mutex
- Templates []v1.PodTemplateSpec
- ControllerRefs []metav1.OwnerReference
- DeletePodName []string
- Patches [][]byte
- Err error
- CreateLimit int
- CreateCallCount int
- }
- var _ PodControlInterface = &FakePodControl{}
- func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error {
- f.Lock()
- defer f.Unlock()
- f.Patches = append(f.Patches, data)
- if f.Err != nil {
- return f.Err
- }
- return nil
- }
- func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error {
- f.Lock()
- defer f.Unlock()
- f.CreateCallCount++
- if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
- return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
- }
- f.Templates = append(f.Templates, *spec)
- if f.Err != nil {
- return f.Err
- }
- return nil
- }
- func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
- f.Lock()
- defer f.Unlock()
- f.CreateCallCount++
- if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
- return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
- }
- f.Templates = append(f.Templates, *spec)
- f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
- if f.Err != nil {
- return f.Err
- }
- return nil
- }
- func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
- f.Lock()
- defer f.Unlock()
- f.CreateCallCount++
- if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
- return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
- }
- f.Templates = append(f.Templates, *template)
- f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
- if f.Err != nil {
- return f.Err
- }
- return nil
- }
- func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
- f.Lock()
- defer f.Unlock()
- f.DeletePodName = append(f.DeletePodName, podID)
- if f.Err != nil {
- return f.Err
- }
- return nil
- }
- func (f *FakePodControl) Clear() {
- f.Lock()
- defer f.Unlock()
- f.DeletePodName = []string{}
- f.Templates = []v1.PodTemplateSpec{}
- f.ControllerRefs = []metav1.OwnerReference{}
- f.Patches = [][]byte{}
- f.CreateLimit = 0
- f.CreateCallCount = 0
- }
- // ByLogging allows custom sorting of pods so the best one can be picked for getting its logs.
- type ByLogging []*v1.Pod
- func (s ByLogging) Len() int { return len(s) }
- func (s ByLogging) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
- func (s ByLogging) Less(i, j int) bool {
- // 1. assigned < unassigned
- if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
- return len(s[i].Spec.NodeName) > 0
- }
- // 2. PodRunning < PodUnknown < PodPending
- if s[i].Status.Phase != s[j].Status.Phase {
- return podPhaseToOrdinal[s[i].Status.Phase] > podPhaseToOrdinal[s[j].Status.Phase]
- }
- // 3. ready < not ready
- if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
- return podutil.IsPodReady(s[i])
- }
- // TODO: take availability into account when we push minReadySeconds information from deployment into pods,
- // see https://github.com/kubernetes/kubernetes/issues/22065
- // 4. Been ready for more time < less time < empty time
- if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) {
- readyTime1 := podReadyTime(s[i])
- readyTime2 := podReadyTime(s[j])
- if !readyTime1.Equal(readyTime2) {
- return afterOrZero(readyTime2, readyTime1)
- }
- }
- // 5. Pods with containers with higher restart counts < lower restart counts
- if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
- return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
- }
- // 6. older pods < newer pods < empty timestamp pods
- if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
- return afterOrZero(&s[j].CreationTimestamp, &s[i].CreationTimestamp)
- }
- return false
- }
- // ActivePods type allows custom sorting of pods so a controller can pick the best ones to delete.
- type ActivePods []*v1.Pod
- func (s ActivePods) Len() int { return len(s) }
- func (s ActivePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
- func (s ActivePods) Less(i, j int) bool {
- // 1. Unassigned < assigned
- // If only one of the pods is unassigned, the unassigned one is smaller
- if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
- return len(s[i].Spec.NodeName) == 0
- }
- // 2. PodPending < PodUnknown < PodRunning
- if podPhaseToOrdinal[s[i].Status.Phase] != podPhaseToOrdinal[s[j].Status.Phase] {
- return podPhaseToOrdinal[s[i].Status.Phase] < podPhaseToOrdinal[s[j].Status.Phase]
- }
- // 3. Not ready < ready
- // If only one of the pods is not ready, the not ready one is smaller
- if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
- return !podutil.IsPodReady(s[i])
- }
- // TODO: take availability into account when we push minReadySeconds information from deployment into pods,
- // see https://github.com/kubernetes/kubernetes/issues/22065
- // 4. Been ready for empty time < less time < more time
- // If both pods are ready, the latest ready one is smaller
- if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) {
- readyTime1 := podReadyTime(s[i])
- readyTime2 := podReadyTime(s[j])
- if !readyTime1.Equal(readyTime2) {
- return afterOrZero(readyTime1, readyTime2)
- }
- }
- // 5. Pods with containers with higher restart counts < lower restart counts
- if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
- return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
- }
- // 6. Empty creation time pods < newer pods < older pods
- if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
- return afterOrZero(&s[i].CreationTimestamp, &s[j].CreationTimestamp)
- }
- return false
- }
- // ActivePodsWithRanks is a sortable list of pods and a list of corresponding
- // ranks which will be considered during sorting. The two lists must have equal
- // length. After sorting, the pods will be ordered as follows, applying each
- // rule in turn until one matches:
- //
- // 1. If only one of the pods is assigned to a node, the pod that is not
- // assigned comes before the pod that is.
- // 2. If the pods' phases differ, a pending pod comes before a pod whose phase
- // is unknown, and a pod whose phase is unknown comes before a running pod.
- // 3. If exactly one of the pods is ready, the pod that is not ready comes
- // before the ready pod.
- // 4. If the pods' ranks differ, the pod with greater rank comes before the pod
- // with lower rank.
- // 5. If both pods are ready but have not been ready for the same amount of
- // time, the pod that has been ready for a shorter amount of time comes
- // before the pod that has been ready for longer.
- // 6. If one pod has a container that has restarted more than any container in
- // the other pod, the pod with the container with more restarts comes
- // before the other pod.
- // 7. If the pods' creation times differ, the pod that was created more recently
- // comes before the older pod.
- //
- // If none of these rules matches, the second pod comes before the first pod.
- //
- // The intention of this ordering is to put pods that should be preferred for
- // deletion first in the list.
- type ActivePodsWithRanks struct {
- // Pods is a list of pods.
- Pods []*v1.Pod
- // Rank is a ranking of pods. This ranking is used during sorting when
- // comparing two pods that are both scheduled, in the same phase, and
- // having the same ready status.
- Rank []int
- }
- func (s ActivePodsWithRanks) Len() int {
- return len(s.Pods)
- }
- func (s ActivePodsWithRanks) Swap(i, j int) {
- s.Pods[i], s.Pods[j] = s.Pods[j], s.Pods[i]
- s.Rank[i], s.Rank[j] = s.Rank[j], s.Rank[i]
- }
- // Less compares two pods with corresponding ranks and returns true if the first
- // one should be preferred for deletion.
- func (s ActivePodsWithRanks) Less(i, j int) bool {
- // 1. Unassigned < assigned
- // If only one of the pods is unassigned, the unassigned one is smaller
- if s.Pods[i].Spec.NodeName != s.Pods[j].Spec.NodeName && (len(s.Pods[i].Spec.NodeName) == 0 || len(s.Pods[j].Spec.NodeName) == 0) {
- return len(s.Pods[i].Spec.NodeName) == 0
- }
- // 2. PodPending < PodUnknown < PodRunning
- if podPhaseToOrdinal[s.Pods[i].Status.Phase] != podPhaseToOrdinal[s.Pods[j].Status.Phase] {
- return podPhaseToOrdinal[s.Pods[i].Status.Phase] < podPhaseToOrdinal[s.Pods[j].Status.Phase]
- }
- // 3. Not ready < ready
- // If only one of the pods is not ready, the not ready one is smaller
- if podutil.IsPodReady(s.Pods[i]) != podutil.IsPodReady(s.Pods[j]) {
- return !podutil.IsPodReady(s.Pods[i])
- }
- // 4. Doubled up < not doubled up
- // If one of the two pods is on the same node as one or more additional
- // ready pods that belong to the same replicaset, whichever pod has more
- // colocated ready pods is less
- if s.Rank[i] != s.Rank[j] {
- return s.Rank[i] > s.Rank[j]
- }
- // TODO: take availability into account when we push minReadySeconds information from deployment into pods,
- // see https://github.com/kubernetes/kubernetes/issues/22065
- // 5. Been ready for empty time < less time < more time
- // If both pods are ready, the latest ready one is smaller
- if podutil.IsPodReady(s.Pods[i]) && podutil.IsPodReady(s.Pods[j]) {
- readyTime1 := podReadyTime(s.Pods[i])
- readyTime2 := podReadyTime(s.Pods[j])
- if !readyTime1.Equal(readyTime2) {
- return afterOrZero(readyTime1, readyTime2)
- }
- }
- // 6. Pods with containers with higher restart counts < lower restart counts
- if maxContainerRestarts(s.Pods[i]) != maxContainerRestarts(s.Pods[j]) {
- return maxContainerRestarts(s.Pods[i]) > maxContainerRestarts(s.Pods[j])
- }
- // 7. Empty creation time pods < newer pods < older pods
- if !s.Pods[i].CreationTimestamp.Equal(&s.Pods[j].CreationTimestamp) {
- return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)
- }
- return false
- }
- // afterOrZero checks if time t1 is after time t2; if one of them
- // is zero, the zero time is seen as after non-zero time.
- func afterOrZero(t1, t2 *metav1.Time) bool {
- if t1.Time.IsZero() || t2.Time.IsZero() {
- return t1.Time.IsZero()
- }
- return t1.After(t2.Time)
- }
- func podReadyTime(pod *v1.Pod) *metav1.Time {
- if podutil.IsPodReady(pod) {
- for _, c := range pod.Status.Conditions {
- // we only care about pod ready conditions
- if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
- return &c.LastTransitionTime
- }
- }
- }
- return &metav1.Time{}
- }
- func maxContainerRestarts(pod *v1.Pod) int {
- maxRestarts := 0
- for _, c := range pod.Status.ContainerStatuses {
- maxRestarts = integer.IntMax(maxRestarts, int(c.RestartCount))
- }
- return maxRestarts
- }
- // FilterActivePods returns pods that have not terminated.
- func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
- var result []*v1.Pod
- for _, p := range pods {
- if IsPodActive(p) {
- result = append(result, p)
- } else {
- klog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
- p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
- }
- }
- return result
- }
- func IsPodActive(p *v1.Pod) bool {
- return v1.PodSucceeded != p.Status.Phase &&
- v1.PodFailed != p.Status.Phase &&
- p.DeletionTimestamp == nil
- }
- // FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods.
- func FilterActiveReplicaSets(replicaSets []*apps.ReplicaSet) []*apps.ReplicaSet {
- activeFilter := func(rs *apps.ReplicaSet) bool {
- return rs != nil && *(rs.Spec.Replicas) > 0
- }
- return FilterReplicaSets(replicaSets, activeFilter)
- }
- type filterRS func(rs *apps.ReplicaSet) bool
- // FilterReplicaSets returns replica sets that are filtered by filterFn (all returned ones should match filterFn).
- func FilterReplicaSets(RSes []*apps.ReplicaSet, filterFn filterRS) []*apps.ReplicaSet {
- var filtered []*apps.ReplicaSet
- for i := range RSes {
- if filterFn(RSes[i]) {
- filtered = append(filtered, RSes[i])
- }
- }
- return filtered
- }
- // PodKey returns a key unique to the given pod within a cluster.
- // It's used so we consistently use the same key scheme in this module.
- // It does exactly what cache.MetaNamespaceKeyFunc would have done
- // except there's not possibility for error since we know the exact type.
- func PodKey(pod *v1.Pod) string {
- return fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
- }
- // ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker.
- type ControllersByCreationTimestamp []*v1.ReplicationController
- func (o ControllersByCreationTimestamp) Len() int { return len(o) }
- func (o ControllersByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
- func (o ControllersByCreationTimestamp) Less(i, j int) bool {
- if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
- return o[i].Name < o[j].Name
- }
- return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
- }
- // ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker.
- type ReplicaSetsByCreationTimestamp []*apps.ReplicaSet
- func (o ReplicaSetsByCreationTimestamp) Len() int { return len(o) }
- func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
- func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
- if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
- return o[i].Name < o[j].Name
- }
- return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
- }
- // ReplicaSetsBySizeOlder sorts a list of ReplicaSet by size in descending order, using their creation timestamp or name as a tie breaker.
- // By using the creation timestamp, this sorts from old to new replica sets.
- type ReplicaSetsBySizeOlder []*apps.ReplicaSet
- func (o ReplicaSetsBySizeOlder) Len() int { return len(o) }
- func (o ReplicaSetsBySizeOlder) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
- func (o ReplicaSetsBySizeOlder) Less(i, j int) bool {
- if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
- return ReplicaSetsByCreationTimestamp(o).Less(i, j)
- }
- return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
- }
- // ReplicaSetsBySizeNewer sorts a list of ReplicaSet by size in descending order, using their creation timestamp or name as a tie breaker.
- // By using the creation timestamp, this sorts from new to old replica sets.
- type ReplicaSetsBySizeNewer []*apps.ReplicaSet
- func (o ReplicaSetsBySizeNewer) Len() int { return len(o) }
- func (o ReplicaSetsBySizeNewer) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
- func (o ReplicaSetsBySizeNewer) Less(i, j int) bool {
- if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
- return ReplicaSetsByCreationTimestamp(o).Less(j, i)
- }
- return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
- }
- // AddOrUpdateTaintOnNode add taints to the node. If taint was added into node, it'll issue API calls
- // to update nodes; otherwise, no API calls. Return error if any.
- func AddOrUpdateTaintOnNode(c clientset.Interface, nodeName string, taints ...*v1.Taint) error {
- if len(taints) == 0 {
- return nil
- }
- firstTry := true
- return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
- var err error
- var oldNode *v1.Node
- // First we try getting node from the API server cache, as it's cheaper. If it fails
- // we get it from etcd to be sure to have fresh data.
- if firstTry {
- oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
- firstTry = false
- } else {
- oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
- }
- if err != nil {
- return err
- }
- var newNode *v1.Node
- oldNodeCopy := oldNode
- updated := false
- for _, taint := range taints {
- curNewNode, ok, err := taintutils.AddOrUpdateTaint(oldNodeCopy, taint)
- if err != nil {
- return fmt.Errorf("failed to update taint of node")
- }
- updated = updated || ok
- newNode = curNewNode
- oldNodeCopy = curNewNode
- }
- if !updated {
- return nil
- }
- return PatchNodeTaints(c, nodeName, oldNode, newNode)
- })
- }
- // RemoveTaintOffNode is for cleaning up taints temporarily added to node,
- // won't fail if target taint doesn't exist or has been removed.
- // If passed a node it'll check if there's anything to be done, if taint is not present it won't issue
- // any API calls.
- func RemoveTaintOffNode(c clientset.Interface, nodeName string, node *v1.Node, taints ...*v1.Taint) error {
- if len(taints) == 0 {
- return nil
- }
- // Short circuit for limiting amount of API calls.
- if node != nil {
- match := false
- for _, taint := range taints {
- if taintutils.TaintExists(node.Spec.Taints, taint) {
- match = true
- break
- }
- }
- if !match {
- return nil
- }
- }
- firstTry := true
- return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
- var err error
- var oldNode *v1.Node
- // First we try getting node from the API server cache, as it's cheaper. If it fails
- // we get it from etcd to be sure to have fresh data.
- if firstTry {
- oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
- firstTry = false
- } else {
- oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
- }
- if err != nil {
- return err
- }
- var newNode *v1.Node
- oldNodeCopy := oldNode
- updated := false
- for _, taint := range taints {
- curNewNode, ok, err := taintutils.RemoveTaint(oldNodeCopy, taint)
- if err != nil {
- return fmt.Errorf("failed to remove taint of node")
- }
- updated = updated || ok
- newNode = curNewNode
- oldNodeCopy = curNewNode
- }
- if !updated {
- return nil
- }
- return PatchNodeTaints(c, nodeName, oldNode, newNode)
- })
- }
- // PatchNodeTaints patches node's taints.
- func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
- oldData, err := json.Marshal(oldNode)
- if err != nil {
- return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
- }
- newTaints := newNode.Spec.Taints
- newNodeClone := oldNode.DeepCopy()
- newNodeClone.Spec.Taints = newTaints
- newData, err := json.Marshal(newNodeClone)
- if err != nil {
- return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
- }
- patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
- if err != nil {
- return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
- }
- _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
- return err
- }
// ComputeHash returns a hash value calculated from pod template and
// a collisionCount to avoid hash collision. The hash will be safe encoded to
// avoid bad words.
//
// The value is the 32-bit FNV-1a hash of *template (fed through
// hashutil.DeepHashObject). When collisionCount is non-nil, its encoded bytes
// are hashed afterwards. The final sum is formatted in decimal and passed
// through rand.SafeEncodeString.
//
// NOTE(review): callers presumably persist this value (e.g. as a label), so any
// change to the bytes fed to the hasher changes every computed hash. Treat the
// exact byte sequence below as a compatibility contract.
func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int32) string {
	podTemplateSpecHasher := fnv.New32a()
	hashutil.DeepHashObject(podTemplateSpecHasher, *template)

	// Add collisionCount in the hash if it exists.
	if collisionCount != nil {
		// PutUint32 fills only the first 4 bytes; the trailing 4 stay zero and
		// are hashed too. Do not shrink the buffer: doing so would change the
		// hash of every object that has a collisionCount.
		collisionCountBytes := make([]byte, 8)
		binary.LittleEndian.PutUint32(collisionCountBytes, uint32(*collisionCount))
		// hash.Hash's Write never returns an error, so the result is ignored.
		podTemplateSpecHasher.Write(collisionCountBytes)
	}
	return rand.SafeEncodeString(fmt.Sprint(podTemplateSpecHasher.Sum32()))
}
- func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, labelsToUpdate map[string]string) error {
- firstTry := true
- return clientretry.RetryOnConflict(UpdateLabelBackoff, func() error {
- var err error
- var node *v1.Node
- // First we try getting node from the API server cache, as it's cheaper. If it fails
- // we get it from etcd to be sure to have fresh data.
- if firstTry {
- node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
- firstTry = false
- } else {
- node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
- }
- if err != nil {
- return err
- }
- // Make a copy of the node and update the labels.
- newNode := node.DeepCopy()
- if newNode.Labels == nil {
- newNode.Labels = make(map[string]string)
- }
- for key, value := range labelsToUpdate {
- newNode.Labels[key] = value
- }
- oldData, err := json.Marshal(node)
- if err != nil {
- return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
- }
- newData, err := json.Marshal(newNode)
- if err != nil {
- return fmt.Errorf("failed to marshal the new node %#v: %v", newNode, err)
- }
- patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
- if err != nil {
- return fmt.Errorf("failed to create a two-way merge patch: %v", err)
- }
- if _, err := kubeClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
- return fmt.Errorf("failed to patch the node: %v", err)
- }
- return nil
- })
- }
- func getOrCreateServiceAccount(coreClient v1core.CoreV1Interface, namespace, name string) (*v1.ServiceAccount, error) {
- sa, err := coreClient.ServiceAccounts(namespace).Get(context.TODO(), name, metav1.GetOptions{})
- if err == nil {
- return sa, nil
- }
- if !apierrors.IsNotFound(err) {
- return nil, err
- }
- // Create the namespace if we can't verify it exists.
- // Tolerate errors, since we don't know whether this component has namespace creation permissions.
- if _, err := coreClient.Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}); apierrors.IsNotFound(err) {
- if _, err = coreClient.Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
- klog.Warningf("create non-exist namespace %s failed:%v", namespace, err)
- }
- }
- // Create the service account
- sa, err = coreClient.ServiceAccounts(namespace).Create(context.TODO(), &v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}}, metav1.CreateOptions{})
- if apierrors.IsAlreadyExists(err) {
- // If we're racing to init and someone else already created it, re-fetch
- return coreClient.ServiceAccounts(namespace).Get(context.TODO(), name, metav1.GetOptions{})
- }
- return sa, err
- }
|