admission.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package label
  14. import (
  15. "bytes"
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "sync"
  21. v1 "k8s.io/api/core/v1"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apiserver/pkg/admission"
  24. cloudprovider "k8s.io/cloud-provider"
  25. cloudvolume "k8s.io/cloud-provider/volume"
  26. volumehelpers "k8s.io/cloud-provider/volume/helpers"
  27. "k8s.io/klog"
  28. api "k8s.io/kubernetes/pkg/apis/core"
  29. k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
  30. persistentvolume "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
  31. kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
  32. )
  33. const (
  34. // PluginName is the name of persistent volume label admission plugin
  35. PluginName = "PersistentVolumeLabel"
  36. )
  37. // Register registers a plugin
  38. func Register(plugins *admission.Plugins) {
  39. plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
  40. persistentVolumeLabelAdmission := newPersistentVolumeLabel()
  41. return persistentVolumeLabelAdmission, nil
  42. })
  43. }
  44. var _ = admission.Interface(&persistentVolumeLabel{})
  45. type persistentVolumeLabel struct {
  46. *admission.Handler
  47. mutex sync.Mutex
  48. cloudConfig []byte
  49. awsPVLabeler cloudprovider.PVLabeler
  50. gcePVLabeler cloudprovider.PVLabeler
  51. azurePVLabeler cloudprovider.PVLabeler
  52. openStackPVLabeler cloudprovider.PVLabeler
  53. vspherePVLabeler cloudprovider.PVLabeler
  54. }
  55. var _ admission.MutationInterface = &persistentVolumeLabel{}
  56. var _ kubeapiserveradmission.WantsCloudConfig = &persistentVolumeLabel{}
  57. // newPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests,
  58. // based on the labels provided by the underlying cloud provider.
  59. //
  60. // As a side effect, the cloud provider may block invalid or non-existent volumes.
  61. func newPersistentVolumeLabel() *persistentVolumeLabel {
  62. // DEPRECATED: in a future release, we will use mutating admission webhooks to apply PV labels.
  63. // Once the mutating admission webhook is used for AWS, Azure, GCE, and OpenStack,
  64. // this admission controller will be removed.
  65. klog.Warning("PersistentVolumeLabel admission controller is deprecated. " +
  66. "Please remove this controller from your configuration files and scripts.")
  67. return &persistentVolumeLabel{
  68. Handler: admission.NewHandler(admission.Create),
  69. }
  70. }
  71. func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) {
  72. l.cloudConfig = cloudConfig
  73. }
  74. func nodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []api.NodeSelectorRequirement, terms []api.NodeSelectorTerm) bool {
  75. for _, req := range reqs {
  76. for _, term := range terms {
  77. for _, r := range term.MatchExpressions {
  78. if r.Key == req.Key {
  79. return true
  80. }
  81. }
  82. }
  83. }
  84. return false
  85. }
  86. func (l *persistentVolumeLabel) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) {
  87. if a.GetResource().GroupResource() != api.Resource("persistentvolumes") {
  88. return nil
  89. }
  90. obj := a.GetObject()
  91. if obj == nil {
  92. return nil
  93. }
  94. volume, ok := obj.(*api.PersistentVolume)
  95. if !ok {
  96. return nil
  97. }
  98. volumeLabels, err := l.findVolumeLabels(volume)
  99. if err != nil {
  100. return admission.NewForbidden(a, err)
  101. }
  102. requirements := make([]api.NodeSelectorRequirement, 0)
  103. if len(volumeLabels) != 0 {
  104. if volume.Labels == nil {
  105. volume.Labels = make(map[string]string)
  106. }
  107. for k, v := range volumeLabels {
  108. // We (silently) replace labels if they are provided.
  109. // This should be OK because they are in the kubernetes.io namespace
  110. // i.e. we own them
  111. volume.Labels[k] = v
  112. // Set NodeSelectorRequirements based on the labels
  113. var values []string
  114. if k == v1.LabelZoneFailureDomain {
  115. zones, err := volumehelpers.LabelZonesToSet(v)
  116. if err != nil {
  117. return admission.NewForbidden(a, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v))
  118. }
  119. // zone values here are sorted for better testability.
  120. values = zones.List()
  121. } else {
  122. values = []string{v}
  123. }
  124. requirements = append(requirements, api.NodeSelectorRequirement{Key: k, Operator: api.NodeSelectorOpIn, Values: values})
  125. }
  126. if volume.Spec.NodeAffinity == nil {
  127. volume.Spec.NodeAffinity = new(api.VolumeNodeAffinity)
  128. }
  129. if volume.Spec.NodeAffinity.Required == nil {
  130. volume.Spec.NodeAffinity.Required = new(api.NodeSelector)
  131. }
  132. if len(volume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
  133. // Need at least one term pre-allocated whose MatchExpressions can be appended to
  134. volume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]api.NodeSelectorTerm, 1)
  135. }
  136. if nodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, volume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
  137. klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
  138. requirements, volume.Spec.NodeAffinity)
  139. } else {
  140. for _, req := range requirements {
  141. for i := range volume.Spec.NodeAffinity.Required.NodeSelectorTerms {
  142. volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
  143. }
  144. }
  145. }
  146. }
  147. return nil
  148. }
  149. func (l *persistentVolumeLabel) findVolumeLabels(volume *api.PersistentVolume) (map[string]string, error) {
  150. existingLabels := volume.Labels
  151. // All cloud providers set only these two labels.
  152. domain, domainOK := existingLabels[v1.LabelZoneFailureDomain]
  153. region, regionOK := existingLabels[v1.LabelZoneRegion]
  154. isDynamicallyProvisioned := metav1.HasAnnotation(volume.ObjectMeta, persistentvolume.AnnDynamicallyProvisioned)
  155. if isDynamicallyProvisioned && domainOK && regionOK {
  156. // PV already has all the labels and we can trust the dynamic provisioning that it provided correct values.
  157. return map[string]string{
  158. v1.LabelZoneFailureDomain: domain,
  159. v1.LabelZoneRegion: region,
  160. }, nil
  161. }
  162. // Either missing labels or we don't trust the user provided correct values.
  163. switch {
  164. case volume.Spec.AWSElasticBlockStore != nil:
  165. labels, err := l.findAWSEBSLabels(volume)
  166. if err != nil {
  167. return nil, fmt.Errorf("error querying AWS EBS volume %s: %v", volume.Spec.AWSElasticBlockStore.VolumeID, err)
  168. }
  169. return labels, nil
  170. case volume.Spec.GCEPersistentDisk != nil:
  171. labels, err := l.findGCEPDLabels(volume)
  172. if err != nil {
  173. return nil, fmt.Errorf("error querying GCE PD volume %s: %v", volume.Spec.GCEPersistentDisk.PDName, err)
  174. }
  175. return labels, nil
  176. case volume.Spec.AzureDisk != nil:
  177. labels, err := l.findAzureDiskLabels(volume)
  178. if err != nil {
  179. return nil, fmt.Errorf("error querying AzureDisk volume %s: %v", volume.Spec.AzureDisk.DiskName, err)
  180. }
  181. return labels, nil
  182. case volume.Spec.Cinder != nil:
  183. labels, err := l.findCinderDiskLabels(volume)
  184. if err != nil {
  185. return nil, fmt.Errorf("error querying Cinder volume %s: %v", volume.Spec.Cinder.VolumeID, err)
  186. }
  187. return labels, nil
  188. case volume.Spec.VsphereVolume != nil:
  189. labels, err := l.findVsphereVolumeLabels(volume)
  190. if err != nil {
  191. return nil, fmt.Errorf("error querying vSphere Volume %s: %v", volume.Spec.VsphereVolume.VolumePath, err)
  192. }
  193. return labels, nil
  194. }
  195. // Unrecognized volume, do not add any labels
  196. return nil, nil
  197. }
  198. func (l *persistentVolumeLabel) findAWSEBSLabels(volume *api.PersistentVolume) (map[string]string, error) {
  199. // Ignore any volumes that are being provisioned
  200. if volume.Spec.AWSElasticBlockStore.VolumeID == cloudvolume.ProvisionedVolumeName {
  201. return nil, nil
  202. }
  203. pvlabler, err := l.getAWSPVLabeler()
  204. if err != nil {
  205. return nil, err
  206. }
  207. if pvlabler == nil {
  208. return nil, fmt.Errorf("unable to build AWS cloud provider for EBS")
  209. }
  210. pv := &v1.PersistentVolume{}
  211. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  212. if err != nil {
  213. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  214. }
  215. return pvlabler.GetLabelsForVolume(context.TODO(), pv)
  216. }
  217. // getAWSPVLabeler returns the AWS implementation of PVLabeler
  218. func (l *persistentVolumeLabel) getAWSPVLabeler() (cloudprovider.PVLabeler, error) {
  219. l.mutex.Lock()
  220. defer l.mutex.Unlock()
  221. if l.awsPVLabeler == nil {
  222. var cloudConfigReader io.Reader
  223. if len(l.cloudConfig) > 0 {
  224. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  225. }
  226. cloudProvider, err := cloudprovider.GetCloudProvider("aws", cloudConfigReader)
  227. if err != nil || cloudProvider == nil {
  228. return nil, err
  229. }
  230. awsPVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  231. if !ok {
  232. return nil, errors.New("AWS cloud provider does not implement PV labeling")
  233. }
  234. l.awsPVLabeler = awsPVLabeler
  235. }
  236. return l.awsPVLabeler, nil
  237. }
  238. func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (map[string]string, error) {
  239. // Ignore any volumes that are being provisioned
  240. if volume.Spec.GCEPersistentDisk.PDName == cloudvolume.ProvisionedVolumeName {
  241. return nil, nil
  242. }
  243. pvlabler, err := l.getGCEPVLabeler()
  244. if err != nil {
  245. return nil, err
  246. }
  247. if pvlabler == nil {
  248. return nil, fmt.Errorf("unable to build GCE cloud provider for PD")
  249. }
  250. pv := &v1.PersistentVolume{}
  251. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  252. if err != nil {
  253. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  254. }
  255. return pvlabler.GetLabelsForVolume(context.TODO(), pv)
  256. }
  257. // getGCEPVLabeler returns the GCE implementation of PVLabeler
  258. func (l *persistentVolumeLabel) getGCEPVLabeler() (cloudprovider.PVLabeler, error) {
  259. l.mutex.Lock()
  260. defer l.mutex.Unlock()
  261. if l.gcePVLabeler == nil {
  262. var cloudConfigReader io.Reader
  263. if len(l.cloudConfig) > 0 {
  264. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  265. }
  266. cloudProvider, err := cloudprovider.GetCloudProvider("gce", cloudConfigReader)
  267. if err != nil || cloudProvider == nil {
  268. return nil, err
  269. }
  270. gcePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  271. if !ok {
  272. return nil, errors.New("GCE cloud provider does not implement PV labeling")
  273. }
  274. l.gcePVLabeler = gcePVLabeler
  275. }
  276. return l.gcePVLabeler, nil
  277. }
  278. // getAzurePVLabeler returns the Azure implementation of PVLabeler
  279. func (l *persistentVolumeLabel) getAzurePVLabeler() (cloudprovider.PVLabeler, error) {
  280. l.mutex.Lock()
  281. defer l.mutex.Unlock()
  282. if l.azurePVLabeler == nil {
  283. var cloudConfigReader io.Reader
  284. if len(l.cloudConfig) > 0 {
  285. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  286. }
  287. cloudProvider, err := cloudprovider.GetCloudProvider("azure", cloudConfigReader)
  288. if err != nil || cloudProvider == nil {
  289. return nil, err
  290. }
  291. azurePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  292. if !ok {
  293. return nil, errors.New("Azure cloud provider does not implement PV labeling")
  294. }
  295. l.azurePVLabeler = azurePVLabeler
  296. }
  297. return l.azurePVLabeler, nil
  298. }
  299. func (l *persistentVolumeLabel) findAzureDiskLabels(volume *api.PersistentVolume) (map[string]string, error) {
  300. // Ignore any volumes that are being provisioned
  301. if volume.Spec.AzureDisk.DiskName == cloudvolume.ProvisionedVolumeName {
  302. return nil, nil
  303. }
  304. pvlabler, err := l.getAzurePVLabeler()
  305. if err != nil {
  306. return nil, err
  307. }
  308. if pvlabler == nil {
  309. return nil, fmt.Errorf("unable to build Azure cloud provider for AzureDisk")
  310. }
  311. pv := &v1.PersistentVolume{}
  312. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  313. if err != nil {
  314. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  315. }
  316. return pvlabler.GetLabelsForVolume(context.TODO(), pv)
  317. }
  318. func (l *persistentVolumeLabel) getOpenStackPVLabeler() (cloudprovider.PVLabeler, error) {
  319. l.mutex.Lock()
  320. defer l.mutex.Unlock()
  321. if l.openStackPVLabeler == nil {
  322. var cloudConfigReader io.Reader
  323. if len(l.cloudConfig) > 0 {
  324. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  325. }
  326. cloudProvider, err := cloudprovider.GetCloudProvider("openstack", cloudConfigReader)
  327. if err != nil || cloudProvider == nil {
  328. return nil, err
  329. }
  330. openStackPVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  331. if !ok {
  332. return nil, errors.New("OpenStack cloud provider does not implement PV labeling")
  333. }
  334. l.openStackPVLabeler = openStackPVLabeler
  335. }
  336. return l.openStackPVLabeler, nil
  337. }
  338. func (l *persistentVolumeLabel) findCinderDiskLabels(volume *api.PersistentVolume) (map[string]string, error) {
  339. // Ignore any volumes that are being provisioned
  340. if volume.Spec.Cinder.VolumeID == cloudvolume.ProvisionedVolumeName {
  341. return nil, nil
  342. }
  343. pvlabler, err := l.getOpenStackPVLabeler()
  344. if err != nil {
  345. return nil, err
  346. }
  347. if pvlabler == nil {
  348. return nil, fmt.Errorf("unable to build OpenStack cloud provider for Cinder disk")
  349. }
  350. pv := &v1.PersistentVolume{}
  351. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  352. if err != nil {
  353. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  354. }
  355. return pvlabler.GetLabelsForVolume(context.TODO(), pv)
  356. }
  357. func (l *persistentVolumeLabel) findVsphereVolumeLabels(volume *api.PersistentVolume) (map[string]string, error) {
  358. pvlabler, err := l.getVspherePVLabeler()
  359. if err != nil {
  360. return nil, err
  361. }
  362. if pvlabler == nil {
  363. return nil, fmt.Errorf("unable to build vSphere cloud provider")
  364. }
  365. pv := &v1.PersistentVolume{}
  366. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  367. if err != nil {
  368. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  369. }
  370. labels, err := pvlabler.GetLabelsForVolume(context.TODO(), pv)
  371. if err != nil {
  372. return nil, err
  373. }
  374. return labels, nil
  375. }
  376. func (l *persistentVolumeLabel) getVspherePVLabeler() (cloudprovider.PVLabeler, error) {
  377. l.mutex.Lock()
  378. defer l.mutex.Unlock()
  379. if l.vspherePVLabeler == nil {
  380. var cloudConfigReader io.Reader
  381. if len(l.cloudConfig) > 0 {
  382. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  383. }
  384. cloudProvider, err := cloudprovider.GetCloudProvider("vsphere", cloudConfigReader)
  385. if err != nil || cloudProvider == nil {
  386. return nil, err
  387. }
  388. vspherePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  389. if !ok {
  390. // GetCloudProvider failed
  391. return nil, errors.New("vSphere Cloud Provider does not implement PV labeling")
  392. }
  393. l.vspherePVLabeler = vspherePVLabeler
  394. }
  395. return l.vspherePVLabeler, nil
  396. }