fix: eth_newPendingTransactionFilter doesn't return ethereum tx hash (#1012)

* Problem: eth_newPendingTransactionFilter don't return correct tx hash

Closes: #1011
Solution:
- use eth tx hash rather than tendermint one

* changelog

* remove copied TODO comment and ignore err result of Notify

* add e2e test

* fix ws client in e2e test

* fix test

* Apply suggestions from code review

* Apply suggestions from code review

Co-authored-by: Federico Kunze Küllmer <31522760+fedekunze@users.noreply.github.com>
This commit is contained in:
yihuang 2022-03-24 15:56:59 +08:00 committed by GitHub
parent 42b5e443da
commit 70d52948da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 24 deletions

View File

@ -44,6 +44,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (ante) [tharsis#991](https://github.com/tharsis/ethermint/pull/991) Set an upper bound to gasWanted to prevent DoS attack. * (ante) [tharsis#991](https://github.com/tharsis/ethermint/pull/991) Set an upper bound to gasWanted to prevent DoS attack.
* (rpc) [tharsis#1006](https://github.com/tharsis/ethermint/pull/1006) Use `string` as the parameters type to correct ambiguous results. * (rpc) [tharsis#1006](https://github.com/tharsis/ethermint/pull/1006) Use `string` as the parameters type to correct ambiguous results.
* (ante) [tharsis#1004](https://github.com/tharsis/ethermint/pull/1004) make MaxTxGasWanted configurable. * (ante) [tharsis#1004](https://github.com/tharsis/ethermint/pull/1004) make MaxTxGasWanted configurable.
* (rpc) [tharsis#1012](https://github.com/tharsis/ethermint/pull/1012) fix the tx hash in filter entries created by `eth_newPendingTransactionFilter`.
## [v0.11.0] - 2022-03-06 ## [v0.11.0] - 2022-03-06

View File

@ -57,7 +57,7 @@ func GetRPCAPIs(ctx *server.Context, clientCtx client.Context, tmWSClient *rpccl
rpc.API{ rpc.API{
Namespace: EthNamespace, Namespace: EthNamespace,
Version: apiVersion, Version: apiVersion,
Service: filters.NewPublicAPI(ctx.Logger, tmWSClient, evmBackend), Service: filters.NewPublicAPI(ctx.Logger, clientCtx, tmWSClient, evmBackend),
Public: true, Public: true,
}, },
) )

View File

@ -6,6 +6,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/cosmos/cosmos-sdk/client"
"github.com/tharsis/ethermint/rpc/ethereum/types" "github.com/tharsis/ethermint/rpc/ethereum/types"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -56,6 +57,7 @@ type filter struct {
// information related to the Ethereum protocol such as blocks, transactions and logs. // information related to the Ethereum protocol such as blocks, transactions and logs.
type PublicFilterAPI struct { type PublicFilterAPI struct {
logger log.Logger logger log.Logger
clientCtx client.Context
backend Backend backend Backend
events *EventSystem events *EventSystem
filtersMu sync.Mutex filtersMu sync.Mutex
@ -63,13 +65,14 @@ type PublicFilterAPI struct {
} }
// NewPublicAPI returns a new PublicFilterAPI instance. // NewPublicAPI returns a new PublicFilterAPI instance.
func NewPublicAPI(logger log.Logger, tmWSClient *rpcclient.WSClient, backend Backend) *PublicFilterAPI { func NewPublicAPI(logger log.Logger, clientCtx client.Context, tmWSClient *rpcclient.WSClient, backend Backend) *PublicFilterAPI {
logger = logger.With("api", "filter") logger = logger.With("api", "filter")
api := &PublicFilterAPI{ api := &PublicFilterAPI{
logger: logger, logger: logger,
backend: backend, clientCtx: clientCtx,
filters: make(map[rpc.ID]*filter), backend: backend,
events: NewEventSystem(logger, tmWSClient), filters: make(map[rpc.ID]*filter),
events: NewEventSystem(logger, tmWSClient),
} }
go api.timeoutLoop() go api.timeoutLoop()
@ -141,11 +144,20 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
continue continue
} }
txHash := common.BytesToHash(tmtypes.Tx(data.Tx).Hash()) tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx)
if err != nil {
api.logger.Debug("fail to decode tx", "error", err.Error())
continue
}
api.filtersMu.Lock() api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID()]; found { if f, found := api.filters[pendingTxSub.ID()]; found {
f.hashes = append(f.hashes, txHash) for _, msg := range tx.GetMsgs() {
ethTx, ok := msg.(*evmtypes.MsgEthereumTx)
if ok {
f.hashes = append(f.hashes, common.HexToHash(ethTx.Hash))
}
}
} }
api.filtersMu.Unlock() api.filtersMu.Unlock()
case <-errCh: case <-errCh:
@ -198,13 +210,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
continue continue
} }
txHash := common.BytesToHash(tmtypes.Tx(data.Tx).Hash()) tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx)
// To keep the original behavior, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
err = notifier.Notify(rpcSub.ID, txHash)
if err != nil { if err != nil {
return api.logger.Debug("fail to decode tx", "error", err.Error())
continue
}
for _, msg := range tx.GetMsgs() {
ethTx, ok := msg.(*evmtypes.MsgEthereumTx)
if ok {
_ = notifier.Notify(rpcSub.ID, common.HexToHash(ethTx.Hash))
}
} }
case <-rpcSub.Err(): case <-rpcSub.Err():
pendingTxSub.Unsubscribe(api.events) pendingTxSub.Unsubscribe(api.events)
@ -314,11 +330,7 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
// TODO: fetch bloom from events // TODO: fetch bloom from events
header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee)
err = notifier.Notify(rpcSub.ID, header) _ = notifier.Notify(rpcSub.ID, header)
if err != nil {
headersSub.err <- err
return
}
case <-rpcSub.Err(): case <-rpcSub.Err():
headersSub.Unsubscribe(api.events) headersSub.Unsubscribe(api.events)
return return
@ -381,10 +393,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteri
logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
for _, log := range logs { for _, log := range logs {
err = notifier.Notify(rpcSub.ID, log) _ = notifier.Notify(rpcSub.ID, log)
if err != nil {
return
}
} }
case <-rpcSub.Err(): // client send an unsubscribe request case <-rpcSub.Err(): // client send an unsubscribe request
logsSub.Unsubscribe(api.events) logsSub.Unsubscribe(api.events)

View File

@ -721,3 +721,29 @@ func (s *IntegrationTestSuite) TestWeb3Sha3() {
}) })
} }
} }
func (s *IntegrationTestSuite) TestPendingTransactionFilter() {
var (
filterID string
filterResult []common.Hash
)
// create filter
err := s.rpcClient.Call(&filterID, "eth_newPendingTransactionFilter")
s.Require().NoError(err)
// check filter result is empty
err = s.rpcClient.Call(&filterResult, "eth_getFilterChanges", filterID)
s.Require().NoError(err)
s.Require().Empty(filterResult)
// send transaction
signedTx := s.signValidTx(common.HexToAddress("0x378c50D9264C63F3F92B806d4ee56E9D86FfB3Ec"), big.NewInt(10)).AsTransaction()
err = s.network.Validators[0].JSONRPCClient.SendTransaction(s.ctx, signedTx)
s.Require().NoError(err)
s.waitForTransaction()
s.expectSuccessReceipt(signedTx.Hash())
// check filter changes match the tx hash
err = s.rpcClient.Call(&filterResult, "eth_getFilterChanges", filterID)
s.Require().NoError(err)
s.Require().Equal([]common.Hash{signedTx.Hash()}, filterResult)
}

View File

@ -128,7 +128,7 @@ func startInProcess(cfg Config, val *Validator) error {
} }
tmEndpoint := "/websocket" tmEndpoint := "/websocket"
tmRPCAddr := fmt.Sprintf("tcp://%s", val.AppConfig.GRPC.Address) tmRPCAddr := val.RPCAddress
val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, tmRPCAddr, tmEndpoint, *val.AppConfig) val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, tmRPCAddr, tmEndpoint, *val.AppConfig)
if err != nil { if err != nil {