From 14fd0ceda32021f2e2cb1f759c8846cf8eb47097 Mon Sep 17 00:00:00 2001 From: cool-developer <51834436+cool-develope@users.noreply.github.com> Date: Mon, 29 Jan 2024 15:21:20 -0500 Subject: [PATCH] feat(store/v2): remove cosmos-db depdency from store (#19229) --- store/batch.go | 31 ++ store/commitment/iavl/tree.go | 7 +- store/commitment/iavl/tree_test.go | 5 +- store/commitment/store.go | 44 +-- store/commitment/store_test_suite.go | 4 +- store/database.go | 57 +++- store/db/db_test.go | 107 ++++++ store/db/goleveldb.go | 427 +++++++++++++++++++++++ store/db/memdb.go | 465 ++++++++++++++++++++++++++ store/db/prefixdb.go | 348 +++++++++++++++++++ store/db/wrapper.go | 39 +++ store/errors.go | 9 +- store/go.mod | 12 +- store/go.sum | 8 +- store/{ => proof}/commit_info.go | 2 +- store/{ => proof}/commit_info_test.go | 2 +- store/{ => proof}/proof.go | 6 +- store/{ => proof}/proof_test.go | 2 +- store/pruning/manager_test.go | 2 +- store/root/store.go | 15 +- store/root/store_test.go | 2 +- store/snapshots/helpers_test.go | 4 +- store/snapshots/manager_test.go | 4 +- store/snapshots/store.go | 32 +- store/snapshots/store_test.go | 8 +- store/storage/rocksdb/batch.go | 4 + store/store.go | 5 +- 27 files changed, 1572 insertions(+), 79 deletions(-) create mode 100644 store/db/db_test.go create mode 100644 store/db/goleveldb.go create mode 100644 store/db/memdb.go create mode 100644 store/db/prefixdb.go create mode 100644 store/db/wrapper.go rename store/{ => proof}/commit_info.go (99%) rename store/{ => proof}/commit_info_test.go (99%) rename store/{ => proof}/proof.go (97%) rename store/{ => proof}/proof_test.go (99%) diff --git a/store/batch.go b/store/batch.go index aa09237cbc..752a7147f5 100644 --- a/store/batch.go +++ b/store/batch.go @@ -15,3 +15,34 @@ type Batch interface { // Reset resets the batch. Reset() } + +// RawBatch represents a group of writes. They may or may not be written atomically depending on the +// backend. Callers must call Close on the batch when done. +// +// As with RawDB, given keys and values should be considered read-only, and must not be modified after +// passing them to the batch. +type RawBatch interface { + // Set sets a key/value pair. + // CONTRACT: key, value readonly []byte + Set(key, value []byte) error + + // Delete deletes a key/value pair. + // CONTRACT: key readonly []byte + Delete(key []byte) error + + // Write writes the batch, possibly without flushing to disk. Only Close() can be called after, + // other methods will error. + Write() error + + // WriteSync writes the batch and flushes it to disk. Only Close() can be called after, other + // methods will error. + WriteSync() error + + // Close closes the batch. It is idempotent, but calls to other methods afterwards will error. + Close() error + + // GetByteSize that returns the current size of the batch in bytes. Depending on the implementation, + // this may return the size of the underlying LSM batch, including the size of additional metadata + // on top of the expected key and value total byte count. + GetByteSize() (int, error) +} diff --git a/store/commitment/iavl/tree.go b/store/commitment/iavl/tree.go index d3c126ad0e..f4dd62bfc0 100644 --- a/store/commitment/iavl/tree.go +++ b/store/commitment/iavl/tree.go @@ -3,12 +3,13 @@ package iavl import ( "fmt" - dbm "github.com/cosmos/cosmos-db" "github.com/cosmos/iavl" ics23 "github.com/cosmos/ics23/go" log "cosmossdk.io/log" + "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" + dbm "cosmossdk.io/store/v2/db" ) var _ commitment.Tree = (*IavlTree)(nil) @@ -19,8 +20,8 @@ type IavlTree struct { } // NewIavlTree creates a new IavlTree instance. -func NewIavlTree(db dbm.DB, logger log.Logger, cfg *Config) *IavlTree { - tree := iavl.NewMutableTree(db, cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger) +func NewIavlTree(db store.RawDB, logger log.Logger, cfg *Config) *IavlTree { + tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger) return &IavlTree{ tree: tree, } diff --git a/store/commitment/iavl/tree_test.go b/store/commitment/iavl/tree_test.go index 6963c8aabe..b3337a71e1 100644 --- a/store/commitment/iavl/tree_test.go +++ b/store/commitment/iavl/tree_test.go @@ -3,17 +3,18 @@ package iavl import ( "testing" - dbm "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "cosmossdk.io/log" + "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" + dbm "cosmossdk.io/store/v2/db" ) func TestCommitterSuite(t *testing.T) { s := &commitment.CommitStoreTestSuite{ - NewStore: func(db dbm.DB, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) { + NewStore: func(db store.RawDB, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) { multiTrees := make(map[string]commitment.Tree) cfg := DefaultConfig() for _, storeKey := range storeKeys { diff --git a/store/commitment/store.go b/store/commitment/store.go index bfe319e0d0..a8b160101f 100644 --- a/store/commitment/store.go +++ b/store/commitment/store.go @@ -7,12 +7,12 @@ import ( "io" "math" - dbm "github.com/cosmos/cosmos-db" protoio "github.com/cosmos/gogoproto/io" "cosmossdk.io/log" "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/internal/encoding" + "cosmossdk.io/store/v2/proof" "cosmossdk.io/store/v2/snapshots" snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) @@ -34,12 +34,12 @@ var ( // and trees. type CommitStore struct { logger log.Logger - db dbm.DB + db store.RawDB multiTrees map[string]Tree } // NewCommitStore creates a new CommitStore instance. -func NewCommitStore(multiTrees map[string]Tree, db dbm.DB, logger log.Logger) (*CommitStore, error) { +func NewCommitStore(multiTrees map[string]Tree, db store.RawDB, logger log.Logger) (*CommitStore, error) { return &CommitStore{ logger: logger, db: db, @@ -67,19 +67,19 @@ func (c *CommitStore) WriteBatch(cs *store.Changeset) error { return nil } -func (c *CommitStore) WorkingCommitInfo(version uint64) *store.CommitInfo { - storeInfos := make([]store.StoreInfo, 0, len(c.multiTrees)) +func (c *CommitStore) WorkingCommitInfo(version uint64) *proof.CommitInfo { + storeInfos := make([]proof.StoreInfo, 0, len(c.multiTrees)) for storeKey, tree := range c.multiTrees { - storeInfos = append(storeInfos, store.StoreInfo{ + storeInfos = append(storeInfos, proof.StoreInfo{ Name: storeKey, - CommitID: store.CommitID{ + CommitID: proof.CommitID{ Version: version, Hash: tree.WorkingHash(), }, }) } - return &store.CommitInfo{ + return &proof.CommitInfo{ Version: version, StoreInfos: storeInfos, } @@ -129,7 +129,7 @@ func (c *CommitStore) LoadVersion(targetVersion uint64) error { // If the target version is greater than the latest version, it is the snapshot // restore case, we should create a new commit info for the target version. - var cInfo *store.CommitInfo + var cInfo *proof.CommitInfo if targetVersion > latestVersion { cInfo = c.WorkingCommitInfo(targetVersion) } @@ -137,7 +137,7 @@ func (c *CommitStore) LoadVersion(targetVersion uint64) error { return c.flushCommitInfo(targetVersion, cInfo) } -func (c *CommitStore) GetCommitInfo(version uint64) (*store.CommitInfo, error) { +func (c *CommitStore) GetCommitInfo(version uint64) (*proof.CommitInfo, error) { key := []byte(fmt.Sprintf(commitInfoKeyFmt, version)) value, err := c.db.Get(key) if err != nil { @@ -147,7 +147,7 @@ func (c *CommitStore) GetCommitInfo(version uint64) (*store.CommitInfo, error) { return nil, nil } - cInfo := &store.CommitInfo{} + cInfo := &proof.CommitInfo{} if err := cInfo.Unmarshal(value); err != nil { return nil, err } @@ -155,7 +155,7 @@ func (c *CommitStore) GetCommitInfo(version uint64) (*store.CommitInfo, error) { return cInfo, nil } -func (c *CommitStore) flushCommitInfo(version uint64, cInfo *store.CommitInfo) error { +func (c *CommitStore) flushCommitInfo(version uint64, cInfo *proof.CommitInfo) error { batch := c.db.NewBatch() if cInfo != nil { cInfoKey := []byte(fmt.Sprintf(commitInfoKeyFmt, version)) @@ -180,14 +180,14 @@ func (c *CommitStore) flushCommitInfo(version uint64, cInfo *store.CommitInfo) e return batch.WriteSync() } -func (c *CommitStore) Commit(version uint64) (*store.CommitInfo, error) { - storeInfos := make([]store.StoreInfo, 0, len(c.multiTrees)) +func (c *CommitStore) Commit(version uint64) (*proof.CommitInfo, error) { + storeInfos := make([]proof.StoreInfo, 0, len(c.multiTrees)) for storeKey, tree := range c.multiTrees { // If a commit event execution is interrupted, a new iavl store's version // will be larger than the RMS's metadata, when the block is replayed, we // should avoid committing that iavl store again. - var commitID store.CommitID + var commitID proof.CommitID if tree.GetLatestVersion() >= version { commitID.Version = version commitID.Hash = tree.Hash() @@ -196,18 +196,18 @@ func (c *CommitStore) Commit(version uint64) (*store.CommitInfo, error) { if err != nil { return nil, err } - commitID = store.CommitID{ + commitID = proof.CommitID{ Version: version, Hash: hash, } } - storeInfos = append(storeInfos, store.StoreInfo{ + storeInfos = append(storeInfos, proof.StoreInfo{ Name: storeKey, CommitID: commitID, }) } - cInfo := &store.CommitInfo{ + cInfo := &proof.CommitInfo{ Version: version, StoreInfos: storeInfos, } @@ -229,13 +229,13 @@ func (c *CommitStore) SetInitialVersion(version uint64) error { return nil } -func (c *CommitStore) GetProof(storeKey string, version uint64, key []byte) ([]store.CommitmentOp, error) { +func (c *CommitStore) GetProof(storeKey string, version uint64, key []byte) ([]proof.CommitmentOp, error) { tree, ok := c.multiTrees[storeKey] if !ok { return nil, fmt.Errorf("store %s not found", storeKey) } - proof, err := tree.GetProof(version, key) + iProof, err := tree.GetProof(version, key) if err != nil { return nil, err } @@ -246,13 +246,13 @@ func (c *CommitStore) GetProof(storeKey string, version uint64, key []byte) ([]s if cInfo == nil { return nil, fmt.Errorf("commit info not found for version %d", version) } - commitOp := store.NewIAVLCommitmentOp(key, proof) + commitOp := proof.NewIAVLCommitmentOp(key, iProof) _, storeCommitmentOp, err := cInfo.GetStoreProof(storeKey) if err != nil { return nil, err } - return []store.CommitmentOp{commitOp, *storeCommitmentOp}, nil + return []proof.CommitmentOp{commitOp, *storeCommitmentOp}, nil } func (c *CommitStore) Get(storeKey string, version uint64, key []byte) ([]byte, error) { diff --git a/store/commitment/store_test_suite.go b/store/commitment/store_test_suite.go index f46f76abd7..fc277bba32 100644 --- a/store/commitment/store_test_suite.go +++ b/store/commitment/store_test_suite.go @@ -5,11 +5,11 @@ import ( "io" "sync" - dbm "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/suite" "cosmossdk.io/log" "cosmossdk.io/store/v2" + dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/snapshots" snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) @@ -23,7 +23,7 @@ const ( type CommitStoreTestSuite struct { suite.Suite - NewStore func(db dbm.DB, storeKeys []string, logger log.Logger) (*CommitStore, error) + NewStore func(db store.RawDB, storeKeys []string, logger log.Logger) (*CommitStore, error) } func (s *CommitStoreTestSuite) TestSnapshotter() { diff --git a/store/database.go b/store/database.go index e665a97278..987aaa5666 100644 --- a/store/database.go +++ b/store/database.go @@ -4,6 +4,7 @@ import ( "io" corestore "cosmossdk.io/core/store" + "cosmossdk.io/store/v2/proof" ) // Reader wraps the Has and Get method of a backing data store. @@ -71,7 +72,7 @@ type Committer interface { WriteBatch(cs *Changeset) error // WorkingCommitInfo returns the CommitInfo for the working tree. - WorkingCommitInfo(version uint64) *CommitInfo + WorkingCommitInfo(version uint64) *proof.CommitInfo // GetLatestVersion returns the latest version. GetLatestVersion() (uint64, error) @@ -80,10 +81,10 @@ type Committer interface { LoadVersion(targetVersion uint64) error // Commit commits the working tree to the database. - Commit(version uint64) (*CommitInfo, error) + Commit(version uint64) (*proof.CommitInfo, error) // GetProof returns the proof of existence or non-existence for the given key. - GetProof(storeKey string, version uint64, key []byte) ([]CommitmentOp, error) + GetProof(storeKey string, version uint64, key []byte) ([]proof.CommitmentOp, error) // Get returns the value for the given key at the given version. // @@ -95,7 +96,7 @@ type Committer interface { SetInitialVersion(version uint64) error // GetCommitInfo returns the CommitInfo for the given version. - GetCommitInfo(version uint64) (*CommitInfo, error) + GetCommitInfo(version uint64) (*proof.CommitInfo, error) // Prune attempts to prune all versions up to and including the provided // version argument. The operation should be idempotent. An error should be @@ -106,3 +107,51 @@ type Committer interface { // only be called once and any call after may panic. io.Closer } + +// RawDB is the main interface for all key-value database backends. DBs are concurrency-safe. +// Callers must call Close on the database when done. +// +// Keys cannot be nil or empty, while values cannot be nil. Keys and values should be considered +// read-only, both when returned and when given, and must be copied before they are modified. +type RawDB interface { + // Get fetches the value of the given key, or nil if it does not exist. + // CONTRACT: key, value readonly []byte + Get([]byte) ([]byte, error) + + // Has checks if a key exists. + // CONTRACT: key, value readonly []byte + Has(key []byte) (bool, error) + + // Iterator returns an iterator over a domain of keys, in ascending order. The caller must call + // Close when done. End is exclusive, and start must be less than end. A nil start iterates + // from the first key, and a nil end iterates to the last key (inclusive). Empty keys are not + // valid. + // CONTRACT: No writes may happen within a domain while an iterator exists over it. + // CONTRACT: start, end readonly []byte + Iterator(start, end []byte) (corestore.Iterator, error) + + // ReverseIterator returns an iterator over a domain of keys, in descending order. The caller + // must call Close when done. End is exclusive, and start must be less than end. A nil end + // iterates from the last key (inclusive), and a nil start iterates to the first key (inclusive). + // Empty keys are not valid. + // CONTRACT: No writes may happen within a domain while an iterator exists over it. + // CONTRACT: start, end readonly []byte + ReverseIterator(start, end []byte) (corestore.Iterator, error) + + // Close closes the database connection. + Close() error + + // NewBatch creates a batch for atomic updates. The caller must call Batch.Close. + NewBatch() RawBatch + + // NewBatchWithSize create a new batch for atomic updates, but with pre-allocated size. + // This will does the same thing as NewBatch if the batch implementation doesn't support pre-allocation. + NewBatchWithSize(int) RawBatch +} + +type ( + // Options defines the interface of a database options. + Options interface { + Get(string) interface{} + } +) diff --git a/store/db/db_test.go b/store/db/db_test.go new file mode 100644 index 0000000000..9d66786d8f --- /dev/null +++ b/store/db/db_test.go @@ -0,0 +1,107 @@ +package db + +import ( + "fmt" + "testing" + + "cosmossdk.io/store/v2" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type DBTestSuite struct { + suite.Suite + + db store.RawDB +} + +func (s *DBTestSuite) TestDBOperations() { + // Set + b := s.db.NewBatch() + s.Require().NoError(b.Set([]byte("key"), []byte("value"))) + s.Require().NoError(b.Set([]byte("key1"), []byte("value1"))) + s.Require().NoError(b.Set([]byte("key2"), []byte("value2"))) + s.Require().NoError(b.Write()) + + // Get + value, err := s.db.Get([]byte("key")) + s.Require().NoError(err) + s.Require().Equal([]byte("value"), value) + + // Has + has, err := s.db.Has([]byte("key1")) + s.Require().NoError(err) + s.Require().True(has) + has, err = s.db.Has([]byte("key3")) + s.Require().NoError(err) + s.Require().False(has) + + // Delete + b = s.db.NewBatch() + s.Require().NoError(b.Delete([]byte("key1"))) + s.Require().NoError(b.Write()) + + // Has + has, err = s.db.Has([]byte("key1")) + s.Require().NoError(err) + s.Require().False(has) +} + +func (s *DBTestSuite) TestIterator() { + // Set + b := s.db.NewBatch() + for i := 0; i < 10; i++ { + s.Require().NoError(b.Set([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))) + } + s.Require().NoError(b.Write()) + + // Iterator + itr, err := s.db.Iterator(nil, nil) + s.Require().NoError(err) + defer itr.Close() + + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + value1, err := s.db.Get(key) + s.Require().NoError(err) + s.Require().Equal(value1, value) + } + + // Reverse Iterator + ritr, err := s.db.ReverseIterator([]byte("key0"), []byte("keys")) + s.Require().NoError(err) + defer ritr.Close() + + index := 9 + for ; ritr.Valid(); ritr.Next() { + key := ritr.Key() + value := ritr.Value() + s.Require().Equal([]byte(fmt.Sprintf("key%d", index)), key) + value1, err := s.db.Get(key) + s.Require().NoError(err) + s.Require().Equal(value1, value) + index -= 1 + } + s.Require().Equal(-1, index) +} + +func TestMemDBSuite(t *testing.T) { + suite.Run(t, &DBTestSuite{ + db: NewMemDB(), + }) +} + +func TestGoLevelDBSuite(t *testing.T) { + db, err := NewGoLevelDB("test", t.TempDir(), nil) + require.NoError(t, err) + suite.Run(t, &DBTestSuite{ + db: db, + }) +} + +func TestPrefixDBSuite(t *testing.T) { + suite.Run(t, &DBTestSuite{ + db: NewPrefixDB(NewMemDB(), []byte("prefix")), + }) +} diff --git a/store/db/goleveldb.go b/store/db/goleveldb.go new file mode 100644 index 0000000000..b2bf0953a6 --- /dev/null +++ b/store/db/goleveldb.go @@ -0,0 +1,427 @@ +package db + +import ( + "bytes" + "fmt" + "path/filepath" + + "github.com/spf13/cast" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" + + corestore "cosmossdk.io/core/store" + "cosmossdk.io/store/v2" +) + +// GoLevelDB implements RawDB using github.com/syndtr/goleveldb/leveldb. +// It is used for only store v2 migration, since some clients use goleveldb as the iavl v0/v1 backend. +type GoLevelDB struct { + db *leveldb.DB +} + +var _ store.RawDB = (*GoLevelDB)(nil) + +func NewGoLevelDB(name string, dir string, opts store.Options) (*GoLevelDB, error) { + defaultOpts := &opt.Options{ + Filter: filter.NewBloomFilter(10), // by default, goleveldb doesn't use a bloom filter. + } + if opts != nil { + files := cast.ToInt(opts.Get("maxopenfiles")) + if files > 0 { + defaultOpts.OpenFilesCacheCapacity = files + } + } + + return NewGoLevelDBWithOpts(name, dir, defaultOpts) +} + +func NewGoLevelDBWithOpts(name string, dir string, o *opt.Options) (*GoLevelDB, error) { + dbPath := filepath.Join(dir, name+".db") + db, err := leveldb.OpenFile(dbPath, o) + if err != nil { + return nil, err + } + database := &GoLevelDB{ + db: db, + } + return database, nil +} + +// Get implements RawDB. +func (db *GoLevelDB) Get(key []byte) ([]byte, error) { + if len(key) == 0 { + return nil, store.ErrKeyEmpty + } + res, err := db.db.Get(key, nil) + if err != nil { + if err == errors.ErrNotFound { + return nil, nil + } + return nil, err + } + return res, nil +} + +// Has implements RawDB. +func (db *GoLevelDB) Has(key []byte) (bool, error) { + bytes, err := db.Get(key) + if err != nil { + return false, err + } + return bytes != nil, nil +} + +// Set implements RawDB. +func (db *GoLevelDB) Set(key []byte, value []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + if value == nil { + return store.ErrValueNil + } + if err := db.db.Put(key, value, nil); err != nil { + return err + } + return nil +} + +// SetSync implements RawDB. +func (db *GoLevelDB) SetSync(key []byte, value []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + if value == nil { + return store.ErrValueNil + } + if err := db.db.Put(key, value, &opt.WriteOptions{Sync: true}); err != nil { + return err + } + return nil +} + +// Delete implements RawDB. +func (db *GoLevelDB) Delete(key []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + if err := db.db.Delete(key, nil); err != nil { + return err + } + return nil +} + +// DeleteSync implements RawDB. +func (db *GoLevelDB) DeleteSync(key []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + err := db.db.Delete(key, &opt.WriteOptions{Sync: true}) + if err != nil { + return err + } + return nil +} + +func (db *GoLevelDB) RawDB() *leveldb.DB { + return db.db +} + +// Close implements RawDB. +func (db *GoLevelDB) Close() error { + if err := db.db.Close(); err != nil { + return err + } + return nil +} + +// Print implements RawDB. +func (db *GoLevelDB) Print() error { + str, err := db.db.GetProperty("leveldb.stats") + if err != nil { + return err + } + fmt.Printf("%v\n", str) + + itr := db.db.NewIterator(nil, nil) + for itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } + return nil +} + +// Stats implements RawDB. +func (db *GoLevelDB) Stats() map[string]string { + keys := []string{ + "leveldb.num-files-at-level{n}", + "leveldb.stats", + "leveldb.sstables", + "leveldb.blockpool", + "leveldb.cachedblock", + "leveldb.openedtables", + "leveldb.alivesnaps", + "leveldb.aliveiters", + } + + stats := make(map[string]string) + for _, key := range keys { + str, err := db.db.GetProperty(key) + if err == nil { + stats[key] = str + } + } + return stats +} + +func (db *GoLevelDB) ForceCompact(start, limit []byte) error { + return db.db.CompactRange(util.Range{Start: start, Limit: limit}) +} + +// NewBatch implements RawDB. +func (db *GoLevelDB) NewBatch() store.RawBatch { + return newGoLevelDBBatch(db) +} + +// NewBatchWithSize implements RawDB. +func (db *GoLevelDB) NewBatchWithSize(size int) store.RawBatch { + return newGoLevelDBBatchWithSize(db, size) +} + +// Iterator implements RawDB. +func (db *GoLevelDB) Iterator(start, end []byte) (corestore.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, store.ErrKeyEmpty + } + itr := db.db.NewIterator(&util.Range{Start: start, Limit: end}, nil) + return newGoLevelDBIterator(itr, start, end, false), nil +} + +// ReverseIterator implements RawDB. +func (db *GoLevelDB) ReverseIterator(start, end []byte) (corestore.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, store.ErrKeyEmpty + } + itr := db.db.NewIterator(&util.Range{Start: start, Limit: end}, nil) + return newGoLevelDBIterator(itr, start, end, true), nil +} + +type goLevelDBIterator struct { + source iterator.Iterator + start []byte + end []byte + isReverse bool + isInvalid bool +} + +var _ corestore.Iterator = (*goLevelDBIterator)(nil) + +func newGoLevelDBIterator(source iterator.Iterator, start, end []byte, isReverse bool) *goLevelDBIterator { + if isReverse { + if end == nil { + source.Last() + } else { + valid := source.Seek(end) + if valid { + eoakey := source.Key() // end or after key + if bytes.Compare(end, eoakey) <= 0 { + source.Prev() + } + } else { + source.Last() + } + } + } else { + if start == nil { + source.First() + } else { + source.Seek(start) + } + } + return &goLevelDBIterator{ + source: source, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr *goLevelDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr *goLevelDBIterator) Valid() bool { + // Once invalid, forever invalid. + if itr.isInvalid { + return false + } + + // If source errors, invalid. + if err := itr.Error(); err != nil { + itr.isInvalid = true + return false + } + + // If source is invalid, invalid. + if !itr.source.Valid() { + itr.isInvalid = true + return false + } + + // If key is end or past it, invalid. + start := itr.start + end := itr.end + key := itr.source.Key() + + if itr.isReverse { + if start != nil && bytes.Compare(key, start) < 0 { + itr.isInvalid = true + return false + } + } else { + if end != nil && bytes.Compare(end, key) <= 0 { + itr.isInvalid = true + return false + } + } + + // Valid + return true +} + +// Key implements Iterator. +func (itr *goLevelDBIterator) Key() []byte { + // Key returns a copy of the current key. + // See https://github.com/syndtr/goleveldb/blob/52c212e6c196a1404ea59592d3f1c227c9f034b2/leveldb/iterator/iter.go#L88 + itr.assertIsValid() + return cp(itr.source.Key()) +} + +// Value implements Iterator. +func (itr *goLevelDBIterator) Value() []byte { + // Value returns a copy of the current value. + // See https://github.com/syndtr/goleveldb/blob/52c212e6c196a1404ea59592d3f1c227c9f034b2/leveldb/iterator/iter.go#L88 + itr.assertIsValid() + return cp(itr.source.Value()) +} + +// Next implements Iterator. +func (itr *goLevelDBIterator) Next() { + itr.assertIsValid() + if itr.isReverse { + itr.source.Prev() + } else { + itr.source.Next() + } +} + +// Error implements Iterator. +func (itr *goLevelDBIterator) Error() error { + return itr.source.Error() +} + +// Close implements Iterator. +func (itr *goLevelDBIterator) Close() error { + itr.source.Release() + return nil +} + +func (itr goLevelDBIterator) assertIsValid() { + if !itr.Valid() { + panic("iterator is invalid") + } +} + +type goLevelDBBatch struct { + db *GoLevelDB + batch *leveldb.Batch +} + +var _ store.RawBatch = (*goLevelDBBatch)(nil) + +func newGoLevelDBBatch(db *GoLevelDB) *goLevelDBBatch { + return &goLevelDBBatch{ + db: db, + batch: new(leveldb.Batch), + } +} + +func newGoLevelDBBatchWithSize(db *GoLevelDB, size int) *goLevelDBBatch { + return &goLevelDBBatch{ + db: db, + batch: leveldb.MakeBatch(size), + } +} + +// Set implements RawBatch. +func (b *goLevelDBBatch) Set(key, value []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + if value == nil { + return store.ErrValueNil + } + if b.batch == nil { + return store.ErrBatchClosed + } + b.batch.Put(key, value) + return nil +} + +// Delete implements RawBatch. +func (b *goLevelDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + if b.batch == nil { + return store.ErrBatchClosed + } + b.batch.Delete(key) + return nil +} + +// Write implements RawBatch. +func (b *goLevelDBBatch) Write() error { + return b.write(false) +} + +// WriteSync implements RawBatch. +func (b *goLevelDBBatch) WriteSync() error { + return b.write(true) +} + +func (b *goLevelDBBatch) write(sync bool) error { + if b.batch == nil { + return store.ErrBatchClosed + } + err := b.db.db.Write(b.batch, &opt.WriteOptions{Sync: sync}) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + return b.Close() +} + +// Close implements RawBatch. +func (b *goLevelDBBatch) Close() error { + if b.batch != nil { + b.batch.Reset() + b.batch = nil + } + return nil +} + +// GetByteSize implements RawBatch +func (b *goLevelDBBatch) GetByteSize() (int, error) { + if b.batch == nil { + return 0, store.ErrBatchClosed + } + return len(b.batch.Dump()), nil +} diff --git a/store/db/memdb.go b/store/db/memdb.go new file mode 100644 index 0000000000..c13f513ca8 --- /dev/null +++ b/store/db/memdb.go @@ -0,0 +1,465 @@ +package db + +import ( + "bytes" + "context" + "fmt" + "sync" + + "github.com/google/btree" + + corestore "cosmossdk.io/core/store" + "cosmossdk.io/store/v2" +) + +const ( + // The approximate number of items and children per B-tree node. Tuned with benchmarks. + bTreeDegree = 32 +) + +// item is a btree.Item with byte slices as keys and values +type item struct { + key []byte + value []byte +} + +// Less implements btree.Item. +func (i item) Less(other btree.Item) bool { + // this considers nil == []byte{}, but that's ok since we handle nil endpoints + // in iterators specially anyway + return bytes.Compare(i.key, other.(item).key) == -1 +} + +// newKey creates a new key item. +func newKey(key []byte) item { + return item{key: key} +} + +// newPair creates a new pair item. +func newPair(key, value []byte) item { + return item{key: key, value: value} +} + +// MemDB is an in-memory database backend using a B-tree for storage. +// +// For performance reasons, all given and returned keys and values are pointers to the in-memory +// database, so modifying them will cause the stored values to be modified as well. All DB methods +// already specify that keys and values should be considered read-only, but this is especially +// important with MemDB. +type MemDB struct { + mtx sync.RWMutex + btree *btree.BTree +} + +var _ store.RawDB = (*MemDB)(nil) + +// NewMemDB creates a new in-memory database. +func NewMemDB() *MemDB { + database := &MemDB{ + btree: btree.New(bTreeDegree), + } + return database +} + +// Get implements DB. +func (db *MemDB) Get(key []byte) ([]byte, error) { + if len(key) == 0 { + return nil, store.ErrKeyEmpty + } + db.mtx.RLock() + defer db.mtx.RUnlock() + + i := db.btree.Get(newKey(key)) + if i != nil { + return i.(item).value, nil + } + return nil, nil +} + +// Has implements DB. +func (db *MemDB) Has(key []byte) (bool, error) { + if len(key) == 0 { + return false, store.ErrKeyEmpty + } + db.mtx.RLock() + defer db.mtx.RUnlock() + + return db.btree.Has(newKey(key)), nil +} + +// Set implements DB. +func (db *MemDB) Set(key []byte, value []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + if value == nil { + return store.ErrValueNil + } + db.mtx.Lock() + defer db.mtx.Unlock() + + db.set(key, value) + return nil +} + +// set sets a value without locking the mutex. +func (db *MemDB) set(key []byte, value []byte) { + db.btree.ReplaceOrInsert(newPair(key, value)) +} + +// SetSync implements DB. +func (db *MemDB) SetSync(key []byte, value []byte) error { + return db.Set(key, value) +} + +// Delete implements DB. +func (db *MemDB) Delete(key []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + db.mtx.Lock() + defer db.mtx.Unlock() + + db.delete(key) + return nil +} + +// delete deletes a key without locking the mutex. +func (db *MemDB) delete(key []byte) { + db.btree.Delete(newKey(key)) +} + +// DeleteSync implements DB. +func (db *MemDB) DeleteSync(key []byte) error { + return db.Delete(key) +} + +// Close implements DB. +func (db *MemDB) Close() error { + // Close is a noop since for an in-memory database, we don't have a destination to flush + // contents to nor do we want any data loss on invoking Close(). + // See the discussion in https://github.com/tendermint/tendermint/libs/pull/56 + return nil +} + +// Print implements DB. +func (db *MemDB) Print() error { + db.mtx.RLock() + defer db.mtx.RUnlock() + + db.btree.Ascend(func(i btree.Item) bool { + item := i.(item) + fmt.Printf("[%X]:\t[%X]\n", item.key, item.value) + return true + }) + return nil +} + +// Stats implements DB. +func (db *MemDB) Stats() map[string]string { + db.mtx.RLock() + defer db.mtx.RUnlock() + + stats := make(map[string]string) + stats["database.type"] = "memDB" + stats["database.size"] = fmt.Sprintf("%d", db.btree.Len()) + return stats +} + +// NewBatch implements DB. +func (db *MemDB) NewBatch() store.RawBatch { + return newMemDBBatch(db) +} + +// NewBatchWithSize implements DB. +// It does the same thing as NewBatch because we can't pre-allocate memDBBatch +func (db *MemDB) NewBatchWithSize(size int) store.RawBatch { + return newMemDBBatch(db) +} + +// Iterator implements DB. +// Takes out a read-lock on the database until the iterator is closed. +func (db *MemDB) Iterator(start, end []byte) (corestore.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, store.ErrKeyEmpty + } + return newMemDBIterator(db, start, end, false), nil +} + +// ReverseIterator implements DB. +// Takes out a read-lock on the database until the iterator is closed. +func (db *MemDB) ReverseIterator(start, end []byte) (corestore.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, store.ErrKeyEmpty + } + return newMemDBIterator(db, start, end, true), nil +} + +// IteratorNoMtx makes an iterator with no mutex. +func (db *MemDB) IteratorNoMtx(start, end []byte) (corestore.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, store.ErrKeyEmpty + } + return newMemDBIteratorMtxChoice(db, start, end, false, false), nil +} + +// ReverseIteratorNoMtx makes an iterator with no mutex. +func (db *MemDB) ReverseIteratorNoMtx(start, end []byte) (corestore.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, store.ErrKeyEmpty + } + return newMemDBIteratorMtxChoice(db, start, end, true, false), nil +} + +const ( + // Size of the channel buffer between traversal goroutine and iterator. Using an unbuffered + // channel causes two context switches per item sent, while buffering allows more work per + // context switch. Tuned with benchmarks. + chBufferSize = 64 +) + +// memDBIterator is a memDB iterator. +type memDBIterator struct { + ch <-chan *item + cancel context.CancelFunc + item *item + start []byte + end []byte + useMtx bool +} + +var _ corestore.Iterator = (*memDBIterator)(nil) + +// newMemDBIterator creates a new memDBIterator. +func newMemDBIterator(db *MemDB, start []byte, end []byte, reverse bool) *memDBIterator { + return newMemDBIteratorMtxChoice(db, start, end, reverse, true) +} + +func newMemDBIteratorMtxChoice(db *MemDB, start []byte, end []byte, reverse bool, useMtx bool) *memDBIterator { + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan *item, chBufferSize) + iter := &memDBIterator{ + ch: ch, + cancel: cancel, + start: start, + end: end, + useMtx: useMtx, + } + + if useMtx { + db.mtx.RLock() + } + go func() { + if useMtx { + defer db.mtx.RUnlock() + } + // Because we use [start, end) for reverse ranges, while btree uses (start, end], we need + // the following variables to handle some reverse iteration conditions ourselves. + var ( + skipEqual []byte + abortLessThan []byte + ) + visitor := func(i btree.Item) bool { + item := i.(item) + if skipEqual != nil && bytes.Equal(item.key, skipEqual) { + skipEqual = nil + return true + } + if abortLessThan != nil && bytes.Compare(item.key, abortLessThan) == -1 { + return false + } + select { + case <-ctx.Done(): + return false + case ch <- &item: + return true + } + } + switch { + case start == nil && end == nil && !reverse: + db.btree.Ascend(visitor) + case start == nil && end == nil && reverse: + db.btree.Descend(visitor) + case end == nil && !reverse: + // must handle this specially, since nil is considered less than anything else + db.btree.AscendGreaterOrEqual(newKey(start), visitor) + case !reverse: + db.btree.AscendRange(newKey(start), newKey(end), visitor) + case end == nil: + // abort after start, since we use [start, end) while btree uses (start, end] + abortLessThan = start + db.btree.Descend(visitor) + default: + // skip end and abort after start, since we use [start, end) while btree uses (start, end] + skipEqual = end + abortLessThan = start + db.btree.DescendLessOrEqual(newKey(end), visitor) + } + close(ch) + }() + + // prime the iterator with the first value, if any + if item, ok := <-ch; ok { + iter.item = item + } + + return iter +} + +// Close implements Iterator. +func (i *memDBIterator) Close() error { + i.cancel() + for range i.ch { // drain channel + } + i.item = nil + return nil +} + +// Domain implements Iterator. +func (i *memDBIterator) Domain() ([]byte, []byte) { + return i.start, i.end +} + +// Valid implements Iterator. +func (i *memDBIterator) Valid() bool { + return i.item != nil +} + +// Next implements Iterator. +func (i *memDBIterator) Next() { + i.assertIsValid() + item, ok := <-i.ch + switch { + case ok: + i.item = item + default: + i.item = nil + } +} + +// Error implements Iterator. +func (i *memDBIterator) Error() error { + return nil // famous last words +} + +// Key implements Iterator. +func (i *memDBIterator) Key() []byte { + i.assertIsValid() + return i.item.key +} + +// Value implements Iterator. +func (i *memDBIterator) Value() []byte { + i.assertIsValid() + return i.item.value +} + +func (i *memDBIterator) assertIsValid() { + if !i.Valid() { + panic("iterator is invalid") + } +} + +// memDBBatch operations +type opType int + +const ( + opTypeSet opType = iota + 1 + opTypeDelete +) + +type operation struct { + opType + key []byte + value []byte +} + +// memDBBatch handles in-memory batching. +type memDBBatch struct { + db *MemDB + ops []operation + size int +} + +var _ store.RawBatch = (*memDBBatch)(nil) + +// newMemDBBatch creates a new memDBBatch +func newMemDBBatch(db *MemDB) *memDBBatch { + return &memDBBatch{ + db: db, + ops: []operation{}, + size: 0, + } +} + +// Set implements Batch. +func (b *memDBBatch) Set(key, value []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + if value == nil { + return store.ErrValueNil + } + if b.ops == nil { + return store.ErrBatchClosed + } + b.size += len(key) + len(value) + b.ops = append(b.ops, operation{opTypeSet, key, value}) + return nil +} + +// Delete implements Batch. +func (b *memDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + if b.ops == nil { + return store.ErrBatchClosed + } + b.size += len(key) + b.ops = append(b.ops, operation{opTypeDelete, key, nil}) + return nil +} + +// Write implements Batch. +func (b *memDBBatch) Write() error { + if b.ops == nil { + return store.ErrBatchClosed + } + b.db.mtx.Lock() + defer b.db.mtx.Unlock() + + for _, op := range b.ops { + switch op.opType { + case opTypeSet: + b.db.set(op.key, op.value) + case opTypeDelete: + b.db.delete(op.key) + default: + return fmt.Errorf("unknown operation type %v (%v)", op.opType, op) + } + } + + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + return b.Close() +} + +// WriteSync implements Batch. +func (b *memDBBatch) WriteSync() error { + return b.Write() +} + +// Close implements Batch. +func (b *memDBBatch) Close() error { + b.ops = nil + b.size = 0 + return nil +} + +// GetByteSize implements Batch +func (b *memDBBatch) GetByteSize() (int, error) { + if b.ops == nil { + return 0, store.ErrBatchClosed + } + return b.size, nil +} diff --git a/store/db/prefixdb.go b/store/db/prefixdb.go new file mode 100644 index 0000000000..1b8e091bf5 --- /dev/null +++ b/store/db/prefixdb.go @@ -0,0 +1,348 @@ +package db + +import ( + "bytes" + "fmt" + "sync" + + corestore "cosmossdk.io/core/store" + "cosmossdk.io/store/v2" +) + +// PrefixDB wraps a namespace of another database as a logical database. +type PrefixDB struct { + mtx sync.Mutex + prefix []byte + db store.RawDB +} + +var _ store.RawDB = (*PrefixDB)(nil) + +// NewPrefixDB lets you namespace multiple RawDBs within a single RawDB. +func NewPrefixDB(db store.RawDB, prefix []byte) *PrefixDB { + return &PrefixDB{ + prefix: prefix, + db: db, + } +} + +// Get implements RawDB. +func (pdb *PrefixDB) Get(key []byte) ([]byte, error) { + if len(key) == 0 { + return nil, store.ErrKeyEmpty + } + + pkey := pdb.prefixed(key) + value, err := pdb.db.Get(pkey) + if err != nil { + return nil, err + } + return value, nil +} + +// Has implements RawDB. +func (pdb *PrefixDB) Has(key []byte) (bool, error) { + if len(key) == 0 { + return false, store.ErrKeyEmpty + } + + ok, err := pdb.db.Has(pdb.prefixed(key)) + if err != nil { + return ok, err + } + + return ok, nil +} + +// Iterator implements RawDB. +func (pdb *PrefixDB) Iterator(start, end []byte) (corestore.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, store.ErrKeyEmpty + } + + var pstart, pend []byte + pstart = append(cp(pdb.prefix), start...) + if end == nil { + pend = cpIncr(pdb.prefix) + } else { + pend = append(cp(pdb.prefix), end...) + } + itr, err := pdb.db.Iterator(pstart, pend) + if err != nil { + return nil, err + } + + return newPrefixIterator(pdb.prefix, start, end, itr) +} + +// ReverseIterator implements RawDB. +func (pdb *PrefixDB) ReverseIterator(start, end []byte) (corestore.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, store.ErrKeyEmpty + } + + var pstart, pend []byte + pstart = append(cp(pdb.prefix), start...) + if end == nil { + pend = cpIncr(pdb.prefix) + } else { + pend = append(cp(pdb.prefix), end...) + } + ritr, err := pdb.db.ReverseIterator(pstart, pend) + if err != nil { + return nil, err + } + + return newPrefixIterator(pdb.prefix, start, end, ritr) +} + +// NewBatch implements RawDB. +func (pdb *PrefixDB) NewBatch() store.RawBatch { + return newPrefixBatch(pdb.prefix, pdb.db.NewBatch()) +} + +// NewBatchWithSize implements RawDB. +func (pdb *PrefixDB) NewBatchWithSize(size int) store.RawBatch { + return newPrefixBatch(pdb.prefix, pdb.db.NewBatchWithSize(size)) +} + +// Close implements RawDB. +func (pdb *PrefixDB) Close() error { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + return pdb.db.Close() +} + +// Print implements RawDB. +func (pdb *PrefixDB) Print() error { + fmt.Printf("prefix: %X\n", pdb.prefix) + + itr, err := pdb.Iterator(nil, nil) + if err != nil { + return err + } + defer itr.Close() + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } + return nil +} + +func (pdb *PrefixDB) prefixed(key []byte) []byte { + return append(cp(pdb.prefix), key...) +} + +// IteratePrefix is a convenience function for iterating over a key domain +// restricted by prefix. +func IteratePrefix(db store.RawDB, prefix []byte) (corestore.Iterator, error) { + var start, end []byte + if len(prefix) == 0 { + start = nil + end = nil + } else { + start = cp(prefix) + end = cpIncr(prefix) + } + itr, err := db.Iterator(start, end) + if err != nil { + return nil, err + } + return itr, nil +} + +// Strips prefix while iterating from Iterator. +type prefixDBIterator struct { + prefix []byte + start []byte + end []byte + source corestore.Iterator + valid bool + err error +} + +var _ corestore.Iterator = (*prefixDBIterator)(nil) + +func newPrefixIterator(prefix, start, end []byte, source corestore.Iterator) (*prefixDBIterator, error) { + pitrInvalid := &prefixDBIterator{ + prefix: prefix, + start: start, + end: end, + source: source, + valid: false, + } + + // Empty keys are not allowed, so if a key exists in the database that exactly matches the + // prefix we need to skip it. + if source.Valid() && bytes.Equal(source.Key(), prefix) { + source.Next() + } + + if !source.Valid() || !bytes.HasPrefix(source.Key(), prefix) { + return pitrInvalid, nil + } + + return &prefixDBIterator{ + prefix: prefix, + start: start, + end: end, + source: source, + valid: true, + }, nil +} + +// Domain implements Iterator. +func (itr *prefixDBIterator) Domain() (start []byte, end []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr *prefixDBIterator) Valid() bool { + if !itr.valid || itr.err != nil || !itr.source.Valid() { + return false + } + + key := itr.source.Key() + if len(key) < len(itr.prefix) || !bytes.Equal(key[:len(itr.prefix)], itr.prefix) { + itr.err = fmt.Errorf("received invalid key from backend: %x (expected prefix %x)", + key, itr.prefix) + return false + } + + return true +} + +// Next implements Iterator. +func (itr *prefixDBIterator) Next() { + itr.assertIsValid() + itr.source.Next() + + if !itr.source.Valid() || !bytes.HasPrefix(itr.source.Key(), itr.prefix) { + itr.valid = false + } else if bytes.Equal(itr.source.Key(), itr.prefix) { + // Empty keys are not allowed, so if a key exists in the database that exactly matches the + // prefix we need to skip it. + itr.Next() + } +} + +// Next implements Iterator. +func (itr *prefixDBIterator) Key() []byte { + itr.assertIsValid() + key := itr.source.Key() + return key[len(itr.prefix):] // we have checked the key in Valid() +} + +// Value implements Iterator. +func (itr *prefixDBIterator) Value() []byte { + itr.assertIsValid() + return itr.source.Value() +} + +// Error implements Iterator. +func (itr *prefixDBIterator) Error() error { + if err := itr.source.Error(); err != nil { + return err + } + return itr.err +} + +// Close implements Iterator. +func (itr *prefixDBIterator) Close() error { + return itr.source.Close() +} + +func (itr *prefixDBIterator) assertIsValid() { + if !itr.Valid() { + panic("iterator is invalid") + } +} + +type prefixDBBatch struct { + prefix []byte + source store.RawBatch +} + +var _ store.RawBatch = (*prefixDBBatch)(nil) + +func newPrefixBatch(prefix []byte, source store.RawBatch) prefixDBBatch { + return prefixDBBatch{ + prefix: prefix, + source: source, + } +} + +// Set implements RawBatch. +func (pb prefixDBBatch) Set(key, value []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + if value == nil { + return store.ErrValueNil + } + pkey := append(cp(pb.prefix), key...) + return pb.source.Set(pkey, value) +} + +// Delete implements RawBatch. +func (pb prefixDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return store.ErrKeyEmpty + } + pkey := append(cp(pb.prefix), key...) + return pb.source.Delete(pkey) +} + +// Write implements RawBatch. +func (pb prefixDBBatch) Write() error { + return pb.source.Write() +} + +// WriteSync implements RawBatch. +func (pb prefixDBBatch) WriteSync() error { + return pb.source.WriteSync() +} + +// Close implements RawBatch. +func (pb prefixDBBatch) Close() error { + return pb.source.Close() +} + +// GetByteSize implements RawBatch +func (pb prefixDBBatch) GetByteSize() (int, error) { + if pb.source == nil { + return 0, store.ErrBatchClosed + } + return pb.source.GetByteSize() +} + +func cp(bz []byte) (ret []byte) { + ret = make([]byte, len(bz)) + copy(ret, bz) + return ret +} + +// Returns a slice of the same length (big endian) +// except incremented by one. +// Returns nil on overflow (e.g. if bz bytes are all 0xFF) +// CONTRACT: len(bz) > 0 +func cpIncr(bz []byte) (ret []byte) { + if len(bz) == 0 { + panic("cpIncr expects non-zero bz length") + } + ret = cp(bz) + for i := len(bz) - 1; i >= 0; i-- { + if ret[i] < byte(0xFF) { + ret[i]++ + return + } + ret[i] = byte(0x00) + if i == 0 { + // Overflow + return nil + } + } + return nil +} diff --git a/store/db/wrapper.go b/store/db/wrapper.go new file mode 100644 index 0000000000..da4b61a2de --- /dev/null +++ b/store/db/wrapper.go @@ -0,0 +1,39 @@ +package db + +import ( + idb "github.com/cosmos/iavl/db" + + "cosmossdk.io/store/v2" +) + +// Wrapper wraps a RawDB to implement iavl.DB which is used by iavl.Tree. +type Wrapper struct { + store.RawDB +} + +var _ idb.DB = (*Wrapper)(nil) + +// NewWrapper returns a new Wrapper. +func NewWrapper(db store.RawDB) *Wrapper { + return &Wrapper{RawDB: db} +} + +// Iterator implements iavl.DB. +func (db *Wrapper) Iterator(start, end []byte) (idb.Iterator, error) { + return db.RawDB.Iterator(start, end) +} + +// ReverseIterator implements iavl.DB. +func (db *Wrapper) ReverseIterator(start, end []byte) (idb.Iterator, error) { + return db.RawDB.ReverseIterator(start, end) +} + +// NewBatch implements iavl.DB. +func (db *Wrapper) NewBatch() idb.Batch { + return db.RawDB.NewBatch() +} + +// NewBatchWithSize implements iavl.DB. +func (db *Wrapper) NewBatchWithSize(size int) idb.Batch { + return db.RawDB.NewBatchWithSize(size) +} diff --git a/store/errors.go b/store/errors.go index d951eb7711..fe6fd38699 100644 --- a/store/errors.go +++ b/store/errors.go @@ -10,9 +10,6 @@ import ( const StoreCodespace = "store" var ( - // ErrInvalidProof is returned when a proof is invalid - ErrInvalidProof = errors.Register(StoreCodespace, 2, "invalid proof") - // ErrTxDecode is returned if we cannot parse a transaction ErrTxDecode = errors.Register(StoreCodespace, 3, "tx parse error") @@ -36,6 +33,12 @@ var ( ErrUnknownStoreKey = errors.Register(StoreCodespace, 10, "unknown store key") ErrKeyEmpty = errors.Register(StoreCodespace, 11, "key empty") ErrStartAfterEnd = errors.Register(StoreCodespace, 12, "start key after end key") + + // ErrBatchClosed is returned when a closed or written batch is used. + ErrBatchClosed = errors.Register(StoreCodespace, 13, "batch has been written or closed") + + // ErrValueNil is returned when attempting to set a nil value. + ErrValueNil = errors.Register(StoreCodespace, 14, "value nil") ) // ErrVersionPruned defines an error returned when a version queried is pruned diff --git a/store/go.mod b/store/go.mod index 508017a657..1a8231be52 100644 --- a/store/go.mod +++ b/store/go.mod @@ -9,14 +9,16 @@ require ( github.com/cockroachdb/errors v1.11.1 github.com/cockroachdb/pebble v1.0.0 github.com/cometbft/cometbft v0.38.5 - github.com/cosmos/cosmos-db v1.0.0 github.com/cosmos/gogoproto v1.4.11 - github.com/cosmos/iavl v1.0.0 + github.com/cosmos/iavl v1.0.0-beta.1.0.20240125174944-11ba4961dae9 github.com/cosmos/ics23/go v0.10.0 + github.com/google/btree v1.1.2 github.com/hashicorp/go-metrics v0.5.3 github.com/linxGnu/grocksdb v1.8.11 github.com/mattn/go-sqlite3 v1.14.20 + github.com/spf13/cast v1.6.0 github.com/stretchr/testify v1.8.4 + github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d golang.org/x/exp v0.0.0-20231226003508-02704c960a9b ) @@ -27,14 +29,14 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/redact v1.1.5 // indirect + github.com/cosmos/cosmos-db v1.0.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect - github.com/emicklei/dot v1.6.0 // indirect + github.com/emicklei/dot v1.6.1 // indirect github.com/getsentry/sentry-go v0.25.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/btree v1.1.2 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/hashicorp/go-immutable-radix v1.0.0 // indirect github.com/hashicorp/golang-lru v0.5.0 // indirect @@ -54,8 +56,6 @@ require ( github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/rs/zerolog v1.31.0 // indirect github.com/sasha-s/go-deadlock v0.3.1 // indirect - github.com/spf13/cast v1.6.0 // indirect - github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect golang.org/x/crypto v0.18.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect diff --git a/store/go.sum b/store/go.sum index a3b317e313..5483724844 100644 --- a/store/go.sum +++ b/store/go.sum @@ -44,8 +44,8 @@ github.com/cosmos/cosmos-db v1.0.0 h1:EVcQZ+qYag7W6uorBKFPvX6gRjw6Uq2hIh4hCWjuQ0 github.com/cosmos/cosmos-db v1.0.0/go.mod h1:iBvi1TtqaedwLdcrZVYRSSCb6eSy61NLj4UNmdIgs0U= github.com/cosmos/gogoproto v1.4.11 h1:LZcMHrx4FjUgrqQSWeaGC1v/TeuVFqSLa43CC6aWR2g= github.com/cosmos/gogoproto v1.4.11/go.mod h1:/g39Mh8m17X8Q/GDEs5zYTSNaNnInBSohtaxzQnYq1Y= -github.com/cosmos/iavl v1.0.0 h1:bw6t0Mv/mVCJvlMTOPHWLs5uUE3BRBfVWCRelOzl+so= -github.com/cosmos/iavl v1.0.0/go.mod h1:CmTGqMnRnucjxbjduneZXT+0vPgNElYvdefjX2q9tYc= +github.com/cosmos/iavl v1.0.0-beta.1.0.20240125174944-11ba4961dae9 h1:guolkG50C5Pfk/+iHXbRg20DhkDkDMHmJVXtzS67FME= +github.com/cosmos/iavl v1.0.0-beta.1.0.20240125174944-11ba4961dae9/go.mod h1:JDw0feJTylH9iDDzi8sWeJO0xrf3qajxebBMnWA6iz4= github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM= github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -57,8 +57,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= -github.com/emicklei/dot v1.6.0 h1:vUzuoVE8ipzS7QkES4UfxdpCwdU2U97m2Pb2tQCoYRY= -github.com/emicklei/dot v1.6.0/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= +github.com/emicklei/dot v1.6.1 h1:ujpDlBkkwgWUY+qPId5IwapRW/xEoligRSYjioR6DFI= +github.com/emicklei/dot v1.6.1/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= diff --git a/store/commit_info.go b/store/proof/commit_info.go similarity index 99% rename from store/commit_info.go rename to store/proof/commit_info.go index a9845073a6..308bbdcc3b 100644 --- a/store/commit_info.go +++ b/store/proof/commit_info.go @@ -1,4 +1,4 @@ -package store +package proof import ( "bytes" diff --git a/store/commit_info_test.go b/store/proof/commit_info_test.go similarity index 99% rename from store/commit_info_test.go rename to store/proof/commit_info_test.go index f372a0ab21..a890ccc4be 100644 --- a/store/commit_info_test.go +++ b/store/proof/commit_info_test.go @@ -1,4 +1,4 @@ -package store +package proof import ( "testing" diff --git a/store/proof.go b/store/proof/proof.go similarity index 97% rename from store/proof.go rename to store/proof/proof.go index 58a46236d9..4ca6175b4b 100644 --- a/store/proof.go +++ b/store/proof/proof.go @@ -1,13 +1,17 @@ -package store +package proof import ( "crypto/sha256" ics23 "github.com/cosmos/ics23/go" + "cosmossdk.io/errors" errorsmod "cosmossdk.io/errors" ) +// ErrInvalidProof is returned when a proof is invalid +var ErrInvalidProof = errors.Register("store", 2, "invalid proof") + // Proof operation types const ( ProofOpIAVLCommitment = "ics23:iavl" diff --git a/store/proof_test.go b/store/proof/proof_test.go similarity index 99% rename from store/proof_test.go rename to store/proof/proof_test.go index 31cf0fbdf5..57f2525cab 100644 --- a/store/proof_test.go +++ b/store/proof/proof_test.go @@ -1,4 +1,4 @@ -package store +package proof import ( "fmt" diff --git a/store/pruning/manager_test.go b/store/pruning/manager_test.go index 73c611743f..1714599202 100644 --- a/store/pruning/manager_test.go +++ b/store/pruning/manager_test.go @@ -4,13 +4,13 @@ import ( "fmt" "testing" - dbm "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/suite" "cosmossdk.io/log" "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/commitment/iavl" + dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/sqlite" ) diff --git a/store/root/store.go b/store/root/store.go index 4495dc3946..370ba5db51 100644 --- a/store/root/store.go +++ b/store/root/store.go @@ -12,6 +12,7 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/metrics" + "cosmossdk.io/store/v2/proof" "cosmossdk.io/store/v2/pruning" ) @@ -35,7 +36,7 @@ type Store struct { commitHeader *coreheader.Info // lastCommitInfo reflects the last version/hash that has been committed - lastCommitInfo *store.CommitInfo + lastCommitInfo *proof.CommitInfo // workingHash defines the current (yet to be committed) hash workingHash []byte @@ -127,7 +128,7 @@ func (s *Store) GetStateCommitment() store.Committer { // LastCommitID returns a CommitID based off of the latest internal CommitInfo. // If an internal CommitInfo is not set, a new one will be returned with only the // latest version set, which is based off of the SS view. -func (s *Store) LastCommitID() (store.CommitID, error) { +func (s *Store) LastCommitID() (proof.CommitID, error) { if s.lastCommitInfo != nil { return s.lastCommitInfo.CommitID(), nil } @@ -139,20 +140,20 @@ func (s *Store) LastCommitID() (store.CommitID, error) { // Ref: https://github.com/cosmos/cosmos-sdk/issues/17314 latestVersion, err := s.stateStore.GetLatestVersion() if err != nil { - return store.CommitID{}, err + return proof.CommitID{}, err } // sanity check: ensure integrity of latest version against SC scVersion, err := s.stateCommitment.GetLatestVersion() if err != nil { - return store.CommitID{}, err + return proof.CommitID{}, err } if scVersion != latestVersion { - return store.CommitID{}, fmt.Errorf("SC and SS version mismatch; got: %d, expected: %d", scVersion, latestVersion) + return proof.CommitID{}, fmt.Errorf("SC and SS version mismatch; got: %d, expected: %d", scVersion, latestVersion) } - return store.CommitID{Version: latestVersion}, nil + return proof.CommitID{Version: latestVersion}, nil } // GetLatestVersion returns the latest version based on the latest internal @@ -243,7 +244,7 @@ func (s *Store) loadVersion(v uint64) error { s.commitHeader = nil // set lastCommitInfo explicitly s.t. Commit commits the correct version, i.e. v+1 - s.lastCommitInfo = &store.CommitInfo{Version: v} + s.lastCommitInfo = &proof.CommitInfo{Version: v} return nil } diff --git a/store/root/store_test.go b/store/root/store_test.go index bbdaceabb8..97569c6c98 100644 --- a/store/root/store_test.go +++ b/store/root/store_test.go @@ -4,7 +4,6 @@ import ( "fmt" "testing" - dbm "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/suite" coreheader "cosmossdk.io/core/header" @@ -12,6 +11,7 @@ import ( "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/commitment/iavl" + dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/pruning" "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/sqlite" diff --git a/store/snapshots/helpers_test.go b/store/snapshots/helpers_test.go index 9711ab5383..8ed5d3d594 100644 --- a/store/snapshots/helpers_test.go +++ b/store/snapshots/helpers_test.go @@ -11,13 +11,13 @@ import ( "testing" "time" - db "github.com/cosmos/cosmos-db" protoio "github.com/cosmos/gogoproto/io" "github.com/stretchr/testify/require" errorsmod "cosmossdk.io/errors" "cosmossdk.io/log" "cosmossdk.io/store/v2" + dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/snapshots" snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) @@ -189,7 +189,7 @@ func (m *mockErrorCommitSnapshotter) SupportedFormats() []uint32 { // The snapshot will complete when the returned closer is called. func setupBusyManager(t *testing.T) *snapshots.Manager { t.Helper() - store, err := snapshots.NewStore(db.NewMemDB(), t.TempDir()) + store, err := snapshots.NewStore(dbm.NewMemDB(), t.TempDir()) require.NoError(t, err) hung := newHungCommitSnapshotter() mgr := snapshots.NewManager(store, opts, hung, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) diff --git a/store/snapshots/manager_test.go b/store/snapshots/manager_test.go index af5b6eb1e1..da598f7a69 100644 --- a/store/snapshots/manager_test.go +++ b/store/snapshots/manager_test.go @@ -4,11 +4,11 @@ import ( "errors" "testing" - db "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "cosmossdk.io/log" + dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/snapshots" "cosmossdk.io/store/v2/snapshots/types" ) @@ -237,7 +237,7 @@ func TestManager_Restore(t *testing.T) { func TestManager_TakeError(t *testing.T) { snapshotter := &mockErrorCommitSnapshotter{} - store, err := snapshots.NewStore(db.NewMemDB(), GetTempDir(t)) + store, err := snapshots.NewStore(dbm.NewMemDB(), GetTempDir(t)) require.NoError(t, err) manager := snapshots.NewManager(store, opts, snapshotter, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) diff --git a/store/snapshots/store.go b/store/snapshots/store.go index 4d202cb2d5..ad2179ddba 100644 --- a/store/snapshots/store.go +++ b/store/snapshots/store.go @@ -11,7 +11,6 @@ import ( "strconv" "sync" - db "github.com/cosmos/cosmos-db" "github.com/cosmos/gogoproto/proto" "cosmossdk.io/errors" @@ -26,7 +25,7 @@ const ( // Store is a snapshot store, containing snapshot metadata and binary chunks. type Store struct { - db db.DB + db store.RawDB dir string mtx sync.Mutex @@ -34,7 +33,7 @@ type Store struct { } // NewStore creates a new snapshot store. -func NewStore(db db.DB, dir string) (*Store, error) { +func NewStore(db store.RawDB, dir string) (*Store, error) { if dir == "" { return nil, errors.Wrap(store.ErrLogic, "snapshot directory not given") } @@ -59,14 +58,20 @@ func (s *Store) Delete(height uint64, format uint32) error { return errors.Wrapf(store.ErrConflict, "snapshot for height %v format %v is currently being saved", height, format) } - err := s.db.DeleteSync(encodeKey(height, format)) - if err != nil { + b := s.db.NewBatch() + defer b.Close() + if err := b.Delete(encodeKey(height, format)); err != nil { + return errors.Wrapf(err, "failed to delete item in the batch") + } + if err := b.WriteSync(); err != nil { return errors.Wrapf(err, "failed to delete snapshot for height %v format %v", height, format) } - err = os.RemoveAll(s.pathSnapshot(height, format)) - return errors.Wrapf(err, "failed to delete snapshot chunks for height %v format %v", - height, format) + if err := os.RemoveAll(s.pathSnapshot(height, format)); err != nil { + return errors.Wrapf(err, "failed to delete snapshot chunks for height %v format %v", + height, format) + } + return nil } // Get fetches snapshot info from the database. @@ -327,8 +332,15 @@ func (s *Store) saveSnapshot(snapshot *types.Snapshot) error { if err != nil { return errors.Wrap(err, "failed to encode snapshot metadata") } - err = s.db.SetSync(encodeKey(snapshot.Height, snapshot.Format), value) - return errors.Wrap(err, "failed to store snapshot") + b := s.db.NewBatch() + defer b.Close() + if err := b.Set(encodeKey(snapshot.Height, snapshot.Format), value); err != nil { + return errors.Wrap(err, "failed to set snapshot in batch") + } + if err := b.WriteSync(); err != nil { + return errors.Wrap(err, "failed to store snapshot") + } + return nil } // pathHeight generates the path to a height, containing multiple snapshot formats. diff --git a/store/snapshots/store_test.go b/store/snapshots/store_test.go index b202807cb6..07f4d4a6d5 100644 --- a/store/snapshots/store_test.go +++ b/store/snapshots/store_test.go @@ -7,17 +7,17 @@ import ( "testing" "time" - db "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + dbm "cosmossdk.io/store/v2/db" "cosmossdk.io/store/v2/snapshots" "cosmossdk.io/store/v2/snapshots/types" ) func setupStore(t *testing.T) *snapshots.Store { t.Helper() - store, err := snapshots.NewStore(db.NewMemDB(), GetTempDir(t)) + store, err := snapshots.NewStore(dbm.NewMemDB(), GetTempDir(t)) require.NoError(t, err) _, err = store.Save(1, 1, makeChunks([][]byte{ @@ -42,13 +42,13 @@ func setupStore(t *testing.T) *snapshots.Store { func TestNewStore(t *testing.T) { tempdir := GetTempDir(t) - _, err := snapshots.NewStore(db.NewMemDB(), tempdir) + _, err := snapshots.NewStore(dbm.NewMemDB(), tempdir) require.NoError(t, err) } func TestNewStore_ErrNoDir(t *testing.T) { - _, err := snapshots.NewStore(db.NewMemDB(), "") + _, err := snapshots.NewStore(dbm.NewMemDB(), "") require.Error(t, err) } diff --git a/store/storage/rocksdb/batch.go b/store/storage/rocksdb/batch.go index ad421104e2..e780b8059c 100644 --- a/store/storage/rocksdb/batch.go +++ b/store/storage/rocksdb/batch.go @@ -7,8 +7,12 @@ import ( "encoding/binary" "github.com/linxGnu/grocksdb" + + "cosmossdk.io/store/v2" ) +var _ store.Batch = (*Batch)(nil) + type Batch struct { version uint64 ts [TimestampSize]byte diff --git a/store/store.go b/store/store.go index 31d60e3a85..3e9365c437 100644 --- a/store/store.go +++ b/store/store.go @@ -6,6 +6,7 @@ import ( coreheader "cosmossdk.io/core/header" corestore "cosmossdk.io/core/store" "cosmossdk.io/store/v2/metrics" + "cosmossdk.io/store/v2/proof" ) // RootStore defines an abstraction layer containing a State Storage (SS) engine @@ -65,7 +66,7 @@ type RootStore interface { Commit(cs *Changeset) ([]byte, error) // LastCommitID returns a CommitID pertaining to the last commitment. - LastCommitID() (CommitID, error) + LastCommitID() (proof.CommitID, error) // SetMetrics sets the telemetry handler on the RootStore. SetMetrics(m metrics.Metrics) @@ -107,5 +108,5 @@ type QueryResult struct { Key []byte Value []byte Version uint64 - ProofOps []CommitmentOp + ProofOps []proof.CommitmentOp }