gce_pd.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. // +build !providerless
  2. /*
  3. Copyright 2014 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package gcepd
  15. import (
  16. "context"
  17. "fmt"
  18. "os"
  19. "path/filepath"
  20. "runtime"
  21. "strconv"
  22. "strings"
  23. "k8s.io/klog"
  24. "k8s.io/utils/mount"
  25. utilstrings "k8s.io/utils/strings"
  26. v1 "k8s.io/api/core/v1"
  27. "k8s.io/apimachinery/pkg/api/resource"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/types"
  30. utilfeature "k8s.io/apiserver/pkg/util/feature"
  31. volumehelpers "k8s.io/cloud-provider/volume/helpers"
  32. "k8s.io/kubernetes/pkg/features"
  33. "k8s.io/kubernetes/pkg/volume"
  34. "k8s.io/kubernetes/pkg/volume/util"
  35. gcecloud "k8s.io/legacy-cloud-providers/gce"
  36. )
  37. // ProbeVolumePlugins is the primary entrypoint for volume plugins.
  38. func ProbeVolumePlugins() []volume.VolumePlugin {
  39. return []volume.VolumePlugin{&gcePersistentDiskPlugin{nil}}
  40. }
  41. type gcePersistentDiskPlugin struct {
  42. host volume.VolumeHost
  43. }
  44. var _ volume.VolumePlugin = &gcePersistentDiskPlugin{}
  45. var _ volume.PersistentVolumePlugin = &gcePersistentDiskPlugin{}
  46. var _ volume.DeletableVolumePlugin = &gcePersistentDiskPlugin{}
  47. var _ volume.ProvisionableVolumePlugin = &gcePersistentDiskPlugin{}
  48. var _ volume.ExpandableVolumePlugin = &gcePersistentDiskPlugin{}
  49. var _ volume.VolumePluginWithAttachLimits = &gcePersistentDiskPlugin{}
  50. const (
  51. gcePersistentDiskPluginName = "kubernetes.io/gce-pd"
  52. )
  53. // The constants are used to map from the machine type (number of CPUs) to the limit of
  54. // persistent disks that can be attached to an instance. Please refer to gcloud doc
  55. // https://cloud.google.com/compute/docs/disks/#increased_persistent_disk_limits
  56. // These constants are all the documented attach limit minus one because the
  57. // node boot disk is considered an attachable disk so effective attach limit is
  58. // one less.
  59. const (
  60. volumeLimitSmall = 15
  61. VolumeLimitBig = 127
  62. )
  63. func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
  64. return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(gcePersistentDiskPluginName), volName)
  65. }
  66. func (plugin *gcePersistentDiskPlugin) Init(host volume.VolumeHost) error {
  67. plugin.host = host
  68. return nil
  69. }
  70. func (plugin *gcePersistentDiskPlugin) GetPluginName() string {
  71. return gcePersistentDiskPluginName
  72. }
  73. func (plugin *gcePersistentDiskPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
  74. volumeSource, _, err := getVolumeSource(spec)
  75. if err != nil {
  76. return "", err
  77. }
  78. return volumeSource.PDName, nil
  79. }
  80. func (plugin *gcePersistentDiskPlugin) CanSupport(spec *volume.Spec) bool {
  81. return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.GCEPersistentDisk != nil) ||
  82. (spec.Volume != nil && spec.Volume.GCEPersistentDisk != nil)
  83. }
  84. func (plugin *gcePersistentDiskPlugin) RequiresRemount() bool {
  85. return false
  86. }
  87. func (plugin *gcePersistentDiskPlugin) SupportsMountOption() bool {
  88. return true
  89. }
  90. func (plugin *gcePersistentDiskPlugin) SupportsBulkVolumeVerification() bool {
  91. return true
  92. }
  93. func (plugin *gcePersistentDiskPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
  94. return []v1.PersistentVolumeAccessMode{
  95. v1.ReadWriteOnce,
  96. v1.ReadOnlyMany,
  97. }
  98. }
  99. func (plugin *gcePersistentDiskPlugin) GetVolumeLimits() (map[string]int64, error) {
  100. volumeLimits := map[string]int64{
  101. util.GCEVolumeLimitKey: volumeLimitSmall,
  102. }
  103. cloud := plugin.host.GetCloudProvider()
  104. // if we can't fetch cloudprovider we return an error
  105. // hoping external CCM or admin can set it. Returning
  106. // default values from here will mean, no one can
  107. // override them.
  108. if cloud == nil {
  109. return nil, fmt.Errorf("No cloudprovider present")
  110. }
  111. if cloud.ProviderName() != gcecloud.ProviderName {
  112. return nil, fmt.Errorf("Expected gce cloud got %s", cloud.ProviderName())
  113. }
  114. instances, ok := cloud.Instances()
  115. if !ok {
  116. klog.Warning("Failed to get instances from cloud provider")
  117. return volumeLimits, nil
  118. }
  119. instanceType, err := instances.InstanceType(context.TODO(), plugin.host.GetNodeName())
  120. if err != nil {
  121. klog.Errorf("Failed to get instance type from GCE cloud provider")
  122. return volumeLimits, nil
  123. }
  124. if strings.HasPrefix(instanceType, "n1-") || strings.HasPrefix(instanceType, "custom-") {
  125. volumeLimits[util.GCEVolumeLimitKey] = VolumeLimitBig
  126. } else {
  127. volumeLimits[util.GCEVolumeLimitKey] = volumeLimitSmall
  128. }
  129. return volumeLimits, nil
  130. }
  131. func (plugin *gcePersistentDiskPlugin) VolumeLimitKey(spec *volume.Spec) string {
  132. return util.GCEVolumeLimitKey
  133. }
  134. func (plugin *gcePersistentDiskPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
  135. // Inject real implementations here, test through the internal function.
  136. return plugin.newMounterInternal(spec, pod.UID, &GCEDiskUtil{}, plugin.host.GetMounter(plugin.GetPluginName()))
  137. }
  138. func getVolumeSource(
  139. spec *volume.Spec) (*v1.GCEPersistentDiskVolumeSource, bool, error) {
  140. if spec.Volume != nil && spec.Volume.GCEPersistentDisk != nil {
  141. return spec.Volume.GCEPersistentDisk, spec.Volume.GCEPersistentDisk.ReadOnly, nil
  142. } else if spec.PersistentVolume != nil &&
  143. spec.PersistentVolume.Spec.GCEPersistentDisk != nil {
  144. return spec.PersistentVolume.Spec.GCEPersistentDisk, spec.ReadOnly, nil
  145. }
  146. return nil, false, fmt.Errorf("Spec does not reference a GCE volume type")
  147. }
  148. func (plugin *gcePersistentDiskPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Mounter, error) {
  149. // GCEPDs used directly in a pod have a ReadOnly flag set by the pod author.
  150. // GCEPDs used as a PersistentVolume gets the ReadOnly flag indirectly through the persistent-claim volume used to mount the PV
  151. volumeSource, readOnly, err := getVolumeSource(spec)
  152. if err != nil {
  153. return nil, err
  154. }
  155. pdName := volumeSource.PDName
  156. partition := ""
  157. if volumeSource.Partition != 0 {
  158. partition = strconv.Itoa(int(volumeSource.Partition))
  159. }
  160. return &gcePersistentDiskMounter{
  161. gcePersistentDisk: &gcePersistentDisk{
  162. podUID: podUID,
  163. volName: spec.Name(),
  164. pdName: pdName,
  165. partition: partition,
  166. mounter: mounter,
  167. manager: manager,
  168. plugin: plugin,
  169. MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
  170. },
  171. mountOptions: util.MountOptionFromSpec(spec),
  172. readOnly: readOnly}, nil
  173. }
  174. func (plugin *gcePersistentDiskPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
  175. // Inject real implementations here, test through the internal function.
  176. return plugin.newUnmounterInternal(volName, podUID, &GCEDiskUtil{}, plugin.host.GetMounter(plugin.GetPluginName()))
  177. }
  178. func (plugin *gcePersistentDiskPlugin) newUnmounterInternal(volName string, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Unmounter, error) {
  179. return &gcePersistentDiskUnmounter{&gcePersistentDisk{
  180. podUID: podUID,
  181. volName: volName,
  182. manager: manager,
  183. mounter: mounter,
  184. plugin: plugin,
  185. MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
  186. }}, nil
  187. }
  188. func (plugin *gcePersistentDiskPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
  189. return plugin.newDeleterInternal(spec, &GCEDiskUtil{})
  190. }
  191. func (plugin *gcePersistentDiskPlugin) newDeleterInternal(spec *volume.Spec, manager pdManager) (volume.Deleter, error) {
  192. if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.GCEPersistentDisk == nil {
  193. return nil, fmt.Errorf("spec.PersistentVolumeSource.GCEPersistentDisk is nil")
  194. }
  195. return &gcePersistentDiskDeleter{
  196. gcePersistentDisk: &gcePersistentDisk{
  197. volName: spec.Name(),
  198. pdName: spec.PersistentVolume.Spec.GCEPersistentDisk.PDName,
  199. manager: manager,
  200. plugin: plugin,
  201. }}, nil
  202. }
  203. func (plugin *gcePersistentDiskPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
  204. return plugin.newProvisionerInternal(options, &GCEDiskUtil{})
  205. }
  206. func (plugin *gcePersistentDiskPlugin) newProvisionerInternal(options volume.VolumeOptions, manager pdManager) (volume.Provisioner, error) {
  207. return &gcePersistentDiskProvisioner{
  208. gcePersistentDisk: &gcePersistentDisk{
  209. manager: manager,
  210. plugin: plugin,
  211. },
  212. options: options,
  213. }, nil
  214. }
  215. func (plugin *gcePersistentDiskPlugin) RequiresFSResize() bool {
  216. return true
  217. }
  218. func (plugin *gcePersistentDiskPlugin) ExpandVolumeDevice(
  219. spec *volume.Spec,
  220. newSize resource.Quantity,
  221. oldSize resource.Quantity) (resource.Quantity, error) {
  222. cloud, err := getCloudProvider(plugin.host.GetCloudProvider())
  223. if err != nil {
  224. return oldSize, err
  225. }
  226. pdName := spec.PersistentVolume.Spec.GCEPersistentDisk.PDName
  227. updatedQuantity, err := cloud.ResizeDisk(pdName, oldSize, newSize)
  228. if err != nil {
  229. return oldSize, err
  230. }
  231. return updatedQuantity, nil
  232. }
  233. func (plugin *gcePersistentDiskPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
  234. fsVolume, err := util.CheckVolumeModeFilesystem(resizeOptions.VolumeSpec)
  235. if err != nil {
  236. return false, fmt.Errorf("error checking VolumeMode: %v", err)
  237. }
  238. // if volume is not a fs file system, there is nothing for us to do here.
  239. if !fsVolume {
  240. return true, nil
  241. }
  242. _, err = util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
  243. if err != nil {
  244. return false, err
  245. }
  246. return true, nil
  247. }
  248. var _ volume.NodeExpandableVolumePlugin = &gcePersistentDiskPlugin{}
  249. func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
  250. mounter := plugin.host.GetMounter(plugin.GetPluginName())
  251. kvh, ok := plugin.host.(volume.KubeletVolumeHost)
  252. if !ok {
  253. return nil, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
  254. }
  255. hu := kvh.GetHostUtil()
  256. pluginMntDir := util.GetPluginMountDir(plugin.host, plugin.GetPluginName())
  257. sourceName, err := hu.GetDeviceNameFromMount(mounter, mountPath, pluginMntDir)
  258. if err != nil {
  259. return nil, err
  260. }
  261. gceVolume := &v1.Volume{
  262. Name: volumeName,
  263. VolumeSource: v1.VolumeSource{
  264. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  265. PDName: sourceName,
  266. },
  267. },
  268. }
  269. return volume.NewSpecFromVolume(gceVolume), nil
  270. }
  271. // Abstract interface to PD operations.
  272. type pdManager interface {
  273. // Creates a volume
  274. CreateVolume(provisioner *gcePersistentDiskProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (volumeID string, volumeSizeGB int, labels map[string]string, fstype string, err error)
  275. // Deletes a volume
  276. DeleteVolume(deleter *gcePersistentDiskDeleter) error
  277. }
  278. // gcePersistentDisk volumes are disk resources provided by Google Compute Engine
  279. // that are attached to the kubelet's host machine and exposed to the pod.
  280. type gcePersistentDisk struct {
  281. volName string
  282. podUID types.UID
  283. // Unique identifier of the PD, used to find the disk resource in the provider.
  284. pdName string
  285. // Specifies the partition to mount
  286. partition string
  287. // Utility interface to provision and delete disks
  288. manager pdManager
  289. // Mounter interface that provides system calls to mount the global path to the pod local path.
  290. mounter mount.Interface
  291. plugin *gcePersistentDiskPlugin
  292. volume.MetricsProvider
  293. }
  294. type gcePersistentDiskMounter struct {
  295. *gcePersistentDisk
  296. // Specifies whether the disk will be mounted as read-only.
  297. readOnly bool
  298. mountOptions []string
  299. }
  300. var _ volume.Mounter = &gcePersistentDiskMounter{}
  301. func (b *gcePersistentDiskMounter) GetAttributes() volume.Attributes {
  302. return volume.Attributes{
  303. ReadOnly: b.readOnly,
  304. Managed: !b.readOnly,
  305. SupportsSELinux: true,
  306. }
  307. }
  308. // Checks prior to mount operations to verify that the required components (binaries, etc.)
  309. // to mount the volume are available on the underlying node.
  310. // If not, it returns an error
  311. func (b *gcePersistentDiskMounter) CanMount() error {
  312. return nil
  313. }
  314. // SetUp bind mounts the disk global mount to the volume path.
  315. func (b *gcePersistentDiskMounter) SetUp(mounterArgs volume.MounterArgs) error {
  316. return b.SetUpAt(b.GetPath(), mounterArgs)
  317. }
  318. // SetUp bind mounts the disk global mount to the give volume path.
  319. func (b *gcePersistentDiskMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
  320. // TODO: handle failed mounts here.
  321. notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
  322. klog.V(4).Infof("GCE PersistentDisk set up: Dir (%s) PD name (%q) Mounted (%t) Error (%v), ReadOnly (%t)", dir, b.pdName, !notMnt, err, b.readOnly)
  323. if err != nil && !os.IsNotExist(err) {
  324. return fmt.Errorf("cannot validate mount point: %s %v", dir, err)
  325. }
  326. if !notMnt {
  327. return nil
  328. }
  329. if runtime.GOOS != "windows" {
  330. // in windows, we will use mklink to mount, will MkdirAll in Mount func
  331. if err := os.MkdirAll(dir, 0750); err != nil {
  332. return fmt.Errorf("mkdir failed on disk %s (%v)", dir, err)
  333. }
  334. }
  335. // Perform a bind mount to the full path to allow duplicate mounts of the same PD.
  336. options := []string{"bind"}
  337. if b.readOnly {
  338. options = append(options, "ro")
  339. }
  340. globalPDPath := makeGlobalPDName(b.plugin.host, b.pdName)
  341. klog.V(4).Infof("attempting to mount %s", dir)
  342. mountOptions := util.JoinMountOptions(b.mountOptions, options)
  343. err = b.mounter.Mount(globalPDPath, dir, "", mountOptions)
  344. if err != nil {
  345. notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
  346. if mntErr != nil {
  347. return fmt.Errorf("failed to mount: %v. Cleanup IsLikelyNotMountPoint check failed: %v", err, mntErr)
  348. }
  349. if !notMnt {
  350. if mntErr = b.mounter.Unmount(dir); mntErr != nil {
  351. return fmt.Errorf("failed to mount: %v. Cleanup failed to unmount: %v", err, mntErr)
  352. }
  353. notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
  354. if mntErr != nil {
  355. return fmt.Errorf("failed to mount: %v. Cleanup IsLikelyNotMountPoint check failed: %v", err, mntErr)
  356. }
  357. if !notMnt {
  358. // This is very odd, we don't expect it. We'll try again next sync loop.
  359. return fmt.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop", dir)
  360. }
  361. }
  362. mntErr = os.Remove(dir)
  363. if mntErr != nil {
  364. return fmt.Errorf("failed to mount: %v. Cleanup os Remove(%s) failed: %v", err, dir, mntErr)
  365. }
  366. return fmt.Errorf("mount of disk %s failed: %v", dir, err)
  367. }
  368. if !b.readOnly {
  369. volume.SetVolumeOwnership(b, mounterArgs.FsGroup)
  370. }
  371. return nil
  372. }
  373. func makeGlobalPDName(host volume.VolumeHost, devName string) string {
  374. return filepath.Join(host.GetPluginDir(gcePersistentDiskPluginName), util.MountsInGlobalPDPath, devName)
  375. }
  376. func (b *gcePersistentDiskMounter) GetPath() string {
  377. return getPath(b.podUID, b.volName, b.plugin.host)
  378. }
  379. type gcePersistentDiskUnmounter struct {
  380. *gcePersistentDisk
  381. }
  382. var _ volume.Unmounter = &gcePersistentDiskUnmounter{}
  383. func (c *gcePersistentDiskUnmounter) GetPath() string {
  384. return getPath(c.podUID, c.volName, c.plugin.host)
  385. }
  386. // Unmounts the bind mount, and detaches the disk only if the PD
  387. // resource was the last reference to that disk on the kubelet.
  388. func (c *gcePersistentDiskUnmounter) TearDown() error {
  389. return c.TearDownAt(c.GetPath())
  390. }
  391. // TearDownAt unmounts the bind mount
  392. func (c *gcePersistentDiskUnmounter) TearDownAt(dir string) error {
  393. return mount.CleanupMountPoint(dir, c.mounter, false)
  394. }
  395. type gcePersistentDiskDeleter struct {
  396. *gcePersistentDisk
  397. }
  398. var _ volume.Deleter = &gcePersistentDiskDeleter{}
  399. func (d *gcePersistentDiskDeleter) GetPath() string {
  400. return getPath(d.podUID, d.volName, d.plugin.host)
  401. }
  402. func (d *gcePersistentDiskDeleter) Delete() error {
  403. return d.manager.DeleteVolume(d)
  404. }
  405. type gcePersistentDiskProvisioner struct {
  406. *gcePersistentDisk
  407. options volume.VolumeOptions
  408. }
  409. var _ volume.Provisioner = &gcePersistentDiskProvisioner{}
  410. func (c *gcePersistentDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
  411. if !util.AccessModesContainedInAll(c.plugin.GetAccessModes(), c.options.PVC.Spec.AccessModes) {
  412. return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", c.options.PVC.Spec.AccessModes, c.plugin.GetAccessModes())
  413. }
  414. volumeID, sizeGB, labels, fstype, err := c.manager.CreateVolume(c, selectedNode, allowedTopologies)
  415. if err != nil {
  416. return nil, err
  417. }
  418. if fstype == "" {
  419. fstype = "ext4"
  420. }
  421. var volumeMode *v1.PersistentVolumeMode
  422. if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
  423. volumeMode = c.options.PVC.Spec.VolumeMode
  424. if volumeMode != nil && *volumeMode == v1.PersistentVolumeBlock {
  425. // Block volumes should not have any FSType
  426. fstype = ""
  427. }
  428. }
  429. pv := &v1.PersistentVolume{
  430. ObjectMeta: metav1.ObjectMeta{
  431. Name: c.options.PVName,
  432. Labels: map[string]string{},
  433. Annotations: map[string]string{
  434. util.VolumeDynamicallyCreatedByKey: "gce-pd-dynamic-provisioner",
  435. },
  436. },
  437. Spec: v1.PersistentVolumeSpec{
  438. PersistentVolumeReclaimPolicy: c.options.PersistentVolumeReclaimPolicy,
  439. AccessModes: c.options.PVC.Spec.AccessModes,
  440. Capacity: v1.ResourceList{
  441. v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGB)),
  442. },
  443. VolumeMode: volumeMode,
  444. PersistentVolumeSource: v1.PersistentVolumeSource{
  445. GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  446. PDName: volumeID,
  447. Partition: 0,
  448. ReadOnly: false,
  449. FSType: fstype,
  450. },
  451. },
  452. MountOptions: c.options.MountOptions,
  453. },
  454. }
  455. if len(c.options.PVC.Spec.AccessModes) == 0 {
  456. pv.Spec.AccessModes = c.plugin.GetAccessModes()
  457. }
  458. requirements := make([]v1.NodeSelectorRequirement, 0)
  459. if len(labels) != 0 {
  460. if pv.Labels == nil {
  461. pv.Labels = make(map[string]string)
  462. }
  463. for k, v := range labels {
  464. pv.Labels[k] = v
  465. var values []string
  466. if k == v1.LabelZoneFailureDomain {
  467. values, err = volumehelpers.LabelZonesToList(v)
  468. if err != nil {
  469. return nil, fmt.Errorf("failed to convert label string for Zone: %s to a List: %v", v, err)
  470. }
  471. } else {
  472. values = []string{v}
  473. }
  474. requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values})
  475. }
  476. }
  477. if len(requirements) > 0 {
  478. pv.Spec.NodeAffinity = new(v1.VolumeNodeAffinity)
  479. pv.Spec.NodeAffinity.Required = new(v1.NodeSelector)
  480. pv.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1)
  481. pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions = requirements
  482. }
  483. return pv, nil
  484. }