fix: events index: record processed epochs and tipsets for events and eth_get_log blocks till requested tipset has been indexed (#12080)

* record seen event epochs

* create correct index

* migrate to version 6

* fix typo

* test both conditions

* changes as per review

* record reverted tipsets

* see if tipsets has events and has not been reverted

* sub/unsub tipset updates from the index

* eth_get_logs should wait for events

* fix naming

* changes as per review

* solve issue with missing events

* use correct var

* changes as per review

* add unique constraint

* fix test wait

* check for events at min_height as well

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* reduce duplication

---------

Co-authored-by: Rod Vagg <rod@vagg.org>
This commit is contained in:
Aarsh Shah 2024-06-20 12:00:49 +04:00 committed by GitHub
parent e229617ddc
commit c87e2f2a64
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 425 additions and 13 deletions

View File

@ -7,6 +7,7 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/ipfs/go-cid"
@ -62,9 +63,13 @@ var ddls = []string{
value BLOB NOT NULL
)`,
createTableEventsSeen,
createIndexEventEntryIndexedKey,
createIndexEventEntryCodecValue,
createIndexEventEntryEventId,
createIndexEventsSeenHeight,
createIndexEventsSeenTipsetKeyCid,
// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
@ -76,6 +81,7 @@ var ddls = []string{
`INSERT OR IGNORE INTO _meta (version) VALUES (3)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (4)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (5)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (6)`,
}
var (
@ -83,13 +89,19 @@ var (
)
const (
schemaVersion = 5
schemaVersion = 6
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?`
restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?`
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?`
getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen`
isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?`
createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);`
@ -99,6 +111,17 @@ const (
createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);`
createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);`
createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);`
createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen (
id INTEGER PRIMARY KEY,
height INTEGER NOT NULL,
tipset_key_cid BLOB NOT NULL,
reverted INTEGER NOT NULL,
UNIQUE(height, tipset_key_cid)
)`
createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);`
createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);`
)
type EventIndex struct {
@ -109,8 +132,27 @@ type EventIndex struct {
stmtInsertEntry *sql.Stmt
stmtRevertEventsInTipset *sql.Stmt
stmtRestoreEvent *sql.Stmt
stmtUpsertEventsSeen *sql.Stmt
stmtRevertEventSeen *sql.Stmt
stmtRestoreEventSeen *sql.Stmt
stmtIsTipsetProcessed *sql.Stmt
stmtGetMaxHeightInIndex *sql.Stmt
stmtIsHeightProcessed *sql.Stmt
mu sync.Mutex
subIdCounter uint64
updateSubs map[uint64]*updateSub
}
type updateSub struct {
ctx context.Context
ch chan EventIndexUpdated
cancel context.CancelFunc
}
type EventIndexUpdated struct{}
func (ei *EventIndex) initStatements() (err error) {
ei.stmtEventExists, err = ei.db.Prepare(eventExists)
if err != nil {
@ -137,6 +179,36 @@ func (ei *EventIndex) initStatements() (err error) {
return xerrors.Errorf("prepare stmtRestoreEvent: %w", err)
}
ei.stmtUpsertEventsSeen, err = ei.db.Prepare(upsertEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtUpsertEventsSeen: %w", err)
}
ei.stmtRevertEventSeen, err = ei.db.Prepare(revertEventSeen)
if err != nil {
return xerrors.Errorf("prepare stmtRevertEventSeen: %w", err)
}
ei.stmtRestoreEventSeen, err = ei.db.Prepare(restoreEventSeen)
if err != nil {
return xerrors.Errorf("prepare stmtRestoreEventSeen: %w", err)
}
ei.stmtIsTipsetProcessed, err = ei.db.Prepare(isTipsetProcessed)
if err != nil {
return xerrors.Errorf("prepare isTipsetProcessed: %w", err)
}
ei.stmtGetMaxHeightInIndex, err = ei.db.Prepare(getMaxHeightInIndex)
if err != nil {
return xerrors.Errorf("prepare getMaxHeightInIndex: %w", err)
}
ei.stmtIsHeightProcessed, err = ei.db.Prepare(isHeightProcessed)
if err != nil {
return xerrors.Errorf("prepare isHeightProcessed: %w", err)
}
return nil
}
@ -402,9 +474,59 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context) error {
return xerrors.Errorf("commit transaction: %w", err)
}
log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
return nil
}
func (ei *EventIndex) migrateToVersion6(ctx context.Context) error {
now := time.Now()
tx, err := ei.db.BeginTx(ctx, nil)
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
stmtCreateTableEventsSeen, err := tx.PrepareContext(ctx, createTableEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtCreateTableEventsSeen: %w", err)
}
_, err = stmtCreateTableEventsSeen.ExecContext(ctx)
if err != nil {
return xerrors.Errorf("create table events_seen: %w", err)
}
_, err = tx.ExecContext(ctx, createIndexEventsSeenHeight)
if err != nil {
return xerrors.Errorf("create index events_seen_height: %w", err)
}
_, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid)
if err != nil {
return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err)
}
// INSERT an entry in the events_seen table for all epochs we do have events for in our DB
_, err = tx.ExecContext(ctx, `
INSERT OR IGNORE INTO events_seen (height, tipset_key_cid, reverted)
SELECT DISTINCT height, tipset_key_cid, reverted FROM event
`)
if err != nil {
return xerrors.Errorf("insert events into events_seen: %w", err)
}
_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (6)")
if err != nil {
return xerrors.Errorf("increment _meta version: %w", err)
}
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
ei.vacuumDBAndCheckpointWAL(ctx)
log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
log.Infof("Successfully migrated event index from version 5 to version 6 in %s", time.Since(now))
return nil
}
@ -502,6 +624,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
version = 5
}
if version == 5 {
log.Infof("Upgrading event index from version 5 to version 6")
err = eventIndex.migrateToVersion6(ctx)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not migrate event index schema from version 5 to version 6: %w", err)
}
version = 6
}
if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
@ -514,6 +646,8 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err)
}
eventIndex.updateSubs = make(map[uint64]*updateSub)
return &eventIndex, nil
}
@ -524,6 +658,60 @@ func (ei *EventIndex) Close() error {
return ei.db.Close()
}
func (ei *EventIndex) SubscribeUpdates() (chan EventIndexUpdated, func()) {
subCtx, subCancel := context.WithCancel(context.Background())
ch := make(chan EventIndexUpdated)
tSub := &updateSub{
ctx: subCtx,
cancel: subCancel,
ch: ch,
}
ei.mu.Lock()
subId := ei.subIdCounter
ei.subIdCounter++
ei.updateSubs[subId] = tSub
ei.mu.Unlock()
unSubscribeF := func() {
ei.mu.Lock()
tSub, ok := ei.updateSubs[subId]
if !ok {
ei.mu.Unlock()
return
}
delete(ei.updateSubs, subId)
ei.mu.Unlock()
// cancel the subscription
tSub.cancel()
}
return tSub.ch, unSubscribeF
}
func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) {
row := ei.stmtGetMaxHeightInIndex.QueryRowContext(ctx)
var maxHeight uint64
err := row.Scan(&maxHeight)
return maxHeight, err
}
func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) {
row := ei.stmtIsHeightProcessed.QueryRowContext(ctx, height)
var exists bool
err := row.Scan(&exists)
return exists, err
}
func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) {
row := ei.stmtIsTipsetProcessed.QueryRowContext(ctx, tipsetKeyCid)
var exists bool
err := row.Scan(&exists)
return exists, err
}
func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
tx, err := ei.db.BeginTx(ctx, nil)
if err != nil {
@ -532,6 +720,11 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
// rollback the transaction (a no-op if the transaction was already committed)
defer func() { _ = tx.Rollback() }()
tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}
// lets handle the revert case first, since its simpler and we can simply mark all events in this tipset as reverted and return
if revert {
_, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes())
@ -539,11 +732,34 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("revert event: %w", err)
}
_, err = tx.Stmt(ei.stmtRevertEventSeen).Exec(te.msgTs.Height(), tsKeyCid.Bytes())
if err != nil {
return xerrors.Errorf("revert event seen: %w", err)
}
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
ei.mu.Lock()
tSubs := make([]*updateSub, 0, len(ei.updateSubs))
for _, tSub := range ei.updateSubs {
tSubs = append(tSubs, tSub)
}
ei.mu.Unlock()
for _, tSub := range tSubs {
tSub := tSub
select {
case tSub.ch <- EventIndexUpdated{}:
case <-tSub.ctx.Done():
// subscription was cancelled, ignore
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
@ -571,11 +787,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
addressLookups[ev.Emitter] = addr
}
tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}
// check if this event already exists in the database
var entryID sql.NullInt64
err = tx.Stmt(ei.stmtEventExists).QueryRow(
@ -655,11 +866,39 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}
}
// this statement will mark the tipset as processed and will insert a new row if it doesn't exist
// or update the reverted field to false if it does
_, err = tx.Stmt(ei.stmtUpsertEventsSeen).Exec(
te.msgTs.Height(),
tsKeyCid.Bytes(),
)
if err != nil {
return xerrors.Errorf("exec upsert events seen: %w", err)
}
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
ei.mu.Lock()
tSubs := make([]*updateSub, 0, len(ei.updateSubs))
for _, tSub := range ei.updateSubs {
tSubs = append(tSubs, tSub)
}
ei.mu.Unlock()
for _, tSub := range tSubs {
tSub := tSub
select {
case tSub.ch <- EventIndexUpdated{}:
case <-tSub.ctx.Done():
// subscription was cancelled, ignore
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

View File

@ -76,10 +76,50 @@ func TestEventIndexPrefillFilter(t *testing.T) {
ei, err := NewEventIndex(context.Background(), dbPath, nil)
require.NoError(t, err, "create event index")
subCh, unSubscribe := ei.SubscribeUpdates()
defer unSubscribe()
out := make(chan EventIndexUpdated, 1)
go func() {
tu := <-subCh
out <- tu
}()
if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil {
require.NoError(t, err, "collect events")
}
mh, err := ei.GetMaxHeightInIndex(context.Background())
require.NoError(t, err)
require.Equal(t, uint64(14000), mh)
b, err := ei.IsHeightProcessed(context.Background(), 14000)
require.NoError(t, err)
require.True(t, b)
b, err = ei.IsHeightProcessed(context.Background(), 14001)
require.NoError(t, err)
require.False(t, b)
b, err = ei.IsHeightProcessed(context.Background(), 13000)
require.NoError(t, err)
require.False(t, b)
tsKey := events14000.msgTs.Key()
tsKeyCid, err := tsKey.Cid()
require.NoError(t, err, "tipset key cid")
seen, err := ei.IsTipsetProcessed(context.Background(), tsKeyCid.Bytes())
require.NoError(t, err)
require.True(t, seen, "tipset key should be seen")
seen, err = ei.IsTipsetProcessed(context.Background(), []byte{1})
require.NoError(t, err)
require.False(t, seen, "tipset key should not be seen")
_ = <-out
testCases := []struct {
name string
filter *eventFilter
@ -397,6 +437,22 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
ei, err := NewEventIndex(context.Background(), dbPath, nil)
require.NoError(t, err, "create event index")
tCh := make(chan EventIndexUpdated, 3)
subCh, unSubscribe := ei.SubscribeUpdates()
defer unSubscribe()
go func() {
cnt := 0
for tu := range subCh {
tCh <- tu
cnt++
if cnt == 3 {
close(tCh)
return
}
}
}()
if err := ei.CollectEvents(context.Background(), revertedEvents14000, false, addrMap.ResolveAddress); err != nil {
require.NoError(t, err, "collect reverted events")
}
@ -407,6 +463,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
require.NoError(t, err, "collect events")
}
_ = <-tCh
_ = <-tCh
_ = <-tCh
inclusiveTestCases := []struct {
name string
filter *eventFilter

View File

@ -44,6 +44,11 @@ var ErrUnsupported = errors.New("unsupported method")
const maxEthFeeHistoryRewardPercentiles = 100
var (
// wait for 3 epochs
eventReadTimeout = 90 * time.Second
)
type EthModuleAPI interface {
EthBlockNumber(ctx context.Context) (ethtypes.EthUint64, error)
EthAccounts(ctx context.Context) ([]ethtypes.EthAddress, error)
@ -1258,10 +1263,58 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
return nil, api.ErrNotSupported
}
// Create a temporary filter
f, err := e.installEthFilterSpec(ctx, filterSpec)
if e.EventFilterManager.EventIndex == nil {
return nil, xerrors.Errorf("cannot use eth_get_logs if historical event index is disabled")
}
pf, err := e.parseEthFilterSpec(ctx, filterSpec)
if err != nil {
return nil, err
return nil, xerrors.Errorf("failed to parse eth filter spec: %w", err)
}
if pf.tipsetCid == cid.Undef {
maxHeight := pf.maxHeight
if maxHeight == -1 {
maxHeight = e.Chain.GetHeaviestTipSet().Height()
}
if maxHeight > e.Chain.GetHeaviestTipSet().Height() {
return nil, xerrors.Errorf("maxHeight requested is greater than the heaviest tipset")
}
err := e.waitForHeightProcessed(ctx, maxHeight)
if err != nil {
return nil, err
}
// should also have the minHeight in the filter indexed
if b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight)); err != nil {
return nil, xerrors.Errorf("failed to check if event index has events for the minHeight: %w", err)
} else if !b {
return nil, xerrors.Errorf("event index does not have event for epoch %d", pf.minHeight)
}
} else {
ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset by cid: %w", err)
}
err = e.waitForHeightProcessed(ctx, ts.Height())
if err != nil {
return nil, err
}
b, err := e.EventFilterManager.EventIndex.IsTipsetProcessed(ctx, pf.tipsetCid.Bytes())
if err != nil {
return nil, xerrors.Errorf("failed to check if tipset events have been indexed: %w", err)
}
if !b {
return nil, xerrors.Errorf("event index failed to index tipset %s", pf.tipsetCid.String())
}
}
// Create a temporary filter
f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, true)
if err != nil {
return nil, xerrors.Errorf("failed to install event filter: %w", err)
}
ces := f.TakeCollectedEvents(ctx)
@ -1270,6 +1323,47 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
return ethFilterResultFromEvents(ctx, ces, e.SubManager.StateAPI)
}
func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi.ChainEpoch) error {
ei := e.EventFilterManager.EventIndex
if height > e.Chain.GetHeaviestTipSet().Height() {
return xerrors.New("height is in the future")
}
ctx, cancel := context.WithTimeout(ctx, eventReadTimeout)
defer cancel()
// if the height we're interested in has already been indexed -> there's nothing to do here
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b {
return nil
}
// subscribe for updates to the event index
subCh, unSubscribeF := ei.SubscribeUpdates()
defer unSubscribeF()
// it could be that the event index was update while the subscription was being processed -> check if index has what we need now
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b {
return nil
}
for {
select {
case <-subCh:
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (e *EthEventHandler) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID) (*ethtypes.EthFilterResult, error) {
if e.FilterStore == nil {
return nil, api.ErrNotSupported
@ -1368,7 +1462,15 @@ func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRan
return minHeight, maxHeight, nil
}
func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (filter.EventFilter, error) {
type parsedFilter struct {
minHeight abi.ChainEpoch
maxHeight abi.ChainEpoch
tipsetCid cid.Cid
addresses []address.Address
keys map[string][]types.ActorEventBlock
}
func (e *EthEventHandler) parseEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*parsedFilter, error) {
var (
minHeight abi.ChainEpoch
maxHeight abi.ChainEpoch
@ -1405,7 +1507,13 @@ func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec *
return nil, err
}
return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keysToKeysWithCodec(keys), true)
return &parsedFilter{
minHeight: minHeight,
maxHeight: maxHeight,
tipsetCid: tipsetCid,
addresses: addresses,
keys: keysToKeysWithCodec(keys),
}, nil
}
func keysToKeysWithCodec(keys map[string][][]byte) map[string][]types.ActorEventBlock {
@ -1426,11 +1534,16 @@ func (e *EthEventHandler) EthNewFilter(ctx context.Context, filterSpec *ethtypes
return ethtypes.EthFilterID{}, api.ErrNotSupported
}
f, err := e.installEthFilterSpec(ctx, filterSpec)
pf, err := e.parseEthFilterSpec(ctx, filterSpec)
if err != nil {
return ethtypes.EthFilterID{}, err
}
f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, true)
if err != nil {
return ethtypes.EthFilterID{}, xerrors.Errorf("failed to install event filter: %w", err)
}
if err := e.FilterStore.Add(ctx, f); err != nil {
// Could not record in store, attempt to delete filter to clean up
err2 := e.TipSetFilterManager.Remove(ctx, f.ID())