Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
2 changed files with 128 additions and 41 deletions
Showing only changes of commit 43692c84d1 - Show all commits

View File

@ -520,7 +520,7 @@ func (s *skeleton) initSync(head *types.Header) {
} }
break break
} }
// If the last subchain can be extended, we're lucky. Otherwise create // If the last subchain can be extended, we're lucky. Otherwise, create
// a new subchain sync task. // a new subchain sync task.
var extended bool var extended bool
if n := len(s.progress.Subchains); n > 0 { if n := len(s.progress.Subchains); n > 0 {
@ -977,8 +977,14 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
// the expected new sync cycle after some propagated blocks. Log // the expected new sync cycle after some propagated blocks. Log
// it for debugging purposes, explicitly clean and don't escalate. // it for debugging purposes, explicitly clean and don't escalate.
case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail: case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail:
// Remove the leftover skeleton header associated with old
// skeleton chain only if it's not covered by the current
// skeleton range.
if s.progress.Subchains[1].Head < s.progress.Subchains[0].Tail {
log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head) log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head) rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
}
// Drop the leftover skeleton chain since it's stale.
s.progress.Subchains = s.progress.Subchains[:1] s.progress.Subchains = s.progress.Subchains[:1]
// If we have more than one header or more than one leftover chain, // If we have more than one header or more than one leftover chain,

View File

@ -37,7 +37,7 @@ import (
type hookedBackfiller struct { type hookedBackfiller struct {
// suspendHook is an optional hook to be called when the filler is requested // suspendHook is an optional hook to be called when the filler is requested
// to be suspended. // to be suspended.
suspendHook func() suspendHook func() *types.Header
// resumeHook is an optional hook to be called when the filler is requested // resumeHook is an optional hook to be called when the filler is requested
// to be resumed. // to be resumed.
@ -56,7 +56,7 @@ func newHookedBackfiller() backfiller {
// on initial startup. // on initial startup.
func (hf *hookedBackfiller) suspend() *types.Header { func (hf *hookedBackfiller) suspend() *types.Header {
if hf.suspendHook != nil { if hf.suspendHook != nil {
hf.suspendHook() return hf.suspendHook()
} }
return nil // we don't really care about header cleanups for now return nil // we don't really care about header cleanups for now
} }
@ -525,9 +525,21 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
Number: big.NewInt(int64(i)), Number: big.NewInt(int64(i)),
}) })
} }
// Some tests require a forking side chain to trigger cornercases.
var sidechain []*types.Header
for i := 0; i < len(chain)/2; i++ { // Fork at block #5000
sidechain = append(sidechain, chain[i])
}
for i := len(chain) / 2; i < len(chain); i++ {
sidechain = append(sidechain, &types.Header{
ParentHash: sidechain[i-1].Hash(),
Number: big.NewInt(int64(i)),
Extra: []byte("B"), // force a different hash
})
}
tests := []struct { tests := []struct {
headers []*types.Header // Database content (beside the genesis) fill bool // Whether to run a real backfiller in this test case
oldstate []*subchain // Old sync state with various interrupted subchains unpredictable bool // Whether to ignore drops/serves due to uncertain packet assignments
head *types.Header // New head header to announce to reorg to head *types.Header // New head header to announce to reorg to
peers []*skeletonTestPeer // Initial peer set to start the sync with peers []*skeletonTestPeer // Initial peer set to start the sync with
@ -760,11 +772,41 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
endstate: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}}, endstate: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}},
endserve: 4 * requestHeaders, endserve: 4 * requestHeaders,
}, },
// This test reproduces a bug caught by (@rjl493456442) where a skeleton
// header goes missing, causing the sync to get stuck and/or panic.
//
// The setup requires a previously successfully synced chain up to a block
// height N. That results is a single skeleton header (block N) and a single
// subchain (head N, Tail N) being stored on disk.
//
// The following step requires a new sync cycle to a new side chain of a
// height higher than N, and an ancestor lower than N (e.g. N-2, N+2).
// In this scenario, when processing a batch of headers, a link point of
// N-2 will be found, meaning that N-1 and N have been overwritten.
//
// The link event triggers an early exit, noticing that the previous sub-
// chain is a leftover and deletes it (with it's skeleton header N). But
// since skeleton header N has been overwritten to the new side chain, we
// end up losing it and creating a gap.
{
fill: true,
unpredictable: true, // We have good and bad peer too, bad may be dropped, test too short for certainty
head: chain[len(chain)/2+1], // Sync up until the sidechain common ancestor + 2
peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-oldchain", chain)},
midstate: []*subchain{{Head: uint64(len(chain)/2 + 1), Tail: 1}},
newHead: sidechain[len(sidechain)/2+3], // Sync up until the sidechain common ancestor + 4
newPeer: newSkeletonTestPeer("test-peer-newchain", sidechain),
endstate: []*subchain{{Head: uint64(len(sidechain)/2 + 3), Tail: uint64(len(chain) / 2)}},
},
} }
for i, tt := range tests { for i, tt := range tests {
// Create a fresh database and initialize it with the starting state // Create a fresh database and initialize it with the starting state
db := rawdb.NewMemoryDatabase() db := rawdb.NewMemoryDatabase()
rawdb.WriteHeader(db, chain[0])
rawdb.WriteBlock(db, types.NewBlockWithHeader(chain[0]))
rawdb.WriteReceipts(db, chain[0].Hash(), chain[0].Number.Uint64(), types.Receipts{})
// Create a peer set to feed headers through // Create a peer set to feed headers through
peerset := newPeerSet() peerset := newPeerSet()
@ -780,8 +822,43 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
peerset.Unregister(peer) peerset.Unregister(peer)
dropped[peer]++ dropped[peer]++
} }
// Create a backfiller if we need to run more advanced tests
filler := newHookedBackfiller()
if tt.fill {
var filled *types.Header
filler = &hookedBackfiller{
resumeHook: func() {
var progress skeletonProgress
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
for progress.Subchains[0].Tail < progress.Subchains[0].Head {
header := rawdb.ReadSkeletonHeader(db, progress.Subchains[0].Tail)
rawdb.WriteBlock(db, types.NewBlockWithHeader(header))
rawdb.WriteReceipts(db, header.Hash(), header.Number.Uint64(), types.Receipts{})
rawdb.DeleteSkeletonHeader(db, header.Number.Uint64())
progress.Subchains[0].Tail++
progress.Subchains[0].Next = header.Hash()
}
filled = rawdb.ReadSkeletonHeader(db, progress.Subchains[0].Tail)
rawdb.WriteBlock(db, types.NewBlockWithHeader(filled))
rawdb.WriteReceipts(db, filled.Hash(), filled.Number.Uint64(), types.Receipts{})
},
suspendHook: func() *types.Header {
prev := filled
filled = nil
return prev
},
}
}
// Create a skeleton sync and run a cycle // Create a skeleton sync and run a cycle
skeleton := newSkeleton(db, peerset, drop, newHookedBackfiller()) skeleton := newSkeleton(db, peerset, drop, filler)
skeleton.Sync(tt.head, true) skeleton.Sync(tt.head, true)
var progress skeletonProgress var progress skeletonProgress
@ -815,6 +892,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
t.Error(err) t.Error(err)
continue continue
} }
if !tt.unpredictable {
var served uint64 var served uint64
for _, peer := range tt.peers { for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served) served += atomic.LoadUint64(&peer.served)
@ -829,6 +907,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
if drops != tt.middrop { if drops != tt.middrop {
t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
} }
}
// Apply the post-init events if there's any // Apply the post-init events if there's any
if tt.newHead != nil { if tt.newHead != nil {
skeleton.Sync(tt.newHead, true) skeleton.Sync(tt.newHead, true)
@ -868,7 +947,8 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
continue continue
} }
// Check that the peers served no more headers than we actually needed // Check that the peers served no more headers than we actually needed
served = 0 if !tt.unpredictable {
served := uint64(0)
for _, peer := range tt.peers { for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served) served += atomic.LoadUint64(&peer.served)
} }
@ -878,16 +958,17 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
if served != tt.endserve { if served != tt.endserve {
t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve) t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
} }
drops = 0 drops := uint64(0)
for _, peer := range tt.peers { for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped) drops += atomic.LoadUint64(&peer.dropped)
} }
if tt.newPeer != nil { if tt.newPeer != nil {
drops += atomic.LoadUint64(&tt.newPeer.dropped) drops += atomic.LoadUint64(&tt.newPeer.dropped)
} }
if drops != tt.middrop { if drops != tt.enddrop {
t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
} }
}
// Clean up any leftover skeleton sync resources // Clean up any leftover skeleton sync resources
skeleton.Terminate() skeleton.Terminate()
} }