storage_flowcontrol.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rest
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
  19. "k8s.io/apimachinery/pkg/api/equality"
  20. apierrors "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. flowcontrolbootstrap "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
  24. "k8s.io/apiserver/pkg/registry/generic"
  25. "k8s.io/apiserver/pkg/registry/rest"
  26. genericapiserver "k8s.io/apiserver/pkg/server"
  27. serverstorage "k8s.io/apiserver/pkg/server/storage"
  28. flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
  29. "k8s.io/klog"
  30. "k8s.io/kubernetes/pkg/api/legacyscheme"
  31. "k8s.io/kubernetes/pkg/apis/flowcontrol"
  32. flowcontrolapisv1alpha1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1alpha1"
  33. flowschemastore "k8s.io/kubernetes/pkg/registry/flowcontrol/flowschema/storage"
  34. prioritylevelconfigurationstore "k8s.io/kubernetes/pkg/registry/flowcontrol/prioritylevelconfiguration/storage"
  35. )
  36. var _ genericapiserver.PostStartHookProvider = RESTStorageProvider{}
  37. // RESTStorageProvider is a provider of REST storage
  38. type RESTStorageProvider struct{}
  39. // PostStartHookName is the name of the post-start-hook provided by flow-control storage
  40. const PostStartHookName = "apiserver/bootstrap-system-flowcontrol-configuration"
  41. // NewRESTStorage creates a new rest storage for flow-control api models.
  42. func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
  43. apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(flowcontrol.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
  44. if apiResourceConfigSource.VersionEnabled(flowcontrolv1alpha1.SchemeGroupVersion) {
  45. flowControlStorage, err := p.v1alpha1Storage(apiResourceConfigSource, restOptionsGetter)
  46. if err != nil {
  47. return genericapiserver.APIGroupInfo{}, false, err
  48. }
  49. apiGroupInfo.VersionedResourcesStorageMap[flowcontrolv1alpha1.SchemeGroupVersion.Version] = flowControlStorage
  50. }
  51. return apiGroupInfo, true, nil
  52. }
  53. func (p RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
  54. storage := map[string]rest.Storage{}
  55. // flow-schema
  56. flowSchemaStorage, flowSchemaStatusStorage, err := flowschemastore.NewREST(restOptionsGetter)
  57. if err != nil {
  58. return nil, err
  59. }
  60. storage["flowschemas"] = flowSchemaStorage
  61. storage["flowschemas/status"] = flowSchemaStatusStorage
  62. // priority-level-configuration
  63. priorityLevelConfigurationStorage, priorityLevelConfigurationStatusStorage, err := prioritylevelconfigurationstore.NewREST(restOptionsGetter)
  64. if err != nil {
  65. return nil, err
  66. }
  67. storage["prioritylevelconfigurations"] = priorityLevelConfigurationStorage
  68. storage["prioritylevelconfigurations/status"] = priorityLevelConfigurationStatusStorage
  69. return storage, nil
  70. }
  71. // GroupName returns group name of the storage
  72. func (p RESTStorageProvider) GroupName() string {
  73. return flowcontrol.GroupName
  74. }
  75. // PostStartHook returns the hook func that launches the config provider
  76. func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
  77. return PostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
  78. flowcontrolClientSet := flowcontrolclient.NewForConfigOrDie(hookContext.LoopbackClientConfig)
  79. go func() {
  80. const retryCreatingSuggestedSettingsInterval = time.Second
  81. _ = wait.PollImmediateUntil(
  82. retryCreatingSuggestedSettingsInterval,
  83. func() (bool, error) {
  84. shouldEnsureSuggested, err := lastMandatoryExists(flowcontrolClientSet)
  85. if err != nil {
  86. klog.Errorf("failed getting exempt flow-schema, will retry later: %v", err)
  87. return false, nil
  88. }
  89. if !shouldEnsureSuggested {
  90. return true, nil
  91. }
  92. err = ensure(
  93. flowcontrolClientSet,
  94. flowcontrolbootstrap.SuggestedFlowSchemas,
  95. flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
  96. if err != nil {
  97. klog.Errorf("failed ensuring suggested settings, will retry later: %v", err)
  98. return false, nil
  99. }
  100. return true, nil
  101. },
  102. hookContext.StopCh)
  103. const retryCreatingMandatorySettingsInterval = time.Minute
  104. _ = wait.PollImmediateUntil(
  105. retryCreatingMandatorySettingsInterval,
  106. func() (bool, error) {
  107. if err := upgrade(
  108. flowcontrolClientSet,
  109. flowcontrolbootstrap.MandatoryFlowSchemas,
  110. // Note: the "exempt" priority-level is supposed tobe the last item in the pre-defined
  111. // list, so that a crash in the midst of the first kube-apiserver startup does not prevent
  112. // the full initial set of objects from being created.
  113. flowcontrolbootstrap.MandatoryPriorityLevelConfigurations,
  114. ); err != nil {
  115. klog.Errorf("failed creating mandatory flowcontrol settings: %v", err)
  116. return false, nil
  117. }
  118. return false, nil // always retry
  119. },
  120. hookContext.StopCh)
  121. }()
  122. return nil
  123. }, nil
  124. }
  125. // Returns false if there's a "exempt" priority-level existing in the cluster, otherwise returns a true
  126. // if the "exempt" priority-level is not found.
  127. func lastMandatoryExists(flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface) (bool, error) {
  128. if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(context.TODO(), flowcontrol.PriorityLevelConfigurationNameExempt, metav1.GetOptions{}); err != nil {
  129. if apierrors.IsNotFound(err) {
  130. return true, nil
  131. }
  132. return false, err
  133. }
  134. return false, nil
  135. }
  136. const thisFieldManager = "api-priority-and-fairness-config-producer-v1"
  137. func ensure(flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface, flowSchemas []*flowcontrolv1alpha1.FlowSchema, priorityLevels []*flowcontrolv1alpha1.PriorityLevelConfiguration) error {
  138. for _, flowSchema := range flowSchemas {
  139. _, err := flowcontrolClientSet.FlowSchemas().Create(context.TODO(), flowSchema, metav1.CreateOptions{FieldManager: thisFieldManager})
  140. if apierrors.IsAlreadyExists(err) {
  141. klog.V(3).Infof("Suggested FlowSchema %s already exists, skipping creating", flowSchema.Name)
  142. continue
  143. }
  144. if err != nil {
  145. return fmt.Errorf("cannot create suggested FlowSchema %s due to %v", flowSchema.Name, err)
  146. }
  147. klog.V(3).Infof("Created suggested FlowSchema %s", flowSchema.Name)
  148. }
  149. for _, priorityLevelConfiguration := range priorityLevels {
  150. _, err := flowcontrolClientSet.PriorityLevelConfigurations().Create(context.TODO(), priorityLevelConfiguration, metav1.CreateOptions{FieldManager: thisFieldManager})
  151. if apierrors.IsAlreadyExists(err) {
  152. klog.V(3).Infof("Suggested PriorityLevelConfiguration %s already exists, skipping creating", priorityLevelConfiguration.Name)
  153. continue
  154. }
  155. if err != nil {
  156. return fmt.Errorf("cannot create suggested PriorityLevelConfiguration %s due to %v", priorityLevelConfiguration.Name, err)
  157. }
  158. klog.V(3).Infof("Created suggested PriorityLevelConfiguration %s", priorityLevelConfiguration.Name)
  159. }
  160. return nil
  161. }
  162. func upgrade(flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface, flowSchemas []*flowcontrolv1alpha1.FlowSchema, priorityLevels []*flowcontrolv1alpha1.PriorityLevelConfiguration) error {
  163. for _, expectedFlowSchema := range flowSchemas {
  164. actualFlowSchema, err := flowcontrolClientSet.FlowSchemas().Get(context.TODO(), expectedFlowSchema.Name, metav1.GetOptions{})
  165. if err == nil {
  166. // TODO(yue9944882): extract existing version from label and compare
  167. // TODO(yue9944882): create w/ version string attached
  168. wrongSpec, err := flowSchemaHasWrongSpec(expectedFlowSchema, actualFlowSchema)
  169. if err != nil {
  170. return fmt.Errorf("failed checking if mandatory FlowSchema %s is up-to-date due to %v, will retry later", expectedFlowSchema.Name, err)
  171. }
  172. if wrongSpec {
  173. if _, err := flowcontrolClientSet.FlowSchemas().Update(context.TODO(), expectedFlowSchema, metav1.UpdateOptions{FieldManager: thisFieldManager}); err != nil {
  174. return fmt.Errorf("failed upgrading mandatory FlowSchema %s due to %v, will retry later", expectedFlowSchema.Name, err)
  175. }
  176. klog.V(3).Infof("Updated mandatory FlowSchema %s because its spec was %#+v but it must be %#+v", expectedFlowSchema.Name, actualFlowSchema.Spec, expectedFlowSchema.Spec)
  177. }
  178. continue
  179. }
  180. if !apierrors.IsNotFound(err) {
  181. return fmt.Errorf("failed getting mandatory FlowSchema %s due to %v, will retry later", expectedFlowSchema.Name, err)
  182. }
  183. _, err = flowcontrolClientSet.FlowSchemas().Create(context.TODO(), expectedFlowSchema, metav1.CreateOptions{FieldManager: thisFieldManager})
  184. if apierrors.IsAlreadyExists(err) {
  185. klog.V(3).Infof("Mandatory FlowSchema %s already exists, skipping creating", expectedFlowSchema.Name)
  186. continue
  187. }
  188. if err != nil {
  189. return fmt.Errorf("cannot create mandatory FlowSchema %s due to %v", expectedFlowSchema.Name, err)
  190. }
  191. klog.V(3).Infof("Created mandatory FlowSchema %s", expectedFlowSchema.Name)
  192. }
  193. for _, expectedPriorityLevelConfiguration := range priorityLevels {
  194. actualPriorityLevelConfiguration, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(context.TODO(), expectedPriorityLevelConfiguration.Name, metav1.GetOptions{})
  195. if err == nil {
  196. // TODO(yue9944882): extract existing version from label and compare
  197. // TODO(yue9944882): create w/ version string attached
  198. wrongSpec, err := priorityLevelHasWrongSpec(expectedPriorityLevelConfiguration, actualPriorityLevelConfiguration)
  199. if err != nil {
  200. return fmt.Errorf("failed checking if mandatory PriorityLevelConfiguration %s is up-to-date due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err)
  201. }
  202. if wrongSpec {
  203. if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Update(context.TODO(), expectedPriorityLevelConfiguration, metav1.UpdateOptions{FieldManager: thisFieldManager}); err != nil {
  204. return fmt.Errorf("failed upgrading mandatory PriorityLevelConfiguration %s due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err)
  205. }
  206. klog.V(3).Infof("Updated mandatory PriorityLevelConfiguration %s because its spec was %#+v but must be %#+v", expectedPriorityLevelConfiguration.Name, actualPriorityLevelConfiguration.Spec, expectedPriorityLevelConfiguration.Spec)
  207. }
  208. continue
  209. }
  210. if !apierrors.IsNotFound(err) {
  211. return fmt.Errorf("failed getting PriorityLevelConfiguration %s due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err)
  212. }
  213. _, err = flowcontrolClientSet.PriorityLevelConfigurations().Create(context.TODO(), expectedPriorityLevelConfiguration, metav1.CreateOptions{FieldManager: thisFieldManager})
  214. if apierrors.IsAlreadyExists(err) {
  215. klog.V(3).Infof("Mandatory PriorityLevelConfiguration %s already exists, skipping creating", expectedPriorityLevelConfiguration.Name)
  216. continue
  217. }
  218. if err != nil {
  219. return fmt.Errorf("cannot create mandatory PriorityLevelConfiguration %s due to %v", expectedPriorityLevelConfiguration.Name, err)
  220. }
  221. klog.V(3).Infof("Created mandatory PriorityLevelConfiguration %s", expectedPriorityLevelConfiguration.Name)
  222. }
  223. return nil
  224. }
  225. func flowSchemaHasWrongSpec(expected, actual *flowcontrolv1alpha1.FlowSchema) (bool, error) {
  226. copiedExpectedFlowSchema := expected.DeepCopy()
  227. flowcontrolapisv1alpha1.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema)
  228. return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec), nil
  229. }
  230. func priorityLevelHasWrongSpec(expected, actual *flowcontrolv1alpha1.PriorityLevelConfiguration) (bool, error) {
  231. copiedExpectedPriorityLevel := expected.DeepCopy()
  232. flowcontrolapisv1alpha1.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel)
  233. return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec), nil
  234. }