feat(store/v2): add the catch up process in the migration (#19454)

This commit is contained in:
cool-developer 2024-03-20 18:23:41 -04:00 committed by GitHub
parent 4952cb2792
commit 27a231ae48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 668 additions and 55 deletions

View File

@ -13,7 +13,7 @@ type Batch interface {
Write() error
// Reset resets the batch.
Reset()
Reset() error
}
// RawBatch represents a group of writes. They may or may not be written atomically depending on the

View File

@ -0,0 +1,125 @@
package encoding
import (
"bytes"
"fmt"
corestore "cosmossdk.io/core/store"
)
// encodedSize returns the size of the encoded Changeset.
func encodedSize(cs *corestore.Changeset) int {
size := EncodeUvarintSize(uint64(len(cs.Changes)))
for _, changes := range cs.Changes {
size += EncodeBytesSize(changes.Actor)
size += EncodeUvarintSize(uint64(len(changes.StateChanges)))
for _, pair := range changes.StateChanges {
size += EncodeBytesSize(pair.Key)
size += EncodeUvarintSize(1) // pair.Remove
if !pair.Remove {
size += EncodeBytesSize(pair.Value)
}
}
}
return size
}
// MarshalChangeset returns the encoded byte representation of Changeset.
// NOTE: The Changeset is encoded as follows:
// - number of store keys (uvarint)
// - for each store key:
// -- store key (bytes)
// -- number of pairs (uvarint)
// -- for each pair:
// --- key (bytes)
// --- remove (1 byte)
// --- value (bytes)
func MarshalChangeset(cs *corestore.Changeset) ([]byte, error) {
var buf bytes.Buffer
buf.Grow(encodedSize(cs))
if err := EncodeUvarint(&buf, uint64(len(cs.Changes))); err != nil {
return nil, err
}
for _, changes := range cs.Changes {
if err := EncodeBytes(&buf, changes.Actor); err != nil {
return nil, err
}
if err := EncodeUvarint(&buf, uint64(len(changes.StateChanges))); err != nil {
return nil, err
}
for _, pair := range changes.StateChanges {
if err := EncodeBytes(&buf, pair.Key); err != nil {
return nil, err
}
if pair.Remove {
if err := EncodeUvarint(&buf, 1); err != nil {
return nil, err
}
} else {
if err := EncodeUvarint(&buf, 0); err != nil {
return nil, err
}
if err := EncodeBytes(&buf, pair.Value); err != nil {
return nil, err
}
}
}
}
return buf.Bytes(), nil
}
// UnmarshalChangeset decodes the Changeset from the given byte slice.
func UnmarshalChangeset(cs *corestore.Changeset, buf []byte) error {
storeCount, n, err := DecodeUvarint(buf)
if err != nil {
return err
}
buf = buf[n:]
changes := make([]corestore.StateChanges, storeCount)
for i := uint64(0); i < storeCount; i++ {
storeKey, n, err := DecodeBytes(buf)
if err != nil {
return err
}
buf = buf[n:]
pairCount, n, err := DecodeUvarint(buf)
if err != nil {
return err
}
buf = buf[n:]
pairs := make([]corestore.KVPair, pairCount)
for j := uint64(0); j < pairCount; j++ {
pairs[j].Key, n, err = DecodeBytes(buf)
if err != nil {
return err
}
buf = buf[n:]
remove, n, err := DecodeUvarint(buf)
if err != nil {
return err
}
buf = buf[n:]
if remove == 0 {
pairs[j].Remove = false
pairs[j].Value, n, err = DecodeBytes(buf)
if err != nil {
return err
}
buf = buf[n:]
} else if remove == 1 {
pairs[j].Remove = true
} else {
return fmt.Errorf("invalid remove flag: %d", remove)
}
}
changes[i] = corestore.StateChanges{Actor: storeKey, StateChanges: pairs}
}
cs.Changes = changes
return nil
}

View File

@ -0,0 +1,94 @@
package encoding
import (
"testing"
corestore "cosmossdk.io/core/store"
"github.com/stretchr/testify/require"
)
func TestChangesetMarshal(t *testing.T) {
testcases := []struct {
name string
changeset *corestore.Changeset
encodedSize int
encodedBytes []byte
}{
{
name: "empty",
changeset: corestore.NewChangeset(),
encodedSize: 1,
encodedBytes: []byte{0x0},
},
{
name: "one store",
changeset: &corestore.Changeset{Changes: []corestore.StateChanges{
{
Actor: []byte("storekey"),
StateChanges: corestore.KVPairs{
{Key: []byte("key"), Value: []byte("value"), Remove: false},
},
},
}},
encodedSize: 1 + 1 + 8 + 1 + 1 + 3 + 1 + 1 + 5,
encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x0, 0x5, 0x76, 0x61, 0x6c, 0x75, 0x65},
},
{
name: "one remove store",
changeset: &corestore.Changeset{Changes: []corestore.StateChanges{
{
Actor: []byte("storekey"),
StateChanges: corestore.KVPairs{
{Key: []byte("key"), Remove: true},
},
},
}},
encodedSize: 1 + 1 + 8 + 1 + 1 + 3 + 1,
encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x1},
},
{
name: "two stores",
changeset: &corestore.Changeset{Changes: []corestore.StateChanges{
{
Actor: []byte("storekey1"),
StateChanges: corestore.KVPairs{
{Key: []byte("key1"), Value: []byte("value1"), Remove: false},
},
},
{
Actor: []byte("storekey2"),
StateChanges: corestore.KVPairs{
{Key: []byte("key2"), Value: []byte("value2"), Remove: false},
{Key: []byte("key1"), Remove: true},
},
},
}},
encodedSize: 2 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 9 + 1 + 1 + 4 + 1 + 1 + 6 + 1 + 4 + 1,
// encodedBytes: it is not deterministic,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
// check the encoded size
require.Equal(t, encodedSize(tc.changeset), tc.encodedSize, "encoded size mismatch")
// check the encoded bytes
encodedBytes, err := MarshalChangeset(tc.changeset)
require.NoError(t, err, "marshal error")
if len(tc.encodedBytes) != 0 {
require.Equal(t, encodedBytes, tc.encodedBytes, "encoded bytes mismatch")
}
// check the unmarshaled changeset
cs := corestore.NewChangeset()
require.NoError(t, UnmarshalChangeset(cs, encodedBytes), "unmarshal error")
require.Equal(t, len(tc.changeset.Changes), len(cs.Changes), "unmarshaled changeset store size mismatch")
for i, changes := range tc.changeset.Changes {
require.Equal(t, changes.Actor, cs.Changes[i].Actor, "unmarshaled changeset store key mismatch")
require.Equal(t, len(changes.StateChanges), len(cs.Changes[i].StateChanges), "unmarshaled changeset StateChanges size mismatch")
for j, pair := range changes.StateChanges {
require.Equal(t, pair, cs.Changes[i].StateChanges[j], "unmarshaled changeset pair mismatch")
}
}
})
}
}

View File

@ -1,11 +1,20 @@
package migration
import (
"encoding/binary"
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/internal/encoding"
"cosmossdk.io/store/v2/snapshots"
"cosmossdk.io/store/v2/storage"
)
const (
@ -13,27 +22,71 @@ const (
defaultChannelBufferSize = 1024
// defaultStorageBufferSize is the default buffer size for the storage snapshotter.
defaultStorageBufferSize = 1024
migrateChangesetKeyFmt = "m/cs_%x" // m/cs_<version>
)
// VersionedChangeset is a pair of version and Changeset.
type VersionedChangeset struct {
Version uint64
Changeset *corestore.Changeset
}
// Manager manages the migration of the whole state from store/v1 to store/v2.
type Manager struct {
logger log.Logger
snapshotsManager *snapshots.Manager
storageSnapshotter snapshots.StorageSnapshotter
commitSnapshotter snapshots.CommitSnapshotter
stateStorage *storage.StorageStore
stateCommitment *commitment.CommitStore
db store.RawDB
mtx sync.Mutex // mutex for migratedVersion
migratedVersion uint64
chChangeset <-chan *VersionedChangeset
chDone <-chan struct{}
}
// NewManager returns a new Manager.
func NewManager(sm *snapshots.Manager, ss snapshots.StorageSnapshotter, cs snapshots.CommitSnapshotter, logger log.Logger) *Manager {
func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager {
return &Manager{
logger: logger,
snapshotsManager: sm,
storageSnapshotter: ss,
commitSnapshotter: cs,
logger: logger,
snapshotsManager: sm,
stateStorage: ss,
stateCommitment: sc,
db: db,
}
}
// Start starts the whole migration process.
// It migrates the whole state at the given version to the new store/v2 (both SC and SS).
// It also catches up the Changesets which are committed while the migration is in progress.
// `chChangeset` is the channel to receive the committed Changesets from the RootStore.
// `chDone` is the channel to receive the done signal from the RootStore.
// NOTE: It should be called by the RootStore, running in the background.
func (m *Manager) Start(version uint64, chChangeset <-chan *VersionedChangeset, chDone <-chan struct{}) error {
m.chChangeset = chChangeset
m.chDone = chDone
go func() {
if err := m.writeChangeset(); err != nil {
m.logger.Error("failed to write changeset", "err", err)
}
}()
if err := m.Migrate(version); err != nil {
return fmt.Errorf("failed to migrate state: %w", err)
}
return m.Sync()
}
// GetStateCommitment returns the state commitment.
func (m *Manager) GetStateCommitment() *commitment.CommitStore {
return m.stateCommitment
}
// Migrate migrates the whole state at the given height to the new store/v2.
func (m *Manager) Migrate(height uint64) error {
// create the migration stream and snapshot,
@ -49,13 +102,106 @@ func (m *Manager) Migrate(height uint64) error {
eg := new(errgroup.Group)
eg.Go(func() error {
return m.storageSnapshotter.Restore(height, chStorage)
return m.stateStorage.Restore(height, chStorage)
})
eg.Go(func() error {
defer close(chStorage)
_, err := m.commitSnapshotter.Restore(height, 0, ms, chStorage)
_, err := m.stateCommitment.Restore(height, 0, ms, chStorage)
return err
})
return eg.Wait()
if err := eg.Wait(); err != nil {
return err
}
m.mtx.Lock()
m.migratedVersion = height
m.mtx.Unlock()
return nil
}
// writeChangeset writes the Changeset to the db.
func (m *Manager) writeChangeset() error {
for vc := range m.chChangeset {
cs := vc.Changeset
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, vc.Version)
csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf))
csBytes, err := encoding.MarshalChangeset(cs)
if err != nil {
return fmt.Errorf("failed to marshal changeset: %w", err)
}
batch := m.db.NewBatch()
defer batch.Close()
if err := batch.Set(csKey, csBytes); err != nil {
return fmt.Errorf("failed to write changeset to db.Batch: %w", err)
}
if err := batch.Write(); err != nil {
return fmt.Errorf("failed to write changeset to db: %w", err)
}
}
return nil
}
// GetMigratedVersion returns the migrated version.
// It is used to check the migrated version in the RootStore.
func (m *Manager) GetMigratedVersion() uint64 {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.migratedVersion
}
// Sync catches up the Changesets which are committed while the migration is in progress.
// It should be called after the migration is done.
func (m *Manager) Sync() error {
version := m.GetMigratedVersion()
if version == 0 {
return fmt.Errorf("migration is not done yet")
}
version += 1
for {
select {
case <-m.chDone:
return nil
default:
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, version)
csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf))
csBytes, err := m.db.Get(csKey)
if err != nil {
return fmt.Errorf("failed to get changeset from db: %w", err)
}
if csBytes == nil {
// wait for the next changeset
time.Sleep(100 * time.Millisecond)
continue
}
cs := corestore.NewChangeset()
if err := encoding.UnmarshalChangeset(cs, csBytes); err != nil {
return fmt.Errorf("failed to unmarshal changeset: %w", err)
}
if err := m.stateCommitment.WriteBatch(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
return fmt.Errorf("failed to commit changeset to commitment: %w", err)
}
if err := m.stateStorage.ApplyChangeset(version, cs); err != nil {
return fmt.Errorf("failed to write changeset to storage: %w", err)
}
m.mtx.Lock()
m.migratedVersion = version
m.mtx.Unlock()
version += 1
}
}
}

View File

@ -50,7 +50,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
require.NoError(t, err)
return NewManager(snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
return NewManager(db, snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
}
func TestMigrateState(t *testing.T) {
@ -78,17 +78,17 @@ func TestMigrateState(t *testing.T) {
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.commitSnapshotter.(*commitment.CommitStore).Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
// check the latest state
val, err := m.commitSnapshotter.(*commitment.CommitStore).Get([]byte("store1"), toVersion-1, []byte("key-100-1"))
val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1"))
require.NoError(t, err)
require.Nil(t, val)
val, err = m.commitSnapshotter.(*commitment.CommitStore).Get([]byte("store2"), toVersion-1, []byte("key-100-0"))
val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0"))
require.NoError(t, err)
require.Nil(t, val)
@ -96,7 +96,7 @@ func TestMigrateState(t *testing.T) {
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.storageSnapshotter.(*storage.StorageStore).Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}

162
store/root/migrate_test.go Normal file
View File

@ -0,0 +1,162 @@
package root
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/suite"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/commitment/iavl"
dbm "cosmossdk.io/store/v2/db"
"cosmossdk.io/store/v2/migration"
"cosmossdk.io/store/v2/snapshots"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/sqlite"
)
var (
storeKeys = []string{"store1", "store2", "store3"}
)
type MigrateStoreTestSuite struct {
suite.Suite
rootStore store.RootStore
}
func TestMigrateStoreTestSuite(t *testing.T) {
suite.Run(t, &MigrateStoreTestSuite{})
}
func (s *MigrateStoreTestSuite) SetupTest() {
testLog := log.NewTestLogger(s.T())
nopLog := log.NewNopLogger()
mdb := dbm.NewMemDB()
multiTrees := make(map[string]commitment.Tree)
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey))
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig())
}
orgSC, err := commitment.NewCommitStore(multiTrees, mdb, nil, testLog)
s.Require().NoError(err)
// apply changeset against the original store
toVersion := uint64(200)
keyCount := 10
for version := uint64(1); version <= toVersion; version++ {
cs := corestore.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
s.Require().NoError(orgSC.WriteBatch(cs))
_, err = orgSC.Commit(version)
s.Require().NoError(err)
}
// create a new storage and commitment stores
sqliteDB, err := sqlite.New(s.T().TempDir())
s.Require().NoError(err)
ss := storage.NewStorageStore(sqliteDB, nil, testLog)
multiTrees1 := make(map[string]commitment.Tree)
for _, storeKey := range storeKeys {
multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), nopLog, iavl.DefaultConfig())
}
sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, testLog)
s.Require().NoError(err)
snapshotsStore, err := snapshots.NewStore(s.T().TempDir())
s.Require().NoError(err)
snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), orgSC, nil, nil, testLog)
migrationManager := migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, testLog)
// assume no storage store, simulate the migration process
s.rootStore, err = New(testLog, ss, orgSC, migrationManager, nil)
s.Require().NoError(err)
}
func (s *MigrateStoreTestSuite) TestMigrateState() {
err := s.rootStore.LoadLatestVersion()
s.Require().NoError(err)
originalLatestVersion, err := s.rootStore.GetLatestVersion()
s.Require().NoError(err)
// start the migration process
s.rootStore.StartMigration()
// continue to apply changeset against the original store
latestVersion := originalLatestVersion + 1
keyCount := 10
for ; latestVersion < 2*originalLatestVersion; latestVersion++ {
cs := corestore.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", latestVersion, i)), []byte(fmt.Sprintf("value-%d-%d", latestVersion, i)), false)
}
}
_, err := s.rootStore.WorkingHash(cs)
s.Require().NoError(err)
_, err = s.rootStore.Commit(cs)
s.Require().NoError(err)
// check if the migration is completed
ver, err := s.rootStore.GetStateStorage().GetLatestVersion()
s.Require().NoError(err)
if ver == latestVersion {
break
}
// add some delay to simulate the consensus process
time.Sleep(100 * time.Millisecond)
}
// check if the migration is successful
version, err := s.rootStore.GetLatestVersion()
s.Require().NoError(err)
s.Require().Equal(latestVersion, version)
// query against the migrated store
for version := uint64(1); version <= latestVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
targetVersion := version
if version < originalLatestVersion {
targetVersion = originalLatestVersion
}
res, err := s.rootStore.Query([]byte(storeKey), targetVersion, []byte(fmt.Sprintf("key-%d-%d", version, i)), true)
s.Require().NoError(err)
s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), res.Value)
}
}
}
// prune the old versions
err = s.rootStore.Prune(latestVersion - 1)
s.Require().NoError(err)
// apply changeset against the migrated store
for version := latestVersion + 1; version <= latestVersion+10; version++ {
cs := corestore.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
_, err := s.rootStore.WorkingHash(cs)
s.Require().NoError(err)
_, err = s.rootStore.Commit(cs)
s.Require().NoError(err)
}
version, err = s.rootStore.GetLatestVersion()
s.Require().NoError(err)
s.Require().Equal(latestVersion+10, version)
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"slices"
"sync"
"time"
"github.com/cockroachdb/errors"
@ -14,6 +15,7 @@ import (
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/metrics"
"cosmossdk.io/store/v2/migration"
"cosmossdk.io/store/v2/proof"
)
@ -27,8 +29,8 @@ type Store struct {
logger log.Logger
initialVersion uint64
// stateStore reflects the state storage backend
stateStore store.VersionedDatabase
// stateStorage reflects the state storage backend
stateStorage store.VersionedDatabase
// stateCommitment reflects the state commitment (SC) backend
stateCommitment store.Committer
@ -44,30 +46,43 @@ type Store struct {
// telemetry reflects a telemetry agent responsible for emitting metrics (if any)
telemetry metrics.StoreMetrics
// Migration related fields
// migrationManager reflects the migration manager used to migrate state from v1 to v2
migrationManager *migration.Manager
// chChangeset reflects the channel used to send the changeset to the migration manager
chChangeset chan *migration.VersionedChangeset
// chDone reflects the channel used to signal the migration manager that the migration
// is done
chDone chan struct{}
// isMigrating reflects whether the store is currently migrating
isMigrating bool
}
func New(
logger log.Logger,
ss store.VersionedDatabase,
sc store.Committer,
mm *migration.Manager,
m metrics.StoreMetrics,
) (store.RootStore, error) {
return &Store{
logger: logger.With("module", "root_store"),
initialVersion: 1,
stateStore: ss,
stateCommitment: sc,
telemetry: m,
logger: logger.With("module", "root_store"),
initialVersion: 1,
stateStorage: ss,
stateCommitment: sc,
migrationManager: mm,
telemetry: m,
}, nil
}
// Close closes the store and resets all internal fields. Note, Close() is NOT
// idempotent and should only be called once.
func (s *Store) Close() (err error) {
err = errors.Join(err, s.stateStore.Close())
err = errors.Join(err, s.stateStorage.Close())
err = errors.Join(err, s.stateCommitment.Close())
s.stateStore = nil
s.stateStorage = nil
s.stateCommitment = nil
s.lastCommitInfo = nil
s.commitHeader = nil
@ -107,7 +122,7 @@ func (s *Store) StateAt(v uint64) (corestore.ReaderMap, error) {
}
func (s *Store) GetStateStorage() store.VersionedDatabase {
return s.stateStore
return s.stateStorage
}
func (s *Store) GetStateCommitment() store.Committer {
@ -116,32 +131,17 @@ func (s *Store) GetStateCommitment() store.Committer {
// LastCommitID returns a CommitID based off of the latest internal CommitInfo.
// If an internal CommitInfo is not set, a new one will be returned with only the
// latest version set, which is based off of the SS view.
// latest version set, which is based off of the SC view.
func (s *Store) LastCommitID() (proof.CommitID, error) {
if s.lastCommitInfo != nil {
return s.lastCommitInfo.CommitID(), nil
}
// XXX/TODO: We cannot use SS to get the latest version when lastCommitInfo
// is nil if SS is flushed asynchronously. This is because the latest version
// in SS might not be the latest version in the SC stores.
//
// Ref: https://github.com/cosmos/cosmos-sdk/issues/17314
latestVersion, err := s.stateStore.GetLatestVersion()
latestVersion, err := s.stateCommitment.GetLatestVersion()
if err != nil {
return proof.CommitID{}, err
}
// sanity check: ensure integrity of latest version against SC
scVersion, err := s.stateCommitment.GetLatestVersion()
if err != nil {
return proof.CommitID{}, err
}
if scVersion != latestVersion {
return proof.CommitID{}, fmt.Errorf("SC and SS version mismatch; got: %d, expected: %d", scVersion, latestVersion)
}
return proof.CommitID{Version: latestVersion}, nil
}
@ -163,7 +163,7 @@ func (s *Store) Query(storeKey []byte, version uint64, key []byte, prove bool) (
defer s.telemetry.MeasureSince(now, "root_store", "query")
}
val, err := s.stateStore.Get(storeKey, version, key)
val, err := s.stateStorage.Get(storeKey, version, key)
if err != nil || val == nil {
// fallback to querying SC backend if not found in SS backend
//
@ -256,6 +256,21 @@ func (s *Store) WorkingHash(cs *corestore.Changeset) ([]byte, error) {
}
if s.workingHash == nil {
// if migration is in progress, send the changeset to the migration manager
if s.isMigrating {
// if the migration manager has already migrated to the version, close the
// channels and replace the state commitment
if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version {
close(s.chDone)
close(s.chChangeset)
s.isMigrating = false
s.stateCommitment = s.migrationManager.GetStateCommitment()
s.logger.Info("migration completed", "version", s.lastCommitInfo.Version)
} else {
s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs}
}
}
if err := s.writeSC(cs); err != nil {
return nil, err
}
@ -291,7 +306,13 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) {
// commit SS async
eg.Go(func() error {
if err := s.stateStore.ApplyChangeset(version, cs); err != nil {
// if we're migrating, we don't want to commit to the state storage
// to avoid parallel writes
if s.isMigrating {
return nil
}
if err := s.stateStorage.ApplyChangeset(version, cs); err != nil {
return fmt.Errorf("failed to commit SS: %w", err)
}
@ -327,7 +348,7 @@ func (s *Store) Prune(version uint64) error {
defer s.telemetry.MeasureSince(now, "root_store", "prune")
}
if err := s.stateStore.Prune(version); err != nil {
if err := s.stateStorage.Prune(version); err != nil {
return fmt.Errorf("failed to prune SS store: %w", err)
}
@ -338,6 +359,40 @@ func (s *Store) Prune(version uint64) error {
return nil
}
// StartMigration starts the migration process and initializes the channels.
// An error is returned if migration is already in progress.
// NOTE: This method should only be called once after loadVersion.
func (s *Store) StartMigration() error {
if s.isMigrating {
return fmt.Errorf("migration already in progress")
}
// buffer at most 1 changeset, if the receiver is behind attempting to buffer
// more than 1 will block.
s.chChangeset = make(chan *migration.VersionedChangeset, 1)
// it is used to signal the migration manager that the migration is done
s.chDone = make(chan struct{})
s.isMigrating = true
mtx := sync.Mutex{}
mtx.Lock()
go func() {
version := s.lastCommitInfo.Version
s.logger.Info("starting migration", "version", version)
mtx.Unlock()
if err := s.migrationManager.Start(version, s.chChangeset, s.chDone); err != nil {
s.logger.Error("failed to start migration", "err", err)
}
}()
// wait for the migration manager to start
mtx.Lock()
defer mtx.Unlock()
return nil
}
// writeSC accepts a Changeset and writes that as a batch to the underlying SC
// tree, which allows us to retrieve the working hash of the SC tree. Finally,
// we construct a *CommitInfo and set that as lastCommitInfo. Note, this should

View File

@ -52,7 +52,7 @@ func (s *RootStoreTestSuite) SetupTest() {
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), nil, noopLog)
s.Require().NoError(err)
rs, err := New(noopLog, ss, sc, nil)
rs, err := New(noopLog, ss, sc, nil, nil)
s.Require().NoError(err)
s.rootStore = rs
@ -68,7 +68,7 @@ func (s *RootStoreTestSuite) TestGetStateCommitment() {
}
func (s *RootStoreTestSuite) TestGetStateStorage() {
s.Require().Equal(s.rootStore.GetStateStorage(), s.rootStore.(*Store).stateStore)
s.Require().Equal(s.rootStore.GetStateStorage(), s.rootStore.(*Store).stateStorage)
}
func (s *RootStoreTestSuite) TestSetInitialVersion() {

View File

@ -232,6 +232,17 @@ func TestManager_Restore(t *testing.T) {
Metadata: types.Metadata{ChunkHashes: checksums(chunks)},
})
require.NoError(t, err)
// Feeding the chunks should work
for i, chunk := range chunks {
done, err := manager.RestoreChunk(chunk)
require.NoError(t, err)
if i == len(chunks)-1 {
assert.True(t, done)
} else {
assert.False(t, done)
}
}
}
func TestManager_TakeError(t *testing.T) {

View File

@ -41,8 +41,9 @@ func (b *Batch) Size() int {
return b.batch.Len()
}
func (b *Batch) Reset() {
func (b *Batch) Reset() error {
b.batch.Reset()
return nil
}
func (b *Batch) set(storeKey []byte, tombstone uint64, key, value []byte) error {

View File

@ -44,8 +44,9 @@ func (b Batch) Size() int {
return len(b.batch.Data())
}
func (b Batch) Reset() {
func (b Batch) Reset() error {
b.batch.Clear()
return nil
}
func (b Batch) Set(storeKey []byte, key, value []byte) error {

View File

@ -23,19 +23,21 @@ type batchOp struct {
}
type Batch struct {
db *sql.DB
tx *sql.Tx
ops []batchOp
size int
version uint64
}
func NewBatch(storage *sql.DB, version uint64) (*Batch, error) {
tx, err := storage.Begin()
func NewBatch(db *sql.DB, version uint64) (*Batch, error) {
tx, err := db.Begin()
if err != nil {
return nil, fmt.Errorf("failed to create SQL transaction: %w", err)
}
return &Batch{
db: db,
tx: tx,
ops: make([]batchOp, 0),
version: version,
@ -46,10 +48,18 @@ func (b *Batch) Size() int {
return b.size
}
func (b *Batch) Reset() {
func (b *Batch) Reset() error {
b.ops = nil
b.ops = make([]batchOp, 0)
b.size = 0
tx, err := b.db.Begin()
if err != nil {
return err
}
b.tx = tx
return nil
}
func (b *Batch) Set(storeKey []byte, key, value []byte) error {

View File

@ -130,11 +130,13 @@ func (ss *StorageStore) Restore(version uint64, chStorage <-chan *corestore.Stat
if err := b.Set(kvPair.Actor, kv.Key, kv.Value); err != nil {
return err
}
if b.Size() > defaultBatchBufferSize {
if err := b.Write(); err != nil {
return err
}
if err := b.Reset(); err != nil {
return err
}
}
}
}
@ -145,7 +147,7 @@ func (ss *StorageStore) Restore(version uint64, chStorage <-chan *corestore.Stat
}
}
return ss.db.SetLatestVersion(version)
return nil
}
// Close closes the store.

View File

@ -72,6 +72,12 @@ type RootStore interface {
// old versions of the RootStore by the CLI.
Prune(version uint64) error
// StartMigration starts a migration process to migrate the RootStore/v1 to the
// SS and SC backends of store/v2.
// It runs in a separate goroutine and replaces the current RootStore with the
// migrated new backends once the migration is complete.
StartMigration() error
// SetMetrics sets the telemetry handler on the RootStore.
SetMetrics(m metrics.Metrics)