master_utils.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package framework
  14. import (
  15. "context"
  16. "flag"
  17. "net"
  18. "net/http"
  19. "net/http/httptest"
  20. "path"
  21. "strconv"
  22. "time"
  23. "github.com/go-openapi/spec"
  24. "github.com/google/uuid"
  25. "k8s.io/apimachinery/pkg/runtime/schema"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. authauthenticator "k8s.io/apiserver/pkg/authentication/authenticator"
  28. "k8s.io/apiserver/pkg/authentication/authenticatorfactory"
  29. authenticatorunion "k8s.io/apiserver/pkg/authentication/request/union"
  30. "k8s.io/apiserver/pkg/authentication/user"
  31. "k8s.io/apiserver/pkg/authorization/authorizer"
  32. "k8s.io/apiserver/pkg/authorization/authorizerfactory"
  33. authorizerunion "k8s.io/apiserver/pkg/authorization/union"
  34. openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
  35. genericapiserver "k8s.io/apiserver/pkg/server"
  36. "k8s.io/apiserver/pkg/server/options"
  37. serverstorage "k8s.io/apiserver/pkg/server/storage"
  38. "k8s.io/apiserver/pkg/storage/storagebackend"
  39. "k8s.io/client-go/informers"
  40. clientset "k8s.io/client-go/kubernetes"
  41. restclient "k8s.io/client-go/rest"
  42. "k8s.io/component-base/version"
  43. "k8s.io/klog"
  44. openapicommon "k8s.io/kube-openapi/pkg/common"
  45. "k8s.io/kubernetes/pkg/api/legacyscheme"
  46. "k8s.io/kubernetes/pkg/generated/openapi"
  47. "k8s.io/kubernetes/pkg/kubeapiserver"
  48. kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
  49. "k8s.io/kubernetes/pkg/master"
  50. )
  51. // Config is a struct of configuration directives for NewMasterComponents.
  52. type Config struct {
  53. // If nil, a default is used, partially filled configs will not get populated.
  54. MasterConfig *master.Config
  55. StartReplicationManager bool
  56. // Client throttling qps
  57. QPS float32
  58. // Client burst qps, also burst replicas allowed in rc manager
  59. Burst int
  60. // TODO: Add configs for endpoints controller, scheduler etc
  61. }
  62. // alwaysAllow always allows an action
  63. type alwaysAllow struct{}
  64. func (alwaysAllow) Authorize(ctx context.Context, requestAttributes authorizer.Attributes) (authorizer.Decision, string, error) {
  65. return authorizer.DecisionAllow, "always allow", nil
  66. }
  67. // alwaysEmpty simulates "no authentication" for old tests
  68. func alwaysEmpty(req *http.Request) (*authauthenticator.Response, bool, error) {
  69. return &authauthenticator.Response{
  70. User: &user.DefaultInfo{
  71. Name: "",
  72. },
  73. }, true, nil
  74. }
  75. // MasterReceiver can be used to provide the master to a custom incoming server function
  76. type MasterReceiver interface {
  77. SetMaster(m *master.Master)
  78. }
  79. // MasterHolder implements
  80. type MasterHolder struct {
  81. Initialized chan struct{}
  82. M *master.Master
  83. }
  84. // SetMaster assigns the current master.
  85. func (h *MasterHolder) SetMaster(m *master.Master) {
  86. h.M = m
  87. close(h.Initialized)
  88. }
  89. // DefaultOpenAPIConfig returns an openapicommon.Config initialized to default values.
  90. func DefaultOpenAPIConfig() *openapicommon.Config {
  91. openAPIConfig := genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme))
  92. openAPIConfig.Info = &spec.Info{
  93. InfoProps: spec.InfoProps{
  94. Title: "Kubernetes",
  95. Version: "unversioned",
  96. },
  97. }
  98. openAPIConfig.DefaultResponse = &spec.Response{
  99. ResponseProps: spec.ResponseProps{
  100. Description: "Default Response.",
  101. },
  102. }
  103. openAPIConfig.GetDefinitions = openapi.GetOpenAPIDefinitions
  104. return openAPIConfig
  105. }
  106. // startMasterOrDie starts a kubernetes master and an httpserver to handle api requests
  107. func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server, CloseFunc) {
  108. var m *master.Master
  109. var s *httptest.Server
  110. // Ensure we log at least level 4
  111. v := flag.Lookup("v").Value
  112. level, _ := strconv.Atoi(v.String())
  113. if level < 4 {
  114. v.Set("4")
  115. }
  116. if incomingServer != nil {
  117. s = incomingServer
  118. } else {
  119. s = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  120. m.GenericAPIServer.Handler.ServeHTTP(w, req)
  121. }))
  122. }
  123. stopCh := make(chan struct{})
  124. closeFn := func() {
  125. if m != nil {
  126. m.GenericAPIServer.RunPreShutdownHooks()
  127. }
  128. close(stopCh)
  129. s.Close()
  130. }
  131. if masterConfig == nil {
  132. masterConfig = NewMasterConfig()
  133. masterConfig.GenericConfig.OpenAPIConfig = DefaultOpenAPIConfig()
  134. }
  135. // set the loopback client config
  136. if masterConfig.GenericConfig.LoopbackClientConfig == nil {
  137. masterConfig.GenericConfig.LoopbackClientConfig = &restclient.Config{QPS: 50, Burst: 100, ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs}}
  138. }
  139. masterConfig.GenericConfig.LoopbackClientConfig.Host = s.URL
  140. privilegedLoopbackToken := uuid.New().String()
  141. // wrap any available authorizer
  142. tokens := make(map[string]*user.DefaultInfo)
  143. tokens[privilegedLoopbackToken] = &user.DefaultInfo{
  144. Name: user.APIServerUser,
  145. UID: uuid.New().String(),
  146. Groups: []string{user.SystemPrivilegedGroup},
  147. }
  148. tokenAuthenticator := authenticatorfactory.NewFromTokens(tokens)
  149. if masterConfig.GenericConfig.Authentication.Authenticator == nil {
  150. masterConfig.GenericConfig.Authentication.Authenticator = authenticatorunion.New(tokenAuthenticator, authauthenticator.RequestFunc(alwaysEmpty))
  151. } else {
  152. masterConfig.GenericConfig.Authentication.Authenticator = authenticatorunion.New(tokenAuthenticator, masterConfig.GenericConfig.Authentication.Authenticator)
  153. }
  154. if masterConfig.GenericConfig.Authorization.Authorizer != nil {
  155. tokenAuthorizer := authorizerfactory.NewPrivilegedGroups(user.SystemPrivilegedGroup)
  156. masterConfig.GenericConfig.Authorization.Authorizer = authorizerunion.New(tokenAuthorizer, masterConfig.GenericConfig.Authorization.Authorizer)
  157. } else {
  158. masterConfig.GenericConfig.Authorization.Authorizer = alwaysAllow{}
  159. }
  160. masterConfig.GenericConfig.LoopbackClientConfig.BearerToken = privilegedLoopbackToken
  161. clientset, err := clientset.NewForConfig(masterConfig.GenericConfig.LoopbackClientConfig)
  162. if err != nil {
  163. klog.Fatal(err)
  164. }
  165. masterConfig.ExtraConfig.VersionedInformers = informers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout)
  166. m, err = masterConfig.Complete().New(genericapiserver.NewEmptyDelegate())
  167. if err != nil {
  168. // We log the error first so that even if closeFn crashes, the error is shown
  169. klog.Errorf("error in bringing up the master: %v", err)
  170. closeFn()
  171. klog.Fatalf("error in bringing up the master: %v", err)
  172. }
  173. if masterReceiver != nil {
  174. masterReceiver.SetMaster(m)
  175. }
  176. // TODO have this start method actually use the normal start sequence for the API server
  177. // this method never actually calls the `Run` method for the API server
  178. // fire the post hooks ourselves
  179. m.GenericAPIServer.PrepareRun()
  180. m.GenericAPIServer.RunPostStartHooks(stopCh)
  181. cfg := *masterConfig.GenericConfig.LoopbackClientConfig
  182. cfg.ContentConfig.GroupVersion = &schema.GroupVersion{}
  183. privilegedClient, err := restclient.RESTClientFor(&cfg)
  184. if err != nil {
  185. closeFn()
  186. klog.Fatal(err)
  187. }
  188. var lastHealthContent []byte
  189. err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) {
  190. result := privilegedClient.Get().AbsPath("/healthz").Do(context.TODO())
  191. status := 0
  192. result.StatusCode(&status)
  193. if status == 200 {
  194. return true, nil
  195. }
  196. lastHealthContent, _ = result.Raw()
  197. return false, nil
  198. })
  199. if err != nil {
  200. closeFn()
  201. klog.Errorf("last health content: %q", string(lastHealthContent))
  202. klog.Fatal(err)
  203. }
  204. return m, s, closeFn
  205. }
  206. // NewIntegrationTestMasterConfig returns the master config appropriate for most integration tests.
  207. func NewIntegrationTestMasterConfig() *master.Config {
  208. return NewIntegrationTestMasterConfigWithOptions(&MasterConfigOptions{})
  209. }
  210. // NewIntegrationTestMasterConfigWithOptions returns the master config appropriate for most integration tests
  211. // configured with the provided options.
  212. func NewIntegrationTestMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config {
  213. masterConfig := NewMasterConfigWithOptions(opts)
  214. masterConfig.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
  215. masterConfig.ExtraConfig.APIResourceConfigSource = master.DefaultAPIResourceConfigSource()
  216. // TODO: get rid of these tests or port them to secure serving
  217. masterConfig.GenericConfig.SecureServing = &genericapiserver.SecureServingInfo{Listener: fakeLocalhost443Listener{}}
  218. return masterConfig
  219. }
  220. // MasterConfigOptions are the configurable options for a new integration test master config.
  221. type MasterConfigOptions struct {
  222. EtcdOptions *options.EtcdOptions
  223. }
  224. // DefaultEtcdOptions are the default EtcdOptions for use with integration tests.
  225. func DefaultEtcdOptions() *options.EtcdOptions {
  226. // This causes the integration tests to exercise the etcd
  227. // prefix code, so please don't change without ensuring
  228. // sufficient coverage in other ways.
  229. etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New().String(), nil))
  230. etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
  231. return etcdOptions
  232. }
  233. // NewMasterConfig returns a basic master config.
  234. func NewMasterConfig() *master.Config {
  235. return NewMasterConfigWithOptions(&MasterConfigOptions{})
  236. }
  237. // NewMasterConfigWithOptions returns a basic master config configured with the provided options.
  238. func NewMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config {
  239. etcdOptions := DefaultEtcdOptions()
  240. if opts.EtcdOptions != nil {
  241. etcdOptions = opts.EtcdOptions
  242. }
  243. storageConfig := kubeapiserver.NewStorageFactoryConfig()
  244. storageConfig.APIResourceConfig = serverstorage.NewResourceConfig()
  245. completedStorageConfig, err := storageConfig.Complete(etcdOptions)
  246. if err != nil {
  247. panic(err)
  248. }
  249. storageFactory, err := completedStorageConfig.New()
  250. if err != nil {
  251. panic(err)
  252. }
  253. genericConfig := genericapiserver.NewConfig(legacyscheme.Codecs)
  254. kubeVersion := version.Get()
  255. genericConfig.Version = &kubeVersion
  256. genericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
  257. // TODO: get rid of these tests or port them to secure serving
  258. genericConfig.SecureServing = &genericapiserver.SecureServingInfo{Listener: fakeLocalhost443Listener{}}
  259. err = etcdOptions.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
  260. if err != nil {
  261. panic(err)
  262. }
  263. return &master.Config{
  264. GenericConfig: genericConfig,
  265. ExtraConfig: master.ExtraConfig{
  266. APIResourceConfigSource: master.DefaultAPIResourceConfigSource(),
  267. StorageFactory: storageFactory,
  268. KubeletClientConfig: kubeletclient.KubeletClientConfig{Port: 10250},
  269. APIServerServicePort: 443,
  270. MasterCount: 1,
  271. },
  272. }
  273. }
  // CloseFunc is the teardown callback handed back when a master is started;
  // calling it cleans up the master.
  type CloseFunc func()
  276. // RunAMaster starts a master with the provided config.
  277. func RunAMaster(masterConfig *master.Config) (*master.Master, *httptest.Server, CloseFunc) {
  278. if masterConfig == nil {
  279. masterConfig = NewMasterConfig()
  280. masterConfig.GenericConfig.EnableProfiling = true
  281. }
  282. return startMasterOrDie(masterConfig, nil, nil)
  283. }
  284. // RunAMasterUsingServer starts up a master using the provided config on the specified server.
  285. func RunAMasterUsingServer(masterConfig *master.Config, s *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server, CloseFunc) {
  286. return startMasterOrDie(masterConfig, s, masterReceiver)
  287. }
  288. // SharedEtcd creates a storage config for a shared etcd instance, with a unique prefix.
  289. func SharedEtcd() *storagebackend.Config {
  290. cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New().String(), "registry"), nil)
  291. cfg.Transport.ServerList = []string{GetEtcdURL()}
  292. return cfg
  293. }
  // fakeLocalhost443Listener is a stub net.Listener that only reports the
  // address 127.0.0.1:443. It never opens a socket and is meant to satisfy
  // configuration that requires a listener, not to be served on.
  type fakeLocalhost443Listener struct{}

  // Compile-time check that the stub keeps satisfying net.Listener.
  var _ net.Listener = fakeLocalhost443Listener{}

  // Accept never yields a connection: it returns a nil net.Conn together with a
  // nil error. Callers must not run a real accept loop on this listener, since
  // the nil connection cannot be used.
  func (fakeLocalhost443Listener) Accept() (net.Conn, error) {
  	return nil, nil
  }

  // Close is a no-op and always returns nil.
  func (fakeLocalhost443Listener) Close() error {
  	return nil
  }

  // Addr reports the fixed TCP address 127.0.0.1:443.
  func (fakeLocalhost443Listener) Addr() net.Addr {
  	return &net.TCPAddr{
  		IP:   net.IPv4(127, 0, 0, 1),
  		Port: 443,
  	}
  }