use own total difficulty to limit best peer

- update blockpool td by subscribing to ChainHeadEvent
- if ahead of best peer, demote it
- addPeer now take own td as current td
- removePeer now take own td as current td
- add relevant tests to peers_test
- eth: backend now calls blockpool with eth.eventMux and chainManager.Td
This commit is contained in:
zelig 2015-03-19 22:53:15 +00:00
parent 50661f0e68
commit 391e89d70a
7 changed files with 87 additions and 26 deletions

View File

@ -7,8 +7,10 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs" "github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/event"
ethlogger "github.com/ethereum/go-ethereum/logger" ethlogger "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow"
) )
@ -32,8 +34,9 @@ var (
blockHashesTimeout = 60 * time.Second blockHashesTimeout = 60 * time.Second
// timeout interval: max time allowed for peer without sending a block // timeout interval: max time allowed for peer without sending a block
blocksTimeout = 60 * time.Second blocksTimeout = 60 * time.Second
// // timeout interval: max time allowed for best peer to remain idle (not send new block after sync complete)
idleBestPeerTimeout = 120 * time.Second idleBestPeerTimeout = 120 * time.Second
// duration of suspension after peer fatal error during which peer is not allowed to reconnect
peerSuspensionInterval = 300 * time.Second peerSuspensionInterval = 300 * time.Second
) )
@ -131,6 +134,10 @@ type BlockPool struct {
hasBlock func(hash common.Hash) bool hasBlock func(hash common.Hash) bool
insertChain func(types.Blocks) error insertChain func(types.Blocks) error
verifyPoW func(pow.Block) bool verifyPoW func(pow.Block) bool
chainEvents *event.TypeMux
tdSub event.Subscription
td *big.Int
pool map[string]*entry pool map[string]*entry
peers *peers peers *peers
@ -152,6 +159,8 @@ func New(
hasBlock func(hash common.Hash) bool, hasBlock func(hash common.Hash) bool,
insertChain func(types.Blocks) error, insertChain func(types.Blocks) error,
verifyPoW func(pow.Block) bool, verifyPoW func(pow.Block) bool,
chainEvents *event.TypeMux,
td *big.Int,
) *BlockPool { ) *BlockPool {
return &BlockPool{ return &BlockPool{
@ -159,6 +168,8 @@ func New(
hasBlock: hasBlock, hasBlock: hasBlock,
insertChain: insertChain, insertChain: insertChain,
verifyPoW: verifyPoW, verifyPoW: verifyPoW,
chainEvents: chainEvents,
td: td,
} }
} }
@ -198,12 +209,29 @@ func (self *BlockPool) Start() {
status: self.status, status: self.status,
bp: self, bp: self,
} }
self.tdSub = self.chainEvents.Subscribe(core.ChainHeadEvent{})
timer := time.NewTicker(3 * time.Second) timer := time.NewTicker(3 * time.Second)
go func() { go func() {
for { for {
select { select {
case <-self.quit: case <-self.quit:
return return
case event := <-self.tdSub.Chan():
if ev, ok := event.(core.ChainHeadEvent); ok {
td := ev.Block.Td
plog.DebugDetailf("td: %v", td)
self.setTD(td)
self.peers.lock.Lock()
if best := self.peers.best; best != nil {
if td.Cmp(best.td) >= 0 {
self.peers.best = nil
self.switchPeer(best, nil)
}
}
self.peers.lock.Unlock()
}
case <-timer.C: case <-timer.C:
plog.DebugDetailf("status:\n%v", self.Status()) plog.DebugDetailf("status:\n%v", self.Status())
} }
@ -224,6 +252,7 @@ func (self *BlockPool) Stop() {
plog.Infoln("Stopping...") plog.Infoln("Stopping...")
self.tdSub.Unsubscribe()
close(self.quit) close(self.quit)
self.lock.Lock() self.lock.Lock()
@ -736,6 +765,19 @@ func (self *BlockPool) set(hash common.Hash, e *entry) {
self.pool[hash.Str()] = e self.pool[hash.Str()] = e
} }
// accessor and setter for total difficulty
func (self *BlockPool) getTD() *big.Int {
self.lock.RLock()
defer self.lock.RUnlock()
return self.td
}
func (self *BlockPool) setTD(td *big.Int) {
self.lock.Lock()
defer self.lock.Unlock()
self.td = td
}
func (self *BlockPool) remove(sec *section) { func (self *BlockPool) remove(sec *section) {
// delete node entries from pool index under pool lock // delete node entries from pool index under pool lock
self.lock.Lock() self.lock.Lock()

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs" "github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow"
) )
@ -38,6 +39,7 @@ type blockPoolTester struct {
blockChain blockChain blockChain blockChain
blockPool *BlockPool blockPool *BlockPool
t *testing.T t *testing.T
chainEvents *event.TypeMux
} }
func newTestBlockPool(t *testing.T) (hashPool *test.TestHashPool, blockPool *BlockPool, b *blockPoolTester) { func newTestBlockPool(t *testing.T) (hashPool *test.TestHashPool, blockPool *BlockPool, b *blockPoolTester) {
@ -48,8 +50,9 @@ func newTestBlockPool(t *testing.T) (hashPool *test.TestHashPool, blockPool *Blo
blockChain: make(blockChain), blockChain: make(blockChain),
refBlockChain: make(blockChain), refBlockChain: make(blockChain),
blocksRequestsMap: make(map[int]bool), blocksRequestsMap: make(map[int]bool),
chainEvents: &event.TypeMux{},
} }
b.blockPool = New(b.hasBlock, b.insertChain, b.verifyPoW) b.blockPool = New(b.hasBlock, b.insertChain, b.verifyPoW, b.chainEvents, common.Big0)
blockPool = b.blockPool blockPool = b.blockPool
blockPool.Config.BlockHashesRequestInterval = testBlockHashesRequestInterval blockPool.Config.BlockHashesRequestInterval = testBlockHashesRequestInterval
blockPool.Config.BlocksRequestInterval = testBlocksRequestInterval blockPool.Config.BlocksRequestInterval = testBlocksRequestInterval

View File

@ -5,11 +5,12 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/blockpool/test" "github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/event"
) )
func TestBlockPoolConfig(t *testing.T) { func TestBlockPoolConfig(t *testing.T) {
test.LogInit() test.LogInit()
blockPool := &BlockPool{Config: &Config{}} blockPool := &BlockPool{Config: &Config{}, chainEvents: &event.TypeMux{}}
blockPool.Start() blockPool.Start()
c := blockPool.Config c := blockPool.Config
test.CheckInt("BlockHashesBatchSize", c.BlockHashesBatchSize, blockHashesBatchSize, t) test.CheckInt("BlockHashesBatchSize", c.BlockHashesBatchSize, blockHashesBatchSize, t)
@ -26,7 +27,7 @@ func TestBlockPoolConfig(t *testing.T) {
func TestBlockPoolOverrideConfig(t *testing.T) { func TestBlockPoolOverrideConfig(t *testing.T) {
test.LogInit() test.LogInit()
blockPool := &BlockPool{Config: &Config{}} blockPool := &BlockPool{Config: &Config{}, chainEvents: &event.TypeMux{}}
c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second} c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second}
blockPool.Config = c blockPool.Config = c

View File

@ -7,7 +7,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs" "github.com/ethereum/go-ethereum/errs"
) )
@ -256,7 +255,7 @@ func (self *peers) addPeer(
} }
best = true best = true
} else { } else {
currentTD := common.Big0 currentTD := self.bp.getTD()
if self.best != nil { if self.best != nil {
currentTD = self.best.td currentTD = self.best.td
} }
@ -264,7 +263,7 @@ func (self *peers) addPeer(
self.status.lock.Lock() self.status.lock.Lock()
self.status.bestPeers[p.id]++ self.status.bestPeers[p.id]++
self.status.lock.Unlock() self.status.lock.Unlock()
plog.Debugf("addPeer: peer <%v> promoted best peer", id) plog.Debugf("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD)
self.bp.switchPeer(self.best, p) self.bp.switchPeer(self.best, p)
self.best = p self.best = p
best = true best = true
@ -275,10 +274,8 @@ func (self *peers) addPeer(
// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects // removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects
func (self *peers) removePeer(id string) { func (self *peers) removePeer(id string) {
plog.Debugf("addPeer: remove peer 0 <%v>", id)
self.lock.Lock() self.lock.Lock()
defer self.lock.Unlock() defer self.lock.Unlock()
plog.Debugf("addPeer: remove peer 1 <%v>", id)
p, found := self.peers[id] p, found := self.peers[id]
if !found { if !found {
@ -286,13 +283,13 @@ func (self *peers) removePeer(id string) {
} }
delete(self.peers, id) delete(self.peers, id)
plog.Debugf("addPeer: remove peer <%v>", id) plog.Debugf("addPeer: remove peer <%v> (td: %v)", id, p.td)
// if current best peer is removed, need to find a better one // if current best peer is removed, need to find a better one
if self.best == p { if self.best == p {
var newp *peer var newp *peer
// FIXME: own TD // only peers that are ahead of us are considered
max := common.Big0 max := self.bp.getTD()
// peer with the highest self-acclaimed TD is chosen // peer with the highest self-acclaimed TD is chosen
for _, pp := range self.peers { for _, pp := range self.peers {
if pp.td.Cmp(max) > 0 { if pp.td.Cmp(max) > 0 {
@ -304,7 +301,7 @@ func (self *peers) removePeer(id string) {
self.status.lock.Lock() self.status.lock.Lock()
self.status.bestPeers[p.id]++ self.status.bestPeers[p.id]++
self.status.lock.Unlock() self.status.lock.Unlock()
plog.Debugf("addPeer: peer <%v> with td %v promoted best peer", newp.id, newp.td) plog.Debugf("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td)
} else { } else {
plog.Warnln("addPeer: no suitable peers found") plog.Warnln("addPeer: no suitable peers found")
} }

View File

@ -3,8 +3,12 @@ package blockpool
import ( import (
"math/big" "math/big"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/blockpool/test" "github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
) )
// the actual tests // the actual tests
@ -115,6 +119,26 @@ func TestAddPeer(t *testing.T) {
} }
peer0.waitBlocksRequests(3) peer0.waitBlocksRequests(3)
blockPool.Stop() newblock := &types.Block{Td: common.Big3}
blockPool.chainEvents.Post(core.ChainHeadEvent{newblock})
time.Sleep(100 * time.Millisecond)
if blockPool.peers.best != nil {
t.Errorf("no peer should be ahead of self")
}
best = peer1.AddPeer()
if blockPool.peers.best != nil {
t.Errorf("still no peer should be ahead of self")
}
best = peer2.AddPeer()
if !best {
t.Errorf("peer2 (TD=4) not accepted as best")
}
blockPool.RemovePeer("peer2")
if blockPool.peers.best != nil {
t.Errorf("no peer should be ahead of self")
}
blockPool.Stop()
} }

View File

@ -195,7 +195,8 @@ func New(config *Config) (*Ethereum, error) {
hasBlock := eth.chainManager.HasBlock hasBlock := eth.chainManager.HasBlock
insertChain := eth.chainManager.InsertChain insertChain := eth.chainManager.InsertChain
eth.blockPool = blockpool.New(hasBlock, insertChain, eth.pow.Verify) td := eth.chainManager.Td()
eth.blockPool = blockpool.New(hasBlock, insertChain, eth.pow.Verify, eth.EventMux(), td)
netprv, err := config.nodeKey() netprv, err := config.nodeKey()
if err != nil { if err != nil {

View File

@ -41,17 +41,10 @@ type testChainManager struct {
type testBlockPool struct { type testBlockPool struct {
addBlockHashes func(next func() (common.Hash, bool), peerId string) addBlockHashes func(next func() (common.Hash, bool), peerId string)
addBlock func(block *types.Block, peerId string) (err error) addBlock func(block *types.Block, peerId string) (err error)
addPeer func(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool) addPeer func(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool)
removePeer func(peerId string) removePeer func(peerId string)
} }
// func (self *testTxPool) GetTransactions() (txs []*types.Transaction) {
// if self.getTransactions != nil {
// txs = self.getTransactions()
// }
// return
// }
func (self *testTxPool) AddTransactions(txs []*types.Transaction) { func (self *testTxPool) AddTransactions(txs []*types.Transaction) {
if self.addTransactions != nil { if self.addTransactions != nil {
self.addTransactions(txs) self.addTransactions(txs)
@ -93,9 +86,9 @@ func (self *testBlockPool) AddBlock(block *types.Block, peerId string) {
} }
} }
func (self *testBlockPool) AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestBlockHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool) { func (self *testBlockPool) AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestBlockHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) {
if self.addPeer != nil { if self.addPeer != nil {
best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, peerError) best, suspended = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, peerError)
} }
return return
} }