From 33c8314efe1b5c5a637afb487ed19ca16c066f6a Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 5 Oct 2021 23:39:55 +0800 Subject: [PATCH] feat: ADR-040: Implement RocksDB backend (#9851) ## Description Partially resolves: https://github.com/vulcanize/cosmos-sdk/issues/14 Implements a [RocksDB](https://github.com/facebook/rocksdb)-based backend for the DB interface introduced by https://github.com/cosmos/cosmos-sdk/pull/9573 and specified by [ADR-040](https://github.com/cosmos/cosmos-sdk/blob/eb7d939f86c6cd7b4218492364cdda3f649f06b5/docs/architecture/adr-040-storage-and-smt-state-commitments.md). * Historical versioning is implemented with [Checkpoints](https://github.com/facebook/rocksdb/wiki/Checkpoints). * Uses `OptimisticTransactionDB` to allow concurrent transactions with write conflict detection. This depends on some additional CGo bindings - see https://github.com/tecbot/gorocksdb/pull/216, https://github.com/facebook/rocksdb/pull/8526. We'll need to replace the `gorocksdb` module until these are upstream. --- ### Author Checklist *All items are required. Please add a note to the item if the item is not applicable and please add links to any relevant follow up issues.* I have... 
- [x] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [x] added `!` to the type prefix if API or client breaking change - [x] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting)) - [x] provided a link to the relevant issue or specification - [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules) - n/a - [x] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing) - [x] added a changelog entry to `CHANGELOG.md` - [x] included comments for [documenting Go code](https://blog.golang.org/godoc) - [x] updated the relevant documentation or specification - [x] reviewed "Files changed" and left comments if necessary - [ ] confirmed all CI checks have passed ### Reviewers Checklist *All items are required. Please add a note if the item is not applicable and please add your handle next to the items reviewed if you only reviewed selected items.* I have... 
- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] confirmed `!` in the type prefix if API or client breaking change - [ ] confirmed all author checklist items have been addressed - [ ] reviewed state machine logic - [ ] reviewed API design and naming - [ ] reviewed documentation is accurate - [ ] reviewed tests and test coverage - [ ] manually tested (if applicable) --- CHANGELOG.md | 1 + db/README.md | 3 + db/badgerdb/db.go | 48 ++-- db/dbtest/testcases.go | 189 +++++++++++++--- db/go.mod | 22 +- db/go.sum | 8 + db/internal/util.go | 27 +++ db/memdb/db.go | 46 +++- db/rocksdb/batch.go | 67 ++++++ db/rocksdb/db.go | 483 +++++++++++++++++++++++++++++++++++++++++ db/rocksdb/db_test.go | 63 ++++++ db/rocksdb/iterator.go | 147 +++++++++++++ db/types.go | 13 +- go.mod | 1 + go.sum | 2 + 15 files changed, 1064 insertions(+), 56 deletions(-) create mode 100644 db/internal/util.go create mode 100644 db/rocksdb/batch.go create mode 100644 db/rocksdb/db.go create mode 100644 db/rocksdb/db_test.go create mode 100644 db/rocksdb/iterator.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 556a4ac312..4dd5611d6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -188,6 +188,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [\#9573](https://github.com/cosmos/cosmos-sdk/pull/9573) ADR 040 implementation: New DB interface * [\#9952](https://github.com/cosmos/cosmos-sdk/pull/9952) ADR 040: Implement in-memory DB backend * [\#9848](https://github.com/cosmos/cosmos-sdk/pull/9848) ADR-040: Implement BadgerDB backend +* [\#9851](https://github.com/cosmos/cosmos-sdk/pull/9851) ADR-040: Implement RocksDB backend ### Client Breaking Changes diff --git a/db/README.md b/db/README.md index ab766bcbbc..1a9f5a9782 100644 --- a/db/README.md +++ b/db/README.md @@ -62,3 +62,6 @@ tx2.Set(key, []byte("b")) tx1.Commit() // ok tx2.Commit() // ok ``` +### RocksDB + +A 
[RocksDB](https://github.com/facebook/rocksdb)-based backend. Internally this uses [`OptimisticTransactionDB`](https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb) to allow concurrent transactions with write conflict detection. Historical versioning is internally implemented with [Checkpoints](https://github.com/facebook/rocksdb/wiki/Checkpoints). diff --git a/db/badgerdb/db.go b/db/badgerdb/db.go index 4135530006..c52c6172bb 100644 --- a/db/badgerdb/db.go +++ b/db/badgerdb/db.go @@ -3,6 +3,7 @@ package badgerdb import ( "bytes" "encoding/csv" + "errors" "math" "os" "path/filepath" @@ -11,6 +12,7 @@ import ( "sync/atomic" dbm "github.com/cosmos/cosmos-sdk/db" + dbutil "github.com/cosmos/cosmos-sdk/db/internal" "github.com/dgraph-io/badger/v3" ) @@ -41,6 +43,7 @@ type badgerTxn struct { type badgerWriter struct { badgerTxn + discarded bool } type badgerIterator struct { @@ -48,7 +51,8 @@ type badgerIterator struct { start, end []byte iter *badger.Iterator lastErr error - primed bool + // Whether iterator has been advanced to the first element (is fully initialized) + primed bool } // Map our versions to Badger timestamps. 
@@ -168,9 +172,9 @@ func (b *BadgerDB) ReaderAt(version uint64) (dbm.DBReader, error) { func (b *BadgerDB) ReadWriter() dbm.DBReadWriter { atomic.AddInt32(&b.openWriters, 1) b.mtx.RLock() - ts := b.vmgr.lastCommitTs() + ts := b.vmgr.lastTs b.mtx.RUnlock() - return &badgerWriter{badgerTxn{txn: b.db.NewTransactionAt(ts, true), db: b}} + return &badgerWriter{badgerTxn{txn: b.db.NewTransactionAt(ts, true), db: b}, false} } func (b *BadgerDB) Writer() dbm.DBWriter { @@ -261,11 +265,8 @@ func (tx *badgerTxn) Has(key []byte) (bool, error) { } func (tx *badgerWriter) Set(key, value []byte) error { - if len(key) == 0 { - return dbm.ErrKeyEmpty - } - if value == nil { - return dbm.ErrValueNil + if err := dbutil.ValidateKv(key, value); err != nil { + return err } return tx.txn.Set(key, value) } @@ -277,20 +278,31 @@ func (tx *badgerWriter) Delete(key []byte) error { return tx.txn.Delete(key) } -func (tx *badgerWriter) Commit() error { +func (tx *badgerWriter) Commit() (err error) { + if tx.discarded { + return errors.New("transaction has been discarded") + } + defer func() { err = dbutil.CombineErrors(err, tx.Discard(), "Discard also failed") }() // Commit to the current commit TS, after ensuring it is > ReadTs + tx.db.mtx.RLock() tx.db.vmgr.updateCommitTs(tx.txn.ReadTs()) - defer tx.Discard() - return tx.txn.CommitAt(tx.db.vmgr.lastCommitTs(), nil) + ts := tx.db.vmgr.lastTs + tx.db.mtx.RUnlock() + err = tx.txn.CommitAt(ts, nil) + return } -func (tx *badgerTxn) Discard() { +func (tx *badgerTxn) Discard() error { tx.txn.Discard() + return nil } -func (tx *badgerWriter) Discard() { - defer atomic.AddInt32(&tx.db.openWriters, -1) - tx.badgerTxn.Discard() +func (tx *badgerWriter) Discard() error { + if !tx.discarded { + defer atomic.AddInt32(&tx.db.openWriters, -1) + tx.discarded = true + } + return tx.badgerTxn.Discard() } func (tx *badgerTxn) iteratorOpts(start, end []byte, opts badger.IteratorOptions) (*badgerIterator, error) { @@ -393,9 +405,11 @@ func (vm *versionManager) 
Copy() *versionManager { } } -// updateCommitTs atomically increments the lastTs if equal to readts. +// updateCommitTs increments the lastTs if equal to readts. func (vm *versionManager) updateCommitTs(readts uint64) { - atomic.CompareAndSwapUint64(&vm.lastTs, readts, readts+1) + if vm.lastTs == readts { + vm.lastTs += 1 + } } func (vm *versionManager) Save(target uint64) (uint64, error) { id, err := vm.VersionManager.Save(target) diff --git a/db/dbtest/testcases.go b/db/dbtest/testcases.go index dedd040922..304fde5582 100644 --- a/db/dbtest/testcases.go +++ b/db/dbtest/testcases.go @@ -69,7 +69,7 @@ func DoTestGetSetHasDelete(t *testing.T, load Loader) { err = txn.Set([]byte("b"), []byte{0x02}) require.NoError(t, err) - view.Discard() + require.NoError(t, view.Discard()) require.NoError(t, txn.Commit()) txn = db.ReadWriter() @@ -145,7 +145,7 @@ func DoTestIterators(t *testing.T, load Loader) { for ; iter.Next(); i++ { expectedValue := expected[i] value := iter.Value() - require.EqualValues(t, expectedValue, string(value), "i=%v", i) + require.Equal(t, expectedValue, string(value), "i=%v", i) } require.Equal(t, len(expected), i) } @@ -189,7 +189,7 @@ func DoTestIterators(t *testing.T, load Loader) { it.Close() } - view.Discard() + require.NoError(t, view.Discard()) require.NoError(t, db.Close()) } @@ -246,6 +246,7 @@ func DoTestVersioning(t *testing.T, load Loader) { require.NoError(t, err) has, err := view.Has([]byte("2")) require.False(t, has) + require.NoError(t, view.Discard()) view, err = db.ReaderAt(v2) require.NoError(t, err) @@ -258,13 +259,12 @@ func DoTestVersioning(t *testing.T, load Loader) { require.NoError(t, err) has, err = view.Has([]byte("1")) require.False(t, has) + require.NoError(t, view.Discard()) - // Try to read an invalid version view, err = db.ReaderAt(versions.Last() + 1) - require.Equal(t, dbm.ErrVersionDoesNotExist, err) + require.Equal(t, dbm.ErrVersionDoesNotExist, err, "should fail to read a nonexistent version") - require.NoError(t, 
db.DeleteVersion(v2)) - // Try to read a deleted version + require.NoError(t, db.DeleteVersion(v2), "should delete version v2") view, err = db.ReaderAt(v2) require.Equal(t, dbm.ErrVersionDoesNotExist, err) @@ -283,6 +283,14 @@ func DoTestVersioning(t *testing.T, load Loader) { prev = ver } + // Open multiple readers for the same past version + view, err = db.ReaderAt(v3) + require.NoError(t, err) + view2, err := db.ReaderAt(v3) + require.NoError(t, err) + require.NoError(t, view.Discard()) + require.NoError(t, view2.Discard()) + require.NoError(t, db.Close()) } @@ -300,23 +308,37 @@ func DoTestTransactions(t *testing.T, load Loader, multipleWriters bool) { t.Run("no commit", func(t *testing.T) { t.Helper() view := db.Reader() - defer view.Discard() tx := getWriter() - defer tx.Discard() require.NoError(t, tx.Set([]byte("0"), []byte("a"))) v, err := view.Get([]byte("0")) require.NoError(t, err) require.Nil(t, v) + require.NoError(t, view.Discard()) + require.NoError(t, tx.Discard()) }) // Try to commit version with open txns - t.Run("open transactions", func(t *testing.T) { + t.Run("cannot save with open transactions", func(t *testing.T) { t.Helper() tx := getWriter() - tx.Set([]byte("2"), []byte("a")) + require.NoError(t, tx.Set([]byte("0"), []byte("a"))) _, err := db.SaveNextVersion() require.Equal(t, dbm.ErrOpenTransactions, err) - tx.Discard() + require.NoError(t, tx.Discard()) + }) + + // Try to use a transaction after closing + t.Run("cannot reuse transaction", func(t *testing.T) { + t.Helper() + tx := getWriter() + require.NoError(t, tx.Commit()) + require.Error(t, tx.Set([]byte("0"), []byte("a"))) + require.NoError(t, tx.Discard()) // redundant discard is fine + + tx = getWriter() + require.NoError(t, tx.Discard()) + require.Error(t, tx.Set([]byte("0"), []byte("a"))) + require.NoError(t, tx.Discard()) }) // Continue only if the backend supports multiple concurrent writers @@ -353,13 +375,129 @@ func DoTestTransactions(t *testing.T, load Loader, 
multipleWriters bool) { } wg.Wait() view := db.Reader() - defer view.Discard() v, err := view.Get(ikey(0)) require.NoError(t, err) require.Equal(t, ival(0), v) + require.NoError(t, view.Discard()) }) - } + // Try to reuse a reader txn + view := db.Reader() + require.NoError(t, view.Discard()) + _, err := view.Get([]byte("0")) + require.Error(t, err) + require.NoError(t, view.Discard()) // redundant discard is fine + + require.NoError(t, db.Close()) +} + +// Test that Revert works as intended, optionally closing and +// reloading the DB both before and after reverting +func DoTestRevert(t *testing.T, load Loader, reload bool) { + t.Helper() + dirname := t.TempDir() + db := load(t, dirname) + var txn dbm.DBWriter + + txn = db.Writer() + require.NoError(t, txn.Set([]byte{2}, []byte{2})) + require.NoError(t, txn.Commit()) + + txn = db.Writer() + for i := byte(6); i < 10; i++ { + require.NoError(t, txn.Set([]byte{i}, []byte{i})) + } + require.NoError(t, txn.Delete([]byte{2})) + require.NoError(t, txn.Delete([]byte{3})) + require.NoError(t, txn.Commit()) + + require.Error(t, db.Revert()) // can't revert with no versions + + _, err := db.SaveNextVersion() + require.NoError(t, err) + + // get snapshot of db state + state := map[string][]byte{} + view := db.Reader() + it, err := view.Iterator(nil, nil) + require.NoError(t, err) + for it.Next() { + state[string(it.Key())] = it.Value() + } + require.NoError(t, it.Close()) + view.Discard() + + checkContents := func() { + view = db.Reader() + count := 0 + it, err = view.Iterator(nil, nil) + require.NoError(t, err) + for it.Next() { + val, has := state[string(it.Key())] + require.True(t, has, "key should not be present: %v => %v", it.Key(), it.Value()) + require.Equal(t, val, it.Value()) + count++ + } + require.NoError(t, it.Close()) + require.Equal(t, len(state), count) + view.Discard() + } + + changeContents := func() { + txn = db.Writer() + require.NoError(t, txn.Set([]byte{3}, []byte{15})) + require.NoError(t, 
txn.Set([]byte{7}, []byte{70})) + require.NoError(t, txn.Delete([]byte{8})) + require.NoError(t, txn.Delete([]byte{9})) + require.NoError(t, txn.Set([]byte{10}, []byte{0})) + require.NoError(t, txn.Commit()) + + txn = db.Writer() + require.NoError(t, txn.Set([]byte{3}, []byte{30})) + require.NoError(t, txn.Set([]byte{8}, []byte{8})) + require.NoError(t, txn.Delete([]byte{9})) + require.NoError(t, txn.Commit()) + } + + changeContents() + + if reload { + db.Close() + db = load(t, dirname) + } + + txn = db.Writer() + require.Error(t, db.Revert()) // can't revert with open writers + txn.Discard() + require.NoError(t, db.Revert()) + + if reload { + db.Close() + db = load(t, dirname) + } + + checkContents() + + // With intermediate versions added & deleted, revert again to v1 + changeContents() + v2, _ := db.SaveNextVersion() + + txn = db.Writer() + require.NoError(t, txn.Delete([]byte{6})) + require.NoError(t, txn.Set([]byte{8}, []byte{9})) + require.NoError(t, txn.Set([]byte{11}, []byte{11})) + txn.Commit() + v3, _ := db.SaveNextVersion() + + txn = db.Writer() + require.NoError(t, txn.Set([]byte{12}, []byte{12})) + txn.Commit() + + db.DeleteVersion(v2) + db.DeleteVersion(v3) + db.Revert() + checkContents() + require.NoError(t, db.Close()) } @@ -389,9 +527,12 @@ func DoTestReloadDB(t *testing.T, load Loader) { require.NoError(t, err) txn = db.Writer() - require.NoError(t, txn.Set(ikey(100), ival(100))) + require.NoError(t, txn.Set([]byte("working-version"), ival(100))) require.NoError(t, txn.Commit()) + txn = db.Writer() + require.NoError(t, txn.Set([]byte("uncommitted"), ival(200))) + // Reload and check each saved version db.Close() db = load(t, dirname) @@ -401,19 +542,13 @@ func DoTestReloadDB(t *testing.T, load Loader) { require.NoError(t, err) require.Equal(t, last, vset.Last()) - txn = db.Writer() - for i := 10; i < 15; i++ { - require.NoError(t, txn.Set(ikey(i), ival(i+10))) - } - require.NoError(t, txn.Commit()) - for i := 0; i < 10; i++ { view, err := 
db.ReaderAt(firstVersions[i]) require.NoError(t, err) val, err := view.Get(ikey(i)) require.NoError(t, err) require.Equal(t, ival(i), val) - view.Discard() + require.NoError(t, view.Discard()) } view, err := db.ReaderAt(last) @@ -427,14 +562,18 @@ func DoTestReloadDB(t *testing.T, load Loader) { require.Equal(t, ival(i), v) } } - view.Discard() + require.NoError(t, view.Discard()) // Load working version view = db.Reader() - val, err := view.Get(ikey(100)) + val, err := view.Get([]byte("working-version")) require.NoError(t, err) require.Equal(t, ival(100), val) - view.Discard() + val, err = view.Get([]byte("uncommitted")) + require.NoError(t, err) + require.Nil(t, val) + + require.NoError(t, view.Discard()) require.NoError(t, db.Close()) } diff --git a/db/go.mod b/db/go.mod index 5e2a70f769..0577e38ffe 100644 --- a/db/go.mod +++ b/db/go.mod @@ -6,10 +6,30 @@ require ( github.com/dgraph-io/badger/v3 v3.2103.1 github.com/google/btree v1.0.0 github.com/stretchr/testify v1.7.0 + github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/cespare/xxhash v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgraph-io/ristretto v0.1.0 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect + github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect + github.com/golang/protobuf v1.3.1 // indirect + github.com/golang/snappy v0.0.3 // indirect + github.com/google/flatbuffers v1.12.0 // indirect + github.com/klauspost/compress v1.12.3 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.opencensus.io v0.22.5 // indirect + golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect + golang.org/x/sys 
v0.0.0-20210124154548-22da62e12c0c // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) + +// FIXME: gorocksdb bindings for OptimisticTransactionDB are not merged upstream, so we use a fork +// See https://github.com/tecbot/gorocksdb/pull/216 +replace github.com/tecbot/gorocksdb => github.com/roysc/gorocksdb v1.1.0 diff --git a/db/go.sum b/db/go.sum index 5033ed4839..2f70e94719 100644 --- a/db/go.sum +++ b/db/go.sum @@ -23,6 +23,12 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0= +github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64= +github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A= +github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg= +github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpmbhCOZJ293Lz68O7PYrF2EzeiFMwCLk= +github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -61,6 +67,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/roysc/gorocksdb v1.1.0 h1:+bPfxli0I3UhzJpghp/L8Y9oGR6SoDj4dFV5lPPHUvs= +github.com/roysc/gorocksdb v1.1.0/go.mod h1:b/U29r/CtguX3TF7mKG1Jjn4APDqh4wECshxXdiWHpA= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= diff --git a/db/internal/util.go b/db/internal/util.go new file mode 100644 index 0000000000..310442c021 --- /dev/null +++ b/db/internal/util.go @@ -0,0 +1,27 @@ +package util + +import ( + "fmt" + dbm "github.com/cosmos/cosmos-sdk/db" +) + +func ValidateKv(key, value []byte) error { + if len(key) == 0 { + return dbm.ErrKeyEmpty + } + if value == nil { + return dbm.ErrValueNil + } + return nil +} + +func CombineErrors(ret error, also error, desc string) error { + if also != nil { + if ret != nil { + ret = fmt.Errorf("%w; %v: %v", ret, desc, also) + } else { + ret = also + } + } + return ret +} diff --git a/db/memdb/db.go b/db/memdb/db.go index 9dd8af7c68..e80688eff7 100644 --- a/db/memdb/db.go +++ b/db/memdb/db.go @@ -7,6 +7,7 @@ import ( "sync/atomic" dbm "github.com/cosmos/cosmos-sdk/db" + dbutil "github.com/cosmos/cosmos-sdk/db/internal" "github.com/google/btree" ) @@ -160,6 +161,9 @@ func (db *MemDB) DeleteVersion(target uint64) error { // Get implements DBReader. func (tx *dbTxn) Get(key []byte) ([]byte, error) { + if tx.btree == nil { + return nil, dbm.ErrTransactionClosed + } if len(key) == 0 { return nil, dbm.ErrKeyEmpty } @@ -172,6 +176,9 @@ func (tx *dbTxn) Get(key []byte) ([]byte, error) { // Has implements DBReader. 
func (tx *dbTxn) Has(key []byte) (bool, error) { + if tx.btree == nil { + return false, dbm.ErrTransactionClosed + } if len(key) == 0 { return false, dbm.ErrKeyEmpty } @@ -180,11 +187,11 @@ func (tx *dbTxn) Has(key []byte) (bool, error) { // Set implements DBWriter. func (tx *dbWriter) Set(key []byte, value []byte) error { - if len(key) == 0 { - return dbm.ErrKeyEmpty + if tx.btree == nil { + return dbm.ErrTransactionClosed } - if value == nil { - return dbm.ErrValueNil + if err := dbutil.ValidateKv(key, value); err != nil { + return err } tx.btree.ReplaceOrInsert(newPair(key, value)) return nil @@ -192,6 +199,9 @@ func (tx *dbWriter) Set(key []byte, value []byte) error { // Delete implements DBWriter. func (tx *dbWriter) Delete(key []byte) error { + if tx.btree == nil { + return dbm.ErrTransactionClosed + } if len(key) == 0 { return dbm.ErrKeyEmpty } @@ -202,6 +212,9 @@ func (tx *dbWriter) Delete(key []byte) error { // Iterator implements DBReader. // Takes out a read-lock on the database until the iterator is closed. func (tx *dbTxn) Iterator(start, end []byte) (dbm.Iterator, error) { + if tx.btree == nil { + return nil, dbm.ErrTransactionClosed + } if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, dbm.ErrKeyEmpty } @@ -211,6 +224,9 @@ func (tx *dbTxn) Iterator(start, end []byte) (dbm.Iterator, error) { // ReverseIterator implements DBReader. // Takes out a read-lock on the database until the iterator is closed. func (tx *dbTxn) ReverseIterator(start, end []byte) (dbm.Iterator, error) { + if tx.btree == nil { + return nil, dbm.ErrTransactionClosed + } if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { return nil, dbm.ErrKeyEmpty } @@ -219,17 +235,29 @@ func (tx *dbTxn) ReverseIterator(start, end []byte) (dbm.Iterator, error) { // Commit implements DBWriter. 
func (tx *dbWriter) Commit() error { + if tx.btree == nil { + return dbm.ErrTransactionClosed + } tx.db.mtx.Lock() defer tx.db.mtx.Unlock() - defer tx.Discard() tx.db.btree = tx.btree + return tx.Discard() +} + +// Discard implements DBReader. +func (tx *dbTxn) Discard() error { + if tx.btree != nil { + tx.btree = nil + } return nil } -// Discard implements DBReader and DBWriter. -func (tx *dbTxn) Discard() {} -func (tx *dbWriter) Discard() { - atomic.AddInt32(&tx.db.openWriters, -1) +// Discard implements DBWriter. +func (tx *dbWriter) Discard() error { + if tx.btree != nil { + defer atomic.AddInt32(&tx.db.openWriters, -1) + } + return tx.dbTxn.Discard() } // Print prints the database contents. diff --git a/db/rocksdb/batch.go b/db/rocksdb/batch.go new file mode 100644 index 0000000000..e78d71eaa0 --- /dev/null +++ b/db/rocksdb/batch.go @@ -0,0 +1,67 @@ +package rocksdb + +import ( + "sync/atomic" + + dbm "github.com/cosmos/cosmos-sdk/db" + dbutil "github.com/cosmos/cosmos-sdk/db/internal" + "github.com/tecbot/gorocksdb" +) + +type rocksDBBatch struct { + batch *gorocksdb.WriteBatch + mgr *dbManager +} + +var _ dbm.DBWriter = (*rocksDBBatch)(nil) + +func (mgr *dbManager) newRocksDBBatch() *rocksDBBatch { + return &rocksDBBatch{ + batch: gorocksdb.NewWriteBatch(), + mgr: mgr, + } +} + +// Set implements DBWriter. +func (b *rocksDBBatch) Set(key, value []byte) error { + if err := dbutil.ValidateKv(key, value); err != nil { + return err + } + if b.batch == nil { + return dbm.ErrTransactionClosed + } + b.batch.Put(key, value) + return nil +} + +// Delete implements DBWriter. +func (b *rocksDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return dbm.ErrKeyEmpty + } + if b.batch == nil { + return dbm.ErrTransactionClosed + } + b.batch.Delete(key) + return nil +} + +// Write implements DBWriter. 
+func (b *rocksDBBatch) Commit() (err error) { + if b.batch == nil { + return dbm.ErrTransactionClosed + } + defer func() { err = dbutil.CombineErrors(err, b.Discard(), "Discard also failed") }() + err = b.mgr.current.Write(b.mgr.opts.wo, b.batch) + return +} + +// Close implements DBWriter. +func (b *rocksDBBatch) Discard() error { + if b.batch != nil { + defer atomic.AddInt32(&b.mgr.openWriters, -1) + b.batch.Destroy() + b.batch = nil + } + return nil +} diff --git a/db/rocksdb/db.go b/db/rocksdb/db.go new file mode 100644 index 0000000000..f906ea5316 --- /dev/null +++ b/db/rocksdb/db.go @@ -0,0 +1,483 @@ +package rocksdb + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "runtime" + "sync" + "sync/atomic" + + dbm "github.com/cosmos/cosmos-sdk/db" + dbutil "github.com/cosmos/cosmos-sdk/db/internal" + "github.com/tecbot/gorocksdb" +) + +var ( + currentDBFileName string = "current.db" + checkpointFileFormat string = "%020d.db" +) + +var ( + _ dbm.DBConnection = (*RocksDB)(nil) + _ dbm.DBReader = (*dbTxn)(nil) + _ dbm.DBWriter = (*dbWriter)(nil) + _ dbm.DBReadWriter = (*dbWriter)(nil) +) + +// RocksDB is a connection to a RocksDB key-value database. +type RocksDB = dbManager + +type dbManager struct { + current *dbConnection + dir string + opts dbOptions + vmgr *dbm.VersionManager + mtx sync.RWMutex + // Track open DBWriters + openWriters int32 + cpCache checkpointCache +} + +type dbConnection = gorocksdb.OptimisticTransactionDB + +type checkpointCache struct { + cache map[uint64]*cpCacheEntry + mtx sync.RWMutex +} + +type cpCacheEntry struct { + cxn *dbConnection + openCount uint +} + +type dbTxn struct { + txn *gorocksdb.Transaction + mgr *dbManager + version uint64 +} +type dbWriter struct{ dbTxn } + +type dbOptions struct { + dbo *gorocksdb.Options + txo *gorocksdb.OptimisticTransactionOptions + ro *gorocksdb.ReadOptions + wo *gorocksdb.WriteOptions +} + +// NewDB creates a new RocksDB key-value database inside the given directory. 
+// If dir does not exist, it will be created. +func NewDB(dir string) (*dbManager, error) { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, err + } + + // default rocksdb option, good enough for most cases, including heavy workloads. + // 1GB table cache, 512MB write buffer(may use 50% more on heavy workloads). + // compression: snappy as default, need to -lsnappy to enable. + bbto := gorocksdb.NewDefaultBlockBasedTableOptions() + bbto.SetBlockCache(gorocksdb.NewLRUCache(1 << 30)) + bbto.SetFilterPolicy(gorocksdb.NewBloomFilter(10)) + dbo := gorocksdb.NewDefaultOptions() + dbo.SetBlockBasedTableFactory(bbto) + dbo.SetCreateIfMissing(true) + dbo.IncreaseParallelism(runtime.NumCPU()) + // 1.5GB maximum memory use for writebuffer. + dbo.OptimizeLevelStyleCompaction(1<<30 + 512<<20) + + opts := dbOptions{ + dbo: dbo, + txo: gorocksdb.NewDefaultOptimisticTransactionOptions(), + ro: gorocksdb.NewDefaultReadOptions(), + wo: gorocksdb.NewDefaultWriteOptions(), + } + mgr := &dbManager{ + dir: dir, + opts: opts, + cpCache: checkpointCache{cache: map[uint64]*cpCacheEntry{}}, + } + + err := os.MkdirAll(mgr.checkpointsDir(), 0755) + if err != nil { + return nil, err + } + if mgr.vmgr, err = readVersions(mgr.checkpointsDir()); err != nil { + return nil, err + } + dbPath := filepath.Join(dir, currentDBFileName) + // if the current db file is missing but there are checkpoints, restore it + if mgr.vmgr.Count() > 0 { + if _, err = os.Stat(dbPath); os.IsNotExist(err) { + err = mgr.restoreFromCheckpoint(mgr.vmgr.Last(), dbPath) + if err != nil { + return nil, err + } + } + } + mgr.current, err = gorocksdb.OpenOptimisticTransactionDb(dbo, dbPath) + if err != nil { + return nil, err + } + return mgr, nil +} + +func (mgr *dbManager) checkpointsDir() string { + return filepath.Join(mgr.dir, "checkpoints") +} + +// Reads directory for checkpoints files +func readVersions(dir string) (*dbm.VersionManager, error) { + files, err := os.ReadDir(dir) + if err != nil { + return 
nil, err + } + var versions []uint64 + for _, f := range files { + var version uint64 + if _, err := fmt.Sscanf(f.Name(), checkpointFileFormat, &version); err != nil { + return nil, err + } + versions = append(versions, version) + } + return dbm.NewVersionManager(versions), nil +} + +func (mgr *dbManager) checkpointPath(version uint64) (string, error) { + dbPath := filepath.Join(mgr.checkpointsDir(), fmt.Sprintf(checkpointFileFormat, version)) + if stat, err := os.Stat(dbPath); err != nil { + if errors.Is(err, os.ErrNotExist) { + err = dbm.ErrVersionDoesNotExist + } + return "", err + } else if !stat.IsDir() { + return "", dbm.ErrVersionDoesNotExist + } + return dbPath, nil +} + +func (mgr *dbManager) openCheckpoint(version uint64) (*dbConnection, error) { + mgr.cpCache.mtx.Lock() + defer mgr.cpCache.mtx.Unlock() + cp, has := mgr.cpCache.cache[version] + if has { + cp.openCount += 1 + return cp.cxn, nil + } + dbPath, err := mgr.checkpointPath(version) + if err != nil { + return nil, err + } + db, err := gorocksdb.OpenOptimisticTransactionDb(mgr.opts.dbo, dbPath) + if err != nil { + return nil, err + } + mgr.cpCache.cache[version] = &cpCacheEntry{cxn: db, openCount: 1} + return db, nil +} + +func (mgr *dbManager) Reader() dbm.DBReader { + mgr.mtx.RLock() + defer mgr.mtx.RUnlock() + return &dbTxn{ + // Note: oldTransaction could be passed here as a small optimization to + // avoid allocating a new object. 
+ txn: mgr.current.TransactionBegin(mgr.opts.wo, mgr.opts.txo, nil), + mgr: mgr, + } +} + +func (mgr *dbManager) ReaderAt(version uint64) (dbm.DBReader, error) { + mgr.mtx.RLock() + defer mgr.mtx.RUnlock() + db, err := mgr.openCheckpoint(version) + if err != nil { + return nil, err + } + + return &dbTxn{ + txn: db.TransactionBegin(mgr.opts.wo, mgr.opts.txo, nil), + mgr: mgr, + version: version, + }, nil +} + +func (mgr *dbManager) ReadWriter() dbm.DBReadWriter { + mgr.mtx.RLock() + defer mgr.mtx.RUnlock() + atomic.AddInt32(&mgr.openWriters, 1) + return &dbWriter{dbTxn{ + txn: mgr.current.TransactionBegin(mgr.opts.wo, mgr.opts.txo, nil), + mgr: mgr, + }} +} + +func (mgr *dbManager) Writer() dbm.DBWriter { + mgr.mtx.RLock() + defer mgr.mtx.RUnlock() + atomic.AddInt32(&mgr.openWriters, 1) + return mgr.newRocksDBBatch() +} + +func (mgr *dbManager) Versions() (dbm.VersionSet, error) { + mgr.mtx.RLock() + defer mgr.mtx.RUnlock() + return mgr.vmgr, nil +} + +// SaveNextVersion implements DBConnection. +func (mgr *dbManager) SaveNextVersion() (uint64, error) { + return mgr.save(0) +} + +// SaveVersion implements DBConnection. 
+func (mgr *dbManager) SaveVersion(target uint64) error { + if target == 0 { + return dbm.ErrInvalidVersion + } + _, err := mgr.save(target) + return err +} + +func (mgr *dbManager) save(target uint64) (uint64, error) { + mgr.mtx.Lock() + defer mgr.mtx.Unlock() + if mgr.openWriters > 0 { + return 0, dbm.ErrOpenTransactions + } + newVmgr := mgr.vmgr.Copy() + target, err := newVmgr.Save(target) + if err != nil { + return 0, err + } + cp, err := mgr.current.NewCheckpoint() + if err != nil { + return 0, err + } + dir := filepath.Join(mgr.checkpointsDir(), fmt.Sprintf(checkpointFileFormat, target)) + if err := cp.CreateCheckpoint(dir, 0); err != nil { + return 0, err + } + cp.Destroy() + mgr.vmgr = newVmgr + return target, nil +} + +func (mgr *dbManager) DeleteVersion(ver uint64) error { + if mgr.cpCache.has(ver) { + return dbm.ErrOpenTransactions + } + mgr.mtx.Lock() + defer mgr.mtx.Unlock() + dbPath, err := mgr.checkpointPath(ver) + if err != nil { + return err + } + mgr.vmgr = mgr.vmgr.Copy() + mgr.vmgr.Delete(ver) + return os.RemoveAll(dbPath) +} + +func (mgr *dbManager) Revert() (err error) { + mgr.mtx.RLock() + defer mgr.mtx.RUnlock() + if mgr.openWriters > 0 { + return dbm.ErrOpenTransactions + } + last := mgr.vmgr.Last() + if last == 0 { + return dbm.ErrInvalidVersion + } + // Close current connection and replace it with a checkpoint (created from the last checkpoint) + mgr.current.Close() + dbPath := filepath.Join(mgr.dir, currentDBFileName) + err = os.RemoveAll(dbPath) + if err != nil { + return + } + err = mgr.restoreFromCheckpoint(last, dbPath) + if err != nil { + return + } + mgr.current, err = gorocksdb.OpenOptimisticTransactionDb(mgr.opts.dbo, dbPath) + return +} + +func (mgr *dbManager) restoreFromCheckpoint(version uint64, path string) error { + cxn, err := mgr.openCheckpoint(version) + if err != nil { + return err + } + defer mgr.cpCache.decrement(version) + cp, err := cxn.NewCheckpoint() + if err != nil { + return err + } + err = 
cp.CreateCheckpoint(path, 0) + if err != nil { + return err + } + cp.Destroy() + return nil +} + +// Close implements DBConnection. +func (mgr *dbManager) Close() error { + mgr.current.Close() + mgr.opts.destroy() + return nil +} + +// Stats implements DBConnection. +func (mgr *dbManager) Stats() map[string]string { + keys := []string{"rocksdb.stats"} + stats := make(map[string]string, len(keys)) + for _, key := range keys { + stats[key] = mgr.current.GetProperty(key) + } + return stats +} + +// Get implements DBReader. +func (tx *dbTxn) Get(key []byte) ([]byte, error) { + if tx.txn == nil { + return nil, dbm.ErrTransactionClosed + } + if len(key) == 0 { + return nil, dbm.ErrKeyEmpty + } + res, err := tx.txn.Get(tx.mgr.opts.ro, key) + if err != nil { + return nil, err + } + return moveSliceToBytes(res), nil +} + +// Get implements DBReader. +func (tx *dbWriter) Get(key []byte) ([]byte, error) { + if tx.txn == nil { + return nil, dbm.ErrTransactionClosed + } + if len(key) == 0 { + return nil, dbm.ErrKeyEmpty + } + res, err := tx.txn.GetForUpdate(tx.mgr.opts.ro, key) + if err != nil { + return nil, err + } + return moveSliceToBytes(res), nil +} + +// Has implements DBReader. +func (tx *dbTxn) Has(key []byte) (bool, error) { + bytes, err := tx.Get(key) + if err != nil { + return false, err + } + return bytes != nil, nil +} + +// Set implements DBWriter. +func (tx *dbWriter) Set(key []byte, value []byte) error { + if tx.txn == nil { + return dbm.ErrTransactionClosed + } + if err := dbutil.ValidateKv(key, value); err != nil { + return err + } + return tx.txn.Put(key, value) +} + +// Delete implements DBWriter. 
+func (tx *dbWriter) Delete(key []byte) error { + if tx.txn == nil { + return dbm.ErrTransactionClosed + } + if len(key) == 0 { + return dbm.ErrKeyEmpty + } + return tx.txn.Delete(key) +} + +func (tx *dbWriter) Commit() (err error) { + if tx.txn == nil { + return dbm.ErrTransactionClosed + } + defer func() { err = dbutil.CombineErrors(err, tx.Discard(), "Discard also failed") }() + err = tx.txn.Commit() + return +} + +func (tx *dbTxn) Discard() error { + if tx.txn == nil { + return nil // Discard() is idempotent + } + defer func() { tx.txn.Destroy(); tx.txn = nil }() + if tx.version == 0 { + return nil + } + if !tx.mgr.cpCache.decrement(tx.version) { + return fmt.Errorf("transaction has no corresponding checkpoint cache entry: %v", tx.version) + } + return nil +} + +func (tx *dbWriter) Discard() error { + if tx.txn != nil { + defer atomic.AddInt32(&tx.mgr.openWriters, -1) + } + return tx.dbTxn.Discard() +} + +// Iterator implements DBReader. +func (tx *dbTxn) Iterator(start, end []byte) (dbm.Iterator, error) { + if tx.txn == nil { + return nil, dbm.ErrTransactionClosed + } + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, dbm.ErrKeyEmpty + } + itr := tx.txn.NewIterator(tx.mgr.opts.ro) + return newRocksDBIterator(itr, start, end, false), nil +} + +// ReverseIterator implements DBReader. 
+func (tx *dbTxn) ReverseIterator(start, end []byte) (dbm.Iterator, error) { + if tx.txn == nil { + return nil, dbm.ErrTransactionClosed + } + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, dbm.ErrKeyEmpty + } + itr := tx.txn.NewIterator(tx.mgr.opts.ro) + return newRocksDBIterator(itr, start, end, true), nil +} + +func (o dbOptions) destroy() { + o.ro.Destroy() + o.wo.Destroy() + o.txo.Destroy() + o.dbo.Destroy() +} + +func (cpc *checkpointCache) has(ver uint64) bool { + cpc.mtx.RLock() + defer cpc.mtx.RUnlock() + _, has := cpc.cache[ver] + return has +} + +func (cpc *checkpointCache) decrement(ver uint64) bool { + cpc.mtx.Lock() + defer cpc.mtx.Unlock() + cp, has := cpc.cache[ver] + if !has { + return false + } + cp.openCount -= 1 + if cp.openCount == 0 { + cp.cxn.Close() + delete(cpc.cache, ver) + } + return true +} diff --git a/db/rocksdb/db_test.go b/db/rocksdb/db_test.go new file mode 100644 index 0000000000..313bfde5b4 --- /dev/null +++ b/db/rocksdb/db_test.go @@ -0,0 +1,63 @@ +package rocksdb + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" + + dbm "github.com/cosmos/cosmos-sdk/db" + "github.com/cosmos/cosmos-sdk/db/dbtest" +) + +func load(t *testing.T, dir string) dbm.DBConnection { + db, err := NewDB(dir) + require.NoError(t, err) + return db +} + +func TestGetSetHasDelete(t *testing.T) { + dbtest.DoTestGetSetHasDelete(t, load) +} + +func TestIterators(t *testing.T) { + dbtest.DoTestIterators(t, load) +} + +func TestTransactions(t *testing.T) { + dbtest.DoTestTransactions(t, load, true) +} + +func TestVersioning(t *testing.T) { + dbtest.DoTestVersioning(t, load) +} + +func TestRevert(t *testing.T) { + dbtest.DoTestRevert(t, load, false) + dbtest.DoTestRevert(t, load, true) +} + +func TestReloadDB(t *testing.T) { + dbtest.DoTestReloadDB(t, load) +} + +// Test that the DB can be reloaded after a failed Revert +func TestRevertRecovery(t *testing.T) { + dir := t.TempDir() + db, err := NewDB(dir) + 
require.NoError(t, err) + _, err = db.SaveNextVersion() + require.NoError(t, err) + txn := db.Writer() + require.NoError(t, txn.Set([]byte{1}, []byte{1})) + require.NoError(t, txn.Set([]byte{2}, []byte{2})) + require.NoError(t, txn.Commit()) + + // make checkpoints dir temporarily unreadable to trigger an error + require.NoError(t, os.Chmod(db.checkpointsDir(), 0000)) + require.Error(t, db.Revert()) + + require.NoError(t, os.Chmod(db.checkpointsDir(), 0755)) + db, err = NewDB(dir) + require.NoError(t, err) +} diff --git a/db/rocksdb/iterator.go b/db/rocksdb/iterator.go new file mode 100644 index 0000000000..2b7f7b74e4 --- /dev/null +++ b/db/rocksdb/iterator.go @@ -0,0 +1,147 @@ +package rocksdb + +import ( + "bytes" + + dbm "github.com/cosmos/cosmos-sdk/db" + "github.com/tecbot/gorocksdb" +) + +type rocksDBIterator struct { + source *gorocksdb.Iterator + start, end []byte + isReverse bool + isInvalid bool + // Whether iterator has been advanced to the first element (is fully initialized) + primed bool +} + +var _ dbm.Iterator = (*rocksDBIterator)(nil) + +func newRocksDBIterator(source *gorocksdb.Iterator, start, end []byte, isReverse bool) *rocksDBIterator { + if isReverse { + if end == nil { + source.SeekToLast() + } else { + source.Seek(end) + if source.Valid() { + eoakey := moveSliceToBytes(source.Key()) // end or after key + if bytes.Compare(end, eoakey) <= 0 { + source.Prev() + } + } else { + source.SeekToLast() + } + } + } else { + if start == nil { + source.SeekToFirst() + } else { + source.Seek(start) + } + } + return &rocksDBIterator{ + source: source, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + primed: false, + } +} + +// Domain implements Iterator. +func (itr *rocksDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. 
+func (itr *rocksDBIterator) Valid() bool { + if !itr.primed { + return false + } + + if itr.isInvalid { + return false + } + + if !itr.source.Valid() { + itr.isInvalid = true + return false + } + + var ( + start = itr.start + end = itr.end + key = moveSliceToBytes(itr.source.Key()) + ) + // If key is end or past it, invalid. + if itr.isReverse { + if start != nil && bytes.Compare(key, start) < 0 { + itr.isInvalid = true + return false + } + } else { + if end != nil && bytes.Compare(key, end) >= 0 { + itr.isInvalid = true + return false + } + } + return true +} + +// Key implements Iterator. +func (itr *rocksDBIterator) Key() []byte { + itr.assertIsValid() + return moveSliceToBytes(itr.source.Key()) +} + +// Value implements Iterator. +func (itr *rocksDBIterator) Value() []byte { + itr.assertIsValid() + return moveSliceToBytes(itr.source.Value()) +} + +// Next implements Iterator. +func (itr *rocksDBIterator) Next() bool { + if !itr.primed { + itr.primed = true + } else { + if itr.isReverse { + itr.source.Prev() + } else { + itr.source.Next() + } + } + return itr.Valid() +} + +// Error implements Iterator. +func (itr *rocksDBIterator) Error() error { + return itr.source.Err() +} + +// Close implements Iterator. +func (itr *rocksDBIterator) Close() error { + itr.source.Close() + return nil +} + +func (itr *rocksDBIterator) assertIsValid() { + if !itr.Valid() { + panic("iterator is invalid") + } +} + +// moveSliceToBytes will free the slice and copy out a go []byte +// This function can be applied on *Slice returned from Key() and Value() +// of an Iterator, because they are marked as freed. 
+func moveSliceToBytes(s *gorocksdb.Slice) []byte { + defer s.Free() + if !s.Exists() { + return nil + } + v := make([]byte, s.Size()) + copy(v, s.Data()) + return v +} diff --git a/db/types.go b/db/types.go index 69a494a1e3..d360f57f4a 100644 --- a/db/types.go +++ b/db/types.go @@ -3,8 +3,8 @@ package db import "errors" var ( - // ErrBatchClosed is returned when a closed or written batch is used. - ErrBatchClosed = errors.New("batch has been written or closed") + // ErrTransactionClosed is returned when a closed or written transaction is used. + ErrTransactionClosed = errors.New("transaction has been written or closed") // ErrKeyEmpty is returned when attempting to use an empty or nil key. ErrKeyEmpty = errors.New("key cannot be empty") @@ -61,6 +61,11 @@ type DBConnection interface { // Deletes a saved version. Returns ErrVersionDoesNotExist for invalid versions. DeleteVersion(uint64) error + // Reverts the DB state to the last saved version. + // Returns an error if no saved versions exist. + // Returns an error if any open DBWriter transactions exist. + Revert() error + // Close closes the database connection. Close() error } @@ -97,7 +102,7 @@ type DBReader interface { ReverseIterator(start, end []byte) (Iterator, error) // Discards the transaction, invalidating any future operations on it. - Discard() + Discard() error } // DBWriter is a write-only transaction interface. @@ -117,7 +122,7 @@ type DBWriter interface { Commit() error // Discards the transaction, invalidating any future operations on it. - Discard() + Discard() error } // DBReadWriter is a transaction interface that allows both reading and writing. 
diff --git a/go.mod b/go.mod index bf4158d871..1824c6014e 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/confio/ics23/go v0.6.6 github.com/cosmos/btcutil v1.0.4 github.com/cosmos/cosmos-proto v0.0.0-20210914142853-23ed61ac79ce + github.com/cosmos/cosmos-sdk/db v0.0.0-20210831080937-2c31451a55b5 github.com/cosmos/go-bip39 v1.0.0 github.com/cosmos/iavl v0.17.1 github.com/cosmos/ledger-cosmos-go v0.11.1 diff --git a/go.sum b/go.sum index e30cd20db6..c4afe2770c 100644 --- a/go.sum +++ b/go.sum @@ -168,6 +168,8 @@ github.com/cosmos/btcutil v1.0.4 h1:n7C2ngKXo7UC9gNyMNLbzqz7Asuf+7Qv4gnX/rOdQ44= github.com/cosmos/btcutil v1.0.4/go.mod h1:Ffqc8Hn6TJUdDgHBwIZLtrLQC1KdJ9jGJl/TvgUaxbU= github.com/cosmos/cosmos-proto v0.0.0-20210914142853-23ed61ac79ce h1:nin7WtIMETZ8LezEYa5e9/iqyEgQka1x0cQYqgUeTGM= github.com/cosmos/cosmos-proto v0.0.0-20210914142853-23ed61ac79ce/go.mod h1:g2Q3cd94kOBVRAv7ahdtO27yUc4cpNuHGnI40qanl1k= +github.com/cosmos/cosmos-sdk/db v0.0.0-20210831080937-2c31451a55b5 h1:GdtDczrd06rDuZ02iprBUfo0JkeauWHBQSDqoDYShf4= +github.com/cosmos/cosmos-sdk/db v0.0.0-20210831080937-2c31451a55b5/go.mod h1:eAiR2sIGn3oIrcDiEUIqmH8UvPdIvN67Ui0XeKuTDnI= github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d/go.mod h1:tSxLoYXyBmiFeKpvmq4dzayMdCjCnu8uqmCysIGBT2Y= github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY= github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw=