Merge pull request #641 from filecoin-project/feat/event-msg-matchers

Event msg matchers
This commit is contained in:
Łukasz Magiera 2019-11-19 20:43:22 +01:00 committed by GitHub
commit fccff23784
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 75 additions and 53 deletions

View File

@ -140,7 +140,7 @@ func doFetch(out string, info paramFile) error {
}
log.Infof("Fetching %s from %s", out, gw)
outf, err := os.OpenFile(out, os.O_RDWR|os.O_CREATE|os.O_APPEND,0666)
outf, err := os.OpenFile(out, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return err
}
@ -151,16 +151,16 @@ func doFetch(out string, info paramFile) error {
return err
}
header := http.Header{}
header.Set("Range", "bytes=" + strconv.FormatInt(fStat.Size(), 10) + "-")
header.Set("Range", "bytes="+strconv.FormatInt(fStat.Size(), 10)+"-")
url, err := url.Parse(gw + info.Cid)
if err != nil {
return err
}
req := http.Request{
Method: "GET",
URL: url,
URL: url,
Header: header,
Close: true,
Close: true,
}
resp, err := http.DefaultClient.Do(&req)

View File

@ -172,23 +172,6 @@ func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal
return false, nil
}
var params actors.SectorProveCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, err
}
var found bool
for _, dealID := range params.DealIDs {
if dealID == deal.DealID {
found = true
break
}
}
if !found {
return true, nil
}
sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
@ -217,7 +200,32 @@ func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal
return nil
}
if err := c.events.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, deal.Proposal.Provider, actors.MAMethods.ProveCommitSector); err != nil {
matchEvent := func(msg *types.Message) (bool, error) {
if msg.To != deal.Proposal.Provider {
return false, nil
}
if msg.Method != actors.MAMethods.ProveCommitSector {
return false, nil
}
var params actors.SectorProveCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, err
}
var found bool
for _, dealID := range params.DealIDs {
if dealID == deal.DealID {
found = true
break
}
}
return found, nil
}
if err := c.events.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil {
return nil, xerrors.Errorf("failed to set up called handler")
}

View File

@ -76,7 +76,7 @@ func NewEvents(ctx context.Context, api eventApi) *Events {
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
revertQueue: map[msgH][]triggerH{},
triggers: map[triggerId]*callHandler{},
callTuples: map[callTuple][]triggerId{},
matchers: map[triggerId][]MatchFunc{},
timeouts: map[uint64]map[triggerId]int{},
},
}

View File

@ -7,7 +7,6 @@ import (
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
)
@ -34,6 +33,8 @@ type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) (more
// may still be called)
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
type MatchFunc func(msg *types.Message) (bool, error)
type callHandler struct {
confidence int
timeout uint64
@ -63,8 +64,8 @@ type calledEvents struct {
ctr triggerId
triggers map[triggerId]*callHandler
callTuples map[callTuple][]triggerId
triggers map[triggerId]*callHandler
matchers map[triggerId][]MatchFunc
// maps block heights to events
// [triggerH][msgH][event]
@ -77,11 +78,6 @@ type calledEvents struct {
timeouts map[uint64]map[triggerId]int
}
type callTuple struct {
actor address.Address
method uint64
}
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
for _, ts := range rev {
e.handleReverts(ts)
@ -126,21 +122,27 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) {
func (e *calledEvents) checkNewCalls(ts *types.TipSet) {
e.messagesForTs(ts, func(msg *types.Message) {
// TODO: do we have to verify the receipt, or are messages on chain
// guaranteed to be successful?
// TODO: provide receipts
ct := callTuple{
actor: msg.To,
method: msg.Method,
}
for tid, matchFns := range e.matchers {
var matched bool
for _, matchFn := range matchFns {
ok, err := matchFn(msg)
if err != nil {
log.Warnf("event matcher failed: %s")
continue
}
matched = ok
triggers, ok := e.callTuples[ct]
if !ok {
return
}
if matched {
break
}
}
for _, tid := range triggers {
e.queueForConfidence(tid, msg, ts)
if matched {
e.queueForConfidence(tid, msg, ts)
break
}
}
})
}
@ -292,7 +294,7 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
// containing the message. The tipset passed as the argument is the tipset
// that is being dropped. Note that the message dropped may be re-applied
// in a different tipset in small amount of time.
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, actor address.Address, method uint64) error {
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, mf MatchFunc) error {
e.lk.Lock()
defer e.lk.Unlock()
@ -317,12 +319,8 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
revert: rev,
}
ct := callTuple{
actor: actor,
method: method,
}
e.matchers[id] = append(e.matchers[id], mf)
e.callTuples[ct] = append(e.callTuples[ct], id)
if timeout != NoTimeout {
if e.timeouts[timeout+uint64(confidence)] == nil {
e.timeouts[timeout+uint64(confidence)] = map[uint64]int{}

View File

@ -57,6 +57,9 @@ func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
ParentStateRoot: dummyCid,
Messages: msgcid,
ParentMessageReceipts: dummyCid,
BlockSig: types.Signature{Type: types.KTBLS},
BLSAggregate: types.Signature{Type: types.KTBLS},
},
{
Height: h,
@ -67,6 +70,9 @@ func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
ParentStateRoot: dummyCid,
Messages: msgcid,
ParentMessageReceipts: dummyCid,
BlockSig: types.Signature{Type: types.KTBLS},
BLSAggregate: types.Signature{Type: types.KTBLS},
},
})
@ -482,6 +488,12 @@ func TestAtChainedConfidenceNull(t *testing.T) {
require.Equal(t, false, reverted)
}
func matchAddrMethod(to address.Address, m uint64) func(msg *types.Message) (bool, error) {
return func(msg *types.Message) (bool, error) {
return to == msg.To && m == msg.Method, nil
}
}
func TestCalled(t *testing.T) {
fcs := &fakeCS{
t: t,
@ -516,7 +528,7 @@ func TestCalled(t *testing.T) {
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 20, t0123, 5)
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
// create few blocks to make sure nothing get's randomly called
@ -710,7 +722,7 @@ func TestCalledTimeout(t *testing.T) {
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, t0123, 5)
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
fcs.advance(0, 21, nil)
@ -745,7 +757,7 @@ func TestCalledTimeout(t *testing.T) {
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, t0123, 5)
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
fcs.advance(0, 21, nil)
@ -799,7 +811,7 @@ func TestCalledOrder(t *testing.T) {
}
at++
return nil
}, 3, 20, t0123, 5)
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
fcs.advance(0, 10, map[int]cid.Cid{

View File

@ -27,6 +27,8 @@ func TestTsCache(t *testing.T) {
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: types.Signature{Type: types.KTBLS},
BLSAggregate: types.Signature{Type: types.KTBLS},
}})
if err != nil {
t.Fatal(err)
@ -67,6 +69,8 @@ func TestTsCacheNulls(t *testing.T) {
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: types.Signature{Type: types.KTBLS},
BLSAggregate: types.Signature{Type: types.KTBLS},
}})
if err != nil {
t.Fatal(err)