attacher.go

// +build !providerless

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cinder

import (
	"context"
	"fmt"
	"os"
	"path"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog"
	"k8s.io/utils/mount"

	"k8s.io/kubernetes/pkg/volume"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
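
// cinderDiskAttacher attaches Cinder volumes to nodes. It implements
// volume.Attacher and volume.DeviceMounter on top of a BlockStorageProvider.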
type cinderDiskAttacher struct {
	host           volume.VolumeHost
	cinderProvider BlockStorageProvider
}

var _ volume.Attacher = &cinderDiskAttacher{}
var _ volume.DeviceMounter = &cinderDiskAttacher{}
var _ volume.AttachableVolumePlugin = &cinderPlugin{}
var _ volume.DeviceMountableVolumePlugin = &cinderPlugin{}
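
// Backoff and polling parameters used below: initial delays, growth factors,
// and step counts for waiting on pending Cinder operations, disk attach,
// disk detach, and local device-path probing.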
const (
	probeVolumeInitDelay = 1 * time.Second
	probeVolumeFactor    = 2.0

	operationFinishInitDelay = 1 * time.Second
	operationFinishFactor    = 1.1
	operationFinishSteps     = 10

	diskAttachInitDelay = 1 * time.Second
	diskAttachFactor    = 1.2
	diskAttachSteps     = 15

	diskDetachInitDelay = 1 * time.Second
	diskDetachFactor    = 1.2
	diskDetachSteps     = 13
)
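
// NewAttacher builds a cinderDiskAttacher backed by the plugin's cloud provider.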
func (plugin *cinderPlugin) NewAttacher() (volume.Attacher, error) {
	cinder, err := plugin.getCloudProvider()
	if err != nil {
		return nil, err
	}
	return &cinderDiskAttacher{
		host:           plugin.host,
		cinderProvider: cinder,
	}, nil
}

func (plugin *cinderPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
	return plugin.NewAttacher()
}

func (plugin *cinderPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
	mounter := plugin.host.GetMounter(plugin.GetPluginName())
	return mounter.GetMountRefs(deviceMountPath)
}
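
// waitOperationFinished polls the Cinder provider with exponential backoff
// until the volume has no pending operation, returning an error if the
// backoff is exhausted or the status check fails.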
func (attacher *cinderDiskAttacher) waitOperationFinished(volumeID string) error {
	backoff := wait.Backoff{
		Duration: operationFinishInitDelay,
		Factor:   operationFinishFactor,
		Steps:    operationFinishSteps,
	}

	var volumeStatus string
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		var pending bool
		var err error
		pending, volumeStatus, err = attacher.cinderProvider.OperationPending(volumeID)
		if err != nil {
			return false, err
		}
		return !pending, nil
	})
	if err == wait.ErrWaitTimeout {
		err = fmt.Errorf("Volume %q is %s, can't finish within the allotted time", volumeID, volumeStatus)
	}

	return err
}
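
// waitDiskAttached polls DiskIsAttached with exponential backoff until the
// volume is reported attached to the instance, or the backoff is exhausted.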
func (attacher *cinderDiskAttacher) waitDiskAttached(instanceID, volumeID string) error {
	backoff := wait.Backoff{
		Duration: diskAttachInitDelay,
		Factor:   diskAttachFactor,
		Steps:    diskAttachSteps,
	}

	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attached, err := attacher.cinderProvider.DiskIsAttached(instanceID, volumeID)
		if err != nil {
			return false, err
		}
		return attached, nil
	})
	if err == wait.ErrWaitTimeout {
		err = fmt.Errorf("Volume %q failed to be attached within the allotted time", volumeID)
	}

	return err
}
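
// Attach waits for any pending operation on the volume to finish, attaches it
// to the node's instance (unless it is already attached), waits for the attach
// to complete, and returns the device path reported by Cinder.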
func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
	volumeID, _, _, err := getVolumeInfo(spec)
	if err != nil {
		return "", err
	}

	instanceID, err := attacher.nodeInstanceID(nodeName)
	if err != nil {
		return "", err
	}

	if err := attacher.waitOperationFinished(volumeID); err != nil {
		return "", err
	}

	attached, err := attacher.cinderProvider.DiskIsAttached(instanceID, volumeID)
	if err != nil {
		// Log error and continue with attach
		klog.Warningf(
			"Error checking if volume (%q) is already attached to current instance (%q). Will continue and try attach anyway. err=%v",
			volumeID, instanceID, err)
	}

	if err == nil && attached {
		// Volume is already attached to instance.
		klog.Infof("Attach operation is successful. volume %q is already attached to instance %q.", volumeID, instanceID)
	} else {
		_, err = attacher.cinderProvider.AttachDisk(instanceID, volumeID)
		if err == nil {
			if err = attacher.waitDiskAttached(instanceID, volumeID); err != nil {
				klog.Errorf("Error waiting for volume %q to be attached to node %q: %v", volumeID, nodeName, err)
				return "", err
			}
			klog.Infof("Attach operation successful: volume %q attached to instance %q.", volumeID, instanceID)
		} else {
			klog.Infof("Attach volume %q to instance %q failed with: %v", volumeID, instanceID, err)
			return "", err
		}
	}

	devicePath, err := attacher.cinderProvider.GetAttachmentDiskPath(instanceID, volumeID)
	if err != nil {
		klog.Infof("Cannot get device path of volume %q attached to instance %q, failed with: %v", volumeID, instanceID, err)
		return "", err
	}

	return devicePath, nil
}
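
// VolumesAreAttached checks, in a single provider call, which of the given
// volume specs are still attached to the node and returns a map from spec to
// attachment state.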
func (attacher *cinderDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
	volumesAttachedCheck := make(map[*volume.Spec]bool)
	volumeSpecMap := make(map[string]*volume.Spec)
	volumeIDList := []string{}
	for _, spec := range specs {
		volumeID, _, _, err := getVolumeInfo(spec)
		if err != nil {
			klog.Errorf("Error getting volume (%q) source: %v", spec.Name(), err)
			continue
		}

		volumeIDList = append(volumeIDList, volumeID)
		volumesAttachedCheck[spec] = true
		volumeSpecMap[volumeID] = spec
	}

	attachedResult, err := attacher.cinderProvider.DisksAreAttachedByName(nodeName, volumeIDList)
	if err != nil {
		// Log the error and return the optimistic (all-attached) map along with it.
		klog.Errorf(
			"Error checking if Volumes (%v) are already attached to current node (%q). err=%v",
			volumeIDList, nodeName, err)
		return volumesAttachedCheck, err
	}

	for volumeID, attached := range attachedResult {
		if !attached {
			spec := volumeSpecMap[volumeID]
			volumesAttachedCheck[spec] = false
			klog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
		}
	}
	return volumesAttachedCheck, nil
}
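
// WaitForAttach polls the node, with exponentially growing intervals, until
// the attached Cinder disk shows up at a device path, returning that path or
// failing once the timeout expires. The devicePath reported by Cinder may be
// wrong and is only used when the provider says it can be trusted (issue #33128).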
func (attacher *cinderDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, _ *v1.Pod, timeout time.Duration) (string, error) {
	// NOTE: devicePath is path as reported by Cinder, which may be incorrect and should not be used. See Issue #33128
	volumeID, _, _, err := getVolumeInfo(spec)
	if err != nil {
		return "", err
	}

	if devicePath == "" {
		return "", fmt.Errorf("WaitForAttach failed for Cinder disk %q: devicePath is empty", volumeID)
	}

	ticker := time.NewTicker(probeVolumeInitDelay)
	defer ticker.Stop()
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	duration := probeVolumeInitDelay
	for {
		select {
		case <-ticker.C:
			klog.V(5).Infof("Checking Cinder disk %q is attached.", volumeID)
			probeAttachedVolume()
			if !attacher.cinderProvider.ShouldTrustDevicePath() {
				// Using the Cinder volume ID, find the real device path (See Issue #33128)
				devicePath = attacher.cinderProvider.GetDevicePath(volumeID)
			}
			exists, err := mount.PathExists(devicePath)
			if exists && err == nil {
				klog.Infof("Successfully found attached Cinder disk %q at %v.", volumeID, devicePath)
				return devicePath, nil
			}
			// Log an error, and continue checking periodically
			klog.Errorf("Error: could not find attached Cinder disk %q (path: %q): %v", volumeID, devicePath, err)
			// Using exponential backoff instead of linear
			ticker.Stop()
			duration = time.Duration(float64(duration) * probeVolumeFactor)
			ticker = time.NewTicker(duration)
		case <-timer.C:
			return "", fmt.Errorf("could not find attached Cinder disk %q. Timeout waiting for mount paths to be created", volumeID)
		}
	}
}
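
// GetDeviceMountPath returns the global mount path used for the volume's device.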
func (attacher *cinderDiskAttacher) GetDeviceMountPath(
	spec *volume.Spec) (string, error) {
	volumeID, _, _, err := getVolumeInfo(spec)
	if err != nil {
		return "", err
	}

	return makeGlobalPDName(attacher.host, volumeID), nil
}
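
// MountDevice formats (if necessary) and mounts the attached device at the
// global device mount path, creating the directory when it does not exist.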
// FIXME: this method can be further pruned.
func (attacher *cinderDiskAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
	mounter := attacher.host.GetMounter(cinderVolumePluginName)
	notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
	if err != nil {
		if os.IsNotExist(err) {
			if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
				return err
			}
			notMnt = true
		} else {
			return err
		}
	}

	_, volumeFSType, readOnly, err := getVolumeInfo(spec)
	if err != nil {
		return err
	}

	options := []string{}
	if readOnly {
		options = append(options, "ro")
	}
	if notMnt {
		diskMounter := volumeutil.NewSafeFormatAndMountFromHost(cinderVolumePluginName, attacher.host)
		mountOptions := volumeutil.MountOptionFromSpec(spec, options...)
		err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeFSType, mountOptions)
		if err != nil {
			os.Remove(deviceMountPath)
			return err
		}
	}
	return nil
}
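
// cinderDiskDetacher detaches Cinder volumes from nodes. It implements
// volume.Detacher and volume.DeviceUnmounter.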
type cinderDiskDetacher struct {
	mounter        mount.Interface
	cinderProvider BlockStorageProvider
}

var _ volume.Detacher = &cinderDiskDetacher{}
var _ volume.DeviceUnmounter = &cinderDiskDetacher{}
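
// NewDetacher builds a cinderDiskDetacher backed by the plugin's cloud
// provider and the host's mounter.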
func (plugin *cinderPlugin) NewDetacher() (volume.Detacher, error) {
	cinder, err := plugin.getCloudProvider()
	if err != nil {
		return nil, err
	}
	return &cinderDiskDetacher{
		mounter:        plugin.host.GetMounter(plugin.GetPluginName()),
		cinderProvider: cinder,
	}, nil
}

func (plugin *cinderPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
	return plugin.NewDetacher()
}
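
// waitOperationFinished mirrors the attacher's helper: it polls
// OperationPending with exponential backoff until the volume has no pending
// operation.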
func (detacher *cinderDiskDetacher) waitOperationFinished(volumeID string) error {
	backoff := wait.Backoff{
		Duration: operationFinishInitDelay,
		Factor:   operationFinishFactor,
		Steps:    operationFinishSteps,
	}

	var volumeStatus string
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		var pending bool
		var err error
		pending, volumeStatus, err = detacher.cinderProvider.OperationPending(volumeID)
		if err != nil {
			return false, err
		}
		return !pending, nil
	})
	if err == wait.ErrWaitTimeout {
		err = fmt.Errorf("Volume %q is %s, can't finish within the allotted time", volumeID, volumeStatus)
	}

	return err
}
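
// waitDiskDetached polls DiskIsAttached with exponential backoff until the
// volume is no longer attached to the instance, or the backoff is exhausted.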
func (detacher *cinderDiskDetacher) waitDiskDetached(instanceID, volumeID string) error {
	backoff := wait.Backoff{
		Duration: diskDetachInitDelay,
		Factor:   diskDetachFactor,
		Steps:    diskDetachSteps,
	}

	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attached, err := detacher.cinderProvider.DiskIsAttached(instanceID, volumeID)
		if err != nil {
			return false, err
		}
		return !attached, nil
	})
	if err == wait.ErrWaitTimeout {
		err = fmt.Errorf("Volume %q failed to detach within the allotted time", volumeID)
	}

	return err
}
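
// Detach waits for any pending operation on the volume, then detaches it from
// the node's instance (if still attached) and waits for the detach to complete.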
func (detacher *cinderDiskDetacher) Detach(volumeName string, nodeName types.NodeName) error {
	volumeID := path.Base(volumeName)
	if err := detacher.waitOperationFinished(volumeID); err != nil {
		return err
	}
	attached, instanceID, err := detacher.cinderProvider.DiskIsAttachedByName(nodeName, volumeID)
	if err != nil {
		// Log error and continue with detach
		klog.Errorf(
			"Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
			volumeID, nodeName, err)
	}

	if err == nil && !attached {
		// Volume is already detached from node.
		klog.Infof("detach operation was successful. volume %q is already detached from node %q.", volumeID, nodeName)
		return nil
	}

	if err = detacher.cinderProvider.DetachDisk(instanceID, volumeID); err != nil {
		klog.Errorf("Error detaching volume %q from node %q: %v", volumeID, nodeName, err)
		return err
	}
	if err = detacher.waitDiskDetached(instanceID, volumeID); err != nil {
		klog.Errorf("Error waiting for volume %q to detach from node %q: %v", volumeID, nodeName, err)
		return err
	}
	klog.Infof("detached volume %q from node %q", volumeID, nodeName)
	return nil
}
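
// UnmountDevice unmounts and cleans up the global device mount path.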
func (detacher *cinderDiskDetacher) UnmountDevice(deviceMountPath string) error {
	return mount.CleanupMountPoint(deviceMountPath, detacher.mounter, false)
}

func (plugin *cinderPlugin) CanAttach(spec *volume.Spec) (bool, error) {
	return true, nil
}

func (plugin *cinderPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
	return true, nil
}
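
// nodeInstanceID resolves the node name to an OpenStack instance ID via the
// provider's Instances interface, stripping any provider-ID prefix up to the
// last "/".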
func (attacher *cinderDiskAttacher) nodeInstanceID(nodeName types.NodeName) (string, error) {
	instances, res := attacher.cinderProvider.Instances()
	if !res {
		return "", fmt.Errorf("failed to list openstack instances")
	}
	instanceID, err := instances.InstanceID(context.TODO(), nodeName)
	if err != nil {
		return "", err
	}
	if ind := strings.LastIndex(instanceID, "/"); ind >= 0 {
		instanceID = instanceID[(ind + 1):]
	}
	return instanceID, nil
}