Initialise event index in di

This commit is contained in:
Ian Davis 2022-11-15 14:23:23 +00:00
parent 0d9c474a59
commit 32839f6919
4 changed files with 86 additions and 57 deletions

View File

@ -306,6 +306,12 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
load: m.loadExecutedMessages, load: m.loadExecutedMessages,
} }
if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
return err
}
}
// TODO: could run this loop in parallel with errgroup if there are many filters // TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters { for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
@ -331,6 +337,12 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
load: m.loadExecutedMessages, load: m.loadExecutedMessages,
} }
if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
return err
}
}
// TODO: could run this loop in parallel with errgroup if there are many filters // TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters { for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
@ -346,7 +358,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
currentHeight := m.currentHeight currentHeight := m.currentHeight
m.mu.Unlock() m.mu.Unlock()
if m.EventIndex == nil && (minHeight < currentHeight || maxHeight < currentHeight) { if m.EventIndex == nil && minHeight < currentHeight {
return nil, xerrors.Errorf("historic event index disabled") return nil, xerrors.Errorf("historic event index disabled")
} }
@ -365,7 +377,8 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
maxResults: m.MaxFilterResults, maxResults: m.MaxFilterResults,
} }
if m.EventIndex != nil && (minHeight < currentHeight || maxHeight < currentHeight) { if m.EventIndex != nil && minHeight < currentHeight {
// Filter needs historic events
if err := m.EventIndex.PrefillFilter(ctx, f); err != nil { if err := m.EventIndex.PrefillFilter(ctx, f); err != nil {
return nil, err return nil, err
} }

View File

@ -50,7 +50,7 @@ var ddls = []string{
value BLOB NOT NULL value BLOB NOT NULL
)`, )`,
// placeholder version to enable migrations. // metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta ( `CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE version UINT64 NOT NULL UNIQUE
)`, )`,

View File

@ -624,6 +624,11 @@ type ActorEventConfig struct {
// the entire chain) // the entire chain)
MaxFilterHeightRange uint64 MaxFilterHeightRange uint64
// EventHistoryDatabasePath is the full path to a sqlite database that will be used to index actor events to
// support the historic filter APIs. If the database does not exist it will be created. The directory containing
// the database must already exist and be writeable.
ActorEventDatabasePath string
// Others, not implemented yet: // Others, not implemented yet:
// Set a limit on the number of active websocket subscriptions (may be zero) // Set a limit on the number of active websocket subscriptions (may be zero)
// Set a timeout for subscription clients // Set a timeout for subscription clients

View File

@ -38,8 +38,9 @@ func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecy
MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange), MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange),
} }
if !cfg.EnableRealTimeFilterAPI && !cfg.EnableHistoricFilterAPI { if !cfg.EnableRealTimeFilterAPI {
// all event functionality is disabled // all event functionality is disabled
// the historic filter API relies on the real time one
return ee, nil return ee, nil
} }
@ -53,67 +54,77 @@ func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecy
}, },
}) })
if cfg.EnableRealTimeFilterAPI { // Enable indexing of actor events
ee.EventFilterManager = &filter.EventFilterManager{ var eventIndex *filter.EventIndex
ChainStore: cs, if cfg.EnableHistoricFilterAPI {
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { var err error
// we only want to match using f4 addresses eventIndex, err = filter.NewEventIndex(cfg.ActorEventDatabasePath)
idAddr, err := address.NewIDAddress(uint64(emitter)) if err != nil {
if err != nil { return nil, err
return address.Undef, false
}
addr, err := sm.LookupRobustAddress(ctx, idAddr, ts)
if err != nil {
return address.Undef, false
}
// if robust address is not f4 then we won't match against it so bail early
if addr.Protocol() != address.Delegated {
return address.Undef, false
}
// we have an f4 address, make sure it's assigned by the EAM
if namespace, _, err := varint.FromUvarint(addr.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
return address.Undef, false
}
return addr, true
},
MaxFilterResults: cfg.MaxFilterResults,
}
ee.TipSetFilterManager = &filter.TipSetFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
}
ee.MemPoolFilterManager = &filter.MemPoolFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
} }
const ChainHeadConfidence = 1
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(context.Context) error { OnStop: func(ctx context.Context) error {
ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence) return eventIndex.Close()
if err != nil {
return err
}
// ignore returned tipsets
_ = ev.Observe(ee.EventFilterManager)
_ = ev.Observe(ee.TipSetFilterManager)
ch, err := mp.Updates(ctx)
if err != nil {
return err
}
go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch)
return nil
}, },
}) })
} }
if cfg.EnableHistoricFilterAPI { ee.EventFilterManager = &filter.EventFilterManager{
// TODO: enable indexer ChainStore: cs,
EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
// we only want to match using f4 addresses
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
}
addr, err := sm.LookupRobustAddress(ctx, idAddr, ts)
if err != nil {
return address.Undef, false
}
// if robust address is not f4 then we won't match against it so bail early
if addr.Protocol() != address.Delegated {
return address.Undef, false
}
// we have an f4 address, make sure it's assigned by the EAM
if namespace, _, err := varint.FromUvarint(addr.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
return address.Undef, false
}
return addr, true
},
MaxFilterResults: cfg.MaxFilterResults,
} }
ee.TipSetFilterManager = &filter.TipSetFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
}
ee.MemPoolFilterManager = &filter.MemPoolFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
}
const ChainHeadConfidence = 1
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence)
if err != nil {
return err
}
// ignore returned tipsets
_ = ev.Observe(ee.EventFilterManager)
_ = ev.Observe(ee.TipSetFilterManager)
ch, err := mp.Updates(ctx)
if err != nil {
return err
}
go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch)
return nil
},
})
return ee, nil return ee, nil
} }