kube_apiserver_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package master
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "net/http"
  20. "reflect"
  21. "strings"
  22. "testing"
  23. "time"
  24. "github.com/go-openapi/spec"
  25. appsv1 "k8s.io/api/apps/v1"
  26. corev1 "k8s.io/api/core/v1"
  27. "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
  28. apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
  29. apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/util/wait"
  32. "k8s.io/apiserver/pkg/registry/generic/registry"
  33. "k8s.io/client-go/kubernetes"
  34. "k8s.io/kube-aggregator/pkg/apis/apiregistration"
  35. kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
  36. "k8s.io/kubernetes/test/integration/etcd"
  37. "k8s.io/kubernetes/test/integration/framework"
  38. )
  39. const (
  40. // testApiextensionsOverlapProbeString is a probe string which identifies whether
  41. // a CRD change triggers an OpenAPI spec change
  42. testApiextensionsOverlapProbeString = "testApiextensionsOverlapProbeField"
  43. )
  44. func TestRun(t *testing.T) {
  45. server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
  46. defer server.TearDownFn()
  47. client, err := kubernetes.NewForConfig(server.ClientConfig)
  48. if err != nil {
  49. t.Fatalf("unexpected error: %v", err)
  50. }
  51. // test whether the server is really healthy after /healthz told us so
  52. t.Logf("Creating Deployment directly after being healthy")
  53. var replicas int32 = 1
  54. _, err = client.AppsV1().Deployments("default").Create(context.TODO(), &appsv1.Deployment{
  55. TypeMeta: metav1.TypeMeta{
  56. Kind: "Deployment",
  57. APIVersion: "apps/v1",
  58. },
  59. ObjectMeta: metav1.ObjectMeta{
  60. Namespace: "default",
  61. Name: "test",
  62. Labels: map[string]string{"foo": "bar"},
  63. },
  64. Spec: appsv1.DeploymentSpec{
  65. Replicas: &replicas,
  66. Strategy: appsv1.DeploymentStrategy{
  67. Type: appsv1.RollingUpdateDeploymentStrategyType,
  68. },
  69. Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
  70. Template: corev1.PodTemplateSpec{
  71. ObjectMeta: metav1.ObjectMeta{
  72. Labels: map[string]string{"foo": "bar"},
  73. },
  74. Spec: corev1.PodSpec{
  75. Containers: []corev1.Container{
  76. {
  77. Name: "foo",
  78. Image: "foo",
  79. },
  80. },
  81. },
  82. },
  83. },
  84. }, metav1.CreateOptions{})
  85. if err != nil {
  86. t.Fatalf("Failed to create deployment: %v", err)
  87. }
  88. }
  89. func endpointReturnsStatusOK(client *kubernetes.Clientset, path string) bool {
  90. res := client.CoreV1().RESTClient().Get().AbsPath(path).Do(context.TODO())
  91. var status int
  92. res.StatusCode(&status)
  93. return status == http.StatusOK
  94. }
  95. func TestLivezAndReadyz(t *testing.T) {
  96. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--livez-grace-period", "0s"}, framework.SharedEtcd())
  97. defer server.TearDownFn()
  98. client, err := kubernetes.NewForConfig(server.ClientConfig)
  99. if err != nil {
  100. t.Fatalf("unexpected error: %v", err)
  101. }
  102. if !endpointReturnsStatusOK(client, "/livez") {
  103. t.Fatalf("livez should be healthy")
  104. }
  105. if !endpointReturnsStatusOK(client, "/readyz") {
  106. t.Fatalf("readyz should be healthy")
  107. }
  108. }
  109. // TestOpenAPIDelegationChainPlumbing is a smoke test that checks for
  110. // the existence of some representative paths from the
  111. // apiextensions-server and the kube-aggregator server, both part of
  112. // the delegation chain in kube-apiserver.
  113. func TestOpenAPIDelegationChainPlumbing(t *testing.T) {
  114. server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
  115. defer server.TearDownFn()
  116. kubeclient, err := kubernetes.NewForConfig(server.ClientConfig)
  117. if err != nil {
  118. t.Fatalf("unexpected error: %v", err)
  119. }
  120. result := kubeclient.RESTClient().Get().AbsPath("/openapi/v2").Do(context.TODO())
  121. status := 0
  122. result.StatusCode(&status)
  123. if status != http.StatusOK {
  124. t.Fatalf("GET /openapi/v2 failed: expected status=%d, got=%d", http.StatusOK, status)
  125. }
  126. raw, err := result.Raw()
  127. if err != nil {
  128. t.Fatalf("Unexpected error: %v", err)
  129. }
  130. type openAPISchema struct {
  131. Paths map[string]interface{} `json:"paths"`
  132. }
  133. var doc openAPISchema
  134. err = json.Unmarshal(raw, &doc)
  135. if err != nil {
  136. t.Fatalf("Failed to unmarshal: %v", err)
  137. }
  138. matchedExtension := false
  139. extensionsPrefix := "/apis/" + apiextensions.GroupName
  140. matchedRegistration := false
  141. registrationPrefix := "/apis/" + apiregistration.GroupName
  142. for path := range doc.Paths {
  143. if strings.HasPrefix(path, extensionsPrefix) {
  144. matchedExtension = true
  145. }
  146. if strings.HasPrefix(path, registrationPrefix) {
  147. matchedRegistration = true
  148. }
  149. if matchedExtension && matchedRegistration {
  150. return
  151. }
  152. }
  153. if !matchedExtension {
  154. t.Errorf("missing path: %q", extensionsPrefix)
  155. }
  156. if !matchedRegistration {
  157. t.Errorf("missing path: %q", registrationPrefix)
  158. }
  159. }
  160. func TestOpenAPIApiextensionsOverlapProtection(t *testing.T) {
  161. server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
  162. defer server.TearDownFn()
  163. apiextensionsclient, err := apiextensionsclientset.NewForConfig(server.ClientConfig)
  164. if err != nil {
  165. t.Fatalf("unexpected error: %v", err)
  166. }
  167. crdPath, exist, err := getOpenAPIPath(apiextensionsclient, `/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions/{name}`)
  168. if err != nil {
  169. t.Fatalf("unexpected error getting CRD OpenAPI path: %v", err)
  170. }
  171. if !exist {
  172. t.Fatalf("unexpected error: apiextensions OpenAPI path doesn't exist")
  173. }
  174. // Create a CRD that overlaps OpenAPI path with the CRD API
  175. crd := &apiextensionsv1beta1.CustomResourceDefinition{
  176. ObjectMeta: metav1.ObjectMeta{
  177. Name: "customresourcedefinitions.apiextensions.k8s.io",
  178. Annotations: map[string]string{"api-approved.kubernetes.io": "unapproved, test-only"},
  179. },
  180. Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
  181. Group: "apiextensions.k8s.io",
  182. Version: "v1beta1",
  183. Scope: apiextensionsv1beta1.ClusterScoped,
  184. Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
  185. Plural: "customresourcedefinitions",
  186. Singular: "customresourcedefinition",
  187. Kind: "CustomResourceDefinition",
  188. ListKind: "CustomResourceDefinitionList",
  189. },
  190. Validation: &apiextensionsv1beta1.CustomResourceValidation{
  191. OpenAPIV3Schema: &apiextensionsv1beta1.JSONSchemaProps{
  192. Type: "object",
  193. Properties: map[string]apiextensionsv1beta1.JSONSchemaProps{
  194. testApiextensionsOverlapProbeString: {Type: "boolean"},
  195. },
  196. },
  197. },
  198. },
  199. }
  200. etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
  201. // Create a probe CRD foo that triggers an OpenAPI spec change
  202. if err := triggerSpecUpdateWithProbeCRD(t, apiextensionsclient, "foo"); err != nil {
  203. t.Fatalf("unexpected error: %v", err)
  204. }
  205. // Expect the CRD path to not change
  206. path, _, err := getOpenAPIPath(apiextensionsclient, `/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions/{name}`)
  207. if err != nil {
  208. t.Fatalf("unexpected error: %v", err)
  209. }
  210. pathBytes, err := json.Marshal(path)
  211. if err != nil {
  212. t.Fatalf("unexpected error: %v", err)
  213. }
  214. crdPathBytes, err := json.Marshal(crdPath)
  215. if err != nil {
  216. t.Fatalf("unexpected error: %v", err)
  217. }
  218. if !bytes.Equal(pathBytes, crdPathBytes) {
  219. t.Fatalf("expected CRD OpenAPI path to not change, but got different results: want %q, got %q", string(crdPathBytes), string(pathBytes))
  220. }
  221. // Expect the orphan definition to be pruned from the spec
  222. exist, err = specHasProbe(apiextensionsclient, testApiextensionsOverlapProbeString)
  223. if err != nil {
  224. t.Fatalf("unexpected error: %v", err)
  225. }
  226. if exist {
  227. t.Fatalf("unexpected error: orphan definition isn't pruned")
  228. }
  229. // Create a CRD that overlaps OpenAPI definition with the CRD API
  230. crd = &apiextensionsv1beta1.CustomResourceDefinition{
  231. ObjectMeta: metav1.ObjectMeta{
  232. Name: "customresourcedefinitions.apiextensions.apis.pkg.apiextensions-apiserver.k8s.io",
  233. Annotations: map[string]string{"api-approved.kubernetes.io": "unapproved, test-only"},
  234. },
  235. Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
  236. Group: "apiextensions.apis.pkg.apiextensions-apiserver.k8s.io",
  237. Version: "v1beta1",
  238. Scope: apiextensionsv1beta1.ClusterScoped,
  239. Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
  240. Plural: "customresourcedefinitions",
  241. Singular: "customresourcedefinition",
  242. Kind: "CustomResourceDefinition",
  243. ListKind: "CustomResourceDefinitionList",
  244. },
  245. Validation: &apiextensionsv1beta1.CustomResourceValidation{
  246. OpenAPIV3Schema: &apiextensionsv1beta1.JSONSchemaProps{
  247. Type: "object",
  248. Properties: map[string]apiextensionsv1beta1.JSONSchemaProps{
  249. testApiextensionsOverlapProbeString: {Type: "boolean"},
  250. },
  251. },
  252. },
  253. },
  254. }
  255. etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
  256. // Create a probe CRD bar that triggers an OpenAPI spec change
  257. if err := triggerSpecUpdateWithProbeCRD(t, apiextensionsclient, "bar"); err != nil {
  258. t.Fatalf("unexpected error: %v", err)
  259. }
  260. // Expect the apiextensions definition to not change, since the overlapping definition will get renamed.
  261. apiextensionsDefinition, exist, err := getOpenAPIDefinition(apiextensionsclient, `io.k8s.apiextensions-apiserver.pkg.apis.apiextensions.v1beta1.CustomResourceDefinition`)
  262. if err != nil {
  263. t.Fatalf("unexpected error: %v", err)
  264. }
  265. if !exist {
  266. t.Fatalf("unexpected error: apiextensions definition doesn't exist")
  267. }
  268. bytes, err := json.Marshal(apiextensionsDefinition)
  269. if err != nil {
  270. t.Fatalf("unexpected error: %v", err)
  271. }
  272. if exist := strings.Contains(string(bytes), testApiextensionsOverlapProbeString); exist {
  273. t.Fatalf("unexpected error: apiextensions definition gets overlapped")
  274. }
  275. }
  276. // triggerSpecUpdateWithProbeCRD creates a probe CRD with suffix in name, and waits until
  277. // the path and definition for the probe CRD show up in the OpenAPI spec
  278. func triggerSpecUpdateWithProbeCRD(t *testing.T, apiextensionsclient *apiextensionsclientset.Clientset, suffix string) error {
  279. // Create a probe CRD that triggers OpenAPI spec change
  280. name := fmt.Sprintf("integration-test-%s-crd", suffix)
  281. kind := fmt.Sprintf("Integration-test-%s-crd", suffix)
  282. group := "probe.test.com"
  283. crd := &apiextensionsv1beta1.CustomResourceDefinition{
  284. ObjectMeta: metav1.ObjectMeta{Name: name + "s." + group},
  285. Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
  286. Group: group,
  287. Version: "v1",
  288. Scope: apiextensionsv1beta1.ClusterScoped,
  289. Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
  290. Plural: name + "s",
  291. Singular: name,
  292. Kind: kind,
  293. ListKind: kind + "List",
  294. },
  295. },
  296. }
  297. etcd.CreateTestCRDs(t, apiextensionsclient, false, crd)
  298. // Expect the probe CRD path to show up in the OpenAPI spec
  299. // TODO(roycaihw): expose response header in rest client and utilize etag here
  300. if err := wait.Poll(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
  301. _, exist, err := getOpenAPIPath(apiextensionsclient, fmt.Sprintf(`/apis/%s/v1/%ss/{name}`, group, name))
  302. if err != nil {
  303. return false, err
  304. }
  305. return exist, nil
  306. }); err != nil {
  307. return fmt.Errorf("failed to observe probe CRD path in the spec: %v", err)
  308. }
  309. return nil
  310. }
  311. func specHasProbe(clientset *apiextensionsclientset.Clientset, probe string) (bool, error) {
  312. bs, err := clientset.RESTClient().Get().AbsPath("openapi", "v2").DoRaw(context.TODO())
  313. if err != nil {
  314. return false, err
  315. }
  316. return strings.Contains(string(bs), probe), nil
  317. }
  318. func getOpenAPIPath(clientset *apiextensionsclientset.Clientset, path string) (spec.PathItem, bool, error) {
  319. bs, err := clientset.RESTClient().Get().AbsPath("openapi", "v2").DoRaw(context.TODO())
  320. if err != nil {
  321. return spec.PathItem{}, false, err
  322. }
  323. s := spec.Swagger{}
  324. if err := json.Unmarshal(bs, &s); err != nil {
  325. return spec.PathItem{}, false, err
  326. }
  327. if s.SwaggerProps.Paths == nil {
  328. return spec.PathItem{}, false, fmt.Errorf("unexpected empty path")
  329. }
  330. value, ok := s.SwaggerProps.Paths.Paths[path]
  331. return value, ok, nil
  332. }
  333. func getOpenAPIDefinition(clientset *apiextensionsclientset.Clientset, definition string) (spec.Schema, bool, error) {
  334. bs, err := clientset.RESTClient().Get().AbsPath("openapi", "v2").DoRaw(context.TODO())
  335. if err != nil {
  336. return spec.Schema{}, false, err
  337. }
  338. s := spec.Swagger{}
  339. if err := json.Unmarshal(bs, &s); err != nil {
  340. return spec.Schema{}, false, err
  341. }
  342. if s.SwaggerProps.Definitions == nil {
  343. return spec.Schema{}, false, fmt.Errorf("unexpected empty path")
  344. }
  345. value, ok := s.SwaggerProps.Definitions[definition]
  346. return value, ok, nil
  347. }
  348. // return the unique endpoint IPs
  349. func getEndpointIPs(endpoints *corev1.Endpoints) []string {
  350. endpointMap := make(map[string]bool)
  351. ips := make([]string, 0)
  352. for _, subset := range endpoints.Subsets {
  353. for _, address := range subset.Addresses {
  354. if _, ok := endpointMap[address.IP]; !ok {
  355. endpointMap[address.IP] = true
  356. ips = append(ips, address.IP)
  357. }
  358. }
  359. }
  360. return ips
  361. }
  362. func verifyEndpointsWithIPs(servers []*kubeapiservertesting.TestServer, ips []string) bool {
  363. listenAddresses := make([]string, 0)
  364. for _, server := range servers {
  365. listenAddresses = append(listenAddresses, server.ServerOpts.GenericServerRunOptions.AdvertiseAddress.String())
  366. }
  367. return reflect.DeepEqual(listenAddresses, ips)
  368. }
  369. func testReconcilersMasterLease(t *testing.T, leaseCount int, masterCount int) {
  370. var leaseServers []*kubeapiservertesting.TestServer
  371. var masterCountServers []*kubeapiservertesting.TestServer
  372. etcd := framework.SharedEtcd()
  373. instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{
  374. DisableStorageCleanup: true,
  375. }
  376. // cleanup the registry storage
  377. defer registry.CleanupStorage()
  378. // 1. start masterCount api servers
  379. for i := 0; i < masterCount; i++ {
  380. // start master count api server
  381. server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
  382. "--endpoint-reconciler-type", "master-count",
  383. "--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
  384. "--apiserver-count", fmt.Sprintf("%v", masterCount),
  385. }, etcd)
  386. masterCountServers = append(masterCountServers, server)
  387. }
  388. // 2. verify master count servers have registered
  389. if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
  390. client, err := kubernetes.NewForConfig(masterCountServers[0].ClientConfig)
  391. if err != nil {
  392. t.Logf("error creating client: %v", err)
  393. return false, nil
  394. }
  395. endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
  396. if err != nil {
  397. t.Logf("error fetching endpoints: %v", err)
  398. return false, nil
  399. }
  400. return verifyEndpointsWithIPs(masterCountServers, getEndpointIPs(endpoints)), nil
  401. }); err != nil {
  402. t.Fatalf("master count endpoints failed to register: %v", err)
  403. }
  404. // 3. start lease api servers
  405. for i := 0; i < leaseCount; i++ {
  406. options := []string{
  407. "--endpoint-reconciler-type", "lease",
  408. "--advertise-address", fmt.Sprintf("10.0.1.%v", i+10),
  409. }
  410. server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, options, etcd)
  411. defer server.TearDownFn()
  412. leaseServers = append(leaseServers, server)
  413. }
  414. time.Sleep(3 * time.Second)
  415. // 4. Shutdown the masterCount server
  416. for _, server := range masterCountServers {
  417. server.TearDownFn()
  418. }
  419. // 5. verify only leaseEndpoint servers left
  420. if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
  421. client, err := kubernetes.NewForConfig(leaseServers[0].ClientConfig)
  422. if err != nil {
  423. t.Logf("create client error: %v", err)
  424. return false, nil
  425. }
  426. endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
  427. if err != nil {
  428. t.Logf("error fetching endpoints: %v", err)
  429. return false, nil
  430. }
  431. return verifyEndpointsWithIPs(leaseServers, getEndpointIPs(endpoints)), nil
  432. }); err != nil {
  433. t.Fatalf("did not find only lease endpoints: %v", err)
  434. }
  435. }
  436. func TestReconcilerMasterLeaseCombined(t *testing.T) {
  437. testReconcilersMasterLease(t, 1, 3)
  438. }
  439. func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) {
  440. testReconcilersMasterLease(t, 3, 2)
  441. }
  442. func TestReconcilerMasterLeaseMultiCombined(t *testing.T) {
  443. testReconcilersMasterLease(t, 3, 3)
  444. }