eth, eth/downloader: simplified synchronisation process

This commit is contained in:
obscuren 2015-04-24 15:37:32 +02:00
parent bd9c76097d
commit d84c2202e7
2 changed files with 24 additions and 31 deletions

View File

@ -96,14 +96,14 @@ func (d *Downloader) Stats() (current int, max int) {
return d.queue.blockHashes.Size(), d.queue.fetchPool.Size() + d.queue.hashPool.Size() return d.queue.blockHashes.Size(), d.queue.fetchPool.Size() + d.queue.hashPool.Size()
} }
func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { func (d *Downloader) RegisterPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
d.mu.Lock() d.mu.Lock()
defer d.mu.Unlock() defer d.mu.Unlock()
glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td) glog.V(logger.Detail).Infoln("Register peer", id)
// Create a new peer and add it to the list of known peers // Create a new peer and add it to the list of known peers
peer := newPeer(id, td, hash, getHashes, getBlocks) peer := newPeer(id, hash, getHashes, getBlocks)
// add peer to our peer set // add peer to our peer set
d.peers[id] = peer d.peers[id] = peer
// broadcast new peer // broadcast new peer
@ -133,7 +133,7 @@ out:
break break
} }
d.process() d.process(peer)
case <-d.quit: case <-d.quit:
break out break out
} }
@ -143,27 +143,27 @@ out:
// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given // SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the // it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous // checks fail an error will be returned. This method is synchronous
func (d *Downloader) Synchronise(id string, hash common.Hash) (types.Blocks, error) { func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Make sure it's doing neither. Once done we can restart the // Make sure it's doing neither. Once done we can restart the
// downloading process if the TD is higher. For now just get on // downloading process if the TD is higher. For now just get on
// with whatever is going on. This prevents unecessary switching. // with whatever is going on. This prevents unecessary switching.
if d.isBusy() { if d.isBusy() {
return nil, errBusy return errBusy
} }
// Fetch the peer using the id or throw an error if the peer couldn't be found // Fetch the peer using the id or throw an error if the peer couldn't be found
p := d.peers[id] p := d.peers[id]
if p == nil { if p == nil {
return nil, errUnknownPeer return errUnknownPeer
} }
// Get the hash from the peer and initiate the downloading progress. // Get the hash from the peer and initiate the downloading progress.
err := d.getFromPeer(p, hash, false) err := d.getFromPeer(p, hash, false)
if err != nil { if err != nil {
return nil, err return err
} }
return d.queue.blocks, nil return d.process(p)
} }
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error { func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
@ -405,13 +405,12 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error
} }
peer.mu.Lock() peer.mu.Lock()
peer.td = td
peer.recentHash = block.Hash() peer.recentHash = block.Hash()
peer.mu.Unlock() peer.mu.Unlock()
peer.promote() peer.promote()
glog.V(logger.Detail).Infoln("Inserting new block from:", id) glog.V(logger.Detail).Infoln("Inserting new block from:", id)
d.queue.addBlock(id, block, td) d.queue.addBlock(id, block)
// if neither go ahead to process // if neither go ahead to process
if d.isBusy() { if d.isBusy() {
@ -431,10 +430,10 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error
} }
} }
return d.process() return d.process(peer)
} }
func (d *Downloader) process() error { func (d *Downloader) process(peer *peer) error {
atomic.StoreInt32(&d.processingBlocks, 1) atomic.StoreInt32(&d.processingBlocks, 1)
defer atomic.StoreInt32(&d.processingBlocks, 0) defer atomic.StoreInt32(&d.processingBlocks, 0)
@ -460,18 +459,8 @@ func (d *Downloader) process() error {
// grandparents can be requested and queued. // grandparents can be requested and queued.
err = d.insertChain(blocks[:max]) err = d.insertChain(blocks[:max])
if err != nil && core.IsParentErr(err) { if err != nil && core.IsParentErr(err) {
glog.V(logger.Debug).Infoln("Aborting process due to missing parent. Fetching hashes") glog.V(logger.Debug).Infoln("Aborting process due to missing parent.")
// TODO change this. This shite
for i, block := range blocks[:max] {
if !d.hasBlock(block.ParentHash()) {
d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true}
// remove processed blocks
blocks = blocks[i:]
break
}
}
break break
} else if err != nil { } else if err != nil {
// immediatly unregister the false peer but do not disconnect // immediatly unregister the false peer but do not disconnect

View File

@ -90,7 +90,7 @@ type ProtocolManager struct {
minedBlockSub event.Subscription minedBlockSub event.Subscription
newPeerCh chan *peer newPeerCh chan *peer
quit chan struct{} quitSync chan struct{}
} }
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@ -103,9 +103,8 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
downloader: downloader, downloader: downloader,
peers: make(map[string]*peer), peers: make(map[string]*peer),
newPeerCh: make(chan *peer, 1), newPeerCh: make(chan *peer, 1),
quit: make(chan struct{}), quitSync: make(chan struct{}),
} }
go manager.peerHandler()
manager.SubProtocol = p2p.Protocol{ manager.SubProtocol = p2p.Protocol{
Name: "eth", Name: "eth",
@ -123,7 +122,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
return manager return manager
} }
func (pm *ProtocolManager) peerHandler() { func (pm *ProtocolManager) syncHandler() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount` // itimer is used to determine when to start ignoring `minDesiredPeerCount`
itimer := time.NewTimer(peerCountTimeout) itimer := time.NewTimer(peerCountTimeout)
out: out:
@ -153,7 +152,7 @@ out:
} else { } else {
itimer.Reset(5 * time.Second) itimer.Reset(5 * time.Second)
} }
case <-pm.quit: case <-pm.quitSync:
break out break out
} }
} }
@ -161,7 +160,7 @@ out:
func (pm *ProtocolManager) synchronise(peer *peer) { func (pm *ProtocolManager) synchronise(peer *peer) {
// Get the hashes from the peer (synchronously) // Get the hashes from the peer (synchronously)
_, err := pm.downloader.Synchronise(peer.id, peer.recentHash) err := pm.downloader.Synchronise(peer.id, peer.recentHash)
if err != nil { if err != nil {
// handle error // handle error
glog.V(logger.Debug).Infoln("error downloading:", err) glog.V(logger.Debug).Infoln("error downloading:", err)
@ -176,11 +175,15 @@ func (pm *ProtocolManager) Start() {
// broadcast mined blocks // broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop() go pm.minedBroadcastLoop()
// sync handler
go pm.syncHandler()
} }
func (pm *ProtocolManager) Stop() { func (pm *ProtocolManager) Stop() {
pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
close(pm.quitSync) // quits the sync handler
} }
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
@ -198,7 +201,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
pm.peers[p.id] = p pm.peers[p.id] = p
pm.pmu.Unlock() pm.pmu.Unlock()
pm.downloader.RegisterPeer(p.id, p.td, p.recentHash, p.requestHashes, p.requestBlocks) pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks)
defer func() { defer func() {
pm.pmu.Lock() pm.pmu.Lock()
defer pm.pmu.Unlock() defer pm.pmu.Unlock()
@ -370,6 +373,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
} else { } else {
// adding blocks is synchronous // adding blocks is synchronous
go func() { go func() {
// TODO check parent error
err := self.downloader.AddBlock(p.id, request.Block, request.TD) err := self.downloader.AddBlock(p.id, request.Block, request.TD)
if err != nil { if err != nil {
glog.V(logger.Detail).Infoln("downloader err:", err) glog.V(logger.Detail).Infoln("downloader err:", err)