eth/fetcher: throttle tx fetches to 128KB responses (#28304)

* eth/fetcher: throttle tx fetches to 128KB responses

* eth/fetcher: unindent a clause per review request
This commit is contained in:
Péter Szilágyi 2023-10-11 10:50:09 +03:00 committed by GitHub
parent 7776a3214a
commit a6deb2d994
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 116 additions and 36 deletions

View File

@ -39,16 +39,22 @@ const (
// can announce in a short time. // can announce in a short time.
maxTxAnnounces = 4096 maxTxAnnounces = 4096
// maxTxRetrievals is the maximum transaction number can be fetched in one // maxTxRetrievals is the maximum number of transactions that can be fetched
// request. The rationale to pick 256 is: // in one request. The rationale for picking 256 is to have a reasonabe lower
// - In eth protocol, the softResponseLimit is 2MB. Nowadays according to // bound for the transferred data (don't waste RTTs, transfer more meaningful
// Etherscan the average transaction size is around 200B, so in theory // batch sizes), but also have an upper bound on the sequentiality to allow
// we can include lots of transaction in a single protocol packet. // using our entire peerset for deliveries.
// - However the maximum size of a single transaction is raised to 128KB, //
// so pick a middle value here to ensure we can maximize the efficiency // This number also acts as a failsafe against malicious announces which might
// of the retrieval and response size overflow won't happen in most cases. // cause us to request more data than we'd expect.
maxTxRetrievals = 256 maxTxRetrievals = 256
// maxTxRetrievalSize is the max number of bytes that delivered transactions
// should weigh according to the announcements. The 128KB was chosen to limit
// retrieving a maximum of one blob transaction at a time to minimize hogging
// a connection between two peers.
maxTxRetrievalSize = 128 * 1024
// maxTxUnderpricedSetSize is the size of the underpriced transaction set that // maxTxUnderpricedSetSize is the size of the underpriced transaction set that
// is used to track recent transactions that have been dropped so we don't // is used to track recent transactions that have been dropped so we don't
// re-request them. // re-request them.
@ -859,9 +865,15 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
if len(f.announces[peer]) == 0 { if len(f.announces[peer]) == 0 {
return // continue in the for-each return // continue in the for-each
} }
hashes := make([]common.Hash, 0, maxTxRetrievals) var (
f.forEachHash(f.announces[peer], func(hash common.Hash) bool { hashes = make([]common.Hash, 0, maxTxRetrievals)
if _, ok := f.fetching[hash]; !ok { bytes uint64
)
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
// If the transaction is alcear fetching, skip to the next one
if _, ok := f.fetching[hash]; ok {
return true
}
// Mark the hash as fetching and stash away possible alternates // Mark the hash as fetching and stash away possible alternates
f.fetching[hash] = peer f.fetching[hash] = peer
@ -876,8 +888,13 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
if len(hashes) >= maxTxRetrievals { if len(hashes) >= maxTxRetrievals {
return false // break in the for-each return false // break in the for-each
} }
if meta != nil { // Only set eth/68 and upwards
bytes += uint64(meta.size)
if bytes >= maxTxRetrievalSize {
return false
} }
return true // continue in the for-each }
return true // scheduled, try to add more
}) })
// If any hashes were allocated, request them from the peer // If any hashes were allocated, request them from the peer
if len(hashes) > 0 { if len(hashes) > 0 {
@ -922,27 +939,28 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
} }
} }
// forEachHash does a range loop over a map of hashes in production, but during // forEachAnnounce does a range loop over a map of announcements in production,
// testing it does a deterministic sorted random to allow reproducing issues. // but during testing it does a deterministic sorted random to allow reproducing
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) { // issues.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
// If we're running production, use whatever Go's map gives us // If we're running production, use whatever Go's map gives us
if f.rand == nil { if f.rand == nil {
for hash := range hashes { for hash, meta := range announces {
if !do(hash) { if !do(hash, meta) {
return return
} }
} }
return return
} }
// We're running the test suite, make iteration deterministic // We're running the test suite, make iteration deterministic
list := make([]common.Hash, 0, len(hashes)) list := make([]common.Hash, 0, len(announces))
for hash := range hashes { for hash := range announces {
list = append(list, hash) list = append(list, hash)
} }
sortHashes(list) sortHashes(list)
rotateHashes(list, f.rand.Intn(len(list))) rotateHashes(list, f.rand.Intn(len(list)))
for _, hash := range list { for _, hash := range list {
if !do(hash) { if !do(hash, announces[hash]) {
return return
} }
} }

View File

@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
) )
var ( var (
@ -993,15 +994,14 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
}) })
} }
// Tests that if thousands of transactions are announces, only a small // Tests that if thousands of transactions are announced, only a small
// number of them will be requested at a time. // number of them will be requested at a time.
func TestTransactionFetcherRateLimiting(t *testing.T) { func TestTransactionFetcherRateLimiting(t *testing.T) {
// Create a slew of transactions and to announce them // Create a slew of transactions and announce them
var hashes []common.Hash var hashes []common.Hash
for i := 0; i < maxTxAnnounces; i++ { for i := 0; i < maxTxAnnounces; i++ {
hashes = append(hashes, common.Hash{byte(i / 256), byte(i % 256)}) hashes = append(hashes, common.Hash{byte(i / 256), byte(i % 256)})
} }
testTransactionFetcherParallel(t, txFetcherTest{ testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher { init: func() *TxFetcher {
return NewTxFetcher( return NewTxFetcher(
@ -1029,6 +1029,68 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
}) })
} }
// Tests that if huge transactions are announced, only a small number of them will
// be requested at a time, to keep the responses below a resonable level.
func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Announce mid size transactions from A to verify that multiple
// ones can be piled into a single request.
doTxNotify{peer: "A",
hashes: []common.Hash{{0x01}, {0x02}, {0x03}, {0x04}},
types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType, types.LegacyTxType},
sizes: []uint32{48 * 1024, 48 * 1024, 48 * 1024, 48 * 1024},
},
// Announce exactly on the limit transactions to see that only one
// gets requested
doTxNotify{peer: "B",
hashes: []common.Hash{{0x05}, {0x06}},
types: []byte{types.LegacyTxType, types.LegacyTxType},
sizes: []uint32{maxTxRetrievalSize, maxTxRetrievalSize},
},
// Announce oversized blob transactions to see that overflows are ok
doTxNotify{peer: "C",
hashes: []common.Hash{{0x07}, {0x08}},
types: []byte{types.BlobTxType, types.BlobTxType},
sizes: []uint32{params.MaxBlobGasPerBlock, params.MaxBlobGasPerBlock},
},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduledWithMeta{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
},
"B": {
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
},
"C": {
{common.Hash{0x07}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
{common.Hash{0x08}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
},
},
fetching: map[string][]common.Hash{
"A": {{0x02}, {0x03}, {0x04}},
"B": {{0x06}},
"C": {{0x08}},
},
},
},
})
}
// Tests that then number of transactions a peer is allowed to announce and/or // Tests that then number of transactions a peer is allowed to announce and/or
// request at the same time is hard capped. // request at the same time is hard capped.
func TestTransactionFetcherDoSProtection(t *testing.T) { func TestTransactionFetcherDoSProtection(t *testing.T) {
@ -1664,7 +1726,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
if (meta == nil && (ann.kind != nil || ann.size != nil)) || if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
(meta != nil && (ann.kind == nil || ann.size == nil)) || (meta != nil && (ann.kind == nil || ann.size == nil)) ||
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) { (meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size) t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
} }
} }
} }
@ -1733,7 +1795,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
if (meta == nil && (ann.kind != nil || ann.size != nil)) || if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
(meta != nil && (ann.kind == nil || ann.size == nil)) || (meta != nil && (ann.kind == nil || ann.size == nil)) ||
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) { (meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size) t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
} }
} }
} }