eth/downloader: create repro testcase for beacon header loss

This commit is contained in:
Péter Szilágyi 2023-01-09 12:12:25 +02:00
parent 686f7438d3
commit 71f7988b0f
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
2 changed files with 120 additions and 39 deletions

View File

@ -520,7 +520,7 @@ func (s *skeleton) initSync(head *types.Header) {
}
break
}
// If the last subchain can be extended, we're lucky. Otherwise create
// If the last subchain can be extended, we're lucky. Otherwise, create
// a new subchain sync task.
var extended bool
if n := len(s.progress.Subchains); n > 0 {

View File

@ -37,7 +37,7 @@ import (
type hookedBackfiller struct {
// suspendHook is an optional hook to be called when the filler is requested
// to be suspended.
suspendHook func()
suspendHook func() *types.Header
// resumeHook is an optional hook to be called when the filler is requested
// to be resumed.
@ -56,7 +56,7 @@ func newHookedBackfiller() backfiller {
// on initial startup.
func (hf *hookedBackfiller) suspend() *types.Header {
if hf.suspendHook != nil {
hf.suspendHook()
return hf.suspendHook()
}
return nil // we don't really care about header cleanups for now
}
@ -525,9 +525,21 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
Number: big.NewInt(int64(i)),
})
}
// Some tests require a forking side chain to trigger cornercases.
var sidechain []*types.Header
for i := 0; i < len(chain)/2; i++ { // Fork at block #5000
sidechain = append(sidechain, chain[i])
}
for i := len(chain) / 2; i < len(chain); i++ {
sidechain = append(sidechain, &types.Header{
ParentHash: sidechain[i-1].Hash(),
Number: big.NewInt(int64(i)),
Extra: []byte("B"), // force a different hash
})
}
tests := []struct {
headers []*types.Header // Database content (beside the genesis)
oldstate []*subchain // Old sync state with various interrupted subchains
fill bool // Whether to run a real backfiller in this test case
unpredictable bool // Whether to ignore drops/serves due to uncertain packet assignments
head *types.Header // New head header to announce to reorg to
peers []*skeletonTestPeer // Initial peer set to start the sync with
@ -760,11 +772,41 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
endstate: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}},
endserve: 4 * requestHeaders,
},
// This test reproduces a bug caught by (@rjl493456442) where a skeleton
// header goes missing, causing the sync to get stuck and/or panic.
//
// The setup requires a previously successfully synced chain up to a block
// height N. That results is a single skeleton header (block N) and a single
// subchain (head N, Tail N) being stored on disk.
//
// The following step requires a new sync cycle to a new side chain of a
// height higher than N, and an ancestor lower than N (e.g. N-2, N+2).
// In this scenario, when processing a batch of headers, a link point of
// N-2 will be found, meaning that N-1 and N have been overwritten.
//
// The link event triggers an early exit, noticing that the previous sub-
// chain is a leftover and deletes it (with it's skeleton header N). But
// since skeleton header N has been overwritten to the new side chain, we
// end up losing it and creating a gap.
{
fill: true,
unpredictable: true, // We have good and bad peer too, bad may be dropped, test too short for certainty
head: chain[len(chain)/2+1], // Sync up until the sidechain common ancestor + 2
peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-oldchain", chain)},
midstate: []*subchain{{Head: uint64(len(chain)/2 + 1), Tail: 1}},
newHead: sidechain[len(sidechain)/2+3], // Sync up until the sidechain common ancestor + 4
newPeer: newSkeletonTestPeer("test-peer-newchain", sidechain),
endstate: []*subchain{{Head: uint64(len(sidechain)/2 + 3), Tail: uint64(len(chain) / 2)}},
},
}
for i, tt := range tests {
// Create a fresh database and initialize it with the starting state
db := rawdb.NewMemoryDatabase()
rawdb.WriteHeader(db, chain[0])
rawdb.WriteBlock(db, types.NewBlockWithHeader(chain[0]))
rawdb.WriteReceipts(db, chain[0].Hash(), chain[0].Number.Uint64(), types.Receipts{})
// Create a peer set to feed headers through
peerset := newPeerSet()
@ -780,8 +822,43 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
peerset.Unregister(peer)
dropped[peer]++
}
// Create a backfiller if we need to run more advanced tests
filler := newHookedBackfiller()
if tt.fill {
var filled *types.Header
filler = &hookedBackfiller{
resumeHook: func() {
var progress skeletonProgress
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
for progress.Subchains[0].Tail < progress.Subchains[0].Head {
header := rawdb.ReadSkeletonHeader(db, progress.Subchains[0].Tail)
rawdb.WriteBlock(db, types.NewBlockWithHeader(header))
rawdb.WriteReceipts(db, header.Hash(), header.Number.Uint64(), types.Receipts{})
rawdb.DeleteSkeletonHeader(db, header.Number.Uint64())
progress.Subchains[0].Tail++
progress.Subchains[0].Next = header.Hash()
}
filled = rawdb.ReadSkeletonHeader(db, progress.Subchains[0].Tail)
rawdb.WriteBlock(db, types.NewBlockWithHeader(filled))
rawdb.WriteReceipts(db, filled.Hash(), filled.Number.Uint64(), types.Receipts{})
},
suspendHook: func() *types.Header {
prev := filled
filled = nil
return prev
},
}
}
// Create a skeleton sync and run a cycle
skeleton := newSkeleton(db, peerset, drop, newHookedBackfiller())
skeleton := newSkeleton(db, peerset, drop, filler)
skeleton.Sync(tt.head, true)
var progress skeletonProgress
@ -815,19 +892,21 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
t.Error(err)
continue
}
var served uint64
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if served != tt.midserve {
t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
}
var drops uint64
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if drops != tt.middrop {
t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
if !tt.unpredictable {
var served uint64
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if served != tt.midserve {
t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
}
var drops uint64
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if drops != tt.middrop {
t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
}
}
// Apply the post-init events if there's any
if tt.newHead != nil {
@ -868,25 +947,27 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
continue
}
// Check that the peers served no more headers than we actually needed
served = 0
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if tt.newPeer != nil {
served += atomic.LoadUint64(&tt.newPeer.served)
}
if served != tt.endserve {
t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
}
drops = 0
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if tt.newPeer != nil {
drops += atomic.LoadUint64(&tt.newPeer.dropped)
}
if drops != tt.middrop {
t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
if !tt.unpredictable {
served := uint64(0)
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if tt.newPeer != nil {
served += atomic.LoadUint64(&tt.newPeer.served)
}
if served != tt.endserve {
t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
}
drops := uint64(0)
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if tt.newPeer != nil {
drops += atomic.LoadUint64(&tt.newPeer.dropped)
}
if drops != tt.enddrop {
t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
}
}
// Clean up any leftover skeleton sync resources
skeleton.Terminate()