eth/fetcher: allow underpriced transactions in after timeout (#28097)

This PR will allow a previously underpriced transaction back in after a timeout
of 5 minutes. This will block most transaction spam but allow for transactions to
be re-broadcasted on networks with less transaction flow.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Marius van der Wijden 2023-09-26 13:12:44 +02:00 committed by GitHub
parent 40219109b0
commit 2b7bc2c36b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 18 deletions

View File

@ -24,8 +24,8 @@ import (
"sort" "sort"
"time" "time"
mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -53,6 +53,9 @@ const (
// re-request them. // re-request them.
maxTxUnderpricedSetSize = 32768 maxTxUnderpricedSetSize = 32768
// maxTxUnderpricedTimeout is the max time a transaction should be stuck in the underpriced set.
maxTxUnderpricedTimeout = int64(5 * time.Minute)
// txArriveTimeout is the time allowance before an announced transaction is // txArriveTimeout is the time allowance before an announced transaction is
// explicitly requested. // explicitly requested.
txArriveTimeout = 500 * time.Millisecond txArriveTimeout = 500 * time.Millisecond
@ -148,7 +151,7 @@ type TxFetcher struct {
drop chan *txDrop drop chan *txDrop
quit chan struct{} quit chan struct{}
underpriced mapset.Set[common.Hash] // Transactions discarded as too cheap (don't re-fetch) underpriced *lru.Cache[common.Hash, int64] // Transactions discarded as too cheap (don't re-fetch)
// Stage 1: Waiting lists for newly discovered transactions that might be // Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips. // broadcast without needing explicit request/reply round trips.
@ -202,7 +205,7 @@ func NewTxFetcherForTests(
fetching: make(map[common.Hash]string), fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest), requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}), alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet[common.Hash](), underpriced: lru.NewCache[common.Hash, int64](maxTxUnderpricedSetSize),
hasTx: hasTx, hasTx: hasTx,
addTxs: addTxs, addTxs: addTxs,
fetchTxs: fetchTxs, fetchTxs: fetchTxs,
@ -224,16 +227,15 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// loop, so anything caught here is time saved internally. // loop, so anything caught here is time saved internally.
var ( var (
unknowns = make([]common.Hash, 0, len(hashes)) unknowns = make([]common.Hash, 0, len(hashes))
duplicate, underpriced int64 duplicate int64
underpriced int64
) )
for _, hash := range hashes { for _, hash := range hashes {
switch { switch {
case f.hasTx(hash): case f.hasTx(hash):
duplicate++ duplicate++
case f.isKnownUnderpriced(hash):
case f.underpriced.Contains(hash):
underpriced++ underpriced++
default: default:
unknowns = append(unknowns, hash) unknowns = append(unknowns, hash)
} }
@ -245,10 +247,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
if len(unknowns) == 0 { if len(unknowns) == 0 {
return nil return nil
} }
announce := &txAnnounce{ announce := &txAnnounce{origin: peer, hashes: unknowns}
origin: peer,
hashes: unknowns,
}
select { select {
case f.notify <- announce: case f.notify <- announce:
return nil return nil
@ -257,6 +256,16 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
} }
} }
// isKnownUnderpriced reports whether a transaction hash was recently found to be underpriced.
func (f *TxFetcher) isKnownUnderpriced(hash common.Hash) bool {
prevTime, ok := f.underpriced.Peek(hash)
if ok && prevTime+maxTxUnderpricedTimeout < time.Now().Unix() {
f.underpriced.Remove(hash)
return false
}
return ok
}
// Enqueue imports a batch of received transaction into the transaction pool // Enqueue imports a batch of received transaction into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and // and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can // direct request replies. The differentiation is important so the fetcher can
@ -300,10 +309,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
// Avoid re-request this transaction when we receive another // Avoid re-request this transaction when we receive another
// announcement. // announcement.
if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) { if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) {
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { f.underpriced.Add(batch[j].Hash(), batch[j].Time().Unix())
f.underpriced.Pop()
}
f.underpriced.Add(batch[j].Hash())
} }
// Track a few interesting failure types // Track a few interesting failure types
switch { switch {

View File

@ -1509,8 +1509,8 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
} }
case isUnderpriced: case isUnderpriced:
if fetcher.underpriced.Cardinality() != int(step) { if fetcher.underpriced.Len() != int(step) {
t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Cardinality(), step) t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Len(), step)
} }
default: default: