Merge pull request #4688 from filecoin-project/feat/head-change-coalscer
head change coalescer
This commit is contained in:
commit
2ae0edc4df
@ -2,6 +2,7 @@ package messagepool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
@ -9,9 +10,16 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
var (
|
||||
HeadChangeCoalesceMinDelay = 2 * time.Second
|
||||
HeadChangeCoalesceMaxDelay = 6 * time.Second
|
||||
HeadChangeCoalesceMergeInterval = time.Second
|
||||
)
|
||||
|
||||
type Provider interface {
|
||||
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
||||
PutMessage(m types.ChainMsg) (cid.Cid, error)
|
||||
@ -34,7 +42,13 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
|
||||
}
|
||||
|
||||
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
|
||||
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
|
||||
mpp.sm.ChainStore().SubscribeHeadChanges(
|
||||
store.WrapHeadChangeCoalescer(
|
||||
cb,
|
||||
HeadChangeCoalesceMinDelay,
|
||||
HeadChangeCoalesceMaxDelay,
|
||||
HeadChangeCoalesceMergeInterval,
|
||||
))
|
||||
return mpp.sm.ChainStore().GetHeaviestTipSet()
|
||||
}
|
||||
|
||||
|
214
chain/store/coalescer.go
Normal file
214
chain/store/coalescer.go
Normal file
@ -0,0 +1,214 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer.
|
||||
// minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will
|
||||
// wait for that long to coalesce more head changes.
|
||||
// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change
|
||||
// more than that.
|
||||
// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was
|
||||
// within the merge interval when the coalesce timer fires, then the coalesce time is extended
|
||||
// by min delay and up to max delay total.
|
||||
func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee {
|
||||
c := NewHeadChangeCoalescer(fn, minDelay, maxDelay, mergeInterval)
|
||||
return c.HeadChange
|
||||
}
|
||||
|
||||
// HeadChangeCoalescer is a stateful reorg notifee which coalesces incoming head changes
|
||||
// with pending head changes to reduce state computations from head change notifications.
|
||||
type HeadChangeCoalescer struct {
|
||||
notify ReorgNotifee
|
||||
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
eventq chan headChange
|
||||
|
||||
revert []*types.TipSet
|
||||
apply []*types.TipSet
|
||||
}
|
||||
|
||||
type headChange struct {
|
||||
revert, apply []*types.TipSet
|
||||
}
|
||||
|
||||
// NewHeadChangeCoalescer creates a HeadChangeCoalescer.
|
||||
func NewHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) *HeadChangeCoalescer {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c := &HeadChangeCoalescer{
|
||||
notify: fn,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
eventq: make(chan headChange),
|
||||
}
|
||||
|
||||
go c.background(minDelay, maxDelay, mergeInterval)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// HeadChange is the ReorgNotifee callback for the stateful coalescer; it receives an incoming
|
||||
// head change and schedules dispatch of a coalesced head change in the background.
|
||||
func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error {
|
||||
select {
|
||||
case c.eventq <- headChange{revert: revert, apply: apply}:
|
||||
return nil
|
||||
case <-c.ctx.Done():
|
||||
return c.ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the coalescer and cancels the background dispatch goroutine.
|
||||
// Any further notification will result in an error.
|
||||
func (c *HeadChangeCoalescer) Close() error {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
default:
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Implementation details
|
||||
|
||||
func (c *HeadChangeCoalescer) background(minDelay, maxDelay, mergeInterval time.Duration) {
|
||||
var timerC <-chan time.Time
|
||||
var first, last time.Time
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt := <-c.eventq:
|
||||
c.coalesce(evt.revert, evt.apply)
|
||||
|
||||
now := time.Now()
|
||||
last = now
|
||||
if first.IsZero() {
|
||||
first = now
|
||||
}
|
||||
|
||||
if timerC == nil {
|
||||
timerC = time.After(minDelay)
|
||||
}
|
||||
|
||||
case now := <-timerC:
|
||||
sinceFirst := now.Sub(first)
|
||||
sinceLast := now.Sub(last)
|
||||
|
||||
if sinceLast < mergeInterval && sinceFirst < maxDelay {
|
||||
// coalesce some more
|
||||
maxWait := maxDelay - sinceFirst
|
||||
wait := minDelay
|
||||
if maxWait < wait {
|
||||
wait = maxWait
|
||||
}
|
||||
|
||||
timerC = time.After(wait)
|
||||
} else {
|
||||
// dispatch
|
||||
c.dispatch()
|
||||
|
||||
first = time.Time{}
|
||||
last = time.Time{}
|
||||
timerC = nil
|
||||
}
|
||||
|
||||
case <-c.ctx.Done():
|
||||
if c.revert != nil || c.apply != nil {
|
||||
c.dispatch()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *HeadChangeCoalescer) coalesce(revert, apply []*types.TipSet) {
|
||||
// newly reverted tipsets cancel out with pending applys.
|
||||
// similarly, newly applied tipsets cancel out with pending reverts.
|
||||
|
||||
// pending tipsets
|
||||
pendRevert := make(map[types.TipSetKey]struct{}, len(c.revert))
|
||||
for _, ts := range c.revert {
|
||||
pendRevert[ts.Key()] = struct{}{}
|
||||
}
|
||||
|
||||
pendApply := make(map[types.TipSetKey]struct{}, len(c.apply))
|
||||
for _, ts := range c.apply {
|
||||
pendApply[ts.Key()] = struct{}{}
|
||||
}
|
||||
|
||||
// incoming tipsets
|
||||
reverting := make(map[types.TipSetKey]struct{}, len(revert))
|
||||
for _, ts := range revert {
|
||||
reverting[ts.Key()] = struct{}{}
|
||||
}
|
||||
|
||||
applying := make(map[types.TipSetKey]struct{}, len(apply))
|
||||
for _, ts := range apply {
|
||||
applying[ts.Key()] = struct{}{}
|
||||
}
|
||||
|
||||
// coalesced revert set
|
||||
// - pending reverts are cancelled by incoming applys
|
||||
// - incoming reverts are cancelled by pending applys
|
||||
newRevert := make([]*types.TipSet, 0, len(c.revert)+len(revert))
|
||||
for _, ts := range c.revert {
|
||||
_, cancel := applying[ts.Key()]
|
||||
if cancel {
|
||||
continue
|
||||
}
|
||||
|
||||
newRevert = append(newRevert, ts)
|
||||
}
|
||||
|
||||
for _, ts := range revert {
|
||||
_, cancel := pendApply[ts.Key()]
|
||||
if cancel {
|
||||
continue
|
||||
}
|
||||
|
||||
newRevert = append(newRevert, ts)
|
||||
}
|
||||
|
||||
// coalesced apply set
|
||||
// - pending applys are cancelled by incoming reverts
|
||||
// - incoming applys are cancelled by pending reverts
|
||||
newApply := make([]*types.TipSet, 0, len(c.apply)+len(apply))
|
||||
for _, ts := range c.apply {
|
||||
_, cancel := reverting[ts.Key()]
|
||||
if cancel {
|
||||
continue
|
||||
}
|
||||
|
||||
newApply = append(newApply, ts)
|
||||
}
|
||||
|
||||
for _, ts := range apply {
|
||||
_, cancel := pendRevert[ts.Key()]
|
||||
if cancel {
|
||||
continue
|
||||
}
|
||||
|
||||
newApply = append(newApply, ts)
|
||||
}
|
||||
|
||||
// commit the coalesced sets
|
||||
c.revert = newRevert
|
||||
c.apply = newApply
|
||||
}
|
||||
|
||||
func (c *HeadChangeCoalescer) dispatch() {
|
||||
err := c.notify(c.revert, c.apply)
|
||||
if err != nil {
|
||||
log.Errorf("error dispatching coalesced head change notification: %s", err)
|
||||
}
|
||||
|
||||
c.revert = nil
|
||||
c.apply = nil
|
||||
}
|
72
chain/store/coalescer_test.go
Normal file
72
chain/store/coalescer_test.go
Normal file
@ -0,0 +1,72 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||
)
|
||||
|
||||
func TestHeadChangeCoalescer(t *testing.T) {
|
||||
notif := make(chan headChange, 1)
|
||||
c := NewHeadChangeCoalescer(func(revert, apply []*types.TipSet) error {
|
||||
notif <- headChange{apply: apply, revert: revert}
|
||||
return nil
|
||||
},
|
||||
100*time.Millisecond,
|
||||
200*time.Millisecond,
|
||||
10*time.Millisecond,
|
||||
)
|
||||
defer c.Close() //nolint
|
||||
|
||||
b0 := mock.MkBlock(nil, 0, 0)
|
||||
root := mock.TipSet(b0)
|
||||
bA := mock.MkBlock(root, 1, 1)
|
||||
tA := mock.TipSet(bA)
|
||||
bB := mock.MkBlock(root, 1, 2)
|
||||
tB := mock.TipSet(bB)
|
||||
tAB := mock.TipSet(bA, bB)
|
||||
bC := mock.MkBlock(root, 1, 3)
|
||||
tABC := mock.TipSet(bA, bB, bC)
|
||||
bD := mock.MkBlock(root, 1, 4)
|
||||
tABCD := mock.TipSet(bA, bB, bC, bD)
|
||||
bE := mock.MkBlock(root, 1, 5)
|
||||
tABCDE := mock.TipSet(bA, bB, bC, bD, bE)
|
||||
|
||||
c.HeadChange(nil, []*types.TipSet{tA}) //nolint
|
||||
c.HeadChange(nil, []*types.TipSet{tB}) //nolint
|
||||
c.HeadChange([]*types.TipSet{tA, tB}, []*types.TipSet{tAB}) //nolint
|
||||
c.HeadChange([]*types.TipSet{tAB}, []*types.TipSet{tABC}) //nolint
|
||||
|
||||
change := <-notif
|
||||
|
||||
if len(change.revert) != 0 {
|
||||
t.Fatalf("expected empty revert set but got %d elements", len(change.revert))
|
||||
}
|
||||
if len(change.apply) != 1 {
|
||||
t.Fatalf("expected single element apply set but got %d elements", len(change.apply))
|
||||
}
|
||||
if change.apply[0] != tABC {
|
||||
t.Fatalf("expected to apply tABC")
|
||||
}
|
||||
|
||||
c.HeadChange([]*types.TipSet{tABC}, []*types.TipSet{tABCD}) //nolint
|
||||
c.HeadChange([]*types.TipSet{tABCD}, []*types.TipSet{tABCDE}) //nolint
|
||||
|
||||
change = <-notif
|
||||
|
||||
if len(change.revert) != 1 {
|
||||
t.Fatalf("expected single element revert set but got %d elements", len(change.revert))
|
||||
}
|
||||
if change.revert[0] != tABC {
|
||||
t.Fatalf("expected to revert tABC")
|
||||
}
|
||||
if len(change.apply) != 1 {
|
||||
t.Fatalf("expected single element apply set but got %d elements", len(change.apply))
|
||||
}
|
||||
if change.apply[0] != tABCDE {
|
||||
t.Fatalf("expected to revert tABC")
|
||||
}
|
||||
|
||||
}
|
@ -29,6 +29,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
"github.com/filecoin-project/lotus/chain/gen"
|
||||
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
|
||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
|
||||
@ -53,6 +54,9 @@ import (
|
||||
|
||||
func init() {
|
||||
chain.BootstrapPeerThreshold = 1
|
||||
messagepool.HeadChangeCoalesceMinDelay = time.Microsecond
|
||||
messagepool.HeadChangeCoalesceMaxDelay = 2 * time.Microsecond
|
||||
messagepool.HeadChangeCoalesceMergeInterval = 100 * time.Nanosecond
|
||||
}
|
||||
|
||||
func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode {
|
||||
|
Loading…
Reference in New Issue
Block a user