eth/handler, broadcast: optimize tx broadcast mechanism (#22176)
This PR optimizes the broadcast loop. Instead of iterating twice through a given set of transactions to weed out which peers have and which do not have a tx, to send/announce transactions, we do it only once.
This commit is contained in:
parent
1489c3f494
commit
e01096f531
@ -456,44 +456,51 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// BroadcastTransactions will propagate a batch of transactions to all peers which are not known to
|
||||
// BroadcastTransactions will propagate a batch of transactions
|
||||
// - To a square root of all peers
|
||||
// - And, separately, as announcements to all peers which are not known to
|
||||
// already have the given transaction.
|
||||
func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool) {
|
||||
func (h *handler) BroadcastTransactions(txs types.Transactions) {
|
||||
var (
|
||||
txset = make(map[*ethPeer][]common.Hash)
|
||||
annos = make(map[*ethPeer][]common.Hash)
|
||||
annoCount int // Count of announcements made
|
||||
annoPeers int
|
||||
directCount int // Count of the txs sent directly to peers
|
||||
directPeers int // Count of the peers that were sent transactions directly
|
||||
|
||||
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
|
||||
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
|
||||
|
||||
)
|
||||
// Broadcast transactions to a batch of peers not knowing about it
|
||||
if propagate {
|
||||
for _, tx := range txs {
|
||||
peers := h.peers.peersWithoutTransaction(tx.Hash())
|
||||
|
||||
// Send the block to a subset of our peers
|
||||
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
|
||||
for _, peer := range transfer {
|
||||
// Send the tx unconditionally to a subset of our peers
|
||||
numDirect := int(math.Sqrt(float64(len(peers))))
|
||||
for _, peer := range peers[:numDirect] {
|
||||
txset[peer] = append(txset[peer], tx.Hash())
|
||||
}
|
||||
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(transfer))
|
||||
}
|
||||
for peer, hashes := range txset {
|
||||
peer.AsyncSendTransactions(hashes)
|
||||
}
|
||||
return
|
||||
}
|
||||
// Otherwise only broadcast the announcement to peers
|
||||
for _, tx := range txs {
|
||||
peers := h.peers.peersWithoutTransaction(tx.Hash())
|
||||
for _, peer := range peers {
|
||||
// For the remaining peers, send announcement only
|
||||
for _, peer := range peers[numDirect:] {
|
||||
annos[peer] = append(annos[peer], tx.Hash())
|
||||
}
|
||||
}
|
||||
for peer, hashes := range txset {
|
||||
directPeers++
|
||||
directCount += len(hashes)
|
||||
peer.AsyncSendTransactions(hashes)
|
||||
}
|
||||
for peer, hashes := range annos {
|
||||
annoPeers++
|
||||
annoCount += len(hashes)
|
||||
if peer.Version() >= eth.ETH65 {
|
||||
peer.AsyncSendPooledTransactionHashes(hashes)
|
||||
} else {
|
||||
peer.AsyncSendTransactions(hashes)
|
||||
}
|
||||
}
|
||||
log.Debug("Transaction broadcast", "txs", len(txs),
|
||||
"announce packs", annoPeers, "announced hashes", annoCount,
|
||||
"tx packs", directPeers, "broadcast txs", directCount)
|
||||
}
|
||||
|
||||
// minedBroadcastLoop sends mined blocks to connected peers.
|
||||
@ -511,13 +518,10 @@ func (h *handler) minedBroadcastLoop() {
|
||||
// txBroadcastLoop announces new transactions to connected peers.
|
||||
func (h *handler) txBroadcastLoop() {
|
||||
defer h.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-h.txsCh:
|
||||
h.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers
|
||||
h.BroadcastTransactions(event.Txs, false) // Only then announce to the rest
|
||||
|
||||
h.BroadcastTransactions(event.Txs)
|
||||
case <-h.txsSub.Err():
|
||||
return
|
||||
}
|
||||
|
@ -142,18 +142,18 @@ func (p *Peer) announceTransactions() {
|
||||
if done == nil && len(queue) > 0 {
|
||||
// Pile transaction hashes until we reach our allowed network limit
|
||||
var (
|
||||
hashes []common.Hash
|
||||
count int
|
||||
pending []common.Hash
|
||||
size common.StorageSize
|
||||
)
|
||||
for i := 0; i < len(queue) && size < maxTxPacketSize; i++ {
|
||||
if p.txpool.Get(queue[i]) != nil {
|
||||
pending = append(pending, queue[i])
|
||||
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
|
||||
if p.txpool.Get(queue[count]) != nil {
|
||||
pending = append(pending, queue[count])
|
||||
size += common.HashLength
|
||||
}
|
||||
hashes = append(hashes, queue[i])
|
||||
}
|
||||
queue = queue[:copy(queue, queue[len(hashes):])]
|
||||
// Shift and trim queue
|
||||
queue = queue[:copy(queue, queue[count:])]
|
||||
|
||||
// If there's anything available to transfer, fire up an async writer
|
||||
if len(pending) > 0 {
|
||||
|
Loading…
Reference in New Issue
Block a user