12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package openstack
- import (
- "context"
- "fmt"
- "net"
- "reflect"
- "strings"
- "time"
- "github.com/gophercloud/gophercloud"
- "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions"
- "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/external"
- "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/floatingips"
- "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners"
- "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/loadbalancers"
- v2monitors "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/monitors"
- v2pools "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools"
- "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/groups"
- "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/rules"
- "github.com/gophercloud/gophercloud/openstack/networking/v2/networks"
- neutronports "github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
- "github.com/gophercloud/gophercloud/pagination"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- cloudprovider "k8s.io/cloud-provider"
- servicehelpers "k8s.io/cloud-provider/service/helpers"
- )
- // Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
- // this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
- const (
- // loadbalancerActive* is configuration of exponential backoff for
- // going into ACTIVE loadbalancer provisioning status. Starting with 1
- // seconds, multiplying by 1.2 with each step and taking 19 steps at maximum
- // it will time out after 128s, which roughly corresponds to 120s
- loadbalancerActiveInitDelay = 1 * time.Second
- loadbalancerActiveFactor = 1.2
- loadbalancerActiveSteps = 19
- // loadbalancerDelete* is configuration of exponential backoff for
- // waiting for delete operation to complete. Starting with 1
- // seconds, multiplying by 1.2 with each step and taking 13 steps at maximum
- // it will time out after 32s, which roughly corresponds to 30s
- loadbalancerDeleteInitDelay = 1 * time.Second
- loadbalancerDeleteFactor = 1.2
- loadbalancerDeleteSteps = 13
- activeStatus = "ACTIVE"
- errorStatus = "ERROR"
- ServiceAnnotationLoadBalancerFloatingNetworkID = "loadbalancer.openstack.org/floating-network-id"
- ServiceAnnotationLoadBalancerSubnetID = "loadbalancer.openstack.org/subnet-id"
- // ServiceAnnotationLoadBalancerInternal is the annotation used on the service
- // to indicate that we want an internal loadbalancer service.
- // If the value of ServiceAnnotationLoadBalancerInternal is false, it indicates that we want an external loadbalancer service. Default to false.
- ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/openstack-internal-load-balancer"
- )
- var _ cloudprovider.LoadBalancer = (*LbaasV2)(nil)
- // LbaasV2 is a LoadBalancer implementation for Neutron LBaaS v2 API
- type LbaasV2 struct {
- LoadBalancer
- }
- func networkExtensions(client *gophercloud.ServiceClient) (map[string]bool, error) {
- seen := make(map[string]bool)
- pager := extensions.List(client)
- err := pager.EachPage(func(page pagination.Page) (bool, error) {
- exts, err := extensions.ExtractExtensions(page)
- if err != nil {
- return false, err
- }
- for _, ext := range exts {
- seen[ext.Alias] = true
- }
- return true, nil
- })
- return seen, err
- }
- func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) {
- opts := floatingips.ListOpts{
- PortID: portID,
- }
- pager := floatingips.List(client, opts)
- floatingIPList := make([]floatingips.FloatingIP, 0, 1)
- err := pager.EachPage(func(page pagination.Page) (bool, error) {
- f, err := floatingips.ExtractFloatingIPs(page)
- if err != nil {
- return false, err
- }
- floatingIPList = append(floatingIPList, f...)
- if len(floatingIPList) > 1 {
- return false, ErrMultipleResults
- }
- return true, nil
- })
- if err != nil {
- if isNotFound(err) {
- return nil, ErrNotFound
- }
- return nil, err
- }
- if len(floatingIPList) == 0 {
- return nil, ErrNotFound
- } else if len(floatingIPList) > 1 {
- return nil, ErrMultipleResults
- }
- return &floatingIPList[0], nil
- }
- func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loadbalancers.LoadBalancer, error) {
- opts := loadbalancers.ListOpts{
- Name: name,
- }
- pager := loadbalancers.List(client, opts)
- loadbalancerList := make([]loadbalancers.LoadBalancer, 0, 1)
- err := pager.EachPage(func(page pagination.Page) (bool, error) {
- v, err := loadbalancers.ExtractLoadBalancers(page)
- if err != nil {
- return false, err
- }
- loadbalancerList = append(loadbalancerList, v...)
- if len(loadbalancerList) > 1 {
- return false, ErrMultipleResults
- }
- return true, nil
- })
- if err != nil {
- if isNotFound(err) {
- return nil, ErrNotFound
- }
- return nil, err
- }
- if len(loadbalancerList) == 0 {
- return nil, ErrNotFound
- } else if len(loadbalancerList) > 1 {
- return nil, ErrMultipleResults
- }
- return &loadbalancerList[0], nil
- }
- func getListenersByLoadBalancerID(client *gophercloud.ServiceClient, id string) ([]listeners.Listener, error) {
- var existingListeners []listeners.Listener
- err := listeners.List(client, listeners.ListOpts{LoadbalancerID: id}).EachPage(func(page pagination.Page) (bool, error) {
- listenerList, err := listeners.ExtractListeners(page)
- if err != nil {
- return false, err
- }
- for _, l := range listenerList {
- for _, lb := range l.Loadbalancers {
- if lb.ID == id {
- existingListeners = append(existingListeners, l)
- break
- }
- }
- }
- return true, nil
- })
- if err != nil {
- return nil, err
- }
- return existingListeners, nil
- }
- // get listener for a port or nil if does not exist
- func getListenerForPort(existingListeners []listeners.Listener, port v1.ServicePort) *listeners.Listener {
- for _, l := range existingListeners {
- if listeners.Protocol(l.Protocol) == toListenersProtocol(port.Protocol) && l.ProtocolPort == int(port.Port) {
- return &l
- }
- }
- return nil
- }
- // Get pool for a listener. A listener always has exactly one pool.
- func getPoolByListenerID(client *gophercloud.ServiceClient, loadbalancerID string, listenerID string) (*v2pools.Pool, error) {
- listenerPools := make([]v2pools.Pool, 0, 1)
- err := v2pools.List(client, v2pools.ListOpts{LoadbalancerID: loadbalancerID}).EachPage(func(page pagination.Page) (bool, error) {
- poolsList, err := v2pools.ExtractPools(page)
- if err != nil {
- return false, err
- }
- for _, p := range poolsList {
- for _, l := range p.Listeners {
- if l.ID == listenerID {
- listenerPools = append(listenerPools, p)
- }
- }
- }
- if len(listenerPools) > 1 {
- return false, ErrMultipleResults
- }
- return true, nil
- })
- if err != nil {
- if isNotFound(err) {
- return nil, ErrNotFound
- }
- return nil, err
- }
- if len(listenerPools) == 0 {
- return nil, ErrNotFound
- } else if len(listenerPools) > 1 {
- return nil, ErrMultipleResults
- }
- return &listenerPools[0], nil
- }
- func getMembersByPoolID(client *gophercloud.ServiceClient, id string) ([]v2pools.Member, error) {
- var members []v2pools.Member
- err := v2pools.ListMembers(client, id, v2pools.ListMembersOpts{}).EachPage(func(page pagination.Page) (bool, error) {
- membersList, err := v2pools.ExtractMembers(page)
- if err != nil {
- return false, err
- }
- members = append(members, membersList...)
- return true, nil
- })
- if err != nil {
- return nil, err
- }
- return members, nil
- }
- // Check if a member exists for node
- func memberExists(members []v2pools.Member, addr string, port int) bool {
- for _, member := range members {
- if member.Address == addr && member.ProtocolPort == port {
- return true
- }
- }
- return false
- }
- func popListener(existingListeners []listeners.Listener, id string) []listeners.Listener {
- for i, existingListener := range existingListeners {
- if existingListener.ID == id {
- existingListeners[i] = existingListeners[len(existingListeners)-1]
- existingListeners = existingListeners[:len(existingListeners)-1]
- break
- }
- }
- return existingListeners
- }
- func popMember(members []v2pools.Member, addr string, port int) []v2pools.Member {
- for i, member := range members {
- if member.Address == addr && member.ProtocolPort == port {
- members[i] = members[len(members)-1]
- members = members[:len(members)-1]
- }
- }
- return members
- }
- func getSecurityGroupName(service *v1.Service) string {
- securityGroupName := fmt.Sprintf("lb-sg-%s-%s-%s", service.UID, service.Namespace, service.Name)
- //OpenStack requires that the name of a security group is shorter than 255 bytes.
- if len(securityGroupName) > 255 {
- securityGroupName = securityGroupName[:255]
- }
- return securityGroupName
- }
- func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpts) ([]rules.SecGroupRule, error) {
- pager := rules.List(client, opts)
- var securityRules []rules.SecGroupRule
- err := pager.EachPage(func(page pagination.Page) (bool, error) {
- ruleList, err := rules.ExtractRules(page)
- if err != nil {
- return false, err
- }
- securityRules = append(securityRules, ruleList...)
- return true, nil
- })
- if err != nil {
- return nil, err
- }
- return securityRules, nil
- }
- func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
- backoff := wait.Backoff{
- Duration: loadbalancerActiveInitDelay,
- Factor: loadbalancerActiveFactor,
- Steps: loadbalancerActiveSteps,
- }
- var provisioningStatus string
- err := wait.ExponentialBackoff(backoff, func() (bool, error) {
- loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
- if err != nil {
- return false, err
- }
- provisioningStatus = loadbalancer.ProvisioningStatus
- if loadbalancer.ProvisioningStatus == activeStatus {
- return true, nil
- } else if loadbalancer.ProvisioningStatus == errorStatus {
- return true, fmt.Errorf("loadbalancer has gone into ERROR state")
- } else {
- return false, nil
- }
- })
- if err == wait.ErrWaitTimeout {
- err = fmt.Errorf("loadbalancer failed to go into ACTIVE provisioning status within alloted time")
- }
- return provisioningStatus, err
- }
- func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
- backoff := wait.Backoff{
- Duration: loadbalancerDeleteInitDelay,
- Factor: loadbalancerDeleteFactor,
- Steps: loadbalancerDeleteSteps,
- }
- err := wait.ExponentialBackoff(backoff, func() (bool, error) {
- _, err := loadbalancers.Get(client, loadbalancerID).Extract()
- if err != nil {
- if isNotFound(err) {
- return true, nil
- }
- return false, err
- }
- return false, nil
- })
- if err == wait.ErrWaitTimeout {
- err = fmt.Errorf("loadbalancer failed to delete within the alloted time")
- }
- return err
- }
- func toRuleProtocol(protocol v1.Protocol) rules.RuleProtocol {
- switch protocol {
- case v1.ProtocolTCP:
- return rules.ProtocolTCP
- case v1.ProtocolUDP:
- return rules.ProtocolUDP
- default:
- return rules.RuleProtocol(strings.ToLower(string(protocol)))
- }
- }
- func toListenersProtocol(protocol v1.Protocol) listeners.Protocol {
- switch protocol {
- case v1.ProtocolTCP:
- return listeners.ProtocolTCP
- default:
- return listeners.Protocol(string(protocol))
- }
- }
- func createNodeSecurityGroup(client *gophercloud.ServiceClient, nodeSecurityGroupID string, port int, protocol v1.Protocol, lbSecGroup string) error {
- v4NodeSecGroupRuleCreateOpts := rules.CreateOpts{
- Direction: rules.DirIngress,
- PortRangeMax: port,
- PortRangeMin: port,
- Protocol: toRuleProtocol(protocol),
- RemoteGroupID: lbSecGroup,
- SecGroupID: nodeSecurityGroupID,
- EtherType: rules.EtherType4,
- }
- v6NodeSecGroupRuleCreateOpts := rules.CreateOpts{
- Direction: rules.DirIngress,
- PortRangeMax: port,
- PortRangeMin: port,
- Protocol: toRuleProtocol(protocol),
- RemoteGroupID: lbSecGroup,
- SecGroupID: nodeSecurityGroupID,
- EtherType: rules.EtherType6,
- }
- _, err := rules.Create(client, v4NodeSecGroupRuleCreateOpts).Extract()
- if err != nil {
- return err
- }
- _, err = rules.Create(client, v6NodeSecGroupRuleCreateOpts).Extract()
- if err != nil {
- return err
- }
- return nil
- }
- func (lbaas *LbaasV2) createLoadBalancer(service *v1.Service, name string, internalAnnotation bool) (*loadbalancers.LoadBalancer, error) {
- createOpts := loadbalancers.CreateOpts{
- Name: name,
- Description: fmt.Sprintf("Kubernetes external service %s", name),
- VipSubnetID: lbaas.opts.SubnetID,
- Provider: lbaas.opts.LBProvider,
- }
- loadBalancerIP := service.Spec.LoadBalancerIP
- if loadBalancerIP != "" && internalAnnotation {
- createOpts.VipAddress = loadBalancerIP
- }
- loadbalancer, err := loadbalancers.Create(lbaas.lb, createOpts).Extract()
- if err != nil {
- return nil, fmt.Errorf("error creating loadbalancer %v: %v", createOpts, err)
- }
- return loadbalancer, nil
- }
- // GetLoadBalancer returns whether the specified load balancer exists and its status
- func (lbaas *LbaasV2) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
- loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
- loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
- if err == ErrNotFound {
- return nil, false, nil
- }
- if loadbalancer == nil {
- return nil, false, err
- }
- status := &v1.LoadBalancerStatus{}
- portID := loadbalancer.VipPortID
- if portID != "" {
- floatIP, err := getFloatingIPByPortID(lbaas.network, portID)
- if err != nil && err != ErrNotFound {
- return nil, false, fmt.Errorf("error getting floating ip for port %s: %v", portID, err)
- }
- if floatIP != nil {
- status.Ingress = []v1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
- }
- } else {
- status.Ingress = []v1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
- }
- return status, true, err
- }
- // GetLoadBalancerName is an implementation of LoadBalancer.GetLoadBalancerName.
- func (lbaas *LbaasV2) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string {
- // TODO: replace DefaultLoadBalancerName to generate more meaningful loadbalancer names.
- return cloudprovider.DefaultLoadBalancerName(service)
- }
- // The LB needs to be configured with instance addresses on the same
- // subnet as the LB (aka opts.SubnetID). Currently we're just
- // guessing that the node's InternalIP is the right address.
- // In case no InternalIP can be found, ExternalIP is tried.
- // If neither InternalIP nor ExternalIP can be found an error is
- // returned.
- func nodeAddressForLB(node *v1.Node) (string, error) {
- addrs := node.Status.Addresses
- if len(addrs) == 0 {
- return "", ErrNoAddressFound
- }
- allowedAddrTypes := []v1.NodeAddressType{v1.NodeInternalIP, v1.NodeExternalIP}
- for _, allowedAddrType := range allowedAddrTypes {
- for _, addr := range addrs {
- if addr.Type == allowedAddrType {
- return addr.Address, nil
- }
- }
- }
- return "", ErrNoAddressFound
- }
- //getStringFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
- func getStringFromServiceAnnotation(service *v1.Service, annotationKey string, defaultSetting string) string {
- klog.V(4).Infof("getStringFromServiceAnnotation(%v, %v, %v)", service, annotationKey, defaultSetting)
- if annotationValue, ok := service.Annotations[annotationKey]; ok {
- //if there is an annotation for this setting, set the "setting" var to it
- // annotationValue can be empty, it is working as designed
- // it makes possible for instance provisioning loadbalancer without floatingip
- klog.V(4).Infof("Found a Service Annotation: %v = %v", annotationKey, annotationValue)
- return annotationValue
- }
- //if there is no annotation, set "settings" var to the value from cloud config
- klog.V(4).Infof("Could not find a Service Annotation; falling back on cloud-config setting: %v = %v", annotationKey, defaultSetting)
- return defaultSetting
- }
- // getSubnetIDForLB returns subnet-id for a specific node
- func getSubnetIDForLB(compute *gophercloud.ServiceClient, node v1.Node) (string, error) {
- ipAddress, err := nodeAddressForLB(&node)
- if err != nil {
- return "", err
- }
- instanceID := node.Spec.ProviderID
- if ind := strings.LastIndex(instanceID, "/"); ind >= 0 {
- instanceID = instanceID[(ind + 1):]
- }
- interfaces, err := getAttachedInterfacesByID(compute, instanceID)
- if err != nil {
- return "", err
- }
- for _, intf := range interfaces {
- for _, fixedIP := range intf.FixedIPs {
- if fixedIP.IPAddress == ipAddress {
- return fixedIP.SubnetID, nil
- }
- }
- }
- return "", ErrNotFound
- }
- // getNodeSecurityGroupIDForLB lists node-security-groups for specific nodes
- func getNodeSecurityGroupIDForLB(compute *gophercloud.ServiceClient, network *gophercloud.ServiceClient, nodes []*v1.Node) ([]string, error) {
- secGroupNames := sets.NewString()
- for _, node := range nodes {
- nodeName := types.NodeName(node.Name)
- srv, err := getServerByName(compute, nodeName)
- if err != nil {
- return []string{}, err
- }
- // use the first node-security-groups
- // case 0: node1:SG1 node2:SG1 return SG1
- // case 1: node1:SG1 node2:SG2 return SG1,SG2
- // case 2: node1:SG1,SG2 node2:SG3,SG4 return SG1,SG3
- // case 3: node1:SG1,SG2 node2:SG2,SG3 return SG1,SG2
- secGroupNames.Insert(srv.SecurityGroups[0]["name"].(string))
- }
- secGroupIDs := make([]string, secGroupNames.Len())
- for i, name := range secGroupNames.List() {
- secGroupID, err := groups.IDFromName(network, name)
- if err != nil {
- return []string{}, err
- }
- secGroupIDs[i] = secGroupID
- }
- return secGroupIDs, nil
- }
- // isSecurityGroupNotFound return true while 'err' is object of gophercloud.ErrResourceNotFound
- func isSecurityGroupNotFound(err error) bool {
- errType := reflect.TypeOf(err).String()
- errTypeSlice := strings.Split(errType, ".")
- errTypeValue := ""
- if len(errTypeSlice) != 0 {
- errTypeValue = errTypeSlice[len(errTypeSlice)-1]
- }
- if errTypeValue == "ErrResourceNotFound" {
- return true
- }
- return false
- }
- // getFloatingNetworkIDForLB returns a floating-network-id for cluster.
- func getFloatingNetworkIDForLB(client *gophercloud.ServiceClient) (string, error) {
- var floatingNetworkIds []string
- type NetworkWithExternalExt struct {
- networks.Network
- external.NetworkExternalExt
- }
- err := networks.List(client, networks.ListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
- var externalNetwork []NetworkWithExternalExt
- err := networks.ExtractNetworksInto(page, &externalNetwork)
- if err != nil {
- return false, err
- }
- for _, externalNet := range externalNetwork {
- if externalNet.External {
- floatingNetworkIds = append(floatingNetworkIds, externalNet.ID)
- }
- }
- if len(floatingNetworkIds) > 1 {
- return false, ErrMultipleResults
- }
- return true, nil
- })
- if err != nil {
- if isNotFound(err) {
- return "", ErrNotFound
- }
- if err == ErrMultipleResults {
- klog.V(4).Infof("find multiple external networks, pick the first one when there are no explicit configuration.")
- return floatingNetworkIds[0], nil
- }
- return "", err
- }
- if len(floatingNetworkIds) == 0 {
- return "", ErrNotFound
- }
- return floatingNetworkIds[0], nil
- }
- // TODO: This code currently ignores 'region' and always creates a
- // loadbalancer in only the current OpenStack region. We should take
- // a list of regions (from config) and query/create loadbalancers in
- // each region.
- // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one.
- func (lbaas *LbaasV2) EnsureLoadBalancer(ctx context.Context, clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
- klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodes, apiService.Annotations)
- if len(nodes) == 0 {
- return nil, fmt.Errorf("there are no available nodes for LoadBalancer service %s/%s", apiService.Namespace, apiService.Name)
- }
- lbaas.opts.SubnetID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
- if len(lbaas.opts.SubnetID) == 0 {
- // Get SubnetID automatically.
- // The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
- subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
- if err != nil {
- klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
- return nil, fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
- "and failed to find subnet-id from OpenStack: %v", apiService.Namespace, apiService.Name, err)
- }
- lbaas.opts.SubnetID = subnetID
- }
- ports := apiService.Spec.Ports
- if len(ports) == 0 {
- return nil, fmt.Errorf("no ports provided to openstack load balancer")
- }
- floatingPool := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingNetworkID, lbaas.opts.FloatingNetworkID)
- if len(floatingPool) == 0 {
- var err error
- floatingPool, err = getFloatingNetworkIDForLB(lbaas.network)
- if err != nil {
- klog.Warningf("Failed to find floating-network-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
- }
- }
- var internalAnnotation bool
- internal := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerInternal, "false")
- switch internal {
- case "true":
- klog.V(4).Infof("Ensure an internal loadbalancer service.")
- internalAnnotation = true
- case "false":
- if len(floatingPool) != 0 {
- klog.V(4).Infof("Ensure an external loadbalancer service, using floatingPool: %v", floatingPool)
- internalAnnotation = false
- } else {
- return nil, fmt.Errorf("floating-network-id or loadbalancer.openstack.org/floating-network-id should be specified when ensuring an external loadbalancer service")
- }
- default:
- return nil, fmt.Errorf("unknown service.beta.kubernetes.io/openstack-internal-load-balancer annotation: %v, specify \"true\" or \"false\" ",
- internal)
- }
- // Check for TCP protocol on each port
- // TODO: Convert all error messages to use an event recorder
- for _, port := range ports {
- if port.Protocol != v1.ProtocolTCP {
- return nil, fmt.Errorf("only TCP LoadBalancer is supported for openstack load balancers")
- }
- }
- sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService)
- if err != nil {
- return nil, fmt.Errorf("failed to get source ranges for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
- }
- if !servicehelpers.IsAllowAll(sourceRanges) && !lbaas.opts.ManageSecurityGroups {
- return nil, fmt.Errorf("source range restrictions are not supported for openstack load balancers without managing security groups")
- }
- affinity := apiService.Spec.SessionAffinity
- var persistence *v2pools.SessionPersistence
- switch affinity {
- case v1.ServiceAffinityNone:
- persistence = nil
- case v1.ServiceAffinityClientIP:
- persistence = &v2pools.SessionPersistence{Type: "SOURCE_IP"}
- default:
- return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
- }
- name := lbaas.GetLoadBalancerName(ctx, clusterName, apiService)
- loadbalancer, err := getLoadbalancerByName(lbaas.lb, name)
- if err != nil {
- if err != ErrNotFound {
- return nil, fmt.Errorf("error getting loadbalancer %s: %v", name, err)
- }
- klog.V(2).Infof("Creating loadbalancer %s", name)
- loadbalancer, err = lbaas.createLoadBalancer(apiService, name, internalAnnotation)
- if err != nil {
- // Unknown error, retry later
- return nil, fmt.Errorf("error creating loadbalancer %s: %v", name, err)
- }
- } else {
- klog.V(2).Infof("LoadBalancer %s already exists", name)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- lbmethod := v2pools.LBMethod(lbaas.opts.LBMethod)
- if lbmethod == "" {
- lbmethod = v2pools.LBMethodRoundRobin
- }
- oldListeners, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("error getting LB %s listeners: %v", name, err)
- }
- for portIndex, port := range ports {
- listener := getListenerForPort(oldListeners, port)
- if listener == nil {
- klog.V(4).Infof("Creating listener for port %d", int(port.Port))
- listener, err = listeners.Create(lbaas.lb, listeners.CreateOpts{
- Name: fmt.Sprintf("listener_%s_%d", name, portIndex),
- Protocol: listeners.Protocol(port.Protocol),
- ProtocolPort: int(port.Port),
- LoadbalancerID: loadbalancer.ID,
- }).Extract()
- if err != nil {
- // Unknown error, retry later
- return nil, fmt.Errorf("error creating LB listener: %v", err)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- klog.V(4).Infof("Listener for %s port %d: %s", string(port.Protocol), int(port.Port), listener.ID)
- // After all ports have been processed, remaining listeners are removed as obsolete.
- // Pop valid listeners.
- oldListeners = popListener(oldListeners, listener.ID)
- pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
- if err != nil && err != ErrNotFound {
- // Unknown error, retry later
- return nil, fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
- }
- if pool == nil {
- klog.V(4).Infof("Creating pool for listener %s", listener.ID)
- pool, err = v2pools.Create(lbaas.lb, v2pools.CreateOpts{
- Name: fmt.Sprintf("pool_%s_%d", name, portIndex),
- Protocol: v2pools.Protocol(port.Protocol),
- LBMethod: lbmethod,
- ListenerID: listener.ID,
- Persistence: persistence,
- }).Extract()
- if err != nil {
- // Unknown error, retry later
- return nil, fmt.Errorf("error creating pool for listener %s: %v", listener.ID, err)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- klog.V(4).Infof("Pool for listener %s: %s", listener.ID, pool.ID)
- members, err := getMembersByPoolID(lbaas.lb, pool.ID)
- if err != nil && !isNotFound(err) {
- return nil, fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
- }
- for _, node := range nodes {
- addr, err := nodeAddressForLB(node)
- if err != nil {
- if err == ErrNotFound {
- // Node failure, do not create member
- klog.Warningf("Failed to create LB pool member for node %s: %v", node.Name, err)
- continue
- } else {
- return nil, fmt.Errorf("error getting address for node %s: %v", node.Name, err)
- }
- }
- if !memberExists(members, addr, int(port.NodePort)) {
- klog.V(4).Infof("Creating member for pool %s", pool.ID)
- _, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
- Name: fmt.Sprintf("member_%s_%d_%s", name, portIndex, node.Name),
- ProtocolPort: int(port.NodePort),
- Address: addr,
- SubnetID: lbaas.opts.SubnetID,
- }).Extract()
- if err != nil {
- return nil, fmt.Errorf("error creating LB pool member for node: %s, %v", node.Name, err)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- } else {
- // After all members have been processed, remaining members are deleted as obsolete.
- members = popMember(members, addr, int(port.NodePort))
- }
- klog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, node.Name, addr)
- }
- // Delete obsolete members for this pool
- for _, member := range members {
- klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
- err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- monitorID := pool.MonitorID
- if monitorID == "" && lbaas.opts.CreateMonitor {
- klog.V(4).Infof("Creating monitor for pool %s", pool.ID)
- monitor, err := v2monitors.Create(lbaas.lb, v2monitors.CreateOpts{
- Name: fmt.Sprintf("monitor_%s_%d", name, portIndex),
- PoolID: pool.ID,
- Type: string(port.Protocol),
- Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
- Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
- MaxRetries: int(lbaas.opts.MonitorMaxRetries),
- }).Extract()
- if err != nil {
- return nil, fmt.Errorf("error creating LB pool healthmonitor: %v", err)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- monitorID = monitor.ID
- } else if lbaas.opts.CreateMonitor == false {
- klog.V(4).Infof("Do not create monitor for pool %s when create-monitor is false", pool.ID)
- }
- if monitorID != "" {
- klog.V(4).Infof("Monitor for pool %s: %s", pool.ID, monitorID)
- }
- }
- // All remaining listeners are obsolete, delete
- for _, listener := range oldListeners {
- klog.V(4).Infof("Deleting obsolete listener %s:", listener.ID)
- // get pool for listener
- pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
- if err != nil && err != ErrNotFound {
- return nil, fmt.Errorf("error getting pool for obsolete listener %s: %v", listener.ID, err)
- }
- if pool != nil {
- // get and delete monitor
- monitorID := pool.MonitorID
- if monitorID != "" {
- klog.V(4).Infof("Deleting obsolete monitor %s for pool %s", monitorID, pool.ID)
- err = v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return nil, fmt.Errorf("error deleting obsolete monitor %s for pool %s: %v", monitorID, pool.ID, err)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- // get and delete pool members
- members, err := getMembersByPoolID(lbaas.lb, pool.ID)
- if err != nil && !isNotFound(err) {
- return nil, fmt.Errorf("error getting members for pool %s: %v", pool.ID, err)
- }
- if members != nil {
- for _, member := range members {
- klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
- err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- }
- klog.V(4).Infof("Deleting obsolete pool %s for listener %s", pool.ID, listener.ID)
- // delete pool
- err = v2pools.Delete(lbaas.lb, pool.ID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return nil, fmt.Errorf("error deleting obsolete pool %s for listener %s: %v", pool.ID, listener.ID, err)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- // delete listener
- err = listeners.Delete(lbaas.lb, listener.ID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return nil, fmt.Errorf("error deleteting obsolete listener: %v", err)
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- klog.V(2).Infof("Deleted obsolete listener: %s", listener.ID)
- }
- portID := loadbalancer.VipPortID
- floatIP, err := getFloatingIPByPortID(lbaas.network, portID)
- if err != nil && err != ErrNotFound {
- return nil, fmt.Errorf("error getting floating ip for port %s: %v", portID, err)
- }
- if floatIP == nil && floatingPool != "" && !internalAnnotation {
- klog.V(4).Infof("Creating floating ip for loadbalancer %s port %s", loadbalancer.ID, portID)
- floatIPOpts := floatingips.CreateOpts{
- FloatingNetworkID: floatingPool,
- PortID: portID,
- }
- loadBalancerIP := apiService.Spec.LoadBalancerIP
- if loadBalancerIP != "" {
- floatIPOpts.FloatingIP = loadBalancerIP
- }
- floatIP, err = floatingips.Create(lbaas.network, floatIPOpts).Extract()
- if err != nil {
- return nil, fmt.Errorf("error creating LB floatingip %+v: %v", floatIPOpts, err)
- }
- }
- status := &v1.LoadBalancerStatus{}
- if floatIP != nil {
- status.Ingress = []v1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
- } else {
- status.Ingress = []v1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
- }
- if lbaas.opts.ManageSecurityGroups {
- err := lbaas.ensureSecurityGroup(clusterName, apiService, nodes, loadbalancer)
- if err != nil {
- // cleanup what was created so far
- _ = lbaas.EnsureLoadBalancerDeleted(ctx, clusterName, apiService)
- return status, err
- }
- }
- return status, nil
- }
- // ensureSecurityGroup ensures security group exist for specific loadbalancer service.
- // Creating security group for specific loadbalancer service when it does not exist.
- func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Service, nodes []*v1.Node, loadbalancer *loadbalancers.LoadBalancer) error {
- // find node-security-group for service
- var err error
- if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
- lbaas.opts.NodeSecurityGroupIDs, err = getNodeSecurityGroupIDForLB(lbaas.compute, lbaas.network, nodes)
- if err != nil {
- return fmt.Errorf("failed to find node-security-group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
- }
- }
- klog.V(4).Infof("find node-security-group %v for loadbalancer service %s/%s", lbaas.opts.NodeSecurityGroupIDs, apiService.Namespace, apiService.Name)
- // get service ports
- ports := apiService.Spec.Ports
- if len(ports) == 0 {
- return fmt.Errorf("no ports provided to openstack load balancer")
- }
- // get service source ranges
- sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService)
- if err != nil {
- return fmt.Errorf("failed to get source ranges for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
- }
- // ensure security group for LB
- lbSecGroupName := getSecurityGroupName(apiService)
- lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
- if err != nil {
- // If the security group of LB not exist, create it later
- if isSecurityGroupNotFound(err) {
- lbSecGroupID = ""
- } else {
- return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
- }
- }
- if len(lbSecGroupID) == 0 {
- // create security group
- lbSecGroupCreateOpts := groups.CreateOpts{
- Name: getSecurityGroupName(apiService),
- Description: fmt.Sprintf("Security Group for %s/%s Service LoadBalancer in cluster %s", apiService.Namespace, apiService.Name, clusterName),
- }
- lbSecGroup, err := groups.Create(lbaas.network, lbSecGroupCreateOpts).Extract()
- if err != nil {
- return fmt.Errorf("failed to create Security Group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
- }
- lbSecGroupID = lbSecGroup.ID
- //add rule in security group
- for _, port := range ports {
- for _, sourceRange := range sourceRanges.StringSlice() {
- ethertype := rules.EtherType4
- network, _, err := net.ParseCIDR(sourceRange)
- if err != nil {
- return fmt.Errorf("error parsing source range %s as a CIDR: %v", sourceRange, err)
- }
- if network.To4() == nil {
- ethertype = rules.EtherType6
- }
- lbSecGroupRuleCreateOpts := rules.CreateOpts{
- Direction: rules.DirIngress,
- PortRangeMax: int(port.Port),
- PortRangeMin: int(port.Port),
- Protocol: toRuleProtocol(port.Protocol),
- RemoteIPPrefix: sourceRange,
- SecGroupID: lbSecGroup.ID,
- EtherType: ethertype,
- }
- _, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
- if err != nil {
- return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
- }
- }
- }
- lbSecGroupRuleCreateOpts := rules.CreateOpts{
- Direction: rules.DirIngress,
- PortRangeMax: 4, // ICMP: Code - Values for ICMP "Destination Unreachable: Fragmentation Needed and Don't Fragment was Set"
- PortRangeMin: 3, // ICMP: Type
- Protocol: rules.ProtocolICMP,
- RemoteIPPrefix: "0.0.0.0/0", // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
- SecGroupID: lbSecGroup.ID,
- EtherType: rules.EtherType4,
- }
- _, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
- if err != nil {
- return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
- }
- lbSecGroupRuleCreateOpts = rules.CreateOpts{
- Direction: rules.DirIngress,
- PortRangeMax: 0, // ICMP: Code - Values for ICMP "Packet Too Big"
- PortRangeMin: 2, // ICMP: Type
- Protocol: rules.ProtocolICMP,
- RemoteIPPrefix: "::/0", // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
- SecGroupID: lbSecGroup.ID,
- EtherType: rules.EtherType6,
- }
- _, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
- if err != nil {
- return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
- }
- // get security groups of port
- portID := loadbalancer.VipPortID
- port, err := getPortByID(lbaas.network, portID)
- if err != nil {
- return err
- }
- // ensure the vip port has the security groups
- found := false
- for _, portSecurityGroups := range port.SecurityGroups {
- if portSecurityGroups == lbSecGroup.ID {
- found = true
- break
- }
- }
- // update loadbalancer vip port
- if !found {
- port.SecurityGroups = append(port.SecurityGroups, lbSecGroup.ID)
- updateOpts := neutronports.UpdateOpts{SecurityGroups: &port.SecurityGroups}
- res := neutronports.Update(lbaas.network, portID, updateOpts)
- if res.Err != nil {
- msg := fmt.Sprintf("Error occurred updating port %s for loadbalancer service %s/%s: %v", portID, apiService.Namespace, apiService.Name, res.Err)
- return fmt.Errorf(msg)
- }
- }
- }
- // ensure rules for every node security group
- for _, port := range ports {
- for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
- opts := rules.ListOpts{
- Direction: string(rules.DirIngress),
- SecGroupID: nodeSecurityGroupID,
- RemoteGroupID: lbSecGroupID,
- PortRangeMax: int(port.NodePort),
- PortRangeMin: int(port.NodePort),
- Protocol: string(port.Protocol),
- }
- secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
- if err != nil && !isNotFound(err) {
- msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
- return fmt.Errorf(msg)
- }
- if len(secGroupRules) != 0 {
- // Do not add rule when find rules for remote group in the Node Security Group
- continue
- }
- // Add the rules in the Node Security Group
- err = createNodeSecurityGroup(lbaas.network, nodeSecurityGroupID, int(port.NodePort), port.Protocol, lbSecGroupID)
- if err != nil {
- return fmt.Errorf("error occurred creating security group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
- }
- }
- }
- return nil
- }
- // UpdateLoadBalancer updates hosts under the specified load balancer.
- func (lbaas *LbaasV2) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
- loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
- klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodes)
- lbaas.opts.SubnetID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
- if len(lbaas.opts.SubnetID) == 0 && len(nodes) > 0 {
- // Get SubnetID automatically.
- // The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
- subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
- if err != nil {
- klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
- return fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
- "and failed to find subnet-id from OpenStack: %v", service.Namespace, service.Name, err)
- }
- lbaas.opts.SubnetID = subnetID
- }
- ports := service.Spec.Ports
- if len(ports) == 0 {
- return fmt.Errorf("no ports provided to openstack load balancer")
- }
- loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
- if err != nil {
- return err
- }
- if loadbalancer == nil {
- return fmt.Errorf("loadbalancer %s does not exist", loadBalancerName)
- }
- // Get all listeners for this loadbalancer, by "port key".
- type portKey struct {
- Protocol listeners.Protocol
- Port int
- }
- var listenerIDs []string
- lbListeners := make(map[portKey]listeners.Listener)
- allListeners, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return fmt.Errorf("error getting listeners for LB %s: %v", loadBalancerName, err)
- }
- for _, l := range allListeners {
- key := portKey{Protocol: listeners.Protocol(l.Protocol), Port: l.ProtocolPort}
- lbListeners[key] = l
- listenerIDs = append(listenerIDs, l.ID)
- }
- // Get all pools for this loadbalancer, by listener ID.
- lbPools := make(map[string]v2pools.Pool)
- for _, listenerID := range listenerIDs {
- pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listenerID)
- if err != nil {
- return fmt.Errorf("error getting pool for listener %s: %v", listenerID, err)
- }
- lbPools[listenerID] = *pool
- }
- // Compose Set of member (addresses) that _should_ exist
- addrs := make(map[string]*v1.Node)
- for _, node := range nodes {
- addr, err := nodeAddressForLB(node)
- if err != nil {
- return err
- }
- addrs[addr] = node
- }
- // Check for adding/removing members associated with each port
- for portIndex, port := range ports {
- // Get listener associated with this port
- listener, ok := lbListeners[portKey{
- Protocol: toListenersProtocol(port.Protocol),
- Port: int(port.Port),
- }]
- if !ok {
- return fmt.Errorf("loadbalancer %s does not contain required listener for port %d and protocol %s", loadBalancerName, port.Port, port.Protocol)
- }
- // Get pool associated with this listener
- pool, ok := lbPools[listener.ID]
- if !ok {
- return fmt.Errorf("loadbalancer %s does not contain required pool for listener %s", loadBalancerName, listener.ID)
- }
- // Find existing pool members (by address) for this port
- getMembers, err := getMembersByPoolID(lbaas.lb, pool.ID)
- if err != nil {
- return fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
- }
- members := make(map[string]v2pools.Member)
- for _, member := range getMembers {
- members[member.Address] = member
- }
- // Add any new members for this port
- for addr, node := range addrs {
- if _, ok := members[addr]; ok && members[addr].ProtocolPort == int(port.NodePort) {
- // Already exists, do not create member
- continue
- }
- _, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
- Name: fmt.Sprintf("member_%s_%d_%s", loadbalancer.Name, portIndex, node.Name),
- Address: addr,
- ProtocolPort: int(port.NodePort),
- SubnetID: lbaas.opts.SubnetID,
- }).Extract()
- if err != nil {
- return err
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- // Remove any old members for this port
- for _, member := range members {
- if _, ok := addrs[member.Address]; ok && member.ProtocolPort == int(port.NodePort) {
- // Still present, do not delete member
- continue
- }
- err = v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return err
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- }
- if lbaas.opts.ManageSecurityGroups {
- err := lbaas.updateSecurityGroup(clusterName, service, nodes, loadbalancer)
- if err != nil {
- return fmt.Errorf("failed to update Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
- }
- }
- return nil
- }
- // updateSecurityGroup updating security group for specific loadbalancer service.
- func (lbaas *LbaasV2) updateSecurityGroup(clusterName string, apiService *v1.Service, nodes []*v1.Node, loadbalancer *loadbalancers.LoadBalancer) error {
- originalNodeSecurityGroupIDs := lbaas.opts.NodeSecurityGroupIDs
- var err error
- lbaas.opts.NodeSecurityGroupIDs, err = getNodeSecurityGroupIDForLB(lbaas.compute, lbaas.network, nodes)
- if err != nil {
- return fmt.Errorf("failed to find node-security-group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
- }
- klog.V(4).Infof("find node-security-group %v for loadbalancer service %s/%s", lbaas.opts.NodeSecurityGroupIDs, apiService.Namespace, apiService.Name)
- original := sets.NewString(originalNodeSecurityGroupIDs...)
- current := sets.NewString(lbaas.opts.NodeSecurityGroupIDs...)
- removals := original.Difference(current)
- // Generate Name
- lbSecGroupName := getSecurityGroupName(apiService)
- lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
- if err != nil {
- return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
- }
- ports := apiService.Spec.Ports
- if len(ports) == 0 {
- return fmt.Errorf("no ports provided to openstack load balancer")
- }
- for _, port := range ports {
- for removal := range removals {
- // Delete the rules in the Node Security Group
- opts := rules.ListOpts{
- Direction: string(rules.DirIngress),
- SecGroupID: removal,
- RemoteGroupID: lbSecGroupID,
- PortRangeMax: int(port.NodePort),
- PortRangeMin: int(port.NodePort),
- Protocol: string(port.Protocol),
- }
- secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
- if err != nil && !isNotFound(err) {
- return fmt.Errorf("error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, removal, err)
- }
- for _, rule := range secGroupRules {
- res := rules.Delete(lbaas.network, rule.ID)
- if res.Err != nil && !isNotFound(res.Err) {
- return fmt.Errorf("error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
- }
- }
- }
- for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
- opts := rules.ListOpts{
- Direction: string(rules.DirIngress),
- SecGroupID: nodeSecurityGroupID,
- RemoteGroupID: lbSecGroupID,
- PortRangeMax: int(port.NodePort),
- PortRangeMin: int(port.NodePort),
- Protocol: string(port.Protocol),
- }
- secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
- if err != nil && !isNotFound(err) {
- return fmt.Errorf("error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
- }
- if len(secGroupRules) != 0 {
- // Do not add rule when find rules for remote group in the Node Security Group
- continue
- }
- // Add the rules in the Node Security Group
- err = createNodeSecurityGroup(lbaas.network, nodeSecurityGroupID, int(port.NodePort), port.Protocol, lbSecGroupID)
- if err != nil {
- return fmt.Errorf("error occurred creating security group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
- }
- }
- }
- return nil
- }
- // EnsureLoadBalancerDeleted deletes the specified load balancer
- func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
- loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
- klog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", clusterName, loadBalancerName)
- loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
- if err != nil && err != ErrNotFound {
- return err
- }
- if loadbalancer == nil {
- return nil
- }
- if loadbalancer.VipPortID != "" {
- portID := loadbalancer.VipPortID
- floatingIP, err := getFloatingIPByPortID(lbaas.network, portID)
- if err != nil && err != ErrNotFound {
- return err
- }
- if floatingIP != nil {
- err = floatingips.Delete(lbaas.network, floatingIP.ID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return err
- }
- }
- }
- // get all listeners associated with this loadbalancer
- listenerList, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return fmt.Errorf("error getting LB %s listeners: %v", loadbalancer.ID, err)
- }
- // get all pools (and health monitors) associated with this loadbalancer
- var poolIDs []string
- var monitorIDs []string
- for _, listener := range listenerList {
- pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
- if err != nil && err != ErrNotFound {
- return fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
- }
- if pool != nil {
- poolIDs = append(poolIDs, pool.ID)
- // If create-monitor of cloud-config is false, pool has not monitor.
- if pool.MonitorID != "" {
- monitorIDs = append(monitorIDs, pool.MonitorID)
- }
- }
- }
- // delete all monitors
- for _, monitorID := range monitorIDs {
- err := v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return err
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- // delete all members and pools
- for _, poolID := range poolIDs {
- // get members for current pool
- membersList, err := getMembersByPoolID(lbaas.lb, poolID)
- if err != nil && !isNotFound(err) {
- return fmt.Errorf("error getting pool members %s: %v", poolID, err)
- }
- // delete all members for this pool
- for _, member := range membersList {
- err := v2pools.DeleteMember(lbaas.lb, poolID, member.ID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return err
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- // delete pool
- err = v2pools.Delete(lbaas.lb, poolID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return err
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- // delete all listeners
- for _, listener := range listenerList {
- err := listeners.Delete(lbaas.lb, listener.ID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return err
- }
- provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
- }
- }
- // delete loadbalancer
- err = loadbalancers.Delete(lbaas.lb, loadbalancer.ID).ExtractErr()
- if err != nil && !isNotFound(err) {
- return err
- }
- err = waitLoadbalancerDeleted(lbaas.lb, loadbalancer.ID)
- if err != nil {
- return fmt.Errorf("failed to delete loadbalancer: %v", err)
- }
- // Delete the Security Group
- if lbaas.opts.ManageSecurityGroups {
- err := lbaas.EnsureSecurityGroupDeleted(clusterName, service)
- if err != nil {
- return fmt.Errorf("Failed to delete Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
- }
- }
- return nil
- }
- // EnsureSecurityGroupDeleted deleting security group for specific loadbalancer service.
- func (lbaas *LbaasV2) EnsureSecurityGroupDeleted(clusterName string, service *v1.Service) error {
- // Generate Name
- lbSecGroupName := getSecurityGroupName(service)
- lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
- if err != nil {
- if isSecurityGroupNotFound(err) {
- // It is OK when the security group has been deleted by others.
- return nil
- }
- return fmt.Errorf("Error occurred finding security group: %s: %v", lbSecGroupName, err)
- }
- lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID)
- if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) {
- return lbSecGroup.Err
- }
- if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
- // Just happen when nodes have not Security Group, or should not happen
- // UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty
- // And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted.
- klog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s",
- service.Namespace, service.Name)
- } else {
- // Delete the rules in the Node Security Group
- for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
- opts := rules.ListOpts{
- SecGroupID: nodeSecurityGroupID,
- RemoteGroupID: lbSecGroupID,
- }
- secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
- if err != nil && !isNotFound(err) {
- msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
- return fmt.Errorf(msg)
- }
- for _, rule := range secGroupRules {
- res := rules.Delete(lbaas.network, rule.ID)
- if res.Err != nil && !isNotFound(res.Err) {
- return fmt.Errorf("Error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
- }
- }
- }
- }
- return nil
- }
|