eth/protocols/snap: snap sync testing (#22179)

* eth/protocols/snap: make timeout configurable

* eth/protocols/snap: snap sync testing

* eth/protocols/snap: test to trigger panic

* eth/protocols/snap: fix race condition on timeouts

* eth/protocols/snap: return error on cancelled sync

* squashme: updates + test causing panic + properly serve accounts in order

* eth/protocols/snap: revert failing storage response

* eth/protocols/snap: revert on bad responses (storage, code)

* eth/protocols/snap: fix account handling stall

* eth/protocols/snap: fix remaining revertal-issues

* eth/protocols/snap: timeouthandler for bytecode requests

* eth/protocols/snap: debugging + fix log message

* eth/protocols/snap: fix misspellings in docs

* eth/protocols/snap: fix race in bytecode handling

* eth/protocols/snap: undo deduplication of storage roots

* synctests: refactor + minify panic testcase

* eth/protocols/snap: minor polishes

* eth: minor polishes to make logs more useful

* eth/protocols/snap: remove excessive logs from the test runs

* eth/protocols/snap: stress tests with concurrency

* eth/protocols/snap: further fixes to test cancel channel handling

* eth/protocols/snap: extend test timeouts on CI

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
Author: Martin Holst Swende, 2021-01-25 07:17:05 +01:00 (committed by GitHub)
parent 3708454f58
commit 797b0812ab
6 changed files with 1251 additions and 150 deletions

View File

@@ -298,7 +298,7 @@ func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error {
        // Tests use short IDs, don't choke on them
        logger = log.New("peer", id)
    } else {
-       logger = log.New("peer", id[:16])
+       logger = log.New("peer", id[:8])
    }
    logger.Trace("Registering sync peer")
    if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
@@ -325,7 +325,7 @@ func (d *Downloader) UnregisterPeer(id string) error {
        // Tests use short IDs, don't choke on them
        logger = log.New("peer", id)
    } else {
-       logger = log.New("peer", id[:16])
+       logger = log.New("peer", id[:8])
    }
    logger.Trace("Unregistering sync peer")
    if err := d.peers.Unregister(id); err != nil {

View File

@@ -326,24 +326,32 @@ func (h *handler) runSnapPeer(peer *snap.Peer, handler snap.Handler) error {
}

func (h *handler) removePeer(id string) {
+   // Create a custom logger to avoid printing the entire id
+   var logger log.Logger
+   if len(id) < 16 {
+       // Tests use short IDs, don't choke on them
+       logger = log.New("peer", id)
+   } else {
+       logger = log.New("peer", id[:8])
+   }
    // Remove the eth peer if it exists
    eth := h.peers.ethPeer(id)
    if eth != nil {
-       log.Debug("Removing Ethereum peer", "peer", id)
+       logger.Debug("Removing Ethereum peer")
        h.downloader.UnregisterPeer(id)
        h.txFetcher.Drop(id)
        if err := h.peers.unregisterEthPeer(id); err != nil {
-           log.Error("Peer removal failed", "peer", id, "err", err)
+           logger.Error("Ethereum peer removal failed", "err", err)
        }
    }
    // Remove the snap peer if it exists
    snap := h.peers.snapPeer(id)
    if snap != nil {
-       log.Debug("Removing Snapshot peer", "peer", id)
+       logger.Debug("Removing Snapshot peer")
        h.downloader.SnapSyncer.Unregister(id)
        if err := h.peers.unregisterSnapPeer(id); err != nil {
-           log.Error("Peer removal failed", "peer", id, "err", err)
+           logger.Error("Snapshot peer removel failed", "err", err)
        }
    }
    // Hard disconnect at the networking layer
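
The same short-ID-aware logger construction now appears in both the downloader and this handler. A minimal sketch of how that pattern could be factored out (the peerLogger name is illustrative only, not something this commit introduces; it assumes go-ethereum's log package):

    // Hypothetical helper capturing the repeated pattern: build a peer-scoped
    // logger, truncating long enode IDs to 8 characters while leaving short
    // test IDs untouched.
    func peerLogger(id string) log.Logger {
        if len(id) < 16 {
            // Tests use short IDs, don't choke on them
            return log.New("peer", id)
        }
        return log.New("peer", id[:8])
    }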

View File

@@ -56,6 +56,11 @@ func (p *Peer) Version() uint {
    return p.version
}

+// Log overrides the P2P logget with the higher level one containing only the id.
+func (p *Peer) Log() log.Logger {
+   return p.logger
+}
+
// RequestAccountRange fetches a batch of accounts rooted in a specific account
// trie, starting with the origin.
func (p *Peer) RequestAccountRange(id uint64, root common.Hash, origin, limit common.Hash, bytes uint64) error {

View File

@ -61,6 +61,7 @@ var (
errDecode = errors.New("invalid message") errDecode = errors.New("invalid message")
errInvalidMsgCode = errors.New("invalid message code") errInvalidMsgCode = errors.New("invalid message code")
errBadRequest = errors.New("bad request") errBadRequest = errors.New("bad request")
errCancelled = errors.New("sync cancelled")
) )
// Packet represents a p2p message in the `snap` protocol. // Packet represents a p2p message in the `snap` protocol.
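
With errCancelled in place, Sync can report an aborted run instead of returning nil (see the `return errCancelled` change further down). A rough in-package sketch of how a caller, for example a test, might distinguish the two outcomes (syncer, root and t are assumed to come from the surrounding test setup; errCancelled is unexported, so this only works inside the package):

    cancel := make(chan struct{})
    go func() {
        time.Sleep(100 * time.Millisecond)
        close(cancel) // abort the sync mid-flight
    }()
    if err := syncer.Sync(root, cancel); err != nil && err != errCancelled {
        t.Fatalf("sync failed: %v", err)
    }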

View File

@@ -73,10 +73,6 @@ const (
    // waste bandwidth.
    maxTrieRequestCount = 512

-   // requestTimeout is the maximum time a peer is allowed to spend on serving
-   // a single network request.
-   requestTimeout = 10 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync?
-
    // accountConcurrency is the number of chunks to split the account trie into
    // to allow concurrent retrievals.
    accountConcurrency = 16
@@ -86,6 +82,12 @@ const (
    storageConcurrency = 16
)

+var (
+   // requestTimeout is the maximum time a peer is allowed to spend on serving
+   // a single network request.
+   requestTimeout = 10 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync?
+)
+
// accountRequest tracks a pending account range request to ensure responses are
// to actual requests and to validate any security constraints.
//
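
Moving requestTimeout from the const block into a var is what makes the timeout configurable for the new tests: an in-package test can shrink it so the timeout and revert paths trigger quickly. A minimal sketch of that pattern (illustrative only; the real tests in this PR pick their own values):

    func TestTimeoutPath(t *testing.T) {
        // Restore the production value once the test is done.
        defer func(old time.Duration) { requestTimeout = old }(requestTimeout)
        requestTimeout = 250 * time.Millisecond

        // ... run a sync against a peer that never answers and assert the
        // request is reverted and rescheduled instead of stalling forever.
    }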
@@ -331,6 +333,33 @@ type syncProgress struct {
    BytecodeHealNops uint64 // Number of bytecodes not requested
}

+// SyncPeer abstracts out the methods required for a peer to be synced against
+// with the goal of allowing the construction of mock peers without the full
+// blown networking.
+type SyncPeer interface {
+   // ID retrieves the peer's unique identifier.
+   ID() string
+
+   // RequestAccountRange fetches a batch of accounts rooted in a specific account
+   // trie, starting with the origin.
+   RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error
+
+   // RequestStorageRange fetches a batch of storage slots belonging to one or
+   // more accounts. If slots from only one accout is requested, an origin marker
+   // may also be used to retrieve from there.
+   RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error
+
+   // RequestByteCodes fetches a batch of bytecodes by hash.
+   RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error
+
+   // RequestTrieNodes fetches a batch of account or storage trie nodes rooted in
+   // a specificstate trie.
+   RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error
+
+   // Log retrieves the peer's own contextual logger.
+   Log() log.Logger
+}
+
// Syncer is an Ethereum account and storage trie syncer based on snapshots and
// the snap protocol. It's purpose is to download all the accounts and storage
// slots from remote peers and reassemble chunks of the state trie, on top of
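
The SyncPeer interface is what lets the new tests register mock peers against the syncer. A minimal in-package sketch of such a mock (hypothetical and deliberately unhelpful; the real harness in this PR's suppressed test file is far richer). It satisfies SyncPeer but never answers, which is enough to exercise the timeout and revert paths:

    type silentPeer struct {
        id string
    }

    func (p *silentPeer) ID() string      { return p.id }
    func (p *silentPeer) Log() log.Logger { return log.New("peer", p.id) }

    func (p *silentPeer) RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error {
        return nil // swallow the request and let it time out
    }
    func (p *silentPeer) RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error {
        return nil
    }
    func (p *silentPeer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error {
        return nil
    }
    func (p *silentPeer) RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error {
        return nil
    }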
@@ -346,14 +375,15 @@ type Syncer struct {
    db    ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)
    bloom *trie.SyncBloom     // Bloom filter to deduplicate nodes for state fixup

    root    common.Hash    // Current state trie root being synced
    tasks   []*accountTask // Current account task set being synced
-   healer  *healTask      // Current state healing task being executed
-   update  chan struct{}  // Notification channel for possible sync progression
+   snapped bool           // Flag to signal that snap phase is done
+   healer  *healTask      // Current state healing task being executed
+   update  chan struct{}  // Notification channel for possible sync progression

-   peers    map[string]*Peer    // Currently active peers to download from
+   peers    map[string]SyncPeer // Currently active peers to download from
    peerJoin *event.Feed         // Event feed to react to peers joining
    peerDrop *event.Feed         // Event feed to react to peers dropping

    // Request tracking during syncing phase
    statelessPeers map[string]struct{} // Peers that failed to deliver state data
@@ -410,12 +440,14 @@ type Syncer struct {
    lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root)
}

+// NewSyncer creates a new snapshot syncer to download the Ethereum state over the
+// snap protocol.
func NewSyncer(db ethdb.KeyValueStore, bloom *trie.SyncBloom) *Syncer {
    return &Syncer{
        db:    db,
        bloom: bloom,
-       peers:    make(map[string]*Peer),
+       peers:    make(map[string]SyncPeer),
        peerJoin: new(event.Feed),
        peerDrop: new(event.Feed),
        update:   make(chan struct{}, 1),
@@ -447,27 +479,29 @@ func NewSyncer(db ethdb.KeyValueStore, bloom *trie.SyncBloom) *Syncer {
}

// Register injects a new data source into the syncer's peerset.
-func (s *Syncer) Register(peer *Peer) error {
+func (s *Syncer) Register(peer SyncPeer) error {
    // Make sure the peer is not registered yet
+   id := peer.ID()
+
    s.lock.Lock()
-   if _, ok := s.peers[peer.id]; ok {
-       log.Error("Snap peer already registered", "id", peer.id)
+   if _, ok := s.peers[id]; ok {
+       log.Error("Snap peer already registered", "id", id)
        s.lock.Unlock()
        return errors.New("already registered")
    }
-   s.peers[peer.id] = peer
+   s.peers[id] = peer

    // Mark the peer as idle, even if no sync is running
-   s.accountIdlers[peer.id] = struct{}{}
-   s.storageIdlers[peer.id] = struct{}{}
-   s.bytecodeIdlers[peer.id] = struct{}{}
-   s.trienodeHealIdlers[peer.id] = struct{}{}
-   s.bytecodeHealIdlers[peer.id] = struct{}{}
+   s.accountIdlers[id] = struct{}{}
+   s.storageIdlers[id] = struct{}{}
+   s.bytecodeIdlers[id] = struct{}{}
+   s.trienodeHealIdlers[id] = struct{}{}
+   s.bytecodeHealIdlers[id] = struct{}{}
    s.lock.Unlock()

    // Notify any active syncs that a new peer can be assigned data
-   s.peerJoin.Send(peer.id)
+   s.peerJoin.Send(id)
    return nil
}
@@ -566,6 +600,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
        s.assignAccountTasks(cancel)
        s.assignBytecodeTasks(cancel)
        s.assignStorageTasks(cancel)
+
        if len(s.tasks) == 0 {
            // Sync phase done, run heal phase
            s.assignTrienodeHealTasks(cancel)
@@ -580,7 +615,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
        case id := <-peerDrop:
            s.revertRequests(id)
        case <-cancel:
-           return nil
+           return errCancelled

        case req := <-s.accountReqFails:
            s.revertAccountRequest(req)
@@ -622,6 +657,7 @@ func (s *Syncer) loadSyncStatus() {
            log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
        }
        s.tasks = progress.Tasks
+       s.snapped = len(s.tasks) == 0

        s.accountSynced = progress.AccountSynced
        s.accountBytes = progress.AccountBytes
@@ -701,6 +737,11 @@ func (s *Syncer) cleanAccountTasks() {
            i--
        }
    }
+   if len(s.tasks) == 0 {
+       s.lock.Lock()
+       s.snapped = true
+       s.lock.Unlock()
+   }
}

// cleanStorageTasks iterates over all the account tasks and storage sub-tasks
@@ -798,7 +839,7 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
        delete(s.accountIdlers, idle)

        s.pend.Add(1)
-       go func(peer *Peer, root common.Hash) {
+       go func(peer SyncPeer, root common.Hash) {
            defer s.pend.Done()

            // Attempt to send the remote request and revert if it fails
@@ -885,7 +926,7 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
        delete(s.bytecodeIdlers, idle)

        s.pend.Add(1)
-       go func(peer *Peer) {
+       go func(peer SyncPeer) {
            defer s.pend.Done()

            // Attempt to send the remote request and revert if it fails
@@ -962,7 +1003,6 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
                // Found an incomplete storage chunk, schedule it
                accounts = append(accounts, account)
                roots = append(roots, st.root)
-               subtask = st
                break // Large contract chunks are downloaded individually
            }
@@ -1010,7 +1050,7 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
        delete(s.storageIdlers, idle)

        s.pend.Add(1)
-       go func(peer *Peer, root common.Hash) {
+       go func(peer SyncPeer, root common.Hash) {
            defer s.pend.Done()

            // Attempt to send the remote request and revert if it fails
@@ -1125,7 +1165,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
        delete(s.trienodeHealIdlers, idle)

        s.pend.Add(1)
-       go func(peer *Peer, root common.Hash) {
+       go func(peer SyncPeer, root common.Hash) {
            defer s.pend.Done()

            // Attempt to send the remote request and revert if it fails
@@ -1223,7 +1263,7 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
        delete(s.bytecodeHealIdlers, idle)

        s.pend.Add(1)
-       go func(peer *Peer) {
+       go func(peer SyncPeer) {
            defer s.pend.Done()

            // Attempt to send the remote request and revert if it fails
@@ -1522,7 +1562,7 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
            break
        }
    }
-   // Itereate over all the accounts and assemble which ones need further sub-
+   // Iterate over all the accounts and assemble which ones need further sub-
    // filling before the entire account range can be persisted.
    res.task.needCode = make([]bool, len(res.accounts))
    res.task.needState = make([]bool, len(res.accounts))
@@ -1566,7 +1606,7 @@
        }
    }
    // Delete any subtasks that have been aborted but not resumed. This may undo
-   // some progress if a newpeer gives us less accounts than an old one, but for
+   // some progress if a new peer gives us less accounts than an old one, but for
    // now we have to live with that.
    for hash := range res.task.SubTasks {
        if _, ok := resumed[hash]; !ok {
@@ -1650,94 +1690,91 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
    )
    // Iterate over all the accounts and reconstruct their storage tries from the
    // delivered slots
-   delivered := make(map[common.Hash]bool)
-   for i := 0; i < len(res.hashes); i++ {
-       delivered[res.roots[i]] = true
-   }
    for i, account := range res.accounts {
        // If the account was not delivered, reschedule it
        if i >= len(res.hashes) {
-           if !delivered[res.roots[i]] {
-               res.mainTask.stateTasks[account] = res.roots[i]
-           }
+           res.mainTask.stateTasks[account] = res.roots[i]
            continue
        }
        // State was delivered, if complete mark as not needed any more, otherwise
        // mark the account as needing healing
-       for j, acc := range res.mainTask.res.accounts {
-           if res.roots[i] == acc.Root {
+       for j, hash := range res.mainTask.res.hashes {
+           if account != hash {
+               continue
+           }
+           acc := res.mainTask.res.accounts[j]
+
            // If the packet contains multiple contract storage slots, all
            // but the last are surely complete. The last contract may be
            // chunked, so check it's continuation flag.
            if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
                res.mainTask.needState[j] = false
                res.mainTask.pend--
            }
            // If the last contract was chunked, mark it as needing healing
            // to avoid writing it out to disk prematurely.
            if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont {
                res.mainTask.needHeal[j] = true
            }
            // If the last contract was chunked, we need to switch to large
            // contract handling mode
            if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
                // If we haven't yet started a large-contract retrieval, create
                // the subtasks for it within the main account task
                if tasks, ok := res.mainTask.SubTasks[account]; !ok {
                    var (
                        next common.Hash
                    )
                    step := new(big.Int).Sub(
                        new(big.Int).Div(
                            new(big.Int).Exp(common.Big2, common.Big256, nil),
                            big.NewInt(storageConcurrency),
                        ), common.Big1,
                    )
                    for k := 0; k < storageConcurrency; k++ {
                        last := common.BigToHash(new(big.Int).Add(next.Big(), step))
                        if k == storageConcurrency-1 {
                            // Make sure we don't overflow if the step is not a proper divisor
                            last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
                        }
                        tasks = append(tasks, &storageTask{
                            Next: next,
                            Last: last,
                            root: acc.Root,
                        })
                        log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", next, "last", last)
                        next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
                    }
                    res.mainTask.SubTasks[account] = tasks
                    // Since we've just created the sub-tasks, this response
                    // is surely for the first one (zero origin)
                    res.subTask = tasks[0]
                }
            }
            // If we're in large contract delivery mode, forward the subtask
            if res.subTask != nil {
                // Ensure the response doesn't overflow into the subsequent task
                last := res.subTask.Last.Big()
                for k, hash := range res.hashes[i] {
                    if hash.Big().Cmp(last) > 0 {
                        // Chunk overflown, cut off excess, but also update the boundary
                        for l := k; l < len(res.hashes[i]); l++ {
                            if err := res.tries[i].Prove(res.hashes[i][l][:], 0, res.overflow); err != nil {
                                panic(err) // Account range was already proven, what happened
                            }
                        }
                        res.hashes[i] = res.hashes[i][:k]
                        res.slots[i] = res.slots[i][:k]
                        res.cont = false // Mark range completed
                        break
                    }
                }
                // Forward the relevant storage chunk (even if created just now)
                if res.cont {
                    res.subTask.Next = common.BigToHash(new(big.Int).Add(res.hashes[i][len(res.hashes[i])-1].Big(), big.NewInt(1)))
                } else {
                    res.subTask.done = true
                }
            }
-           }
        }
    }
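
The sub-task creation above splits the 2^256 storage hash space into storageConcurrency equal ranges, clamping the last boundary so a step that does not divide evenly cannot overflow. A stand-alone sketch of just that arithmetic (illustrative, outside the syncer's types):

    package main

    import (
        "fmt"
        "math/big"
    )

    func main() {
        chunks := int64(16) // mirrors storageConcurrency
        max := new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil)
        step := new(big.Int).Sub(new(big.Int).Div(max, big.NewInt(chunks)), big.NewInt(1))

        next := new(big.Int)
        for k := int64(0); k < chunks; k++ {
            last := new(big.Int).Add(next, step)
            if k == chunks-1 {
                // Clamp to 2^256-1 in case the step is not a proper divisor
                last = new(big.Int).Sub(max, big.NewInt(1))
            }
            fmt.Printf("chunk %2d: %064x .. %064x\n", k, next, last)
            next = new(big.Int).Add(last, big.NewInt(1))
        }
    }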
@@ -1941,7 +1978,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {

// OnAccounts is a callback method to invoke when a range of accounts are
// received from a remote peer.
-func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
+func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
    size := common.StorageSize(len(hashes) * common.HashLength)
    for _, account := range accounts {
        size += common.StorageSize(len(account))
@@ -1949,15 +1986,15 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
    for _, node := range proof {
        size += common.StorageSize(len(node))
    }
-   logger := peer.logger.New("reqid", id)
+   logger := peer.Log().New("reqid", id)
    logger.Trace("Delivering range of accounts", "hashes", len(hashes), "accounts", len(accounts), "proofs", len(proof), "bytes", size)

    // Whether or not the response is valid, we can mark the peer as idle and
    // notify the scheduler to assign a new task. If the response is invalid,
    // we'll drop the peer in a bit.
    s.lock.Lock()
-   if _, ok := s.peers[peer.id]; ok {
-       s.accountIdlers[peer.id] = struct{}{}
+   if _, ok := s.peers[peer.ID()]; ok {
+       s.accountIdlers[peer.ID()] = struct{}{}
    }
    select {
    case s.update <- struct{}{}:
@@ -1975,7 +2012,11 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
    // Clean up the request timeout timer, we'll see how to proceed further based
    // on the actual delivered content
-   req.timeout.Stop()
+   if !req.timeout.Stop() {
+       // The timeout is already triggered, and this request will be reverted+rescheduled
+       s.lock.Unlock()
+       return nil
+   }

    // Response is valid, but check if peer is signalling that it does not have
    // the requested data. For account range queries that means the state being
@@ -1983,7 +2024,7 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
    // synced to our head.
    if len(hashes) == 0 && len(accounts) == 0 && len(proof) == 0 {
        logger.Debug("Peer rejected account range request", "root", s.root)
-       s.statelessPeers[peer.id] = struct{}{}
+       s.statelessPeers[peer.ID()] = struct{}{}
        s.lock.Unlock()

        // Signal this request as failed, and ready for rescheduling
@@ -2011,6 +2052,8 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
    db, tr, notary, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
    if err != nil {
        logger.Warn("Account range failed proof", "err", err)
+       // Signal this request as failed, and ready for rescheduling
+       s.scheduleRevertAccountRequest(req)
        return err
    }
    // Partial trie reconstructed, send it to the scheduler for storage filling
@@ -2050,9 +2093,9 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {

// OnByteCodes is a callback method to invoke when a batch of contract
// bytes codes are received from a remote peer.
-func (s *Syncer) OnByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
+func (s *Syncer) OnByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
    s.lock.RLock()
-   syncing := len(s.tasks) > 0
+   syncing := !s.snapped
    s.lock.RUnlock()

    if syncing {
@@ -2063,20 +2106,20 @@ func (s *Syncer) OnByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {

// onByteCodes is a callback method to invoke when a batch of contract
// bytes codes are received from a remote peer in the syncing phase.
-func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
+func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
    var size common.StorageSize
    for _, code := range bytecodes {
        size += common.StorageSize(len(code))
    }
-   logger := peer.logger.New("reqid", id)
+   logger := peer.Log().New("reqid", id)
    logger.Trace("Delivering set of bytecodes", "bytecodes", len(bytecodes), "bytes", size)

    // Whether or not the response is valid, we can mark the peer as idle and
    // notify the scheduler to assign a new task. If the response is invalid,
    // we'll drop the peer in a bit.
    s.lock.Lock()
-   if _, ok := s.peers[peer.id]; ok {
-       s.bytecodeIdlers[peer.id] = struct{}{}
+   if _, ok := s.peers[peer.ID()]; ok {
+       s.bytecodeIdlers[peer.ID()] = struct{}{}
    }
    select {
    case s.update <- struct{}{}:
@@ -2094,14 +2137,18 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
    // Clean up the request timeout timer, we'll see how to proceed further based
    // on the actual delivered content
-   req.timeout.Stop()
+   if !req.timeout.Stop() {
+       // The timeout is already triggered, and this request will be reverted+rescheduled
+       s.lock.Unlock()
+       return nil
+   }

    // Response is valid, but check if peer is signalling that it does not have
    // the requested data. For bytecode range queries that means the peer is not
    // yet synced.
    if len(bytecodes) == 0 {
        logger.Debug("Peer rejected bytecode request")
-       s.statelessPeers[peer.id] = struct{}{}
+       s.statelessPeers[peer.ID()] = struct{}{}
        s.lock.Unlock()

        // Signal this request as failed, and ready for rescheduling
@@ -2132,6 +2179,8 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
        }
        // We've either ran out of hashes, or got unrequested data
        logger.Warn("Unexpected bytecodes", "count", len(bytecodes)-i)
+       // Signal this request as failed, and ready for rescheduling
+       s.scheduleRevertBytecodeRequest(req)
        return errors.New("unexpected bytecode")
    }
    // Response validated, send it to the scheduler for filling
@@ -2150,7 +2199,7 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {

// OnStorage is a callback method to invoke when ranges of storage slots
// are received from a remote peer.
-func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
+func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
    // Gather some trace stats to aid in debugging issues
    var (
        hashCount int
@@ -2170,15 +2219,15 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
    for _, node := range proof {
        size += common.StorageSize(len(node))
    }
-   logger := peer.logger.New("reqid", id)
+   logger := peer.Log().New("reqid", id)
    logger.Trace("Delivering ranges of storage slots", "accounts", len(hashes), "hashes", hashCount, "slots", slotCount, "proofs", len(proof), "size", size)

    // Whether or not the response is valid, we can mark the peer as idle and
    // notify the scheduler to assign a new task. If the response is invalid,
    // we'll drop the peer in a bit.
    s.lock.Lock()
-   if _, ok := s.peers[peer.id]; ok {
-       s.storageIdlers[peer.id] = struct{}{}
+   if _, ok := s.peers[peer.ID()]; ok {
+       s.storageIdlers[peer.ID()] = struct{}{}
    }
    select {
    case s.update <- struct{}{}:
@@ -2196,17 +2245,23 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
    // Clean up the request timeout timer, we'll see how to proceed further based
    // on the actual delivered content
-   req.timeout.Stop()
+   if !req.timeout.Stop() {
+       // The timeout is already triggered, and this request will be reverted+rescheduled
+       s.lock.Unlock()
+       return nil
+   }

    // Reject the response if the hash sets and slot sets don't match, or if the
    // peer sent more data than requested.
    if len(hashes) != len(slots) {
        s.lock.Unlock()
+       s.scheduleRevertStorageRequest(req) // reschedule request
        logger.Warn("Hash and slot set size mismatch", "hashset", len(hashes), "slotset", len(slots))
        return errors.New("hash and slot set size mismatch")
    }
    if len(hashes) > len(req.accounts) {
        s.lock.Unlock()
+       s.scheduleRevertStorageRequest(req) // reschedule request
        logger.Warn("Hash set larger than requested", "hashset", len(hashes), "requested", len(req.accounts))
        return errors.New("hash set larger than requested")
    }
@@ -2216,11 +2271,9 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
    // synced to our head.
    if len(hashes) == 0 {
        logger.Debug("Peer rejected storage request")
-       s.statelessPeers[peer.id] = struct{}{}
+       s.statelessPeers[peer.ID()] = struct{}{}
        s.lock.Unlock()
+       s.scheduleRevertStorageRequest(req) // reschedule request

-       // Signal this request as failed, and ready for rescheduling
-       s.scheduleRevertStorageRequest(req)
        return nil
    }
    s.lock.Unlock()
@@ -2250,6 +2303,7 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
        // space and hash to the origin root.
        dbs[i], tries[i], _, _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
        if err != nil {
+           s.scheduleRevertStorageRequest(req) // reschedule request
            logger.Warn("Storage slots failed proof", "err", err)
            return err
        }
@@ -2264,6 +2318,7 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
        }
        dbs[i], tries[i], notary, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
        if err != nil {
+           s.scheduleRevertStorageRequest(req) // reschedule request
            logger.Warn("Storage range failed proof", "err", err)
            return err
        }
@@ -2302,20 +2357,20 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {

// OnTrieNodes is a callback method to invoke when a batch of trie nodes
// are received from a remote peer.
-func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
+func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error {
    var size common.StorageSize
    for _, node := range trienodes {
        size += common.StorageSize(len(node))
    }
-   logger := peer.logger.New("reqid", id)
+   logger := peer.Log().New("reqid", id)
    logger.Trace("Delivering set of healing trienodes", "trienodes", len(trienodes), "bytes", size)

    // Whether or not the response is valid, we can mark the peer as idle and
    // notify the scheduler to assign a new task. If the response is invalid,
    // we'll drop the peer in a bit.
    s.lock.Lock()
-   if _, ok := s.peers[peer.id]; ok {
-       s.trienodeHealIdlers[peer.id] = struct{}{}
+   if _, ok := s.peers[peer.ID()]; ok {
+       s.trienodeHealIdlers[peer.ID()] = struct{}{}
    }
    select {
    case s.update <- struct{}{}:
@@ -2333,14 +2388,18 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
    // Clean up the request timeout timer, we'll see how to proceed further based
    // on the actual delivered content
-   req.timeout.Stop()
+   if !req.timeout.Stop() {
+       // The timeout is already triggered, and this request will be reverted+rescheduled
+       s.lock.Unlock()
+       return nil
+   }

    // Response is valid, but check if peer is signalling that it does not have
    // the requested data. For bytecode range queries that means the peer is not
    // yet synced.
    if len(trienodes) == 0 {
        logger.Debug("Peer rejected trienode heal request")
-       s.statelessPeers[peer.id] = struct{}{}
+       s.statelessPeers[peer.ID()] = struct{}{}
        s.lock.Unlock()

        // Signal this request as failed, and ready for rescheduling
@@ -2371,6 +2430,8 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
        }
        // We've either ran out of hashes, or got unrequested data
        logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)
+       // Signal this request as failed, and ready for rescheduling
+       s.scheduleRevertTrienodeHealRequest(req)
        return errors.New("unexpected healing trienode")
    }
    // Response validated, send it to the scheduler for filling
@@ -2390,20 +2451,20 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {

// onHealByteCodes is a callback method to invoke when a batch of contract
// bytes codes are received from a remote peer in the healing phase.
-func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
+func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
    var size common.StorageSize
    for _, code := range bytecodes {
        size += common.StorageSize(len(code))
    }
-   logger := peer.logger.New("reqid", id)
+   logger := peer.Log().New("reqid", id)
    logger.Trace("Delivering set of healing bytecodes", "bytecodes", len(bytecodes), "bytes", size)

    // Whether or not the response is valid, we can mark the peer as idle and
    // notify the scheduler to assign a new task. If the response is invalid,
    // we'll drop the peer in a bit.
    s.lock.Lock()
-   if _, ok := s.peers[peer.id]; ok {
-       s.bytecodeHealIdlers[peer.id] = struct{}{}
+   if _, ok := s.peers[peer.ID()]; ok {
+       s.bytecodeHealIdlers[peer.ID()] = struct{}{}
    }
    select {
    case s.update <- struct{}{}:
@@ -2421,14 +2482,18 @@ func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
    // Clean up the request timeout timer, we'll see how to proceed further based
    // on the actual delivered content
-   req.timeout.Stop()
+   if !req.timeout.Stop() {
+       // The timeout is already triggered, and this request will be reverted+rescheduled
+       s.lock.Unlock()
+       return nil
+   }

    // Response is valid, but check if peer is signalling that it does not have
    // the requested data. For bytecode range queries that means the peer is not
    // yet synced.
    if len(bytecodes) == 0 {
        logger.Debug("Peer rejected bytecode heal request")
-       s.statelessPeers[peer.id] = struct{}{}
+       s.statelessPeers[peer.ID()] = struct{}{}
        s.lock.Unlock()

        // Signal this request as failed, and ready for rescheduling
@@ -2459,6 +2524,8 @@ func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
        }
        // We've either ran out of hashes, or got unrequested data
        logger.Warn("Unexpected healing bytecodes", "count", len(bytecodes)-i)
+       // Signal this request as failed, and ready for rescheduling
+       s.scheduleRevertBytecodeHealRequest(req)
        return errors.New("unexpected healing bytecode")
    }
    // Response validated, send it to the scheduler for filling

File diff suppressed because it is too large