Manage event sending rate for SubscribeActorEvents

This commit is contained in:
Rod Vagg 2024-02-26 15:10:26 +11:00
parent c492b491d7
commit dc0c8639df

View File

@ -3,6 +3,7 @@ package full
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"go.uber.org/fx" "go.uber.org/fx"
@ -10,6 +11,7 @@ import (
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/events/filter" "github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -62,9 +64,11 @@ func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types
return nil, err return nil, err
} }
evs, err := getCollected(ctx, f) evs, _, _ := getCollected(ctx, f)
_ = a.EventFilterManager.Remove(ctx, f.ID()) if err := a.EventFilterManager.Remove(ctx, f.ID()); err != nil {
return evs, err log.Warnf("failed to remove filter: %s", err)
}
return evs, nil
} }
type filterParams struct { type filterParams struct {
@ -172,88 +176,163 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
return nil, err return nil, err
} }
out := make(chan *types.ActorEvent, 25) // The goal for the code below is to be able to send events on the `out` channel as fast as
// possible and not let it get too far behind the rate at which the events are generated.
// For historical events we see the rate at which they were generated by looking the height range;
// we then make sure that the client can receive them at least twice as fast as they were
// generated so they catch up quick enough to receive new events.
// For ongoing events we use an exponential moving average of the events per height to make sure
// that the client doesn't fall behind.
// In both cases we allow a little bit of slack but need to avoid letting the client bloat the
// buffer too much.
// There is no special handling for reverts, so they will just look like a lot more events per
// epoch and the user has to receive them anyway.
out := make(chan *types.ActorEvent)
go func() { go func() {
defer func() { defer func() {
// Tell the caller we're done // tell the caller we're done
close(out) close(out)
// Unsubscribe.
fm.ClearSubChannel() fm.ClearSubChannel()
_ = a.EventFilterManager.Remove(ctx, fm.ID()) if err := a.EventFilterManager.Remove(ctx, fm.ID()); err != nil {
log.Warnf("failed to remove filter: %s", err)
}
}() }()
evs, err := getCollected(ctx, fm) // Handle any historical events that our filter may have picked up -----------------------------
if err != nil {
log.Errorf("failed to get collected events: %w", err) evs, minEpoch, maxEpoch := getCollected(ctx, fm)
return if len(evs) > 0 {
// must be able to send events at least twice as fast as they were generated
epochRange := maxEpoch - minEpoch
if epochRange <= 0 {
epochRange = 1
}
eventsPerEpoch := float64(len(evs)) / float64(epochRange)
eventsPerSecond := 2 * eventsPerEpoch / float64(build.BlockDelaySecs)
// a minimum rate of 1 event per second if we don't have many events
if eventsPerSecond < 1 {
eventsPerSecond = 1
} }
// send events from evs to the out channel and ensure we don't do it slower than eventsPerMs
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
const maxSlowTicks = 3 // slightly forgiving, allow 3 slow ticks (seconds) before giving up
slowTicks := 0
sentEvents := 0.0
for _, ev := range evs { for _, ev := range evs {
ev := ev
select { select {
case out <- ev: case out <- ev:
sentEvents++
case <-ticker.C:
if sentEvents < eventsPerSecond {
slowTicks++
if slowTicks >= maxSlowTicks {
log.Errorf("closing event subscription due to slow event sending rate")
return
}
} else {
slowTicks = 0
}
sentEvents = 0
case <-ctx.Done(): case <-ctx.Done():
return return
default:
// TODO: need to fix this, buffer of 25 isn't going to work for prefill without a _really_ fast client or a small number of events
log.Errorf("closing event subscription due to slow reader")
return
} }
} }
}
// Handle ongoing events from the filter -------------------------------------------------------
in := make(chan interface{}, 256) in := make(chan interface{}, 256)
fm.SetSubChannel(in) fm.SetSubChannel(in)
for ctx.Err() == nil { var buffer []*types.ActorEvent
select { const α = 0.2 // decay factor for the events per height EMA
case val, ok := <-in: var eventsPerHeightEma float64 = 256 // exponential moving average of events per height, initially guess at 256
var lastHeight abi.ChainEpoch // last seen event height
var eventsAtCurrentHeight int // number of events at the current height
collectEvent := func(ev interface{}) bool {
ce, ok := ev.(*filter.CollectedEvent)
if !ok { if !ok {
// Shutting down. log.Errorf("got unexpected value from event filter: %T", ev)
return return false
} }
ce, ok := val.(*filter.CollectedEvent) if ce.Height > lastHeight {
if !ok { // update the EMA of events per height when the height increases
log.Errorf("got unexpected value from event filter: %T", val) if lastHeight != 0 {
return eventsPerHeightEma = α*float64(eventsAtCurrentHeight) + (1-α)*eventsPerHeightEma
} }
lastHeight = ce.Height
eventsAtCurrentHeight = 0
}
eventsAtCurrentHeight++
ev := &types.ActorEvent{ buffer = append(buffer, &types.ActorEvent{
Entries: ce.Entries, Entries: ce.Entries,
Emitter: ce.EmitterAddr, Emitter: ce.EmitterAddr,
Reverted: ce.Reverted, Reverted: ce.Reverted,
Height: ce.Height, Height: ce.Height,
TipSetKey: ce.TipSetKey, TipSetKey: ce.TipSetKey,
MsgCid: ce.MsgCid, MsgCid: ce.MsgCid,
})
return true
}
for ctx.Err() == nil {
if len(buffer) > 0 {
// check if we need to disconnect the client because they've fallen behind, always allow at
// least 8 events in the buffer to provide a little bit of slack
if len(buffer) > 8 && float64(len(buffer)) > eventsPerHeightEma/2 {
log.Errorf("closing event subscription due to slow event sending rate")
return
} }
select { select {
case out <- ev: case ev, ok := <-in: // incoming event
default: // TODO: need to fix this to be more intelligent about the consumption rate vs the accumulation rate if !ok || !collectEvent(ev) {
log.Errorf("closing event subscription due to slow reader")
return return
} }
if len(out) > 5 { case out <- buffer[0]: // successful send
log.Warnf("event subscription is slow, has %d buffered entries", len(out)) buffer[0] = nil
} buffer = buffer[1:]
case <-ctx.Done(): case <-ctx.Done():
return return
} }
} else {
select {
case ev, ok := <-in: // incoming event
if !ok || !collectEvent(ev) {
return
}
case <-ctx.Done():
return
}
}
} }
}() }()
return out, nil return out, nil
} }
func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEvent, error) { func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEvent, abi.ChainEpoch, abi.ChainEpoch) {
ces := f.TakeCollectedEvents(ctx) ces := f.TakeCollectedEvents(ctx)
var out []*types.ActorEvent var out []*types.ActorEvent
var min, max abi.ChainEpoch
for _, e := range ces { for _, e := range ces {
if min == 0 || e.Height < min {
min = e.Height
}
if e.Height > max {
max = e.Height
}
out = append(out, &types.ActorEvent{ out = append(out, &types.ActorEvent{
Entries: e.Entries, Entries: e.Entries,
Emitter: e.EmitterAddr, Emitter: e.EmitterAddr,
@ -264,5 +343,5 @@ func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEve
}) })
} }
return out, nil return out, min, max
} }