portworx_util.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package portworx
import (
	"context"
	"fmt"
	"net"
	"strconv"

	osdapi "github.com/libopenstorage/openstorage/api"
	osdclient "github.com/libopenstorage/openstorage/api/client"
	volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
	osdspec "github.com/libopenstorage/openstorage/api/spec"
	volumeapi "github.com/libopenstorage/openstorage/volume"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
	"k8s.io/klog"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/volume"
)
  30. const (
  31. osdMgmtDefaultPort = 9001
  32. osdDriverVersion = "v1"
  33. pxdDriverName = "pxd"
  34. pvcClaimLabel = "pvc"
  35. pvcNamespaceLabel = "namespace"
  36. pxServiceName = "portworx-service"
  37. pxDriverName = "pxd-sched"
  38. )
  39. type portworxVolumeUtil struct {
  40. portworxClient *osdclient.Client
  41. }
  42. // CreateVolume creates a Portworx volume.
  43. func (util *portworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) {
  44. driver, err := util.getPortworxDriver(p.plugin.host)
  45. if err != nil || driver == nil {
  46. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  47. return "", 0, nil, err
  48. }
  49. klog.Infof("Creating Portworx volume for PVC: %v", p.options.PVC.Name)
  50. capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
  51. // Portworx Volumes are specified in GiB
  52. requestGiB := volumehelpers.RoundUpToGiB(capacity)
  53. // Perform a best-effort parsing of parameters. Portworx 1.2.9 and later parses volume parameters from
  54. // spec.VolumeLabels. So even if below SpecFromOpts() fails to parse certain parameters or
  55. // doesn't support new parameters, the server-side processing will parse it correctly.
  56. // We still need to call SpecFromOpts() here to handle cases where someone is running Portworx 1.2.8 and lower.
  57. specHandler := osdspec.NewSpecHandler()
  58. spec, locator, source, _ := specHandler.SpecFromOpts(p.options.Parameters)
  59. if spec == nil {
  60. spec = specHandler.DefaultSpec()
  61. }
  62. // Pass all parameters as volume labels for Portworx server-side processing
  63. if spec.VolumeLabels == nil {
  64. spec.VolumeLabels = make(map[string]string, 0)
  65. }
  66. for k, v := range p.options.Parameters {
  67. spec.VolumeLabels[k] = v
  68. }
  69. // Update the requested size in the spec
  70. spec.Size = uint64(requestGiB * volumehelpers.GiB)
  71. // Change the Portworx Volume name to PV name
  72. if locator == nil {
  73. locator = &osdapi.VolumeLocator{
  74. VolumeLabels: make(map[string]string),
  75. }
  76. }
  77. locator.Name = p.options.PVName
  78. // Add claim Name as a part of Portworx Volume Labels
  79. locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
  80. locator.VolumeLabels[pvcNamespaceLabel] = p.options.PVC.Namespace
  81. for k, v := range p.options.PVC.Annotations {
  82. if _, present := spec.VolumeLabels[k]; present {
  83. klog.Warningf("not saving annotation: %s=%s in spec labels due to an existing key", k, v)
  84. continue
  85. }
  86. spec.VolumeLabels[k] = v
  87. }
  88. volumeID, err := driver.Create(locator, source, spec)
  89. if err != nil {
  90. klog.Errorf("Error creating Portworx Volume : %v", err)
  91. return "", 0, nil, err
  92. }
  93. klog.Infof("Successfully created Portworx volume for PVC: %v", p.options.PVC.Name)
  94. return volumeID, requestGiB, nil, err
  95. }
  96. // DeleteVolume deletes a Portworx volume
  97. func (util *portworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
  98. driver, err := util.getPortworxDriver(d.plugin.host)
  99. if err != nil || driver == nil {
  100. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  101. return err
  102. }
  103. err = driver.Delete(d.volumeID)
  104. if err != nil {
  105. klog.Errorf("Error deleting Portworx Volume (%v): %v", d.volName, err)
  106. return err
  107. }
  108. return nil
  109. }
  110. // AttachVolume attaches a Portworx Volume
  111. func (util *portworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) {
  112. driver, err := util.getLocalPortworxDriver(m.plugin.host)
  113. if err != nil || driver == nil {
  114. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  115. return "", err
  116. }
  117. devicePath, err := driver.Attach(m.volName, attachOptions)
  118. if err != nil {
  119. klog.Errorf("Error attaching Portworx Volume (%v): %v", m.volName, err)
  120. return "", err
  121. }
  122. return devicePath, nil
  123. }
  124. // DetachVolume detaches a Portworx Volume
  125. func (util *portworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
  126. driver, err := util.getLocalPortworxDriver(u.plugin.host)
  127. if err != nil || driver == nil {
  128. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  129. return err
  130. }
  131. err = driver.Detach(u.volName, false /*doNotForceDetach*/)
  132. if err != nil {
  133. klog.Errorf("Error detaching Portworx Volume (%v): %v", u.volName, err)
  134. return err
  135. }
  136. return nil
  137. }
  138. // MountVolume mounts a Portworx Volume on the specified mountPath
  139. func (util *portworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
  140. driver, err := util.getLocalPortworxDriver(m.plugin.host)
  141. if err != nil || driver == nil {
  142. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  143. return err
  144. }
  145. err = driver.Mount(m.volName, mountPath)
  146. if err != nil {
  147. klog.Errorf("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
  148. return err
  149. }
  150. return nil
  151. }
  152. // UnmountVolume unmounts a Portworx Volume
  153. func (util *portworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
  154. driver, err := util.getLocalPortworxDriver(u.plugin.host)
  155. if err != nil || driver == nil {
  156. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  157. return err
  158. }
  159. err = driver.Unmount(u.volName, mountPath)
  160. if err != nil {
  161. klog.Errorf("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
  162. return err
  163. }
  164. return nil
  165. }
  166. func (util *portworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error {
  167. driver, err := util.getPortworxDriver(volumeHost)
  168. if err != nil || driver == nil {
  169. klog.Errorf("Failed to get portworx driver. Err: %v", err)
  170. return err
  171. }
  172. vols, err := driver.Inspect([]string{spec.Name()})
  173. if err != nil {
  174. return err
  175. }
  176. if len(vols) != 1 {
  177. return fmt.Errorf("failed to inspect Portworx volume: %s. Found: %d volumes", spec.Name(), len(vols))
  178. }
  179. vol := vols[0]
  180. newSizeInBytes := uint64(volumehelpers.RoundUpToGiB(newSize) * volumehelpers.GiB)
  181. if vol.Spec.Size >= newSizeInBytes {
  182. klog.Infof("Portworx volume: %s already at size: %d greater than or equal to new "+
  183. "requested size: %d. Skipping resize.", spec.Name(), vol.Spec.Size, newSizeInBytes)
  184. return nil
  185. }
  186. vol.Spec.Size = newSizeInBytes
  187. err = driver.Set(spec.Name(), vol.Locator, vol.Spec)
  188. if err != nil {
  189. return err
  190. }
  191. // check if the volume's size actually got updated
  192. vols, err = driver.Inspect([]string{spec.Name()})
  193. if err != nil {
  194. return err
  195. }
  196. if len(vols) != 1 {
  197. return fmt.Errorf("failed to inspect resized Portworx volume: %s. Found: %d volumes", spec.Name(), len(vols))
  198. }
  199. updatedVol := vols[0]
  200. if updatedVol.Spec.Size < vol.Spec.Size {
  201. return fmt.Errorf("Portworx volume: %s doesn't match expected size after resize. expected:%v actual:%v",
  202. spec.Name(), vol.Spec.Size, updatedVol.Spec.Size)
  203. }
  204. return nil
  205. }
  206. func isClientValid(client *osdclient.Client) (bool, error) {
  207. if client == nil {
  208. return false, nil
  209. }
  210. _, err := client.Versions(osdapi.OsdVolumePath)
  211. if err != nil {
  212. klog.Errorf("portworx client failed driver versions check. Err: %v", err)
  213. return false, err
  214. }
  215. return true, nil
  216. }
  217. func createDriverClient(hostname string, port int32) (*osdclient.Client, error) {
  218. client, err := volumeclient.NewDriverClient(fmt.Sprintf("http://%s:%d", hostname, port),
  219. pxdDriverName, osdDriverVersion, pxDriverName)
  220. if err != nil {
  221. return nil, err
  222. }
  223. isValid, err := isClientValid(client)
  224. if isValid {
  225. return client, nil
  226. }
  227. return nil, err
  228. }
  229. // getPortworxDriver returns a Portworx volume driver which can be used for cluster wide operations.
  230. // Operations like create and delete volume don't need to be restricted to local volume host since
  231. // any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to
  232. // the Portworx node that will own/owns the data.
  233. func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
  234. // check if existing saved client is valid
  235. if isValid, _ := isClientValid(util.portworxClient); isValid {
  236. return volumeclient.VolumeDriver(util.portworxClient), nil
  237. }
  238. // create new client
  239. var err error
  240. util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osdMgmtDefaultPort) // for backward compatibility
  241. if err != nil || util.portworxClient == nil {
  242. // Create client from portworx k8s service.
  243. svc, err := getPortworxService(volumeHost)
  244. if err != nil {
  245. return nil, err
  246. }
  247. // The port here is always the default one since it's the service port
  248. util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, osdMgmtDefaultPort)
  249. if err != nil || util.portworxClient == nil {
  250. klog.Errorf("Failed to connect to portworx service. Err: %v", err)
  251. return nil, err
  252. }
  253. klog.Infof("Using portworx cluster service at: %v:%d as api endpoint",
  254. svc.Spec.ClusterIP, osdMgmtDefaultPort)
  255. } else {
  256. klog.Infof("Using portworx service at: %v:%d as api endpoint",
  257. volumeHost.GetHostName(), osdMgmtDefaultPort)
  258. }
  259. return volumeclient.VolumeDriver(util.portworxClient), nil
  260. }
  261. // getLocalPortworxDriver returns driver connected to Portworx API server on volume host.
  262. // This is required to force certain operations (mount, unmount, detach, attach) to
  263. // go to the volume host instead of the k8s service which might route it to any host. This pertains to how
  264. // Portworx mounts and attaches a volume to the running container. The node getting these requests needs to
  265. // see the pod container mounts (specifically /var/lib/kubelet/pods/<pod_id>)
  266. func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
  267. if util.portworxClient != nil {
  268. // check if existing saved client is valid
  269. if isValid, _ := isClientValid(util.portworxClient); isValid {
  270. return volumeclient.VolumeDriver(util.portworxClient), nil
  271. }
  272. }
  273. // Lookup port
  274. svc, err := getPortworxService(volumeHost)
  275. if err != nil {
  276. return nil, err
  277. }
  278. osgMgmtPort := lookupPXAPIPortFromService(svc)
  279. util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osgMgmtPort)
  280. if err != nil {
  281. return nil, err
  282. }
  283. klog.Infof("Using portworx local service at: %v:%d as api endpoint",
  284. volumeHost.GetHostName(), osgMgmtPort)
  285. return volumeclient.VolumeDriver(util.portworxClient), nil
  286. }
  287. // lookupPXAPIPortFromService goes over all the ports in the given service and returns the target
  288. // port for osdMgmtDefaultPort
  289. func lookupPXAPIPortFromService(svc *v1.Service) int32 {
  290. for _, p := range svc.Spec.Ports {
  291. if p.Port == osdMgmtDefaultPort {
  292. return p.TargetPort.IntVal
  293. }
  294. }
  295. return osdMgmtDefaultPort // default
  296. }
  297. // getPortworxService returns the portworx cluster service from the API server
  298. func getPortworxService(host volume.VolumeHost) (*v1.Service, error) {
  299. kubeClient := host.GetKubeClient()
  300. if kubeClient == nil {
  301. err := fmt.Errorf("Failed to get kubeclient when creating portworx client")
  302. klog.Errorf(err.Error())
  303. return nil, err
  304. }
  305. opts := metav1.GetOptions{}
  306. svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(context.TODO(), pxServiceName, opts)
  307. if err != nil {
  308. klog.Errorf("Failed to get service. Err: %v", err)
  309. return nil, err
  310. }
  311. if svc == nil {
  312. err = fmt.Errorf("Service: %v not found. Consult Portworx docs to deploy it", pxServiceName)
  313. klog.Errorf(err.Error())
  314. return nil, err
  315. }
  316. return svc, nil
  317. }