| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 | // Copyright 2015 The etcd Authors//// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at////     http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.package mvccimport (	"bytes"	"errors"	"fmt"	"github.com/google/btree"	"go.uber.org/zap")var (	ErrRevisionNotFound = errors.New("mvcc: revision not found"))// keyIndex stores the revisions of a key in the backend.// Each keyIndex has at least one key generation.// Each generation might have several key versions.// Tombstone on a key appends an tombstone version at the end// of the current generation and creates a new empty generation.// Each version of a key has an index pointing to the backend.//// For example: put(1.0);put(2.0);tombstone(3.0);put(4.0);tombstone(5.0) on key "foo"// generate a keyIndex:// key:     "foo"// rev: 5// generations://    {empty}//    {4.0, 5.0(t)}//    {1.0, 2.0, 3.0(t)}//// Compact a keyIndex removes the versions with smaller or equal to// rev except the largest one. If the generation becomes empty// during compaction, it will be removed. if all the generations get// removed, the keyIndex should be removed.//// For example:// compact(2) on the previous example// generations://    {empty}//    {4.0, 5.0(t)}//    {2.0, 3.0(t)}//// compact(4)// generations://    {empty}//    {4.0, 5.0(t)}//// compact(5):// generations://    {empty} -> key SHOULD be removed.//// compact(6):// generations://    {empty} -> key SHOULD be removed.type keyIndex struct {	key         []byte	modified    revision // the main rev of the last modification	generations []generation}// put puts a revision to the keyIndex.func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {	rev := revision{main: main, sub: sub}	if !rev.GreaterThan(ki.modified) {		if lg != nil {			lg.Panic(				"'put' with an unexpected smaller revision",				zap.Int64("given-revision-main", rev.main),				zap.Int64("given-revision-sub", rev.sub),				zap.Int64("modified-revision-main", ki.modified.main),				zap.Int64("modified-revision-sub", ki.modified.sub),			)		} else {			plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)		}	}	if len(ki.generations) == 0 {		ki.generations = append(ki.generations, generation{})	}	g := &ki.generations[len(ki.generations)-1]	if len(g.revs) == 0 { // create a new key		keysGauge.Inc()		g.created = rev	}	g.revs = append(g.revs, rev)	g.ver++	ki.modified = rev}func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {	if len(ki.generations) != 0 {		if lg != nil {			lg.Panic(				"'restore' got an unexpected non-empty generations",				zap.Int("generations-size", len(ki.generations)),			)		} else {			plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")		}	}	ki.modified = modified	g := generation{created: created, ver: ver, revs: []revision{modified}}	ki.generations = append(ki.generations, g)	keysGauge.Inc()}// tombstone puts a revision, pointing to a tombstone, to the keyIndex.// It also creates a new empty generation in the keyIndex.// It returns ErrRevisionNotFound when tombstone on an empty generation.func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {	if ki.isEmpty() {		if lg != nil {			lg.Panic(				"'tombstone' got an unexpected empty keyIndex",				zap.String("key", string(ki.key)),			)		} else {			plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))		}	}	if ki.generations[len(ki.generations)-1].isEmpty() {		return ErrRevisionNotFound	}	ki.put(lg, main, sub)	ki.generations = append(ki.generations, generation{})	keysGauge.Dec()	return nil}// get gets the modified, created revision and version of the key that satisfies the given atRev.// Rev must be higher than or equal to the given atRev.func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {	if ki.isEmpty() {		if lg != nil {			lg.Panic(				"'get' got an unexpected empty keyIndex",				zap.String("key", string(ki.key)),			)		} else {			plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))		}	}	g := ki.findGeneration(atRev)	if g.isEmpty() {		return revision{}, revision{}, 0, ErrRevisionNotFound	}	n := g.walk(func(rev revision) bool { return rev.main > atRev })	if n != -1 {		return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil	}	return revision{}, revision{}, 0, ErrRevisionNotFound}// since returns revisions since the given rev. Only the revision with the// largest sub revision will be returned if multiple revisions have the same// main revision.func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {	if ki.isEmpty() {		if lg != nil {			lg.Panic(				"'since' got an unexpected empty keyIndex",				zap.String("key", string(ki.key)),			)		} else {			plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))		}	}	since := revision{rev, 0}	var gi int	// find the generations to start checking	for gi = len(ki.generations) - 1; gi > 0; gi-- {		g := ki.generations[gi]		if g.isEmpty() {			continue		}		if since.GreaterThan(g.created) {			break		}	}	var revs []revision	var last int64	for ; gi < len(ki.generations); gi++ {		for _, r := range ki.generations[gi].revs {			if since.GreaterThan(r) {				continue			}			if r.main == last {				// replace the revision with a new one that has higher sub value,				// because the original one should not be seen by external				revs[len(revs)-1] = r				continue			}			revs = append(revs, r)			last = r.main		}	}	return revs}// compact compacts a keyIndex by removing the versions with smaller or equal// revision than the given atRev except the largest one (If the largest one is// a tombstone, it will not be kept).// If a generation becomes empty during compaction, it will be removed.func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {	if ki.isEmpty() {		if lg != nil {			lg.Panic(				"'compact' got an unexpected empty keyIndex",				zap.String("key", string(ki.key)),			)		} else {			plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))		}	}	genIdx, revIndex := ki.doCompact(atRev, available)	g := &ki.generations[genIdx]	if !g.isEmpty() {		// remove the previous contents.		if revIndex != -1 {			g.revs = g.revs[revIndex:]		}		// remove any tombstone		if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {			delete(available, g.revs[0])			genIdx++		}	}	// remove the previous generations.	ki.generations = ki.generations[genIdx:]}// keep finds the revision to be kept if compact is called at given atRev.func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {	if ki.isEmpty() {		return	}	genIdx, revIndex := ki.doCompact(atRev, available)	g := &ki.generations[genIdx]	if !g.isEmpty() {		// remove any tombstone		if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {			delete(available, g.revs[revIndex])		}	}}func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {	// walk until reaching the first revision smaller or equal to "atRev",	// and add the revision to the available map	f := func(rev revision) bool {		if rev.main <= atRev {			available[rev] = struct{}{}			return false		}		return true	}	genIdx, g := 0, &ki.generations[0]	// find first generation includes atRev or created after atRev	for genIdx < len(ki.generations)-1 {		if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {			break		}		genIdx++		g = &ki.generations[genIdx]	}	revIndex = g.walk(f)	return genIdx, revIndex}func (ki *keyIndex) isEmpty() bool {	return len(ki.generations) == 1 && ki.generations[0].isEmpty()}// findGeneration finds out the generation of the keyIndex that the// given rev belongs to. If the given rev is at the gap of two generations,// which means that the key does not exist at the given rev, it returns nil.func (ki *keyIndex) findGeneration(rev int64) *generation {	lastg := len(ki.generations) - 1	cg := lastg	for cg >= 0 {		if len(ki.generations[cg].revs) == 0 {			cg--			continue		}		g := ki.generations[cg]		if cg != lastg {			if tomb := g.revs[len(g.revs)-1].main; tomb <= rev {				return nil			}		}		if g.revs[0].main <= rev {			return &ki.generations[cg]		}		cg--	}	return nil}func (ki *keyIndex) Less(b btree.Item) bool {	return bytes.Compare(ki.key, b.(*keyIndex).key) == -1}func (ki *keyIndex) equal(b *keyIndex) bool {	if !bytes.Equal(ki.key, b.key) {		return false	}	if ki.modified != b.modified {		return false	}	if len(ki.generations) != len(b.generations) {		return false	}	for i := range ki.generations {		ag, bg := ki.generations[i], b.generations[i]		if !ag.equal(bg) {			return false		}	}	return true}func (ki *keyIndex) String() string {	var s string	for _, g := range ki.generations {		s += g.String()	}	return s}// generation contains multiple revisions of a key.type generation struct {	ver     int64	created revision // when the generation is created (put in first revision).	revs    []revision}func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }// walk walks through the revisions in the generation in descending order.// It passes the revision to the given function.// walk returns until: 1. it finishes walking all pairs 2. the function returns false.// walk returns the position at where it stopped. If it stopped after// finishing walking, -1 will be returned.func (g *generation) walk(f func(rev revision) bool) int {	l := len(g.revs)	for i := range g.revs {		ok := f(g.revs[l-i-1])		if !ok {			return l - i - 1		}	}	return -1}func (g *generation) String() string {	return fmt.Sprintf("g: created[%d] ver[%d], revs %#v\n", g.created, g.ver, g.revs)}func (g generation) equal(b generation) bool {	if g.ver != b.ver {		return false	}	if len(g.revs) != len(b.revs) {		return false	}	for i := range g.revs {		ar, br := g.revs[i], b.revs[i]		if ar != br {			return false		}	}	return true}
 |