resize_util.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package util
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "k8s.io/utils/mount"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/meta"
  21. "k8s.io/apimachinery/pkg/api/resource"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/apimachinery/pkg/util/strategicpatch"
  26. clientset "k8s.io/client-go/kubernetes"
  27. "k8s.io/kubernetes/pkg/util/resizefs"
  28. "k8s.io/kubernetes/pkg/volume"
  29. volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
  30. )
  31. var (
  32. knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{
  33. v1.PersistentVolumeClaimFileSystemResizePending: true,
  34. v1.PersistentVolumeClaimResizing: true,
  35. }
  36. )
  37. type resizeProcessStatus struct {
  38. condition v1.PersistentVolumeClaimCondition
  39. processed bool
  40. }
  41. // ClaimToClaimKey return namespace/name string for pvc
  42. func ClaimToClaimKey(claim *v1.PersistentVolumeClaim) string {
  43. return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
  44. }
  45. // UpdatePVSize updates just pv size after cloudprovider resizing is successful
  46. func UpdatePVSize(
  47. pv *v1.PersistentVolume,
  48. newSize resource.Quantity,
  49. kubeClient clientset.Interface) error {
  50. pvClone := pv.DeepCopy()
  51. oldData, err := json.Marshal(pvClone)
  52. if err != nil {
  53. return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", pvClone.Name, err)
  54. }
  55. pvClone.Spec.Capacity[v1.ResourceStorage] = newSize
  56. newData, err := json.Marshal(pvClone)
  57. if err != nil {
  58. return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", pvClone.Name, err)
  59. }
  60. patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvClone)
  61. if err != nil {
  62. return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", pvClone.Name, err)
  63. }
  64. _, err = kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pvClone.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
  65. if err != nil {
  66. return fmt.Errorf("error Patching PV %q with error : %v", pvClone.Name, err)
  67. }
  68. return nil
  69. }
  70. // MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress
  71. // and also annotates the PVC with the name of the resizer.
  72. func MarkResizeInProgressWithResizer(
  73. pvc *v1.PersistentVolumeClaim,
  74. resizerName string,
  75. kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
  76. // Mark PVC as Resize Started
  77. progressCondition := v1.PersistentVolumeClaimCondition{
  78. Type: v1.PersistentVolumeClaimResizing,
  79. Status: v1.ConditionTrue,
  80. LastTransitionTime: metav1.Now(),
  81. }
  82. conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
  83. newPVC := pvc.DeepCopy()
  84. newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
  85. newPVC = setResizer(newPVC, resizerName)
  86. return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
  87. }
  88. // SetClaimResizer sets resizer annotation on PVC
  89. func SetClaimResizer(
  90. pvc *v1.PersistentVolumeClaim,
  91. resizerName string,
  92. kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
  93. newPVC := pvc.DeepCopy()
  94. newPVC = setResizer(newPVC, resizerName)
  95. return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
  96. }
  97. func setResizer(pvc *v1.PersistentVolumeClaim, resizerName string) *v1.PersistentVolumeClaim {
  98. if val, ok := pvc.Annotations[volumetypes.VolumeResizerKey]; ok && val == resizerName {
  99. return pvc
  100. }
  101. metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, volumetypes.VolumeResizerKey, resizerName)
  102. return pvc
  103. }
  104. // MarkForFSResize marks file system resizing as pending
  105. func MarkForFSResize(
  106. pvc *v1.PersistentVolumeClaim,
  107. kubeClient clientset.Interface) error {
  108. pvcCondition := v1.PersistentVolumeClaimCondition{
  109. Type: v1.PersistentVolumeClaimFileSystemResizePending,
  110. Status: v1.ConditionTrue,
  111. LastTransitionTime: metav1.Now(),
  112. Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
  113. }
  114. conditions := []v1.PersistentVolumeClaimCondition{pvcCondition}
  115. newPVC := pvc.DeepCopy()
  116. newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
  117. _, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
  118. return err
  119. }
  120. // MarkResizeFinished marks all resizing as done
  121. func MarkResizeFinished(
  122. pvc *v1.PersistentVolumeClaim,
  123. newSize resource.Quantity,
  124. kubeClient clientset.Interface) error {
  125. return MarkFSResizeFinished(pvc, newSize, kubeClient)
  126. }
  127. // MarkFSResizeFinished marks file system resizing as done
  128. func MarkFSResizeFinished(
  129. pvc *v1.PersistentVolumeClaim,
  130. newSize resource.Quantity,
  131. kubeClient clientset.Interface) error {
  132. newPVC := pvc.DeepCopy()
  133. newPVC.Status.Capacity[v1.ResourceStorage] = newSize
  134. newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{})
  135. _, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
  136. return err
  137. }
  138. // PatchPVCStatus updates PVC status using PATCH verb
  139. // Don't use Update because this can be called from kubelet and if kubelet has an older client its
  140. // Updates will overwrite new fields. And to avoid writing to a stale object, add ResourceVersion
  141. // to the patch so that Patch will fail if the patch's RV != actual up-to-date RV like Update would
  142. func PatchPVCStatus(
  143. oldPVC *v1.PersistentVolumeClaim,
  144. newPVC *v1.PersistentVolumeClaim,
  145. kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
  146. patchBytes, err := createPVCPatch(oldPVC, newPVC)
  147. if err != nil {
  148. return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err)
  149. }
  150. updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
  151. Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
  152. if updateErr != nil {
  153. return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr)
  154. }
  155. return updatedClaim, nil
  156. }
  157. func createPVCPatch(
  158. oldPVC *v1.PersistentVolumeClaim,
  159. newPVC *v1.PersistentVolumeClaim) ([]byte, error) {
  160. oldData, err := json.Marshal(oldPVC)
  161. if err != nil {
  162. return nil, fmt.Errorf("failed to marshal old data: %v", err)
  163. }
  164. newData, err := json.Marshal(newPVC)
  165. if err != nil {
  166. return nil, fmt.Errorf("failed to marshal new data: %v", err)
  167. }
  168. patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPVC)
  169. if err != nil {
  170. return nil, fmt.Errorf("failed to create 2 way merge patch: %v", err)
  171. }
  172. patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
  173. if err != nil {
  174. return nil, fmt.Errorf("failed to add resource version: %v", err)
  175. }
  176. return patchBytes, nil
  177. }
  178. func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
  179. var patchMap map[string]interface{}
  180. err := json.Unmarshal(patchBytes, &patchMap)
  181. if err != nil {
  182. return nil, fmt.Errorf("error unmarshalling patch: %v", err)
  183. }
  184. u := unstructured.Unstructured{Object: patchMap}
  185. a, err := meta.Accessor(&u)
  186. if err != nil {
  187. return nil, fmt.Errorf("error creating accessor: %v", err)
  188. }
  189. a.SetResourceVersion(resourceVersion)
  190. versionBytes, err := json.Marshal(patchMap)
  191. if err != nil {
  192. return nil, fmt.Errorf("error marshalling json patch: %v", err)
  193. }
  194. return versionBytes, nil
  195. }
  196. // MergeResizeConditionOnPVC updates pvc with requested resize conditions
  197. // leaving other conditions untouched.
  198. func MergeResizeConditionOnPVC(
  199. pvc *v1.PersistentVolumeClaim,
  200. resizeConditions []v1.PersistentVolumeClaimCondition) *v1.PersistentVolumeClaim {
  201. resizeConditionMap := map[v1.PersistentVolumeClaimConditionType]*resizeProcessStatus{}
  202. for _, condition := range resizeConditions {
  203. resizeConditionMap[condition.Type] = &resizeProcessStatus{condition, false}
  204. }
  205. oldConditions := pvc.Status.Conditions
  206. newConditions := []v1.PersistentVolumeClaimCondition{}
  207. for _, condition := range oldConditions {
  208. // If Condition is of not resize type, we keep it.
  209. if _, ok := knownResizeConditions[condition.Type]; !ok {
  210. newConditions = append(newConditions, condition)
  211. continue
  212. }
  213. if newCondition, ok := resizeConditionMap[condition.Type]; ok {
  214. if newCondition.condition.Status != condition.Status {
  215. newConditions = append(newConditions, newCondition.condition)
  216. } else {
  217. newConditions = append(newConditions, condition)
  218. }
  219. newCondition.processed = true
  220. }
  221. }
  222. // append all unprocessed conditions
  223. for _, newCondition := range resizeConditionMap {
  224. if !newCondition.processed {
  225. newConditions = append(newConditions, newCondition.condition)
  226. }
  227. }
  228. pvc.Status.Conditions = newConditions
  229. return pvc
  230. }
  231. // GenericResizeFS : call generic filesystem resizer for plugins that don't have any special filesystem resize requirements
  232. func GenericResizeFS(host volume.VolumeHost, pluginName, devicePath, deviceMountPath string) (bool, error) {
  233. mounter := host.GetMounter(pluginName)
  234. diskFormatter := &mount.SafeFormatAndMount{
  235. Interface: mounter,
  236. Exec: host.GetExec(pluginName),
  237. }
  238. resizer := resizefs.NewResizeFs(diskFormatter)
  239. return resizer.Resize(devicePath, deviceMountPath)
  240. }