123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package main
- import (
- "encoding/json"
- "os"
- "path"
- "strconv"
- "strings"
- "time"
- // Uncomment when you want to rollback to 2.2.1 version.
- oldwal "k8s.io/kubernetes/third_party/forked/etcd221/wal"
- // Uncomment when you want to rollback to 2.3.7 version.
- // oldwal "k8s.io/kubernetes/third_party/forked/etcd237/wal"
- "github.com/coreos/etcd/etcdserver"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- "github.com/coreos/etcd/etcdserver/membership"
- "github.com/coreos/etcd/mvcc/backend"
- "github.com/coreos/etcd/mvcc/mvccpb"
- "github.com/coreos/etcd/pkg/pbutil"
- "github.com/coreos/etcd/pkg/types"
- "github.com/coreos/etcd/raft/raftpb"
- "github.com/coreos/etcd/snap"
- "github.com/coreos/etcd/store"
- "github.com/coreos/etcd/wal"
- "github.com/coreos/etcd/wal/walpb"
- "github.com/coreos/go-semver/semver"
- "k8s.io/klog"
- )
- const rollbackVersion = "2.2.0"
- // RollbackV3ToV2 rolls back an etcd 3.0.x data directory to the 2.x.x version specified by rollbackVersion.
- func RollbackV3ToV2(migrateDatadir string, ttl time.Duration) error {
- dbpath := path.Join(migrateDatadir, "member", "snap", "db")
- klog.Infof("Rolling db file %s back to etcd 2.x", dbpath)
- // etcd3 store backend. We will use it to parse v3 data files and extract information.
- be := backend.NewDefaultBackend(dbpath)
- tx := be.BatchTx()
- // etcd2 store backend. We will use v3 data to update this and then save snapshot to disk.
- st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
- expireTime := time.Now().Add(ttl)
- tx.Lock()
- err := tx.UnsafeForEach([]byte("key"), func(k, v []byte) error {
- kv := &mvccpb.KeyValue{}
- kv.Unmarshal(v)
- // This is compact key.
- if !strings.HasPrefix(string(kv.Key), "/") {
- return nil
- }
- ttlOpt := store.TTLOptionSet{}
- if kv.Lease != 0 {
- ttlOpt = store.TTLOptionSet{ExpireTime: expireTime}
- }
- if !isTombstone(k) {
- sk := path.Join(strings.Trim(etcdserver.StoreKeysPrefix, "/"), string(kv.Key))
- _, err := st.Set(sk, false, string(kv.Value), ttlOpt)
- if err != nil {
- return err
- }
- } else {
- st.Delete(string(kv.Key), false, false)
- }
- return nil
- })
- if err != nil {
- return err
- }
- tx.Unlock()
- if err := traverseAndDeleteEmptyDir(st, "/"); err != nil {
- return err
- }
- // rebuild cluster state.
- metadata, hardstate, oldSt, err := rebuild(migrateDatadir)
- if err != nil {
- return err
- }
- // In the following, it's low level logic that saves metadata and data into v2 snapshot.
- backupPath := migrateDatadir + ".rollback.backup"
- if err := os.Rename(migrateDatadir, backupPath); err != nil {
- return err
- }
- if err := os.MkdirAll(path.Join(migrateDatadir, "member", "snap"), 0777); err != nil {
- return err
- }
- walDir := path.Join(migrateDatadir, "member", "wal")
- w, err := oldwal.Create(walDir, metadata)
- if err != nil {
- return err
- }
- err = w.SaveSnapshot(walpb.Snapshot{Index: hardstate.Commit, Term: hardstate.Term})
- w.Close()
- if err != nil {
- return err
- }
- event, err := oldSt.Get(etcdserver.StoreClusterPrefix, true, false)
- if err != nil {
- return err
- }
- // nodes (members info) for ConfState
- nodes := []uint64{}
- traverseMetadata(event.Node, func(n *store.NodeExtern) {
- if n.Key != etcdserver.StoreClusterPrefix {
- // update store metadata
- v := ""
- if !n.Dir {
- v = *n.Value
- }
- if n.Key == path.Join(etcdserver.StoreClusterPrefix, "version") {
- v = rollbackVersion
- }
- if _, err := st.Set(n.Key, n.Dir, v, store.TTLOptionSet{}); err != nil {
- klog.Error(err)
- }
- // update nodes
- fields := strings.Split(n.Key, "/")
- if len(fields) == 4 && fields[2] == "members" {
- nodeID, err := strconv.ParseUint(fields[3], 16, 64)
- if err != nil {
- klog.Fatalf("failed to parse member ID (%s): %v", fields[3], err)
- }
- nodes = append(nodes, nodeID)
- }
- }
- })
- data, err := st.Save()
- if err != nil {
- return err
- }
- raftSnap := raftpb.Snapshot{
- Data: data,
- Metadata: raftpb.SnapshotMetadata{
- Index: hardstate.Commit,
- Term: hardstate.Term,
- ConfState: raftpb.ConfState{
- Nodes: nodes,
- },
- },
- }
- snapshotter := snap.New(path.Join(migrateDatadir, "member", "snap"))
- if err := snapshotter.SaveSnap(raftSnap); err != nil {
- return err
- }
- klog.Infof("Finished successfully")
- return nil
- }
- func traverseMetadata(head *store.NodeExtern, handleFunc func(*store.NodeExtern)) {
- q := []*store.NodeExtern{head}
- for len(q) > 0 {
- n := q[0]
- q = q[1:]
- handleFunc(n)
- q = append(q, n.Nodes...)
- }
- }
const (
	// revBytesLen is the byte length of an unmarked key in the "key" bucket.
	// NOTE(review): presumably 8 bytes main revision, 1 separator byte and
	// 8 bytes sub revision — verify against the etcd mvcc encoding.
	revBytesLen = 8 + 1 + 8
	// markBytePosition is the index of the trailing mark byte, which sits
	// directly after the unmarked key bytes.
	markBytePosition = revBytesLen
	// markedRevBytesLen is the byte length of a key that carries a mark byte.
	markedRevBytesLen = markBytePosition + 1
	// markTombstone is the mark byte value identifying a tombstone.
	markTombstone byte = 't'
)

// isTombstone reports whether b has the length of a marked key and its mark
// byte equals markTombstone.
func isTombstone(b []byte) bool {
	if len(b) != markedRevBytesLen {
		return false
	}
	return b[markBytePosition] == markTombstone
}
- func traverseAndDeleteEmptyDir(st store.Store, dir string) error {
- e, err := st.Get(dir, true, false)
- if err != nil {
- return err
- }
- if len(e.Node.Nodes) == 0 {
- st.Delete(dir, true, true)
- return nil
- }
- for _, node := range e.Node.Nodes {
- if !node.Dir {
- klog.V(2).Infof("key: %s", node.Key[len(etcdserver.StoreKeysPrefix):])
- } else {
- err := traverseAndDeleteEmptyDir(st, node.Key)
- if err != nil {
- return err
- }
- }
- }
- return nil
- }
- func rebuild(datadir string) ([]byte, *raftpb.HardState, store.Store, error) {
- waldir := path.Join(datadir, "member", "wal")
- snapdir := path.Join(datadir, "member", "snap")
- ss := snap.New(snapdir)
- snapshot, err := ss.Load()
- if err != nil && err != snap.ErrNoSnapshot {
- return nil, nil, nil, err
- }
- var walsnap walpb.Snapshot
- if snapshot != nil {
- walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
- }
- w, err := wal.OpenForRead(waldir, walsnap)
- if err != nil {
- return nil, nil, nil, err
- }
- defer w.Close()
- meta, hardstate, ents, err := w.ReadAll()
- if err != nil {
- return nil, nil, nil, err
- }
- st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
- if snapshot != nil {
- err := st.Recovery(snapshot.Data)
- if err != nil {
- return nil, nil, nil, err
- }
- }
- cluster := membership.NewCluster("")
- cluster.SetStore(st)
- cluster.Recover(func(*semver.Version) {})
- applier := etcdserver.NewApplierV2(st, cluster)
- for _, ent := range ents {
- if ent.Type == raftpb.EntryConfChange {
- var cc raftpb.ConfChange
- pbutil.MustUnmarshal(&cc, ent.Data)
- switch cc.Type {
- case raftpb.ConfChangeAddNode:
- m := new(membership.Member)
- if err := json.Unmarshal(cc.Context, m); err != nil {
- return nil, nil, nil, err
- }
- cluster.AddMember(m)
- case raftpb.ConfChangeRemoveNode:
- id := types.ID(cc.NodeID)
- cluster.RemoveMember(id)
- case raftpb.ConfChangeUpdateNode:
- m := new(membership.Member)
- if err := json.Unmarshal(cc.Context, m); err != nil {
- return nil, nil, nil, err
- }
- cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
- }
- continue
- }
- var raftReq pb.InternalRaftRequest
- if !pbutil.MaybeUnmarshal(&raftReq, ent.Data) { // backward compatible
- var r pb.Request
- pbutil.MustUnmarshal(&r, ent.Data)
- applyRequest(&r, applier)
- } else {
- if raftReq.V2 != nil {
- req := raftReq.V2
- applyRequest(req, applier)
- }
- }
- }
- return meta, &hardstate, st, nil
- }
- func toTTLOptions(r *pb.Request) store.TTLOptionSet {
- refresh, _ := pbutil.GetBool(r.Refresh)
- ttlOptions := store.TTLOptionSet{Refresh: refresh}
- if r.Expiration != 0 {
- ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
- }
- return ttlOptions
- }
- func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
- // TODO: find a sane way to perform this cast or avoid it in the first place
- reqV2 := &etcdserver.RequestV2{
- ID: r.ID,
- Method: r.Method,
- Path: r.Path,
- Val: r.Val,
- Dir: r.Dir,
- PrevValue: r.PrevValue,
- PrevIndex: r.PrevIndex,
- PrevExist: r.PrevExist,
- Expiration: r.Expiration,
- Wait: r.Wait,
- Since: r.Since,
- Recursive: r.Recursive,
- Sorted: r.Sorted,
- Quorum: r.Quorum,
- Time: r.Time,
- Stream: r.Stream,
- Refresh: r.Refresh,
- XXX_unrecognized: r.XXX_unrecognized,
- }
- toTTLOptions(r)
- switch r.Method {
- case "PUT":
- applyV2.Put(reqV2)
- case "DELETE":
- applyV2.Delete(reqV2)
- case "POST", "QGET", "SYNC":
- return
- default:
- klog.Fatal("unknown command")
- }
- }
|