From 10877d2e662842af06f67ce04f7f3da318d2bb36 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 26 Apr 2024 20:24:13 +1000 Subject: [PATCH] feat(events): adjust indexes in event index db to match query patterns Introduces a v4 migration that just adjusts indexes. Copies some improvements from https://github.com/filecoin-project/lotus/pull/11723 Closes: https://github.com/filecoin-project/lotus/issues/11909 --- chain/events/filter/index.go | 143 ++++++++++++++++++++++++++--------- 1 file changed, 107 insertions(+), 36 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 296d7c1d5..a9651224d 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -26,7 +26,7 @@ var pragmas = []string{ "PRAGMA temp_store = memory", "PRAGMA mmap_size = 30000000000", "PRAGMA page_size = 32768", - "PRAGMA auto_vacuum = NONE", + "PRAGMA auto_vacuum = NONE", // not useful until we implement GC "PRAGMA automatic_index = OFF", "PRAGMA journal_mode = WAL", "PRAGMA read_uncommitted = ON", @@ -45,8 +45,10 @@ var ddls = []string{ reverted INTEGER NOT NULL )`, - createIndexEventHeightTipsetKey, createIndexEventEmitterAddr, + createIndexEventTipsetKeyCid, + createIndexEventHeight, + createIndexEventReverted, `CREATE TABLE IF NOT EXISTS event_entry ( event_id INTEGER, @@ -57,7 +59,9 @@ var ddls = []string{ value BLOB NOT NULL )`, - createIndexEventEntryKey, + createIndexEventEntryIndexedKey, + createIndexEventEntryCodecValue, + createIndexEventEntryEventId, // metadata containing version of schema `CREATE TABLE IF NOT EXISTS _meta ( @@ -67,6 +71,7 @@ var ddls = []string{ `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, `INSERT OR IGNORE INTO _meta (version) VALUES (2)`, `INSERT OR IGNORE INTO _meta (version) VALUES (3)`, + `INSERT OR IGNORE INTO _meta (version) VALUES (4)`, } var ( @@ -74,7 +79,7 @@ var ( ) const ( - schemaVersion = 3 + schemaVersion = 4 eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` @@ -82,9 +87,14 @@ const ( revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` - createIndexEventHeightTipsetKey = `CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)` - createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` - createIndexEventEntryKey = `CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key)` + createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` + createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);` + createIndexEventHeight = `CREATE INDEX IF NOT EXISTS event_height ON event (height);` + createIndexEventReverted = `CREATE INDEX IF NOT EXISTS event_reverted ON event (reverted);` + + createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);` + createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);` + createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);` ) type EventIndex struct { @@ -237,7 +247,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C if err != nil { return xerrors.Errorf("rows affected: %w", err) } - log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected) + log.Infof("Cleaned up %d entries that had deleted events\n", nrRowsAffected) // drop the temporary indices after the migration _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS tmp_tipset_key_cid") @@ -249,11 +259,9 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err) } - // create the final index on event.height and event.tipset_key - _, err = tx.ExecContext(ctx, createIndexEventHeightTipsetKey) - if err != nil { - return xerrors.Errorf("create index height_tipset_key: %w", err) - } + // original v2 migration introduced an index: + // CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key) + // which has subsequently been removed in v4, so it's omitted here // increment the schema version to 2 in _meta table. _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (2)") @@ -266,20 +274,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C return xerrors.Errorf("commit transaction: %w", err) } - // during the migration, we have likely increased the WAL size a lot, so lets do some - // simple DB administration to free up space (VACUUM followed by truncating the WAL file) - // as this would be a good time to do it when no other writes are happening - log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration") - _, err = ei.db.ExecContext(ctx, "VACUUM") - if err != nil { - log.Warnf("error vacuuming database: %s", err) - } - _, err = ei.db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)") - if err != nil { - log.Warnf("error checkpointing wal: %s", err) - } - - log.Infof("Successfully migrated events to version 2 in %s", time.Since(now)) + log.Infof("Successfully migrated event index from version 1 to version 2 in %s", time.Since(now)) return nil } @@ -301,11 +296,9 @@ func (ei *EventIndex) migrateToVersion3(ctx context.Context) error { return xerrors.Errorf("create index event_emitter_addr: %w", err) } - // create index on event_entry.key index. - _, err = tx.ExecContext(ctx, createIndexEventEntryKey) - if err != nil { - return xerrors.Errorf("create index event_entry_key_index: %w", err) - } + // original v3 migration introduced an index: + // CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key) + // which has subsequently been removed in v4, so it's omitted here // increment the schema version to 3 in _meta table. _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (3)") @@ -317,10 +310,78 @@ func (ei *EventIndex) migrateToVersion3(ctx context.Context) error { if err != nil { return xerrors.Errorf("commit transaction: %w", err) } + log.Infof("Successfully migrated event index from version 2 to version 3 in %s", time.Since(now)) + return nil +} + +// migrateToVersion4 migrates the schema from version 3 to version 4 by adjusting indexes to match +// the query patterns of the event filter. +// +// First it drops indexes introduced in previous migrations: +// 1. the index on the event.height and event.tipset_key columns +// 2. the index on the event_entry.key column +// +// And then creating the following indices: +// 1. an index on the event.tipset_key_cid column +// 2. an index on the event.height column +// 3. an index on the event.reverted column +// 4. an index on the event_entry.indexed and event_entry.key columns +// 5. an index on the event_entry.codec and event_entry.value columns +// 6. an index on the event_entry.event_id column +func (ei *EventIndex) migrateToVersion4(ctx context.Context) error { + now := time.Now() + + tx, err := ei.db.BeginTx(ctx, nil) + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + + for _, create := range []struct { + desc string + query string + }{ + {"drop index height_tipset_key", "DROP INDEX IF EXISTS height_tipset_key;"}, + {"drop index event_entry_key_index", "DROP INDEX IF EXISTS event_entry_key_index;"}, + {"create index event_tipset_key_cid", createIndexEventTipsetKeyCid}, + {"create index event_height", createIndexEventHeight}, + {"create index event_reverted", createIndexEventReverted}, + {"create index event_entry_indexed_key", createIndexEventEntryIndexedKey}, + {"create index event_entry_codec_value", createIndexEventEntryCodecValue}, + {"create index event_entry_event_id", createIndexEventEntryEventId}, + } { + _, err = tx.ExecContext(ctx, create.query) + if err != nil { + return xerrors.Errorf("%s: %w", create.desc, err) + } + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("commit transaction: %w", err) + } + + ei.vacuumDBAndCheckpointWAL(ctx) + log.Infof("Successfully migrated events to version 3 in %s", time.Since(now)) return nil } +func (ei *EventIndex) vacuumDBAndCheckpointWAL(ctx context.Context) { + // During the large migrations, we have likely increased the WAL size a lot, so lets do some + // simple DB administration to free up space (VACUUM followed by truncating the WAL file) + // as this would be a good time to do it when no other writes are happening. + log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration") + _, err := ei.db.ExecContext(ctx, "VACUUM") + if err != nil { + log.Warnf("error vacuuming database: %s", err) + } + _, err = ei.db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)") + if err != nil { + log.Warnf("error checkpointing wal: %s", err) + } +} + func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) { db, err := sql.Open("sqlite3", path+"?mode=rwc") if err != nil { @@ -358,25 +419,35 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor } if version == 1 { - log.Infof("upgrading event index from version 1 to version 2") + log.Infof("Upgrading event index from version 1 to version 2") err = eventIndex.migrateToVersion2(ctx, chainStore) if err != nil { _ = db.Close() - return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err) + return nil, xerrors.Errorf("could not migrate event index schema from version 1 to version 2: %w", err) } version = 2 } if version == 2 { - log.Infof("upgrading event index from version 2 to version 3") + log.Infof("Upgrading event index from version 2 to version 3") err = eventIndex.migrateToVersion3(ctx) if err != nil { _ = db.Close() - return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err) + return nil, xerrors.Errorf("could not migrate event index schema from version 2 to version 3: %w", err) } version = 3 } + if version == 3 { + log.Infof("Upgrading event index from version 3 to version 4") + err = eventIndex.migrateToVersion4(ctx) + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("could not migrate event index schema from version 3 to version 4: %w", err) + } + version = 4 + } + if version != schemaVersion { _ = db.Close() return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)