endpointslicecache.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package proxy
  14. import (
  15. "fmt"
  16. "reflect"
  17. "sort"
  18. "strings"
  19. "sync"
  20. "k8s.io/api/core/v1"
  21. discovery "k8s.io/api/discovery/v1beta1"
  22. "k8s.io/apimachinery/pkg/types"
  23. "k8s.io/client-go/tools/record"
  24. "k8s.io/klog"
  25. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  26. utilnet "k8s.io/utils/net"
  27. )
// EndpointSliceCache is used as a cache of EndpointSlice information.
// Updates are first recorded as pending (updatePending) and are later moved
// into the applied state in bulk (checkoutChanges).
type EndpointSliceCache struct {
	// lock protects trackerByServiceMap.
	lock sync.Mutex

	// trackerByServiceMap is the basis of this cache. It maps a namespaced
	// service name to an endpointSliceTracker, whose applied and pending maps
	// are in turn keyed by EndpointSlice name. Since endpoints can move between
	// slices, we require slice specific caching to prevent endpoints being
	// removed from the cache when they may have just moved to a different slice.
	trackerByServiceMap map[types.NamespacedName]*endpointSliceTracker

	// makeEndpointInfo converts each BaseEndpointInfo built by addEndpointsByIP
	// into the Endpoint that is stored. NewEndpointSliceCache defaults it to
	// standardEndpointInfo when nil is passed.
	makeEndpointInfo makeEndpointFunc

	// hostname is compared with an endpoint's v1.LabelHostname topology value
	// to decide whether the endpoint is local (see isLocal). When empty, no
	// endpoint is considered local.
	hostname string

	// isIPv6Mode, when non-nil, causes endpoints whose first address is not of
	// the selected IP family to be skipped by addEndpointsByIP.
	isIPv6Mode *bool

	// recorder is passed to utilproxy.LogAndEmitIncorrectIPVersionEvent when an
	// endpoint is skipped because of an IP family mismatch.
	recorder record.EventRecorder
}
// endpointSliceTracker keeps track of EndpointSlices as they have been applied
// by a proxier along with any pending EndpointSlices that have been updated
// in this cache but not yet applied by a proxier.
type endpointSliceTracker struct {
	// applied holds the slice state as of the last checkoutChanges call.
	applied endpointSliceInfoByName
	// pending holds updates recorded by updatePending that checkoutChanges has
	// not yet moved into applied; entries with Remove set mark deletions.
	pending endpointSliceInfoByName
}
// endpointSliceInfoByName groups endpointSliceInfo by the names of the
// corresponding EndpointSlices (the sliceKey from endpointSliceCacheKeys).
type endpointSliceInfoByName map[string]*endpointSliceInfo
// endpointSliceInfo contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory usage.
type endpointSliceInfo struct {
	// Ports is a private copy of the slice's ports, sorted by port number.
	Ports []discovery.EndpointPort

	// Endpoints holds only endpoints whose Ready condition is nil or true,
	// sorted by their joined address list. Empty (non-nil) for removals.
	Endpoints []*endpointInfo

	// Remove marks this entry as a pending deletion of the slice.
	Remove bool
}
// endpointInfo contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory usage.
// Addresses and Topology are taken from the EndpointSlice's Endpoints by
// reference (the slice and map are not deep-copied), so they should be
// treated as read-only.
type endpointInfo struct {
	// Addresses of the endpoint; only Addresses[0] is used when building
	// Endpoints.
	Addresses []string

	// Topology labels of the endpoint; v1.LabelHostname is consulted to decide
	// locality.
	Topology map[string]string
}
// spToEndpointMap groups Endpoint objects by ServicePortName and then by
// endpoint IP (the inner key is the Endpoint's IP, as set by addEndpointsByIP).
type spToEndpointMap map[ServicePortName]map[string]Endpoint
  71. // NewEndpointSliceCache initializes an EndpointSliceCache.
  72. func NewEndpointSliceCache(hostname string, isIPv6Mode *bool, recorder record.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
  73. if makeEndpointInfo == nil {
  74. makeEndpointInfo = standardEndpointInfo
  75. }
  76. return &EndpointSliceCache{
  77. trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{},
  78. hostname: hostname,
  79. isIPv6Mode: isIPv6Mode,
  80. makeEndpointInfo: makeEndpointInfo,
  81. recorder: recorder,
  82. }
  83. }
  84. // newEndpointSliceTracker initializes an endpointSliceTracker.
  85. func newEndpointSliceTracker() *endpointSliceTracker {
  86. return &endpointSliceTracker{
  87. applied: endpointSliceInfoByName{},
  88. pending: endpointSliceInfoByName{},
  89. }
  90. }
  91. // newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice.
  92. func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo {
  93. esInfo := &endpointSliceInfo{
  94. Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)),
  95. Endpoints: []*endpointInfo{},
  96. Remove: remove,
  97. }
  98. // copy here to avoid mutating shared EndpointSlice object.
  99. copy(esInfo.Ports, endpointSlice.Ports)
  100. sort.Sort(byPort(esInfo.Ports))
  101. if !remove {
  102. for _, endpoint := range endpointSlice.Endpoints {
  103. if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready {
  104. esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{
  105. Addresses: endpoint.Addresses,
  106. Topology: endpoint.Topology,
  107. })
  108. }
  109. }
  110. sort.Sort(byAddress(esInfo.Endpoints))
  111. }
  112. return esInfo
  113. }
  114. // standardEndpointInfo is the default makeEndpointFunc.
  115. func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
  116. return ep
  117. }
  118. // updatePending updates a pending slice in the cache.
  119. func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool {
  120. serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
  121. if err != nil {
  122. klog.Warningf("Error getting endpoint slice cache keys: %v", err)
  123. return false
  124. }
  125. esInfo := newEndpointSliceInfo(endpointSlice, remove)
  126. cache.lock.Lock()
  127. defer cache.lock.Unlock()
  128. if _, ok := cache.trackerByServiceMap[serviceKey]; !ok {
  129. cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker()
  130. }
  131. changed := cache.esInfoChanged(serviceKey, sliceKey, esInfo)
  132. if changed {
  133. cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esInfo
  134. }
  135. return changed
  136. }
  137. // checkoutChanges returns a list of all endpointsChanges that are
  138. // pending and then marks them as applied.
  139. func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange {
  140. changes := []*endpointsChange{}
  141. cache.lock.Lock()
  142. defer cache.lock.Unlock()
  143. for serviceNN, esTracker := range cache.trackerByServiceMap {
  144. if len(esTracker.pending) == 0 {
  145. continue
  146. }
  147. change := &endpointsChange{}
  148. change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied)
  149. for name, sliceInfo := range esTracker.pending {
  150. if sliceInfo.Remove {
  151. delete(esTracker.applied, name)
  152. } else {
  153. esTracker.applied[name] = sliceInfo
  154. }
  155. delete(esTracker.pending, name)
  156. }
  157. change.current = cache.getEndpointsMap(serviceNN, esTracker.applied)
  158. changes = append(changes, change)
  159. }
  160. return changes
  161. }
  162. // getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices.
  163. func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) EndpointsMap {
  164. endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceInfoByName)
  165. return endpointsMapFromEndpointInfo(endpointInfoBySP)
  166. }
  167. // endpointInfoByServicePort groups endpoint info by service port name and address.
  168. func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) spToEndpointMap {
  169. endpointInfoBySP := spToEndpointMap{}
  170. for _, sliceInfo := range sliceInfoByName {
  171. for _, port := range sliceInfo.Ports {
  172. if port.Name == nil {
  173. klog.Warningf("ignoring port with nil name %v", port)
  174. continue
  175. }
  176. // TODO: handle nil ports to mean "all"
  177. if port.Port == nil || *port.Port == int32(0) {
  178. klog.Warningf("ignoring invalid endpoint port %s", *port.Name)
  179. continue
  180. }
  181. svcPortName := ServicePortName{
  182. NamespacedName: serviceNN,
  183. Port: *port.Name,
  184. Protocol: *port.Protocol,
  185. }
  186. endpointInfoBySP[svcPortName] = cache.addEndpointsByIP(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
  187. }
  188. }
  189. return endpointInfoBySP
  190. }
  191. // addEndpointsByIP adds endpointInfo for each IP.
  192. func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName, portNum int, endpointsByIP map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
  193. if endpointsByIP == nil {
  194. endpointsByIP = map[string]Endpoint{}
  195. }
  196. // iterate through endpoints to add them to endpointsByIP.
  197. for _, endpoint := range endpoints {
  198. if len(endpoint.Addresses) == 0 {
  199. klog.Warningf("ignoring invalid endpoint port %s with empty addresses", endpoint)
  200. continue
  201. }
  202. // Filter out the incorrect IP version case. Any endpoint port that
  203. // contains incorrect IP version will be ignored.
  204. if cache.isIPv6Mode != nil && utilnet.IsIPv6String(endpoint.Addresses[0]) != *cache.isIPv6Mode {
  205. // Emit event on the corresponding service which had a different IP
  206. // version than the endpoint.
  207. utilproxy.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], serviceNN.Namespace, serviceNN.Name, "")
  208. continue
  209. }
  210. isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname])
  211. endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology)
  212. // This logic ensures we're deduping potential overlapping endpoints
  213. // isLocal should not vary between matching IPs, but if it does, we
  214. // favor a true value here if it exists.
  215. if _, exists := endpointsByIP[endpointInfo.IP()]; !exists || isLocal {
  216. endpointsByIP[endpointInfo.IP()] = cache.makeEndpointInfo(endpointInfo)
  217. }
  218. }
  219. return endpointsByIP
  220. }
  221. func (cache *EndpointSliceCache) isLocal(hostname string) bool {
  222. return len(cache.hostname) > 0 && hostname == cache.hostname
  223. }
  224. // esInfoChanged returns true if the esInfo parameter should be set as a new
  225. // pending value in the cache.
  226. func (cache *EndpointSliceCache) esInfoChanged(serviceKey types.NamespacedName, sliceKey string, esInfo *endpointSliceInfo) bool {
  227. if _, ok := cache.trackerByServiceMap[serviceKey]; ok {
  228. appliedInfo, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey]
  229. pendingInfo, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey]
  230. // If there's already a pending value, return whether or not this would
  231. // change that.
  232. if pendingOk {
  233. return !reflect.DeepEqual(esInfo, pendingInfo)
  234. }
  235. // If there's already an applied value, return whether or not this would
  236. // change that.
  237. if appliedOk {
  238. return !reflect.DeepEqual(esInfo, appliedInfo)
  239. }
  240. }
  241. // If this is marked for removal and does not exist in the cache, no changes
  242. // are necessary.
  243. if esInfo.Remove {
  244. return false
  245. }
  246. // If not in the cache, and not marked for removal, it should be added.
  247. return true
  248. }
  249. // endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that
  250. // has been grouped by service port and IP.
  251. func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap {
  252. endpointsMap := EndpointsMap{}
  253. // transform endpointInfoByServicePort into an endpointsMap with sorted IPs.
  254. for svcPortName, endpointInfoByIP := range endpointInfoBySP {
  255. if len(endpointInfoByIP) > 0 {
  256. endpointsMap[svcPortName] = []Endpoint{}
  257. for _, endpointInfo := range endpointInfoByIP {
  258. endpointsMap[svcPortName] = append(endpointsMap[svcPortName], endpointInfo)
  259. }
  260. // Ensure IPs are always returned in the same order to simplify diffing.
  261. sort.Sort(byIP(endpointsMap[svcPortName]))
  262. klog.V(3).Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpointsMap[svcPortName]))
  263. }
  264. }
  265. return endpointsMap
  266. }
  267. // formatEndpointsList returns a string list converted from an endpoints list.
  268. func formatEndpointsList(endpoints []Endpoint) []string {
  269. var formattedList []string
  270. for _, ep := range endpoints {
  271. formattedList = append(formattedList, ep.String())
  272. }
  273. return formattedList
  274. }
  275. // endpointSliceCacheKeys returns cache keys used for a given EndpointSlice.
  276. func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string, error) {
  277. var err error
  278. serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName]
  279. if !ok || serviceName == "" {
  280. err = fmt.Errorf("No %s label set on endpoint slice: %s", discovery.LabelServiceName, endpointSlice.Name)
  281. } else if endpointSlice.Namespace == "" || endpointSlice.Name == "" {
  282. err = fmt.Errorf("Expected EndpointSlice name and namespace to be set: %v", endpointSlice)
  283. }
  284. return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
  285. }
  286. // byAddress helps sort endpointInfo
  287. type byAddress []*endpointInfo
  288. func (e byAddress) Len() int {
  289. return len(e)
  290. }
  291. func (e byAddress) Swap(i, j int) {
  292. e[i], e[j] = e[j], e[i]
  293. }
  294. func (e byAddress) Less(i, j int) bool {
  295. return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",")
  296. }
  297. // byIP helps sort endpoints by IP
  298. type byIP []Endpoint
  299. func (e byIP) Len() int {
  300. return len(e)
  301. }
  302. func (e byIP) Swap(i, j int) {
  303. e[i], e[j] = e[j], e[i]
  304. }
  305. func (e byIP) Less(i, j int) bool {
  306. return e[i].String() < e[j].String()
  307. }
  308. // byPort helps sort EndpointSlice ports by port number
  309. type byPort []discovery.EndpointPort
  310. func (p byPort) Len() int {
  311. return len(p)
  312. }
  313. func (p byPort) Swap(i, j int) {
  314. p[i], p[j] = p[j], p[i]
  315. }
  316. func (p byPort) Less(i, j int) bool {
  317. return *p[i].Port < *p[j].Port
  318. }