From db9a178ad24bff653ba0786d1396992bc81f9c20 Mon Sep 17 00:00:00 2001 From: Delweng Date: Thu, 25 May 2023 20:40:28 +0800 Subject: [PATCH] eth/filters: retrieve logs in async (#27135) This change implements async log retrievals via feeding logs in channels, instead of returning slices. This is a first step to implement #15063. --------- Signed-off-by: jsvisa Co-authored-by: Sina Mahmoodi Co-authored-by: Martin Holst Swende Co-authored-by: Sina Mahmoodi <1591639+s1na@users.noreply.github.com> --- eth/filters/filter.go | 168 ++++++++++------- eth/filters/filter_system_test.go | 4 +- eth/filters/filter_test.go | 301 +++++++++++++++++++++--------- 3 files changed, 320 insertions(+), 153 deletions(-) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 122676e96..7543c0603 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -106,32 +106,32 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } return f.blockLogs(ctx, header) } - // Short-cut if all we care about is pending logs - if f.begin == rpc.PendingBlockNumber.Int64() { - if f.end != rpc.PendingBlockNumber.Int64() { - return nil, errors.New("invalid block range") - } - return f.pendingLogs() - } - // Figure out the limits of the filter range - header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) - if header == nil { - return nil, nil - } + var ( - err error - head = header.Number.Int64() - pending = f.end == rpc.PendingBlockNumber.Int64() + beginPending = f.begin == rpc.PendingBlockNumber.Int64() + endPending = f.end == rpc.PendingBlockNumber.Int64() ) + + // special case for pending logs + if beginPending && !endPending { + return nil, errors.New("invalid block range") + } + + // Short-cut if all we care about is pending logs + if beginPending && endPending { + return f.pendingLogs(), nil + } + resolveSpecial := func(number int64) (int64, error) { var hdr *types.Header switch number { - case rpc.LatestBlockNumber.Int64(): - return head, nil - case rpc.PendingBlockNumber.Int64(): + case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64(): // we should return head here since we've already captured // that we need to get the pending logs in the pending boolean above - return head, nil + hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + if hdr == nil { + return 0, errors.New("latest header not found") + } case rpc.FinalizedBlockNumber.Int64(): hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber) if hdr == nil { @@ -147,57 +147,92 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } return hdr.Number.Int64(), nil } + + var err error + // range query need to resolve the special begin/end block number if f.begin, err = resolveSpecial(f.begin); err != nil { return nil, err } if f.end, err = resolveSpecial(f.end); err != nil { return nil, err } - // Gather all indexed logs, and finish with non indexed ones + + logChan, errChan := f.rangeLogsAsync(ctx) + var logs []*types.Log + for { + select { + case log := <-logChan: + logs = append(logs, log) + case err := <-errChan: + if err != nil { + // if an error occurs during extraction, we do return the extracted data + return logs, err + } + // Append the pending ones + if endPending { + pendingLogs := f.pendingLogs() + logs = append(logs, pendingLogs...) + } + return logs, nil + } + } +} + +// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously, +// it creates and returns two channels: one for delivering log data, and one for reporting errors. +func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan error) { var ( - logs []*types.Log - end = uint64(f.end) - size, sections = f.sys.backend.BloomStatus() + logChan = make(chan *types.Log) + errChan = make(chan error) ) - if indexed := sections * size; indexed > uint64(f.begin) { - if indexed > end { - logs, err = f.indexedLogs(ctx, end) - } else { - logs, err = f.indexedLogs(ctx, indexed-1) + + go func() { + defer func() { + close(errChan) + close(logChan) + }() + + // Gather all indexed logs, and finish with non indexed ones + var ( + end = uint64(f.end) + size, sections = f.sys.backend.BloomStatus() + err error + ) + if indexed := sections * size; indexed > uint64(f.begin) { + if indexed > end { + indexed = end + 1 + } + if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil { + errChan <- err + return + } } - if err != nil { - return logs, err + + if err := f.unindexedLogs(ctx, end, logChan); err != nil { + errChan <- err + return } - } - rest, err := f.unindexedLogs(ctx, end) - logs = append(logs, rest...) - if pending { - pendingLogs, err := f.pendingLogs() - if err != nil { - return nil, err - } - logs = append(logs, pendingLogs...) - } - return logs, err + + errChan <- nil + }() + + return logChan, errChan } // indexedLogs returns the logs matching the filter criteria based on the bloom // bits indexed available locally or via the network. -func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { +func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error { // Create a matcher session and request servicing from the backend matches := make(chan uint64, 64) session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches) if err != nil { - return nil, err + return err } defer session.Close() f.sys.backend.ServiceFilter(ctx, session) - // Iterate over the matches until exhausted or context closed - var logs []*types.Log - for { select { case number, ok := <-matches: @@ -207,47 +242,50 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err if err == nil { f.begin = int64(end) + 1 } - return logs, err + return err } f.begin = int64(number) + 1 // Retrieve the suggested block and pull any truly matching logs header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) if header == nil || err != nil { - return logs, err + return err } found, err := f.checkMatches(ctx, header) if err != nil { - return logs, err + return err + } + for _, log := range found { + logChan <- log } - logs = append(logs, found...) case <-ctx.Done(): - return logs, ctx.Err() + return ctx.Err() } } } // unindexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. -func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { - var logs []*types.Log - +func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error { for ; f.begin <= int64(end); f.begin++ { - if f.begin%10 == 0 && ctx.Err() != nil { - return logs, ctx.Err() - } header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) if header == nil || err != nil { - return logs, err + return err } found, err := f.blockLogs(ctx, header) if err != nil { - return logs, err + return err + } + for _, log := range found { + select { + case logChan <- log: + case <-ctx.Done(): + return ctx.Err() + } } - logs = append(logs, found...) } - return logs, nil + return nil } // blockLogs returns the logs matching the filter criteria within a single block. @@ -294,19 +332,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ } // pendingLogs returns the logs matching the filter criteria within the pending block. -func (f *Filter) pendingLogs() ([]*types.Log, error) { +func (f *Filter) pendingLogs() []*types.Log { block, receipts := f.sys.backend.PendingBlockAndReceipts() if block == nil { - return nil, errors.New("pending state not available") + return nil } if bloomFilter(block.Bloom(), f.addresses, f.topics) { var unfiltered []*types.Log for _, r := range receipts { unfiltered = append(unfiltered, r.Logs...) } - return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil + return filterLogs(unfiltered, nil, nil, f.addresses, f.topics) } - return nil, nil + return nil } func includes(addresses []common.Address, a common.Address) bool { diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index ae0c83eb2..8f06dff1c 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -50,6 +50,8 @@ type testBackend struct { rmLogsFeed event.Feed pendingLogsFeed event.Feed chainFeed event.Feed + pendingBlock *types.Block + pendingReceipts types.Receipts } func (b *testBackend) ChainConfig() *params.ChainConfig { @@ -124,7 +126,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint } func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { - return nil, nil + return b.pendingBlock, b.pendingReceipts } func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 0e42c798e..8eea0a267 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -18,18 +18,23 @@ package filters import ( "context" + "encoding/json" "math/big" - "reflect" + "strings" "testing" + "time" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/trie" ) func makeReceipt(addr common.Address) *types.Receipt { @@ -103,142 +108,264 @@ func BenchmarkFilters(b *testing.B) { func TestFilters(t *testing.T) { var ( - db, _ = rawdb.NewLevelDBDatabase(t.TempDir(), 0, 0, "", false) - _, sys = newTestFilterSystem(t, db, Config{}) + db = rawdb.NewMemoryDatabase() + _, sys = newTestFilterSystem(t, db, Config{}) + // Sender account key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr = crypto.PubkeyToAddress(key1.PublicKey) + signer = types.NewLondonSigner(big.NewInt(1)) + // Logging contract + contract = common.Address{0xfe} + contract2 = common.Address{0xff} + abiStr = `[{"inputs":[],"name":"log0","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"t1","type":"uint256"}],"name":"log1","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"t1","type":"uint256"},{"internalType":"uint256","name":"t2","type":"uint256"}],"name":"log2","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"t1","type":"uint256"},{"internalType":"uint256","name":"t2","type":"uint256"},{"internalType":"uint256","name":"t3","type":"uint256"}],"name":"log3","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"t1","type":"uint256"},{"internalType":"uint256","name":"t2","type":"uint256"},{"internalType":"uint256","name":"t3","type":"uint256"},{"internalType":"uint256","name":"t4","type":"uint256"}],"name":"log4","outputs":[],"stateMutability":"nonpayable","type":"function"}]` + /* + // SPDX-License-Identifier: GPL-3.0 + pragma solidity >=0.7.0 <0.9.0; + + contract Logger { + function log0() external { + assembly { + log0(0, 0) + } + } + + function log1(uint t1) external { + assembly { + log1(0, 0, t1) + } + } + + function log2(uint t1, uint t2) external { + assembly { + log2(0, 0, t1, t2) + } + } + + function log3(uint t1, uint t2, uint t3) external { + assembly { + log3(0, 0, t1, t2, t3) + } + } + + function log4(uint t1, uint t2, uint t3, uint t4) external { + assembly { + log4(0, 0, t1, t2, t3, t4) + } + } + } + */ + bytecode = common.FromHex("608060405234801561001057600080fd5b50600436106100575760003560e01c80630aa731851461005c5780632a4c08961461006657806378b9a1f314610082578063c670f8641461009e578063c683d6a3146100ba575b600080fd5b6100646100d6565b005b610080600480360381019061007b9190610143565b6100dc565b005b61009c60048036038101906100979190610196565b6100e8565b005b6100b860048036038101906100b391906101d6565b6100f2565b005b6100d460048036038101906100cf9190610203565b6100fa565b005b600080a0565b808284600080a3505050565b8082600080a25050565b80600080a150565b80828486600080a450505050565b600080fd5b6000819050919050565b6101208161010d565b811461012b57600080fd5b50565b60008135905061013d81610117565b92915050565b60008060006060848603121561015c5761015b610108565b5b600061016a8682870161012e565b935050602061017b8682870161012e565b925050604061018c8682870161012e565b9150509250925092565b600080604083850312156101ad576101ac610108565b5b60006101bb8582860161012e565b92505060206101cc8582860161012e565b9150509250929050565b6000602082840312156101ec576101eb610108565b5b60006101fa8482850161012e565b91505092915050565b6000806000806080858703121561021d5761021c610108565b5b600061022b8782880161012e565b945050602061023c8782880161012e565b935050604061024d8782880161012e565b925050606061025e8782880161012e565b9150509295919450925056fea264697066735822122073a4b156f487e59970dc1ef449cc0d51467268f676033a17188edafcee861f9864736f6c63430008110033") hash1 = common.BytesToHash([]byte("topic1")) hash2 = common.BytesToHash([]byte("topic2")) hash3 = common.BytesToHash([]byte("topic3")) hash4 = common.BytesToHash([]byte("topic4")) + hash5 = common.BytesToHash([]byte("topic5")) gspec = &core.Genesis{ - Config: params.TestChainConfig, - Alloc: core.GenesisAlloc{addr: {Balance: big.NewInt(1000000)}}, + Config: params.TestChainConfig, + Alloc: core.GenesisAlloc{ + addr: {Balance: big.NewInt(0).Mul(big.NewInt(100), big.NewInt(params.Ether))}, + contract: {Balance: big.NewInt(0), Code: bytecode}, + contract2: {Balance: big.NewInt(0), Code: bytecode}, + }, BaseFee: big.NewInt(params.InitialBaseFee), } ) - defer db.Close() - _, chain, receipts := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), 1000, func(i int, gen *core.BlockGen) { + contractABI, err := abi.JSON(strings.NewReader(abiStr)) + if err != nil { + t.Fatal(err) + } + + // Hack: GenerateChainWithGenesis creates a new db. + // Commit the genesis manually and use GenerateChain. + _, err = gspec.Commit(db, trie.NewDatabase(db)) + if err != nil { + t.Fatal(err) + } + chain, _ := core.GenerateChain(gspec.Config, gspec.ToBlock(), ethash.NewFaker(), db, 1000, func(i int, gen *core.BlockGen) { switch i { case 1: - receipt := types.NewReceipt(nil, false, 0) - receipt.Logs = []*types.Log{ - { - Address: addr, - Topics: []common.Hash{hash1}, - }, + data, err := contractABI.Pack("log1", hash1.Big()) + if err != nil { + t.Fatal(err) } - gen.AddUncheckedReceipt(receipt) - gen.AddUncheckedTx(types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(1), 1, gen.BaseFee(), nil)) + tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{ + Nonce: 0, + GasPrice: gen.BaseFee(), + Gas: 30000, + To: &contract, + Data: data, + }), signer, key1) + gen.AddTx(tx) + tx2, _ := types.SignTx(types.NewTx(&types.LegacyTx{ + Nonce: 1, + GasPrice: gen.BaseFee(), + Gas: 30000, + To: &contract2, + Data: data, + }), signer, key1) + gen.AddTx(tx2) case 2: - receipt := types.NewReceipt(nil, false, 0) - receipt.Logs = []*types.Log{ - { - Address: addr, - Topics: []common.Hash{hash2}, - }, + data, err := contractABI.Pack("log2", hash2.Big(), hash1.Big()) + if err != nil { + t.Fatal(err) } - gen.AddUncheckedReceipt(receipt) - gen.AddUncheckedTx(types.NewTransaction(2, common.HexToAddress("0x2"), big.NewInt(2), 2, gen.BaseFee(), nil)) - + tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{ + Nonce: 2, + GasPrice: gen.BaseFee(), + Gas: 30000, + To: &contract, + Data: data, + }), signer, key1) + gen.AddTx(tx) case 998: - receipt := types.NewReceipt(nil, false, 0) - receipt.Logs = []*types.Log{ - { - Address: addr, - Topics: []common.Hash{hash3}, - }, + data, err := contractABI.Pack("log1", hash3.Big()) + if err != nil { + t.Fatal(err) } - gen.AddUncheckedReceipt(receipt) - gen.AddUncheckedTx(types.NewTransaction(998, common.HexToAddress("0x998"), big.NewInt(998), 998, gen.BaseFee(), nil)) + tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{ + Nonce: 3, + GasPrice: gen.BaseFee(), + Gas: 30000, + To: &contract2, + Data: data, + }), signer, key1) + gen.AddTx(tx) case 999: - receipt := types.NewReceipt(nil, false, 0) - receipt.Logs = []*types.Log{ - { - Address: addr, - Topics: []common.Hash{hash4}, - }, + data, err := contractABI.Pack("log1", hash4.Big()) + if err != nil { + t.Fatal(err) } - gen.AddUncheckedReceipt(receipt) - gen.AddUncheckedTx(types.NewTransaction(999, common.HexToAddress("0x999"), big.NewInt(999), 999, gen.BaseFee(), nil)) + tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{ + Nonce: 4, + GasPrice: gen.BaseFee(), + Gas: 30000, + To: &contract, + Data: data, + }), signer, key1) + gen.AddTx(tx) } }) - // The test txs are not properly signed, can't simply create a chain - // and then import blocks. TODO(rjl493456442) try to get rid of the - // manual database writes. - gspec.MustCommit(db) - for i, block := range chain { - rawdb.WriteBlock(db, block) - rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) - rawdb.WriteHeadBlockHash(db, block.Hash()) - rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) + var l uint64 + bc, err := core.NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l) + if err != nil { + t.Fatal(err) + } + _, err = bc.InsertChain(chain) + if err != nil { + t.Fatal(err) } // Set block 998 as Finalized (-3) - rawdb.WriteFinalizedBlockHash(db, chain[998].Hash()) + bc.SetFinalized(chain[998].Header()) - filter := sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) - logs, _ := filter.Logs(context.Background()) - if len(logs) != 4 { - t.Error("expected 4 log, got", len(logs)) - } + // Generate pending block + pchain, preceipts := core.GenerateChain(gspec.Config, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *core.BlockGen) { + data, err := contractABI.Pack("log1", hash5.Big()) + if err != nil { + t.Fatal(err) + } + tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{ + Nonce: 5, + GasPrice: gen.BaseFee(), + Gas: 30000, + To: &contract, + Data: data, + }), signer, key1) + gen.AddTx(tx) + }) + sys.backend.(*testBackend).pendingBlock = pchain[0] + sys.backend.(*testBackend).pendingReceipts = preceipts[0] for i, tc := range []struct { - f *Filter - wantHashes []common.Hash + f *Filter + want string + err string }{ { - sys.NewRangeFilter(900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}), - []common.Hash{hash3}, + f: sys.NewBlockFilter(chain[2].Hash(), []common.Address{contract}, nil), + want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696332","0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x3","transactionHash":"0xdefe471992a07a02acdfbe33edaae22fbb86d7d3cec3f1b8e4e77702fb3acc1d","transactionIndex":"0x0","blockHash":"0x7a7556792ca7d37882882e2b001fe14833eaf81c2c7f865c9c771ec37a024f6b","logIndex":"0x0","removed":false}]`, }, { - sys.NewRangeFilter(990, int64(rpc.LatestBlockNumber), []common.Address{addr}, [][]common.Hash{{hash3}}), - []common.Hash{hash3}, + f: sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{contract}, [][]common.Hash{{hash1, hash2, hash3, hash4}}), + want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x2","transactionHash":"0xa8028c655b6423204c8edfbc339f57b042d6bec2b6a61145d76b7c08b4cccd42","transactionIndex":"0x0","blockHash":"0x24417bb49ce44cfad65da68f33b510bf2a129c0d89ccf06acb6958b8585ccf34","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696332","0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x3","transactionHash":"0xdefe471992a07a02acdfbe33edaae22fbb86d7d3cec3f1b8e4e77702fb3acc1d","transactionIndex":"0x0","blockHash":"0x7a7556792ca7d37882882e2b001fe14833eaf81c2c7f865c9c771ec37a024f6b","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x9a87842100a638dfa5da8842b4beda691d2fd77b0c84b57f24ecfa9fb208f747","transactionIndex":"0x0","blockHash":"0xb360bad5265261c075ece02d3bf0e39498a6a76310482cdfd90588748e6c5ee0","logIndex":"0x0","removed":false}]`, }, { - sys.NewRangeFilter(1, 10, nil, [][]common.Hash{{hash1, hash2}}), - []common.Hash{hash1, hash2}, + f: sys.NewRangeFilter(900, 999, []common.Address{contract}, [][]common.Hash{{hash3}}), }, { - sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), nil, [][]common.Hash{{common.BytesToHash([]byte("fail"))}}), - nil, + f: sys.NewRangeFilter(990, int64(rpc.LatestBlockNumber), []common.Address{contract2}, [][]common.Hash{{hash3}}), + want: `[{"address":"0xff00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696333"],"data":"0x","blockNumber":"0x3e7","transactionHash":"0x53e3675800c6908424b61b35a44e51ca4c73ca603e58a65b32c67968b4f42200","transactionIndex":"0x0","blockHash":"0x2e4620a2b426b0612ec6cad9603f466723edaed87f98c9137405dd4f7a2409ff","logIndex":"0x0","removed":false}]`, }, { - sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{common.BytesToAddress([]byte("failmenow"))}, nil), - nil, + f: sys.NewRangeFilter(1, 10, []common.Address{contract}, [][]common.Hash{{hash2}, {hash1}}), + want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696332","0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x3","transactionHash":"0xdefe471992a07a02acdfbe33edaae22fbb86d7d3cec3f1b8e4e77702fb3acc1d","transactionIndex":"0x0","blockHash":"0x7a7556792ca7d37882882e2b001fe14833eaf81c2c7f865c9c771ec37a024f6b","logIndex":"0x0","removed":false}]`, }, { - sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), nil, [][]common.Hash{{common.BytesToHash([]byte("fail"))}, {hash1}}), - nil, + f: sys.NewRangeFilter(1, 10, nil, [][]common.Hash{{hash1, hash2}}), + want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x2","transactionHash":"0xa8028c655b6423204c8edfbc339f57b042d6bec2b6a61145d76b7c08b4cccd42","transactionIndex":"0x0","blockHash":"0x24417bb49ce44cfad65da68f33b510bf2a129c0d89ccf06acb6958b8585ccf34","logIndex":"0x0","removed":false},{"address":"0xff00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x2","transactionHash":"0xdba3e2ea9a7d690b722d70ee605fd67ba4c00d1d3aecd5cf187a7b92ad8eb3df","transactionIndex":"0x1","blockHash":"0x24417bb49ce44cfad65da68f33b510bf2a129c0d89ccf06acb6958b8585ccf34","logIndex":"0x1","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696332","0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x3","transactionHash":"0xdefe471992a07a02acdfbe33edaae22fbb86d7d3cec3f1b8e4e77702fb3acc1d","transactionIndex":"0x0","blockHash":"0x7a7556792ca7d37882882e2b001fe14833eaf81c2c7f865c9c771ec37a024f6b","logIndex":"0x0","removed":false}]`, }, { - sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber), nil, nil), []common.Hash{hash4}, + f: sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), nil, [][]common.Hash{{common.BytesToHash([]byte("fail"))}}), }, { - sys.NewRangeFilter(int64(rpc.FinalizedBlockNumber), int64(rpc.LatestBlockNumber), nil, nil), []common.Hash{hash3, hash4}, + f: sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{common.BytesToAddress([]byte("failmenow"))}, nil), }, { - sys.NewRangeFilter(int64(rpc.FinalizedBlockNumber), int64(rpc.FinalizedBlockNumber), nil, nil), []common.Hash{hash3}, + f: sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), nil, [][]common.Hash{{common.BytesToHash([]byte("fail"))}, {hash1}}), }, { - sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.FinalizedBlockNumber), nil, nil), nil, + f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber), nil, nil), + want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x9a87842100a638dfa5da8842b4beda691d2fd77b0c84b57f24ecfa9fb208f747","transactionIndex":"0x0","blockHash":"0xb360bad5265261c075ece02d3bf0e39498a6a76310482cdfd90588748e6c5ee0","logIndex":"0x0","removed":false}]`, }, { - sys.NewRangeFilter(int64(rpc.SafeBlockNumber), int64(rpc.LatestBlockNumber), nil, nil), nil, + f: sys.NewRangeFilter(int64(rpc.FinalizedBlockNumber), int64(rpc.LatestBlockNumber), nil, nil), + want: `[{"address":"0xff00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696333"],"data":"0x","blockNumber":"0x3e7","transactionHash":"0x53e3675800c6908424b61b35a44e51ca4c73ca603e58a65b32c67968b4f42200","transactionIndex":"0x0","blockHash":"0x2e4620a2b426b0612ec6cad9603f466723edaed87f98c9137405dd4f7a2409ff","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x9a87842100a638dfa5da8842b4beda691d2fd77b0c84b57f24ecfa9fb208f747","transactionIndex":"0x0","blockHash":"0xb360bad5265261c075ece02d3bf0e39498a6a76310482cdfd90588748e6c5ee0","logIndex":"0x0","removed":false}]`, }, { - sys.NewRangeFilter(int64(rpc.SafeBlockNumber), int64(rpc.SafeBlockNumber), nil, nil), nil, + f: sys.NewRangeFilter(int64(rpc.FinalizedBlockNumber), int64(rpc.FinalizedBlockNumber), nil, nil), + want: `[{"address":"0xff00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696333"],"data":"0x","blockNumber":"0x3e7","transactionHash":"0x53e3675800c6908424b61b35a44e51ca4c73ca603e58a65b32c67968b4f42200","transactionIndex":"0x0","blockHash":"0x2e4620a2b426b0612ec6cad9603f466723edaed87f98c9137405dd4f7a2409ff","logIndex":"0x0","removed":false}]`, }, { - sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.SafeBlockNumber), nil, nil), nil, + f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.FinalizedBlockNumber), nil, nil), }, { - sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), nil, + f: sys.NewRangeFilter(int64(rpc.SafeBlockNumber), int64(rpc.LatestBlockNumber), nil, nil), + err: "safe header not found", + }, { + f: sys.NewRangeFilter(int64(rpc.SafeBlockNumber), int64(rpc.SafeBlockNumber), nil, nil), + err: "safe header not found", + }, { + f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.SafeBlockNumber), nil, nil), + err: "safe header not found", + }, { + f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), + want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x4110587c1b8d86edc85dce929a34127f1cb8809515a9f177c91c866de3eb0638","transactionIndex":"0x0","blockHash":"0xc7245899e5817f16fa99cf5ad2d9c1e4b98443a565a673ec9c764640443ef037","logIndex":"0x0","removed":false}]`, + }, { + f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), + want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x9a87842100a638dfa5da8842b4beda691d2fd77b0c84b57f24ecfa9fb208f747","transactionIndex":"0x0","blockHash":"0xb360bad5265261c075ece02d3bf0e39498a6a76310482cdfd90588748e6c5ee0","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x4110587c1b8d86edc85dce929a34127f1cb8809515a9f177c91c866de3eb0638","transactionIndex":"0x0","blockHash":"0xc7245899e5817f16fa99cf5ad2d9c1e4b98443a565a673ec9c764640443ef037","logIndex":"0x0","removed":false}]`, + }, { + f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.LatestBlockNumber), nil, nil), + err: "invalid block range", }, } { - logs, _ := tc.f.Logs(context.Background()) - var haveHashes []common.Hash - for _, l := range logs { - haveHashes = append(haveHashes, l.Topics[0]) + logs, err := tc.f.Logs(context.Background()) + if err == nil && tc.err != "" { + t.Fatalf("test %d, expected error %q, got nil", i, tc.err) + } else if err != nil && err.Error() != tc.err { + t.Fatalf("test %d, expected error %q, got %q", i, tc.err, err.Error()) } - if have, want := len(haveHashes), len(tc.wantHashes); have != want { - t.Fatalf("test %d, have %d logs, want %d", i, have, want) - } - if len(haveHashes) == 0 { + if tc.want == "" && len(logs) == 0 { continue } - if !reflect.DeepEqual(tc.wantHashes, haveHashes) { - t.Fatalf("test %d, have %v want %v", i, haveHashes, tc.wantHashes) + have, err := json.Marshal(logs) + if err != nil { + t.Fatal(err) + } + if string(have) != tc.want { + t.Fatalf("test %d, have:\n%s\nwant:\n%s", i, have, tc.want) } } + + t.Run("timeout", func(t *testing.T) { + f := sys.NewRangeFilter(0, -1, nil, nil) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour)) + defer cancel() + _, err := f.Logs(ctx) + if err == nil { + t.Fatal("expected error") + } + if err != context.DeadlineExceeded { + t.Fatalf("expected context.DeadlineExceeded, got %v", err) + } + }) }