csi_plugin.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package csi
  14. import (
  15. "errors"
  16. "fmt"
  17. "os"
  18. "path/filepath"
  19. "strings"
  20. "time"
  21. "context"
  22. "k8s.io/klog"
  23. api "k8s.io/api/core/v1"
  24. storage "k8s.io/api/storage/v1beta1"
  25. apierrors "k8s.io/apimachinery/pkg/api/errors"
  26. meta "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/types"
  28. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  29. utilversion "k8s.io/apimachinery/pkg/util/version"
  30. "k8s.io/apimachinery/pkg/util/wait"
  31. utilfeature "k8s.io/apiserver/pkg/util/feature"
  32. clientset "k8s.io/client-go/kubernetes"
  33. storagelisters "k8s.io/client-go/listers/storage/v1beta1"
  34. csitranslationplugins "k8s.io/csi-translation-lib/plugins"
  35. "k8s.io/kubernetes/pkg/features"
  36. "k8s.io/kubernetes/pkg/volume"
  37. "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
  38. )
  // Plugin-wide constants.
  const (
  	// CSIPluginName is the name of the in-tree CSI Plugin
  	CSIPluginName = "kubernetes.io/csi"

  	// CsiResyncPeriod is default resync period duration
  	// TODO: increase to something useful
  	CsiResyncPeriod = time.Minute

  	// csiTimeout is the context deadline applied to CSI driver calls
  	// (for example NodeGetInfo during plugin registration).
  	csiTimeout = 2 * time.Minute

  	// volNameSep separates the driver name from the volume handle in the
  	// string produced by GetVolumeName.
  	volNameSep = "^"

  	// volDataFileName is the file name under which per-volume data is saved
  	// and later loaded again for teardown and reconstruction.
  	volDataFileName = "vol_data.json"

  	// fsTypeBlockName is presumably the fsType label used for raw block
  	// volumes; it is not referenced in this part of the file.
  	fsTypeBlockName = "block"
  )
  50. type csiPlugin struct {
  51. host volume.VolumeHost
  52. blockEnabled bool
  53. csiDriverLister storagelisters.CSIDriverLister
  54. }
  55. // ProbeVolumePlugins returns implemented plugins
  56. func ProbeVolumePlugins() []volume.VolumePlugin {
  57. p := &csiPlugin{
  58. host: nil,
  59. blockEnabled: utilfeature.DefaultFeatureGate.Enabled(features.CSIBlockVolume),
  60. }
  61. return []volume.VolumePlugin{p}
  62. }
  63. // volume.VolumePlugin methods
  64. var _ volume.VolumePlugin = &csiPlugin{}
  // RegistrationHandler is the handler which is fed to the pluginwatcher API.
  // It carries no state of its own; registered drivers live in package-level
  // variables.
  type RegistrationHandler struct{}
  68. // TODO (verult) consider using a struct instead of global variables
  69. // csiDrivers map keep track of all registered CSI drivers on the node and their
  70. // corresponding sockets
  71. var csiDrivers = &DriversStore{}
  72. var nim nodeinfomanager.Interface
  73. // PluginHandler is the plugin registration handler interface passed to the
  74. // pluginwatcher module in kubelet
  75. var PluginHandler = &RegistrationHandler{}
  76. // ValidatePlugin is called by kubelet's plugin watcher upon detection
  77. // of a new registration socket opened by CSI Driver registrar side car.
  78. func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
  79. klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s",
  80. pluginName, endpoint, strings.Join(versions, ",")))
  81. _, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions)
  82. if err != nil {
  83. return fmt.Errorf("validation failed for CSI Driver %s at endpoint %s: %v", pluginName, endpoint, err)
  84. }
  85. return err
  86. }
  87. // RegisterPlugin is called when a plugin can be registered
  88. func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
  89. klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
  90. highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
  91. if err != nil {
  92. return err
  93. }
  94. // Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
  95. // all other CSI components will be able to get the actual socket of CSI drivers by its name.
  96. csiDrivers.Set(pluginName, Driver{
  97. endpoint: endpoint,
  98. highestSupportedVersion: highestSupportedVersion,
  99. })
  100. // Get node info from the driver.
  101. csi, err := newCsiDriverClient(csiDriverName(pluginName))
  102. if err != nil {
  103. return err
  104. }
  105. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  106. defer cancel()
  107. driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
  108. if err != nil {
  109. if unregErr := unregisterDriver(pluginName); unregErr != nil {
  110. klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
  111. }
  112. return err
  113. }
  114. err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
  115. if err != nil {
  116. if unregErr := unregisterDriver(pluginName); unregErr != nil {
  117. klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
  118. }
  119. return err
  120. }
  121. return nil
  122. }
  123. func (h *RegistrationHandler) validateVersions(callerName, pluginName string, endpoint string, versions []string) (*utilversion.Version, error) {
  124. if len(versions) == 0 {
  125. return nil, errors.New(log("%s for CSI driver %q failed. Plugin returned an empty list for supported versions", callerName, pluginName))
  126. }
  127. // Validate version
  128. newDriverHighestVersion, err := highestSupportedVersion(versions)
  129. if err != nil {
  130. return nil, errors.New(log("%s for CSI driver %q failed. None of the versions specified %q are supported. err=%v", callerName, pluginName, versions, err))
  131. }
  132. existingDriver, driverExists := csiDrivers.Get(pluginName)
  133. if driverExists {
  134. if !existingDriver.highestSupportedVersion.LessThan(newDriverHighestVersion) {
  135. return nil, errors.New(log("%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q", callerName, pluginName, existingDriver.highestSupportedVersion))
  136. }
  137. }
  138. return newDriverHighestVersion, nil
  139. }
  140. // DeRegisterPlugin is called when a plugin removed its socket, signaling
  141. // it is no longer available
  142. func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
  143. klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName))
  144. if err := unregisterDriver(pluginName); err != nil {
  145. klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err))
  146. }
  147. }
  148. func (p *csiPlugin) Init(host volume.VolumeHost) error {
  149. p.host = host
  150. if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  151. csiClient := host.GetKubeClient()
  152. if csiClient == nil {
  153. klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
  154. } else {
  155. // set CSIDriverLister
  156. adcHost, ok := host.(volume.AttachDetachVolumeHost)
  157. if ok {
  158. p.csiDriverLister = adcHost.CSIDriverLister()
  159. if p.csiDriverLister == nil {
  160. klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
  161. }
  162. }
  163. kletHost, ok := host.(volume.KubeletVolumeHost)
  164. if ok {
  165. p.csiDriverLister = kletHost.CSIDriverLister()
  166. if p.csiDriverLister == nil {
  167. klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
  168. }
  169. }
  170. }
  171. }
  172. var migratedPlugins = map[string](func() bool){
  173. csitranslationplugins.GCEPDInTreePluginName: func() bool {
  174. return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
  175. },
  176. csitranslationplugins.AWSEBSInTreePluginName: func() bool {
  177. return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS)
  178. },
  179. csitranslationplugins.CinderInTreePluginName: func() bool {
  180. return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack)
  181. },
  182. csitranslationplugins.AzureDiskInTreePluginName: func() bool {
  183. return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk)
  184. },
  185. csitranslationplugins.AzureFileInTreePluginName: func() bool {
  186. return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureFile)
  187. },
  188. }
  189. // Initializing the label management channels
  190. nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins)
  191. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) &&
  192. utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
  193. // This function prevents Kubelet from posting Ready status until CSINodeInfo
  194. // is both installed and initialized
  195. if err := initializeCSINode(host); err != nil {
  196. return errors.New(log("failed to initialize CSINodeInfo: %v", err))
  197. }
  198. }
  199. return nil
  200. }
  201. func initializeCSINode(host volume.VolumeHost) error {
  202. kvh, ok := host.(volume.KubeletVolumeHost)
  203. if !ok {
  204. klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet")
  205. return nil
  206. }
  207. kubeClient := host.GetKubeClient()
  208. if kubeClient == nil {
  209. // Kubelet running in standalone mode. Skip CSINodeInfo initialization
  210. klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode")
  211. return nil
  212. }
  213. kvh.SetKubeletError(errors.New("CSINodeInfo is not yet initialized"))
  214. go func() {
  215. defer utilruntime.HandleCrash()
  216. // Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
  217. // after max retry steps.
  218. initBackoff := wait.Backoff{
  219. Steps: 6,
  220. Duration: 15 * time.Millisecond,
  221. Factor: 6.0,
  222. Jitter: 0.1,
  223. }
  224. err := wait.ExponentialBackoff(initBackoff, func() (bool, error) {
  225. klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo")
  226. err := nim.InitializeCSINodeWithAnnotation()
  227. if err != nil {
  228. kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err))
  229. klog.Errorf("Failed to initialize CSINodeInfo: %v", err)
  230. return false, nil
  231. }
  232. // Successfully initialized drivers, allow Kubelet to post Ready
  233. kvh.SetKubeletError(nil)
  234. return true, nil
  235. })
  236. if err != nil {
  237. // 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin)
  238. // are permanently enabled the apiserver/controllers can assume that the kubelet is
  239. // using CSI for all Migrated volume plugins. Then all the CSINode initialization
  240. // code can be dropped from Kubelet.
  241. // Kill the Kubelet process and allow it to restart to retry initialization
  242. klog.Fatalf("Failed to initialize CSINodeInfo after retrying")
  243. }
  244. }()
  245. return nil
  246. }
  247. func (p *csiPlugin) GetPluginName() string {
  248. return CSIPluginName
  249. }
  250. // GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle
  251. // That string value is used in Detach() to extract driver name and volumeName.
  252. func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
  253. csi, err := getPVSourceFromSpec(spec)
  254. if err != nil {
  255. return "", errors.New(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err))
  256. }
  257. // return driverName<separator>volumeHandle
  258. return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil
  259. }
  260. func (p *csiPlugin) CanSupport(spec *volume.Spec) bool {
  261. // TODO (vladimirvivien) CanSupport should also take into account
  262. // the availability/registration of specified Driver in the volume source
  263. if spec == nil {
  264. return false
  265. }
  266. if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
  267. return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil) ||
  268. (spec.Volume != nil && spec.Volume.CSI != nil)
  269. }
  270. return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil
  271. }
  272. func (p *csiPlugin) RequiresRemount() bool {
  273. return false
  274. }
  275. func (p *csiPlugin) NewMounter(
  276. spec *volume.Spec,
  277. pod *api.Pod,
  278. _ volume.VolumeOptions) (volume.Mounter, error) {
  279. volSrc, pvSrc, err := getSourceFromSpec(spec)
  280. if err != nil {
  281. return nil, err
  282. }
  283. var (
  284. driverName string
  285. volumeHandle string
  286. readOnly bool
  287. )
  288. switch {
  289. case volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume):
  290. volumeHandle = makeVolumeHandle(string(pod.UID), spec.Name())
  291. driverName = volSrc.Driver
  292. if volSrc.ReadOnly != nil {
  293. readOnly = *volSrc.ReadOnly
  294. }
  295. case pvSrc != nil:
  296. driverName = pvSrc.Driver
  297. volumeHandle = pvSrc.VolumeHandle
  298. readOnly = spec.ReadOnly
  299. default:
  300. return nil, errors.New(log("volume source not found in volume.Spec"))
  301. }
  302. volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
  303. if err != nil {
  304. return nil, err
  305. }
  306. // Check CSIDriver.Spec.Mode to ensure that the CSI driver
  307. // supports the current volumeLifecycleMode.
  308. if err := p.supportsVolumeLifecycleMode(driverName, volumeLifecycleMode); err != nil {
  309. return nil, err
  310. }
  311. k8s := p.host.GetKubeClient()
  312. if k8s == nil {
  313. return nil, errors.New(log("failed to get a kubernetes client"))
  314. }
  315. kvh, ok := p.host.(volume.KubeletVolumeHost)
  316. if !ok {
  317. return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed"))
  318. }
  319. mounter := &csiMountMgr{
  320. plugin: p,
  321. k8s: k8s,
  322. spec: spec,
  323. pod: pod,
  324. podUID: pod.UID,
  325. driverName: csiDriverName(driverName),
  326. volumeLifecycleMode: volumeLifecycleMode,
  327. volumeID: volumeHandle,
  328. specVolumeID: spec.Name(),
  329. readOnly: readOnly,
  330. kubeVolHost: kvh,
  331. }
  332. mounter.csiClientGetter.driverName = csiDriverName(driverName)
  333. // Save volume info in pod dir
  334. dir := mounter.GetPath()
  335. dataDir := filepath.Dir(dir) // dropoff /mount at end
  336. if err := os.MkdirAll(dataDir, 0750); err != nil {
  337. return nil, errors.New(log("failed to create dir %#v: %v", dataDir, err))
  338. }
  339. klog.V(4).Info(log("created path successfully [%s]", dataDir))
  340. mounter.MetricsProvider = NewMetricsCsi(volumeHandle, dir, csiDriverName(driverName))
  341. // persist volume info data for teardown
  342. node := string(p.host.GetNodeName())
  343. volData := map[string]string{
  344. volDataKey.specVolID: spec.Name(),
  345. volDataKey.volHandle: volumeHandle,
  346. volDataKey.driverName: driverName,
  347. volDataKey.nodeName: node,
  348. volDataKey.volumeLifecycleMode: string(volumeLifecycleMode),
  349. }
  350. attachID := getAttachmentName(volumeHandle, driverName, node)
  351. volData[volDataKey.attachmentID] = attachID
  352. if err := saveVolumeData(dataDir, volDataFileName, volData); err != nil {
  353. if removeErr := os.RemoveAll(dataDir); removeErr != nil {
  354. klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, removeErr))
  355. }
  356. return nil, errors.New(log("failed to save volume info data: %v", err))
  357. }
  358. klog.V(4).Info(log("mounter created successfully"))
  359. return mounter, nil
  360. }
  361. func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) {
  362. klog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID))
  363. kvh, ok := p.host.(volume.KubeletVolumeHost)
  364. if !ok {
  365. return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed"))
  366. }
  367. unmounter := &csiMountMgr{
  368. plugin: p,
  369. podUID: podUID,
  370. specVolumeID: specName,
  371. kubeVolHost: kvh,
  372. }
  373. // load volume info from file
  374. dir := unmounter.GetPath()
  375. dataDir := filepath.Dir(dir) // dropoff /mount at end
  376. data, err := loadVolumeData(dataDir, volDataFileName)
  377. if err != nil {
  378. return nil, errors.New(log("unmounter failed to load volume data file [%s]: %v", dir, err))
  379. }
  380. unmounter.driverName = csiDriverName(data[volDataKey.driverName])
  381. unmounter.volumeID = data[volDataKey.volHandle]
  382. unmounter.csiClientGetter.driverName = unmounter.driverName
  383. return unmounter, nil
  384. }
  385. func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
  386. klog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath))
  387. volData, err := loadVolumeData(mountPath, volDataFileName)
  388. if err != nil {
  389. return nil, errors.New(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err))
  390. }
  391. klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData))
  392. var spec *volume.Spec
  393. inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume)
  394. // If inlineEnabled is true and mode is VolumeLifecycleEphemeral,
  395. // use constructVolSourceSpec to construct volume source spec.
  396. // If inlineEnabled is false or mode is VolumeLifecyclePersistent,
  397. // use constructPVSourceSpec to construct volume construct pv source spec.
  398. if inlineEnabled && storage.VolumeLifecycleMode(volData[volDataKey.volumeLifecycleMode]) == storage.VolumeLifecycleEphemeral {
  399. spec = p.constructVolSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName])
  400. return spec, nil
  401. }
  402. spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
  403. return spec, nil
  404. }
  405. // constructVolSourceSpec constructs volume.Spec with CSIVolumeSource
  406. func (p *csiPlugin) constructVolSourceSpec(volSpecName, driverName string) *volume.Spec {
  407. vol := &api.Volume{
  408. Name: volSpecName,
  409. VolumeSource: api.VolumeSource{
  410. CSI: &api.CSIVolumeSource{
  411. Driver: driverName,
  412. },
  413. },
  414. }
  415. return volume.NewSpecFromVolume(vol)
  416. }
  417. //constructPVSourceSpec constructs volume.Spec with CSIPersistentVolumeSource
  418. func (p *csiPlugin) constructPVSourceSpec(volSpecName, driverName, volumeHandle string) *volume.Spec {
  419. fsMode := api.PersistentVolumeFilesystem
  420. pv := &api.PersistentVolume{
  421. ObjectMeta: meta.ObjectMeta{
  422. Name: volSpecName,
  423. },
  424. Spec: api.PersistentVolumeSpec{
  425. PersistentVolumeSource: api.PersistentVolumeSource{
  426. CSI: &api.CSIPersistentVolumeSource{
  427. Driver: driverName,
  428. VolumeHandle: volumeHandle,
  429. },
  430. },
  431. VolumeMode: &fsMode,
  432. },
  433. }
  434. return volume.NewSpecFromPersistentVolume(pv, false)
  435. }
  436. func (p *csiPlugin) SupportsMountOption() bool {
  437. // TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags
  438. // to probe for the result for this method
  439. // (bswartz) Until the CSI spec supports probing, our only option is to
  440. // make plugins register their support for mount options or lack thereof
  441. // directly with kubernetes.
  442. return true
  443. }
  444. func (p *csiPlugin) SupportsBulkVolumeVerification() bool {
  445. return false
  446. }
  447. // volume.AttachableVolumePlugin methods
  448. var _ volume.AttachableVolumePlugin = &csiPlugin{}
  449. var _ volume.DeviceMountableVolumePlugin = &csiPlugin{}
  450. func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
  451. return p.newAttacherDetacher()
  452. }
  453. func (p *csiPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
  454. return p.NewAttacher()
  455. }
  456. func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
  457. return p.newAttacherDetacher()
  458. }
  459. func (p *csiPlugin) CanAttach(spec *volume.Spec) (bool, error) {
  460. inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume)
  461. if inlineEnabled {
  462. volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
  463. if err != nil {
  464. return false, err
  465. }
  466. if volumeLifecycleMode == storage.VolumeLifecycleEphemeral {
  467. klog.V(5).Info(log("plugin.CanAttach = false, ephemeral mode detected for spec %v", spec.Name()))
  468. return false, nil
  469. }
  470. }
  471. pvSrc, err := getCSISourceFromSpec(spec)
  472. if err != nil {
  473. return false, err
  474. }
  475. driverName := pvSrc.Driver
  476. skipAttach, err := p.skipAttach(driverName)
  477. if err != nil {
  478. return false, err
  479. }
  480. return !skipAttach, nil
  481. }
  482. // CanDeviceMount returns true if the spec supports device mount
  483. func (p *csiPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
  484. inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume)
  485. if !inlineEnabled {
  486. // No need to check anything, we assume it is a persistent volume.
  487. return true, nil
  488. }
  489. volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
  490. if err != nil {
  491. return false, err
  492. }
  493. if volumeLifecycleMode == storage.VolumeLifecycleEphemeral {
  494. klog.V(5).Info(log("plugin.CanDeviceMount skipped ephemeral mode detected for spec %v", spec.Name()))
  495. return false, nil
  496. }
  497. // Persistent volumes support device mount.
  498. return true, nil
  499. }
  500. func (p *csiPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
  501. return p.NewDetacher()
  502. }
  503. func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
  504. m := p.host.GetMounter(p.GetPluginName())
  505. return m.GetMountRefs(deviceMountPath)
  506. }
  507. // BlockVolumePlugin methods
  508. var _ volume.BlockVolumePlugin = &csiPlugin{}
  509. func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opts volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
  510. if !p.blockEnabled {
  511. return nil, errors.New("CSIBlockVolume feature not enabled")
  512. }
  513. pvSource, err := getCSISourceFromSpec(spec)
  514. if err != nil {
  515. return nil, err
  516. }
  517. readOnly, err := getReadOnlyFromSpec(spec)
  518. if err != nil {
  519. return nil, err
  520. }
  521. klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
  522. k8s := p.host.GetKubeClient()
  523. if k8s == nil {
  524. return nil, errors.New(log("failed to get a kubernetes client"))
  525. }
  526. mapper := &csiBlockMapper{
  527. k8s: k8s,
  528. plugin: p,
  529. volumeID: pvSource.VolumeHandle,
  530. driverName: csiDriverName(pvSource.Driver),
  531. readOnly: readOnly,
  532. spec: spec,
  533. specName: spec.Name(),
  534. podUID: podRef.UID,
  535. }
  536. mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
  537. // Save volume info in pod dir
  538. dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
  539. if err := os.MkdirAll(dataDir, 0750); err != nil {
  540. return nil, errors.New(log("failed to create data dir %s: %v", dataDir, err))
  541. }
  542. klog.V(4).Info(log("created path successfully [%s]", dataDir))
  543. // persist volume info data for teardown
  544. node := string(p.host.GetNodeName())
  545. attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node)
  546. volData := map[string]string{
  547. volDataKey.specVolID: spec.Name(),
  548. volDataKey.volHandle: pvSource.VolumeHandle,
  549. volDataKey.driverName: pvSource.Driver,
  550. volDataKey.nodeName: node,
  551. volDataKey.attachmentID: attachID,
  552. }
  553. if err := saveVolumeData(dataDir, volDataFileName, volData); err != nil {
  554. if removeErr := os.RemoveAll(dataDir); removeErr != nil {
  555. klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, removeErr))
  556. }
  557. return nil, errors.New(log("failed to save volume info data: %v", err))
  558. }
  559. return mapper, nil
  560. }
  561. func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
  562. if !p.blockEnabled {
  563. return nil, errors.New("CSIBlockVolume feature not enabled")
  564. }
  565. klog.V(4).Infof(log("setting up block unmapper for [Spec=%v, podUID=%v]", volName, podUID))
  566. unmapper := &csiBlockMapper{
  567. plugin: p,
  568. podUID: podUID,
  569. specName: volName,
  570. }
  571. // load volume info from file
  572. dataDir := getVolumeDeviceDataDir(unmapper.specName, p.host)
  573. data, err := loadVolumeData(dataDir, volDataFileName)
  574. if err != nil {
  575. return nil, errors.New(log("unmapper failed to load volume data file [%s]: %v", dataDir, err))
  576. }
  577. unmapper.driverName = csiDriverName(data[volDataKey.driverName])
  578. unmapper.volumeID = data[volDataKey.volHandle]
  579. unmapper.csiClientGetter.driverName = unmapper.driverName
  580. return unmapper, nil
  581. }
  582. func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapPath string) (*volume.Spec, error) {
  583. if !p.blockEnabled {
  584. return nil, errors.New("CSIBlockVolume feature not enabled")
  585. }
  586. klog.V(4).Infof("plugin.ConstructBlockVolumeSpec [podUID=%s, specVolName=%s, path=%s]", string(podUID), specVolName, mapPath)
  587. dataDir := getVolumeDeviceDataDir(specVolName, p.host)
  588. volData, err := loadVolumeData(dataDir, volDataFileName)
  589. if err != nil {
  590. return nil, errors.New(log("plugin.ConstructBlockVolumeSpec failed loading volume data using [%s]: %v", mapPath, err))
  591. }
  592. klog.V(4).Info(log("plugin.ConstructBlockVolumeSpec extracted [%#v]", volData))
  593. blockMode := api.PersistentVolumeBlock
  594. pv := &api.PersistentVolume{
  595. ObjectMeta: meta.ObjectMeta{
  596. Name: volData[volDataKey.specVolID],
  597. },
  598. Spec: api.PersistentVolumeSpec{
  599. PersistentVolumeSource: api.PersistentVolumeSource{
  600. CSI: &api.CSIPersistentVolumeSource{
  601. Driver: volData[volDataKey.driverName],
  602. VolumeHandle: volData[volDataKey.volHandle],
  603. },
  604. },
  605. VolumeMode: &blockMode,
  606. },
  607. }
  608. return volume.NewSpecFromPersistentVolume(pv, false), nil
  609. }
  610. // skipAttach looks up CSIDriver object associated with driver name
  611. // to determine if driver requires attachment volume operation
  612. func (p *csiPlugin) skipAttach(driver string) (bool, error) {
  613. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  614. return false, nil
  615. }
  616. kletHost, ok := p.host.(volume.KubeletVolumeHost)
  617. if ok {
  618. if err := kletHost.WaitForCacheSync(); err != nil {
  619. return false, err
  620. }
  621. }
  622. if p.csiDriverLister == nil {
  623. return false, errors.New("CSIDriver lister does not exist")
  624. }
  625. csiDriver, err := p.csiDriverLister.Get(driver)
  626. if err != nil {
  627. if apierrors.IsNotFound(err) {
  628. // Don't skip attach if CSIDriver does not exist
  629. return false, nil
  630. }
  631. return false, err
  632. }
  633. if csiDriver.Spec.AttachRequired != nil && *csiDriver.Spec.AttachRequired == false {
  634. return true, nil
  635. }
  636. return false, nil
  637. }
  638. // supportsVolumeMode checks whether the CSI driver supports a volume in the given mode.
  639. // An error indicates that it isn't supported and explains why.
  640. func (p *csiPlugin) supportsVolumeLifecycleMode(driver string, volumeMode storage.VolumeLifecycleMode) error {
  641. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
  642. // Feature disabled, therefore only "persistent" volumes are supported.
  643. if volumeMode != storage.VolumeLifecyclePersistent {
  644. return fmt.Errorf("CSIInlineVolume feature not enabled, %q volumes not supported", volumeMode)
  645. }
  646. return nil
  647. }
  648. // Retrieve CSIDriver. It's not an error if that isn't
  649. // possible (we don't have the lister if CSIDriverRegistry is
  650. // disabled) or the driver isn't found (CSIDriver is
  651. // optional), but then only persistent volumes are supported.
  652. var csiDriver *storage.CSIDriver
  653. if p.csiDriverLister != nil {
  654. kletHost, ok := p.host.(volume.KubeletVolumeHost)
  655. if ok {
  656. if err := kletHost.WaitForCacheSync(); err != nil {
  657. return err
  658. }
  659. }
  660. c, err := p.csiDriverLister.Get(driver)
  661. if err != nil && !apierrors.IsNotFound(err) {
  662. // Some internal error.
  663. return err
  664. }
  665. csiDriver = c
  666. }
  667. // The right response depends on whether we have information
  668. // about the driver and the volume mode.
  669. switch {
  670. case csiDriver == nil && volumeMode == storage.VolumeLifecyclePersistent:
  671. // No information, but that's okay for persistent volumes (and only those).
  672. return nil
  673. case csiDriver == nil:
  674. return fmt.Errorf("volume mode %q not supported by driver %s (no CSIDriver object)", volumeMode, driver)
  675. case containsVolumeMode(csiDriver.Spec.VolumeLifecycleModes, volumeMode):
  676. // Explicitly listed.
  677. return nil
  678. default:
  679. return fmt.Errorf("volume mode %q not supported by driver %s (only supports %q)", volumeMode, driver, csiDriver.Spec.VolumeLifecycleModes)
  680. }
  681. }
  682. // containsVolumeMode checks whether the given volume mode is listed.
  683. func containsVolumeMode(modes []storage.VolumeLifecycleMode, mode storage.VolumeLifecycleMode) bool {
  684. for _, m := range modes {
  685. if m == mode {
  686. return true
  687. }
  688. }
  689. return false
  690. }
  691. // getVolumeLifecycleMode returns the mode for the specified spec: {persistent|ephemeral}.
  692. // 1) If mode cannot be determined, it will default to "persistent".
  693. // 2) If Mode cannot be resolved to either {persistent | ephemeral}, an error is returned
  694. // See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/20190122-csi-inline-volumes.md
  695. func (p *csiPlugin) getVolumeLifecycleMode(spec *volume.Spec) (storage.VolumeLifecycleMode, error) {
  696. // 1) if volume.Spec.Volume.CSI != nil -> mode is ephemeral
  697. // 2) if volume.Spec.PersistentVolume.Spec.CSI != nil -> persistent
  698. volSrc, _, err := getSourceFromSpec(spec)
  699. if err != nil {
  700. return "", err
  701. }
  702. if volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
  703. return storage.VolumeLifecycleEphemeral, nil
  704. }
  705. return storage.VolumeLifecyclePersistent, nil
  706. }
  707. func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
  708. skip, err := p.skipAttach(driver)
  709. if err != nil {
  710. return nil, err
  711. }
  712. if skip {
  713. return nil, nil
  714. }
  715. attachID := getAttachmentName(handle, driver, nodeName)
  716. // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
  717. attachment, err := client.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
  718. if err != nil {
  719. return nil, err // This err already has enough context ("VolumeAttachment xyz not found")
  720. }
  721. if attachment == nil {
  722. err = errors.New("no existing VolumeAttachment found")
  723. return nil, err
  724. }
  725. return attachment.Status.AttachmentMetadata, nil
  726. }
  727. func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) {
  728. k8s := p.host.GetKubeClient()
  729. if k8s == nil {
  730. return nil, errors.New(log("unable to get kubernetes client from host"))
  731. }
  732. return &csiAttacher{
  733. plugin: p,
  734. k8s: k8s,
  735. waitSleepTime: 1 * time.Second,
  736. }, nil
  737. }
  738. func unregisterDriver(driverName string) error {
  739. csiDrivers.Delete(driverName)
  740. if err := nim.UninstallCSIDriver(driverName); err != nil {
  741. return errors.New(log("Error uninstalling CSI driver: %v", err))
  742. }
  743. return nil
  744. }
  745. // Return the highest supported version
  746. func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
  747. if len(versions) == 0 {
  748. return nil, errors.New(log("CSI driver reporting empty array for supported versions"))
  749. }
  750. var highestSupportedVersion *utilversion.Version
  751. var theErr error
  752. for i := len(versions) - 1; i >= 0; i-- {
  753. currentHighestVer, err := utilversion.ParseGeneric(versions[i])
  754. if err != nil {
  755. theErr = err
  756. continue
  757. }
  758. if currentHighestVer.Major() > 1 {
  759. // CSI currently only has version 0.x and 1.x (see https://github.com/container-storage-interface/spec/releases).
  760. // Therefore any driver claiming version 2.x+ is ignored as an unsupported versions.
  761. // Future 1.x versions of CSI are supposed to be backwards compatible so this version of Kubernetes will work with any 1.x driver
  762. // (or 0.x), but it may not work with 2.x drivers (because 2.x does not have to be backwards compatible with 1.x).
  763. continue
  764. }
  765. if highestSupportedVersion == nil || highestSupportedVersion.LessThan(currentHighestVer) {
  766. highestSupportedVersion = currentHighestVer
  767. }
  768. }
  769. if highestSupportedVersion == nil {
  770. return nil, fmt.Errorf("could not find a highest supported version from versions (%v) reported by this driver: %v", versions, theErr)
  771. }
  772. if highestSupportedVersion.Major() != 1 {
  773. // CSI v0.x is no longer supported as of Kubernetes v1.17 in
  774. // accordance with deprecation policy set out in Kubernetes v1.13
  775. return nil, fmt.Errorf("highest supported version reported by driver is %v, must be v1.x", highestSupportedVersion)
  776. }
  777. return highestSupportedVersion, nil
  778. }