roundrobin.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package userspace
  14. import (
  15. "errors"
  16. "fmt"
  17. "net"
  18. "reflect"
  19. "strconv"
  20. "sync"
  21. "time"
  22. "k8s.io/api/core/v1"
  23. "k8s.io/apimachinery/pkg/types"
  24. "k8s.io/klog"
  25. "k8s.io/kubernetes/pkg/proxy"
  26. "k8s.io/kubernetes/pkg/util/slice"
  27. )
  28. var (
  29. ErrMissingServiceEntry = errors.New("missing service entry")
  30. ErrMissingEndpoints = errors.New("missing endpoints")
  31. )
  32. type affinityState struct {
  33. clientIP string
  34. //clientProtocol api.Protocol //not yet used
  35. //sessionCookie string //not yet used
  36. endpoint string
  37. lastUsed time.Time
  38. }
  39. type affinityPolicy struct {
  40. affinityType v1.ServiceAffinity
  41. affinityMap map[string]*affinityState // map client IP -> affinity info
  42. ttlSeconds int
  43. }
  44. // LoadBalancerRR is a round-robin load balancer.
  45. type LoadBalancerRR struct {
  46. lock sync.RWMutex
  47. services map[proxy.ServicePortName]*balancerState
  48. }
  49. // Ensure this implements LoadBalancer.
  50. var _ LoadBalancer = &LoadBalancerRR{}
  51. type balancerState struct {
  52. endpoints []string // a list of "ip:port" style strings
  53. index int // current index into endpoints
  54. affinity affinityPolicy
  55. }
  56. func newAffinityPolicy(affinityType v1.ServiceAffinity, ttlSeconds int) *affinityPolicy {
  57. return &affinityPolicy{
  58. affinityType: affinityType,
  59. affinityMap: make(map[string]*affinityState),
  60. ttlSeconds: ttlSeconds,
  61. }
  62. }
  63. // NewLoadBalancerRR returns a new LoadBalancerRR.
  64. func NewLoadBalancerRR() *LoadBalancerRR {
  65. return &LoadBalancerRR{
  66. services: map[proxy.ServicePortName]*balancerState{},
  67. }
  68. }
  69. func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType v1.ServiceAffinity, ttlSeconds int) error {
  70. klog.V(4).Infof("LoadBalancerRR NewService %q", svcPort)
  71. lb.lock.Lock()
  72. defer lb.lock.Unlock()
  73. lb.newServiceInternal(svcPort, affinityType, ttlSeconds)
  74. return nil
  75. }
  76. // This assumes that lb.lock is already held.
  77. func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affinityType v1.ServiceAffinity, ttlSeconds int) *balancerState {
  78. if ttlSeconds == 0 {
  79. ttlSeconds = int(v1.DefaultClientIPServiceAffinitySeconds) //default to 3 hours if not specified. Should 0 be unlimited instead????
  80. }
  81. if _, exists := lb.services[svcPort]; !exists {
  82. lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlSeconds)}
  83. klog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort)
  84. } else if affinityType != "" {
  85. lb.services[svcPort].affinity.affinityType = affinityType
  86. }
  87. return lb.services[svcPort]
  88. }
  89. func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
  90. klog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort)
  91. lb.lock.Lock()
  92. defer lb.lock.Unlock()
  93. delete(lb.services, svcPort)
  94. }
  95. // return true if this service is using some form of session affinity.
  96. func isSessionAffinity(affinity *affinityPolicy) bool {
  97. // Should never be empty string, but checking for it to be safe.
  98. if affinity.affinityType == "" || affinity.affinityType == v1.ServiceAffinityNone {
  99. return false
  100. }
  101. return true
  102. }
  103. // ServiceHasEndpoints checks whether a service entry has endpoints.
  104. func (lb *LoadBalancerRR) ServiceHasEndpoints(svcPort proxy.ServicePortName) bool {
  105. lb.lock.RLock()
  106. defer lb.lock.RUnlock()
  107. state, exists := lb.services[svcPort]
  108. // TODO: while nothing ever assigns nil to the map, *some* of the code using the map
  109. // checks for it. The code should all follow the same convention.
  110. return exists && state != nil && len(state.endpoints) > 0
  111. }
  112. // NextEndpoint returns a service endpoint.
  113. // The service endpoint is chosen using the round-robin algorithm.
  114. func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) {
  115. // Coarse locking is simple. We can get more fine-grained if/when we
  116. // can prove it matters.
  117. lb.lock.Lock()
  118. defer lb.lock.Unlock()
  119. state, exists := lb.services[svcPort]
  120. if !exists || state == nil {
  121. return "", ErrMissingServiceEntry
  122. }
  123. if len(state.endpoints) == 0 {
  124. return "", ErrMissingEndpoints
  125. }
  126. klog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", svcPort, srcAddr, state.endpoints)
  127. sessionAffinityEnabled := isSessionAffinity(&state.affinity)
  128. var ipaddr string
  129. if sessionAffinityEnabled {
  130. // Caution: don't shadow ipaddr
  131. var err error
  132. ipaddr, _, err = net.SplitHostPort(srcAddr.String())
  133. if err != nil {
  134. return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
  135. }
  136. if !sessionAffinityReset {
  137. sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
  138. if exists && int(time.Since(sessionAffinity.lastUsed).Seconds()) < state.affinity.ttlSeconds {
  139. // Affinity wins.
  140. endpoint := sessionAffinity.endpoint
  141. sessionAffinity.lastUsed = time.Now()
  142. klog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %#v: %s", svcPort, ipaddr, sessionAffinity, endpoint)
  143. return endpoint, nil
  144. }
  145. }
  146. }
  147. // Take the next endpoint.
  148. endpoint := state.endpoints[state.index]
  149. state.index = (state.index + 1) % len(state.endpoints)
  150. if sessionAffinityEnabled {
  151. var affinity *affinityState
  152. affinity = state.affinity.affinityMap[ipaddr]
  153. if affinity == nil {
  154. affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()}
  155. state.affinity.affinityMap[ipaddr] = affinity
  156. }
  157. affinity.lastUsed = time.Now()
  158. affinity.endpoint = endpoint
  159. affinity.clientIP = ipaddr
  160. klog.V(4).Infof("Updated affinity key %s: %#v", ipaddr, state.affinity.affinityMap[ipaddr])
  161. }
  162. return endpoint, nil
  163. }
  164. type hostPortPair struct {
  165. host string
  166. port int
  167. }
  168. func isValidEndpoint(hpp *hostPortPair) bool {
  169. return hpp.host != "" && hpp.port > 0
  170. }
  171. func flattenValidEndpoints(endpoints []hostPortPair) []string {
  172. // Convert Endpoint objects into strings for easier use later. Ignore
  173. // the protocol field - we'll get that from the Service objects.
  174. var result []string
  175. for i := range endpoints {
  176. hpp := &endpoints[i]
  177. if isValidEndpoint(hpp) {
  178. result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
  179. }
  180. }
  181. return result
  182. }
  183. // Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
  184. func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
  185. for _, affinity := range state.affinity.affinityMap {
  186. if affinity.endpoint == endpoint {
  187. klog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, svcPort)
  188. delete(state.affinity.affinityMap, affinity.clientIP)
  189. }
  190. }
  191. }
  192. // Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
  193. // Then remove any session affinity records that are not in both lists.
  194. // This assumes the lb.lock is held.
  195. func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEndpoints []string) {
  196. allEndpoints := map[string]int{}
  197. for _, newEndpoint := range newEndpoints {
  198. allEndpoints[newEndpoint] = 1
  199. }
  200. state, exists := lb.services[svcPort]
  201. if !exists {
  202. return
  203. }
  204. for _, existingEndpoint := range state.endpoints {
  205. allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1
  206. }
  207. for mKey, mVal := range allEndpoints {
  208. if mVal == 1 {
  209. klog.V(2).Infof("Delete endpoint %s for service %q", mKey, svcPort)
  210. removeSessionAffinityByEndpoint(state, svcPort, mKey)
  211. }
  212. }
  213. }
  214. // buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
  215. // portname. Expode Endpoints.Subsets[*] into this structure.
  216. func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]hostPortPair {
  217. portsToEndpoints := map[string][]hostPortPair{}
  218. for i := range endpoints.Subsets {
  219. ss := &endpoints.Subsets[i]
  220. for i := range ss.Ports {
  221. port := &ss.Ports[i]
  222. for i := range ss.Addresses {
  223. addr := &ss.Addresses[i]
  224. portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
  225. // Ignore the protocol field - we'll get that from the Service objects.
  226. }
  227. }
  228. }
  229. return portsToEndpoints
  230. }
  231. func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
  232. portsToEndpoints := buildPortsToEndpointsMap(endpoints)
  233. lb.lock.Lock()
  234. defer lb.lock.Unlock()
  235. for portname := range portsToEndpoints {
  236. svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
  237. newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
  238. state, exists := lb.services[svcPort]
  239. if !exists || state == nil || len(newEndpoints) > 0 {
  240. klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
  241. lb.updateAffinityMap(svcPort, newEndpoints)
  242. // OnEndpointsAdd can be called without NewService being called externally.
  243. // To be safe we will call it here. A new service will only be created
  244. // if one does not already exist. The affinity will be updated
  245. // later, once NewService is called.
  246. state = lb.newServiceInternal(svcPort, v1.ServiceAffinity(""), 0)
  247. state.endpoints = slice.ShuffleStrings(newEndpoints)
  248. // Reset the round-robin index.
  249. state.index = 0
  250. }
  251. }
  252. }
  253. func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  254. portsToEndpoints := buildPortsToEndpointsMap(endpoints)
  255. oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints)
  256. registeredEndpoints := make(map[proxy.ServicePortName]bool)
  257. lb.lock.Lock()
  258. defer lb.lock.Unlock()
  259. for portname := range portsToEndpoints {
  260. svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
  261. newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
  262. state, exists := lb.services[svcPort]
  263. curEndpoints := []string{}
  264. if state != nil {
  265. curEndpoints = state.endpoints
  266. }
  267. if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
  268. klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
  269. lb.updateAffinityMap(svcPort, newEndpoints)
  270. // OnEndpointsUpdate can be called without NewService being called externally.
  271. // To be safe we will call it here. A new service will only be created
  272. // if one does not already exist. The affinity will be updated
  273. // later, once NewService is called.
  274. state = lb.newServiceInternal(svcPort, v1.ServiceAffinity(""), 0)
  275. state.endpoints = slice.ShuffleStrings(newEndpoints)
  276. // Reset the round-robin index.
  277. state.index = 0
  278. }
  279. registeredEndpoints[svcPort] = true
  280. }
  281. // Now remove all endpoints missing from the update.
  282. for portname := range oldPortsToEndpoints {
  283. svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: oldEndpoints.Namespace, Name: oldEndpoints.Name}, Port: portname}
  284. if _, exists := registeredEndpoints[svcPort]; !exists {
  285. lb.resetService(svcPort)
  286. }
  287. }
  288. }
  289. func (lb *LoadBalancerRR) resetService(svcPort proxy.ServicePortName) {
  290. // If the service is still around, reset but don't delete.
  291. if state, ok := lb.services[svcPort]; ok {
  292. if len(state.endpoints) > 0 {
  293. klog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort)
  294. state.endpoints = []string{}
  295. }
  296. state.index = 0
  297. state.affinity.affinityMap = map[string]*affinityState{}
  298. }
  299. }
  300. func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
  301. portsToEndpoints := buildPortsToEndpointsMap(endpoints)
  302. lb.lock.Lock()
  303. defer lb.lock.Unlock()
  304. for portname := range portsToEndpoints {
  305. svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
  306. lb.resetService(svcPort)
  307. }
  308. }
  309. func (lb *LoadBalancerRR) OnEndpointsSynced() {
  310. }
  311. // Tests whether two slices are equivalent. This sorts both slices in-place.
  312. func slicesEquiv(lhs, rhs []string) bool {
  313. if len(lhs) != len(rhs) {
  314. return false
  315. }
  316. if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) {
  317. return true
  318. }
  319. return false
  320. }
  321. func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort proxy.ServicePortName) {
  322. lb.lock.Lock()
  323. defer lb.lock.Unlock()
  324. state, exists := lb.services[svcPort]
  325. if !exists {
  326. return
  327. }
  328. for ip, affinity := range state.affinity.affinityMap {
  329. if int(time.Since(affinity.lastUsed).Seconds()) >= state.affinity.ttlSeconds {
  330. klog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, svcPort)
  331. delete(state.affinity.affinityMap, ip)
  332. }
  333. }
  334. }