Merge pull request #7000 from filecoin-project/feat/refactor-events

Refactor events subsystem
This commit is contained in:
Łukasz Magiera 2021-08-31 12:02:22 +02:00 committed by GitHub
commit b0f57d74e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 1561 additions and 1224 deletions

View File

@ -33,6 +33,7 @@ type Gateway interface {
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*HeadChange, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)

View File

@ -480,6 +480,8 @@ type GatewayStruct struct {
ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) ``
ChainGetPath func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey) ([]*HeadChange, error) ``
ChainGetTipSet func(p0 context.Context, p1 types.TipSetKey) (*types.TipSet, error) ``
ChainGetTipSetAfterHeight func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) ``
@ -3039,6 +3041,17 @@ func (s *GatewayStub) ChainGetMessage(p0 context.Context, p1 cid.Cid) (*types.Me
return nil, ErrNotSupported
}
func (s *GatewayStruct) ChainGetPath(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey) ([]*HeadChange, error) {
if s.Internal.ChainGetPath == nil {
return *new([]*HeadChange), ErrNotSupported
}
return s.Internal.ChainGetPath(p0, p1, p2)
}
func (s *GatewayStub) ChainGetPath(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey) ([]*HeadChange, error) {
return *new([]*HeadChange), ErrNotSupported
}
func (s *GatewayStruct) ChainGetTipSet(p0 context.Context, p1 types.TipSetKey) (*types.TipSet, error) {
if s.Internal.ChainGetTipSet == nil {
return nil, ErrNotSupported

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,3 +1,4 @@
//go:build debug || 2k
// +build debug 2k
package build

View File

@ -1,3 +1,4 @@
//go:build butterflynet
// +build butterflynet
package build

View File

@ -1,3 +1,4 @@
//go:build calibnet
// +build calibnet
package build

View File

@ -1,3 +1,4 @@
//go:build debug
// +build debug
package build

View File

@ -1,3 +1,4 @@
//go:build interopnet
// +build interopnet
package build

View File

@ -1,10 +1,5 @@
// +build !debug
// +build !2k
// +build !testground
// +build !calibnet
// +build !nerpanet
// +build !butterflynet
// +build !interopnet
//go:build !debug && !2k && !testground && !calibnet && !nerpanet && !butterflynet && !interopnet
// +build !debug,!2k,!testground,!calibnet,!nerpanet,!butterflynet,!interopnet
package build

View File

@ -1,3 +1,4 @@
//go:build nerpanet
// +build nerpanet
package build

View File

@ -1,3 +1,4 @@
//go:build !testground
// +build !testground
package build

View File

@ -1,3 +1,4 @@
//go:build testground
// +build testground
// This file makes hardcoded parameters (const) configurable as vars.

View File

@ -1,4 +1,5 @@
//+build tools
//go:build tools
// +build tools
package build

33
chain/events/cache.go Normal file
View File

@ -0,0 +1,33 @@
package events
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
type uncachedAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}
type cache struct {
*tipSetCache
*messageCache
uncachedAPI
}
func newCache(api EventAPI, gcConfidence abi.ChainEpoch) *cache {
return &cache{
newTSCache(api, gcConfidence),
newMessageCache(api),
api,
}
}

View File

@ -2,18 +2,14 @@ package events
import (
"context"
"sync"
"time"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -25,209 +21,46 @@ type (
RevertHandler func(ctx context.Context, ts *types.TipSet) error
)
type heightHandler struct {
confidence int
called bool
handle HeightHandler
revert RevertHandler
// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, from, to *types.TipSet) error
Revert(ctx context.Context, from, to *types.TipSet) error
}
type EventAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainHead(context.Context) (*types.TipSet, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}
type Events struct {
api EventAPI
tsc *tipSetCache
lk sync.Mutex
ready chan struct{}
readyOnce sync.Once
heightEvents
*observer
*heightEvents
*hcEvents
observers []TipSetObserver
}
func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)
func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) (*Events, error) {
cache := newCache(api, gcConfidence)
e := &Events{
api: api,
tsc: tsc,
heightEvents: heightEvents{
tsc: tsc,
ctx: ctx,
gcConfidence: gcConfidence,
heightTriggers: map[uint64]*heightHandler{},
htTriggerHeights: map[abi.ChainEpoch][]uint64{},
htHeights: map[abi.ChainEpoch][]uint64{},
},
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
ob := newObserver(cache, gcConfidence)
if err := ob.start(ctx); err != nil {
return nil, err
}
go e.listenHeadChanges(ctx)
he := newHeightEvents(cache, ob, gcConfidence)
headChange := newHCEvents(cache, ob)
// Wait for the first tipset to be seen or bail if shutting down
select {
case <-e.ready:
case <-ctx.Done():
}
return e
return &Events{ob, he, headChange}, nil
}
func NewEvents(ctx context.Context, api EventAPI) *Events {
func NewEvents(ctx context.Context, api EventAPI) (*Events, error) {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, gcConfidence)
}
func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
select {
case <-build.Clock.After(time.Second):
case <-ctx.Done():
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
log.Info("restarting listenHeadChanges")
}
}
func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}
var cur []*api.HeadChange
var ok bool
// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}
if len(cur) != 1 {
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
}
if cur[0].Type != store.HCCurrent {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}
if err := e.tsc.add(cur[0].Val); err != nil {
log.Warnf("tsc.add: adding current tipset failed: %v", err)
}
e.readyOnce.Do(func() {
e.lastTs = cur[0].Val
// Signal that we have seen first tipset
close(e.ready)
})
for notif := range notifs {
var rev, app []*types.TipSet
for _, notif := range notif {
switch notif.Type {
case store.HCRevert:
rev = append(rev, notif.Val)
case store.HCApply:
app = append(app, notif.Val)
default:
log.Warnf("unexpected head change notification type: '%s'", notif.Type)
}
}
if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}
// sync with fake chainstore (for tests)
if fcs, ok := e.api.(interface{ notifDone() }); ok {
fcs.notifDone()
}
}
return nil
}
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}
e.lk.Lock()
defer e.lk.Unlock()
if err := e.headChangeAt(rev, app); err != nil {
return err
}
if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}
return e.processHeadChangeEvent(rev, app)
}
// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}
// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}
// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}
for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}
return nil
}

View File

@ -5,9 +5,6 @@ import (
"math"
"sync"
"github.com/filecoin-project/lotus/api"
lru "github.com/hashicorp/golang-lru"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/go-state-types/abi"
@ -35,7 +32,7 @@ type eventData interface{}
// `prevTs` is the previous tipset, eg the "from" tipset for a state change.
// `ts` is the event tipset, eg the tipset in which the `msg` is included.
// `curH`-`ts.Height` = `confidence`
type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
type EventHandler func(ctx context.Context, data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
// wait for has already happened in tipset `ts`
@ -43,7 +40,7 @@ type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainE
// If `done` is true, timeout won't be triggered
// If `more` is false, no messages will be sent to EventHandler (RevertHandler
// may still be called)
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
type CheckFunc func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error)
// Keep track of information for an event handler
type handlerInfo struct {
@ -60,10 +57,9 @@ type handlerInfo struct {
// until the required confidence is reached
type queuedEvent struct {
trigger triggerID
data eventData
prevH abi.ChainEpoch
h abi.ChainEpoch
data eventData
prevTipset, tipset *types.TipSet
called bool
}
@ -71,19 +67,17 @@ type queuedEvent struct {
// Manages chain head change events, which may be forward (new tipset added to
// chain) or backward (chain branch discarded in favour of heavier branch)
type hcEvents struct {
cs EventAPI
tsc *tipSetCache
ctx context.Context
gcConfidence uint64
cs EventAPI
lk sync.Mutex
lastTs *types.TipSet
lk sync.Mutex
ctr triggerID
// TODO: get rid of trigger IDs and just use pointers as keys.
triggers map[triggerID]*handlerInfo
// TODO: instead of scheduling events in the future, look at the chain in the past. We can sip the "confidence" queue entirely.
// maps block heights to events
// [triggerH][msgH][event]
confQueue map[triggerH]map[msgH][]*queuedEvent
@ -98,83 +92,77 @@ type hcEvents struct {
watcherEvents
}
func newHCEvents(ctx context.Context, cs EventAPI, tsc *tipSetCache, gcConfidence uint64) *hcEvents {
e := hcEvents{
ctx: ctx,
cs: cs,
tsc: tsc,
gcConfidence: gcConfidence,
func newHCEvents(api EventAPI, obs *observer) *hcEvents {
e := &hcEvents{
cs: api,
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
revertQueue: map[msgH][]triggerH{},
triggers: map[triggerID]*handlerInfo{},
timeouts: map[abi.ChainEpoch]map[triggerID]int{},
}
e.messageEvents = newMessageEvents(ctx, &e, cs)
e.watcherEvents = newWatcherEvents(ctx, &e, cs)
e.messageEvents = newMessageEvents(e, api)
e.watcherEvents = newWatcherEvents(e, api)
return &e
// We need to take the lock as the observer could immediately try calling us.
e.lk.Lock()
e.lastTs = obs.Observe((*hcEventsObserver)(e))
e.lk.Unlock()
return e
}
// Called when there is a change to the head with tipsets to be
// reverted / applied
func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
type hcEventsObserver hcEvents
func (e *hcEventsObserver) Apply(ctx context.Context, from, to *types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
for _, ts := range rev {
e.handleReverts(ts)
e.lastTs = ts
defer func() { e.lastTs = to }()
// Check if the head change caused any state changes that we were
// waiting for
stateChanges := e.checkStateChanges(from, to)
// Queue up calls until there have been enough blocks to reach
// confidence on the state changes
for tid, data := range stateChanges {
e.queueForConfidence(tid, data, from, to)
}
for _, ts := range app {
// Check if the head change caused any state changes that we were
// waiting for
stateChanges := e.watcherEvents.checkStateChanges(e.lastTs, ts)
// Check if the head change included any new message calls
newCalls := e.checkNewCalls(ctx, from, to)
// Queue up calls until there have been enough blocks to reach
// confidence on the state changes
for tid, data := range stateChanges {
e.queueForConfidence(tid, data, e.lastTs, ts)
// Queue up calls until there have been enough blocks to reach
// confidence on the message calls
for tid, calls := range newCalls {
for _, data := range calls {
e.queueForConfidence(tid, data, nil, to)
}
// Check if the head change included any new message calls
newCalls, err := e.messageEvents.checkNewCalls(ts)
if err != nil {
return err
}
// Queue up calls until there have been enough blocks to reach
// confidence on the message calls
for tid, calls := range newCalls {
for _, data := range calls {
e.queueForConfidence(tid, data, nil, ts)
}
}
for at := e.lastTs.Height(); at <= ts.Height(); at++ {
// Apply any queued events and timeouts that were targeted at the
// current chain height
e.applyWithConfidence(ts, at)
e.applyTimeouts(ts)
}
// Update the latest known tipset
e.lastTs = ts
}
for at := from.Height() + 1; at <= to.Height(); at++ {
// Apply any queued events and timeouts that were targeted at the
// current chain height
e.applyWithConfidence(ctx, at)
e.applyTimeouts(ctx, at, to)
}
return nil
}
func (e *hcEvents) handleReverts(ts *types.TipSet) {
reverts, ok := e.revertQueue[ts.Height()]
func (e *hcEventsObserver) Revert(ctx context.Context, from, to *types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
defer func() { e.lastTs = to }()
reverts, ok := e.revertQueue[from.Height()]
if !ok {
return // nothing to do
return nil // nothing to do
}
for _, triggerH := range reverts {
toRevert := e.confQueue[triggerH][ts.Height()]
toRevert := e.confQueue[triggerH][from.Height()]
for _, event := range toRevert {
if !event.called {
continue // event wasn't apply()-ied yet
@ -182,24 +170,21 @@ func (e *hcEvents) handleReverts(ts *types.TipSet) {
trigger := e.triggers[event.trigger]
if err := trigger.revert(e.ctx, ts); err != nil {
log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", ts.Height(), triggerH, err)
if err := trigger.revert(ctx, from); err != nil {
log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", from.Height(), triggerH, err)
}
}
delete(e.confQueue[triggerH], ts.Height())
delete(e.confQueue[triggerH], from.Height())
}
delete(e.revertQueue, ts.Height())
delete(e.revertQueue, from.Height())
return nil
}
// Queue up events until the chain has reached a height that reflects the
// desired confidence
func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) {
func (e *hcEventsObserver) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) {
trigger := e.triggers[trigID]
prevH := NoHeight
if prevTs != nil {
prevH = prevTs.Height()
}
appliedH := ts.Height()
triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
@ -211,28 +196,23 @@ func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts
}
byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
trigger: trigID,
prevH: prevH,
h: appliedH,
data: data,
trigger: trigID,
data: data,
tipset: ts,
prevTipset: prevTs,
})
e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH)
}
// Apply any events that were waiting for this chain height for confidence
func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) {
func (e *hcEventsObserver) applyWithConfidence(ctx context.Context, height abi.ChainEpoch) {
byOrigH, ok := e.confQueue[height]
if !ok {
return // no triggers at this height
}
for origH, events := range byOrigH {
triggerTs, err := e.tsc.get(origH)
if err != nil {
log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, height)
}
for _, event := range events {
if event.called {
continue
@ -243,18 +223,7 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch)
continue
}
// Previous tipset - this is relevant for example in a state change
// from one tipset to another
var prevTs *types.TipSet
if event.prevH != NoHeight {
prevTs, err = e.tsc.get(event.prevH)
if err != nil {
log.Errorf("events: applyWithConfidence didn't find tipset for previous event; wanted %d; current %d", event.prevH, height)
continue
}
}
more, err := trigger.handle(event.data, prevTs, triggerTs, height)
more, err := trigger.handle(ctx, event.data, event.prevTipset, event.tipset, height)
if err != nil {
log.Errorf("chain trigger (@H %d, triggered @ %d) failed: %s", origH, height, err)
continue // don't revert failed calls
@ -273,8 +242,8 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch)
}
// Apply any timeouts that expire at this height
func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
triggers, ok := e.timeouts[ts.Height()]
func (e *hcEventsObserver) applyTimeouts(ctx context.Context, at abi.ChainEpoch, ts *types.TipSet) {
triggers, ok := e.timeouts[at]
if !ok {
return // nothing to do
}
@ -288,14 +257,15 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
continue
}
timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence))
// This should be cached.
timeoutTs, err := e.cs.ChainGetTipSetAfterHeight(ctx, at-abi.ChainEpoch(trigger.confidence), ts.Key())
if err != nil {
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", at-abi.ChainEpoch(trigger.confidence), at)
}
more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
more, err := trigger.handle(ctx, nil, nil, timeoutTs, at)
if err != nil {
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), at, err)
continue // don't revert failed calls
}
@ -309,24 +279,19 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
// - RevertHandler: called if the chain head changes causing the event to revert
// - confidence: wait this many tipsets before calling EventHandler
// - timeout: at this chain height, timeout on waiting for this event
func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) {
func (e *hcEvents) onHeadChanged(ctx context.Context, check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) {
e.lk.Lock()
defer e.lk.Unlock()
// Check if the event has already occurred
ts, err := e.tsc.best()
done, more, err := check(ctx, e.lastTs)
if err != nil {
return 0, xerrors.Errorf("error getting best tipset: %w", err)
}
done, more, err := check(ts)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
return 0, xerrors.Errorf("called check error (h: %d): %w", e.lastTs.Height(), err)
}
if done {
timeout = NoTimeout
}
// Create a trigger for the event
id := e.ctr
e.ctr++
@ -354,12 +319,11 @@ func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHa
// headChangeAPI is used to allow the composed event APIs to call back to hcEvents
// to listen for changes
type headChangeAPI interface {
onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error)
onHeadChanged(ctx context.Context, check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error)
}
// watcherEvents watches for a state change
type watcherEvents struct {
ctx context.Context
cs EventAPI
hcAPI headChangeAPI
@ -367,9 +331,8 @@ type watcherEvents struct {
matchers map[triggerID]StateMatchFunc
}
func newWatcherEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) watcherEvents {
func newWatcherEvents(hcAPI headChangeAPI, cs EventAPI) watcherEvents {
return watcherEvents{
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]StateMatchFunc),
@ -425,7 +388,7 @@ type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error)
// * `StateChangeHandler` is called when the specified state change was observed
// on-chain, and a confidence threshold was reached, or the specified `timeout`
// height was reached with no state change observed. When this callback is
// invoked on a timeout, `oldState` and `newState` are set to nil.
// invoked on a timeout, `oldTs` and `states are set to nil.
// This callback returns a boolean specifying whether further notifications
// should be sent, like `more` return param from `CheckFunc` above.
//
@ -438,7 +401,7 @@ type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error)
// the state change is queued up until the confidence interval has elapsed (and
// `StateChangeHandler` is called)
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
hnd := func(ctx context.Context, data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
states, ok := data.(StateChange)
if data != nil && !ok {
panic("expected StateChange")
@ -447,7 +410,7 @@ func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler,
return scHnd(prevTs, ts, states, height)
}
id, err := we.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout)
id, err := we.hcAPI.onHeadChanged(context.TODO(), check, hnd, rev, confidence, timeout)
if err != nil {
return err
}
@ -461,43 +424,29 @@ func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler,
// messageEvents watches for message calls to actors
type messageEvents struct {
ctx context.Context
cs EventAPI
hcAPI headChangeAPI
lk sync.RWMutex
matchers map[triggerID]MsgMatchFunc
blockMsgLk sync.Mutex
blockMsgCache *lru.ARCCache
}
func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) messageEvents {
blsMsgCache, _ := lru.NewARC(500)
func newMessageEvents(hcAPI headChangeAPI, cs EventAPI) messageEvents {
return messageEvents{
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]MsgMatchFunc),
blockMsgLk: sync.Mutex{},
blockMsgCache: blsMsgCache,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]MsgMatchFunc),
}
}
// Check if there are any new actor calls
func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID][]eventData, error) {
pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
if err != nil {
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
return nil, err
}
func (me *messageEvents) checkNewCalls(ctx context.Context, from, to *types.TipSet) map[triggerID][]eventData {
me.lk.RLock()
defer me.lk.RUnlock()
// For each message in the tipset
res := make(map[triggerID][]eventData)
me.messagesForTs(pts, func(msg *types.Message) {
me.messagesForTs(from, func(msg *types.Message) {
// TODO: provide receipts
// Run each trigger's matcher against the message
@ -516,47 +465,32 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID][]eventD
}
})
return res, nil
return res
}
// Get the messages in a tipset
func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) {
seen := map[cid.Cid]struct{}{}
for _, tsb := range ts.Blocks() {
me.blockMsgLk.Lock()
msgsI, ok := me.blockMsgCache.Get(tsb.Cid())
var err error
if !ok {
msgsI, err = me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
// this is quite bad, but probably better than missing all the other updates
me.blockMsgLk.Unlock()
continue
}
me.blockMsgCache.Add(tsb.Cid(), msgsI)
for i, tsb := range ts.Cids() {
msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb)
if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s",
ts.Height(), tsb, ts.Blocks()[i].Messages, err)
continue
}
me.blockMsgLk.Unlock()
msgs := msgsI.(*api.BlockMessages)
for _, m := range msgs.BlsMessages {
_, ok := seen[m.Cid()]
for i, c := range msgs.Cids {
// We iterate over the CIDs to avoid having to recompute them.
_, ok := seen[c]
if ok {
continue
}
seen[m.Cid()] = struct{}{}
consume(m)
}
for _, m := range msgs.SecpkMessages {
_, ok := seen[m.Message.Cid()]
if ok {
continue
seen[c] = struct{}{}
if i < len(msgs.BlsMessages) {
consume(msgs.BlsMessages[i])
} else {
consume(&msgs.SecpkMessages[i-len(msgs.BlsMessages)].Message)
}
seen[m.Message.Cid()] = struct{}{}
consume(&m.Message)
}
}
}
@ -596,14 +530,14 @@ type MsgMatchFunc func(msg *types.Message) (matched bool, err error)
// * `MsgMatchFunc` is called against each message. If there is a match, the
// message is queued up until the confidence interval has elapsed (and
// `MsgHandler` is called)
func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
func (me *messageEvents) Called(ctx context.Context, check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
hnd := func(ctx context.Context, data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
msg, ok := data.(*types.Message)
if data != nil && !ok {
panic("expected msg")
}
ml, err := me.cs.StateSearchMsg(me.ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
ml, err := me.cs.StateSearchMsg(ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
if err != nil {
return false, err
}
@ -615,7 +549,7 @@ func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHa
return msgHnd(msg, &ml.Receipt, ts, height)
}
id, err := me.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout)
id, err := me.hcAPI.onHeadChanged(ctx, check, hnd, rev, confidence, timeout)
if err != nil {
return err
}
@ -629,5 +563,5 @@ func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHa
// Convenience function for checking and matching messages
func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error {
return me.Called(me.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage()))
return me.Called(ctx, me.CheckMsg(msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage()))
}

View File

@ -11,199 +11,235 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
type heightEvents struct {
lk sync.Mutex
tsc *tipSetCache
gcConfidence abi.ChainEpoch
type heightHandler struct {
ts *types.TipSet
height abi.ChainEpoch
called bool
ctr triggerID
heightTriggers map[triggerID]*heightHandler
htTriggerHeights map[triggerH][]triggerID
htHeights map[msgH][]triggerID
ctx context.Context
handle HeightHandler
revert RevertHandler
}
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
defer span.End()
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))
type heightEvents struct {
api EventAPI
gcConfidence abi.ChainEpoch
e.lk.Lock()
defer e.lk.Unlock()
for _, ts := range rev {
// TODO: log error if h below gcconfidence
// revert height-based triggers
lk sync.Mutex
head *types.TipSet
tsHeights, triggerHeights map[abi.ChainEpoch][]*heightHandler
lastGc abi.ChainEpoch //nolint:structcheck
}
revert := func(h abi.ChainEpoch, ts *types.TipSet) {
for _, tid := range e.htHeights[h] {
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
rev := e.heightTriggers[tid].revert
e.lk.Unlock()
err := rev(ctx, ts)
e.lk.Lock()
e.heightTriggers[tid].called = false
span.End()
if err != nil {
log.Errorf("reverting chain trigger (@H %d): %s", h, err)
}
}
}
revert(ts.Height(), ts)
subh := ts.Height() - 1
for {
cts, err := e.tsc.get(subh)
if err != nil {
return err
}
if cts != nil {
break
}
revert(subh, ts)
subh--
}
if err := e.tsc.revert(ts); err != nil {
return err
}
func newHeightEvents(api EventAPI, obs *observer, gcConfidence abi.ChainEpoch) *heightEvents {
he := &heightEvents{
api: api,
gcConfidence: gcConfidence,
tsHeights: map[abi.ChainEpoch][]*heightHandler{},
triggerHeights: map[abi.ChainEpoch][]*heightHandler{},
}
for i := range app {
ts := app[i]
if err := e.tsc.add(ts); err != nil {
return err
}
// height triggers
apply := func(h abi.ChainEpoch, ts *types.TipSet) error {
for _, tid := range e.htTriggerHeights[h] {
hnd := e.heightTriggers[tid]
if hnd.called {
return nil
}
triggerH := h - abi.ChainEpoch(hnd.confidence)
incTs, err := e.tsc.getNonNull(triggerH)
if err != nil {
return err
}
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", false))
handle := hnd.handle
e.lk.Unlock()
err = handle(ctx, incTs, h)
e.lk.Lock()
hnd.called = true
span.End()
if err != nil {
log.Errorf("chain trigger (@H %d, called @ %d) failed: %+v", triggerH, ts.Height(), err)
}
}
return nil
}
if err := apply(ts.Height(), ts); err != nil {
return err
}
subh := ts.Height() - 1
for {
cts, err := e.tsc.get(subh)
if err != nil {
return err
}
if cts != nil {
break
}
if err := apply(subh, ts); err != nil {
return err
}
subh--
}
}
return nil
he.lk.Lock()
he.head = obs.Observe((*heightEventsObserver)(he))
he.lk.Unlock()
return he
}
// ChainAt invokes the specified `HeightHandler` when the chain reaches the
// specified height+confidence threshold. If the chain is rolled-back under the
// specified height, `RevertHandler` will be called.
//
// ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error {
e.lk.Lock() // Tricky locking, check your locks if you modify this function!
best, err := e.tsc.best()
if err != nil {
e.lk.Unlock()
return xerrors.Errorf("error getting best tipset: %w", err)
// ts passed to handlers is the tipset at the specified epoch, or above if lower tipsets were null.
//
// The context governs cancellations of this call, it won't cancel the event handler.
func (e *heightEvents) ChainAt(ctx context.Context, hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error {
if abi.ChainEpoch(confidence) > e.gcConfidence {
// Need this to be able to GC effectively.
return xerrors.Errorf("confidence cannot be greater than gcConfidence: %d > %d", confidence, e.gcConfidence)
}
bestH := best.Height()
if bestH >= h+abi.ChainEpoch(confidence) {
ts, err := e.tsc.getNonNull(h)
if err != nil {
log.Warnf("events.ChainAt: calling HandleFunc with nil tipset, not found in cache: %s", err)
}
e.lk.Unlock()
ctx, span := trace.StartSpan(e.ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", true))
err = hnd(ctx, ts, bestH)
span.End()
if err != nil {
return err
}
e.lk.Lock()
best, err = e.tsc.best()
if err != nil {
e.lk.Unlock()
return xerrors.Errorf("error getting best tipset: %w", err)
}
bestH = best.Height()
}
defer e.lk.Unlock()
if bestH >= h+abi.ChainEpoch(confidence)+e.gcConfidence {
return nil
}
triggerAt := h + abi.ChainEpoch(confidence)
id := e.ctr
e.ctr++
e.heightTriggers[id] = &heightHandler{
confidence: confidence,
handler := &heightHandler{
height: h,
handle: hnd,
revert: rev,
}
triggerAt := h + abi.ChainEpoch(confidence)
e.htHeights[h] = append(e.htHeights[h], id)
e.htTriggerHeights[triggerAt] = append(e.htTriggerHeights[triggerAt], id)
// Here we try to jump onto a moving train. To avoid stopping the train, we release the lock
// while calling the API and/or the trigger functions. Unfortunately, it's entirely possible
// (although unlikely) to go back and forth across the trigger heights, so we need to keep
// going back and forth here till we're synced.
//
// TODO: Consider using a worker goroutine so we can just drop the handler in a channel? The
// downside is that we'd either need a tipset cache, or we'd need to potentially fetch
// tipsets in-line inside the event loop.
e.lk.Lock()
for {
head := e.head
if head.Height() >= h {
// Head is past the handler height. We at least need to stash the tipset to
// avoid doing this from the main event loop.
e.lk.Unlock()
var ts *types.TipSet
if head.Height() == h {
ts = head
} else {
var err error
ts, err = e.api.ChainGetTipSetAfterHeight(ctx, handler.height, head.Key())
if err != nil {
return xerrors.Errorf("events.ChainAt: failed to get tipset: %s", err)
}
}
// If we've applied the handler on the wrong tipset, revert.
if handler.called && !ts.Equals(handler.ts) {
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
span.AddAttributes(trace.BoolAttribute("immediate", true))
err := handler.revert(ctx, handler.ts)
span.End()
if err != nil {
return err
}
handler.called = false
}
// Save the tipset.
handler.ts = ts
// If we've reached confidence and haven't called, call.
if !handler.called && head.Height() >= triggerAt {
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", true))
err := handler.handle(ctx, handler.ts, head.Height())
span.End()
if err != nil {
return err
}
handler.called = true
// If we've reached gcConfidence, return without saving anything.
if head.Height() >= h+e.gcConfidence {
return nil
}
}
e.lk.Lock()
} else if handler.called {
// We're not passed the head (anymore) but have applied the handler. Revert, try again.
e.lk.Unlock()
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
span.AddAttributes(trace.BoolAttribute("immediate", true))
err := handler.revert(ctx, handler.ts)
span.End()
if err != nil {
return err
}
handler.called = false
e.lk.Lock()
} // otherwise, we changed heads but the change didn't matter.
// If we managed to get through this without the head changing, we're finally done.
if head.Equals(e.head) {
e.triggerHeights[triggerAt] = append(e.triggerHeights[triggerAt], handler)
e.tsHeights[h] = append(e.tsHeights[h], handler)
e.lk.Unlock()
return nil
}
}
}
// Updates the head and garbage collects if we're 2x over our garbage collection confidence period.
func (e *heightEventsObserver) updateHead(h *types.TipSet) {
e.lk.Lock()
defer e.lk.Unlock()
e.head = h
if e.head.Height() < e.lastGc+e.gcConfidence*2 {
return
}
e.lastGc = h.Height()
targetGcHeight := e.head.Height() - e.gcConfidence
for h := range e.tsHeights {
if h >= targetGcHeight {
continue
}
delete(e.tsHeights, h)
}
for h := range e.triggerHeights {
if h >= targetGcHeight {
continue
}
delete(e.triggerHeights, h)
}
}
type heightEventsObserver heightEvents
func (e *heightEventsObserver) Revert(ctx context.Context, from, to *types.TipSet) error {
// Update the head first so we don't accidental skip reverting a concurrent call to ChainAt.
e.updateHead(to)
// Call revert on all hights between the two tipsets, handling empty tipsets.
for h := from.Height(); h > to.Height(); h-- {
e.lk.Lock()
triggers := e.tsHeights[h]
e.lk.Unlock()
// 1. Triggers are only invoked from the global event loop, we don't need to hold the lock while calling.
// 2. We only ever append to or replace the trigger slice, so it's safe to iterate over it without the lock.
for _, handler := range triggers {
handler.ts = nil // invalidate
if !handler.called {
// We haven't triggered this yet, or there has been a concurrent call to ChainAt.
continue
}
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
err := handler.revert(ctx, from)
span.End()
if err != nil {
log.Errorf("reverting chain trigger (@H %d): %s", h, err)
}
handler.called = false
}
}
return nil
}
func (e *heightEventsObserver) Apply(ctx context.Context, from, to *types.TipSet) error {
// Update the head first so we don't accidental skip applying a concurrent call to ChainAt.
e.updateHead(to)
for h := from.Height() + 1; h <= to.Height(); h++ {
e.lk.Lock()
triggers := e.triggerHeights[h]
tipsets := e.tsHeights[h]
e.lk.Unlock()
// Stash the tipset for future triggers.
for _, handler := range tipsets {
handler.ts = to
}
// Trigger the ready triggers.
for _, handler := range triggers {
if handler.called {
// We may have reverted past the trigger point, but not past the call point.
// Or there has been a concurrent call to ChainAt.
continue
}
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", false))
err := handler.handle(ctx, handler.ts, h)
span.End()
if err != nil {
log.Errorf("chain trigger (@H %d, called @ %d) failed: %+v", h, to.Height(), err)
}
handler.called = true
}
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,42 @@
package events
import (
"context"
"sync"
"github.com/filecoin-project/lotus/api"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
)
type messageCache struct {
api EventAPI
blockMsgLk sync.Mutex
blockMsgCache *lru.ARCCache
}
func newMessageCache(api EventAPI) *messageCache {
blsMsgCache, _ := lru.NewARC(500)
return &messageCache{
api: api,
blockMsgCache: blsMsgCache,
}
}
func (c *messageCache) ChainGetBlockMessages(ctx context.Context, blkCid cid.Cid) (*api.BlockMessages, error) {
c.blockMsgLk.Lock()
defer c.blockMsgLk.Unlock()
msgsI, ok := c.blockMsgCache.Get(blkCid)
var err error
if !ok {
msgsI, err = c.api.ChainGetBlockMessages(ctx, blkCid)
if err != nil {
return nil, err
}
c.blockMsgCache.Add(blkCid, msgsI)
}
return msgsI.(*api.BlockMessages), nil
}

255
chain/events/observer.go Normal file
View File

@ -0,0 +1,255 @@
package events
import (
"context"
"sync"
"time"
"github.com/filecoin-project/go-state-types/abi"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
type observer struct {
api EventAPI
gcConfidence abi.ChainEpoch
ready chan struct{}
lk sync.Mutex
head *types.TipSet
maxHeight abi.ChainEpoch
observers []TipSetObserver
}
func newObserver(api *cache, gcConfidence abi.ChainEpoch) *observer {
obs := &observer{
api: api,
gcConfidence: gcConfidence,
ready: make(chan struct{}),
observers: []TipSetObserver{},
}
obs.Observe(api.observer())
return obs
}
func (o *observer) start(ctx context.Context) error {
go o.listenHeadChanges(ctx)
// Wait for the first tipset to be seen or bail if shutting down
select {
case <-o.ready:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (o *observer) listenHeadChanges(ctx context.Context) {
for {
if err := o.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
select {
case <-build.Clock.After(time.Second):
case <-ctx.Done():
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
log.Info("restarting listenHeadChanges")
}
}
func (o *observer) listenHeadChangesOnce(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := o.api.ChainNotify(ctx)
if err != nil {
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}
var cur []*api.HeadChange
var ok bool
// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}
if len(cur) != 1 {
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
}
if cur[0].Type != store.HCCurrent {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}
curHead := cur[0].Val
o.lk.Lock()
if o.head == nil {
o.head = curHead
close(o.ready)
}
startHead := o.head
o.lk.Unlock()
if !startHead.Equals(curHead) {
changes, err := o.api.ChainGetPath(ctx, startHead.Key(), curHead.Key())
if err != nil {
return xerrors.Errorf("failed to get path from last applied tipset to head: %w", err)
}
if err := o.applyChanges(ctx, changes); err != nil {
return xerrors.Errorf("failed catch-up head changes: %w", err)
}
}
for changes := range notifs {
if err := o.applyChanges(ctx, changes); err != nil {
return err
}
}
return nil
}
func (o *observer) applyChanges(ctx context.Context, changes []*api.HeadChange) error {
// Used to wait for a prior notification round to finish (by tests)
if len(changes) == 0 {
return nil
}
var rev, app []*types.TipSet
for _, changes := range changes {
switch changes.Type {
case store.HCRevert:
rev = append(rev, changes.Val)
case store.HCApply:
app = append(app, changes.Val)
default:
log.Errorf("unexpected head change notification type: '%s'", changes.Type)
}
}
if err := o.headChange(ctx, rev, app); err != nil {
return xerrors.Errorf("failed to apply head changes: %w", err)
}
return nil
}
func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "events.HeadChange")
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))
o.lk.Lock()
head := o.head
o.lk.Unlock()
defer func() {
span.AddAttributes(trace.Int64Attribute("endHeight", int64(head.Height())))
span.End()
}()
// NOTE: bailing out here if the head isn't what we expected is fine. We'll re-start the
// entire process and handle any strange reorgs.
for i, from := range rev {
if !from.Equals(head) {
return xerrors.Errorf(
"expected to revert %s (%d), reverting %s (%d)",
head.Key(), head.Height(), from.Key(), from.Height(),
)
}
var to *types.TipSet
if i+1 < len(rev) {
// If we have more reverts, the next revert is the next head.
to = rev[i+1]
} else {
// At the end of the revert sequenece, we need to lookup the joint tipset
// between the revert sequence and the apply sequence.
var err error
to, err = o.api.ChainGetTipSet(ctx, from.Parents())
if err != nil {
// Well, this sucks. We'll bail and restart.
return xerrors.Errorf("failed to get tipset when reverting due to a SetHeead: %w", err)
}
}
// Get the current observers and atomically set the head.
//
// 1. We need to get the observers every time in case some registered/deregistered.
// 2. We need to atomically set the head so new observers don't see events twice or
// skip them.
o.lk.Lock()
observers := o.observers
o.head = to
o.lk.Unlock()
for _, obs := range observers {
if err := obs.Revert(ctx, from, to); err != nil {
log.Errorf("observer %T failed to apply tipset %s (%d) with: %s", obs, from.Key(), from.Height(), err)
}
}
if to.Height() < o.maxHeight-o.gcConfidence {
log.Errorf("reverted past finality, from %d to %d", o.maxHeight, to.Height())
}
head = to
}
for _, to := range app {
if to.Parents() != head.Key() {
return xerrors.Errorf(
"cannot apply %s (%d) with parents %s on top of %s (%d)",
to.Key(), to.Height(), to.Parents(), head.Key(), head.Height(),
)
}
o.lk.Lock()
observers := o.observers
o.head = to
o.lk.Unlock()
for _, obs := range observers {
if err := obs.Apply(ctx, head, to); err != nil {
log.Errorf("observer %T failed to revert tipset %s (%d) with: %s", obs, to.Key(), to.Height(), err)
}
}
if to.Height() > o.maxHeight {
o.maxHeight = to.Height()
}
head = to
}
return nil
}
// Observe registers the observer, and returns the current tipset. The observer is guaranteed to
// observe events starting at this tipset.
//
// Returns nil if the observer hasn't started yet (but still registers).
func (o *observer) Observe(obs TipSetObserver) *types.TipSet {
o.lk.Lock()
defer o.lk.Unlock()
o.observers = append(o.observers, obs)
return o.head
}

View File

@ -11,7 +11,9 @@ import (
)
type tsCacheAPI interface {
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
ChainHead(context.Context) (*types.TipSet, error)
}
@ -20,61 +22,157 @@ type tsCacheAPI interface {
type tipSetCache struct {
mu sync.RWMutex
cache []*types.TipSet
start int
len int
byKey map[types.TipSetKey]*types.TipSet
byHeight []*types.TipSet
start int // chain head (end)
len int
storage tsCacheAPI
}
func newTSCache(cap abi.ChainEpoch, storage tsCacheAPI) *tipSetCache {
func newTSCache(storage tsCacheAPI, cap abi.ChainEpoch) *tipSetCache {
return &tipSetCache{
cache: make([]*types.TipSet, cap),
start: 0,
len: 0,
byKey: make(map[types.TipSetKey]*types.TipSet, cap),
byHeight: make([]*types.TipSet, cap),
start: 0,
len: 0,
storage: storage,
}
}
func (tsc *tipSetCache) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
if ts, ok := tsc.byKey[tsk]; ok {
return ts, nil
}
return tsc.storage.ChainGetTipSet(ctx, tsk)
}
func (tsc *tipSetCache) add(ts *types.TipSet) error {
func (tsc *tipSetCache) ChainGetTipSetByHeight(ctx context.Context, height abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
return tsc.get(ctx, height, tsk, true)
}
func (tsc *tipSetCache) ChainGetTipSetAfterHeight(ctx context.Context, height abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
return tsc.get(ctx, height, tsk, false)
}
func (tsc *tipSetCache) get(ctx context.Context, height abi.ChainEpoch, tsk types.TipSetKey, prev bool) (*types.TipSet, error) {
fallback := tsc.storage.ChainGetTipSetAfterHeight
if prev {
fallback = tsc.storage.ChainGetTipSetByHeight
}
tsc.mu.RLock()
// Nothing in the cache?
if tsc.len == 0 {
tsc.mu.RUnlock()
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
return fallback(ctx, height, tsk)
}
// Resolve the head.
head := tsc.byHeight[tsc.start]
if !tsk.IsEmpty() {
// Not on this chain?
var ok bool
head, ok = tsc.byKey[tsk]
if !ok {
tsc.mu.RUnlock()
return fallback(ctx, height, tsk)
}
}
headH := head.Height()
tailH := headH - abi.ChainEpoch(tsc.len)
if headH == height {
tsc.mu.RUnlock()
return head, nil
} else if headH < height {
tsc.mu.RUnlock()
// If the user doesn't pass a tsk, we assume "head" is the last tipset we processed.
return nil, xerrors.Errorf("requested epoch is in the future")
} else if height < tailH {
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tailH)
tsc.mu.RUnlock()
return fallback(ctx, height, head.Key())
}
direction := 1
if prev {
direction = -1
}
var ts *types.TipSet
for i := 0; i < tsc.len && ts == nil; i += direction {
ts = tsc.byHeight[normalModulo(tsc.start-int(headH-height)+i, len(tsc.byHeight))]
}
tsc.mu.RUnlock()
return ts, nil
}
func (tsc *tipSetCache) ChainHead(ctx context.Context) (*types.TipSet, error) {
tsc.mu.RLock()
best := tsc.byHeight[tsc.start]
tsc.mu.RUnlock()
if best == nil {
return tsc.storage.ChainHead(ctx)
}
return best, nil
}
func (tsc *tipSetCache) add(to *types.TipSet) error {
tsc.mu.Lock()
defer tsc.mu.Unlock()
if tsc.len > 0 {
if tsc.cache[tsc.start].Height() >= ts.Height() {
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be at least %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
best := tsc.byHeight[tsc.start]
if best.Height() >= to.Height() {
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be at least %d, was %d", tsc.byHeight[tsc.start].Height()+1, to.Height())
}
if best.Key() != to.Parents() {
return xerrors.Errorf(
"tipSetCache.add: expected new tipset %s (%d) to follow %s (%d), its parents are %s",
to.Key(), to.Height(), best.Key(), best.Height(), best.Parents(),
)
}
}
nextH := ts.Height()
nextH := to.Height()
if tsc.len > 0 {
nextH = tsc.cache[tsc.start].Height() + 1
nextH = tsc.byHeight[tsc.start].Height() + 1
}
// fill null blocks
for nextH != ts.Height() {
tsc.start = normalModulo(tsc.start+1, len(tsc.cache))
tsc.cache[tsc.start] = nil
if tsc.len < len(tsc.cache) {
for nextH != to.Height() {
tsc.start = normalModulo(tsc.start+1, len(tsc.byHeight))
was := tsc.byHeight[tsc.start]
if was != nil {
tsc.byHeight[tsc.start] = nil
delete(tsc.byKey, was.Key())
}
if tsc.len < len(tsc.byHeight) {
tsc.len++
}
nextH++
}
tsc.start = normalModulo(tsc.start+1, len(tsc.cache))
tsc.cache[tsc.start] = ts
if tsc.len < len(tsc.cache) {
tsc.start = normalModulo(tsc.start+1, len(tsc.byHeight))
was := tsc.byHeight[tsc.start]
if was != nil {
delete(tsc.byKey, was.Key())
}
tsc.byHeight[tsc.start] = to
if tsc.len < len(tsc.byHeight) {
tsc.len++
}
tsc.byKey[to.Key()] = to
return nil
}
func (tsc *tipSetCache) revert(ts *types.TipSet) error {
func (tsc *tipSetCache) revert(from *types.TipSet) error {
tsc.mu.Lock()
defer tsc.mu.Unlock()
return tsc.revertUnlocked(ts)
return tsc.revertUnlocked(from)
}
func (tsc *tipSetCache) revertUnlocked(ts *types.TipSet) error {
@ -82,75 +180,35 @@ func (tsc *tipSetCache) revertUnlocked(ts *types.TipSet) error {
return nil // this can happen, and it's fine
}
if !tsc.cache[tsc.start].Equals(ts) {
was := tsc.byHeight[tsc.start]
if !was.Equals(ts) {
return xerrors.New("tipSetCache.revert: revert tipset didn't match cache head")
}
delete(tsc.byKey, was.Key())
tsc.cache[tsc.start] = nil
tsc.start = normalModulo(tsc.start-1, len(tsc.cache))
tsc.byHeight[tsc.start] = nil
tsc.start = normalModulo(tsc.start-1, len(tsc.byHeight))
tsc.len--
_ = tsc.revertUnlocked(nil) // revert null block gap
return nil
}
func (tsc *tipSetCache) getNonNull(height abi.ChainEpoch) (*types.TipSet, error) {
for {
ts, err := tsc.get(height)
if err != nil {
return nil, err
}
if ts != nil {
return ts, nil
}
height++
}
func (tsc *tipSetCache) observer() TipSetObserver {
return (*tipSetCacheObserver)(tsc)
}
func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
tsc.mu.RLock()
type tipSetCacheObserver tipSetCache
if tsc.len == 0 {
tsc.mu.RUnlock()
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, types.EmptyTSK)
}
var _ TipSetObserver = new(tipSetCacheObserver)
headH := tsc.cache[tsc.start].Height()
if height > headH {
tsc.mu.RUnlock()
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH)
}
clen := len(tsc.cache)
var tail *types.TipSet
for i := 1; i <= tsc.len; i++ {
tail = tsc.cache[normalModulo(tsc.start-tsc.len+i, clen)]
if tail != nil {
break
}
}
if height < tail.Height() {
tsc.mu.RUnlock()
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, tail.Key())
}
ts := tsc.cache[normalModulo(tsc.start-int(headH-height), clen)]
tsc.mu.RUnlock()
return ts, nil
func (tsc *tipSetCacheObserver) Apply(_ context.Context, _, to *types.TipSet) error {
return (*tipSetCache)(tsc).add(to)
}
func (tsc *tipSetCache) best() (*types.TipSet, error) {
tsc.mu.RLock()
best := tsc.cache[tsc.start]
tsc.mu.RUnlock()
if best == nil {
return tsc.storage.ChainHead(context.TODO())
}
return best, nil
func (tsc *tipSetCacheObserver) Revert(ctx context.Context, from, _ *types.TipSet) error {
return (*tipSetCache)(tsc).revert(from)
}
func normalModulo(n, m int) int {

View File

@ -6,61 +6,22 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
)
func TestTsCache(t *testing.T) {
tsc := newTSCache(50, &tsCacheAPIFailOnStorageCall{t: t})
h := abi.ChainEpoch(75)
a, _ := address.NewFromString("t00")
add := func() {
ts, err := types.NewTipSet([]*types.BlockHeader{{
Miner: a,
Height: h,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
}})
if err != nil {
t.Fatal(err)
}
if err := tsc.add(ts); err != nil {
t.Fatal(err)
}
h++
}
for i := 0; i < 9000; i++ {
if i%90 > 60 {
best, err := tsc.best()
if err != nil {
t.Fatal(err, "; i:", i)
return
}
if err := tsc.revert(best); err != nil {
t.Fatal(err, "; i:", i)
return
}
h--
} else {
add()
}
}
}
type tsCacheAPIFailOnStorageCall struct {
t *testing.T
}
func (tc *tsCacheAPIFailOnStorageCall) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIFailOnStorageCall) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
@ -69,100 +30,181 @@ func (tc *tsCacheAPIFailOnStorageCall) ChainHead(ctx context.Context) (*types.Ti
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIFailOnStorageCall) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
tc.t.Fatal("storage call")
return &types.TipSet{}, nil
}
type cacheHarness struct {
t *testing.T
miner address.Address
tsc *tipSetCache
height abi.ChainEpoch
}
func newCacheharness(t *testing.T) *cacheHarness {
a, err := address.NewFromString("t00")
require.NoError(t, err)
h := &cacheHarness{
t: t,
tsc: newTSCache(&tsCacheAPIFailOnStorageCall{t: t}, 50),
height: 75,
miner: a,
}
h.addWithParents(nil)
return h
}
func (h *cacheHarness) addWithParents(parents []cid.Cid) {
ts, err := types.NewTipSet([]*types.BlockHeader{{
Miner: h.miner,
Height: h.height,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
Parents: parents,
}})
require.NoError(h.t, err)
require.NoError(h.t, h.tsc.add(ts))
h.height++
}
func (h *cacheHarness) add() {
last, err := h.tsc.ChainHead(context.Background())
require.NoError(h.t, err)
h.addWithParents(last.Cids())
}
func (h *cacheHarness) revert() {
best, err := h.tsc.ChainHead(context.Background())
require.NoError(h.t, err)
err = h.tsc.revert(best)
require.NoError(h.t, err)
h.height--
}
func (h *cacheHarness) skip(n abi.ChainEpoch) {
h.height += n
}
func TestTsCache(t *testing.T) {
h := newCacheharness(t)
for i := 0; i < 9000; i++ {
if i%90 > 60 {
h.revert()
} else {
h.add()
}
}
}
func TestTsCacheNulls(t *testing.T) {
tsc := newTSCache(50, &tsCacheAPIFailOnStorageCall{t: t})
ctx := context.Background()
h := newCacheharness(t)
h := abi.ChainEpoch(75)
h.add()
h.add()
h.add()
h.skip(5)
a, _ := address.NewFromString("t00")
add := func() {
ts, err := types.NewTipSet([]*types.BlockHeader{{
Miner: a,
Height: h,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
}})
if err != nil {
t.Fatal(err)
}
if err := tsc.add(ts); err != nil {
t.Fatal(err)
}
h++
}
h.add()
h.add()
add()
add()
add()
h += 5
add()
add()
best, err := tsc.best()
best, err := h.tsc.ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, h-1, best.Height())
require.Equal(t, h.height-1, best.Height())
ts, err := tsc.get(h - 1)
ts, err := h.tsc.ChainGetTipSetByHeight(ctx, h.height-1, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, h-1, ts.Height())
require.Equal(t, h.height-1, ts.Height())
ts, err = tsc.get(h - 2)
ts, err = h.tsc.ChainGetTipSetByHeight(ctx, h.height-2, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, h-2, ts.Height())
require.Equal(t, h.height-2, ts.Height())
ts, err = tsc.get(h - 3)
// Should skip the nulls and walk back to the last tipset.
ts, err = h.tsc.ChainGetTipSetByHeight(ctx, h.height-3, types.EmptyTSK)
require.NoError(t, err)
require.Nil(t, ts)
require.Equal(t, h.height-8, ts.Height())
ts, err = tsc.get(h - 8)
ts, err = h.tsc.ChainGetTipSetByHeight(ctx, h.height-8, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, h-8, ts.Height())
require.Equal(t, h.height-8, ts.Height())
best, err = tsc.best()
best, err = h.tsc.ChainHead(ctx)
require.NoError(t, err)
require.NoError(t, tsc.revert(best))
require.NoError(t, h.tsc.revert(best))
best, err = tsc.best()
best, err = h.tsc.ChainHead(ctx)
require.NoError(t, err)
require.NoError(t, tsc.revert(best))
require.NoError(t, h.tsc.revert(best))
best, err = tsc.best()
best, err = h.tsc.ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, h-8, best.Height())
require.Equal(t, h.height-8, best.Height())
h += 50
add()
h.skip(50)
h.add()
ts, err = tsc.get(h - 1)
ts, err = h.tsc.ChainGetTipSetByHeight(ctx, h.height-1, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, h-1, ts.Height())
require.Equal(t, h.height-1, ts.Height())
}
type tsCacheAPIStorageCallCounter struct {
t *testing.T
chainGetTipSetByHeight int
chainHead int
t *testing.T
chainGetTipSetByHeight int
chainGetTipSetAfterHeight int
chainGetTipSet int
chainHead int
}
func (tc *tsCacheAPIStorageCallCounter) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.chainGetTipSetByHeight++
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIStorageCallCounter) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
tc.chainGetTipSetAfterHeight++
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIStorageCallCounter) ChainHead(ctx context.Context) (*types.TipSet, error) {
tc.chainHead++
return &types.TipSet{}, nil
}
func (tc *tsCacheAPIStorageCallCounter) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
tc.chainGetTipSet++
return &types.TipSet{}, nil
}
func TestTsCacheEmpty(t *testing.T) {
// Calling best on an empty cache should just call out to the chain API
callCounter := &tsCacheAPIStorageCallCounter{t: t}
tsc := newTSCache(50, callCounter)
_, err := tsc.best()
tsc := newTSCache(callCounter, 50)
_, err := tsc.ChainHead(context.Background())
require.NoError(t, err)
require.Equal(t, 1, callCounter.chainHead)
}
func TestTsCacheSkip(t *testing.T) {
h := newCacheharness(t)
ts, err := types.NewTipSet([]*types.BlockHeader{{
Miner: h.miner,
Height: h.height,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
// With parents that don't match the last block.
Parents: nil,
}})
require.NoError(h.t, err)
err = h.tsc.add(ts)
require.Error(t, err)
}

View File

@ -10,10 +10,10 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd MsgHandler) CheckFunc {
func (me *messageEvents) CheckMsg(smsg types.ChainMsg, hnd MsgHandler) CheckFunc {
msg := smsg.VMMessage()
return func(ts *types.TipSet) (done bool, more bool, err error) {
return func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
fa, err := me.cs.StateGetActor(ctx, msg.From, ts.Key())
if err != nil {
return false, true, err
@ -24,7 +24,7 @@ func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd
return false, true, nil
}
ml, err := me.cs.StateSearchMsg(me.ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
ml, err := me.cs.StateSearchMsg(ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
if err != nil {
return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err)
}

View File

@ -31,7 +31,6 @@ import (
lru "github.com/hashicorp/golang-lru"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dstore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
cbor "github.com/ipfs/go-ipld-cbor"
@ -294,27 +293,36 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*api.HeadChange
}}
go func() {
defer close(out)
var unsubOnce sync.Once
defer func() {
// Tell the caller we're done first, the following may block for a bit.
close(out)
// Unsubscribe.
cs.bestTips.Unsub(subch)
// Drain the channel.
for range subch {
}
}()
for {
select {
case val, ok := <-subch:
if !ok {
log.Warn("chain head sub exit loop")
// Shutting down.
return
}
select {
case out <- val.([]*api.HeadChange):
default:
log.Errorf("closing head change subscription due to slow reader")
return
}
if len(out) > 5 {
log.Warnf("head change sub is slow, has %d buffered entries", len(out))
}
select {
case out <- val.([]*api.HeadChange):
case <-ctx.Done():
}
case <-ctx.Done():
unsubOnce.Do(func() {
go cs.bestTips.Unsub(subch)
})
return
}
}
}()
@ -642,7 +650,7 @@ func (cs *ChainStore) FlushValidationCache() error {
return FlushValidationCache(cs.metadataDs)
}
func FlushValidationCache(ds datastore.Batching) error {
func FlushValidationCache(ds dstore.Batching) error {
log.Infof("clearing block validation cache...")
dsWalk, err := ds.Query(query.Query{
@ -674,7 +682,7 @@ func FlushValidationCache(ds datastore.Batching) error {
for _, k := range allKeys {
if strings.HasPrefix(k.Key, blockValidationCacheKeyPrefix.String()) {
delCnt++
batch.Delete(datastore.RawKey(k.Key)) // nolint:errcheck
batch.Delete(dstore.RawKey(k.Key)) // nolint:errcheck
}
}

View File

@ -1,4 +1,5 @@
//+build gofuzz
//go:build gofuzz
// +build gofuzz
package types

View File

@ -1,3 +1,4 @@
//go:build !nodaemon
// +build !nodaemon
package main

View File

@ -1,3 +1,4 @@
//go:build nodaemon
// +build nodaemon
package main

View File

@ -1,3 +1,4 @@
//go:build debug
// +build debug
package main

View File

@ -1,4 +1,5 @@
//+build cgo
//go:build cgo
// +build cgo
package ffiwrapper

View File

@ -1,4 +1,5 @@
//+build cgo
//go:build cgo
// +build cgo
package ffiwrapper

View File

@ -1,4 +1,5 @@
//+build cgo
//go:build cgo
// +build cgo
package ffiwrapper

View File

@ -1,3 +1,4 @@
//go:build !linux
// +build !linux
package fsutil

View File

@ -40,6 +40,7 @@ type TargetAPI interface {
ChainHasObj(context.Context, cid.Cid) (bool, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
MpoolPushUntrusted(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
@ -216,6 +217,10 @@ func (gw *Node) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, erro
return gw.target.ChainNotify(ctx)
}
func (gw *Node) ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error) {
return gw.target.ChainGetPath(ctx, from, to)
}
func (gw *Node) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) {
return gw.target.ChainReadObj(ctx, c)
}

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
logging "github.com/ipfs/go-log/v2"
)
func TestAPI(t *testing.T) {
@ -39,6 +40,7 @@ func runAPITest(t *testing.T, opts ...interface{}) {
t.Run("testConnectTwo", ts.testConnectTwo)
t.Run("testMining", ts.testMining)
t.Run("testMiningReal", ts.testMiningReal)
t.Run("testSlowNotify", ts.testSlowNotify)
t.Run("testSearchMsg", ts.testSearchMsg)
t.Run("testNonGenesisMiner", ts.testNonGenesisMiner)
}
@ -169,6 +171,51 @@ func (ts *apiSuite) testMiningReal(t *testing.T) {
ts.testMining(t)
}
func (ts *apiSuite) testSlowNotify(t *testing.T) {
_ = logging.SetLogLevel("rpc", "ERROR")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
full, miner, _ := kit.EnsembleMinimal(t, ts.opts...)
// Subscribe a bunch of times to make sure we fill up any RPC buffers.
var newHeadsChans []<-chan []*lapi.HeadChange
for i := 0; i < 100; i++ {
newHeads, err := full.ChainNotify(ctx)
require.NoError(t, err)
newHeadsChans = append(newHeadsChans, newHeads)
}
initHead := (<-newHeadsChans[0])[0]
baseHeight := initHead.Val.Height()
bm := kit.NewBlockMiner(t, miner)
bm.MineBlocks(ctx, time.Microsecond)
full.WaitTillChain(ctx, kit.HeightAtLeast(baseHeight+100))
// Make sure they were all closed, draining any buffered events first.
for _, ch := range newHeadsChans {
var ok bool
for ok {
select {
case _, ok = <-ch:
default:
t.Fatal("expected new heads channel to be closed")
}
}
}
// Make sure we can resubscribe and everything still works.
newHeads, err := full.ChainNotify(ctx)
require.NoError(t, err)
for i := 0; i < 10; i++ {
_, ok := <-newHeads
require.True(t, ok, "notify channel closed")
}
}
func (ts *apiSuite) testNonGenesisMiner(t *testing.T) {
ctx := context.Background()

View File

@ -103,10 +103,11 @@ func TestPaymentChannelsAPI(t *testing.T) {
creatorStore := adt.WrapStore(ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(paymentCreator)))
// wait for the receiver to submit their vouchers
ev := events.NewEvents(ctx, paymentCreator)
ev, err := events.NewEvents(ctx, paymentCreator)
require.NoError(t, err)
preds := state.NewStatePredicates(paymentCreator)
finished := make(chan struct{})
err = ev.StateChanged(func(ts *types.TipSet) (done bool, more bool, err error) {
err = ev.StateChanged(func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
act, err := paymentCreator.StateGetActor(ctx, channel, ts.Key())
if err != nil {
return false, false, err

View File

@ -381,8 +381,9 @@ func checkVoucherOutput(t *testing.T, list string, vouchers []voucherSpec) {
// waitForHeight waits for the node to reach the given chain epoch
func waitForHeight(ctx context.Context, t *testing.T, node kit.TestFullNode, height abi.ChainEpoch) {
atHeight := make(chan struct{})
chainEvents := events.NewEvents(ctx, node)
err := chainEvents.ChainAt(func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
chainEvents, err := events.NewEvents(ctx, node)
require.NoError(t, err)
err = chainEvents.ChainAt(ctx, func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
close(atHeight)
return nil
}, nil, 1, height)

View File

@ -1,3 +1,4 @@
//go:build freebsd
// +build freebsd
package ulimit

View File

@ -1,3 +1,4 @@
//go:build !windows
// +build !windows
package ulimit

View File

@ -1,3 +1,4 @@
//go:build darwin || linux || netbsd || openbsd
// +build darwin linux netbsd openbsd
package ulimit

View File

@ -50,11 +50,14 @@ type clientApi struct {
full.MpoolAPI
}
func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) (storagemarket.StorageClientNode, error) {
capi := &clientApi{chain, stateapi, mpool}
ctx := helpers.LifecycleCtx(mctx, lc)
ev := events.NewEvents(ctx, capi)
ev, err := events.NewEvents(ctx, capi)
if err != nil {
return nil, err
}
a := &ClientNodeAdapter{
clientApi: capi,
@ -63,7 +66,7 @@ func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi ful
dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi))),
}
a.scMgr = NewSectorCommittedManager(ev, a, &apiWrapper{api: capi})
return a
return a, nil
}
func (c *ClientNodeAdapter) ListStorageProviders(ctx context.Context, encodedTs shared.TipSetToken) ([]*storagemarket.StorageProviderInfo, error) {
@ -262,7 +265,7 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
}
// Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
checkFunc := func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
if ts == nil {
// keep listening for events
return false, true, nil

View File

@ -22,7 +22,7 @@ import (
)
type eventsCalledAPI interface {
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
Called(ctx context.Context, check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
}
type dealInfoAPI interface {
@ -64,7 +64,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
}
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
checkFunc := func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
dealInfo, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
@ -165,7 +165,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
return nil
}
if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
if err := mgr.ev.Called(ctx, checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}
@ -182,7 +182,7 @@ func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, pr
}
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
checkFunc := func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
_, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
@ -257,7 +257,7 @@ func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, pr
return nil
}
if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
if err := mgr.ev.Called(ctx, checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}

View File

@ -477,13 +477,13 @@ type fakeEvents struct {
DealStartEpochTimeout bool
}
func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error {
func (fe *fakeEvents) Called(ctx context.Context, check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error {
if fe.DealStartEpochTimeout {
msgHnd(nil, nil, nil, 100) // nolint:errcheck
return nil
}
_, more, err := check(fe.CheckTs)
_, more, err := check(ctx, fe.CheckTs)
if err != nil {
return err
}
@ -506,7 +506,7 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r
return nil
}
if matchMessage.doesRevert {
err := rev(fe.Ctx, matchMessage.ts)
err := rev(ctx, matchMessage.ts)
if err != nil {
return err
}

View File

@ -55,11 +55,14 @@ type ProviderNodeAdapter struct {
scMgr *SectorCommittedManager
}
func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) (storagemarket.StorageProviderNode, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) (storagemarket.StorageProviderNode, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
ev := events.NewEvents(ctx, full)
ev, err := events.NewEvents(ctx, full)
if err != nil {
return nil, err
}
na := &ProviderNodeAdapter{
FullNode: full,
@ -77,7 +80,7 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConf
}
na.scMgr = NewSectorCommittedManager(ev, na, &apiWrapper{api: full})
return na
return na, nil
}
}
@ -340,7 +343,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
}
// Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
checkFunc := func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
if ts == nil {
// keep listening for events
return false, true, nil

View File

@ -56,8 +56,11 @@ func SettlePaymentChannels(mctx helpers.MetricsCtx, lc fx.Lifecycle, papi API) e
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
pcs := newPaymentChannelSettler(ctx, &papi)
ev := events.NewEvents(ctx, papi)
return ev.Called(pcs.check, pcs.messageHandler, pcs.revertHandler, int(build.MessageConfidence+1), events.NoTimeout, pcs.matcher)
ev, err := events.NewEvents(ctx, &papi)
if err != nil {
return err
}
return ev.Called(ctx, pcs.check, pcs.messageHandler, pcs.revertHandler, int(build.MessageConfidence+1), events.NoTimeout, pcs.matcher)
},
})
return nil
@ -70,7 +73,7 @@ func newPaymentChannelSettler(ctx context.Context, api settlerAPI) *paymentChann
}
}
func (pcs *paymentChannelSettler) check(ts *types.TipSet) (done bool, more bool, err error) {
func (pcs *paymentChannelSettler) check(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error) {
return false, true, nil
}

View File

@ -21,7 +21,7 @@ func NewEventsAdapter(api *events.Events) EventsAdapter {
}
func (e EventsAdapter) ChainAt(hnd sealing.HeightHandler, rev sealing.RevertHandler, confidence int, h abi.ChainEpoch) error {
return e.delegate.ChainAt(func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return e.delegate.ChainAt(context.TODO(), func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return hnd(ctx, ts.Key().Bytes(), curH)
}, func(ctx context.Context, ts *types.TipSet) error {
return rev(ctx, ts.Key().Bytes())

View File

@ -116,6 +116,7 @@ type fullNodeFilteredAPI interface {
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
ChainHasObj(context.Context, cid.Cid) (bool, error)
ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error)
@ -167,28 +168,29 @@ func (m *Miner) Run(ctx context.Context) error {
return xerrors.Errorf("getting miner info: %w", err)
}
var (
// consumer of chain head changes.
evts = events.NewEvents(ctx, m.api)
evtsAdapter = NewEventsAdapter(evts)
// consumer of chain head changes.
evts, err := events.NewEvents(ctx, m.api)
if err != nil {
return xerrors.Errorf("failed to subscribe to events: %w", err)
}
evtsAdapter := NewEventsAdapter(evts)
// Create a shim to glue the API required by the sealing component
// with the API that Lotus is capable of providing.
// The shim translates between "tipset tokens" and tipset keys, and
// provides extra methods.
adaptedAPI = NewSealingAPIAdapter(m.api)
// Create a shim to glue the API required by the sealing component
// with the API that Lotus is capable of providing.
// The shim translates between "tipset tokens" and tipset keys, and
// provides extra methods.
adaptedAPI := NewSealingAPIAdapter(m.api)
// Instantiate a precommit policy.
cfg = sealing.GetSealingConfigFunc(m.getSealConfig)
provingBuffer = md.WPoStProvingPeriod * 2
// Instantiate a precommit policy.
cfg := sealing.GetSealingConfigFunc(m.getSealConfig)
provingBuffer := md.WPoStProvingPeriod * 2
pcp = sealing.NewBasicPreCommitPolicy(adaptedAPI, cfg, provingBuffer)
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, cfg, provingBuffer)
// address selector.
as = func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}
)
// address selector.
as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}
// Instantiate the sealing FSM.
m.sealing = sealing.New(ctx, adaptedAPI, m.feeCfg, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as)

View File

@ -139,7 +139,10 @@ func GetTips(ctx context.Context, api v0api.FullNode, lastHeight abi.ChainEpoch,
for {
select {
case changes := <-notif:
case changes, ok := <-notif:
if !ok {
return
}
for _, change := range changes {
log.Infow("Head event", "height", change.Val.Height(), "type", change.Type)