migrator.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "fmt"
  16. "os"
  17. "os/exec"
  18. "time"
  19. "github.com/blang/semver"
  20. "k8s.io/klog"
  21. )
  22. // EtcdMigrateCfg provides all configuration required to perform etcd data upgrade/downgrade migrations.
  23. type EtcdMigrateCfg struct {
  24. binPath string
  25. name string
  26. initialCluster string
  27. port uint64
  28. peerListenUrls string
  29. peerAdvertiseUrls string
  30. etcdDataPrefix string
  31. ttlKeysDirectory string
  32. supportedVersions SupportedVersions
  33. dataDirectory string
  34. etcdServerArgs string
  35. }
  36. // EtcdMigrateClient defines the etcd client operations required to perform migrations.
  37. type EtcdMigrateClient interface {
  38. SetEtcdVersionKeyValue(version *EtcdVersion) error
  39. Get(version *EtcdVersion, key string) (string, error)
  40. Put(version *EtcdVersion, key, value string) error
  41. Backup(version *EtcdVersion, backupDir string) error
  42. Snapshot(version *EtcdVersion, snapshotFile string) error
  43. Restore(version *EtcdVersion, snapshotFile string) error
  44. Migrate(version *EtcdVersion) error
  45. AttachLease(leaseDuration time.Duration) error
  46. Close() error
  47. }
// Migrator manages etcd data migrations.
type Migrator struct {
	// cfg supplies the supported version list used to pick upgrade steps and is
	// handed to NewEtcdMigrateServer whenever a server is created.
	cfg *EtcdMigrateCfg // TODO: don't wire this directly in

	// dataDirectory is the data directory being migrated; the migrator
	// initializes it, reads and writes its version file, and backs it up.
	dataDirectory *DataDirectory

	// client is used to snapshot and restore data during rollbacks and is
	// handed to NewEtcdMigrateServer whenever a server is created.
	client EtcdMigrateClient
}
  54. // MigrateIfNeeded upgrades or downgrades the etcd data directory to the given target version.
  55. func (m *Migrator) MigrateIfNeeded(target *EtcdVersionPair) error {
  56. klog.Infof("Starting migration to %s", target)
  57. err := m.dataDirectory.Initialize(target)
  58. if err != nil {
  59. return fmt.Errorf("failed to initialize data directory %s: %v", m.dataDirectory.path, err)
  60. }
  61. var current *EtcdVersionPair
  62. vfExists, err := m.dataDirectory.versionFile.Exists()
  63. if err != nil {
  64. return err
  65. }
  66. if vfExists {
  67. current, err = m.dataDirectory.versionFile.Read()
  68. if err != nil {
  69. return err
  70. }
  71. } else {
  72. return fmt.Errorf("existing data directory '%s' is missing version.txt file, unable to migrate", m.dataDirectory.path)
  73. }
  74. for {
  75. klog.Infof("Converging current version '%s' to target version '%s'", current, target)
  76. currentNextMinorVersion := &EtcdVersion{Version: semver.Version{Major: current.version.Major, Minor: current.version.Minor + 1}}
  77. switch {
  78. case current.version.MajorMinorEquals(target.version) || currentNextMinorVersion.MajorMinorEquals(target.version):
  79. klog.Infof("current version '%s' equals or is one minor version previous of target version '%s' - migration complete", current, target)
  80. err = m.dataDirectory.versionFile.Write(target)
  81. if err != nil {
  82. return fmt.Errorf("failed to write version.txt to '%s': %v", m.dataDirectory.path, err)
  83. }
  84. return nil
  85. case current.storageVersion == storageEtcd2 && target.storageVersion == storageEtcd3:
  86. return fmt.Errorf("upgrading from etcd2 storage to etcd3 storage is not supported")
  87. case current.version.Major == 3 && target.version.Major == 2:
  88. return fmt.Errorf("downgrading from etcd 3.x to 2.x is not supported")
  89. case current.version.Major == target.version.Major && current.version.Minor < target.version.Minor:
  90. stepVersion := m.cfg.supportedVersions.NextVersionPair(current)
  91. klog.Infof("upgrading etcd from %s to %s", current, stepVersion)
  92. current, err = m.minorVersionUpgrade(current, stepVersion)
  93. case current.version.Major == 3 && target.version.Major == 3 && current.version.Minor > target.version.Minor:
  94. klog.Infof("rolling etcd back from %s to %s", current, target)
  95. current, err = m.rollbackEtcd3MinorVersion(current, target)
  96. }
  97. if err != nil {
  98. return err
  99. }
  100. }
  101. }
  102. func (m *Migrator) rollbackEtcd3MinorVersion(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
  103. if target.version.Minor != current.version.Minor-1 {
  104. return nil, fmt.Errorf("rollback from %s to %s not supported, only rollbacks to the previous minor version are supported", current.version, target.version)
  105. }
  106. klog.Infof("Performing etcd %s -> %s rollback", current.version, target.version)
  107. err := m.dataDirectory.Backup()
  108. if err != nil {
  109. return nil, err
  110. }
  111. snapshotFilename := fmt.Sprintf("%s.snapshot.db", m.dataDirectory.path)
  112. err = os.Remove(snapshotFilename)
  113. if err != nil && !os.IsNotExist(err) {
  114. return nil, fmt.Errorf("failed to clean snapshot file before rollback: %v", err)
  115. }
  116. // Start current version of etcd.
  117. runner := m.newServer()
  118. klog.Infof("Starting etcd version %s to capture rollback snapshot.", current.version)
  119. err = runner.Start(current.version)
  120. if err != nil {
  121. klog.Fatalf("Unable to automatically downgrade etcd: starting etcd version %s to capture rollback snapshot failed: %v", current.version, err)
  122. return nil, err
  123. }
  124. klog.Infof("Snapshotting etcd %s to %s", current.version, snapshotFilename)
  125. err = m.client.Snapshot(current.version, snapshotFilename)
  126. if err != nil {
  127. return nil, err
  128. }
  129. err = runner.Stop()
  130. if err != nil {
  131. return nil, err
  132. }
  133. klog.Info("Backing up data before rolling back")
  134. backupDir := fmt.Sprintf("%s.bak", m.dataDirectory)
  135. err = os.RemoveAll(backupDir)
  136. if err != nil {
  137. return nil, err
  138. }
  139. origInfo, err := os.Stat(m.dataDirectory.path)
  140. if err != nil {
  141. return nil, err
  142. }
  143. err = exec.Command("mv", m.dataDirectory.path, backupDir).Run()
  144. if err != nil {
  145. return nil, err
  146. }
  147. klog.Infof("Restoring etcd %s from %s", target.version, snapshotFilename)
  148. err = m.client.Restore(target.version, snapshotFilename)
  149. if err != nil {
  150. return nil, err
  151. }
  152. err = os.Chmod(m.dataDirectory.path, origInfo.Mode())
  153. if err != nil {
  154. return nil, err
  155. }
  156. return target, nil
  157. }
  158. func (m *Migrator) minorVersionUpgrade(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
  159. runner := m.newServer()
  160. // Do the migration step, by just starting etcd in the target version.
  161. err := runner.Start(target.version)
  162. if err != nil {
  163. return nil, err
  164. }
  165. err = runner.Stop()
  166. return target, err
  167. }
  168. func (m *Migrator) newServer() *EtcdMigrateServer {
  169. return NewEtcdMigrateServer(m.cfg, m.client)
  170. }