214 lines
5.2 KiB
Go
214 lines
5.2 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer.
|
|
// minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will
|
|
// wait for that long to coalesce more head changes.
|
|
// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change
|
|
// more than that.
|
|
// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was
|
|
// within the merge interval when the coalesce timer fires, then the coalesce time is extended
|
|
// by min delay and up to max delay total.
|
|
func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee {
|
|
c := NewHeadChangeCoalescer(fn, minDelay, maxDelay, mergeInterval)
|
|
return c.HeadChange
|
|
}
|
|
|
|
// HeadChangeCoalescer is a stateful reorg notifee which coalesces incoming head changes
|
|
// with pending head changes to reduce state computations from head change notifications.
|
|
type HeadChangeCoalescer struct {
|
|
notify ReorgNotifee
|
|
|
|
ctx context.Context
|
|
cancel func()
|
|
|
|
eventq chan headChange
|
|
|
|
revert []*types.TipSet
|
|
apply []*types.TipSet
|
|
}
|
|
|
|
type headChange struct {
|
|
revert, apply []*types.TipSet
|
|
}
|
|
|
|
// NewHeadChangeCoalescer creates a HeadChangeCoalescer.
|
|
func NewHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) *HeadChangeCoalescer {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
c := &HeadChangeCoalescer{
|
|
notify: fn,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
eventq: make(chan headChange),
|
|
}
|
|
|
|
go c.background(minDelay, maxDelay, mergeInterval)
|
|
|
|
return c
|
|
}
|
|
|
|
// HeadChange is the ReorgNotifee callback for the stateful coalescer; it receives an incoming
|
|
// head change and schedules dispatch of a coalesced head change in the background.
|
|
func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error {
|
|
select {
|
|
case c.eventq <- headChange{revert: revert, apply: apply}:
|
|
return nil
|
|
case <-c.ctx.Done():
|
|
return c.ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Close closes the coalescer and cancels the background dispatch goroutine.
|
|
// Any further notification will result in an error.
|
|
func (c *HeadChangeCoalescer) Close() error {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
default:
|
|
c.cancel()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Implementation details
|
|
|
|
func (c *HeadChangeCoalescer) background(minDelay, maxDelay, mergeInterval time.Duration) {
|
|
var timerC <-chan time.Time
|
|
var first, last time.Time
|
|
|
|
for {
|
|
select {
|
|
case evt := <-c.eventq:
|
|
c.coalesce(evt.revert, evt.apply)
|
|
|
|
now := time.Now()
|
|
last = now
|
|
if first.IsZero() {
|
|
first = now
|
|
}
|
|
|
|
if timerC == nil {
|
|
timerC = time.After(minDelay)
|
|
}
|
|
|
|
case now := <-timerC:
|
|
sinceFirst := now.Sub(first)
|
|
sinceLast := now.Sub(last)
|
|
|
|
if sinceLast < mergeInterval && sinceFirst < maxDelay {
|
|
// coalesce some more
|
|
maxWait := maxDelay - sinceFirst
|
|
wait := minDelay
|
|
if maxWait < wait {
|
|
wait = maxWait
|
|
}
|
|
|
|
timerC = time.After(wait)
|
|
} else {
|
|
// dispatch
|
|
c.dispatch()
|
|
|
|
first = time.Time{}
|
|
last = time.Time{}
|
|
timerC = nil
|
|
}
|
|
|
|
case <-c.ctx.Done():
|
|
if c.revert != nil || c.apply != nil {
|
|
c.dispatch()
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *HeadChangeCoalescer) coalesce(revert, apply []*types.TipSet) {
|
|
// newly reverted tipsets cancel out with pending applys.
|
|
// similarly, newly applied tipsets cancel out with pending reverts.
|
|
|
|
// pending tipsets
|
|
pendRevert := make(map[types.TipSetKey]struct{}, len(c.revert))
|
|
for _, ts := range c.revert {
|
|
pendRevert[ts.Key()] = struct{}{}
|
|
}
|
|
|
|
pendApply := make(map[types.TipSetKey]struct{}, len(c.apply))
|
|
for _, ts := range c.apply {
|
|
pendApply[ts.Key()] = struct{}{}
|
|
}
|
|
|
|
// incoming tipsets
|
|
reverting := make(map[types.TipSetKey]struct{}, len(revert))
|
|
for _, ts := range revert {
|
|
reverting[ts.Key()] = struct{}{}
|
|
}
|
|
|
|
applying := make(map[types.TipSetKey]struct{}, len(apply))
|
|
for _, ts := range apply {
|
|
applying[ts.Key()] = struct{}{}
|
|
}
|
|
|
|
// coalesced revert set
|
|
// - pending reverts are cancelled by incoming applys
|
|
// - incoming reverts are cancelled by pending applys
|
|
newRevert := c.merge(c.revert, revert, pendApply, applying)
|
|
|
|
// coalesced apply set
|
|
// - pending applys are cancelled by incoming reverts
|
|
// - incoming applys are cancelled by pending reverts
|
|
newApply := c.merge(c.apply, apply, pendRevert, reverting)
|
|
|
|
// commit the coalesced sets
|
|
c.revert = newRevert
|
|
c.apply = newApply
|
|
}
|
|
|
|
func (c *HeadChangeCoalescer) merge(pend, incoming []*types.TipSet, cancel1, cancel2 map[types.TipSetKey]struct{}) []*types.TipSet {
|
|
result := make([]*types.TipSet, 0, len(pend)+len(incoming))
|
|
for _, ts := range pend {
|
|
_, cancel := cancel1[ts.Key()]
|
|
if cancel {
|
|
continue
|
|
}
|
|
|
|
_, cancel = cancel2[ts.Key()]
|
|
if cancel {
|
|
continue
|
|
}
|
|
|
|
result = append(result, ts)
|
|
}
|
|
|
|
for _, ts := range incoming {
|
|
_, cancel := cancel1[ts.Key()]
|
|
if cancel {
|
|
continue
|
|
}
|
|
|
|
_, cancel = cancel2[ts.Key()]
|
|
if cancel {
|
|
continue
|
|
}
|
|
|
|
result = append(result, ts)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (c *HeadChangeCoalescer) dispatch() {
|
|
err := c.notify(c.revert, c.apply)
|
|
if err != nil {
|
|
log.Errorf("error dispatching coalesced head change notification: %s", err)
|
|
}
|
|
|
|
c.revert = nil
|
|
c.apply = nil
|
|
}
|