mastercount.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package reconcilers master count based reconciler
  14. package reconcilers
  15. import (
  16. "net"
  17. "sync"
  18. corev1 "k8s.io/api/core/v1"
  19. "k8s.io/apimachinery/pkg/api/errors"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/client-go/util/retry"
  22. "k8s.io/klog"
  23. endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
  24. )
  25. // masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
  26. // masters. masterCountEndpointReconciler implements EndpointReconciler.
  27. type masterCountEndpointReconciler struct {
  28. masterCount int
  29. epAdapter EndpointsAdapter
  30. stopReconcilingCalled bool
  31. reconcilingLock sync.Mutex
  32. }
  33. // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
  34. // specified expected number of masters.
  35. func NewMasterCountEndpointReconciler(masterCount int, epAdapter EndpointsAdapter) EndpointReconciler {
  36. return &masterCountEndpointReconciler{
  37. masterCount: masterCount,
  38. epAdapter: epAdapter,
  39. }
  40. }
  41. // ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
  42. // ReconcileEndpoints expects that the endpoints objects it manages will all be
  43. // managed only by ReconcileEndpoints; therefore, to understand this, you need only
  44. // understand the requirements and the body of this function.
  45. //
  46. // Requirements:
  47. // * All apiservers MUST use the same ports for their {rw, ro} services.
  48. // * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
  49. // endpoints for their {rw, ro} services.
  50. // * All apiservers MUST know and agree on the number of apiservers expected
  51. // to be running (c.masterCount).
  52. // * ReconcileEndpoints is called periodically from all apiservers.
  53. func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
  54. r.reconcilingLock.Lock()
  55. defer r.reconcilingLock.Unlock()
  56. if r.stopReconcilingCalled {
  57. return nil
  58. }
  59. e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{})
  60. if err != nil {
  61. e = &corev1.Endpoints{
  62. ObjectMeta: metav1.ObjectMeta{
  63. Name: serviceName,
  64. Namespace: metav1.NamespaceDefault,
  65. },
  66. }
  67. }
  68. if errors.IsNotFound(err) {
  69. // Simply create non-existing endpoints for the service.
  70. e.Subsets = []corev1.EndpointSubset{{
  71. Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
  72. Ports: endpointPorts,
  73. }}
  74. _, err = r.epAdapter.Create(metav1.NamespaceDefault, e)
  75. return err
  76. }
  77. // First, determine if the endpoint is in the format we expect (one
  78. // subset, ports matching endpointPorts, N IP addresses).
  79. formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts)
  80. if !formatCorrect {
  81. // Something is egregiously wrong, just re-make the endpoints record.
  82. e.Subsets = []corev1.EndpointSubset{{
  83. Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
  84. Ports: endpointPorts,
  85. }}
  86. klog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
  87. _, err = r.epAdapter.Update(metav1.NamespaceDefault, e)
  88. return err
  89. }
  90. if ipCorrect && portsCorrect {
  91. return r.epAdapter.EnsureEndpointSliceFromEndpoints(metav1.NamespaceDefault, e)
  92. }
  93. if !ipCorrect {
  94. // We *always* add our own IP address.
  95. e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, corev1.EndpointAddress{IP: ip.String()})
  96. // Lexicographic order is retained by this step.
  97. e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
  98. // If too many IP addresses, remove the ones lexicographically after our
  99. // own IP address. Given the requirements stated at the top of
  100. // this function, this should cause the list of IP addresses to
  101. // become eventually correct.
  102. if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount {
  103. // addrs is a pointer because we're going to mutate it.
  104. for i, addr := range *addrs {
  105. if addr.IP == ip.String() {
  106. for len(*addrs) > r.masterCount {
  107. // wrap around if necessary.
  108. remove := (i + 1) % len(*addrs)
  109. *addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...)
  110. }
  111. break
  112. }
  113. }
  114. }
  115. }
  116. if !portsCorrect {
  117. // Reset ports.
  118. e.Subsets[0].Ports = endpointPorts
  119. }
  120. klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
  121. _, err = r.epAdapter.Update(metav1.NamespaceDefault, e)
  122. return err
  123. }
  124. func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
  125. r.reconcilingLock.Lock()
  126. defer r.reconcilingLock.Unlock()
  127. e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{})
  128. if err != nil {
  129. if errors.IsNotFound(err) {
  130. // Endpoint doesn't exist
  131. return nil
  132. }
  133. return err
  134. }
  135. if len(e.Subsets) == 0 {
  136. // no action is needed to remove the endpoint
  137. return nil
  138. }
  139. // Remove our IP from the list of addresses
  140. new := []corev1.EndpointAddress{}
  141. for _, addr := range e.Subsets[0].Addresses {
  142. if addr.IP != ip.String() {
  143. new = append(new, addr)
  144. }
  145. }
  146. e.Subsets[0].Addresses = new
  147. e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
  148. err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  149. _, err := r.epAdapter.Update(metav1.NamespaceDefault, e)
  150. return err
  151. })
  152. return err
  153. }
  154. func (r *masterCountEndpointReconciler) StopReconciling() {
  155. r.reconcilingLock.Lock()
  156. defer r.reconcilingLock.Unlock()
  157. r.stopReconcilingCalled = true
  158. }
  159. // Determine if the endpoint is in the format ReconcileEndpoints expects.
  160. //
  161. // Return values:
  162. // * formatCorrect is true if exactly one subset is found.
  163. // * ipCorrect is true when current master's IP is found and the number
  164. // of addresses is less than or equal to the master count.
  165. // * portsCorrect is true when endpoint ports exactly match provided ports.
  166. // portsCorrect is only evaluated when reconcilePorts is set to true.
  167. func checkEndpointSubsetFormat(e *corev1.Endpoints, ip string, ports []corev1.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
  168. if len(e.Subsets) != 1 {
  169. return false, false, false
  170. }
  171. sub := &e.Subsets[0]
  172. portsCorrect = true
  173. if reconcilePorts {
  174. if len(sub.Ports) != len(ports) {
  175. portsCorrect = false
  176. }
  177. for i, port := range ports {
  178. if len(sub.Ports) <= i || port != sub.Ports[i] {
  179. portsCorrect = false
  180. break
  181. }
  182. }
  183. }
  184. for _, addr := range sub.Addresses {
  185. if addr.IP == ip {
  186. ipCorrect = len(sub.Addresses) <= count
  187. break
  188. }
  189. }
  190. return true, ipCorrect, portsCorrect
  191. }
  192. // GetMasterServiceUpdateIfNeeded sets service attributes for the
  193. // given apiserver service.
  194. // * GetMasterServiceUpdateIfNeeded expects that the service object it
  195. // manages will be managed only by GetMasterServiceUpdateIfNeeded;
  196. // therefore, to understand this, you need only understand the
  197. // requirements and the body of this function.
  198. // * GetMasterServiceUpdateIfNeeded ensures that the correct ports are
  199. // are set.
  200. //
  201. // Requirements:
  202. // * All apiservers MUST use GetMasterServiceUpdateIfNeeded and only
  203. // GetMasterServiceUpdateIfNeeded to manage service attributes
  204. // * updateMasterService is called periodically from all apiservers.
  205. func GetMasterServiceUpdateIfNeeded(svc *corev1.Service, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType) (s *corev1.Service, updated bool) {
  206. // Determine if the service is in the format we expect
  207. // (servicePorts are present and service type matches)
  208. formatCorrect := checkServiceFormat(svc, servicePorts, serviceType)
  209. if formatCorrect {
  210. return svc, false
  211. }
  212. svc.Spec.Ports = servicePorts
  213. svc.Spec.Type = serviceType
  214. return svc, true
  215. }
  216. // Determine if the service is in the correct format
  217. // GetMasterServiceUpdateIfNeeded expects (servicePorts are correct
  218. // and service type matches).
  219. func checkServiceFormat(s *corev1.Service, ports []corev1.ServicePort, serviceType corev1.ServiceType) (formatCorrect bool) {
  220. if s.Spec.Type != serviceType {
  221. return false
  222. }
  223. if len(ports) != len(s.Spec.Ports) {
  224. return false
  225. }
  226. for i, port := range ports {
  227. if port != s.Spec.Ports[i] {
  228. return false
  229. }
  230. }
  231. return true
  232. }