attacher.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. // +build !providerless
  2. /*
  3. Copyright 2016 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package vsphere_volume
  15. import (
  16. "fmt"
  17. "os"
  18. "path/filepath"
  19. "runtime"
  20. "time"
  21. "k8s.io/klog"
  22. "k8s.io/utils/keymutex"
  23. "k8s.io/utils/mount"
  24. v1 "k8s.io/api/core/v1"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/kubernetes/pkg/volume"
  27. volumeutil "k8s.io/kubernetes/pkg/volume/util"
  28. "k8s.io/legacy-cloud-providers/vsphere"
  29. )
// vsphereVMDKAttacher attaches and verifies vSphere VMDK volumes on nodes by
// delegating to the vSphere cloud provider.
type vsphereVMDKAttacher struct {
	// host gives access to host-level utilities (mounter, global mount paths).
	host volume.VolumeHost
	// vsphereVolumes is the cloud-provider API used for attach/detach and
	// attachment verification.
	vsphereVolumes vsphere.Volumes
}

// Compile-time interface conformance checks.
var _ volume.Attacher = &vsphereVMDKAttacher{}
var _ volume.DeviceMounter = &vsphereVMDKAttacher{}
var _ volume.AttachableVolumePlugin = &vsphereVolumePlugin{}
var _ volume.DeviceMountableVolumePlugin = &vsphereVolumePlugin{}

// Singleton key mutex for keeping attach operations for the same host atomic
var attachdetachMutex = keymutex.NewHashed(0)
  40. func (plugin *vsphereVolumePlugin) NewAttacher() (volume.Attacher, error) {
  41. vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
  42. if err != nil {
  43. return nil, err
  44. }
  45. return &vsphereVMDKAttacher{
  46. host: plugin.host,
  47. vsphereVolumes: vsphereCloud,
  48. }, nil
  49. }
  50. func (plugin *vsphereVolumePlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
  51. return plugin.NewAttacher()
  52. }
  53. // Attaches the volume specified by the given spec to the given host.
  54. // On success, returns the device path where the device was attached on the
  55. // node.
  56. // Callers are responsible for retryinging on failure.
  57. // Callers are responsible for thread safety between concurrent attach and
  58. // detach operations.
  59. func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
  60. volumeSource, _, err := getVolumeSource(spec)
  61. if err != nil {
  62. return "", err
  63. }
  64. klog.V(4).Infof("vSphere: Attach disk called for node %s", nodeName)
  65. // Keeps concurrent attach operations to same host atomic
  66. attachdetachMutex.LockKey(string(nodeName))
  67. defer attachdetachMutex.UnlockKey(string(nodeName))
  68. // vsphereCloud.AttachDisk checks if disk is already attached to host and
  69. // succeeds in that case, so no need to do that separately.
  70. diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, volumeSource.StoragePolicyName, nodeName)
  71. if err != nil {
  72. klog.Errorf("Error attaching volume %q to node %q: %+v", volumeSource.VolumePath, nodeName, err)
  73. return "", err
  74. }
  75. return filepath.Join(diskByIDPath, diskSCSIPrefix+diskUUID), nil
  76. }
  77. func (attacher *vsphereVMDKAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
  78. klog.Warningf("Attacher.VolumesAreAttached called for node %q - Please use BulkVerifyVolumes for vSphere", nodeName)
  79. volumeNodeMap := map[types.NodeName][]*volume.Spec{
  80. nodeName: specs,
  81. }
  82. nodeVolumesResult := make(map[*volume.Spec]bool)
  83. nodesVerificationMap, err := attacher.BulkVerifyVolumes(volumeNodeMap)
  84. if err != nil {
  85. klog.Errorf("Attacher.VolumesAreAttached - error checking volumes for node %q with %v", nodeName, err)
  86. return nodeVolumesResult, err
  87. }
  88. if result, ok := nodesVerificationMap[nodeName]; ok {
  89. return result, nil
  90. }
  91. return nodeVolumesResult, nil
  92. }
  93. func (attacher *vsphereVMDKAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
  94. volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
  95. volumePathsByNode := make(map[types.NodeName][]string)
  96. volumeSpecMap := make(map[string]*volume.Spec)
  97. for nodeName, volumeSpecs := range volumesByNode {
  98. for _, volumeSpec := range volumeSpecs {
  99. volumeSource, _, err := getVolumeSource(volumeSpec)
  100. if err != nil {
  101. klog.Errorf("Error getting volume (%q) source : %v", volumeSpec.Name(), err)
  102. continue
  103. }
  104. volPath := volumeSource.VolumePath
  105. volumePathsByNode[nodeName] = append(volumePathsByNode[nodeName], volPath)
  106. nodeVolume, nodeVolumeExists := volumesAttachedCheck[nodeName]
  107. if !nodeVolumeExists {
  108. nodeVolume = make(map[*volume.Spec]bool)
  109. }
  110. nodeVolume[volumeSpec] = true
  111. volumeSpecMap[volPath] = volumeSpec
  112. volumesAttachedCheck[nodeName] = nodeVolume
  113. }
  114. }
  115. attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathsByNode)
  116. if err != nil {
  117. klog.Errorf("Error checking if volumes are attached to nodes: %+v. err: %v", volumePathsByNode, err)
  118. return volumesAttachedCheck, err
  119. }
  120. for nodeName, nodeVolumes := range attachedResult {
  121. for volumePath, attached := range nodeVolumes {
  122. if !attached {
  123. spec := volumeSpecMap[volumePath]
  124. setNodeVolume(volumesAttachedCheck, spec, nodeName, false)
  125. }
  126. }
  127. }
  128. return volumesAttachedCheck, nil
  129. }
  130. func (attacher *vsphereVMDKAttacher) WaitForAttach(spec *volume.Spec, devicePath string, _ *v1.Pod, timeout time.Duration) (string, error) {
  131. volumeSource, _, err := getVolumeSource(spec)
  132. if err != nil {
  133. return "", err
  134. }
  135. if devicePath == "" {
  136. return "", fmt.Errorf("WaitForAttach failed for VMDK %q: devicePath is empty.", volumeSource.VolumePath)
  137. }
  138. ticker := time.NewTicker(checkSleepDuration)
  139. defer ticker.Stop()
  140. timer := time.NewTimer(timeout)
  141. defer timer.Stop()
  142. for {
  143. select {
  144. case <-ticker.C:
  145. klog.V(5).Infof("Checking VMDK %q is attached", volumeSource.VolumePath)
  146. path, err := verifyDevicePath(devicePath)
  147. if err != nil {
  148. // Log error, if any, and continue checking periodically. See issue #11321
  149. klog.Warningf("Error verifying VMDK (%q) is attached: %v", volumeSource.VolumePath, err)
  150. } else if path != "" {
  151. // A device path has successfully been created for the VMDK
  152. klog.Infof("Successfully found attached VMDK %q.", volumeSource.VolumePath)
  153. return path, nil
  154. }
  155. case <-timer.C:
  156. return "", fmt.Errorf("Could not find attached VMDK %q. Timeout waiting for mount paths to be created.", volumeSource.VolumePath)
  157. }
  158. }
  159. }
  160. // GetDeviceMountPath returns a path where the device should
  161. // point which should be bind mounted for individual volumes.
  162. func (attacher *vsphereVMDKAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
  163. volumeSource, _, err := getVolumeSource(spec)
  164. if err != nil {
  165. return "", err
  166. }
  167. return makeGlobalPDPath(attacher.host, volumeSource.VolumePath), nil
  168. }
  169. // GetMountDeviceRefs finds all other references to the device referenced
  170. // by deviceMountPath; returns a list of paths.
  171. func (plugin *vsphereVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
  172. mounter := plugin.host.GetMounter(plugin.GetPluginName())
  173. return mounter.GetMountRefs(deviceMountPath)
  174. }
  175. // MountDevice mounts device to global mount point.
  176. func (attacher *vsphereVMDKAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
  177. klog.Info("vsphere MountDevice", devicePath, deviceMountPath)
  178. mounter := attacher.host.GetMounter(vsphereVolumePluginName)
  179. notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
  180. if err != nil {
  181. if os.IsNotExist(err) {
  182. dir := deviceMountPath
  183. if runtime.GOOS == "windows" {
  184. dir = filepath.Dir(deviceMountPath)
  185. }
  186. if err := os.MkdirAll(dir, 0750); err != nil {
  187. klog.Errorf("Failed to create directory at %#v. err: %s", dir, err)
  188. return err
  189. }
  190. notMnt = true
  191. } else {
  192. return err
  193. }
  194. }
  195. volumeSource, _, err := getVolumeSource(spec)
  196. if err != nil {
  197. return err
  198. }
  199. options := []string{}
  200. if notMnt {
  201. diskMounter := volumeutil.NewSafeFormatAndMountFromHost(vsphereVolumePluginName, attacher.host)
  202. mountOptions := volumeutil.MountOptionFromSpec(spec, options...)
  203. err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, mountOptions)
  204. if err != nil {
  205. os.Remove(deviceMountPath)
  206. return err
  207. }
  208. klog.V(4).Infof("formatting spec %v devicePath %v deviceMountPath %v fs %v with options %+v", spec.Name(), devicePath, deviceMountPath, volumeSource.FSType, options)
  209. }
  210. return nil
  211. }
// vsphereVMDKDetacher detaches vSphere VMDK volumes from nodes and unmounts
// their global mount points.
type vsphereVMDKDetacher struct {
	// mounter is used to clean up the device's global mount point.
	mounter mount.Interface
	// vsphereVolumes is the cloud-provider API used for detach and
	// attachment checks.
	vsphereVolumes vsphere.Volumes
}

// Compile-time interface conformance checks.
var _ volume.Detacher = &vsphereVMDKDetacher{}
var _ volume.DeviceUnmounter = &vsphereVMDKDetacher{}
  218. func (plugin *vsphereVolumePlugin) NewDetacher() (volume.Detacher, error) {
  219. vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
  220. if err != nil {
  221. return nil, err
  222. }
  223. return &vsphereVMDKDetacher{
  224. mounter: plugin.host.GetMounter(plugin.GetPluginName()),
  225. vsphereVolumes: vsphereCloud,
  226. }, nil
  227. }
  228. func (plugin *vsphereVolumePlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
  229. return plugin.NewDetacher()
  230. }
  231. // Detach the given device from the given node.
  232. func (detacher *vsphereVMDKDetacher) Detach(volumeName string, nodeName types.NodeName) error {
  233. volPath := getVolPathfromVolumeName(volumeName)
  234. attached, err := detacher.vsphereVolumes.DiskIsAttached(volPath, nodeName)
  235. if err != nil {
  236. // Log error and continue with detach
  237. klog.Errorf(
  238. "Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
  239. volPath, nodeName, err)
  240. }
  241. if err == nil && !attached {
  242. // Volume is already detached from node.
  243. klog.Infof("detach operation was successful. volume %q is already detached from node %q.", volPath, nodeName)
  244. return nil
  245. }
  246. attachdetachMutex.LockKey(string(nodeName))
  247. defer attachdetachMutex.UnlockKey(string(nodeName))
  248. if err := detacher.vsphereVolumes.DetachDisk(volPath, nodeName); err != nil {
  249. klog.Errorf("Error detaching volume %q: %v", volPath, err)
  250. return err
  251. }
  252. return nil
  253. }
  254. func (detacher *vsphereVMDKDetacher) UnmountDevice(deviceMountPath string) error {
  255. return mount.CleanupMountPoint(deviceMountPath, detacher.mounter, false)
  256. }
  257. func (plugin *vsphereVolumePlugin) CanAttach(spec *volume.Spec) (bool, error) {
  258. return true, nil
  259. }
  260. func (plugin *vsphereVolumePlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
  261. return true, nil
  262. }
  263. func setNodeVolume(
  264. nodeVolumeMap map[types.NodeName]map[*volume.Spec]bool,
  265. volumeSpec *volume.Spec,
  266. nodeName types.NodeName,
  267. check bool) {
  268. volumeMap := nodeVolumeMap[nodeName]
  269. if volumeMap == nil {
  270. volumeMap = make(map[*volume.Spec]bool)
  271. nodeVolumeMap[nodeName] = volumeMap
  272. }
  273. volumeMap[volumeSpec] = check
  274. }