all: move main transaction pool into a subpool (#27463)

* all: move main transaction pool into a subpool

* go.mod: remove superfluous updates

* core/txpool: review fixes, handle txs rejected by all subpools

* core/txpool: typos
This commit is contained in:
Péter Szilágyi 2023-06-16 15:29:40 +03:00 committed by GitHub
parent c375936e81
commit d40a255e97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 2931 additions and 2105 deletions

View File

@ -41,7 +41,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
@ -334,18 +334,18 @@ var (
TxPoolJournalFlag = &cli.StringFlag{
Name: "txpool.journal",
Usage: "Disk journal for local transaction to survive node restarts",
Value: txpool.DefaultConfig.Journal,
Value: ethconfig.Defaults.TxPool.Journal,
Category: flags.TxPoolCategory,
}
TxPoolRejournalFlag = &cli.DurationFlag{
Name: "txpool.rejournal",
Usage: "Time interval to regenerate the local transaction journal",
Value: txpool.DefaultConfig.Rejournal,
Value: ethconfig.Defaults.TxPool.Rejournal,
Category: flags.TxPoolCategory,
}
TxPoolPriceLimitFlag = &cli.Uint64Flag{
Name: "txpool.pricelimit",
Usage: "Minimum gas price limit to enforce for acceptance into the pool",
Usage: "Minimum gas price tip to enforce for acceptance into the pool",
Value: ethconfig.Defaults.TxPool.PriceLimit,
Category: flags.TxPoolCategory,
}
@ -385,7 +385,6 @@ var (
Value: ethconfig.Defaults.TxPool.Lifetime,
Category: flags.TxPoolCategory,
}
// Performance tuning settings
CacheFlag = &cli.IntFlag{
Name: "cache",
@ -1500,7 +1499,7 @@ func setGPO(ctx *cli.Context, cfg *gasprice.Config, light bool) {
}
}
func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolLocalsFlag.Name) {
locals := strings.Split(ctx.String(TxPoolLocalsFlag.Name), ",")
for _, account := range locals {

53
core/txpool/errors.go Normal file
View File

@ -0,0 +1,53 @@
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
import "errors"
var (
// ErrAlreadyKnown is returned if the transactions is already contained
// within the pool.
ErrAlreadyKnown = errors.New("already known")
// ErrInvalidSender is returned if the transaction contains an invalid signature.
ErrInvalidSender = errors.New("invalid sender")
// ErrUnderpriced is returned if a transaction's gas price is below the minimum
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")
// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
ErrGasLimit = errors.New("exceeds block gas limit")
// ErrNegativeValue is a sanity error to ensure no one is able to specify a
// transaction with a negative value.
ErrNegativeValue = errors.New("negative value")
// ErrOversizedData is returned if the input data of a transaction is greater
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")
// ErrFutureReplacePending is returned if a future transaction replaces a pending
// transaction. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")
)

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"errors"

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,7 @@
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"crypto/ecdsa"
@ -33,7 +33,7 @@ func pricedValuedTransaction(nonce uint64, value int64, gaslimit uint64, gaspric
return tx
}
func count(t *testing.T, pool *TxPool) (pending int, queued int) {
func count(t *testing.T, pool *LegacyPool) (pending int, queued int) {
t.Helper()
pending, queued = pool.stats()
if err := validatePoolInternals(pool); err != nil {
@ -42,7 +42,7 @@ func count(t *testing.T, pool *TxPool) (pending int, queued int) {
return pending, queued
}
func fillPool(t testing.TB, pool *TxPool) {
func fillPool(t testing.TB, pool *LegacyPool) {
t.Helper()
// Create a number of test accounts, fund them and make transactions
executableTxs := types.Transactions{}
@ -56,8 +56,8 @@ func fillPool(t testing.TB, pool *TxPool) {
}
}
// Import the batch and verify that limits have been enforced
pool.AddRemotesSync(executableTxs)
pool.AddRemotesSync(nonExecutableTxs)
pool.addRemotesSync(executableTxs)
pool.addRemotesSync(nonExecutableTxs)
pending, queued := pool.Stats()
slots := pool.all.Slots()
// sanity-check that the test prerequisites are ok (pending full)
@ -79,12 +79,13 @@ func TestTransactionFutureAttack(t *testing.T) {
// Create the pool to test the limit enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, eip1559Config, blockchain)
defer pool.Stop()
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
defer pool.Close()
fillPool(t, pool)
pending, _ := pool.Stats()
// Now, future transaction attack starts, let's add a bunch of expensive non-executables, and see if the pending-count drops
@ -96,7 +97,7 @@ func TestTransactionFutureAttack(t *testing.T) {
futureTxs = append(futureTxs, pricedTransaction(1000+uint64(j), 100000, big.NewInt(500), key))
}
for i := 0; i < 5; i++ {
pool.AddRemotesSync(futureTxs)
pool.addRemotesSync(futureTxs)
newPending, newQueued := count(t, pool)
t.Logf("pending: %d queued: %d, all: %d\n", newPending, newQueued, pool.all.Slots())
}
@ -115,9 +116,10 @@ func TestTransactionFuture1559(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
defer pool.Close()
// Create a number of test accounts, fund them and make transactions
fillPool(t, pool)
@ -131,7 +133,7 @@ func TestTransactionFuture1559(t *testing.T) {
for j := 0; j < int(pool.config.GlobalSlots+pool.config.GlobalQueue); j++ {
futureTxs = append(futureTxs, dynamicFeeTx(1000+uint64(j), 100000, big.NewInt(200), big.NewInt(101), key))
}
pool.AddRemotesSync(futureTxs)
pool.addRemotesSync(futureTxs)
}
newPending, _ := pool.Stats()
// Pending should not have been touched
@ -147,9 +149,10 @@ func TestTransactionZAttack(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
defer pool.Close()
// Create a number of test accounts, fund them and make transactions
fillPool(t, pool)
@ -181,7 +184,7 @@ func TestTransactionZAttack(t *testing.T) {
key, _ := crypto.GenerateKey()
pool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(100000000000))
futureTxs = append(futureTxs, pricedTransaction(1000+uint64(j), 21000, big.NewInt(500), key))
pool.AddRemotesSync(futureTxs)
pool.addRemotesSync(futureTxs)
}
overDraftTxs := types.Transactions{}
@ -192,11 +195,11 @@ func TestTransactionZAttack(t *testing.T) {
overDraftTxs = append(overDraftTxs, pricedValuedTransaction(uint64(j), 600000000000, 21000, big.NewInt(500), key))
}
}
pool.AddRemotesSync(overDraftTxs)
pool.AddRemotesSync(overDraftTxs)
pool.AddRemotesSync(overDraftTxs)
pool.AddRemotesSync(overDraftTxs)
pool.AddRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
newPending, newQueued := count(t, pool)
newIvPending := countInvalidPending()
@ -214,12 +217,13 @@ func TestTransactionZAttack(t *testing.T) {
func BenchmarkFutureAttack(b *testing.B) {
// Create the pool to test the limit enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, eip1559Config, blockchain)
defer pool.Stop()
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
defer pool.Close()
fillPool(b, pool)
key, _ := crypto.GenerateKey()
@ -231,6 +235,6 @@ func BenchmarkFutureAttack(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < 5; i++ {
pool.AddRemotesSync(futureTxs)
pool.addRemotesSync(futureTxs)
}
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"container/heap"

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"math/big"

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"sync"

111
core/txpool/subpool.go Normal file
View File

@ -0,0 +1,111 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
)
// Transaction is a helper struct to group together a canonical transaction with
// satellite data items that are needed by the pool but are not part of the chain.
type Transaction struct {
Tx *types.Transaction // Canonical transaction
BlobTxBlobs []kzg4844.Blob // Blobs needed by the blob pool
BlobTxCommits []kzg4844.Commitment // Commitments needed by the blob pool
BlobTxProofs []kzg4844.Proof // Proofs needed by the blob pool
}
// SubPool represents a specialized transaction pool that lives on its own (e.g.
// blob pool). Since independent of how many specialized pools we have, they do
// need to be updated in lockstep and assemble into one coherent view for block
// production, this interface defines the common methods that allow the primary
// transaction pool to manage the subpools.
type SubPool interface {
// Filter is a selector used to decide whether a transaction whould be added
// to this particular subpool.
Filter(tx *types.Transaction) bool
// Init sets the base parameters of the subpool, allowing it to load any saved
// transactions from disk and also permitting internal maintenance routines to
// start up.
//
// These should not be passed as a constructor argument - nor should the pools
// start by themselves - in order to keep multiple subpools in lockstep with
// one another.
Init(gasTip *big.Int, head *types.Header) error
// Close terminates any background processing threads and releases any held
// resources.
Close() error
// Reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
Reset(oldHead, newHead *types.Header)
// SetGasTip updates the minimum price required by the subpool for a new
// transaction, and drops all transactions below this threshold.
SetGasTip(tip *big.Int)
// Has returns an indicator whether subpool has a transaction cached with the
// given hash.
Has(hash common.Hash) bool
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *Transaction
// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Add(txs []*Transaction, local bool, sync bool) []error
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*types.Transaction
// SubscribeTransactions subscribes to new transaction events.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
Nonce(addr common.Address) uint64
// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
Stats() (int, int)
// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and sorted by nonce.
Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction)
// ContentFrom retrieves the data content of the transaction pool, returning the
// pending as well as queued transactions of this address, grouped by nonce.
ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction)
// Locals retrieves the accounts currently considered local by the pool.
Locals() []common.Address
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
Status(hash common.Hash) TxStatus
}

File diff suppressed because it is too large Load Diff

View File

@ -555,7 +555,7 @@ func (s *TxByPriceAndTime) Pop() interface{} {
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type TransactionsByPriceAndNonce struct {
txs map[common.Address]Transactions // Per account nonce-sorted list of transactions
txs map[common.Address][]*Transaction // Per account nonce-sorted list of transactions
heads TxByPriceAndTime // Next transaction for each unique account (price heap)
signer Signer // Signer for the set of transactions
baseFee *big.Int // Current base fee
@ -566,7 +566,7 @@ type TransactionsByPriceAndNonce struct {
//
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions, baseFee *big.Int) *TransactionsByPriceAndNonce {
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address][]*Transaction, baseFee *big.Int) *TransactionsByPriceAndNonce {
// Initialize a price and received time based heap with the head transactions
heads := make(TxByPriceAndTime, 0, len(txs))
for from, accTxs := range txs {

View File

@ -57,9 +57,9 @@ func MakeSigner(config *params.ChainConfig, blockNumber *big.Int, blockTime uint
}
// LatestSigner returns the 'most permissive' Signer available for the given chain
// configuration. Specifically, this enables support of EIP-155 replay protection and
// EIP-2930 access list transactions when their respective forks are scheduled to occur at
// any block number in the chain config.
// configuration. Specifically, this enables support of all types of transacrions
// when their respective forks are scheduled to occur at any block number (or time)
// in the chain config.
//
// Use this in transaction-handling code where the current block number is unknown. If you
// have the current block number available, use MakeSigner instead.

View File

@ -281,7 +281,7 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) {
signer := LatestSignerForChainID(common.Big1)
// Generate a batch of transactions with overlapping values, but shifted nonces
groups := map[common.Address]Transactions{}
groups := map[common.Address][]*Transaction{}
expectedCount := 0
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
@ -368,7 +368,7 @@ func TestTransactionTimeSort(t *testing.T) {
signer := HomesteadSigner{}
// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address]Transactions{}
groups := map[common.Address][]*Transaction{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)

View File

@ -294,7 +294,7 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
}
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
return b.eth.txPool.Add([]*txpool.Transaction{&txpool.Transaction{Tx: signedTx}}, true, false)[0]
}
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
@ -307,7 +307,10 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
}
func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
return b.eth.txPool.Get(hash)
if tx := b.eth.txPool.Get(hash); tx != nil {
return tx.Tx
}
return nil
}
func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
@ -319,24 +322,24 @@ func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (
return b.eth.txPool.Nonce(addr), nil
}
func (b *EthAPIBackend) Stats() (pending int, queued int) {
func (b *EthAPIBackend) Stats() (runnable int, blocked int) {
return b.eth.txPool.Stats()
}
func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
return b.eth.TxPool().Content()
func (b *EthAPIBackend) TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
return b.eth.txPool.Content()
}
func (b *EthAPIBackend) TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions) {
return b.eth.TxPool().ContentFrom(addr)
func (b *EthAPIBackend) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
return b.eth.txPool.ContentFrom(addr)
}
func (b *EthAPIBackend) TxPool() *txpool.TxPool {
return b.eth.TxPool()
return b.eth.txPool
}
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.TxPool().SubscribeNewTxsEvent(ch)
return b.eth.txPool.SubscribeNewTxsEvent(ch)
}
func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {

View File

@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
@ -67,6 +68,7 @@ type Ethereum struct {
// Handlers
txPool *txpool.TxPool
blockchain *core.BlockChain
handler *handler
ethDialCandidates enode.Iterator
@ -206,8 +208,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if config.TxPool.Journal != "" {
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
}
eth.txPool = txpool.New(config.TxPool, eth.blockchain.Config(), eth.blockchain)
legacyPool := legacypool.New(config.TxPool, eth.blockchain)
eth.txPool, err = txpool.New(new(big.Int).SetUint64(config.TxPool.PriceLimit), eth.blockchain, []txpool.SubPool{legacyPool})
if err != nil {
return nil, err
}
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
@ -512,7 +518,7 @@ func (s *Ethereum) Stop() error {
// Then stop everything else.
s.bloomIndexer.Close()
close(s.closeBloomHandler)
s.txPool.Stop()
s.txPool.Close()
s.miner.Close()
s.blockchain.Stop()
s.engine.Close()

View File

@ -35,6 +35,7 @@ import (
beaconConsensus "github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
@ -106,7 +107,7 @@ func TestEth2AssembleBlock(t *testing.T) {
if err != nil {
t.Fatalf("error signing transaction, err=%v", err)
}
ethservice.TxPool().AddLocal(tx)
ethservice.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, true, false)
blockParams := engine.PayloadAttributes{
Timestamp: blocks[9].Time() + 5,
}
@ -142,7 +143,12 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) {
api := NewConsensusAPI(ethservice)
// Put the 10th block's tx in the pool and produce a new block
api.eth.TxPool().AddRemotesSync(blocks[9].Transactions())
txs := blocks[9].Transactions()
wrapped := make([]*txpool.Transaction, len(txs))
for i, tx := range txs {
wrapped[i] = &txpool.Transaction{Tx: tx}
}
api.eth.TxPool().Add(wrapped, false, true)
blockParams := engine.PayloadAttributes{
Timestamp: blocks[8].Time() + 5,
}
@ -181,7 +187,12 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
api := NewConsensusAPI(ethservice)
// Put the 10th block's tx in the pool and produce a new block
ethservice.TxPool().AddLocals(blocks[9].Transactions())
txs := blocks[9].Transactions()
wrapped := make([]*txpool.Transaction, len(txs))
for i, tx := range txs {
wrapped[i] = &txpool.Transaction{Tx: tx}
}
ethservice.TxPool().Add(wrapped, true, false)
blockParams := engine.PayloadAttributes{
Timestamp: blocks[8].Time() + 5,
}
@ -303,7 +314,7 @@ func TestEth2NewBlock(t *testing.T) {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)
ethservice.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, true, false)
execData, err := assembleWithTransactions(api, parent.Hash(), &engine.PayloadAttributes{
Timestamp: parent.Time() + 5,
@ -472,7 +483,7 @@ func TestFullAPI(t *testing.T) {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root)
nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)
ethservice.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, true, false)
}
setupBlocks(t, ethservice, 10, parent, callback, nil)
@ -598,7 +609,7 @@ func TestNewPayloadOnInvalidChain(t *testing.T) {
GasPrice: big.NewInt(2 * params.InitialBaseFee),
Data: logCode,
})
ethservice.TxPool().AddRemotesSync([]*types.Transaction{tx})
ethservice.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, false, true)
var (
params = engine.PayloadAttributes{
Timestamp: parent.Time + 1,
@ -1272,7 +1283,7 @@ func setupBodies(t *testing.T) (*node.Node, *eth.Ethereum, []*types.Block) {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root)
nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)
ethservice.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, true, false)
}
withdrawals := make([][]*types.Withdrawal, 10)

View File

@ -27,7 +27,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/ethdb"
@ -71,7 +71,7 @@ var Defaults = Config{
SnapshotCache: 102,
FilterLogCacheSize: 32,
Miner: miner.DefaultConfig,
TxPool: txpool.DefaultConfig,
TxPool: legacypool.DefaultConfig,
RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO,
@ -139,7 +139,7 @@ type Config struct {
Miner miner.Config
// Transaction pool options
TxPool txpool.Config
TxPool legacypool.Config
// Gas Price Oracle options
GPO gasprice.Config

View File

@ -7,7 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/miner"
@ -47,7 +47,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
Preimages bool
FilterLogCacheSize int
Miner miner.Config
TxPool txpool.Config
TxPool legacypool.Config
GPO gasprice.Config
EnablePreimageRecording bool
DocRoot string `toml:"-"`
@ -133,7 +133,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
Preimages *bool
FilterLogCacheSize *int
Miner *miner.Config
TxPool *txpool.Config
TxPool *legacypool.Config
GPO *gasprice.Config
EnablePreimageRecording *bool
DocRoot *string `toml:"-"`

View File

@ -170,7 +170,7 @@ type TxFetcher struct {
// Callbacks
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
addTxs func([]*txpool.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
step chan struct{} // Notification channel when the fetcher loop iterates
@ -180,14 +180,14 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
}
// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
hasTx func(common.Hash) bool, addTxs func([]*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error,
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
@ -294,7 +294,12 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
otherreject int64
)
batch := txs[i:end]
for j, err := range f.addTxs(batch) {
wrapped := make([]*txpool.Transaction, len(batch))
for j, tx := range batch {
wrapped[j] = &txpool.Transaction{Tx: tx}
}
for j, err := range f.addTxs(wrapped) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.

View File

@ -378,7 +378,7 @@ func TestTransactionFetcherCleanup(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -417,7 +417,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -455,7 +455,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -501,7 +501,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -539,7 +539,7 @@ func TestTransactionFetcherBroadcasts(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -644,7 +644,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -865,7 +865,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
if i%2 == 0 {
@ -938,7 +938,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
@ -964,7 +964,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1017,7 +1017,7 @@ func TestTransactionFetcherDrop(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1083,7 +1083,7 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1128,7 +1128,7 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1155,7 +1155,7 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1184,7 +1184,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1217,7 +1217,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error {

View File

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/fetcher"
@ -59,14 +60,14 @@ type txPool interface {
// Get retrieves the transaction from local txpool with given
// tx hash.
Get(hash common.Hash) *types.Transaction
Get(hash common.Hash) *txpool.Transaction
// AddRemotes should add the given transactions to the pool.
AddRemotes([]*types.Transaction) []error
// Add should add the given transactions to the pool.
Add(txs []*txpool.Transaction, local bool, sync bool) []error
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address]types.Transactions
Pending(enforceTips bool) map[common.Address][]*types.Transaction
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
@ -274,7 +275,10 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return p.RequestTxs(hashes)
}
h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, h.txpool.AddRemotes, fetchTx)
addTxs := func(txs []*txpool.Transaction) []error {
return h.txpool.Add(txs, false, false)
}
h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx)
h.chainSync = newChainSyncer(h)
return h, nil
}

View File

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
@ -307,14 +308,14 @@ func testSendTransactions(t *testing.T, protocol uint) {
handler := newTestHandler()
defer handler.close()
insert := make([]*types.Transaction, 100)
insert := make([]*txpool.Transaction, 100)
for nonce := range insert {
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, 10240))
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)
insert[nonce] = tx
insert[nonce] = &txpool.Transaction{Tx: tx}
}
go handler.txpool.AddRemotes(insert) // Need goroutine to not block on feed
go handler.txpool.Add(insert, false, false) // Need goroutine to not block on feed
time.Sleep(250 * time.Millisecond) // Wait until tx events get out of the system (can't use events, tx broadcaster races with peer join)
// Create a source handler to send messages through and a sink peer to receive them
@ -375,8 +376,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
}
}
for _, tx := range insert {
if _, ok := seen[tx.Hash()]; !ok {
t.Errorf("missing transaction: %x", tx.Hash())
if _, ok := seen[tx.Tx.Hash()]; !ok {
t.Errorf("missing transaction: %x", tx.Tx.Hash())
}
}
}
@ -433,14 +434,14 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
defer sub.Unsubscribe()
}
// Fill the source pool with transactions and wait for them at the sinks
txs := make([]*types.Transaction, 1024)
txs := make([]*txpool.Transaction, 1024)
for nonce := range txs {
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), nil)
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)
txs[nonce] = tx
txs[nonce] = &txpool.Transaction{Tx: tx}
}
source.txpool.AddRemotes(txs)
source.txpool.Add(txs, false, false)
// Iterate through all the sinks and ensure they all got the transactions
for i := range sinks {

View File

@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
@ -71,32 +72,40 @@ func (p *testTxPool) Has(hash common.Hash) bool {
// Get retrieves the transaction from local txpool with given
// tx hash.
func (p *testTxPool) Get(hash common.Hash) *types.Transaction {
func (p *testTxPool) Get(hash common.Hash) *txpool.Transaction {
p.lock.Lock()
defer p.lock.Unlock()
return p.pool[hash]
if tx := p.pool[hash]; tx != nil {
return &txpool.Transaction{Tx: tx}
}
return nil
}
// AddRemotes appends a batch of transactions to the pool, and notifies any
// Add appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
func (p *testTxPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error {
unwrapped := make([]*types.Transaction, len(txs))
for i, tx := range txs {
unwrapped[i] = tx.Tx
}
p.lock.Lock()
defer p.lock.Unlock()
for _, tx := range txs {
for _, tx := range unwrapped {
p.pool[tx.Hash()] = tx
}
p.txFeed.Send(core.NewTxsEvent{Txs: txs})
return make([]error, len(txs))
p.txFeed.Send(core.NewTxsEvent{Txs: unwrapped})
return make([]error, len(unwrapped))
}
// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(enforceTips bool) map[common.Address]types.Transactions {
func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction {
p.lock.RLock()
defer p.lock.RUnlock()
batches := make(map[common.Address]types.Transactions)
batches := make(map[common.Address][]*types.Transaction)
for _, tx := range p.pool {
from, _ := types.Sender(types.HomesteadSigner{}, tx)
batches[from] = append(batches[from], tx)

View File

@ -81,8 +81,8 @@ func (p *Peer) broadcastTransactions() {
)
for i := 0; i < len(queue) && size < maxTxPacketSize; i++ {
if tx := p.txpool.Get(queue[i]); tx != nil {
txs = append(txs, tx)
size += common.StorageSize(tx.Size())
txs = append(txs, tx.Tx)
size += common.StorageSize(tx.Tx.Size())
}
hashesCount++
}
@ -151,8 +151,8 @@ func (p *Peer) announceTransactions() {
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
if tx := p.txpool.Get(queue[count]); tx != nil {
pending = append(pending, queue[count])
pendingTypes = append(pendingTypes, tx.Type())
pendingSizes = append(pendingSizes, uint32(tx.Size()))
pendingTypes = append(pendingTypes, tx.Tx.Type())
pendingSizes = append(pendingSizes, uint32(tx.Tx.Size()))
size += common.HashLength
}
}

View File

@ -23,7 +23,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
@ -90,7 +90,7 @@ type Backend interface {
// TxPool defines the methods needed by the protocol handler to serve transactions.
type TxPool interface {
// Get retrieves the transaction from the local txpool with the given hash.
Get(hash common.Hash) *types.Transaction
Get(hash common.Hash) *txpool.Transaction
}
// MakeProtocols constructs the P2P protocol definitions for `eth`.

View File

@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
@ -113,19 +114,22 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int,
for _, block := range bs {
chain.StateCache().TrieDB().Commit(block.Root(), false)
}
txconfig := txpool.DefaultConfig
txconfig := legacypool.DefaultConfig
txconfig.Journal = "" // Don't litter the disk with test journals
pool := legacypool.New(txconfig, chain)
txpool, _ := txpool.New(new(big.Int).SetUint64(txconfig.PriceLimit), chain, []txpool.SubPool{pool})
return &testBackend{
db: db,
chain: chain,
txpool: txpool.New(txconfig, params.TestChainConfig, chain),
txpool: txpool,
}
}
// close tears down the transaction pool and chain behind the mock backend.
func (b *testBackend) close() {
b.txpool.Stop()
b.txpool.Close()
b.chain.Stop()
}

View File

@ -498,7 +498,7 @@ func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsPac
continue
}
// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(tx); err != nil {
if encoded, err := rlp.EncodeToBytes(tx.Tx); err != nil {
log.Error("Failed to encode transaction", "err", err)
} else {
hashes = append(hashes, hash)

50
event/multisub.go Normal file
View File

@ -0,0 +1,50 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event
// JoinSubscriptions joins multiple subscriptions to be able to track them as
// one entity and collectively cancel them of consume any errors from them.
func JoinSubscriptions(subs ...Subscription) Subscription {
return NewSubscription(func(unsubbed <-chan struct{}) error {
// Unsubscribe all subscriptions before returning
defer func() {
for _, sub := range subs {
sub.Unsubscribe()
}
}()
// Wait for an error on any of the subscriptions and propagate up
errc := make(chan error, len(subs))
for i := range subs {
go func(sub Subscription) {
select {
case err := <-sub.Err():
if err != nil {
errc <- err
}
case <-unsubbed:
}
}(subs[i])
}
select {
case err := <-errc:
return err
case <-unsubbed:
return nil
}
})
}

175
event/multisub_test.go Normal file
View File

@ -0,0 +1,175 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event
import (
"testing"
"time"
)
func TestMultisub(t *testing.T) {
// Create a double subscription and ensure events propagate through
var (
feed1 Feed
feed2 Feed
)
sink1 := make(chan int, 1)
sink2 := make(chan int, 1)
sub1 := feed1.Subscribe(sink1)
sub2 := feed2.Subscribe(sink2)
sub := JoinSubscriptions(sub1, sub2)
feed1.Send(1)
select {
case n := <-sink1:
if n != 1 {
t.Errorf("sink 1 delivery mismatch: have %d, want %d", n, 1)
}
default:
t.Error("sink 1 missing delivery")
}
feed2.Send(2)
select {
case n := <-sink2:
if n != 2 {
t.Errorf("sink 2 delivery mismatch: have %d, want %d", n, 2)
}
default:
t.Error("sink 2 missing delivery")
}
// Unsubscribe and ensure no more events are delivered
sub.Unsubscribe()
select {
case <-sub.Err():
case <-time.After(50 * time.Millisecond):
t.Error("multisub didn't propagate closure")
}
feed1.Send(11)
select {
case n := <-sink1:
t.Errorf("sink 1 unexpected delivery: %d", n)
default:
}
feed2.Send(22)
select {
case n := <-sink2:
t.Errorf("sink 2 unexpected delivery: %d", n)
default:
}
}
func TestMutisubPartialUnsubscribe(t *testing.T) {
// Create a double subscription but terminate one half, ensuring no error
// is propagated yet up to the outer subscription
var (
feed1 Feed
feed2 Feed
)
sink1 := make(chan int, 1)
sink2 := make(chan int, 1)
sub1 := feed1.Subscribe(sink1)
sub2 := feed2.Subscribe(sink2)
sub := JoinSubscriptions(sub1, sub2)
sub1.Unsubscribe()
select {
case <-sub.Err():
t.Error("multisub propagated closure")
case <-time.After(50 * time.Millisecond):
}
// Ensure that events cross only the second feed
feed1.Send(1)
select {
case n := <-sink1:
t.Errorf("sink 1 unexpected delivery: %d", n)
default:
}
feed2.Send(2)
select {
case n := <-sink2:
if n != 2 {
t.Errorf("sink 2 delivery mismatch: have %d, want %d", n, 2)
}
default:
t.Error("sink 2 missing delivery")
}
// Unsubscribe and ensure no more events are delivered
sub.Unsubscribe()
select {
case <-sub.Err():
case <-time.After(50 * time.Millisecond):
t.Error("multisub didn't propagate closure")
}
feed1.Send(11)
select {
case n := <-sink1:
t.Errorf("sink 1 unexpected delivery: %d", n)
default:
}
feed2.Send(22)
select {
case n := <-sink2:
t.Errorf("sink 2 unexpected delivery: %d", n)
default:
}
}
func TestMultisubFullUnsubscribe(t *testing.T) {
// Create a double subscription and terminate the multi sub, ensuring an
// error is propagated up.
var (
feed1 Feed
feed2 Feed
)
sink1 := make(chan int, 1)
sink2 := make(chan int, 1)
sub1 := feed1.Subscribe(sink1)
sub2 := feed2.Subscribe(sink2)
sub := JoinSubscriptions(sub1, sub2)
sub.Unsubscribe()
select {
case <-sub.Err():
case <-time.After(50 * time.Millisecond):
t.Error("multisub didn't propagate closure")
}
// Ensure no more events are delivered
feed1.Send(1)
select {
case n := <-sink1:
t.Errorf("sink 1 unexpected delivery: %d", n)
default:
}
feed2.Send(2)
select {
case n := <-sink2:
t.Errorf("sink 2 unexpected delivery: %d", n)
default:
}
}

View File

@ -366,10 +366,10 @@ func (b testBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uin
panic("implement me")
}
func (b testBackend) Stats() (pending int, queued int) { panic("implement me") }
func (b testBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
func (b testBackend) TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
panic("implement me")
}
func (b testBackend) TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions) {
func (b testBackend) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
panic("implement me")
}
func (b testBackend) SubscribeNewTxsEvent(events chan<- core.NewTxsEvent) event.Subscription {

View File

@ -80,8 +80,8 @@ type Backend interface {
GetPoolTransaction(txHash common.Hash) *types.Transaction
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
Stats() (pending int, queued int)
TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions)
TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions)
TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction)
TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction)
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
ChainConfig() *params.ChainConfig

View File

@ -325,10 +325,10 @@ func (b *backendMock) GetPoolNonce(ctx context.Context, addr common.Address) (ui
return 0, nil
}
func (b *backendMock) Stats() (pending int, queued int) { return 0, 0 }
func (b *backendMock) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
func (b *backendMock) TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
return nil, nil
}
func (b *backendMock) TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions) {
func (b *backendMock) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
return nil, nil
}
func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil }

View File

@ -23,7 +23,8 @@ const (
LightCategory = "LIGHT CLIENT"
DevCategory = "DEVELOPER CHAIN"
EthashCategory = "ETHASH"
TxPoolCategory = "TRANSACTION POOL"
TxPoolCategory = "TRANSACTION POOL (EVM)"
BlobPoolCategory = "TRANSACTION POOL (BLOB)"
PerfCategory = "PERFORMANCE TUNING"
AccountCategory = "ACCOUNT"
APICategory = "API AND CONSOLE"

View File

@ -224,11 +224,11 @@ func (b *LesApiBackend) Stats() (pending int, queued int) {
return b.eth.txPool.Stats(), 0
}
func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
func (b *LesApiBackend) TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
return b.eth.txPool.Content()
}
func (b *LesApiBackend) TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions) {
func (b *LesApiBackend) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
return b.eth.txPool.ContentFrom(addr)
}

View File

@ -518,12 +518,7 @@ func handleSendTx(msg Decoder) (serveRequestFn, uint64, uint64, error) {
hash := tx.Hash()
stats[i] = txStatus(backend, hash)
if stats[i].Status == txpool.TxStatusUnknown {
addFn := backend.TxPool().AddRemotes
// Add txs synchronously for testing purpose
if backend.AddTxsSync() {
addFn = backend.TxPool().AddRemotesSync
}
if errs := addFn([]*types.Transaction{tx}); errs[0] != nil {
if errs := backend.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, false, backend.AddTxsSync()); errs[0] != nil {
stats[i].Error = errs[0].Error()
continue
}
@ -556,7 +551,7 @@ func handleGetTxStatus(msg Decoder) (serveRequestFn, uint64, uint64, error) {
func txStatus(b serverBackend, hash common.Hash) light.TxStatus {
var stat light.TxStatus
// Looking the transaction in txpool first.
stat.Status = b.TxPool().Status([]common.Hash{hash})[0]
stat.Status = b.TxPool().Status(hash)
// If the transaction is unknown to the pool, try looking it up locally.
if stat.Status == txpool.TxStatusUnknown {

View File

@ -37,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/ethconfig"
@ -232,9 +233,11 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
simulation := backends.NewSimulatedBackendWithDatabase(db, gspec.Alloc, 100000000)
prepare(blocks, simulation)
txpoolConfig := txpool.DefaultConfig
txpoolConfig := legacypool.DefaultConfig
txpoolConfig.Journal = ""
txpool := txpool.New(txpoolConfig, gspec.Config, simulation.Blockchain())
pool := legacypool.New(txpoolConfig, simulation.Blockchain())
txpool, _ := txpool.New(new(big.Int).SetUint64(txpoolConfig.PriceLimit), simulation.Blockchain(), []txpool.SubPool{pool})
server := &LesServer{
lesCommons: lesCommons{

View File

@ -494,29 +494,29 @@ func (pool *TxPool) GetTransactions() (txs types.Transactions, err error) {
// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and nonce.
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
func (pool *TxPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
pool.mu.RLock()
defer pool.mu.RUnlock()
// Retrieve all the pending transactions and sort by account and by nonce
pending := make(map[common.Address]types.Transactions)
pending := make(map[common.Address][]*types.Transaction)
for _, tx := range pool.pending {
account, _ := types.Sender(pool.signer, tx)
pending[account] = append(pending[account], tx)
}
// There are no queued transactions in a light pool, just return an empty map
queued := make(map[common.Address]types.Transactions)
queued := make(map[common.Address][]*types.Transaction)
return pending, queued
}
// ContentFrom retrieves the data content of the transaction pool, returning the
// pending as well as queued transactions of this address, grouped by nonce.
func (pool *TxPool) ContentFrom(addr common.Address) (types.Transactions, types.Transactions) {
func (pool *TxPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
pool.mu.RLock()
defer pool.mu.RUnlock()
// Retrieve the pending transactions and sort by nonce
var pending types.Transactions
var pending []*types.Transaction
for _, tx := range pool.pending {
account, _ := types.Sender(pool.signer, tx)
if account != addr {
@ -525,7 +525,7 @@ func (pool *TxPool) ContentFrom(addr common.Address) (types.Transactions, types.
pending = append(pending, tx)
}
// There are no queued transactions in a light pool, just return an empty map
return pending, types.Transactions{}
return pending, []*types.Transaction{}
}
// RemoveTransactions removes all given transactions from the pool.

View File

@ -29,10 +29,12 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)
@ -61,11 +63,16 @@ func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *stat
}
type testBlockChain struct {
config *params.ChainConfig
statedb *state.StateDB
gasLimit uint64
chainHeadFeed *event.Feed
}
func (bc *testBlockChain) Config() *params.ChainConfig {
return bc.config
}
func (bc *testBlockChain) CurrentBlock() *types.Header {
return &types.Header{
Number: new(big.Int),
@ -265,10 +272,12 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) {
t.Fatalf("can't create new chain %v", err)
}
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(chainDB), nil)
blockchain := &testBlockChain{statedb, 10000000, new(event.Feed)}
blockchain := &testBlockChain{chainConfig, statedb, 10000000, new(event.Feed)}
pool := txpool.New(testTxPoolConfig, chainConfig, blockchain)
backend := NewMockBackend(bc, pool)
pool := legacypool.New(testTxPoolConfig, blockchain)
txpool, _ := txpool.New(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain, []txpool.SubPool{pool})
backend := NewMockBackend(bc, txpool)
// Create event Mux
mux := new(event.TypeMux)
// Create Miner
@ -276,7 +285,7 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) {
cleanup := func(skipMiner bool) {
bc.Stop()
engine.Close()
pool.Stop()
txpool.Close()
if !skipMiner {
miner.Close()
}

View File

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/common/fdlimit"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
@ -132,7 +133,7 @@ func main() {
if err != nil {
panic(err)
}
if err := backend.TxPool().AddLocal(tx); err != nil {
if err := backend.TxPool().Add([]*txpool.Transaction{{Tx: tx}}, true, false); err != nil {
panic(err)
}
nonces[index]++
@ -206,7 +207,7 @@ func makeSealer(genesis *core.Genesis) (*node.Node, *eth.Ethereum, error) {
SyncMode: downloader.FullSync,
DatabaseCache: 256,
DatabaseHandles: 256,
TxPool: txpool.DefaultConfig,
TxPool: legacypool.DefaultConfig,
GPO: ethconfig.Defaults.GPO,
Miner: miner.Config{
GasCeil: genesis.GasLimit * 11 / 10,

View File

@ -533,7 +533,7 @@ func (w *worker) mainLoop() {
if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
continue
}
txs := make(map[common.Address]types.Transactions, len(ev.Txs))
txs := make(map[common.Address][]*types.Transaction, len(ev.Txs))
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], tx)
@ -904,7 +904,13 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(true)
localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
/*blobtxs := w.eth.BlobPool().Pending(
uint256.MustFromBig(env.header.BaseFee),
uint256.MustFromBig(misc.CalcBlobFee(*env.header.ExcessDataGas)),
)
log.Trace("Side-effect log, much wow", "blobs", len(blobtxs))*/
localTxs, remoteTxs := make(map[common.Address][]*types.Transaction), pending
for _, account := range w.eth.TxPool().Locals() {
if txs := remoteTxs[account]; len(txs) > 0 {
delete(remoteTxs, account)

View File

@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
@ -49,7 +50,7 @@ const (
var (
// Test chain configurations
testTxPoolConfig txpool.Config
testTxPoolConfig legacypool.Config
ethashChainConfig *params.ChainConfig
cliqueChainConfig *params.ChainConfig
@ -62,7 +63,7 @@ var (
testUserAddress = crypto.PubkeyToAddress(testUserKey.PublicKey)
// Test transactions
pendingTxs []*types.Transaction
pendingTxs []*txpool.Transaction
newTxs []*types.Transaction
testConfig = &Config{
@ -72,7 +73,7 @@ var (
)
func init() {
testTxPoolConfig = txpool.DefaultConfig
testTxPoolConfig = legacypool.DefaultConfig
testTxPoolConfig.Journal = ""
ethashChainConfig = new(params.ChainConfig)
*ethashChainConfig = *params.TestChainConfig
@ -92,7 +93,7 @@ func init() {
Gas: params.TxGas,
GasPrice: big.NewInt(params.InitialBaseFee),
})
pendingTxs = append(pendingTxs, tx1)
pendingTxs = append(pendingTxs, &txpool.Transaction{Tx: tx1})
tx2 := types.MustSignNewTx(testBankKey, signer, &types.LegacyTx{
Nonce: 1,
@ -132,10 +133,13 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
if err != nil {
t.Fatalf("core.NewBlockChain failed: %v", err)
}
pool := legacypool.New(testTxPoolConfig, chain)
txpool, _ := txpool.New(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), chain, []txpool.SubPool{pool})
return &testWorkerBackend{
db: db,
chain: chain,
txPool: txpool.New(testTxPoolConfig, chainConfig, chain),
txPool: txpool,
genesis: gspec,
}
}
@ -156,7 +160,7 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction {
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks)
backend.txPool.AddLocals(pendingTxs)
backend.txPool.Add(pendingTxs, true, false)
w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false)
w.setEtherbase(testBankAddress)
return w, backend
@ -190,8 +194,8 @@ func TestGenerateAndImportBlock(t *testing.T) {
w.start()
for i := 0; i < 5; i++ {
b.txPool.AddLocal(b.newRandomTx(true))
b.txPool.AddLocal(b.newRandomTx(false))
b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(true)}}, true, false)
b.txPool.Add([]*txpool.Transaction{{Tx: b.newRandomTx(false)}}, true, false)
select {
case ev := <-sub.Chan():

View File

@ -160,6 +160,8 @@ const (
RefundQuotient uint64 = 2
RefundQuotientEIP3529 uint64 = 5
BlobTxBytesPerFieldElement = 32 // Size in bytes of a field element
BlobTxFieldElementsPerBlob = 4096 // Number of field elements stored in a single data blob
BlobTxHashVersion = 0x01 // Version byte of the commitment hash
BlobTxMaxDataGasPerBlock = 1 << 19 // Maximum consumable data gas for data blobs per block
BlobTxTargetDataGasPerBlock = 1 << 18 // Target consumable data gas for data blobs per block (for 1559-like pricing)

View File

@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
@ -128,6 +129,9 @@ type fuzzer struct {
}
func newFuzzer(input []byte) *fuzzer {
pool := legacypool.New(legacypool.DefaultConfig, chain)
txpool, _ := txpool.New(new(big.Int).SetUint64(legacypool.DefaultConfig.PriceLimit), chain, []txpool.SubPool{pool})
return &fuzzer{
chain: chain,
chainLen: testChainLen,
@ -138,7 +142,7 @@ func newFuzzer(input []byte) *fuzzer {
chtKeys: chtKeys,
bloomKeys: bloomKeys,
nonce: uint64(len(txHashes)),
pool: txpool.New(txpool.DefaultConfig, params.TestChainConfig, chain),
pool: txpool,
input: bytes.NewReader(input),
}
}

View File

@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/fetcher"
)
@ -79,7 +80,7 @@ func Fuzz(input []byte) int {
f := fetcher.NewTxFetcherForTests(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
func(txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },