From 391e89d70a43b4a2153db8acac9a6af7a4f76adf Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 19 Mar 2015 22:53:15 +0000 Subject: [PATCH] use own total difficulty to limit best peer - update blockpool td by subscribing to ChainHeadEvent - if ahead of best peer, demote it - addPeer now take own td as current td - removePeer now take own td as current td - add relevant tests to peers_test - eth: backend now calls blockpool with eth.eventMux and chainManager.Td --- blockpool/blockpool.go | 46 ++++++++++++++++++++++++++++++-- blockpool/blockpool_util_test.go | 5 +++- blockpool/config_test.go | 5 ++-- blockpool/peers.go | 15 +++++------ blockpool/peers_test.go | 26 +++++++++++++++++- eth/backend.go | 3 ++- eth/protocol_test.go | 13 +++------ 7 files changed, 87 insertions(+), 26 deletions(-) diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index ef619b27b..a552e1b72 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -7,8 +7,10 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" + "github.com/ethereum/go-ethereum/event" ethlogger "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/pow" ) @@ -32,8 +34,9 @@ var ( blockHashesTimeout = 60 * time.Second // timeout interval: max time allowed for peer without sending a block blocksTimeout = 60 * time.Second - // - idleBestPeerTimeout = 120 * time.Second + // timeout interval: max time allowed for best peer to remain idle (not send new block after sync complete) + idleBestPeerTimeout = 120 * time.Second + // duration of suspension after peer fatal error during which peer is not allowed to reconnect peerSuspensionInterval = 300 * time.Second ) @@ -131,6 +134,10 @@ type BlockPool struct { hasBlock func(hash common.Hash) bool insertChain func(types.Blocks) error verifyPoW func(pow.Block) bool + chainEvents *event.TypeMux + + tdSub event.Subscription + td *big.Int pool map[string]*entry peers *peers @@ -152,6 +159,8 @@ func New( hasBlock func(hash common.Hash) bool, insertChain func(types.Blocks) error, verifyPoW func(pow.Block) bool, + chainEvents *event.TypeMux, + td *big.Int, ) *BlockPool { return &BlockPool{ @@ -159,6 +168,8 @@ func New( hasBlock: hasBlock, insertChain: insertChain, verifyPoW: verifyPoW, + chainEvents: chainEvents, + td: td, } } @@ -198,12 +209,29 @@ func (self *BlockPool) Start() { status: self.status, bp: self, } + + self.tdSub = self.chainEvents.Subscribe(core.ChainHeadEvent{}) timer := time.NewTicker(3 * time.Second) go func() { for { select { case <-self.quit: return + case event := <-self.tdSub.Chan(): + if ev, ok := event.(core.ChainHeadEvent); ok { + td := ev.Block.Td + plog.DebugDetailf("td: %v", td) + self.setTD(td) + self.peers.lock.Lock() + + if best := self.peers.best; best != nil { + if td.Cmp(best.td) >= 0 { + self.peers.best = nil + self.switchPeer(best, nil) + } + } + self.peers.lock.Unlock() + } case <-timer.C: plog.DebugDetailf("status:\n%v", self.Status()) } @@ -224,6 +252,7 @@ func (self *BlockPool) Stop() { plog.Infoln("Stopping...") + self.tdSub.Unsubscribe() close(self.quit) self.lock.Lock() @@ -736,6 +765,19 @@ func (self *BlockPool) set(hash common.Hash, e *entry) { self.pool[hash.Str()] = e } +// accessor and setter for total difficulty +func (self *BlockPool) getTD() *big.Int { + self.lock.RLock() + defer self.lock.RUnlock() + return self.td +} + +func (self *BlockPool) setTD(td *big.Int) { + self.lock.Lock() + defer self.lock.Unlock() + self.td = td +} + func (self *BlockPool) remove(sec *section) { // delete node entries from pool index under pool lock self.lock.Lock() diff --git a/blockpool/blockpool_util_test.go b/blockpool/blockpool_util_test.go index 5ba92066c..a17bc584e 100644 --- a/blockpool/blockpool_util_test.go +++ b/blockpool/blockpool_util_test.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/pow" ) @@ -38,6 +39,7 @@ type blockPoolTester struct { blockChain blockChain blockPool *BlockPool t *testing.T + chainEvents *event.TypeMux } func newTestBlockPool(t *testing.T) (hashPool *test.TestHashPool, blockPool *BlockPool, b *blockPoolTester) { @@ -48,8 +50,9 @@ func newTestBlockPool(t *testing.T) (hashPool *test.TestHashPool, blockPool *Blo blockChain: make(blockChain), refBlockChain: make(blockChain), blocksRequestsMap: make(map[int]bool), + chainEvents: &event.TypeMux{}, } - b.blockPool = New(b.hasBlock, b.insertChain, b.verifyPoW) + b.blockPool = New(b.hasBlock, b.insertChain, b.verifyPoW, b.chainEvents, common.Big0) blockPool = b.blockPool blockPool.Config.BlockHashesRequestInterval = testBlockHashesRequestInterval blockPool.Config.BlocksRequestInterval = testBlocksRequestInterval diff --git a/blockpool/config_test.go b/blockpool/config_test.go index 8eeaceb51..2cb93769e 100644 --- a/blockpool/config_test.go +++ b/blockpool/config_test.go @@ -5,11 +5,12 @@ import ( "time" "github.com/ethereum/go-ethereum/blockpool/test" + "github.com/ethereum/go-ethereum/event" ) func TestBlockPoolConfig(t *testing.T) { test.LogInit() - blockPool := &BlockPool{Config: &Config{}} + blockPool := &BlockPool{Config: &Config{}, chainEvents: &event.TypeMux{}} blockPool.Start() c := blockPool.Config test.CheckInt("BlockHashesBatchSize", c.BlockHashesBatchSize, blockHashesBatchSize, t) @@ -26,7 +27,7 @@ func TestBlockPoolConfig(t *testing.T) { func TestBlockPoolOverrideConfig(t *testing.T) { test.LogInit() - blockPool := &BlockPool{Config: &Config{}} + blockPool := &BlockPool{Config: &Config{}, chainEvents: &event.TypeMux{}} c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second} blockPool.Config = c diff --git a/blockpool/peers.go b/blockpool/peers.go index 81bab31e7..41782983c 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" ) @@ -256,7 +255,7 @@ func (self *peers) addPeer( } best = true } else { - currentTD := common.Big0 + currentTD := self.bp.getTD() if self.best != nil { currentTD = self.best.td } @@ -264,7 +263,7 @@ func (self *peers) addPeer( self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> promoted best peer", id) + plog.Debugf("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD) self.bp.switchPeer(self.best, p) self.best = p best = true @@ -275,10 +274,8 @@ func (self *peers) addPeer( // removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects func (self *peers) removePeer(id string) { - plog.Debugf("addPeer: remove peer 0 <%v>", id) self.lock.Lock() defer self.lock.Unlock() - plog.Debugf("addPeer: remove peer 1 <%v>", id) p, found := self.peers[id] if !found { @@ -286,13 +283,13 @@ func (self *peers) removePeer(id string) { } delete(self.peers, id) - plog.Debugf("addPeer: remove peer <%v>", id) + plog.Debugf("addPeer: remove peer <%v> (td: %v)", id, p.td) // if current best peer is removed, need to find a better one if self.best == p { var newp *peer - // FIXME: own TD - max := common.Big0 + // only peers that are ahead of us are considered + max := self.bp.getTD() // peer with the highest self-acclaimed TD is chosen for _, pp := range self.peers { if pp.td.Cmp(max) > 0 { @@ -304,7 +301,7 @@ func (self *peers) removePeer(id string) { self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> with td %v promoted best peer", newp.id, newp.td) + plog.Debugf("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td) } else { plog.Warnln("addPeer: no suitable peers found") } diff --git a/blockpool/peers_test.go b/blockpool/peers_test.go index e53d7160b..99dd16ba1 100644 --- a/blockpool/peers_test.go +++ b/blockpool/peers_test.go @@ -3,8 +3,12 @@ package blockpool import ( "math/big" "testing" + "time" "github.com/ethereum/go-ethereum/blockpool/test" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" ) // the actual tests @@ -115,6 +119,26 @@ func TestAddPeer(t *testing.T) { } peer0.waitBlocksRequests(3) - blockPool.Stop() + newblock := &types.Block{Td: common.Big3} + blockPool.chainEvents.Post(core.ChainHeadEvent{newblock}) + time.Sleep(100 * time.Millisecond) + if blockPool.peers.best != nil { + t.Errorf("no peer should be ahead of self") + } + best = peer1.AddPeer() + if blockPool.peers.best != nil { + t.Errorf("still no peer should be ahead of self") + } + best = peer2.AddPeer() + if !best { + t.Errorf("peer2 (TD=4) not accepted as best") + } + + blockPool.RemovePeer("peer2") + if blockPool.peers.best != nil { + t.Errorf("no peer should be ahead of self") + } + + blockPool.Stop() } diff --git a/eth/backend.go b/eth/backend.go index afe314d74..141c6c605 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -195,7 +195,8 @@ func New(config *Config) (*Ethereum, error) { hasBlock := eth.chainManager.HasBlock insertChain := eth.chainManager.InsertChain - eth.blockPool = blockpool.New(hasBlock, insertChain, eth.pow.Verify) + td := eth.chainManager.Td() + eth.blockPool = blockpool.New(hasBlock, insertChain, eth.pow.Verify, eth.EventMux(), td) netprv, err := config.nodeKey() if err != nil { diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 7620b3854..8ca6d1be6 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -41,17 +41,10 @@ type testChainManager struct { type testBlockPool struct { addBlockHashes func(next func() (common.Hash, bool), peerId string) addBlock func(block *types.Block, peerId string) (err error) - addPeer func(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool) + addPeer func(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) removePeer func(peerId string) } -// func (self *testTxPool) GetTransactions() (txs []*types.Transaction) { -// if self.getTransactions != nil { -// txs = self.getTransactions() -// } -// return -// } - func (self *testTxPool) AddTransactions(txs []*types.Transaction) { if self.addTransactions != nil { self.addTransactions(txs) @@ -93,9 +86,9 @@ func (self *testBlockPool) AddBlock(block *types.Block, peerId string) { } } -func (self *testBlockPool) AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestBlockHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool) { +func (self *testBlockPool) AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestBlockHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) { if self.addPeer != nil { - best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, peerError) + best, suspended = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, peerError) } return }