lotus/chain/store/coalescer.go

214 lines
5.2 KiB
Go
Raw Normal View History

2020-11-02 11:12:32 +00:00
package store
import (
"context"
"time"
"github.com/filecoin-project/lotus/chain/types"
)
// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer.
2020-11-09 11:35:42 +00:00
// minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will
// wait for that long to coalesce more head changes.
// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change
// more than that.
// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was
// within the merge interval when the coalesce timer fires, then the coalesce time is extended
// by min delay and up to max delay total.
func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee {
c := NewHeadChangeCoalescer(fn, minDelay, maxDelay, mergeInterval)
2020-11-02 11:12:32 +00:00
return c.HeadChange
}
// HeadChangeCoalescer is a stateful reorg notifee which coalesces incoming head changes
// with pending head changes to reduce state computations from head change notifications.
2020-11-02 11:12:32 +00:00
type HeadChangeCoalescer struct {
notify ReorgNotifee
ctx context.Context
cancel func()
eventq chan headChange
revert []*types.TipSet
apply []*types.TipSet
}
type headChange struct {
revert, apply []*types.TipSet
}
// NewHeadChangeCoalescer creates a HeadChangeCoalescer.
2020-11-09 11:35:42 +00:00
func NewHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) *HeadChangeCoalescer {
2020-11-02 11:12:32 +00:00
ctx, cancel := context.WithCancel(context.Background())
c := &HeadChangeCoalescer{
notify: fn,
ctx: ctx,
cancel: cancel,
eventq: make(chan headChange),
}
2020-11-09 11:35:42 +00:00
go c.background(minDelay, maxDelay, mergeInterval)
2020-11-02 11:12:32 +00:00
return c
}
// HeadChange is the ReorgNotifee callback for the stateful coalescer; it receives an incoming
// head change and schedules dispatch of a coalesced head change in the background.
2020-11-02 11:12:32 +00:00
func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error {
select {
case c.eventq <- headChange{revert: revert, apply: apply}:
return nil
case <-c.ctx.Done():
return c.ctx.Err()
}
}
// Close closes the coalescer and cancels the background dispatch goroutine.
// Any further notification will result in an error.
2020-11-04 11:58:44 +00:00
func (c *HeadChangeCoalescer) Close() error {
2020-11-02 11:12:32 +00:00
select {
case <-c.ctx.Done():
default:
c.cancel()
}
2020-11-04 11:58:44 +00:00
return nil
2020-11-02 11:12:32 +00:00
}
// Implementation details
2020-11-09 11:35:42 +00:00
func (c *HeadChangeCoalescer) background(minDelay, maxDelay, mergeInterval time.Duration) {
2020-11-02 11:12:32 +00:00
var timerC <-chan time.Time
2020-11-09 11:35:42 +00:00
var first, last time.Time
2020-11-02 11:12:32 +00:00
for {
select {
case evt := <-c.eventq:
c.coalesce(evt.revert, evt.apply)
2020-11-09 11:35:42 +00:00
now := time.Now()
last = now
if first.IsZero() {
first = now
}
2020-11-02 11:12:32 +00:00
if timerC == nil {
2020-11-09 11:35:42 +00:00
timerC = time.After(minDelay)
2020-11-02 11:12:32 +00:00
}
2020-11-09 11:35:42 +00:00
case now := <-timerC:
sinceFirst := now.Sub(first)
sinceLast := now.Sub(last)
if sinceLast < mergeInterval && sinceFirst < maxDelay {
// coalesce some more
maxWait := maxDelay - sinceFirst
wait := minDelay
if maxWait < wait {
wait = maxWait
}
timerC = time.After(wait)
} else {
// dispatch
c.dispatch()
first = time.Time{}
last = time.Time{}
timerC = nil
}
2020-11-02 11:12:32 +00:00
case <-c.ctx.Done():
if c.revert != nil || c.apply != nil {
c.dispatch()
}
2020-11-02 11:12:32 +00:00
return
}
}
}
func (c *HeadChangeCoalescer) coalesce(revert, apply []*types.TipSet) {
// newly reverted tipsets cancel out with pending applys.
// similarly, newly applied tipsets cancel out with pending reverts.
2020-11-02 11:12:32 +00:00
// pending tipsets
pendRevert := make(map[types.TipSetKey]struct{}, len(c.revert))
for _, ts := range c.revert {
pendRevert[ts.Key()] = struct{}{}
2020-11-02 11:12:32 +00:00
}
pendApply := make(map[types.TipSetKey]struct{}, len(c.apply))
for _, ts := range c.apply {
pendApply[ts.Key()] = struct{}{}
}
2020-11-02 11:12:32 +00:00
// incoming tipsets
reverting := make(map[types.TipSetKey]struct{}, len(revert))
2020-11-02 11:12:32 +00:00
for _, ts := range revert {
reverting[ts.Key()] = struct{}{}
}
applying := make(map[types.TipSetKey]struct{}, len(apply))
for _, ts := range apply {
applying[ts.Key()] = struct{}{}
}
// coalesced revert set
// - pending reverts are cancelled by incoming applys
// - incoming reverts are cancelled by pending applys
newRevert := c.merge(c.revert, revert, pendApply, applying)
// coalesced apply set
// - pending applys are cancelled by incoming reverts
// - incoming applys are cancelled by pending reverts
newApply := c.merge(c.apply, apply, pendRevert, reverting)
// commit the coalesced sets
c.revert = newRevert
c.apply = newApply
}
func (c *HeadChangeCoalescer) merge(pend, incoming []*types.TipSet, cancel1, cancel2 map[types.TipSetKey]struct{}) []*types.TipSet {
result := make([]*types.TipSet, 0, len(pend)+len(incoming))
for _, ts := range pend {
_, cancel := cancel1[ts.Key()]
if cancel {
continue
}
2020-11-02 11:12:32 +00:00
_, cancel = cancel2[ts.Key()]
if cancel {
2020-11-02 11:12:32 +00:00
continue
}
result = append(result, ts)
2020-11-02 11:12:32 +00:00
}
for _, ts := range incoming {
_, cancel := cancel1[ts.Key()]
if cancel {
continue
}
_, cancel = cancel2[ts.Key()]
if cancel {
2020-11-02 11:12:32 +00:00
continue
}
result = append(result, ts)
2020-11-02 11:12:32 +00:00
}
return result
2020-11-02 11:12:32 +00:00
}
func (c *HeadChangeCoalescer) dispatch() {
err := c.notify(c.revert, c.apply)
if err != nil {
log.Errorf("error dispatching coalesced head change notification: %s", err)
}
c.revert = nil
c.apply = nil
}