core/txpool/blobpool: 4844 blob transaction pool (#26940)

* core/blobpool: implement txpool for blob txs

* core/txpool: track address reservations to notice any weird bugs

* core/txpool/blobpool: add support for in-memory operation for tests

* core/txpool/blobpool: fix heap updating after SetGasTip if account is evicted

* core/txpool/blobpool: fix eviction order if cheap leading txs are included

* core/txpool/blobpool: add note as to why the eviction fields are not inited in reinject

* go.mod: pull in inmem billy from upstream

* core/txpool/blobpool: fix review comments

* core/txpool/blobpool: make heap and heap test deterministic

* core/txpool/blobpool: luv u linter

* core/txpool: limit blob transactions to 16 per account

* core/txpool/blobpool: fix rebase errors

* core/txpool/blobpool: luv you linter

* go.mod: revert some strange crypto package dep updates
Péter Szilágyi 2023-07-27 13:45:35 +03:00 committed by GitHub
parent 37b952a4a2
commit 1662228ac6
35 changed files with 4665 additions and 363 deletions

View File

@ -81,6 +81,9 @@ var (
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.BlobPoolDataDirFlag,
utils.BlobPoolDataCapFlag,
utils.BlobPoolPriceBumpFlag,
utils.SyncModeFlag,
utils.SyncTargetFlag,
utils.ExitWhenSyncedFlag,

View File

@ -368,6 +368,25 @@ var (
Value: ethconfig.Defaults.TxPool.Lifetime,
Category: flags.TxPoolCategory,
}
// Blob transaction pool settings
BlobPoolDataDirFlag = &cli.StringFlag{
Name: "blobpool.datadir",
Usage: "Data directory to store blob transactions in",
Value: ethconfig.Defaults.BlobPool.Datadir,
Category: flags.BlobPoolCategory,
}
BlobPoolDataCapFlag = &cli.Uint64Flag{
Name: "blobpool.datacap",
Usage: "Disk space to allocate for pending blob transactions (soft limit)",
Value: ethconfig.Defaults.BlobPool.Datacap,
Category: flags.BlobPoolCategory,
}
BlobPoolPriceBumpFlag = &cli.Uint64Flag{
Name: "blobpool.pricebump",
Usage: "Price bump percentage to replace an already existing blob transaction",
Value: ethconfig.Defaults.BlobPool.PriceBump,
Category: flags.BlobPoolCategory,
}
// Performance tuning settings
CacheFlag = &cli.IntFlag{
Name: "cache",

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -0,0 +1,50 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"github.com/ethereum/go-ethereum/log"
)
// Config are the configuration parameters of the blob transaction pool.
type Config struct {
Datadir string // Data directory containing the currently executable blobs
Datacap uint64 // Soft-cap of database storage (hard cap is larger due to overhead)
PriceBump uint64 // Minimum price bump percentage to replace an already existing nonce
}
// DefaultConfig contains the default configurations for the transaction pool.
var DefaultConfig = Config{
Datadir: "blobpool",
Datacap: 10 * 1024 * 1024 * 1024,
PriceBump: 100, // either have patience or be aggressive, no mushy ground
}
// sanitize checks the provided user configurations and changes anything that's
// unreasonable or unworkable.
func (config *Config) sanitize() Config {
conf := *config
if conf.Datacap < 1 {
log.Warn("Sanitizing invalid blobpool storage cap", "provided", conf.Datacap, "updated", DefaultConfig.Datacap)
conf.Datacap = DefaultConfig.Datacap
}
if conf.PriceBump < 1 {
log.Warn("Sanitizing invalid blobpool price bump", "provided", conf.PriceBump, "updated", DefaultConfig.PriceBump)
conf.PriceBump = DefaultConfig.PriceBump
}
return conf
}
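
As a rough sketch of how these settings are meant to be consumed (the constructor that actually wires them up lives in the pool implementation, presumably one of the suppressed files above, so treat the call shape as an assumption): a caller starts from DefaultConfig, overrides what it needs, and relies on sanitize to clamp invalid values back to the defaults.

// Hypothetical in-package usage: zero-valued fields are clamped back to defaults.
cfg := Config{
	Datadir:   "blobpool",
	Datacap:   0, // invalid, reset to DefaultConfig.Datacap (10 GiB)
	PriceBump: 0, // invalid, reset to DefaultConfig.PriceBump (100%)
}
cfg = cfg.sanitize()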

View File

@ -0,0 +1,146 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"bytes"
"container/heap"
"math"
"sort"
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
)
// evictHeap is a helper data structure to keep track of the cheapest bottleneck
// transaction from each account to determine which account to evict from.
//
// The heap internally tracks a slice of cheapest transactions from each account
// and a mapping from addresses to indices for direct removals/updates.
//
// The goal of the heap is to decide which account has the worst bottleneck to
// evict transactions from.
type evictHeap struct {
metas *map[common.Address][]*blobTxMeta // Pointer to the blob pool's index for price retrievals
basefeeJumps float64 // Pre-calculated absolute dynamic fee jumps for the base fee
blobfeeJumps float64 // Pre-calculated absolute dynamic fee jumps for the blob fee
addrs []common.Address // Heap of addresses to retrieve the cheapest out of
index map[common.Address]int // Indices into the heap for replacements
}
// newPriceHeap creates a new heap of the cheapest accounts in the blob pool to
// evict from in case of oversaturation.
func newPriceHeap(basefee *uint256.Int, blobfee *uint256.Int, index *map[common.Address][]*blobTxMeta) *evictHeap {
heap := &evictHeap{
metas: index,
index: make(map[common.Address]int),
}
// Populate the heap in account sort order. Not really needed in practice,
// but it makes the heap initialization deterministic and less annoying to
// test in unit tests.
addrs := make([]common.Address, 0, len(*index))
for addr := range *index {
addrs = append(addrs, addr)
}
sort.Slice(addrs, func(i, j int) bool { return bytes.Compare(addrs[i][:], addrs[j][:]) < 0 })
for _, addr := range addrs {
heap.index[addr] = len(heap.addrs)
heap.addrs = append(heap.addrs, addr)
}
heap.reinit(basefee, blobfee, true)
return heap
}
// reinit updates the pre-calculated dynamic fee jumps in the price heap and runs
// the sorting algorithm from scratch on the entire heap.
func (h *evictHeap) reinit(basefee *uint256.Int, blobfee *uint256.Int, force bool) {
// If the update is mostly the same as the old, don't sort pointlessly
basefeeJumps := dynamicFeeJumps(basefee)
blobfeeJumps := dynamicFeeJumps(blobfee)
if !force && math.Abs(h.basefeeJumps-basefeeJumps) < 0.01 && math.Abs(h.blobfeeJumps-blobfeeJumps) < 0.01 { // TODO(karalabe): 0.01 enough, maybe should be smaller? Maybe this optimization is moot?
return
}
// One or both of the dynamic fees jumped, resort the pool
h.basefeeJumps = basefeeJumps
h.blobfeeJumps = blobfeeJumps
heap.Init(h)
}
// Len implements sort.Interface as part of heap.Interface, returning the number
// of accounts in the pool which can be considered for eviction.
func (h *evictHeap) Len() int {
return len(h.addrs)
}
// Less implements sort.Interface as part of heap.Interface, returning which of
// the two requested accounts has a cheaper bottleneck.
func (h *evictHeap) Less(i, j int) bool {
txsI := (*(h.metas))[h.addrs[i]]
txsJ := (*(h.metas))[h.addrs[j]]
lastI := txsI[len(txsI)-1]
lastJ := txsJ[len(txsJ)-1]
prioI := evictionPriority(h.basefeeJumps, lastI.evictionExecFeeJumps, h.blobfeeJumps, lastI.evictionBlobFeeJumps)
if prioI > 0 {
prioI = 0
}
prioJ := evictionPriority(h.basefeeJumps, lastJ.evictionExecFeeJumps, h.blobfeeJumps, lastJ.evictionBlobFeeJumps)
if prioJ > 0 {
prioJ = 0
}
if prioI == prioJ {
return lastI.evictionExecTip.Lt(lastJ.evictionExecTip)
}
return prioI < prioJ
}
// Swap implements sort.Interface as part of heap.Interface, maintaining both the
// order of the accounts according to the heap, and the account->item slot mapping
// for replacements.
func (h *evictHeap) Swap(i, j int) {
h.index[h.addrs[i]], h.index[h.addrs[j]] = h.index[h.addrs[j]], h.index[h.addrs[i]]
h.addrs[i], h.addrs[j] = h.addrs[j], h.addrs[i]
}
// Push implements heap.Interface, appending an item to the end of the account
// ordering as well as the address to item slot mapping.
func (h *evictHeap) Push(x any) {
h.index[x.(common.Address)] = len(h.addrs)
h.addrs = append(h.addrs, x.(common.Address))
}
// Pop implements heap.Interface, removing and returning the last element of the
// heap.
//
// Note, use `heap.Pop`, not `evictHeap.Pop`. This method is used by Go's heap
// package to provide the functionality; it is not meant to be called directly.
func (h *evictHeap) Pop() any {
// Remove the last element from the heap
size := len(h.addrs)
addr := h.addrs[size-1]
h.addrs = h.addrs[:size-1]
// Unindex the removed element and return
delete(h.index, addr)
return addr
}
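
When the pool overflows its data cap, this heap picks the victim: the account with the cheapest bottleneck is popped and, presumably, its last (highest-nonce) transaction is dropped first. The actual eviction loop lives in the suppressed pool implementation; the function below is a hypothetical sketch of a single eviction step using only the types defined in this package.

// dropCheapest is a hypothetical sketch of one eviction step: pop the account
// with the cheapest bottleneck and drop its last (highest-nonce) transaction.
func dropCheapest(index map[common.Address][]*blobTxMeta, evict *evictHeap) (common.Address, *blobTxMeta) {
	from := heap.Pop(evict).(common.Address)

	txs := index[from]
	last := txs[len(txs)-1]

	if len(txs) == 1 {
		delete(index, from) // account emptied, stop tracking it
	} else {
		index[from] = txs[:len(txs)-1]
		heap.Push(evict, from) // cheaper transactions remain, keep tracking the account
	}
	return from, last
}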

View File

@ -0,0 +1,320 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"container/heap"
mrand "math/rand"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)
var rand = mrand.New(mrand.NewSource(1))
// verifyHeapInternals verifies that all accounts present in the index are also
// present in the heap and internals are consistent across various indices.
func verifyHeapInternals(t *testing.T, evict *evictHeap) {
t.Helper()
// Ensure that all accounts are present in the heap and no extras
seen := make(map[common.Address]struct{})
for i, addr := range evict.addrs {
seen[addr] = struct{}{}
if _, ok := (*evict.metas)[addr]; !ok {
t.Errorf("heap contains unexpected address at slot %d: %v", i, addr)
}
}
for addr := range *evict.metas {
if _, ok := seen[addr]; !ok {
t.Errorf("heap is missing required address %v", addr)
}
}
if len(evict.addrs) != len(*evict.metas) {
t.Errorf("heap size %d mismatches metadata size %d", len(evict.addrs), len(*evict.metas))
}
// Ensure that all accounts are present in the heap order index and no extras
have := make([]common.Address, len(evict.index))
for addr, i := range evict.index {
have[i] = addr
}
if len(have) != len(evict.addrs) {
t.Errorf("heap index size %d mismatches heap size %d", len(have), len(evict.addrs))
}
for i := 0; i < len(have) && i < len(evict.addrs); i++ {
if have[i] != evict.addrs[i] {
t.Errorf("heap index for slot %d mismatches: have %v, want %v", i, have[i], evict.addrs[i])
}
}
}
// Tests that the price heap can correctly sort its set of transactions based on
// an input base- and blob fee.
func TestPriceHeapSorting(t *testing.T) {
tests := []struct {
execTips []uint64
execFees []uint64
blobFees []uint64
basefee uint64
blobfee uint64
order []int
}{
// If everything is above the basefee and blobfee, order by miner tip
{
execTips: []uint64{1, 0, 2},
execFees: []uint64{1, 2, 3},
blobFees: []uint64{3, 2, 1},
basefee: 0,
blobfee: 0,
order: []int{1, 0, 2},
},
// If only basefees are used (blob fee matches with network), return the
// ones the furthest below the current basefee, splitting same ones with
// the tip. Anything above the basefee should be split by tip.
{
execTips: []uint64{100, 50, 100, 50, 1, 2, 3},
execFees: []uint64{1000, 1000, 500, 500, 2000, 2000, 2000},
blobFees: []uint64{0, 0, 0, 0, 0, 0, 0},
basefee: 1999,
blobfee: 0,
order: []int{3, 2, 1, 0, 4, 5, 6},
},
// If only blobfees are used (base fee matches with network), return the
// ones the furthest below the current blobfee, splitting same ones with
// the tip. Anything above the blobfee should be split by tip.
{
execTips: []uint64{100, 50, 100, 50, 1, 2, 3},
execFees: []uint64{0, 0, 0, 0, 0, 0, 0},
blobFees: []uint64{1000, 1000, 500, 500, 2000, 2000, 2000},
basefee: 0,
blobfee: 1999,
order: []int{3, 2, 1, 0, 4, 5, 6},
},
// If both basefee and blobfee is specified, sort by the larger distance
// of the two from the current network conditions, splitting same (loglog)
// ones via the tip.
//
// Basefee: 1000
// Blobfee: 100
//
// Tx #0: (800, 80) - 2 jumps below both => priority -1
// Tx #1: (630, 63) - 4 jumps below both => priority -2
// Tx #2: (800, 63) - 2 jumps below basefee, 4 jumps below blobfee => priority -2 (blob penalty dominates)
// Tx #3: (630, 80) - 4 jumps below basefee, 2 jumps below blobfee => priority -2 (base penalty dominates)
//
// Txs 1, 2, 3 share the same priority, split via tip, prefer 0 as the best
{
execTips: []uint64{1, 2, 3, 4},
execFees: []uint64{800, 630, 800, 630},
blobFees: []uint64{80, 63, 63, 80},
basefee: 1000,
blobfee: 100,
order: []int{1, 2, 3, 0},
},
}
for i, tt := range tests {
// Create an index of the transactions
index := make(map[common.Address][]*blobTxMeta)
for j := byte(0); j < byte(len(tt.execTips)); j++ {
addr := common.Address{j}
var (
execTip = uint256.NewInt(tt.execTips[j])
execFee = uint256.NewInt(tt.execFees[j])
blobFee = uint256.NewInt(tt.blobFees[j])
basefeeJumps = dynamicFeeJumps(execFee)
blobfeeJumps = dynamicFeeJumps(blobFee)
)
index[addr] = []*blobTxMeta{{
id: uint64(j),
size: 128 * 1024,
nonce: 0,
execTipCap: execTip,
execFeeCap: execFee,
blobFeeCap: blobFee,
basefeeJumps: basefeeJumps,
blobfeeJumps: blobfeeJumps,
evictionExecTip: execTip,
evictionExecFeeJumps: basefeeJumps,
evictionBlobFeeJumps: blobfeeJumps,
}}
}
// Create a price heap and check the pop order
priceheap := newPriceHeap(uint256.NewInt(tt.basefee), uint256.NewInt(tt.blobfee), &index)
verifyHeapInternals(t, priceheap)
for j := 0; j < len(tt.order); j++ {
if next := heap.Pop(priceheap); int(next.(common.Address)[0]) != tt.order[j] {
t.Errorf("test %d, item %d: order mismatch: have %d, want %d", i, j, next.(common.Address)[0], tt.order[j])
} else {
delete(index, next.(common.Address)) // remove to simulate a correct pool for the test
}
verifyHeapInternals(t, priceheap)
}
}
}
// Benchmarks reheaping the entire set of accounts in the blob pool.
func BenchmarkPriceHeapReinit1MB(b *testing.B) { benchmarkPriceHeapReinit(b, 1024*1024) }
func BenchmarkPriceHeapReinit10MB(b *testing.B) { benchmarkPriceHeapReinit(b, 10*1024*1024) }
func BenchmarkPriceHeapReinit100MB(b *testing.B) { benchmarkPriceHeapReinit(b, 100*1024*1024) }
func BenchmarkPriceHeapReinit1GB(b *testing.B) { benchmarkPriceHeapReinit(b, 1024*1024*1024) }
func BenchmarkPriceHeapReinit10GB(b *testing.B) { benchmarkPriceHeapReinit(b, 10*1024*1024*1024) }
func BenchmarkPriceHeapReinit25GB(b *testing.B) { benchmarkPriceHeapReinit(b, 25*1024*1024*1024) }
func BenchmarkPriceHeapReinit50GB(b *testing.B) { benchmarkPriceHeapReinit(b, 50*1024*1024*1024) }
func BenchmarkPriceHeapReinit100GB(b *testing.B) { benchmarkPriceHeapReinit(b, 100*1024*1024*1024) }
func benchmarkPriceHeapReinit(b *testing.B, datacap uint64) {
// Calculate how many unique transactions we can fit into the provided disk
// data cap
blobs := datacap / (params.BlobTxBytesPerFieldElement * params.BlobTxFieldElementsPerBlob)
// Create a random set of transactions with random fees. Use a separate account
// for each transaction to make it worse case.
index := make(map[common.Address][]*blobTxMeta)
for i := 0; i < int(blobs); i++ {
var addr common.Address
rand.Read(addr[:])
var (
execTip = uint256.NewInt(rand.Uint64())
execFee = uint256.NewInt(rand.Uint64())
blobFee = uint256.NewInt(rand.Uint64())
basefeeJumps = dynamicFeeJumps(execFee)
blobfeeJumps = dynamicFeeJumps(blobFee)
)
index[addr] = []*blobTxMeta{{
id: uint64(i),
size: 128 * 1024,
nonce: 0,
execTipCap: execTip,
execFeeCap: execFee,
blobFeeCap: blobFee,
basefeeJumps: basefeeJumps,
blobfeeJumps: blobfeeJumps,
evictionExecTip: execTip,
evictionExecFeeJumps: basefeeJumps,
evictionBlobFeeJumps: blobfeeJumps,
}}
}
// Create a price heap and reinit it over and over
heap := newPriceHeap(uint256.NewInt(rand.Uint64()), uint256.NewInt(rand.Uint64()), &index)
basefees := make([]*uint256.Int, b.N)
blobfees := make([]*uint256.Int, b.N)
for i := 0; i < b.N; i++ {
basefees[i] = uint256.NewInt(rand.Uint64())
blobfees[i] = uint256.NewInt(rand.Uint64())
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
heap.reinit(basefees[i], blobfees[i], true)
}
}
// Benchmarks overflowing the heap over and over (add and then drop).
func BenchmarkPriceHeapOverflow1MB(b *testing.B) { benchmarkPriceHeapOverflow(b, 1024*1024) }
func BenchmarkPriceHeapOverflow10MB(b *testing.B) { benchmarkPriceHeapOverflow(b, 10*1024*1024) }
func BenchmarkPriceHeapOverflow100MB(b *testing.B) { benchmarkPriceHeapOverflow(b, 100*1024*1024) }
func BenchmarkPriceHeapOverflow1GB(b *testing.B) { benchmarkPriceHeapOverflow(b, 1024*1024*1024) }
func BenchmarkPriceHeapOverflow10GB(b *testing.B) { benchmarkPriceHeapOverflow(b, 10*1024*1024*1024) }
func BenchmarkPriceHeapOverflow25GB(b *testing.B) { benchmarkPriceHeapOverflow(b, 25*1024*1024*1024) }
func BenchmarkPriceHeapOverflow50GB(b *testing.B) { benchmarkPriceHeapOverflow(b, 50*1024*1024*1024) }
func BenchmarkPriceHeapOverflow100GB(b *testing.B) { benchmarkPriceHeapOverflow(b, 100*1024*1024*1024) }
func benchmarkPriceHeapOverflow(b *testing.B, datacap uint64) {
// Calculate how many unique transactions we can fit into the provided disk
// data cap
blobs := datacap / (params.BlobTxBytesPerFieldElement * params.BlobTxFieldElementsPerBlob)
// Create a random set of transactions with random fees. Use a separate account
// for each transaction to make it worse case.
index := make(map[common.Address][]*blobTxMeta)
for i := 0; i < int(blobs); i++ {
var addr common.Address
rand.Read(addr[:])
var (
execTip = uint256.NewInt(rand.Uint64())
execFee = uint256.NewInt(rand.Uint64())
blobFee = uint256.NewInt(rand.Uint64())
basefeeJumps = dynamicFeeJumps(execFee)
blobfeeJumps = dynamicFeeJumps(blobFee)
)
index[addr] = []*blobTxMeta{{
id: uint64(i),
size: 128 * 1024,
nonce: 0,
execTipCap: execTip,
execFeeCap: execFee,
blobFeeCap: blobFee,
basefeeJumps: basefeeJumps,
blobfeeJumps: blobfeeJumps,
evictionExecTip: execTip,
evictionExecFeeJumps: basefeeJumps,
evictionBlobFeeJumps: blobfeeJumps,
}}
}
// Create a price heap and overflow it over and over
evict := newPriceHeap(uint256.NewInt(rand.Uint64()), uint256.NewInt(rand.Uint64()), &index)
var (
addrs = make([]common.Address, b.N)
metas = make([]*blobTxMeta, b.N)
)
for i := 0; i < b.N; i++ {
rand.Read(addrs[i][:])
var (
execTip = uint256.NewInt(rand.Uint64())
execFee = uint256.NewInt(rand.Uint64())
blobFee = uint256.NewInt(rand.Uint64())
basefeeJumps = dynamicFeeJumps(execFee)
blobfeeJumps = dynamicFeeJumps(blobFee)
)
metas[i] = &blobTxMeta{
id: uint64(int(blobs) + i),
size: 128 * 1024,
nonce: 0,
execTipCap: execTip,
execFeeCap: execFee,
blobFeeCap: blobFee,
basefeeJumps: basefeeJumps,
blobfeeJumps: blobfeeJumps,
evictionExecTip: execTip,
evictionExecFeeJumps: basefeeJumps,
evictionBlobFeeJumps: blobfeeJumps,
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
index[addrs[i]] = []*blobTxMeta{metas[i]}
heap.Push(evict, addrs[i])
drop := heap.Pop(evict)
delete(index, drop.(common.Address))
}
}

View File

@ -0,0 +1,44 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
)
// BlockChain defines the minimal set of methods needed to back a blob pool with
// a chain. Exists to allow mocking the live chain out of tests.
type BlockChain interface {
// Config retrieves the chain's fork configuration.
Config() *params.ChainConfig
// CurrentBlock returns the current head of the chain.
CurrentBlock() *types.Header
// CurrentFinalBlock returns the current block below which blobs should not
// be maintained anymore for reorg purposes.
CurrentFinalBlock() *types.Header
// GetBlock retrieves a specific block, used during pool resets.
GetBlock(hash common.Hash, number uint64) *types.Block
// StateAt returns a state database for a given root hash (generally the head).
StateAt(root common.Hash) (*state.StateDB, error)
}
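
Since the interface exists precisely so tests can swap out the live chain, a minimal test double only has to hand back canned values. A sketch of such a mock (hypothetical, not part of this change) could look like:

// testBlockChain is a hypothetical minimal mock satisfying BlockChain for unit
// tests: it serves fixed headers and a fixed state database.
type testBlockChain struct {
	config  *params.ChainConfig
	head    *types.Header
	final   *types.Header
	statedb *state.StateDB
}

func (bc *testBlockChain) Config() *params.ChainConfig      { return bc.config }
func (bc *testBlockChain) CurrentBlock() *types.Header      { return bc.head }
func (bc *testBlockChain) CurrentFinalBlock() *types.Header { return bc.final }

func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
	return nil // no historical blocks needed by most tests
}

func (bc *testBlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
	return bc.statedb, nil
}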

View File

@ -0,0 +1,258 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/billy"
)
// limboBlob is a wrapper around an opaque blobset that also contains the tx hash
// to which it belongs as well as the block number in which it was included for
// finality eviction.
type limboBlob struct {
Owner common.Hash // Owner transaction's hash to support resurrecting reorged txs
Block uint64 // Block in which the blob transaction was included
Blobs []kzg4844.Blob // The opaque blobs originally part of the transaction
Commits []kzg4844.Commitment // The commitments for the original blobs
Proofs []kzg4844.Proof // The proofs verifying the commitments
}
// limbo is a light, indexed database to temporarily store recently included
// blobs until they are finalized. The purpose is to support small reorgs, which
// would require pulling back up old blobs (which aren't part of the chain).
//
// TODO(karalabe): Currently updating the inclusion block of a blob needs a full db rewrite. Can we do without?
type limbo struct {
store billy.Database // Persistent data store for limboed blobs
index map[common.Hash]uint64 // Mappings from tx hashes to datastore ids
groups map[uint64]map[uint64]common.Hash // Set of txs included in past blocks
}
// newLimbo opens and indexes a set of limboed blob transactions.
func newLimbo(datadir string) (*limbo, error) {
l := &limbo{
index: make(map[common.Hash]uint64),
groups: make(map[uint64]map[uint64]common.Hash),
}
// Index all limboed blobs on disk and delete anything unprocessable
var fails []uint64
index := func(id uint64, size uint32, data []byte) {
if l.parseBlob(id, data) != nil {
fails = append(fails, id)
}
}
store, err := billy.Open(billy.Options{Path: datadir}, newSlotter(), index)
if err != nil {
return nil, err
}
l.store = store
if len(fails) > 0 {
log.Warn("Dropping invalidated limboed blobs", "ids", fails)
for _, id := range fails {
if err := l.store.Delete(id); err != nil {
l.Close()
return nil, err
}
}
}
return l, nil
}
// Close closes down the underlying persistent store.
func (l *limbo) Close() error {
return l.store.Close()
}
// parseBlob is a callback method on limbo creation that gets called for each
// limboed blob on disk to create the in-memory metadata index.
func (l *limbo) parseBlob(id uint64, data []byte) error {
item := new(limboBlob)
if err := rlp.DecodeBytes(data, item); err != nil {
// This path is impossible unless the disk data representation changes
// across restarts. For that ever improbable case, recover gracefully
// by ignoring this data entry.
log.Error("Failed to decode blob limbo entry", "id", id, "err", err)
return err
}
if _, ok := l.index[item.Owner]; ok {
// This path is impossible, unless due to a programming error a blob gets
// inserted into the limbo which was already part of it. Recover gracefully
// by ignoring this data entry.
log.Error("Dropping duplicate blob limbo entry", "owner", item.Owner, "id", id)
return errors.New("duplicate blob")
}
l.index[item.Owner] = id
if _, ok := l.groups[item.Block]; !ok {
l.groups[item.Block] = make(map[uint64]common.Hash)
}
l.groups[item.Block][id] = item.Owner
return nil
}
// finalize evicts all blobs belonging to a recently finalized block or older.
func (l *limbo) finalize(final *types.Header) {
// Just in case there's no final block yet (network not yet merged, weird
// restart, sethead, etc), fail gracefully.
if final == nil {
log.Error("Nil finalized block cannot evict old blobs")
return
}
for block, ids := range l.groups {
if block > final.Number.Uint64() {
continue
}
for id, owner := range ids {
if err := l.store.Delete(id); err != nil {
log.Error("Failed to drop finalized blob", "block", block, "id", id, "err", err)
}
delete(l.index, owner)
}
delete(l.groups, block)
}
}
// push stores a new blob transaction into the limbo, waiting until finality for
// it to be automatically evicted.
func (l *limbo) push(tx common.Hash, block uint64, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) error {
// If the blobs are already tracked by the limbo, consider it a programming
// error. There's not much to do against it, but be loud.
if _, ok := l.index[tx]; ok {
log.Error("Limbo cannot push already tracked blobs", "tx", tx)
return errors.New("already tracked blob transaction")
}
if err := l.setAndIndex(tx, block, blobs, commits, proofs); err != nil {
log.Error("Failed to set and index liboed blobs", "tx", tx, "err", err)
return err
}
return nil
}
// pull retrieves a previously pushed set of blobs back from the limbo, removing
// it at the same time. This method should be used when a previously included blob
// transaction gets reorged out.
func (l *limbo) pull(tx common.Hash) ([]kzg4844.Blob, []kzg4844.Commitment, []kzg4844.Proof, error) {
// If the blobs are not tracked by the limbo, there's not much to do. This
// can happen for example if a blob transaction is mined without pushing it
// into the network first.
id, ok := l.index[tx]
if !ok {
log.Trace("Limbo cannot pull non-tracked blobs", "tx", tx)
return nil, nil, nil, errors.New("unseen blob transaction")
}
item, err := l.getAndDrop(id)
if err != nil {
log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err)
return nil, nil, nil, err
}
return item.Blobs, item.Commits, item.Proofs, nil
}
// update changes the block number under which a blob transaction is tracked. This
// method should be used when a reorg changes a transaction's inclusion block.
//
// The method may log errors for various unexpected scenarios but will not return
// any of them since there's no clear error case. Some errors may be due to coding
// issues, others caused by signers mining MEV stuff or swapping transactions. In
// all cases, the pool needs to continue operating.
func (l *limbo) update(tx common.Hash, block uint64) {
// If the blobs are not tracked by the limbo, there's not much to do. This
// can happen for example if a blob transaction is mined without pushing it
// into the network first.
id, ok := l.index[tx]
if !ok {
log.Trace("Limbo cannot update non-tracked blobs", "tx", tx)
return
}
// If there was no change in the blob's inclusion block, don't mess around
// with heavy database operations.
if _, ok := l.groups[block][id]; ok {
log.Trace("Blob transaction unchanged in limbo", "tx", tx, "block", block)
return
}
// Retrieve the old blobs from the data store and write them back with a new
// block number. If anything fails, there's not much to do, go on.
item, err := l.getAndDrop(id)
if err != nil {
log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err)
return
}
if err := l.setAndIndex(tx, block, item.Blobs, item.Commits, item.Proofs); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", tx, "err", err)
return
}
log.Trace("Blob transaction updated in limbo", "tx", tx, "old-block", item.Block, "new-block", block)
}
// getAndDrop retrieves a blob item from the limbo store and deletes it both from
// the store and indices.
func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
data, err := l.store.Get(id)
if err != nil {
return nil, err
}
item := new(limboBlob)
if err = rlp.DecodeBytes(data, item); err != nil {
return nil, err
}
delete(l.index, item.Owner)
delete(l.groups[item.Block], id)
if len(l.groups[item.Block]) == 0 {
delete(l.groups, item.Block)
}
if err := l.store.Delete(id); err != nil {
return nil, err
}
return item, nil
}
// setAndIndex assembles a limbo blob database entry and stores it, also updating
// the in-memory indices.
func (l *limbo) setAndIndex(tx common.Hash, block uint64, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) error {
item := &limboBlob{
Owner: tx,
Block: block,
Blobs: blobs,
Commits: commits,
Proofs: proofs,
}
data, err := rlp.EncodeToBytes(item)
if err != nil {
panic(err) // cannot happen runtime, dev error
}
id, err := l.store.Put(data)
if err != nil {
return err
}
l.index[tx] = id
if _, ok := l.groups[block]; !ok {
l.groups[block] = make(map[uint64]common.Hash)
}
l.groups[block][id] = tx
return nil
}
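
Taken together, the limbo's lifecycle as driven by the pool's chain-head processing (the real call sites are in the suppressed pool implementation) is: push when a tracked transaction is included, update or pull when a reorg moves or removes that inclusion, and finalize once finality passes it. A hedged walkthrough of that order, using only the API above:

// limboLifecycle is a hypothetical walkthrough of the intended call order; the
// block numbers are illustrative only.
func limboLifecycle(l *limbo, txHash common.Hash, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof, final *types.Header) {
	// 1. The tracked transaction gets included in block 100: park its sidecar.
	_ = l.push(txHash, 100, blobs, commits, proofs)

	// 2. A small reorg moves the inclusion to block 101: re-index under the new block.
	l.update(txHash, 101)

	// 3. A deeper reorg drops the transaction entirely: pull the sidecar back out
	//    so the pool can re-inject it.
	if blobs, commits, proofs, err := l.pull(txHash); err == nil {
		_, _, _ = blobs, commits, proofs // hand these back to the pool's primary store
	}

	// 4. Once the finalized header passes the inclusion block, any blobs still
	//    parked at or below it are evicted.
	l.finalize(final)
}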

View File

@ -0,0 +1,78 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import "github.com/ethereum/go-ethereum/metrics"
var (
// datacapGauge tracks the user's configured capacity for the blob pool. It
// is mostly a way to expose/debug issues.
datacapGauge = metrics.NewRegisteredGauge("blobpool/datacap", nil)
// The below metrics track the per-datastore metrics for the primary blob
// store and the temporary limbo store.
datausedGauge = metrics.NewRegisteredGauge("blobpool/dataused", nil)
datarealGauge = metrics.NewRegisteredGauge("blobpool/datareal", nil)
slotusedGauge = metrics.NewRegisteredGauge("blobpool/slotused", nil)
limboDatausedGauge = metrics.NewRegisteredGauge("blobpool/limbo/dataused", nil)
limboDatarealGauge = metrics.NewRegisteredGauge("blobpool/limbo/datareal", nil)
limboSlotusedGauge = metrics.NewRegisteredGauge("blobpool/limbo/slotused", nil)
// The below metrics track the per-shelf metrics for the primary blob store
// and the temporary limbo store.
shelfDatausedGaugeName = "blobpool/shelf-%d/dataused"
shelfDatagapsGaugeName = "blobpool/shelf-%d/datagaps"
shelfSlotusedGaugeName = "blobpool/shelf-%d/slotused"
shelfSlotgapsGaugeName = "blobpool/shelf-%d/slotgaps"
limboShelfDatausedGaugeName = "blobpool/limbo/shelf-%d/dataused"
limboShelfDatagapsGaugeName = "blobpool/limbo/shelf-%d/datagaps"
limboShelfSlotusedGaugeName = "blobpool/limbo/shelf-%d/slotused"
limboShelfSlotgapsGaugeName = "blobpool/limbo/shelf-%d/slotgaps"
// The oversized metrics aggregate the shelf stats above the max blob count
// limits to track transactions that are just huge, but don't contain blobs.
//
// There are no oversized data in the limbo, it only contains blobs and some
// constant metadata.
oversizedDatausedGauge = metrics.NewRegisteredGauge("blobpool/oversized/dataused", nil)
oversizedDatagapsGauge = metrics.NewRegisteredGauge("blobpool/oversized/datagaps", nil)
oversizedSlotusedGauge = metrics.NewRegisteredGauge("blobpool/oversized/slotused", nil)
oversizedSlotgapsGauge = metrics.NewRegisteredGauge("blobpool/oversized/slotgaps", nil)
// basefeeGauge and blobfeeGauge track the current network 1559 base fee and
// 4844 blob fee respectively.
basefeeGauge = metrics.NewRegisteredGauge("blobpool/basefee", nil)
blobfeeGauge = metrics.NewRegisteredGauge("blobpool/blobfee", nil)
// pooltipGauge is the configurable miner tip to permit a transaction into
// the pool.
pooltipGauge = metrics.NewRegisteredGauge("blobpool/pooltip", nil)
// addwait/time, resetwait/time and getwait/time track the rough health of
// the pool and whether or not it's capable of keeping up with the load from
// the network.
addwaitHist = metrics.NewRegisteredHistogram("blobpool/addwait", nil, metrics.NewExpDecaySample(1028, 0.015))
addtimeHist = metrics.NewRegisteredHistogram("blobpool/addtime", nil, metrics.NewExpDecaySample(1028, 0.015))
getwaitHist = metrics.NewRegisteredHistogram("blobpool/getwait", nil, metrics.NewExpDecaySample(1028, 0.015))
gettimeHist = metrics.NewRegisteredHistogram("blobpool/gettime", nil, metrics.NewExpDecaySample(1028, 0.015))
pendwaitHist = metrics.NewRegisteredHistogram("blobpool/pendwait", nil, metrics.NewExpDecaySample(1028, 0.015))
pendtimeHist = metrics.NewRegisteredHistogram("blobpool/pendtime", nil, metrics.NewExpDecaySample(1028, 0.015))
resetwaitHist = metrics.NewRegisteredHistogram("blobpool/resetwait", nil, metrics.NewExpDecaySample(1028, 0.015))
resettimeHist = metrics.NewRegisteredHistogram("blobpool/resettime", nil, metrics.NewExpDecaySample(1028, 0.015))
)

View File

@ -0,0 +1,90 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"math"
"math/bits"
"github.com/holiman/uint256"
)
// log2_1_125 is used in the eviction priority calculation.
var log2_1_125 = math.Log2(1.125)
// evictionPriority calculates the eviction priority based on the algorithm
// described in the BlobPool docs for both fee components.
//
// This method takes about 8ns on a very recent laptop CPU, recalculating about
// 125 million transaction priority values per second.
func evictionPriority(basefeeJumps float64, txBasefeeJumps, blobfeeJumps, txBlobfeeJumps float64) int {
var (
basefeePriority = evictionPriority1D(basefeeJumps, txBasefeeJumps)
blobfeePriority = evictionPriority1D(blobfeeJumps, txBlobfeeJumps)
)
if basefeePriority < blobfeePriority {
return basefeePriority
}
return blobfeePriority
}
// evictionPriority1D calculates the eviction priority based on the algorithm
// described in the BlobPool docs for a single fee component.
func evictionPriority1D(basefeeJumps float64, txfeeJumps float64) int {
jumps := txfeeJumps - basefeeJumps
if int(jumps) == 0 {
return 0 // can't log2 0
}
if jumps < 0 {
return -intLog2(uint(-math.Floor(jumps)))
}
return intLog2(uint(math.Ceil(jumps)))
}
// dynamicFeeJumps calculates log1.125(fee), namely the number of fee jumps
// needed to reach the requested fee. We only ever use it to compute the jump
// distance between two fees, so it doesn't matter what exact reference the
// jumps are counted from (0, 1 or 1.125); only the difference is meaningful.
//
// This method is very expensive, taking about 75ns on a very recent laptop CPU,
// but the result does not change with the lifetime of a transaction, so it can
// be cached.
func dynamicFeeJumps(fee *uint256.Int) float64 {
if fee.IsZero() {
return 0 // can't log2 zero, should never happen outside tests, but don't choke
}
return math.Log2(fee.Float64()) / log2_1_125
}
// intLog2 is a helper to calculate the integral part of a log2 of an unsigned
// integer. It is a very specific calculation that's not particularly useful in
// general, but it's what we need here (it's fast).
func intLog2(n uint) int {
switch {
case n == 0:
panic("log2(0) is undefined")
case n < 2048:
return bits.UintSize - bits.LeadingZeros(n) - 1
default:
// The input is log1.125(uint256) = log2(uint256) / log2(1.125). At the
// most extreme, log2(uint256) will be a bit below 257, and the constant
// log2(1.125) ~= 0.17. The largest input thus is ~257 / ~0.17 ~= ~1511.
panic("dynamic fee jump diffs cannot reach this")
}
}

View File

@ -0,0 +1,87 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"testing"
"github.com/holiman/uint256"
)
// Tests that the priority fees are calculated correctly as the log2 of the fee
// jumps needed to go from the base fee to the tx's fee cap.
func TestPriorityCalculation(t *testing.T) {
tests := []struct {
basefee uint64
txfee uint64
result int
}{
{basefee: 7, txfee: 10, result: 2}, // 3.02 jumps, 4 ceil, 2 log2
{basefee: 17_200_000_000, txfee: 17_200_000_000, result: 0}, // 0 jumps, special case 0 log2
{basefee: 9_853_941_692, txfee: 11_085_092_510, result: 0}, // 0.99 jumps, 1 ceil, 0 log2
{basefee: 11_544_106_391, txfee: 10_356_781_100, result: 0}, // -0.92 jumps, -1 floor, 0 log2
{basefee: 17_200_000_000, txfee: 7, result: -7}, // -183.57 jumps, -184 floor, -7 log2
{basefee: 7, txfee: 17_200_000_000, result: 7}, // 183.57 jumps, 184 ceil, 7 log2
}
for i, tt := range tests {
var (
baseJumps = dynamicFeeJumps(uint256.NewInt(tt.basefee))
feeJumps = dynamicFeeJumps(uint256.NewInt(tt.txfee))
)
if prio := evictionPriority1D(baseJumps, feeJumps); prio != tt.result {
t.Errorf("test %d priority mismatch: have %d, want %d", i, prio, tt.result)
}
}
}
// Benchmarks how many dynamic fee jump values can be done.
func BenchmarkDynamicFeeJumpCalculation(b *testing.B) {
fees := make([]*uint256.Int, b.N)
for i := 0; i < b.N; i++ {
fees[i] = uint256.NewInt(rand.Uint64())
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
dynamicFeeJumps(fees[i])
}
}
// Benchmarks how many priority recalculations can be done.
func BenchmarkPriorityCalculation(b *testing.B) {
// The basefee and blob fee are constant for all transactions across a block,
// so we can assume their absolute jump counts can be pre-computed.
basefee := uint256.NewInt(17_200_000_000) // 17.2 Gwei is the 22.03.2023 zero-emission basefee, random number
blobfee := uint256.NewInt(123_456_789_000) // Completely random, no idea what this will be
basefeeJumps := dynamicFeeJumps(basefee)
blobfeeJumps := dynamicFeeJumps(blobfee)
// The transaction's fee cap and blob fee cap are constant across the life
// of the transaction, so we can pre-calculate and cache them.
txBasefeeJumps := make([]float64, b.N)
txBlobfeeJumps := make([]float64, b.N)
for i := 0; i < b.N; i++ {
txBasefeeJumps[i] = dynamicFeeJumps(uint256.NewInt(rand.Uint64()))
txBlobfeeJumps[i] = dynamicFeeJumps(uint256.NewInt(rand.Uint64()))
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
evictionPriority(basefeeJumps, txBasefeeJumps[i], blobfeeJumps, txBlobfeeJumps[i])
}
}

View File

@ -0,0 +1,38 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
// newSlotter creates a helper method for the Billy datastore that returns the
// individual shelf sizes used to store transactions in.
//
// The slotter will create shelves for each possible blob count + some tx metadata
// wiggle room, up to the max permitted limits.
//
// The slotter also creates a shelf for 0-blob transactions. Whilst those are not
// allowed in the current protocol, having an empty shelf is a negligible use of
// resources, and it makes stress testing with junk transactions simpler.
func newSlotter() func() (uint32, bool) {
slotsize := uint32(txAvgSize)
slotsize -= uint32(blobSize) // underflows, it's ok, will overflow back in the first return
return func() (size uint32, done bool) {
slotsize += blobSize
finished := slotsize > maxBlobsPerTransaction*blobSize+txMaxSize
return slotsize, finished
}
}

View File

@ -0,0 +1,58 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import "testing"
// Tests that the slotter creates the expected database shelves.
func TestNewSlotter(t *testing.T) {
// Generate the database shelve sizes
slotter := newSlotter()
var shelves []uint32
for {
shelf, done := slotter()
shelves = append(shelves, shelf)
if done {
break
}
}
// Compare the database shelves to the expected ones
want := []uint32{
0*blobSize + txAvgSize, // 0 blob + some expected tx infos
1*blobSize + txAvgSize, // 1 blob + some expected tx infos
2*blobSize + txAvgSize, // 2 blob + some expected tx infos (could be fewer blobs and more tx data)
3*blobSize + txAvgSize, // 3 blob + some expected tx infos (could be fewer blobs and more tx data)
4*blobSize + txAvgSize, // 4 blob + some expected tx infos (could be fewer blobs and more tx data)
5*blobSize + txAvgSize, // 1-4 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
6*blobSize + txAvgSize, // 1-4 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
7*blobSize + txAvgSize, // 1-4 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
8*blobSize + txAvgSize, // 1-4 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
9*blobSize + txAvgSize, // 1-4 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
10*blobSize + txAvgSize, // 1-4 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
11*blobSize + txAvgSize, // 1-4 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
12*blobSize + txAvgSize, // 1-4 blobs + unexpectedly large tx infos >= 4 blobs + max tx metadata size
}
if len(shelves) != len(want) {
t.Errorf("shelves count mismatch: have %d, want %d", len(shelves), len(want))
}
for i := 0; i < len(shelves) && i < len(want); i++ {
if shelves[i] != want[i] {
t.Errorf("shelf %d mismatch: have %d, want %d", i, shelves[i], want[i])
}
}
}

View File

@ -34,6 +34,10 @@ var (
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
// ErrAccountLimitExceeded is returned if a transaction would exceed the number
// allowed by a pool for a single account.
ErrAccountLimitExceeded = errors.New("account limit exceeded")
// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
ErrGasLimit = errors.New("exceeds block gas limit")

View File

@ -219,6 +219,7 @@ type LegacyPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
@ -291,7 +292,10 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings. The internal
// goroutines will be spun up and the pool deemed operational afterwards.
-func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header) error {
+func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.AddressReserver) error {
// Set the address reserver to request exclusive access to pooled accounts
pool.reserve = reserve
// Set the basic pool parameters
pool.gasTip.Store(gasTip)
pool.reset(nil, head)
@ -365,7 +369,7 @@ func (pool *LegacyPool) loop() {
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
-pool.removeTx(tx.Hash(), true)
+pool.removeTx(tx.Hash(), true, true)
}
queuedEvictionMeter.Mark(int64(len(list)))
}
@ -428,7 +432,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) {
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
drop := pool.all.RemotesBelowTip(tip)
for _, tx := range drop {
-pool.removeTx(tx.Hash(), false)
+pool.removeTx(tx.Hash(), false, true)
}
pool.priced.Removed(len(drop))
}
@ -508,11 +512,11 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The enforceTips parameter can be used to do an extra filtering on the pending
// transactions and only return those whose **effective** tip is large enough in
// the next pending execution environment.
-func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction {
+func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
pool.mu.Lock()
defer pool.mu.Unlock()
-pending := make(map[common.Address][]*types.Transaction, len(pool.pending))
+pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()
@ -526,7 +530,18 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*types.Tr
}
}
if len(txs) > 0 {
-pending[addr] = txs
+lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: &txpool.Transaction{Tx: txs[i]},
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
}
}
pending[addr] = lazies
}
}
return pending
@ -586,6 +601,16 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error {
State: pool.currentState,
FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps
UsedAndLeftSlots: func(addr common.Address) (int, int) {
var have int
if list := pool.pending[addr]; list != nil {
have += list.Len()
}
if list := pool.queue[addr]; list != nil {
have += list.Len()
}
return have, math.MaxInt
},
ExistingExpenditure: func(addr common.Address) *big.Int {
if list := pool.pending[addr]; list != nil {
return list.totalcost
@ -632,10 +657,31 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
invalidTxMeter.Mark(1)
return false, err
}
// already validated by this point
from, _ := types.Sender(pool.signer, tx)
// If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted
var (
_, hasPending = pool.pending[from]
_, hasQueued = pool.queue[from]
)
if !hasPending && !hasQueued {
if err := pool.reserve(from, true); err != nil {
return false, err
}
defer func() {
// If the transaction is rejected by some post-validation check, remove
// the lock on the reservation set.
//
// Note, `err` here is the named error return, which will be initialized
// by a return statement before running deferred methods. Take care with
// removing or subscoping err as it will break this clause.
if err != nil {
pool.reserve(from, false)
}
}()
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
@ -690,7 +736,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
-dropped := pool.removeTx(tx.Hash(), false)
+sender, _ := types.Sender(pool.signer, tx)
+dropped := pool.removeTx(tx.Hash(), false, sender != from) // Don't unreserve the sender of the tx being added if last from the acc
pool.changesSinceReorg += dropped
}
}
@ -1014,8 +1063,14 @@ func (pool *LegacyPool) Has(hash common.Hash) bool {
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
//
// If unreserve is false, the account will not be relinquished to the main txpool
// even if there are no more references to it. This is used to handle a race when
// a tx is being added and it evicts a previously scheduled tx from the same account,
// which could lead to a premature release of the lock.
//
// Returns the number of transactions removed from the pending queue.
-func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int {
+func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bool) int {
// Fetch the transaction we wish to delete
tx := pool.all.Get(hash)
if tx == nil {
@ -1023,6 +1078,20 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int {
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
// If after deletion there are no more transactions belonging to this account,
// relinquish the address reservation. It's a bit convoluted to do this via a
// defer, but it's safer vs. the many return pathways.
if unreserve {
defer func() {
var (
_, hasPending = pool.pending[addr]
_, hasQueued = pool.queue[addr]
)
if !hasPending && !hasQueued {
pool.reserve(addr, false)
}
}()
}
// Remove it from the list of known transactions
pool.all.Remove(hash)
if outofbound {
@ -1273,7 +1342,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// there's nothing to add
if newNum >= oldNum {
// If we reorged to a same or higher number, then it's not a case of setHead
-log.Warn("Transaction pool reset with missing oldhead",
+log.Warn("Transaction pool reset with missing old head",
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
return
}
@ -1287,7 +1356,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// the firing of newhead-event and _now_: most likely a
// reorg caused by sync-reversion or explicit sethead back to an
// earlier block.
-log.Warn("New head missing in txpool reset", "number", newHead.Number, "hash", newHead.Hash())
+log.Warn("Transaction pool reset with missing new head", "number", newHead.Number, "hash", newHead.Hash())
return
}
var discarded, included types.Transactions
@ -1317,7 +1386,13 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
return
}
}
-reinject = types.TxDifference(discarded, included)
+lost := make([]*types.Transaction, 0, len(discarded))
for _, tx := range types.TxDifference(discarded, included) {
if pool.Filter(tx) {
lost = append(lost, tx)
}
}
reinject = lost
}
}
}
@ -1402,6 +1477,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
if _, ok := pool.pending[addr]; !ok {
pool.reserve(addr, false)
}
}
}
return promoted
@ -1523,7 +1601,7 @@ func (pool *LegacyPool) truncateQueue() {
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
-pool.removeTx(tx.Hash(), true)
+pool.removeTx(tx.Hash(), true, true)
}
drop -= size
queuedRateLimitMeter.Mark(int64(size))
@ -1532,7 +1610,7 @@ func (pool *LegacyPool) truncateQueue() {
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
-pool.removeTx(txs[i].Hash(), true)
+pool.removeTx(txs[i].Hash(), true, true)
drop--
queuedRateLimitMeter.Mark(1)
}
@ -1594,6 +1672,9 @@ func (pool *LegacyPool) demoteUnexecutables() {
// Delete the entire pending entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
if _, ok := pool.queue[addr]; !ok {
pool.reserve(addr, false)
}
}
}
}


@ -84,7 +84,7 @@ func TestTransactionFutureAttack(t *testing.T) {
config.GlobalQueue = 100 config.GlobalQueue = 100
config.GlobalSlots = 100 config.GlobalSlots = 100
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
fillPool(t, pool) fillPool(t, pool)
pending, _ := pool.Stats() pending, _ := pool.Stats()
@ -118,7 +118,7 @@ func TestTransactionFuture1559(t *testing.T) {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create a number of test accounts, fund them and make transactions // Create a number of test accounts, fund them and make transactions
@ -151,7 +151,7 @@ func TestTransactionZAttack(t *testing.T) {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create a number of test accounts, fund them and make transactions // Create a number of test accounts, fund them and make transactions
fillPool(t, pool) fillPool(t, pool)
@ -222,7 +222,7 @@ func BenchmarkFutureAttack(b *testing.B) {
config.GlobalQueue = 100 config.GlobalQueue = 100
config.GlobalSlots = 100 config.GlobalSlots = 100
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
fillPool(b, pool) fillPool(b, pool)


@ -24,6 +24,7 @@ import (
"math/big" "math/big"
"math/rand" "math/rand"
"os" "os"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -127,6 +128,31 @@ func dynamicFeeTx(nonce uint64, gaslimit uint64, gasFee *big.Int, tip *big.Int,
return tx return tx
} }
func makeAddressReserver() txpool.AddressReserver {
var (
reserved = make(map[common.Address]struct{})
lock sync.Mutex
)
return func(addr common.Address, reserve bool) error {
lock.Lock()
defer lock.Unlock()
_, exists := reserved[addr]
if reserve {
if exists {
panic("already reserved")
}
reserved[addr] = struct{}{}
return nil
}
if !exists {
panic("not reserved")
}
delete(reserved, addr)
return nil
}
}
func setupPool() (*LegacyPool, *ecdsa.PrivateKey) { func setupPool() (*LegacyPool, *ecdsa.PrivateKey) {
return setupPoolWithConfig(params.TestChainConfig) return setupPoolWithConfig(params.TestChainConfig)
} }
@ -137,7 +163,7 @@ func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.Privat
key, _ := crypto.GenerateKey() key, _ := crypto.GenerateKey()
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
if err := pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()); err != nil { if err := pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()); err != nil {
panic(err) panic(err)
} }
// wait for the pool to initialize // wait for the pool to initialize
@ -256,7 +282,7 @@ func TestStateChangeDuringReset(t *testing.T) {
tx1 := transaction(1, 100000, key) tx1 := transaction(1, 100000, key)
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
nonce := pool.Nonce(address) nonce := pool.Nonce(address)
@ -455,7 +481,7 @@ func TestChainFork(t *testing.T) {
if _, err := pool.add(tx, false); err != nil { if _, err := pool.add(tx, false); err != nil {
t.Error("didn't expect error", err) t.Error("didn't expect error", err)
} }
pool.removeTx(tx.Hash(), true) pool.removeTx(tx.Hash(), true, true)
// reset the pool's internal state // reset the pool's internal state
resetState() resetState()
@ -676,7 +702,7 @@ func TestPostponing(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create two test accounts to produce different gap profiles with // Create two test accounts to produce different gap profiles with
@ -893,7 +919,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create a number of test accounts and fund them (last one will be the local) // Create a number of test accounts and fund them (last one will be the local)
@ -986,7 +1012,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
config.NoLocals = nolocals config.NoLocals = nolocals
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create two test accounts to ensure remotes expire but locals do not // Create two test accounts to ensure remotes expire but locals do not
@ -1171,7 +1197,7 @@ func TestPendingGlobalLimiting(t *testing.T) {
config.GlobalSlots = config.AccountSlots * 10 config.GlobalSlots = config.AccountSlots * 10
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
@ -1275,7 +1301,7 @@ func TestCapClearsFromAll(t *testing.T) {
config.GlobalSlots = 8 config.GlobalSlots = 8
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
@ -1308,7 +1334,7 @@ func TestPendingMinimumAllowance(t *testing.T) {
config.GlobalSlots = 1 config.GlobalSlots = 1
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
@ -1354,7 +1380,7 @@ func TestRepricing(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Keep track of transaction events to ensure all executables get announced // Keep track of transaction events to ensure all executables get announced
@ -1603,7 +1629,7 @@ func TestRepricingKeepsLocals(t *testing.T) {
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
@ -1681,7 +1707,7 @@ func TestUnderpricing(t *testing.T) {
config.GlobalQueue = 2 config.GlobalQueue = 2
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Keep track of transaction events to ensure all executables get announced // Keep track of transaction events to ensure all executables get announced
@ -1796,7 +1822,7 @@ func TestStableUnderpricing(t *testing.T) {
config.GlobalQueue = 0 config.GlobalQueue = 0
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Keep track of transaction events to ensure all executables get announced // Keep track of transaction events to ensure all executables get announced
@ -2025,7 +2051,7 @@ func TestDeduplication(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create a test account to add transactions with // Create a test account to add transactions with
@ -2092,7 +2118,7 @@ func TestReplacement(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Keep track of transaction events to ensure all executables get announced // Keep track of transaction events to ensure all executables get announced
@ -2303,7 +2329,7 @@ func testJournaling(t *testing.T, nolocals bool) {
config.Rejournal = time.Second config.Rejournal = time.Second
pool := New(config, blockchain) pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
// Create two test accounts to ensure remotes expire but locals do not // Create two test accounts to ensure remotes expire but locals do not
local, _ := crypto.GenerateKey() local, _ := crypto.GenerateKey()
@ -2341,7 +2367,7 @@ func testJournaling(t *testing.T, nolocals bool) {
blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool = New(config, blockchain) pool = New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
pending, queued = pool.Stats() pending, queued = pool.Stats()
if queued != 0 { if queued != 0 {
@ -2368,7 +2394,7 @@ func testJournaling(t *testing.T, nolocals bool) {
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool = New(config, blockchain) pool = New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
pending, queued = pool.Stats() pending, queued = pool.Stats()
if pending != 0 { if pending != 0 {
@ -2399,7 +2425,7 @@ func TestStatusCheck(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain) pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close() defer pool.Close()
// Create the test accounts to check various transaction statuses with // Create the test accounts to check various transaction statuses with


@ -18,6 +18,7 @@ package txpool
import ( import (
"math/big" "math/big"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -36,6 +37,32 @@ type Transaction struct {
BlobTxProofs []kzg4844.Proof // Proofs needed by the blob pool BlobTxProofs []kzg4844.Proof // Proofs needed by the blob pool
} }
// LazyTransaction contains a small subset of the transaction properties that is
// enough for the miner and other APIs to handle large batches of transactions,
// and supports pulling up the entire transaction when really needed.
type LazyTransaction struct {
Pool SubPool // Transaction subpool to pull the real transaction up
Hash common.Hash // Transaction hash to pull up if needed
Tx *Transaction // Transaction if already resolved
Time time.Time // Time when the transaction was first seen
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay
}
// Resolve retrieves the full transaction belonging to a lazy handle if it is still
// maintained by the transaction pool.
func (ltx *LazyTransaction) Resolve() *Transaction {
if ltx.Tx == nil {
ltx.Tx = ltx.Pool.Get(ltx.Hash)
}
return ltx.Tx
}
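
For context, a minimal consumer-side sketch of the lazy handles (the collectPending name is an illustrative assumption; it takes a *txpool.TxPool with the updated Pending signature): resolve each handle and skip any transaction the pool has dropped in the meantime, which is the same pattern the eth API backend adopts later in this diff.

package example

import (
	"github.com/ethereum/go-ethereum/core/txpool"
	"github.com/ethereum/go-ethereum/core/types"
)

// collectPending flattens the lazily-resolved pending set into full transactions.
func collectPending(pool *txpool.TxPool) types.Transactions {
	var txs types.Transactions
	for _, batch := range pool.Pending(false) {
		for _, lazy := range batch {
			if tx := lazy.Resolve(); tx != nil { // nil if the pool dropped it meanwhile
				txs = append(txs, tx.Tx)
			}
		}
	}
	return txs
}
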
// AddressReserver is passed by the main transaction pool to subpools, so they
// may request (and relinquish) exclusive access to certain addresses.
type AddressReserver func(addr common.Address, reserve bool) error
// SubPool represents a specialized transaction pool that lives on its own (e.g. // SubPool represents a specialized transaction pool that lives on its own (e.g.
// blob pool). Since independent of how many specialized pools we have, they do // blob pool). Since independent of how many specialized pools we have, they do
// need to be updated in lockstep and assemble into one coherent view for block // need to be updated in lockstep and assemble into one coherent view for block
@ -53,7 +80,7 @@ type SubPool interface {
// These should not be passed as a constructor argument - nor should the pools // These should not be passed as a constructor argument - nor should the pools
// start by themselves - in order to keep multiple subpools in lockstep with // start by themselves - in order to keep multiple subpools in lockstep with
// one another. // one another.
Init(gasTip *big.Int, head *types.Header) error Init(gasTip *big.Int, head *types.Header, reserve AddressReserver) error
// Close terminates any background processing threads and releases any held // Close terminates any background processing threads and releases any held
// resources. // resources.
@ -81,7 +108,7 @@ type SubPool interface {
// Pending retrieves all currently processable transactions, grouped by origin // Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. // account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*types.Transaction Pending(enforceTips bool) map[common.Address][]*LazyTransaction
// SubscribeTransactions subscribes to new transaction events. // SubscribeTransactions subscribes to new transaction events.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription


@ -17,13 +17,17 @@
package txpool package txpool
import ( import (
"errors"
"fmt" "fmt"
"math/big" "math/big"
"sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
) )
// TxStatus is the current status of a transaction as seen by the pool. // TxStatus is the current status of a transaction as seen by the pool.
@ -36,6 +40,15 @@ const (
TxStatusIncluded TxStatusIncluded
) )
var (
// reservationsGaugeName is the prefix of a per-subpool address reservation
// metric.
//
// This is mostly a sanity metric to ensure there's no bug that would make
// some subpool hog all the reservations due to mis-accounting.
reservationsGaugeName = "txpool/reservations"
)
// BlockChain defines the minimal set of methods needed to back a tx pool with // BlockChain defines the minimal set of methods needed to back a tx pool with
// a chain. Exists to allow mocking the live chain out of tests. // a chain. Exists to allow mocking the live chain out of tests.
type BlockChain interface { type BlockChain interface {
@ -52,9 +65,13 @@ type BlockChain interface {
// They exit the pool when they are included in the blockchain or evicted due to // They exit the pool when they are included in the blockchain or evicted due to
// resource constraints. // resource constraints.
type TxPool struct { type TxPool struct {
subpools []SubPool // List of subpools for specialized transaction handling subpools []SubPool // List of subpools for specialized transaction handling
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater reservations map[common.Address]SubPool // Map with the account to pool reservations
reserveLock sync.Mutex // Lock protecting the account reservations
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater
} }
// New creates a new transaction pool to gather, sort and filter inbound // New creates a new transaction pool to gather, sort and filter inbound
@ -66,11 +83,12 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error)
head := chain.CurrentBlock() head := chain.CurrentBlock()
pool := &TxPool{ pool := &TxPool{
subpools: subpools, subpools: subpools,
quit: make(chan chan error), reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
} }
for i, subpool := range subpools { for i, subpool := range subpools {
if err := subpool.Init(gasTip, head); err != nil { if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
for j := i - 1; j >= 0; j-- { for j := i - 1; j >= 0; j-- {
subpools[j].Close() subpools[j].Close()
} }
@ -81,6 +99,52 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error)
return pool, nil return pool, nil
} }
// reserver is a method to create an address reservation callback to exclusively
// assign/deassign addresses to/from subpools. This can ensure that at any point
// in time, only a single subpool is able to manage an account, avoiding cross
// subpool eviction issues and nonce conflicts.
func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
return func(addr common.Address, reserve bool) error {
p.reserveLock.Lock()
defer p.reserveLock.Unlock()
owner, exists := p.reservations[addr]
if reserve {
// Double reservations are forbidden even from the same pool to
// avoid subtle bugs in the long term.
if exists {
if owner == subpool {
log.Error("pool attempted to reserve already-owned address", "address", addr)
return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed
}
return errors.New("address already reserved")
}
p.reservations[addr] = subpool
if metrics.Enabled {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Inc(1)
}
return nil
}
// Ensure subpools only attempt to unreserve their own owned addresses,
// otherwise flag as a programming error.
if !exists {
log.Error("pool attempted to unreserve non-reserved address", "address", addr)
return errors.New("address not reserved")
}
if subpool != owner {
log.Error("pool attempted to unreserve non-owned address", "address", addr)
return errors.New("address not owned")
}
delete(p.reservations, addr)
if metrics.Enabled {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Dec(1)
}
return nil
}
}
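
To make the calling convention concrete, here is a hedged sketch of how a subpool is expected to drive the reserver (the trackFirstTx/releaseLastTx helpers and the hasTxs predicate are illustrative, not part of the change): reserve an address when its first transaction enters the subpool, and release it once the last one leaves.

package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/txpool"
)

// trackFirstTx reserves the address before its first transaction is admitted.
func trackFirstTx(reserve txpool.AddressReserver, addr common.Address, hasTxs func(common.Address) bool) error {
	if !hasTxs(addr) {
		if err := reserve(addr, true); err != nil {
			return err // another subpool already owns this account
		}
	}
	return nil
}

// releaseLastTx relinquishes the address once no transactions remain for it.
func releaseLastTx(reserve txpool.AddressReserver, addr common.Address, hasTxs func(common.Address) bool) {
	if !hasTxs(addr) {
		_ = reserve(addr, false) // unreserving an owned address should not fail
	}
}
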
// Close terminates the transaction pool and all its subpools. // Close terminates the transaction pool and all its subpools.
func (p *TxPool) Close() error { func (p *TxPool) Close() error {
var errs []error var errs []error
@ -242,8 +306,8 @@ func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error {
// Pending retrieves all currently processable transactions, grouped by origin // Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. // account and sorted by nonce.
func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction { func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction {
txs := make(map[common.Address][]*types.Transaction) txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools { for _, subpool := range p.subpools {
for addr, set := range subpool.Pending(enforceTips) { for addr, set := range subpool.Pending(enforceTips) {
txs[addr] = set txs[addr] = set


@ -166,6 +166,11 @@ type ValidationOptionsWithState struct {
// nonce gaps will be ignored and permitted. // nonce gaps will be ignored and permitted.
FirstNonceGap func(addr common.Address) uint64 FirstNonceGap func(addr common.Address) uint64
// UsedAndLeftSlots is a mandatory callback to retrieve the number of tx slots
// used and the number still permitted for an account. New transactions will
// be rejected once the number of remaining slots reaches zero.
UsedAndLeftSlots func(addr common.Address) (int, int)
// ExistingExpenditure is a mandatory callback to retrieve the cumulative // ExistingExpenditure is a mandatory callback to retrieve the cumulative
// cost of the already pooled transactions to check for overdrafts. // cost of the already pooled transactions to check for overdrafts.
ExistingExpenditure func(addr common.Address) *big.Int ExistingExpenditure func(addr common.Address) *big.Int
@ -220,6 +225,12 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op
if balance.Cmp(need) < 0 { if balance.Cmp(need) < 0 {
return fmt.Errorf("%w: balance %v, queued cost %v, tx cost %v, overshot %v", core.ErrInsufficientFunds, balance, spent, cost, new(big.Int).Sub(need, balance)) return fmt.Errorf("%w: balance %v, queued cost %v, tx cost %v, overshot %v", core.ErrInsufficientFunds, balance, spent, cost, new(big.Int).Sub(need, balance))
} }
// Transaction takes a new nonce value out of the pool. Ensure it doesn't
// overflow the number of permitted transactions from a single account
// (i.e. max cancellable via an out-of-bound transaction).
if used, left := opts.UsedAndLeftSlots(from); left <= 0 {
return fmt.Errorf("%w: pooled %d txs", ErrAccountLimitExceeded, used)
}
} }
return nil return nil
} }
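
As an illustration of the new callback, here is a hedged sketch of a UsedAndLeftSlots implementation over a plain in-memory count (the makeSlotter helper and the limit value are assumptions; the real pools derive both from their own indexes and configuration):

package example

import "github.com/ethereum/go-ethereum/common"

// makeSlotter returns a UsedAndLeftSlots-style callback: how many txs an
// account already has pooled and how many more it may still add.
func makeSlotter(counts map[common.Address]int, limit int) func(common.Address) (int, int) {
	return func(addr common.Address) (int, int) {
		used := counts[addr]
		left := limit - used
		if left < 0 {
			left = 0
		}
		return used, left
	}
}
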


@ -18,7 +18,6 @@ package types
import ( import (
"bytes" "bytes"
"container/heap"
"errors" "errors"
"io" "io"
"math/big" "math/big"
@ -394,6 +393,19 @@ func (tx *Transaction) BlobGasFeeCapIntCmp(other *big.Int) int {
return tx.inner.blobGasFeeCap().Cmp(other) return tx.inner.blobGasFeeCap().Cmp(other)
} }
// SetTime sets the decoding time of a transaction. This is used by tests to set
// arbitrary times and by persistent transaction pools when loading old txs from
// disk.
func (tx *Transaction) SetTime(t time.Time) {
tx.time = t
}
// Time returns the time when the transaction was first seen on the network. It
// is a heuristic to prefer mining older txs over newer ones, all else being equal.
func (tx *Transaction) Time() time.Time {
return tx.time
}
// Hash returns the transaction hash. // Hash returns the transaction hash.
func (tx *Transaction) Hash() common.Hash { func (tx *Transaction) Hash() common.Hash {
if hash := tx.hash.Load(); hash != nil { if hash := tx.hash.Load(); hash != nil {
@ -502,123 +514,6 @@ func (s TxByNonce) Len() int { return len(s) }
func (s TxByNonce) Less(i, j int) bool { return s[i].Nonce() < s[j].Nonce() } func (s TxByNonce) Less(i, j int) bool { return s[i].Nonce() < s[j].Nonce() }
func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// TxWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap
type TxWithMinerFee struct {
tx *Transaction
minerFee *big.Int
}
// NewTxWithMinerFee creates a wrapped transaction, calculating the effective
// miner gasTipCap if a base fee is provided.
// Returns error in case of a negative effective miner gasTipCap.
func NewTxWithMinerFee(tx *Transaction, baseFee *big.Int) (*TxWithMinerFee, error) {
minerFee, err := tx.EffectiveGasTip(baseFee)
if err != nil {
return nil, err
}
return &TxWithMinerFee{
tx: tx,
minerFee: minerFee,
}, nil
}
// TxByPriceAndTime implements both the sort and the heap interface, making it useful
// for all at once sorting as well as individually adding and removing elements.
type TxByPriceAndTime []*TxWithMinerFee
func (s TxByPriceAndTime) Len() int { return len(s) }
func (s TxByPriceAndTime) Less(i, j int) bool {
// If the prices are equal, use the time the transaction was first seen for
// deterministic sorting
cmp := s[i].minerFee.Cmp(s[j].minerFee)
if cmp == 0 {
return s[i].tx.time.Before(s[j].tx.time)
}
return cmp > 0
}
func (s TxByPriceAndTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s *TxByPriceAndTime) Push(x interface{}) {
*s = append(*s, x.(*TxWithMinerFee))
}
func (s *TxByPriceAndTime) Pop() interface{} {
old := *s
n := len(old)
x := old[n-1]
old[n-1] = nil
*s = old[0 : n-1]
return x
}
// TransactionsByPriceAndNonce represents a set of transactions that can return
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type TransactionsByPriceAndNonce struct {
txs map[common.Address][]*Transaction // Per account nonce-sorted list of transactions
heads TxByPriceAndTime // Next transaction for each unique account (price heap)
signer Signer // Signer for the set of transactions
baseFee *big.Int // Current base fee
}
// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve
// price sorted transactions in a nonce-honouring way.
//
// Note, the input map is reowned, so the caller should not interact with it
// any more after providing it to the constructor.
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address][]*Transaction, baseFee *big.Int) *TransactionsByPriceAndNonce {
// Initialize a price and received time based heap with the head transactions
heads := make(TxByPriceAndTime, 0, len(txs))
for from, accTxs := range txs {
acc, _ := Sender(signer, accTxs[0])
wrapped, err := NewTxWithMinerFee(accTxs[0], baseFee)
// Remove transaction if sender doesn't match from, or if wrapping fails.
if acc != from || err != nil {
delete(txs, from)
continue
}
heads = append(heads, wrapped)
txs[from] = accTxs[1:]
}
heap.Init(&heads)
// Assemble and return the transaction set
return &TransactionsByPriceAndNonce{
txs: txs,
heads: heads,
signer: signer,
baseFee: baseFee,
}
}
// Peek returns the next transaction by price.
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
if len(t.heads) == 0 {
return nil
}
return t.heads[0].tx
}
// Shift replaces the current best head with the next one from the same account.
func (t *TransactionsByPriceAndNonce) Shift() {
acc, _ := Sender(t.signer, t.heads[0].tx)
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
if wrapped, err := NewTxWithMinerFee(txs[0], t.baseFee); err == nil {
t.heads[0], t.txs[acc] = wrapped, txs[1:]
heap.Fix(&t.heads, 0)
return
}
}
heap.Pop(&t.heads)
}
// Pop removes the best transaction, *not* replacing it with the next one from
// the same account. This should be used when a transaction cannot be executed
// and hence all subsequent ones should be discarded from the same account.
func (t *TransactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}
// copyAddressPtr copies an address. // copyAddressPtr copies an address.
func copyAddressPtr(a *common.Address) *common.Address { func copyAddressPtr(a *common.Address) *common.Address {
if a == nil { if a == nil {


@ -23,10 +23,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"math/rand"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -259,152 +257,6 @@ func TestRecipientNormal(t *testing.T) {
} }
} }
func TestTransactionPriceNonceSortLegacy(t *testing.T) {
testTransactionPriceNonceSort(t, nil)
}
func TestTransactionPriceNonceSort1559(t *testing.T) {
testTransactionPriceNonceSort(t, big.NewInt(0))
testTransactionPriceNonceSort(t, big.NewInt(5))
testTransactionPriceNonceSort(t, big.NewInt(50))
}
// Tests that transactions can be correctly sorted according to their price in
// decreasing order, but at the same time with increasing nonces when issued by
// the same account.
func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 25)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := LatestSignerForChainID(common.Big1)
// Generate a batch of transactions with overlapping values, but shifted nonces
groups := map[common.Address][]*Transaction{}
expectedCount := 0
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
count := 25
for i := 0; i < 25; i++ {
var tx *Transaction
gasFeeCap := rand.Intn(50)
if baseFee == nil {
tx = NewTx(&LegacyTx{
Nonce: uint64(start + i),
To: &common.Address{},
Value: big.NewInt(100),
Gas: 100,
GasPrice: big.NewInt(int64(gasFeeCap)),
Data: nil,
})
} else {
tx = NewTx(&DynamicFeeTx{
Nonce: uint64(start + i),
To: &common.Address{},
Value: big.NewInt(100),
Gas: 100,
GasFeeCap: big.NewInt(int64(gasFeeCap)),
GasTipCap: big.NewInt(int64(rand.Intn(gasFeeCap + 1))),
Data: nil,
})
if count == 25 && int64(gasFeeCap) < baseFee.Int64() {
count = i
}
}
tx, err := SignTx(tx, signer, key)
if err != nil {
t.Fatalf("failed to sign tx: %s", err)
}
groups[addr] = append(groups[addr], tx)
}
expectedCount += count
}
// Sort the transactions and cross check the nonce ordering
txset := NewTransactionsByPriceAndNonce(signer, groups, baseFee)
txs := Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx)
txset.Shift()
}
if len(txs) != expectedCount {
t.Errorf("expected %d transactions, found %d", expectedCount, len(txs))
}
for i, txi := range txs {
fromi, _ := Sender(signer, txi)
// Make sure the nonce order is valid
for j, txj := range txs[i+1:] {
fromj, _ := Sender(signer, txj)
if fromi == fromj && txi.Nonce() > txj.Nonce() {
t.Errorf("invalid nonce ordering: tx #%d (A=%x N=%v) < tx #%d (A=%x N=%v)", i, fromi[:4], txi.Nonce(), i+j, fromj[:4], txj.Nonce())
}
}
// If the next tx has different from account, the price must be lower than the current one
if i+1 < len(txs) {
next := txs[i+1]
fromNext, _ := Sender(signer, next)
tip, err := txi.EffectiveGasTip(baseFee)
nextTip, nextErr := next.EffectiveGasTip(baseFee)
if err != nil || nextErr != nil {
t.Errorf("error calculating effective tip")
}
if fromi != fromNext && tip.Cmp(nextTip) < 0 {
t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice())
}
}
}
}
// Tests that if multiple transactions have the same price, the ones seen earlier
// are prioritized to avoid network spam attacks aiming for a specific ordering.
func TestTransactionTimeSort(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := HomesteadSigner{}
// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address][]*Transaction{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
tx.time = time.Unix(0, int64(len(keys)-start))
groups[addr] = append(groups[addr], tx)
}
// Sort the transactions and cross check the nonce ordering
txset := NewTransactionsByPriceAndNonce(signer, groups, nil)
txs := Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx)
txset.Shift()
}
if len(txs) != len(keys) {
t.Errorf("expected %d transactions, found %d", len(keys), len(txs))
}
for i, txi := range txs {
fromi, _ := Sender(signer, txi)
if i+1 < len(txs) {
next := txs[i+1]
fromNext, _ := Sender(signer, next)
if txi.GasPrice().Cmp(next.GasPrice()) < 0 {
t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice())
}
// Make sure time order is ascending if the txs have the same gas price
if txi.GasPrice().Cmp(next.GasPrice()) == 0 && txi.time.After(next.time) {
t.Errorf("invalid received time ordering: tx #%d (A=%x T=%v) > tx #%d (A=%x T=%v)", i, fromi[:4], txi.time, i+1, fromNext[:4], next.time)
}
}
}
}
// TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON. // TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON.
func TestTransactionCoding(t *testing.T) { func TestTransactionCoding(t *testing.T) {
key, err := crypto.GenerateKey() key, err := crypto.GenerateKey()


@ -301,7 +301,11 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(false) pending := b.eth.txPool.Pending(false)
var txs types.Transactions var txs types.Transactions
for _, batch := range pending { for _, batch := range pending {
txs = append(txs, batch...) for _, lazy := range batch {
if tx := lazy.Resolve(); tx != nil {
txs = append(txs, tx.Tx)
}
}
} }
return txs, nil return txs, nil
} }


@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner" "github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
@ -206,12 +207,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
} }
eth.bloomIndexer.Start(eth.blockchain) eth.bloomIndexer.Start(eth.blockchain)
if config.BlobPool.Datadir != "" {
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
}
blobPool := blobpool.New(config.BlobPool, eth.blockchain)
if config.TxPool.Journal != "" { if config.TxPool.Journal != "" {
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
} }
legacyPool := legacypool.New(config.TxPool, eth.blockchain) legacyPool := legacypool.New(config.TxPool, eth.blockchain)
eth.txPool, err = txpool.New(new(big.Int).SetUint64(config.TxPool.PriceLimit), eth.blockchain, []txpool.SubPool{legacyPool}) eth.txPool, err = txpool.New(new(big.Int).SetUint64(config.TxPool.PriceLimit), eth.blockchain, []txpool.SubPool{legacyPool, blobPool})
if err != nil { if err != nil {
return nil, err return nil, err
} }


@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/clique" "github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/gasprice"
@ -69,6 +70,7 @@ var Defaults = Config{
FilterLogCacheSize: 32, FilterLogCacheSize: 32,
Miner: miner.DefaultConfig, Miner: miner.DefaultConfig,
TxPool: legacypool.DefaultConfig, TxPool: legacypool.DefaultConfig,
BlobPool: blobpool.DefaultConfig,
RPCGasCap: 50000000, RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second, RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO, GPO: FullNodeGPO,
@ -129,7 +131,8 @@ type Config struct {
Miner miner.Config Miner miner.Config
// Transaction pool options // Transaction pool options
TxPool legacypool.Config TxPool legacypool.Config
BlobPool blobpool.Config
// Gas Price Oracle options // Gas Price Oracle options
GPO gasprice.Config GPO gasprice.Config


@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/gasprice"
@ -43,6 +44,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
FilterLogCacheSize int FilterLogCacheSize int
Miner miner.Config Miner miner.Config
TxPool legacypool.Config TxPool legacypool.Config
BlobPool blobpool.Config
GPO gasprice.Config GPO gasprice.Config
EnablePreimageRecording bool EnablePreimageRecording bool
DocRoot string `toml:"-"` DocRoot string `toml:"-"`
@ -80,6 +82,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
enc.FilterLogCacheSize = c.FilterLogCacheSize enc.FilterLogCacheSize = c.FilterLogCacheSize
enc.Miner = c.Miner enc.Miner = c.Miner
enc.TxPool = c.TxPool enc.TxPool = c.TxPool
enc.BlobPool = c.BlobPool
enc.GPO = c.GPO enc.GPO = c.GPO
enc.EnablePreimageRecording = c.EnablePreimageRecording enc.EnablePreimageRecording = c.EnablePreimageRecording
enc.DocRoot = c.DocRoot enc.DocRoot = c.DocRoot
@ -121,6 +124,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
FilterLogCacheSize *int FilterLogCacheSize *int
Miner *miner.Config Miner *miner.Config
TxPool *legacypool.Config TxPool *legacypool.Config
BlobPool *blobpool.Config
GPO *gasprice.Config GPO *gasprice.Config
EnablePreimageRecording *bool EnablePreimageRecording *bool
DocRoot *string `toml:"-"` DocRoot *string `toml:"-"`
@ -215,6 +219,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.TxPool != nil { if dec.TxPool != nil {
c.TxPool = *dec.TxPool c.TxPool = *dec.TxPool
} }
if dec.BlobPool != nil {
c.BlobPool = *dec.BlobPool
}
if dec.GPO != nil { if dec.GPO != nil {
c.GPO = *dec.GPO c.GPO = *dec.GPO
} }


@ -73,7 +73,7 @@ type txPool interface {
// Pending should return pending transactions. // Pending should return pending transactions.
// The slice should be modifiable by the caller. // The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address][]*types.Transaction Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction
// SubscribeNewTxsEvent should return an event subscription of // SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel. // NewTxsEvent and send events to the given channel.


@ -101,7 +101,7 @@ func (p *testTxPool) Add(txs []*txpool.Transaction, local bool, sync bool) []err
} }
// Pending returns all the transactions known to the pool // Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction { func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
@ -113,7 +113,19 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*types.Trans
for _, batch := range batches { for _, batch := range batches {
sort.Sort(types.TxByNonce(batch)) sort.Sort(types.TxByNonce(batch))
} }
return batches pending := make(map[common.Address][]*txpool.LazyTransaction)
for addr, batch := range batches {
for _, tx := range batch {
pending[addr] = append(pending[addr], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
})
}
}
return pending
} }
// SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and // SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and


@ -23,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -36,27 +35,15 @@ const (
// syncTransactions starts sending all currently pending transactions to the given peer. // syncTransactions starts sending all currently pending transactions to the given peer.
func (h *handler) syncTransactions(p *eth.Peer) { func (h *handler) syncTransactions(p *eth.Peer) {
// Assemble the set of transaction to broadcast or announce to the remote var hashes []common.Hash
// peer. Fun fact, this is quite an expensive operation as it needs to sort for _, batch := range h.txpool.Pending(false) {
// the transactions if the sorting is not cached yet. However, with a random for _, tx := range batch {
// order, insertions could overflow the non-executable queues and get dropped. hashes = append(hashes, tx.Hash)
// }
// TODO(karalabe): Figure out if we could get away with random order somehow
var txs types.Transactions
pending := h.txpool.Pending(false)
for _, batch := range pending {
txs = append(txs, batch...)
} }
if len(txs) == 0 { if len(hashes) == 0 {
return return
} }
// The eth/65 protocol introduces proper transaction announcements, so instead
// of dripping transactions across multiple peers, just send the entire list as
// an announcement and let the remote side decide what they need (likely nothing).
hashes := make([]common.Hash, len(txs))
for i, tx := range txs {
hashes[i] = tx.Hash()
}
p.AsyncSendPooledTransactionHashes(hashes) p.AsyncSendPooledTransactionHashes(hashes)
} }

go.mod

@ -36,8 +36,9 @@ require (
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v1.3.0 github.com/graph-gophers/graphql-go v1.3.0
github.com/hashicorp/go-bexpr v0.1.10 github.com/hashicorp/go-bexpr v0.1.10
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7
github.com/holiman/bloomfilter/v2 v2.0.3 github.com/holiman/bloomfilter/v2 v2.0.3
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c github.com/holiman/uint256 v1.2.3
github.com/huin/goupnp v1.0.3 github.com/huin/goupnp v1.0.3
github.com/influxdata/influxdb-client-go/v2 v2.4.0 github.com/influxdata/influxdb-client-go/v2 v2.4.0
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
@ -59,7 +60,7 @@ require (
github.com/supranational/blst v0.3.11-0.20230406105308-e9dfc5ee724b github.com/supranational/blst v0.3.11-0.20230406105308-e9dfc5ee724b
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tyler-smith/go-bip39 v1.1.0 github.com/tyler-smith/go-bip39 v1.1.0
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa github.com/urfave/cli/v2 v2.24.1
go.uber.org/automaxprocs v1.5.2 go.uber.org/automaxprocs v1.5.2
golang.org/x/crypto v0.9.0 golang.org/x/crypto v0.9.0
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc

go.sum

@ -7,7 +7,7 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3/go.mod h1:KLF4gFr6DcKFZwSu
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 h1:Px2UA+2RvSSvv+RvJNuUB6n7rs5Wsel4dXLe90Um2n4= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 h1:Px2UA+2RvSSvv+RvJNuUB6n7rs5Wsel4dXLe90Um2n4=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0/go.mod h1:tPaiy8S5bQ+S5sOiDlINkp7+Ef339+Nz5L5XO+cnOHo= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0/go.mod h1:tPaiy8S5bQ+S5sOiDlINkp7+Ef339+Nz5L5XO+cnOHo=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0= github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno= github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno=
github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo= github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
@ -233,10 +233,12 @@ github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpx
github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0=
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 h1:3JQNjnMRil1yD0IfZKHF9GxxWKDJGj8I0IqOUol//sw=
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c h1:DZfsyhDK1hnSS5lH8l+JggqzEleHteTYfutAiVlSUM8= github.com/holiman/uint256 v1.2.3 h1:K8UWO1HUJpRMXBxbmaY1Y8IAMZC/RsKB+ArEnnK4l5o=
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c/go.mod h1:SC8Ryt4n+UBbPbIBKaG9zbbDlp4jOru9xFZmPzLUTxw= github.com/holiman/uint256 v1.2.3/go.mod h1:SC8Ryt4n+UBbPbIBKaG9zbbDlp4jOru9xFZmPzLUTxw=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ= github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ=
github.com/huin/goupnp v1.0.3/go.mod h1:ZxNlw5WqJj6wSsRK5+YfflQGXYfccj5VgQsMNixHM7Y= github.com/huin/goupnp v1.0.3/go.mod h1:ZxNlw5WqJj6wSsRK5+YfflQGXYfccj5VgQsMNixHM7Y=
@ -439,8 +441,8 @@ github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVM
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa h1:5SqCsI/2Qya2bCzK15ozrqo2sZxkh0FHynJZOTVoV6Q= github.com/urfave/cli/v2 v2.24.1 h1:/QYYr7g0EhwXEML8jO+8OYt5trPnLHS0p3mrgExJ5NU=
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= github.com/urfave/cli/v2 v2.24.1/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w= github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=

miner/ordering.go (new file)

@ -0,0 +1,147 @@
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package miner
import (
"container/heap"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
)
// txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap
type txWithMinerFee struct {
tx *txpool.LazyTransaction
from common.Address
fees *big.Int
}
// newTxWithMinerFee creates a wrapped transaction, calculating the effective
// miner gasTipCap if a base fee is provided.
// Returns error in case of a negative effective miner gasTipCap.
func newTxWithMinerFee(tx *txpool.LazyTransaction, from common.Address, baseFee *big.Int) (*txWithMinerFee, error) {
tip := new(big.Int).Set(tx.GasTipCap)
if baseFee != nil {
if tx.GasFeeCap.Cmp(baseFee) < 0 {
return nil, types.ErrGasFeeCapTooLow
}
tip = math.BigMin(tx.GasTipCap, new(big.Int).Sub(tx.GasFeeCap, baseFee))
}
return &txWithMinerFee{
tx: tx,
from: from,
fees: tip,
}, nil
}
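A minimal, self-contained sketch of the effective-tip rule applied by newTxWithMinerFee above, min(gasTipCap, gasFeeCap - baseFee); the helper name and the concrete numbers are illustrative only and not part of this change:

package main

import (
	"errors"
	"fmt"
	"math/big"
)

// effectiveTip mirrors the rule above: the miner earns min(gasTipCap, gasFeeCap-baseFee),
// and a fee cap below the base fee is rejected outright.
func effectiveTip(gasFeeCap, gasTipCap, baseFee *big.Int) (*big.Int, error) {
	if gasFeeCap.Cmp(baseFee) < 0 {
		return nil, errors.New("gas fee cap below base fee")
	}
	diff := new(big.Int).Sub(gasFeeCap, baseFee)
	if gasTipCap.Cmp(diff) < 0 {
		return new(big.Int).Set(gasTipCap), nil
	}
	return diff, nil
}

func main() {
	// gasFeeCap=100, gasTipCap=5, baseFee=97 => the miner only collects 3 per gas.
	tip, _ := effectiveTip(big.NewInt(100), big.NewInt(5), big.NewInt(97))
	fmt.Println(tip) // 3
}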
// txByPriceAndTime implements both the sort and the heap interface, making it
// useful both for sorting all at once and for adding and removing elements individually.
type txByPriceAndTime []*txWithMinerFee
func (s txByPriceAndTime) Len() int { return len(s) }
func (s txByPriceAndTime) Less(i, j int) bool {
// If the prices are equal, use the time the transaction was first seen for
// deterministic sorting
cmp := s[i].fees.Cmp(s[j].fees)
if cmp == 0 {
return s[i].tx.Time.Before(s[j].tx.Time)
}
return cmp > 0
}
func (s txByPriceAndTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s *txByPriceAndTime) Push(x interface{}) {
*s = append(*s, x.(*txWithMinerFee))
}
func (s *txByPriceAndTime) Pop() interface{} {
old := *s
n := len(old)
x := old[n-1]
old[n-1] = nil
*s = old[0 : n-1]
return x
}
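For readers less used to Go's container/heap: the methods above only supply the ordering and the backing storage; heap.Init, heap.Push, heap.Pop and heap.Fix do the actual sifting, and Pop nils the vacated slot, presumably so the wrapped transaction can be garbage collected. A minimal sketch of the same pattern on a plain max-heap of ints (illustrative only, not part of the commit):

package main

import (
	"container/heap"
	"fmt"
)

// intHeap mirrors the txByPriceAndTime pattern: sort.Interface plus Push/Pop.
type intHeap []int

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] > h[j] } // max-heap, like the price heap
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }

func (h *intHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	h := &intHeap{3, 1, 4}
	heap.Init(h)
	heap.Push(h, 5)
	fmt.Println(heap.Pop(h)) // 5: the largest element first, as with the best-paying transaction
}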
// transactionsByPriceAndNonce represents a set of transactions that can return
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type transactionsByPriceAndNonce struct {
txs map[common.Address][]*txpool.LazyTransaction // Per account nonce-sorted list of transactions
heads txByPriceAndTime // Next transaction for each unique account (price heap)
signer types.Signer // Signer for the set of transactions
baseFee *big.Int // Current base fee
}
// newTransactionsByPriceAndNonce creates a transaction set that can retrieve
// price sorted transactions in a nonce-honouring way.
//
// Note, the input map is reowned, so the caller should not interact with it
// any more after providing it to the constructor.
func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, baseFee *big.Int) *transactionsByPriceAndNonce {
// Initialize a price and received time based heap with the head transactions
heads := make(txByPriceAndTime, 0, len(txs))
for from, accTxs := range txs {
wrapped, err := newTxWithMinerFee(accTxs[0], from, baseFee)
if err != nil {
delete(txs, from)
continue
}
heads = append(heads, wrapped)
txs[from] = accTxs[1:]
}
heap.Init(&heads)
// Assemble and return the transaction set
return &transactionsByPriceAndNonce{
txs: txs,
heads: heads,
signer: signer,
baseFee: baseFee,
}
}
// Peek returns the next transaction by price.
func (t *transactionsByPriceAndNonce) Peek() *txpool.LazyTransaction {
if len(t.heads) == 0 {
return nil
}
return t.heads[0].tx
}
// Shift replaces the current best head with the next one from the same account.
func (t *transactionsByPriceAndNonce) Shift() {
acc := t.heads[0].from
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
if wrapped, err := newTxWithMinerFee(txs[0], acc, t.baseFee); err == nil {
t.heads[0], t.txs[acc] = wrapped, txs[1:]
heap.Fix(&t.heads, 0)
return
}
}
heap.Pop(&t.heads)
}
// Pop removes the best transaction, *not* replacing it with the next one from
// the same account. This should be used when a transaction cannot be executed
// and hence all subsequent ones should be discarded from the same account.
func (t *transactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}
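Putting the three methods together, the intended consumption pattern is a Peek/Shift/Pop loop, which is what the miner's commitTransactions does further down. A hedged sketch, assuming it sits in package miner next to the type above; drain and process are hypothetical names, not part of the commit:

// drain walks the set in price order while honouring per-account nonces.
// process stands in for block building and is assumed to report whether the
// sender's remaining transactions are still worth attempting.
func drain(txset *transactionsByPriceAndNonce, process func(*txpool.LazyTransaction) error) {
	for {
		ltx := txset.Peek() // best effective tip across all accounts
		if ltx == nil {
			return // every account is exhausted
		}
		if err := process(ltx); err != nil {
			// The sender cannot continue (e.g. an invalid transaction): drop all
			// of its remaining, higher-nonce transactions.
			txset.Pop()
			continue
		}
		// Success: move on to the sender's next nonce.
		txset.Shift()
	}
}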

miner/ordering_test.go (new file)
@ -0,0 +1,188 @@
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package miner
import (
"crypto/ecdsa"
"math/big"
"math/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
)
func TestTransactionPriceNonceSortLegacy(t *testing.T) {
testTransactionPriceNonceSort(t, nil)
}
func TestTransactionPriceNonceSort1559(t *testing.T) {
testTransactionPriceNonceSort(t, big.NewInt(0))
testTransactionPriceNonceSort(t, big.NewInt(5))
testTransactionPriceNonceSort(t, big.NewInt(50))
}
// Tests that transactions can be correctly sorted according to their price in
// decreasing order, but at the same time with increasing nonces when issued by
// the same account.
func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 25)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := types.LatestSignerForChainID(common.Big1)
// Generate a batch of transactions with overlapping values, but shifted nonces
groups := map[common.Address][]*txpool.LazyTransaction{}
expectedCount := 0
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
count := 25
for i := 0; i < 25; i++ {
var tx *types.Transaction
gasFeeCap := rand.Intn(50)
if baseFee == nil {
tx = types.NewTx(&types.LegacyTx{
Nonce: uint64(start + i),
To: &common.Address{},
Value: big.NewInt(100),
Gas: 100,
GasPrice: big.NewInt(int64(gasFeeCap)),
Data: nil,
})
} else {
tx = types.NewTx(&types.DynamicFeeTx{
Nonce: uint64(start + i),
To: &common.Address{},
Value: big.NewInt(100),
Gas: 100,
GasFeeCap: big.NewInt(int64(gasFeeCap)),
GasTipCap: big.NewInt(int64(rand.Intn(gasFeeCap + 1))),
Data: nil,
})
if count == 25 && int64(gasFeeCap) < baseFee.Int64() {
count = i
}
}
tx, err := types.SignTx(tx, signer, key)
if err != nil {
t.Fatalf("failed to sign tx: %s", err)
}
groups[addr] = append(groups[addr], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
})
}
expectedCount += count
}
// Sort the transactions and cross check the nonce ordering
txset := newTransactionsByPriceAndNonce(signer, groups, baseFee)
txs := types.Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx.Tx.Tx)
txset.Shift()
}
if len(txs) != expectedCount {
t.Errorf("expected %d transactions, found %d", expectedCount, len(txs))
}
for i, txi := range txs {
fromi, _ := types.Sender(signer, txi)
// Make sure the nonce order is valid
for j, txj := range txs[i+1:] {
fromj, _ := types.Sender(signer, txj)
if fromi == fromj && txi.Nonce() > txj.Nonce() {
t.Errorf("invalid nonce ordering: tx #%d (A=%x N=%v) < tx #%d (A=%x N=%v)", i, fromi[:4], txi.Nonce(), i+j, fromj[:4], txj.Nonce())
}
}
// If the next tx is from a different account, its price must not be higher than the current one
if i+1 < len(txs) {
next := txs[i+1]
fromNext, _ := types.Sender(signer, next)
tip, err := txi.EffectiveGasTip(baseFee)
nextTip, nextErr := next.EffectiveGasTip(baseFee)
if err != nil || nextErr != nil {
t.Errorf("error calculating effective tip: %v, %v", err, nextErr)
}
if fromi != fromNext && tip.Cmp(nextTip) < 0 {
t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice())
}
}
}
}
// Tests that if multiple transactions have the same price, the ones seen earlier
// are prioritized to avoid network spam attacks aiming for a specific ordering.
func TestTransactionTimeSort(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := types.HomesteadSigner{}
// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address][]*txpool.LazyTransaction{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
tx.SetTime(time.Unix(0, int64(len(keys)-start)))
groups[addr] = append(groups[addr], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
})
}
// Sort the transactions and cross check the nonce ordering
txset := newTransactionsByPriceAndNonce(signer, groups, nil)
txs := types.Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx.Tx.Tx)
txset.Shift()
}
if len(txs) != len(keys) {
t.Errorf("expected %d transactions, found %d", len(keys), len(txs))
}
for i, txi := range txs {
fromi, _ := types.Sender(signer, txi)
if i+1 < len(txs) {
next := txs[i+1]
fromNext, _ := types.Sender(signer, next)
if txi.GasPrice().Cmp(next.GasPrice()) < 0 {
t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice())
}
// Make sure time order is ascending if the txs have the same gas price
if txi.GasPrice().Cmp(next.GasPrice()) == 0 && txi.Time().After(next.Time()) {
t.Errorf("invalid received time ordering: tx #%d (A=%x T=%v) > tx #%d (A=%x T=%v)", i, fromi[:4], txi.Time(), i+1, fromNext[:4], next.Time())
}
}
}
}

miner/worker.go
@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -533,12 +534,18 @@ func (w *worker) mainLoop() {
if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
continue
}
- txs := make(map[common.Address][]*types.Transaction, len(ev.Txs))
+ txs := make(map[common.Address][]*txpool.LazyTransaction, len(ev.Txs))
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
- txs[acc] = append(txs[acc], tx)
+ txs[acc] = append(txs[acc], &txpool.LazyTransaction{
+ Hash: tx.Hash(),
+ Tx: &txpool.Transaction{Tx: tx},
+ Time: tx.Time(),
+ GasFeeCap: tx.GasFeeCap(),
+ GasTipCap: tx.GasTipCap(),
+ })
}
- txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
+ txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil)
@ -727,24 +734,24 @@ func (w *worker) updateSnapshot(env *environment) {
w.snapshotState = env.state.Copy()
}
- func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) {
+ func (w *worker) commitTransaction(env *environment, tx *txpool.Transaction) ([]*types.Log, error) {
var (
snap = env.state.Snapshot()
gp = env.gasPool.Gas()
)
- receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig())
+ receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx.Tx, &env.header.GasUsed, *w.chain.GetVMConfig())
if err != nil {
env.state.RevertToSnapshot(snap)
env.gasPool.SetGas(gp)
return nil, err
}
- env.txs = append(env.txs, tx)
+ env.txs = append(env.txs, tx.Tx)
env.receipts = append(env.receipts, receipt)
return receipt.Logs, nil
}
- func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *atomic.Int32) error {
+ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
@ -764,30 +771,37 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
break
}
// Retrieve the next transaction and abort if all done.
- tx := txs.Peek()
+ ltx := txs.Peek()
- if tx == nil {
+ if ltx == nil {
break
}
+ tx := ltx.Resolve()
+ if tx == nil {
+ log.Warn("Ignoring evicted transaction")
+ txs.Pop()
+ continue
+ }
// Error may be ignored here. The error has already been checked
// during transaction acceptance in the transaction pool.
- from, _ := types.Sender(env.signer, tx)
+ from, _ := types.Sender(env.signer, tx.Tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
- if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
+ if tx.Tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
- log.Trace("Ignoring replay protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)
+ log.Trace("Ignoring replay protected transaction", "hash", tx.Tx.Hash(), "eip155", w.chainConfig.EIP155Block)
txs.Pop()
continue
}
// Start executing the transaction
- env.state.SetTxContext(tx.Hash(), env.tcount)
+ env.state.SetTxContext(tx.Tx.Hash(), env.tcount)
logs, err := w.commitTransaction(env, tx)
switch {
case errors.Is(err, core.ErrNonceTooLow):
// New head notification data race between the transaction pool and miner, shift
- log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
+ log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Tx.Nonce())
txs.Shift()
case errors.Is(err, nil):
@ -799,7 +813,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
default:
// Transaction is regarded as invalid, drop all consecutive transactions from
// the same sender because of `nonce-too-high` clause.
- log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
+ log.Debug("Transaction failed, account skipped", "hash", tx.Tx.Hash(), "err", err)
txs.Pop()
}
}
@ -905,7 +919,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(true)
- localTxs, remoteTxs := make(map[common.Address][]*types.Transaction), pending
+ localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending
for _, account := range w.eth.TxPool().Locals() {
if txs := remoteTxs[account]; len(txs) > 0 {
delete(remoteTxs, account)
@ -913,13 +927,13 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
}
}
if len(localTxs) > 0 {
- txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
+ txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt); err != nil {
return err
}
}
if len(remoteTxs) > 0 {
- txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
+ txs := newTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt); err != nil {
return err
}
}
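The Peek-then-Resolve step introduced above exists because a LazyTransaction is only a handle: cheap metadata lives in the price heap, while for blob transactions the full payload stays in the new blob pool (on disk), and the transaction may have been dropped by the time the miner reaches it. Condensed, the pattern the hunk implements looks roughly like this (an editorial paraphrase of the code above, not additional changes):

for {
	ltx := txs.Peek() // cheap metadata only: hash, fee caps, time
	if ltx == nil {
		break // nothing left in any account
	}
	tx := ltx.Resolve() // fetch the full transaction; nil if the pool dropped it meanwhile
	if tx == nil {
		txs.Pop() // skip this sender's remaining transactions and move on
		continue
	}
	// ... execute tx.Tx against the pending state, then Shift() or Pop() as before
}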