From 3d57e377a4e95941fd3f572b42e073b40d10d27c Mon Sep 17 00:00:00 2001 From: zelig Date: Sun, 12 Apr 2015 20:25:09 +0100 Subject: [PATCH] blockpool stability fixes: - follow up locks and fix them - chainManager: call SetQueued for parentErr future blocks, uncomment TD checks, unskip test - make ErrIncorrectTD non-fatal to be forgiving to genuine mistaken nodes (temp) but demote them to guard against stuck best peers. - add purging to bounded nodeCache (config nodeCacheSize) - use nodeCache when creating blockpool entries and let non-best peers add blocks (performance boost) - minor error in addError - reduce idleBestPeerTimeout to 1 minute - correct status counts and unskip status passing status test - glogified logging --- blockpool/blockpool.go | 184 ++++++++++++++++++++---------------- blockpool/blockpool_test.go | 36 ++----- blockpool/config_test.go | 4 +- blockpool/errors_test.go | 27 +++--- blockpool/peers.go | 169 ++++++++++++++++----------------- blockpool/peers_test.go | 113 ++++++++++++++-------- blockpool/section.go | 91 +++++++++--------- blockpool/status_test.go | 76 +++++++-------- core/chain_manager.go | 1 + errs/errors.go | 9 +- errs/errors_test.go | 2 +- eth/protocol.go | 12 +-- logger/glog/README | 12 +-- 13 files changed, 370 insertions(+), 366 deletions(-) diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index 3ed2e92c7..a60b6f43c 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -11,13 +11,11 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" "github.com/ethereum/go-ethereum/event" - ethlogger "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/pow" ) -var plog = ethlogger.NewLogger("Blockpool") - var ( // max number of block hashes sent in one request blockHashesBatchSize = 256 @@ -36,11 +34,13 @@ var ( // timeout interval: max time allowed for peer without sending a block blocksTimeout = 60 * time.Second // timeout interval: max time allowed for best peer to remain idle (not send new block after sync complete) - idleBestPeerTimeout = 120 * time.Second + idleBestPeerTimeout = 60 * time.Second // duration of suspension after peer fatal error during which peer is not allowed to reconnect peerSuspensionInterval = 300 * time.Second // status is logged every statusUpdateInterval statusUpdateInterval = 3 * time.Second + // + nodeCacheSize = 1000 ) // blockpool config, values default to constants @@ -49,6 +49,7 @@ type Config struct { BlockBatchSize int BlocksRequestRepetition int BlocksRequestMaxIdleRounds int + NodeCacheSize int BlockHashesRequestInterval time.Duration BlocksRequestInterval time.Duration BlockHashesTimeout time.Duration @@ -74,17 +75,19 @@ var errorToString = map[int]string{ ErrInvalidPoW: "Invalid PoW", // fatal ErrInsufficientChainInfo: "Insufficient chain info", // fatal ErrIdleTooLong: "Idle too long", // fatal - ErrIncorrectTD: "Incorrect Total Difficulty", // fatal + ErrIncorrectTD: "Incorrect Total Difficulty", // should be fatal, not now temporarily ErrUnrequestedBlock: "Unrequested block", } // error severity -func severity(code int) ethlogger.LogLevel { +func severity(code int) logger.LogLevel { switch code { + case ErrIncorrectTD: + return logger.WarnLevel case ErrUnrequestedBlock: - return ethlogger.WarnLevel + return logger.WarnLevel default: - return ethlogger.ErrorLevel + return logger.ErrorLevel } } @@ -120,6 +123,9 @@ func (self *Config) init() { if self.PeerSuspensionInterval == 0 { self.PeerSuspensionInterval = peerSuspensionInterval } + if self.NodeCacheSize == 0 { + self.NodeCacheSize = nodeCacheSize + } if self.StatusUpdateInterval == 0 { self.StatusUpdateInterval = statusUpdateInterval } @@ -171,6 +177,7 @@ type BlockPool struct { nodeCache map[common.Hash]*node nodeCacheLock sync.RWMutex + nodeCacheList []common.Hash // waitgroup is used in tests to wait for result-critical routines // as well as in determining idle / syncing status @@ -248,7 +255,7 @@ func (self *BlockPool) Start() { if (ev.Block.HeaderHash == common.Hash{}) { height = ev.Block.Header().Number } - plog.DebugDetailf("ChainHeadEvent: height: %v, td: %v, hash: %s", height, td, hex(ev.Block.Hash())) + glog.V(logger.Detail).Infof("ChainHeadEvent: height: %v, td: %v, hash: %s", height, td, hex(ev.Block.Hash())) self.setTD(td) self.peers.lock.Lock() @@ -262,11 +269,11 @@ func (self *BlockPool) Start() { self.peers.lock.Unlock() } case <-timer.C: - plog.DebugDetailf("status:\n%v", self.Status()) + glog.V(logger.Detail).Infof("status:\n%v", self.Status()) } } }() - glog.V(ethlogger.Info).Infoln("Blockpool started") + glog.V(logger.Info).Infoln("Blockpool started") } func (self *BlockPool) Stop() { @@ -279,7 +286,7 @@ func (self *BlockPool) Stop() { self.lock.Unlock() - plog.Infoln("Stopping...") + glog.V(logger.Info).Infoln("Stopping...") self.tdSub.Unsubscribe() close(self.quit) @@ -289,7 +296,7 @@ func (self *BlockPool) Stop() { self.pool = nil self.lock.Unlock() - plog.Infoln("Stopped") + glog.V(logger.Info).Infoln("Stopped") } // Wait blocks until active processes finish @@ -301,7 +308,7 @@ func (self *BlockPool) Wait(t time.Duration) { } self.lock.Unlock() - plog.Infoln("Waiting for processes to complete...") + glog.V(logger.Info).Infoln("Waiting for processes to complete...") w := make(chan bool) go func() { self.wg.Wait() @@ -310,9 +317,9 @@ func (self *BlockPool) Wait(t time.Duration) { select { case <-w: - plog.Infoln("Processes complete") + glog.V(logger.Info).Infoln("Processes complete") case <-time.After(t): - plog.Warnf("Timeout") + glog.V(logger.Warn).Infoln("Timeout") } } @@ -343,7 +350,7 @@ func (self *BlockPool) AddPeer( // RemovePeer needs to be called when the peer disconnects func (self *BlockPool) RemovePeer(peerId string) { - self.peers.removePeer(peerId) + self.peers.removePeer(peerId, true) } /* @@ -383,7 +390,7 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st hash, ok = next() bestpeer.lock.RLock() - plog.Debugf("AddBlockHashes: peer <%s> starting from [%s] (peer head: %s)", peerId, hex(bestpeer.parentHash), hex(bestpeer.currentBlockHash)) + glog.V(logger.Debug).Infof("AddBlockHashes: peer <%s> starting from [%s] (peer head: %s)", peerId, hex(bestpeer.parentHash), hex(bestpeer.currentBlockHash)) // first check if we are building the head section of a peer's chain if bestpeer.parentHash == hash { @@ -400,48 +407,45 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st */ headSection = true if entry := self.get(bestpeer.currentBlockHash); entry == nil { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(bestpeer.parentHash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(bestpeer.parentHash)) // if head block is not yet in the pool, create entry and start node list for section + self.nodeCacheLock.Lock() + n := self.findOrCreateNode(bestpeer.currentBlockHash, peerId) + n.block = bestpeer.currentBlock + n.blockBy = peerId + n.td = bestpeer.td + self.nodeCacheLock.Unlock() - node := &node{ - hash: bestpeer.currentBlockHash, - block: bestpeer.currentBlock, - hashBy: peerId, - blockBy: peerId, - td: bestpeer.td, - } // nodes is a list of nodes in one section ordered top-bottom (old to young) - nodes = append(nodes, node) - n++ + nodes = append(nodes, n) } else { // otherwise set child section iff found node is the root of a section // this is a possible scenario when a singleton head section was created // on an earlier occasion when this peer or another with the same block was best peer if entry.node == entry.section.bottom { child = entry.section - plog.DebugDetailf("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash)) } } } else { // otherwise : we are not building the head section of the peer - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(hash)) } // the switch channel signals peerswitch event - switchC := bestpeer.switchC bestpeer.lock.RUnlock() // iterate over hashes coming from peer (first round we have hash set above) LOOP: for ; ok; hash, ok = next() { - + n++ select { case <-self.quit: // global quit for blockpool return - case <-switchC: + case <-bestpeer.switchC: // if the peer is demoted, no more hashes read - plog.DebugDetailf("AddBlockHashes: demoted peer <%s> (head: %s)", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: demoted peer <%s> (head: %s)", peerId, hex(bestpeer.currentBlockHash), hex(hash)) peerswitch = true break LOOP default: @@ -450,9 +454,9 @@ LOOP: // if we reach the blockchain we stop reading further blockhashes if self.hasBlock(hash) { // check if known block connecting the downloaded chain to our blockchain - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) if len(nodes) == 1 { - plog.DebugDetailf("AddBlockHashes: singleton section pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: singleton section pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) // create new section if needed and push it to the blockchain sec = self.newSection(nodes) @@ -470,7 +474,7 @@ LOOP: and td together with blockBy are recorded on the node */ if len(nodes) == 0 && child != nil { - plog.DebugDetailf("AddBlockHashes: child section [%s] pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", sectionhex(child), peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: child section [%s] pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", sectionhex(child), peerId, hex(bestpeer.currentBlockHash), hex(hash)) child.addSectionToBlockChain(bestpeer) } @@ -490,23 +494,21 @@ LOOP: response to hashes request. Note that by providing we can link sections without having to wait for the root block of the child section to arrive, so it allows for superior performance. */ - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) // record the entry's chain section as child section child = entry.section continue LOOP } // otherwise record entry's chain section as parent connecting it to the pool - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block [%s] in section [%s]. Connected to pool.", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found block [%s] in section [%s]. Connected to pool.", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) parent = entry.section break LOOP } // finally if node for block hash does not exist, create it and append node to section nodes - node := &node{ - hash: hash, - hashBy: peerId, - } - nodes = append(nodes, node) + self.nodeCacheLock.Lock() + nodes = append(nodes, self.findOrCreateNode(hash, peerId)) + self.nodeCacheLock.Unlock() } //for /* @@ -518,13 +520,13 @@ LOOP: */ self.chainLock.Lock() - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes)) /* Handle forks where connecting node is mid-section by splitting section at fork. No splitting needed if connecting node is head of a section. */ if parent != nil && entry != nil && entry.node != parent.top && len(nodes) > 0 { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash)) self.splitSection(parent, entry) @@ -537,10 +539,7 @@ LOOP: sec = self.linkSections(nodes, parent, child) if sec != nil { - self.status.lock.Lock() - self.status.values.BlockHashes += len(nodes) - self.status.lock.Unlock() - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): section [%s] created", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): section [%s] created", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) } self.chainLock.Unlock() @@ -554,10 +553,8 @@ LOOP: In this case no activation should happen */ if parent != nil && !peerswitch { - bestpeer.lock.RLock() + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): parent section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(parent)) self.activateChain(parent, bestpeer, bestpeer.switchC, nil) - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): parent section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(parent)) - bestpeer.lock.RUnlock() } /* @@ -578,10 +575,10 @@ LOOP: Otherwise no way to check if it arrived. */ bestpeer.requestBlockHashes(sec.bottom.hash) - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) sec.activate(bestpeer) } else { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) no longer best: delay requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) no longer best: delay requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) sec.deactivate() } } @@ -589,7 +586,7 @@ LOOP: // If we are processing peer's head section, signal it to headSection process that it is created. if headSection { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash)) var headSec *section switch { @@ -601,7 +598,7 @@ LOOP: headSec = parent } if !peerswitch { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section [%s] created signalled to head section process", peerId, hex(bestpeer.currentBlockHash), sectionhex(headSec)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section [%s] created signalled to head section process", peerId, hex(bestpeer.currentBlockHash), sectionhex(headSec)) bestpeer.headSectionC <- headSec } } @@ -635,6 +632,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { if sender == nil { return } + sender.lock.Lock() tdFromCurrentHead, currentBlockHash := sender.setChainInfoFromBlock(block) entry := self.get(hash) @@ -643,7 +641,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. delayed B sends you block ... UNREQUESTED. Blocked if entry == nil { - plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + glog.V(logger.Detail).Infof("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) sender.addError(ErrUnrequestedBlock, "%x", hash) self.status.lock.Lock() @@ -656,28 +654,17 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { var bnode *node if entry == nil { self.nodeCacheLock.Lock() - bnode, _ = self.nodeCache[hash] - if bnode == nil { - bnode = &node{ - hash: currentBlockHash, - block: block, - hashBy: peerId, - blockBy: peerId, - td: tdFromCurrentHead, - } - self.nodeCache[hash] = bnode - } + bnode = self.findOrCreateNode(currentBlockHash, peerId) self.nodeCacheLock.Unlock() } else { bnode = entry.node } bnode.lock.Lock() - defer bnode.lock.Unlock() // check if block already received if bnode.block != nil { - plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy) + glog.V(logger.Detail).Infof("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy) // register peer on node as source if bnode.peers == nil { bnode.peers = make(map[string]bool) @@ -699,7 +686,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { creation is not blocking // validate block for PoW if !self.verifyPoW(block) { - plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + glog.V(logger.Warn).Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) sender.addError(ErrInvalidPoW, "%x", hash) self.status.lock.Lock() @@ -711,13 +698,49 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { */ bnode.block = block bnode.blockBy = peerId + glog.V(logger.Detail).Infof("AddBlock: set td on node %s from peer <%s> (head: %s) to %v (was %v) ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.td, tdFromCurrentHead) bnode.td = tdFromCurrentHead self.status.lock.Lock() self.status.values.Blocks++ self.status.values.BlocksInPool++ self.status.lock.Unlock() } + bnode.lock.Unlock() + currentBlockC := sender.currentBlockC + switchC := sender.switchC + sender.lock.Unlock() + // this must be called without peerlock. + // peerlock held can halt the loop and block on select forever + if tdFromCurrentHead != nil { + select { + case currentBlockC <- block: + case <-switchC: // peer is not best peer + } + } +} + +func (self *BlockPool) findOrCreateNode(hash common.Hash, peerId string) (bnode *node) { + bnode, _ = self.nodeCache[hash] + if bnode == nil { + bnode = &node{ + hash: hash, + hashBy: peerId, + } + self.nodeCache[hash] = bnode + // purge oversize cache + if len(self.nodeCache) > self.Config.NodeCacheSize { + delete(self.nodeCache, self.nodeCacheList[0]) + self.nodeCacheList = append(self.nodeCacheList[1:], hash) + } else { + self.nodeCacheList = append(self.nodeCacheList, hash) + } + + self.status.lock.Lock() + self.status.values.BlockHashes++ + self.status.lock.Unlock() + } + return } /* @@ -731,8 +754,8 @@ func (self *BlockPool) activateChain(sec *section, p *peer, switchC chan bool, c LOOP: for sec != nil { - parent := self.getParent(sec) - plog.DebugDetailf("activateChain: section [%s] activated by peer <%s>", sectionhex(sec), p.id) + parent := sec.parent + glog.V(logger.Detail).Infof("activateChain: section [%s] activated by peer <%s>", sectionhex(sec), p.id) sec.activate(p) if i > 0 && connected != nil { connected[sec.top.hash] = sec @@ -745,11 +768,11 @@ LOOP: if sec.bottom.block != nil { if entry := self.get(sec.bottom.block.ParentHash()); entry != nil { parent = entry.section - plog.DebugDetailf("activateChain: [%s]-[%s] link", sectionhex(parent), sectionhex(sec)) + glog.V(logger.Detail).Infof("activateChain: [%s]-[%s] link", sectionhex(parent), sectionhex(sec)) link(parent, sec) } } else { - plog.DebugDetailf("activateChain: section [%s] activated by peer <%s> has missing root block", sectionhex(sec), p.id) + glog.V(logger.Detail).Infof("activateChain: section [%s] activated by peer <%s> has missing root block", sectionhex(sec), p.id) } } sec = parent @@ -769,17 +792,18 @@ LOOP: func (self *BlockPool) checkTD(nodes ...*node) { for _, n := range nodes { // skip check if queued future block + n.lock.RLock() if n.td != nil && !n.block.Queued() { - plog.DebugDetailf("peer td %v =?= block td %v", n.td, n.block.Td) - /* @zelig: Commented out temp untill the rest of the network has been fixed. + glog.V(logger.Detail).Infof("peer td %v =?= block td %v", n.td, n.block.Td) + // @zelig: Commented out temp untill the rest of the network has been fixed. if n.td.Cmp(n.block.Td) != 0 { - self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) + self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x peer td %v =?= block td %v", n.hash, n.td, n.block.Td) self.status.lock.Lock() self.status.badPeers[n.blockBy]++ self.status.lock.Unlock() } - */ } + n.lock.RUnlock() } } diff --git a/blockpool/blockpool_test.go b/blockpool/blockpool_test.go index b28c2abbf..e79991f15 100644 --- a/blockpool/blockpool_test.go +++ b/blockpool/blockpool_test.go @@ -3,19 +3,12 @@ package blockpool import ( "testing" "time" - - "github.com/ethereum/go-ethereum/blockpool/test" ) -func init() { - test.LogInit() -} - // using the mock framework in blockpool_util_test // we test various scenarios here func TestPeerWithKnownBlock(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.refBlockChain[0] = nil blockPoolTester.blockChain[0] = nil @@ -31,7 +24,6 @@ func TestPeerWithKnownBlock(t *testing.T) { } func TestPeerWithKnownParentBlock(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.initRefBlockChain(1) blockPoolTester.blockChain[0] = nil @@ -50,7 +42,6 @@ func TestPeerWithKnownParentBlock(t *testing.T) { } func TestSimpleChain(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(2) @@ -70,7 +61,6 @@ func TestSimpleChain(t *testing.T) { } func TestChainConnectingWithParentHash(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(3) @@ -90,7 +80,6 @@ func TestChainConnectingWithParentHash(t *testing.T) { } func TestMultiSectionChain(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(5) @@ -113,7 +102,6 @@ func TestMultiSectionChain(t *testing.T) { } func TestNewBlocksOnPartialChain(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(7) @@ -146,7 +134,6 @@ func TestNewBlocksOnPartialChain(t *testing.T) { } func TestPeerSwitchUp(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(7) @@ -174,7 +161,6 @@ func TestPeerSwitchUp(t *testing.T) { } func TestPeerSwitchDownOverlapSectionWithoutRootBlock(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(6) @@ -200,7 +186,6 @@ func TestPeerSwitchDownOverlapSectionWithoutRootBlock(t *testing.T) { } func TestPeerSwitchDownOverlapSectionWithRootBlock(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(6) @@ -227,7 +212,6 @@ func TestPeerSwitchDownOverlapSectionWithRootBlock(t *testing.T) { } func TestPeerSwitchDownDisjointSection(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(3) @@ -254,7 +238,6 @@ func TestPeerSwitchDownDisjointSection(t *testing.T) { } func TestPeerSwitchBack(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(8) @@ -270,7 +253,7 @@ func TestPeerSwitchBack(t *testing.T) { go peer2.serveBlockHashes(6, 5, 4) peer2.serveBlocks(4, 5) // section partially complete peer1.AddPeer() // peer1 is promoted as best peer - go peer1.serveBlocks(10, 11) // + peer1.serveBlocks(10, 11) // peer1.serveBlockHashes(11, 10) // only gives useless results blockPool.RemovePeer("peer1") // peer1 disconnects go peer2.serveBlockHashes(4, 3, 2, 1, 0) // tests that asking for hashes from 4 is remembered @@ -284,7 +267,6 @@ func TestPeerSwitchBack(t *testing.T) { } func TestForkSimple(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(9) @@ -320,7 +302,6 @@ func TestForkSimple(t *testing.T) { } func TestForkSwitchBackByNewBlocks(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(11) @@ -351,8 +332,8 @@ func TestForkSwitchBackByNewBlocks(t *testing.T) { go peer1.serveBlockHashes(11, 10, 9) go peer1.serveBlocks(9, 10) // time.Sleep(1 * time.Second) - go peer1.serveBlocks(3, 7) // tests that block requests on earlier fork are remembered - go peer1.serveBlockHashes(2, 1) // tests that hash request from root of connecting chain section (added by demoted peer) is remembered + go peer1.serveBlocks(3, 7) // tests that block requests on earlier fork are remembered + go peer1.serveBlockHashes(2, 1, 0) // tests that hash request from root of connecting chain section (added by demoted peer) is remembered peer1.serveBlocks(0, 1) blockPool.Wait(waitTimeout) @@ -367,7 +348,6 @@ func TestForkSwitchBackByNewBlocks(t *testing.T) { } func TestForkSwitchBackByPeerSwitchBack(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(9) @@ -411,7 +391,6 @@ func TestForkSwitchBackByPeerSwitchBack(t *testing.T) { } func TestForkCompleteSectionSwitchBackByPeerSwitchBack(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(9) @@ -429,16 +408,17 @@ func TestForkCompleteSectionSwitchBackByPeerSwitchBack(t *testing.T) { peer1.AddPeer() go peer1.serveBlocks(8, 9) go peer1.serveBlockHashes(9, 8, 7) - peer1.serveBlocks(3, 7, 8) // make sure this section is complete - time.Sleep(1 * time.Second) // - go peer1.serveBlockHashes(7, 3, 2) // block 3/7 is section boundary + peer1.serveBlocks(3, 7, 8) // make sure this section is complete + // time.Sleep(2 * time.Second) // + peer1.serveBlockHashes(7, 3, 2) // block 3/7 is section boundary peer1.serveBlocks(2, 3) // partially complete sections block 2 missing peer2.AddPeer() // go peer2.serveBlocks(5, 6) // go peer2.serveBlockHashes(6, 5, 4, 3, 2) // peer2 forks on block 3 + time.Sleep(100 * time.Millisecond) // peer2.serveBlocks(2, 3, 4, 5) // block 2 still missing. blockPool.RemovePeer("peer2") // peer2 disconnects, peer1 is promoted again as best peer - go peer1.serveBlockHashes(2, 1, 0) // + go peer1.serveBlockHashes(2, 1) // peer1.serveBlocks(0, 1, 2) blockPool.Wait(waitTimeout) diff --git a/blockpool/config_test.go b/blockpool/config_test.go index e1ce31f27..e882fefe1 100644 --- a/blockpool/config_test.go +++ b/blockpool/config_test.go @@ -17,6 +17,7 @@ func TestBlockPoolConfig(t *testing.T) { test.CheckInt("BlockBatchSize", c.BlockBatchSize, blockBatchSize, t) test.CheckInt("BlocksRequestRepetition", c.BlocksRequestRepetition, blocksRequestRepetition, t) test.CheckInt("BlocksRequestMaxIdleRounds", c.BlocksRequestMaxIdleRounds, blocksRequestMaxIdleRounds, t) + test.CheckInt("NodeCacheSize", c.NodeCacheSize, nodeCacheSize, t) test.CheckDuration("BlockHashesRequestInterval", c.BlockHashesRequestInterval, blockHashesRequestInterval, t) test.CheckDuration("BlocksRequestInterval", c.BlocksRequestInterval, blocksRequestInterval, t) test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, blockHashesTimeout, t) @@ -29,7 +30,7 @@ func TestBlockPoolConfig(t *testing.T) { func TestBlockPoolOverrideConfig(t *testing.T) { test.LogInit() blockPool := &BlockPool{Config: &Config{}, chainEvents: &event.TypeMux{}} - c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second, 4 * time.Second} + c := &Config{128, 32, 1, 0, 500, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second, 4 * time.Second} blockPool.Config = c blockPool.Start() @@ -37,6 +38,7 @@ func TestBlockPoolOverrideConfig(t *testing.T) { test.CheckInt("BlockBatchSize", c.BlockBatchSize, 32, t) test.CheckInt("BlocksRequestRepetition", c.BlocksRequestRepetition, blocksRequestRepetition, t) test.CheckInt("BlocksRequestMaxIdleRounds", c.BlocksRequestMaxIdleRounds, blocksRequestMaxIdleRounds, t) + test.CheckInt("NodeCacheSize", c.NodeCacheSize, 500, t) test.CheckDuration("BlockHashesRequestInterval", c.BlockHashesRequestInterval, 300*time.Millisecond, t) test.CheckDuration("BlocksRequestInterval", c.BlocksRequestInterval, 100*time.Millisecond, t) test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, 90*time.Second, t) diff --git a/blockpool/errors_test.go b/blockpool/errors_test.go index 0fbf94d7d..2ab2d47f5 100644 --- a/blockpool/errors_test.go +++ b/blockpool/errors_test.go @@ -4,14 +4,12 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/blockpool/test" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/pow" ) func TestInvalidBlock(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(2) @@ -41,7 +39,6 @@ func TestInvalidBlock(t *testing.T) { func TestVerifyPoW(t *testing.T) { t.Skip() // :FIXME: - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(3) @@ -88,7 +85,6 @@ func TestVerifyPoW(t *testing.T) { func TestUnrequestedBlock(t *testing.T) { t.Skip() // :FIXME: - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPool.Start() @@ -108,7 +104,6 @@ func TestUnrequestedBlock(t *testing.T) { } func TestErrInsufficientChainInfo(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPool.Config.BlockHashesTimeout = 100 * time.Millisecond blockPool.Start() @@ -128,8 +123,6 @@ func TestErrInsufficientChainInfo(t *testing.T) { } func TestIncorrectTD(t *testing.T) { - t.Skip("skipping TD check until network is healthy") - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(3) @@ -156,9 +149,6 @@ func TestIncorrectTD(t *testing.T) { } func TestSkipIncorrectTDonFutureBlocks(t *testing.T) { - // t.Skip() // @zelig this one requires fixing for the TD - - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(3) @@ -195,31 +185,40 @@ func TestSkipIncorrectTDonFutureBlocks(t *testing.T) { } func TestPeerSuspension(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPool.Config.PeerSuspensionInterval = 100 * time.Millisecond blockPool.Start() - peer1 := blockPoolTester.newPeer("peer1", 1, 3) + peer1 := blockPoolTester.newPeer("peer1", 3, 3) peer1.AddPeer() - blockPool.peers.peerError("peer1", 0, "") bestpeer, _ := blockPool.peers.getPeer("peer1") + if bestpeer == nil { + t.Errorf("peer1 not best peer") + return + } + peer1.serveBlocks(2, 3) + + blockPool.peers.peerError("peer1", 0, "") + bestpeer, _ = blockPool.peers.getPeer("peer1") if bestpeer != nil { t.Errorf("peer1 not removed on error") + return } peer1.AddPeer() bestpeer, _ = blockPool.peers.getPeer("peer1") if bestpeer != nil { t.Errorf("peer1 not removed on reconnect") + return } time.Sleep(100 * time.Millisecond) peer1.AddPeer() + bestpeer, _ = blockPool.peers.getPeer("peer1") if bestpeer == nil { t.Errorf("peer1 not connected after PeerSuspensionInterval") + return } - // blockPool.Wait(waitTimeout) blockPool.Stop() } diff --git a/blockpool/peers.go b/blockpool/peers.go index c6cade460..eb2ec6a1f 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -10,6 +10,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" ) // the blockpool's model of a peer @@ -106,9 +108,10 @@ func (self *peers) peerError(id string, code int, format string, params ...inter peer, ok := self.peers[id] self.lock.RUnlock() if ok { - peer.addError(code, format, params) + peer.addError(code, format, params...) + } else { + self.addToBlacklist(id) } - self.addToBlacklist(id) } // record time of offence in blacklist to implement suspension for PeerSuspensionInterval @@ -134,7 +137,11 @@ func (self *peers) suspended(id string) (s bool) { func (self *peer) addError(code int, format string, params ...interface{}) { err := self.errors.New(code, format, params...) self.peerError(err) - self.addToBlacklist(self.id) + if err.Fatal() { + self.addToBlacklist(self.id) + } else { + go self.bp.peers.removePeer(self.id, false) + } } // caller must hold peer lock @@ -143,7 +150,8 @@ func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) { defer self.lock.Unlock() if self.currentBlockHash != currentBlockHash { previousBlockHash := self.currentBlockHash - plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", self.id, td, hex(currentBlockHash), hex(previousBlockHash)) + glog.V(logger.Debug).Infof("addPeer: Update peer <%s> with td %v (was %v) and current block %s (was %v)", self.id, td, self.td, hex(currentBlockHash), hex(previousBlockHash)) + self.td = td self.currentBlockHash = currentBlockHash self.currentBlock = nil @@ -154,41 +162,30 @@ func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) { } func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) { - self.lock.Lock() - currentBlockC := self.currentBlockC - switchC := self.switchC hash := block.Hash() // this happens when block came in a newblock message but // also if sent in a blockmsg (for instance, if we requested, only if we // dont apply on blockrequests the restriction of flood control) currentBlockHash = self.currentBlockHash - if currentBlockHash == hash && self.currentBlock == nil { - // signal to head section process - plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash)) - td = self.td - } else { - plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash)) - } - self.lock.Unlock() - // this must be called without peerlock. - // peerlock held can halt the loop and block on select forever - if td != nil { - select { - case currentBlockC <- block: - case <-switchC: // peer is not best peer + if currentBlockHash == hash { + if self.currentBlock == nil { + // signal to head section process + glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash)) + td = self.td + } else { + glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash)) } } return } // this will use the TD given by the first peer to update peer td, this helps second best peer selection -// :FIXME: node func (self *peer) setChainInfoFromNode(n *node) { // in case best peer is lost block := n.block hash := block.Hash() if n.td != nil && n.td.Cmp(self.td) > 0 { - plog.DebugDetailf("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td) + glog.V(logger.Detail).Infof("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td) self.td = n.td self.currentBlockHash = block.Hash() self.parentHash = block.ParentHash() @@ -205,7 +202,7 @@ func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { peerCount := len(self.peers) // on first attempt use the best peer if attempts == 0 && self.best != nil { - plog.DebugDetailf("request %v missing blocks from best peer <%s>", len(hashes), self.best.id) + glog.V(logger.Detail).Infof("request %v missing blocks from best peer <%s>", len(hashes), self.best.id) self.best.requestBlocks(hashes) return } @@ -217,11 +214,11 @@ func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { indexes := rand.Perm(peerCount)[0:repetitions] sort.Ints(indexes) - plog.DebugDetailf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount) + glog.V(logger.Detail).Infof("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount) for _, peer := range self.peers { if i == indexes[0] { - plog.DebugDetailf("request length: %v", len(hashes)) - plog.DebugDetailf("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id) + glog.V(logger.Detail).Infof("request length: %v", len(hashes)) + glog.V(logger.Detail).Infof("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id) peer.requestBlocks(hashes) indexes = indexes[1:] if len(indexes) == 0 { @@ -248,7 +245,6 @@ func (self *peers) addPeer( self.lock.Lock() defer self.lock.Unlock() - var previousBlockHash common.Hash if self.suspended(id) { suspended = true @@ -259,7 +255,6 @@ func (self *peers) addPeer( // when called on an already connected peer, it means a newBlockMsg is received // peer head info is updated p.setChainInfo(td, currentBlockHash) - // FIXME: only count the same block once self.status.lock.Lock() self.status.values.NewBlocks++ self.status.lock.Unlock() @@ -272,25 +267,25 @@ func (self *peers) addPeer( self.status.values.NewBlocks++ self.status.lock.Unlock() - plog.Debugf("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash)) + glog.V(logger.Debug).Infof("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash)) } // check if peer's current head block is known if self.bp.hasBlock(currentBlockHash) { // peer not ahead - plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) + glog.V(logger.Debug).Infof("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) return false, false } if self.best == p { // new block update for active current best peer -> request hashes - plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash)) + glog.V(logger.Debug).Infof("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash)) if (previousBlockHash != common.Hash{}) { - plog.DebugDetailf("addPeer: <%s> head changed: %s -> %s ", id, hex(previousBlockHash), hex(currentBlockHash)) + glog.V(logger.Detail).Infof("addPeer: <%s> head changed: %s -> %s ", id, hex(previousBlockHash), hex(currentBlockHash)) p.headSectionC <- nil if entry := self.bp.get(previousBlockHash); entry != nil { - plog.DebugDetailf("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash)) + glog.V(logger.Detail).Infof("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash)) self.bp.activateChain(entry.section, p, p.switchC, nil) p.sections = append(p.sections, previousBlockHash) } @@ -309,7 +304,8 @@ func (self *peers) addPeer( self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD) + glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD) + // fmt.Printf("best peer %v - \n", bestpeer, id) self.bp.switchPeer(bestpeer, p) self.best = p best = true @@ -320,7 +316,7 @@ func (self *peers) addPeer( } // removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects -func (self *peers) removePeer(id string) { +func (self *peers) removePeer(id string, del bool) { self.lock.Lock() defer self.lock.Unlock() @@ -328,10 +324,13 @@ func (self *peers) removePeer(id string) { if !found { return } + p.lock.Lock() + defer p.lock.Unlock() - delete(self.peers, id) - plog.Debugf("addPeer: remove peer <%v> (td: %v)", id, p.td) - + if del { + delete(self.peers, id) + glog.V(logger.Debug).Infof("addPeer: remove peer <%v> (td: %v)", id, p.td) + } // if current best peer is removed, need to find a better one if self.best == p { var newp *peer @@ -339,20 +338,29 @@ func (self *peers) removePeer(id string) { max := self.bp.getTD() // peer with the highest self-acclaimed TD is chosen for _, pp := range self.peers { + // demoted peer's td should be 0 + if pp.id == id { + pp.td = common.Big0 + pp.currentBlockHash = common.Hash{} + continue + } + pp.lock.RLock() if pp.td.Cmp(max) > 0 { max = pp.td newp = pp } + pp.lock.RUnlock() } if newp != nil { self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td) + glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td) } else { - plog.Warnln("addPeer: no suitable peers found") + glog.V(logger.Warn).Infof("addPeer: no suitable peers found") } self.best = newp + // fmt.Printf("remove peer %v - %v\n", p.id, newp) self.bp.switchPeer(p, newp) } } @@ -363,16 +371,17 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { // first quit AddBlockHashes, requestHeadSection and activateChain // by closing the old peer's switchC channel if oldp != nil { - plog.DebugDetailf("<%s> quit peer processes", oldp.id) + glog.V(logger.Detail).Infof("<%s> quit peer processes", oldp.id) + // fmt.Printf("close %v - %v\n", oldp.id, newp) close(oldp.switchC) } if newp != nil { - // newp.idleC = make(chan bool) - // newp.switchC = make(chan bool) // if new best peer has no head section yet, create it and run it // otherwise head section is an element of peer.sections + newp.idleC = make(chan bool) + newp.switchC = make(chan bool) if newp.headSection == nil { - plog.DebugDetailf("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash)) + glog.V(logger.Detail).Infof("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash)) if newp.idle { self.wg.Add(1) @@ -388,15 +397,12 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { } }() - } else { - newp.idleC = make(chan bool) - newp.switchC = make(chan bool) } var connected = make(map[common.Hash]*section) var sections []common.Hash for _, hash := range newp.sections { - plog.DebugDetailf("activate chain starting from section [%s]", hex(hash)) + glog.V(logger.Detail).Infof("activate chain starting from section [%s]", hex(hash)) // if section not connected (ie, top of a contiguous sequence of sections) if connected[hash] == nil { // if not deleted, then reread from pool (it can be orphaned top half of a split section) @@ -407,7 +413,7 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { } } } - plog.DebugDetailf("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections)) + glog.V(logger.Detail).Infof("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections)) // need to lock now that newp is exposed to section processesr newp.lock.Lock() newp.sections = sections @@ -416,7 +422,7 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { // finally deactivate section process for sections where newp didnt activate // newp activating section process changes the quit channel for this reason if oldp != nil { - plog.DebugDetailf("<%s> quit section processes", oldp.id) + glog.V(logger.Detail).Infof("<%s> quit section processes", oldp.id) close(oldp.idleC) } } @@ -438,7 +444,7 @@ func (self *peers) getPeer(id string) (p *peer, best bool) { func (self *peer) handleSection(sec *section) { self.lock.Lock() defer self.lock.Unlock() - plog.DebugDetailf("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec)) + glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec)) self.headSection = sec self.blockHashesRequestTimer = nil @@ -453,7 +459,7 @@ func (self *peer) handleSection(sec *section) { self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout) self.bestIdleTimer = nil - plog.DebugDetailf("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash)) } else { if !self.idle { self.idle = true @@ -462,12 +468,14 @@ func (self *peer) handleSection(sec *section) { self.headInfoTimer = nil self.bestIdleTimer = time.After(self.bp.Config.IdleBestPeerTimeout) - plog.DebugDetailf("HeadSection: <%s> (head: %s) head section [%s] created. Idle...", self.id, hex(self.currentBlockHash), sectionhex(sec)) + glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section [%s] created. Idle...", self.id, hex(self.currentBlockHash), sectionhex(sec)) } } func (self *peer) getCurrentBlock(currentBlock *types.Block) { // called by update or after AddBlock signals that head block of current peer is received + self.lock.Lock() + defer self.lock.Unlock() if currentBlock == nil { if entry := self.bp.get(self.currentBlockHash); entry != nil { entry.node.lock.Lock() @@ -475,22 +483,20 @@ func (self *peer) getCurrentBlock(currentBlock *types.Block) { entry.node.lock.Unlock() } if currentBlock != nil { - plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash)) } else { - plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash)) self.requestBlocks([]common.Hash{self.currentBlockHash}) self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval) return } } else { - plog.DebugDetailf("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash())) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash())) } - self.lock.Lock() - defer self.lock.Unlock() self.currentBlock = currentBlock self.parentHash = currentBlock.ParentHash() - plog.DebugDetailf("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash)) self.blockHashesRequestTimer = time.After(0) self.blocksRequestTimer = nil } @@ -500,7 +506,7 @@ func (self *peer) getBlockHashes() bool { defer self.lock.Unlock() //if connecting parent is found if self.bp.hasBlock(self.parentHash) { - plog.DebugDetailf("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash)) err := self.bp.insertChain(types.Blocks([]*types.Block{self.currentBlock})) self.bp.status.lock.Lock() @@ -510,16 +516,15 @@ func (self *peer) getBlockHashes() bool { self.addError(ErrInvalidBlock, "%v", err) self.bp.status.badPeers[self.id]++ } else { - /* @zelig: Commented out temp untill the rest of the network has been fixed. // XXX added currentBlock check (?) if self.currentBlock != nil && self.currentBlock.Td != nil && !self.currentBlock.Queued() { - plog.DebugDetailf("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td) + glog.V(logger.Detail).Infof("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td) if self.td.Cmp(self.currentBlock.Td) != 0 { - self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash) + self.addError(ErrIncorrectTD, "on block %x %v =?= %v", hex(self.parentHash), self.td, self.currentBlock.Td) self.bp.status.badPeers[self.id]++ } } - */ + headKey := self.parentHash height := self.bp.status.chain[headKey] + 1 self.bp.status.chain[self.currentBlockHash] = height @@ -532,21 +537,20 @@ func (self *peer) getBlockHashes() bool { } else { if parent := self.bp.get(self.parentHash); parent != nil { if self.bp.get(self.currentBlockHash) == nil { - plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash)) - n := &node{ - hash: self.currentBlockHash, - block: self.currentBlock, - hashBy: self.id, - blockBy: self.id, - td: self.td, + glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash)) + self.bp.nodeCacheLock.Lock() + n, ok := self.bp.nodeCache[self.currentBlockHash] + if !ok { + panic("not found in nodeCache") } + self.bp.nodeCacheLock.Unlock() self.bp.newSection([]*node{n}).activate(self) } else { - plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section)) + glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section)) self.bp.activateChain(parent.section, self, self.switchC, nil) } } else { - plog.DebugDetailf("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection)) + glog.V(logger.Detail).Infof("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection)) self.requestBlockHashes(self.currentBlockHash) self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval) return false @@ -565,15 +569,6 @@ func (self *peer) getBlockHashes() bool { // main loop for head section process func (self *peer) run() { - self.lock.Lock() - self.switchC = make(chan bool) - self.idleC = make(chan bool) - switchC := self.switchC - plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash)) - self.lock.Unlock() - - self.blockHashesRequestTimer = nil - self.blocksRequestTimer = time.After(0) self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout) self.bestIdleTimer = nil @@ -585,7 +580,7 @@ LOOP: select { // to minitor section process behaviour case <-ping.C: - plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle) + glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle) // signal from AddBlockHashes that head section for current best peer is created // if sec == nil, it signals that chain info has updated (new block message) @@ -614,12 +609,12 @@ LOOP: // there is no persistence here, so GC will just take care of cleaning up // signal for peer switch, quit - case <-switchC: + case <-self.switchC: var complete = "incomplete " if self.idle { complete = "complete" } - plog.Debugf("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete) + glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete) break LOOP // global quit for blockpool @@ -633,7 +628,7 @@ LOOP: self.bp.status.lock.Lock() self.bp.status.badPeers[self.id]++ self.bp.status.lock.Unlock() - plog.Debugf("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection)) + glog.V(logger.Detail).Infof("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection)) } } diff --git a/blockpool/peers_test.go b/blockpool/peers_test.go index e5788e379..639abbc26 100644 --- a/blockpool/peers_test.go +++ b/blockpool/peers_test.go @@ -1,23 +1,30 @@ package blockpool import ( + "flag" "math/big" "testing" "time" - "github.com/ethereum/go-ethereum/blockpool/test" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +var ( + _ = flag.Set("alsologtostderr", "true") + // _ = flag.Set("log_dir", ".") + _ = flag.Set("v", "5") ) // the actual tests func TestAddPeer(t *testing.T) { - test.LogInit() + glog.V(logger.Error).Infoln("logging...") hashPool, blockPool, blockPoolTester := newTestBlockPool(t) - peer0 := blockPoolTester.newPeer("peer0", 1, 1) - peer1 := blockPoolTester.newPeer("peer1", 2, 2) - peer2 := blockPoolTester.newPeer("peer2", 3, 3) + peer0 := blockPoolTester.newPeer("peer0", 2, 2) + peer1 := blockPoolTester.newPeer("peer1", 4, 4) + peer2 := blockPoolTester.newPeer("peer2", 6, 6) var bestpeer *peer blockPool.Start() @@ -25,127 +32,149 @@ func TestAddPeer(t *testing.T) { // pool best := peer0.AddPeer() if !best { - t.Errorf("peer0 (TD=1) not accepted as best") + t.Errorf("peer0 (TD=2) not accepted as best") + return } if blockPool.peers.best.id != "peer0" { - t.Errorf("peer0 (TD=1) not set as best") + t.Errorf("peer0 (TD=2) not set as best") + return } + peer0.serveBlocks(1, 2) best = peer2.AddPeer() if !best { - t.Errorf("peer2 (TD=3) not accepted as best") + t.Errorf("peer2 (TD=6) not accepted as best") + return } if blockPool.peers.best.id != "peer2" { - t.Errorf("peer2 (TD=3) not set as best") + t.Errorf("peer2 (TD=6) not set as best") + return } - peer2.waitBlocksRequests(3) + peer2.serveBlocks(5, 6) best = peer1.AddPeer() if best { - t.Errorf("peer1 (TD=2) accepted as best") + t.Errorf("peer1 (TD=4) accepted as best") + return } if blockPool.peers.best.id != "peer2" { - t.Errorf("peer2 (TD=3) not set any more as best") + t.Errorf("peer2 (TD=6) not set any more as best") + return } - if blockPool.peers.best.td.Cmp(big.NewInt(int64(3))) != 0 { - t.Errorf("peer1 TD not set") + if blockPool.peers.best.td.Cmp(big.NewInt(int64(6))) != 0 { + t.Errorf("peer2 TD=6 not set") + return } - peer2.td = 4 - peer2.currentBlock = 4 + peer2.td = 8 + peer2.currentBlock = 8 best = peer2.AddPeer() if !best { - t.Errorf("peer2 (TD=4) not accepted as best") + t.Errorf("peer2 (TD=8) not accepted as best") + return } if blockPool.peers.best.id != "peer2" { - t.Errorf("peer2 (TD=4) not set as best") + t.Errorf("peer2 (TD=8) not set as best") + return } - if blockPool.peers.best.td.Cmp(big.NewInt(int64(4))) != 0 { - t.Errorf("peer2 TD not updated") + if blockPool.peers.best.td.Cmp(big.NewInt(int64(8))) != 0 { + t.Errorf("peer2 TD = 8 not updated") + return } - peer2.waitBlocksRequests(4) - peer1.td = 3 - peer1.currentBlock = 3 + peer1.td = 6 + peer1.currentBlock = 6 best = peer1.AddPeer() if best { - t.Errorf("peer1 (TD=3) should not be set as best") + t.Errorf("peer1 (TD=6) should not be set as best") + return } if blockPool.peers.best.id == "peer1" { - t.Errorf("peer1 (TD=3) should not be set as best") + t.Errorf("peer1 (TD=6) should not be set as best") + return } bestpeer, best = blockPool.peers.getPeer("peer1") - if bestpeer.td.Cmp(big.NewInt(int64(3))) != 0 { - t.Errorf("peer1 TD should be updated") + if bestpeer.td.Cmp(big.NewInt(int64(6))) != 0 { + t.Errorf("peer1 TD=6 should be updated") + return } blockPool.RemovePeer("peer2") bestpeer, best = blockPool.peers.getPeer("peer2") if bestpeer != nil { t.Errorf("peer2 not removed") + return } if blockPool.peers.best.id != "peer1" { - t.Errorf("existing peer1 (TD=3) should be set as best peer") + t.Errorf("existing peer1 (TD=6) should be set as best peer") + return } - peer1.waitBlocksRequests(3) blockPool.RemovePeer("peer1") bestpeer, best = blockPool.peers.getPeer("peer1") if bestpeer != nil { t.Errorf("peer1 not removed") + return } if blockPool.peers.best.id != "peer0" { - t.Errorf("existing peer0 (TD=1) should be set as best peer") + t.Errorf("existing peer0 (TD=2) should be set as best peer") + return } - peer0.waitBlocksRequests(1) blockPool.RemovePeer("peer0") bestpeer, best = blockPool.peers.getPeer("peer0") if bestpeer != nil { - t.Errorf("peer1 not removed") + t.Errorf("peer0 not removed") + return } // adding back earlier peer ok - peer0.currentBlock = 3 + peer0.currentBlock = 5 + peer0.td = 5 best = peer0.AddPeer() if !best { - t.Errorf("peer0 (TD=1) should be set as best") + t.Errorf("peer0 (TD=5) should be set as best") + return } if blockPool.peers.best.id != "peer0" { - t.Errorf("peer0 (TD=1) should be set as best") + t.Errorf("peer0 (TD=5) should be set as best") + return } - peer0.waitBlocksRequests(3) + peer0.serveBlocks(4, 5) - hash := hashPool.IndexesToHashes([]int{0})[0] - newblock := &types.Block{Td: common.Big3, HeaderHash: hash} + hash := hashPool.IndexesToHashes([]int{6})[0] + newblock := &types.Block{Td: big.NewInt(int64(6)), HeaderHash: hash} blockPool.chainEvents.Post(core.ChainHeadEvent{newblock}) time.Sleep(100 * time.Millisecond) if blockPool.peers.best != nil { t.Errorf("no peer should be ahead of self") + return } best = peer1.AddPeer() if blockPool.peers.best != nil { - t.Errorf("still no peer should be ahead of self") + t.Errorf("after peer1 (TD=6) still no peer should be ahead of self") + return } best = peer2.AddPeer() if !best { - t.Errorf("peer2 (TD=4) not accepted as best") + t.Errorf("peer2 (TD=8) not accepted as best") + return } blockPool.RemovePeer("peer2") if blockPool.peers.best != nil { t.Errorf("no peer should be ahead of self") + return } blockPool.Stop() } func TestPeerPromotionByTdOnBlock(t *testing.T) { - test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(4) diff --git a/blockpool/section.go b/blockpool/section.go index 1ab543dc0..cab88e561 100644 --- a/blockpool/section.go +++ b/blockpool/section.go @@ -6,6 +6,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" ) /* @@ -88,7 +90,7 @@ func (self *BlockPool) newSection(nodes []*node) *section { self.set(n.hash, entry) } - plog.DebugDetailf("[%s] setup section process", sectionhex(sec)) + glog.V(logger.Detail).Infof("[%s] setup section process", sectionhex(sec)) go sec.run() return sec @@ -132,13 +134,13 @@ func (self *section) addSectionToBlockChain(p *peer) { } self.bp.lock.Unlock() - plog.Debugf("[%s] insert %v blocks [%v/%v] into blockchain", sectionhex(self), len(blocks), hex(blocks[0].Hash()), hex(blocks[len(blocks)-1].Hash())) + glog.V(logger.Debug).Infof("[%s] insert %v blocks [%v/%v] into blockchain", sectionhex(self), len(blocks), hex(blocks[0].Hash()), hex(blocks[len(blocks)-1].Hash())) err := self.bp.insertChain(blocks) if err != nil { self.invalid = true self.bp.peers.peerError(n.blockBy, ErrInvalidBlock, "%v", err) - plog.Warnf("invalid block %x", n.hash) - plog.Warnf("penalise peers %v (hash), %v (block)", n.hashBy, n.blockBy) + glog.V(logger.Error).Infof("invalid block %x", n.hash) + glog.V(logger.Error).Infof("penalise peers %v (hash), %v (block)", n.hashBy, n.blockBy) // or invalid block and the entire chain needs to be removed self.removeChain() @@ -146,7 +148,6 @@ func (self *section) addSectionToBlockChain(p *peer) { // check tds self.bp.wg.Add(1) go func() { - plog.DebugDetailf("checking td") self.bp.checkTD(nodes...) self.bp.wg.Done() }() @@ -159,15 +160,15 @@ func (self *section) addSectionToBlockChain(p *peer) { if child := self.bp.getChild(self); child != nil { select { case <-child.offC: - plog.DebugDetailf("[%s] add complete child section [%s] to the blockchain", sectionhex(self), sectionhex(child)) + glog.V(logger.Detail).Infof("[%s] add complete child section [%s] to the blockchain", sectionhex(self), sectionhex(child)) case child.poolRootC <- p: - plog.DebugDetailf("[%s] add incomplete child section [%s] to the blockchain", sectionhex(self), sectionhex(child)) + glog.V(logger.Detail).Infof("[%s] add incomplete child section [%s] to the blockchain", sectionhex(self), sectionhex(child)) } child.addSectionToBlockChain(p) } else { - plog.DebugDetailf("[%s] no child section in pool", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] no child section in pool", sectionhex(self)) } - plog.DebugDetailf("[%s] section completely inserted to blockchain - remove", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] section completely inserted to blockchain - remove", sectionhex(self)) // complete sections are removed. if called from within section process, // this must run in its own go routine to avoid deadlock self.remove() @@ -216,7 +217,7 @@ LOOP: if self.peer != nil { name = self.peer.id } - plog.DebugDetailf("[%s] peer <%s> active: %v", sectionhex(self), name, self.active) + glog.V(logger.Detail).Infof("[%s] peer <%s> active: %v", sectionhex(self), name, self.active) // global quit from blockpool case <-self.bp.quit: @@ -239,30 +240,30 @@ LOOP: // peer quit or demoted, put section in idle mode case <-self.idleC: // peer quit or demoted, put section in idle mode - plog.Debugf("[%s] peer <%s> quit or demoted", sectionhex(self), self.peer.id) + glog.V(logger.Debug).Infof("[%s] peer <%s> quit or demoted", sectionhex(self), self.peer.id) self.switchOff() self.idleC = nil // timebomb - if section is not complete in time, nuke the entire chain case <-self.suicideTimer: self.removeChain() - plog.Debugf("[%s] timeout. (%v total attempts): missing %v/%v/%v...suicide", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth) + glog.V(logger.Debug).Infof("[%s] timeout. (%v total attempts): missing %v/%v/%v...suicide", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth) self.suicideTimer = nil break LOOP // closing suicideC triggers section suicide: removes section nodes from pool and terminates section process case <-self.suicideC: - plog.DebugDetailf("[%s] quit", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] quit", sectionhex(self)) break LOOP // alarm for checking blocks in the section case <-self.blocksRequestTimer: - plog.DebugDetailf("[%s] alarm: block request time", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] alarm: block request time", sectionhex(self)) self.processC = self.missingC // alarm for checking parent of the section or sending out hash requests case <-self.blockHashesRequestTimer: - plog.DebugDetailf("[%s] alarm: hash request time", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] alarm: hash request time", sectionhex(self)) self.blockHashesRequest() // activate this section process with a peer @@ -283,15 +284,13 @@ LOOP: case n, ok := <-self.processC: // channel closed, first iteration finished if !ok && !self.initialised { - plog.DebugDetailf("[%s] section initalised: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth) + glog.V(logger.Detail).Infof("[%s] section initalised: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth) self.initialised = true self.processC = nil - // self.processC = make(chan *node, self.missing) self.checkRound() checking = false break } - // plog.DebugDetailf("[%s] section proc step %v: missing %v/%v/%v", sectionhex(self), self.step, self.missing, self.lastMissing, self.depth) if !checking { self.step = 0 self.missing = 0 @@ -322,19 +321,19 @@ LOOP: // if node has got block (received via async AddBlock call from protocol) if self.step == self.lastMissing { // current root of the pool - plog.DebugDetailf("[%s] received block for current pool root %s", sectionhex(self), hex(n.hash)) + glog.V(logger.Detail).Infof("[%s] received block for current pool root %s", sectionhex(self), hex(n.hash)) self.addSectionToBlockChain(self.peer) } } else { if (self.parentHash == common.Hash{}) && n == self.bottom { self.parentHash = block.ParentHash() - plog.DebugDetailf("[%s] got parent head block hash %s...checking", sectionhex(self), hex(self.parentHash)) + glog.V(logger.Detail).Infof("[%s] got parent head block hash %s...checking", sectionhex(self), hex(self.parentHash)) self.blockHashesRequest() } } } if self.initialised && self.step == self.lastMissing { - plog.DebugDetailf("[%s] check if new blocks arrived (attempt %v): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth) + glog.V(logger.Detail).Infof("[%s] check if new blocks arrived (attempt %v): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth) self.checkRound() checking = false } @@ -347,7 +346,7 @@ LOOP: self.bp.wg.Done() } - plog.DebugDetailf("[%s] section process terminated: %v blocks retrieved (%v attempts), hash requests complete on root (%v attempts).", sectionhex(self), self.depth, self.blocksRequests, self.blockHashesRequests) + glog.V(logger.Detail).Infof("[%s] section process terminated: %v blocks retrieved (%v attempts), hash requests complete on root (%v attempts).", sectionhex(self), self.depth, self.blocksRequests, self.blockHashesRequests) } @@ -369,7 +368,7 @@ func (self *section) switchOn(newpeer *peer) { newp = newpeer.id } - plog.DebugDetailf("[%s] active mode <%s> -> <%s>", sectionhex(self), oldp, newp) + glog.V(logger.Detail).Infof("[%s] active mode <%s> -> <%s>", sectionhex(self), oldp, newp) } // activate section with current peer @@ -411,7 +410,7 @@ func (self *section) switchOff() { if oldpeer != nil { oldp = oldpeer.id } - plog.DebugDetailf("[%s] idle mode peer <%s> -> <> (%v total attempts): missing %v/%v/%v", sectionhex(self), oldp, self.blocksRequests, self.missing, self.lastMissing, self.depth) + glog.V(logger.Detail).Infof("[%s] idle mode peer <%s> -> <> (%v total attempts): missing %v/%v/%v", sectionhex(self), oldp, self.blocksRequests, self.missing, self.lastMissing, self.depth) self.active = false self.peer = nil @@ -462,19 +461,15 @@ func (self *section) blockHashesRequest() { if parentSection == nil { // only link to new parent if not switching peers - // this protects against synchronisation issue where during switching - // a demoted peer's fork will be chosen over the best peer's chain - // because relinking the correct chain (activateChain) is overwritten here in - // demoted peer's section process just before the section is put to idle mode if (self.parentHash != common.Hash{}) { if parent := self.bp.get(self.parentHash); parent != nil { parentSection = parent.section - plog.DebugDetailf("[%s] blockHashesRequest: parent section [%s] linked\n", sectionhex(self), sectionhex(parentSection)) + glog.V(logger.Detail).Infof("[%s] blockHashesRequest: parent section [%s] linked\n", sectionhex(self), sectionhex(parentSection)) link(parentSection, self) } else { if self.bp.hasBlock(self.parentHash) { self.poolRoot = true - plog.DebugDetailf("[%s] blockHashesRequest: parentHash known ... inserting section in blockchain", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] blockHashesRequest: parentHash known ... inserting section in blockchain", sectionhex(self)) self.addSectionToBlockChain(self.peer) self.blockHashesRequestTimer = nil self.blockHashesRequestsComplete = true @@ -488,15 +483,15 @@ func (self *section) blockHashesRequest() { if parentSection != nil { // activate parent section with this peer // but only if not during switch mode - plog.DebugDetailf("[%s] parent section [%s] activated\n", sectionhex(self), sectionhex(parentSection)) + glog.V(logger.Detail).Infof("[%s] parent section [%s] activated\n", sectionhex(self), sectionhex(parentSection)) self.bp.activateChain(parentSection, self.peer, self.peer.switchC, nil) // if not root of chain, switch off - plog.DebugDetailf("[%s] parent found, hash requests deactivated (after %v total attempts)\n", sectionhex(self), self.blockHashesRequests) + glog.V(logger.Detail).Infof("[%s] parent found, hash requests deactivated (after %v total attempts)\n", sectionhex(self), self.blockHashesRequests) self.blockHashesRequestTimer = nil self.blockHashesRequestsComplete = true } else { self.blockHashesRequests++ - plog.DebugDetailf("[%s] hash request on root (%v total attempts)\n", sectionhex(self), self.blockHashesRequests) + glog.V(logger.Detail).Infof("[%s] hash request on root (%v total attempts)\n", sectionhex(self), self.blockHashesRequests) self.peer.requestBlockHashes(self.bottom.hash) self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval) } @@ -508,12 +503,12 @@ func (self *section) blockHashesRequest() { func (self *section) checkRound() { if self.missing == 0 { // no missing blocks - plog.DebugDetailf("[%s] section checked: got all blocks. process complete (%v total blocksRequests): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth) + glog.V(logger.Detail).Infof("[%s] section checked: got all blocks. process complete (%v total blocksRequests): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth) self.blocksRequestsComplete = true self.blocksRequestTimer = nil } else { // some missing blocks - plog.DebugDetailf("[%s] section checked: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth) + glog.V(logger.Detail).Infof("[%s] section checked: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth) self.blocksRequests++ pos := self.missing % self.bp.Config.BlockBatchSize if pos == 0 { @@ -529,7 +524,7 @@ func (self *section) checkRound() { self.idle++ // too many idle rounds if self.idle >= self.bp.Config.BlocksRequestMaxIdleRounds { - plog.DebugDetailf("[%s] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", sectionhex(self), self.idle, self.blocksRequests, self.missing, self.lastMissing, self.depth) + glog.V(logger.Detail).Infof("[%s] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", sectionhex(self), self.idle, self.blocksRequests, self.missing, self.lastMissing, self.depth) self.removeChain() } } else { @@ -558,7 +553,7 @@ func link(parent *section, child *section) { if exChild != nil && exChild != child { if child != nil { // if child is nil it is not a real fork - plog.DebugDetailf("[%s] chain fork [%s] -> [%s]", sectionhex(parent), sectionhex(exChild), sectionhex(child)) + glog.V(logger.Detail).Infof("[%s] chain fork [%s] -> [%s]", sectionhex(parent), sectionhex(exChild), sectionhex(child)) } exChild.parent = nil } @@ -568,7 +563,7 @@ func link(parent *section, child *section) { if exParent != nil && exParent != parent { if parent != nil { // if parent is nil it is not a real fork, but suicide delinking section - plog.DebugDetailf("[%s] chain reverse fork [%s] -> [%s]", sectionhex(child), sectionhex(exParent), sectionhex(parent)) + glog.V(logger.Detail).Infof("[%s] chain reverse fork [%s] -> [%s]", sectionhex(child), sectionhex(exParent), sectionhex(parent)) } exParent.child = nil } @@ -583,7 +578,7 @@ func link(parent *section, child *section) { caller must hold chain lock */ func (self *BlockPool) splitSection(parent *section, entry *entry) { - plog.DebugDetailf("[%s] split section at fork", sectionhex(parent)) + glog.V(logger.Detail).Infof("[%s] split section at fork", sectionhex(parent)) parent.deactivate() waiter := make(chan bool) parent.wait(waiter) @@ -606,14 +601,14 @@ func (self *BlockPool) linkSections(nodes []*node, parent, child *section) (sec // and launch section process fetching block and further hashes if len(nodes) > 0 { sec = self.newSection(nodes) - plog.Debugf("[%s]->[%s](%v)->[%s] new chain section", sectionhex(parent), sectionhex(sec), len(nodes), sectionhex(child)) + glog.V(logger.Debug).Infof("[%s]->[%s](%v)->[%s] new chain section", sectionhex(parent), sectionhex(sec), len(nodes), sectionhex(child)) link(parent, sec) link(sec, child) } else { if parent != nil && child != nil { // now this can only happen if we allow response to hash request to include hash // in this case we just link parent and child (without needing root block of child section) - plog.Debugf("[%s]->[%s] connecting known sections", sectionhex(parent), sectionhex(child)) + glog.V(logger.Debug).Infof("[%s]->[%s] connecting known sections", sectionhex(parent), sectionhex(child)) link(parent, child) } } @@ -624,10 +619,10 @@ func (self *section) activate(p *peer) { self.bp.wg.Add(1) select { case <-self.offC: - plog.DebugDetailf("[%s] completed section process. cannot activate for peer <%s>", sectionhex(self), p.id) + glog.V(logger.Detail).Infof("[%s] completed section process. cannot activate for peer <%s>", sectionhex(self), p.id) self.bp.wg.Done() case self.controlC <- p: - plog.DebugDetailf("[%s] activate section process for peer <%s>", sectionhex(self), p.id) + glog.V(logger.Detail).Infof("[%s] activate section process for peer <%s>", sectionhex(self), p.id) } } @@ -641,16 +636,16 @@ func (self *section) remove() { select { case <-self.offC: close(self.suicideC) - plog.DebugDetailf("[%s] remove: suicide", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] remove: suicide", sectionhex(self)) case <-self.suicideC: - plog.DebugDetailf("[%s] remove: suicided already", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] remove: suicided already", sectionhex(self)) default: - plog.DebugDetailf("[%s] remove: suicide", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] remove: suicide", sectionhex(self)) close(self.suicideC) } self.unlink() self.bp.remove(self) - plog.DebugDetailf("[%s] removed section.", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] removed section.", sectionhex(self)) } @@ -661,7 +656,7 @@ func (self *section) removeChain() { child := self.child self.bp.chainLock.RUnlock() - plog.DebugDetailf("[%s] remove chain", sectionhex(self)) + glog.V(logger.Detail).Infof("[%s] remove chain", sectionhex(self)) self.remove() if child != nil { child.removeChain() diff --git a/blockpool/status_test.go b/blockpool/status_test.go index 000453de5..f7e63e421 100644 --- a/blockpool/status_test.go +++ b/blockpool/status_test.go @@ -51,7 +51,6 @@ func checkStatus(t *testing.T, bp *BlockPool, syncing bool, expected []int) (err got := getStatusValues(s) for i, v := range expected { err = test.CheckInt(statusFields[i], got[i], v, t) - // fmt.Printf("%v: %v (%v)\n", statusFields[i], got[i], v) if err != nil { return } @@ -60,9 +59,6 @@ func checkStatus(t *testing.T, bp *BlockPool, syncing bool, expected []int) (err } func TestBlockPoolStatus(t *testing.T) { - t.Skip() // :FIXME: - - test.LogInit() var err error n := 3 for n > 0 { @@ -86,19 +82,17 @@ func testBlockPoolStatus(t *testing.T) (err error) { blockPoolTester.blockChain[0] = nil blockPoolTester.initRefBlockChain(12) blockPoolTester.refBlockChain[3] = []int{4, 7} - delete(blockPoolTester.refBlockChain, 6) + blockPoolTester.refBlockChain[5] = []int{10} + blockPoolTester.refBlockChain[6] = []int{11} + blockPoolTester.refBlockChain[9] = []int{6} + delete(blockPoolTester.refBlockChain, 10) blockPool.Start() - blockPoolTester.tds = make(map[int]int) - blockPoolTester.tds[9] = 1 - blockPoolTester.tds[11] = 3 - blockPoolTester.tds[6] = 2 - - peer1 := blockPoolTester.newPeer("peer1", 1, 9) - peer2 := blockPoolTester.newPeer("peer2", 2, 6) - peer3 := blockPoolTester.newPeer("peer3", 3, 11) - peer4 := blockPoolTester.newPeer("peer4", 1, 9) + peer1 := blockPoolTester.newPeer("peer1", 9, 9) + peer2 := blockPoolTester.newPeer("peer2", 10, 10) + peer3 := blockPoolTester.newPeer("peer3", 11, 11) + peer4 := blockPoolTester.newPeer("peer4", 9, 9) peer2.blocksRequestsMap = peer1.blocksRequestsMap var expected []int @@ -124,119 +118,112 @@ func testBlockPoolStatus(t *testing.T) (err error) { } peer1.serveBlockHashes(9, 8, 7, 3, 2) - expected = []int{6, 5, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0} + expected = []int{5, 5, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer1.serveBlocks(3, 7, 8) - expected = []int{6, 5, 3, 3, 0, 1, 0, 0, 1, 1, 1, 1, 0} + expected = []int{5, 5, 3, 3, 0, 1, 0, 0, 1, 1, 1, 1, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer1.serveBlocks(2, 3) - expected = []int{6, 5, 4, 4, 0, 1, 0, 0, 1, 1, 1, 1, 0} + expected = []int{5, 5, 4, 4, 0, 1, 0, 0, 1, 1, 1, 1, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer4.AddPeer() - expected = []int{6, 5, 4, 4, 0, 2, 0, 0, 2, 2, 1, 1, 0} - err = checkStatus(nil, blockPool, true, expected) - if err != nil { - return - } - - peer4.sendBlockHashes(12, 11) - expected = []int{6, 5, 4, 4, 0, 2, 0, 0, 2, 2, 1, 1, 0} + expected = []int{5, 5, 4, 4, 0, 2, 0, 0, 2, 2, 1, 1, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer2.AddPeer() - expected = []int{6, 5, 4, 4, 0, 3, 0, 0, 3, 3, 1, 2, 0} + expected = []int{5, 5, 4, 4, 0, 3, 0, 0, 3, 3, 1, 2, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } - peer2.serveBlocks(5, 6) - peer2.serveBlockHashes(6, 5, 4, 3, 2) - expected = []int{10, 8, 5, 5, 0, 3, 1, 0, 3, 3, 2, 2, 0} + peer2.serveBlocks(5, 10) + peer2.serveBlockHashes(10, 5, 4, 3, 2) + expected = []int{8, 8, 5, 5, 0, 3, 1, 0, 3, 3, 2, 2, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer2.serveBlocks(2, 3, 4) - expected = []int{10, 8, 6, 6, 0, 3, 1, 0, 3, 3, 2, 2, 0} + expected = []int{8, 8, 6, 6, 0, 3, 1, 0, 3, 3, 2, 2, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } blockPool.RemovePeer("peer2") - expected = []int{10, 8, 6, 6, 0, 3, 1, 0, 3, 2, 2, 2, 0} + expected = []int{8, 8, 6, 6, 0, 3, 1, 0, 3, 2, 2, 2, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer1.serveBlockHashes(2, 1, 0) - expected = []int{11, 9, 6, 6, 0, 3, 1, 0, 3, 2, 2, 2, 0} + expected = []int{9, 9, 6, 6, 0, 3, 1, 0, 3, 2, 2, 2, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer1.serveBlocks(1, 2) - expected = []int{11, 9, 7, 7, 0, 3, 1, 0, 3, 2, 2, 2, 0} + expected = []int{9, 9, 7, 7, 0, 3, 1, 0, 3, 2, 2, 2, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer1.serveBlocks(4, 5) - expected = []int{11, 9, 8, 8, 0, 3, 1, 0, 3, 2, 2, 2, 0} + expected = []int{9, 9, 8, 8, 0, 3, 1, 0, 3, 2, 2, 2, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer3.AddPeer() - expected = []int{11, 9, 8, 8, 0, 4, 1, 0, 4, 3, 2, 3, 0} + expected = []int{9, 9, 8, 8, 0, 4, 1, 0, 4, 3, 2, 3, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } - peer3.serveBlocks(10, 11) - expected = []int{12, 9, 9, 9, 0, 4, 1, 0, 4, 3, 3, 3, 0} + peer3.serveBlocks(6, 11) + expected = []int{10, 9, 9, 9, 0, 4, 1, 0, 4, 3, 3, 3, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } - peer3.serveBlockHashes(11, 10, 9) - expected = []int{14, 11, 9, 9, 0, 4, 1, 0, 4, 3, 3, 3, 0} + peer3.serveBlockHashes(11, 6, 9) + expected = []int{11, 11, 9, 9, 0, 4, 1, 0, 4, 3, 3, 3, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } peer4.sendBlocks(11, 12) - expected = []int{14, 11, 9, 9, 0, 4, 1, 0, 4, 3, 4, 3, 1} + expected = []int{11, 11, 9, 9, 0, 4, 1, 0, 4, 3, 4, 3, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return } - peer3.serveBlocks(9, 10) - expected = []int{14, 11, 10, 10, 0, 4, 1, 0, 4, 3, 4, 3, 1} + peer3.serveBlocks(9, 6) + expected = []int{11, 11, 10, 10, 0, 4, 1, 0, 4, 3, 4, 3, 0} err = checkStatus(nil, blockPool, true, expected) if err != nil { return @@ -245,10 +232,11 @@ func testBlockPoolStatus(t *testing.T) (err error) { peer3.serveBlocks(0, 1) blockPool.Wait(waitTimeout) time.Sleep(200 * time.Millisecond) + + expected = []int{11, 3, 11, 3, 8, 4, 1, 8, 4, 3, 4, 3, 0} + err = checkStatus(nil, blockPool, false, expected) blockPool.Stop() - expected = []int{14, 3, 11, 3, 8, 4, 1, 8, 4, 3, 4, 3, 1} - err = checkStatus(nil, blockPool, false, expected) if err != nil { return } diff --git a/core/chain_manager.go b/core/chain_manager.go index 5ad1dda83..676db405e 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -470,6 +470,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { } if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) { + block.SetQueued(true) self.futureBlocks.Push(block) stats.queued++ continue diff --git a/errs/errors.go b/errs/errors.go index 2e6c32494..face9b947 100644 --- a/errs/errors.go +++ b/errs/errors.go @@ -69,7 +69,7 @@ func (self *Errors) New(code int, format string, params ...interface{}) *Error { func (self Error) Error() (message string) { if len(message) == 0 { - self.message = fmt.Sprintf("[%s] %s", self.Package, self.Name) + self.message = fmt.Sprintf("[%s] ERROR: %s", self.Package, self.Name) if self.format != "" { self.message += ": " + fmt.Sprintf(self.format, self.params...) } @@ -81,15 +81,8 @@ func (self Error) Log(v glog.Verbose) { if v { v.Infoln(self) } - //log.Sendln(self.level, self) } -/* -func (self Error) Log(log *logger.Logger) { - log.Sendln(self.level, self) -} -*/ - /* err.Fatal() is true if err's severity level is 0 or 1 (logger.ErrorLevel or logger.Silence) */ diff --git a/errs/errors_test.go b/errs/errors_test.go index 09f70eef5..319093987 100644 --- a/errs/errors_test.go +++ b/errs/errors_test.go @@ -28,7 +28,7 @@ func testErrors() *Errors { func TestErrorMessage(t *testing.T) { err := testErrors().New(0, "zero detail %v", "available") message := fmt.Sprintf("%v", err) - exp := "[TEST] zero: zero detail available" + exp := "[TEST] ERROR: zero: zero detail available" if message != exp { t.Errorf("error message incorrect. expected %v, got %v", exp, message) } diff --git a/eth/protocol.go b/eth/protocol.go index 878038f74..1a19307db 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -299,7 +299,7 @@ func (self *ethProtocol) handle() error { // to simplify backend interface adding a new block // uses AddPeer followed by AddBlock only if peer is the best peer // (or selected as new best peer) - if best, _ := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); best { + if _, suspended := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); !suspended { self.blockPool.AddBlock(request.Block, self.id) } @@ -384,11 +384,9 @@ func (self *ethProtocol) sendStatus() error { } func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) { - //err.Log(self.peer.Logger) err.Log(glog.V(logger.Info)) - /* - if err.Fatal() { - self.peer.Disconnect(p2p.DiscSubprotocolError) - } - */ + if err.Fatal() { + self.peer.Disconnect(p2p.DiscSubprotocolError) + } + } diff --git a/logger/glog/README b/logger/glog/README index 5f9c11485..c7b1e60cc 100644 --- a/logger/glog/README +++ b/logger/glog/README @@ -19,20 +19,20 @@ The comment from glog.go introduces the ideas: Error, Fatal, plus formatting variants such as Infof. It also provides V-style logging controlled by the -v and -vmodule=file=2 flags. - + Basic examples: - + glog.Info("Prepare to repel boarders") - + glog.Fatalf("Initialization failed: %s", err) - + See the documentation for the V function for an explanation of these examples: - + if glog.V(2) { glog.Info("Starting transaction...") } - + glog.V(2).Infoln("Processed", nItems, "elements")