diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index ab4e24493..27aec481f 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -7,14 +7,17 @@ import ( "fmt" "sort" "strings" + "time" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" _ "github.com/mattn/go-sqlite3" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) @@ -42,6 +45,8 @@ var ddls = []string{ reverted INTEGER NOT NULL )`, + `CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)`, + `CREATE TABLE IF NOT EXISTS event_entry ( event_id INTEGER, indexed INTEGER NOT NULL, @@ -56,27 +61,173 @@ var ddls = []string{ version UINT64 NOT NULL UNIQUE )`, - // version 1. `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, + `INSERT OR IGNORE INTO _meta (version) VALUES (2)`, } -const schemaVersion = 1 +var ( + log = logging.Logger("filter") +) const ( - insertEvent = `INSERT OR IGNORE INTO event - (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) - VALUES(?, ?, ?, ?, ?, ?, ?, ?)` + schemaVersion = 2 - insertEntry = `INSERT OR IGNORE INTO event_entry - (event_id, indexed, flags, key, codec, value) - VALUES(?, ?, ?, ?, ?, ?)` + eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` + insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` + revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` + restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` ) type EventIndex struct { db *sql.DB + + stmtEventExists *sql.Stmt + stmtInsertEvent *sql.Stmt + stmtInsertEntry *sql.Stmt + stmtRevertEventsInTipset *sql.Stmt + stmtRestoreEvent *sql.Stmt } -func NewEventIndex(path string) (*EventIndex, error) { +func (ei *EventIndex) initStatements() (err error) { + ei.stmtEventExists, err = ei.db.Prepare(eventExists) + if err != nil { + return xerrors.Errorf("prepare stmtEventExists: %w", err) + } + + ei.stmtInsertEvent, err = ei.db.Prepare(insertEvent) + if err != nil { + return xerrors.Errorf("prepare stmtInsertEvent: %w", err) + } + + ei.stmtInsertEntry, err = ei.db.Prepare(insertEntry) + if err != nil { + return xerrors.Errorf("prepare stmtInsertEntry: %w", err) + } + + ei.stmtRevertEventsInTipset, err = ei.db.Prepare(revertEventsInTipset) + if err != nil { + return xerrors.Errorf("prepare stmtRevertEventsInTipset: %w", err) + } + + ei.stmtRestoreEvent, err = ei.db.Prepare(restoreEvent) + if err != nil { + return xerrors.Errorf("prepare stmtRestoreEvent: %w", err) + } + + return nil +} + +func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.ChainStore) error { + now := time.Now() + + tx, err := ei.db.Begin() + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + // rollback the transaction (a no-op if the transaction was already committed) + defer tx.Rollback() //nolint:errcheck + + stmtDeleteOffChainEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid!=? and height=?") + if err != nil { + return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err) + } + + stmtSelectEvent, err := tx.Prepare("SELECT id FROM event WHERE tipset_key_cid=? ORDER BY message_index ASC, event_index ASC, id DESC LIMIT 1") + if err != nil { + return xerrors.Errorf("prepare stmtSelectEvent: %w", err) + } + + stmtDeleteEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid=? AND id= minHeight.Int64 { + tsKey := currTs.Parents() + currTs, err = chainStore.GetTipSetFromKey(ctx, tsKey) + if err != nil { + return xerrors.Errorf("get tipset from key: %w", err) + } + log.Debugf("Migrating height %d\n", currTs.Height()) + + tsKeyCid, err := currTs.Key().Cid() + if err != nil { + return fmt.Errorf("tipset key cid: %w", err) + } + + // delete all events that are not in the canonical chain + _, err = stmtDeleteOffChainEvent.Exec(tsKeyCid.Bytes(), currTs.Height()) + if err != nil { + return xerrors.Errorf("delete off chain event: %w", err) + } + + // find the first eventId from the last time the tipset was applied + var eventId sql.NullInt64 + err = stmtSelectEvent.QueryRow(tsKeyCid.Bytes()).Scan(&eventId) + if err != nil { + if err == sql.ErrNoRows { + continue + } + return xerrors.Errorf("select event: %w", err) + } + + // this tipset might not have any events which is ok + if !eventId.Valid { + continue + } + log.Debugf("Deleting all events with id < %d at height %d\n", eventId.Int64, currTs.Height()) + + res, err := stmtDeleteEvent.Exec(tsKeyCid.Bytes(), eventId.Int64) + if err != nil { + return xerrors.Errorf("delete event: %w", err) + } + + nrRowsAffected, err := res.RowsAffected() + if err != nil { + return xerrors.Errorf("rows affected: %w", err) + } + log.Debugf("deleted %d events from tipset %s\n", nrRowsAffected, tsKeyCid.String()) + } + + // delete all entries that have an event_id that doesn't exist (since we don't have a foreign + // key constraint that gives us cascading deletes) + res, err := tx.Exec("DELETE FROM event_entry WHERE event_id NOT IN (SELECT id FROM event)") + if err != nil { + return xerrors.Errorf("delete event_entry: %w", err) + } + + nrRowsAffected, err := res.RowsAffected() + if err != nil { + return xerrors.Errorf("rows affected: %w", err) + } + log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected) + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("commit transaction: %w", err) + } + + log.Infof("Successfully migrated events to version 2 in %s", time.Since(now)) + + return nil +} + +func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) { db, err := sql.Open("sqlite3", path+"?mode=rwc") if err != nil { return nil, xerrors.Errorf("open sqlite3 database: %w", err) @@ -89,6 +240,8 @@ func NewEventIndex(path string) (*EventIndex, error) { } } + eventIndex := EventIndex{db: db} + q, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';") if err == sql.ErrNoRows || !q.Next() { // empty database, create the schema @@ -102,24 +255,48 @@ func NewEventIndex(path string) (*EventIndex, error) { _ = db.Close() return nil, xerrors.Errorf("looking for _meta table: %w", err) } else { - // Ensure we don't open a database from a different schema version - - row := db.QueryRow("SELECT max(version) FROM _meta") + // check the schema version to see if we need to upgrade the database schema var version int - err := row.Scan(&version) + err := db.QueryRow("SELECT max(version) FROM _meta").Scan(&version) if err != nil { _ = db.Close() return nil, xerrors.Errorf("invalid database version: no version found") } + + if version == 1 { + log.Infof("upgrading event index from version 1 to version 2") + + err = eventIndex.migrateToVersion2(ctx, chainStore) + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err) + } + + // to upgrade to version version 2 we only need to create an index on the event table + // which means we can just recreate the schema (it will not have any effect on existing data) + for _, ddl := range ddls { + if _, err := db.Exec(ddl); err != nil { + _ = db.Close() + return nil, xerrors.Errorf("could not upgrade index to version 2, exec ddl %q: %w", ddl, err) + } + } + + version = 2 + } + if version != schemaVersion { _ = db.Close() return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) } } - return &EventIndex{ - db: db, - }, nil + err = eventIndex.initStatements() + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err) + } + + return &eventIndex, nil } func (ei *EventIndex) Close() error { @@ -130,8 +307,29 @@ func (ei *EventIndex) Close() error { } func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { - // cache of lookups between actor id and f4 address + tx, err := ei.db.Begin() + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + // rollback the transaction (a no-op if the transaction was already committed) + defer tx.Rollback() //nolint:errcheck + // lets handle the revert case first, since its simpler and we can simply mark all events events in this tipset as reverted and return + if revert { + _, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes()) + if err != nil { + return xerrors.Errorf("revert event: %w", err) + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("commit transaction: %w", err) + } + + return nil + } + + // cache of lookups between actor id and f4 address addressLookups := make(map[abi.ActorID]address.Address) ems, err := te.messages(ctx) @@ -139,19 +337,8 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return xerrors.Errorf("load executed messages: %w", err) } - tx, err := ei.db.Begin() - if err != nil { - return xerrors.Errorf("begin transaction: %w", err) - } - stmtEvent, err := tx.Prepare(insertEvent) - if err != nil { - return xerrors.Errorf("prepare insert event: %w", err) - } - stmtEntry, err := tx.Prepare(insertEntry) - if err != nil { - return xerrors.Errorf("prepare insert entry: %w", err) - } - + // iterate over all executed messages in this tipset and insert them into the database if they + // don't exist, otherwise mark them as not reverted for msgIdx, em := range ems { for evIdx, ev := range em.Events() { addr, found := addressLookups[ev.Emitter] @@ -170,7 +357,9 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return xerrors.Errorf("tipset key cid: %w", err) } - res, err := stmtEvent.Exec( + // check if this event already exists in the database + var entryID sql.NullInt64 + err = tx.Stmt(ei.stmtEventExists).QueryRow( te.msgTs.Height(), // height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid @@ -178,34 +367,76 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever evIdx, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index - revert, // reverted - ) + ).Scan(&entryID) if err != nil { - return xerrors.Errorf("exec insert event: %w", err) + return xerrors.Errorf("error checking if event exists: %w", err) } - lastID, err := res.LastInsertId() - if err != nil { - return xerrors.Errorf("get last row id: %w", err) - } - - for _, entry := range ev.Entries { - _, err := stmtEntry.Exec( - lastID, // event_id - isIndexedValue(entry.Flags), // indexed - []byte{entry.Flags}, // flags - entry.Key, // key - entry.Codec, // codec - entry.Value, // value + if !entryID.Valid { + // event does not exist, lets insert it + res, err := tx.Stmt(ei.stmtInsertEvent).Exec( + te.msgTs.Height(), // height + te.msgTs.Key().Bytes(), // tipset_key + tsKeyCid.Bytes(), // tipset_key_cid + addr.Bytes(), // emitter_addr + evIdx, // event_index + em.Message().Cid().Bytes(), // message_cid + msgIdx, // message_index + false, // reverted ) if err != nil { - return xerrors.Errorf("exec insert entry: %w", err) + return xerrors.Errorf("exec insert event: %w", err) + } + + entryID.Int64, err = res.LastInsertId() + if err != nil { + return xerrors.Errorf("get last row id: %w", err) + } + + // insert all the entries for this event + for _, entry := range ev.Entries { + _, err = tx.Stmt(ei.stmtInsertEntry).Exec( + entryID.Int64, // event_id + isIndexedValue(entry.Flags), // indexed + []byte{entry.Flags}, // flags + entry.Key, // key + entry.Codec, // codec + entry.Value, // value + ) + if err != nil { + return xerrors.Errorf("exec insert entry: %w", err) + } + } + } else { + // event already exists, lets mark it as not reverted + res, err := tx.Stmt(ei.stmtRestoreEvent).Exec( + te.msgTs.Height(), // height + te.msgTs.Key().Bytes(), // tipset_key + tsKeyCid.Bytes(), // tipset_key_cid + addr.Bytes(), // emitter_addr + evIdx, // event_index + em.Message().Cid().Bytes(), // message_cid + msgIdx, // message_index + ) + if err != nil { + return xerrors.Errorf("exec restore event: %w", err) + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return xerrors.Errorf("error getting rows affected: %s", err) + } + + // this is a sanity check as we should only ever be updating one event + if rowsAffected != 1 { + log.Warnf("restored %d events but expected only one to exist", rowsAffected) } } } } - if err := tx.Commit(); err != nil { + err = tx.Commit() + if err != nil { return xerrors.Errorf("commit transaction: %w", err) } diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index ee2ae8611..fcdb1ab05 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -74,7 +74,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { dbPath := filepath.Join(workDir, "actorevents.db") - ei, err := NewEventIndex(dbPath) + ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { require.NoError(t, err, "collect events") diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 68a6990ce..e871ea005 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -79,7 +79,7 @@ func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo } var err error - eventIndex, err = filter.NewEventIndex(dbPath) + eventIndex, err = filter.NewEventIndex(ctx, dbPath, chainapi.Chain) if err != nil { return nil, err }