More post-review changes, lots of tests for SubscribeActorEvents

Use BlockDelay as the window for receiving events on the SubscribeActorEvents
channel. We expect the user to have received the initial batch of historical
events (if any) within one block's time. For real-time events we expect them
not to fall behind by more than roughly one block's time.
This commit is contained in:
Rod Vagg 2024-03-01 17:23:04 +11:00 committed by Phi-rjan
parent 5633861ce6
commit 318695a44c
9 changed files with 927 additions and 216 deletions

View File

@ -27,7 +27,16 @@ func isIndexedValue(b uint8) bool {
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}
type EventFilter struct {
type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool)
type EventFilter interface {
Filter
TakeCollectedEvents(context.Context) []*CollectedEvent
CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error
}
type eventFilter struct {
id types.FilterID
minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum
maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum
@ -43,7 +52,7 @@ type EventFilter struct {
ch chan<- interface{}
}
var _ Filter = (*EventFilter)(nil)
var _ Filter = (*eventFilter)(nil)
type CollectedEvent struct {
Entries []types.EventEntry
@ -56,24 +65,24 @@ type CollectedEvent struct {
MsgCid cid.Cid // cid of message that produced event
}
func (f *EventFilter) ID() types.FilterID {
func (f *eventFilter) ID() types.FilterID {
return f.id
}
func (f *EventFilter) SetSubChannel(ch chan<- interface{}) {
func (f *eventFilter) SetSubChannel(ch chan<- interface{}) {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = ch
f.collected = nil
}
func (f *EventFilter) ClearSubChannel() {
func (f *eventFilter) ClearSubChannel() {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = nil
}
func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error {
if !f.matchTipset(te) {
return nil
}
@ -138,13 +147,13 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}
func (f *EventFilter) setCollectedEvents(ces []*CollectedEvent) {
func (f *eventFilter) setCollectedEvents(ces []*CollectedEvent) {
f.mu.Lock()
f.collected = ces
f.mu.Unlock()
}
func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent {
func (f *eventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent {
f.mu.Lock()
collected := f.collected
f.collected = nil
@ -154,14 +163,14 @@ func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent
return collected
}
func (f *EventFilter) LastTaken() time.Time {
func (f *eventFilter) LastTaken() time.Time {
f.mu.Lock()
defer f.mu.Unlock()
return f.lastTaken
}
// matchTipset reports whether this filter matches the given tipset
func (f *EventFilter) matchTipset(te *TipSetEvents) bool {
func (f *eventFilter) matchTipset(te *TipSetEvents) bool {
if f.tipsetCid != cid.Undef {
tsCid, err := te.Cid()
if err != nil {
@ -179,7 +188,7 @@ func (f *EventFilter) matchTipset(te *TipSetEvents) bool {
return true
}
func (f *EventFilter) matchAddress(o address.Address) bool {
func (f *eventFilter) matchAddress(o address.Address) bool {
if len(f.addresses) == 0 {
return true
}
@ -194,7 +203,7 @@ func (f *EventFilter) matchAddress(o address.Address) bool {
return false
}
func (f *EventFilter) matchKeys(ees []types.EventEntry) bool {
func (f *eventFilter) matchKeys(ees []types.EventEntry) bool {
if len(f.keysWithCodec) == 0 {
return true
}
@ -297,7 +306,7 @@ type EventFilterManager struct {
EventIndex *EventIndex
mu sync.Mutex // guards mutations to filters
filters map[types.FilterID]*EventFilter
filters map[types.FilterID]EventFilter
currentHeight abi.ChainEpoch
}
@ -364,7 +373,7 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
}
func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (*EventFilter, error) {
keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) {
m.mu.Lock()
currentHeight := m.currentHeight
m.mu.Unlock()
@ -378,7 +387,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
return nil, xerrors.Errorf("new filter id: %w", err)
}
f := &EventFilter{
f := &eventFilter{
id: id,
minHeight: minHeight,
maxHeight: maxHeight,
@ -390,14 +399,14 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight {
// Filter needs historic events
if err := m.EventIndex.PrefillFilter(ctx, f, excludeReverted); err != nil {
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil {
return nil, err
}
}
m.mu.Lock()
if m.filters == nil {
m.filters = make(map[types.FilterID]*EventFilter)
m.filters = make(map[types.FilterID]EventFilter)
}
m.filters[id] = f
m.mu.Unlock()

View File

@ -86,13 +86,13 @@ func TestEventFilterCollectEvents(t *testing.T) {
testCases := []struct {
name string
filter *EventFilter
filter *eventFilter
te *TipSetEvents
want []*CollectedEvent
}{
{
name: "nomatch tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14001,
maxHeight: -1,
},
@ -101,7 +101,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch tipset max height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: 13999,
},
@ -110,7 +110,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14000,
maxHeight: -1,
},
@ -119,7 +119,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match tipset cid",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: cid14000,
@ -129,7 +129,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch address",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a2},
@ -139,7 +139,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match address",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a1},
@ -149,7 +149,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match one entry",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -163,7 +163,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match one entry with alternate values",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -179,7 +179,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry by missing value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -194,7 +194,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry by missing key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -208,7 +208,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "match one entry with multiple keys",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -225,7 +225,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -242,7 +242,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -259,7 +259,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
},
{
name: "nomatch one entry with one unindexed key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{

View File

@ -481,7 +481,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}
// PrefillFilter fills a filter's collection of events from the historic index
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter, excludeReverted bool) error {
func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error {
clauses := []string{}
values := []any{}
joins := []string{}

View File

@ -82,13 +82,13 @@ func TestEventIndexPrefillFilter(t *testing.T) {
testCases := []struct {
name string
filter *EventFilter
filter *eventFilter
te *TipSetEvents
want []*CollectedEvent
}{
{
name: "nomatch tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14001,
maxHeight: -1,
},
@ -97,7 +97,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "nomatch tipset max height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: 13999,
},
@ -106,7 +106,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "match tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14000,
maxHeight: -1,
},
@ -115,7 +115,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "match tipset cid",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: cid14000,
@ -125,7 +125,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "nomatch address",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a2},
@ -135,7 +135,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "match address",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a1},
@ -145,7 +145,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "match one entry",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -159,7 +159,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "match one entry with alternate values",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -175,7 +175,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "nomatch one entry by missing value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -190,7 +190,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "nomatch one entry by missing key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -204,7 +204,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "match one entry with multiple keys",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -221,7 +221,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -238,7 +238,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -255,7 +255,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
},
{
name: "nomatch one entry with one unindexed key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -272,7 +272,7 @@ func TestEventIndexPrefillFilter(t *testing.T) {
for _, tc := range testCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := ei.PrefillFilter(context.Background(), tc.filter, false); err != nil {
if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil {
require.NoError(t, err, "prefill filter events")
}
@ -409,13 +409,13 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
inclusiveTestCases := []struct {
name string
filter *EventFilter
filter *eventFilter
te *TipSetEvents
want []*CollectedEvent
}{
{
name: "nomatch tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14001,
maxHeight: -1,
},
@ -424,7 +424,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch tipset max height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: 13999,
},
@ -433,7 +433,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14000,
maxHeight: -1,
},
@ -442,7 +442,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match tipset cid",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: cid14000,
@ -452,7 +452,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match tipset cid",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: reveredCID14000,
@ -462,7 +462,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch address",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a3},
@ -472,7 +472,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match address 2",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a2},
@ -482,7 +482,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match address 1",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a1},
@ -492,7 +492,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match one entry",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -506,7 +506,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match one entry with alternate values",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -522,7 +522,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry by missing value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -537,7 +537,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry by missing key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -551,7 +551,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match one entry with multiple keys",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -568,7 +568,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match one entry with multiple keys",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -585,7 +585,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -602,7 +602,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -619,7 +619,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry with one unindexed key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -633,7 +633,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry with one unindexed key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -649,13 +649,13 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
exclusiveTestCases := []struct {
name string
filter *EventFilter
filter *eventFilter
te *TipSetEvents
want []*CollectedEvent
}{
{
name: "nomatch tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14001,
maxHeight: -1,
},
@ -664,7 +664,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch tipset max height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: 13999,
},
@ -673,7 +673,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match tipset min height",
filter: &EventFilter{
filter: &eventFilter{
minHeight: 14000,
maxHeight: -1,
},
@ -682,7 +682,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match tipset cid",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: cid14000,
@ -692,7 +692,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match tipset cid but reverted",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: reveredCID14000,
@ -702,7 +702,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch address",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a3},
@ -712,7 +712,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch address 2 but reverted",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a2},
@ -722,7 +722,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match address",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a1},
@ -732,7 +732,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match one entry",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -746,7 +746,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match one entry with alternate values",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -762,7 +762,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry by missing value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -777,7 +777,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry by missing key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -791,7 +791,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "match one entry with multiple keys",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -808,7 +808,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -825,7 +825,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry with matching reverted value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -842,7 +842,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry with one mismatching value",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -859,7 +859,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
},
{
name: "nomatch one entry with one unindexed key",
filter: &EventFilter{
filter: &eventFilter{
minHeight: -1,
maxHeight: -1,
keysWithCodec: keysToKeysWithCodec(map[string][][]byte{
@ -876,7 +876,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
for _, tc := range inclusiveTestCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := ei.PrefillFilter(context.Background(), tc.filter, false); err != nil {
if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil {
require.NoError(t, err, "prefill filter events")
}
@ -888,7 +888,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) {
for _, tc := range exclusiveTestCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := ei.PrefillFilter(context.Background(), tc.filter, true); err != nil {
if err := ei.prefillFilter(context.Background(), tc.filter, true); err != nil {
require.NoError(t, err, "prefill filter events")
}

View File

@ -372,7 +372,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
// verify that we can trace a datacap allocation through to a claim with the events, since this
// information is not completely available from the state tree
claims := buildClaimsFromEvents(ctx, t, eventsFromMessages, miner.FullNode)
claims := buildClaimsFromMessages(ctx, t, eventsFromMessages, miner.FullNode)
for _, claim := range claims {
p, err := address.NewIDAddress(uint64(claim.Provider))
require.NoError(t, err)
@ -395,6 +395,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
}, claims)
// construct ActorEvents from GetActorEvents API
t.Logf("Inspecting full events list from GetActorEvents")
allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{
FromHeight: epochPtr(0),
})
@ -405,6 +406,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
require.Equal(t, eventsFromMessages, allEvtsFromGetAPI)
// construct ActorEvents from subscription channel for just the miner actor
t.Logf("Inspecting only miner's events list from SubscribeActorEvents")
var subMinerEvts []*types.ActorEvent
for evt := range minerEvtsChan {
subMinerEvts = append(subMinerEvts, evt)
@ -421,15 +423,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
// compare events from messages and receipts with events from subscription channel
require.Equal(t, allMinerEvts, subMinerEvts)
// construct ActorEvents from subscription channel for just the sector-activated events
var prefillSectorActivatedEvts []*types.ActorEvent
for evt := range sectorActivatedEvtsChan {
prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, evt)
if len(prefillSectorActivatedEvts) == 2 {
break
}
}
require.Len(t, prefillSectorActivatedEvts, 2)
// construct ActorEvents from subscription channels for just the sector-activated events
var sectorActivatedEvts []*types.ActorEvent
for _, evt := range eventsFromMessages {
for _, entry := range evt.Entries {
@ -439,10 +433,42 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
}
}
}
require.Len(t, sectorActivatedEvts, 2) // sanity check
t.Logf("Inspecting only sector-activated events list from real-time SubscribeActorEvents")
var subscribedSectorActivatedEvts []*types.ActorEvent
for evt := range sectorActivatedEvtsChan {
subscribedSectorActivatedEvts = append(subscribedSectorActivatedEvts, evt)
if len(subscribedSectorActivatedEvts) == 2 {
break
}
}
// compare events from messages and receipts with events from subscription channel
require.Equal(t, sectorActivatedEvts, prefillSectorActivatedEvts)
require.Equal(t, sectorActivatedEvts, subscribedSectorActivatedEvts)
// same thing but use historical event fetching to see the same list
t.Logf("Inspecting only sector-activated events list from historical SubscribeActorEvents")
sectorActivatedEvtsChan, err = miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
Fields: map[string][]types.ActorEventBlock{
"$type": {
{Codec: 0x51, Value: sectorActivatedCbor},
},
},
FromHeight: epochPtr(0),
})
require.NoError(t, err)
subscribedSectorActivatedEvts = subscribedSectorActivatedEvts[:0]
for evt := range sectorActivatedEvtsChan {
subscribedSectorActivatedEvts = append(subscribedSectorActivatedEvts, evt)
if len(subscribedSectorActivatedEvts) == 2 {
break
}
}
// compare events from messages and receipts with events from subscription channel
require.Equal(t, sectorActivatedEvts, subscribedSectorActivatedEvts)
// check that our `ToHeight` filter works as expected
t.Logf("Inspecting only initial list of events SubscribeActorEvents with ToHeight")
var initialEvents []*types.ActorEvent
for evt := range initialEventsChan {
initialEvents = append(initialEvents, evt)
@ -451,6 +477,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
require.Equal(t, eventsFromMessages[0:5], initialEvents)
// construct ActorEvents from subscription channel for all actor events
t.Logf("Inspecting full events list from historical SubscribeActorEvents")
allEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
FromHeight: epochPtr(0),
})
@ -464,9 +491,15 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
}
// compare events from messages and receipts with events from subscription channel
require.Equal(t, eventsFromMessages, prefillEvts)
t.Logf("All done comparing events")
// NOTE: There is a delay in finishing this test because the SubscribeActorEvents
// with the ToHeight (initialEventsChan) has to wait at least a full actual epoch before
// realising that there's no more events for that filter. itests run with a different block
// speed than the ActorEventHandler is aware of.
}
func buildClaimsFromEvents(ctx context.Context, t *testing.T, eventsFromMessages []*types.ActorEvent, node v1api.FullNode) []*verifregtypes9.Claim {
func buildClaimsFromMessages(ctx context.Context, t *testing.T, eventsFromMessages []*types.ActorEvent, node v1api.FullNode) []*verifregtypes9.Claim {
claimKeyCbor := stringToEventKey(t, "claim")
claims := make([]*verifregtypes9.Claim, 0)
for _, event := range eventsFromMessages {

View File

@ -1,14 +1,28 @@
package full
import (
"context"
"fmt"
pseudo "math/rand"
"sync"
"testing"
"time"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/raulk/clock"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/types"
)
var testCid = cid.MustParse("bafyreicmaj5hhoy5mgqvamfhgexxyergw7hdeshizghodwkjg6qmpoco7i")
func TestParseHeightRange(t *testing.T) {
epochPtr := func(i int) *abi.ChainEpoch {
e := abi.ChainEpoch(i)
@ -97,16 +111,655 @@ func TestParseHeightRange(t *testing.T) {
for name, tc := range tcs {
tc2 := tc
t.Run(name, func(t *testing.T) {
req := require.New(t)
min, max, err := parseHeightRange(tc2.heaviest, tc2.from, tc2.to, tc2.maxRange)
require.Equal(t, tc2.minOut, min)
require.Equal(t, tc2.maxOut, max)
req.Equal(tc2.minOut, min)
req.Equal(tc2.maxOut, max)
if tc2.errStr != "" {
fmt.Println(err)
require.Error(t, err)
require.Contains(t, err.Error(), tc2.errStr)
t.Log(err)
req.Error(err)
req.Contains(err.Error(), tc2.errStr)
} else {
require.NoError(t, err)
req.NoError(err)
}
})
}
}
func TestGetActorEvents(t *testing.T) {
ctx := context.Background()
req := require.New(t)
seed := time.Now().UnixNano()
t.Logf("seed: %d", seed)
rng := pseudo.New(pseudo.NewSource(seed))
const maxFilterHeightRange = 100
minerAddr, err := address.NewIDAddress(uint64(rng.Int63()))
req.NoError(err)
testCases := map[string]struct {
filter *types.ActorEventFilter
currentHeight int64
installMinHeight int64
installMaxHeight int64
installTipSetKey cid.Cid
installAddresses []address.Address
installKeysWithCodec map[string][]types.ActorEventBlock
installExcludeReverted bool
expectErr string
}{
"nil filter": {
filter: nil,
installMinHeight: -1,
installMaxHeight: -1,
},
"empty filter": {
filter: &types.ActorEventFilter{},
installMinHeight: -1,
installMaxHeight: -1,
},
"basic height range filter": {
filter: &types.ActorEventFilter{
FromHeight: epochPtr(0),
ToHeight: epochPtr(maxFilterHeightRange),
},
installMinHeight: 0,
installMaxHeight: maxFilterHeightRange,
},
"from, no to height": {
filter: &types.ActorEventFilter{
FromHeight: epochPtr(0),
},
currentHeight: maxFilterHeightRange - 1,
installMinHeight: 0,
installMaxHeight: -1,
},
"to, no from height": {
filter: &types.ActorEventFilter{
ToHeight: epochPtr(maxFilterHeightRange - 1),
},
installMinHeight: -1,
installMaxHeight: maxFilterHeightRange - 1,
},
"from, no to height, too far": {
filter: &types.ActorEventFilter{
FromHeight: epochPtr(0),
},
currentHeight: maxFilterHeightRange + 1,
expectErr: "invalid epoch range: 'from' height is too far in the past",
},
"to, no from height, too far": {
filter: &types.ActorEventFilter{
ToHeight: epochPtr(maxFilterHeightRange + 1),
},
currentHeight: 0,
expectErr: "invalid epoch range: 'to' height is too far in the future",
},
}
for name, tc := range testCases {
tc := tc
t.Run(name, func(t *testing.T) {
efm := newMockEventFilterManager(t)
collectedEvents := makeCollectedEvents(t, rng, 0, 1, 10)
filter := newMockFilter(ctx, t, rng, collectedEvents)
if tc.expectErr == "" {
efm.expectInstall(abi.ChainEpoch(tc.installMinHeight), abi.ChainEpoch(tc.installMaxHeight), tc.installTipSetKey, tc.installAddresses, tc.installKeysWithCodec, tc.installExcludeReverted, filter)
}
ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, tc.currentHeight)})
req.NoError(err)
chain := newMockChainAccessor(t, ts)
handler := NewActorEventHandler(chain, efm, 50*time.Millisecond, maxFilterHeightRange)
gotEvents, err := handler.GetActorEvents(ctx, tc.filter)
if tc.expectErr != "" {
req.Error(err)
req.Contains(err.Error(), tc.expectErr)
} else {
req.NoError(err)
expectedEvents := collectedToActorEvents(collectedEvents)
req.Equal(expectedEvents, gotEvents)
efm.assertRemoved(filter.ID())
}
})
}
}
// TestSubscribeActorEvents exercises SubscribeActorEvents end-to-end against mocks:
// a slug of historical events must be delivered within one block's time, and ongoing
// real-time events must be consumed at roughly block speed or the handler disconnects
// the client by closing the channel. A mock clock drives both the handler's timers
// and the simulated client receive latency.
func TestSubscribeActorEvents(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// log the seed so a failing run can be reproduced deterministically
	seed := time.Now().UnixNano()
	t.Logf("seed: %d", seed)
	rng := pseudo.New(pseudo.NewSource(seed))
	mockClock := clock.NewMock()

	const maxFilterHeightRange = 100
	const blockDelay = 30 * time.Second
	const filterStartHeight = 0
	const currentHeight = 10 // the filter is installed at this epoch; events before it are "historical"
	const finishHeight = 20  // no events are generated past this epoch
	const eventsPerEpoch = 2

	minerAddr, err := address.NewIDAddress(uint64(rng.Int63()))
	require.NoError(t, err)

	for _, tc := range []struct {
		name           string
		receiveSpeed   time.Duration // how fast will we receive all events _per epoch_
		expectComplete bool          // do we expect this to succeed?
		endEpoch       int           // -1 for no end
	}{
		{"fast", 0, true, -1},
		{"fast with end", 0, true, finishHeight},
		{"half block speed", blockDelay / 2, true, -1},
		{"half block speed with end", blockDelay / 2, true, finishHeight},
		// testing exactly blockDelay is a border case and will be flaky
		{"1.5 block speed", blockDelay * 3 / 2, false, -1},
		{"twice block speed", blockDelay * 2, false, -1},
	} {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			req := require.New(t)

			ctx, cancel := context.WithCancel(ctx)
			defer cancel()

			mockClock.Set(time.Now())
			mockFilterManager := newMockEventFilterManager(t)
			allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, finishHeight)
			// events before currentHeight are served via the filter's TakeCollectedEvents
			historicalEvents := allEvents[0 : (currentHeight-filterStartHeight)*eventsPerEpoch]
			mockFilter := newMockFilter(ctx, t, rng, historicalEvents)
			mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(tc.endEpoch), cid.Undef, nil, nil, false, mockFilter)

			ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)})
			req.NoError(err)
			mockChain := newMockChainAccessor(t, ts)

			handler := NewActorEventHandlerWithClock(mockChain, mockFilterManager, blockDelay, maxFilterHeightRange, mockClock)

			aef := &types.ActorEventFilter{FromHeight: epochPtr(0)}
			if tc.endEpoch >= 0 {
				aef.ToHeight = epochPtr(tc.endEpoch)
			}
			eventChan, err := handler.SubscribeActorEvents(ctx, aef)
			req.NoError(err)

			gotEvents := make([]*types.ActorEvent, 0)

			// assume we can cleanly pick up all historical events in one go
			for len(gotEvents) < len(historicalEvents) && ctx.Err() == nil {
				select {
				case e, ok := <-eventChan:
					req.True(ok)
					gotEvents = append(gotEvents, e)
				case <-ctx.Done():
					t.Fatalf("timed out waiting for event")
				}
			}
			req.Equal(collectedToActorEvents(historicalEvents), gotEvents)

			mockClock.Add(blockDelay)
			nextReceiveTime := mockClock.Now()

			// Ticker to simulate both time and the chain advancing, including emitting events at
			// the right time directly to the filter.
			go func() {
				for thisHeight := int64(currentHeight); ctx.Err() == nil; thisHeight++ {
					ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, thisHeight)})
					req.NoError(err)
					mockChain.setHeaviestTipSet(ts)

					var eventsThisEpoch []*filter.CollectedEvent
					if thisHeight <= finishHeight {
						eventsThisEpoch = allEvents[(thisHeight-filterStartHeight)*eventsPerEpoch : (thisHeight-filterStartHeight+1)*eventsPerEpoch]
					}
					// spread the epoch's events across the (mock) block time
					for i := 0; i < eventsPerEpoch; i++ {
						if len(eventsThisEpoch) > 0 {
							mockFilter.sendEventToChannel(eventsThisEpoch[0])
							eventsThisEpoch = eventsThisEpoch[1:]
						}
						select {
						case <-time.After(2 * time.Millisecond): // allow everyone to catch a breath
							mockClock.Add(blockDelay / eventsPerEpoch)
						case <-ctx.Done():
							return
						}
					}

					if thisHeight == finishHeight+1 && tc.expectComplete && tc.endEpoch < 0 && ctx.Err() == nil {
						// at finish+1, for the case where we expect clean completion and there is no ToEpoch
						// set on the filter, if we send one more event at the next height so we end up with
						// something uncollected in the buffer, causing a disconnect
						evt := makeCollectedEvents(t, rng, finishHeight+1, 1, finishHeight+1)[0]
						mockFilter.sendEventToChannel(evt)
					} // else if endEpoch is set, we expect the chain advance to force closure
				}
			}()

			// Client collecting events off the channel
			var prematureEnd bool
			for thisHeight := int64(currentHeight); thisHeight <= finishHeight && !prematureEnd && ctx.Err() == nil; thisHeight++ {
				// delay to simulate latency
				select {
				case <-mockClock.After(nextReceiveTime.Sub(mockClock.Now())):
				case <-ctx.Done():
					t.Fatalf("timed out simulating receive delay")
				}

				// collect eventsPerEpoch more events
				newEvents := make([]*types.ActorEvent, 0)
				for len(newEvents) < eventsPerEpoch && !prematureEnd && ctx.Err() == nil {
					select {
					case e, ok := <-eventChan: // receive the events from the subscription
						if ok {
							newEvents = append(newEvents, e)
						} else {
							// handler closed the channel: it decided we were too slow
							prematureEnd = true
						}
					case <-ctx.Done():
						t.Fatalf("timed out waiting for event")
					}
					nextReceiveTime = nextReceiveTime.Add(tc.receiveSpeed)
				}

				if tc.expectComplete || !prematureEnd {
					// sanity check that we got what we expected this epoch
					req.Len(newEvents, eventsPerEpoch)
					epochEvents := allEvents[(thisHeight)*eventsPerEpoch : (thisHeight+1)*eventsPerEpoch]
					req.Equal(collectedToActorEvents(epochEvents), newEvents)
					gotEvents = append(gotEvents, newEvents...)
				}
			}

			req.Equal(tc.expectComplete, !prematureEnd, "expected to complete")
			if tc.expectComplete {
				req.Len(gotEvents, len(allEvents))
				req.Equal(collectedToActorEvents(allEvents), gotEvents)
			} else {
				req.NotEqual(len(gotEvents), len(allEvents))
			}

			// cleanup: the handler must clear the sub channel and uninstall the filter
			mockFilter.waitAssertClearSubChannelCalled(500 * time.Millisecond)
			mockFilterManager.waitAssertRemoved(mockFilter.ID(), 500*time.Millisecond)
		})
	}
}
// TestSubscribeActorEvents_OnlyHistorical checks the historical-only delivery path:
// with ToHeight capped at the current head, the full batch must be received within
// roughly one block's time or the handler disconnects the client; afterwards the
// filter must be removed once the chain advances past ToHeight.
func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) {
	// Similar to TestSubscribeActorEvents but we set an explicit end that caps out at the current height
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// log the seed so a failing run can be reproduced deterministically
	seed := time.Now().UnixNano()
	t.Logf("seed: %d", seed)
	rng := pseudo.New(pseudo.NewSource(seed))
	mockClock := clock.NewMock()

	const maxFilterHeightRange = 100
	const blockDelay = 30 * time.Second
	const filterStartHeight = 0
	const currentHeight = 10
	const eventsPerEpoch = 2

	minerAddr, err := address.NewIDAddress(uint64(rng.Int63()))
	require.NoError(t, err)

	for _, tc := range []struct {
		name                string
		blockTimeToComplete float64 // fraction of a block time that it takes to receive all events
		expectComplete      bool    // do we expect this to succeed?
	}{
		{"fast", 0, true},
		{"half block speed", 0.5, true},
		{"1.5 block speed", 1.5, false},
		{"twice block speed", 2, false},
	} {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			req := require.New(t)

			mockClock.Set(time.Now())
			mockFilterManager := newMockEventFilterManager(t)
			// all events are strictly below currentHeight, i.e. purely historical
			allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, currentHeight-1)
			mockFilter := newMockFilter(ctx, t, rng, allEvents)
			mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(currentHeight), cid.Undef, nil, nil, false, mockFilter)

			ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)})
			req.NoError(err)
			mockChain := newMockChainAccessor(t, ts)

			handler := NewActorEventHandlerWithClock(mockChain, mockFilterManager, blockDelay, maxFilterHeightRange, mockClock)

			aef := &types.ActorEventFilter{FromHeight: epochPtr(0), ToHeight: epochPtr(currentHeight)}
			eventChan, err := handler.SubscribeActorEvents(ctx, aef)
			req.NoError(err)

			gotEvents := make([]*types.ActorEvent, 0)

			// assume we can cleanly pick up all historical events in one go
		receiveLoop:
			for len(gotEvents) < len(allEvents) && ctx.Err() == nil {
				select {
				case e, ok := <-eventChan:
					if tc.expectComplete || ok {
						req.True(ok)
						gotEvents = append(gotEvents, e)
						// advance the mock clock so the whole batch takes blockTimeToComplete
						// fractions of a block time to receive
						mockClock.Add(time.Duration(float64(blockDelay) * tc.blockTimeToComplete / float64(len(allEvents))))
						// no need to advance the chain, we're also testing that's not necessary
						time.Sleep(2 * time.Millisecond) // catch a breath
					} else {
						// channel closed: handler disconnected us for being too slow
						break receiveLoop
					}
				case <-ctx.Done():
					t.Fatalf("timed out waiting for event, got %d/%d events", len(gotEvents), len(allEvents))
				}
			}

			if tc.expectComplete {
				req.Equal(collectedToActorEvents(allEvents), gotEvents)
			} else {
				req.NotEqual(len(gotEvents), len(allEvents))
			}

			// advance the chain and observe cleanup
			ts, err = types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight+1)})
			req.NoError(err)
			mockChain.setHeaviestTipSet(ts)
			mockClock.Add(blockDelay)
			mockFilterManager.waitAssertRemoved(mockFilter.ID(), 1*time.Second)
		})
	}
}
// Compile-time checks that the mocks satisfy the interfaces they stand in for.
var (
	_ ChainAccessor      = (*mockChainAccessor)(nil)
	_ filter.EventFilter = (*mockFilter)(nil)
	_ EventFilterManager = (*mockEventFilterManager)(nil)
)
// mockChainAccessor is a minimal ChainAccessor serving a single, swappable
// heaviest tipset so tests can simulate the chain advancing.
type mockChainAccessor struct {
	t  *testing.T
	ts *types.TipSet // current heaviest tipset; guarded by lk
	lk sync.Mutex
}
// newMockChainAccessor returns a mockChainAccessor whose heaviest tipset is ts.
func newMockChainAccessor(t *testing.T, ts *types.TipSet) *mockChainAccessor {
	ca := mockChainAccessor{t: t, ts: ts}
	return &ca
}
// setHeaviestTipSet swaps in a new heaviest tipset, simulating chain advance.
func (m *mockChainAccessor) setHeaviestTipSet(ts *types.TipSet) {
	m.lk.Lock()
	m.ts = ts
	m.lk.Unlock()
}
// GetHeaviestTipSet returns the tipset most recently set on the accessor.
func (m *mockChainAccessor) GetHeaviestTipSet() *types.TipSet {
	m.lk.Lock()
	ts := m.ts
	m.lk.Unlock()
	return ts
}
// mockFilter implements filter.EventFilter for tests: it serves a canned batch
// of historical events via TakeCollectedEvents and lets the test push real-time
// events straight into the subscription channel via sendEventToChannel.
type mockFilter struct {
	t                    *testing.T
	ctx                  context.Context
	id                   types.FilterID
	lastTaken            time.Time          // when TakeCollectedEvents last ran
	ch                   chan<- interface{} // subscription channel; guarded by lk
	historicalEvents     []*filter.CollectedEvent
	subChannelCalls      int // SetSubChannel call count; guarded by lk
	clearSubChannelCalls int // ClearSubChannel call count; guarded by lk
	lk                   sync.Mutex
}
// newMockFilter builds a mockFilter with a random 32-byte ID and the given
// canned historical events.
func newMockFilter(ctx context.Context, t *testing.T, rng *pseudo.Rand, historicalEvents []*filter.CollectedEvent) *mockFilter {
	t.Helper()
	idBytes := make([]byte, 32)
	_, err := rng.Read(idBytes)
	require.NoError(t, err)
	f := &mockFilter{
		t:                t,
		ctx:              ctx,
		id:               types.FilterID(idBytes),
		historicalEvents: historicalEvents,
	}
	return f
}
// sendEventToChannel pushes e into the subscription channel if one is set,
// giving up when the test context is cancelled first.
func (m *mockFilter) sendEventToChannel(e *filter.CollectedEvent) {
	m.lk.Lock()
	defer m.lk.Unlock()
	if m.ch == nil {
		return
	}
	select {
	case <-m.ctx.Done():
	case m.ch <- e:
	}
}
// waitAssertClearSubChannelCalled polls until ClearSubChannel has been called
// exactly once, failing the test if it is called more than once or not at all
// within timeout.
func (m *mockFilter) waitAssertClearSubChannelCalled(timeout time.Duration) {
	m.t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		m.lk.Lock()
		calls := m.clearSubChannelCalls
		m.lk.Unlock()
		if calls == 1 {
			return
		}
		if calls > 1 {
			m.t.Fatalf("ClearSubChannel called more than once")
		}
		time.Sleep(10 * time.Millisecond)
	}
	m.t.Fatalf("ClearSubChannel not called")
}
// ID returns the filter's random identifier.
func (m *mockFilter) ID() types.FilterID {
	id := m.id
	return id
}
// LastTaken reports when TakeCollectedEvents last ran.
//
// Read under the mutex: TakeCollectedEvents runs on the handler's goroutine and
// writes lastTaken, so an unguarded read here is a data race under -race.
func (m *mockFilter) LastTaken() time.Time {
	m.lk.Lock()
	defer m.lk.Unlock()
	return m.lastTaken
}
// SetSubChannel installs the subscription channel and counts the call.
func (m *mockFilter) SetSubChannel(ch chan<- interface{}) {
	m.t.Helper()
	m.lk.Lock()
	m.subChannelCalls++
	m.ch = ch
	m.lk.Unlock()
}
// ClearSubChannel drops the subscription channel and counts the call.
func (m *mockFilter) ClearSubChannel() {
	m.t.Helper()
	m.lk.Lock()
	m.clearSubChannelCalls++
	m.ch = nil
	m.lk.Unlock()
}
// TakeCollectedEvents hands over the canned historical events exactly once;
// subsequent calls return nil.
//
// Guarded by the mutex: this runs on the handler's goroutine while the test
// goroutine may touch the filter concurrently, and every other mockFilter
// method that mutates state already takes the lock. Without it, the writes to
// historicalEvents and lastTaken are data races under -race.
func (m *mockFilter) TakeCollectedEvents(ctx context.Context) []*filter.CollectedEvent {
	m.lk.Lock()
	defer m.lk.Unlock()
	e := m.historicalEvents
	m.historicalEvents = nil
	m.lastTaken = time.Now()
	return e
}
// CollectEvents is part of filter.EventFilter but is never expected to be
// exercised by these tests; any call is a test failure.
func (m *mockFilter) CollectEvents(ctx context.Context, tse *filter.TipSetEvents, reorg bool, ar filter.AddressResolver) error {
	m.t.Fatalf("unexpected call to CollectEvents")
	return nil
}
// filterManagerExpectation records one expected Install call and the filter to
// hand back when a matching call arrives.
type filterManagerExpectation struct {
	minHeight, maxHeight abi.ChainEpoch
	tipsetCid            cid.Cid
	addresses            []address.Address
	keysWithCodec        map[string][]types.ActorEventBlock
	excludeReverted      bool
	returnFilter         filter.EventFilter // returned from the matching Install call
}
// mockEventFilterManager is an EventFilterManager that verifies Install calls
// against pre-registered expectations and records Remove calls for assertion.
type mockEventFilterManager struct {
	t            *testing.T
	expectations []filterManagerExpectation // FIFO queue of expected Install calls
	removed      []types.FilterID           // IDs passed to Remove; guarded by lk
	lk           sync.Mutex
}
// newMockEventFilterManager returns an empty mock manager with no expectations.
func newMockEventFilterManager(t *testing.T) *mockEventFilterManager {
	m := mockEventFilterManager{t: t}
	return &m
}
// expectInstall queues an expectation that Install will be called with exactly
// these arguments, and registers the filter to return from that call.
//
// Appends under the mutex: Install pops from the expectations queue and may run
// on the handler's goroutine, so an unguarded append here would be a data race.
func (m *mockEventFilterManager) expectInstall(
	minHeight, maxHeight abi.ChainEpoch,
	tipsetCid cid.Cid,
	addresses []address.Address,
	keysWithCodec map[string][]types.ActorEventBlock,
	excludeReverted bool,
	returnFilter filter.EventFilter) {

	m.t.Helper()
	m.lk.Lock()
	defer m.lk.Unlock()
	m.expectations = append(m.expectations, filterManagerExpectation{
		minHeight:       minHeight,
		maxHeight:       maxHeight,
		tipsetCid:       tipsetCid,
		addresses:       addresses,
		keysWithCodec:   keysWithCodec,
		excludeReverted: excludeReverted,
		returnFilter:    returnFilter,
	})
}
// assertRemoved requires that Remove has already been called with id.
func (m *mockEventFilterManager) assertRemoved(id types.FilterID) {
	m.t.Helper()
	m.lk.Lock()
	removed := m.removed
	m.lk.Unlock()
	require.Contains(m.t, removed, id)
}
// waitAssertRemoved polls until at least one filter has been removed, then
// requires that id is among the removed set; it fails the test if nothing is
// removed within timeout. (Restructured to avoid defer-in-loop and to release
// the lock before the require call, which can Goexit.)
func (m *mockEventFilterManager) waitAssertRemoved(id types.FilterID, timeout time.Duration) {
	m.t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		m.lk.Lock()
		removed := append([]types.FilterID{}, m.removed...)
		m.lk.Unlock()
		if len(removed) > 0 {
			require.Contains(m.t, removed, id)
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	m.t.Fatalf("filter %x not removed", id)
}
// Install pops the next queued expectation, verifies the call matches it, and
// returns the expectation's attached filter.
//
// The expectations queue is read and popped under the mutex: Install runs on
// the handler's goroutine while the test goroutine registers expectations, so
// the previously unguarded access was a data race. The lock is held across the
// require calls via defer so it is released even if a require Goexits.
func (m *mockEventFilterManager) Install(
	ctx context.Context,
	minHeight, maxHeight abi.ChainEpoch,
	tipsetCid cid.Cid,
	addresses []address.Address,
	keysWithCodec map[string][]types.ActorEventBlock,
	excludeReverted bool,
) (filter.EventFilter, error) {

	m.lk.Lock()
	defer m.lk.Unlock()

	require.True(m.t, len(m.expectations) > 0, "unexpected call to Install")
	exp := m.expectations[0]
	m.expectations = m.expectations[1:]

	// check the expectation matches the call then return the attached filter
	require.Equal(m.t, exp.minHeight, minHeight)
	require.Equal(m.t, exp.maxHeight, maxHeight)
	require.Equal(m.t, exp.tipsetCid, tipsetCid)
	require.Equal(m.t, exp.addresses, addresses)
	require.Equal(m.t, exp.keysWithCodec, keysWithCodec)
	require.Equal(m.t, exp.excludeReverted, excludeReverted)

	return exp.returnFilter, nil
}
// Remove records that id was uninstalled; it never fails.
func (m *mockEventFilterManager) Remove(ctx context.Context, id types.FilterID) error {
	m.lk.Lock()
	m.removed = append(m.removed, id)
	m.lk.Unlock()
	return nil
}
// newBlockHeader fabricates a minimally-valid block header at the given height,
// mined by minerAddr; proofs, signatures and CIDs are static placeholders.
func newBlockHeader(minerAddr address.Address, height int64) *types.BlockHeader {
	hdr := &types.BlockHeader{}
	hdr.Miner = minerAddr
	hdr.Ticket = &types.Ticket{
		VRFProof: []byte("vrf proof0000000vrf proof0000000"),
	}
	hdr.ElectionProof = &types.ElectionProof{
		VRFProof: []byte("vrf proof0000000vrf proof0000000"),
	}
	hdr.Parents = []cid.Cid{testCid, testCid}
	hdr.ParentMessageReceipts = testCid
	hdr.BLSAggregate = &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("sign me up")}
	hdr.ParentWeight = types.NewInt(123125126212)
	hdr.Messages = testCid
	hdr.Height = abi.ChainEpoch(height)
	hdr.ParentStateRoot = testCid
	hdr.BlockSig = &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("sign me up")}
	hdr.ParentBaseFee = types.NewInt(3432432843291)
	return hdr
}
// epochPtr returns a pointer to i converted to an abi.ChainEpoch.
func epochPtr(i int) *abi.ChainEpoch {
	epoch := abi.ChainEpoch(i)
	return &epoch
}
// collectedToActorEvents converts filter-level collected events into the
// API-level ActorEvent form, preserving order.
//
// Returns nil for empty input (matching the previous behavior, which several
// require.Equal assertions depend on); otherwise the output slice is
// pre-sized to avoid repeated growth copies.
func collectedToActorEvents(collected []*filter.CollectedEvent) []*types.ActorEvent {
	if len(collected) == 0 {
		return nil
	}
	out := make([]*types.ActorEvent, 0, len(collected))
	for _, c := range collected {
		out = append(out, &types.ActorEvent{
			Entries:   c.Entries,
			Emitter:   c.EmitterAddr,
			Reverted:  c.Reverted,
			Height:    c.Height,
			TipSetKey: c.TipSetKey,
			MsgCid:    c.MsgCid,
		})
	}
	return out
}
// makeCollectedEvents fabricates eventsPerHeight random events for every epoch
// in [eventStartHeight, eventEndHeight], inclusive, in height order.
//
// The output slice is pre-sized when the bounds are productive; an empty range
// still yields nil, matching the previous behavior.
func makeCollectedEvents(t *testing.T, rng *pseudo.Rand, eventStartHeight, eventsPerHeight, eventEndHeight int64) []*filter.CollectedEvent {
	var out []*filter.CollectedEvent
	if eventEndHeight >= eventStartHeight && eventsPerHeight > 0 {
		out = make([]*filter.CollectedEvent, 0, (eventEndHeight-eventStartHeight+1)*eventsPerHeight)
	}
	for h := eventStartHeight; h <= eventEndHeight; h++ {
		for i := int64(0); i < eventsPerHeight; i++ {
			out = append(out, makeCollectedEvent(t, rng, types.NewTipSetKey(mkCid(t, fmt.Sprintf("h=%d", h))), abi.ChainEpoch(h)))
		}
	}
	return out
}
// makeCollectedEvent fabricates a single two-entry CollectedEvent at the given
// height and tipset key, emitted by a fresh random ID address.
func makeCollectedEvent(t *testing.T, rng *pseudo.Rand, tsKey types.TipSetKey, height abi.ChainEpoch) *filter.CollectedEvent {
	addr, err := address.NewIDAddress(uint64(rng.Int63()))
	require.NoError(t, err)

	ev := filter.CollectedEvent{
		EmitterAddr: addr,
		EventIdx:    0,
		Reverted:    false,
		Height:      height,
		TipSetKey:   tsKey,
		MsgIdx:      0,
		MsgCid:      testCid,
	}
	ev.Entries = []types.EventEntry{
		{Flags: 0x01, Key: "k1", Codec: cid.Raw, Value: []byte("v1")},
		{Flags: 0x01, Key: "k2", Codec: cid.Raw, Value: []byte("v2")},
	}
	return &ev
}
// mkCid derives a deterministic v1 raw-codec CID from s via SHA2-256.
func mkCid(t *testing.T, s string) cid.Cid {
	digest, err := multihash.Sum([]byte(s), multihash.SHA2_256, -1)
	require.NoError(t, err)
	return cid.NewCidV1(cid.Raw, digest)
}

View File

@ -6,14 +6,14 @@ import (
"time"
"github.com/ipfs/go-cid"
"github.com/raulk/clock"
"go.uber.org/fx"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -27,21 +27,70 @@ var (
_ ActorEventAPI = *new(api.Gateway)
)
type ActorEventHandler struct {
EventFilterManager *filter.EventFilterManager
MaxFilterHeightRange abi.ChainEpoch
Chain *store.ChainStore
type ChainAccessor interface {
GetHeaviestTipSet() *types.TipSet
}
var _ ActorEventAPI = (*ActorEventHandler)(nil)
type EventFilterManager interface {
Install(
ctx context.Context,
minHeight, maxHeight abi.ChainEpoch,
tipsetCid cid.Cid,
addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock,
excludeReverted bool,
) (filter.EventFilter, error)
Remove(ctx context.Context, id types.FilterID) error
}
type ActorEventsAPI struct {
fx.In
ActorEventAPI
}
type ActorEventHandler struct {
chain ChainAccessor
eventFilterManager EventFilterManager
blockDelay time.Duration
maxFilterHeightRange abi.ChainEpoch
clock clock.Clock
}
var _ ActorEventAPI = (*ActorEventHandler)(nil)
func NewActorEventHandler(
chain ChainAccessor,
eventFilterManager EventFilterManager,
blockDelay time.Duration,
maxFilterHeightRange abi.ChainEpoch,
) *ActorEventHandler {
return &ActorEventHandler{
chain: chain,
eventFilterManager: eventFilterManager,
blockDelay: blockDelay,
maxFilterHeightRange: maxFilterHeightRange,
clock: clock.New(),
}
}
func NewActorEventHandlerWithClock(
chain ChainAccessor,
eventFilterManager EventFilterManager,
blockDelay time.Duration,
maxFilterHeightRange abi.ChainEpoch,
clock clock.Clock,
) *ActorEventHandler {
return &ActorEventHandler{
chain: chain,
eventFilterManager: eventFilterManager,
blockDelay: blockDelay,
maxFilterHeightRange: maxFilterHeightRange,
clock: clock,
}
}
func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
if a.EventFilterManager == nil {
if a.eventFilterManager == nil {
return nil, api.ErrNotSupported
}
@ -59,13 +108,13 @@ func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types
if err != nil {
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
}
f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
f, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
if err != nil {
return nil, err
}
evs, _, _ := getCollected(ctx, f)
if err := a.EventFilterManager.Remove(ctx, f.ID()); err != nil {
evs := getCollected(ctx, f)
if err := a.eventFilterManager.Remove(ctx, f.ID()); err != nil {
log.Warnf("failed to remove filter: %s", err)
}
return evs, nil
@ -90,18 +139,14 @@ func (a *ActorEventHandler) parseFilter(f types.ActorEventFilter) (*filterParams
return nil, fmt.Errorf("cannot specify both TipSetKey and FromHeight/ToHeight")
}
tsk := types.EmptyTSK
if f.TipSetKey != nil {
tsk = *f.TipSetKey
}
return &filterParams{
MinHeight: 0,
MaxHeight: 0,
TipSetKey: tsk,
TipSetKey: *f.TipSetKey,
}, nil
}
min, max, err := parseHeightRange(a.Chain.GetHeaviestTipSet().Height(), f.FromHeight, f.ToHeight, a.MaxFilterHeightRange)
min, max, err := parseHeightRange(a.chain.GetHeaviestTipSet().Height(), f.FromHeight, f.ToHeight, a.maxFilterHeightRange)
if err != nil {
return nil, err
}
@ -156,9 +201,10 @@ func parseHeightRange(heaviest abi.ChainEpoch, fromHeight, toHeight *abi.ChainEp
}
func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
if a.EventFilterManager == nil {
if a.eventFilterManager == nil {
return nil, api.ErrNotSupported
}
if evtFilter == nil {
evtFilter = &types.ActorEventFilter{}
}
@ -171,22 +217,18 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
if err != nil {
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
}
fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
fm, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
if err != nil {
return nil, err
}
// The goal for the code below is to be able to send events on the `out` channel as fast as
// possible and not let it get too far behind the rate at which the events are generated.
// For historical events we see the rate at which they were generated by looking the height range;
// we then make sure that the client can receive them at least twice as fast as they were
// generated so they catch up quick enough to receive new events.
// For ongoing events we use an exponential moving average of the events per height to make sure
// that the client doesn't fall behind.
// In both cases we allow a little bit of slack but need to avoid letting the client bloat the
// buffer too much.
// There is no special handling for reverts, so they will just look like a lot more events per
// epoch and the user has to receive them anyway.
// The goal for the code below is to send events on the `out` channel as fast as possible and not
// let it get too far behind the rate at which the events are generated.
// For historical events, we aim to send all events within a single block's time (30s on mainnet).
// This ensures that the client can catch up quickly enough to start receiving new events.
// For ongoing events, we also aim to send all events within a single block's time, so we never
// want to be buffering events (approximately) more than one epoch behind the current head.
// It's approximate because we only update our notion of "current epoch" once per ~blocktime.
out := make(chan *types.ActorEvent)
@ -195,54 +237,41 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
// tell the caller we're done
close(out)
fm.ClearSubChannel()
if err := a.EventFilterManager.Remove(ctx, fm.ID()); err != nil {
if err := a.eventFilterManager.Remove(ctx, fm.ID()); err != nil {
log.Warnf("failed to remove filter: %s", err)
}
}()
// When we start sending real-time events, we want to make sure that we don't fall behind more
// than one epoch's worth of events (approximately). Capture this value now, before we send
// historical events to allow for a little bit of slack in the historical event sending.
minBacklogHeight := a.chain.GetHeaviestTipSet().Height() - 1
// Handle any historical events that our filter may have picked up -----------------------------
evs, minEpoch, maxEpoch := getCollected(ctx, fm)
evs := getCollected(ctx, fm)
if len(evs) > 0 {
// must be able to send events at least twice as fast as they were generated
epochRange := maxEpoch - minEpoch
if epochRange <= 0 {
epochRange = 1
}
eventsPerEpoch := float64(len(evs)) / float64(epochRange)
eventsPerSecond := 2 * eventsPerEpoch / float64(build.BlockDelaySecs)
// a minimum rate of 1 event per second if we don't have many events
if eventsPerSecond < 1 {
eventsPerSecond = 1
}
// send events from evs to the out channel and ensure we don't do it slower than eventsPerMs
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
const maxSlowTicks = 3 // slightly forgiving, allow 3 slow ticks (seconds) before giving up
slowTicks := 0
sentEvents := 0.0
// ensure we get all events out on the channel within one block's time (30s on mainnet)
timer := a.clock.Timer(a.blockDelay)
for _, ev := range evs {
select {
case out <- ev:
sentEvents++
case <-ticker.C:
if sentEvents < eventsPerSecond {
slowTicks++
if slowTicks >= maxSlowTicks {
log.Errorf("closing event subscription due to slow event sending rate")
return
}
} else {
slowTicks = 0
}
sentEvents = 0
case <-timer.C:
log.Errorf("closing event subscription due to slow event sending rate")
timer.Stop()
return
case <-ctx.Done():
timer.Stop()
return
}
}
timer.Stop()
}
// for the case where we have a MaxHeight set, we don't get a signal from the filter when we
// reach that height, so we need to check it ourselves, do it now but also in the loop
if params.MaxHeight > 0 && a.chain.GetHeaviestTipSet().Height() > params.MaxHeight {
return
}
// Handle ongoing events from the filter -------------------------------------------------------
@ -251,10 +280,7 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
fm.SetSubChannel(in)
var buffer []*types.ActorEvent
const α = 0.2 // decay factor for the events per height EMA
var eventsPerHeightEma float64 = 256 // exponential moving average of events per height, initially guess at 256
var lastHeight abi.ChainEpoch // last seen event height
var eventsAtCurrentHeight int // number of events at the current height
nextBacklogHeightUpdate := a.clock.Now().Add(a.blockDelay)
collectEvent := func(ev interface{}) bool {
ce, ok := ev.(*filter.CollectedEvent)
@ -263,15 +289,12 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
return false
}
if ce.Height > lastHeight {
// update the EMA of events per height when the height increases
if lastHeight != 0 {
eventsPerHeightEma = α*float64(eventsAtCurrentHeight) + (1-α)*eventsPerHeightEma
}
lastHeight = ce.Height
eventsAtCurrentHeight = 0
if ce.Height < minBacklogHeight {
// since we mostly care about buffer size, we only trigger a too-slow close when the buffer
// increases, i.e. we collect a new event
log.Errorf("closing event subscription due to slow event sending rate")
return false
}
eventsAtCurrentHeight++
buffer = append(buffer, &types.ActorEvent{
Entries: ce.Entries,
@ -284,23 +307,11 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
return true
}
// for the case where we have a MaxHeight set, we don't get a signal from the filter when we
// reach that height, so we need to check it ourselves, do it now but also in the loop
if params.MaxHeight > 0 && a.Chain.GetHeaviestTipSet().Height() > params.MaxHeight {
return
}
ticker := time.NewTicker(time.Duration(build.BlockDelaySecs) * time.Second)
ticker := a.clock.Ticker(a.blockDelay)
defer ticker.Stop()
for ctx.Err() == nil {
if len(buffer) > 0 {
// check if we need to disconnect the client because they've fallen behind, always allow at
// least 8 events in the buffer to provide a little bit of slack
if len(buffer) > 8 && float64(len(buffer)) > eventsPerHeightEma/2 {
log.Errorf("closing event subscription due to slow event sending rate")
return
}
select {
case ev, ok := <-in: // incoming event
if !ok || !collectEvent(ev) {
@ -309,6 +320,12 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
case out <- buffer[0]: // successful send
buffer[0] = nil
buffer = buffer[1:]
case <-ticker.C:
// check that our backlog isn't too big by looking at the oldest event
if buffer[0].Height < minBacklogHeight {
log.Errorf("closing event subscription due to slow event sending rate")
return
}
case <-ctx.Done():
return
}
@ -321,30 +338,30 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
case <-ctx.Done():
return
case <-ticker.C:
if params.MaxHeight > 0 && a.Chain.GetHeaviestTipSet().Height() > params.MaxHeight {
currentHeight := a.chain.GetHeaviestTipSet().Height()
if params.MaxHeight > 0 && currentHeight > params.MaxHeight {
// we've reached the filter's MaxHeight, we're done so we can close the channel
return
}
}
}
if a.clock.Now().After(nextBacklogHeightUpdate) {
minBacklogHeight = a.chain.GetHeaviestTipSet().Height() - 1
nextBacklogHeightUpdate = a.clock.Now().Add(a.blockDelay)
}
}
}()
return out, nil
}
func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEvent, abi.ChainEpoch, abi.ChainEpoch) {
func getCollected(ctx context.Context, f filter.EventFilter) []*types.ActorEvent {
ces := f.TakeCollectedEvents(ctx)
var out []*types.ActorEvent
var min, max abi.ChainEpoch
for _, e := range ces {
if min == 0 || e.Height < min {
min = e.Height
}
if e.Height > max {
max = e.Height
}
out = append(out, &types.ActorEvent{
Entries: e.Entries,
Emitter: e.EmitterAddr,
@ -355,5 +372,5 @@ func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEve
})
}
return out, min, max
return out
}

View File

@ -1322,7 +1322,7 @@ func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRan
return minHeight, maxHeight, nil
}
func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*filter.EventFilter, error) {
func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (filter.EventFilter, error) {
var (
minHeight abi.ChainEpoch
maxHeight abi.ChainEpoch
@ -1465,7 +1465,7 @@ func (e *EthEventHandler) EthUninstallFilter(ctx context.Context, id ethtypes.Et
func (e *EthEventHandler) uninstallFilter(ctx context.Context, f filter.Filter) error {
switch f.(type) {
case *filter.EventFilter:
case filter.EventFilter:
err := e.EventFilterManager.Remove(ctx, f.ID())
if err != nil && !errors.Is(err, filter.ErrFilterNotFound) {
return err

View File

@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/messagepool"
@ -163,18 +164,16 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc
func ActorEventHandler(enable bool, fevmCfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEventHandler, error) {
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEventHandler, error) {
ee := &full.ActorEventHandler{
MaxFilterHeightRange: abi.ChainEpoch(fevmCfg.Events.MaxFilterHeightRange),
Chain: cs,
}
if !enable || fevmCfg.Events.DisableRealTimeFilterAPI {
// all Actor events functionality is disabled
return ee, nil
fm = nil
}
ee.EventFilterManager = fm
return ee, nil
return full.NewActorEventHandler(
cs,
fm,
time.Duration(build.BlockDelaySecs)*time.Second,
abi.ChainEpoch(fevmCfg.Events.MaxFilterHeightRange),
), nil
}
}