refactor events system

This commit is contained in:
Steven Allen 2021-08-03 17:10:30 -07:00
parent a875e9ba73
commit 3846170302
19 changed files with 1041 additions and 917 deletions

33
chain/events/cache.go Normal file
View File

@ -0,0 +1,33 @@
package events
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
type uncachedAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}
type cache struct {
*tipSetCache
*messageCache
uncachedAPI
}
func newCache(api EventAPI, gcConfidence abi.ChainEpoch) *cache {
return &cache{
newTSCache(api, gcConfidence),
newMessageCache(api),
api,
}
}

View File

@ -2,18 +2,14 @@ package events
import (
"context"
"sync"
"time"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -25,209 +21,50 @@ type (
RevertHandler func(ctx context.Context, ts *types.TipSet) error
)
type heightHandler struct {
confidence int
called bool
handle HeightHandler
revert RevertHandler
// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, from, to *types.TipSet) error
Revert(ctx context.Context, from, to *types.TipSet) error
}
type EventAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainHead(context.Context) (*types.TipSet, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}
type Events struct {
api EventAPI
tsc *tipSetCache
lk sync.Mutex
ready chan struct{}
readyOnce sync.Once
heightEvents
*observer
*heightEvents
*hcEvents
observers []TipSetObserver
}
func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)
func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) (*Events, error) {
cache := newCache(api, gcConfidence)
e := &Events{
api: api,
ob := newObserver(cache, gcConfidence)
he := newHeightEvents(cache, gcConfidence)
headChange := newHCEvents(cache)
tsc: tsc,
heightEvents: heightEvents{
tsc: tsc,
ctx: ctx,
gcConfidence: gcConfidence,
heightTriggers: map[uint64]*heightHandler{},
htTriggerHeights: map[abi.ChainEpoch][]uint64{},
htHeights: map[abi.ChainEpoch][]uint64{},
},
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
// Cache first. Observers are ordered and we always want to fill the cache first.
ob.Observe(cache.observer())
ob.Observe(he.observer())
ob.Observe(headChange.observer())
if err := ob.start(ctx); err != nil {
return nil, err
}
go e.listenHeadChanges(ctx)
// Wait for the first tipset to be seen or bail if shutting down
select {
case <-e.ready:
case <-ctx.Done():
}
return e
return &Events{ob, he, headChange}, nil
}
func NewEvents(ctx context.Context, api EventAPI) *Events {
func NewEvents(ctx context.Context, api EventAPI) (*Events, error) {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, gcConfidence)
}
func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
select {
case <-build.Clock.After(time.Second):
case <-ctx.Done():
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
log.Info("restarting listenHeadChanges")
}
}
func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}
var cur []*api.HeadChange
var ok bool
// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}
if len(cur) != 1 {
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
}
if cur[0].Type != store.HCCurrent {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}
if err := e.tsc.add(cur[0].Val); err != nil {
log.Warnf("tsc.add: adding current tipset failed: %v", err)
}
e.readyOnce.Do(func() {
e.lastTs = cur[0].Val
// Signal that we have seen first tipset
close(e.ready)
})
for notif := range notifs {
var rev, app []*types.TipSet
for _, notif := range notif {
switch notif.Type {
case store.HCRevert:
rev = append(rev, notif.Val)
case store.HCApply:
app = append(app, notif.Val)
default:
log.Warnf("unexpected head change notification type: '%s'", notif.Type)
}
}
if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}
// sync with fake chainstore (for tests)
if fcs, ok := e.api.(interface{ notifDone() }); ok {
fcs.notifDone()
}
}
return nil
}
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}
e.lk.Lock()
defer e.lk.Unlock()
if err := e.headChangeAt(rev, app); err != nil {
return err
}
if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}
return e.processHeadChangeEvent(rev, app)
}
// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}
// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}
// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}
for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}
return nil
}

View File

@ -5,9 +5,6 @@ import (
"math"
"sync"
"github.com/filecoin-project/lotus/api"
lru "github.com/hashicorp/golang-lru"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/go-state-types/abi"
@ -35,7 +32,7 @@ type eventData interface{}
// `prevTs` is the previous tipset, eg the "from" tipset for a state change.
// `ts` is the event tipset, eg the tipset in which the `msg` is included.
// `curH`-`ts.Height` = `confidence`
type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
type EventHandler func(ctx context.Context, data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
// wait for has already happened in tipset `ts`
@ -43,7 +40,7 @@ type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainE
// If `done` is true, timeout won't be triggered
// If `more` is false, no messages will be sent to EventHandler (RevertHandler
// may still be called)
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
type CheckFunc func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error)
// Keep track of information for an event handler
type handlerInfo struct {
@ -60,10 +57,9 @@ type handlerInfo struct {
// until the required confidence is reached
type queuedEvent struct {
trigger triggerID
data eventData
prevH abi.ChainEpoch
h abi.ChainEpoch
data eventData
prevTipset, tipset *types.TipSet
called bool
}
@ -71,10 +67,7 @@ type queuedEvent struct {
// Manages chain head change events, which may be forward (new tipset added to
// chain) or backward (chain branch discarded in favour of heavier branch)
type hcEvents struct {
cs EventAPI
tsc *tipSetCache
ctx context.Context
gcConfidence uint64
cs EventAPI
lastTs *types.TipSet
@ -82,8 +75,10 @@ type hcEvents struct {
ctr triggerID
// TODO: get rid of trigger IDs and just use pointers as keys.
triggers map[triggerID]*handlerInfo
// TODO: instead of scheduling events in the future, look at the chain in the past. We can sip the "confidence" queue entirely.
// maps block heights to events
// [triggerH][msgH][event]
confQueue map[triggerH]map[msgH][]*queuedEvent
@ -98,83 +93,76 @@ type hcEvents struct {
watcherEvents
}
func newHCEvents(ctx context.Context, cs EventAPI, tsc *tipSetCache, gcConfidence uint64) *hcEvents {
e := hcEvents{
ctx: ctx,
cs: cs,
tsc: tsc,
gcConfidence: gcConfidence,
func newHCEvents(api EventAPI) *hcEvents {
e := &hcEvents{
cs: api,
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
revertQueue: map[msgH][]triggerH{},
triggers: map[triggerID]*handlerInfo{},
timeouts: map[abi.ChainEpoch]map[triggerID]int{},
}
e.messageEvents = newMessageEvents(ctx, &e, cs)
e.watcherEvents = newWatcherEvents(ctx, &e, cs)
e.messageEvents = newMessageEvents(e, api)
e.watcherEvents = newWatcherEvents(e, api)
return &e
return e
}
// Called when there is a change to the head with tipsets to be
// reverted / applied
func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
type hcEventsObserver hcEvents
func (e *hcEvents) observer() TipSetObserver {
return (*hcEventsObserver)(e)
}
func (e *hcEventsObserver) Apply(ctx context.Context, from, to *types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
for _, ts := range rev {
e.handleReverts(ts)
e.lastTs = ts
defer func() { e.lastTs = to }()
// Check if the head change caused any state changes that we were
// waiting for
stateChanges := e.checkStateChanges(from, to)
// Queue up calls until there have been enough blocks to reach
// confidence on the state changes
for tid, data := range stateChanges {
e.queueForConfidence(tid, data, from, to)
}
for _, ts := range app {
// Check if the head change caused any state changes that we were
// waiting for
stateChanges := e.watcherEvents.checkStateChanges(e.lastTs, ts)
// Check if the head change included any new message calls
newCalls := e.checkNewCalls(ctx, from, to)
// Queue up calls until there have been enough blocks to reach
// confidence on the state changes
for tid, data := range stateChanges {
e.queueForConfidence(tid, data, e.lastTs, ts)
// Queue up calls until there have been enough blocks to reach
// confidence on the message calls
for tid, calls := range newCalls {
for _, data := range calls {
e.queueForConfidence(tid, data, nil, to)
}
// Check if the head change included any new message calls
newCalls, err := e.messageEvents.checkNewCalls(ts)
if err != nil {
return err
}
// Queue up calls until there have been enough blocks to reach
// confidence on the message calls
for tid, calls := range newCalls {
for _, data := range calls {
e.queueForConfidence(tid, data, nil, ts)
}
}
for at := e.lastTs.Height(); at <= ts.Height(); at++ {
// Apply any queued events and timeouts that were targeted at the
// current chain height
e.applyWithConfidence(ts, at)
e.applyTimeouts(ts)
}
// Update the latest known tipset
e.lastTs = ts
}
for at := from.Height() + 1; at <= to.Height(); at++ {
// Apply any queued events and timeouts that were targeted at the
// current chain height
e.applyWithConfidence(ctx, at)
e.applyTimeouts(ctx, to)
}
return nil
}
func (e *hcEvents) handleReverts(ts *types.TipSet) {
reverts, ok := e.revertQueue[ts.Height()]
func (e *hcEventsObserver) Revert(ctx context.Context, from, to *types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
defer func() { e.lastTs = to }()
reverts, ok := e.revertQueue[from.Height()]
if !ok {
return // nothing to do
return nil // nothing to do
}
for _, triggerH := range reverts {
toRevert := e.confQueue[triggerH][ts.Height()]
toRevert := e.confQueue[triggerH][from.Height()]
for _, event := range toRevert {
if !event.called {
continue // event wasn't apply()-ied yet
@ -182,24 +170,21 @@ func (e *hcEvents) handleReverts(ts *types.TipSet) {
trigger := e.triggers[event.trigger]
if err := trigger.revert(e.ctx, ts); err != nil {
log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", ts.Height(), triggerH, err)
if err := trigger.revert(ctx, from); err != nil {
log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", from.Height(), triggerH, err)
}
}
delete(e.confQueue[triggerH], ts.Height())
delete(e.confQueue[triggerH], from.Height())
}
delete(e.revertQueue, ts.Height())
delete(e.revertQueue, from.Height())
return nil
}
// Queue up events until the chain has reached a height that reflects the
// desired confidence
func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) {
func (e *hcEventsObserver) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) {
trigger := e.triggers[trigID]
prevH := NoHeight
if prevTs != nil {
prevH = prevTs.Height()
}
appliedH := ts.Height()
triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
@ -211,28 +196,23 @@ func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts
}
byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
trigger: trigID,
prevH: prevH,
h: appliedH,
data: data,
trigger: trigID,
data: data,
tipset: ts,
prevTipset: prevTs,
})
e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH)
}
// Apply any events that were waiting for this chain height for confidence
func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) {
func (e *hcEventsObserver) applyWithConfidence(ctx context.Context, height abi.ChainEpoch) {
byOrigH, ok := e.confQueue[height]
if !ok {
return // no triggers at this height
}
for origH, events := range byOrigH {
triggerTs, err := e.tsc.get(origH)
if err != nil {
log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, height)
}
for _, event := range events {
if event.called {
continue
@ -243,18 +223,7 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch)
continue
}
// Previous tipset - this is relevant for example in a state change
// from one tipset to another
var prevTs *types.TipSet
if event.prevH != NoHeight {
prevTs, err = e.tsc.get(event.prevH)
if err != nil {
log.Errorf("events: applyWithConfidence didn't find tipset for previous event; wanted %d; current %d", event.prevH, height)
continue
}
}
more, err := trigger.handle(event.data, prevTs, triggerTs, height)
more, err := trigger.handle(ctx, event.data, event.prevTipset, event.tipset, height)
if err != nil {
log.Errorf("chain trigger (@H %d, triggered @ %d) failed: %s", origH, height, err)
continue // don't revert failed calls
@ -273,7 +242,7 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch)
}
// Apply any timeouts that expire at this height
func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
func (e *hcEventsObserver) applyTimeouts(ctx context.Context, ts *types.TipSet) {
triggers, ok := e.timeouts[ts.Height()]
if !ok {
return // nothing to do
@ -288,12 +257,13 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
continue
}
timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence))
// This should be cached.
timeoutTs, err := e.cs.ChainGetTipSetAfterHeight(ctx, ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Key())
if err != nil {
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
}
more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
more, err := trigger.handle(ctx, nil, nil, timeoutTs, ts.Height())
if err != nil {
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
continue // don't revert failed calls
@ -309,24 +279,24 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
// - RevertHandler: called if the chain head changes causing the event to revert
// - confidence: wait this many tipsets before calling EventHandler
// - timeout: at this chain height, timeout on waiting for this event
func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) {
func (e *hcEvents) onHeadChanged(ctx context.Context, check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) {
e.lk.Lock()
defer e.lk.Unlock()
// Check if the event has already occurred
ts, err := e.tsc.best()
if err != nil {
return 0, xerrors.Errorf("error getting best tipset: %w", err)
}
done, more, err := check(ts)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
more := true
done := false
if e.lastTs != nil {
var err error
done, more, err = check(ctx, e.lastTs)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", e.lastTs.Height(), err)
}
}
if done {
timeout = NoTimeout
}
// Create a trigger for the event
id := e.ctr
e.ctr++
@ -354,12 +324,11 @@ func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHa
// headChangeAPI is used to allow the composed event APIs to call back to hcEvents
// to listen for changes
type headChangeAPI interface {
onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error)
onHeadChanged(ctx context.Context, check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error)
}
// watcherEvents watches for a state change
type watcherEvents struct {
ctx context.Context
cs EventAPI
hcAPI headChangeAPI
@ -367,9 +336,8 @@ type watcherEvents struct {
matchers map[triggerID]StateMatchFunc
}
func newWatcherEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) watcherEvents {
func newWatcherEvents(hcAPI headChangeAPI, cs EventAPI) watcherEvents {
return watcherEvents{
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]StateMatchFunc),
@ -438,7 +406,7 @@ type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error)
// the state change is queued up until the confidence interval has elapsed (and
// `StateChangeHandler` is called)
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
hnd := func(ctx context.Context, data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
states, ok := data.(StateChange)
if data != nil && !ok {
panic("expected StateChange")
@ -447,7 +415,7 @@ func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler,
return scHnd(prevTs, ts, states, height)
}
id, err := we.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout)
id, err := we.hcAPI.onHeadChanged(context.TODO(), check, hnd, rev, confidence, timeout)
if err != nil {
return err
}
@ -461,43 +429,29 @@ func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler,
// messageEvents watches for message calls to actors
type messageEvents struct {
ctx context.Context
cs EventAPI
hcAPI headChangeAPI
lk sync.RWMutex
matchers map[triggerID]MsgMatchFunc
blockMsgLk sync.Mutex
blockMsgCache *lru.ARCCache
}
func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) messageEvents {
blsMsgCache, _ := lru.NewARC(500)
func newMessageEvents(hcAPI headChangeAPI, cs EventAPI) messageEvents {
return messageEvents{
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]MsgMatchFunc),
blockMsgLk: sync.Mutex{},
blockMsgCache: blsMsgCache,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]MsgMatchFunc),
}
}
// Check if there are any new actor calls
func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID][]eventData, error) {
pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
if err != nil {
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
return nil, err
}
func (me *messageEvents) checkNewCalls(ctx context.Context, from, to *types.TipSet) map[triggerID][]eventData {
me.lk.RLock()
defer me.lk.RUnlock()
// For each message in the tipset
res := make(map[triggerID][]eventData)
me.messagesForTs(pts, func(msg *types.Message) {
me.messagesForTs(from, func(msg *types.Message) {
// TODO: provide receipts
// Run each trigger's matcher against the message
@ -516,47 +470,32 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID][]eventD
}
})
return res, nil
return res
}
// Get the messages in a tipset
func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) {
seen := map[cid.Cid]struct{}{}
for _, tsb := range ts.Blocks() {
me.blockMsgLk.Lock()
msgsI, ok := me.blockMsgCache.Get(tsb.Cid())
var err error
if !ok {
msgsI, err = me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
// this is quite bad, but probably better than missing all the other updates
me.blockMsgLk.Unlock()
continue
}
me.blockMsgCache.Add(tsb.Cid(), msgsI)
for i, tsb := range ts.Cids() {
msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb)
if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s",
ts.Height(), tsb, ts.Blocks()[i].Messages, err)
continue
}
me.blockMsgLk.Unlock()
msgs := msgsI.(*api.BlockMessages)
for _, m := range msgs.BlsMessages {
_, ok := seen[m.Cid()]
for i, c := range msgs.Cids {
// We iterate over the CIDs to avoid having to recompute them.
_, ok := seen[c]
if ok {
continue
}
seen[m.Cid()] = struct{}{}
consume(m)
}
for _, m := range msgs.SecpkMessages {
_, ok := seen[m.Message.Cid()]
if ok {
continue
seen[c] = struct{}{}
if i < len(msgs.BlsMessages) {
consume(msgs.BlsMessages[i])
} else {
consume(&msgs.SecpkMessages[i-len(msgs.BlsMessages)].Message)
}
seen[m.Message.Cid()] = struct{}{}
consume(&m.Message)
}
}
}
@ -596,14 +535,14 @@ type MsgMatchFunc func(msg *types.Message) (matched bool, err error)
// * `MsgMatchFunc` is called against each message. If there is a match, the
// message is queued up until the confidence interval has elapsed (and
// `MsgHandler` is called)
func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
func (me *messageEvents) Called(ctx context.Context, check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
hnd := func(ctx context.Context, data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
msg, ok := data.(*types.Message)
if data != nil && !ok {
panic("expected msg")
}
ml, err := me.cs.StateSearchMsg(me.ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
ml, err := me.cs.StateSearchMsg(ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
if err != nil {
return false, err
}
@ -615,7 +554,7 @@ func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHa
return msgHnd(msg, &ml.Receipt, ts, height)
}
id, err := me.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout)
id, err := me.hcAPI.onHeadChanged(ctx, check, hnd, rev, confidence, timeout)
if err != nil {
return err
}
@ -629,5 +568,5 @@ func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHa
// Convenience function for checking and matching messages
func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error {
return me.Called(me.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage()))
return me.Called(ctx, me.CheckMsg(msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage()))
}

View File

@ -11,136 +11,32 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
type heightEvents struct {
lk sync.Mutex
tsc *tipSetCache
gcConfidence abi.ChainEpoch
type heightHandler struct {
ts *types.TipSet
height abi.ChainEpoch
called bool
ctr triggerID
heightTriggers map[triggerID]*heightHandler
htTriggerHeights map[triggerH][]triggerID
htHeights map[msgH][]triggerID
ctx context.Context
handle HeightHandler
revert RevertHandler
}
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
defer span.End()
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))
type heightEvents struct {
api EventAPI
gcConfidence abi.ChainEpoch
e.lk.Lock()
defer e.lk.Unlock()
for _, ts := range rev {
// TODO: log error if h below gcconfidence
// revert height-based triggers
lk sync.Mutex
head *types.TipSet
tsHeights, triggerHeights map[abi.ChainEpoch][]*heightHandler
lastGc abi.ChainEpoch //nolint:structcheck
}
revert := func(h abi.ChainEpoch, ts *types.TipSet) {
for _, tid := range e.htHeights[h] {
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
rev := e.heightTriggers[tid].revert
e.lk.Unlock()
err := rev(ctx, ts)
e.lk.Lock()
e.heightTriggers[tid].called = false
span.End()
if err != nil {
log.Errorf("reverting chain trigger (@H %d): %s", h, err)
}
}
}
revert(ts.Height(), ts)
subh := ts.Height() - 1
for {
cts, err := e.tsc.get(subh)
if err != nil {
return err
}
if cts != nil {
break
}
revert(subh, ts)
subh--
}
if err := e.tsc.revert(ts); err != nil {
return err
}
func newHeightEvents(api EventAPI, gcConfidence abi.ChainEpoch) *heightEvents {
return &heightEvents{
api: api,
gcConfidence: gcConfidence,
tsHeights: map[abi.ChainEpoch][]*heightHandler{},
triggerHeights: map[abi.ChainEpoch][]*heightHandler{},
}
for i := range app {
ts := app[i]
if err := e.tsc.add(ts); err != nil {
return err
}
// height triggers
apply := func(h abi.ChainEpoch, ts *types.TipSet) error {
for _, tid := range e.htTriggerHeights[h] {
hnd := e.heightTriggers[tid]
if hnd.called {
return nil
}
triggerH := h - abi.ChainEpoch(hnd.confidence)
incTs, err := e.tsc.getNonNull(triggerH)
if err != nil {
return err
}
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", false))
handle := hnd.handle
e.lk.Unlock()
err = handle(ctx, incTs, h)
e.lk.Lock()
hnd.called = true
span.End()
if err != nil {
log.Errorf("chain trigger (@H %d, called @ %d) failed: %+v", triggerH, ts.Height(), err)
}
}
return nil
}
if err := apply(ts.Height(), ts); err != nil {
return err
}
subh := ts.Height() - 1
for {
cts, err := e.tsc.get(subh)
if err != nil {
return err
}
if cts != nil {
break
}
if err := apply(subh, ts); err != nil {
return err
}
subh--
}
}
return nil
}
// ChainAt invokes the specified `HeightHandler` when the chain reaches the
@ -148,62 +44,211 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
// specified height, `RevertHandler` will be called.
//
// ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error {
e.lk.Lock() // Tricky locking, check your locks if you modify this function!
best, err := e.tsc.best()
if err != nil {
e.lk.Unlock()
return xerrors.Errorf("error getting best tipset: %w", err)
//
// The context governs cancellations of this call, it won't cancel the event handler.
func (e *heightEvents) ChainAt(ctx context.Context, hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error {
if abi.ChainEpoch(confidence) > e.gcConfidence {
// Need this to be able to GC effectively.
return xerrors.Errorf("confidence cannot be greater than gcConfidence: %d > %d", confidence, e.gcConfidence)
}
bestH := best.Height()
if bestH >= h+abi.ChainEpoch(confidence) {
ts, err := e.tsc.getNonNull(h)
if err != nil {
log.Warnf("events.ChainAt: calling HandleFunc with nil tipset, not found in cache: %s", err)
}
e.lk.Unlock()
ctx, span := trace.StartSpan(e.ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", true))
err = hnd(ctx, ts, bestH)
span.End()
if err != nil {
return err
}
e.lk.Lock()
best, err = e.tsc.best()
if err != nil {
e.lk.Unlock()
return xerrors.Errorf("error getting best tipset: %w", err)
}
bestH = best.Height()
}
defer e.lk.Unlock()
if bestH >= h+abi.ChainEpoch(confidence)+e.gcConfidence {
return nil
}
triggerAt := h + abi.ChainEpoch(confidence)
id := e.ctr
e.ctr++
e.heightTriggers[id] = &heightHandler{
confidence: confidence,
handler := &heightHandler{
height: h,
handle: hnd,
revert: rev,
}
triggerAt := h + abi.ChainEpoch(confidence)
e.htHeights[h] = append(e.htHeights[h], id)
e.htTriggerHeights[triggerAt] = append(e.htTriggerHeights[triggerAt], id)
// Here we try to jump onto a moving train. To avoid stopping the train, we release the lock
// while calling the API and/or the trigger functions. Unfortunately, it's entirely possible
// (although unlikely) to go back and forth across the trigger heights, so we need to keep
// going back and forth here till we're synced.
//
// TODO: Consider using a worker goroutine so we can just drop the handler in a channel? The
// downside is that we'd either need a tipset cache, or we'd need to potentially fetch
// tipsets in-line inside the event loop.
e.lk.Lock()
for {
head := e.head
// If we haven't initialized yet, store the trigger and move on.
if head == nil {
e.triggerHeights[triggerAt] = append(e.triggerHeights[triggerAt], handler)
e.tsHeights[h] = append(e.tsHeights[h], handler)
e.lk.Unlock()
return nil
}
if head.Height() >= h {
// Head is past the handler height. We at least need to stash the tipset to
// avoid doing this from the main event loop.
e.lk.Unlock()
var ts *types.TipSet
if head.Height() == h {
ts = head
} else {
var err error
ts, err = e.api.ChainGetTipSetAfterHeight(ctx, handler.height, head.Key())
if err != nil {
return xerrors.Errorf("events.ChainAt: failed to get tipset: %s", err)
}
}
// If we've applied the handler on the wrong tipset, revert.
if handler.called && !ts.Equals(handler.ts) {
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
span.AddAttributes(trace.BoolAttribute("immediate", true))
err := handler.revert(ctx, handler.ts)
span.End()
if err != nil {
return err
}
handler.called = false
}
// Save the tipset.
handler.ts = ts
// If we've reached confidence and haven't called, call.
if !handler.called && head.Height() >= triggerAt {
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", true))
err := handler.handle(ctx, handler.ts, head.Height())
span.End()
if err != nil {
return err
}
handler.called = true
// If we've reached gcConfidence, return without saving anything.
if head.Height() >= h+e.gcConfidence {
return nil
}
}
e.lk.Lock()
} else if handler.called {
// We're not passed the head (anymore) but have applied the handler. Revert, try again.
e.lk.Unlock()
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
span.AddAttributes(trace.BoolAttribute("immediate", true))
err := handler.revert(ctx, handler.ts)
span.End()
if err != nil {
return err
}
handler.called = false
e.lk.Lock()
} // otherwise, we changed heads but the change didn't matter.
// If we managed to get through this without the head changing, we're finally done.
if head.Equals(e.head) {
e.triggerHeights[triggerAt] = append(e.triggerHeights[triggerAt], handler)
e.tsHeights[h] = append(e.tsHeights[h], handler)
e.lk.Unlock()
return nil
}
}
}
func (e *heightEvents) observer() TipSetObserver {
return (*heightEventsObserver)(e)
}
// Updates the head and garbage collects if we're 2x over our garbage collection confidence period.
func (e *heightEventsObserver) updateHead(h *types.TipSet) {
e.lk.Lock()
defer e.lk.Unlock()
e.head = h
if e.head.Height() < e.lastGc+e.gcConfidence*2 {
return
}
e.lastGc = h.Height()
targetGcHeight := e.head.Height() - e.gcConfidence
for h := range e.tsHeights {
if h >= targetGcHeight {
continue
}
delete(e.tsHeights, h)
}
for h := range e.triggerHeights {
if h >= targetGcHeight {
continue
}
delete(e.triggerHeights, h)
}
}
type heightEventsObserver heightEvents
func (e *heightEventsObserver) Revert(ctx context.Context, from, to *types.TipSet) error {
// Update the head first so we don't accidental skip reverting a concurrent call to ChainAt.
e.updateHead(to)
// Call revert on all hights between the two tipsets, handling empty tipsets.
for h := from.Height(); h > to.Height(); h-- {
e.lk.Lock()
triggers := e.tsHeights[h]
e.lk.Unlock()
// 1. Triggers are only invoked from the global event loop, we don't need to hold the lock while calling.
// 2. We only ever append to or replace the trigger slice, so it's safe to iterate over it without the lock.
for _, handler := range triggers {
handler.ts = nil // invalidate
if !handler.called {
// We haven't triggered this yet, or there has been a concurrent call to ChainAt.
continue
}
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
err := handler.revert(ctx, from)
span.End()
if err != nil {
log.Errorf("reverting chain trigger (@H %d): %s", h, err)
}
handler.called = false
}
}
return nil
}
func (e *heightEventsObserver) Apply(ctx context.Context, from, to *types.TipSet) error {
// Update the head first so we don't accidental skip applying a concurrent call to ChainAt.
e.updateHead(to)
for h := from.Height() + 1; h <= to.Height(); h++ {
e.lk.Lock()
triggers := e.triggerHeights[h]
tipsets := e.tsHeights[h]
e.lk.Unlock()
// Stash the tipset for future triggers.
for _, handler := range tipsets {
handler.ts = to
}
// Trigger the ready triggers.
for _, handler := range triggers {
if handler.called {
// We may have reverted past the trigger point, but not past the call point.
// Or there has been a concurrent call to ChainAt.
continue
}
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", false))
err := handler.handle(ctx, handler.ts, h)
span.End()
if err != nil {
log.Errorf("chain trigger (@H %d, called @ %d) failed: %+v", h, to.Height(), err)
}
handler.called = true
}
}
return nil
}

View File

@ -41,8 +41,6 @@ type fakeCS struct {
msgs map[cid.Cid]fakeMsg
blkMsgs map[cid.Cid]cid.Cid
sync sync.Mutex
tipsets map[types.TipSetKey]*types.TipSet
sub func(rev, app []*types.TipSet)
@ -51,6 +49,20 @@ type fakeCS struct {
callNumber map[string]int
}
func newFakeCS(t *testing.T) *fakeCS {
fcs := &fakeCS{
t: t,
h: 1,
msgs: make(map[cid.Cid]fakeMsg),
blkMsgs: make(map[cid.Cid]cid.Cid),
tipsets: make(map[types.TipSetKey]*types.TipSet),
tsc: newTSCache(nil, 2*build.ForkLengthThreshold),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
return fcs
}
func (fcs *fakeCS) ChainHead(ctx context.Context) (*types.TipSet, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
@ -58,6 +70,13 @@ func (fcs *fakeCS) ChainHead(ctx context.Context) (*types.TipSet, error) {
panic("implement me")
}
func (fcs *fakeCS) ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainGetPath"] = fcs.callNumber["ChainGetPath"] + 1
panic("Not Implemented")
}
func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
@ -85,6 +104,12 @@ func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types
fcs.callNumber["ChainGetTipSetByHeight"] = fcs.callNumber["ChainGetTipSetByHeight"] + 1
panic("Not Implemented")
}
func (fcs *fakeCS) ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainGetTipSetAfterHeight"] = fcs.callNumber["ChainGetTipSetAfterHeight"] + 1
panic("Not Implemented")
}
func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msgcid cid.Cid) *types.TipSet {
a, _ := address.NewFromString("t00")
@ -132,13 +157,13 @@ func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msg
return ts
}
func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) {
func (fcs *fakeCS) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainNotify"] = fcs.callNumber["ChainNotify"] + 1
out := make(chan []*api.HeadChange, 1)
best, err := fcs.tsc.best()
best, err := fcs.tsc.ChainHead(ctx)
if err != nil {
return nil, err
}
@ -160,7 +185,12 @@ func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error
}
}
out <- notif
select {
case out <- notif:
case <-ctx.Done():
// TODO: fail test?
return
}
}
return out, nil
@ -180,7 +210,15 @@ func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api
return &api.BlockMessages{}, nil
}
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil
cids := make([]cid.Cid, len(ms.bmsgs)+len(ms.smsgs))
for i, m := range ms.bmsgs {
cids[i] = m.Cid()
}
for i, m := range ms.smsgs {
cids[i+len(ms.bmsgs)] = m.Cid()
}
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs, Cids: cids}, nil
}
func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
@ -202,6 +240,9 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
fcs.t.Fatal("sub not be nil")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nullm := map[int]struct{}{}
for _, v := range nulls {
nullm[v] = struct{}{}
@ -209,12 +250,14 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
var revs []*types.TipSet
for i := 0; i < rev; i++ {
ts, err := fcs.tsc.best()
fcs.t.Log("revert", fcs.h)
from, err := fcs.tsc.ChainHead(ctx)
require.NoError(fcs.t, err)
if _, ok := nullm[int(ts.Height())]; !ok {
revs = append(revs, ts)
require.NoError(fcs.t, fcs.tsc.revert(ts))
if _, ok := nullm[int(from.Height())]; !ok {
revs = append(revs, from)
require.NoError(fcs.t, fcs.tsc.revert(from))
}
fcs.h--
}
@ -222,6 +265,7 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
var apps []*types.TipSet
for i := 0; i < app; i++ {
fcs.h++
fcs.t.Log("apply", fcs.h)
mc, hasMsgs := msgs[i]
if !hasMsgs {
@ -232,7 +276,7 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
continue
}
best, err := fcs.tsc.best()
best, err := fcs.tsc.ChainHead(ctx)
require.NoError(fcs.t, err)
ts := fcs.makeTs(fcs.t, best.Key().Cids(), fcs.h, mc)
require.NoError(fcs.t, fcs.tsc.add(ts))
@ -244,35 +288,24 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
apps = append(apps, ts)
}
fcs.sync.Lock()
fcs.sub(revs, apps)
fcs.sync.Lock()
fcs.sync.Unlock() //nolint:staticcheck
}
func (fcs *fakeCS) notifDone() {
fcs.sync.Unlock()
// Wait for the last round to finish.
fcs.sub(nil, nil)
fcs.sub(nil, nil)
}
var _ EventAPI = &fakeCS{}
func TestAt(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs := newFakeCS(t)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
err = events.ChainAt(context.Background(), func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
@ -325,20 +358,18 @@ func TestAt(t *testing.T) {
}
func TestAtDoubleTrigger(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events := NewEvents(context.Background(), fcs)
fcs := newFakeCS(t)
events, err := NewEvents(ctx, fcs)
require.NoError(t, err)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
err = events.ChainAt(ctx, func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
@ -368,20 +399,14 @@ func TestAtDoubleTrigger(t *testing.T) {
}
func TestAtNullTrigger(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs := newFakeCS(t)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
err = events.ChainAt(context.Background(), func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, abi.ChainEpoch(6), ts.Height())
require.Equal(t, 8, int(curH))
applied = true
@ -403,20 +428,18 @@ func TestAtNullTrigger(t *testing.T) {
}
func TestAtNullConf(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events := NewEvents(context.Background(), fcs)
fcs := newFakeCS(t)
events, err := NewEvents(ctx, fcs)
require.NoError(t, err)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
err = events.ChainAt(ctx, func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
@ -443,22 +466,17 @@ func TestAtNullConf(t *testing.T) {
}
func TestAtStart(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
fcs := newFakeCS(t)
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
fcs.advance(0, 5, nil) // 6
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
err = events.ChainAt(context.Background(), func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
@ -478,22 +496,17 @@ func TestAtStart(t *testing.T) {
}
func TestAtStartConfidence(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
fcs := newFakeCS(t)
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
fcs.advance(0, 10, nil) // 11
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
err = events.ChainAt(context.Background(), func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 11, int(curH))
applied = true
@ -509,21 +522,16 @@ func TestAtStartConfidence(t *testing.T) {
}
func TestAtChained(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
fcs := newFakeCS(t)
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
err = events.ChainAt(context.Background(), func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return events.ChainAt(context.Background(), func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 10, int(ts.Height()))
applied = true
return nil
@ -544,23 +552,18 @@ func TestAtChained(t *testing.T) {
}
func TestAtChainedConfidence(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
fcs := newFakeCS(t)
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
fcs.advance(0, 15, nil)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
err = events.ChainAt(context.Background(), func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return events.ChainAt(context.Background(), func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 10, int(ts.Height()))
applied = true
return nil
@ -579,22 +582,17 @@ func TestAtChainedConfidence(t *testing.T) {
}
func TestAtChainedConfidenceNull(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
fcs := newFakeCS(t)
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
fcs.advance(0, 15, nil, 5)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
err = events.ChainAt(context.Background(), func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
applied = true
require.Equal(t, 6, int(ts.Height()))
return nil
@ -615,18 +613,10 @@ func matchAddrMethod(to address.Address, m abi.MethodNum) func(msg *types.Messag
}
func TestCalled(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
@ -637,7 +627,7 @@ func TestCalled(t *testing.T) {
var appliedTs *types.TipSet
var appliedH abi.ChainEpoch
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
@ -828,25 +818,17 @@ func TestCalled(t *testing.T) {
}
func TestCalledTimeout(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
called := false
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
called = true
@ -869,20 +851,16 @@ func TestCalledTimeout(t *testing.T) {
// with check func reporting done
fcs = &fakeCS{
t: t,
h: 1,
fcs = newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events, err = NewEvents(context.Background(), fcs)
require.NoError(t, err)
events = NewEvents(context.Background(), fcs)
// XXX: Needed to set the latest head so "check" succeeds". Is that OK? Or do we expect
// check to work _before_ we've received any events.
fcs.advance(0, 1, nil)
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
called = true
@ -904,25 +882,17 @@ func TestCalledTimeout(t *testing.T) {
}
func TestCalledOrder(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
at := 0
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
switch at {
@ -968,18 +938,10 @@ func TestCalledOrder(t *testing.T) {
}
func TestCalledNull(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
@ -987,7 +949,7 @@ func TestCalledNull(t *testing.T) {
more := true
var applied, reverted bool
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
@ -1034,18 +996,10 @@ func TestCalledNull(t *testing.T) {
}
func TestRemoveTriggersOnMessage(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
@ -1053,7 +1007,7 @@ func TestRemoveTriggersOnMessage(t *testing.T) {
more := true
var applied, reverted bool
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
@ -1125,18 +1079,10 @@ type testStateChange struct {
}
func TestStateChanged(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
more := true
var applied, reverted bool
@ -1149,7 +1095,7 @@ func TestStateChanged(t *testing.T) {
confidence := 3
timeout := abi.ChainEpoch(20)
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.StateChanged(func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
@ -1214,18 +1160,10 @@ func TestStateChanged(t *testing.T) {
}
func TestStateChangedRevert(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
more := true
var applied, reverted bool
@ -1234,7 +1172,7 @@ func TestStateChangedRevert(t *testing.T) {
confidence := 1
timeout := abi.ChainEpoch(20)
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.StateChanged(func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
@ -1293,22 +1231,14 @@ func TestStateChangedRevert(t *testing.T) {
}
func TestStateChangedTimeout(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
called := false
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.StateChanged(func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
called = true
@ -1334,20 +1264,15 @@ func TestStateChangedTimeout(t *testing.T) {
// with check func reporting done
fcs = &fakeCS{
t: t,
h: 1,
fcs = newFakeCS(t)
events, err = NewEvents(context.Background(), fcs)
require.NoError(t, err)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
// XXX: Needed to set the latest head so "check" succeeds". Is that OK? Or do we expect
// check to work _before_ we've received any events.
fcs.advance(0, 1, nil)
events = NewEvents(context.Background(), fcs)
err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.StateChanged(func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
called = true
@ -1371,25 +1296,17 @@ func TestStateChangedTimeout(t *testing.T) {
}
func TestCalledMultiplePerEpoch(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
events, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
at := 0
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
switch at {
@ -1431,18 +1348,10 @@ func TestCalledMultiplePerEpoch(t *testing.T) {
}
func TestCachedSameBlock(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
fcs := newFakeCS(t)
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
_ = NewEvents(context.Background(), fcs)
_, err := NewEvents(context.Background(), fcs)
require.NoError(t, err)
fcs.advance(0, 10, map[int]cid.Cid{})
assert.Assert(t, fcs.callNumber["ChainGetBlockMessages"] == 20, "expect call ChainGetBlockMessages %d but got ", 20, fcs.callNumber["ChainGetBlockMessages"])

View File

@ -0,0 +1,42 @@
package events
import (
"context"
"sync"
"github.com/filecoin-project/lotus/api"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
)
type messageCache struct {
api EventAPI
blockMsgLk sync.Mutex
blockMsgCache *lru.ARCCache
}
func newMessageCache(api EventAPI) *messageCache {
blsMsgCache, _ := lru.NewARC(500)
return &messageCache{
api: api,
blockMsgCache: blsMsgCache,
}
}
func (c *messageCache) ChainGetBlockMessages(ctx context.Context, blkCid cid.Cid) (*api.BlockMessages, error) {
c.blockMsgLk.Lock()
defer c.blockMsgLk.Unlock()
msgsI, ok := c.blockMsgCache.Get(blkCid)
var err error
if !ok {
msgsI, err = c.api.ChainGetBlockMessages(ctx, blkCid)
if err != nil {
return nil, err
}
c.blockMsgCache.Add(blkCid, msgsI)
}
return msgsI.(*api.BlockMessages), nil
}

234
chain/events/observer.go Normal file
View File

@ -0,0 +1,234 @@
package events
import (
"context"
"sync"
"time"
"github.com/filecoin-project/go-state-types/abi"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
type observer struct {
api EventAPI
lk sync.Mutex
gcConfidence abi.ChainEpoch
ready chan struct{}
head *types.TipSet
maxHeight abi.ChainEpoch
observers []TipSetObserver
}
func newObserver(api EventAPI, gcConfidence abi.ChainEpoch) *observer {
return &observer{
api: api,
gcConfidence: gcConfidence,
ready: make(chan struct{}),
observers: []TipSetObserver{},
}
}
func (o *observer) start(ctx context.Context) error {
go o.listenHeadChanges(ctx)
// Wait for the first tipset to be seen or bail if shutting down
select {
case <-o.ready:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (o *observer) listenHeadChanges(ctx context.Context) {
for {
if err := o.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
select {
case <-build.Clock.After(time.Second):
case <-ctx.Done():
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
log.Info("restarting listenHeadChanges")
}
}
func (o *observer) listenHeadChangesOnce(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := o.api.ChainNotify(ctx)
if err != nil {
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}
var cur []*api.HeadChange
var ok bool
// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}
if len(cur) != 1 {
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
}
if cur[0].Type != store.HCCurrent {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}
head := cur[0].Val
if o.head == nil {
o.head = head
close(o.ready)
} else if !o.head.Equals(head) {
changes, err := o.api.ChainGetPath(ctx, o.head.Key(), head.Key())
if err != nil {
return xerrors.Errorf("failed to get path from last applied tipset to head: %w", err)
}
if err := o.applyChanges(ctx, changes); err != nil {
return xerrors.Errorf("failed to apply head changes: %w", err)
}
}
for changes := range notifs {
if err := o.applyChanges(ctx, changes); err != nil {
return err
}
}
return nil
}
func (o *observer) applyChanges(ctx context.Context, changes []*api.HeadChange) error {
// Used to wait for a prior notification round to finish (by tests)
if len(changes) == 0 {
return nil
}
var rev, app []*types.TipSet
for _, changes := range changes {
switch changes.Type {
case store.HCRevert:
rev = append(rev, changes.Val)
case store.HCApply:
app = append(app, changes.Val)
default:
log.Errorf("unexpected head change notification type: '%s'", changes.Type)
}
}
if err := o.headChange(ctx, rev, app); err != nil {
return xerrors.Errorf("failed to apply head changes: %w", err)
}
return nil
}
func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "events.HeadChange")
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))
defer func() {
span.AddAttributes(trace.Int64Attribute("endHeight", int64(o.head.Height())))
span.End()
}()
// NOTE: bailing out here if the head isn't what we expected is fine. We'll re-start the
// entire process and handle any strange reorgs.
for i, from := range rev {
if !from.Equals(o.head) {
return xerrors.Errorf(
"expected to revert %s (%d), reverting %s (%d)",
o.head.Key(), o.head.Height(), from.Key(), from.Height(),
)
}
var to *types.TipSet
if i+1 < len(rev) {
// If we have more reverts, the next revert is the next head.
to = rev[i+1]
} else {
// At the end of the revert sequenece, we need to looup the joint tipset
// between the revert sequence and the apply sequence.
var err error
to, err = o.api.ChainGetTipSet(ctx, from.Parents())
if err != nil {
// Well, this sucks. We'll bail and restart.
return xerrors.Errorf("failed to get tipset when reverting due to a SetHeead: %w", err)
}
}
// Get the observers late in case an observer registers/unregisters itself.
o.lk.Lock()
observers := o.observers
o.lk.Unlock()
for _, obs := range observers {
if err := obs.Revert(ctx, from, to); err != nil {
log.Errorf("observer %T failed to apply tipset %s (%d) with: %s", obs, from.Key(), from.Height(), err)
}
}
if to.Height() < o.maxHeight-o.gcConfidence {
log.Errorf("reverted past finality, from %d to %d", o.maxHeight, to.Height())
}
o.head = to
}
for _, to := range app {
if to.Parents() != o.head.Key() {
return xerrors.Errorf(
"cannot apply %s (%d) with parents %s on top of %s (%d)",
to.Key(), to.Height(), to.Parents(), o.head.Key(), o.head.Height(),
)
}
// Get the observers late in case an observer registers/unregisters itself.
o.lk.Lock()
observers := o.observers
o.lk.Unlock()
for _, obs := range observers {
if err := obs.Apply(ctx, o.head, to); err != nil {
log.Errorf("observer %T failed to revert tipset %s (%d) with: %s", obs, to.Key(), to.Height(), err)
}
}
o.head = to
if to.Height() > o.maxHeight {
o.maxHeight = to.Height()
}
}
return nil
}
// TODO: add a confidence level so we can have observers with difference levels of confidence
func (o *observer) Observe(obs TipSetObserver) {
o.lk.Lock()
defer o.lk.Unlock()
o.observers = append(o.observers, obs)
}

View File

@ -11,7 +11,9 @@ import (
)
type tsCacheAPI interface {
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
ChainHead(context.Context) (*types.TipSet, error)
}
@ -20,68 +22,157 @@ type tsCacheAPI interface {
type tipSetCache struct {
mu sync.RWMutex
cache []*types.TipSet
start int // chain head (end)
len int
byKey map[types.TipSetKey]*types.TipSet
byHeight []*types.TipSet
start int // chain head (end)
len int
storage tsCacheAPI
}
func newTSCache(cap abi.ChainEpoch, storage tsCacheAPI) *tipSetCache {
func newTSCache(storage tsCacheAPI, cap abi.ChainEpoch) *tipSetCache {
return &tipSetCache{
cache: make([]*types.TipSet, cap),
start: 0,
len: 0,
byKey: make(map[types.TipSetKey]*types.TipSet, cap),
byHeight: make([]*types.TipSet, cap),
start: 0,
len: 0,
storage: storage,
}
}
func (tsc *tipSetCache) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
if ts, ok := tsc.byKey[tsk]; ok {
return ts, nil
}
return tsc.storage.ChainGetTipSet(ctx, tsk)
}
func (tsc *tipSetCache) add(ts *types.TipSet) error {
func (tsc *tipSetCache) ChainGetTipSetByHeight(ctx context.Context, height abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
return tsc.get(ctx, height, tsk, true)
}
func (tsc *tipSetCache) ChainGetTipSetAfterHeight(ctx context.Context, height abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
return tsc.get(ctx, height, tsk, false)
}
func (tsc *tipSetCache) get(ctx context.Context, height abi.ChainEpoch, tsk types.TipSetKey, prev bool) (*types.TipSet, error) {
fallback := tsc.storage.ChainGetTipSetAfterHeight
if prev {
fallback = tsc.storage.ChainGetTipSetByHeight
}
tsc.mu.RLock()
// Nothing in the cache?
if tsc.len == 0 {
tsc.mu.RUnlock()
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
return fallback(ctx, height, tsk)
}
// Resolve the head.
head := tsc.byHeight[tsc.start]
if !tsk.IsEmpty() {
// Not on this chain?
var ok bool
head, ok = tsc.byKey[tsk]
if !ok {
tsc.mu.RUnlock()
return fallback(ctx, height, tsk)
}
}
headH := head.Height()
tailH := headH - abi.ChainEpoch(tsc.len)
if headH == height {
tsc.mu.RUnlock()
return head, nil
} else if headH < height {
tsc.mu.RUnlock()
// If the user doesn't pass a tsk, we assume "head" is the last tipset we processed.
return nil, xerrors.Errorf("requested epoch is in the future")
} else if height < tailH {
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tailH)
tsc.mu.RUnlock()
return fallback(ctx, height, head.Key())
}
direction := 1
if prev {
direction = -1
}
var ts *types.TipSet
for i := 0; i < tsc.len && ts == nil; i += direction {
ts = tsc.byHeight[normalModulo(tsc.start-int(headH-height)+i, len(tsc.byHeight))]
}
tsc.mu.RUnlock()
return ts, nil
}
func (tsc *tipSetCache) ChainHead(ctx context.Context) (*types.TipSet, error) {
tsc.mu.RLock()
best := tsc.byHeight[tsc.start]
tsc.mu.RUnlock()
if best == nil {
return tsc.storage.ChainHead(ctx)
}
return best, nil
}
func (tsc *tipSetCache) add(to *types.TipSet) error {
tsc.mu.Lock()
defer tsc.mu.Unlock()
if tsc.len > 0 {
best := tsc.cache[tsc.start]
if best.Height() >= ts.Height() {
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be at least %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
best := tsc.byHeight[tsc.start]
if best.Height() >= to.Height() {
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be at least %d, was %d", tsc.byHeight[tsc.start].Height()+1, to.Height())
}
if best.Key() != ts.Parents() {
if best.Key() != to.Parents() {
return xerrors.Errorf(
"tipSetCache.add: expected new tipset %s (%d) to follow %s (%d), its parents are %s",
ts.Key(), ts.Height(), best.Key(), best.Height(), best.Parents(),
to.Key(), to.Height(), best.Key(), best.Height(), best.Parents(),
)
}
}
nextH := ts.Height()
nextH := to.Height()
if tsc.len > 0 {
nextH = tsc.cache[tsc.start].Height() + 1
nextH = tsc.byHeight[tsc.start].Height() + 1
}
// fill null blocks
for nextH != ts.Height() {
tsc.start = normalModulo(tsc.start+1, len(tsc.cache))
tsc.cache[tsc.start] = nil
if tsc.len < len(tsc.cache) {
for nextH != to.Height() {
tsc.start = normalModulo(tsc.start+1, len(tsc.byHeight))
was := tsc.byHeight[tsc.start]
if was != nil {
tsc.byHeight[tsc.start] = nil
delete(tsc.byKey, was.Key())
}
if tsc.len < len(tsc.byHeight) {
tsc.len++
}
nextH++
}
tsc.start = normalModulo(tsc.start+1, len(tsc.cache))
tsc.cache[tsc.start] = ts
if tsc.len < len(tsc.cache) {
tsc.start = normalModulo(tsc.start+1, len(tsc.byHeight))
was := tsc.byHeight[tsc.start]
if was != nil {
delete(tsc.byKey, was.Key())
}
tsc.byHeight[tsc.start] = to
if tsc.len < len(tsc.byHeight) {
tsc.len++
}
tsc.byKey[to.Key()] = to
return nil
}
func (tsc *tipSetCache) revert(ts *types.TipSet) error {
func (tsc *tipSetCache) revert(from *types.TipSet) error {
tsc.mu.Lock()
defer tsc.mu.Unlock()
return tsc.revertUnlocked(ts)
return tsc.revertUnlocked(from)
}
func (tsc *tipSetCache) revertUnlocked(ts *types.TipSet) error {
@ -89,75 +180,35 @@ func (tsc *tipSetCache) revertUnlocked(ts *types.TipSet) error {
return nil // this can happen, and it's fine
}
if !tsc.cache[tsc.start].Equals(ts) {
was := tsc.byHeight[tsc.start]
if !was.Equals(ts) {
return xerrors.New("tipSetCache.revert: revert tipset didn't match cache head")
}
delete(tsc.byKey, was.Key())
tsc.cache[tsc.start] = nil
tsc.start = normalModulo(tsc.start-1, len(tsc.cache))
tsc.byHeight[tsc.start] = nil
tsc.start = normalModulo(tsc.start-1, len(tsc.byHeight))
tsc.len--
_ = tsc.revertUnlocked(nil) // revert null block gap
return nil
}
func (tsc *tipSetCache) getNonNull(height abi.ChainEpoch) (*types.TipSet, error) {
for {
ts, err := tsc.get(height)
if err != nil {
return nil, err
}
if ts != nil {
return ts, nil
}
height++
}
func (tsc *tipSetCache) observer() TipSetObserver {
return (*tipSetCacheObserver)(tsc)
}
func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
tsc.mu.RLock()
type tipSetCacheObserver tipSetCache
if tsc.len == 0 {
tsc.mu.RUnlock()
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, types.EmptyTSK)
}
var _ TipSetObserver = new(tipSetCacheObserver)
headH := tsc.cache[tsc.start].Height()
if height > headH {
tsc.mu.RUnlock()
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH)
}
clen := len(tsc.cache)
var tail *types.TipSet
for i := 1; i <= tsc.len; i++ {
tail = tsc.cache[normalModulo(tsc.start-tsc.len+i, clen)]
if tail != nil {
break
}
}
if height < tail.Height() {
tsc.mu.RUnlock()
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, tail.Key())
}
ts := tsc.cache[normalModulo(tsc.start-int(headH-height), clen)]
tsc.mu.RUnlock()
return ts, nil
func (tsc *tipSetCacheObserver) Apply(_ context.Context, _, to *types.TipSet) error {
return (*tipSetCache)(tsc).add(to)
}
func (tsc *tipSetCache) best() (*types.TipSet, error) {
tsc.mu.RLock()
best := tsc.cache[tsc.start]
tsc.mu.RUnlock()
if best == nil {
return tsc.storage.ChainHead(context.TODO())
}
return best, nil
func (tsc *tipSetCacheObserver) Revert(ctx context.Context, from, _ *types.TipSet) error {
return (*tipSetCache)(tsc).revert(from)
}
func normalModulo(n, m int) int {

View File

@ -17,6 +17,11 @@ type tsCacheAPIFailOnStorageCall struct {
t *testing.T
}
func (tc *tsCacheAPIFailOnStorageCall) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIFailOnStorageCall) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
@ -25,6 +30,10 @@ func (tc *tsCacheAPIFailOnStorageCall) ChainHead(ctx context.Context) (*types.Ti
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIFailOnStorageCall) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
}
type cacheHarness struct {
t *testing.T
@ -40,7 +49,7 @@ func newCacheharness(t *testing.T) *cacheHarness {
h := &cacheHarness{
t: t,
tsc: newTSCache(50, &tsCacheAPIFailOnStorageCall{t: t}),
tsc: newTSCache(&tsCacheAPIFailOnStorageCall{t: t}, 50),
height: 75,
miner: a,
}
@ -65,13 +74,13 @@ func (h *cacheHarness) addWithParents(parents []cid.Cid) {
}
func (h *cacheHarness) add() {
last, err := h.tsc.best()
last, err := h.tsc.ChainHead(context.Background())
require.NoError(h.t, err)
h.addWithParents(last.Cids())
}
func (h *cacheHarness) revert() {
best, err := h.tsc.best()
best, err := h.tsc.ChainHead(context.Background())
require.NoError(h.t, err)
err = h.tsc.revert(best)
require.NoError(h.t, err)
@ -95,6 +104,7 @@ func TestTsCache(t *testing.T) {
}
func TestTsCacheNulls(t *testing.T) {
ctx := context.Background()
h := newCacheharness(t)
h.add()
@ -105,66 +115,77 @@ func TestTsCacheNulls(t *testing.T) {
h.add()
h.add()
best, err := h.tsc.best()
best, err := h.tsc.ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, h.height-1, best.Height())
ts, err := h.tsc.get(h.height - 1)
ts, err := h.tsc.ChainGetTipSetByHeight(ctx, h.height-1, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, h.height-1, ts.Height())
ts, err = h.tsc.get(h.height - 2)
ts, err = h.tsc.ChainGetTipSetByHeight(ctx, h.height-2, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, h.height-2, ts.Height())
ts, err = h.tsc.get(h.height - 3)
require.NoError(t, err)
require.Nil(t, ts)
ts, err = h.tsc.get(h.height - 8)
// Should skip the nulls and walk back to the last tipset.
ts, err = h.tsc.ChainGetTipSetByHeight(ctx, h.height-3, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, h.height-8, ts.Height())
best, err = h.tsc.best()
ts, err = h.tsc.ChainGetTipSetByHeight(ctx, h.height-8, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, h.height-8, ts.Height())
best, err = h.tsc.ChainHead(ctx)
require.NoError(t, err)
require.NoError(t, h.tsc.revert(best))
best, err = h.tsc.best()
best, err = h.tsc.ChainHead(ctx)
require.NoError(t, err)
require.NoError(t, h.tsc.revert(best))
best, err = h.tsc.best()
best, err = h.tsc.ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, h.height-8, best.Height())
h.skip(50)
h.add()
ts, err = h.tsc.get(h.height - 1)
ts, err = h.tsc.ChainGetTipSetByHeight(ctx, h.height-1, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, h.height-1, ts.Height())
}
type tsCacheAPIStorageCallCounter struct {
t *testing.T
chainGetTipSetByHeight int
chainHead int
t *testing.T
chainGetTipSetByHeight int
chainGetTipSetAfterHeight int
chainGetTipSet int
chainHead int
}
func (tc *tsCacheAPIStorageCallCounter) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.chainGetTipSetByHeight++
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIStorageCallCounter) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.chainGetTipSetAfterHeight++
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIStorageCallCounter) ChainHead(ctx context.Context) (*types.TipSet, error) {
tc.chainHead++
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIStorageCallCounter) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
tc.chainGetTipSet++
return &types.TipSet{}, nil
}
func TestTsCacheEmpty(t *testing.T) {
// Calling best on an empty cache should just call out to the chain API
callCounter := &tsCacheAPIStorageCallCounter{t: t}
tsc := newTSCache(50, callCounter)
_, err := tsc.best()
tsc := newTSCache(callCounter, 50)
_, err := tsc.ChainHead(context.Background())
require.NoError(t, err)
require.Equal(t, 1, callCounter.chainHead)
}

View File

@ -10,10 +10,10 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd MsgHandler) CheckFunc {
func (me *messageEvents) CheckMsg(smsg types.ChainMsg, hnd MsgHandler) CheckFunc {
msg := smsg.VMMessage()
return func(ts *types.TipSet) (done bool, more bool, err error) {
return func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
fa, err := me.cs.StateGetActor(ctx, msg.From, ts.Key())
if err != nil {
return false, true, err
@ -24,7 +24,7 @@ func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd
return false, true, nil
}
ml, err := me.cs.StateSearchMsg(me.ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
ml, err := me.cs.StateSearchMsg(ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
if err != nil {
return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err)
}

View File

@ -103,10 +103,11 @@ func TestPaymentChannelsAPI(t *testing.T) {
creatorStore := adt.WrapStore(ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(paymentCreator)))
// wait for the receiver to submit their vouchers
ev := events.NewEvents(ctx, paymentCreator)
ev, err := events.NewEvents(ctx, paymentCreator)
require.NoError(t, err)
preds := state.NewStatePredicates(paymentCreator)
finished := make(chan struct{})
err = ev.StateChanged(func(ts *types.TipSet) (done bool, more bool, err error) {
err = ev.StateChanged(func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
act, err := paymentCreator.StateGetActor(ctx, channel, ts.Key())
if err != nil {
return false, false, err

View File

@ -381,8 +381,9 @@ func checkVoucherOutput(t *testing.T, list string, vouchers []voucherSpec) {
// waitForHeight waits for the node to reach the given chain epoch
func waitForHeight(ctx context.Context, t *testing.T, node kit.TestFullNode, height abi.ChainEpoch) {
atHeight := make(chan struct{})
chainEvents := events.NewEvents(ctx, node)
err := chainEvents.ChainAt(func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
chainEvents, err := events.NewEvents(ctx, node)
require.NoError(t, err)
err = chainEvents.ChainAt(ctx, func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
close(atHeight)
return nil
}, nil, 1, height)

View File

@ -50,11 +50,14 @@ type clientApi struct {
full.MpoolAPI
}
func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) (storagemarket.StorageClientNode, error) {
capi := &clientApi{chain, stateapi, mpool}
ctx := helpers.LifecycleCtx(mctx, lc)
ev := events.NewEvents(ctx, capi)
ev, err := events.NewEvents(ctx, capi)
if err != nil {
return nil, err
}
a := &ClientNodeAdapter{
clientApi: capi,
@ -63,7 +66,7 @@ func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi ful
dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi))),
}
a.scMgr = NewSectorCommittedManager(ev, a, &apiWrapper{api: capi})
return a
return a, nil
}
func (c *ClientNodeAdapter) ListStorageProviders(ctx context.Context, encodedTs shared.TipSetToken) ([]*storagemarket.StorageProviderInfo, error) {
@ -262,7 +265,7 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
}
// Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
checkFunc := func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
if ts == nil {
// keep listening for events
return false, true, nil

View File

@ -22,7 +22,7 @@ import (
)
type eventsCalledAPI interface {
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
Called(ctx context.Context, check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
}
type dealInfoAPI interface {
@ -64,7 +64,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
}
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
checkFunc := func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
dealInfo, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
@ -165,7 +165,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
return nil
}
if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
if err := mgr.ev.Called(ctx, checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}
@ -182,7 +182,7 @@ func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, pr
}
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
checkFunc := func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
_, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
@ -257,7 +257,7 @@ func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, pr
return nil
}
if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
if err := mgr.ev.Called(ctx, checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}

View File

@ -477,13 +477,13 @@ type fakeEvents struct {
DealStartEpochTimeout bool
}
func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error {
func (fe *fakeEvents) Called(ctx context.Context, check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error {
if fe.DealStartEpochTimeout {
msgHnd(nil, nil, nil, 100) // nolint:errcheck
return nil
}
_, more, err := check(fe.CheckTs)
_, more, err := check(ctx, fe.CheckTs)
if err != nil {
return err
}
@ -506,7 +506,7 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r
return nil
}
if matchMessage.doesRevert {
err := rev(fe.Ctx, matchMessage.ts)
err := rev(ctx, matchMessage.ts)
if err != nil {
return err
}

View File

@ -55,11 +55,14 @@ type ProviderNodeAdapter struct {
scMgr *SectorCommittedManager
}
func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) (storagemarket.StorageProviderNode, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) (storagemarket.StorageProviderNode, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
ev := events.NewEvents(ctx, full)
ev, err := events.NewEvents(ctx, full)
if err != nil {
return nil, err
}
na := &ProviderNodeAdapter{
FullNode: full,
@ -77,7 +80,7 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConf
}
na.scMgr = NewSectorCommittedManager(ev, na, &apiWrapper{api: full})
return na
return na, nil
}
}
@ -340,7 +343,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
}
// Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
checkFunc := func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
if ts == nil {
// keep listening for events
return false, true, nil

View File

@ -56,8 +56,11 @@ func SettlePaymentChannels(mctx helpers.MetricsCtx, lc fx.Lifecycle, papi API) e
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
pcs := newPaymentChannelSettler(ctx, &papi)
ev := events.NewEvents(ctx, papi)
return ev.Called(pcs.check, pcs.messageHandler, pcs.revertHandler, int(build.MessageConfidence+1), events.NoTimeout, pcs.matcher)
ev, err := events.NewEvents(ctx, &papi)
if err != nil {
return err
}
return ev.Called(ctx, pcs.check, pcs.messageHandler, pcs.revertHandler, int(build.MessageConfidence+1), events.NoTimeout, pcs.matcher)
},
})
return nil
@ -70,7 +73,7 @@ func newPaymentChannelSettler(ctx context.Context, api settlerAPI) *paymentChann
}
}
func (pcs *paymentChannelSettler) check(ts *types.TipSet) (done bool, more bool, err error) {
func (pcs *paymentChannelSettler) check(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
return false, true, nil
}

View File

@ -21,7 +21,7 @@ func NewEventsAdapter(api *events.Events) EventsAdapter {
}
func (e EventsAdapter) ChainAt(hnd sealing.HeightHandler, rev sealing.RevertHandler, confidence int, h abi.ChainEpoch) error {
return e.delegate.ChainAt(func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return e.delegate.ChainAt(context.TODO(), func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return hnd(ctx, ts.Key().Bytes(), curH)
}, func(ctx context.Context, ts *types.TipSet) error {
return rev(ctx, ts.Key().Bytes())

View File

@ -116,6 +116,7 @@ type fullNodeFilteredAPI interface {
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
ChainHasObj(context.Context, cid.Cid) (bool, error)
ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error)
@ -167,28 +168,29 @@ func (m *Miner) Run(ctx context.Context) error {
return xerrors.Errorf("getting miner info: %w", err)
}
var (
// consumer of chain head changes.
evts = events.NewEvents(ctx, m.api)
evtsAdapter = NewEventsAdapter(evts)
// consumer of chain head changes.
evts, err := events.NewEvents(ctx, m.api)
if err != nil {
return xerrors.Errorf("failed to subscribe to events: %w", err)
}
evtsAdapter := NewEventsAdapter(evts)
// Create a shim to glue the API required by the sealing component
// with the API that Lotus is capable of providing.
// The shim translates between "tipset tokens" and tipset keys, and
// provides extra methods.
adaptedAPI = NewSealingAPIAdapter(m.api)
// Create a shim to glue the API required by the sealing component
// with the API that Lotus is capable of providing.
// The shim translates between "tipset tokens" and tipset keys, and
// provides extra methods.
adaptedAPI := NewSealingAPIAdapter(m.api)
// Instantiate a precommit policy.
cfg = sealing.GetSealingConfigFunc(m.getSealConfig)
provingBuffer = md.WPoStProvingPeriod * 2
// Instantiate a precommit policy.
cfg := sealing.GetSealingConfigFunc(m.getSealConfig)
provingBuffer := md.WPoStProvingPeriod * 2
pcp = sealing.NewBasicPreCommitPolicy(adaptedAPI, cfg, provingBuffer)
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, cfg, provingBuffer)
// address selector.
as = func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}
)
// address selector.
as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}
// Instantiate the sealing FSM.
m.sealing = sealing.New(ctx, adaptedAPI, m.feeCfg, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as)