/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endpointslice

import (
	"context"
	"fmt"
	"sort"
	"time"

	corev1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1beta1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)

// reconciler is responsible for transforming current EndpointSlice state into
// desired state
type reconciler struct {
	client               clientset.Interface
	nodeLister           corelisters.NodeLister
	maxEndpointsPerSlice int32
	endpointSliceTracker *endpointSliceTracker
	metricsCache         *metrics.Cache
}

// endpointMeta includes the attributes we group slices on; this type helps
// with that logic in the reconciler.
type endpointMeta struct {
	Ports       []discovery.EndpointPort `json:"ports" protobuf:"bytes,2,rep,name=ports"`
	AddressType discovery.AddressType    `json:"addressType" protobuf:"bytes,3,rep,name=addressType"`
}

// reconcile takes a set of pods currently matching a service selector and
// compares them with the endpoints already present in any existing endpoint
// slices for the given service. It creates, updates, or deletes endpoint slices
// to ensure the desired set of pods are represented by endpoint slices.
func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
	addressType := discovery.AddressTypeIPv4
	if isIPv6Service(service) {
		addressType = discovery.AddressTypeIPv6
	}

	slicesToCreate := []*discovery.EndpointSlice{}
	slicesToUpdate := []*discovery.EndpointSlice{}
	slicesToDelete := []*discovery.EndpointSlice{}

	// Build data structures for existing state.
	existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
	numExistingEndpoints := 0
	for _, existingSlice := range existingSlices {
		if existingSlice.AddressType == addressType {
			epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
			existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
			numExistingEndpoints += len(existingSlice.Endpoints)
		} else {
			slicesToDelete = append(slicesToDelete, existingSlice)
		}
	}

	// Build data structures for desired state.
	desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{}
	desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointSet{}
	numDesiredEndpoints := 0

	for _, pod := range pods {
		if endpointutil.ShouldPodBeInEndpoints(pod) {
			endpointPorts := getEndpointPorts(service, pod)
			epHash := endpointutil.NewPortMapKey(endpointPorts)
			if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
				desiredEndpointsByPortMap[epHash] = endpointSet{}
			}

			if _, ok := desiredMetaByPortMap[epHash]; !ok {
				desiredMetaByPortMap[epHash] = &endpointMeta{
					AddressType: addressType,
					Ports:       endpointPorts,
				}
			}

			node, err := r.nodeLister.Get(pod.Spec.NodeName)
			if err != nil {
				return err
			}
			endpoint := podToEndpoint(pod, node, service)
			if len(endpoint.Addresses) > 0 {
				desiredEndpointsByPortMap[epHash].Insert(&endpoint)
				numDesiredEndpoints++
			}
		}
	}

	spMetrics := metrics.NewServicePortCache()
	totalAdded := 0
	totalRemoved := 0

	// Determine changes necessary for each group of slices by port map.
	for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
		numEndpoints := len(desiredEndpoints)
		pmSlicesToCreate, pmSlicesToUpdate, pmSlicesToDelete, added, removed := r.reconcileByPortMapping(
			service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])

		totalAdded += added
		totalRemoved += removed

		spMetrics.Set(portMap, metrics.EfficiencyInfo{
			Endpoints: numEndpoints,
			Slices:    len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSlicesToDelete),
		})

		if len(pmSlicesToCreate) > 0 {
			slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
		}
		if len(pmSlicesToUpdate) > 0 {
			slicesToUpdate = append(slicesToUpdate, pmSlicesToUpdate...)
		}
		if len(pmSlicesToDelete) > 0 {
			slicesToDelete = append(slicesToDelete, pmSlicesToDelete...)
		}
	}

	// If there are unique sets of ports that are no longer desired, mark
	// the corresponding endpoint slices for deletion.
	for portMap, existingSlices := range existingSlicesByPortMap {
		if _, ok := desiredEndpointsByPortMap[portMap]; !ok {
			for _, existingSlice := range existingSlices {
				slicesToDelete = append(slicesToDelete, existingSlice)
			}
		}
	}

	// When no endpoint slices would usually exist, we need to add a placeholder.
	if len(existingSlices) == len(slicesToDelete) && len(slicesToCreate) < 1 {
		placeholderSlice := newEndpointSlice(service, &endpointMeta{Ports: []discovery.EndpointPort{}, AddressType: addressType})
		slicesToCreate = append(slicesToCreate, placeholderSlice)
		spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
			Endpoints: 0,
			Slices:    1,
		})
	}

	metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
	metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved))

	serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
	r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)

	return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
}

// finalize creates, updates, and deletes slices as specified
func (r *reconciler) finalize(
	service *corev1.Service,
	slicesToCreate,
	slicesToUpdate,
	slicesToDelete []*discovery.EndpointSlice,
	triggerTime time.Time,
) error {
	errs := []error{}

	// If there are slices to create and delete, change the creates to updates
	// of the slices that would otherwise be deleted.
	for i := 0; i < len(slicesToDelete); {
		if len(slicesToCreate) == 0 {
			break
		}
		sliceToDelete := slicesToDelete[i]
		slice := slicesToCreate[len(slicesToCreate)-1]
		// Only update EndpointSlices that have the same AddressType as this
		// field is considered immutable. Since Services also consider IPFamily
		// immutable, the only case where this should matter will be the
		// migration from IP to IPv4 and IPv6 AddressTypes, where there's a
		// chance EndpointSlices with an IP AddressType would otherwise be
		// updated to IPv4 or IPv6 without this check.
		if sliceToDelete.AddressType == slice.AddressType {
			slice.Name = sliceToDelete.Name
			slicesToCreate = slicesToCreate[:len(slicesToCreate)-1]
			slicesToUpdate = append(slicesToUpdate, slice)
			slicesToDelete = append(slicesToDelete[:i], slicesToDelete[i+1:]...)
		} else {
			i++
		}
	}

	for _, endpointSlice := range slicesToCreate {
		addTriggerTimeAnnotation(endpointSlice, triggerTime)
		createdSlice, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
		if err != nil {
			// If the namespace is terminating, creates will continue to fail. Simply drop the item.
			if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
				return nil
			}
			errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err))
		} else {
			r.endpointSliceTracker.Update(createdSlice)
			metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
		}
	}

	for _, endpointSlice := range slicesToUpdate {
		addTriggerTimeAnnotation(endpointSlice, triggerTime)
		updatedSlice, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
		if err != nil {
			errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
		} else {
			r.endpointSliceTracker.Update(updatedSlice)
			metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
		}
	}

	for _, endpointSlice := range slicesToDelete {
		err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Delete(context.TODO(), endpointSlice.Name, &metav1.DeleteOptions{})
		if err != nil {
			errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
		} else {
			r.endpointSliceTracker.Delete(endpointSlice)
			metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
		}
	}

	return utilerrors.NewAggregate(errs)
}

// reconcileByPortMapping compares the endpoints found in existing slices with
// the list of desired endpoints and returns lists of slices to create, update,
// and delete. The logic is split up into several main steps:
// 1. Iterate through existing slices, delete endpoints that are no longer
//    desired and update matching endpoints that have changed.
// 2. Iterate through slices that have been modified in 1 and fill them up with
//    any remaining desired endpoints.
// 3. If there are still desired endpoints left, try to fit them into a
//    previously unchanged slice and/or create new ones.
func (r *reconciler) reconcileByPortMapping(
	service *corev1.Service,
	existingSlices []*discovery.EndpointSlice,
	desiredSet endpointSet,
	endpointMeta *endpointMeta,
) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, []*discovery.EndpointSlice, int, int) {
	slicesByName := map[string]*discovery.EndpointSlice{}
	sliceNamesUnchanged := sets.String{}
	sliceNamesToUpdate := sets.String{}
	sliceNamesToDelete := sets.String{}
	numRemoved := 0

	// 1. Iterate through existing slices to delete endpoints no longer desired
	//    and update endpoints that have changed
	for _, existingSlice := range existingSlices {
		slicesByName[existingSlice.Name] = existingSlice
		newEndpoints := []discovery.Endpoint{}
		endpointUpdated := false
		for _, endpoint := range existingSlice.Endpoints {
			got := desiredSet.Get(&endpoint)
			// If endpoint is desired add it to list of endpoints to keep.
			if got != nil {
				newEndpoints = append(newEndpoints, *got)
				// If existing version of endpoint doesn't match desired version
				// set endpointUpdated to ensure endpoint changes are persisted.
				if !endpointsEqualBeyondHash(got, &endpoint) {
					endpointUpdated = true
				}
				// once an endpoint has been placed/found in a slice, it no
				// longer needs to be handled
				desiredSet.Delete(&endpoint)
			}
		}

		// If an endpoint was updated or removed, mark for update or delete
		if endpointUpdated || len(existingSlice.Endpoints) != len(newEndpoints) {
			if len(existingSlice.Endpoints) > len(newEndpoints) {
				numRemoved += len(existingSlice.Endpoints) - len(newEndpoints)
			}
			if len(newEndpoints) == 0 {
				// if no endpoints desired in this slice, mark for deletion
				sliceNamesToDelete.Insert(existingSlice.Name)
			} else {
				// otherwise, copy and mark for update
				epSlice := existingSlice.DeepCopy()
				epSlice.Endpoints = newEndpoints
				slicesByName[existingSlice.Name] = epSlice
				sliceNamesToUpdate.Insert(epSlice.Name)
			}
		} else {
			// slices with no changes will be useful if there are leftover endpoints
			sliceNamesUnchanged.Insert(existingSlice.Name)
		}
	}

	// Any endpoints still in desiredSet were not found in existing slices and
	// will be added by the steps below.
	numAdded := desiredSet.Len()

	// 2. If we still have desired endpoints to add and slices marked for update,
	//    iterate through the slices and fill them up with the desired endpoints.
	if desiredSet.Len() > 0 && sliceNamesToUpdate.Len() > 0 {
		slices := []*discovery.EndpointSlice{}
		for _, sliceName := range sliceNamesToUpdate.UnsortedList() {
			slices = append(slices, slicesByName[sliceName])
		}
		// Sort endpoint slices by length so we're filling up the fullest ones
		// first.
		sort.Sort(endpointSliceEndpointLen(slices))

		// Iterate through slices and fill them up with desired endpoints.
		for _, slice := range slices {
			for desiredSet.Len() > 0 && len(slice.Endpoints) < int(r.maxEndpointsPerSlice) {
				endpoint, _ := desiredSet.PopAny()
				slice.Endpoints = append(slice.Endpoints, *endpoint)
			}
		}
	}

	// 3. If there are still desired endpoints left at this point, we try to fit
	//    the endpoints in a single existing slice. If there are no slices with
	//    that capacity, we create new slices for the endpoints.
	slicesToCreate := []*discovery.EndpointSlice{}

	for desiredSet.Len() > 0 {
		var sliceToFill *discovery.EndpointSlice

		// If the number of remaining endpoints is smaller than the max
		// endpoints per slice and we have slices that haven't already been
		// filled, try to fit them in one.
		if desiredSet.Len() < int(r.maxEndpointsPerSlice) && sliceNamesUnchanged.Len() > 0 {
			unchangedSlices := []*discovery.EndpointSlice{}
			for _, sliceName := range sliceNamesUnchanged.UnsortedList() {
				unchangedSlices = append(unchangedSlices, slicesByName[sliceName])
			}
			sliceToFill = getSliceToFill(unchangedSlices, desiredSet.Len(), int(r.maxEndpointsPerSlice))
		}

		// If we didn't find a sliceToFill, generate a new empty one.
		if sliceToFill == nil {
			sliceToFill = newEndpointSlice(service, endpointMeta)
		} else {
			// deep copy required to modify this slice.
			sliceToFill = sliceToFill.DeepCopy()
			slicesByName[sliceToFill.Name] = sliceToFill
		}

		// Fill the slice up with remaining endpoints.
		for desiredSet.Len() > 0 && len(sliceToFill.Endpoints) < int(r.maxEndpointsPerSlice) {
			endpoint, _ := desiredSet.PopAny()
			sliceToFill.Endpoints = append(sliceToFill.Endpoints, *endpoint)
		}

		// New slices will not have a Name set, use this to determine whether
		// this should be an update or create.
		if sliceToFill.Name != "" {
			sliceNamesToUpdate.Insert(sliceToFill.Name)
			sliceNamesUnchanged.Delete(sliceToFill.Name)
		} else {
			slicesToCreate = append(slicesToCreate, sliceToFill)
		}
	}

	// Build slicesToUpdate from slice names.
	slicesToUpdate := []*discovery.EndpointSlice{}
	for _, sliceName := range sliceNamesToUpdate.UnsortedList() {
		slicesToUpdate = append(slicesToUpdate, slicesByName[sliceName])
	}

	// Build slicesToDelete from slice names.
	slicesToDelete := []*discovery.EndpointSlice{}
	for _, sliceName := range sliceNamesToDelete.UnsortedList() {
		slicesToDelete = append(slicesToDelete, slicesByName[sliceName])
	}

	return slicesToCreate, slicesToUpdate, slicesToDelete, numAdded, numRemoved
}
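
// deleteService removes the service from the metrics cache.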
func (r *reconciler) deleteService(namespace, name string) {
	r.metricsCache.DeleteService(types.NamespacedName{Namespace: namespace, Name: name})
}