Merge pull request #11064 from filecoin-project/11056-speedup-event-migration

fix: events: Improve performance of event migration from V1 to V2
This commit is contained in:
Aayush Rajasekaran 2023-07-11 10:23:14 -04:00 committed by GitHub
commit 25c591fd6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -128,6 +128,16 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
// rollback the transaction (a no-op if the transaction was already committed) // rollback the transaction (a no-op if the transaction was already committed)
defer tx.Rollback() //nolint:errcheck defer tx.Rollback() //nolint:errcheck
// create some temporary indices to help speed up the migration
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_height_tipset_key_cid ON event (height,tipset_key_cid)")
if err != nil {
return xerrors.Errorf("create index tmp_height_tipset_key_cid: %w", err)
}
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_tipset_key_cid ON event (tipset_key_cid)")
if err != nil {
return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err)
}
stmtDeleteOffChainEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid!=? and height=?") stmtDeleteOffChainEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid!=? and height=?")
if err != nil { if err != nil {
return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err) return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err)
@ -158,12 +168,16 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
currTs := chainStore.GetHeaviestTipSet() currTs := chainStore.GetHeaviestTipSet()
for int64(currTs.Height()) >= minHeight.Int64 { for int64(currTs.Height()) >= minHeight.Int64 {
if currTs.Height()%1000 == 0 {
log.Infof("Migrating height %d (remaining %d)", currTs.Height(), int64(currTs.Height())-minHeight.Int64)
}
tsKey := currTs.Parents() tsKey := currTs.Parents()
currTs, err = chainStore.GetTipSetFromKey(ctx, tsKey) currTs, err = chainStore.GetTipSetFromKey(ctx, tsKey)
if err != nil { if err != nil {
return xerrors.Errorf("get tipset from key: %w", err) return xerrors.Errorf("get tipset from key: %w", err)
} }
log.Debugf("Migrating height %d\n", currTs.Height()) log.Debugf("Migrating height %d", currTs.Height())
tsKeyCid, err := currTs.Key().Cid() tsKeyCid, err := currTs.Key().Cid()
if err != nil { if err != nil {
@ -190,7 +204,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
if !eventId.Valid { if !eventId.Valid {
continue continue
} }
log.Debugf("Deleting all events with id < %d at height %d\n", eventId.Int64, currTs.Height()) log.Debugf("Deleting all events with id < %d at height %d", eventId.Int64, currTs.Height())
res, err := stmtDeleteEvent.Exec(tsKeyCid.Bytes(), eventId.Int64) res, err := stmtDeleteEvent.Exec(tsKeyCid.Bytes(), eventId.Int64)
if err != nil { if err != nil {
@ -201,7 +215,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
if err != nil { if err != nil {
return xerrors.Errorf("rows affected: %w", err) return xerrors.Errorf("rows affected: %w", err)
} }
log.Debugf("deleted %d events from tipset %s\n", nrRowsAffected, tsKeyCid.String()) log.Debugf("deleted %d events from tipset %s", nrRowsAffected, tsKeyCid.String())
} }
// delete all entries that have an event_id that doesn't exist (since we don't have a foreign // delete all entries that have an event_id that doesn't exist (since we don't have a foreign
@ -217,11 +231,34 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
} }
log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected) log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected)
// drop the temporary indices after the migration
_, err = tx.Exec("DROP INDEX IF EXISTS tmp_tipset_key_cid")
if err != nil {
return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err)
}
_, err = tx.Exec("DROP INDEX IF EXISTS tmp_height_tipset_key_cid")
if err != nil {
return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err)
}
err = tx.Commit() err = tx.Commit()
if err != nil { if err != nil {
return xerrors.Errorf("commit transaction: %w", err) return xerrors.Errorf("commit transaction: %w", err)
} }
// during the migration, we have likely increased the WAL size a lot, so lets do some
// simple DB administration to free up space (VACUUM followed by truncating the WAL file)
// as this would be a good time to do it when no other writes are happening
log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration")
_, err = ei.db.Exec("VACUUM")
if err != nil {
log.Warnf("error vacuuming database: %s", err)
}
_, err = ei.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)")
if err != nil {
log.Warnf("error checkpointing wal: %s", err)
}
log.Infof("Successfully migrated events to version 2 in %s", time.Since(now)) log.Infof("Successfully migrated events to version 2 in %s", time.Since(now))
return nil return nil