attacher.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cinder

import (
	"context"
	"fmt"
	"os"
	"path"
	"strings"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/util/mount"
	"k8s.io/kubernetes/pkg/volume"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
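
// cinderDiskAttacher attaches Cinder volumes to OpenStack instances and
// mounts the attached device, using the configured BlockStorageProvider.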
type cinderDiskAttacher struct {
	host           volume.VolumeHost
	cinderProvider BlockStorageProvider
}

var _ volume.Attacher = &cinderDiskAttacher{}
var _ volume.DeviceMounter = &cinderDiskAttacher{}
var _ volume.AttachableVolumePlugin = &cinderPlugin{}
var _ volume.DeviceMountableVolumePlugin = &cinderPlugin{}
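
// Backoff tuning for the polling loops below. Each poll starts at its init
// delay and multiplies the interval by its factor on every retry, so (as a
// rough geometric-series estimate) the operation-finish poll gives up after
// roughly 15 seconds, disk attach after roughly a minute, and disk detach
// after roughly 40 seconds. The probeVolume values drive the device-path
// probe in WaitForAttach, which is bounded by the caller-supplied timeout
// rather than a step count.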
const (
	probeVolumeInitDelay = 1 * time.Second
	probeVolumeFactor    = 2.0

	operationFinishInitDelay = 1 * time.Second
	operationFinishFactor    = 1.1
	operationFinishSteps     = 10

	diskAttachInitDelay = 1 * time.Second
	diskAttachFactor    = 1.2
	diskAttachSteps     = 15

	diskDetachInitDelay = 1 * time.Second
	diskDetachFactor    = 1.2
	diskDetachSteps     = 13
)
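
// NewAttacher returns a volume.Attacher backed by the plugin's cloud provider.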
func (plugin *cinderPlugin) NewAttacher() (volume.Attacher, error) {
	cinder, err := plugin.getCloudProvider()
	if err != nil {
		return nil, err
	}
	return &cinderDiskAttacher{
		host:           plugin.host,
		cinderProvider: cinder,
	}, nil
}
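
// NewDeviceMounter reuses the attacher, which also implements volume.DeviceMounter.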
func (plugin *cinderPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
	return plugin.NewAttacher()
}
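
// GetDeviceMountRefs returns all mount references to the given device mount path.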
func (plugin *cinderPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
	mounter := plugin.host.GetMounter(plugin.GetPluginName())
	return mounter.GetMountRefs(deviceMountPath)
}
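
// waitOperationFinished polls OperationPending with the operationFinish
// backoff until Cinder reports no pending operation on the volume, and
// times out once the backoff steps are exhausted.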
func (attacher *cinderDiskAttacher) waitOperationFinished(volumeID string) error {
	backoff := wait.Backoff{
		Duration: operationFinishInitDelay,
		Factor:   operationFinishFactor,
		Steps:    operationFinishSteps,
	}

	var volumeStatus string
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		var pending bool
		var err error
		pending, volumeStatus, err = attacher.cinderProvider.OperationPending(volumeID)
		if err != nil {
			return false, err
		}
		return !pending, nil
	})
	if err == wait.ErrWaitTimeout {
		err = fmt.Errorf("volume %q is %s, can't finish within the allotted time", volumeID, volumeStatus)
	}
	return err
}
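
// waitDiskAttached polls DiskIsAttached with the diskAttach backoff until
// the volume shows up as attached to the instance.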
func (attacher *cinderDiskAttacher) waitDiskAttached(instanceID, volumeID string) error {
	backoff := wait.Backoff{
		Duration: diskAttachInitDelay,
		Factor:   diskAttachFactor,
		Steps:    diskAttachSteps,
	}

	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attached, err := attacher.cinderProvider.DiskIsAttached(instanceID, volumeID)
		if err != nil {
			return false, err
		}
		return attached, nil
	})
	if err == wait.ErrWaitTimeout {
		err = fmt.Errorf("volume %q failed to be attached within the allotted time", volumeID)
	}
	return err
}
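
// Attach resolves the node's instance ID, waits for any pending Cinder
// operation to finish, attaches the volume unless it is already attached,
// waits for the attachment to complete, and returns the device path as
// reported by Cinder.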
func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
	volumeID, _, _, err := getVolumeInfo(spec)
	if err != nil {
		return "", err
	}

	instanceID, err := attacher.nodeInstanceID(nodeName)
	if err != nil {
		return "", err
	}

	if err := attacher.waitOperationFinished(volumeID); err != nil {
		return "", err
	}

	attached, err := attacher.cinderProvider.DiskIsAttached(instanceID, volumeID)
	if err != nil {
		// Log the error and continue with the attach.
		klog.Warningf(
			"Error checking if volume (%q) is already attached to current instance (%q). Will continue and try attach anyway. err=%v",
			volumeID, instanceID, err)
	}

	if err == nil && attached {
		// Volume is already attached to the instance.
		klog.Infof("Attach operation is successful. volume %q is already attached to instance %q.", volumeID, instanceID)
	} else {
		_, err = attacher.cinderProvider.AttachDisk(instanceID, volumeID)
		if err == nil {
			if err = attacher.waitDiskAttached(instanceID, volumeID); err != nil {
				klog.Errorf("Error waiting for volume %q to be attached to node %q: %v", volumeID, nodeName, err)
				return "", err
			}
			klog.Infof("Attach operation successful: volume %q attached to instance %q.", volumeID, instanceID)
		} else {
			klog.Infof("Attaching volume %q to instance %q failed: %v", volumeID, instanceID, err)
			return "", err
		}
	}

	devicePath, err := attacher.cinderProvider.GetAttachmentDiskPath(instanceID, volumeID)
	if err != nil {
		klog.Infof("Cannot get device path of volume %q attached to instance %q, failed with: %v", volumeID, instanceID, err)
		return "", err
	}

	return devicePath, nil
}
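
// VolumesAreAttached reports, for each spec, whether the volume is still
// attached to the given node, using a single bulk query to the provider.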
func (attacher *cinderDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
	volumesAttachedCheck := make(map[*volume.Spec]bool)
	volumeSpecMap := make(map[string]*volume.Spec)
	volumeIDList := []string{}
	for _, spec := range specs {
		volumeID, _, _, err := getVolumeInfo(spec)
		if err != nil {
			klog.Errorf("Error getting volume (%q) source: %v", spec.Name(), err)
			continue
		}

		volumeIDList = append(volumeIDList, volumeID)
		volumesAttachedCheck[spec] = true
		volumeSpecMap[volumeID] = spec
	}

	attachedResult, err := attacher.cinderProvider.DisksAreAttachedByName(nodeName, volumeIDList)
	if err != nil {
		// Log the error and return the optimistic default (all still attached).
		klog.Errorf(
			"Error checking if volumes (%v) are already attached to current node (%q). err=%v",
			volumeIDList, nodeName, err)
		return volumesAttachedCheck, err
	}

	for volumeID, attached := range attachedResult {
		if !attached {
			spec := volumeSpecMap[volumeID]
			volumesAttachedCheck[spec] = false
			klog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
		}
	}
	return volumesAttachedCheck, nil
}
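
// WaitForAttach polls until the attached volume's device path exists on the
// node, growing the probe interval geometrically, and fails once the timeout
// elapses. The devicePath reported by Cinder may be wrong (see issue #33128),
// so the real path is looked up by volume ID whenever the provider says the
// reported path cannot be trusted.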
func (attacher *cinderDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, _ *v1.Pod, timeout time.Duration) (string, error) {
	// NOTE: devicePath is the path as reported by Cinder, which may be incorrect and should not be used. See issue #33128.
	volumeID, _, _, err := getVolumeInfo(spec)
	if err != nil {
		return "", err
	}

	if devicePath == "" {
		return "", fmt.Errorf("WaitForAttach failed for Cinder disk %q: devicePath is empty", volumeID)
	}

	ticker := time.NewTicker(probeVolumeInitDelay)
	defer ticker.Stop()
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	duration := probeVolumeInitDelay
	for {
		select {
		case <-ticker.C:
			klog.V(5).Infof("Checking Cinder disk %q is attached.", volumeID)
			probeAttachedVolume()
			if !attacher.cinderProvider.ShouldTrustDevicePath() {
				// Using the Cinder volume ID, find the real device path (see issue #33128).
				devicePath = attacher.cinderProvider.GetDevicePath(volumeID)
			}
			exists, err := mount.PathExists(devicePath)
			if exists && err == nil {
				klog.Infof("Successfully found attached Cinder disk %q at %v.", volumeID, devicePath)
				return devicePath, nil
			}
			// Log an error and continue checking periodically.
			klog.Errorf("Error: could not find attached Cinder disk %q (path: %q): %v", volumeID, devicePath, err)
			// Use exponential backoff instead of a linear interval.
			ticker.Stop()
			duration = time.Duration(float64(duration) * probeVolumeFactor)
			ticker = time.NewTicker(duration)
		case <-timer.C:
			return "", fmt.Errorf("could not find attached Cinder disk %q. Timeout waiting for mount paths to be created", volumeID)
		}
	}
}
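
// GetDeviceMountPath returns the global mount path for the volume on this host.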
func (attacher *cinderDiskAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
	volumeID, _, _, err := getVolumeInfo(spec)
	if err != nil {
		return "", err
	}

	return makeGlobalPDName(attacher.host, volumeID), nil
}
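
// MountDevice creates the mount point if needed, then formats (on first use)
// and mounts the attached device there, honoring read-only and per-spec
// mount options.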
// FIXME: this method can be further pruned.
func (attacher *cinderDiskAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
	mounter := attacher.host.GetMounter(cinderVolumePluginName)
	notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
	if err != nil {
		if os.IsNotExist(err) {
			if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
				return err
			}
			notMnt = true
		} else {
			return err
		}
	}

	_, volumeFSType, readOnly, err := getVolumeInfo(spec)
	if err != nil {
		return err
	}

	options := []string{}
	if readOnly {
		options = append(options, "ro")
	}
	if notMnt {
		diskMounter := volumeutil.NewSafeFormatAndMountFromHost(cinderVolumePluginName, attacher.host)
		mountOptions := volumeutil.MountOptionFromSpec(spec, options...)
		err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeFSType, mountOptions)
		if err != nil {
			os.Remove(deviceMountPath)
			return err
		}
	}
	return nil
}
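
// cinderDiskDetacher detaches Cinder volumes from OpenStack instances and
// unmounts their device mount points.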
type cinderDiskDetacher struct {
	mounter        mount.Interface
	cinderProvider BlockStorageProvider
}

var _ volume.Detacher = &cinderDiskDetacher{}
var _ volume.DeviceUnmounter = &cinderDiskDetacher{}
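
// NewDetacher returns a volume.Detacher backed by the plugin's cloud provider.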
func (plugin *cinderPlugin) NewDetacher() (volume.Detacher, error) {
	cinder, err := plugin.getCloudProvider()
	if err != nil {
		return nil, err
	}
	return &cinderDiskDetacher{
		mounter:        plugin.host.GetMounter(plugin.GetPluginName()),
		cinderProvider: cinder,
	}, nil
}
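
// NewDeviceUnmounter reuses the detacher, which also implements volume.DeviceUnmounter.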
func (plugin *cinderPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
	return plugin.NewDetacher()
}
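
// waitOperationFinished polls OperationPending with the operationFinish
// backoff until Cinder reports no pending operation on the volume, and
// times out once the backoff steps are exhausted.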
func (detacher *cinderDiskDetacher) waitOperationFinished(volumeID string) error {
	backoff := wait.Backoff{
		Duration: operationFinishInitDelay,
		Factor:   operationFinishFactor,
		Steps:    operationFinishSteps,
	}

	var volumeStatus string
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		var pending bool
		var err error
		pending, volumeStatus, err = detacher.cinderProvider.OperationPending(volumeID)
		if err != nil {
			return false, err
		}
		return !pending, nil
	})
	if err == wait.ErrWaitTimeout {
		err = fmt.Errorf("volume %q is %s, can't finish within the allotted time", volumeID, volumeStatus)
	}
	return err
}
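
// waitDiskDetached polls DiskIsAttached with the diskDetach backoff until
// the volume is no longer attached to the instance.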
func (detacher *cinderDiskDetacher) waitDiskDetached(instanceID, volumeID string) error {
	backoff := wait.Backoff{
		Duration: diskDetachInitDelay,
		Factor:   diskDetachFactor,
		Steps:    diskDetachSteps,
	}

	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attached, err := detacher.cinderProvider.DiskIsAttached(instanceID, volumeID)
		if err != nil {
			return false, err
		}
		return !attached, nil
	})
	if err == wait.ErrWaitTimeout {
		err = fmt.Errorf("volume %q failed to detach within the allotted time", volumeID)
	}
	return err
}
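
// Detach waits for any pending Cinder operation to finish, then detaches the
// volume from the node (unless it is already detached) and waits for the
// detachment to complete.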
func (detacher *cinderDiskDetacher) Detach(volumeName string, nodeName types.NodeName) error {
	volumeID := path.Base(volumeName)
	if err := detacher.waitOperationFinished(volumeID); err != nil {
		return err
	}
	attached, instanceID, err := detacher.cinderProvider.DiskIsAttachedByName(nodeName, volumeID)
	if err != nil {
		// Log the error and continue with the detach.
		klog.Errorf(
			"Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
			volumeID, nodeName, err)
	}

	if err == nil && !attached {
		// Volume is already detached from the node.
		klog.Infof("detach operation was successful. volume %q is already detached from node %q.", volumeID, nodeName)
		return nil
	}

	if err = detacher.cinderProvider.DetachDisk(instanceID, volumeID); err != nil {
		klog.Errorf("Error detaching volume %q from node %q: %v", volumeID, nodeName, err)
		return err
	}
	if err = detacher.waitDiskDetached(instanceID, volumeID); err != nil {
		klog.Errorf("Error waiting for volume %q to detach from node %q: %v", volumeID, nodeName, err)
		return err
	}
	klog.Infof("detached volume %q from node %q", volumeID, nodeName)
	return nil
}
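
// UnmountDevice unmounts the device and cleans up the mount point.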
func (detacher *cinderDiskDetacher) UnmountDevice(deviceMountPath string) error {
	return mount.CleanupMountPoint(deviceMountPath, detacher.mounter, false)
}
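
// CanAttach reports whether the given spec can be attached; always true for Cinder.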
func (plugin *cinderPlugin) CanAttach(spec *volume.Spec) (bool, error) {
	return true, nil
}

// CanDeviceMount reports whether the device for the given spec can be mounted; always true for Cinder.
func (plugin *cinderPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
	return true, nil
}
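
// nodeInstanceID maps a Kubernetes node name to its OpenStack instance ID,
// trimming everything up to the last "/" from the ID the provider returns.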
func (attacher *cinderDiskAttacher) nodeInstanceID(nodeName types.NodeName) (string, error) {
	instances, res := attacher.cinderProvider.Instances()
	if !res {
		return "", fmt.Errorf("failed to list openstack instances")
	}
	instanceID, err := instances.InstanceID(context.TODO(), nodeName)
	if err != nil {
		return "", err
	}
	if ind := strings.LastIndex(instanceID, "/"); ind >= 0 {
		instanceID = instanceID[(ind + 1):]
	}
	return instanceID, nil
}