eth/downloader: remove header rollback mechanism (#28147)
* eth/downloader: remove rollback mechanism in downloader * eth/downloader: remove the tests
This commit is contained in:
parent
adb9b319c9
commit
b85c183ea7
@ -1280,41 +1280,13 @@ func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error {
|
|||||||
// keeps processing and scheduling them into the header chain and downloader's
|
// keeps processing and scheduling them into the header chain and downloader's
|
||||||
// queue until the stream ends or a failure occurs.
|
// queue until the stream ends or a failure occurs.
|
||||||
func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error {
|
func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error {
|
||||||
// Keep a count of uncertain headers to roll back
|
|
||||||
var (
|
var (
|
||||||
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
|
mode = d.getMode()
|
||||||
rollbackErr error
|
gotHeaders = false // Wait for batches of headers to process
|
||||||
mode = d.getMode()
|
|
||||||
)
|
)
|
||||||
defer func() {
|
|
||||||
if rollback > 0 {
|
|
||||||
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
|
|
||||||
if mode != LightSync {
|
|
||||||
lastFastBlock = d.blockchain.CurrentSnapBlock().Number
|
|
||||||
lastBlock = d.blockchain.CurrentBlock().Number
|
|
||||||
}
|
|
||||||
if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
|
|
||||||
// We're already unwinding the stack, only print the error to make it more visible
|
|
||||||
log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
|
|
||||||
}
|
|
||||||
curFastBlock, curBlock := common.Big0, common.Big0
|
|
||||||
if mode != LightSync {
|
|
||||||
curFastBlock = d.blockchain.CurrentSnapBlock().Number
|
|
||||||
curBlock = d.blockchain.CurrentBlock().Number
|
|
||||||
}
|
|
||||||
log.Warn("Rolled back chain segment",
|
|
||||||
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
|
|
||||||
"snap", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
|
|
||||||
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
// Wait for batches of headers to process
|
|
||||||
gotHeaders := false
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-d.cancelCh:
|
case <-d.cancelCh:
|
||||||
rollbackErr = errCanceled
|
|
||||||
return errCanceled
|
return errCanceled
|
||||||
|
|
||||||
case task := <-d.headerProcCh:
|
case task := <-d.headerProcCh:
|
||||||
@ -1363,8 +1335,6 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Disable any rollback and return
|
|
||||||
rollback = 0
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// Otherwise split the chunk of headers into batches and process them
|
// Otherwise split the chunk of headers into batches and process them
|
||||||
@ -1375,7 +1345,6 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
|
|||||||
// Terminate if something failed in between processing chunks
|
// Terminate if something failed in between processing chunks
|
||||||
select {
|
select {
|
||||||
case <-d.cancelCh:
|
case <-d.cancelCh:
|
||||||
rollbackErr = errCanceled
|
|
||||||
return errCanceled
|
return errCanceled
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
@ -1422,29 +1391,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
|
|||||||
}
|
}
|
||||||
if len(chunkHeaders) > 0 {
|
if len(chunkHeaders) > 0 {
|
||||||
if n, err := d.lightchain.InsertHeaderChain(chunkHeaders); err != nil {
|
if n, err := d.lightchain.InsertHeaderChain(chunkHeaders); err != nil {
|
||||||
rollbackErr = err
|
|
||||||
|
|
||||||
// If some headers were inserted, track them as uncertain
|
|
||||||
if mode == SnapSync && n > 0 && rollback == 0 {
|
|
||||||
rollback = chunkHeaders[0].Number.Uint64()
|
|
||||||
}
|
|
||||||
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
|
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
|
||||||
return fmt.Errorf("%w: %v", errInvalidChain, err)
|
return fmt.Errorf("%w: %v", errInvalidChain, err)
|
||||||
}
|
}
|
||||||
// All verifications passed, track all headers within the allowed limits
|
|
||||||
if mode == SnapSync {
|
|
||||||
head := chunkHeaders[len(chunkHeaders)-1].Number.Uint64()
|
|
||||||
if head-rollback > uint64(fsHeaderSafetyNet) {
|
|
||||||
rollback = head - uint64(fsHeaderSafetyNet)
|
|
||||||
} else {
|
|
||||||
rollback = 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if len(rejected) != 0 {
|
if len(rejected) != 0 {
|
||||||
// Merge threshold reached, stop importing, but don't roll back
|
|
||||||
rollback = 0
|
|
||||||
|
|
||||||
log.Info("Legacy sync reached merge threshold", "number", rejected[0].Number, "hash", rejected[0].Hash(), "td", td, "ttd", ttd)
|
log.Info("Legacy sync reached merge threshold", "number", rejected[0].Number, "hash", rejected[0].Hash(), "td", td, "ttd", ttd)
|
||||||
return ErrMergeTransition
|
return ErrMergeTransition
|
||||||
}
|
}
|
||||||
@ -1455,7 +1406,6 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
|
|||||||
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
|
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
|
||||||
select {
|
select {
|
||||||
case <-d.cancelCh:
|
case <-d.cancelCh:
|
||||||
rollbackErr = errCanceled
|
|
||||||
return errCanceled
|
return errCanceled
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
}
|
}
|
||||||
@ -1463,7 +1413,6 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
|
|||||||
// Otherwise insert the headers for content retrieval
|
// Otherwise insert the headers for content retrieval
|
||||||
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
|
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
|
||||||
if len(inserts) != len(chunkHeaders) {
|
if len(inserts) != len(chunkHeaders) {
|
||||||
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunkHeaders))
|
|
||||||
return fmt.Errorf("%w: stale headers", errBadPeer)
|
return fmt.Errorf("%w: stale headers", errBadPeer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -878,86 +878,6 @@ func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) {
|
|||||||
assertOwnChain(t, tester, len(chain.blocks))
|
assertOwnChain(t, tester, len(chain.blocks))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that upon detecting an invalid header, the recent ones are rolled back
|
|
||||||
// for various failure scenarios. Afterwards a full sync is attempted to make
|
|
||||||
// sure no state was corrupted.
|
|
||||||
func TestInvalidHeaderRollback66Snap(t *testing.T) { testInvalidHeaderRollback(t, eth.ETH66, SnapSync) }
|
|
||||||
func TestInvalidHeaderRollback67Snap(t *testing.T) { testInvalidHeaderRollback(t, eth.ETH67, SnapSync) }
|
|
||||||
|
|
||||||
func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) {
|
|
||||||
tester := newTester(t)
|
|
||||||
defer tester.terminate()
|
|
||||||
|
|
||||||
// Create a small enough block chain to download
|
|
||||||
targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
|
|
||||||
chain := testChainBase.shorten(targetBlocks)
|
|
||||||
|
|
||||||
// Attempt to sync with an attacker that feeds junk during the fast sync phase.
|
|
||||||
// This should result in the last fsHeaderSafetyNet headers being rolled back.
|
|
||||||
missing := fsHeaderSafetyNet + MaxHeaderFetch + 1
|
|
||||||
|
|
||||||
fastAttacker := tester.newPeer("fast-attack", protocol, chain.blocks[1:])
|
|
||||||
fastAttacker.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{}
|
|
||||||
|
|
||||||
if err := tester.sync("fast-attack", nil, mode); err == nil {
|
|
||||||
t.Fatalf("succeeded fast attacker synchronisation")
|
|
||||||
}
|
|
||||||
if head := tester.chain.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
|
|
||||||
t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
|
|
||||||
}
|
|
||||||
// Attempt to sync with an attacker that feeds junk during the block import phase.
|
|
||||||
// This should result in both the last fsHeaderSafetyNet number of headers being
|
|
||||||
// rolled back, and also the pivot point being reverted to a non-block status.
|
|
||||||
missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
|
|
||||||
|
|
||||||
blockAttacker := tester.newPeer("block-attack", protocol, chain.blocks[1:])
|
|
||||||
fastAttacker.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{} // Make sure the fast-attacker doesn't fill in
|
|
||||||
blockAttacker.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{}
|
|
||||||
|
|
||||||
if err := tester.sync("block-attack", nil, mode); err == nil {
|
|
||||||
t.Fatalf("succeeded block attacker synchronisation")
|
|
||||||
}
|
|
||||||
if head := tester.chain.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
|
|
||||||
t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
|
|
||||||
}
|
|
||||||
if mode == SnapSync {
|
|
||||||
if head := tester.chain.CurrentBlock().Number.Uint64(); head != 0 {
|
|
||||||
t.Errorf("fast sync pivot block #%d not rolled back", head)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Attempt to sync with an attacker that withholds promised blocks after the
|
|
||||||
// fast sync pivot point. This could be a trial to leave the node with a bad
|
|
||||||
// but already imported pivot block.
|
|
||||||
withholdAttacker := tester.newPeer("withhold-attack", protocol, chain.blocks[1:])
|
|
||||||
|
|
||||||
tester.downloader.syncInitHook = func(uint64, uint64) {
|
|
||||||
for i := missing; i < len(chain.blocks); i++ {
|
|
||||||
withholdAttacker.withholdHeaders[chain.blocks[i].Hash()] = struct{}{}
|
|
||||||
}
|
|
||||||
tester.downloader.syncInitHook = nil
|
|
||||||
}
|
|
||||||
if err := tester.sync("withhold-attack", nil, mode); err == nil {
|
|
||||||
t.Fatalf("succeeded withholding attacker synchronisation")
|
|
||||||
}
|
|
||||||
if head := tester.chain.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
|
|
||||||
t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
|
|
||||||
}
|
|
||||||
if mode == SnapSync {
|
|
||||||
if head := tester.chain.CurrentBlock().Number.Uint64(); head != 0 {
|
|
||||||
t.Errorf("fast sync pivot block #%d not rolled back", head)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Synchronise with the valid peer and make sure sync succeeds. Since the last rollback
|
|
||||||
// should also disable fast syncing for this process, verify that we did a fresh full
|
|
||||||
// sync. Note, we can't assert anything about the receipts since we won't purge the
|
|
||||||
// database of them, hence we can't use assertOwnChain.
|
|
||||||
tester.newPeer("valid", protocol, chain.blocks[1:])
|
|
||||||
if err := tester.sync("valid", nil, mode); err != nil {
|
|
||||||
t.Fatalf("failed to synchronise blocks: %v", err)
|
|
||||||
}
|
|
||||||
assertOwnChain(t, tester, len(chain.blocks))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tests that a peer advertising a high TD doesn't get to stall the downloader
|
// Tests that a peer advertising a high TD doesn't get to stall the downloader
|
||||||
// afterwards by not sending any useful hashes.
|
// afterwards by not sending any useful hashes.
|
||||||
func TestHighTDStarvationAttack66Full(t *testing.T) {
|
func TestHighTDStarvationAttack66Full(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user