admission.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package label
  14. import (
  15. "bytes"
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "sync"
  21. v1 "k8s.io/api/core/v1"
  22. "k8s.io/apiserver/pkg/admission"
  23. cloudprovider "k8s.io/cloud-provider"
  24. cloudvolume "k8s.io/cloud-provider/volume"
  25. volumehelpers "k8s.io/cloud-provider/volume/helpers"
  26. "k8s.io/klog"
  27. api "k8s.io/kubernetes/pkg/apis/core"
  28. k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
  29. kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
  30. )
  31. const (
  32. // PluginName is the name of persistent volume label admission plugin
  33. PluginName = "PersistentVolumeLabel"
  34. )
  35. // Register registers a plugin
  36. func Register(plugins *admission.Plugins) {
  37. plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
  38. persistentVolumeLabelAdmission := newPersistentVolumeLabel()
  39. return persistentVolumeLabelAdmission, nil
  40. })
  41. }
  42. var _ = admission.Interface(&persistentVolumeLabel{})
  43. type persistentVolumeLabel struct {
  44. *admission.Handler
  45. mutex sync.Mutex
  46. cloudConfig []byte
  47. awsPVLabeler cloudprovider.PVLabeler
  48. gcePVLabeler cloudprovider.PVLabeler
  49. azurePVLabeler cloudprovider.PVLabeler
  50. openStackPVLabeler cloudprovider.PVLabeler
  51. vspherePVLabeler cloudprovider.PVLabeler
  52. }
  53. var _ admission.MutationInterface = &persistentVolumeLabel{}
  54. var _ kubeapiserveradmission.WantsCloudConfig = &persistentVolumeLabel{}
  55. // newPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests,
  56. // based on the labels provided by the underlying cloud provider.
  57. //
  58. // As a side effect, the cloud provider may block invalid or non-existent volumes.
  59. func newPersistentVolumeLabel() *persistentVolumeLabel {
  60. // DEPRECATED: in a future release, we will use mutating admission webhooks to apply PV labels.
  61. // Once the mutating admission webhook is used for AWS, Azure, GCE, and OpenStack,
  62. // this admission controller will be removed.
  63. klog.Warning("PersistentVolumeLabel admission controller is deprecated. " +
  64. "Please remove this controller from your configuration files and scripts.")
  65. return &persistentVolumeLabel{
  66. Handler: admission.NewHandler(admission.Create),
  67. }
  68. }
  69. func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) {
  70. l.cloudConfig = cloudConfig
  71. }
  72. func nodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []api.NodeSelectorRequirement, terms []api.NodeSelectorTerm) bool {
  73. for _, req := range reqs {
  74. for _, term := range terms {
  75. for _, r := range term.MatchExpressions {
  76. if r.Key == req.Key {
  77. return true
  78. }
  79. }
  80. }
  81. }
  82. return false
  83. }
  84. func (l *persistentVolumeLabel) Admit(a admission.Attributes, o admission.ObjectInterfaces) (err error) {
  85. if a.GetResource().GroupResource() != api.Resource("persistentvolumes") {
  86. return nil
  87. }
  88. obj := a.GetObject()
  89. if obj == nil {
  90. return nil
  91. }
  92. volume, ok := obj.(*api.PersistentVolume)
  93. if !ok {
  94. return nil
  95. }
  96. var volumeLabels map[string]string
  97. if volume.Spec.AWSElasticBlockStore != nil {
  98. labels, err := l.findAWSEBSLabels(volume)
  99. if err != nil {
  100. return admission.NewForbidden(a, fmt.Errorf("error querying AWS EBS volume %s: %v", volume.Spec.AWSElasticBlockStore.VolumeID, err))
  101. }
  102. volumeLabels = labels
  103. }
  104. if volume.Spec.GCEPersistentDisk != nil {
  105. labels, err := l.findGCEPDLabels(volume)
  106. if err != nil {
  107. return admission.NewForbidden(a, fmt.Errorf("error querying GCE PD volume %s: %v", volume.Spec.GCEPersistentDisk.PDName, err))
  108. }
  109. volumeLabels = labels
  110. }
  111. if volume.Spec.AzureDisk != nil {
  112. labels, err := l.findAzureDiskLabels(volume)
  113. if err != nil {
  114. return admission.NewForbidden(a, fmt.Errorf("error querying AzureDisk volume %s: %v", volume.Spec.AzureDisk.DiskName, err))
  115. }
  116. volumeLabels = labels
  117. }
  118. if volume.Spec.Cinder != nil {
  119. labels, err := l.findCinderDiskLabels(volume)
  120. if err != nil {
  121. return admission.NewForbidden(a, fmt.Errorf("error querying Cinder volume %s: %v", volume.Spec.Cinder.VolumeID, err))
  122. }
  123. volumeLabels = labels
  124. }
  125. if volume.Spec.VsphereVolume != nil {
  126. labels, err := l.findVsphereVolumeLabels(volume)
  127. if err != nil {
  128. return admission.NewForbidden(a, fmt.Errorf("error querying vSphere Volume %s: %v", volume.Spec.VsphereVolume.VolumePath, err))
  129. }
  130. volumeLabels = labels
  131. }
  132. requirements := make([]api.NodeSelectorRequirement, 0)
  133. if len(volumeLabels) != 0 {
  134. if volume.Labels == nil {
  135. volume.Labels = make(map[string]string)
  136. }
  137. for k, v := range volumeLabels {
  138. // We (silently) replace labels if they are provided.
  139. // This should be OK because they are in the kubernetes.io namespace
  140. // i.e. we own them
  141. volume.Labels[k] = v
  142. // Set NodeSelectorRequirements based on the labels
  143. var values []string
  144. if k == v1.LabelZoneFailureDomain {
  145. zones, err := volumehelpers.LabelZonesToSet(v)
  146. if err != nil {
  147. return admission.NewForbidden(a, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v))
  148. }
  149. // zone values here are sorted for better testability.
  150. values = zones.List()
  151. } else {
  152. values = []string{v}
  153. }
  154. requirements = append(requirements, api.NodeSelectorRequirement{Key: k, Operator: api.NodeSelectorOpIn, Values: values})
  155. }
  156. if volume.Spec.NodeAffinity == nil {
  157. volume.Spec.NodeAffinity = new(api.VolumeNodeAffinity)
  158. }
  159. if volume.Spec.NodeAffinity.Required == nil {
  160. volume.Spec.NodeAffinity.Required = new(api.NodeSelector)
  161. }
  162. if len(volume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
  163. // Need at least one term pre-allocated whose MatchExpressions can be appended to
  164. volume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]api.NodeSelectorTerm, 1)
  165. }
  166. if nodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, volume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
  167. klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
  168. requirements, volume.Spec.NodeAffinity)
  169. } else {
  170. for _, req := range requirements {
  171. for i := range volume.Spec.NodeAffinity.Required.NodeSelectorTerms {
  172. volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
  173. }
  174. }
  175. }
  176. }
  177. return nil
  178. }
  179. func (l *persistentVolumeLabel) findAWSEBSLabels(volume *api.PersistentVolume) (map[string]string, error) {
  180. // Ignore any volumes that are being provisioned
  181. if volume.Spec.AWSElasticBlockStore.VolumeID == cloudvolume.ProvisionedVolumeName {
  182. return nil, nil
  183. }
  184. pvlabler, err := l.getAWSPVLabeler()
  185. if err != nil {
  186. return nil, err
  187. }
  188. if pvlabler == nil {
  189. return nil, fmt.Errorf("unable to build AWS cloud provider for EBS")
  190. }
  191. pv := &v1.PersistentVolume{}
  192. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  193. if err != nil {
  194. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  195. }
  196. return pvlabler.GetLabelsForVolume(context.TODO(), pv)
  197. }
  198. // getAWSPVLabeler returns the AWS implementation of PVLabeler
  199. func (l *persistentVolumeLabel) getAWSPVLabeler() (cloudprovider.PVLabeler, error) {
  200. l.mutex.Lock()
  201. defer l.mutex.Unlock()
  202. if l.awsPVLabeler == nil {
  203. var cloudConfigReader io.Reader
  204. if len(l.cloudConfig) > 0 {
  205. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  206. }
  207. cloudProvider, err := cloudprovider.GetCloudProvider("aws", cloudConfigReader)
  208. if err != nil || cloudProvider == nil {
  209. return nil, err
  210. }
  211. awsPVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  212. if !ok {
  213. return nil, errors.New("AWS cloud provider does not implement PV labeling")
  214. }
  215. l.awsPVLabeler = awsPVLabeler
  216. }
  217. return l.awsPVLabeler, nil
  218. }
  219. func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (map[string]string, error) {
  220. // Ignore any volumes that are being provisioned
  221. if volume.Spec.GCEPersistentDisk.PDName == cloudvolume.ProvisionedVolumeName {
  222. return nil, nil
  223. }
  224. pvlabler, err := l.getGCEPVLabeler()
  225. if err != nil {
  226. return nil, err
  227. }
  228. if pvlabler == nil {
  229. return nil, fmt.Errorf("unable to build GCE cloud provider for PD")
  230. }
  231. pv := &v1.PersistentVolume{}
  232. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  233. if err != nil {
  234. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  235. }
  236. return pvlabler.GetLabelsForVolume(context.TODO(), pv)
  237. }
  238. // getGCEPVLabeler returns the GCE implementation of PVLabeler
  239. func (l *persistentVolumeLabel) getGCEPVLabeler() (cloudprovider.PVLabeler, error) {
  240. l.mutex.Lock()
  241. defer l.mutex.Unlock()
  242. if l.gcePVLabeler == nil {
  243. var cloudConfigReader io.Reader
  244. if len(l.cloudConfig) > 0 {
  245. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  246. }
  247. cloudProvider, err := cloudprovider.GetCloudProvider("gce", cloudConfigReader)
  248. if err != nil || cloudProvider == nil {
  249. return nil, err
  250. }
  251. gcePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  252. if !ok {
  253. return nil, errors.New("GCE cloud provider does not implement PV labeling")
  254. }
  255. l.gcePVLabeler = gcePVLabeler
  256. }
  257. return l.gcePVLabeler, nil
  258. }
  259. // getAzurePVLabeler returns the Azure implementation of PVLabeler
  260. func (l *persistentVolumeLabel) getAzurePVLabeler() (cloudprovider.PVLabeler, error) {
  261. l.mutex.Lock()
  262. defer l.mutex.Unlock()
  263. if l.azurePVLabeler == nil {
  264. var cloudConfigReader io.Reader
  265. if len(l.cloudConfig) > 0 {
  266. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  267. }
  268. cloudProvider, err := cloudprovider.GetCloudProvider("azure", cloudConfigReader)
  269. if err != nil || cloudProvider == nil {
  270. return nil, err
  271. }
  272. azurePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  273. if !ok {
  274. return nil, errors.New("Azure cloud provider does not implement PV labeling")
  275. }
  276. l.azurePVLabeler = azurePVLabeler
  277. }
  278. return l.azurePVLabeler, nil
  279. }
  280. func (l *persistentVolumeLabel) findAzureDiskLabels(volume *api.PersistentVolume) (map[string]string, error) {
  281. // Ignore any volumes that are being provisioned
  282. if volume.Spec.AzureDisk.DiskName == cloudvolume.ProvisionedVolumeName {
  283. return nil, nil
  284. }
  285. pvlabler, err := l.getAzurePVLabeler()
  286. if err != nil {
  287. return nil, err
  288. }
  289. if pvlabler == nil {
  290. return nil, fmt.Errorf("unable to build Azure cloud provider for AzureDisk")
  291. }
  292. pv := &v1.PersistentVolume{}
  293. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  294. if err != nil {
  295. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  296. }
  297. return pvlabler.GetLabelsForVolume(context.TODO(), pv)
  298. }
  299. func (l *persistentVolumeLabel) getOpenStackPVLabeler() (cloudprovider.PVLabeler, error) {
  300. l.mutex.Lock()
  301. defer l.mutex.Unlock()
  302. if l.openStackPVLabeler == nil {
  303. var cloudConfigReader io.Reader
  304. if len(l.cloudConfig) > 0 {
  305. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  306. }
  307. cloudProvider, err := cloudprovider.GetCloudProvider("openstack", cloudConfigReader)
  308. if err != nil || cloudProvider == nil {
  309. return nil, err
  310. }
  311. openStackPVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  312. if !ok {
  313. return nil, errors.New("OpenStack cloud provider does not implement PV labeling")
  314. }
  315. l.openStackPVLabeler = openStackPVLabeler
  316. }
  317. return l.openStackPVLabeler, nil
  318. }
  319. func (l *persistentVolumeLabel) findCinderDiskLabels(volume *api.PersistentVolume) (map[string]string, error) {
  320. // Ignore any volumes that are being provisioned
  321. if volume.Spec.Cinder.VolumeID == cloudvolume.ProvisionedVolumeName {
  322. return nil, nil
  323. }
  324. pvlabler, err := l.getOpenStackPVLabeler()
  325. if err != nil {
  326. return nil, err
  327. }
  328. if pvlabler == nil {
  329. return nil, fmt.Errorf("unable to build OpenStack cloud provider for Cinder disk")
  330. }
  331. pv := &v1.PersistentVolume{}
  332. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  333. if err != nil {
  334. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  335. }
  336. return pvlabler.GetLabelsForVolume(context.TODO(), pv)
  337. }
  338. func (l *persistentVolumeLabel) findVsphereVolumeLabels(volume *api.PersistentVolume) (map[string]string, error) {
  339. pvlabler, err := l.getVspherePVLabeler()
  340. if err != nil {
  341. return nil, err
  342. }
  343. if pvlabler == nil {
  344. return nil, fmt.Errorf("unable to build vSphere cloud provider")
  345. }
  346. pv := &v1.PersistentVolume{}
  347. err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
  348. if err != nil {
  349. return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
  350. }
  351. labels, err := pvlabler.GetLabelsForVolume(context.TODO(), pv)
  352. if err != nil {
  353. return nil, err
  354. }
  355. return labels, nil
  356. }
  357. func (l *persistentVolumeLabel) getVspherePVLabeler() (cloudprovider.PVLabeler, error) {
  358. l.mutex.Lock()
  359. defer l.mutex.Unlock()
  360. if l.vspherePVLabeler == nil {
  361. var cloudConfigReader io.Reader
  362. if len(l.cloudConfig) > 0 {
  363. cloudConfigReader = bytes.NewReader(l.cloudConfig)
  364. }
  365. cloudProvider, err := cloudprovider.GetCloudProvider("vsphere", cloudConfigReader)
  366. if err != nil || cloudProvider == nil {
  367. return nil, err
  368. }
  369. vspherePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
  370. if !ok {
  371. // GetCloudProvider failed
  372. return nil, errors.New("vSphere Cloud Provider does not implement PV labeling")
  373. }
  374. l.vspherePVLabeler = vspherePVLabeler
  375. }
  376. return l.vspherePVLabeler, nil
  377. }