server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "context"
  16. "encoding/json"
  17. "io/ioutil"
  18. "net"
  19. "net/http"
  20. "os"
  21. "strings"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/clientv3/concurrency"
  26. apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
  27. apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
  28. "k8s.io/apimachinery/pkg/api/meta"
  29. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  30. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  31. "k8s.io/apimachinery/pkg/runtime"
  32. "k8s.io/apimachinery/pkg/runtime/schema"
  33. "k8s.io/apimachinery/pkg/util/wait"
  34. genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
  35. cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
  36. "k8s.io/client-go/dynamic"
  37. clientset "k8s.io/client-go/kubernetes"
  38. restclient "k8s.io/client-go/rest"
  39. "k8s.io/client-go/restmapper"
  40. "k8s.io/kubernetes/cmd/kube-apiserver/app"
  41. "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
  42. "k8s.io/kubernetes/test/integration"
  43. "k8s.io/kubernetes/test/integration/framework"
  44. // install all APIs
  45. _ "k8s.io/kubernetes/pkg/master"
  46. )
  47. // StartRealMasterOrDie starts an API master that is appropriate for use in tests that require one of every resource
  48. func StartRealMasterOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOptions)) *Master {
  49. certDir, err := ioutil.TempDir("", t.Name())
  50. if err != nil {
  51. t.Fatal(err)
  52. }
  53. _, defaultServiceClusterIPRange, err := net.ParseCIDR("10.0.0.0/24")
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. listener, _, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. kubeAPIServerOptions := options.NewServerRunOptions()
  62. kubeAPIServerOptions.InsecureServing.BindPort = 0
  63. kubeAPIServerOptions.SecureServing.Listener = listener
  64. kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
  65. kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
  66. kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // force json we can easily interpret the result in etcd
  67. kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
  68. kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
  69. kubeAPIServerOptions.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
  70. kubeAPIServerOptions.APIEnablement.RuntimeConfig["api/all"] = "true"
  71. for _, f := range configFuncs {
  72. f(kubeAPIServerOptions)
  73. }
  74. completedOptions, err := app.Complete(kubeAPIServerOptions)
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. // get etcd client before starting API server
  79. rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig.Transport)
  80. if err != nil {
  81. t.Fatal(err)
  82. }
  83. // get a leased session
  84. session, err := concurrency.NewSession(rawClient)
  85. if err != nil {
  86. t.Fatal(err)
  87. }
  88. // then build and use an etcd lock
  89. // this prevents more than one of these masters from running at the same time
  90. lock := concurrency.NewLocker(session, "kube_integration_etcd_raw")
  91. lock.Lock()
  92. // make sure we start with a clean slate
  93. if _, err := kvClient.Delete(context.Background(), "/registry/", clientv3.WithPrefix()); err != nil {
  94. t.Fatal(err)
  95. }
  96. stopCh := make(chan struct{})
  97. kubeAPIServer, err := app.CreateServerChain(completedOptions, stopCh)
  98. if err != nil {
  99. t.Fatal(err)
  100. }
  101. kubeClientConfig := restclient.CopyConfig(kubeAPIServer.LoopbackClientConfig)
  102. // we make lots of requests, don't be slow
  103. kubeClientConfig.QPS = 99999
  104. kubeClientConfig.Burst = 9999
  105. kubeClient := clientset.NewForConfigOrDie(kubeClientConfig)
  106. go func() {
  107. // Catch panics that occur in this go routine so we get a comprehensible failure
  108. defer func() {
  109. if err := recover(); err != nil {
  110. t.Errorf("Unexpected panic trying to start API master: %#v", err)
  111. }
  112. }()
  113. if err := kubeAPIServer.PrepareRun().Run(stopCh); err != nil {
  114. t.Fatal(err)
  115. }
  116. }()
  117. lastHealth := ""
  118. attempt := 0
  119. if err := wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
  120. // wait for the server to be healthy
  121. result := kubeClient.RESTClient().Get().AbsPath("/healthz").Do()
  122. content, _ := result.Raw()
  123. lastHealth = string(content)
  124. if errResult := result.Error(); errResult != nil {
  125. attempt++
  126. if attempt < 10 {
  127. t.Log("waiting for server to be healthy")
  128. } else {
  129. t.Log(errResult)
  130. }
  131. return false, nil
  132. }
  133. var status int
  134. result.StatusCode(&status)
  135. return status == http.StatusOK, nil
  136. }); err != nil {
  137. t.Log(lastHealth)
  138. t.Fatal(err)
  139. }
  140. // create CRDs so we can make sure that custom resources do not get lost
  141. CreateTestCRDs(t, apiextensionsclientset.NewForConfigOrDie(kubeClientConfig), false, GetCustomResourceDefinitionData()...)
  142. // force cached discovery reset
  143. discoveryClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery())
  144. restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
  145. restMapper.Reset()
  146. serverResources, err := kubeClient.Discovery().ServerResources()
  147. if err != nil {
  148. t.Fatal(err)
  149. }
  150. cleanup := func() {
  151. if err := os.RemoveAll(certDir); err != nil {
  152. t.Log(err)
  153. }
  154. close(stopCh)
  155. lock.Unlock()
  156. if err := session.Close(); err != nil {
  157. t.Log(err)
  158. }
  159. }
  160. return &Master{
  161. Client: kubeClient,
  162. Dynamic: dynamic.NewForConfigOrDie(kubeClientConfig),
  163. Config: kubeClientConfig,
  164. KV: kvClient,
  165. Mapper: restMapper,
  166. Resources: GetResources(t, serverResources),
  167. Cleanup: cleanup,
  168. }
  169. }
  170. // Master represents a running API server that is ready for use
  171. // The Cleanup func must be deferred to prevent resource leaks
  172. type Master struct {
  173. Client clientset.Interface
  174. Dynamic dynamic.Interface
  175. Config *restclient.Config
  176. KV clientv3.KV
  177. Mapper meta.RESTMapper
  178. Resources []Resource
  179. Cleanup func()
  180. }
  181. // Resource contains REST mapping information for a specific resource and extra metadata such as delete collection support
  182. type Resource struct {
  183. Mapping *meta.RESTMapping
  184. HasDeleteCollection bool
  185. }
  186. // GetResources fetches the Resources associated with serverResources that support get and create
  187. func GetResources(t *testing.T, serverResources []*metav1.APIResourceList) []Resource {
  188. var resources []Resource
  189. for _, discoveryGroup := range serverResources {
  190. for _, discoveryResource := range discoveryGroup.APIResources {
  191. // this is a subresource, skip it
  192. if strings.Contains(discoveryResource.Name, "/") {
  193. continue
  194. }
  195. hasCreate := false
  196. hasGet := false
  197. hasDeleteCollection := false
  198. for _, verb := range discoveryResource.Verbs {
  199. if verb == "get" {
  200. hasGet = true
  201. }
  202. if verb == "create" {
  203. hasCreate = true
  204. }
  205. if verb == "deletecollection" {
  206. hasDeleteCollection = true
  207. }
  208. }
  209. if !(hasCreate && hasGet) {
  210. continue
  211. }
  212. resourceGV, err := schema.ParseGroupVersion(discoveryGroup.GroupVersion)
  213. if err != nil {
  214. t.Fatal(err)
  215. }
  216. gvk := resourceGV.WithKind(discoveryResource.Kind)
  217. if len(discoveryResource.Group) > 0 || len(discoveryResource.Version) > 0 {
  218. gvk = schema.GroupVersionKind{
  219. Group: discoveryResource.Group,
  220. Version: discoveryResource.Version,
  221. Kind: discoveryResource.Kind,
  222. }
  223. }
  224. gvr := resourceGV.WithResource(discoveryResource.Name)
  225. resources = append(resources, Resource{
  226. Mapping: &meta.RESTMapping{
  227. Resource: gvr,
  228. GroupVersionKind: gvk,
  229. Scope: scope(discoveryResource.Namespaced),
  230. },
  231. HasDeleteCollection: hasDeleteCollection,
  232. })
  233. }
  234. }
  235. return resources
  236. }
  237. func scope(namespaced bool) meta.RESTScope {
  238. if namespaced {
  239. return meta.RESTScopeNamespace
  240. }
  241. return meta.RESTScopeRoot
  242. }
  243. // JSONToUnstructured converts a JSON stub to unstructured.Unstructured and
  244. // returns a dynamic resource client that can be used to interact with it
  245. func JSONToUnstructured(stub, namespace string, mapping *meta.RESTMapping, dynamicClient dynamic.Interface) (dynamic.ResourceInterface, *unstructured.Unstructured, error) {
  246. typeMetaAdder := map[string]interface{}{}
  247. if err := json.Unmarshal([]byte(stub), &typeMetaAdder); err != nil {
  248. return nil, nil, err
  249. }
  250. // we don't require GVK on the data we provide, so we fill it in here. We could, but that seems extraneous.
  251. typeMetaAdder["apiVersion"] = mapping.GroupVersionKind.GroupVersion().String()
  252. typeMetaAdder["kind"] = mapping.GroupVersionKind.Kind
  253. if mapping.Scope == meta.RESTScopeRoot {
  254. namespace = ""
  255. }
  256. return dynamicClient.Resource(mapping.Resource).Namespace(namespace), &unstructured.Unstructured{Object: typeMetaAdder}, nil
  257. }
  258. // CreateTestCRDs creates the given CRDs, any failure causes the test to Fatal.
  259. // If skipCrdExistsInDiscovery is true, the CRDs are only checked for the Established condition via their Status.
  260. // If skipCrdExistsInDiscovery is false, the CRDs are checked via discovery, see CrdExistsInDiscovery.
  261. func CreateTestCRDs(t *testing.T, client apiextensionsclientset.Interface, skipCrdExistsInDiscovery bool, crds ...*apiextensionsv1beta1.CustomResourceDefinition) {
  262. for _, crd := range crds {
  263. createTestCRD(t, client, skipCrdExistsInDiscovery, crd)
  264. }
  265. }
  266. func createTestCRD(t *testing.T, client apiextensionsclientset.Interface, skipCrdExistsInDiscovery bool, crd *apiextensionsv1beta1.CustomResourceDefinition) {
  267. if _, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd); err != nil {
  268. t.Fatalf("Failed to create %s CRD; %v", crd.Name, err)
  269. }
  270. if skipCrdExistsInDiscovery {
  271. if err := waitForEstablishedCRD(client, crd.Name); err != nil {
  272. t.Fatalf("Failed to establish %s CRD; %v", crd.Name, err)
  273. }
  274. return
  275. }
  276. if err := wait.PollImmediate(500*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  277. return CrdExistsInDiscovery(client, crd), nil
  278. }); err != nil {
  279. t.Fatalf("Failed to see %s in discovery: %v", crd.Name, err)
  280. }
  281. }
  282. func waitForEstablishedCRD(client apiextensionsclientset.Interface, name string) error {
  283. return wait.PollImmediate(500*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  284. crd, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().Get(name, metav1.GetOptions{})
  285. if err != nil {
  286. return false, err
  287. }
  288. for _, cond := range crd.Status.Conditions {
  289. switch cond.Type {
  290. case apiextensionsv1beta1.Established:
  291. if cond.Status == apiextensionsv1beta1.ConditionTrue {
  292. return true, nil
  293. }
  294. }
  295. }
  296. return false, nil
  297. })
  298. }
  299. // CrdExistsInDiscovery checks to see if the given CRD exists in discovery at all served versions.
  300. func CrdExistsInDiscovery(client apiextensionsclientset.Interface, crd *apiextensionsv1beta1.CustomResourceDefinition) bool {
  301. var versions []string
  302. if len(crd.Spec.Version) != 0 {
  303. versions = append(versions, crd.Spec.Version)
  304. }
  305. for _, v := range crd.Spec.Versions {
  306. if v.Served {
  307. versions = append(versions, v.Name)
  308. }
  309. }
  310. for _, v := range versions {
  311. if !crdVersionExistsInDiscovery(client, crd, v) {
  312. return false
  313. }
  314. }
  315. return true
  316. }
  317. func crdVersionExistsInDiscovery(client apiextensionsclientset.Interface, crd *apiextensionsv1beta1.CustomResourceDefinition, version string) bool {
  318. resourceList, err := client.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + version)
  319. if err != nil {
  320. return false
  321. }
  322. for _, resource := range resourceList.APIResources {
  323. if resource.Name == crd.Spec.Names.Plural {
  324. return true
  325. }
  326. }
  327. return false
  328. }