diff --git a/core/state/sync.go b/core/state/sync.go index 8456a810b..2c29d706a 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -59,10 +59,16 @@ func (s *StateSync) Missing(max int) []common.Hash { } // Process injects a batch of retrieved trie nodes data, returning if something -// was committed to the database and also the index of an entry if processing of +// was committed to the memcache and also the index of an entry if processing of // it failed. -func (s *StateSync) Process(list []trie.SyncResult, dbw trie.DatabaseWriter) (bool, int, error) { - return (*trie.TrieSync)(s).Process(list, dbw) +func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) { + return (*trie.TrieSync)(s).Process(list) +} + +// Commit flushes the data stored in the internal memcache out to persistent +// storage, returning th enumber of items written and any occurred error. +func (s *StateSync) Commit(dbw trie.DatabaseWriter) (int, error) { + return (*trie.TrieSync)(s).Commit(dbw) } // Pending returns the number of state entries currently pending for download. diff --git a/core/state/sync_test.go b/core/state/sync_test.go index 43d146e3a..108ebb320 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -138,9 +138,12 @@ func testIterativeStateSync(t *testing.T, batch int) { } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(batch)...) } // Cross check that the two states are in sync @@ -168,9 +171,12 @@ func TestIterativeDelayedStateSync(t *testing.T) { } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[len(results):], sched.Missing(0)...) } // Cross check that the two states are in sync @@ -206,9 +212,12 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { results = append(results, trie.SyncResult{Hash: hash, Data: data}) } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { queue[hash] = struct{}{} @@ -249,9 +258,12 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, hash := range sched.Missing(0) { queue[hash] = struct{}{} } @@ -283,9 +295,12 @@ func TestIncompleteStateSync(t *testing.T) { results[i] = trie.SyncResult{Hash: hash, Data: data} } // Process each of the state nodes - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { added = append(added, result.Hash) } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 839969f03..e4d1392d0 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -34,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" ) @@ -99,8 +98,9 @@ type Downloader struct { mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) mux *event.TypeMux // Event multiplexer to announce sync operation events - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + stateDB ethdb.Database fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section @@ -109,9 +109,9 @@ type Downloader struct { rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) // Statistics - syncStatsChainOrigin uint64 // Origin block number where syncing started at - syncStatsChainHeight uint64 // Highest block number known when syncing started - syncStatsStateDone uint64 // Number of state trie entries already pulled + syncStatsChainOrigin uint64 // Origin block number where syncing started at + syncStatsChainHeight uint64 // Highest block number known when syncing started + syncStatsState stateSyncStats syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks @@ -136,16 +136,18 @@ type Downloader struct { notified int32 // Channels - newPeerCh chan *peer headerCh chan dataPack // [eth/62] Channel receiving inbound block headers bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - stateCh chan dataPack // [eth/63] Channel receiving inbound node state data bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks + // for stateFetcher + stateSyncStart chan *stateSync + trackStateReq chan *stateReq + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + // Cancellation and termination cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop) cancelCh chan struct{} // Channel to cancel mid-flight syncs @@ -170,8 +172,9 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he dl := &Downloader{ mode: mode, mux: mux, - queue: newQueue(stateDb), + queue: newQueue(), peers: newPeerSet(), + stateDB: stateDb, rttEstimate: uint64(rttMaxEstimate), rttConfidence: uint64(1000000), hasHeader: hasHeader, @@ -188,18 +191,20 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he insertReceipts: insertReceipts, rollback: rollback, dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), headerCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1), receiptCh: make(chan dataPack, 1), - stateCh: make(chan dataPack, 1), bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1), - stateWakeCh: make(chan bool, 1), headerProcCh: make(chan []*types.Header, 1), quitCh: make(chan struct{}), + // for stateFetcher + stateSyncStart: make(chan *stateSync), + trackStateReq: make(chan *stateReq), + stateCh: make(chan dataPack), } go dl.qosTuner() + go dl.stateFetcher() return dl } @@ -211,9 +216,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he // of processed and the total number of known states are also returned. Otherwise // these are zero. func (d *Downloader) Progress() ethereum.SyncProgress { - // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks - pendingStates := uint64(d.queue.PendingNodeData()) - // Lock the current stats and return the progress d.syncStatsLock.RLock() defer d.syncStatsLock.RUnlock() @@ -231,8 +233,8 @@ func (d *Downloader) Progress() ethereum.SyncProgress { StartingBlock: d.syncStatsChainOrigin, CurrentBlock: current, HighestBlock: d.syncStatsChainHeight, - PulledStates: d.syncStatsStateDone, - KnownStates: d.syncStatsStateDone + pendingStates, + PulledStates: d.syncStatsState.processed, + KnownStates: d.syncStatsState.processed + d.syncStatsState.pending, } } @@ -324,13 +326,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode d.queue.Reset() d.peers.Reset() - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case <-ch: default: } } - for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} { + for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { for empty := false; !empty; { select { case <-ch: @@ -439,30 +441,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, height) } - return d.spawnSync(origin+1, - func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved - func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync - ) + + fetchers := []func() error{ + func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.processHeaders(origin+1, td) }, + } + if d.mode == FastSync { + fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) + } else if d.mode == FullSync { + fetchers = append(fetchers, d.processFullSyncContent) + } + err = d.spawnSync(fetchers) + if err != nil && d.mode == FastSync && d.fsPivotLock != nil { + // If sync failed in the critical section, bump the fail counter. + atomic.AddUint32(&d.fsPivotFails, 1) + } + return err } // spawnSync runs d.process and all given fetcher functions to completion in // separate goroutines, returning the first error that appears. -func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { +func (d *Downloader) spawnSync(fetchers []func() error) error { var wg sync.WaitGroup - errc := make(chan error, len(fetchers)+1) - wg.Add(len(fetchers) + 1) - go func() { defer wg.Done(); errc <- d.processContent() }() + errc := make(chan error, len(fetchers)) + wg.Add(len(fetchers)) for _, fn := range fetchers { fn := fn go func() { defer wg.Done(); errc <- fn() }() } // Wait for the first error, then terminate the others. var err error - for i := 0; i < len(fetchers)+1; i++ { - if i == len(fetchers) { + for i := 0; i < len(fetchers); i++ { + if i == len(fetchers)-1 { // Close the queue when all fetchers have exited. // This will cause the block processor to end when // it has processed the queue. @@ -475,11 +487,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { d.queue.Close() d.Cancel() wg.Wait() - - // If sync failed in the critical section, bump the fail counter - if err != nil && d.mode == FastSync && d.fsPivotLock != nil { - atomic.AddUint32(&d.fsPivotFails, 1) - } return err } @@ -552,7 +559,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { return nil, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -649,7 +655,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return 0, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -714,7 +719,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return 0, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -827,7 +831,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -927,68 +931,6 @@ func (d *Downloader) fetchReceipts(from uint64) error { return err } -// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any -// available peers, reserving a chunk of nodes for each, waiting for delivery and -// also periodically checking for timeouts. -func (d *Downloader) fetchNodeData() error { - log.Debug("Downloading node state data") - - var ( - deliver = func(packet dataPack) (int, error) { - start := time.Now() - return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) { - // If the peer returned old-requested data, forgive - if err == trie.ErrNotRequested { - log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId()) - return - } - if err != nil { - // If the node data processing failed, the root hash is very wrong, abort - log.Error("State processing failed", "peer", packet.PeerId(), "err", err) - d.Cancel() - return - } - // Processing succeeded, notify state fetcher of continuation - pending := d.queue.PendingNodeData() - if pending > 0 { - select { - case d.stateWakeCh <- true: - default: - } - } - d.syncStatsLock.Lock() - d.syncStatsStateDone += uint64(delivered) - syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below - d.syncStatsLock.Unlock() - - // If real database progress was made, reset any fast-sync pivot failure - if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 { - log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails)) - atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block - } - // Log a message to the user and return - if delivered > 0 { - log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending) - } - }) - } - expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) } - throttle = func() bool { return false } - reserve = func(p *peer, count int) (*fetchRequest, bool, error) { - return d.queue.ReserveNodeData(p, count), false, nil - } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } - capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } - ) - err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, - d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, - d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states") - - log.Debug("Node state data download terminated", "err", err) - return err -} - // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. @@ -1229,7 +1171,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // Terminate header processing if we synced up if len(headers) == 0 { // Notify everyone that headers are fully processed - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1341,7 +1283,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { origin += uint64(limit) } // Signal the content downloaders of the availablility of new tasks - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: @@ -1351,73 +1293,153 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { } } -// processContent takes fetch results from the queue and tries to import them -// into the chain. The type of import operation will depend on the result contents. -func (d *Downloader) processContent() error { - pivot := d.queue.FastSyncPivot() +// processFullSyncContent takes fetch results from the queue and imports them into the chain. +func (d *Downloader) processFullSyncContent() error { for { results := d.queue.WaitResults() if len(results) == 0 { - return nil // queue empty + return nil } if d.chainInsertHook != nil { d.chainInsertHook(results) } - // Actually import the blocks - first, last := results[0].Header, results[len(results)-1].Header + if err := d.importBlockResults(results); err != nil { + return err + } + } +} + +func (d *Downloader) importBlockResults(results []*fetchResult) error { + for len(results) != 0 { + // Check for any termination requests. This makes clean shutdown faster. + select { + case <-d.quitCh: + return errCancelContentProcessing + default: + } + // Retrieve the a batch of results to import + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + first, last := results[0].Header, results[items-1].Header log.Debug("Inserting downloaded chain", "items", len(results), "firstnum", first.Number, "firsthash", first.Hash(), "lastnum", last.Number, "lasthash", last.Hash(), ) - for len(results) != 0 { - // Check for any termination requests - select { - case <-d.quitCh: - return errCancelContentProcessing - default: + blocks := make([]*types.Block, items) + for i, result := range results[:items] { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + } + if index, err := d.insertBlocks(blocks); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain + } + // Shift the results to the next batch + results = results[items:] + } + return nil +} + +// processFastSyncContent takes fetch results from the queue and writes them to the +// database. It also controls the synchronisation of state nodes of the pivot block. +func (d *Downloader) processFastSyncContent(latest *types.Header) error { + // Start syncing state of the reported head block. + // This should get us most of the state of the pivot block. + stateSync := d.syncState(latest.Root) + defer stateSync.Cancel() + go func() { + if err := stateSync.Wait(); err != nil { + d.queue.Close() // wake up WaitResults + } + }() + + pivot := d.queue.FastSyncPivot() + for { + results := d.queue.WaitResults() + if len(results) == 0 { + return stateSync.Cancel() + } + if d.chainInsertHook != nil { + d.chainInsertHook(results) + } + P, beforeP, afterP := splitAroundPivot(pivot, results) + if err := d.commitFastSyncData(beforeP, stateSync); err != nil { + return err + } + if P != nil { + stateSync.Cancel() + if err := d.commitPivotBlock(P); err != nil { + return err } - // Retrieve the a batch of results to import - var ( - blocks = make([]*types.Block, 0, maxResultsProcess) - receipts = make([]types.Receipts, 0, maxResultsProcess) - ) - items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) - for _, result := range results[:items] { - switch { - case d.mode == FullSync: - blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - case d.mode == FastSync: - blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - if result.Header.Number.Uint64() <= pivot { - receipts = append(receipts, result.Receipts) - } - } - } - // Try to process the results, aborting if there's an error - var ( - err error - index int - ) - switch { - case len(receipts) > 0: - index, err = d.insertReceipts(blocks, receipts) - if err == nil && blocks[len(blocks)-1].NumberU64() == pivot { - log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash()) - index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash()) - } - default: - index, err = d.insertBlocks(blocks) - } - if err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain - } - // Shift the results to the next batch - results = results[items:] + } + if err := d.importBlockResults(afterP); err != nil { + return err } } } +func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) { + for _, result := range results { + num := result.Header.Number.Uint64() + switch { + case num < pivot: + before = append(before, result) + case num == pivot: + p = result + default: + after = append(after, result) + } + } + return p, before, after +} + +func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error { + for len(results) != 0 { + // Check for any termination requests. + select { + case <-d.quitCh: + return errCancelContentProcessing + case <-stateSync.done: + if err := stateSync.Wait(); err != nil { + return err + } + default: + } + // Retrieve the a batch of results to import + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + first, last := results[0].Header, results[items-1].Header + log.Debug("Inserting fast-sync blocks", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnumn", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, items) + receipts := make([]types.Receipts, items) + for i, result := range results[:items] { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + receipts[i] = result.Receipts + } + if index, err := d.insertReceipts(blocks, receipts); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain + } + // Shift the results to the next batch + results = results[items:] + } + return nil +} + +func (d *Downloader) commitPivotBlock(result *fetchResult) error { + b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + // Sync the pivot block state. This should complete reasonably quickly because + // we've already synced up to the reported head block state earlier. + if err := d.syncState(b.Root()).Wait(); err != nil { + return err + } + log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash()) + if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil { + return err + } + return d.commitHeadBlock(b.Hash()) +} + // DeliverHeaders injects a new batch of block headers received from a remote // node into the download schedule. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index 0d76c7dfd..58764ccf0 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -38,8 +38,6 @@ var ( receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop") receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout") - stateInMeter = metrics.NewMeter("eth/downloader/states/in") - stateReqTimer = metrics.NewTimer("eth/downloader/states/req") - stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") - stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout") + stateInMeter = metrics.NewMeter("eth/downloader/states/in") + stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") ) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 15a912f1f..dc8b09772 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -30,6 +30,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" ) @@ -195,7 +196,7 @@ func (p *peer) FetchReceipts(request *fetchRequest) error { } // FetchNodeData sends a node state data retrieval request to the remote peer. -func (p *peer) FetchNodeData(request *fetchRequest) error { +func (p *peer) FetchNodeData(hashes []common.Hash) error { // Sanity check the protocol version if p.version < 63 { panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version)) @@ -205,14 +206,7 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { return errAlreadyFetching } p.stateStarted = time.Now() - - // Convert the hash set to a retrievable slice - hashes := make([]common.Hash, 0, len(request.Hashes)) - for hash := range request.Hashes { - hashes = append(hashes, hash) - } go p.getNodeData(hashes) - return nil } @@ -343,8 +337,9 @@ func (p *peer) Lacks(hash common.Hash) bool { // peerSet represents the collection of active peer participating in the chain // download procedure. type peerSet struct { - peers map[string]*peer - lock sync.RWMutex + peers map[string]*peer + newPeerFeed event.Feed + lock sync.RWMutex } // newPeerSet creates a new peer set top track the active download sources. @@ -354,6 +349,10 @@ func newPeerSet() *peerSet { } } +func (ps *peerSet) SubscribeNewPeers(ch chan<- *peer) event.Subscription { + return ps.newPeerFeed.Subscribe(ch) +} + // Reset iterates over the current peer set, and resets each of the known peers // to prepare for a next batch of block retrieval. func (ps *peerSet) Reset() { @@ -377,9 +376,8 @@ func (ps *peerSet) Register(p *peer) error { // Register the new peer with some meaningful defaults ps.lock.Lock() - defer ps.lock.Unlock() - if _, ok := ps.peers[p.id]; ok { + ps.lock.Unlock() return errAlreadyRegistered } if len(ps.peers) > 0 { @@ -399,6 +397,9 @@ func (ps *peerSet) Register(p *peer) error { p.stateThroughput /= float64(len(ps.peers)) } ps.peers[p.id] = p + ps.lock.Unlock() + + ps.newPeerFeed.Send(p) return nil } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 855097c45..8a7735d67 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -26,20 +26,13 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) -var ( - blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download - maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently -) +var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download var ( errNoFetchesPending = errors.New("no fetches pending") @@ -94,15 +87,6 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches - stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order - stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority - stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for - statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations - - stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly - stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator - stateWriters int // [eth/63] Number of running state DB writer goroutines - resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain @@ -112,7 +96,7 @@ type queue struct { } // newQueue creates a new download queue for scheduling block retrieval. -func newQueue(stateDb ethdb.Database) *queue { +func newQueue() *queue { lock := new(sync.Mutex) return &queue{ headerPendPool: make(map[string]*fetchRequest), @@ -125,10 +109,6 @@ func newQueue(stateDb ethdb.Database) *queue { receiptTaskQueue: prque.New(), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), - stateTaskPool: make(map[common.Hash]int), - stateTaskQueue: prque.New(), - statePendPool: make(map[string]*fetchRequest), - stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), active: sync.NewCond(lock), lock: lock, @@ -158,12 +138,6 @@ func (q *queue) Reset() { q.receiptPendPool = make(map[string]*fetchRequest) q.receiptDonePool = make(map[common.Hash]struct{}) - q.stateTaskIndex = 0 - q.stateTaskPool = make(map[common.Hash]int) - q.stateTaskQueue.Reset() - q.statePendPool = make(map[string]*fetchRequest) - q.stateScheduler = nil - q.resultCache = make([]*fetchResult, blockCacheLimit) q.resultOffset = 0 } @@ -201,28 +175,6 @@ func (q *queue) PendingReceipts() int { return q.receiptTaskQueue.Size() } -// PendingNodeData retrieves the number of node data entries pending for retrieval. -func (q *queue) PendingNodeData() int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.pendingNodeDataLocked() -} - -// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval. -// The caller must hold q.lock. -func (q *queue) pendingNodeDataLocked() int { - var n int - if q.stateScheduler != nil { - n = q.stateScheduler.Pending() - } - // Ensure that PendingNodeData doesn't return 0 until all state is written. - if q.stateWriters > 0 { - n++ - } - return n -} - // InFlightHeaders retrieves whether there are header fetch requests currently // in flight. func (q *queue) InFlightHeaders() bool { @@ -250,28 +202,15 @@ func (q *queue) InFlightReceipts() bool { return len(q.receiptPendPool) > 0 } -// InFlightNodeData retrieves whether there are node data entry fetch requests -// currently in flight. -func (q *queue) InFlightNodeData() bool { - q.lock.Lock() - defer q.lock.Unlock() - - return len(q.statePendPool)+q.stateWriters > 0 -} - -// Idle returns if the queue is fully idle or has some data still inside. This -// method is used by the tester to detect termination events. +// Idle returns if the queue is fully idle or has some data still inside. func (q *queue) Idle() bool { q.lock.Lock() defer q.lock.Unlock() - queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() - pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) + queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) cached := len(q.blockDonePool) + len(q.receiptDonePool) - if q.stateScheduler != nil { - queued += q.stateScheduler.Pending() - } return (queued + pending + cached) == 0 } @@ -389,19 +328,6 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } - if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { - // Pivoting point of the fast sync, switch the state retrieval to this - log.Debug("Switching state downloads to new block", "number", header.Number, "hash", hash) - - q.stateTaskIndex = 0 - q.stateTaskPool = make(map[common.Hash]int) - q.stateTaskQueue.Reset() - for _, req := range q.statePendPool { - req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear - } - - q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) - } inserts = append(inserts, header) q.headerHead = hash from++ @@ -448,31 +374,15 @@ func (q *queue) countProcessableItems() int { if result == nil || result.Pending > 0 { return i } - // Special handling for the fast-sync pivot block: - if q.mode == FastSync { - bnum := result.Header.Number.Uint64() - if bnum == q.fastSyncPivot { - // If the state of the pivot block is not - // available yet, we cannot proceed and return 0. - // - // Stop before processing the pivot block to ensure that - // resultCache has space for fsHeaderForceVerify items. Not - // doing this could leave us unable to download the required - // amount of headers. - if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 { + // Stop before processing the pivot block to ensure that + // resultCache has space for fsHeaderForceVerify items. Not + // doing this could leave us unable to download the required + // amount of headers. + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { return i } - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - return i - } - } - } - // If we're just the fast sync pivot, stop as well - // because the following batch needs different insertion. - // This simplifies handling the switchover in d.process. - if bnum == q.fastSyncPivot+1 && i > 0 { - return i } } } @@ -519,81 +429,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { return request } -// ReserveNodeData reserves a set of node data hashes for the given peer, skipping -// any previously failed download. -func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { - // Create a task generator to fetch status-fetch tasks if all schedules ones are done - generator := func(max int) { - if q.stateScheduler != nil { - for _, hash := range q.stateScheduler.Missing(max) { - q.stateTaskPool[hash] = q.stateTaskIndex - q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) - q.stateTaskIndex++ - } - } - } - q.lock.Lock() - defer q.lock.Unlock() - - return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, maxInFlightStates) -} - -// reserveHashes reserves a set of hashes for the given peer, skipping previously -// failed ones. -// -// Note, this method expects the queue lock to be already held for writing. The -// reason the lock is not obtained in here is because the parameters already need -// to access the queue, so they already need a lock anyway. -func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { - // Short circuit if the peer's already downloading something (sanity check to - // not corrupt state) - if _, ok := pendPool[p.id]; ok { - return nil - } - // Calculate an upper limit on the hashes we might fetch (i.e. throttling) - allowance := maxPending - if allowance > 0 { - for _, request := range pendPool { - allowance -= len(request.Hashes) - } - } - // If there's a task generator, ask it to fill our task queue - if taskGen != nil && taskQueue.Size() < allowance { - taskGen(allowance - taskQueue.Size()) - } - if taskQueue.Empty() { - return nil - } - // Retrieve a batch of hashes, skipping previously failed ones - send := make(map[common.Hash]int) - skip := make(map[common.Hash]int) - - for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { - hash, priority := taskQueue.Pop() - if p.Lacks(hash.(common.Hash)) { - skip[hash.(common.Hash)] = int(priority) - } else { - send[hash.(common.Hash)] = int(priority) - } - } - // Merge all the skipped hashes back - for hash, index := range skip { - taskQueue.Push(hash, float32(index)) - } - // Assemble and return the block download request - if len(send) == 0 { - return nil - } - request := &fetchRequest{ - Peer: p, - Hashes: send, - Time: time.Now(), - } - pendPool[p.id] = request - - return request -} - // ReserveBodies reserves a set of body fetches for the given peer, skipping any // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. @@ -722,12 +557,6 @@ func (q *queue) CancelReceipts(request *fetchRequest) { q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) } -// CancelNodeData aborts a node state data fetch request, returning all pending -// hashes to the task queue. -func (q *queue) CancelNodeData(request *fetchRequest) { - q.cancel(request, q.stateTaskQueue, q.statePendPool) -} - // Cancel aborts a fetch request, returning all pending hashes to the task queue. func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { q.lock.Lock() @@ -764,12 +593,6 @@ func (q *queue) Revoke(peerId string) { } delete(q.receiptPendPool, peerId) } - if request, ok := q.statePendPool[peerId]; ok { - for hash, index := range request.Hashes { - q.stateTaskQueue.Push(hash, float32(index)) - } - delete(q.statePendPool, peerId) - } } // ExpireHeaders checks for in flight requests that exceeded a timeout allowance, @@ -799,15 +622,6 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter) } -// ExpireNodeData checks for in flight node data requests that exceeded a timeout -// allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter) -} - // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. // @@ -1044,84 +858,6 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } -// DeliverNodeData injects a node state data retrieval response into the queue. -// The method returns the number of node state accepted from the delivery. -func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) { - q.lock.Lock() - defer q.lock.Unlock() - - // Short circuit if the data was never requested - request := q.statePendPool[id] - if request == nil { - return 0, errNoFetchesPending - } - stateReqTimer.UpdateSince(request.Time) - delete(q.statePendPool, id) - - // If no data was retrieved, mark their hashes as unavailable for the origin peer - if len(data) == 0 { - for hash := range request.Hashes { - request.Peer.MarkLacking(hash) - } - } - // Iterate over the downloaded data and verify each of them - errs := make([]error, 0) - process := []trie.SyncResult{} - for _, blob := range data { - // Skip any state trie entries that were not requested - hash := common.BytesToHash(crypto.Keccak256(blob)) - if _, ok := request.Hashes[hash]; !ok { - errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) - continue - } - // Inject the next state trie item into the processing queue - process = append(process, trie.SyncResult{Hash: hash, Data: blob}) - delete(request.Hashes, hash) - delete(q.stateTaskPool, hash) - } - // Return all failed or missing fetches to the queue - for hash, index := range request.Hashes { - q.stateTaskQueue.Push(hash, float32(index)) - } - if q.stateScheduler == nil { - return 0, errNoFetchesPending - } - - // Run valid nodes through the trie download scheduler. It writes completed nodes to a - // batch, which is committed asynchronously. This may lead to over-fetches because the - // scheduler treats everything as written after Process has returned, but it's - // unlikely to be an issue in practice. - batch := q.stateDatabase.NewBatch() - progressed, nproc, procerr := q.stateScheduler.Process(process, batch) - q.stateWriters += 1 - go func() { - if procerr == nil { - nproc = len(process) - procerr = batch.Write() - } - // Return processing errors through the callback so the sync gets canceled. The - // number of writers is decremented prior to the call so PendingNodeData will - // return zero when the callback runs. - q.lock.Lock() - q.stateWriters -= 1 - q.lock.Unlock() - callback(nproc, progressed, procerr) - // Wake up WaitResults after the state has been written because it might be - // waiting for completion of the pivot block's state download. - q.active.Signal() - }() - - // If none of the data items were good, it's a stale delivery - switch { - case len(errs) == 0: - return len(process), nil - case len(errs) == len(request.Hashes): - return len(process), errStaleDelivery - default: - return len(process), fmt.Errorf("multiple failures: %v", errs) - } -} - // Prepare configures the result cache to allow accepting and caching inbound // fetch results. func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) { @@ -1134,9 +870,4 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types. } q.fastSyncPivot = pivot q.mode = mode - - // If long running fast sync, also start up a head stateretrieval immediately - if mode == FastSync && pivot > 0 { - q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase) - } } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go new file mode 100644 index 000000000..4e6612039 --- /dev/null +++ b/eth/downloader/statesync.go @@ -0,0 +1,449 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "fmt" + "hash" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie" +) + +// stateReq represents a batch of state fetch requests groupped together into +// a single data retrieval network packet. +type stateReq struct { + items []common.Hash // Hashes of the state items to download + tasks map[common.Hash]*stateTask // Download tasks to track previous attempts + timeout time.Duration // Maximum round trip time for this to complete + timer *time.Timer // Timer to fire when the RTT timeout expires + peer *peer // Peer that we're requesting from + response [][]byte // Response data of the peer (nil for timeouts) +} + +// timedOut returns if this request timed out. +func (req *stateReq) timedOut() bool { + return req.response == nil +} + +// stateSyncStats is a collection of progress stats to report during a state trie +// sync to RPC requests as well as to display in user logs. +type stateSyncStats struct { + processed uint64 // Number of state entries processed + duplicate uint64 // Number of state entries downloaded twice + unexpected uint64 // Number of non-requested state entries received + pending uint64 // Number of still pending state entries +} + +// syncState starts downloading state with the given root hash. +func (d *Downloader) syncState(root common.Hash) *stateSync { + s := newStateSync(d, root) + select { + case d.stateSyncStart <- s: + case <-d.quitCh: + s.err = errCancelStateFetch + close(s.done) + } + return s +} + +// stateFetcher manages the active state sync and accepts requests +// on its behalf. +func (d *Downloader) stateFetcher() { + for { + select { + case s := <-d.stateSyncStart: + for next := s; next != nil; { + next = d.runStateSync(next) + } + case <-d.stateCh: + // Ignore state responses while no sync is running. + case <-d.quitCh: + return + } + } +} + +// runStateSync runs a state synchronisation until it completes or another root +// hash is requested to be switched over to. +func (d *Downloader) runStateSync(s *stateSync) *stateSync { + var ( + active = make(map[string]*stateReq) // Currently in-flight requests + finished []*stateReq // Completed or failed requests + timeout = make(chan *stateReq) // Timed out active requests + ) + defer func() { + // Cancel active request timers on exit. Also set peers to idle so they're + // available for the next sync. + for _, req := range active { + req.timer.Stop() + req.peer.SetNodeDataIdle(len(req.items)) + } + }() + // Run the state sync. + go s.run() + defer s.Cancel() + + for { + // Enable sending of the first buffered element if there is one. + var ( + deliverReq *stateReq + deliverReqCh chan *stateReq + ) + if len(finished) > 0 { + deliverReq = finished[0] + deliverReqCh = s.deliver + } + + select { + // The stateSync lifecycle: + case next := <-d.stateSyncStart: + return next + + case <-s.done: + return nil + + // Send the next finished request to the current sync: + case deliverReqCh <- deliverReq: + finished = append(finished[:0], finished[1:]...) + + // Handle incoming state packs: + case pack := <-d.stateCh: + // Discard any data not requested (or previsouly timed out) + req := active[pack.PeerId()] + if req == nil { + log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items()) + continue + } + // Finalize the request and queue up for processing + req.timer.Stop() + req.response = pack.(*statePack).states + + finished = append(finished, req) + delete(active, pack.PeerId()) + + // Handle timed-out requests: + case req := <-timeout: + // If the peer is already requesting something else, ignore the stale timeout. + // This can happen when the timeout and the delivery happens simultaneously, + // causing both pathways to trigger. + if active[req.peer.id] != req { + continue + } + // Move the timed out data back into the download queue + finished = append(finished, req) + delete(active, req.peer.id) + + // Track outgoing state requests: + case req := <-d.trackStateReq: + // If an active request already exists for this peer, we have a problem. In + // theory the trie node schedule must never assign two requests to the same + // peer. In practive however, a peer might receive a request, disconnect and + // immediately reconnect before the previous times out. In this case the first + // request is never honored, alas we must not silently overwrite it, as that + // causes valid requests to go missing and sync to get stuck. + if old := active[req.peer.id]; old != nil { + log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id) + + // Make sure the previous one doesn't get siletly lost + finished = append(finished, old) + } + // Start a timer to notify the sync loop if the peer stalled. + req.timer = time.AfterFunc(req.timeout, func() { + select { + case timeout <- req: + case <-s.done: + // Prevent leaking of timer goroutines in the unlikely case where a + // timer is fired just before exiting runStateSync. + } + }) + active[req.peer.id] = req + } + } +} + +// stateSync schedules requests for downloading a particular state trie defined +// by a given state root. +type stateSync struct { + d *Downloader // Downloader instance to access and manage current peerset + + sched *state.StateSync // State trie sync scheduler defining the tasks + keccak hash.Hash // Keccak256 hasher to verify deliveries with + tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval + + deliver chan *stateReq // Delivery channel multiplexing peer responses + cancel chan struct{} // Channel to signal a termination request + cancelOnce sync.Once // Ensures cancel only ever gets called once + done chan struct{} // Channel to signal termination completion + err error // Any error hit during sync (set before completion) +} + +// stateTask represents a single trie node download taks, containing a set of +// peers already attempted retrieval from to detect stalled syncs and abort. +type stateTask struct { + attempts map[string]struct{} +} + +// newStateSync creates a new state trie download scheduler. This method does not +// yet start the sync. The user needs to call run to initiate. +func newStateSync(d *Downloader, root common.Hash) *stateSync { + return &stateSync{ + d: d, + sched: state.NewStateSync(root, d.stateDB), + keccak: sha3.NewKeccak256(), + tasks: make(map[common.Hash]*stateTask), + deliver: make(chan *stateReq), + cancel: make(chan struct{}), + done: make(chan struct{}), + } +} + +// run starts the task assignment and response processing loop, blocking until +// it finishes, and finally notifying any goroutines waiting for the loop to +// finish. +func (s *stateSync) run() { + s.err = s.loop() + close(s.done) +} + +// Wait blocks until the sync is done or canceled. +func (s *stateSync) Wait() error { + <-s.done + return s.err +} + +// Cancel cancels the sync and waits until it has shut down. +func (s *stateSync) Cancel() error { + s.cancelOnce.Do(func() { close(s.cancel) }) + return s.Wait() +} + +// loop is the main event loop of a state trie sync. It it responsible for the +// assignment of new tasks to peers (including sending it to them) as well as +// for the processing of inbound data. Note, that the loop does not directly +// receive data from peers, rather those are buffered up in the downloader and +// pushed here async. The reason is to decouple processing from data receipt +// and timeouts. +func (s *stateSync) loop() error { + // Listen for new peer events to assign tasks to them + newPeer := make(chan *peer, 1024) + peerSub := s.d.peers.SubscribeNewPeers(newPeer) + defer peerSub.Unsubscribe() + + // Keep assigning new tasks until the sync completes or aborts + for s.sched.Pending() > 0 { + if err := s.assignTasks(); err != nil { + return err + } + // Tasks assigned, wait for something to happen + select { + case <-newPeer: + // New peer arrived, try to assign it download tasks + + case <-s.cancel: + return errCancelStateFetch + + case req := <-s.deliver: + // Response or timeout triggered, drop the peer if stalling + log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "timeout", req.timedOut()) + if len(req.items) <= 2 && req.timedOut() { + // 2 items are the minimum requested, if even that times out, we've no use of + // this peer at the moment. + log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) + s.d.dropPeer(req.peer.id) + } + // Process all the received blobs and check for stale delivery + stale, err := s.process(req) + if err != nil { + log.Warn("Node data write error", "err", err) + return err + } + // The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery) + if !stale { + req.peer.SetNodeDataIdle(len(req.response)) + } + } + } + return nil +} + +// assignTasks attempts to assing new tasks to all idle peers, either from the +// batch currently being retried, or fetching new data from the trie sync itself. +func (s *stateSync) assignTasks() error { + // Iterate over all idle peers and try to assign them state fetches + peers, _ := s.d.peers.NodeDataIdlePeers() + for _, p := range peers { + // Assign a batch of fetches proportional to the estimated latency/bandwidth + cap := p.NodeDataCapacity(s.d.requestRTT()) + req := &stateReq{peer: p, timeout: s.d.requestTTL()} + s.fillTasks(cap, req) + + // If the peer was assigned tasks to fetch, send the network request + if len(req.items) > 0 { + req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items)) + + select { + case s.d.trackStateReq <- req: + req.peer.FetchNodeData(req.items) + case <-s.cancel: + } + } + } + return nil +} + +// fillTasks fills the given request object with a maximum of n state download +// tasks to send to the remote peer. +func (s *stateSync) fillTasks(n int, req *stateReq) { + // Refill available tasks from the scheduler. + if len(s.tasks) < n { + new := s.sched.Missing(n - len(s.tasks)) + for _, hash := range new { + s.tasks[hash] = &stateTask{make(map[string]struct{})} + } + } + // Find tasks that haven't been tried with the request's peer. + req.items = make([]common.Hash, 0, n) + req.tasks = make(map[common.Hash]*stateTask, n) + for hash, t := range s.tasks { + // Stop when we've gathered enough requests + if len(req.items) == n { + break + } + // Skip any requests we've already tried from this peer + if _, ok := t.attempts[req.peer.id]; ok { + continue + } + // Assign the request to this peer + t.attempts[req.peer.id] = struct{}{} + req.items = append(req.items, hash) + req.tasks[hash] = t + delete(s.tasks, hash) + } +} + +// process iterates over a batch of delivered state data, injecting each item +// into a running state sync, re-queuing any items that were requested but not +// delivered. +func (s *stateSync) process(req *stateReq) (bool, error) { + // Collect processing stats and update progress if valid data was received + processed, written, duplicate, unexpected := 0, 0, 0, 0 + + defer func(start time.Time) { + if processed+written+duplicate+unexpected > 0 { + s.updateStats(processed, written, duplicate, unexpected, time.Since(start)) + } + }(time.Now()) + + // Iterate over all the delivered data and inject one-by-one into the trie + progress, stale := false, len(req.response) > 0 + + for _, blob := range req.response { + prog, hash, err := s.processNodeData(blob) + switch err { + case nil: + processed++ + case trie.ErrNotRequested: + unexpected++ + case trie.ErrAlreadyProcessed: + duplicate++ + default: + return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) + } + if prog { + progress = true + } + // If the node delivered a requested item, mark the delivery non-stale + if _, ok := req.tasks[hash]; ok { + delete(req.tasks, hash) + stale = false + } + } + // If some data managed to hit the database, flush and reset failure counters + if progress { + // Flush any accumulated data out to disk + batch := s.d.stateDB.NewBatch() + + count, err := s.sched.Commit(batch) + if err != nil { + return stale, err + } + if err := batch.Write(); err != nil { + return stale, err + } + written = count + + // If we're inside the critical section, reset fail counter since we progressed + if atomic.LoadUint32(&s.d.fsPivotFails) > 1 { + log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails)) + atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block + } + } + // Put unfulfilled tasks back into the retry queue + npeers := s.d.peers.Len() + + for hash, task := range req.tasks { + // If the node did deliver something, missing items may be due to a protocol + // limit or a previous timeout + delayed delivery. Both cases should permit + // the node to retry the missing items (to avoid single-peer stalls). + if len(req.response) > 0 || req.timedOut() { + delete(task.attempts, req.peer.id) + } + // If we've requested the node too many times already, it may be a malicious + // sync where nobody has the right data. Abort. + if len(task.attempts) >= npeers { + return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) + } + // Missing item, place into the retry queue. + s.tasks[hash] = task + } + return stale, nil +} + +// processNodeData tries to inject a trie node data blob delivered from a remote +// peer into the state trie, returning whether anything useful was written or any +// error occurred. +func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) { + res := trie.SyncResult{Data: blob} + + s.keccak.Reset() + s.keccak.Write(blob) + s.keccak.Sum(res.Hash[:0]) + + committed, _, err := s.sched.Process([]trie.SyncResult{res}) + return committed, res.Hash, err +} + +// updateStats bumps the various state sync progress counters and displays a log +// message for the user to see. +func (s *stateSync) updateStats(processed, written, duplicate, unexpected int, duration time.Duration) { + s.d.syncStatsLock.Lock() + defer s.d.syncStatsLock.Unlock() + + s.d.syncStatsState.pending = uint64(s.sched.Pending()) + s.d.syncStatsState.processed += uint64(processed) + s.d.syncStatsState.duplicate += uint64(duplicate) + s.d.syncStatsState.unexpected += uint64(unexpected) + + log.Info("Imported new state entries", "count", processed, "flushed", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected) +} diff --git a/trie/sync.go b/trie/sync.go index 168501392..9e8449431 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -28,6 +28,10 @@ import ( // node it did not request. var ErrNotRequested = errors.New("not requested") +// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a +// node it already processed previously. +var ErrAlreadyProcessed = errors.New("already processed") + // request represents a scheduled or already in-flight state retrieval request. type request struct { hash common.Hash // Hash of the node data content to retrieve @@ -48,6 +52,21 @@ type SyncResult struct { Data []byte // Data content of the retrieved node } +// syncMemBatch is an in-memory buffer of successfully downloaded but not yet +// persisted data items. +type syncMemBatch struct { + batch map[common.Hash][]byte // In-memory membatch of recently ocmpleted items + order []common.Hash // Order of completion to prevent out-of-order data loss +} + +// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. +func newSyncMemBatch() *syncMemBatch { + return &syncMemBatch{ + batch: make(map[common.Hash][]byte), + order: make([]common.Hash, 0, 256), + } +} + // TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a // leaf node. It's used by state syncing to check if the leaf node requires some // further data syncing. @@ -57,7 +76,8 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. type TrieSync struct { - database DatabaseReader + database DatabaseReader // Persistent database to check for existing entries + membatch *syncMemBatch // Memory buffer to avoid frequest database writes requests map[common.Hash]*request // Pending requests pertaining to a key hash queue *prque.Prque // Priority queue with the pending requests } @@ -66,6 +86,7 @@ type TrieSync struct { func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync { ts := &TrieSync{ database: database, + membatch: newSyncMemBatch(), requests: make(map[common.Hash]*request), queue: prque.New(), } @@ -79,6 +100,9 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c if root == emptyRoot { return } + if _, ok := s.membatch.batch[root]; ok { + return + } key := root.Bytes() blob, _ := s.database.Get(key) if local, err := decodeNode(key, blob, 0); local != nil && err == nil { @@ -111,6 +135,9 @@ func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) if hash == emptyState { return } + if _, ok := s.membatch.batch[hash]; ok { + return + } if blob, _ := s.database.Get(hash.Bytes()); blob != nil { return } @@ -144,7 +171,7 @@ func (s *TrieSync) Missing(max int) []common.Hash { // Process injects a batch of retrieved trie nodes data, returning if something // was committed to the database and also the index of an entry if processing of // it failed. -func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) { +func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { committed := false for i, item := range results { @@ -153,10 +180,13 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, if request == nil { return committed, i, ErrNotRequested } + if request.data != nil { + return committed, i, ErrAlreadyProcessed + } // If the item is a raw entry request, commit directly if request.raw { request.data = item.Data - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -173,7 +203,7 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, i, err } if len(requests) == 0 && request.deps == 0 { - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -185,6 +215,22 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, 0, nil } +// Commit flushes the data stored in the internal membatch out to persistent +// storage, returning th enumber of items written and any occurred error. +func (s *TrieSync) Commit(dbw DatabaseWriter) (int, error) { + // Dump the membatch into a database dbw + for i, key := range s.membatch.order { + if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil { + return i, err + } + } + written := len(s.membatch.order) + + // Drop the membatch data and return + s.membatch = newSyncMemBatch() + return written, nil +} + // Pending returns the number of state entries currently pending for download. func (s *TrieSync) Pending() int { return len(s.requests) @@ -246,13 +292,17 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { // If the child references another node, resolve or schedule if node, ok := (child.node).(hashNode); ok { // Try to resolve the node from the local database + hash := common.BytesToHash(node) + if _, ok := s.membatch.batch[hash]; ok { + continue + } blob, _ := s.database.Get(node) if local, err := decodeNode(node[:], blob, 0); local != nil && err == nil { continue } // Locally unknown node, schedule for retrieval requests = append(requests, &request{ - hash: common.BytesToHash(node), + hash: hash, parents: []*request{req}, depth: child.depth, callback: req.callback, @@ -262,21 +312,21 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { return requests, nil } -// commit finalizes a retrieval request and stores it into the database. If any +// commit finalizes a retrieval request and stores it into the membatch. If any // of the referencing parent requests complete due to this commit, they are also // committed themselves. -func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) { - // Write the node content to disk - if err := dbw.Put(req.hash[:], req.data); err != nil { - return err - } +func (s *TrieSync) commit(req *request) (err error) { + // Write the node content to the membatch + s.membatch.batch[req.hash] = req.data + s.membatch.order = append(s.membatch.order, req.hash) + delete(s.requests, req.hash) // Check all parents for completion for _, parent := range req.parents { parent.deps-- if parent.deps == 0 { - if err := s.commit(parent, dbw); err != nil { + if err := s.commit(parent); err != nil { return err } } diff --git a/trie/sync_test.go b/trie/sync_test.go index d778555b9..ec16a25bd 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -122,9 +122,12 @@ func testIterativeTrieSync(t *testing.T, batch int) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(batch)...) } // Cross check that the two tries are in sync @@ -152,9 +155,12 @@ func TestIterativeDelayedTrieSync(t *testing.T) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[len(results):], sched.Missing(10000)...) } // Cross check that the two tries are in sync @@ -190,9 +196,12 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) { results = append(results, SyncResult{hash, data}) } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { queue[hash] = struct{}{} @@ -231,9 +240,12 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { delete(queue, result.Hash) } @@ -272,9 +284,12 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(0)...) } // Cross check that the two tries are in sync @@ -304,9 +319,12 @@ func TestIncompleteTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } // Process each of the trie nodes - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { added = append(added, result.Hash) }