Merge pull request #6355 from filecoin-project/fix/evt-many-called-epoch
events: Fix handling of multiple matched events per epoch
This commit is contained in:
commit
be3a8bd320
@ -144,9 +144,11 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
|
|||||||
|
|
||||||
// Queue up calls until there have been enough blocks to reach
|
// Queue up calls until there have been enough blocks to reach
|
||||||
// confidence on the message calls
|
// confidence on the message calls
|
||||||
for tid, data := range newCalls {
|
for tid, calls := range newCalls {
|
||||||
|
for _, data := range calls {
|
||||||
e.queueForConfidence(tid, data, nil, ts)
|
e.queueForConfidence(tid, data, nil, ts)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for at := e.lastTs.Height(); at <= ts.Height(); at++ {
|
for at := e.lastTs.Height(); at <= ts.Height(); at++ {
|
||||||
// Apply any queued events and timeouts that were targeted at the
|
// Apply any queued events and timeouts that were targeted at the
|
||||||
@ -474,7 +476,7 @@ func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) mes
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check if there are any new actor calls
|
// Check if there are any new actor calls
|
||||||
func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventData, error) {
|
func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID][]eventData, error) {
|
||||||
pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
|
pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
|
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
|
||||||
@ -485,7 +487,7 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat
|
|||||||
defer me.lk.RUnlock()
|
defer me.lk.RUnlock()
|
||||||
|
|
||||||
// For each message in the tipset
|
// For each message in the tipset
|
||||||
res := make(map[triggerID]eventData)
|
res := make(map[triggerID][]eventData)
|
||||||
me.messagesForTs(pts, func(msg *types.Message) {
|
me.messagesForTs(pts, func(msg *types.Message) {
|
||||||
// TODO: provide receipts
|
// TODO: provide receipts
|
||||||
|
|
||||||
@ -500,7 +502,7 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat
|
|||||||
// If there was a match, include the message in the results for the
|
// If there was a match, include the message in the results for the
|
||||||
// trigger
|
// trigger
|
||||||
if matched {
|
if matched {
|
||||||
res[tid] = msg
|
res[tid] = append(res[tid], msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -1323,3 +1323,62 @@ func TestStateChangedTimeout(t *testing.T) {
|
|||||||
fcs.advance(0, 5, nil)
|
fcs.advance(0, 5, nil)
|
||||||
require.False(t, called)
|
require.False(t, called)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCalledMultiplePerEpoch(t *testing.T) {
|
||||||
|
fcs := &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||||
|
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||||
|
|
||||||
|
events := NewEvents(context.Background(), fcs)
|
||||||
|
|
||||||
|
t0123, err := address.NewFromString("t0123")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
at := 0
|
||||||
|
|
||||||
|
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return false, true, nil
|
||||||
|
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
|
||||||
|
switch at {
|
||||||
|
case 0:
|
||||||
|
require.Equal(t, uint64(1), msg.Nonce)
|
||||||
|
require.Equal(t, abi.ChainEpoch(4), ts.Height())
|
||||||
|
case 1:
|
||||||
|
require.Equal(t, uint64(2), msg.Nonce)
|
||||||
|
require.Equal(t, abi.ChainEpoch(4), ts.Height())
|
||||||
|
default:
|
||||||
|
t.Fatal("apply should only get called twice, at: ", at)
|
||||||
|
}
|
||||||
|
at++
|
||||||
|
return true, nil
|
||||||
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
|
switch at {
|
||||||
|
case 2:
|
||||||
|
require.Equal(t, abi.ChainEpoch(4), ts.Height())
|
||||||
|
case 3:
|
||||||
|
require.Equal(t, abi.ChainEpoch(4), ts.Height())
|
||||||
|
default:
|
||||||
|
t.Fatal("revert should only get called twice, at: ", at)
|
||||||
|
}
|
||||||
|
at++
|
||||||
|
return nil
|
||||||
|
}, 3, 20, matchAddrMethod(t0123, 5))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fcs.advance(0, 10, map[int]cid.Cid{
|
||||||
|
1: fcs.fakeMsgs(fakeMsg{
|
||||||
|
bmsgs: []*types.Message{
|
||||||
|
{To: t0123, From: t0123, Method: 5, Nonce: 1},
|
||||||
|
{To: t0123, From: t0123, Method: 5, Nonce: 2},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
fcs.advance(9, 1, nil)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user