From 8906b2fe0934c67ebb1db5d4d77acdf1a7e988f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 17 May 2016 11:12:57 +0300 Subject: [PATCH] eth/downloader: fix reviewer comments --- eth/downloader/downloader.go | 37 +++++++++++++++++++++++++++++------- eth/downloader/queue.go | 4 ++-- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 2f79c2dfd..74bff2b66 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -65,7 +65,7 @@ var ( maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxHeadersProcess = 2048 // Number of header download results to import at once into the chain - maxResultsProcess = 4096 // Number of content download results to import at once into the chain + maxResultsProcess = 2048 // Number of content download results to import at once into the chain fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected @@ -716,9 +716,9 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { getHashes := func(from uint64) { glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from) - go p.getAbsHashes(from, MaxHashFetch) request = time.Now() timeout.Reset(hashTTL) + go p.getAbsHashes(from, MaxHashFetch) } // Start pulling hashes, until all are exhausted getHashes(from) @@ -1168,7 +1168,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // facilitate concurrency but still protect against malicious nodes sending bad // headers, we construct a header chain skeleton using the "origin" peer we are // syncing with, and fill in the missing headers using anyone else. Headers from -// other peers are only accepted if they map cleanly to the skeleton. If noone +// other peers are only accepted if they map cleanly to the skeleton. If no one // can fill in the skeleton - not even the origin peer - it's assumed invalid and // the origin is dropped. func (d *Downloader) fetchHeaders(p *peer, from uint64) error { @@ -1183,6 +1183,9 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { defer timeout.Stop() getHeaders := func(from uint64) { + request = time.Now() + timeout.Reset(headerTTL) + if skeleton { glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from) go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) @@ -1190,8 +1193,6 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from) go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) } - request = time.Now() - timeout.Reset(headerTTL) } // Start pulling the header chain skeleton until all is done getHeaders(from) @@ -1413,6 +1414,28 @@ func (d *Downloader) fetchNodeData() error { // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. +// +// As the scheduling/timeout logic mostly is the same for all downloaded data +// types, this method is used by each for data gathering and is instrumented with +// various callbacks to handle the slight differences between processing them. +// +// The instrumentation parameters: +// - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer) +// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers) +// - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`) +// - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed) +// - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping) +// - pending: task callback for the number of requests still needing download (detect completion/non-completability) +// - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish) +// - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use) +// - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions) +// - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic) +// - fetch: network callback to actually send a particular download request to a physical remote peer +// - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer) +// - capacity: network callback to retreive the estimated type-specific bandwidth capacity of a peer (traffic shaping) +// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks +// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) +// - kind: textual label of the type being downloaded to display in log mesages func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, @@ -1581,10 +1604,10 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { for i, header := range rollback { hashes[i] = header.Hash() } - lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number() + lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number() d.rollback(hashes) glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", - len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number()) + len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number()) // If we're already past the pivot point, this could be an attack, disable fast sync if rollback[len(rollback)-1].Number.Uint64() > pivot { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index dd839de19..195eae4ff 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -39,8 +39,8 @@ import ( ) var ( - blockCacheLimit = 16384 // Maximum number of blocks to cache before throttling the download - maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently + blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download + maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently ) var (