chore(store/v2): cleanup the migration API (#20298)
This commit is contained in:
parent
d90f552f62
commit
559f78403a
@ -181,14 +181,13 @@ func (m *Manager) writeChangeset() error {
|
||||
}
|
||||
|
||||
batch := m.db.NewBatch()
|
||||
defer batch.Close()
|
||||
|
||||
if err := batch.Set(csKey, csBytes); err != nil {
|
||||
return fmt.Errorf("failed to write changeset to db.Batch: %w", err)
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
return fmt.Errorf("failed to write changeset to db: %w", err)
|
||||
}
|
||||
batch.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@ -87,8 +87,16 @@ func (s *MigrateStoreTestSuite) TestMigrateState() {
|
||||
originalLatestVersion, err := s.rootStore.GetLatestVersion()
|
||||
s.Require().NoError(err)
|
||||
|
||||
// start the migration process
|
||||
s.Require().NoError(s.rootStore.StartMigration())
|
||||
// check if the Query fallback to the original SC
|
||||
for version := uint64(1); version <= originalLatestVersion; version++ {
|
||||
for _, storeKey := range storeKeys {
|
||||
for i := 0; i < 10; i++ {
|
||||
res, err := s.rootStore.Query([]byte(storeKey), version, []byte(fmt.Sprintf("key-%d-%d", version, i)), true)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), res.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// continue to apply changeset against the original store
|
||||
latestVersion := originalLatestVersion + 1
|
||||
|
||||
@ -59,6 +59,9 @@ type Store struct {
|
||||
isMigrating bool
|
||||
}
|
||||
|
||||
// New creates a new root Store instance.
|
||||
//
|
||||
// NOTE: The migration manager is optional and can be nil if no migration is required.
|
||||
func New(
|
||||
logger log.Logger,
|
||||
ss store.VersionedDatabase,
|
||||
@ -73,6 +76,7 @@ func New(
|
||||
stateCommitment: sc,
|
||||
migrationManager: mm,
|
||||
telemetry: m,
|
||||
isMigrating: mm != nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -163,24 +167,29 @@ func (s *Store) Query(storeKey []byte, version uint64, key []byte, prove bool) (
|
||||
defer s.telemetry.MeasureSince(now, "root_store", "query")
|
||||
}
|
||||
|
||||
val, err := s.stateStorage.Get(storeKey, version, key)
|
||||
if err != nil || val == nil {
|
||||
// fallback to querying SC backend if not found in SS backend
|
||||
//
|
||||
// Note, this should only used during migration, i.e. while SS and IAVL v2
|
||||
// are being asynchronously synced.
|
||||
var val []byte
|
||||
var err error
|
||||
if s.isMigrating { // if we're migrating, we need to query the SC backend
|
||||
val, err = s.stateCommitment.Get(storeKey, version, key)
|
||||
if err != nil {
|
||||
return store.QueryResult{}, fmt.Errorf("failed to query SC store: %w", err)
|
||||
}
|
||||
} else {
|
||||
val, err = s.stateStorage.Get(storeKey, version, key)
|
||||
if err != nil {
|
||||
return store.QueryResult{}, fmt.Errorf("failed to query SS store: %w", err)
|
||||
}
|
||||
if val == nil {
|
||||
// fallback to querying SC backend if not found in SS backend
|
||||
//
|
||||
// Note, this should only used during migration, i.e. while SS and IAVL v2
|
||||
// are being asynchronously synced.
|
||||
bz, scErr := s.stateCommitment.Get(storeKey, version, key)
|
||||
if scErr != nil {
|
||||
return store.QueryResult{}, fmt.Errorf("failed to query SC store: %w", scErr)
|
||||
}
|
||||
|
||||
val = bz
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return store.QueryResult{}, fmt.Errorf("failed to query SS store: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
result := store.QueryResult{
|
||||
@ -235,6 +244,11 @@ func (s *Store) loadVersion(v uint64) error {
|
||||
// set lastCommitInfo explicitly s.t. Commit commits the correct version, i.e. v+1
|
||||
s.lastCommitInfo = &proof.CommitInfo{Version: v}
|
||||
|
||||
// if we're migrating, we need to start the migration process
|
||||
if s.isMigrating {
|
||||
s.startMigration()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -289,20 +303,18 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) {
|
||||
|
||||
eg := new(errgroup.Group)
|
||||
|
||||
// commit SS async
|
||||
eg.Go(func() error {
|
||||
// if we're migrating, we don't want to commit to the state storage
|
||||
// to avoid parallel writes
|
||||
if s.isMigrating {
|
||||
// if we're migrating, we don't want to commit to the state storage to avoid
|
||||
// parallel writes
|
||||
if !s.isMigrating {
|
||||
// commit SS async
|
||||
eg.Go(func() error {
|
||||
if err := s.stateStorage.ApplyChangeset(version, cs); err != nil {
|
||||
return fmt.Errorf("failed to commit SS: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := s.stateStorage.ApplyChangeset(version, cs); err != nil {
|
||||
return fmt.Errorf("failed to commit SS: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// commit SC async
|
||||
eg.Go(func() error {
|
||||
@ -344,22 +356,19 @@ func (s *Store) Prune(version uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartMigration starts the migration process and initializes the channels.
|
||||
// An error is returned if migration is already in progress.
|
||||
// startMigration starts a migration process to migrate the RootStore/v1 to the
|
||||
// SS and SC backends of store/v2 and initializes the channels.
|
||||
// It runs in a separate goroutine and replaces the current RootStore with the
|
||||
// migrated new backends once the migration is complete.
|
||||
//
|
||||
// NOTE: This method should only be called once after loadVersion.
|
||||
func (s *Store) StartMigration() error {
|
||||
if s.isMigrating {
|
||||
return fmt.Errorf("migration already in progress")
|
||||
}
|
||||
|
||||
func (s *Store) startMigration() {
|
||||
// buffer at most 1 changeset, if the receiver is behind attempting to buffer
|
||||
// more than 1 will block.
|
||||
s.chChangeset = make(chan *migration.VersionedChangeset, 1)
|
||||
// it is used to signal the migration manager that the migration is done
|
||||
s.chDone = make(chan struct{})
|
||||
|
||||
s.isMigrating = true
|
||||
|
||||
mtx := sync.Mutex{}
|
||||
mtx.Lock()
|
||||
go func() {
|
||||
@ -374,8 +383,6 @@ func (s *Store) StartMigration() error {
|
||||
// wait for the migration manager to start
|
||||
mtx.Lock()
|
||||
defer mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeSC accepts a Changeset and writes that as a batch to the underlying SC
|
||||
|
||||
@ -72,12 +72,6 @@ type RootStore interface {
|
||||
// old versions of the RootStore by the CLI.
|
||||
Prune(version uint64) error
|
||||
|
||||
// StartMigration starts a migration process to migrate the RootStore/v1 to the
|
||||
// SS and SC backends of store/v2.
|
||||
// It runs in a separate goroutine and replaces the current RootStore with the
|
||||
// migrated new backends once the migration is complete.
|
||||
StartMigration() error
|
||||
|
||||
// SetMetrics sets the telemetry handler on the RootStore.
|
||||
SetMetrics(m metrics.Metrics)
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user