diff --git a/store/v2/migration/manager.go b/store/v2/migration/manager.go index 73976e2471..b769e0ab6f 100644 --- a/store/v2/migration/manager.go +++ b/store/v2/migration/manager.go @@ -181,14 +181,13 @@ func (m *Manager) writeChangeset() error { } batch := m.db.NewBatch() - defer batch.Close() - if err := batch.Set(csKey, csBytes); err != nil { return fmt.Errorf("failed to write changeset to db.Batch: %w", err) } if err := batch.Write(); err != nil { return fmt.Errorf("failed to write changeset to db: %w", err) } + batch.Close() } return nil diff --git a/store/v2/root/migrate_test.go b/store/v2/root/migrate_test.go index 5999fd053e..e80f41fd1b 100644 --- a/store/v2/root/migrate_test.go +++ b/store/v2/root/migrate_test.go @@ -87,8 +87,16 @@ func (s *MigrateStoreTestSuite) TestMigrateState() { originalLatestVersion, err := s.rootStore.GetLatestVersion() s.Require().NoError(err) - // start the migration process - s.Require().NoError(s.rootStore.StartMigration()) + // check if the Query fallback to the original SC + for version := uint64(1); version <= originalLatestVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < 10; i++ { + res, err := s.rootStore.Query([]byte(storeKey), version, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) + s.Require().NoError(err) + s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), res.Value) + } + } + } // continue to apply changeset against the original store latestVersion := originalLatestVersion + 1 diff --git a/store/v2/root/store.go b/store/v2/root/store.go index 8bbcc71249..dc94f7cfe9 100644 --- a/store/v2/root/store.go +++ b/store/v2/root/store.go @@ -59,6 +59,9 @@ type Store struct { isMigrating bool } +// New creates a new root Store instance. +// +// NOTE: The migration manager is optional and can be nil if no migration is required. func New( logger log.Logger, ss store.VersionedDatabase, @@ -73,6 +76,7 @@ func New( stateCommitment: sc, migrationManager: mm, telemetry: m, + isMigrating: mm != nil, }, nil } @@ -163,24 +167,29 @@ func (s *Store) Query(storeKey []byte, version uint64, key []byte, prove bool) ( defer s.telemetry.MeasureSince(now, "root_store", "query") } - val, err := s.stateStorage.Get(storeKey, version, key) - if err != nil || val == nil { - // fallback to querying SC backend if not found in SS backend - // - // Note, this should only used during migration, i.e. while SS and IAVL v2 - // are being asynchronously synced. + var val []byte + var err error + if s.isMigrating { // if we're migrating, we need to query the SC backend + val, err = s.stateCommitment.Get(storeKey, version, key) + if err != nil { + return store.QueryResult{}, fmt.Errorf("failed to query SC store: %w", err) + } + } else { + val, err = s.stateStorage.Get(storeKey, version, key) + if err != nil { + return store.QueryResult{}, fmt.Errorf("failed to query SS store: %w", err) + } if val == nil { + // fallback to querying SC backend if not found in SS backend + // + // Note, this should only used during migration, i.e. while SS and IAVL v2 + // are being asynchronously synced. bz, scErr := s.stateCommitment.Get(storeKey, version, key) if scErr != nil { return store.QueryResult{}, fmt.Errorf("failed to query SC store: %w", scErr) } - val = bz } - - if err != nil { - return store.QueryResult{}, fmt.Errorf("failed to query SS store: %w", err) - } } result := store.QueryResult{ @@ -235,6 +244,11 @@ func (s *Store) loadVersion(v uint64) error { // set lastCommitInfo explicitly s.t. Commit commits the correct version, i.e. v+1 s.lastCommitInfo = &proof.CommitInfo{Version: v} + // if we're migrating, we need to start the migration process + if s.isMigrating { + s.startMigration() + } + return nil } @@ -289,20 +303,18 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) { eg := new(errgroup.Group) - // commit SS async - eg.Go(func() error { - // if we're migrating, we don't want to commit to the state storage - // to avoid parallel writes - if s.isMigrating { + // if we're migrating, we don't want to commit to the state storage to avoid + // parallel writes + if !s.isMigrating { + // commit SS async + eg.Go(func() error { + if err := s.stateStorage.ApplyChangeset(version, cs); err != nil { + return fmt.Errorf("failed to commit SS: %w", err) + } + return nil - } - - if err := s.stateStorage.ApplyChangeset(version, cs); err != nil { - return fmt.Errorf("failed to commit SS: %w", err) - } - - return nil - }) + }) + } // commit SC async eg.Go(func() error { @@ -344,22 +356,19 @@ func (s *Store) Prune(version uint64) error { return nil } -// StartMigration starts the migration process and initializes the channels. -// An error is returned if migration is already in progress. +// startMigration starts a migration process to migrate the RootStore/v1 to the +// SS and SC backends of store/v2 and initializes the channels. +// It runs in a separate goroutine and replaces the current RootStore with the +// migrated new backends once the migration is complete. +// // NOTE: This method should only be called once after loadVersion. -func (s *Store) StartMigration() error { - if s.isMigrating { - return fmt.Errorf("migration already in progress") - } - +func (s *Store) startMigration() { // buffer at most 1 changeset, if the receiver is behind attempting to buffer // more than 1 will block. s.chChangeset = make(chan *migration.VersionedChangeset, 1) // it is used to signal the migration manager that the migration is done s.chDone = make(chan struct{}) - s.isMigrating = true - mtx := sync.Mutex{} mtx.Lock() go func() { @@ -374,8 +383,6 @@ func (s *Store) StartMigration() error { // wait for the migration manager to start mtx.Lock() defer mtx.Unlock() - - return nil } // writeSC accepts a Changeset and writes that as a batch to the underlying SC diff --git a/store/v2/store.go b/store/v2/store.go index bffede6ec3..e8437eee75 100644 --- a/store/v2/store.go +++ b/store/v2/store.go @@ -72,12 +72,6 @@ type RootStore interface { // old versions of the RootStore by the CLI. Prune(version uint64) error - // StartMigration starts a migration process to migrate the RootStore/v1 to the - // SS and SC backends of store/v2. - // It runs in a separate goroutine and replaces the current RootStore with the - // migrated new backends once the migration is complete. - StartMigration() error - // SetMetrics sets the telemetry handler on the RootStore. SetMetrics(m metrics.Metrics)