nodemapper.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package vsphere
  14. import (
  15. "context"
  16. "errors"
  17. "strings"
  18. "sync"
  19. "github.com/vmware/govmomi/object"
  20. "github.com/vmware/govmomi/vapi/rest"
  21. "github.com/vmware/govmomi/vapi/tags"
  22. "github.com/vmware/govmomi/vim25/mo"
  23. "github.com/vmware/govmomi/vim25/types"
  24. v1 "k8s.io/api/core/v1"
  25. "k8s.io/kubernetes/test/e2e/framework"
  26. neturl "net/url"
  27. )
  28. // NodeMapper contains information to generate nameToNodeInfo and vcToZoneDatastore maps
  29. type NodeMapper struct {
  30. }
  31. // NodeInfo contains information about vcenter nodes
  32. type NodeInfo struct {
  33. Name string
  34. DataCenterRef types.ManagedObjectReference
  35. VirtualMachineRef types.ManagedObjectReference
  36. HostSystemRef types.ManagedObjectReference
  37. VSphere *VSphere
  38. Zones []string
  39. }
  40. const (
  41. datacenterType = "Datacenter"
  42. clusterComputeResourceType = "ClusterComputeResource"
  43. hostSystemType = "HostSystem"
  44. )
  45. var (
  46. nameToNodeInfo = make(map[string]*NodeInfo)
  47. vcToZoneDatastoresMap = make(map[string](map[string][]string))
  48. )
  49. // GenerateNodeMap populates node name to node info map
  50. func (nm *NodeMapper) GenerateNodeMap(vSphereInstances map[string]*VSphere, nodeList v1.NodeList) error {
  51. type VMSearch struct {
  52. vs *VSphere
  53. datacenter *object.Datacenter
  54. }
  55. var wg sync.WaitGroup
  56. var queueChannel []*VMSearch
  57. var datacenters []*object.Datacenter
  58. var err error
  59. for _, vs := range vSphereInstances {
  60. // Create context
  61. ctx, cancel := context.WithCancel(context.Background())
  62. defer cancel()
  63. if vs.Config.Datacenters == "" {
  64. datacenters, err = vs.GetAllDatacenter(ctx)
  65. if err != nil {
  66. framework.Logf("NodeMapper error: %v", err)
  67. continue
  68. }
  69. } else {
  70. dcName := strings.Split(vs.Config.Datacenters, ",")
  71. for _, dc := range dcName {
  72. dc = strings.TrimSpace(dc)
  73. if dc == "" {
  74. continue
  75. }
  76. datacenter, err := vs.GetDatacenter(ctx, dc)
  77. if err != nil {
  78. framework.Logf("NodeMapper error dc: %s \n err: %v", dc, err)
  79. continue
  80. }
  81. datacenters = append(datacenters, datacenter)
  82. }
  83. }
  84. for _, dc := range datacenters {
  85. framework.Logf("Search candidates vc=%s and datacenter=%s", vs.Config.Hostname, dc.Name())
  86. queueChannel = append(queueChannel, &VMSearch{vs: vs, datacenter: dc})
  87. }
  88. }
  89. for _, node := range nodeList.Items {
  90. n := node
  91. go func() {
  92. nodeUUID := getUUIDFromProviderID(n.Spec.ProviderID)
  93. framework.Logf("Searching for node with UUID: %s", nodeUUID)
  94. for _, res := range queueChannel {
  95. ctx, cancel := context.WithCancel(context.Background())
  96. defer cancel()
  97. vm, err := res.vs.GetVMByUUID(ctx, nodeUUID, res.datacenter)
  98. if err != nil {
  99. framework.Logf("Error %v while looking for node=%s in vc=%s and datacenter=%s",
  100. err, n.Name, res.vs.Config.Hostname, res.datacenter.Name())
  101. continue
  102. }
  103. if vm != nil {
  104. hostSystemRef := res.vs.GetHostFromVMReference(ctx, vm.Reference())
  105. zones := retrieveZoneInformationForNode(n.Name, res.vs, hostSystemRef)
  106. framework.Logf("Found node %s as vm=%+v placed on host=%+v under zones %s in vc=%s and datacenter=%s",
  107. n.Name, vm, hostSystemRef, zones, res.vs.Config.Hostname, res.datacenter.Name())
  108. nodeInfo := &NodeInfo{Name: n.Name, DataCenterRef: res.datacenter.Reference(), VirtualMachineRef: vm.Reference(), HostSystemRef: hostSystemRef, VSphere: res.vs, Zones: zones}
  109. nm.SetNodeInfo(n.Name, nodeInfo)
  110. break
  111. }
  112. }
  113. wg.Done()
  114. }()
  115. wg.Add(1)
  116. }
  117. wg.Wait()
  118. if len(nameToNodeInfo) != len(nodeList.Items) {
  119. return errors.New("all nodes not mapped to respective vSphere")
  120. }
  121. return nil
  122. }
  123. // Establish rest connection to retrieve tag manager stub
  124. func withTagsClient(ctx context.Context, connection *VSphere, f func(c *rest.Client) error) error {
  125. c := rest.NewClient(connection.Client.Client)
  126. user := neturl.UserPassword(connection.Config.Username, connection.Config.Password)
  127. if err := c.Login(ctx, user); err != nil {
  128. return err
  129. }
  130. defer c.Logout(ctx)
  131. return f(c)
  132. }
  133. // Iterates over each node and retrieves the zones in which they are placed
  134. func retrieveZoneInformationForNode(nodeName string, connection *VSphere, hostSystemRef types.ManagedObjectReference) []string {
  135. ctx, cancel := context.WithCancel(context.Background())
  136. defer cancel()
  137. var zones []string
  138. pc := connection.Client.ServiceContent.PropertyCollector
  139. withTagsClient(ctx, connection, func(c *rest.Client) error {
  140. client := tags.NewManager(c)
  141. // Example result: ["Host", "Cluster", "Datacenter"]
  142. ancestors, err := mo.Ancestors(ctx, connection.Client, pc, hostSystemRef)
  143. if err != nil {
  144. return err
  145. }
  146. var validAncestors []mo.ManagedEntity
  147. // Filter out only Datacenter, ClusterComputeResource and HostSystem type objects. These objects will be
  148. // in the following order ["Datacenter" < "ClusterComputeResource" < "HostSystem"] so that the highest
  149. // zone precedence will be received by the HostSystem type.
  150. for _, ancestor := range ancestors {
  151. moType := ancestor.ExtensibleManagedObject.Self.Type
  152. if moType == datacenterType || moType == clusterComputeResourceType || moType == hostSystemType {
  153. validAncestors = append(validAncestors, ancestor)
  154. }
  155. }
  156. for _, ancestor := range validAncestors {
  157. var zonesAttachedToObject []string
  158. tags, err := client.ListAttachedTags(ctx, ancestor)
  159. if err != nil {
  160. return err
  161. }
  162. for _, value := range tags {
  163. tag, err := client.GetTag(ctx, value)
  164. if err != nil {
  165. return err
  166. }
  167. category, err := client.GetCategory(ctx, tag.CategoryID)
  168. if err != nil {
  169. return err
  170. }
  171. switch {
  172. case category.Name == "k8s-zone":
  173. framework.Logf("Found %s associated with %s for %s", tag.Name, ancestor.Name, nodeName)
  174. zonesAttachedToObject = append(zonesAttachedToObject, tag.Name)
  175. case category.Name == "k8s-region":
  176. framework.Logf("Found %s associated with %s for %s", tag.Name, ancestor.Name, nodeName)
  177. }
  178. }
  179. // Overwrite zone information if it exists for this object
  180. if len(zonesAttachedToObject) != 0 {
  181. zones = zonesAttachedToObject
  182. }
  183. }
  184. return nil
  185. })
  186. return zones
  187. }
  188. // GenerateZoneToDatastoreMap generates a mapping of zone to datastore for easily verifying volume placement
  189. func (nm *NodeMapper) GenerateZoneToDatastoreMap() error {
  190. // 1. Create zone to hosts map for each VC
  191. var vcToZoneHostsMap = make(map[string](map[string][]string))
  192. // 2. Create host to datastores map for each VC
  193. var vcToHostDatastoresMap = make(map[string](map[string][]string))
  194. ctx, cancel := context.WithCancel(context.Background())
  195. defer cancel()
  196. // 3. Populate vcToZoneHostsMap and vcToHostDatastoresMap
  197. for _, nodeInfo := range nameToNodeInfo {
  198. vc := nodeInfo.VSphere.Config.Hostname
  199. host := nodeInfo.HostSystemRef.Value
  200. for _, zone := range nodeInfo.Zones {
  201. if vcToZoneHostsMap[vc] == nil {
  202. vcToZoneHostsMap[vc] = make(map[string][]string)
  203. }
  204. // Populating vcToZoneHostsMap using the HostSystemRef and Zone fields from each NodeInfo
  205. hosts := vcToZoneHostsMap[vc][zone]
  206. hosts = append(hosts, host)
  207. vcToZoneHostsMap[vc][zone] = hosts
  208. }
  209. if vcToHostDatastoresMap[vc] == nil {
  210. vcToHostDatastoresMap[vc] = make(map[string][]string)
  211. }
  212. datastores := vcToHostDatastoresMap[vc][host]
  213. // Populating vcToHostDatastoresMap by finding out the datastores mounted on node's host
  214. datastoreRefs := nodeInfo.VSphere.GetDatastoresMountedOnHost(ctx, nodeInfo.HostSystemRef)
  215. for _, datastore := range datastoreRefs {
  216. datastores = append(datastores, datastore.Value)
  217. }
  218. vcToHostDatastoresMap[vc][host] = datastores
  219. }
  220. // 4, Populate vcToZoneDatastoresMap from vcToZoneHostsMap and vcToHostDatastoresMap
  221. for vc, zoneToHostsMap := range vcToZoneHostsMap {
  222. for zone, hosts := range zoneToHostsMap {
  223. commonDatastores := retrieveCommonDatastoresAmongHosts(hosts, vcToHostDatastoresMap[vc])
  224. if vcToZoneDatastoresMap[vc] == nil {
  225. vcToZoneDatastoresMap[vc] = make(map[string][]string)
  226. }
  227. vcToZoneDatastoresMap[vc][zone] = commonDatastores
  228. }
  229. }
  230. framework.Logf("Zone to datastores map : %+v", vcToZoneDatastoresMap)
  231. return nil
  232. }
  233. // retrieveCommonDatastoresAmongHosts retrieves the common datastores from the specified hosts
  234. func retrieveCommonDatastoresAmongHosts(hosts []string, hostToDatastoresMap map[string][]string) []string {
  235. var datastoreCountMap = make(map[string]int)
  236. for _, host := range hosts {
  237. for _, datastore := range hostToDatastoresMap[host] {
  238. datastoreCountMap[datastore] = datastoreCountMap[datastore] + 1
  239. }
  240. }
  241. var commonDatastores []string
  242. numHosts := len(hosts)
  243. for datastore, count := range datastoreCountMap {
  244. if count == numHosts {
  245. commonDatastores = append(commonDatastores, datastore)
  246. }
  247. }
  248. return commonDatastores
  249. }
  250. // GetDatastoresInZone returns all the datastores in the specified zone
  251. func (nm *NodeMapper) GetDatastoresInZone(vc string, zone string) []string {
  252. return vcToZoneDatastoresMap[vc][zone]
  253. }
  254. // GetNodeInfo returns NodeInfo for given nodeName
  255. func (nm *NodeMapper) GetNodeInfo(nodeName string) *NodeInfo {
  256. return nameToNodeInfo[nodeName]
  257. }
  258. // SetNodeInfo sets NodeInfo for given nodeName. This function is not thread safe. Users need to handle concurrency.
  259. func (nm *NodeMapper) SetNodeInfo(nodeName string, nodeInfo *NodeInfo) {
  260. nameToNodeInfo[nodeName] = nodeInfo
  261. }