mastercount.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package reconcilers master count based reconciler
  14. package reconcilers
  15. import (
  16. "net"
  17. "sync"
  18. corev1 "k8s.io/api/core/v1"
  19. "k8s.io/apimachinery/pkg/api/errors"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
  22. "k8s.io/client-go/util/retry"
  23. "k8s.io/klog"
  24. endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
  25. )
  26. // masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
  27. // masters. masterCountEndpointReconciler implements EndpointReconciler.
  28. type masterCountEndpointReconciler struct {
  29. masterCount int
  30. endpointClient corev1client.EndpointsGetter
  31. stopReconcilingCalled bool
  32. reconcilingLock sync.Mutex
  33. }
  34. // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
  35. // specified expected number of masters.
  36. func NewMasterCountEndpointReconciler(masterCount int, endpointClient corev1client.EndpointsGetter) EndpointReconciler {
  37. return &masterCountEndpointReconciler{
  38. masterCount: masterCount,
  39. endpointClient: endpointClient,
  40. }
  41. }
  42. // ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
  43. // ReconcileEndpoints expects that the endpoints objects it manages will all be
  44. // managed only by ReconcileEndpoints; therefore, to understand this, you need only
  45. // understand the requirements and the body of this function.
  46. //
  47. // Requirements:
  48. // * All apiservers MUST use the same ports for their {rw, ro} services.
  49. // * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
  50. // endpoints for their {rw, ro} services.
  51. // * All apiservers MUST know and agree on the number of apiservers expected
  52. // to be running (c.masterCount).
  53. // * ReconcileEndpoints is called periodically from all apiservers.
  54. func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
  55. r.reconcilingLock.Lock()
  56. defer r.reconcilingLock.Unlock()
  57. if r.stopReconcilingCalled {
  58. return nil
  59. }
  60. e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
  61. if err != nil {
  62. e = &corev1.Endpoints{
  63. ObjectMeta: metav1.ObjectMeta{
  64. Name: serviceName,
  65. Namespace: metav1.NamespaceDefault,
  66. },
  67. }
  68. }
  69. if errors.IsNotFound(err) {
  70. // Simply create non-existing endpoints for the service.
  71. e.Subsets = []corev1.EndpointSubset{{
  72. Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
  73. Ports: endpointPorts,
  74. }}
  75. _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Create(e)
  76. return err
  77. }
  78. // First, determine if the endpoint is in the format we expect (one
  79. // subset, ports matching endpointPorts, N IP addresses).
  80. formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts)
  81. if !formatCorrect {
  82. // Something is egregiously wrong, just re-make the endpoints record.
  83. e.Subsets = []corev1.EndpointSubset{{
  84. Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
  85. Ports: endpointPorts,
  86. }}
  87. klog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
  88. _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
  89. return err
  90. }
  91. if ipCorrect && portsCorrect {
  92. return nil
  93. }
  94. if !ipCorrect {
  95. // We *always* add our own IP address.
  96. e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, corev1.EndpointAddress{IP: ip.String()})
  97. // Lexicographic order is retained by this step.
  98. e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
  99. // If too many IP addresses, remove the ones lexicographically after our
  100. // own IP address. Given the requirements stated at the top of
  101. // this function, this should cause the list of IP addresses to
  102. // become eventually correct.
  103. if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount {
  104. // addrs is a pointer because we're going to mutate it.
  105. for i, addr := range *addrs {
  106. if addr.IP == ip.String() {
  107. for len(*addrs) > r.masterCount {
  108. // wrap around if necessary.
  109. remove := (i + 1) % len(*addrs)
  110. *addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...)
  111. }
  112. break
  113. }
  114. }
  115. }
  116. }
  117. if !portsCorrect {
  118. // Reset ports.
  119. e.Subsets[0].Ports = endpointPorts
  120. }
  121. klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
  122. _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
  123. return err
  124. }
  125. func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
  126. r.reconcilingLock.Lock()
  127. defer r.reconcilingLock.Unlock()
  128. e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
  129. if err != nil {
  130. if errors.IsNotFound(err) {
  131. // Endpoint doesn't exist
  132. return nil
  133. }
  134. return err
  135. }
  136. // Remove our IP from the list of addresses
  137. new := []corev1.EndpointAddress{}
  138. for _, addr := range e.Subsets[0].Addresses {
  139. if addr.IP != ip.String() {
  140. new = append(new, addr)
  141. }
  142. }
  143. e.Subsets[0].Addresses = new
  144. e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
  145. err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  146. _, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
  147. return err
  148. })
  149. return err
  150. }
  151. func (r *masterCountEndpointReconciler) StopReconciling() {
  152. r.reconcilingLock.Lock()
  153. defer r.reconcilingLock.Unlock()
  154. r.stopReconcilingCalled = true
  155. }
  156. // Determine if the endpoint is in the format ReconcileEndpoints expects.
  157. //
  158. // Return values:
  159. // * formatCorrect is true if exactly one subset is found.
  160. // * ipCorrect is true when current master's IP is found and the number
  161. // of addresses is less than or equal to the master count.
  162. // * portsCorrect is true when endpoint ports exactly match provided ports.
  163. // portsCorrect is only evaluated when reconcilePorts is set to true.
  164. func checkEndpointSubsetFormat(e *corev1.Endpoints, ip string, ports []corev1.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
  165. if len(e.Subsets) != 1 {
  166. return false, false, false
  167. }
  168. sub := &e.Subsets[0]
  169. portsCorrect = true
  170. if reconcilePorts {
  171. if len(sub.Ports) != len(ports) {
  172. portsCorrect = false
  173. }
  174. for i, port := range ports {
  175. if len(sub.Ports) <= i || port != sub.Ports[i] {
  176. portsCorrect = false
  177. break
  178. }
  179. }
  180. }
  181. for _, addr := range sub.Addresses {
  182. if addr.IP == ip {
  183. ipCorrect = len(sub.Addresses) <= count
  184. break
  185. }
  186. }
  187. return true, ipCorrect, portsCorrect
  188. }
  189. // GetMasterServiceUpdateIfNeeded sets service attributes for the
  190. // given apiserver service.
  191. // * GetMasterServiceUpdateIfNeeded expects that the service object it
  192. // manages will be managed only by GetMasterServiceUpdateIfNeeded;
  193. // therefore, to understand this, you need only understand the
  194. // requirements and the body of this function.
  195. // * GetMasterServiceUpdateIfNeeded ensures that the correct ports are
  196. // are set.
  197. //
  198. // Requirements:
  199. // * All apiservers MUST use GetMasterServiceUpdateIfNeeded and only
  200. // GetMasterServiceUpdateIfNeeded to manage service attributes
  201. // * updateMasterService is called periodically from all apiservers.
  202. func GetMasterServiceUpdateIfNeeded(svc *corev1.Service, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType) (s *corev1.Service, updated bool) {
  203. // Determine if the service is in the format we expect
  204. // (servicePorts are present and service type matches)
  205. formatCorrect := checkServiceFormat(svc, servicePorts, serviceType)
  206. if formatCorrect {
  207. return svc, false
  208. }
  209. svc.Spec.Ports = servicePorts
  210. svc.Spec.Type = serviceType
  211. return svc, true
  212. }
  213. // Determine if the service is in the correct format
  214. // GetMasterServiceUpdateIfNeeded expects (servicePorts are correct
  215. // and service type matches).
  216. func checkServiceFormat(s *corev1.Service, ports []corev1.ServicePort, serviceType corev1.ServiceType) (formatCorrect bool) {
  217. if s.Spec.Type != serviceType {
  218. return false
  219. }
  220. if len(ports) != len(s.Spec.Ports) {
  221. return false
  222. }
  223. for i, port := range ports {
  224. if port != s.Spec.Ports[i] {
  225. return false
  226. }
  227. }
  228. return true
  229. }