downloader: improved downloading and synchronisation
* Downloader's peers keeps track of peer's previously requested hashes so that we don't have to re-request * Changed `AddBlock` to be fully synchronous
This commit is contained in:
parent
60613b57d1
commit
c2c24b3bb4
@ -29,6 +29,9 @@ var (
|
|||||||
errLowTd = errors.New("peer's TD is too low")
|
errLowTd = errors.New("peer's TD is too low")
|
||||||
errBusy = errors.New("busy")
|
errBusy = errors.New("busy")
|
||||||
errUnknownPeer = errors.New("peer's unknown or unhealthy")
|
errUnknownPeer = errors.New("peer's unknown or unhealthy")
|
||||||
|
errBadPeer = errors.New("action from bad peer ignored")
|
||||||
|
errTimeout = errors.New("timeout")
|
||||||
|
errEmptyHashSet = errors.New("empty hash set by peer")
|
||||||
)
|
)
|
||||||
|
|
||||||
type hashCheckFn func(common.Hash) bool
|
type hashCheckFn func(common.Hash) bool
|
||||||
@ -116,73 +119,6 @@ func (d *Downloader) UnregisterPeer(id string) {
|
|||||||
delete(d.peers, id)
|
delete(d.peers, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
|
|
||||||
// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
|
|
||||||
// checks fail an error will be returned. This method is synchronous
|
|
||||||
func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) {
|
|
||||||
// Check if we're busy
|
|
||||||
if d.isBusy() {
|
|
||||||
return nil, errBusy
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempt to select a peer. This can either be nothing, which returns, best peer
|
|
||||||
// or selected peer. If no peer could be found an error will be returned
|
|
||||||
var p *peer
|
|
||||||
if len(id) == 0 {
|
|
||||||
p = d.peers[id]
|
|
||||||
if p == nil {
|
|
||||||
return nil, errUnknownPeer
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
p = d.peers.bestPeer()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure our td is lower than the peer's td
|
|
||||||
if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
|
|
||||||
return nil, errLowTd
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the hash from the peer and initiate the downloading progress.
|
|
||||||
err := d.getFromPeer(p, p.recentHash, false)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return d.queue.blocks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Synchronise will synchronise using the best peer.
|
|
||||||
func (d *Downloader) Synchronise() (types.Blocks, error) {
|
|
||||||
return d.SynchroniseWithPeer("")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
|
|
||||||
glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
|
|
||||||
// Start the fetcher. This will block the update entirely
|
|
||||||
// interupts need to be send to the appropriate channels
|
|
||||||
// respectively.
|
|
||||||
if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
|
|
||||||
// handle error
|
|
||||||
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
|
|
||||||
// XXX Reset
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start fetching blocks in paralel. The strategy is simple
|
|
||||||
// take any available peers, seserve a chunk for each peer available,
|
|
||||||
// let the peer deliver the chunkn and periodically check if a peer
|
|
||||||
// has timedout. When done downloading, process blocks.
|
|
||||||
if err := d.startFetchingBlocks(p); err != nil {
|
|
||||||
glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
|
|
||||||
// XXX reset
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(logger.Detail).Infoln("Sync completed")
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) peerHandler() {
|
func (d *Downloader) peerHandler() {
|
||||||
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
|
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
|
||||||
itimer := time.NewTimer(peerCountTimeout)
|
itimer := time.NewTimer(peerCountTimeout)
|
||||||
@ -236,34 +172,14 @@ out:
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case sync := <-d.syncCh:
|
case sync := <-d.syncCh:
|
||||||
start := time.Now()
|
|
||||||
|
|
||||||
var peer *peer = sync.peer
|
var peer *peer = sync.peer
|
||||||
|
|
||||||
d.activePeer = peer.id
|
d.activePeer = peer.id
|
||||||
glog.V(logger.Detail).Infoln("Synchronising with the network using:", peer.id)
|
|
||||||
// Start the fetcher. This will block the update entirely
|
err := d.getFromPeer(peer, sync.hash, sync.ignoreInitial)
|
||||||
// interupts need to be send to the appropriate channels
|
if err != nil {
|
||||||
// respectively.
|
|
||||||
if err := d.startFetchingHashes(peer, sync.hash, sync.ignoreInitial); err != nil {
|
|
||||||
// handle error
|
|
||||||
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
|
|
||||||
// XXX Reset
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start fetching blocks in paralel. The strategy is simple
|
|
||||||
// take any available peers, seserve a chunk for each peer available,
|
|
||||||
// let the peer deliver the chunkn and periodically check if a peer
|
|
||||||
// has timedout. When done downloading, process blocks.
|
|
||||||
if err := d.startFetchingBlocks(peer); err != nil {
|
|
||||||
glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
|
|
||||||
// XXX reset
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(logger.Detail).Infoln("Network sync completed in", time.Since(start))
|
|
||||||
|
|
||||||
d.process()
|
d.process()
|
||||||
case <-d.quit:
|
case <-d.quit:
|
||||||
break out
|
break out
|
||||||
@ -314,9 +230,8 @@ out:
|
|||||||
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", p.id)
|
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", p.id)
|
||||||
d.queue.reset()
|
d.queue.reset()
|
||||||
|
|
||||||
break out
|
return errEmptyHashSet
|
||||||
} else if !done { // Check if we're done fetching
|
} else if !done { // Check if we're done fetching
|
||||||
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
|
|
||||||
// Get the next set of hashes
|
// Get the next set of hashes
|
||||||
p.getHashes(hashes[len(hashes)-1])
|
p.getHashes(hashes[len(hashes)-1])
|
||||||
} else { // we're done
|
} else { // we're done
|
||||||
@ -324,9 +239,12 @@ out:
|
|||||||
}
|
}
|
||||||
case <-failureResponse.C:
|
case <-failureResponse.C:
|
||||||
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
|
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
|
||||||
|
// TODO instead of reseting the queue select a new peer from which we can start downloading hashes.
|
||||||
|
// 1. check for peer's best hash to be included in the current hash set;
|
||||||
|
// 2. resume from last point (hashes[len(hashes)-1]) using the newly selected peer.
|
||||||
d.queue.reset()
|
d.queue.reset()
|
||||||
|
|
||||||
break out
|
return errTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start))
|
glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start))
|
||||||
@ -367,7 +285,6 @@ out:
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
//fmt.Println("fetching for", peer.id)
|
|
||||||
// XXX make fetch blocking.
|
// XXX make fetch blocking.
|
||||||
// Fetch the chunk and check for error. If the peer was somehow
|
// Fetch the chunk and check for error. If the peer was somehow
|
||||||
// already fetching a chunk due to a bug, it will be returned to
|
// already fetching a chunk due to a bug, it will be returned to
|
||||||
@ -417,7 +334,6 @@ out:
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
//fmt.Println(d.queue.hashPool.Size(), len(d.queue.fetching))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -441,11 +357,14 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
|
|||||||
|
|
||||||
// Add an (unrequested) block to the downloader. This is usually done through the
|
// Add an (unrequested) block to the downloader. This is usually done through the
|
||||||
// NewBlockMsg by the protocol handler.
|
// NewBlockMsg by the protocol handler.
|
||||||
func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
|
// Adding blocks is done synchronously. if there are missing blocks, blocks will be
|
||||||
|
// fetched first. If the downloader is busy or if some other processed failed an error
|
||||||
|
// will be returned.
|
||||||
|
func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error {
|
||||||
hash := block.Hash()
|
hash := block.Hash()
|
||||||
|
|
||||||
if d.hasBlock(hash) {
|
if d.hasBlock(hash) {
|
||||||
return
|
return fmt.Errorf("known block %x", hash.Bytes()[:4])
|
||||||
}
|
}
|
||||||
|
|
||||||
peer := d.peers.getPeer(id)
|
peer := d.peers.getPeer(id)
|
||||||
@ -453,7 +372,7 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
|
|||||||
// and add the block. Otherwise just ignore it
|
// and add the block. Otherwise just ignore it
|
||||||
if peer == nil {
|
if peer == nil {
|
||||||
glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
|
glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
|
||||||
return
|
return errBadPeer
|
||||||
}
|
}
|
||||||
|
|
||||||
peer.mu.Lock()
|
peer.mu.Lock()
|
||||||
@ -466,17 +385,24 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
|
|||||||
d.queue.addBlock(id, block, td)
|
d.queue.addBlock(id, block, td)
|
||||||
|
|
||||||
// if neither go ahead to process
|
// if neither go ahead to process
|
||||||
if !d.isBusy() {
|
if d.isBusy() {
|
||||||
|
return errBusy
|
||||||
|
}
|
||||||
|
|
||||||
// Check if the parent of the received block is known.
|
// Check if the parent of the received block is known.
|
||||||
// If the block is not know, request it otherwise, request.
|
// If the block is not know, request it otherwise, request.
|
||||||
phash := block.ParentHash()
|
phash := block.ParentHash()
|
||||||
if !d.hasBlock(phash) {
|
if !d.hasBlock(phash) {
|
||||||
glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
|
glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
|
||||||
d.syncCh <- syncPack{peer, peer.recentHash, true}
|
|
||||||
} else {
|
// Get the missing hashes from the peer (synchronously)
|
||||||
d.process()
|
err := d.getFromPeer(peer, peer.recentHash, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return d.process()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
|
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"gopkg.in/fatih/set.v0"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -64,13 +65,23 @@ type peer struct {
|
|||||||
td *big.Int
|
td *big.Int
|
||||||
recentHash common.Hash
|
recentHash common.Hash
|
||||||
|
|
||||||
|
requested *set.Set
|
||||||
|
|
||||||
getHashes hashFetcherFn
|
getHashes hashFetcherFn
|
||||||
getBlocks blockFetcherFn
|
getBlocks blockFetcherFn
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a new peer
|
// create a new peer
|
||||||
func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
|
func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
|
||||||
return &peer{id: id, td: td, recentHash: hash, getHashes: getHashes, getBlocks: getBlocks, state: idleState}
|
return &peer{
|
||||||
|
id: id,
|
||||||
|
td: td,
|
||||||
|
recentHash: hash,
|
||||||
|
getHashes: getHashes,
|
||||||
|
getBlocks: getBlocks,
|
||||||
|
state: idleState,
|
||||||
|
requested: set.New(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetch a chunk using the peer
|
// fetch a chunk using the peer
|
||||||
@ -82,6 +93,8 @@ func (p *peer) fetch(chunk *chunk) error {
|
|||||||
return errors.New("peer already fetching chunk")
|
return errors.New("peer already fetching chunk")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.requested.Merge(chunk.hashes)
|
||||||
|
|
||||||
// set working state
|
// set working state
|
||||||
p.state = workingState
|
p.state = workingState
|
||||||
// convert the set to a fetchable slice
|
// convert the set to a fetchable slice
|
||||||
|
@ -65,6 +65,9 @@ func (c *queue) get(p *peer, max int) *chunk {
|
|||||||
|
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
// remove hashes that have previously been fetched
|
||||||
|
hashes.Separate(p.requested)
|
||||||
|
|
||||||
// remove the fetchable hashes from hash pool
|
// remove the fetchable hashes from hash pool
|
||||||
c.hashPool.Separate(hashes)
|
c.hashPool.Separate(hashes)
|
||||||
c.fetchPool.Merge(hashes)
|
c.fetchPool.Merge(hashes)
|
||||||
|
77
eth/downloader/synchronous.go
Normal file
77
eth/downloader/synchronous.go
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
package downloader
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/logger"
|
||||||
|
"github.com/ethereum/go-ethereum/logger/glog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// THIS IS PENDING AND TO DO CHANGES FOR MAKING THE DOWNLOADER SYNCHRONOUS
|
||||||
|
|
||||||
|
// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
|
||||||
|
// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
|
||||||
|
// checks fail an error will be returned. This method is synchronous
|
||||||
|
func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) {
|
||||||
|
// Check if we're busy
|
||||||
|
if d.isBusy() {
|
||||||
|
return nil, errBusy
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to select a peer. This can either be nothing, which returns, best peer
|
||||||
|
// or selected peer. If no peer could be found an error will be returned
|
||||||
|
var p *peer
|
||||||
|
if len(id) == 0 {
|
||||||
|
p = d.peers[id]
|
||||||
|
if p == nil {
|
||||||
|
return nil, errUnknownPeer
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
p = d.peers.bestPeer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure our td is lower than the peer's td
|
||||||
|
if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
|
||||||
|
return nil, errLowTd
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the hash from the peer and initiate the downloading progress.
|
||||||
|
err := d.getFromPeer(p, p.recentHash, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.queue.blocks, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Synchronise will synchronise using the best peer.
|
||||||
|
func (d *Downloader) Synchronise() (types.Blocks, error) {
|
||||||
|
return d.SynchroniseWithPeer("")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
|
||||||
|
glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
|
||||||
|
// Start the fetcher. This will block the update entirely
|
||||||
|
// interupts need to be send to the appropriate channels
|
||||||
|
// respectively.
|
||||||
|
if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
|
||||||
|
// handle error
|
||||||
|
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
|
||||||
|
// XXX Reset
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start fetching blocks in paralel. The strategy is simple
|
||||||
|
// take any available peers, seserve a chunk for each peer available,
|
||||||
|
// let the peer deliver the chunkn and periodically check if a peer
|
||||||
|
// has timedout. When done downloading, process blocks.
|
||||||
|
if err := d.startFetchingBlocks(p); err != nil {
|
||||||
|
glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
|
||||||
|
// XXX reset
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(logger.Detail).Infoln("Sync completed")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user