diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index 17536da3f..449b6ae18 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -27,7 +27,16 @@ func isIndexedValue(b uint8) bool { return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0 } -type EventFilter struct { +type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool) + +type EventFilter interface { + Filter + + TakeCollectedEvents(context.Context) []*CollectedEvent + CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error +} + +type eventFilter struct { id types.FilterID minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum @@ -43,7 +52,7 @@ type EventFilter struct { ch chan<- interface{} } -var _ Filter = (*EventFilter)(nil) +var _ Filter = (*eventFilter)(nil) type CollectedEvent struct { Entries []types.EventEntry @@ -56,24 +65,24 @@ type CollectedEvent struct { MsgCid cid.Cid // cid of message that produced event } -func (f *EventFilter) ID() types.FilterID { +func (f *eventFilter) ID() types.FilterID { return f.id } -func (f *EventFilter) SetSubChannel(ch chan<- interface{}) { +func (f *eventFilter) SetSubChannel(ch chan<- interface{}) { f.mu.Lock() defer f.mu.Unlock() f.ch = ch f.collected = nil } -func (f *EventFilter) ClearSubChannel() { +func (f *eventFilter) ClearSubChannel() { f.mu.Lock() defer f.mu.Unlock() f.ch = nil } -func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { +func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error { if !f.matchTipset(te) { return nil } @@ -138,13 +147,13 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever return nil } -func (f *EventFilter) setCollectedEvents(ces []*CollectedEvent) { +func (f *eventFilter) setCollectedEvents(ces []*CollectedEvent) { f.mu.Lock() f.collected = ces f.mu.Unlock() } -func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent { +func (f *eventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent { f.mu.Lock() collected := f.collected f.collected = nil @@ -154,14 +163,14 @@ func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent return collected } -func (f *EventFilter) LastTaken() time.Time { +func (f *eventFilter) LastTaken() time.Time { f.mu.Lock() defer f.mu.Unlock() return f.lastTaken } // matchTipset reports whether this filter matches the given tipset -func (f *EventFilter) matchTipset(te *TipSetEvents) bool { +func (f *eventFilter) matchTipset(te *TipSetEvents) bool { if f.tipsetCid != cid.Undef { tsCid, err := te.Cid() if err != nil { @@ -179,7 +188,7 @@ func (f *EventFilter) matchTipset(te *TipSetEvents) bool { return true } -func (f *EventFilter) matchAddress(o address.Address) bool { +func (f *eventFilter) matchAddress(o address.Address) bool { if len(f.addresses) == 0 { return true } @@ -194,7 +203,7 @@ func (f *EventFilter) matchAddress(o address.Address) bool { return false } -func (f *EventFilter) matchKeys(ees []types.EventEntry) bool { +func (f *eventFilter) matchKeys(ees []types.EventEntry) bool { if len(f.keysWithCodec) == 0 { return true } @@ -297,7 +306,7 @@ type EventFilterManager struct { EventIndex *EventIndex mu sync.Mutex // guards mutations to filters - filters map[types.FilterID]*EventFilter + filters map[types.FilterID]EventFilter currentHeight abi.ChainEpoch } @@ -364,7 +373,7 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) } func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, - keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (*EventFilter, error) { + keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) { m.mu.Lock() currentHeight := m.currentHeight m.mu.Unlock() @@ -378,7 +387,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a return nil, xerrors.Errorf("new filter id: %w", err) } - f := &EventFilter{ + f := &eventFilter{ id: id, minHeight: minHeight, maxHeight: maxHeight, @@ -390,14 +399,14 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight { // Filter needs historic events - if err := m.EventIndex.PrefillFilter(ctx, f, excludeReverted); err != nil { + if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil { return nil, err } } m.mu.Lock() if m.filters == nil { - m.filters = make(map[types.FilterID]*EventFilter) + m.filters = make(map[types.FilterID]EventFilter) } m.filters[id] = f m.mu.Unlock() diff --git a/chain/events/filter/event_test.go b/chain/events/filter/event_test.go index ff20ef6d4..c650b71eb 100644 --- a/chain/events/filter/event_test.go +++ b/chain/events/filter/event_test.go @@ -86,13 +86,13 @@ func TestEventFilterCollectEvents(t *testing.T) { testCases := []struct { name string - filter *EventFilter + filter *eventFilter te *TipSetEvents want []*CollectedEvent }{ { name: "nomatch tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14001, maxHeight: -1, }, @@ -101,7 +101,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "nomatch tipset max height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: 13999, }, @@ -110,7 +110,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14000, maxHeight: -1, }, @@ -119,7 +119,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match tipset cid", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, tipsetCid: cid14000, @@ -129,7 +129,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "nomatch address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a2}, @@ -139,7 +139,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a1}, @@ -149,7 +149,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match one entry", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -163,7 +163,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match one entry with alternate values", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -179,7 +179,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "nomatch one entry by missing value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -194,7 +194,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "nomatch one entry by missing key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -208,7 +208,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match one entry with multiple keys", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -225,7 +225,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "nomatch one entry with one mismatching key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -242,7 +242,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "nomatch one entry with one mismatching value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -259,7 +259,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "nomatch one entry with one unindexed key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 95d99ac22..d3dd1a085 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -481,7 +481,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } // PrefillFilter fills a filter's collection of events from the historic index -func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter, excludeReverted bool) error { +func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error { clauses := []string{} values := []any{} joins := []string{} diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index 77d7d39d8..ce3f7b78a 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -82,13 +82,13 @@ func TestEventIndexPrefillFilter(t *testing.T) { testCases := []struct { name string - filter *EventFilter + filter *eventFilter te *TipSetEvents want []*CollectedEvent }{ { name: "nomatch tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14001, maxHeight: -1, }, @@ -97,7 +97,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "nomatch tipset max height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: 13999, }, @@ -106,7 +106,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14000, maxHeight: -1, }, @@ -115,7 +115,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match tipset cid", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, tipsetCid: cid14000, @@ -125,7 +125,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "nomatch address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a2}, @@ -135,7 +135,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a1}, @@ -145,7 +145,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match one entry", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -159,7 +159,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match one entry with alternate values", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -175,7 +175,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "nomatch one entry by missing value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -190,7 +190,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "nomatch one entry by missing key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -204,7 +204,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match one entry with multiple keys", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -221,7 +221,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "nomatch one entry with one mismatching key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -238,7 +238,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "nomatch one entry with one mismatching value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -255,7 +255,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "nomatch one entry with one unindexed key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -272,7 +272,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { for _, tc := range testCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := ei.PrefillFilter(context.Background(), tc.filter, false); err != nil { + if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil { require.NoError(t, err, "prefill filter events") } @@ -409,13 +409,13 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { inclusiveTestCases := []struct { name string - filter *EventFilter + filter *eventFilter te *TipSetEvents want []*CollectedEvent }{ { name: "nomatch tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14001, maxHeight: -1, }, @@ -424,7 +424,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch tipset max height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: 13999, }, @@ -433,7 +433,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14000, maxHeight: -1, }, @@ -442,7 +442,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match tipset cid", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, tipsetCid: cid14000, @@ -452,7 +452,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match tipset cid", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, tipsetCid: reveredCID14000, @@ -462,7 +462,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a3}, @@ -472,7 +472,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match address 2", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a2}, @@ -482,7 +482,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match address 1", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a1}, @@ -492,7 +492,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match one entry", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -506,7 +506,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match one entry with alternate values", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -522,7 +522,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry by missing value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -537,7 +537,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry by missing key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -551,7 +551,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match one entry with multiple keys", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -568,7 +568,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match one entry with multiple keys", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -585,7 +585,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry with one mismatching key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -602,7 +602,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry with one mismatching value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -619,7 +619,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry with one unindexed key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -633,7 +633,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry with one unindexed key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -649,13 +649,13 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { exclusiveTestCases := []struct { name string - filter *EventFilter + filter *eventFilter te *TipSetEvents want []*CollectedEvent }{ { name: "nomatch tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14001, maxHeight: -1, }, @@ -664,7 +664,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch tipset max height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: 13999, }, @@ -673,7 +673,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14000, maxHeight: -1, }, @@ -682,7 +682,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match tipset cid", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, tipsetCid: cid14000, @@ -692,7 +692,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match tipset cid but reverted", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, tipsetCid: reveredCID14000, @@ -702,7 +702,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a3}, @@ -712,7 +712,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch address 2 but reverted", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a2}, @@ -722,7 +722,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a1}, @@ -732,7 +732,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match one entry", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -746,7 +746,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match one entry with alternate values", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -762,7 +762,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry by missing value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -777,7 +777,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry by missing key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -791,7 +791,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "match one entry with multiple keys", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -808,7 +808,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry with one mismatching key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -825,7 +825,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry with matching reverted value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -842,7 +842,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry with one mismatching value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -859,7 +859,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { }, { name: "nomatch one entry with one unindexed key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ @@ -876,7 +876,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { for _, tc := range inclusiveTestCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := ei.PrefillFilter(context.Background(), tc.filter, false); err != nil { + if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil { require.NoError(t, err, "prefill filter events") } @@ -888,7 +888,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { for _, tc := range exclusiveTestCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := ei.PrefillFilter(context.Background(), tc.filter, true); err != nil { + if err := ei.prefillFilter(context.Background(), tc.filter, true); err != nil { require.NoError(t, err, "prefill filter events") } diff --git a/itests/direct_data_onboard_test.go b/itests/direct_data_onboard_test.go index 2b2adb299..e447300af 100644 --- a/itests/direct_data_onboard_test.go +++ b/itests/direct_data_onboard_test.go @@ -372,7 +372,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { // verify that we can trace a datacap allocation through to a claim with the events, since this // information is not completely available from the state tree - claims := buildClaimsFromEvents(ctx, t, eventsFromMessages, miner.FullNode) + claims := buildClaimsFromMessages(ctx, t, eventsFromMessages, miner.FullNode) for _, claim := range claims { p, err := address.NewIDAddress(uint64(claim.Provider)) require.NoError(t, err) @@ -395,6 +395,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { }, claims) // construct ActorEvents from GetActorEvents API + t.Logf("Inspecting full events list from GetActorEvents") allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{ FromHeight: epochPtr(0), }) @@ -405,6 +406,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { require.Equal(t, eventsFromMessages, allEvtsFromGetAPI) // construct ActorEvents from subscription channel for just the miner actor + t.Logf("Inspecting only miner's events list from SubscribeActorEvents") var subMinerEvts []*types.ActorEvent for evt := range minerEvtsChan { subMinerEvts = append(subMinerEvts, evt) @@ -421,15 +423,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { // compare events from messages and receipts with events from subscription channel require.Equal(t, allMinerEvts, subMinerEvts) - // construct ActorEvents from subscription channel for just the sector-activated events - var prefillSectorActivatedEvts []*types.ActorEvent - for evt := range sectorActivatedEvtsChan { - prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, evt) - if len(prefillSectorActivatedEvts) == 2 { - break - } - } - require.Len(t, prefillSectorActivatedEvts, 2) + // construct ActorEvents from subscription channels for just the sector-activated events var sectorActivatedEvts []*types.ActorEvent for _, evt := range eventsFromMessages { for _, entry := range evt.Entries { @@ -439,10 +433,42 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { } } } + require.Len(t, sectorActivatedEvts, 2) // sanity check + + t.Logf("Inspecting only sector-activated events list from real-time SubscribeActorEvents") + var subscribedSectorActivatedEvts []*types.ActorEvent + for evt := range sectorActivatedEvtsChan { + subscribedSectorActivatedEvts = append(subscribedSectorActivatedEvts, evt) + if len(subscribedSectorActivatedEvts) == 2 { + break + } + } // compare events from messages and receipts with events from subscription channel - require.Equal(t, sectorActivatedEvts, prefillSectorActivatedEvts) + require.Equal(t, sectorActivatedEvts, subscribedSectorActivatedEvts) + + // same thing but use historical event fetching to see the same list + t.Logf("Inspecting only sector-activated events list from historical SubscribeActorEvents") + sectorActivatedEvtsChan, err = miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{ + Fields: map[string][]types.ActorEventBlock{ + "$type": { + {Codec: 0x51, Value: sectorActivatedCbor}, + }, + }, + FromHeight: epochPtr(0), + }) + require.NoError(t, err) + subscribedSectorActivatedEvts = subscribedSectorActivatedEvts[:0] + for evt := range sectorActivatedEvtsChan { + subscribedSectorActivatedEvts = append(subscribedSectorActivatedEvts, evt) + if len(subscribedSectorActivatedEvts) == 2 { + break + } + } + // compare events from messages and receipts with events from subscription channel + require.Equal(t, sectorActivatedEvts, subscribedSectorActivatedEvts) // check that our `ToHeight` filter works as expected + t.Logf("Inspecting only initial list of events SubscribeActorEvents with ToHeight") var initialEvents []*types.ActorEvent for evt := range initialEventsChan { initialEvents = append(initialEvents, evt) @@ -451,6 +477,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { require.Equal(t, eventsFromMessages[0:5], initialEvents) // construct ActorEvents from subscription channel for all actor events + t.Logf("Inspecting full events list from historical SubscribeActorEvents") allEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{ FromHeight: epochPtr(0), }) @@ -464,9 +491,15 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { } // compare events from messages and receipts with events from subscription channel require.Equal(t, eventsFromMessages, prefillEvts) + t.Logf("All done comparing events") + + // NOTE: There is a delay in finishing this test because the SubscribeActorEvents + // with the ToHeight (initialEventsChan) has to wait at least a full actual epoch before + // realising that there's no more events for that filter. itests run with a different block + // speed than the ActorEventHandler is aware of. } -func buildClaimsFromEvents(ctx context.Context, t *testing.T, eventsFromMessages []*types.ActorEvent, node v1api.FullNode) []*verifregtypes9.Claim { +func buildClaimsFromMessages(ctx context.Context, t *testing.T, eventsFromMessages []*types.ActorEvent, node v1api.FullNode) []*verifregtypes9.Claim { claimKeyCbor := stringToEventKey(t, "claim") claims := make([]*verifregtypes9.Claim, 0) for _, event := range eventsFromMessages { diff --git a/node/impl/full/actor_event_test.go b/node/impl/full/actor_event_test.go index a7778ced5..21938de1c 100644 --- a/node/impl/full/actor_event_test.go +++ b/node/impl/full/actor_event_test.go @@ -1,14 +1,28 @@ package full import ( + "context" "fmt" + pseudo "math/rand" + "sync" "testing" + "time" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" + "github.com/raulk/clock" "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + + "github.com/filecoin-project/lotus/chain/events/filter" + "github.com/filecoin-project/lotus/chain/types" ) +var testCid = cid.MustParse("bafyreicmaj5hhoy5mgqvamfhgexxyergw7hdeshizghodwkjg6qmpoco7i") + func TestParseHeightRange(t *testing.T) { epochPtr := func(i int) *abi.ChainEpoch { e := abi.ChainEpoch(i) @@ -97,16 +111,655 @@ func TestParseHeightRange(t *testing.T) { for name, tc := range tcs { tc2 := tc t.Run(name, func(t *testing.T) { + req := require.New(t) min, max, err := parseHeightRange(tc2.heaviest, tc2.from, tc2.to, tc2.maxRange) - require.Equal(t, tc2.minOut, min) - require.Equal(t, tc2.maxOut, max) + req.Equal(tc2.minOut, min) + req.Equal(tc2.maxOut, max) if tc2.errStr != "" { - fmt.Println(err) - require.Error(t, err) - require.Contains(t, err.Error(), tc2.errStr) + t.Log(err) + req.Error(err) + req.Contains(err.Error(), tc2.errStr) } else { - require.NoError(t, err) + req.NoError(err) } }) } } + +func TestGetActorEvents(t *testing.T) { + ctx := context.Background() + req := require.New(t) + + seed := time.Now().UnixNano() + t.Logf("seed: %d", seed) + rng := pseudo.New(pseudo.NewSource(seed)) + const maxFilterHeightRange = 100 + + minerAddr, err := address.NewIDAddress(uint64(rng.Int63())) + req.NoError(err) + + testCases := map[string]struct { + filter *types.ActorEventFilter + currentHeight int64 + installMinHeight int64 + installMaxHeight int64 + installTipSetKey cid.Cid + installAddresses []address.Address + installKeysWithCodec map[string][]types.ActorEventBlock + installExcludeReverted bool + expectErr string + }{ + "nil filter": { + filter: nil, + installMinHeight: -1, + installMaxHeight: -1, + }, + "empty filter": { + filter: &types.ActorEventFilter{}, + installMinHeight: -1, + installMaxHeight: -1, + }, + "basic height range filter": { + filter: &types.ActorEventFilter{ + FromHeight: epochPtr(0), + ToHeight: epochPtr(maxFilterHeightRange), + }, + installMinHeight: 0, + installMaxHeight: maxFilterHeightRange, + }, + "from, no to height": { + filter: &types.ActorEventFilter{ + FromHeight: epochPtr(0), + }, + currentHeight: maxFilterHeightRange - 1, + installMinHeight: 0, + installMaxHeight: -1, + }, + "to, no from height": { + filter: &types.ActorEventFilter{ + ToHeight: epochPtr(maxFilterHeightRange - 1), + }, + installMinHeight: -1, + installMaxHeight: maxFilterHeightRange - 1, + }, + "from, no to height, too far": { + filter: &types.ActorEventFilter{ + FromHeight: epochPtr(0), + }, + currentHeight: maxFilterHeightRange + 1, + expectErr: "invalid epoch range: 'from' height is too far in the past", + }, + "to, no from height, too far": { + filter: &types.ActorEventFilter{ + ToHeight: epochPtr(maxFilterHeightRange + 1), + }, + currentHeight: 0, + expectErr: "invalid epoch range: 'to' height is too far in the future", + }, + } + + for name, tc := range testCases { + tc := tc + t.Run(name, func(t *testing.T) { + efm := newMockEventFilterManager(t) + collectedEvents := makeCollectedEvents(t, rng, 0, 1, 10) + filter := newMockFilter(ctx, t, rng, collectedEvents) + + if tc.expectErr == "" { + efm.expectInstall(abi.ChainEpoch(tc.installMinHeight), abi.ChainEpoch(tc.installMaxHeight), tc.installTipSetKey, tc.installAddresses, tc.installKeysWithCodec, tc.installExcludeReverted, filter) + } + + ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, tc.currentHeight)}) + req.NoError(err) + chain := newMockChainAccessor(t, ts) + + handler := NewActorEventHandler(chain, efm, 50*time.Millisecond, maxFilterHeightRange) + + gotEvents, err := handler.GetActorEvents(ctx, tc.filter) + if tc.expectErr != "" { + req.Error(err) + req.Contains(err.Error(), tc.expectErr) + } else { + req.NoError(err) + expectedEvents := collectedToActorEvents(collectedEvents) + req.Equal(expectedEvents, gotEvents) + efm.assertRemoved(filter.ID()) + } + }) + } +} + +func TestSubscribeActorEvents(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + seed := time.Now().UnixNano() + t.Logf("seed: %d", seed) + rng := pseudo.New(pseudo.NewSource(seed)) + mockClock := clock.NewMock() + + const maxFilterHeightRange = 100 + const blockDelay = 30 * time.Second + const filterStartHeight = 0 + const currentHeight = 10 + const finishHeight = 20 + const eventsPerEpoch = 2 + + minerAddr, err := address.NewIDAddress(uint64(rng.Int63())) + require.NoError(t, err) + + for _, tc := range []struct { + name string + receiveSpeed time.Duration // how fast will we receive all events _per epoch_ + expectComplete bool // do we expect this to succeed? + endEpoch int // -1 for no end + }{ + {"fast", 0, true, -1}, + {"fast with end", 0, true, finishHeight}, + {"half block speed", blockDelay / 2, true, -1}, + {"half block speed with end", blockDelay / 2, true, finishHeight}, + // testing exactly blockDelay is a border case and will be flaky + {"1.5 block speed", blockDelay * 3 / 2, false, -1}, + {"twice block speed", blockDelay * 2, false, -1}, + } { + + tc := tc + t.Run(tc.name, func(t *testing.T) { + req := require.New(t) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + mockClock.Set(time.Now()) + mockFilterManager := newMockEventFilterManager(t) + allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, finishHeight) + historicalEvents := allEvents[0 : (currentHeight-filterStartHeight)*eventsPerEpoch] + mockFilter := newMockFilter(ctx, t, rng, historicalEvents) + mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(tc.endEpoch), cid.Undef, nil, nil, false, mockFilter) + + ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)}) + req.NoError(err) + mockChain := newMockChainAccessor(t, ts) + + handler := NewActorEventHandlerWithClock(mockChain, mockFilterManager, blockDelay, maxFilterHeightRange, mockClock) + + aef := &types.ActorEventFilter{FromHeight: epochPtr(0)} + if tc.endEpoch >= 0 { + aef.ToHeight = epochPtr(tc.endEpoch) + } + eventChan, err := handler.SubscribeActorEvents(ctx, aef) + req.NoError(err) + + gotEvents := make([]*types.ActorEvent, 0) + + // assume we can cleanly pick up all historical events in one go + for len(gotEvents) < len(historicalEvents) && ctx.Err() == nil { + select { + case e, ok := <-eventChan: + req.True(ok) + gotEvents = append(gotEvents, e) + case <-ctx.Done(): + t.Fatalf("timed out waiting for event") + } + } + req.Equal(collectedToActorEvents(historicalEvents), gotEvents) + + mockClock.Add(blockDelay) + nextReceiveTime := mockClock.Now() + + // Ticker to simulate both time and the chain advancing, including emitting events at + // the right time directly to the filter. + + go func() { + for thisHeight := int64(currentHeight); ctx.Err() == nil; thisHeight++ { + ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, thisHeight)}) + req.NoError(err) + mockChain.setHeaviestTipSet(ts) + + var eventsThisEpoch []*filter.CollectedEvent + if thisHeight <= finishHeight { + eventsThisEpoch = allEvents[(thisHeight-filterStartHeight)*eventsPerEpoch : (thisHeight-filterStartHeight+1)*eventsPerEpoch] + } + for i := 0; i < eventsPerEpoch; i++ { + if len(eventsThisEpoch) > 0 { + mockFilter.sendEventToChannel(eventsThisEpoch[0]) + eventsThisEpoch = eventsThisEpoch[1:] + } + select { + case <-time.After(2 * time.Millisecond): // allow everyone to catch a breath + mockClock.Add(blockDelay / eventsPerEpoch) + case <-ctx.Done(): + return + } + } + + if thisHeight == finishHeight+1 && tc.expectComplete && tc.endEpoch < 0 && ctx.Err() == nil { + // at finish+1, for the case where we expect clean completion and there is no ToEpoch + // set on the filter, if we send one more event at the next height so we end up with + // something uncollected in the buffer, causing a disconnect + evt := makeCollectedEvents(t, rng, finishHeight+1, 1, finishHeight+1)[0] + mockFilter.sendEventToChannel(evt) + } // else if endEpoch is set, we expect the chain advance to force closure + } + }() + + // Client collecting events off the channel + + var prematureEnd bool + for thisHeight := int64(currentHeight); thisHeight <= finishHeight && !prematureEnd && ctx.Err() == nil; thisHeight++ { + // delay to simulate latency + select { + case <-mockClock.After(nextReceiveTime.Sub(mockClock.Now())): + case <-ctx.Done(): + t.Fatalf("timed out simulating receive delay") + } + + // collect eventsPerEpoch more events + newEvents := make([]*types.ActorEvent, 0) + for len(newEvents) < eventsPerEpoch && !prematureEnd && ctx.Err() == nil { + select { + case e, ok := <-eventChan: // receive the events from the subscription + if ok { + newEvents = append(newEvents, e) + } else { + prematureEnd = true + } + case <-ctx.Done(): + t.Fatalf("timed out waiting for event") + } + nextReceiveTime = nextReceiveTime.Add(tc.receiveSpeed) + } + + if tc.expectComplete || !prematureEnd { + // sanity check that we got what we expected this epoch + req.Len(newEvents, eventsPerEpoch) + epochEvents := allEvents[(thisHeight)*eventsPerEpoch : (thisHeight+1)*eventsPerEpoch] + req.Equal(collectedToActorEvents(epochEvents), newEvents) + gotEvents = append(gotEvents, newEvents...) + } + } + + req.Equal(tc.expectComplete, !prematureEnd, "expected to complete") + if tc.expectComplete { + req.Len(gotEvents, len(allEvents)) + req.Equal(collectedToActorEvents(allEvents), gotEvents) + } else { + req.NotEqual(len(gotEvents), len(allEvents)) + } + + // cleanup + mockFilter.waitAssertClearSubChannelCalled(500 * time.Millisecond) + mockFilterManager.waitAssertRemoved(mockFilter.ID(), 500*time.Millisecond) + }) + } +} + +func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) { + // Similar to TestSubscribeActorEvents but we set an explicit end that caps out at the current height + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + seed := time.Now().UnixNano() + t.Logf("seed: %d", seed) + rng := pseudo.New(pseudo.NewSource(seed)) + mockClock := clock.NewMock() + + const maxFilterHeightRange = 100 + const blockDelay = 30 * time.Second + const filterStartHeight = 0 + const currentHeight = 10 + const eventsPerEpoch = 2 + + minerAddr, err := address.NewIDAddress(uint64(rng.Int63())) + require.NoError(t, err) + + for _, tc := range []struct { + name string + blockTimeToComplete float64 // fraction of a block time that it takes to receive all events + expectComplete bool // do we expect this to succeed? + }{ + {"fast", 0, true}, + {"half block speed", 0.5, true}, + {"1.5 block speed", 1.5, false}, + {"twice block speed", 2, false}, + } { + + tc := tc + t.Run(tc.name, func(t *testing.T) { + req := require.New(t) + + mockClock.Set(time.Now()) + mockFilterManager := newMockEventFilterManager(t) + allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, currentHeight-1) + mockFilter := newMockFilter(ctx, t, rng, allEvents) + mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(currentHeight), cid.Undef, nil, nil, false, mockFilter) + + ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)}) + req.NoError(err) + mockChain := newMockChainAccessor(t, ts) + + handler := NewActorEventHandlerWithClock(mockChain, mockFilterManager, blockDelay, maxFilterHeightRange, mockClock) + + aef := &types.ActorEventFilter{FromHeight: epochPtr(0), ToHeight: epochPtr(currentHeight)} + eventChan, err := handler.SubscribeActorEvents(ctx, aef) + req.NoError(err) + + gotEvents := make([]*types.ActorEvent, 0) + + // assume we can cleanly pick up all historical events in one go + receiveLoop: + for len(gotEvents) < len(allEvents) && ctx.Err() == nil { + select { + case e, ok := <-eventChan: + if tc.expectComplete || ok { + req.True(ok) + gotEvents = append(gotEvents, e) + mockClock.Add(time.Duration(float64(blockDelay) * tc.blockTimeToComplete / float64(len(allEvents)))) + // no need to advance the chain, we're also testing that's not necessary + time.Sleep(2 * time.Millisecond) // catch a breath + } else { + break receiveLoop + } + case <-ctx.Done(): + t.Fatalf("timed out waiting for event, got %d/%d events", len(gotEvents), len(allEvents)) + } + } + if tc.expectComplete { + req.Equal(collectedToActorEvents(allEvents), gotEvents) + } else { + req.NotEqual(len(gotEvents), len(allEvents)) + } + // advance the chain and observe cleanup + ts, err = types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight+1)}) + req.NoError(err) + mockChain.setHeaviestTipSet(ts) + mockClock.Add(blockDelay) + mockFilterManager.waitAssertRemoved(mockFilter.ID(), 1*time.Second) + }) + } +} + +var ( + _ ChainAccessor = (*mockChainAccessor)(nil) + _ filter.EventFilter = (*mockFilter)(nil) + _ EventFilterManager = (*mockEventFilterManager)(nil) +) + +type mockChainAccessor struct { + t *testing.T + ts *types.TipSet + lk sync.Mutex +} + +func newMockChainAccessor(t *testing.T, ts *types.TipSet) *mockChainAccessor { + return &mockChainAccessor{t: t, ts: ts} +} + +func (m *mockChainAccessor) setHeaviestTipSet(ts *types.TipSet) { + m.lk.Lock() + defer m.lk.Unlock() + m.ts = ts +} + +func (m *mockChainAccessor) GetHeaviestTipSet() *types.TipSet { + m.lk.Lock() + defer m.lk.Unlock() + return m.ts +} + +type mockFilter struct { + t *testing.T + ctx context.Context + id types.FilterID + lastTaken time.Time + ch chan<- interface{} + historicalEvents []*filter.CollectedEvent + subChannelCalls int + clearSubChannelCalls int + lk sync.Mutex +} + +func newMockFilter(ctx context.Context, t *testing.T, rng *pseudo.Rand, historicalEvents []*filter.CollectedEvent) *mockFilter { + t.Helper() + byt := make([]byte, 32) + _, err := rng.Read(byt) + require.NoError(t, err) + return &mockFilter{ + t: t, + ctx: ctx, + id: types.FilterID(byt), + historicalEvents: historicalEvents, + } +} + +func (m *mockFilter) sendEventToChannel(e *filter.CollectedEvent) { + m.lk.Lock() + defer m.lk.Unlock() + if m.ch != nil { + select { + case m.ch <- e: + case <-m.ctx.Done(): + } + } +} + +func (m *mockFilter) waitAssertClearSubChannelCalled(timeout time.Duration) { + m.t.Helper() + for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Millisecond) { + m.lk.Lock() + c := m.clearSubChannelCalls + m.lk.Unlock() + switch c { + case 0: + continue + case 1: + return + default: + m.t.Fatalf("ClearSubChannel called more than once") + } + } + m.t.Fatalf("ClearSubChannel not called") +} + +func (m *mockFilter) ID() types.FilterID { + return m.id +} + +func (m *mockFilter) LastTaken() time.Time { + return m.lastTaken +} + +func (m *mockFilter) SetSubChannel(ch chan<- interface{}) { + m.t.Helper() + m.lk.Lock() + defer m.lk.Unlock() + m.subChannelCalls++ + m.ch = ch +} + +func (m *mockFilter) ClearSubChannel() { + m.t.Helper() + m.lk.Lock() + defer m.lk.Unlock() + m.clearSubChannelCalls++ + m.ch = nil +} + +func (m *mockFilter) TakeCollectedEvents(ctx context.Context) []*filter.CollectedEvent { + e := m.historicalEvents + m.historicalEvents = nil + m.lastTaken = time.Now() + return e +} + +func (m *mockFilter) CollectEvents(ctx context.Context, tse *filter.TipSetEvents, reorg bool, ar filter.AddressResolver) error { + m.t.Fatalf("unexpected call to CollectEvents") + return nil +} + +type filterManagerExpectation struct { + minHeight, maxHeight abi.ChainEpoch + tipsetCid cid.Cid + addresses []address.Address + keysWithCodec map[string][]types.ActorEventBlock + excludeReverted bool + returnFilter filter.EventFilter +} + +type mockEventFilterManager struct { + t *testing.T + expectations []filterManagerExpectation + removed []types.FilterID + lk sync.Mutex +} + +func newMockEventFilterManager(t *testing.T) *mockEventFilterManager { + return &mockEventFilterManager{t: t} +} + +func (m *mockEventFilterManager) expectInstall( + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + excludeReverted bool, + returnFilter filter.EventFilter) { + + m.t.Helper() + m.expectations = append(m.expectations, filterManagerExpectation{ + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keysWithCodec: keysWithCodec, + excludeReverted: excludeReverted, + returnFilter: returnFilter, + }) +} + +func (m *mockEventFilterManager) assertRemoved(id types.FilterID) { + m.t.Helper() + m.lk.Lock() + defer m.lk.Unlock() + require.Contains(m.t, m.removed, id) +} + +func (m *mockEventFilterManager) waitAssertRemoved(id types.FilterID, timeout time.Duration) { + m.t.Helper() + for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Millisecond) { + m.lk.Lock() + if len(m.removed) == 0 { + m.lk.Unlock() + continue + } + defer m.lk.Unlock() + require.Contains(m.t, m.removed, id) + return + } + m.t.Fatalf("filter %x not removed", id) +} + +func (m *mockEventFilterManager) Install( + ctx context.Context, + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + excludeReverted bool, +) (filter.EventFilter, error) { + + require.True(m.t, len(m.expectations) > 0, "unexpected call to Install") + exp := m.expectations[0] + m.expectations = m.expectations[1:] + // check the expectation matches the call then return the attached filter + require.Equal(m.t, exp.minHeight, minHeight) + require.Equal(m.t, exp.maxHeight, maxHeight) + require.Equal(m.t, exp.tipsetCid, tipsetCid) + require.Equal(m.t, exp.addresses, addresses) + require.Equal(m.t, exp.keysWithCodec, keysWithCodec) + require.Equal(m.t, exp.excludeReverted, excludeReverted) + return exp.returnFilter, nil +} + +func (m *mockEventFilterManager) Remove(ctx context.Context, id types.FilterID) error { + m.lk.Lock() + defer m.lk.Unlock() + m.removed = append(m.removed, id) + return nil +} + +func newBlockHeader(minerAddr address.Address, height int64) *types.BlockHeader { + return &types.BlockHeader{ + Miner: minerAddr, + Ticket: &types.Ticket{ + VRFProof: []byte("vrf proof0000000vrf proof0000000"), + }, + ElectionProof: &types.ElectionProof{ + VRFProof: []byte("vrf proof0000000vrf proof0000000"), + }, + Parents: []cid.Cid{testCid, testCid}, + ParentMessageReceipts: testCid, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("sign me up")}, + ParentWeight: types.NewInt(123125126212), + Messages: testCid, + Height: abi.ChainEpoch(height), + ParentStateRoot: testCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("sign me up")}, + ParentBaseFee: types.NewInt(3432432843291), + } +} + +func epochPtr(i int) *abi.ChainEpoch { + e := abi.ChainEpoch(i) + return &e +} + +func collectedToActorEvents(collected []*filter.CollectedEvent) []*types.ActorEvent { + var out []*types.ActorEvent + for _, c := range collected { + out = append(out, &types.ActorEvent{ + Entries: c.Entries, + Emitter: c.EmitterAddr, + Reverted: c.Reverted, + Height: c.Height, + TipSetKey: c.TipSetKey, + MsgCid: c.MsgCid, + }) + } + return out +} + +func makeCollectedEvents(t *testing.T, rng *pseudo.Rand, eventStartHeight, eventsPerHeight, eventEndHeight int64) []*filter.CollectedEvent { + var out []*filter.CollectedEvent + for h := eventStartHeight; h <= eventEndHeight; h++ { + for i := int64(0); i < eventsPerHeight; i++ { + out = append(out, makeCollectedEvent(t, rng, types.NewTipSetKey(mkCid(t, fmt.Sprintf("h=%d", h))), abi.ChainEpoch(h))) + } + } + return out +} + +func makeCollectedEvent(t *testing.T, rng *pseudo.Rand, tsKey types.TipSetKey, height abi.ChainEpoch) *filter.CollectedEvent { + addr, err := address.NewIDAddress(uint64(rng.Int63())) + require.NoError(t, err) + + return &filter.CollectedEvent{ + Entries: []types.EventEntry{ + {Flags: 0x01, Key: "k1", Codec: cid.Raw, Value: []byte("v1")}, + {Flags: 0x01, Key: "k2", Codec: cid.Raw, Value: []byte("v2")}, + }, + EmitterAddr: addr, + EventIdx: 0, + Reverted: false, + Height: height, + TipSetKey: tsKey, + MsgIdx: 0, + MsgCid: testCid, + } +} + +func mkCid(t *testing.T, s string) cid.Cid { + h, err := multihash.Sum([]byte(s), multihash.SHA2_256, -1) + require.NoError(t, err) + return cid.NewCidV1(cid.Raw, h) +} diff --git a/node/impl/full/actor_events.go b/node/impl/full/actor_events.go index de5e3b966..280f50525 100644 --- a/node/impl/full/actor_events.go +++ b/node/impl/full/actor_events.go @@ -6,14 +6,14 @@ import ( "time" "github.com/ipfs/go-cid" + "github.com/raulk/clock" "go.uber.org/fx" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/events/filter" - "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) @@ -27,21 +27,70 @@ var ( _ ActorEventAPI = *new(api.Gateway) ) -type ActorEventHandler struct { - EventFilterManager *filter.EventFilterManager - MaxFilterHeightRange abi.ChainEpoch - Chain *store.ChainStore +type ChainAccessor interface { + GetHeaviestTipSet() *types.TipSet } -var _ ActorEventAPI = (*ActorEventHandler)(nil) +type EventFilterManager interface { + Install( + ctx context.Context, + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + excludeReverted bool, + ) (filter.EventFilter, error) + Remove(ctx context.Context, id types.FilterID) error +} type ActorEventsAPI struct { fx.In ActorEventAPI } +type ActorEventHandler struct { + chain ChainAccessor + eventFilterManager EventFilterManager + blockDelay time.Duration + maxFilterHeightRange abi.ChainEpoch + clock clock.Clock +} + +var _ ActorEventAPI = (*ActorEventHandler)(nil) + +func NewActorEventHandler( + chain ChainAccessor, + eventFilterManager EventFilterManager, + blockDelay time.Duration, + maxFilterHeightRange abi.ChainEpoch, +) *ActorEventHandler { + return &ActorEventHandler{ + chain: chain, + eventFilterManager: eventFilterManager, + blockDelay: blockDelay, + maxFilterHeightRange: maxFilterHeightRange, + clock: clock.New(), + } +} + +func NewActorEventHandlerWithClock( + chain ChainAccessor, + eventFilterManager EventFilterManager, + blockDelay time.Duration, + maxFilterHeightRange abi.ChainEpoch, + clock clock.Clock, +) *ActorEventHandler { + return &ActorEventHandler{ + chain: chain, + eventFilterManager: eventFilterManager, + blockDelay: blockDelay, + maxFilterHeightRange: maxFilterHeightRange, + clock: clock, + } +} + func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) ([]*types.ActorEvent, error) { - if a.EventFilterManager == nil { + if a.eventFilterManager == nil { return nil, api.ErrNotSupported } @@ -59,13 +108,13 @@ func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types if err != nil { return nil, fmt.Errorf("failed to get tipset cid: %w", err) } - f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) + f, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) if err != nil { return nil, err } - evs, _, _ := getCollected(ctx, f) - if err := a.EventFilterManager.Remove(ctx, f.ID()); err != nil { + evs := getCollected(ctx, f) + if err := a.eventFilterManager.Remove(ctx, f.ID()); err != nil { log.Warnf("failed to remove filter: %s", err) } return evs, nil @@ -90,18 +139,14 @@ func (a *ActorEventHandler) parseFilter(f types.ActorEventFilter) (*filterParams return nil, fmt.Errorf("cannot specify both TipSetKey and FromHeight/ToHeight") } - tsk := types.EmptyTSK - if f.TipSetKey != nil { - tsk = *f.TipSetKey - } return &filterParams{ MinHeight: 0, MaxHeight: 0, - TipSetKey: tsk, + TipSetKey: *f.TipSetKey, }, nil } - min, max, err := parseHeightRange(a.Chain.GetHeaviestTipSet().Height(), f.FromHeight, f.ToHeight, a.MaxFilterHeightRange) + min, max, err := parseHeightRange(a.chain.GetHeaviestTipSet().Height(), f.FromHeight, f.ToHeight, a.maxFilterHeightRange) if err != nil { return nil, err } @@ -156,9 +201,10 @@ func parseHeightRange(heaviest abi.ChainEpoch, fromHeight, toHeight *abi.ChainEp } func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { - if a.EventFilterManager == nil { + if a.eventFilterManager == nil { return nil, api.ErrNotSupported } + if evtFilter == nil { evtFilter = &types.ActorEventFilter{} } @@ -171,22 +217,18 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter if err != nil { return nil, fmt.Errorf("failed to get tipset cid: %w", err) } - fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) + fm, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) if err != nil { return nil, err } - // The goal for the code below is to be able to send events on the `out` channel as fast as - // possible and not let it get too far behind the rate at which the events are generated. - // For historical events we see the rate at which they were generated by looking the height range; - // we then make sure that the client can receive them at least twice as fast as they were - // generated so they catch up quick enough to receive new events. - // For ongoing events we use an exponential moving average of the events per height to make sure - // that the client doesn't fall behind. - // In both cases we allow a little bit of slack but need to avoid letting the client bloat the - // buffer too much. - // There is no special handling for reverts, so they will just look like a lot more events per - // epoch and the user has to receive them anyway. + // The goal for the code below is to send events on the `out` channel as fast as possible and not + // let it get too far behind the rate at which the events are generated. + // For historical events, we aim to send all events within a single block's time (30s on mainnet). + // This ensures that the client can catch up quickly enough to start receiving new events. + // For ongoing events, we also aim to send all events within a single block's time, so we never + // want to be buffering events (approximately) more than one epoch behind the current head. + // It's approximate because we only update our notion of "current epoch" once per ~blocktime. out := make(chan *types.ActorEvent) @@ -195,54 +237,41 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter // tell the caller we're done close(out) fm.ClearSubChannel() - if err := a.EventFilterManager.Remove(ctx, fm.ID()); err != nil { + if err := a.eventFilterManager.Remove(ctx, fm.ID()); err != nil { log.Warnf("failed to remove filter: %s", err) } }() + // When we start sending real-time events, we want to make sure that we don't fall behind more + // than one epoch's worth of events (approximately). Capture this value now, before we send + // historical events to allow for a little bit of slack in the historical event sending. + minBacklogHeight := a.chain.GetHeaviestTipSet().Height() - 1 + // Handle any historical events that our filter may have picked up ----------------------------- - evs, minEpoch, maxEpoch := getCollected(ctx, fm) + evs := getCollected(ctx, fm) if len(evs) > 0 { - // must be able to send events at least twice as fast as they were generated - epochRange := maxEpoch - minEpoch - if epochRange <= 0 { - epochRange = 1 - } - eventsPerEpoch := float64(len(evs)) / float64(epochRange) - eventsPerSecond := 2 * eventsPerEpoch / float64(build.BlockDelaySecs) - // a minimum rate of 1 event per second if we don't have many events - if eventsPerSecond < 1 { - eventsPerSecond = 1 - } - - // send events from evs to the out channel and ensure we don't do it slower than eventsPerMs - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - const maxSlowTicks = 3 // slightly forgiving, allow 3 slow ticks (seconds) before giving up - slowTicks := 0 - sentEvents := 0.0 - + // ensure we get all events out on the channel within one block's time (30s on mainnet) + timer := a.clock.Timer(a.blockDelay) for _, ev := range evs { select { case out <- ev: - sentEvents++ - case <-ticker.C: - if sentEvents < eventsPerSecond { - slowTicks++ - if slowTicks >= maxSlowTicks { - log.Errorf("closing event subscription due to slow event sending rate") - return - } - } else { - slowTicks = 0 - } - sentEvents = 0 + case <-timer.C: + log.Errorf("closing event subscription due to slow event sending rate") + timer.Stop() + return case <-ctx.Done(): + timer.Stop() return } } + timer.Stop() + } + + // for the case where we have a MaxHeight set, we don't get a signal from the filter when we + // reach that height, so we need to check it ourselves, do it now but also in the loop + if params.MaxHeight > 0 && a.chain.GetHeaviestTipSet().Height() > params.MaxHeight { + return } // Handle ongoing events from the filter ------------------------------------------------------- @@ -251,10 +280,7 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter fm.SetSubChannel(in) var buffer []*types.ActorEvent - const α = 0.2 // decay factor for the events per height EMA - var eventsPerHeightEma float64 = 256 // exponential moving average of events per height, initially guess at 256 - var lastHeight abi.ChainEpoch // last seen event height - var eventsAtCurrentHeight int // number of events at the current height + nextBacklogHeightUpdate := a.clock.Now().Add(a.blockDelay) collectEvent := func(ev interface{}) bool { ce, ok := ev.(*filter.CollectedEvent) @@ -263,15 +289,12 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter return false } - if ce.Height > lastHeight { - // update the EMA of events per height when the height increases - if lastHeight != 0 { - eventsPerHeightEma = α*float64(eventsAtCurrentHeight) + (1-α)*eventsPerHeightEma - } - lastHeight = ce.Height - eventsAtCurrentHeight = 0 + if ce.Height < minBacklogHeight { + // since we mostly care about buffer size, we only trigger a too-slow close when the buffer + // increases, i.e. we collect a new event + log.Errorf("closing event subscription due to slow event sending rate") + return false } - eventsAtCurrentHeight++ buffer = append(buffer, &types.ActorEvent{ Entries: ce.Entries, @@ -284,23 +307,11 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter return true } - // for the case where we have a MaxHeight set, we don't get a signal from the filter when we - // reach that height, so we need to check it ourselves, do it now but also in the loop - if params.MaxHeight > 0 && a.Chain.GetHeaviestTipSet().Height() > params.MaxHeight { - return - } - ticker := time.NewTicker(time.Duration(build.BlockDelaySecs) * time.Second) + ticker := a.clock.Ticker(a.blockDelay) defer ticker.Stop() for ctx.Err() == nil { if len(buffer) > 0 { - // check if we need to disconnect the client because they've fallen behind, always allow at - // least 8 events in the buffer to provide a little bit of slack - if len(buffer) > 8 && float64(len(buffer)) > eventsPerHeightEma/2 { - log.Errorf("closing event subscription due to slow event sending rate") - return - } - select { case ev, ok := <-in: // incoming event if !ok || !collectEvent(ev) { @@ -309,6 +320,12 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter case out <- buffer[0]: // successful send buffer[0] = nil buffer = buffer[1:] + case <-ticker.C: + // check that our backlog isn't too big by looking at the oldest event + if buffer[0].Height < minBacklogHeight { + log.Errorf("closing event subscription due to slow event sending rate") + return + } case <-ctx.Done(): return } @@ -321,30 +338,30 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter case <-ctx.Done(): return case <-ticker.C: - if params.MaxHeight > 0 && a.Chain.GetHeaviestTipSet().Height() > params.MaxHeight { + currentHeight := a.chain.GetHeaviestTipSet().Height() + if params.MaxHeight > 0 && currentHeight > params.MaxHeight { + // we've reached the filter's MaxHeight, we're done so we can close the channel return } } } + + if a.clock.Now().After(nextBacklogHeightUpdate) { + minBacklogHeight = a.chain.GetHeaviestTipSet().Height() - 1 + nextBacklogHeightUpdate = a.clock.Now().Add(a.blockDelay) + } } }() return out, nil } -func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEvent, abi.ChainEpoch, abi.ChainEpoch) { +func getCollected(ctx context.Context, f filter.EventFilter) []*types.ActorEvent { ces := f.TakeCollectedEvents(ctx) var out []*types.ActorEvent - var min, max abi.ChainEpoch for _, e := range ces { - if min == 0 || e.Height < min { - min = e.Height - } - if e.Height > max { - max = e.Height - } out = append(out, &types.ActorEvent{ Entries: e.Entries, Emitter: e.EmitterAddr, @@ -355,5 +372,5 @@ func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEve }) } - return out, min, max + return out } diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index c7529c0b6..11c53b3cf 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1322,7 +1322,7 @@ func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRan return minHeight, maxHeight, nil } -func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*filter.EventFilter, error) { +func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (filter.EventFilter, error) { var ( minHeight abi.ChainEpoch maxHeight abi.ChainEpoch @@ -1465,7 +1465,7 @@ func (e *EthEventHandler) EthUninstallFilter(ctx context.Context, id ethtypes.Et func (e *EthEventHandler) uninstallFilter(ctx context.Context, f filter.Filter) error { switch f.(type) { - case *filter.EventFilter: + case filter.EventFilter: err := e.EventFilterManager.Remove(ctx, f.ID()) if err != nil && !errors.Is(err, filter.ErrFilterNotFound) { return err diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 5d695ab57..135a34e5b 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events/filter" "github.com/filecoin-project/lotus/chain/messagepool" @@ -163,18 +164,16 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc func ActorEventHandler(enable bool, fevmCfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEventHandler, error) { return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEventHandler, error) { - ee := &full.ActorEventHandler{ - MaxFilterHeightRange: abi.ChainEpoch(fevmCfg.Events.MaxFilterHeightRange), - Chain: cs, - } if !enable || fevmCfg.Events.DisableRealTimeFilterAPI { - // all Actor events functionality is disabled - return ee, nil + fm = nil } - ee.EventFilterManager = fm - - return ee, nil + return full.NewActorEventHandler( + cs, + fm, + time.Duration(build.BlockDelaySecs)*time.Second, + abi.ChainEpoch(fevmCfg.Events.MaxFilterHeightRange), + ), nil } }