* Fix some typos
* Fix some mistakes
* Revert 4byte.json
* Fix an incorrect fix
* Change files to fails

382 lines · 14 KiB · Go
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package downloader

import (
	"errors"
	"sort"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/eth/protocols/eth"
	"github.com/ethereum/go-ethereum/log"
)

// timeoutGracePeriod is the amount of time to allow for a peer to deliver a
// response to a locally already timed out request. Timeouts are not penalized,
// as a peer might be temporarily overloaded; however, they still must reply
// to each request. Failing to do so is considered a protocol violation.
var timeoutGracePeriod = 2 * time.Minute

// typedQueue is an interface defining the adaptor needed to translate the type
// specific downloader/queue schedulers into the type-agnostic general concurrent
// fetcher algorithm calls.
type typedQueue interface {
	// waker returns a notification channel that gets pinged in case more fetches
	// have been queued up, so the fetcher might assign it to idle peers.
	waker() chan bool

	// pending returns the number of wrapped items that are currently queued for
	// fetching by the concurrent downloader.
	pending() int

	// capacity is responsible for calculating how many items of the abstracted
	// type a particular peer is estimated to be able to retrieve within the
	// allotted round trip time.
	capacity(peer *peerConnection, rtt time.Duration) int

	// updateCapacity is responsible for updating how many items of the abstracted
	// type a particular peer is estimated to be able to retrieve in a unit time.
	updateCapacity(peer *peerConnection, items int, elapsed time.Duration)

	// reserve is responsible for allocating a requested number of pending items
	// from the download queue to the specified peer.
	reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool)

	// unreserve is responsible for removing the current retrieval allocation
	// assigned to a specific peer and placing it back into the pool to allow
	// reassigning to some other peer.
	unreserve(peer string) int

	// request is responsible for converting a generic fetch request into a typed
	// one and sending it to the remote peer for fulfillment.
	request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error)

	// deliver is responsible for taking a generic response packet from the
	// concurrent fetcher, unpacking the type specific data and delivering
	// it to the downloader's queue.
	deliver(peer *peerConnection, packet *eth.Response) (int, error)
}

// concurrentFetch iteratively downloads scheduled block parts, taking available
// peers, reserving a chunk of fetch requests for each and waiting for delivery
// or timeouts.
func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
	// Create a delivery channel to accept responses from all peers
	responses := make(chan *eth.Response)

	// Track the currently active requests and their timeout order
	pending := make(map[string]*eth.Request)
	defer func() {
		// Abort all requests on sync cycle cancellation. The requests may still
		// be fulfilled by the remote side, but the dispatcher will not wait to
		// deliver them since nobody's going to be listening.
		for _, req := range pending {
			req.Close()
		}
	}()
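	// The timeouts queue orders in-flight requests by expiry; the index callback
	// mirrors each request's heap position into the ordering map so a request
	// can later be removed from the queue by index.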
	ordering := make(map[*eth.Request]int)
	timeouts := prque.New(func(data interface{}, index int) {
		ordering[data.(*eth.Request)] = index
	})

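	// Create the shared timeout timer in a stopped, drained state; it gets
	// (re)armed whenever the earliest pending deadline changes.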
	timeout := time.NewTimer(0)
	if !timeout.Stop() {
		<-timeout.C
	}
	defer timeout.Stop()

	// Track the timed-out but not-yet-answered requests separately. We want to
	// keep tracking which peers are busy (potentially overloaded), so removing
	// all trace of a timed out request is not good. We also can't just cancel
	// the pending request altogether as that would prevent a late response from
	// being delivered, thus never unblocking the peer.
	stales := make(map[string]*eth.Request)
	defer func() {
		// Abort all requests on sync cycle cancellation. The requests may still
		// be fulfilled by the remote side, but the dispatcher will not wait to
		// deliver them since nobody's going to be listening.
		for _, req := range stales {
			req.Close()
		}
	}()
	// Subscribe to peer lifecycle events to schedule tasks to new joiners and
	// reschedule tasks upon disconnections. We don't care which event happened
	// for simplicity, so just use a single channel.
	peering := make(chan *peeringEvent, 64) // arbitrary buffer, just some burst protection

	peeringSub := d.peers.SubscribeEvents(peering)
	defer peeringSub.Unsubscribe()

	// Prepare the queue and fetch block parts until the block header fetcher's done
	finished := false
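	// Main scheduling loop: assign queued tasks to idle peers, then block until
	// a response, a timeout, a peer event or a cancellation arrives.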
	for {
		// Short circuit if we lost all our peers
		if d.peers.Len() == 0 && !beaconMode {
			return errNoPeers
		}
		// If there's nothing more to fetch, wait or terminate
		if queue.pending() == 0 {
			if len(pending) == 0 && finished {
				return nil
			}
		} else {
			// Send a download request to all idle peers, until throttled
			var (
				idles []*peerConnection
				caps  []int
			)
			for _, peer := range d.peers.AllPeers() {
				pending, stale := pending[peer.id], stales[peer.id]
				if pending == nil && stale == nil {
					idles = append(idles, peer)
					caps = append(caps, queue.capacity(peer, time.Second))
				} else if stale != nil {
					if waited := time.Since(stale.Sent); waited > timeoutGracePeriod {
						// Request has been in flight longer than the grace period
						// permitted it; consider the peer malicious, attempting to
						// stall the sync.
						peer.log.Warn("Peer stalling, dropping", "waited", common.PrettyDuration(waited))
						d.dropPeer(peer.id)
					}
				}
			}
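			// Order the idle peers by their estimated retrieval capacity before
			// handing out new tasks.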
			sort.Sort(&peerCapacitySort{idles, caps})

			var (
				progressed bool
				throttled  bool
				queued     = queue.pending()
			)
			for _, peer := range idles {
				// Short circuit if throttling activated or there are no more
				// queued tasks to be retrieved
				if throttled {
					break
				}
				if queued = queue.pending(); queued == 0 {
					break
				}
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
				// have them.
				request, progress, throttle := queue.reserve(peer, queue.capacity(peer, d.peers.rates.TargetRoundTrip()))
				if progress {
					progressed = true
				}
				if throttle {
					throttled = true
					throttleCounter.Inc(1)
				}
				if request == nil {
					continue
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				req, err := queue.request(peer, request, responses)
				if err != nil {
					// Sending the request failed, which generally means the peer
					// was disconnected in between assignment and network send.
					// Although all peer removal operations return allocated tasks
					// to the queue, that is async, and we can do better here by
					// immediately pushing the unfulfilled requests.
					queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method
					continue
				}
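				// Request dispatched successfully, track it as in-flight for this
				// peer until it is answered or times out.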
				pending[peer.id] = req

				ttl := d.peers.rates.TargetTimeout()
				ordering[req] = timeouts.Size()

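				// Deadlines are pushed negated since the priority queue pops the
				// highest priority first; negating makes the earliest expiry
				// surface first.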
				timeouts.Push(req, -time.Now().Add(ttl).UnixNano())
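				// If this is the only request in flight, it is also the earliest
				// to expire, so (re)arm the shared timeout timer for it.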
				if timeouts.Size() == 1 {
					timeout.Reset(ttl)
				}
			}
			// Make sure that we have peers available for fetching. If all peers
			// have been tried and all failed, throw an error
			if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode {
				return errPeersUnavailable
			}
		}
		// Wait for something to happen
		select {
		case <-d.cancelCh:
			// If sync was cancelled, tear down the parallel retriever. Pending
			// requests will be cancelled locally, and the remote responses will
			// be dropped when they arrive
			return errCanceled

		case event := <-peering:
			// A peer joined or left, the task queue and allocations need to be
			// checked for potential assignment or reassignment
			peerid := event.peer.id

			if event.join {
				// Sanity check the internal state; this can be dropped later
				if _, ok := pending[peerid]; ok {
					event.peer.log.Error("Pending request exists for joining peer")
				}
				if _, ok := stales[peerid]; ok {
					event.peer.log.Error("Stale request exists for joining peer")
				}
				// Loop back to the entry point for task assignment
				continue
			}
			// A peer left, any existing requests need to be untracked, pending
			// tasks returned and possible reassignment checked
			if req, ok := pending[peerid]; ok {
				queue.unreserve(peerid) // TODO(karalabe): This needs a non-expiration method
				delete(pending, peerid)
				req.Close()

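				// If the request was still being tracked for timeout, remove it
				// from the timeout queue and, if it was the next to expire,
				// re-arm the timer for the following deadline.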
				if index, live := ordering[req]; live {
					timeouts.Remove(index)
					if index == 0 {
						if !timeout.Stop() {
							<-timeout.C
						}
						if timeouts.Size() > 0 {
							_, exp := timeouts.Peek()
							timeout.Reset(time.Until(time.Unix(0, -exp)))
						}
					}
					delete(ordering, req)
				}
			}
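			// Also untrack any stale (timed out but still unanswered) request
			// held against the departed peer.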
			if req, ok := stales[peerid]; ok {
				delete(stales, peerid)
				req.Close()
			}

		case <-timeout.C:
			// Retrieve the next request which should have timed out. The check
			// below is purely to catch programming errors: given correct code,
			// there's no possible order of events that should result in a
			// timeout firing for a non-existent event.
			item, exp := timeouts.Peek()
			if now, at := time.Now(), time.Unix(0, -exp); now.Before(at) {
				log.Error("Timeout triggered but not reached", "left", at.Sub(now))
				timeout.Reset(at.Sub(now))
				continue
			}
			req := item.(*eth.Request)

			// Stop tracking the timed out request from a timing perspective,
			// cancel it, so it's not considered in-flight anymore, but keep
			// the peer marked busy to prevent assigning a second request and
			// overloading it further.
			delete(pending, req.Peer)
			stales[req.Peer] = req
			delete(ordering, req)

			timeouts.Pop()
			if timeouts.Size() > 0 {
				_, exp := timeouts.Peek()
				timeout.Reset(time.Until(time.Unix(0, -exp)))
			}
			// New timeout potentially set if there are more requests pending,
			// reschedule the failed one to a free peer
			fails := queue.unreserve(req.Peer)

			// Finally, update the peer's retrieval capacity, or if it's already
			// below the minimum allowance, drop the peer. If a lot of retrieval
			// elements expired, we might have overestimated the remote peer or
			// perhaps ourselves. Only reset to minimal throughput but don't drop
			// just yet.
			//
			// The reason the minimum threshold is 2 is that the downloader tries
			// to estimate the bandwidth and latency of a peer separately, which
			// requires pushing the measured capacity a bit and seeing how the
			// response time reacts to it, so it always requests one more than the
			// minimum (i.e. min 2).
			peer := d.peers.Peer(req.Peer)
			if peer == nil {
				// If the peer got disconnected in between, we should really have
				// short-circuited it already. Just in case there's some strange
				// codepath, leave this check in so as not to crash.
				log.Error("Delivery timeout from unknown peer", "peer", req.Peer)
				continue
			}
			if fails > 2 {
				queue.updateCapacity(peer, 0, 0)
			} else {
				d.dropPeer(peer.id)

				// If this peer was the master peer, abort sync immediately
				d.cancelLock.RLock()
				master := peer.id == d.cancelPeer
				d.cancelLock.RUnlock()

				if master {
					d.cancel()
					return errTimeout
				}
			}

		case res := <-responses:
			// Response arrived, it may be for an existing or an already timed
			// out request. If the former, update the timeout heap and perhaps
			// reschedule the timeout timer.
			index, live := ordering[res.Req]
			if live {
				timeouts.Remove(index)
				if index == 0 {
					if !timeout.Stop() {
						<-timeout.C
					}
					if timeouts.Size() > 0 {
						_, exp := timeouts.Peek()
						timeout.Reset(time.Until(time.Unix(0, -exp)))
					}
				}
				delete(ordering, res.Req)
			}
			// Delete the pending request (if it still exists) and mark the peer idle
			delete(pending, res.Req.Peer)
			delete(stales, res.Req.Peer)

			// Signal the dispatcher that the round trip is done. We'll drop the
			// peer if the data turns out to be junk.
			res.Done <- nil
			res.Req.Close()

			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(res.Req.Peer); peer != nil {
				// Deliver the received chunk of data and check chain validity
				accepted, err := queue.deliver(peer, res)
				if errors.Is(err, errInvalidChain) {
					return err
				}
				// Unless the peer delivered something completely different from
				// what was requested (usually caused by a timed out request which
				// came through in the end), set it to idle. If the delivery's
				// stale, the peer should have already been idled.
				if !errors.Is(err, errStaleDelivery) {
					queue.updateCapacity(peer, accepted, res.Time)
				}
			}

		case cont := <-queue.waker():
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
		}
	}
}