migrate_client.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "bytes"
  16. "context"
  17. "fmt"
  18. "os"
  19. "os/exec"
  20. "path/filepath"
  21. "strings"
  22. "time"
  23. clientv2 "go.etcd.io/etcd/client"
  24. "go.etcd.io/etcd/clientv3"
  25. "google.golang.org/grpc"
  26. "k8s.io/klog"
  27. )
  28. // CombinedEtcdClient provides an implementation of EtcdMigrateClient using a combination of the etcd v2 client, v3 client
  29. // and etcdctl commands called via the shell.
  30. type CombinedEtcdClient struct {
  31. cfg *EtcdMigrateCfg
  32. }
  33. // NewEtcdMigrateClient creates a new EtcdMigrateClient from the given EtcdMigrateCfg.
  34. func NewEtcdMigrateClient(cfg *EtcdMigrateCfg) (EtcdMigrateClient, error) {
  35. return &CombinedEtcdClient{cfg}, nil
  36. }
  37. // Close closes the client and releases any resources it holds.
  38. func (e *CombinedEtcdClient) Close() error {
  39. return nil
  40. }
  41. // SetEtcdVersionKeyValue writes the given version to the etcd 'etcd_version' key.
  42. // If no error is returned, the write was successful, indicating the etcd server is available
  43. // and able to perform consensus writes.
  44. func (e *CombinedEtcdClient) SetEtcdVersionKeyValue(version *EtcdVersion) error {
  45. return e.Put(version, "etcd_version", version.String())
  46. }
  47. // Put write a single key value pair to etcd.
  48. func (e *CombinedEtcdClient) Put(version *EtcdVersion, key, value string) error {
  49. if version.Major == 2 {
  50. v2client, err := e.clientV2()
  51. if err != nil {
  52. return err
  53. }
  54. _, err = v2client.Set(context.Background(), key, value, nil)
  55. return err
  56. }
  57. v3client, err := e.clientV3()
  58. if err != nil {
  59. return err
  60. }
  61. defer v3client.Close()
  62. _, err = v3client.KV.Put(context.Background(), key, value)
  63. return err
  64. }
  65. // Get reads a single value for a given key.
  66. func (e *CombinedEtcdClient) Get(version *EtcdVersion, key string) (string, error) {
  67. if version.Major == 2 {
  68. v2client, err := e.clientV2()
  69. if err != nil {
  70. return "", err
  71. }
  72. resp, err := v2client.Get(context.Background(), key, nil)
  73. if err != nil {
  74. return "", err
  75. }
  76. return resp.Node.Value, nil
  77. }
  78. v3client, err := e.clientV3()
  79. if err != nil {
  80. return "", err
  81. }
  82. defer v3client.Close()
  83. resp, err := v3client.KV.Get(context.Background(), key)
  84. if err != nil {
  85. return "", err
  86. }
  87. kvs := resp.Kvs
  88. if len(kvs) != 1 {
  89. return "", fmt.Errorf("expected exactly one value for key %s but got %d", key, len(kvs))
  90. }
  91. return string(kvs[0].Value), nil
  92. }
  93. func (e *CombinedEtcdClient) clientV2() (clientv2.KeysAPI, error) {
  94. v2client, err := clientv2.New(clientv2.Config{Endpoints: []string{e.endpoint()}})
  95. if err != nil {
  96. return nil, err
  97. }
  98. return clientv2.NewKeysAPI(v2client), nil
  99. }
  100. func (e *CombinedEtcdClient) clientV3() (*clientv3.Client, error) {
  101. return clientv3.New(clientv3.Config{
  102. Endpoints: []string{e.endpoint()},
  103. DialTimeout: 20 * time.Second,
  104. DialOptions: []grpc.DialOption{
  105. grpc.WithBlock(), // block until the underlying connection is up
  106. },
  107. })
  108. }
  109. // Backup creates a backup of an etcd2 data directory at the given backupDir.
  110. func (e *CombinedEtcdClient) Backup(version *EtcdVersion, backupDir string) error {
  111. // We cannot use etcd/client (v2) to make this call. It is implemented in the etcdctl client code.
  112. if version.Major != 2 {
  113. return fmt.Errorf("etcd 2.x required but got version '%s'", version)
  114. }
  115. return e.runEtcdctlCommand(version,
  116. "--debug",
  117. "backup",
  118. "--data-dir", e.cfg.dataDirectory,
  119. "--backup-dir", backupDir,
  120. )
  121. }
  122. // Snapshot captures a snapshot from a running etcd3 server and saves it to the given snapshotFile.
  123. // We cannot use etcd/clientv3 to make this call. It is implemented in the etcdctl client code.
  124. func (e *CombinedEtcdClient) Snapshot(version *EtcdVersion, snapshotFile string) error {
  125. if version.Major != 3 {
  126. return fmt.Errorf("etcd 3.x required but got version '%s'", version)
  127. }
  128. return e.runEtcdctlCommand(version,
  129. "--endpoints", e.endpoint(),
  130. "snapshot", "save", snapshotFile,
  131. )
  132. }
  133. // Restore restores a given snapshotFile into the data directory specified this clients config.
  134. func (e *CombinedEtcdClient) Restore(version *EtcdVersion, snapshotFile string) error {
  135. // We cannot use etcd/clientv3 to make this call. It is implemented in the etcdctl client code.
  136. if version.Major != 3 {
  137. return fmt.Errorf("etcd 3.x required but got version '%s'", version)
  138. }
  139. return e.runEtcdctlCommand(version,
  140. "snapshot", "restore", snapshotFile,
  141. "--data-dir", e.cfg.dataDirectory,
  142. "--name", e.cfg.name,
  143. "--initial-advertise-peer-urls", e.cfg.peerAdvertiseUrls,
  144. "--initial-cluster", e.cfg.initialCluster,
  145. )
  146. }
  147. // Migrate upgrades a 'etcd2' storage version data directory to a 'etcd3' storage version
  148. // data directory.
  149. func (e *CombinedEtcdClient) Migrate(version *EtcdVersion) error {
  150. // We cannot use etcd/clientv3 to make this call as it is implemented in etcd/etcdctl.
  151. if version.Major != 3 {
  152. return fmt.Errorf("etcd 3.x required but got version '%s'", version)
  153. }
  154. return e.runEtcdctlCommand(version,
  155. "migrate",
  156. "--data-dir", e.cfg.dataDirectory,
  157. )
  158. }
  159. func (e *CombinedEtcdClient) runEtcdctlCommand(version *EtcdVersion, args ...string) error {
  160. etcdctlCmd := exec.Command(filepath.Join(e.cfg.binPath, fmt.Sprintf("etcdctl-%s", version)), args...)
  161. etcdctlCmd.Env = []string{fmt.Sprintf("ETCDCTL_API=%d", version.Major)}
  162. etcdctlCmd.Stdout = os.Stdout
  163. etcdctlCmd.Stderr = os.Stderr
  164. return etcdctlCmd.Run()
  165. }
  166. // AttachLease attaches leases of the given leaseDuration to all the etcd objects under
  167. // ttlKeysDirectory specified in this client's config.
  168. func (e *CombinedEtcdClient) AttachLease(leaseDuration time.Duration) error {
  169. ttlKeysPrefix := e.cfg.ttlKeysDirectory
  170. // Make sure that ttlKeysPrefix is ended with "/" so that we only get children "directories".
  171. if !strings.HasSuffix(ttlKeysPrefix, "/") {
  172. ttlKeysPrefix += "/"
  173. }
  174. ctx := context.Background()
  175. v3client, err := e.clientV3()
  176. if err != nil {
  177. return err
  178. }
  179. defer v3client.Close()
  180. objectsResp, err := v3client.KV.Get(ctx, ttlKeysPrefix, clientv3.WithPrefix())
  181. if err != nil {
  182. return fmt.Errorf("error while getting objects to attach to the lease")
  183. }
  184. lease, err := v3client.Lease.Grant(ctx, int64(leaseDuration/time.Second))
  185. if err != nil {
  186. return fmt.Errorf("error while creating lease: %v", err)
  187. }
  188. klog.Infof("Lease with TTL: %v created", lease.TTL)
  189. klog.Infof("Attaching lease to %d entries", len(objectsResp.Kvs))
  190. for _, kv := range objectsResp.Kvs {
  191. putResp, err := v3client.KV.Put(ctx, string(kv.Key), string(kv.Value), clientv3.WithLease(lease.ID), clientv3.WithPrevKV())
  192. if err != nil {
  193. klog.Errorf("Error while attaching lease to: %s", string(kv.Key))
  194. }
  195. if !bytes.Equal(putResp.PrevKv.Value, kv.Value) {
  196. return fmt.Errorf("concurrent access to key detected when setting lease on %s, expected previous value of %s but got %s",
  197. kv.Key, kv.Value, putResp.PrevKv.Value)
  198. }
  199. }
  200. return nil
  201. }
  202. func (e *CombinedEtcdClient) endpoint() string {
  203. return fmt.Sprintf("http://127.0.0.1:%d", e.cfg.port)
  204. }