cloud_request_manager.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cloudresource
  14. import (
  15. "context"
  16. "fmt"
  17. "sync"
  18. "time"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/types"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. cloudprovider "k8s.io/cloud-provider"
  23. "k8s.io/klog"
  24. )
  25. // SyncManager is an interface for making requests to a cloud provider
  26. type SyncManager interface {
  27. Run(stopCh <-chan struct{})
  28. NodeAddresses() ([]v1.NodeAddress, error)
  29. }
  30. var _ SyncManager = &cloudResourceSyncManager{}
  31. type cloudResourceSyncManager struct {
  32. // Cloud provider interface.
  33. cloud cloudprovider.Interface
  34. // Sync period
  35. syncPeriod time.Duration
  36. nodeAddressesMonitor *sync.Cond
  37. nodeAddressesErr error
  38. nodeAddresses []v1.NodeAddress
  39. nodeName types.NodeName
  40. }
  41. // NewSyncManager creates a manager responsible for collecting resources from a
  42. // cloud provider through requests that are sensitive to timeouts and hanging
  43. func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager {
  44. return &cloudResourceSyncManager{
  45. cloud: cloud,
  46. syncPeriod: syncPeriod,
  47. nodeName: nodeName,
  48. // nodeAddressesMonitor is a monitor that guards a result (nodeAddresses,
  49. // nodeAddressesErr) of the sync loop under the condition that a result has
  50. // been saved at least once. The semantics here are:
  51. //
  52. // * Readers of the result will wait on the monitor until the first result
  53. // has been saved.
  54. // * The sync loop (i.e. the only writer), will signal all waiters every
  55. // time it updates the result.
  56. nodeAddressesMonitor: sync.NewCond(&sync.Mutex{}),
  57. }
  58. }
  59. // NodeAddresses waits for the first sync loop to run. If no successful syncs
  60. // have run, it will return the most recent error. If node addresses have been
  61. // synced successfully, it will return the list of node addresses from the most
  62. // recent successful sync.
  63. func (m *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) {
  64. m.nodeAddressesMonitor.L.Lock()
  65. defer m.nodeAddressesMonitor.L.Unlock()
  66. // wait until there is something
  67. for {
  68. if addrs, err := m.nodeAddresses, m.nodeAddressesErr; len(addrs) > 0 || err != nil {
  69. return addrs, err
  70. }
  71. klog.V(5).Infof("Waiting for cloud provider to provide node addresses")
  72. m.nodeAddressesMonitor.Wait()
  73. }
  74. }
  75. // getNodeAddresses calls the cloud provider to get a current list of node addresses.
  76. func (m *cloudResourceSyncManager) getNodeAddresses() ([]v1.NodeAddress, error) {
  77. // TODO(roberthbailey): Can we do this without having credentials to talk to
  78. // the cloud provider?
  79. // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and
  80. // returned an interface.
  81. // TODO: If IP addresses couldn't be fetched from the cloud provider, should
  82. // kubelet fallback on the other methods for getting the IP below?
  83. instances, ok := m.cloud.Instances()
  84. if !ok {
  85. return nil, fmt.Errorf("failed to get instances from cloud provider")
  86. }
  87. return instances.NodeAddresses(context.TODO(), m.nodeName)
  88. }
  89. func (m *cloudResourceSyncManager) syncNodeAddresses() {
  90. klog.V(5).Infof("Requesting node addresses from cloud provider for node %q", m.nodeName)
  91. addrs, err := m.getNodeAddresses()
  92. m.nodeAddressesMonitor.L.Lock()
  93. defer m.nodeAddressesMonitor.L.Unlock()
  94. defer m.nodeAddressesMonitor.Broadcast()
  95. if err != nil {
  96. klog.V(2).Infof("Node addresses from cloud provider for node %q not collected: %v", m.nodeName, err)
  97. if len(m.nodeAddresses) > 0 {
  98. // in the event that a sync loop fails when a previous sync had
  99. // succeeded, continue to use the old addresses.
  100. return
  101. }
  102. m.nodeAddressesErr = fmt.Errorf("failed to get node address from cloud provider: %v", err)
  103. return
  104. }
  105. klog.V(5).Infof("Node addresses from cloud provider for node %q collected", m.nodeName)
  106. m.nodeAddressesErr = nil
  107. m.nodeAddresses = addrs
  108. }
  109. // Run starts the cloud resource sync manager's sync loop.
  110. func (m *cloudResourceSyncManager) Run(stopCh <-chan struct{}) {
  111. wait.Until(m.syncNodeAddresses, m.syncPeriod, stopCh)
  112. }