From dc0c8639df93c87b80d2794c406eb647bfcaa7be Mon Sep 17 00:00:00 2001
From: Rod Vagg
Date: Mon, 26 Feb 2024 15:10:26 +1100
Subject: [PATCH] Manage event sending rate for SubscribeActorEvents

---
 node/impl/full/actor_events.go | 183 +++++++++++++++++++++++----------
 1 file changed, 131 insertions(+), 52 deletions(-)

diff --git a/node/impl/full/actor_events.go b/node/impl/full/actor_events.go
index 6ee62d3cb..ef6fcec7a 100644
--- a/node/impl/full/actor_events.go
+++ b/node/impl/full/actor_events.go
@@ -3,6 +3,7 @@ package full
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/ipfs/go-cid"
 	"go.uber.org/fx"
@@ -10,6 +11,7 @@ import (
 	"github.com/filecoin-project/go-state-types/abi"
 
 	"github.com/filecoin-project/lotus/api"
+	"github.com/filecoin-project/lotus/build"
 	"github.com/filecoin-project/lotus/chain/events/filter"
 	"github.com/filecoin-project/lotus/chain/store"
 	"github.com/filecoin-project/lotus/chain/types"
@@ -62,9 +64,11 @@ func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types
 		return nil, err
 	}
 
-	evs, err := getCollected(ctx, f)
-	_ = a.EventFilterManager.Remove(ctx, f.ID())
-	return evs, err
+	evs, _, _ := getCollected(ctx, f)
+	if err := a.EventFilterManager.Remove(ctx, f.ID()); err != nil {
+		log.Warnf("failed to remove filter: %s", err)
+	}
+	return evs, nil
 }
 
 type filterParams struct {
@@ -172,75 +176,143 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
 		return nil, err
 	}
 
-	out := make(chan *types.ActorEvent, 25)
+	// The goal for the code below is to send events on the `out` channel as fast as possible and to
+	// avoid letting the client fall too far behind the rate at which the events are generated.
+	// For historical events we estimate the rate at which they were generated from the height range;
+	// we then make sure that the client can receive them at least twice as fast as they were
+	// generated so that it catches up quickly enough to receive new events.
+	// For ongoing events we use an exponential moving average of the events per height to make sure
+	// that the client doesn't fall behind.
+	// In both cases we allow a little bit of slack but need to avoid letting the client bloat the
+	// buffer too much.
+	// There is no special handling for reverts, so they will just look like a lot more events per
+	// epoch and the user has to receive them anyway.
+
+	out := make(chan *types.ActorEvent)
 
 	go func() {
 		defer func() {
-			// Tell the caller we're done
+			// tell the caller we're done
 			close(out)
-
-			// Unsubscribe.
 			fm.ClearSubChannel()
-			_ = a.EventFilterManager.Remove(ctx, fm.ID())
+			if err := a.EventFilterManager.Remove(ctx, fm.ID()); err != nil {
+				log.Warnf("failed to remove filter: %s", err)
+			}
 		}()
 
-		evs, err := getCollected(ctx, fm)
-		if err != nil {
-			log.Errorf("failed to get collected events: %w", err)
-			return
-		}
+		// Handle any historical events that our filter may have picked up -----------------------------
 
-		for _, ev := range evs {
-			ev := ev
-			select {
-			case out <- ev:
-			case <-ctx.Done():
-				return
-			default:
-				// TODO: need to fix this, buffer of 25 isn't going to work for prefill without a _really_ fast client or a small number of events
-				log.Errorf("closing event subscription due to slow reader")
-				return
+		evs, minEpoch, maxEpoch := getCollected(ctx, fm)
+		if len(evs) > 0 {
+			// must be able to send events at least twice as fast as they were generated
+			epochRange := maxEpoch - minEpoch
+			if epochRange <= 0 {
+				epochRange = 1
+			}
+			eventsPerEpoch := float64(len(evs)) / float64(epochRange)
+			eventsPerSecond := 2 * eventsPerEpoch / float64(build.BlockDelaySecs)
+			// a minimum rate of 1 event per second if we don't have many events
+			if eventsPerSecond < 1 {
+				eventsPerSecond = 1
+			}
+
+			// send events from evs to the out channel and ensure we don't do it slower than eventsPerSecond
+			ticker := time.NewTicker(time.Second)
+			defer ticker.Stop()
+
+			const maxSlowTicks = 3 // slightly forgiving, allow 3 slow ticks (seconds) before giving up
+			slowTicks := 0
+			sentEvents := 0.0
+
+			for _, ev := range evs {
+				select {
+				case out <- ev:
+					sentEvents++
+				case <-ticker.C:
+					if sentEvents < eventsPerSecond {
+						slowTicks++
+						if slowTicks >= maxSlowTicks {
+							log.Errorf("closing event subscription due to slow event sending rate")
+							return
+						}
+					} else {
+						slowTicks = 0
+					}
+					sentEvents = 0
+				case <-ctx.Done():
+					return
+				}
 			}
 		}
 
+		// Handle ongoing events from the filter -------------------------------------------------------
+
 		in := make(chan interface{}, 256)
 		fm.SetSubChannel(in)
 
+		var buffer []*types.ActorEvent
+		const α = 0.2                        // decay factor for the events per height EMA
+		var eventsPerHeightEma float64 = 256 // exponential moving average of events per height, with an initial guess of 256
+		var lastHeight abi.ChainEpoch        // last seen event height
+		var eventsAtCurrentHeight int        // number of events at the current height
+
+		collectEvent := func(ev interface{}) bool {
+			ce, ok := ev.(*filter.CollectedEvent)
+			if !ok {
+				log.Errorf("got unexpected value from event filter: %T", ev)
+				return false
+			}
+
+			if ce.Height > lastHeight {
+				// update the EMA of events per height when the height increases
+				if lastHeight != 0 {
+					eventsPerHeightEma = α*float64(eventsAtCurrentHeight) + (1-α)*eventsPerHeightEma
+				}
+				lastHeight = ce.Height
+				eventsAtCurrentHeight = 0
+			}
+			eventsAtCurrentHeight++
+
+			buffer = append(buffer, &types.ActorEvent{
+				Entries:   ce.Entries,
+				Emitter:   ce.EmitterAddr,
+				Reverted:  ce.Reverted,
+				Height:    ce.Height,
+				TipSetKey: ce.TipSetKey,
+				MsgCid:    ce.MsgCid,
+			})
+			return true
+		}
+
 		for ctx.Err() == nil {
-			select {
-			case val, ok := <-in:
-				if !ok {
-					// Shutting down.
+			if len(buffer) > 0 {
+				// check if we need to disconnect the client because they've fallen behind, always allow at
+				// least 8 events in the buffer to provide a little bit of slack
+				if len(buffer) > 8 && float64(len(buffer)) > eventsPerHeightEma/2 {
+					log.Errorf("closing event subscription due to slow event sending rate")
 					return
 				}
 
-				ce, ok := val.(*filter.CollectedEvent)
-				if !ok {
-					log.Errorf("got unexpected value from event filter: %T", val)
-					return
-				}
-
-				ev := &types.ActorEvent{
-					Entries:   ce.Entries,
-					Emitter:   ce.EmitterAddr,
-					Reverted:  ce.Reverted,
-					Height:    ce.Height,
-					TipSetKey: ce.TipSetKey,
-					MsgCid:    ce.MsgCid,
-				}
-
 				select {
-				case out <- ev:
-				default: // TODO: need to fix this to be more intelligent about the consumption rate vs the accumulation rate
-					log.Errorf("closing event subscription due to slow reader")
+				case ev, ok := <-in: // incoming event
+					if !ok || !collectEvent(ev) {
+						return
+					}
+				case out <- buffer[0]: // successful send
+					buffer[0] = nil
+					buffer = buffer[1:]
+				case <-ctx.Done():
 					return
 				}
-				if len(out) > 5 {
-					log.Warnf("event subscription is slow, has %d buffered entries", len(out))
+			} else {
+				select {
+				case ev, ok := <-in: // incoming event
+					if !ok || !collectEvent(ev) {
+						return
+					}
+				case <-ctx.Done():
+					return
 				}
-
-			case <-ctx.Done():
-				return
 			}
 		}
 	}()
@@ -248,12 +320,19 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter
 	return out, nil
 }
 
-func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEvent, error) {
+func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEvent, abi.ChainEpoch, abi.ChainEpoch) {
 	ces := f.TakeCollectedEvents(ctx)
 
 	var out []*types.ActorEvent
+	var min, max abi.ChainEpoch
 
 	for _, e := range ces {
+		if min == 0 || e.Height < min {
+			min = e.Height
+		}
+		if e.Height > max {
+			max = e.Height
+		}
 		out = append(out, &types.ActorEvent{
 			Entries:   e.Entries,
 			Emitter:   e.EmitterAddr,
@@ -264,5 +343,5 @@ func (a *ActorEventHandle
 		})
 	}
 
-	return out, nil
+	return out, min, max
 }
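
Illustrative sketch (not part of the patch): the ongoing-events loop above combines an unbounded
in-memory buffer with an exponential moving average (EMA) of events per height to decide when a
subscriber has fallen too far behind. The standalone Go program below shows the same technique in
isolation under simplified assumptions; the names (Event, relay, maxSlack) and the demo values are
invented for this sketch and are not taken from the Lotus codebase.

// ratesketch.go - illustrative only, standard library only.
package main

import (
	"context"
	"fmt"
)

// Event stands in for types.ActorEvent; only the height matters here.
type Event struct {
	Height int64
}

// relay forwards events from in to out while tracking an exponential moving
// average (EMA) of events per height. If the unsent backlog exceeds both a
// small fixed slack and half the EMA, the subscriber is considered too slow.
func relay(ctx context.Context, in <-chan Event, out chan<- Event) error {
	const (
		alpha    = 0.2 // EMA decay factor, mirroring the patch
		maxSlack = 8   // always tolerate a small backlog
	)
	ema := 256.0 // initial guess, as in the patch
	var (
		buffer     []Event
		lastHeight int64
		atHeight   int
	)

	collect := func(ev Event) {
		if ev.Height > lastHeight {
			// update the EMA only when the height advances
			if lastHeight != 0 {
				ema = alpha*float64(atHeight) + (1-alpha)*ema
			}
			lastHeight = ev.Height
			atHeight = 0
		}
		atHeight++
		buffer = append(buffer, ev)
	}

	for ctx.Err() == nil {
		if len(buffer) > 0 {
			if len(buffer) > maxSlack && float64(len(buffer)) > ema/2 {
				return fmt.Errorf("subscriber too slow: %d events buffered", len(buffer))
			}
			select {
			case ev, ok := <-in:
				if !ok {
					return nil
				}
				collect(ev)
			case out <- buffer[0]: // successful send, drop it from the backlog
				buffer = buffer[1:]
			case <-ctx.Done():
				return ctx.Err()
			}
		} else {
			select {
			case ev, ok := <-in:
				if !ok {
					return nil
				}
				collect(ev)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan Event, 16)
	out := make(chan Event)

	go func() { _ = relay(ctx, in, out); close(out) }()

	// Feed a few events and read them back; a reader that stopped draining
	// `out` would instead trip the backlog check once the buffer grew.
	for i := int64(1); i <= 5; i++ {
		in <- Event{Height: i}
	}
	for i := 0; i < 5; i++ {
		fmt.Println("received event at height", (<-out).Height)
	}
	cancel()
}

Because the patch keeps `out` unbuffered, the length of the internal buffer is a direct measure of
how far the client has fallen behind, which is what makes the comparison against half the EMA a
workable disconnect threshold.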