123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package reconcilers
- /*
- Original Source:
- https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler.go
- */
- import (
- "fmt"
- "net"
- "path"
- "sync"
- "time"
- "k8s.io/klog"
- corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- kruntime "k8s.io/apimachinery/pkg/runtime"
- apirequest "k8s.io/apiserver/pkg/endpoints/request"
- "k8s.io/apiserver/pkg/registry/rest"
- "k8s.io/apiserver/pkg/storage"
- corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
- endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
- )
- // Leases is an interface which assists in managing the set of active masters
- type Leases interface {
- // ListLeases retrieves a list of the current master IPs
- ListLeases() ([]string, error)
- // UpdateLease adds or refreshes a master's lease
- UpdateLease(ip string) error
- // RemoveLease removes a master's lease
- RemoveLease(ip string) error
- }
- type storageLeases struct {
- storage storage.Interface
- baseKey string
- leaseTime time.Duration
- }
- var _ Leases = &storageLeases{}
- // ListLeases retrieves a list of the current master IPs from storage
- func (s *storageLeases) ListLeases() ([]string, error) {
- ipInfoList := &corev1.EndpointsList{}
- if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil {
- return nil, err
- }
- ipList := make([]string, len(ipInfoList.Items))
- for i, ip := range ipInfoList.Items {
- ipList[i] = ip.Subsets[0].Addresses[0].IP
- }
- klog.V(6).Infof("Current master IPs listed in storage are %v", ipList)
- return ipList, nil
- }
- // UpdateLease resets the TTL on a master IP in storage
- func (s *storageLeases) UpdateLease(ip string) error {
- key := path.Join(s.baseKey, ip)
- return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
- // just make sure we've got the right IP set, and then refresh the TTL
- existing := input.(*corev1.Endpoints)
- existing.Subsets = []corev1.EndpointSubset{
- {
- Addresses: []corev1.EndpointAddress{{IP: ip}},
- },
- }
- // leaseTime needs to be in seconds
- leaseTime := uint64(s.leaseTime / time.Second)
- // NB: GuaranteedUpdate does not perform the store operation unless
- // something changed between load and store (not including resource
- // version), meaning we can't refresh the TTL without actually
- // changing a field.
- existing.Generation++
- klog.V(6).Infof("Resetting TTL on master IP %q listed in storage to %v", ip, leaseTime)
- return existing, &leaseTime, nil
- })
- }
- // RemoveLease removes the lease on a master IP in storage
- func (s *storageLeases) RemoveLease(ip string) error {
- return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc)
- }
- // NewLeases creates a new etcd-based Leases implementation.
- func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases {
- return &storageLeases{
- storage: storage,
- baseKey: baseKey,
- leaseTime: leaseTime,
- }
- }
- type leaseEndpointReconciler struct {
- endpointClient corev1client.EndpointsGetter
- masterLeases Leases
- stopReconcilingCalled bool
- reconcilingLock sync.Mutex
- }
- // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
- func NewLeaseEndpointReconciler(endpointClient corev1client.EndpointsGetter, masterLeases Leases) EndpointReconciler {
- return &leaseEndpointReconciler{
- endpointClient: endpointClient,
- masterLeases: masterLeases,
- stopReconcilingCalled: false,
- }
- }
- // ReconcileEndpoints lists keys in a special etcd directory.
- // Each key is expected to have a TTL of R+n, where R is the refresh interval
- // at which this function is called, and n is some small value. If an
- // apiserver goes down, it will fail to refresh its key's TTL and the key will
- // expire. ReconcileEndpoints will notice that the endpoints object is
- // different from the directory listing, and update the endpoints object
- // accordingly.
- func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
- r.reconcilingLock.Lock()
- defer r.reconcilingLock.Unlock()
- if r.stopReconcilingCalled {
- return nil
- }
- // Refresh the TTL on our key, independently of whether any error or
- // update conflict happens below. This makes sure that at least some of
- // the masters will add our endpoint.
- if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
- return err
- }
- return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
- }
- func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
- e, err := r.endpointClient.Endpoints(corev1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
- shouldCreate := false
- if err != nil {
- if !errors.IsNotFound(err) {
- return err
- }
- shouldCreate = true
- e = &corev1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: serviceName,
- Namespace: corev1.NamespaceDefault,
- },
- }
- }
- // ... and the list of master IP keys from etcd
- masterIPs, err := r.masterLeases.ListLeases()
- if err != nil {
- return err
- }
- // Since we just refreshed our own key, assume that zero endpoints
- // returned from storage indicates an issue or invalid state, and thus do
- // not update the endpoints list based on the result.
- if len(masterIPs) == 0 {
- return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
- }
- // Next, we compare the current list of endpoints with the list of master IP keys
- formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
- if formatCorrect && ipCorrect && portsCorrect {
- return nil
- }
- if !formatCorrect {
- // Something is egregiously wrong, just re-make the endpoints record.
- e.Subsets = []corev1.EndpointSubset{{
- Addresses: []corev1.EndpointAddress{},
- Ports: endpointPorts,
- }}
- }
- if !formatCorrect || !ipCorrect {
- // repopulate the addresses according to the expected IPs from etcd
- e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
- for ind, ip := range masterIPs {
- e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
- }
- // Lexicographic order is retained by this step.
- e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
- }
- if !portsCorrect {
- // Reset ports.
- e.Subsets[0].Ports = endpointPorts
- }
- klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
- if shouldCreate {
- if _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Create(e); errors.IsAlreadyExists(err) {
- err = nil
- }
- } else {
- _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Update(e)
- }
- return err
- }
- // checkEndpointSubsetFormatWithLease determines if the endpoint is in the
- // format ReconcileEndpoints expects when the controller is using leases.
- //
- // Return values:
- // * formatCorrect is true if exactly one subset is found.
- // * ipsCorrect when the addresses in the endpoints match the expected addresses list
- // * portsCorrect is true when endpoint ports exactly match provided ports.
- // portsCorrect is only evaluated when reconcilePorts is set to true.
- func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []string, ports []corev1.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
- if len(e.Subsets) != 1 {
- return false, false, false
- }
- sub := &e.Subsets[0]
- portsCorrect = true
- if reconcilePorts {
- if len(sub.Ports) != len(ports) {
- portsCorrect = false
- } else {
- for i, port := range ports {
- if port != sub.Ports[i] {
- portsCorrect = false
- break
- }
- }
- }
- }
- ipsCorrect = true
- if len(sub.Addresses) != len(expectedIPs) {
- ipsCorrect = false
- } else {
- // check the actual content of the addresses
- // present addrs is used as a set (the keys) and to indicate if a
- // value was already found (the values)
- presentAddrs := make(map[string]bool, len(expectedIPs))
- for _, ip := range expectedIPs {
- presentAddrs[ip] = false
- }
- // uniqueness is assumed amongst all Addresses.
- for _, addr := range sub.Addresses {
- if alreadySeen, ok := presentAddrs[addr.IP]; alreadySeen || !ok {
- ipsCorrect = false
- break
- }
- presentAddrs[addr.IP] = true
- }
- }
- return true, ipsCorrect, portsCorrect
- }
- func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
- if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
- return err
- }
- return r.doReconcile(serviceName, endpointPorts, true)
- }
- func (r *leaseEndpointReconciler) StopReconciling() {
- r.reconcilingLock.Lock()
- defer r.reconcilingLock.Unlock()
- r.stopReconcilingCalled = true
- }
|