6f7498b622
* [WIP] feat: Add nv22 skeleton Addition of Network Version 22 skeleton * update FFI * feat: drand: refactor round verification * feat: sealing: Support nv22 DDO features in the sealing pipeline (#11226) * Initial work supporting DDO pieces in lotus-miner * sealing: Update pipeline input to operate on UniversalPiece * sealing: Update pipeline checks/sealing states to operate on UniversalPiece * sealing: Make pipeline build with UniversalPiece * move PieceDealInfo out of api * make gen * make sealing pipeline unit tests pass * fix itest ensemble build * don't panic in SectorsStatus with deals * stop linter from complaining about checkPieces * fix sector import tests * mod tidy * sealing: Add logic for (pre)committing DDO sectors * sealing: state-types with method defs * DDO non-snap pipeline works(?), DDO Itests * DDO support in snapdeals pipeline * make gen * update actor bundles * update the gst market fix * fix: chain: use PreCommitSectorsBatch2 when setting up genesis * some bug fixes * integration working changes * update actor bundles * Make TestOnboardRawPieceSnap pass * Appease the linter * Make deadlines test pass with v12 actors * Update go-state-types, abstract market DealState * make gen * mod tidy, lint fixes * Fix some more tests * Bump version in master Bump version in master * Make gen Make gen * fix sender * fix: lotus-provider: Fix winning PoSt * fix: sql Scan cannot write to an object * Actually show miner-addrs in info-log Actually show miner-addrs in lotus-provider info-log * [WIP] feat: Add nv22 skeleton Addition of Network Version 22 skeleton * update FFI * ddo is now nv22 * make gen * temp actor bundle with ddo * use working go-state-types * gst with v13 market migration * update bundle, builtin.MethodsMiner.ProveCommitSectors2 -> 3 * actually working v13 migration, v13 migration itest * Address review * sealing: Correct DDO snap pledge math * itests: Mixed ddo itest * pipeline: Fix sectorWeight * sealing: convert market deals into PAMs in mixed sectors * sealing: make market to ddo conversion work * fix lint * update gst * Update actors and GST to lastest integ branch * commit batcher: Update ProveCommitSectors3Params builder logic * make gen * use builtin-actors master * ddo: address review * itests: Add commd assertions to ddo tests * make gen * gst with fixed types * config knobs for RequireActivationSuccess * storage: Drop obsolete flaky tasts --------- Co-authored-by: Jennifer Wang <jiayingw703@gmail.com> Co-authored-by: Aayush <arajasek94@gmail.com> Co-authored-by: Shrenuj Bansal <shrenuj.bansal@protocol.ai> Co-authored-by: Phi <orjan.roren@gmail.com> Co-authored-by: Andrew Jackson (Ajax) <snadrus@gmail.com> Co-authored-by: TippyFlits <james.bluett@protocol.ai> * feat: implement FIP-0063 * chore: deps: update to go-multiaddr v0.12.2 (#11602) * feat: fvm: update the FVM/FFI to v4.1 (#11608) (#11612) This: 1. Adds nv22 support. 2. Updates the message tracing format. Co-authored-by: Steven Allen <steven@stebalien.com> * AggregateProofType nil when doing batch updates Use latest nv22 go-state-types version with matching update * Update to v13.0.0-rc.2 bundle * chore: Upgrade heights and codename Update upgrade heights Co-Authored-By: Steven Allen <steven@stebalien.com> * Update epoch after nv22 DRAND switch Update epoch after nv22 DRAND switch * Update Mango codename to Phoneix Make the codename for the Drand-change inline with Dragon style. * Add UpgradePhoenixHeight to API params * set UpgradePhoenixHeight to be one hour after Dragon * Make gen Make gen and UpgradePhoenixHeight in butterfly and local devnet to be in line with Calibration and Mainnet * Update epoch heights (#11637) Update epoch heights * new: add forest bootstrap nodes (#11636) Signed-off-by: samuelarogbonlo <sbayo971@gmail.com> * Merge pull request #11491 from filecoin-project/fix/remove-decommissioned-pl-bootstrap-nodes Remove PL operated bootstrap nodes from mainnet.pi * feat: api: new verified registry methods to get all allocations and claims (#11631) * new verireg methods * update changelog and add itest * update itest and cli * update new method's support till v9 * remove gateway APIs * fix cli internal var names * chore:: backport #11609 to the feat/nv22 branch (#11644) * feat: api: improve the correctness of Eth's trace_block (#11609) * Improve the correctness of Eth's trace_block - Improve encoding/decoding of parameters and return values: - Encode "native" parameters and return values with Solidity ABI. - Correctly decode parameters to "create" calls. - Use the correct (ish) output for "create" calls. - Handle all forms of "create". - Make robust with respect to reverts: - Use the actor ID/address from the trace instead of looking it up in the state-tree (may not exist in the state-tree due to a revert). - Gracefully handle failed actor/contract creation. - Improve performance: - We avoid looking anything up in the state-tree when translating the trace, which should significantly improve performance. - Improve code readability: - Remove all "backtracking" logic. - Use an "environment" struct to store temporary state instead of attaching it to the trace. - Fix random bugs: - Fix an allocation bug in the "address" logic (need to set the capacity before modifying the slice). - Improved error checking/handling. - Use correct types for `trace_block` action/results (create, call, etc.). - And use the correct types for Result/Action structs instead of reusing the same "Call" action every time. - Improve error messages. * Make gen Make gen --------- Co-authored-by: Steven Allen <steven@stebalien.com> * fix: add UpgradePhoenixHeight to StateGetNetworkParams (#11648) * chore: deps: update to go-state-types v13.0.0-rc.1 * do NOT update the cache when running the real migration * Merge pull request #11632 from hanabi1224/hm/drand-test feat: drand quicknet: allow scheduling drand quicknet upgrade before nv22 on 2k devnet * chore: deps: update to go-state-types v13.0.0-rc.2 chore: deps: update to go-state-types v13.0.0-rc.2 * feat: set migration config UpgradeEpoch for v13 actors upgrade * Built-in actor events first draft * itest for DDO non-market verified data w/ builtin actor events * Tests for builtin actor events API * Clean up DDO+Events tests, add lots of explainer comments * Minor tweaks to events types * Avoid duplicate messages when looking for receipts * Rename internal events modules for clarity * Adjust actor event API after review * s/ActorEvents/Events/g in global config * Manage event sending rate for SubscribeActorEvents * Terminate SubscribeActorEvents chan when at max height * Document future API changes * More clarity in actor event API docs * More post-review changes, lots of tests for SubscribeActorEvents Use BlockDelay as the window for receiving events on the SubscribeActorEvents channel. We expect the user to have received the initial batch of historical events (if any) in one block's time. For real-time events we expect them to not fall behind by roughly one block's time. * Remove duplicate code from actor event type marshalling tests Reduce verbosity and remove duplicate test logic from actor event types JSON marshalling tests. * Rename actor events test to follow go convention Add missing `s` to `actor_events` test file to follow golang convention used across the repo. * Run actor events table tests in deterministic order Refactor `map` usage for actor event table tests to ensure deterministic test execution order, making debugging potential issues easier. If non-determinism is a target, leverage Go's built-in parallel testing capabilities. * Reduce scope for filter removal failure when getting actor events Use a fresh context to remove the temporary filter installed solely to get the actor events. This should reduce chances of failure in a case where the original context may be expired/cancelled. Refactor removal into a `defer` statement for a more readable, concise return statement. * Use fixed RNG seed for actor event tests Improve determinism in actor event tests by using a fixed RNG seed. This makes up a more reproducible test suit. * Use provided libraries to assert eventual conditions Use the functionalities already provided by `testify` to assert eventual conditions, and remove the use of `time.Sleep`. Remove duplicate code in utility functions that are already defined. Refactor assertion helper functions to use consistent terminology: "require" implies fatal error, whereas "assert" implies error where the test may proceed executing. * Update changelog for actor events APIs * Fix concerns and docs identified by review * Update actor bundle to v13.0.0-rc3 Update actor bundle to v13.0.0-rc3 * Prep Lotus v1.26.0-rc1 - For sanity reverting the mainnet upgrade epoch to 99999999, and then only set it when cutting the final release -Update Calibnet CIDs to v13.0.0-rc3 - Add GetActorEvents, SubscribeActorEvents, GetAllClaims and GetAllAllocations methods to the changelog Co-Authored-By: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> * Update CHANGELOG.md Co-authored-by: Masih H. Derkani <m@derkani.org> * Make gen Make gen * fix: beacon: validate drand change at nv16 correctly * bump to v1.26.0-rc2 * test: cleanup ddo verified itest, extract steps to functions also add allocation-removed event case * test: extract verified DDO test to separate file, add more checks * test: add additional actor events checks * Add verification for "deal-activated" actor event * docs(drand): document the meaning of "IsChained" (#11692) * Resolve conflicts I encountered multiple issues when trying to run make gen. And these changes fixed a couple of them: - go mod tidy - Remove RaftState/RaftLeader - Revert `if ts.Height() > claim.TermMax+claim.TermStart || !cctx.IsSet("expired")` to the what is in the release/v1.26.0: `if tsHeight > val.TermMax || !expired` * fixup imports, make jen * Update version Update version in master to v1.27.0-dev * Update node/impl/full/dummy.go Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com> * Adjust ListClaimsCmd Adjust ListClaimsCmd according to review --------- Signed-off-by: samuelarogbonlo <sbayo971@gmail.com> Co-authored-by: TippyFlits <james.bluett@protocol.ai> Co-authored-by: Aayush <arajasek94@gmail.com> Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com> Co-authored-by: Jennifer Wang <jiayingw703@gmail.com> Co-authored-by: Shrenuj Bansal <shrenuj.bansal@protocol.ai> Co-authored-by: Andrew Jackson (Ajax) <snadrus@gmail.com> Co-authored-by: Steven Allen <steven@stebalien.com> Co-authored-by: Rod Vagg <rod@vagg.org> Co-authored-by: Samuel Arogbonlo <47984109+samuelarogbonlo@users.noreply.github.com> Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com> Co-authored-by: tom123222 <160735201+tom123222@users.noreply.github.com> Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Masih H. Derkani <m@derkani.org> Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com>
571 lines
17 KiB
Go
571 lines
17 KiB
Go
package events
|
|
|
|
import (
|
|
"context"
|
|
"math"
|
|
"sync"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
const NoTimeout = math.MaxInt64
|
|
const NoHeight = abi.ChainEpoch(-1)
|
|
|
|
type triggerID = uint64
|
|
|
|
// msgH is the block height at which a message was present / event has happened
|
|
type msgH = abi.ChainEpoch
|
|
|
|
// triggerH is the block height at which the listener will be notified about the
|
|
//
|
|
// message (msgH+confidence)
|
|
type triggerH = abi.ChainEpoch
|
|
|
|
type eventData interface{}
|
|
|
|
// EventHandler arguments:
|
|
// `prevTs` is the previous tipset, eg the "from" tipset for a state change.
|
|
// `ts` is the event tipset, eg the tipset in which the `msg` is included.
|
|
// `curH`-`ts.Height` = `confidence`
|
|
type EventHandler func(ctx context.Context, data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
|
|
|
|
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
|
|
// wait for has already happened in tipset `ts`
|
|
//
|
|
// If `done` is true, timeout won't be triggered
|
|
// If `more` is false, no messages will be sent to EventHandler (RevertHandler
|
|
//
|
|
// may still be called)
|
|
type CheckFunc func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error)
|
|
|
|
// Keep track of information for an event handler
|
|
type handlerInfo struct {
|
|
confidence int
|
|
timeout abi.ChainEpoch
|
|
|
|
disabled bool // TODO: GC after gcConfidence reached
|
|
|
|
handle EventHandler
|
|
revert RevertHandler
|
|
}
|
|
|
|
// When a change occurs, a queuedEvent is created and put into a queue
|
|
// until the required confidence is reached
|
|
type queuedEvent struct {
|
|
trigger triggerID
|
|
data eventData
|
|
|
|
prevTipset, tipset *types.TipSet
|
|
|
|
called bool
|
|
}
|
|
|
|
// Manages chain head change events, which may be forward (new tipset added to
|
|
// chain) or backward (chain branch discarded in favour of heavier branch)
|
|
type hcEvents struct {
|
|
cs EventHelperAPI
|
|
|
|
lk sync.Mutex
|
|
lastTs *types.TipSet
|
|
|
|
ctr triggerID
|
|
|
|
// TODO: get rid of trigger IDs and just use pointers as keys.
|
|
triggers map[triggerID]*handlerInfo
|
|
|
|
// TODO: instead of scheduling events in the future, look at the chain in the past. We can sip the "confidence" queue entirely.
|
|
// maps block heights to events
|
|
// [triggerH][msgH][event]
|
|
confQueue map[triggerH]map[msgH][]*queuedEvent
|
|
|
|
// [msgH][triggerH]
|
|
revertQueue map[msgH][]triggerH
|
|
|
|
// [timeoutH+confidence][triggerID]{calls}
|
|
timeouts map[abi.ChainEpoch]map[triggerID]int
|
|
|
|
messageEvents
|
|
watcherEvents
|
|
}
|
|
|
|
func newHCEvents(api EventHelperAPI, obs *observer) *hcEvents {
|
|
e := &hcEvents{
|
|
cs: api,
|
|
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
|
|
revertQueue: map[msgH][]triggerH{},
|
|
triggers: map[triggerID]*handlerInfo{},
|
|
timeouts: map[abi.ChainEpoch]map[triggerID]int{},
|
|
}
|
|
|
|
e.messageEvents = newMessageEvents(e, api)
|
|
e.watcherEvents = newWatcherEvents(e, api)
|
|
|
|
// We need to take the lock as the observer could immediately try calling us.
|
|
e.lk.Lock()
|
|
e.lastTs = obs.Observe((*hcEventsObserver)(e))
|
|
e.lk.Unlock()
|
|
|
|
return e
|
|
}
|
|
|
|
type hcEventsObserver hcEvents
|
|
|
|
func (e *hcEventsObserver) Apply(ctx context.Context, from, to *types.TipSet) error {
|
|
e.lk.Lock()
|
|
defer e.lk.Unlock()
|
|
|
|
defer func() { e.lastTs = to }()
|
|
|
|
// Check if the head change caused any state changes that we were
|
|
// waiting for
|
|
stateChanges := e.checkStateChanges(from, to)
|
|
|
|
// Queue up calls until there have been enough blocks to reach
|
|
// confidence on the state changes
|
|
for tid, data := range stateChanges {
|
|
e.queueForConfidence(tid, data, from, to)
|
|
}
|
|
|
|
// Check if the head change included any new message calls
|
|
newCalls := e.checkNewCalls(ctx, from, to)
|
|
|
|
// Queue up calls until there have been enough blocks to reach
|
|
// confidence on the message calls
|
|
for tid, calls := range newCalls {
|
|
for _, data := range calls {
|
|
e.queueForConfidence(tid, data, nil, to)
|
|
}
|
|
}
|
|
|
|
for at := from.Height() + 1; at <= to.Height(); at++ {
|
|
// Apply any queued events and timeouts that were targeted at the
|
|
// current chain height
|
|
e.applyWithConfidence(ctx, at)
|
|
e.applyTimeouts(ctx, at, to)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *hcEventsObserver) Revert(ctx context.Context, from, to *types.TipSet) error {
|
|
e.lk.Lock()
|
|
defer e.lk.Unlock()
|
|
|
|
defer func() { e.lastTs = to }()
|
|
|
|
reverts, ok := e.revertQueue[from.Height()]
|
|
if !ok {
|
|
return nil // nothing to do
|
|
}
|
|
|
|
for _, triggerH := range reverts {
|
|
toRevert := e.confQueue[triggerH][from.Height()]
|
|
for _, event := range toRevert {
|
|
if !event.called {
|
|
continue // event wasn't apply()-ied yet
|
|
}
|
|
|
|
trigger := e.triggers[event.trigger]
|
|
|
|
if err := trigger.revert(ctx, from); err != nil {
|
|
log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", from.Height(), triggerH, err)
|
|
}
|
|
}
|
|
delete(e.confQueue[triggerH], from.Height())
|
|
}
|
|
delete(e.revertQueue, from.Height())
|
|
return nil
|
|
}
|
|
|
|
// Queue up events until the chain has reached a height that reflects the
|
|
// desired confidence
|
|
func (e *hcEventsObserver) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) {
|
|
trigger := e.triggers[trigID]
|
|
|
|
appliedH := ts.Height()
|
|
|
|
triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
|
|
|
|
byOrigH, ok := e.confQueue[triggerH]
|
|
if !ok {
|
|
byOrigH = map[abi.ChainEpoch][]*queuedEvent{}
|
|
e.confQueue[triggerH] = byOrigH
|
|
}
|
|
|
|
byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
|
|
trigger: trigID,
|
|
data: data,
|
|
tipset: ts,
|
|
prevTipset: prevTs,
|
|
})
|
|
|
|
e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH)
|
|
}
|
|
|
|
// Apply any events that were waiting for this chain height for confidence
|
|
func (e *hcEventsObserver) applyWithConfidence(ctx context.Context, height abi.ChainEpoch) {
|
|
byOrigH, ok := e.confQueue[height]
|
|
if !ok {
|
|
return // no triggers at this height
|
|
}
|
|
|
|
for origH, events := range byOrigH {
|
|
for _, event := range events {
|
|
if event.called {
|
|
continue
|
|
}
|
|
|
|
trigger := e.triggers[event.trigger]
|
|
if trigger.disabled {
|
|
continue
|
|
}
|
|
|
|
more, err := trigger.handle(ctx, event.data, event.prevTipset, event.tipset, height)
|
|
if err != nil {
|
|
log.Errorf("chain trigger (@H %d, triggered @ %d) failed: %s", origH, height, err)
|
|
continue // don't revert failed calls
|
|
}
|
|
|
|
event.called = true
|
|
|
|
touts, ok := e.timeouts[trigger.timeout]
|
|
if ok {
|
|
touts[event.trigger]++
|
|
}
|
|
|
|
trigger.disabled = !more
|
|
}
|
|
}
|
|
}
|
|
|
|
// Apply any timeouts that expire at this height
|
|
func (e *hcEventsObserver) applyTimeouts(ctx context.Context, at abi.ChainEpoch, ts *types.TipSet) {
|
|
triggers, ok := e.timeouts[at]
|
|
if !ok {
|
|
return // nothing to do
|
|
}
|
|
|
|
for triggerID, calls := range triggers {
|
|
if calls > 0 {
|
|
continue // don't timeout if the method was called
|
|
}
|
|
trigger := e.triggers[triggerID]
|
|
if trigger.disabled {
|
|
continue
|
|
}
|
|
|
|
// This should be cached.
|
|
timeoutTs, err := e.cs.ChainGetTipSetAfterHeight(ctx, at-abi.ChainEpoch(trigger.confidence), ts.Key())
|
|
if err != nil {
|
|
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", at-abi.ChainEpoch(trigger.confidence), at)
|
|
}
|
|
|
|
more, err := trigger.handle(ctx, nil, nil, timeoutTs, at)
|
|
if err != nil {
|
|
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), at, err)
|
|
continue // don't revert failed calls
|
|
}
|
|
|
|
trigger.disabled = !more // allows messages after timeout
|
|
}
|
|
}
|
|
|
|
// Listen for an event
|
|
// - CheckFunc: immediately checks if the event already occurred
|
|
// - EventHandler: called when the event has occurred, after confidence tipsets
|
|
// - RevertHandler: called if the chain head changes causing the event to revert
|
|
// - confidence: wait this many tipsets before calling EventHandler
|
|
// - timeout: at this chain height, timeout on waiting for this event
|
|
func (e *hcEvents) onHeadChanged(ctx context.Context, check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) {
|
|
e.lk.Lock()
|
|
defer e.lk.Unlock()
|
|
|
|
// Check if the event has already occurred
|
|
done, more, err := check(ctx, e.lastTs)
|
|
if err != nil {
|
|
return 0, xerrors.Errorf("called check error (h: %d): %w", e.lastTs.Height(), err)
|
|
}
|
|
if done {
|
|
timeout = NoTimeout
|
|
}
|
|
|
|
id := e.ctr
|
|
e.ctr++
|
|
|
|
e.triggers[id] = &handlerInfo{
|
|
confidence: confidence,
|
|
timeout: timeout + abi.ChainEpoch(confidence),
|
|
|
|
disabled: !more,
|
|
|
|
handle: hnd,
|
|
revert: rev,
|
|
}
|
|
|
|
// If there's a timeout, set up a timeout check at that height
|
|
if timeout != NoTimeout {
|
|
if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil {
|
|
e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{}
|
|
}
|
|
e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// headChangeAPI is used to allow the composed event APIs to call back to hcEvents
|
|
// to listen for changes
|
|
type headChangeAPI interface {
|
|
onHeadChanged(ctx context.Context, check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error)
|
|
}
|
|
|
|
// watcherEvents watches for a state change
|
|
type watcherEvents struct {
|
|
cs EventHelperAPI
|
|
hcAPI headChangeAPI
|
|
|
|
lk sync.RWMutex
|
|
matchers map[triggerID]StateMatchFunc
|
|
}
|
|
|
|
func newWatcherEvents(hcAPI headChangeAPI, cs EventHelperAPI) watcherEvents {
|
|
return watcherEvents{
|
|
cs: cs,
|
|
hcAPI: hcAPI,
|
|
matchers: make(map[triggerID]StateMatchFunc),
|
|
}
|
|
}
|
|
|
|
// Run each of the matchers against the previous and current state to see if
|
|
// there's a change
|
|
func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map[triggerID]eventData {
|
|
we.lk.RLock()
|
|
defer we.lk.RUnlock()
|
|
|
|
res := make(map[triggerID]eventData)
|
|
for tid, matchFn := range we.matchers {
|
|
ok, data, err := matchFn(oldState, newState)
|
|
if err != nil {
|
|
log.Errorf("event diff fn failed: %s", err)
|
|
continue
|
|
}
|
|
|
|
if ok {
|
|
res[tid] = data
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
// StateChange represents a change in state
|
|
type StateChange interface{}
|
|
|
|
// StateChangeHandler arguments:
|
|
// `oldTs` is the state "from" tipset
|
|
// `newTs` is the state "to" tipset
|
|
// `states` is the change in state
|
|
// `curH`-`ts.Height` = `confidence`
|
|
type StateChangeHandler func(oldTs, newTs *types.TipSet, states StateChange, curH abi.ChainEpoch) (more bool, err error)
|
|
|
|
type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error)
|
|
|
|
// StateChanged registers a callback which is triggered when a specified state
|
|
// change occurs or a timeout is reached.
|
|
//
|
|
// - `CheckFunc` callback is invoked immediately with a recent tipset, it
|
|
// returns two booleans - `done`, and `more`.
|
|
//
|
|
// - `done` should be true when some on-chain state change we are waiting
|
|
// for has happened. When `done` is set to true, timeout trigger is disabled.
|
|
//
|
|
// - `more` should be false when we don't want to receive new notifications
|
|
// through StateChangeHandler. Note that notifications may still be delivered to
|
|
// RevertHandler
|
|
//
|
|
// - `StateChangeHandler` is called when the specified state change was observed
|
|
// on-chain, and a confidence threshold was reached, or the specified `timeout`
|
|
// height was reached with no state change observed. When this callback is
|
|
// invoked on a timeout, `oldTs` and `states are set to nil.
|
|
// This callback returns a boolean specifying whether further notifications
|
|
// should be sent, like `more` return param from `CheckFunc` above.
|
|
//
|
|
// - `RevertHandler` is called after apply handler, when we drop the tipset
|
|
// containing the message. The tipset passed as the argument is the tipset
|
|
// that is being dropped. Note that the event dropped may be re-applied
|
|
// in a different tipset in small amount of time.
|
|
//
|
|
// - `StateMatchFunc` is called against each tipset state. If there is a match,
|
|
// the state change is queued up until the confidence interval has elapsed (and
|
|
// `StateChangeHandler` is called)
|
|
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
|
|
hnd := func(ctx context.Context, data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
|
|
states, ok := data.(StateChange)
|
|
if data != nil && !ok {
|
|
panic("expected StateChange")
|
|
}
|
|
|
|
return scHnd(prevTs, ts, states, height)
|
|
}
|
|
|
|
id, err := we.hcAPI.onHeadChanged(context.TODO(), check, hnd, rev, confidence, timeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
we.lk.Lock()
|
|
defer we.lk.Unlock()
|
|
we.matchers[id] = mf
|
|
|
|
return nil
|
|
}
|
|
|
|
// messageEvents watches for message calls to actors
|
|
type messageEvents struct {
|
|
cs EventHelperAPI
|
|
hcAPI headChangeAPI
|
|
|
|
lk sync.RWMutex
|
|
matchers map[triggerID]MsgMatchFunc
|
|
}
|
|
|
|
func newMessageEvents(hcAPI headChangeAPI, cs EventHelperAPI) messageEvents {
|
|
return messageEvents{
|
|
cs: cs,
|
|
hcAPI: hcAPI,
|
|
matchers: make(map[triggerID]MsgMatchFunc),
|
|
}
|
|
}
|
|
|
|
// Check if there are any new actor calls
|
|
func (me *messageEvents) checkNewCalls(ctx context.Context, from, to *types.TipSet) map[triggerID][]eventData {
|
|
me.lk.RLock()
|
|
defer me.lk.RUnlock()
|
|
|
|
// For each message in the tipset
|
|
res := make(map[triggerID][]eventData)
|
|
me.messagesForTs(from, func(msg *types.Message) {
|
|
// TODO: provide receipts
|
|
|
|
// Run each trigger's matcher against the message
|
|
for tid, matchFn := range me.matchers {
|
|
matched, err := matchFn(msg)
|
|
if err != nil {
|
|
log.Errorf("event matcher failed: %s", err)
|
|
continue
|
|
}
|
|
|
|
// If there was a match, include the message in the results for the
|
|
// trigger
|
|
if matched {
|
|
res[tid] = append(res[tid], msg)
|
|
}
|
|
}
|
|
})
|
|
|
|
return res
|
|
}
|
|
|
|
// Get the messages in a tipset
|
|
func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) {
|
|
seen := map[cid.Cid]struct{}{}
|
|
|
|
for i, tsb := range ts.Cids() {
|
|
msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb)
|
|
if err != nil {
|
|
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s",
|
|
ts.Height(), tsb, ts.Blocks()[i].Messages, err)
|
|
continue
|
|
}
|
|
for i, c := range msgs.Cids {
|
|
// We iterate over the CIDs to avoid having to recompute them.
|
|
_, ok := seen[c]
|
|
if ok {
|
|
continue
|
|
}
|
|
seen[c] = struct{}{}
|
|
if i < len(msgs.BlsMessages) {
|
|
consume(msgs.BlsMessages[i])
|
|
} else {
|
|
consume(&msgs.SecpkMessages[i-len(msgs.BlsMessages)].Message)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// MsgHandler arguments:
|
|
// `ts` is the tipset, in which the `msg` is included.
|
|
// `curH`-`ts.Height` = `confidence`
|
|
type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
|
|
|
|
type MsgMatchFunc func(msg *types.Message) (matched bool, err error)
|
|
|
|
// Called registers a callback which is triggered when a specified method is
|
|
//
|
|
// called on an actor, or a timeout is reached.
|
|
//
|
|
// - `CheckFunc` callback is invoked immediately with a recent tipset, it
|
|
// returns two booleans - `done`, and `more`.
|
|
//
|
|
// - `done` should be true when some on-chain action we are waiting for has
|
|
// happened. When `done` is set to true, timeout trigger is disabled.
|
|
//
|
|
// - `more` should be false when we don't want to receive new notifications
|
|
// through MsgHandler. Note that notifications may still be delivered to
|
|
// RevertHandler
|
|
//
|
|
// - `MsgHandler` is called when the specified event was observed on-chain,
|
|
// and a confidence threshold was reached, or the specified `timeout` height
|
|
// was reached with no events observed. When this callback is invoked on a
|
|
// timeout, `msg` is set to nil. This callback returns a boolean specifying
|
|
// whether further notifications should be sent, like `more` return param
|
|
// from `CheckFunc` above.
|
|
//
|
|
// - `RevertHandler` is called after apply handler, when we drop the tipset
|
|
// containing the message. The tipset passed as the argument is the tipset
|
|
// that is being dropped. Note that the message dropped may be re-applied
|
|
// in a different tipset in small amount of time.
|
|
//
|
|
// - `MsgMatchFunc` is called against each message. If there is a match, the
|
|
// message is queued up until the confidence interval has elapsed (and
|
|
// `MsgHandler` is called)
|
|
func (me *messageEvents) Called(ctx context.Context, check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
|
|
hnd := func(ctx context.Context, data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
|
|
msg, ok := data.(*types.Message)
|
|
if data != nil && !ok {
|
|
panic("expected msg")
|
|
}
|
|
|
|
ml, err := me.cs.StateSearchMsg(ctx, ts.Key(), msg.Cid(), stmgr.LookbackNoLimit, true)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if ml == nil {
|
|
return msgHnd(msg, nil, ts, height)
|
|
}
|
|
|
|
return msgHnd(msg, &ml.Receipt, ts, height)
|
|
}
|
|
|
|
id, err := me.hcAPI.onHeadChanged(ctx, check, hnd, rev, confidence, timeout)
|
|
if err != nil {
|
|
return xerrors.Errorf("on head changed error: %w", err)
|
|
}
|
|
|
|
me.lk.Lock()
|
|
defer me.lk.Unlock()
|
|
me.matchers[id] = mf
|
|
|
|
return nil
|
|
}
|
|
|
|
// Convenience function for checking and matching messages
|
|
func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error {
|
|
return me.Called(ctx, me.CheckMsg(msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage()))
|
|
}
|