From 8c5ce1107b3110c7cb735d8dfa91c9c701393c85 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 14 Nov 2022 14:48:01 +0100 Subject: [PATCH] eth/filters: send rpctransactions in pending-subscription (#26126) This PR changes the pending tx subscription to return RPCTransaction types instead of normal Transaction objects. This will fix the inconsistencies with other tx returning API methods (i.e. getTransactionByHash), and also fill in the sender value for the tx. co-authored by @s1na --- accounts/abi/bind/backends/simulated.go | 8 ++++++++ eth/filters/api.go | 6 +++++- eth/filters/filter_system.go | 3 +++ eth/filters/filter_system_test.go | 8 ++++++++ internal/ethapi/api.go | 16 ++++++++-------- internal/ethapi/backend.go | 10 ++++++++-- 6 files changed, 40 insertions(+), 11 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 4c259e03c..0224488dd 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -915,6 +915,14 @@ func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.Matche panic("not supported") } +func (fb *filterBackend) ChainConfig() *params.ChainConfig { + panic("not supported") +} + +func (fb *filterBackend) CurrentHeader() *types.Header { + panic("not supported") +} + func nullSubscription() event.Subscription { return event.NewSubscription(func(quit <-chan struct{}) error { <-quit diff --git a/eth/filters/api.go b/eth/filters/api.go index f52bff6f3..0b4e9a91a 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/rpc" ) @@ -147,15 +148,18 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) go func() { txs := make(chan []*types.Transaction, 128) pendingTxSub := api.events.SubscribePendingTxs(txs) + chainConfig := api.sys.backend.ChainConfig() for { select { case txs := <-txs: // To keep the original behaviour, send a single tx hash in one notification. // TODO(rjl493456442) Send a batch of tx hashes in one notification + latest := api.sys.backend.CurrentHeader() for _, tx := range txs { if fullTx != nil && *fullTx { - notifier.Notify(rpcSub.ID, tx) + rpcTx := ethapi.NewRPCPendingTransaction(tx, latest, chainConfig) + notifier.Notify(rpcSub.ID, rpcTx) } else { notifier.Notify(rpcSub.ID, tx.Hash()) } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index e86a67abf..ea0233dd0 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" lru "github.com/hashicorp/golang-lru" ) @@ -61,6 +62,8 @@ type Backend interface { GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) PendingBlockAndReceipts() (*types.Block, types.Receipts) + CurrentHeader() *types.Header + ChainConfig() *params.ChainConfig SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index a41271f7b..0609acc42 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -50,6 +50,14 @@ type testBackend struct { chainFeed event.Feed } +func (b *testBackend) ChainConfig() *params.ChainConfig { + panic("implement me") +} + +func (b *testBackend) CurrentHeader() *types.Header { + panic("implement me") +} + func (b *testBackend) ChainDb() ethdb.Database { return b.db } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 538644908..ea0cbfe86 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -171,7 +171,7 @@ func (s *TxPoolAPI) Content() map[string]map[string]map[string]*RPCTransaction { for account, txs := range pending { dump := make(map[string]*RPCTransaction) for _, tx := range txs { - dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()) + dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()) } content["pending"][account.Hex()] = dump } @@ -179,7 +179,7 @@ func (s *TxPoolAPI) Content() map[string]map[string]map[string]*RPCTransaction { for account, txs := range queue { dump := make(map[string]*RPCTransaction) for _, tx := range txs { - dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()) + dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()) } content["queued"][account.Hex()] = dump } @@ -195,14 +195,14 @@ func (s *TxPoolAPI) ContentFrom(addr common.Address) map[string]map[string]*RPCT // Build the pending transactions dump := make(map[string]*RPCTransaction, len(pending)) for _, tx := range pending { - dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()) + dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()) } content["pending"] = dump // Build the queued transactions dump = make(map[string]*RPCTransaction, len(queue)) for _, tx := range queue { - dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()) + dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()) } content["queued"] = dump @@ -1344,8 +1344,8 @@ func newRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber return result } -// newRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation -func newRPCPendingTransaction(tx *types.Transaction, current *types.Header, config *params.ChainConfig) *RPCTransaction { +// NewRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation +func NewRPCPendingTransaction(tx *types.Transaction, current *types.Header, config *params.ChainConfig) *RPCTransaction { var baseFee *big.Int blockNumber := uint64(0) if current != nil { @@ -1577,7 +1577,7 @@ func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.H } // No finalized transaction, try to retrieve it from the pool if tx := s.b.GetPoolTransaction(hash); tx != nil { - return newRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil + return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil } // Transaction unknown, return as such @@ -1847,7 +1847,7 @@ func (s *TransactionAPI) PendingTransactions() ([]*RPCTransaction, error) { for _, tx := range pending { from, _ := types.Sender(s.signer, tx) if _, exists := accounts[from]; exists { - transactions = append(transactions, newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())) + transactions = append(transactions, NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())) } } return transactions, nil diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 5b4ceb631..3bc7819a7 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -27,10 +27,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" @@ -87,9 +87,15 @@ type Backend interface { ChainConfig() *params.ChainConfig Engine() consensus.Engine + // This is copied from filters.Backend // eth/filters needs to be initialized from this backend type, so methods needed by // it must also be included here. - filters.Backend + GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) + SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription + BloomStatus() (uint64, uint64) + ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) } func GetAPIs(apiBackend Backend) []rpc.API {