storage_flowcontrol.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rest
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
  19. "k8s.io/apimachinery/pkg/api/equality"
  20. apierrors "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. flowcontrolbootstrap "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
  24. "k8s.io/apiserver/pkg/registry/generic"
  25. "k8s.io/apiserver/pkg/registry/rest"
  26. genericapiserver "k8s.io/apiserver/pkg/server"
  27. serverstorage "k8s.io/apiserver/pkg/server/storage"
  28. flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
  29. "k8s.io/klog"
  30. "k8s.io/kubernetes/pkg/api/legacyscheme"
  31. "k8s.io/kubernetes/pkg/apis/flowcontrol"
  32. flowcontrolapisv1alpha1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1alpha1"
  33. flowschemastore "k8s.io/kubernetes/pkg/registry/flowcontrol/flowschema/storage"
  34. prioritylevelconfigurationstore "k8s.io/kubernetes/pkg/registry/flowcontrol/prioritylevelconfiguration/storage"
  35. )
  36. var _ genericapiserver.PostStartHookProvider = RESTStorageProvider{}
  37. // RESTStorageProvider is a provider of REST storage
  38. type RESTStorageProvider struct{}
  39. // PostStartHookName is the name of the post-start-hook provided by flow-control storage
  40. const PostStartHookName = "apiserver/bootstrap-system-flowcontrol-configuration"
  41. // NewRESTStorage creates a new rest storage for flow-control api models.
  42. func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
  43. apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(flowcontrol.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
  44. if apiResourceConfigSource.VersionEnabled(flowcontrolv1alpha1.SchemeGroupVersion) {
  45. flowControlStorage, err := p.v1alpha1Storage(apiResourceConfigSource, restOptionsGetter)
  46. if err != nil {
  47. return genericapiserver.APIGroupInfo{}, false, err
  48. }
  49. apiGroupInfo.VersionedResourcesStorageMap[flowcontrolv1alpha1.SchemeGroupVersion.Version] = flowControlStorage
  50. }
  51. return apiGroupInfo, true, nil
  52. }
  53. func (p RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
  54. storage := map[string]rest.Storage{}
  55. // flow-schema
  56. flowSchemaStorage, flowSchemaStatusStorage, err := flowschemastore.NewREST(restOptionsGetter)
  57. if err != nil {
  58. return nil, err
  59. }
  60. storage["flowschemas"] = flowSchemaStorage
  61. storage["flowschemas/status"] = flowSchemaStatusStorage
  62. // priority-level-configuration
  63. priorityLevelConfigurationStorage, priorityLevelConfigurationStatusStorage, err := prioritylevelconfigurationstore.NewREST(restOptionsGetter)
  64. if err != nil {
  65. return nil, err
  66. }
  67. storage["prioritylevelconfigurations"] = priorityLevelConfigurationStorage
  68. storage["prioritylevelconfigurations/status"] = priorityLevelConfigurationStatusStorage
  69. return storage, nil
  70. }
  71. // GroupName returns group name of the storage
  72. func (p RESTStorageProvider) GroupName() string {
  73. return flowcontrol.GroupName
  74. }
  75. func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
  76. return PostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
  77. flowcontrolClientSet := flowcontrolclient.NewForConfigOrDie(hookContext.LoopbackClientConfig)
  78. go func() {
  79. const retryCreatingSuggestedSettingsInterval = time.Second
  80. _ = wait.PollImmediateUntil(
  81. retryCreatingSuggestedSettingsInterval,
  82. func() (bool, error) {
  83. shouldEnsureSuggested, err := lastMandatoryExists(flowcontrolClientSet)
  84. if err != nil {
  85. klog.Errorf("failed getting exempt flow-schema, will retry later: %v", err)
  86. return false, nil
  87. }
  88. if !shouldEnsureSuggested {
  89. return true, nil
  90. }
  91. err = ensure(
  92. flowcontrolClientSet,
  93. flowcontrolbootstrap.SuggestedFlowSchemas,
  94. flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
  95. if err != nil {
  96. klog.Errorf("failed ensuring suggested settings, will retry later: %v", err)
  97. return false, nil
  98. }
  99. return true, nil
  100. },
  101. hookContext.StopCh)
  102. const retryCreatingMandatorySettingsInterval = time.Minute
  103. _ = wait.PollImmediateUntil(
  104. retryCreatingMandatorySettingsInterval,
  105. func() (bool, error) {
  106. if err := upgrade(
  107. flowcontrolClientSet,
  108. flowcontrolbootstrap.MandatoryFlowSchemas,
  109. // Note: the "exempt" priority-level is supposed tobe the last item in the pre-defined
  110. // list, so that a crash in the midst of the first kube-apiserver startup does not prevent
  111. // the full initial set of objects from being created.
  112. flowcontrolbootstrap.MandatoryPriorityLevelConfigurations,
  113. ); err != nil {
  114. klog.Errorf("failed creating mandatory flowcontrol settings: %v", err)
  115. return false, nil
  116. }
  117. return false, nil // always retry
  118. },
  119. hookContext.StopCh)
  120. }()
  121. return nil
  122. }, nil
  123. }
  124. // Returns false if there's a "exempt" priority-level existing in the cluster, otherwise returns a true
  125. // if the "exempt" priority-level is not found.
  126. func lastMandatoryExists(flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface) (bool, error) {
  127. if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(context.TODO(), flowcontrol.PriorityLevelConfigurationNameExempt, metav1.GetOptions{}); err != nil {
  128. if apierrors.IsNotFound(err) {
  129. return true, nil
  130. }
  131. return false, err
  132. }
  133. return false, nil
  134. }
  135. func ensure(flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface, flowSchemas []*flowcontrolv1alpha1.FlowSchema, priorityLevels []*flowcontrolv1alpha1.PriorityLevelConfiguration) error {
  136. for _, flowSchema := range flowSchemas {
  137. _, err := flowcontrolClientSet.FlowSchemas().Create(context.TODO(), flowSchema, metav1.CreateOptions{})
  138. if apierrors.IsAlreadyExists(err) {
  139. klog.V(3).Infof("system preset FlowSchema %s already exists, skipping creating", flowSchema.Name)
  140. continue
  141. }
  142. if err != nil {
  143. return fmt.Errorf("cannot create FlowSchema %s due to %v", flowSchema.Name, err)
  144. }
  145. klog.V(3).Infof("created system preset FlowSchema %s", flowSchema.Name)
  146. }
  147. for _, priorityLevelConfiguration := range priorityLevels {
  148. _, err := flowcontrolClientSet.PriorityLevelConfigurations().Create(context.TODO(), priorityLevelConfiguration, metav1.CreateOptions{})
  149. if apierrors.IsAlreadyExists(err) {
  150. klog.V(3).Infof("system preset PriorityLevelConfiguration %s already exists, skipping creating", priorityLevelConfiguration.Name)
  151. continue
  152. }
  153. if err != nil {
  154. return fmt.Errorf("cannot create PriorityLevelConfiguration %s due to %v", priorityLevelConfiguration.Name, err)
  155. }
  156. klog.V(3).Infof("created system preset PriorityLevelConfiguration %s", priorityLevelConfiguration.Name)
  157. }
  158. return nil
  159. }
  160. func upgrade(flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface, flowSchemas []*flowcontrolv1alpha1.FlowSchema, priorityLevels []*flowcontrolv1alpha1.PriorityLevelConfiguration) error {
  161. for _, expectedFlowSchema := range flowSchemas {
  162. actualFlowSchema, err := flowcontrolClientSet.FlowSchemas().Get(context.TODO(), expectedFlowSchema.Name, metav1.GetOptions{})
  163. if err == nil {
  164. // TODO(yue9944882): extract existing version from label and compare
  165. // TODO(yue9944882): create w/ version string attached
  166. identical, err := flowSchemaHasWrongSpec(expectedFlowSchema, actualFlowSchema)
  167. if err != nil {
  168. return fmt.Errorf("failed checking if mandatory FlowSchema %s is up-to-date due to %v, will retry later", expectedFlowSchema.Name, err)
  169. }
  170. if !identical {
  171. if _, err := flowcontrolClientSet.FlowSchemas().Update(context.TODO(), expectedFlowSchema, metav1.UpdateOptions{}); err != nil {
  172. return fmt.Errorf("failed upgrading mandatory FlowSchema %s due to %v, will retry later", expectedFlowSchema.Name, err)
  173. }
  174. }
  175. continue
  176. }
  177. if !apierrors.IsNotFound(err) {
  178. return fmt.Errorf("failed getting FlowSchema %s due to %v, will retry later", expectedFlowSchema.Name, err)
  179. }
  180. _, err = flowcontrolClientSet.FlowSchemas().Create(context.TODO(), expectedFlowSchema, metav1.CreateOptions{})
  181. if apierrors.IsAlreadyExists(err) {
  182. klog.V(3).Infof("system preset FlowSchema %s already exists, skipping creating", expectedFlowSchema.Name)
  183. continue
  184. }
  185. if err != nil {
  186. return fmt.Errorf("cannot create FlowSchema %s due to %v", expectedFlowSchema.Name, err)
  187. }
  188. klog.V(3).Infof("created system preset FlowSchema %s", expectedFlowSchema.Name)
  189. }
  190. for _, expectedPriorityLevelConfiguration := range priorityLevels {
  191. actualPriorityLevelConfiguration, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(context.TODO(), expectedPriorityLevelConfiguration.Name, metav1.GetOptions{})
  192. if err == nil {
  193. // TODO(yue9944882): extract existing version from label and compare
  194. // TODO(yue9944882): create w/ version string attached
  195. identical, err := priorityLevelHasWrongSpec(expectedPriorityLevelConfiguration, actualPriorityLevelConfiguration)
  196. if err != nil {
  197. return fmt.Errorf("failed checking if mandatory PriorityLevelConfiguration %s is up-to-date due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err)
  198. }
  199. if !identical {
  200. if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Update(context.TODO(), expectedPriorityLevelConfiguration, metav1.UpdateOptions{}); err != nil {
  201. return fmt.Errorf("failed upgrading mandatory PriorityLevelConfiguration %s due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err)
  202. }
  203. }
  204. continue
  205. }
  206. if !apierrors.IsNotFound(err) {
  207. return fmt.Errorf("failed getting PriorityLevelConfiguration %s due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err)
  208. }
  209. _, err = flowcontrolClientSet.PriorityLevelConfigurations().Create(context.TODO(), expectedPriorityLevelConfiguration, metav1.CreateOptions{})
  210. if apierrors.IsAlreadyExists(err) {
  211. klog.V(3).Infof("system preset PriorityLevelConfiguration %s already exists, skipping creating", expectedPriorityLevelConfiguration.Name)
  212. continue
  213. }
  214. if err != nil {
  215. return fmt.Errorf("cannot create PriorityLevelConfiguration %s due to %v", expectedPriorityLevelConfiguration.Name, err)
  216. }
  217. klog.V(3).Infof("created system preset PriorityLevelConfiguration %s", expectedPriorityLevelConfiguration.Name)
  218. }
  219. return nil
  220. }
  221. func flowSchemaHasWrongSpec(expected, actual *flowcontrolv1alpha1.FlowSchema) (bool, error) {
  222. copiedExpectedFlowSchema := expected.DeepCopy()
  223. flowcontrolapisv1alpha1.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema)
  224. return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec), nil
  225. }
  226. func priorityLevelHasWrongSpec(expected, actual *flowcontrolv1alpha1.PriorityLevelConfiguration) (bool, error) {
  227. copiedExpectedPriorityLevel := expected.DeepCopy()
  228. flowcontrolapisv1alpha1.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel)
  229. return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec), nil
  230. }