eth/downloader, trie: pull head state concurrently with chain
parent a7434fd008
commit 4f1d92b332
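Fast sync previously left the state trie untouched until the pivot block's header arrived. With this change the downloader pulls the head state concurrently with the chain: fetchHeight now returns the remote peer's full head header rather than just its number, so the head state root is known as soon as a sync starts, and queue.Prepare takes that header and immediately seeds a state scheduler from it whenever fast sync has a pivot ahead. Since state requests may be answered after the scheduler they were issued for has been abandoned, trie.Process gains a dedicated ErrNotRequested sentinel and the downloader forgives such stale deliveries instead of dropping the peer. The per-sync reset of the state statistics (and the now unused syncStatsStateTotal field) is removed, and progress logs report both the delivered entries and a lower bound on those still pending.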
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -35,6 +35,7 @@ import (
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
 	"github.com/ethereum/go-ethereum/params"
+	"github.com/ethereum/go-ethereum/trie"
 	"github.com/rcrowley/go-metrics"
 )
@@ -114,7 +115,6 @@ type Downloader struct {
 	// Statistics
 	syncStatsChainOrigin uint64       // Origin block number where syncing started at
 	syncStatsChainHeight uint64       // Highest block number known when syncing started
-	syncStatsStateTotal  uint64       // Total number of node state entries known so far
 	syncStatsStateDone   uint64       // Number of state trie entries already pulled
 	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
 
@@ -321,12 +321,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 			empty = true
 		}
 	}
-	// Reset any ephemeral sync statistics
-	d.syncStatsLock.Lock()
-	d.syncStatsStateTotal = 0
-	d.syncStatsStateDone = 0
-	d.syncStatsLock.Unlock()
-
 	// Create cancel channel for aborting mid-flight
 	d.cancelLock.Lock()
 	d.cancelCh = make(chan struct{})
@@ -382,7 +376,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		d.syncStatsLock.Unlock()
 
 		// Initiate the sync using a concurrent hash and block retrieval algorithm
-		d.queue.Prepare(origin+1, d.mode, 0)
+		d.queue.Prepare(origin+1, d.mode, 0, nil)
 		if d.syncInitHook != nil {
 			d.syncInitHook(origin, latest)
 		}
@@ -397,7 +391,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		if err != nil {
 			return err
 		}
-		origin, err := d.findAncestor(p, latest)
+		height := latest.Number.Uint64()
+
+		origin, err := d.findAncestor(p, height)
 		if err != nil {
 			return err
 		}
@@ -405,22 +401,22 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
 			d.syncStatsChainOrigin = origin
 		}
-		d.syncStatsChainHeight = latest
+		d.syncStatsChainHeight = height
 		d.syncStatsLock.Unlock()
 
 		// Initiate the sync using a concurrent header and content retrieval algorithm
 		pivot := uint64(0)
 		switch d.mode {
 		case LightSync:
-			pivot = latest
+			pivot = height
 		case FastSync:
 			// Calculate the new fast/slow sync pivot point
 			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
 			if err != nil {
 				panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
 			}
-			if latest > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
-				pivot = latest - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
+			if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
+				pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
 			}
 			// If the point is below the origin, move origin back to ensure state download
 			if pivot < origin {
@@ -432,9 +428,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 			}
 			glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
 		}
-		d.queue.Prepare(origin+1, d.mode, pivot)
+		d.queue.Prepare(origin+1, d.mode, pivot, latest)
 		if d.syncInitHook != nil {
-			d.syncInitHook(origin, latest)
+			d.syncInitHook(origin, height)
 		}
 		return d.spawnSync(origin+1,
 			func() error { return d.fetchHeaders(p, origin+1) },    // Headers are always retrieved
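As context for the hunk above: the pivot is drawn from a crypto-grade random source so that nodes don't all fast-sync state at the same block. A self-contained sketch of that calculation; the constant values here are assumptions, not necessarily geth's:

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

const (
	fsMinFullBlocks = 1024 // assumed: blocks always fully synced below the head
	fsPivotInterval = 512  // assumed: randomisation window for the pivot point
)

// pivotFor mirrors the FastSync branch above: pick a pivot between
// fsMinFullBlocks and fsMinFullBlocks+fsPivotInterval blocks below the
// head, or 0 if the chain is too short.
func pivotFor(height uint64) uint64 {
	offset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
	if err != nil {
		panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
	}
	if height > uint64(fsMinFullBlocks)+offset.Uint64() {
		return height - uint64(fsMinFullBlocks) - offset.Uint64()
	}
	return 0
}

func main() {
	fmt.Println("pivot:", pivotFor(2000000))
}
```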
@@ -952,7 +948,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 
 // fetchHeight retrieves the head header of the remote peer to aid in estimating
 // the total time a pending synchronisation would take.
-func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
+func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
 	glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
 
 	// Request the advertised remote head block and wait for the response
@@ -962,7 +958,7 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
 	for {
 		select {
 		case <-d.cancelCh:
-			return 0, errCancelBlockFetch
+			return nil, errCancelBlockFetch
 
 		case packet := <-d.headerCh:
 			// Discard anything not from the origin peer
@@ -974,13 +970,13 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
 			headers := packet.(*headerPack).headers
 			if len(headers) != 1 {
 				glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
-				return 0, errBadPeer
+				return nil, errBadPeer
 			}
-			return headers[0].Number.Uint64(), nil
+			return headers[0], nil
 
 		case <-timeout:
 			glog.V(logger.Debug).Infof("%v: head header timeout", p)
-			return 0, errTimeout
+			return nil, errTimeout
 
 		case <-d.bodyCh:
 		case <-d.stateCh:
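The new return type is what makes the concurrent head-state pull possible: a bare uint64 only supports the height bookkeeping, while a *types.Header also carries the state root that Prepare needs. A minimal illustration (useHead is a hypothetical helper, not downloader code):

```go
package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// useHead shows what a *types.Header gives the caller: the number drives
// findAncestor and the pivot maths, the root seeds the head state sync.
func useHead(latest *types.Header) {
	height := latest.Number.Uint64()
	fmt.Printf("head #%d, state root %x\n", height, latest.Root)
}

func main() {
	useHead(&types.Header{Number: big.NewInt(1920000), Root: common.Hash{}})
}
```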
@@ -1369,10 +1365,10 @@ func (d *Downloader) fetchNodeData() error {
 		deliver = func(packet dataPack) (int, error) {
 			start := time.Now()
 			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
-				// If the peer gave us nothing, stalling fast sync, drop
-				if delivered == 0 {
-					glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId())
-					d.dropPeer(packet.PeerId())
+				// If the peer returned old-requested data, forgive
+				if err == trie.ErrNotRequested {
+					glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
+					return
 				}
 				if err != nil {
 					// If the node data processing failed, the root hash is very wrong, abort
@@ -1381,17 +1377,21 @@ func (d *Downloader) fetchNodeData() error {
 					return
 				}
 				// Processing succeeded, notify state fetcher of continuation
-				if d.queue.PendingNodeData() > 0 {
+				pending := d.queue.PendingNodeData()
+				if pending > 0 {
 					select {
 					case d.stateWakeCh <- true:
 					default:
 					}
 				}
-				// Log a message to the user and return
 				d.syncStatsLock.Lock()
-				defer d.syncStatsLock.Unlock()
 				d.syncStatsStateDone += uint64(delivered)
-				glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
+				d.syncStatsLock.Unlock()
+
+				// Log a message to the user and return
+				if delivered > 0 {
+					glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d, pending at least %d", delivered, time.Since(start), d.syncStatsStateDone, pending)
+				}
 			})
 		}
 		expire   = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
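The reshuffle above also narrows a critical section: the old callback held syncStatsLock via defer through the final log call, while the new code unlocks right after bumping the counter and only then logs. A toy model of that pattern (stats is a stand-in type, not the downloader's):

```go
package main

import (
	"log"
	"sync"
)

// stats is a stand-in for the downloader's sync statistics.
type stats struct {
	mu   sync.RWMutex
	done uint64
}

// deliver updates the counter inside a short critical section and logs
// outside it, instead of holding the lock via defer until the log completes.
func (s *stats) deliver(delivered uint64) {
	s.mu.Lock()
	s.done += delivered
	done := s.done
	s.mu.Unlock()

	if delivered > 0 {
		log.Printf("imported %d state entries: processed %d in total", delivered, done)
	}
}

func main() {
	s := new(stats)
	s.deliver(128)
}
```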
|  | ||||
| @ -1262,13 +1262,19 @@ func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, | ||||
| 
 | ||||
| // Prepare configures the result cache to allow accepting and caching inbound
 | ||||
| // fetch results.
 | ||||
| func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) { | ||||
| func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) { | ||||
| 	q.lock.Lock() | ||||
| 	defer q.lock.Unlock() | ||||
| 
 | ||||
| 	// Prepare the queue for sync results
 | ||||
| 	if q.resultOffset < offset { | ||||
| 		q.resultOffset = offset | ||||
| 	} | ||||
| 	q.fastSyncPivot = pivot | ||||
| 	q.mode = mode | ||||
| 
 | ||||
| 	// If long running fast sync, also start up a head stateretrieval immediately
 | ||||
| 	if mode == FastSync && pivot > 0 { | ||||
| 		q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase) | ||||
| 	} | ||||
| } | ||||
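A self-contained toy of what the extra Prepare arguments buy (all types here are stand-ins, not geth's): when fast sync knows a pivot lies ahead, the queue records the head state root as an immediate download target instead of idling until the pivot header arrives:

```go
package main

import "fmt"

type header struct{ root string }

// queue models just the fields Prepare touches above (stand-ins, not geth's).
type queue struct {
	resultOffset  uint64
	mode          string
	fastSyncPivot uint64
	stateTarget   string // stands in for the q.stateScheduler created above
}

func (q *queue) Prepare(offset uint64, mode string, pivot uint64, head *header) {
	if q.resultOffset < offset {
		q.resultOffset = offset
	}
	q.fastSyncPivot = pivot
	q.mode = mode

	// For a long-running fast sync, begin head state retrieval immediately.
	// The nil guard is defensive: the call sites above pass nil only with pivot == 0.
	if mode == "fast" && pivot > 0 && head != nil {
		q.stateTarget = head.root
	}
}

func main() {
	q := new(queue)
	q.Prepare(1, "fast", 1000000, &header{root: "0xdeadbeef"})
	fmt.Println("state sync target:", q.stateTarget)
}
```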
|  | ||||
| @ -17,6 +17,7 @@ | ||||
| package trie | ||||
| 
 | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 
 | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| @ -24,6 +25,10 @@ import ( | ||||
| 	"gopkg.in/karalabe/cookiejar.v2/collections/prque" | ||||
| ) | ||||
| 
 | ||||
| // ErrNotRequested is returned by the trie sync when it's requested to process a
 | ||||
| // node it did not request.
 | ||||
| var ErrNotRequested = errors.New("not requested") | ||||
| 
 | ||||
| // request represents a scheduled or already in-flight state retrieval request.
 | ||||
| type request struct { | ||||
| 	hash   common.Hash // Hash of the node data content to retrieve
 | ||||
| @ -143,7 +148,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) { | ||||
| 		// If the item was not requested, bail out
 | ||||
| 		request := s.requests[item.Hash] | ||||
| 		if request == nil { | ||||
| 			return i, fmt.Errorf("not requested: %x", item.Hash) | ||||
| 			return i, ErrNotRequested | ||||
| 		} | ||||
| 		// If the item is a raw entry request, commit directly
 | ||||
| 		if request.object == nil { | ||||
|  | ||||