feat(store/v2): snapshot manager (#18458)

This commit is contained in:
cool-developer 2023-12-07 16:50:40 -05:00 committed by GitHub
parent 14bb52ad92
commit f6df368d8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 776 additions and 260 deletions

View File

@ -4,8 +4,9 @@ package store
// track writes. Deletion can be denoted by a nil value or explicitly by the
// Delete field.
type KVPair struct {
Key []byte
Value []byte
Key []byte
Value []byte
StoreKey string // Optional for snapshot restore
}
type KVPairs []KVPair

View File

@ -0,0 +1,40 @@
package iavl
import (
"errors"
"github.com/cosmos/iavl"
"cosmossdk.io/store/v2/commitment"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)
// Exporter is a wrapper around iavl.Exporter.
type Exporter struct {
exporter *iavl.Exporter
}
// Next returns the next item in the exporter.
func (e *Exporter) Next() (*snapshotstypes.SnapshotIAVLItem, error) {
item, err := e.exporter.Next()
if err != nil {
if errors.Is(err, iavl.ErrorExportDone) {
return nil, commitment.ErrorExportDone
}
return nil, err
}
return &snapshotstypes.SnapshotIAVLItem{
Key: item.Key,
Value: item.Value,
Version: item.Version,
Height: int32(item.Height),
}, nil
}
// Close closes the exporter.
func (e *Exporter) Close() error {
e.exporter.Close()
return nil
}

View File

@ -0,0 +1,34 @@
package iavl
import (
"github.com/cosmos/iavl"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)
// Importer is a wrapper around iavl.Importer.
type Importer struct {
importer *iavl.Importer
}
// Add adds the given item to the importer.
func (i *Importer) Add(item *snapshotstypes.SnapshotIAVLItem) error {
return i.importer.Add(&iavl.ExportNode{
Key: item.Key,
Value: item.Value,
Version: item.Version,
Height: int8(item.Height),
})
}
// Commit commits the importer.
func (i *Importer) Commit() error {
return i.importer.Commit()
}
// Close closes the importer.
func (i *Importer) Close() error {
i.importer.Close()
return nil
}

View File

@ -77,6 +77,34 @@ func (t *IavlTree) Prune(version uint64) error {
return t.tree.DeleteVersionsTo(int64(version))
}
// Export exports the tree exporter at the given version.
func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) {
tree, err := t.tree.GetImmutable(int64(version))
if err != nil {
return nil, err
}
exporter, err := tree.Export()
if err != nil {
return nil, err
}
return &Exporter{
exporter: exporter,
}, nil
}
// Import imports the tree importer at the given version.
func (t *IavlTree) Import(version uint64) (commitment.Importer, error) {
importer, err := t.tree.Import(int64(version))
if err != nil {
return nil, err
}
return &Importer{
importer: importer,
}, nil
}
// Close closes the iavl tree.
func (t *IavlTree) Close() error {
return nil

View File

@ -5,11 +5,29 @@ import (
dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"cosmossdk.io/log"
"cosmossdk.io/store/v2/commitment"
)
func generateTree(treeType string) *IavlTree {
func TestCommitterSuite(t *testing.T) {
s := &commitment.CommitStoreTestSuite{
NewStore: func(db dbm.DB, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) {
multiTrees := make(map[string]commitment.Tree)
cfg := DefaultConfig()
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg)
}
return commitment.NewCommitStore(multiTrees, logger)
},
}
suite.Run(t, s)
}
func generateTree() *IavlTree {
cfg := DefaultConfig()
db := dbm.NewMemDB()
return NewIavlTree(db, log.NewNopLogger(), cfg)
@ -17,7 +35,7 @@ func generateTree(treeType string) *IavlTree {
func TestIavlTree(t *testing.T) {
// generate a new tree
tree := generateTree("iavl")
tree := generateTree()
require.NotNil(t, tree)
initVersion := tree.GetLatestVersion()

View File

@ -3,14 +3,22 @@ package commitment
import (
"errors"
"fmt"
"io"
"math"
protoio "github.com/cosmos/gogoproto/io"
ics23 "github.com/cosmos/ics23/go"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)
var _ store.Committer = (*CommitStore)(nil)
var (
_ store.Committer = (*CommitStore)(nil)
_ snapshots.CommitSnapshotter = (*CommitStore)(nil)
)
// CommitStore is a wrapper around multiple Tree objects mapped by a unique store
// key. Each store key reflects dedicated and unique usage within a module. A caller
@ -127,6 +135,146 @@ func (c *CommitStore) Prune(version uint64) (ferr error) {
return ferr
}
// Snapshot implements snapshotstypes.CommitSnapshotter.
func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error {
if version == 0 {
return fmt.Errorf("the snapshot version must be greater than 0")
}
latestVersion, err := c.GetLatestVersion()
if err != nil {
return err
}
if version > latestVersion {
return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion)
}
for storeKey, tree := range c.multiTrees {
// TODO: check the parallelism of this loop
if err := func() error {
exporter, err := tree.Export(version)
if err != nil {
return fmt.Errorf("failed to export tree for version %d: %w", version, err)
}
defer exporter.Close()
err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{
Item: &snapshotstypes.SnapshotItem_Store{
Store: &snapshotstypes.SnapshotStoreItem{
Name: storeKey,
},
},
})
if err != nil {
return fmt.Errorf("failed to write store name: %w", err)
}
for {
item, err := exporter.Next()
if errors.Is(err, ErrorExportDone) {
break
} else if err != nil {
return fmt.Errorf("failed to get the next export node: %w", err)
}
if err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{
Item: &snapshotstypes.SnapshotItem_IAVL{
IAVL: item,
},
}); err != nil {
return fmt.Errorf("failed to write iavl node: %w", err)
}
}
return nil
}(); err != nil {
return err
}
}
return nil
}
// Restore implements snapshotstypes.CommitSnapshotter.
func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshotstypes.SnapshotItem, error) {
var (
importer Importer
snapshotItem snapshotstypes.SnapshotItem
storeKey string
)
loop:
for {
snapshotItem = snapshotstypes.SnapshotItem{}
err := protoReader.ReadMsg(&snapshotItem)
if errors.Is(err, io.EOF) {
break
} else if err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err)
}
switch item := snapshotItem.Item.(type) {
case *snapshotstypes.SnapshotItem_Store:
if importer != nil {
if err := importer.Commit(); err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err)
}
importer.Close()
}
storeKey = item.Store.Name
tree := c.multiTrees[storeKey]
if tree == nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey)
}
importer, err = tree.Import(version)
if err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err)
}
defer importer.Close()
case *snapshotstypes.SnapshotItem_IAVL:
if importer == nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item")
}
node := item.IAVL
if node.Height > int32(math.MaxInt8) {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v",
item.IAVL.Height, math.MaxInt8)
}
// Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does
// not allow nil keys nor nil values for leaf nodes, so we can always set them to empty.
if node.Key == nil {
node.Key = []byte{}
}
if node.Height == 0 {
if node.Value == nil {
node.Value = []byte{}
}
// If the node is a leaf node, it will be written to the storage.
chStorage <- &store.KVPair{
Key: node.Key,
Value: node.Value,
StoreKey: storeKey,
}
}
err := importer.Add(node)
if err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err)
}
default:
break loop
}
}
if importer != nil {
if err := importer.Commit(); err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err)
}
}
return snapshotItem, c.LoadVersion(version)
}
func (c *CommitStore) Close() (ferr error) {
for _, tree := range c.multiTrees {
if err := tree.Close(); err != nil {

View File

@ -0,0 +1,121 @@
package commitment
import (
"fmt"
"io"
"sync"
dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/suite"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)
const (
storeKey1 = "store1"
storeKey2 = "store2"
)
// CommitStoreTestSuite is a test suite to be used for all tree backends.
type CommitStoreTestSuite struct {
suite.Suite
NewStore func(db dbm.DB, storeKeys []string, logger log.Logger) (*CommitStore, error)
}
func (s *CommitStoreTestSuite) TestSnapshotter() {
storeKeys := []string{storeKey1, storeKey2}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)
latestVersion := uint64(10)
kvCount := 10
for i := uint64(1); i <= latestVersion; i++ {
kvPairs := make(map[string]store.KVPairs)
for _, storeKey := range storeKeys {
kvPairs[storeKey] = store.KVPairs{}
for j := 0; j < kvCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := []byte(fmt.Sprintf("value-%d-%d", i, j))
kvPairs[storeKey] = append(kvPairs[storeKey], store.KVPair{Key: key, Value: value})
}
}
s.Require().NoError(commitStore.WriteBatch(store.NewChangeset(kvPairs)))
_, err = commitStore.Commit()
s.Require().NoError(err)
}
latestStoreInfos := commitStore.WorkingStoreInfos(latestVersion)
s.Require().Equal(len(storeKeys), len(latestStoreInfos))
// create a snapshot
dummyExtensionItem := snapshotstypes.SnapshotItem{
Item: &snapshotstypes.SnapshotItem_Extension{
Extension: &snapshotstypes.SnapshotExtensionMeta{
Name: "test",
Format: 1,
},
},
}
targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)
chunks := make(chan io.ReadCloser, kvCount*int(latestVersion))
go func() {
streamWriter := snapshots.NewStreamWriter(chunks)
s.Require().NotNil(streamWriter)
defer streamWriter.Close()
err := commitStore.Snapshot(latestVersion, streamWriter)
s.Require().NoError(err)
// write an extension metadata
err = streamWriter.WriteMsg(&dummyExtensionItem)
s.Require().NoError(err)
}()
streamReader, err := snapshots.NewStreamReader(chunks)
s.Require().NoError(err)
chStorage := make(chan *store.KVPair, 100)
leaves := make(map[string]string)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for kv := range chStorage {
leaves[fmt.Sprintf("%s_%s", kv.StoreKey, kv.Key)] = string(kv.Value)
}
wg.Done()
}()
nextItem, err := targetStore.Restore(latestVersion, snapshotstypes.CurrentFormat, streamReader, chStorage)
s.Require().NoError(err)
s.Require().Equal(*dummyExtensionItem.GetExtension(), *nextItem.GetExtension())
close(chStorage)
wg.Wait()
s.Require().Equal(len(storeKeys)*kvCount*int(latestVersion), len(leaves))
for _, storeKey := range storeKeys {
for i := 1; i <= int(latestVersion); i++ {
for j := 0; j < kvCount; j++ {
key := fmt.Sprintf("%s_key-%d-%d", storeKey, i, j)
s.Require().Equal(leaves[key], fmt.Sprintf("value-%d-%d", i, j))
}
}
}
// check the restored tree hash
targetStoreInfos := targetStore.WorkingStoreInfos(latestVersion)
s.Require().Equal(len(storeKeys), len(targetStoreInfos))
for _, storeInfo := range targetStoreInfos {
matched := false
for _, latestStoreInfo := range latestStoreInfos {
if storeInfo.Name == latestStoreInfo.Name {
s.Require().Equal(latestStoreInfo.GetHash(), storeInfo.GetHash())
matched = true
}
}
s.Require().True(matched)
}
}

View File

@ -1,11 +1,17 @@
package commitment
import (
"errors"
"io"
ics23 "github.com/cosmos/ics23/go"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)
// ErrorExportDone is returned by Exporter.Next() when all items have been exported.
var ErrorExportDone = errors.New("export is complete")
// Tree is the interface that wraps the basic Tree methods.
type Tree interface {
Set(key, value []byte) error
@ -16,6 +22,23 @@ type Tree interface {
Commit() ([]byte, error)
GetProof(version uint64, key []byte) (*ics23.CommitmentProof, error)
Prune(version uint64) error
Export(version uint64) (Exporter, error)
Import(version uint64) (Importer, error)
io.Closer
}
// Exporter is the interface that wraps the basic Export methods.
type Exporter interface {
Next() (*snapshotstypes.SnapshotIAVLItem, error)
io.Closer
}
// Importer is the interface that wraps the basic Import methods.
type Importer interface {
Add(*snapshotstypes.SnapshotIAVLItem) error
Commit() error
io.Closer
}

View File

@ -8,6 +8,7 @@ import (
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/kv/branch"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/sqlite"
)
@ -25,7 +26,8 @@ func TestStorageTestSuite(t *testing.T) {
}
func (s *StoreTestSuite) SetupTest() {
storage, err := sqlite.New(s.T().TempDir())
sqliteDB, err := sqlite.New(s.T().TempDir())
ss := storage.NewStorageStore(sqliteDB)
s.Require().NoError(err)
cs := store.NewChangeset(map[string]store.KVPairs{storeKey: {}})
@ -36,12 +38,12 @@ func (s *StoreTestSuite) SetupTest() {
cs.AddKVPair(storeKey, store.KVPair{Key: []byte(key), Value: []byte(val)})
}
s.Require().NoError(storage.ApplyChangeset(1, cs))
s.Require().NoError(ss.ApplyChangeset(1, cs))
kvStore, err := branch.New(storeKey, storage)
kvStore, err := branch.New(storeKey, ss)
s.Require().NoError(err)
s.storage = storage
s.storage = ss
s.kvStore = kvStore
}

View File

@ -11,6 +11,7 @@ import (
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/commitment/iavl"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/sqlite"
)
@ -34,8 +35,9 @@ func (s *PruningTestSuite) SetupTest() {
logger = log.NewTestLogger(s.T())
}
ss, err := sqlite.New(s.T().TempDir())
sqliteDB, err := sqlite.New(s.T().TempDir())
s.Require().NoError(err)
ss := storage.NewStorageStore(sqliteDB)
tree := iavl.NewIavlTree(dbm.NewMemDB(), log.NewNopLogger(), iavl.DefaultConfig())
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{"default": tree}, logger)
@ -58,7 +60,7 @@ func (s *PruningTestSuite) TestPruning() {
latestVersion := uint64(100)
// write 10 batches
// write batches
for i := uint64(0); i < latestVersion; i++ {
version := i + 1

View File

@ -12,6 +12,7 @@ import (
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/commitment/iavl"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/pruning"
"cosmossdk.io/store/v2/storage/sqlite"
)
@ -29,11 +30,12 @@ func TestStorageTestSuite(t *testing.T) {
func (s *RootStoreTestSuite) SetupTest() {
noopLog := log.NewNopLogger()
ss, err := sqlite.New(s.T().TempDir())
sqliteDB, err := sqlite.New(s.T().TempDir())
s.Require().NoError(err)
ss := storage.NewStorageStore(sqliteDB)
tree := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig())
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{"default": tree}, noopLog)
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{defaultStoreKey: tree}, noopLog)
s.Require().NoError(err)
rs, err := New(noopLog, ss, sc, pruning.DefaultOptions(), pruning.DefaultOptions(), nil)

View File

@ -6,7 +6,7 @@ import (
"cosmossdk.io/errors"
"cosmossdk.io/store/v2"
snapshottypes "cosmossdk.io/store/v2/snapshots/types"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)
// ChunkWriter reads an input stream, splits it into fixed-size chunks, and writes them to a
@ -169,15 +169,15 @@ func DrainChunks(chunks <-chan io.ReadCloser) {
// ValidRestoreHeight will check height is valid for snapshot restore or not
func ValidRestoreHeight(format uint32, height uint64) error {
if format != snapshottypes.CurrentFormat {
return errors.Wrapf(snapshottypes.ErrUnknownFormat, "format %v", format)
if format != snapshotstypes.CurrentFormat {
return errors.Wrapf(snapshotstypes.ErrUnknownFormat, "format %v", format)
}
if height == 0 {
return errors.Wrap(store.ErrLogic, "cannot restore snapshot at height 0")
}
if height > uint64(math.MaxInt64) {
return errors.Wrapf(snapshottypes.ErrInvalidMetadata,
return errors.Wrapf(snapshotstypes.ErrInvalidMetadata,
"snapshot height %v cannot exceed %v", height, int64(math.MaxInt64))
}

View File

@ -17,8 +17,9 @@ import (
errorsmod "cosmossdk.io/errors"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots"
snapshottypes "cosmossdk.io/store/v2/snapshots/types"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)
func checksums(slice [][]byte) [][]byte {
@ -62,7 +63,7 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte {
}
// snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks.
func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]byte {
func snapshotItems(items [][]byte, ext snapshots.ExtensionSnapshotter) [][]byte {
// copy the same parameters from the code
snapshotChunkSize := uint64(10e6)
snapshotBufferSize := int(snapshotChunkSize)
@ -74,19 +75,19 @@ func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]b
zWriter, _ := zlib.NewWriterLevel(bufWriter, 7)
protoWriter := protoio.NewDelimitedWriter(zWriter)
for _, item := range items {
_ = snapshottypes.WriteExtensionPayload(protoWriter, item)
_ = snapshotstypes.WriteExtensionPayload(protoWriter, item)
}
// write extension metadata
_ = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Extension{
Extension: &snapshottypes.SnapshotExtensionMeta{
_ = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{
Item: &snapshotstypes.SnapshotItem_Extension{
Extension: &snapshotstypes.SnapshotExtensionMeta{
Name: ext.SnapshotName(),
Format: ext.SnapshotFormat(),
},
},
})
_ = ext.SnapshotExtension(0, func(payload []byte) error {
return snapshottypes.WriteExtensionPayload(protoWriter, payload)
return snapshotstypes.WriteExtensionPayload(protoWriter, payload)
})
_ = protoWriter.Close()
_ = bufWriter.Flush()
@ -105,23 +106,21 @@ func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]b
return chunks
}
type mockSnapshotter struct {
items [][]byte
prunedHeights map[int64]struct{}
snapshotInterval uint64
type mockCommitSnapshotter struct {
items [][]byte
}
func (m *mockSnapshotter) Restore(
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
func (m *mockCommitSnapshotter) Restore(
height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair,
) (snapshotstypes.SnapshotItem, error) {
if format == 0 {
return snapshottypes.SnapshotItem{}, snapshottypes.ErrUnknownFormat
return snapshotstypes.SnapshotItem{}, snapshotstypes.ErrUnknownFormat
}
if m.items != nil {
return snapshottypes.SnapshotItem{}, errors.New("already has contents")
return snapshotstypes.SnapshotItem{}, errors.New("already has contents")
}
var item snapshottypes.SnapshotItem
var item snapshotstypes.SnapshotItem
m.items = [][]byte{}
for {
item.Reset()
@ -129,7 +128,7 @@ func (m *mockSnapshotter) Restore(
if err == io.EOF {
break
} else if err != nil {
return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "invalid protobuf message")
return snapshotstypes.SnapshotItem{}, errorsmod.Wrap(err, "invalid protobuf message")
}
payload := item.GetExtensionPayload()
if payload == nil {
@ -141,65 +140,49 @@ func (m *mockSnapshotter) Restore(
return item, nil
}
func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
func (m *mockCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
for _, item := range m.items {
if err := snapshottypes.WriteExtensionPayload(protoWriter, item); err != nil {
if err := snapshotstypes.WriteExtensionPayload(protoWriter, item); err != nil {
return err
}
}
return nil
}
func (m *mockSnapshotter) SnapshotFormat() uint32 {
return snapshottypes.CurrentFormat
func (m *mockCommitSnapshotter) SnapshotFormat() uint32 {
return snapshotstypes.CurrentFormat
}
func (m *mockSnapshotter) SupportedFormats() []uint32 {
return []uint32{snapshottypes.CurrentFormat}
func (m *mockCommitSnapshotter) SupportedFormats() []uint32 {
return []uint32{snapshotstypes.CurrentFormat}
}
func (m *mockSnapshotter) PruneSnapshotHeight(height int64) {
m.prunedHeights[height] = struct{}{}
type mockStorageSnapshotter struct{}
func (m *mockStorageSnapshotter) Restore(version uint64, chStorage <-chan *store.KVPair) error {
return nil
}
func (m *mockSnapshotter) GetSnapshotInterval() uint64 {
return m.snapshotInterval
}
type mockErrorCommitSnapshotter struct{}
func (m *mockSnapshotter) SetSnapshotInterval(snapshotInterval uint64) {
m.snapshotInterval = snapshotInterval
}
var _ snapshots.CommitSnapshotter = (*mockErrorCommitSnapshotter)(nil)
type mockErrorSnapshotter struct{}
var _ snapshottypes.Snapshotter = (*mockErrorSnapshotter)(nil)
func (m *mockErrorSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
func (m *mockErrorCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
return errors.New("mock snapshot error")
}
func (m *mockErrorSnapshotter) Restore(
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
return snapshottypes.SnapshotItem{}, errors.New("mock restore error")
func (m *mockErrorCommitSnapshotter) Restore(
height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair,
) (snapshotstypes.SnapshotItem, error) {
return snapshotstypes.SnapshotItem{}, errors.New("mock restore error")
}
func (m *mockErrorSnapshotter) SnapshotFormat() uint32 {
return snapshottypes.CurrentFormat
func (m *mockErrorCommitSnapshotter) SnapshotFormat() uint32 {
return snapshotstypes.CurrentFormat
}
func (m *mockErrorSnapshotter) SupportedFormats() []uint32 {
return []uint32{snapshottypes.CurrentFormat}
}
func (m *mockErrorSnapshotter) PruneSnapshotHeight(height int64) {
}
func (m *mockErrorSnapshotter) GetSnapshotInterval() uint64 {
return 0
}
func (m *mockErrorSnapshotter) SetSnapshotInterval(snapshotInterval uint64) {
func (m *mockErrorCommitSnapshotter) SupportedFormats() []uint32 {
return []uint32{snapshotstypes.CurrentFormat}
}
// setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1.
@ -208,10 +191,8 @@ func setupBusyManager(t *testing.T) *snapshots.Manager {
t.Helper()
store, err := snapshots.NewStore(db.NewMemDB(), t.TempDir())
require.NoError(t, err)
hung := newHungSnapshotter()
hung.SetSnapshotInterval(opts.Interval)
mgr := snapshots.NewManager(store, opts, hung, nil, log.NewNopLogger())
require.Equal(t, opts.Interval, hung.snapshotInterval)
hung := newHungCommitSnapshotter()
mgr := snapshots.NewManager(store, opts, hung, &mockStorageSnapshotter{}, nil, log.NewNopLogger())
// Channel to ensure the test doesn't finish until the goroutine is done.
// Without this, there are intermittent test failures about
@ -222,8 +203,6 @@ func setupBusyManager(t *testing.T) *snapshots.Manager {
defer close(done)
_, err := mgr.Create(1)
require.NoError(t, err)
_, didPruneHeight := hung.prunedHeights[1]
require.True(t, didPruneHeight)
}()
time.Sleep(10 * time.Millisecond)
@ -236,40 +215,29 @@ func setupBusyManager(t *testing.T) *snapshots.Manager {
return mgr
}
// hungSnapshotter can be used to test operations in progress. Call close to end the snapshot.
type hungSnapshotter struct {
ch chan struct{}
prunedHeights map[int64]struct{}
snapshotInterval uint64
// hungCommitSnapshotter can be used to test operations in progress. Call close to end the snapshot.
type hungCommitSnapshotter struct {
ch chan struct{}
}
func newHungSnapshotter() *hungSnapshotter {
return &hungSnapshotter{
ch: make(chan struct{}),
prunedHeights: make(map[int64]struct{}),
func newHungCommitSnapshotter() *hungCommitSnapshotter {
return &hungCommitSnapshotter{
ch: make(chan struct{}),
}
}
func (m *hungSnapshotter) Close() {
func (m *hungCommitSnapshotter) Close() {
close(m.ch)
}
func (m *hungSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
func (m *hungCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
<-m.ch
return nil
}
func (m *hungSnapshotter) PruneSnapshotHeight(height int64) {
m.prunedHeights[height] = struct{}{}
}
func (m *hungSnapshotter) SetSnapshotInterval(snapshotInterval uint64) {
m.snapshotInterval = snapshotInterval
}
func (m *hungSnapshotter) Restore(
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
func (m *hungCommitSnapshotter) Restore(
height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair,
) (snapshotstypes.SnapshotItem, error) {
panic("not implemented")
}
@ -299,16 +267,16 @@ func (s *extSnapshotter) SupportedFormats() []uint32 {
return []uint32{1}
}
func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshottypes.ExtensionPayloadWriter) error {
func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshots.ExtensionPayloadWriter) error {
for _, i := range s.state {
if err := payloadWriter(snapshottypes.Uint64ToBigEndian(i)); err != nil {
if err := payloadWriter(snapshotstypes.Uint64ToBigEndian(i)); err != nil {
return err
}
}
return nil
}
func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshottypes.ExtensionPayloadReader) error {
func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshots.ExtensionPayloadReader) error {
for {
payload, err := payloadReader()
if err == io.EOF {
@ -316,7 +284,7 @@ func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadR
} else if err != nil {
return err
}
s.state = append(s.state, snapshottypes.BigEndianToUint64(payload))
s.state = append(s.state, snapshotstypes.BigEndianToUint64(payload))
}
// finalize restoration
return nil

View File

@ -31,13 +31,16 @@ import (
// 2. io.ReadCloser streams automatically propagate IO errors, and can pass arbitrary
// errors via io.Pipe.CloseWithError().
type Manager struct {
extensions map[string]types.ExtensionSnapshotter
extensions map[string]ExtensionSnapshotter
// store is the snapshot store where all completed snapshots are persisted.
store *Store
opts types.SnapshotOptions
// multistore is the store from which snapshots are taken.
multistore types.Snapshotter
logger log.Logger
opts SnapshotOptions
// commitSnapshotter is the snapshotter for the commitment state.
commitSnapshotter CommitSnapshotter
// storageSnapshotter is the snapshotter for the storage state.
storageSnapshotter StorageSnapshotter
logger log.Logger
mtx sync.Mutex
operation operation
@ -62,8 +65,9 @@ const (
opPrune operation = "prune"
opRestore operation = "restore"
chunkBufferSize = 4
chunkIDBufferSize = 1024
chunkBufferSize = 4
chunkIDBufferSize = 1024
defaultStorageChannelBufferSize = 1024
snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
@ -71,23 +75,24 @@ const (
var ErrOptsZeroSnapshotInterval = errors.New("snaphot-interval must not be 0")
// NewManager creates a new manager.
func NewManager(store *Store, opts types.SnapshotOptions, multistore types.Snapshotter, extensions map[string]types.ExtensionSnapshotter, logger log.Logger) *Manager {
func NewManager(store *Store, opts SnapshotOptions, commitSnapshotter CommitSnapshotter, storageSnapshotter StorageSnapshotter, extensions map[string]ExtensionSnapshotter, logger log.Logger) *Manager {
if extensions == nil {
extensions = map[string]types.ExtensionSnapshotter{}
extensions = map[string]ExtensionSnapshotter{}
}
return &Manager{
store: store,
opts: opts,
multistore: multistore,
extensions: extensions,
logger: logger,
store: store,
opts: opts,
commitSnapshotter: commitSnapshotter,
storageSnapshotter: storageSnapshotter,
extensions: extensions,
logger: logger.With("module", "snapshot_manager"),
}
}
// RegisterExtensions register extension snapshotters to manager
func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error {
func (m *Manager) RegisterExtensions(extensions ...ExtensionSnapshotter) error {
if m.extensions == nil {
m.extensions = make(map[string]types.ExtensionSnapshotter, len(extensions))
m.extensions = make(map[string]ExtensionSnapshotter, len(extensions))
}
for _, extension := range extensions {
name := extension.SnapshotName()
@ -161,11 +166,9 @@ func (m *Manager) GetSnapshotBlockRetentionHeights() int64 {
// Create creates a snapshot and returns its metadata.
func (m *Manager) Create(height uint64) (*types.Snapshot, error) {
if m == nil {
return nil, errorsmod.Wrap(store.ErrLogic, "no snapshot store configured")
return nil, errorsmod.Wrap(store.ErrLogic, "Snapshot Manager is nil")
}
defer m.multistore.PruneSnapshotHeight(int64(height))
err := m.begin(opSnapshot)
if err != nil {
return nil, err
@ -201,7 +204,7 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) {
}
}()
if err := m.multistore.Snapshot(height, streamWriter); err != nil {
if err := m.commitSnapshotter.Snapshot(height, streamWriter); err != nil {
streamWriter.CloseWithError(err)
return
}
@ -363,7 +366,20 @@ func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.
return payload.Payload, nil
}
nextItem, err = m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader)
// chStorage is the channel to pass the KV pairs to the storage snapshotter.
chStorage := make(chan *store.KVPair, defaultStorageChannelBufferSize)
defer close(chStorage)
storageErrs := make(chan error, 1)
go func() {
defer close(storageErrs)
err := m.storageSnapshotter.Restore(snapshot.Height, chStorage)
if err != nil {
storageErrs <- err
}
}()
nextItem, err = m.commitSnapshotter.Restore(snapshot.Height, snapshot.Format, streamReader, chStorage)
if err != nil {
return errorsmod.Wrap(err, "multistore restore")
}
@ -393,6 +409,12 @@ func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.
return errorsmod.Wrapf(err, "extension %s don't exhausted payload stream", metadata.Name)
}
}
// wait for storage snapshotter to complete
if err := <-storageErrs; err != nil {
return errorsmod.Wrap(err, "storage snapshotter")
}
return nil
}
@ -495,7 +517,7 @@ func (m *Manager) sortedExtensionNames() []string {
}
// IsFormatSupported returns if the snapshotter supports restoration from given format.
func IsFormatSupported(snapshotter types.ExtensionSnapshotter, format uint32) bool {
func IsFormatSupported(snapshotter ExtensionSnapshotter, format uint32) bool {
for _, i := range snapshotter.SupportedFormats() {
if i == format {
return true

View File

@ -13,14 +13,13 @@ import (
"cosmossdk.io/store/v2/snapshots/types"
)
var opts = types.NewSnapshotOptions(1500, 2)
var opts = snapshots.NewSnapshotOptions(1500, 2)
func TestManager_List(t *testing.T) {
store := setupStore(t)
snapshotter := &mockSnapshotter{}
snapshotter.SetSnapshotInterval(opts.Interval)
manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger())
require.Equal(t, opts.Interval, snapshotter.GetSnapshotInterval())
commitSnapshotter := &mockCommitSnapshotter{}
storageSnapshotter := &mockStorageSnapshotter{}
manager := snapshots.NewManager(store, opts, commitSnapshotter, storageSnapshotter, nil, log.NewNopLogger())
mgrList, err := manager.List()
require.NoError(t, err)
@ -41,7 +40,7 @@ func TestManager_List(t *testing.T) {
func TestManager_LoadChunk(t *testing.T) {
store := setupStore(t)
manager := snapshots.NewManager(store, opts, &mockSnapshotter{}, nil, log.NewNopLogger())
manager := snapshots.NewManager(store, opts, &mockCommitSnapshotter{}, &mockStorageSnapshotter{}, nil, log.NewNopLogger())
// Existing chunk should return body
chunk, err := manager.LoadChunk(2, 1, 1)
@ -67,14 +66,13 @@ func TestManager_Take(t *testing.T) {
{4, 5, 6},
{7, 8, 9},
}
snapshotter := &mockSnapshotter{
items: items,
prunedHeights: make(map[int64]struct{}),
commitSnapshotter := &mockCommitSnapshotter{
items: items,
}
extSnapshotter := newExtSnapshotter(10)
expectChunks := snapshotItems(items, extSnapshotter)
manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger())
manager := snapshots.NewManager(store, opts, commitSnapshotter, &mockStorageSnapshotter{}, nil, log.NewNopLogger())
err := manager.RegisterExtensions(extSnapshotter)
require.NoError(t, err)
@ -85,18 +83,14 @@ func TestManager_Take(t *testing.T) {
// creating a snapshot at a lower height than the latest should error
_, err = manager.Create(3)
require.Error(t, err)
_, didPruneHeight := snapshotter.prunedHeights[3]
require.True(t, didPruneHeight)
// creating a snapshot at a higher height should be fine, and should return it
snapshot, err := manager.Create(5)
require.NoError(t, err)
_, didPruneHeight = snapshotter.prunedHeights[5]
require.True(t, didPruneHeight)
assert.Equal(t, &types.Snapshot{
Height: 5,
Format: snapshotter.SnapshotFormat(),
Format: commitSnapshotter.SnapshotFormat(),
Chunks: 1,
Hash: []uint8{0xc5, 0xf7, 0xfe, 0xea, 0xd3, 0x4d, 0x3e, 0x87, 0xff, 0x41, 0xa2, 0x27, 0xfa, 0xcb, 0x38, 0x17, 0xa, 0x5, 0xeb, 0x27, 0x4e, 0x16, 0x5e, 0xf3, 0xb2, 0x8b, 0x47, 0xd1, 0xe6, 0x94, 0x7e, 0x8b},
Metadata: types.Metadata{
@ -117,9 +111,7 @@ func TestManager_Take(t *testing.T) {
func TestManager_Prune(t *testing.T) {
store := setupStore(t)
snapshotter := &mockSnapshotter{}
snapshotter.SetSnapshotInterval(opts.Interval)
manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger())
manager := snapshots.NewManager(store, opts, &mockCommitSnapshotter{}, &mockStorageSnapshotter{}, nil, log.NewNopLogger())
pruned, err := manager.Prune(2)
require.NoError(t, err)
@ -137,11 +129,9 @@ func TestManager_Prune(t *testing.T) {
func TestManager_Restore(t *testing.T) {
store := setupStore(t)
target := &mockSnapshotter{
prunedHeights: make(map[int64]struct{}),
}
target := &mockCommitSnapshotter{}
extSnapshotter := newExtSnapshotter(0)
manager := snapshots.NewManager(store, opts, target, nil, log.NewNopLogger())
manager := snapshots.NewManager(store, opts, target, &mockStorageSnapshotter{}, nil, log.NewNopLogger())
err := manager.RegisterExtensions(extSnapshotter)
require.NoError(t, err)
@ -191,8 +181,6 @@ func TestManager_Restore(t *testing.T) {
// While the restore is in progress, any other operations fail
_, err = manager.Create(4)
require.Error(t, err)
_, didPruneHeight := target.prunedHeights[4]
require.True(t, didPruneHeight)
_, err = manager.Prune(1)
require.Error(t, err)
@ -248,10 +236,10 @@ func TestManager_Restore(t *testing.T) {
}
func TestManager_TakeError(t *testing.T) {
snapshotter := &mockErrorSnapshotter{}
snapshotter := &mockErrorCommitSnapshotter{}
store, err := snapshots.NewStore(db.NewMemDB(), GetTempDir(t))
require.NoError(t, err)
manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger())
manager := snapshots.NewManager(store, opts, snapshotter, &mockStorageSnapshotter{}, nil, log.NewNopLogger())
_, err = manager.Create(1)
require.Error(t, err)

View File

@ -1,4 +1,4 @@
package types
package snapshots
// SnapshotOptions defines the snapshot strategy used when determining which
// heights are snapshotted for state sync.

View File

@ -1,29 +1,26 @@
package types
package snapshots
import (
protoio "github.com/cosmos/gogoproto/io"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots/types"
)
// Snapshotter is something that can create and restore snapshots, consisting of streamed binary
// chunks - all of which must be read from the channel and closed. If an unsupported format is
// given, it must return ErrUnknownFormat (possibly wrapped with fmt.Errorf).
type Snapshotter interface {
// Snapshot writes snapshot items into the protobuf writer.
Snapshot(height uint64, protoWriter protoio.Writer) error
// CommitSnapshotter defines an API for creating and restoring snapshots of the
// commitment state.
type CommitSnapshotter interface {
// Snapshot writes a snapshot of the commitment state at the given version.
Snapshot(version uint64, protoWriter protoio.Writer) error
// PruneSnapshotHeight prunes the given height according to the prune strategy.
// If PruneNothing, this is a no-op.
// If other strategy, this height is persisted until it is
// less than <current height> - KeepRecent and <current height> % Interval == 0
PruneSnapshotHeight(height int64)
// Restore restores the commitment state from the snapshot reader.
Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (types.SnapshotItem, error)
}
// SetSnapshotInterval sets the interval at which the snapshots are taken.
// It is used by the store that implements the Snapshotter interface
// to determine which heights to retain until after the snapshot is complete.
SetSnapshotInterval(snapshotInterval uint64)
// Restore restores a state snapshot, taking the reader of protobuf message stream as input.
Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error)
// StorageSnapshotter defines an API for restoring snapshots of the storage state.
type StorageSnapshotter interface {
// Restore restores the storage state from the given channel.
Restore(version uint64, chStorage <-chan *store.KVPair) error
}
// ExtensionPayloadReader read extension payloads,

25
store/storage/database.go Normal file
View File

@ -0,0 +1,25 @@
package storage
import (
"io"
"cosmossdk.io/store/v2"
)
// Database is an interface that wraps the storage database methods. A wrapper
// is useful for instances where you want to perform logic that is identical for all SS
// backends, such as restoring snapshots.
type Database interface {
NewBatch(version uint64) (store.Batch, error)
Has(storeKey string, version uint64, key []byte) (bool, error)
Get(storeKey string, version uint64, key []byte) ([]byte, error)
GetLatestVersion() (uint64, error)
SetLatestVersion(version uint64) error
Iterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error)
ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error)
Prune(version uint64) error
io.Closer
}

View File

@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/pebble"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/storage"
)
const (
@ -25,7 +26,7 @@ const (
tombstoneVal = "TOMBSTONE"
)
var _ store.VersionedDatabase = (*Database)(nil)
var _ storage.Database = (*Database)(nil)
type Database struct {
storage *pebble.DB
@ -92,6 +93,15 @@ func (db *Database) Close() error {
return err
}
func (db *Database) NewBatch(version uint64) (store.Batch, error) {
b, err := NewBatch(db.storage, version, db.sync)
if err != nil {
return nil, err
}
return b, nil
}
func (db *Database) SetLatestVersion(version uint64) error {
var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], version)
@ -175,29 +185,6 @@ func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]by
return nil, nil
}
func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error {
b, err := NewBatch(db.storage, version, db.sync)
if err != nil {
return err
}
for storeKey, pairs := range cs.Pairs {
for _, kvPair := range pairs {
if kvPair.Value == nil {
if err := b.Delete(storeKey, kvPair.Key); err != nil {
return err
}
} else {
if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil {
return err
}
}
}
}
return b.Write()
}
// Prune removes all versions of all keys that are <= the given version.
//
// Note, the implementation of this method is inefficient and can be potentially

View File

@ -19,7 +19,7 @@ func TestStorageTestSuite(t *testing.T) {
db.SetSync(false)
}
return db, err
return storage.NewStorageStore(db), err
},
EmptyBatchSize: 12,
}

View File

@ -12,6 +12,7 @@ import (
"golang.org/x/exp/slices"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/util"
)
@ -23,7 +24,7 @@ const (
)
var (
_ store.VersionedDatabase = (*Database)(nil)
_ storage.Database = (*Database)(nil)
defaultWriteOpts = grocksdb.NewDefaultWriteOptions()
defaultReadOpts = grocksdb.NewDefaultReadOptions()
@ -90,6 +91,10 @@ func (db *Database) Close() error {
return nil
}
func (db *Database) NewBatch(version uint64) (store.Batch, error) {
return NewBatch(db, version), nil
}
func (db *Database) getSlice(storeKey string, version uint64, key []byte) (*grocksdb.Slice, error) {
if version < db.tsLow {
return nil, store.ErrVersionPruned{EarliestVersion: db.tsLow}
@ -141,26 +146,6 @@ func (db *Database) Get(storeKey string, version uint64, key []byte) ([]byte, er
return copyAndFreeSlice(slice), nil
}
func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error {
b := NewBatch(db, version)
for storeKey, pairs := range cs.Pairs {
for _, kvPair := range pairs {
if kvPair.Value == nil {
if err := b.Delete(storeKey, kvPair.Key); err != nil {
return err
}
} else {
if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil {
return err
}
}
}
}
return b.Write()
}
// Prune attempts to prune all versions up to and including the provided version.
// This is done internally by updating the full_history_ts_low RocksDB value on
// the column families, s.t. all versions less than full_history_ts_low will be

View File

@ -21,7 +21,8 @@ const (
func TestStorageTestSuite(t *testing.T) {
s := &storage.StorageTestSuite{
NewDB: func(dir string) (store.VersionedDatabase, error) {
return New(dir)
db, err := New(dir)
return storage.NewStorageStore(db), err
},
EmptyBatchSize: 12,
}
@ -33,15 +34,15 @@ func TestDatabase_ReverseIterator(t *testing.T) {
require.NoError(t, err)
defer db.Close()
cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}})
batch := NewBatch(db, 1)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099
cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)})
require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val)))
}
require.NoError(t, db.ApplyChangeset(1, cs))
require.NoError(t, batch.Write())
// reverse iterator without an end key
iter, err := db.ReverseIterator(storeKey1, 1, []byte("key000"), nil)

View File

@ -11,6 +11,7 @@ import (
_ "github.com/mattn/go-sqlite3"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/storage"
)
const (
@ -40,7 +41,7 @@ const (
`
)
var _ store.VersionedDatabase = (*Database)(nil)
var _ storage.Database = (*Database)(nil)
type Database struct {
storage *sql.DB
@ -91,6 +92,10 @@ func (db *Database) Close() error {
return err
}
func (db *Database) NewBatch(version uint64) (store.Batch, error) {
return NewBatch(db.storage, version)
}
func (db *Database) GetLatestVersion() (uint64, error) {
stmt, err := db.storage.Prepare("SELECT value FROM state_storage WHERE store_key = ? AND key = ?")
if err != nil {
@ -168,29 +173,6 @@ func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]by
return nil, nil
}
func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error {
b, err := NewBatch(db.storage, version)
if err != nil {
return err
}
for storeKey, pairs := range cs.Pairs {
for _, kvPair := range pairs {
if kvPair.Value == nil {
if err := b.Delete(storeKey, kvPair.Key); err != nil {
return err
}
} else {
if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil {
return err
}
}
}
}
return b.Write()
}
// Prune removes all versions of all keys that are <= the given version. It keeps
// the latest (non-tombstoned) version of each key/value tuple to handle queries
// above the prune version. This is analogous to RocksDB full_history_ts_low.

View File

@ -19,7 +19,8 @@ const (
func TestStorageTestSuite(t *testing.T) {
s := &storage.StorageTestSuite{
NewDB: func(dir string) (store.VersionedDatabase, error) {
return New(dir)
db, err := New(dir)
return storage.NewStorageStore(db), err
},
EmptyBatchSize: 0,
}
@ -31,15 +32,16 @@ func TestDatabase_ReverseIterator(t *testing.T) {
require.NoError(t, err)
defer db.Close()
cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}})
batch, err := db.NewBatch(1)
require.NoError(t, err)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099
cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)})
require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val)))
}
require.NoError(t, db.ApplyChangeset(1, cs))
require.NoError(t, batch.Write())
// reverse iterator without an end key
iter, err := db.ReverseIterator(storeKey1, 1, []byte("key000"), nil)
@ -106,15 +108,16 @@ func TestParallelWrites(t *testing.T) {
go func(i int) {
<-triggerStartCh
defer wg.Done()
cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}})
batch, err := db.NewBatch(uint64(i + 1))
require.NoError(t, err)
for j := 0; j < kvCount; j++ {
key := fmt.Sprintf("key-%d-%03d", i, j)
val := fmt.Sprintf("val-%d-%03d", i, j)
cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)})
require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val)))
}
require.NoError(t, db.ApplyChangeset(uint64(i+1), cs))
require.NoError(t, batch.Write())
}(i)
}
@ -155,15 +158,16 @@ func TestParallelWriteAndPruning(t *testing.T) {
<-triggerStartCh
defer wg.Done()
for i := 0; i < latestVersion; i++ {
cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}})
batch, err := db.NewBatch(uint64(i + 1))
require.NoError(t, err)
for j := 0; j < kvCount; j++ {
key := fmt.Sprintf("key-%d-%03d", i, j)
val := fmt.Sprintf("val-%d-%03d", i, j)
cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)})
require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val)))
}
require.NoError(t, db.ApplyChangeset(uint64(i+1), cs))
require.NoError(t, batch.Write())
}
}()
// start a goroutine that prunes the database

View File

@ -1,7 +1,7 @@
//go:build rocksdb
// +build rocksdb
package storage
package storage_test
import (
"bytes"
@ -13,21 +13,29 @@ import (
"github.com/stretchr/testify/require"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/pebbledb"
"cosmossdk.io/store/v2/storage/rocksdb"
"cosmossdk.io/store/v2/storage/sqlite"
)
const (
storeKey1 = "store1"
)
var (
backends = map[string]func(dataDir string) (store.VersionedDatabase, error){
"rocksdb_versiondb_opts": func(dataDir string) (store.VersionedDatabase, error) {
return rocksdb.New(dataDir)
db, err := rocksdb.New(dataDir)
return storage.NewStorageStore(db), err
},
"pebbledb_default_opts": func(dataDir string) (store.VersionedDatabase, error) {
return pebbledb.New(dataDir)
db, err := pebbledb.New(dataDir)
return storage.NewStorageStore(db), err
},
"btree_sqlite": func(dataDir string) (store.VersionedDatabase, error) {
return sqlite.New(dataDir)
db, err := sqlite.New(dataDir)
return storage.NewStorageStore(db), err
},
}
rng = rand.New(rand.NewSource(567320))

130
store/storage/store.go Normal file
View File

@ -0,0 +1,130 @@
package storage
import (
"fmt"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots"
)
const (
// TODO: it is a random number, need to be tuned
defaultBatchBufferSize = 100000
)
var (
_ store.VersionedDatabase = (*StorageStore)(nil)
_ snapshots.StorageSnapshotter = (*StorageStore)(nil)
)
// StorageStore is a wrapper around the store.VersionedDatabase interface.
type StorageStore struct {
db Database
}
// NewStorageStore returns a reference to a new StorageStore.
func NewStorageStore(db Database) *StorageStore {
return &StorageStore{
db: db,
}
}
// Has returns true if the key exists in the store.
func (ss *StorageStore) Has(storeKey string, version uint64, key []byte) (bool, error) {
return ss.db.Has(storeKey, version, key)
}
// Get returns the value associated with the given key.
func (ss *StorageStore) Get(storeKey string, version uint64, key []byte) ([]byte, error) {
return ss.db.Get(storeKey, version, key)
}
// ApplyChangeset applies the given changeset to the storage.
func (ss *StorageStore) ApplyChangeset(version uint64, cs *store.Changeset) error {
b, err := ss.db.NewBatch(version)
if err != nil {
return err
}
for storeKey, pairs := range cs.Pairs {
for _, kvPair := range pairs {
if kvPair.Value == nil {
if err := b.Delete(storeKey, kvPair.Key); err != nil {
return err
}
} else {
if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil {
return err
}
}
}
}
return b.Write()
}
// GetLatestVersion returns the latest version of the store.
func (ss *StorageStore) GetLatestVersion() (uint64, error) {
return ss.db.GetLatestVersion()
}
// SetLatestVersion sets the latest version of the store.
func (ss *StorageStore) SetLatestVersion(version uint64) error {
return ss.db.SetLatestVersion(version)
}
// Iterator returns an iterator over the specified domain and prefix.
func (ss *StorageStore) Iterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) {
return ss.db.Iterator(storeKey, version, start, end)
}
// ReverseIterator returns an iterator over the specified domain and prefix in reverse.
func (ss *StorageStore) ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) {
return ss.db.ReverseIterator(storeKey, version, start, end)
}
// Prune prunes the store up to the given version.
func (ss *StorageStore) Prune(version uint64) error {
return ss.db.Prune(version)
}
// Restore restores the store from the given channel.
func (ss *StorageStore) Restore(version uint64, chStorage <-chan *store.KVPair) error {
latestVersion, err := ss.db.GetLatestVersion()
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
if version <= latestVersion {
return fmt.Errorf("the snapshot version %d is not greater than latest version %d", version, latestVersion)
}
b, err := ss.db.NewBatch(version)
if err != nil {
return err
}
for kvPair := range chStorage {
if err := b.Set(kvPair.StoreKey, kvPair.Key, kvPair.Value); err != nil {
return err
}
if b.Size() > defaultBatchBufferSize {
if err := b.Write(); err != nil {
return err
}
}
}
if b.Size() > 0 {
if err := b.Write(); err != nil {
return err
}
}
return ss.db.SetLatestVersion(version)
}
// Close closes the store.
func (ss *StorageStore) Close() error {
return ss.db.Close()
}