From c89348834957553d8cc60159ec718a12aa1ef97f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 1 Dec 2021 20:18:12 +0200 Subject: [PATCH] eth: pre-process downloader responses on the peer reader thread --- eth/downloader/downloader.go | 72 ++++++++++++------- eth/downloader/downloader_test.go | 26 +++++++ eth/downloader/fetchers.go | 20 +++--- eth/downloader/fetchers_concurrent_bodies.go | 3 +- eth/downloader/fetchers_concurrent_headers.go | 4 +- .../fetchers_concurrent_receipts.go | 3 +- eth/downloader/queue.go | 59 ++++++++------- eth/downloader/queue_test.go | 62 ++++++++++++---- eth/protocols/eth/dispatcher.go | 8 ++- eth/protocols/eth/handlers.go | 35 +++++++-- 10 files changed, 207 insertions(+), 85 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 95b90d3b5..6b262b5ec 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -85,6 +85,13 @@ var ( // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) +// headerTask is a set of downloaded headers to queue along with their precomputed +// hashes to avoid constant rehashing. +type headerTask struct { + headers []*types.Header + hashes []common.Hash +} + type Downloader struct { mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode mux *event.TypeMux // Event multiplexer to announce sync operation events @@ -116,7 +123,7 @@ type Downloader struct { ancientLimit uint64 // The maximum block number which can be regarded as ancient data. // Channels - headerProcCh chan []*types.Header // Channel to feed the header processor new tasks + headerProcCh chan *headerTask // Channel to feed the header processor new tasks // State sync pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root @@ -210,7 +217,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, blockchain: chain, lightchain: lightchain, dropPeer: dropPeer, - headerProcCh: make(chan []*types.Header, 1), + headerProcCh: make(chan *headerTask, 1), quitCh: make(chan struct{}), SnapSyncer: snap.NewSyncer(stateDb), stateSyncStart: make(chan *stateSync), @@ -626,7 +633,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty if mode == SnapSync { fetch = 2 // head + pivot headers } - headers, err := d.fetchHeadersByHash(p, latest, fetch, fsMinFullBlocks-1, true) + headers, hashes, err := d.fetchHeadersByHash(p, latest, fetch, fsMinFullBlocks-1, true) if err != nil { return nil, nil, err } @@ -645,7 +652,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty if mode == SnapSync && head.Number.Uint64() > uint64(fsMinFullBlocks) { return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer) } - p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash()) + p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", hashes[0]) return head, nil, nil } // At this point we have 2 headers in total and the first is the @@ -784,7 +791,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight) p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip) - headers, err := d.fetchHeadersByNumber(p, uint64(from), count, skip, false) + headers, hashes, err := d.fetchHeadersByNumber(p, uint64(from), count, skip, false) if err != nil { return 0, err } @@ -811,7 +818,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re continue } // Otherwise check if we already know the header or not - h := headers[i].Hash() + h := hashes[i] n := headers[i].Number.Uint64() var known bool @@ -854,7 +861,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode, // Split our chain interval in two, and request the hash to cross check check := (start + end) / 2 - headers, err := d.fetchHeadersByNumber(p, check, 1, 0, false) + headers, hashes, err := d.fetchHeadersByNumber(p, check, 1, 0, false) if err != nil { return 0, err } @@ -864,7 +871,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode, return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers)) } // Modify the search interval based on the response - h := headers[0].Hash() + h := hashes[0] n := headers[0].Number.Uint64() var known bool @@ -923,6 +930,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // - Full header retrieval if we're near the chain head var ( headers []*types.Header + hashes []common.Hash err error ) switch { @@ -932,15 +940,15 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e d.pivotLock.RUnlock() p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks)) - headers, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep + headers, hashes, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep case skeleton: p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) - headers, err = d.fetchHeadersByNumber(p, from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + headers, hashes, err = d.fetchHeadersByNumber(p, from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) default: p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) - headers, err = d.fetchHeadersByNumber(p, from, MaxHeaderFetch, 0, false) + headers, hashes, err = d.fetchHeadersByNumber(p, from, MaxHeaderFetch, 0, false) } switch err { case nil: @@ -1038,12 +1046,14 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // If we received a skeleton batch, resolve internals concurrently var progressed bool if skeleton { - filled, proced, err := d.fillHeaderSkeleton(from, headers) + filled, hashset, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { p.log.Debug("Skeleton chain invalid", "err", err) return fmt.Errorf("%w: %v", errInvalidChain, err) } headers = filled[proced:] + hashes = hashset[proced:] + progressed = proced > 0 from += uint64(proced) } else { @@ -1079,6 +1089,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e delay = n } headers = headers[:n-delay] + hashes = hashes[:n-delay] } } } @@ -1098,7 +1109,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e if len(headers) > 0 { p.log.Trace("Scheduling new headers", "count", len(headers), "from", from) select { - case d.headerProcCh <- headers: + case d.headerProcCh <- &headerTask{ + headers: headers, + hashes: hashes, + }: case <-d.cancelCh: return errCanceled } @@ -1121,7 +1135,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // // The method returns the entire filled skeleton and also the number of headers // already forwarded for processing. -func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, []common.Hash, int, error) { log.Debug("Filling up skeleton", "from", from) d.queue.ScheduleSkeleton(from, skeleton) @@ -1129,11 +1143,11 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( if err != nil { log.Debug("Skeleton fill failed", "err", err) } - filled, proced := d.queue.RetrieveHeaders() + filled, hashes, proced := d.queue.RetrieveHeaders() if err == nil { log.Debug("Skeleton fill succeeded", "filled", len(filled), "processed", proced) } - return filled, proced, err + return filled, hashes, proced, err } // fetchBodies iteratively downloads the scheduled block bodies, taking any @@ -1199,9 +1213,9 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { rollbackErr = errCanceled return errCanceled - case headers := <-d.headerProcCh: + case task := <-d.headerProcCh: // Terminate header processing if we synced up - if len(headers) == 0 { + if task == nil || len(task.headers) == 0 { // Notify everyone that headers are fully processed for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { select { @@ -1245,6 +1259,8 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { return nil } // Otherwise split the chunk of headers into batches and process them + headers, hashes := task.headers, task.hashes + gotHeaders = true for len(headers) > 0 { // Terminate if something failed in between processing chunks @@ -1259,7 +1275,8 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { if limit > len(headers) { limit = len(headers) } - chunk := headers[:limit] + chunkHeaders := headers[:limit] + chunkHashes := hashes[:limit] // In case of header only syncing, validate the chunk immediately if mode == SnapSync || mode == LightSync { @@ -1273,22 +1290,22 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { d.pivotLock.RUnlock() frequency := fsHeaderCheckFrequency - if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { + if chunkHeaders[len(chunkHeaders)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { frequency = 1 } - if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { + if n, err := d.lightchain.InsertHeaderChain(chunkHeaders, frequency); err != nil { rollbackErr = err // If some headers were inserted, track them as uncertain if (mode == SnapSync || frequency > 1) && n > 0 && rollback == 0 { - rollback = chunk[0].Number.Uint64() + rollback = chunkHeaders[0].Number.Uint64() } - log.Warn("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err) + log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err) return fmt.Errorf("%w: %v", errInvalidChain, err) } // All verifications passed, track all headers within the alloted limits if mode == SnapSync { - head := chunk[len(chunk)-1].Number.Uint64() + head := chunkHeaders[len(chunkHeaders)-1].Number.Uint64() if head-rollback > uint64(fsHeaderSafetyNet) { rollback = head - uint64(fsHeaderSafetyNet) } else { @@ -1308,13 +1325,14 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { } } // Otherwise insert the headers for content retrieval - inserts := d.queue.Schedule(chunk, origin) - if len(inserts) != len(chunk) { - rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk)) + inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin) + if len(inserts) != len(chunkHeaders) { + rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunkHeaders)) return fmt.Errorf("%w: stale headers", errBadPeer) } } headers = headers[limit:] + hashes = hashes[limit:] origin += uint64(limit) } // Update the highest block number we know if a higher one is found. diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e6e092b22..f62a5d028 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -177,6 +177,10 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i } } } + hashes := make([]common.Hash, len(headers)) + for i, header := range headers { + hashes[i] = header.Hash() + } // Deliver the headers to the downloader req := ð.Request{ Peer: dlp.id, @@ -184,6 +188,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i res := ð.Response{ Req: req, Res: (*eth.BlockHeadersPacket)(&headers), + Meta: hashes, Time: 1, Done: make(chan error, 1), // Ignore the returned status } @@ -216,6 +221,10 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, } } } + hashes := make([]common.Hash, len(headers)) + for i, header := range headers { + hashes[i] = header.Hash() + } // Deliver the headers to the downloader req := ð.Request{ Peer: dlp.id, @@ -223,6 +232,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, res := ð.Response{ Req: req, Res: (*eth.BlockHeadersPacket)(&headers), + Meta: hashes, Time: 1, Done: make(chan error, 1), // Ignore the returned status } @@ -243,12 +253,22 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et bodies[i] = new(eth.BlockBody) rlp.DecodeBytes(blob, bodies[i]) } + var ( + txsHashes = make([]common.Hash, len(bodies)) + uncleHashes = make([]common.Hash, len(bodies)) + ) + hasher := trie.NewStackTrie(nil) + for i, body := range bodies { + txsHashes[i] = types.DeriveSha(types.Transactions(body.Transactions), hasher) + uncleHashes[i] = types.CalcUncleHash(body.Uncles) + } req := ð.Request{ Peer: dlp.id, } res := ð.Response{ Req: req, Res: (*eth.BlockBodiesPacket)(&bodies), + Meta: [][]common.Hash{txsHashes, uncleHashes}, Time: 1, Done: make(chan error, 1), // Ignore the returned status } @@ -268,12 +288,18 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan * for i, blob := range blobs { rlp.DecodeBytes(blob, &receipts[i]) } + hasher := trie.NewStackTrie(nil) + hashes = make([]common.Hash, len(receipts)) + for i, receipt := range receipts { + hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher) + } req := ð.Request{ Peer: dlp.id, } res := ð.Response{ Req: req, Res: (*eth.ReceiptsPacket)(&receipts), + Meta: hashes, Time: 1, Done: make(chan error, 1), // Ignore the returned status } diff --git a/eth/downloader/fetchers.go b/eth/downloader/fetchers.go index fe309b179..021e8c4f9 100644 --- a/eth/downloader/fetchers.go +++ b/eth/downloader/fetchers.go @@ -27,14 +27,14 @@ import ( // fetchHeadersByHash is a blocking version of Peer.RequestHeadersByHash which // handles all the cancellation, interruption and timeout mechanisms of a data // retrieval to allow blocking API calls. -func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amount int, skip int, reverse bool) ([]*types.Header, error) { +func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error) { // Create the response sink and send the network request start := time.Now() resCh := make(chan *eth.Response) req, err := p.peer.RequestHeadersByHash(hash, amount, skip, reverse, resCh) if err != nil { - return nil, err + return nil, nil, err } defer req.Close() @@ -46,14 +46,14 @@ func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amo select { case <-d.cancelCh: - return nil, errCanceled + return nil, nil, errCanceled case <-timeoutTimer.C: // Header retrieval timed out, update the metrics p.log.Debug("Header request timed out", "elapsed", ttl) headerTimeoutMeter.Mark(1) - return nil, errTimeout + return nil, nil, errTimeout case res := <-resCh: // Headers successfully retrieved, update the metrics @@ -65,21 +65,21 @@ func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amo // be processed by the caller res.Done <- nil - return *res.Res.(*eth.BlockHeadersPacket), nil + return *res.Res.(*eth.BlockHeadersPacket), res.Meta.([]common.Hash), nil } } // fetchHeadersByNumber is a blocking version of Peer.RequestHeadersByNumber which // handles all the cancellation, interruption and timeout mechanisms of a data // retrieval to allow blocking API calls. -func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amount int, skip int, reverse bool) ([]*types.Header, error) { +func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error) { // Create the response sink and send the network request start := time.Now() resCh := make(chan *eth.Response) req, err := p.peer.RequestHeadersByNumber(number, amount, skip, reverse, resCh) if err != nil { - return nil, err + return nil, nil, err } defer req.Close() @@ -91,14 +91,14 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou select { case <-d.cancelCh: - return nil, errCanceled + return nil, nil, errCanceled case <-timeoutTimer.C: // Header retrieval timed out, update the metrics p.log.Debug("Header request timed out", "elapsed", ttl) headerTimeoutMeter.Mark(1) - return nil, errTimeout + return nil, nil, errTimeout case res := <-resCh: // Headers successfully retrieved, update the metrics @@ -110,6 +110,6 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou // be processed by the caller res.Done <- nil - return *res.Res.(*eth.BlockHeadersPacket), nil + return *res.Res.(*eth.BlockHeadersPacket), res.Meta.([]common.Hash), nil } } diff --git a/eth/downloader/fetchers_concurrent_bodies.go b/eth/downloader/fetchers_concurrent_bodies.go index 259811f57..a8de41032 100644 --- a/eth/downloader/fetchers_concurrent_bodies.go +++ b/eth/downloader/fetchers_concurrent_bodies.go @@ -90,8 +90,9 @@ func (q *bodyQueue) request(peer *peerConnection, req *fetchRequest, resCh chan // fetcher, unpacking the body data and delivering it to the downloader's queue. func (q *bodyQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { txs, uncles := packet.Res.(*eth.BlockBodiesPacket).Unpack() + hashsets := packet.Meta.([][]common.Hash) // {txs hashes, uncle hashes} - accepted, err := q.queue.DeliverBodies(peer.id, txs, uncles) + accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1]) switch { case err == nil && len(txs) == 0: peer.log.Trace("Requested bodies delivered") diff --git a/eth/downloader/fetchers_concurrent_headers.go b/eth/downloader/fetchers_concurrent_headers.go index f30b279d2..bd3bb3e00 100644 --- a/eth/downloader/fetchers_concurrent_headers.go +++ b/eth/downloader/fetchers_concurrent_headers.go @@ -19,6 +19,7 @@ package downloader import ( "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" ) @@ -81,8 +82,9 @@ func (q *headerQueue) request(peer *peerConnection, req *fetchRequest, resCh cha // fetcher, unpacking the header data and delivering it to the downloader's queue. func (q *headerQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { headers := *packet.Res.(*eth.BlockHeadersPacket) + hashes := packet.Meta.([]common.Hash) - accepted, err := q.queue.DeliverHeaders(peer.id, headers, q.headerProcCh) + accepted, err := q.queue.DeliverHeaders(peer.id, headers, hashes, q.headerProcCh) switch { case err == nil && len(headers) == 0: peer.log.Trace("Requested headers delivered") diff --git a/eth/downloader/fetchers_concurrent_receipts.go b/eth/downloader/fetchers_concurrent_receipts.go index ce935f0a8..fee2c3410 100644 --- a/eth/downloader/fetchers_concurrent_receipts.go +++ b/eth/downloader/fetchers_concurrent_receipts.go @@ -89,8 +89,9 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch // fetcher, unpacking the receipt data and delivering it to the downloader's queue. func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { receipts := *packet.Res.(*eth.ReceiptsPacket) + hashes := packet.Meta.([]common.Hash) // {receipt hashes} - accepted, err := q.queue.DeliverReceipts(peer.id, receipts) + accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes) switch { case err == nil && len(receipts) == 0: peer.log.Trace("Requested receipts delivered") diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index c0498af69..ff34d932f 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -31,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/trie" ) const ( @@ -119,6 +118,7 @@ type queue struct { headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations headerResults []*types.Header // Result cache accumulating the completed headers + headerHashes []common.Hash // Result cache accumulating the completed header hashes headerProced int // Number of headers already processed from the results headerOffset uint64 // Number of the first header in the result cache headerContCh chan bool // Channel to notify when header download finishes @@ -260,6 +260,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { q.headerTaskQueue = prque.New(nil) q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) + q.headerHashes = make([]common.Hash, len(skeleton)*MaxHeaderFetch) q.headerProced = 0 q.headerOffset = from q.headerContCh = make(chan bool, 1) @@ -274,27 +275,27 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { // RetrieveHeaders retrieves the header chain assemble based on the scheduled // skeleton. -func (q *queue) RetrieveHeaders() ([]*types.Header, int) { +func (q *queue) RetrieveHeaders() ([]*types.Header, []common.Hash, int) { q.lock.Lock() defer q.lock.Unlock() - headers, proced := q.headerResults, q.headerProced - q.headerResults, q.headerProced = nil, 0 + headers, hashes, proced := q.headerResults, q.headerHashes, q.headerProced + q.headerResults, q.headerHashes, q.headerProced = nil, nil, 0 - return headers, proced + return headers, hashes, proced } // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { +func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() // Insert all the headers prioritised by the contained block number inserts := make([]*types.Header, 0, len(headers)) - for _, header := range headers { + for i, header := range headers { // Make sure chain order is honoured and preserved throughout - hash := header.Hash() + hash := hashes[i] if header.Number == nil || header.Number.Uint64() != from { log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from) break @@ -656,7 +657,7 @@ func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue // If the headers are accepted, the method makes an attempt to deliver the set // of ready headers to the processor to keep the pipeline full. However, it will // not block to prevent stalling other pending deliveries. -func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { +func (q *queue) DeliverHeaders(id string, headers []*types.Header, hashes []common.Hash, headerProcCh chan *headerTask) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -684,17 +685,17 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh accepted := len(headers) == MaxHeaderFetch if accepted { if headers[0].Number.Uint64() != request.From { - logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", headers[0].Hash(), "expected", request.From) + logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", hashes[0], "expected", request.From) accepted = false - } else if headers[len(headers)-1].Hash() != target { - logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target) + } else if hashes[len(headers)-1] != target { + logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", hashes[len(headers)-1], "expected", target) accepted = false } } if accepted { - parentHash := headers[0].Hash() + parentHash := hashes[0] for i, header := range headers[1:] { - hash := header.Hash() + hash := hashes[i+1] if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { logger.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", want) accepted = false @@ -726,6 +727,8 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh } // Clean up a successful fetch and try to deliver any sub-results copy(q.headerResults[request.From-q.headerOffset:], headers) + copy(q.headerHashes[request.From-q.headerOffset:], hashes) + delete(q.headerTaskPool, request.From) ready := 0 @@ -734,13 +737,19 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh } if ready > 0 { // Headers are ready for delivery, gather them and push forward (non blocking) - process := make([]*types.Header, ready) - copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) + processHeaders := make([]*types.Header, ready) + copy(processHeaders, q.headerResults[q.headerProced:q.headerProced+ready]) + + processHashes := make([]common.Hash, ready) + copy(processHashes, q.headerHashes[q.headerProced:q.headerProced+ready]) select { - case headerProcCh <- process: - logger.Trace("Pre-scheduled new headers", "count", len(process), "from", process[0].Number) - q.headerProced += len(process) + case headerProcCh <- &headerTask{ + headers: processHeaders, + hashes: processHashes, + }: + logger.Trace("Pre-scheduled new headers", "count", len(processHeaders), "from", processHeaders[0].Number) + q.headerProced += len(processHeaders) default: } } @@ -754,16 +763,15 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh // DeliverBodies injects a block body retrieval response into the results queue. // The method returns the number of blocks bodies accepted from the delivery and // also wakes any threads waiting for data delivery. -func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) { +func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListHashes []common.Hash, uncleLists [][]*types.Header, uncleListHashes []common.Hash) (int, error) { q.lock.Lock() defer q.lock.Unlock() - trieHasher := trie.NewStackTrie(nil) validate := func(index int, header *types.Header) error { - if types.DeriveSha(types.Transactions(txLists[index]), trieHasher) != header.TxHash { + if txListHashes[index] != header.TxHash { return errInvalidBody } - if types.CalcUncleHash(uncleLists[index]) != header.UncleHash { + if uncleListHashes[index] != header.UncleHash { return errInvalidBody } return nil @@ -781,13 +789,12 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi // DeliverReceipts injects a receipt retrieval response into the results queue. // The method returns the number of transaction receipts accepted from the delivery // and also wakes any threads waiting for data delivery. -func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) { +func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, receiptListHashes []common.Hash) (int, error) { q.lock.Lock() defer q.lock.Unlock() - trieHasher := trie.NewStackTrie(nil) validate := func(index int, header *types.Header) error { - if types.DeriveSha(types.Receipts(receiptList[index]), trieHasher) != header.ReceiptHash { + if receiptListHashes[index] != header.ReceiptHash { return errInvalidReceipt } return nil diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index 9dc27a593..f729def67 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" ) var ( @@ -110,7 +111,12 @@ func TestBasics(t *testing.T) { } // Schedule a batch of headers - q.Schedule(chain.headers(), 1) + headers := chain.headers() + hashes := make([]common.Hash, len(headers)) + for i, header := range headers { + hashes[i] = header.Hash() + } + q.Schedule(headers, hashes, 1) if q.Idle() { t.Errorf("queue should not be idle") } @@ -198,8 +204,14 @@ func TestEmptyBlocks(t *testing.T) { q := newQueue(10, 10) q.Prepare(1, SnapSync) + // Schedule a batch of headers - q.Schedule(emptyChain.headers(), 1) + headers := emptyChain.headers() + hashes := make([]common.Hash, len(headers)) + for i, header := range headers { + hashes[i] = header.Hash() + } + q.Schedule(headers, hashes, 1) if q.Idle() { t.Errorf("queue should not be idle") } @@ -280,11 +292,15 @@ func XTestDelivery(t *testing.T) { c := 1 for { //fmt.Printf("getting headers from %d\n", c) - hdrs := world.headers(c) - l := len(hdrs) + headers := world.headers(c) + hashes := make([]common.Hash, len(headers)) + for i, header := range headers { + hashes[i] = header.Hash() + } + l := len(headers) //fmt.Printf("scheduling %d headers, first %d last %d\n", - // l, hdrs[0].Number.Uint64(), hdrs[len(hdrs)-1].Number.Uint64()) - q.Schedule(hdrs, uint64(c)) + // l, headers[0].Number.Uint64(), headers[len(headers)-1].Number.Uint64()) + q.Schedule(headers, hashes, uint64(c)) c += l } }() @@ -311,18 +327,31 @@ func XTestDelivery(t *testing.T) { peer := dummyPeer(fmt.Sprintf("peer-%d", i)) f, _, _ := q.ReserveBodies(peer, rand.Intn(30)) if f != nil { - var emptyList []*types.Header - var txs [][]*types.Transaction - var uncles [][]*types.Header + var ( + emptyList []*types.Header + txset [][]*types.Transaction + uncleset [][]*types.Header + ) numToSkip := rand.Intn(len(f.Headers)) for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] { - txs = append(txs, world.getTransactions(hdr.Number.Uint64())) - uncles = append(uncles, emptyList) + txset = append(txset, world.getTransactions(hdr.Number.Uint64())) + uncleset = append(uncleset, emptyList) + } + var ( + txsHashes = make([]common.Hash, len(txset)) + uncleHashes = make([]common.Hash, len(uncleset)) + ) + hasher := trie.NewStackTrie(nil) + for i, txs := range txset { + txsHashes[i] = types.DeriveSha(types.Transactions(txs), hasher) + } + for i, uncles := range uncleset { + uncleHashes[i] = types.CalcUncleHash(uncles) } time.Sleep(100 * time.Millisecond) - _, err := q.DeliverBodies(peer.id, txs, uncles) + _, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes) if err != nil { - fmt.Printf("delivered %d bodies %v\n", len(txs), err) + fmt.Printf("delivered %d bodies %v\n", len(txset), err) } } else { i++ @@ -341,7 +370,12 @@ func XTestDelivery(t *testing.T) { for _, hdr := range f.Headers { rcs = append(rcs, world.getReceipts(hdr.Number.Uint64())) } - _, err := q.DeliverReceipts(peer.id, rcs) + hasher := trie.NewStackTrie(nil) + hashes := make([]common.Hash, len(rcs)) + for i, receipt := range rcs { + hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher) + } + _, err := q.DeliverReceipts(peer.id, rcs, hashes) if err != nil { fmt.Printf("delivered %d receipts %v\n", len(rcs), err) } diff --git a/eth/protocols/eth/dispatcher.go b/eth/protocols/eth/dispatcher.go index d2e4f488c..bf88d400d 100644 --- a/eth/protocols/eth/dispatcher.go +++ b/eth/protocols/eth/dispatcher.go @@ -102,6 +102,7 @@ type Response struct { Req *Request // Original request to cross-reference with Res interface{} // Remote response for the request query + Meta interface{} // Metadata generated locally on the receiver thread Time time.Duration // Time it took for the request to be served Done chan error // Channel to signal message handling to the reader } @@ -137,7 +138,7 @@ func (p *Peer) dispatchRequest(req *Request) error { // dispatchRequest fulfils a pending request and delivers it to the requested // sink. -func (p *Peer) dispatchResponse(res *Response) error { +func (p *Peer) dispatchResponse(res *Response, metadata func() interface{}) error { resOp := &response{ res: res, fail: make(chan error), @@ -151,6 +152,11 @@ func (p *Peer) dispatchResponse(res *Response) error { if err := <-resOp.fail; err != nil { return nil } + // Request was accepted, run any postprocessing step to generate metadata + // on the receiver thread, not the sink thread + if metadata != nil { + res.Meta = metadata() + } // Deliver the filled out response and wait until it's handled. This // path is a bit funky as Go's select has no order, so if a response // arrives to an already cancelled request, there's a 50-50% changes diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index aa5245679..0ed8a8eba 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -286,11 +286,18 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + metadata := func() interface{} { + hashes := make([]common.Hash, len(res.BlockHeadersPacket)) + for i, header := range res.BlockHeadersPacket { + hashes[i] = header.Hash() + } + return hashes + } return peer.dispatchResponse(&Response{ id: res.RequestId, code: BlockHeadersMsg, Res: &res.BlockHeadersPacket, - }) + }, metadata) } func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { @@ -299,11 +306,23 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + metadata := func() interface{} { + var ( + txsHashes = make([]common.Hash, len(res.BlockBodiesPacket)) + uncleHashes = make([]common.Hash, len(res.BlockBodiesPacket)) + ) + hasher := trie.NewStackTrie(nil) + for i, body := range res.BlockBodiesPacket { + txsHashes[i] = types.DeriveSha(types.Transactions(body.Transactions), hasher) + uncleHashes[i] = types.CalcUncleHash(body.Uncles) + } + return [][]common.Hash{txsHashes, uncleHashes} + } return peer.dispatchResponse(&Response{ id: res.RequestId, code: BlockBodiesMsg, Res: &res.BlockBodiesPacket, - }) + }, metadata) } func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { @@ -316,7 +335,7 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { id: res.RequestId, code: NodeDataMsg, Res: &res.NodeDataPacket, - }) + }, nil) // No post-processing, we're not using this packet anymore } func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { @@ -325,11 +344,19 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + metadata := func() interface{} { + hasher := trie.NewStackTrie(nil) + hashes := make([]common.Hash, len(res.ReceiptsPacket)) + for i, receipt := range res.ReceiptsPacket { + hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher) + } + return hashes + } return peer.dispatchResponse(&Response{ id: res.RequestId, code: ReceiptsMsg, Res: &res.ReceiptsPacket, - }) + }, metadata) } func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {