From 88f39ad8de2bb5b451e1cec0ae132dc195fa47f0 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 8 Oct 2021 18:44:37 +0800 Subject: [PATCH] feat: ADR-040: Implement DBConnection.Revert (#10308) Implements the `DBConnection.Revert` method which reverts DB state to the last saved version. This will be needed to implement atomic commits with the KV store for https://github.com/cosmos/cosmos-sdk/pull/9892 (supports [ADR-040](https://github.com/cosmos/cosmos-sdk/blob/eb7d939f86c6cd7b4218492364cdda3f649f06b5/docs/architecture/adr-040-storage-and-smt-state-commitments.md)). Closes: https://github.com/cosmos/cosmos-sdk/pull/10308 --- ### Author Checklist *All items are required. Please add a note to the item if the item is not applicable and please add links to any relevant follow up issues.* I have... - [x] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] added `!` to the type prefix if API or client breaking change - [x] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting)) - [x] provided a link to the relevant issue or specification - [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules) - [x] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing) - [x] added a changelog entry to `CHANGELOG.md` - [x] included comments for [documenting Go code](https://blog.golang.org/godoc) - [x] updated the relevant documentation or specification - [x] reviewed "Files changed" and left comments if necessary - [ ] confirmed all CI checks have passed ### Reviewers Checklist *All items are required. Please add a note if the item is not applicable and please add your handle next to the items reviewed if you only reviewed selected items.* I have... 
- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] confirmed `!` in the type prefix if API or client breaking change - [ ] confirmed all author checklist items have been addressed - [ ] reviewed state machine logic - [ ] reviewed API design and naming - [ ] reviewed documentation is accurate - [ ] reviewed tests and test coverage - [ ] manually tested (if applicable) --- CHANGELOG.md | 1 + db/badgerdb/db.go | 137 +++++++++++++++++++++++++++++++++++------ db/badgerdb/db_test.go | 5 ++ db/dbtest/testcases.go | 36 +++++++---- db/memdb/db.go | 25 ++++++++ db/memdb/db_test.go | 4 ++ db/rocksdb/db.go | 12 ++-- db/types.go | 29 +++++---- 8 files changed, 195 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dd5611d6f..6e66687671 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -189,6 +189,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [\#9952](https://github.com/cosmos/cosmos-sdk/pull/9952) ADR 040: Implement in-memory DB backend * [\#9848](https://github.com/cosmos/cosmos-sdk/pull/9848) ADR-040: Implement BadgerDB backend * [\#9851](https://github.com/cosmos/cosmos-sdk/pull/9851) ADR-040: Implement RocksDB backend +* [\#10308](https://github.com/cosmos/cosmos-sdk/pull/10308) ADR-040: Implement DBConnection.Revert ### Client Breaking Changes diff --git a/db/badgerdb/db.go b/db/badgerdb/db.go index c52c6172bb..894537a57c 100644 --- a/db/badgerdb/db.go +++ b/db/badgerdb/db.go @@ -2,9 +2,9 @@ package badgerdb import ( "bytes" + "context" "encoding/csv" "errors" - "math" "os" "path/filepath" "strconv" @@ -15,6 +15,8 @@ import ( dbutil "github.com/cosmos/cosmos-sdk/db/internal" "github.com/dgraph-io/badger/v3" + bpb "github.com/dgraph-io/badger/v3/pb" + "github.com/dgraph-io/ristretto/z" ) var ( @@ -57,10 +59,10 @@ type badgerIterator struct { // Map our versions to Badger timestamps. 
// -// A badger Txn's commit TS must be strictly greater than a record's "last-read" -// TS in order to detect conflicts, and a Txn must be read at a TS after last +// A badger Txn's commit timestamp must be strictly greater than a record's "last-read" +// timestamp in order to detect conflicts, and a Txn must be read at a timestamp after last // commit to see current state. So we must use commit increments that are more -// granular than our version interval, and map versions to the corresponding TS. +// granular than our version interval, and map versions to the corresponding timestamp. type versionManager struct { *dbm.VersionManager vmap map[uint64]uint64 @@ -111,7 +113,10 @@ func readVersionsFile(path string) (*versionManager, error) { if err != nil { return nil, err } - var versions []uint64 + var ( + versions []uint64 + lastTs uint64 + ) vmap := map[uint64]uint64{} for _, row := range rows { version, err := strconv.ParseUint(row[0], 10, 64) @@ -122,6 +127,9 @@ func readVersionsFile(path string) (*versionManager, error) { if err != nil { return nil, err } + if version == 0 { // 0 maps to the latest timestamp + lastTs = ts + } versions = append(versions, version) vmap[version] = ts } @@ -129,7 +137,7 @@ func readVersionsFile(path string) (*versionManager, error) { return &versionManager{ VersionManager: vmgr, vmap: vmap, - lastTs: vmgr.Last(), + lastTs: lastTs, }, nil } @@ -141,7 +149,9 @@ func writeVersionsFile(vm *versionManager, path string) error { } defer file.Close() w := csv.NewWriter(file) - var rows [][]string + rows := [][]string{ + []string{"0", strconv.FormatUint(vm.lastTs, 10)}, + } for it := vm.Iterator(); it.Next(); { version := it.Value() ts, ok := vm.vmap[version] @@ -157,16 +167,20 @@ func writeVersionsFile(vm *versionManager, path string) error { } func (b *BadgerDB) Reader() dbm.DBReader { - return &badgerTxn{txn: b.db.NewTransactionAt(math.MaxUint64, false), db: b} + b.mtx.RLock() + ts := b.vmgr.lastTs + b.mtx.RUnlock() + return 
&badgerTxn{txn: b.db.NewTransactionAt(ts, false), db: b} } func (b *BadgerDB) ReaderAt(version uint64) (dbm.DBReader, error) { b.mtx.RLock() defer b.mtx.RUnlock() - if !b.vmgr.Exists(version) { + ts, has := b.vmgr.versionTs(version) + if !has { return nil, dbm.ErrVersionDoesNotExist } - return &badgerTxn{txn: b.db.NewTransactionAt(b.vmgr.versionTs(version), false), db: b}, nil + return &badgerTxn{txn: b.db.NewTransactionAt(ts, false), db: b}, nil } func (b *BadgerDB) ReadWriter() dbm.DBReadWriter { @@ -232,6 +246,83 @@ func (b *BadgerDB) DeleteVersion(target uint64) error { return nil } +func (b *BadgerDB) Revert() error { + b.mtx.RLock() + defer b.mtx.RUnlock() + if b.openWriters > 0 { + return dbm.ErrOpenTransactions + } + + // Revert from latest commit timestamp to last "saved" timestamp + // if no versions exist, use 0 as it precedes any possible commit timestamp + var target uint64 + last := b.vmgr.Last() + if last == 0 { + target = 0 + } else { + var has bool + if target, has = b.vmgr.versionTs(last); !has { + return errors.New("bad version history") + } + } + lastTs := b.vmgr.lastTs + if target == lastTs { + return nil + } + + // Badger provides no way to rollback committed data, so we undo all changes + // since the target version using the Stream API + stream := b.db.NewStreamAt(lastTs) + // Skips unchanged keys + stream.ChooseKey = func(item *badger.Item) bool { return item.Version() > target } + // Scans for value at target version + stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { + kv := bpb.KV{Key: key} + // advance down to <= target version + itr.Next() // we have at least one newer version + for itr.Valid() && bytes.Equal(key, itr.Item().Key()) && itr.Item().Version() > target { + itr.Next() + } + if itr.Valid() && bytes.Equal(key, itr.Item().Key()) && !itr.Item().IsDeletedOrExpired() { + var err error + kv.Value, err = itr.Item().ValueCopy(nil) + if err != nil { + return nil, err + } + } + return &bpb.KVList{Kv: 
[]*bpb.KV{&kv}}, nil + } + txn := b.db.NewTransactionAt(lastTs, true) + defer txn.Discard() + stream.Send = func(buf *z.Buffer) error { + kvl, err := badger.BufferToKVList(buf) + if err != nil { + return err + } + // nil Value indicates a deleted entry + for _, kv := range kvl.Kv { + if kv.Value == nil { + err = txn.Delete(kv.Key) + if err != nil { + return err + } + } else { + err = txn.Set(kv.Key, kv.Value) + if err != nil { + return err + } + } + } + return nil + } + + err := stream.Orchestrate(context.Background()) + if err != nil { + return err + } + return txn.CommitAt(lastTs, nil) +} + func (b *BadgerDB) Stats() map[string]string { return nil } func (tx *badgerTxn) Get(key []byte) ([]byte, error) { @@ -283,7 +374,7 @@ func (tx *badgerWriter) Commit() (err error) { return errors.New("transaction has been discarded") } defer func() { err = dbutil.CombineErrors(err, tx.Discard(), "Discard also failed") }() - // Commit to the current commit TS, after ensuring it is > ReadTs + // Commit to the current commit timestamp, after ensuring it is > ReadTs tx.db.mtx.RLock() tx.db.vmgr.updateCommitTs(tx.txn.ReadTs()) ts := tx.db.vmgr.lastTs @@ -385,14 +476,23 @@ func (i *badgerIterator) Value() []byte { return val } -func (vm *versionManager) versionTs(ver uint64) uint64 { - return vm.vmap[ver] +func (vm *versionManager) versionTs(ver uint64) (uint64, bool) { + ts, has := vm.vmap[ver] + return ts, has +} + +// updateCommitTs increments the lastTs if equal to readts. +func (vm *versionManager) updateCommitTs(readts uint64) { + if vm.lastTs == readts { + vm.lastTs += 1 + } } // Atomically accesses the last commit timestamp used as a version marker. 
func (vm *versionManager) lastCommitTs() uint64 { return atomic.LoadUint64(&vm.lastTs) } + func (vm *versionManager) Copy() *versionManager { vmap := map[uint64]uint64{} for ver, ts := range vm.vmap { @@ -405,12 +505,6 @@ func (vm *versionManager) Copy() *versionManager { } } -// updateCommitTs increments the lastTs if equal to readts. -func (vm *versionManager) updateCommitTs(readts uint64) { - if vm.lastTs == readts { - vm.lastTs += 1 - } -} func (vm *versionManager) Save(target uint64) (uint64, error) { id, err := vm.VersionManager.Save(target) if err != nil { @@ -419,3 +513,8 @@ func (vm *versionManager) Save(target uint64) (uint64, error) { vm.vmap[id] = vm.lastTs // non-atomic, already guarded by the vmgr mutex return id, nil } + +func (vm *versionManager) Delete(target uint64) { + vm.VersionManager.Delete(target) + delete(vm.vmap, target) +} diff --git a/db/badgerdb/db_test.go b/db/badgerdb/db_test.go index d451a63f43..419f595d60 100644 --- a/db/badgerdb/db_test.go +++ b/db/badgerdb/db_test.go @@ -31,6 +31,11 @@ func TestVersioning(t *testing.T) { dbtest.DoTestVersioning(t, load) } +func TestRevert(t *testing.T) { + dbtest.DoTestRevert(t, load, false) + dbtest.DoTestRevert(t, load, true) +} + func TestReloadDB(t *testing.T) { dbtest.DoTestReloadDB(t, load) } diff --git a/db/dbtest/testcases.go b/db/dbtest/testcases.go index 304fde5582..dde135fa36 100644 --- a/db/dbtest/testcases.go +++ b/db/dbtest/testcases.go @@ -399,27 +399,37 @@ func DoTestRevert(t *testing.T, load Loader, reload bool) { db := load(t, dirname) var txn dbm.DBWriter - txn = db.Writer() - require.NoError(t, txn.Set([]byte{2}, []byte{2})) - require.NoError(t, txn.Commit()) + initContents := func() { + txn = db.Writer() + require.NoError(t, txn.Set([]byte{2}, []byte{2})) + require.NoError(t, txn.Commit()) - txn = db.Writer() - for i := byte(6); i < 10; i++ { - require.NoError(t, txn.Set([]byte{i}, []byte{i})) + txn = db.Writer() + for i := byte(6); i < 10; i++ { + require.NoError(t, 
txn.Set([]byte{i}, []byte{i})) + } + require.NoError(t, txn.Delete([]byte{2})) + require.NoError(t, txn.Delete([]byte{3})) + require.NoError(t, txn.Commit()) } - require.NoError(t, txn.Delete([]byte{2})) - require.NoError(t, txn.Delete([]byte{3})) - require.NoError(t, txn.Commit()) - require.Error(t, db.Revert()) // can't revert with no versions + initContents() + require.NoError(t, db.Revert()) + view := db.Reader() + it, err := view.Iterator(nil, nil) + require.NoError(t, err) + require.False(t, it.Next()) // db is empty + require.NoError(t, it.Close()) + require.NoError(t, view.Discard()) - _, err := db.SaveNextVersion() + initContents() + _, err = db.SaveNextVersion() require.NoError(t, err) // get snapshot of db state state := map[string][]byte{} - view := db.Reader() - it, err := view.Iterator(nil, nil) + view = db.Reader() + it, err = view.Iterator(nil, nil) require.NoError(t, err) for it.Next() { state[string(it.Key())] = it.Value() diff --git a/db/memdb/db.go b/db/memdb/db.go index e80688eff7..b656d60eda 100644 --- a/db/memdb/db.go +++ b/db/memdb/db.go @@ -159,6 +159,31 @@ func (db *MemDB) DeleteVersion(target uint64) error { return nil } +func (db *MemDB) Revert() error { + db.mtx.RLock() + defer db.mtx.RUnlock() + if db.openWriters > 0 { + return dbm.ErrOpenTransactions + } + + last := db.vmgr.Last() + if last == 0 { + db.btree = btree.New(bTreeDegree) + return nil + } + var has bool + db.btree, has = db.saved[last] + if !has { + return fmt.Errorf("bad version history: version %v not saved", last) + } + for ver, _ := range db.saved { + if ver > last { + delete(db.saved, ver) + } + } + return nil +} + // Get implements DBReader. 
func (tx *dbTxn) Get(key []byte) ([]byte, error) { if tx.btree == nil { diff --git a/db/memdb/db_test.go b/db/memdb/db_test.go index a3ac242eac..cd5bd89081 100644 --- a/db/memdb/db_test.go +++ b/db/memdb/db_test.go @@ -44,6 +44,10 @@ func TestVersioning(t *testing.T) { dbtest.DoTestVersioning(t, load) } +func TestRevert(t *testing.T) { + dbtest.DoTestRevert(t, load, false) +} + func TestTransactions(t *testing.T) { dbtest.DoTestTransactions(t, load, false) } diff --git a/db/rocksdb/db.go b/db/rocksdb/db.go index f906ea5316..4b69172b51 100644 --- a/db/rocksdb/db.go +++ b/db/rocksdb/db.go @@ -284,10 +284,6 @@ func (mgr *dbManager) Revert() (err error) { if mgr.openWriters > 0 { return dbm.ErrOpenTransactions } - last := mgr.vmgr.Last() - if last == 0 { - return dbm.ErrInvalidVersion - } // Close current connection and replace it with a checkpoint (created from the last checkpoint) mgr.current.Close() dbPath := filepath.Join(mgr.dir, currentDBFileName) @@ -295,9 +291,11 @@ func (mgr *dbManager) Revert() (err error) { if err != nil { return } - err = mgr.restoreFromCheckpoint(last, dbPath) - if err != nil { - return + if last := mgr.vmgr.Last(); last != 0 { + err = mgr.restoreFromCheckpoint(last, dbPath) + if err != nil { + return + } } mgr.current, err = gorocksdb.OpenOptimisticTransactionDb(mgr.opts.dbo, dbPath) return diff --git a/db/types.go b/db/types.go index d360f57f4a..bab74bf8c5 100644 --- a/db/types.go +++ b/db/types.go @@ -31,38 +31,37 @@ var ( // and read and write access. // Past versions are only accessible read-only. type DBConnection interface { - // Opens a read-only transaction at the current working version. + // Reader opens a read-only transaction at the current working version. Reader() DBReader - // Opens a read-only transaction at a specified version. + // ReaderAt opens a read-only transaction at a specified version. // Returns ErrVersionDoesNotExist for invalid versions. 
ReaderAt(uint64) (DBReader, error) - // Opens a read-write transaction at the current version. + // ReadWriter opens a read-write transaction at the current version. ReadWriter() DBReadWriter - // Opens a write-only transaction at the current version. + // Writer opens a write-only transaction at the current version. Writer() DBWriter - // Returns all saved versions as an immutable set which is safe for concurrent access. + // Versions returns all saved versions as an immutable set which is safe for concurrent access. Versions() (VersionSet, error) - // Saves the current contents of the database and returns the next version ID, which will be - // `Versions().Last()+1`. + // SaveNextVersion saves the current contents of the database and returns the next version ID, + // which will be `Versions().Last()+1`. // Returns an error if any open DBWriter transactions exist. // TODO: rename to something more descriptive? SaveNextVersion() (uint64, error) - // Attempts to save database at a specific version ID, which must be greater than or equal to - // what would be returned by `SaveNextVersion`. + // SaveVersion attempts to save database at a specific version ID, which must be greater than or + // equal to what would be returned by `SaveNextVersion`. // Returns an error if any open DBWriter transactions exist. SaveVersion(uint64) error - // Deletes a saved version. Returns ErrVersionDoesNotExist for invalid versions. + // DeleteVersion deletes a saved version. Returns ErrVersionDoesNotExist for invalid versions. DeleteVersion(uint64) error - // Reverts the DB state to the last saved version. - // Returns an error if no saved versions exist. + // Revert reverts the DB state to the last saved version; if none exist, this clears the DB. // Returns an error if any open DBWriter transactions exist. Revert() error @@ -101,7 +100,7 @@ type DBReader interface { // TODO: replace with an extra argument to Iterator()? 
ReverseIterator(start, end []byte) (Iterator, error) - // Discards the transaction, invalidating any future operations on it. + // Discard discards the transaction, invalidating any future operations on it. Discard() error } @@ -118,10 +117,10 @@ type DBWriter interface { // CONTRACT: key readonly []byte Delete([]byte) error - // Flushes pending writes and discards the transaction. + // Commit flushes pending writes and discards the transaction. Commit() error - // Discards the transaction, invalidating any future operations on it. + // Discard discards the transaction, invalidating any future operations on it. Discard() error }