696469aae7
It was possible for NewEvents to never return, blocked on waiting for a WaitGroup to be done. The call to Done was in a goroutine that could exit before reaching the Done call. Replace the WaitGroup with a channel that is closed to signal that initialisation is complete. Also, while we are waiting on the channel, wait on the context so we can exit cleanly if the context is canceled.
192 lines
4.4 KiB
Go
192 lines
4.4 KiB
Go
package events
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
"github.com/ipfs/go-cid"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
// log is the package-level logger for the events subsystem.
var log = logging.Logger("events")
|
|
|
|
// Handler signatures for height-based triggers.
//
// For HeightHandler the arguments relate as: `curH`-`ts.Height` = `confidence`,
// i.e. the handler fires once the chain head is `confidence` epochs past the
// tipset it is being called for.
type (
	HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
	RevertHandler func(ctx context.Context, ts *types.TipSet) error
)
|
|
|
|
// heightHandler records a single registered height trigger: the callback to
// run once the target height has sufficient confidence, and the revert
// callback to run if that height is later reverted.
type heightHandler struct {
	confidence int  // epochs past the target height before handle fires
	called     bool // set once handle has been invoked, so a revert knows to fire

	handle HeightHandler
	revert RevertHandler
}
|
|
|
|
// eventAPI is the narrow subset of the full-node API that the events
// subsystem needs. Defining it locally (consumer-side) keeps the dependency
// small and lets tests substitute a fake chain store.
type eventAPI interface {
	ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
	ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
	ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
	ChainHead(context.Context) (*types.TipSet, error)
	StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
	ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)

	StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}
|
|
|
|
// Events observes chain head changes and dispatches them to two embedded
// event machineries: heightEvents (height-based triggers) and hcEvents
// (head-change / message triggers).
type Events struct {
	api eventAPI

	tsc *tipSetCache
	lk  sync.Mutex

	ready     chan struct{} // closed once the first 'current' tipset has been seen
	readyOnce sync.Once     // ensures ready is closed at most once

	heightEvents
	*hcEvents
}
|
|
|
|
func NewEvents(ctx context.Context, api eventAPI) *Events {
|
|
gcConfidence := 2 * build.ForkLengthThreshold
|
|
|
|
tsc := newTSCache(gcConfidence, api)
|
|
|
|
e := &Events{
|
|
api: api,
|
|
|
|
tsc: tsc,
|
|
|
|
heightEvents: heightEvents{
|
|
tsc: tsc,
|
|
ctx: ctx,
|
|
gcConfidence: gcConfidence,
|
|
|
|
heightTriggers: map[uint64]*heightHandler{},
|
|
htTriggerHeights: map[abi.ChainEpoch][]uint64{},
|
|
htHeights: map[abi.ChainEpoch][]uint64{},
|
|
},
|
|
|
|
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
|
|
ready: make(chan struct{}),
|
|
}
|
|
|
|
go e.listenHeadChanges(ctx)
|
|
|
|
// Wait for the first tipset to be seen or bail if shutting down
|
|
select {
|
|
case <-e.ready:
|
|
case <-ctx.Done():
|
|
}
|
|
|
|
return e
|
|
}
|
|
|
|
func (e *Events) listenHeadChanges(ctx context.Context) {
|
|
for {
|
|
if err := e.listenHeadChangesOnce(ctx); err != nil {
|
|
log.Errorf("listen head changes errored: %s", err)
|
|
} else {
|
|
log.Warn("listenHeadChanges quit")
|
|
}
|
|
if ctx.Err() != nil {
|
|
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
|
|
return
|
|
}
|
|
build.Clock.Sleep(time.Second)
|
|
log.Info("restarting listenHeadChanges")
|
|
}
|
|
}
|
|
|
|
// listenHeadChangesOnce subscribes to chain head notifications and processes
// them until the channel closes (returns nil) or something fails (returns a
// non-nil error; the caller retries). The very first notification must be a
// single 'current' entry; it seeds the tipset cache and — exactly once per
// Events instance, via readyOnce — closes e.ready so NewEvents can return.
func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	notifs, err := e.api.ChainNotify(ctx)
	if err != nil {
		// Retry is handled by caller
		return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
	}

	var cur []*api.HeadChange
	var ok bool

	// Wait for first tipset or bail
	select {
	case cur, ok = <-notifs:
		if !ok {
			return xerrors.Errorf("notification channel closed")
		}
	case <-ctx.Done():
		return ctx.Err()
	}

	if len(cur) != 1 {
		return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
	}

	if cur[0].Type != store.HCCurrent {
		return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
	}

	// Best-effort: failure to cache the current head is logged, not fatal.
	if err := e.tsc.add(cur[0].Val); err != nil {
		log.Warnf("tsc.add: adding current tipset failed: %v", err)
	}

	e.readyOnce.Do(func() {
		e.lastTs = cur[0].Val
		// Signal that we have seen first tipset
		close(e.ready)
	})

	// Main loop: group each notification batch into reverted and applied
	// tipsets, then hand both to headChange.
	for notif := range notifs {
		var rev, app []*types.TipSet
		for _, notif := range notif {
			switch notif.Type {
			case store.HCRevert:
				rev = append(rev, notif.Val)
			case store.HCApply:
				app = append(app, notif.Val)
			default:
				log.Warnf("unexpected head change notification type: '%s'", notif.Type)
			}
		}

		// Errors here are logged and the loop continues with the next batch.
		if err := e.headChange(rev, app); err != nil {
			log.Warnf("headChange failed: %s", err)
		}

		// sync with fake chainstore (for tests)
		if fcs, ok := e.api.(interface{ notifDone() }); ok {
			fcs.notifDone()
		}
	}

	return nil
}
|
|
|
|
func (e *Events) headChange(rev, app []*types.TipSet) error {
|
|
if len(app) == 0 {
|
|
return xerrors.New("events.headChange expected at least one applied tipset")
|
|
}
|
|
|
|
e.lk.Lock()
|
|
defer e.lk.Unlock()
|
|
|
|
if err := e.headChangeAt(rev, app); err != nil {
|
|
return err
|
|
}
|
|
|
|
return e.processHeadChangeEvent(rev, app)
|
|
}
|