From db4cee7eacec6010336642ae1d9d142343dfdcbc Mon Sep 17 00:00:00 2001 From: noot <36753753+noot@users.noreply.github.com> Date: Tue, 7 Apr 2020 16:00:06 -0400 Subject: [PATCH] eth_newBlockFilter and related functionality (#232) * add filter types for block, pending tx, log * implement pollForBlocks for block filters * implement getFilterChanges for block filter * implement uninstall filter by stopping polling --- CHANGELOG.md | 8 ++++ rpc/filter_api.go | 3 +- rpc/filters.go | 94 ++++++++++++++++++++++++++++++++++----- rpc/tester/tester_test.go | 42 ++++++++++------- x/evm/client/cli/query.go | 3 +- x/evm/client/cli/tx.go | 7 +-- x/evm/keeper/keeper.go | 4 -- 7 files changed, 126 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c89ba0ed..1909ac31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,3 +50,11 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (app/ante) Moved `AnteHandler` implementation to `app/ante` * (keys) Marked `ExportEthKeyCommand` as **UNSAFE** * (x/evm) Moved `BeginBlock` and `EndBlock` to `x/evm/abci.go` + +## Features + +* (rpc) [\#231](https://github.com/ChainSafe/ethermint/issues/231) Implement NewBlockFilter in rpc/filters.go which instantiates a polling block filter + * Polls for new blocks via BlockNumber rpc call; if block number changes, it requests the new block via GetBlockByNumber rpc call and adds it to its internal list of blocks + * Update uninstallFilter and getFilterChanges accordingly + * uninstallFilter stops the polling goroutine + * getFilterChanges returns the filter's internal list of block hashes and resets it diff --git a/rpc/filter_api.go b/rpc/filter_api.go index cc8f73fc..724fd040 100644 --- a/rpc/filter_api.go +++ b/rpc/filter_api.go @@ -52,7 +52,6 @@ func (e *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { // UninstallFilter uninstalls a filter with the given ID. func (e *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { - // TODO e.filters[id].uninstallFilter() delete(e.filters, id) return true @@ -62,7 +61,7 @@ func (e *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { // If the filter is a log filter, it returns an array of Logs. // If the filter is a block filter, it returns an array of block hashes. // If the filter is a pending transaction filter, it returns an array of transaction hashes. -func (e *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} { +func (e *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { return e.filters[id].getFilterChanges() } diff --git a/rpc/filters.go b/rpc/filters.go index 6e63f317..3b565bba 100644 --- a/rpc/filters.go +++ b/rpc/filters.go @@ -1,9 +1,11 @@ package rpc import ( + "errors" "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" ) @@ -13,13 +15,24 @@ import ( Used to set the criteria passed in from RPC params */ -// Filter can be used to retrieve and filter logs. +const blockFilter = "block" +const pendingTxFilter = "pending" +const logFilter = "log" + +// Filter can be used to retrieve and filter logs, blocks, or pending transactions. type Filter struct { backend Backend fromBlock, toBlock *big.Int // start and end block numbers addresses []common.Address // contract addresses to watch topics [][]common.Hash // log topics to watch for blockHash *common.Hash // Block hash if filtering a single block + + typ string + hashes []common.Hash // filtered block or transaction hashes + logs []*ethtypes.Log //nolint // filtered logs + stopped bool // set to true once filter in uninstalled + + err error } // NewFilter returns a new Filter @@ -30,6 +43,8 @@ func NewFilter(backend Backend, criteria *filters.FilterCriteria) *Filter { toBlock: criteria.ToBlock, addresses: criteria.Addresses, topics: criteria.Topics, + typ: logFilter, + stopped: false, } } @@ -42,33 +57,92 @@ func NewFilterWithBlockHash(backend Backend, criteria *filters.FilterCriteria) * addresses: criteria.Addresses, topics: criteria.Topics, blockHash: criteria.BlockHash, + typ: logFilter, } } // NewBlockFilter creates a new filter that notifies when a block arrives. func NewBlockFilter(backend Backend) *Filter { - // TODO: finish - filter := NewFilter(backend, nil) + filter := NewFilter(backend, &filters.FilterCriteria{}) + filter.typ = blockFilter + + go func() { + err := filter.pollForBlocks() + if err != nil { + filter.err = err + } + }() + return filter } +func (f *Filter) pollForBlocks() error { + prev := hexutil.Uint64(0) + + for { + if f.stopped { + return nil + } + + num, err := f.backend.BlockNumber() + if err != nil { + return err + } + + if num == prev { + continue + } + + block, err := f.backend.GetBlockByNumber(BlockNumber(num), false) + if err != nil { + return err + } + + hashBytes, ok := block["hash"].(hexutil.Bytes) + if !ok { + return errors.New("could not convert block hash to hexutil.Bytes") + } + + hash := common.BytesToHash([]byte(hashBytes)) + f.hashes = append(f.hashes, hash) + + prev = num + + // TODO: should we add a delay? + } +} + // NewPendingTransactionFilter creates a new filter that notifies when a pending transaction arrives. func NewPendingTransactionFilter(backend Backend) *Filter { // TODO: finish filter := NewFilter(backend, nil) + filter.typ = pendingTxFilter return filter } func (f *Filter) uninstallFilter() { - // TODO + f.stopped = true } -func (f *Filter) getFilterChanges() interface{} { - // TODO - // we might want to use an interface for Filters themselves because of this function, it may return an array of logs - // or an array of hashes, depending of whether Filter is a log filter or a block/transaction filter. - // or, we can add a type field to Filter. - return nil +func (f *Filter) getFilterChanges() (interface{}, error) { + switch f.typ { + case blockFilter: + if f.err != nil { + return nil, f.err + } + + blocks := make([]common.Hash, len(f.hashes)) + copy(blocks, f.hashes) + f.hashes = []common.Hash{} + + return blocks, nil + case pendingTxFilter: + // TODO + case logFilter: + // TODO + } + + return nil, nil } func (f *Filter) getFilterLogs() []*ethtypes.Log { diff --git a/rpc/tester/tester_test.go b/rpc/tester/tester_test.go index 4317304b..81755fb7 100644 --- a/rpc/tester/tester_test.go +++ b/rpc/tester/tester_test.go @@ -31,10 +31,10 @@ const ( var addr = fmt.Sprintf("http://%s:%d", host, port) type Request struct { - Version string `json:"jsonrpc"` - Method string `json:"method"` - Params []string `json:"params"` - ID int `json:"id"` + Version string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params"` + ID int `json:"id"` } type RPCError struct { @@ -49,7 +49,7 @@ type Response struct { Result json.RawMessage `json:"result,omitempty"` } -func createRequest(method string, params []string) Request { +func createRequest(method string, params interface{}) Request { return Request{ Version: "2.0", Method: method, @@ -58,7 +58,7 @@ func createRequest(method string, params []string) Request { } } -func call(method string, params []string) (*Response, error) { +func call(t *testing.T, method string, params interface{}) (*Response, error) { req, err := json.Marshal(createRequest(method, params)) if err != nil { return nil, err @@ -67,23 +67,23 @@ func call(method string, params []string) (*Response, error) { /* #nosec */ res, err := http.Post(addr, "application/json", bytes.NewBuffer(req)) if err != nil { - return nil, err + t.Fatal(err) } decoder := json.NewDecoder(res.Body) var rpcRes *Response err = decoder.Decode(&rpcRes) if err != nil { - return nil, err + t.Fatal(err) } if rpcRes.Error != nil { - return nil, errors.New(rpcRes.Error.Message) + t.Fatal(errors.New(rpcRes.Error.Message)) } err = res.Body.Close() if err != nil { - return nil, err + t.Fatal(err) } return rpcRes, nil @@ -92,7 +92,7 @@ func call(method string, params []string) (*Response, error) { func TestEth_protocolVersion(t *testing.T) { expectedRes := hexutil.Uint(version.ProtocolVersion) - rpcRes, err := call("eth_protocolVersion", []string{}) + rpcRes, err := call(t, "eth_protocolVersion", []string{}) require.NoError(t, err) var res hexutil.Uint @@ -104,7 +104,7 @@ func TestEth_protocolVersion(t *testing.T) { } func TestEth_blockNumber(t *testing.T) { - rpcRes, err := call("eth_blockNumber", []string{}) + rpcRes, err := call(t, "eth_blockNumber", []string{}) require.NoError(t, err) var res hexutil.Uint64 @@ -115,7 +115,7 @@ func TestEth_blockNumber(t *testing.T) { } func TestEth_GetBalance(t *testing.T) { - rpcRes, err := call("eth_getBalance", []string{addrA, "0x0"}) + rpcRes, err := call(t, "eth_getBalance", []string{addrA, "0x0"}) require.NoError(t, err) var res hexutil.Big @@ -132,7 +132,7 @@ func TestEth_GetBalance(t *testing.T) { func TestEth_GetStorageAt(t *testing.T) { expectedRes := hexutil.Bytes{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} - rpcRes, err := call("eth_getStorageAt", []string{addrA, string(addrAStoreKey), "0x0"}) + rpcRes, err := call(t, "eth_getStorageAt", []string{addrA, string(addrAStoreKey), "0x0"}) require.NoError(t, err) var storage hexutil.Bytes @@ -146,7 +146,7 @@ func TestEth_GetStorageAt(t *testing.T) { func TestEth_GetCode(t *testing.T) { expectedRes := hexutil.Bytes{} - rpcRes, err := call("eth_getCode", []string{addrA, "0x0"}) + rpcRes, err := call(t, "eth_getCode", []string{addrA, "0x0"}) require.NoError(t, err) var code hexutil.Bytes @@ -157,3 +157,15 @@ func TestEth_GetCode(t *testing.T) { t.Logf("Got code [%X] for %s\n", code, addrA) require.True(t, bytes.Equal(expectedRes, code), "expected: %X got: %X", expectedRes, code) } + +func TestEth_NewFilter(t *testing.T) { + param := make([]map[string][]string, 1) + param[0] = make(map[string][]string) + param[0]["topics"] = []string{"0x0000000000000000000000000000000000000000000000000000000012341234"} + rpcRes, err := call(t, "eth_newFilter", param) + require.NoError(t, err) + + var code hexutil.Bytes + err = code.UnmarshalJSON(rpcRes.Result) + require.NoError(t, err) +} diff --git a/x/evm/client/cli/query.go b/x/evm/client/cli/query.go index 61f15769..f274a20e 100644 --- a/x/evm/client/cli/query.go +++ b/x/evm/client/cli/query.go @@ -8,6 +8,7 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/context" + "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/ethermint/x/evm/types" @@ -22,7 +23,7 @@ func GetQueryCmd(moduleName string, cdc *codec.Codec) *cobra.Command { SuggestionsMinimumDistance: 2, RunE: client.ValidateCmd, } - evmQueryCmd.AddCommand(client.GetCommands( + evmQueryCmd.AddCommand(flags.GetCommands( GetCmdGetStorageAt(moduleName, cdc), GetCmdGetCode(moduleName, cdc), )...) diff --git a/x/evm/client/cli/tx.go b/x/evm/client/cli/tx.go index 2f2ec78d..c40a937a 100644 --- a/x/evm/client/cli/tx.go +++ b/x/evm/client/cli/tx.go @@ -14,6 +14,7 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/context" + "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/x/auth" @@ -33,7 +34,7 @@ func GetTxCmd(storeKey string, cdc *codec.Codec) *cobra.Command { RunE: client.ValidateCmd, } - evmTxCmd.AddCommand(client.PostCommands( + evmTxCmd.AddCommand(flags.PostCommands( GetCmdGenTx(cdc), GetCmdGenCreateTx(cdc), )...) @@ -73,7 +74,7 @@ func GetCmdGenTx(cdc *codec.Codec) *cobra.Command { data, err = hexutil.Decode(payload) if err != nil { - fmt.Println(err) + return err } } @@ -117,7 +118,7 @@ func GetCmdGenCreateTx(cdc *codec.Codec) *cobra.Command { data, err := hexutil.Decode(payload) if err != nil { - fmt.Println(err) + return err } var amount int64 diff --git a/x/evm/keeper/keeper.go b/x/evm/keeper/keeper.go index 111392b9..234c946c 100644 --- a/x/evm/keeper/keeper.go +++ b/x/evm/keeper/keeper.go @@ -97,10 +97,6 @@ func (k *Keeper) GetBlockBloomMapping(ctx sdk.Context, height int64) (ethtypes.B } bloom := store.Get(types.BloomKey(bz)) - if len(bloom) == 0 { - return ethtypes.BytesToBloom([]byte{}), fmt.Errorf("block with bloombits %v not found", bloom) - } - return ethtypes.BytesToBloom(bloom), nil }