From 7e2bbb9cbb14d32edeaeacaf0f645f4018bca7f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 6 Apr 2022 10:18:57 +0300 Subject: [PATCH] eth/fetcher: if peers never respond, drop them --- eth/fetcher/block_fetcher.go | 37 +++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 247d0eac6..d75ba3f8e 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -477,10 +477,21 @@ func (f *BlockFetcher) loop() { } defer req.Close() - res := <-resCh - res.Done <- nil + timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer + defer timeout.Stop() - f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time)) + select { + case res := <-resCh: + res.Done <- nil + f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time)) + + case <-timeout.C: + // The peer didn't respond in time. The request + // was already rescheduled at this point, we were + // waiting for a catchup. With an unresponsive + // peer however, it's a protocol violation. + f.dropPeer(peer) + } }(hash) } }(peer) @@ -523,11 +534,23 @@ func (f *BlockFetcher) loop() { } defer req.Close() - res := <-resCh - res.Done <- nil + timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer + defer timeout.Stop() - txs, uncles := res.Res.(*eth.BlockBodiesPacket).Unpack() - f.FilterBodies(peer, txs, uncles, time.Now()) + select { + case res := <-resCh: + res.Done <- nil + + txs, uncles := res.Res.(*eth.BlockBodiesPacket).Unpack() + f.FilterBodies(peer, txs, uncles, time.Now()) + + case <-timeout.C: + // The peer didn't respond in time. The request + // was already rescheduled at this point, we were + // waiting for a catchup. With an unresponsive + // peer however, it's a protocol violation. + f.dropPeer(peer) + } }(peer, hashes) } // Schedule the next fetch if blocks are still pending