123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- package framework
- import (
- "flag"
- "net"
- "net/http"
- "net/http/httptest"
- "path"
- "strconv"
- "time"
- "github.com/go-openapi/spec"
- "github.com/pborman/uuid"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/util/wait"
- authauthenticator "k8s.io/apiserver/pkg/authentication/authenticator"
- "k8s.io/apiserver/pkg/authentication/authenticatorfactory"
- authenticatorunion "k8s.io/apiserver/pkg/authentication/request/union"
- "k8s.io/apiserver/pkg/authentication/user"
- "k8s.io/apiserver/pkg/authorization/authorizer"
- "k8s.io/apiserver/pkg/authorization/authorizerfactory"
- authorizerunion "k8s.io/apiserver/pkg/authorization/union"
- openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
- genericapiserver "k8s.io/apiserver/pkg/server"
- "k8s.io/apiserver/pkg/server/options"
- serverstorage "k8s.io/apiserver/pkg/server/storage"
- "k8s.io/apiserver/pkg/storage/storagebackend"
- "k8s.io/client-go/informers"
- clientset "k8s.io/client-go/kubernetes"
- restclient "k8s.io/client-go/rest"
- "k8s.io/klog"
- openapicommon "k8s.io/kube-openapi/pkg/common"
- "k8s.io/kubernetes/pkg/api/legacyscheme"
- "k8s.io/kubernetes/pkg/generated/openapi"
- "k8s.io/kubernetes/pkg/kubeapiserver"
- kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
- "k8s.io/kubernetes/pkg/master"
- "k8s.io/kubernetes/pkg/version"
- )
- type Config struct {
-
- MasterConfig *master.Config
- StartReplicationManager bool
-
- QPS float32
-
- Burst int
-
- }
- type alwaysAllow struct{}
- func (alwaysAllow) Authorize(requestAttributes authorizer.Attributes) (authorizer.Decision, string, error) {
- return authorizer.DecisionAllow, "always allow", nil
- }
- func alwaysEmpty(req *http.Request) (*authauthenticator.Response, bool, error) {
- return &authauthenticator.Response{
- User: &user.DefaultInfo{
- Name: "",
- },
- }, true, nil
- }
- type MasterReceiver interface {
- SetMaster(m *master.Master)
- }
- type MasterHolder struct {
- Initialized chan struct{}
- M *master.Master
- }
- func (h *MasterHolder) SetMaster(m *master.Master) {
- h.M = m
- close(h.Initialized)
- }
- func DefaultOpenAPIConfig() *openapicommon.Config {
- openAPIConfig := genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme))
- openAPIConfig.Info = &spec.Info{
- InfoProps: spec.InfoProps{
- Title: "Kubernetes",
- Version: "unversioned",
- },
- }
- openAPIConfig.DefaultResponse = &spec.Response{
- ResponseProps: spec.ResponseProps{
- Description: "Default Response.",
- },
- }
- openAPIConfig.GetDefinitions = openapi.GetOpenAPIDefinitions
- return openAPIConfig
- }
- func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server, CloseFunc) {
- var m *master.Master
- var s *httptest.Server
-
- v := flag.Lookup("v").Value
- level, _ := strconv.Atoi(v.String())
- if level < 4 {
- v.Set("4")
- }
- if incomingServer != nil {
- s = incomingServer
- } else {
- s = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- m.GenericAPIServer.Handler.ServeHTTP(w, req)
- }))
- }
- stopCh := make(chan struct{})
- closeFn := func() {
- m.GenericAPIServer.RunPreShutdownHooks()
- close(stopCh)
- s.Close()
- }
- if masterConfig == nil {
- masterConfig = NewMasterConfig()
- masterConfig.GenericConfig.OpenAPIConfig = DefaultOpenAPIConfig()
- }
-
- if masterConfig.GenericConfig.LoopbackClientConfig == nil {
- masterConfig.GenericConfig.LoopbackClientConfig = &restclient.Config{QPS: 50, Burst: 100, ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs}}
- }
- masterConfig.GenericConfig.LoopbackClientConfig.Host = s.URL
- privilegedLoopbackToken := uuid.NewRandom().String()
-
- tokens := make(map[string]*user.DefaultInfo)
- tokens[privilegedLoopbackToken] = &user.DefaultInfo{
- Name: user.APIServerUser,
- UID: uuid.NewRandom().String(),
- Groups: []string{user.SystemPrivilegedGroup},
- }
- tokenAuthenticator := authenticatorfactory.NewFromTokens(tokens)
- if masterConfig.GenericConfig.Authentication.Authenticator == nil {
- masterConfig.GenericConfig.Authentication.Authenticator = authenticatorunion.New(tokenAuthenticator, authauthenticator.RequestFunc(alwaysEmpty))
- } else {
- masterConfig.GenericConfig.Authentication.Authenticator = authenticatorunion.New(tokenAuthenticator, masterConfig.GenericConfig.Authentication.Authenticator)
- }
- if masterConfig.GenericConfig.Authorization.Authorizer != nil {
- tokenAuthorizer := authorizerfactory.NewPrivilegedGroups(user.SystemPrivilegedGroup)
- masterConfig.GenericConfig.Authorization.Authorizer = authorizerunion.New(tokenAuthorizer, masterConfig.GenericConfig.Authorization.Authorizer)
- } else {
- masterConfig.GenericConfig.Authorization.Authorizer = alwaysAllow{}
- }
- masterConfig.GenericConfig.LoopbackClientConfig.BearerToken = privilegedLoopbackToken
- clientset, err := clientset.NewForConfig(masterConfig.GenericConfig.LoopbackClientConfig)
- if err != nil {
- klog.Fatal(err)
- }
- masterConfig.ExtraConfig.VersionedInformers = informers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout)
- m, err = masterConfig.Complete().New(genericapiserver.NewEmptyDelegate())
- if err != nil {
- closeFn()
- klog.Fatalf("error in bringing up the master: %v", err)
- }
- if masterReceiver != nil {
- masterReceiver.SetMaster(m)
- }
-
-
-
- m.GenericAPIServer.PrepareRun()
- m.GenericAPIServer.RunPostStartHooks(stopCh)
- cfg := *masterConfig.GenericConfig.LoopbackClientConfig
- cfg.ContentConfig.GroupVersion = &schema.GroupVersion{}
- privilegedClient, err := restclient.RESTClientFor(&cfg)
- if err != nil {
- closeFn()
- klog.Fatal(err)
- }
- var lastHealthContent []byte
- err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) {
- result := privilegedClient.Get().AbsPath("/healthz").Do()
- status := 0
- result.StatusCode(&status)
- if status == 200 {
- return true, nil
- }
- lastHealthContent, _ = result.Raw()
- return false, nil
- })
- if err != nil {
- closeFn()
- klog.Errorf("last health content: %q", string(lastHealthContent))
- klog.Fatal(err)
- }
- return m, s, closeFn
- }
- func NewIntegrationTestMasterConfig() *master.Config {
- return NewIntegrationTestMasterConfigWithOptions(&MasterConfigOptions{})
- }
- func NewIntegrationTestMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config {
- masterConfig := NewMasterConfigWithOptions(opts)
- masterConfig.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
- masterConfig.ExtraConfig.APIResourceConfigSource = master.DefaultAPIResourceConfigSource()
-
- masterConfig.GenericConfig.SecureServing = &genericapiserver.SecureServingInfo{Listener: fakeLocalhost443Listener{}}
- return masterConfig
- }
- type MasterConfigOptions struct {
- EtcdOptions *options.EtcdOptions
- }
- func DefaultEtcdOptions() *options.EtcdOptions {
-
-
-
- etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil))
- etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
- return etcdOptions
- }
- func NewMasterConfig() *master.Config {
- return NewMasterConfigWithOptions(&MasterConfigOptions{})
- }
- func NewMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config {
- etcdOptions := DefaultEtcdOptions()
- if opts.EtcdOptions != nil {
- etcdOptions = opts.EtcdOptions
- }
- storageConfig := kubeapiserver.NewStorageFactoryConfig()
- storageConfig.ApiResourceConfig = serverstorage.NewResourceConfig()
- completedStorageConfig, err := storageConfig.Complete(etcdOptions)
- if err != nil {
- panic(err)
- }
- storageFactory, err := completedStorageConfig.New()
- if err != nil {
- panic(err)
- }
- genericConfig := genericapiserver.NewConfig(legacyscheme.Codecs)
- kubeVersion := version.Get()
- genericConfig.Version = &kubeVersion
- genericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
-
- genericConfig.SecureServing = &genericapiserver.SecureServingInfo{Listener: fakeLocalhost443Listener{}}
- err = etcdOptions.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
- if err != nil {
- panic(err)
- }
- return &master.Config{
- GenericConfig: genericConfig,
- ExtraConfig: master.ExtraConfig{
- APIResourceConfigSource: master.DefaultAPIResourceConfigSource(),
- StorageFactory: storageFactory,
- KubeletClientConfig: kubeletclient.KubeletClientConfig{Port: 10250},
- APIServerServicePort: 443,
- MasterCount: 1,
- },
- }
- }
// CloseFunc is the cleanup callback returned when a master is started; see
// the closeFn built in startMasterOrDie for what it tears down.
type CloseFunc func()
- // RunAMaster starts a master with the provided config.
- func RunAMaster(masterConfig *master.Config) (*master.Master, *httptest.Server, CloseFunc) {
- if masterConfig == nil {
- masterConfig = NewMasterConfig()
- masterConfig.GenericConfig.EnableProfiling = true
- }
- return startMasterOrDie(masterConfig, nil, nil)
- }
- func RunAMasterUsingServer(masterConfig *master.Config, s *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server, CloseFunc) {
- return startMasterOrDie(masterConfig, s, masterReceiver)
- }
- func SharedEtcd() *storagebackend.Config {
- cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New(), "registry"), nil)
- cfg.Transport.ServerList = []string{GetEtcdURL()}
- return cfg
- }
// fakeLocalhost443Listener is a stand-in listener that never accepts real
// connections; it exists only to report the address 127.0.0.1:443.
type fakeLocalhost443Listener struct{}

// Accept always returns a nil connection and a nil error.
func (fakeLocalhost443Listener) Accept() (net.Conn, error) {
	var noConn net.Conn
	return noConn, nil
}

// Close is a no-op and always returns nil.
func (fakeLocalhost443Listener) Close() error {
	return nil
}

// Addr reports the fixed TCP address 127.0.0.1:443.
func (fakeLocalhost443Listener) Addr() net.Addr {
	addr := new(net.TCPAddr)
	addr.IP = net.IPv4(127, 0, 0, 1)
	addr.Port = 443
	return addr
}
|