improve documentation and move one test

zelig 2015-03-19 23:14:08 +00:00
parent 8767179d74
commit a578db5dae
9 changed files with 202 additions and 156 deletions

@ -38,10 +38,11 @@ var (
idleBestPeerTimeout = 120 * time.Second idleBestPeerTimeout = 120 * time.Second
// duration of suspension after peer fatal error during which peer is not allowed to reconnect // duration of suspension after peer fatal error during which peer is not allowed to reconnect
peerSuspensionInterval = 300 * time.Second peerSuspensionInterval = 300 * time.Second
// status is logged every statusUpdateInterval
statusUpdateInterval = 3 * time.Second
) )
// config embedded in components, by default fall back to constants // blockpool config, values default to constants
// by default all resolved to local
type Config struct { type Config struct {
BlockHashesBatchSize int BlockHashesBatchSize int
BlockBatchSize int BlockBatchSize int
@ -53,28 +54,40 @@ type Config struct {
BlocksTimeout time.Duration BlocksTimeout time.Duration
IdleBestPeerTimeout time.Duration IdleBestPeerTimeout time.Duration
PeerSuspensionInterval time.Duration PeerSuspensionInterval time.Duration
StatusUpdateInterval time.Duration
} }
// blockpool errors // blockpool errors
const ( const (
ErrInvalidBlock = iota ErrInvalidBlock = iota
ErrInvalidPoW ErrInvalidPoW
ErrUnrequestedBlock
ErrInsufficientChainInfo ErrInsufficientChainInfo
ErrIdleTooLong ErrIdleTooLong
ErrIncorrectTD ErrIncorrectTD
ErrUnrequestedBlock
) )
// error descriptions
var errorToString = map[int]string{ var errorToString = map[int]string{
ErrInvalidBlock: "Invalid block", ErrInvalidBlock: "Invalid block", // fatal
ErrInvalidPoW: "Invalid PoW", ErrInvalidPoW: "Invalid PoW", // fatal
ErrInsufficientChainInfo: "Insufficient chain info", // fatal
ErrIdleTooLong: "Idle too long", // fatal
ErrIncorrectTD: "Incorrect Total Difficulty", // fatal
ErrUnrequestedBlock: "Unrequested block", ErrUnrequestedBlock: "Unrequested block",
ErrInsufficientChainInfo: "Insufficient chain info",
ErrIdleTooLong: "Idle too long",
ErrIncorrectTD: "Incorrect Total Difficulty",
} }
// init initialises all your laundry // error severity
func severity(code int) ethlogger.LogLevel {
switch code {
case ErrUnrequestedBlock:
return ethlogger.WarnLevel
default:
return ethlogger.ErrorLevel
}
}
// init initialises the Config, zero values fall back to constants
func (self *Config) init() { func (self *Config) init() {
if self.BlockHashesBatchSize == 0 { if self.BlockHashesBatchSize == 0 {
self.BlockHashesBatchSize = blockHashesBatchSize self.BlockHashesBatchSize = blockHashesBatchSize
@ -106,6 +119,9 @@ func (self *Config) init() {
if self.PeerSuspensionInterval == 0 { if self.PeerSuspensionInterval == 0 {
self.PeerSuspensionInterval = peerSuspensionInterval self.PeerSuspensionInterval = peerSuspensionInterval
} }
if self.StatusUpdateInterval == 0 {
self.StatusUpdateInterval = statusUpdateInterval
}
} }
// node is the basic unit of the internal model of block chain/tree in the blockpool // node is the basic unit of the internal model of block chain/tree in the blockpool
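For reference, the zero-value fallback pattern that Config.init implements can be reduced to a few lines. The sketch below is standalone and illustrative only: the field names come from the diff, the default values are made up.

package main

import (
	"fmt"
	"time"
)

// package-level defaults, standing in for the blockpool constants
const (
	defaultBlocksTimeout        = 60 * time.Second // illustrative value
	defaultStatusUpdateInterval = 3 * time.Second
)

// Config fields left at their zero value fall back to the defaults in init.
type Config struct {
	BlocksTimeout        time.Duration
	StatusUpdateInterval time.Duration
}

func (self *Config) init() {
	if self.BlocksTimeout == 0 {
		self.BlocksTimeout = defaultBlocksTimeout
	}
	if self.StatusUpdateInterval == 0 {
		self.StatusUpdateInterval = defaultStatusUpdateInterval
	}
}

func main() {
	c := &Config{StatusUpdateInterval: 4 * time.Second} // override one field only
	c.init()
	fmt.Println(c.BlocksTimeout, c.StatusUpdateInterval) // 1m0s 4s
}

This is why BlockPool.Start can safely call Config.init before using any interval: a caller only sets the fields it cares about.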
@ -132,31 +148,35 @@ type entry struct {
type BlockPool struct { type BlockPool struct {
Config *Config Config *Config
// the minimal interface with blockchain // the minimal interface with blockchain manager
hasBlock func(hash common.Hash) bool hasBlock func(hash common.Hash) bool // query if block is known
insertChain func(types.Blocks) error insertChain func(types.Blocks) error // add section to blockchain
verifyPoW func(pow.Block) bool verifyPoW func(pow.Block) bool // soft PoW verification
chainEvents *event.TypeMux chainEvents *event.TypeMux // ethereum eventer for chainEvents
tdSub event.Subscription tdSub event.Subscription // subscription to core.ChainHeadEvent
td *big.Int td *big.Int // our own total difficulty
pool map[string]*entry pool map[string]*entry // the actual blockpool
peers *peers peers *peers // peers manager in peers.go
status *status // info about blockpool (UI interface) in status.go
lock sync.RWMutex lock sync.RWMutex
chainLock sync.RWMutex chainLock sync.RWMutex
// alloc-easy pool of hash slices // alloc-easy pool of hash slices
hashSlicePool chan []common.Hash hashSlicePool chan []common.Hash
status *status // waitgroup is used in tests to wait for result-critical routines
// as well as in determining idle / syncing status
quit chan bool wg sync.WaitGroup //
wg sync.WaitGroup quit chan bool // chan used for quitting parallel routines
running bool running bool //
} }
// public constructor // public constructor
// after the blockpool is returned, config can still be set
// BlockPool.Start will call Config.init to set missing values
func New( func New(
hasBlock func(hash common.Hash) bool, hasBlock func(hash common.Hash) bool,
insertChain func(types.Blocks) error, insertChain func(types.Blocks) error,
@ -175,15 +195,6 @@ func New(
} }
} }
func severity(code int) ethlogger.LogLevel {
switch code {
case ErrUnrequestedBlock:
return ethlogger.WarnLevel
default:
return ethlogger.ErrorLevel
}
}
// allows restart // allows restart
func (self *BlockPool) Start() { func (self *BlockPool) Start() {
self.lock.Lock() self.lock.Lock()
@ -193,7 +204,9 @@ func (self *BlockPool) Start() {
return return
} }
// set missing values
self.Config.init() self.Config.init()
self.hashSlicePool = make(chan []common.Hash, 150) self.hashSlicePool = make(chan []common.Hash, 150)
self.status = newStatus() self.status = newStatus()
self.quit = make(chan bool) self.quit = make(chan bool)
@ -212,8 +225,11 @@ func (self *BlockPool) Start() {
bp: self, bp: self,
} }
// subscribe and listen to core.ChainHeadEvent{} for up-to-date TD
self.tdSub = self.chainEvents.Subscribe(core.ChainHeadEvent{}) self.tdSub = self.chainEvents.Subscribe(core.ChainHeadEvent{})
timer := time.NewTicker(3 * time.Second)
// status update interval
timer := time.NewTicker(self.Config.StatusUpdateInterval)
go func() { go func() {
for { for {
select { select {
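The status update loop added above follows the standard ticker-plus-quit pattern. A condensed, self-contained sketch of that pattern (the real loop also drains the tdSub / ChainHeadEvent subscription, which is omitted here):

package main

import (
	"fmt"
	"time"
)

// runStatusLoop prints a status line every interval until quit is closed.
func runStatusLoop(interval time.Duration, status func() string, quit chan bool) {
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fmt.Println(status())
			case <-quit:
				return
			}
		}
	}()
}

func main() {
	quit := make(chan bool)
	runStatusLoop(3*time.Second, func() string { return "blockpool: idle" }, quit)
	time.Sleep(7 * time.Second) // expect two status lines
	close(quit)
}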
@ -292,9 +308,14 @@ func (self *BlockPool) Wait(t time.Duration) {
/* /*
AddPeer is called by the eth protocol instance running on the peer after AddPeer is called by the eth protocol instance running on the peer after
the status message has been received with total difficulty and current block hash the status message has been received with total difficulty and current block hash
Called a second time with the same peer id, it is used to update chain info for a peer. This is used when a new (mined) block message is received.
Called a second time with the same peer id, it is used to update chain info for a peer.
This is used when a new (mined) block message is received.
RemovePeer needs to be called when the peer disconnects. RemovePeer needs to be called when the peer disconnects.
Peer info is currently not persisted across disconnects (or sessions)
Peer info is currently not persisted across disconnects (or sessions) except for suspension
*/ */
func (self *BlockPool) AddPeer( func (self *BlockPool) AddPeer(
@ -319,12 +340,12 @@ AddBlockHashes
Entry point for eth protocol to add block hashes received via BlockHashesMsg Entry point for eth protocol to add block hashes received via BlockHashesMsg
only hashes from the best peer are handled Only hashes from the best peer are handled
initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer) Initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer)
launches all block request processes on each chain section Launches all block request processes on each chain section
the first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns. The first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns.
*/ */
func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId string) { func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId string) {
@ -335,7 +356,6 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st
// bestpeer is still the best peer // bestpeer is still the best peer
self.wg.Add(1) self.wg.Add(1)
defer func() { self.wg.Done() }() defer func() { self.wg.Done() }()
self.status.lock.Lock() self.status.lock.Lock()
@ -360,11 +380,11 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st
return return
} }
/* /*
when peer is promoted in switchPeer, a new header section process is launched When peer is promoted in switchPeer, a new header section process is launched.
as the head section skeleton is actually created here, it is signaled to the process Once the head section skeleton is actually created here, it is signaled to the process
so that it can quit so that it can quit.
in the special case that the node for parent of the head block is found in the blockpool In the special case that the node for parent of the head block is found in the blockpool
(with or without fetched block) (with or without fetched block), a singleton section containing only the head block node is created.
*/ */
headSection = true headSection = true
if entry := self.get(bestpeer.currentBlockHash); entry == nil { if entry := self.get(bestpeer.currentBlockHash); entry == nil {
@ -383,7 +403,7 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st
} else { } else {
// otherwise set child section iff found node is the root of a section // otherwise set child section iff found node is the root of a section
// this is a possible scenario when a singleton head section was created // this is a possible scenario when a singleton head section was created
// on an earlier occasion this peer or another with the same block was best peer // on an earlier occasion when this peer or another with the same block was best peer
if entry.node == entry.section.bottom { if entry.node == entry.section.bottom {
child = entry.section child = entry.section
plog.DebugDetailf("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash)) plog.DebugDetailf("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash))
@ -414,7 +434,7 @@ LOOP:
default: default:
} }
// if we reach the blockchain we stop reading more // if we reach the blockchain we stop reading further blockhashes
if self.hasBlock(hash) { if self.hasBlock(hash) {
// check if known block connecting the downloaded chain to our blockchain // check if known block connecting the downloaded chain to our blockchain
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash))
@ -451,10 +471,11 @@ LOOP:
// reached a known chain in the pool // reached a known chain in the pool
if entry.node == entry.section.bottom && n == 1 { if entry.node == entry.section.bottom && n == 1 {
/* /*
the first block hash received is an orphan in the pool The first block hash received is an orphan node in the pool
this also supports clients that (despite the spec) include <from> hash in their
This also supports clients that (despite the spec) include <from> hash in their
response to hashes request. Note that by providing <from> we can link sections response to hashes request. Note that by providing <from> we can link sections
without having to wait for the root block of the child section to arrive, so it allows for superior performance without having to wait for the root block of the child section to arrive, so it allows for superior performance.
*/ */
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section))
// record the entry's chain section as child section // record the entry's chain section as child section
@ -486,9 +507,8 @@ LOOP:
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes)) plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes))
/* /*
handle forks where connecting node is mid-section Handle forks where connecting node is mid-section by splitting section at fork.
by splitting section at fork No splitting needed if connecting node is head of a section.
no splitting needed if connecting node is head of a section
*/ */
if parent != nil && entry != nil && entry.node != parent.top && len(nodes) > 0 { if parent != nil && entry != nil && entry.node != parent.top && len(nodes) > 0 {
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash)) plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash))
@ -500,10 +520,7 @@ LOOP:
self.status.lock.Unlock() self.status.lock.Unlock()
} }
/* // If new section is created, link it to parent/child sections.
if new section is created, link it to parent/child sections
and launch section process fetching blocks and further hashes
*/
sec = self.linkSections(nodes, parent, child) sec = self.linkSections(nodes, parent, child)
if sec != nil { if sec != nil {
@ -516,11 +533,12 @@ LOOP:
self.chainLock.Unlock() self.chainLock.Unlock()
/* /*
if a blockpool node is reached (parent section is not nil), If a blockpool node is reached (parent section is not nil),
activate section (unless our peer is demoted by now). activate section (unless our peer is demoted by now).
this can be the bottom half of a newly split section in case of a fork. This can be the bottom half of a newly split section in case of a fork.
bestPeer is nil if we got here after our peer got demoted while processing. bestPeer is nil if we got here after our peer got demoted while processing.
in this case no activation should happen In this case no activation should happen
*/ */
if parent != nil && !peerswitch { if parent != nil && !peerswitch {
self.activateChain(parent, bestpeer, nil) self.activateChain(parent, bestpeer, nil)
@ -528,9 +546,8 @@ LOOP:
} }
/* /*
if a new section was created, If a new section was created, register section iff head section or no child known
register section iff head section or no child known Activate it with this peer.
activate it with this peer
*/ */
if sec != nil { if sec != nil {
// switch on section process (it is paused by switchC) // switch on section process (it is paused by switchC)
@ -541,9 +558,9 @@ LOOP:
bestpeer.lock.Unlock() bestpeer.lock.Unlock()
} }
/* /*
request next block hashes for parent section here. Request another batch of older block hashes for parent section here.
but only once, repeating only when bottom block arrives, But only once, repeating only when the section's root block arrives.
otherwise no way to check if it arrived Otherwise no way to check if it arrived.
*/ */
bestpeer.requestBlockHashes(sec.bottom.hash) bestpeer.requestBlockHashes(sec.bottom.hash)
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
@ -554,7 +571,7 @@ LOOP:
} }
} }
// if we are processing peer's head section, signal it to headSection process that it is created // If we are processing peer's head section, signal it to headSection process that it is created.
if headSection { if headSection {
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash)) plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash))
@ -578,11 +595,13 @@ LOOP:
/* /*
AddBlock is the entry point for the eth protocol to call when blockMsg is received. AddBlock is the entry point for the eth protocol to call when blockMsg is received.
It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error.
At the same time it is opportunistic in that if a requested block may be provided by any peer. At the same time it is opportunistic in that if a requested block may be provided by any peer.
The received block is checked for PoW. Only the first PoW-valid block for a hash is considered legit. The received block is checked for PoW. Only the first PoW-valid block for a hash is considered legit.
If the block received is the head block of the current best peer, signal it to the head section process
*/ */
func (self *BlockPool) AddBlock(block *types.Block, peerId string) { func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
hash := block.Hash() hash := block.Hash()
@ -611,6 +630,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
self.status.lock.Unlock() self.status.lock.Unlock()
} else { } else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash)) plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash))
// signal to head section process
sender.currentBlockC <- block sender.currentBlockC <- block
} }
} else { } else {
@ -629,7 +649,6 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
sender.lock.Unlock() sender.lock.Unlock()
if entry == nil { if entry == nil {
// penalise peer for sending what we have not asked
plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrUnrequestedBlock, "%x", hash) sender.addError(ErrUnrequestedBlock, "%x", hash)
@ -647,7 +666,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
node.lock.Lock() node.lock.Lock()
defer node.lock.Unlock() defer node.lock.Unlock()
// check if block already present // check if block already received
if node.block != nil { if node.block != nil {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy) plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy)
return return
@ -683,9 +702,9 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
} }
/* /*
iterates down a chain section by section activateChain iterates down a chain section by section.
activating section process on incomplete sections with peer It activates the section process on incomplete sections with peer.
relinking orphaned sections with their parent if root block (and its parent hash) is known) It relinks orphaned sections with their parent if root block (and its parent hash) is known.
*/ */
func (self *BlockPool) activateChain(sec *section, p *peer, connected map[string]*section) { func (self *BlockPool) activateChain(sec *section, p *peer, connected map[string]*section) {
@ -704,8 +723,8 @@ LOOP:
connected[sec.top.hash.Str()] = sec connected[sec.top.hash.Str()] = sec
} }
/* /*
we need to relink both complete and incomplete sections Need to relink both complete and incomplete sections
the latter could have been blockHashesRequestsComplete before being delinked from its parent An incomplete section could have been blockHashesRequestsComplete before being delinked from its parent.
*/ */
if parent == nil { if parent == nil {
if sec.bottom.block != nil { if sec.bottom.block != nil {
@ -720,7 +739,7 @@ LOOP:
} }
sec = parent sec = parent
// stop if peer got demoted // stop if peer got demoted or global quit
select { select {
case <-switchC: case <-switchC:
break LOOP break LOOP
@ -731,6 +750,7 @@ LOOP:
} }
} }
// check if block's actual TD (calculated after successful insertChain) is identical to TD advertised for peer's head block.
func (self *BlockPool) checkTD(nodes ...*node) { func (self *BlockPool) checkTD(nodes ...*node) {
for _, n := range nodes { for _, n := range nodes {
if n.td != nil { if n.td != nil {
@ -742,7 +762,7 @@ func (self *BlockPool) checkTD(nodes ...*node) {
} }
} }
// must run in separate go routine, otherwise // requestBlocks must run in a separate goroutine, otherwise
// switchpeer -> activateChain -> activate deadlocks on section process select and peers.lock // switchpeer -> activateChain -> activate deadlocks on section process select and peers.lock
func (self *BlockPool) requestBlocks(attempts int, hashes []common.Hash) { func (self *BlockPool) requestBlocks(attempts int, hashes []common.Hash) {
self.wg.Add(1) self.wg.Add(1)
@ -806,6 +826,7 @@ func (self *BlockPool) remove(sec *section) {
} }
} }
// get/put for optimised allocation similar to sync.Pool
func (self *BlockPool) getHashSlice() (s []common.Hash) { func (self *BlockPool) getHashSlice() (s []common.Hash) {
select { select {
case s = <-self.hashSlicePool: case s = <-self.hashSlicePool:
@ -815,7 +836,6 @@ func (self *BlockPool) getHashSlice() (s []common.Hash) {
return return
} }
// Return returns a Client to the pool.
func (self *BlockPool) putHashSlice(s []common.Hash) { func (self *BlockPool) putHashSlice(s []common.Hash) {
if len(s) == self.Config.BlockBatchSize { if len(s) == self.Config.BlockBatchSize {
select { select {

@ -9,6 +9,9 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
// using the mock framework in blockpool_util_test
// we test various scenarios here
func TestPeerWithKnownBlock(t *testing.T) { func TestPeerWithKnownBlock(t *testing.T) {
test.LogInit() test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t) _, blockPool, blockPoolTester := newTestBlockPool(t)
@ -44,50 +47,6 @@ func TestPeerWithKnownParentBlock(t *testing.T) {
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain) blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
} }
func TestPeerPromotionByOptionalTdOnBlock(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(4)
peer0 := blockPoolTester.newPeer("peer0", 2, 2)
peer1 := blockPoolTester.newPeer("peer1", 1, 1)
peer2 := blockPoolTester.newPeer("peer2", 4, 4)
blockPool.Start()
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[3] = 3
// pool
peer0.AddPeer()
peer0.serveBlocks(1, 2)
best := peer1.AddPeer()
// this tests that peer1 is not promoted over peer0 yet
if best {
t.Errorf("peer1 (TD=1) should not be set as best")
}
best = peer2.AddPeer()
peer2.serveBlocks(3, 4)
peer2.serveBlockHashes(4, 3, 2, 1)
hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3})
peer1.waitBlocksRequests(3)
blockPool.AddBlock(&types.Block{
HeaderHash: common.Hash(hashes[1]),
ParentHeaderHash: common.Hash(hashes[0]),
Td: common.Big3,
}, "peer1")
blockPool.RemovePeer("peer2")
if blockPool.peers.best.id != "peer1" {
t.Errorf("peer1 (TD=3) should be set as best")
}
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[4] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestSimpleChain(t *testing.T) { func TestSimpleChain(t *testing.T) {
test.LogInit() test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t) _, blockPool, blockPoolTester := newTestBlockPool(t)
@ -166,6 +125,7 @@ func TestNewBlocksOnPartialChain(t *testing.T) {
go peer1.serveBlocks(4, 5) // partially complete section go peer1.serveBlocks(4, 5) // partially complete section
go peer1.serveBlockHashes(5, 4, 3) go peer1.serveBlockHashes(5, 4, 3)
peer1.serveBlocks(3, 4) // partially complete section peer1.serveBlocks(3, 4) // partially complete section
// peer1 found new blocks // peer1 found new blocks
peer1.td = 7 peer1.td = 7
peer1.currentBlock = 7 peer1.currentBlock = 7
@ -176,7 +136,6 @@ func TestNewBlocksOnPartialChain(t *testing.T) {
go peer1.serveBlocks(5, 6) go peer1.serveBlocks(5, 6)
go peer1.serveBlockHashes(3, 2, 1) // tests that hash request from known chain root is remembered go peer1.serveBlockHashes(3, 2, 1) // tests that hash request from known chain root is remembered
peer1.serveBlocks(0, 1, 2) peer1.serveBlocks(0, 1, 2)
// blockPool.RemovePeer("peer1")
blockPool.Wait(waitTimeout) blockPool.Wait(waitTimeout)
blockPool.Stop() blockPool.Stop()
@ -468,8 +427,8 @@ func TestForkCompleteSectionSwitchBackByPeerSwitchBack(t *testing.T) {
peer1.AddPeer() peer1.AddPeer()
go peer1.serveBlocks(8, 9) go peer1.serveBlocks(8, 9)
go peer1.serveBlockHashes(9, 8, 7) go peer1.serveBlockHashes(9, 8, 7)
peer1.serveBlocks(3, 7, 8) // make sure this section is complete peer1.serveBlocks(3, 7, 8) // make sure this section is complete
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second) //
go peer1.serveBlockHashes(7, 3, 2) // block 3/7 is section boundary go peer1.serveBlockHashes(7, 3, 2) // block 3/7 is section boundary
peer1.serveBlocks(2, 3) // partially complete sections block 2 missing peer1.serveBlocks(2, 3) // partially complete sections block 2 missing
peer2.AddPeer() // peer2.AddPeer() //
@ -477,8 +436,7 @@ func TestForkCompleteSectionSwitchBackByPeerSwitchBack(t *testing.T) {
go peer2.serveBlockHashes(6, 5, 4, 3, 2) // peer2 forks on block 3 go peer2.serveBlockHashes(6, 5, 4, 3, 2) // peer2 forks on block 3
peer2.serveBlocks(2, 3, 4, 5) // block 2 still missing. peer2.serveBlocks(2, 3, 4, 5) // block 2 still missing.
blockPool.RemovePeer("peer2") // peer2 disconnects, peer1 is promoted again as best peer blockPool.RemovePeer("peer2") // peer2 disconnects, peer1 is promoted again as best peer
// peer1.serveBlockHashes(7, 3) // tests that hash request from fork root is remembered even though section process completed go peer1.serveBlockHashes(2, 1, 0) //
go peer1.serveBlockHashes(2, 1, 0) //
peer1.serveBlocks(0, 1, 2) peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout) blockPool.Wait(waitTimeout)

@ -66,7 +66,8 @@ func (self *blockPoolTester) Errorf(format string, params ...interface{}) {
} }
// blockPoolTester implements the 3 callbacks needed by the blockPool: // blockPoolTester implements the 3 callbacks needed by the blockPool:
// hasBlock, insetChain, verifyPoW // hasBlock, insertChain, verifyPoW, and provides the eventer
// to subscribe to head insertions
func (self *blockPoolTester) hasBlock(block common.Hash) (ok bool) { func (self *blockPoolTester) hasBlock(block common.Hash) (ok bool) {
self.lock.RLock() self.lock.RLock()
defer self.lock.RUnlock() defer self.lock.RUnlock()
@ -77,6 +78,7 @@ func (self *blockPoolTester) hasBlock(block common.Hash) (ok bool) {
return return
} }
// mock insertChain relies on refBlockChain to determine block validity
func (self *blockPoolTester) insertChain(blocks types.Blocks) error { func (self *blockPoolTester) insertChain(blocks types.Blocks) error {
self.lock.Lock() self.lock.Lock()
defer self.lock.Unlock() defer self.lock.Unlock()
@ -127,6 +129,7 @@ func (self *blockPoolTester) insertChain(blocks types.Blocks) error {
return nil return nil
} }
// mock soft block validation always succeeds
func (self *blockPoolTester) verifyPoW(pblock pow.Block) bool { func (self *blockPoolTester) verifyPoW(pblock pow.Block) bool {
return true return true
} }
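The three callbacks mocked by blockPoolTester above (hasBlock, insertChain, verifyPoW) are exactly what the BlockPool constructor takes. A toy sketch of how a test can fake them with a map-backed chain (the types and names here are invented for illustration):

package main

import (
	"errors"
	"fmt"
)

type hashT string

type block struct{ hash, parent hashT }

// pool is a stand-in holding only the three callbacks the real constructor takes.
type pool struct {
	hasBlock    func(hashT) bool
	insertChain func([]*block) error
	verifyPoW   func(*block) bool
}

func main() {
	chain := map[hashT]bool{"genesis": true} // the faked blockchain
	p := &pool{
		hasBlock: func(h hashT) bool { return chain[h] },
		insertChain: func(blocks []*block) error {
			for _, b := range blocks {
				if !chain[b.parent] {
					return errors.New("unknown parent")
				}
				chain[b.hash] = true
			}
			return nil
		},
		// soft PoW verification always succeeds in tests
		verifyPoW: func(*block) bool { return true },
	}
	fmt.Println(p.verifyPoW(nil), p.insertChain([]*block{{"b1", "genesis"}}), p.hasBlock("b1"))
}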
@ -152,24 +155,24 @@ func (self *blockPoolTester) checkBlockChain(blockChain map[int][]int) {
} }
} }
//
// peerTester provides the peer callbacks for the blockPool // peerTester provides the peer callbacks for the blockPool
// it registers actual callbacks so that the result can be compared to desired behaviour // it registers actual callbacks so that the result can be compared to desired behaviour
// provides helper functions to mock the protocol calls to the blockPool // provides helper functions to mock the protocol calls to the blockPool
type peerTester struct { type peerTester struct {
// containers to record request and error callbacks
blockHashesRequests []int blockHashesRequests []int
blocksRequests [][]int blocksRequests [][]int
blocksRequestsMap map[int]bool blocksRequestsMap map[int]bool
peerErrors []int peerErrors []int
blockPool *BlockPool
hashPool *test.TestHashPool blockPool *BlockPool
lock sync.RWMutex hashPool *test.TestHashPool
bt *blockPoolTester lock sync.RWMutex
id string bt *blockPoolTester
td int id string
currentBlock int td int
t *testing.T currentBlock int
t *testing.T
} }
// peerTester constructor takes hashPool and blockPool from the blockPoolTester // peerTester constructor takes hashPool and blockPool from the blockPoolTester
@ -222,6 +225,7 @@ func (self *peerTester) checkBlockHashesRequests(blocksHashesRequests ...int) {
// waiter function used by peer.serveBlocks // waiter function used by peer.serveBlocks
// blocking until requests appear // blocking until requests appear
// this mocks proper wire protocol behaviour
// since block requests are sent to any random peers // since block requests are sent to any random peers
// block request map is shared between peers // block request map is shared between peers
// times out after waitTimeout // times out after waitTimeout
@ -254,6 +258,7 @@ func (self *peerTester) waitBlocksRequests(blocksRequest ...int) {
// waiter function used by peer.serveBlockHashes // waiter function used by peer.serveBlockHashes
// blocking until requests appear // blocking until requests appear
// this mocks proper wire protocol behaviour
// times out after a period // times out after a period
func (self *peerTester) waitBlockHashesRequests(blocksHashesRequest int) { func (self *peerTester) waitBlockHashesRequests(blocksHashesRequest int) {
timeout := time.After(waitTimeout) timeout := time.After(waitTimeout)
@ -299,6 +304,7 @@ func (self *peerTester) serveBlockHashes(indexes ...int) {
self.sendBlockHashes(indexes...) self.sendBlockHashes(indexes...)
} }
// peer sends blockhashes not waiting for request
func (self *peerTester) sendBlockHashes(indexes ...int) { func (self *peerTester) sendBlockHashes(indexes ...int) {
// fmt.Printf("adding block hashes %v\n", indexes) // fmt.Printf("adding block hashes %v\n", indexes)
hashes := self.hashPool.IndexesToHashes(indexes) hashes := self.hashPool.IndexesToHashes(indexes)
@ -315,13 +321,14 @@ func (self *peerTester) sendBlockHashes(indexes ...int) {
} }
// peer sends blocks if and when there is a request // peer sends blocks if and when there is a request
// (in the shared request store, not necessarily to a person) // (in the shared request store, not necessarily to a specific peer)
func (self *peerTester) serveBlocks(indexes ...int) { func (self *peerTester) serveBlocks(indexes ...int) {
// fmt.Printf("ready to serve blocks %v\n", indexes[1:]) // fmt.Printf("ready to serve blocks %v\n", indexes[1:])
self.waitBlocksRequests(indexes[1:]...) self.waitBlocksRequests(indexes[1:]...)
self.sendBlocks(indexes...) self.sendBlocks(indexes...)
} }
// peer sends blocks not waiting for request
func (self *peerTester) sendBlocks(indexes ...int) { func (self *peerTester) sendBlocks(indexes ...int) {
// fmt.Printf("adding blocks %v \n", indexes) // fmt.Printf("adding blocks %v \n", indexes)
hashes := self.hashPool.IndexesToHashes(indexes) hashes := self.hashPool.IndexesToHashes(indexes)
@ -331,9 +338,10 @@ func (self *peerTester) sendBlocks(indexes ...int) {
} }
} }
// peer callbacks // the 3 mock peer callbacks
// -1 is special: not found (a hash never seen)
// records block hashes requests by the blockPool // records block hashes requests by the blockPool
// -1 is special: not found (a hash never seen)
func (self *peerTester) requestBlockHashes(hash common.Hash) error { func (self *peerTester) requestBlockHashes(hash common.Hash) error {
indexes := self.hashPool.HashesToIndexes([]common.Hash{hash}) indexes := self.hashPool.HashesToIndexes([]common.Hash{hash})
// fmt.Printf("[%s] block hash request %v %x\n", self.id, indexes[0], hash[:4]) // fmt.Printf("[%s] block hash request %v %x\n", self.id, indexes[0], hash[:4])

@ -23,12 +23,13 @@ func TestBlockPoolConfig(t *testing.T) {
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t) test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, idleBestPeerTimeout, t) test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, idleBestPeerTimeout, t)
test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, peerSuspensionInterval, t) test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, peerSuspensionInterval, t)
test.CheckDuration("StatusUpdateInterval", c.StatusUpdateInterval, statusUpdateInterval, t)
} }
func TestBlockPoolOverrideConfig(t *testing.T) { func TestBlockPoolOverrideConfig(t *testing.T) {
test.LogInit() test.LogInit()
blockPool := &BlockPool{Config: &Config{}, chainEvents: &event.TypeMux{}} blockPool := &BlockPool{Config: &Config{}, chainEvents: &event.TypeMux{}}
c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second} c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second, 4 * time.Second}
blockPool.Config = c blockPool.Config = c
blockPool.Start() blockPool.Start()
@ -42,4 +43,5 @@ func TestBlockPoolOverrideConfig(t *testing.T) {
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t) test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, 30*time.Second, t) test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, 30*time.Second, t)
test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, 30*time.Second, t) test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, 30*time.Second, t)
test.CheckDuration("StatusUpdateInterval", c.StatusUpdateInterval, 4*time.Second, t)
} }

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/errs" "github.com/ethereum/go-ethereum/errs"
) )
// the blockpool's model of a peer
type peer struct { type peer struct {
lock sync.RWMutex lock sync.RWMutex
@ -104,12 +105,14 @@ func (self *peers) peerError(id string, code int, format string, params ...inter
self.addToBlacklist(id) self.addToBlacklist(id)
} }
// record time of offence in blacklist to implement suspension for PeerSuspensionInterval
func (self *peers) addToBlacklist(id string) { func (self *peers) addToBlacklist(id string) {
self.lock.Lock() self.lock.Lock()
defer self.lock.Unlock() defer self.lock.Unlock()
self.blacklist[id] = time.Now() self.blacklist[id] = time.Now()
} }
// suspended checks if peer is still suspended
func (self *peers) suspended(id string) (s bool) { func (self *peers) suspended(id string) (s bool) {
self.lock.Lock() self.lock.Lock()
defer self.lock.Unlock() defer self.lock.Unlock()
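addToBlacklist and suspended together implement the suspension described in the comments above: a fatal error records a timestamp, and the peer may not reconnect until PeerSuspensionInterval has passed. A compact, self-contained sketch of that mechanism (not the blockpool code itself):

package main

import (
	"fmt"
	"sync"
	"time"
)

// blacklist records when a peer last committed a fatal error.
type blacklist struct {
	lock     sync.Mutex
	entries  map[string]time.Time
	interval time.Duration
}

func newBlacklist(interval time.Duration) *blacklist {
	return &blacklist{entries: make(map[string]time.Time), interval: interval}
}

func (self *blacklist) add(id string) {
	self.lock.Lock()
	defer self.lock.Unlock()
	self.entries[id] = time.Now()
}

// suspended reports whether the peer is still within its suspension window;
// expired entries are removed on the way.
func (self *blacklist) suspended(id string) bool {
	self.lock.Lock()
	defer self.lock.Unlock()
	t, found := self.entries[id]
	if !found {
		return false
	}
	if time.Since(t) < self.interval {
		return true
	}
	delete(self.entries, id)
	return false
}

func main() {
	b := newBlacklist(300 * time.Second)
	b.add("peer0")
	fmt.Println(b.suspended("peer0"), b.suspended("peer1")) // true false
}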
@ -160,8 +163,8 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) {
}() }()
} }
// distribute block request among known peers
func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
// distribute block request among known peers
self.lock.RLock() self.lock.RLock()
defer self.lock.RUnlock() defer self.lock.RUnlock()
peerCount := len(self.peers) peerCount := len(self.peers)
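As the comment notes, block requests are distributed among known peers rather than sent only to the best peer (the test helpers rely on this: block requests may arrive at any random peer). A rough sketch of such a distribution, with the peer type and selection policy invented for illustration:

package main

import (
	"fmt"
	"math/rand"
)

type peerConn struct{ id string }

func (p *peerConn) requestBlocks(hashes []string) {
	fmt.Printf("asking %s for %d blocks\n", p.id, len(hashes))
}

// distribute sends the same batch of block hashes to up to attempts randomly
// chosen peers, so whichever answers first fills in the blocks.
func distribute(attempts int, hashes []string, peers []*peerConn) {
	if len(peers) == 0 {
		return
	}
	if attempts > len(peers) {
		attempts = len(peers)
	}
	for _, i := range rand.Perm(len(peers))[:attempts] {
		peers[i].requestBlocks(hashes)
	}
}

func main() {
	distribute(2, []string{"h1", "h2"}, []*peerConn{{"peer0"}, {"peer1"}, {"peer2"}})
}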
@ -196,7 +199,9 @@ func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
} }
// addPeer implements the logic for blockpool.AddPeer // addPeer implements the logic for blockpool.AddPeer
// returns true iff peer is promoted as best peer in the pool // returns 2 bool values
// 1. true iff peer is promoted as best peer in the pool
// 2. true iff peer is still suspended
func (self *peers) addPeer( func (self *peers) addPeer(
td *big.Int, td *big.Int,
currentBlockHash common.Hash, currentBlockHash common.Hash,
@ -214,10 +219,13 @@ func (self *peers) addPeer(
self.lock.Lock() self.lock.Lock()
p, found := self.peers[id] p, found := self.peers[id]
if found { if found {
// when called on an already connected peer, it means a newBlockMsg is received
// peer head info is updated
if p.currentBlockHash != currentBlockHash { if p.currentBlockHash != currentBlockHash {
previousBlockHash = p.currentBlockHash previousBlockHash = p.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash)) plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
p.setChainInfo(td, currentBlockHash) p.setChainInfo(td, currentBlockHash)
self.status.lock.Lock() self.status.lock.Lock()
self.status.values.NewBlocks++ self.status.values.NewBlocks++
self.status.lock.Unlock() self.status.lock.Unlock()
@ -235,7 +243,7 @@ func (self *peers) addPeer(
} }
self.lock.Unlock() self.lock.Unlock()
// check peer current head // check if peer's current head block is known
if self.bp.hasBlock(currentBlockHash) { if self.bp.hasBlock(currentBlockHash) {
// peer not ahead // peer not ahead
plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
@ -255,6 +263,7 @@ func (self *peers) addPeer(
} }
best = true best = true
} else { } else {
// baseline is our own TD
currentTD := self.bp.getTD() currentTD := self.bp.getTD()
if self.best != nil { if self.best != nil {
currentTD = self.best.td currentTD = self.best.td
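The promotion rule documented here boils down to a total-difficulty comparison with our own TD as the baseline. A minimal sketch of that rule in isolation (the real addPeer also handles suspension, head updates, and the switchPeer call):

package main

import (
	"fmt"
	"math/big"
)

type peerInfo struct {
	id string
	td *big.Int
}

// bestPeer returns the peer advertising the highest TD strictly above the
// baseline (our own TD, or the current best peer's TD if there is one).
func bestPeer(baseline *big.Int, peers []*peerInfo) *peerInfo {
	currentTD := baseline
	var best *peerInfo
	for _, p := range peers {
		if p.td.Cmp(currentTD) > 0 {
			best = p
			currentTD = p.td
		}
	}
	return best
}

func main() {
	own := big.NewInt(2)
	peers := []*peerInfo{{"peer1", big.NewInt(1)}, {"peer2", big.NewInt(4)}}
	fmt.Println(bestPeer(own, peers).id) // peer2
}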
@ -314,6 +323,7 @@ func (self *peers) removePeer(id string) {
func (self *BlockPool) switchPeer(oldp, newp *peer) { func (self *BlockPool) switchPeer(oldp, newp *peer) {
// first quit AddBlockHashes, requestHeadSection and activateChain // first quit AddBlockHashes, requestHeadSection and activateChain
// by closing the old peer's switchC channel
if oldp != nil { if oldp != nil {
plog.DebugDetailf("<%s> quit peer processes", oldp.id) plog.DebugDetailf("<%s> quit peer processes", oldp.id)
close(oldp.switchC) close(oldp.switchC)
@ -366,11 +376,12 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
// newp activating section process changes the quit channel for this reason // newp activating section process changes the quit channel for this reason
if oldp != nil { if oldp != nil {
plog.DebugDetailf("<%s> quit section processes", oldp.id) plog.DebugDetailf("<%s> quit section processes", oldp.id)
//
close(oldp.idleC) close(oldp.idleC)
} }
} }
// getPeer looks up peer by id, returns peer and a bool value
// that is true iff peer is current best peer
func (self *peers) getPeer(id string) (p *peer, best bool) { func (self *peers) getPeer(id string) (p *peer, best bool) {
self.lock.RLock() self.lock.RLock()
defer self.lock.RUnlock() defer self.lock.RUnlock()
@ -381,6 +392,8 @@ func (self *peers) getPeer(id string) (p *peer, best bool) {
return return
} }
// head section process
func (self *peer) handleSection(sec *section) { func (self *peer) handleSection(sec *section) {
self.lock.Lock() self.lock.Lock()
defer self.lock.Unlock() defer self.lock.Unlock()
@ -516,7 +529,7 @@ func (self *peer) run() {
LOOP: LOOP:
for { for {
select { select {
// to minitor section process behaviou // to monitor section process behaviour
case <-ping.C: case <-ping.C:
plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle) plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)

@ -142,3 +142,47 @@ func TestAddPeer(t *testing.T) {
blockPool.Stop() blockPool.Stop()
} }
func TestPeerPromotionByOptionalTdOnBlock(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(4)
peer0 := blockPoolTester.newPeer("peer0", 2, 2)
peer1 := blockPoolTester.newPeer("peer1", 1, 1)
peer2 := blockPoolTester.newPeer("peer2", 4, 4)
blockPool.Start()
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[3] = 3
// pool
peer0.AddPeer()
peer0.serveBlocks(1, 2)
best := peer1.AddPeer()
// this tests that peer1 is not promoted over peer0 yet
if best {
t.Errorf("peer1 (TD=1) should not be set as best")
}
best = peer2.AddPeer()
peer2.serveBlocks(3, 4)
peer2.serveBlockHashes(4, 3, 2, 1)
hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3})
peer1.waitBlocksRequests(3)
blockPool.AddBlock(&types.Block{
HeaderHash: common.Bytes(hashes[1]),
ParentHeaderHash: common.Bytes(hashes[0]),
Td: common.Big3,
}, "peer1")
blockPool.RemovePeer("peer2")
if blockPool.peers.best.id != "peer1" {
t.Errorf("peer1 (TD=3) should be set as best")
}
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[4] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}

@ -7,8 +7,12 @@ import (
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
) )
// test helpers // hashPool is a test helper that allows random hashes to be referred to by integers
// TODO: move into common test helper package (see p2p/crypto etc.) type TestHashPool struct {
intToHash
hashToInt
lock sync.Mutex
}
func NewHashPool() *TestHashPool { func NewHashPool() *TestHashPool {
return &TestHashPool{intToHash: make(intToHash), hashToInt: make(hashToInt)} return &TestHashPool{intToHash: make(intToHash), hashToInt: make(hashToInt)}
@ -18,13 +22,6 @@ type intToHash map[int]common.Hash
type hashToInt map[common.Hash]int type hashToInt map[common.Hash]int
// hashPool is a test helper, that allows random hashes to be referred to by integers
type TestHashPool struct {
intToHash
hashToInt
lock sync.Mutex
}
func newHash(i int) common.Hash { func newHash(i int) common.Hash {
return common.BytesToHash(crypto.Sha3([]byte(string(i)))) return common.BytesToHash(crypto.Sha3([]byte(string(i))))
} }
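TestHashPool, moved above, lets tests describe chains as plain integers by keeping a stable two-way mapping between ints and hashes. A self-contained sketch of the same idea, hashing with the standard library instead of crypto.Sha3 and using a local hash type instead of common.Hash:

package main

import (
	"crypto/sha256"
	"fmt"
	"sync"
)

type hash [32]byte

type testHashPool struct {
	lock      sync.Mutex
	intToHash map[int]hash
	hashToInt map[hash]int
}

func newTestHashPool() *testHashPool {
	return &testHashPool{intToHash: make(map[int]hash), hashToInt: make(map[hash]int)}
}

// indexesToHashes returns a deterministic hash for each index, remembering
// the mapping so it can be reversed later.
func (self *testHashPool) indexesToHashes(indexes []int) (hashes []hash) {
	self.lock.Lock()
	defer self.lock.Unlock()
	for _, i := range indexes {
		h, found := self.intToHash[i]
		if !found {
			h = sha256.Sum256([]byte(fmt.Sprint(i)))
			self.intToHash[i] = h
			self.hashToInt[h] = i
		}
		hashes = append(hashes, h)
	}
	return
}

// hashesToIndexes reverses the mapping; -1 is special: a hash never seen.
func (self *testHashPool) hashesToIndexes(hashes []hash) (indexes []int) {
	self.lock.Lock()
	defer self.lock.Unlock()
	for _, h := range hashes {
		i, found := self.hashToInt[h]
		if !found {
			i = -1
		}
		indexes = append(indexes, i)
	}
	return
}

func main() {
	pool := newTestHashPool()
	hs := pool.indexesToHashes([]int{1, 2, 3})
	fmt.Println(pool.hashesToIndexes(hs)) // [1 2 3]
}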

@ -9,6 +9,8 @@ import (
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
) )
// logging in tests
var once sync.Once var once sync.Once
/* usage: /* usage:

@ -6,6 +6,8 @@ import (
"time" "time"
) )
// miscellaneous test helpers
func CheckInt(name string, got int, expected int, t *testing.T) (err error) { func CheckInt(name string, got int, expected int, t *testing.T) (err error) {
if got != expected { if got != expected {
t.Errorf("status for %v incorrect. expected %v, got %v", name, expected, got) t.Errorf("status for %v incorrect. expected %v, got %v", name, expected, got)