events: MatchFuncs for Called
This commit is contained in:
parent
32b80f79e7
commit
f52d9de4dd
@ -76,7 +76,7 @@ func NewEvents(ctx context.Context, api eventApi) *Events {
|
|||||||
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
|
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
|
||||||
revertQueue: map[msgH][]triggerH{},
|
revertQueue: map[msgH][]triggerH{},
|
||||||
triggers: map[triggerId]*callHandler{},
|
triggers: map[triggerId]*callHandler{},
|
||||||
callTuples: map[callTuple][]triggerId{},
|
matchers: map[triggerId][]MatchFunc{},
|
||||||
timeouts: map[uint64]map[triggerId]int{},
|
timeouts: map[uint64]map[triggerId]int{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,6 @@ import (
|
|||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -34,6 +33,8 @@ type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) (more
|
|||||||
// may still be called)
|
// may still be called)
|
||||||
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
|
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
|
||||||
|
|
||||||
|
type MatchFunc func(msg *types.Message) (bool, error)
|
||||||
|
|
||||||
type callHandler struct {
|
type callHandler struct {
|
||||||
confidence int
|
confidence int
|
||||||
timeout uint64
|
timeout uint64
|
||||||
@ -64,7 +65,7 @@ type calledEvents struct {
|
|||||||
ctr triggerId
|
ctr triggerId
|
||||||
|
|
||||||
triggers map[triggerId]*callHandler
|
triggers map[triggerId]*callHandler
|
||||||
callTuples map[callTuple][]triggerId
|
matchers map[triggerId][]MatchFunc
|
||||||
|
|
||||||
// maps block heights to events
|
// maps block heights to events
|
||||||
// [triggerH][msgH][event]
|
// [triggerH][msgH][event]
|
||||||
@ -77,11 +78,6 @@ type calledEvents struct {
|
|||||||
timeouts map[uint64]map[triggerId]int
|
timeouts map[uint64]map[triggerId]int
|
||||||
}
|
}
|
||||||
|
|
||||||
type callTuple struct {
|
|
||||||
actor address.Address
|
|
||||||
method uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
|
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
|
||||||
for _, ts := range rev {
|
for _, ts := range rev {
|
||||||
e.handleReverts(ts)
|
e.handleReverts(ts)
|
||||||
@ -126,21 +122,27 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) {
|
|||||||
|
|
||||||
func (e *calledEvents) checkNewCalls(ts *types.TipSet) {
|
func (e *calledEvents) checkNewCalls(ts *types.TipSet) {
|
||||||
e.messagesForTs(ts, func(msg *types.Message) {
|
e.messagesForTs(ts, func(msg *types.Message) {
|
||||||
// TODO: do we have to verify the receipt, or are messages on chain
|
// TODO: provide receipts
|
||||||
// guaranteed to be successful?
|
|
||||||
|
|
||||||
ct := callTuple{
|
for tid, matchFns := range e.matchers {
|
||||||
actor: msg.To,
|
var matched bool
|
||||||
method: msg.Method,
|
for _, matchFn := range matchFns {
|
||||||
|
ok, err := matchFn(msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("event matcher failed: %s")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
matched = ok
|
||||||
|
|
||||||
|
if matched {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
triggers, ok := e.callTuples[ct]
|
if matched {
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tid := range triggers {
|
|
||||||
e.queueForConfidence(tid, msg, ts)
|
e.queueForConfidence(tid, msg, ts)
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -292,7 +294,7 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
|
|||||||
// containing the message. The tipset passed as the argument is the tipset
|
// containing the message. The tipset passed as the argument is the tipset
|
||||||
// that is being dropped. Note that the message dropped may be re-applied
|
// that is being dropped. Note that the message dropped may be re-applied
|
||||||
// in a different tipset in small amount of time.
|
// in a different tipset in small amount of time.
|
||||||
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, actor address.Address, method uint64) error {
|
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, mf MatchFunc) error {
|
||||||
e.lk.Lock()
|
e.lk.Lock()
|
||||||
defer e.lk.Unlock()
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
@ -317,12 +319,8 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
|
|||||||
revert: rev,
|
revert: rev,
|
||||||
}
|
}
|
||||||
|
|
||||||
ct := callTuple{
|
e.matchers[id] = append(e.matchers[id], mf)
|
||||||
actor: actor,
|
|
||||||
method: method,
|
|
||||||
}
|
|
||||||
|
|
||||||
e.callTuples[ct] = append(e.callTuples[ct], id)
|
|
||||||
if timeout != NoTimeout {
|
if timeout != NoTimeout {
|
||||||
if e.timeouts[timeout+uint64(confidence)] == nil {
|
if e.timeouts[timeout+uint64(confidence)] == nil {
|
||||||
e.timeouts[timeout+uint64(confidence)] = map[uint64]int{}
|
e.timeouts[timeout+uint64(confidence)] = map[uint64]int{}
|
||||||
|
@ -57,6 +57,9 @@ func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
|
|||||||
ParentStateRoot: dummyCid,
|
ParentStateRoot: dummyCid,
|
||||||
Messages: msgcid,
|
Messages: msgcid,
|
||||||
ParentMessageReceipts: dummyCid,
|
ParentMessageReceipts: dummyCid,
|
||||||
|
|
||||||
|
BlockSig: types.Signature{Type: types.KTBLS},
|
||||||
|
BLSAggregate: types.Signature{Type: types.KTBLS},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Height: h,
|
Height: h,
|
||||||
@ -67,6 +70,9 @@ func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
|
|||||||
ParentStateRoot: dummyCid,
|
ParentStateRoot: dummyCid,
|
||||||
Messages: msgcid,
|
Messages: msgcid,
|
||||||
ParentMessageReceipts: dummyCid,
|
ParentMessageReceipts: dummyCid,
|
||||||
|
|
||||||
|
BlockSig: types.Signature{Type: types.KTBLS},
|
||||||
|
BLSAggregate: types.Signature{Type: types.KTBLS},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -482,6 +488,12 @@ func TestAtChainedConfidenceNull(t *testing.T) {
|
|||||||
require.Equal(t, false, reverted)
|
require.Equal(t, false, reverted)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func matchAddrMethod(to address.Address, m uint64) func(msg *types.Message) (bool, error) {
|
||||||
|
return func(msg *types.Message) (bool, error) {
|
||||||
|
return to == msg.To && m == msg.Method, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCalled(t *testing.T) {
|
func TestCalled(t *testing.T) {
|
||||||
fcs := &fakeCS{
|
fcs := &fakeCS{
|
||||||
t: t,
|
t: t,
|
||||||
@ -516,7 +528,7 @@ func TestCalled(t *testing.T) {
|
|||||||
}, func(_ context.Context, ts *types.TipSet) error {
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
reverted = true
|
reverted = true
|
||||||
return nil
|
return nil
|
||||||
}, 3, 20, t0123, 5)
|
}, 3, 20, matchAddrMethod(t0123, 5))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// create few blocks to make sure nothing get's randomly called
|
// create few blocks to make sure nothing get's randomly called
|
||||||
@ -710,7 +722,7 @@ func TestCalledTimeout(t *testing.T) {
|
|||||||
}, func(_ context.Context, ts *types.TipSet) error {
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
t.Fatal("revert on timeout")
|
t.Fatal("revert on timeout")
|
||||||
return nil
|
return nil
|
||||||
}, 3, 20, t0123, 5)
|
}, 3, 20, matchAddrMethod(t0123, 5))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
fcs.advance(0, 21, nil)
|
fcs.advance(0, 21, nil)
|
||||||
@ -745,7 +757,7 @@ func TestCalledTimeout(t *testing.T) {
|
|||||||
}, func(_ context.Context, ts *types.TipSet) error {
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
t.Fatal("revert on timeout")
|
t.Fatal("revert on timeout")
|
||||||
return nil
|
return nil
|
||||||
}, 3, 20, t0123, 5)
|
}, 3, 20, matchAddrMethod(t0123, 5))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
fcs.advance(0, 21, nil)
|
fcs.advance(0, 21, nil)
|
||||||
@ -799,7 +811,7 @@ func TestCalledOrder(t *testing.T) {
|
|||||||
}
|
}
|
||||||
at++
|
at++
|
||||||
return nil
|
return nil
|
||||||
}, 3, 20, t0123, 5)
|
}, 3, 20, matchAddrMethod(t0123, 5))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
fcs.advance(0, 10, map[int]cid.Cid{
|
fcs.advance(0, 10, map[int]cid.Cid{
|
||||||
|
@ -27,6 +27,8 @@ func TestTsCache(t *testing.T) {
|
|||||||
ParentStateRoot: dummyCid,
|
ParentStateRoot: dummyCid,
|
||||||
Messages: dummyCid,
|
Messages: dummyCid,
|
||||||
ParentMessageReceipts: dummyCid,
|
ParentMessageReceipts: dummyCid,
|
||||||
|
BlockSig: types.Signature{Type: types.KTBLS},
|
||||||
|
BLSAggregate: types.Signature{Type: types.KTBLS},
|
||||||
}})
|
}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -67,6 +69,8 @@ func TestTsCacheNulls(t *testing.T) {
|
|||||||
ParentStateRoot: dummyCid,
|
ParentStateRoot: dummyCid,
|
||||||
Messages: dummyCid,
|
Messages: dummyCid,
|
||||||
ParentMessageReceipts: dummyCid,
|
ParentMessageReceipts: dummyCid,
|
||||||
|
BlockSig: types.Signature{Type: types.KTBLS},
|
||||||
|
BLSAggregate: types.Signature{Type: types.KTBLS},
|
||||||
}})
|
}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
Loading…
Reference in New Issue
Block a user