Merge pull request #11015 from filecoin-project/fix-dedup

feat: Make sure we don't store duplidate actor events caused to reorgs in events.db
This commit is contained in:
Friðrik Ásmundsson 2023-06-30 11:56:20 +00:00 committed by GitHub
commit 8577dcb475
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 282 additions and 51 deletions

View File

@ -7,14 +7,17 @@ import (
"fmt"
"sort"
"strings"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -42,6 +45,8 @@ var ddls = []string{
reverted INTEGER NOT NULL
)`,
`CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)`,
`CREATE TABLE IF NOT EXISTS event_entry (
event_id INTEGER,
indexed INTEGER NOT NULL,
@ -56,27 +61,173 @@ var ddls = []string{
version UINT64 NOT NULL UNIQUE
)`,
// version 1.
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (2)`,
}
const schemaVersion = 1
var (
log = logging.Logger("filter")
)
const (
insertEvent = `INSERT OR IGNORE INTO event
(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted)
VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
schemaVersion = 2
insertEntry = `INSERT OR IGNORE INTO event_entry
(event_id, indexed, flags, key, codec, value)
VALUES(?, ?, ?, ?, ?, ?)`
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
)
type EventIndex struct {
db *sql.DB
stmtEventExists *sql.Stmt
stmtInsertEvent *sql.Stmt
stmtInsertEntry *sql.Stmt
stmtRevertEventsInTipset *sql.Stmt
stmtRestoreEvent *sql.Stmt
}
func NewEventIndex(path string) (*EventIndex, error) {
func (ei *EventIndex) initStatements() (err error) {
ei.stmtEventExists, err = ei.db.Prepare(eventExists)
if err != nil {
return xerrors.Errorf("prepare stmtEventExists: %w", err)
}
ei.stmtInsertEvent, err = ei.db.Prepare(insertEvent)
if err != nil {
return xerrors.Errorf("prepare stmtInsertEvent: %w", err)
}
ei.stmtInsertEntry, err = ei.db.Prepare(insertEntry)
if err != nil {
return xerrors.Errorf("prepare stmtInsertEntry: %w", err)
}
ei.stmtRevertEventsInTipset, err = ei.db.Prepare(revertEventsInTipset)
if err != nil {
return xerrors.Errorf("prepare stmtRevertEventsInTipset: %w", err)
}
ei.stmtRestoreEvent, err = ei.db.Prepare(restoreEvent)
if err != nil {
return xerrors.Errorf("prepare stmtRestoreEvent: %w", err)
}
return nil
}
func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.ChainStore) error {
now := time.Now()
tx, err := ei.db.Begin()
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
// rollback the transaction (a no-op if the transaction was already committed)
defer tx.Rollback() //nolint:errcheck
stmtDeleteOffChainEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid!=? and height=?")
if err != nil {
return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err)
}
stmtSelectEvent, err := tx.Prepare("SELECT id FROM event WHERE tipset_key_cid=? ORDER BY message_index ASC, event_index ASC, id DESC LIMIT 1")
if err != nil {
return xerrors.Errorf("prepare stmtSelectEvent: %w", err)
}
stmtDeleteEvent, err := tx.Prepare("DELETE FROM event WHERE tipset_key_cid=? AND id<?")
if err != nil {
return xerrors.Errorf("prepare stmtDeleteEvent: %w", err)
}
// get the lowest height tipset
var minHeight sql.NullInt64
err = ei.db.QueryRow("SELECT MIN(height) FROM event").Scan(&minHeight)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return xerrors.Errorf("query min height: %w", err)
}
log.Infof("Migrating events from head to %d", minHeight.Int64)
currTs := chainStore.GetHeaviestTipSet()
for int64(currTs.Height()) >= minHeight.Int64 {
tsKey := currTs.Parents()
currTs, err = chainStore.GetTipSetFromKey(ctx, tsKey)
if err != nil {
return xerrors.Errorf("get tipset from key: %w", err)
}
log.Debugf("Migrating height %d\n", currTs.Height())
tsKeyCid, err := currTs.Key().Cid()
if err != nil {
return fmt.Errorf("tipset key cid: %w", err)
}
// delete all events that are not in the canonical chain
_, err = stmtDeleteOffChainEvent.Exec(tsKeyCid.Bytes(), currTs.Height())
if err != nil {
return xerrors.Errorf("delete off chain event: %w", err)
}
// find the first eventId from the last time the tipset was applied
var eventId sql.NullInt64
err = stmtSelectEvent.QueryRow(tsKeyCid.Bytes()).Scan(&eventId)
if err != nil {
if err == sql.ErrNoRows {
continue
}
return xerrors.Errorf("select event: %w", err)
}
// this tipset might not have any events which is ok
if !eventId.Valid {
continue
}
log.Debugf("Deleting all events with id < %d at height %d\n", eventId.Int64, currTs.Height())
res, err := stmtDeleteEvent.Exec(tsKeyCid.Bytes(), eventId.Int64)
if err != nil {
return xerrors.Errorf("delete event: %w", err)
}
nrRowsAffected, err := res.RowsAffected()
if err != nil {
return xerrors.Errorf("rows affected: %w", err)
}
log.Debugf("deleted %d events from tipset %s\n", nrRowsAffected, tsKeyCid.String())
}
// delete all entries that have an event_id that doesn't exist (since we don't have a foreign
// key constraint that gives us cascading deletes)
res, err := tx.Exec("DELETE FROM event_entry WHERE event_id NOT IN (SELECT id FROM event)")
if err != nil {
return xerrors.Errorf("delete event_entry: %w", err)
}
nrRowsAffected, err := res.RowsAffected()
if err != nil {
return xerrors.Errorf("rows affected: %w", err)
}
log.Infof("cleaned up %d entries that had deleted events\n", nrRowsAffected)
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
log.Infof("Successfully migrated events to version 2 in %s", time.Since(now))
return nil
}
func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) {
db, err := sql.Open("sqlite3", path+"?mode=rwc")
if err != nil {
return nil, xerrors.Errorf("open sqlite3 database: %w", err)
@ -89,6 +240,8 @@ func NewEventIndex(path string) (*EventIndex, error) {
}
}
eventIndex := EventIndex{db: db}
q, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
if err == sql.ErrNoRows || !q.Next() {
// empty database, create the schema
@ -102,24 +255,48 @@ func NewEventIndex(path string) (*EventIndex, error) {
_ = db.Close()
return nil, xerrors.Errorf("looking for _meta table: %w", err)
} else {
// Ensure we don't open a database from a different schema version
row := db.QueryRow("SELECT max(version) FROM _meta")
// check the schema version to see if we need to upgrade the database schema
var version int
err := row.Scan(&version)
err := db.QueryRow("SELECT max(version) FROM _meta").Scan(&version)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: no version found")
}
if version == 1 {
log.Infof("upgrading event index from version 1 to version 2")
err = eventIndex.migrateToVersion2(ctx, chainStore)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err)
}
// to upgrade to version version 2 we only need to create an index on the event table
// which means we can just recreate the schema (it will not have any effect on existing data)
for _, ddl := range ddls {
if _, err := db.Exec(ddl); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not upgrade index to version 2, exec ddl %q: %w", ddl, err)
}
}
version = 2
}
if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
}
}
return &EventIndex{
db: db,
}, nil
err = eventIndex.initStatements()
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err)
}
return &eventIndex, nil
}
func (ei *EventIndex) Close() error {
@ -130,8 +307,29 @@ func (ei *EventIndex) Close() error {
}
func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
// cache of lookups between actor id and f4 address
tx, err := ei.db.Begin()
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
// rollback the transaction (a no-op if the transaction was already committed)
defer tx.Rollback() //nolint:errcheck
// lets handle the revert case first, since its simpler and we can simply mark all events events in this tipset as reverted and return
if revert {
_, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes())
if err != nil {
return xerrors.Errorf("revert event: %w", err)
}
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
return nil
}
// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address)
ems, err := te.messages(ctx)
@ -139,19 +337,8 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("load executed messages: %w", err)
}
tx, err := ei.db.Begin()
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
stmtEvent, err := tx.Prepare(insertEvent)
if err != nil {
return xerrors.Errorf("prepare insert event: %w", err)
}
stmtEntry, err := tx.Prepare(insertEntry)
if err != nil {
return xerrors.Errorf("prepare insert entry: %w", err)
}
// iterate over all executed messages in this tipset and insert them into the database if they
// don't exist, otherwise mark them as not reverted
for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
addr, found := addressLookups[ev.Emitter]
@ -170,7 +357,9 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("tipset key cid: %w", err)
}
res, err := stmtEvent.Exec(
// check if this event already exists in the database
var entryID sql.NullInt64
err = tx.Stmt(ei.stmtEventExists).QueryRow(
te.msgTs.Height(), // height
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
@ -178,20 +367,36 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
evIdx, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
revert, // reverted
).Scan(&entryID)
if err != nil {
return xerrors.Errorf("error checking if event exists: %w", err)
}
if !entryID.Valid {
// event does not exist, lets insert it
res, err := tx.Stmt(ei.stmtInsertEvent).Exec(
te.msgTs.Height(), // height
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
evIdx, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
false, // reverted
)
if err != nil {
return xerrors.Errorf("exec insert event: %w", err)
}
lastID, err := res.LastInsertId()
entryID.Int64, err = res.LastInsertId()
if err != nil {
return xerrors.Errorf("get last row id: %w", err)
}
// insert all the entries for this event
for _, entry := range ev.Entries {
_, err := stmtEntry.Exec(
lastID, // event_id
_, err = tx.Stmt(ei.stmtInsertEntry).Exec(
entryID.Int64, // event_id
isIndexedValue(entry.Flags), // indexed
[]byte{entry.Flags}, // flags
entry.Key, // key
@ -202,10 +407,36 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("exec insert entry: %w", err)
}
}
} else {
// event already exists, lets mark it as not reverted
res, err := tx.Stmt(ei.stmtRestoreEvent).Exec(
te.msgTs.Height(), // height
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
evIdx, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
)
if err != nil {
return xerrors.Errorf("exec restore event: %w", err)
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return xerrors.Errorf("error getting rows affected: %s", err)
}
// this is a sanity check as we should only ever be updating one event
if rowsAffected != 1 {
log.Warnf("restored %d events but expected only one to exist", rowsAffected)
}
}
}
}
if err := tx.Commit(); err != nil {
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

View File

@ -74,7 +74,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
dbPath := filepath.Join(workDir, "actorevents.db")
ei, err := NewEventIndex(dbPath)
ei, err := NewEventIndex(context.Background(), dbPath, nil)
require.NoError(t, err, "create event index")
if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil {
require.NoError(t, err, "collect events")

View File

@ -79,7 +79,7 @@ func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo
}
var err error
eventIndex, err = filter.NewEventIndex(dbPath)
eventIndex, err = filter.NewEventIndex(ctx, dbPath, chainapi.Chain)
if err != nil {
return nil, err
}