csi_plugin.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package csi
  14. import (
  15. "errors"
  16. "fmt"
  17. "os"
  18. "path"
  19. "sort"
  20. "strings"
  21. "time"
  22. "context"
  23. "k8s.io/klog"
  24. api "k8s.io/api/core/v1"
  25. apierrs "k8s.io/apimachinery/pkg/api/errors"
  26. meta "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/types"
  28. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  29. utilversion "k8s.io/apimachinery/pkg/util/version"
  30. "k8s.io/apimachinery/pkg/util/wait"
  31. utilfeature "k8s.io/apiserver/pkg/util/feature"
  32. clientset "k8s.io/client-go/kubernetes"
  33. storagelisters "k8s.io/client-go/listers/storage/v1beta1"
  34. csitranslationplugins "k8s.io/csi-translation-lib/plugins"
  35. "k8s.io/kubernetes/pkg/features"
  36. "k8s.io/kubernetes/pkg/volume"
  37. "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
  38. )
  39. const (
  40. // CSIPluginName is the name of the in-tree CSI Plugin
  41. CSIPluginName = "kubernetes.io/csi"
  42. // TODO (vladimirvivien) implement a more dynamic way to discover
  43. // the unix domain socket path for each installed csi driver.
  44. // TODO (vladimirvivien) would be nice to name socket with a .sock extension
  45. // for consistency.
  46. csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock"
  47. csiTimeout = 2 * time.Minute
  48. volNameSep = "^"
  49. volDataFileName = "vol_data.json"
  50. fsTypeBlockName = "block"
  51. // TODO: increase to something useful
  52. csiResyncPeriod = time.Minute
  53. )
  54. var deprecatedSocketDirVersions = []string{"0.1.0", "0.2.0", "0.3.0", "0.4.0"}
  55. type csiPlugin struct {
  56. host volume.VolumeHost
  57. blockEnabled bool
  58. csiDriverLister storagelisters.CSIDriverLister
  59. }
  60. //TODO (vladimirvivien) add this type to storage api
  61. type driverMode string
  62. const persistentDriverMode driverMode = "persistent"
  63. const ephemeralDriverMode driverMode = "ephemeral"
  64. // ProbeVolumePlugins returns implemented plugins
  65. func ProbeVolumePlugins() []volume.VolumePlugin {
  66. p := &csiPlugin{
  67. host: nil,
  68. blockEnabled: utilfeature.DefaultFeatureGate.Enabled(features.CSIBlockVolume),
  69. }
  70. return []volume.VolumePlugin{p}
  71. }
  72. // volume.VolumePlugin methods
  73. var _ volume.VolumePlugin = &csiPlugin{}
  74. // RegistrationHandler is the handler which is fed to the pluginwatcher API.
  75. type RegistrationHandler struct {
  76. }
  77. // TODO (verult) consider using a struct instead of global variables
  78. // csiDrivers map keep track of all registered CSI drivers on the node and their
  79. // corresponding sockets
  80. var csiDrivers = &DriversStore{}
  81. var nim nodeinfomanager.Interface
  82. // PluginHandler is the plugin registration handler interface passed to the
  83. // pluginwatcher module in kubelet
  84. var PluginHandler = &RegistrationHandler{}
  85. // ValidatePlugin is called by kubelet's plugin watcher upon detection
  86. // of a new registration socket opened by CSI Driver registrar side car.
  87. func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
  88. klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s, foundInDeprecatedDir: %v",
  89. pluginName, endpoint, strings.Join(versions, ","), foundInDeprecatedDir))
  90. if foundInDeprecatedDir {
  91. // CSI 0.x drivers used /var/lib/kubelet/plugins as the socket dir.
  92. // This was deprecated as the socket dir for kubelet drivers, in lieu of a dedicated dir /var/lib/kubelet/plugins_registry
  93. // The deprecated dir will only be allowed for a whitelisted set of old versions.
  94. // CSI 1.x drivers should use the /var/lib/kubelet/plugins_registry
  95. if !isDeprecatedSocketDirAllowed(versions) {
  96. err := fmt.Errorf("socket for CSI driver %q versions %v was found in a deprecated dir. Drivers implementing CSI 1.x+ must use the new dir", pluginName, versions)
  97. klog.Error(err)
  98. return err
  99. }
  100. }
  101. _, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions)
  102. return err
  103. }
  104. // RegisterPlugin is called when a plugin can be registered
  105. func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
  106. klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
  107. highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
  108. if err != nil {
  109. return err
  110. }
  111. // Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
  112. // all other CSI components will be able to get the actual socket of CSI drivers by its name.
  113. csiDrivers.Set(pluginName, Driver{
  114. endpoint: endpoint,
  115. highestSupportedVersion: highestSupportedVersion,
  116. })
  117. // Get node info from the driver.
  118. csi, err := newCsiDriverClient(csiDriverName(pluginName))
  119. if err != nil {
  120. return err
  121. }
  122. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  123. defer cancel()
  124. driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
  125. if err != nil {
  126. if unregErr := unregisterDriver(pluginName); unregErr != nil {
  127. klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
  128. }
  129. return err
  130. }
  131. err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
  132. if err != nil {
  133. if unregErr := unregisterDriver(pluginName); unregErr != nil {
  134. klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
  135. }
  136. return err
  137. }
  138. return nil
  139. }
  140. func (h *RegistrationHandler) validateVersions(callerName, pluginName string, endpoint string, versions []string) (*utilversion.Version, error) {
  141. if len(versions) == 0 {
  142. err := fmt.Errorf("%s for CSI driver %q failed. Plugin returned an empty list for supported versions", callerName, pluginName)
  143. klog.Error(err)
  144. return nil, err
  145. }
  146. // Validate version
  147. newDriverHighestVersion, err := highestSupportedVersion(versions)
  148. if err != nil {
  149. err := fmt.Errorf("%s for CSI driver %q failed. None of the versions specified %q are supported. err=%v", callerName, pluginName, versions, err)
  150. klog.Error(err)
  151. return nil, err
  152. }
  153. existingDriver, driverExists := csiDrivers.Get(pluginName)
  154. if driverExists {
  155. if !existingDriver.highestSupportedVersion.LessThan(newDriverHighestVersion) {
  156. err := fmt.Errorf("%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q", callerName, pluginName, existingDriver.highestSupportedVersion)
  157. klog.Error(err)
  158. return nil, err
  159. }
  160. }
  161. return newDriverHighestVersion, nil
  162. }
  163. // DeRegisterPlugin is called when a plugin removed its socket, signaling
  164. // it is no longer available
  165. func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
  166. klog.V(4).Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName))
  167. if err := unregisterDriver(pluginName); err != nil {
  168. klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err))
  169. }
  170. }
  171. func (p *csiPlugin) Init(host volume.VolumeHost) error {
  172. p.host = host
  173. if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  174. csiClient := host.GetKubeClient()
  175. if csiClient == nil {
  176. klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
  177. } else {
  178. // set CSIDriverLister
  179. adcHost, ok := host.(volume.AttachDetachVolumeHost)
  180. if ok {
  181. p.csiDriverLister = adcHost.CSIDriverLister()
  182. if p.csiDriverLister == nil {
  183. klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
  184. }
  185. }
  186. kletHost, ok := host.(volume.KubeletVolumeHost)
  187. if ok {
  188. p.csiDriverLister = kletHost.CSIDriverLister()
  189. if p.csiDriverLister == nil {
  190. klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
  191. }
  192. }
  193. }
  194. }
  195. var migratedPlugins = map[string](func() bool){
  196. csitranslationplugins.GCEPDInTreePluginName: func() bool {
  197. return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
  198. },
  199. csitranslationplugins.AWSEBSInTreePluginName: func() bool {
  200. return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS)
  201. },
  202. csitranslationplugins.CinderInTreePluginName: func() bool {
  203. return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack)
  204. },
  205. }
  206. // Initializing the label management channels
  207. nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins)
  208. if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) &&
  209. utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
  210. // This function prevents Kubelet from posting Ready status until CSINodeInfo
  211. // is both installed and initialized
  212. if err := initializeCSINode(host); err != nil {
  213. return fmt.Errorf("failed to initialize CSINodeInfo: %v", err)
  214. }
  215. }
  216. return nil
  217. }
  218. func initializeCSINode(host volume.VolumeHost) error {
  219. kvh, ok := host.(volume.KubeletVolumeHost)
  220. if !ok {
  221. klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet")
  222. return nil
  223. }
  224. kubeClient := host.GetKubeClient()
  225. if kubeClient == nil {
  226. // Kubelet running in standalone mode. Skip CSINodeInfo initialization
  227. klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode")
  228. return nil
  229. }
  230. kvh.SetKubeletError(errors.New("CSINodeInfo is not yet initialized"))
  231. go func() {
  232. defer utilruntime.HandleCrash()
  233. // Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
  234. // after max retry steps.
  235. initBackoff := wait.Backoff{
  236. Steps: 6,
  237. Duration: 15 * time.Millisecond,
  238. Factor: 6.0,
  239. Jitter: 0.1,
  240. }
  241. err := wait.ExponentialBackoff(initBackoff, func() (bool, error) {
  242. klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo")
  243. err := nim.InitializeCSINodeWithAnnotation()
  244. if err != nil {
  245. kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err))
  246. klog.Errorf("Failed to initialize CSINodeInfo: %v", err)
  247. return false, nil
  248. }
  249. // Successfully initialized drivers, allow Kubelet to post Ready
  250. kvh.SetKubeletError(nil)
  251. return true, nil
  252. })
  253. if err != nil {
  254. // 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin)
  255. // are permanently enabled the apiserver/controllers can assume that the kubelet is
  256. // using CSI for all Migrated volume plugins. Then all the CSINode initialization
  257. // code can be dropped from Kubelet.
  258. // Kill the Kubelet process and allow it to restart to retry initialization
  259. klog.Fatalf("Failed to initialize CSINodeInfo after retrying")
  260. }
  261. }()
  262. return nil
  263. }
  264. func (p *csiPlugin) GetPluginName() string {
  265. return CSIPluginName
  266. }
  267. // GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle
  268. // That string value is used in Detach() to extract driver name and volumeName.
  269. func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
  270. csi, err := getPVSourceFromSpec(spec)
  271. if err != nil {
  272. klog.Error(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err))
  273. return "", err
  274. }
  275. // return driverName<separator>volumeHandle
  276. return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil
  277. }
  278. func (p *csiPlugin) CanSupport(spec *volume.Spec) bool {
  279. // TODO (vladimirvivien) CanSupport should also take into account
  280. // the availability/registration of specified Driver in the volume source
  281. if spec == nil {
  282. return false
  283. }
  284. if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
  285. return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil) ||
  286. (spec.Volume != nil && spec.Volume.CSI != nil)
  287. }
  288. return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil
  289. }
  290. func (p *csiPlugin) IsMigratedToCSI() bool {
  291. return false
  292. }
  293. func (p *csiPlugin) RequiresRemount() bool {
  294. return false
  295. }
  296. func (p *csiPlugin) NewMounter(
  297. spec *volume.Spec,
  298. pod *api.Pod,
  299. _ volume.VolumeOptions) (volume.Mounter, error) {
  300. volSrc, pvSrc, err := getSourceFromSpec(spec)
  301. if err != nil {
  302. return nil, err
  303. }
  304. var (
  305. driverName string
  306. volumeHandle string
  307. readOnly bool
  308. )
  309. switch {
  310. case volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume):
  311. volumeHandle = makeVolumeHandle(string(pod.UID), spec.Name())
  312. driverName = volSrc.Driver
  313. if volSrc.ReadOnly != nil {
  314. readOnly = *volSrc.ReadOnly
  315. }
  316. case pvSrc != nil:
  317. driverName = pvSrc.Driver
  318. volumeHandle = pvSrc.VolumeHandle
  319. readOnly = spec.ReadOnly
  320. default:
  321. return nil, fmt.Errorf("volume source not found in volume.Spec")
  322. }
  323. driverMode, err := p.getDriverMode(spec)
  324. if err != nil {
  325. return nil, err
  326. }
  327. k8s := p.host.GetKubeClient()
  328. if k8s == nil {
  329. klog.Error(log("failed to get a kubernetes client"))
  330. return nil, errors.New("failed to get a Kubernetes client")
  331. }
  332. mounter := &csiMountMgr{
  333. plugin: p,
  334. k8s: k8s,
  335. spec: spec,
  336. pod: pod,
  337. podUID: pod.UID,
  338. driverName: csiDriverName(driverName),
  339. driverMode: driverMode,
  340. volumeID: volumeHandle,
  341. specVolumeID: spec.Name(),
  342. readOnly: readOnly,
  343. }
  344. mounter.csiClientGetter.driverName = csiDriverName(driverName)
  345. // Save volume info in pod dir
  346. dir := mounter.GetPath()
  347. dataDir := path.Dir(dir) // dropoff /mount at end
  348. if err := os.MkdirAll(dataDir, 0750); err != nil {
  349. klog.Error(log("failed to create dir %#v: %v", dataDir, err))
  350. return nil, err
  351. }
  352. klog.V(4).Info(log("created path successfully [%s]", dataDir))
  353. mounter.MetricsProvider = NewMetricsCsi(volumeHandle, dir, csiDriverName(driverName))
  354. // persist volume info data for teardown
  355. node := string(p.host.GetNodeName())
  356. volData := map[string]string{
  357. volDataKey.specVolID: spec.Name(),
  358. volDataKey.volHandle: volumeHandle,
  359. volDataKey.driverName: driverName,
  360. volDataKey.nodeName: node,
  361. volDataKey.driverMode: string(driverMode),
  362. }
  363. attachID := getAttachmentName(volumeHandle, driverName, node)
  364. volData[volDataKey.attachmentID] = attachID
  365. if err := saveVolumeData(dataDir, volDataFileName, volData); err != nil {
  366. klog.Error(log("failed to save volume info data: %v", err))
  367. if err := os.RemoveAll(dataDir); err != nil {
  368. klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, err))
  369. return nil, err
  370. }
  371. return nil, err
  372. }
  373. klog.V(4).Info(log("mounter created successfully"))
  374. return mounter, nil
  375. }
  376. func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) {
  377. klog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID))
  378. unmounter := &csiMountMgr{
  379. plugin: p,
  380. podUID: podUID,
  381. specVolumeID: specName,
  382. }
  383. // load volume info from file
  384. dir := unmounter.GetPath()
  385. dataDir := path.Dir(dir) // dropoff /mount at end
  386. data, err := loadVolumeData(dataDir, volDataFileName)
  387. if err != nil {
  388. klog.Error(log("unmounter failed to load volume data file [%s]: %v", dir, err))
  389. return nil, err
  390. }
  391. unmounter.driverName = csiDriverName(data[volDataKey.driverName])
  392. unmounter.volumeID = data[volDataKey.volHandle]
  393. unmounter.csiClientGetter.driverName = unmounter.driverName
  394. return unmounter, nil
  395. }
  396. func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
  397. klog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath))
  398. volData, err := loadVolumeData(mountPath, volDataFileName)
  399. if err != nil {
  400. klog.Error(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err))
  401. return nil, err
  402. }
  403. klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData))
  404. var spec *volume.Spec
  405. inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume)
  406. if inlineEnabled {
  407. mode := driverMode(volData[volDataKey.driverMode])
  408. switch {
  409. case mode == ephemeralDriverMode:
  410. spec = p.constructVolSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName])
  411. case mode == persistentDriverMode:
  412. fallthrough
  413. default:
  414. spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
  415. }
  416. } else {
  417. spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
  418. }
  419. return spec, nil
  420. }
  421. // constructVolSourceSpec constructs volume.Spec with CSIVolumeSource
  422. func (p *csiPlugin) constructVolSourceSpec(volSpecName, driverName string) *volume.Spec {
  423. vol := &api.Volume{
  424. Name: volSpecName,
  425. VolumeSource: api.VolumeSource{
  426. CSI: &api.CSIVolumeSource{
  427. Driver: driverName,
  428. },
  429. },
  430. }
  431. return volume.NewSpecFromVolume(vol)
  432. }
  433. //constructPVSourceSpec constructs volume.Spec with CSIPersistentVolumeSource
  434. func (p *csiPlugin) constructPVSourceSpec(volSpecName, driverName, volumeHandle string) *volume.Spec {
  435. fsMode := api.PersistentVolumeFilesystem
  436. pv := &api.PersistentVolume{
  437. ObjectMeta: meta.ObjectMeta{
  438. Name: volSpecName,
  439. },
  440. Spec: api.PersistentVolumeSpec{
  441. PersistentVolumeSource: api.PersistentVolumeSource{
  442. CSI: &api.CSIPersistentVolumeSource{
  443. Driver: driverName,
  444. VolumeHandle: volumeHandle,
  445. },
  446. },
  447. VolumeMode: &fsMode,
  448. },
  449. }
  450. return volume.NewSpecFromPersistentVolume(pv, false)
  451. }
  452. func (p *csiPlugin) SupportsMountOption() bool {
  453. // TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags
  454. // to probe for the result for this method
  455. // (bswartz) Until the CSI spec supports probing, our only option is to
  456. // make plugins register their support for mount options or lack thereof
  457. // directly with kubernetes.
  458. return true
  459. }
  460. func (p *csiPlugin) SupportsBulkVolumeVerification() bool {
  461. return false
  462. }
  463. // volume.AttachableVolumePlugin methods
  464. var _ volume.AttachableVolumePlugin = &csiPlugin{}
  465. var _ volume.DeviceMountableVolumePlugin = &csiPlugin{}
  466. func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
  467. return p.newAttacherDetacher()
  468. }
  469. func (p *csiPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
  470. return p.NewAttacher()
  471. }
  472. func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
  473. return p.newAttacherDetacher()
  474. }
  475. func (p *csiPlugin) CanAttach(spec *volume.Spec) (bool, error) {
  476. driverMode, err := p.getDriverMode(spec)
  477. if err != nil {
  478. return false, err
  479. }
  480. if driverMode == ephemeralDriverMode {
  481. klog.V(5).Info(log("plugin.CanAttach = false, ephemeral mode detected for spec %v", spec.Name()))
  482. return false, nil
  483. }
  484. pvSrc, err := getCSISourceFromSpec(spec)
  485. if err != nil {
  486. return false, err
  487. }
  488. driverName := pvSrc.Driver
  489. skipAttach, err := p.skipAttach(driverName)
  490. if err != nil {
  491. return false, err
  492. }
  493. return !skipAttach, nil
  494. }
  495. // CanDeviceMount returns true if the spec supports device mount
  496. func (p *csiPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
  497. driverMode, err := p.getDriverMode(spec)
  498. if err != nil {
  499. return false, err
  500. }
  501. if driverMode == ephemeralDriverMode {
  502. klog.V(5).Info(log("plugin.CanDeviceMount skipped ephemeral mode detected for spec %v", spec.Name()))
  503. return false, nil
  504. }
  505. return true, nil
  506. }
  507. func (p *csiPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
  508. return p.NewDetacher()
  509. }
  510. func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
  511. m := p.host.GetMounter(p.GetPluginName())
  512. return m.GetMountRefs(deviceMountPath)
  513. }
  514. // BlockVolumePlugin methods
  515. var _ volume.BlockVolumePlugin = &csiPlugin{}
  516. func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opts volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
  517. if !p.blockEnabled {
  518. return nil, errors.New("CSIBlockVolume feature not enabled")
  519. }
  520. pvSource, err := getCSISourceFromSpec(spec)
  521. if err != nil {
  522. return nil, err
  523. }
  524. readOnly, err := getReadOnlyFromSpec(spec)
  525. if err != nil {
  526. return nil, err
  527. }
  528. klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
  529. k8s := p.host.GetKubeClient()
  530. if k8s == nil {
  531. klog.Error(log("failed to get a kubernetes client"))
  532. return nil, errors.New("failed to get a Kubernetes client")
  533. }
  534. mapper := &csiBlockMapper{
  535. k8s: k8s,
  536. plugin: p,
  537. volumeID: pvSource.VolumeHandle,
  538. driverName: csiDriverName(pvSource.Driver),
  539. readOnly: readOnly,
  540. spec: spec,
  541. specName: spec.Name(),
  542. podUID: podRef.UID,
  543. }
  544. mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
  545. // Save volume info in pod dir
  546. dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
  547. if err := os.MkdirAll(dataDir, 0750); err != nil {
  548. klog.Error(log("failed to create data dir %s: %v", dataDir, err))
  549. return nil, err
  550. }
  551. klog.V(4).Info(log("created path successfully [%s]", dataDir))
  552. // persist volume info data for teardown
  553. node := string(p.host.GetNodeName())
  554. attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node)
  555. volData := map[string]string{
  556. volDataKey.specVolID: spec.Name(),
  557. volDataKey.volHandle: pvSource.VolumeHandle,
  558. volDataKey.driverName: pvSource.Driver,
  559. volDataKey.nodeName: node,
  560. volDataKey.attachmentID: attachID,
  561. }
  562. if err := saveVolumeData(dataDir, volDataFileName, volData); err != nil {
  563. klog.Error(log("failed to save volume info data: %v", err))
  564. if err := os.RemoveAll(dataDir); err != nil {
  565. klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, err))
  566. return nil, err
  567. }
  568. return nil, err
  569. }
  570. return mapper, nil
  571. }
  572. func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
  573. if !p.blockEnabled {
  574. return nil, errors.New("CSIBlockVolume feature not enabled")
  575. }
  576. klog.V(4).Infof(log("setting up block unmapper for [Spec=%v, podUID=%v]", volName, podUID))
  577. unmapper := &csiBlockMapper{
  578. plugin: p,
  579. podUID: podUID,
  580. specName: volName,
  581. }
  582. // load volume info from file
  583. dataDir := getVolumeDeviceDataDir(unmapper.specName, p.host)
  584. data, err := loadVolumeData(dataDir, volDataFileName)
  585. if err != nil {
  586. klog.Error(log("unmapper failed to load volume data file [%s]: %v", dataDir, err))
  587. return nil, err
  588. }
  589. unmapper.driverName = csiDriverName(data[volDataKey.driverName])
  590. unmapper.volumeID = data[volDataKey.volHandle]
  591. unmapper.csiClientGetter.driverName = unmapper.driverName
  592. if err != nil {
  593. return nil, err
  594. }
  595. return unmapper, nil
  596. }
  597. func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapPath string) (*volume.Spec, error) {
  598. if !p.blockEnabled {
  599. return nil, errors.New("CSIBlockVolume feature not enabled")
  600. }
  601. klog.V(4).Infof("plugin.ConstructBlockVolumeSpec [podUID=%s, specVolName=%s, path=%s]", string(podUID), specVolName, mapPath)
  602. dataDir := getVolumeDeviceDataDir(specVolName, p.host)
  603. volData, err := loadVolumeData(dataDir, volDataFileName)
  604. if err != nil {
  605. klog.Error(log("plugin.ConstructBlockVolumeSpec failed loading volume data using [%s]: %v", mapPath, err))
  606. return nil, err
  607. }
  608. klog.V(4).Info(log("plugin.ConstructBlockVolumeSpec extracted [%#v]", volData))
  609. blockMode := api.PersistentVolumeBlock
  610. pv := &api.PersistentVolume{
  611. ObjectMeta: meta.ObjectMeta{
  612. Name: volData[volDataKey.specVolID],
  613. },
  614. Spec: api.PersistentVolumeSpec{
  615. PersistentVolumeSource: api.PersistentVolumeSource{
  616. CSI: &api.CSIPersistentVolumeSource{
  617. Driver: volData[volDataKey.driverName],
  618. VolumeHandle: volData[volDataKey.volHandle],
  619. },
  620. },
  621. VolumeMode: &blockMode,
  622. },
  623. }
  624. return volume.NewSpecFromPersistentVolume(pv, false), nil
  625. }
  626. // skipAttach looks up CSIDriver object associated with driver name
  627. // to determine if driver requires attachment volume operation
  628. func (p *csiPlugin) skipAttach(driver string) (bool, error) {
  629. if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  630. return false, nil
  631. }
  632. kletHost, ok := p.host.(volume.KubeletVolumeHost)
  633. if ok {
  634. kletHost.WaitForCacheSync()
  635. }
  636. if p.csiDriverLister == nil {
  637. return false, errors.New("CSIDriver lister does not exist")
  638. }
  639. csiDriver, err := p.csiDriverLister.Get(driver)
  640. if err != nil {
  641. if apierrs.IsNotFound(err) {
  642. // Don't skip attach if CSIDriver does not exist
  643. return false, nil
  644. }
  645. return false, err
  646. }
  647. if csiDriver.Spec.AttachRequired != nil && *csiDriver.Spec.AttachRequired == false {
  648. return true, nil
  649. }
  650. return false, nil
  651. }
  652. // getDriverMode returns the driver mode for the specified spec: {persistent|ephemeral}.
  653. // 1) If mode cannot be determined, it will default to "persistent".
  654. // 2) If Mode cannot be resolved to either {persistent | ephemeral}, an error is returned
  655. // See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/20190122-csi-inline-volumes.md
  656. func (p *csiPlugin) getDriverMode(spec *volume.Spec) (driverMode, error) {
  657. // TODO (vladimirvivien) ultimately, mode will be retrieved from CSIDriver.Spec.Mode.
  658. // However, in alpha version, mode is determined by the volume source:
  659. // 1) if volume.Spec.Volume.CSI != nil -> mode is ephemeral
  660. // 2) if volume.Spec.PersistentVolume.Spec.CSI != nil -> persistent
  661. volSrc, _, err := getSourceFromSpec(spec)
  662. if err != nil {
  663. return "", err
  664. }
  665. if volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
  666. return ephemeralDriverMode, nil
  667. }
  668. return persistentDriverMode, nil
  669. }
  670. func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
  671. skip, err := p.skipAttach(driver)
  672. if err != nil {
  673. return nil, err
  674. }
  675. if skip {
  676. return nil, nil
  677. }
  678. attachID := getAttachmentName(handle, driver, nodeName)
  679. // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
  680. attachment, err := client.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
  681. if err != nil {
  682. return nil, err // This err already has enough context ("VolumeAttachment xyz not found")
  683. }
  684. if attachment == nil {
  685. err = errors.New("no existing VolumeAttachment found")
  686. return nil, err
  687. }
  688. return attachment.Status.AttachmentMetadata, nil
  689. }
  690. func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) {
  691. k8s := p.host.GetKubeClient()
  692. if k8s == nil {
  693. klog.Error(log("unable to get kubernetes client from host"))
  694. return nil, errors.New("unable to get Kubernetes client")
  695. }
  696. return &csiAttacher{
  697. plugin: p,
  698. k8s: k8s,
  699. waitSleepTime: 1 * time.Second,
  700. }, nil
  701. }
  702. func unregisterDriver(driverName string) error {
  703. csiDrivers.Delete(driverName)
  704. if err := nim.UninstallCSIDriver(driverName); err != nil {
  705. klog.Errorf("Error uninstalling CSI driver: %v", err)
  706. return err
  707. }
  708. return nil
  709. }
  710. // Return the highest supported version
  711. func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
  712. if len(versions) == 0 {
  713. return nil, fmt.Errorf("CSI driver reporting empty array for supported versions")
  714. }
  715. // Sort by lowest to highest version
  716. sort.Slice(versions, func(i, j int) bool {
  717. parsedVersionI, err := utilversion.ParseGeneric(versions[i])
  718. if err != nil {
  719. // Push bad values to the bottom
  720. return true
  721. }
  722. parsedVersionJ, err := utilversion.ParseGeneric(versions[j])
  723. if err != nil {
  724. // Push bad values to the bottom
  725. return false
  726. }
  727. return parsedVersionI.LessThan(parsedVersionJ)
  728. })
  729. for i := len(versions) - 1; i >= 0; i-- {
  730. highestSupportedVersion, err := utilversion.ParseGeneric(versions[i])
  731. if err != nil {
  732. return nil, err
  733. }
  734. if highestSupportedVersion.Major() <= 1 {
  735. return highestSupportedVersion, nil
  736. }
  737. }
  738. return nil, fmt.Errorf("None of the CSI versions reported by this driver are supported")
  739. }
  740. // Only drivers that implement CSI 0.x are allowed to use deprecated socket dir.
  741. func isDeprecatedSocketDirAllowed(versions []string) bool {
  742. for _, version := range versions {
  743. if isV0Version(version) {
  744. return true
  745. }
  746. }
  747. return false
  748. }
  749. func isV0Version(version string) bool {
  750. parsedVersion, err := utilversion.ParseGeneric(version)
  751. if err != nil {
  752. return false
  753. }
  754. return parsedVersion.Major() == 0
  755. }