From 0d7621be564fb27d834158d08e1471f572fc38b7 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Tue, 11 Jul 2023 09:10:16 +0000 Subject: [PATCH] Add tmp indices to events table while performing migration to V2 After the migration we also perform db administration on the events DB to reduce its file size on disk. --- chain/events/filter/index.go | 43 +++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 27aec481f..bacba60d7 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -128,6 +128,16 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C // rollback the transaction (a no-op if the transaction was already committed) defer tx.Rollback() //nolint:errcheck + // create some temporary indices to help speed up the migration + _, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_height_tipset_key_cid ON event (height,tipset_key_cid)") + if err != nil { + return xerrors.Errorf("create index tmp_height_tipset_key_cid: %w", err) + } + _, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_tipset_key_cid ON event (tipset_key_cid)") + if err != nil { + return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err) + } + stmtDeleteOffChainEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid!=? and height=?") if err != nil { return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err) @@ -158,12 +168,16 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C currTs := chainStore.GetHeaviestTipSet() for int64(currTs.Height()) >= minHeight.Int64 { + if currTs.Height()%1000 == 0 { + log.Infof("Migrating height %d (remaining %d)", currTs.Height(), int64(currTs.Height())-minHeight.Int64) + } + tsKey := currTs.Parents() currTs, err = chainStore.GetTipSetFromKey(ctx, tsKey) if err != nil { return xerrors.Errorf("get tipset from key: %w", err) } - log.Debugf("Migrating height %d\n", currTs.Height()) + log.Debugf("Migrating height %d", currTs.Height()) tsKeyCid, err := currTs.Key().Cid() if err != nil { @@ -190,7 +204,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C if !eventId.Valid { continue } - log.Debugf("Deleting all events with id < %d at height %d\n", eventId.Int64, currTs.Height()) + log.Debugf("Deleting all events with id < %d at height %d", eventId.Int64, currTs.Height()) res, err := stmtDeleteEvent.Exec(tsKeyCid.Bytes(), eventId.Int64) if err != nil { @@ -201,7 +215,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C if err != nil { return xerrors.Errorf("rows affected: %w", err) } - log.Debugf("deleted %d events from tipset %s\n", nrRowsAffected, tsKeyCid.String()) + log.Debugf("deleted %d events from tipset %s", nrRowsAffected, tsKeyCid.String()) } // delete all entries that have an event_id that doesn't exist (since we don't have a foreign @@ -217,11 +231,34 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C } log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected) + // drop the temporary indices after the migration + _, err = tx.Exec("DROP INDEX IF EXISTS tmp_tipset_key_cid") + if err != nil { + return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err) + } + _, err = tx.Exec("DROP INDEX IF EXISTS tmp_height_tipset_key_cid") + if err != nil { + return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err) + } + err = tx.Commit() if err != nil { return xerrors.Errorf("commit transaction: %w", err) } + // during the migration, we have likely increased the WAL size a lot, so lets do some + // simple DB administration to free up space (VACUUM followed by truncating the WAL file) + // as this would be a good time to do it when no other writes are happening + log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration") + _, err = ei.db.Exec("VACUUM") + if err != nil { + log.Warnf("error vacuuming database: %s", err) + } + _, err = ei.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)") + if err != nil { + log.Warnf("error checkpointing wal: %s", err) + } + log.Infof("Successfully migrated events to version 2 in %s", time.Since(now)) return nil