From e86619e75d1bd1209818ab4df2fac52e3c43b5e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 19 Apr 2016 12:27:37 +0300 Subject: [PATCH] eth/downloader: stream partial skeleton filling to processor --- eth/downloader/downloader.go | 38 ++++++++++++++++++++----------- eth/downloader/downloader_test.go | 1 + eth/downloader/queue.go | 35 +++++++++++++++++++++++----- 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 2b2de1b5f..2f79c2dfd 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -54,7 +54,7 @@ var ( blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now) - headerTTL = 2 * time.Second // [eth/62] Time it takes for a header request to time out + headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request @@ -1064,7 +1064,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { continue } // Otherwise check if we already know the header or not - if (d.mode != LightSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) { + if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() break } @@ -1226,21 +1226,24 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { // If we received a skeleton batch, resolve internals concurrently if skeleton { - filled, err := d.fillHeaderSkeleton(from, headers) + filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err) return errInvalidChain } - headers = filled + headers = filled[proced:] + from += uint64(proced) } // Insert all the new headers and fetch the next batch - glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) - select { - case d.headerProcCh <- headers: - case <-d.cancelCh: - return errCancelHeaderFetch + if len(headers) > 0 { + glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) + select { + case d.headerProcCh <- headers: + case <-d.cancelCh: + return errCancelHeaderFetch + } + from += uint64(len(headers)) } - from += uint64(len(headers)) getHeaders(from) case <-timeout.C: @@ -1272,14 +1275,21 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { // fillHeaderSkeleton concurrently retrieves headers from all our available peers // and maps them to the provided skeleton header chain. -func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, error) { +// +// Any partial results from the beginning of the skeleton is (if possible) forwarded +// immediately to the header processor to keep the rest of the pipeline full even +// in the case of header stalls. +// +// The method returs the entire filled skeleton and also the number of headers +// already forwarded for processing. +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from) d.queue.ScheduleSkeleton(from, skeleton) var ( deliver = func(packet dataPack) (int, error) { pack := packet.(*headerPack) - return d.queue.DeliverHeaders(pack.peerId, pack.headers) + return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh) } expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) } throttle = func() bool { return false } @@ -1295,7 +1305,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header") glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err) - return d.queue.RetrieveHeaders(), err + + filled, proced := d.queue.RetrieveHeaders() + return filled, proced, err } // fetchBodies iteratively downloads the scheduled block bodies, taking any diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index c013f3d2c..4ea8a8abe 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1258,6 +1258,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { // rolled back, and also the pivot point being reverted to a non-block status. tester.newPeer("block-attack", protocol, hashes, headers, blocks, receipts) missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1 + delete(tester.peerHeaders["fast-attack"], hashes[len(hashes)-missing]) // Make sure the fast-attacker doesn't fill in delete(tester.peerHeaders["block-attack"], hashes[len(hashes)-missing]) if err := tester.sync("block-attack", nil, mode); err == nil { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 1f46d0a4a..dd839de19 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -87,6 +87,7 @@ type queue struct { headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations headerDonePool map[uint64]struct{} // [eth/62] Set of the completed header fetches headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers + headerProced int // [eth/62] Number of headers already processed from the results headerOffset uint64 // [eth/62] Number of the first header in the result cache headerContCh chan bool // [eth/62] Channel to notify when header download finishes @@ -365,6 +366,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { q.headerTaskQueue = prque.New() q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) + q.headerProced = 0 q.headerOffset = from q.headerContCh = make(chan bool, 1) @@ -378,14 +380,14 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { // RetrieveHeaders retrieves the header chain assemble based on the scheduled // skeleton. -func (q *queue) RetrieveHeaders() []*types.Header { +func (q *queue) RetrieveHeaders() ([]*types.Header, int) { q.lock.Lock() defer q.lock.Unlock() - headers := q.headerResults - q.headerResults = nil + headers, proced := q.headerResults, q.headerProced + q.headerResults, q.headerProced = nil, 0 - return headers + return headers, proced } // Schedule adds a set of headers for the download queue for scheduling, returning @@ -976,7 +978,11 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) { // DeliverHeaders injects a header retrieval response into the header results // cache. This method either accepts all headers it received, or none of them // if they do not map correctly to the skeleton. -func (q *queue) DeliverHeaders(id string, headers []*types.Header) (int, error) { +// +// If the headers are accepted, the method makes an attempt to deliver the set +// of ready headers to the processor to keep the pipeline full. However it will +// not block to prevent stalling other pending deliveries. +func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -1030,10 +1036,27 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header) (int, error) q.headerTaskQueue.Push(request.From, -float32(request.From)) return 0, errors.New("delivery not accepted") } - // Clean up a successful fetch, check for termination and return + // Clean up a successful fetch and try to deliver any sub-results copy(q.headerResults[request.From-q.headerOffset:], headers) delete(q.headerTaskPool, request.From) + ready := 0 + for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil { + ready += MaxHeaderFetch + } + if ready > 0 { + // Headers are ready for delivery, gather them and push forward (non blocking) + process := make([]*types.Header, ready) + copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) + + select { + case headerProcCh <- process: + glog.V(logger.Detail).Infof("%s: pre-scheduled %d headers from #%v", id, len(process), process[0].Number) + q.headerProced += len(process) + default: + } + } + // Check for termination and return if len(q.headerTaskPool) == 0 { q.headerContCh <- false }