Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
Showing only changes of commit 111a1b73cf - Show all commits

View File

@ -477,10 +477,21 @@ func (f *BlockFetcher) loop() {
} }
defer req.Close() defer req.Close()
res := <-resCh timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer
res.Done <- nil defer timeout.Stop()
f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time)) select {
case res := <-resCh:
res.Done <- nil
f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time))
case <-timeout.C:
// The peer didn't respond in time. The request
// was already rescheduled at this point, we were
// waiting for a catchup. With an unresponsive
// peer however, it's a protocol violation.
f.dropPeer(peer)
}
}(hash) }(hash)
} }
}(peer) }(peer)
@ -523,11 +534,23 @@ func (f *BlockFetcher) loop() {
} }
defer req.Close() defer req.Close()
res := <-resCh timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer
res.Done <- nil defer timeout.Stop()
txs, uncles := res.Res.(*eth.BlockBodiesPacket).Unpack() select {
f.FilterBodies(peer, txs, uncles, time.Now()) case res := <-resCh:
res.Done <- nil
txs, uncles := res.Res.(*eth.BlockBodiesPacket).Unpack()
f.FilterBodies(peer, txs, uncles, time.Now())
case <-timeout.C:
// The peer didn't respond in time. The request
// was already rescheduled at this point, we were
// waiting for a catchup. With an unresponsive
// peer however, it's a protocol violation.
f.dropPeer(peer)
}
}(peer, hashes) }(peer, hashes)
} }
// Schedule the next fetch if blocks are still pending // Schedule the next fetch if blocks are still pending