migrator.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "fmt"
  16. "os"
  17. "os/exec"
  18. "time"
  19. "github.com/blang/semver"
  20. "k8s.io/klog"
  21. )
  22. // EtcdMigrateCfg provides all configuration required to perform etcd data upgrade/downgrade migrations.
  23. type EtcdMigrateCfg struct {
  24. binPath string
  25. name string
  26. initialCluster string
  27. port uint64
  28. peerListenUrls string
  29. peerAdvertiseUrls string
  30. etcdDataPrefix string
  31. ttlKeysDirectory string
  32. supportedVersions SupportedVersions
  33. dataDirectory string
  34. etcdServerArgs string
  35. }
  36. // EtcdMigrateClient defines the etcd client operations required to perform migrations.
  37. type EtcdMigrateClient interface {
  38. SetEtcdVersionKeyValue(version *EtcdVersion) error
  39. Get(version *EtcdVersion, key string) (string, error)
  40. Put(version *EtcdVersion, key, value string) error
  41. Backup(version *EtcdVersion, backupDir string) error
  42. Snapshot(version *EtcdVersion, snapshotFile string) error
  43. Restore(version *EtcdVersion, snapshotFile string) error
  44. Migrate(version *EtcdVersion) error
  45. AttachLease(leaseDuration time.Duration) error
  46. Close() error
  47. }
  48. // Migrator manages etcd data migrations.
  49. type Migrator struct {
  50. cfg *EtcdMigrateCfg // TODO: don't wire this directly in
  51. dataDirectory *DataDirectory
  52. client EtcdMigrateClient
  53. }
  54. // MigrateIfNeeded upgrades or downgrades the etcd data directory to the given target version.
  55. func (m *Migrator) MigrateIfNeeded(target *EtcdVersionPair) error {
  56. klog.Infof("Starting migration to %s", target)
  57. err := m.dataDirectory.Initialize(target)
  58. if err != nil {
  59. return fmt.Errorf("failed to initialize data directory %s: %v", m.dataDirectory.path, err)
  60. }
  61. var current *EtcdVersionPair
  62. vfExists, err := m.dataDirectory.versionFile.Exists()
  63. if err != nil {
  64. return err
  65. }
  66. if vfExists {
  67. current, err = m.dataDirectory.versionFile.Read()
  68. if err != nil {
  69. return err
  70. }
  71. } else {
  72. return fmt.Errorf("existing data directory '%s' is missing version.txt file, unable to migrate", m.dataDirectory.path)
  73. }
  74. for {
  75. klog.Infof("Converging current version '%s' to target version '%s'", current, target)
  76. currentNextMinorVersion := &EtcdVersion{Version: semver.Version{Major: current.version.Major, Minor: current.version.Minor + 1}}
  77. switch {
  78. case current.version.MajorMinorEquals(target.version) || currentNextMinorVersion.MajorMinorEquals(target.version):
  79. klog.Infof("current version '%s' equals or is one minor version previous of target version '%s' - migration complete", current, target)
  80. err = m.dataDirectory.versionFile.Write(target)
  81. if err != nil {
  82. return fmt.Errorf("failed to write version.txt to '%s': %v", m.dataDirectory.path, err)
  83. }
  84. return nil
  85. case current.storageVersion == storageEtcd2 && target.storageVersion == storageEtcd3:
  86. klog.Infof("upgrading from etcd2 storage to etcd3 storage")
  87. current, err = m.etcd2ToEtcd3Upgrade(current, target)
  88. case current.version.Major == 3 && target.version.Major == 2:
  89. klog.Infof("downgrading from etcd 3.x to 2.x")
  90. current, err = m.rollbackToEtcd2(current, target)
  91. case current.version.Major == target.version.Major && current.version.Minor < target.version.Minor:
  92. stepVersion := m.cfg.supportedVersions.NextVersionPair(current)
  93. klog.Infof("upgrading etcd from %s to %s", current, stepVersion)
  94. current, err = m.minorVersionUpgrade(current, stepVersion)
  95. case current.version.Major == 3 && target.version.Major == 3 && current.version.Minor > target.version.Minor:
  96. klog.Infof("rolling etcd back from %s to %s", current, target)
  97. current, err = m.rollbackEtcd3MinorVersion(current, target)
  98. }
  99. if err != nil {
  100. return err
  101. }
  102. }
  103. }
  104. func (m *Migrator) backupEtcd2(current *EtcdVersion) error {
  105. backupDir := fmt.Sprintf("%s/%s", m.dataDirectory, "migration-backup")
  106. klog.Infof("Backup etcd before starting migration")
  107. err := os.Mkdir(backupDir, 0666)
  108. if err != nil {
  109. return fmt.Errorf("failed to create backup directory before starting migration: %v", err)
  110. }
  111. m.client.Backup(current, backupDir)
  112. klog.Infof("Backup done in %s", backupDir)
  113. return nil
  114. }
  115. func (m *Migrator) rollbackEtcd3MinorVersion(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
  116. if target.version.Minor != current.version.Minor-1 {
  117. return nil, fmt.Errorf("rollback from %s to %s not supported, only rollbacks to the previous minor version are supported", current.version, target.version)
  118. }
  119. klog.Infof("Performing etcd %s -> %s rollback", current.version, target.version)
  120. err := m.dataDirectory.Backup()
  121. if err != nil {
  122. return nil, err
  123. }
  124. snapshotFilename := fmt.Sprintf("%s.snapshot.db", m.dataDirectory.path)
  125. err = os.Remove(snapshotFilename)
  126. if err != nil && !os.IsNotExist(err) {
  127. return nil, fmt.Errorf("failed to clean snapshot file before rollback: %v", err)
  128. }
  129. // Start current version of etcd.
  130. runner := m.newServer()
  131. klog.Infof("Starting etcd version %s to capture rollback snapshot.", current.version)
  132. err = runner.Start(current.version)
  133. if err != nil {
  134. klog.Fatalf("Unable to automatically downgrade etcd: starting etcd version %s to capture rollback snapshot failed: %v", current.version, err)
  135. return nil, err
  136. }
  137. klog.Infof("Snapshotting etcd %s to %s", current.version, snapshotFilename)
  138. err = m.client.Snapshot(current.version, snapshotFilename)
  139. if err != nil {
  140. return nil, err
  141. }
  142. err = runner.Stop()
  143. if err != nil {
  144. return nil, err
  145. }
  146. klog.Infof("Backing up data before rolling back")
  147. backupDir := fmt.Sprintf("%s.bak", m.dataDirectory)
  148. err = os.RemoveAll(backupDir)
  149. if err != nil {
  150. return nil, err
  151. }
  152. origInfo, err := os.Stat(m.dataDirectory.path)
  153. if err != nil {
  154. return nil, err
  155. }
  156. err = exec.Command("mv", m.dataDirectory.path, backupDir).Run()
  157. if err != nil {
  158. return nil, err
  159. }
  160. klog.Infof("Restoring etcd %s from %s", target.version, snapshotFilename)
  161. err = m.client.Restore(target.version, snapshotFilename)
  162. if err != nil {
  163. return nil, err
  164. }
  165. err = os.Chmod(m.dataDirectory.path, origInfo.Mode())
  166. if err != nil {
  167. return nil, err
  168. }
  169. return target, nil
  170. }
  171. func (m *Migrator) rollbackToEtcd2(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
  172. if !(current.version.Major == 3 && current.version.Minor == 0 && target.version.Major == 2 && target.version.Minor == 2) {
  173. return nil, fmt.Errorf("etcd3 -> etcd2 downgrade is supported only between 3.0.x and 2.2.x, got current %s target %s", current, target)
  174. }
  175. klog.Infof("Backup and remove all existing v2 data")
  176. err := m.dataDirectory.Backup()
  177. if err != nil {
  178. return nil, err
  179. }
  180. err = RollbackV3ToV2(m.dataDirectory.path, time.Hour)
  181. if err != nil {
  182. return nil, fmt.Errorf("rollback to etcd 2.x failed: %v", err)
  183. }
  184. return target, nil
  185. }
  186. func (m *Migrator) etcd2ToEtcd3Upgrade(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
  187. if current.storageVersion != storageEtcd2 || target.version.Major != 3 || target.storageVersion != storageEtcd3 {
  188. return nil, fmt.Errorf("etcd2 to etcd3 upgrade is supported only for x.x.x/etcd2 to 3.0.x/etcd3, got current %s target %s", current, target)
  189. }
  190. runner := m.newServer()
  191. klog.Infof("Performing etcd2 -> etcd3 migration")
  192. err := m.client.Migrate(target.version)
  193. if err != nil {
  194. return nil, err
  195. }
  196. klog.Infof("Attaching leases to TTL entries")
  197. // Now attach lease to all keys.
  198. // To do it, we temporarily start etcd on a random port (so that
  199. // apiserver actually cannot access it).
  200. err = runner.Start(target.version)
  201. if err != nil {
  202. return nil, err
  203. }
  204. defer func() {
  205. err = runner.Stop()
  206. }()
  207. // Create a lease and attach all keys to it.
  208. err = m.client.AttachLease(1 * time.Hour)
  209. if err != nil {
  210. return nil, err
  211. }
  212. return target, err
  213. }
  214. func (m *Migrator) minorVersionUpgrade(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
  215. runner := m.newServer()
  216. // Do the migration step, by just starting etcd in the target version.
  217. err := runner.Start(target.version)
  218. if err != nil {
  219. return nil, err
  220. }
  221. err = runner.Stop()
  222. return target, err
  223. }
  224. func (m *Migrator) newServer() *EtcdMigrateServer {
  225. return NewEtcdMigrateServer(m.cfg, m.client)
  226. }