godoc and final dispatch when closing the coalescer
This commit is contained in:
parent
ec13c5f80d
commit
9149ae2164
@ -7,11 +7,14 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer.
|
||||||
func WrapHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) ReorgNotifee {
|
func WrapHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) ReorgNotifee {
|
||||||
c := NewHeadChangeCoalescer(fn, delay)
|
c := NewHeadChangeCoalescer(fn, delay)
|
||||||
return c.HeadChange
|
return c.HeadChange
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HeadChangeCoalescer is a stateful reorg notifee which coalesces incoming head changes
|
||||||
|
// with pending head changes to reduce state computations from head change notifications.
|
||||||
type HeadChangeCoalescer struct {
|
type HeadChangeCoalescer struct {
|
||||||
notify ReorgNotifee
|
notify ReorgNotifee
|
||||||
|
|
||||||
@ -28,6 +31,7 @@ type headChange struct {
|
|||||||
revert, apply []*types.TipSet
|
revert, apply []*types.TipSet
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewHeadChangeCoalescer creates a HeadChangeCoalescer.
|
||||||
func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoalescer {
|
func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoalescer {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
c := &HeadChangeCoalescer{
|
c := &HeadChangeCoalescer{
|
||||||
@ -42,6 +46,8 @@ func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoa
|
|||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HeadChange is the ReorgNotifee callback for the stateful coalescer; it receives an incoming
|
||||||
|
// head change and schedules dispatch of a coalesced head change in the background.
|
||||||
func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error {
|
func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error {
|
||||||
select {
|
select {
|
||||||
case c.eventq <- headChange{revert: revert, apply: apply}:
|
case c.eventq <- headChange{revert: revert, apply: apply}:
|
||||||
@ -51,6 +57,8 @@ func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close closes the coalescer and cancels the background dispatch goroutine.
|
||||||
|
// Any further notification will result in an error.
|
||||||
func (c *HeadChangeCoalescer) Close() {
|
func (c *HeadChangeCoalescer) Close() {
|
||||||
select {
|
select {
|
||||||
case <-c.ctx.Done():
|
case <-c.ctx.Done():
|
||||||
@ -59,6 +67,8 @@ func (c *HeadChangeCoalescer) Close() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implementation details
|
||||||
|
|
||||||
func (c *HeadChangeCoalescer) background(delay time.Duration) {
|
func (c *HeadChangeCoalescer) background(delay time.Duration) {
|
||||||
var timerC <-chan time.Time
|
var timerC <-chan time.Time
|
||||||
for {
|
for {
|
||||||
@ -74,6 +84,9 @@ func (c *HeadChangeCoalescer) background(delay time.Duration) {
|
|||||||
timerC = nil
|
timerC = nil
|
||||||
|
|
||||||
case <-c.ctx.Done():
|
case <-c.ctx.Done():
|
||||||
|
if c.revert != nil || c.apply != nil {
|
||||||
|
c.dispatch()
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user