volume_host.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kubelet
  14. import (
  15. "fmt"
  16. "net"
  17. "runtime"
  18. "k8s.io/klog"
  19. authenticationv1 "k8s.io/api/authentication/v1"
  20. "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/types"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. utilfeature "k8s.io/apiserver/pkg/util/feature"
  24. "k8s.io/client-go/informers"
  25. clientset "k8s.io/client-go/kubernetes"
  26. storagelisters "k8s.io/client-go/listers/storage/v1beta1"
  27. "k8s.io/client-go/tools/cache"
  28. "k8s.io/client-go/tools/record"
  29. cloudprovider "k8s.io/cloud-provider"
  30. "k8s.io/kubernetes/pkg/features"
  31. "k8s.io/kubernetes/pkg/kubelet/configmap"
  32. "k8s.io/kubernetes/pkg/kubelet/container"
  33. "k8s.io/kubernetes/pkg/kubelet/mountpod"
  34. "k8s.io/kubernetes/pkg/kubelet/secret"
  35. "k8s.io/kubernetes/pkg/kubelet/token"
  36. "k8s.io/kubernetes/pkg/util/mount"
  37. "k8s.io/kubernetes/pkg/volume"
  38. "k8s.io/kubernetes/pkg/volume/util"
  39. execmnt "k8s.io/kubernetes/pkg/volume/util/exec"
  40. "k8s.io/kubernetes/pkg/volume/util/subpath"
  41. )
  42. // NewInitializedVolumePluginMgr returns a new instance of
  43. // volume.VolumePluginMgr initialized with kubelets implementation of the
  44. // volume.VolumeHost interface.
  45. //
  46. // kubelet - used by VolumeHost methods to expose kubelet specific parameters
  47. // plugins - used to initialize volumePluginMgr
  48. func NewInitializedVolumePluginMgr(
  49. kubelet *Kubelet,
  50. secretManager secret.Manager,
  51. configMapManager configmap.Manager,
  52. tokenManager *token.Manager,
  53. plugins []volume.VolumePlugin,
  54. prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
  55. // Initialize csiDriverLister before calling InitPlugins
  56. var informerFactory informers.SharedInformerFactory
  57. var csiDriverLister storagelisters.CSIDriverLister
  58. var csiDriversSynced cache.InformerSynced
  59. const resyncPeriod = 0
  60. if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
  61. // Don't initialize if kubeClient is nil
  62. if kubelet.kubeClient != nil {
  63. informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod)
  64. csiDriverInformer := informerFactory.Storage().V1beta1().CSIDrivers()
  65. csiDriverLister = csiDriverInformer.Lister()
  66. csiDriversSynced = csiDriverInformer.Informer().HasSynced
  67. } else {
  68. klog.Warning("kubeClient is nil. Skip initialization of CSIDriverLister")
  69. }
  70. }
  71. mountPodManager, err := mountpod.NewManager(kubelet.getRootDir(), kubelet.podManager)
  72. if err != nil {
  73. return nil, err
  74. }
  75. kvh := &kubeletVolumeHost{
  76. kubelet: kubelet,
  77. volumePluginMgr: volume.VolumePluginMgr{},
  78. secretManager: secretManager,
  79. configMapManager: configMapManager,
  80. tokenManager: tokenManager,
  81. mountPodManager: mountPodManager,
  82. informerFactory: informerFactory,
  83. csiDriverLister: csiDriverLister,
  84. csiDriversSynced: csiDriversSynced,
  85. }
  86. if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
  87. return nil, fmt.Errorf(
  88. "Could not initialize volume plugins for KubeletVolumePluginMgr: %v",
  89. err)
  90. }
  91. return &kvh.volumePluginMgr, nil
  92. }
  93. // Compile-time check to ensure kubeletVolumeHost implements the VolumeHost interface
  94. var _ volume.VolumeHost = &kubeletVolumeHost{}
  95. var _ volume.KubeletVolumeHost = &kubeletVolumeHost{}
  96. func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string {
  97. return kvh.kubelet.getPluginDir(pluginName)
  98. }
  99. type kubeletVolumeHost struct {
  100. kubelet *Kubelet
  101. volumePluginMgr volume.VolumePluginMgr
  102. secretManager secret.Manager
  103. tokenManager *token.Manager
  104. configMapManager configmap.Manager
  105. mountPodManager mountpod.Manager
  106. informerFactory informers.SharedInformerFactory
  107. csiDriverLister storagelisters.CSIDriverLister
  108. csiDriversSynced cache.InformerSynced
  109. }
  110. func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
  111. kvh.kubelet.runtimeState.setStorageState(err)
  112. }
  113. func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {
  114. return kvh.kubelet.getVolumeDevicePluginDir(pluginName)
  115. }
  116. func (kvh *kubeletVolumeHost) GetPodsDir() string {
  117. return kvh.kubelet.getPodsDir()
  118. }
  119. func (kvh *kubeletVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
  120. dir := kvh.kubelet.getPodVolumeDir(podUID, pluginName, volumeName)
  121. if runtime.GOOS == "windows" {
  122. dir = util.GetWindowsPath(dir)
  123. }
  124. return dir
  125. }
  126. func (kvh *kubeletVolumeHost) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
  127. return kvh.kubelet.getPodVolumeDeviceDir(podUID, pluginName)
  128. }
  129. func (kvh *kubeletVolumeHost) GetPodPluginDir(podUID types.UID, pluginName string) string {
  130. return kvh.kubelet.getPodPluginDir(podUID, pluginName)
  131. }
  132. func (kvh *kubeletVolumeHost) GetKubeClient() clientset.Interface {
  133. return kvh.kubelet.kubeClient
  134. }
  135. func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
  136. return kvh.kubelet.subpather
  137. }
  138. func (kvh *kubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
  139. return kvh.informerFactory
  140. }
  141. func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
  142. return kvh.csiDriverLister
  143. }
  144. func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
  145. return kvh.csiDriversSynced
  146. }
  147. // WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
  148. func (kvh *kubeletVolumeHost) WaitForCacheSync() error {
  149. if kvh.csiDriversSynced == nil {
  150. klog.Error("csiDriversSynced not found on KubeletVolumeHost")
  151. return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost")
  152. }
  153. synced := []cache.InformerSynced{kvh.csiDriversSynced}
  154. if !cache.WaitForCacheSync(wait.NeverStop, synced...) {
  155. klog.Warning("failed to wait for cache sync for CSIDriverLister")
  156. return fmt.Errorf("failed to wait for cache sync for CSIDriverLister")
  157. }
  158. return nil
  159. }
  160. func (kvh *kubeletVolumeHost) NewWrapperMounter(
  161. volName string,
  162. spec volume.Spec,
  163. pod *v1.Pod,
  164. opts volume.VolumeOptions) (volume.Mounter, error) {
  165. // The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
  166. wrapperVolumeName := "wrapped_" + volName
  167. if spec.Volume != nil {
  168. spec.Volume.Name = wrapperVolumeName
  169. }
  170. return kvh.kubelet.newVolumeMounterFromPlugins(&spec, pod, opts)
  171. }
  172. func (kvh *kubeletVolumeHost) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
  173. // The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
  174. wrapperVolumeName := "wrapped_" + volName
  175. if spec.Volume != nil {
  176. spec.Volume.Name = wrapperVolumeName
  177. }
  178. plugin, err := kvh.kubelet.volumePluginMgr.FindPluginBySpec(&spec)
  179. if err != nil {
  180. return nil, err
  181. }
  182. return plugin.NewUnmounter(spec.Name(), podUID)
  183. }
  184. func (kvh *kubeletVolumeHost) GetCloudProvider() cloudprovider.Interface {
  185. return kvh.kubelet.cloud
  186. }
  187. func (kvh *kubeletVolumeHost) GetMounter(pluginName string) mount.Interface {
  188. exec, err := kvh.getMountExec(pluginName)
  189. if err != nil {
  190. klog.V(2).Infof("Error finding mount pod for plugin %s: %s", pluginName, err.Error())
  191. // Use the default mounter
  192. exec = nil
  193. }
  194. if exec == nil {
  195. return kvh.kubelet.mounter
  196. }
  197. return execmnt.NewExecMounter(exec, kvh.kubelet.mounter)
  198. }
  199. func (kvh *kubeletVolumeHost) GetHostName() string {
  200. return kvh.kubelet.hostname
  201. }
  202. func (kvh *kubeletVolumeHost) GetHostIP() (net.IP, error) {
  203. return kvh.kubelet.GetHostIP()
  204. }
  205. func (kvh *kubeletVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) {
  206. node, err := kvh.kubelet.getNodeAnyWay()
  207. if err != nil {
  208. return nil, fmt.Errorf("error retrieving node: %v", err)
  209. }
  210. return node.Status.Allocatable, nil
  211. }
  212. func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
  213. return kvh.secretManager.GetSecret
  214. }
  215. func (kvh *kubeletVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
  216. return kvh.configMapManager.GetConfigMap
  217. }
  218. func (kvh *kubeletVolumeHost) GetServiceAccountTokenFunc() func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
  219. return kvh.tokenManager.GetServiceAccountToken
  220. }
  221. func (kvh *kubeletVolumeHost) DeleteServiceAccountTokenFunc() func(podUID types.UID) {
  222. return kvh.tokenManager.DeleteServiceAccountToken
  223. }
  224. func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
  225. node, err := kvh.kubelet.GetNode()
  226. if err != nil {
  227. return nil, fmt.Errorf("error retrieving node: %v", err)
  228. }
  229. return node.Labels, nil
  230. }
  231. func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
  232. return kvh.kubelet.nodeName
  233. }
  234. func (kvh *kubeletVolumeHost) GetEventRecorder() record.EventRecorder {
  235. return kvh.kubelet.recorder
  236. }
  237. func (kvh *kubeletVolumeHost) GetExec(pluginName string) mount.Exec {
  238. exec, err := kvh.getMountExec(pluginName)
  239. if err != nil {
  240. klog.V(2).Infof("Error finding mount pod for plugin %s: %s", pluginName, err.Error())
  241. // Use the default exec
  242. exec = nil
  243. }
  244. if exec == nil {
  245. return mount.NewOsExec()
  246. }
  247. return exec
  248. }
  249. // getMountExec returns mount.Exec implementation that leads to pod with mount
  250. // utilities. It returns nil,nil when there is no such pod and default mounter /
  251. // os.Exec should be used.
  252. func (kvh *kubeletVolumeHost) getMountExec(pluginName string) (mount.Exec, error) {
  253. if !utilfeature.DefaultFeatureGate.Enabled(features.MountContainers) {
  254. klog.V(5).Infof("using default mounter/exec for %s", pluginName)
  255. return nil, nil
  256. }
  257. pod, container, err := kvh.mountPodManager.GetMountPod(pluginName)
  258. if err != nil {
  259. return nil, err
  260. }
  261. if pod == nil {
  262. // Use default mounter/exec for this plugin
  263. klog.V(5).Infof("using default mounter/exec for %s", pluginName)
  264. return nil, nil
  265. }
  266. klog.V(5).Infof("using container %s/%s/%s to execute mount utilities for %s", pod.Namespace, pod.Name, container, pluginName)
  267. return &containerExec{
  268. pod: pod,
  269. containerName: container,
  270. kl: kvh.kubelet,
  271. }, nil
  272. }
  273. // containerExec is implementation of mount.Exec that executes commands in given
  274. // container in given pod.
  275. type containerExec struct {
  276. pod *v1.Pod
  277. containerName string
  278. kl *Kubelet
  279. }
  280. var _ mount.Exec = &containerExec{}
  281. func (e *containerExec) Run(cmd string, args ...string) ([]byte, error) {
  282. cmdline := append([]string{cmd}, args...)
  283. klog.V(5).Infof("Exec mounter running in pod %s/%s/%s: %v", e.pod.Namespace, e.pod.Name, e.containerName, cmdline)
  284. return e.kl.RunInContainer(container.GetPodFullName(e.pod), e.pod.UID, e.containerName, cmdline)
  285. }