Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
4 changed files with 69 additions and 43 deletions
Showing only changes of commit 7f2890a9be - Show all commits

View File

@ -357,9 +357,13 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error {
return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash)
}
return nil
case *NewPooledTransactionHashes:
// ignore tx announcements from previous tests
case *NewPooledTransactionHashes:
continue
case *Transactions:
continue
default:
return fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
}

View File

@ -544,9 +544,13 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) {
t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsPacket))
}
return
// ignore propagated txs from previous tests
case *NewPooledTransactionHashes:
continue
case *Transactions:
continue
// ignore block announcements from previous tests
case *NewBlockHashes:
continue

View File

@ -192,10 +192,10 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction
nonce = txs[len(txs)-1].Nonce()
// Wait for the transaction announcement(s) and make sure all sent txs are being propagated.
// all txs should be announced within 3 announcements.
// all txs should be announced within a couple announcements.
recvHashes := make([]common.Hash, 0)
for i := 0; i < 3; i++ {
for i := 0; i < 20; i++ {
switch msg := recvConn.readAndServe(s.chain, timeout).(type) {
case *Transactions:
for _, tx := range *msg {

View File

@ -262,22 +262,39 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// direct request replies. The differentiation is important so the fetcher can
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
// Keep track of all the propagated transactions
if direct {
txReplyInMeter.Mark(int64(len(txs)))
} else {
txBroadcastInMeter.Mark(int64(len(txs)))
var (
inMeter = txReplyInMeter
knownMeter = txReplyKnownMeter
underpricedMeter = txReplyUnderpricedMeter
otherRejectMeter = txReplyOtherRejectMeter
)
if !direct {
inMeter = txBroadcastInMeter
knownMeter = txBroadcastKnownMeter
underpricedMeter = txBroadcastUnderpricedMeter
otherRejectMeter = txBroadcastOtherRejectMeter
}
// Keep track of all the propagated transactions
inMeter.Mark(int64(len(txs)))
// Push all the transactions into the pool, tracking underpriced ones to avoid
// re-requesting them and dropping the peer in case of malicious transfers.
var (
added = make([]common.Hash, 0, len(txs))
)
// proceed in batches
for i := 0; i < len(txs); i += 128 {
end := i + 128
if end > len(txs) {
end = len(txs)
}
var (
duplicate int64
underpriced int64
otherreject int64
)
errs := f.addTxs(txs)
for i, err := range errs {
batch := txs[i:end]
for j, err := range f.addTxs(batch) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.
@ -285,7 +302,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
f.underpriced.Pop()
}
f.underpriced.Add(txs[i].Hash())
f.underpriced.Add(batch[j].Hash())
}
// Track a few interesting failure types
switch {
@ -300,16 +317,17 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
default:
otherreject++
}
added = append(added, txs[i].Hash())
added = append(added, batch[j].Hash())
}
knownMeter.Mark(duplicate)
underpricedMeter.Mark(underpriced)
otherRejectMeter.Mark(otherreject)
// If 'other reject' is >25% of the deliveries in any batch, sleep a bit.
if otherreject > 128/4 {
time.Sleep(200 * time.Millisecond)
log.Warn("Peer delivering stale transactions", "peer", peer, "rejected", otherreject)
}
if direct {
txReplyKnownMeter.Mark(duplicate)
txReplyUnderpricedMeter.Mark(underpriced)
txReplyOtherRejectMeter.Mark(otherreject)
} else {
txBroadcastKnownMeter.Mark(duplicate)
txBroadcastUnderpricedMeter.Mark(underpriced)
txBroadcastOtherRejectMeter.Mark(otherreject)
}
select {
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}: