lease.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package reconcilers
  14. /*
  15. Original Source:
  16. https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler.go
  17. */
  18. import (
  19. "fmt"
  20. "net"
  21. "path"
  22. "sync"
  23. "time"
  24. "k8s.io/klog"
  25. corev1 "k8s.io/api/core/v1"
  26. "k8s.io/apimachinery/pkg/api/errors"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. kruntime "k8s.io/apimachinery/pkg/runtime"
  29. apirequest "k8s.io/apiserver/pkg/endpoints/request"
  30. "k8s.io/apiserver/pkg/registry/rest"
  31. "k8s.io/apiserver/pkg/storage"
  32. corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
  33. endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
  34. )
  // Leases tracks the set of active masters. Each master is identified by its
  // IP address, passed around as a string.
  type Leases interface {
  	// ListLeases returns the IPs of the masters that currently hold a lease.
  	ListLeases() ([]string, error)

  	// UpdateLease adds a lease for the master with the given IP, or
  	// refreshes it when one already exists.
  	UpdateLease(ip string) error

  	// RemoveLease drops the lease of the master with the given IP.
  	RemoveLease(ip string) error
  }
  44. type storageLeases struct {
  45. storage storage.Interface
  46. baseKey string
  47. leaseTime time.Duration
  48. }
  49. var _ Leases = &storageLeases{}
  50. // ListLeases retrieves a list of the current master IPs from storage
  51. func (s *storageLeases) ListLeases() ([]string, error) {
  52. ipInfoList := &corev1.EndpointsList{}
  53. if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil {
  54. return nil, err
  55. }
  56. ipList := make([]string, len(ipInfoList.Items))
  57. for i, ip := range ipInfoList.Items {
  58. ipList[i] = ip.Subsets[0].Addresses[0].IP
  59. }
  60. klog.V(6).Infof("Current master IPs listed in storage are %v", ipList)
  61. return ipList, nil
  62. }
  63. // UpdateLease resets the TTL on a master IP in storage
  64. func (s *storageLeases) UpdateLease(ip string) error {
  65. key := path.Join(s.baseKey, ip)
  66. return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
  67. // just make sure we've got the right IP set, and then refresh the TTL
  68. existing := input.(*corev1.Endpoints)
  69. existing.Subsets = []corev1.EndpointSubset{
  70. {
  71. Addresses: []corev1.EndpointAddress{{IP: ip}},
  72. },
  73. }
  74. // leaseTime needs to be in seconds
  75. leaseTime := uint64(s.leaseTime / time.Second)
  76. // NB: GuaranteedUpdate does not perform the store operation unless
  77. // something changed between load and store (not including resource
  78. // version), meaning we can't refresh the TTL without actually
  79. // changing a field.
  80. existing.Generation++
  81. klog.V(6).Infof("Resetting TTL on master IP %q listed in storage to %v", ip, leaseTime)
  82. return existing, &leaseTime, nil
  83. })
  84. }
  85. // RemoveLease removes the lease on a master IP in storage
  86. func (s *storageLeases) RemoveLease(ip string) error {
  87. return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc)
  88. }
  89. // NewLeases creates a new etcd-based Leases implementation.
  90. func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases {
  91. return &storageLeases{
  92. storage: storage,
  93. baseKey: baseKey,
  94. leaseTime: leaseTime,
  95. }
  96. }
  97. type leaseEndpointReconciler struct {
  98. endpointClient corev1client.EndpointsGetter
  99. masterLeases Leases
  100. stopReconcilingCalled bool
  101. reconcilingLock sync.Mutex
  102. }
  103. // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
  104. func NewLeaseEndpointReconciler(endpointClient corev1client.EndpointsGetter, masterLeases Leases) EndpointReconciler {
  105. return &leaseEndpointReconciler{
  106. endpointClient: endpointClient,
  107. masterLeases: masterLeases,
  108. stopReconcilingCalled: false,
  109. }
  110. }
  111. // ReconcileEndpoints lists keys in a special etcd directory.
  112. // Each key is expected to have a TTL of R+n, where R is the refresh interval
  113. // at which this function is called, and n is some small value. If an
  114. // apiserver goes down, it will fail to refresh its key's TTL and the key will
  115. // expire. ReconcileEndpoints will notice that the endpoints object is
  116. // different from the directory listing, and update the endpoints object
  117. // accordingly.
  118. func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
  119. r.reconcilingLock.Lock()
  120. defer r.reconcilingLock.Unlock()
  121. if r.stopReconcilingCalled {
  122. return nil
  123. }
  124. // Refresh the TTL on our key, independently of whether any error or
  125. // update conflict happens below. This makes sure that at least some of
  126. // the masters will add our endpoint.
  127. if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
  128. return err
  129. }
  130. return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
  131. }
  132. func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
  133. e, err := r.endpointClient.Endpoints(corev1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
  134. shouldCreate := false
  135. if err != nil {
  136. if !errors.IsNotFound(err) {
  137. return err
  138. }
  139. shouldCreate = true
  140. e = &corev1.Endpoints{
  141. ObjectMeta: metav1.ObjectMeta{
  142. Name: serviceName,
  143. Namespace: corev1.NamespaceDefault,
  144. },
  145. }
  146. }
  147. // ... and the list of master IP keys from etcd
  148. masterIPs, err := r.masterLeases.ListLeases()
  149. if err != nil {
  150. return err
  151. }
  152. // Since we just refreshed our own key, assume that zero endpoints
  153. // returned from storage indicates an issue or invalid state, and thus do
  154. // not update the endpoints list based on the result.
  155. if len(masterIPs) == 0 {
  156. return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
  157. }
  158. // Next, we compare the current list of endpoints with the list of master IP keys
  159. formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
  160. if formatCorrect && ipCorrect && portsCorrect {
  161. return nil
  162. }
  163. if !formatCorrect {
  164. // Something is egregiously wrong, just re-make the endpoints record.
  165. e.Subsets = []corev1.EndpointSubset{{
  166. Addresses: []corev1.EndpointAddress{},
  167. Ports: endpointPorts,
  168. }}
  169. }
  170. if !formatCorrect || !ipCorrect {
  171. // repopulate the addresses according to the expected IPs from etcd
  172. e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
  173. for ind, ip := range masterIPs {
  174. e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
  175. }
  176. // Lexicographic order is retained by this step.
  177. e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
  178. }
  179. if !portsCorrect {
  180. // Reset ports.
  181. e.Subsets[0].Ports = endpointPorts
  182. }
  183. klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
  184. if shouldCreate {
  185. if _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Create(e); errors.IsAlreadyExists(err) {
  186. err = nil
  187. }
  188. } else {
  189. _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Update(e)
  190. }
  191. return err
  192. }
  193. // checkEndpointSubsetFormatWithLease determines if the endpoint is in the
  194. // format ReconcileEndpoints expects when the controller is using leases.
  195. //
  196. // Return values:
  197. // * formatCorrect is true if exactly one subset is found.
  198. // * ipsCorrect when the addresses in the endpoints match the expected addresses list
  199. // * portsCorrect is true when endpoint ports exactly match provided ports.
  200. // portsCorrect is only evaluated when reconcilePorts is set to true.
  201. func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []string, ports []corev1.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
  202. if len(e.Subsets) != 1 {
  203. return false, false, false
  204. }
  205. sub := &e.Subsets[0]
  206. portsCorrect = true
  207. if reconcilePorts {
  208. if len(sub.Ports) != len(ports) {
  209. portsCorrect = false
  210. } else {
  211. for i, port := range ports {
  212. if port != sub.Ports[i] {
  213. portsCorrect = false
  214. break
  215. }
  216. }
  217. }
  218. }
  219. ipsCorrect = true
  220. if len(sub.Addresses) != len(expectedIPs) {
  221. ipsCorrect = false
  222. } else {
  223. // check the actual content of the addresses
  224. // present addrs is used as a set (the keys) and to indicate if a
  225. // value was already found (the values)
  226. presentAddrs := make(map[string]bool, len(expectedIPs))
  227. for _, ip := range expectedIPs {
  228. presentAddrs[ip] = false
  229. }
  230. // uniqueness is assumed amongst all Addresses.
  231. for _, addr := range sub.Addresses {
  232. if alreadySeen, ok := presentAddrs[addr.IP]; alreadySeen || !ok {
  233. ipsCorrect = false
  234. break
  235. }
  236. presentAddrs[addr.IP] = true
  237. }
  238. }
  239. return true, ipsCorrect, portsCorrect
  240. }
  241. func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
  242. if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
  243. return err
  244. }
  245. return r.doReconcile(serviceName, endpointPorts, true)
  246. }
  247. func (r *leaseEndpointReconciler) StopReconciling() {
  248. r.reconcilingLock.Lock()
  249. defer r.reconcilingLock.Unlock()
  250. r.stopReconcilingCalled = true
  251. }