diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index a9ec73e9f..9e7ea947f 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -328,7 +328,9 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 		return err
 	}
-	if errors.Is(err, errInvalidChain) {
+	if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
+		errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
+		errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
 		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
 		if d.dropPeer == nil {
 			// The dropPeer method is nil when `--copydb` is used for a local copy.
@@ -339,22 +341,7 @@
 		}
 		return err
 	}
-
-	switch err {
-	case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer,
-		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
-		errInvalidAncestor:
-		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
-		if d.dropPeer == nil {
-			// The dropPeer method is nil when `--copydb` is used for a local copy.
-			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
-			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
-		} else {
-			d.dropPeer(id)
-		}
-	default:
-		log.Warn("Synchronisation failed, retrying", "err", err)
-	}
+	log.Warn("Synchronisation failed, retrying", "err", err)
 	return err
 }
@@ -643,7 +630,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
 			headers := packet.(*headerPack).headers
 			if len(headers) != 1 {
 				p.log.Debug("Multiple headers for single request", "headers", len(headers))
-				return nil, errBadPeer
+				return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 			}
 			head := headers[0]
 			if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
@@ -876,7 +863,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 			headers := packer.(*headerPack).headers
 			if len(headers) != 1 {
 				p.log.Debug("Multiple headers for single request", "headers", len(headers))
-				return 0, errBadPeer
+				return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 			}
 			arrived = true
@@ -900,7 +887,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 			header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
 			if header.Number.Uint64() != check {
 				p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
-				return 0, errBadPeer
+				return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
 			}
 			start = check
 			hash = h
@@ -1092,7 +1079,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 				case d.headerProcCh <- nil:
 				case <-d.cancelCh:
 				}
-				return errBadPeer
+				return fmt.Errorf("%w: header request timed out", errBadPeer)
 			}
 		}
 	}
@@ -1520,7 +1507,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 				inserts := d.queue.Schedule(chunk, origin)
 				if len(inserts) != len(chunk) {
 					log.Debug("Stale headers")
-					return errBadPeer
+					return fmt.Errorf("%w: stale headers", errBadPeer)
 				}
 			}
 			headers = headers[limit:]
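Note on the error handling above: the sentinel errors are now wrapped with fmt.Errorf's %w verb, and errors.Is matches through that wrapping, which is why the old exact-value switch can collapse into a single errors.Is chain while the returned errors carry extra detail. A minimal, self-contained sketch of that standard-library pattern (the fetch function and its message are illustrative, not taken from the downloader):

package main

import (
	"errors"
	"fmt"
)

// errBadPeer stands in for one of the downloader's sentinel errors.
var errBadPeer = errors.New("action from bad peer ignored")

// fetch wraps the sentinel with request-specific context via %w.
func fetch(headers int) error {
	if headers != 1 {
		return fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, headers)
	}
	return nil
}

func main() {
	err := fetch(3)
	// errors.Is unwraps the chain, so the enriched error still matches the sentinel.
	fmt.Println(errors.Is(err, errBadPeer)) // true
	fmt.Println(err)
}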
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
index f875b3a84..b022617bb 100644
--- a/eth/downloader/statesync.go
+++ b/eth/downloader/statesync.go
@@ -63,6 +63,10 @@ func (d *Downloader) syncState(root common.Hash) *stateSync {
 	s := newStateSync(d, root)
 	select {
 	case d.stateSyncStart <- s:
+		// If we tell the statesync to restart with a new root, we also need
+		// to wait for it to actually start -- i.e. once old requests have timed
+		// out or been delivered
+		<-s.started
 	case <-d.quitCh:
 		s.err = errCancelStateFetch
 		close(s.done)
@@ -95,15 +99,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 		finished []*stateReq            // Completed or failed requests
 		timeout  = make(chan *stateReq) // Timed out active requests
 	)
-	defer func() {
-		// Cancel active request timers on exit. Also set peers to idle so they're
-		// available for the next sync.
-		for _, req := range active {
-			req.timer.Stop()
-			req.peer.SetNodeDataIdle(len(req.items))
-		}
-	}()
+	// Run the state sync.
+	log.Trace("State sync starting", "root", s.root)
 	go s.run()
 	defer s.Cancel()
@@ -126,9 +124,11 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 		select {
 		// The stateSync lifecycle:
 		case next := <-d.stateSyncStart:
+			d.spindownStateSync(active, finished, timeout, peerDrop)
 			return next
 		case <-s.done:
+			d.spindownStateSync(active, finished, timeout, peerDrop)
 			return nil
 		// Send the next finished request to the current sync:
@@ -189,11 +189,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 			// causes valid requests to go missing and sync to get stuck.
 			if old := active[req.peer.id]; old != nil {
 				log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
-
-				// Make sure the previous one doesn't get siletly lost
+				// Move the previous request to the finished set
 				old.timer.Stop()
 				old.dropped = true
-				finished = append(finished, old)
 			}
 			// Start a timer to notify the sync loop if the peer stalled.
@@ -210,6 +208,46 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 	}
 }
+// spindownStateSync 'drains' the outstanding requests; some will be delivered and others
+// will time out. This is to ensure that when the next stateSync starts working, all peers
+// are marked as idle and de facto _are_ idle.
+func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []*stateReq, timeout chan *stateReq, peerDrop chan *peerConnection) {
+	log.Trace("State sync spinning down", "active", len(active), "finished", len(finished))
+
+	for len(active) > 0 {
+		var (
+			req    *stateReq
+			reason string
+		)
+		select {
+		// Handle (drop) incoming state packs:
+		case pack := <-d.stateCh:
+			req = active[pack.PeerId()]
+			reason = "delivered"
+		// Handle dropped peer connections:
+		case p := <-peerDrop:
+			req = active[p.id]
+			reason = "peerdrop"
+		// Handle timed-out requests:
+		case req = <-timeout:
+			reason = "timeout"
+		}
+		if req == nil {
+			continue
+		}
+		req.peer.log.Trace("State peer marked idle (spindown)", "req.items", len(req.items), "reason", reason)
+		req.timer.Stop()
+		delete(active, req.peer.id)
+		req.peer.SetNodeDataIdle(len(req.items))
+	}
+	// The 'finished' set contains deliveries that we were going to pass to processing.
+	// Those are now moot, but we still need to set those peers as idle, which would
+	// otherwise have been done after processing.
+	for _, req := range finished {
+		req.peer.SetNodeDataIdle(len(req.items))
+	}
+}
+
 // stateSync schedules requests for downloading a particular state trie defined
 // by a given state root.
 type stateSync struct {
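For readers unfamiliar with the spin-down approach used in spindownStateSync above, here is a rough, self-contained sketch of the same idea with hypothetical types and channel names (not the downloader's own): keep selecting on the delivery and timeout channels until every in-flight request is accounted for, marking each owning peer idle as it drains. The loop exits only once the active map is empty, so the next sync never sees a peer that is still nominally busy.

package main

import "fmt"

// request is a stand-in for an in-flight state request keyed by peer id.
type request struct {
	peer  string
	items int
}

// drain keeps consuming deliveries and timeouts until no request is outstanding,
// so every peer is marked idle before the next sync takes over.
func drain(active map[string]*request, delivered <-chan string, timedOut <-chan *request, markIdle func(peer string, items int)) {
	for len(active) > 0 {
		var req *request
		select {
		case peer := <-delivered:
			req = active[peer] // may be nil if the peer is no longer tracked
		case req = <-timedOut:
		}
		if req == nil {
			continue
		}
		delete(active, req.peer)
		markIdle(req.peer, req.items)
	}
}

func main() {
	active := map[string]*request{
		"a": {peer: "a", items: 3},
		"b": {peer: "b", items: 5},
	}
	delivered := make(chan string, 1)
	timedOut := make(chan *request, 1)
	delivered <- "a"
	timedOut <- active["b"]

	drain(active, delivered, timedOut, func(peer string, items int) {
		fmt.Println("peer idle:", peer, "items:", items)
	})
}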
@@ -222,11 +260,15 @@ type stateSync struct {
 	numUncommitted   int
 	bytesUncommitted int
+	started chan struct{} // Started is signalled once the sync loop starts
+
 	deliver    chan *stateReq // Delivery channel multiplexing peer responses
 	cancel     chan struct{}  // Channel to signal a termination request
 	cancelOnce sync.Once      // Ensures cancel only ever gets called once
 	done       chan struct{}  // Channel to signal termination completion
 	err        error          // Any error hit during sync (set before completion)
+
+	root common.Hash
 }
 // stateTask represents a single trie node download task, containing a set of
@@ -246,6 +288,8 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
 		deliver: make(chan *stateReq),
 		cancel:  make(chan struct{}),
 		done:    make(chan struct{}),
+		started: make(chan struct{}),
+		root:    root,
 	}
 }
@@ -276,6 +320,7 @@ func (s *stateSync) Cancel() error {
 // pushed here async. The reason is to decouple processing from data receipt
 // and timeouts.
 func (s *stateSync) loop() (err error) {
+	close(s.started)
 	// Listen for new peer events to assign tasks to them
 	newPeer := make(chan *peerConnection, 1024)
 	peerSub := s.d.peers.SubscribeNewPeers(newPeer)
@@ -331,11 +376,11 @@
 			}
 			// Process all the received blobs and check for stale delivery
 			delivered, err := s.process(req)
+			req.peer.SetNodeDataIdle(delivered)
 			if err != nil {
 				log.Warn("Node data write error", "err", err)
 				return err
 			}
-			req.peer.SetNodeDataIdle(delivered)
 		}
 	}
 	return nil
 }
@@ -372,7 +417,7 @@
 		// If the peer was assigned tasks to fetch, send the network request
 		if len(req.items) > 0 {
-			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
+			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items), "root", s.root)
 			select {
 			case s.d.trackStateReq <- req:
 				req.peer.FetchNodeData(req.items)
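The started channel introduced above follows a common Go handshake: the sync loop closes the channel as its very first action, and syncState blocks on a receive until that happens, so the caller only proceeds once the loop is genuinely running. A minimal standalone sketch of the pattern, using illustrative names rather than the downloader's types:

package main

import (
	"fmt"
	"sync"
)

// syncer is an illustrative stand-in for the stateSync object.
type syncer struct {
	started chan struct{} // closed as soon as the run loop begins
	done    chan struct{} // closed when the loop exits
}

func newSyncer() *syncer {
	return &syncer{
		started: make(chan struct{}),
		done:    make(chan struct{}),
	}
}

func (s *syncer) run(wg *sync.WaitGroup) {
	defer wg.Done()
	close(s.started) // signal readiness before doing any work
	// ... the real loop would assign tasks and process deliveries here ...
	close(s.done)
}

func main() {
	var wg sync.WaitGroup
	s := newSyncer()
	wg.Add(1)
	go s.run(&wg)

	<-s.started // block until the loop has actually begun
	fmt.Println("sync loop confirmed started")
	wg.Wait()
}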