eth, eth/downloader: remove bad peers from peer set
Peers in the eth protocol handler are now being ignored for catch up.
This commit is contained in:
parent
3fef601903
commit
9b6e8f6195
@ -30,7 +30,7 @@ var (
|
|||||||
errLowTd = errors.New("peer's TD is too low")
|
errLowTd = errors.New("peer's TD is too low")
|
||||||
errBusy = errors.New("busy")
|
errBusy = errors.New("busy")
|
||||||
errUnknownPeer = errors.New("peer's unknown or unhealthy")
|
errUnknownPeer = errors.New("peer's unknown or unhealthy")
|
||||||
errBadPeer = errors.New("action from bad peer ignored")
|
ErrBadPeer = errors.New("action from bad peer ignored")
|
||||||
errTimeout = errors.New("timeout")
|
errTimeout = errors.New("timeout")
|
||||||
errEmptyHashSet = errors.New("empty hash set by peer")
|
errEmptyHashSet = errors.New("empty hash set by peer")
|
||||||
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
||||||
@ -376,7 +376,7 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error
|
|||||||
// and add the block. Otherwise just ignore it
|
// and add the block. Otherwise just ignore it
|
||||||
if peer == nil {
|
if peer == nil {
|
||||||
glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
|
glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
|
||||||
return errBadPeer
|
return ErrBadPeer
|
||||||
}
|
}
|
||||||
|
|
||||||
peer.mu.Lock()
|
peer.mu.Lock()
|
||||||
@ -422,10 +422,7 @@ func (d *Downloader) process(peer *peer) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var blocks = d.queue.blocks
|
||||||
blocks = d.queue.blocks
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
|
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
|
||||||
|
|
||||||
// Loop untill we're out of blocks
|
// Loop untill we're out of blocks
|
||||||
@ -435,7 +432,7 @@ func (d *Downloader) process(peer *peer) error {
|
|||||||
// processing and start requesting the `block.hash` so that it's parent and
|
// processing and start requesting the `block.hash` so that it's parent and
|
||||||
// grandparents can be requested and queued.
|
// grandparents can be requested and queued.
|
||||||
var i int
|
var i int
|
||||||
i, err = d.insertChain(blocks[:max])
|
i, err := d.insertChain(blocks[:max])
|
||||||
if err != nil && core.IsParentErr(err) {
|
if err != nil && core.IsParentErr(err) {
|
||||||
// Ignore the missing blocks. Handler should take care of anything that's missing.
|
// Ignore the missing blocks. Handler should take care of anything that's missing.
|
||||||
glog.V(logger.Debug).Infof("Ignored block with missing parent (%d)\n", i)
|
glog.V(logger.Debug).Infof("Ignored block with missing parent (%d)\n", i)
|
||||||
@ -447,8 +444,9 @@ func (d *Downloader) process(peer *peer) error {
|
|||||||
d.UnregisterPeer(d.activePeer)
|
d.UnregisterPeer(d.activePeer)
|
||||||
// Reset chain completely. This needs much, much improvement.
|
// Reset chain completely. This needs much, much improvement.
|
||||||
// instead: check all blocks leading down to this block false block and remove it
|
// instead: check all blocks leading down to this block false block and remove it
|
||||||
blocks = nil
|
d.queue.blocks = nil
|
||||||
break
|
|
||||||
|
return ErrBadPeer
|
||||||
}
|
}
|
||||||
blocks = blocks[max:]
|
blocks = blocks[max:]
|
||||||
}
|
}
|
||||||
@ -459,7 +457,7 @@ func (d *Downloader) process(peer *peer) error {
|
|||||||
} else {
|
} else {
|
||||||
d.queue.blocks = blocks
|
d.queue.blocks = blocks
|
||||||
}
|
}
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) isFetchingHashes() bool {
|
func (d *Downloader) isFetchingHashes() bool {
|
||||||
|
@ -122,6 +122,12 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
|
|||||||
return manager
|
return manager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pm *ProtocolManager) removePeer(peer *peer) {
|
||||||
|
pm.pmu.Lock()
|
||||||
|
defer pm.pmu.Unlock()
|
||||||
|
delete(pm.peers, peer.id)
|
||||||
|
}
|
||||||
|
|
||||||
func (pm *ProtocolManager) syncHandler() {
|
func (pm *ProtocolManager) syncHandler() {
|
||||||
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
|
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
|
||||||
itimer := time.NewTimer(peerCountTimeout)
|
itimer := time.NewTimer(peerCountTimeout)
|
||||||
@ -172,7 +178,10 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
|
|||||||
glog.V(logger.Info).Infof("Synchronisation attempt using %s TD=%v\n", peer.id, peer.td)
|
glog.V(logger.Info).Infof("Synchronisation attempt using %s TD=%v\n", peer.id, peer.td)
|
||||||
// Get the hashes from the peer (synchronously)
|
// Get the hashes from the peer (synchronously)
|
||||||
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
|
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
|
||||||
if err != nil {
|
if err != nil && err == downloader.ErrBadPeer {
|
||||||
|
glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
|
||||||
|
pm.removePeer(peer)
|
||||||
|
} else if err != nil {
|
||||||
// handle error
|
// handle error
|
||||||
glog.V(logger.Debug).Infoln("error downloading:", err)
|
glog.V(logger.Debug).Infoln("error downloading:", err)
|
||||||
}
|
}
|
||||||
@ -214,10 +223,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
|
|||||||
|
|
||||||
pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks)
|
pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks)
|
||||||
defer func() {
|
defer func() {
|
||||||
pm.pmu.Lock()
|
|
||||||
defer pm.pmu.Unlock()
|
|
||||||
delete(pm.peers, p.id)
|
|
||||||
pm.downloader.UnregisterPeer(p.id)
|
pm.downloader.UnregisterPeer(p.id)
|
||||||
|
pm.removePeer(p)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// propagate existing transactions. new transactions appearing
|
// propagate existing transactions. new transactions appearing
|
||||||
@ -379,16 +386,23 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
// if the parent does not exists we delegate to the downloader.
|
// if the parent does not exists we delegate to the downloader.
|
||||||
if self.chainman.HasBlock(request.Block.ParentHash()) {
|
if self.chainman.HasBlock(request.Block.ParentHash()) {
|
||||||
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
|
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
|
||||||
// handle error
|
glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
|
||||||
|
|
||||||
|
self.removePeer(p)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
self.BroadcastBlock(hash, request.Block)
|
self.BroadcastBlock(hash, request.Block)
|
||||||
} else {
|
} else {
|
||||||
// adding blocks is synchronous
|
// adding blocks is synchronous
|
||||||
go func() {
|
go func() {
|
||||||
// TODO check parent error
|
|
||||||
err := self.downloader.AddBlock(p.id, request.Block, request.TD)
|
err := self.downloader.AddBlock(p.id, request.Block, request.TD)
|
||||||
if err != nil {
|
if err != nil && err == downloader.ErrBadPeer {
|
||||||
|
glog.V(logger.Error).Infoln("removed peer (", p.id, ") with err:", err)
|
||||||
|
|
||||||
|
self.removePeer(p)
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
glog.V(logger.Detail).Infoln("downloader err:", err)
|
glog.V(logger.Detail).Infoln("downloader err:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user