operation_generator.go 77 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package operationexecutor
  14. import (
  15. goerrors "errors"
  16. "fmt"
  17. "path/filepath"
  18. "strings"
  19. "time"
  20. "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/types"
  24. "k8s.io/apimachinery/pkg/util/sets"
  25. utilfeature "k8s.io/apiserver/pkg/util/feature"
  26. clientset "k8s.io/client-go/kubernetes"
  27. "k8s.io/client-go/tools/record"
  28. volerr "k8s.io/cloud-provider/volume/errors"
  29. csilib "k8s.io/csi-translation-lib"
  30. "k8s.io/klog"
  31. "k8s.io/kubernetes/pkg/features"
  32. kevents "k8s.io/kubernetes/pkg/kubelet/events"
  33. "k8s.io/kubernetes/pkg/util/mount"
  34. "k8s.io/kubernetes/pkg/volume"
  35. "k8s.io/kubernetes/pkg/volume/csi"
  36. "k8s.io/kubernetes/pkg/volume/util"
  37. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  38. "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
  39. )
  // unknownVolumePlugin is the plugin name used for an operation when the
  // volume plugin for a spec could not be resolved while generating it.
  const unknownVolumePlugin string = "UnknownVolumePlugin"

  // unknownAttachableVolumePlugin is the plugin name used for an operation when
  // the attachable volume plugin for a spec could not be resolved while
  // generating it.
  const unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin"
  44. var _ OperationGenerator = &operationGenerator{}
  45. type operationGenerator struct {
  46. // Used to fetch objects from the API server like Node in the
  47. // VerifyControllerAttachedVolume operation.
  48. kubeClient clientset.Interface
  49. // volumePluginMgr is the volume plugin manager used to create volume
  50. // plugin objects.
  51. volumePluginMgr *volume.VolumePluginMgr
  52. // recorder is used to record events in the API server
  53. recorder record.EventRecorder
  54. // checkNodeCapabilitiesBeforeMount, if set, enables the CanMount check,
  55. // which verifies that the components (binaries, etc.) required to mount
  56. // the volume are available on the underlying node before attempting mount.
  57. checkNodeCapabilitiesBeforeMount bool
  58. // blkUtil provides volume path related operations for block volume
  59. blkUtil volumepathhandler.BlockVolumePathHandler
  60. }
  61. // NewOperationGenerator is returns instance of operationGenerator
  62. func NewOperationGenerator(kubeClient clientset.Interface,
  63. volumePluginMgr *volume.VolumePluginMgr,
  64. recorder record.EventRecorder,
  65. checkNodeCapabilitiesBeforeMount bool,
  66. blkUtil volumepathhandler.BlockVolumePathHandler) OperationGenerator {
  67. return &operationGenerator{
  68. kubeClient: kubeClient,
  69. volumePluginMgr: volumePluginMgr,
  70. recorder: recorder,
  71. checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount,
  72. blkUtil: blkUtil,
  73. }
  74. }
  75. // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
  76. type OperationGenerator interface {
  77. // Generates the MountVolume function needed to perform the mount of a volume plugin
  78. GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations
  79. // Generates the UnmountVolume function needed to perform the unmount of a volume plugin
  80. GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error)
  81. // Generates the AttachVolume function needed to perform attach of a volume plugin
  82. GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations
  83. // Generates the DetachVolume function needed to perform the detach of a volume plugin
  84. GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
  85. // Generates the VolumesAreAttached function needed to verify if volume plugins are attached
  86. GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
  87. // Generates the UnMountDevice function needed to perform the unmount of a device
  88. GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error)
  89. // Generates the function needed to check if the attach_detach controller has attached the volume plugin
  90. GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
  91. // Generates the MapVolume function needed to perform the map of a volume plugin
  92. GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
  93. // Generates the UnmapVolume function needed to perform the unmap of a volume plugin
  94. GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
  95. // Generates the UnmapDevice function needed to perform the unmap of a device
  96. GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error)
  97. // GetVolumePluginMgr returns volume plugin manager
  98. GetVolumePluginMgr() *volume.VolumePluginMgr
  99. GenerateBulkVolumeVerifyFunc(
  100. map[types.NodeName][]*volume.Spec,
  101. string,
  102. map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
  103. GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error)
  104. // Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume.
  105. GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
  106. }
  107. func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
  108. attachedVolumes []AttachedVolume,
  109. nodeName types.NodeName,
  110. actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  111. // volumesPerPlugin maps from a volume plugin to a list of volume specs which belong
  112. // to this type of plugin
  113. volumesPerPlugin := make(map[string][]*volume.Spec)
  114. // volumeSpecMap maps from a volume spec to its unique volumeName which will be used
  115. // when calling MarkVolumeAsDetached
  116. volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName)
  117. // Iterate each volume spec and put them into a map index by the pluginName
  118. for _, volumeAttached := range attachedVolumes {
  119. if volumeAttached.VolumeSpec == nil {
  120. klog.Errorf("VerifyVolumesAreAttached.GenerateVolumesAreAttachedFunc: nil spec for volume %s", volumeAttached.VolumeName)
  121. continue
  122. }
  123. volumePlugin, err :=
  124. og.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec)
  125. if err != nil || volumePlugin == nil {
  126. klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginBySpec failed", err).Error())
  127. continue
  128. }
  129. volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()]
  130. if !pluginExists {
  131. volumeSpecList = []*volume.Spec{}
  132. }
  133. volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
  134. volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList
  135. volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
  136. }
  137. volumesAreAttachedFunc := func() (error, error) {
  138. // For each volume plugin, pass the list of volume specs to VolumesAreAttached to check
  139. // whether the volumes are still attached.
  140. for pluginName, volumesSpecs := range volumesPerPlugin {
  141. attachableVolumePlugin, err :=
  142. og.volumePluginMgr.FindAttachablePluginByName(pluginName)
  143. if err != nil || attachableVolumePlugin == nil {
  144. klog.Errorf(
  145. "VolumeAreAttached.FindAttachablePluginBySpec failed for plugin %q with: %v",
  146. pluginName,
  147. err)
  148. continue
  149. }
  150. volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
  151. if newAttacherErr != nil {
  152. klog.Errorf(
  153. "VolumesAreAttached.NewAttacher failed for getting plugin %q with: %v",
  154. pluginName,
  155. newAttacherErr)
  156. continue
  157. }
  158. attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName)
  159. if areAttachedErr != nil {
  160. klog.Errorf(
  161. "VolumesAreAttached failed for checking on node %q with: %v",
  162. nodeName,
  163. areAttachedErr)
  164. continue
  165. }
  166. for spec, check := range attached {
  167. if !check {
  168. actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName)
  169. klog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.",
  170. volumeSpecMap[spec], spec.Name(), nodeName)
  171. }
  172. }
  173. }
  174. return nil, nil
  175. }
  176. return volumetypes.GeneratedOperations{
  177. OperationName: "verify_volumes_are_attached_per_node",
  178. OperationFunc: volumesAreAttachedFunc,
  179. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume("<n/a>", nil), "verify_volumes_are_attached_per_node"),
  180. EventRecorderFunc: nil, // nil because we do not want to generate event on error
  181. }, nil
  182. }
  183. func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
  184. pluginNodeVolumes map[types.NodeName][]*volume.Spec,
  185. pluginName string,
  186. volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
  187. actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  188. bulkVolumeVerifyFunc := func() (error, error) {
  189. attachableVolumePlugin, err :=
  190. og.volumePluginMgr.FindAttachablePluginByName(pluginName)
  191. if err != nil || attachableVolumePlugin == nil {
  192. klog.Errorf(
  193. "BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v",
  194. pluginName,
  195. err)
  196. return nil, nil
  197. }
  198. volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
  199. if newAttacherErr != nil {
  200. klog.Errorf(
  201. "BulkVerifyVolume.NewAttacher failed for getting plugin %q with: %v",
  202. attachableVolumePlugin,
  203. newAttacherErr)
  204. return nil, nil
  205. }
  206. bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier)
  207. if !ok {
  208. klog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier)
  209. return nil, nil
  210. }
  211. attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes)
  212. if bulkAttachErr != nil {
  213. klog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr)
  214. return nil, nil
  215. }
  216. for nodeName, volumeSpecs := range pluginNodeVolumes {
  217. for _, volumeSpec := range volumeSpecs {
  218. nodeVolumeSpecs, nodeChecked := attached[nodeName]
  219. if !nodeChecked {
  220. klog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and leaving volume %q as attached",
  221. nodeName,
  222. volumeSpec.Name())
  223. continue
  224. }
  225. check := nodeVolumeSpecs[volumeSpec]
  226. if !check {
  227. klog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and volume %q",
  228. nodeName,
  229. volumeSpec.Name())
  230. actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[volumeSpec], nodeName)
  231. }
  232. }
  233. }
  234. return nil, nil
  235. }
  236. return volumetypes.GeneratedOperations{
  237. OperationName: "verify_volumes_are_attached",
  238. OperationFunc: bulkVolumeVerifyFunc,
  239. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, nil), "verify_volumes_are_attached"),
  240. EventRecorderFunc: nil, // nil because we do not want to generate event on error
  241. }, nil
  242. }
  243. func (og *operationGenerator) GenerateAttachVolumeFunc(
  244. volumeToAttach VolumeToAttach,
  245. actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
  246. originalSpec := volumeToAttach.VolumeSpec
  247. attachVolumeFunc := func() (error, error) {
  248. var attachableVolumePlugin volume.AttachableVolumePlugin
  249. nu, err := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
  250. if err != nil {
  251. return volumeToAttach.GenerateError("AttachVolume.NodeUsingCSIPlugin failed", err)
  252. }
  253. // useCSIPlugin will check both CSIMigration and the plugin specific feature gates
  254. if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) && nu {
  255. // The volume represented by this spec is CSI and thus should be migrated
  256. attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
  257. if err != nil || attachableVolumePlugin == nil {
  258. return volumeToAttach.GenerateError("AttachVolume.FindAttachablePluginByName failed", err)
  259. }
  260. csiSpec, err := translateSpec(volumeToAttach.VolumeSpec)
  261. if err != nil {
  262. return volumeToAttach.GenerateError("AttachVolume.TranslateSpec failed", err)
  263. }
  264. volumeToAttach.VolumeSpec = csiSpec
  265. } else {
  266. attachableVolumePlugin, err =
  267. og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
  268. if err != nil || attachableVolumePlugin == nil {
  269. return volumeToAttach.GenerateError("AttachVolume.FindAttachablePluginBySpec failed", err)
  270. }
  271. }
  272. volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
  273. if newAttacherErr != nil {
  274. return volumeToAttach.GenerateError("AttachVolume.NewAttacher failed", newAttacherErr)
  275. }
  276. // Execute attach
  277. devicePath, attachErr := volumeAttacher.Attach(
  278. volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
  279. if attachErr != nil {
  280. uncertainNode := volumeToAttach.NodeName
  281. if derr, ok := attachErr.(*volerr.DanglingAttachError); ok {
  282. uncertainNode = derr.CurrentNode
  283. }
  284. addErr := actualStateOfWorld.MarkVolumeAsUncertain(
  285. v1.UniqueVolumeName(""),
  286. originalSpec,
  287. uncertainNode)
  288. if addErr != nil {
  289. klog.Errorf("AttachVolume.MarkVolumeAsUncertain fail to add the volume %q to actual state with %s", volumeToAttach.VolumeName, addErr)
  290. }
  291. // On failure, return error. Caller will log and retry.
  292. return volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr)
  293. }
  294. // Successful attach event is useful for user debugging
  295. simpleMsg, _ := volumeToAttach.GenerateMsg("AttachVolume.Attach succeeded", "")
  296. for _, pod := range volumeToAttach.ScheduledPods {
  297. og.recorder.Eventf(pod, v1.EventTypeNormal, kevents.SuccessfulAttachVolume, simpleMsg)
  298. }
  299. klog.Infof(volumeToAttach.GenerateMsgDetailed("AttachVolume.Attach succeeded", ""))
  300. // Update actual state of world
  301. addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
  302. v1.UniqueVolumeName(""), originalSpec, volumeToAttach.NodeName, devicePath)
  303. if addVolumeNodeErr != nil {
  304. // On failure, return error. Caller will log and retry.
  305. return volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
  306. }
  307. return nil, nil
  308. }
  309. eventRecorderFunc := func(err *error) {
  310. if *err != nil {
  311. for _, pod := range volumeToAttach.ScheduledPods {
  312. og.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, (*err).Error())
  313. }
  314. }
  315. }
  316. attachableVolumePluginName := unknownAttachableVolumePlugin
  317. // TODO(dyzz) Ignoring this error means that if the plugin is migrated and
  318. // any transient error is encountered (API unavailable, driver not installed)
  319. // the operation will have it's metric registered with the in-tree plugin instead
  320. // of the CSI Driver we migrated to. Fixing this requires a larger refactor that
  321. // involves determining the plugin_name for the metric generating "CompleteFunc"
  322. // during the actual "OperationFunc" and not during this generation function
  323. nu, err := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
  324. if err != nil {
  325. klog.Errorf("GenerateAttachVolumeFunc failed to check if node is using CSI Plugin, metric for this operation may be inaccurate: %v", err)
  326. }
  327. // Need to translate the spec here if the plugin is migrated so that the metrics
  328. // emitted show the correct (migrated) plugin
  329. if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) && nu {
  330. csiSpec, err := translateSpec(volumeToAttach.VolumeSpec)
  331. if err == nil {
  332. volumeToAttach.VolumeSpec = csiSpec
  333. }
  334. // If we have an error here we ignore it, the metric emitted will then be for the
  335. // in-tree plugin. This error case(skipped one) will also trigger an error
  336. // while the generated function is executed. And those errors will be handled during the execution of the generated
  337. // function with a back off policy.
  338. }
  339. // Get attacher plugin
  340. attachableVolumePlugin, err :=
  341. og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
  342. // It's ok to ignore the error, returning error is not expected from this function.
  343. // If an error case occurred during the function generation, this error case(skipped one) will also trigger an error
  344. // while the generated function is executed. And those errors will be handled during the execution of the generated
  345. // function with a back off policy.
  346. if err == nil && attachableVolumePlugin != nil {
  347. attachableVolumePluginName = attachableVolumePlugin.GetPluginName()
  348. }
  349. return volumetypes.GeneratedOperations{
  350. OperationName: "volume_attach",
  351. OperationFunc: attachVolumeFunc,
  352. EventRecorderFunc: eventRecorderFunc,
  353. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(attachableVolumePluginName, volumeToAttach.VolumeSpec), "volume_attach"),
  354. }
  355. }
  356. func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
  357. return og.volumePluginMgr
  358. }
  359. func (og *operationGenerator) GenerateDetachVolumeFunc(
  360. volumeToDetach AttachedVolume,
  361. verifySafeToDetach bool,
  362. actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  363. var volumeName string
  364. var attachableVolumePlugin volume.AttachableVolumePlugin
  365. var pluginName string
  366. var err error
  367. if volumeToDetach.VolumeSpec != nil {
  368. // Get attacher plugin
  369. nu, err := nodeUsingCSIPlugin(og, volumeToDetach.VolumeSpec, volumeToDetach.NodeName)
  370. if err != nil {
  371. return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NodeUsingCSIPlugin failed", err)
  372. }
  373. // useCSIPlugin will check both CSIMigration and the plugin specific feature gate
  374. if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) && nu {
  375. // The volume represented by this spec is CSI and thus should be migrated
  376. attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
  377. if err != nil || attachableVolumePlugin == nil {
  378. return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
  379. }
  380. csiSpec, err := translateSpec(volumeToDetach.VolumeSpec)
  381. if err != nil {
  382. return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.TranslateSpec failed", err)
  383. }
  384. volumeToDetach.VolumeSpec = csiSpec
  385. } else {
  386. attachableVolumePlugin, err =
  387. og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec)
  388. if err != nil || attachableVolumePlugin == nil {
  389. return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
  390. }
  391. }
  392. volumeName, err =
  393. attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec)
  394. if err != nil {
  395. return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err)
  396. }
  397. } else {
  398. // Get attacher plugin and the volumeName by splitting the volume unique name in case
  399. // there's no VolumeSpec: this happens only on attach/detach controller crash recovery
  400. // when a pod has been deleted during the controller downtime
  401. pluginName, volumeName, err = util.SplitUniqueName(volumeToDetach.VolumeName)
  402. if err != nil {
  403. return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err)
  404. }
  405. // TODO(dyzz): This case can't distinguish between PV and In-line which is necessary because
  406. // if it was PV it may have been migrated, but the same plugin with in-line may not have been.
  407. // Suggestions welcome...
  408. if csilib.IsMigratableIntreePluginByName(pluginName) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
  409. // The volume represented by this spec is CSI and thus should be migrated
  410. attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
  411. if err != nil || attachableVolumePlugin == nil {
  412. return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
  413. }
  414. // volumeToDetach.VolumeName here is always the in-tree volume name
  415. // therefore a workaround is required. volumeToDetach.DevicePath
  416. // is the attachID which happens to be what volumeName is needed for in Detach.
  417. // Therefore we set volumeName to the attachID. And CSI Detach can detect and use that.
  418. volumeName = volumeToDetach.DevicePath
  419. } else {
  420. attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName)
  421. if err != nil || attachableVolumePlugin == nil {
  422. return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err)
  423. }
  424. }
  425. }
  426. if pluginName == "" {
  427. pluginName = attachableVolumePlugin.GetPluginName()
  428. }
  429. volumeDetacher, err := attachableVolumePlugin.NewDetacher()
  430. if err != nil {
  431. return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err)
  432. }
  433. getVolumePluginMgrFunc := func() (error, error) {
  434. var err error
  435. if verifySafeToDetach {
  436. err = og.verifyVolumeIsSafeToDetach(volumeToDetach)
  437. }
  438. if err == nil {
  439. err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
  440. }
  441. if err != nil {
  442. // On failure, add volume back to ReportAsAttached list
  443. actualStateOfWorld.AddVolumeToReportAsAttached(
  444. volumeToDetach.VolumeName, volumeToDetach.NodeName)
  445. return volumeToDetach.GenerateError("DetachVolume.Detach failed", err)
  446. }
  447. klog.Infof(volumeToDetach.GenerateMsgDetailed("DetachVolume.Detach succeeded", ""))
  448. // Update actual state of world
  449. actualStateOfWorld.MarkVolumeAsDetached(
  450. volumeToDetach.VolumeName, volumeToDetach.NodeName)
  451. return nil, nil
  452. }
  453. return volumetypes.GeneratedOperations{
  454. OperationName: "volume_detach",
  455. OperationFunc: getVolumePluginMgrFunc,
  456. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), "volume_detach"),
  457. EventRecorderFunc: nil, // nil because we do not want to generate event on error
  458. }, nil
  459. }
  460. func (og *operationGenerator) GenerateMountVolumeFunc(
  461. waitForAttachTimeout time.Duration,
  462. volumeToMount VolumeToMount,
  463. actualStateOfWorld ActualStateOfWorldMounterUpdater,
  464. isRemount bool) volumetypes.GeneratedOperations {
  465. // Get mounter plugin
  466. originalSpec := volumeToMount.VolumeSpec
  467. volumePluginName := unknownVolumePlugin
  468. // Need to translate the spec here if the plugin is migrated so that the metrics
  469. // emitted show the correct (migrated) plugin
  470. if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) {
  471. csiSpec, err := translateSpec(volumeToMount.VolumeSpec)
  472. if err == nil {
  473. volumeToMount.VolumeSpec = csiSpec
  474. }
  475. // If we have an error here we ignore it, the metric emitted will then be for the
  476. // in-tree plugin. This error case(skipped one) will also trigger an error
  477. // while the generated function is executed. And those errors will be handled during the execution of the generated
  478. // function with a back off policy.
  479. }
  480. volumePlugin, err :=
  481. og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
  482. if err == nil && volumePlugin != nil {
  483. volumePluginName = volumePlugin.GetPluginName()
  484. }
  485. mountVolumeFunc := func() (error, error) {
  486. // Get mounter plugin
  487. if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) {
  488. csiSpec, err := translateSpec(volumeToMount.VolumeSpec)
  489. if err != nil {
  490. return volumeToMount.GenerateError("MountVolume.TranslateSpec failed", err)
  491. }
  492. volumeToMount.VolumeSpec = csiSpec
  493. }
  494. volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
  495. if err != nil || volumePlugin == nil {
  496. return volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err)
  497. }
  498. affinityErr := checkNodeAffinity(og, volumeToMount)
  499. if affinityErr != nil {
  500. return volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
  501. }
  502. volumeMounter, newMounterErr := volumePlugin.NewMounter(
  503. volumeToMount.VolumeSpec,
  504. volumeToMount.Pod,
  505. volume.VolumeOptions{})
  506. if newMounterErr != nil {
  507. return volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr)
  508. }
  509. mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin)
  510. if mountCheckError != nil {
  511. return volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError)
  512. }
  513. // Get attacher, if possible
  514. attachableVolumePlugin, _ :=
  515. og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
  516. var volumeAttacher volume.Attacher
  517. if attachableVolumePlugin != nil {
  518. volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
  519. }
  520. // get deviceMounter, if possible
  521. deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
  522. var volumeDeviceMounter volume.DeviceMounter
  523. if deviceMountableVolumePlugin != nil {
  524. volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
  525. }
  526. var fsGroup *int64
  527. if volumeToMount.Pod.Spec.SecurityContext != nil &&
  528. volumeToMount.Pod.Spec.SecurityContext.FSGroup != nil {
  529. fsGroup = volumeToMount.Pod.Spec.SecurityContext.FSGroup
  530. }
  531. devicePath := volumeToMount.DevicePath
  532. if volumeAttacher != nil {
  533. // Wait for attachable volumes to finish attaching
  534. klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)))
  535. devicePath, err = volumeAttacher.WaitForAttach(
  536. volumeToMount.VolumeSpec, devicePath, volumeToMount.Pod, waitForAttachTimeout)
  537. if err != nil {
  538. // On failure, return error. Caller will log and retry.
  539. return volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err)
  540. }
  541. klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath)))
  542. }
  543. var resizeDone bool
  544. var resizeError error
  545. resizeOptions := volume.NodeResizeOptions{
  546. DevicePath: devicePath,
  547. }
  548. if volumeDeviceMounter != nil {
  549. deviceMountPath, err :=
  550. volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
  551. if err != nil {
  552. // On failure, return error. Caller will log and retry.
  553. return volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err)
  554. }
  555. // Mount device to global mount path
  556. err = volumeDeviceMounter.MountDevice(
  557. volumeToMount.VolumeSpec,
  558. devicePath,
  559. deviceMountPath)
  560. if err != nil {
  561. // On failure, return error. Caller will log and retry.
  562. return volumeToMount.GenerateError("MountVolume.MountDevice failed", err)
  563. }
  564. klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.MountDevice succeeded", fmt.Sprintf("device mount path %q", deviceMountPath)))
  565. // Update actual state of world to reflect volume is globally mounted
  566. markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
  567. volumeToMount.VolumeName, devicePath, deviceMountPath)
  568. if markDeviceMountedErr != nil {
  569. // On failure, return error. Caller will log and retry.
  570. return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr)
  571. }
  572. resizeOptions.DeviceMountPath = deviceMountPath
  573. resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged
  574. // resizeFileSystem will resize the file system if user has requested a resize of
  575. // underlying persistent volume and is allowed to do so.
  576. resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions)
  577. if resizeError != nil {
  578. klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError)
  579. return volumeToMount.GenerateError("MountVolume.MountDevice failed while expanding volume", resizeError)
  580. }
  581. }
  582. if og.checkNodeCapabilitiesBeforeMount {
  583. if canMountErr := volumeMounter.CanMount(); canMountErr != nil {
  584. err = fmt.Errorf(
  585. "Verify that your node machine has the required components before attempting to mount this volume type. %s",
  586. canMountErr)
  587. return volumeToMount.GenerateError("MountVolume.CanMount failed", err)
  588. }
  589. }
  590. // Execute mount
  591. mountErr := volumeMounter.SetUp(volume.MounterArgs{
  592. FsGroup: fsGroup,
  593. DesiredSize: volumeToMount.DesiredSizeLimit,
  594. PodUID: string(volumeToMount.Pod.UID),
  595. })
  596. if mountErr != nil {
  597. // On failure, return error. Caller will log and retry.
  598. return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr)
  599. }
  600. _, detailedMsg := volumeToMount.GenerateMsg("MountVolume.SetUp succeeded", "")
  601. verbosity := klog.Level(1)
  602. if isRemount {
  603. verbosity = klog.Level(4)
  604. }
  605. klog.V(verbosity).Infof(detailedMsg)
  606. resizeOptions.DeviceMountPath = volumeMounter.GetPath()
  607. resizeOptions.CSIVolumePhase = volume.CSIVolumePublished
  608. // We need to call resizing here again in case resizing was not done during device mount. There could be
  609. // two reasons of that:
  610. // - Volume does not support DeviceMounter interface.
  611. // - In case of CSI the volume does not have node stage_unstage capability.
  612. if !resizeDone {
  613. resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions)
  614. if resizeError != nil {
  615. klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError)
  616. return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
  617. }
  618. }
  619. // Update actual state of world
  620. markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(
  621. volumeToMount.PodName,
  622. volumeToMount.Pod.UID,
  623. volumeToMount.VolumeName,
  624. volumeMounter,
  625. nil,
  626. volumeToMount.OuterVolumeSpecName,
  627. volumeToMount.VolumeGidValue,
  628. originalSpec)
  629. if markVolMountedErr != nil {
  630. // On failure, return error. Caller will log and retry.
  631. return volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr)
  632. }
  633. return nil, nil
  634. }
  635. eventRecorderFunc := func(err *error) {
  636. if *err != nil {
  637. og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, (*err).Error())
  638. }
  639. }
  640. return volumetypes.GeneratedOperations{
  641. OperationName: "volume_mount",
  642. OperationFunc: mountVolumeFunc,
  643. EventRecorderFunc: eventRecorderFunc,
  644. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"),
  645. }
  646. }
  647. func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions) (bool, error) {
  648. if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
  649. klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName)
  650. return true, nil
  651. }
  652. // Get expander, if possible
  653. expandableVolumePlugin, _ :=
  654. og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec)
  655. if expandableVolumePlugin != nil &&
  656. expandableVolumePlugin.RequiresFSResize() &&
  657. volumeToMount.VolumeSpec.PersistentVolume != nil {
  658. pv := volumeToMount.VolumeSpec.PersistentVolume
  659. pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(pv.Spec.ClaimRef.Name, metav1.GetOptions{})
  660. if err != nil {
  661. // Return error rather than leave the file system un-resized, caller will log and retry
  662. return false, fmt.Errorf("MountVolume.resizeFileSystem get PVC failed : %v", err)
  663. }
  664. pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
  665. pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
  666. if pvcStatusCap.Cmp(pvSpecCap) < 0 {
  667. // File system resize was requested, proceed
  668. klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("MountVolume.resizeFileSystem entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)))
  669. if volumeToMount.VolumeSpec.ReadOnly {
  670. simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.resizeFileSystem failed", "requested read-only file system")
  671. klog.Warningf(detailedMsg)
  672. og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
  673. return true, nil
  674. }
  675. rsOpts.VolumeSpec = volumeToMount.VolumeSpec
  676. rsOpts.NewSize = pvSpecCap
  677. rsOpts.OldSize = pvcStatusCap
  678. resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
  679. if resizeErr != nil {
  680. return false, fmt.Errorf("MountVolume.resizeFileSystem failed : %v", resizeErr)
  681. }
  682. // Volume resizing is not done but it did not error out. This could happen if a CSI volume
  683. // does not have node stage_unstage capability but was asked to resize the volume before
  684. // node publish. In which case - we must retry resizing after node publish.
  685. if !resizeDone {
  686. return false, nil
  687. }
  688. simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.resizeFileSystem succeeded", "")
  689. og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
  690. klog.Infof(detailedMsg)
  691. // File system resize succeeded, now update the PVC's Capacity to match the PV's
  692. err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
  693. if err != nil {
  694. // On retry, resizeFileSystem will be called again but do nothing
  695. return false, fmt.Errorf("MountVolume.resizeFileSystem update PVC status failed : %v", err)
  696. }
  697. return true, nil
  698. }
  699. }
  700. return true, nil
  701. }
  702. func (og *operationGenerator) GenerateUnmountVolumeFunc(
  703. volumeToUnmount MountedVolume,
  704. actualStateOfWorld ActualStateOfWorldMounterUpdater,
  705. podsDir string) (volumetypes.GeneratedOperations, error) {
  706. var pluginName string
  707. if volumeToUnmount.VolumeSpec != nil && useCSIPlugin(og.volumePluginMgr, volumeToUnmount.VolumeSpec) {
  708. pluginName = csi.CSIPluginName
  709. } else {
  710. pluginName = volumeToUnmount.PluginName
  711. }
  712. // Get mountable plugin
  713. volumePlugin, err := og.volumePluginMgr.FindPluginByName(pluginName)
  714. if err != nil || volumePlugin == nil {
  715. return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err)
  716. }
  717. volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter(
  718. volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID)
  719. if newUnmounterErr != nil {
  720. return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr)
  721. }
  722. unmountVolumeFunc := func() (error, error) {
  723. subpather := og.volumePluginMgr.Host.GetSubpather()
  724. // Remove all bind-mounts for subPaths
  725. podDir := filepath.Join(podsDir, string(volumeToUnmount.PodUID))
  726. if err := subpather.CleanSubPaths(podDir, volumeToUnmount.InnerVolumeSpecName); err != nil {
  727. return volumeToUnmount.GenerateError("error cleaning subPath mounts", err)
  728. }
  729. // Execute unmount
  730. unmountErr := volumeUnmounter.TearDown()
  731. if unmountErr != nil {
  732. // On failure, return error. Caller will log and retry.
  733. return volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr)
  734. }
  735. klog.Infof(
  736. "UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
  737. volumeToUnmount.VolumeName,
  738. volumeToUnmount.OuterVolumeSpecName,
  739. volumeToUnmount.PodName,
  740. volumeToUnmount.PodUID,
  741. volumeToUnmount.InnerVolumeSpecName,
  742. volumeToUnmount.PluginName,
  743. volumeToUnmount.VolumeGidValue)
  744. // Update actual state of world
  745. markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
  746. volumeToUnmount.PodName, volumeToUnmount.VolumeName)
  747. if markVolMountedErr != nil {
  748. // On failure, just log and exit
  749. klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeAsUnmounted failed", markVolMountedErr).Error())
  750. }
  751. return nil, nil
  752. }
  753. return volumetypes.GeneratedOperations{
  754. OperationName: "volume_unmount",
  755. OperationFunc: unmountVolumeFunc,
  756. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "volume_unmount"),
  757. EventRecorderFunc: nil, // nil because we do not want to generate event on error
  758. }, nil
  759. }
  760. func (og *operationGenerator) GenerateUnmountDeviceFunc(
  761. deviceToDetach AttachedVolume,
  762. actualStateOfWorld ActualStateOfWorldMounterUpdater,
  763. mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
  764. var pluginName string
  765. if useCSIPlugin(og.volumePluginMgr, deviceToDetach.VolumeSpec) {
  766. pluginName = csi.CSIPluginName
  767. csiSpec, err := translateSpec(deviceToDetach.VolumeSpec)
  768. if err != nil {
  769. return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.TranslateSpec failed", err)
  770. }
  771. deviceToDetach.VolumeSpec = csiSpec
  772. } else {
  773. pluginName = deviceToDetach.PluginName
  774. }
  775. // Get DeviceMounter plugin
  776. deviceMountableVolumePlugin, err :=
  777. og.volumePluginMgr.FindDeviceMountablePluginByName(pluginName)
  778. if err != nil || deviceMountableVolumePlugin == nil {
  779. return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindDeviceMountablePluginByName failed", err)
  780. }
  781. volumeDeviceUmounter, err := deviceMountableVolumePlugin.NewDeviceUnmounter()
  782. if err != nil {
  783. return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceUmounter failed", err)
  784. }
  785. volumeDeviceMounter, err := deviceMountableVolumePlugin.NewDeviceMounter()
  786. if err != nil {
  787. return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceMounter failed", err)
  788. }
  789. unmountDeviceFunc := func() (error, error) {
  790. //deviceMountPath := deviceToDetach.DeviceMountPath
  791. deviceMountPath, err :=
  792. volumeDeviceMounter.GetDeviceMountPath(deviceToDetach.VolumeSpec)
  793. if err != nil {
  794. // On failure, return error. Caller will log and retry.
  795. return deviceToDetach.GenerateError("GetDeviceMountPath failed", err)
  796. }
  797. refs, err := deviceMountableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
  798. if err != nil || mount.HasMountRefs(deviceMountPath, refs) {
  799. if err == nil {
  800. err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs)
  801. }
  802. return deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err)
  803. }
  804. // Execute unmount
  805. unmountDeviceErr := volumeDeviceUmounter.UnmountDevice(deviceMountPath)
  806. if unmountDeviceErr != nil {
  807. // On failure, return error. Caller will log and retry.
  808. return deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr)
  809. }
  810. // Before logging that UnmountDevice succeeded and moving on,
  811. // use mounter.PathIsDevice to check if the path is a device,
  812. // if so use mounter.DeviceOpened to check if the device is in use anywhere
  813. // else on the system. Retry if it returns true.
  814. deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, mounter)
  815. if deviceOpenedErr != nil {
  816. return nil, deviceOpenedErr
  817. }
  818. // The device is still in use elsewhere. Caller will log and retry.
  819. if deviceOpened {
  820. return deviceToDetach.GenerateError(
  821. "UnmountDevice failed",
  822. goerrors.New("the device is in use when it was no longer expected to be in use"))
  823. }
  824. klog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", ""))
  825. // Update actual state of world
  826. markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted(
  827. deviceToDetach.VolumeName)
  828. if markDeviceUnmountedErr != nil {
  829. // On failure, return error. Caller will log and retry.
  830. return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
  831. }
  832. return nil, nil
  833. }
  834. return volumetypes.GeneratedOperations{
  835. OperationName: "unmount_device",
  836. OperationFunc: unmountDeviceFunc,
  837. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(deviceMountableVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmount_device"),
  838. EventRecorderFunc: nil, // nil because we do not want to generate event on error
  839. }, nil
  840. }
  841. // GenerateMapVolumeFunc marks volume as mounted based on following steps.
  842. // If plugin is attachable, call WaitForAttach() and then mark the device
  843. // as mounted. On next step, SetUpDevice is called without dependent of
  844. // plugin type, but this method mainly is targeted for none attachable plugin.
  845. // After setup is done, create symbolic links on both global map path and pod
  846. // device map path. Once symbolic links are created, take fd lock by
  847. // loopback for the device to avoid silent volume replacement. This lock
  848. // will be released once no one uses the device.
  849. // If all steps are completed, the volume is marked as mounted.
  850. func (og *operationGenerator) GenerateMapVolumeFunc(
  851. waitForAttachTimeout time.Duration,
  852. volumeToMount VolumeToMount,
  853. actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  854. originalSpec := volumeToMount.VolumeSpec
  855. // Translate to CSI spec if migration enabled
  856. if useCSIPlugin(og.volumePluginMgr, originalSpec) {
  857. csiSpec, err := translateSpec(originalSpec)
  858. if err != nil {
  859. return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.TranslateSpec failed", err)
  860. }
  861. volumeToMount.VolumeSpec = csiSpec
  862. }
  863. // Get block volume mapper plugin
  864. blockVolumePlugin, err :=
  865. og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec)
  866. if err != nil {
  867. return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed", err)
  868. }
  869. if blockVolumePlugin == nil {
  870. return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
  871. }
  872. affinityErr := checkNodeAffinity(og, volumeToMount)
  873. if affinityErr != nil {
  874. eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NodeAffinity check failed", affinityErr)
  875. og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error())
  876. return volumetypes.GeneratedOperations{}, detailedErr
  877. }
  878. blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper(
  879. volumeToMount.VolumeSpec,
  880. volumeToMount.Pod,
  881. volume.VolumeOptions{})
  882. if newMapperErr != nil {
  883. eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr)
  884. og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error())
  885. return volumetypes.GeneratedOperations{}, detailedErr
  886. }
  887. // Get attacher, if possible
  888. attachableVolumePlugin, _ :=
  889. og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
  890. var volumeAttacher volume.Attacher
  891. if attachableVolumePlugin != nil {
  892. volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
  893. }
  894. mapVolumeFunc := func() (error, error) {
  895. var devicePath string
  896. // Set up global map path under the given plugin directory using symbolic link
  897. globalMapPath, err :=
  898. blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec)
  899. if err != nil {
  900. // On failure, return error. Caller will log and retry.
  901. return volumeToMount.GenerateError("MapVolume.GetDeviceMountPath failed", err)
  902. }
  903. if volumeAttacher != nil {
  904. // Wait for attachable volumes to finish attaching
  905. klog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)))
  906. devicePath, err = volumeAttacher.WaitForAttach(
  907. volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout)
  908. if err != nil {
  909. // On failure, return error. Caller will log and retry.
  910. return volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err)
  911. }
  912. klog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath)))
  913. }
  914. // A plugin doesn't have attacher also needs to map device to global map path with SetUpDevice()
  915. pluginDevicePath, mapErr := blockVolumeMapper.SetUpDevice()
  916. if mapErr != nil {
  917. // On failure, return error. Caller will log and retry.
  918. return volumeToMount.GenerateError("MapVolume.SetUp failed", mapErr)
  919. }
  920. // if pluginDevicePath is provided, assume attacher may not provide device
  921. // or attachment flow uses SetupDevice to get device path
  922. if len(pluginDevicePath) != 0 {
  923. devicePath = pluginDevicePath
  924. }
  925. if len(devicePath) == 0 {
  926. return volumeToMount.GenerateError("MapVolume failed", goerrors.New("Device path of the volume is empty"))
  927. }
  928. // When kubelet is containerized, devicePath may be a symlink at a place unavailable to
  929. // kubelet, so evaluate it on the host and expect that it links to a device in /dev,
  930. // which will be available to containerized kubelet. If still it does not exist,
  931. // AttachFileDevice will fail. If kubelet is not containerized, eval it anyway.
  932. mounter := og.GetVolumePluginMgr().Host.GetMounter(blockVolumePlugin.GetPluginName())
  933. devicePath, err = mounter.EvalHostSymlinks(devicePath)
  934. if err != nil {
  935. return volumeToMount.GenerateError("MapVolume.EvalHostSymlinks failed", err)
  936. }
  937. // Map device to global and pod device map path
  938. volumeMapPath, volName := blockVolumeMapper.GetPodDeviceMapPath()
  939. mapErr = blockVolumeMapper.MapDevice(devicePath, globalMapPath, volumeMapPath, volName, volumeToMount.Pod.UID)
  940. if mapErr != nil {
  941. // On failure, return error. Caller will log and retry.
  942. return volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr)
  943. }
  944. // Take filedescriptor lock to keep a block device opened. Otherwise, there is a case
  945. // that the block device is silently removed and attached another device with same name.
  946. // Container runtime can't handler this problem. To avoid unexpected condition fd lock
  947. // for the block device is required.
  948. _, err = og.blkUtil.AttachFileDevice(devicePath)
  949. if err != nil {
  950. return volumeToMount.GenerateError("MapVolume.AttachFileDevice failed", err)
  951. }
  952. // Update actual state of world to reflect volume is globally mounted
  953. markDeviceMappedErr := actualStateOfWorld.MarkDeviceAsMounted(
  954. volumeToMount.VolumeName, devicePath, globalMapPath)
  955. if markDeviceMappedErr != nil {
  956. // On failure, return error. Caller will log and retry.
  957. return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
  958. }
  959. // Device mapping for global map path succeeded
  960. simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MapVolume.MapDevice succeeded", fmt.Sprintf("globalMapPath %q", globalMapPath))
  961. verbosity := klog.Level(4)
  962. og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.SuccessfulMountVolume, simpleMsg)
  963. klog.V(verbosity).Infof(detailedMsg)
  964. // Device mapping for pod device map path succeeded
  965. simpleMsg, detailedMsg = volumeToMount.GenerateMsg("MapVolume.MapDevice succeeded", fmt.Sprintf("volumeMapPath %q", volumeMapPath))
  966. verbosity = klog.Level(1)
  967. og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.SuccessfulMountVolume, simpleMsg)
  968. klog.V(verbosity).Infof(detailedMsg)
  969. // Update actual state of world
  970. markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(
  971. volumeToMount.PodName,
  972. volumeToMount.Pod.UID,
  973. volumeToMount.VolumeName,
  974. nil,
  975. blockVolumeMapper,
  976. volumeToMount.OuterVolumeSpecName,
  977. volumeToMount.VolumeGidValue,
  978. originalSpec)
  979. if markVolMountedErr != nil {
  980. // On failure, return error. Caller will log and retry.
  981. return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr)
  982. }
  983. return nil, nil
  984. }
  985. eventRecorderFunc := func(err *error) {
  986. if *err != nil {
  987. og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, (*err).Error())
  988. }
  989. }
  990. return volumetypes.GeneratedOperations{
  991. OperationName: "map_volume",
  992. OperationFunc: mapVolumeFunc,
  993. EventRecorderFunc: eventRecorderFunc,
  994. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "map_volume"),
  995. }, nil
  996. }
  997. // GenerateUnmapVolumeFunc marks volume as unmonuted based on following steps.
  998. // Remove symbolic links from pod device map path dir and global map path dir.
  999. // Once those cleanups are done, remove pod device map path dir.
  1000. // If all steps are completed, the volume is marked as unmounted.
  1001. func (og *operationGenerator) GenerateUnmapVolumeFunc(
  1002. volumeToUnmount MountedVolume,
  1003. actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  1004. var blockVolumePlugin volume.BlockVolumePlugin
  1005. var err error
  1006. // Translate to CSI spec if migration enabled
  1007. // And get block volume unmapper plugin
  1008. if volumeToUnmount.VolumeSpec != nil && useCSIPlugin(og.volumePluginMgr, volumeToUnmount.VolumeSpec) {
  1009. csiSpec, err := translateSpec(volumeToUnmount.VolumeSpec)
  1010. if err != nil {
  1011. return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.TranslateSpec failed", err)
  1012. }
  1013. volumeToUnmount.VolumeSpec = csiSpec
  1014. blockVolumePlugin, err =
  1015. og.volumePluginMgr.FindMapperPluginByName(csi.CSIPluginName)
  1016. if err != nil {
  1017. return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed", err)
  1018. }
  1019. } else {
  1020. blockVolumePlugin, err =
  1021. og.volumePluginMgr.FindMapperPluginByName(volumeToUnmount.PluginName)
  1022. if err != nil {
  1023. return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed", err)
  1024. }
  1025. }
  1026. var blockVolumeUnmapper volume.BlockVolumeUnmapper
  1027. if blockVolumePlugin == nil {
  1028. return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
  1029. }
  1030. blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
  1031. volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID)
  1032. if newUnmapperErr != nil {
  1033. return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr)
  1034. }
  1035. unmapVolumeFunc := func() (error, error) {
  1036. // Try to unmap volumeName symlink under pod device map path dir
  1037. // pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
  1038. podDeviceUnmapPath, volName := blockVolumeUnmapper.GetPodDeviceMapPath()
  1039. unmapDeviceErr := og.blkUtil.UnmapDevice(podDeviceUnmapPath, volName)
  1040. if unmapDeviceErr != nil {
  1041. // On failure, return error. Caller will log and retry.
  1042. return volumeToUnmount.GenerateError("UnmapVolume.UnmapDevice on pod device map path failed", unmapDeviceErr)
  1043. }
  1044. // Try to unmap podUID symlink under global map path dir
  1045. // plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID}
  1046. globalUnmapPath := volumeToUnmount.DeviceMountPath
  1047. unmapDeviceErr = og.blkUtil.UnmapDevice(globalUnmapPath, string(volumeToUnmount.PodUID))
  1048. if unmapDeviceErr != nil {
  1049. // On failure, return error. Caller will log and retry.
  1050. return volumeToUnmount.GenerateError("UnmapVolume.UnmapDevice on global map path failed", unmapDeviceErr)
  1051. }
  1052. klog.Infof(
  1053. "UnmapVolume succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
  1054. volumeToUnmount.VolumeName,
  1055. volumeToUnmount.OuterVolumeSpecName,
  1056. volumeToUnmount.PodName,
  1057. volumeToUnmount.PodUID,
  1058. volumeToUnmount.InnerVolumeSpecName,
  1059. volumeToUnmount.PluginName,
  1060. volumeToUnmount.VolumeGidValue)
  1061. // Update actual state of world
  1062. markVolUnmountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
  1063. volumeToUnmount.PodName, volumeToUnmount.VolumeName)
  1064. if markVolUnmountedErr != nil {
  1065. // On failure, just log and exit
  1066. klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmapVolume.MarkVolumeAsUnmounted failed", markVolUnmountedErr).Error())
  1067. }
  1068. return nil, nil
  1069. }
  1070. return volumetypes.GeneratedOperations{
  1071. OperationName: "unmap_volume",
  1072. OperationFunc: unmapVolumeFunc,
  1073. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "unmap_volume"),
  1074. EventRecorderFunc: nil, // nil because we do not want to generate event on error
  1075. }, nil
  1076. }
  1077. // GenerateUnmapDeviceFunc marks device as unmounted based on following steps.
  1078. // Check under globalMapPath dir if there isn't pod's symbolic links in it.
  1079. // If symbolic link isn't there, the device isn't referenced from Pods.
  1080. // Call plugin TearDownDevice to clean-up device connection, stored data under
  1081. // globalMapPath, these operations depend on plugin implementation.
  1082. // Once TearDownDevice is completed, remove globalMapPath dir.
  1083. // After globalMapPath is removed, fd lock by loopback for the device can
  1084. // be released safely because no one can consume the device at this point.
  1085. // At last, device open status will be checked just in case.
  1086. // If all steps are completed, the device is marked as unmounted.
  1087. func (og *operationGenerator) GenerateUnmapDeviceFunc(
  1088. deviceToDetach AttachedVolume,
  1089. actualStateOfWorld ActualStateOfWorldMounterUpdater,
  1090. mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
  1091. var blockVolumePlugin volume.BlockVolumePlugin
  1092. var err error
  1093. // Translate to CSI spec if migration enabled
  1094. if useCSIPlugin(og.volumePluginMgr, deviceToDetach.VolumeSpec) {
  1095. csiSpec, err := translateSpec(deviceToDetach.VolumeSpec)
  1096. if err != nil {
  1097. return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.TranslateSpec failed", err)
  1098. }
  1099. deviceToDetach.VolumeSpec = csiSpec
  1100. blockVolumePlugin, err =
  1101. og.volumePluginMgr.FindMapperPluginByName(csi.CSIPluginName)
  1102. if err != nil {
  1103. return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed", err)
  1104. }
  1105. } else {
  1106. blockVolumePlugin, err =
  1107. og.volumePluginMgr.FindMapperPluginByName(deviceToDetach.PluginName)
  1108. if err != nil {
  1109. return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed", err)
  1110. }
  1111. }
  1112. if blockVolumePlugin == nil {
  1113. return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
  1114. }
  1115. blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
  1116. deviceToDetach.VolumeSpec.Name(),
  1117. "" /* podUID */)
  1118. if newUnmapperErr != nil {
  1119. return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr)
  1120. }
  1121. unmapDeviceFunc := func() (error, error) {
  1122. // Search under globalMapPath dir if all symbolic links from pods have been removed already.
  1123. // If symbolic links are there, pods may still refer the volume.
  1124. globalMapPath := deviceToDetach.DeviceMountPath
  1125. refs, err := og.blkUtil.GetDeviceSymlinkRefs(deviceToDetach.DevicePath, globalMapPath)
  1126. if err != nil {
  1127. return deviceToDetach.GenerateError("UnmapDevice.GetDeviceSymlinkRefs check failed", err)
  1128. }
  1129. if len(refs) > 0 {
  1130. err = fmt.Errorf("The device %q is still referenced from other Pods %v", globalMapPath, refs)
  1131. return deviceToDetach.GenerateError("UnmapDevice failed", err)
  1132. }
  1133. // The block volume is not referenced from Pods. Release file descriptor lock.
  1134. // This should be done before calling TearDownDevice, because some plugins that do local detach
  1135. // in TearDownDevice will fail in detaching device due to the refcnt on the loopback device.
  1136. klog.V(4).Infof("UnmapDevice: deviceToDetach.DevicePath: %v", deviceToDetach.DevicePath)
  1137. loopPath, err := og.blkUtil.GetLoopDevice(deviceToDetach.DevicePath)
  1138. if err != nil {
  1139. if err.Error() == volumepathhandler.ErrDeviceNotFound {
  1140. klog.Warningf(deviceToDetach.GenerateMsgDetailed("UnmapDevice: Couldn't find loopback device which takes file descriptor lock", fmt.Sprintf("device path: %q", deviceToDetach.DevicePath)))
  1141. } else {
  1142. errInfo := "UnmapDevice.GetLoopDevice failed to get loopback device, " + fmt.Sprintf("device path: %q", deviceToDetach.DevicePath)
  1143. return deviceToDetach.GenerateError(errInfo, err)
  1144. }
  1145. } else {
  1146. if len(loopPath) != 0 {
  1147. err = og.blkUtil.RemoveLoopDevice(loopPath)
  1148. if err != nil {
  1149. return deviceToDetach.GenerateError("UnmapDevice.RemoveLoopDevice failed", err)
  1150. }
  1151. }
  1152. }
  1153. // Execute tear down device
  1154. unmapErr := blockVolumeUnmapper.TearDownDevice(globalMapPath, deviceToDetach.DevicePath)
  1155. if unmapErr != nil {
  1156. // On failure, return error. Caller will log and retry.
  1157. return deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr)
  1158. }
  1159. // Plugin finished TearDownDevice(). Now globalMapPath dir and plugin's stored data
  1160. // on the dir are unnecessary, clean up it.
  1161. removeMapPathErr := og.blkUtil.RemoveMapPath(globalMapPath)
  1162. if removeMapPathErr != nil {
  1163. // On failure, return error. Caller will log and retry.
  1164. return deviceToDetach.GenerateError("UnmapDevice.RemoveMapPath failed", removeMapPathErr)
  1165. }
  1166. // Before logging that UnmapDevice succeeded and moving on,
  1167. // use mounter.PathIsDevice to check if the path is a device,
  1168. // if so use mounter.DeviceOpened to check if the device is in use anywhere
  1169. // else on the system. Retry if it returns true.
  1170. deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, mounter)
  1171. if deviceOpenedErr != nil {
  1172. return nil, deviceOpenedErr
  1173. }
  1174. // The device is still in use elsewhere. Caller will log and retry.
  1175. if deviceOpened {
  1176. return deviceToDetach.GenerateError(
  1177. "UnmapDevice failed",
  1178. fmt.Errorf("the device is in use when it was no longer expected to be in use"))
  1179. }
  1180. klog.Infof(deviceToDetach.GenerateMsgDetailed("UnmapDevice succeeded", ""))
  1181. // Update actual state of world
  1182. markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted(
  1183. deviceToDetach.VolumeName)
  1184. if markDeviceUnmountedErr != nil {
  1185. // On failure, return error. Caller will log and retry.
  1186. return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
  1187. }
  1188. return nil, nil
  1189. }
  1190. return volumetypes.GeneratedOperations{
  1191. OperationName: "unmap_device",
  1192. OperationFunc: unmapDeviceFunc,
  1193. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmap_device"),
  1194. EventRecorderFunc: nil, // nil because we do not want to generate event on error
  1195. }, nil
  1196. }
  1197. func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
  1198. volumeToMount VolumeToMount,
  1199. nodeName types.NodeName,
  1200. actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  1201. volumePlugin, err :=
  1202. og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
  1203. if err != nil || volumePlugin == nil {
  1204. return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
  1205. }
  1206. verifyControllerAttachedVolumeFunc := func() (error, error) {
  1207. if !volumeToMount.PluginIsAttachable {
  1208. // If the volume does not implement the attacher interface, it is
  1209. // assumed to be attached and the actual state of the world is
  1210. // updated accordingly.
  1211. addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
  1212. volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
  1213. if addVolumeNodeErr != nil {
  1214. // On failure, return error. Caller will log and retry.
  1215. return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
  1216. }
  1217. return nil, nil
  1218. }
  1219. if !volumeToMount.ReportedInUse {
  1220. // If the given volume has not yet been added to the list of
  1221. // VolumesInUse in the node's volume status, do not proceed, return
  1222. // error. Caller will log and retry. The node status is updated
  1223. // periodically by kubelet, so it may take as much as 10 seconds
  1224. // before this clears.
  1225. // Issue #28141 to enable on demand status updates.
  1226. return volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil)
  1227. }
  1228. // Fetch current node object
  1229. node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(string(nodeName), metav1.GetOptions{})
  1230. if fetchErr != nil {
  1231. // On failure, return error. Caller will log and retry.
  1232. return volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr)
  1233. }
  1234. if node == nil {
  1235. // On failure, return error. Caller will log and retry.
  1236. return volumeToMount.GenerateError(
  1237. "VerifyControllerAttachedVolume failed",
  1238. fmt.Errorf("Node object retrieved from API server is nil"))
  1239. }
  1240. for _, attachedVolume := range node.Status.VolumesAttached {
  1241. if attachedVolume.Name == volumeToMount.VolumeName {
  1242. addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
  1243. v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
  1244. klog.Infof(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath)))
  1245. if addVolumeNodeErr != nil {
  1246. // On failure, return error. Caller will log and retry.
  1247. return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
  1248. }
  1249. return nil, nil
  1250. }
  1251. }
  1252. // Volume not attached, return error. Caller will log and retry.
  1253. return volumeToMount.GenerateError("Volume not attached according to node status", nil)
  1254. }
  1255. return volumetypes.GeneratedOperations{
  1256. OperationName: "verify_controller_attached_volume",
  1257. OperationFunc: verifyControllerAttachedVolumeFunc,
  1258. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"),
  1259. EventRecorderFunc: nil, // nil because we do not want to generate event on error
  1260. }, nil
  1261. }
  1262. func (og *operationGenerator) verifyVolumeIsSafeToDetach(
  1263. volumeToDetach AttachedVolume) error {
  1264. // Fetch current node object
  1265. node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(string(volumeToDetach.NodeName), metav1.GetOptions{})
  1266. if fetchErr != nil {
  1267. if errors.IsNotFound(fetchErr) {
  1268. klog.Warningf(volumeToDetach.GenerateMsgDetailed("Node not found on API server. DetachVolume will skip safe to detach check", ""))
  1269. return nil
  1270. }
  1271. // On failure, return error. Caller will log and retry.
  1272. return volumeToDetach.GenerateErrorDetailed("DetachVolume failed fetching node from API server", fetchErr)
  1273. }
  1274. if node == nil {
  1275. // On failure, return error. Caller will log and retry.
  1276. return volumeToDetach.GenerateErrorDetailed(
  1277. "DetachVolume failed fetching node from API server",
  1278. fmt.Errorf("node object retrieved from API server is nil"))
  1279. }
  1280. for _, inUseVolume := range node.Status.VolumesInUse {
  1281. if inUseVolume == volumeToDetach.VolumeName {
  1282. return volumeToDetach.GenerateErrorDetailed(
  1283. "DetachVolume failed",
  1284. fmt.Errorf("volume is still in use by node, according to Node status"))
  1285. }
  1286. }
  1287. // Volume is not marked as in use by node
  1288. klog.Infof(volumeToDetach.GenerateMsgDetailed("Verified volume is safe to detach", ""))
  1289. return nil
  1290. }
  1291. func (og *operationGenerator) GenerateExpandVolumeFunc(
  1292. pvc *v1.PersistentVolumeClaim,
  1293. pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
  1294. volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
  1295. volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
  1296. if err != nil {
  1297. return volumetypes.GeneratedOperations{}, fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1298. }
  1299. if volumePlugin == nil {
  1300. return volumetypes.GeneratedOperations{}, fmt.Errorf("Can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1301. }
  1302. expandVolumeFunc := func() (error, error) {
  1303. newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
  1304. statusSize := pvc.Status.Capacity[v1.ResourceStorage]
  1305. pvSize := pv.Spec.Capacity[v1.ResourceStorage]
  1306. if pvSize.Cmp(newSize) < 0 {
  1307. updatedSize, expandErr := volumePlugin.ExpandVolumeDevice(
  1308. volumeSpec,
  1309. newSize,
  1310. statusSize)
  1311. if expandErr != nil {
  1312. detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), volumePlugin.GetPluginName(), expandErr)
  1313. return detailedErr, detailedErr
  1314. }
  1315. klog.Infof("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1316. newSize = updatedSize
  1317. // k8s doesn't have transactions, we can't guarantee that after updating PV - updating PVC will be
  1318. // successful, that is why all PVCs for which pvc.Spec.Size > pvc.Status.Size must be reprocessed
  1319. // until they reflect user requested size in pvc.Status.Size
  1320. updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient)
  1321. if updateErr != nil {
  1322. detailedErr := fmt.Errorf("Error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr)
  1323. return detailedErr, detailedErr
  1324. }
  1325. klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1326. }
  1327. fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec)
  1328. // No Cloudprovider resize needed, lets mark resizing as done
  1329. // Rest of the volume expand controller code will assume PVC as *not* resized until pvc.Status.Size
  1330. // reflects user requested size.
  1331. if !volumePlugin.RequiresFSResize() || !fsVolume {
  1332. klog.V(4).Infof("Controller resizing done for PVC %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1333. err := util.MarkResizeFinished(pvc, newSize, og.kubeClient)
  1334. if err != nil {
  1335. detailedErr := fmt.Errorf("Error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1336. return detailedErr, detailedErr
  1337. }
  1338. successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1339. og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
  1340. } else {
  1341. err := util.MarkForFSResize(pvc, og.kubeClient)
  1342. if err != nil {
  1343. detailedErr := fmt.Errorf("Error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1344. klog.Warning(detailedErr)
  1345. return nil, nil
  1346. }
  1347. }
  1348. return nil, nil
  1349. }
  1350. eventRecorderFunc := func(err *error) {
  1351. if *err != nil {
  1352. og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
  1353. }
  1354. }
  1355. return volumetypes.GeneratedOperations{
  1356. OperationName: "expand_volume",
  1357. OperationFunc: expandVolumeFunc,
  1358. EventRecorderFunc: eventRecorderFunc,
  1359. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeSpec), "expand_volume"),
  1360. }, nil
  1361. }
  1362. func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
  1363. volumeToMount VolumeToMount,
  1364. actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  1365. fsResizeFunc := func() (error, error) {
  1366. // Need to translate the spec here if the plugin is migrated so that the metrics
  1367. // emitted show the correct (migrated) plugin
  1368. if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) {
  1369. csiSpec, err := translateSpec(volumeToMount.VolumeSpec)
  1370. if err != nil {
  1371. return volumeToMount.GenerateError("VolumeFSResize.translateSpec failed", err)
  1372. }
  1373. volumeToMount.VolumeSpec = csiSpec
  1374. }
  1375. volumePlugin, err :=
  1376. og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
  1377. if err != nil || volumePlugin == nil {
  1378. return volumeToMount.GenerateError("VolumeFSResize.FindPluginBySpec failed", err)
  1379. }
  1380. var resizeDone bool
  1381. var simpleErr, detailedErr error
  1382. resizeOptions := volume.NodeResizeOptions{
  1383. VolumeSpec: volumeToMount.VolumeSpec,
  1384. }
  1385. attachableVolumePlugin, _ :=
  1386. og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
  1387. if attachableVolumePlugin != nil {
  1388. volumeAttacher, _ := attachableVolumePlugin.NewAttacher()
  1389. if volumeAttacher != nil {
  1390. resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged
  1391. resizeOptions.DevicePath = volumeToMount.DevicePath
  1392. dmp, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec)
  1393. if err != nil {
  1394. return volumeToMount.GenerateError("VolumeFSResize.GetDeviceMountPath failed", err)
  1395. }
  1396. resizeOptions.DeviceMountPath = dmp
  1397. resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
  1398. if simpleErr != nil || detailedErr != nil {
  1399. return simpleErr, detailedErr
  1400. }
  1401. if resizeDone {
  1402. return nil, nil
  1403. }
  1404. }
  1405. }
  1406. // if we are here that means volume plugin does not support attach interface
  1407. volumeMounter, newMounterErr := volumePlugin.NewMounter(
  1408. volumeToMount.VolumeSpec,
  1409. volumeToMount.Pod,
  1410. volume.VolumeOptions{})
  1411. if newMounterErr != nil {
  1412. return volumeToMount.GenerateError("VolumeFSResize.NewMounter initialization failed", newMounterErr)
  1413. }
  1414. resizeOptions.DeviceMountPath = volumeMounter.GetPath()
  1415. resizeOptions.CSIVolumePhase = volume.CSIVolumePublished
  1416. resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
  1417. if simpleErr != nil || detailedErr != nil {
  1418. return simpleErr, detailedErr
  1419. }
  1420. if resizeDone {
  1421. return nil, nil
  1422. }
  1423. // This is a placeholder error - we should NEVER reach here.
  1424. err = fmt.Errorf("volume resizing failed for unknown reason")
  1425. return volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed to resize volume", err)
  1426. }
  1427. eventRecorderFunc := func(err *error) {
  1428. if *err != nil {
  1429. og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
  1430. }
  1431. }
  1432. // Need to translate the spec here if the plugin is migrated so that the metrics
  1433. // emitted show the correct (migrated) plugin
  1434. if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) {
  1435. csiSpec, err := translateSpec(volumeToMount.VolumeSpec)
  1436. if err == nil {
  1437. volumeToMount.VolumeSpec = csiSpec
  1438. }
  1439. // If we have an error here we ignore it, the metric emitted will then be for the
  1440. // in-tree plugin. This error case(skipped one) will also trigger an error
  1441. // while the generated function is executed. And those errors will be handled during the execution of the generated
  1442. // function with a back off policy.
  1443. }
  1444. volumePlugin, err :=
  1445. og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
  1446. if err != nil || volumePlugin == nil {
  1447. return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err)
  1448. }
  1449. return volumetypes.GeneratedOperations{
  1450. OperationName: "volume_fs_resize",
  1451. OperationFunc: fsResizeFunc,
  1452. EventRecorderFunc: eventRecorderFunc,
  1453. CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "volume_fs_resize"),
  1454. }, nil
  1455. }
  1456. func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
  1457. actualStateOfWorld ActualStateOfWorldMounterUpdater,
  1458. resizeOptions volume.NodeResizeOptions) (bool, error, error) {
  1459. resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions)
  1460. if err != nil {
  1461. klog.Errorf("VolumeFSResize.resizeFileSystem failed : %v", err)
  1462. e1, e2 := volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed", err)
  1463. return false, e1, e2
  1464. }
  1465. if resizeDone {
  1466. markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName)
  1467. if markFSResizedErr != nil {
  1468. // On failure, return error. Caller will log and retry.
  1469. e1, e2 := volumeToMount.GenerateError("VolumeFSResize.MarkVolumeAsResized failed", markFSResizedErr)
  1470. return false, e1, e2
  1471. }
  1472. return true, nil, nil
  1473. }
  1474. return false, nil, nil
  1475. }
  1476. func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error {
  1477. mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec)
  1478. if len(mountOptions) > 0 && !plugin.SupportsMountOption() {
  1479. return fmt.Errorf("Mount options are not supported for this volume type")
  1480. }
  1481. return nil
  1482. }
  1483. // checkNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels
  1484. // This ensures that we don't mount a volume that doesn't belong to this node
  1485. func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount) error {
  1486. if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) {
  1487. return nil
  1488. }
  1489. pv := volumeToMount.VolumeSpec.PersistentVolume
  1490. if pv != nil {
  1491. nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels()
  1492. if err != nil {
  1493. return err
  1494. }
  1495. err = util.CheckNodeAffinity(pv, nodeLabels)
  1496. if err != nil {
  1497. return err
  1498. }
  1499. }
  1500. return nil
  1501. }
  1502. // isDeviceOpened checks the device status if the device is in use anywhere else on the system
  1503. func isDeviceOpened(deviceToDetach AttachedVolume, mounter mount.Interface) (bool, error) {
  1504. isDevicePath, devicePathErr := mounter.PathIsDevice(deviceToDetach.DevicePath)
  1505. var deviceOpened bool
  1506. var deviceOpenedErr error
  1507. if !isDevicePath && devicePathErr == nil ||
  1508. (devicePathErr != nil && strings.Contains(devicePathErr.Error(), "does not exist")) {
  1509. // not a device path or path doesn't exist
  1510. //TODO: refer to #36092
  1511. klog.V(3).Infof("The path isn't device path or doesn't exist. Skip checking device path: %s", deviceToDetach.DevicePath)
  1512. deviceOpened = false
  1513. } else if devicePathErr != nil {
  1514. return false, deviceToDetach.GenerateErrorDetailed("PathIsDevice failed", devicePathErr)
  1515. } else {
  1516. deviceOpened, deviceOpenedErr = mounter.DeviceOpened(deviceToDetach.DevicePath)
  1517. if deviceOpenedErr != nil {
  1518. return false, deviceToDetach.GenerateErrorDetailed("DeviceOpened failed", deviceOpenedErr)
  1519. }
  1520. }
  1521. return deviceOpened, nil
  1522. }
  1523. func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool {
  1524. // TODO(#75146) Check whether the driver is installed as well so that
  1525. // we can throw a better error when the driver is not installed.
  1526. // The error should be of the approximate form:
  1527. // fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName)
  1528. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
  1529. return false
  1530. }
  1531. if csilib.IsPVMigratable(spec.PersistentVolume) || csilib.IsInlineMigratable(spec.Volume) {
  1532. migratable, err := vpm.IsPluginMigratableBySpec(spec)
  1533. if err == nil && migratable {
  1534. return true
  1535. }
  1536. }
  1537. return false
  1538. }
  1539. func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) {
  1540. migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec)
  1541. if err != nil {
  1542. return false, err
  1543. }
  1544. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) ||
  1545. !utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) ||
  1546. !migratable {
  1547. return false, nil
  1548. }
  1549. if len(nodeName) == 0 {
  1550. return false, goerrors.New("nodeName is empty")
  1551. }
  1552. kubeClient := og.volumePluginMgr.Host.GetKubeClient()
  1553. if kubeClient == nil {
  1554. // Don't handle the controller/kubelet version skew check and fallback
  1555. // to just checking the feature gates. This can happen if
  1556. // we are in a standalone (headless) Kubelet
  1557. return true, nil
  1558. }
  1559. adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost)
  1560. if !ok {
  1561. // Don't handle the controller/kubelet version skew check and fallback
  1562. // to just checking the feature gates. This can happen if
  1563. // "enableControllerAttachDetach" is set to true on kubelet
  1564. return true, nil
  1565. }
  1566. if adcHost.CSINodeLister() == nil {
  1567. return false, goerrors.New("could not find CSINodeLister in attachDetachController")
  1568. }
  1569. csiNode, err := adcHost.CSINodeLister().Get(string(nodeName))
  1570. if err != nil {
  1571. return false, err
  1572. }
  1573. ann := csiNode.GetAnnotations()
  1574. if ann == nil {
  1575. return false, nil
  1576. }
  1577. var mpaSet sets.String
  1578. mpa := ann[v1.MigratedPluginsAnnotationKey]
  1579. tok := strings.Split(mpa, ",")
  1580. if len(mpa) == 0 {
  1581. mpaSet = sets.NewString()
  1582. } else {
  1583. mpaSet = sets.NewString(tok...)
  1584. }
  1585. pluginName, err := csilib.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume)
  1586. if err != nil {
  1587. return false, err
  1588. }
  1589. if len(pluginName) == 0 {
  1590. // Could not find a plugin name from translation directory, assume not translated
  1591. return false, nil
  1592. }
  1593. isMigratedOnNode := mpaSet.Has(pluginName)
  1594. if isMigratedOnNode {
  1595. installed := false
  1596. driverName, err := csilib.GetCSINameFromInTreeName(pluginName)
  1597. if err != nil {
  1598. return isMigratedOnNode, err
  1599. }
  1600. for _, driver := range csiNode.Spec.Drivers {
  1601. if driver.Name == driverName {
  1602. installed = true
  1603. break
  1604. }
  1605. }
  1606. if !installed {
  1607. return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName)
  1608. }
  1609. }
  1610. return isMigratedOnNode, nil
  1611. }
  1612. func translateSpec(spec *volume.Spec) (*volume.Spec, error) {
  1613. var csiPV *v1.PersistentVolume
  1614. var err error
  1615. inlineVolume := false
  1616. if spec.PersistentVolume != nil {
  1617. // TranslateInTreePVToCSI will create a new PV
  1618. csiPV, err = csilib.TranslateInTreePVToCSI(spec.PersistentVolume)
  1619. if err != nil {
  1620. return nil, fmt.Errorf("failed to translate in tree pv to CSI: %v", err)
  1621. }
  1622. } else if spec.Volume != nil {
  1623. // TranslateInTreeInlineVolumeToCSI will create a new PV
  1624. csiPV, err = csilib.TranslateInTreeInlineVolumeToCSI(spec.Volume)
  1625. if err != nil {
  1626. return nil, fmt.Errorf("failed to translate in tree inline volume to CSI: %v", err)
  1627. }
  1628. inlineVolume = true
  1629. } else {
  1630. return &volume.Spec{}, goerrors.New("not a valid volume spec")
  1631. }
  1632. return &volume.Spec{
  1633. PersistentVolume: csiPV,
  1634. ReadOnly: spec.ReadOnly,
  1635. InlineVolumeSpecForCSIMigration: inlineVolume,
  1636. }, nil
  1637. }