lease.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package reconcilers
  14. /*
  15. Original Source:
  16. https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler.go
  17. */
  18. import (
  19. "fmt"
  20. "net"
  21. "path"
  22. "sync"
  23. "time"
  24. "k8s.io/klog"
  25. corev1 "k8s.io/api/core/v1"
  26. "k8s.io/apimachinery/pkg/api/errors"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. kruntime "k8s.io/apimachinery/pkg/runtime"
  29. apirequest "k8s.io/apiserver/pkg/endpoints/request"
  30. "k8s.io/apiserver/pkg/registry/rest"
  31. "k8s.io/apiserver/pkg/storage"
  32. endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
  33. )
  34. // Leases is an interface which assists in managing the set of active masters
  35. type Leases interface {
  36. // ListLeases retrieves a list of the current master IPs
  37. ListLeases() ([]string, error)
  38. // UpdateLease adds or refreshes a master's lease
  39. UpdateLease(ip string) error
  40. // RemoveLease removes a master's lease
  41. RemoveLease(ip string) error
  42. }
  43. type storageLeases struct {
  44. storage storage.Interface
  45. baseKey string
  46. leaseTime time.Duration
  47. }
  48. var _ Leases = &storageLeases{}
  49. // ListLeases retrieves a list of the current master IPs from storage
  50. func (s *storageLeases) ListLeases() ([]string, error) {
  51. ipInfoList := &corev1.EndpointsList{}
  52. if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil {
  53. return nil, err
  54. }
  55. ipList := make([]string, 0, len(ipInfoList.Items))
  56. for _, ip := range ipInfoList.Items {
  57. if len(ip.Subsets) > 0 && len(ip.Subsets[0].Addresses) > 0 && len(ip.Subsets[0].Addresses[0].IP) > 0 {
  58. ipList = append(ipList, ip.Subsets[0].Addresses[0].IP)
  59. }
  60. }
  61. klog.V(6).Infof("Current master IPs listed in storage are %v", ipList)
  62. return ipList, nil
  63. }
  64. // UpdateLease resets the TTL on a master IP in storage
  65. func (s *storageLeases) UpdateLease(ip string) error {
  66. key := path.Join(s.baseKey, ip)
  67. return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
  68. // just make sure we've got the right IP set, and then refresh the TTL
  69. existing := input.(*corev1.Endpoints)
  70. existing.Subsets = []corev1.EndpointSubset{
  71. {
  72. Addresses: []corev1.EndpointAddress{{IP: ip}},
  73. },
  74. }
  75. // leaseTime needs to be in seconds
  76. leaseTime := uint64(s.leaseTime / time.Second)
  77. // NB: GuaranteedUpdate does not perform the store operation unless
  78. // something changed between load and store (not including resource
  79. // version), meaning we can't refresh the TTL without actually
  80. // changing a field.
  81. existing.Generation++
  82. klog.V(6).Infof("Resetting TTL on master IP %q listed in storage to %v", ip, leaseTime)
  83. return existing, &leaseTime, nil
  84. })
  85. }
  86. // RemoveLease removes the lease on a master IP in storage
  87. func (s *storageLeases) RemoveLease(ip string) error {
  88. return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc)
  89. }
  90. // NewLeases creates a new etcd-based Leases implementation.
  91. func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases {
  92. return &storageLeases{
  93. storage: storage,
  94. baseKey: baseKey,
  95. leaseTime: leaseTime,
  96. }
  97. }
  98. type leaseEndpointReconciler struct {
  99. epAdapter EndpointsAdapter
  100. masterLeases Leases
  101. stopReconcilingCalled bool
  102. reconcilingLock sync.Mutex
  103. }
  104. // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
  105. func NewLeaseEndpointReconciler(epAdapter EndpointsAdapter, masterLeases Leases) EndpointReconciler {
  106. return &leaseEndpointReconciler{
  107. epAdapter: epAdapter,
  108. masterLeases: masterLeases,
  109. stopReconcilingCalled: false,
  110. }
  111. }
  112. // ReconcileEndpoints lists keys in a special etcd directory.
  113. // Each key is expected to have a TTL of R+n, where R is the refresh interval
  114. // at which this function is called, and n is some small value. If an
  115. // apiserver goes down, it will fail to refresh its key's TTL and the key will
  116. // expire. ReconcileEndpoints will notice that the endpoints object is
  117. // different from the directory listing, and update the endpoints object
  118. // accordingly.
  119. func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
  120. r.reconcilingLock.Lock()
  121. defer r.reconcilingLock.Unlock()
  122. if r.stopReconcilingCalled {
  123. return nil
  124. }
  125. // Refresh the TTL on our key, independently of whether any error or
  126. // update conflict happens below. This makes sure that at least some of
  127. // the masters will add our endpoint.
  128. if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
  129. return err
  130. }
  131. return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
  132. }
  133. func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
  134. e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{})
  135. shouldCreate := false
  136. if err != nil {
  137. if !errors.IsNotFound(err) {
  138. return err
  139. }
  140. shouldCreate = true
  141. e = &corev1.Endpoints{
  142. ObjectMeta: metav1.ObjectMeta{
  143. Name: serviceName,
  144. Namespace: corev1.NamespaceDefault,
  145. },
  146. }
  147. }
  148. // ... and the list of master IP keys from etcd
  149. masterIPs, err := r.masterLeases.ListLeases()
  150. if err != nil {
  151. return err
  152. }
  153. // Since we just refreshed our own key, assume that zero endpoints
  154. // returned from storage indicates an issue or invalid state, and thus do
  155. // not update the endpoints list based on the result.
  156. if len(masterIPs) == 0 {
  157. return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
  158. }
  159. // Next, we compare the current list of endpoints with the list of master IP keys
  160. formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
  161. if formatCorrect && ipCorrect && portsCorrect {
  162. return r.epAdapter.EnsureEndpointSliceFromEndpoints(corev1.NamespaceDefault, e)
  163. }
  164. if !formatCorrect {
  165. // Something is egregiously wrong, just re-make the endpoints record.
  166. e.Subsets = []corev1.EndpointSubset{{
  167. Addresses: []corev1.EndpointAddress{},
  168. Ports: endpointPorts,
  169. }}
  170. }
  171. if !formatCorrect || !ipCorrect {
  172. // repopulate the addresses according to the expected IPs from etcd
  173. e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
  174. for ind, ip := range masterIPs {
  175. e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
  176. }
  177. // Lexicographic order is retained by this step.
  178. e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
  179. }
  180. if !portsCorrect {
  181. // Reset ports.
  182. e.Subsets[0].Ports = endpointPorts
  183. }
  184. klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
  185. if shouldCreate {
  186. if _, err = r.epAdapter.Create(corev1.NamespaceDefault, e); errors.IsAlreadyExists(err) {
  187. err = nil
  188. }
  189. } else {
  190. _, err = r.epAdapter.Update(corev1.NamespaceDefault, e)
  191. }
  192. return err
  193. }
  194. // checkEndpointSubsetFormatWithLease determines if the endpoint is in the
  195. // format ReconcileEndpoints expects when the controller is using leases.
  196. //
  197. // Return values:
  198. // * formatCorrect is true if exactly one subset is found.
  199. // * ipsCorrect when the addresses in the endpoints match the expected addresses list
  200. // * portsCorrect is true when endpoint ports exactly match provided ports.
  201. // portsCorrect is only evaluated when reconcilePorts is set to true.
  202. func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []string, ports []corev1.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
  203. if len(e.Subsets) != 1 {
  204. return false, false, false
  205. }
  206. sub := &e.Subsets[0]
  207. portsCorrect = true
  208. if reconcilePorts {
  209. if len(sub.Ports) != len(ports) {
  210. portsCorrect = false
  211. } else {
  212. for i, port := range ports {
  213. if port != sub.Ports[i] {
  214. portsCorrect = false
  215. break
  216. }
  217. }
  218. }
  219. }
  220. ipsCorrect = true
  221. if len(sub.Addresses) != len(expectedIPs) {
  222. ipsCorrect = false
  223. } else {
  224. // check the actual content of the addresses
  225. // present addrs is used as a set (the keys) and to indicate if a
  226. // value was already found (the values)
  227. presentAddrs := make(map[string]bool, len(expectedIPs))
  228. for _, ip := range expectedIPs {
  229. presentAddrs[ip] = false
  230. }
  231. // uniqueness is assumed amongst all Addresses.
  232. for _, addr := range sub.Addresses {
  233. if alreadySeen, ok := presentAddrs[addr.IP]; alreadySeen || !ok {
  234. ipsCorrect = false
  235. break
  236. }
  237. presentAddrs[addr.IP] = true
  238. }
  239. }
  240. return true, ipsCorrect, portsCorrect
  241. }
  242. func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
  243. if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
  244. return err
  245. }
  246. return r.doReconcile(serviceName, endpointPorts, true)
  247. }
  248. func (r *leaseEndpointReconciler) StopReconciling() {
  249. r.reconcilingLock.Lock()
  250. defer r.reconcilingLock.Unlock()
  251. r.stopReconcilingCalled = true
  252. }