feat(store/v2): Implement State Migration (#19327)

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
This commit is contained in:
cool-developer 2024-02-13 12:26:16 -05:00 committed by GitHub
parent 3461c64afb
commit 92eb6de6e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 276 additions and 0 deletions

View File

@ -0,0 +1,61 @@
package migration
import (
"golang.org/x/sync/errgroup"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots"
)
const (
// defaultChannelBufferSize is the default buffer size for the migration stream.
defaultChannelBufferSize = 1024
// defaultStorageBufferSize is the default buffer size for the storage snapshotter.
defaultStorageBufferSize = 1024
)
// Manager manages the migration of the whole state from store/v1 to store/v2.
type Manager struct {
logger log.Logger
snapshotsManager *snapshots.Manager
storageSnapshotter snapshots.StorageSnapshotter
commitSnapshotter snapshots.CommitSnapshotter
}
// NewManager returns a new Manager.
func NewManager(sm *snapshots.Manager, ss snapshots.StorageSnapshotter, cs snapshots.CommitSnapshotter, logger log.Logger) *Manager {
return &Manager{
logger: logger,
snapshotsManager: sm,
storageSnapshotter: ss,
commitSnapshotter: cs,
}
}
// Migrate migrates the whole state at the given height to the new store/v2.
func (m *Manager) Migrate(height uint64) error {
// create the migration stream and snapshot,
// which acts as protoio.Reader and snapshots.WriteCloser.
ms := NewMigrationStream(defaultChannelBufferSize)
if err := m.snapshotsManager.CreateMigration(height, ms); err != nil {
return err
}
// restore the snapshot
chStorage := make(chan *store.KVPair, defaultStorageBufferSize)
eg := new(errgroup.Group)
eg.Go(func() error {
return m.storageSnapshotter.Restore(height, chStorage)
})
eg.Go(func() error {
defer close(chStorage)
_, err := m.commitSnapshotter.Restore(height, 0, ms, chStorage)
return err
})
return eg.Wait()
}

View File

@ -0,0 +1,105 @@
package migration
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/commitment/iavl"
dbm "cosmossdk.io/store/v2/db"
"cosmossdk.io/store/v2/snapshots"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/pebbledb"
)
var storeKeys = []string{"store1", "store2"}
func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
t.Helper()
db := dbm.NewMemDB()
multiTrees := make(map[string]commitment.Tree)
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}
commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger())
require.NoError(t, err)
snapshotsStore, err := snapshots.NewStore(db, t.TempDir())
require.NoError(t, err)
snapshotsManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), commitStore, nil, nil, log.NewNopLogger())
storageDB, err := pebbledb.New(t.TempDir())
require.NoError(t, err)
newStorageStore := storage.NewStorageStore(storageDB) // for store/v2
db1 := dbm.NewMemDB()
multiTrees1 := make(map[string]commitment.Tree)
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db1, []byte(storeKey))
multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2
require.NoError(t, err)
return NewManager(snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
}
func TestMigrateState(t *testing.T) {
m, orgCommitStore := setupMigrationManager(t)
// apply changeset
toVersion := uint64(100)
keyCount := 10
for version := uint64(1); version <= toVersion; version++ {
cs := store.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)))
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}
err := m.Migrate(toVersion - 1)
require.NoError(t, err)
// check the migrated state
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.commitSnapshotter.(*commitment.CommitStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
// check the latest state
val, err := m.commitSnapshotter.(*commitment.CommitStore).Get("store1", toVersion-1, []byte("key-100-1"))
require.NoError(t, err)
require.Nil(t, val)
val, err = m.commitSnapshotter.(*commitment.CommitStore).Get("store2", toVersion-1, []byte("key-100-0"))
require.NoError(t, err)
require.Nil(t, val)
// check the storage
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.storageSnapshotter.(*storage.StorageStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
}

79
store/migration/stream.go Normal file
View File

@ -0,0 +1,79 @@
package migration
import (
"fmt"
"io"
"sync/atomic"
protoio "github.com/cosmos/gogoproto/io"
"github.com/cosmos/gogoproto/proto"
"cosmossdk.io/store/v2/snapshots"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)
var (
_ snapshots.WriteCloser = (*MigrationStream)(nil)
_ protoio.ReadCloser = (*MigrationStream)(nil)
)
// MigrationStream is a stream for migrating the whole IAVL state as a snapshot.
// It's used to sync the whole state from the store/v1 to store/v2.
// The main idea is to use the same snapshotter interface without writing to disk.
type MigrationStream struct {
chBuffer chan proto.Message
err atomic.Value // atomic error
}
// NewMigrationStream returns a new MigrationStream.
func NewMigrationStream(chBufferSize int) *MigrationStream {
return &MigrationStream{
chBuffer: make(chan proto.Message, chBufferSize),
}
}
// WriteMsg implements protoio.Write interface.
func (ms *MigrationStream) WriteMsg(msg proto.Message) error {
ms.chBuffer <- msg
return nil
}
// CloseWithError implements snapshots.WriteCloser interface.
func (ms *MigrationStream) CloseWithError(err error) {
ms.err.Store(err)
close(ms.chBuffer)
}
// ReadMsg implements the protoio.Read interface.
//
// NOTE: It we follow the pattern of snapshot.Restore, however, the migration is done in memory.
// It doesn't require any deserialization -- just passing the pointer to the <msg>.
func (ms *MigrationStream) ReadMsg(msg proto.Message) error {
// msg should be a pointer to the same type as the one written to the stream
snapshotsItem, ok := msg.(*snapshotstypes.SnapshotItem)
if !ok {
return fmt.Errorf("unexpected message type: %T", msg)
}
// It doesn't require any deserialization, just a type assertion.
item := <-ms.chBuffer
if item == nil {
return io.EOF
}
*snapshotsItem = *(item.(*snapshotstypes.SnapshotItem))
// check if there is an error from the writer.
err := ms.err.Load()
if err != nil {
return err.(error)
}
return nil
}
// Close implements io.Closer interface.
func (ms *MigrationStream) Close() error {
close(ms.chBuffer)
return nil
}

View File

@ -233,6 +233,30 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) {
}
}
// CreateMigration creates a migration snapshot and writes it to the given writer.
// It is used to migrate the state from the original store to the store/v2.
func (m *Manager) CreateMigration(height uint64, protoWriter WriteCloser) error {
if m == nil {
return errorsmod.Wrap(store.ErrLogic, "Snapshot Manager is nil")
}
err := m.begin(opSnapshot)
if err != nil {
return err
}
defer m.end()
go func() {
if err := m.commitSnapshotter.Snapshot(height, protoWriter); err != nil {
protoWriter.CloseWithError(err)
return
}
_ = protoWriter.Close() // always return nil
}()
return nil
}
// List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations.
func (m *Manager) List() ([]*types.Snapshot, error) {
return m.store.List()

View File

@ -19,6 +19,13 @@ const (
snapshotCompressionLevel = 7
)
type WriteCloser interface {
protoio.WriteCloser
// CloseWithError closes the writer and sends an error to the reader.
CloseWithError(err error)
}
// StreamWriter set up a stream pipeline to serialize snapshot nodes:
// Exported Items -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser
type StreamWriter struct {