etcd.go 6.0 KB


  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package upgrades
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "net/http"
  20. "path/filepath"
  21. "sync"
  22. "time"
  23. "github.com/onsi/ginkgo"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/util/version"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. "k8s.io/kubernetes/test/e2e/framework"
  28. e2esset "k8s.io/kubernetes/test/e2e/framework/statefulset"
  29. "k8s.io/kubernetes/test/e2e/framework/testfiles"
  30. )
  31. const manifestPath = "test/e2e/testing-manifests/statefulset/etcd"
  32. // EtcdUpgradeTest tests that etcd is writable before and after a cluster upgrade.
  33. type EtcdUpgradeTest struct {
  34. ip string
  35. successfulWrites int
  36. }
  37. // Name returns the tracking name of the test.
  38. func (EtcdUpgradeTest) Name() string { return "etcd-upgrade" }
  39. // Skip returns true when this test can be skipped.
  40. func (EtcdUpgradeTest) Skip(upgCtx UpgradeContext) bool {
  41. minVersion := version.MustParseSemantic("1.6.0")
  42. for _, vCtx := range upgCtx.Versions {
  43. if vCtx.Version.LessThan(minVersion) {
  44. return true
  45. }
  46. }
  47. return false
  48. }
  49. func kubectlCreate(ns, file string) {
  50. input := string(testfiles.ReadOrDie(filepath.Join(manifestPath, file)))
  51. framework.RunKubectlOrDieInput(ns, input, "create", "-f", "-", fmt.Sprintf("--namespace=%s", ns))
  52. }
  53. // Setup creates etcd statefulset and then verifies that the etcd is writable.
  54. func (t *EtcdUpgradeTest) Setup(f *framework.Framework) {
  55. ns := f.Namespace.Name
  56. statefulsetPoll := 30 * time.Second
  57. statefulsetTimeout := 10 * time.Minute
  58. ginkgo.By("Creating a PDB")
  59. kubectlCreate(ns, "pdb.yaml")
  60. ginkgo.By("Creating an etcd StatefulSet")
  61. e2esset.CreateStatefulSet(f.ClientSet, manifestPath, ns)
  62. ginkgo.By("Creating an etcd--test-server deployment")
  63. kubectlCreate(ns, "tester.yaml")
  64. ginkgo.By("Getting the ingress IPs from the services")
  65. err := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
  66. if t.ip = t.getServiceIP(f, ns, "test-server"); t.ip == "" {
  67. return false, nil
  68. }
  69. if _, err := t.listUsers(); err != nil {
  70. framework.Logf("Service endpoint is up but isn't responding")
  71. return false, nil
  72. }
  73. return true, nil
  74. })
  75. framework.ExpectNoError(err)
  76. framework.Logf("Service endpoint is up")
  77. ginkgo.By("Adding 2 dummy users")
  78. err = t.addUser("Alice")
  79. framework.ExpectNoError(err)
  80. err = t.addUser("Bob")
  81. framework.ExpectNoError(err)
  82. t.successfulWrites = 2
  83. ginkgo.By("Verifying that the users exist")
  84. users, err := t.listUsers()
  85. framework.ExpectNoError(err)
  86. framework.ExpectEqual(len(users), 2)
  87. }
  88. func (t *EtcdUpgradeTest) listUsers() ([]string, error) {
  89. r, err := http.Get(fmt.Sprintf("http://%s:8080/list", t.ip))
  90. if err != nil {
  91. return nil, err
  92. }
  93. defer r.Body.Close()
  94. if r.StatusCode != http.StatusOK {
  95. b, err := ioutil.ReadAll(r.Body)
  96. if err != nil {
  97. return nil, err
  98. }
  99. return nil, fmt.Errorf(string(b))
  100. }
  101. var names []string
  102. if err := json.NewDecoder(r.Body).Decode(&names); err != nil {
  103. return nil, err
  104. }
  105. return names, nil
  106. }
  107. func (t *EtcdUpgradeTest) addUser(name string) error {
  108. val := map[string][]string{"name": {name}}
  109. r, err := http.PostForm(fmt.Sprintf("http://%s:8080/add", t.ip), val)
  110. if err != nil {
  111. return err
  112. }
  113. defer r.Body.Close()
  114. if r.StatusCode != http.StatusOK {
  115. b, err := ioutil.ReadAll(r.Body)
  116. if err != nil {
  117. return err
  118. }
  119. return fmt.Errorf(string(b))
  120. }
  121. return nil
  122. }
  123. func (t *EtcdUpgradeTest) getServiceIP(f *framework.Framework, ns, svcName string) string {
  124. svc, err := f.ClientSet.CoreV1().Services(ns).Get(context.TODO(), svcName, metav1.GetOptions{})
  125. framework.ExpectNoError(err)
  126. ingress := svc.Status.LoadBalancer.Ingress
  127. if len(ingress) == 0 {
  128. return ""
  129. }
  130. return ingress[0].IP
  131. }
  132. // Test waits for upgrade to complete and verifies if etcd is writable.
  133. func (t *EtcdUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) {
  134. ginkgo.By("Continuously polling the database during upgrade.")
  135. var (
  136. success, failures, writeAttempts, lastUserCount int
  137. mu sync.Mutex
  138. errors = map[string]int{}
  139. )
  140. // Write loop.
  141. go wait.Until(func() {
  142. writeAttempts++
  143. if err := t.addUser(fmt.Sprintf("user-%d", writeAttempts)); err != nil {
  144. framework.Logf("Unable to add user: %v", err)
  145. mu.Lock()
  146. errors[err.Error()]++
  147. mu.Unlock()
  148. return
  149. }
  150. t.successfulWrites++
  151. }, 10*time.Millisecond, done)
  152. // Read loop.
  153. wait.Until(func() {
  154. users, err := t.listUsers()
  155. if err != nil {
  156. framework.Logf("Could not retrieve users: %v", err)
  157. failures++
  158. mu.Lock()
  159. errors[err.Error()]++
  160. mu.Unlock()
  161. return
  162. }
  163. success++
  164. lastUserCount = len(users)
  165. }, 10*time.Millisecond, done)
  166. framework.Logf("got %d users; want >=%d", lastUserCount, t.successfulWrites)
  167. framework.ExpectEqual(lastUserCount >= t.successfulWrites, true)
  168. ratio := float64(success) / float64(success+failures)
  169. framework.Logf("Successful gets %d/%d=%v", success, success+failures, ratio)
  170. ratio = float64(t.successfulWrites) / float64(writeAttempts)
  171. framework.Logf("Successful writes %d/%d=%v", t.successfulWrites, writeAttempts, ratio)
  172. framework.Logf("Errors: %v", errors)
  173. // TODO(maisem): tweak this value once we have a few test runs.
  174. framework.ExpectEqual(ratio > 0.75, true)
  175. }
  176. // Teardown does one final check of the data's availability.
  177. func (t *EtcdUpgradeTest) Teardown(f *framework.Framework) {
  178. users, err := t.listUsers()
  179. framework.ExpectNoError(err)
  180. framework.ExpectEqual(len(users) >= t.successfulWrites, true)
  181. }