volume_host.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kubelet
  14. import (
  15. "fmt"
  16. "net"
  17. "runtime"
  18. "k8s.io/klog"
  19. utilexec "k8s.io/utils/exec"
  20. "k8s.io/utils/mount"
  21. authenticationv1 "k8s.io/api/authentication/v1"
  22. v1 "k8s.io/api/core/v1"
  23. "k8s.io/apimachinery/pkg/types"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. utilfeature "k8s.io/apiserver/pkg/util/feature"
  26. "k8s.io/client-go/informers"
  27. clientset "k8s.io/client-go/kubernetes"
  28. storagelisters "k8s.io/client-go/listers/storage/v1beta1"
  29. "k8s.io/client-go/tools/cache"
  30. "k8s.io/client-go/tools/record"
  31. cloudprovider "k8s.io/cloud-provider"
  32. "k8s.io/kubernetes/pkg/features"
  33. "k8s.io/kubernetes/pkg/kubelet/configmap"
  34. "k8s.io/kubernetes/pkg/kubelet/secret"
  35. "k8s.io/kubernetes/pkg/kubelet/token"
  36. "k8s.io/kubernetes/pkg/volume"
  37. "k8s.io/kubernetes/pkg/volume/util"
  38. "k8s.io/kubernetes/pkg/volume/util/hostutil"
  39. "k8s.io/kubernetes/pkg/volume/util/subpath"
  40. )
  41. // NewInitializedVolumePluginMgr returns a new instance of
  42. // volume.VolumePluginMgr initialized with kubelets implementation of the
  43. // volume.VolumeHost interface.
  44. //
  45. // kubelet - used by VolumeHost methods to expose kubelet specific parameters
  46. // plugins - used to initialize volumePluginMgr
  47. func NewInitializedVolumePluginMgr(
  48. kubelet *Kubelet,
  49. secretManager secret.Manager,
  50. configMapManager configmap.Manager,
  51. tokenManager *token.Manager,
  52. plugins []volume.VolumePlugin,
  53. prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
  54. // Initialize csiDriverLister before calling InitPlugins
  55. var informerFactory informers.SharedInformerFactory
  56. var csiDriverLister storagelisters.CSIDriverLister
  57. var csiDriversSynced cache.InformerSynced
  58. const resyncPeriod = 0
  59. if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  60. // Don't initialize if kubeClient is nil
  61. if kubelet.kubeClient != nil {
  62. informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod)
  63. csiDriverInformer := informerFactory.Storage().V1beta1().CSIDrivers()
  64. csiDriverLister = csiDriverInformer.Lister()
  65. csiDriversSynced = csiDriverInformer.Informer().HasSynced
  66. } else {
  67. klog.Warning("kubeClient is nil. Skip initialization of CSIDriverLister")
  68. }
  69. }
  70. kvh := &kubeletVolumeHost{
  71. kubelet: kubelet,
  72. volumePluginMgr: volume.VolumePluginMgr{},
  73. secretManager: secretManager,
  74. configMapManager: configMapManager,
  75. tokenManager: tokenManager,
  76. informerFactory: informerFactory,
  77. csiDriverLister: csiDriverLister,
  78. csiDriversSynced: csiDriversSynced,
  79. exec: utilexec.New(),
  80. }
  81. if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
  82. return nil, fmt.Errorf(
  83. "could not initialize volume plugins for KubeletVolumePluginMgr: %v",
  84. err)
  85. }
  86. return &kvh.volumePluginMgr, nil
  87. }
  88. // Compile-time check to ensure kubeletVolumeHost implements the VolumeHost interface
  89. var _ volume.VolumeHost = &kubeletVolumeHost{}
  90. var _ volume.KubeletVolumeHost = &kubeletVolumeHost{}
  91. func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string {
  92. return kvh.kubelet.getPluginDir(pluginName)
  93. }
  94. type kubeletVolumeHost struct {
  95. kubelet *Kubelet
  96. volumePluginMgr volume.VolumePluginMgr
  97. secretManager secret.Manager
  98. tokenManager *token.Manager
  99. configMapManager configmap.Manager
  100. informerFactory informers.SharedInformerFactory
  101. csiDriverLister storagelisters.CSIDriverLister
  102. csiDriversSynced cache.InformerSynced
  103. exec utilexec.Interface
  104. }
  105. func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
  106. kvh.kubelet.runtimeState.setStorageState(err)
  107. }
  108. func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {
  109. return kvh.kubelet.getVolumeDevicePluginDir(pluginName)
  110. }
  111. func (kvh *kubeletVolumeHost) GetPodsDir() string {
  112. return kvh.kubelet.getPodsDir()
  113. }
  114. func (kvh *kubeletVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
  115. dir := kvh.kubelet.getPodVolumeDir(podUID, pluginName, volumeName)
  116. if runtime.GOOS == "windows" {
  117. dir = util.GetWindowsPath(dir)
  118. }
  119. return dir
  120. }
  121. func (kvh *kubeletVolumeHost) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
  122. return kvh.kubelet.getPodVolumeDeviceDir(podUID, pluginName)
  123. }
  124. func (kvh *kubeletVolumeHost) GetPodPluginDir(podUID types.UID, pluginName string) string {
  125. return kvh.kubelet.getPodPluginDir(podUID, pluginName)
  126. }
  127. func (kvh *kubeletVolumeHost) GetKubeClient() clientset.Interface {
  128. return kvh.kubelet.kubeClient
  129. }
  130. func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
  131. return kvh.kubelet.subpather
  132. }
  133. func (kvh *kubeletVolumeHost) GetHostUtil() hostutil.HostUtils {
  134. return kvh.kubelet.hostutil
  135. }
  136. func (kvh *kubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
  137. return kvh.informerFactory
  138. }
  139. func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
  140. return kvh.csiDriverLister
  141. }
  142. func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
  143. return kvh.csiDriversSynced
  144. }
  145. // WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
  146. func (kvh *kubeletVolumeHost) WaitForCacheSync() error {
  147. if kvh.csiDriversSynced == nil {
  148. klog.Error("csiDriversSynced not found on KubeletVolumeHost")
  149. return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost")
  150. }
  151. synced := []cache.InformerSynced{kvh.csiDriversSynced}
  152. if !cache.WaitForCacheSync(wait.NeverStop, synced...) {
  153. klog.Warning("failed to wait for cache sync for CSIDriverLister")
  154. return fmt.Errorf("failed to wait for cache sync for CSIDriverLister")
  155. }
  156. return nil
  157. }
  158. func (kvh *kubeletVolumeHost) NewWrapperMounter(
  159. volName string,
  160. spec volume.Spec,
  161. pod *v1.Pod,
  162. opts volume.VolumeOptions) (volume.Mounter, error) {
  163. // The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
  164. wrapperVolumeName := "wrapped_" + volName
  165. if spec.Volume != nil {
  166. spec.Volume.Name = wrapperVolumeName
  167. }
  168. return kvh.kubelet.newVolumeMounterFromPlugins(&spec, pod, opts)
  169. }
  170. func (kvh *kubeletVolumeHost) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
  171. // The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
  172. wrapperVolumeName := "wrapped_" + volName
  173. if spec.Volume != nil {
  174. spec.Volume.Name = wrapperVolumeName
  175. }
  176. plugin, err := kvh.kubelet.volumePluginMgr.FindPluginBySpec(&spec)
  177. if err != nil {
  178. return nil, err
  179. }
  180. return plugin.NewUnmounter(spec.Name(), podUID)
  181. }
  182. func (kvh *kubeletVolumeHost) GetCloudProvider() cloudprovider.Interface {
  183. return kvh.kubelet.cloud
  184. }
  185. func (kvh *kubeletVolumeHost) GetMounter(pluginName string) mount.Interface {
  186. return kvh.kubelet.mounter
  187. }
  188. func (kvh *kubeletVolumeHost) GetHostName() string {
  189. return kvh.kubelet.hostname
  190. }
  191. func (kvh *kubeletVolumeHost) GetHostIP() (net.IP, error) {
  192. return kvh.kubelet.GetHostIP()
  193. }
  194. func (kvh *kubeletVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) {
  195. node, err := kvh.kubelet.getNodeAnyWay()
  196. if err != nil {
  197. return nil, fmt.Errorf("error retrieving node: %v", err)
  198. }
  199. return node.Status.Allocatable, nil
  200. }
  201. func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
  202. return kvh.secretManager.GetSecret
  203. }
  204. func (kvh *kubeletVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
  205. return kvh.configMapManager.GetConfigMap
  206. }
  207. func (kvh *kubeletVolumeHost) GetServiceAccountTokenFunc() func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
  208. return kvh.tokenManager.GetServiceAccountToken
  209. }
  210. func (kvh *kubeletVolumeHost) DeleteServiceAccountTokenFunc() func(podUID types.UID) {
  211. return kvh.tokenManager.DeleteServiceAccountToken
  212. }
  213. func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
  214. node, err := kvh.kubelet.GetNode()
  215. if err != nil {
  216. return nil, fmt.Errorf("error retrieving node: %v", err)
  217. }
  218. return node.Labels, nil
  219. }
  220. func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
  221. return kvh.kubelet.nodeName
  222. }
  223. func (kvh *kubeletVolumeHost) GetEventRecorder() record.EventRecorder {
  224. return kvh.kubelet.recorder
  225. }
  226. func (kvh *kubeletVolumeHost) GetExec(pluginName string) utilexec.Interface {
  227. return kvh.exec
  228. }