portworx_util.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package portworx
  14. import (
  15. "fmt"
  16. osdapi "github.com/libopenstorage/openstorage/api"
  17. osdclient "github.com/libopenstorage/openstorage/api/client"
  18. volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
  19. osdspec "github.com/libopenstorage/openstorage/api/spec"
  20. volumeapi "github.com/libopenstorage/openstorage/volume"
  21. "k8s.io/api/core/v1"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. volumehelpers "k8s.io/cloud-provider/volume/helpers"
  25. "k8s.io/klog"
  26. api "k8s.io/kubernetes/pkg/apis/core"
  27. "k8s.io/kubernetes/pkg/volume"
  28. )
  29. const (
  30. osdMgmtDefaultPort = 9001
  31. osdDriverVersion = "v1"
  32. pxdDriverName = "pxd"
  33. pvcClaimLabel = "pvc"
  34. pvcNamespaceLabel = "namespace"
  35. pxServiceName = "portworx-service"
  36. pxDriverName = "pxd-sched"
  37. )
  38. type portworxVolumeUtil struct {
  39. portworxClient *osdclient.Client
  40. }
  41. // CreateVolume creates a Portworx volume.
  42. func (util *portworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) {
  43. driver, err := util.getPortworxDriver(p.plugin.host)
  44. if err != nil || driver == nil {
  45. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  46. return "", 0, nil, err
  47. }
  48. klog.Infof("Creating Portworx volume for PVC: %v", p.options.PVC.Name)
  49. capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
  50. // Portworx Volumes are specified in GiB
  51. requestGiB := volumehelpers.RoundUpToGiB(capacity)
  52. // Perform a best-effort parsing of parameters. Portworx 1.2.9 and later parses volume parameters from
  53. // spec.VolumeLabels. So even if below SpecFromOpts() fails to parse certain parameters or
  54. // doesn't support new parameters, the server-side processing will parse it correctly.
  55. // We still need to call SpecFromOpts() here to handle cases where someone is running Portworx 1.2.8 and lower.
  56. specHandler := osdspec.NewSpecHandler()
  57. spec, locator, source, _ := specHandler.SpecFromOpts(p.options.Parameters)
  58. if spec == nil {
  59. spec = specHandler.DefaultSpec()
  60. }
  61. // Pass all parameters as volume labels for Portworx server-side processing
  62. if spec.VolumeLabels == nil {
  63. spec.VolumeLabels = make(map[string]string, 0)
  64. }
  65. for k, v := range p.options.Parameters {
  66. spec.VolumeLabels[k] = v
  67. }
  68. // Update the requested size in the spec
  69. spec.Size = uint64(requestGiB * volumehelpers.GiB)
  70. // Change the Portworx Volume name to PV name
  71. if locator == nil {
  72. locator = &osdapi.VolumeLocator{
  73. VolumeLabels: make(map[string]string),
  74. }
  75. }
  76. locator.Name = p.options.PVName
  77. // Add claim Name as a part of Portworx Volume Labels
  78. locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
  79. locator.VolumeLabels[pvcNamespaceLabel] = p.options.PVC.Namespace
  80. for k, v := range p.options.PVC.Annotations {
  81. if _, present := spec.VolumeLabels[k]; present {
  82. klog.Warningf("not saving annotation: %s=%s in spec labels due to an existing key", k, v)
  83. continue
  84. }
  85. spec.VolumeLabels[k] = v
  86. }
  87. volumeID, err := driver.Create(locator, source, spec)
  88. if err != nil {
  89. klog.Errorf("Error creating Portworx Volume : %v", err)
  90. return "", 0, nil, err
  91. }
  92. klog.Infof("Successfully created Portworx volume for PVC: %v", p.options.PVC.Name)
  93. return volumeID, requestGiB, nil, err
  94. }
  95. // DeleteVolume deletes a Portworx volume
  96. func (util *portworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
  97. driver, err := util.getPortworxDriver(d.plugin.host)
  98. if err != nil || driver == nil {
  99. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  100. return err
  101. }
  102. err = driver.Delete(d.volumeID)
  103. if err != nil {
  104. klog.Errorf("Error deleting Portworx Volume (%v): %v", d.volName, err)
  105. return err
  106. }
  107. return nil
  108. }
  109. // AttachVolume attaches a Portworx Volume
  110. func (util *portworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) {
  111. driver, err := util.getLocalPortworxDriver(m.plugin.host)
  112. if err != nil || driver == nil {
  113. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  114. return "", err
  115. }
  116. devicePath, err := driver.Attach(m.volName, attachOptions)
  117. if err != nil {
  118. klog.Errorf("Error attaching Portworx Volume (%v): %v", m.volName, err)
  119. return "", err
  120. }
  121. return devicePath, nil
  122. }
  123. // DetachVolume detaches a Portworx Volume
  124. func (util *portworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
  125. driver, err := util.getLocalPortworxDriver(u.plugin.host)
  126. if err != nil || driver == nil {
  127. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  128. return err
  129. }
  130. err = driver.Detach(u.volName, false /*doNotForceDetach*/)
  131. if err != nil {
  132. klog.Errorf("Error detaching Portworx Volume (%v): %v", u.volName, err)
  133. return err
  134. }
  135. return nil
  136. }
  137. // MountVolume mounts a Portworx Volume on the specified mountPath
  138. func (util *portworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
  139. driver, err := util.getLocalPortworxDriver(m.plugin.host)
  140. if err != nil || driver == nil {
  141. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  142. return err
  143. }
  144. err = driver.Mount(m.volName, mountPath)
  145. if err != nil {
  146. klog.Errorf("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
  147. return err
  148. }
  149. return nil
  150. }
  151. // UnmountVolume unmounts a Portworx Volume
  152. func (util *portworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
  153. driver, err := util.getLocalPortworxDriver(u.plugin.host)
  154. if err != nil || driver == nil {
  155. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  156. return err
  157. }
  158. err = driver.Unmount(u.volName, mountPath)
  159. if err != nil {
  160. klog.Errorf("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
  161. return err
  162. }
  163. return nil
  164. }
  165. func (util *portworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error {
  166. driver, err := util.getPortworxDriver(volumeHost)
  167. if err != nil || driver == nil {
  168. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  169. return err
  170. }
  171. vols, err := driver.Inspect([]string{spec.Name()})
  172. if err != nil {
  173. return err
  174. }
  175. if len(vols) != 1 {
  176. return fmt.Errorf("failed to inspect Portworx volume: %s. Found: %d volumes", spec.Name(), len(vols))
  177. }
  178. vol := vols[0]
  179. newSizeInBytes := uint64(volumehelpers.RoundUpToGiB(newSize) * volumehelpers.GiB)
  180. if vol.Spec.Size >= newSizeInBytes {
  181. klog.Infof("Portworx volume: %s already at size: %d greater than or equal to new "+
  182. "requested size: %d. Skipping resize.", spec.Name(), vol.Spec.Size, newSizeInBytes)
  183. return nil
  184. }
  185. vol.Spec.Size = newSizeInBytes
  186. err = driver.Set(spec.Name(), vol.Locator, vol.Spec)
  187. if err != nil {
  188. return err
  189. }
  190. // check if the volume's size actually got updated
  191. vols, err = driver.Inspect([]string{spec.Name()})
  192. if err != nil {
  193. return err
  194. }
  195. if len(vols) != 1 {
  196. return fmt.Errorf("failed to inspect resized Portworx volume: %s. Found: %d volumes", spec.Name(), len(vols))
  197. }
  198. updatedVol := vols[0]
  199. if updatedVol.Spec.Size < vol.Spec.Size {
  200. return fmt.Errorf("Portworx volume: %s doesn't match expected size after resize. expected:%v actual:%v",
  201. spec.Name(), vol.Spec.Size, updatedVol.Spec.Size)
  202. }
  203. return nil
  204. }
  205. func isClientValid(client *osdclient.Client) (bool, error) {
  206. if client == nil {
  207. return false, nil
  208. }
  209. _, err := client.Versions(osdapi.OsdVolumePath)
  210. if err != nil {
  211. klog.Errorf("portworx client failed driver versions check. Err: %v", err)
  212. return false, err
  213. }
  214. return true, nil
  215. }
  216. func createDriverClient(hostname string, port int32) (*osdclient.Client, error) {
  217. client, err := volumeclient.NewDriverClient(fmt.Sprintf("http://%s:%d", hostname, port),
  218. pxdDriverName, osdDriverVersion, pxDriverName)
  219. if err != nil {
  220. return nil, err
  221. }
  222. isValid, err := isClientValid(client)
  223. if isValid {
  224. return client, nil
  225. }
  226. return nil, err
  227. }
  228. // getPortworxDriver returns a Portworx volume driver which can be used for cluster wide operations.
  229. // Operations like create and delete volume don't need to be restricted to local volume host since
  230. // any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to
  231. // the Portworx node that will own/owns the data.
  232. func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
  233. // check if existing saved client is valid
  234. if isValid, _ := isClientValid(util.portworxClient); isValid {
  235. return volumeclient.VolumeDriver(util.portworxClient), nil
  236. }
  237. // create new client
  238. var err error
  239. util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osdMgmtDefaultPort) // for backward compatibility
  240. if err != nil || util.portworxClient == nil {
  241. // Create client from portworx k8s service.
  242. svc, err := getPortworxService(volumeHost)
  243. if err != nil {
  244. return nil, err
  245. }
  246. // The port here is always the default one since it's the service port
  247. util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, osdMgmtDefaultPort)
  248. if err != nil || util.portworxClient == nil {
  249. klog.Errorf("Failed to connect to portworx service. Err: %v", err)
  250. return nil, err
  251. }
  252. klog.Infof("Using portworx cluster service at: %v:%d as api endpoint",
  253. svc.Spec.ClusterIP, osdMgmtDefaultPort)
  254. } else {
  255. klog.Infof("Using portworx service at: %v:%d as api endpoint",
  256. volumeHost.GetHostName(), osdMgmtDefaultPort)
  257. }
  258. return volumeclient.VolumeDriver(util.portworxClient), nil
  259. }
  260. // getLocalPortworxDriver returns driver connected to Portworx API server on volume host.
  261. // This is required to force certain operations (mount, unmount, detach, attach) to
  262. // go to the volume host instead of the k8s service which might route it to any host. This pertains to how
  263. // Portworx mounts and attaches a volume to the running container. The node getting these requests needs to
  264. // see the pod container mounts (specifically /var/lib/kubelet/pods/<pod_id>)
  265. func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
  266. if util.portworxClient != nil {
  267. // check if existing saved client is valid
  268. if isValid, _ := isClientValid(util.portworxClient); isValid {
  269. return volumeclient.VolumeDriver(util.portworxClient), nil
  270. }
  271. }
  272. // Lookup port
  273. svc, err := getPortworxService(volumeHost)
  274. if err != nil {
  275. return nil, err
  276. }
  277. osgMgmtPort := lookupPXAPIPortFromService(svc)
  278. util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osgMgmtPort)
  279. if err != nil {
  280. return nil, err
  281. }
  282. klog.Infof("Using portworx local service at: %v:%d as api endpoint",
  283. volumeHost.GetHostName(), osgMgmtPort)
  284. return volumeclient.VolumeDriver(util.portworxClient), nil
  285. }
  286. // lookupPXAPIPortFromService goes over all the ports in the given service and returns the target
  287. // port for osdMgmtDefaultPort
  288. func lookupPXAPIPortFromService(svc *v1.Service) int32 {
  289. for _, p := range svc.Spec.Ports {
  290. if p.Port == osdMgmtDefaultPort {
  291. return p.TargetPort.IntVal
  292. }
  293. }
  294. return osdMgmtDefaultPort // default
  295. }
  296. // getPortworxService returns the portworx cluster service from the API server
  297. func getPortworxService(host volume.VolumeHost) (*v1.Service, error) {
  298. kubeClient := host.GetKubeClient()
  299. if kubeClient == nil {
  300. err := fmt.Errorf("Failed to get kubeclient when creating portworx client")
  301. klog.Errorf(err.Error())
  302. return nil, err
  303. }
  304. opts := metav1.GetOptions{}
  305. svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts)
  306. if err != nil {
  307. klog.Errorf("Failed to get service. Err: %v", err)
  308. return nil, err
  309. }
  310. if svc == nil {
  311. err = fmt.Errorf("Service: %v not found. Consult Portworx docs to deploy it", pxServiceName)
  312. klog.Errorf(err.Error())
  313. return nil, err
  314. }
  315. return svc, nil
  316. }