From eafdc1f8e371fd698da33491a01799393249729a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 26 May 2015 14:00:21 +0300 Subject: [PATCH] eth, eth/downloader: surface downloaded block origin, drop on error --- eth/downloader/downloader.go | 8 +++++++- eth/downloader/downloader_test.go | 8 ++++---- eth/downloader/queue.go | 20 +++++++++++--------- eth/handler.go | 12 ++++++------ eth/sync.go | 14 +++++++++++--- 5 files changed, 39 insertions(+), 23 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index fd588d2f3..0634baaed 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -93,6 +93,12 @@ type Downloader struct { cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers } +// Block is an origin-tagged blockchain block. +type Block struct { + RawBlock *types.Block + OriginPeer string +} + func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { downloader := &Downloader{ mux: mux, @@ -177,7 +183,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } // TakeBlocks takes blocks from the queue and yields them to the caller. -func (d *Downloader) TakeBlocks() types.Blocks { +func (d *Downloader) TakeBlocks() []*Block { return d.queue.TakeBlocks() } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 8b541d8b7..66be1ca18 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -88,10 +88,10 @@ func (dl *downloadTester) sync(peerId string, head common.Hash) error { // syncTake is starts synchronising with a remote peer, but concurrently it also // starts fetching blocks that the downloader retrieved. IT blocks until both go // routines terminate. -func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) { +func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, error) { // Start a block collector to take blocks as they become available done := make(chan struct{}) - took := []*types.Block{} + took := []*Block{} go func() { for running := true; running; { select { @@ -349,7 +349,7 @@ func TestNonExistingParentAttack(t *testing.T) { if len(bs) != 1 { t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) } - if tester.hasBlock(bs[0].ParentHash()) { + if tester.hasBlock(bs[0].RawBlock.ParentHash()) { t.Fatalf("tester knows about the unknown hash") } tester.downloader.Cancel() @@ -364,7 +364,7 @@ func TestNonExistingParentAttack(t *testing.T) { if len(bs) != 1 { t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) } - if !tester.hasBlock(bs[0].ParentHash()) { + if !tester.hasBlock(bs[0].RawBlock.ParentHash()) { t.Fatalf("tester doesn't know about the origin hash") } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 591a37773..7ea400dc4 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -36,7 +36,7 @@ type queue struct { pendPool map[string]*fetchRequest // Currently pending block retrieval operations blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes - blockCache []*types.Block // Downloaded but not yet delivered blocks + blockCache []*Block // Downloaded but not yet delivered blocks blockOffset int // Offset of the first cached block in the block-chain lock sync.RWMutex @@ -148,7 +148,7 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash { // GetHeadBlock retrieves the first block from the cache, or nil if it hasn't // been downloaded yet (or simply non existent). -func (q *queue) GetHeadBlock() *types.Block { +func (q *queue) GetHeadBlock() *Block { q.lock.RLock() defer q.lock.RUnlock() @@ -159,7 +159,7 @@ func (q *queue) GetHeadBlock() *types.Block { } // GetBlock retrieves a downloaded block, or nil if non-existent. -func (q *queue) GetBlock(hash common.Hash) *types.Block { +func (q *queue) GetBlock(hash common.Hash) *Block { q.lock.RLock() defer q.lock.RUnlock() @@ -176,18 +176,18 @@ func (q *queue) GetBlock(hash common.Hash) *types.Block { } // TakeBlocks retrieves and permanently removes a batch of blocks from the cache. -func (q *queue) TakeBlocks() types.Blocks { +func (q *queue) TakeBlocks() []*Block { q.lock.Lock() defer q.lock.Unlock() // Accumulate all available blocks - var blocks types.Blocks + blocks := []*Block{} for _, block := range q.blockCache { if block == nil { break } blocks = append(blocks, block) - delete(q.blockPool, block.Hash()) + delete(q.blockPool, block.RawBlock.Hash()) } // Delete the blocks from the slice and let them be garbage collected // without this slice trick the blocks would stay in memory until nil @@ -312,8 +312,10 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { return ErrInvalidChain } // Otherwise merge the block and mark the hash block - q.blockCache[index] = block - + q.blockCache[index] = &Block{ + RawBlock: block, + OriginPeer: id, + } delete(request.Hashes, hash) delete(q.hashPool, hash) q.blockPool[hash] = int(block.NumberU64()) @@ -342,6 +344,6 @@ func (q *queue) Alloc(offset int) { size = blockCacheLimit } if len(q.blockCache) < size { - q.blockCache = append(q.blockCache, make([]*types.Block, size-len(q.blockCache))...) + q.blockCache = append(q.blockCache, make([]*Block, size-len(q.blockCache))...) } } diff --git a/eth/handler.go b/eth/handler.go index 9117a70de..777a9c7c0 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -92,13 +92,13 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo return manager } -func (pm *ProtocolManager) removePeer(peer *peer) { +func (pm *ProtocolManager) removePeer(id string) { // Unregister the peer from the downloader - pm.downloader.UnregisterPeer(peer.id) + pm.downloader.UnregisterPeer(id) // Remove the peer from the Ethereum peer set too - glog.V(logger.Detail).Infoln("Removing peer", peer.id) - if err := pm.peers.Unregister(peer.id); err != nil { + glog.V(logger.Detail).Infoln("Removing peer", id) + if err := pm.peers.Unregister(id); err != nil { glog.V(logger.Error).Infoln("Removal failed:", err) } } @@ -148,7 +148,7 @@ func (pm *ProtocolManager) handle(p *peer) error { glog.V(logger.Error).Infoln("Addition failed:", err) return err } - defer pm.removePeer(p) + defer pm.removePeer(p.id) if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil { return err @@ -315,7 +315,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error") - self.removePeer(p) + self.removePeer(p.id) return nil } diff --git a/eth/sync.go b/eth/sync.go index 8a0da39ec..d93f83a78 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -57,13 +58,20 @@ func (pm *ProtocolManager) processBlocks() error { if len(blocks) == 0 { return nil } - glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number()) + glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) for len(blocks) != 0 && !pm.quit { + // Retrieve the first batch of blocks to insert max := int(math.Min(float64(len(blocks)), float64(blockProcAmount))) - _, err := pm.chainman.InsertChain(blocks[:max]) + raw := make(types.Blocks, 0, max) + for _, block := range blocks[:max] { + raw = append(raw, block.RawBlock) + } + // Try to inset the blocks, drop the originating peer if there's an error + index, err := pm.chainman.InsertChain(raw) if err != nil { glog.V(logger.Warn).Infof("Block insertion failed: %v", err) + pm.removePeer(blocks[index].OriginPeer) pm.downloader.Cancel() return err } @@ -105,7 +113,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) - pm.removePeer(peer) + pm.removePeer(peer.id) case downloader.ErrPendingQueue: glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)