diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index 93f499ac9..c31c7f8d5 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -306,6 +306,12 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) load: m.loadExecutedMessages, } + if m.EventIndex != nil { + if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { + return err + } + } + // TODO: could run this loop in parallel with errgroup if there are many filters for _, f := range m.filters { if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { @@ -331,6 +337,12 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) load: m.loadExecutedMessages, } + if m.EventIndex != nil { + if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { + return err + } + } + // TODO: could run this loop in parallel with errgroup if there are many filters for _, f := range m.filters { if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { @@ -346,7 +358,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a currentHeight := m.currentHeight m.mu.Unlock() - if m.EventIndex == nil && (minHeight < currentHeight || maxHeight < currentHeight) { + if m.EventIndex == nil && minHeight < currentHeight { return nil, xerrors.Errorf("historic event index disabled") } @@ -365,7 +377,8 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a maxResults: m.MaxFilterResults, } - if m.EventIndex != nil && (minHeight < currentHeight || maxHeight < currentHeight) { + if m.EventIndex != nil && minHeight < currentHeight { + // Filter needs historic events if err := m.EventIndex.PrefillFilter(ctx, f); err != nil { return nil, err } diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index e26189757..f32e0b8a8 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -50,7 +50,7 @@ var ddls = []string{ value BLOB NOT NULL )`, - // placeholder version to enable migrations. + // metadata containing version of schema `CREATE TABLE IF NOT EXISTS _meta ( version UINT64 NOT NULL UNIQUE )`, diff --git a/node/config/types.go b/node/config/types.go index 0032930fd..197a5c6b3 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -624,6 +624,11 @@ type ActorEventConfig struct { // the entire chain) MaxFilterHeightRange uint64 + // EventHistoryDatabasePath is the full path to a sqlite database that will be used to index actor events to + // support the historic filter APIs. If the database does not exist it will be created. The directory containing + // the database must already exist and be writeable. + ActorEventDatabasePath string + // Others, not implemented yet: // Set a limit on the number of active websocket subscriptions (may be zero) // Set a timeout for subscription clients diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 20405c3e7..57dbd3e40 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -38,8 +38,9 @@ func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecy MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange), } - if !cfg.EnableRealTimeFilterAPI && !cfg.EnableHistoricFilterAPI { + if !cfg.EnableRealTimeFilterAPI { // all event functionality is disabled + // the historic filter API relies on the real time one return ee, nil } @@ -53,67 +54,77 @@ func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecy }, }) - if cfg.EnableRealTimeFilterAPI { - ee.EventFilterManager = &filter.EventFilterManager{ - ChainStore: cs, - AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { - // we only want to match using f4 addresses - idAddr, err := address.NewIDAddress(uint64(emitter)) - if err != nil { - return address.Undef, false - } - addr, err := sm.LookupRobustAddress(ctx, idAddr, ts) - if err != nil { - return address.Undef, false - } - // if robust address is not f4 then we won't match against it so bail early - if addr.Protocol() != address.Delegated { - return address.Undef, false - } - // we have an f4 address, make sure it's assigned by the EAM - if namespace, _, err := varint.FromUvarint(addr.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID { - return address.Undef, false - } - return addr, true - }, - - MaxFilterResults: cfg.MaxFilterResults, - } - ee.TipSetFilterManager = &filter.TipSetFilterManager{ - MaxFilterResults: cfg.MaxFilterResults, - } - ee.MemPoolFilterManager = &filter.MemPoolFilterManager{ - MaxFilterResults: cfg.MaxFilterResults, + // Enable indexing of actor events + var eventIndex *filter.EventIndex + if cfg.EnableHistoricFilterAPI { + var err error + eventIndex, err = filter.NewEventIndex(cfg.ActorEventDatabasePath) + if err != nil { + return nil, err } - const ChainHeadConfidence = 1 - - ctx := helpers.LifecycleCtx(mctx, lc) lc.Append(fx.Hook{ - OnStart: func(context.Context) error { - ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence) - if err != nil { - return err - } - // ignore returned tipsets - _ = ev.Observe(ee.EventFilterManager) - _ = ev.Observe(ee.TipSetFilterManager) - - ch, err := mp.Updates(ctx) - if err != nil { - return err - } - go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch) - - return nil + OnStop: func(ctx context.Context) error { + return eventIndex.Close() }, }) - } - if cfg.EnableHistoricFilterAPI { - // TODO: enable indexer + ee.EventFilterManager = &filter.EventFilterManager{ + ChainStore: cs, + EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true + AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { + // we only want to match using f4 addresses + idAddr, err := address.NewIDAddress(uint64(emitter)) + if err != nil { + return address.Undef, false + } + addr, err := sm.LookupRobustAddress(ctx, idAddr, ts) + if err != nil { + return address.Undef, false + } + // if robust address is not f4 then we won't match against it so bail early + if addr.Protocol() != address.Delegated { + return address.Undef, false + } + // we have an f4 address, make sure it's assigned by the EAM + if namespace, _, err := varint.FromUvarint(addr.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID { + return address.Undef, false + } + return addr, true + }, + + MaxFilterResults: cfg.MaxFilterResults, } + ee.TipSetFilterManager = &filter.TipSetFilterManager{ + MaxFilterResults: cfg.MaxFilterResults, + } + ee.MemPoolFilterManager = &filter.MemPoolFilterManager{ + MaxFilterResults: cfg.MaxFilterResults, + } + + const ChainHeadConfidence = 1 + + ctx := helpers.LifecycleCtx(mctx, lc) + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence) + if err != nil { + return err + } + // ignore returned tipsets + _ = ev.Observe(ee.EventFilterManager) + _ = ev.Observe(ee.TipSetFilterManager) + + ch, err := mp.Updates(ctx) + if err != nil { + return err + } + go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch) + + return nil + }, + }) return ee, nil }