rollback_v2.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "encoding/json"
  16. "os"
  17. "path"
  18. "strconv"
  19. "strings"
  20. "time"
  21. // Uncomment when you want to rollback to 2.2.1 version.
  22. oldwal "k8s.io/kubernetes/third_party/forked/etcd221/wal"
  23. // Uncomment when you want to rollback to 2.3.7 version.
  24. // oldwal "k8s.io/kubernetes/third_party/forked/etcd237/wal"
  25. "github.com/coreos/etcd/etcdserver"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/etcdserver/membership"
  28. "github.com/coreos/etcd/mvcc/backend"
  29. "github.com/coreos/etcd/mvcc/mvccpb"
  30. "github.com/coreos/etcd/pkg/pbutil"
  31. "github.com/coreos/etcd/pkg/types"
  32. "github.com/coreos/etcd/raft/raftpb"
  33. "github.com/coreos/etcd/snap"
  34. "github.com/coreos/etcd/store"
  35. "github.com/coreos/etcd/wal"
  36. "github.com/coreos/etcd/wal/walpb"
  37. "github.com/coreos/go-semver/semver"
  38. "k8s.io/klog"
  39. )
  40. const rollbackVersion = "2.2.0"
  41. // RollbackV3ToV2 rolls back an etcd 3.0.x data directory to the 2.x.x version specified by rollbackVersion.
  42. func RollbackV3ToV2(migrateDatadir string, ttl time.Duration) error {
  43. dbpath := path.Join(migrateDatadir, "member", "snap", "db")
  44. klog.Infof("Rolling db file %s back to etcd 2.x", dbpath)
  45. // etcd3 store backend. We will use it to parse v3 data files and extract information.
  46. be := backend.NewDefaultBackend(dbpath)
  47. tx := be.BatchTx()
  48. // etcd2 store backend. We will use v3 data to update this and then save snapshot to disk.
  49. st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
  50. expireTime := time.Now().Add(ttl)
  51. tx.Lock()
  52. err := tx.UnsafeForEach([]byte("key"), func(k, v []byte) error {
  53. kv := &mvccpb.KeyValue{}
  54. kv.Unmarshal(v)
  55. // This is compact key.
  56. if !strings.HasPrefix(string(kv.Key), "/") {
  57. return nil
  58. }
  59. ttlOpt := store.TTLOptionSet{}
  60. if kv.Lease != 0 {
  61. ttlOpt = store.TTLOptionSet{ExpireTime: expireTime}
  62. }
  63. if !isTombstone(k) {
  64. sk := path.Join(strings.Trim(etcdserver.StoreKeysPrefix, "/"), string(kv.Key))
  65. _, err := st.Set(sk, false, string(kv.Value), ttlOpt)
  66. if err != nil {
  67. return err
  68. }
  69. } else {
  70. st.Delete(string(kv.Key), false, false)
  71. }
  72. return nil
  73. })
  74. if err != nil {
  75. return err
  76. }
  77. tx.Unlock()
  78. if err := traverseAndDeleteEmptyDir(st, "/"); err != nil {
  79. return err
  80. }
  81. // rebuild cluster state.
  82. metadata, hardstate, oldSt, err := rebuild(migrateDatadir)
  83. if err != nil {
  84. return err
  85. }
  86. // In the following, it's low level logic that saves metadata and data into v2 snapshot.
  87. backupPath := migrateDatadir + ".rollback.backup"
  88. if err := os.Rename(migrateDatadir, backupPath); err != nil {
  89. return err
  90. }
  91. if err := os.MkdirAll(path.Join(migrateDatadir, "member", "snap"), 0777); err != nil {
  92. return err
  93. }
  94. walDir := path.Join(migrateDatadir, "member", "wal")
  95. w, err := oldwal.Create(walDir, metadata)
  96. if err != nil {
  97. return err
  98. }
  99. err = w.SaveSnapshot(walpb.Snapshot{Index: hardstate.Commit, Term: hardstate.Term})
  100. w.Close()
  101. if err != nil {
  102. return err
  103. }
  104. event, err := oldSt.Get(etcdserver.StoreClusterPrefix, true, false)
  105. if err != nil {
  106. return err
  107. }
  108. // nodes (members info) for ConfState
  109. nodes := []uint64{}
  110. traverseMetadata(event.Node, func(n *store.NodeExtern) {
  111. if n.Key != etcdserver.StoreClusterPrefix {
  112. // update store metadata
  113. v := ""
  114. if !n.Dir {
  115. v = *n.Value
  116. }
  117. if n.Key == path.Join(etcdserver.StoreClusterPrefix, "version") {
  118. v = rollbackVersion
  119. }
  120. if _, err := st.Set(n.Key, n.Dir, v, store.TTLOptionSet{}); err != nil {
  121. klog.Error(err)
  122. }
  123. // update nodes
  124. fields := strings.Split(n.Key, "/")
  125. if len(fields) == 4 && fields[2] == "members" {
  126. nodeID, err := strconv.ParseUint(fields[3], 16, 64)
  127. if err != nil {
  128. klog.Fatalf("failed to parse member ID (%s): %v", fields[3], err)
  129. }
  130. nodes = append(nodes, nodeID)
  131. }
  132. }
  133. })
  134. data, err := st.Save()
  135. if err != nil {
  136. return err
  137. }
  138. raftSnap := raftpb.Snapshot{
  139. Data: data,
  140. Metadata: raftpb.SnapshotMetadata{
  141. Index: hardstate.Commit,
  142. Term: hardstate.Term,
  143. ConfState: raftpb.ConfState{
  144. Nodes: nodes,
  145. },
  146. },
  147. }
  148. snapshotter := snap.New(path.Join(migrateDatadir, "member", "snap"))
  149. if err := snapshotter.SaveSnap(raftSnap); err != nil {
  150. return err
  151. }
  152. klog.Infof("Finished successfully")
  153. return nil
  154. }
  155. func traverseMetadata(head *store.NodeExtern, handleFunc func(*store.NodeExtern)) {
  156. q := []*store.NodeExtern{head}
  157. for len(q) > 0 {
  158. n := q[0]
  159. q = q[1:]
  160. handleFunc(n)
  161. q = append(q, n.Nodes...)
  162. }
  163. }
  164. const (
  165. revBytesLen = 8 + 1 + 8
  166. markedRevBytesLen = revBytesLen + 1
  167. markBytePosition = markedRevBytesLen - 1
  168. markTombstone byte = 't'
  169. )
  170. func isTombstone(b []byte) bool {
  171. return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
  172. }
  173. func traverseAndDeleteEmptyDir(st store.Store, dir string) error {
  174. e, err := st.Get(dir, true, false)
  175. if err != nil {
  176. return err
  177. }
  178. if len(e.Node.Nodes) == 0 {
  179. st.Delete(dir, true, true)
  180. return nil
  181. }
  182. for _, node := range e.Node.Nodes {
  183. if !node.Dir {
  184. klog.V(2).Infof("key: %s", node.Key[len(etcdserver.StoreKeysPrefix):])
  185. } else {
  186. err := traverseAndDeleteEmptyDir(st, node.Key)
  187. if err != nil {
  188. return err
  189. }
  190. }
  191. }
  192. return nil
  193. }
  194. func rebuild(datadir string) ([]byte, *raftpb.HardState, store.Store, error) {
  195. waldir := path.Join(datadir, "member", "wal")
  196. snapdir := path.Join(datadir, "member", "snap")
  197. ss := snap.New(snapdir)
  198. snapshot, err := ss.Load()
  199. if err != nil && err != snap.ErrNoSnapshot {
  200. return nil, nil, nil, err
  201. }
  202. var walsnap walpb.Snapshot
  203. if snapshot != nil {
  204. walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
  205. }
  206. w, err := wal.OpenForRead(waldir, walsnap)
  207. if err != nil {
  208. return nil, nil, nil, err
  209. }
  210. defer w.Close()
  211. meta, hardstate, ents, err := w.ReadAll()
  212. if err != nil {
  213. return nil, nil, nil, err
  214. }
  215. st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
  216. if snapshot != nil {
  217. err := st.Recovery(snapshot.Data)
  218. if err != nil {
  219. return nil, nil, nil, err
  220. }
  221. }
  222. cluster := membership.NewCluster("")
  223. cluster.SetStore(st)
  224. cluster.Recover(func(*semver.Version) {})
  225. applier := etcdserver.NewApplierV2(st, cluster)
  226. for _, ent := range ents {
  227. if ent.Type == raftpb.EntryConfChange {
  228. var cc raftpb.ConfChange
  229. pbutil.MustUnmarshal(&cc, ent.Data)
  230. switch cc.Type {
  231. case raftpb.ConfChangeAddNode:
  232. m := new(membership.Member)
  233. if err := json.Unmarshal(cc.Context, m); err != nil {
  234. return nil, nil, nil, err
  235. }
  236. cluster.AddMember(m)
  237. case raftpb.ConfChangeRemoveNode:
  238. id := types.ID(cc.NodeID)
  239. cluster.RemoveMember(id)
  240. case raftpb.ConfChangeUpdateNode:
  241. m := new(membership.Member)
  242. if err := json.Unmarshal(cc.Context, m); err != nil {
  243. return nil, nil, nil, err
  244. }
  245. cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
  246. }
  247. continue
  248. }
  249. var raftReq pb.InternalRaftRequest
  250. if !pbutil.MaybeUnmarshal(&raftReq, ent.Data) { // backward compatible
  251. var r pb.Request
  252. pbutil.MustUnmarshal(&r, ent.Data)
  253. applyRequest(&r, applier)
  254. } else {
  255. if raftReq.V2 != nil {
  256. req := raftReq.V2
  257. applyRequest(req, applier)
  258. }
  259. }
  260. }
  261. return meta, &hardstate, st, nil
  262. }
  263. func toTTLOptions(r *pb.Request) store.TTLOptionSet {
  264. refresh, _ := pbutil.GetBool(r.Refresh)
  265. ttlOptions := store.TTLOptionSet{Refresh: refresh}
  266. if r.Expiration != 0 {
  267. ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
  268. }
  269. return ttlOptions
  270. }
  271. func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
  272. // TODO: find a sane way to perform this cast or avoid it in the first place
  273. reqV2 := &etcdserver.RequestV2{
  274. ID: r.ID,
  275. Method: r.Method,
  276. Path: r.Path,
  277. Val: r.Val,
  278. Dir: r.Dir,
  279. PrevValue: r.PrevValue,
  280. PrevIndex: r.PrevIndex,
  281. PrevExist: r.PrevExist,
  282. Expiration: r.Expiration,
  283. Wait: r.Wait,
  284. Since: r.Since,
  285. Recursive: r.Recursive,
  286. Sorted: r.Sorted,
  287. Quorum: r.Quorum,
  288. Time: r.Time,
  289. Stream: r.Stream,
  290. Refresh: r.Refresh,
  291. XXX_unrecognized: r.XXX_unrecognized,
  292. }
  293. toTTLOptions(r)
  294. switch r.Method {
  295. case "PUT":
  296. applyV2.Put(reqV2)
  297. case "DELETE":
  298. applyV2.Delete(reqV2)
  299. case "POST", "QGET", "SYNC":
  300. return
  301. default:
  302. klog.Fatal("unknown command")
  303. }
  304. }