eth/fetcher: don't spend too much time on transaction inclusion (#25524)

* eth/fetcher: introduce some lag in tx fetching

* eth/fetcher: change conditions a bit

* eth/fetcher: use per-batch quota check

* eth/fetcher: fix some comments

* eth/fetcher: address review concerns

* eth/fetcher: fix panic + add warn log

* eth/fetcher: fix log

* eth/fetcher: fix log

* cmd/devp2p/internal/ethtest: fix ignorign tx announcements from prev. tests

* cmd/devp2p/internal/ethtest: fix TestLargeTxRequest

This increases the number of tx relay messages the test waits for. Since
go-ethereum now processes incoming txs in smaller batches, the
announcement messages it sends are also smaller.

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Martin Holst Swende 2022-08-19 15:59:36 +02:00 committed by GitHub
parent ac7ad811b4
commit 0ce494b60c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 43 deletions

View File

@ -357,9 +357,13 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error {
return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash) return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash)
} }
return nil return nil
// ignore tx announcements from previous tests
case *NewPooledTransactionHashes: case *NewPooledTransactionHashes:
// ignore tx announcements from previous tests
continue continue
case *Transactions:
continue
default: default:
return fmt.Errorf("unexpected: %s", pretty.Sdump(msg)) return fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
} }

View File

@ -544,9 +544,13 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) {
t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsPacket)) t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsPacket))
} }
return return
// ignore propagated txs from previous tests // ignore propagated txs from previous tests
case *NewPooledTransactionHashes: case *NewPooledTransactionHashes:
continue continue
case *Transactions:
continue
// ignore block announcements from previous tests // ignore block announcements from previous tests
case *NewBlockHashes: case *NewBlockHashes:
continue continue

View File

@ -29,7 +29,7 @@ import (
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
) )
//var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7") // var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
func (s *Suite) sendSuccessfulTxs(t *utesting.T) error { func (s *Suite) sendSuccessfulTxs(t *utesting.T) error {
@ -192,10 +192,10 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction
nonce = txs[len(txs)-1].Nonce() nonce = txs[len(txs)-1].Nonce()
// Wait for the transaction announcement(s) and make sure all sent txs are being propagated. // Wait for the transaction announcement(s) and make sure all sent txs are being propagated.
// all txs should be announced within 3 announcements. // all txs should be announced within a couple announcements.
recvHashes := make([]common.Hash, 0) recvHashes := make([]common.Hash, 0)
for i := 0; i < 3; i++ { for i := 0; i < 20; i++ {
switch msg := recvConn.readAndServe(s.chain, timeout).(type) { switch msg := recvConn.readAndServe(s.chain, timeout).(type) {
case *Transactions: case *Transactions:
for _, tx := range *msg { for _, tx := range *msg {

View File

@ -262,57 +262,79 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// direct request replies. The differentiation is important so the fetcher can // direct request replies. The differentiation is important so the fetcher can
// re-schedule missing transactions as soon as possible. // re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error { func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
// Keep track of all the propagated transactions var (
if direct { inMeter = txReplyInMeter
txReplyInMeter.Mark(int64(len(txs))) knownMeter = txReplyKnownMeter
} else { underpricedMeter = txReplyUnderpricedMeter
txBroadcastInMeter.Mark(int64(len(txs))) otherRejectMeter = txReplyOtherRejectMeter
)
if !direct {
inMeter = txBroadcastInMeter
knownMeter = txBroadcastKnownMeter
underpricedMeter = txBroadcastUnderpricedMeter
otherRejectMeter = txBroadcastOtherRejectMeter
} }
// Keep track of all the propagated transactions
inMeter.Mark(int64(len(txs)))
// Push all the transactions into the pool, tracking underpriced ones to avoid // Push all the transactions into the pool, tracking underpriced ones to avoid
// re-requesting them and dropping the peer in case of malicious transfers. // re-requesting them and dropping the peer in case of malicious transfers.
var ( var (
added = make([]common.Hash, 0, len(txs)) added = make([]common.Hash, 0, len(txs))
duplicate int64 delay time.Duration
underpriced int64
otherreject int64
) )
errs := f.addTxs(txs) // proceed in batches
for i, err := range errs { for i := 0; i < len(txs); i += 128 {
// Track the transaction hash if the price is too low for us. end := i + 128
// Avoid re-request this transaction when we receive another if end > len(txs) {
// announcement. end = len(txs)
if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) { }
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { var (
f.underpriced.Pop() duplicate int64
underpriced int64
otherreject int64
)
batch := txs[i:end]
for j, err := range f.addTxs(batch) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.
if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) {
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
f.underpriced.Pop()
}
f.underpriced.Add(batch[j].Hash())
} }
f.underpriced.Add(txs[i].Hash()) // Track a few interesting failure types
switch {
case err == nil: // Noop, but need to handle to not count these
case errors.Is(err, core.ErrAlreadyKnown):
duplicate++
case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced):
underpriced++
default:
otherreject++
}
added = append(added, batch[j].Hash())
} }
// Track a few interesting failure types knownMeter.Mark(duplicate)
switch { underpricedMeter.Mark(underpriced)
case err == nil: // Noop, but need to handle to not count these otherRejectMeter.Mark(otherreject)
case errors.Is(err, core.ErrAlreadyKnown): // If 'other reject' is >25% of the deliveries in any batch, abort. Either we are
duplicate++ // out of sync with the chain or the peer is griefing us.
if otherreject > 128/4 {
case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced): delay = 200 * time.Millisecond
underpriced++ log.Warn("Peer delivering useless transactions", "peer", peer, "ignored", len(txs)-end)
break
default:
otherreject++
} }
added = append(added, txs[i].Hash())
}
if direct {
txReplyKnownMeter.Mark(duplicate)
txReplyUnderpricedMeter.Mark(underpriced)
txReplyOtherRejectMeter.Mark(otherreject)
} else {
txBroadcastKnownMeter.Mark(duplicate)
txBroadcastUnderpricedMeter.Mark(underpriced)
txBroadcastOtherRejectMeter.Mark(otherreject)
} }
select { select {
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}: case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
time.Sleep(delay)
return nil return nil
case <-f.quit: case <-f.quit:
return errTerminated return errTerminated