From 10357112e598cff583c562280acbac1b1e69f0f4 Mon Sep 17 00:00:00 2001 From: Ian Davis Date: Tue, 15 Nov 2022 17:28:26 +0000 Subject: [PATCH] Add basic itests for ethereum filter api --- api/eth_types.go | 32 +++--- api/eth_types_test.go | 34 ++++-- chain/events/filter/event.go | 25 ++++- chain/events/filter/mempool.go | 3 + chain/events/filter/tipset.go | 3 + ...ctor_events_test.go => eth_filter_test.go} | 102 ++++++++++++++++-- itests/kit/node_opts.go | 7 ++ node/impl/full/eth.go | 6 +- 8 files changed, 167 insertions(+), 45 deletions(-) rename itests/{actor_events_test.go => eth_filter_test.go} (59%) diff --git a/api/eth_types.go b/api/eth_types.go index 7f2e52dbb..22eedf2b1 100644 --- a/api/eth_types.go +++ b/api/eth_types.go @@ -484,6 +484,9 @@ type EthFilterSpec struct { type EthAddressList []EthAddress func (e *EthAddressList) UnmarshalJSON(b []byte) error { + if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) { + return nil + } if len(b) > 0 && b[0] == '[' { var addrs []EthAddress err := json.Unmarshal(b, &addrs) @@ -542,34 +545,29 @@ func (e *EthHashList) UnmarshalJSON(b []byte) error { return nil } -// FilterResult represents the response from executing a filter: a list of bloack hashes, a list of transaction hashes +// FilterResult represents the response from executing a filter: a list of block hashes, a list of transaction hashes // or a list of logs // This is a union type. Only one field will be populated. // The JSON encoding must produce an array of the populated field. type EthFilterResult struct { - // List of block hashes. Only populated when the filter has been installed via EthNewBlockFilter - NewBlockHashes []EthHash - - // List of transaction hashes. Only populated when the filter has been installed via EthNewPendingTransactionFilter - NewTransactionHashes []EthHash - - // List of event logs. Only populated when the filter has been installed via EthNewFilter - NewLogs []EthLog + Results []interface{} } func (h EthFilterResult) MarshalJSON() ([]byte, error) { - if h.NewBlockHashes != nil { - return json.Marshal(h.NewBlockHashes) - } - if h.NewTransactionHashes != nil { - return json.Marshal(h.NewTransactionHashes) - } - if h.NewLogs != nil { - return json.Marshal(h.NewLogs) + if h.Results != nil { + return json.Marshal(h.Results) } return []byte{'[', ']'}, nil } +func (h *EthFilterResult) UnmarshalJSON(b []byte) error { + if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) { + return nil + } + err := json.Unmarshal(b, &h.Results) + return err +} + // EthLog represents the results of an event filter execution. type EthLog struct { // Address is the address of the actor that produced the event log. diff --git a/api/eth_types_test.go b/api/eth_types_test.go index e11a31ce4..c7268c0bc 100644 --- a/api/eth_types_test.go +++ b/api/eth_types_test.go @@ -193,30 +193,33 @@ func TestEthFilterResultMarshalJSON(t *testing.T) { { res: EthFilterResult{ - NewBlockHashes: []EthHash{hash1, hash2}, + Results: []any{hash1, hash2}, }, want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`, }, { res: EthFilterResult{ - NewTransactionHashes: []EthHash{hash1, hash2}, + Results: []any{hash1, hash2}, }, want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`, }, { res: EthFilterResult{ - NewLogs: []EthLog{log}, + Results: []any{log}, }, want: `[` + string(logjson) + `]`, }, } for _, tc := range testcases { - data, err := json.Marshal(tc.res) - require.NoError(t, err) - require.Equal(t, tc.want, string(data)) + tc := tc + t.Run("", func(t *testing.T) { + data, err := json.Marshal(tc.res) + require.NoError(t, err) + require.Equal(t, tc.want, string(data)) + }) } } @@ -325,12 +328,23 @@ func TestEthAddressListUnmarshalJSON(t *testing.T) { input: `"0xd4c5fb16488Aa48081296299d54b0c648C9333dA"`, want: EthAddressList{addr1}, }, + { + input: `[]`, + want: EthAddressList{}, + }, + { + input: `null`, + want: EthAddressList(nil), + }, } for _, tc := range testcases { - var got EthAddressList - err := json.Unmarshal([]byte(tc.input), &got) - require.NoError(t, err) - require.Equal(t, tc.want, got) + tc := tc + t.Run("", func(t *testing.T) { + var got EthAddressList + err := json.Unmarshal([]byte(tc.input), &got) + require.NoError(t, err) + require.Equal(t, tc.want, got) + }) } } diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index c31c7f8d5..ba5be9bde 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -3,14 +3,17 @@ package filter import ( "bytes" "context" + "math" "sync" "time" "github.com/google/uuid" "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + amt4 "github.com/filecoin-project/go-amt-ipld/v4" "github.com/filecoin-project/go-state-types/abi" blockadt "github.com/filecoin-project/specs-actors/actors/util/adt" @@ -385,6 +388,9 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a } m.mu.Lock() + if m.filters == nil { + m.filters = make(map[string]*EventFilter) + } m.filters[id.String()] = f m.mu.Unlock() @@ -437,19 +443,30 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc continue } - evtArr, err := blockadt.AsArray(st, *rct.EventsRoot) + evtArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(5)) if err != nil { return nil, xerrors.Errorf("load events amt: %w", err) } - ems[i].evs = make([]*types.Event, evtArr.Length()) + ems[i].evs = make([]*types.Event, evtArr.Len()) var evt types.Event - _ = arr.ForEach(&evt, func(i int64) error { + err = evtArr.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error { + if u > math.MaxInt { + return xerrors.Errorf("too many events") + } + if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil { + return err + } + cpy := evt - ems[i].evs[int(i)] = &cpy + ems[i].evs[int(u)] = &cpy return nil }) + if err != nil { + return nil, xerrors.Errorf("read events: %w", err) + } + } return ems, nil diff --git a/chain/events/filter/mempool.go b/chain/events/filter/mempool.go index dcea0f54c..5be350644 100644 --- a/chain/events/filter/mempool.go +++ b/chain/events/filter/mempool.go @@ -124,6 +124,9 @@ func (m *MemPoolFilterManager) Install(ctx context.Context) (*MemPoolFilter, err } m.mu.Lock() + if m.filters == nil { + m.filters = make(map[string]*MemPoolFilter) + } m.filters[id.String()] = f m.mu.Unlock() diff --git a/chain/events/filter/tipset.go b/chain/events/filter/tipset.go index 1f43b09a3..0e43c96ef 100644 --- a/chain/events/filter/tipset.go +++ b/chain/events/filter/tipset.go @@ -111,6 +111,9 @@ func (m *TipSetFilterManager) Install(ctx context.Context) (*TipSetFilter, error } m.mu.Lock() + if m.filters == nil { + m.filters = make(map[string]*TipSetFilter) + } m.filters[id.String()] = f m.mu.Unlock() diff --git a/itests/actor_events_test.go b/itests/eth_filter_test.go similarity index 59% rename from itests/actor_events_test.go rename to itests/eth_filter_test.go index f0f05418e..7140cabca 100644 --- a/itests/actor_events_test.go +++ b/itests/eth_filter_test.go @@ -3,24 +3,28 @@ package itests import ( "context" + "encoding/hex" + "os" "testing" "time" "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/itests/kit" ) -func TestActorEventsMpool(t *testing.T) { +func TestEthNewPendingTransactionFilter(t *testing.T) { ctx := context.Background() kit.QuietMiningLogs() - client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI()) ens.InterconnectAll().BeginMining(10 * time.Millisecond) // create a new address where to send funds. @@ -92,15 +96,15 @@ func TestActorEventsMpool(t *testing.T) { require.NoError(t, err) // expect to have seen iteration number of mpool messages - require.Equal(t, iterations, len(res.NewTransactionHashes)) + require.Equal(t, iterations, len(res.Results)) } -func TestActorEventsTipsets(t *testing.T) { +func TestEthNewBlockFilter(t *testing.T) { ctx := context.Background() kit.QuietMiningLogs() - client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI()) ens.InterconnectAll().BeginMining(10 * time.Millisecond) // create a new address where to send funds. @@ -115,7 +119,7 @@ func TestActorEventsTipsets(t *testing.T) { filterID, err := client.EthNewBlockFilter(ctx) require.NoError(t, err) - const iterations = 100 + const iterations = 30 // we'll send half our balance (saving the other half for gas), // in `iterations` increments. @@ -133,10 +137,8 @@ func TestActorEventsTipsets(t *testing.T) { select { case headChanges := <-headChangeCh: for _, change := range headChanges { - if change.Type == store.HCApply { - msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) - require.NoError(t, err) - count += len(msgs) + if change.Type == store.HCApply || change.Type == store.HCRevert { + count++ if count == iterations { waitAllCh <- struct{}{} } @@ -172,5 +174,83 @@ func TestActorEventsTipsets(t *testing.T) { require.NoError(t, err) // expect to have seen iteration number of tipsets - require.Equal(t, iterations, len(res.NewBlockHashes)) + require.Equal(t, iterations, len(res.Results)) +} + +func TestEthNewFilterCatchAll(t *testing.T) { + require := require.New(t) + + kit.QuietMiningLogs() + + blockTime := 100 * time.Millisecond + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI()) + ens.InterconnectAll().BeginMining(blockTime) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + // install contract + contractHex, err := os.ReadFile("contracts/events.bin") + require.NoError(err) + + contract, err := hex.DecodeString(string(contractHex)) + require.NoError(err) + + fromAddr, err := client.WalletDefaultAddress(ctx) + require.NoError(err) + + result := client.EVM().DeployContract(ctx, fromAddr, contract) + + idAddr, err := address.NewIDAddress(result.ActorID) + require.NoError(err) + t.Logf("actor ID address is %s", idAddr) + + // install filter + filterID, err := client.EthNewFilter(ctx, &api.EthFilterSpec{}) + require.NoError(err) + + const iterations = 10 + + waitAllCh := make(chan struct{}) + go func() { + headChangeCh, err := client.ChainNotify(ctx) + require.NoError(err) + <-headChangeCh // skip hccurrent + + count := 0 + for { + select { + case headChanges := <-headChangeCh: + for _, change := range headChanges { + if change.Type == store.HCApply || change.Type == store.HCRevert { + count++ + if count == iterations*3 { + waitAllCh <- struct{}{} + } + } + } + } + } + }() + + time.Sleep(blockTime * 6) + + for i := 0; i < iterations; i++ { + // log a four topic event with data + ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil) + require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed") + } + + select { + case <-waitAllCh: + case <-time.After(time.Minute): + t.Errorf("timeout to wait for pack messages") + } + + // collect filter results + res, err := client.EthGetFilterChanges(ctx, filterID) + require.NoError(err) + + // expect to have seen iteration number of events + require.Equal(iterations, len(res.Results)) } diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 9c482700c..edec47a9f 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -280,3 +280,10 @@ func SplitstoreMessges() NodeOpt { return nil }) } + +func RealTimeFilterAPI() NodeOpt { + return WithCfgOpt(func(cfg *config.FullNode) error { + cfg.ActorEvent.EnableRealTimeFilterAPI = true + return nil + }) +} diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 463337d2c..be971baeb 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1293,7 +1293,7 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResu return nil, err } - res.NewLogs = append(res.NewLogs, log) + res.Results = append(res.Results, log) } return res, nil @@ -1312,7 +1312,7 @@ func ethFilterResultFromTipSets(tsks []types.TipSetKey) (*api.EthFilterResult, e return nil, err } - res.NewBlockHashes = append(res.NewBlockHashes, hash) + res.Results = append(res.Results, hash) } return res, nil @@ -1327,7 +1327,7 @@ func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) { return nil, err } - res.NewTransactionHashes = append(res.NewTransactionHashes, hash) + res.Results = append(res.Results, hash) } return res, nil