Merge pull request #1243 from karalabe/instrument-downloader-sync
eth, eth/downloader: separate concerns, clean up test suite
This commit is contained in:
commit
263903378b
@ -193,7 +193,6 @@ type Ethereum struct {
|
||||
whisper *whisper.Whisper
|
||||
pow *ethash.Ethash
|
||||
protocolManager *ProtocolManager
|
||||
downloader *downloader.Downloader
|
||||
SolcPath string
|
||||
solc *compiler.Solidity
|
||||
|
||||
@ -290,14 +289,13 @@ func New(config *Config) (*Ethereum, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock)
|
||||
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit)
|
||||
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.chainManager, eth.EventMux())
|
||||
eth.chainManager.SetProcessor(eth.blockProcessor)
|
||||
eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager)
|
||||
|
||||
eth.miner = miner.New(eth, eth.EventMux(), eth.pow)
|
||||
eth.miner.SetGasPrice(config.GasPrice)
|
||||
|
||||
eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader)
|
||||
if config.Shh {
|
||||
eth.whisper = whisper.New()
|
||||
eth.shhVersionId = int(eth.whisper.Version())
|
||||
@ -447,7 +445,7 @@ func (s *Ethereum) ClientVersion() string { return s.clientVersio
|
||||
func (s *Ethereum) EthVersion() int { return s.ethVersionId }
|
||||
func (s *Ethereum) NetVersion() int { return s.netVersionId }
|
||||
func (s *Ethereum) ShhVersion() int { return s.shhVersionId }
|
||||
func (s *Ethereum) Downloader() *downloader.Downloader { return s.downloader }
|
||||
func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader }
|
||||
|
||||
// Start the ethereum
|
||||
func (s *Ethereum) Start() error {
|
||||
|
@ -3,6 +3,7 @@ package downloader
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -28,32 +29,40 @@ var (
|
||||
crossCheckCycle = time.Second // Period after which to check for expired cross checks
|
||||
|
||||
maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out
|
||||
maxBlockProcess = 256 // Number of blocks to import at once into the chain
|
||||
)
|
||||
|
||||
var (
|
||||
errLowTd = errors.New("peers TD is too low")
|
||||
ErrBusy = errors.New("busy")
|
||||
errUnknownPeer = errors.New("peer is unknown or unhealthy")
|
||||
ErrBadPeer = errors.New("action from bad peer ignored")
|
||||
ErrStallingPeer = errors.New("peer is stalling")
|
||||
errBannedHead = errors.New("peer head hash already banned")
|
||||
errNoPeers = errors.New("no peers to keep download active")
|
||||
ErrPendingQueue = errors.New("pending items in queue")
|
||||
ErrTimeout = errors.New("timeout")
|
||||
ErrEmptyHashSet = errors.New("empty hash set by peer")
|
||||
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
||||
errAlreadyInPool = errors.New("hash already in pool")
|
||||
ErrInvalidChain = errors.New("retrieved hash chain is invalid")
|
||||
ErrCrossCheckFailed = errors.New("block cross-check failed")
|
||||
errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
|
||||
errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
|
||||
errNoSyncActive = errors.New("no sync active")
|
||||
errBusy = errors.New("busy")
|
||||
errUnknownPeer = errors.New("peer is unknown or unhealthy")
|
||||
errBadPeer = errors.New("action from bad peer ignored")
|
||||
errStallingPeer = errors.New("peer is stalling")
|
||||
errBannedHead = errors.New("peer head hash already banned")
|
||||
errNoPeers = errors.New("no peers to keep download active")
|
||||
errPendingQueue = errors.New("pending items in queue")
|
||||
errTimeout = errors.New("timeout")
|
||||
errEmptyHashSet = errors.New("empty hash set by peer")
|
||||
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
||||
errAlreadyInPool = errors.New("hash already in pool")
|
||||
errInvalidChain = errors.New("retrieved hash chain is invalid")
|
||||
errCrossCheckFailed = errors.New("block cross-check failed")
|
||||
errCancelHashFetch = errors.New("hash fetching canceled (requested)")
|
||||
errCancelBlockFetch = errors.New("block downloading canceled (requested)")
|
||||
errCancelChainImport = errors.New("chain importing canceled (requested)")
|
||||
errNoSyncActive = errors.New("no sync active")
|
||||
)
|
||||
|
||||
// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
|
||||
type hashCheckFn func(common.Hash) bool
|
||||
type getBlockFn func(common.Hash) *types.Block
|
||||
|
||||
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
|
||||
type blockRetrievalFn func(common.Hash) *types.Block
|
||||
|
||||
// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
|
||||
type chainInsertFn func(types.Blocks) (int, error)
|
||||
type hashIterFn func() (common.Hash, error)
|
||||
|
||||
// peerDropFn is a callback type for dropping a peer detected as malicious.
|
||||
type peerDropFn func(id string)
|
||||
|
||||
type blockPack struct {
|
||||
peerId string
|
||||
@ -85,12 +94,16 @@ type Downloader struct {
|
||||
importLock sync.Mutex
|
||||
|
||||
// Callbacks
|
||||
hasBlock hashCheckFn
|
||||
getBlock getBlockFn
|
||||
hasBlock hashCheckFn // Checks if a block is present in the chain
|
||||
getBlock blockRetrievalFn // Retrieves a block from the chain
|
||||
insertChain chainInsertFn // Injects a batch of blocks into the chain
|
||||
dropPeer peerDropFn // Retrieved the TD of our own chain
|
||||
|
||||
// Status
|
||||
synchronising int32
|
||||
notified int32
|
||||
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
|
||||
synchronising int32
|
||||
processing int32
|
||||
notified int32
|
||||
|
||||
// Channels
|
||||
newPeerCh chan *peer
|
||||
@ -107,17 +120,20 @@ type Block struct {
|
||||
OriginPeer string
|
||||
}
|
||||
|
||||
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
|
||||
// New creates a new downloader to fetch hashes and blocks from remote peers.
|
||||
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
|
||||
// Create the base downloader
|
||||
downloader := &Downloader{
|
||||
mux: mux,
|
||||
queue: newQueue(),
|
||||
peers: newPeerSet(),
|
||||
hasBlock: hasBlock,
|
||||
getBlock: getBlock,
|
||||
newPeerCh: make(chan *peer, 1),
|
||||
hashCh: make(chan hashPack, 1),
|
||||
blockCh: make(chan blockPack, 1),
|
||||
mux: mux,
|
||||
queue: newQueue(),
|
||||
peers: newPeerSet(),
|
||||
hasBlock: hasBlock,
|
||||
getBlock: getBlock,
|
||||
insertChain: insertChain,
|
||||
dropPeer: dropPeer,
|
||||
newPeerCh: make(chan *peer, 1),
|
||||
hashCh: make(chan hashPack, 1),
|
||||
blockCh: make(chan blockPack, 1),
|
||||
}
|
||||
// Inject all the known bad hashes
|
||||
downloader.banned = set.New()
|
||||
@ -150,7 +166,7 @@ func (d *Downloader) Stats() (pending int, cached int, importing int, estimate t
|
||||
return
|
||||
}
|
||||
|
||||
// Synchronising returns the state of the downloader
|
||||
// Synchronising returns whether the downloader is currently retrieving blocks.
|
||||
func (d *Downloader) Synchronising() bool {
|
||||
return atomic.LoadInt32(&d.synchronising) > 0
|
||||
}
|
||||
@ -183,19 +199,47 @@ func (d *Downloader) UnregisterPeer(id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Synchronise will select the peer and use it for synchronising. If an empty string is given
|
||||
// Synchronise tries to sync up our local block chain with a remote peer, both
|
||||
// adding various sanity checks as well as wrapping it with various log entries.
|
||||
func (d *Downloader) Synchronise(id string, head common.Hash) {
|
||||
glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head)
|
||||
|
||||
switch err := d.synchronise(id, head); err {
|
||||
case nil:
|
||||
glog.V(logger.Detail).Infof("Synchronisation completed")
|
||||
|
||||
case errBusy:
|
||||
glog.V(logger.Detail).Infof("Synchronisation already in progress")
|
||||
|
||||
case errTimeout, errBadPeer, errStallingPeer, errBannedHead, errEmptyHashSet, errPeersUnavailable, errInvalidChain, errCrossCheckFailed:
|
||||
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
|
||||
d.dropPeer(id)
|
||||
|
||||
case errPendingQueue:
|
||||
glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
|
||||
|
||||
default:
|
||||
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// synchronise will select the peer and use it for synchronising. If an empty string is given
|
||||
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
|
||||
// checks fail an error will be returned. This method is synchronous
|
||||
func (d *Downloader) Synchronise(id string, hash common.Hash) error {
|
||||
func (d *Downloader) synchronise(id string, hash common.Hash) error {
|
||||
// Mock out the synchonisation if testing
|
||||
if d.synchroniseMock != nil {
|
||||
return d.synchroniseMock(id, hash)
|
||||
}
|
||||
// Make sure only one goroutine is ever allowed past this point at once
|
||||
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
|
||||
return ErrBusy
|
||||
return errBusy
|
||||
}
|
||||
defer atomic.StoreInt32(&d.synchronising, 0)
|
||||
|
||||
// If the head hash is banned, terminate immediately
|
||||
if d.banned.Has(hash) {
|
||||
return ErrInvalidChain
|
||||
return errBannedHead
|
||||
}
|
||||
// Post a user notification of the sync (only once per session)
|
||||
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
|
||||
@ -209,7 +253,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
|
||||
|
||||
// Abort if the queue still contains some leftover data
|
||||
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
|
||||
return ErrPendingQueue
|
||||
return errPendingQueue
|
||||
}
|
||||
// Reset the queue and peer set to clean any internal leftover state
|
||||
d.queue.Reset()
|
||||
@ -225,19 +269,6 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
|
||||
return d.syncWithPeer(p, hash)
|
||||
}
|
||||
|
||||
// TakeBlocks takes blocks from the queue and yields them to the caller.
|
||||
func (d *Downloader) TakeBlocks() []*Block {
|
||||
blocks := d.queue.TakeBlocks()
|
||||
if len(blocks) > 0 {
|
||||
d.importLock.Lock()
|
||||
d.importStart = time.Now()
|
||||
d.importQueue = blocks
|
||||
d.importDone = 0
|
||||
d.importLock.Unlock()
|
||||
}
|
||||
return blocks
|
||||
}
|
||||
|
||||
// Has checks if the downloader knows about a particular hash, meaning that its
|
||||
// either already downloaded of pending retrieval.
|
||||
func (d *Downloader) Has(hash common.Hash) bool {
|
||||
@ -272,34 +303,26 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
|
||||
|
||||
// Cancel cancels all of the operations and resets the queue. It returns true
|
||||
// if the cancel operation was completed.
|
||||
func (d *Downloader) Cancel() bool {
|
||||
// If we're not syncing just return.
|
||||
hs, bs := d.queue.Size()
|
||||
if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
|
||||
return false
|
||||
}
|
||||
func (d *Downloader) Cancel() {
|
||||
// Close the current cancel channel
|
||||
d.cancelLock.Lock()
|
||||
select {
|
||||
case <-d.cancelCh:
|
||||
// Channel was already closed
|
||||
default:
|
||||
close(d.cancelCh)
|
||||
if d.cancelCh != nil {
|
||||
select {
|
||||
case <-d.cancelCh:
|
||||
// Channel was already closed
|
||||
default:
|
||||
close(d.cancelCh)
|
||||
}
|
||||
}
|
||||
d.cancelLock.Unlock()
|
||||
|
||||
// Reset the queue and import statistics
|
||||
// Reset the queue
|
||||
d.queue.Reset()
|
||||
|
||||
d.importLock.Lock()
|
||||
d.importQueue = nil
|
||||
d.importDone = 0
|
||||
d.importLock.Unlock()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// XXX Make synchronous
|
||||
// fetchHahes starts retrieving hashes backwards from a specific peer and hash,
|
||||
// up until it finds a common ancestor. If the source peer times out, alternative
|
||||
// ones are tried for continuation.
|
||||
func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
var (
|
||||
start = time.Now()
|
||||
@ -317,7 +340,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
<-timeout.C // timeout channel should be initially empty.
|
||||
|
||||
getHashes := func(from common.Hash) {
|
||||
active.getHashes(from)
|
||||
go active.getHashes(from)
|
||||
timeout.Reset(hashTTL)
|
||||
}
|
||||
|
||||
@ -342,7 +365,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
// Make sure the peer actually gave something valid
|
||||
if len(hashPack.hashes) == 0 {
|
||||
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
|
||||
return ErrEmptyHashSet
|
||||
return errEmptyHashSet
|
||||
}
|
||||
for index, hash := range hashPack.hashes {
|
||||
if d.banned.Has(hash) {
|
||||
@ -352,7 +375,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
if err := d.banBlocks(active.id, hash); err != nil {
|
||||
glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err)
|
||||
}
|
||||
return ErrInvalidChain
|
||||
return errInvalidChain
|
||||
}
|
||||
}
|
||||
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
|
||||
@ -369,12 +392,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
inserts := d.queue.Insert(hashPack.hashes)
|
||||
if len(inserts) == 0 && !done {
|
||||
glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
|
||||
return ErrBadPeer
|
||||
return errBadPeer
|
||||
}
|
||||
if !done {
|
||||
// Check that the peer is not stalling the sync
|
||||
if len(inserts) < MinHashFetch {
|
||||
return ErrStallingPeer
|
||||
return errStallingPeer
|
||||
}
|
||||
// Try and fetch a random block to verify the hash batch
|
||||
// Skip the last hash as the cross check races with the next hash fetch
|
||||
@ -386,9 +409,9 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
expire: time.Now().Add(blockSoftTTL),
|
||||
parent: parent,
|
||||
}
|
||||
active.getBlocks([]common.Hash{origin})
|
||||
go active.getBlocks([]common.Hash{origin})
|
||||
|
||||
// Also fetch a fresh
|
||||
// Also fetch a fresh batch of hashes
|
||||
getHashes(head)
|
||||
continue
|
||||
}
|
||||
@ -408,7 +431,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
block := blockPack.blocks[0]
|
||||
if check, ok := d.checks[block.Hash()]; ok {
|
||||
if block.ParentHash() != check.parent {
|
||||
return ErrCrossCheckFailed
|
||||
return errCrossCheckFailed
|
||||
}
|
||||
delete(d.checks, block.Hash())
|
||||
}
|
||||
@ -418,7 +441,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
for hash, check := range d.checks {
|
||||
if time.Now().After(check.expire) {
|
||||
glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
|
||||
return ErrCrossCheckFailed
|
||||
return errCrossCheckFailed
|
||||
}
|
||||
}
|
||||
|
||||
@ -438,7 +461,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
// if all peers have been tried, abort the process entirely or if the hash is
|
||||
// the zero hash.
|
||||
if p == nil || (head == common.Hash{}) {
|
||||
return ErrTimeout
|
||||
return errTimeout
|
||||
}
|
||||
// set p to the active peer. this will invalidate any hashes that may be returned
|
||||
// by our previous (delayed) peer.
|
||||
@ -495,12 +518,13 @@ out:
|
||||
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
|
||||
break
|
||||
}
|
||||
// All was successful, promote the peer
|
||||
// All was successful, promote the peer and potentially start processing
|
||||
peer.Promote()
|
||||
peer.SetIdle()
|
||||
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
|
||||
go d.process()
|
||||
|
||||
case ErrInvalidChain:
|
||||
case errInvalidChain:
|
||||
// The hash chain is invalid (blocks are not ordered properly), abort
|
||||
return err
|
||||
|
||||
@ -617,7 +641,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
|
||||
return errCancelBlockFetch
|
||||
|
||||
case <-timeout:
|
||||
return ErrTimeout
|
||||
return errTimeout
|
||||
|
||||
case <-d.hashCh:
|
||||
// Out of bounds hashes received, ignore them
|
||||
@ -674,6 +698,92 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
|
||||
}
|
||||
}
|
||||
|
||||
// process takes blocks from the queue and tries to import them into the chain.
|
||||
//
|
||||
// The algorithmic flow is as follows:
|
||||
// - The `processing` flag is swapped to 1 to ensure singleton access
|
||||
// - The current `cancel` channel is retrieved to detect sync abortions
|
||||
// - Blocks are iteratively taken from the cache and inserted into the chain
|
||||
// - When the cache becomes empty, insertion stops
|
||||
// - The `processing` flag is swapped back to 0
|
||||
// - A post-exit check is made whether new blocks became available
|
||||
// - This step is important: it handles a potential race condition between
|
||||
// checking for no more work, and releasing the processing "mutex". In
|
||||
// between these state changes, a block may have arrived, but a processing
|
||||
// attempt denied, so we need to re-enter to ensure the block isn't left
|
||||
// to idle in the cache.
|
||||
func (d *Downloader) process() (err error) {
|
||||
// Make sure only one goroutine is ever allowed to process blocks at once
|
||||
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
|
||||
return
|
||||
}
|
||||
// If the processor just exited, but there are freshly pending items, try to
|
||||
// reenter. This is needed because the goroutine spinned up for processing
|
||||
// the fresh blocks might have been rejected entry to to this present thread
|
||||
// not yet releasing the `processing` state.
|
||||
defer func() {
|
||||
if err == nil && d.queue.GetHeadBlock() != nil {
|
||||
err = d.process()
|
||||
}
|
||||
}()
|
||||
// Release the lock upon exit (note, before checking for reentry!), and set
|
||||
// the import statistics to zero.
|
||||
defer func() {
|
||||
d.importLock.Lock()
|
||||
d.importQueue = nil
|
||||
d.importDone = 0
|
||||
d.importLock.Unlock()
|
||||
|
||||
atomic.StoreInt32(&d.processing, 0)
|
||||
}()
|
||||
|
||||
// Fetch the current cancel channel to allow termination
|
||||
d.cancelLock.RLock()
|
||||
cancel := d.cancelCh
|
||||
d.cancelLock.RUnlock()
|
||||
|
||||
// Repeat the processing as long as there are blocks to import
|
||||
for {
|
||||
// Fetch the next batch of blocks
|
||||
blocks := d.queue.TakeBlocks()
|
||||
if len(blocks) == 0 {
|
||||
return nil
|
||||
}
|
||||
// Reset the import statistics
|
||||
d.importLock.Lock()
|
||||
d.importStart = time.Now()
|
||||
d.importQueue = blocks
|
||||
d.importDone = 0
|
||||
d.importLock.Unlock()
|
||||
|
||||
// Actually import the blocks
|
||||
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
|
||||
for len(blocks) != 0 { // TODO: quit
|
||||
// Check for any termination requests
|
||||
select {
|
||||
case <-cancel:
|
||||
return errCancelChainImport
|
||||
default:
|
||||
}
|
||||
// Retrieve the first batch of blocks to insert
|
||||
max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
|
||||
raw := make(types.Blocks, 0, max)
|
||||
for _, block := range blocks[:max] {
|
||||
raw = append(raw, block.RawBlock)
|
||||
}
|
||||
// Try to inset the blocks, drop the originating peer if there's an error
|
||||
index, err := d.insertChain(raw)
|
||||
if err != nil {
|
||||
glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
|
||||
d.dropPeer(blocks[index].OriginPeer)
|
||||
d.Cancel()
|
||||
return errCancelChainImport
|
||||
}
|
||||
blocks = blocks[max:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DeliverBlocks injects a new batch of blocks received from a remote node.
|
||||
// This is usually invoked through the BlocksMsg by the protocol handler.
|
||||
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -74,7 +74,7 @@ func (p *peer) Fetch(request *fetchRequest) error {
|
||||
for hash, _ := range request.Hashes {
|
||||
hashes = append(hashes, hash)
|
||||
}
|
||||
p.getBlocks(hashes)
|
||||
go p.getBlocks(hashes)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -320,7 +320,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
|
||||
// If a requested block falls out of the range, the hash chain is invalid
|
||||
index := int(block.NumberU64()) - q.blockOffset
|
||||
if index >= len(q.blockCache) || index < 0 {
|
||||
return ErrInvalidChain
|
||||
return errInvalidChain
|
||||
}
|
||||
// Otherwise merge the block and mark the hash block
|
||||
q.blockCache[index] = &Block{
|
||||
|
@ -1,30 +0,0 @@
|
||||
package downloader
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"gopkg.in/fatih/set.v0"
|
||||
)
|
||||
|
||||
func createHashSet(hashes []common.Hash) *set.Set {
|
||||
hset := set.New()
|
||||
|
||||
for _, hash := range hashes {
|
||||
hset.Add(hash)
|
||||
}
|
||||
|
||||
return hset
|
||||
}
|
||||
|
||||
func createBlocksFromHashSet(hashes *set.Set) []*types.Block {
|
||||
blocks := make([]*types.Block, hashes.Size())
|
||||
|
||||
var i int
|
||||
hashes.Each(func(v interface{}) bool {
|
||||
blocks[i] = createBlock(i, common.Hash{}, v.(common.Hash))
|
||||
i++
|
||||
return true
|
||||
})
|
||||
|
||||
return blocks
|
||||
}
|
@ -68,12 +68,11 @@ type ProtocolManager struct {
|
||||
|
||||
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
|
||||
// with the ethereum network.
|
||||
func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager {
|
||||
func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager {
|
||||
manager := &ProtocolManager{
|
||||
eventMux: mux,
|
||||
txpool: txpool,
|
||||
chainman: chainman,
|
||||
downloader: downloader,
|
||||
peers: newPeerSet(),
|
||||
newPeerCh: make(chan *peer, 1),
|
||||
newHashCh: make(chan []*blockAnnounce, 1),
|
||||
@ -81,6 +80,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
|
||||
txsyncCh: make(chan *txsync),
|
||||
quitSync: make(chan struct{}),
|
||||
}
|
||||
manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
|
||||
manager.SubProtocol = p2p.Protocol{
|
||||
Name: "eth",
|
||||
Version: uint(protocolVersion),
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/eth/downloader"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
@ -168,8 +167,7 @@ func newProtocolManagerForTesting(txAdded chan<- []*types.Transaction) *Protocol
|
||||
db, _ = ethdb.NewMemDatabase()
|
||||
chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, core.FakePow{}, em)
|
||||
txpool = &fakeTxPool{added: txAdded}
|
||||
dl = downloader.New(em, chain.HasBlock, chain.GetBlock)
|
||||
pm = NewProtocolManager(ProtocolVersion, 0, em, txpool, chain, dl)
|
||||
pm = NewProtocolManager(ProtocolVersion, 0, em, txpool, chain)
|
||||
)
|
||||
pm.Start()
|
||||
return pm
|
||||
|
89
eth/sync.go
89
eth/sync.go
@ -1,14 +1,11 @@
|
||||
package eth
|
||||
|
||||
import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/eth/downloader"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||
@ -16,12 +13,10 @@ import (
|
||||
|
||||
const (
|
||||
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
|
||||
blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
|
||||
notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
|
||||
notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
|
||||
notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
|
||||
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
||||
blockProcAmount = 256
|
||||
|
||||
// This is the target size for the packs of transactions sent by txsyncLoop.
|
||||
// A pack can get larger than this if a single transactions exceeds this size.
|
||||
@ -176,7 +171,7 @@ func (pm *ProtocolManager) fetcher() {
|
||||
// Send out all block requests
|
||||
for peer, hashes := range request {
|
||||
glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id)
|
||||
peer.requestBlocks(hashes)
|
||||
go peer.requestBlocks(hashes)
|
||||
}
|
||||
request = make(map[*peer][]common.Hash)
|
||||
|
||||
@ -219,7 +214,7 @@ func (pm *ProtocolManager) fetcher() {
|
||||
if announce := pending[hash]; announce != nil {
|
||||
// Drop the block if it surely cannot fit
|
||||
if pm.chainman.HasBlock(hash) || !pm.chainman.HasBlock(block.ParentHash()) {
|
||||
delete(pending, hash)
|
||||
// delete(pending, hash) // if we drop, it will re-fetch it, wait for timeout?
|
||||
continue
|
||||
}
|
||||
// Otherwise accumulate for import
|
||||
@ -255,10 +250,10 @@ func (pm *ProtocolManager) fetcher() {
|
||||
// syncer is responsible for periodically synchronising with the network, both
|
||||
// downloading hashes and blocks as well as retrieving cached ones.
|
||||
func (pm *ProtocolManager) syncer() {
|
||||
forceSync := time.Tick(forceSyncCycle)
|
||||
blockProc := time.Tick(blockProcCycle)
|
||||
blockProcPend := int32(0)
|
||||
// Abort any pending syncs if we terminate
|
||||
defer pm.downloader.Cancel()
|
||||
|
||||
forceSync := time.Tick(forceSyncCycle)
|
||||
for {
|
||||
select {
|
||||
case <-pm.newPeerCh:
|
||||
@ -272,55 +267,12 @@ func (pm *ProtocolManager) syncer() {
|
||||
// Force a sync even if not enough peers are present
|
||||
go pm.synchronise(pm.peers.BestPeer())
|
||||
|
||||
case <-blockProc:
|
||||
// Try to pull some blocks from the downloaded
|
||||
if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) {
|
||||
go func() {
|
||||
pm.processBlocks()
|
||||
atomic.StoreInt32(&blockProcPend, 0)
|
||||
}()
|
||||
}
|
||||
|
||||
case <-pm.quitSync:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processBlocks retrieves downloaded blocks from the download cache and tries
|
||||
// to construct the local block chain with it. Note, since the block retrieval
|
||||
// order matters, access to this function *must* be synchronized/serialized.
|
||||
func (pm *ProtocolManager) processBlocks() error {
|
||||
pm.wg.Add(1)
|
||||
defer pm.wg.Done()
|
||||
|
||||
// Short circuit if no blocks are available for insertion
|
||||
blocks := pm.downloader.TakeBlocks()
|
||||
if len(blocks) == 0 {
|
||||
return nil
|
||||
}
|
||||
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
|
||||
|
||||
for len(blocks) != 0 && !pm.quit {
|
||||
// Retrieve the first batch of blocks to insert
|
||||
max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
|
||||
raw := make(types.Blocks, 0, max)
|
||||
for _, block := range blocks[:max] {
|
||||
raw = append(raw, block.RawBlock)
|
||||
}
|
||||
// Try to inset the blocks, drop the originating peer if there's an error
|
||||
index, err := pm.chainman.InsertChain(raw)
|
||||
if err != nil {
|
||||
glog.V(logger.Debug).Infoln("Downloaded block import failed:", err)
|
||||
pm.removePeer(blocks[index].OriginPeer)
|
||||
pm.downloader.Cancel()
|
||||
return err
|
||||
}
|
||||
blocks = blocks[max:]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// synchronise tries to sync up our local block chain with a remote peer, both
|
||||
// adding various sanity checks as well as wrapping it with various log entries.
|
||||
func (pm *ProtocolManager) synchronise(peer *peer) {
|
||||
@ -332,33 +284,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
|
||||
if peer.Td().Cmp(pm.chainman.Td()) <= 0 {
|
||||
return
|
||||
}
|
||||
// FIXME if we have the hash in our chain and the TD of the peer is
|
||||
// much higher than ours, something is wrong with us or the peer.
|
||||
// Check if the hash is on our own chain
|
||||
head := peer.Head()
|
||||
if pm.chainman.HasBlock(head) {
|
||||
glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known")
|
||||
return
|
||||
}
|
||||
// Get the hashes from the peer (synchronously)
|
||||
glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, head)
|
||||
|
||||
err := pm.downloader.Synchronise(peer.id, head)
|
||||
switch err {
|
||||
case nil:
|
||||
glog.V(logger.Detail).Infof("Synchronisation completed")
|
||||
|
||||
case downloader.ErrBusy:
|
||||
glog.V(logger.Detail).Infof("Synchronisation already in progress")
|
||||
|
||||
case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed:
|
||||
glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err)
|
||||
pm.removePeer(peer.id)
|
||||
|
||||
case downloader.ErrPendingQueue:
|
||||
glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
|
||||
|
||||
default:
|
||||
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
|
||||
}
|
||||
// Otherwise try to sync with the downloader
|
||||
pm.downloader.Synchronise(peer.id, peer.Head())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user