Prevent DDL re-execution during event index schema migrations
This enhancement optimizes the schema migration process for the event index by preventing the redundant execution of Data Definition Language (DDL) statements that define the event schema. Traditionally, these DDL statements were grouped into a single slice, reflecting the most current version of the event index schema. With each migration, this slice was updated to the latest schema iteration, and all of its statements were executed in bulk. Initially, this method sufficed as migrations were focused on adding indices to existing table columns. However, as the database schema evolves to meet new requirements, such as the forthcoming migrations that involve changes to table schemas (notably, indexing events by emitter actor ID instead of addresses), the prior approach of bulk execution of DDL statements becomes unsuitable: it will no longer be safe to repeatedly execute the DDL statements from previous migrations, because the upcoming one changes the column structure of the `event` table. To address this issue, the work here has isolated the event index schema migrations on a per-version basis. This adjustment ensures that only the necessary DDL statements are executed during each migration, avoiding the inefficiencies and potential errors associated with redundant executions. The work here should also minimize the refactoring required for future migrations, facilitating a smoother introduction of significant schema updates.
This commit is contained in:
parent
47910cfe82
commit
d5f4d807d7
@ -45,8 +45,8 @@ var ddls = []string{
|
|||||||
reverted INTEGER NOT NULL
|
reverted INTEGER NOT NULL
|
||||||
)`,
|
)`,
|
||||||
|
|
||||||
`CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)`,
|
createIndexEventHeightTipsetKey,
|
||||||
`CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`,
|
createIndexEventEmitterAddr,
|
||||||
|
|
||||||
`CREATE TABLE IF NOT EXISTS event_entry (
|
`CREATE TABLE IF NOT EXISTS event_entry (
|
||||||
event_id INTEGER,
|
event_id INTEGER,
|
||||||
@ -57,7 +57,7 @@ var ddls = []string{
|
|||||||
value BLOB NOT NULL
|
value BLOB NOT NULL
|
||||||
)`,
|
)`,
|
||||||
|
|
||||||
`CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key)`,
|
createIndexEventEntryKey,
|
||||||
|
|
||||||
// metadata containing version of schema
|
// metadata containing version of schema
|
||||||
`CREATE TABLE IF NOT EXISTS _meta (
|
`CREATE TABLE IF NOT EXISTS _meta (
|
||||||
@ -81,6 +81,10 @@ const (
|
|||||||
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
|
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
|
||||||
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
|
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
|
||||||
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
|
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
|
||||||
|
|
||||||
|
createIndexEventHeightTipsetKey = `CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)`
|
||||||
|
createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
|
||||||
|
createIndexEventEntryKey = `CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key)`
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventIndex struct {
|
type EventIndex struct {
|
||||||
@ -125,43 +129,43 @@ func (ei *EventIndex) initStatements() (err error) {
|
|||||||
func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.ChainStore) error {
|
func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.ChainStore) error {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
tx, err := ei.db.Begin()
|
tx, err := ei.db.BeginTx(ctx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("begin transaction: %w", err)
|
return xerrors.Errorf("begin transaction: %w", err)
|
||||||
}
|
}
|
||||||
// rollback the transaction (a no-op if the transaction was already committed)
|
// rollback the transaction (a no-op if the transaction was already committed)
|
||||||
defer tx.Rollback() //nolint:errcheck
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
// create some temporary indices to help speed up the migration
|
// create some temporary indices to help speed up the migration
|
||||||
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_height_tipset_key_cid ON event (height,tipset_key_cid)")
|
_, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS tmp_height_tipset_key_cid ON event (height,tipset_key_cid)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("create index tmp_height_tipset_key_cid: %w", err)
|
return xerrors.Errorf("create index tmp_height_tipset_key_cid: %w", err)
|
||||||
}
|
}
|
||||||
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS tmp_tipset_key_cid ON event (tipset_key_cid)")
|
_, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS tmp_tipset_key_cid ON event (tipset_key_cid)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err)
|
return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stmtDeleteOffChainEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid!=? and height=?")
|
stmtDeleteOffChainEvent, err := tx.PrepareContext(ctx, "DELETE FROM event WHERE tipset_key_cid!=? and height=?")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err)
|
return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stmtSelectEvent, err := tx.Prepare("SELECT id FROM event WHERE tipset_key_cid=? ORDER BY message_index ASC, event_index ASC, id DESC LIMIT 1")
|
stmtSelectEvent, err := tx.PrepareContext(ctx, "SELECT id FROM event WHERE tipset_key_cid=? ORDER BY message_index ASC, event_index ASC, id DESC LIMIT 1")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("prepare stmtSelectEvent: %w", err)
|
return xerrors.Errorf("prepare stmtSelectEvent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stmtDeleteEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid=? AND id<?")
|
stmtDeleteEvent, err := tx.PrepareContext(ctx, "DELETE FROM event WHERE tipset_key_cid=? AND id<?")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("prepare stmtDeleteEvent: %w", err)
|
return xerrors.Errorf("prepare stmtDeleteEvent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the lowest height tipset
|
// get the lowest height tipset
|
||||||
var minHeight sql.NullInt64
|
var minHeight sql.NullInt64
|
||||||
err = ei.db.QueryRow("SELECT MIN(height) FROM event").Scan(&minHeight)
|
err = ei.db.QueryRowContext(ctx, "SELECT MIN(height) FROM event").Scan(&minHeight)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == sql.ErrNoRows {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -198,7 +202,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
|
|||||||
var eventId sql.NullInt64
|
var eventId sql.NullInt64
|
||||||
err = stmtSelectEvent.QueryRow(tsKeyCid.Bytes()).Scan(&eventId)
|
err = stmtSelectEvent.QueryRow(tsKeyCid.Bytes()).Scan(&eventId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == sql.ErrNoRows {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return xerrors.Errorf("select event: %w", err)
|
return xerrors.Errorf("select event: %w", err)
|
||||||
@ -224,7 +228,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
|
|||||||
|
|
||||||
// delete all entries that have an event_id that doesn't exist (since we don't have a foreign
|
// delete all entries that have an event_id that doesn't exist (since we don't have a foreign
|
||||||
// key constraint that gives us cascading deletes)
|
// key constraint that gives us cascading deletes)
|
||||||
res, err := tx.Exec("DELETE FROM event_entry WHERE event_id NOT IN (SELECT id FROM event)")
|
res, err := tx.ExecContext(ctx, "DELETE FROM event_entry WHERE event_id NOT IN (SELECT id FROM event)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("delete event_entry: %w", err)
|
return xerrors.Errorf("delete event_entry: %w", err)
|
||||||
}
|
}
|
||||||
@ -236,15 +240,27 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
|
|||||||
log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected)
|
log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected)
|
||||||
|
|
||||||
// drop the temporary indices after the migration
|
// drop the temporary indices after the migration
|
||||||
_, err = tx.Exec("DROP INDEX IF EXISTS tmp_tipset_key_cid")
|
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS tmp_tipset_key_cid")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err)
|
return xerrors.Errorf("drop index tmp_tipset_key_cid: %w", err)
|
||||||
}
|
}
|
||||||
_, err = tx.Exec("DROP INDEX IF EXISTS tmp_height_tipset_key_cid")
|
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS tmp_height_tipset_key_cid")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err)
|
return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create the final index on event.height and event.tipset_key
|
||||||
|
_, err = tx.ExecContext(ctx, createIndexEventHeightTipsetKey)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("create index height_tipset_key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// increment the schema version to 2 in _meta table.
|
||||||
|
_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (2)")
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("increment _meta version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
err = tx.Commit()
|
err = tx.Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("commit transaction: %w", err)
|
return xerrors.Errorf("commit transaction: %w", err)
|
||||||
@ -254,11 +270,11 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
|
|||||||
// simple DB administration to free up space (VACUUM followed by truncating the WAL file)
|
// simple DB administration to free up space (VACUUM followed by truncating the WAL file)
|
||||||
// as this would be a good time to do it when no other writes are happening
|
// as this would be a good time to do it when no other writes are happening
|
||||||
log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration")
|
log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration")
|
||||||
_, err = ei.db.Exec("VACUUM")
|
_, err = ei.db.ExecContext(ctx, "VACUUM")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("error vacuuming database: %s", err)
|
log.Warnf("error vacuuming database: %s", err)
|
||||||
}
|
}
|
||||||
_, err = ei.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)")
|
_, err = ei.db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("error checkpointing wal: %s", err)
|
log.Warnf("error checkpointing wal: %s", err)
|
||||||
}
|
}
|
||||||
@ -268,6 +284,43 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// migrateToVersion3 migrates the schema from version 2 to version 3 by creating two indices:
|
||||||
|
// 1) an index on the event.emitter_addr column, and 2) an index on the event_entry.key column.
|
||||||
|
func (ei *EventIndex) migrateToVersion3(ctx context.Context) error {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
tx, err := ei.db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("begin transaction: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
|
// create index on event.emitter_addr.
|
||||||
|
_, err = tx.ExecContext(ctx, createIndexEventEmitterAddr)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("create index event_emitter_addr: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create index on event_entry.key.
|
||||||
|
_, err = tx.ExecContext(ctx, createIndexEventEntryKey)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("create index event_entry_key_index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// increment the schema version to 3 in _meta table.
|
||||||
|
_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (3)")
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("increment _meta version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("commit transaction: %w", err)
|
||||||
|
}
|
||||||
|
log.Infof("Successfully migrated events to version 3 in %s", time.Since(now))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) {
|
func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) {
|
||||||
db, err := sql.Open("sqlite3", path+"?mode=rwc")
|
db, err := sql.Open("sqlite3", path+"?mode=rwc")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -283,8 +336,8 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
|
|||||||
|
|
||||||
eventIndex := EventIndex{db: db}
|
eventIndex := EventIndex{db: db}
|
||||||
|
|
||||||
q, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
|
q, err := db.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
|
||||||
if err == sql.ErrNoRows || !q.Next() {
|
if errors.Is(err, sql.ErrNoRows) || !q.Next() {
|
||||||
// empty database, create the schema
|
// empty database, create the schema
|
||||||
for _, ddl := range ddls {
|
for _, ddl := range ddls {
|
||||||
if _, err := db.Exec(ddl); err != nil {
|
if _, err := db.Exec(ddl); err != nil {
|
||||||
@ -306,38 +359,21 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
|
|||||||
|
|
||||||
if version == 1 {
|
if version == 1 {
|
||||||
log.Infof("upgrading event index from version 1 to version 2")
|
log.Infof("upgrading event index from version 1 to version 2")
|
||||||
|
|
||||||
err = eventIndex.migrateToVersion2(ctx, chainStore)
|
err = eventIndex.migrateToVersion2(ctx, chainStore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err)
|
return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// to upgrade to version 2 we only need to create an index on the event table
|
|
||||||
// which means we can just recreate the schema (it will not have any effect on existing data)
|
|
||||||
for _, ddl := range ddls {
|
|
||||||
if _, err := db.Exec(ddl); err != nil {
|
|
||||||
_ = db.Close()
|
|
||||||
return nil, xerrors.Errorf("could not upgrade index to version 2, exec ddl %q: %w", ddl, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
version = 2
|
version = 2
|
||||||
}
|
}
|
||||||
|
|
||||||
if version == 2 {
|
if version == 2 {
|
||||||
log.Infof("upgrading event index from version 2 to version 3")
|
log.Infof("upgrading event index from version 2 to version 3")
|
||||||
|
err = eventIndex.migrateToVersion3(ctx)
|
||||||
// to upgrade to version 3 we only need to create an index on the event_entry.key column
|
if err != nil {
|
||||||
// and on the event.emitter_addr column
|
|
||||||
// which means we can just reapply the schema (it will not have any effect on existing data)
|
|
||||||
for _, ddl := range ddls {
|
|
||||||
if _, err := db.Exec(ddl); err != nil {
|
|
||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, xerrors.Errorf("could not upgrade index to version 3, exec ddl %q: %w", ddl, err)
|
return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
version = 3
|
version = 3
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -369,7 +405,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
return xerrors.Errorf("begin transaction: %w", err)
|
return xerrors.Errorf("begin transaction: %w", err)
|
||||||
}
|
}
|
||||||
// rollback the transaction (a no-op if the transaction was already committed)
|
// rollback the transaction (a no-op if the transaction was already committed)
|
||||||
defer tx.Rollback() //nolint:errcheck
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
// let's handle the revert case first, since it's simpler and we can simply mark all events in this tipset as reverted and return
|
// let's handle the revert case first, since it's simpler and we can simply mark all events in this tipset as reverted and return
|
||||||
if revert {
|
if revert {
|
||||||
@ -500,11 +536,12 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PrefillFilter fills a filter's collection of events from the historic index
|
// prefillFilter fills a filter's collection of events from the historic index
|
||||||
func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error {
|
func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error {
|
||||||
clauses := []string{}
|
var (
|
||||||
values := []any{}
|
clauses, joins []string
|
||||||
joins := []string{}
|
values []any
|
||||||
|
)
|
||||||
|
|
||||||
if f.tipsetCid != cid.Undef {
|
if f.tipsetCid != cid.Undef {
|
||||||
clauses = append(clauses, "event.tipset_key_cid=?")
|
clauses = append(clauses, "event.tipset_key_cid=?")
|
||||||
@ -526,7 +563,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(f.addresses) > 0 {
|
if len(f.addresses) > 0 {
|
||||||
subclauses := []string{}
|
subclauses := make([]string, 0, len(f.addresses))
|
||||||
for _, addr := range f.addresses {
|
for _, addr := range f.addresses {
|
||||||
subclauses = append(subclauses, "emitter_addr=?")
|
subclauses = append(subclauses, "emitter_addr=?")
|
||||||
values = append(values, addr.Bytes())
|
values = append(values, addr.Bytes())
|
||||||
@ -543,7 +580,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
|
|||||||
joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias))
|
joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias))
|
||||||
clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias))
|
clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias))
|
||||||
values = append(values, key)
|
values = append(values, key)
|
||||||
subclauses := []string{}
|
subclauses := make([]string, 0, len(vals))
|
||||||
for _, val := range vals {
|
for _, val := range vals {
|
||||||
subclauses = append(subclauses, fmt.Sprintf("(%s.value=? AND %[1]s.codec=?)", joinAlias))
|
subclauses = append(subclauses, fmt.Sprintf("(%s.value=? AND %[1]s.codec=?)", joinAlias))
|
||||||
values = append(values, val.Value, val.Codec)
|
values = append(values, val.Value, val.Codec)
|
||||||
|
Loading…
Reference in New Issue
Block a user