From 6bbe09041101845b3636f89390e2e2320cbd9af8 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 1 May 2024 13:20:41 +0400 Subject: [PATCH] Event index should be unique for tipsets (#11952) * event index should be unique for tipsets * fix formatting * migrate to version 5 --- chain/events/filter/event.go | 10 +++++-- chain/events/filter/index.go | 58 ++++++++++++++++++++++++++++++++---- 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index 0accc551a..fa17d235e 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -57,7 +57,7 @@ var _ Filter = (*eventFilter)(nil) type CollectedEvent struct { Entries []types.EventEntry EmitterAddr address.Address // address of emitter - EventIdx int // index of the event within the list of emitted events + EventIdx int // index of the event within the list of emitted events in a given tipset Reverted bool Height abi.ChainEpoch TipSetKey types.TipSetKey // tipset that contained the message @@ -94,8 +94,11 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever if err != nil { return xerrors.Errorf("load executed messages: %w", err) } + + eventCount := 0 + for msgIdx, em := range ems { - for evIdx, ev := range em.Events() { + for _, ev := range em.Events() { // lookup address corresponding to the actor id addr, found := addressLookups[ev.Emitter] if !found { @@ -119,7 +122,7 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever cev := &CollectedEvent{ Entries: ev.Entries, EmitterAddr: addr, - EventIdx: evIdx, + EventIdx: eventCount, Reverted: revert, Height: te.msgTs.Height(), TipSetKey: te.msgTs.Key(), @@ -141,6 +144,7 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever } f.collected = append(f.collected, cev) f.mu.Unlock() + eventCount++ } } diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 9a8f8bca4..9bf7213c8 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -72,6 +72,7 @@ var ddls = []string{ `INSERT OR IGNORE INTO _meta (version) VALUES (2)`, `INSERT OR IGNORE INTO _meta (version) VALUES (3)`, `INSERT OR IGNORE INTO _meta (version) VALUES (4)`, + `INSERT OR IGNORE INTO _meta (version) VALUES (5)`, } var ( @@ -79,7 +80,7 @@ var ( ) const ( - schemaVersion = 4 + schemaVersion = 5 eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` @@ -365,9 +366,42 @@ func (ei *EventIndex) migrateToVersion4(ctx context.Context) error { return xerrors.Errorf("commit transaction: %w", err) } + log.Infof("Successfully migrated event index from version 3 to version 4 in %s", time.Since(now)) + return nil +} + +func (ei *EventIndex) migrateToVersion5(ctx context.Context) error { + now := time.Now() + + tx, err := ei.db.BeginTx(ctx, nil) + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + + stmtEventIndexUpdate, err := tx.PrepareContext(ctx, "UPDATE event SET event_index = (SELECT COUNT(*) FROM event e2 WHERE e2.tipset_key_cid = event.tipset_key_cid AND e2.id <= event.id) - 1") + if err != nil { + return xerrors.Errorf("prepare stmtEventIndexUpdate: %w", err) + } + + _, err = stmtEventIndexUpdate.ExecContext(ctx) + if err != nil { + return xerrors.Errorf("update event index: %w", err) + } + + _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (5)") + if err != nil { + return xerrors.Errorf("increment _meta version: %w", err) + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("commit transaction: %w", err) + } + ei.vacuumDBAndCheckpointWAL(ctx) - log.Infof("Successfully migrated event index from version 3 to version 4 in %s", time.Since(now)) + log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now)) return nil } @@ -452,6 +486,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor version = 4 } + if version == 4 { + log.Infof("Upgrading event index from version 4 to version 5") + err = eventIndex.migrateToVersion5(ctx) + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("could not migrate event index schema from version 4 to version 5: %w", err) + } + version = 5 + } + if version != schemaVersion { _ = db.Close() return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) @@ -505,10 +549,11 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return xerrors.Errorf("load executed messages: %w", err) } + eventCount := 0 // iterate over all executed messages in this tipset and insert them into the database if they // don't exist, otherwise mark them as not reverted for msgIdx, em := range ems { - for evIdx, ev := range em.Events() { + for _, ev := range em.Events() { addr, found := addressLookups[ev.Emitter] if !found { var ok bool @@ -532,7 +577,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid addr.Bytes(), // emitter_addr - evIdx, // event_index + eventCount, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index ).Scan(&entryID) @@ -547,7 +592,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid addr.Bytes(), // emitter_addr - evIdx, // event_index + eventCount, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index false, // reverted @@ -582,7 +627,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid addr.Bytes(), // emitter_addr - evIdx, // event_index + eventCount, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index ) @@ -600,6 +645,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever log.Warnf("restored %d events but expected only one to exist", rowsAffected) } } + eventCount++ } }