Patch for concurrent iterator & others (onto v1.11.6) #386
@ -36,6 +36,7 @@ type beaconBackfiller struct {
|
||||
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
|
||||
success func() // Callback to run on successful sync cycle completion
|
||||
filling bool // Flag whether the downloader is backfilling or not
|
||||
filled *types.Header // Last header filled by the last terminated sync loop
|
||||
started chan struct{} // Notification channel whether the downloader inited
|
||||
lock sync.Mutex // Mutex protecting the sync lock
|
||||
}
|
||||
@ -48,16 +49,18 @@ func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
|
||||
}
|
||||
}
|
||||
|
||||
// suspend cancels any background downloader threads.
|
||||
func (b *beaconBackfiller) suspend() {
|
||||
// suspend cancels any background downloader threads and returns the last header
|
||||
// that has been successfully backfilled.
|
||||
func (b *beaconBackfiller) suspend() *types.Header {
|
||||
// If no filling is running, don't waste cycles
|
||||
b.lock.Lock()
|
||||
filling := b.filling
|
||||
filled := b.filled
|
||||
started := b.started
|
||||
b.lock.Unlock()
|
||||
|
||||
if !filling {
|
||||
return
|
||||
return filled // Return the filled header on the previous sync completion
|
||||
}
|
||||
// A previous filling should be running, though it may happen that it hasn't
|
||||
// yet started (being done on a new goroutine). Many concurrent beacon head
|
||||
@ -69,6 +72,10 @@ func (b *beaconBackfiller) suspend() {
|
||||
// Now that we're sure the downloader successfully started up, we can cancel
|
||||
// it safely without running the risk of data races.
|
||||
b.downloader.Cancel()
|
||||
|
||||
// Sync cycle was just terminated, retrieve and return the last filled header.
|
||||
// Can't use `filled` as that contains a stale value from before cancellation.
|
||||
return b.downloader.blockchain.CurrentFastBlock().Header()
|
||||
}
|
||||
|
||||
// resume starts the downloader threads for backfilling state and chain data.
|
||||
@ -81,6 +88,7 @@ func (b *beaconBackfiller) resume() {
|
||||
return
|
||||
}
|
||||
b.filling = true
|
||||
b.filled = nil
|
||||
b.started = make(chan struct{})
|
||||
mode := b.syncMode
|
||||
b.lock.Unlock()
|
||||
@ -92,6 +100,7 @@ func (b *beaconBackfiller) resume() {
|
||||
defer func() {
|
||||
b.lock.Lock()
|
||||
b.filling = false
|
||||
b.filled = b.downloader.blockchain.CurrentFastBlock().Header()
|
||||
b.lock.Unlock()
|
||||
}()
|
||||
// If the downloader fails, report an error as in beacon chain mode there
|
||||
|
@ -19,6 +19,7 @@ package downloader
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"time"
|
||||
@ -148,11 +149,15 @@ type backfiller interface {
|
||||
// based on the skeleton chain as it might be invalid. The backfiller should
|
||||
// gracefully handle multiple consecutive suspends without a resume, even
|
||||
// on initial sartup.
|
||||
suspend()
|
||||
//
|
||||
// The method should return the last block header that has been successfully
|
||||
// backfilled, or nil if the backfiller was not resumed.
|
||||
suspend() *types.Header
|
||||
|
||||
// resume requests the backfiller to start running fill or snap sync based on
|
||||
// the skeleton chain as it has successfully been linked. Appending new heads
|
||||
// to the end of the chain will not result in suspend/resume cycles.
|
||||
// leaking too much sync logic out to the filler.
|
||||
resume()
|
||||
}
|
||||
|
||||
@ -358,8 +363,17 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
|
||||
if linked {
|
||||
s.filler.resume()
|
||||
}
|
||||
defer s.filler.suspend()
|
||||
|
||||
defer func() {
|
||||
if filled := s.filler.suspend(); filled != nil {
|
||||
// If something was filled, try to delete stale sync helpers. If
|
||||
// unsuccessful, warn the user, but not much else we can do (it's
|
||||
// a programming error, just let users report an issue and don't
|
||||
// choke in the meantime).
|
||||
if err := s.cleanStales(filled); err != nil {
|
||||
log.Error("Failed to clean stale beacon headers", "err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
// Create a set of unique channels for this sync cycle. We need these to be
|
||||
// ephemeral so a data race doesn't accidentally deliver something stale on
|
||||
// a persistent channel across syncs (yup, this happened)
|
||||
@ -582,8 +596,16 @@ func (s *skeleton) processNewHead(head *types.Header, force bool) bool {
|
||||
|
||||
lastchain := s.progress.Subchains[0]
|
||||
if lastchain.Tail >= number {
|
||||
// If the chain is down to a single beacon header, and it is re-announced
|
||||
// once more, ignore it instead of tearing down sync for a noop.
|
||||
if lastchain.Head == lastchain.Tail {
|
||||
if current := rawdb.ReadSkeletonHeader(s.db, number); current.Hash() == head.Hash() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
// Not a noop / double head announce, abort with a reorg
|
||||
if force {
|
||||
log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "newHead", number)
|
||||
log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "head", lastchain.Head, "newHead", number)
|
||||
}
|
||||
return true
|
||||
}
|
||||
@ -943,12 +965,44 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
|
||||
// If the beacon chain was linked to the local chain, completely swap out
|
||||
// all internal progress and abort header synchronization.
|
||||
if linked {
|
||||
// Note, linking into the local chain should also mean that there are
|
||||
// no leftover subchains, but just in case there's some junk due to
|
||||
// strange conditions or bugs, clean up all internal state.
|
||||
if len(s.progress.Subchains) > 1 {
|
||||
log.Error("Cleaning up leftovers after beacon link")
|
||||
s.progress.Subchains = s.progress.Subchains[:1]
|
||||
// Linking into the local chain should also mean that there are no
|
||||
// leftover subchains, but in the case of importing the blocks via
|
||||
// the engine API, we will not push the subchains forward. This will
|
||||
// lead to a gap between an old sync cycle and a future one.
|
||||
if subchains := len(s.progress.Subchains); subchains > 1 {
|
||||
switch {
|
||||
// If there are only 2 subchains - the current one and an older
|
||||
// one - and the old one consists of a single block, then it's
|
||||
// the expected new sync cycle after some propagated blocks. Log
|
||||
// it for debugging purposes, explicitly clean and don't escalate.
|
||||
case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail:
|
||||
log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
|
||||
rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
|
||||
s.progress.Subchains = s.progress.Subchains[:1]
|
||||
|
||||
// If we have more than one header or more than one leftover chain,
|
||||
// the syncer's internal state is corrupted. Do try to fix it, but
|
||||
// be very vocal about the fault.
|
||||
default:
|
||||
var context []interface{}
|
||||
|
||||
for i := range s.progress.Subchains[1:] {
|
||||
context = append(context, fmt.Sprintf("stale_head_%d", i+1))
|
||||
context = append(context, s.progress.Subchains[i+1].Head)
|
||||
context = append(context, fmt.Sprintf("stale_tail_%d", i+1))
|
||||
context = append(context, s.progress.Subchains[i+1].Tail)
|
||||
context = append(context, fmt.Sprintf("stale_next_%d", i+1))
|
||||
context = append(context, s.progress.Subchains[i+1].Next)
|
||||
}
|
||||
log.Error("Cleaning spurious beacon sync leftovers", context...)
|
||||
s.progress.Subchains = s.progress.Subchains[:1]
|
||||
|
||||
// Note, here we didn't actually delete the headers at all,
|
||||
// just the metadata. We could implement a cleanup mechanism,
|
||||
// but further modifying corrupted state is kind of asking
|
||||
// for it. Unless there's a good enough reason to risk it,
|
||||
// better to live with the small database junk.
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
@ -1023,6 +1077,74 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
|
||||
return linked, merged
|
||||
}
|
||||
|
||||
// cleanStales removes previously synced beacon headers that have become stale
|
||||
// due to the downloader backfilling past the tracked tail.
|
||||
func (s *skeleton) cleanStales(filled *types.Header) error {
|
||||
number := filled.Number.Uint64()
|
||||
log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())
|
||||
|
||||
// If the filled header is below the linked subchain, something's
|
||||
// corrupted internally. Report and error and refuse to do anything.
|
||||
if number < s.progress.Subchains[0].Tail {
|
||||
return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail)
|
||||
}
|
||||
// Subchain seems trimmable, push the tail forward up to the last
|
||||
// filled header and delete everything before it - if available. In
|
||||
// case we filled past the head, recreate the subchain with a new
|
||||
// head to keep it consistent with the data on disk.
|
||||
var (
|
||||
start = s.progress.Subchains[0].Tail // start deleting from the first known header
|
||||
end = number // delete until the requested threshold
|
||||
)
|
||||
s.progress.Subchains[0].Tail = number
|
||||
s.progress.Subchains[0].Next = filled.ParentHash
|
||||
|
||||
if s.progress.Subchains[0].Head < number {
|
||||
// If more headers were filled than available, push the entire
|
||||
// subchain forward to keep tracking the node's block imports
|
||||
end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head
|
||||
s.progress.Subchains[0].Head = number // assign a new head (tail is already assigned to this)
|
||||
}
|
||||
// Execute the trimming and the potential rewiring of the progress
|
||||
batch := s.db.NewBatch()
|
||||
|
||||
if end != number {
|
||||
// The entire original skeleton chain was deleted and a new one
|
||||
// defined. Make sure the new single-header chain gets pushed to
|
||||
// disk to keep internal state consistent.
|
||||
rawdb.WriteSkeletonHeader(batch, filled)
|
||||
}
|
||||
s.saveSyncStatus(batch)
|
||||
for n := start; n < end; n++ {
|
||||
// If the batch grew too big, flush it and continue with a new batch.
|
||||
// The catch is that the sync metadata needs to reflect the actually
|
||||
// flushed state, so temporarily change the subchain progress and
|
||||
// revert after the flush.
|
||||
if batch.ValueSize() >= ethdb.IdealBatchSize {
|
||||
tmpTail := s.progress.Subchains[0].Tail
|
||||
tmpNext := s.progress.Subchains[0].Next
|
||||
|
||||
s.progress.Subchains[0].Tail = n
|
||||
s.progress.Subchains[0].Next = rawdb.ReadSkeletonHeader(s.db, n).ParentHash
|
||||
s.saveSyncStatus(batch)
|
||||
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Failed to write beacon trim data", "err", err)
|
||||
}
|
||||
batch.Reset()
|
||||
|
||||
s.progress.Subchains[0].Tail = tmpTail
|
||||
s.progress.Subchains[0].Next = tmpNext
|
||||
s.saveSyncStatus(batch)
|
||||
}
|
||||
rawdb.DeleteSkeletonHeader(batch, n)
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Failed to write beacon trim data", "err", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Bounds retrieves the current head and tail tracked by the skeleton syncer.
|
||||
// This method is used by the backfiller, whose life cycle is controlled by the
|
||||
// skeleton syncer.
|
||||
|
@ -55,10 +55,11 @@ func newHookedBackfiller() backfiller {
|
||||
// based on the skeleton chain as it might be invalid. The backfiller should
|
||||
// gracefully handle multiple consecutive suspends without a resume, even
|
||||
// on initial sartup.
|
||||
func (hf *hookedBackfiller) suspend() {
|
||||
func (hf *hookedBackfiller) suspend() *types.Header {
|
||||
if hf.suspendHook != nil {
|
||||
hf.suspendHook()
|
||||
}
|
||||
return nil // we don't really care about header cleanups for now
|
||||
}
|
||||
|
||||
// resume requests the backfiller to start running fill or snap sync based on
|
||||
@ -426,7 +427,6 @@ func TestSkeletonSyncExtend(t *testing.T) {
|
||||
newstate: []*subchain{
|
||||
{Head: 49, Tail: 49},
|
||||
},
|
||||
err: errReorgDenied,
|
||||
},
|
||||
// Initialize a sync and try to extend it with a sibling block.
|
||||
{
|
||||
@ -489,7 +489,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
|
||||
|
||||
<-wait
|
||||
if err := skeleton.Sync(tt.extend, false); err != tt.err {
|
||||
t.Errorf("extension failure mismatch: have %v, want %v", err, tt.err)
|
||||
t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err)
|
||||
}
|
||||
skeleton.Terminate()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user