nodemapper.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package vsphere
  14. import (
  15. "context"
  16. "errors"
  17. "strings"
  18. "sync"
  19. "github.com/vmware/govmomi/object"
  20. "github.com/vmware/govmomi/vapi/rest"
  21. "github.com/vmware/govmomi/vapi/tags"
  22. "github.com/vmware/govmomi/vim25/mo"
  23. "github.com/vmware/govmomi/vim25/types"
  24. v1 "k8s.io/api/core/v1"
  25. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  26. neturl "net/url"
  27. )
// NodeMapper groups the node-mapping helpers of this package. It has no
// fields of its own: its methods read and write the package-level maps
// nameToNodeInfo and vcToZoneDatastoresMap.
type NodeMapper struct {
}
// NodeInfo records where a Kubernetes node was found in vSphere.
type NodeInfo struct {
	// Name is the Kubernetes node name.
	Name string

	// DataCenterRef references the datacenter in which the node's VM was found.
	DataCenterRef types.ManagedObjectReference

	// VirtualMachineRef references the VM backing the node.
	VirtualMachineRef types.ManagedObjectReference

	// HostSystemRef references the host on which the node's VM is placed.
	HostSystemRef types.ManagedObjectReference

	// VSphere is the vSphere connection through which the VM was found.
	VSphere *VSphere

	// Zones holds the names of the "k8s-zone" tags that apply to the node's host.
	Zones []string
}
// Managed object type names. retrieveZoneInformationForNode compares them with
// the Self.Type of each ancestor of a host to decide which ancestors are
// inspected for zone tags.
const (
	DatacenterType             = "Datacenter"
	ClusterComputeResourceType = "ClusterComputeResource"
	HostSystemType             = "HostSystem"
)
// Package-level state shared by every NodeMapper. Neither map is guarded by a
// lock.
var (
	// nameToNodeInfo maps a Kubernetes node name to its NodeInfo.
	nameToNodeInfo = make(map[string]*NodeInfo)

	// vcToZoneDatastoresMap maps a vCenter hostname to a zone name to the
	// datastore reference values shared by the hosts of that zone.
	vcToZoneDatastoresMap = make(map[string](map[string][]string))
)
  47. // GenerateNodeMap populates node name to node info map
  48. func (nm *NodeMapper) GenerateNodeMap(vSphereInstances map[string]*VSphere, nodeList v1.NodeList) error {
  49. type VmSearch struct {
  50. vs *VSphere
  51. datacenter *object.Datacenter
  52. }
  53. var wg sync.WaitGroup
  54. var queueChannel []*VmSearch
  55. var datacenters []*object.Datacenter
  56. var err error
  57. for _, vs := range vSphereInstances {
  58. // Create context
  59. ctx, cancel := context.WithCancel(context.Background())
  60. defer cancel()
  61. if vs.Config.Datacenters == "" {
  62. datacenters, err = vs.GetAllDatacenter(ctx)
  63. if err != nil {
  64. e2elog.Logf("NodeMapper error: %v", err)
  65. continue
  66. }
  67. } else {
  68. dcName := strings.Split(vs.Config.Datacenters, ",")
  69. for _, dc := range dcName {
  70. dc = strings.TrimSpace(dc)
  71. if dc == "" {
  72. continue
  73. }
  74. datacenter, err := vs.GetDatacenter(ctx, dc)
  75. if err != nil {
  76. e2elog.Logf("NodeMapper error dc: %s \n err: %v", dc, err)
  77. continue
  78. }
  79. datacenters = append(datacenters, datacenter)
  80. }
  81. }
  82. for _, dc := range datacenters {
  83. e2elog.Logf("Search candidates vc=%s and datacenter=%s", vs.Config.Hostname, dc.Name())
  84. queueChannel = append(queueChannel, &VmSearch{vs: vs, datacenter: dc})
  85. }
  86. }
  87. for _, node := range nodeList.Items {
  88. n := node
  89. go func() {
  90. nodeUUID := getUUIDFromProviderID(n.Spec.ProviderID)
  91. e2elog.Logf("Searching for node with UUID: %s", nodeUUID)
  92. for _, res := range queueChannel {
  93. ctx, cancel := context.WithCancel(context.Background())
  94. defer cancel()
  95. vm, err := res.vs.GetVMByUUID(ctx, nodeUUID, res.datacenter)
  96. if err != nil {
  97. e2elog.Logf("Error %v while looking for node=%s in vc=%s and datacenter=%s",
  98. err, n.Name, res.vs.Config.Hostname, res.datacenter.Name())
  99. continue
  100. }
  101. if vm != nil {
  102. hostSystemRef := res.vs.GetHostFromVMReference(ctx, vm.Reference())
  103. zones := retrieveZoneInformationForNode(n.Name, res.vs, hostSystemRef)
  104. e2elog.Logf("Found node %s as vm=%+v placed on host=%+v under zones %s in vc=%s and datacenter=%s",
  105. n.Name, vm, hostSystemRef, zones, res.vs.Config.Hostname, res.datacenter.Name())
  106. nodeInfo := &NodeInfo{Name: n.Name, DataCenterRef: res.datacenter.Reference(), VirtualMachineRef: vm.Reference(), HostSystemRef: hostSystemRef, VSphere: res.vs, Zones: zones}
  107. nm.SetNodeInfo(n.Name, nodeInfo)
  108. break
  109. }
  110. }
  111. wg.Done()
  112. }()
  113. wg.Add(1)
  114. }
  115. wg.Wait()
  116. if len(nameToNodeInfo) != len(nodeList.Items) {
  117. return errors.New("all nodes not mapped to respective vSphere")
  118. }
  119. return nil
  120. }
  121. // Establish rest connection to retrieve tag manager stub
  122. func withTagsClient(ctx context.Context, connection *VSphere, f func(c *rest.Client) error) error {
  123. c := rest.NewClient(connection.Client.Client)
  124. user := neturl.UserPassword(connection.Config.Username, connection.Config.Password)
  125. if err := c.Login(ctx, user); err != nil {
  126. return err
  127. }
  128. defer c.Logout(ctx)
  129. return f(c)
  130. }
  131. // Iterates over each node and retrieves the zones in which they are placed
  132. func retrieveZoneInformationForNode(nodeName string, connection *VSphere, hostSystemRef types.ManagedObjectReference) []string {
  133. ctx, cancel := context.WithCancel(context.Background())
  134. defer cancel()
  135. var zones []string
  136. pc := connection.Client.ServiceContent.PropertyCollector
  137. withTagsClient(ctx, connection, func(c *rest.Client) error {
  138. client := tags.NewManager(c)
  139. // Example result: ["Host", "Cluster", "Datacenter"]
  140. ancestors, err := mo.Ancestors(ctx, connection.Client, pc, hostSystemRef)
  141. if err != nil {
  142. return err
  143. }
  144. var validAncestors []mo.ManagedEntity
  145. // Filter out only Datacenter, ClusterComputeResource and HostSystem type objects. These objects will be
  146. // in the following order ["Datacenter" < "ClusterComputeResource" < "HostSystem"] so that the highest
  147. // zone precedence will be received by the HostSystem type.
  148. for _, ancestor := range ancestors {
  149. moType := ancestor.ExtensibleManagedObject.Self.Type
  150. if moType == DatacenterType || moType == ClusterComputeResourceType || moType == HostSystemType {
  151. validAncestors = append(validAncestors, ancestor)
  152. }
  153. }
  154. for _, ancestor := range validAncestors {
  155. var zonesAttachedToObject []string
  156. tags, err := client.ListAttachedTags(ctx, ancestor)
  157. if err != nil {
  158. return err
  159. }
  160. for _, value := range tags {
  161. tag, err := client.GetTag(ctx, value)
  162. if err != nil {
  163. return err
  164. }
  165. category, err := client.GetCategory(ctx, tag.CategoryID)
  166. if err != nil {
  167. return err
  168. }
  169. switch {
  170. case category.Name == "k8s-zone":
  171. e2elog.Logf("Found %s associated with %s for %s", tag.Name, ancestor.Name, nodeName)
  172. zonesAttachedToObject = append(zonesAttachedToObject, tag.Name)
  173. case category.Name == "k8s-region":
  174. e2elog.Logf("Found %s associated with %s for %s", tag.Name, ancestor.Name, nodeName)
  175. }
  176. }
  177. // Overwrite zone information if it exists for this object
  178. if len(zonesAttachedToObject) != 0 {
  179. zones = zonesAttachedToObject
  180. }
  181. }
  182. return nil
  183. })
  184. return zones
  185. }
  186. // Generate zone to datastore mapping for easily verifying volume placement
  187. func (nm *NodeMapper) GenerateZoneToDatastoreMap() error {
  188. // 1. Create zone to hosts map for each VC
  189. var vcToZoneHostsMap = make(map[string](map[string][]string))
  190. // 2. Create host to datastores map for each VC
  191. var vcToHostDatastoresMap = make(map[string](map[string][]string))
  192. ctx, cancel := context.WithCancel(context.Background())
  193. defer cancel()
  194. // 3. Populate vcToZoneHostsMap and vcToHostDatastoresMap
  195. for _, nodeInfo := range nameToNodeInfo {
  196. vc := nodeInfo.VSphere.Config.Hostname
  197. host := nodeInfo.HostSystemRef.Value
  198. for _, zone := range nodeInfo.Zones {
  199. if vcToZoneHostsMap[vc] == nil {
  200. vcToZoneHostsMap[vc] = make(map[string][]string)
  201. }
  202. // Populating vcToZoneHostsMap using the HostSystemRef and Zone fields from each NodeInfo
  203. hosts := vcToZoneHostsMap[vc][zone]
  204. hosts = append(hosts, host)
  205. vcToZoneHostsMap[vc][zone] = hosts
  206. }
  207. if vcToHostDatastoresMap[vc] == nil {
  208. vcToHostDatastoresMap[vc] = make(map[string][]string)
  209. }
  210. datastores := vcToHostDatastoresMap[vc][host]
  211. // Populating vcToHostDatastoresMap by finding out the datastores mounted on node's host
  212. datastoreRefs := nodeInfo.VSphere.GetDatastoresMountedOnHost(ctx, nodeInfo.HostSystemRef)
  213. for _, datastore := range datastoreRefs {
  214. datastores = append(datastores, datastore.Value)
  215. }
  216. vcToHostDatastoresMap[vc][host] = datastores
  217. }
  218. // 4, Populate vcToZoneDatastoresMap from vcToZoneHostsMap and vcToHostDatastoresMap
  219. for vc, zoneToHostsMap := range vcToZoneHostsMap {
  220. for zone, hosts := range zoneToHostsMap {
  221. commonDatastores := retrieveCommonDatastoresAmongHosts(hosts, vcToHostDatastoresMap[vc])
  222. if vcToZoneDatastoresMap[vc] == nil {
  223. vcToZoneDatastoresMap[vc] = make(map[string][]string)
  224. }
  225. vcToZoneDatastoresMap[vc][zone] = commonDatastores
  226. }
  227. }
  228. e2elog.Logf("Zone to datastores map : %+v", vcToZoneDatastoresMap)
  229. return nil
  230. }
  231. // Retrieves the common datastores from the specified hosts
  232. func retrieveCommonDatastoresAmongHosts(hosts []string, hostToDatastoresMap map[string][]string) []string {
  233. var datastoreCountMap = make(map[string]int)
  234. for _, host := range hosts {
  235. for _, datastore := range hostToDatastoresMap[host] {
  236. datastoreCountMap[datastore] = datastoreCountMap[datastore] + 1
  237. }
  238. }
  239. var commonDatastores []string
  240. numHosts := len(hosts)
  241. for datastore, count := range datastoreCountMap {
  242. if count == numHosts {
  243. commonDatastores = append(commonDatastores, datastore)
  244. }
  245. }
  246. return commonDatastores
  247. }
  248. // Get all the datastores in the specified zone
  249. func (nm *NodeMapper) GetDatastoresInZone(vc string, zone string) []string {
  250. return vcToZoneDatastoresMap[vc][zone]
  251. }
  252. // GetNodeInfo return NodeInfo for given nodeName
  253. func (nm *NodeMapper) GetNodeInfo(nodeName string) *NodeInfo {
  254. return nameToNodeInfo[nodeName]
  255. }
// SetNodeInfo stores nodeInfo under nodeName in the package-level
// nameToNodeInfo map, replacing any entry already registered for that name.
// The map is written without any locking, so this function is not thread
// safe; callers need to serialize concurrent calls themselves.
func (nm *NodeMapper) SetNodeInfo(nodeName string, nodeInfo *NodeInfo) {
	nameToNodeInfo[nodeName] = nodeInfo
}