From 8b4081f8bb792a30ca0517ce14a5fe02e65a0df6 Mon Sep 17 00:00:00 2001 From: cool-developer <51834436+cool-develope@users.noreply.github.com> Date: Thu, 21 Mar 2024 08:12:56 -0400 Subject: [PATCH] fix(store/v2): clean up resources after the migration is completed (#19800) --- store/commitment/iavl/tree.go | 2 +- store/migration/manager.go | 11 +++++++++++ store/root/store.go | 37 +++++++++++++++++++++-------------- store/snapshots/manager.go | 9 ++++++++- 4 files changed, 42 insertions(+), 17 deletions(-) diff --git a/store/commitment/iavl/tree.go b/store/commitment/iavl/tree.go index f4dd62bfc0..944b740859 100644 --- a/store/commitment/iavl/tree.go +++ b/store/commitment/iavl/tree.go @@ -131,5 +131,5 @@ func (t *IavlTree) Import(version uint64) (commitment.Importer, error) { // Close closes the iavl tree. func (t *IavlTree) Close() error { - return nil + return t.tree.Close() } diff --git a/store/migration/manager.go b/store/migration/manager.go index 285709351c..43dc7dffc0 100644 --- a/store/migration/manager.go +++ b/store/migration/manager.go @@ -205,3 +205,14 @@ func (m *Manager) Sync() error { } } } + +// Close closes the manager. It should be called after the migration is done. +// It will close the db and notify the snapshotsManager that the migration is done. +func (m *Manager) Close() error { + if err := m.db.Close(); err != nil { + return fmt.Errorf("failed to close db: %w", err) + } + m.snapshotsManager.EndMigration(m.stateCommitment) + + return nil +} diff --git a/store/root/store.go b/store/root/store.go index 68944c209d..91d007bd7d 100644 --- a/store/root/store.go +++ b/store/root/store.go @@ -256,21 +256,6 @@ func (s *Store) WorkingHash(cs *corestore.Changeset) ([]byte, error) { } if s.workingHash == nil { - // if migration is in progress, send the changeset to the migration manager - if s.isMigrating { - // if the migration manager has already migrated to the version, close the - // channels and replace the state commitment - if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version { - close(s.chDone) - close(s.chChangeset) - s.isMigrating = false - s.stateCommitment = s.migrationManager.GetStateCommitment() - s.logger.Info("migration completed", "version", s.lastCommitInfo.Version) - } else { - s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs} - } - } - if err := s.writeSC(cs); err != nil { return nil, err } @@ -397,7 +382,29 @@ func (s *Store) StartMigration() error { // tree, which allows us to retrieve the working hash of the SC tree. Finally, // we construct a *CommitInfo and set that as lastCommitInfo. Note, this should // only be called once per block! +// If migration is in progress, the changeset is sent to the migration manager. func (s *Store) writeSC(cs *corestore.Changeset) error { + if s.isMigrating { + // if the migration manager has already migrated to the version, close the + // channels and replace the state commitment + if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version { + close(s.chDone) + close(s.chChangeset) + s.isMigrating = false + // close the old state commitment and replace it with the new one + if err := s.stateCommitment.Close(); err != nil { + return fmt.Errorf("failed to close the old SC store: %w", err) + } + s.stateCommitment = s.migrationManager.GetStateCommitment() + if err := s.migrationManager.Close(); err != nil { + return fmt.Errorf("failed to close migration manager: %w", err) + } + s.logger.Info("migration completed", "version", s.lastCommitInfo.Version) + } else { + s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs} + } + } + if err := s.stateCommitment.WriteBatch(cs); err != nil { return fmt.Errorf("failed to write batch to SC store: %w", err) } diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 4f2ad69d17..634f2d5b5b 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -245,7 +245,7 @@ func (m *Manager) CreateMigration(height uint64, protoWriter WriteCloser) error if err != nil { return err } - defer m.end() + // m.end() will be called by the migration manager with EndMigration(). go func() { if err := m.commitSnapshotter.Snapshot(height, protoWriter); err != nil { @@ -258,6 +258,13 @@ func (m *Manager) CreateMigration(height uint64, protoWriter WriteCloser) error return nil } +// EndMigration ends the migration operation. +// It will replace the current commitSnapshotter with the new one. +func (m *Manager) EndMigration(commitSnapshotter CommitSnapshotter) { + defer m.end() + m.commitSnapshotter = commitSnapshotter +} + // List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations. func (m *Manager) List() ([]*types.Snapshot, error) { return m.store.List()