improve coalescing

This commit is contained in:
vyzo 2020-11-09 13:35:42 +02:00
parent 378d7a1ad9
commit 185653f85a
4 changed files with 64 additions and 14 deletions

View File

@ -14,7 +14,11 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
var HeadChangeCoalesceDelay = time.Second var (
HeadChangeCoalesceMinDelay = 2 * time.Second
HeadChangeCoalesceMaxDelay = 6 * time.Second
HeadChangeCoalesceMergeInterval = time.Second
)
type Provider interface { type Provider interface {
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
@ -38,7 +42,13 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
} }
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
mpp.sm.ChainStore().SubscribeHeadChanges(store.WrapHeadChangeCoalescer(cb, HeadChangeCoalesceDelay)) mpp.sm.ChainStore().SubscribeHeadChanges(
store.WrapHeadChangeCoalescer(
cb,
HeadChangeCoalesceMinDelay,
HeadChangeCoalesceMaxDelay,
HeadChangeCoalesceMergeInterval,
))
return mpp.sm.ChainStore().GetHeaviestTipSet() return mpp.sm.ChainStore().GetHeaviestTipSet()
} }

View File

@ -8,8 +8,15 @@ import (
) )
// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer. // WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer.
func WrapHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) ReorgNotifee { // minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will
c := NewHeadChangeCoalescer(fn, delay) // wait for that long to coalesce more head changes.
// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change
// more than that.
// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was
// within the merge interval when the coalesce timer fires, then the coalesce time is extended
// by min delay and up to max delay total.
func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee {
c := NewHeadChangeCoalescer(fn, minDelay, maxDelay, mergeInterval)
return c.HeadChange return c.HeadChange
} }
@ -32,7 +39,7 @@ type headChange struct {
} }
// NewHeadChangeCoalescer creates a HeadChangeCoalescer. // NewHeadChangeCoalescer creates a HeadChangeCoalescer.
func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoalescer { func NewHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) *HeadChangeCoalescer {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
c := &HeadChangeCoalescer{ c := &HeadChangeCoalescer{
notify: fn, notify: fn,
@ -41,7 +48,7 @@ func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoa
eventq: make(chan headChange), eventq: make(chan headChange),
} }
go c.background(delay) go c.background(minDelay, maxDelay, mergeInterval)
return c return c
} }
@ -71,19 +78,46 @@ func (c *HeadChangeCoalescer) Close() error {
// Implementation details // Implementation details
func (c *HeadChangeCoalescer) background(delay time.Duration) { func (c *HeadChangeCoalescer) background(minDelay, maxDelay, mergeInterval time.Duration) {
var timerC <-chan time.Time var timerC <-chan time.Time
var first, last time.Time
for { for {
select { select {
case evt := <-c.eventq: case evt := <-c.eventq:
c.coalesce(evt.revert, evt.apply) c.coalesce(evt.revert, evt.apply)
if timerC == nil {
timerC = time.After(delay) now := time.Now()
last = now
if first.IsZero() {
first = now
} }
case <-timerC: if timerC == nil {
c.dispatch() timerC = time.After(minDelay)
timerC = nil }
case now := <-timerC:
sinceFirst := now.Sub(first)
sinceLast := now.Sub(last)
if sinceLast < mergeInterval && sinceFirst < maxDelay {
// coalesce some more
maxWait := maxDelay - sinceFirst
wait := minDelay
if maxWait < wait {
wait = maxWait
}
timerC = time.After(wait)
} else {
// dispatch
c.dispatch()
first = time.Time{}
last = time.Time{}
timerC = nil
}
case <-c.ctx.Done(): case <-c.ctx.Done():
if c.revert != nil || c.apply != nil { if c.revert != nil || c.apply != nil {

View File

@ -13,7 +13,11 @@ func TestHeadChangeCoalescer(t *testing.T) {
c := NewHeadChangeCoalescer(func(revert, apply []*types.TipSet) error { c := NewHeadChangeCoalescer(func(revert, apply []*types.TipSet) error {
notif <- headChange{apply: apply, revert: revert} notif <- headChange{apply: apply, revert: revert}
return nil return nil
}, 100*time.Millisecond) },
100*time.Millisecond,
200*time.Millisecond,
10*time.Millisecond,
)
defer c.Close() //nolint defer c.Close() //nolint
b0 := mock.MkBlock(nil, 0, 0) b0 := mock.MkBlock(nil, 0, 0)

View File

@ -54,7 +54,9 @@ import (
func init() { func init() {
chain.BootstrapPeerThreshold = 1 chain.BootstrapPeerThreshold = 1
messagepool.HeadChangeCoalesceDelay = time.Microsecond messagepool.HeadChangeCoalesceMinDelay = time.Microsecond
messagepool.HeadChangeCoalesceMaxDelay = 2 * time.Microsecond
messagepool.HeadChangeCoalesceMergeInterval = 100 * time.Nanosecond
} }
func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode { func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode {