refactor: modify predicates API
This commit is contained in:
parent
728afc0587
commit
abad4a3941
@ -1,359 +0,0 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
const NoTimeout = math.MaxInt64
|
||||
|
||||
type triggerID = uint64
|
||||
|
||||
// msgH is the block height at which a message was present / event has happened
|
||||
type msgH = abi.ChainEpoch
|
||||
|
||||
// triggerH is the block height at which the listener will be notified about the
|
||||
// message (msgH+confidence)
|
||||
type triggerH = abi.ChainEpoch
|
||||
|
||||
// CalledHandler arguments:
|
||||
// `ts` is the tipset, in which the `msg` is included.
|
||||
// `curH`-`ts.Height` = `confidence`
|
||||
type CalledHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
|
||||
|
||||
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
|
||||
// wait for has already happened in tipset `ts`
|
||||
//
|
||||
// If `done` is true, timeout won't be triggered
|
||||
// If `more` is false, no messages will be sent to CalledHandler (RevertHandler
|
||||
// may still be called)
|
||||
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
|
||||
|
||||
type MatchFunc func(msg *types.Message) (bool, error)
|
||||
|
||||
type callHandler struct {
|
||||
confidence int
|
||||
timeout abi.ChainEpoch
|
||||
|
||||
disabled bool // TODO: GC after gcConfidence reached
|
||||
|
||||
handle CalledHandler
|
||||
revert RevertHandler
|
||||
}
|
||||
|
||||
type queuedEvent struct {
|
||||
trigger triggerID
|
||||
|
||||
h abi.ChainEpoch
|
||||
msg *types.Message
|
||||
|
||||
called bool
|
||||
}
|
||||
|
||||
type calledEvents struct {
|
||||
cs eventAPI
|
||||
tsc *tipSetCache
|
||||
ctx context.Context
|
||||
gcConfidence uint64
|
||||
|
||||
at abi.ChainEpoch
|
||||
|
||||
lk sync.Mutex
|
||||
|
||||
ctr triggerID
|
||||
|
||||
triggers map[triggerID]*callHandler
|
||||
matchers map[triggerID][]MatchFunc
|
||||
|
||||
// maps block heights to events
|
||||
// [triggerH][msgH][event]
|
||||
confQueue map[triggerH]map[msgH][]*queuedEvent
|
||||
|
||||
// [msgH][triggerH]
|
||||
revertQueue map[msgH][]triggerH
|
||||
|
||||
// [timeoutH+confidence][triggerID]{calls}
|
||||
timeouts map[abi.ChainEpoch]map[triggerID]int
|
||||
}
|
||||
|
||||
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
|
||||
e.lk.Lock()
|
||||
defer e.lk.Unlock()
|
||||
|
||||
for _, ts := range rev {
|
||||
e.handleReverts(ts)
|
||||
e.at = ts.Height()
|
||||
}
|
||||
|
||||
for _, ts := range app {
|
||||
// called triggers
|
||||
e.checkNewCalls(ts)
|
||||
for ; e.at <= ts.Height(); e.at++ {
|
||||
e.applyWithConfidence(ts, e.at)
|
||||
e.applyTimeouts(ts)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *calledEvents) handleReverts(ts *types.TipSet) {
|
||||
reverts, ok := e.revertQueue[ts.Height()]
|
||||
if !ok {
|
||||
return // nothing to do
|
||||
}
|
||||
|
||||
for _, triggerH := range reverts {
|
||||
toRevert := e.confQueue[triggerH][ts.Height()]
|
||||
for _, event := range toRevert {
|
||||
if !event.called {
|
||||
continue // event wasn't apply()-ied yet
|
||||
}
|
||||
|
||||
trigger := e.triggers[event.trigger]
|
||||
|
||||
if err := trigger.revert(e.ctx, ts); err != nil {
|
||||
log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err)
|
||||
}
|
||||
}
|
||||
delete(e.confQueue[triggerH], ts.Height())
|
||||
}
|
||||
delete(e.revertQueue, ts.Height())
|
||||
}
|
||||
|
||||
func (e *calledEvents) checkNewCalls(ts *types.TipSet) {
|
||||
pts, err := e.cs.ChainGetTipSet(e.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
|
||||
if err != nil {
|
||||
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
e.messagesForTs(pts, func(msg *types.Message) {
|
||||
// TODO: provide receipts
|
||||
for tid, matchFns := range e.matchers {
|
||||
var matched bool
|
||||
for _, matchFn := range matchFns {
|
||||
ok, err := matchFn(msg)
|
||||
if err != nil {
|
||||
log.Errorf("event matcher failed: %s", err)
|
||||
continue
|
||||
}
|
||||
matched = ok
|
||||
|
||||
if matched {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if matched {
|
||||
e.queueForConfidence(tid, msg, ts)
|
||||
break
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (e *calledEvents) queueForConfidence(trigID uint64, msg *types.Message, ts *types.TipSet) {
|
||||
trigger := e.triggers[trigID]
|
||||
|
||||
appliedH := ts.Height()
|
||||
|
||||
triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
|
||||
|
||||
byOrigH, ok := e.confQueue[triggerH]
|
||||
if !ok {
|
||||
byOrigH = map[abi.ChainEpoch][]*queuedEvent{}
|
||||
e.confQueue[triggerH] = byOrigH
|
||||
}
|
||||
|
||||
byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
|
||||
trigger: trigID,
|
||||
h: appliedH,
|
||||
msg: msg,
|
||||
})
|
||||
|
||||
e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH)
|
||||
}
|
||||
|
||||
func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) {
|
||||
byOrigH, ok := e.confQueue[height]
|
||||
if !ok {
|
||||
return // no triggers at thin height
|
||||
}
|
||||
|
||||
for origH, events := range byOrigH {
|
||||
triggerTs, err := e.tsc.get(origH)
|
||||
if err != nil {
|
||||
log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, height)
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
if event.called {
|
||||
continue
|
||||
}
|
||||
|
||||
trigger := e.triggers[event.trigger]
|
||||
if trigger.disabled {
|
||||
continue
|
||||
}
|
||||
|
||||
rec, err := e.cs.StateGetReceipt(e.ctx, event.msg.Cid(), ts.Key())
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
more, err := trigger.handle(event.msg, rec, triggerTs, height)
|
||||
if err != nil {
|
||||
log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err)
|
||||
continue // don't revert failed calls
|
||||
}
|
||||
|
||||
event.called = true
|
||||
|
||||
touts, ok := e.timeouts[trigger.timeout]
|
||||
if ok {
|
||||
touts[event.trigger]++
|
||||
}
|
||||
|
||||
trigger.disabled = !more
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *calledEvents) applyTimeouts(ts *types.TipSet) {
|
||||
triggers, ok := e.timeouts[ts.Height()]
|
||||
if !ok {
|
||||
return // nothing to do
|
||||
}
|
||||
|
||||
for triggerID, calls := range triggers {
|
||||
if calls > 0 {
|
||||
continue // don't timeout if the method was called
|
||||
}
|
||||
trigger := e.triggers[triggerID]
|
||||
if trigger.disabled {
|
||||
continue
|
||||
}
|
||||
|
||||
timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence))
|
||||
if err != nil {
|
||||
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
|
||||
}
|
||||
|
||||
more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
|
||||
if err != nil {
|
||||
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
|
||||
continue // don't revert failed calls
|
||||
}
|
||||
|
||||
trigger.disabled = !more // allows messages after timeout
|
||||
}
|
||||
}
|
||||
|
||||
func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) {
|
||||
seen := map[cid.Cid]struct{}{}
|
||||
|
||||
for _, tsb := range ts.Blocks() {
|
||||
|
||||
msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
|
||||
if err != nil {
|
||||
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
|
||||
// this is quite bad, but probably better than missing all the other updates
|
||||
continue
|
||||
}
|
||||
|
||||
for _, m := range msgs.BlsMessages {
|
||||
_, ok := seen[m.Cid()]
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
seen[m.Cid()] = struct{}{}
|
||||
|
||||
consume(m)
|
||||
}
|
||||
|
||||
for _, m := range msgs.SecpkMessages {
|
||||
_, ok := seen[m.Message.Cid()]
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
seen[m.Message.Cid()] = struct{}{}
|
||||
|
||||
consume(&m.Message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Called registers a callbacks which are triggered when a specified method is
|
||||
// called on an actor, or a timeout is reached.
|
||||
//
|
||||
// * `CheckFunc` callback is invoked immediately with a recent tipset, it
|
||||
// returns two booleans - `done`, and `more`.
|
||||
//
|
||||
// * `done` should be true when some on-chain action we are waiting for has
|
||||
// happened. When `done` is set to true, timeout trigger is disabled.
|
||||
//
|
||||
// * `more` should be false when we don't want to receive new notifications
|
||||
// through CalledHandler. Note that notifications may still be delivered to
|
||||
// RevertHandler
|
||||
//
|
||||
// * `CalledHandler` is called when the specified event was observed on-chain,
|
||||
// and a confidence threshold was reached, or the specified `timeout` height
|
||||
// was reached with no events observed. When this callback is invoked on a
|
||||
// timeout, `msg` is set to nil. This callback returns a boolean specifying
|
||||
// whether further notifications should be sent, like `more` return param
|
||||
// from `CheckFunc` above.
|
||||
//
|
||||
// * `RevertHandler` is called after apply handler, when we drop the tipset
|
||||
// containing the message. The tipset passed as the argument is the tipset
|
||||
// that is being dropped. Note that the message dropped may be re-applied
|
||||
// in a different tipset in small amount of time.
|
||||
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MatchFunc) error {
|
||||
e.lk.Lock()
|
||||
defer e.lk.Unlock()
|
||||
|
||||
ts := e.tsc.best()
|
||||
done, more, err := check(ts)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
|
||||
}
|
||||
if done {
|
||||
timeout = NoTimeout
|
||||
}
|
||||
|
||||
id := e.ctr
|
||||
e.ctr++
|
||||
|
||||
e.triggers[id] = &callHandler{
|
||||
confidence: confidence,
|
||||
timeout: timeout + abi.ChainEpoch(confidence),
|
||||
|
||||
disabled: !more,
|
||||
|
||||
handle: hnd,
|
||||
revert: rev,
|
||||
}
|
||||
|
||||
e.matchers[id] = append(e.matchers[id], mf)
|
||||
|
||||
if timeout != NoTimeout {
|
||||
if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil {
|
||||
e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{}
|
||||
}
|
||||
e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error {
|
||||
return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg.VMMessage()))
|
||||
}
|
@ -387,23 +387,17 @@ func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map
|
||||
return res
|
||||
}
|
||||
|
||||
// Used to store the state for a stateChange
|
||||
type stateData interface{}
|
||||
|
||||
// A change in state from -> to
|
||||
type stateChange struct {
|
||||
from stateData
|
||||
to stateData
|
||||
}
|
||||
// A change in state
|
||||
type StateChange interface{}
|
||||
|
||||
// StateChangeHandler arguments:
|
||||
// `oldTs` is the state "from" tipset
|
||||
// `newTs` is the state "to" tipset
|
||||
// `states` is the old / new state
|
||||
// `states` is the change in state
|
||||
// `curH`-`ts.Height` = `confidence`
|
||||
type StateChangeHandler func(oldTs, newTs *types.TipSet, states *stateChange, curH abi.ChainEpoch) (more bool, err error)
|
||||
type StateChangeHandler func(oldTs, newTs *types.TipSet, states StateChange, curH abi.ChainEpoch) (more bool, err error)
|
||||
|
||||
type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, *stateChange, error)
|
||||
type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error)
|
||||
|
||||
// StateChanged registers a callback which is triggered when a specified state
|
||||
// change occurs or a timeout is reached.
|
||||
@ -435,9 +429,9 @@ type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, *stateChange, error)
|
||||
// `StateChangeHandler` is called)
|
||||
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
|
||||
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
|
||||
states, ok := data.(*stateChange)
|
||||
states, ok := data.(StateChange)
|
||||
if data != nil && !ok {
|
||||
panic("expected *stateChange")
|
||||
panic("expected StateChange")
|
||||
}
|
||||
|
||||
return scHnd(prevTs, ts, states, height)
|
||||
|
@ -1066,6 +1066,11 @@ func TestRemoveTriggersOnMessage(t *testing.T) {
|
||||
require.Equal(t, false, reverted)
|
||||
}
|
||||
|
||||
type testStateChange struct {
|
||||
from string
|
||||
to string
|
||||
}
|
||||
|
||||
func TestStateChanged(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
@ -1081,18 +1086,18 @@ func TestStateChanged(t *testing.T) {
|
||||
|
||||
more := true
|
||||
var applied, reverted bool
|
||||
var appliedData *stateChange
|
||||
var appliedData StateChange
|
||||
var appliedOldTs *types.TipSet
|
||||
var appliedNewTs *types.TipSet
|
||||
var appliedH abi.ChainEpoch
|
||||
var matchData *stateChange
|
||||
var matchData StateChange
|
||||
|
||||
confidence := 3
|
||||
timeout := abi.ChainEpoch(20)
|
||||
|
||||
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||
return false, true, nil
|
||||
}, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) {
|
||||
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
||||
require.Equal(t, false, applied)
|
||||
applied = true
|
||||
appliedData = data
|
||||
@ -1103,7 +1108,7 @@ func TestStateChanged(t *testing.T) {
|
||||
}, func(_ context.Context, ts *types.TipSet) error {
|
||||
reverted = true
|
||||
return nil
|
||||
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) {
|
||||
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
||||
if matchData == nil {
|
||||
return false, matchData, nil
|
||||
}
|
||||
@ -1121,7 +1126,7 @@ func TestStateChanged(t *testing.T) {
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
// create state change (but below confidence threshold)
|
||||
matchData = &stateChange{from: "a", to: "b"}
|
||||
matchData = testStateChange{from: "a", to: "b"}
|
||||
fcs.advance(0, 3, nil)
|
||||
|
||||
require.Equal(t, false, applied)
|
||||
@ -1149,8 +1154,9 @@ func TestStateChanged(t *testing.T) {
|
||||
require.Equal(t, abi.ChainEpoch(9), appliedH)
|
||||
|
||||
// Make sure the state change was correctly passed through
|
||||
require.Equal(t, "a", appliedData.from)
|
||||
require.Equal(t, "b", appliedData.to)
|
||||
rcvd := appliedData.(testStateChange)
|
||||
require.Equal(t, "a", rcvd.from)
|
||||
require.Equal(t, "b", rcvd.to)
|
||||
}
|
||||
|
||||
func TestStateChangedRevert(t *testing.T) {
|
||||
@ -1168,21 +1174,21 @@ func TestStateChangedRevert(t *testing.T) {
|
||||
|
||||
more := true
|
||||
var applied, reverted bool
|
||||
var matchData *stateChange
|
||||
var matchData StateChange
|
||||
|
||||
confidence := 1
|
||||
timeout := abi.ChainEpoch(20)
|
||||
|
||||
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||
return false, true, nil
|
||||
}, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) {
|
||||
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
||||
require.Equal(t, false, applied)
|
||||
applied = true
|
||||
return more, nil
|
||||
}, func(_ context.Context, ts *types.TipSet) error {
|
||||
reverted = true
|
||||
return nil
|
||||
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) {
|
||||
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
||||
if matchData == nil {
|
||||
return false, matchData, nil
|
||||
}
|
||||
@ -1196,7 +1202,7 @@ func TestStateChangedRevert(t *testing.T) {
|
||||
fcs.advance(0, 2, nil) // H=3
|
||||
|
||||
// Make a state change from TS at height 3 to TS at height 4
|
||||
matchData = &stateChange{from: "a", to: "b"}
|
||||
matchData = testStateChange{from: "a", to: "b"}
|
||||
fcs.advance(0, 1, nil) // H=4
|
||||
|
||||
// Haven't yet reached confidence
|
||||
@ -1248,7 +1254,7 @@ func TestStateChangedTimeout(t *testing.T) {
|
||||
|
||||
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||
return false, true, nil
|
||||
}, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) {
|
||||
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
||||
called = true
|
||||
require.Nil(t, data)
|
||||
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
|
||||
@ -1257,7 +1263,7 @@ func TestStateChangedTimeout(t *testing.T) {
|
||||
}, func(_ context.Context, ts *types.TipSet) error {
|
||||
t.Fatal("revert on timeout")
|
||||
return nil
|
||||
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) {
|
||||
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
||||
return false, nil, nil
|
||||
})
|
||||
|
||||
@ -1286,7 +1292,7 @@ func TestStateChangedTimeout(t *testing.T) {
|
||||
|
||||
err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||
return true, true, nil
|
||||
}, func(oldTs, newTs *types.TipSet, data *stateChange, curH abi.ChainEpoch) (bool, error) {
|
||||
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
||||
called = true
|
||||
require.Nil(t, data)
|
||||
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
|
||||
@ -1295,7 +1301,7 @@ func TestStateChangedTimeout(t *testing.T) {
|
||||
}, func(_ context.Context, ts *types.TipSet) error {
|
||||
t.Fatal("revert on timeout")
|
||||
return nil
|
||||
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, *stateChange, error) {
|
||||
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
||||
return false, nil, nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -2,38 +2,49 @@ package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/filecoin-project/lotus/api/apibstore"
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-amt-ipld/v2"
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||
)
|
||||
|
||||
type StatePredicates struct {
|
||||
sm *stmgr.StateManager
|
||||
type UserData interface{}
|
||||
|
||||
type ChainApi interface {
|
||||
ChainHasObj(context.Context, cid.Cid) (bool, error)
|
||||
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
|
||||
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
|
||||
}
|
||||
|
||||
func NewStatePredicates(sm *stmgr.StateManager) *StatePredicates {
|
||||
type StatePredicates struct {
|
||||
api ChainApi
|
||||
cst *cbor.BasicIpldStore
|
||||
}
|
||||
|
||||
func NewStatePredicates(api ChainApi) *StatePredicates {
|
||||
return &StatePredicates{
|
||||
sm: sm,
|
||||
api: api,
|
||||
cst: cbor.NewCborStore(apibstore.NewAPIBlockstore(api)),
|
||||
}
|
||||
}
|
||||
|
||||
type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error)
|
||||
|
||||
type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error)
|
||||
|
||||
func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc {
|
||||
return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) {
|
||||
oldActor, err := sp.sm.GetActor(addr, oldState)
|
||||
oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key())
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
newActor, err := sp.sm.GetActor(addr, newState)
|
||||
newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key())
|
||||
if oldActor.Head.Equals(newActor.Head) {
|
||||
return false, nil, nil
|
||||
}
|
||||
@ -46,12 +57,11 @@ type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State
|
||||
func (sp *StatePredicates) OnStorageMarketActorChanged(addr address.Address, diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc {
|
||||
return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
|
||||
var oldState market.State
|
||||
cst := cbor.NewCborStore(sp.sm.ChainStore().Blockstore())
|
||||
if err := cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
|
||||
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
var newState market.State
|
||||
if err := cst.Get(ctx, newActorStateHead, &newActorStateHead); err != nil {
|
||||
if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
return diffStorageMarketState(ctx, &oldState, &newState)
|
||||
@ -65,37 +75,40 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc)
|
||||
if oldState.States.Equals(newState.States) {
|
||||
return false, nil, nil
|
||||
}
|
||||
blks := cbor.NewCborStore(sp.sm.ChainStore().Blockstore())
|
||||
oldRoot, err := amt.LoadAMT(ctx, blks, oldState.States)
|
||||
|
||||
oldRoot, err := amt.LoadAMT(ctx, sp.cst, oldState.States)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
newRoot, err := amt.LoadAMT(ctx, blks, newState.States)
|
||||
newRoot, err := amt.LoadAMT(ctx, sp.cst, newState.States)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
return diffDealStates(ctx, oldRoot, newRoot)
|
||||
}
|
||||
}
|
||||
|
||||
func (sp *StatePredicates) DealStateChangedForIDs(ctx context.Context, dealIds []abi.DealID, oldRoot, newRoot *amt.Root) (changed bool, user UserData, err error) {
|
||||
var changedDeals []abi.DealID
|
||||
for _, dealId := range dealIds {
|
||||
var oldDeal, newDeal market.DealState
|
||||
err := oldRoot.Get(ctx, uint64(dealId), &oldDeal)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc {
|
||||
return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) {
|
||||
var changedDeals []abi.DealID
|
||||
for _, dealId := range dealIds {
|
||||
var oldDeal, newDeal market.DealState
|
||||
err := oldDealStateRoot.Get(ctx, uint64(dealId), &oldDeal)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
err = newDealStateRoot.Get(ctx, uint64(dealId), &newDeal)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
if oldDeal != newDeal {
|
||||
changedDeals = append(changedDeals, dealId)
|
||||
}
|
||||
}
|
||||
err = newRoot.Get(ctx, uint64(dealId), &newDeal)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
if oldDeal != newDeal {
|
||||
changedDeals = append(changedDeals, dealId)
|
||||
if len(changedDeals) > 0 {
|
||||
return true, changed, nil
|
||||
}
|
||||
return false, nil, nil
|
||||
}
|
||||
if len(changedDeals) > 0 {
|
||||
return true, changed, nil
|
||||
}
|
||||
return false, nil, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user