diff --git a/chain/messagepool/provider.go b/chain/messagepool/provider.go index 97a03e264..5a6c751bc 100644 --- a/chain/messagepool/provider.go +++ b/chain/messagepool/provider.go @@ -14,7 +14,11 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -var HeadChangeCoalesceDelay = time.Second +var ( + HeadChangeCoalesceMinDelay = 2 * time.Second + HeadChangeCoalesceMaxDelay = 6 * time.Second + HeadChangeCoalesceMergeInterval = time.Second +) type Provider interface { SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet @@ -38,7 +42,13 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { } func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { - mpp.sm.ChainStore().SubscribeHeadChanges(store.WrapHeadChangeCoalescer(cb, HeadChangeCoalesceDelay)) + mpp.sm.ChainStore().SubscribeHeadChanges( + store.WrapHeadChangeCoalescer( + cb, + HeadChangeCoalesceMinDelay, + HeadChangeCoalesceMaxDelay, + HeadChangeCoalesceMergeInterval, + )) return mpp.sm.ChainStore().GetHeaviestTipSet() } diff --git a/chain/store/coalescer.go b/chain/store/coalescer.go index 8e30c468c..443359c8a 100644 --- a/chain/store/coalescer.go +++ b/chain/store/coalescer.go @@ -8,8 +8,15 @@ import ( ) // WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer. -func WrapHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) ReorgNotifee { - c := NewHeadChangeCoalescer(fn, delay) +// minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will +// wait for that long to coalesce more head changes. +// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change +// more than that. +// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was +// within the merge interval when the coalesce timer fires, then the coalesce time is extended +// by min delay and up to max delay total. +func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee { + c := NewHeadChangeCoalescer(fn, minDelay, maxDelay, mergeInterval) return c.HeadChange } @@ -32,7 +39,7 @@ type headChange struct { } // NewHeadChangeCoalescer creates a HeadChangeCoalescer. -func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoalescer { +func NewHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) *HeadChangeCoalescer { ctx, cancel := context.WithCancel(context.Background()) c := &HeadChangeCoalescer{ notify: fn, @@ -41,7 +48,7 @@ func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoa eventq: make(chan headChange), } - go c.background(delay) + go c.background(minDelay, maxDelay, mergeInterval) return c } @@ -71,19 +78,46 @@ func (c *HeadChangeCoalescer) Close() error { // Implementation details -func (c *HeadChangeCoalescer) background(delay time.Duration) { +func (c *HeadChangeCoalescer) background(minDelay, maxDelay, mergeInterval time.Duration) { var timerC <-chan time.Time + var first, last time.Time + for { select { case evt := <-c.eventq: c.coalesce(evt.revert, evt.apply) - if timerC == nil { - timerC = time.After(delay) + + now := time.Now() + last = now + if first.IsZero() { + first = now } - case <-timerC: - c.dispatch() - timerC = nil + if timerC == nil { + timerC = time.After(minDelay) + } + + case now := <-timerC: + sinceFirst := now.Sub(first) + sinceLast := now.Sub(last) + + if sinceLast < mergeInterval && sinceFirst < maxDelay { + // coalesce some more + maxWait := maxDelay - sinceFirst + wait := minDelay + if maxWait < wait { + wait = maxWait + } + + timerC = time.After(wait) + } else { + // dispatch + c.dispatch() + + first = time.Time{} + last = time.Time{} + timerC = nil + } case <-c.ctx.Done(): if c.revert != nil || c.apply != nil { diff --git a/chain/store/coalescer_test.go b/chain/store/coalescer_test.go index a93601954..d46285108 100644 --- a/chain/store/coalescer_test.go +++ b/chain/store/coalescer_test.go @@ -13,7 +13,11 @@ func TestHeadChangeCoalescer(t *testing.T) { c := NewHeadChangeCoalescer(func(revert, apply []*types.TipSet) error { notif <- headChange{apply: apply, revert: revert} return nil - }, 100*time.Millisecond) + }, + 100*time.Millisecond, + 200*time.Millisecond, + 10*time.Millisecond, + ) defer c.Close() //nolint b0 := mock.MkBlock(nil, 0, 0) diff --git a/node/test/builder.go b/node/test/builder.go index 727f46098..cf38792da 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -54,7 +54,9 @@ import ( func init() { chain.BootstrapPeerThreshold = 1 - messagepool.HeadChangeCoalesceDelay = time.Microsecond + messagepool.HeadChangeCoalesceMinDelay = time.Microsecond + messagepool.HeadChangeCoalesceMaxDelay = 2 * time.Microsecond + messagepool.HeadChangeCoalesceMergeInterval = 100 * time.Nanosecond } func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode {