123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package upgrades
- import (
- "context"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "net/http"
- "path/filepath"
- "sync"
- "time"
- "github.com/onsi/ginkgo"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/version"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/kubernetes/test/e2e/framework"
- e2esset "k8s.io/kubernetes/test/e2e/framework/statefulset"
- "k8s.io/kubernetes/test/e2e/framework/testfiles"
- )
// cassandraManifestPath is the directory containing the Cassandra StatefulSet
// test manifests; individual manifest file names are joined onto it.
const cassandraManifestPath = "test/e2e/testing-manifests/statefulset/cassandra"
// CassandraUpgradeTest sets up a Cassandra StatefulSet and verifies that it
// behaves well across upgrades.
type CassandraUpgradeTest struct {
	// ip is the ingress IP of the "test-server" service. Setup fills it in and
	// listUsers/addUser send their HTTP requests to it.
	ip string
	// successfulWrites counts the users that were added without error.
	successfulWrites int
}
- // Name returns the tracking name of the test.
- func (CassandraUpgradeTest) Name() string { return "cassandra-upgrade" }
- // Skip returns true when this test can be skipped.
- func (CassandraUpgradeTest) Skip(upgCtx UpgradeContext) bool {
- minVersion := version.MustParseSemantic("1.6.0")
- for _, vCtx := range upgCtx.Versions {
- if vCtx.Version.LessThan(minVersion) {
- return true
- }
- }
- return false
- }
- func cassandraKubectlCreate(ns, file string) {
- input := string(testfiles.ReadOrDie(filepath.Join(cassandraManifestPath, file)))
- framework.RunKubectlOrDieInput(ns, input, "create", "-f", "-", fmt.Sprintf("--namespace=%s", ns))
- }
- // Setup creates a Cassandra StatefulSet and a PDB. It also brings up a tester
- // ReplicaSet and associated service and PDB to guarantee availability during
- // the upgrade.
- // It waits for the system to stabilize before adding two users to verify
- // connectivity.
- func (t *CassandraUpgradeTest) Setup(f *framework.Framework) {
- ns := f.Namespace.Name
- statefulsetPoll := 30 * time.Second
- statefulsetTimeout := 10 * time.Minute
- ginkgo.By("Creating a PDB")
- cassandraKubectlCreate(ns, "pdb.yaml")
- ginkgo.By("Creating a Cassandra StatefulSet")
- e2esset.CreateStatefulSet(f.ClientSet, cassandraManifestPath, ns)
- ginkgo.By("Creating a cassandra-test-server deployment")
- cassandraKubectlCreate(ns, "tester.yaml")
- ginkgo.By("Getting the ingress IPs from the services")
- err := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
- if t.ip = t.getServiceIP(f, ns, "test-server"); t.ip == "" {
- return false, nil
- }
- if _, err := t.listUsers(); err != nil {
- framework.Logf("Service endpoint is up but isn't responding")
- return false, nil
- }
- return true, nil
- })
- framework.ExpectNoError(err)
- framework.Logf("Service endpoint is up")
- ginkgo.By("Adding 2 dummy users")
- err = t.addUser("Alice")
- framework.ExpectNoError(err)
- err = t.addUser("Bob")
- framework.ExpectNoError(err)
- t.successfulWrites = 2
- ginkgo.By("Verifying that the users exist")
- users, err := t.listUsers()
- framework.ExpectNoError(err)
- framework.ExpectEqual(len(users), 2)
- }
- // listUsers gets a list of users from the db via the tester service.
- func (t *CassandraUpgradeTest) listUsers() ([]string, error) {
- r, err := http.Get(fmt.Sprintf("http://%s:8080/list", t.ip))
- if err != nil {
- return nil, err
- }
- defer r.Body.Close()
- if r.StatusCode != http.StatusOK {
- b, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return nil, err
- }
- return nil, fmt.Errorf(string(b))
- }
- var names []string
- if err := json.NewDecoder(r.Body).Decode(&names); err != nil {
- return nil, err
- }
- return names, nil
- }
- // addUser adds a user to the db via the tester services.
- func (t *CassandraUpgradeTest) addUser(name string) error {
- val := map[string][]string{"name": {name}}
- r, err := http.PostForm(fmt.Sprintf("http://%s:8080/add", t.ip), val)
- if err != nil {
- return err
- }
- defer r.Body.Close()
- if r.StatusCode != http.StatusOK {
- b, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return err
- }
- return fmt.Errorf(string(b))
- }
- return nil
- }
- // getServiceIP is a helper method to extract the Ingress IP from the service.
- func (t *CassandraUpgradeTest) getServiceIP(f *framework.Framework, ns, svcName string) string {
- svc, err := f.ClientSet.CoreV1().Services(ns).Get(context.TODO(), svcName, metav1.GetOptions{})
- framework.ExpectNoError(err)
- ingress := svc.Status.LoadBalancer.Ingress
- if len(ingress) == 0 {
- return ""
- }
- return ingress[0].IP
- }
- // Test is called during the upgrade.
- // It launches two goroutines, one continuously writes to the db and one reads
- // from the db. Each attempt is tallied and at the end we verify if the success
- // ratio is over a certain threshold (0.75). We also verify that we get
- // at least the same number of rows back as we successfully wrote.
- func (t *CassandraUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) {
- ginkgo.By("Continuously polling the database during upgrade.")
- var (
- success, failures, writeAttempts, lastUserCount int
- mu sync.Mutex
- errors = map[string]int{}
- )
- // Write loop.
- go wait.Until(func() {
- writeAttempts++
- if err := t.addUser(fmt.Sprintf("user-%d", writeAttempts)); err != nil {
- framework.Logf("Unable to add user: %v", err)
- mu.Lock()
- errors[err.Error()]++
- mu.Unlock()
- return
- }
- t.successfulWrites++
- }, 10*time.Millisecond, done)
- // Read loop.
- wait.Until(func() {
- users, err := t.listUsers()
- if err != nil {
- framework.Logf("Could not retrieve users: %v", err)
- failures++
- mu.Lock()
- errors[err.Error()]++
- mu.Unlock()
- return
- }
- success++
- lastUserCount = len(users)
- }, 10*time.Millisecond, done)
- framework.Logf("got %d users; want >=%d", lastUserCount, t.successfulWrites)
- framework.ExpectEqual(lastUserCount >= t.successfulWrites, true)
- ratio := float64(success) / float64(success+failures)
- framework.Logf("Successful gets %d/%d=%v", success, success+failures, ratio)
- ratio = float64(t.successfulWrites) / float64(writeAttempts)
- framework.Logf("Successful writes %d/%d=%v", t.successfulWrites, writeAttempts, ratio)
- framework.Logf("Errors: %v", errors)
- // TODO(maisem): tweak this value once we have a few test runs.
- framework.ExpectEqual(ratio > 0.75, true)
- }
- // Teardown does one final check of the data's availability.
- func (t *CassandraUpgradeTest) Teardown(f *framework.Framework) {
- users, err := t.listUsers()
- framework.ExpectNoError(err)
- framework.ExpectEqual(len(users) >= t.successfulWrites, true)
- }
|