evm, rpc: apply bloom filter when querying ethlogs with a range of blocks (#587)
* apply the bloom filter when query the ethlogs with a range of blocks * fix lint * error handling in calcBloomIVs * print error log in createBloomFilters * update changelog Co-authored-by: Federico Kunze Küllmer <31522760+fedekunze@users.noreply.github.com>
This commit is contained in:
parent
8d3381a5f9
commit
b42e187060
@ -65,6 +65,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
|
||||
* (evm) [tharsis#461](https://github.com/tharsis/ethermint/pull/461) Increase performance of `StateDB` transaction log storage (r/w).
|
||||
* (evm) [tharsis#566](https://github.com/tharsis/ethermint/pull/566) Introduce `stateErr` store in `StateDB` to avoid meaningless operations if any error happened before
|
||||
* (rpc, evm) [tharsis#587](https://github.com/tharsis/ethermint/pull/587) Apply bloom filter when query ethlogs with range of blocks
|
||||
* (evm) [tharsis#586](https://github.com/tharsis/ethermint/pull/586) Benchmark evm keeper
|
||||
|
||||
## [v0.5.0] - 2021-08-20
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
ethtypes "github.com/ethereum/go-ethereum/core/types"
|
||||
|
||||
"github.com/tharsis/ethermint/ethereum/rpc/namespaces/eth/filters"
|
||||
"github.com/tharsis/ethermint/ethereum/rpc/types"
|
||||
"github.com/tharsis/ethermint/server/config"
|
||||
ethermint "github.com/tharsis/ethermint/types"
|
||||
@ -57,6 +58,7 @@ type Backend interface {
|
||||
EstimateGas(args evmtypes.CallArgs, blockNrOptional *types.BlockNumber) (hexutil.Uint64, error)
|
||||
RPCGasCap() uint64
|
||||
RPCMinGasPrice() int64
|
||||
GetFilteredBlocks(from int64, to int64, filter [][]filters.BloomIV, filterAddresses bool) ([]int64, error)
|
||||
}
|
||||
|
||||
var _ Backend = (*EVMBackend)(nil)
|
||||
@ -713,3 +715,65 @@ func (e *EVMBackend) RPCMinGasPrice() int64 {
|
||||
|
||||
return ethermint.DefaultGasPrice
|
||||
}
|
||||
|
||||
// GetFilteredBlocks returns the block height list match the given bloom filters.
|
||||
func (e *EVMBackend) GetFilteredBlocks(
|
||||
from int64,
|
||||
to int64,
|
||||
filters [][]filters.BloomIV,
|
||||
filterAddresses bool,
|
||||
) ([]int64, error) {
|
||||
matchedBlocks := make([]int64, 0)
|
||||
|
||||
BLOCKS:
|
||||
for height := from; height <= to; height++ {
|
||||
if err := e.ctx.Err(); err != nil {
|
||||
e.logger.Error("EVMBackend context error", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h := height
|
||||
bloom, err := e.BlockBloom(&h)
|
||||
if err != nil {
|
||||
e.logger.Error("retrieve header failed", "blockHeight", height, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, filter := range filters {
|
||||
// filter the header bloom with the addresses
|
||||
if filterAddresses && i == 0 {
|
||||
if !checkMatches(bloom, filter) {
|
||||
continue BLOCKS
|
||||
}
|
||||
|
||||
// the filter doesn't have any topics
|
||||
if len(filters) == 1 {
|
||||
matchedBlocks = append(matchedBlocks, height)
|
||||
continue BLOCKS
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// filter the bloom with topics
|
||||
if len(filter) > 0 && !checkMatches(bloom, filter) {
|
||||
continue BLOCKS
|
||||
}
|
||||
}
|
||||
matchedBlocks = append(matchedBlocks, height)
|
||||
}
|
||||
|
||||
return matchedBlocks, nil
|
||||
}
|
||||
|
||||
// checkMatches revised the function from
|
||||
// https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L88
|
||||
func checkMatches(bloom ethtypes.Bloom, filter []filters.BloomIV) bool {
|
||||
for _, bloomIV := range filter {
|
||||
if bloomIV.V[0] == bloomIV.V[0]&bloom[bloomIV.I[0]] &&
|
||||
bloomIV.V[1] == bloomIV.V[1]&bloom[bloomIV.I[1]] &&
|
||||
bloomIV.V[2] == bloomIV.V[2]&bloom[bloomIV.I[2]] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -32,6 +32,8 @@ type Backend interface {
|
||||
|
||||
GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error)
|
||||
BloomStatus() (uint64, uint64)
|
||||
|
||||
GetFilteredBlocks(from int64, to int64, bloomIndexes [][]BloomIV, filterAddresses bool) ([]int64, error)
|
||||
}
|
||||
|
||||
// consider a filter inactive if it has not been polled for within deadline
|
||||
|
@ -2,7 +2,7 @@ package filters
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"encoding/binary"
|
||||
"math/big"
|
||||
|
||||
"github.com/tharsis/ethermint/ethereum/rpc/types"
|
||||
@ -11,17 +11,25 @@ import (
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/bloombits"
|
||||
ethtypes "github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/eth/filters"
|
||||
)
|
||||
|
||||
// BloomIV represents the bit indexes and value inside the bloom filter that belong
|
||||
// to some key.
|
||||
type BloomIV struct {
|
||||
I [3]uint
|
||||
V [3]byte
|
||||
}
|
||||
|
||||
// Filter can be used to retrieve and filter logs.
|
||||
type Filter struct {
|
||||
logger log.Logger
|
||||
backend Backend
|
||||
criteria filters.FilterCriteria
|
||||
matcher *bloombits.Matcher
|
||||
|
||||
bloomFilters [][]BloomIV // Filter the system is matching for
|
||||
}
|
||||
|
||||
// NewBlockFilter creates a new filter which directly inspects the contents of
|
||||
@ -54,8 +62,6 @@ func NewRangeFilter(logger log.Logger, backend Backend, begin, end int64, addres
|
||||
filtersBz = append(filtersBz, filter)
|
||||
}
|
||||
|
||||
size, _ := backend.BloomStatus()
|
||||
|
||||
// Create a generic filter and convert it into a range filter
|
||||
criteria := filters.FilterCriteria{
|
||||
FromBlock: big.NewInt(begin),
|
||||
@ -64,16 +70,16 @@ func NewRangeFilter(logger log.Logger, backend Backend, begin, end int64, addres
|
||||
Topics: topics,
|
||||
}
|
||||
|
||||
return newFilter(logger, backend, criteria, bloombits.NewMatcher(size, filtersBz))
|
||||
return newFilter(logger, backend, criteria, createBloomFilters(filtersBz, logger))
|
||||
}
|
||||
|
||||
// newFilter returns a new Filter
|
||||
func newFilter(logger log.Logger, backend Backend, criteria filters.FilterCriteria, matcher *bloombits.Matcher) *Filter {
|
||||
func newFilter(logger log.Logger, backend Backend, criteria filters.FilterCriteria, bloomFilters [][]BloomIV) *Filter {
|
||||
return &Filter{
|
||||
logger: logger,
|
||||
backend: backend,
|
||||
criteria: criteria,
|
||||
matcher: matcher,
|
||||
bloomFilters: bloomFilters,
|
||||
}
|
||||
}
|
||||
|
||||
@ -132,52 +138,25 @@ func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) {
|
||||
f.criteria.ToBlock = big.NewInt(head + maxToOverhang)
|
||||
}
|
||||
|
||||
for i := f.criteria.FromBlock.Int64(); i <= f.criteria.ToBlock.Int64(); i++ {
|
||||
block, err := f.backend.GetBlockByNumber(types.BlockNumber(i), false)
|
||||
from := f.criteria.FromBlock.Int64()
|
||||
to := f.criteria.ToBlock.Int64()
|
||||
|
||||
blocks, err := f.backend.GetFilteredBlocks(from, to, f.bloomFilters, len(f.criteria.Addresses) > 0)
|
||||
if err != nil {
|
||||
return logs, errors.Wrapf(err, "failed to fetch block by number %d", i)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if block["transactions"] == nil {
|
||||
continue
|
||||
for _, height := range blocks {
|
||||
ethLogs, err := f.backend.GetLogsByNumber(types.BlockNumber(height))
|
||||
if err != nil {
|
||||
return logs, errors.Wrapf(err, "failed to fetch block by number %d", height)
|
||||
}
|
||||
|
||||
var txHashes []common.Hash
|
||||
|
||||
txs, ok := block["transactions"].([]interface{})
|
||||
if !ok {
|
||||
_, ok = block["transactions"].([]common.Hash)
|
||||
if !ok {
|
||||
f.logger.Error(
|
||||
"reading transactions from block data failed",
|
||||
"type", fmt.Sprintf("%T", block["transactions"]),
|
||||
)
|
||||
for _, ethLog := range ethLogs {
|
||||
filtered := FilterLogs(ethLog, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
|
||||
logs = append(logs, filtered...)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if len(txs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, tx := range txs {
|
||||
txHash, ok := tx.(common.Hash)
|
||||
if !ok {
|
||||
f.logger.Error(
|
||||
"transactions list contains non-hash element",
|
||||
"type", fmt.Sprintf("%T", tx),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
txHashes = append(txHashes, txHash)
|
||||
}
|
||||
|
||||
logsMatched := f.checkMatches(txHashes)
|
||||
logs = append(logs, logsMatched...)
|
||||
}
|
||||
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
@ -207,21 +186,64 @@ func (f *Filter) blockLogs(header *ethtypes.Header) ([]*ethtypes.Log, error) {
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
// checkMatches checks if the logs from the a list of transactions transaction
|
||||
// contain any log events that match the filter criteria. This function is
|
||||
// called when the bloom filter signals a potential match.
|
||||
func (f *Filter) checkMatches(transactions []common.Hash) []*ethtypes.Log {
|
||||
unfiltered := []*ethtypes.Log{}
|
||||
for _, tx := range transactions {
|
||||
logs, err := f.backend.GetTransactionLogs(tx)
|
||||
if err != nil {
|
||||
// ignore error if transaction didn't set any logs (eg: when tx type is not
|
||||
// MsgEthereumTx or MsgEthermint)
|
||||
func createBloomFilters(filters [][][]byte, logger log.Logger) [][]BloomIV {
|
||||
bloomFilters := make([][]BloomIV, 0)
|
||||
for _, filter := range filters {
|
||||
// Gather the bit indexes of the filter rule, special casing the nil filter
|
||||
if len(filter) == 0 {
|
||||
continue
|
||||
}
|
||||
bloomIVs := make([]BloomIV, len(filter))
|
||||
|
||||
unfiltered = append(unfiltered, logs...)
|
||||
// Transform the filter rules (the addresses and topics) to the bloom index and value arrays
|
||||
// So it can be used to compare with the bloom of the block header. If the rule has any nil
|
||||
// clauses. The rule will be ignored.
|
||||
for i, clause := range filter {
|
||||
if clause == nil {
|
||||
bloomIVs = nil
|
||||
break
|
||||
}
|
||||
|
||||
return FilterLogs(unfiltered, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
|
||||
iv, err := calcBloomIVs(clause)
|
||||
if err != nil {
|
||||
bloomIVs = nil
|
||||
logger.Error("calcBloomIVs error: %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
bloomIVs[i] = iv
|
||||
}
|
||||
// Accumulate the filter rules if no nil rule was within
|
||||
if bloomIVs != nil {
|
||||
bloomFilters = append(bloomFilters, bloomIVs)
|
||||
}
|
||||
}
|
||||
return bloomFilters
|
||||
}
|
||||
|
||||
// calcBloomIVs returns BloomIV for the given data,
|
||||
// revised from https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L139
|
||||
func calcBloomIVs(data []byte) (BloomIV, error) {
|
||||
hashbuf := make([]byte, 6)
|
||||
biv := BloomIV{}
|
||||
|
||||
sha := crypto.NewKeccakState()
|
||||
sha.Reset()
|
||||
if _, err := sha.Write(data); err != nil {
|
||||
return BloomIV{}, err
|
||||
}
|
||||
if _, err := sha.Read(hashbuf); err != nil {
|
||||
return BloomIV{}, err
|
||||
}
|
||||
|
||||
// The actual bits to flip
|
||||
biv.V[0] = byte(1 << (hashbuf[1] & 0x7))
|
||||
biv.V[1] = byte(1 << (hashbuf[3] & 0x7))
|
||||
biv.V[2] = byte(1 << (hashbuf[5] & 0x7))
|
||||
// The indices for the bytes to OR in
|
||||
biv.I[0] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf)&0x7ff)>>3) - 1
|
||||
biv.I[1] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[2:])&0x7ff)>>3) - 1
|
||||
biv.I[2] = ethtypes.BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[4:])&0x7ff)>>3) - 1
|
||||
|
||||
return biv, nil
|
||||
}
|
||||
|
@ -35,7 +35,6 @@ const (
|
||||
|
||||
var (
|
||||
MODE = os.Getenv("MODE")
|
||||
|
||||
zeroString = "0x0"
|
||||
from = []byte{}
|
||||
)
|
||||
@ -911,3 +910,38 @@ func TestEth_GetBlockByNumber(t *testing.T) {
|
||||
require.Equal(t, "0x", block["extraData"].(string))
|
||||
require.Equal(t, []interface{}{}, block["uncles"].([]interface{}))
|
||||
}
|
||||
|
||||
func TestEth_GetLogs(t *testing.T) {
|
||||
time.Sleep(time.Second)
|
||||
|
||||
rpcRes := call(t, "eth_blockNumber", []string{})
|
||||
|
||||
var res hexutil.Uint64
|
||||
err := res.UnmarshalJSON(rpcRes.Result)
|
||||
require.NoError(t, err)
|
||||
|
||||
param := make([]map[string]interface{}, 1)
|
||||
param[0] = make(map[string]interface{})
|
||||
param[0]["topics"] = []string{helloTopic, worldTopic}
|
||||
param[0]["fromBlock"] = res.String()
|
||||
|
||||
deployTestContractWithFunction(t)
|
||||
|
||||
// get filter changes
|
||||
logRes := call(t, "eth_getLogs", param)
|
||||
|
||||
var logs []*ethtypes.Log
|
||||
err = json.Unmarshal(logRes.Result, &logs)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 1, len(logs))
|
||||
|
||||
// filter log with address
|
||||
param[0] = make(map[string]interface{})
|
||||
param[0]["address"] = "0x" + fmt.Sprintf("%x", from)
|
||||
param[0]["fromBlock"] = res.String()
|
||||
err = json.Unmarshal(logRes.Result, &logs)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 1, len(logs))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user