diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go
index e2b13e991..31e26f7c9 100644
--- a/eth/downloader/beaconsync.go
+++ b/eth/downloader/beaconsync.go
@@ -36,6 +36,7 @@ type beaconBackfiller struct {
 	syncMode SyncMode      // Sync mode to use for backfilling the skeleton chains
 	success  func()        // Callback to run on successful sync cycle completion
 	filling  bool          // Flag whether the downloader is backfilling or not
+	filled   *types.Header // Last header filled by the last terminated sync loop
 	started  chan struct{} // Notification channel whether the downloader inited
 	lock     sync.Mutex    // Mutex protecting the sync lock
 }
@@ -48,16 +49,18 @@ func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
 	}
 }
 
-// suspend cancels any background downloader threads.
-func (b *beaconBackfiller) suspend() {
+// suspend cancels any background downloader threads and returns the last header
+// that has been successfully backfilled.
+func (b *beaconBackfiller) suspend() *types.Header {
 	// If no filling is running, don't waste cycles
 	b.lock.Lock()
 	filling := b.filling
+	filled := b.filled
 	started := b.started
 	b.lock.Unlock()
 
 	if !filling {
-		return
+		return filled // Return the filled header on the previous sync completion
 	}
 	// A previous filling should be running, though it may happen that it hasn't
 	// yet started (being done on a new goroutine). Many concurrent beacon head
@@ -69,6 +72,10 @@ func (b *beaconBackfiller) suspend() {
 	// Now that we're sure the downloader successfully started up, we can cancel
 	// it safely without running the risk of data races.
 	b.downloader.Cancel()
+
+	// Sync cycle was just terminated, retrieve and return the last filled header.
+	// Can't use `filled` as that contains a stale value from before cancellation.
+	return b.downloader.blockchain.CurrentFastBlock().Header()
 }
 
 // resume starts the downloader threads for backfilling state and chain data.
@@ -81,6 +88,7 @@ func (b *beaconBackfiller) resume() {
 		return
 	}
 	b.filling = true
+	b.filled = nil
 	b.started = make(chan struct{})
 	mode := b.syncMode
 	b.lock.Unlock()
@@ -92,6 +100,7 @@ func (b *beaconBackfiller) resume() {
 		defer func() {
 			b.lock.Lock()
 			b.filling = false
+			b.filled = b.downloader.blockchain.CurrentFastBlock().Header()
 			b.lock.Unlock()
 		}()
 		// If the downloader fails, report an error as in beacon chain mode there
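The beaconsync.go changes hinge on a small locking discipline: `filled` is only trusted while no fill cycle is running; once a live cycle is interrupted, the cached value is stale and the progress must be re-read. The following standalone sketch illustrates that pattern; the names (`fillerState`, `finish`, `current`) are invented for illustration and a plain int stands in for `blockchain.CurrentFastBlock().Header()`:

package main

import (
	"fmt"
	"sync"
)

type fillerState struct {
	lock    sync.Mutex
	filling bool
	filled  int // stand-in for the last backfilled header
	current int // stand-in for the chain's current fast block
}

// resume marks a new fill cycle and clears the stale result of the last one.
func (f *fillerState) resume() {
	f.lock.Lock()
	f.filling = true
	f.filled = 0
	f.lock.Unlock()
}

// finish records the final progress when a fill cycle terminates on its own.
func (f *fillerState) finish() {
	f.lock.Lock()
	f.filling = false
	f.filled = f.current
	f.lock.Unlock()
}

// suspend returns the last completed progress if idle, or the live progress
// if a running cycle had to be interrupted (the cache is stale in that case).
func (f *fillerState) suspend() int {
	f.lock.Lock()
	filling, filled := f.filling, f.filled
	f.lock.Unlock()

	if !filling {
		return filled // previous cycle's result is still accurate
	}
	// ... cancel the running fill cycle here ...
	return f.current // re-read after cancellation, don't trust the cache
}

func main() {
	f := &fillerState{}
	f.resume()
	f.current = 42             // the interrupted cycle got this far
	fmt.Println(f.suspend())   // prints 42: live progress, not the cleared cache
}

Running it prints 42: suspend ignores the cleared cache and reports the live progress of the interrupted cycle, which is what the defer in resume would otherwise record on a clean shutdown.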
diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go
index bebf273da..8df3a26e0 100644
--- a/eth/downloader/skeleton.go
+++ b/eth/downloader/skeleton.go
@@ -19,6 +19,7 @@ package downloader
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"math/rand"
 	"sort"
 	"time"
@@ -148,11 +149,15 @@ type backfiller interface {
 	// based on the skeleton chain as it might be invalid. The backfiller should
 	// gracefully handle multiple consecutive suspends without a resume, even
 	// on initial sartup.
-	suspend()
+	//
+	// The method should return the last block header that has been successfully
+	// backfilled, or nil if the backfiller was not resumed.
+	suspend() *types.Header
 
 	// resume requests the backfiller to start running fill or snap sync based on
 	// the skeleton chain as it has successfully been linked. Appending new heads
 	// to the end of the chain will not result in suspend/resume cycles.
+	// This avoids leaking too much sync logic out to the filler.
 	resume()
 }
 
@@ -358,8 +363,17 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
 	if linked {
 		s.filler.resume()
 	}
-	defer s.filler.suspend()
-
+	defer func() {
+		if filled := s.filler.suspend(); filled != nil {
+			// If something was filled, try to delete stale sync helpers. If
+			// unsuccessful, warn the user, but not much else we can do (it's
+			// a programming error, just let users report an issue and don't
+			// choke in the meantime).
+			if err := s.cleanStales(filled); err != nil {
+				log.Error("Failed to clean stale beacon headers", "err", err)
+			}
+		}
+	}()
 	// Create a set of unique channels for this sync cycle. We need these to be
 	// ephemeral so a data race doesn't accidentally deliver something stale on
 	// a persistent channel across syncs (yup, this happened)
@@ -582,8 +596,16 @@ func (s *skeleton) processNewHead(head *types.Header, force bool) bool {
 
 	lastchain := s.progress.Subchains[0]
 	if lastchain.Tail >= number {
+		// If the chain is down to a single beacon header, and it is re-announced
+		// once more, ignore it instead of tearing down sync for a noop.
+		if lastchain.Head == lastchain.Tail {
+			if current := rawdb.ReadSkeletonHeader(s.db, number); current.Hash() == head.Hash() {
+				return false
+			}
+		}
+		// Not a noop / double head announce, abort with a reorg
 		if force {
-			log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "newHead", number)
+			log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "head", lastchain.Head, "newHead", number)
 		}
 		return true
 	}
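At its core, the head-announcement change above is a small decision: ignore a re-announced lone header, treat anything else at or below the tail as a reorg, and leave extensions to the existing path. A rough sketch of that decision, with a simplified `subchain` type, string stand-ins for hashes, and the extension handling elided:

package main

import "fmt"

type subchain struct {
	Head, Tail uint64
}

// isReorg reports whether announcing header `number` with hash `announced`
// forces a sync restart, given the tracked subchain and the hash currently
// stored for that number.
func isReorg(sc subchain, number uint64, announced, stored string) bool {
	if sc.Tail >= number {
		// At or below the tail: either the lone tracked header was simply
		// re-announced (a noop) or the beacon chain reorged underneath us.
		if sc.Head == sc.Tail && announced == stored {
			return false
		}
		return true
	}
	return false // announcements above the tail (extensions) handled elsewhere
}

func main() {
	sc := subchain{Head: 50, Tail: 50}
	fmt.Println(isReorg(sc, 50, "0xabc", "0xabc")) // false: same head re-announced
	fmt.Println(isReorg(sc, 49, "0xdef", "0xabc")) // true: reorg below the tail
}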
@@ -943,12 +965,44 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) {
 	// If the beacon chain was linked to the local chain, completely swap out
 	// all internal progress and abort header synchronization.
 	if linked {
-		// Note, linking into the local chain should also mean that there are
-		// no leftover subchains, but just in case there's some junk due to
-		// strange conditions or bugs, clean up all internal state.
-		if len(s.progress.Subchains) > 1 {
-			log.Error("Cleaning up leftovers after beacon link")
-			s.progress.Subchains = s.progress.Subchains[:1]
+		// Linking into the local chain should also mean that there are no
+		// leftover subchains, but in the case of importing the blocks via
+		// the engine API, we will not push the subchains forward. This will
+		// lead to a gap between an old sync cycle and a future one.
+		if subchains := len(s.progress.Subchains); subchains > 1 {
+			switch {
+			// If there are only 2 subchains - the current one and an older
+			// one - and the old one consists of a single block, then it's
+			// the expected new sync cycle after some propagated blocks. Log
+			// it for debugging purposes, explicitly clean and don't escalate.
+			case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail:
+				log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
+				rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
+				s.progress.Subchains = s.progress.Subchains[:1]
+
+			// If we have more than one header or more than one leftover chain,
+			// the syncer's internal state is corrupted. Do try to fix it, but
+			// be very vocal about the fault.
+			default:
+				var context []interface{}
+
+				for i := range s.progress.Subchains[1:] {
+					context = append(context, fmt.Sprintf("stale_head_%d", i+1))
+					context = append(context, s.progress.Subchains[i+1].Head)
+					context = append(context, fmt.Sprintf("stale_tail_%d", i+1))
+					context = append(context, s.progress.Subchains[i+1].Tail)
+					context = append(context, fmt.Sprintf("stale_next_%d", i+1))
+					context = append(context, s.progress.Subchains[i+1].Next)
+				}
+				log.Error("Cleaning spurious beacon sync leftovers", context...)
+				s.progress.Subchains = s.progress.Subchains[:1]
+
+				// Note, here we didn't actually delete the headers at all,
+				// just the metadata. We could implement a cleanup mechanism,
+				// but further modifying corrupted state is kind of asking
+				// for it. Unless there's a good enough reason to risk it,
+				// better to live with the small database junk.
+			}
 		}
 		break
 	}
@@ -1023,6 +1077,74 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) {
 	return linked, merged
 }
 
+// cleanStales removes previously synced beacon headers that have become stale
+// due to the downloader backfilling past the tracked tail.
+func (s *skeleton) cleanStales(filled *types.Header) error {
+	number := filled.Number.Uint64()
+	log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())
+
+	// If the filled header is below the linked subchain, something's
+	// corrupted internally. Report an error and refuse to do anything.
+	if number < s.progress.Subchains[0].Tail {
+		return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail)
+	}
+	// Subchain seems trimmable, push the tail forward up to the last
+	// filled header and delete everything before it - if available. In
+	// case we filled past the head, recreate the subchain with a new
+	// head to keep it consistent with the data on disk.
+	var (
+		start = s.progress.Subchains[0].Tail // start deleting from the first known header
+		end   = number                       // delete until the requested threshold
+	)
+	s.progress.Subchains[0].Tail = number
+	s.progress.Subchains[0].Next = filled.ParentHash
+
+	if s.progress.Subchains[0].Head < number {
+		// If more headers were filled than available, push the entire
+		// subchain forward to keep tracking the node's block imports
+		end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head
+		s.progress.Subchains[0].Head = number  // assign a new head (tail is already assigned to this)
+	}
+	// Execute the trimming and the potential rewiring of the progress
+	batch := s.db.NewBatch()
+
+	if end != number {
+		// The entire original skeleton chain was deleted and a new one
+		// defined. Make sure the new single-header chain gets pushed to
+		// disk to keep internal state consistent.
+		rawdb.WriteSkeletonHeader(batch, filled)
+	}
+	s.saveSyncStatus(batch)
+	for n := start; n < end; n++ {
+		// If the batch grew too big, flush it and continue with a new batch.
+		// The catch is that the sync metadata needs to reflect the actually
+		// flushed state, so temporarily change the subchain progress and
+		// revert after the flush.
+		if batch.ValueSize() >= ethdb.IdealBatchSize {
+			tmpTail := s.progress.Subchains[0].Tail
+			tmpNext := s.progress.Subchains[0].Next
+
+			s.progress.Subchains[0].Tail = n
+			s.progress.Subchains[0].Next = rawdb.ReadSkeletonHeader(s.db, n).ParentHash
+			s.saveSyncStatus(batch)
+
+			if err := batch.Write(); err != nil {
+				log.Crit("Failed to write beacon trim data", "err", err)
+			}
+			batch.Reset()
+
+			s.progress.Subchains[0].Tail = tmpTail
+			s.progress.Subchains[0].Next = tmpNext
+			s.saveSyncStatus(batch)
+		}
+		rawdb.DeleteSkeletonHeader(batch, n)
+	}
+	if err := batch.Write(); err != nil {
+		log.Crit("Failed to write beacon trim data", "err", err)
+	}
+	return nil
+}
+
 // Bounds retrieves the current head and tail tracked by the skeleton syncer.
 // This method is used by the backfiller, whose life cycle is controlled by the
 // skeleton syncer.
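The subtle part of cleanStales is flushing deletions in bounded batches while keeping the persisted progress consistent with what has actually been removed. Below is a rough sketch of that pattern under toy assumptions: `store` and `progress` stand in for the skeleton's database and progress tracking, and `idealBatchSize` is an arbitrary small threshold rather than `ethdb.IdealBatchSize`:

package main

import "fmt"

const idealBatchSize = 3 // flush after this many pending deletions (toy value)

type progress struct {
	Tail uint64 // first header still tracked by the subchain
}

type store struct {
	headers map[uint64]bool
	tail    uint64 // persisted copy of progress.Tail
}

// flush persists the pending deletions together with a tail value that is
// consistent with what has actually been removed so far.
func (st *store) flush(pending []uint64, tail uint64) {
	for _, n := range pending {
		delete(st.headers, n)
	}
	st.tail = tail
}

// trim deletes headers in [tail, filled) in bounded batches, making sure every
// intermediate flush leaves the persisted tail pointing at a header that still
// exists in the store.
func trim(st *store, prog *progress, filled uint64) {
	start, end := prog.Tail, filled
	prog.Tail = filled

	var pending []uint64
	for n := start; n < end; n++ {
		pending = append(pending, n)
		if len(pending) >= idealBatchSize {
			st.flush(pending, n+1) // temporary tail: the next undeleted header
			pending = pending[:0]
		}
	}
	st.flush(pending, prog.Tail) // final flush with the real new tail
}

func main() {
	st := &store{headers: make(map[uint64]bool), tail: 10}
	for n := uint64(10); n < 20; n++ {
		st.headers[n] = true
	}
	prog := &progress{Tail: 10}
	trim(st, prog, 17)
	fmt.Println(len(st.headers), st.tail) // 3 17: headers 17..19 remain, tail at 17
}

Every intermediate flush records a tail that points at the next still-existing header, so an interruption mid-trim never leaves the tracked tail dangling below the data actually on disk.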
diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go
index cbe0d51d3..4dcaf6e71 100644
--- a/eth/downloader/skeleton_test.go
+++ b/eth/downloader/skeleton_test.go
@@ -55,10 +55,11 @@ func newHookedBackfiller() backfiller {
 // based on the skeleton chain as it might be invalid. The backfiller should
 // gracefully handle multiple consecutive suspends without a resume, even
 // on initial sartup.
-func (hf *hookedBackfiller) suspend() {
+func (hf *hookedBackfiller) suspend() *types.Header {
 	if hf.suspendHook != nil {
 		hf.suspendHook()
 	}
+	return nil // we don't really care about header cleanups for now
 }
 
 // resume requests the backfiller to start running fill or snap sync based on
@@ -426,7 +427,6 @@ func TestSkeletonSyncExtend(t *testing.T) {
 			newstate: []*subchain{
 				{Head: 49, Tail: 49},
 			},
-			err: errReorgDenied,
 		},
 		// Initialize a sync and try to extend it with a sibling block.
 		{
@@ -489,7 +489,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
 		<-wait
 
 		if err := skeleton.Sync(tt.extend, false); err != tt.err {
-			t.Errorf("extension failure mismatch: have %v, want %v", err, tt.err)
+			t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err)
 		}
 		skeleton.Terminate()
 
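For completeness, the updated backfiller contract in one self-contained program (not part of the patch): suspend hands back the last successfully backfilled header, or nil if resume never ran, and a nil result simply means there is nothing for the skeleton to clean up. The `noopFiller` type below is illustrative, loosely mirroring the test helper:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/types"
)

type backfiller interface {
	suspend() *types.Header
	resume()
}

// noopFiller never fills anything, so suspend has nothing to report and
// returns nil, which causes the caller to skip the stale-header cleanup.
type noopFiller struct{ resumed bool }

func (f *noopFiller) resume()                { f.resumed = true }
func (f *noopFiller) suspend() *types.Header { return nil }

func main() {
	var f backfiller = &noopFiller{}
	f.resume()
	if filled := f.suspend(); filled == nil {
		fmt.Println("nothing backfilled, skipping cleanStales")
	} else {
		fmt.Println("would clean stale headers below", filled.Number)
	}
}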