cloudstack_loadbalancer.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cloudstack
  14. import (
  15. "context"
  16. "fmt"
  17. "strconv"
  18. "github.com/xanzy/go-cloudstack/cloudstack"
  19. "k8s.io/klog"
  20. "k8s.io/api/core/v1"
  21. cloudprovider "k8s.io/cloud-provider"
  22. )
  23. type loadBalancer struct {
  24. *cloudstack.CloudStackClient
  25. name string
  26. algorithm string
  27. hostIDs []string
  28. ipAddr string
  29. ipAddrID string
  30. networkID string
  31. projectID string
  32. rules map[string]*cloudstack.LoadBalancerRule
  33. }
  34. // GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is.
  35. func (cs *CSCloud) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
  36. klog.V(4).Infof("GetLoadBalancer(%v, %v, %v)", clusterName, service.Namespace, service.Name)
  37. // Get the load balancer details and existing rules.
  38. lb, err := cs.getLoadBalancer(service)
  39. if err != nil {
  40. return nil, false, err
  41. }
  42. // If we don't have any rules, the load balancer does not exist.
  43. if len(lb.rules) == 0 {
  44. return nil, false, nil
  45. }
  46. klog.V(4).Infof("Found a load balancer associated with IP %v", lb.ipAddr)
  47. status := &v1.LoadBalancerStatus{}
  48. status.Ingress = append(status.Ingress, v1.LoadBalancerIngress{IP: lb.ipAddr})
  49. return status, true, nil
  50. }
  51. // EnsureLoadBalancer creates a new load balancer, or updates the existing one. Returns the status of the balancer.
  52. func (cs *CSCloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (status *v1.LoadBalancerStatus, err error) {
  53. klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, service.Spec.LoadBalancerIP, service.Spec.Ports, nodes)
  54. if len(service.Spec.Ports) == 0 {
  55. return nil, fmt.Errorf("requested load balancer with no ports")
  56. }
  57. // Get the load balancer details and existing rules.
  58. lb, err := cs.getLoadBalancer(service)
  59. if err != nil {
  60. return nil, err
  61. }
  62. // Set the load balancer algorithm.
  63. switch service.Spec.SessionAffinity {
  64. case v1.ServiceAffinityNone:
  65. lb.algorithm = "roundrobin"
  66. case v1.ServiceAffinityClientIP:
  67. lb.algorithm = "source"
  68. default:
  69. return nil, fmt.Errorf("unsupported load balancer affinity: %v", service.Spec.SessionAffinity)
  70. }
  71. // Verify that all the hosts belong to the same network, and retrieve their ID's.
  72. lb.hostIDs, lb.networkID, err = cs.verifyHosts(nodes)
  73. if err != nil {
  74. return nil, err
  75. }
  76. if !lb.hasLoadBalancerIP() {
  77. // Create or retrieve the load balancer IP.
  78. if err := lb.getLoadBalancerIP(service.Spec.LoadBalancerIP); err != nil {
  79. return nil, err
  80. }
  81. if lb.ipAddr != "" && lb.ipAddr != service.Spec.LoadBalancerIP {
  82. defer func(lb *loadBalancer) {
  83. if err != nil {
  84. if err := lb.releaseLoadBalancerIP(); err != nil {
  85. klog.Errorf(err.Error())
  86. }
  87. }
  88. }(lb)
  89. }
  90. }
  91. klog.V(4).Infof("Load balancer %v is associated with IP %v", lb.name, lb.ipAddr)
  92. for _, port := range service.Spec.Ports {
  93. // All ports have their own load balancer rule, so add the port to lbName to keep the names unique.
  94. lbRuleName := fmt.Sprintf("%s-%d", lb.name, port.Port)
  95. // If the load balancer rule exists and is up-to-date, we move on to the next rule.
  96. exists, needsUpdate, err := lb.checkLoadBalancerRule(lbRuleName, port)
  97. if err != nil {
  98. return nil, err
  99. }
  100. if exists && !needsUpdate {
  101. klog.V(4).Infof("Load balancer rule %v is up-to-date", lbRuleName)
  102. // Delete the rule from the map, to prevent it being deleted.
  103. delete(lb.rules, lbRuleName)
  104. continue
  105. }
  106. if needsUpdate {
  107. klog.V(4).Infof("Updating load balancer rule: %v", lbRuleName)
  108. if err := lb.updateLoadBalancerRule(lbRuleName); err != nil {
  109. return nil, err
  110. }
  111. // Delete the rule from the map, to prevent it being deleted.
  112. delete(lb.rules, lbRuleName)
  113. continue
  114. }
  115. klog.V(4).Infof("Creating load balancer rule: %v", lbRuleName)
  116. lbRule, err := lb.createLoadBalancerRule(lbRuleName, port)
  117. if err != nil {
  118. return nil, err
  119. }
  120. klog.V(4).Infof("Assigning hosts (%v) to load balancer rule: %v", lb.hostIDs, lbRuleName)
  121. if err = lb.assignHostsToRule(lbRule, lb.hostIDs); err != nil {
  122. return nil, err
  123. }
  124. }
  125. // Cleanup any rules that are now still in the rules map, as they are no longer needed.
  126. for _, lbRule := range lb.rules {
  127. klog.V(4).Infof("Deleting obsolete load balancer rule: %v", lbRule.Name)
  128. if err := lb.deleteLoadBalancerRule(lbRule); err != nil {
  129. return nil, err
  130. }
  131. }
  132. status = &v1.LoadBalancerStatus{}
  133. status.Ingress = []v1.LoadBalancerIngress{{IP: lb.ipAddr}}
  134. return status, nil
  135. }
  136. // UpdateLoadBalancer updates hosts under the specified load balancer.
  137. func (cs *CSCloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
  138. klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v)", clusterName, service.Namespace, service.Name, nodes)
  139. // Get the load balancer details and existing rules.
  140. lb, err := cs.getLoadBalancer(service)
  141. if err != nil {
  142. return err
  143. }
  144. // Verify that all the hosts belong to the same network, and retrieve their ID's.
  145. lb.hostIDs, _, err = cs.verifyHosts(nodes)
  146. if err != nil {
  147. return err
  148. }
  149. for _, lbRule := range lb.rules {
  150. p := lb.LoadBalancer.NewListLoadBalancerRuleInstancesParams(lbRule.Id)
  151. // Retrieve all VMs currently associated to this load balancer rule.
  152. l, err := lb.LoadBalancer.ListLoadBalancerRuleInstances(p)
  153. if err != nil {
  154. return fmt.Errorf("error retrieving associated instances: %v", err)
  155. }
  156. assign, remove := symmetricDifference(lb.hostIDs, l.LoadBalancerRuleInstances)
  157. if len(assign) > 0 {
  158. klog.V(4).Infof("Assigning new hosts (%v) to load balancer rule: %v", assign, lbRule.Name)
  159. if err := lb.assignHostsToRule(lbRule, assign); err != nil {
  160. return err
  161. }
  162. }
  163. if len(remove) > 0 {
  164. klog.V(4).Infof("Removing old hosts (%v) from load balancer rule: %v", assign, lbRule.Name)
  165. if err := lb.removeHostsFromRule(lbRule, remove); err != nil {
  166. return err
  167. }
  168. }
  169. }
  170. return nil
  171. }
  172. // EnsureLoadBalancerDeleted deletes the specified load balancer if it exists, returning
  173. // nil if the load balancer specified either didn't exist or was successfully deleted.
  174. func (cs *CSCloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
  175. klog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v, %v)", clusterName, service.Namespace, service.Name)
  176. // Get the load balancer details and existing rules.
  177. lb, err := cs.getLoadBalancer(service)
  178. if err != nil {
  179. return err
  180. }
  181. for _, lbRule := range lb.rules {
  182. klog.V(4).Infof("Deleting load balancer rule: %v", lbRule.Name)
  183. if err := lb.deleteLoadBalancerRule(lbRule); err != nil {
  184. return err
  185. }
  186. }
  187. if lb.ipAddr != "" && lb.ipAddr != service.Spec.LoadBalancerIP {
  188. klog.V(4).Infof("Releasing load balancer IP: %v", lb.ipAddr)
  189. if err := lb.releaseLoadBalancerIP(); err != nil {
  190. return err
  191. }
  192. }
  193. return nil
  194. }
  195. // GetLoadBalancerName retrieves the name of the LoadBalancer.
  196. func (cs *CSCloud) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string {
  197. return cloudprovider.DefaultLoadBalancerName(service)
  198. }
  199. // getLoadBalancer retrieves the IP address and ID and all the existing rules it can find.
  200. func (cs *CSCloud) getLoadBalancer(service *v1.Service) (*loadBalancer, error) {
  201. lb := &loadBalancer{
  202. CloudStackClient: cs.client,
  203. name: cs.GetLoadBalancerName(context.TODO(), "", service),
  204. projectID: cs.projectID,
  205. rules: make(map[string]*cloudstack.LoadBalancerRule),
  206. }
  207. p := cs.client.LoadBalancer.NewListLoadBalancerRulesParams()
  208. p.SetKeyword(lb.name)
  209. p.SetListall(true)
  210. if cs.projectID != "" {
  211. p.SetProjectid(cs.projectID)
  212. }
  213. l, err := cs.client.LoadBalancer.ListLoadBalancerRules(p)
  214. if err != nil {
  215. return nil, fmt.Errorf("error retrieving load balancer rules: %v", err)
  216. }
  217. for _, lbRule := range l.LoadBalancerRules {
  218. lb.rules[lbRule.Name] = lbRule
  219. if lb.ipAddr != "" && lb.ipAddr != lbRule.Publicip {
  220. klog.Warningf("Load balancer for service %v/%v has rules associated with different IP's: %v, %v", service.Namespace, service.Name, lb.ipAddr, lbRule.Publicip)
  221. }
  222. lb.ipAddr = lbRule.Publicip
  223. lb.ipAddrID = lbRule.Publicipid
  224. }
  225. klog.V(4).Infof("Load balancer %v contains %d rule(s)", lb.name, len(lb.rules))
  226. return lb, nil
  227. }
  228. // verifyHosts verifies if all hosts belong to the same network, and returns the host ID's and network ID.
  229. func (cs *CSCloud) verifyHosts(nodes []*v1.Node) ([]string, string, error) {
  230. hostNames := map[string]bool{}
  231. for _, node := range nodes {
  232. hostNames[node.Name] = true
  233. }
  234. p := cs.client.VirtualMachine.NewListVirtualMachinesParams()
  235. p.SetListall(true)
  236. if cs.projectID != "" {
  237. p.SetProjectid(cs.projectID)
  238. }
  239. l, err := cs.client.VirtualMachine.ListVirtualMachines(p)
  240. if err != nil {
  241. return nil, "", fmt.Errorf("error retrieving list of hosts: %v", err)
  242. }
  243. var hostIDs []string
  244. var networkID string
  245. // Check if the virtual machine is in the hosts slice, then add the corresponding ID.
  246. for _, vm := range l.VirtualMachines {
  247. if hostNames[vm.Name] {
  248. if networkID != "" && networkID != vm.Nic[0].Networkid {
  249. return nil, "", fmt.Errorf("found hosts that belong to different networks")
  250. }
  251. networkID = vm.Nic[0].Networkid
  252. hostIDs = append(hostIDs, vm.Id)
  253. }
  254. }
  255. return hostIDs, networkID, nil
  256. }
  257. // hasLoadBalancerIP returns true if we have a load balancer address and ID.
  258. func (lb *loadBalancer) hasLoadBalancerIP() bool {
  259. return lb.ipAddr != "" && lb.ipAddrID != ""
  260. }
  261. // getLoadBalancerIP retieves an existing IP or associates a new IP.
  262. func (lb *loadBalancer) getLoadBalancerIP(loadBalancerIP string) error {
  263. if loadBalancerIP != "" {
  264. return lb.getPublicIPAddress(loadBalancerIP)
  265. }
  266. return lb.associatePublicIPAddress()
  267. }
  268. // getPublicIPAddressID retrieves the ID of the given IP, and sets the address and it's ID.
  269. func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP string) error {
  270. klog.V(4).Infof("Retrieve load balancer IP details: %v", loadBalancerIP)
  271. p := lb.Address.NewListPublicIpAddressesParams()
  272. p.SetIpaddress(loadBalancerIP)
  273. p.SetListall(true)
  274. if lb.projectID != "" {
  275. p.SetProjectid(lb.projectID)
  276. }
  277. l, err := lb.Address.ListPublicIpAddresses(p)
  278. if err != nil {
  279. return fmt.Errorf("error retrieving IP address: %v", err)
  280. }
  281. if l.Count != 1 {
  282. return fmt.Errorf("could not find IP address %v", loadBalancerIP)
  283. }
  284. lb.ipAddr = l.PublicIpAddresses[0].Ipaddress
  285. lb.ipAddrID = l.PublicIpAddresses[0].Id
  286. return nil
  287. }
  288. // associatePublicIPAddress associates a new IP and sets the address and it's ID.
  289. func (lb *loadBalancer) associatePublicIPAddress() error {
  290. klog.V(4).Infof("Allocate new IP for load balancer: %v", lb.name)
  291. // If a network belongs to a VPC, the IP address needs to be associated with
  292. // the VPC instead of with the network.
  293. network, count, err := lb.Network.GetNetworkByID(lb.networkID, cloudstack.WithProject(lb.projectID))
  294. if err != nil {
  295. if count == 0 {
  296. return fmt.Errorf("could not find network %v", lb.networkID)
  297. }
  298. return fmt.Errorf("error retrieving network: %v", err)
  299. }
  300. p := lb.Address.NewAssociateIpAddressParams()
  301. if network.Vpcid != "" {
  302. p.SetVpcid(network.Vpcid)
  303. } else {
  304. p.SetNetworkid(lb.networkID)
  305. }
  306. if lb.projectID != "" {
  307. p.SetProjectid(lb.projectID)
  308. }
  309. // Associate a new IP address
  310. r, err := lb.Address.AssociateIpAddress(p)
  311. if err != nil {
  312. return fmt.Errorf("error associating new IP address: %v", err)
  313. }
  314. lb.ipAddr = r.Ipaddress
  315. lb.ipAddrID = r.Id
  316. return nil
  317. }
  318. // releasePublicIPAddress releases an associated IP.
  319. func (lb *loadBalancer) releaseLoadBalancerIP() error {
  320. p := lb.Address.NewDisassociateIpAddressParams(lb.ipAddrID)
  321. if _, err := lb.Address.DisassociateIpAddress(p); err != nil {
  322. return fmt.Errorf("error releasing load balancer IP %v: %v", lb.ipAddr, err)
  323. }
  324. return nil
  325. }
  326. // checkLoadBalancerRule checks if the rule already exists and if it does, if it can be updated. If
  327. // it does exist but cannot be updated, it will delete the existing rule so it can be created again.
  328. func (lb *loadBalancer) checkLoadBalancerRule(lbRuleName string, port v1.ServicePort) (bool, bool, error) {
  329. lbRule, ok := lb.rules[lbRuleName]
  330. if !ok {
  331. return false, false, nil
  332. }
  333. // Check if any of the values we cannot update (those that require a new load balancer rule) are changed.
  334. if lbRule.Publicip == lb.ipAddr && lbRule.Privateport == strconv.Itoa(int(port.NodePort)) && lbRule.Publicport == strconv.Itoa(int(port.Port)) {
  335. return true, lbRule.Algorithm != lb.algorithm, nil
  336. }
  337. // Delete the load balancer rule so we can create a new one using the new values.
  338. if err := lb.deleteLoadBalancerRule(lbRule); err != nil {
  339. return false, false, err
  340. }
  341. return false, false, nil
  342. }
  343. // updateLoadBalancerRule updates a load balancer rule.
  344. func (lb *loadBalancer) updateLoadBalancerRule(lbRuleName string) error {
  345. lbRule := lb.rules[lbRuleName]
  346. p := lb.LoadBalancer.NewUpdateLoadBalancerRuleParams(lbRule.Id)
  347. p.SetAlgorithm(lb.algorithm)
  348. _, err := lb.LoadBalancer.UpdateLoadBalancerRule(p)
  349. return err
  350. }
  351. // createLoadBalancerRule creates a new load balancer rule and returns it's ID.
  352. func (lb *loadBalancer) createLoadBalancerRule(lbRuleName string, port v1.ServicePort) (*cloudstack.LoadBalancerRule, error) {
  353. p := lb.LoadBalancer.NewCreateLoadBalancerRuleParams(
  354. lb.algorithm,
  355. lbRuleName,
  356. int(port.NodePort),
  357. int(port.Port),
  358. )
  359. p.SetNetworkid(lb.networkID)
  360. p.SetPublicipid(lb.ipAddrID)
  361. switch port.Protocol {
  362. case v1.ProtocolTCP:
  363. p.SetProtocol("TCP")
  364. case v1.ProtocolUDP:
  365. p.SetProtocol("UDP")
  366. default:
  367. return nil, fmt.Errorf("unsupported load balancer protocol: %v", port.Protocol)
  368. }
  369. // Do not create corresponding firewall rule.
  370. p.SetOpenfirewall(false)
  371. // Create a new load balancer rule.
  372. r, err := lb.LoadBalancer.CreateLoadBalancerRule(p)
  373. if err != nil {
  374. return nil, fmt.Errorf("error creating load balancer rule %v: %v", lbRuleName, err)
  375. }
  376. lbRule := &cloudstack.LoadBalancerRule{
  377. Id: r.Id,
  378. Algorithm: r.Algorithm,
  379. Cidrlist: r.Cidrlist,
  380. Name: r.Name,
  381. Networkid: r.Networkid,
  382. Privateport: r.Privateport,
  383. Publicport: r.Publicport,
  384. Publicip: r.Publicip,
  385. Publicipid: r.Publicipid,
  386. }
  387. return lbRule, nil
  388. }
  389. // deleteLoadBalancerRule deletes a load balancer rule.
  390. func (lb *loadBalancer) deleteLoadBalancerRule(lbRule *cloudstack.LoadBalancerRule) error {
  391. p := lb.LoadBalancer.NewDeleteLoadBalancerRuleParams(lbRule.Id)
  392. if _, err := lb.LoadBalancer.DeleteLoadBalancerRule(p); err != nil {
  393. return fmt.Errorf("error deleting load balancer rule %v: %v", lbRule.Name, err)
  394. }
  395. // Delete the rule from the map as it no longer exists
  396. delete(lb.rules, lbRule.Name)
  397. return nil
  398. }
  399. // assignHostsToRule assigns hosts to a load balancer rule.
  400. func (lb *loadBalancer) assignHostsToRule(lbRule *cloudstack.LoadBalancerRule, hostIDs []string) error {
  401. p := lb.LoadBalancer.NewAssignToLoadBalancerRuleParams(lbRule.Id)
  402. p.SetVirtualmachineids(hostIDs)
  403. if _, err := lb.LoadBalancer.AssignToLoadBalancerRule(p); err != nil {
  404. return fmt.Errorf("error assigning hosts to load balancer rule %v: %v", lbRule.Name, err)
  405. }
  406. return nil
  407. }
  408. // removeHostsFromRule removes hosts from a load balancer rule.
  409. func (lb *loadBalancer) removeHostsFromRule(lbRule *cloudstack.LoadBalancerRule, hostIDs []string) error {
  410. p := lb.LoadBalancer.NewRemoveFromLoadBalancerRuleParams(lbRule.Id)
  411. p.SetVirtualmachineids(hostIDs)
  412. if _, err := lb.LoadBalancer.RemoveFromLoadBalancerRule(p); err != nil {
  413. return fmt.Errorf("error removing hosts from load balancer rule %v: %v", lbRule.Name, err)
  414. }
  415. return nil
  416. }
  417. // symmetricDifference returns the symmetric difference between the old (existing) and new (wanted) host ID's.
  418. func symmetricDifference(hostIDs []string, lbInstances []*cloudstack.VirtualMachine) ([]string, []string) {
  419. new := make(map[string]bool)
  420. for _, hostID := range hostIDs {
  421. new[hostID] = true
  422. }
  423. var remove []string
  424. for _, instance := range lbInstances {
  425. if new[instance.Id] {
  426. delete(new, instance.Id)
  427. continue
  428. }
  429. remove = append(remove, instance.Id)
  430. }
  431. var assign []string
  432. for hostID := range new {
  433. assign = append(assign, hostID)
  434. }
  435. return assign, remove
  436. }