feat: Make sure we dont store duplidate actor events caused by reorgs

This commit is contained in:
Fridrik Asmundsson 2023-06-28 16:07:03 +00:00
parent 30a9f63165
commit b80c0828b9
3 changed files with 282 additions and 51 deletions

View File

@ -7,14 +7,17 @@ import (
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
"time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -42,6 +45,8 @@ var ddls = []string{
reverted INTEGER NOT NULL reverted INTEGER NOT NULL
)`, )`,
`CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)`,
`CREATE TABLE IF NOT EXISTS event_entry ( `CREATE TABLE IF NOT EXISTS event_entry (
event_id INTEGER, event_id INTEGER,
indexed INTEGER NOT NULL, indexed INTEGER NOT NULL,
@ -56,27 +61,173 @@ var ddls = []string{
version UINT64 NOT NULL UNIQUE version UINT64 NOT NULL UNIQUE
)`, )`,
// version 1.
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`, `INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (2)`,
} }
const schemaVersion = 1 var (
log = logging.Logger("filter")
)
const ( const (
insertEvent = `INSERT OR IGNORE INTO event schemaVersion = 2
(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted)
VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
(event_id, indexed, flags, key, codec, value) insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
VALUES(?, ?, ?, ?, ?, ?)` insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
) )
type EventIndex struct { type EventIndex struct {
db *sql.DB db *sql.DB
stmtEventExists *sql.Stmt
stmtInsertEvent *sql.Stmt
stmtInsertEntry *sql.Stmt
stmtRevertEventsInTipset *sql.Stmt
stmtRestoreEvent *sql.Stmt
} }
func NewEventIndex(path string) (*EventIndex, error) { func (ei *EventIndex) initStatements() (err error) {
ei.stmtEventExists, err = ei.db.Prepare(eventExists)
if err != nil {
return xerrors.Errorf("prepare stmtEventExists: %w", err)
}
ei.stmtInsertEvent, err = ei.db.Prepare(insertEvent)
if err != nil {
return xerrors.Errorf("prepare stmtInsertEvent: %w", err)
}
ei.stmtInsertEntry, err = ei.db.Prepare(insertEntry)
if err != nil {
return xerrors.Errorf("prepare stmtInsertEntry: %w", err)
}
ei.stmtRevertEventsInTipset, err = ei.db.Prepare(revertEventsInTipset)
if err != nil {
return xerrors.Errorf("prepare stmtRevertEventsInTipset: %w", err)
}
ei.stmtRestoreEvent, err = ei.db.Prepare(restoreEvent)
if err != nil {
return xerrors.Errorf("prepare stmtRestoreEvent: %w", err)
}
return nil
}
func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.ChainStore) error {
now := time.Now()
tx, err := ei.db.Begin()
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
// rollback the transaction (a no-op if the transaction was already committed)
defer tx.Rollback() //nolint:errcheck
stmtDeleteOffChainEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid!=? and height=?")
if err != nil {
return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err)
}
stmtSelectEvent, err := tx.Prepare("SELECT id FROM event WHERE tipset_key_cid=? ORDER BY message_index ASC, event_index ASC, id DESC LIMIT 1")
if err != nil {
return xerrors.Errorf("prepare stmtSelectEvent: %w", err)
}
stmtDeleteEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid=? AND id<?")
if err != nil {
return xerrors.Errorf("prepare stmtDeleteEvent: %w", err)
}
// get the lowest height tipset
var minHeight sql.NullInt64
err = ei.db.QueryRow("SELECT MIN(height) FROM event").Scan(&minHeight)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return xerrors.Errorf("query min height: %w", err)
}
log.Infof("Migrating events from head to %d", minHeight.Int64)
currTs := chainStore.GetHeaviestTipSet()
for int64(currTs.Height()) >= minHeight.Int64 {
tsKey := currTs.Parents()
currTs, err = chainStore.GetTipSetFromKey(ctx, tsKey)
if err != nil {
return xerrors.Errorf("get tipset from key: %w", err)
}
log.Debugf("Migrating height %d\n", currTs.Height())
tsKeyCid, err := currTs.Key().Cid()
if err != nil {
return fmt.Errorf("tipset key cid: %w", err)
}
// delete all events that are not in the canonical chain
_, err = stmtDeleteOffChainEvent.Exec(tsKeyCid.Bytes(), currTs.Height())
if err != nil {
return xerrors.Errorf("delete off chain event: %w", err)
}
// find the first eventId from the last time the tipset was applied
var eventId sql.NullInt64
err = stmtSelectEvent.QueryRow(tsKeyCid.Bytes()).Scan(&eventId)
if err != nil {
if err == sql.ErrNoRows {
continue
}
return xerrors.Errorf("select event: %w", err)
}
// this tipset might not have any events which is ok
if !eventId.Valid {
continue
}
log.Debugf("Deleting all events with id < %d at height %d\n", eventId.Int64, currTs.Height())
res, err := stmtDeleteEvent.Exec(tsKeyCid.Bytes(), eventId.Int64)
if err != nil {
return xerrors.Errorf("delete event: %w", err)
}
nrRowsAffected, err := res.RowsAffected()
if err != nil {
return xerrors.Errorf("rows affected: %w", err)
}
log.Debugf("deleted %d events from tipset %s\n", nrRowsAffected, tsKeyCid.String())
}
// delete all entries that have an event_id that doesn't exist (since we don't have a foreign
// key constraint that gives us cascading deletes)
res, err := tx.Exec("DELETE FROM event_entry WHERE event_id NOT IN (SELECT id FROM event)")
if err != nil {
return xerrors.Errorf("delete event_entry: %w", err)
}
nrRowsAffected, err := res.RowsAffected()
if err != nil {
return xerrors.Errorf("rows affected: %w", err)
}
log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected)
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
log.Infof("Successfully migrated events to version 2 in %s", time.Since(now))
return nil
}
func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) {
db, err := sql.Open("sqlite3", path+"?mode=rwc") db, err := sql.Open("sqlite3", path+"?mode=rwc")
if err != nil { if err != nil {
return nil, xerrors.Errorf("open sqlite3 database: %w", err) return nil, xerrors.Errorf("open sqlite3 database: %w", err)
@ -89,6 +240,8 @@ func NewEventIndex(path string) (*EventIndex, error) {
} }
} }
eventIndex := EventIndex{db: db}
q, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';") q, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
if err == sql.ErrNoRows || !q.Next() { if err == sql.ErrNoRows || !q.Next() {
// empty database, create the schema // empty database, create the schema
@ -102,24 +255,48 @@ func NewEventIndex(path string) (*EventIndex, error) {
_ = db.Close() _ = db.Close()
return nil, xerrors.Errorf("looking for _meta table: %w", err) return nil, xerrors.Errorf("looking for _meta table: %w", err)
} else { } else {
// Ensure we don't open a database from a different schema version // check the schema version to see if we need to upgrade the database schema
row := db.QueryRow("SELECT max(version) FROM _meta")
var version int var version int
err := row.Scan(&version) err := db.QueryRow("SELECT max(version) FROM _meta").Scan(&version)
if err != nil { if err != nil {
_ = db.Close() _ = db.Close()
return nil, xerrors.Errorf("invalid database version: no version found") return nil, xerrors.Errorf("invalid database version: no version found")
} }
if version == 1 {
log.Infof("upgrading event index from version 1 to version 2")
err = eventIndex.migrateToVersion2(ctx, chainStore)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err)
}
// to upgrade to version version 2 we only need to create an index on the event table
// which means we can just recreate the schema (it will not have any effect on existing data)
for _, ddl := range ddls {
if _, err := db.Exec(ddl); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not upgrade index to version 2, exec ddl %q: %w", ddl, err)
}
}
version = 2
}
if version != schemaVersion { if version != schemaVersion {
_ = db.Close() _ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
} }
} }
return &EventIndex{ err = eventIndex.initStatements()
db: db, if err != nil {
}, nil _ = db.Close()
return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err)
}
return &eventIndex, nil
} }
func (ei *EventIndex) Close() error { func (ei *EventIndex) Close() error {
@ -130,8 +307,29 @@ func (ei *EventIndex) Close() error {
} }
func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
// cache of lookups between actor id and f4 address tx, err := ei.db.Begin()
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
// rollback the transaction (a no-op if the transaction was already committed)
defer tx.Rollback() //nolint:errcheck
// lets handle the revert case first, since its simpler and we can simply mark all events events in this tipset as reverted and return
if revert {
_, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes())
if err != nil {
return xerrors.Errorf("revert event: %w", err)
}
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
return nil
}
// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address) addressLookups := make(map[abi.ActorID]address.Address)
ems, err := te.messages(ctx) ems, err := te.messages(ctx)
@ -139,19 +337,8 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("load executed messages: %w", err) return xerrors.Errorf("load executed messages: %w", err)
} }
tx, err := ei.db.Begin() // iterate over all executed messages in this tipset and insert them into the database if they
if err != nil { // don't exist, otherwise mark them as not reverted
return xerrors.Errorf("begin transaction: %w", err)
}
stmtEvent, err := tx.Prepare(insertEvent)
if err != nil {
return xerrors.Errorf("prepare insert event: %w", err)
}
stmtEntry, err := tx.Prepare(insertEntry)
if err != nil {
return xerrors.Errorf("prepare insert entry: %w", err)
}
for msgIdx, em := range ems { for msgIdx, em := range ems {
for evIdx, ev := range em.Events() { for evIdx, ev := range em.Events() {
addr, found := addressLookups[ev.Emitter] addr, found := addressLookups[ev.Emitter]
@ -170,7 +357,9 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("tipset key cid: %w", err) return xerrors.Errorf("tipset key cid: %w", err)
} }
res, err := stmtEvent.Exec( // check if this event already exists in the database
var entryID sql.NullInt64
err = tx.Stmt(ei.stmtEventExists).QueryRow(
te.msgTs.Height(), // height te.msgTs.Height(), // height
te.msgTs.Key().Bytes(), // tipset_key te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid tsKeyCid.Bytes(), // tipset_key_cid
@ -178,20 +367,36 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
evIdx, // event_index evIdx, // event_index
em.Message().Cid().Bytes(), // message_cid em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index msgIdx, // message_index
revert, // reverted ).Scan(&entryID)
if err != nil {
return xerrors.Errorf("error checking if event exists: %w", err)
}
if !entryID.Valid {
// event does not exist, lets insert it
res, err := tx.Stmt(ei.stmtInsertEvent).Exec(
te.msgTs.Height(), // height
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
evIdx, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
false, // reverted
) )
if err != nil { if err != nil {
return xerrors.Errorf("exec insert event: %w", err) return xerrors.Errorf("exec insert event: %w", err)
} }
lastID, err := res.LastInsertId() entryID.Int64, err = res.LastInsertId()
if err != nil { if err != nil {
return xerrors.Errorf("get last row id: %w", err) return xerrors.Errorf("get last row id: %w", err)
} }
// insert all the entries for this event
for _, entry := range ev.Entries { for _, entry := range ev.Entries {
_, err := stmtEntry.Exec( _, err = tx.Stmt(ei.stmtInsertEntry).Exec(
lastID, // event_id entryID.Int64, // event_id
isIndexedValue(entry.Flags), // indexed isIndexedValue(entry.Flags), // indexed
[]byte{entry.Flags}, // flags []byte{entry.Flags}, // flags
entry.Key, // key entry.Key, // key
@ -202,10 +407,36 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("exec insert entry: %w", err) return xerrors.Errorf("exec insert entry: %w", err)
} }
} }
} else {
// event already exists, lets mark it as not reverted
res, err := tx.Stmt(ei.stmtRestoreEvent).Exec(
te.msgTs.Height(), // height
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
evIdx, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
)
if err != nil {
return xerrors.Errorf("exec restore event: %w", err)
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return xerrors.Errorf("error getting rows affected: %s", err)
}
// this is a sanity check as we should only ever be updating one event
if rowsAffected != 1 {
log.Warnf("restored %d events but expected only one to exist", rowsAffected)
}
}
} }
} }
if err := tx.Commit(); err != nil { err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err) return xerrors.Errorf("commit transaction: %w", err)
} }

View File

@ -74,7 +74,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
dbPath := filepath.Join(workDir, "actorevents.db") dbPath := filepath.Join(workDir, "actorevents.db")
ei, err := NewEventIndex(dbPath) ei, err := NewEventIndex(context.Background(), dbPath, nil)
require.NoError(t, err, "create event index") require.NoError(t, err, "create event index")
if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil {
require.NoError(t, err, "collect events") require.NoError(t, err, "collect events")

View File

@ -79,7 +79,7 @@ func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo
} }
var err error var err error
eventIndex, err = filter.NewEventIndex(dbPath) eventIndex, err = filter.NewEventIndex(ctx, dbPath, chainapi.Chain)
if err != nil { if err != nil {
return nil, err return nil, err
} }