From 3ea39f76e1b7aacf23da103290bce689d8359f81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 30 May 2021 17:20:14 +0200 Subject: [PATCH] events: Fix handling of multiple matched events per epoch --- chain/events/events_called.go | 12 ++++--- chain/events/events_test.go | 59 +++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 1a619c195..2fe6853eb 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -144,8 +144,10 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error { // Queue up calls until there have been enough blocks to reach // confidence on the message calls - for tid, data := range newCalls { - e.queueForConfidence(tid, data, nil, ts) + for tid, calls := range newCalls { + for _, data := range calls { + e.queueForConfidence(tid, data, nil, ts) + } } for at := e.lastTs.Height(); at <= ts.Height(); at++ { @@ -474,7 +476,7 @@ func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) mes } // Check if there are any new actor calls -func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventData, error) { +func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID][]eventData, error) { pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here if err != nil { log.Errorf("getting parent tipset in checkNewCalls: %s", err) @@ -485,7 +487,7 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat defer me.lk.RUnlock() // For each message in the tipset - res := make(map[triggerID]eventData) + res := make(map[triggerID][]eventData) me.messagesForTs(pts, func(msg *types.Message) { // TODO: provide receipts @@ -500,7 +502,7 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat // If there was a match, include the message in the results for the // trigger if matched { - res[tid] = msg + res[tid] = append(res[tid], msg) } } }) diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 0aab626dd..e18d5ba7c 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1323,3 +1323,62 @@ func TestStateChangedTimeout(t *testing.T) { fcs.advance(0, 5, nil) require.False(t, called) } + +func TestCalledMultiplePerEpoch(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + t0123, err := address.NewFromString("t0123") + require.NoError(t, err) + + at := 0 + + err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { + switch at { + case 0: + require.Equal(t, uint64(1), msg.Nonce) + require.Equal(t, abi.ChainEpoch(4), ts.Height()) + case 1: + require.Equal(t, uint64(2), msg.Nonce) + require.Equal(t, abi.ChainEpoch(4), ts.Height()) + default: + t.Fatal("apply should only get called twice, at: ", at) + } + at++ + return true, nil + }, func(_ context.Context, ts *types.TipSet) error { + switch at { + case 2: + require.Equal(t, abi.ChainEpoch(4), ts.Height()) + case 3: + require.Equal(t, abi.ChainEpoch(4), ts.Height()) + default: + t.Fatal("revert should only get called twice, at: ", at) + } + at++ + return nil + }, 3, 20, matchAddrMethod(t0123, 5)) + require.NoError(t, err) + + fcs.advance(0, 10, map[int]cid.Cid{ + 1: fcs.fakeMsgs(fakeMsg{ + bmsgs: []*types.Message{ + {To: t0123, From: t0123, Method: 5, Nonce: 1}, + {To: t0123, From: t0123, Method: 5, Nonce: 2}, + }, + }), + }) + + fcs.advance(9, 1, nil) +}