openstack_loadbalancer.go 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package openstack
  14. import (
  15. "context"
  16. "fmt"
  17. "net"
  18. "reflect"
  19. "strings"
  20. "time"
  21. "github.com/gophercloud/gophercloud"
  22. "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions"
  23. "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/external"
  24. "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/floatingips"
  25. "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners"
  26. "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/loadbalancers"
  27. v2monitors "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/monitors"
  28. v2pools "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools"
  29. "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/groups"
  30. "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/rules"
  31. "github.com/gophercloud/gophercloud/openstack/networking/v2/networks"
  32. neutronports "github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
  33. "github.com/gophercloud/gophercloud/pagination"
  34. "k8s.io/klog"
  35. "k8s.io/api/core/v1"
  36. "k8s.io/apimachinery/pkg/types"
  37. "k8s.io/apimachinery/pkg/util/sets"
  38. "k8s.io/apimachinery/pkg/util/wait"
  39. cloudprovider "k8s.io/cloud-provider"
  40. servicehelpers "k8s.io/cloud-provider/service/helpers"
  41. )
  42. // Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
  43. // this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
  44. const (
  45. // loadbalancerActive* is configuration of exponential backoff for
  46. // going into ACTIVE loadbalancer provisioning status. Starting with 1
  47. // seconds, multiplying by 1.2 with each step and taking 19 steps at maximum
  48. // it will time out after 128s, which roughly corresponds to 120s
  49. loadbalancerActiveInitDelay = 1 * time.Second
  50. loadbalancerActiveFactor = 1.2
  51. loadbalancerActiveSteps = 19
  52. // loadbalancerDelete* is configuration of exponential backoff for
  53. // waiting for delete operation to complete. Starting with 1
  54. // seconds, multiplying by 1.2 with each step and taking 13 steps at maximum
  55. // it will time out after 32s, which roughly corresponds to 30s
  56. loadbalancerDeleteInitDelay = 1 * time.Second
  57. loadbalancerDeleteFactor = 1.2
  58. loadbalancerDeleteSteps = 13
  59. activeStatus = "ACTIVE"
  60. errorStatus = "ERROR"
  61. ServiceAnnotationLoadBalancerFloatingNetworkID = "loadbalancer.openstack.org/floating-network-id"
  62. ServiceAnnotationLoadBalancerSubnetID = "loadbalancer.openstack.org/subnet-id"
  63. // ServiceAnnotationLoadBalancerInternal is the annotation used on the service
  64. // to indicate that we want an internal loadbalancer service.
  65. // If the value of ServiceAnnotationLoadBalancerInternal is false, it indicates that we want an external loadbalancer service. Default to false.
  66. ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/openstack-internal-load-balancer"
  67. )
  68. var _ cloudprovider.LoadBalancer = (*LbaasV2)(nil)
  69. // LbaasV2 is a LoadBalancer implementation for Neutron LBaaS v2 API
  70. type LbaasV2 struct {
  71. LoadBalancer
  72. }
  73. func networkExtensions(client *gophercloud.ServiceClient) (map[string]bool, error) {
  74. seen := make(map[string]bool)
  75. pager := extensions.List(client)
  76. err := pager.EachPage(func(page pagination.Page) (bool, error) {
  77. exts, err := extensions.ExtractExtensions(page)
  78. if err != nil {
  79. return false, err
  80. }
  81. for _, ext := range exts {
  82. seen[ext.Alias] = true
  83. }
  84. return true, nil
  85. })
  86. return seen, err
  87. }
  88. func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) {
  89. opts := floatingips.ListOpts{
  90. PortID: portID,
  91. }
  92. pager := floatingips.List(client, opts)
  93. floatingIPList := make([]floatingips.FloatingIP, 0, 1)
  94. err := pager.EachPage(func(page pagination.Page) (bool, error) {
  95. f, err := floatingips.ExtractFloatingIPs(page)
  96. if err != nil {
  97. return false, err
  98. }
  99. floatingIPList = append(floatingIPList, f...)
  100. if len(floatingIPList) > 1 {
  101. return false, ErrMultipleResults
  102. }
  103. return true, nil
  104. })
  105. if err != nil {
  106. if isNotFound(err) {
  107. return nil, ErrNotFound
  108. }
  109. return nil, err
  110. }
  111. if len(floatingIPList) == 0 {
  112. return nil, ErrNotFound
  113. } else if len(floatingIPList) > 1 {
  114. return nil, ErrMultipleResults
  115. }
  116. return &floatingIPList[0], nil
  117. }
  118. func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loadbalancers.LoadBalancer, error) {
  119. opts := loadbalancers.ListOpts{
  120. Name: name,
  121. }
  122. pager := loadbalancers.List(client, opts)
  123. loadbalancerList := make([]loadbalancers.LoadBalancer, 0, 1)
  124. err := pager.EachPage(func(page pagination.Page) (bool, error) {
  125. v, err := loadbalancers.ExtractLoadBalancers(page)
  126. if err != nil {
  127. return false, err
  128. }
  129. loadbalancerList = append(loadbalancerList, v...)
  130. if len(loadbalancerList) > 1 {
  131. return false, ErrMultipleResults
  132. }
  133. return true, nil
  134. })
  135. if err != nil {
  136. if isNotFound(err) {
  137. return nil, ErrNotFound
  138. }
  139. return nil, err
  140. }
  141. if len(loadbalancerList) == 0 {
  142. return nil, ErrNotFound
  143. } else if len(loadbalancerList) > 1 {
  144. return nil, ErrMultipleResults
  145. }
  146. return &loadbalancerList[0], nil
  147. }
  148. func getListenersByLoadBalancerID(client *gophercloud.ServiceClient, id string) ([]listeners.Listener, error) {
  149. var existingListeners []listeners.Listener
  150. err := listeners.List(client, listeners.ListOpts{LoadbalancerID: id}).EachPage(func(page pagination.Page) (bool, error) {
  151. listenerList, err := listeners.ExtractListeners(page)
  152. if err != nil {
  153. return false, err
  154. }
  155. for _, l := range listenerList {
  156. for _, lb := range l.Loadbalancers {
  157. if lb.ID == id {
  158. existingListeners = append(existingListeners, l)
  159. break
  160. }
  161. }
  162. }
  163. return true, nil
  164. })
  165. if err != nil {
  166. return nil, err
  167. }
  168. return existingListeners, nil
  169. }
  170. // get listener for a port or nil if does not exist
  171. func getListenerForPort(existingListeners []listeners.Listener, port v1.ServicePort) *listeners.Listener {
  172. for _, l := range existingListeners {
  173. if listeners.Protocol(l.Protocol) == toListenersProtocol(port.Protocol) && l.ProtocolPort == int(port.Port) {
  174. return &l
  175. }
  176. }
  177. return nil
  178. }
  179. // Get pool for a listener. A listener always has exactly one pool.
  180. func getPoolByListenerID(client *gophercloud.ServiceClient, loadbalancerID string, listenerID string) (*v2pools.Pool, error) {
  181. listenerPools := make([]v2pools.Pool, 0, 1)
  182. err := v2pools.List(client, v2pools.ListOpts{LoadbalancerID: loadbalancerID}).EachPage(func(page pagination.Page) (bool, error) {
  183. poolsList, err := v2pools.ExtractPools(page)
  184. if err != nil {
  185. return false, err
  186. }
  187. for _, p := range poolsList {
  188. for _, l := range p.Listeners {
  189. if l.ID == listenerID {
  190. listenerPools = append(listenerPools, p)
  191. }
  192. }
  193. }
  194. if len(listenerPools) > 1 {
  195. return false, ErrMultipleResults
  196. }
  197. return true, nil
  198. })
  199. if err != nil {
  200. if isNotFound(err) {
  201. return nil, ErrNotFound
  202. }
  203. return nil, err
  204. }
  205. if len(listenerPools) == 0 {
  206. return nil, ErrNotFound
  207. } else if len(listenerPools) > 1 {
  208. return nil, ErrMultipleResults
  209. }
  210. return &listenerPools[0], nil
  211. }
  212. func getMembersByPoolID(client *gophercloud.ServiceClient, id string) ([]v2pools.Member, error) {
  213. var members []v2pools.Member
  214. err := v2pools.ListMembers(client, id, v2pools.ListMembersOpts{}).EachPage(func(page pagination.Page) (bool, error) {
  215. membersList, err := v2pools.ExtractMembers(page)
  216. if err != nil {
  217. return false, err
  218. }
  219. members = append(members, membersList...)
  220. return true, nil
  221. })
  222. if err != nil {
  223. return nil, err
  224. }
  225. return members, nil
  226. }
  227. // Check if a member exists for node
  228. func memberExists(members []v2pools.Member, addr string, port int) bool {
  229. for _, member := range members {
  230. if member.Address == addr && member.ProtocolPort == port {
  231. return true
  232. }
  233. }
  234. return false
  235. }
  236. func popListener(existingListeners []listeners.Listener, id string) []listeners.Listener {
  237. for i, existingListener := range existingListeners {
  238. if existingListener.ID == id {
  239. existingListeners[i] = existingListeners[len(existingListeners)-1]
  240. existingListeners = existingListeners[:len(existingListeners)-1]
  241. break
  242. }
  243. }
  244. return existingListeners
  245. }
  246. func popMember(members []v2pools.Member, addr string, port int) []v2pools.Member {
  247. for i, member := range members {
  248. if member.Address == addr && member.ProtocolPort == port {
  249. members[i] = members[len(members)-1]
  250. members = members[:len(members)-1]
  251. }
  252. }
  253. return members
  254. }
  255. func getSecurityGroupName(service *v1.Service) string {
  256. securityGroupName := fmt.Sprintf("lb-sg-%s-%s-%s", service.UID, service.Namespace, service.Name)
  257. //OpenStack requires that the name of a security group is shorter than 255 bytes.
  258. if len(securityGroupName) > 255 {
  259. securityGroupName = securityGroupName[:255]
  260. }
  261. return securityGroupName
  262. }
  263. func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpts) ([]rules.SecGroupRule, error) {
  264. pager := rules.List(client, opts)
  265. var securityRules []rules.SecGroupRule
  266. err := pager.EachPage(func(page pagination.Page) (bool, error) {
  267. ruleList, err := rules.ExtractRules(page)
  268. if err != nil {
  269. return false, err
  270. }
  271. securityRules = append(securityRules, ruleList...)
  272. return true, nil
  273. })
  274. if err != nil {
  275. return nil, err
  276. }
  277. return securityRules, nil
  278. }
  279. func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
  280. backoff := wait.Backoff{
  281. Duration: loadbalancerActiveInitDelay,
  282. Factor: loadbalancerActiveFactor,
  283. Steps: loadbalancerActiveSteps,
  284. }
  285. var provisioningStatus string
  286. err := wait.ExponentialBackoff(backoff, func() (bool, error) {
  287. loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
  288. if err != nil {
  289. return false, err
  290. }
  291. provisioningStatus = loadbalancer.ProvisioningStatus
  292. if loadbalancer.ProvisioningStatus == activeStatus {
  293. return true, nil
  294. } else if loadbalancer.ProvisioningStatus == errorStatus {
  295. return true, fmt.Errorf("loadbalancer has gone into ERROR state")
  296. } else {
  297. return false, nil
  298. }
  299. })
  300. if err == wait.ErrWaitTimeout {
  301. err = fmt.Errorf("loadbalancer failed to go into ACTIVE provisioning status within alloted time")
  302. }
  303. return provisioningStatus, err
  304. }
  305. func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
  306. backoff := wait.Backoff{
  307. Duration: loadbalancerDeleteInitDelay,
  308. Factor: loadbalancerDeleteFactor,
  309. Steps: loadbalancerDeleteSteps,
  310. }
  311. err := wait.ExponentialBackoff(backoff, func() (bool, error) {
  312. _, err := loadbalancers.Get(client, loadbalancerID).Extract()
  313. if err != nil {
  314. if isNotFound(err) {
  315. return true, nil
  316. }
  317. return false, err
  318. }
  319. return false, nil
  320. })
  321. if err == wait.ErrWaitTimeout {
  322. err = fmt.Errorf("loadbalancer failed to delete within the alloted time")
  323. }
  324. return err
  325. }
  326. func toRuleProtocol(protocol v1.Protocol) rules.RuleProtocol {
  327. switch protocol {
  328. case v1.ProtocolTCP:
  329. return rules.ProtocolTCP
  330. case v1.ProtocolUDP:
  331. return rules.ProtocolUDP
  332. default:
  333. return rules.RuleProtocol(strings.ToLower(string(protocol)))
  334. }
  335. }
  336. func toListenersProtocol(protocol v1.Protocol) listeners.Protocol {
  337. switch protocol {
  338. case v1.ProtocolTCP:
  339. return listeners.ProtocolTCP
  340. default:
  341. return listeners.Protocol(string(protocol))
  342. }
  343. }
  344. func createNodeSecurityGroup(client *gophercloud.ServiceClient, nodeSecurityGroupID string, port int, protocol v1.Protocol, lbSecGroup string) error {
  345. v4NodeSecGroupRuleCreateOpts := rules.CreateOpts{
  346. Direction: rules.DirIngress,
  347. PortRangeMax: port,
  348. PortRangeMin: port,
  349. Protocol: toRuleProtocol(protocol),
  350. RemoteGroupID: lbSecGroup,
  351. SecGroupID: nodeSecurityGroupID,
  352. EtherType: rules.EtherType4,
  353. }
  354. v6NodeSecGroupRuleCreateOpts := rules.CreateOpts{
  355. Direction: rules.DirIngress,
  356. PortRangeMax: port,
  357. PortRangeMin: port,
  358. Protocol: toRuleProtocol(protocol),
  359. RemoteGroupID: lbSecGroup,
  360. SecGroupID: nodeSecurityGroupID,
  361. EtherType: rules.EtherType6,
  362. }
  363. _, err := rules.Create(client, v4NodeSecGroupRuleCreateOpts).Extract()
  364. if err != nil {
  365. return err
  366. }
  367. _, err = rules.Create(client, v6NodeSecGroupRuleCreateOpts).Extract()
  368. if err != nil {
  369. return err
  370. }
  371. return nil
  372. }
  373. func (lbaas *LbaasV2) createLoadBalancer(service *v1.Service, name string, internalAnnotation bool) (*loadbalancers.LoadBalancer, error) {
  374. createOpts := loadbalancers.CreateOpts{
  375. Name: name,
  376. Description: fmt.Sprintf("Kubernetes external service %s", name),
  377. VipSubnetID: lbaas.opts.SubnetID,
  378. Provider: lbaas.opts.LBProvider,
  379. }
  380. loadBalancerIP := service.Spec.LoadBalancerIP
  381. if loadBalancerIP != "" && internalAnnotation {
  382. createOpts.VipAddress = loadBalancerIP
  383. }
  384. loadbalancer, err := loadbalancers.Create(lbaas.lb, createOpts).Extract()
  385. if err != nil {
  386. return nil, fmt.Errorf("error creating loadbalancer %v: %v", createOpts, err)
  387. }
  388. return loadbalancer, nil
  389. }
  390. // GetLoadBalancer returns whether the specified load balancer exists and its status
  391. func (lbaas *LbaasV2) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
  392. loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
  393. loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
  394. if err == ErrNotFound {
  395. return nil, false, nil
  396. }
  397. if loadbalancer == nil {
  398. return nil, false, err
  399. }
  400. status := &v1.LoadBalancerStatus{}
  401. portID := loadbalancer.VipPortID
  402. if portID != "" {
  403. floatIP, err := getFloatingIPByPortID(lbaas.network, portID)
  404. if err != nil && err != ErrNotFound {
  405. return nil, false, fmt.Errorf("error getting floating ip for port %s: %v", portID, err)
  406. }
  407. if floatIP != nil {
  408. status.Ingress = []v1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
  409. }
  410. } else {
  411. status.Ingress = []v1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
  412. }
  413. return status, true, err
  414. }
  415. // GetLoadBalancerName is an implementation of LoadBalancer.GetLoadBalancerName.
  416. func (lbaas *LbaasV2) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string {
  417. // TODO: replace DefaultLoadBalancerName to generate more meaningful loadbalancer names.
  418. return cloudprovider.DefaultLoadBalancerName(service)
  419. }
  420. // The LB needs to be configured with instance addresses on the same
  421. // subnet as the LB (aka opts.SubnetID). Currently we're just
  422. // guessing that the node's InternalIP is the right address.
  423. // In case no InternalIP can be found, ExternalIP is tried.
  424. // If neither InternalIP nor ExternalIP can be found an error is
  425. // returned.
  426. func nodeAddressForLB(node *v1.Node) (string, error) {
  427. addrs := node.Status.Addresses
  428. if len(addrs) == 0 {
  429. return "", ErrNoAddressFound
  430. }
  431. allowedAddrTypes := []v1.NodeAddressType{v1.NodeInternalIP, v1.NodeExternalIP}
  432. for _, allowedAddrType := range allowedAddrTypes {
  433. for _, addr := range addrs {
  434. if addr.Type == allowedAddrType {
  435. return addr.Address, nil
  436. }
  437. }
  438. }
  439. return "", ErrNoAddressFound
  440. }
  441. //getStringFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
  442. func getStringFromServiceAnnotation(service *v1.Service, annotationKey string, defaultSetting string) string {
  443. klog.V(4).Infof("getStringFromServiceAnnotation(%v, %v, %v)", service, annotationKey, defaultSetting)
  444. if annotationValue, ok := service.Annotations[annotationKey]; ok {
  445. //if there is an annotation for this setting, set the "setting" var to it
  446. // annotationValue can be empty, it is working as designed
  447. // it makes possible for instance provisioning loadbalancer without floatingip
  448. klog.V(4).Infof("Found a Service Annotation: %v = %v", annotationKey, annotationValue)
  449. return annotationValue
  450. }
  451. //if there is no annotation, set "settings" var to the value from cloud config
  452. klog.V(4).Infof("Could not find a Service Annotation; falling back on cloud-config setting: %v = %v", annotationKey, defaultSetting)
  453. return defaultSetting
  454. }
  455. // getSubnetIDForLB returns subnet-id for a specific node
  456. func getSubnetIDForLB(compute *gophercloud.ServiceClient, node v1.Node) (string, error) {
  457. ipAddress, err := nodeAddressForLB(&node)
  458. if err != nil {
  459. return "", err
  460. }
  461. instanceID := node.Spec.ProviderID
  462. if ind := strings.LastIndex(instanceID, "/"); ind >= 0 {
  463. instanceID = instanceID[(ind + 1):]
  464. }
  465. interfaces, err := getAttachedInterfacesByID(compute, instanceID)
  466. if err != nil {
  467. return "", err
  468. }
  469. for _, intf := range interfaces {
  470. for _, fixedIP := range intf.FixedIPs {
  471. if fixedIP.IPAddress == ipAddress {
  472. return fixedIP.SubnetID, nil
  473. }
  474. }
  475. }
  476. return "", ErrNotFound
  477. }
  478. // getNodeSecurityGroupIDForLB lists node-security-groups for specific nodes
  479. func getNodeSecurityGroupIDForLB(compute *gophercloud.ServiceClient, network *gophercloud.ServiceClient, nodes []*v1.Node) ([]string, error) {
  480. secGroupNames := sets.NewString()
  481. for _, node := range nodes {
  482. nodeName := types.NodeName(node.Name)
  483. srv, err := getServerByName(compute, nodeName)
  484. if err != nil {
  485. return []string{}, err
  486. }
  487. // use the first node-security-groups
  488. // case 0: node1:SG1 node2:SG1 return SG1
  489. // case 1: node1:SG1 node2:SG2 return SG1,SG2
  490. // case 2: node1:SG1,SG2 node2:SG3,SG4 return SG1,SG3
  491. // case 3: node1:SG1,SG2 node2:SG2,SG3 return SG1,SG2
  492. secGroupNames.Insert(srv.SecurityGroups[0]["name"].(string))
  493. }
  494. secGroupIDs := make([]string, secGroupNames.Len())
  495. for i, name := range secGroupNames.List() {
  496. secGroupID, err := groups.IDFromName(network, name)
  497. if err != nil {
  498. return []string{}, err
  499. }
  500. secGroupIDs[i] = secGroupID
  501. }
  502. return secGroupIDs, nil
  503. }
  504. // isSecurityGroupNotFound return true while 'err' is object of gophercloud.ErrResourceNotFound
  505. func isSecurityGroupNotFound(err error) bool {
  506. errType := reflect.TypeOf(err).String()
  507. errTypeSlice := strings.Split(errType, ".")
  508. errTypeValue := ""
  509. if len(errTypeSlice) != 0 {
  510. errTypeValue = errTypeSlice[len(errTypeSlice)-1]
  511. }
  512. if errTypeValue == "ErrResourceNotFound" {
  513. return true
  514. }
  515. return false
  516. }
  517. // getFloatingNetworkIDForLB returns a floating-network-id for cluster.
  518. func getFloatingNetworkIDForLB(client *gophercloud.ServiceClient) (string, error) {
  519. var floatingNetworkIds []string
  520. type NetworkWithExternalExt struct {
  521. networks.Network
  522. external.NetworkExternalExt
  523. }
  524. err := networks.List(client, networks.ListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
  525. var externalNetwork []NetworkWithExternalExt
  526. err := networks.ExtractNetworksInto(page, &externalNetwork)
  527. if err != nil {
  528. return false, err
  529. }
  530. for _, externalNet := range externalNetwork {
  531. if externalNet.External {
  532. floatingNetworkIds = append(floatingNetworkIds, externalNet.ID)
  533. }
  534. }
  535. if len(floatingNetworkIds) > 1 {
  536. return false, ErrMultipleResults
  537. }
  538. return true, nil
  539. })
  540. if err != nil {
  541. if isNotFound(err) {
  542. return "", ErrNotFound
  543. }
  544. if err == ErrMultipleResults {
  545. klog.V(4).Infof("find multiple external networks, pick the first one when there are no explicit configuration.")
  546. return floatingNetworkIds[0], nil
  547. }
  548. return "", err
  549. }
  550. if len(floatingNetworkIds) == 0 {
  551. return "", ErrNotFound
  552. }
  553. return floatingNetworkIds[0], nil
  554. }
  555. // TODO: This code currently ignores 'region' and always creates a
  556. // loadbalancer in only the current OpenStack region. We should take
  557. // a list of regions (from config) and query/create loadbalancers in
  558. // each region.
  559. // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one.
  560. func (lbaas *LbaasV2) EnsureLoadBalancer(ctx context.Context, clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
  561. klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodes, apiService.Annotations)
  562. if len(nodes) == 0 {
  563. return nil, fmt.Errorf("there are no available nodes for LoadBalancer service %s/%s", apiService.Namespace, apiService.Name)
  564. }
  565. lbaas.opts.SubnetID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
  566. if len(lbaas.opts.SubnetID) == 0 {
  567. // Get SubnetID automatically.
  568. // The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
  569. subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
  570. if err != nil {
  571. klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
  572. return nil, fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
  573. "and failed to find subnet-id from OpenStack: %v", apiService.Namespace, apiService.Name, err)
  574. }
  575. lbaas.opts.SubnetID = subnetID
  576. }
  577. ports := apiService.Spec.Ports
  578. if len(ports) == 0 {
  579. return nil, fmt.Errorf("no ports provided to openstack load balancer")
  580. }
  581. floatingPool := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingNetworkID, lbaas.opts.FloatingNetworkID)
  582. if len(floatingPool) == 0 {
  583. var err error
  584. floatingPool, err = getFloatingNetworkIDForLB(lbaas.network)
  585. if err != nil {
  586. klog.Warningf("Failed to find floating-network-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
  587. }
  588. }
  589. var internalAnnotation bool
  590. internal := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerInternal, "false")
  591. switch internal {
  592. case "true":
  593. klog.V(4).Infof("Ensure an internal loadbalancer service.")
  594. internalAnnotation = true
  595. case "false":
  596. if len(floatingPool) != 0 {
  597. klog.V(4).Infof("Ensure an external loadbalancer service, using floatingPool: %v", floatingPool)
  598. internalAnnotation = false
  599. } else {
  600. return nil, fmt.Errorf("floating-network-id or loadbalancer.openstack.org/floating-network-id should be specified when ensuring an external loadbalancer service")
  601. }
  602. default:
  603. return nil, fmt.Errorf("unknown service.beta.kubernetes.io/openstack-internal-load-balancer annotation: %v, specify \"true\" or \"false\" ",
  604. internal)
  605. }
  606. // Check for TCP protocol on each port
  607. // TODO: Convert all error messages to use an event recorder
  608. for _, port := range ports {
  609. if port.Protocol != v1.ProtocolTCP {
  610. return nil, fmt.Errorf("only TCP LoadBalancer is supported for openstack load balancers")
  611. }
  612. }
  613. sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService)
  614. if err != nil {
  615. return nil, fmt.Errorf("failed to get source ranges for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
  616. }
  617. if !servicehelpers.IsAllowAll(sourceRanges) && !lbaas.opts.ManageSecurityGroups {
  618. return nil, fmt.Errorf("source range restrictions are not supported for openstack load balancers without managing security groups")
  619. }
  620. affinity := apiService.Spec.SessionAffinity
  621. var persistence *v2pools.SessionPersistence
  622. switch affinity {
  623. case v1.ServiceAffinityNone:
  624. persistence = nil
  625. case v1.ServiceAffinityClientIP:
  626. persistence = &v2pools.SessionPersistence{Type: "SOURCE_IP"}
  627. default:
  628. return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
  629. }
  630. name := lbaas.GetLoadBalancerName(ctx, clusterName, apiService)
  631. loadbalancer, err := getLoadbalancerByName(lbaas.lb, name)
  632. if err != nil {
  633. if err != ErrNotFound {
  634. return nil, fmt.Errorf("error getting loadbalancer %s: %v", name, err)
  635. }
  636. klog.V(2).Infof("Creating loadbalancer %s", name)
  637. loadbalancer, err = lbaas.createLoadBalancer(apiService, name, internalAnnotation)
  638. if err != nil {
  639. // Unknown error, retry later
  640. return nil, fmt.Errorf("error creating loadbalancer %s: %v", name, err)
  641. }
  642. } else {
  643. klog.V(2).Infof("LoadBalancer %s already exists", name)
  644. }
  645. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  646. if err != nil {
  647. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  648. }
  649. lbmethod := v2pools.LBMethod(lbaas.opts.LBMethod)
  650. if lbmethod == "" {
  651. lbmethod = v2pools.LBMethodRoundRobin
  652. }
  653. oldListeners, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
  654. if err != nil {
  655. return nil, fmt.Errorf("error getting LB %s listeners: %v", name, err)
  656. }
  657. for portIndex, port := range ports {
  658. listener := getListenerForPort(oldListeners, port)
  659. if listener == nil {
  660. klog.V(4).Infof("Creating listener for port %d", int(port.Port))
  661. listener, err = listeners.Create(lbaas.lb, listeners.CreateOpts{
  662. Name: fmt.Sprintf("listener_%s_%d", name, portIndex),
  663. Protocol: listeners.Protocol(port.Protocol),
  664. ProtocolPort: int(port.Port),
  665. LoadbalancerID: loadbalancer.ID,
  666. }).Extract()
  667. if err != nil {
  668. // Unknown error, retry later
  669. return nil, fmt.Errorf("error creating LB listener: %v", err)
  670. }
  671. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  672. if err != nil {
  673. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  674. }
  675. }
  676. klog.V(4).Infof("Listener for %s port %d: %s", string(port.Protocol), int(port.Port), listener.ID)
  677. // After all ports have been processed, remaining listeners are removed as obsolete.
  678. // Pop valid listeners.
  679. oldListeners = popListener(oldListeners, listener.ID)
  680. pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
  681. if err != nil && err != ErrNotFound {
  682. // Unknown error, retry later
  683. return nil, fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
  684. }
  685. if pool == nil {
  686. klog.V(4).Infof("Creating pool for listener %s", listener.ID)
  687. pool, err = v2pools.Create(lbaas.lb, v2pools.CreateOpts{
  688. Name: fmt.Sprintf("pool_%s_%d", name, portIndex),
  689. Protocol: v2pools.Protocol(port.Protocol),
  690. LBMethod: lbmethod,
  691. ListenerID: listener.ID,
  692. Persistence: persistence,
  693. }).Extract()
  694. if err != nil {
  695. // Unknown error, retry later
  696. return nil, fmt.Errorf("error creating pool for listener %s: %v", listener.ID, err)
  697. }
  698. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  699. if err != nil {
  700. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  701. }
  702. }
  703. klog.V(4).Infof("Pool for listener %s: %s", listener.ID, pool.ID)
  704. members, err := getMembersByPoolID(lbaas.lb, pool.ID)
  705. if err != nil && !isNotFound(err) {
  706. return nil, fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
  707. }
  708. for _, node := range nodes {
  709. addr, err := nodeAddressForLB(node)
  710. if err != nil {
  711. if err == ErrNotFound {
  712. // Node failure, do not create member
  713. klog.Warningf("Failed to create LB pool member for node %s: %v", node.Name, err)
  714. continue
  715. } else {
  716. return nil, fmt.Errorf("error getting address for node %s: %v", node.Name, err)
  717. }
  718. }
  719. if !memberExists(members, addr, int(port.NodePort)) {
  720. klog.V(4).Infof("Creating member for pool %s", pool.ID)
  721. _, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
  722. Name: fmt.Sprintf("member_%s_%d_%s", name, portIndex, node.Name),
  723. ProtocolPort: int(port.NodePort),
  724. Address: addr,
  725. SubnetID: lbaas.opts.SubnetID,
  726. }).Extract()
  727. if err != nil {
  728. return nil, fmt.Errorf("error creating LB pool member for node: %s, %v", node.Name, err)
  729. }
  730. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  731. if err != nil {
  732. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  733. }
  734. } else {
  735. // After all members have been processed, remaining members are deleted as obsolete.
  736. members = popMember(members, addr, int(port.NodePort))
  737. }
  738. klog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, node.Name, addr)
  739. }
  740. // Delete obsolete members for this pool
  741. for _, member := range members {
  742. klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
  743. err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
  744. if err != nil && !isNotFound(err) {
  745. return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
  746. }
  747. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  748. if err != nil {
  749. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  750. }
  751. }
  752. monitorID := pool.MonitorID
  753. if monitorID == "" && lbaas.opts.CreateMonitor {
  754. klog.V(4).Infof("Creating monitor for pool %s", pool.ID)
  755. monitor, err := v2monitors.Create(lbaas.lb, v2monitors.CreateOpts{
  756. Name: fmt.Sprintf("monitor_%s_%d", name, portIndex),
  757. PoolID: pool.ID,
  758. Type: string(port.Protocol),
  759. Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
  760. Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
  761. MaxRetries: int(lbaas.opts.MonitorMaxRetries),
  762. }).Extract()
  763. if err != nil {
  764. return nil, fmt.Errorf("error creating LB pool healthmonitor: %v", err)
  765. }
  766. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  767. if err != nil {
  768. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  769. }
  770. monitorID = monitor.ID
  771. } else if lbaas.opts.CreateMonitor == false {
  772. klog.V(4).Infof("Do not create monitor for pool %s when create-monitor is false", pool.ID)
  773. }
  774. if monitorID != "" {
  775. klog.V(4).Infof("Monitor for pool %s: %s", pool.ID, monitorID)
  776. }
  777. }
  778. // All remaining listeners are obsolete, delete
  779. for _, listener := range oldListeners {
  780. klog.V(4).Infof("Deleting obsolete listener %s:", listener.ID)
  781. // get pool for listener
  782. pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
  783. if err != nil && err != ErrNotFound {
  784. return nil, fmt.Errorf("error getting pool for obsolete listener %s: %v", listener.ID, err)
  785. }
  786. if pool != nil {
  787. // get and delete monitor
  788. monitorID := pool.MonitorID
  789. if monitorID != "" {
  790. klog.V(4).Infof("Deleting obsolete monitor %s for pool %s", monitorID, pool.ID)
  791. err = v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
  792. if err != nil && !isNotFound(err) {
  793. return nil, fmt.Errorf("error deleting obsolete monitor %s for pool %s: %v", monitorID, pool.ID, err)
  794. }
  795. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  796. if err != nil {
  797. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  798. }
  799. }
  800. // get and delete pool members
  801. members, err := getMembersByPoolID(lbaas.lb, pool.ID)
  802. if err != nil && !isNotFound(err) {
  803. return nil, fmt.Errorf("error getting members for pool %s: %v", pool.ID, err)
  804. }
  805. if members != nil {
  806. for _, member := range members {
  807. klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
  808. err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
  809. if err != nil && !isNotFound(err) {
  810. return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
  811. }
  812. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  813. if err != nil {
  814. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  815. }
  816. }
  817. }
  818. klog.V(4).Infof("Deleting obsolete pool %s for listener %s", pool.ID, listener.ID)
  819. // delete pool
  820. err = v2pools.Delete(lbaas.lb, pool.ID).ExtractErr()
  821. if err != nil && !isNotFound(err) {
  822. return nil, fmt.Errorf("error deleting obsolete pool %s for listener %s: %v", pool.ID, listener.ID, err)
  823. }
  824. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  825. if err != nil {
  826. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  827. }
  828. }
  829. // delete listener
  830. err = listeners.Delete(lbaas.lb, listener.ID).ExtractErr()
  831. if err != nil && !isNotFound(err) {
  832. return nil, fmt.Errorf("error deleteting obsolete listener: %v", err)
  833. }
  834. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  835. if err != nil {
  836. return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  837. }
  838. klog.V(2).Infof("Deleted obsolete listener: %s", listener.ID)
  839. }
  840. portID := loadbalancer.VipPortID
  841. floatIP, err := getFloatingIPByPortID(lbaas.network, portID)
  842. if err != nil && err != ErrNotFound {
  843. return nil, fmt.Errorf("error getting floating ip for port %s: %v", portID, err)
  844. }
  845. if floatIP == nil && floatingPool != "" && !internalAnnotation {
  846. klog.V(4).Infof("Creating floating ip for loadbalancer %s port %s", loadbalancer.ID, portID)
  847. floatIPOpts := floatingips.CreateOpts{
  848. FloatingNetworkID: floatingPool,
  849. PortID: portID,
  850. }
  851. loadBalancerIP := apiService.Spec.LoadBalancerIP
  852. if loadBalancerIP != "" {
  853. floatIPOpts.FloatingIP = loadBalancerIP
  854. }
  855. floatIP, err = floatingips.Create(lbaas.network, floatIPOpts).Extract()
  856. if err != nil {
  857. return nil, fmt.Errorf("error creating LB floatingip %+v: %v", floatIPOpts, err)
  858. }
  859. }
  860. status := &v1.LoadBalancerStatus{}
  861. if floatIP != nil {
  862. status.Ingress = []v1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
  863. } else {
  864. status.Ingress = []v1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
  865. }
  866. if lbaas.opts.ManageSecurityGroups {
  867. err := lbaas.ensureSecurityGroup(clusterName, apiService, nodes, loadbalancer)
  868. if err != nil {
  869. // cleanup what was created so far
  870. _ = lbaas.EnsureLoadBalancerDeleted(ctx, clusterName, apiService)
  871. return status, err
  872. }
  873. }
  874. return status, nil
  875. }
  876. // ensureSecurityGroup ensures security group exist for specific loadbalancer service.
  877. // Creating security group for specific loadbalancer service when it does not exist.
  878. func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Service, nodes []*v1.Node, loadbalancer *loadbalancers.LoadBalancer) error {
  879. // find node-security-group for service
  880. var err error
  881. if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
  882. lbaas.opts.NodeSecurityGroupIDs, err = getNodeSecurityGroupIDForLB(lbaas.compute, lbaas.network, nodes)
  883. if err != nil {
  884. return fmt.Errorf("failed to find node-security-group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
  885. }
  886. }
  887. klog.V(4).Infof("find node-security-group %v for loadbalancer service %s/%s", lbaas.opts.NodeSecurityGroupIDs, apiService.Namespace, apiService.Name)
  888. // get service ports
  889. ports := apiService.Spec.Ports
  890. if len(ports) == 0 {
  891. return fmt.Errorf("no ports provided to openstack load balancer")
  892. }
  893. // get service source ranges
  894. sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService)
  895. if err != nil {
  896. return fmt.Errorf("failed to get source ranges for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
  897. }
  898. // ensure security group for LB
  899. lbSecGroupName := getSecurityGroupName(apiService)
  900. lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
  901. if err != nil {
  902. // If the security group of LB not exist, create it later
  903. if isSecurityGroupNotFound(err) {
  904. lbSecGroupID = ""
  905. } else {
  906. return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
  907. }
  908. }
  909. if len(lbSecGroupID) == 0 {
  910. // create security group
  911. lbSecGroupCreateOpts := groups.CreateOpts{
  912. Name: getSecurityGroupName(apiService),
  913. Description: fmt.Sprintf("Security Group for %s/%s Service LoadBalancer in cluster %s", apiService.Namespace, apiService.Name, clusterName),
  914. }
  915. lbSecGroup, err := groups.Create(lbaas.network, lbSecGroupCreateOpts).Extract()
  916. if err != nil {
  917. return fmt.Errorf("failed to create Security Group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
  918. }
  919. lbSecGroupID = lbSecGroup.ID
  920. //add rule in security group
  921. for _, port := range ports {
  922. for _, sourceRange := range sourceRanges.StringSlice() {
  923. ethertype := rules.EtherType4
  924. network, _, err := net.ParseCIDR(sourceRange)
  925. if err != nil {
  926. return fmt.Errorf("error parsing source range %s as a CIDR: %v", sourceRange, err)
  927. }
  928. if network.To4() == nil {
  929. ethertype = rules.EtherType6
  930. }
  931. lbSecGroupRuleCreateOpts := rules.CreateOpts{
  932. Direction: rules.DirIngress,
  933. PortRangeMax: int(port.Port),
  934. PortRangeMin: int(port.Port),
  935. Protocol: toRuleProtocol(port.Protocol),
  936. RemoteIPPrefix: sourceRange,
  937. SecGroupID: lbSecGroup.ID,
  938. EtherType: ethertype,
  939. }
  940. _, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
  941. if err != nil {
  942. return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
  943. }
  944. }
  945. }
  946. lbSecGroupRuleCreateOpts := rules.CreateOpts{
  947. Direction: rules.DirIngress,
  948. PortRangeMax: 4, // ICMP: Code - Values for ICMP "Destination Unreachable: Fragmentation Needed and Don't Fragment was Set"
  949. PortRangeMin: 3, // ICMP: Type
  950. Protocol: rules.ProtocolICMP,
  951. RemoteIPPrefix: "0.0.0.0/0", // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
  952. SecGroupID: lbSecGroup.ID,
  953. EtherType: rules.EtherType4,
  954. }
  955. _, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
  956. if err != nil {
  957. return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
  958. }
  959. lbSecGroupRuleCreateOpts = rules.CreateOpts{
  960. Direction: rules.DirIngress,
  961. PortRangeMax: 0, // ICMP: Code - Values for ICMP "Packet Too Big"
  962. PortRangeMin: 2, // ICMP: Type
  963. Protocol: rules.ProtocolICMP,
  964. RemoteIPPrefix: "::/0", // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
  965. SecGroupID: lbSecGroup.ID,
  966. EtherType: rules.EtherType6,
  967. }
  968. _, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
  969. if err != nil {
  970. return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
  971. }
  972. // get security groups of port
  973. portID := loadbalancer.VipPortID
  974. port, err := getPortByID(lbaas.network, portID)
  975. if err != nil {
  976. return err
  977. }
  978. // ensure the vip port has the security groups
  979. found := false
  980. for _, portSecurityGroups := range port.SecurityGroups {
  981. if portSecurityGroups == lbSecGroup.ID {
  982. found = true
  983. break
  984. }
  985. }
  986. // update loadbalancer vip port
  987. if !found {
  988. port.SecurityGroups = append(port.SecurityGroups, lbSecGroup.ID)
  989. updateOpts := neutronports.UpdateOpts{SecurityGroups: &port.SecurityGroups}
  990. res := neutronports.Update(lbaas.network, portID, updateOpts)
  991. if res.Err != nil {
  992. msg := fmt.Sprintf("Error occurred updating port %s for loadbalancer service %s/%s: %v", portID, apiService.Namespace, apiService.Name, res.Err)
  993. return fmt.Errorf(msg)
  994. }
  995. }
  996. }
  997. // ensure rules for every node security group
  998. for _, port := range ports {
  999. for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
  1000. opts := rules.ListOpts{
  1001. Direction: string(rules.DirIngress),
  1002. SecGroupID: nodeSecurityGroupID,
  1003. RemoteGroupID: lbSecGroupID,
  1004. PortRangeMax: int(port.NodePort),
  1005. PortRangeMin: int(port.NodePort),
  1006. Protocol: string(port.Protocol),
  1007. }
  1008. secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
  1009. if err != nil && !isNotFound(err) {
  1010. msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
  1011. return fmt.Errorf(msg)
  1012. }
  1013. if len(secGroupRules) != 0 {
  1014. // Do not add rule when find rules for remote group in the Node Security Group
  1015. continue
  1016. }
  1017. // Add the rules in the Node Security Group
  1018. err = createNodeSecurityGroup(lbaas.network, nodeSecurityGroupID, int(port.NodePort), port.Protocol, lbSecGroupID)
  1019. if err != nil {
  1020. return fmt.Errorf("error occurred creating security group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
  1021. }
  1022. }
  1023. }
  1024. return nil
  1025. }
  1026. // UpdateLoadBalancer updates hosts under the specified load balancer.
  1027. func (lbaas *LbaasV2) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
  1028. loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
  1029. klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodes)
  1030. lbaas.opts.SubnetID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
  1031. if len(lbaas.opts.SubnetID) == 0 && len(nodes) > 0 {
  1032. // Get SubnetID automatically.
  1033. // The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
  1034. subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
  1035. if err != nil {
  1036. klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
  1037. return fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
  1038. "and failed to find subnet-id from OpenStack: %v", service.Namespace, service.Name, err)
  1039. }
  1040. lbaas.opts.SubnetID = subnetID
  1041. }
  1042. ports := service.Spec.Ports
  1043. if len(ports) == 0 {
  1044. return fmt.Errorf("no ports provided to openstack load balancer")
  1045. }
  1046. loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
  1047. if err != nil {
  1048. return err
  1049. }
  1050. if loadbalancer == nil {
  1051. return fmt.Errorf("loadbalancer %s does not exist", loadBalancerName)
  1052. }
  1053. // Get all listeners for this loadbalancer, by "port key".
  1054. type portKey struct {
  1055. Protocol listeners.Protocol
  1056. Port int
  1057. }
  1058. var listenerIDs []string
  1059. lbListeners := make(map[portKey]listeners.Listener)
  1060. allListeners, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
  1061. if err != nil {
  1062. return fmt.Errorf("error getting listeners for LB %s: %v", loadBalancerName, err)
  1063. }
  1064. for _, l := range allListeners {
  1065. key := portKey{Protocol: listeners.Protocol(l.Protocol), Port: l.ProtocolPort}
  1066. lbListeners[key] = l
  1067. listenerIDs = append(listenerIDs, l.ID)
  1068. }
  1069. // Get all pools for this loadbalancer, by listener ID.
  1070. lbPools := make(map[string]v2pools.Pool)
  1071. for _, listenerID := range listenerIDs {
  1072. pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listenerID)
  1073. if err != nil {
  1074. return fmt.Errorf("error getting pool for listener %s: %v", listenerID, err)
  1075. }
  1076. lbPools[listenerID] = *pool
  1077. }
  1078. // Compose Set of member (addresses) that _should_ exist
  1079. addrs := make(map[string]*v1.Node)
  1080. for _, node := range nodes {
  1081. addr, err := nodeAddressForLB(node)
  1082. if err != nil {
  1083. return err
  1084. }
  1085. addrs[addr] = node
  1086. }
  1087. // Check for adding/removing members associated with each port
  1088. for portIndex, port := range ports {
  1089. // Get listener associated with this port
  1090. listener, ok := lbListeners[portKey{
  1091. Protocol: toListenersProtocol(port.Protocol),
  1092. Port: int(port.Port),
  1093. }]
  1094. if !ok {
  1095. return fmt.Errorf("loadbalancer %s does not contain required listener for port %d and protocol %s", loadBalancerName, port.Port, port.Protocol)
  1096. }
  1097. // Get pool associated with this listener
  1098. pool, ok := lbPools[listener.ID]
  1099. if !ok {
  1100. return fmt.Errorf("loadbalancer %s does not contain required pool for listener %s", loadBalancerName, listener.ID)
  1101. }
  1102. // Find existing pool members (by address) for this port
  1103. getMembers, err := getMembersByPoolID(lbaas.lb, pool.ID)
  1104. if err != nil {
  1105. return fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
  1106. }
  1107. members := make(map[string]v2pools.Member)
  1108. for _, member := range getMembers {
  1109. members[member.Address] = member
  1110. }
  1111. // Add any new members for this port
  1112. for addr, node := range addrs {
  1113. if _, ok := members[addr]; ok && members[addr].ProtocolPort == int(port.NodePort) {
  1114. // Already exists, do not create member
  1115. continue
  1116. }
  1117. _, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
  1118. Name: fmt.Sprintf("member_%s_%d_%s", loadbalancer.Name, portIndex, node.Name),
  1119. Address: addr,
  1120. ProtocolPort: int(port.NodePort),
  1121. SubnetID: lbaas.opts.SubnetID,
  1122. }).Extract()
  1123. if err != nil {
  1124. return err
  1125. }
  1126. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  1127. if err != nil {
  1128. return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  1129. }
  1130. }
  1131. // Remove any old members for this port
  1132. for _, member := range members {
  1133. if _, ok := addrs[member.Address]; ok && member.ProtocolPort == int(port.NodePort) {
  1134. // Still present, do not delete member
  1135. continue
  1136. }
  1137. err = v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
  1138. if err != nil && !isNotFound(err) {
  1139. return err
  1140. }
  1141. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  1142. if err != nil {
  1143. return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  1144. }
  1145. }
  1146. }
  1147. if lbaas.opts.ManageSecurityGroups {
  1148. err := lbaas.updateSecurityGroup(clusterName, service, nodes, loadbalancer)
  1149. if err != nil {
  1150. return fmt.Errorf("failed to update Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
  1151. }
  1152. }
  1153. return nil
  1154. }
  1155. // updateSecurityGroup updating security group for specific loadbalancer service.
  1156. func (lbaas *LbaasV2) updateSecurityGroup(clusterName string, apiService *v1.Service, nodes []*v1.Node, loadbalancer *loadbalancers.LoadBalancer) error {
  1157. originalNodeSecurityGroupIDs := lbaas.opts.NodeSecurityGroupIDs
  1158. var err error
  1159. lbaas.opts.NodeSecurityGroupIDs, err = getNodeSecurityGroupIDForLB(lbaas.compute, lbaas.network, nodes)
  1160. if err != nil {
  1161. return fmt.Errorf("failed to find node-security-group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
  1162. }
  1163. klog.V(4).Infof("find node-security-group %v for loadbalancer service %s/%s", lbaas.opts.NodeSecurityGroupIDs, apiService.Namespace, apiService.Name)
  1164. original := sets.NewString(originalNodeSecurityGroupIDs...)
  1165. current := sets.NewString(lbaas.opts.NodeSecurityGroupIDs...)
  1166. removals := original.Difference(current)
  1167. // Generate Name
  1168. lbSecGroupName := getSecurityGroupName(apiService)
  1169. lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
  1170. if err != nil {
  1171. return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
  1172. }
  1173. ports := apiService.Spec.Ports
  1174. if len(ports) == 0 {
  1175. return fmt.Errorf("no ports provided to openstack load balancer")
  1176. }
  1177. for _, port := range ports {
  1178. for removal := range removals {
  1179. // Delete the rules in the Node Security Group
  1180. opts := rules.ListOpts{
  1181. Direction: string(rules.DirIngress),
  1182. SecGroupID: removal,
  1183. RemoteGroupID: lbSecGroupID,
  1184. PortRangeMax: int(port.NodePort),
  1185. PortRangeMin: int(port.NodePort),
  1186. Protocol: string(port.Protocol),
  1187. }
  1188. secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
  1189. if err != nil && !isNotFound(err) {
  1190. return fmt.Errorf("error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, removal, err)
  1191. }
  1192. for _, rule := range secGroupRules {
  1193. res := rules.Delete(lbaas.network, rule.ID)
  1194. if res.Err != nil && !isNotFound(res.Err) {
  1195. return fmt.Errorf("error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
  1196. }
  1197. }
  1198. }
  1199. for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
  1200. opts := rules.ListOpts{
  1201. Direction: string(rules.DirIngress),
  1202. SecGroupID: nodeSecurityGroupID,
  1203. RemoteGroupID: lbSecGroupID,
  1204. PortRangeMax: int(port.NodePort),
  1205. PortRangeMin: int(port.NodePort),
  1206. Protocol: string(port.Protocol),
  1207. }
  1208. secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
  1209. if err != nil && !isNotFound(err) {
  1210. return fmt.Errorf("error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
  1211. }
  1212. if len(secGroupRules) != 0 {
  1213. // Do not add rule when find rules for remote group in the Node Security Group
  1214. continue
  1215. }
  1216. // Add the rules in the Node Security Group
  1217. err = createNodeSecurityGroup(lbaas.network, nodeSecurityGroupID, int(port.NodePort), port.Protocol, lbSecGroupID)
  1218. if err != nil {
  1219. return fmt.Errorf("error occurred creating security group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
  1220. }
  1221. }
  1222. }
  1223. return nil
  1224. }
  1225. // EnsureLoadBalancerDeleted deletes the specified load balancer
  1226. func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
  1227. loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
  1228. klog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", clusterName, loadBalancerName)
  1229. loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
  1230. if err != nil && err != ErrNotFound {
  1231. return err
  1232. }
  1233. if loadbalancer == nil {
  1234. return nil
  1235. }
  1236. if loadbalancer.VipPortID != "" {
  1237. portID := loadbalancer.VipPortID
  1238. floatingIP, err := getFloatingIPByPortID(lbaas.network, portID)
  1239. if err != nil && err != ErrNotFound {
  1240. return err
  1241. }
  1242. if floatingIP != nil {
  1243. err = floatingips.Delete(lbaas.network, floatingIP.ID).ExtractErr()
  1244. if err != nil && !isNotFound(err) {
  1245. return err
  1246. }
  1247. }
  1248. }
  1249. // get all listeners associated with this loadbalancer
  1250. listenerList, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
  1251. if err != nil {
  1252. return fmt.Errorf("error getting LB %s listeners: %v", loadbalancer.ID, err)
  1253. }
  1254. // get all pools (and health monitors) associated with this loadbalancer
  1255. var poolIDs []string
  1256. var monitorIDs []string
  1257. for _, listener := range listenerList {
  1258. pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
  1259. if err != nil && err != ErrNotFound {
  1260. return fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
  1261. }
  1262. if pool != nil {
  1263. poolIDs = append(poolIDs, pool.ID)
  1264. // If create-monitor of cloud-config is false, pool has not monitor.
  1265. if pool.MonitorID != "" {
  1266. monitorIDs = append(monitorIDs, pool.MonitorID)
  1267. }
  1268. }
  1269. }
  1270. // delete all monitors
  1271. for _, monitorID := range monitorIDs {
  1272. err := v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
  1273. if err != nil && !isNotFound(err) {
  1274. return err
  1275. }
  1276. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  1277. if err != nil {
  1278. return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  1279. }
  1280. }
  1281. // delete all members and pools
  1282. for _, poolID := range poolIDs {
  1283. // get members for current pool
  1284. membersList, err := getMembersByPoolID(lbaas.lb, poolID)
  1285. if err != nil && !isNotFound(err) {
  1286. return fmt.Errorf("error getting pool members %s: %v", poolID, err)
  1287. }
  1288. // delete all members for this pool
  1289. for _, member := range membersList {
  1290. err := v2pools.DeleteMember(lbaas.lb, poolID, member.ID).ExtractErr()
  1291. if err != nil && !isNotFound(err) {
  1292. return err
  1293. }
  1294. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  1295. if err != nil {
  1296. return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  1297. }
  1298. }
  1299. // delete pool
  1300. err = v2pools.Delete(lbaas.lb, poolID).ExtractErr()
  1301. if err != nil && !isNotFound(err) {
  1302. return err
  1303. }
  1304. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  1305. if err != nil {
  1306. return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  1307. }
  1308. }
  1309. // delete all listeners
  1310. for _, listener := range listenerList {
  1311. err := listeners.Delete(lbaas.lb, listener.ID).ExtractErr()
  1312. if err != nil && !isNotFound(err) {
  1313. return err
  1314. }
  1315. provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
  1316. if err != nil {
  1317. return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
  1318. }
  1319. }
  1320. // delete loadbalancer
  1321. err = loadbalancers.Delete(lbaas.lb, loadbalancer.ID).ExtractErr()
  1322. if err != nil && !isNotFound(err) {
  1323. return err
  1324. }
  1325. err = waitLoadbalancerDeleted(lbaas.lb, loadbalancer.ID)
  1326. if err != nil {
  1327. return fmt.Errorf("failed to delete loadbalancer: %v", err)
  1328. }
  1329. // Delete the Security Group
  1330. if lbaas.opts.ManageSecurityGroups {
  1331. err := lbaas.EnsureSecurityGroupDeleted(clusterName, service)
  1332. if err != nil {
  1333. return fmt.Errorf("Failed to delete Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
  1334. }
  1335. }
  1336. return nil
  1337. }
  1338. // EnsureSecurityGroupDeleted deleting security group for specific loadbalancer service.
  1339. func (lbaas *LbaasV2) EnsureSecurityGroupDeleted(clusterName string, service *v1.Service) error {
  1340. // Generate Name
  1341. lbSecGroupName := getSecurityGroupName(service)
  1342. lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
  1343. if err != nil {
  1344. if isSecurityGroupNotFound(err) {
  1345. // It is OK when the security group has been deleted by others.
  1346. return nil
  1347. }
  1348. return fmt.Errorf("Error occurred finding security group: %s: %v", lbSecGroupName, err)
  1349. }
  1350. lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID)
  1351. if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) {
  1352. return lbSecGroup.Err
  1353. }
  1354. if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
  1355. // Just happen when nodes have not Security Group, or should not happen
  1356. // UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty
  1357. // And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted.
  1358. klog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s",
  1359. service.Namespace, service.Name)
  1360. } else {
  1361. // Delete the rules in the Node Security Group
  1362. for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
  1363. opts := rules.ListOpts{
  1364. SecGroupID: nodeSecurityGroupID,
  1365. RemoteGroupID: lbSecGroupID,
  1366. }
  1367. secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
  1368. if err != nil && !isNotFound(err) {
  1369. msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
  1370. return fmt.Errorf(msg)
  1371. }
  1372. for _, rule := range secGroupRules {
  1373. res := rules.Delete(lbaas.network, rule.ID)
  1374. if res.Err != nil && !isNotFound(res.Err) {
  1375. return fmt.Errorf("Error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
  1376. }
  1377. }
  1378. }
  1379. }
  1380. return nil
  1381. }