feat: allow no migration of commitment (#20181)
This commit is contained in:
parent
43764cff30
commit
14f3ca0ae9
@ -380,7 +380,7 @@ func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio
|
||||
var (
|
||||
importer Importer
|
||||
snapshotItem snapshotstypes.SnapshotItem
|
||||
storeKey string
|
||||
storeKey []byte
|
||||
)
|
||||
|
||||
loop:
|
||||
@ -402,10 +402,10 @@ loop:
|
||||
importer.Close()
|
||||
}
|
||||
|
||||
storeKey = item.Store.Name
|
||||
storeKey = []byte(item.Store.Name)
|
||||
tree := c.multiTrees[item.Store.Name]
|
||||
if tree == nil {
|
||||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey)
|
||||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", item.Store.Name)
|
||||
}
|
||||
importer, err = tree.Import(version)
|
||||
if err != nil {
|
||||
@ -432,15 +432,13 @@ loop:
|
||||
node.Value = []byte{}
|
||||
}
|
||||
|
||||
key := []byte(storeKey)
|
||||
// If the node is a leaf node, it will be written to the storage.
|
||||
chStorage <- &corestore.StateChanges{
|
||||
Actor: key,
|
||||
Actor: storeKey,
|
||||
StateChanges: []corestore.KVPair{
|
||||
{
|
||||
Key: node.Key,
|
||||
Value: node.Value,
|
||||
Remove: false,
|
||||
Key: node.Key,
|
||||
Value: node.Value,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@ -2,7 +2,9 @@ package migration
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -14,6 +16,7 @@ import (
|
||||
"cosmossdk.io/store/v2/commitment"
|
||||
"cosmossdk.io/store/v2/internal/encoding"
|
||||
"cosmossdk.io/store/v2/snapshots"
|
||||
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
|
||||
"cosmossdk.io/store/v2/storage"
|
||||
)
|
||||
|
||||
@ -49,6 +52,8 @@ type Manager struct {
|
||||
}
|
||||
|
||||
// NewManager returns a new Manager.
|
||||
//
|
||||
// NOTE: `sc` can be `nil` if don't want to migrate the commitment.
|
||||
func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager {
|
||||
return &Manager{
|
||||
logger: logger,
|
||||
@ -106,8 +111,51 @@ func (m *Manager) Migrate(height uint64) error {
|
||||
})
|
||||
eg.Go(func() error {
|
||||
defer close(chStorage)
|
||||
_, err := m.stateCommitment.Restore(height, 0, ms, chStorage)
|
||||
return err
|
||||
if m.stateCommitment != nil {
|
||||
if _, err := m.stateCommitment.Restore(height, 0, ms, chStorage); err != nil {
|
||||
return err
|
||||
}
|
||||
} else { // there is no commitment migration, just consume the stream to restore the state storage
|
||||
var storeKey []byte
|
||||
loop:
|
||||
for {
|
||||
snapshotItem := snapshotstypes.SnapshotItem{}
|
||||
err := ms.ReadMsg(&snapshotItem)
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read snapshot item: %w", err)
|
||||
}
|
||||
switch item := snapshotItem.Item.(type) {
|
||||
case *snapshotstypes.SnapshotItem_Store:
|
||||
storeKey = []byte(item.Store.Name)
|
||||
case *snapshotstypes.SnapshotItem_IAVL:
|
||||
if item.IAVL.Height == 0 { // only restore the leaf nodes
|
||||
key := item.IAVL.Key
|
||||
if key == nil {
|
||||
key = []byte{}
|
||||
}
|
||||
value := item.IAVL.Value
|
||||
if value == nil {
|
||||
value = []byte{}
|
||||
}
|
||||
chStorage <- &corestore.StateChanges{
|
||||
Actor: storeKey,
|
||||
StateChanges: []corestore.KVPair{
|
||||
{
|
||||
Key: key,
|
||||
Value: value,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
default:
|
||||
break loop
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
@ -186,12 +234,13 @@ func (m *Manager) Sync() error {
|
||||
if err := encoding.UnmarshalChangeset(cs, csBytes); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal changeset: %w", err)
|
||||
}
|
||||
|
||||
if err := m.stateCommitment.WriteBatch(cs); err != nil {
|
||||
return fmt.Errorf("failed to write changeset to commitment: %w", err)
|
||||
}
|
||||
if _, err := m.stateCommitment.Commit(version); err != nil {
|
||||
return fmt.Errorf("failed to commit changeset to commitment: %w", err)
|
||||
if m.stateCommitment != nil {
|
||||
if err := m.stateCommitment.WriteBatch(cs); err != nil {
|
||||
return fmt.Errorf("failed to write changeset to commitment: %w", err)
|
||||
}
|
||||
if _, err := m.stateCommitment.Commit(version); err != nil {
|
||||
return fmt.Errorf("failed to commit changeset to commitment: %w", err)
|
||||
}
|
||||
}
|
||||
if err := m.stateStorage.ApplyChangeset(version, cs); err != nil {
|
||||
return fmt.Errorf("failed to write changeset to storage: %w", err)
|
||||
@ -212,7 +261,9 @@ func (m *Manager) Close() error {
|
||||
if err := m.db.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close db: %w", err)
|
||||
}
|
||||
m.snapshotsManager.EndMigration(m.stateCommitment)
|
||||
if m.stateCommitment != nil {
|
||||
m.snapshotsManager.EndMigration(m.stateCommitment)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -18,7 +18,7 @@ import (
|
||||
|
||||
var storeKeys = []string{"store1", "store2"}
|
||||
|
||||
func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
|
||||
func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitment.CommitStore) {
|
||||
t.Helper()
|
||||
|
||||
db := dbm.NewMemDB()
|
||||
@ -49,57 +49,66 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
|
||||
|
||||
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
|
||||
require.NoError(t, err)
|
||||
if noCommitStore {
|
||||
newCommitStore = nil
|
||||
}
|
||||
|
||||
return NewManager(db, snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
|
||||
}
|
||||
|
||||
func TestMigrateState(t *testing.T) {
|
||||
m, orgCommitStore := setupMigrationManager(t)
|
||||
for _, noCommitStore := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("Migrate noCommitStore=%v", noCommitStore), func(t *testing.T) {
|
||||
m, orgCommitStore := setupMigrationManager(t, noCommitStore)
|
||||
|
||||
// apply changeset
|
||||
toVersion := uint64(100)
|
||||
keyCount := 10
|
||||
for version := uint64(1); version <= toVersion; version++ {
|
||||
cs := corestore.NewChangeset()
|
||||
for _, storeKey := range storeKeys {
|
||||
for i := 0; i < keyCount; i++ {
|
||||
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
|
||||
}
|
||||
}
|
||||
require.NoError(t, orgCommitStore.WriteBatch(cs))
|
||||
_, err := orgCommitStore.Commit(version)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err := m.Migrate(toVersion - 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check the migrated state
|
||||
for version := uint64(1); version < toVersion; version++ {
|
||||
for _, storeKey := range storeKeys {
|
||||
for i := 0; i < keyCount; i++ {
|
||||
val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
|
||||
// apply changeset
|
||||
toVersion := uint64(100)
|
||||
keyCount := 10
|
||||
for version := uint64(1); version <= toVersion; version++ {
|
||||
cs := corestore.NewChangeset()
|
||||
for _, storeKey := range storeKeys {
|
||||
for i := 0; i < keyCount; i++ {
|
||||
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
|
||||
}
|
||||
}
|
||||
require.NoError(t, orgCommitStore.WriteBatch(cs))
|
||||
_, err := orgCommitStore.Commit(version)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
|
||||
}
|
||||
}
|
||||
}
|
||||
// check the latest state
|
||||
val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1"))
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, val)
|
||||
val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0"))
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, val)
|
||||
|
||||
// check the storage
|
||||
for version := uint64(1); version < toVersion; version++ {
|
||||
for _, storeKey := range storeKeys {
|
||||
for i := 0; i < keyCount; i++ {
|
||||
val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
|
||||
err := m.Migrate(toVersion - 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
if m.stateCommitment != nil {
|
||||
// check the migrated state
|
||||
for version := uint64(1); version < toVersion; version++ {
|
||||
for _, storeKey := range storeKeys {
|
||||
for i := 0; i < keyCount; i++ {
|
||||
val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
|
||||
}
|
||||
}
|
||||
}
|
||||
// check the latest state
|
||||
val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
|
||||
require.Nil(t, val)
|
||||
val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0"))
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, val)
|
||||
}
|
||||
}
|
||||
|
||||
// check the storage
|
||||
for version := uint64(1); version < toVersion; version++ {
|
||||
for _, storeKey := range storeKeys {
|
||||
for i := 0; i < keyCount; i++ {
|
||||
val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -395,7 +395,10 @@ func (s *Store) writeSC(cs *corestore.Changeset) error {
|
||||
if err := s.stateCommitment.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close the old SC store: %w", err)
|
||||
}
|
||||
s.stateCommitment = s.migrationManager.GetStateCommitment()
|
||||
newStateCommitment := s.migrationManager.GetStateCommitment()
|
||||
if newStateCommitment != nil {
|
||||
s.stateCommitment = newStateCommitment
|
||||
}
|
||||
if err := s.migrationManager.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close migration manager: %w", err)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user