From d4b230c1372193902ea33318bd9538479b9ef15f Mon Sep 17 00:00:00 2001 From: Rob Quist Date: Wed, 10 Jun 2020 10:24:38 +0200 Subject: [PATCH 01/25] Update Ubuntu installation documentation + bump required go version to 1.14 --- documentation/en/install-lotus-ubuntu.md | 32 +++++++++++++++--------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/documentation/en/install-lotus-ubuntu.md b/documentation/en/install-lotus-ubuntu.md index 236033794..b458113b3 100644 --- a/documentation/en/install-lotus-ubuntu.md +++ b/documentation/en/install-lotus-ubuntu.md @@ -2,7 +2,7 @@ These steps will install the following dependencies: -- go (1.13 or higher) +- go (1.14 or higher) - gcc (7.4.0 or higher) - git (version 2 or higher) - bzr (some go dependency needs this) @@ -15,29 +15,28 @@ These steps will install the following dependencies: - llvm (proofs build) - clang (proofs build) -Run - -```sh -sudo apt update -sudo apt install mesa-opencl-icd ocl-icd-opencl-dev -``` - -Build +Install dependencies ```sh sudo add-apt-repository ppa:longsleep/golang-backports sudo apt update -sudo apt install golang-go gcc git bzr jq pkg-config mesa-opencl-icd ocl-icd-opencl-dev +sudo apt install mesa-opencl-icd ocl-icd-opencl-dev gcc git bzr jq pkg-config golang-go ``` -Clone +Install Rust +_(this is an interactive installer)_ +```sh +curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh +``` + +Clone the Lotus repository ```sh git clone https://github.com/filecoin-project/lotus.git cd lotus/ ``` -Install +Build the Lotus binaries from source and install ```sh make clean && make all @@ -45,3 +44,12 @@ sudo make install ``` After installing Lotus, you can run the `lotus` command directly from your CLI to see usage documentation. Next, you can join the [Lotus Testnet](https://docs.lotu.sh/en+join-testnet). + +### Interopnet + +If you seek a smaller network to test, you can join the `interopnet`. Please note that this network is meant for developers - it resets much more often, and is much smaller. To join this network, checkout the branch `interopnet` instead of `master` before building and installing; +``` +git checkout interopnet +``` + +Please also note that this documentation (if viewed on the website) might not be up to date with the interopnet. For the latest documentation on the interopnet branch, see the [Lotus Documentation Interopnet Branch on GitHub](https://github.com/filecoin-project/lotus/tree/interopnet/documentation/en) From 159a1a4645afce1ba6aaa8da58b8a4abdf25a6c2 Mon Sep 17 00:00:00 2001 From: RobQuistNL Date: Wed, 10 Jun 2020 10:29:57 +0200 Subject: [PATCH 02/25] Update other docs + bump go.mod version --- documentation/en/install-lotus-arch.md | 2 +- documentation/en/install-lotus-fedora.md | 2 +- go.mod | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/documentation/en/install-lotus-arch.md b/documentation/en/install-lotus-arch.md index 4927d063b..e5131424b 100644 --- a/documentation/en/install-lotus-arch.md +++ b/documentation/en/install-lotus-arch.md @@ -2,7 +2,7 @@ These steps will install the following dependencies: -- go (1.13 or higher) +- go (1.14 or higher) - gcc (7.4.0 or higher) - git (version 2 or higher) - bzr (some go dependency needs this) diff --git a/documentation/en/install-lotus-fedora.md b/documentation/en/install-lotus-fedora.md index 9f5864496..8473ef88b 100644 --- a/documentation/en/install-lotus-fedora.md +++ b/documentation/en/install-lotus-fedora.md @@ -6,7 +6,7 @@ These steps will install the following dependencies: -- go (1.13 or higher) +- go (1.14 or higher) - gcc (7.4.0 or higher) - git (version 2 or higher) - bzr (some go dependency needs this) diff --git a/go.mod b/go.mod index 21fea095f..e44543c87 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/filecoin-project/lotus -go 1.13 +go 1.14 require ( contrib.go.opencensus.io/exporter/jaeger v0.1.0 From d670257402f91e7c3bd3d87ddd06439b4f783ee6 Mon Sep 17 00:00:00 2001 From: Rob Quist Date: Fri, 12 Jun 2020 17:30:43 +0200 Subject: [PATCH 03/25] Add back installation of Go and Rust --- documentation/en/install-lotus-ubuntu.md | 27 ++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/documentation/en/install-lotus-ubuntu.md b/documentation/en/install-lotus-ubuntu.md index b458113b3..dda4fc84c 100644 --- a/documentation/en/install-lotus-ubuntu.md +++ b/documentation/en/install-lotus-ubuntu.md @@ -15,28 +15,43 @@ These steps will install the following dependencies: - llvm (proofs build) - clang (proofs build) -Install dependencies +### Install dependencies ```sh -sudo add-apt-repository ppa:longsleep/golang-backports sudo apt update -sudo apt install mesa-opencl-icd ocl-icd-opencl-dev gcc git bzr jq pkg-config golang-go +sudo apt install mesa-opencl-icd ocl-icd-opencl-dev gcc git bzr jq pkg-config curl +sudo apt upgrade ``` -Install Rust +### Install Go 1.14 + +Find the latest version of Go [on their website](https://golang.org/dl/) and follow the installation instructions. At the time of writing this document, thats 1.14.4. Extract it to `/usr/local`, and add the go binaries to your `$PATH`. + +```sh +wget -c https://dl.google.com/go/go1.14.4.linux-amd64.tar.gz -O - | sudo tar -xz -C /usr/local +echo 'export PATH=$PATH:/usr/local/go/bin' >> ~/.profile +source ~/.profile +``` + +Verify your go installation by running +```sh +go version +``` + +### Install Rust _(this is an interactive installer)_ ```sh curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh ``` -Clone the Lotus repository +### Clone the Lotus repository ```sh git clone https://github.com/filecoin-project/lotus.git cd lotus/ ``` -Build the Lotus binaries from source and install +### Build the Lotus binaries from source and install ```sh make clean && make all From 9077dea0cd061b920c47c1d20bb339db6e64ce87 Mon Sep 17 00:00:00 2001 From: Rob Quist Date: Fri, 12 Jun 2020 17:40:47 +0200 Subject: [PATCH 04/25] Update install-lotus-ubuntu.md --- documentation/en/install-lotus-ubuntu.md | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/en/install-lotus-ubuntu.md b/documentation/en/install-lotus-ubuntu.md index dda4fc84c..4d5ba77f9 100644 --- a/documentation/en/install-lotus-ubuntu.md +++ b/documentation/en/install-lotus-ubuntu.md @@ -42,6 +42,7 @@ go version _(this is an interactive installer)_ ```sh curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh +source ~/.profile ``` ### Clone the Lotus repository From 030472d61c52376355ce5741cb3208109e103d43 Mon Sep 17 00:00:00 2001 From: Rob Quist Date: Tue, 16 Jun 2020 15:30:41 +0200 Subject: [PATCH 05/25] Remove Rust installation and remove Go installation guide --- documentation/en/install-lotus-ubuntu.md | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/documentation/en/install-lotus-ubuntu.md b/documentation/en/install-lotus-ubuntu.md index 4d5ba77f9..a72f56a06 100644 --- a/documentation/en/install-lotus-ubuntu.md +++ b/documentation/en/install-lotus-ubuntu.md @@ -25,25 +25,7 @@ sudo apt upgrade ### Install Go 1.14 -Find the latest version of Go [on their website](https://golang.org/dl/) and follow the installation instructions. At the time of writing this document, thats 1.14.4. Extract it to `/usr/local`, and add the go binaries to your `$PATH`. - -```sh -wget -c https://dl.google.com/go/go1.14.4.linux-amd64.tar.gz -O - | sudo tar -xz -C /usr/local -echo 'export PATH=$PATH:/usr/local/go/bin' >> ~/.profile -source ~/.profile -``` - -Verify your go installation by running -```sh -go version -``` - -### Install Rust -_(this is an interactive installer)_ -```sh -curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -source ~/.profile -``` +Install the latest version of Go by following [the docs on their website](https://golang.org/doc/install). ### Clone the Lotus repository From 7f932b96ae7a23178d3a7b540a816295d7946f5f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 22 Jun 2020 14:38:40 -0700 Subject: [PATCH 06/25] WIP --- chain/events/state/predicates.go | 101 +++++++++++++++++++++++++++++++ chain/events/state/state.go | 57 +++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 chain/events/state/predicates.go create mode 100644 chain/events/state/state.go diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go new file mode 100644 index 000000000..e9d9a7393 --- /dev/null +++ b/chain/events/state/predicates.go @@ -0,0 +1,101 @@ +package state + +import ( + "context" + + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/market" +) + +type StatePredicates struct { + sm *stmgr.StateManager +} + +func NewStatePredicates(sm *stmgr.StateManager) *StatePredicates { + return &StatePredicates{ + sm: sm, + } +} + +type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) + +func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc { + return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) { + oldActor, err := sp.sm.GetActor(addr, oldState) + if err != nil { + return false, nil, err + } + newActor, err := sp.sm.GetActor(addr, newState) + if oldActor.Head.Equals(newActor.Head) { + return false, nil, nil + } + return diffStateFunc(ctx, oldActor.Head, newActor.Head) + } +} + +type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) + +func (sp *StatePredicates) OnStorageMarketActorChanged(addr address.Address, diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { + return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { + var oldState market.State + cst := cbor.NewCborStore(sp.sm.ChainStore().Blockstore()) + if err := cst.Get(ctx, oldActorStateHead, &oldState); err != nil { + return false, nil, err + } + var newState market.State + if err := cst.Get(ctx, newActorStateHead, &newActorStateHead); err != nil { + return false, nil, err + } + return diffStorageMarketState(ctx, &oldState, &newState) + }) +} + +type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) + +func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { + return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { + if oldState.States.Equals(newState.States) { + return false, nil, nil + } + blks := cbor.NewCborStore(sp.sm.ChainStore().Blockstore()) + oldRoot, err := amt.LoadAMT(ctx, blks, oldState.States) + if err != nil { + return false, nil, err + } + newRoot, err := amt.LoadAMT(ctx, blks, newState.States) + if err != nil { + return false, nil, err + } + return diffDealStates(ctx, oldRoot, newRoot) + } +} + +func (sp *StatePredicates) DealStateChangedForIDs(ctx context.Context, dealIds []abi.DealID, oldRoot, newRoot *amt.Root) (changed bool, user UserData, err error) { + var changedDeals []abi.DealID + for _, dealId := range dealIds { + var oldDeal, newDeal market.DealState + err := oldRoot.Get(ctx, uint64(dealId), &oldDeal) + if err != nil { + return false, nil, err + } + err = newRoot.Get(ctx, uint64(dealId), &newDeal) + if err != nil { + return false, nil, err + } + if oldDeal != newDeal { + changedDeals = append(changedDeals, dealId) + } + } + if len(changedDeals) > 0 { + return true, changed, nil + } + return false, nil, nil +} diff --git a/chain/events/state/state.go b/chain/events/state/state.go new file mode 100644 index 000000000..75041ad79 --- /dev/null +++ b/chain/events/state/state.go @@ -0,0 +1,57 @@ +package state + +import ( + "context" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" +) + +type StateWatcher struct { +} + +type WatcherAPI interface { + ChainNotify(context.Context) (<-chan []*api.HeadChange, error) +} + +type UserData interface{} + +type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) + +type Callback func(ctx context.Context, oldState, newState *types.TipSet, events interface{}) + +type RevertHandler func(ctx context.Context, ts *types.TipSet) error + +/* +w := NewWatcher(api, OnActorChange(t04, OnDealStateChange(123)), cb) +*/ +func NewWatcher(ctx context.Context, api WatcherAPI, d DiffFunc, apply Callback, revert RevertHandler, confidence, timeout abi.ChainEpoch) { + go func() { + notifs, err := api.ChainNotify(ctx) + if err != nil { + // bad + return + } + + curTs := (<-notifs)[0].Val + d(ctx, curTs, curTs) + + for { + select { + case update := <-notifs: + for i, change := range update { + switch change.Type { + case store.HCApply: + d(ctx, curTs, change.Val) + case store.HCRevert: + + } + } + } + } + + }() +} From dd490220d7e91e450a49af702424c13e6edafb68 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 24 Jun 2020 14:10:52 -0400 Subject: [PATCH 07/25] refactor: Extract message-specific code from calledEvents This allows us to create a general purpose head change events manager that can be used for call events and also for state change events. --- chain/events/events.go | 19 +- chain/events/events_hc.go | 589 ++++++++++++++++++++++++++++++++++++++ chain/events/utils.go | 8 +- 3 files changed, 597 insertions(+), 19 deletions(-) create mode 100644 chain/events/events_hc.go diff --git a/chain/events/events.go b/chain/events/events.go index a325b5410..408c8845e 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -51,7 +51,7 @@ type Events struct { readyOnce sync.Once heightEvents - calledEvents + hcEvents } func NewEvents(ctx context.Context, api eventAPI) *Events { @@ -74,18 +74,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { htHeights: map[abi.ChainEpoch][]uint64{}, }, - calledEvents: calledEvents{ - cs: api, - tsc: tsc, - ctx: ctx, - gcConfidence: uint64(gcConfidence), - - confQueue: map[triggerH]map[msgH][]*queuedEvent{}, - revertQueue: map[msgH][]triggerH{}, - triggers: map[triggerID]*callHandler{}, - matchers: map[triggerID][]MatchFunc{}, - timeouts: map[abi.ChainEpoch]map[triggerID]int{}, - }, + hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), } e.ready.Add(1) @@ -143,7 +132,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { } e.readyOnce.Do(func() { - e.at = cur[0].Val.Height() + e.lastTs = cur[0].Val e.ready.Done() }) @@ -186,5 +175,5 @@ func (e *Events) headChange(rev, app []*types.TipSet) error { return err } - return e.headChangeCalled(rev, app) + return e.processHeadChangeEvent(rev, app) } diff --git a/chain/events/events_hc.go b/chain/events/events_hc.go new file mode 100644 index 000000000..ae42fd85f --- /dev/null +++ b/chain/events/events_hc.go @@ -0,0 +1,589 @@ +package events + +import ( + "context" + "math" + "sync" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/types" +) + +const NoTimeout = math.MaxInt64 + +type triggerID = uint64 + +// msgH is the block height at which a message was present / event has happened +type msgH = abi.ChainEpoch + +// triggerH is the block height at which the listener will be notified about the +// message (msgH+confidence) +type triggerH = abi.ChainEpoch + +type eventData interface{} + +// EventHandler arguments: +// `ts` is the tipset, in which the `msg` is included. +// `curH`-`ts.Height` = `confidence` +type EventHandler func(data eventData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) + +// CheckFunc is used for atomicity guarantees. If the condition the callbacks +// wait for has already happened in tipset `ts` +// +// If `done` is true, timeout won't be triggered +// If `more` is false, no messages will be sent to EventHandler (RevertHandler +// may still be called) +type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error) + +// Keep track of information for an event handler +type handlerInfo struct { + confidence int + timeout abi.ChainEpoch + + disabled bool // TODO: GC after gcConfidence reached + + handle EventHandler + revert RevertHandler +} + +// When a change occurs, a queuedEvent is created and put into a queue +// until the required confidence is reached +type queuedEvent struct { + trigger triggerID + + h abi.ChainEpoch + data eventData + + called bool +} + +// Manages chain head change events, which may be forward (new tipset added to +// chain) or backward (chain branch discarded in favour of heavier branch) +type hcEvents struct { + cs eventAPI + tsc *tipSetCache + ctx context.Context + gcConfidence uint64 + + lastTs *types.TipSet + + lk sync.Mutex + + ctr triggerID + + triggers map[triggerID]*handlerInfo + + // maps block heights to events + // [triggerH][msgH][event] + confQueue map[triggerH]map[msgH][]*queuedEvent + + // [msgH][triggerH] + revertQueue map[msgH][]triggerH + + // [timeoutH+confidence][triggerID]{calls} + timeouts map[abi.ChainEpoch]map[triggerID]int + + messageEvents + watcherEvents +} + +func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) hcEvents { + e := hcEvents{ + ctx: ctx, + cs: cs, + tsc: tsc, + gcConfidence: gcConfidence, + + confQueue: map[triggerH]map[msgH][]*queuedEvent{}, + revertQueue: map[msgH][]triggerH{}, + triggers: map[triggerID]*handlerInfo{}, + timeouts: map[abi.ChainEpoch]map[triggerID]int{}, + } + + e.messageEvents = newMessageEvents(ctx, &e, cs) + e.watcherEvents = newWatcherEvents(ctx, &e, cs) + + return e +} + +// Called when there is a change to the head with tipsets to be +// reverted / applied +func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error { + for _, ts := range rev { + e.handleReverts(ts) + e.lastTs = ts + } + + for _, ts := range app { + // Check if the head change caused any state changes that we were + // waiting for + stateChanges := e.watcherEvents.checkStateChanges(e.lastTs, ts) + + // Queue up calls until there have been enough blocks to reach + // confidence on the state changes + for tid, data := range stateChanges { + e.queueForConfidence(tid, data, ts) + } + + // Check if the head change included any new message calls + newCalls, err := e.messageEvents.checkNewCalls(ts) + if err != nil { + return err + } + + // Queue up calls until there have been enough blocks to reach + // confidence on the message calls + for tid, data := range newCalls { + e.queueForConfidence(tid, data, ts) + } + + for at := e.lastTs.Height(); at <= ts.Height(); at++ { + // Apply any events and timeouts that were queued up until the + // current chain height + e.applyWithConfidence(ts, at) + e.applyTimeouts(ts) + } + + // Update the latest known tipset + e.lastTs = ts + } + + return nil +} + +func (e *hcEvents) handleReverts(ts *types.TipSet) { + reverts, ok := e.revertQueue[ts.Height()] + if !ok { + return // nothing to do + } + + for _, triggerH := range reverts { + toRevert := e.confQueue[triggerH][ts.Height()] + for _, event := range toRevert { + if !event.called { + continue // event wasn't apply()-ied yet + } + + trigger := e.triggers[event.trigger] + + if err := trigger.revert(e.ctx, ts); err != nil { + log.Errorf("reverting chain trigger failed: %s", err) + // log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err) + } + } + delete(e.confQueue[triggerH], ts.Height()) + } + delete(e.revertQueue, ts.Height()) +} + +// Queue up events until the chain has reached a height that reflects the +// desired confidence +func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, ts *types.TipSet) { + trigger := e.triggers[trigID] + + appliedH := ts.Height() + + triggerH := appliedH + abi.ChainEpoch(trigger.confidence) + + byOrigH, ok := e.confQueue[triggerH] + if !ok { + byOrigH = map[abi.ChainEpoch][]*queuedEvent{} + e.confQueue[triggerH] = byOrigH + } + + byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{ + trigger: trigID, + h: appliedH, + data: data, + }) + + e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH) +} + +// Apply any events that were waiting for this chain height for confidence +func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) { + byOrigH, ok := e.confQueue[height] + if !ok { + return // no triggers at thin height + } + + for origH, events := range byOrigH { + triggerTs, err := e.tsc.get(origH) + if err != nil { + log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, height) + } + + for _, event := range events { + if event.called { + continue + } + + trigger := e.triggers[event.trigger] + if trigger.disabled { + continue + } + + more, err := trigger.handle(event.data, triggerTs, height) + if err != nil { + log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", origH, height, err) + // log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err) + continue // don't revert failed calls + } + + event.called = true + + touts, ok := e.timeouts[trigger.timeout] + if ok { + touts[event.trigger]++ + } + + trigger.disabled = !more + } + } +} + +// Apply any timeouts that expire at this height +func (e *hcEvents) applyTimeouts(ts *types.TipSet) { + triggers, ok := e.timeouts[ts.Height()] + if !ok { + return // nothing to do + } + + for triggerID, calls := range triggers { + if calls > 0 { + continue // don't timeout if the method was called + } + trigger := e.triggers[triggerID] + if trigger.disabled { + continue + } + + timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence)) + if err != nil { + log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height()) + } + + // more, err := trigger.handle(nil, nil, timeoutTs, ts.Height()) + more, err := trigger.handle(nil, timeoutTs, ts.Height()) + if err != nil { + log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err) + continue // don't revert failed calls + } + + trigger.disabled = !more // allows messages after timeout + } +} + +// Listen for an event +// - CheckFunc: immediately checks if the event already occured +// - EventHandler: called when the event has occurred, after confidence tipsets +// - RevertHandler: called if the chain head changes causing the event to revert +// - confidence: wait this many tipsets before calling EventHandler +// - timeout: at this chain height, timeout on waiting for this event +func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) { + e.lk.Lock() + defer e.lk.Unlock() + + // Check if the even has already occurred + ts := e.tsc.best() + done, more, err := check(ts) + if err != nil { + return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err) + } + if done { + timeout = NoTimeout + } + + // Create a trigger for the event + id := e.ctr + e.ctr++ + + e.triggers[id] = &handlerInfo{ + confidence: confidence, + timeout: timeout + abi.ChainEpoch(confidence), + + disabled: !more, + + handle: hnd, + revert: rev, + } + + // If there's a timeout, set up a timeout check at that height + if timeout != NoTimeout { + if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil { + e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{} + } + e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0 + } + + return id, nil +} + +// headChangeAPI is used to allow the composed event APIs to call back to hcEvents +// to listen for changes +type headChangeAPI interface { + onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) +} + +// watcherEvents watches for a state change +type watcherEvents struct { + ctx context.Context + cs eventAPI + hcApi headChangeAPI + + lk sync.RWMutex + matchers map[triggerID]StateMatchFunc +} + +func newWatcherEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) watcherEvents { + return watcherEvents{ + ctx: ctx, + cs: cs, + hcApi: hcApi, + matchers: make(map[triggerID]StateMatchFunc), + } +} + +// Run each of the matchers against the previous and current state to see if +// there's a change +func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map[triggerID]eventData { + we.lk.RLock() + defer we.lk.RUnlock() + + res := make(map[triggerID]eventData) + for tid, matchFn := range we.matchers { + ok, data, err := matchFn(we.ctx, oldState, newState) + if err != nil { + log.Errorf("event diff fn failed: %s", err) + continue + } + + if ok { + res[tid] = data + } + } + return res +} + +// Used to store the state change +type stateData interface{} + +// StateChangeHandler arguments: +// `ts` is the tipset, in which the change occured +// `curH`-`ts.Height` = `confidence` +type StateChangeHandler func(ctx context.Context, oldState, newState stateData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) + +type StateMatchFunc func(ctx context.Context, oldState, newState *types.TipSet) (bool, eventData, error) + +// StateChanged registers a callback which is triggered when a specified state +// change occurs or a timeout is reached. +// +// * `CheckFunc` callback is invoked immediately with a recent tipset, it +// returns two booleans - `done`, and `more`. +// +// * `done` should be true when some on-chain state change we are waiting +// for has happened. When `done` is set to true, timeout trigger is disabled. +// +// * `more` should be false when we don't want to receive new notifications +// through StateChangeHandler. Note that notifications may still be delivered to +// RevertHandler +// +// * `StateChangeHandler` is called when the specified state change was observed +// on-chain, and a confidence threshold was reached, or the specified `timeout` +// height was reached with no state change observed. When this callback is +// invoked on a timeout, `oldState` and `newState` are set to nil. +// This callback returns a boolean specifying whether further notifications +// should be sent, like `more` return param from `CheckFunc` above. +// +// * `RevertHandler` is called after apply handler, when we drop the tipset +// containing the message. The tipset passed as the argument is the tipset +// that is being dropped. Note that the message dropped may be re-applied +// in a different tipset in small amount of time. +// +// * `StateMatchFunc` is called against each tipset state. If there is a match, +// the state change is queued up until the confidence interval has elapsed (and +// `StateChangeHandler` is called) +func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error { + hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { + states, ok := data.([]interface{}) + if !ok || len(states) != 2 { + panic("expected 2 element array") + } + + return scHnd(we.ctx, states[0], states[1], ts, height) + } + + id, err := we.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout) + if err != nil { + return err + } + + we.lk.Lock() + we.matchers[id] = mf + defer we.lk.Unlock() + + return nil +} + +// messageEvents watches for message calls to actors +type messageEvents struct { + ctx context.Context + cs eventAPI + hcApi headChangeAPI + + lk sync.RWMutex + matchers map[triggerID][]MsgMatchFunc +} + +func newMessageEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) messageEvents { + return messageEvents{ + ctx: ctx, + cs: cs, + hcApi: hcApi, + matchers: map[triggerID][]MsgMatchFunc{}, + } +} + +// Check if there are any new actor calls +func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventData, error) { + pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here + if err != nil { + log.Errorf("getting parent tipset in checkNewCalls: %s", err) + return nil, err + } + + res := make(map[triggerID]eventData) + me.messagesForTs(pts, func(msg *types.Message) { + me.lk.RLock() + defer me.lk.RUnlock() + // TODO: provide receipts + + for tid, matchFns := range me.matchers { + var matched bool + for _, matchFn := range matchFns { + ok, err := matchFn(msg) + if err != nil { + log.Errorf("event matcher failed: %s", err) + continue + } + matched = ok + + if matched { + break + } + } + + if matched { + res[tid] = msg + break + } + } + }) + + return res, nil +} + +// Get the messages in a tipset +func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) { + seen := map[cid.Cid]struct{}{} + + for _, tsb := range ts.Blocks() { + + msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid()) + if err != nil { + log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err) + // this is quite bad, but probably better than missing all the other updates + continue + } + + for _, m := range msgs.BlsMessages { + _, ok := seen[m.Cid()] + if ok { + continue + } + seen[m.Cid()] = struct{}{} + + consume(m) + } + + for _, m := range msgs.SecpkMessages { + _, ok := seen[m.Message.Cid()] + if ok { + continue + } + seen[m.Message.Cid()] = struct{}{} + + consume(&m.Message) + } + } +} + +// MsgHandler arguments: +// `ts` is the tipset, in which the `msg` is included. +// `curH`-`ts.Height` = `confidence` +type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) + +type MsgMatchFunc func(msg *types.Message) (bool, error) + +// Called registers a callback which is triggered when a specified method is +// called on an actor, or a timeout is reached. +// +// * `CheckFunc` callback is invoked immediately with a recent tipset, it +// returns two booleans - `done`, and `more`. +// +// * `done` should be true when some on-chain action we are waiting for has +// happened. When `done` is set to true, timeout trigger is disabled. +// +// * `more` should be false when we don't want to receive new notifications +// through MsgHandler. Note that notifications may still be delivered to +// RevertHandler +// +// * `MsgHandler` is called when the specified event was observed on-chain, +// and a confidence threshold was reached, or the specified `timeout` height +// was reached with no events observed. When this callback is invoked on a +// timeout, `msg` is set to nil. This callback returns a boolean specifying +// whether further notifications should be sent, like `more` return param +// from `CheckFunc` above. +// +// * `RevertHandler` is called after apply handler, when we drop the tipset +// containing the message. The tipset passed as the argument is the tipset +// that is being dropped. Note that the message dropped may be re-applied +// in a different tipset in small amount of time. +// +// * `MsgMatchFunc` is called against each message. If there is a match, the +// message is queued up until the confidence interval has elapsed (and +// `MsgHandler` is called) +func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error { + hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { + msg, ok := data.(*types.Message) + if data != nil && !ok { + panic("expected msg") + } + + rec, err := me.cs.StateGetReceipt(me.ctx, msg.Cid(), ts.Key()) + if err != nil { + return false, err + } + + return msgHnd(msg, rec, ts, height) + } + + id, err := me.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout) + if err != nil { + return err + } + + me.lk.Lock() + defer me.lk.Unlock() + me.matchers[id] = append(me.matchers[id], mf) + + return nil +} + +// Convenience function for checking and matching messages +func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error { + return me.Called(me.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage())) +} diff --git a/chain/events/utils.go b/chain/events/utils.go index d525e5368..40556c9ff 100644 --- a/chain/events/utils.go +++ b/chain/events/utils.go @@ -8,11 +8,11 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd CalledHandler) CheckFunc { +func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd MsgHandler) CheckFunc { msg := smsg.VMMessage() return func(ts *types.TipSet) (done bool, more bool, err error) { - fa, err := e.cs.StateGetActor(ctx, msg.From, ts.Key()) + fa, err := me.cs.StateGetActor(ctx, msg.From, ts.Key()) if err != nil { return false, true, err } @@ -22,7 +22,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca return false, true, nil } - rec, err := e.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key()) + rec, err := me.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key()) if err != nil { return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err) } @@ -33,7 +33,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca } } -func (e *calledEvents) MatchMsg(inmsg *types.Message) MatchFunc { +func (me *messageEvents) MatchMsg(inmsg *types.Message) MsgMatchFunc { return func(msg *types.Message) (bool, error) { if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) { return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce) From b62fef7541e5c022532fa2671a9ef3e37b7afe6d Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 25 Jun 2020 12:12:03 -0400 Subject: [PATCH 08/25] feat: include previous TS in StateChangedHandler --- chain/events/events_hc.go | 80 +++++++----- chain/events/events_test.go | 243 +++++++++++++++++++++++++++++++++++- 2 files changed, 293 insertions(+), 30 deletions(-) diff --git a/chain/events/events_hc.go b/chain/events/events_hc.go index ae42fd85f..1f508b57f 100644 --- a/chain/events/events_hc.go +++ b/chain/events/events_hc.go @@ -13,6 +13,7 @@ import ( ) const NoTimeout = math.MaxInt64 +const NoHeight = abi.ChainEpoch(-1) type triggerID = uint64 @@ -26,9 +27,10 @@ type triggerH = abi.ChainEpoch type eventData interface{} // EventHandler arguments: -// `ts` is the tipset, in which the `msg` is included. +// `prevTs` is the previous tipset, eg the "from" tipset for a state change. +// `ts` is the event tipset, eg the tipset in which the `msg` is included. // `curH`-`ts.Height` = `confidence` -type EventHandler func(data eventData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) +type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) // CheckFunc is used for atomicity guarantees. If the condition the callbacks // wait for has already happened in tipset `ts` @@ -54,8 +56,9 @@ type handlerInfo struct { type queuedEvent struct { trigger triggerID - h abi.ChainEpoch - data eventData + prevH abi.ChainEpoch + h abi.ChainEpoch + data eventData called bool } @@ -112,6 +115,9 @@ func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidenc // Called when there is a change to the head with tipsets to be // reverted / applied func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error { + e.lk.Lock() + defer e.lk.Unlock() + for _, ts := range rev { e.handleReverts(ts) e.lastTs = ts @@ -125,7 +131,7 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error { // Queue up calls until there have been enough blocks to reach // confidence on the state changes for tid, data := range stateChanges { - e.queueForConfidence(tid, data, ts) + e.queueForConfidence(tid, data, e.lastTs, ts) } // Check if the head change included any new message calls @@ -137,11 +143,11 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error { // Queue up calls until there have been enough blocks to reach // confidence on the message calls for tid, data := range newCalls { - e.queueForConfidence(tid, data, ts) + e.queueForConfidence(tid, data, nil, ts) } for at := e.lastTs.Height(); at <= ts.Height(); at++ { - // Apply any events and timeouts that were queued up until the + // Apply any queued events and timeouts that were targeted at the // current chain height e.applyWithConfidence(ts, at) e.applyTimeouts(ts) @@ -170,8 +176,7 @@ func (e *hcEvents) handleReverts(ts *types.TipSet) { trigger := e.triggers[event.trigger] if err := trigger.revert(e.ctx, ts); err != nil { - log.Errorf("reverting chain trigger failed: %s", err) - // log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err) + log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", ts.Height(), triggerH, err) } } delete(e.confQueue[triggerH], ts.Height()) @@ -181,9 +186,13 @@ func (e *hcEvents) handleReverts(ts *types.TipSet) { // Queue up events until the chain has reached a height that reflects the // desired confidence -func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, ts *types.TipSet) { +func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) { trigger := e.triggers[trigID] + prevH := NoHeight + if prevTs != nil { + prevH = prevTs.Height() + } appliedH := ts.Height() triggerH := appliedH + abi.ChainEpoch(trigger.confidence) @@ -196,6 +205,7 @@ func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, ts *types.T byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{ trigger: trigID, + prevH: prevH, h: appliedH, data: data, }) @@ -207,7 +217,7 @@ func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, ts *types.T func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) { byOrigH, ok := e.confQueue[height] if !ok { - return // no triggers at thin height + return // no triggers at this height } for origH, events := range byOrigH { @@ -226,10 +236,20 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) continue } - more, err := trigger.handle(event.data, triggerTs, height) + // Previous tipset - this is relevant for example in a state change + // from one tipset to another + var prevTs *types.TipSet + if event.prevH != NoHeight { + prevTs, err = e.tsc.get(event.prevH) + if err != nil { + log.Errorf("events: applyWithConfidence didn't find tipset for previous event; wanted %d; current %d", event.prevH, height) + continue + } + } + + more, err := trigger.handle(event.data, prevTs, triggerTs, height) if err != nil { - log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", origH, height, err) - // log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err) + log.Errorf("chain trigger (@H %d, triggered @ %d) failed: %s", origH, height, err) continue // don't revert failed calls } @@ -266,8 +286,7 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) { log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height()) } - // more, err := trigger.handle(nil, nil, timeoutTs, ts.Height()) - more, err := trigger.handle(nil, timeoutTs, ts.Height()) + more, err := trigger.handle(nil, nil, timeoutTs, ts.Height()) if err != nil { log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err) continue // don't revert failed calls @@ -355,7 +374,7 @@ func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map res := make(map[triggerID]eventData) for tid, matchFn := range we.matchers { - ok, data, err := matchFn(we.ctx, oldState, newState) + ok, data, err := matchFn(oldState, newState) if err != nil { log.Errorf("event diff fn failed: %s", err) continue @@ -369,14 +388,19 @@ func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map } // Used to store the state change -type stateData interface{} +type stateChange interface{} + +// Will be checked to ensure it has length 2 +type stateData []stateChange // StateChangeHandler arguments: -// `ts` is the tipset, in which the change occured +// `oldTs` is the state "from" tipset +// `newTs` is the state "to" tipset +// `data` is the old / new state // `curH`-`ts.Height` = `confidence` -type StateChangeHandler func(ctx context.Context, oldState, newState stateData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) +type StateChangeHandler func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (more bool, err error) -type StateMatchFunc func(ctx context.Context, oldState, newState *types.TipSet) (bool, eventData, error) +type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, stateData, error) // StateChanged registers a callback which is triggered when a specified state // change occurs or a timeout is reached. @@ -400,20 +424,20 @@ type StateMatchFunc func(ctx context.Context, oldState, newState *types.TipSet) // // * `RevertHandler` is called after apply handler, when we drop the tipset // containing the message. The tipset passed as the argument is the tipset -// that is being dropped. Note that the message dropped may be re-applied +// that is being dropped. Note that the event dropped may be re-applied // in a different tipset in small amount of time. // // * `StateMatchFunc` is called against each tipset state. If there is a match, // the state change is queued up until the confidence interval has elapsed (and // `StateChangeHandler` is called) func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error { - hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { - states, ok := data.([]interface{}) - if !ok || len(states) != 2 { - panic("expected 2 element array") + hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { + states, ok := data.(stateData) + if data != nil && (!ok || len(states) != 2) { + panic("StateChangeHandler: stateData passed to watcher must be a 2 element array: [old state, new state]") } - return scHnd(we.ctx, states[0], states[1], ts, height) + return scHnd(prevTs, ts, states, height) } id, err := we.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout) @@ -557,7 +581,7 @@ type MsgMatchFunc func(msg *types.Message) (bool, error) // message is queued up until the confidence interval has elapsed (and // `MsgHandler` is called) func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error { - hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { + hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { msg, ok := data.(*types.Message) if data != nil && !ok { panic("expected msg") diff --git a/chain/events/events_test.go b/chain/events/events_test.go index a048789ec..5185d977b 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1004,8 +1004,6 @@ func TestRemoveTriggersOnMessage(t *testing.T) { return false, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) - fmt.Println(msg == nil) - fmt.Println(curH) applied = true return more, nil }, func(_ context.Context, ts *types.TipSet) error { @@ -1067,3 +1065,244 @@ func TestRemoveTriggersOnMessage(t *testing.T) { require.Equal(t, true, applied) require.Equal(t, false, reverted) } + +func TestStateChanged(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + more := true + var applied, reverted bool + var appliedData stateData + var appliedOldTs *types.TipSet + var appliedNewTs *types.TipSet + var appliedH abi.ChainEpoch + var matchData stateData + + confidence := 3 + timeout := abi.ChainEpoch(20) + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) { + require.Equal(t, false, applied) + applied = true + appliedData = data + appliedOldTs = oldTs + appliedNewTs = newTs + appliedH = curH + return more, nil + }, func(_ context.Context, ts *types.TipSet) error { + reverted = true + return nil + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, stateData, error) { + if matchData == nil { + return false, matchData, nil + } + + d := matchData + matchData = nil + return true, d, nil + }) + require.NoError(t, err) + + // create few blocks to make sure nothing get's randomly called + + fcs.advance(0, 4, nil) // H=5 + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // create state change (but below confidence threshold) + matchData = []stateChange{"a", "b"} + fcs.advance(0, 3, nil) + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // create additional block so we are above confidence threshold + + fcs.advance(0, 2, nil) // H=10 (confidence=3, apply) + + require.Equal(t, true, applied) + require.Equal(t, false, reverted) + applied = false + + // dip below confidence (should not apply again) + fcs.advance(2, 2, nil) // H=10 (confidence=3, apply) + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Change happens from 5 -> 6 + require.Equal(t, abi.ChainEpoch(5), appliedOldTs.Height()) + require.Equal(t, abi.ChainEpoch(6), appliedNewTs.Height()) + + // Actually applied (with confidence) at 9 + require.Equal(t, abi.ChainEpoch(9), appliedH) + + // Make sure the state change was correctly passed through + require.Equal(t, "a", appliedData[0]) + require.Equal(t, "b", appliedData[1]) +} + +func TestStateChangedRevert(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + more := true + var applied, reverted bool + var matchData stateData + + confidence := 1 + timeout := abi.ChainEpoch(20) + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) { + require.Equal(t, false, applied) + applied = true + return more, nil + }, func(_ context.Context, ts *types.TipSet) error { + reverted = true + return nil + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, stateData, error) { + if matchData == nil { + return false, matchData, nil + } + + d := matchData + matchData = nil + return true, d, nil + }) + require.NoError(t, err) + + fcs.advance(0, 2, nil) // H=3 + + // Make a state change from TS at height 3 to TS at height 4 + matchData = []stateChange{"a", "b"} + fcs.advance(0, 1, nil) // H=4 + + // Haven't yet reached confidence + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Advance to reach confidence level + fcs.advance(0, 1, nil) // H=5 + + // Should now have called the handler + require.Equal(t, true, applied) + require.Equal(t, false, reverted) + applied = false + + // Advance 3 more TS + fcs.advance(0, 3, nil) // H=8 + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Regress but not so far as to cause a revert + fcs.advance(3, 1, nil) // H=6 + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Regress back to state where change happened + fcs.advance(3, 1, nil) // H=4 + + // Expect revert to have happened + require.Equal(t, false, applied) + require.Equal(t, true, reverted) +} + +func TestStateChangedTimeout(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + called := false + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) { + called = true + require.Nil(t, data) + require.Equal(t, abi.ChainEpoch(20), newTs.Height()) + require.Equal(t, abi.ChainEpoch(23), curH) + return false, nil + }, func(_ context.Context, ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, stateData, error) { + return false, stateData{}, nil + }) + + require.NoError(t, err) + + fcs.advance(0, 21, nil) + require.False(t, called) + + fcs.advance(0, 5, nil) + require.True(t, called) + called = false + + // with check func reporting done + + fcs = &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events = NewEvents(context.Background(), fcs) + + err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return true, true, nil + }, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) { + called = true + require.Nil(t, data) + require.Equal(t, abi.ChainEpoch(20), newTs.Height()) + require.Equal(t, abi.ChainEpoch(23), curH) + return false, nil + }, func(_ context.Context, ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, stateData, error) { + return false, stateData{}, nil + }) + require.NoError(t, err) + + fcs.advance(0, 21, nil) + require.False(t, called) + + fcs.advance(0, 5, nil) + require.False(t, called) +} From 95a9dc9db00b41da5a8f03d3472ee77778446883 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 25 Jun 2020 12:35:14 -0400 Subject: [PATCH 09/25] refactor: use struct instead of array for state change --- chain/events/events_hc.go | 23 +++++++++++++---------- chain/events/events_test.go | 34 +++++++++++++++++----------------- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/chain/events/events_hc.go b/chain/events/events_hc.go index 1f508b57f..50826db36 100644 --- a/chain/events/events_hc.go +++ b/chain/events/events_hc.go @@ -387,20 +387,23 @@ func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map return res } -// Used to store the state change -type stateChange interface{} +// Used to store the state for a stateChange +type stateData interface{} -// Will be checked to ensure it has length 2 -type stateData []stateChange +// A change in state from -> to +type stateChange struct { + from stateData + to stateData +} // StateChangeHandler arguments: // `oldTs` is the state "from" tipset // `newTs` is the state "to" tipset -// `data` is the old / new state +// `states` is the old / new state // `curH`-`ts.Height` = `confidence` -type StateChangeHandler func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (more bool, err error) +type StateChangeHandler func(oldTs, newTs *types.TipSet, states *stateChange, curH abi.ChainEpoch) (more bool, err error) -type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, stateData, error) +type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) // StateChanged registers a callback which is triggered when a specified state // change occurs or a timeout is reached. @@ -432,9 +435,9 @@ type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, stateData, error) // `StateChangeHandler` is called) func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error { hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { - states, ok := data.(stateData) - if data != nil && (!ok || len(states) != 2) { - panic("StateChangeHandler: stateData passed to watcher must be a 2 element array: [old state, new state]") + states, ok := data.(*stateChange) + if data != nil && !ok { + panic("expected *stateChange") } return scHnd(prevTs, ts, states, height) diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 5185d977b..36da2bc1e 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1081,18 +1081,18 @@ func TestStateChanged(t *testing.T) { more := true var applied, reverted bool - var appliedData stateData + var appliedData *stateChange var appliedOldTs *types.TipSet var appliedNewTs *types.TipSet var appliedH abi.ChainEpoch - var matchData stateData + var matchData *stateChange confidence := 3 timeout := abi.ChainEpoch(20) err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil - }, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) { + }, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) applied = true appliedData = data @@ -1103,7 +1103,7 @@ func TestStateChanged(t *testing.T) { }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil - }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, stateData, error) { + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) { if matchData == nil { return false, matchData, nil } @@ -1121,7 +1121,7 @@ func TestStateChanged(t *testing.T) { require.Equal(t, false, reverted) // create state change (but below confidence threshold) - matchData = []stateChange{"a", "b"} + matchData = &stateChange{from: "a", to: "b"} fcs.advance(0, 3, nil) require.Equal(t, false, applied) @@ -1149,8 +1149,8 @@ func TestStateChanged(t *testing.T) { require.Equal(t, abi.ChainEpoch(9), appliedH) // Make sure the state change was correctly passed through - require.Equal(t, "a", appliedData[0]) - require.Equal(t, "b", appliedData[1]) + require.Equal(t, "a", appliedData.from) + require.Equal(t, "b", appliedData.to) } func TestStateChangedRevert(t *testing.T) { @@ -1168,21 +1168,21 @@ func TestStateChangedRevert(t *testing.T) { more := true var applied, reverted bool - var matchData stateData + var matchData *stateChange confidence := 1 timeout := abi.ChainEpoch(20) err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil - }, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) { + }, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) applied = true return more, nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil - }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, stateData, error) { + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) { if matchData == nil { return false, matchData, nil } @@ -1196,7 +1196,7 @@ func TestStateChangedRevert(t *testing.T) { fcs.advance(0, 2, nil) // H=3 // Make a state change from TS at height 3 to TS at height 4 - matchData = []stateChange{"a", "b"} + matchData = &stateChange{from: "a", to: "b"} fcs.advance(0, 1, nil) // H=4 // Haven't yet reached confidence @@ -1248,7 +1248,7 @@ func TestStateChangedTimeout(t *testing.T) { err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil - }, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) { + }, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) { called = true require.Nil(t, data) require.Equal(t, abi.ChainEpoch(20), newTs.Height()) @@ -1257,8 +1257,8 @@ func TestStateChangedTimeout(t *testing.T) { }, func(_ context.Context, ts *types.TipSet) error { t.Fatal("revert on timeout") return nil - }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, stateData, error) { - return false, stateData{}, nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) { + return false, nil, nil }) require.NoError(t, err) @@ -1286,7 +1286,7 @@ func TestStateChangedTimeout(t *testing.T) { err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil - }, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) { + }, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) { called = true require.Nil(t, data) require.Equal(t, abi.ChainEpoch(20), newTs.Height()) @@ -1295,8 +1295,8 @@ func TestStateChangedTimeout(t *testing.T) { }, func(_ context.Context, ts *types.TipSet) error { t.Fatal("revert on timeout") return nil - }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, stateData, error) { - return false, stateData{}, nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) { + return false, nil, nil }) require.NoError(t, err) From 728afc0587d2893a1d4674e98e4d44c21fc74842 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 25 Jun 2020 12:46:43 -0400 Subject: [PATCH 10/25] refactor: remove state API example --- chain/events/state/state.go | 57 ------------------------------------- 1 file changed, 57 deletions(-) delete mode 100644 chain/events/state/state.go diff --git a/chain/events/state/state.go b/chain/events/state/state.go deleted file mode 100644 index 75041ad79..000000000 --- a/chain/events/state/state.go +++ /dev/null @@ -1,57 +0,0 @@ -package state - -import ( - "context" - - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/specs-actors/actors/abi" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" -) - -type StateWatcher struct { -} - -type WatcherAPI interface { - ChainNotify(context.Context) (<-chan []*api.HeadChange, error) -} - -type UserData interface{} - -type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) - -type Callback func(ctx context.Context, oldState, newState *types.TipSet, events interface{}) - -type RevertHandler func(ctx context.Context, ts *types.TipSet) error - -/* -w := NewWatcher(api, OnActorChange(t04, OnDealStateChange(123)), cb) -*/ -func NewWatcher(ctx context.Context, api WatcherAPI, d DiffFunc, apply Callback, revert RevertHandler, confidence, timeout abi.ChainEpoch) { - go func() { - notifs, err := api.ChainNotify(ctx) - if err != nil { - // bad - return - } - - curTs := (<-notifs)[0].Val - d(ctx, curTs, curTs) - - for { - select { - case update := <-notifs: - for i, change := range update { - switch change.Type { - case store.HCApply: - d(ctx, curTs, change.Val) - case store.HCRevert: - - } - } - } - } - - }() -} From e3c897fb68ff787095b1db4a0939e94f5af7491b Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Wed, 17 Jun 2020 01:10:29 -0400 Subject: [PATCH 11/25] Change StateReadState to take an address, not an actor --- api/api_full.go | 2 +- api/apistruct/struct.go | 6 +++--- cli/state.go | 7 +------ cmd/lotus-chainwatch/sync.go | 6 ++++-- node/impl/full/state.go | 7 ++++++- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 17b0fdce6..d5a256bd6 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -206,7 +206,7 @@ type FullNode interface { StateCall(context.Context, *types.Message, types.TipSetKey) (*InvocResult, error) StateReplay(context.Context, types.TipSetKey, cid.Cid) (*InvocResult, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) - StateReadState(ctx context.Context, act *types.Actor, tsk types.TipSetKey) (*ActorState, error) + StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*ActorState, error) StateListMessages(ctx context.Context, match *types.Message, tsk types.TipSetKey, toht abi.ChainEpoch) ([]cid.Cid, error) StateNetworkName(context.Context) (dtypes.NetworkName, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 0c0acd05e..591bcb4a3 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -139,7 +139,7 @@ type FullNodeStruct struct { StateCall func(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) `perm:"read"` StateReplay func(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error) `perm:"read"` StateGetActor func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error) `perm:"read"` - StateReadState func(context.Context, *types.Actor, types.TipSetKey) (*api.ActorState, error) `perm:"read"` + StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error) `perm:"read"` StatePledgeCollateral func(context.Context, types.TipSetKey) (types.BigInt, error) `perm:"read"` StateWaitMsg func(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) `perm:"read"` StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"` @@ -618,8 +618,8 @@ func (c *FullNodeStruct) StateGetActor(ctx context.Context, actor address.Addres return c.Internal.StateGetActor(ctx, actor, tsk) } -func (c *FullNodeStruct) StateReadState(ctx context.Context, act *types.Actor, tsk types.TipSetKey) (*api.ActorState, error) { - return c.Internal.StateReadState(ctx, act, tsk) +func (c *FullNodeStruct) StateReadState(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*api.ActorState, error) { + return c.Internal.StateReadState(ctx, addr, tsk) } func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, tsk types.TipSetKey) (types.BigInt, error) { diff --git a/cli/state.go b/cli/state.go index 1aad85299..a159f2272 100644 --- a/cli/state.go +++ b/cli/state.go @@ -742,12 +742,7 @@ var stateReadStateCmd = &cli.Command{ return err } - act, err := api.StateGetActor(ctx, addr, ts.Key()) - if err != nil { - return err - } - - as, err := api.StateReadState(ctx, act, ts.Key()) + as, err := api.StateReadState(ctx, addr, ts.Key()) if err != nil { return err } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 88afb647e..059dcf9d6 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -161,7 +161,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. log.Error(err) return } - ast, err := api.StateReadState(ctx, act, genesisTs.Key()) + + ast, err := api.StateReadState(ctx, addr, genesisTs.Key()) if err != nil { log.Error(err) return @@ -210,7 +211,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } - ast, err := api.StateReadState(ctx, &act, pts.Key()) + ast, err := api.StateReadState(ctx, addr, pts.Key()) + if err != nil { log.Error(err) return diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 048ae7858..43e16d34f 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -300,7 +300,7 @@ func (a *StateAPI) StateAccountKey(ctx context.Context, addr address.Address, ts return a.StateManager.ResolveToKeyAddress(ctx, addr, ts) } -func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, tsk types.TipSetKey) (*api.ActorState, error) { +func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*api.ActorState, error) { ts, err := a.Chain.GetTipSetFromKey(tsk) if err != nil { return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) @@ -310,6 +310,11 @@ func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, tsk typ return nil, err } + act, err := state.GetActor(actor) + if err != nil { + return nil, err + } + blk, err := state.Store.(*cbor.BasicIpldStore).Blocks.Get(act.Head) if err != nil { return nil, err From 50aa1e6baa5d6fce83b573bfec82e4cc87b4f5e5 Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Thu, 25 Jun 2020 00:20:15 -0400 Subject: [PATCH 12/25] Update bitfield --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 404c2d46c..1f60da467 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/filecoin-project/filecoin-ffi v0.26.1-0.20200508175440-05b30afeb00d github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200424220931-6263827e49f2 - github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e + github.com/filecoin-project/go-bitfield v0.0.2-0.20200624234227-4563d4a0bc01 github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.3.0 diff --git a/go.sum b/go.sum index 2b8a31ba6..33dc25268 100644 --- a/go.sum +++ b/go.sum @@ -225,6 +225,8 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mo github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= +github.com/filecoin-project/go-bitfield v0.0.2-0.20200624234227-4563d4a0bc01 h1:k/FyoahW7Pvqi8p8YF7Np8YcK1XWGJ8TlR1mEICly3E= +github.com/filecoin-project/go-bitfield v0.0.2-0.20200624234227-4563d4a0bc01/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= From e54b49b442c2bbd0f44b6aca14102a4a5c78f93b Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Thu, 25 Jun 2020 13:48:50 -0400 Subject: [PATCH 13/25] Bump API version --- build/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/version.go b/build/version.go index 03d5c0792..fc64eae71 100644 --- a/build/version.go +++ b/build/version.go @@ -53,7 +53,7 @@ func (ve Version) EqMajorMinor(v2 Version) bool { } // APIVersion is a semver version of the rpc api exposed -var APIVersion Version = newVer(0, 4, 0) +var APIVersion Version = newVer(0, 5, 0) //nolint:varcheck,deadcode const ( From abad4a39415175f67f3d6a85d929d01297efc915 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 25 Jun 2020 17:43:37 -0400 Subject: [PATCH 14/25] refactor: modify predicates API --- chain/events/events_called.go | 359 ------------------------------- chain/events/events_hc.go | 20 +- chain/events/events_test.go | 36 ++-- chain/events/state/predicates.go | 77 ++++--- 4 files changed, 73 insertions(+), 419 deletions(-) delete mode 100644 chain/events/events_called.go diff --git a/chain/events/events_called.go b/chain/events/events_called.go deleted file mode 100644 index 0bae99404..000000000 --- a/chain/events/events_called.go +++ /dev/null @@ -1,359 +0,0 @@ -package events - -import ( - "context" - "math" - "sync" - - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/ipfs/go-cid" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/chain/types" -) - -const NoTimeout = math.MaxInt64 - -type triggerID = uint64 - -// msgH is the block height at which a message was present / event has happened -type msgH = abi.ChainEpoch - -// triggerH is the block height at which the listener will be notified about the -// message (msgH+confidence) -type triggerH = abi.ChainEpoch - -// CalledHandler arguments: -// `ts` is the tipset, in which the `msg` is included. -// `curH`-`ts.Height` = `confidence` -type CalledHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) - -// CheckFunc is used for atomicity guarantees. If the condition the callbacks -// wait for has already happened in tipset `ts` -// -// If `done` is true, timeout won't be triggered -// If `more` is false, no messages will be sent to CalledHandler (RevertHandler -// may still be called) -type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error) - -type MatchFunc func(msg *types.Message) (bool, error) - -type callHandler struct { - confidence int - timeout abi.ChainEpoch - - disabled bool // TODO: GC after gcConfidence reached - - handle CalledHandler - revert RevertHandler -} - -type queuedEvent struct { - trigger triggerID - - h abi.ChainEpoch - msg *types.Message - - called bool -} - -type calledEvents struct { - cs eventAPI - tsc *tipSetCache - ctx context.Context - gcConfidence uint64 - - at abi.ChainEpoch - - lk sync.Mutex - - ctr triggerID - - triggers map[triggerID]*callHandler - matchers map[triggerID][]MatchFunc - - // maps block heights to events - // [triggerH][msgH][event] - confQueue map[triggerH]map[msgH][]*queuedEvent - - // [msgH][triggerH] - revertQueue map[msgH][]triggerH - - // [timeoutH+confidence][triggerID]{calls} - timeouts map[abi.ChainEpoch]map[triggerID]int -} - -func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error { - e.lk.Lock() - defer e.lk.Unlock() - - for _, ts := range rev { - e.handleReverts(ts) - e.at = ts.Height() - } - - for _, ts := range app { - // called triggers - e.checkNewCalls(ts) - for ; e.at <= ts.Height(); e.at++ { - e.applyWithConfidence(ts, e.at) - e.applyTimeouts(ts) - } - } - - return nil -} - -func (e *calledEvents) handleReverts(ts *types.TipSet) { - reverts, ok := e.revertQueue[ts.Height()] - if !ok { - return // nothing to do - } - - for _, triggerH := range reverts { - toRevert := e.confQueue[triggerH][ts.Height()] - for _, event := range toRevert { - if !event.called { - continue // event wasn't apply()-ied yet - } - - trigger := e.triggers[event.trigger] - - if err := trigger.revert(e.ctx, ts); err != nil { - log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err) - } - } - delete(e.confQueue[triggerH], ts.Height()) - } - delete(e.revertQueue, ts.Height()) -} - -func (e *calledEvents) checkNewCalls(ts *types.TipSet) { - pts, err := e.cs.ChainGetTipSet(e.ctx, ts.Parents()) // we actually care about messages in the parent tipset here - if err != nil { - log.Errorf("getting parent tipset in checkNewCalls: %s", err) - return - } - - e.messagesForTs(pts, func(msg *types.Message) { - // TODO: provide receipts - for tid, matchFns := range e.matchers { - var matched bool - for _, matchFn := range matchFns { - ok, err := matchFn(msg) - if err != nil { - log.Errorf("event matcher failed: %s", err) - continue - } - matched = ok - - if matched { - break - } - } - - if matched { - e.queueForConfidence(tid, msg, ts) - break - } - } - }) -} - -func (e *calledEvents) queueForConfidence(trigID uint64, msg *types.Message, ts *types.TipSet) { - trigger := e.triggers[trigID] - - appliedH := ts.Height() - - triggerH := appliedH + abi.ChainEpoch(trigger.confidence) - - byOrigH, ok := e.confQueue[triggerH] - if !ok { - byOrigH = map[abi.ChainEpoch][]*queuedEvent{} - e.confQueue[triggerH] = byOrigH - } - - byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{ - trigger: trigID, - h: appliedH, - msg: msg, - }) - - e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH) -} - -func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) { - byOrigH, ok := e.confQueue[height] - if !ok { - return // no triggers at thin height - } - - for origH, events := range byOrigH { - triggerTs, err := e.tsc.get(origH) - if err != nil { - log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, height) - } - - for _, event := range events { - if event.called { - continue - } - - trigger := e.triggers[event.trigger] - if trigger.disabled { - continue - } - - rec, err := e.cs.StateGetReceipt(e.ctx, event.msg.Cid(), ts.Key()) - if err != nil { - log.Error(err) - return - } - - more, err := trigger.handle(event.msg, rec, triggerTs, height) - if err != nil { - log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err) - continue // don't revert failed calls - } - - event.called = true - - touts, ok := e.timeouts[trigger.timeout] - if ok { - touts[event.trigger]++ - } - - trigger.disabled = !more - } - } -} - -func (e *calledEvents) applyTimeouts(ts *types.TipSet) { - triggers, ok := e.timeouts[ts.Height()] - if !ok { - return // nothing to do - } - - for triggerID, calls := range triggers { - if calls > 0 { - continue // don't timeout if the method was called - } - trigger := e.triggers[triggerID] - if trigger.disabled { - continue - } - - timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence)) - if err != nil { - log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height()) - } - - more, err := trigger.handle(nil, nil, timeoutTs, ts.Height()) - if err != nil { - log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err) - continue // don't revert failed calls - } - - trigger.disabled = !more // allows messages after timeout - } -} - -func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) { - seen := map[cid.Cid]struct{}{} - - for _, tsb := range ts.Blocks() { - - msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid()) - if err != nil { - log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err) - // this is quite bad, but probably better than missing all the other updates - continue - } - - for _, m := range msgs.BlsMessages { - _, ok := seen[m.Cid()] - if ok { - continue - } - seen[m.Cid()] = struct{}{} - - consume(m) - } - - for _, m := range msgs.SecpkMessages { - _, ok := seen[m.Message.Cid()] - if ok { - continue - } - seen[m.Message.Cid()] = struct{}{} - - consume(&m.Message) - } - } -} - -// Called registers a callbacks which are triggered when a specified method is -// called on an actor, or a timeout is reached. -// -// * `CheckFunc` callback is invoked immediately with a recent tipset, it -// returns two booleans - `done`, and `more`. -// -// * `done` should be true when some on-chain action we are waiting for has -// happened. When `done` is set to true, timeout trigger is disabled. -// -// * `more` should be false when we don't want to receive new notifications -// through CalledHandler. Note that notifications may still be delivered to -// RevertHandler -// -// * `CalledHandler` is called when the specified event was observed on-chain, -// and a confidence threshold was reached, or the specified `timeout` height -// was reached with no events observed. When this callback is invoked on a -// timeout, `msg` is set to nil. This callback returns a boolean specifying -// whether further notifications should be sent, like `more` return param -// from `CheckFunc` above. -// -// * `RevertHandler` is called after apply handler, when we drop the tipset -// containing the message. The tipset passed as the argument is the tipset -// that is being dropped. Note that the message dropped may be re-applied -// in a different tipset in small amount of time. -func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MatchFunc) error { - e.lk.Lock() - defer e.lk.Unlock() - - ts := e.tsc.best() - done, more, err := check(ts) - if err != nil { - return xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err) - } - if done { - timeout = NoTimeout - } - - id := e.ctr - e.ctr++ - - e.triggers[id] = &callHandler{ - confidence: confidence, - timeout: timeout + abi.ChainEpoch(confidence), - - disabled: !more, - - handle: hnd, - revert: rev, - } - - e.matchers[id] = append(e.matchers[id], mf) - - if timeout != NoTimeout { - if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil { - e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{} - } - e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0 - } - - return nil -} - -func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error { - return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg.VMMessage())) -} diff --git a/chain/events/events_hc.go b/chain/events/events_hc.go index 50826db36..01f5fdfcc 100644 --- a/chain/events/events_hc.go +++ b/chain/events/events_hc.go @@ -387,23 +387,17 @@ func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map return res } -// Used to store the state for a stateChange -type stateData interface{} - -// A change in state from -> to -type stateChange struct { - from stateData - to stateData -} +// A change in state +type StateChange interface{} // StateChangeHandler arguments: // `oldTs` is the state "from" tipset // `newTs` is the state "to" tipset -// `states` is the old / new state +// `states` is the change in state // `curH`-`ts.Height` = `confidence` -type StateChangeHandler func(oldTs, newTs *types.TipSet, states *stateChange, curH abi.ChainEpoch) (more bool, err error) +type StateChangeHandler func(oldTs, newTs *types.TipSet, states StateChange, curH abi.ChainEpoch) (more bool, err error) -type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) +type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error) // StateChanged registers a callback which is triggered when a specified state // change occurs or a timeout is reached. @@ -435,9 +429,9 @@ type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) // `StateChangeHandler` is called) func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error { hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { - states, ok := data.(*stateChange) + states, ok := data.(StateChange) if data != nil && !ok { - panic("expected *stateChange") + panic("expected StateChange") } return scHnd(prevTs, ts, states, height) diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 36da2bc1e..2bb5b0916 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1066,6 +1066,11 @@ func TestRemoveTriggersOnMessage(t *testing.T) { require.Equal(t, false, reverted) } +type testStateChange struct { + from string + to string +} + func TestStateChanged(t *testing.T) { fcs := &fakeCS{ t: t, @@ -1081,18 +1086,18 @@ func TestStateChanged(t *testing.T) { more := true var applied, reverted bool - var appliedData *stateChange + var appliedData StateChange var appliedOldTs *types.TipSet var appliedNewTs *types.TipSet var appliedH abi.ChainEpoch - var matchData *stateChange + var matchData StateChange confidence := 3 timeout := abi.ChainEpoch(20) err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil - }, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) { + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) applied = true appliedData = data @@ -1103,7 +1108,7 @@ func TestStateChanged(t *testing.T) { }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil - }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) { + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { if matchData == nil { return false, matchData, nil } @@ -1121,7 +1126,7 @@ func TestStateChanged(t *testing.T) { require.Equal(t, false, reverted) // create state change (but below confidence threshold) - matchData = &stateChange{from: "a", to: "b"} + matchData = testStateChange{from: "a", to: "b"} fcs.advance(0, 3, nil) require.Equal(t, false, applied) @@ -1149,8 +1154,9 @@ func TestStateChanged(t *testing.T) { require.Equal(t, abi.ChainEpoch(9), appliedH) // Make sure the state change was correctly passed through - require.Equal(t, "a", appliedData.from) - require.Equal(t, "b", appliedData.to) + rcvd := appliedData.(testStateChange) + require.Equal(t, "a", rcvd.from) + require.Equal(t, "b", rcvd.to) } func TestStateChangedRevert(t *testing.T) { @@ -1168,21 +1174,21 @@ func TestStateChangedRevert(t *testing.T) { more := true var applied, reverted bool - var matchData *stateChange + var matchData StateChange confidence := 1 timeout := abi.ChainEpoch(20) err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil - }, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) { + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) applied = true return more, nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil - }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) { + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { if matchData == nil { return false, matchData, nil } @@ -1196,7 +1202,7 @@ func TestStateChangedRevert(t *testing.T) { fcs.advance(0, 2, nil) // H=3 // Make a state change from TS at height 3 to TS at height 4 - matchData = &stateChange{from: "a", to: "b"} + matchData = testStateChange{from: "a", to: "b"} fcs.advance(0, 1, nil) // H=4 // Haven't yet reached confidence @@ -1248,7 +1254,7 @@ func TestStateChangedTimeout(t *testing.T) { err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil - }, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) { + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { called = true require.Nil(t, data) require.Equal(t, abi.ChainEpoch(20), newTs.Height()) @@ -1257,7 +1263,7 @@ func TestStateChangedTimeout(t *testing.T) { }, func(_ context.Context, ts *types.TipSet) error { t.Fatal("revert on timeout") return nil - }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) { + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { return false, nil, nil }) @@ -1286,7 +1292,7 @@ func TestStateChangedTimeout(t *testing.T) { err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil - }, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) { + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { called = true require.Nil(t, data) require.Equal(t, abi.ChainEpoch(20), newTs.Height()) @@ -1295,7 +1301,7 @@ func TestStateChangedTimeout(t *testing.T) { }, func(_ context.Context, ts *types.TipSet) error { t.Fatal("revert on timeout") return nil - }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) { + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { return false, nil, nil }) require.NoError(t, err) diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index e9d9a7393..df1bcde30 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -2,38 +2,49 @@ package state import ( "context" - - "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" + "github.com/ipfs/go-cid" + "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-amt-ipld/v2" - "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" ) -type StatePredicates struct { - sm *stmgr.StateManager +type UserData interface{} + +type ChainApi interface { + ChainHasObj(context.Context, cid.Cid) (bool, error) + ChainReadObj(context.Context, cid.Cid) ([]byte, error) + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) } -func NewStatePredicates(sm *stmgr.StateManager) *StatePredicates { +type StatePredicates struct { + api ChainApi + cst *cbor.BasicIpldStore +} + +func NewStatePredicates(api ChainApi) *StatePredicates { return &StatePredicates{ - sm: sm, + api: api, + cst: cbor.NewCborStore(apibstore.NewAPIBlockstore(api)), } } +type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) + type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc { return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) { - oldActor, err := sp.sm.GetActor(addr, oldState) + oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key()) if err != nil { return false, nil, err } - newActor, err := sp.sm.GetActor(addr, newState) + newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key()) if oldActor.Head.Equals(newActor.Head) { return false, nil, nil } @@ -46,12 +57,11 @@ type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State func (sp *StatePredicates) OnStorageMarketActorChanged(addr address.Address, diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { var oldState market.State - cst := cbor.NewCborStore(sp.sm.ChainStore().Blockstore()) - if err := cst.Get(ctx, oldActorStateHead, &oldState); err != nil { + if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil { return false, nil, err } var newState market.State - if err := cst.Get(ctx, newActorStateHead, &newActorStateHead); err != nil { + if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil { return false, nil, err } return diffStorageMarketState(ctx, &oldState, &newState) @@ -65,37 +75,40 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) if oldState.States.Equals(newState.States) { return false, nil, nil } - blks := cbor.NewCborStore(sp.sm.ChainStore().Blockstore()) - oldRoot, err := amt.LoadAMT(ctx, blks, oldState.States) + + oldRoot, err := amt.LoadAMT(ctx, sp.cst, oldState.States) if err != nil { return false, nil, err } - newRoot, err := amt.LoadAMT(ctx, blks, newState.States) + newRoot, err := amt.LoadAMT(ctx, sp.cst, newState.States) if err != nil { return false, nil, err } + return diffDealStates(ctx, oldRoot, newRoot) } } -func (sp *StatePredicates) DealStateChangedForIDs(ctx context.Context, dealIds []abi.DealID, oldRoot, newRoot *amt.Root) (changed bool, user UserData, err error) { - var changedDeals []abi.DealID - for _, dealId := range dealIds { - var oldDeal, newDeal market.DealState - err := oldRoot.Get(ctx, uint64(dealId), &oldDeal) - if err != nil { - return false, nil, err +func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { + return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { + var changedDeals []abi.DealID + for _, dealId := range dealIds { + var oldDeal, newDeal market.DealState + err := oldDealStateRoot.Get(ctx, uint64(dealId), &oldDeal) + if err != nil { + return false, nil, err + } + err = newDealStateRoot.Get(ctx, uint64(dealId), &newDeal) + if err != nil { + return false, nil, err + } + if oldDeal != newDeal { + changedDeals = append(changedDeals, dealId) + } } - err = newRoot.Get(ctx, uint64(dealId), &newDeal) - if err != nil { - return false, nil, err - } - if oldDeal != newDeal { - changedDeals = append(changedDeals, dealId) + if len(changedDeals) > 0 { + return true, changed, nil } + return false, nil, nil } - if len(changedDeals) > 0 { - return true, changed, nil - } - return false, nil, nil } From 38533631bf8c136d3cb164a236f6d3f4dea50546 Mon Sep 17 00:00:00 2001 From: Rob Quist Date: Fri, 26 Jun 2020 02:09:39 +0200 Subject: [PATCH 15/25] Bump to Go v. 1.14 as requirement --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 3038e929c..b143793aa 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ all: build unexport GOFLAGS GOVERSION:=$(shell go version | cut -d' ' -f 3 | cut -d. -f 2) -ifeq ($(shell expr $(GOVERSION) \< 13), 1) +ifeq ($(shell expr $(GOVERSION) \< 14), 1) $(warning Your Golang version is go 1.$(GOVERSION)) $(error Update Golang to version $(shell grep '^go' go.mod)) endif From eafb04004ef67da5879e84eb3c07e2a9f7fdad11 Mon Sep 17 00:00:00 2001 From: Alexey Date: Fri, 26 Jun 2020 09:06:01 +0300 Subject: [PATCH 16/25] Fix to addresses in paychmgr channel creation --- paychmgr/paych.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paychmgr/paych.go b/paychmgr/paych.go index 597663a94..763c448f9 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -76,7 +76,7 @@ func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) return err } from := account.Address - _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) + _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) if err != nil { return err } @@ -114,7 +114,7 @@ func (pm *Manager) loadOutboundChannelInfo(ctx context.Context, ch address.Addre return nil, err } from := account.Address - _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) + _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) if err != nil { return nil, err } From 393a9ca4f2457758cbaef59ceec44f5539786640 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 26 Jun 2020 11:51:45 -0400 Subject: [PATCH 17/25] test: predicates test --- chain/events/state/predicates.go | 18 ++- chain/events/state/predicates_test.go | 176 ++++++++++++++++++++++++++ 2 files changed, 188 insertions(+), 6 deletions(-) create mode 100644 chain/events/state/predicates_test.go diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index df1bcde30..014a7ecc3 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -17,8 +17,7 @@ import ( type UserData interface{} type ChainApi interface { - ChainHasObj(context.Context, cid.Cid) (bool, error) - ChainReadObj(context.Context, cid.Cid) ([]byte, error) + apibstore.ChainIO StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) } @@ -54,7 +53,7 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) -func (sp *StatePredicates) OnStorageMarketActorChanged(addr address.Address, diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { +func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { var oldState market.State if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil { @@ -89,9 +88,16 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) } } +type ChangedDeals map[abi.DealID]DealStateChange + +type DealStateChange struct { + From market.DealState + To market.DealState +} + func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { - var changedDeals []abi.DealID + changedDeals := make(ChangedDeals) for _, dealId := range dealIds { var oldDeal, newDeal market.DealState err := oldDealStateRoot.Get(ctx, uint64(dealId), &oldDeal) @@ -103,11 +109,11 @@ func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDeal return false, nil, err } if oldDeal != newDeal { - changedDeals = append(changedDeals, dealId) + changedDeals[dealId] = DealStateChange{oldDeal, newDeal} } } if len(changedDeals) > 0 { - return true, changed, nil + return true, changedDeals, nil } return false, nil, nil } diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go new file mode 100644 index 000000000..190f5bd2a --- /dev/null +++ b/chain/events/state/predicates_test.go @@ -0,0 +1,176 @@ +package state + +import ( + "context" + "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-hamt-ipld" + "testing" + + "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + bstore "github.com/ipfs/go-ipfs-blockstore" + cbornode "github.com/ipfs/go-ipld-cbor" + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" +) + +var dummyCid cid.Cid + +func init() { + dummyCid, _ = cid.Parse("bafkqaaa") +} + +type mockApi struct { + ts map[types.TipSetKey]*types.Actor + bs bstore.Blockstore +} + +func newMockApi(bs bstore.Blockstore) *mockApi { + return &mockApi{ + bs: bs, + ts: make(map[types.TipSetKey]*types.Actor), + } +} + +func (m mockApi) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { + return m.bs.Has(c) +} + +func (m mockApi) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { + blk, err := m.bs.Get(c) + if err != nil { + return nil, xerrors.Errorf("blockstore get: %w", err) + } + + return blk.RawData(), nil +} + +func (m mockApi) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { + return m.ts[tsk], nil +} + +func (m mockApi) setActor(tsk types.TipSetKey, act *types.Actor) { + m.ts[tsk] = act +} + +func TestPredicates(t *testing.T) { + ctx := context.Background() + + bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + store := cbornode.NewCborStore(bs) + + oldDeals := map[abi.DealID]*market.DealState{ + abi.DealID(1): { + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + SlashEpoch: 0, + }, + abi.DealID(2): { + SectorStartEpoch: 4, + LastUpdatedEpoch: 5, + SlashEpoch: 0, + }, + } + oldStateC := createMarketState(t, store, oldDeals, ctx) + + newDeals := map[abi.DealID]*market.DealState{ + abi.DealID(1): { + SectorStartEpoch: 1, + LastUpdatedEpoch: 3, + SlashEpoch: 0, + }, + abi.DealID(2): { + SectorStartEpoch: 4, + LastUpdatedEpoch: 6, + SlashEpoch: 6, + }, + } + newStateC := createMarketState(t, store, newDeals, ctx) + + miner, err := address.NewFromString("t00") + require.NoError(t, err) + oldState, err := mockTipset(miner, 1) + require.NoError(t, err) + newState, err := mockTipset(miner, 2) + require.NoError(t, err) + + api := newMockApi(bs) + api.setActor(oldState.Key(), &types.Actor{Head: oldStateC}) + api.setActor(newState.Key(), &types.Actor{Head: newStateC}) + + preds := NewStatePredicates(api) + + dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)} + diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) + + // Diff a state against itself: expect no change + changed, _, err := diffFn(ctx, oldState, oldState) + require.NoError(t, err) + require.False(t, changed) + + // Diff old state against new state + changed, val, err := diffFn(ctx, oldState, newState) + require.NoError(t, err) + require.True(t, changed) + + changedDeals, ok := val.(ChangedDeals) + require.True(t, ok) + require.Len(t, changedDeals, 2) + require.Contains(t, changedDeals, abi.DealID(1)) + require.Contains(t, changedDeals, abi.DealID(2)) + deal1 := changedDeals[abi.DealID(1)] + if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 { + t.Fatal("Unexpected change to LastUpdatedEpoch") + } + deal2 := changedDeals[abi.DealID(2)] + if deal2.From.SlashEpoch != 0 || deal2.To.SlashEpoch != 6 { + t.Fatal("Unexpected change to LastUpdatedEpoch") + } +} + +func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) { + return types.NewTipSet([]*types.BlockHeader{{ + Miner: miner, + Height: 5, + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + Timestamp: timestamp, + }}) +} + +func createMarketState(t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState, ctx context.Context) cid.Cid { + rootCid := createAMT(t, store, deals, ctx) + + emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO()) + require.NoError(t, err) + emptyMap, err := store.Put(context.TODO(), hamt.NewNode(store, hamt.UseTreeBitWidth(5))) + require.NoError(t, err) + state := market.ConstructState(emptyArrayCid, emptyMap, emptyMap) + state.States = rootCid + + stateC, err := store.Put(ctx, state) + require.NoError(t, err) + return stateC +} + +func createAMT(t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState, ctx context.Context) cid.Cid { + root := amt.NewAMT(store) + for dealId, dealState := range deals { + err := root.Set(ctx, uint64(dealId), dealState) + require.NoError(t, err) + } + rootCid, err := root.Flush(ctx) + require.NoError(t, err) + return rootCid +} From a1b009328dea6ed79797d3b88adce292a2943c1b Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 26 Jun 2020 14:43:46 -0400 Subject: [PATCH 18/25] refactor: simplify diff --- chain/events/{events_hc.go => events_called.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename chain/events/{events_hc.go => events_called.go} (100%) diff --git a/chain/events/events_hc.go b/chain/events/events_called.go similarity index 100% rename from chain/events/events_hc.go rename to chain/events/events_called.go From db1773d70804bc4ec5c14fe36a4ac359bfba12f7 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 26 Jun 2020 14:59:23 -0400 Subject: [PATCH 19/25] docs: add predicate docs --- chain/events/events_called.go | 2 +- chain/events/state/predicates.go | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 01f5fdfcc..1306b26ec 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -306,7 +306,7 @@ func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHa e.lk.Lock() defer e.lk.Unlock() - // Check if the even has already occurred + // Check if the event has already occurred ts := e.tsc.best() done, more, err := check(ts) if err != nil { diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 014a7ecc3..21d43adab 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -14,13 +14,16 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/market" ) +// Data returned from the DiffFunc type UserData interface{} +// The calls made by this class external APIs type ChainApi interface { apibstore.ChainIO StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) } +// Use StatePredicates to respond to state changes type StatePredicates struct { api ChainApi cst *cbor.BasicIpldStore @@ -33,10 +36,15 @@ func NewStatePredicates(api ChainApi) *StatePredicates { } } +// Check if there's a change form oldState to newState, and return +// - changed: was there a change +// - user: user-defined data representing the state change +// - err type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) +// Calls diffStateFunc when the state changes for the given actor func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc { return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) { oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key()) @@ -53,6 +61,7 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) +// Calls diffStorageMarketState when the state changes for the market actor func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { var oldState market.State @@ -69,6 +78,7 @@ func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState Di type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) +// Calls diffDealStates when the market state changes func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { if oldState.States.Equals(newState.States) { @@ -88,13 +98,16 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) } } +// A set of changes to deal state type ChangedDeals map[abi.DealID]DealStateChange +// Change in deal state from -> to type DealStateChange struct { From market.DealState To market.DealState } +// Detect changes in the deal state AMT for the given deal IDs func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { changedDeals := make(ChangedDeals) From bbb9a9cd1aa9e83a5a5ead62f9004ff79dd8b89c Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 26 Jun 2020 15:36:48 -0400 Subject: [PATCH 20/25] refactor: lint fixes --- chain/events/events.go | 2 +- chain/events/events_called.go | 24 +++++++------- chain/events/events_test.go | 2 +- chain/events/state/predicates.go | 46 +++++++++++++++------------ chain/events/state/predicates_test.go | 33 +++++++++---------- 5 files changed, 56 insertions(+), 51 deletions(-) diff --git a/chain/events/events.go b/chain/events/events.go index 408c8845e..c1ef07a4e 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -74,7 +74,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { htHeights: map[abi.ChainEpoch][]uint64{}, }, - hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), + hcEvents: *newHCEvents(ctx, api, tsc, uint64(gcConfidence)), } e.ready.Add(1) diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 1306b26ec..3d8e05c02 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -93,7 +93,7 @@ type hcEvents struct { watcherEvents } -func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) hcEvents { +func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) *hcEvents { e := hcEvents{ ctx: ctx, cs: cs, @@ -109,7 +109,7 @@ func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidenc e.messageEvents = newMessageEvents(ctx, &e, cs) e.watcherEvents = newWatcherEvents(ctx, &e, cs) - return e + return &e } // Called when there is a change to the head with tipsets to be @@ -297,7 +297,7 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) { } // Listen for an event -// - CheckFunc: immediately checks if the event already occured +// - CheckFunc: immediately checks if the event already occurred // - EventHandler: called when the event has occurred, after confidence tipsets // - RevertHandler: called if the chain head changes causing the event to revert // - confidence: wait this many tipsets before calling EventHandler @@ -351,17 +351,17 @@ type headChangeAPI interface { type watcherEvents struct { ctx context.Context cs eventAPI - hcApi headChangeAPI + hcAPI headChangeAPI lk sync.RWMutex matchers map[triggerID]StateMatchFunc } -func newWatcherEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) watcherEvents { +func newWatcherEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) watcherEvents { return watcherEvents{ ctx: ctx, cs: cs, - hcApi: hcApi, + hcAPI: hcAPI, matchers: make(map[triggerID]StateMatchFunc), } } @@ -387,7 +387,7 @@ func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map return res } -// A change in state +// StateChange represents a change in state type StateChange interface{} // StateChangeHandler arguments: @@ -437,7 +437,7 @@ func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, return scHnd(prevTs, ts, states, height) } - id, err := we.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout) + id, err := we.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout) if err != nil { return err } @@ -453,17 +453,17 @@ func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, type messageEvents struct { ctx context.Context cs eventAPI - hcApi headChangeAPI + hcAPI headChangeAPI lk sync.RWMutex matchers map[triggerID][]MsgMatchFunc } -func newMessageEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) messageEvents { +func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) messageEvents { return messageEvents{ ctx: ctx, cs: cs, - hcApi: hcApi, + hcAPI: hcAPI, matchers: map[triggerID][]MsgMatchFunc{}, } } @@ -592,7 +592,7 @@ func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHa return msgHnd(msg, rec, ts, height) } - id, err := me.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout) + id, err := me.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout) if err != nil { return err } diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 2bb5b0916..5798fb75c 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1068,7 +1068,7 @@ func TestRemoveTriggersOnMessage(t *testing.T) { type testStateChange struct { from string - to string + to string } func TestStateChanged(t *testing.T) { diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 21d43adab..3245d5c03 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -2,41 +2,41 @@ package state import ( "context" - cbor "github.com/ipfs/go-ipld-cbor" - "github.com/ipfs/go-cid" - "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" ) -// Data returned from the DiffFunc +// UserData is the data returned from the DiffFunc type UserData interface{} -// The calls made by this class external APIs -type ChainApi interface { +// ChainAPI abstracts out calls made by this class to external APIs +type ChainAPI interface { apibstore.ChainIO StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) } -// Use StatePredicates to respond to state changes +// StatePredicates has common predicates for responding to state changes type StatePredicates struct { - api ChainApi + api ChainAPI cst *cbor.BasicIpldStore } -func NewStatePredicates(api ChainApi) *StatePredicates { +func NewStatePredicates(api ChainAPI) *StatePredicates { return &StatePredicates{ api: api, cst: cbor.NewCborStore(apibstore.NewAPIBlockstore(api)), } } -// Check if there's a change form oldState to newState, and return +// DiffFunc check if there's a change form oldState to newState, and returns // - changed: was there a change // - user: user-defined data representing the state change // - err @@ -44,7 +44,7 @@ type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (chang type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) -// Calls diffStateFunc when the state changes for the given actor +// OnActorStateChanged calls diffStateFunc when the state changes for the given actor func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc { return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) { oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key()) @@ -52,6 +52,10 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu return false, nil, err } newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key()) + if err != nil { + return false, nil, err + } + if oldActor.Head.Equals(newActor.Head) { return false, nil, nil } @@ -61,7 +65,7 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) -// Calls diffStorageMarketState when the state changes for the market actor +// OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { var oldState market.State @@ -78,7 +82,7 @@ func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState Di type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) -// Calls diffDealStates when the market state changes +// OnDealStateChanged calls diffDealStates when the market state changes func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { if oldState.States.Equals(newState.States) { @@ -98,31 +102,31 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) } } -// A set of changes to deal state +// ChangedDeals is a set of changes to deal state type ChangedDeals map[abi.DealID]DealStateChange -// Change in deal state from -> to +// DealStateChange is a change in deal state from -> to type DealStateChange struct { From market.DealState - To market.DealState + To market.DealState } -// Detect changes in the deal state AMT for the given deal IDs +// DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { changedDeals := make(ChangedDeals) - for _, dealId := range dealIds { + for _, dealID := range dealIds { var oldDeal, newDeal market.DealState - err := oldDealStateRoot.Get(ctx, uint64(dealId), &oldDeal) + err := oldDealStateRoot.Get(ctx, uint64(dealID), &oldDeal) if err != nil { return false, nil, err } - err = newDealStateRoot.Get(ctx, uint64(dealId), &newDeal) + err = newDealStateRoot.Get(ctx, uint64(dealID), &newDeal) if err != nil { return false, nil, err } if oldDeal != newDeal { - changedDeals[dealId] = DealStateChange{oldDeal, newDeal} + changedDeals[dealID] = DealStateChange{oldDeal, newDeal} } } if len(changedDeals) > 0 { diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index 190f5bd2a..1c10209a8 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -2,9 +2,10 @@ package state import ( "context" + "testing" + "github.com/filecoin-project/specs-actors/actors/crypto" "github.com/ipfs/go-hamt-ipld" - "testing" "github.com/filecoin-project/go-amt-ipld/v2" "github.com/filecoin-project/specs-actors/actors/builtin/market" @@ -28,23 +29,23 @@ func init() { dummyCid, _ = cid.Parse("bafkqaaa") } -type mockApi struct { +type mockAPI struct { ts map[types.TipSetKey]*types.Actor bs bstore.Blockstore } -func newMockApi(bs bstore.Blockstore) *mockApi { - return &mockApi{ +func newMockAPI(bs bstore.Blockstore) *mockAPI { + return &mockAPI{ bs: bs, ts: make(map[types.TipSetKey]*types.Actor), } } -func (m mockApi) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { +func (m mockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { return m.bs.Has(c) } -func (m mockApi) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { +func (m mockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { blk, err := m.bs.Get(c) if err != nil { return nil, xerrors.Errorf("blockstore get: %w", err) @@ -53,11 +54,11 @@ func (m mockApi) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { return blk.RawData(), nil } -func (m mockApi) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { +func (m mockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { return m.ts[tsk], nil } -func (m mockApi) setActor(tsk types.TipSetKey, act *types.Actor) { +func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) { m.ts[tsk] = act } @@ -79,7 +80,7 @@ func TestPredicates(t *testing.T) { SlashEpoch: 0, }, } - oldStateC := createMarketState(t, store, oldDeals, ctx) + oldStateC := createMarketState(ctx, t, store, oldDeals) newDeals := map[abi.DealID]*market.DealState{ abi.DealID(1): { @@ -93,7 +94,7 @@ func TestPredicates(t *testing.T) { SlashEpoch: 6, }, } - newStateC := createMarketState(t, store, newDeals, ctx) + newStateC := createMarketState(ctx, t, store, newDeals) miner, err := address.NewFromString("t00") require.NoError(t, err) @@ -102,7 +103,7 @@ func TestPredicates(t *testing.T) { newState, err := mockTipset(miner, 2) require.NoError(t, err) - api := newMockApi(bs) + api := newMockAPI(bs) api.setActor(oldState.Key(), &types.Actor{Head: oldStateC}) api.setActor(newState.Key(), &types.Actor{Head: newStateC}) @@ -149,8 +150,8 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) }}) } -func createMarketState(t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState, ctx context.Context) cid.Cid { - rootCid := createAMT(t, store, deals, ctx) +func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { + rootCid := createAMT(ctx, t, store, deals) emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO()) require.NoError(t, err) @@ -164,10 +165,10 @@ func createMarketState(t *testing.T, store *cbornode.BasicIpldStore, deals map[a return stateC } -func createAMT(t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState, ctx context.Context) cid.Cid { +func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { root := amt.NewAMT(store) - for dealId, dealState := range deals { - err := root.Set(ctx, uint64(dealId), dealState) + for dealID, dealState := range deals { + err := root.Set(ctx, uint64(dealID), dealState) require.NoError(t, err) } rootCid, err := root.Flush(ctx) From 659c723ea31d72aad34ae8723cc39afec170e21d Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 26 Jun 2020 15:42:44 -0400 Subject: [PATCH 21/25] fix: embed hcEvents into events as pointer so as not to copy lock --- chain/events/events.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/events/events.go b/chain/events/events.go index c1ef07a4e..e11507795 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -51,7 +51,7 @@ type Events struct { readyOnce sync.Once heightEvents - hcEvents + *hcEvents } func NewEvents(ctx context.Context, api eventAPI) *Events { @@ -74,7 +74,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { htHeights: map[abi.ChainEpoch][]uint64{}, }, - hcEvents: *newHCEvents(ctx, api, tsc, uint64(gcConfidence)), + hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), } e.ready.Add(1) From fbfe57cd633e47127fb7a05512547f25649bd9cd Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Fri, 26 Jun 2020 20:13:14 -0300 Subject: [PATCH 22/25] doc: block validations (#1919) Co-authored-by: Jakub Sztandera --- chain/types/message.go | 1 + documentation/en/block-validation.md | 137 +++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 documentation/en/block-validation.md diff --git a/chain/types/message.go b/chain/types/message.go index 68844d63c..b0d7f885f 100644 --- a/chain/types/message.go +++ b/chain/types/message.go @@ -21,6 +21,7 @@ type ChainMsg interface { Cid() cid.Cid VMMessage() *Message ToStorageBlock() (block.Block, error) + // FIXME: This is the *message* length, this name is misleading. ChainLength() int } diff --git a/documentation/en/block-validation.md b/documentation/en/block-validation.md new file mode 100644 index 000000000..a5ee49c30 --- /dev/null +++ b/documentation/en/block-validation.md @@ -0,0 +1,137 @@ +# Incoming block validations + +This document reviews the code flow that takes place inside the full node after receiving a new block from the GossipSub `/fil/blocks` topic and traces all of its protocol-related validation logic. We do not include validation logic *inside* the VM, the analysis stops at `(*VM).Invoke()`. The `V:` tag explicitly signals validations throughout the text. + +## `modules.HandleIncomingBlocks()` + +We subscribe to the `/fil/blocks` PubSub topic to receive external blocks from peers in the network and register a block validator that operates at the PubSub (`libp2p` stack) level, validating each PubSub message containing a Filecoin block header. + +`V:` PubSub message is a valid CBOR `BlockMsg`. + +`V:` Total messages in block are under `BlockMessageLimit`. + +`V:` Aggregate message CIDs, encapsulated in the `MsgMeta` structure, serialize to the `Messages` CID in the block header (`ValidateMsgMeta()`). + +`V:` Miner `Address` in block header is present and corresponds to a public-key address in the current chain state. + +`V:` Block signature (`BlockSig`) is present and belongs to the public-key address retrieved for the miner (`CheckBlockSignature()`). + +## `sub.HandleIncomingBlocks()` + +Assemble a `FullBlock` from the received block header retrieving its Filecoin messages. + +`V:` Block messages CIDs can be retrieved from the network and decode into valid CBOR `Message`/`SignedMessage`. + +## `(*Syncer).InformNewHead()` + +Assemble a `FullTipSet` populated with the single block received earlier. + +`V:` `ValidateMsgMeta()` (already done in the topic validator). + +`V:` Block's `ParentWeight` is greater than the one from the (first block of the) heaviest tipset. + +## `(*Syncer).Sync()` + +`(*Syncer).collectHeaders()`: we retrieve all tipsets from the received block down to our chain. Validation now is expanded to *every* block inside these tipsets. + +`V`: Beacon entires are ordered by their round number. + +`V:` Tipset `Parents` CIDs match the fetched parent tipset through block sync. (This check is not enforced correctly at the moment, see [issue](https://github.com/filecoin-project/lotus/issues/1918).) + +## `(*Syncer).ValidateBlock()` + +This function contains most of the validation logic grouped in separate closures that run asynchronously, this list does not reflect validation order then. + +`V:` Block `Timestamp`: + * Is not bigger than current time plus `AllowableClockDrift`. + * Is not smaller than previous block's `Timestamp` plus `BlockDelay` (including null blocks). + +### Messages + +We check all the messages contained in one block at a time (`(*Syncer).checkBlockMessages()`). + +`V:` The block's `BLSAggregate` matches the aggregate of BLS messages digests and public keys (extracted from the messages `From`). + +`V:` Each `secp256k1` message `Signature` is signed with the public key extracted from the message `From`. + +`V:` Aggregate message CIDs, encapsulated in the `MsgMeta` structure, serialize to the `Messages` CID in block header (similar to `ValidateMsgMeta()` call). + +`V:` For each message, in `ValidForBlockInclusion()`: +* Message fields `Version`, `To`, `From`, `Value`, `GasPrice`, and `GasLimit` are correctly defined. +* Message `GasLimit` is under the message minimum gas cost (derived from chain height and message length). + +`V:` Actor associated with message `From` exists and is an account actor, its `Nonce` matches the message `Nonce`. + +### Miner + +`V:` Miner address is registered in the `Claims` HAMT of the Power actor. + +### Compute parent tipset state + +`V:` Block's `ParentStateRoot` CID matches the state CID computed from the parent tipset. + +`V:` Block's `ParentMessageReceipts` CID matches receipts CID computed from the parent tipset. + +### Winner + +Draw randomness for current epoch with minimum ticket from previous tipset, using `ElectionProofProduction` +domain separation tag. +`V`: `ElectionProof.VRFProof` is computed correctly by checking BLS signature using miner's key. +`V`: Miner is not slashed in `StoragePowerActor`. +`V`: Check if ticket is a winning ticket: +``` +h := blake2b(VRFProof) +lhs := AsInt(h) * totalNetworkPower +rhs := minerPower * 2^256 +if lhs < rhs { return "Winner" } else { return "Not a winner" } +``` + +### Block signature + +`V:` `CheckBlockSignature()` (same signature validation as the one applied to the incoming block). + +### Beacon values check + +`V`: Validate that all `BeaconEntries` are valid. Check that every one of them is a signature of a message: `previousSignature || round` signed using drand's public key. +`V`: All entries between `MaxBeaconRoundForEpoch` down to `prevEntry` (from previous tipset) are included. + +### Verify VRF Ticket chain + +Draw randomness for current epoch with minimum ticket from previous tipset, using `TicketProduction` +domain separation tag. +`V`: `VerifyVRF` using drawn randomness and miner public key. + +### Winning PoSt proof + +Draw randomness for current epoch with `WinningPoSt` domain separation tag. +Get list of sectors challanged in this epoch for this miner, based on the randomness drawn. + +`V`: Use filecoin proofs system to verify that miner prooved access to sealed versions of these sectors. + +## `(*StateManager).TipSetState()` + +Called throughout the validation process for the parent of each tipset being validated. The checks here then do not apply to the received new head itself that started the validation process. + +### `(*StateManager).computeTipSetState()` + +`V:` Every block in the tipset should belong to different a miner. + +### `(*StateManager).ApplyBlocks()` + +We create a new VM with the tipset's `ParentStateRoot` (this is then the parent state of the parent of the tipset currently being validated) on which to apply all messages from all blocks in the tipset. For each message independently we apply the validations listed next. + +### `(*VM).ApplyMessage()` + +`V:` Basic gas and value checks in `checkMessage()`: +* Message `GasLimit` is bigger than zero. +* Message `GasPrice` and `Value` are set. + +`V:` Message storage gas cost is under the message's `GasLimit`. + +`V:` Message's `Nonce` matches nonce in actor retrieved from message's `From`. + +`V:` Message's maximum gas cost (derived from its `GasLimit`, `GasPrice`, and `Value`) is under the balance of the actor retrieved from message's `From`. + +### `(*VM).send()` + +`V:` Message's transfer `Value` is under the balance in actor retrieved from message's `From`. From a4f6269724cee12da578a01f4fd0a659cd3e65f0 Mon Sep 17 00:00:00 2001 From: jackoelv Date: Sat, 27 Jun 2020 23:16:24 +0800 Subject: [PATCH 23/25] Update mining.md lotus-storage-miner state power should be changed to lotus state power --- documentation/en/mining.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/documentation/en/mining.md b/documentation/en/mining.md index 80c81aedd..3b1f5a8a3 100644 --- a/documentation/en/mining.md +++ b/documentation/en/mining.md @@ -82,12 +82,12 @@ lotus-storage-miner sectors pledge Get **miner power** and **sector usage**: ```sh -lotus-storage-miner state power +lotus state power # returns total power -lotus-storage-miner state power +lotus state power -lotus-storage-miner state sectors +lotus state sectors ``` ## Performance tuning From 8d51a201433b2d60d3d680e2b1cfc2ee777bf090 Mon Sep 17 00:00:00 2001 From: waynewyang Date: Sun, 28 Jun 2020 18:20:56 +0800 Subject: [PATCH 24/25] fix: mining logic --- miner/miner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/miner/miner.go b/miner/miner.go index 1f5f8dad5..d42778e3b 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -140,7 +140,7 @@ func (m *Miner) mine(ctx context.Context) { onDone, err := m.waitFunc(ctx, prebase.TipSet.MinTimestamp()) if err != nil { log.Error(err) - return + continue } base, err := m.GetBestMiningCandidate(ctx) From 90a470ed78636e9dbf8362aa0e9da36c2f75634b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 29 Jun 2020 16:05:27 +0200 Subject: [PATCH 25/25] Update go-bitfield with fixed empty marshaling --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 7a438c5b0..c05ace73c 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/filecoin-project/filecoin-ffi v0.26.1-0.20200508175440-05b30afeb00d github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200424220931-6263827e49f2 - github.com/filecoin-project/go-bitfield v0.0.2-0.20200624234227-4563d4a0bc01 + github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38 github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.3.0 diff --git a/go.sum b/go.sum index 33dc25268..abcd893f9 100644 --- a/go.sum +++ b/go.sum @@ -225,8 +225,8 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mo github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60= github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= -github.com/filecoin-project/go-bitfield v0.0.2-0.20200624234227-4563d4a0bc01 h1:k/FyoahW7Pvqi8p8YF7Np8YcK1XWGJ8TlR1mEICly3E= -github.com/filecoin-project/go-bitfield v0.0.2-0.20200624234227-4563d4a0bc01/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= +github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38 h1:B2gUde2DlfCb5YMYNVems2orobxC3KhrX3migym1IOQ= +github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=