diff --git a/Makefile b/Makefile index 3038e929c..b143793aa 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ all: build unexport GOFLAGS GOVERSION:=$(shell go version | cut -d' ' -f 3 | cut -d. -f 2) -ifeq ($(shell expr $(GOVERSION) \< 13), 1) +ifeq ($(shell expr $(GOVERSION) \< 14), 1) $(warning Your Golang version is go 1.$(GOVERSION)) $(error Update Golang to version $(shell grep '^go' go.mod)) endif diff --git a/api/api_full.go b/api/api_full.go index 6e45073fb..080f0e478 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -206,7 +206,7 @@ type FullNode interface { StateCall(context.Context, *types.Message, types.TipSetKey) (*InvocResult, error) StateReplay(context.Context, types.TipSetKey, cid.Cid) (*InvocResult, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) - StateReadState(ctx context.Context, act *types.Actor, tsk types.TipSetKey) (*ActorState, error) + StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*ActorState, error) StateListMessages(ctx context.Context, match *types.Message, tsk types.TipSetKey, toht abi.ChainEpoch) ([]cid.Cid, error) StateNetworkName(context.Context) (dtypes.NetworkName, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 9cc3f9f60..f1bbedf76 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -139,7 +139,7 @@ type FullNodeStruct struct { StateCall func(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) `perm:"read"` StateReplay func(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error) `perm:"read"` StateGetActor func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error) `perm:"read"` - StateReadState func(context.Context, *types.Actor, types.TipSetKey) (*api.ActorState, error) `perm:"read"` + StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error) `perm:"read"` StatePledgeCollateral func(context.Context, types.TipSetKey) (types.BigInt, error) `perm:"read"` StateWaitMsg func(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) `perm:"read"` StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"` @@ -619,8 +619,8 @@ func (c *FullNodeStruct) StateGetActor(ctx context.Context, actor address.Addres return c.Internal.StateGetActor(ctx, actor, tsk) } -func (c *FullNodeStruct) StateReadState(ctx context.Context, act *types.Actor, tsk types.TipSetKey) (*api.ActorState, error) { - return c.Internal.StateReadState(ctx, act, tsk) +func (c *FullNodeStruct) StateReadState(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*api.ActorState, error) { + return c.Internal.StateReadState(ctx, addr, tsk) } func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, tsk types.TipSetKey) (types.BigInt, error) { diff --git a/chain/events/events.go b/chain/events/events.go index a325b5410..e11507795 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -51,7 +51,7 @@ type Events struct { readyOnce sync.Once heightEvents - calledEvents + *hcEvents } func NewEvents(ctx context.Context, api eventAPI) *Events { @@ -74,18 +74,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { htHeights: map[abi.ChainEpoch][]uint64{}, }, - calledEvents: calledEvents{ - cs: api, - tsc: tsc, - ctx: ctx, - gcConfidence: uint64(gcConfidence), - - confQueue: map[triggerH]map[msgH][]*queuedEvent{}, - revertQueue: map[msgH][]triggerH{}, - triggers: map[triggerID]*callHandler{}, - matchers: map[triggerID][]MatchFunc{}, - timeouts: map[abi.ChainEpoch]map[triggerID]int{}, - }, + hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), } e.ready.Add(1) @@ -143,7 +132,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { } e.readyOnce.Do(func() { - e.at = cur[0].Val.Height() + e.lastTs = cur[0].Val e.ready.Done() }) @@ -186,5 +175,5 @@ func (e *Events) headChange(rev, app []*types.TipSet) error { return err } - return e.headChangeCalled(rev, app) + return e.processHeadChangeEvent(rev, app) } diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 0bae99404..3d8e05c02 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -13,6 +13,7 @@ import ( ) const NoTimeout = math.MaxInt64 +const NoHeight = abi.ChainEpoch(-1) type triggerID = uint64 @@ -23,54 +24,60 @@ type msgH = abi.ChainEpoch // message (msgH+confidence) type triggerH = abi.ChainEpoch -// CalledHandler arguments: -// `ts` is the tipset, in which the `msg` is included. +type eventData interface{} + +// EventHandler arguments: +// `prevTs` is the previous tipset, eg the "from" tipset for a state change. +// `ts` is the event tipset, eg the tipset in which the `msg` is included. // `curH`-`ts.Height` = `confidence` -type CalledHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) +type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) // CheckFunc is used for atomicity guarantees. If the condition the callbacks // wait for has already happened in tipset `ts` // // If `done` is true, timeout won't be triggered -// If `more` is false, no messages will be sent to CalledHandler (RevertHandler +// If `more` is false, no messages will be sent to EventHandler (RevertHandler // may still be called) type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error) -type MatchFunc func(msg *types.Message) (bool, error) - -type callHandler struct { +// Keep track of information for an event handler +type handlerInfo struct { confidence int timeout abi.ChainEpoch disabled bool // TODO: GC after gcConfidence reached - handle CalledHandler + handle EventHandler revert RevertHandler } +// When a change occurs, a queuedEvent is created and put into a queue +// until the required confidence is reached type queuedEvent struct { trigger triggerID - h abi.ChainEpoch - msg *types.Message + prevH abi.ChainEpoch + h abi.ChainEpoch + data eventData called bool } -type calledEvents struct { +// Manages chain head change events, which may be forward (new tipset added to +// chain) or backward (chain branch discarded in favour of heavier branch) +type hcEvents struct { cs eventAPI tsc *tipSetCache ctx context.Context gcConfidence uint64 - at abi.ChainEpoch + lastTs *types.TipSet lk sync.Mutex ctr triggerID - triggers map[triggerID]*callHandler - matchers map[triggerID][]MatchFunc + triggers map[triggerID]*handlerInfo // maps block heights to events // [triggerH][msgH][event] @@ -81,30 +88,79 @@ type calledEvents struct { // [timeoutH+confidence][triggerID]{calls} timeouts map[abi.ChainEpoch]map[triggerID]int + + messageEvents + watcherEvents } -func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error { +func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) *hcEvents { + e := hcEvents{ + ctx: ctx, + cs: cs, + tsc: tsc, + gcConfidence: gcConfidence, + + confQueue: map[triggerH]map[msgH][]*queuedEvent{}, + revertQueue: map[msgH][]triggerH{}, + triggers: map[triggerID]*handlerInfo{}, + timeouts: map[abi.ChainEpoch]map[triggerID]int{}, + } + + e.messageEvents = newMessageEvents(ctx, &e, cs) + e.watcherEvents = newWatcherEvents(ctx, &e, cs) + + return &e +} + +// Called when there is a change to the head with tipsets to be +// reverted / applied +func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error { e.lk.Lock() defer e.lk.Unlock() for _, ts := range rev { e.handleReverts(ts) - e.at = ts.Height() + e.lastTs = ts } for _, ts := range app { - // called triggers - e.checkNewCalls(ts) - for ; e.at <= ts.Height(); e.at++ { - e.applyWithConfidence(ts, e.at) + // Check if the head change caused any state changes that we were + // waiting for + stateChanges := e.watcherEvents.checkStateChanges(e.lastTs, ts) + + // Queue up calls until there have been enough blocks to reach + // confidence on the state changes + for tid, data := range stateChanges { + e.queueForConfidence(tid, data, e.lastTs, ts) + } + + // Check if the head change included any new message calls + newCalls, err := e.messageEvents.checkNewCalls(ts) + if err != nil { + return err + } + + // Queue up calls until there have been enough blocks to reach + // confidence on the message calls + for tid, data := range newCalls { + e.queueForConfidence(tid, data, nil, ts) + } + + for at := e.lastTs.Height(); at <= ts.Height(); at++ { + // Apply any queued events and timeouts that were targeted at the + // current chain height + e.applyWithConfidence(ts, at) e.applyTimeouts(ts) } + + // Update the latest known tipset + e.lastTs = ts } return nil } -func (e *calledEvents) handleReverts(ts *types.TipSet) { +func (e *hcEvents) handleReverts(ts *types.TipSet) { reverts, ok := e.revertQueue[ts.Height()] if !ok { return // nothing to do @@ -120,7 +176,7 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) { trigger := e.triggers[event.trigger] if err := trigger.revert(e.ctx, ts); err != nil { - log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err) + log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", ts.Height(), triggerH, err) } } delete(e.confQueue[triggerH], ts.Height()) @@ -128,41 +184,15 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) { delete(e.revertQueue, ts.Height()) } -func (e *calledEvents) checkNewCalls(ts *types.TipSet) { - pts, err := e.cs.ChainGetTipSet(e.ctx, ts.Parents()) // we actually care about messages in the parent tipset here - if err != nil { - log.Errorf("getting parent tipset in checkNewCalls: %s", err) - return - } - - e.messagesForTs(pts, func(msg *types.Message) { - // TODO: provide receipts - for tid, matchFns := range e.matchers { - var matched bool - for _, matchFn := range matchFns { - ok, err := matchFn(msg) - if err != nil { - log.Errorf("event matcher failed: %s", err) - continue - } - matched = ok - - if matched { - break - } - } - - if matched { - e.queueForConfidence(tid, msg, ts) - break - } - } - }) -} - -func (e *calledEvents) queueForConfidence(trigID uint64, msg *types.Message, ts *types.TipSet) { +// Queue up events until the chain has reached a height that reflects the +// desired confidence +func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) { trigger := e.triggers[trigID] + prevH := NoHeight + if prevTs != nil { + prevH = prevTs.Height() + } appliedH := ts.Height() triggerH := appliedH + abi.ChainEpoch(trigger.confidence) @@ -175,17 +205,19 @@ func (e *calledEvents) queueForConfidence(trigID uint64, msg *types.Message, ts byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{ trigger: trigID, + prevH: prevH, h: appliedH, - msg: msg, + data: data, }) e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH) } -func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) { +// Apply any events that were waiting for this chain height for confidence +func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) { byOrigH, ok := e.confQueue[height] if !ok { - return // no triggers at thin height + return // no triggers at this height } for origH, events := range byOrigH { @@ -204,15 +236,20 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpo continue } - rec, err := e.cs.StateGetReceipt(e.ctx, event.msg.Cid(), ts.Key()) - if err != nil { - log.Error(err) - return + // Previous tipset - this is relevant for example in a state change + // from one tipset to another + var prevTs *types.TipSet + if event.prevH != NoHeight { + prevTs, err = e.tsc.get(event.prevH) + if err != nil { + log.Errorf("events: applyWithConfidence didn't find tipset for previous event; wanted %d; current %d", event.prevH, height) + continue + } } - more, err := trigger.handle(event.msg, rec, triggerTs, height) + more, err := trigger.handle(event.data, prevTs, triggerTs, height) if err != nil { - log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err) + log.Errorf("chain trigger (@H %d, triggered @ %d) failed: %s", origH, height, err) continue // don't revert failed calls } @@ -228,7 +265,8 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpo } } -func (e *calledEvents) applyTimeouts(ts *types.TipSet) { +// Apply any timeouts that expire at this height +func (e *hcEvents) applyTimeouts(ts *types.TipSet) { triggers, ok := e.timeouts[ts.Height()] if !ok { return // nothing to do @@ -258,12 +296,224 @@ func (e *calledEvents) applyTimeouts(ts *types.TipSet) { } } -func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) { +// Listen for an event +// - CheckFunc: immediately checks if the event already occurred +// - EventHandler: called when the event has occurred, after confidence tipsets +// - RevertHandler: called if the chain head changes causing the event to revert +// - confidence: wait this many tipsets before calling EventHandler +// - timeout: at this chain height, timeout on waiting for this event +func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) { + e.lk.Lock() + defer e.lk.Unlock() + + // Check if the event has already occurred + ts := e.tsc.best() + done, more, err := check(ts) + if err != nil { + return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err) + } + if done { + timeout = NoTimeout + } + + // Create a trigger for the event + id := e.ctr + e.ctr++ + + e.triggers[id] = &handlerInfo{ + confidence: confidence, + timeout: timeout + abi.ChainEpoch(confidence), + + disabled: !more, + + handle: hnd, + revert: rev, + } + + // If there's a timeout, set up a timeout check at that height + if timeout != NoTimeout { + if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil { + e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{} + } + e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0 + } + + return id, nil +} + +// headChangeAPI is used to allow the composed event APIs to call back to hcEvents +// to listen for changes +type headChangeAPI interface { + onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) +} + +// watcherEvents watches for a state change +type watcherEvents struct { + ctx context.Context + cs eventAPI + hcAPI headChangeAPI + + lk sync.RWMutex + matchers map[triggerID]StateMatchFunc +} + +func newWatcherEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) watcherEvents { + return watcherEvents{ + ctx: ctx, + cs: cs, + hcAPI: hcAPI, + matchers: make(map[triggerID]StateMatchFunc), + } +} + +// Run each of the matchers against the previous and current state to see if +// there's a change +func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map[triggerID]eventData { + we.lk.RLock() + defer we.lk.RUnlock() + + res := make(map[triggerID]eventData) + for tid, matchFn := range we.matchers { + ok, data, err := matchFn(oldState, newState) + if err != nil { + log.Errorf("event diff fn failed: %s", err) + continue + } + + if ok { + res[tid] = data + } + } + return res +} + +// StateChange represents a change in state +type StateChange interface{} + +// StateChangeHandler arguments: +// `oldTs` is the state "from" tipset +// `newTs` is the state "to" tipset +// `states` is the change in state +// `curH`-`ts.Height` = `confidence` +type StateChangeHandler func(oldTs, newTs *types.TipSet, states StateChange, curH abi.ChainEpoch) (more bool, err error) + +type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error) + +// StateChanged registers a callback which is triggered when a specified state +// change occurs or a timeout is reached. +// +// * `CheckFunc` callback is invoked immediately with a recent tipset, it +// returns two booleans - `done`, and `more`. +// +// * `done` should be true when some on-chain state change we are waiting +// for has happened. When `done` is set to true, timeout trigger is disabled. +// +// * `more` should be false when we don't want to receive new notifications +// through StateChangeHandler. Note that notifications may still be delivered to +// RevertHandler +// +// * `StateChangeHandler` is called when the specified state change was observed +// on-chain, and a confidence threshold was reached, or the specified `timeout` +// height was reached with no state change observed. When this callback is +// invoked on a timeout, `oldState` and `newState` are set to nil. +// This callback returns a boolean specifying whether further notifications +// should be sent, like `more` return param from `CheckFunc` above. +// +// * `RevertHandler` is called after apply handler, when we drop the tipset +// containing the message. The tipset passed as the argument is the tipset +// that is being dropped. Note that the event dropped may be re-applied +// in a different tipset in small amount of time. +// +// * `StateMatchFunc` is called against each tipset state. If there is a match, +// the state change is queued up until the confidence interval has elapsed (and +// `StateChangeHandler` is called) +func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error { + hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { + states, ok := data.(StateChange) + if data != nil && !ok { + panic("expected StateChange") + } + + return scHnd(prevTs, ts, states, height) + } + + id, err := we.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout) + if err != nil { + return err + } + + we.lk.Lock() + we.matchers[id] = mf + defer we.lk.Unlock() + + return nil +} + +// messageEvents watches for message calls to actors +type messageEvents struct { + ctx context.Context + cs eventAPI + hcAPI headChangeAPI + + lk sync.RWMutex + matchers map[triggerID][]MsgMatchFunc +} + +func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) messageEvents { + return messageEvents{ + ctx: ctx, + cs: cs, + hcAPI: hcAPI, + matchers: map[triggerID][]MsgMatchFunc{}, + } +} + +// Check if there are any new actor calls +func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventData, error) { + pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here + if err != nil { + log.Errorf("getting parent tipset in checkNewCalls: %s", err) + return nil, err + } + + res := make(map[triggerID]eventData) + me.messagesForTs(pts, func(msg *types.Message) { + me.lk.RLock() + defer me.lk.RUnlock() + // TODO: provide receipts + + for tid, matchFns := range me.matchers { + var matched bool + for _, matchFn := range matchFns { + ok, err := matchFn(msg) + if err != nil { + log.Errorf("event matcher failed: %s", err) + continue + } + matched = ok + + if matched { + break + } + } + + if matched { + res[tid] = msg + break + } + } + }) + + return res, nil +} + +// Get the messages in a tipset +func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) { seen := map[cid.Cid]struct{}{} for _, tsb := range ts.Blocks() { - msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid()) + msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid()) if err != nil { log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err) // this is quite bad, but probably better than missing all the other updates @@ -292,20 +542,27 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa } } -// Called registers a callbacks which are triggered when a specified method is +// MsgHandler arguments: +// `ts` is the tipset, in which the `msg` is included. +// `curH`-`ts.Height` = `confidence` +type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) + +type MsgMatchFunc func(msg *types.Message) (bool, error) + +// Called registers a callback which is triggered when a specified method is // called on an actor, or a timeout is reached. // // * `CheckFunc` callback is invoked immediately with a recent tipset, it // returns two booleans - `done`, and `more`. // -// * `done` should be true when some on-chain action we are waiting for has -// happened. When `done` is set to true, timeout trigger is disabled. +// * `done` should be true when some on-chain action we are waiting for has +// happened. When `done` is set to true, timeout trigger is disabled. // -// * `more` should be false when we don't want to receive new notifications -// through CalledHandler. Note that notifications may still be delivered to -// RevertHandler +// * `more` should be false when we don't want to receive new notifications +// through MsgHandler. Note that notifications may still be delivered to +// RevertHandler // -// * `CalledHandler` is called when the specified event was observed on-chain, +// * `MsgHandler` is called when the specified event was observed on-chain, // and a confidence threshold was reached, or the specified `timeout` height // was reached with no events observed. When this callback is invoked on a // timeout, `msg` is set to nil. This callback returns a boolean specifying @@ -316,44 +573,38 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa // containing the message. The tipset passed as the argument is the tipset // that is being dropped. Note that the message dropped may be re-applied // in a different tipset in small amount of time. -func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MatchFunc) error { - e.lk.Lock() - defer e.lk.Unlock() - - ts := e.tsc.best() - done, more, err := check(ts) - if err != nil { - return xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err) - } - if done { - timeout = NoTimeout - } - - id := e.ctr - e.ctr++ - - e.triggers[id] = &callHandler{ - confidence: confidence, - timeout: timeout + abi.ChainEpoch(confidence), - - disabled: !more, - - handle: hnd, - revert: rev, - } - - e.matchers[id] = append(e.matchers[id], mf) - - if timeout != NoTimeout { - if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil { - e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{} +// +// * `MsgMatchFunc` is called against each message. If there is a match, the +// message is queued up until the confidence interval has elapsed (and +// `MsgHandler` is called) +func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error { + hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { + msg, ok := data.(*types.Message) + if data != nil && !ok { + panic("expected msg") } - e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0 + + rec, err := me.cs.StateGetReceipt(me.ctx, msg.Cid(), ts.Key()) + if err != nil { + return false, err + } + + return msgHnd(msg, rec, ts, height) } + id, err := me.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout) + if err != nil { + return err + } + + me.lk.Lock() + defer me.lk.Unlock() + me.matchers[id] = append(me.matchers[id], mf) + return nil } -func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error { - return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg.VMMessage())) +// Convenience function for checking and matching messages +func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error { + return me.Called(me.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage())) } diff --git a/chain/events/events_test.go b/chain/events/events_test.go index a048789ec..5798fb75c 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1004,8 +1004,6 @@ func TestRemoveTriggersOnMessage(t *testing.T) { return false, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) - fmt.Println(msg == nil) - fmt.Println(curH) applied = true return more, nil }, func(_ context.Context, ts *types.TipSet) error { @@ -1067,3 +1065,250 @@ func TestRemoveTriggersOnMessage(t *testing.T) { require.Equal(t, true, applied) require.Equal(t, false, reverted) } + +type testStateChange struct { + from string + to string +} + +func TestStateChanged(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + more := true + var applied, reverted bool + var appliedData StateChange + var appliedOldTs *types.TipSet + var appliedNewTs *types.TipSet + var appliedH abi.ChainEpoch + var matchData StateChange + + confidence := 3 + timeout := abi.ChainEpoch(20) + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + require.Equal(t, false, applied) + applied = true + appliedData = data + appliedOldTs = oldTs + appliedNewTs = newTs + appliedH = curH + return more, nil + }, func(_ context.Context, ts *types.TipSet) error { + reverted = true + return nil + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + if matchData == nil { + return false, matchData, nil + } + + d := matchData + matchData = nil + return true, d, nil + }) + require.NoError(t, err) + + // create few blocks to make sure nothing get's randomly called + + fcs.advance(0, 4, nil) // H=5 + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // create state change (but below confidence threshold) + matchData = testStateChange{from: "a", to: "b"} + fcs.advance(0, 3, nil) + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // create additional block so we are above confidence threshold + + fcs.advance(0, 2, nil) // H=10 (confidence=3, apply) + + require.Equal(t, true, applied) + require.Equal(t, false, reverted) + applied = false + + // dip below confidence (should not apply again) + fcs.advance(2, 2, nil) // H=10 (confidence=3, apply) + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Change happens from 5 -> 6 + require.Equal(t, abi.ChainEpoch(5), appliedOldTs.Height()) + require.Equal(t, abi.ChainEpoch(6), appliedNewTs.Height()) + + // Actually applied (with confidence) at 9 + require.Equal(t, abi.ChainEpoch(9), appliedH) + + // Make sure the state change was correctly passed through + rcvd := appliedData.(testStateChange) + require.Equal(t, "a", rcvd.from) + require.Equal(t, "b", rcvd.to) +} + +func TestStateChangedRevert(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + more := true + var applied, reverted bool + var matchData StateChange + + confidence := 1 + timeout := abi.ChainEpoch(20) + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + require.Equal(t, false, applied) + applied = true + return more, nil + }, func(_ context.Context, ts *types.TipSet) error { + reverted = true + return nil + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + if matchData == nil { + return false, matchData, nil + } + + d := matchData + matchData = nil + return true, d, nil + }) + require.NoError(t, err) + + fcs.advance(0, 2, nil) // H=3 + + // Make a state change from TS at height 3 to TS at height 4 + matchData = testStateChange{from: "a", to: "b"} + fcs.advance(0, 1, nil) // H=4 + + // Haven't yet reached confidence + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Advance to reach confidence level + fcs.advance(0, 1, nil) // H=5 + + // Should now have called the handler + require.Equal(t, true, applied) + require.Equal(t, false, reverted) + applied = false + + // Advance 3 more TS + fcs.advance(0, 3, nil) // H=8 + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Regress but not so far as to cause a revert + fcs.advance(3, 1, nil) // H=6 + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Regress back to state where change happened + fcs.advance(3, 1, nil) // H=4 + + // Expect revert to have happened + require.Equal(t, false, applied) + require.Equal(t, true, reverted) +} + +func TestStateChangedTimeout(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + called := false + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + called = true + require.Nil(t, data) + require.Equal(t, abi.ChainEpoch(20), newTs.Height()) + require.Equal(t, abi.ChainEpoch(23), curH) + return false, nil + }, func(_ context.Context, ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + return false, nil, nil + }) + + require.NoError(t, err) + + fcs.advance(0, 21, nil) + require.False(t, called) + + fcs.advance(0, 5, nil) + require.True(t, called) + called = false + + // with check func reporting done + + fcs = &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events = NewEvents(context.Background(), fcs) + + err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return true, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + called = true + require.Nil(t, data) + require.Equal(t, abi.ChainEpoch(20), newTs.Height()) + require.Equal(t, abi.ChainEpoch(23), curH) + return false, nil + }, func(_ context.Context, ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + return false, nil, nil + }) + require.NoError(t, err) + + fcs.advance(0, 21, nil) + require.False(t, called) + + fcs.advance(0, 5, nil) + require.False(t, called) +} diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go new file mode 100644 index 000000000..3245d5c03 --- /dev/null +++ b/chain/events/state/predicates.go @@ -0,0 +1,137 @@ +package state + +import ( + "context" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/lotus/api/apibstore" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" +) + +// UserData is the data returned from the DiffFunc +type UserData interface{} + +// ChainAPI abstracts out calls made by this class to external APIs +type ChainAPI interface { + apibstore.ChainIO + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) +} + +// StatePredicates has common predicates for responding to state changes +type StatePredicates struct { + api ChainAPI + cst *cbor.BasicIpldStore +} + +func NewStatePredicates(api ChainAPI) *StatePredicates { + return &StatePredicates{ + api: api, + cst: cbor.NewCborStore(apibstore.NewAPIBlockstore(api)), + } +} + +// DiffFunc check if there's a change form oldState to newState, and returns +// - changed: was there a change +// - user: user-defined data representing the state change +// - err +type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) + +type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) + +// OnActorStateChanged calls diffStateFunc when the state changes for the given actor +func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc { + return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) { + oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key()) + if err != nil { + return false, nil, err + } + newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key()) + if err != nil { + return false, nil, err + } + + if oldActor.Head.Equals(newActor.Head) { + return false, nil, nil + } + return diffStateFunc(ctx, oldActor.Head, newActor.Head) + } +} + +type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) + +// OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor +func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { + return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { + var oldState market.State + if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil { + return false, nil, err + } + var newState market.State + if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil { + return false, nil, err + } + return diffStorageMarketState(ctx, &oldState, &newState) + }) +} + +type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) + +// OnDealStateChanged calls diffDealStates when the market state changes +func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { + return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { + if oldState.States.Equals(newState.States) { + return false, nil, nil + } + + oldRoot, err := amt.LoadAMT(ctx, sp.cst, oldState.States) + if err != nil { + return false, nil, err + } + newRoot, err := amt.LoadAMT(ctx, sp.cst, newState.States) + if err != nil { + return false, nil, err + } + + return diffDealStates(ctx, oldRoot, newRoot) + } +} + +// ChangedDeals is a set of changes to deal state +type ChangedDeals map[abi.DealID]DealStateChange + +// DealStateChange is a change in deal state from -> to +type DealStateChange struct { + From market.DealState + To market.DealState +} + +// DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs +func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { + return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { + changedDeals := make(ChangedDeals) + for _, dealID := range dealIds { + var oldDeal, newDeal market.DealState + err := oldDealStateRoot.Get(ctx, uint64(dealID), &oldDeal) + if err != nil { + return false, nil, err + } + err = newDealStateRoot.Get(ctx, uint64(dealID), &newDeal) + if err != nil { + return false, nil, err + } + if oldDeal != newDeal { + changedDeals[dealID] = DealStateChange{oldDeal, newDeal} + } + } + if len(changedDeals) > 0 { + return true, changedDeals, nil + } + return false, nil, nil + } +} diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go new file mode 100644 index 000000000..1c10209a8 --- /dev/null +++ b/chain/events/state/predicates_test.go @@ -0,0 +1,177 @@ +package state + +import ( + "context" + "testing" + + "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-hamt-ipld" + + "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + bstore "github.com/ipfs/go-ipfs-blockstore" + cbornode "github.com/ipfs/go-ipld-cbor" + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" +) + +var dummyCid cid.Cid + +func init() { + dummyCid, _ = cid.Parse("bafkqaaa") +} + +type mockAPI struct { + ts map[types.TipSetKey]*types.Actor + bs bstore.Blockstore +} + +func newMockAPI(bs bstore.Blockstore) *mockAPI { + return &mockAPI{ + bs: bs, + ts: make(map[types.TipSetKey]*types.Actor), + } +} + +func (m mockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { + return m.bs.Has(c) +} + +func (m mockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { + blk, err := m.bs.Get(c) + if err != nil { + return nil, xerrors.Errorf("blockstore get: %w", err) + } + + return blk.RawData(), nil +} + +func (m mockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { + return m.ts[tsk], nil +} + +func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) { + m.ts[tsk] = act +} + +func TestPredicates(t *testing.T) { + ctx := context.Background() + + bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + store := cbornode.NewCborStore(bs) + + oldDeals := map[abi.DealID]*market.DealState{ + abi.DealID(1): { + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + SlashEpoch: 0, + }, + abi.DealID(2): { + SectorStartEpoch: 4, + LastUpdatedEpoch: 5, + SlashEpoch: 0, + }, + } + oldStateC := createMarketState(ctx, t, store, oldDeals) + + newDeals := map[abi.DealID]*market.DealState{ + abi.DealID(1): { + SectorStartEpoch: 1, + LastUpdatedEpoch: 3, + SlashEpoch: 0, + }, + abi.DealID(2): { + SectorStartEpoch: 4, + LastUpdatedEpoch: 6, + SlashEpoch: 6, + }, + } + newStateC := createMarketState(ctx, t, store, newDeals) + + miner, err := address.NewFromString("t00") + require.NoError(t, err) + oldState, err := mockTipset(miner, 1) + require.NoError(t, err) + newState, err := mockTipset(miner, 2) + require.NoError(t, err) + + api := newMockAPI(bs) + api.setActor(oldState.Key(), &types.Actor{Head: oldStateC}) + api.setActor(newState.Key(), &types.Actor{Head: newStateC}) + + preds := NewStatePredicates(api) + + dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)} + diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) + + // Diff a state against itself: expect no change + changed, _, err := diffFn(ctx, oldState, oldState) + require.NoError(t, err) + require.False(t, changed) + + // Diff old state against new state + changed, val, err := diffFn(ctx, oldState, newState) + require.NoError(t, err) + require.True(t, changed) + + changedDeals, ok := val.(ChangedDeals) + require.True(t, ok) + require.Len(t, changedDeals, 2) + require.Contains(t, changedDeals, abi.DealID(1)) + require.Contains(t, changedDeals, abi.DealID(2)) + deal1 := changedDeals[abi.DealID(1)] + if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 { + t.Fatal("Unexpected change to LastUpdatedEpoch") + } + deal2 := changedDeals[abi.DealID(2)] + if deal2.From.SlashEpoch != 0 || deal2.To.SlashEpoch != 6 { + t.Fatal("Unexpected change to LastUpdatedEpoch") + } +} + +func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) { + return types.NewTipSet([]*types.BlockHeader{{ + Miner: miner, + Height: 5, + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + Timestamp: timestamp, + }}) +} + +func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { + rootCid := createAMT(ctx, t, store, deals) + + emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO()) + require.NoError(t, err) + emptyMap, err := store.Put(context.TODO(), hamt.NewNode(store, hamt.UseTreeBitWidth(5))) + require.NoError(t, err) + state := market.ConstructState(emptyArrayCid, emptyMap, emptyMap) + state.States = rootCid + + stateC, err := store.Put(ctx, state) + require.NoError(t, err) + return stateC +} + +func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { + root := amt.NewAMT(store) + for dealID, dealState := range deals { + err := root.Set(ctx, uint64(dealID), dealState) + require.NoError(t, err) + } + rootCid, err := root.Flush(ctx) + require.NoError(t, err) + return rootCid +} diff --git a/chain/events/utils.go b/chain/events/utils.go index d525e5368..40556c9ff 100644 --- a/chain/events/utils.go +++ b/chain/events/utils.go @@ -8,11 +8,11 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd CalledHandler) CheckFunc { +func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd MsgHandler) CheckFunc { msg := smsg.VMMessage() return func(ts *types.TipSet) (done bool, more bool, err error) { - fa, err := e.cs.StateGetActor(ctx, msg.From, ts.Key()) + fa, err := me.cs.StateGetActor(ctx, msg.From, ts.Key()) if err != nil { return false, true, err } @@ -22,7 +22,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca return false, true, nil } - rec, err := e.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key()) + rec, err := me.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key()) if err != nil { return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err) } @@ -33,7 +33,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca } } -func (e *calledEvents) MatchMsg(inmsg *types.Message) MatchFunc { +func (me *messageEvents) MatchMsg(inmsg *types.Message) MsgMatchFunc { return func(msg *types.Message) (bool, error) { if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) { return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce) diff --git a/chain/types/message.go b/chain/types/message.go index 68844d63c..b0d7f885f 100644 --- a/chain/types/message.go +++ b/chain/types/message.go @@ -21,6 +21,7 @@ type ChainMsg interface { Cid() cid.Cid VMMessage() *Message ToStorageBlock() (block.Block, error) + // FIXME: This is the *message* length, this name is misleading. ChainLength() int } diff --git a/cli/state.go b/cli/state.go index 6624a8282..e1471114e 100644 --- a/cli/state.go +++ b/cli/state.go @@ -742,12 +742,7 @@ var stateReadStateCmd = &cli.Command{ return err } - act, err := api.StateGetActor(ctx, addr, ts.Key()) - if err != nil { - return err - } - - as, err := api.StateReadState(ctx, act, ts.Key()) + as, err := api.StateReadState(ctx, addr, ts.Key()) if err != nil { return err } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 88afb647e..059dcf9d6 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -161,7 +161,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. log.Error(err) return } - ast, err := api.StateReadState(ctx, act, genesisTs.Key()) + + ast, err := api.StateReadState(ctx, addr, genesisTs.Key()) if err != nil { log.Error(err) return @@ -210,7 +211,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } - ast, err := api.StateReadState(ctx, &act, pts.Key()) + ast, err := api.StateReadState(ctx, addr, pts.Key()) + if err != nil { log.Error(err) return diff --git a/documentation/en/block-validation.md b/documentation/en/block-validation.md new file mode 100644 index 000000000..a5ee49c30 --- /dev/null +++ b/documentation/en/block-validation.md @@ -0,0 +1,137 @@ +# Incoming block validations + +This document reviews the code flow that takes place inside the full node after receiving a new block from the GossipSub `/fil/blocks` topic and traces all of its protocol-related validation logic. We do not include validation logic *inside* the VM, the analysis stops at `(*VM).Invoke()`. The `V:` tag explicitly signals validations throughout the text. + +## `modules.HandleIncomingBlocks()` + +We subscribe to the `/fil/blocks` PubSub topic to receive external blocks from peers in the network and register a block validator that operates at the PubSub (`libp2p` stack) level, validating each PubSub message containing a Filecoin block header. + +`V:` PubSub message is a valid CBOR `BlockMsg`. + +`V:` Total messages in block are under `BlockMessageLimit`. + +`V:` Aggregate message CIDs, encapsulated in the `MsgMeta` structure, serialize to the `Messages` CID in the block header (`ValidateMsgMeta()`). + +`V:` Miner `Address` in block header is present and corresponds to a public-key address in the current chain state. + +`V:` Block signature (`BlockSig`) is present and belongs to the public-key address retrieved for the miner (`CheckBlockSignature()`). + +## `sub.HandleIncomingBlocks()` + +Assemble a `FullBlock` from the received block header retrieving its Filecoin messages. + +`V:` Block messages CIDs can be retrieved from the network and decode into valid CBOR `Message`/`SignedMessage`. + +## `(*Syncer).InformNewHead()` + +Assemble a `FullTipSet` populated with the single block received earlier. + +`V:` `ValidateMsgMeta()` (already done in the topic validator). + +`V:` Block's `ParentWeight` is greater than the one from the (first block of the) heaviest tipset. + +## `(*Syncer).Sync()` + +`(*Syncer).collectHeaders()`: we retrieve all tipsets from the received block down to our chain. Validation now is expanded to *every* block inside these tipsets. + +`V`: Beacon entires are ordered by their round number. + +`V:` Tipset `Parents` CIDs match the fetched parent tipset through block sync. (This check is not enforced correctly at the moment, see [issue](https://github.com/filecoin-project/lotus/issues/1918).) + +## `(*Syncer).ValidateBlock()` + +This function contains most of the validation logic grouped in separate closures that run asynchronously, this list does not reflect validation order then. + +`V:` Block `Timestamp`: + * Is not bigger than current time plus `AllowableClockDrift`. + * Is not smaller than previous block's `Timestamp` plus `BlockDelay` (including null blocks). + +### Messages + +We check all the messages contained in one block at a time (`(*Syncer).checkBlockMessages()`). + +`V:` The block's `BLSAggregate` matches the aggregate of BLS messages digests and public keys (extracted from the messages `From`). + +`V:` Each `secp256k1` message `Signature` is signed with the public key extracted from the message `From`. + +`V:` Aggregate message CIDs, encapsulated in the `MsgMeta` structure, serialize to the `Messages` CID in block header (similar to `ValidateMsgMeta()` call). + +`V:` For each message, in `ValidForBlockInclusion()`: +* Message fields `Version`, `To`, `From`, `Value`, `GasPrice`, and `GasLimit` are correctly defined. +* Message `GasLimit` is under the message minimum gas cost (derived from chain height and message length). + +`V:` Actor associated with message `From` exists and is an account actor, its `Nonce` matches the message `Nonce`. + +### Miner + +`V:` Miner address is registered in the `Claims` HAMT of the Power actor. + +### Compute parent tipset state + +`V:` Block's `ParentStateRoot` CID matches the state CID computed from the parent tipset. + +`V:` Block's `ParentMessageReceipts` CID matches receipts CID computed from the parent tipset. + +### Winner + +Draw randomness for current epoch with minimum ticket from previous tipset, using `ElectionProofProduction` +domain separation tag. +`V`: `ElectionProof.VRFProof` is computed correctly by checking BLS signature using miner's key. +`V`: Miner is not slashed in `StoragePowerActor`. +`V`: Check if ticket is a winning ticket: +``` +h := blake2b(VRFProof) +lhs := AsInt(h) * totalNetworkPower +rhs := minerPower * 2^256 +if lhs < rhs { return "Winner" } else { return "Not a winner" } +``` + +### Block signature + +`V:` `CheckBlockSignature()` (same signature validation as the one applied to the incoming block). + +### Beacon values check + +`V`: Validate that all `BeaconEntries` are valid. Check that every one of them is a signature of a message: `previousSignature || round` signed using drand's public key. +`V`: All entries between `MaxBeaconRoundForEpoch` down to `prevEntry` (from previous tipset) are included. + +### Verify VRF Ticket chain + +Draw randomness for current epoch with minimum ticket from previous tipset, using `TicketProduction` +domain separation tag. +`V`: `VerifyVRF` using drawn randomness and miner public key. + +### Winning PoSt proof + +Draw randomness for current epoch with `WinningPoSt` domain separation tag. +Get list of sectors challanged in this epoch for this miner, based on the randomness drawn. + +`V`: Use filecoin proofs system to verify that miner prooved access to sealed versions of these sectors. + +## `(*StateManager).TipSetState()` + +Called throughout the validation process for the parent of each tipset being validated. The checks here then do not apply to the received new head itself that started the validation process. + +### `(*StateManager).computeTipSetState()` + +`V:` Every block in the tipset should belong to different a miner. + +### `(*StateManager).ApplyBlocks()` + +We create a new VM with the tipset's `ParentStateRoot` (this is then the parent state of the parent of the tipset currently being validated) on which to apply all messages from all blocks in the tipset. For each message independently we apply the validations listed next. + +### `(*VM).ApplyMessage()` + +`V:` Basic gas and value checks in `checkMessage()`: +* Message `GasLimit` is bigger than zero. +* Message `GasPrice` and `Value` are set. + +`V:` Message storage gas cost is under the message's `GasLimit`. + +`V:` Message's `Nonce` matches nonce in actor retrieved from message's `From`. + +`V:` Message's maximum gas cost (derived from its `GasLimit`, `GasPrice`, and `Value`) is under the balance of the actor retrieved from message's `From`. + +### `(*VM).send()` + +`V:` Message's transfer `Value` is under the balance in actor retrieved from message's `From`. diff --git a/documentation/en/install-lotus-arch.md b/documentation/en/install-lotus-arch.md index 4927d063b..e5131424b 100644 --- a/documentation/en/install-lotus-arch.md +++ b/documentation/en/install-lotus-arch.md @@ -2,7 +2,7 @@ These steps will install the following dependencies: -- go (1.13 or higher) +- go (1.14 or higher) - gcc (7.4.0 or higher) - git (version 2 or higher) - bzr (some go dependency needs this) diff --git a/documentation/en/install-lotus-fedora.md b/documentation/en/install-lotus-fedora.md index 9f5864496..8473ef88b 100644 --- a/documentation/en/install-lotus-fedora.md +++ b/documentation/en/install-lotus-fedora.md @@ -6,7 +6,7 @@ These steps will install the following dependencies: -- go (1.13 or higher) +- go (1.14 or higher) - gcc (7.4.0 or higher) - git (version 2 or higher) - bzr (some go dependency needs this) diff --git a/documentation/en/install-lotus-ubuntu.md b/documentation/en/install-lotus-ubuntu.md index 236033794..a72f56a06 100644 --- a/documentation/en/install-lotus-ubuntu.md +++ b/documentation/en/install-lotus-ubuntu.md @@ -2,7 +2,7 @@ These steps will install the following dependencies: -- go (1.13 or higher) +- go (1.14 or higher) - gcc (7.4.0 or higher) - git (version 2 or higher) - bzr (some go dependency needs this) @@ -15,29 +15,26 @@ These steps will install the following dependencies: - llvm (proofs build) - clang (proofs build) -Run +### Install dependencies ```sh sudo apt update -sudo apt install mesa-opencl-icd ocl-icd-opencl-dev +sudo apt install mesa-opencl-icd ocl-icd-opencl-dev gcc git bzr jq pkg-config curl +sudo apt upgrade ``` -Build +### Install Go 1.14 -```sh -sudo add-apt-repository ppa:longsleep/golang-backports -sudo apt update -sudo apt install golang-go gcc git bzr jq pkg-config mesa-opencl-icd ocl-icd-opencl-dev -``` +Install the latest version of Go by following [the docs on their website](https://golang.org/doc/install). -Clone +### Clone the Lotus repository ```sh git clone https://github.com/filecoin-project/lotus.git cd lotus/ ``` -Install +### Build the Lotus binaries from source and install ```sh make clean && make all @@ -45,3 +42,12 @@ sudo make install ``` After installing Lotus, you can run the `lotus` command directly from your CLI to see usage documentation. Next, you can join the [Lotus Testnet](https://docs.lotu.sh/en+join-testnet). + +### Interopnet + +If you seek a smaller network to test, you can join the `interopnet`. Please note that this network is meant for developers - it resets much more often, and is much smaller. To join this network, checkout the branch `interopnet` instead of `master` before building and installing; +``` +git checkout interopnet +``` + +Please also note that this documentation (if viewed on the website) might not be up to date with the interopnet. For the latest documentation on the interopnet branch, see the [Lotus Documentation Interopnet Branch on GitHub](https://github.com/filecoin-project/lotus/tree/interopnet/documentation/en) diff --git a/documentation/en/mining.md b/documentation/en/mining.md index 80c81aedd..3b1f5a8a3 100644 --- a/documentation/en/mining.md +++ b/documentation/en/mining.md @@ -82,12 +82,12 @@ lotus-storage-miner sectors pledge Get **miner power** and **sector usage**: ```sh -lotus-storage-miner state power +lotus state power # returns total power -lotus-storage-miner state power +lotus state power -lotus-storage-miner state sectors +lotus state sectors ``` ## Performance tuning diff --git a/go.mod b/go.mod index 772054470..f6975a1bb 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/filecoin-project/lotus -go 1.13 +go 1.14 require ( contrib.go.opencensus.io/exporter/jaeger v0.1.0 @@ -18,7 +18,7 @@ require ( github.com/filecoin-project/filecoin-ffi v0.26.1-0.20200508175440-05b30afeb00d github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200424220931-6263827e49f2 - github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e + github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38 github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.3.0 diff --git a/go.sum b/go.sum index 3eeef99ee..bbcd751d9 100644 --- a/go.sum +++ b/go.sum @@ -225,6 +225,8 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mo github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= +github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38 h1:B2gUde2DlfCb5YMYNVems2orobxC3KhrX3migym1IOQ= +github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= diff --git a/miner/miner.go b/miner/miner.go index 1f5f8dad5..d42778e3b 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -140,7 +140,7 @@ func (m *Miner) mine(ctx context.Context) { onDone, err := m.waitFunc(ctx, prebase.TipSet.MinTimestamp()) if err != nil { log.Error(err) - return + continue } base, err := m.GetBestMiningCandidate(ctx) diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 5bb27d223..047eb7d01 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -302,7 +302,7 @@ func (a *StateAPI) StateAccountKey(ctx context.Context, addr address.Address, ts return a.StateManager.ResolveToKeyAddress(ctx, addr, ts) } -func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, tsk types.TipSetKey) (*api.ActorState, error) { +func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*api.ActorState, error) { ts, err := a.Chain.GetTipSetFromKey(tsk) if err != nil { return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) @@ -312,6 +312,11 @@ func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, tsk typ return nil, err } + act, err := state.GetActor(actor) + if err != nil { + return nil, err + } + blk, err := state.Store.(*cbor.BasicIpldStore).Blocks.Get(act.Head) if err != nil { return nil, err diff --git a/paychmgr/paych.go b/paychmgr/paych.go index 597663a94..763c448f9 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -76,7 +76,7 @@ func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) return err } from := account.Address - _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) + _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) if err != nil { return err } @@ -114,7 +114,7 @@ func (pm *Manager) loadOutboundChannelInfo(ctx context.Context, ch address.Addre return nil, err } from := account.Address - _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) + _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) if err != nil { return nil, err }