migrate_client.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "bytes"
  16. "fmt"
  17. "os"
  18. "os/exec"
  19. "path/filepath"
  20. "strings"
  21. "time"
  22. "context"
  23. clientv2 "github.com/coreos/etcd/client"
  24. "github.com/coreos/etcd/clientv3"
  25. "k8s.io/klog"
  26. )
  27. // CombinedEtcdClient provides an implementation of EtcdMigrateClient using a combination of the etcd v2 client, v3 client
  28. // and etcdctl commands called via the shell.
  29. type CombinedEtcdClient struct {
  30. cfg *EtcdMigrateCfg
  31. }
  32. // NewEtcdMigrateClient creates a new EtcdMigrateClient from the given EtcdMigrateCfg.
  33. func NewEtcdMigrateClient(cfg *EtcdMigrateCfg) (EtcdMigrateClient, error) {
  34. return &CombinedEtcdClient{cfg}, nil
  35. }
  36. // Close closes the client and releases any resources it holds.
  37. func (e *CombinedEtcdClient) Close() error {
  38. return nil
  39. }
  40. // SetEtcdVersionKeyValue writes the given version to the etcd 'etcd_version' key.
  41. // If no error is returned, the write was successful, indicating the etcd server is available
  42. // and able to perform consensus writes.
  43. func (e *CombinedEtcdClient) SetEtcdVersionKeyValue(version *EtcdVersion) error {
  44. return e.Put(version, "etcd_version", version.String())
  45. }
  46. // Put write a single key value pair to etcd.
  47. func (e *CombinedEtcdClient) Put(version *EtcdVersion, key, value string) error {
  48. if version.Major == 2 {
  49. v2client, err := e.clientV2()
  50. if err != nil {
  51. return err
  52. }
  53. _, err = v2client.Set(context.Background(), key, value, nil)
  54. return err
  55. }
  56. v3client, err := e.clientV3()
  57. if err != nil {
  58. return err
  59. }
  60. defer v3client.Close()
  61. _, err = v3client.KV.Put(context.Background(), key, value)
  62. return err
  63. }
  64. // Get reads a single value for a given key.
  65. func (e *CombinedEtcdClient) Get(version *EtcdVersion, key string) (string, error) {
  66. if version.Major == 2 {
  67. v2client, err := e.clientV2()
  68. if err != nil {
  69. return "", err
  70. }
  71. resp, err := v2client.Get(context.Background(), key, nil)
  72. if err != nil {
  73. return "", err
  74. }
  75. return resp.Node.Value, nil
  76. }
  77. v3client, err := e.clientV3()
  78. if err != nil {
  79. return "", err
  80. }
  81. defer v3client.Close()
  82. resp, err := v3client.KV.Get(context.Background(), key)
  83. if err != nil {
  84. return "", err
  85. }
  86. kvs := resp.Kvs
  87. if len(kvs) != 1 {
  88. return "", fmt.Errorf("expected exactly one value for key %s but got %d", key, len(kvs))
  89. }
  90. return string(kvs[0].Value), nil
  91. }
  92. func (e *CombinedEtcdClient) clientV2() (clientv2.KeysAPI, error) {
  93. v2client, err := clientv2.New(clientv2.Config{Endpoints: []string{e.endpoint()}})
  94. if err != nil {
  95. return nil, err
  96. }
  97. return clientv2.NewKeysAPI(v2client), nil
  98. }
  99. func (e *CombinedEtcdClient) clientV3() (*clientv3.Client, error) {
  100. return clientv3.New(clientv3.Config{Endpoints: []string{e.endpoint()}})
  101. }
  102. // Backup creates a backup of an etcd2 data directory at the given backupDir.
  103. func (e *CombinedEtcdClient) Backup(version *EtcdVersion, backupDir string) error {
  104. // We cannot use etcd/client (v2) to make this call. It is implemented in the etcdctl client code.
  105. if version.Major != 2 {
  106. return fmt.Errorf("etcd 2.x required but got version '%s'", version)
  107. }
  108. return e.runEtcdctlCommand(version,
  109. "--debug",
  110. "backup",
  111. "--data-dir", e.cfg.dataDirectory,
  112. "--backup-dir", backupDir,
  113. )
  114. }
  115. // Snapshot captures a snapshot from a running etcd3 server and saves it to the given snapshotFile.
  116. // We cannot use etcd/clientv3 to make this call. It is implemented in the etcdctl client code.
  117. func (e *CombinedEtcdClient) Snapshot(version *EtcdVersion, snapshotFile string) error {
  118. if version.Major != 3 {
  119. return fmt.Errorf("etcd 3.x required but got version '%s'", version)
  120. }
  121. return e.runEtcdctlCommand(version,
  122. "--endpoints", e.endpoint(),
  123. "snapshot", "save", snapshotFile,
  124. )
  125. }
  126. // Restore restores a given snapshotFile into the data directory specified this clients config.
  127. func (e *CombinedEtcdClient) Restore(version *EtcdVersion, snapshotFile string) error {
  128. // We cannot use etcd/clientv3 to make this call. It is implemented in the etcdctl client code.
  129. if version.Major != 3 {
  130. return fmt.Errorf("etcd 3.x required but got version '%s'", version)
  131. }
  132. return e.runEtcdctlCommand(version,
  133. "snapshot", "restore", snapshotFile,
  134. "--data-dir", e.cfg.dataDirectory,
  135. "--name", e.cfg.name,
  136. "--initial-advertise-peer-urls", e.cfg.peerAdvertiseUrls,
  137. "--initial-cluster", e.cfg.initialCluster,
  138. )
  139. }
  140. // Migrate upgrades a 'etcd2' storage version data directory to a 'etcd3' storage version
  141. // data directory.
  142. func (e *CombinedEtcdClient) Migrate(version *EtcdVersion) error {
  143. // We cannot use etcd/clientv3 to make this call as it is implemented in etcd/etcdctl.
  144. if version.Major != 3 {
  145. return fmt.Errorf("etcd 3.x required but got version '%s'", version)
  146. }
  147. return e.runEtcdctlCommand(version,
  148. "migrate",
  149. "--data-dir", e.cfg.dataDirectory,
  150. )
  151. }
  152. func (e *CombinedEtcdClient) runEtcdctlCommand(version *EtcdVersion, args ...string) error {
  153. etcdctlCmd := exec.Command(filepath.Join(e.cfg.binPath, fmt.Sprintf("etcdctl-%s", version)), args...)
  154. etcdctlCmd.Env = []string{fmt.Sprintf("ETCDCTL_API=%d", version.Major)}
  155. etcdctlCmd.Stdout = os.Stdout
  156. etcdctlCmd.Stderr = os.Stderr
  157. return etcdctlCmd.Run()
  158. }
  159. // AttachLease attaches leases of the given leaseDuration to all the etcd objects under
  160. // ttlKeysDirectory specified in this client's config.
  161. func (e *CombinedEtcdClient) AttachLease(leaseDuration time.Duration) error {
  162. ttlKeysPrefix := e.cfg.ttlKeysDirectory
  163. // Make sure that ttlKeysPrefix is ended with "/" so that we only get children "directories".
  164. if !strings.HasSuffix(ttlKeysPrefix, "/") {
  165. ttlKeysPrefix += "/"
  166. }
  167. ctx := context.Background()
  168. v3client, err := e.clientV3()
  169. if err != nil {
  170. return err
  171. }
  172. defer v3client.Close()
  173. objectsResp, err := v3client.KV.Get(ctx, ttlKeysPrefix, clientv3.WithPrefix())
  174. if err != nil {
  175. return fmt.Errorf("Error while getting objects to attach to the lease")
  176. }
  177. lease, err := v3client.Lease.Grant(ctx, int64(leaseDuration/time.Second))
  178. if err != nil {
  179. return fmt.Errorf("Error while creating lease: %v", err)
  180. }
  181. klog.Infof("Lease with TTL: %v created", lease.TTL)
  182. klog.Infof("Attaching lease to %d entries", len(objectsResp.Kvs))
  183. for _, kv := range objectsResp.Kvs {
  184. putResp, err := v3client.KV.Put(ctx, string(kv.Key), string(kv.Value), clientv3.WithLease(lease.ID), clientv3.WithPrevKV())
  185. if err != nil {
  186. klog.Errorf("Error while attaching lease to: %s", string(kv.Key))
  187. }
  188. if bytes.Compare(putResp.PrevKv.Value, kv.Value) != 0 {
  189. return fmt.Errorf("concurrent access to key detected when setting lease on %s, expected previous value of %s but got %s",
  190. kv.Key, kv.Value, putResp.PrevKv.Value)
  191. }
  192. }
  193. return nil
  194. }
  195. func (e *CombinedEtcdClient) endpoint() string {
  196. return fmt.Sprintf("http://127.0.0.1:%d", e.cfg.port)
  197. }