proxier.go 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327
  1. // +build windows
  2. /*
  3. Copyright 2017 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package winkernel
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "net"
  19. "os"
  20. "reflect"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/Microsoft/hcsshim"
  25. "github.com/Microsoft/hcsshim/hcn"
  26. "github.com/davecgh/go-spew/spew"
  27. "k8s.io/klog"
  28. "k8s.io/api/core/v1"
  29. "k8s.io/apimachinery/pkg/types"
  30. "k8s.io/apimachinery/pkg/util/intstr"
  31. "k8s.io/apimachinery/pkg/util/sets"
  32. "k8s.io/apimachinery/pkg/util/wait"
  33. genericfeatures "k8s.io/apiserver/pkg/features"
  34. utilfeature "k8s.io/apiserver/pkg/util/feature"
  35. "k8s.io/client-go/tools/record"
  36. apiservice "k8s.io/kubernetes/pkg/api/v1/service"
  37. "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  38. "k8s.io/kubernetes/pkg/proxy"
  39. "k8s.io/kubernetes/pkg/proxy/apis/config"
  40. proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
  41. "k8s.io/kubernetes/pkg/proxy/healthcheck"
  42. "k8s.io/kubernetes/pkg/proxy/metrics"
  43. "k8s.io/kubernetes/pkg/util/async"
  44. )
  // KernelCompatTester is implemented by anything that can report whether the
  // host provides the kernel capabilities the windows kernel proxier needs.
  type KernelCompatTester interface {
  	// IsCompatible returns nil when the proxier can be used and a non-nil
  	// error otherwise.
  	IsCompatible() error
  }
  50. // CanUseWinKernelProxier returns true if we should use the Kernel Proxier
  51. // instead of the "classic" userspace Proxier. This is determined by checking
  52. // the windows kernel version and for the existence of kernel features.
  53. func CanUseWinKernelProxier(kcompat KernelCompatTester) (bool, error) {
  54. // Check that the kernel supports what we need.
  55. if err := kcompat.IsCompatible(); err != nil {
  56. return false, err
  57. }
  58. return true, nil
  59. }
  60. type WindowsKernelCompatTester struct{}
  61. // IsCompatible returns true if winkernel can support this mode of proxy
  62. func (lkct WindowsKernelCompatTester) IsCompatible() error {
  63. _, err := hcsshim.HNSListPolicyListRequest()
  64. if err != nil {
  65. return fmt.Errorf("Windows kernel is not compatible for Kernel mode")
  66. }
  67. return nil
  68. }
  // externalIPInfo tracks one external IP of a service.
  //
  // ip is taken from the service spec's ExternalIPs; hnsID is the identifier
  // handed to deleteLoadBalancer during cleanup and is reset to "" afterwards.
  type externalIPInfo struct {
  	ip, hnsID string
  }
  // loadBalancerIngressInfo tracks one load-balancer ingress IP of a service.
  //
  // ip is taken from the service status' LoadBalancer.Ingress entries; hnsID
  // is the identifier handed to deleteLoadBalancer during cleanup and is reset
  // to "" afterwards.
  type loadBalancerIngressInfo struct {
  	ip, hnsID string
  }
  // loadBalancerInfo carries the HNS identifier of a load balancer.
  type loadBalancerInfo struct{ hnsID string }
  // loadBalancerFlags is a set of boolean options for a load balancer.
  // NOTE(review): nothing in this part of the file reads these flags; their
  // exact meaning should be confirmed against the HostNetworkService
  // implementations.
  type loadBalancerFlags struct {
  	isILB, isDSR, localRoutedVIP, useMUX, preserveDIP bool
  }
  87. // internal struct for string service information
  88. type serviceInfo struct {
  89. clusterIP net.IP
  90. port int
  91. protocol v1.Protocol
  92. nodePort int
  93. targetPort int
  94. loadBalancerStatus v1.LoadBalancerStatus
  95. sessionAffinityType v1.ServiceAffinity
  96. stickyMaxAgeSeconds int
  97. externalIPs []*externalIPInfo
  98. loadBalancerIngressIPs []*loadBalancerIngressInfo
  99. loadBalancerSourceRanges []string
  100. onlyNodeLocalEndpoints bool
  101. healthCheckNodePort int
  102. hnsID string
  103. nodePorthnsID string
  104. policyApplied bool
  105. remoteEndpoint *endpointsInfo
  106. hns HostNetworkService
  107. preserveDIP bool
  108. }
  109. type hnsNetworkInfo struct {
  110. name string
  111. id string
  112. networkType string
  113. remoteSubnets []*remoteSubnetInfo
  114. }
  // remoteSubnetInfo describes one remote subnet attached to an HNS network.
  // NOTE(review): the fields are not populated in this part of the file;
  // confirm their semantics against the code that builds hnsNetworkInfo.
  type remoteSubnetInfo struct {
  	destinationPrefix             string
  	isolationID                   uint16
  	providerAddress, drMacAddress string
  }
  121. func Log(v interface{}, message string, level klog.Level) {
  122. klog.V(level).Infof("%s, %s", message, spew.Sdump(v))
  123. }
  124. func LogJson(v interface{}, message string, level klog.Level) {
  125. jsonString, err := json.Marshal(v)
  126. if err == nil {
  127. klog.V(level).Infof("%s, %s", message, string(jsonString))
  128. }
  129. }
  130. // internal struct for endpoints information
  131. type endpointsInfo struct {
  132. ip string
  133. port uint16
  134. isLocal bool
  135. macAddress string
  136. hnsID string
  137. refCount uint16
  138. providerAddress string
  139. hns HostNetworkService
  140. }
  // conjureMac builds a MAC address string from macPrefix followed by the four
  // bytes of an IPv4 address, each rendered as two hex digits and joined with
  // dashes, so that distinct IPv4 addresses get distinct MACs.
  // IPv6 is not supported: any ip without a 4-byte form (including nil)
  // yields the fixed dummy address "02-11-22-33-44-55".
  func conjureMac(macPrefix string, ip net.IP) string {
  	v4 := ip.To4()
  	if v4 == nil {
  		return "02-11-22-33-44-55"
  	}
  	return fmt.Sprintf("%v-%02x-%02x-%02x-%02x", macPrefix, v4[0], v4[1], v4[2], v4[3])
  }
  151. func newEndpointInfo(ip string, port uint16, isLocal bool, hns HostNetworkService) *endpointsInfo {
  152. info := &endpointsInfo{
  153. ip: ip,
  154. port: port,
  155. isLocal: isLocal,
  156. macAddress: conjureMac("02-11", net.ParseIP(ip)),
  157. refCount: 0,
  158. hnsID: "",
  159. hns: hns,
  160. }
  161. return info
  162. }
  163. func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointsInfo, error) {
  164. hnsEndpoint := &endpointsInfo{
  165. ip: ip,
  166. isLocal: true,
  167. macAddress: mac,
  168. providerAddress: providerAddress,
  169. }
  170. ep, err := hns.createEndpoint(hnsEndpoint, network)
  171. return ep, err
  172. }
  173. func (ep *endpointsInfo) Cleanup() {
  174. Log(ep, "Endpoint Cleanup", 3)
  175. ep.refCount--
  176. // Remove the remote hns endpoint, if no service is referring it
  177. // Never delete a Local Endpoint. Local Endpoints are already created by other entities.
  178. // Remove only remote endpoints created by this service
  179. if ep.refCount <= 0 && !ep.isLocal {
  180. klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
  181. err := ep.hns.deleteEndpoint(ep.hnsID)
  182. if err == nil {
  183. ep.hnsID = ""
  184. } else {
  185. klog.Errorf("Endpoint deletion failed for %v: %v", ep.ip, err)
  186. }
  187. }
  188. }
  189. // returns a new serviceInfo struct
  190. func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, service *v1.Service, hns HostNetworkService) *serviceInfo {
  191. onlyNodeLocalEndpoints := false
  192. if apiservice.RequestsOnlyLocalTraffic(service) {
  193. onlyNodeLocalEndpoints = true
  194. }
  195. // set default session sticky max age 180min=10800s
  196. stickyMaxAgeSeconds := 10800
  197. if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP && service.Spec.SessionAffinityConfig != nil {
  198. stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
  199. }
  200. klog.Infof("Service %q preserve-destination: %v", svcPortName.NamespacedName.String(), service.Annotations["preserve-destination"])
  201. preserveDIP := service.Annotations["preserve-destination"] == "true"
  202. err := hcn.DSRSupported()
  203. if err != nil {
  204. preserveDIP = false
  205. }
  206. // targetPort is zero if it is specified as a name in port.TargetPort.
  207. // Its real value would be got later from endpoints.
  208. targetPort := 0
  209. if port.TargetPort.Type == intstr.Int {
  210. targetPort = port.TargetPort.IntValue()
  211. }
  212. info := &serviceInfo{
  213. clusterIP: net.ParseIP(service.Spec.ClusterIP),
  214. port: int(port.Port),
  215. protocol: port.Protocol,
  216. nodePort: int(port.NodePort),
  217. targetPort: targetPort,
  218. // Deep-copy in case the service instance changes
  219. loadBalancerStatus: *service.Status.LoadBalancer.DeepCopy(),
  220. sessionAffinityType: service.Spec.SessionAffinity,
  221. stickyMaxAgeSeconds: stickyMaxAgeSeconds,
  222. loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
  223. onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
  224. hns: hns,
  225. preserveDIP: preserveDIP,
  226. }
  227. copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
  228. for _, eip := range service.Spec.ExternalIPs {
  229. info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
  230. }
  231. for _, ingress := range service.Status.LoadBalancer.Ingress {
  232. info.loadBalancerIngressIPs = append(info.loadBalancerIngressIPs, &loadBalancerIngressInfo{ip: ingress.IP})
  233. }
  234. if apiservice.NeedsHealthCheck(service) {
  235. p := service.Spec.HealthCheckNodePort
  236. if p == 0 {
  237. klog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String())
  238. } else {
  239. info.healthCheckNodePort = int(p)
  240. }
  241. }
  242. return info
  243. }
  244. type endpointsChange struct {
  245. previous proxyEndpointsMap
  246. current proxyEndpointsMap
  247. }
  248. type endpointsChangeMap struct {
  249. lock sync.Mutex
  250. hostname string
  251. items map[types.NamespacedName]*endpointsChange
  252. }
  253. type serviceChange struct {
  254. previous proxyServiceMap
  255. current proxyServiceMap
  256. }
  257. type serviceChangeMap struct {
  258. lock sync.Mutex
  259. items map[types.NamespacedName]*serviceChange
  260. }
  261. type updateEndpointMapResult struct {
  262. hcEndpoints map[types.NamespacedName]int
  263. staleEndpoints map[endpointServicePair]bool
  264. staleServiceNames map[proxy.ServicePortName]bool
  265. }
  266. type updateServiceMapResult struct {
  267. hcServices map[types.NamespacedName]uint16
  268. staleServices sets.String
  269. }
  270. type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
  271. type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
  272. func newEndpointsChangeMap(hostname string) endpointsChangeMap {
  273. return endpointsChangeMap{
  274. hostname: hostname,
  275. items: make(map[types.NamespacedName]*endpointsChange),
  276. }
  277. }
  278. func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Endpoints, hns HostNetworkService) bool {
  279. ecm.lock.Lock()
  280. defer ecm.lock.Unlock()
  281. change, exists := ecm.items[*namespacedName]
  282. if !exists {
  283. change = &endpointsChange{}
  284. change.previous = endpointsToEndpointsMap(previous, ecm.hostname, hns)
  285. ecm.items[*namespacedName] = change
  286. }
  287. change.current = endpointsToEndpointsMap(current, ecm.hostname, hns)
  288. if reflect.DeepEqual(change.previous, change.current) {
  289. delete(ecm.items, *namespacedName)
  290. }
  291. return len(ecm.items) > 0
  292. }
  293. func newServiceChangeMap() serviceChangeMap {
  294. return serviceChangeMap{
  295. items: make(map[types.NamespacedName]*serviceChange),
  296. }
  297. }
  298. func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Service, hns HostNetworkService) bool {
  299. scm.lock.Lock()
  300. defer scm.lock.Unlock()
  301. change, exists := scm.items[*namespacedName]
  302. if !exists {
  303. // Service is Added
  304. change = &serviceChange{}
  305. change.previous = serviceToServiceMap(previous, hns)
  306. scm.items[*namespacedName] = change
  307. }
  308. change.current = serviceToServiceMap(current, hns)
  309. if reflect.DeepEqual(change.previous, change.current) {
  310. delete(scm.items, *namespacedName)
  311. }
  312. return len(scm.items) > 0
  313. }
  314. func (sm *proxyServiceMap) merge(other proxyServiceMap, curEndpoints proxyEndpointsMap) sets.String {
  315. existingPorts := sets.NewString()
  316. for svcPortName, info := range other {
  317. existingPorts.Insert(svcPortName.Port)
  318. svcInfo, exists := (*sm)[svcPortName]
  319. if !exists {
  320. klog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
  321. } else {
  322. klog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
  323. svcInfo.cleanupAllPolicies(curEndpoints[svcPortName])
  324. delete(*sm, svcPortName)
  325. }
  326. (*sm)[svcPortName] = info
  327. }
  328. return existingPorts
  329. }
  330. func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String, curEndpoints proxyEndpointsMap) {
  331. for svcPortName := range other {
  332. if existingPorts.Has(svcPortName.Port) {
  333. continue
  334. }
  335. info, exists := (*sm)[svcPortName]
  336. if exists {
  337. klog.V(1).Infof("Removing service port %q", svcPortName)
  338. if info.protocol == v1.ProtocolUDP {
  339. staleServices.Insert(info.clusterIP.String())
  340. }
  341. info.cleanupAllPolicies(curEndpoints[svcPortName])
  342. delete(*sm, svcPortName)
  343. } else {
  344. klog.Errorf("Service port %q removed, but doesn't exists", svcPortName)
  345. }
  346. }
  347. }
  348. func (em proxyEndpointsMap) merge(other proxyEndpointsMap, curServices proxyServiceMap) {
  349. // Endpoint Update/Add
  350. for svcPortName := range other {
  351. epInfos, exists := em[svcPortName]
  352. if exists {
  353. //
  354. info, exists := curServices[svcPortName]
  355. klog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
  356. if exists {
  357. klog.V(2).Infof("Endpoints are modified. Service [%v] is stale", svcPortName)
  358. info.cleanupAllPolicies(epInfos)
  359. } else {
  360. // If no service exists, just cleanup the remote endpoints
  361. klog.V(2).Infof("Endpoints are orphaned. Cleaning up")
  362. // Cleanup Endpoints references
  363. for _, ep := range epInfos {
  364. ep.Cleanup()
  365. }
  366. }
  367. delete(em, svcPortName)
  368. }
  369. em[svcPortName] = other[svcPortName]
  370. }
  371. }
  372. func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap, curServices proxyServiceMap) {
  373. // Endpoint Update/Removal
  374. for svcPortName := range other {
  375. info, exists := curServices[svcPortName]
  376. if exists {
  377. klog.V(2).Infof("Service [%v] is stale", info)
  378. info.cleanupAllPolicies(em[svcPortName])
  379. } else {
  380. // If no service exists, just cleanup the remote endpoints
  381. klog.V(2).Infof("Endpoints are orphaned. Cleaning up")
  382. // Cleanup Endpoints references
  383. epInfos, exists := em[svcPortName]
  384. if exists {
  385. for _, ep := range epInfos {
  386. ep.Cleanup()
  387. }
  388. }
  389. }
  390. delete(em, svcPortName)
  391. }
  392. }
  393. // Proxier is an hns based proxy for connections between a localhost:lport
  394. // and services that provide the actual backends.
  395. type Proxier struct {
  396. // EndpointSlice support has not been added for this proxier yet.
  397. proxyconfig.NoopEndpointSliceHandler
  398. // TODO(imroc): implement node handler for winkernel proxier.
  399. proxyconfig.NoopNodeHandler
  400. // endpointsChanges and serviceChanges contains all changes to endpoints and
  401. // services that happened since policies were synced. For a single object,
  402. // changes are accumulated, i.e. previous is state from before all of them,
  403. // current is state after applying all of those.
  404. endpointsChanges endpointsChangeMap
  405. serviceChanges serviceChangeMap
  406. mu sync.Mutex // protects the following fields
  407. serviceMap proxyServiceMap
  408. endpointsMap proxyEndpointsMap
  409. portsMap map[localPort]closeable
  410. // endpointsSynced and servicesSynced are set to true when corresponding
  411. // objects are synced after startup. This is used to avoid updating hns policies
  412. // with some partial data after kube-proxy restart.
  413. endpointsSynced bool
  414. servicesSynced bool
  415. initialized int32
  416. syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
  417. // These are effectively const and do not need the mutex to be held.
  418. masqueradeAll bool
  419. masqueradeMark string
  420. clusterCIDR string
  421. hostname string
  422. nodeIP net.IP
  423. recorder record.EventRecorder
  424. serviceHealthServer healthcheck.ServiceHealthServer
  425. healthzServer healthcheck.ProxierHealthUpdater
  426. // Since converting probabilities (floats) to strings is expensive
  427. // and we are using only probabilities in the format of 1/n, we are
  428. // precomputing some number of those and cache for future reuse.
  429. precomputedProbabilities []string
  430. hns HostNetworkService
  431. network hnsNetworkInfo
  432. sourceVip string
  433. hostMac string
  434. isDSR bool
  435. }
  // localPort identifies a port on the local host; it is the key type of
  // Proxier.portsMap.
  type localPort struct {
  	desc, ip string
  	port     int
  	protocol string
  }

  // String renders the port as `"desc" (ip:port/protocol)` with desc quoted.
  func (lp *localPort) String() string {
  	address := fmt.Sprintf("%s:%d/%s", lp.ip, lp.port, lp.protocol)
  	return fmt.Sprintf("%q (%s)", lp.desc, address)
  }
  445. func Enum(p v1.Protocol) uint16 {
  446. if p == v1.ProtocolTCP {
  447. return 6
  448. }
  449. if p == v1.ProtocolUDP {
  450. return 17
  451. }
  452. if p == v1.ProtocolSCTP {
  453. return 132
  454. }
  455. return 0
  456. }
  // closeable is anything that can be closed; it is the value type of
  // Proxier.portsMap.
  type closeable interface {
  	// Close releases the underlying resource and reports any failure.
  	Close() error
  }
  460. // Proxier implements proxy.Provider
  461. var _ proxy.Provider = &Proxier{}
  462. // NewProxier returns a new Proxier
  463. func NewProxier(
  464. syncPeriod time.Duration,
  465. minSyncPeriod time.Duration,
  466. masqueradeAll bool,
  467. masqueradeBit int,
  468. clusterCIDR string,
  469. hostname string,
  470. nodeIP net.IP,
  471. recorder record.EventRecorder,
  472. healthzServer healthcheck.ProxierHealthUpdater,
  473. config config.KubeProxyWinkernelConfiguration,
  474. ) (*Proxier, error) {
  475. masqueradeValue := 1 << uint(masqueradeBit)
  476. masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
  477. if nodeIP == nil {
  478. klog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
  479. nodeIP = net.ParseIP("127.0.0.1")
  480. }
  481. if len(clusterCIDR) == 0 {
  482. klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
  483. }
  484. serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
  485. var hns HostNetworkService
  486. hns = hnsV1{}
  487. supportedFeatures := hcn.GetSupportedFeatures()
  488. if supportedFeatures.Api.V2 {
  489. hns = hnsV2{}
  490. }
  491. hnsNetworkName := config.NetworkName
  492. if len(hnsNetworkName) == 0 {
  493. klog.V(3).Infof("network-name flag not set. Checking environment variable")
  494. hnsNetworkName = os.Getenv("KUBE_NETWORK")
  495. if len(hnsNetworkName) == 0 {
  496. return nil, fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized")
  497. }
  498. }
  499. klog.V(3).Infof("Cleaning up old HNS policy lists")
  500. deleteAllHnsLoadBalancerPolicy()
  501. // Get HNS network information
  502. hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName)
  503. for err != nil {
  504. klog.Errorf("Unable to find HNS Network specified by %s. Please check network name and CNI deployment", hnsNetworkName)
  505. time.Sleep(1 * time.Second)
  506. hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
  507. }
  508. // Network could have been detected before Remote Subnet Routes are applied or ManagementIP is updated
  509. // Sleep and update the network to include new information
  510. if hnsNetworkInfo.networkType == "Overlay" {
  511. time.Sleep(10 * time.Second)
  512. hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
  513. if err != nil {
  514. return nil, fmt.Errorf("Could not find HNS network %s", hnsNetworkName)
  515. }
  516. }
  517. klog.V(1).Infof("Hns Network loaded with info = %v", hnsNetworkInfo)
  518. isDSR := config.EnableDSR
  519. if isDSR && !utilfeature.DefaultFeatureGate.Enabled(genericfeatures.WinDSR) {
  520. return nil, fmt.Errorf("WinDSR feature gate not enabled")
  521. }
  522. err = hcn.DSRSupported()
  523. if isDSR && err != nil {
  524. return nil, err
  525. }
  526. var sourceVip string
  527. var hostMac string
  528. if hnsNetworkInfo.networkType == "Overlay" {
  529. if !utilfeature.DefaultFeatureGate.Enabled(genericfeatures.WinOverlay) {
  530. return nil, fmt.Errorf("WinOverlay feature gate not enabled")
  531. }
  532. err = hcn.RemoteSubnetSupported()
  533. if err != nil {
  534. return nil, err
  535. }
  536. sourceVip = config.SourceVip
  537. if len(sourceVip) == 0 {
  538. return nil, fmt.Errorf("source-vip flag not set")
  539. }
  540. interfaces, _ := net.Interfaces() //TODO create interfaces
  541. for _, inter := range interfaces {
  542. addresses, _ := inter.Addrs()
  543. for _, addr := range addresses {
  544. addrIP, _, _ := net.ParseCIDR(addr.String())
  545. if addrIP.String() == nodeIP.String() {
  546. klog.V(2).Infof("Host MAC address is %s", inter.HardwareAddr.String())
  547. hostMac = inter.HardwareAddr.String()
  548. }
  549. }
  550. }
  551. if len(hostMac) == 0 {
  552. return nil, fmt.Errorf("Could not find host mac address for %s", nodeIP)
  553. }
  554. }
  555. proxier := &Proxier{
  556. portsMap: make(map[localPort]closeable),
  557. serviceMap: make(proxyServiceMap),
  558. serviceChanges: newServiceChangeMap(),
  559. endpointsMap: make(proxyEndpointsMap),
  560. endpointsChanges: newEndpointsChangeMap(hostname),
  561. masqueradeAll: masqueradeAll,
  562. masqueradeMark: masqueradeMark,
  563. clusterCIDR: clusterCIDR,
  564. hostname: hostname,
  565. nodeIP: nodeIP,
  566. recorder: recorder,
  567. serviceHealthServer: serviceHealthServer,
  568. healthzServer: healthzServer,
  569. hns: hns,
  570. network: *hnsNetworkInfo,
  571. sourceVip: sourceVip,
  572. hostMac: hostMac,
  573. isDSR: isDSR,
  574. }
  575. burstSyncs := 2
  576. klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
  577. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
  578. return proxier, nil
  579. }
  580. // CleanupLeftovers removes all hns rules created by the Proxier
  581. // It returns true if an error was encountered. Errors are logged.
  582. func CleanupLeftovers() (encounteredError bool) {
  583. // Delete all Hns Load Balancer Policies
  584. deleteAllHnsLoadBalancerPolicy()
  585. // TODO
  586. // Delete all Hns Remote endpoints
  587. return encounteredError
  588. }
  589. func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []*endpointsInfo) {
  590. Log(svcInfo, "Service Cleanup", 3)
  591. // Skip the svcInfo.policyApplied check to remove all the policies
  592. svcInfo.deleteAllHnsLoadBalancerPolicy()
  593. // Cleanup Endpoints references
  594. for _, ep := range endpoints {
  595. ep.Cleanup()
  596. }
  597. if svcInfo.remoteEndpoint != nil {
  598. svcInfo.remoteEndpoint.Cleanup()
  599. }
  600. svcInfo.policyApplied = false
  601. }
  602. func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() {
  603. // Remove the Hns Policy corresponding to this service
  604. hns := svcInfo.hns
  605. hns.deleteLoadBalancer(svcInfo.hnsID)
  606. svcInfo.hnsID = ""
  607. hns.deleteLoadBalancer(svcInfo.nodePorthnsID)
  608. svcInfo.nodePorthnsID = ""
  609. for _, externalIP := range svcInfo.externalIPs {
  610. hns.deleteLoadBalancer(externalIP.hnsID)
  611. externalIP.hnsID = ""
  612. }
  613. for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
  614. hns.deleteLoadBalancer(lbIngressIP.hnsID)
  615. lbIngressIP.hnsID = ""
  616. }
  617. }
  618. func deleteAllHnsLoadBalancerPolicy() {
  619. plists, err := hcsshim.HNSListPolicyListRequest()
  620. if err != nil {
  621. return
  622. }
  623. for _, plist := range plists {
  624. LogJson(plist, "Remove Policy", 3)
  625. _, err = plist.Delete()
  626. if err != nil {
  627. klog.Errorf("%v", err)
  628. }
  629. }
  630. }
  631. func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
  632. hnsnetwork, err := hcsshim.GetHNSNetworkByName(hnsNetworkName)
  633. if err != nil {
  634. klog.Errorf("%v", err)
  635. return nil, err
  636. }
  637. return &hnsNetworkInfo{
  638. id: hnsnetwork.Id,
  639. name: hnsnetwork.Name,
  640. networkType: hnsnetwork.Type,
  641. }, nil
  642. }
  643. // Sync is called to synchronize the proxier state to hns as soon as possible.
  644. func (proxier *Proxier) Sync() {
  645. if proxier.healthzServer != nil {
  646. proxier.healthzServer.QueuedUpdate()
  647. }
  648. proxier.syncRunner.Run()
  649. }
  650. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
  651. func (proxier *Proxier) SyncLoop() {
  652. // Update healthz timestamp at beginning in case Sync() never succeeds.
  653. if proxier.healthzServer != nil {
  654. proxier.healthzServer.Updated()
  655. }
  656. proxier.syncRunner.Loop(wait.NeverStop)
  657. }
  658. func (proxier *Proxier) setInitialized(value bool) {
  659. var initialized int32
  660. if value {
  661. initialized = 1
  662. }
  663. atomic.StoreInt32(&proxier.initialized, initialized)
  664. }
  665. func (proxier *Proxier) isInitialized() bool {
  666. return atomic.LoadInt32(&proxier.initialized) > 0
  667. }
  668. func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
  669. namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  670. if proxier.serviceChanges.update(&namespacedName, nil, service, proxier.hns) && proxier.isInitialized() {
  671. proxier.Sync()
  672. }
  673. }
  674. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  675. namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  676. if proxier.serviceChanges.update(&namespacedName, oldService, service, proxier.hns) && proxier.isInitialized() {
  677. proxier.Sync()
  678. }
  679. }
  680. func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
  681. namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  682. if proxier.serviceChanges.update(&namespacedName, service, nil, proxier.hns) && proxier.isInitialized() {
  683. proxier.Sync()
  684. }
  685. }
  686. func (proxier *Proxier) OnServiceSynced() {
  687. proxier.mu.Lock()
  688. proxier.servicesSynced = true
  689. proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
  690. proxier.mu.Unlock()
  691. // Sync unconditionally - this is called once per lifetime.
  692. proxier.syncProxyRules()
  693. }
  694. func shouldSkipService(svcName types.NamespacedName, service *v1.Service) bool {
  695. // if ClusterIP is "None" or empty, skip proxying
  696. if !helper.IsServiceIPSet(service) {
  697. klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
  698. return true
  699. }
  700. // Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
  701. if service.Spec.Type == v1.ServiceTypeExternalName {
  702. klog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
  703. return true
  704. }
  705. return false
  706. }
  707. // <serviceMap> is updated by this function (based on the given changes).
  708. // <changes> map is cleared after applying them.
  709. func (proxier *Proxier) updateServiceMap() (result updateServiceMapResult) {
  710. result.staleServices = sets.NewString()
  711. serviceMap := proxier.serviceMap
  712. changes := &proxier.serviceChanges
  713. func() {
  714. changes.lock.Lock()
  715. defer changes.lock.Unlock()
  716. for _, change := range changes.items {
  717. existingPorts := serviceMap.merge(change.current, proxier.endpointsMap)
  718. serviceMap.unmerge(change.previous, existingPorts, result.staleServices, proxier.endpointsMap)
  719. }
  720. changes.items = make(map[types.NamespacedName]*serviceChange)
  721. }()
  722. // TODO: If this will appear to be computationally expensive, consider
  723. // computing this incrementally similarly to serviceMap.
  724. result.hcServices = make(map[types.NamespacedName]uint16)
  725. for svcPortName, info := range serviceMap {
  726. if info.healthCheckNodePort != 0 {
  727. result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
  728. }
  729. }
  730. return result
  731. }
  732. func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
  733. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  734. if proxier.endpointsChanges.update(&namespacedName, nil, endpoints, proxier.hns) && proxier.isInitialized() {
  735. proxier.Sync()
  736. }
  737. }
  738. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  739. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  740. if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints, proxier.hns) && proxier.isInitialized() {
  741. proxier.Sync()
  742. }
  743. }
  744. func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
  745. namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
  746. if proxier.endpointsChanges.update(&namespacedName, endpoints, nil, proxier.hns) && proxier.isInitialized() {
  747. proxier.Sync()
  748. }
  749. }
  750. func (proxier *Proxier) OnEndpointsSynced() {
  751. proxier.mu.Lock()
  752. proxier.endpointsSynced = true
  753. proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
  754. proxier.mu.Unlock()
  755. // Sync unconditionally - this is called once per lifetime.
  756. proxier.syncProxyRules()
  757. }
  758. func (proxier *Proxier) cleanupAllPolicies() {
  759. for svcName, svcInfo := range proxier.serviceMap {
  760. svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName])
  761. }
  762. }
  763. func isNetworkNotFoundError(err error) bool {
  764. if err == nil {
  765. return false
  766. }
  767. if _, ok := err.(hcn.NetworkNotFoundError); ok {
  768. return true
  769. }
  770. if _, ok := err.(hcsshim.NetworkNotFoundError); ok {
  771. return true
  772. }
  773. return false
  774. }
  775. // <endpointsMap> is updated by this function (based on the given changes).
  776. // <changes> map is cleared after applying them.
  777. func (proxier *Proxier) updateEndpointsMap() (result updateEndpointMapResult) {
  778. result.staleEndpoints = make(map[endpointServicePair]bool)
  779. result.staleServiceNames = make(map[proxy.ServicePortName]bool)
  780. endpointsMap := proxier.endpointsMap
  781. changes := &proxier.endpointsChanges
  782. func() {
  783. changes.lock.Lock()
  784. defer changes.lock.Unlock()
  785. for _, change := range changes.items {
  786. endpointsMap.unmerge(change.previous, proxier.serviceMap)
  787. endpointsMap.merge(change.current, proxier.serviceMap)
  788. }
  789. changes.items = make(map[types.NamespacedName]*endpointsChange)
  790. }()
  791. // TODO: If this will appear to be computationally expensive, consider
  792. // computing this incrementally similarly to endpointsMap.
  793. result.hcEndpoints = make(map[types.NamespacedName]int)
  794. localIPs := getLocalIPs(endpointsMap)
  795. for nsn, ips := range localIPs {
  796. result.hcEndpoints[nsn] = len(ips)
  797. }
  798. return result
  799. }
  800. func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
  801. localIPs := make(map[types.NamespacedName]sets.String)
  802. for svcPortName := range endpointsMap {
  803. for _, ep := range endpointsMap[svcPortName] {
  804. if ep.isLocal {
  805. nsn := svcPortName.NamespacedName
  806. if localIPs[nsn] == nil {
  807. localIPs[nsn] = sets.NewString()
  808. }
  809. localIPs[nsn].Insert(ep.ip) // just the IP part
  810. }
  811. }
  812. }
  813. return localIPs
  814. }
  815. // Translates single Endpoints object to proxyEndpointsMap.
  816. // This function is used for incremental updated of endpointsMap.
  817. //
  818. // NOTE: endpoints object should NOT be modified.
  819. func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string, hns HostNetworkService) proxyEndpointsMap {
  820. if endpoints == nil {
  821. return nil
  822. }
  823. endpointsMap := make(proxyEndpointsMap)
  824. // We need to build a map of portname -> all ip:ports for that
  825. // portname. Explode Endpoints.Subsets[*] into this structure.
  826. for i := range endpoints.Subsets {
  827. ss := &endpoints.Subsets[i]
  828. for i := range ss.Ports {
  829. port := &ss.Ports[i]
  830. if port.Port == 0 {
  831. klog.Warningf("Ignoring invalid endpoint port %s", port.Name)
  832. continue
  833. }
  834. svcPortName := proxy.ServicePortName{
  835. NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
  836. Port: port.Name,
  837. }
  838. for i := range ss.Addresses {
  839. addr := &ss.Addresses[i]
  840. if addr.IP == "" {
  841. klog.Warningf("Ignoring invalid endpoint port %s with empty host", port.Name)
  842. continue
  843. }
  844. isLocal := addr.NodeName != nil && *addr.NodeName == hostname
  845. epInfo := newEndpointInfo(addr.IP, uint16(port.Port), isLocal, hns)
  846. endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo)
  847. }
  848. if klog.V(3) {
  849. newEPList := []*endpointsInfo{}
  850. for _, ep := range endpointsMap[svcPortName] {
  851. newEPList = append(newEPList, ep)
  852. }
  853. klog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList)
  854. }
  855. }
  856. }
  857. return endpointsMap
  858. }
  859. // Translates single Service object to proxyServiceMap.
  860. //
  861. // NOTE: service object should NOT be modified.
  862. func serviceToServiceMap(service *v1.Service, hns HostNetworkService) proxyServiceMap {
  863. if service == nil {
  864. return nil
  865. }
  866. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  867. if shouldSkipService(svcName, service) {
  868. return nil
  869. }
  870. serviceMap := make(proxyServiceMap)
  871. for i := range service.Spec.Ports {
  872. servicePort := &service.Spec.Ports[i]
  873. svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
  874. serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service, hns)
  875. }
  876. return serviceMap
  877. }
  878. // This is where all of the hns save/restore calls happen.
  879. // assumes proxier.mu is held
  880. func (proxier *Proxier) syncProxyRules() {
  881. proxier.mu.Lock()
  882. defer proxier.mu.Unlock()
  883. start := time.Now()
  884. defer func() {
  885. SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
  886. klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
  887. }()
  888. // don't sync rules till we've received services and endpoints
  889. if !proxier.endpointsSynced || !proxier.servicesSynced {
  890. klog.V(2).Info("Not syncing hns until Services and Endpoints have been received from master")
  891. return
  892. }
  893. hnsNetworkName := proxier.network.name
  894. hns := proxier.hns
  895. prevNetworkID := proxier.network.id
  896. updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
  897. if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
  898. klog.Infof("The HNS network %s is not present or has changed since the last sync. Please check the CNI deployment", hnsNetworkName)
  899. proxier.cleanupAllPolicies()
  900. if updatedNetwork != nil {
  901. proxier.network = *updatedNetwork
  902. }
  903. return
  904. }
  905. // We assume that if this was called, we really want to sync them,
  906. // even if nothing changed in the meantime. In other words, callers are
  907. // responsible for detecting no-op changes and not calling this function.
  908. serviceUpdateResult := proxier.updateServiceMap()
  909. endpointUpdateResult := proxier.updateEndpointsMap()
  910. staleServices := serviceUpdateResult.staleServices
  911. // merge stale services gathered from updateEndpointsMap
  912. for svcPortName := range endpointUpdateResult.staleServiceNames {
  913. if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == v1.ProtocolUDP {
  914. klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
  915. staleServices.Insert(svcInfo.clusterIP.String())
  916. }
  917. }
  918. if proxier.network.networkType == "Overlay" {
  919. existingSourceVip, err := hns.getEndpointByIpAddress(proxier.sourceVip, hnsNetworkName)
  920. if existingSourceVip == nil {
  921. _, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
  922. }
  923. if err != nil {
  924. klog.Errorf("Source Vip endpoint creation failed: %v", err)
  925. return
  926. }
  927. }
  928. klog.V(3).Infof("Syncing Policies")
  929. // Program HNS by adding corresponding policies for each service.
  930. for svcName, svcInfo := range proxier.serviceMap {
  931. if svcInfo.policyApplied {
  932. klog.V(4).Infof("Policy already applied for %s", spew.Sdump(svcInfo))
  933. continue
  934. }
  935. if proxier.network.networkType == "Overlay" {
  936. serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.clusterIP.String(), hnsNetworkName)
  937. if serviceVipEndpoint == nil {
  938. klog.V(4).Infof("No existing remote endpoint for service VIP %v", svcInfo.clusterIP.String())
  939. hnsEndpoint := &endpointsInfo{
  940. ip: svcInfo.clusterIP.String(),
  941. isLocal: false,
  942. macAddress: proxier.hostMac,
  943. providerAddress: proxier.nodeIP.String(),
  944. }
  945. newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
  946. if err != nil {
  947. klog.Errorf("Remote endpoint creation failed for service VIP: %v", err)
  948. continue
  949. }
  950. newHnsEndpoint.refCount++
  951. svcInfo.remoteEndpoint = newHnsEndpoint
  952. }
  953. }
  954. var hnsEndpoints []endpointsInfo
  955. var hnsLocalEndpoints []endpointsInfo
  956. klog.V(4).Infof("====Applying Policy for %s====", svcName)
  957. // Create Remote endpoints for every endpoint, corresponding to the service
  958. containsPublicIP := false
  959. containsNodeIP := false
  960. for _, ep := range proxier.endpointsMap[svcName] {
  961. var newHnsEndpoint *endpointsInfo
  962. hnsNetworkName := proxier.network.name
  963. var err error
  964. // targetPort is zero if it is specified as a name in port.TargetPort, so the real port should be got from endpoints.
  965. // Note that hcsshim.AddLoadBalancer() doesn't support endpoints with different ports, so only port from first endpoint is used.
  966. // TODO(feiskyer): add support of different endpoint ports after hcsshim.AddLoadBalancer() add that.
  967. if svcInfo.targetPort == 0 {
  968. svcInfo.targetPort = int(ep.port)
  969. }
  970. if len(ep.hnsID) > 0 {
  971. newHnsEndpoint, err = hns.getEndpointByID(ep.hnsID)
  972. }
  973. if newHnsEndpoint == nil {
  974. // First check if an endpoint resource exists for this IP, on the current host
  975. // A Local endpoint could exist here already
  976. // A remote endpoint was already created and proxy was restarted
  977. newHnsEndpoint, err = hns.getEndpointByIpAddress(ep.ip, hnsNetworkName)
  978. }
  979. if newHnsEndpoint == nil {
  980. if ep.isLocal {
  981. klog.Errorf("Local endpoint not found for %v: err: %v on network %s", ep.ip, err, hnsNetworkName)
  982. continue
  983. }
  984. if proxier.network.networkType == "Overlay" {
  985. klog.Infof("Updating network %v to check for new remote subnet policies", proxier.network.name)
  986. networkName := proxier.network.name
  987. updatedNetwork, err := hns.getNetworkByName(networkName)
  988. if err != nil {
  989. klog.Errorf("Unable to find HNS Network specified by %s. Please check network name and CNI deployment", hnsNetworkName)
  990. proxier.cleanupAllPolicies()
  991. return
  992. }
  993. proxier.network = *updatedNetwork
  994. var providerAddress string
  995. for _, rs := range proxier.network.remoteSubnets {
  996. _, ipNet, err := net.ParseCIDR(rs.destinationPrefix)
  997. if err != nil {
  998. klog.Fatalf("%v", err)
  999. }
  1000. if ipNet.Contains(net.ParseIP(ep.ip)) {
  1001. providerAddress = rs.providerAddress
  1002. }
  1003. if ep.ip == rs.providerAddress {
  1004. providerAddress = rs.providerAddress
  1005. containsNodeIP = true
  1006. }
  1007. }
  1008. if len(providerAddress) == 0 {
  1009. klog.Infof("Could not find provider address for %s. Assuming it is a public IP", ep.ip)
  1010. providerAddress = proxier.nodeIP.String()
  1011. containsPublicIP = true
  1012. }
  1013. hnsEndpoint := &endpointsInfo{
  1014. ip: ep.ip,
  1015. isLocal: false,
  1016. macAddress: conjureMac("02-11", net.ParseIP(ep.ip)),
  1017. providerAddress: providerAddress,
  1018. }
  1019. newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
  1020. if err != nil {
  1021. klog.Errorf("Remote endpoint creation failed: %v, %s", err, spew.Sdump(hnsEndpoint))
  1022. continue
  1023. }
  1024. } else {
  1025. hnsEndpoint := &endpointsInfo{
  1026. ip: ep.ip,
  1027. isLocal: false,
  1028. macAddress: ep.macAddress,
  1029. }
  1030. newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
  1031. if err != nil {
  1032. klog.Errorf("Remote endpoint creation failed: %v", err)
  1033. continue
  1034. }
  1035. }
  1036. }
  1037. // Save the hnsId for reference
  1038. LogJson(newHnsEndpoint, "Hns Endpoint resource", 1)
  1039. hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
  1040. if newHnsEndpoint.isLocal {
  1041. hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
  1042. }
  1043. ep.hnsID = newHnsEndpoint.hnsID
  1044. ep.refCount++
  1045. Log(ep, "Endpoint resource found", 3)
  1046. }
  1047. klog.V(3).Infof("Associated endpoints [%s] for service [%s]", spew.Sdump(hnsEndpoints), svcName)
  1048. if len(svcInfo.hnsID) > 0 {
  1049. // This should not happen
  1050. klog.Warningf("Load Balancer already exists %s -- Debug ", svcInfo.hnsID)
  1051. }
  1052. if len(hnsEndpoints) == 0 {
  1053. klog.Errorf("Endpoint information not available for service %s. Not applying any policy", svcName)
  1054. continue
  1055. }
  1056. klog.V(4).Infof("Trying to Apply Policies for service %s", spew.Sdump(svcInfo))
  1057. var hnsLoadBalancer *loadBalancerInfo
  1058. var sourceVip = proxier.sourceVip
  1059. if containsPublicIP || containsNodeIP {
  1060. sourceVip = proxier.nodeIP.String()
  1061. }
  1062. hnsLoadBalancer, err := hns.getLoadBalancer(
  1063. hnsEndpoints,
  1064. loadBalancerFlags{isDSR: proxier.isDSR},
  1065. sourceVip,
  1066. svcInfo.clusterIP.String(),
  1067. Enum(svcInfo.protocol),
  1068. uint16(svcInfo.targetPort),
  1069. uint16(svcInfo.port),
  1070. )
  1071. if err != nil {
  1072. klog.Errorf("Policy creation failed: %v", err)
  1073. continue
  1074. }
  1075. svcInfo.hnsID = hnsLoadBalancer.hnsID
  1076. klog.V(3).Infof("Hns LoadBalancer resource created for cluster ip resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID)
  1077. // If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
  1078. if svcInfo.nodePort > 0 {
  1079. // If the preserve-destination service annotation is present, we will disable routing mesh for NodePort.
  1080. // This means that health services can use Node Port without falsely getting results from a different node.
  1081. nodePortEndpoints := hnsEndpoints
  1082. if svcInfo.preserveDIP {
  1083. nodePortEndpoints = hnsLocalEndpoints
  1084. }
  1085. hnsLoadBalancer, err := hns.getLoadBalancer(
  1086. nodePortEndpoints,
  1087. loadBalancerFlags{localRoutedVIP: true},
  1088. sourceVip,
  1089. "",
  1090. Enum(svcInfo.protocol),
  1091. uint16(svcInfo.targetPort),
  1092. uint16(svcInfo.nodePort),
  1093. )
  1094. if err != nil {
  1095. klog.Errorf("Policy creation failed: %v", err)
  1096. continue
  1097. }
  1098. svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
  1099. klog.V(3).Infof("Hns LoadBalancer resource created for nodePort resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID)
  1100. }
  1101. // Create a Load Balancer Policy for each external IP
  1102. for _, externalIP := range svcInfo.externalIPs {
  1103. // Try loading existing policies, if already available
  1104. hnsLoadBalancer, err = hns.getLoadBalancer(
  1105. hnsEndpoints,
  1106. loadBalancerFlags{},
  1107. sourceVip,
  1108. externalIP.ip,
  1109. Enum(svcInfo.protocol),
  1110. uint16(svcInfo.targetPort),
  1111. uint16(svcInfo.port),
  1112. )
  1113. if err != nil {
  1114. klog.Errorf("Policy creation failed: %v", err)
  1115. continue
  1116. }
  1117. externalIP.hnsID = hnsLoadBalancer.hnsID
  1118. klog.V(3).Infof("Hns LoadBalancer resource created for externalIP resources %v, Id[%s]", externalIP, hnsLoadBalancer.hnsID)
  1119. }
  1120. // Create a Load Balancer Policy for each loadbalancer ingress
  1121. for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
  1122. // Try loading existing policies, if already available
  1123. lbIngressEndpoints := hnsEndpoints
  1124. if svcInfo.preserveDIP {
  1125. lbIngressEndpoints = hnsLocalEndpoints
  1126. }
  1127. hnsLoadBalancer, err := hns.getLoadBalancer(
  1128. lbIngressEndpoints,
  1129. loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
  1130. sourceVip,
  1131. lbIngressIP.ip,
  1132. Enum(svcInfo.protocol),
  1133. uint16(svcInfo.targetPort),
  1134. uint16(svcInfo.port),
  1135. )
  1136. if err != nil {
  1137. klog.Errorf("Policy creation failed: %v", err)
  1138. continue
  1139. }
  1140. lbIngressIP.hnsID = hnsLoadBalancer.hnsID
  1141. klog.V(3).Infof("Hns LoadBalancer resource created for loadBalancer Ingress resources %v", lbIngressIP)
  1142. }
  1143. svcInfo.policyApplied = true
  1144. Log(svcInfo, "+++Policy Successfully applied for service +++", 2)
  1145. }
  1146. if proxier.healthzServer != nil {
  1147. proxier.healthzServer.Updated()
  1148. }
  1149. SyncProxyRulesLastTimestamp.SetToCurrentTime()
  1150. // Update service healthchecks. The endpoints list might include services that are
  1151. // not "OnlyLocal", but the services list will not, and the serviceHealthServer
  1152. // will just drop those endpoints.
  1153. if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.hcServices); err != nil {
  1154. klog.Errorf("Error syncing healthcheck services: %v", err)
  1155. }
  1156. if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
  1157. klog.Errorf("Error syncing healthcheck endpoints: %v", err)
  1158. }
  1159. // Finish housekeeping.
  1160. // TODO: these could be made more consistent.
  1161. for _, svcIP := range staleServices.UnsortedList() {
  1162. // TODO : Check if this is required to cleanup stale services here
  1163. klog.V(5).Infof("Pending delete stale service IP %s connections", svcIP)
  1164. }
  1165. }
// endpointServicePair pairs an endpoint, identified by a string, with a
// service port name. updateEndpointsMap uses it as the key type of its
// staleEndpoints map.
type endpointServicePair struct {
	// endpoint identifies the endpoint; presumably an "ip:port" string — TODO confirm.
	endpoint string
	// servicePortName is the service port the endpoint is paired with.
	servicePortName proxy.ServicePortName
}