endpoints.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package proxy
  14. import (
  15. "net"
  16. "reflect"
  17. "strconv"
  18. "sync"
  19. "time"
  20. "k8s.io/klog"
  21. "k8s.io/api/core/v1"
  22. "k8s.io/apimachinery/pkg/types"
  23. "k8s.io/apimachinery/pkg/util/sets"
  24. "k8s.io/client-go/tools/record"
  25. "k8s.io/kubernetes/pkg/proxy/metrics"
  26. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  27. utilnet "k8s.io/utils/net"
  28. )
  // BaseEndpointInfo holds the basic data describing a single endpoint.
  // A proxier may consume it directly while processing endpoints, or use it
  // as the starting point for a richer, proxier-specific EndpointInfo type.
  type BaseEndpointInfo struct {
      // Endpoint is the endpoint address string; newBaseEndpointInfo builds
      // it by joining the host and port with net.JoinHostPort.
      Endpoint string // TODO: should be an endpointString type
      // IsLocal reports whether the endpoint runs on the same host as kube-proxy.
      IsLocal bool
  }
  38. var _ Endpoint = &BaseEndpointInfo{}
  39. // String is part of proxy.Endpoint interface.
  40. func (info *BaseEndpointInfo) String() string {
  41. return info.Endpoint
  42. }
  43. // GetIsLocal is part of proxy.Endpoint interface.
  44. func (info *BaseEndpointInfo) GetIsLocal() bool {
  45. return info.IsLocal
  46. }
  47. // IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
  48. func (info *BaseEndpointInfo) IP() string {
  49. return utilproxy.IPPart(info.Endpoint)
  50. }
  51. // Port returns just the Port part of the endpoint.
  52. func (info *BaseEndpointInfo) Port() (int, error) {
  53. return utilproxy.PortPart(info.Endpoint)
  54. }
  55. // Equal is part of proxy.Endpoint interface.
  56. func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
  57. return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
  58. }
  59. func newBaseEndpointInfo(IP string, port int, isLocal bool) *BaseEndpointInfo {
  60. return &BaseEndpointInfo{
  61. Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)),
  62. IsLocal: isLocal,
  63. }
  64. }
  65. type makeEndpointFunc func(info *BaseEndpointInfo) Endpoint
  66. // EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of
  67. // Endpoints, keyed by their namespace and name.
  68. type EndpointChangeTracker struct {
  69. // lock protects items.
  70. lock sync.Mutex
  71. // hostname is the host where kube-proxy is running.
  72. hostname string
  73. // items maps a service to is endpointsChange.
  74. items map[types.NamespacedName]*endpointsChange
  75. // makeEndpointInfo allows proxier to inject customized information when processing endpoint.
  76. makeEndpointInfo makeEndpointFunc
  77. // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
  78. isIPv6Mode *bool
  79. recorder record.EventRecorder
  80. // Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
  81. // object to change. Used to calculate the network-programming-latency.
  82. lastChangeTriggerTimes map[types.NamespacedName][]time.Time
  83. }
  84. // NewEndpointChangeTracker initializes an EndpointsChangeMap
  85. func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder) *EndpointChangeTracker {
  86. return &EndpointChangeTracker{
  87. hostname: hostname,
  88. items: make(map[types.NamespacedName]*endpointsChange),
  89. makeEndpointInfo: makeEndpointInfo,
  90. isIPv6Mode: isIPv6Mode,
  91. recorder: recorder,
  92. lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
  93. }
  94. }
  95. // Update updates given service's endpoints change map based on the <previous, current> endpoints pair. It returns true
  96. // if items changed, otherwise return false. Update can be used to add/update/delete items of EndpointsChangeMap. For example,
  97. // Add item
  98. // - pass <nil, endpoints> as the <previous, current> pair.
  99. // Update item
  100. // - pass <oldEndpoints, endpoints> as the <previous, current> pair.
  101. // Delete item
  102. // - pass <endpoints, nil> as the <previous, current> pair.
  103. func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
  104. endpoints := current
  105. if endpoints == nil {
  106. endpoints = previous
  107. }
  108. // previous == nil && current == nil is unexpected, we should return false directly.
  109. if endpoints == nil {
  110. return false
  111. }
  112. metrics.EndpointChangesTotal.Inc()
  113. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  114. ect.lock.Lock()
  115. defer ect.lock.Unlock()
  116. change, exists := ect.items[namespacedName]
  117. if !exists {
  118. change = &endpointsChange{}
  119. change.previous = ect.endpointsToEndpointsMap(previous)
  120. ect.items[namespacedName] = change
  121. }
  122. if t := getLastChangeTriggerTime(endpoints); !t.IsZero() {
  123. ect.lastChangeTriggerTimes[namespacedName] =
  124. append(ect.lastChangeTriggerTimes[namespacedName], t)
  125. }
  126. change.current = ect.endpointsToEndpointsMap(current)
  127. // if change.previous equal to change.current, it means no change
  128. if reflect.DeepEqual(change.previous, change.current) {
  129. delete(ect.items, namespacedName)
  130. // Reset the lastChangeTriggerTimes for the Endpoints object. Given that the network programming
  131. // SLI is defined as the duration between a time of an event and a time when the network was
  132. // programmed to incorporate that event, if there are events that happened between two
  133. // consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
  134. // there will be no network programming for them and thus no network programming latency metric
  135. // should be exported.
  136. delete(ect.lastChangeTriggerTimes, namespacedName)
  137. }
  138. metrics.EndpointChangesPending.Set(float64(len(ect.items)))
  139. return len(ect.items) > 0
  140. }
  141. // getLastChangeTriggerTime returns the time.Time value of the EndpointsLastChangeTriggerTime
  142. // annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
  143. // or was set incorrectly.
  144. func getLastChangeTriggerTime(endpoints *v1.Endpoints) time.Time {
  145. if _, ok := endpoints.Annotations[v1.EndpointsLastChangeTriggerTime]; !ok {
  146. // It's possible that the Endpoints object won't have the EndpointsLastChangeTriggerTime
  147. // annotation set. In that case return the 'zero value', which is ignored in the upstream code.
  148. return time.Time{}
  149. }
  150. val, err := time.Parse(time.RFC3339Nano, endpoints.Annotations[v1.EndpointsLastChangeTriggerTime])
  151. if err != nil {
  152. klog.Warningf("Error while parsing EndpointsLastChangeTriggerTimeAnnotation: '%s'. Error is %v",
  153. endpoints.Annotations[v1.EndpointsLastChangeTriggerTime], err)
  154. // In case of error val = time.Zero, which is ignored in the upstream code.
  155. }
  156. return val
  157. }
  158. // endpointsChange contains all changes to endpoints that happened since proxy rules were synced. For a single object,
  159. // changes are accumulated, i.e. previous is state from before applying the changes,
  160. // current is state after applying the changes.
  161. type endpointsChange struct {
  162. previous EndpointsMap
  163. current EndpointsMap
  164. }
  165. // UpdateEndpointMapResult is the updated results after applying endpoints changes.
  166. type UpdateEndpointMapResult struct {
  167. // HCEndpointsLocalIPSize maps an endpoints name to the length of its local IPs.
  168. HCEndpointsLocalIPSize map[types.NamespacedName]int
  169. // StaleEndpoints identifies if an endpoints service pair is stale.
  170. StaleEndpoints []ServiceEndpoint
  171. // StaleServiceNames identifies if a service is stale.
  172. StaleServiceNames []ServicePortName
  173. // List of the trigger times for all endpoints objects that changed. It's used to export the
  174. // network programming latency.
  175. LastChangeTriggerTimes []time.Time
  176. }
  177. // UpdateEndpointsMap updates endpointsMap base on the given changes.
  178. func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
  179. result.StaleEndpoints = make([]ServiceEndpoint, 0)
  180. result.StaleServiceNames = make([]ServicePortName, 0)
  181. result.LastChangeTriggerTimes = make([]time.Time, 0)
  182. em.apply(
  183. changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
  184. // TODO: If this will appear to be computationally expensive, consider
  185. // computing this incrementally similarly to endpointsMap.
  186. result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int)
  187. localIPs := em.getLocalEndpointIPs()
  188. for nsn, ips := range localIPs {
  189. result.HCEndpointsLocalIPSize[nsn] = len(ips)
  190. }
  191. return result
  192. }
  193. // EndpointsMap maps a service name to a list of all its Endpoints.
  194. type EndpointsMap map[ServicePortName][]Endpoint
  195. // endpointsToEndpointsMap translates single Endpoints object to EndpointsMap.
  196. // This function is used for incremental updated of endpointsMap.
  197. //
  198. // NOTE: endpoints object should NOT be modified.
  199. func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoints) EndpointsMap {
  200. if endpoints == nil {
  201. return nil
  202. }
  203. endpointsMap := make(EndpointsMap)
  204. // We need to build a map of portname -> all ip:ports for that
  205. // portname. Explode Endpoints.Subsets[*] into this structure.
  206. for i := range endpoints.Subsets {
  207. ss := &endpoints.Subsets[i]
  208. for i := range ss.Ports {
  209. port := &ss.Ports[i]
  210. if port.Port == 0 {
  211. klog.Warningf("ignoring invalid endpoint port %s", port.Name)
  212. continue
  213. }
  214. svcPortName := ServicePortName{
  215. NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
  216. Port: port.Name,
  217. }
  218. for i := range ss.Addresses {
  219. addr := &ss.Addresses[i]
  220. if addr.IP == "" {
  221. klog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name)
  222. continue
  223. }
  224. // Filter out the incorrect IP version case.
  225. // Any endpoint port that contains incorrect IP version will be ignored.
  226. if ect.isIPv6Mode != nil && utilnet.IsIPv6String(addr.IP) != *ect.isIPv6Mode {
  227. // Emit event on the corresponding service which had a different
  228. // IP version than the endpoint.
  229. utilproxy.LogAndEmitIncorrectIPVersionEvent(ect.recorder, "endpoints", addr.IP, endpoints.Name, endpoints.Namespace, "")
  230. continue
  231. }
  232. isLocal := addr.NodeName != nil && *addr.NodeName == ect.hostname
  233. baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal)
  234. if ect.makeEndpointInfo != nil {
  235. endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo))
  236. } else {
  237. endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo)
  238. }
  239. }
  240. if klog.V(3) {
  241. newEPList := []string{}
  242. for _, ep := range endpointsMap[svcPortName] {
  243. newEPList = append(newEPList, ep.String())
  244. }
  245. klog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList)
  246. }
  247. }
  248. }
  249. return endpointsMap
  250. }
  251. // apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument
  252. // is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service.
  253. // The changes map is cleared after applying them.
  254. // In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
  255. // that were changed and will result in syncing the proxy rules.
  256. func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
  257. staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *[]time.Time) {
  258. if changes == nil {
  259. return
  260. }
  261. changes.lock.Lock()
  262. defer changes.lock.Unlock()
  263. for _, change := range changes.items {
  264. em.unmerge(change.previous)
  265. em.merge(change.current)
  266. detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
  267. }
  268. changes.items = make(map[types.NamespacedName]*endpointsChange)
  269. metrics.EndpointChangesPending.Set(0)
  270. for _, lastChangeTriggerTime := range changes.lastChangeTriggerTimes {
  271. *lastChangeTriggerTimes = append(*lastChangeTriggerTimes, lastChangeTriggerTime...)
  272. }
  273. changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
  274. }
  275. // Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
  276. func (em EndpointsMap) merge(other EndpointsMap) {
  277. for svcPortName := range other {
  278. em[svcPortName] = other[svcPortName]
  279. }
  280. }
  281. // Unmerge removes the <service, endpoints> pairs from the current EndpointsMap which are contained in the EndpointsMap passed in.
  282. func (em EndpointsMap) unmerge(other EndpointsMap) {
  283. for svcPortName := range other {
  284. delete(em, svcPortName)
  285. }
  286. }
  287. // GetLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
  288. func (em EndpointsMap) getLocalEndpointIPs() map[types.NamespacedName]sets.String {
  289. localIPs := make(map[types.NamespacedName]sets.String)
  290. for svcPortName, epList := range em {
  291. for _, ep := range epList {
  292. if ep.GetIsLocal() {
  293. nsn := svcPortName.NamespacedName
  294. if localIPs[nsn] == nil {
  295. localIPs[nsn] = sets.NewString()
  296. }
  297. localIPs[nsn].Insert(ep.IP())
  298. }
  299. }
  300. }
  301. return localIPs
  302. }
  303. // detectStaleConnections modifies <staleEndpoints> and <staleServices> with detected stale connections. <staleServiceNames>
  304. // is used to store stale udp service in order to clear udp conntrack later.
  305. func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
  306. for svcPortName, epList := range oldEndpointsMap {
  307. for _, ep := range epList {
  308. stale := true
  309. for i := range newEndpointsMap[svcPortName] {
  310. if newEndpointsMap[svcPortName][i].Equal(ep) {
  311. stale = false
  312. break
  313. }
  314. }
  315. if stale {
  316. klog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.String())
  317. *staleEndpoints = append(*staleEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
  318. }
  319. }
  320. }
  321. for svcPortName, epList := range newEndpointsMap {
  322. // For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
  323. if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
  324. *staleServiceNames = append(*staleServiceNames, svcPortName)
  325. }
  326. }
  327. }