attacher.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package awsebs
  14. import (
  15. "fmt"
  16. "os"
  17. "path"
  18. "strconv"
  19. "time"
  20. "k8s.io/klog"
  21. "k8s.io/api/core/v1"
  22. "k8s.io/apimachinery/pkg/types"
  23. "k8s.io/kubernetes/pkg/util/mount"
  24. "k8s.io/kubernetes/pkg/volume"
  25. volumeutil "k8s.io/kubernetes/pkg/volume/util"
  26. "k8s.io/legacy-cloud-providers/aws"
  27. )
  28. type awsElasticBlockStoreAttacher struct {
  29. host volume.VolumeHost
  30. awsVolumes aws.Volumes
  31. }
  32. var _ volume.Attacher = &awsElasticBlockStoreAttacher{}
  33. var _ volume.DeviceMounter = &awsElasticBlockStoreAttacher{}
  34. var _ volume.AttachableVolumePlugin = &awsElasticBlockStorePlugin{}
  35. var _ volume.DeviceMountableVolumePlugin = &awsElasticBlockStorePlugin{}
  36. func (plugin *awsElasticBlockStorePlugin) NewAttacher() (volume.Attacher, error) {
  37. awsCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
  38. if err != nil {
  39. return nil, err
  40. }
  41. return &awsElasticBlockStoreAttacher{
  42. host: plugin.host,
  43. awsVolumes: awsCloud,
  44. }, nil
  45. }
  46. func (plugin *awsElasticBlockStorePlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
  47. return plugin.NewAttacher()
  48. }
  49. func (plugin *awsElasticBlockStorePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
  50. mounter := plugin.host.GetMounter(plugin.GetPluginName())
  51. return mounter.GetMountRefs(deviceMountPath)
  52. }
  53. func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
  54. volumeSource, _, err := getVolumeSource(spec)
  55. if err != nil {
  56. return "", err
  57. }
  58. volumeID := aws.KubernetesVolumeID(volumeSource.VolumeID)
  59. // awsCloud.AttachDisk checks if disk is already attached to node and
  60. // succeeds in that case, so no need to do that separately.
  61. devicePath, err := attacher.awsVolumes.AttachDisk(volumeID, nodeName)
  62. if err != nil {
  63. klog.Errorf("Error attaching volume %q to node %q: %+v", volumeID, nodeName, err)
  64. return "", err
  65. }
  66. return devicePath, nil
  67. }
  68. func (attacher *awsElasticBlockStoreAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
  69. klog.Warningf("Attacher.VolumesAreAttached called for node %q - Please use BulkVerifyVolumes for AWS", nodeName)
  70. volumeNodeMap := map[types.NodeName][]*volume.Spec{
  71. nodeName: specs,
  72. }
  73. nodeVolumesResult := make(map[*volume.Spec]bool)
  74. nodesVerificationMap, err := attacher.BulkVerifyVolumes(volumeNodeMap)
  75. if err != nil {
  76. klog.Errorf("Attacher.VolumesAreAttached - error checking volumes for node %q with %v", nodeName, err)
  77. return nodeVolumesResult, err
  78. }
  79. if result, ok := nodesVerificationMap[nodeName]; ok {
  80. return result, nil
  81. }
  82. return nodeVolumesResult, nil
  83. }
  84. func (attacher *awsElasticBlockStoreAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
  85. volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
  86. diskNamesByNode := make(map[types.NodeName][]aws.KubernetesVolumeID)
  87. volumeSpecMap := make(map[aws.KubernetesVolumeID]*volume.Spec)
  88. for nodeName, volumeSpecs := range volumesByNode {
  89. for _, volumeSpec := range volumeSpecs {
  90. volumeSource, _, err := getVolumeSource(volumeSpec)
  91. if err != nil {
  92. klog.Errorf("Error getting volume (%q) source : %v", volumeSpec.Name(), err)
  93. continue
  94. }
  95. name := aws.KubernetesVolumeID(volumeSource.VolumeID)
  96. diskNamesByNode[nodeName] = append(diskNamesByNode[nodeName], name)
  97. nodeDisk, nodeDiskExists := volumesAttachedCheck[nodeName]
  98. if !nodeDiskExists {
  99. nodeDisk = make(map[*volume.Spec]bool)
  100. }
  101. nodeDisk[volumeSpec] = true
  102. volumeSpecMap[name] = volumeSpec
  103. volumesAttachedCheck[nodeName] = nodeDisk
  104. }
  105. }
  106. attachedResult, err := attacher.awsVolumes.DisksAreAttached(diskNamesByNode)
  107. if err != nil {
  108. klog.Errorf("Error checking if volumes are attached to nodes err = %v", err)
  109. return volumesAttachedCheck, err
  110. }
  111. for nodeName, nodeDisks := range attachedResult {
  112. for diskName, attached := range nodeDisks {
  113. if !attached {
  114. spec := volumeSpecMap[diskName]
  115. setNodeDisk(volumesAttachedCheck, spec, nodeName, false)
  116. }
  117. }
  118. }
  119. return volumesAttachedCheck, nil
  120. }
  121. func (attacher *awsElasticBlockStoreAttacher) WaitForAttach(spec *volume.Spec, devicePath string, _ *v1.Pod, timeout time.Duration) (string, error) {
  122. volumeSource, _, err := getVolumeSource(spec)
  123. if err != nil {
  124. return "", err
  125. }
  126. volumeID := volumeSource.VolumeID
  127. partition := ""
  128. if volumeSource.Partition != 0 {
  129. partition = strconv.Itoa(int(volumeSource.Partition))
  130. }
  131. if devicePath == "" {
  132. return "", fmt.Errorf("waitForAttach failed for AWS Volume %q: devicePath is empty", volumeID)
  133. }
  134. ticker := time.NewTicker(checkSleepDuration)
  135. defer ticker.Stop()
  136. timer := time.NewTimer(timeout)
  137. defer timer.Stop()
  138. for {
  139. select {
  140. case <-ticker.C:
  141. klog.V(5).Infof("Checking AWS Volume %q is attached.", volumeID)
  142. devicePaths := getDiskByIDPaths(aws.KubernetesVolumeID(volumeSource.VolumeID), partition, devicePath)
  143. path, err := verifyDevicePath(devicePaths)
  144. if err != nil {
  145. // Log error, if any, and continue checking periodically. See issue #11321
  146. klog.Errorf("Error verifying AWS Volume (%q) is attached: %v", volumeID, err)
  147. } else if path != "" {
  148. // A device path has successfully been created for the PD
  149. klog.Infof("Successfully found attached AWS Volume %q.", volumeID)
  150. return path, nil
  151. }
  152. case <-timer.C:
  153. return "", fmt.Errorf("could not find attached AWS Volume %q. Timeout waiting for mount paths to be created", volumeID)
  154. }
  155. }
  156. }
  157. func (attacher *awsElasticBlockStoreAttacher) GetDeviceMountPath(
  158. spec *volume.Spec) (string, error) {
  159. volumeSource, _, err := getVolumeSource(spec)
  160. if err != nil {
  161. return "", err
  162. }
  163. return makeGlobalPDPath(attacher.host, aws.KubernetesVolumeID(volumeSource.VolumeID)), nil
  164. }
  165. // FIXME: this method can be further pruned.
  166. func (attacher *awsElasticBlockStoreAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
  167. mounter := attacher.host.GetMounter(awsElasticBlockStorePluginName)
  168. notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
  169. if err != nil {
  170. if os.IsNotExist(err) {
  171. if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
  172. return err
  173. }
  174. notMnt = true
  175. } else {
  176. return err
  177. }
  178. }
  179. volumeSource, readOnly, err := getVolumeSource(spec)
  180. if err != nil {
  181. return err
  182. }
  183. options := []string{}
  184. if readOnly {
  185. options = append(options, "ro")
  186. }
  187. if notMnt {
  188. diskMounter := volumeutil.NewSafeFormatAndMountFromHost(awsElasticBlockStorePluginName, attacher.host)
  189. mountOptions := volumeutil.MountOptionFromSpec(spec, options...)
  190. err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, mountOptions)
  191. if err != nil {
  192. os.Remove(deviceMountPath)
  193. return err
  194. }
  195. }
  196. return nil
  197. }
  198. type awsElasticBlockStoreDetacher struct {
  199. mounter mount.Interface
  200. awsVolumes aws.Volumes
  201. }
  202. var _ volume.Detacher = &awsElasticBlockStoreDetacher{}
  203. var _ volume.DeviceUnmounter = &awsElasticBlockStoreDetacher{}
  204. func (plugin *awsElasticBlockStorePlugin) NewDetacher() (volume.Detacher, error) {
  205. awsCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
  206. if err != nil {
  207. return nil, err
  208. }
  209. return &awsElasticBlockStoreDetacher{
  210. mounter: plugin.host.GetMounter(plugin.GetPluginName()),
  211. awsVolumes: awsCloud,
  212. }, nil
  213. }
  214. func (plugin *awsElasticBlockStorePlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
  215. return plugin.NewDetacher()
  216. }
  217. func (detacher *awsElasticBlockStoreDetacher) Detach(volumeName string, nodeName types.NodeName) error {
  218. volumeID := aws.KubernetesVolumeID(path.Base(volumeName))
  219. if _, err := detacher.awsVolumes.DetachDisk(volumeID, nodeName); err != nil {
  220. klog.Errorf("Error detaching volumeID %q: %v", volumeID, err)
  221. return err
  222. }
  223. return nil
  224. }
  225. func (detacher *awsElasticBlockStoreDetacher) UnmountDevice(deviceMountPath string) error {
  226. return mount.CleanupMountPoint(deviceMountPath, detacher.mounter, false)
  227. }
  228. func (plugin *awsElasticBlockStorePlugin) CanAttach(spec *volume.Spec) (bool, error) {
  229. return true, nil
  230. }
  231. func (plugin *awsElasticBlockStorePlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
  232. return true, nil
  233. }
  234. func setNodeDisk(
  235. nodeDiskMap map[types.NodeName]map[*volume.Spec]bool,
  236. volumeSpec *volume.Spec,
  237. nodeName types.NodeName,
  238. check bool) {
  239. volumeMap := nodeDiskMap[nodeName]
  240. if volumeMap == nil {
  241. volumeMap = make(map[*volume.Spec]bool)
  242. nodeDiskMap[nodeName] = volumeMap
  243. }
  244. volumeMap[volumeSpec] = check
  245. }