From d5f4d807d72e3be661463c529b1dcfa2c55a0b9b Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Wed, 13 Mar 2024 18:22:25 +0000 Subject: [PATCH] Prevent DDL re-execution during event index schema migrations This enhancement optimizes the schema migration process for the event index by preventing the redundant execution of Data Definition Language (DDL) statements that define the event schema. Traditionally, these DDL statements were grouped into a single slice, reflecting the most current version of the event index schema. With each migration, this slice was updated to the latest schema iteration, executing all statements in bulk. Initially, this method sufficed as migrations were focused on adding indices to existing table columns. However, as the database schema evolves to meet new requirements, such as the forthcoming migrations that involve changes to table schemas (notably, indexing events by emitter actor ID instead of addresses), the prior approach of bulk execution of DDL statements becomes unsuitable: it will no longer be safe to repeatedly execute DDL statements in previous migrations, because the upcoming one changes `event` table column structure. To address this issue, the work here has isolated the event index schema migrations on a per-version basis. This adjustment ensures that only the necessary DDL statements are executed during each migration, avoiding the inefficiencies and potential errors associated with redundant executions. The work here should also minimize the refactoring required for future migrations, facilitating a smoother introduction of significant schema updates. --- chain/events/filter/index.go | 137 ++++++++++++++++++++++------------- 1 file changed, 87 insertions(+), 50 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 49be57c79..83998767e 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -45,8 +45,8 @@ var ddls = []string{ reverted INTEGER NOT NULL )`, - `CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)`, - `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`, + createIndexEventHeightTipsetKey, + createIndexEventEmitterAddr, `CREATE TABLE IF NOT EXISTS event_entry ( event_id INTEGER, @@ -57,11 +57,11 @@ var ddls = []string{ value BLOB NOT NULL )`, - `CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key)`, + createIndexEventEntryKey, // metadata containing version of schema `CREATE TABLE IF NOT EXISTS _meta ( - version UINT64 NOT NULL UNIQUE + version UINT64 NOT NULL UNIQUE )`, `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, @@ -81,6 +81,10 @@ const ( insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + + createIndexEventHeightTipsetKey = `CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)` + createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` + createIndexEventEntryKey = `CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key)` ) type EventIndex struct { @@ -125,43 +129,43 @@ func (ei *EventIndex) initStatements() (err error) { func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.ChainStore) error { now := time.Now() - tx, err := ei.db.Begin() + tx, err := ei.db.BeginTx(ctx, nil) if err != nil { return xerrors.Errorf("begin transaction: %w", err) } // rollback the transaction (a no-op if the transaction was already committed) - defer tx.Rollback() //nolint:errcheck + defer func() { _ = tx.Rollback() }() // create some temporary indices to help speed up the migration - _, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_height_tipset_key_cid ON event (height,tipset_key_cid)") + _, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS tmp_height_tipset_key_cid ON event (height,tipset_key_cid)") if err != nil { return xerrors.Errorf("create index tmp_height_tipset_key_cid: %w", err) } - _, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_tipset_key_cid ON event (tipset_key_cid)") + _, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS tmp_tipset_key_cid ON event (tipset_key_cid)") if err != nil { return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err) } - stmtDeleteOffChainEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid!=? and height=?") + stmtDeleteOffChainEvent, err := tx.PrepareContext(ctx, "DELETE FROM event WHERE tipset_key_cid!=? and height=?") if err != nil { return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err) } - stmtSelectEvent, err := tx.Prepare("SELECT id FROM event WHERE tipset_key_cid=? ORDER BY message_index ASC, event_index ASC, id DESC LIMIT 1") + stmtSelectEvent, err := tx.PrepareContext(ctx, "SELECT id FROM event WHERE tipset_key_cid=? ORDER BY message_index ASC, event_index ASC, id DESC LIMIT 1") if err != nil { return xerrors.Errorf("prepare stmtSelectEvent: %w", err) } - stmtDeleteEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid=? AND id 0 { - subclauses := []string{} + subclauses := make([]string, 0, len(f.addresses)) for _, addr := range f.addresses { subclauses = append(subclauses, "emitter_addr=?") values = append(values, addr.Bytes()) @@ -543,7 +580,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias)) clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias)) values = append(values, key) - subclauses := []string{} + subclauses := make([]string, 0, len(vals)) for _, val := range vals { subclauses = append(subclauses, fmt.Sprintf("(%s.value=? AND %[1]s.codec=?)", joinAlias)) values = append(values, val.Value, val.Codec)