endpointsadapter.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package reconcilers
  14. import (
  15. "context"
  16. corev1 "k8s.io/api/core/v1"
  17. discovery "k8s.io/api/discovery/v1beta1"
  18. apiequality "k8s.io/apimachinery/pkg/api/equality"
  19. "k8s.io/apimachinery/pkg/api/errors"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
  22. discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1beta1"
  23. utilnet "k8s.io/utils/net"
  24. )
  25. // EndpointsAdapter provides a simple interface for reading and writing both
  26. // Endpoints and Endpoint Slices.
  27. // NOTE: This is an incomplete adapter implementation that is only suitable for
  28. // use in this package. This takes advantage of the Endpoints used in this
  29. // package always having a consistent set of ports, a single subset, and a small
  30. // set of addresses. Any more complex Endpoints resource would likely translate
  31. // into multiple Endpoint Slices creating significantly more complexity instead
  32. // of the 1:1 mapping this allows.
  33. type EndpointsAdapter struct {
  34. endpointClient corev1client.EndpointsGetter
  35. endpointSliceClient discoveryclient.EndpointSlicesGetter
  36. }
  37. // NewEndpointsAdapter returns a new EndpointsAdapter.
  38. func NewEndpointsAdapter(endpointClient corev1client.EndpointsGetter, endpointSliceClient discoveryclient.EndpointSlicesGetter) EndpointsAdapter {
  39. return EndpointsAdapter{
  40. endpointClient: endpointClient,
  41. endpointSliceClient: endpointSliceClient,
  42. }
  43. }
  44. // Get takes the name and namespace of the Endpoints resource, and returns a
  45. // corresponding Endpoints object if it exists, and an error if there is any.
  46. func (adapter *EndpointsAdapter) Get(namespace, name string, getOpts metav1.GetOptions) (*corev1.Endpoints, error) {
  47. return adapter.endpointClient.Endpoints(namespace).Get(context.TODO(), name, getOpts)
  48. }
  49. // Create accepts a namespace and Endpoints object and creates the Endpoints
  50. // object. If an endpointSliceClient exists, a matching EndpointSlice will also
  51. // be created or updated. The created Endpoints object or an error will be
  52. // returned.
  53. func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
  54. endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(context.TODO(), endpoints, metav1.CreateOptions{})
  55. if err == nil {
  56. err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
  57. }
  58. return endpoints, err
  59. }
  60. // Update accepts a namespace and Endpoints object and updates it. If an
  61. // endpointSliceClient exists, a matching EndpointSlice will also be created or
  62. // updated. The updated Endpoints object or an error will be returned.
  63. func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
  64. endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(context.TODO(), endpoints, metav1.UpdateOptions{})
  65. if err == nil {
  66. err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
  67. }
  68. return endpoints, err
  69. }
  70. // EnsureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource
  71. // and creates or updates a corresponding EndpointSlice if an endpointSliceClient
  72. // exists. An error will be returned if it fails to sync the EndpointSlice.
  73. func (adapter *EndpointsAdapter) EnsureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) error {
  74. if adapter.endpointSliceClient == nil {
  75. return nil
  76. }
  77. endpointSlice := endpointSliceFromEndpoints(endpoints)
  78. currentEndpointSlice, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(context.TODO(), endpointSlice.Name, metav1.GetOptions{})
  79. if err != nil {
  80. if errors.IsNotFound(err) {
  81. if _, err = adapter.endpointSliceClient.EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{}); errors.IsAlreadyExists(err) {
  82. err = nil
  83. }
  84. }
  85. return err
  86. }
  87. // required for transition from IP to IPv4 address type.
  88. if currentEndpointSlice.AddressType != endpointSlice.AddressType {
  89. err = adapter.endpointSliceClient.EndpointSlices(namespace).Delete(context.TODO(), endpointSlice.Name, &metav1.DeleteOptions{})
  90. if err != nil {
  91. return err
  92. }
  93. _, err = adapter.endpointSliceClient.EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
  94. return err
  95. }
  96. if apiequality.Semantic.DeepEqual(currentEndpointSlice.Endpoints, endpointSlice.Endpoints) &&
  97. apiequality.Semantic.DeepEqual(currentEndpointSlice.Ports, endpointSlice.Ports) &&
  98. apiequality.Semantic.DeepEqual(currentEndpointSlice.Labels, endpointSlice.Labels) {
  99. return nil
  100. }
  101. _, err = adapter.endpointSliceClient.EndpointSlices(namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
  102. return err
  103. }
  104. // endpointSliceFromEndpoints generates an EndpointSlice from an Endpoints
  105. // resource.
  106. func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice {
  107. endpointSlice := &discovery.EndpointSlice{}
  108. endpointSlice.Name = endpoints.Name
  109. endpointSlice.Labels = map[string]string{discovery.LabelServiceName: endpoints.Name}
  110. // TODO: Add support for dual stack here (and in the rest of
  111. // EndpointsAdapter).
  112. endpointSlice.AddressType = discovery.AddressTypeIPv4
  113. if len(endpoints.Subsets) > 0 {
  114. subset := endpoints.Subsets[0]
  115. for i := range subset.Ports {
  116. endpointSlice.Ports = append(endpointSlice.Ports, discovery.EndpointPort{
  117. Port: &subset.Ports[i].Port,
  118. Name: &subset.Ports[i].Name,
  119. Protocol: &subset.Ports[i].Protocol,
  120. })
  121. }
  122. if allAddressesIPv6(append(subset.Addresses, subset.NotReadyAddresses...)) {
  123. endpointSlice.AddressType = discovery.AddressTypeIPv6
  124. }
  125. endpointSlice.Endpoints = append(endpointSlice.Endpoints, getEndpointsFromAddresses(subset.Addresses, endpointSlice.AddressType, true)...)
  126. endpointSlice.Endpoints = append(endpointSlice.Endpoints, getEndpointsFromAddresses(subset.NotReadyAddresses, endpointSlice.AddressType, false)...)
  127. }
  128. return endpointSlice
  129. }
  130. // getEndpointsFromAddresses returns a list of Endpoints from addresses that
  131. // match the provided address type.
  132. func getEndpointsFromAddresses(addresses []corev1.EndpointAddress, addressType discovery.AddressType, ready bool) []discovery.Endpoint {
  133. endpoints := []discovery.Endpoint{}
  134. isIPv6AddressType := addressType == discovery.AddressTypeIPv6
  135. for _, address := range addresses {
  136. if utilnet.IsIPv6String(address.IP) == isIPv6AddressType {
  137. endpoints = append(endpoints, endpointFromAddress(address, ready))
  138. }
  139. }
  140. return endpoints
  141. }
  142. // endpointFromAddress generates an Endpoint from an EndpointAddress resource.
  143. func endpointFromAddress(address corev1.EndpointAddress, ready bool) discovery.Endpoint {
  144. topology := map[string]string{}
  145. if address.NodeName != nil {
  146. topology["kubernetes.io/hostname"] = *address.NodeName
  147. }
  148. return discovery.Endpoint{
  149. Addresses: []string{address.IP},
  150. Conditions: discovery.EndpointConditions{Ready: &ready},
  151. TargetRef: address.TargetRef,
  152. Topology: topology,
  153. }
  154. }
  155. // allAddressesIPv6 returns true if all provided addresses are IPv6.
  156. func allAddressesIPv6(addresses []corev1.EndpointAddress) bool {
  157. if len(addresses) == 0 {
  158. return false
  159. }
  160. for _, address := range addresses {
  161. if !utilnet.IsIPv6String(address.IP) {
  162. return false
  163. }
  164. }
  165. return true
  166. }