feat(store/v2): support rocksdb and pebbledb as RawDB (#19607)

Co-authored-by: cool-developer <51834436+cool-develope@users.noreply.github.com>
Co-authored-by: Marko <marbar3778@yahoo.com>
Co-authored-by: sontrinh16 <trinhleson2000@gmail.com>
This commit is contained in:
Aleksandr Bezobchuk 2024-03-19 10:34:14 -04:00 committed by GitHub
parent 65ab2530cc
commit a86c2a9980
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 813 additions and 5 deletions

33
store/db/db.go Normal file
View File

@ -0,0 +1,33 @@
package db
import (
"fmt"
"cosmossdk.io/store/v2"
)
// RawDBType defines a supported RawDB backend identifier.
type RawDBType string

const (
	// All backend identifiers are explicitly typed as RawDBType; previously
	// only DBTypeGoLevelDB was, leaving the rest as untyped string constants.
	DBTypeGoLevelDB RawDBType = "goleveldb"
	DBTypeRocksDB   RawDBType = "rocksdb"
	DBTypePebbleDB  RawDBType = "pebbledb"
	DBTypePrefixDB  RawDBType = "prefixdb"

	// DBFileSuffix is appended to the database name to form its on-disk path.
	DBFileSuffix string = ".db"
)
// NewRawDB creates a RawDB instance of the given backend type, rooted at
// dataDir. The returned error reports an unsupported backend type.
func NewRawDB(dbType RawDBType, name, dataDir string, opts store.DBOptions) (store.RawDB, error) {
	switch dbType {
	case DBTypeGoLevelDB:
		return NewGoLevelDB(name, dataDir, opts)

	case DBTypeRocksDB:
		return NewRocksDB(name, dataDir)

	case DBTypePebbleDB:
		return NewPebbleDB(name, dataDir)

	default:
		// keep the failure inside the switch so all outcomes are visible in one place
		return nil, fmt.Errorf("unsupported db type: %s", dbType)
	}
}

View File

@ -16,6 +16,10 @@ type DBTestSuite struct {
db store.RawDB
}
// TearDownSuite closes the suite's database once all tests have run.
func (s *DBTestSuite) TearDownSuite() {
	s.Require().NoError(s.db.Close())
}
func (s *DBTestSuite) TestDBOperations() {
// Set
b := s.db.NewBatch()
@ -93,6 +97,24 @@ func TestMemDBSuite(t *testing.T) {
})
}
// TestPebbleDBSuite runs the shared RawDB test suite against a PebbleDB
// backend rooted in a temporary directory.
func TestPebbleDBSuite(t *testing.T) {
	db, err := NewPebbleDB("test", t.TempDir())
	require.NoError(t, err)

	suite.Run(t, &DBTestSuite{
		db: db,
	})
}

// TestRocksDBSuite runs the shared RawDB test suite against a RocksDB backend.
// NOTE(review): this presumably only works when built with -tags rocksdb —
// the untagged stub constructor panics; confirm this test file carries the tag.
func TestRocksDBSuite(t *testing.T) {
	db, err := NewRocksDB("test", t.TempDir())
	require.NoError(t, err)

	suite.Run(t, &DBTestSuite{
		db: db,
	})
}
func TestGoLevelDBSuite(t *testing.T) {
db, err := NewGoLevelDB("test", t.TempDir(), nil)
require.NoError(t, err)

View File

@ -19,18 +19,20 @@ import (
storeerrors "cosmossdk.io/store/v2/errors"
)
var _ store.RawDB = (*GoLevelDB)(nil)
// GoLevelDB implements RawDB using github.com/syndtr/goleveldb/leveldb.
// It is used for only store v2 migration, since some clients use goleveldb as the iavl v0/v1 backend.
// It is used for only store v2 migration, since some clients use goleveldb as
// the IAVL v0/v1 backend.
type GoLevelDB struct {
	db *leveldb.DB // underlying goleveldb handle
}
var _ store.RawDB = (*GoLevelDB)(nil)
func NewGoLevelDB(name, dir string, opts store.DBOptions) (*GoLevelDB, error) {
defaultOpts := &opt.Options{
Filter: filter.NewBloomFilter(10), // by default, goleveldb doesn't use a bloom filter.
}
if opts != nil {
files := cast.ToInt(opts.Get("maxopenfiles"))
if files > 0 {
@ -42,7 +44,7 @@ func NewGoLevelDB(name, dir string, opts store.DBOptions) (*GoLevelDB, error) {
}
func NewGoLevelDBWithOpts(name, dir string, o *opt.Options) (*GoLevelDB, error) {
dbPath := filepath.Join(dir, name+".db")
dbPath := filepath.Join(dir, name+DBFileSuffix)
db, err := leveldb.OpenFile(dbPath, o)
if err != nil {
return nil, err

285
store/db/pebbledb.go Normal file
View File

@ -0,0 +1,285 @@
package db
import (
"bytes"
"errors"
"fmt"
"path/filepath"
"slices"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
storeerrors "cosmossdk.io/store/v2/errors"
"github.com/cockroachdb/pebble"
"github.com/spf13/cast"
)
var _ store.RawDB = (*PebbleDB)(nil)

// PebbleDB implements RawDB using PebbleDB as the underlying storage engine.
// It is used for only store v2 migration, since some clients use PebbleDB as
// the IAVL v0/v1 backend.
type PebbleDB struct {
	storage *pebble.DB // underlying pebble handle; set to nil by Close
}
// NewPebbleDB creates a PebbleDB instance at dataDir/<name>.db using default
// options.
func NewPebbleDB(name, dataDir string) (*PebbleDB, error) {
	return NewPebbleDBWithOpts(name, dataDir, nil)
}
// NewPebbleDBWithOpts creates a PebbleDB instance at dataDir/<name>.db,
// honoring the supplied options. Currently only "maxopenfiles" is consumed.
func NewPebbleDBWithOpts(name, dataDir string, opts store.DBOptions) (*PebbleDB, error) {
	pebbleOpts := &pebble.Options{
		// pebble's default is 1; allow a few compactions to run concurrently
		MaxConcurrentCompactions: func() int { return 3 },
	}
	pebbleOpts.EnsureDefaults()

	if opts != nil {
		if maxFiles := cast.ToInt(opts.Get("maxopenfiles")); maxFiles > 0 {
			pebbleOpts.MaxOpenFiles = maxFiles
		}
	}

	storage, err := pebble.Open(filepath.Join(dataDir, name+DBFileSuffix), pebbleOpts)
	if err != nil {
		return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
	}

	return &PebbleDB{storage: storage}, nil
}
// Close closes the underlying pebble instance and clears the handle so any
// later use of this DB fails fast instead of touching a closed store.
func (db *PebbleDB) Close() error {
	err := db.storage.Close()
	db.storage = nil
	return err
}
// Get returns the value for the given key, or nil when the key is absent or
// maps to an empty value. An empty key is rejected with ErrKeyEmpty.
func (db *PebbleDB) Get(key []byte) ([]byte, error) {
	if len(key) == 0 {
		return nil, storeerrors.ErrKeyEmpty
	}

	bz, closer, err := db.storage.Get(key)
	if err != nil {
		if errors.Is(err, pebble.ErrNotFound) {
			// in case of a fresh database
			return nil, nil
		}

		return nil, fmt.Errorf("failed to perform PebbleDB read: %w", err)
	}

	if len(bz) == 0 {
		return nil, closer.Close()
	}

	// The slice returned by pebble.Get is only valid until the returned
	// Closer is closed, so it must be copied before releasing the closer;
	// returning it after Close() is a use-after-release.
	value := slices.Clone(bz)
	return value, closer.Close()
}
// Has reports whether key exists in the database.
func (db *PebbleDB) Has(key []byte) (bool, error) {
	value, err := db.Get(key)
	if err != nil {
		return false, err
	}

	return value != nil, nil
}
// Iterator returns a forward iterator over the domain [start, end).
// An explicitly empty (non-nil, zero-length) bound is rejected.
func (db *PebbleDB) Iterator(start, end []byte) (corestore.Iterator, error) {
	return db.newIter(start, end, false)
}

// ReverseIterator returns a reverse iterator over the domain [start, end).
// An explicitly empty (non-nil, zero-length) bound is rejected.
func (db *PebbleDB) ReverseIterator(start, end []byte) (corestore.Iterator, error) {
	return db.newIter(start, end, true)
}

// newIter validates the bounds and constructs a pebbleDBIterator over
// [start, end) in the given direction.
func (db *PebbleDB) newIter(start, end []byte, reverse bool) (corestore.Iterator, error) {
	if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
		return nil, storeerrors.ErrKeyEmpty
	}

	itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: start, UpperBound: end})
	if err != nil {
		return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
	}

	return newPebbleDBIterator(itr, start, end, reverse), nil
}
// NewBatch returns a fresh write batch bound to this database.
func (db *PebbleDB) NewBatch() store.RawBatch {
	return &pebbleDBBatch{db: db, batch: db.storage.NewBatch()}
}

// NewBatchWithSize returns a write batch pre-sized to size bytes.
func (db *PebbleDB) NewBatchWithSize(size int) store.RawBatch {
	return &pebbleDBBatch{db: db, batch: db.storage.NewBatchWithSize(size)}
}
var _ corestore.Iterator = (*pebbleDBIterator)(nil)

// pebbleDBIterator adapts a raw pebble.Iterator to the corestore.Iterator
// interface, tracking iteration direction and a sticky validity flag.
type pebbleDBIterator struct {
	source  *pebble.Iterator // underlying pebble iterator; nil after Close
	start   []byte           // inclusive lower bound, may be nil
	end     []byte           // exclusive upper bound, may be nil
	valid   bool             // false once exhausted, errored, or closed
	reverse bool             // true when iterating from end toward start
}
// newPebbleDBIterator positions src at its first entry for the requested
// direction and wraps it in a pebbleDBIterator.
func newPebbleDBIterator(src *pebble.Iterator, start, end []byte, reverse bool) *pebbleDBIterator {
	// move the underlying PebbleDB cursor to the first key
	var valid bool
	if reverse {
		if end == nil {
			// no upper bound: begin at the very last key
			valid = src.Last()
		} else {
			// end is exclusive, so begin at the greatest key strictly below it
			valid = src.SeekLT(end)
		}
	} else {
		// forward: LowerBound was set on the iterator options, so First()
		// already lands at or after start
		valid = src.First()
	}

	return &pebbleDBIterator{
		source:  src,
		start:   start,
		end:     end,
		valid:   valid,
		reverse: reverse,
	}
}
// Domain returns the start (inclusive) and end (exclusive) bounds the
// iterator was created with.
func (itr *pebbleDBIterator) Domain() (start, end []byte) {
	return itr.start, itr.end
}

// Valid reports whether the iterator is positioned at a live entry.
func (itr *pebbleDBIterator) Valid() bool {
	// once invalid, forever invalid
	if !itr.valid || !itr.source.Valid() {
		itr.valid = false
		return itr.valid
	}

	// if source has error, consider it invalid
	if err := itr.source.Error(); err != nil {
		itr.valid = false
		return itr.valid
	}

	// if key is at the end or past it, consider it invalid
	// (defensive: the UpperBound set at creation should already enforce this)
	if end := itr.end; end != nil {
		if bytes.Compare(end, itr.Key()) <= 0 {
			itr.valid = false
			return itr.valid
		}
	}

	return true
}
// Key returns a copy of the key at the current position.
// Panics if the iterator is invalid.
func (itr *pebbleDBIterator) Key() []byte {
	itr.assertIsValid()

	key := itr.source.Key()
	return slices.Clone(key)
}

// Value returns a copy of the value at the current position.
// Panics if the iterator is invalid.
func (itr *pebbleDBIterator) Value() []byte {
	itr.assertIsValid()

	value := itr.source.Value()
	return slices.Clone(value)
}

// Next advances the iterator one entry in its direction of travel.
// Panics if the iterator is invalid.
func (itr *pebbleDBIterator) Next() {
	itr.assertIsValid()

	if itr.reverse {
		itr.valid = itr.source.Prev()
		return
	}
	itr.valid = itr.source.Next()
}
// Error returns any error accumulated by the underlying iterator.
// NOTE(review): calling Error after Close nil-derefs since Close clears
// source — confirm callers never do this.
func (itr *pebbleDBIterator) Error() error {
	return itr.source.Error()
}

// Close releases the underlying pebble iterator and permanently invalidates
// this iterator.
func (itr *pebbleDBIterator) Close() error {
	err := itr.source.Close()
	itr.source = nil
	itr.valid = false
	return err
}

// assertIsValid panics when the iterator is exhausted, errored, or closed.
func (itr *pebbleDBIterator) assertIsValid() {
	if !itr.valid {
		panic("pebbleDB iterator is invalid")
	}
}
var _ store.RawBatch = (*pebbleDBBatch)(nil)

// pebbleDBBatch implements store.RawBatch on top of a pebble.Batch.
type pebbleDBBatch struct {
	db    *PebbleDB     // parent database the batch commits into
	batch *pebble.Batch // accumulated writes
}
// Set queues a key/value write into the batch.
func (b *pebbleDBBatch) Set(key, value []byte) error {
	switch {
	case len(key) == 0:
		return storeerrors.ErrKeyEmpty
	case value == nil:
		return storeerrors.ErrValueNil
	case b.batch == nil:
		return storeerrors.ErrBatchClosed
	}

	return b.batch.Set(key, value, nil)
}

// Delete queues a key deletion into the batch.
func (b *pebbleDBBatch) Delete(key []byte) error {
	switch {
	case len(key) == 0:
		return storeerrors.ErrKeyEmpty
	case b.batch == nil:
		return storeerrors.ErrBatchClosed
	}

	return b.batch.Delete(key, nil)
}
// commit applies the batch with the given durability setting, guarding
// against use after Close (Set/Delete already return ErrBatchClosed; Write
// previously nil-dereffed instead).
func (b *pebbleDBBatch) commit(sync bool) error {
	if b.batch == nil {
		return storeerrors.ErrBatchClosed
	}

	if err := b.batch.Commit(&pebble.WriteOptions{Sync: sync}); err != nil {
		return fmt.Errorf("failed to write PebbleDB batch: %w", err)
	}

	return nil
}

// Write commits the batch without fsync; writes may be lost on a crash.
func (b *pebbleDBBatch) Write() error {
	return b.commit(false)
}

// WriteSync commits the batch with fsync.
func (b *pebbleDBBatch) WriteSync() error {
	return b.commit(true)
}
// Close releases the batch's resources and marks it closed. It is safe to
// call more than once. Previously Close left b.batch set, so the
// ErrBatchClosed guards elsewhere on this type were dead code.
func (b *pebbleDBBatch) Close() error {
	if b.batch == nil {
		return nil
	}

	err := b.batch.Close()
	b.batch = nil
	return err
}

// GetByteSize returns the current serialized size of the batch in bytes, or
// ErrBatchClosed once the batch has been closed.
func (b *pebbleDBBatch) GetByteSize() (int, error) {
	if b.batch == nil {
		return 0, storeerrors.ErrBatchClosed
	}

	return b.batch.Len(), nil
}

329
store/db/rocksdb.go Normal file
View File

@ -0,0 +1,329 @@
//go:build rocksdb
// +build rocksdb
package db
import (
"bytes"
"fmt"
"path/filepath"
"runtime"
"slices"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
storeerrors "cosmossdk.io/store/v2/errors"
"github.com/linxGnu/grocksdb"
)
var (
	_ store.RawDB = (*RocksDB)(nil)

	// defaultReadOpts is shared by all reads on all RocksDB instances.
	// NOTE(review): assumed safe for concurrent use because it is never
	// mutated after creation — confirm against grocksdb documentation.
	defaultReadOpts = grocksdb.NewDefaultReadOptions()
)

// RocksDB implements RawDB using RocksDB as the underlying storage engine.
// It is used for only store v2 migration, since some clients use RocksDB as
// the IAVL v0/v1 backend.
type RocksDB struct {
	storage *grocksdb.DB // underlying rocksdb handle; nil after Close
}
// defaultRocksdbOptions returns options good enough for most cases, including
// heavy workloads: a 1GB block cache, a 512MB write buffer (which may use
// ~50% more under heavy write load), bloom filters, and snappy compression
// (rocksdb's default; requires linking with -lsnappy).
func defaultRocksdbOptions() *grocksdb.Options {
	bbto := grocksdb.NewDefaultBlockBasedTableOptions()
	bbto.SetBlockCache(grocksdb.NewLRUCache(1 << 30)) // 1 GiB block cache
	bbto.SetFilterPolicy(grocksdb.NewBloomFilter(10)) // ~10 bits/key bloom filter

	rocksdbOpts := grocksdb.NewDefaultOptions()
	rocksdbOpts.SetBlockBasedTableFactory(bbto)
	// SetMaxOpenFiles to 4096 seems to provide a reliable performance boost
	rocksdbOpts.SetMaxOpenFiles(4096)
	rocksdbOpts.SetCreateIfMissing(true)
	rocksdbOpts.IncreaseParallelism(runtime.NumCPU())
	// 1.5GB maximum memory use for writebuffer.
	rocksdbOpts.OptimizeLevelStyleCompaction(512 * 1024 * 1024)

	return rocksdbOpts
}
// NewRocksDB creates a RocksDB instance at dataDir/<name>.db using the
// default options. Note: defaultRocksdbOptions already enables
// create-if-missing, so no extra SetCreateIfMissing call is needed here.
func NewRocksDB(name, dataDir string) (*RocksDB, error) {
	return NewRocksDBWithOpts(name, dataDir, defaultRocksdbOptions())
}
// NewRocksDBWithOpts creates a RocksDB instance at dataDir/<name>.db using
// the provided options.
func NewRocksDBWithOpts(name, dataDir string, opts *grocksdb.Options) (*RocksDB, error) {
	storage, err := grocksdb.OpenDb(opts, filepath.Join(dataDir, name+DBFileSuffix))
	if err != nil {
		return nil, fmt.Errorf("failed to open RocksDB: %w", err)
	}

	return &RocksDB{storage: storage}, nil
}
// Close closes the underlying RocksDB handle and clears it.
func (db *RocksDB) Close() error {
	db.storage.Close()
	db.storage = nil
	return nil
}

// Get returns the value for the given key, or nil when the key is absent.
// An empty key is rejected with ErrKeyEmpty, mirroring PebbleDB.Get — the
// rocksdb backend previously skipped this validation, making the two
// backends behave differently on the same input.
func (db *RocksDB) Get(key []byte) ([]byte, error) {
	if len(key) == 0 {
		return nil, storeerrors.ErrKeyEmpty
	}

	bz, err := db.storage.GetBytes(defaultReadOpts, key)
	if err != nil {
		return nil, fmt.Errorf("failed to perform RocksDB read: %w", err)
	}

	return bz, nil
}

// Has reports whether key exists in the database.
func (db *RocksDB) Has(key []byte) (bool, error) {
	bz, err := db.Get(key)
	if err != nil {
		return false, err
	}

	return bz != nil, nil
}
// Iterator returns a forward iterator over the domain [start, end).
// An explicitly empty (non-nil, zero-length) bound is rejected.
func (db *RocksDB) Iterator(start, end []byte) (corestore.Iterator, error) {
	if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
		return nil, storeerrors.ErrKeyEmpty
	}

	return newRocksDBIterator(db.storage.NewIterator(defaultReadOpts), start, end, false), nil
}

// ReverseIterator returns a reverse iterator over the domain [start, end).
// An explicitly empty (non-nil, zero-length) bound is rejected.
func (db *RocksDB) ReverseIterator(start, end []byte) (corestore.Iterator, error) {
	if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
		return nil, storeerrors.ErrKeyEmpty
	}

	return newRocksDBIterator(db.storage.NewIterator(defaultReadOpts), start, end, true), nil
}

// NewBatch returns a fresh write batch bound to this database.
func (db *RocksDB) NewBatch() store.RawBatch {
	return &rocksDBBatch{
		db:    db,
		batch: grocksdb.NewWriteBatch(),
	}
}

// NewBatchWithSize returns a new write batch; the size hint is unused.
func (db *RocksDB) NewBatchWithSize(_ int) store.RawBatch {
	return db.NewBatch()
}
var _ corestore.Iterator = (*rocksDBIterator)(nil)

// rocksDBIterator adapts a raw grocksdb.Iterator to the corestore.Iterator
// interface, tracking iteration direction and a sticky validity flag.
type rocksDBIterator struct {
	source  *grocksdb.Iterator // underlying rocksdb iterator; nil after Close
	start   []byte             // inclusive lower bound, may be nil
	end     []byte             // exclusive upper bound, may be nil
	valid   bool               // false once exhausted, errored, or closed
	reverse bool               // true when iterating from end toward start
}
// newRocksDBIterator positions src at its first entry for the requested
// direction and wraps it. The end bound is treated as exclusive; unlike the
// pebble backend, bounds are enforced in Valid() rather than by the engine.
func newRocksDBIterator(src *grocksdb.Iterator, start, end []byte, reverse bool) *rocksDBIterator {
	if reverse {
		if end == nil {
			src.SeekToLast()
		} else {
			// Seek lands on the first key >= end; since end is exclusive,
			// step back when we landed at or beyond it.
			src.Seek(end)

			if src.Valid() {
				eoaKey := readOnlySlice(src.Key()) // end or after key
				if bytes.Compare(end, eoaKey) <= 0 {
					src.Prev()
				}
			} else {
				// every key is < end: start from the last one
				src.SeekToLast()
			}
		}
	} else {
		if start == nil {
			src.SeekToFirst()
		} else {
			src.Seek(start)
		}
	}

	return &rocksDBIterator{
		source:  src,
		start:   start,
		end:     end,
		reverse: reverse,
		valid:   src.Valid(),
	}
}
// Domain returns the start (inclusive) and end (exclusive) bounds the
// iterator was created with.
func (itr *rocksDBIterator) Domain() (start, end []byte) {
	return itr.start, itr.end
}

// Valid reports whether the iterator is positioned at a live entry within
// its configured bounds.
func (itr *rocksDBIterator) Valid() bool {
	// once invalid, forever invalid
	if !itr.valid {
		return false
	}

	// if source has error, consider it invalid
	if err := itr.source.Err(); err != nil {
		itr.valid = false
		return false
	}

	// if source is invalid, consider it invalid
	if !itr.source.Valid() {
		itr.valid = false
		return false
	}

	// if key is at the end or past it, consider it invalid; grocksdb does
	// not enforce the bounds itself, so this check is load-bearing
	start := itr.start
	end := itr.end
	key := readOnlySlice(itr.source.Key())

	if itr.reverse {
		if start != nil && bytes.Compare(key, start) < 0 {
			itr.valid = false
			return false
		}
	} else {
		if end != nil && bytes.Compare(end, key) <= 0 {
			itr.valid = false
			return false
		}
	}

	return true
}
// Key returns a copy of the current key; panics if the iterator is invalid.
func (itr *rocksDBIterator) Key() []byte {
	itr.assertIsValid()
	return copyAndFreeSlice(itr.source.Key())
}

// Value returns a copy of the current value; panics if the iterator is invalid.
func (itr *rocksDBIterator) Value() []byte {
	itr.assertIsValid()
	return copyAndFreeSlice(itr.source.Value())
}

// Next advances the cursor one entry in the iteration direction. Advancing
// an invalid iterator is a no-op (the pebble iterator panics instead).
func (itr *rocksDBIterator) Next() {
	if !itr.valid {
		return
	}

	if itr.reverse {
		itr.source.Prev()
		return
	}
	itr.source.Next()
}

// Error returns any error the underlying iterator has accumulated.
func (itr *rocksDBIterator) Error() error {
	return itr.source.Err()
}
// Close releases the underlying grocksdb iterator and permanently
// invalidates this iterator.
func (itr *rocksDBIterator) Close() error {
	itr.source.Close()
	itr.source = nil
	itr.valid = false
	return nil
}

// assertIsValid panics when the iterator is exhausted, errored, or closed.
func (itr *rocksDBIterator) assertIsValid() {
	if !itr.valid {
		panic("rocksDB iterator is invalid")
	}
}
// rocksDBBatch implements store.RawBatch on top of a grocksdb.WriteBatch.
type rocksDBBatch struct {
	db    *RocksDB              // parent database the batch is written to
	batch *grocksdb.WriteBatch  // accumulated writes
}
// Set queues a key/value write into the batch.
func (b *rocksDBBatch) Set(key, value []byte) error {
	switch {
	case len(key) == 0:
		return storeerrors.ErrKeyEmpty
	case value == nil:
		return storeerrors.ErrValueNil
	case b.batch == nil:
		return storeerrors.ErrBatchClosed
	}

	b.batch.Put(key, value)
	return nil
}

// Delete queues a key deletion into the batch.
func (b *rocksDBBatch) Delete(key []byte) error {
	switch {
	case len(key) == 0:
		return storeerrors.ErrKeyEmpty
	case b.batch == nil:
		return storeerrors.ErrBatchClosed
	}

	b.batch.Delete(key)
	return nil
}
// writeWith applies the batch with a freshly configured WriteOptions.
// grocksdb options wrap C-allocated memory, so they must be explicitly
// destroyed — the previous code leaked one WriteOptions per Write call.
func (b *rocksDBBatch) writeWith(sync bool) error {
	writeOpts := grocksdb.NewDefaultWriteOptions()
	defer writeOpts.Destroy()

	writeOpts.SetSync(sync)

	if err := b.db.storage.Write(writeOpts, b.batch); err != nil {
		return fmt.Errorf("failed to write RocksDB batch: %w", err)
	}

	return nil
}

// Write commits the batch without fsync; writes may be lost on a crash.
func (b *rocksDBBatch) Write() error {
	return b.writeWith(false)
}

// WriteSync commits the batch with fsync.
func (b *rocksDBBatch) WriteSync() error {
	return b.writeWith(true)
}
// Close destroys the underlying write batch and marks it closed. It is safe
// to call more than once. Previously Close left b.batch set, so the
// ErrBatchClosed guards in Set/Delete were dead code and a second Close
// would destroy an already-destroyed batch.
func (b *rocksDBBatch) Close() error {
	if b.batch != nil {
		b.batch.Destroy()
		b.batch = nil
	}

	return nil
}

// GetByteSize returns the serialized size of the batch in bytes, or
// ErrBatchClosed once the batch has been closed.
func (b *rocksDBBatch) GetByteSize() (int, error) {
	if b.batch == nil {
		return 0, storeerrors.ErrBatchClosed
	}

	return len(b.batch.Data()), nil
}
// readOnlySlice returns the raw bytes backing a RocksDB slice without
// copying, or nil when the slice does not exist. The returned bytes are only
// usable while the slice itself has not been freed.
func readOnlySlice(s *grocksdb.Slice) []byte {
	if s.Exists() {
		return s.Data()
	}
	return nil
}

// copyAndFreeSlice will copy a given RocksDB slice and free it. If the slice
// does not exist, <nil> will be returned.
func copyAndFreeSlice(s *grocksdb.Slice) []byte {
	defer s.Free()

	if s.Exists() {
		return slices.Clone(s.Data())
	}
	return nil
}

135
store/db/rocksdb_noflag.go Normal file
View File

@ -0,0 +1,135 @@
//go:build !rocksdb
// +build !rocksdb
package db
import (
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"github.com/linxGnu/grocksdb"
)
var (
	_ store.RawDB = (*RocksDB)(nil)
)

// RocksDB implements RawDB using RocksDB as the underlying storage engine.
// It is used for only store v2 migration, since some clients use RocksDB as
// the IAVL v0/v1 backend.
//
// This stub variant is compiled when the rocksdb build tag is absent; every
// method panics if invoked.
type RocksDB struct {
}
// NewRocksDB matches the signature of the rocksdb-tagged constructor so that
// callers (e.g. NewRawDB in db.go, which calls NewRocksDB(name, dataDir))
// still compile without the rocksdb build tag; it panics if actually invoked.
// The previous stub took only (dataDir string), which broke the untagged build.
func NewRocksDB(name, dataDir string) (*RocksDB, error) {
	panic("rocksdb must be built with -tags rocksdb")
}

// NewRocksDBWithOpts matches the signature of the rocksdb-tagged constructor
// (which takes *grocksdb.Options, not store.DBOptions) so both build
// variants expose the same API; it panics if actually invoked.
func NewRocksDBWithOpts(name, dataDir string, opts *grocksdb.Options) (*RocksDB, error) {
	panic("rocksdb must be built with -tags rocksdb")
}
// All RocksDB methods below are stubs that panic: the real implementations
// live in rocksdb.go and require building with -tags rocksdb.
func (db *RocksDB) Close() error {
	panic("rocksdb must be built with -tags rocksdb")
}

func (db *RocksDB) Get(key []byte) ([]byte, error) {
	panic("rocksdb must be built with -tags rocksdb")
}

func (db *RocksDB) Has(key []byte) (bool, error) {
	panic("rocksdb must be built with -tags rocksdb")
}

func (db *RocksDB) Iterator(start, end []byte) (corestore.Iterator, error) {
	panic("rocksdb must be built with -tags rocksdb")
}

func (db *RocksDB) ReverseIterator(start, end []byte) (corestore.Iterator, error) {
	panic("rocksdb must be built with -tags rocksdb")
}

func (db *RocksDB) NewBatch() store.RawBatch {
	panic("rocksdb must be built with -tags rocksdb")
}

// NewBatchWithSize delegates to NewBatch, which panics in this build variant.
func (db *RocksDB) NewBatchWithSize(_ int) store.RawBatch {
	return db.NewBatch()
}
var _ corestore.Iterator = (*rocksDBIterator)(nil)

// rocksDBIterator is a stub that mirrors the rocksdb-tagged iterator type;
// every method panics because the binary was built without the rocksdb tag.
type rocksDBIterator struct {
}

func newRocksDBIterator(src *grocksdb.Iterator, start, end []byte, reverse bool) *rocksDBIterator {
	panic("rocksdb must be built with -tags rocksdb")
}

func (itr *rocksDBIterator) Domain() (start, end []byte) {
	panic("rocksdb must be built with -tags rocksdb")
}

func (itr *rocksDBIterator) Valid() bool {
	panic("rocksdb must be built with -tags rocksdb")
}

func (itr *rocksDBIterator) Key() []byte {
	panic("rocksdb must be built with -tags rocksdb")
}

func (itr *rocksDBIterator) Value() []byte {
	panic("rocksdb must be built with -tags rocksdb")
}

func (itr *rocksDBIterator) Next() {
	panic("rocksdb must be built with -tags rocksdb")
}

func (itr *rocksDBIterator) Error() error {
	panic("rocksdb must be built with -tags rocksdb")
}

func (itr *rocksDBIterator) Close() error {
	panic("rocksdb must be built with -tags rocksdb")
}

func (itr *rocksDBIterator) assertIsValid() {
	panic("rocksdb must be built with -tags rocksdb")
}
// rocksDBBatch is a stub that mirrors the rocksdb-tagged batch type; every
// method panics because the binary was built without the rocksdb tag.
type rocksDBBatch struct {
}

func (b *rocksDBBatch) Set(key, value []byte) error {
	panic("rocksdb must be built with -tags rocksdb")
}

func (b *rocksDBBatch) Delete(key []byte) error {
	panic("rocksdb must be built with -tags rocksdb")
}

func (b *rocksDBBatch) Write() error {
	panic("rocksdb must be built with -tags rocksdb")
}

func (b *rocksDBBatch) WriteSync() error {
	panic("rocksdb must be built with -tags rocksdb")
}

func (b *rocksDBBatch) Close() error {
	panic("rocksdb must be built with -tags rocksdb")
}

func (b *rocksDBBatch) GetByteSize() (int, error) {
	panic("rocksdb must be built with -tags rocksdb")
}
// readOnlySlice is a stub for the rocksdb-tagged helper; it panics because
// the binary was built without the rocksdb tag.
func readOnlySlice(s *grocksdb.Slice) []byte {
	panic("rocksdb must be built with -tags rocksdb")
}

// copyAndFreeSlice will copy a given RocksDB slice and free it. If the slice
// does not exist, <nil> will be returned.
func copyAndFreeSlice(s *grocksdb.Slice) []byte {
	panic("rocksdb must be built with -tags rocksdb")
}

View File

@ -10,7 +10,7 @@ require (
github.com/cockroachdb/pebble v1.1.0
github.com/cometbft/cometbft v0.38.5
github.com/cosmos/gogoproto v1.4.11
github.com/cosmos/iavl v1.1.0
github.com/cosmos/iavl v1.1.1
github.com/cosmos/ics23/go v0.10.0
github.com/google/btree v1.1.2
github.com/hashicorp/go-metrics v0.5.3

View File

@ -48,6 +48,8 @@ github.com/cosmos/gogoproto v1.4.11 h1:LZcMHrx4FjUgrqQSWeaGC1v/TeuVFqSLa43CC6aWR
github.com/cosmos/gogoproto v1.4.11/go.mod h1:/g39Mh8m17X8Q/GDEs5zYTSNaNnInBSohtaxzQnYq1Y=
github.com/cosmos/iavl v1.1.0 h1:YRdPLKCPoJQDXoZu0BToQ6PADswLkhNzyt7DZLgK0xY=
github.com/cosmos/iavl v1.1.0/go.mod h1:jLeUvm6bGT1YutCaL2fIar/8vGUE8cPZvh/gXEWDaDM=
github.com/cosmos/iavl v1.1.1 h1:64nTi8s3gEoGqhA8TyAWFWfz7/pg0anKzHNSc1ETc7Q=
github.com/cosmos/iavl v1.1.1/go.mod h1:jLeUvm6bGT1YutCaL2fIar/8vGUE8cPZvh/gXEWDaDM=
github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM=
github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=