v1.27.0-a #10
@ -26,7 +26,7 @@ var pragmas = []string{
|
|||||||
"PRAGMA temp_store = memory",
|
"PRAGMA temp_store = memory",
|
||||||
"PRAGMA mmap_size = 30000000000",
|
"PRAGMA mmap_size = 30000000000",
|
||||||
"PRAGMA page_size = 32768",
|
"PRAGMA page_size = 32768",
|
||||||
"PRAGMA auto_vacuum = NONE",
|
"PRAGMA auto_vacuum = NONE", // not useful until we implement GC
|
||||||
"PRAGMA automatic_index = OFF",
|
"PRAGMA automatic_index = OFF",
|
||||||
"PRAGMA journal_mode = WAL",
|
"PRAGMA journal_mode = WAL",
|
||||||
"PRAGMA read_uncommitted = ON",
|
"PRAGMA read_uncommitted = ON",
|
||||||
@ -45,8 +45,10 @@ var ddls = []string{
|
|||||||
reverted INTEGER NOT NULL
|
reverted INTEGER NOT NULL
|
||||||
)`,
|
)`,
|
||||||
|
|
||||||
createIndexEventHeightTipsetKey,
|
|
||||||
createIndexEventEmitterAddr,
|
createIndexEventEmitterAddr,
|
||||||
|
createIndexEventTipsetKeyCid,
|
||||||
|
createIndexEventHeight,
|
||||||
|
createIndexEventReverted,
|
||||||
|
|
||||||
`CREATE TABLE IF NOT EXISTS event_entry (
|
`CREATE TABLE IF NOT EXISTS event_entry (
|
||||||
event_id INTEGER,
|
event_id INTEGER,
|
||||||
@ -57,7 +59,9 @@ var ddls = []string{
|
|||||||
value BLOB NOT NULL
|
value BLOB NOT NULL
|
||||||
)`,
|
)`,
|
||||||
|
|
||||||
createIndexEventEntryKey,
|
createIndexEventEntryIndexedKey,
|
||||||
|
createIndexEventEntryCodecValue,
|
||||||
|
createIndexEventEntryEventId,
|
||||||
|
|
||||||
// metadata containing version of schema
|
// metadata containing version of schema
|
||||||
`CREATE TABLE IF NOT EXISTS _meta (
|
`CREATE TABLE IF NOT EXISTS _meta (
|
||||||
@ -67,6 +71,7 @@ var ddls = []string{
|
|||||||
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
|
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
|
||||||
`INSERT OR IGNORE INTO _meta (version) VALUES (2)`,
|
`INSERT OR IGNORE INTO _meta (version) VALUES (2)`,
|
||||||
`INSERT OR IGNORE INTO _meta (version) VALUES (3)`,
|
`INSERT OR IGNORE INTO _meta (version) VALUES (3)`,
|
||||||
|
`INSERT OR IGNORE INTO _meta (version) VALUES (4)`,
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -74,7 +79,7 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
schemaVersion = 3
|
schemaVersion = 4
|
||||||
|
|
||||||
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
|
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
|
||||||
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
|
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
|
||||||
@ -82,9 +87,14 @@ const (
|
|||||||
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
|
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
|
||||||
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
|
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
|
||||||
|
|
||||||
createIndexEventHeightTipsetKey = `CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)`
|
createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
|
||||||
createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
|
createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);`
|
||||||
createIndexEventEntryKey = `CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key)`
|
createIndexEventHeight = `CREATE INDEX IF NOT EXISTS event_height ON event (height);`
|
||||||
|
createIndexEventReverted = `CREATE INDEX IF NOT EXISTS event_reverted ON event (reverted);`
|
||||||
|
|
||||||
|
createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);`
|
||||||
|
createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);`
|
||||||
|
createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);`
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventIndex struct {
|
type EventIndex struct {
|
||||||
@ -237,7 +247,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("rows affected: %w", err)
|
return xerrors.Errorf("rows affected: %w", err)
|
||||||
}
|
}
|
||||||
log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected)
|
log.Infof("Cleaned up %d entries that had deleted events\n", nrRowsAffected)
|
||||||
|
|
||||||
// drop the temporary indices after the migration
|
// drop the temporary indices after the migration
|
||||||
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS tmp_tipset_key_cid")
|
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS tmp_tipset_key_cid")
|
||||||
@ -249,11 +259,9 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
|
|||||||
return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err)
|
return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// create the final index on event.height and event.tipset_key
|
// original v2 migration introduced an index:
|
||||||
_, err = tx.ExecContext(ctx, createIndexEventHeightTipsetKey)
|
// CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)
|
||||||
if err != nil {
|
// which has subsequently been removed in v4, so it's omitted here
|
||||||
return xerrors.Errorf("create index height_tipset_key: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// increment the schema version to 2 in _meta table.
|
// increment the schema version to 2 in _meta table.
|
||||||
_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (2)")
|
_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (2)")
|
||||||
@ -266,20 +274,7 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C
|
|||||||
return xerrors.Errorf("commit transaction: %w", err)
|
return xerrors.Errorf("commit transaction: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// during the migration, we have likely increased the WAL size a lot, so lets do some
|
log.Infof("Successfully migrated event index from version 1 to version 2 in %s", time.Since(now))
|
||||||
// simple DB administration to free up space (VACUUM followed by truncating the WAL file)
|
|
||||||
// as this would be a good time to do it when no other writes are happening
|
|
||||||
log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration")
|
|
||||||
_, err = ei.db.ExecContext(ctx, "VACUUM")
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("error vacuuming database: %s", err)
|
|
||||||
}
|
|
||||||
_, err = ei.db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("error checkpointing wal: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Successfully migrated events to version 2 in %s", time.Since(now))
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -301,11 +296,9 @@ func (ei *EventIndex) migrateToVersion3(ctx context.Context) error {
|
|||||||
return xerrors.Errorf("create index event_emitter_addr: %w", err)
|
return xerrors.Errorf("create index event_emitter_addr: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// create index on event_entry.key index.
|
// original v3 migration introduced an index:
|
||||||
_, err = tx.ExecContext(ctx, createIndexEventEntryKey)
|
// CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key)
|
||||||
if err != nil {
|
// which has subsequently been removed in v4, so it's omitted here
|
||||||
return xerrors.Errorf("create index event_entry_key_index: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// increment the schema version to 3 in _meta table.
|
// increment the schema version to 3 in _meta table.
|
||||||
_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (3)")
|
_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (3)")
|
||||||
@ -317,10 +310,78 @@ func (ei *EventIndex) migrateToVersion3(ctx context.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("commit transaction: %w", err)
|
return xerrors.Errorf("commit transaction: %w", err)
|
||||||
}
|
}
|
||||||
|
log.Infof("Successfully migrated event index from version 2 to version 3 in %s", time.Since(now))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// migrateToVersion4 migrates the schema from version 3 to version 4 by adjusting indexes to match
|
||||||
|
// the query patterns of the event filter.
|
||||||
|
//
|
||||||
|
// First it drops indexes introduced in previous migrations:
|
||||||
|
// 1. the index on the event.height and event.tipset_key columns
|
||||||
|
// 2. the index on the event_entry.key column
|
||||||
|
//
|
||||||
|
// And then creating the following indices:
|
||||||
|
// 1. an index on the event.tipset_key_cid column
|
||||||
|
// 2. an index on the event.height column
|
||||||
|
// 3. an index on the event.reverted column
|
||||||
|
// 4. an index on the event_entry.indexed and event_entry.key columns
|
||||||
|
// 5. an index on the event_entry.codec and event_entry.value columns
|
||||||
|
// 6. an index on the event_entry.event_id column
|
||||||
|
func (ei *EventIndex) migrateToVersion4(ctx context.Context) error {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
tx, err := ei.db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("begin transaction: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
|
for _, create := range []struct {
|
||||||
|
desc string
|
||||||
|
query string
|
||||||
|
}{
|
||||||
|
{"drop index height_tipset_key", "DROP INDEX IF EXISTS height_tipset_key;"},
|
||||||
|
{"drop index event_entry_key_index", "DROP INDEX IF EXISTS event_entry_key_index;"},
|
||||||
|
{"create index event_tipset_key_cid", createIndexEventTipsetKeyCid},
|
||||||
|
{"create index event_height", createIndexEventHeight},
|
||||||
|
{"create index event_reverted", createIndexEventReverted},
|
||||||
|
{"create index event_entry_indexed_key", createIndexEventEntryIndexedKey},
|
||||||
|
{"create index event_entry_codec_value", createIndexEventEntryCodecValue},
|
||||||
|
{"create index event_entry_event_id", createIndexEventEntryEventId},
|
||||||
|
} {
|
||||||
|
_, err = tx.ExecContext(ctx, create.query)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("%s: %w", create.desc, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("commit transaction: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ei.vacuumDBAndCheckpointWAL(ctx)
|
||||||
|
|
||||||
log.Infof("Successfully migrated events to version 3 in %s", time.Since(now))
|
log.Infof("Successfully migrated events to version 3 in %s", time.Since(now))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ei *EventIndex) vacuumDBAndCheckpointWAL(ctx context.Context) {
|
||||||
|
// During the large migrations, we have likely increased the WAL size a lot, so lets do some
|
||||||
|
// simple DB administration to free up space (VACUUM followed by truncating the WAL file)
|
||||||
|
// as this would be a good time to do it when no other writes are happening.
|
||||||
|
log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration")
|
||||||
|
_, err := ei.db.ExecContext(ctx, "VACUUM")
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("error vacuuming database: %s", err)
|
||||||
|
}
|
||||||
|
_, err = ei.db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("error checkpointing wal: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) {
|
func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) {
|
||||||
db, err := sql.Open("sqlite3", path+"?mode=rwc")
|
db, err := sql.Open("sqlite3", path+"?mode=rwc")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -358,25 +419,35 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
|
|||||||
}
|
}
|
||||||
|
|
||||||
if version == 1 {
|
if version == 1 {
|
||||||
log.Infof("upgrading event index from version 1 to version 2")
|
log.Infof("Upgrading event index from version 1 to version 2")
|
||||||
err = eventIndex.migrateToVersion2(ctx, chainStore)
|
err = eventIndex.migrateToVersion2(ctx, chainStore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err)
|
return nil, xerrors.Errorf("could not migrate event index schema from version 1 to version 2: %w", err)
|
||||||
}
|
}
|
||||||
version = 2
|
version = 2
|
||||||
}
|
}
|
||||||
|
|
||||||
if version == 2 {
|
if version == 2 {
|
||||||
log.Infof("upgrading event index from version 2 to version 3")
|
log.Infof("Upgrading event index from version 2 to version 3")
|
||||||
err = eventIndex.migrateToVersion3(ctx)
|
err = eventIndex.migrateToVersion3(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err)
|
return nil, xerrors.Errorf("could not migrate event index schema from version 2 to version 3: %w", err)
|
||||||
}
|
}
|
||||||
version = 3
|
version = 3
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if version == 3 {
|
||||||
|
log.Infof("Upgrading event index from version 3 to version 4")
|
||||||
|
err = eventIndex.migrateToVersion4(ctx)
|
||||||
|
if err != nil {
|
||||||
|
_ = db.Close()
|
||||||
|
return nil, xerrors.Errorf("could not migrate event index schema from version 3 to version 4: %w", err)
|
||||||
|
}
|
||||||
|
version = 4
|
||||||
|
}
|
||||||
|
|
||||||
if version != schemaVersion {
|
if version != schemaVersion {
|
||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
|
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
|
||||||
|
Loading…
Reference in New Issue
Block a user