Add tmp indices to events table while performing migration to V2

After the migration we also perform db administration on the
events DB to reduce its file size on disk.
This commit is contained in:
Fridrik Asmundsson 2023-07-11 09:10:16 +00:00
parent d463abae2f
commit 0d7621be56

View File

@ -128,6 +128,16 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
// rollback the transaction (a no-op if the transaction was already committed)
defer tx.Rollback() //nolint:errcheck
// create some temporary indices to help speed up the migration
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_height_tipset_key_cid ON event (height,tipset_key_cid)")
if err != nil {
return xerrors.Errorf("create index tmp_height_tipset_key_cid: %w", err)
}
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_tipset_key_cid ON event (tipset_key_cid)")
if err != nil {
return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err)
}
stmtDeleteOffChainEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid!=? and height=?")
if err != nil {
return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err)
@ -158,12 +168,16 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
currTs := chainStore.GetHeaviestTipSet()
for int64(currTs.Height()) >= minHeight.Int64 {
if currTs.Height()%1000 == 0 {
log.Infof("Migrating height %d (remaining %d)", currTs.Height(), int64(currTs.Height())-minHeight.Int64)
}
tsKey := currTs.Parents()
currTs, err = chainStore.GetTipSetFromKey(ctx, tsKey)
if err != nil {
return xerrors.Errorf("get tipset from key: %w", err)
}
log.Debugf("Migrating height %d\n", currTs.Height())
log.Debugf("Migrating height %d", currTs.Height())
tsKeyCid, err := currTs.Key().Cid()
if err != nil {
@ -190,7 +204,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
if !eventId.Valid {
continue
}
log.Debugf("Deleting all events with id < %d at height %d\n", eventId.Int64, currTs.Height())
log.Debugf("Deleting all events with id < %d at height %d", eventId.Int64, currTs.Height())
res, err := stmtDeleteEvent.Exec(tsKeyCid.Bytes(), eventId.Int64)
if err != nil {
@ -201,7 +215,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
if err != nil {
return xerrors.Errorf("rows affected: %w", err)
}
log.Debugf("deleted %d events from tipset %s\n", nrRowsAffected, tsKeyCid.String())
log.Debugf("deleted %d events from tipset %s", nrRowsAffected, tsKeyCid.String())
}
// delete all entries that have an event_id that doesn't exist (since we don't have a foreign
@ -217,11 +231,34 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
}
log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected)
// drop the temporary indices after the migration
_, err = tx.Exec("DROP INDEX IF EXISTS tmp_tipset_key_cid")
if err != nil {
return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err)
}
_, err = tx.Exec("DROP INDEX IF EXISTS tmp_height_tipset_key_cid")
if err != nil {
return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err)
}
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
// during the migration, we have likely increased the WAL size a lot, so lets do some
// simple DB administration to free up space (VACUUM followed by truncating the WAL file)
// as this would be a good time to do it when no other writes are happening
log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration")
_, err = ei.db.Exec("VACUUM")
if err != nil {
log.Warnf("error vacuuming database: %s", err)
}
_, err = ei.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)")
if err != nil {
log.Warnf("error checkpointing wal: %s", err)
}
log.Infof("Successfully migrated events to version 2 in %s", time.Since(now))
return nil