diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 437b0caab..82b21c470 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -792,10 +792,7 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Account range request timed out") - select { - case s.accountReqFails <- req: - default: - } + s.scheduleRevertAccountRequest(req) }) s.accountReqs[reqid] = req delete(s.accountIdlers, idle) @@ -807,12 +804,8 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) { // Attempt to send the remote request and revert if it fails if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, maxRequestSize); err != nil { peer.Log().Debug("Failed to request account range", "err", err) - select { - case s.accountReqFails <- req: - default: - } + s.scheduleRevertAccountRequest(req) } - // Request successfully sent, start a }(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists // Inject the request into the task to block further assignments @@ -886,10 +879,7 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Bytecode request timed out") - select { - case s.bytecodeReqFails <- req: - default: - } + s.scheduleRevertBytecodeRequest(req) }) s.bytecodeReqs[reqid] = req delete(s.bytecodeIdlers, idle) @@ -901,12 +891,8 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) { // Attempt to send the remote request and revert if it fails if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil { log.Debug("Failed to request bytecodes", "err", err) - select { - case s.bytecodeReqFails <- req: - default: - } + s.scheduleRevertBytecodeRequest(req) } - // Request successfully sent, start a }(s.peers[idle]) // We're in the lock, peers[id] surely exists } } @@ -1018,10 +1004,7 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Storage request timed out") - select { - case s.storageReqFails <- req: - default: - } + s.scheduleRevertStorageRequest(req) }) s.storageReqs[reqid] = req delete(s.storageIdlers, idle) @@ -1037,12 +1020,8 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) { } if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, maxRequestSize); err != nil { log.Debug("Failed to request storage", "err", err) - select { - case s.storageReqFails <- req: - default: - } + s.scheduleRevertStorageRequest(req) } - // Request successfully sent, start a }(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists // Inject the request into the subtask to block further assignments @@ -1140,10 +1119,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Trienode heal request timed out") - select { - case s.trienodeHealReqFails <- req: - default: - } + s.scheduleRevertTrienodeHealRequest(req) }) s.trienodeHealReqs[reqid] = req delete(s.trienodeHealIdlers, idle) @@ -1155,12 +1131,8 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) { // Attempt to send the remote request and revert if it fails if err := peer.RequestTrieNodes(reqid, root, pathsets, maxRequestSize); err != nil { log.Debug("Failed to request trienode healers", "err", err) - select { - case s.trienodeHealReqFails <- req: - default: - } + s.scheduleRevertTrienodeHealRequest(req) } - // Request successfully sent, start a }(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists } } @@ -1245,10 +1217,7 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) { } req.timeout = time.AfterFunc(requestTimeout, func() { log.Debug("Bytecode heal request timed out") - select { - case s.bytecodeHealReqFails <- req: - default: - } + s.scheduleRevertBytecodeHealRequest(req) }) s.bytecodeHealReqs[reqid] = req delete(s.bytecodeHealIdlers, idle) @@ -1260,12 +1229,8 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) { // Attempt to send the remote request and revert if it fails if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil { log.Debug("Failed to request bytecode healers", "err", err) - select { - case s.bytecodeHealReqFails <- req: - default: - } + s.scheduleRevertBytecodeHealRequest(req) } - // Request successfully sent, start a }(s.peers[idle]) // We're in the lock, peers[id] surely exists } } @@ -1325,10 +1290,26 @@ func (s *Syncer) revertRequests(peer string) { } } +// scheduleRevertAccountRequest asks the event loop to clean up an account range +// request and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertAccountRequest(req *accountRequest) { + select { + case s.accountReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + // revertAccountRequest cleans up an account range request and returns all failed // retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertAccountRequest. func (s *Syncer) revertAccountRequest(req *accountRequest) { - log.Trace("Reverting account request", "peer", req.peer, "reqid", req.id) + log.Debug("Reverting account request", "peer", req.peer, "reqid", req.id) select { case <-req.stale: log.Trace("Account request already reverted", "peer", req.peer, "reqid", req.id) @@ -1350,10 +1331,26 @@ func (s *Syncer) revertAccountRequest(req *accountRequest) { } } -// revertBytecodeRequest cleans up an bytecode request and returns all failed +// scheduleRevertBytecodeRequest asks the event loop to clean up a bytecode request +// and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) { + select { + case s.bytecodeReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + +// revertBytecodeRequest cleans up a bytecode request and returns all failed // retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertBytecodeRequest. func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) { - log.Trace("Reverting bytecode request", "peer", req.peer) + log.Debug("Reverting bytecode request", "peer", req.peer) select { case <-req.stale: log.Trace("Bytecode request already reverted", "peer", req.peer, "reqid", req.id) @@ -1375,10 +1372,26 @@ func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) { } } +// scheduleRevertStorageRequest asks the event loop to clean up a storage range +// request and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertStorageRequest(req *storageRequest) { + select { + case s.storageReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + // revertStorageRequest cleans up a storage range request and returns all failed // retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertStorageRequest. func (s *Syncer) revertStorageRequest(req *storageRequest) { - log.Trace("Reverting storage request", "peer", req.peer) + log.Debug("Reverting storage request", "peer", req.peer) select { case <-req.stale: log.Trace("Storage request already reverted", "peer", req.peer, "reqid", req.id) @@ -1404,10 +1417,26 @@ func (s *Syncer) revertStorageRequest(req *storageRequest) { } } -// revertTrienodeHealRequest cleans up an trienode heal request and returns all +// scheduleRevertTrienodeHealRequest asks the event loop to clean up a trienode heal +// request and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) { + select { + case s.trienodeHealReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + +// revertTrienodeHealRequest cleans up a trienode heal request and returns all // failed retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertTrienodeHealRequest. func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) { - log.Trace("Reverting trienode heal request", "peer", req.peer) + log.Debug("Reverting trienode heal request", "peer", req.peer) select { case <-req.stale: log.Trace("Trienode heal request already reverted", "peer", req.peer, "reqid", req.id) @@ -1429,10 +1458,26 @@ func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) { } } -// revertBytecodeHealRequest cleans up an bytecode request and returns all failed -// retrieval tasks to the scheduler for reassignment. +// scheduleRevertBytecodeHealRequest asks the event loop to clean up a bytecode heal +// request and return all failed retrieval tasks to the scheduler for reassignment. +func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) { + select { + case s.bytecodeHealReqFails <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + +// revertBytecodeHealRequest cleans up a bytecode heal request and returns all +// failed retrieval tasks to the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertBytecodeHealRequest. func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) { - log.Trace("Reverting bytecode heal request", "peer", req.peer) + log.Debug("Reverting bytecode heal request", "peer", req.peer) select { case <-req.stale: log.Trace("Bytecode heal request already reverted", "peer", req.peer, "reqid", req.id) @@ -1768,7 +1813,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { if err := batch.Write(); err != nil { log.Crit("Failed to persist healing data", "err", err) } - log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize())) + log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) } // processBytecodeHealResponse integrates an already validated bytecode response @@ -1804,7 +1849,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) { if err := batch.Write(); err != nil { log.Crit("Failed to persist healing data", "err", err) } - log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize())) + log.Debug("Persisted set of healing data", "type", "bytecode", "bytes", common.StorageSize(batch.ValueSize())) } // forwardAccountTask takes a filled account task and persists anything available @@ -1940,6 +1985,9 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account logger.Debug("Peer rejected account range request", "root", s.root) s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + + // Signal this request as failed, and ready for rescheduling + s.scheduleRevertAccountRequest(req) return nil } root := s.root @@ -2055,6 +2103,9 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error { logger.Debug("Peer rejected bytecode request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + + // Signal this request as failed, and ready for rescheduling + s.scheduleRevertBytecodeRequest(req) return nil } s.lock.Unlock() @@ -2166,6 +2217,9 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots logger.Debug("Peer rejected storage request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + + // Signal this request as failed, and ready for rescheduling + s.scheduleRevertStorageRequest(req) return nil } s.lock.Unlock() @@ -2287,6 +2341,9 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error { logger.Debug("Peer rejected trienode heal request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + + // Signal this request as failed, and ready for rescheduling + s.scheduleRevertTrienodeHealRequest(req) return nil } s.lock.Unlock() @@ -2371,6 +2428,9 @@ func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) erro logger.Debug("Peer rejected bytecode heal request") s.statelessPeers[peer.id] = struct{}{} s.lock.Unlock() + + // Signal this request as failed, and ready for rescheduling + s.scheduleRevertBytecodeHealRequest(req) return nil } s.lock.Unlock()