proxier.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304
  1. // +build windows
  2. /*
  3. Copyright 2017 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package winkernel
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "net"
  19. "os"
  20. "reflect"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/Microsoft/hcsshim"
  25. "github.com/Microsoft/hcsshim/hcn"
  26. "github.com/davecgh/go-spew/spew"
  27. "k8s.io/klog"
  28. "k8s.io/api/core/v1"
  29. "k8s.io/apimachinery/pkg/types"
  30. "k8s.io/apimachinery/pkg/util/sets"
  31. "k8s.io/apimachinery/pkg/util/wait"
  32. genericfeatures "k8s.io/apiserver/pkg/features"
  33. utilfeature "k8s.io/apiserver/pkg/util/feature"
  34. "k8s.io/client-go/tools/record"
  35. apiservice "k8s.io/kubernetes/pkg/api/v1/service"
  36. "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  37. "k8s.io/kubernetes/pkg/proxy"
  38. "k8s.io/kubernetes/pkg/proxy/apis/config"
  39. "k8s.io/kubernetes/pkg/proxy/healthcheck"
  40. "k8s.io/kubernetes/pkg/util/async"
  41. )
// KernelCompatTester tests whether the required kernel capabilities are
// present to run the windows kernel proxier.
type KernelCompatTester interface {
	// IsCompatible returns nil when the kernel proxier can be used and a
	// non-nil error otherwise.
	IsCompatible() error
}
  47. // CanUseWinKernelProxier returns true if we should use the Kernel Proxier
  48. // instead of the "classic" userspace Proxier. This is determined by checking
  49. // the windows kernel version and for the existence of kernel features.
  50. func CanUseWinKernelProxier(kcompat KernelCompatTester) (bool, error) {
  51. // Check that the kernel supports what we need.
  52. if err := kcompat.IsCompatible(); err != nil {
  53. return false, err
  54. }
  55. return true, nil
  56. }
// WindowsKernelCompatTester is the KernelCompatTester implementation that
// probes the local HNS service (see its IsCompatible method).
type WindowsKernelCompatTester struct{}
  58. // IsCompatible returns true if winkernel can support this mode of proxy
  59. func (lkct WindowsKernelCompatTester) IsCompatible() error {
  60. _, err := hcsshim.HNSListPolicyListRequest()
  61. if err != nil {
  62. return fmt.Errorf("Windows kernel is not compatible for Kernel mode")
  63. }
  64. return nil
  65. }
  66. type externalIPInfo struct {
  67. ip string
  68. hnsID string
  69. }
  70. type loadBalancerIngressInfo struct {
  71. ip string
  72. hnsID string
  73. }
// loadBalancerInfo carries the HNS identifier of a load balancer.
type loadBalancerInfo struct {
	hnsID string
}
  77. type loadBalancerFlags struct {
  78. isILB bool
  79. isDSR bool
  80. localRoutedVIP bool
  81. useMUX bool
  82. preserveDIP bool
  83. }
  84. // internal struct for string service information
  85. type serviceInfo struct {
  86. clusterIP net.IP
  87. port int
  88. protocol v1.Protocol
  89. nodePort int
  90. targetPort int
  91. loadBalancerStatus v1.LoadBalancerStatus
  92. sessionAffinityType v1.ServiceAffinity
  93. stickyMaxAgeSeconds int
  94. externalIPs []*externalIPInfo
  95. loadBalancerIngressIPs []*loadBalancerIngressInfo
  96. loadBalancerSourceRanges []string
  97. onlyNodeLocalEndpoints bool
  98. healthCheckNodePort int
  99. hnsID string
  100. nodePorthnsID string
  101. policyApplied bool
  102. remoteEndpoint *endpointsInfo
  103. hns HostNetworkService
  104. preserveDIP bool
  105. }
  106. type hnsNetworkInfo struct {
  107. name string
  108. id string
  109. networkType string
  110. remoteSubnets []*remoteSubnetInfo
  111. }
  112. type remoteSubnetInfo struct {
  113. destinationPrefix string
  114. isolationID uint16
  115. providerAddress string
  116. drMacAddress string
  117. }
  118. func Log(v interface{}, message string, level klog.Level) {
  119. klog.V(level).Infof("%s, %s", message, spew.Sdump(v))
  120. }
  121. func LogJson(v interface{}, message string, level klog.Level) {
  122. jsonString, err := json.Marshal(v)
  123. if err == nil {
  124. klog.V(level).Infof("%s, %s", message, string(jsonString))
  125. }
  126. }
  127. // internal struct for endpoints information
  128. type endpointsInfo struct {
  129. ip string
  130. port uint16
  131. isLocal bool
  132. macAddress string
  133. hnsID string
  134. refCount uint16
  135. providerAddress string
  136. hns HostNetworkService
  137. }
  138. //Uses mac prefix and IPv4 address to return a mac address
  139. //This ensures mac addresses are unique for proper load balancing
  140. //Does not support IPv6 and returns a dummy mac
  141. func conjureMac(macPrefix string, ip net.IP) string {
  142. if ip4 := ip.To4(); ip4 != nil {
  143. a, b, c, d := ip4[0], ip4[1], ip4[2], ip4[3]
  144. return fmt.Sprintf("%v-%02x-%02x-%02x-%02x", macPrefix, a, b, c, d)
  145. }
  146. return "02-11-22-33-44-55"
  147. }
  148. func newEndpointInfo(ip string, port uint16, isLocal bool, hns HostNetworkService) *endpointsInfo {
  149. info := &endpointsInfo{
  150. ip: ip,
  151. port: port,
  152. isLocal: isLocal,
  153. macAddress: conjureMac("02-11", net.ParseIP(ip)),
  154. refCount: 0,
  155. hnsID: "",
  156. hns: hns,
  157. }
  158. return info
  159. }
  160. func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointsInfo, error) {
  161. hnsEndpoint := &endpointsInfo{
  162. ip: ip,
  163. isLocal: true,
  164. macAddress: mac,
  165. providerAddress: providerAddress,
  166. }
  167. ep, err := hns.createEndpoint(hnsEndpoint, network)
  168. return ep, err
  169. }
  170. func (ep *endpointsInfo) Cleanup() {
  171. Log(ep, "Endpoint Cleanup", 3)
  172. ep.refCount--
  173. // Remove the remote hns endpoint, if no service is referring it
  174. // Never delete a Local Endpoint. Local Endpoints are already created by other entities.
  175. // Remove only remote endpoints created by this service
  176. if ep.refCount <= 0 && !ep.isLocal {
  177. klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
  178. err := ep.hns.deleteEndpoint(ep.hnsID)
  179. if err == nil {
  180. ep.hnsID = ""
  181. } else {
  182. klog.Errorf("Endpoint deletion failed for %v: %v", ep.ip, err)
  183. }
  184. }
  185. }
  186. // returns a new serviceInfo struct
  187. func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, service *v1.Service, hns HostNetworkService) *serviceInfo {
  188. onlyNodeLocalEndpoints := false
  189. if apiservice.RequestsOnlyLocalTraffic(service) {
  190. onlyNodeLocalEndpoints = true
  191. }
  192. // set default session sticky max age 180min=10800s
  193. stickyMaxAgeSeconds := 10800
  194. if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP && service.Spec.SessionAffinityConfig != nil {
  195. stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
  196. }
  197. klog.Infof("Service %q preserve-destination: %v", svcPortName.NamespacedName.String(), service.Annotations["preserve-destination"])
  198. preserveDIP := service.Annotations["preserve-destination"] == "true"
  199. err := hcn.DSRSupported()
  200. if err != nil {
  201. preserveDIP = false
  202. }
  203. info := &serviceInfo{
  204. clusterIP: net.ParseIP(service.Spec.ClusterIP),
  205. port: int(port.Port),
  206. protocol: port.Protocol,
  207. nodePort: int(port.NodePort),
  208. // targetPort is zero if it is specified as a name in port.TargetPort.
  209. // Its real value would be got later from endpoints.
  210. targetPort: port.TargetPort.IntValue(),
  211. // Deep-copy in case the service instance changes
  212. loadBalancerStatus: *service.Status.LoadBalancer.DeepCopy(),
  213. sessionAffinityType: service.Spec.SessionAffinity,
  214. stickyMaxAgeSeconds: stickyMaxAgeSeconds,
  215. loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
  216. onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
  217. hns: hns,
  218. preserveDIP: preserveDIP,
  219. }
  220. copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
  221. for _, eip := range service.Spec.ExternalIPs {
  222. info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
  223. }
  224. for _, ingress := range service.Status.LoadBalancer.Ingress {
  225. info.loadBalancerIngressIPs = append(info.loadBalancerIngressIPs, &loadBalancerIngressInfo{ip: ingress.IP})
  226. }
  227. if apiservice.NeedsHealthCheck(service) {
  228. p := service.Spec.HealthCheckNodePort
  229. if p == 0 {
  230. klog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String())
  231. } else {
  232. info.healthCheckNodePort = int(p)
  233. }
  234. }
  235. return info
  236. }
  237. type endpointsChange struct {
  238. previous proxyEndpointsMap
  239. current proxyEndpointsMap
  240. }
// endpointsChangeMap tracks pending endpoints changes keyed by the namespaced
// name of the Endpoints object.
type endpointsChangeMap struct {
	// lock guards items; it is taken by update and by whoever drains the map.
	lock sync.Mutex
	// hostname is passed to endpointsToEndpointsMap when converting objects.
	hostname string
	items    map[types.NamespacedName]*endpointsChange
}
  246. type serviceChange struct {
  247. previous proxyServiceMap
  248. current proxyServiceMap
  249. }
// serviceChangeMap tracks pending service changes keyed by the namespaced
// name of the Service object.
type serviceChangeMap struct {
	// lock guards items; it is taken by update and by updateServiceMap.
	lock  sync.Mutex
	items map[types.NamespacedName]*serviceChange
}
// updateEndpointMapResult groups the values produced when pending endpoints
// changes are applied.
// NOTE(review): the producer of this struct is outside this part of the file;
// field meanings below are inferred from their names — confirm.
type updateEndpointMapResult struct {
	hcEndpoints       map[types.NamespacedName]int
	staleEndpoints    map[endpointServicePair]bool
	staleServiceNames map[proxy.ServicePortName]bool
}
// updateServiceMapResult is the value returned by Proxier.updateServiceMap.
type updateServiceMapResult struct {
	// hcServices maps each service with a non-zero health check node port to
	// that port.
	hcServices map[types.NamespacedName]uint16
	// staleServices collects the cluster IPs of removed UDP service ports.
	staleServices sets.String
}
  263. type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
  264. type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
  265. func newEndpointsChangeMap(hostname string) endpointsChangeMap {
  266. return endpointsChangeMap{
  267. hostname: hostname,
  268. items: make(map[types.NamespacedName]*endpointsChange),
  269. }
  270. }
  271. func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Endpoints, hns HostNetworkService) bool {
  272. ecm.lock.Lock()
  273. defer ecm.lock.Unlock()
  274. change, exists := ecm.items[*namespacedName]
  275. if !exists {
  276. change = &endpointsChange{}
  277. change.previous = endpointsToEndpointsMap(previous, ecm.hostname, hns)
  278. ecm.items[*namespacedName] = change
  279. }
  280. change.current = endpointsToEndpointsMap(current, ecm.hostname, hns)
  281. if reflect.DeepEqual(change.previous, change.current) {
  282. delete(ecm.items, *namespacedName)
  283. }
  284. return len(ecm.items) > 0
  285. }
  286. func newServiceChangeMap() serviceChangeMap {
  287. return serviceChangeMap{
  288. items: make(map[types.NamespacedName]*serviceChange),
  289. }
  290. }
  291. func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Service, hns HostNetworkService) bool {
  292. scm.lock.Lock()
  293. defer scm.lock.Unlock()
  294. change, exists := scm.items[*namespacedName]
  295. if !exists {
  296. // Service is Added
  297. change = &serviceChange{}
  298. change.previous = serviceToServiceMap(previous, hns)
  299. scm.items[*namespacedName] = change
  300. }
  301. change.current = serviceToServiceMap(current, hns)
  302. if reflect.DeepEqual(change.previous, change.current) {
  303. delete(scm.items, *namespacedName)
  304. }
  305. return len(scm.items) > 0
  306. }
  307. func (sm *proxyServiceMap) merge(other proxyServiceMap, curEndpoints proxyEndpointsMap) sets.String {
  308. existingPorts := sets.NewString()
  309. for svcPortName, info := range other {
  310. existingPorts.Insert(svcPortName.Port)
  311. svcInfo, exists := (*sm)[svcPortName]
  312. if !exists {
  313. klog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
  314. } else {
  315. klog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
  316. svcInfo.cleanupAllPolicies(curEndpoints[svcPortName])
  317. delete(*sm, svcPortName)
  318. }
  319. (*sm)[svcPortName] = info
  320. }
  321. return existingPorts
  322. }
  323. func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String, curEndpoints proxyEndpointsMap) {
  324. for svcPortName := range other {
  325. if existingPorts.Has(svcPortName.Port) {
  326. continue
  327. }
  328. info, exists := (*sm)[svcPortName]
  329. if exists {
  330. klog.V(1).Infof("Removing service port %q", svcPortName)
  331. if info.protocol == v1.ProtocolUDP {
  332. staleServices.Insert(info.clusterIP.String())
  333. }
  334. info.cleanupAllPolicies(curEndpoints[svcPortName])
  335. delete(*sm, svcPortName)
  336. } else {
  337. klog.Errorf("Service port %q removed, but doesn't exists", svcPortName)
  338. }
  339. }
  340. }
  341. func (em proxyEndpointsMap) merge(other proxyEndpointsMap, curServices proxyServiceMap) {
  342. // Endpoint Update/Add
  343. for svcPortName := range other {
  344. epInfos, exists := em[svcPortName]
  345. if exists {
  346. //
  347. info, exists := curServices[svcPortName]
  348. klog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
  349. if exists {
  350. klog.V(2).Infof("Endpoints are modified. Service [%v] is stale", svcPortName)
  351. info.cleanupAllPolicies(epInfos)
  352. } else {
  353. // If no service exists, just cleanup the remote endpoints
  354. klog.V(2).Infof("Endpoints are orphaned. Cleaning up")
  355. // Cleanup Endpoints references
  356. for _, ep := range epInfos {
  357. ep.Cleanup()
  358. }
  359. }
  360. delete(em, svcPortName)
  361. }
  362. em[svcPortName] = other[svcPortName]
  363. }
  364. }
  365. func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap, curServices proxyServiceMap) {
  366. // Endpoint Update/Removal
  367. for svcPortName := range other {
  368. info, exists := curServices[svcPortName]
  369. if exists {
  370. klog.V(2).Infof("Service [%v] is stale", info)
  371. info.cleanupAllPolicies(em[svcPortName])
  372. } else {
  373. // If no service exists, just cleanup the remote endpoints
  374. klog.V(2).Infof("Endpoints are orphaned. Cleaning up")
  375. // Cleanup Endpoints references
  376. epInfos, exists := em[svcPortName]
  377. if exists {
  378. for _, ep := range epInfos {
  379. ep.Cleanup()
  380. }
  381. }
  382. }
  383. delete(em, svcPortName)
  384. }
  385. }
  386. // Proxier is an hns based proxy for connections between a localhost:lport
  387. // and services that provide the actual backends.
  388. type Proxier struct {
  389. // endpointsChanges and serviceChanges contains all changes to endpoints and
  390. // services that happened since policies were synced. For a single object,
  391. // changes are accumulated, i.e. previous is state from before all of them,
  392. // current is state after applying all of those.
  393. endpointsChanges endpointsChangeMap
  394. serviceChanges serviceChangeMap
  395. mu sync.Mutex // protects the following fields
  396. serviceMap proxyServiceMap
  397. endpointsMap proxyEndpointsMap
  398. portsMap map[localPort]closeable
  399. // endpointsSynced and servicesSynced are set to true when corresponding
  400. // objects are synced after startup. This is used to avoid updating hns policies
  401. // with some partial data after kube-proxy restart.
  402. endpointsSynced bool
  403. servicesSynced bool
  404. initialized int32
  405. syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
  406. // These are effectively const and do not need the mutex to be held.
  407. masqueradeAll bool
  408. masqueradeMark string
  409. clusterCIDR string
  410. hostname string
  411. nodeIP net.IP
  412. recorder record.EventRecorder
  413. healthChecker healthcheck.Server
  414. healthzServer healthcheck.HealthzUpdater
  415. // Since converting probabilities (floats) to strings is expensive
  416. // and we are using only probabilities in the format of 1/n, we are
  417. // precomputing some number of those and cache for future reuse.
  418. precomputedProbabilities []string
  419. hns HostNetworkService
  420. network hnsNetworkInfo
  421. sourceVip string
  422. hostMac string
  423. isDSR bool
  424. }
  425. type localPort struct {
  426. desc string
  427. ip string
  428. port int
  429. protocol string
  430. }
  431. func (lp *localPort) String() string {
  432. return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol)
  433. }
  434. func Enum(p v1.Protocol) uint16 {
  435. if p == v1.ProtocolTCP {
  436. return 6
  437. }
  438. if p == v1.ProtocolUDP {
  439. return 17
  440. }
  441. if p == v1.ProtocolSCTP {
  442. return 132
  443. }
  444. return 0
  445. }
// closeable is anything that can be closed; it is the value type of
// Proxier.portsMap.
type closeable interface {
	Close() error
}
  449. // Proxier implements ProxyProvider
  450. var _ proxy.ProxyProvider = &Proxier{}
  451. // NewProxier returns a new Proxier
  452. func NewProxier(
  453. syncPeriod time.Duration,
  454. minSyncPeriod time.Duration,
  455. masqueradeAll bool,
  456. masqueradeBit int,
  457. clusterCIDR string,
  458. hostname string,
  459. nodeIP net.IP,
  460. recorder record.EventRecorder,
  461. healthzServer healthcheck.HealthzUpdater,
  462. config config.KubeProxyWinkernelConfiguration,
  463. ) (*Proxier, error) {
  464. masqueradeValue := 1 << uint(masqueradeBit)
  465. masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
  466. if nodeIP == nil {
  467. klog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
  468. nodeIP = net.ParseIP("127.0.0.1")
  469. }
  470. if len(clusterCIDR) == 0 {
  471. klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
  472. }
  473. healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
  474. var hns HostNetworkService
  475. hns = hnsV1{}
  476. supportedFeatures := hcn.GetSupportedFeatures()
  477. if supportedFeatures.Api.V2 {
  478. hns = hnsV2{}
  479. }
  480. hnsNetworkName := config.NetworkName
  481. if len(hnsNetworkName) == 0 {
  482. klog.V(3).Infof("network-name flag not set. Checking environment variable")
  483. hnsNetworkName = os.Getenv("KUBE_NETWORK")
  484. if len(hnsNetworkName) == 0 {
  485. return nil, fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized")
  486. }
  487. }
  488. klog.V(3).Infof("Cleaning up old HNS policy lists")
  489. deleteAllHnsLoadBalancerPolicy()
  490. // Get HNS network information
  491. hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName)
  492. for err != nil {
  493. klog.Errorf("Unable to find HNS Network specified by %s. Please check network name and CNI deployment", hnsNetworkName)
  494. time.Sleep(1 * time.Second)
  495. hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
  496. }
  497. // Network could have been detected before Remote Subnet Routes are applied or ManagementIP is updated
  498. // Sleep and update the network to include new information
  499. if hnsNetworkInfo.networkType == "Overlay" {
  500. time.Sleep(10 * time.Second)
  501. hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
  502. if err != nil {
  503. return nil, fmt.Errorf("Could not find HNS network %s", hnsNetworkName)
  504. }
  505. }
  506. klog.V(1).Infof("Hns Network loaded with info = %v", hnsNetworkInfo)
  507. isDSR := config.EnableDSR
  508. if isDSR && !utilfeature.DefaultFeatureGate.Enabled(genericfeatures.WinDSR) {
  509. return nil, fmt.Errorf("WinDSR feature gate not enabled")
  510. }
  511. err = hcn.DSRSupported()
  512. if isDSR && err != nil {
  513. return nil, err
  514. }
  515. var sourceVip string
  516. var hostMac string
  517. if hnsNetworkInfo.networkType == "Overlay" {
  518. if !utilfeature.DefaultFeatureGate.Enabled(genericfeatures.WinOverlay) {
  519. return nil, fmt.Errorf("WinOverlay feature gate not enabled")
  520. }
  521. err = hcn.RemoteSubnetSupported()
  522. if err != nil {
  523. return nil, err
  524. }
  525. sourceVip = config.SourceVip
  526. if len(sourceVip) == 0 {
  527. return nil, fmt.Errorf("source-vip flag not set")
  528. }
  529. interfaces, _ := net.Interfaces() //TODO create interfaces
  530. for _, inter := range interfaces {
  531. addresses, _ := inter.Addrs()
  532. for _, addr := range addresses {
  533. addrIP, _, _ := net.ParseCIDR(addr.String())
  534. if addrIP.String() == nodeIP.String() {
  535. klog.V(2).Infof("Host MAC address is %s", inter.HardwareAddr.String())
  536. hostMac = inter.HardwareAddr.String()
  537. }
  538. }
  539. }
  540. if len(hostMac) == 0 {
  541. return nil, fmt.Errorf("Could not find host mac address for %s", nodeIP)
  542. }
  543. }
  544. proxier := &Proxier{
  545. portsMap: make(map[localPort]closeable),
  546. serviceMap: make(proxyServiceMap),
  547. serviceChanges: newServiceChangeMap(),
  548. endpointsMap: make(proxyEndpointsMap),
  549. endpointsChanges: newEndpointsChangeMap(hostname),
  550. masqueradeAll: masqueradeAll,
  551. masqueradeMark: masqueradeMark,
  552. clusterCIDR: clusterCIDR,
  553. hostname: hostname,
  554. nodeIP: nodeIP,
  555. recorder: recorder,
  556. healthChecker: healthChecker,
  557. healthzServer: healthzServer,
  558. hns: hns,
  559. network: *hnsNetworkInfo,
  560. sourceVip: sourceVip,
  561. hostMac: hostMac,
  562. isDSR: isDSR,
  563. }
  564. burstSyncs := 2
  565. klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
  566. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
  567. return proxier, nil
  568. }
  569. // CleanupLeftovers removes all hns rules created by the Proxier
  570. // It returns true if an error was encountered. Errors are logged.
  571. func CleanupLeftovers() (encounteredError bool) {
  572. // Delete all Hns Load Balancer Policies
  573. deleteAllHnsLoadBalancerPolicy()
  574. // TODO
  575. // Delete all Hns Remote endpoints
  576. return encounteredError
  577. }
  578. func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []*endpointsInfo) {
  579. Log(svcInfo, "Service Cleanup", 3)
  580. // Skip the svcInfo.policyApplied check to remove all the policies
  581. svcInfo.deleteAllHnsLoadBalancerPolicy()
  582. // Cleanup Endpoints references
  583. for _, ep := range endpoints {
  584. ep.Cleanup()
  585. }
  586. if svcInfo.remoteEndpoint != nil {
  587. svcInfo.remoteEndpoint.Cleanup()
  588. }
  589. svcInfo.policyApplied = false
  590. }
  591. func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() {
  592. // Remove the Hns Policy corresponding to this service
  593. hns := svcInfo.hns
  594. hns.deleteLoadBalancer(svcInfo.hnsID)
  595. svcInfo.hnsID = ""
  596. hns.deleteLoadBalancer(svcInfo.nodePorthnsID)
  597. svcInfo.nodePorthnsID = ""
  598. for _, externalIP := range svcInfo.externalIPs {
  599. hns.deleteLoadBalancer(externalIP.hnsID)
  600. externalIP.hnsID = ""
  601. }
  602. for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
  603. hns.deleteLoadBalancer(lbIngressIP.hnsID)
  604. lbIngressIP.hnsID = ""
  605. }
  606. }
  607. func deleteAllHnsLoadBalancerPolicy() {
  608. plists, err := hcsshim.HNSListPolicyListRequest()
  609. if err != nil {
  610. return
  611. }
  612. for _, plist := range plists {
  613. LogJson(plist, "Remove Policy", 3)
  614. _, err = plist.Delete()
  615. if err != nil {
  616. klog.Errorf("%v", err)
  617. }
  618. }
  619. }
  620. func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
  621. hnsnetwork, err := hcsshim.GetHNSNetworkByName(hnsNetworkName)
  622. if err != nil {
  623. klog.Errorf("%v", err)
  624. return nil, err
  625. }
  626. return &hnsNetworkInfo{
  627. id: hnsnetwork.Id,
  628. name: hnsnetwork.Name,
  629. networkType: hnsnetwork.Type,
  630. }, nil
  631. }
// Sync is called to synchronize the proxier state to hns as soon as possible.
// It only signals the sync runner; the runner governs when syncProxyRules
// actually executes.
func (proxier *Proxier) Sync() {
	proxier.syncRunner.Run()
}
  636. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
  637. func (proxier *Proxier) SyncLoop() {
  638. // Update healthz timestamp at beginning in case Sync() never succeeds.
  639. if proxier.healthzServer != nil {
  640. proxier.healthzServer.UpdateTimestamp()
  641. }
  642. proxier.syncRunner.Loop(wait.NeverStop)
  643. }
  644. func (proxier *Proxier) setInitialized(value bool) {
  645. var initialized int32
  646. if value {
  647. initialized = 1
  648. }
  649. atomic.StoreInt32(&proxier.initialized, initialized)
  650. }
  651. func (proxier *Proxier) isInitialized() bool {
  652. return atomic.LoadInt32(&proxier.initialized) > 0
  653. }
  654. func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
  655. namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  656. if proxier.serviceChanges.update(&namespacedName, nil, service, proxier.hns) && proxier.isInitialized() {
  657. proxier.syncRunner.Run()
  658. }
  659. }
  660. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  661. namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  662. if proxier.serviceChanges.update(&namespacedName, oldService, service, proxier.hns) && proxier.isInitialized() {
  663. proxier.syncRunner.Run()
  664. }
  665. }
  666. func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
  667. namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  668. if proxier.serviceChanges.update(&namespacedName, service, nil, proxier.hns) && proxier.isInitialized() {
  669. proxier.syncRunner.Run()
  670. }
  671. }
  672. func (proxier *Proxier) OnServiceSynced() {
  673. proxier.mu.Lock()
  674. proxier.servicesSynced = true
  675. proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
  676. proxier.mu.Unlock()
  677. // Sync unconditionally - this is called once per lifetime.
  678. proxier.syncProxyRules()
  679. }
  680. func shouldSkipService(svcName types.NamespacedName, service *v1.Service) bool {
  681. // if ClusterIP is "None" or empty, skip proxying
  682. if !helper.IsServiceIPSet(service) {
  683. klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
  684. return true
  685. }
  686. // Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
  687. if service.Spec.Type == v1.ServiceTypeExternalName {
  688. klog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
  689. return true
  690. }
  691. return false
  692. }
  693. // <serviceMap> is updated by this function (based on the given changes).
  694. // <changes> map is cleared after applying them.
  695. func (proxier *Proxier) updateServiceMap() (result updateServiceMapResult) {
  696. result.staleServices = sets.NewString()
  697. serviceMap := proxier.serviceMap
  698. changes := &proxier.serviceChanges
  699. func() {
  700. changes.lock.Lock()
  701. defer changes.lock.Unlock()
  702. for _, change := range changes.items {
  703. existingPorts := serviceMap.merge(change.current, proxier.endpointsMap)
  704. serviceMap.unmerge(change.previous, existingPorts, result.staleServices, proxier.endpointsMap)
  705. }
  706. changes.items = make(map[types.NamespacedName]*serviceChange)
  707. }()
  708. // TODO: If this will appear to be computationally expensive, consider
  709. // computing this incrementally similarly to serviceMap.
  710. result.hcServices = make(map[types.NamespacedName]uint16)
  711. for svcPortName, info := range serviceMap {
  712. if info.healthCheckNodePort != 0 {
  713. result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
  714. }
  715. }
  716. return result
  717. }
  718. func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
  719. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  720. if proxier.endpointsChanges.update(&namespacedName, nil, endpoints, proxier.hns) && proxier.isInitialized() {
  721. proxier.syncRunner.Run()
  722. }
  723. }
  724. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  725. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  726. if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints, proxier.hns) && proxier.isInitialized() {
  727. proxier.syncRunner.Run()
  728. }
  729. }
  730. func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
  731. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  732. if proxier.endpointsChanges.update(&namespacedName, endpoints, nil, proxier.hns) && proxier.isInitialized() {
  733. proxier.syncRunner.Run()
  734. }
  735. }
  736. func (proxier *Proxier) OnEndpointsSynced() {
  737. proxier.mu.Lock()
  738. proxier.endpointsSynced = true
  739. proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
  740. proxier.mu.Unlock()
  741. // Sync unconditionally - this is called once per lifetime.
  742. proxier.syncProxyRules()
  743. }
  744. func (proxier *Proxier) cleanupAllPolicies() {
  745. for svcName, svcInfo := range proxier.serviceMap {
  746. svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName])
  747. }
  748. }
  749. func isNetworkNotFoundError(err error) bool {
  750. if err == nil {
  751. return false
  752. }
  753. if _, ok := err.(hcn.NetworkNotFoundError); ok {
  754. return true
  755. }
  756. if _, ok := err.(hcsshim.NetworkNotFoundError); ok {
  757. return true
  758. }
  759. return false
  760. }
  761. // <endpointsMap> is updated by this function (based on the given changes).
  762. // <changes> map is cleared after applying them.
  763. func (proxier *Proxier) updateEndpointsMap() (result updateEndpointMapResult) {
  764. result.staleEndpoints = make(map[endpointServicePair]bool)
  765. result.staleServiceNames = make(map[proxy.ServicePortName]bool)
  766. endpointsMap := proxier.endpointsMap
  767. changes := &proxier.endpointsChanges
  768. func() {
  769. changes.lock.Lock()
  770. defer changes.lock.Unlock()
  771. for _, change := range changes.items {
  772. endpointsMap.unmerge(change.previous, proxier.serviceMap)
  773. endpointsMap.merge(change.current, proxier.serviceMap)
  774. }
  775. changes.items = make(map[types.NamespacedName]*endpointsChange)
  776. }()
  777. // TODO: If this will appear to be computationally expensive, consider
  778. // computing this incrementally similarly to endpointsMap.
  779. result.hcEndpoints = make(map[types.NamespacedName]int)
  780. localIPs := getLocalIPs(endpointsMap)
  781. for nsn, ips := range localIPs {
  782. result.hcEndpoints[nsn] = len(ips)
  783. }
  784. return result
  785. }
  786. func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
  787. localIPs := make(map[types.NamespacedName]sets.String)
  788. for svcPortName := range endpointsMap {
  789. for _, ep := range endpointsMap[svcPortName] {
  790. if ep.isLocal {
  791. nsn := svcPortName.NamespacedName
  792. if localIPs[nsn] == nil {
  793. localIPs[nsn] = sets.NewString()
  794. }
  795. localIPs[nsn].Insert(ep.ip) // just the IP part
  796. }
  797. }
  798. }
  799. return localIPs
  800. }
  801. // Translates single Endpoints object to proxyEndpointsMap.
  802. // This function is used for incremental updated of endpointsMap.
  803. //
  804. // NOTE: endpoints object should NOT be modified.
  805. func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string, hns HostNetworkService) proxyEndpointsMap {
  806. if endpoints == nil {
  807. return nil
  808. }
  809. endpointsMap := make(proxyEndpointsMap)
  810. // We need to build a map of portname -> all ip:ports for that
  811. // portname. Explode Endpoints.Subsets[*] into this structure.
  812. for i := range endpoints.Subsets {
  813. ss := &endpoints.Subsets[i]
  814. for i := range ss.Ports {
  815. port := &ss.Ports[i]
  816. if port.Port == 0 {
  817. klog.Warningf("Ignoring invalid endpoint port %s", port.Name)
  818. continue
  819. }
  820. svcPortName := proxy.ServicePortName{
  821. NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
  822. Port: port.Name,
  823. }
  824. for i := range ss.Addresses {
  825. addr := &ss.Addresses[i]
  826. if addr.IP == "" {
  827. klog.Warningf("Ignoring invalid endpoint port %s with empty host", port.Name)
  828. continue
  829. }
  830. isLocal := addr.NodeName != nil && *addr.NodeName == hostname
  831. epInfo := newEndpointInfo(addr.IP, uint16(port.Port), isLocal, hns)
  832. endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo)
  833. }
  834. if klog.V(3) {
  835. newEPList := []*endpointsInfo{}
  836. for _, ep := range endpointsMap[svcPortName] {
  837. newEPList = append(newEPList, ep)
  838. }
  839. klog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList)
  840. }
  841. }
  842. }
  843. return endpointsMap
  844. }
  845. // Translates single Service object to proxyServiceMap.
  846. //
  847. // NOTE: service object should NOT be modified.
  848. func serviceToServiceMap(service *v1.Service, hns HostNetworkService) proxyServiceMap {
  849. if service == nil {
  850. return nil
  851. }
  852. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  853. if shouldSkipService(svcName, service) {
  854. return nil
  855. }
  856. serviceMap := make(proxyServiceMap)
  857. for i := range service.Spec.Ports {
  858. servicePort := &service.Spec.Ports[i]
  859. svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
  860. serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service, hns)
  861. }
  862. return serviceMap
  863. }
  864. // This is where all of the hns save/restore calls happen.
  865. // assumes proxier.mu is held
  866. func (proxier *Proxier) syncProxyRules() {
  867. proxier.mu.Lock()
  868. defer proxier.mu.Unlock()
  869. start := time.Now()
  870. defer func() {
  871. SyncProxyRulesLatency.Observe(sinceInSeconds(start))
  872. DeprecatedSyncProxyRulesLatency.Observe(sinceInMicroseconds(start))
  873. klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
  874. }()
  875. // don't sync rules till we've received services and endpoints
  876. if !proxier.endpointsSynced || !proxier.servicesSynced {
  877. klog.V(2).Info("Not syncing hns until Services and Endpoints have been received from master")
  878. return
  879. }
  880. hnsNetworkName := proxier.network.name
  881. hns := proxier.hns
  882. prevNetworkID := proxier.network.id
  883. updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
  884. if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
  885. klog.Infof("The HNS network %s is not present or has changed since the last sync. Please check the CNI deployment", hnsNetworkName)
  886. proxier.cleanupAllPolicies()
  887. if updatedNetwork != nil {
  888. proxier.network = *updatedNetwork
  889. }
  890. return
  891. }
  892. // We assume that if this was called, we really want to sync them,
  893. // even if nothing changed in the meantime. In other words, callers are
  894. // responsible for detecting no-op changes and not calling this function.
  895. serviceUpdateResult := proxier.updateServiceMap()
  896. endpointUpdateResult := proxier.updateEndpointsMap()
  897. staleServices := serviceUpdateResult.staleServices
  898. // merge stale services gathered from updateEndpointsMap
  899. for svcPortName := range endpointUpdateResult.staleServiceNames {
  900. if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == v1.ProtocolUDP {
  901. klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
  902. staleServices.Insert(svcInfo.clusterIP.String())
  903. }
  904. }
  905. if proxier.network.networkType == "Overlay" {
  906. existingSourceVip, err := hns.getEndpointByIpAddress(proxier.sourceVip, hnsNetworkName)
  907. if existingSourceVip == nil {
  908. _, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
  909. }
  910. if err != nil {
  911. klog.Errorf("Source Vip endpoint creation failed: %v", err)
  912. return
  913. }
  914. }
  915. klog.V(3).Infof("Syncing Policies")
  916. // Program HNS by adding corresponding policies for each service.
  917. for svcName, svcInfo := range proxier.serviceMap {
  918. if svcInfo.policyApplied {
  919. klog.V(4).Infof("Policy already applied for %s", spew.Sdump(svcInfo))
  920. continue
  921. }
  922. if proxier.network.networkType == "Overlay" {
  923. serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.clusterIP.String(), hnsNetworkName)
  924. if serviceVipEndpoint == nil {
  925. klog.V(4).Infof("No existing remote endpoint for service VIP %v", svcInfo.clusterIP.String())
  926. hnsEndpoint := &endpointsInfo{
  927. ip: svcInfo.clusterIP.String(),
  928. isLocal: false,
  929. macAddress: proxier.hostMac,
  930. providerAddress: proxier.nodeIP.String(),
  931. }
  932. newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
  933. if err != nil {
  934. klog.Errorf("Remote endpoint creation failed for service VIP: %v", err)
  935. continue
  936. }
  937. newHnsEndpoint.refCount++
  938. svcInfo.remoteEndpoint = newHnsEndpoint
  939. }
  940. }
  941. var hnsEndpoints []endpointsInfo
  942. var hnsLocalEndpoints []endpointsInfo
  943. klog.V(4).Infof("====Applying Policy for %s====", svcName)
  944. // Create Remote endpoints for every endpoint, corresponding to the service
  945. containsPublicIP := false
  946. for _, ep := range proxier.endpointsMap[svcName] {
  947. var newHnsEndpoint *endpointsInfo
  948. hnsNetworkName := proxier.network.name
  949. var err error
  950. // targetPort is zero if it is specified as a name in port.TargetPort, so the real port should be got from endpoints.
  951. // Note that hcsshim.AddLoadBalancer() doesn't support endpoints with different ports, so only port from first endpoint is used.
  952. // TODO(feiskyer): add support of different endpoint ports after hcsshim.AddLoadBalancer() add that.
  953. if svcInfo.targetPort == 0 {
  954. svcInfo.targetPort = int(ep.port)
  955. }
  956. if len(ep.hnsID) > 0 {
  957. newHnsEndpoint, err = hns.getEndpointByID(ep.hnsID)
  958. }
  959. if newHnsEndpoint == nil {
  960. // First check if an endpoint resource exists for this IP, on the current host
  961. // A Local endpoint could exist here already
  962. // A remote endpoint was already created and proxy was restarted
  963. newHnsEndpoint, err = hns.getEndpointByIpAddress(ep.ip, hnsNetworkName)
  964. }
  965. if newHnsEndpoint == nil {
  966. if ep.isLocal {
  967. klog.Errorf("Local endpoint not found for %v: err: %v on network %s", ep.ip, err, hnsNetworkName)
  968. continue
  969. }
  970. if proxier.network.networkType == "Overlay" {
  971. klog.Infof("Updating network %v to check for new remote subnet policies", proxier.network.name)
  972. networkName := proxier.network.name
  973. updatedNetwork, err := hns.getNetworkByName(networkName)
  974. if err != nil {
  975. klog.Errorf("Unable to find HNS Network specified by %s. Please check network name and CNI deployment", hnsNetworkName)
  976. proxier.cleanupAllPolicies()
  977. return
  978. }
  979. proxier.network = *updatedNetwork
  980. var providerAddress string
  981. for _, rs := range proxier.network.remoteSubnets {
  982. _, ipNet, err := net.ParseCIDR(rs.destinationPrefix)
  983. if err != nil {
  984. klog.Fatalf("%v", err)
  985. }
  986. if ipNet.Contains(net.ParseIP(ep.ip)) {
  987. providerAddress = rs.providerAddress
  988. }
  989. if ep.ip == rs.providerAddress {
  990. providerAddress = rs.providerAddress
  991. }
  992. }
  993. if len(providerAddress) == 0 {
  994. klog.Errorf("Could not find provider address for %s", ep.ip)
  995. providerAddress = proxier.nodeIP.String()
  996. containsPublicIP = true
  997. }
  998. hnsEndpoint := &endpointsInfo{
  999. ip: ep.ip,
  1000. isLocal: false,
  1001. macAddress: conjureMac("02-11", net.ParseIP(ep.ip)),
  1002. providerAddress: providerAddress,
  1003. }
  1004. newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
  1005. if err != nil {
  1006. klog.Errorf("Remote endpoint creation failed: %v, %s", err, spew.Sdump(hnsEndpoint))
  1007. continue
  1008. }
  1009. } else {
  1010. hnsEndpoint := &endpointsInfo{
  1011. ip: ep.ip,
  1012. isLocal: false,
  1013. macAddress: ep.macAddress,
  1014. }
  1015. newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
  1016. if err != nil {
  1017. klog.Errorf("Remote endpoint creation failed: %v", err)
  1018. continue
  1019. }
  1020. }
  1021. }
  1022. // Save the hnsId for reference
  1023. LogJson(newHnsEndpoint, "Hns Endpoint resource", 1)
  1024. hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
  1025. if newHnsEndpoint.isLocal {
  1026. hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
  1027. }
  1028. ep.hnsID = newHnsEndpoint.hnsID
  1029. ep.refCount++
  1030. Log(ep, "Endpoint resource found", 3)
  1031. }
  1032. klog.V(3).Infof("Associated endpoints [%s] for service [%s]", spew.Sdump(hnsEndpoints), svcName)
  1033. if len(svcInfo.hnsID) > 0 {
  1034. // This should not happen
  1035. klog.Warningf("Load Balancer already exists %s -- Debug ", svcInfo.hnsID)
  1036. }
  1037. if len(hnsEndpoints) == 0 {
  1038. klog.Errorf("Endpoint information not available for service %s. Not applying any policy", svcName)
  1039. continue
  1040. }
  1041. klog.V(4).Infof("Trying to Apply Policies for service %s", spew.Sdump(svcInfo))
  1042. var hnsLoadBalancer *loadBalancerInfo
  1043. var sourceVip = proxier.sourceVip
  1044. if containsPublicIP {
  1045. sourceVip = proxier.nodeIP.String()
  1046. }
  1047. hnsLoadBalancer, err := hns.getLoadBalancer(
  1048. hnsEndpoints,
  1049. loadBalancerFlags{isDSR: proxier.isDSR},
  1050. sourceVip,
  1051. svcInfo.clusterIP.String(),
  1052. Enum(svcInfo.protocol),
  1053. uint16(svcInfo.targetPort),
  1054. uint16(svcInfo.port),
  1055. )
  1056. if err != nil {
  1057. klog.Errorf("Policy creation failed: %v", err)
  1058. continue
  1059. }
  1060. svcInfo.hnsID = hnsLoadBalancer.hnsID
  1061. klog.V(3).Infof("Hns LoadBalancer resource created for cluster ip resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID)
  1062. // If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
  1063. if svcInfo.nodePort > 0 {
  1064. hnsLoadBalancer, err := hns.getLoadBalancer(
  1065. hnsEndpoints,
  1066. loadBalancerFlags{localRoutedVIP: true},
  1067. sourceVip,
  1068. "",
  1069. Enum(svcInfo.protocol),
  1070. uint16(svcInfo.targetPort),
  1071. uint16(svcInfo.nodePort),
  1072. )
  1073. if err != nil {
  1074. klog.Errorf("Policy creation failed: %v", err)
  1075. continue
  1076. }
  1077. svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
  1078. klog.V(3).Infof("Hns LoadBalancer resource created for nodePort resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID)
  1079. }
  1080. // Create a Load Balancer Policy for each external IP
  1081. for _, externalIP := range svcInfo.externalIPs {
  1082. // Try loading existing policies, if already available
  1083. hnsLoadBalancer, err = hns.getLoadBalancer(
  1084. hnsEndpoints,
  1085. loadBalancerFlags{},
  1086. sourceVip,
  1087. externalIP.ip,
  1088. Enum(svcInfo.protocol),
  1089. uint16(svcInfo.targetPort),
  1090. uint16(svcInfo.port),
  1091. )
  1092. if err != nil {
  1093. klog.Errorf("Policy creation failed: %v", err)
  1094. continue
  1095. }
  1096. externalIP.hnsID = hnsLoadBalancer.hnsID
  1097. klog.V(3).Infof("Hns LoadBalancer resource created for externalIP resources %v, Id[%s]", externalIP, hnsLoadBalancer.hnsID)
  1098. }
  1099. // Create a Load Balancer Policy for each loadbalancer ingress
  1100. for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
  1101. // Try loading existing policies, if already available
  1102. lbIngressEndpoints := hnsEndpoints
  1103. if svcInfo.preserveDIP {
  1104. lbIngressEndpoints = hnsLocalEndpoints
  1105. }
  1106. hnsLoadBalancer, err := hns.getLoadBalancer(
  1107. lbIngressEndpoints,
  1108. loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
  1109. sourceVip,
  1110. lbIngressIP.ip,
  1111. Enum(svcInfo.protocol),
  1112. uint16(svcInfo.targetPort),
  1113. uint16(svcInfo.port),
  1114. )
  1115. if err != nil {
  1116. klog.Errorf("Policy creation failed: %v", err)
  1117. continue
  1118. }
  1119. lbIngressIP.hnsID = hnsLoadBalancer.hnsID
  1120. klog.V(3).Infof("Hns LoadBalancer resource created for loadBalancer Ingress resources %v", lbIngressIP)
  1121. }
  1122. svcInfo.policyApplied = true
  1123. Log(svcInfo, "+++Policy Successfully applied for service +++", 2)
  1124. }
  1125. // Update healthz timestamp.
  1126. if proxier.healthzServer != nil {
  1127. proxier.healthzServer.UpdateTimestamp()
  1128. }
  1129. SyncProxyRulesLastTimestamp.SetToCurrentTime()
  1130. // Update healthchecks. The endpoints list might include services that are
  1131. // not "OnlyLocal", but the services list will not, and the healthChecker
  1132. // will just drop those endpoints.
  1133. if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
  1134. klog.Errorf("Error syncing healthcheck services: %v", err)
  1135. }
  1136. if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
  1137. klog.Errorf("Error syncing healthcheck endpoints: %v", err)
  1138. }
  1139. // Finish housekeeping.
  1140. // TODO: these could be made more consistent.
  1141. for _, svcIP := range staleServices.UnsortedList() {
  1142. // TODO : Check if this is required to cleanup stale services here
  1143. klog.V(5).Infof("Pending delete stale service IP %s connections", svcIP)
  1144. }
  1145. }
  1146. type endpointServicePair struct {
  1147. endpoint string
  1148. servicePortName proxy.ServicePortName
  1149. }