Fix concerns and docs identified by review

This commit is contained in:
Rod Vagg 2024-03-05 15:49:04 +11:00 committed by Phi-rjan
parent 61dd381a3d
commit a0483a4950
6 changed files with 40 additions and 30 deletions

View File

@ -909,14 +909,17 @@ type FullNode interface {
// Actor events
// GetActorEvents returns all FVM and built-in Actor events that match the given filter.
// GetActorEvents returns all user-programmed and built-in actor events that match the given
// filter.
// This is a request/response API.
// Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange
// configuration options and also the amount of historical data available in the node.
//
// This is an EXPERIMENTAL API and may be subject to change.
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) //perm:read
// SubscribeActorEvents returns a long-lived stream of all FVM and built-in Actor events that
// match the given filter.
// SubscribeActorEvents returns a long-lived stream of all user-programmed and built-in actor
// events that match the given filter.
// Events that match the given filter are written to the stream in real-time as they are emitted
// from the FVM.
// The response stream is closed when the client disconnects, when a ToHeight is specified and is
@ -926,7 +929,9 @@ type FullNode interface {
// FromHeight.
// Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange
// configuration options and also the amount of historical data available in the node.
// NOTE: THIS API IS ONLY SUPPORTED OVER WEBSOCKETS FOR NOW
//
// Note: this API is only available via websocket connections.
// This is an EXPERIMENTAL API and may be subject to change.
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read
}

Binary file not shown.

View File

@ -41,7 +41,7 @@ type eventFilter struct {
minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum
maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum
tipsetCid cid.Cid
addresses []address.Address // list of f4 actor addresses that are extpected to emit the event
addresses []address.Address // list of actor addresses that are extpected to emit the event
keysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match
maxResults int // maximum number of results to collect, 0 is unlimited
@ -56,7 +56,7 @@ var _ Filter = (*eventFilter)(nil)
type CollectedEvent struct {
Entries []types.EventEntry
EmitterAddr address.Address // f4 address of emitter
EmitterAddr address.Address // address of emitter
EventIdx int // index of the event within the list of emitted events
Reverted bool
Height abi.ChainEpoch

View File

@ -3390,11 +3390,14 @@ Response:
### GetActorEvents
GetActorEvents returns all FVM and built-in Actor events that match the given filter.
GetActorEvents returns all user-programmed and built-in actor events that match the given
filter.
This is a request/response API.
Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange
configuration options and also the amount of historical data available in the node.
This is an EXPERIMENTAL API and may be subject to change.
Perms: read
@ -8829,8 +8832,8 @@ Response:
### SubscribeActorEvents
SubscribeActorEvents returns a long-lived stream of all FVM and built-in Actor events that
match the given filter.
SubscribeActorEvents returns a long-lived stream of all user-programmed and built-in actor
events that match the given filter.
Events that match the given filter are written to the stream in real-time as they are emitted
from the FVM.
The response stream is closed when the client disconnects, when a ToHeight is specified and is
@ -8840,7 +8843,9 @@ real-time events are written to the response stream if the filter specifies an e
FromHeight.
Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange
configuration options and also the amount of historical data available in the node.
NOTE: THIS API IS ONLY SUPPORTED OVER WEBSOCKETS FOR NOW
Note: this API is only available via websocket connections.
This is an EXPERIMENTAL API and may be subject to change.
Perms: read

View File

@ -232,6 +232,11 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
out := make(chan *types.ActorEvent)
// When we start sending real-time events, we want to make sure that we don't fall behind more
// than one epoch's worth of events (approximately). Capture this value now, before we send
// historical events to allow for a little bit of slack in the historical event sending.
minBacklogHeight := a.chain.GetHeaviestTipSet().Height() - 1
go func() {
defer func() {
// tell the caller we're done
@ -242,11 +247,6 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
}
}()
// When we start sending real-time events, we want to make sure that we don't fall behind more
// than one epoch's worth of events (approximately). Capture this value now, before we send
// historical events to allow for a little bit of slack in the historical event sending.
minBacklogHeight := a.chain.GetHeaviestTipSet().Height() - 1
// Handle any historical events that our filter may have picked up -----------------------------
evs := getCollected(ctx, fm)
@ -270,7 +270,7 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
// for the case where we have a MaxHeight set, we don't get a signal from the filter when we
// reach that height, so we need to check it ourselves, do it now but also in the loop
if params.MaxHeight > 0 && a.chain.GetHeaviestTipSet().Height() > params.MaxHeight {
if params.MaxHeight > 0 && minBacklogHeight+1 >= params.MaxHeight {
return
}

View File

@ -246,9 +246,6 @@ func TestGetActorEvents(t *testing.T) {
}
func TestSubscribeActorEvents(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
const (
seed = 984651320
maxFilterHeightRange = 100
@ -279,14 +276,13 @@ func TestSubscribeActorEvents(t *testing.T) {
{"1.5 block speed", blockDelay * 3 / 2, false, -1},
{"twice block speed", blockDelay * 2, false, -1},
} {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
tc := tc
t.Run(tc.name, func(t *testing.T) {
req := require.New(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mockClock.Set(time.Now())
mockFilterManager := newMockEventFilterManager(t)
allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, finishHeight)
@ -326,7 +322,10 @@ func TestSubscribeActorEvents(t *testing.T) {
// Ticker to simulate both time and the chain advancing, including emitting events at
// the right time directly to the filter.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for thisHeight := int64(currentHeight); ctx.Err() == nil; thisHeight++ {
ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, thisHeight)})
req.NoError(err)
@ -334,9 +333,9 @@ func TestSubscribeActorEvents(t *testing.T) {
var eventsThisEpoch []*filter.CollectedEvent
if thisHeight <= finishHeight {
eventsThisEpoch = allEvents[(thisHeight-filterStartHeight)*eventsPerEpoch : (thisHeight-filterStartHeight+1)*eventsPerEpoch]
eventsThisEpoch = allEvents[(thisHeight-filterStartHeight)*eventsPerEpoch : (thisHeight-filterStartHeight+2)*eventsPerEpoch]
}
for i := 0; i < eventsPerEpoch; i++ {
for i := 0; i < eventsPerEpoch && ctx.Err() == nil; i++ {
if len(eventsThisEpoch) > 0 {
mockFilter.sendEventToChannel(eventsThisEpoch[0])
eventsThisEpoch = eventsThisEpoch[1:]
@ -406,14 +405,14 @@ func TestSubscribeActorEvents(t *testing.T) {
// cleanup
mockFilter.requireClearSubChannelCalledEventually(500 * time.Millisecond)
mockFilterManager.requireRemovedEventually(mockFilter.ID(), 500*time.Millisecond)
cancel()
wg.Wait() // wait for the chain to stop advancing
})
}
}
func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) {
// Similar to TestSubscribeActorEvents but we set an explicit end that caps out at the current height
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
const (
seed = 984651320
maxFilterHeightRange = 100
@ -439,6 +438,8 @@ func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) {
{"1.5 block speed", 1.5, false},
{"twice block speed", 2, false},
} {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
tc := tc
t.Run(tc.name, func(t *testing.T) {
@ -446,7 +447,7 @@ func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) {
mockClock.Set(time.Now())
mockFilterManager := newMockEventFilterManager(t)
allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, currentHeight-1)
allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, currentHeight)
mockFilter := newMockFilter(ctx, t, rng, allEvents)
mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(currentHeight), cid.Undef, nil, nil, false, mockFilter)
@ -464,11 +465,10 @@ func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) {
// assume we can cleanly pick up all historical events in one go
receiveLoop:
for len(gotEvents) < len(allEvents) && ctx.Err() == nil {
for ctx.Err() == nil {
select {
case e, ok := <-eventChan:
if tc.expectComplete || ok {
req.True(ok)
if ok {
gotEvents = append(gotEvents, e)
mockClock.Add(time.Duration(float64(blockDelay) * tc.blockTimeToComplete / float64(len(allEvents))))
// no need to advance the chain, we're also testing that's not necessary