From 3e795881ea6d68c32da5da3c95f0d458a64e35c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 19 May 2021 15:09:03 +0300 Subject: [PATCH] eth, p2p/msgrate: move peer QoS tracking to its own package and use it for snap (#22876) This change extracts the peer QoS tracking logic from eth/downloader, moving it into the new package p2p/msgrate. The job of msgrate.Tracker is determining suitable timeout values and request sizes per peer. The snap sync scheduler now uses msgrate.Tracker instead of the hard-coded 15s timeout. This should make the sync work better on network links with high latency. --- eth/downloader/downloader.go | 119 +-------- eth/downloader/peer.go | 171 ++++-------- eth/downloader/peer_test.go | 53 ---- eth/downloader/statesync.go | 4 +- eth/protocols/snap/sync.go | 331 +++++++++++++++-------- eth/protocols/snap/sync_test.go | 18 +- p2p/msgrate/msgrate.go | 458 ++++++++++++++++++++++++++++++++ 7 files changed, 745 insertions(+), 409 deletions(-) delete mode 100644 eth/downloader/peer_test.go create mode 100644 p2p/msgrate/msgrate.go diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6f59b29a5..e8a4a76ca 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -47,16 +47,6 @@ var ( MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request - rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests - rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests - rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value - ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion - ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts - - qosTuningPeers = 5 // Number of peers to tune based on (best peers) - qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence - qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value - maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxHeadersProcess = 2048 // Number of header download results to import at once into the chain maxResultsProcess = 2048 // Number of content download results to import at once into the chain @@ -96,13 +86,6 @@ var ( ) type Downloader struct { - // WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically. - // On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is - // guaranteed to be so aligned, so take advantage of that. For more information, - // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG. 
- rttEstimate uint64 // Round trip time to target for download requests - rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) - mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode mux *event.TypeMux // Event multiplexer to announce sync operation events @@ -232,8 +215,6 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, checkpoint: checkpoint, queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), peers: newPeerSet(), - rttEstimate: uint64(rttMaxEstimate), - rttConfidence: uint64(1000000), blockchain: chain, lightchain: lightchain, dropPeer: dropPeer, @@ -252,7 +233,6 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, }, trackStateReq: make(chan *stateReq), } - go dl.qosTuner() go dl.stateFetcher() return dl } @@ -310,8 +290,6 @@ func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error { logger.Error("Failed to register sync peer", "err", err) return err } - d.qosReduceConfidence() - return nil } @@ -670,7 +648,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty } go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true) - ttl := d.requestTTL() + ttl := d.peers.rates.TargetTimeout() timeout := time.After(ttl) for { select { @@ -853,7 +831,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} - ttl := d.requestTTL() + ttl := d.peers.rates.TargetTimeout() timeout := time.After(ttl) for finished := false; !finished; { @@ -942,7 +920,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode, // Split our chain interval in two, and request the hash to cross check check := (start + end) / 2 - ttl := d.requestTTL() + ttl := d.peers.rates.TargetTimeout() timeout := time.After(ttl) go p.peer.RequestHeadersByNumber(check, 1, 0, false) @@ -1035,7 +1013,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { getHeaders := func(from uint64) { request = time.Now() - ttl = d.requestTTL() + ttl = d.peers.rates.TargetTimeout() timeout.Reset(ttl) if skeleton { @@ -1050,7 +1028,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { pivoting = true request = time.Now() - ttl = d.requestTTL() + ttl = d.peers.rates.TargetTimeout() timeout.Reset(ttl) d.pivotLock.RLock() @@ -1262,12 +1240,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( pack := packet.(*headerPack) return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh) } - expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) } + expire = func() map[string]int { return d.queue.ExpireHeaders(d.peers.rates.TargetTimeout()) } reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) { return d.queue.ReserveHeaders(p, count), false, false } fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } - capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) } + capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.peers.rates.TargetRoundTrip()) } setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetHeadersIdle(accepted, deliveryTime) } @@ -1293,9 +1271,9 @@ func (d *Downloader) fetchBodies(from uint64) error { pack := 
packet.(*bodyPack) return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles) } - expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) } + expire = func() map[string]int { return d.queue.ExpireBodies(d.peers.rates.TargetTimeout()) } fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) } - capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) } + capacity = func(p *peerConnection) int { return p.BlockCapacity(d.peers.rates.TargetRoundTrip()) } setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) } ) err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire, @@ -1317,9 +1295,9 @@ func (d *Downloader) fetchReceipts(from uint64) error { pack := packet.(*receiptPack) return d.queue.DeliverReceipts(pack.peerID, pack.receipts) } - expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) } + expire = func() map[string]int { return d.queue.ExpireReceipts(d.peers.rates.TargetTimeout()) } fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) } - capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) } + capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.peers.rates.TargetRoundTrip()) } setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetReceiptsIdle(accepted, deliveryTime) } @@ -2031,78 +2009,3 @@ func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dro return errNoSyncActive } } - -// qosTuner is the quality of service tuning loop that occasionally gathers the -// peer latency statistics and updates the estimated request round trip time. -func (d *Downloader) qosTuner() { - for { - // Retrieve the current median RTT and integrate into the previoust target RTT - rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT())) - atomic.StoreUint64(&d.rttEstimate, uint64(rtt)) - - // A new RTT cycle passed, increase our confidence in the estimated RTT - conf := atomic.LoadUint64(&d.rttConfidence) - conf = conf + (1000000-conf)/2 - atomic.StoreUint64(&d.rttConfidence, conf) - - // Log the new QoS values and sleep until the next RTT - log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL()) - select { - case <-d.quitCh: - return - case <-time.After(rtt): - } - } -} - -// qosReduceConfidence is meant to be called when a new peer joins the downloader's -// peer set, needing to reduce the confidence we have in out QoS estimates. 
-func (d *Downloader) qosReduceConfidence() { - // If we have a single peer, confidence is always 1 - peers := uint64(d.peers.Len()) - if peers == 0 { - // Ensure peer connectivity races don't catch us off guard - return - } - if peers == 1 { - atomic.StoreUint64(&d.rttConfidence, 1000000) - return - } - // If we have a ton of peers, don't drop confidence) - if peers >= uint64(qosConfidenceCap) { - return - } - // Otherwise drop the confidence factor - conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers - if float64(conf)/1000000 < rttMinConfidence { - conf = uint64(rttMinConfidence * 1000000) - } - atomic.StoreUint64(&d.rttConfidence, conf) - - rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate)) - log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL()) -} - -// requestRTT returns the current target round trip time for a download request -// to complete in. -// -// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that -// the downloader tries to adapt queries to the RTT, so multiple RTT values can -// be adapted to, but smaller ones are preferred (stabler download stream). -func (d *Downloader) requestRTT() time.Duration { - return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10 -} - -// requestTTL returns the current timeout allowance for a single download request -// to finish under. -func (d *Downloader) requestTTL() time.Duration { - var ( - rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate)) - conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0 - ) - ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf) - if ttl > ttlLimit { - ttl = ttlLimit - } - return ttl -} diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index b3b6cc95a..b9c771694 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -32,11 +32,11 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/msgrate" ) const ( - maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items - measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. 
+ maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items ) var ( @@ -54,18 +54,12 @@ type peerConnection struct { receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) - headerThroughput float64 // Number of headers measured to be retrievable per second - blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second - receiptThroughput float64 // Number of receipts measured to be retrievable per second - stateThroughput float64 // Number of node data pieces measured to be retrievable per second - - rtt time.Duration // Request round trip time to track responsiveness (QoS) - headerStarted time.Time // Time instance when the last header fetch was started blockStarted time.Time // Time instance when the last block (body) fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started stateStarted time.Time // Time instance when the last node data fetch was started + rates *msgrate.Tracker // Tracker to hone in on the number of items retrievable per second lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) peer Peer @@ -133,11 +127,6 @@ func (p *peerConnection) Reset() { atomic.StoreInt32(&p.receiptIdle, 0) atomic.StoreInt32(&p.stateIdle, 0) - p.headerThroughput = 0 - p.blockThroughput = 0 - p.receiptThroughput = 0 - p.stateThroughput = 0 - p.lacking = make(map[common.Hash]struct{}) } @@ -212,93 +201,72 @@ func (p *peerConnection) FetchNodeData(hashes []common.Hash) error { // requests. Its estimated header retrieval throughput is updated with that measured // just now. func (p *peerConnection) SetHeadersIdle(delivered int, deliveryTime time.Time) { - p.setIdle(deliveryTime.Sub(p.headerStarted), delivered, &p.headerThroughput, &p.headerIdle) + p.rates.Update(eth.BlockHeadersMsg, deliveryTime.Sub(p.headerStarted), delivered) + atomic.StoreInt32(&p.headerIdle, 0) } // SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval // requests. Its estimated body retrieval throughput is updated with that measured // just now. func (p *peerConnection) SetBodiesIdle(delivered int, deliveryTime time.Time) { - p.setIdle(deliveryTime.Sub(p.blockStarted), delivered, &p.blockThroughput, &p.blockIdle) + p.rates.Update(eth.BlockBodiesMsg, deliveryTime.Sub(p.blockStarted), delivered) + atomic.StoreInt32(&p.blockIdle, 0) } // SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt // retrieval requests. Its estimated receipt retrieval throughput is updated // with that measured just now. func (p *peerConnection) SetReceiptsIdle(delivered int, deliveryTime time.Time) { - p.setIdle(deliveryTime.Sub(p.receiptStarted), delivered, &p.receiptThroughput, &p.receiptIdle) + p.rates.Update(eth.ReceiptsMsg, deliveryTime.Sub(p.receiptStarted), delivered) + atomic.StoreInt32(&p.receiptIdle, 0) } // SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie // data retrieval requests. Its estimated state retrieval throughput is updated // with that measured just now. func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time) { - p.setIdle(deliveryTime.Sub(p.stateStarted), delivered, &p.stateThroughput, &p.stateIdle) -} - -// setIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its estimated retrieval throughput is updated with that measured just now. 
-func (p *peerConnection) setIdle(elapsed time.Duration, delivered int, throughput *float64, idle *int32) { - // Irrelevant of the scaling, make sure the peer ends up idle - defer atomic.StoreInt32(idle, 0) - - p.lock.Lock() - defer p.lock.Unlock() - - // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum - if delivered == 0 { - *throughput = 0 - return - } - // Otherwise update the throughput with a new measurement - if elapsed <= 0 { - elapsed = 1 // +1 (ns) to ensure non-zero divisor - } - measured := float64(delivered) / (float64(elapsed) / float64(time.Second)) - - *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured - p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed)) - - p.log.Trace("Peer throughput measurements updated", - "hps", p.headerThroughput, "bps", p.blockThroughput, - "rps", p.receiptThroughput, "sps", p.stateThroughput, - "miss", len(p.lacking), "rtt", p.rtt) + p.rates.Update(eth.NodeDataMsg, deliveryTime.Sub(p.stateStarted), delivered) + atomic.StoreInt32(&p.stateIdle, 0) } // HeaderCapacity retrieves the peers header download allowance based on its // previously discovered throughput. func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int { - p.lock.RLock() - defer p.lock.RUnlock() - - return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch))) + cap := int(math.Ceil(p.rates.Capacity(eth.BlockHeadersMsg, targetRTT))) + if cap > MaxHeaderFetch { + cap = MaxHeaderFetch + } + return cap } // BlockCapacity retrieves the peers block download allowance based on its // previously discovered throughput. func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int { - p.lock.RLock() - defer p.lock.RUnlock() - - return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch))) + cap := int(math.Ceil(p.rates.Capacity(eth.BlockBodiesMsg, targetRTT))) + if cap > MaxBlockFetch { + cap = MaxBlockFetch + } + return cap } // ReceiptCapacity retrieves the peers receipt download allowance based on its // previously discovered throughput. func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int { - p.lock.RLock() - defer p.lock.RUnlock() - - return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch))) + cap := int(math.Ceil(p.rates.Capacity(eth.ReceiptsMsg, targetRTT))) + if cap > MaxReceiptFetch { + cap = MaxReceiptFetch + } + return cap } // NodeDataCapacity retrieves the peers state download allowance based on its // previously discovered throughput. func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int { - p.lock.RLock() - defer p.lock.RUnlock() - - return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch))) + cap := int(math.Ceil(p.rates.Capacity(eth.NodeDataMsg, targetRTT))) + if cap > MaxStateFetch { + cap = MaxStateFetch + } + return cap } // MarkLacking appends a new entity to the set of items (blocks, receipts, states) @@ -330,16 +298,20 @@ func (p *peerConnection) Lacks(hash common.Hash) bool { // peerSet represents the collection of active peer participating in the chain // download procedure. 
type peerSet struct { - peers map[string]*peerConnection + peers map[string]*peerConnection + rates *msgrate.Trackers // Set of rate trackers to give the sync a common beat + newPeerFeed event.Feed peerDropFeed event.Feed - lock sync.RWMutex + + lock sync.RWMutex } // newPeerSet creates a new peer set top track the active download sources. func newPeerSet() *peerSet { return &peerSet{ peers: make(map[string]*peerConnection), + rates: msgrate.NewTrackers(log.New("proto", "eth")), } } @@ -371,30 +343,15 @@ func (ps *peerSet) Reset() { // average of all existing peers, to give it a realistic chance of being used // for data retrievals. func (ps *peerSet) Register(p *peerConnection) error { - // Retrieve the current median RTT as a sane default - p.rtt = ps.medianRTT() - // Register the new peer with some meaningful defaults ps.lock.Lock() if _, ok := ps.peers[p.id]; ok { ps.lock.Unlock() return errAlreadyRegistered } - if len(ps.peers) > 0 { - p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0 - - for _, peer := range ps.peers { - peer.lock.RLock() - p.headerThroughput += peer.headerThroughput - p.blockThroughput += peer.blockThroughput - p.receiptThroughput += peer.receiptThroughput - p.stateThroughput += peer.stateThroughput - peer.lock.RUnlock() - } - p.headerThroughput /= float64(len(ps.peers)) - p.blockThroughput /= float64(len(ps.peers)) - p.receiptThroughput /= float64(len(ps.peers)) - p.stateThroughput /= float64(len(ps.peers)) + p.rates = msgrate.NewTracker(ps.rates.MeanCapacities(), ps.rates.MedianRoundTrip()) + if err := ps.rates.Track(p.id, p.rates); err != nil { + return err } ps.peers[p.id] = p ps.lock.Unlock() @@ -413,6 +370,7 @@ func (ps *peerSet) Unregister(id string) error { return errNotRegistered } delete(ps.peers, id) + ps.rates.Untrack(id) ps.lock.Unlock() ps.peerDropFeed.Send(p) @@ -454,9 +412,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { return atomic.LoadInt32(&p.headerIdle) == 0 } throughput := func(p *peerConnection) float64 { - p.lock.RLock() - defer p.lock.RUnlock() - return p.headerThroughput + return p.rates.Capacity(eth.BlockHeadersMsg, time.Second) } return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) } @@ -468,9 +424,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { return atomic.LoadInt32(&p.blockIdle) == 0 } throughput := func(p *peerConnection) float64 { - p.lock.RLock() - defer p.lock.RUnlock() - return p.blockThroughput + return p.rates.Capacity(eth.BlockBodiesMsg, time.Second) } return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) } @@ -482,9 +436,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { return atomic.LoadInt32(&p.receiptIdle) == 0 } throughput := func(p *peerConnection) float64 { - p.lock.RLock() - defer p.lock.RUnlock() - return p.receiptThroughput + return p.rates.Capacity(eth.ReceiptsMsg, time.Second) } return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) } @@ -496,9 +448,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { return atomic.LoadInt32(&p.stateIdle) == 0 } throughput := func(p *peerConnection) float64 { - p.lock.RLock() - defer p.lock.RUnlock() - return p.stateThroughput + return p.rates.Capacity(eth.NodeDataMsg, time.Second) } return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) } @@ -527,37 +477,6 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peer return sortPeers.p, total } -// medianRTT returns the median RTT of the peerset, considering only the 
tuning -// peers if there are more peers available. -func (ps *peerSet) medianRTT() time.Duration { - // Gather all the currently measured round trip times - ps.lock.RLock() - defer ps.lock.RUnlock() - - rtts := make([]float64, 0, len(ps.peers)) - for _, p := range ps.peers { - p.lock.RLock() - rtts = append(rtts, float64(p.rtt)) - p.lock.RUnlock() - } - sort.Float64s(rtts) - - median := rttMaxEstimate - if qosTuningPeers <= len(rtts) { - median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers - } else if len(rtts) > 0 { - median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos) - } - // Restrict the RTT into some QoS defaults, irrelevant of true RTT - if median < rttMinEstimate { - median = rttMinEstimate - } - if median > rttMaxEstimate { - median = rttMaxEstimate - } - return median -} - // peerThroughputSort implements the Sort interface, and allows for // sorting a set of peers by their throughput // The sorted data is with the _highest_ throughput first diff --git a/eth/downloader/peer_test.go b/eth/downloader/peer_test.go deleted file mode 100644 index 4bf0e200b..000000000 --- a/eth/downloader/peer_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2020 The go-ethereum Authors -// This file is part of go-ethereum. -// -// go-ethereum is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// go-ethereum is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with go-ethereum. If not, see . 
- -package downloader - -import ( - "sort" - "testing" -) - -func TestPeerThroughputSorting(t *testing.T) { - a := &peerConnection{ - id: "a", - headerThroughput: 1.25, - } - b := &peerConnection{ - id: "b", - headerThroughput: 1.21, - } - c := &peerConnection{ - id: "c", - headerThroughput: 1.23, - } - - peers := []*peerConnection{a, b, c} - tps := []float64{a.headerThroughput, - b.headerThroughput, c.headerThroughput} - sortPeers := &peerThroughputSort{peers, tps} - sort.Sort(sortPeers) - if got, exp := sortPeers.p[0].id, "a"; got != exp { - t.Errorf("sort fail, got %v exp %v", got, exp) - } - if got, exp := sortPeers.p[1].id, "c"; got != exp { - t.Errorf("sort fail, got %v exp %v", got, exp) - } - if got, exp := sortPeers.p[2].id, "b"; got != exp { - t.Errorf("sort fail, got %v exp %v", got, exp) - } - -} diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index ff84a3a8f..6c53e5577 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -433,8 +433,8 @@ func (s *stateSync) assignTasks() { peers, _ := s.d.peers.NodeDataIdlePeers() for _, p := range peers { // Assign a batch of fetches proportional to the estimated latency/bandwidth - cap := p.NodeDataCapacity(s.d.requestRTT()) - req := &stateReq{peer: p, timeout: s.d.requestTTL()} + cap := p.NodeDataCapacity(s.d.peers.rates.TargetRoundTrip()) + req := &stateReq{peer: p, timeout: s.d.peers.rates.TargetTimeout()} nodes, _, codes := s.fillTasks(cap, req) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index e28347320..c57fcd71f 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -37,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/msgrate" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" "golang.org/x/crypto/sha3" @@ -51,14 +52,15 @@ var ( ) const ( - // maxRequestSize is the maximum number of bytes to request from a remote peer. - maxRequestSize = 128 * 1024 + // minRequestSize is the minimum number of bytes to request from a remote peer. + // This number is used as the low cap for account and storage range requests. + // Bytecode and trienode are limited inherently by item count (1). + minRequestSize = 64 * 1024 - // maxStorageSetRequestCount is the maximum number of contracts to request the - // storage of in a single query. If this number is too low, we're not filling - // responses fully and waste round trip times. If it's too high, we're capping - // responses and waste bandwidth. - maxStorageSetRequestCount = maxRequestSize / 1024 + // maxRequestSize is the maximum number of bytes to request from a remote peer. + // This number is used as the high cap for account and storage range requests. + // Bytecode and trienode are limited more explicitly by the caps below. + maxRequestSize = 512 * 1024 // maxCodeRequestCount is the maximum number of bytecode blobs to request in a // single query. If this number is too low, we're not filling responses fully @@ -74,7 +76,7 @@ const ( // a single query. If this number is too low, we're not filling responses fully // and waste round trip times. If it's too high, we're capping responses and // waste bandwidth. - maxTrieRequestCount = 256 + maxTrieRequestCount = maxRequestSize / 512 ) var ( @@ -85,10 +87,6 @@ var ( // storageConcurrency is the number of chunks to split the a large contract // storage trie into to allow concurrent retrievals. 
storageConcurrency = 16 - - // requestTimeout is the maximum time a peer is allowed to spend on serving - // a single network request. - requestTimeout = 15 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync? ) // ErrCancelled is returned from snap syncing if the operation was prematurely @@ -105,8 +103,9 @@ var ErrCancelled = errors.New("sync cancelled") // is only included to allow the runloop to match a response to the task being // synced without having yet another set of maps. type accountRequest struct { - peer string // Peer to which this request is assigned - id uint64 // Request ID of this request + peer string // Peer to which this request is assigned + id uint64 // Request ID of this request + time time.Time // Timestamp when the request was sent deliver chan *accountResponse // Channel to deliver successful response on revert chan *accountRequest // Channel to deliver request failure on @@ -142,8 +141,9 @@ type accountResponse struct { // is only included to allow the runloop to match a response to the task being // synced without having yet another set of maps. type bytecodeRequest struct { - peer string // Peer to which this request is assigned - id uint64 // Request ID of this request + peer string // Peer to which this request is assigned + id uint64 // Request ID of this request + time time.Time // Timestamp when the request was sent deliver chan *bytecodeResponse // Channel to deliver successful response on revert chan *bytecodeRequest // Channel to deliver request failure on @@ -173,8 +173,9 @@ type bytecodeResponse struct { // is only included to allow the runloop to match a response to the task being // synced without having yet another set of maps. type storageRequest struct { - peer string // Peer to which this request is assigned - id uint64 // Request ID of this request + peer string // Peer to which this request is assigned + id uint64 // Request ID of this request + time time.Time // Timestamp when the request was sent deliver chan *storageResponse // Channel to deliver successful response on revert chan *storageRequest // Channel to deliver request failure on @@ -218,8 +219,9 @@ type storageResponse struct { // is only included to allow the runloop to match a response to the task being // synced without having yet another set of maps. type trienodeHealRequest struct { - peer string // Peer to which this request is assigned - id uint64 // Request ID of this request + peer string // Peer to which this request is assigned + id uint64 // Request ID of this request + time time.Time // Timestamp when the request was sent deliver chan *trienodeHealResponse // Channel to deliver successful response on revert chan *trienodeHealRequest // Channel to deliver request failure on @@ -252,8 +254,9 @@ type trienodeHealResponse struct { // is only included to allow the runloop to match a response to the task being // synced without having yet another set of maps. 
type bytecodeHealRequest struct { - peer string // Peer to which this request is assigned - id uint64 // Request ID of this request + peer string // Peer to which this request is assigned + id uint64 // Request ID of this request + time time.Time // Timestamp when the request was sent deliver chan *bytecodeHealResponse // Channel to deliver successful response on revert chan *bytecodeHealRequest // Channel to deliver request failure on @@ -396,6 +399,7 @@ type Syncer struct { peers map[string]SyncPeer // Currently active peers to download from peerJoin *event.Feed // Event feed to react to peers joining peerDrop *event.Feed // Event feed to react to peers dropping + rates *msgrate.Trackers // Message throughput rates for peers // Request tracking during syncing phase statelessPeers map[string]struct{} // Peers that failed to deliver state data @@ -452,6 +456,7 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer { peers: make(map[string]SyncPeer), peerJoin: new(event.Feed), peerDrop: new(event.Feed), + rates: msgrate.NewTrackers(log.New("proto", "snap")), update: make(chan struct{}, 1), accountIdlers: make(map[string]struct{}), @@ -484,6 +489,7 @@ func (s *Syncer) Register(peer SyncPeer) error { return errors.New("already registered") } s.peers[id] = peer + s.rates.Track(id, msgrate.NewTracker(s.rates.MeanCapacities(), s.rates.MedianRoundTrip())) // Mark the peer as idle, even if no sync is running s.accountIdlers[id] = struct{}{} @@ -509,6 +515,7 @@ func (s *Syncer) Unregister(id string) error { return errors.New("not registered") } delete(s.peers, id) + s.rates.Untrack(id) // Remove status markers, even if no sync is running delete(s.statelessPeers, id) @@ -851,10 +858,24 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac s.lock.Lock() defer s.lock.Unlock() - // If there are no idle peers, short circuit assignment - if len(s.accountIdlers) == 0 { + // Sort the peers by download capacity to use faster ones if many available + idlers := &capacitySort{ + ids: make([]string, 0, len(s.accountIdlers)), + caps: make([]float64, 0, len(s.accountIdlers)), + } + targetTTL := s.rates.TargetTimeout() + for id := range s.accountIdlers { + if _, ok := s.statelessPeers[id]; ok { + continue + } + idlers.ids = append(idlers.ids, id) + idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL)) + } + if len(idlers.ids) == 0 { return } + sort.Sort(sort.Reverse(idlers)) + // Iterate over all the tasks and try to find a pending one for _, task := range s.tasks { // Skip any tasks already filling @@ -864,20 +885,15 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac // Task pending retrieval, try to find an idle peer. If no such peer // exists, we probably assigned tasks for all (or they are stateless). // Abort the entire assignment mechanism. 
- var idle string - for id := range s.accountIdlers { - // If the peer rejected a query in this sync cycle, don't bother asking - // again for anything, it's either out of sync or already pruned - if _, ok := s.statelessPeers[id]; ok { - continue - } - idle = id - break - } - if idle == "" { + if len(idlers.ids) == 0 { return } - peer := s.peers[idle] + var ( + idle = idlers.ids[0] + peer = s.peers[idle] + cap = idlers.caps[0] + ) + idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:] // Matched a pending task to an idle peer, allocate a unique request id var reqid uint64 @@ -895,6 +911,7 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac req := &accountRequest{ peer: idle, id: reqid, + time: time.Now(), deliver: success, revert: fail, cancel: cancel, @@ -903,8 +920,9 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac limit: task.Last, task: task, } - req.timeout = time.AfterFunc(requestTimeout, func() { + req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() { peer.Log().Debug("Account range request timed out", "reqid", reqid) + s.rates.Update(idle, AccountRangeMsg, 0, 0) s.scheduleRevertAccountRequest(req) }) s.accountReqs[reqid] = req @@ -915,7 +933,13 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac defer s.pend.Done() // Attempt to send the remote request and revert if it fails - if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, maxRequestSize); err != nil { + if cap > maxRequestSize { + cap = maxRequestSize + } + if cap < minRequestSize { // Don't bother with peers below a bare minimum performance + cap = minRequestSize + } + if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, uint64(cap)); err != nil { peer.Log().Debug("Failed to request account range", "err", err) s.scheduleRevertAccountRequest(req) } @@ -931,10 +955,24 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan * s.lock.Lock() defer s.lock.Unlock() - // If there are no idle peers, short circuit assignment - if len(s.bytecodeIdlers) == 0 { + // Sort the peers by download capacity to use faster ones if many available + idlers := &capacitySort{ + ids: make([]string, 0, len(s.bytecodeIdlers)), + caps: make([]float64, 0, len(s.bytecodeIdlers)), + } + targetTTL := s.rates.TargetTimeout() + for id := range s.bytecodeIdlers { + if _, ok := s.statelessPeers[id]; ok { + continue + } + idlers.ids = append(idlers.ids, id) + idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL)) + } + if len(idlers.ids) == 0 { return } + sort.Sort(sort.Reverse(idlers)) + // Iterate over all the tasks and try to find a pending one for _, task := range s.tasks { // Skip any tasks not in the bytecode retrieval phase @@ -948,20 +986,15 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan * // Task pending retrieval, try to find an idle peer. If no such peer // exists, we probably assigned tasks for all (or they are stateless). // Abort the entire assignment mechanism. 
- var idle string - for id := range s.bytecodeIdlers { - // If the peer rejected a query in this sync cycle, don't bother asking - // again for anything, it's either out of sync or already pruned - if _, ok := s.statelessPeers[id]; ok { - continue - } - idle = id - break - } - if idle == "" { + if len(idlers.ids) == 0 { return } - peer := s.peers[idle] + var ( + idle = idlers.ids[0] + peer = s.peers[idle] + cap = idlers.caps[0] + ) + idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:] // Matched a pending task to an idle peer, allocate a unique request id var reqid uint64 @@ -976,17 +1009,21 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan * break } // Generate the network query and send it to the peer - hashes := make([]common.Hash, 0, maxCodeRequestCount) + if cap > maxCodeRequestCount { + cap = maxCodeRequestCount + } + hashes := make([]common.Hash, 0, int(cap)) for hash := range task.codeTasks { delete(task.codeTasks, hash) hashes = append(hashes, hash) - if len(hashes) >= maxCodeRequestCount { + if len(hashes) >= int(cap) { break } } req := &bytecodeRequest{ peer: idle, id: reqid, + time: time.Now(), deliver: success, revert: fail, cancel: cancel, @@ -994,8 +1031,9 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan * hashes: hashes, task: task, } - req.timeout = time.AfterFunc(requestTimeout, func() { + req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() { peer.Log().Debug("Bytecode request timed out", "reqid", reqid) + s.rates.Update(idle, ByteCodesMsg, 0, 0) s.scheduleRevertBytecodeRequest(req) }) s.bytecodeReqs[reqid] = req @@ -1020,10 +1058,24 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st s.lock.Lock() defer s.lock.Unlock() - // If there are no idle peers, short circuit assignment - if len(s.storageIdlers) == 0 { + // Sort the peers by download capacity to use faster ones if many available + idlers := &capacitySort{ + ids: make([]string, 0, len(s.storageIdlers)), + caps: make([]float64, 0, len(s.storageIdlers)), + } + targetTTL := s.rates.TargetTimeout() + for id := range s.storageIdlers { + if _, ok := s.statelessPeers[id]; ok { + continue + } + idlers.ids = append(idlers.ids, id) + idlers.caps = append(idlers.caps, s.rates.Capacity(id, StorageRangesMsg, targetTTL)) + } + if len(idlers.ids) == 0 { return } + sort.Sort(sort.Reverse(idlers)) + // Iterate over all the tasks and try to find a pending one for _, task := range s.tasks { // Skip any tasks not in the storage retrieval phase @@ -1037,20 +1089,15 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st // Task pending retrieval, try to find an idle peer. If no such peer // exists, we probably assigned tasks for all (or they are stateless). // Abort the entire assignment mechanism. 
- var idle string - for id := range s.storageIdlers { - // If the peer rejected a query in this sync cycle, don't bother asking - // again for anything, it's either out of sync or already pruned - if _, ok := s.statelessPeers[id]; ok { - continue - } - idle = id - break - } - if idle == "" { + if len(idlers.ids) == 0 { return } - peer := s.peers[idle] + var ( + idle = idlers.ids[0] + peer = s.peers[idle] + cap = idlers.caps[0] + ) + idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:] // Matched a pending task to an idle peer, allocate a unique request id var reqid uint64 @@ -1067,9 +1114,17 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st // Generate the network query and send it to the peer. If there are // large contract tasks pending, complete those before diving into // even more new contracts. + if cap > maxRequestSize { + cap = maxRequestSize + } + if cap < minRequestSize { // Don't bother with peers below a bare minimum performance + cap = minRequestSize + } + storageSets := int(cap / 1024) + var ( - accounts = make([]common.Hash, 0, maxStorageSetRequestCount) - roots = make([]common.Hash, 0, maxStorageSetRequestCount) + accounts = make([]common.Hash, 0, storageSets) + roots = make([]common.Hash, 0, storageSets) subtask *storageTask ) for account, subtasks := range task.SubTasks { @@ -1096,7 +1151,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st accounts = append(accounts, acccount) roots = append(roots, root) - if len(accounts) >= maxStorageSetRequestCount { + if len(accounts) >= storageSets { break } } @@ -1109,6 +1164,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st req := &storageRequest{ peer: idle, id: reqid, + time: time.Now(), deliver: success, revert: fail, cancel: cancel, @@ -1122,8 +1178,9 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st req.origin = subtask.Next req.limit = subtask.Last } - req.timeout = time.AfterFunc(requestTimeout, func() { + req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() { peer.Log().Debug("Storage request timed out", "reqid", reqid) + s.rates.Update(idle, StorageRangesMsg, 0, 0) s.scheduleRevertStorageRequest(req) }) s.storageReqs[reqid] = req @@ -1138,7 +1195,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st if subtask != nil { origin, limit = req.origin[:], req.limit[:] } - if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, maxRequestSize); err != nil { + if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, uint64(cap)); err != nil { log.Debug("Failed to request storage", "err", err) s.scheduleRevertStorageRequest(req) } @@ -1157,10 +1214,24 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai s.lock.Lock() defer s.lock.Unlock() - // If there are no idle peers, short circuit assignment - if len(s.trienodeHealIdlers) == 0 { + // Sort the peers by download capacity to use faster ones if many available + idlers := &capacitySort{ + ids: make([]string, 0, len(s.trienodeHealIdlers)), + caps: make([]float64, 0, len(s.trienodeHealIdlers)), + } + targetTTL := s.rates.TargetTimeout() + for id := range s.trienodeHealIdlers { + if _, ok := s.statelessPeers[id]; ok { + continue + } + idlers.ids = append(idlers.ids, id) + idlers.caps = append(idlers.caps, s.rates.Capacity(id, TrieNodesMsg, targetTTL)) + } + if len(idlers.ids) == 0 { return } + sort.Sort(sort.Reverse(idlers)) + // Iterate 
over pending tasks and try to find a peer to retrieve with for len(s.healer.trieTasks) > 0 || s.healer.scheduler.Pending() > 0 { // If there are not enough trie tasks queued to fully assign, fill the @@ -1186,20 +1257,15 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai // Task pending retrieval, try to find an idle peer. If no such peer // exists, we probably assigned tasks for all (or they are stateless). // Abort the entire assignment mechanism. - var idle string - for id := range s.trienodeHealIdlers { - // If the peer rejected a query in this sync cycle, don't bother asking - // again for anything, it's either out of sync or already pruned - if _, ok := s.statelessPeers[id]; ok { - continue - } - idle = id - break - } - if idle == "" { + if len(idlers.ids) == 0 { return } - peer := s.peers[idle] + var ( + idle = idlers.ids[0] + peer = s.peers[idle] + cap = idlers.caps[0] + ) + idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:] // Matched a pending task to an idle peer, allocate a unique request id var reqid uint64 @@ -1214,10 +1280,13 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai break } // Generate the network query and send it to the peer + if cap > maxTrieRequestCount { + cap = maxTrieRequestCount + } var ( - hashes = make([]common.Hash, 0, maxTrieRequestCount) - paths = make([]trie.SyncPath, 0, maxTrieRequestCount) - pathsets = make([]TrieNodePathSet, 0, maxTrieRequestCount) + hashes = make([]common.Hash, 0, int(cap)) + paths = make([]trie.SyncPath, 0, int(cap)) + pathsets = make([]TrieNodePathSet, 0, int(cap)) ) for hash, pathset := range s.healer.trieTasks { delete(s.healer.trieTasks, hash) @@ -1226,13 +1295,14 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai paths = append(paths, pathset) pathsets = append(pathsets, [][]byte(pathset)) // TODO(karalabe): group requests by account hash - if len(hashes) >= maxTrieRequestCount { + if len(hashes) >= int(cap) { break } } req := &trienodeHealRequest{ peer: idle, id: reqid, + time: time.Now(), deliver: success, revert: fail, cancel: cancel, @@ -1241,8 +1311,9 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai paths: paths, task: s.healer, } - req.timeout = time.AfterFunc(requestTimeout, func() { + req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() { peer.Log().Debug("Trienode heal request timed out", "reqid", reqid) + s.rates.Update(idle, TrieNodesMsg, 0, 0) s.scheduleRevertTrienodeHealRequest(req) }) s.trienodeHealReqs[reqid] = req @@ -1267,10 +1338,24 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai s.lock.Lock() defer s.lock.Unlock() - // If there are no idle peers, short circuit assignment - if len(s.bytecodeHealIdlers) == 0 { + // Sort the peers by download capacity to use faster ones if many available + idlers := &capacitySort{ + ids: make([]string, 0, len(s.bytecodeHealIdlers)), + caps: make([]float64, 0, len(s.bytecodeHealIdlers)), + } + targetTTL := s.rates.TargetTimeout() + for id := range s.bytecodeHealIdlers { + if _, ok := s.statelessPeers[id]; ok { + continue + } + idlers.ids = append(idlers.ids, id) + idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL)) + } + if len(idlers.ids) == 0 { return } + sort.Sort(sort.Reverse(idlers)) + // Iterate over pending tasks and try to find a peer to retrieve with for len(s.healer.codeTasks) > 0 || s.healer.scheduler.Pending() > 0 { // If there are not enough trie tasks 
queued to fully assign, fill the @@ -1296,20 +1381,15 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai // Task pending retrieval, try to find an idle peer. If no such peer // exists, we probably assigned tasks for all (or they are stateless). // Abort the entire assignment mechanism. - var idle string - for id := range s.bytecodeHealIdlers { - // If the peer rejected a query in this sync cycle, don't bother asking - // again for anything, it's either out of sync or already pruned - if _, ok := s.statelessPeers[id]; ok { - continue - } - idle = id - break - } - if idle == "" { + if len(idlers.ids) == 0 { return } - peer := s.peers[idle] + var ( + idle = idlers.ids[0] + peer = s.peers[idle] + cap = idlers.caps[0] + ) + idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:] // Matched a pending task to an idle peer, allocate a unique request id var reqid uint64 @@ -1324,18 +1404,22 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai break } // Generate the network query and send it to the peer - hashes := make([]common.Hash, 0, maxCodeRequestCount) + if cap > maxCodeRequestCount { + cap = maxCodeRequestCount + } + hashes := make([]common.Hash, 0, int(cap)) for hash := range s.healer.codeTasks { delete(s.healer.codeTasks, hash) hashes = append(hashes, hash) - if len(hashes) >= maxCodeRequestCount { + if len(hashes) >= int(cap) { break } } req := &bytecodeHealRequest{ peer: idle, id: reqid, + time: time.Now(), deliver: success, revert: fail, cancel: cancel, @@ -1343,8 +1427,9 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai hashes: hashes, task: s.healer, } - req.timeout = time.AfterFunc(requestTimeout, func() { + req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() { peer.Log().Debug("Bytecode heal request timed out", "reqid", reqid) + s.rates.Update(idle, ByteCodesMsg, 0, 0) s.scheduleRevertBytecodeHealRequest(req) }) s.bytecodeHealReqs[reqid] = req @@ -2142,6 +2227,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco return nil } delete(s.accountReqs, id) + s.rates.Update(peer.ID(), AccountRangeMsg, time.Since(req.time), int(size)) // Clean up the request timeout timer, we'll see how to proceed further based // on the actual delivered content @@ -2253,6 +2339,7 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error return nil } delete(s.bytecodeReqs, id) + s.rates.Update(peer.ID(), ByteCodesMsg, time.Since(req.time), len(bytecodes)) // Clean up the request timeout timer, we'll see how to proceed further based // on the actual delivered content @@ -2361,6 +2448,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo return nil } delete(s.storageReqs, id) + s.rates.Update(peer.ID(), StorageRangesMsg, time.Since(req.time), int(size)) // Clean up the request timeout timer, we'll see how to proceed further based // on the actual delivered content @@ -2487,6 +2575,7 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error return nil } delete(s.trienodeHealReqs, id) + s.rates.Update(peer.ID(), TrieNodesMsg, time.Since(req.time), len(trienodes)) // Clean up the request timeout timer, we'll see how to proceed further based // on the actual delivered content @@ -2581,6 +2670,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e return nil } delete(s.bytecodeHealReqs, id) + s.rates.Update(peer.ID(), ByteCodesMsg, time.Since(req.time), 
len(bytecodes)) // Clean up the request timeout timer, we'll see how to proceed further based // on the actual delivered content @@ -2756,3 +2846,24 @@ func estimateRemainingSlots(hashes int, last common.Hash) (uint64, error) { } return space.Uint64() - uint64(hashes), nil } + +// capacitySort implements the Sort interface, allowing sorting by peer message +// throughput. Note, callers should use sort.Reverse to get the desired effect +// of highest capacity being at the front. +type capacitySort struct { + ids []string + caps []float64 +} + +func (s *capacitySort) Len() int { + return len(s.ids) +} + +func (s *capacitySort) Less(i, j int) bool { + return s.caps[i] < s.caps[j] +} + +func (s *capacitySort) Swap(i, j int) { + s.ids[i], s.ids[j] = s.ids[j], s.ids[i] + s.caps[i], s.caps[j] = s.caps[j], s.caps[i] +} diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go index a1cc3581a..023fc8ee0 100644 --- a/eth/protocols/snap/sync_test.go +++ b/eth/protocols/snap/sync_test.go @@ -796,12 +796,6 @@ func TestMultiSyncManyUseless(t *testing.T) { // TestMultiSyncManyUseless contains one good peer, and many which doesn't return anything valuable at all func TestMultiSyncManyUselessWithLowTimeout(t *testing.T) { - // We're setting the timeout to very low, to increase the chance of the timeout - // being triggered. This was previously a cause of panic, when a response - // arrived simultaneously as a timeout was triggered. - defer func(old time.Duration) { requestTimeout = old }(requestTimeout) - requestTimeout = time.Millisecond - var ( once sync.Once cancel = make(chan struct{}) @@ -838,6 +832,11 @@ func TestMultiSyncManyUselessWithLowTimeout(t *testing.T) { mkSource("noStorage", true, false, true), mkSource("noTrie", true, true, false), ) + // We're setting the timeout to very low, to increase the chance of the timeout + // being triggered. This was previously a cause of panic, when a response + // arrived simultaneously as a timeout was triggered. + syncer.rates.OverrideTTLLimit = time.Millisecond + done := checkStall(t, term) if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil { t.Fatalf("sync failed: %v", err) @@ -848,10 +847,6 @@ func TestMultiSyncManyUselessWithLowTimeout(t *testing.T) { // TestMultiSyncManyUnresponsive contains one good peer, and many which doesn't respond at all func TestMultiSyncManyUnresponsive(t *testing.T) { - // We're setting the timeout to very low, to make the test run a bit faster - defer func(old time.Duration) { requestTimeout = old }(requestTimeout) - requestTimeout = time.Millisecond - var ( once sync.Once cancel = make(chan struct{}) @@ -888,6 +883,9 @@ func TestMultiSyncManyUnresponsive(t *testing.T) { mkSource("noStorage", true, false, true), mkSource("noTrie", true, true, false), ) + // We're setting the timeout to very low, to make the test run a bit faster + syncer.rates.OverrideTTLLimit = time.Millisecond + done := checkStall(t, term) if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil { t.Fatalf("sync failed: %v", err) diff --git a/p2p/msgrate/msgrate.go b/p2p/msgrate/msgrate.go new file mode 100644 index 000000000..7cd172c56 --- /dev/null +++ b/p2p/msgrate/msgrate.go @@ -0,0 +1,458 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package msgrate allows estimating the throughput of peers for more balanced syncs. +package msgrate + +import ( + "errors" + "sort" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +// measurementImpact is the impact a single measurement has on a peer's final +// capacity value. A value closer to 0 reacts slower to sudden network changes, +// but it is also more stable against temporary hiccups. 0.1 worked well for +// most of Ethereum's existence, so might as well go with it. +const measurementImpact = 0.1 + +// capacityOverestimation is the ratio of items to over-estimate when retrieving +// a peer's capacity to avoid locking into a lower value due to never attempting +// to fetch more than some local stable value. +const capacityOverestimation = 1.01 + +// qosTuningPeers is the number of best peers to tune round trip times based on. +// An Ethereum node doesn't need hundreds of connections to operate correctly, +// so instead of lowering our download speed to the median of potentially many +// bad nodes, we can target a smaller set of very good nodes. At worst this will +// result in fewer nodes to sync from, but that's still better than some hogging +// the pipeline. +const qosTuningPeers = 5 + +// rttMinEstimate is the minimal round trip time to target requests for. Since +// every request entails a 2 way latency + bandwidth + serving database lookups, +// it should be generous enough to permit meaningful work to be done on top of +// the transmission costs. +const rttMinEstimate = 2 * time.Second + +// rttMaxEstimate is the maximal round trip time to target requests for. Although +// the expectation is that a well connected node will never reach this, certain +// special connectivity ones might experience significant delays (e.g. satellite +// uplink with 3s RTT). This value should be low enough to forbid stalling the +// pipeline too long, but large enough to cover the worst of the worst links. +const rttMaxEstimate = 20 * time.Second + +// rttPushdownFactor is a multiplier to attempt forcing quicker requests than +// what the message rate tracker estimates. The reason is that message rate +// tracking adapts queries to the RTT, but multiple RTT values can be perfectly +// valid, they just result in higher packet sizes. Since smaller packets almost +// always result in stabler download streams, this factor hones in on the lowest +// RTT from all the functional ones. +const rttPushdownFactor = 0.9 + +// rttMinConfidence is the minimum value the roundtrip confidence factor may drop +// to. Since the target timeouts are based on how confident the tracker is in the +// true roundtrip, it's important to not allow too huge fluctuations. +const rttMinConfidence = 0.1 + +// ttlScaling is the multiplier that converts the estimated roundtrip time to a +// timeout cap for network requests.
The expectation is that peers' response time +// will fluctuate around the estimated roundtrip, but depending on their load at +// request time, it might be higher than anticipated. This scaling factor ensures +// that we allow remote connections some slack but at the same time do enforce a +// behavior similar to our median peers. +const ttlScaling = 3 + +// ttlLimit is the maximum timeout allowance to prevent reaching crazy numbers +// if some unforeseen network events happen. As much as we try to hone in on +// the most optimal values, it doesn't make any sense to go above a threshold, +// even if everything is slow and screwy. +const ttlLimit = time.Minute + +// tuningConfidenceCap is the number of active peers above which to stop detuning +// the confidence number. The idea here is that once we hone in on the capacity +// of a meaningful number of peers, adding one more should not have a significant +// impact on things, so just run with the originals. +const tuningConfidenceCap = 10 + +// tuningImpact is the influence that a new tuning target has on the previously +// cached value. This number is mostly just an out-of-the-blue heuristic that +// prevents the estimates from jumping around. There's no particular reason for +// the current value. +const tuningImpact = 0.25 + +// Tracker estimates the throughput capacity of a peer with regard to each data +// type it can deliver. The goal is to dynamically adjust request sizes to max +// out network throughput without overloading either the peer or the local node. +// +// By tracking in real time the latencies and bandwidths peers exhibit for each +// packet type, it's possible to prevent overloading by detecting a slowdown on +// one type when another type is pushed too hard. +// +// Similarly, real time measurements also help avoid overloading the local net +// connection if our peers would otherwise be capable of delivering more, but the +// local link is saturated. In that case, the live measurements will force us +// to reduce request sizes until the throughput gets stable. +// +// Lastly, message rate measurements allow us to detect if a peer is unusually +// slow compared to other peers, in which case we can decide to keep it around +// or free up the slot for someone closer. +// +// Since throughput tracking and estimation adapts dynamically to live network +// conditions, it's fine to have multiple trackers locally track the same peer +// in different subsystems. The throughput will simply be distributed across the +// two trackers if both are highly active. +type Tracker struct { + // capacity is the number of items retrievable per second of a given type. + // It is analogous to bandwidth, but we deliberately avoided using bytes + // as the unit, since serving nodes also spend a lot of time loading data + // from disk, which is linear in the number of items, but mostly constant + // in their sizes. + // + // Callers of course are free to use the item counter as a byte counter if + // or when their protocol of choice is capped by bytes instead of items + // (eg. eth.getHeaders vs snap.getAccountRange). + capacity map[uint64]float64 + + // roundtrip is the latency with which a peer in general responds to data requests. + // This number is not used inside the tracker, but is exposed to compare + // peers to each other and filter out slow ones. Note however, it only + // makes sense to compare RTTs if the caller caters request sizes for + // each peer to target the same RTT.
+	// roundtrip is the latency a peer in general responds to data requests.
+	// This number is not used inside the tracker, but is exposed to compare
+	// peers to each other and filter out slow ones. Note however, it only
+	// makes sense to compare RTTs if the caller caters request sizes for
+	// each peer to target the same RTT. There's no need to make this number
+	// the real networking RTT, we just need a number to compare peers with.
+	roundtrip time.Duration
+
+	lock sync.RWMutex
+}
+
+// NewTracker creates a new message rate tracker for a specific peer. An initial
+// RTT is needed to avoid a peer getting marked as an outlier compared to others
+// right after joining. It's suggested to use the median RTT across all peers to
+// initialize a new peer tracker.
+func NewTracker(caps map[uint64]float64, rtt time.Duration) *Tracker {
+	if caps == nil {
+		caps = make(map[uint64]float64)
+	}
+	return &Tracker{
+		capacity:  caps,
+		roundtrip: rtt,
+	}
+}
+
+// Capacity calculates the number of items the peer is estimated to be able to
+// retrieve within the allotted time slot. The method will round up any division
+// errors and will add an additional overestimation ratio on top. The reason for
+// overshooting the capacity is that certain message types might not increase
+// the load proportionally to the requested items, so fetching a bit more might
+// still take the same RTT. By forcefully overshooting by a small amount, we can
+// avoid locking into a lower-than-real capacity.
+func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) float64 {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	// Calculate the actual measured throughput
+	throughput := t.capacity[kind] * float64(targetRTT) / float64(time.Second)
+
+	// Return an overestimation to force the peer out of a stuck minimum, adding
+	// +1 in case the item count is too low for the overestimator to dent
+	return 1 + capacityOverestimation*throughput
+}
+
+// Update modifies the peer's capacity values for a specific data type with a new
+// measurement. If the delivery is zero, the peer is assumed to have either timed
+// out or to not have the requested data, resulting in a slash to 0 capacity. This
+// avoids assigning the peer retrievals that it won't be able to honour.
+func (t *Tracker) Update(kind uint64, elapsed time.Duration, items int) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	// If nothing was delivered (timeout / unavailable data), reduce throughput
+	// to minimum
+	if items == 0 {
+		t.capacity[kind] = 0
+		return
+	}
+	// Otherwise update the throughput with a new measurement
+	if elapsed <= 0 {
+		elapsed = 1 // +1 (ns) to ensure non-zero divisor
+	}
+	measured := float64(items) / (float64(elapsed) / float64(time.Second))
+
+	t.capacity[kind] = (1-measurementImpact)*(t.capacity[kind]) + measurementImpact*measured
+	t.roundtrip = time.Duration((1-measurementImpact)*float64(t.roundtrip) + measurementImpact*float64(elapsed))
+}
+
+// Trackers is a set of message rate trackers across a number of peers with the
+// goal of aggregating certain measurements across the entire set, both for
+// outlier filtering and for initializing newly joining peers.
+type Trackers struct {
+	trackers map[string]*Tracker
+
+	// roundtrip is the current best guess as to what is a stable round trip time
+	// across the entire collection of connected peers. This is derived from the
+	// various trackers added, but is used as a cache to avoid recomputing on each
+	// network request. The value is updated once every RTT to avoid fluctuations
+	// caused by hiccups or peer events.
+	roundtrip time.Duration
+
+	// confidence represents the probability that the estimated roundtrip value
+	// is the real one across all our peers. The confidence value is used as an
+	// impact factor of new measurements on old estimates. As our connectivity
+	// stabilizes, this value gravitates towards 1, new measurements having
+	// almost no impact. If there's a large peer churn and few peers, then new
+	// measurements will impact it more. The confidence is increased with every
+	// packet and dropped with every new connection.
+	confidence float64
+
+	// tuned is the time instant the tracker last recalculated its cached
+	// roundtrip and confidence values. A cleaner way would be to have a heartbeat
+	// goroutine do it regularly, but that requires a lot of maintenance to just
+	// run every now and again.
+	tuned time.Time
+
+	// The fields below can be used to override certain default values. Their
+	// purpose is to allow quicker tests. Don't use them in production.
+	OverrideTTLLimit time.Duration
+
+	log  log.Logger
+	lock sync.RWMutex
+}
+
+// NewTrackers creates an empty set of trackers to be filled with peers.
+func NewTrackers(log log.Logger) *Trackers {
+	return &Trackers{
+		trackers:         make(map[string]*Tracker),
+		roundtrip:        rttMaxEstimate,
+		confidence:       1,
+		tuned:            time.Now(),
+		OverrideTTLLimit: ttlLimit,
+		log:              log,
+	}
+}
+
+// Track inserts a new tracker into the set.
+func (t *Trackers) Track(id string, tracker *Tracker) error {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	if _, ok := t.trackers[id]; ok {
+		return errors.New("already tracking")
+	}
+	t.trackers[id] = tracker
+	t.detune()
+
+	return nil
+}
+
+// Untrack stops tracking a previously added peer.
+func (t *Trackers) Untrack(id string) error {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	if _, ok := t.trackers[id]; !ok {
+		return errors.New("not tracking")
+	}
+	delete(t.trackers, id)
+	return nil
+}
+
+// MedianRoundTrip returns the median RTT across all known trackers. The purpose
+// of the median RTT is to initialize a new peer with sane statistics that it will
+// hopefully outperform. If it seriously underperforms, there's a risk of dropping
+// the peer, but that is ok as we're aiming for a strong median.
+func (t *Trackers) MedianRoundTrip() time.Duration {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	return t.medianRoundTrip()
+}
+
+// medianRoundTrip is the internal lockless version of MedianRoundTrip to be used
+// by the QoS tuner.
+func (t *Trackers) medianRoundTrip() time.Duration {
+	// Gather all the currently measured round trip times
+	rtts := make([]float64, 0, len(t.trackers))
+	for _, tt := range t.trackers {
+		tt.lock.RLock()
+		rtts = append(rtts, float64(tt.roundtrip))
+		tt.lock.RUnlock()
+	}
+	sort.Float64s(rtts)
+
+	median := rttMaxEstimate
+	if qosTuningPeers <= len(rtts) {
+		median = time.Duration(rtts[qosTuningPeers/2]) // Median of our best few peers
+	} else if len(rtts) > 0 {
+		median = time.Duration(rtts[len(rtts)/2]) // Median of all our connected peers
+	}
+	// Clamp the RTT into some QoS defaults, irrespective of the true RTT
+	if median < rttMinEstimate {
+		median = rttMinEstimate
+	}
+	if median > rttMaxEstimate {
+		median = rttMaxEstimate
+	}
+	return median
+}
+
+// MeanCapacities returns the capacities averaged across all the added trackers.
+// The purpose of the mean capacities is to initialize a new peer with some sane
+// starting values that it will hopefully outperform. If the mean overshoots, the
+// peer will be cut back to minimal capacity and given another chance.
+func (t *Trackers) MeanCapacities() map[uint64]float64 {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	return t.meanCapacities()
+}
+
+// meanCapacities is the internal lockless version of MeanCapacities used for
+// debug logging.
+func (t *Trackers) meanCapacities() map[uint64]float64 {
+	capacities := make(map[uint64]float64)
+	for _, tt := range t.trackers {
+		tt.lock.RLock()
+		for key, val := range tt.capacity {
+			capacities[key] += val
+		}
+		tt.lock.RUnlock()
+	}
+	for key, val := range capacities {
+		capacities[key] = val / float64(len(t.trackers))
+	}
+	return capacities
+}
+
+// TargetRoundTrip returns the current target round trip time for a request to
+// complete in. The returned RTT is slightly under the estimated RTT. The reason
+// is that message rate estimation is a 2-dimensional problem which is solvable
+// for any RTT. The goal is to gravitate towards smaller RTTs instead of large
+// messages, to result in a stabler download stream.
+func (t *Trackers) TargetRoundTrip() time.Duration {
+	// Recalculate the internal caches if it's been a while
+	t.tune()
+
+	// Caches surely recent, return target roundtrip
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	return time.Duration(float64(t.roundtrip) * rttPushdownFactor)
+}
+
+// TargetTimeout returns the timeout allowance for a single request to finish
+// under. The timeout is proportional to the roundtrip, but also takes into
+// consideration the tracker's confidence in said roundtrip and scales it
+// accordingly. The final value is capped to avoid runaway requests.
+func (t *Trackers) TargetTimeout() time.Duration {
+	// Recalculate the internal caches if it's been a while
+	t.tune()
+
+	// Caches surely recent, return target timeout
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	return t.targetTimeout()
+}
+
+// targetTimeout is the internal lockless version of TargetTimeout to be used
+// during QoS tuning.
+func (t *Trackers) targetTimeout() time.Duration {
+	timeout := time.Duration(ttlScaling * float64(t.roundtrip) / t.confidence)
+	if timeout > t.OverrideTTLLimit {
+		timeout = t.OverrideTTLLimit
+	}
+	return timeout
+}
+
+// tune gathers the individual tracker statistics and updates the estimated
+// request round trip time.
+func (t *Trackers) tune() {
+	// Tune may be called concurrently all over the place, but we only want to
+	// update periodically and even then only once. First check if it was updated
+	// recently and abort if so.
+	t.lock.RLock()
+	dirty := time.Since(t.tuned) > t.roundtrip
+	t.lock.RUnlock()
+	if !dirty {
+		return
+	}
+	// If an update is needed, obtain a write lock but make sure we don't update
+	// it on all concurrent threads one by one.
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	if dirty := time.Since(t.tuned) > t.roundtrip; !dirty {
+		return // A concurrent request beat us to the tuning
+	}
+	// First thread reaching the tuning point, update the estimates and return
+	t.roundtrip = time.Duration((1-tuningImpact)*float64(t.roundtrip) + tuningImpact*float64(t.medianRoundTrip()))
+	t.confidence = t.confidence + (1-t.confidence)/2
+
+	t.tuned = time.Now()
+	t.log.Debug("Recalculated msgrate QoS values", "rtt", t.roundtrip, "confidence", t.confidence, "ttl", t.targetTimeout(), "next", t.tuned.Add(t.roundtrip))
+	t.log.Trace("Debug dump of mean capacities", "caps", log.Lazy{Fn: t.meanCapacities})
+}
+
+// detune reduces the tracker's confidence in order to make fresh measurements
+// have a larger impact on the estimates. It is meant to be used during new peer
+// connections so the new peers can have a proper impact on the estimates.
+func (t *Trackers) detune() { + // If we have a single peer, confidence is always 1 + if len(t.trackers) == 1 { + t.confidence = 1 + return + } + // If we have a ton of peers, don't drop the confidence since there's enough + // remaining to retain the same throughput + if len(t.trackers) >= tuningConfidenceCap { + return + } + // Otherwise drop the confidence factor + peers := float64(len(t.trackers)) + + t.confidence = t.confidence * (peers - 1) / peers + if t.confidence < rttMinConfidence { + t.confidence = rttMinConfidence + } + t.log.Debug("Relaxed msgrate QoS values", "rtt", t.roundtrip, "confidence", t.confidence, "ttl", t.targetTimeout()) +} + +// Capacity is a helper function to access a specific tracker without having to +// track it explicitly outside. +func (t *Trackers) Capacity(id string, kind uint64, targetRTT time.Duration) float64 { + t.lock.RLock() + defer t.lock.RUnlock() + + tracker := t.trackers[id] + if tracker == nil { + return 1 // Unregister race, don't return 0, it's a dangerous number + } + return tracker.Capacity(kind, targetRTT) +} + +// Update is a helper function to access a specific tracker without having to +// track it explicitly outside. +func (t *Trackers) Update(id string, kind uint64, elapsed time.Duration, items int) { + t.lock.RLock() + defer t.lock.RUnlock() + + if tracker := t.trackers[id]; tracker != nil { + tracker.Update(kind, elapsed, items) + } +}
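
For intuition on how the confidence factor shapes the request sizing and timeouts above, here is a small worked example with illustrative numbers (only the formulas come from the code): with a cached roundtrip of 2s and full confidence (1.0), targetTimeout yields ttlScaling * 2s / 1.0 = 6s; if a burst of new connections detunes the confidence to 0.5, the same roundtrip yields 3 * 2s / 0.5 = 12s, and whatever comes out is always capped at ttlLimit (one minute). Each subsequent tune then walks the confidence back towards 1 via confidence += (1 - confidence) / 2, so the allowance tightens again as the estimates stabilize. Similarly for Tracker.Capacity: a tracked capacity of 200 items/s and a target RTT of 500ms gives a throughput of 100 items, returned as 1 + 1.01*100 = 102 items, the slight overshoot being what lets a peer climb out of a previously low estimate.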
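
For reviewers unfamiliar with the new API, below is a minimal usage sketch (not part of the patch) of how a sync subsystem could wire the trackers together. The peer ID, the kindHeaders constant and the item counts are made up for illustration; only the msgrate calls themselves come from this change.

package main

import (
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p/msgrate"
)

// kindHeaders is a hypothetical message kind identifier; real callers would
// use their protocol's own request codes here.
const kindHeaders = uint64(0)

func main() {
	rates := msgrate.NewTrackers(log.Root())

	// On peer connection: seed the new peer with the median RTT and the mean
	// capacities of the already connected peers so it starts from sane values.
	rates.Track("peer-1", msgrate.NewTracker(rates.MeanCapacities(), rates.MedianRoundTrip()))

	// On request scheduling: size the request to the peer's estimated capacity
	// for the target round trip time, and arm a timer with the target timeout.
	items := int(rates.Capacity("peer-1", kindHeaders, rates.TargetRoundTrip()))
	timeout := time.NewTimer(rates.TargetTimeout())
	defer timeout.Stop()

	// On delivery (or on timeout, with zero items): feed the measurement back
	// so the capacity and RTT estimates adapt.
	start := time.Now()
	// ... perform the actual network request for `items` items here ...
	rates.Update("peer-1", kindHeaders, time.Since(start), items)

	// On peer disconnection: drop the tracker.
	rates.Untrack("peer-1")
}

The int conversion above is only for the sketch; Capacity intentionally returns a float so callers can decide how to round for their own request encoding.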