
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vsphere_volume

import (
	"fmt"
	"os"
	"path/filepath"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/util/mount"
	"k8s.io/kubernetes/pkg/volume"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
	"k8s.io/legacy-cloud-providers/vsphere"
	"k8s.io/utils/keymutex"
)
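
// vsphereVMDKAttacher attaches vSphere VMDK volumes to nodes and mounts the
// resulting devices at their global mount paths, delegating disk operations
// to the vSphere cloud provider.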
type vsphereVMDKAttacher struct {
	host           volume.VolumeHost
	vsphereVolumes vsphere.Volumes
}

var _ volume.Attacher = &vsphereVMDKAttacher{}
var _ volume.DeviceMounter = &vsphereVMDKAttacher{}
var _ volume.AttachableVolumePlugin = &vsphereVolumePlugin{}
var _ volume.DeviceMountableVolumePlugin = &vsphereVolumePlugin{}

// Singleton key mutex for keeping attach/detach operations for the same node atomic
var attachdetachMutex = keymutex.NewHashed(0)

func (plugin *vsphereVolumePlugin) NewAttacher() (volume.Attacher, error) {
	vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
	if err != nil {
		return nil, err
	}

	return &vsphereVMDKAttacher{
		host:           plugin.host,
		vsphereVolumes: vsphereCloud,
	}, nil
}

func (plugin *vsphereVolumePlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
	return plugin.NewAttacher()
}

// Attach attaches the volume specified by the given spec to the given node.
// On success, it returns the device path where the device is attached on the
// node.
// Callers are responsible for retrying on failure.
// Callers are responsible for thread safety between concurrent attach and
// detach operations.
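//
// A minimal usage sketch (assumptions: spec wraps a vSphere volume source and
// nodeName names a registered node; error handling elided):
//
//	attacher, _ := plugin.NewAttacher()
//	devicePath, _ := attacher.Attach(spec, nodeName)
//	// devicePath points into /dev/disk/by-id, derived from the disk UUID
//	// returned by the vSphere cloud provider.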
func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
	volumeSource, _, err := getVolumeSource(spec)
	if err != nil {
		return "", err
	}

	klog.V(4).Infof("vSphere: Attach disk called for node %s", nodeName)

	// Keeps concurrent attach operations to same host atomic
	attachdetachMutex.LockKey(string(nodeName))
	defer attachdetachMutex.UnlockKey(string(nodeName))

	// vsphereCloud.AttachDisk checks if disk is already attached to host and
	// succeeds in that case, so no need to do that separately.
	diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, volumeSource.StoragePolicyName, nodeName)
	if err != nil {
		klog.Errorf("Error attaching volume %q to node %q: %+v", volumeSource.VolumePath, nodeName, err)
		return "", err
	}

	return filepath.Join(diskByIDPath, diskSCSIPrefix+diskUUID), nil
}

func (attacher *vsphereVMDKAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
	klog.Warningf("Attacher.VolumesAreAttached called for node %q - Please use BulkVerifyVolumes for vSphere", nodeName)
	volumeNodeMap := map[types.NodeName][]*volume.Spec{
		nodeName: specs,
	}
	nodeVolumesResult := make(map[*volume.Spec]bool)
	nodesVerificationMap, err := attacher.BulkVerifyVolumes(volumeNodeMap)
	if err != nil {
		klog.Errorf("Attacher.VolumesAreAttached - error checking volumes for node %q with %v", nodeName, err)
		return nodeVolumesResult, err
	}

	if result, ok := nodesVerificationMap[nodeName]; ok {
		return result, nil
	}

	return nodeVolumesResult, nil
}
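
// BulkVerifyVolumes checks, in a single vSphere call, whether the given
// volumes are attached to their respective nodes. Volumes are assumed
// attached unless the cloud provider explicitly reports them detached.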
func (attacher *vsphereVMDKAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
	volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
	volumePathsByNode := make(map[types.NodeName][]string)
	volumeSpecMap := make(map[string]*volume.Spec)

	for nodeName, volumeSpecs := range volumesByNode {
		for _, volumeSpec := range volumeSpecs {
			volumeSource, _, err := getVolumeSource(volumeSpec)
			if err != nil {
				klog.Errorf("Error getting volume (%q) source : %v", volumeSpec.Name(), err)
				continue
			}
			volPath := volumeSource.VolumePath
			volumePathsByNode[nodeName] = append(volumePathsByNode[nodeName], volPath)
			nodeVolume, nodeVolumeExists := volumesAttachedCheck[nodeName]
			if !nodeVolumeExists {
				nodeVolume = make(map[*volume.Spec]bool)
			}
			// Assume attached until the cloud provider reports otherwise.
			nodeVolume[volumeSpec] = true
			volumeSpecMap[volPath] = volumeSpec
			volumesAttachedCheck[nodeName] = nodeVolume
		}
	}

	attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathsByNode)
	if err != nil {
		klog.Errorf("Error checking if volumes are attached to nodes: %+v. err: %v", volumePathsByNode, err)
		return volumesAttachedCheck, err
	}

	for nodeName, nodeVolumes := range attachedResult {
		for volumePath, attached := range nodeVolumes {
			if !attached {
				spec := volumeSpecMap[volumePath]
				setNodeVolume(volumesAttachedCheck, spec, nodeName, false)
			}
		}
	}
	return volumesAttachedCheck, nil
}
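
// WaitForAttach polls the node until a device for the given VMDK shows up at
// the expected device path, or the timeout expires. The poll interval is
// checkSleepDuration, defined elsewhere in this package.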
func (attacher *vsphereVMDKAttacher) WaitForAttach(spec *volume.Spec, devicePath string, _ *v1.Pod, timeout time.Duration) (string, error) {
	volumeSource, _, err := getVolumeSource(spec)
	if err != nil {
		return "", err
	}

	if devicePath == "" {
		return "", fmt.Errorf("WaitForAttach failed for VMDK %q: devicePath is empty", volumeSource.VolumePath)
	}

	ticker := time.NewTicker(checkSleepDuration)
	defer ticker.Stop()
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		select {
		case <-ticker.C:
			klog.V(5).Infof("Checking VMDK %q is attached", volumeSource.VolumePath)
			path, err := verifyDevicePath(devicePath)
			if err != nil {
				// Log error, if any, and continue checking periodically. See issue #11321
				klog.Warningf("Error verifying VMDK (%q) is attached: %v", volumeSource.VolumePath, err)
			} else if path != "" {
				// A device path has successfully been created for the VMDK
				klog.Infof("Successfully found attached VMDK %q.", volumeSource.VolumePath)
				return path, nil
			}
		case <-timer.C:
			return "", fmt.Errorf("could not find attached VMDK %q: timeout waiting for mount paths to be created", volumeSource.VolumePath)
		}
	}
}

// GetDeviceMountPath returns the global mount path at which the device should
// be mounted, and to which individual volume bind mounts point.
func (attacher *vsphereVMDKAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
	volumeSource, _, err := getVolumeSource(spec)
	if err != nil {
		return "", err
	}

	return makeGlobalPDPath(attacher.host, volumeSource.VolumePath), nil
}

// GetDeviceMountRefs finds all other references to the device referenced
// by deviceMountPath; it returns a list of paths.
func (plugin *vsphereVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
	mounter := plugin.host.GetMounter(plugin.GetPluginName())
	return mounter.GetMountRefs(deviceMountPath)
}

// MountDevice mounts the device at the global mount point, formatting it
// first if it carries no filesystem.
func (attacher *vsphereVMDKAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
	mounter := attacher.host.GetMounter(vsphereVolumePluginName)
	notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
	if err != nil {
		if os.IsNotExist(err) {
			if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
				klog.Errorf("Failed to create directory at %#v. err: %s", deviceMountPath, err)
				return err
			}
			notMnt = true
		} else {
			return err
		}
	}

	volumeSource, _, err := getVolumeSource(spec)
	if err != nil {
		return err
	}

	options := []string{}

	if notMnt {
		diskMounter := volumeutil.NewSafeFormatAndMountFromHost(vsphereVolumePluginName, attacher.host)
		mountOptions := volumeutil.MountOptionFromSpec(spec, options...)
		err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, mountOptions)
		if err != nil {
			os.Remove(deviceMountPath)
			return err
		}
		klog.V(4).Infof("formatted spec %v devicePath %v deviceMountPath %v fs %v with options %+v", spec.Name(), devicePath, deviceMountPath, volumeSource.FSType, options)
	}
	return nil
}
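
// vsphereVMDKDetacher detaches VMDK volumes from nodes and unmounts their
// global mount points.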
type vsphereVMDKDetacher struct {
	mounter        mount.Interface
	vsphereVolumes vsphere.Volumes
}

var _ volume.Detacher = &vsphereVMDKDetacher{}
var _ volume.DeviceUnmounter = &vsphereVMDKDetacher{}

func (plugin *vsphereVolumePlugin) NewDetacher() (volume.Detacher, error) {
	vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
	if err != nil {
		return nil, err
	}

	return &vsphereVMDKDetacher{
		mounter:        plugin.host.GetMounter(plugin.GetPluginName()),
		vsphereVolumes: vsphereCloud,
	}, nil
}

func (plugin *vsphereVolumePlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
	return plugin.NewDetacher()
}

// Detach detaches the given device from the given node.
func (detacher *vsphereVMDKDetacher) Detach(volumeName string, nodeName types.NodeName) error {
	volPath := getVolPathfromVolumeName(volumeName)
	attached, err := detacher.vsphereVolumes.DiskIsAttached(volPath, nodeName)
	if err != nil {
		// Log error and continue with detach
		klog.Errorf(
			"Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
			volPath, nodeName, err)
	}

	if err == nil && !attached {
		// Volume is already detached from node.
		klog.Infof("detach operation was successful. volume %q is already detached from node %q.", volPath, nodeName)
		return nil
	}

	attachdetachMutex.LockKey(string(nodeName))
	defer attachdetachMutex.UnlockKey(string(nodeName))
	if err := detacher.vsphereVolumes.DetachDisk(volPath, nodeName); err != nil {
		klog.Errorf("Error detaching volume %q: %v", volPath, err)
		return err
	}
	return nil
}
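
// UnmountDevice unmounts the device from the global mount point and cleans
// up the mount point directory.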
func (detacher *vsphereVMDKDetacher) UnmountDevice(deviceMountPath string) error {
	return mount.CleanupMountPoint(deviceMountPath, detacher.mounter, false)
}
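
// CanAttach always returns true; vSphere VMDK volumes are attachable.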
func (plugin *vsphereVolumePlugin) CanAttach(spec *volume.Spec) (bool, error) {
	return true, nil
}
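
// CanDeviceMount always returns true; vSphere VMDK volumes are device-mountable.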
func (plugin *vsphereVolumePlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
	return true, nil
}
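
// setNodeVolume records the attach-check result for the given volume spec on
// the given node, allocating the per-node map on first use.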
func setNodeVolume(
	nodeVolumeMap map[types.NodeName]map[*volume.Spec]bool,
	volumeSpec *volume.Spec,
	nodeName types.NodeName,
	check bool) {

	volumeMap := nodeVolumeMap[nodeName]
	if volumeMap == nil {
		volumeMap = make(map[*volume.Spec]bool)
		nodeVolumeMap[nodeName] = volumeMap
	}
	volumeMap[volumeSpec] = check
}