123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package apiserver
- import (
- "context"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "net"
- "net/http"
- "os"
- "path"
- "reflect"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apiserver/pkg/server/dynamiccertificates"
- genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
- "k8s.io/client-go/discovery"
- client "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/clientcmd"
- clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
- "k8s.io/client-go/util/cert"
- apiregistrationv1beta1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
- aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
- kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
- "k8s.io/kubernetes/test/integration/framework"
- wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
- wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1"
- sampleserver "k8s.io/sample-apiserver/pkg/cmd/server"
- )
- func TestAggregatedAPIServer(t *testing.T) {
- // makes the kube-apiserver very responsive. it's normally a minute
- dynamiccertificates.FileRefreshDuration = 1 * time.Second
- stopCh := make(chan struct{})
- defer close(stopCh)
- testServer := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true}, nil, framework.SharedEtcd())
- defer testServer.TearDownFn()
- kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
- // force json because everything speaks it
- kubeClientConfig.ContentType = ""
- kubeClientConfig.AcceptContentTypes = ""
- kubeClient := client.NewForConfigOrDie(kubeClientConfig)
- aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)
- // start the wardle server to prove we can aggregate it
- wardleToKASKubeConfigFile := writeKubeConfigForWardleServerToKASConnection(t, rest.CopyConfig(kubeClientConfig))
- defer os.Remove(wardleToKASKubeConfigFile)
- wardleCertDir, _ := ioutil.TempDir("", "test-integration-wardle-server")
- defer os.RemoveAll(wardleCertDir)
- listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal(err)
- }
- go func() {
- o := sampleserver.NewWardleServerOptions(os.Stdout, os.Stderr)
- o.RecommendedOptions.SecureServing.Listener = listener
- o.RecommendedOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
- wardleCmd := sampleserver.NewCommandStartWardleServer(o, stopCh)
- wardleCmd.SetArgs([]string{
- "--authentication-kubeconfig", wardleToKASKubeConfigFile,
- "--authorization-kubeconfig", wardleToKASKubeConfigFile,
- "--etcd-servers", framework.GetEtcdURL(),
- "--cert-dir", wardleCertDir,
- "--kubeconfig", wardleToKASKubeConfigFile,
- })
- if err := wardleCmd.Execute(); err != nil {
- t.Fatal(err)
- }
- }()
- directWardleClientConfig, err := waitForWardleRunning(t, kubeClientConfig, wardleCertDir, wardlePort)
- if err != nil {
- t.Fatal(err)
- }
- // now we're finally ready to test. These are what's run by default now
- wardleClient, err := client.NewForConfig(directWardleClientConfig)
- if err != nil {
- t.Fatal(err)
- }
- testAPIGroupList(t, wardleClient.Discovery().RESTClient())
- testAPIGroup(t, wardleClient.Discovery().RESTClient())
- testAPIResourceList(t, wardleClient.Discovery().RESTClient())
- wardleCA, err := ioutil.ReadFile(directWardleClientConfig.CAFile)
- if err != nil {
- t.Fatal(err)
- }
- _, err = aggregatorClient.ApiregistrationV1beta1().APIServices().Create(context.TODO(), &apiregistrationv1beta1.APIService{
- ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"},
- Spec: apiregistrationv1beta1.APIServiceSpec{
- Service: &apiregistrationv1beta1.ServiceReference{
- Namespace: "kube-wardle",
- Name: "api",
- },
- Group: "wardle.example.com",
- Version: "v1alpha1",
- CABundle: wardleCA,
- GroupPriorityMinimum: 200,
- VersionPriority: 200,
- },
- }, metav1.CreateOptions{})
- if err != nil {
- t.Fatal(err)
- }
- // wait for the unavailable API service to be processed with updated status
- err = wait.Poll(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
- _, err = kubeClient.Discovery().ServerResources()
- hasExpectedError := checkWardleUnavailableDiscoveryError(t, err)
- return hasExpectedError, nil
- })
- if err != nil {
- t.Fatal(err)
- }
- // TODO figure out how to turn on enough of services and dns to run more
- // Now we want to verify that the client CA bundles properly reflect the values for the cluster-authentication
- firstKubeCANames, err := cert.GetClientCANamesForURL(kubeClientConfig.Host)
- if err != nil {
- t.Fatal(err)
- }
- t.Log(firstKubeCANames)
- firstWardleCANames, err := cert.GetClientCANamesForURL(directWardleClientConfig.Host)
- if err != nil {
- t.Fatal(err)
- }
- t.Log(firstWardleCANames)
- if !reflect.DeepEqual(firstKubeCANames, firstWardleCANames) {
- t.Fatal("names don't match")
- }
- // now we update the client-ca nd request-header-client-ca-file and the kas will consume it, update the configmap
- // and then the wardle server will detect and update too.
- if err := ioutil.WriteFile(path.Join(testServer.TmpDir, "client-ca.crt"), differentClientCA, 0644); err != nil {
- t.Fatal(err)
- }
- if err := ioutil.WriteFile(path.Join(testServer.TmpDir, "proxy-ca.crt"), differentFrontProxyCA, 0644); err != nil {
- t.Fatal(err)
- }
- // wait for it to be picked up. there's a test in certreload_test.go that ensure this works
- time.Sleep(4 * time.Second)
- // Now we want to verify that the client CA bundles properly updated to reflect the new values written for the kube-apiserver
- secondKubeCANames, err := cert.GetClientCANamesForURL(kubeClientConfig.Host)
- if err != nil {
- t.Fatal(err)
- }
- t.Log(secondKubeCANames)
- for i := range firstKubeCANames {
- if firstKubeCANames[i] == secondKubeCANames[i] {
- t.Errorf("ca bundles should change")
- }
- }
- secondWardleCANames, err := cert.GetClientCANamesForURL(directWardleClientConfig.Host)
- if err != nil {
- t.Fatal(err)
- }
- t.Log(secondWardleCANames)
- // second wardle should contain all the certs, first and last
- numMatches := 0
- for _, needle := range firstKubeCANames {
- for _, haystack := range secondWardleCANames {
- if needle == haystack {
- numMatches++
- break
- }
- }
- }
- for _, needle := range secondKubeCANames {
- for _, haystack := range secondWardleCANames {
- if needle == haystack {
- numMatches++
- break
- }
- }
- }
- if numMatches != 4 {
- t.Fatal("names don't match")
- }
- }
- func waitForWardleRunning(t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) {
- directWardleClientConfig := rest.AnonymousClientConfig(rest.CopyConfig(wardleToKASKubeConfig))
- directWardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt")
- directWardleClientConfig.CAData = nil
- directWardleClientConfig.ServerName = ""
- directWardleClientConfig.BearerToken = wardleToKASKubeConfig.BearerToken
- var wardleClient client.Interface
- lastHealthContent := []byte{}
- var lastHealthErr error
- err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
- if _, err := os.Stat(directWardleClientConfig.CAFile); os.IsNotExist(err) { // wait until the file trust is created
- lastHealthErr = err
- return false, nil
- }
- directWardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort)
- wardleClient, err = client.NewForConfig(directWardleClientConfig)
- if err != nil {
- // this happens because we race the API server start
- t.Log(err)
- return false, nil
- }
- healthStatus := 0
- result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus)
- lastHealthContent, lastHealthErr = result.Raw()
- if healthStatus != http.StatusOK {
- return false, nil
- }
- return true, nil
- })
- if err != nil {
- t.Log(string(lastHealthContent))
- t.Log(lastHealthErr)
- return nil, err
- }
- return directWardleClientConfig, nil
- }
- func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfig *rest.Config) string {
- // write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
- // the loopback client config uses a loopback cert with different SNI. We need to use the "real"
- // cert, so we'll hope we aren't hacked during a unit test and instead load it from the server we started.
- wardleToKASKubeClientConfig := rest.CopyConfig(kubeClientConfig)
- servingCerts, _, err := cert.GetServingCertificatesForURL(wardleToKASKubeClientConfig.Host, "")
- if err != nil {
- t.Fatal(err)
- }
- encodedServing, err := cert.EncodeCertificates(servingCerts...)
- if err != nil {
- t.Fatal(err)
- }
- wardleToKASKubeClientConfig.CAData = encodedServing
- for _, v := range servingCerts {
- t.Logf("Client: Server public key is %v\n", dynamiccertificates.GetHumanCertDetail(v))
- }
- certs, err := cert.ParseCertsPEM(wardleToKASKubeClientConfig.CAData)
- if err != nil {
- t.Fatal(err)
- }
- for _, curr := range certs {
- t.Logf("CA bundle %v\n", dynamiccertificates.GetHumanCertDetail(curr))
- }
- adminKubeConfig := createKubeConfig(wardleToKASKubeClientConfig)
- wardleToKASKubeConfigFile, _ := ioutil.TempFile("", "")
- if err := clientcmd.WriteToFile(*adminKubeConfig, wardleToKASKubeConfigFile.Name()); err != nil {
- t.Fatal(err)
- }
- return wardleToKASKubeConfigFile.Name()
- }
- func checkWardleUnavailableDiscoveryError(t *testing.T, err error) bool {
- if err == nil {
- t.Log("Discovery call expected to return failed unavailable service")
- return false
- }
- if !discovery.IsGroupDiscoveryFailedError(err) {
- t.Logf("Unexpected error: %T, %v", err, err)
- return false
- }
- discoveryErr := err.(*discovery.ErrGroupDiscoveryFailed)
- if len(discoveryErr.Groups) != 1 {
- t.Logf("Unexpected failed groups: %v", err)
- return false
- }
- groupVersion := schema.GroupVersion{Group: "wardle.example.com", Version: "v1alpha1"}
- groupVersionErr, ok := discoveryErr.Groups[groupVersion]
- if !ok {
- t.Logf("Unexpected failed group version: %v", err)
- return false
- }
- if !apierrors.IsServiceUnavailable(groupVersionErr) {
- t.Logf("Unexpected failed group version error: %v", err)
- return false
- }
- return true
- }
- func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config {
- clusterNick := "cluster"
- userNick := "user"
- contextNick := "context"
- config := clientcmdapi.NewConfig()
- credentials := clientcmdapi.NewAuthInfo()
- credentials.Token = clientCfg.BearerToken
- credentials.ClientCertificate = clientCfg.TLSClientConfig.CertFile
- if len(credentials.ClientCertificate) == 0 {
- credentials.ClientCertificateData = clientCfg.TLSClientConfig.CertData
- }
- credentials.ClientKey = clientCfg.TLSClientConfig.KeyFile
- if len(credentials.ClientKey) == 0 {
- credentials.ClientKeyData = clientCfg.TLSClientConfig.KeyData
- }
- config.AuthInfos[userNick] = credentials
- cluster := clientcmdapi.NewCluster()
- cluster.Server = clientCfg.Host
- cluster.CertificateAuthority = clientCfg.CAFile
- if len(cluster.CertificateAuthority) == 0 {
- cluster.CertificateAuthorityData = clientCfg.CAData
- }
- cluster.InsecureSkipTLSVerify = clientCfg.Insecure
- config.Clusters[clusterNick] = cluster
- context := clientcmdapi.NewContext()
- context.Cluster = clusterNick
- context.AuthInfo = userNick
- config.Contexts[contextNick] = context
- config.CurrentContext = contextNick
- return config
- }
- func readResponse(client rest.Interface, location string) ([]byte, error) {
- return client.Get().AbsPath(location).DoRaw(context.TODO())
- }
- func testAPIGroupList(t *testing.T, client rest.Interface) {
- contents, err := readResponse(client, "/apis")
- if err != nil {
- t.Fatalf("%v", err)
- }
- t.Log(string(contents))
- var apiGroupList metav1.APIGroupList
- err = json.Unmarshal(contents, &apiGroupList)
- if err != nil {
- t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis", err)
- }
- assert.Equal(t, 1, len(apiGroupList.Groups))
- assert.Equal(t, wardlev1alpha1.GroupName, apiGroupList.Groups[0].Name)
- assert.Equal(t, 2, len(apiGroupList.Groups[0].Versions))
- v1alpha1 := metav1.GroupVersionForDiscovery{
- GroupVersion: wardlev1alpha1.SchemeGroupVersion.String(),
- Version: wardlev1alpha1.SchemeGroupVersion.Version,
- }
- v1beta1 := metav1.GroupVersionForDiscovery{
- GroupVersion: wardlev1beta1.SchemeGroupVersion.String(),
- Version: wardlev1beta1.SchemeGroupVersion.Version,
- }
- assert.Equal(t, v1beta1, apiGroupList.Groups[0].Versions[0])
- assert.Equal(t, v1alpha1, apiGroupList.Groups[0].Versions[1])
- assert.Equal(t, v1beta1, apiGroupList.Groups[0].PreferredVersion)
- }
- func testAPIGroup(t *testing.T, client rest.Interface) {
- contents, err := readResponse(client, "/apis/wardle.example.com")
- if err != nil {
- t.Fatalf("%v", err)
- }
- t.Log(string(contents))
- var apiGroup metav1.APIGroup
- err = json.Unmarshal(contents, &apiGroup)
- if err != nil {
- t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis/wardle.example.com", err)
- }
- assert.Equal(t, wardlev1alpha1.SchemeGroupVersion.Group, apiGroup.Name)
- assert.Equal(t, 2, len(apiGroup.Versions))
- assert.Equal(t, wardlev1alpha1.SchemeGroupVersion.String(), apiGroup.Versions[1].GroupVersion)
- assert.Equal(t, wardlev1alpha1.SchemeGroupVersion.Version, apiGroup.Versions[1].Version)
- assert.Equal(t, apiGroup.PreferredVersion, apiGroup.Versions[0])
- }
- func testAPIResourceList(t *testing.T, client rest.Interface) {
- contents, err := readResponse(client, "/apis/wardle.example.com/v1alpha1")
- if err != nil {
- t.Fatalf("%v", err)
- }
- t.Log(string(contents))
- var apiResourceList metav1.APIResourceList
- err = json.Unmarshal(contents, &apiResourceList)
- if err != nil {
- t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis/wardle.example.com/v1alpha1", err)
- }
- assert.Equal(t, wardlev1alpha1.SchemeGroupVersion.String(), apiResourceList.GroupVersion)
- assert.Equal(t, 2, len(apiResourceList.APIResources))
- assert.Equal(t, "fischers", apiResourceList.APIResources[0].Name)
- assert.False(t, apiResourceList.APIResources[0].Namespaced)
- assert.Equal(t, "flunders", apiResourceList.APIResources[1].Name)
- assert.True(t, apiResourceList.APIResources[1].Namespaced)
- }
- var (
- // I have no idea what these certs are, they just need to be different
- differentClientCA = []byte(`-----BEGIN CERTIFICATE-----
- MIIDQDCCAiigAwIBAgIJANWw74P5KJk2MA0GCSqGSIb3DQEBCwUAMDQxMjAwBgNV
- BAMMKWdlbmVyaWNfd2ViaG9va19hZG1pc3Npb25fcGx1Z2luX3Rlc3RzX2NhMCAX
- DTE3MTExNjAwMDUzOVoYDzIyOTEwOTAxMDAwNTM5WjAjMSEwHwYDVQQDExh3ZWJo
- b29rLXRlc3QuZGVmYXVsdC5zdmMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
- AoIBAQDXd/nQ89a5H8ifEsigmMd01Ib6NVR3bkJjtkvYnTbdfYEBj7UzqOQtHoLa
- dIVmefny5uIHvj93WD8WDVPB3jX2JHrXkDTXd/6o6jIXHcsUfFTVLp6/bZ+Anqe0
- r/7hAPkzA2A7APyTWM3ZbEeo1afXogXhOJ1u/wz0DflgcB21gNho4kKTONXO3NHD
- XLpspFqSkxfEfKVDJaYAoMnYZJtFNsa2OvsmLnhYF8bjeT3i07lfwrhUZvP+7Gsp
- 7UgUwc06WuNHjfx1s5e6ySzH0QioMD1rjYneqOvk0pKrMIhuAEWXqq7jlXcDtx1E
- j+wnYbVqqVYheHZ8BCJoVAAQGs9/AgMBAAGjZDBiMAkGA1UdEwQCMAAwCwYDVR0P
- BAQDAgXgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDATApBgNVHREEIjAg
- hwR/AAABghh3ZWJob29rLXRlc3QuZGVmYXVsdC5zdmMwDQYJKoZIhvcNAQELBQAD
- ggEBAD/GKSPNyQuAOw/jsYZesb+RMedbkzs18sSwlxAJQMUrrXwlVdHrA8q5WhE6
- ABLqU1b8lQ8AWun07R8k5tqTmNvCARrAPRUqls/ryER+3Y9YEcxEaTc3jKNZFLbc
- T6YtcnkdhxsiO136wtiuatpYL91RgCmuSpR8+7jEHhuFU01iaASu7ypFrUzrKHTF
- bKwiLRQi1cMzVcLErq5CDEKiKhUkoDucyARFszrGt9vNIl/YCcBOkcNvM3c05Hn3
- M++C29JwS3Hwbubg6WO3wjFjoEhpCwU6qRYUz3MRp4tHO4kxKXx+oQnUiFnR7vW0
- YkNtGc1RUDHwecCTFpJtPb7Yu/E=
- -----END CERTIFICATE-----
- `)
- differentFrontProxyCA = []byte(`-----BEGIN CERTIFICATE-----
- MIIBqDCCAU2gAwIBAgIUfbqeieihh/oERbfvRm38XvS/xHAwCgYIKoZIzj0EAwIw
- GjEYMBYGA1UEAxMPSW50ZXJtZWRpYXRlLUNBMCAXDTE2MTAxMTA1MDYwMFoYDzIx
- MTYwOTE3MDUwNjAwWjAUMRIwEAYDVQQDEwlNeSBDbGllbnQwWTATBgcqhkjOPQIB
- BggqhkjOPQMBBwNCAARv6N4R/sjMR65iMFGNLN1GC/vd7WhDW6J4X/iAjkRLLnNb
- KbRG/AtOUZ+7upJ3BWIRKYbOabbQGQe2BbKFiap4o3UwczAOBgNVHQ8BAf8EBAMC
- BaAwEwYDVR0lBAwwCgYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAdBgNVHQ4EFgQU
- K/pZOWpNcYai6eHFpmJEeFpeQlEwHwYDVR0jBBgwFoAUX6nQlxjfWnP6aM1meO/Q
- a6b3a9kwCgYIKoZIzj0EAwIDSQAwRgIhAIWTKw/sjJITqeuNzJDAKU4xo1zL+xJ5
- MnVCuBwfwDXCAiEAw/1TA+CjPq9JC5ek1ifR0FybTURjeQqYkKpve1dveps=
- -----END CERTIFICATE-----
- `)
- )
|