eth_newBlockFilter and related functionality (#232)

* add filter types for block, pending tx, log
* implement pollForBlocks for block filters
* implement getFilterChanges for block filter
* implement uninstall filter by stopping polling
This commit is contained in:
noot 2020-04-07 16:00:06 -04:00 committed by GitHub
parent eaaae0e3fd
commit db4cee7eac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 126 additions and 35 deletions

View File

@ -50,3 +50,11 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (app/ante) Moved `AnteHandler` implementation to `app/ante`
* (keys) Marked `ExportEthKeyCommand` as **UNSAFE**
* (x/evm) Moved `BeginBlock` and `EndBlock` to `x/evm/abci.go`
## Features
* (rpc) [\#231](https://github.com/ChainSafe/ethermint/issues/231) Implement NewBlockFilter in rpc/filters.go which instantiates a polling block filter
* Polls for new blocks via BlockNumber rpc call; if block number changes, it requests the new block via GetBlockByNumber rpc call and adds it to its internal list of blocks
* Update uninstallFilter and getFilterChanges accordingly
* uninstallFilter stops the polling goroutine
* getFilterChanges returns the filter's internal list of block hashes and resets it

View File

@ -52,7 +52,6 @@ func (e *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
// UninstallFilter uninstalls a filter with the given ID.
func (e *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
// TODO
e.filters[id].uninstallFilter()
delete(e.filters, id)
return true
@ -62,7 +61,7 @@ func (e *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
// If the filter is a log filter, it returns an array of Logs.
// If the filter is a block filter, it returns an array of block hashes.
// If the filter is a pending transaction filter, it returns an array of transaction hashes.
func (e *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
func (e *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
return e.filters[id].getFilterChanges()
}

View File

@ -1,9 +1,11 @@
package rpc
import (
"errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters"
)
@ -13,13 +15,24 @@ import (
Used to set the criteria passed in from RPC params
*/
// Filter can be used to retrieve and filter logs.
const blockFilter = "block"
const pendingTxFilter = "pending"
const logFilter = "log"
// Filter can be used to retrieve and filter logs, blocks, or pending transactions.
type Filter struct {
backend Backend
fromBlock, toBlock *big.Int // start and end block numbers
addresses []common.Address // contract addresses to watch
topics [][]common.Hash // log topics to watch for
blockHash *common.Hash // Block hash if filtering a single block
typ string
hashes []common.Hash // filtered block or transaction hashes
logs []*ethtypes.Log //nolint // filtered logs
stopped bool // set to true once filter in uninstalled
err error
}
// NewFilter returns a new Filter
@ -30,6 +43,8 @@ func NewFilter(backend Backend, criteria *filters.FilterCriteria) *Filter {
toBlock: criteria.ToBlock,
addresses: criteria.Addresses,
topics: criteria.Topics,
typ: logFilter,
stopped: false,
}
}
@ -42,33 +57,92 @@ func NewFilterWithBlockHash(backend Backend, criteria *filters.FilterCriteria) *
addresses: criteria.Addresses,
topics: criteria.Topics,
blockHash: criteria.BlockHash,
typ: logFilter,
}
}
// NewBlockFilter creates a new filter that notifies when a block arrives.
func NewBlockFilter(backend Backend) *Filter {
// TODO: finish
filter := NewFilter(backend, nil)
filter := NewFilter(backend, &filters.FilterCriteria{})
filter.typ = blockFilter
go func() {
err := filter.pollForBlocks()
if err != nil {
filter.err = err
}
}()
return filter
}
func (f *Filter) pollForBlocks() error {
prev := hexutil.Uint64(0)
for {
if f.stopped {
return nil
}
num, err := f.backend.BlockNumber()
if err != nil {
return err
}
if num == prev {
continue
}
block, err := f.backend.GetBlockByNumber(BlockNumber(num), false)
if err != nil {
return err
}
hashBytes, ok := block["hash"].(hexutil.Bytes)
if !ok {
return errors.New("could not convert block hash to hexutil.Bytes")
}
hash := common.BytesToHash([]byte(hashBytes))
f.hashes = append(f.hashes, hash)
prev = num
// TODO: should we add a delay?
}
}
// NewPendingTransactionFilter creates a new filter that notifies when a pending transaction arrives.
func NewPendingTransactionFilter(backend Backend) *Filter {
// TODO: finish
filter := NewFilter(backend, nil)
filter.typ = pendingTxFilter
return filter
}
func (f *Filter) uninstallFilter() {
// TODO
f.stopped = true
}
func (f *Filter) getFilterChanges() interface{} {
func (f *Filter) getFilterChanges() (interface{}, error) {
switch f.typ {
case blockFilter:
if f.err != nil {
return nil, f.err
}
blocks := make([]common.Hash, len(f.hashes))
copy(blocks, f.hashes)
f.hashes = []common.Hash{}
return blocks, nil
case pendingTxFilter:
// TODO
// we might want to use an interface for Filters themselves because of this function, it may return an array of logs
// or an array of hashes, depending of whether Filter is a log filter or a block/transaction filter.
// or, we can add a type field to Filter.
return nil
case logFilter:
// TODO
}
return nil, nil
}
func (f *Filter) getFilterLogs() []*ethtypes.Log {

View File

@ -33,7 +33,7 @@ var addr = fmt.Sprintf("http://%s:%d", host, port)
type Request struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Params []string `json:"params"`
Params interface{} `json:"params"`
ID int `json:"id"`
}
@ -49,7 +49,7 @@ type Response struct {
Result json.RawMessage `json:"result,omitempty"`
}
func createRequest(method string, params []string) Request {
func createRequest(method string, params interface{}) Request {
return Request{
Version: "2.0",
Method: method,
@ -58,7 +58,7 @@ func createRequest(method string, params []string) Request {
}
}
func call(method string, params []string) (*Response, error) {
func call(t *testing.T, method string, params interface{}) (*Response, error) {
req, err := json.Marshal(createRequest(method, params))
if err != nil {
return nil, err
@ -67,23 +67,23 @@ func call(method string, params []string) (*Response, error) {
/* #nosec */
res, err := http.Post(addr, "application/json", bytes.NewBuffer(req))
if err != nil {
return nil, err
t.Fatal(err)
}
decoder := json.NewDecoder(res.Body)
var rpcRes *Response
err = decoder.Decode(&rpcRes)
if err != nil {
return nil, err
t.Fatal(err)
}
if rpcRes.Error != nil {
return nil, errors.New(rpcRes.Error.Message)
t.Fatal(errors.New(rpcRes.Error.Message))
}
err = res.Body.Close()
if err != nil {
return nil, err
t.Fatal(err)
}
return rpcRes, nil
@ -92,7 +92,7 @@ func call(method string, params []string) (*Response, error) {
func TestEth_protocolVersion(t *testing.T) {
expectedRes := hexutil.Uint(version.ProtocolVersion)
rpcRes, err := call("eth_protocolVersion", []string{})
rpcRes, err := call(t, "eth_protocolVersion", []string{})
require.NoError(t, err)
var res hexutil.Uint
@ -104,7 +104,7 @@ func TestEth_protocolVersion(t *testing.T) {
}
func TestEth_blockNumber(t *testing.T) {
rpcRes, err := call("eth_blockNumber", []string{})
rpcRes, err := call(t, "eth_blockNumber", []string{})
require.NoError(t, err)
var res hexutil.Uint64
@ -115,7 +115,7 @@ func TestEth_blockNumber(t *testing.T) {
}
func TestEth_GetBalance(t *testing.T) {
rpcRes, err := call("eth_getBalance", []string{addrA, "0x0"})
rpcRes, err := call(t, "eth_getBalance", []string{addrA, "0x0"})
require.NoError(t, err)
var res hexutil.Big
@ -132,7 +132,7 @@ func TestEth_GetBalance(t *testing.T) {
func TestEth_GetStorageAt(t *testing.T) {
expectedRes := hexutil.Bytes{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
rpcRes, err := call("eth_getStorageAt", []string{addrA, string(addrAStoreKey), "0x0"})
rpcRes, err := call(t, "eth_getStorageAt", []string{addrA, string(addrAStoreKey), "0x0"})
require.NoError(t, err)
var storage hexutil.Bytes
@ -146,7 +146,7 @@ func TestEth_GetStorageAt(t *testing.T) {
func TestEth_GetCode(t *testing.T) {
expectedRes := hexutil.Bytes{}
rpcRes, err := call("eth_getCode", []string{addrA, "0x0"})
rpcRes, err := call(t, "eth_getCode", []string{addrA, "0x0"})
require.NoError(t, err)
var code hexutil.Bytes
@ -157,3 +157,15 @@ func TestEth_GetCode(t *testing.T) {
t.Logf("Got code [%X] for %s\n", code, addrA)
require.True(t, bytes.Equal(expectedRes, code), "expected: %X got: %X", expectedRes, code)
}
func TestEth_NewFilter(t *testing.T) {
param := make([]map[string][]string, 1)
param[0] = make(map[string][]string)
param[0]["topics"] = []string{"0x0000000000000000000000000000000000000000000000000000000012341234"}
rpcRes, err := call(t, "eth_newFilter", param)
require.NoError(t, err)
var code hexutil.Bytes
err = code.UnmarshalJSON(rpcRes.Result)
require.NoError(t, err)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/context"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/ethermint/x/evm/types"
@ -22,7 +23,7 @@ func GetQueryCmd(moduleName string, cdc *codec.Codec) *cobra.Command {
SuggestionsMinimumDistance: 2,
RunE: client.ValidateCmd,
}
evmQueryCmd.AddCommand(client.GetCommands(
evmQueryCmd.AddCommand(flags.GetCommands(
GetCmdGetStorageAt(moduleName, cdc),
GetCmdGetCode(moduleName, cdc),
)...)

View File

@ -14,6 +14,7 @@ import (
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/context"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth"
@ -33,7 +34,7 @@ func GetTxCmd(storeKey string, cdc *codec.Codec) *cobra.Command {
RunE: client.ValidateCmd,
}
evmTxCmd.AddCommand(client.PostCommands(
evmTxCmd.AddCommand(flags.PostCommands(
GetCmdGenTx(cdc),
GetCmdGenCreateTx(cdc),
)...)
@ -73,7 +74,7 @@ func GetCmdGenTx(cdc *codec.Codec) *cobra.Command {
data, err = hexutil.Decode(payload)
if err != nil {
fmt.Println(err)
return err
}
}
@ -117,7 +118,7 @@ func GetCmdGenCreateTx(cdc *codec.Codec) *cobra.Command {
data, err := hexutil.Decode(payload)
if err != nil {
fmt.Println(err)
return err
}
var amount int64

View File

@ -97,10 +97,6 @@ func (k *Keeper) GetBlockBloomMapping(ctx sdk.Context, height int64) (ethtypes.B
}
bloom := store.Get(types.BloomKey(bz))
if len(bloom) == 0 {
return ethtypes.BytesToBloom([]byte{}), fmt.Errorf("block with bloombits %v not found", bloom)
}
return ethtypes.BytesToBloom(bloom), nil
}