diff --git a/api/api_full.go b/api/api_full.go index c345bb12a..aa7f57e67 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -909,14 +909,17 @@ type FullNode interface { // Actor events - // GetActorEvents returns all FVM and built-in Actor events that match the given filter. + // GetActorEvents returns all user-programmed and built-in actor events that match the given + // filter. // This is a request/response API. // Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange // configuration options and also the amount of historical data available in the node. + // + // This is an EXPERIMENTAL API and may be subject to change. GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) //perm:read - // SubscribeActorEvents returns a long-lived stream of all FVM and built-in Actor events that - // match the given filter. + // SubscribeActorEvents returns a long-lived stream of all user-programmed and built-in actor + // events that match the given filter. // Events that match the given filter are written to the stream in real-time as they are emitted // from the FVM. // The response stream is closed when the client disconnects, when a ToHeight is specified and is @@ -926,7 +929,9 @@ type FullNode interface { // FromHeight. // Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange // configuration options and also the amount of historical data available in the node. - // NOTE: THIS API IS ONLY SUPPORTED OVER WEBSOCKETS FOR NOW + // + // Note: this API is only available via websocket connections. + // This is an EXPERIMENTAL API and may be subject to change. SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read } diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 97cd6f3d6..deaef44c0 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index 449b6ae18..1669d840e 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -41,7 +41,7 @@ type eventFilter struct { minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum tipsetCid cid.Cid - addresses []address.Address // list of f4 actor addresses that are extpected to emit the event + addresses []address.Address // list of actor addresses that are extpected to emit the event keysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match maxResults int // maximum number of results to collect, 0 is unlimited @@ -56,7 +56,7 @@ var _ Filter = (*eventFilter)(nil) type CollectedEvent struct { Entries []types.EventEntry - EmitterAddr address.Address // f4 address of emitter + EmitterAddr address.Address // address of emitter EventIdx int // index of the event within the list of emitted events Reverted bool Height abi.ChainEpoch diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 5aa9506cb..4c1dbdee4 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -3390,11 +3390,14 @@ Response: ### GetActorEvents -GetActorEvents returns all FVM and built-in Actor events that match the given filter. +GetActorEvents returns all user-programmed and built-in actor events that match the given +filter. This is a request/response API. Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange configuration options and also the amount of historical data available in the node. +This is an EXPERIMENTAL API and may be subject to change. + Perms: read @@ -8829,8 +8832,8 @@ Response: ### SubscribeActorEvents -SubscribeActorEvents returns a long-lived stream of all FVM and built-in Actor events that -match the given filter. +SubscribeActorEvents returns a long-lived stream of all user-programmed and built-in actor +events that match the given filter. Events that match the given filter are written to the stream in real-time as they are emitted from the FVM. The response stream is closed when the client disconnects, when a ToHeight is specified and is @@ -8840,7 +8843,9 @@ real-time events are written to the response stream if the filter specifies an e FromHeight. Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange configuration options and also the amount of historical data available in the node. -NOTE: THIS API IS ONLY SUPPORTED OVER WEBSOCKETS FOR NOW + +Note: this API is only available via websocket connections. +This is an EXPERIMENTAL API and may be subject to change. Perms: read diff --git a/node/impl/full/actor_events.go b/node/impl/full/actor_events.go index 8adfda782..fecd1d2b6 100644 --- a/node/impl/full/actor_events.go +++ b/node/impl/full/actor_events.go @@ -232,6 +232,11 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter out := make(chan *types.ActorEvent) + // When we start sending real-time events, we want to make sure that we don't fall behind more + // than one epoch's worth of events (approximately). Capture this value now, before we send + // historical events to allow for a little bit of slack in the historical event sending. + minBacklogHeight := a.chain.GetHeaviestTipSet().Height() - 1 + go func() { defer func() { // tell the caller we're done @@ -242,11 +247,6 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter } }() - // When we start sending real-time events, we want to make sure that we don't fall behind more - // than one epoch's worth of events (approximately). Capture this value now, before we send - // historical events to allow for a little bit of slack in the historical event sending. - minBacklogHeight := a.chain.GetHeaviestTipSet().Height() - 1 - // Handle any historical events that our filter may have picked up ----------------------------- evs := getCollected(ctx, fm) @@ -270,7 +270,7 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter // for the case where we have a MaxHeight set, we don't get a signal from the filter when we // reach that height, so we need to check it ourselves, do it now but also in the loop - if params.MaxHeight > 0 && a.chain.GetHeaviestTipSet().Height() > params.MaxHeight { + if params.MaxHeight > 0 && minBacklogHeight+1 >= params.MaxHeight { return } diff --git a/node/impl/full/actor_events_test.go b/node/impl/full/actor_events_test.go index 2bd4f62cf..ab446e57b 100644 --- a/node/impl/full/actor_events_test.go +++ b/node/impl/full/actor_events_test.go @@ -246,9 +246,6 @@ func TestGetActorEvents(t *testing.T) { } func TestSubscribeActorEvents(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - const ( seed = 984651320 maxFilterHeightRange = 100 @@ -279,14 +276,13 @@ func TestSubscribeActorEvents(t *testing.T) { {"1.5 block speed", blockDelay * 3 / 2, false, -1}, {"twice block speed", blockDelay * 2, false, -1}, } { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() tc := tc t.Run(tc.name, func(t *testing.T) { req := require.New(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - mockClock.Set(time.Now()) mockFilterManager := newMockEventFilterManager(t) allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, finishHeight) @@ -326,7 +322,10 @@ func TestSubscribeActorEvents(t *testing.T) { // Ticker to simulate both time and the chain advancing, including emitting events at // the right time directly to the filter. + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() for thisHeight := int64(currentHeight); ctx.Err() == nil; thisHeight++ { ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, thisHeight)}) req.NoError(err) @@ -334,9 +333,9 @@ func TestSubscribeActorEvents(t *testing.T) { var eventsThisEpoch []*filter.CollectedEvent if thisHeight <= finishHeight { - eventsThisEpoch = allEvents[(thisHeight-filterStartHeight)*eventsPerEpoch : (thisHeight-filterStartHeight+1)*eventsPerEpoch] + eventsThisEpoch = allEvents[(thisHeight-filterStartHeight)*eventsPerEpoch : (thisHeight-filterStartHeight+2)*eventsPerEpoch] } - for i := 0; i < eventsPerEpoch; i++ { + for i := 0; i < eventsPerEpoch && ctx.Err() == nil; i++ { if len(eventsThisEpoch) > 0 { mockFilter.sendEventToChannel(eventsThisEpoch[0]) eventsThisEpoch = eventsThisEpoch[1:] @@ -406,14 +405,14 @@ func TestSubscribeActorEvents(t *testing.T) { // cleanup mockFilter.requireClearSubChannelCalledEventually(500 * time.Millisecond) mockFilterManager.requireRemovedEventually(mockFilter.ID(), 500*time.Millisecond) + cancel() + wg.Wait() // wait for the chain to stop advancing }) } } func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) { // Similar to TestSubscribeActorEvents but we set an explicit end that caps out at the current height - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() const ( seed = 984651320 maxFilterHeightRange = 100 @@ -439,6 +438,8 @@ func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) { {"1.5 block speed", 1.5, false}, {"twice block speed", 2, false}, } { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() tc := tc t.Run(tc.name, func(t *testing.T) { @@ -446,7 +447,7 @@ func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) { mockClock.Set(time.Now()) mockFilterManager := newMockEventFilterManager(t) - allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, currentHeight-1) + allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, currentHeight) mockFilter := newMockFilter(ctx, t, rng, allEvents) mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(currentHeight), cid.Undef, nil, nil, false, mockFilter) @@ -464,11 +465,10 @@ func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) { // assume we can cleanly pick up all historical events in one go receiveLoop: - for len(gotEvents) < len(allEvents) && ctx.Err() == nil { + for ctx.Err() == nil { select { case e, ok := <-eventChan: - if tc.expectComplete || ok { - req.True(ok) + if ok { gotEvents = append(gotEvents, e) mockClock.Add(time.Duration(float64(blockDelay) * tc.blockTimeToComplete / float64(len(allEvents)))) // no need to advance the chain, we're also testing that's not necessary