Merge branch 'next' into feat/specs-actors-v0.7.0

This commit is contained in:
Łukasz Magiera 2020-06-29 22:45:50 +02:00
commit 24529bf35d
22 changed files with 1111 additions and 164 deletions

View File

@ -6,7 +6,7 @@ all: build
unexport GOFLAGS unexport GOFLAGS
GOVERSION:=$(shell go version | cut -d' ' -f 3 | cut -d. -f 2) GOVERSION:=$(shell go version | cut -d' ' -f 3 | cut -d. -f 2)
ifeq ($(shell expr $(GOVERSION) \< 13), 1) ifeq ($(shell expr $(GOVERSION) \< 14), 1)
$(warning Your Golang version is go 1.$(GOVERSION)) $(warning Your Golang version is go 1.$(GOVERSION))
$(error Update Golang to version $(shell grep '^go' go.mod)) $(error Update Golang to version $(shell grep '^go' go.mod))
endif endif

View File

@ -206,7 +206,7 @@ type FullNode interface {
StateCall(context.Context, *types.Message, types.TipSetKey) (*InvocResult, error) StateCall(context.Context, *types.Message, types.TipSetKey) (*InvocResult, error)
StateReplay(context.Context, types.TipSetKey, cid.Cid) (*InvocResult, error) StateReplay(context.Context, types.TipSetKey, cid.Cid) (*InvocResult, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
StateReadState(ctx context.Context, act *types.Actor, tsk types.TipSetKey) (*ActorState, error) StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*ActorState, error)
StateListMessages(ctx context.Context, match *types.Message, tsk types.TipSetKey, toht abi.ChainEpoch) ([]cid.Cid, error) StateListMessages(ctx context.Context, match *types.Message, tsk types.TipSetKey, toht abi.ChainEpoch) ([]cid.Cid, error)
StateNetworkName(context.Context) (dtypes.NetworkName, error) StateNetworkName(context.Context) (dtypes.NetworkName, error)

View File

@ -139,7 +139,7 @@ type FullNodeStruct struct {
StateCall func(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) `perm:"read"` StateCall func(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) `perm:"read"`
StateReplay func(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error) `perm:"read"` StateReplay func(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error) `perm:"read"`
StateGetActor func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error) `perm:"read"` StateGetActor func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error) `perm:"read"`
StateReadState func(context.Context, *types.Actor, types.TipSetKey) (*api.ActorState, error) `perm:"read"` StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error) `perm:"read"`
StatePledgeCollateral func(context.Context, types.TipSetKey) (types.BigInt, error) `perm:"read"` StatePledgeCollateral func(context.Context, types.TipSetKey) (types.BigInt, error) `perm:"read"`
StateWaitMsg func(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) `perm:"read"` StateWaitMsg func(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) `perm:"read"`
StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"` StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"`
@ -619,8 +619,8 @@ func (c *FullNodeStruct) StateGetActor(ctx context.Context, actor address.Addres
return c.Internal.StateGetActor(ctx, actor, tsk) return c.Internal.StateGetActor(ctx, actor, tsk)
} }
func (c *FullNodeStruct) StateReadState(ctx context.Context, act *types.Actor, tsk types.TipSetKey) (*api.ActorState, error) { func (c *FullNodeStruct) StateReadState(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*api.ActorState, error) {
return c.Internal.StateReadState(ctx, act, tsk) return c.Internal.StateReadState(ctx, addr, tsk)
} }
func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, tsk types.TipSetKey) (types.BigInt, error) { func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, tsk types.TipSetKey) (types.BigInt, error) {

View File

@ -51,7 +51,7 @@ type Events struct {
readyOnce sync.Once readyOnce sync.Once
heightEvents heightEvents
calledEvents *hcEvents
} }
func NewEvents(ctx context.Context, api eventAPI) *Events { func NewEvents(ctx context.Context, api eventAPI) *Events {
@ -74,18 +74,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
htHeights: map[abi.ChainEpoch][]uint64{}, htHeights: map[abi.ChainEpoch][]uint64{},
}, },
calledEvents: calledEvents{ hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
cs: api,
tsc: tsc,
ctx: ctx,
gcConfidence: uint64(gcConfidence),
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
revertQueue: map[msgH][]triggerH{},
triggers: map[triggerID]*callHandler{},
matchers: map[triggerID][]MatchFunc{},
timeouts: map[abi.ChainEpoch]map[triggerID]int{},
},
} }
e.ready.Add(1) e.ready.Add(1)
@ -143,7 +132,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
} }
e.readyOnce.Do(func() { e.readyOnce.Do(func() {
e.at = cur[0].Val.Height() e.lastTs = cur[0].Val
e.ready.Done() e.ready.Done()
}) })
@ -186,5 +175,5 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
return err return err
} }
return e.headChangeCalled(rev, app) return e.processHeadChangeEvent(rev, app)
} }

View File

@ -13,6 +13,7 @@ import (
) )
const NoTimeout = math.MaxInt64 const NoTimeout = math.MaxInt64
const NoHeight = abi.ChainEpoch(-1)
type triggerID = uint64 type triggerID = uint64
@ -23,54 +24,60 @@ type msgH = abi.ChainEpoch
// message (msgH+confidence) // message (msgH+confidence)
type triggerH = abi.ChainEpoch type triggerH = abi.ChainEpoch
// CalledHandler arguments: type eventData interface{}
// `ts` is the tipset, in which the `msg` is included.
// EventHandler arguments:
// `prevTs` is the previous tipset, eg the "from" tipset for a state change.
// `ts` is the event tipset, eg the tipset in which the `msg` is included.
// `curH`-`ts.Height` = `confidence` // `curH`-`ts.Height` = `confidence`
type CalledHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
// CheckFunc is used for atomicity guarantees. If the condition the callbacks // CheckFunc is used for atomicity guarantees. If the condition the callbacks
// wait for has already happened in tipset `ts` // wait for has already happened in tipset `ts`
// //
// If `done` is true, timeout won't be triggered // If `done` is true, timeout won't be triggered
// If `more` is false, no messages will be sent to CalledHandler (RevertHandler // If `more` is false, no messages will be sent to EventHandler (RevertHandler
// may still be called) // may still be called)
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error) type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
type MatchFunc func(msg *types.Message) (bool, error) // Keep track of information for an event handler
type handlerInfo struct {
type callHandler struct {
confidence int confidence int
timeout abi.ChainEpoch timeout abi.ChainEpoch
disabled bool // TODO: GC after gcConfidence reached disabled bool // TODO: GC after gcConfidence reached
handle CalledHandler handle EventHandler
revert RevertHandler revert RevertHandler
} }
// When a change occurs, a queuedEvent is created and put into a queue
// until the required confidence is reached
type queuedEvent struct { type queuedEvent struct {
trigger triggerID trigger triggerID
h abi.ChainEpoch prevH abi.ChainEpoch
msg *types.Message h abi.ChainEpoch
data eventData
called bool called bool
} }
type calledEvents struct { // Manages chain head change events, which may be forward (new tipset added to
// chain) or backward (chain branch discarded in favour of heavier branch)
type hcEvents struct {
cs eventAPI cs eventAPI
tsc *tipSetCache tsc *tipSetCache
ctx context.Context ctx context.Context
gcConfidence uint64 gcConfidence uint64
at abi.ChainEpoch lastTs *types.TipSet
lk sync.Mutex lk sync.Mutex
ctr triggerID ctr triggerID
triggers map[triggerID]*callHandler triggers map[triggerID]*handlerInfo
matchers map[triggerID][]MatchFunc
// maps block heights to events // maps block heights to events
// [triggerH][msgH][event] // [triggerH][msgH][event]
@ -81,30 +88,79 @@ type calledEvents struct {
// [timeoutH+confidence][triggerID]{calls} // [timeoutH+confidence][triggerID]{calls}
timeouts map[abi.ChainEpoch]map[triggerID]int timeouts map[abi.ChainEpoch]map[triggerID]int
messageEvents
watcherEvents
} }
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error { func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) *hcEvents {
e := hcEvents{
ctx: ctx,
cs: cs,
tsc: tsc,
gcConfidence: gcConfidence,
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
revertQueue: map[msgH][]triggerH{},
triggers: map[triggerID]*handlerInfo{},
timeouts: map[abi.ChainEpoch]map[triggerID]int{},
}
e.messageEvents = newMessageEvents(ctx, &e, cs)
e.watcherEvents = newWatcherEvents(ctx, &e, cs)
return &e
}
// Called when there is a change to the head with tipsets to be
// reverted / applied
func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
e.lk.Lock() e.lk.Lock()
defer e.lk.Unlock() defer e.lk.Unlock()
for _, ts := range rev { for _, ts := range rev {
e.handleReverts(ts) e.handleReverts(ts)
e.at = ts.Height() e.lastTs = ts
} }
for _, ts := range app { for _, ts := range app {
// called triggers // Check if the head change caused any state changes that we were
e.checkNewCalls(ts) // waiting for
for ; e.at <= ts.Height(); e.at++ { stateChanges := e.watcherEvents.checkStateChanges(e.lastTs, ts)
e.applyWithConfidence(ts, e.at)
// Queue up calls until there have been enough blocks to reach
// confidence on the state changes
for tid, data := range stateChanges {
e.queueForConfidence(tid, data, e.lastTs, ts)
}
// Check if the head change included any new message calls
newCalls, err := e.messageEvents.checkNewCalls(ts)
if err != nil {
return err
}
// Queue up calls until there have been enough blocks to reach
// confidence on the message calls
for tid, data := range newCalls {
e.queueForConfidence(tid, data, nil, ts)
}
for at := e.lastTs.Height(); at <= ts.Height(); at++ {
// Apply any queued events and timeouts that were targeted at the
// current chain height
e.applyWithConfidence(ts, at)
e.applyTimeouts(ts) e.applyTimeouts(ts)
} }
// Update the latest known tipset
e.lastTs = ts
} }
return nil return nil
} }
func (e *calledEvents) handleReverts(ts *types.TipSet) { func (e *hcEvents) handleReverts(ts *types.TipSet) {
reverts, ok := e.revertQueue[ts.Height()] reverts, ok := e.revertQueue[ts.Height()]
if !ok { if !ok {
return // nothing to do return // nothing to do
@ -120,7 +176,7 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) {
trigger := e.triggers[event.trigger] trigger := e.triggers[event.trigger]
if err := trigger.revert(e.ctx, ts); err != nil { if err := trigger.revert(e.ctx, ts); err != nil {
log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err) log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", ts.Height(), triggerH, err)
} }
} }
delete(e.confQueue[triggerH], ts.Height()) delete(e.confQueue[triggerH], ts.Height())
@ -128,41 +184,15 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) {
delete(e.revertQueue, ts.Height()) delete(e.revertQueue, ts.Height())
} }
func (e *calledEvents) checkNewCalls(ts *types.TipSet) { // Queue up events until the chain has reached a height that reflects the
pts, err := e.cs.ChainGetTipSet(e.ctx, ts.Parents()) // we actually care about messages in the parent tipset here // desired confidence
if err != nil { func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) {
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
return
}
e.messagesForTs(pts, func(msg *types.Message) {
// TODO: provide receipts
for tid, matchFns := range e.matchers {
var matched bool
for _, matchFn := range matchFns {
ok, err := matchFn(msg)
if err != nil {
log.Errorf("event matcher failed: %s", err)
continue
}
matched = ok
if matched {
break
}
}
if matched {
e.queueForConfidence(tid, msg, ts)
break
}
}
})
}
func (e *calledEvents) queueForConfidence(trigID uint64, msg *types.Message, ts *types.TipSet) {
trigger := e.triggers[trigID] trigger := e.triggers[trigID]
prevH := NoHeight
if prevTs != nil {
prevH = prevTs.Height()
}
appliedH := ts.Height() appliedH := ts.Height()
triggerH := appliedH + abi.ChainEpoch(trigger.confidence) triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
@ -175,17 +205,19 @@ func (e *calledEvents) queueForConfidence(trigID uint64, msg *types.Message, ts
byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{ byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
trigger: trigID, trigger: trigID,
prevH: prevH,
h: appliedH, h: appliedH,
msg: msg, data: data,
}) })
e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH) e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH)
} }
func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) { // Apply any events that were waiting for this chain height for confidence
func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) {
byOrigH, ok := e.confQueue[height] byOrigH, ok := e.confQueue[height]
if !ok { if !ok {
return // no triggers at thin height return // no triggers at this height
} }
for origH, events := range byOrigH { for origH, events := range byOrigH {
@ -204,15 +236,20 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpo
continue continue
} }
rec, err := e.cs.StateGetReceipt(e.ctx, event.msg.Cid(), ts.Key()) // Previous tipset - this is relevant for example in a state change
if err != nil { // from one tipset to another
log.Error(err) var prevTs *types.TipSet
return if event.prevH != NoHeight {
prevTs, err = e.tsc.get(event.prevH)
if err != nil {
log.Errorf("events: applyWithConfidence didn't find tipset for previous event; wanted %d; current %d", event.prevH, height)
continue
}
} }
more, err := trigger.handle(event.msg, rec, triggerTs, height) more, err := trigger.handle(event.data, prevTs, triggerTs, height)
if err != nil { if err != nil {
log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err) log.Errorf("chain trigger (@H %d, triggered @ %d) failed: %s", origH, height, err)
continue // don't revert failed calls continue // don't revert failed calls
} }
@ -228,7 +265,8 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpo
} }
} }
func (e *calledEvents) applyTimeouts(ts *types.TipSet) { // Apply any timeouts that expire at this height
func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
triggers, ok := e.timeouts[ts.Height()] triggers, ok := e.timeouts[ts.Height()]
if !ok { if !ok {
return // nothing to do return // nothing to do
@ -258,12 +296,224 @@ func (e *calledEvents) applyTimeouts(ts *types.TipSet) {
} }
} }
func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) { // Listen for an event
// - CheckFunc: immediately checks if the event already occurred
// - EventHandler: called when the event has occurred, after confidence tipsets
// - RevertHandler: called if the chain head changes causing the event to revert
// - confidence: wait this many tipsets before calling EventHandler
// - timeout: at this chain height, timeout on waiting for this event
func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) {
e.lk.Lock()
defer e.lk.Unlock()
// Check if the event has already occurred
ts := e.tsc.best()
done, more, err := check(ts)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
}
if done {
timeout = NoTimeout
}
// Create a trigger for the event
id := e.ctr
e.ctr++
e.triggers[id] = &handlerInfo{
confidence: confidence,
timeout: timeout + abi.ChainEpoch(confidence),
disabled: !more,
handle: hnd,
revert: rev,
}
// If there's a timeout, set up a timeout check at that height
if timeout != NoTimeout {
if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil {
e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{}
}
e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0
}
return id, nil
}
// headChangeAPI is used to allow the composed event APIs to call back to hcEvents
// to listen for changes
type headChangeAPI interface {
onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error)
}
// watcherEvents watches for a state change
type watcherEvents struct {
ctx context.Context
cs eventAPI
hcAPI headChangeAPI
lk sync.RWMutex
matchers map[triggerID]StateMatchFunc
}
func newWatcherEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) watcherEvents {
return watcherEvents{
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]StateMatchFunc),
}
}
// Run each of the matchers against the previous and current state to see if
// there's a change
func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map[triggerID]eventData {
we.lk.RLock()
defer we.lk.RUnlock()
res := make(map[triggerID]eventData)
for tid, matchFn := range we.matchers {
ok, data, err := matchFn(oldState, newState)
if err != nil {
log.Errorf("event diff fn failed: %s", err)
continue
}
if ok {
res[tid] = data
}
}
return res
}
// StateChange represents a change in state
type StateChange interface{}
// StateChangeHandler arguments:
// `oldTs` is the state "from" tipset
// `newTs` is the state "to" tipset
// `states` is the change in state
// `curH`-`ts.Height` = `confidence`
type StateChangeHandler func(oldTs, newTs *types.TipSet, states StateChange, curH abi.ChainEpoch) (more bool, err error)
type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error)
// StateChanged registers a callback which is triggered when a specified state
// change occurs or a timeout is reached.
//
// * `CheckFunc` callback is invoked immediately with a recent tipset, it
// returns two booleans - `done`, and `more`.
//
// * `done` should be true when some on-chain state change we are waiting
// for has happened. When `done` is set to true, timeout trigger is disabled.
//
// * `more` should be false when we don't want to receive new notifications
// through StateChangeHandler. Note that notifications may still be delivered to
// RevertHandler
//
// * `StateChangeHandler` is called when the specified state change was observed
// on-chain, and a confidence threshold was reached, or the specified `timeout`
// height was reached with no state change observed. When this callback is
// invoked on a timeout, `oldState` and `newState` are set to nil.
// This callback returns a boolean specifying whether further notifications
// should be sent, like `more` return param from `CheckFunc` above.
//
// * `RevertHandler` is called after apply handler, when we drop the tipset
// containing the message. The tipset passed as the argument is the tipset
// that is being dropped. Note that the event dropped may be re-applied
// in a different tipset in small amount of time.
//
// * `StateMatchFunc` is called against each tipset state. If there is a match,
// the state change is queued up until the confidence interval has elapsed (and
// `StateChangeHandler` is called)
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
states, ok := data.(StateChange)
if data != nil && !ok {
panic("expected StateChange")
}
return scHnd(prevTs, ts, states, height)
}
id, err := we.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout)
if err != nil {
return err
}
we.lk.Lock()
we.matchers[id] = mf
defer we.lk.Unlock()
return nil
}
// messageEvents watches for message calls to actors
type messageEvents struct {
ctx context.Context
cs eventAPI
hcAPI headChangeAPI
lk sync.RWMutex
matchers map[triggerID][]MsgMatchFunc
}
func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) messageEvents {
return messageEvents{
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: map[triggerID][]MsgMatchFunc{},
}
}
// Check if there are any new actor calls
func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventData, error) {
pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
if err != nil {
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
return nil, err
}
res := make(map[triggerID]eventData)
me.messagesForTs(pts, func(msg *types.Message) {
me.lk.RLock()
defer me.lk.RUnlock()
// TODO: provide receipts
for tid, matchFns := range me.matchers {
var matched bool
for _, matchFn := range matchFns {
ok, err := matchFn(msg)
if err != nil {
log.Errorf("event matcher failed: %s", err)
continue
}
matched = ok
if matched {
break
}
}
if matched {
res[tid] = msg
break
}
}
})
return res, nil
}
// Get the messages in a tipset
func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) {
seen := map[cid.Cid]struct{}{} seen := map[cid.Cid]struct{}{}
for _, tsb := range ts.Blocks() { for _, tsb := range ts.Blocks() {
msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid()) msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
if err != nil { if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err) log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
// this is quite bad, but probably better than missing all the other updates // this is quite bad, but probably better than missing all the other updates
@ -292,20 +542,27 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
} }
} }
// Called registers a callbacks which are triggered when a specified method is // MsgHandler arguments:
// `ts` is the tipset, in which the `msg` is included.
// `curH`-`ts.Height` = `confidence`
type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
type MsgMatchFunc func(msg *types.Message) (bool, error)
// Called registers a callback which is triggered when a specified method is
// called on an actor, or a timeout is reached. // called on an actor, or a timeout is reached.
// //
// * `CheckFunc` callback is invoked immediately with a recent tipset, it // * `CheckFunc` callback is invoked immediately with a recent tipset, it
// returns two booleans - `done`, and `more`. // returns two booleans - `done`, and `more`.
// //
// * `done` should be true when some on-chain action we are waiting for has // * `done` should be true when some on-chain action we are waiting for has
// happened. When `done` is set to true, timeout trigger is disabled. // happened. When `done` is set to true, timeout trigger is disabled.
// //
// * `more` should be false when we don't want to receive new notifications // * `more` should be false when we don't want to receive new notifications
// through CalledHandler. Note that notifications may still be delivered to // through MsgHandler. Note that notifications may still be delivered to
// RevertHandler // RevertHandler
// //
// * `CalledHandler` is called when the specified event was observed on-chain, // * `MsgHandler` is called when the specified event was observed on-chain,
// and a confidence threshold was reached, or the specified `timeout` height // and a confidence threshold was reached, or the specified `timeout` height
// was reached with no events observed. When this callback is invoked on a // was reached with no events observed. When this callback is invoked on a
// timeout, `msg` is set to nil. This callback returns a boolean specifying // timeout, `msg` is set to nil. This callback returns a boolean specifying
@ -316,44 +573,38 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
// containing the message. The tipset passed as the argument is the tipset // containing the message. The tipset passed as the argument is the tipset
// that is being dropped. Note that the message dropped may be re-applied // that is being dropped. Note that the message dropped may be re-applied
// in a different tipset in small amount of time. // in a different tipset in small amount of time.
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MatchFunc) error { //
e.lk.Lock() // * `MsgMatchFunc` is called against each message. If there is a match, the
defer e.lk.Unlock() // message is queued up until the confidence interval has elapsed (and
// `MsgHandler` is called)
ts := e.tsc.best() func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
done, more, err := check(ts) hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
if err != nil { msg, ok := data.(*types.Message)
return xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err) if data != nil && !ok {
} panic("expected msg")
if done {
timeout = NoTimeout
}
id := e.ctr
e.ctr++
e.triggers[id] = &callHandler{
confidence: confidence,
timeout: timeout + abi.ChainEpoch(confidence),
disabled: !more,
handle: hnd,
revert: rev,
}
e.matchers[id] = append(e.matchers[id], mf)
if timeout != NoTimeout {
if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil {
e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{}
} }
e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0
rec, err := me.cs.StateGetReceipt(me.ctx, msg.Cid(), ts.Key())
if err != nil {
return false, err
}
return msgHnd(msg, rec, ts, height)
} }
id, err := me.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout)
if err != nil {
return err
}
me.lk.Lock()
defer me.lk.Unlock()
me.matchers[id] = append(me.matchers[id], mf)
return nil return nil
} }
func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error { // Convenience function for checking and matching messages
return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg.VMMessage())) func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error {
return me.Called(me.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage()))
} }

View File

@ -1004,8 +1004,6 @@ func TestRemoveTriggersOnMessage(t *testing.T) {
return false, true, nil return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied) require.Equal(t, false, applied)
fmt.Println(msg == nil)
fmt.Println(curH)
applied = true applied = true
return more, nil return more, nil
}, func(_ context.Context, ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
@ -1067,3 +1065,250 @@ func TestRemoveTriggersOnMessage(t *testing.T) {
require.Equal(t, true, applied) require.Equal(t, true, applied)
require.Equal(t, false, reverted) require.Equal(t, false, reverted)
} }
type testStateChange struct {
from string
to string
}
func TestStateChanged(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
more := true
var applied, reverted bool
var appliedData StateChange
var appliedOldTs *types.TipSet
var appliedNewTs *types.TipSet
var appliedH abi.ChainEpoch
var matchData StateChange
confidence := 3
timeout := abi.ChainEpoch(20)
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
applied = true
appliedData = data
appliedOldTs = oldTs
appliedNewTs = newTs
appliedH = curH
return more, nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
if matchData == nil {
return false, matchData, nil
}
d := matchData
matchData = nil
return true, d, nil
})
require.NoError(t, err)
// create few blocks to make sure nothing get's randomly called
fcs.advance(0, 4, nil) // H=5
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create state change (but below confidence threshold)
matchData = testStateChange{from: "a", to: "b"}
fcs.advance(0, 3, nil)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create additional block so we are above confidence threshold
fcs.advance(0, 2, nil) // H=10 (confidence=3, apply)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
// dip below confidence (should not apply again)
fcs.advance(2, 2, nil) // H=10 (confidence=3, apply)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// Change happens from 5 -> 6
require.Equal(t, abi.ChainEpoch(5), appliedOldTs.Height())
require.Equal(t, abi.ChainEpoch(6), appliedNewTs.Height())
// Actually applied (with confidence) at 9
require.Equal(t, abi.ChainEpoch(9), appliedH)
// Make sure the state change was correctly passed through
rcvd := appliedData.(testStateChange)
require.Equal(t, "a", rcvd.from)
require.Equal(t, "b", rcvd.to)
}
func TestStateChangedRevert(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
more := true
var applied, reverted bool
var matchData StateChange
confidence := 1
timeout := abi.ChainEpoch(20)
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
applied = true
return more, nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
if matchData == nil {
return false, matchData, nil
}
d := matchData
matchData = nil
return true, d, nil
})
require.NoError(t, err)
fcs.advance(0, 2, nil) // H=3
// Make a state change from TS at height 3 to TS at height 4
matchData = testStateChange{from: "a", to: "b"}
fcs.advance(0, 1, nil) // H=4
// Haven't yet reached confidence
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// Advance to reach confidence level
fcs.advance(0, 1, nil) // H=5
// Should now have called the handler
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
// Advance 3 more TS
fcs.advance(0, 3, nil) // H=8
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// Regress but not so far as to cause a revert
fcs.advance(3, 1, nil) // H=6
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// Regress back to state where change happened
fcs.advance(3, 1, nil) // H=4
// Expect revert to have happened
require.Equal(t, false, applied)
require.Equal(t, true, reverted)
}
func TestStateChangedTimeout(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
called := false
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
called = true
require.Nil(t, data)
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
require.Equal(t, abi.ChainEpoch(23), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
return false, nil, nil
})
require.NoError(t, err)
fcs.advance(0, 21, nil)
require.False(t, called)
fcs.advance(0, 5, nil)
require.True(t, called)
called = false
// with check func reporting done
fcs = &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events = NewEvents(context.Background(), fcs)
err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
called = true
require.Nil(t, data)
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
require.Equal(t, abi.ChainEpoch(23), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
return false, nil, nil
})
require.NoError(t, err)
fcs.advance(0, 21, nil)
require.False(t, called)
fcs.advance(0, 5, nil)
require.False(t, called)
}

View File

@ -0,0 +1,137 @@
package state
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
)
// UserData is the data returned from the DiffFunc
type UserData interface{}
// ChainAPI abstracts out calls made by this class to external APIs
type ChainAPI interface {
apibstore.ChainIO
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
}
// StatePredicates has common predicates for responding to state changes
type StatePredicates struct {
api ChainAPI
cst *cbor.BasicIpldStore
}
func NewStatePredicates(api ChainAPI) *StatePredicates {
return &StatePredicates{
api: api,
cst: cbor.NewCborStore(apibstore.NewAPIBlockstore(api)),
}
}
// DiffFunc check if there's a change form oldState to newState, and returns
// - changed: was there a change
// - user: user-defined data representing the state change
// - err
type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error)
type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error)
// OnActorStateChanged calls diffStateFunc when the state changes for the given actor
func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc {
return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) {
oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key())
if err != nil {
return false, nil, err
}
newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key())
if err != nil {
return false, nil, err
}
if oldActor.Head.Equals(newActor.Head) {
return false, nil, nil
}
return diffStateFunc(ctx, oldActor.Head, newActor.Head)
}
}
type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error)
// OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor
func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc {
return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState market.State
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
return false, nil, err
}
var newState market.State
if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil {
return false, nil, err
}
return diffStorageMarketState(ctx, &oldState, &newState)
})
}
type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error)
// OnDealStateChanged calls diffDealStates when the market state changes
func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc {
return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) {
if oldState.States.Equals(newState.States) {
return false, nil, nil
}
oldRoot, err := amt.LoadAMT(ctx, sp.cst, oldState.States)
if err != nil {
return false, nil, err
}
newRoot, err := amt.LoadAMT(ctx, sp.cst, newState.States)
if err != nil {
return false, nil, err
}
return diffDealStates(ctx, oldRoot, newRoot)
}
}
// ChangedDeals is a set of changes to deal state
type ChangedDeals map[abi.DealID]DealStateChange
// DealStateChange is a change in deal state from -> to
type DealStateChange struct {
From market.DealState
To market.DealState
}
// DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs
func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc {
return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) {
changedDeals := make(ChangedDeals)
for _, dealID := range dealIds {
var oldDeal, newDeal market.DealState
err := oldDealStateRoot.Get(ctx, uint64(dealID), &oldDeal)
if err != nil {
return false, nil, err
}
err = newDealStateRoot.Get(ctx, uint64(dealID), &newDeal)
if err != nil {
return false, nil, err
}
if oldDeal != newDeal {
changedDeals[dealID] = DealStateChange{oldDeal, newDeal}
}
}
if len(changedDeals) > 0 {
return true, changedDeals, nil
}
return false, nil, nil
}
}

View File

@ -0,0 +1,177 @@
package state
import (
"context"
"testing"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-hamt-ipld"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
)
var dummyCid cid.Cid
func init() {
dummyCid, _ = cid.Parse("bafkqaaa")
}
type mockAPI struct {
ts map[types.TipSetKey]*types.Actor
bs bstore.Blockstore
}
func newMockAPI(bs bstore.Blockstore) *mockAPI {
return &mockAPI{
bs: bs,
ts: make(map[types.TipSetKey]*types.Actor),
}
}
func (m mockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) {
return m.bs.Has(c)
}
func (m mockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) {
blk, err := m.bs.Get(c)
if err != nil {
return nil, xerrors.Errorf("blockstore get: %w", err)
}
return blk.RawData(), nil
}
func (m mockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return m.ts[tsk], nil
}
func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) {
m.ts[tsk] = act
}
func TestPredicates(t *testing.T) {
ctx := context.Background()
bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
store := cbornode.NewCborStore(bs)
oldDeals := map[abi.DealID]*market.DealState{
abi.DealID(1): {
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
SlashEpoch: 0,
},
abi.DealID(2): {
SectorStartEpoch: 4,
LastUpdatedEpoch: 5,
SlashEpoch: 0,
},
}
oldStateC := createMarketState(ctx, t, store, oldDeals)
newDeals := map[abi.DealID]*market.DealState{
abi.DealID(1): {
SectorStartEpoch: 1,
LastUpdatedEpoch: 3,
SlashEpoch: 0,
},
abi.DealID(2): {
SectorStartEpoch: 4,
LastUpdatedEpoch: 6,
SlashEpoch: 6,
},
}
newStateC := createMarketState(ctx, t, store, newDeals)
miner, err := address.NewFromString("t00")
require.NoError(t, err)
oldState, err := mockTipset(miner, 1)
require.NoError(t, err)
newState, err := mockTipset(miner, 2)
require.NoError(t, err)
api := newMockAPI(bs)
api.setActor(oldState.Key(), &types.Actor{Head: oldStateC})
api.setActor(newState.Key(), &types.Actor{Head: newStateC})
preds := NewStatePredicates(api)
dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)}
diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds)))
// Diff a state against itself: expect no change
changed, _, err := diffFn(ctx, oldState, oldState)
require.NoError(t, err)
require.False(t, changed)
// Diff old state against new state
changed, val, err := diffFn(ctx, oldState, newState)
require.NoError(t, err)
require.True(t, changed)
changedDeals, ok := val.(ChangedDeals)
require.True(t, ok)
require.Len(t, changedDeals, 2)
require.Contains(t, changedDeals, abi.DealID(1))
require.Contains(t, changedDeals, abi.DealID(2))
deal1 := changedDeals[abi.DealID(1)]
if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 {
t.Fatal("Unexpected change to LastUpdatedEpoch")
}
deal2 := changedDeals[abi.DealID(2)]
if deal2.From.SlashEpoch != 0 || deal2.To.SlashEpoch != 6 {
t.Fatal("Unexpected change to LastUpdatedEpoch")
}
}
func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) {
return types.NewTipSet([]*types.BlockHeader{{
Miner: miner,
Height: 5,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
Timestamp: timestamp,
}})
}
func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
rootCid := createAMT(ctx, t, store, deals)
emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO())
require.NoError(t, err)
emptyMap, err := store.Put(context.TODO(), hamt.NewNode(store, hamt.UseTreeBitWidth(5)))
require.NoError(t, err)
state := market.ConstructState(emptyArrayCid, emptyMap, emptyMap)
state.States = rootCid
stateC, err := store.Put(ctx, state)
require.NoError(t, err)
return stateC
}
func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
root := amt.NewAMT(store)
for dealID, dealState := range deals {
err := root.Set(ctx, uint64(dealID), dealState)
require.NoError(t, err)
}
rootCid, err := root.Flush(ctx)
require.NoError(t, err)
return rootCid
}

View File

@ -8,11 +8,11 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd CalledHandler) CheckFunc { func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd MsgHandler) CheckFunc {
msg := smsg.VMMessage() msg := smsg.VMMessage()
return func(ts *types.TipSet) (done bool, more bool, err error) { return func(ts *types.TipSet) (done bool, more bool, err error) {
fa, err := e.cs.StateGetActor(ctx, msg.From, ts.Key()) fa, err := me.cs.StateGetActor(ctx, msg.From, ts.Key())
if err != nil { if err != nil {
return false, true, err return false, true, err
} }
@ -22,7 +22,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca
return false, true, nil return false, true, nil
} }
rec, err := e.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key()) rec, err := me.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key())
if err != nil { if err != nil {
return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err) return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err)
} }
@ -33,7 +33,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca
} }
} }
func (e *calledEvents) MatchMsg(inmsg *types.Message) MatchFunc { func (me *messageEvents) MatchMsg(inmsg *types.Message) MsgMatchFunc {
return func(msg *types.Message) (bool, error) { return func(msg *types.Message) (bool, error) {
if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) { if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) {
return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce) return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce)

View File

@ -21,6 +21,7 @@ type ChainMsg interface {
Cid() cid.Cid Cid() cid.Cid
VMMessage() *Message VMMessage() *Message
ToStorageBlock() (block.Block, error) ToStorageBlock() (block.Block, error)
// FIXME: This is the *message* length, this name is misleading.
ChainLength() int ChainLength() int
} }

View File

@ -742,12 +742,7 @@ var stateReadStateCmd = &cli.Command{
return err return err
} }
act, err := api.StateGetActor(ctx, addr, ts.Key()) as, err := api.StateReadState(ctx, addr, ts.Key())
if err != nil {
return err
}
as, err := api.StateReadState(ctx, act, ts.Key())
if err != nil { if err != nil {
return err return err
} }

View File

@ -161,7 +161,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
log.Error(err) log.Error(err)
return return
} }
ast, err := api.StateReadState(ctx, act, genesisTs.Key())
ast, err := api.StateReadState(ctx, addr, genesisTs.Key())
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return
@ -210,7 +211,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
return return
} }
ast, err := api.StateReadState(ctx, &act, pts.Key()) ast, err := api.StateReadState(ctx, addr, pts.Key())
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return

View File

@ -0,0 +1,137 @@
# Incoming block validations
This document reviews the code flow that takes place inside the full node after receiving a new block from the GossipSub `/fil/blocks` topic and traces all of its protocol-related validation logic. We do not include validation logic *inside* the VM, the analysis stops at `(*VM).Invoke()`. The `V:` tag explicitly signals validations throughout the text.
## `modules.HandleIncomingBlocks()`
We subscribe to the `/fil/blocks` PubSub topic to receive external blocks from peers in the network and register a block validator that operates at the PubSub (`libp2p` stack) level, validating each PubSub message containing a Filecoin block header.
`V:` PubSub message is a valid CBOR `BlockMsg`.
`V:` Total messages in block are under `BlockMessageLimit`.
`V:` Aggregate message CIDs, encapsulated in the `MsgMeta` structure, serialize to the `Messages` CID in the block header (`ValidateMsgMeta()`).
`V:` Miner `Address` in block header is present and corresponds to a public-key address in the current chain state.
`V:` Block signature (`BlockSig`) is present and belongs to the public-key address retrieved for the miner (`CheckBlockSignature()`).
## `sub.HandleIncomingBlocks()`
Assemble a `FullBlock` from the received block header retrieving its Filecoin messages.
`V:` Block messages CIDs can be retrieved from the network and decode into valid CBOR `Message`/`SignedMessage`.
## `(*Syncer).InformNewHead()`
Assemble a `FullTipSet` populated with the single block received earlier.
`V:` `ValidateMsgMeta()` (already done in the topic validator).
`V:` Block's `ParentWeight` is greater than the one from the (first block of the) heaviest tipset.
## `(*Syncer).Sync()`
`(*Syncer).collectHeaders()`: we retrieve all tipsets from the received block down to our chain. Validation now is expanded to *every* block inside these tipsets.
`V`: Beacon entires are ordered by their round number.
`V:` Tipset `Parents` CIDs match the fetched parent tipset through block sync. (This check is not enforced correctly at the moment, see [issue](https://github.com/filecoin-project/lotus/issues/1918).)
## `(*Syncer).ValidateBlock()`
This function contains most of the validation logic grouped in separate closures that run asynchronously, this list does not reflect validation order then.
`V:` Block `Timestamp`:
* Is not bigger than current time plus `AllowableClockDrift`.
* Is not smaller than previous block's `Timestamp` plus `BlockDelay` (including null blocks).
### Messages
We check all the messages contained in one block at a time (`(*Syncer).checkBlockMessages()`).
`V:` The block's `BLSAggregate` matches the aggregate of BLS messages digests and public keys (extracted from the messages `From`).
`V:` Each `secp256k1` message `Signature` is signed with the public key extracted from the message `From`.
`V:` Aggregate message CIDs, encapsulated in the `MsgMeta` structure, serialize to the `Messages` CID in block header (similar to `ValidateMsgMeta()` call).
`V:` For each message, in `ValidForBlockInclusion()`:
* Message fields `Version`, `To`, `From`, `Value`, `GasPrice`, and `GasLimit` are correctly defined.
* Message `GasLimit` is under the message minimum gas cost (derived from chain height and message length).
`V:` Actor associated with message `From` exists and is an account actor, its `Nonce` matches the message `Nonce`.
### Miner
`V:` Miner address is registered in the `Claims` HAMT of the Power actor.
### Compute parent tipset state
`V:` Block's `ParentStateRoot` CID matches the state CID computed from the parent tipset.
`V:` Block's `ParentMessageReceipts` CID matches receipts CID computed from the parent tipset.
### Winner
Draw randomness for current epoch with minimum ticket from previous tipset, using `ElectionProofProduction`
domain separation tag.
`V`: `ElectionProof.VRFProof` is computed correctly by checking BLS signature using miner's key.
`V`: Miner is not slashed in `StoragePowerActor`.
`V`: Check if ticket is a winning ticket:
```
h := blake2b(VRFProof)
lhs := AsInt(h) * totalNetworkPower
rhs := minerPower * 2^256
if lhs < rhs { return "Winner" } else { return "Not a winner" }
```
### Block signature
`V:` `CheckBlockSignature()` (same signature validation as the one applied to the incoming block).
### Beacon values check
`V`: Validate that all `BeaconEntries` are valid. Check that every one of them is a signature of a message: `previousSignature || round` signed using drand's public key.
`V`: All entries between `MaxBeaconRoundForEpoch` down to `prevEntry` (from previous tipset) are included.
### Verify VRF Ticket chain
Draw randomness for current epoch with minimum ticket from previous tipset, using `TicketProduction`
domain separation tag.
`V`: `VerifyVRF` using drawn randomness and miner public key.
### Winning PoSt proof
Draw randomness for current epoch with `WinningPoSt` domain separation tag.
Get list of sectors challanged in this epoch for this miner, based on the randomness drawn.
`V`: Use filecoin proofs system to verify that miner prooved access to sealed versions of these sectors.
## `(*StateManager).TipSetState()`
Called throughout the validation process for the parent of each tipset being validated. The checks here then do not apply to the received new head itself that started the validation process.
### `(*StateManager).computeTipSetState()`
`V:` Every block in the tipset should belong to different a miner.
### `(*StateManager).ApplyBlocks()`
We create a new VM with the tipset's `ParentStateRoot` (this is then the parent state of the parent of the tipset currently being validated) on which to apply all messages from all blocks in the tipset. For each message independently we apply the validations listed next.
### `(*VM).ApplyMessage()`
`V:` Basic gas and value checks in `checkMessage()`:
* Message `GasLimit` is bigger than zero.
* Message `GasPrice` and `Value` are set.
`V:` Message storage gas cost is under the message's `GasLimit`.
`V:` Message's `Nonce` matches nonce in actor retrieved from message's `From`.
`V:` Message's maximum gas cost (derived from its `GasLimit`, `GasPrice`, and `Value`) is under the balance of the actor retrieved from message's `From`.
### `(*VM).send()`
`V:` Message's transfer `Value` is under the balance in actor retrieved from message's `From`.

View File

@ -2,7 +2,7 @@
These steps will install the following dependencies: These steps will install the following dependencies:
- go (1.13 or higher) - go (1.14 or higher)
- gcc (7.4.0 or higher) - gcc (7.4.0 or higher)
- git (version 2 or higher) - git (version 2 or higher)
- bzr (some go dependency needs this) - bzr (some go dependency needs this)

View File

@ -6,7 +6,7 @@
These steps will install the following dependencies: These steps will install the following dependencies:
- go (1.13 or higher) - go (1.14 or higher)
- gcc (7.4.0 or higher) - gcc (7.4.0 or higher)
- git (version 2 or higher) - git (version 2 or higher)
- bzr (some go dependency needs this) - bzr (some go dependency needs this)

View File

@ -2,7 +2,7 @@
These steps will install the following dependencies: These steps will install the following dependencies:
- go (1.13 or higher) - go (1.14 or higher)
- gcc (7.4.0 or higher) - gcc (7.4.0 or higher)
- git (version 2 or higher) - git (version 2 or higher)
- bzr (some go dependency needs this) - bzr (some go dependency needs this)
@ -15,29 +15,26 @@ These steps will install the following dependencies:
- llvm (proofs build) - llvm (proofs build)
- clang (proofs build) - clang (proofs build)
Run ### Install dependencies
```sh ```sh
sudo apt update sudo apt update
sudo apt install mesa-opencl-icd ocl-icd-opencl-dev sudo apt install mesa-opencl-icd ocl-icd-opencl-dev gcc git bzr jq pkg-config curl
sudo apt upgrade
``` ```
Build ### Install Go 1.14
```sh Install the latest version of Go by following [the docs on their website](https://golang.org/doc/install).
sudo add-apt-repository ppa:longsleep/golang-backports
sudo apt update
sudo apt install golang-go gcc git bzr jq pkg-config mesa-opencl-icd ocl-icd-opencl-dev
```
Clone ### Clone the Lotus repository
```sh ```sh
git clone https://github.com/filecoin-project/lotus.git git clone https://github.com/filecoin-project/lotus.git
cd lotus/ cd lotus/
``` ```
Install ### Build the Lotus binaries from source and install
```sh ```sh
make clean && make all make clean && make all
@ -45,3 +42,12 @@ sudo make install
``` ```
After installing Lotus, you can run the `lotus` command directly from your CLI to see usage documentation. Next, you can join the [Lotus Testnet](https://docs.lotu.sh/en+join-testnet). After installing Lotus, you can run the `lotus` command directly from your CLI to see usage documentation. Next, you can join the [Lotus Testnet](https://docs.lotu.sh/en+join-testnet).
### Interopnet
If you seek a smaller network to test, you can join the `interopnet`. Please note that this network is meant for developers - it resets much more often, and is much smaller. To join this network, checkout the branch `interopnet` instead of `master` before building and installing;
```
git checkout interopnet
```
Please also note that this documentation (if viewed on the website) might not be up to date with the interopnet. For the latest documentation on the interopnet branch, see the [Lotus Documentation Interopnet Branch on GitHub](https://github.com/filecoin-project/lotus/tree/interopnet/documentation/en)

View File

@ -82,12 +82,12 @@ lotus-storage-miner sectors pledge
Get **miner power** and **sector usage**: Get **miner power** and **sector usage**:
```sh ```sh
lotus-storage-miner state power lotus state power
# returns total power # returns total power
lotus-storage-miner state power <miner> lotus state power <miner>
lotus-storage-miner state sectors <miner> lotus state sectors <miner>
``` ```
## Performance tuning ## Performance tuning

4
go.mod
View File

@ -1,6 +1,6 @@
module github.com/filecoin-project/lotus module github.com/filecoin-project/lotus
go 1.13 go 1.14
require ( require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0 contrib.go.opencensus.io/exporter/jaeger v0.1.0
@ -18,7 +18,7 @@ require (
github.com/filecoin-project/filecoin-ffi v0.26.1-0.20200508175440-05b30afeb00d github.com/filecoin-project/filecoin-ffi v0.26.1-0.20200508175440-05b30afeb00d
github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef
github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200424220931-6263827e49f2 github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200424220931-6263827e49f2
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.3.0 github.com/filecoin-project/go-data-transfer v0.3.0

2
go.sum
View File

@ -225,6 +225,8 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mo
github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38 h1:B2gUde2DlfCb5YMYNVems2orobxC3KhrX3migym1IOQ=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=

View File

@ -140,7 +140,7 @@ func (m *Miner) mine(ctx context.Context) {
onDone, err := m.waitFunc(ctx, prebase.TipSet.MinTimestamp()) onDone, err := m.waitFunc(ctx, prebase.TipSet.MinTimestamp())
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return continue
} }
base, err := m.GetBestMiningCandidate(ctx) base, err := m.GetBestMiningCandidate(ctx)

View File

@ -302,7 +302,7 @@ func (a *StateAPI) StateAccountKey(ctx context.Context, addr address.Address, ts
return a.StateManager.ResolveToKeyAddress(ctx, addr, ts) return a.StateManager.ResolveToKeyAddress(ctx, addr, ts)
} }
func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, tsk types.TipSetKey) (*api.ActorState, error) { func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*api.ActorState, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk) ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil { if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
@ -312,6 +312,11 @@ func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, tsk typ
return nil, err return nil, err
} }
act, err := state.GetActor(actor)
if err != nil {
return nil, err
}
blk, err := state.Store.(*cbor.BasicIpldStore).Blocks.Get(act.Head) blk, err := state.Store.(*cbor.BasicIpldStore).Blocks.Get(act.Head)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -76,7 +76,7 @@ func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address)
return err return err
} }
from := account.Address from := account.Address
_, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil)
if err != nil { if err != nil {
return err return err
} }
@ -114,7 +114,7 @@ func (pm *Manager) loadOutboundChannelInfo(ctx context.Context, ch address.Addre
return nil, err return nil, err
} }
from := account.Address from := account.Address
_, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }